diff options
| author | zhaokun <[email protected]> | 2024-11-27 19:18:05 +0800 |
|---|---|---|
| committer | zhaokun <[email protected]> | 2024-11-27 19:18:05 +0800 |
| commit | f719e3f8edbe5beae3b9c3c58fd23525f54353fa (patch) | |
| tree | cd6a8a48154955a93fd5352117f1e735637f8764 | |
| parent | 0e85a20e9bdc467f9c0cd320140251340ec2fa4f (diff) | |
Modify python files related to rules
| -rw-r--r-- | support/ui_utils/policies/create_rules.py | 482 | ||||
| -rw-r--r-- | support/ui_utils/policies/edit_rules.py | 24 | ||||
| -rw-r--r-- | support/ui_utils/policies/search_rules.py | 49 | ||||
| -rw-r--r-- | support/ui_utils/ui_client.py | 4 |
4 files changed, 221 insertions, 338 deletions
diff --git a/support/ui_utils/policies/create_rules.py b/support/ui_utils/policies/create_rules.py index a73fe1863..ca8a72f6d 100644 --- a/support/ui_utils/policies/create_rules.py +++ b/support/ui_utils/policies/create_rules.py @@ -232,7 +232,7 @@ class CreateRules: if attribute_flag: self.add_ip_protocol(object_configuration["items"][0], "IpProtocol", creation_element_position, negate_option) - # 添加protocol field + # 添加protocol field ?? protocol_filed_attribute_flag = self.is_protocol_filed_exsit(policy_configuration) if attribute_flag: protocol_field_datas = policy_configuration["condition"]["protocol_filed"] @@ -384,9 +384,9 @@ class CreateRules: # 选择target traffic if policy_configuration["type"] == "service_chaining": if policy_configuration["action_parameter"]["targeted_traffic"] == "decrypted": - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_decryptedTraffic_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_decryptedTraffic_posXpath"]).click() else: - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_rawTraffic_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_rawTraffic_posXpath"]).click() # 添加sub action if "sub_action" in policy_configuration["action_parameter"]: @@ -537,305 +537,184 @@ class CreateRules: self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_fairFactor_posId"]).send_keys(policy_configuration["action_parameter"]["fair_factor"]) # 添加statistics、shaping、sc、manipulation的profile - if "action_parameter" in policy_configuration and len(policy_configuration["action_parameter"]) != 0: - if policy_configuration["type"] == "traffic_shaping": - # 点profile的+ - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addProfile_posXpath"]).click() - # 通过profile name搜索并选中 - profile_data = policy_configuration["action_parameter"] - for y in range(len(profile_data)): - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=1).send_keys(profile_data[y]["name"]) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectProfile_posXpath"], find_after_wait_time=0.5).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeProfile_posXpath"]).click() - if policy_configuration["multiProfile"] == True and y != len(profile_data) - 1: - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addProfile_plus_posXpath"]).click() - elif policy_configuration["type"] == "proxy_intercept": - profile_data = policy_configuration["action_parameter"] - if "ssl_decryption_keyring" in profile_data.keys(): - ssl_decryption_keyrings = profile_data["ssl_decryption_keyring"] - for i in range(len(ssl_decryption_keyrings)): - # trusted - if ssl_decryption_keyrings[i]["decryption_keying_type"] == "trusted": - trusted_profile_name = ssl_decryption_keyrings[i]["profile_file"]["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTrustedSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"], find_after_wait_time=0.6).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"]).send_keys(trusted_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedDorpItem_posXpath"], find_after_wait_time=0.6).click() - # untrusted - elif ssl_decryption_keyrings[i]["decryption_keying_type"] == "untrusted": - untrusted_profile_name = ssl_decryption_keyrings[i]["profile_file"]["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addUntrustedSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"], find_after_wait_time=0.6).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"]).send_keys(untrusted_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedDorpItem_posXpath"], find_after_wait_time=0.6).click() - # mirror decrypted profile - if "traffic_mirroring_profile" in profile_data.keys(): - traffic_mirroring_profiles = profile_data["traffic_mirroring_profile"] - for i in range(len(traffic_mirroring_profiles)): - value_id = traffic_mirroring_profiles[i]["vlan_id"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTrafficMirrorProfileButton_posXpath"]).click() - if value_id == "default": - continue - else: - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addVlanIDButton_posXpath"]).click() - traffic_mirroring_profile_name = traffic_mirroring_profiles[i]["profile_file"]["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTrafficMirrorProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"], find_after_wait_time=0.6).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"]).send_keys(traffic_mirroring_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrafficMirrorProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() - # decryption profile - if "ssl_decryption_profile" in profile_data.keys(): - ssl_decryption_profiles = profile_data["ssl_decryption_profile"] - for i in range(len(ssl_decryption_profiles)): - ssl_decryption_profile_name = ssl_decryption_profiles[i]["profile_file"]["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addDecryptionProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"], find_after_wait_time=0.6).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"]).send_keys(ssl_decryption_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() - # tcp proxy profile - if "tcp_proxy_profile" in profile_data.keys(): - tcp_proxy_profiles = profile_data["tcp_proxy_profile"] - for i in range(len(tcp_proxy_profiles)): - tcp_proxy_profile_name = tcp_proxy_profiles[i]["profile_file"]["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTcpProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"], find_after_wait_time=0.6).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"]).send_keys(tcp_proxy_profile_name) + if policy_configuration["type"] == "traffic_shaping": + # 点profile的+ + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addProfile_posXpath"]).click() + # 通过profile name搜索并选中 + profile_chain = policy_configuration["action_parameter"]["profile_chain"] + for y in range(len(profile_chain)): + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=1).send_keys(profile_chain[y]["name"]) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectProfile_posXpath"], find_after_wait_time=0.5).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeProfile_posXpath"]).click() + # if policy_configuration["multiProfile"] == True and y != len(profile_chain) - 1: + # self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addProfile_plus_posXpath"]).click() + elif policy_configuration["type"] == "proxy_intercept": + profile_data = policy_configuration["action_parameter"] + if "keyring_for_trusted" in profile_data.keys(): + keyring_for_trusted_name = profile_data["keyring_for_trusted"]["name"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTrustedSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"], find_after_wait_time=0.6).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"]).send_keys(keyring_for_trusted_name) + # 点击回车键 + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedInput_posXpath"]) + search_box.send_keys(Keys.RETURN) + # 选择第一个 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrustedDorpItem_posXpath"], find_after_wait_time=0.6).click() + if "keyring_for_untrusted" in profile_data.keys(): + keyring_for_untrusted_name = profile_data["keyring_for_untrusted"]["name"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addUntrustedSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"], find_after_wait_time=0.6).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"]).send_keys(keyring_for_untrusted_name) + # 点击回车键 + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedInput_posXpath"]) + search_box.send_keys(Keys.RETURN) + # 选择第一个 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchUntrustedDorpItem_posXpath"], find_after_wait_time=0.6).click() + # mirror decrypted profile + if "traffic_mirroring" in profile_data.keys(): + traffic_mirroring_name = profile_data["traffic_mirroring"]["name"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addVlanIDButton_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTrafficMirrorProfileSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"], find_after_wait_time=0.6).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"]).send_keys(traffic_mirroring_name) + # 点击回车键 + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searcTrafficMirrorProfileInput_posXpath"]) + search_box.send_keys(Keys.RETURN) + # 选择第一个 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTrafficMirrorProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() + # decryption profile + if "decryption_profile" in profile_data.keys(): + decryption_profile_name = profile_data["decryption_profile"]["name"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addDecryptionProfileSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"], find_after_wait_time=0.6).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"]).send_keys(decryption_profile_name) + # 点击回车键 + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileInput_posXpath"]) + search_box.send_keys(Keys.RETURN) + # 选择第一个 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchDecryptionProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() + # tcp proxy profile + if "tcp_option_profile" in profile_data.keys(): + tcp_option_profile_name = profile_data["tcp_option_profile"]["name"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_addTcpProfileSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"], find_after_wait_time=0.6).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"]).send_keys(tcp_option_profile_name) + # 点击回车键 + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"]) + search_box.send_keys(Keys.RETURN) + # 选择第一个 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() + elif policy_configuration["type"] == "proxy_manipulation": + profile_data = policy_configuration["action_parameter"] + # 页面滚动到最底部 + div_container = self.driver.find_element(By.XPATH, "//div[@id='root']//div[contains(@class, 'manipulation-policy-page')]") + self.driver.execute_script("arguments[0].scrollTop = arguments[0].scrollHeight;", div_container) # 移动到页面最底部 + if policy_configuration["action"] == "deny": + if "manipulation_block" in policy_configuration["action_parameter"].keys(): + response_code = policy_configuration["action_parameter"]["code"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseCodeSelectBox_posXpath"]).click() + response_code_dropdown_position = creation_element_position["policyRulePage_deny_addResponseCodeSelectDropItem_posXpath"].format(replace_code=response_code) + self.driver.find_element(By.XPATH, response_code_dropdown_position).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentSelectBox_posXpath"]).click() + if "message" in policy_configuration["action_parameter"].keys(): + type = "TEXT" + else: + type = "Profile" + response_content_type_position = creation_element_position["policyRulePage_deny_addResponseContentSelectDropItem_posXpath"].format(replace_type=type) + self.driver.find_element(By.XPATH, response_content_type_position).click() + if type == "TEXT": + response_context_text = policy_configuration["action_parameter"]["message"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentTextInput_posXpath"]).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentTextInput_posXpath"]).send_keys(response_context_text) + else: + response_profile_name = policy_configuration["action_parameter"]["html_profile"] + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentProfileSelectBox_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]).send_keys(response_profile_name) # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileInput_posXpath"]) + search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]) search_box.send_keys(Keys.RETURN) # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_intercept_searchTcpProfileDorpItem_posXpath"], find_after_wait_time=0.6).click() - elif policy_configuration["type"] == "proxy_manipulation": - profile_data = policy_configuration["action_parameter"] - # 页面滚动到最底部 - div_container = self.driver.find_element(By.XPATH, "//div[@id='root']//div[contains(@class, 'manipulation-policy-page')]") - self.driver.execute_script("arguments[0].scrollTop = arguments[0].scrollHeight;", div_container) # 移动到页面最底部 - if policy_configuration["action"] == "deny": - # 选择 response code - if "deny_response_block" in policy_configuration["action_parameter"].keys(): # action为 close connection - pass - else: - if "deny_response_text" in policy_configuration["action_parameter"].keys(): - response_type = "deny_response_text" # 这个变量索引key使用 - else: - response_type = "response_page" - response_code = policy_configuration["action_parameter"][response_type][0]["response_code"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseCodeSelectBox_posXpath"]).click() - response_code_drop_position = creation_element_position["policyRulePage_deny_addResponseCodeSelectDropItem_posXpath"].format(replace_code=response_code) - self.driver.find_element(By.XPATH, response_code_drop_position).click() - # 选择 response content type - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentSelectBox_posXpath"]).click() - response_content_type = policy_configuration["action_parameter"][response_type][0]["response_content_type"].strip() - response_content_type_position = creation_element_position["policyRulePage_deny_addResponseContentSelectDropItem_posXpath"].format(replace_type=response_content_type) - self.driver.find_element(By.XPATH, response_content_type_position).click() - if response_content_type == "TEXT": - # 输入response content text - response_context_text = policy_configuration["action_parameter"][response_type][0]["response_content"].strip() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentTextInput_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentTextInput_posXpath"]).send_keys(response_context_text) - else: # Profile - response_profile_data = policy_configuration["action_parameter"][response_type][0]["profile_file"] - response_profile_name = response_profile_data["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_addResponseContentProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]).send_keys(response_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfile_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfileDorpItem_posXpath"]).click() - elif policy_configuration["action"] == "redirect": - # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], policy_configuration["condition"]["application"][0]["name"].lower().strip()) - if policy_configuration["condition"]["application"][0]["name"].lower().strip() == "http": - response_code = policy_configuration["action_parameter"]["redirect_url"][0]["response_code"] - response_code_drop_position = creation_element_position["policyRulePage_redirectHttp_addResponseCodeSelectDropItem_posXpath"].format(replace_code=response_code) - redirect_url = policy_configuration["action_parameter"]["redirect_url"][0]["redirect_url"] - # 操作 action parameter - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_addResponseCodeSelectBox_posXpath"]).click() - time.sleep(0.3) - btn = self.driver.find_element(By.XPATH, response_code_drop_position) - self.driver.execute_script("arguments[0].click()", btn) # 强制点击 - time.sleep(0.3) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_redirectHttp_addRedirectUrlInput_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_redirectHttp_addRedirectUrlInput_posXpath"]).send_keys(redirect_url) - else: - # 遍历,是否含有多个value - doh_parameter_list = policy_configuration["action_parameter"]["doh"] - for i in range(len(doh_parameter_list)): - q_type = doh_parameter_list[i]["q_type"] - a_type = doh_parameter_list[i]["a_type"] - doh_value = doh_parameter_list[i]["doh_value"] - doh_ttl = doh_parameter_list[i]["doh_ttl"] - replace_index = i + 2 - if i != 0: # 非第一次遍历点击 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_add_posXpath"]).click() - if i == 0: # 只有第一次遍历点击 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_qtypeRadio_posXpath"].format(replace_qtype=q_type)).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_atypeSelectBox_posXpath"].format(replace_index=replace_index)).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_atypeSelectDropItem_posXpath"].format(replace_atype=a_type)).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_valueInput_posXpath"].format(replace_index=replace_index)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_valueInput_posXpath"].format(replace_index=replace_index), find_after_wait_time=0.5).send_keys(doh_value) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_ttlInput_posXpath"].format(replace_index=replace_index)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_ttlInput_posXpath"].format(replace_index=replace_index), find_after_wait_time=0.5).send_keys(doh_ttl) - elif policy_configuration["action"] == "replace": - # 遍历,是否含有多个search in - replace_parameter_list = policy_configuration["action_parameter"]["replace_parameter"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_subActionButton_replaceText_posXpath"]).click() - for i in range(len(replace_parameter_list)): - # 从第二次遍历开始点击 + 加号 - if i != 0: - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_add_posXpath"]).click() - regex_status = replace_parameter_list[i]["regex"].lower() - search_in = replace_parameter_list[i]["search_in"] - find_text = replace_parameter_list[i]["find"] - replace_with_text = replace_parameter_list[i]["replace_with"] - # 操作action parameter, 获取regex当前属性 is-checked 表示开 - regex_class_status = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addRegex_posXpath"].format(replace_sort=i+1)).get_attribute("class") - #print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], regex_class_status) - if regex_status == "on": # 开 - if "Mui-checked" in regex_class_status: # 当前状态开,不用再点击 - pass - else: # 当前状态关,需要再点击打开 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addRegex_posXpath"].format(replace_sort=i+1)).click() - else: # off 关 - if "Mui-checked" in regex_class_status: # 当前状态开,需要点击关闭 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addRegex_posXpath"].format(replace_sort=i+1)).click() - else: # # 当前状态关,不用再点击 - pass - # 操作search in - search_in_drop_position = creation_element_position["policyRulePage_replace_addSearchInSelectDropItem_posXpath"].format(replace_sort=i+1, replace_type=search_in) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addSearchInSelectBox_posXpath"].format(replace_sort=i+1)).click() - self.driver.find_element(By.XPATH, search_in_drop_position, find_after_wait_time=0.5).click() - # 操作 find replace with 输入框 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addFindInput_posXpath"].format(replace_sort=i+1)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addFindInput_posXpath"].format(replace_sort=i+1), find_after_wait_time=0.5).send_keys(find_text) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addReplaceWithInput_posXpath"].format(replace_sort=i+1)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_replace_addReplaceWithInput_posXpath"].format(replace_sort=i+1), find_after_wait_time=0.5).send_keys(replace_with_text) - elif policy_configuration["action"] == "hijack": - hijack_profile_data = policy_configuration["action_parameter"]["hijack_file"][0]["profile_file"] - hijack_profile_name = hijack_profile_data["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_hijack_addHijackProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_hijack_searchHijackProfile_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_hijack_searchHijackProfile_posXpath"]).send_keys(hijack_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_hijack_searchHijackProfile_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_hijack_searchHijackProfileDorpItem_posXpath"]).click() - elif policy_configuration["action"] == "insert": - hijack_profile_data = policy_configuration["action_parameter"]["insert_script"][0]["profile_file"] - hijack_profile_name = hijack_profile_data["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_insert_addInsertProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_insert_searchInsertProfile_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_insert_searchInsertProfile_posXpath"]).send_keys(hijack_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_insert_searchInsertProfile_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_insert_searchInsertProfileDorpItem_posXpath"]).click() - elif policy_configuration["action"] == "edit_element": - # 遍历多个编辑内容 - edit_element_parameter_list = policy_configuration["action_parameter"]["edit_element_parameter"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_subActionButton_editElement_posXpath"]).click() - for i in range(len(edit_element_parameter_list)): - # 从第二次遍历开始点击 + 加号 - replace_sort = i + 1 - if i != 0: - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_add_posXpath"]).click() - search_scope = edit_element_parameter_list[i]["search_scope"].lower().strip() - start_indicator = edit_element_parameter_list[i]["start_indicator"].lower() - contained_keyword = edit_element_parameter_list[i]["contained_keyword"].lower() - distance_from_anchor = edit_element_parameter_list[i]["distance_from_anchor"].lower() - treatment = edit_element_parameter_list[i]["treatment"].lower() - if search_scope == "whole_file": # search scope 操作 - # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], creation_element_position["policyRulePage_editElement_addSearchScopeWholeFileRadio_posXpath"]) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addSearchScopeWholeFileRadio_posXpath"].format(replace_sort=replace_sort)).click() - else: # inside_element - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addSearchScopeInsideElementRadio_posXpath"].format(replace_sort=replace_sort)).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addStartIndicatorInput_posXpath"].format(replace_sort=replace_sort), find_after_wait_time=0.5).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addStartIndicatorInput_posXpath"].format(replace_sort=replace_sort), find_after_wait_time=0.5).send_keys(start_indicator) - # 输入 contained_keyword - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addContainedKeywordInput_posXpath"].format(replace_sort=replace_sort)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addContainedKeywordInput_posXpath"].format(replace_sort=replace_sort), find_after_wait_time=0.5).send_keys(contained_keyword) - # 输入 distance_from_anchor - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addSDistanceFromAnchorInput_posXpath"].format(replace_sort=replace_sort)).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addSDistanceFromAnchorInput_posXpath"].format(replace_sort=replace_sort), find_after_wait_time=0.5).send_keys(distance_from_anchor) - if treatment == "mark": - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addMarkRadio_posXpath"].format(replace_sort=replace_sort)).click() - else: # inside_element - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_editElement_addRemoveRadio_posXpath"].format(replace_sort=replace_sort)).click() - elif policy_configuration["action"] == "run_script": - run_scirpt_profile_data = policy_configuration["action_parameter"]["run_script"][0]["profile_file"] - run_scirpt_profile_name = run_scirpt_profile_data["name"] - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_addrunScriptProfileSelectBox_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_searchrunScriptProfile_posXpath"]).clear() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_searchrunScriptProfile_posXpath"]).send_keys(run_scirpt_profile_name) - # 点击回车键 - search_box = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_searchrunScriptProfile_posXpath"]) - search_box.send_keys(Keys.RETURN) - # 选择第一个 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_searchrunScriptProfileDorpItem_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_runScript_closerunScriptProfile_posXpath"]).click() - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "todo manipulation action parameters..") - elif policy_configuration["type"] == "statistics": - # 点击Statistics Template添加按钮打开Template侧滑列表 - self.driver.find_element(By.XPATH, creation_element_position["plicyRulePage_addStatisticsTemplate_posXpath"]).click() - # 通过Statistics Template name搜索并选中 - profile_data = policy_configuration["action_parameter"]["statistics_template"] - for template in range(len(profile_data)): - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=1).send_keys(profile_data[template]["name"]) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectProfile_posXpath"].format(replaceValue=profile_data[template]["name"]), find_after_wait_time=0.5).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeProfile_posXpath"]).click() - elif policy_configuration["type"] == "service_chaining": - sff_data = policy_configuration["profile"] - # 点击SFF Profile添加按钮 - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addSFFProfile_posXpath"]).click() - time.sleep(1) - #在侧滑框中搜索sff并选中 - for sff in sff_data: - btn = self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_searchSFFProfile_posXpath"], find_after_wait_time=1) - self.driver.execute_script("arguments[0].click()", btn) - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_searchSFFProfile_posXpath"]).send_keys(sff["name"]) - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_searchSFFProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) - selectSFFXpath = creation_element_position["policyRulePage_selectSFFProfile_posXpath"].format(replaceValue=sff["name"]) - self.driver.find_element(By.XPATH,selectSFFXpath).click() - # self.driver.execute_script("arguments[0].click()", btn) - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_closeSFFProfile_posXpath"]).click() - elif policy_configuration["type"] == "monitor": - #步骤:开启vlan_button->选择select->输入name->搜索 - if policy_configuration["is_mirror"] == "on": - self.driver.find_element(By.XPATH, creation_element_position[ - "policyRulePage_mirrorTrafficButton_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_deny_searchResponseProfileDorpItem_posXpath"]).click() + elif policy_configuration["action"] == "redirect": + if application_configuration["items"][0] == "http": + response_code = policy_configuration["action_parameter"]["code"] + response_code_dropdown_position = creation_element_position["policyRulePage_redirectHttp_addResponseCodeSelectDropItem_posXpath"].format(replace_code=response_code) + redirect_url = policy_configuration["action_parameter"]["to"] + # 操作 action parameter + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_addResponseCodeSelectBox_posXpath"]).click() + time.sleep(0.3) + btn = self.driver.find_element(By.XPATH, response_code_dropdown_position) + self.driver.execute_script("arguments[0].click()", btn) # 强制点击 + time.sleep(0.3) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_redirectHttp_addRedirectUrlInput_posXpath"]).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectHttp_redirectHttp_addRedirectUrlInput_posXpath"]).send_keys(redirect_url) else: - pass - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addVLANIDButton_posXpath"]).click() - for flag in range(len(policy_configuration["profile"])): - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_firstSelectMirrorProfile_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchMirrorProfile_posXpath"], find_after_wait_time=1).send_keys(policy_configuration["profile"][flag]["name"]) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchMirrorProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) - self.driver.find_element(By.XPATH,'//ul[@class="MuiList-root MuiList-vertical MuiList-variantPlain MuiList-colorNeutral MuiList-sizeMd css-1cklc3"]//li[1]//div[@class="MuiListItemButton-root MuiListItemButton-colorNeutral MuiListItemButton-variantPlain css-18mv95p"]').click() - self.driver.find_element(By.XPATH,'//div[@class="absolute bottom-0 h-[40px] w-[100%] overflow-hidden text-[16px] truncate bg-[--color-background-secondary] flex justify-center items-center pl-[12px] pr-[38px]"]/button').click() - # selectProfileXpath = creation_element_position["policyRulePage_selectMirrorProfile_posXpath"].format(replaceValue=policy_configuration["profile"][flag]["name"]) - # self.driver.find_element(By.XPATH,selectProfileXpath, find_after_wait_time=0.5).click() + # 遍历,是否含有多个value + doh_action_parameter_list = policy_configuration["action_parameter"] + for i in range(len(doh_action_parameter_list)): + q_type = doh_action_parameter_list[i]["q_type"] + a_type = doh_action_parameter_list[i]["a_type"] + doh_value = doh_action_parameter_list[i]["doh_value"] + doh_ttl = doh_action_parameter_list[i]["doh_ttl"] + replace_index = i + 2 + if i != 0: # 非第一次遍历点击 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_add_posXpath"]).click() + if i == 0: # 只有第一次遍历点击 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_qtypeRadio_posXpath"].format(replace_qtype=q_type)).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_atypeSelectBox_posXpath"].format(replace_index=replace_index)).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_atypeSelectDropItem_posXpath"].format(replace_atype=a_type)).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_valueInput_posXpath"].format(replace_index=replace_index)).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_valueInput_posXpath"].format(replace_index=replace_index), find_after_wait_time=0.5).send_keys(doh_value) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_ttlInput_posXpath"].format(replace_index=replace_index)).clear() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_redirectDoH_ttlInput_posXpath"].format(replace_index=replace_index), find_after_wait_time=0.5).send_keys(doh_ttl) + elif policy_configuration["action"] == "modify": + # ?? + if policy_configuration["action_parameter"]["sub_action"] == "replace_file": + print("todo") + elif policy_configuration["action_parameter"]["sub_action"] == "inject_javascript": + print("todo") + elif policy_configuration["type"] == "statistics": + # 点击Statistics Template添加按钮打开Template侧滑列表 + self.driver.find_element(By.XPATH, creation_element_position["plicyRulePage_addStatisticsTemplate_posXpath"]).click() + # 通过Statistics Template name搜索并选中 + template_profile = policy_configuration["action_parameter"]["template_profile"] + if len(template_profile) > 0: + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=1).send_keys(template_profile["name"]) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectProfile_posXpath"].format(replaceValue=template_profile["name"]), find_after_wait_time=0.5).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeProfile_posXpath"]).click() + elif policy_configuration["type"] == "service_chaining": + sff_profiles = policy_configuration["action_parameter"]["sff_profiles"] + # 点击SFF Profile添加按钮 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addSFFProfile_posXpath"]).click() + time.sleep(1) + #在侧滑框中搜索sff并选中 + for sff in sff_profiles: + btn = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchSFFProfile_posXpath"], find_after_wait_time=1) + self.driver.execute_script("arguments[0].click()", btn) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchSFFProfile_posXpath"]).send_keys(sff["name"]) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchSFFProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) + select_sff_xpath = creation_element_position["policyRulePage_selectSFFProfile_posXpath"].format(replaceValue=sff["name"]) + self.driver.find_element(By.XPATH, select_sff_xpath).click() + # self.driver.execute_script("arguments[0].click()", btn) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeSFFProfile_posXpath"]).click() + elif policy_configuration["type"] == "monitor": + # 步骤:开启vlan_button->选择select->输入name->搜索 + if policy_configuration["action_parameter"]["traffic_mirroring"]["enable"] == 1: + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_mirrorTrafficButton_posXpath"]).click() + else: + pass + self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addVLANIDButton_posXpath"]).click() + if len(policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]) > 0: + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_firstSelectMirrorProfile_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchMirrorProfile_posXpath"], find_after_wait_time=1).send_keys(policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]["name"]) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchMirrorProfile_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) + self.driver.find_element(By.XPATH, '//ul[@class="MuiList-root MuiList-vertical MuiList-variantPlain MuiList-colorNeutral MuiList-sizeMd css-1cklc3"]//li[1]//div[@class="MuiListItemButton-root MuiListItemButton-colorNeutral MuiListItemButton-variantPlain css-18mv95p"]').click() + self.driver.find_element(By.XPATH, '//div[@class="absolute bottom-0 h-[40px] w-[100%] overflow-hidden text-[16px] truncate bg-[--color-background-secondary] flex justify-center items-center pl-[12px] pr-[38px]"]/button').click() + # selectProfileXpath = creation_element_position["policyRulePage_selectMirrorProfile_posXpath"].format(replaceValue=policy_configuration["profile"][flag]["name"]) + # self.driver.find_element(By.XPATH,selectProfileXpath, find_after_wait_time=0.5).click() # 配置DoS的Threshold和Mitigation if policy_configuration["type"] == "dos_protection" and policy_configuration["action"] == "protect": @@ -897,11 +776,10 @@ class CreateRules: # 确认创建 self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_okButton_posXpath"]).click() #self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_okButton_warningYes_posXpath"]).click() - return 200 + return element_position_library, 200 except Exception as e: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"Exception: {e}", flush=True) - raise - #return 400, None + print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "When creating rule, the exception error: ", str(e), flush=True) + return "", 400 def add_ip(self, ip_data, ip_condition_type, creation_element_position, negate_option): try: diff --git a/support/ui_utils/policies/edit_rules.py b/support/ui_utils/policies/edit_rules.py index 678237ee3..9fe75efdd 100644 --- a/support/ui_utils/policies/edit_rules.py +++ b/support/ui_utils/policies/edit_rules.py @@ -13,10 +13,12 @@ class EditRules: def __init__(self, driver): self.driver = driver - def edit_rules(self, type, name, rule_element_position, vsys_name=""): + def edit_rules(self, policy_configuration, element_position, vsys_name=""): try: - search_element_position = rule_element_position["search"] - page_jump_element_position = rule_element_position["page_jump"] + type = policy_configuration["type"] + name = policy_configuration["name"] + search_element_position = element_position["search"] + page_jump_element_position = element_position["page_jump"] # 打开该列表 page_jump = PageJump(self.driver) page_jump.jump_sub_policy_page(page_jump_element_position) @@ -34,31 +36,31 @@ class EditRules: self.driver.execute_script("arguments[0].click()", btn) # 强制点击 time.sleep(0.5) if type == "security": - rule_id = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text + rule_uuid = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text edit_button_position = "appEdit-_OperateBtns_ElRow_Objects_ProxyInterception_Home_App_anonymousComponent" elif type == "service_chaining": - rule_id = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault mouseSchedule el-popover__reference']//span").text + rule_uuid = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault mouseSchedule el-popover__reference']//span").text edit_button_position = "appEdit-_OperateBtns_ElRow_policy_ServiceChaining_Home_App_anonymousComponent" elif type == "monitor": - rule_id = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text + rule_uuid = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text edit_button_position ="appEdit-_OperateBtns_ElRow_policy_monitor_Home_App_anonymousComponent" elif type == "proxy_intercept": - rule_id = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() + rule_uuid = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() edit_button_position = "appEdit-_OperateBtns_ElRow_policy_Intercept_Home_App_anonymousComponent" elif type == "proxy_manipulation": - rule_id = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() + rule_uuid = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() edit_button_position = "appEdit-_OperateBtns_ElRow_policy_Manipulation_Home_App_anonymousComponent" elif type == "statistics": - rule_id = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text + rule_uuid = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text edit_button_position = "appEdit-_OperateBtns_ElRow_policy_statistics_Home_App_anonymousComponent" elif type == "dos_protection": - rule_id = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text + rule_uuid = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text edit_button_position = "appEdit-_OperateBtns_ElRow_policy_DoSProtection_Home_App_anonymousComponent" all_button_position = "//div[@class='page-box-containcheck']//label[contains(@class, 'allCheckbox')]/span/span" all_selectBox_position = "//table//*[@class='el-checkbox']//span//span" self.driver.find_elements(By.XPATH, all_selectBox_position)[0].click() self.driver.find_element(By.ID, edit_button_position).click() - return 200, rule_id + return 200 except Exception as e: print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"Exception: {e}", flush=True) raise diff --git a/support/ui_utils/policies/search_rules.py b/support/ui_utils/policies/search_rules.py index 973430f63..f1741b6e0 100644 --- a/support/ui_utils/policies/search_rules.py +++ b/support/ui_utils/policies/search_rules.py @@ -13,10 +13,11 @@ class SearchRules: def __init__(self, driver): self.driver = driver - def search_rules(self, type, name, rule_element_position, vsys_name=""): + def search_rules(self, policy_configuration, element_position, vsys_name=""): try: - search_element_position = rule_element_position["search"] - page_jump_element_position = rule_element_position["page_jump"] + name = policy_configuration["name"] + search_element_position = element_position["search"] + page_jump_element_position = element_position["page_jump"] # 打开该列表 page_jump = PageJump(self.driver) page_jump.jump_sub_policy_page(page_jump_element_position) @@ -50,10 +51,12 @@ class SearchRules: except Exception as e: raise - def get_rule_id(self, type, name, rule_element_position, vsys_name=""): + def get_rule_uuid(self, policy_configuration, element_position, vsys_name=""): try: - search_element_position = rule_element_position["search"] - page_jump_element_position = rule_element_position["page_jump"] + type = policy_configuration["type"] + name = policy_configuration["name"] + search_element_position = element_position["search"] + page_jump_element_position = element_position["page_jump"] # 打开该列表 page_jump = PageJump(self.driver) page_jump.jump_sub_policy_page(page_jump_element_position) @@ -71,33 +74,33 @@ class SearchRules: self.driver.execute_script("arguments[0].click()", btn) # 强制点击 time.sleep(1) if type == "security": - rule_id = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text - return 200, rule_id + rule_uuid = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text elif type == "service_chaining": - rule_id = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault mouseSchedule el-popover__reference']//span").text - return 200, rule_id + rule_uuid = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault mouseSchedule el-popover__reference']//span").text elif type == "monitor": - rule_id = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text - return 200, rule_id + rule_uuid = self.driver.find_element(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span").text elif type == "proxy_intercept": - rule_id = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() - return 200, rule_id + rule_uuid = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() elif type == "proxy_manipulation": - rule_id = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() - return 200, rule_id + rule_uuid = self.driver.find_elements(By.XPATH,"//span[@class='idWidth cursorDefault el-popover__reference']//span")[0].text.strip() elif type == "statistics": # 点击列设置打开列设置界面 self.driver.find_element(By.XPATH, ruleListPage_column_set_button_posXpath).click() # 选择UUID self.driver.find_element(By.XPATH, ruleListPage_column_set_field_posXpath.format(replaceFiled="UUID")).click() - rule_id = self.driver.find_element(By.XPATH, "//div[contains(@class,'MuiDataGrid-row')][1]//div[@data-field='uuid']").text + rule_uuid = self.driver.find_element(By.XPATH, "//div[contains(@class,'MuiDataGrid-row')][1]//div[@data-field='uuid']").text #点击其他地方比如再次点击搜索按钮,不然影响菜单切换 self.driver.find_element(By.XPATH, search_element_position["policyRuleListPage_searchLabel_searchButton_posId"]).click() - return 200, rule_id elif type == "dos_protection": - rule_id = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text - return 200, rule_id - else: - return 400, "" + rule_uuid = self.driver.find_element(By.XPATH, "//span[@class='idWidth cursorDefault el-popover__reference']//span").text + rule_uuids_list = [] + rule_uuids_temp_dict = {} + rule_uuids_temp_dict["type"] = policy_configuration["type"] + rule_uuids_temp_dict["uuid"] = rule_uuid + rule_uuids_temp_dict["name"] = policy_configuration["name"] + rule_uuids_temp_dict["attribute_name"] = policy_configuration["type"].upper() + "_" + "RULE" + rule_uuids_list.append(rule_uuids_temp_dict) + rule_uuids_tuple = tuple(rule_uuids_list) + return rule_uuids_tuple, 200 except Exception as e: - raise
\ No newline at end of file + return "", 400
\ No newline at end of file diff --git a/support/ui_utils/ui_client.py b/support/ui_utils/ui_client.py index 41e9c7bcd..628dfcce8 100644 --- a/support/ui_utils/ui_client.py +++ b/support/ui_utils/ui_client.py @@ -56,7 +56,7 @@ class UIClient: def search_rules(self, policy_configuration): rules = SearchRules(self.driver) - rule_tuple, code = rules.search_rules(policy_configuration, self.element_position) + rule_tuple, code = rules.get_rule_uuid(policy_configuration, self.element_position) self.rule_tuple = rule_tuple return rule_tuple, code @@ -67,7 +67,7 @@ class UIClient: def delete_rules(self, rule_tuple): rules = DeleteRules(self.driver) - rules.delete_rules(self.parameter, rule_tuple) + rules.delete_rules(self.element_position, rule_tuple) def query_rule_metric(self, verification_result, traffic_result): query = QueryRuleMetric(self.policy_configuration, verification_result, self.driver, self.element_position, traffic_result) |
