summaryrefslogtreecommitdiff
path: root/keyword/common/customlibrary/Custometest/LogSchema.py
diff options
context:
space:
mode:
Diffstat (limited to 'keyword/common/customlibrary/Custometest/LogSchema.py')
-rw-r--r--keyword/common/customlibrary/Custometest/LogSchema.py513
1 files changed, 513 insertions, 0 deletions
diff --git a/keyword/common/customlibrary/Custometest/LogSchema.py b/keyword/common/customlibrary/Custometest/LogSchema.py
new file mode 100644
index 0000000..d190ff1
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/LogSchema.py
@@ -0,0 +1,513 @@
+# !/user/bin/python
+# -*-coding:utf-8-*-
+import requests
+import random
+import json
+import LogResponseVAL
+import time, datetime
+# import allure
+
+
+
+
+# 请求schema接口得到返回数据,用于其他接口
def schema(schemauerl, token):
    """GET the log schema endpoint and return the parsed JSON body.

    The response supplies the field definitions consumed by the other
    helpers in this module (Filter1, loglistverify, ...).
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Authorization": token,
    }
    return requests.get(url=schemauerl, headers=headers).json()
+
+
+# From the schema response, derive the list of comparison expressions each attribute supports:
+# 1. columns whose [doc][allow_query] is true are searchable;
+# 2. if [doc][constraints][operator_functions] is set, those operators take priority;
+# 3. if [doc][data] is set, the attribute's values are the listed "code" values;
+# 4. int and long have different value ranges;
+# 5. strings must include special characters;
+# 6. query values should cover both boundary and normal cases;
+# 7. IP addresses (v4/v6) and URLs need dedicated generators.
+
import ipaddress

# Upper bounds for random address generation.
# Computed with public arithmetic instead of the private
# ipaddress.IPv4Address._ALL_ONES attribute, which is an implementation
# detail and may disappear or move between Python versions.
MAX_IPV4 = (1 << 32) - 1    # 2 ** 32 - 1, largest IPv4 address as an int
MAX_IPV6 = (1 << 128) - 1   # 2 ** 128 - 1, largest IPv6 address as an int
+
+
def random_ipv4():
    """Return a uniformly random IPv4 address as a dotted-quad string.

    Uses the public str(IPv4Address(int)) conversion; the original called
    the private _string_from_ip_int helper, which is an implementation
    detail and can break across Python versions.
    """
    return str(ipaddress.IPv4Address(random.getrandbits(32)))
+
+
def random_ipv6():
    """Return a uniformly random IPv6 address as a string.

    Uses the public str(IPv6Address(int)) conversion instead of the
    private _string_from_ip_int helper the original relied on.
    """
    return str(ipaddress.IPv6Address(random.getrandbits(128)))
+
+
+from random import Random
+
+
# Generate a random URL-like string (the component lengths are random,
# despite the original comment claiming a fixed 12-character URL).
def randrom_url():
    """Build a random "host1.host2.host3/path" string and return it.

    NOTE(review): the function name keeps the original typo ("randrom")
    because external callers may depend on it.

    The original shadowed the builtin ``str`` and the module name
    ``random`` with locals; this version avoids both.
    """
    host_chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
    path_chars = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
    rng = Random()
    host = ''.join(rng.choice(host_chars) for _ in range(rng.randint(8, 16)))
    path = ''.join(rng.choice(path_chars) for _ in range(rng.randint(8, 16)))
    # Three dotted host components of decreasing length, then "/path",
    # mirroring the original slicing scheme host[:-5].host[:-6].host[:-7].
    url = host[0:-5] + "." + host[0:-6] + "." + host[0:-7] + "/" + path
    print(url)
    return url
+
+
def _generic_conditions(name, type1, operator, number, char):
    """Build filter expressions for a field with no special constraints.

    Matches *type1* against each operator definition and expands its
    comma-separated function list into "name <op> value" strings.
    Only int/long and string types produce conditions; any other type is
    silently skipped (same as the original).
    """
    conditions = []
    for op_def in operator:
        if type1 != op_def["type"]:
            continue
        if type1 == "int" or type1 == "long":
            value1 = number
            for v in op_def["functions"].split(","):
                if v == "in" or v == "not in":
                    conditions.append(name + " " + v + " " + "(" + f"{value1}" + ")")
                else:
                    conditions.append(name + " " + v + " " + f"{value1}")
        elif type1 == "string":
            value1 = char
            for v in op_def["functions"].split(","):
                if v == "notEmpty" or v == "empty":
                    conditions.append(v + "(" + " '" + name + " '" + ")")
                elif v == "in" or v == "not in":
                    conditions.append(name + " " + v + " " + "(" + " '" + value1 + " '" + ")")
                else:
                    conditions.append(name + " " + v + " " + " '" + value1 + " '")
    return conditions


def Filter1(schemauerl, token):
    """Derive every supported filter expression from the schema response.

    For each field: if doc / doc.constraints / doc.constraints.operator_functions
    is missing, build generic conditions from the per-type operator table;
    otherwise combine the field's operator_functions with its enumerated
    doc.data "code" values.

    Returns a list of filter expression strings.

    (Refactor note: the original repeated the identical generic branch
    three times and shadowed the builtins ``list`` and ``str``; the shared
    logic now lives in _generic_conditions.)
    """
    result = []
    json_str = schema(schemauerl, token)
    print("schemauerl", json_str)
    print(type(json_str))
    # Log field definitions.
    fields = json_str["data"]["fields"]
    print("1111111111", fields)
    # Per-type operator/function table.
    operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
    for field in fields:
        # Fresh random sample values per field (as in the original).
        number = random.randint(0, 2147483647)
        char = random.choice('abcdefghijklmnopqrstuvwxyz!@#%^&*')
        name = field["name"]
        doc = field["doc"]
        if (doc is None
                or doc["constraints"] is None
                or doc["constraints"]["operator_functions"] is None):
            result.extend(_generic_conditions(name, field["type"], operator, number, char))
        else:
            # Constrained field: use its own operator list and enumerated codes.
            type1 = field["type"]
            functions = doc["constraints"]["operator_functions"].split(",")
            for d in doc["data"]:
                code = d["code"]
                if type1 == "int" or type1 == "long":
                    for o in functions:
                        result.append(name + " " + o + " " + code)
                else:
                    for o in functions:
                        result.append(name + " " + o + " " + " '" + code + " '")
    print("22222222222", result)
    return result
+
+
+# 根据Filter1方法中的的数据,写入log请求接口中,来验证log请求接口
# Feed every filter produced by Filter1 into the log list endpoint to
# verify the endpoint accepts them all.
def logapiverify(schemauerl, logurl, token, starttime, endtime, logtype):
    """POST one log/list query per Filter1 expression and check each reply.

    Asserts every response body has code == 200. Returns the JSON body of
    the last response, or None when Filter1 produced no filters (the
    original raised NameError in that case).

    (Refactor note: ~20 lines of commented-out duplicate code were removed.)
    """
    filter2 = Filter1(schemauerl, token)
    fields = schema(schemauerl, token)["data"]["fields"]
    print("333333333333", filter2)
    headers = {"Content-Type": "application/json",
               "Authorization": token}
    response1 = None
    for i in filter2:
        print("条件:", i)
        data = {
            "start_common_recv_time": starttime,
            "end_common_recv_time": endtime,
            "logType": logtype,
            "fields": fields,
            "filter": i
        }
        print(json.dumps(data))
        response1 = requests.post(url=logurl, data=json.dumps(data), headers=headers)
        code = response1.json()["code"]
        assert code == 200
        print(response1.json()["code"])
    return response1.json() if response1 is not None else None
+
+
+# 精确filter,请求日志接口
def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
    """Query the log list endpoint with one exact filter expression.

    Fetches the field list from the schema endpoint, POSTs the query to
    *logurl*, asserts the response "code" is 200 and returns the parsed
    JSON body.
    """
    schema_body = schema(schemauerl, token)
    payload = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": schema_body["data"]["fields"],
        "filter": filtervalue
    }
    response1 = requests.post(
        url=logurl,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json", "Authorization": token},
    )
    body = response1.json()
    assert body["code"] == 200
    print(body["code"])
    return body
+
+#目的性验证,循坏返回列表中所有字段进行查询
def loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, datajson):
    """For every field of one log record, query the list API by that field.

    *datajson* is a single log record (dict). For each key that matches a
    schema field whose doc is absent or whose visibility is unset, builds
    an exact-match filter from the record's own value, queries the list
    endpoint via loglistverify() and validates the response fields via
    LogResponseVAL.FieldValidation().

    Returns the list of keys whose value was None or "" (skipped fields).

    BUG FIX: the original attached ``else: nullkey.append(i)`` to the
    ``if i == name:`` check, so every *non-matching* field name was
    appended — contradicting its own comment that nullkey collects fields
    whose value is None/empty. The else now pairs with the value check.
    """
    TIME_FIELDS = ("common_recv_time", "common_start_time",
                   "common_end_time", "common_processing_time")
    nullkey = []
    data = datajson
    keylist = LogResponseVAL.getKeys(data)
    fields = schema(schemaurl, token)["data"]["fields"]
    for i in keylist:
        conditions = data[i]
        for field in fields:
            name = field["name"]
            # .get() tolerates docs without a "visibility" key (the original
            # indexed directly and would raise KeyError -- TODO confirm schema)
            if field["doc"] is None or field["doc"].get("visibility") is None:
                if i == name:
                    if conditions is not None and conditions != "":
                        if field["type"] == "string":
                            if conditions[0] == "'" and conditions[-1] == "'":
                                # Value already quoted: strip quotes for validation.
                                filtervalue = i + " = " + conditions
                                VasserValue = i + " = " + conditions[1:-1]
                            else:
                                filtervalue = i + " = " + "'" + conditions + "'"
                                VasserValue = i + " = " + conditions
                        else:
                            if i in TIME_FIELDS:
                                # Timestamps are displayed formatted but filtered
                                # as epoch seconds.
                                timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
                                timeStamp = str(int(time.mktime(timeArray)))
                                filtervalue = i + " = " + timeStamp
                                VasserValue = filtervalue
                            else:
                                filtervalue = i + " = " + str(conditions)
                                VasserValue = filtervalue
                        print("filtervalue", filtervalue)
                        # Query the log list with the extracted condition.
                        responsebody = loglistverify(logurl, schemaurl, token,
                                                     starttime, endtime, logtype,
                                                     filtervalue)
                        filterlist = [VasserValue]
                        print(VasserValue)
                        LogResponseVAL.FieldValidation(responsebody, filterlist)
                    else:
                        nullkey.append(i)  # fields whose value is None or ""
    return nullkey
+
+ # 多条循环 变量设置为公共参数 若循环内一个字段没有值 进行下次循坏
def logAllFieldsListInterface(logurl, schemaurl, token, starttime, endtime, logtype, datajson, lognumber, logcycles):
    """Validate list queries over batches of records, intersecting empty fields.

    Splits datajson["data"]["list"] into batches of *lognumber* records,
    runs loglistverifys() on each record, and keeps the running
    intersection of the "no value" field names across records. Stops once
    the intersection becomes empty or *logcycles* batches were processed,
    then prints the surviving field names.
    """
    records = datajson["data"]["list"]
    empty_fields = []
    batch_index = 0
    print(lognumber)
    print(type(lognumber))
    print(logcycles)
    print(type(logcycles))
    step = int(lognumber)
    for start in range(0, len(records), step):
        batch_index += 1
        # One loglistverifys() result (list of empty keys) per record.
        batch_results = [
            loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, record)
            for record in records[start:start + step]
        ]
        print(batch_results)
        for result in batch_results:
            # Fold the per-record empty-key lists into one intersection.
            if len(empty_fields) == 0:
                empty_fields = result
            else:
                empty_fields = list(set(empty_fields).intersection(set(result)))
        if len(empty_fields) == 0 or batch_index >= int(logcycles):
            break
    print("最终数据中没有值的字段为:", empty_fields)
+
+
+# 事件日志和通联日志时间分布查询 ,日志检索条件校验(filter内容验证)
def distributed_query(logurl, token):
    """GET a fully-formed query URL (time distribution / filter validation).

    *logurl* already carries every query parameter, e.g.
    .../timedistribution?logType=...&startTime=...&endTime=...&granularity=PT5M
    Asserts the JSON body's "code" is 200 and returns the parsed body.
    """
    headers = {"Content-Type": "application/json", "Authorization": token}
    body = requests.get(url=logurl, headers=headers).json()
    print(body)
    assert body["code"] == 200
    print(body["code"])
    return body
+
+#日志检索条件校验 纯接口
# Log retrieval filter validation — plain per-field queries.
def LogRetrieve(schemaurl, host, port, token, logType, datajson):
    """Validate the filter-validation endpoint for each field of one record.

    For every key of the first record in *datajson* that matches a schema
    field, builds a "logType=...&filter=field=value" query string (random
    string for string fields, epoch seconds for time fields, a random int
    otherwise) and GETs the validation endpoint via distributed_query().

    BUG FIX: the original issued the request once per schema field,
    *outside* the name match — re-sending the previous filter for every
    non-matching field and raising NameError when the very first field did
    not match. The request now fires only for a matched field.
    """
    number = random.randint(0, 2147483647)
    str1 = random.choice('abcdefghijklmnopqrstuvwxyz')
    data = datajson["data"]["list"][0]
    keylist = LogResponseVAL.getKeys(data)
    fields = schema(schemaurl, token)["data"]["fields"]
    time_fields = ("common_recv_time", "common_start_time",
                   "common_end_time", "common_processing_time")
    for i in keylist:
        conditions = data[i]
        for field in fields:
            if i != field["name"]:
                continue
            if field["type"] == "string":
                query = "logType=" + logType + "&" + "filter=" + i + "=" + "'" + str1 + "'"
            elif i in time_fields:
                # Time fields are filtered as epoch seconds.
                timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
                timeStamp = str(int(time.mktime(timeArray)))
                query = "logType=" + logType + "&" + "filter=" + i + "=" + timeStamp
            else:
                query = "logType=" + logType + "&" + "filter=" + i + "=" + str(number)
            Logurl = ("http://" + host + ":" + port +
                      "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + query)
            print(Logurl)
            distributed_query(Logurl, token)
+
+# 日志检索条件校验 复杂sql
# Log retrieval filter validation — one compound SQL filter.
def LogRetrieveSql(schemaurl, host, port, token, logType, datajson):
    """Validate the filter-validation endpoint with a compound SQL filter.

    Samples 4 keys from the first record of *datajson*, builds a
    "field=value" term for each (substituting a random letter / fixed
    number for empty values, epoch seconds for time fields), combines them
    as ((f0 OR f1) AND f2) OR f3 and GETs the validation endpoint via
    distributed_query().

    BUG FIX: the original appended/printed the term once per schema field,
    *outside* the name match — filling filterlist with stale duplicates
    and raising NameError when the first field did not match. The append
    now happens only for a matched field.
    """
    data = datajson["data"]["list"][0]
    keylist = LogResponseVAL.getKeys(data)
    sqllist = random.sample(keylist, 4)
    number = 45585  # fallback numeric value for empty fields
    str1 = random.choice('abcdefghijklmnopqrstuvwxyz')  # fallback string value
    print(sqllist)
    fields = schema(schemaurl, token)["data"]["fields"]
    filterlist = []
    time_fields = ("common_recv_time", "common_start_time",
                   "common_end_time", "common_processing_time")
    for i in sqllist:
        conditions = data[i]
        for field in fields:
            if i != field["name"]:
                continue
            if field["type"] == "string":
                if conditions == "" or conditions is None:
                    conditions = str1
                term = i + "=" + "'" + conditions + "'"
            elif i in time_fields:
                # Time fields are filtered as epoch seconds.
                timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
                term = i + "=" + str(int(time.mktime(timeArray)))
            else:
                if conditions == "" or conditions is None:
                    conditions = number
                term = i + "=" + str(conditions)
            print(term)
            filterlist.append(term)
    sqlfilter = "((" + filterlist[0] + " OR " + filterlist[1] + ") AND " + filterlist[2] + ") OR " + filterlist[3]
    _filter = "logType=" + logType + "&" + "filter=" + sqlfilter
    Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + _filter
    print(Logurl)
    distributed_query(Logurl, token)
    print(sqlfilter)
+
+ # 原始日志检索时间分布计算
def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
    """POST a raw-log time-distribution query.

    Asserts the JSON body's "code" is 200 and returns the parsed body.
    """
    payload = {
        "startTime": starttime,
        "endTime": endtime,
        "logType": logtype,
        "granularity": granularity,
        "filter": filtervalue
    }
    print(payload)
    print(json.dumps(payload))
    resp = requests.post(
        url=logurl,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json", "Authorization": token},
    )
    body = resp.json()
    print(body)
    print(body["code"])
    assert body["code"] == 200
    return body
+
+# 日志总数查询
def countlog_query(logurl, token, starttime, endtime, logtype):
    """POST a total-log-count query (empty filter, pageSize 20).

    Asserts the JSON body's "code" is 200 and returns the parsed body.
    """
    payload = {
        "pageSize": 20,
        "logType": logtype,
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "filter": ""
    }
    print(payload)
    print(json.dumps(payload))
    resp = requests.post(
        url=logurl,
        data=json.dumps(payload),
        headers={"Content-Type": "application/json", "Authorization": token},
    )
    body = resp.json()
    print(body)
    print(body["code"])
    assert body["code"] == 200
    return body
+
+# 日志导出接口
# Log export endpoint.
def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
    """POST a log export request and verify a response object came back.

    Export responses are file downloads, so the body is not JSON-decoded;
    we only check that requests produced a Response.

    BUG FIX: the original compared type(response1) (a type object) against
    the *string* "class 'requests.models.Response'", which is always
    unequal, so the check could never fail. Use isinstance instead.
    """
    fields = schema(schemauerl, token)["data"]["fields"]
    print(fields)
    headers = {"Content-Type": "application/json",
               "Authorization": token}
    data = {
        "start_common_recv_time": starttime,
        "end_common_recv_time": endtime,
        "logType": logtype,
        "fields": fields,
        "filter": filtervalue
    }
    print(data)
    print(json.dumps(data))
    response1 = requests.post(url=logurl, data=json.dumps(data), headers=headers)
    assert isinstance(response1, requests.models.Response)
+
+#判断日志内详情字段
# Check the detail fields inside a log record against the schema.
def LogFieldValidation(schemauerl, token, datajson):
    """Assert every schema-declared column appears in the log detail record.

    Walks data.doc.schema_type.*.columns in the schema response; for each
    referenced field whose doc is absent or whose visibility is not
    "disabled", asserts the field name is present among the keys of the
    first record in *datajson*.
    """
    Schemajson = schema(schemauerl, token)
    fields = Schemajson["data"]["fields"]
    keylist = LogResponseVAL.getKeys(datajson["data"]["list"][0])
    schema_typedict = Schemajson["data"]["doc"]["schema_type"]
    for schema_typekey in schema_typedict:  # each schema_type group
        for column in schema_typedict[schema_typekey]["columns"]:
            for field in fields:
                if field["name"] != column:
                    continue
                # Fields explicitly marked visibility == "disabled" are exempt.
                # .get() tolerates docs without a "visibility" key (the
                # original indexed directly -- TODO confirm schema shape).
                doc = field["doc"]
                if doc is not None and doc.get("visibility") == "disabled":
                    continue
                if column not in keylist:
                    print("该字段未存在日志详情内", column)
                    assert False, "missing field in log detail: " + column
                else:
                    print("该字段通过在日志详情内", column)
+
+
+
+
+
+
+
+# if __name__ == '__main__':
+# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log") \ No newline at end of file