# import os
import sys
import time
# sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from support.ui_utils.profiles.page_jump import PageJump
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from support.ui_utils.element_position.profile_element_position import *
import re
import random
import copy
from datetime import datetime

# NOTE: the import order above is kept exactly as it was. The star import may
# define names that shadow (or are shadowed by) the neighbouring imports, so
# regrouping the lines could change which binding wins.


class CreateServiceFunctionForwarderProfile:
    """Create a Service Function Forwarder (SFF) profile through the web UI.

    Every interaction goes through the Selenium ``driver`` handed to the
    constructor.  Element locators are not hard-coded here: callers pass
    dictionaries of XPath strings (some of them ``str.format`` templates).

    While filling in the forms the instance records what it configured in
    ``self.sc_info``; ``create_sff_profile`` returns that dictionary.

    The fixed ``time.sleep`` pauses between UI actions are carried over
    unchanged from the original implementation.
    """

    # Captions substituted into the SFF radio-button XPath template
    # (``replaceLabel``), keyed by the test-data field name.
    _SFF_LABELS = {
        'type': 'Type',
        'load_balance_method': 'Load Balance Method',
        'load_balance_localization': 'Load Balance Localization',
        'failure_action': 'Failure Action',
        "unavailability_action": "Unavailability Action",
    }

    # Radio-button captions (``replaceRadioName``), keyed by the normalised
    # (lower-case, underscore-separated) test-data value.
    _SFF_RADIO_NAMES = {
        'steering': 'Steering',
        'mirroring': 'Mirroring',
        'hash_int-ip': 'Hash Int-IP',
        'hash_ext-ip': 'Hash Ext-IP',
        'hash_int-ip_and_ext-ip': 'Hash Int-IP and Ext-IP',
        'hash_innermost_int-ip': 'Hash Innermost Int-IP',
        'nearby': 'Nearby',
        'global': 'Global',
        'bypass': 'Bypass',
        're-dispatch': 'Re-dispatch',
        'block': 'Block',
    }

    # Device-type captions in the device side panel, keyed by the normalised
    # prefix of the ``device_group`` test-data value.
    _DEVICE_TYPE_LABELS = {"device_group": "Group", "data_center": "Center"}

    # Device names as typed into the search box, keyed by the normalised
    # suffix of the ``device_group`` test-data value.
    _DEVICE_NAMES = {
        'group-xxg-tsgxsfcos': 'group-xxg-tsgxSfcOs',
        'group-xxg-7400': 'group-xxg-7400',
        'group-xxg-9140': 'group-xxg-9140',
        'group-xxg-tsgx': 'group-xxg-tsgx',
        'center-xxg-7400': 'center-xxg-7400',
        'center-xxg-9140': 'center-xxg-9140',
        'center-xxg-tsgx': 'center-xxg-tsgx',
        'center_02': 'Center 02',
        'city_a': 'City A',
        'group_华严3楼': 'GROUP 华严3楼',
        'group_华严4楼': 'GROUP 华严4楼',
        'center_华严3楼': 'Center 华严3楼',
        'center_华严4楼': 'Center 华严4楼',
    }

    # Health-check radio captions, keyed by the normalised health-check kind.
    _HEALTH_CHECK_LABELS = {
        'none': 'None',
        'in-band_bfd': 'In-band BFD',
        'bfd': 'BFD',
        'http': 'HTTP',
    }

    # Condition keys that are ignored when looking for negated entries.
    _SKIPPED_CONDITION_KEYS = ("ip_protocol", "sub_action_override")

    # Inclusive bounds used when a VLAN id is requested as "random".
    _RANDOM_VLAN_RANGE = (2048, 4000)

    def __init__(self, driver):
        """Store the Selenium driver and start with an empty ``sc_info``."""
        self.driver = driver
        self.sc_info = {}

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _log(message):
        """Print *message* prefixed with a timestamp and milliseconds."""
        # A single now() keeps both timestamp fields consistent, and the
        # millisecond field is zero-padded (5000 microseconds -> "005").
        now = datetime.now()
        print(now.strftime("%Y-%m-%d %H:%M:%S"),
              f"{now.microsecond // 1000:03d}",
              message,
              flush=True)

    def _find(self, xpath):
        """Locate one element by XPath via the driver."""
        return self.driver.find_element(By.XPATH, xpath)

    def _has_negated_condition(self, conditions):
        """Return True if any non-skipped condition entry has ``negate == True``."""
        for key, entries in conditions.items():
            if key in self._SKIPPED_CONDITION_KEYS:
                continue
            for entry in entries:
                # Equality, not truthiness, on purpose: this keeps the original
                # matching rule (True and 1 count, a string such as "true" does not).
                if entry.get("negate") == True:  # noqa: E712
                    return True
        return False

    def _resolve_vlan(self, raw_value):
        """Return the text to type for a VLAN field; "random" draws a random id."""
        if raw_value == "random":
            low, high = self._RANDOM_VLAN_RANGE
            return str(random.randint(low, high))
        return raw_value

    # ------------------------------------------------------------------
    # SFF profile
    # ------------------------------------------------------------------

    def create_sff_profile(self, test_data, sff_profile_element_position, sf_profile_element_position):
        """Create the SFF profile described by *test_data*.

        :param test_data: dict read for ``"targeted_traffic"``, the optional
            ``"condition"`` dict and the ``"profile"`` list.
        :param sff_profile_element_position: dict with ``"page_jump"`` and
            ``"create"`` locator dicts for the SFF pages.
        :param sf_profile_element_position: locator dict forwarded to
            ``get_sf_info`` for the service-function pages.
        :return: ``(200, self.sc_info)`` once the first profile has been
            submitted; ``None`` if ``test_data["profile"]`` is empty.
        :raises Exception: any failure during the UI steps is logged and re-raised.
        """
        self.sc_info["targeted_traffic"] = test_data["targeted_traffic"]

        # "condition" (and its "application" list) may be absent; treat that as empty
        # instead of failing with KeyError before the form is even opened.
        conditions = test_data.get("condition") or {}
        applications = conditions.get("application") or []
        self.sc_info["application"] = [applications[0]["name"]] if applications else []

        self.sc_info["is_negate"] = 1 if self._has_negated_condition(conditions) else 0

        for sff_profile_data in test_data["profile"]:
            page_jump_element_position = sff_profile_element_position["page_jump"]
            create_element_position = sff_profile_element_position["create"]
            try:
                # Create the referenced service functions first.  A profile
                # without "service_functions" simply gets no SF attached.
                sf_name_list = []
                if "service_functions" in sff_profile_data:
                    sf_name_list = self.get_sf_info(
                        sff_profile_data["service_functions"],
                        sf_profile_element_position,
                    )

                # Navigate to the SFF profile list page.
                PageJump(self.driver).jump_sub_profile_page(page_jump_element_position)
                time.sleep(1)

                # Open the creation form.
                self._find(create_element_position["profileListPage_createButton_posXpath"]).click()
                time.sleep(1)

                # Type the profile name.  The key spelling "profilePge" (sic) is
                # what this code has always looked up, so it is kept.
                self._find(create_element_position["profilePge_inputName_posXpath"]).send_keys(
                    sff_profile_data["name"]
                )

                # Fill in everything except the name.
                self.operate_sff_page(sff_profile_data, sf_name_list, sff_profile_element_position)

                # Confirm creation.
                self._find(create_element_position["profilePage_okButton_posXpath"]).click()

                # NOTE(review): returning here means only the first entry of
                # test_data["profile"] is ever created.  Kept as-is because
                # callers may rely on it - confirm whether that is intended.
                return 200, self.sc_info
            except Exception as e:
                self._log(f"Exception: {e}")
                raise

    def operate_sff_page(self, data, sf_name_list, sff_profile_element_position):
        """Fill in every SFF form option except the name.

        Sets the Type, Load Balance Method, Load Balance Localization and
        Failure Action radios, then attaches the given service functions.

        Records ``sc_info["type"]`` (1 for steering, 2 for mirroring) and
        ``sc_info["load_balance_localization"]``.

        :param data: SFF profile test data; reads ``"type"``,
            ``"load_balance_method"``, ``"load_balance_localization"`` and
            ``"failure_action"``.
        :param sf_name_list: names of the service functions to attach.
        :param sff_profile_element_position: locator dict; its ``"create"``
            entry is used.
        :raises KeyError: if a normalised value has no entry in the caption tables.
        """
        create_element_position = sff_profile_element_position["create"]

        # Type
        sff_type = self.format_from_data_lower_and_underline(data, key="type")
        if sff_type == "steering":
            self.sc_info["type"] = 1
        elif sff_type == "mirroring":
            self.sc_info["type"] = 2
        self.sff_operate_radio(create_element_position, key="type", value=sff_type,
                               label_dict=self._SFF_LABELS, radio_name_dict=self._SFF_RADIO_NAMES)

        # Load Balance Method
        load_balance_method = self.format_from_data_lower_and_underline(data, key="load_balance_method")
        self.sff_operate_radio(create_element_position, key="load_balance_method", value=load_balance_method,
                               label_dict=self._SFF_LABELS, radio_name_dict=self._SFF_RADIO_NAMES)
        time.sleep(1)

        # Load Balance Localization
        load_balance_localization = self.format_from_data_lower_and_underline(
            data, key="load_balance_localization")
        self.sc_info["load_balance_localization"] = load_balance_localization
        self.sff_operate_radio(create_element_position, key="load_balance_localization",
                               value=load_balance_localization,
                               label_dict=self._SFF_LABELS, radio_name_dict=self._SFF_RADIO_NAMES)
        time.sleep(1)

        self._select_failure_action(data, create_element_position)
        self._attach_service_functions(sf_name_list, create_element_position)

    def _select_failure_action(self, data, create_element_position):
        """Pick the Failure Action radio and, for re-dispatch, its sub-options."""
        # The value is colon-separated; the first field is the failure action itself.
        failure_action_list = self.format_from_data_lower_and_underline(
            data, key="failure_action").split(":")
        self.sff_operate_radio(create_element_position, key="failure_action", value=failure_action_list[0],
                               label_dict=self._SFF_LABELS, radio_name_dict=self._SFF_RADIO_NAMES)
        time.sleep(1)

        if "re-dispatch" not in failure_action_list:
            return

        # Second field: the Unavailability Action radio.
        ep_unavailability_action = create_element_position[
            "profilePage_operateUnavailabilityAction_Radio_posXpath"
        ].format(
            replaceLabel=self._SFF_LABELS["unavailability_action"],
            replaceRadioName=self._SFF_RADIO_NAMES[failure_action_list[1]],
        )
        time.sleep(1.5)
        self._find(ep_unavailability_action).click()

        # NOTE(review): the original file's indentation was lost; this branch is
        # assumed to belong under "re-dispatch", because at the outer level a plain
        # "bypass" failure action would always fail on int("bypass").
        if "bypass" in failure_action_list:
            # Last field: value for the "Minimum Health Service Functions" input.
            min_health_service_functions = int(failure_action_list[-1])
            self.operate_input(
                data_input=min_health_service_functions,
                ep_input=create_element_position["profilePage_inputMiniHealthServerFunctions_posXpath"],
            )

    def _attach_service_functions(self, sf_name_list, create_element_position):
        """Open the SF picker, search for and choose each name, then close it."""
        self._find(create_element_position["profilePage_addSF_posXpath"]).click()
        time.sleep(1)
        for name in sf_name_list:
            search_xpath = create_element_position["profilePage_searchSF_posXpath"]
            # The search box is located afresh for every action, as before, rather
            # than cached - presumably to survive re-rendering of the panel.
            self._find(search_xpath).click()
            self._find(search_xpath).send_keys(name)
            time.sleep(0.5)
            self._find(search_xpath).send_keys(Keys.ENTER)
            time.sleep(1)
            self._find(
                create_element_position["profilePage_chooseSF_posXpath"].format(replaceName=name)
            ).click()
            time.sleep(1)
        self._find(create_element_position["profilePage_closeSFProfile_posXpath"]).click()

    def sff_operate_radio(self, create_element_position, key="", value="", label_dict=None, radio_name_dict=None):
        """Click one radio button on the SFF form.

        Used for Type, Load Balance Method, Load Balance Localization and
        Failure Action.

        :param create_element_position: locator dict containing the
            ``"profilePage_operateRadio_posXpath"`` template.
        :param key: field name, looked up in *label_dict*.
        :param value: normalised option value, looked up in *radio_name_dict*.
        :param label_dict: field-name -> caption mapping; defaults to the
            built-in SFF table when omitted.
        :param radio_name_dict: value -> radio caption mapping; defaults to
            the built-in SFF table when omitted.
        :raises KeyError: if *key* or *value* is missing from its mapping.
        """
        if label_dict is None:
            label_dict = self._SFF_LABELS
        if radio_name_dict is None:
            radio_name_dict = self._SFF_RADIO_NAMES

        # Build the concrete locator from the template.
        ep_radio = create_element_position["profilePage_operateRadio_posXpath"].format(
            replaceLabel=label_dict[key],
            replaceRadioName=radio_name_dict[value],
        )
        self._find(ep_radio).click()

    def format_lower_and_underline(self, str_a: str):
        """Lower-case *str_a* and replace each whitespace run with one underscore.

        Example: ``"A b  C"`` becomes ``"a_b_c"``.
        """
        return re.sub(r"\s+", "_", str_a.lower())

    def format_from_data_lower_and_underline(self, data, key=""):
        """Return ``data[key]`` normalised by ``format_lower_and_underline``."""
        return self.format_lower_and_underline(data[key])

    # ------------------------------------------------------------------
    # Service function (SF) profiles
    # ------------------------------------------------------------------

    def get_sf_info(self, service_functions, sf_profile_element_position):
        """Create one SF profile per entry and return their names.

        :param service_functions: list of SF test-data dicts.
        :param sf_profile_element_position: locator dict for the SF pages.
        :return: list of the created SF names, in input order.
        :raises Exception: whatever ``create_sf_profile`` raises propagates unchanged.
        """
        sf_name_list = []
        for service_function in service_functions:
            sf_name = self.create_sf_profile(service_function, sf_profile_element_position)
            self._log("Create service function profile successfully.")
            sf_name_list.append(sf_name)
        return sf_name_list

    def create_sf_profile(self, data, profile_element_position):
        """Create a single SF profile through the UI and return its name.

        :param data: SF test data; ``"name"`` is typed into the form and the
            rest is consumed by ``operate_sf_page``.
        :param profile_element_position: dict with ``"page_jump"`` and
            ``"create"`` locator dicts for the SF pages.
        :return: the SF name taken from ``data["name"]``.
        """
        page_jump_element_position = profile_element_position["page_jump"]
        create_element_position = profile_element_position["create"]

        # Navigate to the SF profile list page.
        PageJump(self.driver).jump_sub_profile_page(page_jump_element_position)
        time.sleep(1)

        # Open the creation form and type the name.
        self._find(create_element_position["profileListPage_createButton_posXpath"]).click()
        sf_name = data["name"]
        self._find(create_element_position["profileListPage_inputName_posXpath"]).send_keys(sf_name)

        # Fill in the remaining options, then confirm.
        self.operate_sf_page(data, create_element_position)
        self._find(create_element_position["profilePage_okButton_posXpath"]).click()
        time.sleep(2)

        # Only the name is returned; the SF id is not looked up on the list page.
        return sf_name

    def operate_sf_page(self, data, create_element_position):
        """Fill in every SF form option except the name.

        Values are normalised (lower-case, underscores) before being split on
        ``":"``:

        * ``data["device_group"]`` - first field selects the device type,
          last field selects the device.
        * ``data["connectivity"]`` - if it contains ``"layer"``, fields 1 and
          2 are the internal and external VLAN (``"random"`` draws a random
          id); otherwise field 1 is the destination IP.
        * ``data["health_check"]`` - first field selects the radio; a value
          starting with ``"bfd:"`` also supplies interval and retries.
        * ``data["enable"]`` - ``"off"`` clicks the enable switch.

        Records ``device_tag``, ``sf_method``, ``sf_dest_ip`` (VXLAN only) and
        ``admin_status`` in ``self.sc_info``.

        :param data: SF test-data dict.
        :param create_element_position: locator dict for the SF creation form.
        :raises KeyError: if a normalised value has no entry in the caption tables.
        """
        self._select_device(data, create_element_position)
        self._select_connectivity(data, create_element_position)
        self._select_health_check(data, create_element_position)

        # NOTE(review): placed at method level on the assumption that the enable
        # switch is independent of the health-check kind (original indentation lost).
        if data["enable"] == "off":
            self._find(create_element_position["profilePage_switch_enableButton_posXpath"]).click()
            self.sc_info["admin_status"] = 0
        else:
            self.sc_info["admin_status"] = 1

    def _select_device(self, data, create_element_position):
        """Choose the device group / data center in the device side panel."""
        device_group_list = self.format_from_data_lower_and_underline(data, key="device_group").split(":")
        self.sc_info["device_tag"] = device_group_list[-1]

        device_type_label = self._DEVICE_TYPE_LABELS[device_group_list[0]]
        device_name = self._DEVICE_NAMES[device_group_list[-1]]

        # Open the side panel.
        self._find(create_element_position["profilePage_addDeviceTag_posXpath"]).click()
        time.sleep(0.5)

        # Pick the device type inside the panel.
        self._find(
            create_element_position["profilePage_selectDeviceType_posXpath"].format(deviceType=device_type_label)
        ).click()
        time.sleep(0.5)

        # Search for the device and select it.  The first call used to read
        # ``.click`` without calling it, so the search box was never clicked.
        search_xpath = create_element_position["profilePage_searchDevice_inputName_posXpath"]
        self._find(search_xpath).click()
        self._find(search_xpath).send_keys(device_name)
        self._find(search_xpath).send_keys(Keys.ENTER)
        time.sleep(1)
        self._find(create_element_position["profilePage_selectDevice_posXpath"]).click()

        # Close the side panel.
        self._find(create_element_position["profilePage_closeSidePage_posXpath"]).click()
        time.sleep(0.5)

    def _select_connectivity(self, data, create_element_position):
        """Choose Layer 2 Switch (with VLAN ids) or VXLAN-G (with destination IP)."""
        connectivity = self.format_from_data_lower_and_underline(data, key="connectivity")
        connectivity_list = connectivity.split(":")

        if "layer" in connectivity:
            self.sc_info["sf_method"] = "layer2_switch"
            self._find(create_element_position["profilePage_layer2Switch_posXpath"]).click()
            self._find(create_element_position["profilePage_intInternalVlan_posXpath"]).send_keys(
                self._resolve_vlan(connectivity_list[1]))
            self._find(create_element_position["profilePage_intExternalVlan_posXpath"]).send_keys(
                self._resolve_vlan(connectivity_list[2]))
        else:
            # Anything that is not a "layer" value is handled as VXLAN-G.
            self.sc_info["sf_method"] = "vxlan_g"
            self._find(create_element_position["profilePage_Vxlan_G_posXpath"]).click()
            sf_dest_ip = connectivity_list[1]
            self.sc_info["sf_dest_ip"] = sf_dest_ip
            self.operate_input(sf_dest_ip, create_element_position["profilePage_inputDestIP_posXpath"], True)

    def _select_health_check(self, data, create_element_position):
        """Choose the Health Check radio and, for ``bfd:``, interval and retries."""
        health_check = self.format_from_data_lower_and_underline(data, key="health_check")
        time.sleep(1)

        health_check_list = health_check.split(":")
        radio_label = self._HEALTH_CHECK_LABELS[health_check_list[0]]
        self._find(
            create_element_position["profilePage_operateHealthCheckRadio_posXpath"].format(type=radio_label)
        ).click()
        time.sleep(1)

        if health_check.startswith("bfd:"):
            data_interval = int(health_check_list[1])
            data_retries = int(health_check_list[2])
            self.operate_input(data_interval, create_element_position["profilePage_inputInterval_posXpath"], True)
            self.operate_input(data_retries, create_element_position["profilePage_inputRetries_posXpath"], True)

    def operate_input(self, data_input="", ep_input="", is_clear=True):
        """Type a value into an input box.

        :param data_input: value to type.
        :param ep_input: XPath of the input element.
        :param is_clear: clear the existing content first (default ``True``).
        """
        if is_clear:
            self._find(ep_input).clear()
        # Located again after clear(), exactly as the original did.
        self._find(ep_input).send_keys(data_input)