summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryang liu <[email protected]>2024-11-27 19:46:05 +0800
committeryang liu <[email protected]>2024-11-27 19:46:05 +0800
commitd4c5a4cb9bd156e090b803cc85288bbf002e90e3 (patch)
tree546b3d75759262ba4772fa869cb93b9630efb177
parent3a1440a42ee4438fb5d431959f22063f641d4049 (diff)
update sc case
-rw-r--r--tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py339
1 files changed, 220 insertions, 119 deletions
diff --git a/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py b/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py
index 3002e1476..fca8a92ee 100644
--- a/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py
+++ b/tests/service_chaining/sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none.py
@@ -23,127 +23,229 @@ def run(parameter):
script_start_time = time.time()
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "service_chaining",
- "rule_name": "sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none",
- "rule_action": "service_chaining",
- "targeted_traffic": "decrypted",
- "rule_type": "create",
- "condition": {
- "source_ip": [
- {
- "name": "service_chaining_source_ip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter["test_pc_ip"]
- }
- ]
- }
- ],
- "source_library":[],
- "source_port": [],
- "destination_port": [],
- "internal_ip": [],
- "internal_port": [],
- "external_ip": [],
- "external_library": [
- {
- "category":"geoip_asn",
- "catalogs":[
- {
- "op":"add",
- "ip_entries":"93.184.215.14/32"
- }
- ],
- "tags":[
- {
- "tag_key":"AutoTest",
- "tag_value":"49284324",
- "op":"add"
- }
- ],
- "negate": False,
- }
- ],
- "external_port": [],
- "geography": [],
- "sub_id": [],
- "device": [],
- "tunnel": [],
- "tunnel_level": [],
- "flag": [],
- "application": [
- {
- "name": "ssl",
- "object_type": "application",
- "negate": False
- }
- ],
- "server_fqdn": [
- {
- "name": "service_chaining_fqdn",
- "object_type": "fqdn",
- "select_type": False,
- "negate": False,
- "plus": False,
- "items": [
- {
- "item_operation": "add",
- "item_value": "$www.example.com",
- }
- ]
- }
- ],
- "protocol_filed": [],
- "sub_action_override": False,
- "sub_action": [],
- "packet_capture": [],
- },
- "multiProfile": True,
- "profile": [
+ policy_configuration = {
+ "type": "service_chaining",
+ "name": "sc_decrypted_scrip_fqdnblock_disabled_vxlan_activeip_bfd",
+ "action": "service_chaining",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "statistics_option": "none",
+ "member_type": "item",
+ "name": "service_chaining_source_ip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_EXTERNAL_IP",
+ "type":"library",
+ "catalog":[
+ {
+ "category" : "geoip_asn",
+ "ip_entries":[
+ {
+ "op" : "add",
+ "ip" : "93.184.215.14/32"
+ }
+ ],
+ "tag": [
+ {
+ "category": "geoip_asn",
+ "parent_uuid": 0,
+ "tag_key": "AutoTest",
+ "tag_value": "49284324"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+ },
{
- "name": "mirror_block_vlan_none",
- "profile_type": "create",
- "type": "mirroring",
- "load_balance_method": "hash_innermost_int-ip",
- "load_balance_localization": "nearby",
- "failure_action": "Block",
- "service_functions": [
+ "negate_option": False,
+ "or_conditions": [
{
- "name": "vlan_none",
- "profile_type": "create",
- "device_group": "Device_Group:group-xxg-tsgx",
- "connectivity": "Layer_2_Switch:random:random",
- "health_check": "none",
- "enable": "on"
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["ssl"]
}
]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SERVER_FQDN",
+ "type": "fqdn",
+ "member_type": "item",
+ "name": "service_chaining_fqdn",
+ "items": [
+ {
+ "op": "add",
+ "expr_type": "and",
+ "expression": "^www.example.com$",
+ }
+ ]
+ }
+ ],
}
],
- "log_query_param": [{"query_field_key": "sc_rsp_decrypted_uuid_list", "query_value": ""}],
- "traffic":{
- "protocol": "ssl",
- "type": "curl",
- "command": "curl --connect-timeout 10 --max-time 30 -kv https://www.example.com"
- },
- "expected_return":"example",
- "token": ""
+ "action_parameter": {
+ "targeted_traffic": "decrypted",
+ "sff_profiles":
+ [
+ {
+ "vsys": 1,
+ "return_data": 1,
+ "name": "mirror_block_vlan_none",
+ "type": 2,
+ "load_balance_method": "hash-innermost-int-ip",
+ "load_balance_localization": "nearby",
+ "failure_action": "block",
+ "service_func_profiles":[
+ {
+ "name":"vlan_none",
+ "admin_status": 1,
+ "device_group":{
+ "value": "group-xxg-tsgx",
+ "tag": "device_group"
+ },
+ "connectivity": {
+ "method": "layer2_switch",
+ "int_vlan_tag": "200",
+ "ext_vlan_tag": "100"
+ },
+ "health_check": {
+ "method": "none",
+ "interval_ms": 200,
+ "retires": 5
+ }
+ }
+ ]
+ }
+ ]
+ },
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "tool": "ssl", # or trex/http
+ "command": "curl --connect-timeout 10 --max-time 30 -kv https://www.example.com/"
}
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
+ verification_result = {
+ "excepted_traffic_result": "example",
+ "expected_metric": {"hits": 1},
+ "expected_log": [{"query_field_key": "sc_rsp_decrypted_uuid_list", "query_value": ""}]
+ }
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ tags_tuple, api_error = api_client.create_libraries(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ profiles_tuple, api_error = api_client.create_profiles(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, tags_tuple, profiles_tuple)
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
- # 脚本结束时间和耗时
- end_time = time.time()
- duration = end_time - start_time
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time, traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
+ except Exception as e:
+ exception_result = str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
+ finally:
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if not rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+ if tags_tuple:
+ api_client.delete_libraries(tags_tuple)
+ if profiles_tuple:
+ api_client.delete_profiles(profiles_tuple)
+
+ # 统计脚本用时
+ script_end_time = time.time()
+ duration = script_end_time - script_start_time
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
"Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
@@ -157,17 +259,16 @@ if __name__ == '__main__':
parameter = {
"username": "lytest",
"password": "123456ly",
- "test_pc_ip": "192.168.50.88",
+ "test_pc_ip": "192.168.64.87",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
"initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "root_path": "C:/automation_project/tsg_test",
- "path": "C:/automation_project/tsg_test/tests/ui",
- "module_name": "service_chaining",
- "test_case_name": "sc_decrypted_srcip_ext_cidr_geoip_asn_library_fqdn_ssl_mirror_block_vlan_none"
+ "vsys": 5,
+ "root_path": workdir,
+ "path": workdir + "/tests",
+ "module_name": "monitor",
+ "test_case_name": os.path.basename(__file__)[:-3]
}
run(parameter)
"""