summaryrefslogtreecommitdiff
path: root/support/api_utils/query_rule_log.py
blob: 7f8ede0b6dbeddc49225df54b56c525f1701f86a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
# -*- coding: UTF-8 -*-
import json
import time
import pytz
import requests
import os
import configparser
from support.organize_config import *
from support.packet_generator.workpath import workdir
from datetime import datetime
from support.ui_utils.element_position.policy_element_position import *

class QueryRuleLog:
    def __init__(self, parameter, policy_configuration, token, traffic_result):
        self.parameter = parameter
        self.policy_configuration = policy_configuration
        self.token = token
        self.traffic_result = traffic_result

    def query_rule_log(self, traffic_generation, verification_result, rule_uuids_tuple, start_time):
        try:
            self.rule_type = self.policy_configuration["type"]
            if self.rule_type == "security" or self.rule_type == "proxy_intercept" or self.rule_type == "proxy_manipulation" or self.rule_type == "monitor" or self.rule_type == "statistics" or self.rule_type == "service_chaining" or self.rule_type == "dos_protection":
                print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Start to verify the effect of the policy rule by api.", flush=True)
                log_result = None
                for _ in range(int(90)):
                    time.sleep(10)
                    utc_tz = pytz.timezone('UTC')
                    current_utc_time = datetime.now(utc_tz)
                    end_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
                    if log_result == None:
                        log_dict, error = self.get_log(traffic_generation, rule_uuids_tuple, start_time, end_time)
                        #print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "DEBGU:start_time:{},end_time:{};log_dict:{}, error:{}".format(start_time, end_time, log_dict, error), flush=True)
                        if len(error) != 0:
                            return error
                        if self.rule_type == "dos_protection":
                            log_result = self.verify_dos_log(log_dict, verification_result, start_time, end_time)
                        else:
                            log_result = self.verify_log(log_dict, verification_result)
                    if log_result != None:
                        break
                if log_result == True:
                    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], 'The log result checked by calling api is passed.', flush=True)
                elif log_result == False:
                    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], 'The log result checked by calling api is failed.', flush=True)
                elif log_result == None:
                    if not verification_result["expected_log"]:
                        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], 'The log result checked by calling api is passed.', flush=True)
                    else:
                        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], 'The log result checked by calling api is none.', flush=True)
                elif log_result == "no_set":
                    print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], 'The log result checked by calling api is no_set.', flush=True)
                return log_result
        except Exception as e:
            print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "When querying rule log, the exception error: ", str(e), flush=True)
            return "When querying rule log, the exception error: " + str(e)

    def get_log(self, traffic_generation, rule_uuids_tuple, start_time, end_time):
        try:
            self.traffic_generation = traffic_generation
            rule_uuids_list = list(rule_uuids_tuple)
            rule_uuid = rule_uuids_list[0]["uuid"]
            headers = {'Content-Type': 'application/json', 'Authorization': self.token}
            dsl_dict = {"dsl": "", "vsys": 1}
            log_condition = {
                "name": "log-query",
                "limit": 20,
                "data_source": "",
                "fields": "",
                "intervals": []
            }
            log_condition_dict = json.loads(json.dumps(log_condition))
            if "rule_number" in self.policy_configuration:
                rule_number = self.policy_configuration["rule_number"]
            else:
                rule_number = 1
            for _ in range(rule_number):
                if self.rule_type == 'security':
                    schema_type = log_source = 'security_event'
                    log_query_rule_type = 'security_rule_uuid_list'
                elif self.rule_type == 'proxy_intercept':
                    schema_type = log_source = 'session_record'
                    log_query_rule_type = 'proxy_rule_uuid_list'
                elif self.rule_type == 'proxy_manipulation':
                    schema_type = log_source = 'proxy_event'
                    log_query_rule_type = 'proxy_rule_uuid_list'
                elif self.rule_type == 'monitor':
                    schema_type = log_source = 'monitor_event'
                    log_query_rule_type = 'monitor_rule_uuid_list'
                elif self.rule_type == 'statistics':
                    schema_type = log_source = 'session_record'
                    log_query_rule_type = 'statistics_rule_uuid_list'
                elif self.rule_type == "dos_protection":
                    schema_type = log_source = "dos_event"
                elif self.rule_type == "service_chaining":
                    schema_type = log_source = 'session_record'
                    log_query_rule_type = 'sc_rule_uuid_list'
                fields = self.get_log_schema(self.token, schema_type, self.parameter["api_server"], self.parameter["vsys"])
                log_condition_dict['fields'] = fields
                log_condition_dict['data_source'] = log_source
                log_condition_dict["intervals"].append(start_time + '/' + end_time)
                log_condition_dict['vsys'] = self.parameter["vsys"]
                if self.is_attribute_name_exsit("ATTR_SUBSCRIBER_ID") == True:
                    log_filter = f"vsys_id in ({int(self.parameter['vsys'])}) AND subscriber_id='{self.parameter['test_subcriber_id']}' AND has({log_query_rule_type}, '{rule_uuid}')"
                    log_condition_dict['filter'] = log_filter
                else:
                    if self.traffic_generation["tool"] == "trex":
                        if self.rule_type == "dos_protection":
                            log_condition_dict['filter'] = f"destination_ip='{traffic_generation['servers_start_ip']}'"
                        else:
                            log_filter = f"vsys_id in ({int(self.parameter['vsys'])}) AND client_ip={traffic_generation['clients_start_ip']} AND has({log_query_rule_type}, '{rule_uuid}')"
                            log_condition_dict['filter'] = log_filter
                            log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={traffic_generation['clients_start_ip']}",f"client_ip='{traffic_generation['clients_start_ip']}'")
                    else:
                        # log_filter = f"client_ip={self.parameter['test_pc_ip']} AND has({log_query_rule_type}, '{rule_uuid}')"
                        log_filter = f"vsys_id in ({int(self.parameter['vsys'])}) AND client_ip={self.parameter['test_pc_ip']} AND has({log_query_rule_type}, '{rule_uuid}')"
                        log_condition_dict['filter'] = log_filter
                        log_condition_dict['filter'] = log_condition_dict['filter'].replace(f"client_ip={self.parameter['test_pc_ip']}", f"client_ip='{self.parameter['test_pc_ip']}'")
                dsl_dict["dsl"] = log_condition_dict
                dsl_dict['vsys'] = int(self.parameter["vsys"])
                url = self.parameter["api_server"] + "/v1/logs/query"
                response = requests.post(url, headers=headers, json=dsl_dict, verify=False)
                job_dict = json.loads(response.text)
                job_id = job_dict['data']['job']['job_id']
                """
                {"execution_mode":"oneshot","query_jobs":[{"id":"e9df1ca8-e270-427d-9baf-3c0e8fb13813","is_saved_query":0,"with_result":false}],"vsys":5}
                {
                    "code": 200,
                    "msg": "Success",
                    "data": [{
                        "reason": null,
                        "start_time": "2024-11-27T02:27:21Z",
                        "is_done": true,
                        "job_id": "e9df1ca8-e270-427d-9baf-3c0e8fb13813",
                        "is_canceled": false,
                        "done_progress": 1,
                        "end_time": "2024-11-27T02:27:21Z",
                        "links": {
                            "status": "/v1/query/job/e9df1ca8-e270-427d-9baf-3c0e8fb13813",
                            "count": "/v1/query/job/e9df1ca8-e270-427d-9baf-3c0e8fb13813/count",
                            "list": "/v1/query/job/e9df1ca8-e270-427d-9baf-3c0e8fb13813/list",
                            "timeline": "/v1/query/job/e9df1ca8-e270-427d-9baf-3c0e8fb13813/timeline"
                        },
                        "is_failed": false
                    }],
                    "success": true
                }
                query_job_dict = {
                    "query_jobs": [{"id": job_id, "is_saved_query":0, "with_result":False}],
                    "vsys": int(self.parameter["vsys"]), "limit": 20, "offset": 0
                }
                response = requests.post(url, headers=headers, json=query_job_dict, verify=False)
                if response.status_code == 200:
                    query_job_dict = json.loads(response.text)
                    is_done = query_job_dict['data'][0]['is_done']
                    is_failed = query_job_dict['data'][0]['is_failed']
                    reason = query_job_dict['data'][0]["reason"]
                if is_done and not is_failed:
                    query_list_dict = {
                        "query_jobs":[{"id": job_id,"query_option":"list"}],
                        "vsys":int(self.parameter["vsys"]),"limit":20,"offset":0
                    }
                    response = requests.post(url, headers=headers, json=query_list_dict, verify=False)
                    assert response.status_code == 200
                    log_list = json.loads(response.text)
                    log_list = log_list['data']['list']
                    return log_list, ""
                elif is_done and is_failed:
                    return "", reason
                else:
                    return [], ""                
                """
                query_list_dict = {
                    "query_jobs":[{"id": job_id,"query_option":"list"}],
                    "vsys":int(self.parameter["vsys"]),"limit":20,"offset":0
                }
                time.sleep(2)
                response = requests.post(url, headers=headers, json=query_list_dict, verify=False)
                assert response.status_code == 200
                log_list = json.loads(response.text)
                log_list = log_list['data']['list']
                return log_list, ""
        except Exception as e:
            print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "When getting log, the exception error: ", e, flush=True)
            return "", "When getting log, the exception error: " + str(e)

    def is_attribute_name_exsit(self, attribute_name):
        if "and_conditions" in self.policy_configuration:
            and_conditions = self.policy_configuration["and_conditions"]
            for i in range(len(and_conditions)):
                or_conditions = and_conditions[i]["or_conditions"]
                for j in range(len(or_conditions)):
                    if or_conditions[j]["attribute_name"] == attribute_name:
                        return True
        return False

    def get_log_schema(self, token, schema_type, api_host, vsys):
        """Fetch the field names of a log schema from the API server.

        Args:
            token: value sent in the Authorization header.
            schema_type: schema name appended to ``/v1/logs/schema/``.
            api_host: base URL of the API server.
            vsys: value sent as the "vsys" query parameter.

        Returns:
            A list with the "name" of every entry in the response's
            ``data.fields``.

        Raises:
            AssertionError: if the server does not answer with HTTP 200.
        """
        headers = {'Content-Type': 'application/json', 'Authorization': token}
        url = api_host + "/v1/logs/schema/" + schema_type
        # NOTE: verify=False turns off TLS certificate verification for this request.
        response = requests.get(url, headers=headers, params={"vsys": vsys}, verify=False)
        if response.status_code != 200:
            # Same exception type as the former `assert`, but the check is not
            # stripped under `python -O` and the message names the failure.
            raise AssertionError(f"log schema request for {schema_type} returned HTTP {response.status_code}")
        schema = json.loads(response.text)
        return [field['name'] for field in schema['data']['fields']]

    def verify_log(self, log_dict, verification_result):
        log_query = verification_result['expected_log']
        temp_log_result_list = []
        if len(log_dict) > 0 and len(log_query) > 0:
            for log in log_dict:
                for log_query_param in log_query:
                    query_field_key = log_query_param["query_field_key"]
                    query_value = log_query_param["query_value"]
                    exclude_fields = [
                        "packet_capture_file", "http_response_body", "http_request_body", "monitor_mirrored_pkts",
                        "monitor_mirrored_bytes", "client_port", "ssl_ech_flag", "ssl_esni_flag",
                        "sessions", "session_rate", "packets", "packet_rate", "bytes", "bit_rate", "rule_uuid", "conditions"
                    ]
                    dos_verification_fields = [
                        "sessions", "session_rate", "packets", "packet_rate", "bytes", "bit_rate", "rule_uuid", "source_ip", "destination_ip"
                    ]
                    if query_field_key in log and query_field_key not in (exclude_fields, dos_verification_fields) and log[query_field_key] == query_value:
                        temp_log_result_list.append(True)
                    elif query_field_key in {"packet_capture_file", "http_response_body", "http_request_body"}:
                        # 为true时,表示支持下载;为false时,表示不支持下载
                        if query_value == True:
                            if len(log[query_field_key]) > 0:
                                packet_capture_url = log[query_field_key]
                                download_parameter = {"url": packet_capture_url, "exportObj": True}
                                url = self.parameter["api_server"] + "/v1/util/download-file" + "?vsys_id=" + str(self.parameter["vsys"])
                                headers = {"Content-Type": "application/json", "Authorization": self.token}
                                response = requests.get(url, headers=headers, params=download_parameter, verify=False)
                                if response.status_code == 200:
                                    temp_log_result_list.append(True)
                                else:
                                    temp_log_result_list.append(False)
                            else:
                                temp_log_result_list.append(False)
                        elif query_value == False:
                            if len(log[query_field_key]) > 0 :
                                temp_log_result_list.append(False)
                            else:
                                temp_log_result_list.append(True)
                    elif query_field_key in {"monitor_mirrored_pkts", "monitor_mirrored_bytes"}:
                        actual_bytes_value = self.traffic_result['total_bytes']
                        actual_pkts_value = self.traffic_result["total_packets"]
                        mirror_bytes_value = 0
                        mirror_pkts_value = 0
                        application = self.get_application_from_configration()
                        enable = self.policy_configuration["action_parameter"]["traffic_mirroring"]["enable"]
                        if enable == 1:
                            attribute_name_list = self.get_attribute_name_from_configration()
                            # 根据不同的协议,对mirror相关参数进行赋值
                            if self.is_exist_in_list(application, ["ftp", "http", "https", "ssl"]):
                                if len(attribute_name_list) > 0:
                                    if self.is_exist_in_list(attribute_name_list, ["ATTR_SSL_CN", "ATTR_SSL_SAN"]):
                                        mirror_bytes_value = actual_bytes_value - 4619
                                        mirror_pkts_value = actual_pkts_value - 9
                                    elif self.is_exist_in_list(attribute_name_list, ["ATTR_HTTP_RES_HDR"]):
                                        mirror_bytes_value = actual_bytes_value - 381
                                        mirror_pkts_value = actual_pkts_value - 5
                                    elif self.is_exist_in_list(attribute_name_list, ["ATTR_FTP_ACCOUNT"]):
                                        mirror_bytes_value = actual_bytes_value - 266
                                        mirror_pkts_value = actual_pkts_value - 4
                                    else:
                                        mirror_bytes_value = actual_bytes_value - 192
                                        mirror_pkts_value = actual_pkts_value - 3
                                else:
                                    mirror_bytes_value = actual_bytes_value - 192
                                    mirror_pkts_value = actual_pkts_value - 3
                                # if len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]) > 0:
                                if "mirroring_profile" in self.policy_configuration["action_parameter"]["traffic_mirroring"]:
                                    vlan_count = len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]["vlan_array"])
                                    num_vlans = vlan_count
                                    mirror_bytes_value *= num_vlans
                                    mirror_pkts_value *= num_vlans
                            elif self.is_exist_in_list(application, ["mail"]):
                                if len(attribute_name_list) > 0:
                                    if self.is_exist_in_list(attribute_name_list, ["ATTR_MAIL_FROM", "ATTR_MAIL_TO"]):
                                        mirror_bytes_value = actual_bytes_value - 19578
                                        mirror_pkts_value = actual_pkts_value - 158
                                    elif self.is_exist_in_list(attribute_name_list, ["ATTR_MAIL_ATT_CONTENT", "ATTR_MAIL_ATT_NAME"]):
                                        mirror_bytes_value = actual_bytes_value - 34554
                                        mirror_pkts_value = actual_pkts_value - 170
                                    elif self.is_exist_in_list(attribute_name_list, ["ATTR_MAIL_SUBJECT"]):
                                        mirror_bytes_value = actual_bytes_value - 19578
                                        mirror_pkts_value = actual_pkts_value - 158
                                    elif self.is_exist_in_list(attribute_name_list, ["ATTR_MAIL_CONTENT"]):
                                        mirror_bytes_value = actual_bytes_value - 21092
                                        mirror_pkts_value = actual_pkts_value - 159
                                    else:
                                        mirror_bytes_value = actual_bytes_value - 967
                                        mirror_pkts_value = actual_pkts_value - 9
                                else:
                                    mirror_bytes_value = actual_bytes_value - 967
                                    mirror_pkts_value = actual_pkts_value - 9
                                # if len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]) > 0:
                                if "mirroring_profile" in self.policy_configuration["action_parameter"]["traffic_mirroring"]:
                                    vlan_count = len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]["vlan_array"])
                                    num_vlans = vlan_count
                                    mirror_bytes_value *= num_vlans
                                    mirror_pkts_value *= num_vlans
                            elif self.is_exist_in_list(application, ["", "quic", "dns"]):
                                mirror_bytes_value = actual_bytes_value
                                mirror_pkts_value = actual_pkts_value
                                if "mirroring_profile" in self.policy_configuration["action_parameter"]["traffic_mirroring"]:
                                # if len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]) > 0:
                                    vlan_count = len(self.policy_configuration["action_parameter"]["traffic_mirroring"]["mirroring_profile"]["vlan_array"])
                                    num_vlans = vlan_count
                                    mirror_bytes_value *= num_vlans
                                    mirror_pkts_value *= num_vlans
                            # 讲上述mirror的值赋值给对应的verification_result的expected_log
                            if self.is_exist_in_list(application, ["ftp", "http", "ssl", "dns", "quic", "", "mail"]):
                                if query_field_key == "monitor_mirrored_pkts" and log[query_field_key] == mirror_pkts_value:
                                    temp_log_result_list.append(True)
                                    for log_query_param in log_query:
                                        if log_query_param["query_field_key"] == "monitor_mirrored_pkts":
                                            log_query_param["query_value"] = mirror_pkts_value
                                elif query_field_key == "monitor_mirrored_bytes" and log[query_field_key] == mirror_bytes_value:
                                    temp_log_result_list.append(True)
                                    for log_query_param in log_query:
                                        if log_query_param["query_field_key"] == "monitor_mirrored_bytes":
                                            log_query_param["query_value"] = mirror_bytes_value
                                else:
                                    temp_log_result_list.append(False)
                    elif query_field_key in {"client_port"}:
                        if "_" in query_value:
                            start, end = map(int, query_value.split('-'))
                            if start <= log[query_field_key] <= end:
                                temp_log_result_list.append(True)
                            else:
                                temp_log_result_list.append(False)
                        else:
                            if log[query_field_key] == query_value:
                                temp_log_result_list.append(True)
                            else:
                                temp_log_result_list.append(False)
                    elif query_field_key in {"ssl_ech_flag", "ssl_esni_flag"}:
                        if query_value == "True":
                            query_value = 1
                        elif query_value == "False":
                            query_value = 0
                        if log[query_field_key] == query_value:
                            temp_log_result_list.append(True)
                        else:
                            temp_log_result_list.append(False)
                    elif query_field_key in {"proxy_pinning_status", "proxy_intercept_status", "proxy_passthrough_reason"}:
                        actual_value = log[query_field_key]   # 日志查询到的值
                        expected_value = query_value     # 断言的预期值
                        if query_field_key == "proxy_passthrough_reason":
                            if expected_value in {"EV Certificate", "Certificate Transparency", "Protocol Errors", "Mutual Authentication", "Certificate Not Installed", "Certificate Pinning"} and actual_value in {""}:
                                # 证书动态放行会产生两条日志,其中一条大概率是拦截的日志,则跳过断言
                                continue
                            else:
                                if actual_value == expected_value:
                                    temp_log_result_list.append(True)
                                else:
                                    temp_log_result_list.append(False)
                        elif query_field_key == "proxy_intercept_status":  # proxy_intercept_status的值需要转换
                            if expected_value == "passthrough":      # proxy_intercept_status, 0=passthrough, 1=intercept
                                expected_value = 0
                            else:
                                expected_value = 1
                            if expected_value == 0 and actual_value == 1:
                                continue
                            else:
                                if actual_value == expected_value:
                                    temp_log_result_list.append(True)
                                else:
                                    temp_log_result_list.append(False)
                    elif query_field_key in {"sc_rsp_raw_uuid_list", "sc_rsp_decrypted_uuid_list"}:
                        query_value = self.policy_configuration["action_parameter"]["sff_profiles"][0]["service_func_profiles"]
                        conf_path = os.path.join(workdir, "configuration_file.ini")
                        conf = configparser.ConfigParser()
                        conf.read(conf_path, encoding="utf-8")
                        active_dst_ip_list = conf.get("sc_active_dst_ip", "ip_list")
                        effective_device_tag_list = ["group-xxg-tsgx", "center-xxg-tsgx"]
                        if log[query_field_key] == [] and self.policy_configuration["action_parameter"]["sf_configuration"][0]["connectivity"]["method"] == "vxlan_g" and self.policy_configuration["action_parameter"]["sf_configuration"][0]["connectivity"]["dest_ip"] not in active_dst_ip_list:
                            temp_log_result_list.append(True)
                        elif log[query_field_key] == [] and self.policy_configuration["action_parameter"]["sf_configuration"][0]["device_group"]["value"] not in effective_device_tag_list:
                            temp_log_result_list.append(True)
                        elif log[query_field_key] == [] and self.policy_configuration["action_parameter"]["sf_configuration"][0]["admin_status"] == 0:
                            temp_log_result_list.append(True)
                        elif query_value == log[query_field_key]:
                            temp_log_result_list.append(True)
                        else:
                            temp_log_result_list.append(False)
                    elif query_field_key in {"sent_pkts", "received_pkts", "sent_bytes", "received_bytes"}:
                        if query_field_key == "sent_pkts":
                            query_value = self.traffic_result["total_packets_sent"]
                        elif query_field_key == "received_pkts":
                            query_value = self.traffic_result["total_packets_received"]
                        elif query_field_key == "sent_bytes":
                            query_value = self.traffic_result["total_bytes_sent"]
                        elif query_field_key == "received_bytes":
                            query_value = self.traffic_result["total_bytes_received"]
                        if query_value == log[query_field_key]:
                            temp_log_result_list.append(True)
                        else:
                            temp_log_result_list.append(False)

                    else:
                        temp_log_result_list.append(False)
                if self.rule_type == 'proxy_intercept' and len(temp_log_result_list) < len(log_query):
                    if True in temp_log_result_list:
                        log_result = True
                    else:
                        log_result = False
                elif False not in temp_log_result_list:
                    log_result = True
                else:
                    log_result = False
        elif len(log_dict) == 0 and len(log_query) > 0:
            log_result = None
        elif len(log_query) == 0:
            log_result = True
        return log_result

    def get_application_from_configration(self):
        and_conditions = self.policy_configuration["and_conditions"]
        # action_parameter = self.policy_configuration["action_parameter"]
        for i in range(len(and_conditions)):
            or_conditions = and_conditions[i]["or_conditions"]
            for j in range(len(or_conditions)):
                if or_conditions[j]["attribute_name"] == "ATTR_APP_ID":
                    application = or_conditions[j]["items"]
                    return application

    def is_exist_in_list(self, application, actual_application_list):
        application_set = set(actual_application_list)
        for element in application:
            if element not in application_set:
                return False
        return True

    def get_attribute_name_from_configration(self):
        """List the attribute names used by the policy, minus the excluded ones.

        Every or-condition under ``self.policy_configuration["and_conditions"]``
        is visited in order; duplicates are kept.

        Returns:
            list: ``"attribute_name"`` values that are not ``ATTR_SOURCE_IP``,
            ``ATTR_DESTINATION_IP`` or ``ATTR_APP_ID``.
        """
        # Only attribute names related to protocol fields are collected, so
        # this exclusion list may need to be extended case by case (open
        # question left by the original author).
        excluded_names = ("ATTR_SOURCE_IP", "ATTR_DESTINATION_IP", "ATTR_APP_ID")
        return [
            or_condition["attribute_name"]
            for and_condition in self.policy_configuration["and_conditions"]
            for or_condition in and_condition["or_conditions"]
            if or_condition["attribute_name"] not in excluded_names
        ]
                    
    def verify_dos_log(self, log_dict, verification_result, start_time, end_time):
        """Check DoS event logs against the expected log fields.

        Args:
            log_dict: Sequence of DoS event log records.
            verification_result: Mapping whose ``"expected_log"`` entry is the
                list of expected fields, each carrying a ``"query_field_key"``.
                It is stored on ``self.verification_result`` because
                ``verify_value_from_dos_event`` writes matched values back
                into it.
            start_time: Start of the query window, forwarded unchanged.
            end_time: End of the query window, forwarded unchanged.

        Returns:
            ``True`` when no fields are expected; ``None`` when fields are
            expected but there are no logs; otherwise the verdict for the last
            log in *log_dict*. When every expected field is verifiable, all of
            them must match; when some expected fields are not in the
            verifiable set, a single matching field is enough.
        """
        self.verification_result = verification_result
        log_query = verification_result['expected_log']
        if len(log_query) == 0:
            return True
        if len(log_dict) == 0:
            return None

        dos_verification_fields = (
            "sessions", "session_rate", "packets", "packet_rate", "bytes",
            "bit_rate", "rule_uuid", "source_ip", "destination_ip",
        )
        log_result = False
        for log in log_dict:
            # Verdicts are collected per log so one log cannot mask another's
            # field count.
            field_results = []
            for field_index, log_query_param in enumerate(log_query):
                query_field_key = log_query_param["query_field_key"]
                if query_field_key not in dos_verification_fields:
                    continue
                # The index passed on is the position of the expected field:
                # the helper uses it to pick the "expected_log" entry that
                # receives the matched value. Results are accumulated (not
                # replaced) so every verifiable field counts.
                field_results.extend(
                    self.verify_value_from_dos_event(
                        log, query_field_key, field_index, start_time, end_time
                    )
                )
            if len(field_results) < len(log_query):
                # Some expected fields could not be verified: accept any match.
                log_result = True in field_results
            else:
                log_result = False not in field_results
        return log_result

    def verify_value_from_dos_event(self, log, query_field_key, count, start_time, end_time):
        """Verify one field of a DoS event log against session-record figures.

        The session count and the per-session packet/byte totals come from
        ``self.get_value_from_session_record(start_time, end_time)``. The
        expected session count depends on the mitigation behavior in
        ``self.policy_configuration``: ``"deny"`` uses the session-record
        count; ``"none"`` uses the log's own ``"sessions"`` value, provided it
        is lower than the session-record count; anything else fails at once.

        Args:
            log: DoS event log record (mapping) being checked.
            query_field_key: Name of the field to verify.
            count: Index into ``self.verification_result["expected_log"]``;
                on a match that entry's ``"query_value"`` is overwritten with
                the value found in the log.
            start_time: Start of the session-record query window.
            end_time: End of the session-record query window.

        Returns:
            list: A single-element list, ``[True]`` or ``[False]``.
        """
        session_number, total_packets, total_bytes = self.get_value_from_session_record(start_time, end_time)
        behavior = self.policy_configuration["action_parameter"]["mitigation"]["behavior"]
        if behavior == "deny":
            expected_sessions = session_number
        elif behavior == "none" and log["sessions"] < session_number:
            expected_sessions = log["sessions"]
        else:
            return [False]

        duration = log["end_time"] - log["start_time"]
        expected_packets = expected_sessions * total_packets
        expected_bytes = expected_sessions * total_bytes

        # NOTE(review): the three rate checks compare the result of a true
        # division with exact equality and raise ZeroDivisionError when the
        # log's start and end times are equal - confirm this is intended.
        if query_field_key == "sessions":
            matched = expected_sessions == log[query_field_key]
        elif query_field_key == "packets":
            matched = expected_packets == log[query_field_key]
        elif query_field_key == "bytes":
            # The logged byte count must be positive as well as equal.
            matched = log[query_field_key] > 0 and expected_bytes == log[query_field_key]
        elif query_field_key == "bit_rate":
            matched = (log["bytes"] * 8) / duration == log[query_field_key]
        elif query_field_key == "session_rate":
            matched = expected_sessions / duration == log[query_field_key]
        elif query_field_key == "packet_rate":
            matched = expected_packets / duration == log[query_field_key]
        elif query_field_key == "rule_uuid":
            # Only presence is checked: any non-empty value passes.
            matched = len(log["rule_uuid"]) > 0
        elif query_field_key == "source_ip":
            matched = self.traffic_generation["clients_start_ip"] == log["source_ip"]
        elif query_field_key == "destination_ip":
            matched = self.traffic_generation["servers_start_ip"] == log[query_field_key]
        elif query_field_key in ["basic_attack_type", "basic_sessions", "basic_session_rate", "basic_packets", "basic_packet_rate", "basic_bytes", "basic_bit_rate"]:
            # These fields are accepted without comparison or write-back.
            return [True]
        else:
            return [False]

        if not matched:
            return [False]
        # Record the value actually seen in the log on the expected-log entry
        # selected by ``count``.
        self.verification_result["expected_log"][count]["query_value"] = log[query_field_key]
        return [True]

    def get_value_from_session_record(self, start_time, end_time):
        """Query session records and derive the figures used for DoS checks.

        Posts a one-shot ``session_record`` query for the given time window to
        ``/v1/logs/query`` on ``self.parameter["api_server"]``. The filter
        depends on the mitigation behavior of ``self.policy_configuration``:
        ``"none"`` filters on the server IP only; ``"deny"`` additionally
        restricts ``received_pkts`` to ``(0)`` when the policy has an
        ``ATTR_APP_ID`` attribute and mentions ``dns``, else to ``(0,1)``.
        Any other behavior leaves the filter empty.

        Args:
            start_time: Start of the query window, sent as ``start_time``.
            end_time: End of the query window, sent as ``end_time``.

        Returns:
            tuple: ``(session_number, total_packets, total_bytes)`` where
            ``session_number`` is the number of records in the returned list
            and the two totals are received + sent packets / bytes of the
            first record only.

        Raises:
            AssertionError: If the response status code is not 200.
            IndexError: If the returned record list is empty.
        """
        headers = {'Content-Type': 'application/json', 'Authorization': self.token}
        columns = self.get_log_schema(self.token, "session_record", self.parameter["api_server"], self.parameter["vsys"])
        # NOTE(review): only one page of 20 records is requested, so the
        # session count derived below cannot exceed 20 - confirm against the
        # traffic volumes used by callers.
        query = {
            "page_no": 1,
            "page_size": 20,
            "source": "session_record",
            "columns": columns,
            "start_time": start_time,
            "end_time": end_time,
            "filter": "",
            "vsys": self.parameter["vsys"],
            "identifier_name": "session-record-list",
            "execution_mode": 'oneshot',
        }

        behavior = self.policy_configuration["action_parameter"]["mitigation"]['behavior']
        if behavior == "none":
            query['filter'] = f"server_ip='{self.traffic_generation['servers_start_ip']}'"
        elif behavior == "deny":
            policy_text = json.dumps(self.policy_configuration)
            if self.is_attribute_name_exsit("ATTR_APP_ID") and "dns" in policy_text:
                query['filter'] = f"server_ip='{self.traffic_generation['servers_start_ip']}'AND received_pkts in (0)"
            else:
                query['filter'] = f"server_ip='{self.traffic_generation['servers_start_ip']}'AND received_pkts in (0,1)"

        url = self.parameter["api_server"] + "/v1/logs/query"
        # NOTE(review): verify=False turns off TLS certificate verification.
        response = requests.post(url, headers=headers, json=query, verify=False)
        assert response.status_code == 200

        records = json.loads(response.text)['data']['list']
        # NOTE(review): an empty result list raises IndexError here - confirm
        # callers always query a window that contains sessions.
        first_record = records[0]
        total_packets = first_record['received_pkts'] + first_record['sent_pkts']
        total_bytes = first_record['received_bytes'] + first_record['sent_bytes']
        return len(records), total_packets, total_bytes