summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaokun <[email protected]>2024-11-28 16:50:48 +0800
committerzhaokun <[email protected]>2024-11-28 16:50:48 +0800
commit06fa53634037ae81bfb5b24e97a48697061cb083 (patch)
treeafe9bf8b039d2f897de62bbb60bcba2d5d29947d
parent91a7a2de1cd556ab32fe14ac742aab0e1cee531a (diff)
Modify create_rules.py
-rw-r--r--support/ui_utils/policies/create_rules.py496
1 files changed, 258 insertions, 238 deletions
diff --git a/support/ui_utils/policies/create_rules.py b/support/ui_utils/policies/create_rules.py
index 629e779c6..e6c0de80f 100644
--- a/support/ui_utils/policies/create_rules.py
+++ b/support/ui_utils/policies/create_rules.py
@@ -21,8 +21,7 @@ class CreateRules:
def create_rules(self, policy_configuration):
try:
- # 目前所有object类型都是一个值??
-
+ # 目前所有object类型在json data中都只有一个??
# 根据rule type获取增删改查全部元素定位库,若有需要补充的,在下面或在map_element_position_library中追加
element_position_library = get_element_position(policy_configuration["type"])
page_jump_element_position = element_position_library["page_jump"]
@@ -71,6 +70,7 @@ class CreateRules:
self.driver.find_element(By.XPATH,monitorRulePage_logOptions_all_posXpath).click()
# 添加ip
+ # 如果出现多个相同类型的attribute name,需要增加一个函数来判断该attribute name出现的次数
src_ip_object_flag, src_ip_object_configuration, src_ip_object_negate_option = self.is_attribute_name_exsit(policy_configuration, "ATTR_SOURCE_IP")
src_ip_group_flag, src_ip_object_group_configuration, src_ip_object_group_negate_option = self.is_group_exsit(policy_configuration, "member_type")
if src_ip_object_flag and src_ip_group_flag == False:
@@ -232,149 +232,164 @@ class CreateRules:
if ip_protocol_object_flag:
self.add_ip_protocol(ip_protocol_object_configuration["items"][0], "IpProtocol", creation_element_position, ip_protocol_object_negate_option)
- # 添加protocol field ??
- protocol_filed_attribute_flag = self.is_protocol_filed_exsit(policy_configuration)
- if protocol_filed_attribute_flag:
- protocol_field_datas = policy_configuration["condition"]["protocol_filed"]
- tmp_protocol_field_data_split_by_type = [] # 将 protocol_filed 分类临时存储
- tmp_url, tmp_request_header, tmp_response_header, tmp_request_body, tmp_response_body, tmp_ftp_account, tmp_ftp_content, tmp_cn, tmp_san, tmp_qname = [], [], [], [], [], [], [], [], [], []
- tmp_boolean= []
- tmp_mail_account, tmp_mail_from, tmp_mail_to,tmp_mail_subject,tmp_mail_attachment_name,tmp_mail_content,tmp_mail_attachment_content= [],[],[],[],[],[],[]
- for _ in range(len(protocol_field_datas)): # 将 rul、request_header等数据分类处理
- tmp_item = protocol_field_datas[_]
- if tmp_item["object_type"] == "url":
- tmp_url.append(tmp_item)
- elif tmp_item["object_type"] == "fqdn":
- if tmp_item["items"][0]["item_type"] == "cn":
- tmp_cn.append(tmp_item)
- elif tmp_item["items"][0]["item_type"] == "san":
- tmp_san.append(tmp_item)
- elif tmp_item["items"][0]["item_type"] == "qname":
- tmp_qname.append(tmp_item)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..")
- # elif tmp_item["object_type"] == "http_signature":
- # if tmp_item["item"][0]["item_type"] == "request_header":
- # tmp_request_header.append(tmp_item)
- # elif tmp_item["item"][0]["item_type"] == "response_header":
- # tmp_response_header.append(tmp_item)
- # else:
- # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..")
- elif tmp_item["object_type"] == "keywords":
- if tmp_item["item"][0]["item_type"] == "request_header":
- tmp_request_header.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "response_header":
- tmp_response_header.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "request_body":
- tmp_request_body.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "response_body":
- tmp_response_body.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "ftp_content":
- tmp_ftp_content.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "mail_subject":
- tmp_mail_subject.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "mail_attachment_name":
- tmp_mail_attachment_name.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "mail_attachment_content":
- tmp_mail_attachment_content.append(tmp_item)
- elif tmp_item["item"][0]["item_type"] == "mail_content":
- tmp_mail_content.append(tmp_item)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..")
- elif tmp_item["object_type"] == "account":
- if tmp_item["items"][0]["item_type"] == "ftp_account":
- tmp_ftp_account.append(tmp_item)
- elif tmp_item["items"][0]["item_type"] == "mail_account":
- tmp_mail_account.append(tmp_item)
- elif tmp_item["items"][0]["item_type"] == "mail_from":
- tmp_mail_from.append(tmp_item)
- elif tmp_item["items"][0]["item_type"] == "to":
- tmp_mail_to.append(tmp_item)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "check..")
- elif tmp_item["object_type"] == "boolean":
- tmp_boolean.append(tmp_item)
- if len(tmp_url) > 0: tmp_protocol_field_data_split_by_type.append(tmp_url)
- if len(tmp_request_header) > 0: tmp_protocol_field_data_split_by_type.append(tmp_request_header)
- if len(tmp_response_header) > 0: tmp_protocol_field_data_split_by_type.append(tmp_response_header)
- if len(tmp_request_body) > 0: tmp_protocol_field_data_split_by_type.append(tmp_request_body)
- if len(tmp_response_body) > 0: tmp_protocol_field_data_split_by_type.append(tmp_response_body)
- if len(tmp_ftp_account) > 0: tmp_protocol_field_data_split_by_type.append(tmp_ftp_account)
- if len(tmp_ftp_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_ftp_content)
- if len(tmp_boolean) > 0: tmp_protocol_field_data_split_by_type.append(tmp_boolean)
- if len(tmp_cn) > 0: tmp_protocol_field_data_split_by_type.append(tmp_cn)
- if len(tmp_san) > 0: tmp_protocol_field_data_split_by_type.append(tmp_san)
- if len(tmp_qname) > 0: tmp_protocol_field_data_split_by_type.append(tmp_qname)
- if len(tmp_mail_to) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_to)
- if len(tmp_mail_from) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_from)
- if len(tmp_mail_account) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_account)
- if len(tmp_mail_subject) > 0:tmp_protocol_field_data_split_by_type.append(tmp_mail_subject)
- if len(tmp_mail_attachment_name) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_attachment_name)
- if len(tmp_mail_attachment_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_attachment_content)
- if len(tmp_mail_content) > 0: tmp_protocol_field_data_split_by_type.append(tmp_mail_content)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], tmp_protocol_field_data_split_by_type)
- for protocol_field_data in tmp_protocol_field_data_split_by_type:
- #print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], protocol_field_data)
- for x in range(len(protocol_field_data)):
- temp = x + 1
- if protocol_field_data[x]["object_type"] == "url":
- self.add_protocol_field(protocol_field_data, "Url", creation_element_position, temp)
- elif protocol_field_data[x]["object_type"] == "fqdn":
- for item_dict in protocol_field_data[x]["items"]:
- if item_dict["item_type"] == "cn":
- self.add_protocol_field(protocol_field_data, "Cn", creation_element_position, temp)
- elif item_dict["item_type"] == "san":
- self.add_protocol_field(protocol_field_data, "San", creation_element_position, temp)
- elif item_dict["item_type"] == "qname":
- self.add_protocol_field(protocol_field_data, "Qname", creation_element_position, temp)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}")
- # elif protocol_field_data[x]["object_type"] == "http_signature":
- # item_dict = protocol_field_data[x]["item"][0]
- # if item_dict["item_type"] == "request_header":
- # self.add_protocol_field(protocol_field_data, "RequestHeader", creation_element_position, temp)
- # elif item_dict["item_type"] == "response_header":
- # self.add_protocol_field(protocol_field_data, "ResponseHeader", creation_element_position, temp)
- # else:
- # print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}")
- elif protocol_field_data[x]["object_type"] == "keywords":
- item_dict = protocol_field_data[x]["item"][0]
- if item_dict["item_type"] == "request_header":
- self.add_protocol_field(protocol_field_data, "RequestHeader", creation_element_position, temp)
- elif item_dict["item_type"] == "response_header":
- self.add_protocol_field(protocol_field_data, "ResponseHeader", creation_element_position, temp)
- elif item_dict["item_type"] == "request_body":
- self.add_protocol_field(protocol_field_data, "RequestBody", creation_element_position, temp)
- elif item_dict["item_type"] == "response_body":
- self.add_protocol_field(protocol_field_data, "ResponseBody", creation_element_position, temp)
- elif item_dict["item_type"] == "ftp_content":
- self.add_protocol_field(protocol_field_data, "FtpContent", creation_element_position, temp)
- elif item_dict["item_type"] == "mail_subject":
- self.add_protocol_field(protocol_field_data, "MailSubject", creation_element_position, temp)
- elif item_dict["item_type"] == "mail_attachment_name":
- self.add_protocol_field(protocol_field_data, "MailAttachmentName", creation_element_position, temp)
- elif item_dict["item_type"] == "mail_attachment_content":
- self.add_protocol_field(protocol_field_data, "MailAttachmentContent", creation_element_position, temp)
- elif item_dict["item_type"] == "mail_content":
- self.add_protocol_field(protocol_field_data, "MailContent", creation_element_position, temp)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}")
- elif protocol_field_data[x]["object_type"] == "account":
- for item_dict in protocol_field_data[x]["items"]:
- if item_dict["item_type"] == "ftp_account":
- self.add_protocol_field(protocol_field_data, "FtpAccount", creation_element_position, temp)
- elif item_dict["item_type"] == "mail_account":
- self.add_protocol_field(protocol_field_data, "MailAccount",creation_element_position, temp)
- elif item_dict["item_type"] == "mail_from":
- self.add_protocol_field(protocol_field_data, "MailFrom", creation_element_position,temp)
- elif item_dict["item_type"] == "to":
- self.add_protocol_field(protocol_field_data, "MailTo", creation_element_position,temp)
- else:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"todo {item_dict['item_type']}")
- elif protocol_field_data[x]["object_type"] == 'boolean':
- self.add_bool_type_protocol_field(protocol_field_data, "Boolean", creation_element_position, temp)
- break
-
+ # 添加protocol field
+ protocol_filed_attribute_flag = False
+ tmp_protocol_field_data_split_by_type = [] # 将 protocol_filed 分类临时存储
+ tmp_url, tmp_request_header, tmp_response_header, tmp_request_body, tmp_response_body, tmp_ftp_account, tmp_ftp_content, tmp_cn, tmp_san, tmp_qname = [], [], [], [], [], [], [], [], [], []
+ tmp_mail_account, tmp_mail_from, tmp_mail_to, tmp_mail_subject, tmp_mail_attachment_name, tmp_mail_content, tmp_mail_attachment_content, tmp_boolean= [], [], [], [], [], [], [], []
+ tmp_cn_cat, tmp_san_cat, tmp_ech, tmp_esni, tmp_no_sni, tmp_ftp_uri, tmp_sip_ori_description, tmp_sip_res_description = [], [], [], [], [], [], [], []
+ count = 1
+ http_url_flag, http_url_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_URL")
+ if http_url_flag:
+ protocol_filed_attribute_flag = True
+ tmp_url.append(http_url_configuration)
+ count += 1
+ self.add_protocol_field(tmp_url, "Url", creation_element_position, count)
+ http_req_header_flag, http_req_header_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_REQ_HDR")
+ if http_req_header_flag:
+ protocol_filed_attribute_flag = True
+ tmp_request_header.append(http_req_header_configuration)
+ count += 1
+ self.add_protocol_field(tmp_request_header, "RequestHeader", creation_element_position, count)
+ http_res_header_flag, http_res_header_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_RES_HDR")
+ if http_res_header_flag:
+ protocol_filed_attribute_flag = True
+ tmp_response_header.append(http_res_header_configuration)
+ count += 1
+ self.add_protocol_field(tmp_response_header, "ResponseHeader", creation_element_position, count)
+ http_req_body_flag, http_req_body_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_REQ_BODY")
+ if http_req_body_flag:
+ protocol_filed_attribute_flag = True
+ tmp_request_body.append(http_req_body_configuration)
+ count += 1
+ self.add_protocol_field(tmp_request_body, "RequestBody", creation_element_position, count)
+ http_res_body_flag, http_res_body_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_HTTP_RES_BODY")
+ if http_req_header_flag:
+ protocol_filed_attribute_flag = True
+ tmp_response_body.append(http_res_body_configuration)
+ count += 1
+ self.add_protocol_field(tmp_response_body, "ResponseBody", creation_element_position, count)
+ ssl_cn_flag, ssl_cn_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_CN")
+ if ssl_cn_flag:
+ protocol_filed_attribute_flag = True
+ tmp_cn.append(ssl_cn_configuration)
+ count += 1
+ self.add_protocol_field(tmp_cn, "Cn", creation_element_position, count)
+ ssl_cn_cat_flag, ssl_cn_cat_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_CN_CAT")
+ if ssl_cn_cat_flag:
+ protocol_filed_attribute_flag = True
+ tmp_cn_cat.append(ssl_cn_cat_configuration)
+ count += 1
+ self.add_protocol_field(tmp_cn_cat, "CnCat", creation_element_position, count)
+ ssl_san_flag, ssl_san_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_SAN")
+ if ssl_san_flag:
+ protocol_filed_attribute_flag = True
+ tmp_san.append(ssl_san_configuration)
+ count += 1
+ self.add_protocol_field(tmp_san, "San", creation_element_position, count)
+ ssl_san_cat_flag, ssl_san_cat_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_SAN_CAT")
+ if ssl_san_cat_flag:
+ protocol_filed_attribute_flag = True
+ tmp_san_cat.append(ssl_san_cat_configuration)
+ count += 1
+ self.add_protocol_field(tmp_san_cat, "SanCat", creation_element_position, count)
+ ssl_ech_flag, ssl_ech_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_ECH")
+ if ssl_ech_flag:
+ protocol_filed_attribute_flag = True
+ tmp_ech.append(ssl_ech_configuration)
+ count += 1
+ self.add_bool_type_protocol_field(tmp_ech, "Ech", creation_element_position, count)
+ ssl_esni_flag, ssl_esni_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_ESNI")
+ if ssl_esni_flag:
+ protocol_filed_attribute_flag = True
+ tmp_esni.append(ssl_esni_configuration)
+ count += 1
+ self.add_bool_type_protocol_field(tmp_esni, "Esni", creation_element_position, count)
+ ssl_no_sni_flag, ssl_no_sni_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SSL_NO_SNI")
+ if ssl_no_sni_flag:
+ protocol_filed_attribute_flag = True
+ tmp_no_sni.append(ssl_no_sni_configuration)
+ count += 1
+ self.add_bool_type_protocol_field(tmp_no_sni, "NoSni", creation_element_position, count)
+ dns_qname_flag, dns_qname_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_DNS_QNAME")
+ if dns_qname_flag:
+ protocol_filed_attribute_flag = True
+ tmp_qname.append(dns_qname_configuration)
+ count += 1
+ self.add_protocol_field(tmp_qname, "Qname", creation_element_position, count)
+ mail_subject_flag, mail_subject_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_SUBJECT")
+ if mail_subject_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_subject.append(mail_subject_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_subject, "MailSubject", creation_element_position, count)
+ mail_content_flag, mail_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_CONTENT")
+ if mail_content_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_content.append(mail_content_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_content, "MailContent", creation_element_position, count)
+ mail_att_name_flag, mail_att_name_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ATT_NAME")
+ if mail_att_name_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_attachment_name.append(mail_att_name_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_attachment_name, "MailAttachmentName", creation_element_position, count)
+ mail_att_content_flag, mail_att_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ATT_CONTENT")
+ if mail_att_content_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_attachment_content.append(mail_att_content_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_attachment_content, "MailAttachmentContent", creation_element_position, count)
+ mail_from_flag, mail_from_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_FROM")
+ if mail_from_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_from.append(mail_from_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_from, "MailFrom", creation_element_position, count)
+ mail_to_flag, mail_to_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_TO")
+ if mail_to_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_to.append(mail_to_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_to, "MailTo", creation_element_position, count)
+ mail_account_flag, mail_account_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_MAIL_ACCOUNT")
+ if mail_account_flag:
+ protocol_filed_attribute_flag = True
+ tmp_mail_account.append(mail_account_configuration)
+ count += 1
+ self.add_protocol_field(tmp_mail_account, "MailAccount", creation_element_position, count)
+ ftp_uri_flag, ftp_uri_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_URI")
+ if ftp_uri_flag:
+ protocol_filed_attribute_flag = True
+ tmp_ftp_uri.append(ftp_uri_configuration)
+ count += 1
+ self.add_protocol_field(tmp_ftp_uri, "FtpUri", creation_element_position, count)
+ ftp_content_flag, ftp_content_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_CONTENT")
+ if ftp_content_flag:
+ protocol_filed_attribute_flag = True
+ tmp_ftp_content.append(ftp_content_configuration)
+ count += 1
+ self.add_protocol_field(tmp_ftp_content, "FtpContent", creation_element_position, count)
+ ftp_account_flag, ftp_account_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_FTP_ACCOUNT")
+ if ftp_account_flag:
+ protocol_filed_attribute_flag = True
+ tmp_ftp_account.append(ftp_account_configuration)
+ count += 1
+ self.add_protocol_field(tmp_ftp_account, "FtpAccount", creation_element_position, count)
+ sip_ori_description_flag, sip_ori_description_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SIP_ORIGINATOR_DESCRIPTION")
+ if sip_ori_description_flag:
+ protocol_filed_attribute_flag = True
+ tmp_sip_ori_description.append(sip_ori_description_configuration)
+ count += 1
+ self.add_protocol_field(tmp_sip_ori_description, "SipOriDescription", creation_element_position, count)
+ sip_res_description_flag, sip_res_description_configuration = self.is_protocol_filed_exsit(policy_configuration, "ATTR_SIP_RESPONDER_DESCRIPTION")
+ if sip_res_description_flag:
+ protocol_filed_attribute_flag = True
+ tmp_sip_res_description.append(sip_res_description_configuration)
+ count += 1
+ self.add_protocol_field(tmp_sip_res_description, "SipResDescription", creation_element_position, count)
+
# override_flag赋值
if application_object_flag and protocol_filed_attribute_flag == False:
override_flag = True
@@ -801,18 +816,18 @@ class CreateRules:
self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click()
# self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_select{}_posXpath".format(ip_condition_type)]).click()
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(ip_condition_type)]).click()
- for j in range(len(ip_data)):
- if ip_data[j]["negate"] == True:
- time.sleep(1) # 暂停1秒
- negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(ip_condition_type)])
- # 使用JavaScript确保元素完全进入视图
- self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});",negate_element)
- # 使用JavaScript执行悬停
- self.driver.execute_script("var evt = new MouseEvent('mouseover', {'view': window, 'bubbles': true, 'cancelable': true}); arguments[0].dispatchEvent(evt);",negate_element)
- # 确保元素可点击后进行点击
- self.driver.execute_script("arguments[0].click();", negate_element)
- # ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
- # negate_element.click()
+ # for j in range(len(ip_data)):
+ if negate_option == True:
+ time.sleep(1) # 暂停1秒
+ negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(ip_condition_type)])
+ # 使用JavaScript确保元素完全进入视图
+ self.driver.execute_script("arguments[0].scrollIntoView({block: 'center', inline: 'nearest'});", negate_element)
+ # 使用JavaScript执行悬停
+ self.driver.execute_script("var evt = new MouseEvent('mouseover', {'view': window, 'bubbles': true, 'cancelable': true}); arguments[0].dispatchEvent(evt);", negate_element)
+ # 确保元素可点击后进行点击
+ self.driver.execute_script("arguments[0].click();", negate_element)
+ # ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
+ # negate_element.click()
except Exception as e:
raise
@@ -833,38 +848,41 @@ class CreateRules:
temp_element_position = creation_element_position["policyRulePage_select{}_posXpath".format(port_condition_type)].format(replaceValue=port_data[z]["name"])
self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click()
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(port_condition_type)]).click()
- for j in range(len(port_data)):
- if port_data[j]["negate"] == True:
- time.sleep(1) # 暂停1秒
- negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(port_condition_type)])
- ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
- negate_element.click()
+ # for j in range(len(port_data)):
+ if negate_option == True:
+ time.sleep(1) # 暂停1秒
+ negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negate{}_posXpath".format(port_condition_type)])
+ ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
+ negate_element.click()
except Exception as e:
raise
def add_device(self, creation_element_position, name, negate_option):
- # 点add condition的+
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click()
- # 选中imsi
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addConditionImsi_posXpath"]).click()
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
- # 点imsi的+,因为dst ip已经是第一层且t初始值是0,所以要+2
- temp_element_position = creation_element_position["policyRulePage_addImsi_posXpath"]
- self.driver.find_element(By.XPATH, temp_element_position).click()
- # 通过imsi搜索并选中
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=1).send_keys(name)
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER)
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectImsi_posXpath"], find_after_wait_time=0.5).click()
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeImsi_posXpath"]).click()
- if negate_option == True:
- time.sleep(1) # 暂停1秒
- negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negateImsi_posXpath"])
- ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
- negate_element.click()
+ try:
+ # 点add condition的+
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click()
+ # 选中imsi
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addConditionImsi_posXpath"]).click()
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
+ # 点imsi的+,因为dst ip已经是第一层且t初始值是0,所以要+2
+ temp_element_position = creation_element_position["policyRulePage_addImsi_posXpath"]
+ self.driver.find_element(By.XPATH, temp_element_position).click()
+ # 通过imsi搜索并选中
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=1).send_keys(name)
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_searchImsi_posXpath"], find_after_wait_time=0.5).send_keys(Keys.ENTER)
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_selectImsi_posXpath"], find_after_wait_time=0.5).click()
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_closeImsi_posXpath"]).click()
+ if negate_option == True:
+ time.sleep(1) # 暂停1秒
+ negate_element = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_negateImsi_posXpath"])
+ ActionChains(self.driver).move_to_element(negate_element).perform() # 悬停以使元素可见
+ negate_element.click()
+ except Exception as e:
+ raise
add_subid = add_port
- def add_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num, negate_option):
+ def add_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num):
try:
# 强制点击add condition的+
btn = self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"])
@@ -883,24 +901,24 @@ class CreateRules:
except Exception as e:
raise
- def add_bool_type_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num, negate_option):
+ def add_bool_type_protocol_field(self, protocol_field_data, filed_type, creation_element_position, num):
try:
- for x in range(len(protocol_field_data)):
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click()
- var1 = protocol_field_data[x]["boolean_type"]
- xpath_temp = creation_element_position["policyRulePage_addCondition{}_posXpath".format(filed_type)]
- xpath_last = xpath_temp.format(replaceValue=var1)
- self.driver.find_element(By.XPATH, xpath_last).click()
- self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
- # 点protocol filed的+
- temp_element_position = creation_element_position["policyRulePage_add{}_posXpath".format(filed_type)].format(replaceValue=num)
- self.driver.find_element(By.XPATH, temp_element_position).click()
- # 通过protocol filed name搜索并选中
- self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)],find_after_wait_time=1).send_keys(protocol_field_data[x]["name"])
- self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)],find_after_wait_time=0.5).send_keys(Keys.ENTER)
- self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_select{}_posXpath".format(filed_type)],find_after_wait_time=0.5).click()
- self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(filed_type)]).click()
- num = num +1
+ # for x in range(len(protocol_field_data)):
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click()
+ value = filed_type.lower()
+ xpath_temp = creation_element_position["policyRulePage_addCondition{}_posXpath".format("Boolean")]
+ xpath_last = xpath_temp.format(replaceValue=value)
+ self.driver.find_element(By.XPATH, xpath_last).click()
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
+ # 点protocol filed的+
+ temp_element_position = creation_element_position["policyRulePage_add{}_posXpath".format("Boolean")].format(replaceValue=num)
+ self.driver.find_element(By.XPATH, temp_element_position).click()
+ # 通过protocol filed name搜索并选中
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format("Boolean")], find_after_wait_time=1).send_keys(protocol_field_data["name"])
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format("Boolean")], find_after_wait_time=0.5).send_keys(Keys.ENTER)
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_select{}_posXpath".format("Boolean")], find_after_wait_time=0.5).click()
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format("Boolean")]).click()
+ num = num +1
except Exception as e:
raise
@@ -910,18 +928,18 @@ class CreateRules:
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_posXpath"]).click()
# 选中port
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition{}_posXpath".format(filed_type)]).click()
- self.driver.find_element(By.XPATH,creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
+ self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_addCondition_close_posXpath"]).click()
# 点source port的+
self.driver.find_element(By.XPATH, creation_element_position[ "policyRulePage_add{}_posXpath".format(filed_type)]).click()
# 通过port name搜索并选中
is_first_iteration = True
- for z in ip_protocol_data:
- #如果不是第一次循环,需要点加号调侧滑框
+ for z in ip_protocol_data["items"]:
+ # 如果不是第一次循环,需要点加号调侧滑框
if not is_first_iteration:
self.driver.find_element(By.XPATH, policyRulePage_little_button_addIpProtocol_posXpath).click()
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_search{}_posXpath".format(filed_type)], find_after_wait_time=1).send_keys(z)
self.driver.find_element(By.XPATH, creation_element_position[ "policyRulePage_search{}_posXpath".format(filed_type)], find_after_wait_time=0.5).send_keys(Keys.ENTER)
- temp_element_position = creation_element_position[ "policyRulePage_select{}_posXpath".format(filed_type)].format( replaceValue=z)
+ temp_element_position = creation_element_position[ "policyRulePage_select{}_posXpath".format(filed_type)].format(replaceValue=z)
self.driver.find_element(By.XPATH, temp_element_position, find_after_wait_time=0.5).click()
self.driver.find_element(By.XPATH, creation_element_position["policyRulePage_close{}_posXpath".format(filed_type)]).click()
is_first_iteration = False
@@ -948,8 +966,8 @@ class CreateRules:
or_conditions = and_conditions[i]["or_conditions"]
for j in range(len(or_conditions)):
if or_conditions[j]["attribute_name"] == attribute_name:
- return [True, or_conditions[j], or_conditions["negate_option"]]
- return [False, "", ""]
+ return True, or_conditions, and_conditions[i]["negate_option"]
+ return False, "", ""
def is_group_exsit(self, policy_configuration):
if "and_conditions" in policy_configuration:
@@ -958,42 +976,44 @@ class CreateRules:
or_conditions = and_conditions[i]["or_conditions"]
for j in range(len(or_conditions)):
if "member_type" in or_conditions[j].keys and or_conditions[j]["member_type"] != "":
- return [True, or_conditions[j], or_conditions["negate_option"]]
- return [False, "", ""]
+ return True, or_conditions, and_conditions[i]["negate_option"]
+ return False, "", ""
- def is_protocol_filed_exsit(self, policy_configuration):
- protocol_filed_list = [
- "ATTR_HTTP_URL",
- "ATTR_HTTP_REQ_HDR",
- "ATTR_HTTP_RES_HDR",
- "ATTR_HTTP_REQ_BODY",
- "ATTR_HTTP_RES_BODY",
- "ATTR_SSL_CN",
- "ATTR_SSL_CN_CAT",
- "ATTR_SSL_SAN",
- "ATTR_SSL_SAN_CAT",
- "ATTR_SSL_ECH",
- "ATTR_SSL_ESNI",
- "ATTR_SSL_NO_SNI",
- "ATTR_DNS_QNAME",
- "ATTR_MAIL_SUBJECT",
- "ATTR_MAIL_CONTENT",
- "ATTR_MAIL_ATT_NAME",
- "ATTR_MAIL_ATT_CONTENT",
- "ATTR_MAIL_FROM",
- "ATTR_MAIL_TO",
- "ATTR_MAIL_ACCOUNT",
- "ATTR_FTP_URI",
- "ATTR_FTP_CONTENT",
- "ATTR_FTP_ACCOUNT",
- "ATTR_SIP_ORIGINATOR_DESCRIPTION",
- "ATTR_SIP_RESPONDER_DESCRIPTION"
- ]
+ def is_protocol_filed_exsit(self, policy_configuration, attribute_value):
+ """
+ # protocol_filed_list = [
+ # "ATTR_HTTP_URL",
+ # "ATTR_HTTP_REQ_HDR",
+ # "ATTR_HTTP_RES_HDR",
+ # "ATTR_HTTP_REQ_BODY",
+ # "ATTR_HTTP_RES_BODY",
+ # "ATTR_SSL_CN",
+ # "ATTR_SSL_CN_CAT",
+ # "ATTR_SSL_SAN",
+ # "ATTR_SSL_SAN_CAT",
+ # "ATTR_SSL_ECH",
+ # "ATTR_SSL_ESNI",
+ # "ATTR_SSL_NO_SNI",
+ # "ATTR_DNS_QNAME",
+ # "ATTR_MAIL_SUBJECT",
+ # "ATTR_MAIL_CONTENT",
+ # "ATTR_MAIL_ATT_NAME",
+ # "ATTR_MAIL_ATT_CONTENT",
+ # "ATTR_MAIL_FROM",
+ # "ATTR_MAIL_TO",
+ # "ATTR_MAIL_ACCOUNT",
+ # "ATTR_FTP_URI",
+ # "ATTR_FTP_CONTENT",
+ # "ATTR_FTP_ACCOUNT",
+ # "ATTR_SIP_ORIGINATOR_DESCRIPTION",
+ # "ATTR_SIP_RESPONDER_DESCRIPTION"
+ # ]
+ """
if "and_conditions" in policy_configuration:
and_conditions = policy_configuration["and_conditions"]
for i in range(len(and_conditions)):
or_conditions = and_conditions[i]["or_conditions"]
for j in range(len(or_conditions)):
- if or_conditions[j]["attribute_name"] in protocol_filed_list:
- return [True]
- return [False] \ No newline at end of file
+ if or_conditions[j]["attribute_name"] == attribute_value:
+ return True, or_conditions
+ return False, "" \ No newline at end of file