summaryrefslogtreecommitdiff
path: root/keyword/common/customlibrary/Custometest/Schema.py
diff options
context:
space:
mode:
Diffstat (limited to 'keyword/common/customlibrary/Custometest/Schema.py')
-rw-r--r--keyword/common/customlibrary/Custometest/Schema.py350
1 files changed, 350 insertions, 0 deletions
diff --git a/keyword/common/customlibrary/Custometest/Schema.py b/keyword/common/customlibrary/Custometest/Schema.py
new file mode 100644
index 0000000..835d96a
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/Schema.py
@@ -0,0 +1,350 @@
+# !/user/bin/python
+# -*-coding:utf-8-*-
+import requests
+import random
+import json
+
+# import allure
+
+list = []
+
+
+# 请求schema接口得到返回数据,用于其他接口
+def schema(schemauerl, token):
+ url = schemauerl # "http://192.168.44.72:8080/v1/log/schema?logType=security_event_log"
+ headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ return response.json()
+
+
+# 根据schema接口返回数据,得出所有属性所支持的比较类型的列表
+# 1、根据[doc][allow_query]值为true列支持搜索;
+# 2、如有[doc][constraints][operator_functions]值,操作优先;
+# 3、如有[doc][data]值则对应属性取值为data所列code值;
+# 4、int和long的范围不一致;
+# 5、string要包含特殊字符
+# 6、给查询条件赋值,要给出边界和正常值
+# 7、IP(V4、V6)和URL要给出专门的方法生成
+
+import ipaddress
+
+# 生成随机ipv4或ipv6
+MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
+MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
+
+
+def random_ipv4():
+ return ipaddress.IPv4Address._string_from_ip_int(
+ random.randint(0, MAX_IPV4)
+ )
+
+
+def random_ipv6():
+ return ipaddress.IPv6Address._string_from_ip_int(
+ random.randint(0, MAX_IPV6)
+ )
+
+
+from random import Random
+
+
+# 生成 12 位随机 URL 地址
+def randrom_url():
+ str = ''
+ str1 = ''
+ chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
+ chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
+ length = len(chars)
+ length1 = len(chars1)
+ random = Random()
+ for x in range(random.randint(8, 16)):
+ str += chars[random.randint(0, length - 1)]
+ for pp in range(random.randint(8, 16)):
+ str1 += chars1[random.randint(0, length1 - 1)]
+ url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1
+ print(url)
+ return url
+
+
+def Filter1(schemauerl, token):
+ json_str = schema(schemauerl, token)
+ print(type(json_str))
+ # 获取日志属性定义
+ fields = json_str["data"]["fields"]
+ # 获取不同属性支持的部不同操作
+ operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
+ for i in fields:
+ number = random.randint(-2147483648, 2147483647)
+ maxnumber = 2147483647
+ minnumber = -2147483648
+ str = random.choice('abcdefghijklmnopqrstuvwxyz!@#$%^&*')
+ name = i["name"]
+ doc = i["doc"]
+ # 获取无任何特殊说明列:
+ if doc == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ if i["doc"]["constraints"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+
+ else:
+ if i["doc"]["constraints"]["operator_functions"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ type1 = i["type"]
+ operator1 = i["doc"]["constraints"]["operator_functions"]
+ operator2 = operator1.split(",")
+ data = i["doc"]["data"]
+ for d in data:
+ code = d["code"]
+ if type1 == "int" or type1 == "long":
+ for o in operator2:
+ str1 = name + " " + o + " " + code
+ list.append(str1)
+ else:
+ for o in operator2:
+ str1 = name + " " + o + " " + " '" + code + " '"
+ list.append(str1)
+
+
+ print(list)
+ return list
+
+
+# 根据Filter1方法中的的数据,写入log请求接口中,来验证log请求接口
+def logapiverify(logurl, schemauerl, token, starttime, endtime, logtype):
+ filter2 = Filter1(schemauerl, token)
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ str2 = ""
+ for i in filter2:
+ str2 = str2 + i + " " + "and" + " "
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": i
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ assert code == 200
+ print(response1.json()["code"])
+ return response1.json()
+ print(str2)
+ str3 = str2[0:-4]
+ print(str3)
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": str3
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ assert code == 200
+ print(response1.json()["code"])
+
+
+# 精确filter,请求日志接口
+def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ assert code == 200
+ print(response1.json()["code"])
+ return response1.json()
+
+
+# 事件日志和通联日志时间分布查询 ,日志检索条件校验(filter内容验证)
+def distributed_query(logurl, token):
+ url = logurl # url示例:http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M
+ headers = {"Content-Type": "application/json", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ code = response.json()["code"]
+ print(response.json())
+ assert code == 200
+ print(response.json()["code"])
+ return response.json()
+
+
+# 原始日志检索时间分布计算
+def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
+ url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "startTime": starttime,
+ "endTime": endtime,
+ "logType": logtype,
+ "granularity": granularity,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# 日志总数查询
+def countlog_query(logurl, token, starttime, endtime, logtype):
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "pageSize": 20,
+ "logType": logtype,
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "filter": ""
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# 日志导出接口
+def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ a=type(response1)
+ if a != "class 'requests.models.Response'":
+ assert 1 == 1
+ else:
+ assert 1 == 2
+
+
+
+
+# if __name__ == '__main__':
+# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log") \ No newline at end of file