diff options
| author | 董晓燕 <[email protected]> | 2021-06-03 09:55:45 +0000 |
|---|---|---|
| committer | 董晓燕 <[email protected]> | 2021-06-03 09:55:45 +0000 |
| commit | ac68e65f508799a0e555a240ae374d313a0a8d75 (patch) | |
| tree | 2a339bbd8acd65e2fb235159cc9c5303ae5725b7 /keyword/common/customlibrary/Custometest/cmd_cer.py | |
| parent | 2f39b56d617e5fba2b8d73d81cd5e6d894f85352 (diff) | |
| parent | 4667c668725ff7cb673c637a297c67283876d4d4 (diff) | |
Develop
See merge request dongxiaoyan/gap_tsg_api!4
Diffstat (limited to 'keyword/common/customlibrary/Custometest/cmd_cer.py')
| -rw-r--r-- | keyword/common/customlibrary/Custometest/cmd_cer.py | 290 |
1 files changed, 290 insertions, 0 deletions
diff --git a/keyword/common/customlibrary/Custometest/cmd_cer.py b/keyword/common/customlibrary/Custometest/cmd_cer.py new file mode 100644 index 0000000..50cdf08 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/cmd_cer.py @@ -0,0 +1,290 @@ +import os +import subprocess +from time import sleep +import platform + + + +class Order: + def CMD(self,data): + result = os.popen(data) + # res = result.read().encoding('GBK') + res = result.read() + result.close() + # res = res.decode("unicode-escape") + return res + def Linux(self): + pass + # 根据证书颁发者名字判断证书是否替换 + def Cert_Verification(self,data): + c = [] + print(1) + #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo: + with open(r'certificate.yaml', 'r') as foo: + print(2) + for line in foo.readlines(): + if data in line: + print(line) + c.append('证书已替换') + else: + pass + if '证书已替换' in c: + # print('证书已替换') + foo.close() + return '证书已替换' + else: + # print('证书未替换') + foo.close() + return '证书未替换' + + def Content_Type(self,data): + d = [] + with open('certificate.yaml', 'r') as foo: + for line in foo.readlines(): + if data in line: + # print(line) + d.append('Content_Type已替换') + else: + pass + if 'Content_Type已替换' in d: + # print('证书已替换') + foo.close() + return 'Content_Type已替换' + else: + # print('证书未替换') + foo.close() + return 'Content_Type未替换' + # curl路由内容设置 + def curl_name(self,data): + #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + return curl_name + # 控制器 + def manu(self,url,Certificate): + # print(data['url']) + n = 0 + while n != len(url): + b = self.curl_name(url[n]) + d = self.CMD(b) + # print(d) + sleep(1) + if Certificate != "": + c =self.Cert_Verification(Certificate) + # f = self.Content_Type(data["Content_Type"]) + sleep(1) + assert_cer = url[n]+c + # assert_Content_Type = data['Content_Type']+f + n+=1 + return d,assert_cer + + def FTP(self, ftp_type): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + if ftp_type == "下载": + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt' + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814 + if fsize == 435814: + return "ftp_success" + else: + return "ftp_fail" + elif ftp_type == "登录": + data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk' + d = self.CMD(data) + # print(d) + if "Graphical (Xorg) program starter for ADRIANE" in d: + return "ftp_success" + else: + return "ftp_fail" + # FTP 下载 + def FTP_down(self, ftp_url,file_size,file_name): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 '+ftp_url+ '-o '+ file_name + " ' " + print(data) + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/"+file_name) # 435814 + print(fsize) + if fsize == file_size: + return "ftp_success" + else: + return "ftp_fail" + + # FTP 登录 + def FTP_login(self, ftp_url,file_content): + SYS = self.Operating_System() + if SYS == "Windows": + data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk' + d = self.CMD(data) + else: + data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk' + d = self.CMD(data) + + if file_content in d: + return "ftp_success" + else: + return "ftp_fail" + + # 判断当前操作系统 + def Operating_System(self): + os_name = platform.system() + return os_name + + + #需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容 + # 全局变量 null,将java中的空值(null),装换位python中空值("") + global null + null = '' + + # 对需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容,header 启用自定义json + def Jsoneditmanu(self, jsons, datas=None,header=None): + #判断是否启用自定义json + if header != None: + header = eval(header) + # 返回 header + return header + # 判断是否需要更改json内容 + elif datas != None: + # datas = eval(datas) + jsons = eval(jsons) + # 循环遍历替换json内容 + for k, v in datas.items(): + Order.UpdateAllvalues(self,jsons, k, v) + return jsons + else: + # 返回初始json + return jsons + + # 循环嵌套替换 + def UpdateAllvalues(self,mydict, key, value): + if isinstance(mydict, dict): # 使用isinstance检测数据类型,如果是字典 + if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key + mydict[key] = value + for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key + chdict = mydict[k] + Order.UpdateAllvalues(self,chdict, key, value) + mydict[k] = chdict + elif isinstance(mydict, list): # 如是list + for element in mydict: # 遍历list元素,以下重复上面的操作 + if isinstance(element, dict): + if key in element.keys(): + element[key] = value + for k in element.keys(): + chdict = element[k] + Order.UpdateAllvalues(self,chdict, key, value) + element[k] = chdict + + + + #递归提取json中符合要求键的值 + import json + def get_dict_allkeys(self,dict_a): + """ + 遍历嵌套字典,获取json返回结果的所有key值 + :param dict_a: + :return: key_list + """ + if isinstance(dict_a, dict): # 使用isinstance检测数据类型 + # 如果为字典类型,则提取key存放到key_list中 + for x in range(len(dict_a)): + temp_key = list(dict_a.keys())[x] + temp_value = dict_a[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历 + elif isinstance(dict_a, list): + # 如果为列表类型,则遍历列表里的元素,将字典类型的按照上面的方法提取key + for k in dict_a: + if isinstance(k, dict): + for x in range(len(k)): + temp_key = list(k.keys())[x] + temp_value = k[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历 + return key_list + + # 判断值是否在列表中 + def VerifyProxy(self,data,lists): + global key_list + key_list = [] + datas = Order.get_dict_allkeys(self,data) + print(type(datas)) + lists=lists.split(",") + print(type(lists)) + print("gsd") + datas2=list(map(str,datas)) + print(datas2) + print(datas) + print(lists) + + if set(datas2) > set(lists): + return "true" + else: + return "flase" + + +if __name__ == '__main__': +# datas = {"url":['https://www.baidu.com'], +# "Certificate":"Tango Secure Gateway CA", +# # "Content_Type":"text/html", +# 'log':'Security Event Logs', +# "sni":['baidu'], +# "intercept_code":"200", +# "log_code":"200", +# "certifucate":"1", +# "log_content":"true" +# } +# # data= {"url":['https://www.baidu.com'], +# # "Certificate":"Tango Secure Gateway CA" +# # } +# # url = ['https://www.baidu.com'] +# # url = ['https://www.baidu.com'] +# # url = ['https://www.baidu.com'] +# # # Certificate1 = "Tango Secure Gateway CA" +# # Certificate = "baidu" +# # a='Tango Secure Gateway CA' +# # s = Order() +# # b = s.manu(url,Certificate) +# # print(b[1]) +# # FTP下载 传入ftp的路径和文件大小 +# ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' +# ftp_size = 435814 +# ftp_issue = s.FTP_down(ftp_url,ftp_size) +# # FTP登录 传入ftp的路径和文件内容 +# ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' +# file_content = "Graphical (Xorg) program starter for ADRIANE" +# ftp_issue = s.FTP_login(ftp_url,file_content) + # for i in b: + # print(i) + # dd = s.CMD('curl -I https://www.baidu.com') + # print(dd) + # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd: + # print("ok") + # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk' + + + # 自己写一个字典测试一下上面的方法好用不好用 + jsons = {"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}} + datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1} + + print("替换前:\n %s" % jsons) + + + a = Order() + b = a.Jsoneditmanu(jsons,datas) + # print("替换前:\n %s" % jsons) + print("替换后:\n %s" % b) + + data = {"aid":[{'bid':2},{'cid':3}]} + print(type(data)) + # data="""{}""" + # data1 = json.loads(data) + get_keys = get_dict_allkeys(data) + print(get_keys) |
