diff options
| author | yang liu <[email protected]> | 2024-11-27 19:46:05 +0800 |
|---|---|---|
| committer | yang liu <[email protected]> | 2024-11-27 19:46:05 +0800 |
| commit | d4c5a4cb9bd156e090b803cc85288bbf002e90e3 (patch) | |
| tree | 546b3d75759262ba4772fa869cb93b9630efb177 | |
| parent | 3a1440a42ee4438fb5d431959f22063f641d4049 (diff) | |
update sc case
| -rw-r--r-- | tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py | 339 |
1 files changed, 220 insertions, 119 deletions
diff --git a/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py b/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py index 3002e1476..fca8a92ee 100644 --- a/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py +++ b/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py @@ -23,127 +23,229 @@ def run(parameter): script_start_time = time.time() # 测试数据 - test_data = { - "is_multi_priority": False, - "rule_num": 1, - "policy_type": "service_chaining", - "rule_name": "sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none", - "rule_action": "service_chaining", - "targeted_traffic": "decrypted", - "rule_type": "create", - "condition": { - "source_ip": [ - { - "name": "service_chaining_source_ip", - "object_type": "ip", - "select_type": False, - "negate": False, - "item": [ - { - "item_operation": "add", - "item_type": "ipv4", - "item_value": parameter["test_pc_ip"] - } - ] - } - ], - "source_library":[], - "source_port": [], - "destination_port": [], - "internal_ip": [], - "internal_port": [], - "external_ip": [], - "external_library": [ - { - "category":"geoip_asn", - "catalogs":[ - { - "op":"add", - "ip_entries":"93.184.215.14/32" - } - ], - "tags":[ - { - "tag_key":"AutoTest", - "tag_value":"49284324", - "op":"add" - } - ], - "negate": False, - } - ], - "external_port": [], - "geography": [], - "sub_id": [], - "device": [], - "tunnel": [], - "tunnel_level": [], - "flag": [], - "application": [ - { - "name": "ssl", - "object_type": "application", - "negate": False - } - ], - "server_fqdn": [ - { - "name": "service_chaining_fqdn", - "object_type": "fqdn", - "select_type": False, - "negate": False, - "plus": False, - "items": [ - { - "item_operation": "add", - "item_value": "$www.example.com", - } - ] - } - ], - "protocol_filed": [], - "sub_action_override": False, - "sub_action": [], - "packet_capture": [], - }, - "multiProfile": True, - "profile": [ + policy_configuration = { + "type": "service_chaining", + "name": "sc_decrypted_scrip_fqdnblock_disabled_vxlan_activeip_bfd", + "action": "service_chaining", + "and_conditions": [ + { + "negate_option": False, + "or_conditions": [ + { + "attribute_name": "ATTR_SOURCE_IP", + "type": "ip", + "sub_type": "ip", + "statistics_option": "none", + "member_type": "item", + "name": "service_chaining_source_ip", + "items": [ + { + "op": "add", + "ip": parameter["test_pc_ip"], + "interval": "0-65535" + } + ] + } + ] + }, + { + "negate_option": False, + "or_conditions": [ + { + "attribute_name": "ATTR_EXTERNAL_IP", + "type":"library", + "catalog":[ + { + "category" : "geoip_asn", + "ip_entries":[ + { + "op" : "add", + "ip" : "93.184.215.14/32" + } + ], + "tag": [ + { + "category": "geoip_asn", + "parent_uuid": 0, + "tag_key": "AutoTest", + "tag_value": "49284324" + } + ] + } + ] + } + ] + }, { - "name": "mirror_block_vlan_none", - "profile_type": "create", - "type": "mirroring", - "load_balance_method": "hash_innermost_int-ip", - "load_balance_localization": "nearby", - "failure_action": "Block", - "service_functions": [ + "negate_option": False, + "or_conditions": [ { - "name": "vlan_none", - "profile_type": "create", - "device_group": "Device_Group:group-xxg-tsgx", - "connectivity": "Layer_2_Switch:random:random", - "health_check": "none", - "enable": "on" + "attribute_name": "ATTR_APP_ID", + "type": "application", + "items": ["ssl"] } ] + }, + { + "negate_option": False, + "or_conditions": [ + { + "attribute_name": "ATTR_SERVER_FQDN", + "type": "fqdn", + "member_type": "item", + "name": "service_chaining_fqdn", + "items": [ + { + "op": "add", + "expr_type": "and", + "expression": "^www.example.com$", + } + ] + } + ], } ], - "log_query_param": [{"query_field_key": "sc_rsp_decrypted_uuid_list", "query_value": ""}], - "traffic":{ - "protocol": "ssl", - "type": "curl", - "command": "curl --connect-timeout 10 --max-time 30 -kv https://www.example.com" - }, - "expected_return":"example", - "token": "" + "action_parameter": { + "targeted_traffic": "decrypted", + "sff_profiles": + [ + { + "vsys": 1, + "return_data": 1, + "name": "mirror_block_vlan_none", + "type": 2, + "load_balance_method": "hash-innermost-int-ip", + "load_balance_localization": "nearby", + "failure_action": "block", + "service_func_profiles":[ + { + "name":"vlan_none", + "admin_status": 1, + "device_group":{ + "value": "group-xxg-tsgx", + "tag": "device_group" + }, + "connectivity": { + "method": "layer2_switch", + "int_vlan_tag": "200", + "ext_vlan_tag": "100" + }, + "health_check": { + "method": "none", + "interval_ms": 200, + "retires": 5 + } + } + ] + } + ] + }, + "is_enabled": 1, + "log_option": "metadata", + } + + traffic_generation = { + "tool": "ssl", # or trex/http + "command": "curl --connect-timeout 10 --max-time 30 -kv https://www.example.com/" } - # 测试用例实例化 - create = CreatePolicy(test_data, parameter) - result = create.create_policy() + verification_result = { + "excepted_traffic_result": "example", + "expected_metric": {"hits": 1}, + "expected_log": [{"query_field_key": "sc_rsp_decrypted_uuid_list", "query_value": ""}] + } + + # 创建 + if parameter["initiation_method"] == "ui": + ui_client = UIClient() + rules_tuple, ui_error = ui_client.create_rules(policy_configuration) + if len(ui_error) > 0: + return ui_error + elif parameter["initiation_method"] == "api": + api_client = APIClient(parameter) + objects_tuple, api_error = api_client.create_objects(policy_configuration) + if len(api_error) > 0: + return api_error + tags_tuple, api_error = api_client.create_libraries(policy_configuration) + if len(api_error) > 0: + return api_error + profiles_tuple, api_error = api_client.create_profiles(policy_configuration) + if len(api_error) > 0: + return api_error + rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, tags_tuple, profiles_tuple) + if len(api_error) > 0: + return api_error + + # 等待下发配置生效 + time.sleep(3) - # 脚本结束时间和耗时 - end_time = time.time() - duration = end_time - start_time + # 类实例化 + generator = TrafficGenerator() + + # 获取当前时间 + utc_tz = pytz.timezone('UTC') + current_utc_time = datetime.now(utc_tz) + start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ') + + # 触发流量 + traffic_result = generator.run(policy_configuration, traffic_generation) + + # 验证流量生成器的返回值是否符合策略执行的预期 + excepted_traffic_result, error = generator.result(verification_result, traffic_result) + if excepted_traffic_result == False: + return error + + # 验证tsg的日志是否符合策略执行的预期 + if parameter["initiation_method"] == "ui": + log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result) + elif parameter["initiation_method"] == "api": + log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time, traffic_result) + if log_result == True: + test_summary["log"] = "Pass." + elif log_result == False: + test_summary["log"] = "The failure reason: the returned log does not match the expected result." + elif log_result == None: + test_summary["log"] = "The failure reason: the returned log is empty." + elif len(log_result) > 0: + test_summary["log"] = log_result + + # 验证tsg的metric是否符合策略执行的预期 + if parameter["initiation_method"] == "ui": + metric_result = ui_client.query_rule_metric(verification_result, traffic_result) + elif parameter["initiation_method"] == "api": + metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result) + if metric_result == True: + test_summary["metric"] = "Pass." + elif metric_result == False: + test_summary["metric"] = "The failure reason: the returned metric does not match the expected result." + elif metric_result == None: + test_summary["metric"] = "The failure reason: the returned metric is empty." + elif len(metric_result) > 0: + test_summary["metric"] = metric_result + + return test_summary + except Exception as e: + exception_result = str(e) + print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "When running test case, the exception error: ", str(e), flush=True) + return "When running test case, the exception error: " + str(e) + finally: + # 删除 + if parameter["initiation_method"] == "ui": + if not rules_tuple: + ui_client.delete_rules(parameter, policy_configuration) + elif parameter["initiation_method"] == "api": + if rules_tuple: + api_client.delete_rules(rules_tuple) + if objects_tuple: + api_client.delete_objects(objects_tuple) + if tags_tuple: + api_client.delete_libraries(tags_tuple) + if profiles_tuple: + api_client.delete_profiles(profiles_tuple) + + # 统计脚本用时 + script_end_time = time.time() + duration = script_end_time - script_start_time print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True) print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], @@ -157,17 +259,16 @@ if __name__ == '__main__': parameter = { "username": "lytest", "password": "123456ly", - "test_pc_ip": "192.168.50.88", + "test_pc_ip": "192.168.64.87", "test_subcriber_id": "test6776", "api_server": "http://192.168.44.72", "initiation_method": "api", "env": "tsgx", - "vsys_id": 1, - "is_log": 1, - "root_path": "C:/automation_project/tsg_test", - "path": "C:/automation_project/tsg_test/tests/ui", - "module_name": "service_chaining", - "test_case_name": "sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none" + "vsys": 5, + "root_path": workdir, + "path": workdir + "/tests", + "module_name": "monitor", + "test_case_name": os.path.basename(__file__)[:-3] } run(parameter) """ |
