summaryrefslogtreecommitdiff
path: root/keyword/common/customlibrary/Custometest/cmd_cer.py
diff options
context:
space:
mode:
Diffstat (limited to 'keyword/common/customlibrary/Custometest/cmd_cer.py')
-rw-r--r--keyword/common/customlibrary/Custometest/cmd_cer.py290
1 files changed, 290 insertions, 0 deletions
diff --git a/keyword/common/customlibrary/Custometest/cmd_cer.py b/keyword/common/customlibrary/Custometest/cmd_cer.py
new file mode 100644
index 0000000..50cdf08
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/cmd_cer.py
@@ -0,0 +1,290 @@
+import os
+import subprocess
+from time import sleep
+import platform
+
+
+
+class Order:
+ def CMD(self,data):
+ result = os.popen(data)
+ # res = result.read().encoding('GBK')
+ res = result.read()
+ result.close()
+ # res = res.decode("unicode-escape")
+ return res
+ def Linux(self):
+ pass
+ # 根据证书颁发者名字判断证书是否替换
+ def Cert_Verification(self,data):
+ c = []
+ print(1)
+ #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo:
+ with open(r'certificate.yaml', 'r') as foo:
+ print(2)
+ for line in foo.readlines():
+ if data in line:
+ print(line)
+ c.append('证书已替换')
+ else:
+ pass
+ if '证书已替换' in c:
+ # print('证书已替换')
+ foo.close()
+ return '证书已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return '证书未替换'
+
+ def Content_Type(self,data):
+ d = []
+ with open('certificate.yaml', 'r') as foo:
+ for line in foo.readlines():
+ if data in line:
+ # print(line)
+ d.append('Content_Type已替换')
+ else:
+ pass
+ if 'Content_Type已替换' in d:
+ # print('证书已替换')
+ foo.close()
+ return 'Content_Type已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return 'Content_Type未替换'
+ # curl路由内容设置
+ def curl_name(self,data):
+ #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ return curl_name
+ # 控制器
+ def manu(self,url,Certificate):
+ # print(data['url'])
+ n = 0
+ while n != len(url):
+ b = self.curl_name(url[n])
+ d = self.CMD(b)
+ # print(d)
+ sleep(1)
+ if Certificate != "":
+ c =self.Cert_Verification(Certificate)
+ # f = self.Content_Type(data["Content_Type"])
+ sleep(1)
+ assert_cer = url[n]+c
+ # assert_Content_Type = data['Content_Type']+f
+ n+=1
+ return d,assert_cer
+
+ def FTP(self, ftp_type):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+ # 判断FTP执行类型:(下载/登录)
+ if ftp_type == "下载":
+ # 调用cmd执行FTP下载文件
+ data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt'
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814
+ if fsize == 435814:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+ elif ftp_type == "登录":
+ data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ # print(d)
+ if "Graphical (Xorg) program starter for ADRIANE" in d:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+ # FTP 下载
+ def FTP_down(self, ftp_url,file_size,file_name):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+ # 判断FTP执行类型:(下载/登录)
+ # 调用cmd执行FTP下载文件
+ data = 'curl -m 20 '+ftp_url+ '-o '+ file_name + " ' "
+ print(data)
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/"+file_name) # 435814
+ print(fsize)
+ if fsize == file_size:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+
+ # FTP 登录
+ def FTP_login(self, ftp_url,file_content):
+ SYS = self.Operating_System()
+ if SYS == "Windows":
+ data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ else:
+ data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+
+ if file_content in d:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+
+ # 判断当前操作系统
+ def Operating_System(self):
+ os_name = platform.system()
+ return os_name
+
+
+ #需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容
+ # 全局变量 null,将java中的空值(null),装换位python中空值("")
+ global null
+ null = ''
+
+ # 对需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容,header 启用自定义json
+ def Jsoneditmanu(self, jsons, datas=None,header=None):
+ #判断是否启用自定义json
+ if header != None:
+ header = eval(header)
+ # 返回 header
+ return header
+ # 判断是否需要更改json内容
+ elif datas != None:
+ # datas = eval(datas)
+ jsons = eval(jsons)
+ # 循环遍历替换json内容
+ for k, v in datas.items():
+ Order.UpdateAllvalues(self,jsons, k, v)
+ return jsons
+ else:
+ # 返回初始json
+ return jsons
+
+ # 循环嵌套替换
+ def UpdateAllvalues(self,mydict, key, value):
+ if isinstance(mydict, dict): # 使用isinstance检测数据类型,如果是字典
+ if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key
+ mydict[key] = value
+ for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key
+ chdict = mydict[k]
+ Order.UpdateAllvalues(self,chdict, key, value)
+ mydict[k] = chdict
+ elif isinstance(mydict, list): # 如是list
+ for element in mydict: # 遍历list元素,以下重复上面的操作
+ if isinstance(element, dict):
+ if key in element.keys():
+ element[key] = value
+ for k in element.keys():
+ chdict = element[k]
+ Order.UpdateAllvalues(self,chdict, key, value)
+ element[k] = chdict
+
+
+
+ #递归提取json中符合要求键的值
+ import json
+ def get_dict_allkeys(self,dict_a):
+ """
+ 遍历嵌套字典,获取json返回结果的所有key值
+ :param dict_a:
+ :return: key_list
+ """
+ if isinstance(dict_a, dict): # 使用isinstance检测数据类型
+ # 如果为字典类型,则提取key存放到key_list中
+ for x in range(len(dict_a)):
+ temp_key = list(dict_a.keys())[x]
+ temp_value = dict_a[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+ Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历
+ elif isinstance(dict_a, list):
+ # 如果为列表类型,则遍历列表里的元素,将字典类型的按照上面的方法提取key
+ for k in dict_a:
+ if isinstance(k, dict):
+ for x in range(len(k)):
+ temp_key = list(k.keys())[x]
+ temp_value = k[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+ Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历
+ return key_list
+
+ # 判断值是否在列表中
+ def VerifyProxy(self,data,lists):
+ global key_list
+ key_list = []
+ datas = Order.get_dict_allkeys(self,data)
+ print(type(datas))
+ lists=lists.split(",")
+ print(type(lists))
+ print("gsd")
+ datas2=list(map(str,datas))
+ print(datas2)
+ print(datas)
+ print(lists)
+
+ if set(datas2) > set(lists):
+ return "true"
+ else:
+ return "flase"
+
+
+if __name__ == '__main__':
+# datas = {"url":['https://www.baidu.com'],
+# "Certificate":"Tango Secure Gateway CA",
+# # "Content_Type":"text/html",
+# 'log':'Security Event Logs',
+# "sni":['baidu'],
+# "intercept_code":"200",
+# "log_code":"200",
+# "certifucate":"1",
+# "log_content":"true"
+# }
+# # data= {"url":['https://www.baidu.com'],
+# # "Certificate":"Tango Secure Gateway CA"
+# # }
+# # url = ['https://www.baidu.com']
+# # url = ['https://www.baidu.com']
+# # url = ['https://www.baidu.com']
+# # # Certificate1 = "Tango Secure Gateway CA"
+# # Certificate = "baidu"
+# # a='Tango Secure Gateway CA'
+# # s = Order()
+# # b = s.manu(url,Certificate)
+# # print(b[1])
+# # FTP下载 传入ftp的路径和文件大小
+# ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+# ftp_size = 435814
+# ftp_issue = s.FTP_down(ftp_url,ftp_size)
+# # FTP登录 传入ftp的路径和文件内容
+# ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+# file_content = "Graphical (Xorg) program starter for ADRIANE"
+# ftp_issue = s.FTP_login(ftp_url,file_content)
+ # for i in b:
+ # print(i)
+ # dd = s.CMD('curl -I https://www.baidu.com')
+ # print(dd)
+ # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd:
+ # print("ok")
+ # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk'
+
+
+ # 自己写一个字典测试一下上面的方法好用不好用
+ jsons = {"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}}
+ datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1}
+
+ print("替换前:\n %s" % jsons)
+
+
+ a = Order()
+ b = a.Jsoneditmanu(jsons,datas)
+ # print("替换前:\n %s" % jsons)
+ print("替换后:\n %s" % b)
+
+ data = {"aid":[{'bid':2},{'cid':3}]}
+ print(type(data))
+ # data="""{}"""
+ # data1 = json.loads(data)
+ get_keys = get_dict_allkeys(data)
+ print(get_keys)