diff options
| author | hebingning <[email protected]> | 2024-04-10 09:26:18 +0800 |
|---|---|---|
| committer | hebingning <[email protected]> | 2024-04-10 09:26:18 +0800 |
| commit | 8b840d93b8b159c6cc8b1f7dcc4a714a7814d296 (patch) | |
| tree | 5fd7b54365664ac2752870e62683bb07849abc45 | |
| parent | 7d9ea65725d1bccc7f76d376dfd0ad4fcf6fe521 (diff) | |
提交代码,支持https的接口请求
| -rw-r--r-- | createObject.py | 6 | ||||
| -rw-r--r-- | createProfile.py | 14 | ||||
| -rw-r--r-- | createRule.py | 4 | ||||
| -rw-r--r-- | delConfig.py | 34 | ||||
| -rw-r--r-- | getApplicationId.py | 12 | ||||
| -rw-r--r-- | getLog.py | 10 | ||||
| -rw-r--r-- | log_query.py | 4 | ||||
| -rw-r--r-- | verify.py | 6 |
8 files changed, 50 insertions, 40 deletions
diff --git a/createObject.py b/createObject.py index 62c9c41..e8e9350 100644 --- a/createObject.py +++ b/createObject.py @@ -5,6 +5,8 @@ import requests import json from datetime import datetime import delConfig +import urllib3 +urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) class CreateObject(): def __init__(self): @@ -214,7 +216,7 @@ class CreateObject(): files = {"file":(file_name, open(object_file, 'rb'), "text/plain")} url = api_host + "/v1/policy/object/import" time1 = datetime.utcnow() - response = requests.post(url, data=data, headers=headers, files= files) + response = requests.post(url, data=data, headers=headers, files= files, verify=False) print('已经请求了1次了') print('本次请求返回的code号是'+ str(response.status_code)) time.sleep(20) @@ -256,7 +258,7 @@ class CreateObject(): # 调用创建object的接口 def create_object(self, obj_template_dict, headers, api_host, is_repeat): url = api_host + "/v1/policy/object" - response = requests.post(url, headers=headers, json=obj_template_dict) + response = requests.post(url, headers=headers, json=obj_template_dict, verify = False) print('已经请求了1次了') print('本次请求返回的code号是'+ str(response.status_code)) assert response.status_code == 200 diff --git a/createProfile.py b/createProfile.py index bf0d4ff..a95b452 100644 --- a/createProfile.py +++ b/createProfile.py @@ -174,7 +174,7 @@ class CreateProfile(): if profile_type == "service_function": process_post_data = self.process_sf_post_data(sf_tempalte_path, sf_post_data,vsys_id) url = "{}/v1/policy/profile/service_function".format(api_host) - response = requests.post(url, headers=headers, json=process_post_data) + response = requests.post(url, headers=headers, json=process_post_data, verify=False) response_dict = response.json() sf_profile_id = response_dict["data"]["service_function"]["id"] return sf_profile_id @@ -225,7 +225,7 @@ class CreateProfile(): url = "{}/v1/policy/profile/statistics_template".format(api_host) headers["Content-Type"] = "application/json" #print(process_post_data) - response = requests.post(url, headers=headers, json=process_post_data) + response = requests.post(url, headers=headers, json=process_post_data, verify=False) response_dict = response.json() # 处理返回数据,提取id time.sleep(2) @@ -248,7 +248,7 @@ class CreateProfile(): process_post_data = self.process_sff_post_data(sf_profile_id_list, profile_sff_template, profile_data,vsys_id) url = "{}/v1/policy/profile/service_function_forwarder".format(api_host) headers["Content-Type"] = "application/json" - response = requests.post(url, headers=headers, json= process_post_data) + response = requests.post(url, headers=headers, json= process_post_data, verify=False) response_dict = response.json() sff_id = response_dict["data"]["service_function_forwarder"]["id"] create_profile_id_dict["profile_type"] = profile_type @@ -262,7 +262,7 @@ class CreateProfile(): # 添加vsys id profile_data["vsys_id"] = vsys_id url = "{}/v1/policy/profile/response_page".format(api_host) - response = requests.post(url=url, data=profile_data, headers=headers, files=files) + response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False) response_dict = response.json() profile_id = response_dict["data"]["response_page"]["id"] create_profile_id_dict["profile_type"] = profile_type @@ -274,7 +274,7 @@ class CreateProfile(): # 添加vsys id profile_data["vsys_id"] = vsys_id url = "{}/v1/policy/profile/hijack_file".format(api_host) - response = requests.post(url=url, data=profile_data, headers=headers, files=files) + response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False) response_dict = response.json() profile_id = response_dict["data"]["hijack_file"]["id"] create_profile_id_dict["profile_type"] = profile_type @@ -286,7 +286,7 @@ class CreateProfile(): # 添加vsys id profile_data["vsys_id"] = vsys_id url = "{}/v1/policy/profile/insert_script".format(api_host) - response = requests.post(url=url, data=profile_data, headers=headers, files=files) + response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False) response_dict = response.json() profile_id = response_dict["data"]["insert_script"]["id"] create_profile_id_dict["profile_type"] = profile_type @@ -298,7 +298,7 @@ class CreateProfile(): # 添加vsys id profile_data["vsys_id"] = vsys_id url = "{}/v1/policy/profile/run_script".format(api_host) - response = requests.post(url=url, data=profile_data, headers=headers, files=files) + response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False) response_dict = response.json() profile_id = response_dict["data"]["run_script"]["id"] create_profile_id_dict["profile_type"] = profile_type diff --git a/createRule.py b/createRule.py index a1d8ab0..d86cfe2 100644 --- a/createRule.py +++ b/createRule.py @@ -263,8 +263,8 @@ class CreatePolicyRule(): #请求创建策略接口,下发策略 def create_rule(self, rule_template_dict, headers, api_host): url = api_host + "/v1/policy/rule" - #print(json.dumps(rule_template_dict)) - response = requests.post(url, headers=headers, json=rule_template_dict) + print(json.dumps(rule_template_dict)) + response = requests.post(url, headers=headers, json=rule_template_dict, verify=False) #print(response) assert response.status_code == 200 return_json_data = json.loads(response.text) diff --git a/delConfig.py b/delConfig.py index 9eda801..edeb2cf 100644 --- a/delConfig.py +++ b/delConfig.py @@ -15,63 +15,63 @@ class Erase(): del_rule_template['ids'] = list(create_policies_ids[i].values())[0] del_rule_template['vsys_id'] = vsys_id url = api_host + "/v1/policy/rule" - response = requests.delete(url, headers=headers, params=del_rule_template) + response = requests.delete(url, headers=headers, params=del_rule_template, verify=False) assert response.status_code == 200 if len(create_ip_ids) > 0: del_obj_template['type'] = "ip" del_obj_template['ids'] = create_ip_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_subid_ids) > 0: del_obj_template['type'] = "subscriberid" del_obj_template['ids'] = create_subid_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_fqdn_ids) > 0: del_obj_template['type'] = "fqdn" del_obj_template['ids'] = create_fqdn_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_url_ids) > 0: del_obj_template['type'] = "url" del_obj_template['ids'] = create_url_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_flag_ids) > 0: del_obj_template['type'] = "flag" del_obj_template['ids'] = create_flag_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_keywords_ids) > 0: del_obj_template['type'] = "keywords" del_obj_template['ids'] = create_keywords_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_account_ids) > 0: del_obj_template['type'] = "account" del_obj_template['ids'] = create_account_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_http_signature_ids) > 0: del_obj_template['type'] = "http_signature" del_obj_template['ids'] = create_http_signature_ids del_obj_template["vsys_id"] = vsys_id url = api_host + "/v1/policy/object" - response = requests.delete(url, headers=headers, params=del_obj_template) + response = requests.delete(url, headers=headers, params=del_obj_template, verify=False) assert response.status_code == 200 if len(create_profile_ids) > 0: for del_porfile_index in range(len(create_profile_ids)): @@ -79,48 +79,48 @@ class Erase(): profile_id = create_profile_ids[del_porfile_index]["id"] del_profile_template = {"ids":str(profile_id),"vsys_id":vsys_id} url = "{}/v1/policy/profile/statistics_template".format(api_host) - response = requests.delete(url, headers=headers, params=del_profile_template) + response = requests.delete(url, headers=headers, params=del_profile_template, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "service_function_forwarder": # 删除SFF和SF profile_id = create_profile_ids[del_porfile_index]["id"] del_profile_sff = {"ids": str(profile_id),"vsys_id":vsys_id} url = "{}/v1/policy/profile/service_function_forwarder".format(api_host) - response = requests.delete(url, headers=headers, params=del_profile_sff) + response = requests.delete(url, headers=headers, params=del_profile_sff, verify=False) if response.status_code == 200: profile_id = create_profile_ids[del_porfile_index]["sff_info"]["sf_info"][0]["id"] del_profile_sf = {"ids": str(profile_id),"vsys_id":vsys_id} url = "{}/v1/policy/profile/service_function".format(api_host) - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "response_page": # 删除response_page profile_id = create_profile_ids[del_porfile_index]["id"] url = "{}/v1/policy/profile/response_page".format(api_host) del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "hijack_files": # 删除hijack_files profile_id = create_profile_ids[del_porfile_index]["id"] url = "{}/v1/policy/profile/hijack_file".format(api_host) del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "insert_profile": # 删除insert profile_id = create_profile_ids[del_porfile_index]["id"] url = "{}/v1/policy/profile/insert_script".format(api_host) del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "run_script": # 删除run_script profile_id = create_profile_ids[del_porfile_index]["id"] url = "{}/v1/policy/profile/run_script".format(api_host) del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 elif create_profile_ids[del_porfile_index]["profile_type"] == "dns_record": # 删除dns_record profile_id = create_profile_ids[del_porfile_index]["id"] url = "{}/v1/policy/profile/run_script".format(api_host) del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id} - response = requests.delete(url, headers=headers, params=del_profile_sf) + response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False) assert response.status_code == 200 diff --git a/getApplicationId.py b/getApplicationId.py index 487b589..d46f9b0 100644 --- a/getApplicationId.py +++ b/getApplicationId.py @@ -5,13 +5,21 @@ import pymysql.cursors import ast def get_app_id(app, api_host): - # 返回group_id + # http测试环境返回group_id mysql_host = api_host[7:] - # mysql_host = "192.168.40.198" + + # https 环境返回group_id + # mysql_host = api_host[8:] + # 集成测试环境 db = pymysql.connect(host=mysql_host, user='test', password='test', database='tsg-bifang') + # 45.250 环境 + # db = pymysql.connect(host=mysql_host, + # user='root', + # password='Bifang&*()', + # database='tsg-bifang-2402') app_group_id_list=[] # 该方法用于将字符串类型的['ssl', 'http']转换为list类型的['ssl', 'http'](在无法使用list函数直接转换的情况下) app = ast.literal_eval(app) @@ -12,7 +12,7 @@ class GetLog(): # 修改完成后删除注释:将下方的security_event替换成变量 headers = {'Content-Type': 'application/json', 'Authorization': token} url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id) - response = requests.get(url, headers=headers) + response = requests.get(url, headers=headers, verify=False) assert response.status_code == 200 log_schema = json.loads(response.text) log_schema = log_schema['data']['fields'] @@ -51,7 +51,7 @@ class GetLog(): log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={test_pc_ip}", f"client_ip='{test_pc_ip}'") url = api_host + "/v1/log/query" # print(json.dumps(log_condition_dict)) - response = requests.post(url, headers=headers, json=log_condition_dict) + response = requests.post(url, headers=headers, json=log_condition_dict, verify=False) assert response.status_code == 200 log_list = json.loads(response.text) # print(log_list) @@ -97,7 +97,7 @@ class GetLog(): hit_query_condition['end'] = end_time hit_query_condition['vsys_id'] = vsys_id print(json.dumps(hit_query_condition)) - response = requests.post(url, headers=headers, json=hit_query_condition) + response = requests.post(url, headers=headers, json=hit_query_condition, verify=False) assert response.status_code == 200 response = json.loads(response.text) index = i + 1 @@ -177,7 +177,7 @@ class GetLog(): post_data_ori_dict["sql"] =statistics_info["sql"] # print("statistics metric请求体数据:") #print(post_data_ori_dict) - response = requests.post(url, json=post_data_ori_dict, headers=headers) + response = requests.post(url, json=post_data_ori_dict, headers=headers, verify=False) #assert response.status_code == 200 r_dict = response.json() print("statistics metric结果数据:") @@ -369,7 +369,7 @@ class GetLog(): post_data_ori_dict["vsys_id"] = vsys_id # print("sc metric请求体数据:") # print(post_data_ori_dict) - response = requests.post(url, json=post_data_ori_dict, headers=headers) + response = requests.post(url, json=post_data_ori_dict, headers=headers, verify=False) r_dict = response.json() print("sc metric结果数据:") # print(r_dict) diff --git a/log_query.py b/log_query.py index 6f785c3..3ddef20 100644 --- a/log_query.py +++ b/log_query.py @@ -32,7 +32,7 @@ class Verify(): loginJson = {"username": "", "password": ""} loginJson["username"] = user loginJson["password"] = self.password - response = requests.post(url, json=loginJson) + response = requests.post(url, json=loginJson, verify=False) jsonData = json.loads(response.text) self.token = jsonData["data"]["token"] return self.token @@ -134,7 +134,7 @@ class Verify(): log_condition_dict['filter'] = log_filter url = api_host + "/v1/log/query" # print(json.dumps(log_condition_dict)) - response = requests.post(url, headers=headers, json=log_condition_dict) + response = requests.post(url, headers=headers, json=log_condition_dict, verify=False) log_code = response.status_code if log_code == 200: log_result = True @@ -34,7 +34,7 @@ class Verify(): url = api_host + "/v1/user/encryptpwd" pwJson = {"password": ""} pwJson["password"] = pwd - response = requests.get(url, params=pwJson) + response = requests.get(url, params=pwJson, verify=False) data = json.loads(response.text) self.password = data["data"]["encryptpwd"] return self.password @@ -44,7 +44,7 @@ class Verify(): loginJson = {"username": "", "password": ""} loginJson["username"] = user loginJson["password"] = self.password - response = requests.post(url, json=loginJson) + response = requests.post(url, json=loginJson, verify=False) jsonData = json.loads(response.text) self.token = jsonData["data"]["token"] return self.token @@ -714,7 +714,7 @@ class Verify(): verify_obj_comm_dict_0["appName"] = appName verify_list_dict["verify_list"][0]["verify_session"]["attributes"].append(verify_obj_comm_dict_0) #print(verify_list_dict) - response = requests.post(url, json=verify_list_dict, headers=headers) + response = requests.post(url, json=verify_list_dict, headers=headers, verify=False) r_dict = response.json() #print(json.dumps(r_dict)) hit_policy_list = [] |
