summaryrefslogtreecommitdiff
path: root/tests/security/sec_shunt_ip_allow_ip_multi.py
blob: a60f302dd396b107e4d23503df8c6b64df478fb2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

# -*- coding: UTF-8 -*-
import time
import os
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
from datetime import datetime
from support.report_update import ReportUpdate
from support.common_utils.create_policy import CreatePolicy

def run(parameter):
    try:
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
        # 参数初始化
        exception_result = ""
        result = {}
        
        # 脚本启动时间
        script_start_time = time.time()
        # 测试数据
        test_data = {
                "is_multi_priority": True,
                "rule_num": 2,
                "policy_type": "security",
                "rule_action_1": "shunt",
                "rule_action_2": "allow",
                "do_log_1": 0,
                "do_log_2": 2,
                "obj_condition_1": [
                {
                    "attribute_name": "ATTR_SOURCE_IP",
                    "object_type": "ip",
                    "object_sub_type": "ip",
                    "object_list": [
                    {
                        "add_item_list": [
                        {
                            "ip_address": parameter["test_pc_ip"],
                            "port_range": "0-65535"
                        }
                        ]
                    }
                    ]
                }
                ],
                "obj_condition_2": [
                {
                    "attribute_name": "ATTR_SOURCE_IP",
                    "object_type": "ip",
                    "object_sub_type": "ip",
                    "object_list": [
                    {
                        "add_item_list": [
                        {
                            "ip_address": parameter["test_pc_ip"],
                            "port_range": "0-65535"
                        }
                        ]
                    }
                    ]
                }
                ],
                "application_1": [],
                "application_2": [],
                "expected_return": "百度一下",
                "counters_1": {"hits": "many"},
                "counters_2": {"hits": 0},
                "log_query_param_1": [],
                "log_query_param_2": [],
                "traffic": {
                        "protocol": "ssl",
                        "type": "curl",
                        "command": "curl -kv --connect-timeout 25 -m 25 https://www.baidu.com"
                    },
                "token": ""
        }
        
        # 测试用例实例化
        create = CreatePolicy(test_data, parameter)
        result = create.create_policy()
        
        return result
    except Exception as e:
        exception_result = str(e)
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
        return "Error: " + str(e)
    finally:
        # 清理环境并删除配置
        if isinstance(create, CreatePolicy):
            create.clean_up()
        # 统计脚本用时
        script_end_time = time.time()
        duration = script_end_time - script_start_time
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
        # 生成csv报告
        update = ReportUpdate()
        update.write_result(parameter, result, exception_result)
    
    
if __name__ == "__main__":
    parameter = {
      "script_type": "api",
      "username": "hebingning",
      "password": "hbn66AAA",
      "test_pc_ip": "192.168.64.73",
      "api_server": "http://192.168.44.72",
      "is_log": 1,
      "env": "tsgx",
      "vsys_id": 6,
      "root_path": "D:/python_script/tsg_test",
      "path": "D:/python_script/tsg_test/tests/api",
      "module_name": "security",
      "test_case_name": "allow_negate_fqdn"
    }
    run(parameter)