diff options
| author | hebingning <[email protected]> | 2024-05-28 19:47:02 +0800 |
|---|---|---|
| committer | hebingning <[email protected]> | 2024-05-28 19:47:02 +0800 |
| commit | 1901b3207c063d1131ff64bfb0a1ce81fa4c1ff1 (patch) | |
| tree | d19dbbea554a92915a15f271042504fc168ac871 | |
| parent | 41f957343ad8bf2652132d4135c4061e92d1ca24 (diff) | |
提交geolocation的新增、删除、被策略引用逻辑
| -rw-r--r-- | createObject.py | 81 | ||||
| -rw-r--r-- | delConfig.py | 13 | ||||
| -rw-r--r-- | verify.py | 8 |
3 files changed, 85 insertions, 17 deletions
diff --git a/createObject.py b/createObject.py index e8e9350..5e7ad67 100644 --- a/createObject.py +++ b/createObject.py @@ -18,6 +18,8 @@ class CreateObject(): self.keywords_ids = [] self.account_ids = [] self.http_signature_ids = [] + self.geo_ids = [] + self.geo_name_ids = [] def create_condition(self, token, condition, template, api_host, test_pc_ip, vsys_id): # 此处的condition为测试用例json中obj_condition_{i}的相关数据 @@ -45,7 +47,7 @@ class CreateObject(): if attribute_name == 'ATTR_SOURCE_IP' or attribute_name == 'ATTR_SUBSCRIBER_ID': for obj_dict in objects_list: source_list.append(obj_dict) - elif attribute_name == 'ATTR_DESTINATION_IP' or attribute_name == "ATTR_DESTINATION_LOCATION" or attribute_name == "ATTR_SERVER_FQDN": + elif attribute_name == 'ATTR_DESTINATION_IP' or attribute_name == "ATTR_DESTINATION_GEO_COUNTRY" or attribute_name == "ATTR_SERVER_FQDN": for obj_dict in objects_list: dst_list.append(obj_dict) elif attribute_name == 'ATTR_FLAG': @@ -67,7 +69,7 @@ class CreateObject(): if attribute_name == 'ATTR_SOURCE_IP' or attribute_name == 'ATTR_SUBSCRIBER_ID': for obj_dict in objects_list: source_list.append(obj_dict) - elif attribute_name == 'ATTR_DESTINATION_IP' or attribute_name == "ATTR_DESTINATION_LOCATION" or attribute_name == "ATTR_SERVER_FQDN": + elif attribute_name == 'ATTR_DESTINATION_IP' or attribute_name == "ATTR_DESTINATION_GEO_COUNTRY" or attribute_name == "ATTR_SERVER_FQDN": for obj_dict in objects_list: dst_list.append(obj_dict) elif attribute_name == 'ATTR_FLAG': @@ -76,7 +78,7 @@ class CreateObject(): else: for obj_dict in objects_list: filter_list.append(obj_dict) - return res_time_result, source_list, dst_list, filter_list, flag_list, self.ip_ids, self.subid_ids, self.fqdn_ids, self.url_ids, self.flag_ids, self.keywords_ids, self.account_ids, self.http_signature_ids + return res_time_result, source_list, dst_list, filter_list, flag_list, self.ip_ids, self.subid_ids, self.fqdn_ids, self.url_ids, self.flag_ids, self.keywords_ids, self.account_ids, self.http_signature_ids, self.geo_name_ids # 对多个objects的处理(处理之后要调用create_object) def organize_objects_data(self, obj_template_dict, condition, headers, api_host, test_pc_ip, vsys_id, token, index, run_times): @@ -107,6 +109,7 @@ class CreateObject(): if is_import == 0: member = self.combine_object_data(obj_type, obj_sub_type, add_item_list, test_pc_ip, context_name, is_repeat) obj_template_dict['object']['member'] = member + # object_ids_temp_list 用于创建rule的object id object_ids_temp_list, res_time_result = self.create_object(obj_template_dict, headers, api_host, is_repeat) else: object_ids_temp_list, res_time_result = self.import_object(obj_type, vsys_id, add_item_list, api_host, token, is_repeat) @@ -119,12 +122,13 @@ class CreateObject(): object_temp_list.append(objects_list) elif len(objects_list) > 0 and is_repeat == 1 and index > run_times-1: object_temp_list.append(objects_list) - return object_temp_list, res_time_result + return object_temp_list, res_time_result # 组织创建通过items进行新增的object的数据 def combine_object_data(self, obj_type, obj_sub_type, add_item_list, test_pc_ip, context_name, is_repeat): items = [] object_dict = {} + geolocation = {} if is_repeat == 0: if obj_type == "ip" and obj_sub_type != "geo_location": for item in add_item_list: @@ -167,6 +171,35 @@ class CreateObject(): string = dict(contextual_string=patterns, op='add') string['contextual_string']['contextName'] = context_name items.append(string) + elif obj_type == "geolocation": + geo_ip_address = [] + # geo_ip_address_dict = {} + for item in add_item_list: + if 'continent' in item: + geolocation['continent'] = item['continent'] + if 'geoname_id' in item: + geolocation['geoname_id'] = item['geoname_id'] + if 'super_administrative_area' in item: + geolocation['super_administrative_area'] = item['super_administrative_area'] + if 'administrative_area' in item: + geolocation['administrative_area'] = item['administrative_area'] + if 'country_abbr' in item: + geolocation['country_abbr'] = item['country_abbr'] + if 'country' in item: + geolocation['country'] = item['country'] + if 'location_type' in item: + geolocation['location_type'] = item['location_type'] + if 'latitude' in item: + geolocation['latitude'] = item['latitude'] + if 'longitude' in item: + geolocation['longitude'] = item['longitude'] + if 'addr_type' in item: + addr_type = item['addr_type'] + if 'ip_address' in item: + ip_address = item['ip_address'] + geo_ip_address_dict = dict(addr_type = addr_type, ip_address = ip_address, op = 'add') + geo_ip_address.append(geo_ip_address_dict) + geolocation['ip_addresses'] = geo_ip_address else: for item in add_item_list: patterns = [] @@ -202,20 +235,26 @@ class CreateObject(): object_dict['ip_address'] = ip object_dict = dict(ip=object_dict, op='add') items.append(object_dict) - - member = dict(items=items, type=1) - # print(member) + if obj_type == 'geolocation': + member = dict(geolocation = geolocation, type = 'library') + else: + member = dict(items=items, type=1) return member def import_object(self, obj_type, vsys_id, add_item_list, api_host, token, is_repeat): headers = {"Authorization": token} data = {'type': obj_type, 'vsys_id': vsys_id} + dry_data = {'type': obj_type, 'vsys_id': vsys_id, 'is_dry_run':1} for item in add_item_list: object_file = item['keywordArray'][0] file_name = object_file.split("/")[-1] files = {"file":(file_name, open(object_file, 'rb'), "text/plain")} url = api_host + "/v1/policy/object/import" + # dry run校验导入的文件是否合规 + response = requests.post(url, data=dry_data, headers=headers, files= files, verify=False) + assert response.status_code == 200 time1 = datetime.utcnow() + # 正式导入 response = requests.post(url, data=data, headers=headers, files= files, verify=False) print('已经请求了1次了') print('本次请求返回的code号是'+ str(response.status_code)) @@ -229,7 +268,7 @@ class CreateObject(): else: res_time_result = True response_dict = json.loads(response.text) - object_ids = self.get_object_ids(response_dict) + object_ids, geo_name_id_list = self.get_object_ids(response_dict, obj_type) # 为了每次清空需要反复创建的object的组合使用,否则会反复删除已经存在的object if is_repeat == 1 and obj_type != 'ip': self.fqdn_ids = [] @@ -253,12 +292,17 @@ class CreateObject(): self.account_ids = self.handle_ids(object_ids, self.account_ids) elif obj_type == 'http_signature': self.http_signature_ids = self.handle_ids(object_ids, self.http_signature_ids) + elif obj_type == 'geolocation': + self.geo_ids = self.handle_ids(object_ids, self.geo_ids) + self.geo_name_ids = self.handle_ids(geo_name_id_list, self.geo_name_ids) return object_ids, res_time_result # 调用创建object的接口 def create_object(self, obj_template_dict, headers, api_host, is_repeat): url = api_host + "/v1/policy/object" + print(json.dumps(obj_template_dict)) response = requests.post(url, headers=headers, json=obj_template_dict, verify = False) + print(response.text) print('已经请求了1次了') print('本次请求返回的code号是'+ str(response.status_code)) assert response.status_code == 200 @@ -276,7 +320,7 @@ class CreateObject(): print(response_dict) time.sleep(20) obj_type = response_dict['data']['object']['type'] - object_ids = self.get_object_ids(response_dict) + object_ids, geo_name_id_list = self.get_object_ids(response_dict, obj_type) # 为了每次清空需要反复创建的object的组合使用,否则会反复删除已经存在的object obj_template_dict['object']['type'] = obj_type if is_repeat == 1 and obj_type != 'ip': @@ -301,14 +345,25 @@ class CreateObject(): self.account_ids = self.handle_ids(object_ids, self.account_ids) elif obj_type == 'http_signature': self.http_signature_ids = self.handle_ids(object_ids, self.http_signature_ids) + elif obj_type == 'geolocation': + self.geo_ids = self.handle_ids(object_ids, self.geo_ids) + self.geo_name_ids = self.handle_ids(geo_name_id_list, self.geo_name_ids) return object_ids, res_time_result - # 获取返回结果中的objectsid - def get_object_ids(self, response_dict): + # 非geolocation,获取返回结果中的objects id(用于策略引用和删除object) + # 对于geolocation,获取country_object_id(策略引用),geoname_id(删除object) + def get_object_ids(self, response_dict, obj_type): temp_list = [] - object_id = response_dict['data']['object']['id'] + temp_geo_name_id_list = [] + if obj_type == 'geolocation': + object_id = response_dict['data']['object']['member']['geolocation']['country_object_id'] + else: + object_id = response_dict['data']['object']['id'] + if obj_type == 'geolocation': + geoname_id = response_dict['data']['object']['member']['geolocation']['geoname_id'] + temp_geo_name_id_list.append(geoname_id) temp_list.append(object_id) - return temp_list + return temp_list,temp_geo_name_id_list # 组织在策略中使用的source,filter等数据 def obj_ids_to_policy_obj_list(self, object_ids, attribute_name, is_negate): diff --git a/delConfig.py b/delConfig.py index edeb2cf..63983a3 100644 --- a/delConfig.py +++ b/delConfig.py @@ -4,7 +4,7 @@ import time import requests class Erase(): - def del_config(self, token, create_policies_ids, create_ip_ids, create_subid_ids, create_fqdn_ids, create_url_ids, create_flag_ids, create_keywords_ids, create_account_ids, create_http_signature_ids, create_profile_ids, api_host, vsys_id): + def del_config(self, token, create_policies_ids, create_ip_ids, create_subid_ids, create_fqdn_ids, create_url_ids, create_flag_ids, create_keywords_ids, create_account_ids, create_http_signature_ids, create_profile_ids, create_geo_name_ids, api_host, vsys_id): headers = {"Authorization": token} del_obj_template = {"type":"ip", "ids":[], "vsys_id":1} del_rule_template = {"ids":[], "type":"security", "vsys_id":1} @@ -122,7 +122,16 @@ class Erase(): del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 - + if len(create_geo_name_ids) > 0: + del del_obj_template["type"] + del del_obj_template["ids"] + create_geo_name_id = create_geo_name_ids[0] + del_obj_template["geoname_ids"] = create_geo_name_id + del_obj_template["vsys_id"] = vsys_id + url = api_host + "/v1/policy/library/geolocation" + print(del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) + assert response.status_code == 200 if __name__ == '__main__': # ipObject = CreateIpObject() @@ -85,6 +85,7 @@ class Verify(): self.create_keywords_ids = [] self.create_account_ids = [] self.create_http_signature_ids = [] + self.create_geo_name_ids = [] self.create_policies_ids = [] self.statistics_info_list = [] # 组织statistics policy template id关系 # 读取json文件 @@ -111,7 +112,7 @@ class Verify(): cmd_result, log_result, metric_result = True, True, True # 删除配置 erase = delConfig.Erase() - erase.del_config(self.token, self.create_policies_ids, self.create_ip_ids, self.create_subid_ip_ids, self.create_fqdn_ids, self.create_url_ids, self.create_flag_ids, self.create_keywords_ids, self.create_account_ids, self.create_http_signature_ids, self.create_profile_ids, api_host, vsys_id) + erase.del_config(self.token, self.create_policies_ids, self.create_ip_ids, self.create_subid_ip_ids, self.create_fqdn_ids, self.create_url_ids, self.create_flag_ids, self.create_keywords_ids, self.create_account_ids, self.create_http_signature_ids, self.create_profile_ids, self.create_geo_name_ids, api_host, vsys_id) # 生成报告的数据 self.excuted_cases_count += 1 report_data = [] @@ -192,7 +193,7 @@ class Verify(): # 创建object object_info = createObject.CreateObject() obj_template = path_dict["obj_template"] - res_time_result, source_list, dst_list, filter_list, flag_list, ip_ids, subid_ids, fqdn_ids, url_ids, flag_ids, keywords_ids, account_ids, http_signature_ids = object_info.create_condition(self.token, obj_condition, obj_template, api_host, test_pc_ip, vsys_id) + res_time_result, source_list, dst_list, filter_list, flag_list, ip_ids, subid_ids, fqdn_ids, url_ids, flag_ids, keywords_ids, account_ids, http_signature_ids, geo_name_ids = object_info.create_condition(self.token, obj_condition, obj_template, api_host, test_pc_ip, vsys_id) self.res_time_result = res_time_result # 如果need_to_verify为true或has_verify_flag为0,表示需要验证功能,如果need_to_verify为false,则不需要验证,直接退出 if has_verify_flag == 1 and condition["need_to_verify"] == False: @@ -214,6 +215,8 @@ class Verify(): self.create_account_ids.extend(account_ids) if len(http_signature_ids) > 0: self.create_http_signature_ids.extend(http_signature_ids) + if len(geo_name_ids) > 0: + self.create_geo_name_ids.extend(geo_name_ids) # 创建profile profile_condition_key = "profile_condition_{}".format(create_number) if (profile_condition_key in condition.keys()) and len(condition[profile_condition_key]) > 0: # 含有profile_condition_ key 并且非空 @@ -296,6 +299,7 @@ class Verify(): command = condition['command'] p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE,encoding="utf-8") output, error = p.communicate() + output = output + error # cmd_return = output.decode(errors='ignore') cmd_return = output |
