From 06fa53634037ae81bfb5b24e97a48697061cb083 Mon Sep 17 00:00:00 2001 From: zhaokun Date: Thu, 28 Nov 2024 16:50:48 +0800 Subject: Modify create_rules.py --- support/ui_utils/policies/create_rules.py | 496 ++++++++++++++++-------------- 1 file changed, 258 insertions(+), 238 deletions(-) diff --git a/support/ui_utils/policies/create_rules.py b/support/ui_utils/policies/create_rules.py index 629e779c6..e6c0de80f 100644 --- a/support/ui_utils/policies/create_rules.py +++ b/support/ui_utils/policies/create_rules.py @@ -21,8 +21,7 @@ class CreateRules: def create_rules(self, policy_configuration): try: - # 目前所有object类型都是一个值?? - + # 目前所有object类型在json data中都只有一个?? # 根据rule type获取增删改查全部元素定位库,若有需要补充的,在下面或在map_element_position_library中追加 element_position_library = get_element_position(policy_configuration["type"]) page_jump_element_position = element_position_library["page_jump"] @@ -71,6 +70,7 @@ class CreateRules: self.driver.find_element(By.XPATH,monitorRulePage_logOptions_all_posXpath).click() # 添加ip + # 如果出现多个相同类型的attribute name,需要增加一个函数来判断该attribute name出现的次数 src_ip_object_flag, src_ip_object_configuration, src_ip_object_negate_option = self.is_attribute_name_exsit(policy_configuration, "ATTR_SOURCE_IP") src_ip_group_flag, src_ip_object_group_configuration, src_ip_object_group_negate_option = self.is_group_exsit(policy_configuration, "member_type") if src_ip_object_flag and src_ip_group_flag == False: @@ -232,149 +232,164 @@ class CreateRules: if ip_protocol_object_flag: self.add_ip_protocol(ip_protocol_object_configuration["items"][0], "IpProtocol", creation_element_position, ip_protocol_object_negate_option) - # 添加protocol field ?? - protocol_filed_attribute_flag = self.is_protocol_filed_exsit(policy_configuration) - if protocol_filed_attribute_flag: - protocol_field_datas = policy_configuration["condition"]["protocol_filed"] - tmp_protocol_field_data_split_by_type = [] # 将 protocol_filed 分类临时存储 - tmp_url, tmp_request_header, tmp_response_header, tmp_request_body, tmp_response_body, tmp_ftp_account, tmp_ftp_content, tmp_cn, tmp_san, tmp_qname = [], [], [], [], [], [], [], [], [], [] - tmp_boolean= [] - tmp_mail_account, tmp_mail_from, tmp_mail_to,tmp_mail_subject,tmp_mail_attachment_name,tmp_mail_content,tmp_mail_attachment_content= [],[],[],[],[],[],[] - for _ in range(len(protocol_field_datas)): # 将 rul、request_header等数据分类处理 - tmp_item = protocol_field_datas[_] - if tmp_item["object_type"] == "url": - tmp_url.append(tmp_item) - elif tmp_item["object_type"] == "fqdn": - if tmp_item["items"][0]["item_type"] == "cn": - tmp_cn.append(tmp_item) - elif tmp_item["items"][0]["item_type"] == "san": - tmp_san.append(tmp_item) - elif tmp_item["items"][0]["item_type"] == "qname": - tmp_qname.append(tmp_item) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..") - # elif tmp_item["object_type"] == "http_signature": - # if tmp_item["item"][0]["item_type"] == "request_header": - # tmp_request_header.append(tmp_item) - # elif tmp_item["item"][0]["item_type"] == "response_header": - # tmp_response_header.append(tmp_item) - # else: - # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..") - elif tmp_item["object_type"] == "keywords": - if tmp_item["item"][0]["item_type"] == "request_header": - tmp_request_header.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "response_header": - tmp_response_header.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "request_body": - tmp_request_body.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "response_body": - tmp_response_body.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "ftp_content": - tmp_ftp_content.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "mail_subject": - tmp_mail_subject.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "mail_attachment_name": - tmp_mail_attachment_name.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "mail_attachment_content": - tmp_mail_attachment_content.append(tmp_item) - elif tmp_item["item"][0]["item_type"] == "mail_content": - tmp_mail_content.append(tmp_item) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..") - elif tmp_item["object_type"] == "account": - if tmp_item["items"][0]["item_type"] == "ftp_account": - tmp_ftp_account.append(tmp_item) - elif tmp_item["items"][0]["item_type"] == "mail_account": - tmp_mail_account.append(tmp_item) - elif tmp_item["items"][0]["item_type"] == "mail_from": - tmp_mail_from.append(tmp_item) - elif tmp_item["items"][0]["item_type"] == "to": - tmp_mail_to.append(tmp_item) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..") - elif tmp_item["object_type"] == "boolean": - tmp_boolean.append(tmp_item) - if len(tmp_url) > 0: tmp_protocol_field_data_split_by_type.append(tmp_url) - if len(tmp_request_header) > 0: tmp_protocol_field_data_split_by_type.append(tmp_request_header) - if len(tmp_response_header) > 0: tmp_protocol_field_data_split_by_type.append(tmp_response_header) - if len(tmp_request_body) > 0: tmp_protocol_field_data_split_by_type.append(tmp_request_body) - if len(tmp_response_body) > 0: tmp_protocol_field_data_split_by_type.append(tmp_response_body) - if len(tmp_ftp_account) > 0: tmp_protocol_field_data_split_by_type.append(tmp_ftp_account) - if len(tmp_ftp_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_ftp_content) - if len(tmp_boolean) > 0: tmp_protocol_field_data_split_by_type.append(tmp_boolean) - if len(tmp_cn) > 0: tmp_protocol_field_data_split_by_type.append(tmp_cn) - if len(tmp_san) > 0: tmp_protocol_field_data_split_by_type.append(tmp_san) - if len(tmp_qname) > 0: tmp_protocol_field_data_split_by_type.append(tmp_qname) - if len(tmp_mail_to) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_to) - if len(tmp_mail_from) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_from) - if len(tmp_mail_account) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_account) - if len(tmp_mail_subject) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_subject) - if len(tmp_mail_attachment_name) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_attachment_name) - if len(tmp_mail_attachment_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_attachment_content) - if len(tmp_mail_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_content) - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], tmp_protocol_field_data_split_by_type) - for protocol_field_data in tmp_protocol_field_data_split_by_type: - #print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], protocol_field_data) - for x in range(len(protocol_field_data)): - temp = x + 1 - if protocol_field_data[x]["object_type"] == "url": - self.add_protocol_field(protocol_field_data, "Url", creation_element_position, temp) - elif protocol_field_data[x]["object_type"] == "fqdn": - for item_dict in protocol_field_data[x]["items"]: - if item_dict["item_type"] == "cn": - self.add_protocol_field(protocol_field_data, "Cn", creation_element_position, temp) - elif item_dict["item_type"] == "san": - self.add_protocol_field(protocol_field_data, "San", creation_element_position, temp) - elif item_dict["item_type"] == "qname": - self.add_protocol_field(protocol_field_data, "Qname", creation_element_position, temp) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}") - # elif protocol_field_data[x]["object_type"] == "http_signature": - # item_dict = protocol_field_data[x]["item"][0] - # if item_dict["item_type"] == "request_header": - # self.add_protocol_field(protocol_field_data, "RequestHeader", creation_element_position, temp) - # elif item_dict["item_type"] == "response_header": - # self.add_protocol_field(protocol_field_data, "ResponseHeader", creation_element_position, temp) - # else: - # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}") - elif protocol_field_data[x]["object_type"] == "keywords": - item_dict = protocol_field_data[x]["item"][0] - if item_dict["item_type"] == "request_header": - self.add_protocol_field(protocol_field_data, "RequestHeader", creation_element_position, temp) - elif item_dict["item_type"] == "response_header": - self.add_protocol_field(protocol_field_data, "ResponseHeader", creation_element_position, temp) - elif item_dict["item_type"] == "request_body": - self.add_protocol_field(protocol_field_data, "RequestBody", creation_element_position, temp) - elif item_dict["item_type"] == "response_body": - self.add_protocol_field(protocol_field_data, "ResponseBody", creation_element_position, temp) - elif item_dict["item_type"] == "ftp_content": - self.add_protocol_field(protocol_field_data, "FtpContent", creation_element_position, temp) - elif item_dict["item_type"] == "mail_subject": - self.add_protocol_field(protocol_field_data, "MailSubject", creation_element_position, temp) - elif item_dict["item_type"] == "mail_attachment_name": - self.add_protocol_field(protocol_field_data, "MailAttachmentName", creation_element_position, temp) - elif item_dict["item_type"] == "mail_attachment_content": - self.add_protocol_field(protocol_field_data, "MailAttachmentContent", creation_element_position, temp) - elif item_dict["item_type"] == "mail_content": - self.add_protocol_field(protocol_field_data, "MailContent", creation_element_position, temp) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}") - elif protocol_field_data[x]["object_type"] == "account": - for item_dict in protocol_field_data[x]["items"]: - if item_dict["item_type"] == "ftp_account": - self.add_protocol_field(protocol_field_data, "FtpAccount", creation_element_position, temp) - elif item_dict["item_type"] == "mail_account": - self.add_protocol_field(protocol_field_data, "MailAccount",creation_element_position, temp) - elif item_dict["item_type"] == "mail_from": - self.add_protocol_field(protocol_field_data, "MailFrom", creation_element_position,temp) - elif item_dict["item_type"] == "to": - self.add_protocol_field(protocol_field_data, "MailTo", creation_element_position,temp) - else: - print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}") - elif protocol_field_data[x]["object_type"] == 'boolean': - self.add_bool_type_protocol_field(protocol_field_data, "Boolean", creation_element_position, temp) - break - + # 添加protocol field + protocol_filed_attribute_flag = False + tmp_protocol_field_data_split_by_type = [] # 将 protocol_filed 分类临时存储 + tmp_url, tmp_request_header, tmp_response_header, tmp_request_body, tmp_response_body, tmp_ftp_account, tmp_ftp_content, tmp_cn, tmp_san, tmp_qname = [], [], [], [], [], [], [], [], [], [] + tmp_mail_account, tmp_mail_from, tmp_mail_to, tmp_mail_subject, tmp_mail_attachment_name, tmp_mail_content, tmp_mail_attachment_content, tmp_boolean= [], [], [], [], [], [], [], [] + tmp_cn_cat, tmp_san_cat, tmp_ech, tmp_esni, tmp_no_sni, tmp_ftp_uri, tmp_sip_ori_description, tmp_sip_res_description = [], [], [], [], [], [], [], [] + count = 1 + http_url_flag, http_url_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_URL") + if http_url_flag: + protocol_filed_attribute_flag = True + tmp_url.append(http_url_configuration) + count += 1 + self.add_protocol_field(tmp_url, "Url", creation_element_position, count) + http_req_header_flag, http_req_header_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_REQ_HDR") + if http_req_header_flag: + protocol_filed_attribute_flag = True + tmp_request_header.append(http_req_header_configuration) + count += 1 + self.add_protocol_field(tmp_request_header, "RequestHeader", creation_element_position, count) + http_res_header_flag, http_res_header_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_RES_HDR") + if http_res_header_flag: + protocol_filed_attribute_flag = True + tmp_response_header.append(http_res_header_configuration) + count += 1 + self.add_protocol_field(tmp_response_header, "ResponseHeader", creation_element_position, count) + http_req_body_flag, http_req_body_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_REQ_BODY") + if http_req_body_flag: + protocol_filed_attribute_flag = True + tmp_request_body.append(http_req_body_configuration) + count += 1 + self.add_protocol_field(tmp_request_body, "RequestBody", creation_element_position, count) + http_res_body_flag, http_res_body_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_RES_BODY") + if http_req_header_flag: + protocol_filed_attribute_flag = True + tmp_response_body.append(http_res_body_configuration) + count += 1 + self.add_protocol_field(tmp_response_body, "ResponseBody", creation_element_position, count) + ssl_cn_flag, ssl_cn_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_CN") + if ssl_cn_flag: + protocol_filed_attribute_flag = True + tmp_cn.append(ssl_cn_configuration) + count += 1 + self.add_protocol_field(tmp_cn, "Cn", creation_element_position, count) + ssl_cn_cat_flag, ssl_cn_cat_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_CN_CAT") + if ssl_cn_cat_flag: + protocol_filed_attribute_flag = True + tmp_cn_cat.append(ssl_cn_cat_configuration) + count += 1 + self.add_protocol_field(tmp_cn_cat, "CnCat", creation_element_position, count) + ssl_san_flag, ssl_san_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_SAN") + if ssl_san_flag: + protocol_filed_attribute_flag = True + tmp_san.append(ssl_san_configuration) + count += 1 + self.add_protocol_field(tmp_san, "San", creation_element_position, count) + ssl_san_cat_flag, ssl_san_cat_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_SAN_CAT") + if ssl_san_cat_flag: + protocol_filed_attribute_flag = True + tmp_san_cat.append(ssl_san_cat_configuration) + count += 1 + self.add_protocol_field(tmp_san_cat, "SanCat", creation_element_position, count) + ssl_ech_flag, ssl_ech_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_ECH") + if ssl_ech_flag: + protocol_filed_attribute_flag = True + tmp_ech.append(ssl_ech_configuration) + count += 1 + self.add_bool_type_protocol_field(tmp_ech, "Ech", creation_element_position, count) + ssl_esni_flag, ssl_esni_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_ESNI") + if ssl_esni_flag: + protocol_filed_attribute_flag = True + tmp_esni.append(ssl_esni_configuration) + count += 1 + self.add_bool_type_protocol_field(tmp_esni, "Esni", creation_element_position, count) + ssl_no_sni_flag, ssl_no_sni_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_NO_SNI") + if ssl_no_sni_flag: + protocol_filed_attribute_flag = True + tmp_no_sni.append(ssl_no_sni_configuration) + count += 1 + self.add_bool_type_protocol_field(tmp_no_sni, "NoSni", creation_element_position, count) + dns_qname_flag, dns_qname_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_DNS_QNAME") + if dns_qname_flag: + protocol_filed_attribute_flag = True + tmp_qname.append(dns_qname_configuration) + count += 1 + self.add_protocol_field(tmp_qname, "Qname", creation_element_position, count) + mail_subject_flag, mail_subject_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_SUBJECT") + if mail_subject_flag: + protocol_filed_attribute_flag = True + tmp_mail_subject.append(mail_subject_configuration) + count += 1 + self.add_protocol_field(tmp_mail_subject, "MailSubject", creation_element_position, count) + mail_content_flag, mail_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_CONTENT") + if mail_content_flag: + protocol_filed_attribute_flag = True + tmp_mail_content.append(mail_content_configuration) + count += 1 + self.add_protocol_field(tmp_mail_content, "MailContent", creation_element_position, count) + mail_att_name_flag, mail_att_name_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ATT_NAME") + if mail_att_name_flag: + protocol_filed_attribute_flag = True + tmp_mail_attachment_name.append(mail_att_name_configuration) + count += 1 + self.add_protocol_field(tmp_mail_attachment_name, "MailAttachmentName", creation_element_position, count) + mail_att_content_flag, mail_att_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ATT_CONTENT") + if mail_att_content_flag: + protocol_filed_attribute_flag = True + tmp_mail_attachment_content.append(mail_att_content_configuration) + count += 1 + self.add_protocol_field(tmp_mail_attachment_content, "MailAttachmentContent", creation_element_position, count) + mail_from_flag, mail_from_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_FROM") + if mail_from_flag: + protocol_filed_attribute_flag = True + tmp_mail_from.append(mail_from_configuration) + count += 1 + self.add_protocol_field(tmp_mail_from, "MailFrom", creation_element_position, count) + mail_to_flag, mail_to_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_TO") + if mail_to_flag: + protocol_filed_attribute_flag = True + tmp_mail_to.append(mail_to_configuration) + count += 1 + self.add_protocol_field(tmp_mail_to, "MailTo", creation_element_position, count) + mail_account_flag, mail_account_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ACCOUNT") + if mail_account_flag: + protocol_filed_attribute_flag = True + tmp_mail_account.append(mail_account_configuration) + count += 1 + self.add_protocol_field(tmp_mail_account, "MailAccount", creation_element_position, count) + ftp_uri_flag, ftp_uri_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_URI") + if ftp_uri_flag: + protocol_filed_attribute_flag = True + tmp_ftp_uri.append(ftp_uri_configuration) + count += 1 + self.add_protocol_field(tmp_ftp_uri, "FtpUri", creation_element_position, count) + ftp_content_flag, ftp_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_CONTENT") + if ftp_content_flag: + protocol_filed_attribute_flag = True + tmp_ftp_content.append(ftp_content_configuration) + count += 1 + self.add_protocol_field(tmp_ftp_content, "FtpContent", creation_element_position, count) + ftp_account_flag, ftp_account_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_ACCOUNT") + if ftp_account_flag: + protocol_filed_attribute_flag = True + tmp_ftp_account.append(ftp_account_configuration) + count += 1 + self.add_protocol_field(tmp_ftp_account, "FtpAccount", creation_element_position, count) + sip_ori_description_flag, sip_ori_description_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SIP_ORIGINATOR_DESCRIPTION") + if sip_ori_description_flag: + protocol_filed_attribute_flag = True + tmp_sip_ori_description.append(sip_ori_description_configuration) + count += 1 + self.add_protocol_field(tmp_sip_ori_description, "SipOriDescription", creation_element_position, count) + sip_res_description_flag, sip_res_description_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SIP_RESPONDER_DESCRIPTION") + if sip_res_description_flag: + protocol_filed_attribute_flag = True + tmp_sip_res_description.append(sip_res_description_configuration) + count += 1 + self.add_protocol_field(tmp_sip_res_description, "SipResDescription", creation_element_position, count) + # override_flag赋值 if application_object_flag and protocol_filed_attribute_flag == False: override_flag = True @@ -801,18 +816,18 @@ class CreateRules: self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click() # self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_select{}_posXpath".format(ip_condition_type)]).click() self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(ip_condition_type)]).click() - for j in range(len(ip_data)): - if ip_data[j]["negate"] == True: - time.sleep(1) # 暂停1秒 - negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(ip_condition_type)]) - # 使用JavaScript确保元素完全进入视图 - self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});",negate_element) - # 使用JavaScript执行悬停 - self.driver.execute_script("var evt = new MouseEvent('mouseover', {'view': window, 'bubbles': true, 'cancelable': true}); arguments[0].dispatchEvent(evt);",negate_element) - # 确保元素可点击后进行点击 - self.driver.execute_script("arguments[0].click();", negate_element) - # ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 - # negate_element.click() + # for j in range(len(ip_data)): + if negate_option == True: + time.sleep(1) # 暂停1秒 + negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(ip_condition_type)]) + # 使用JavaScript确保元素完全进入视图 + self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});", negate_element) + # 使用JavaScript执行悬停 + self.driver.execute_script("var evt = new MouseEvent('mouseover', {'view': window, 'bubbles': true, 'cancelable': true}); arguments[0].dispatchEvent(evt);", negate_element) + # 确保元素可点击后进行点击 + self.driver.execute_script("arguments[0].click();", negate_element) + # ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 + # negate_element.click() except Exception as e: raise @@ -833,38 +848,41 @@ class CreateRules: temp_element_position = creation_element_position["policyRulePage_select{}_posXpath".format(port_condition_type)].format(replaceValue=port_data[z]["name"]) self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click() self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(port_condition_type)]).click() - for j in range(len(port_data)): - if port_data[j]["negate"] == True: - time.sleep(1) # 暂停1秒 - negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(port_condition_type)]) - ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 - negate_element.click() + # for j in range(len(port_data)): + if negate_option == True: + time.sleep(1) # 暂停1秒 + negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(port_condition_type)]) + ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 + negate_element.click() except Exception as e: raise def add_device(self, creation_element_position, name, negate_option): - # 点add condition的+ - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click() - # 选中imsi - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addConditionImsi_posXpath"]).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() - # 点imsi的+,因为dst ip已经是第一层且t初始值是0,所以要+2 - temp_element_position = creation_element_position["policyRulePage_addImsi_posXpath"] - self.driver.find_element(By.XPATH, temp_element_position).click() - # 通过imsi搜索并选中 - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=1).send_keys(name) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectImsi_posXpath"], find_after_wait_time=0.5).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeImsi_posXpath"]).click() - if negate_option == True: - time.sleep(1) # 暂停1秒 - negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negateImsi_posXpath"]) - ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 - negate_element.click() + try: + # 点add condition的+ + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click() + # 选中imsi + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addConditionImsi_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() + # 点imsi的+,因为dst ip已经是第一层且t初始值是0,所以要+2 + temp_element_position = creation_element_position["policyRulePage_addImsi_posXpath"] + self.driver.find_element(By.XPATH, temp_element_position).click() + # 通过imsi搜索并选中 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=1).send_keys(name) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectImsi_posXpath"], find_after_wait_time=0.5).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeImsi_posXpath"]).click() + if negate_option == True: + time.sleep(1) # 暂停1秒 + negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negateImsi_posXpath"]) + ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见 + negate_element.click() + except Exception as e: + raise add_subid = add_port - def add_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num, negate_option): + def add_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num): try: # 强制点击add condition的+ btn = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]) @@ -883,24 +901,24 @@ class CreateRules: except Exception as e: raise - def add_bool_type_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num, negate_option): + def add_bool_type_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num): try: - for x in range(len(protocol_field_data)): - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click() - var1 = protocol_field_data[x]["boolean_type"] - xpath_temp = creation_element_position["policyRulePage_addCondition{}_posXpath".format(filed_type)] - xpath_last = xpath_temp.format(replaceValue=var1) - self.driver.find_element(By.XPATH, xpath_last).click() - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() - # 点protocol filed的+ - temp_element_position = creation_element_position["policyRulePage_add{}_posXpath".format(filed_type)].format(replaceValue=num) - self.driver.find_element(By.XPATH, temp_element_position).click() - # 通过protocol filed name搜索并选中 - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)],find_after_wait_time=1).send_keys(protocol_field_data[x]["name"]) - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)],find_after_wait_time=0.5).send_keys(Keys.ENTER) - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_select{}_posXpath".format(filed_type)],find_after_wait_time=0.5).click() - self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(filed_type)]).click() - num = num +1 + # for x in range(len(protocol_field_data)): + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click() + value = filed_type.lower() + xpath_temp = creation_element_position["policyRulePage_addCondition{}_posXpath".format("Boolean")] + xpath_last = xpath_temp.format(replaceValue=value) + self.driver.find_element(By.XPATH, xpath_last).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() + # 点protocol filed的+ + temp_element_position = creation_element_position["policyRulePage_add{}_posXpath".format("Boolean")].format(replaceValue=num) + self.driver.find_element(By.XPATH, temp_element_position).click() + # 通过protocol filed name搜索并选中 + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format("Boolean")], find_after_wait_time=1).send_keys(protocol_field_data["name"]) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format("Boolean")], find_after_wait_time=0.5).send_keys(Keys.ENTER) + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_select{}_posXpath".format("Boolean")], find_after_wait_time=0.5).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format("Boolean")]).click() + num = num +1 except Exception as e: raise @@ -910,18 +928,18 @@ class CreateRules: self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click() # 选中port self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition{}_posXpath".format(filed_type)]).click() - self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() + self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click() # 点source port的+ self.driver.find_element(By.XPATH, creation_element_position[ "policyRulePage_add{}_posXpath".format(filed_type)]).click() # 通过port name搜索并选中 is_first_iteration = True - for z in ip_protocol_data: - #如果不是第一次循环,需要点加号调侧滑框 + for z in ip_protocol_data["items"]: + # 如果不是第一次循环,需要点加号调侧滑框 if not is_first_iteration: self.driver.find_element(By.XPATH, policyRulePage_little_button_addIpProtocol_posXpath).click() self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)], find_after_wait_time=1).send_keys(z) self.driver.find_element(By.XPATH, creation_element_position[ "policyRulePage_search{}_posXpath".format(filed_type)], find_after_wait_time=0.5).send_keys(Keys.ENTER) - temp_element_position = creation_element_position[ "policyRulePage_select{}_posXpath".format(filed_type)].format( replaceValue=z) + temp_element_position = creation_element_position[ "policyRulePage_select{}_posXpath".format(filed_type)].format(replaceValue=z) self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click() self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(filed_type)]).click() is_first_iteration = False @@ -948,8 +966,8 @@ class CreateRules: or_conditions = and_conditions[i]["or_conditions"] for j in range(len(or_conditions)): if or_conditions[j]["attribute_name"] == attribute_name: - return [True, or_conditions[j], or_conditions["negate_option"]] - return [False, "", ""] + return True, or_conditions, and_conditions[i]["negate_option"] + return False, "", "" def is_group_exsit(self, policy_configuration): if "and_conditions" in policy_configuration: @@ -958,42 +976,44 @@ class CreateRules: or_conditions = and_conditions[i]["or_conditions"] for j in range(len(or_conditions)): if "member_type" in or_conditions[j].keys and or_conditions[j]["member_type"] != "": - return [True, or_conditions[j], or_conditions["negate_option"]] - return [False, "", ""] + return True, or_conditions, and_conditions[i]["negate_option"] + return False, "", "" - def is_protocol_filed_exsit(self, policy_configuration): - protocol_filed_list = [ - "ATTR_HTTP_URL", - "ATTR_HTTP_REQ_HDR", - "ATTR_HTTP_RES_HDR", - "ATTR_HTTP_REQ_BODY", - "ATTR_HTTP_RES_BODY", - "ATTR_SSL_CN", - "ATTR_SSL_CN_CAT", - "ATTR_SSL_SAN", - "ATTR_SSL_SAN_CAT", - "ATTR_SSL_ECH", - "ATTR_SSL_ESNI", - "ATTR_SSL_NO_SNI", - "ATTR_DNS_QNAME", - "ATTR_MAIL_SUBJECT", - "ATTR_MAIL_CONTENT", - "ATTR_MAIL_ATT_NAME", - "ATTR_MAIL_ATT_CONTENT", - "ATTR_MAIL_FROM", - "ATTR_MAIL_TO", - "ATTR_MAIL_ACCOUNT", - "ATTR_FTP_URI", - "ATTR_FTP_CONTENT", - "ATTR_FTP_ACCOUNT", - "ATTR_SIP_ORIGINATOR_DESCRIPTION", - "ATTR_SIP_RESPONDER_DESCRIPTION" - ] + def is_protocol_filed_exsit(self, policy_configuration, attribute_value): + """ + # protocol_filed_list = [ + # "ATTR_HTTP_URL", + # "ATTR_HTTP_REQ_HDR", + # "ATTR_HTTP_RES_HDR", + # "ATTR_HTTP_REQ_BODY", + # "ATTR_HTTP_RES_BODY", + # "ATTR_SSL_CN", + # "ATTR_SSL_CN_CAT", + # "ATTR_SSL_SAN", + # "ATTR_SSL_SAN_CAT", + # "ATTR_SSL_ECH", + # "ATTR_SSL_ESNI", + # "ATTR_SSL_NO_SNI", + # "ATTR_DNS_QNAME", + # "ATTR_MAIL_SUBJECT", + # "ATTR_MAIL_CONTENT", + # "ATTR_MAIL_ATT_NAME", + # "ATTR_MAIL_ATT_CONTENT", + # "ATTR_MAIL_FROM", + # "ATTR_MAIL_TO", + # "ATTR_MAIL_ACCOUNT", + # "ATTR_FTP_URI", + # "ATTR_FTP_CONTENT", + # "ATTR_FTP_ACCOUNT", + # "ATTR_SIP_ORIGINATOR_DESCRIPTION", + # "ATTR_SIP_RESPONDER_DESCRIPTION" + # ] + """ if "and_conditions" in policy_configuration: and_conditions = policy_configuration["and_conditions"] for i in range(len(and_conditions)): or_conditions = and_conditions[i]["or_conditions"] for j in range(len(or_conditions)): - if or_conditions[j]["attribute_name"] in protocol_filed_list: - return [True] - return [False] \ No newline at end of file + if or_conditions[j]["attribute_name"] == attribute_value: + return True, or_conditions + return False, "" \ No newline at end of file -- cgit v1.2.3