1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# -*- coding: UTF-8 -*-
import time
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
from datetime import datetime
from support.report_update import ReportUpdate
from support.common_utils.create_policy import CreatePolicy
def _log(*parts):
    """Print ``parts`` prefixed with the current date-time and milliseconds.

    A single ``datetime.now()`` reading is used for both prefix fields, and the
    millisecond field is zero-padded to three digits.
    """
    now = datetime.now()
    print(now.strftime("%Y-%m-%d %H:%M:%S"), "{:03d}".format(now.microsecond // 1000), *parts, flush=True)


def run(parameter):
    """Run the ``sec_shunt_intip_serverfqdn_ssl`` security policy test case.

    Builds the test data for one security rule with action ``shunt`` that
    matches the ``ssl`` application and the subscriber id taken from
    ``parameter["test_subcriber_id"]``, passes it to ``CreatePolicy`` and
    returns the value of ``create_policy()``.

    Args:
        parameter: dict of run settings. This function reads
            ``parameter["test_case_name"]`` and
            ``parameter["test_subcriber_id"]``; the whole dict is also handed
            to ``CreatePolicy`` and ``ReportUpdate.write_result``.

    Returns:
        The value returned by ``CreatePolicy.create_policy()``, or the string
        ``"Error: <message>"`` if any exception was raised while running.
    """
    # Bind everything the ``finally`` clause reads *before* entering ``try``,
    # so an early failure cannot turn into an UnboundLocalError during cleanup.
    create = None
    exception_result = ""
    result = {}
    # Script start time.
    script_start_time = time.time()
    try:
        _log("Begin to run test case: " + parameter["test_case_name"])
        # Test data.
        test_data = {
            "is_multi_priority": False,
            "rule_num": 1,
            "policy_type": "security",
            "rule_name": "sec_shunt_intip_serverfqdn_ssl",
            "rule_action": "shunt",
            "rule_type": "create",
            "condition": {
                "source_ip": [],
                "source_port": [],
                "destination_ip": [],
                "destination_port": [],
                "internal_ip": [],
                "internal_port": [],
                "external_ip": [],
                "external_port": [],
                "source_geography": [],
                "destination_geography": [],
                "sub_id": [
                    {
                        "name": "sec_subid",
                        "object_type": "subscriberid",
                        "select_type": False,
                        "negate": False,
                        "items": [
                            {
                                "item_operation": "add",
                                "item_type": "subscriberid",
                                "item_value": '$' + parameter["test_subcriber_id"],
                            }
                        ]
                    }
                ],
                "device": [],
                "tunnel": [],
                "tunnel_level": [],
                "flag": [],
                "application": [
                    {
                        "name": "ssl",
                        "object_type": "application",
                        "negate": False
                    }
                ],
                "server_fqdn": [],
                "protocol_filed": [],
                "sub_action_override": True,
                "sub_action": [],
                "packet_capture": []
            },
            "profile": [],
            "expected_return": "Example Domain",
            "counters": {"hits": "many"},
            "log_query_param": [],
            "traffic": {
                "protocol": "ssl",
                "type": "wget",
                "command": "wget --debug -q -O- https://www.example.com"
            },
            "token": ""
        }
        # Instantiate the test case and run it.
        create = CreatePolicy(test_data, parameter)
        result = create.create_policy()
        return result
    except Exception as e:
        exception_result = str(e)
        _log("Error: ", e)
        return "Error: " + str(e)
    finally:
        # Clean up the environment and delete the configuration; skipped when
        # the failure happened before the CreatePolicy object existed.
        if isinstance(create, CreatePolicy):
            create.clean_up()
        # Measure how long the script took.
        script_end_time = time.time()
        duration = script_end_time - script_start_time
        _log("Duration of running the test case: ", "{:.3f}".format(duration))
        _log("Finish test case: " + parameter["test_case_name"])
        # Write the outcome to the report (a CSV report, per the original comment).
        update = ReportUpdate()
        update.write_result(parameter, result, exception_result)
if __name__ == '__main__':
    # Local debugging entry point: build a parameter dict by hand and run this
    # test case directly.
    #
    # Example parameter set for UI mode (kept for reference, not executed):
    # parameter = {
    #     "username": "hebingning",
    #     "password": "hbn66AAA",
    #     "test_pc_ip": "192.168.64.65",
    #     "test_subcriber_id": "test6776",
    #     "api_server": "http://192.168.44.72",
    #     "debug_flag": "local",
    #     "script_type": "ui",
    #     "env": "tsgx",
    #     "vsys_id": 1,
    #     "is_log": 1,
    #     "root_path": "D:/Document/Project-TSG/Code/git/tsg_test",
    #     "path": "D:/Document/Project-TSG/Code/git/tsg_test/tests/ui",
    #     "module_name": "security",
    #     "test_case_name": "deny_srcip_fqdn_drop_rst_icmp"
    # }
    # run(parameter)

    # API mode.
    from support.ui_utils.element_position.map_element_position_library import replace_paras
    from support.ui_utils.workpath import workdir

    # Values derived from the working directory and from this file's name.
    api_tests_path = workdir + "/tests/api"
    # Drop the trailing ".py" from the file name to get the test case name.
    case_name = os.path.basename(__file__)[:-3]

    parameter = {
        "username": "hebingning",
        "password": "hbn66AAA",
        "test_pc_ip": "192.168.64.93",
        "test_subcriber_id": "test6491",
        "api_server": "http://192.168.44.72",
        "debug_flag": "local",
        # Accepted values per the original comment: "api", "ui", or an empty string.
        "script_type": "api",
        "is_log": 1,
        "env": "tsgx",
        "vsys_id": 1,
        "root_path": workdir,
        "path": api_tests_path,
        "module_name": "security",
        "test_case_name": case_name
    }
    # Pass the dict through replace_paras before running the case.
    parameter = replace_paras(parameter)
    run(parameter)
|