1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
|
#!/usr/bin/python3
# coding=utf-8
import json
import os
import report
import fnmatch
import requests
from datetime import datetime
import datetime
class Verify():
    """Runs log-query API test cases from JSON files and tracks pass/fail counts."""

    def __init__(self):
        self.password = ""            # encrypted password returned by the server
        self.token = ""               # auth token obtained from login
        self.table_data = []          # per-case rows for the report table
        self.all_cases_count = 0      # total *.json cases found on disk
        self.excuted_cases_count = 0  # cases actually executed
        self.pass_cases_count = 0     # cases that passed
        self.fail_cases_count = 0     # cases that failed
def encryptPwd(self, pwd, api_host):
url = api_host + "/v1/user/encryptpwd"
pwJson = {"password": ""}
pwJson["password"] = pwd
response = requests.get(url, params=pwJson)
data = json.loads(response.text)
self.password = data["data"]["encryptpwd"]
return self.password
def login(self, user, api_host):
url = api_host + "/v1/user/login"
loginJson = {"username": "", "password": ""}
loginJson["username"] = user
loginJson["password"] = self.password
response = requests.post(url, json=loginJson, verify=False)
jsonData = json.loads(response.text)
self.token = jsonData["data"]["token"]
return self.token
def start_verify(self, user, password, api_host, path_dict, vsys_id, debug_json=""):
"""
:param user:
:param password:
:param api_host:
:param path_dict: 路径字典,需要使用路径参数,在此变量中
:param vsys_id
:param debug_json:
:return:
"""
self.encryptPwd(password, api_host)
self.login(user, api_host)
debug_flag = 0
folder_path = path_dict["folder_path"]
self.table_data.append(["Name", "Result", "Failure Reason"])
# 循环读取文件夹中的json文件,每个json文件代表一个用例
for filename in os.listdir(folder_path):
# 检查是否为json文件
if filename.endswith(".json"):
if debug_json != "": # debug单个文件
debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
if debug_flag == -1:
break
else: # 拼接完整路径, 非debug单个文件
file_path = os.path.join(folder_path, filename)
try:
# 读取json文件
with open(file_path, 'r', encoding='utf-8') as f:
# print("当前执行的用例是:"+ folder_path+'/'+filename)
print("当前执行的用例是:" + file_path)
config = json.load(f)
now = datetime.utcnow()
one_minute_later = now + datetime.timedelta(minutes=1)
# print(one_minute_later)
start_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
end_time = one_minute_later.strftime('%Y-%m-%dT%H:%M:%SZ')
# 查询日志
# log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host, vsys_id)
log_result = self.operate_sourcedata(self.token, config, start_time, end_time, api_host,
vsys_id)
# 生成报告的数据
self.excuted_cases_count += 1
report_data = []
case_name = os.path.splitext(filename)[0]
report_data.append(case_name)
if log_result == False:
result = "Fail"
self.fail_cases_count += 1
else:
result = "PASS"
self.pass_cases_count += 1
report_data.append(result)
failure_reason = ""
if log_result == False:
failure_reason = failure_reason + "The code returned is not 200."
report_data.append(failure_reason)
self.table_data.append(report_data)
finally:
if debug_flag == 1: # 只debug单个文件
break # 遇到json文件绝对路径,只执行一次
def get_log_schema(self, token, log_type, api_host, vsys_id):
# 修改完成后删除注释:将下方的security_event替换成变量
headers = {'Content-Type': 'application/json', 'Authorization': token}
url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
# print(url)
response = requests.get(url, headers=headers)
# print(response)
assert response.status_code == 200
log_schema = json.loads(response.text)
log_schema = log_schema['data']['fields']
log_schema = ",".join([field['name'] for field in log_schema])
return log_schema
def operate_sourcedata(self, token, config, start_time, end_time, api_host, vsys_id):
# 从数据源中获得对应变量的值
# log_filter = config['log_filter']
int_fields_common_value = config['int_fields_common_value']
string_fields_common_value = config['string_fields_common_value']
source = config['source']
try:
mysource = config['mysource']
print(mysource)
except:
mysource = ''
fields = config['fields']
for field in fields:
field_name = field['name']
try:
field_type = field['type']['type']
except:
field_type = field['type']
if field_type == 'long' or field_type == 'int': # 数字类型字段的运算语句
try:
str_operators = field['doc']['constraints']['operator_functions']
operators = [f'{item.strip()}' for item in str_operators.split(',')]
except:
operators = config['int_fields_common_operator']
for operator in operators:
if operator == "in" or operator == "not in":
log_filter = ('({} {} ({}))').format(field_name, operator, int_fields_common_value)
# print(log_filter)
elif operator == "bitAnd":
log_filter = ('({}({},{}))').format(operator, field_name, int_fields_common_value)
# print(log_filter)
else:
log_filter = ('({}{}{})').format(field_name, operator, int_fields_common_value)
# print(log_filter)
self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
log_filter, mysource)
elif field_type == 'string': # 字符串类型字段的运算语句
try:
str_operators = field['doc']['constraints']['operator_functions']
operators = [f'{item.strip()}' for item in str_operators.split(',')]
except:
operators = config['str_fields_common_operator']
# print(operators)
for operator in operators:
if operator == "=" or operator == "!=":
log_filter = ("({}{}'{}')").format(field_name, operator, string_fields_common_value)
# print(log_filter)
elif operator == "notEmpty" or operator == "empty":
log_filter = ('({}({}))').format(operator, field_name)
# print(log_filter)
else:
log_filter = ("({} {} ('{}'))").format(field_name, operator, string_fields_common_value)
# print(log_filter)
self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
log_filter, mysource)
elif field_type == 'array': # 列表类型字段的运算语句
try:
str_operators = field['doc']['constraints']['operator_functions']
operators = [f'{item.strip()}' for item in str_operators.split(',')]
except:
operators = config['str_fields_common_operator']
# print(operators)
for operator in operators:
if operator == "has":
log_filter = ("({}({},{}))").format(operator, field_name, int_fields_common_value)
# print(log_filter)
else:
log_filter = ('({}({}))').format(operator, field_name)
# print(log_filter)
self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
log_filter, mysource)
def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id, source, log_filter,
mysource):
headers = {'Content-Type': 'application/json', 'Authorization': token}
log_condition = {
"execution_mode": "oneshot",
"limit": "0,20",
"identifier_name": "security_event-list",
"source": "security_event",
"columns": None,
"start_time": "",
"end_time": "",
"filter": "",
"vsys_id": 1,
"interval": 1
}
log_condition_dict = json.loads(json.dumps(log_condition))
fields = self.get_log_schema(token, source, api_host, vsys_id)
log_condition_dict['filter'] = log_filter
if source == "security_event":
log_condition_dict['identifier_name'] = "security-event-list"
elif source == "monitor_event":
log_condition_dict['identifier_name'] = "monitor-event-list"
elif source == "monitor_event":
log_condition_dict['identifier_name'] = "monitor-event-list"
# elif source == "session_record_intercept":
# log_condition_dict['filter'] = "({}AND notEmpty(proxy_action)".format(log_filter)
elif source == "proxy_event":
log_condition_dict['identifier_name'] = "proxy-event-manipulation-list"
elif source == "session_record":
log_condition_dict['identifier_name'] = "session-record-list"
if mysource == "session_record_intercept":
log_condition_dict['filter'] = "({}AND notEmpty(proxy_action))".format(log_filter)
elif source == "voip_record":
log_condition_dict['identifier_name'] = "voip-record-list"
else:
log_condition_dict['identifier_name'] = "dos-event-list"
# 用数据源中变量的值替换模板中变量的值
log_condition_dict['columns'] = fields
log_condition_dict['start_time'] = start_time
log_condition_dict['end_time'] = end_time
log_condition_dict['vsys_id'] = vsys_id
log_condition_dict['source'] = source
# log_condition_dict['identifier_name'] = source+"-list"
url = api_host + "/v1/log/query"
print(url)
print(json.dumps(log_condition_dict))
response = requests.post(url, headers=headers, json=log_condition_dict)
print(response)
log_code = response.status_code
if log_code == 200:
log_result = True
else:
log_result = False
print(log_result)
return log_result
def is_debug_case_file(self, folder_path, debug_json):
debug_json_abspath = os.path.join(folder_path, debug_json)
if os.path.exists(debug_json_abspath):
debug_flag = 1
file_path = debug_json_abspath
return debug_flag, file_path
else:
# raise Exception("debug单个json文件不存在:{}".format(debug_json_abspath))
print("debug单个json文件不存在:{}".format(debug_json_abspath))
return -1, ""
# 遍历文件夹,查看有多少个用例
def find_json_files(self, directory):
for _, _, files in os.walk(directory):
for file in files:
if fnmatch.fnmatch(file, '*.json'):
self.all_cases_count += 1
def build_report(self):
unexcuted_cases_count = self.all_cases_count - self.excuted_cases_count
pie_table_data = [
[self.all_cases_count, self.pass_cases_count, self.fail_cases_count, unexcuted_cases_count]
]
pass_cases_ratio = self.pass_cases_count / self.all_cases_count * 100
fail_cases_ratio = self.fail_cases_count / self.all_cases_count * 100
unexcuted_cases_ratio = 100.00 - pass_cases_ratio - fail_cases_ratio
pass_cases_ratio = format(pass_cases_ratio, '.2f')
fail_cases_ratio = format(fail_cases_ratio, '.2f')
unexcuted_cases_ratio = format(unexcuted_cases_ratio, '.2f')
pie_data = [pass_cases_ratio, fail_cases_ratio, unexcuted_cases_ratio]
# table_data是每个case的执行明细,pie_data是pass、fail、unexecuted百分比,pie_table_data是all、pass、fail、unexecuted用例数
new_report = report.GenerateReport()
new_report.generate_report(self.table_data, pie_data, pie_table_data, project_path)
if __name__ == "__main__":
    # Hard-coded test credentials and environment for this internal test rig.
    username = "baiguorui"
    password = "baiguorui1"
    path_dict = {}
    vsys_id = 6
    api_host = "http://192.168.44.72"
    project_path = "D:/Python Projects/Auto_api/tsg_policy_api"
    temp_folder_path = f"{project_path}/data/log_temp"
    path_dict["project_path"] = project_path
    folder_list = os.listdir(temp_folder_path)
    print("数据源包含的文件夹列表:", folder_list)
    verify_res = Verify()
    verify_res.find_json_files(temp_folder_path)
    # Run every case folder, then render one combined report.
    # (Idiom fix: iterate the list directly instead of range(len(...)).)
    for folder_name in folder_list:
        path_dict["folder_path"] = temp_folder_path + '/' + folder_name
        verify_res.start_verify(username, password, api_host, path_dict, vsys_id)
    verify_res.build_report()
|