summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorhebingning <[email protected]>2024-04-10 09:26:18 +0800
committerhebingning <[email protected]>2024-04-10 09:26:18 +0800
commit8b840d93b8b159c6cc8b1f7dcc4a714a7814d296 (patch)
tree5fd7b54365664ac2752870e62683bb07849abc45
parent7d9ea65725d1bccc7f76d376dfd0ad4fcf6fe521 (diff)
提交代码,支持https的接口请求
-rw-r--r--createObject.py6
-rw-r--r--createProfile.py14
-rw-r--r--createRule.py4
-rw-r--r--delConfig.py34
-rw-r--r--getApplicationId.py12
-rw-r--r--getLog.py10
-rw-r--r--log_query.py4
-rw-r--r--verify.py6
8 files changed, 50 insertions, 40 deletions
diff --git a/createObject.py b/createObject.py
index 62c9c41..e8e9350 100644
--- a/createObject.py
+++ b/createObject.py
@@ -5,6 +5,8 @@ import requests
import json
from datetime import datetime
import delConfig
+import urllib3
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class CreateObject():
def __init__(self):
@@ -214,7 +216,7 @@ class CreateObject():
files = {"file":(file_name, open(object_file, 'rb'), "text/plain")}
url = api_host + "/v1/policy/object/import"
time1 = datetime.utcnow()
- response = requests.post(url, data=data, headers=headers, files= files)
+ response = requests.post(url, data=data, headers=headers, files= files, verify=False)
print('已经请求了1次了')
print('本次请求返回的code号是'+ str(response.status_code))
time.sleep(20)
@@ -256,7 +258,7 @@ class CreateObject():
# 调用创建object的接口
def create_object(self, obj_template_dict, headers, api_host, is_repeat):
url = api_host + "/v1/policy/object"
- response = requests.post(url, headers=headers, json=obj_template_dict)
+ response = requests.post(url, headers=headers, json=obj_template_dict, verify = False)
print('已经请求了1次了')
print('本次请求返回的code号是'+ str(response.status_code))
assert response.status_code == 200
diff --git a/createProfile.py b/createProfile.py
index bf0d4ff..a95b452 100644
--- a/createProfile.py
+++ b/createProfile.py
@@ -174,7 +174,7 @@ class CreateProfile():
if profile_type == "service_function":
process_post_data = self.process_sf_post_data(sf_tempalte_path, sf_post_data,vsys_id)
url = "{}/v1/policy/profile/service_function".format(api_host)
- response = requests.post(url, headers=headers, json=process_post_data)
+ response = requests.post(url, headers=headers, json=process_post_data, verify=False)
response_dict = response.json()
sf_profile_id = response_dict["data"]["service_function"]["id"]
return sf_profile_id
@@ -225,7 +225,7 @@ class CreateProfile():
url = "{}/v1/policy/profile/statistics_template".format(api_host)
headers["Content-Type"] = "application/json"
#print(process_post_data)
- response = requests.post(url, headers=headers, json=process_post_data)
+ response = requests.post(url, headers=headers, json=process_post_data, verify=False)
response_dict = response.json()
# 处理返回数据,提取id
time.sleep(2)
@@ -248,7 +248,7 @@ class CreateProfile():
process_post_data = self.process_sff_post_data(sf_profile_id_list, profile_sff_template, profile_data,vsys_id)
url = "{}/v1/policy/profile/service_function_forwarder".format(api_host)
headers["Content-Type"] = "application/json"
- response = requests.post(url, headers=headers, json= process_post_data)
+ response = requests.post(url, headers=headers, json= process_post_data, verify=False)
response_dict = response.json()
sff_id = response_dict["data"]["service_function_forwarder"]["id"]
create_profile_id_dict["profile_type"] = profile_type
@@ -262,7 +262,7 @@ class CreateProfile():
# 添加vsys id
profile_data["vsys_id"] = vsys_id
url = "{}/v1/policy/profile/response_page".format(api_host)
- response = requests.post(url=url, data=profile_data, headers=headers, files=files)
+ response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False)
response_dict = response.json()
profile_id = response_dict["data"]["response_page"]["id"]
create_profile_id_dict["profile_type"] = profile_type
@@ -274,7 +274,7 @@ class CreateProfile():
# 添加vsys id
profile_data["vsys_id"] = vsys_id
url = "{}/v1/policy/profile/hijack_file".format(api_host)
- response = requests.post(url=url, data=profile_data, headers=headers, files=files)
+ response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False)
response_dict = response.json()
profile_id = response_dict["data"]["hijack_file"]["id"]
create_profile_id_dict["profile_type"] = profile_type
@@ -286,7 +286,7 @@ class CreateProfile():
# 添加vsys id
profile_data["vsys_id"] = vsys_id
url = "{}/v1/policy/profile/insert_script".format(api_host)
- response = requests.post(url=url, data=profile_data, headers=headers, files=files)
+ response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False)
response_dict = response.json()
profile_id = response_dict["data"]["insert_script"]["id"]
create_profile_id_dict["profile_type"] = profile_type
@@ -298,7 +298,7 @@ class CreateProfile():
# 添加vsys id
profile_data["vsys_id"] = vsys_id
url = "{}/v1/policy/profile/run_script".format(api_host)
- response = requests.post(url=url, data=profile_data, headers=headers, files=files)
+ response = requests.post(url=url, data=profile_data, headers=headers, files=files, verify=False)
response_dict = response.json()
profile_id = response_dict["data"]["run_script"]["id"]
create_profile_id_dict["profile_type"] = profile_type
diff --git a/createRule.py b/createRule.py
index a1d8ab0..d86cfe2 100644
--- a/createRule.py
+++ b/createRule.py
@@ -263,8 +263,8 @@ class CreatePolicyRule():
#请求创建策略接口,下发策略
def create_rule(self, rule_template_dict, headers, api_host):
url = api_host + "/v1/policy/rule"
- #print(json.dumps(rule_template_dict))
- response = requests.post(url, headers=headers, json=rule_template_dict)
+ print(json.dumps(rule_template_dict))
+ response = requests.post(url, headers=headers, json=rule_template_dict, verify=False)
#print(response)
assert response.status_code == 200
return_json_data = json.loads(response.text)
diff --git a/delConfig.py b/delConfig.py
index 9eda801..edeb2cf 100644
--- a/delConfig.py
+++ b/delConfig.py
@@ -15,63 +15,63 @@ class Erase():
del_rule_template['ids'] = list(create_policies_ids[i].values())[0]
del_rule_template['vsys_id'] = vsys_id
url = api_host + "/v1/policy/rule"
- response = requests.delete(url, headers=headers, params=del_rule_template)
+ response = requests.delete(url, headers=headers, params=del_rule_template, verify=False)
assert response.status_code == 200
if len(create_ip_ids) > 0:
del_obj_template['type'] = "ip"
del_obj_template['ids'] = create_ip_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_subid_ids) > 0:
del_obj_template['type'] = "subscriberid"
del_obj_template['ids'] = create_subid_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_fqdn_ids) > 0:
del_obj_template['type'] = "fqdn"
del_obj_template['ids'] = create_fqdn_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_url_ids) > 0:
del_obj_template['type'] = "url"
del_obj_template['ids'] = create_url_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_flag_ids) > 0:
del_obj_template['type'] = "flag"
del_obj_template['ids'] = create_flag_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_keywords_ids) > 0:
del_obj_template['type'] = "keywords"
del_obj_template['ids'] = create_keywords_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_account_ids) > 0:
del_obj_template['type'] = "account"
del_obj_template['ids'] = create_account_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_http_signature_ids) > 0:
del_obj_template['type'] = "http_signature"
del_obj_template['ids'] = create_http_signature_ids
del_obj_template["vsys_id"] = vsys_id
url = api_host + "/v1/policy/object"
- response = requests.delete(url, headers=headers, params=del_obj_template)
+ response = requests.delete(url, headers=headers, params=del_obj_template, verify=False)
assert response.status_code == 200
if len(create_profile_ids) > 0:
for del_porfile_index in range(len(create_profile_ids)):
@@ -79,48 +79,48 @@ class Erase():
profile_id = create_profile_ids[del_porfile_index]["id"]
del_profile_template = {"ids":str(profile_id),"vsys_id":vsys_id}
url = "{}/v1/policy/profile/statistics_template".format(api_host)
- response = requests.delete(url, headers=headers, params=del_profile_template)
+ response = requests.delete(url, headers=headers, params=del_profile_template, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "service_function_forwarder": # 删除SFF和SF
profile_id = create_profile_ids[del_porfile_index]["id"]
del_profile_sff = {"ids": str(profile_id),"vsys_id":vsys_id}
url = "{}/v1/policy/profile/service_function_forwarder".format(api_host)
- response = requests.delete(url, headers=headers, params=del_profile_sff)
+ response = requests.delete(url, headers=headers, params=del_profile_sff, verify=False)
if response.status_code == 200:
profile_id = create_profile_ids[del_porfile_index]["sff_info"]["sf_info"][0]["id"]
del_profile_sf = {"ids": str(profile_id),"vsys_id":vsys_id}
url = "{}/v1/policy/profile/service_function".format(api_host)
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "response_page": # 删除response_page
profile_id = create_profile_ids[del_porfile_index]["id"]
url = "{}/v1/policy/profile/response_page".format(api_host)
del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id}
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "hijack_files": # 删除hijack_files
profile_id = create_profile_ids[del_porfile_index]["id"]
url = "{}/v1/policy/profile/hijack_file".format(api_host)
del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id}
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "insert_profile": # 删除insert
profile_id = create_profile_ids[del_porfile_index]["id"]
url = "{}/v1/policy/profile/insert_script".format(api_host)
del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id}
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "run_script": # 删除run_script
profile_id = create_profile_ids[del_porfile_index]["id"]
url = "{}/v1/policy/profile/run_script".format(api_host)
del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id}
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
elif create_profile_ids[del_porfile_index]["profile_type"] == "dns_record": # 删除dns_record
profile_id = create_profile_ids[del_porfile_index]["id"]
url = "{}/v1/policy/profile/run_script".format(api_host)
del_profile_sf = {"ids": str(profile_id), "vsys_id": vsys_id}
- response = requests.delete(url, headers=headers, params=del_profile_sf)
+ response = requests.delete(url, headers=headers, params=del_profile_sf, verify=False)
assert response.status_code == 200
diff --git a/getApplicationId.py b/getApplicationId.py
index 487b589..d46f9b0 100644
--- a/getApplicationId.py
+++ b/getApplicationId.py
@@ -5,13 +5,21 @@ import pymysql.cursors
import ast
def get_app_id(app, api_host):
- # 返回group_id
+ # http测试环境返回group_id
mysql_host = api_host[7:]
- # mysql_host = "192.168.40.198"
+
+ # https 环境返回group_id
+ # mysql_host = api_host[8:]
+ # 集成测试环境
db = pymysql.connect(host=mysql_host,
user='test',
password='test',
database='tsg-bifang')
+ # 45.250 环境
+ # db = pymysql.connect(host=mysql_host,
+ # user='root',
+ # password='Bifang&*()',
+ # database='tsg-bifang-2402')
app_group_id_list=[]
# 该方法用于将字符串类型的['ssl', 'http']转换为list类型的['ssl', 'http'](在无法使用list函数直接转换的情况下)
app = ast.literal_eval(app)
diff --git a/getLog.py b/getLog.py
index 76591a9..6618a2f 100644
--- a/getLog.py
+++ b/getLog.py
@@ -12,7 +12,7 @@ class GetLog():
# 修改完成后删除注释:将下方的security_event替换成变量
headers = {'Content-Type': 'application/json', 'Authorization': token}
url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
- response = requests.get(url, headers=headers)
+ response = requests.get(url, headers=headers, verify=False)
assert response.status_code == 200
log_schema = json.loads(response.text)
log_schema = log_schema['data']['fields']
@@ -51,7 +51,7 @@ class GetLog():
log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={test_pc_ip}", f"client_ip='{test_pc_ip}'")
url = api_host + "/v1/log/query"
# print(json.dumps(log_condition_dict))
- response = requests.post(url, headers=headers, json=log_condition_dict)
+ response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
assert response.status_code == 200
log_list = json.loads(response.text)
# print(log_list)
@@ -97,7 +97,7 @@ class GetLog():
hit_query_condition['end'] = end_time
hit_query_condition['vsys_id'] = vsys_id
print(json.dumps(hit_query_condition))
- response = requests.post(url, headers=headers, json=hit_query_condition)
+ response = requests.post(url, headers=headers, json=hit_query_condition, verify=False)
assert response.status_code == 200
response = json.loads(response.text)
index = i + 1
@@ -177,7 +177,7 @@ class GetLog():
post_data_ori_dict["sql"] =statistics_info["sql"]
# print("statistics metric请求体数据:")
#print(post_data_ori_dict)
- response = requests.post(url, json=post_data_ori_dict, headers=headers)
+ response = requests.post(url, json=post_data_ori_dict, headers=headers, verify=False)
#assert response.status_code == 200
r_dict = response.json()
print("statistics metric结果数据:")
@@ -369,7 +369,7 @@ class GetLog():
post_data_ori_dict["vsys_id"] = vsys_id
# print("sc metric请求体数据:")
# print(post_data_ori_dict)
- response = requests.post(url, json=post_data_ori_dict, headers=headers)
+ response = requests.post(url, json=post_data_ori_dict, headers=headers, verify=False)
r_dict = response.json()
print("sc metric结果数据:")
# print(r_dict)
diff --git a/log_query.py b/log_query.py
index 6f785c3..3ddef20 100644
--- a/log_query.py
+++ b/log_query.py
@@ -32,7 +32,7 @@ class Verify():
loginJson = {"username": "", "password": ""}
loginJson["username"] = user
loginJson["password"] = self.password
- response = requests.post(url, json=loginJson)
+ response = requests.post(url, json=loginJson, verify=False)
jsonData = json.loads(response.text)
self.token = jsonData["data"]["token"]
return self.token
@@ -134,7 +134,7 @@ class Verify():
log_condition_dict['filter'] = log_filter
url = api_host + "/v1/log/query"
# print(json.dumps(log_condition_dict))
- response = requests.post(url, headers=headers, json=log_condition_dict)
+ response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
log_code = response.status_code
if log_code == 200:
log_result = True
diff --git a/verify.py b/verify.py
index 7398174..f3bf0a8 100644
--- a/verify.py
+++ b/verify.py
@@ -34,7 +34,7 @@ class Verify():
url = api_host + "/v1/user/encryptpwd"
pwJson = {"password": ""}
pwJson["password"] = pwd
- response = requests.get(url, params=pwJson)
+ response = requests.get(url, params=pwJson, verify=False)
data = json.loads(response.text)
self.password = data["data"]["encryptpwd"]
return self.password
@@ -44,7 +44,7 @@ class Verify():
loginJson = {"username": "", "password": ""}
loginJson["username"] = user
loginJson["password"] = self.password
- response = requests.post(url, json=loginJson)
+ response = requests.post(url, json=loginJson, verify=False)
jsonData = json.loads(response.text)
self.token = jsonData["data"]["token"]
return self.token
@@ -714,7 +714,7 @@ class Verify():
verify_obj_comm_dict_0["appName"] = appName
verify_list_dict["verify_list"][0]["verify_session"]["attributes"].append(verify_obj_comm_dict_0)
#print(verify_list_dict)
- response = requests.post(url, json=verify_list_dict, headers=headers)
+ response = requests.post(url, json=verify_list_dict, headers=headers, verify=False)
r_dict = response.json()
#print(json.dumps(r_dict))
hit_policy_list = []