| author | zhangchengwei <[email protected]> | 2024-04-24 10:26:18 +0800 |
|---|---|---|
| committer | zhangchengwei <[email protected]> | 2024-04-24 10:26:18 +0800 |
| commit | ea7857c1556872eb7e04b2c18ad03d3e1ba030f9 (patch) | |
| tree | f7ad49f89bb8151cf7af48c851ecf399289cb96d | |
| parent | db487d4a75bc65117b6a08483141055552ef831e (diff) | |
| parent | b65a9d7bd4a6c33d07c8d0f005a533ba6e2250d7 (diff) | |
Merge branch 'develop' of https://git.mesalab.cn/zhaokun/tsg_policy_api into develop
28 files changed, 203 insertions(+), 429 deletions(-)
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
index 7e5f256..609977b 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [],
     "obj_condition_2": [],
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
index 830c60d..5027fce 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [],
     "obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
index 0afacc8..b45d82b 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [],
     "obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
index ce14970..364f5eb 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [],
     "obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
index 6c3153c..fbf67fd 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
index 97ba799..24119db 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
index 6c3153c..fbf67fd 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
index e58a48a..9993cea 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
index 208a2fa..84f7bc3 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 1,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
index dbdb333..771b1d2 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 1,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
index 03737df..bf06e67 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
index 10b910f..5d5ef17 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
index 1e2158a..f06d619 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
@@ -5,8 +5,8 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
-    "do_log_2": 1,
+    "do_log_1": 2,
+    "do_log_2": 2,
     "obj_condition_1": [
         {
             "attribute_name": "ATTR_SOURCE_IP",
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
index 7ab79bb..06ce6a6 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
@@ -5,8 +5,8 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
-    "do_log_2": 1,
+    "do_log_1": 2,
+    "do_log_2": 2,
     "obj_condition_1": [
         {
             "attribute_name": "ATTR_SOURCE_IP",
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
index 3a41571..b68cdc4 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
index 0ee95da..e53f6a0 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
index 61788b1..a2fb609 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
index 40fe255..abdf40f 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
index 5322f8a..e290d19 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
index 9035009..e091d60 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
index 2ae4ac5..b628750 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
index 2ae4ac5..b628750 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
@@ -5,7 +5,7 @@
     "rule_action_1": "allow",
     "rule_action_2": "deny",
     "method_2": "drop",
-    "do_log_1": 0,
+    "do_log_1": 2,
     "do_log_2": 2,
     "obj_condition_1": [
         {
diff --git a/data/log_temp/proxy_event_intercept/session_record_fields.json b/data/log_temp/proxy_event_intercept/session_record_fields.json
index 8c41700..2d96861 100644
--- a/data/log_temp/proxy_event_intercept/session_record_fields.json
+++ b/data/log_temp/proxy_event_intercept/session_record_fields.json
@@ -22,6 +22,7 @@
     "int_fields_common_value": 1234,
     "string_fields_common_value": "1234",
     "source": "session_record",
+    "mysource": "session_record_intercept",
     "fields": [
         {
             "name": "recv_time",
diff --git a/data/log_temp/security_events/security_events_fields.json b/data/log_temp/security_events/security_events_fields.json
index 0dc59a0..9b85241 100644
--- a/data/log_temp/security_events/security_events_fields.json
+++ b/data/log_temp/security_events/security_events_fields.json
@@ -1385,12 +1385,7 @@
         },
         {
             "name": "rdp_certificate_count",
-            "type": "int",
-            "doc": {
-                "constraints": {
-                    "type": "decimal"
-                }
-            }
+            "type": "int"
         },
         {
             "name": "rdp_certificate_permanent",
@@ -25,7 +25,7 @@ class GetLog():
             "page_no": 1,
             "page_size": 20,
             "source": "security_event",
-            "fields": None,
+            "columns": None,
             "start_time": "",
             "end_time": "",
             "filter": "",
@@ -36,11 +36,13 @@ class GetLog():
         for i in range(ruleNum):
             policy_id = list(create_policies_ids[i].values())[0]
             fields = self.get_log_schema(token, "security_event", api_host, vsys_id)
-            log_condition_dict['fields'] = fields
+            log_condition_dict['columns'] = fields
             log_condition_dict['start_time'] = start_time
             log_condition_dict['end_time'] = end_time
            log_condition_dict['vsys_id'] = vsys_id
             log_condition_dict['log_type'] = 'security_event'
+            log_condition_dict['identifier_name'] = 'security-event-list'
+            log_condition_dict['execution_mode'] = 'oneshot'
             # get object_type from the conditions to check whether a subscriberid exists
             if len(condition["obj_condition_1"]) > 0 and condition["obj_condition_1"][0]["attribute_name"] == "ATTR_SUBSCRIBER_ID":
                 log_filter = f"subscriber_id= 'test23' AND has(security_rule_list,{policy_id})"
@@ -51,11 +53,12 @@ class GetLog():
                 log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={test_pc_ip}", f"client_ip='{test_pc_ip}'")
             url = api_host + "/v1/log/query"
             # print(json.dumps(log_condition_dict))
+            print(log_condition_dict)
             response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
             assert response.status_code == 200
             log_list = json.loads(response.text)
             # print(log_list)
-            log_list = log_list['data']['list']
+            log_list = log_list['data']['result']
             log_index = i + 1
             log_query_params = condition['log_query_param_'+str(log_index)]
             if len(log_list) > 0 and len(log_query_params) > 0:
@@ -109,6 +112,7 @@ class GetLog():
                 metric_result = True
             elif 'hits' in counter and counter['hits'] == 'many' and result_len > 0:
                 metric = response['data']['result'][0]
+                print(metric)
                 hits = metric['hits']
                 if hits > 0:
                     metric_result = True
@@ -513,40 +517,40 @@ class GetLog():
 # if __name__ == '__main__':
 #     ipObject = get_log_by_condition()
 #     time.sleep(3)
-if __name__ == '__main__':
-    api_host = "http://192.168.44.3"
-    v = verify.Verify()
-    username = "admin"
-    password = "admin"
-    v.encryptPwd(password, api_host)
-    token = v.login(username, api_host)
-    l = GetLog()
-    sc_info = {
-        'app_name_1': [
+# if __name__ == '__main__':
+#     api_host = "http://192.168.44.3"
+#     v = verify.Verify()
+#     username = "admin"
+#     password = "admin"
+#     v.encryptPwd(password, api_host)
+#     token = v.login(username, api_host)
+#     l = GetLog()
+#     sc_info = {
+#         'app_name_1': [
-        ],
-        'health_check_method': 'none',
-        'rule_id': 311524,
-        'sf_dest_ip': '2.2.2.57',
-        'sf_id': 2096,
-        'sf_method': 'vxlan_g',
-        'sff_id': 2090,
-        'targeted_traffic': 'raw',
-        'type': 1
-    }
-    sc_metric = {
-        'total_packets': 347,
-        'total_packets_sent': 97,
-        'total_packets_received': 250,
-        'total_bytes': 339823,
-        'total_bytes_sent': 5892,
-        'total_bytes_received': 333931,
-        'total_syn_pkt': 1
-}
-    assert_key = {}
-    start_time = "2023-12-11T08:16:46Z"
-    end_time = "2023-12-11T08:20:31Z"
-    l.get_sc_metric(token,start_time,end_time,sc_info,sc_metric,api_host)
+#         ],
+#         'health_check_method': 'none',
+#         'rule_id': 311524,
+#         'sf_dest_ip': '2.2.2.57',
+#         'sf_id': 2096,
+#         'sf_method': 'vxlan_g',
+#         'sff_id': 2090,
+#         'targeted_traffic': 'raw',
+#         'type': 1
+#     }
+#     sc_metric = {
+#         'total_packets': 347,
+#         'total_packets_sent': 97,
+#         'total_packets_received': 250,
+#         'total_bytes': 339823,
+#         'total_bytes_sent': 5892,
+#         'total_bytes_received': 333931,
+#         'total_syn_pkt': 1
+#     }
+#     assert_key = {}
+#     start_time = "2023-12-11T08:16:46Z"
+#     end_time = "2023-12-11T08:20:31Z"
+#     l.get_sc_metric(token,start_time,end_time,sc_info,sc_metric,api_host)
 # test = GetLog()
 # log_dict = {
 #     "common_recv_time": "1698299472",
diff --git a/log_query.py b/log_query.py
index 3ddef20..84f25e2 100644
--- a/log_query.py
+++ b/log_query.py
@@ -6,6 +6,7 @@ import report
 import fnmatch
 import requests
 from datetime import datetime
+import datetime
 
 
 class Verify():
@@ -56,23 +57,29 @@ class Verify():
         for filename in os.listdir(folder_path):
             # check whether this is a json file
             if filename.endswith(".json"):
-                if debug_json != "": # debug a single file
+                if debug_json != "":  # debug a single file
                     debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
                     if debug_flag == -1:
                         break
-                else: # build the full path when not debugging a single file
+                else:  # build the full path when not debugging a single file
                     file_path = os.path.join(folder_path, filename)
                 try:
                     # read the json file
                     with open(file_path, 'r', encoding='utf-8') as f:
-                        #print("current case: "+ folder_path+'/'+filename)
-                        print("current case: "+ file_path)
+                        # print("current case: "+ folder_path+'/'+filename)
+                        print("current case: " + file_path)
                         config = json.load(f)
+                        now = datetime.utcnow()
+                        one_minute_later = now + datetime.timedelta(minutes=1)
+                        # print(one_minute_later)
+
                         start_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
-                        end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
+                        end_time = one_minute_later.strftime('%Y-%m-%dT%H:%M:%SZ')
                         # query the logs
-                        log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host, vsys_id)
+                        log_result = self.operate_sourcedata(self.token, config, start_time, end_time, api_host,
+                                                             vsys_id)
                         # data for the report
                         self.excuted_cases_count += 1
                         report_data = []
@@ -98,48 +105,146 @@ class Verify():
         # remove this comment when done: replace security_event below with a variable
         headers = {'Content-Type': 'application/json', 'Authorization': token}
         url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
-        print(url)
+        # print(url)
         response = requests.get(url, headers=headers)
-        print(response)
+        # print(response)
         assert response.status_code == 200
         log_schema = json.loads(response.text)
         log_schema = log_schema['data']['fields']
         log_schema = ",".join([field['name'] for field in log_schema])
         return log_schema
-
-    def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id):
+
+    def operate_sourcedata(self, token, config, start_time, end_time, api_host, vsys_id):
+        # get the values of the corresponding variables from the data source
+        # log_filter = config['log_filter']
+        int_fields_common_value = config['int_fields_common_value']
+        string_fields_common_value = config['string_fields_common_value']
+        source = config['source']
+        try:
+            mysource = config['mysource']
+            print(mysource)
+        except:
+            mysource = ''
+        fields = config['fields']
+        for field in fields:
+            field_name = field['name']
+            try:
+                field_type = field['type']['type']
+            except:
+                field_type = field['type']
+
+            if field_type == 'long' or field_type == 'int':  # operator expressions for numeric fields
+                try:
+                    str_operators = field['doc']['constraints']['operator_functions']
+                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
+                except:
+                    operators = config['int_fields_common_operator']
+                for operator in operators:
+                    if operator == "in" or operator == "not in":
+                        log_filter = ('({} {} ({}))').format(field_name, operator, int_fields_common_value)
+                        # print(log_filter)
+                    elif operator == "bitAnd":
+                        log_filter = ('({}({},{}))').format(operator, field_name, int_fields_common_value)
+                        # print(log_filter)
+                    else:
+                        log_filter = ('({}{}{})').format(field_name, operator, int_fields_common_value)
+                        # print(log_filter)
+                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+                                              log_filter, mysource)
+            elif field_type == 'string':  # operator expressions for string fields
+                try:
+                    str_operators = field['doc']['constraints']['operator_functions']
+                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
+                except:
+                    operators = config['str_fields_common_operator']
+                # print(operators)
+                for operator in operators:
+                    if operator == "=" or operator == "!=":
+                        log_filter = ("({}{}'{}')").format(field_name, operator, string_fields_common_value)
+                        # print(log_filter)
+                    elif operator == "notEmpty" or operator == "empty":
+                        log_filter = ('({}({}))').format(operator, field_name)
+                        # print(log_filter)
+                    else:
+                        log_filter = ("({} {} ('{}'))").format(field_name, operator, string_fields_common_value)
+                        # print(log_filter)
+                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+                                              log_filter, mysource)
+            elif field_type == 'array':  # operator expressions for array fields
+                try:
+                    str_operators = field['doc']['constraints']['operator_functions']
+                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
+                except:
+                    operators = config['str_fields_common_operator']
+                # print(operators)
+                for operator in operators:
+                    if operator == "has":
+                        log_filter = ("({}({},{}))").format(operator, field_name, int_fields_common_value)
+                        # print(log_filter)
+                    else:
+                        log_filter = ('({}({}))').format(operator, field_name)
+                        # print(log_filter)
+                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+                                              log_filter, mysource)
+
+    def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id, source, log_filter,
+                             mysource):
         headers = {'Content-Type': 'application/json', 'Authorization': token}
         log_condition = {
-            "page_no": 1,
-            "page_size": 20,
+            "execution_mode": "oneshot",
+            "limit": "0,20",
+            "identifier_name": "security_event-list",
             "source": "security_event",
-            "fields": None,
+            "columns": None,
             "start_time": "",
             "end_time": "",
             "filter": "",
-            "vsys_id": 1
+            "vsys_id": 1,
+            "interval": 1
         }
-        # get the values of the corresponding variables from the data source
-        # log_filter = config['log_filter']
-        source = config['source']
         log_condition_dict = json.loads(json.dumps(log_condition))
         fields = self.get_log_schema(token, source, api_host, vsys_id)
+        log_condition_dict['filter'] = log_filter
+        if source == "security_event":
+            log_condition_dict['identifier_name'] = "security-event-list"
+        elif source == "monitor_event":
+            log_condition_dict['identifier_name'] = "monitor-event-list"
+        elif source == "monitor_event":
+            log_condition_dict['identifier_name'] = "monitor-event-list"
+        # elif source == "session_record_intercept":
+        #     log_condition_dict['filter'] = "({}AND notEmpty(proxy_action)".format(log_filter)
+        elif source == "proxy_event":
+            log_condition_dict['identifier_name'] = "proxy-event-manipulation-list"
+        elif source == "session_record":
+            log_condition_dict['identifier_name'] = "session-record-list"
+            if mysource == "session_record_intercept":
+                log_condition_dict['filter'] = "({}AND notEmpty(proxy_action))".format(log_filter)
+
+        elif source == "voip_record":
+            log_condition_dict['identifier_name'] = "voip-record-list"
+        else:
+            log_condition_dict['identifier_name'] = "dos-event-list"
+
         # replace the template values with the values from the data source
-        log_condition_dict['fields'] = fields
+        log_condition_dict['columns'] = fields
         log_condition_dict['start_time'] = start_time
         log_condition_dict['end_time'] = end_time
         log_condition_dict['vsys_id'] = vsys_id
         log_condition_dict['source'] = source
-        log_condition_dict['filter'] = log_filter
+        # log_condition_dict['identifier_name'] = source+"-list"
+
         url = api_host + "/v1/log/query"
-        # print(json.dumps(log_condition_dict))
-        response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
+        print(url)
+        print(json.dumps(log_condition_dict))
+        response = requests.post(url, headers=headers, json=log_condition_dict)
+        print(response)
         log_code = response.status_code
         if log_code == 200:
             log_result = True
         else:
             log_result = False
+        print(log_result)
         return log_result
 
     def is_debug_case_file(self, folder_path, debug_json):
@@ -153,19 +258,21 @@ class Verify():
             print("single debug json file does not exist: {}".format(debug_json_abspath))
             return -1, ""
     # walk the folder to count the test cases
+
     def find_json_files(self, directory):
         for _, _, files in os.walk(directory):
             for file in files:
                 if fnmatch.fnmatch(file, '*.json'):
                     self.all_cases_count += 1
+
     def build_report(self):
         unexcuted_cases_count = self.all_cases_count - self.excuted_cases_count
         pie_table_data = [
             [self.all_cases_count, self.pass_cases_count, self.fail_cases_count, unexcuted_cases_count]
         ]
-        pass_cases_ratio = self.pass_cases_count/self.all_cases_count*100
-        fail_cases_ratio = self.fail_cases_count/self.all_cases_count*100
-        unexcuted_cases_ratio = 100.00-pass_cases_ratio-fail_cases_ratio
+        pass_cases_ratio = self.pass_cases_count / self.all_cases_count * 100
+        fail_cases_ratio = self.fail_cases_count / self.all_cases_count * 100
+        unexcuted_cases_ratio = 100.00 - pass_cases_ratio - fail_cases_ratio
         pass_cases_ratio = format(pass_cases_ratio, '.2f')
         fail_cases_ratio = format(fail_cases_ratio, '.2f')
         unexcuted_cases_ratio = format(unexcuted_cases_ratio, '.2f')
@@ -174,6 +281,7 @@ class Verify():
         new_report = report.GenerateReport()
         new_report.generate_report(self.table_data, pie_data, pie_table_data, project_path)
 
+
 if __name__ == "__main__":
     # username = "hebingning"
     # password = "hbn66AAA"
diff --git a/log_query_4test.py b/log_query_4test.py
deleted file mode 100644
index 284e17d..0000000
--- a/log_query_4test.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/python3
-# coding=utf-8
-import json
-import os
-import report
-import fnmatch
-import requests
-from datetime import datetime
-
-
-class Verify():
-    def __init__(self):
-        self.password = ""
-        self.token = ""
-        self.table_data = []
-        self.all_cases_count = 0
-        self.excuted_cases_count = 0
-        self.pass_cases_count = 0
-        self.fail_cases_count = 0
-
-    def encryptPwd(self, pwd, api_host):
-        url = api_host + "/v1/user/encryptpwd"
-        pwJson = {"password": ""}
-        pwJson["password"] = pwd
-        response = requests.get(url, params=pwJson)
-        data = json.loads(response.text)
-        self.password = data["data"]["encryptpwd"]
-        return self.password
-
-    def login(self, user, api_host):
-        url = api_host + "/v1/user/login"
-        loginJson = {"username": "", "password": ""}
-        loginJson["username"] = user
-        loginJson["password"] = self.password
-        response = requests.post(url, json=loginJson)
-        jsonData = json.loads(response.text)
-        self.token = jsonData["data"]["token"]
-        return self.token
-
-    def start_verify(self, user, password, api_host, path_dict, vsys_id, debug_json=""):
-        """
-        :param user:
-        :param password:
-        :param api_host:
-        :param path_dict: path dictionary; parameters that need a path live in this variable
-        :param vsys_id
-        :param debug_json:
-        :return:
-        """
-        self.encryptPwd(password, api_host)
-        self.login(user, api_host)
-        debug_flag = 0
-        folder_path = path_dict["folder_path"]
-        self.table_data.append(["Name", "Result", "Failure Reason"])
-        # loop over the json files in the folder; each json file is one test case
-        for filename in os.listdir(folder_path):
-            # check whether this is a json file
-            if filename.endswith(".json"):
-                if debug_json != "":  # debug a single file
-                    debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
-                    if debug_flag == -1:
-                        break
-                else:  # build the full path when not debugging a single file
-                    file_path = os.path.join(folder_path, filename)
-                try:
-                    # read the json file
-                    with open(file_path, 'r', encoding='utf-8') as f:
-                        # print("current case: "+ folder_path+'/'+filename)
-                        print("current case: " + file_path)
-                        config = json.load(f)
-                        now = datetime.utcnow()
-                        start_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
-                        end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
-                        # query the logs
-                        # log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host,
-                        #                                        vsys_id)
-                        log_result = self.operate_sourcedata(self.token, config, start_time, end_time, api_host,
-                                                             vsys_id)
-                        # data for the report
-                        self.excuted_cases_count += 1
-                        report_data = []
-                        case_name = os.path.splitext(filename)[0]
-                        report_data.append(case_name)
-                        if log_result == False:
-                            result = "Fail"
-                            self.fail_cases_count += 1
-                        else:
-                            result = "PASS"
-                            self.pass_cases_count += 1
-                        report_data.append(result)
-                        failure_reason = ""
-                        if log_result == False:
-                            failure_reason = failure_reason + "The code returned is not 200."
-                        report_data.append(failure_reason)
-                        self.table_data.append(report_data)
-                finally:
-                    if debug_flag == 1:  # only debug a single file
-                        break  # given an absolute json path, execute only once
-
-    # def get_test_cases_len(self,debug_json=""):
-    #     folder_path = path_dict["folder_path"]
-    #     debug_flag = 0
-    #     for filename in os.listdir(folder_path):
-    #         if filename.endswith(".json"):
-    #             if debug_json != "":  # debug a single file
-    #                 debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
-    #                 if debug_flag == -1:
-    #                     break
-    #             else:  # build the full path when not debugging a single file
-    #                 file_path = os.path.join(folder_path, filename)
-    #         try:
-    #             # read the json file
-    #             with open(file_path, 'r', encoding='utf-8') as f:
-    #                 config = json.load(f)
-    #                 test_cases = config['TestCases']
-    #         finally:
-    #             if debug_flag == 1:  # only debug a single file
-    #                 break  # given an absolute json path, execute only once
-    #     return len(test_cases)
-
-    def get_log_schema(self, token, log_type, api_host, vsys_id):
-        # remove this comment when done: replace security_event below with a variable
-        headers = {'Content-Type': 'application/json', 'Authorization': token}
-        url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
-        response = requests.get(url, headers=headers)
-        assert response.status_code == 200
-        log_schema = json.loads(response.text)
-        log_schema = log_schema['data']['fields']
-        log_schema = ",".join([field['name'] for field in log_schema])
-        return log_schema
-
-    # def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id):
-    #     headers = {'Content-Type': 'application/json', 'Authorization': token}
-    #     log_condition = {
-    #         "page_no": 1,
-    #         "page_size": 20,
-    #         "source": "security_event",
-    #         "fields": None,
-    #         "start_time": "",
-    #         "end_time": "",
-    #         "filter": "",
-    #         "vsys_id": 1
-    #     }
-    #     # get the values of the corresponding variables from the data source
-    #     # log_filter = config['log_filter']
-    #     source = config['source']
-    #     print(source)
-    #     test_cases = config['TestCases']
-    #     for i in test_cases:
-    #         log_filter = i['log_filter']
-    #         # source = i['source']
-    #         print(log_filter)
-    #         log_condition_dict = json.loads(json.dumps(log_condition))
-    #         fields = self.get_log_schema(token, source, api_host, vsys_id)
-    #         # replace the template values with the values from the data source
-    #         log_condition_dict['fields'] = fields
-    #         log_condition_dict['start_time'] = start_time
-    #         log_condition_dict['end_time'] = end_time
-    #         log_condition_dict['vsys_id'] = vsys_id
-    #         log_condition_dict['source'] = source
-    #         log_condition_dict['filter'] = log_filter
-    #
-    #         url = api_host + "/v1/log/query"
-    #         # print(json.dumps(log_condition_dict))
-    #         response = requests.post(url, headers=headers, json=log_condition_dict)
-    #         print(response)
-    #         log_code = response.status_code
-    #         if log_code == 200:
-    #             log_result = True
-    #         else:
-    #             log_result = False
-    #         print(log_result)
-    #         return log_result
-    def operate_sourcedata(self, token, config, start_time, end_time, api_host, vsys_id):
-        # get the values of the corresponding variables from the data source
-        # log_filter = config['log_filter']
-        int_fields_common_value = config['int_fields_common_value']
-        string_fields_common_value = config['string_fields_common_value']
-        source = config['source']
-        # print(source)
-        fields = config['fields']
-        for field in fields:
-            field_name = field['name']
-            try:
-                field_type = field['type']['type']
-            except:
-                field_type = field['type']
-
-            if field_type == 'long' or field_type == 'int':  # operator expressions for numeric fields
-                try:
-                    str_operators = field['doc']['constraints']['operator_functions']
-                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
-                except:
-                    operators = config['int_fields_common_operator']
-                for operator in operators:
-                    if operator == "in" or operator == "not in":
-                        log_filter = ('({} {} ({}))').format(field_name, operator, int_fields_common_value)
-                        # print(log_filter)
-                    elif operator == "bitAnd":
-                        log_filter = ('({}({},{}))').format(operator, field_name, int_fields_common_value)
-                        # print(log_filter)
-                    else:
-                        log_filter = ('({}{}{})').format(field_name, operator, int_fields_common_value)
-                        # print(log_filter)
-                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
-                                              log_filter)
-            elif field_type == 'string':  # operator expressions for string fields
-                try:
-                    str_operators = field['doc']['constraints']['operator_functions']
-                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
-                except:
-                    operators = config['str_fields_common_operator']
-                # print(operators)
-                for operator in operators:
-                    if operator == "=" or operator == "!=":
-                        log_filter = ("({}{}'{}')").format(field_name, operator, string_fields_common_value)
-                        # print(log_filter)
-                    elif operator == "notEmpty" or operator == "empty":
-                        log_filter = ('({}({}))').format(operator, field_name)
-                        # print(log_filter)
-                    else:
-                        log_filter = ("({} {} ('{}'))").format(field_name, operator, string_fields_common_value)
-                        # print(log_filter)
-                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
-                                              log_filter)
-            elif field_type == 'array':  # operator expressions for array fields
-                try:
-                    str_operators = field['doc']['constraints']['operator_functions']
-                    operators = [f'{item.strip()}' for item in str_operators.split(',')]
-                except:
-                    operators = config['str_fields_common_operator']
-                # print(operators)
-                for operator in operators:
-                    if operator == "has":
-                        log_filter = ("({}({},{}))").format(operator, field_name, int_fields_common_value)
-                        # print(log_filter)
-                    else:
-                        log_filter = ('({}({}))').format(operator, field_name)
-                        # print(log_filter)
-                    self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
-                                              log_filter)
-
-    def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id, source, log_filter):
-        headers = {'Content-Type': 'application/json', 'Authorization': token}
-        log_condition = {
-            "page_no": 1,
-            "page_size": 20,
-            "source": "security_event",
-            "fields": None,
-            "start_time": "",
-            "end_time": "",
-            "filter": "",
-            "vsys_id": 1
-        }
-
-        log_condition_dict = json.loads(json.dumps(log_condition))
-        fields = self.get_log_schema(token, source, api_host, vsys_id)
-        # replace the template values with the values from the data source
-        log_condition_dict['fields'] = fields
-        log_condition_dict['start_time'] = start_time
-        log_condition_dict['end_time'] = end_time
-        log_condition_dict['vsys_id'] = vsys_id
-        log_condition_dict['source'] = source
-        log_condition_dict['filter'] = log_filter
-
-        url = api_host + "/v1/log/query"
-        # print(json.dumps(log_condition_dict))
-        response = requests.post(url, headers=headers, json=log_condition_dict)
-        # print(response)
-        log_code = response.status_code
-        if log_code == 200:
-            log_result = True
-        else:
-            log_result = False
-        # print(log_result)
-        return log_result
-
-    def is_debug_case_file(self, folder_path, debug_json):
-        debug_json_abspath = os.path.join(folder_path, debug_json)
-        if os.path.exists(debug_json_abspath):
-            debug_flag = 1
-            file_path = debug_json_abspath
-            return debug_flag, file_path
-        else:
-            # raise Exception("single debug json file does not exist: {}".format(debug_json_abspath))
-            print("single debug json file does not exist: {}".format(debug_json_abspath))
-            return -1, ""
-    # walk the folder to count the test cases
-
-    def find_json_files(self, directory):
-        for _, _, files in os.walk(directory):
-            for file in files:
-                if fnmatch.fnmatch(file, '*.json'):
-                    self.all_cases_count += 1
-
-    def build_report(self):
-        unexcuted_cases_count = self.all_cases_count - self.excuted_cases_count
-        pie_table_data = [
-            [self.all_cases_count, self.pass_cases_count, self.fail_cases_count, unexcuted_cases_count]
-        ]
-        pass_cases_ratio = self.pass_cases_count / self.all_cases_count * 100
-        fail_cases_ratio = self.fail_cases_count / self.all_cases_count * 100
-        unexcuted_cases_ratio = 100.00 - pass_cases_ratio - fail_cases_ratio
-        pass_cases_ratio = format(pass_cases_ratio, '.2f')
-        fail_cases_ratio = format(fail_cases_ratio, '.2f')
-        unexcuted_cases_ratio = format(unexcuted_cases_ratio, '.2f')
-        pie_data = [pass_cases_ratio, fail_cases_ratio, unexcuted_cases_ratio]
-        # table_data holds per-case execution details; pie_data holds the pass/fail/unexecuted percentages; pie_table_data holds the all/pass/fail/unexecuted case counts
-        new_report = report.GenerateReport()
-        new_report.generate_report(self.table_data, pie_data, pie_table_data, project_path)
-
-
-if __name__ == "__main__":
-    # username = "hebingning"
-    # password = "hbn66AAA"
-    username = "baiguorui"
-    password = "baiguorui1"
-    path_dict = {}
-    vsys_id = 6
-    api_host = "http://192.168.44.72"
-    # project_path = "D:/python_script/tsg_policy_api"
-    project_path = "D:/Python Projects/Auto_api/tsg_policy_api"
-    temp_folder_path = f"{project_path}/data/log_temp"
-    path_dict["project_path"] = project_path
-    folder_list = os.listdir(temp_folder_path)
-    print("folders in the data source:", folder_list)
-    verify_res = Verify()
-    verify_res.find_json_files(temp_folder_path)
-    for j in range(len(folder_list)):
-        folder_path = temp_folder_path + '/' + folder_list[j]
-        path_dict["folder_path"] = folder_path
-        verify_res.start_verify(username, password, api_host, path_dict, vsys_id)
-        # verify_res.start_verify(username, password, test_pc_ip, api_host, is_log, path_dict, env, vsys_id, debug_json="security_deny_create_50w_fqdn.json")
-    verify_res.build_report()
@@ -310,7 +310,7 @@ class Verify():
         if is_log == 1:
             if is_log_query == 1:
                 # logs may only appear after 5 minutes, so loop 5 times
-                for _ in range(5):
+                for _ in range(7):
                     now = datetime.utcnow()
                     end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
                     # wait for logs to be generated
@@ -322,12 +322,12 @@ class Verify():
             # no logs-module verification is needed here; this may be a security shunt action, statistics, an sc policy, etc.
             log_result = True
         if is_counters == 1:
-            for _ in range(5):
+            for _ in range(9):
                 now = datetime.utcnow()
                 end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
                 time.sleep(30)
                 metric_result = log.get_metric(self.token, self.rule_num, start_time, end_time, self.create_policies_ids, condition, api_host, vsys_id)
-                if log_result == True:
+                if metric_result == True:
                     break
         elif is_counters == 0:
             metric_result = True
@@ -750,10 +750,10 @@ class Verify():
 if __name__ == "__main__":
     username = "hebingning"
     password = "hbn66AAA"
-    test_pc_ip = "192.168.64.76"
+    test_pc_ip = "192.168.64.73"
     path_dict = {}
     env = "tsgx"
-    is_log = 0
+    is_log = 1
     sleep_time = 30
     vsys_id = 6
    api_host = "http://192.168.44.72"
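For reference, the hunks above change the `/v1/log/query` contract in three ways: the `page_no`/`page_size`/`fields` request keys become `execution_mode`, `limit`, `identifier_name`, and `columns` (plus `interval`), result rows move from `data.list` to `data.result`, and the query window is widened from a single instant to one minute. The following is a minimal sketch of a request under the new shape, not part of the commit; the host, token, `columns` string, and `filter` value are placeholder assumptions.

```python
#!/usr/bin/python3
# coding=utf-8
# Sketch of the post-commit /v1/log/query request shape. API_HOST and TOKEN
# are placeholders; in the scripts above the token comes from /v1/user/login
# and the columns string is built from /v1/log/schema/<source>.
import json
from datetime import datetime, timedelta

import requests

API_HOST = "http://192.168.44.72"  # placeholder host
TOKEN = "..."                      # placeholder login token

now = datetime.utcnow()
payload = {
    "execution_mode": "oneshot",               # was: "page_no": 1
    "limit": "0,20",                           # was: "page_size": 20
    "identifier_name": "security-event-list",  # per-source name, see the if/elif chain above
    "source": "security_event",
    "columns": "recv_time,client_ip,security_rule_list",  # assumed comma-joined schema fields; was "fields"
    "start_time": now.strftime('%Y-%m-%dT%H:%M:%SZ'),
    "end_time": (now + timedelta(minutes=1)).strftime('%Y-%m-%dT%H:%M:%SZ'),
    "filter": "notEmpty(client_ip)",           # assumed filter expression
    "vsys_id": 1,
    "interval": 1,
}

headers = {"Content-Type": "application/json", "Authorization": TOKEN}
response = requests.post(API_HOST + "/v1/log/query", headers=headers, json=payload)
assert response.status_code == 200
rows = json.loads(response.text)["data"]["result"]  # was: ["data"]["list"]
print(len(rows), "rows")
```

One design note on the sketch: after this commit `log_query.py` contains both `from datetime import datetime` and a later `import datetime`, so in that module the name `datetime` resolves to the module rather than the class; the sketch keeps only the `from datetime import ...` form to avoid that shadowing.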
