summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorguoqiuya <[email protected]>2024-11-26 19:17:07 +0800
committerguoqiuya <[email protected]>2024-11-26 19:17:07 +0800
commit8d9c4c503dc8879cfe1fc1c1cc68d6c8861599e0 (patch)
tree4c0f24e3efa2f38a0f79a469a8c953d32ef41202
parent87c675daea56bc3da328ba1adc0d7e3731f7abea (diff)
parent2b38630962a53b763f06ecb8c6cea479eb6d30f4 (diff)
Merge branch 'develop' of https://git.mesalab.cn/chongming/tsg_test into develop
-rw-r--r--support/ui_utils/delete_objects_example.py10
-rw-r--r--support/ui_utils/element_position/map_element_position_library.py34
-rw-r--r--support/ui_utils/object/create_objects_example.py2
-rw-r--r--support/ui_utils/object/delete_objects_example.py20
-rw-r--r--support/ui_utils/object/edit_objects_example.py72
-rw-r--r--support/ui_utils/object/search_objects_example.py116
-rw-r--r--support/ui_utils/ui_client.py18
-rw-r--r--tests/dos_protection/dos_allow_srcip.py206
-rw-r--r--tests/dos_protection/dos_deny_srcip.py205
-rw-r--r--tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_10s.py247
-rw-r--r--tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1h.py248
-rw-r--r--tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1min.py245
-rw-r--r--tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_2min.py247
-rw-r--r--tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_5min.py247
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10min.py264
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10s.py264
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1h.py264
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1min.py264
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_2min.py264
-rw-r--r--tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_5min.py264
-rw-r--r--tests/object/test_temp/create_account_object_temp.py68
-rw-r--r--tests/object/test_temp/create_fqdn_object_temp.py94
-rw-r--r--tests/object/test_temp/create_ip_learning_objects_temp.py83
-rw-r--r--tests/object/test_temp/create_ip_object_temp.py71
-rw-r--r--tests/object/test_temp/create_url_object_temp.py92
25 files changed, 2531 insertions, 1378 deletions
diff --git a/support/ui_utils/delete_objects_example.py b/support/ui_utils/delete_objects_example.py
index 059f9f8c6..41cf40990 100644
--- a/support/ui_utils/delete_objects_example.py
+++ b/support/ui_utils/delete_objects_example.py
@@ -31,13 +31,3 @@ class DeleteObjectsExample:
result = result + " In addition, fail to delete {} object.".format(obj["type"])
else:
result = result + " In addition, fail to search {} object.".format(obj["type"])
- # 环境清理
- support.ui_utils.env.teardown()
- # 强制删除
- log_in = LogIn()
- pwd = log_in.encryptPwd(parameter["password"], parameter["api_server"])
- token = log_in.login(parameter["username"], pwd, parameter["api_server"])
- headers = {"Content-Type": "application/json", "Authorization": token}
- objects = DeleteObjectsAPI(parameter, headers)
- objects.delete_objects(objects_tuple)
- return result
diff --git a/support/ui_utils/element_position/map_element_position_library.py b/support/ui_utils/element_position/map_element_position_library.py
index f9aab6331..1b9b8b59d 100644
--- a/support/ui_utils/element_position/map_element_position_library.py
+++ b/support/ui_utils/element_position/map_element_position_library.py
@@ -1025,7 +1025,7 @@ object_ip_object_element_position = {
},
"search": {
"objectListPage_searchLabel_posXpath": ipObjectListPage_searchLabel_posXpath,
- "objectListPage_searchLabel_selectName_posXpath": ipObjectListPage_searchLabel_selectName_posXpath,
+ "objectListPage_searchLabel_selectID_posXpath": objectListPage_search_select_Id_posXpath,
"objectListPage_tableTbody_posXpath": ipObjectListPage_tableTbody_posXpath,
"objectListPage_search_input_posXpath": ipObjectListPage_search_input_posXpath,
"objectListPage_search_dropDown_item_posXpath": ipObjectListPage_search_dropDown_item_posXpath,
@@ -1172,9 +1172,19 @@ object_fqdn_object_element_position = {
"objectPage_okButton_posXpath": fqdnObjectPage_okButton_posXpath,
"objectPage_okButton_warningYes_posXpath": fqdnObjectPage_okButton_warningYes_posXpath
},
+ "edit":{
+ "ObjectListPage_editButton_posXpath":ipObjectListPage_editButton_posXpath,
+ "objectPage_addItem_posXpath": fqdnobjectPage_addItem_posXpath,
+ "ObjectPage_edit_item_posXpath": ipObjectPage_edit_item_posXpath,
+ "ObjectPage_search_item_posXpath": ipObjectPage_search_item_posXpath,
+ "objectPage_inputItem_posXpath": fqdnObjectPage_inputItem_posXpath,
+ "objectPage_button_saveItem_posXpath": fqdnObjectPage_button_saveItem_posXpath,
+ "objectPage_okButton_posXpath": fqdnObjectPage_okButton_posXpath,
+ },
"search": {
"objectListPage_searchLabel_posXpath": fqdnObjectListPage_searchLabel_posXpath,
"objectListPage_searchLabel_selectName_posXpath": fqdnObjectListPage_searchLabel_selectName_posXpath,
+ "objectListPage_searchLabel_selectID_posXpath":objectListPage_search_select_Id_posXpath,
"objectListPage_tableTbody_posXpath": fqdnObjectListPage_tableTbody_posXpath,
"objectListPage_search_input_posXpath": fqdnObjectListPage_search_input_posXpath,
"objectListPage_search_dropDown_item_posXpath": fqdnObjectListPage_search_dropDown_item_posXpath,
@@ -1277,9 +1287,18 @@ object_http_url_object_element_position = {
"ObjectPage_sameItem_addItem_posXpath":urlObjectPage_sameItem_addItem_posXpath,
"ObjectPage_sameItem_inputItem_posXpath": urlObjectPage_sameItem_inputItem_posXpath,
},
+ "edit":{
+ "ObjectListPage_editButton_posXpath":ipObjectListPage_editButton_posXpath,
+ "objectPage_addItem_posXpath": urlObjectPage_addItem_posXpath,
+ "ObjectPage_edit_item_posXpath": ipObjectPage_edit_item_posXpath,
+ "ObjectPage_search_item_posXpath": ipObjectPage_search_item_posXpath,
+ "objectPage_inputItem_posXpath": urlObjectPage_inputItem_posXpath,
+ "objectPage_button_saveItem_posXpath": urlObjectPage_saveItem_posXpath,
+ "objectPage_okButton_posXpath": urlObjectPage_okButton_posXpath,
+ },
"search": {
"objectListPage_searchLabel_posXpath": urlObjectPage_searchLabel_posXpath,
- "objectListPage_searchLabel_selectName_posXpath": urlObjectListPage_searchLabel_selectName_posXpath,
+ "objectListPage_searchLabel_selectID_posXpath": objectListPage_search_select_Id_posXpath,
"objectListPage_tableTbody_posXpath": urlObjectListPage_tableTbody_posXpath,
"objectListPage_search_input_posXpath": urlObjectListPage_search_input_posXpath,
"objectListPage_search_dropDown_item_posXpath": urlObjectListPage_search_dropDown_item_posXpath,
@@ -1313,9 +1332,18 @@ object_account_object_element_position = {
"ObjectPage_sameItem_addItem_posXpath": urlObjectPage_sameItem_addItem_posXpath,
"ObjectPage_sameItem_inputItem_posXpath": urlObjectPage_sameItem_inputItem_posXpath,
},
+ "edit":{
+ "ObjectListPage_editButton_posXpath":ipObjectListPage_editButton_posXpath,
+ "objectPage_addItem_posXpath": accountobjectPage_addItem_posXpath,
+ "ObjectPage_edit_item_posXpath": ipObjectPage_edit_item_posXpath,
+ "ObjectPage_search_item_posXpath": ipObjectPage_search_item_posXpath,
+ "objectPage_inputItem_posXpath": accountObjectPage_inputItem_posXpath,
+ "objectPage_button_saveItem_posXpath": accountObjectPage_button_saveItem_posXpath,
+ "objectPage_okButton_posXpath": accountObjectPage_okButton_posXpath,
+ },
"search": {
"objectListPage_searchLabel_posXpath": accountObjectPage_searchLabel_posXpath,
- "objectListPage_searchLabel_selectName_posXpath": accountObjectListPage_searchLabel_selectName_posXpath,
+ "objectListPage_searchLabel_selectID_posXpath": objectListPage_search_select_Id_posXpath,
"objectListPage_tableTbody_posXpath": accountObjectListPage_tableTbody_posXpath,
"objectListPage_search_input_posXpath": accountObjectListPage_search_input_posXpath,
"objectListPage_search_dropDown_item_posXpath": accountObjectListPage_search_dropDown_item_posXpath,
diff --git a/support/ui_utils/object/create_objects_example.py b/support/ui_utils/object/create_objects_example.py
index d848fa22a..09a1c958b 100644
--- a/support/ui_utils/object/create_objects_example.py
+++ b/support/ui_utils/object/create_objects_example.py
@@ -25,7 +25,7 @@ class CreateObjects:
# if isinstance(temp_object_configuration, dict):
# object_configuration.append(temp_object_configuration)
code_list = []
- if "and_conditions" in configuration.keys():
+ if isinstance(configuration,dict) and "and_conditions" in configuration.keys():
and_conditions = configuration["and_conditions"]
for i in range(len(and_conditions)):
or_conditions = and_conditions[i]["or_conditions"]
diff --git a/support/ui_utils/object/delete_objects_example.py b/support/ui_utils/object/delete_objects_example.py
index 7ecb073e9..72b83eaef 100644
--- a/support/ui_utils/object/delete_objects_example.py
+++ b/support/ui_utils/object/delete_objects_example.py
@@ -12,12 +12,24 @@ class DeleteObjects:
self.driver = driver
self.driver.implicitly_wait(5)
- def delete(self,object_tuple,first_row_checkbox_element):
+ def delete(self,parmameter,objects_tuple):
try:
- for obj in object_tuple:
- element_position_library = get_element_position(obj["type"])
+ for object in objects_tuple:
+ element_position_library = get_element_position(object["type"])
delete_element_position = element_position_library["delete"]
- first_row_checkbox_element.click()
+ page_jump_element_position = element_position_library["page_jump"]
+ search_element_position = element_position_library["search"]
+ page_jump = PageJump(self.driver)
+ time.sleep(0.5)
+ page_jump.jump_sub_object_page(page_jump_element_position)
+ time.sleep(3)
+ self.driver.find_element(By.XPATH,search_element_position["objectListPage_searchLabel_posXpath"]).click()
+ self.driver.find_element(By.XPATH,search_element_position["objectListPage_searchLabel_posXpath"]).send_keys(object["uuid"])
+ self.driver.find_element(By.XPATH, search_element_position["objectListPage_searchLabel_selectID_posXpath"]).click()
+ self.driver.find_element(By.XPATH,search_element_position["objectListPage_search_button_posXpath"]).click()
+ time.sleep(2)
+ self.driver.find_element(By.XPATH, '(//input[@aria-label="Select row"])[1]').click()
+ #点击删除按钮
self.driver.find_element(By.XPATH, delete_element_position["objectListPage_deleteButton_posXpath"]).click()
self.driver.find_element(By.XPATH,delete_element_position["objectListPage_deleteButton_warningYes_posXpath"]).click()
time.sleep(3)
diff --git a/support/ui_utils/object/edit_objects_example.py b/support/ui_utils/object/edit_objects_example.py
index 5a2894d27..a5a339e5c 100644
--- a/support/ui_utils/object/edit_objects_example.py
+++ b/support/ui_utils/object/edit_objects_example.py
@@ -8,20 +8,29 @@ from support.ui_utils.object.page_jump import PageJump
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from support.ui_utils.element_position.map_element_position_library import *
-from support.ui_utils.element_position.object_element_position import *
-from datetime import datetime
-
+from support.ui_utils.object.search_objects import SearchObject
class EditObjects:
def __init__(self, driver):
self.driver = driver
- def edit(self, objects_tuple, first_row_checkbox_element,src_item, new_item):
+ def edit(self, objects_tuple,src_item, new_item):
try:
for object in objects_tuple:
element_position_library = get_element_position(object["type"])
+ page_jump_element_position = element_position_library["page_jump"]
edit_element_position = element_position_library["edit"]
- first_row_checkbox_element.click()
+ search_element_position = element_position_library["search"]
+ page_jump = PageJump(self.driver)
+ time.sleep(0.5)
+ page_jump.jump_sub_object_page(page_jump_element_position)
+ time.sleep(3)
+ self.driver.find_element(By.XPATH, search_element_position["objectListPage_searchLabel_posXpath"]).click()
+ self.driver.find_element(By.XPATH, search_element_position["objectListPage_searchLabel_posXpath"]).send_keys(object["uuid"])
+ self.driver.find_element(By.XPATH, search_element_position["objectListPage_searchLabel_selectID_posXpath"]).click()
+ self.driver.find_element(By.XPATH, search_element_position["objectListPage_search_button_posXpath"]).click()
+ time.sleep(2)
+ self.driver.find_element(By.XPATH,'(//input[@aria-label="Select row"])[1]').click()
self.driver.find_element(By.XPATH, edit_element_position["ObjectListPage_editButton_posXpath"]).click()
object_type = object["type"]
if object_type in ["keyword", "http_signature"]:
@@ -58,33 +67,34 @@ class EditObjects:
elif object_type in ["url", "account", "phone_number", "fqdn", "subscriberid", "apn"]:
if object_type == "phone_number":
self.driver.find_element(By.XPATH,edit_element_position["objectPage_phnoeNumberType_posXpath"]).click()
- for i in range(len(object["items"])): #
+
+ if src_item == "": # 编辑 新增 模式
self.driver.find_element(By.XPATH, edit_element_position["objectPage_addItem_posXpath"]).click()
- if object["items"][i]["op"] == "add": # 直接 新增 模式
- if "->" in object["items"][i]["expression"]: # -> 存在,需要拆分
- item_value_str = object["items"][i]["expression"].split("->")[0].strip()
- else: # -> 不存在,直接取值
- item_value_str = object["items"][i]["expression"].strip()
- # 匹配多个multiple substrings 类型
- if "&" in item_value_str: # & 含有多个 and multiple substrings
- item_values = item_value_str.split("&")
- else:
- item_values = [item_value_str]
- else: # 修改模式,未完成
- # print("todo modify moudle ...")
- item_values = [object["items"][i]["item_value"]]
- for j, item_value in enumerate(item_values): # 遍历添加 multi substr,第一个元素不用点击 + 图标
- item_value = item_value.strip()
- if j == 0:
- # 适配24.10 版本,需要逐个字符输入
- input_field = self.driver.find_element(By.XPATH, edit_element_position["objectPage_inputItem_posXpath"])
- for char in str(item_value):
- input_field.send_keys(char)
- time.sleep(0.1)
- else:
- self.driver.find_element(By.XPATH, edit_element_position["ObjectPage_sameItem_addItem_posXpath"]).click()
- self.driver.find_element(By.XPATH, edit_element_position["ObjectPage_sameItem_inputItem_posXpath"]).send_keys(item_value)
- self.driver.find_element(By.XPATH,edit_element_position["objectPage_button_saveItem_posXpath"]).click()
+ item_value_str = new_item
+ # 匹配多个multiple substrings 类型
+ if "&" in item_value_str: # & 含有多个 and multiple substrings
+ item_values = item_value_str.split("&")
+ else:
+ item_values = [item_value_str]
+ else: # 修改模式
+ self.driver.find_element(By.XPATH,edit_element_position["ObjectPage_search_item_posXpath"]).send_keys(src_item + Keys.ENTER)
+ self.driver.find_element(By.XPATH,edit_element_position["ObjectPage_edit_item_posXpath"]).click()
+ input_field = self.driver.find_element(By.XPATH, edit_element_position["objectPage_inputItem_posXpath"])
+ input_field.send_keys(Keys.CONTROL, 'a') # 或使用 Keys.COMMAND 在 macOS
+ input_field.send_keys(Keys.DELETE)
+ item_values = [new_item]
+ for j, item_value in enumerate(item_values): # 遍历添加 multi substr,第一个元素不用点击 + 图标
+ item_value = item_value.strip()
+ if j == 0:
+ # 适配24.10 版本,需要逐个字符输入
+ input_field = self.driver.find_element(By.XPATH, edit_element_position["objectPage_inputItem_posXpath"])
+ for char in str(item_value):
+ input_field.send_keys(char)
+ time.sleep(0.1)
+ else:
+ self.driver.find_element(By.XPATH, edit_element_position["ObjectPage_sameItem_addItem_posXpath"]).click()
+ self.driver.find_element(By.XPATH, edit_element_position["ObjectPage_sameItem_inputItem_posXpath"]).send_keys(item_value)
+ self.driver.find_element(By.XPATH,edit_element_position["objectPage_button_saveItem_posXpath"]).click()
elif object_type == "flag":
for i in range(len(object["items"])): #
self.driver.find_element(By.XPATH, edit_element_position["objectPage_addItem_posXpath"]).click()
diff --git a/support/ui_utils/object/search_objects_example.py b/support/ui_utils/object/search_objects_example.py
index f71ba02a9..edc27542a 100644
--- a/support/ui_utils/object/search_objects_example.py
+++ b/support/ui_utils/object/search_objects_example.py
@@ -17,56 +17,64 @@ class SearchObjects:
self.my_random = RandomName()
+ def search(self,name,policy_configuration,search_type):
+ if "name" != "":
+ self.get_object_uuid(name,policy_configuration)
+ else:
+ self.search_objects(policy_configuration,search_type)
+
- def search(self,object_info,search,element_position_library):
+ def search_objects(self,objects_tuple,search_type):
try:
- page_jump_element_position = element_position_library["page_jump"]
- # 打开该列表
- page_jump = PageJump(self.driver)
- time.sleep(0.5)
- page_jump.jump_sub_object_page(page_jump_element_position)
- search_types = search["type"].split('&')
- for search_type in search_types:
- search_value = self.get_search_value(object_info,search_type,search["is_fuzzy"])
- if search_type == "uuid":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_Id_posXpath).click()
- elif search_type == "name":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_Name_posXpath).click()
- elif search_type == "description":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_Description_posXpath).click()
- elif search_type == "details":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_Details_posXpath).click()
- elif search_type == "sub_type":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_subType_posXpath).click()
- elif search_type == 'sub_name':
- # 删除Group下的sub mobile
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_Name_posXpath).click()
- elif search_type == "ip":
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH, objectListPage_search_select_IP_posXpath).click()
- elif "global" in search_type :
- self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
- self.driver.find_element(By.XPATH,listPage_objectSearch_ip_address_select_Search_posXpath).click()
- # self.driver.find_element(By.XPATH, mainPage_objectSearch_buttonSearch_posXpath).click()
- # time.sleep(0.5)
- # row = self.driver.find_elements(By.XPATH, "//div[@class='MuiDataGrid-virtualScrollerContent css-0']/div/div")
- # if len(row) != 0:
- # first_row = row[0]
- # first_row_checkBox_element = first_row.find_element(By.XPATH, '//input[@aria-label="Select row"]')
- return 200
+ for object in objects_tuple:
+ element_position_library = get_element_position(object["type"])
+ page_jump_element_position = element_position_library["page_jump"]
+ # 打开该列表
+ page_jump = PageJump(self.driver)
+ time.sleep(0.5)
+ page_jump.jump_sub_object_page(page_jump_element_position)
+ search_types = search_type["type"].split('&')
+ for search_type in search_types:
+ search_value = self.get_search_value(object, search_type,search_type["is_fuzzy"])
+ if search_type == "uuid":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_Id_posXpath).click()
+ elif search_type == "name":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_Name_posXpath).click()
+ elif search_type == "description":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_Description_posXpath).click()
+ elif search_type == "details":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_Details_posXpath).click()
+ elif search_type == "sub_type":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_subType_posXpath).click()
+ elif search_type == 'sub_name':
+ # 删除Group下的sub mobile
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_Name_posXpath).click()
+ elif search_type == "ip":
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).click()
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH, objectListPage_search_select_IP_posXpath).click()
+ elif "global" in search_type :
+ self.driver.find_element(By.XPATH,ipObjectListPage_searchLabel_posXpath).send_keys(search_value)
+ self.driver.find_element(By.XPATH,listPage_objectSearch_ip_address_select_Search_posXpath).click()
+ # self.driver.find_element(By.XPATH, mainPage_objectSearch_buttonSearch_posXpath).click()
+ # time.sleep(0.5)
+ # row = self.driver.find_elements(By.XPATH, "//div[@class='MuiDataGrid-virtualScrollerContent css-0']/div/div")
+ # if len(row) != 0:
+ # first_row = row[0]
+ # first_row_checkBox_element = first_row.find_element(By.XPATH, '//input[@aria-label="Select row"]')
+ return 200
except Exception as e:
raise
@@ -104,11 +112,11 @@ class SearchObjects:
return search_value
- def get_object_info(self,name,object_configuration):
+ def get_object_uuid(self,name,object_configuration):
try:
object_uuids_list = []
object_uuids_temp_dict = {}
- for object in object_configuration["or_conditions"]:
+ for object in object_configuration:
# 获取当前所需的element
if "sub_type" in object.keys():
object_type = object["sub_type"]
@@ -139,10 +147,10 @@ class SearchObjects:
object_uuids_temp_dict["name"] = object["name"]
object_uuids_temp_dict["description"] = object_description
object_uuids_list.append(object_uuids_temp_dict)
- rule_uuids_tuple = tuple(object_uuids_list)
- return rule_uuids_tuple,200
+ object_uuids_tuple = tuple(object_uuids_list)
+ return object_uuids_tuple,200
except Exception as e:
- raise
+ return 400
def get_first_object(self,uuid,object_configuration):
try:
@@ -168,7 +176,7 @@ class SearchObjects:
if len(row) != 0:
first_row = row[0]
first_row_checkBox_element = first_row.find_element(By.XPATH, '//input[@aria-label="Select row"]')
- return 200, first_row_checkBox_element
+ return first_row_checkBox_element,200
except Exception as e:
raise
# def main_page_item_search(self,object_info):
diff --git a/support/ui_utils/ui_client.py b/support/ui_utils/ui_client.py
index 7a6fcd5e9..1bec8ecdf 100644
--- a/support/ui_utils/ui_client.py
+++ b/support/ui_utils/ui_client.py
@@ -31,21 +31,21 @@ class UIClient:
objects = CreateObjects(self.driver)
code = objects.create_objects(policy_configuration)
return code
-
- def search_objects(self, policy_configuration, search_type):
- objects = SearchObjects()
- objects_tuple, code = objects.search_objects(policy_configuration, search_type)
+
+ def search_objects(self, name, policy_configuration, search_type):
+ objects = SearchObjects(self.driver)
+ objects_tuple, code = objects.get_object_uuid(name, policy_configuration)
self.objects_tuple = objects_tuple
return objects_tuple, code
- def edit_objects(self, objects_tuple, policy_configuration):
- objects = EditObjectsExample()
- code = objects.edit_objects(objects_tuple, policy_configuration, self.driver)
+ def edit_objects(self, objects_tuple, src_item,new_item):
+ objects = EditObjects(self.driver)
+ code = objects.edit(objects_tuple, src_item,new_item)
return code
def delete_objects(self, objects_tuple):
- objects = DeleteObjectsExample(self.driver)
- objects.delete_objects(self.parameter, objects_tuple)
+ objects = DeleteObjects(self.driver)
+ objects.delete(self.parameter, objects_tuple)
def create_rules(self, policy_configuration):
self.policy_configuration = policy_configuration
diff --git a/tests/dos_protection/dos_allow_srcip.py b/tests/dos_protection/dos_allow_srcip.py
index a018bd087..7bf4ab76d 100644
--- a/tests/dos_protection/dos_allow_srcip.py
+++ b/tests/dos_protection/dos_allow_srcip.py
@@ -1,82 +1,164 @@
-# -*- coding: UTF-8 -*-
-import time
import os
import sys
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "allow",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "expected_return": "",
- "counters": {"hits": 1},
- "log_query_param": [
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "allow",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
],
- "traffic": {
- "protocol": "ssl",
- "type": "curl",
- "command": "curl -kv --connect-timeout 10 -m 10 https://www.baidu.com"
- },
- "token": ""
+ "is_enabled": 1,
+ "log_option": "metadata",
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ traffic_generation = {
+ "tool": "ssl",
+ "command": "curl -kv --connect-timeout 10 -m 10 https://www.baidu.com"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "baidu",
+ "expected_metric": {"hits": 1},
+ "expected_log": [
+
+ ]
+ }
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -84,13 +166,11 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "ui",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 0,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
diff --git a/tests/dos_protection/dos_deny_srcip.py b/tests/dos_protection/dos_deny_srcip.py
index 7d2331f5f..36db9e885 100644
--- a/tests/dos_protection/dos_deny_srcip.py
+++ b/tests/dos_protection/dos_deny_srcip.py
@@ -1,81 +1,164 @@
-# -*- coding: UTF-8 -*-
-import time
import os
import sys
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "deny",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "expected_return": "",
- "counters": {"hits": 1},
- "log_query_param": [
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "deny",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
],
- "traffic": {
- "protocol": "ssl",
- "type": "curl",
- "command": "curl -kv --connect-timeout 10 -m 10 https://www.baidu.com"
- }
+ "is_enabled": 1,
+ "log_option": "metadata",
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ traffic_generation = {
+ "tool": "ssl",
+ "command": "curl -kv --connect-timeout 10 -m 10 https://www.baidu.com"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": 1},
+ "expected_log": [
+
+ ]
+ }
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -83,13 +166,11 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "ui",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 0,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
diff --git a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_10s.py b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_10s.py
index cb7b68783..0c7d485d6 100644
--- a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_10s.py
+++ b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_10s.py
@@ -1,66 +1,88 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "threshold": {
- "type": "concurrency",
- "concurrency_threshold": {
- "group_by": "source_ip",
- "concurrent_sessions": 2 #为1的时候packets_received有为0、为1的不好校验
- }
- },
- "mitigation": {
- "behavior": "deny",
- "timeout": "10s"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "concurrency",
+ "concurrency_threshold": {
+ "group_by": "source_ip",
+ "concurrent_sessions": 2
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout": "10s"
+ },
},
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.30",
+ "servers_end_ip": "2.1.2.30",
+ "active_flows": 3,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "tcp_long30s"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -69,49 +91,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.30",
- "servers_end_ip": "2.1.2.30",
- "active_flows": 3,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "tcp_long30s"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -119,15 +199,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1h.py b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1h.py
index e5ac69dcb..807b03a3e 100644
--- a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1h.py
+++ b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1h.py
@@ -1,66 +1,88 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "threshold": {
- "type": "concurrency",
- "concurrency_threshold": {
- "group_by": "source_ip",
- "concurrent_sessions": 2
- }
- },
- "mitigation": {
- "behavior": "deny",
- "timeout": "1h"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "concurrency",
+ "concurrency_threshold": {
+ "group_by": "source_ip",
+ "concurrent_sessions": 2
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout": "1h"
+ },
},
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.30",
+ "servers_end_ip": "2.1.2.30",
+ "active_flows": 3,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "tcp_long30s"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -69,49 +91,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.30",
- "servers_end_ip": "2.1.2.30",
- "active_flows": 3,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "tcp_long30s"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -119,16 +199,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 0,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
-
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1min.py b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1min.py
index ea18482b8..5fbc1a88c 100644
--- a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1min.py
+++ b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_1min.py
@@ -1,63 +1,88 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "threshold": {
- "type": "concurrency",
- "concurrency_threshold": {
- "group_by": "source_ip",
- "concurrent_sessions": 2
- }
- },
- "mitigation": {
- "behavior": "deny",
- "timeout": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "concurrency",
+ "concurrency_threshold": {
+ "group_by": "source_ip",
+ "concurrent_sessions": 2
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout": "1min"
+ },
},
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.30",
+ "servers_end_ip": "2.1.2.30",
+ "active_flows": 3,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "tcp_long30s"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -66,49 +91,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.30",
- "servers_end_ip": "2.1.2.30",
- "active_flows": 3,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "tcp_long30s"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -116,13 +199,11 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "ui",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 0,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
diff --git a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_2min.py b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_2min.py
index 402820180..5589af5b7 100644
--- a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_2min.py
+++ b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_2min.py
@@ -1,66 +1,88 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "threshold": {
- "type": "concurrency",
- "concurrency_threshold": {
- "group_by": "source_ip",
- "concurrent_sessions": 2 #为1的时候packets_received有为0、为1的不好校验
- }
- },
- "mitigation": {
- "behavior": "deny",
- "timeout": "2min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "concurrency",
+ "concurrency_threshold": {
+ "group_by": "source_ip",
+ "concurrent_sessions": 2
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout": "2min"
+ },
},
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.30",
+ "servers_end_ip": "2.1.2.30",
+ "active_flows": 3,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "tcp_long30s"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -69,49 +91,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.30",
- "servers_end_ip": "2.1.2.30",
- "active_flows": 3,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "tcp_long30s"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -119,15 +199,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_5min.py b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_5min.py
index 505fc668a..efd15e389 100644
--- a/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_5min.py
+++ b/tests/dos_protection/dos_protect_srcip_concurrency_srcip_d3_5min.py
@@ -1,66 +1,88 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
-
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- },
- "threshold": {
- "type": "concurrency",
- "concurrency_threshold": {
- "group_by": "source_ip",
- "concurrent_sessions": 2 #为1的时候packets_received有为0、为1的不好校验
- }
- },
- "mitigation": {
- "behavior": "deny",
- "timeout": "5min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": 0,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "concurrency",
+ "concurrency_threshold": {
+ "group_by": "source_ip",
+ "concurrent_sessions": 2
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout": "5min"
+ },
},
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.30",
+ "servers_end_ip": "2.1.2.30",
+ "active_flows": 3,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "tcp_long30s"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -69,49 +91,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.30",
- "servers_end_ip": "2.1.2.30",
- "active_flows": 3,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "tcp_long30s"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -119,15 +199,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10min.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10min.py
index 3c6e9fdde..6d0a77359 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10min.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10min.py
@@ -1,74 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
+from support.organize_config import OrganizeConfig
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"10min"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "10min"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -77,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -127,15 +212,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10s.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10s.py
index 55beeba34..ecb26951a 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10s.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_10s.py
@@ -1,74 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
+from support.organize_config import OrganizeConfig
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"10s"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "10s"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -77,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -127,15 +212,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1h.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1h.py
index b81fbf4f8..4241f3eff 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1h.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1h.py
@@ -1,74 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
+from support.organize_config import OrganizeConfig
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"1h"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "1h"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -77,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -127,15 +212,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1min.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1min.py
index dd48736dc..06ac47c0e 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1min.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_1min.py
@@ -1,74 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
+from support.organize_config import OrganizeConfig
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"1min"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "1min"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -77,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -127,15 +212,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_2min.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_2min.py
index 4ed8b5a59..896a206d8 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_2min.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_2min.py
@@ -1,71 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+
+from support.organize_config import OrganizeConfig
+
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"2min"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "2min"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -74,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -124,13 +212,11 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "ui",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
diff --git a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_5min.py b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_5min.py
index e849a7532..f87cc770a 100644
--- a/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_5min.py
+++ b/tests/dos_protection/dos_protect_srcip_dns_rate_srcip_cperiod_1min_d3_5min.py
@@ -1,74 +1,101 @@
# -*- coding: UTF-8 -*-
-import time
import os
import sys
-from support.ui_utils.element_position.map_element_position_library import replace_paras
+from support.organize_config import OrganizeConfig
-sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
+sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
+import time
+import pytz
from datetime import datetime
+from support.ui_utils.workpath import workdir
+from support.ui_utils.ui_client import UIClient
+from support.api_utils.api_client import APIClient
+from support.packet_generator.traffic_generator import *
from support.report_update import ReportUpdate
-from support.common_utils.create_policy import CreatePolicy
+
def run(parameter):
try:
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to run test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Begin to run test case: " + parameter["test_case_name"], flush=True)
# 参数初始化
- exception_result = ""
- result = {}
-
+ result, exception_result = "", ""
+ test_summary = {}
+
# 脚本启动时间
script_start_time = time.time()
-
+
# 测试数据
- test_data = {
- "is_multi_priority": False,
- "rule_num": 1,
- "policy_type": "dos_protection",
- "rule_name": os.path.splitext(os.path.basename(__file__))[0],
- "rule_action": "protect",
- "rule_type": "create",
- "do_log" : 2,
- "condition": {
- "source_ip": [
- {
- "name": "dos_srcip",
- "object_type": "ip",
- "select_type": False,
- "negate": False,
- "item": [
- {
- "item_operation": "add",
- "item_type": "ipv4",
- "item_value": parameter['test_pc_ip'],
- }
- ]
- }
- ],
- "application": [
- {
- "name": "dns",
- "object_type": "application",
- "negate": False
- }
- ]
- },
- "threshold": {
- "type": "rate",
- "rate_threshold": {
- "group_by": "source_ip",
- "request_per_period": 1,
- "counting_period": "1min"
+ policy_configuration = {
+ "type": "dos_protection",
+ "name": os.path.splitext(os.path.basename(__file__))[0],
+ "action": "protect",
+ "and_conditions": [
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_SOURCE_IP",
+ "type": "ip",
+ "sub_type": "ip",
+ "name": "sec_srcip",
+ "items": [
+ {
+ "op": "add",
+ "ip": parameter["test_pc_ip"],
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "negate_option": False,
+ "or_conditions": [
+ {
+ "attribute_name": "ATTR_APP_ID",
+ "type": "application",
+ "items": ["dns"]
+ }
+ ],
}
+ ],
+ "action_parameter": {
+ "threshold": {
+ "type": "rate",
+ "rate_threshold": {
+ "group_by": "source_ip",
+ "request_per_period": 1,
+ "counting_period": "1min"
+ }
+ },
+ "mitigation": {
+ "behavior": "deny",
+ "timeout":"5min"
+ },
},
- "mitigation": {
- "behavior": "deny",
- "timeout": "5min"
- },
- "expected_return": "",
- "counters": {"hits": "many"},
- "log_query_param": [
- #,
+ "is_enabled": 1,
+ "log_option": "metadata",
+ }
+
+ traffic_generation = {
+ "protocol": "",
+ "tool":"trex",
+ "clients_start_ip": "1.1.1.1",
+ "clients_end_ip": "1.1.1.1",
+ "servers_start_ip": "2.1.2.32",
+ "servers_end_ip": "2.1.2.32",
+ "m": 2,
+ "d": 5,
+ "yaml_name": "test",
+ "pcap_name": "dns"
+ }
+
+ verification_result = {
+ "excepted_traffic_result": "",
+ "expected_metric": {"hits": "many"},
+ "expected_log": [
{"query_field_key": "sessions", "query_value": ""},
{"query_field_key": "packets", "query_value": ""},
{"query_field_key": "bytes", "query_value": ""},
@@ -77,49 +104,107 @@ def run(parameter):
{"query_field_key": "session_rate", "query_value": ""},
{"query_field_key": "rule_uuid", "query_value": ""},
- #{"query_field_key": "conditions", "query_value": ""},
{"query_field_key": "source_ip", "query_value": ""},
{"query_field_key": "source_country", "query_value": "Private Network"},
{"query_field_key": "destination_ip", "query_value": ""},
{"query_field_key": "destination_country", "query_value": "US"},
- ],
- "traffic": {
- "protocol": "",
- "tool":"trex",
- "clients_start_ip": "1.1.1.1",
- "clients_end_ip": "1.1.1.1",
- "servers_start_ip": "2.1.2.32",
- "servers_end_ip": "2.1.2.32",
- "m": 2,
- "d": 5,
- "yaml_name": "test",
- "pcap_name": "dns"
- },
- "token": ""
+ ]
}
-
- # 测试用例实例化
- create = CreatePolicy(test_data, parameter)
- result = create.create_policy()
-
- return result
+
+ # 创建
+ if parameter["initiation_method"] == "ui":
+ ui_client = UIClient()
+ rules_tuple, ui_error = ui_client.create_rules(policy_configuration)
+ if len(ui_error) > 0:
+ return ui_error
+ elif parameter["initiation_method"] == "api":
+ api_client = APIClient(parameter)
+ # {uuid, type}, i.e., {"12341-232-a21", "ip"}
+ objects_tuple, api_error = api_client.create_objects(policy_configuration)
+ if len(api_error) > 0:
+ return api_error
+ rules_tuple, api_error = api_client.create_rules(policy_configuration, objects_tuple, "", "")
+ if len(api_error) > 0:
+ return api_error
+
+ # 等待下发配置生效
+ time.sleep(3)
+
+ # 类实例化
+ generator = TrafficGenerator()
+
+ # 获取当前时间
+ utc_tz = pytz.timezone('UTC')
+ current_utc_time = datetime.now(utc_tz)
+ start_time = current_utc_time.strftime('%Y-%m-%dT%H:%M:%SZ')
+
+ # 触发流量
+ traffic_result = generator.run(policy_configuration, traffic_generation)
+
+ # 验证流量生成器的返回值是否符合策略执行的预期
+ excepted_traffic_result, error = generator.result(verification_result, traffic_result)
+ if excepted_traffic_result == False:
+ return error
+
+ # 验证tsg的日志是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ log_result = ui_client.query_rule_log(verification_result, rules_tuple, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ log_result = api_client.query_rule_log(traffic_generation, verification_result, rules_tuple, start_time,
+ traffic_result)
+ if log_result == True:
+ test_summary["log"] = "Pass."
+ elif log_result == False:
+ test_summary["log"] = "The failure reason: the returned log does not match the expected result."
+ elif log_result == None:
+ test_summary["log"] = "The failure reason: the returned log is empty."
+ elif len(log_result) > 0:
+ test_summary["log"] = log_result
+
+ # 验证tsg的metric是否符合策略执行的预期
+ if parameter["initiation_method"] == "ui":
+ metric_result = ui_client.query_rule_metric(verification_result, traffic_result)
+ elif parameter["initiation_method"] == "api":
+ metric_result = api_client.query_rule_metric(verification_result, rules_tuple, start_time, traffic_result)
+ if metric_result == True:
+ test_summary["metric"] = "Pass."
+ elif metric_result == False:
+ test_summary["metric"] = "The failure reason: the returned metric does not match the expected result."
+ elif metric_result == None:
+ test_summary["metric"] = "The failure reason: the returned metric is empty."
+ elif len(metric_result) > 0:
+ test_summary["metric"] = metric_result
+
+ return test_summary
except Exception as e:
exception_result = str(e)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Error: ", e, flush=True)
- return "Error: " + str(e)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "When running test case, the exception error: ", str(e), flush=True)
+ return "When running test case, the exception error: " + str(e)
finally:
- # 清理环境并删除配置
- if isinstance(create, CreatePolicy):
- create.clean_up()
+ # 删除
+ if parameter["initiation_method"] == "ui":
+ if rules_tuple:
+ ui_client.delete_rules(parameter, policy_configuration)
+ elif parameter["initiation_method"] == "api":
+ if rules_tuple:
+ api_client.delete_rules(rules_tuple)
+ if objects_tuple:
+ api_client.delete_objects(objects_tuple)
+
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Finish test case: " + parameter["test_case_name"], flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
-
+
+
if __name__ == '__main__':
parameter = {
"username": "lina2024",
@@ -127,15 +212,12 @@ if __name__ == '__main__':
"test_pc_ip": "192.168.64.96",
"test_subcriber_id": "test6776",
"api_server": "http://192.168.44.72",
- "script_type": "api",
+ "initiation_method": "api",
"env": "tsgx",
- "vsys_id": 1,
- "is_log": 1,
- "debug_flag": "local",
- "root_path": "E:/tsg_test",
- "path": "E:/tsg_test/tests/ui",
+ "vsys": 1,
+ "root_path": workdir,
+ "path": workdir + "/tests",
"module_name": "dos_protection",
"test_case_name": os.path.basename(__file__)[:-3]
}
- parameter = replace_paras(parameter)
run(parameter) \ No newline at end of file
diff --git a/tests/object/test_temp/create_account_object_temp.py b/tests/object/test_temp/create_account_object_temp.py
index fbaf53c3f..e5023f68c 100644
--- a/tests/object/test_temp/create_account_object_temp.py
+++ b/tests/object/test_temp/create_account_object_temp.py
@@ -16,10 +16,9 @@ def run(parameter):
script_start_time = time.time()
#测试数据
- object_configuration = {
- "or_conditions": [
+ object_configuration = [
{
- "name": "test",
+ "name": "create_account_object",
"type": "account",
"member_type": "item",
"statistics_option": "", # random
@@ -27,46 +26,63 @@ def run(parameter):
{
"op": "add",
"expr_type": "and",
- "expression": "[email protected]&abcd"
+ "expression": "[email protected]"
}
]
}
- ],
- # "search": {
- # "is_fuzzy": False,
- # "type": "uuid"
- # },
- # "audit_log": False # system
- }
+ ]
+
+ ui_client = UIClient(parameter)
# 创建
- ui_client = UIClient()
- objects_tuple, ui_error = ui_client.create_objects(object_configuration)
- if len(ui_error) > 0:
- return ui_error
+ code = ui_client.create_objects(object_configuration)
+ if code != 200:
+ return "Fail to create object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Create Object successful ")
+
+ search_type = {
+ "is_fuzzy": False,
+ "type": "",
+ }
+ # 查询
+ objects_tuple, code = ui_client.search_objects("create_account_object", object_configuration, search_type)
objects_list = list(objects_tuple)
- if len(objects_list) > 0 and len(objects_list[0]["uuid"]) > 0:
- return ""
+ if len(objects_list) == 0:
+ return "Fail to get object uuid."
+ if code != 200:
+ return "Fail to search object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Search Object successful ")
+
+ # 编辑
+ code = ui_client.edit_objects(objects_tuple, src_item="", new_item="user")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ code = ui_client.edit_objects(objects_tuple, src_item="[email protected]", new_item="ftpuser")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ return ""
except Exception as e:
exception_result = str(e)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"When running test case, the exception error: ", str(e), flush=True)
return "When running test case, the exception error: " + str(e)
finally:
# 删除
- if parameter["initiation_method"] == "ui":
- if objects_tuple is not None:
- ui_client.delete_objects(parameter, objects_tuple, object_configuration["search"])
- # elif parameter["initiation_method"] == "api":
- # if not objects_tuple:
- # api_client.delete_rules(rules_tuple)
- # if not objects_tuple:
- # api_client.delete_objects(objects_tuple)
-
+ if objects_tuple:
+ ui_client.delete_objects(objects_tuple)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Delete Object successful ")
+ # ui_client.cleanup()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
diff --git a/tests/object/test_temp/create_fqdn_object_temp.py b/tests/object/test_temp/create_fqdn_object_temp.py
index 1108859df..2d35a55eb 100644
--- a/tests/object/test_temp/create_fqdn_object_temp.py
+++ b/tests/object/test_temp/create_fqdn_object_temp.py
@@ -16,40 +16,62 @@ def run(parameter):
script_start_time = time.time()
#测试数据
- object_configuration = {
- "or_conditions": [
- {
- "name": "test",
- "type": "fqdn",
- "member_type": "item",
- "statistics_option": "", # random
- "items": [
- {
- "op": "add",
- "expr_type": "and",
- "expression": "open.node.com"
- },
- ],
- }
- ],
- "search": {
- "is_fuzzy": False,
- "type": "uuid"
- },
- "audit_log": False # system
- }
+ object_configuration = [
+ {
+ "name": "create_fqdn_object",
+ "type": "fqdn",
+ "member_type": "item",
+ "statistics_option": "", # random
+ "items": [
+ {
+ "op": "add",
+ "expr_type": "and",
+ "expression": "open.node.com"
+ },
+ ]
+ }
+ ]
+
+ ui_client = UIClient(parameter)
# 创建
- # 创建
- ui_client = UIClient()
- objects_tuple, ui_error = ui_client.create_objects(object_configuration)
- if len(ui_error) > 0:
- return ui_error
+ code = ui_client.create_objects(object_configuration)
+ if code != 200:
+ return "Fail to create object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Create Object successful ")
+
+ search_type = {
+ "is_fuzzy": False,
+ "type": "",
+ }
+ # 查询
+ objects_tuple, code = ui_client.search_objects("create_fqdn_object", object_configuration, search_type)
objects_list = list(objects_tuple)
- if len(objects_list) > 0 and len(objects_list[0]["uuid"]) > 0:
- return ""
+ if len(objects_list) == 0:
+ return "Fail to get object uuid."
+ if code != 200:
+ return "Fail to search object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Search Object successful ")
+
+ # 编辑
+ # 如果src_item是空,则表示该item是新加
+ code = ui_client.edit_objects(objects_tuple, src_item="", new_item="baidu.com")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ # 如果src_item是不是空,在item输入框,输入src_item,找到item,点击item的编辑按钮,删除src_item,输入new_item,点击保存按钮
+ code = ui_client.edit_objects(objects_tuple, src_item="open.node.com", new_item="^www.youtube.com$")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ return ""
except Exception as e:
exception_result = str(e)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"When running test case, the exception error: ", str(e), flush=True)
@@ -57,15 +79,10 @@ def run(parameter):
finally:
# 删除
- if parameter["initiation_method"] == "ui":
- if objects_tuple is not None:
- ui_client.delete_objects(parameter, objects_tuple, object_configuration["search"])
- # elif parameter["initiation_method"] == "api":
- # if not objects_tuple:
- # api_client.delete_rules(rules_tuple)
- # if not objects_tuple:
- # api_client.delete_objects(objects_tuple)
-
+ if objects_tuple:
+ ui_client.delete_objects(objects_tuple)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Delete Object successful ")
+ # ui_client.cleanup()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
@@ -73,6 +90,7 @@ def run(parameter):
"Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
"Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
diff --git a/tests/object/test_temp/create_ip_learning_objects_temp.py b/tests/object/test_temp/create_ip_learning_objects_temp.py
index 241716d3f..ec4499dbf 100644
--- a/tests/object/test_temp/create_ip_learning_objects_temp.py
+++ b/tests/object/test_temp/create_ip_learning_objects_temp.py
@@ -16,46 +16,49 @@ def run(parameter):
script_start_time = time.time()
#测试数据
- object_configuration = {
- "or_conditions": [
- {
- "name": "test",
- "type": "ip",
- "op": "add",
- "sub_type": "ip_learning",
- "member_type": "item",
- "statistics_option": "", # random
- "learning_plan": {
- "fqdn_ip_learning_plan": {
- "learning_depth": "1",
- "from_protocol": "SSL",
- "aging_time": 10,
- "vote_client_num": 10,
- "goal_upper_limit": 10,
- "from_fqdns": [
- "^www.example.com$"
- ]
- }
+ object_configuration = [
+ {
+ "name": "create_ip_learing_object",
+ "type": "ip",
+ "op": "add",
+ "sub_type": "ip_learning",
+ "member_type": "item",
+ "statistics_option": "", # random
+ "learning_plan": {
+ "fqdn_ip_learning_plan": {
+ "learning_depth": "1",
+ "from_protocol": "SSL",
+ "aging_time": 10,
+ "vote_client_num": 10,
+ "goal_upper_limit": 10,
+ "from_fqdns": [
+ "^www.example.com$"
+ ]
}
}
- ],
- "search": {
- "is_fuzzy": False,
- "type": "uuid"
- },
+ }
+ ]
- "audit_log": False # system
- }
+ ui_client = UIClient(parameter)
# 创建
- ui_client = UIClient()
- objects_tuple, ui_error = ui_client.create_objects(object_configuration)
- if len(ui_error) > 0:
- return ui_error
+ code = ui_client.create_objects(object_configuration)
+ if code != 200:
+ return "Fail to create object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Create Object successful ")
+
+ # 查询
+ objects_tuple, code = ui_client.search_objects("create_ip_learing_object", object_configuration, "")
objects_list = list(objects_tuple)
- if len(objects_list) > 0 and len(objects_list[0]["uuid"]) > 0:
- return ""
+ if len(objects_list) == 0:
+ return "Fail to get object uuid."
+ if code != 200:
+ return "Fail to search object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Search Object successful ")
+ return ""
except Exception as e:
exception_result = str(e)
@@ -64,15 +67,10 @@ def run(parameter):
finally:
# 删除
- if parameter["initiation_method"] == "ui":
- if objects_tuple is not None:
- ui_client.delete_objects(parameter, objects_tuple, object_configuration["search"])
- # elif parameter["initiation_method"] == "api":
- # if not objects_tuple:
- # api_client.delete_rules(rules_tuple)
- # if not objects_tuple:
- # api_client.delete_objects(objects_tuple)
-
+ if objects_tuple:
+ ui_client.delete_objects(objects_tuple)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Delete Object successful ")
+ # ui_client.cleanup()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
@@ -80,6 +78,7 @@ def run(parameter):
"Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
"Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)
diff --git a/tests/object/test_temp/create_ip_object_temp.py b/tests/object/test_temp/create_ip_object_temp.py
index ff822e12f..3c0312afe 100644
--- a/tests/object/test_temp/create_ip_object_temp.py
+++ b/tests/object/test_temp/create_ip_object_temp.py
@@ -15,56 +15,64 @@ def run(parameter):
script_start_time = time.time()
#测试数据
- object_configuration = {
- "or_conditions": [
- {
- "name": "create_ip_object",
- "type": "ip",
- "sub_type": "ip",
- "member_type": "item",
- "statistics_option": "", # random
- "items": [
- {
- "op": "add",
- "ip": "1.1.1.1",
- "interval": "0-65535"
- },
- {
- "op": "add",
- "ip": "192.168.2.1/24",
- "interval": "0-65535"
- }
- ]
- },
- ]
- }
+ object_configuration = [
+ {
+ "name": "create_ip_object",
+ "type": "ip",
+ "sub_type": "ip",
+ "member_type": "item",
+ "statistics_option": "", # random
+ "items": [
+ {
+ "op": "add",
+ "ip": "1.1.1.1",
+ "interval": "0-65535"
+ },
+ {
+ "op": "add",
+ "ip": "192.168.2.1/24",
+ "interval": "0-65535"
+ }
+ ]
+ }
+ ]
- ui_client = UIClient()
+ ui_client = UIClient(parameter)
# 创建
code = ui_client.create_objects(object_configuration)
if code != 200:
return "Fail to create object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Create Object successful ")
+ search_type ={
+ "is_fuzzy": False,
+ "type": "",
+ }
# 查询
- objects_tuple, code = ui_client.search_objects("create_ip_object",object_configuration,type="object_info")
+ objects_tuple, code = ui_client.search_objects("create_ip_object",object_configuration,search_type)
objects_list = list(objects_tuple)
if len(objects_list) == 0:
return "Fail to get object uuid."
if code != 200:
return "Fail to search object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Search Object successful ")
# 编辑
- object_uuid = objects_tuple[0]["uuid"]
- first_row_checkbox_element,result = ui_client.search_objects(object_uuid,object_configuration,type="object_select")
# 如果src_item是空,则表示该item是新加
- code = ui_client.edit_objects(objects_tuple,first_row_checkbox_element, src_item="", new_item="3.3.3.3")
+ code = ui_client.edit_objects(objects_tuple, src_item="", new_item="3.3.3.3")
if code != 200:
return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
# 如果src_item是不是空,在item输入框,输入src_item,找到item,点击item的编辑按钮,删除src_item,输入new_item,点击保存按钮
- code = ui_client.edit_objects(objects_tuple,first_row_checkbox_element, src_item="1.1.1.1", new_item="2.2.2.2")
+ code = ui_client.edit_objects(objects_tuple, src_item="1.1.1.1", new_item="2.2.2.2")
if code != 200:
return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
return ""
except Exception as e:
@@ -74,8 +82,9 @@ def run(parameter):
finally:
# 删除
if objects_tuple:
- ui_client.delete_objects( objects_tuple,first_row_checkbox_element)
-
+ ui_client.delete_objects(objects_tuple)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Delete Object successful ")
+ # ui_client.cleanup()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
diff --git a/tests/object/test_temp/create_url_object_temp.py b/tests/object/test_temp/create_url_object_temp.py
index c91450eb4..8d661c3f7 100644
--- a/tests/object/test_temp/create_url_object_temp.py
+++ b/tests/object/test_temp/create_url_object_temp.py
@@ -16,39 +16,60 @@ def run(parameter):
script_start_time = time.time()
#测试数据
- object_configuration = {
- "or_conditions": [
- {
- "name": "test",
- "type": "url",
- "member_type": "item",
- "statistics_option": "", # random
- "items": [
- {
- "op": "add",
- "expr_type": "and",
- "expression": "english&abcd"
- }
- ]
- }
- ],
- "search": {
- "is_fuzzy": False,
- "type": "uuid"
- },
- "audit_log": False # system
- }
+ object_configuration = [
+ {
+ "name": "create_url_object",
+ "type": "url",
+ "member_type": "item",
+ "statistics_option": "", # random
+ "items": [
+ {
+ "op": "add",
+ "expr_type": "and",
+ "expression": "english"
+ }
+ ]
+ }
+ ]
+
+ ui_client = UIClient(parameter)
# 创建
- ui_client = UIClient()
- objects_tuple, ui_error = ui_client.create_objects(object_configuration)
- if len(ui_error) > 0:
- return ui_error
+ code = ui_client.create_objects(object_configuration)
+ if code != 200:
+ return "Fail to create object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Create Object successful ")
+
+ search_type = {
+ "is_fuzzy": False,
+ "type": "",
+ }
+ # 查询
+ objects_tuple, code = ui_client.search_objects("create_url_object", object_configuration, search_type)
objects_list = list(objects_tuple)
- if len(objects_list) > 0 and len(objects_list[0]["uuid"]) > 0:
- return ""
+ if len(objects_list) == 0:
+ return "Fail to get object uuid."
+ if code != 200:
+ return "Fail to search object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Search Object successful ")
+ # 编辑
+ # 如果src_item是空,则表示该item是新加
+ code = ui_client.edit_objects(objects_tuple, src_item="", new_item="facebook")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ # 如果src_item是不是空,在item输入框,输入src_item,找到item,点击item的编辑按钮,删除src_item,输入new_item,点击保存按钮
+ code = ui_client.edit_objects(objects_tuple, src_item="english", new_item="baidu")
+ if code != 200:
+ return "Fail to edit object."
+ else:
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"Edit Object successful ")
+ return ""
except Exception as e:
exception_result = str(e)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],"When running test case, the exception error: ", str(e), flush=True)
@@ -56,15 +77,11 @@ def run(parameter):
finally:
# 删除
- if parameter["initiation_method"] == "ui":
- if objects_tuple is not None:
- ui_client.delete_objects(parameter, objects_tuple, object_configuration["search"])
- # elif parameter["initiation_method"] == "api":
- # if not objects_tuple:
- # api_client.delete_rules(rules_tuple)
- # if not objects_tuple:
- # api_client.delete_objects(objects_tuple)
-
+ if objects_tuple:
+ ui_client.delete_objects(objects_tuple)
+ print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
+ "Delete Object successful ")
+ # ui_client.cleanup()
# 统计脚本用时
script_end_time = time.time()
duration = script_end_time - script_start_time
@@ -72,6 +89,7 @@ def run(parameter):
"Duration of running the test case: ", "{:.3f}".format(duration), flush=True)
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3],
"Finish test case: " + parameter["test_case_name"], flush=True)
+
# 生成csv报告
update = ReportUpdate()
update.write_result(parameter, result, exception_result)