1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
# -*- coding: UTF-8 -*-
import time
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
from datetime import datetime
from support.report_update import ReportUpdate
from support.common_utils.create_policy import CreatePolicy
def run(parameter):
try:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
exception_result = ""
result = {}
# 脚本启动时间
script_start_time = time.time()
# 测试数据
test_data = {
"is_multi_priority": True,
"rule_num": 2,
"policy_type": "security",
"rule_action_1": "shunt",
"rule_action_2": "allow",
"do_log_1": 0,
"do_log_2": 2,
"obj_condition_1": [
{
"attribute_name": "ATTR_SOURCE_IP",
"object_type": "ip",
"object_sub_type": "ip",
"object_list": [
{
"add_item_list": [
{
"ip_address": parameter["test_pc_ip"],
"port_range": "0-65535"
}
]
}
]
}
],
"obj_condition_2": [
{
"attribute_name": "ATTR_SOURCE_IP",
"object_type": "ip",
"object_sub_type": "ip",
"object_list": [
{
"add_item_list": [
{
"ip_address": parameter["test_pc_ip"],
"port_range": "0-65535"
}
]
}
]
}
],
"application_1": [],
"application_2": [],
"expected_return": "百度一下",
"counters_1": {"hits": "many"},
"counters_2": {"hits": 0},
"log_query_param_1": [],
"log_query_param_2": [],
"traffic": {
"protocol": "ssl",
"type": "curl",
"command": "curl -kv --connect-timeout 25 -m 25 https://www.baidu.com"
},
"token": ""
}
# 测试用例实例化
create = CreatePolicy(test_data, parameter)
result = create.create_policy()
return result
except Exception as e:
exception_result = str(e)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
return "Error: " + str(e)
finally:
# 清理环境并删除配置
if isinstance(create, CreatePolicy):
create.clean_up()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
if __name__ == "__main__":
parameter = {
"script_type": "api",
"username": "hebingning",
"password": "hbn66AAA",
"test_pc_ip": "192.168.64.73",
"api_server": "http://192.168.44.72",
"is_log": 1,
"env": "tsgx",
"vsys_id": 6,
"root_path": "D:/python_script/tsg_test",
"path": "D:/python_script/tsg_test/tests/api",
"module_name": "security",
"test_case_name": "allow_negate_fqdn"
}
run(parameter)
|