summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhangchengwei <[email protected]>2024-04-24 10:26:18 +0800
committerzhangchengwei <[email protected]>2024-04-24 10:26:18 +0800
commitea7857c1556872eb7e04b2c18ad03d3e1ba030f9 (patch)
treef7ad49f89bb8151cf7af48c851ecf399289cb96d
parentdb487d4a75bc65117b6a08483141055552ef831e (diff)
parentb65a9d7bd4a6c33d07c8d0f005a533ba6e2250d7 (diff)
Merge branch 'develop' of https://git.mesalab.cn/zhaokun/tsg_policy_api into develop
-rw-r--r--data/case_data/multi_policies_priority_data/allow_app-deny_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json4
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json4
-rw-r--r--data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json2
-rw-r--r--data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json2
-rw-r--r--data/log_temp/proxy_event_intercept/session_record_fields.json1
-rw-r--r--data/log_temp/security_events/security_events_fields.json7
-rw-r--r--getLog.py76
-rw-r--r--log_query.py156
-rw-r--r--log_query_4test.py334
-rw-r--r--verify.py10
28 files changed, 203 insertions, 429 deletions
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
index 7e5f256..609977b 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [],
"obj_condition_2": [],
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
index 830c60d..5027fce 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [],
"obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
index 0afacc8..b45d82b 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [],
"obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
index ce14970..364f5eb 100644
--- a/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_app-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [],
"obj_condition_2": [
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
index 6c3153c..fbf67fd 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
index 97ba799..24119db 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
index 6c3153c..fbf67fd 100644
--- a/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_flag-deny_flag_sni_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
index e58a48a..9993cea 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
index 208a2fa..84f7bc3 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 1,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
index dbdb333..771b1d2 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 1,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
index 03737df..bf06e67 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_app-deny_ip_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
index 10b910f..5d5ef17 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
index 1e2158a..f06d619 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_app.json
@@ -5,8 +5,8 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
- "do_log_2": 1,
+ "do_log_1": 2,
+ "do_log_2": 2,
"obj_condition_1": [
{
"attribute_name": "ATTR_SOURCE_IP",
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
index 7ab79bb..06ce6a6 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_flag_sni.json
@@ -5,8 +5,8 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
- "do_log_2": 1,
+ "do_log_1": 2,
+ "do_log_2": 2,
"obj_condition_1": [
{
"attribute_name": "ATTR_SOURCE_IP",
diff --git a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
index 3a41571..b68cdc4 100644
--- a/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_ip_flag_sni-deny_ip_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
index 0ee95da..e53f6a0 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
index 61788b1..a2fb609 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
index 40fe255..abdf40f 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_app.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
index 5322f8a..e290d19 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_flag_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
index 9035009..e091d60 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_ip_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
index 2ae4ac5..b628750 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni copy.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
index 2ae4ac5..b628750 100644
--- a/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
+++ b/data/case_data/multi_policies_priority_data/allow_sni-deny_sni.json
@@ -5,7 +5,7 @@
"rule_action_1": "allow",
"rule_action_2": "deny",
"method_2": "drop",
- "do_log_1": 0,
+ "do_log_1": 2,
"do_log_2": 2,
"obj_condition_1": [
{
diff --git a/data/log_temp/proxy_event_intercept/session_record_fields.json b/data/log_temp/proxy_event_intercept/session_record_fields.json
index 8c41700..2d96861 100644
--- a/data/log_temp/proxy_event_intercept/session_record_fields.json
+++ b/data/log_temp/proxy_event_intercept/session_record_fields.json
@@ -22,6 +22,7 @@
"int_fields_common_value": 1234,
"string_fields_common_value": "1234",
"source": "session_record",
+ "mysource": "session_record_intercept",
"fields": [
{
"name": "recv_time",
diff --git a/data/log_temp/security_events/security_events_fields.json b/data/log_temp/security_events/security_events_fields.json
index 0dc59a0..9b85241 100644
--- a/data/log_temp/security_events/security_events_fields.json
+++ b/data/log_temp/security_events/security_events_fields.json
@@ -1385,12 +1385,7 @@
},
{
"name": "rdp_certificate_count",
- "type": "int",
- "doc": {
- "constraints": {
- "type": "decimal"
- }
- }
+ "type": "int"
},
{
"name": "rdp_certificate_permanent",
diff --git a/getLog.py b/getLog.py
index ade497b..b85c750 100644
--- a/getLog.py
+++ b/getLog.py
@@ -25,7 +25,7 @@ class GetLog():
"page_no": 1,
"page_size": 20,
"source": "security_event",
- "fields": None,
+ "columns": None,
"start_time": "",
"end_time": "",
"filter": "",
@@ -36,11 +36,13 @@ class GetLog():
for i in range(ruleNum):
policy_id = list(create_policies_ids[i].values())[0]
fields = self.get_log_schema(token, "security_event", api_host, vsys_id)
- log_condition_dict['fields'] = fields
+ log_condition_dict['columns'] = fields
log_condition_dict['start_time'] = start_time
log_condition_dict['end_time'] = end_time
log_condition_dict['vsys_id'] = vsys_id
log_condition_dict['log_type'] = 'security_event'
+ log_condition_dict['identifier_name'] = 'security-event-list'
+ log_condition_dict['execution_mode'] = 'oneshot'
# 从conditions中获取object_type判断是否存在subscriberid
if len(condition["obj_condition_1"]) > 0 and condition["obj_condition_1"][0]["attribute_name"] == "ATTR_SUBSCRIBER_ID":
log_filter = f"subscriber_id= 'test23' AND has(security_rule_list,{policy_id})"
@@ -51,11 +53,12 @@ class GetLog():
log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={test_pc_ip}", f"client_ip='{test_pc_ip}'")
url = api_host + "/v1/log/query"
# print(json.dumps(log_condition_dict))
+ print(log_condition_dict)
response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
assert response.status_code == 200
log_list = json.loads(response.text)
# print(log_list)
- log_list = log_list['data']['list']
+ log_list = log_list['data']['result']
log_index = i + 1
log_query_params = condition['log_query_param_'+str(log_index)]
if len(log_list) > 0 and len(log_query_params) > 0:
@@ -109,6 +112,7 @@ class GetLog():
metric_result = True
elif 'hits' in counter and counter['hits'] == 'many' and result_len > 0:
metric = response['data']['result'][0]
+ print(metric)
hits = metric['hits']
if hits > 0:
metric_result = True
@@ -513,40 +517,40 @@ class GetLog():
# if __name__ == '__main__':
# ipObject = get_log_by_condition()
# time.sleep(3)
-if __name__ == '__main__':
- api_host = "http://192.168.44.3"
- v = verify.Verify()
- username = "admin"
- password = "admin"
- v.encryptPwd(password, api_host)
- token = v.login(username, api_host)
- l = GetLog()
- sc_info = {
- 'app_name_1': [
+# if __name__ == '__main__':
+# api_host = "http://192.168.44.3"
+# v = verify.Verify()
+# username = "admin"
+# password = "admin"
+# v.encryptPwd(password, api_host)
+# token = v.login(username, api_host)
+# l = GetLog()
+# sc_info = {
+# 'app_name_1': [
- ],
- 'health_check_method': 'none',
- 'rule_id': 311524,
- 'sf_dest_ip': '2.2.2.57',
- 'sf_id': 2096,
- 'sf_method': 'vxlan_g',
- 'sff_id': 2090,
- 'targeted_traffic': 'raw',
- 'type': 1
- }
- sc_metric = {
- 'total_packets': 347,
- 'total_packets_sent': 97,
- 'total_packets_received': 250,
- 'total_bytes': 339823,
- 'total_bytes_sent': 5892,
- 'total_bytes_received': 333931,
- 'total_syn_pkt': 1
-}
- assert_key = {}
- start_time = "2023-12-11T08:16:46Z"
- end_time = "2023-12-11T08:20:31Z"
- l.get_sc_metric(token,start_time,end_time,sc_info,sc_metric,api_host)
+# ],
+# 'health_check_method': 'none',
+# 'rule_id': 311524,
+# 'sf_dest_ip': '2.2.2.57',
+# 'sf_id': 2096,
+# 'sf_method': 'vxlan_g',
+# 'sff_id': 2090,
+# 'targeted_traffic': 'raw',
+# 'type': 1
+# }
+# sc_metric = {
+# 'total_packets': 347,
+# 'total_packets_sent': 97,
+# 'total_packets_received': 250,
+# 'total_bytes': 339823,
+# 'total_bytes_sent': 5892,
+# 'total_bytes_received': 333931,
+# 'total_syn_pkt': 1
+# }
+# assert_key = {}
+# start_time = "2023-12-11T08:16:46Z"
+# end_time = "2023-12-11T08:20:31Z"
+# l.get_sc_metric(token,start_time,end_time,sc_info,sc_metric,api_host)
# test = GetLog()
# log_dict = {
# "common_recv_time": "1698299472",
diff --git a/log_query.py b/log_query.py
index 3ddef20..84f25e2 100644
--- a/log_query.py
+++ b/log_query.py
@@ -6,6 +6,7 @@ import report
import fnmatch
import requests
from datetime import datetime
+import datetime
class Verify():
@@ -56,23 +57,29 @@ class Verify():
for filename in os.listdir(folder_path):
# 检查是否为json文件
if filename.endswith(".json"):
- if debug_json != "": # debug单个文件
+ if debug_json != "": # debug单个文件
debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
if debug_flag == -1:
break
- else: # 拼接完整路径, 非debug单个文件
+ else: # 拼接完整路径, 非debug单个文件
file_path = os.path.join(folder_path, filename)
try:
# 读取json文件
with open(file_path, 'r', encoding='utf-8') as f:
- #print("当前执行的用例是:"+ folder_path+'/'+filename)
- print("当前执行的用例是:"+ file_path)
+ # print("当前执行的用例是:"+ folder_path+'/'+filename)
+ print("当前执行的用例是:" + file_path)
config = json.load(f)
+
now = datetime.utcnow()
+ one_minute_later = now + datetime.timedelta(minutes=1)
+ # print(one_minute_later)
+
start_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
- end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
+ end_time = one_minute_later.strftime('%Y-%m-%dT%H:%M:%SZ')
# 查询日志
- log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host, vsys_id)
+ # log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host, vsys_id)
+ log_result = self.operate_sourcedata(self.token, config, start_time, end_time, api_host,
+ vsys_id)
# 生成报告的数据
self.excuted_cases_count += 1
report_data = []
@@ -98,48 +105,146 @@ class Verify():
# 修改完成后删除注释:将下方的security_event替换成变量
headers = {'Content-Type': 'application/json', 'Authorization': token}
url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
- print(url)
+ # print(url)
response = requests.get(url, headers=headers)
- print(response)
+ # print(response)
assert response.status_code == 200
log_schema = json.loads(response.text)
log_schema = log_schema['data']['fields']
log_schema = ",".join([field['name'] for field in log_schema])
return log_schema
-
- def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id):
+
+ def operate_sourcedata(self, token, config, start_time, end_time, api_host, vsys_id):
+ # 从数据源中获得对应变量的值
+ # log_filter = config['log_filter']
+ int_fields_common_value = config['int_fields_common_value']
+ string_fields_common_value = config['string_fields_common_value']
+ source = config['source']
+ try:
+ mysource = config['mysource']
+ print(mysource)
+ except:
+ mysource = ''
+ fields = config['fields']
+ for field in fields:
+ field_name = field['name']
+ try:
+ field_type = field['type']['type']
+ except:
+ field_type = field['type']
+
+ if field_type == 'long' or field_type == 'int': # 数字类型字段的运算语句
+ try:
+ str_operators = field['doc']['constraints']['operator_functions']
+ operators = [f'{item.strip()}' for item in str_operators.split(',')]
+ except:
+ operators = config['int_fields_common_operator']
+ for operator in operators:
+ if operator == "in" or operator == "not in":
+ log_filter = ('({} {} ({}))').format(field_name, operator, int_fields_common_value)
+ # print(log_filter)
+ elif operator == "bitAnd":
+ log_filter = ('({}({},{}))').format(operator, field_name, int_fields_common_value)
+ # print(log_filter)
+ else:
+ log_filter = ('({}{}{})').format(field_name, operator, int_fields_common_value)
+ # print(log_filter)
+ self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+ log_filter, mysource)
+ elif field_type == 'string': # 字符串类型字段的运算语句
+ try:
+ str_operators = field['doc']['constraints']['operator_functions']
+ operators = [f'{item.strip()}' for item in str_operators.split(',')]
+ except:
+ operators = config['str_fields_common_operator']
+ # print(operators)
+ for operator in operators:
+ if operator == "=" or operator == "!=":
+ log_filter = ("({}{}'{}')").format(field_name, operator, string_fields_common_value)
+ # print(log_filter)
+ elif operator == "notEmpty" or operator == "empty":
+ log_filter = ('({}({}))').format(operator, field_name)
+ # print(log_filter)
+ else:
+ log_filter = ("({} {} ('{}'))").format(field_name, operator, string_fields_common_value)
+ # print(log_filter)
+ self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+ log_filter, mysource)
+ elif field_type == 'array': # 列表类型字段的运算语句
+ try:
+ str_operators = field['doc']['constraints']['operator_functions']
+ operators = [f'{item.strip()}' for item in str_operators.split(',')]
+ except:
+ operators = config['str_fields_common_operator']
+ # print(operators)
+ for operator in operators:
+ if operator == "has":
+ log_filter = ("({}({},{}))").format(operator, field_name, int_fields_common_value)
+ # print(log_filter)
+ else:
+ log_filter = ('({}({}))').format(operator, field_name)
+ # print(log_filter)
+ self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
+ log_filter, mysource)
+
+ def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id, source, log_filter,
+ mysource):
headers = {'Content-Type': 'application/json', 'Authorization': token}
log_condition = {
- "page_no": 1,
- "page_size": 20,
+ "execution_mode": "oneshot",
+ "limit": "0,20",
+ "identifier_name": "security_event-list",
"source": "security_event",
- "fields": None,
+ "columns": None,
"start_time": "",
"end_time": "",
"filter": "",
- "vsys_id": 1
+ "vsys_id": 1,
+ "interval": 1
}
- # 从数据源中获得对应变量的值
- log_filter = config['log_filter']
- source = config['source']
log_condition_dict = json.loads(json.dumps(log_condition))
fields = self.get_log_schema(token, source, api_host, vsys_id)
+ log_condition_dict['filter'] = log_filter
+ if source == "security_event":
+ log_condition_dict['identifier_name'] = "security-event-list"
+ elif source == "monitor_event":
+ log_condition_dict['identifier_name'] = "monitor-event-list"
+ elif source == "monitor_event":
+ log_condition_dict['identifier_name'] = "monitor-event-list"
+ # elif source == "session_record_intercept":
+ # log_condition_dict['filter'] = "({}AND notEmpty(proxy_action)".format(log_filter)
+ elif source == "proxy_event":
+ log_condition_dict['identifier_name'] = "proxy-event-manipulation-list"
+ elif source == "session_record":
+ log_condition_dict['identifier_name'] = "session-record-list"
+ if mysource == "session_record_intercept":
+ log_condition_dict['filter'] = "({}AND notEmpty(proxy_action))".format(log_filter)
+
+ elif source == "voip_record":
+ log_condition_dict['identifier_name'] = "voip-record-list"
+ else:
+ log_condition_dict['identifier_name'] = "dos-event-list"
+
# 用数据源中变量的值替换模板中变量的值
- log_condition_dict['fields'] = fields
+ log_condition_dict['columns'] = fields
log_condition_dict['start_time'] = start_time
log_condition_dict['end_time'] = end_time
log_condition_dict['vsys_id'] = vsys_id
log_condition_dict['source'] = source
- log_condition_dict['filter'] = log_filter
+ # log_condition_dict['identifier_name'] = source+"-list"
+
url = api_host + "/v1/log/query"
- # print(json.dumps(log_condition_dict))
- response = requests.post(url, headers=headers, json=log_condition_dict, verify=False)
+ print(url)
+ print(json.dumps(log_condition_dict))
+ response = requests.post(url, headers=headers, json=log_condition_dict)
+ print(response)
log_code = response.status_code
if log_code == 200:
log_result = True
else:
log_result = False
+ print(log_result)
return log_result
def is_debug_case_file(self, folder_path, debug_json):
@@ -153,19 +258,21 @@ class Verify():
print("debug单个json文件不存在:{}".format(debug_json_abspath))
return -1, ""
# 遍历文件夹,查看有多少个用例
+
def find_json_files(self, directory):
for _, _, files in os.walk(directory):
for file in files:
if fnmatch.fnmatch(file, '*.json'):
self.all_cases_count += 1
+
def build_report(self):
unexcuted_cases_count = self.all_cases_count - self.excuted_cases_count
pie_table_data = [
[self.all_cases_count, self.pass_cases_count, self.fail_cases_count, unexcuted_cases_count]
]
- pass_cases_ratio = self.pass_cases_count/self.all_cases_count*100
- fail_cases_ratio = self.fail_cases_count/self.all_cases_count*100
- unexcuted_cases_ratio = 100.00-pass_cases_ratio-fail_cases_ratio
+ pass_cases_ratio = self.pass_cases_count / self.all_cases_count * 100
+ fail_cases_ratio = self.fail_cases_count / self.all_cases_count * 100
+ unexcuted_cases_ratio = 100.00 - pass_cases_ratio - fail_cases_ratio
pass_cases_ratio = format(pass_cases_ratio, '.2f')
fail_cases_ratio = format(fail_cases_ratio, '.2f')
unexcuted_cases_ratio = format(unexcuted_cases_ratio, '.2f')
@@ -174,6 +281,7 @@ class Verify():
new_report = report.GenerateReport()
new_report.generate_report(self.table_data, pie_data, pie_table_data, project_path)
+
if __name__ == "__main__":
# username = "hebingning"
# password = "hbn66AAA"
diff --git a/log_query_4test.py b/log_query_4test.py
deleted file mode 100644
index 284e17d..0000000
--- a/log_query_4test.py
+++ /dev/null
@@ -1,334 +0,0 @@
-#!/usr/bin/python3
-# coding=utf-8
-import json
-import os
-import report
-import fnmatch
-import requests
-from datetime import datetime
-
-
-class Verify():
- def __init__(self):
- self.password = ""
- self.token = ""
- self.table_data = []
- self.all_cases_count = 0
- self.excuted_cases_count = 0
- self.pass_cases_count = 0
- self.fail_cases_count = 0
-
- def encryptPwd(self, pwd, api_host):
- url = api_host + "/v1/user/encryptpwd"
- pwJson = {"password": ""}
- pwJson["password"] = pwd
- response = requests.get(url, params=pwJson)
- data = json.loads(response.text)
- self.password = data["data"]["encryptpwd"]
- return self.password
-
- def login(self, user, api_host):
- url = api_host + "/v1/user/login"
- loginJson = {"username": "", "password": ""}
- loginJson["username"] = user
- loginJson["password"] = self.password
- response = requests.post(url, json=loginJson)
- jsonData = json.loads(response.text)
- self.token = jsonData["data"]["token"]
- return self.token
-
- def start_verify(self, user, password, api_host, path_dict, vsys_id, debug_json=""):
- """
- :param user:
- :param password:
- :param api_host:
- :param path_dict: 路径字典,需要使用路径参数,在此变量中
- :param vsys_id
- :param debug_json:
- :return:
- """
- self.encryptPwd(password, api_host)
- self.login(user, api_host)
- debug_flag = 0
- folder_path = path_dict["folder_path"]
- self.table_data.append(["Name", "Result", "Failure Reason"])
- # 循环读取文件夹中的json文件,每个json文件代表一个用例
- for filename in os.listdir(folder_path):
- # 检查是否为json文件
- if filename.endswith(".json"):
- if debug_json != "": # debug单个文件
- debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
- if debug_flag == -1:
- break
- else: # 拼接完整路径, 非debug单个文件
- file_path = os.path.join(folder_path, filename)
- try:
- # 读取json文件
- with open(file_path, 'r', encoding='utf-8') as f:
- # print("当前执行的用例是:"+ folder_path+'/'+filename)
- print("当前执行的用例是:" + file_path)
- config = json.load(f)
- now = datetime.utcnow()
- start_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
- end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
- # 查询日志
- # log_result = self.get_log_by_condition(self.token, config, start_time, end_time, api_host,
- # vsys_id)
- log_result = self.operate_sourcedata(self.token, config, start_time, end_time, api_host,
- vsys_id)
- # 生成报告的数据
- self.excuted_cases_count += 1
- report_data = []
- case_name = os.path.splitext(filename)[0]
- report_data.append(case_name)
- if log_result == False:
- result = "Fail"
- self.fail_cases_count += 1
- else:
- result = "PASS"
- self.pass_cases_count += 1
- report_data.append(result)
- failure_reason = ""
- if log_result == False:
- failure_reason = failure_reason + "The code returned is not 200."
- report_data.append(failure_reason)
- self.table_data.append(report_data)
- finally:
- if debug_flag == 1: # 只debug单个文件
- break # 遇到json文件绝对路径,只执行一次
-
- # def get_test_cases_len(self,debug_json=""):
- # folder_path = path_dict["folder_path"]
- # debug_flag = 0
- # for filename in os.listdir(folder_path):
- # if filename.endswith(".json"):
- # if debug_json != "": # debug单个文件
- # debug_flag, file_path = self.is_debug_case_file(folder_path, debug_json)
- # if debug_flag == -1:
- # break
- # else: # 拼接完整路径, 非debug单个文件
- # file_path = os.path.join(folder_path, filename)
- # try:
- # # 读取json文件
- # with open(file_path, 'r', encoding='utf-8') as f:
- # config = json.load(f)
- # test_cases = config['TestCases']
- # finally:
- # if debug_flag == 1: # 只debug单个文件
- # break # 遇到json文件绝对路径,只执行一次
- # return len(test_cases)
-
- def get_log_schema(self, token, log_type, api_host, vsys_id):
- # 修改完成后删除注释:将下方的security_event替换成变量
- headers = {'Content-Type': 'application/json', 'Authorization': token}
- url = api_host + "/v1/log/schema/" + log_type + "?vsys_id=" + str(vsys_id)
- response = requests.get(url, headers=headers)
- assert response.status_code == 200
- log_schema = json.loads(response.text)
- log_schema = log_schema['data']['fields']
- log_schema = ",".join([field['name'] for field in log_schema])
- return log_schema
-
- # def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id):
- # headers = {'Content-Type': 'application/json', 'Authorization': token}
- # log_condition = {
- # "page_no": 1,
- # "page_size": 20,
- # "source": "security_event",
- # "fields": None,
- # "start_time": "",
- # "end_time": "",
- # "filter": "",
- # "vsys_id": 1
- # }
- # # 从数据源中获得对应变量的值
- # # log_filter = config['log_filter']
- # source = config['source']
- # print(source)
- # test_cases = config['TestCases']
- # for i in test_cases:
- # log_filter = i['log_filter']
- # # source = i['source']
- # print(log_filter)
- # log_condition_dict = json.loads(json.dumps(log_condition))
- # fields = self.get_log_schema(token, source, api_host, vsys_id)
- # # 用数据源中变量的值替换模板中变量的值
- # log_condition_dict['fields'] = fields
- # log_condition_dict['start_time'] = start_time
- # log_condition_dict['end_time'] = end_time
- # log_condition_dict['vsys_id'] = vsys_id
- # log_condition_dict['source'] = source
- # log_condition_dict['filter'] = log_filter
- #
- # url = api_host + "/v1/log/query"
- # # print(json.dumps(log_condition_dict))
- # response = requests.post(url, headers=headers, json=log_condition_dict)
- # print(response)
- # log_code = response.status_code
- # if log_code == 200:
- # log_result = True
- # else:
- # log_result = False
- # print(log_result)
- # return log_result
- def operate_sourcedata(self, token, config, start_time, end_time, api_host, vsys_id):
- # 从数据源中获得对应变量的值
- # log_filter = config['log_filter']
- int_fields_common_value = config['int_fields_common_value']
- string_fields_common_value = config['string_fields_common_value']
- source = config['source']
- # print(source)
- fields = config['fields']
- for field in fields:
- field_name = field['name']
- try:
- field_type = field['type']['type']
- except:
- field_type = field['type']
-
- if field_type == 'long' or field_type == 'int': # 数字类型字段的运算语句
- try:
- str_operators = field['doc']['constraints']['operator_functions']
- operators = [f'{item.strip()}' for item in str_operators.split(',')]
- except:
- operators = config['int_fields_common_operator']
- for operator in operators:
- if operator == "in" or operator == "not in":
- log_filter = ('({} {} ({}))').format(field_name, operator, int_fields_common_value)
- # print(log_filter)
- elif operator == "bitAnd":
- log_filter = ('({}({},{}))').format(operator, field_name, int_fields_common_value)
- # print(log_filter)
- else:
- log_filter = ('({}{}{})').format(field_name, operator, int_fields_common_value)
- # print(log_filter)
- self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
- log_filter)
- elif field_type == 'string': # 字符串类型字段的运算语句
- try:
- str_operators = field['doc']['constraints']['operator_functions']
- operators = [f'{item.strip()}' for item in str_operators.split(',')]
- except:
- operators = config['str_fields_common_operator']
- # print(operators)
- for operator in operators:
- if operator == "=" or operator == "!=":
- log_filter = ("({}{}'{}')").format(field_name, operator, string_fields_common_value)
- # print(log_filter)
- elif operator == "notEmpty" or operator == "empty":
- log_filter = ('({}({}))').format(operator, field_name)
- # print(log_filter)
- else:
- log_filter = ("({} {} ('{}'))").format(field_name, operator, string_fields_common_value)
- # print(log_filter)
- self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
- log_filter)
- elif field_type == 'array': # 列表类型字段的运算语句
- try:
- str_operators = field['doc']['constraints']['operator_functions']
- operators = [f'{item.strip()}' for item in str_operators.split(',')]
- except:
- operators = config['str_fields_common_operator']
- # print(operators)
- for operator in operators:
- if operator == "has":
- log_filter = ("({}({},{}))").format(operator, field_name, int_fields_common_value)
- # print(log_filter)
- else:
- log_filter = ('({}({}))').format(operator, field_name)
- # print(log_filter)
- self.get_log_by_condition(token, config, start_time, end_time, api_host, vsys_id, source,
- log_filter)
-
- def get_log_by_condition(self, token, config, start_time, end_time, api_host, vsys_id, source, log_filter):
- headers = {'Content-Type': 'application/json', 'Authorization': token}
- log_condition = {
- "page_no": 1,
- "page_size": 20,
- "source": "security_event",
- "fields": None,
- "start_time": "",
- "end_time": "",
- "filter": "",
- "vsys_id": 1
- }
-
- log_condition_dict = json.loads(json.dumps(log_condition))
- fields = self.get_log_schema(token, source, api_host, vsys_id)
- # 用数据源中变量的值替换模板中变量的值
- log_condition_dict['fields'] = fields
- log_condition_dict['start_time'] = start_time
- log_condition_dict['end_time'] = end_time
- log_condition_dict['vsys_id'] = vsys_id
- log_condition_dict['source'] = source
- log_condition_dict['filter'] = log_filter
-
- url = api_host + "/v1/log/query"
- # print(json.dumps(log_condition_dict))
- response = requests.post(url, headers=headers, json=log_condition_dict)
- # print(response)
- log_code = response.status_code
- if log_code == 200:
- log_result = True
- else:
- log_result = False
- # print(log_result)
- return log_result
-
- def is_debug_case_file(self, folder_path, debug_json):
- debug_json_abspath = os.path.join(folder_path, debug_json)
- if os.path.exists(debug_json_abspath):
- debug_flag = 1
- file_path = debug_json_abspath
- return debug_flag, file_path
- else:
- # raise Exception("debug单个json文件不存在:{}".format(debug_json_abspath))
- print("debug单个json文件不存在:{}".format(debug_json_abspath))
- return -1, ""
- # 遍历文件夹,查看有多少个用例
-
- def find_json_files(self, directory):
- for _, _, files in os.walk(directory):
- for file in files:
- if fnmatch.fnmatch(file, '*.json'):
- self.all_cases_count += 1
-
- def build_report(self):
- unexcuted_cases_count = self.all_cases_count - self.excuted_cases_count
- pie_table_data = [
- [self.all_cases_count, self.pass_cases_count, self.fail_cases_count, unexcuted_cases_count]
- ]
- pass_cases_ratio = self.pass_cases_count / self.all_cases_count * 100
- fail_cases_ratio = self.fail_cases_count / self.all_cases_count * 100
- unexcuted_cases_ratio = 100.00 - pass_cases_ratio - fail_cases_ratio
- pass_cases_ratio = format(pass_cases_ratio, '.2f')
- fail_cases_ratio = format(fail_cases_ratio, '.2f')
- unexcuted_cases_ratio = format(unexcuted_cases_ratio, '.2f')
- pie_data = [pass_cases_ratio, fail_cases_ratio, unexcuted_cases_ratio]
- # table_data是每个case的执行明细,pie_data是pass、fail、unexecuted百分比,pie_table_data是all、pass、fail、unexecuted用例数
- new_report = report.GenerateReport()
- new_report.generate_report(self.table_data, pie_data, pie_table_data, project_path)
-
-
-if __name__ == "__main__":
- # username = "hebingning"
- # password = "hbn66AAA"
- username = "baiguorui"
- password = "baiguorui1"
- path_dict = {}
- vsys_id = 6
- api_host = "http://192.168.44.72"
- # project_path = "D:/python_script/tsg_policy_api"
- project_path = "D:/Python Projects/Auto_api/tsg_policy_api"
- temp_folder_path = f"{project_path}/data/log_temp"
- path_dict["project_path"] = project_path
- folder_list = os.listdir(temp_folder_path)
- print("数据源包含的文件夹列表:", folder_list)
- verify_res = Verify()
- verify_res.find_json_files(temp_folder_path)
- for j in range(len(folder_list)):
- folder_path = temp_folder_path + '/' + folder_list[j]
- path_dict["folder_path"] = folder_path
- verify_res.start_verify(username, password, api_host, path_dict, vsys_id)
- # verify_res.start_verify(username, password, test_pc_ip, api_host, is_log, path_dict, env, vsys_id, debug_json="security_deny_create_50w_fqdn.json")
- verify_res.build_report()
diff --git a/verify.py b/verify.py
index 0acfa26..bb9398e 100644
--- a/verify.py
+++ b/verify.py
@@ -310,7 +310,7 @@ class Verify():
if is_log == 1:
if is_log_query == 1:
# 考虑到有5分钟后才出日志的情况,循环5次
- for _ in range(5):
+ for _ in range(7):
now = datetime.utcnow()
end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
# 等待产生日志
@@ -322,12 +322,12 @@ class Verify():
# 此时代表不需要进行logs模块的校验,可能为security shunt动作或statistics,sc策略等
log_result = True
if is_counters == 1:
- for _ in range(5):
+ for _ in range(9):
now = datetime.utcnow()
end_time = now.strftime('%Y-%m-%dT%H:%M:%SZ')
time.sleep(30)
metric_result = log.get_metric(self.token, self.rule_num, start_time, end_time, self.create_policies_ids, condition, api_host, vsys_id)
- if log_result == True:
+ if metric_result == True:
break
elif is_counters == 0:
metric_result = True
@@ -750,10 +750,10 @@ class Verify():
if __name__ == "__main__":
username = "hebingning"
password = "hbn66AAA"
- test_pc_ip = "192.168.64.76"
+ test_pc_ip = "192.168.64.73"
path_dict = {}
env = "tsgx"
- is_log = 0
+ is_log = 1
sleep_time = 30
vsys_id = 6
api_host = "http://192.168.44.72"