1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
|
# import os
import sys
import time
# sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from support.ui_utils.profiles.page_jump import PageJump
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from support.ui_utils.element_position.profile_element_position import *
import re
import random
import copy
from datetime import datetime
class CreateServiceFunctionForwarderProfile:
    """Create a Service Function Forwarder (SFF) profile through the web UI.

    The class drives a Selenium driver with XPath locators supplied by the
    caller and records facts about what it created in ``self.sc_info``.
    """

    # Condition keys that are skipped when scanning for negated items.
    _SKIPPED_CONDITION_KEYS = ("ip_protocol", "sub_action_override")

    def __init__(self, driver):
        """Keep the driver and start with an empty ``sc_info`` summary dict."""
        self.driver = driver
        self.sc_info = {}

    @staticmethod
    def _has_negated_condition(condition):
        """Return True when any scanned condition item has ``negate == True``."""
        for con_key, con_value in condition.items():
            if con_key in CreateServiceFunctionForwarderProfile._SKIPPED_CONDITION_KEYS:
                continue
            for item in con_value:
                # ``== True`` (not ``is True``) keeps the original comparison.
                if isinstance(item, dict) and item.get("negate") == True:  # noqa: E712
                    return True
        return False

    def create_sff_profile(self, test_data, sff_profile_element_position, sf_profile_element_position):
        """Create the SFF profile described by ``test_data`` and its service functions.

        :param test_data: dict providing "targeted_traffic" and "profile" (a list
            of SFF profile dicts); an optional "condition" dict is inspected for
            the first application name and for negated items.
        :param sff_profile_element_position: dict with the "page_jump" and
            "create" locator maps of the SFF profile page.
        :param sf_profile_element_position: dict with the "page_jump" and
            "create" locator maps of the service function profile page.
        :return: ``(200, self.sc_info)`` once a profile has been submitted;
            ``None`` when ``test_data["profile"]`` is empty.
        :raises Exception: any failure is printed with a timestamp and re-raised.
        """
        self.sc_info["targeted_traffic"] = test_data["targeted_traffic"]

        # "condition" (and its "application" list) may be absent; treat that as empty.
        condition = test_data.get("condition") or {}
        applications = condition.get("application") or []
        self.sc_info["application"] = [applications[0]["name"]] if applications else []
        self.sc_info["is_negate"] = 1 if self._has_negated_condition(condition) else 0

        for sff_profile_data in test_data["profile"]:
            page_jump_element_position = sff_profile_element_position["page_jump"]
            create_element_position = sff_profile_element_position["create"]
            try:
                # Create the service functions first, when the profile asks for any.
                sf_name_list = []
                if "service_functions" in sff_profile_data:
                    sf_name_list = self.get_sf_info(
                        sff_profile_data["service_functions"], sf_profile_element_position
                    )
                # Jump to the SFF profile list page.
                page_jump = PageJump(self.driver)
                page_jump.jump_sub_profile_page(page_jump_element_position)
                time.sleep(1)
                # Click "create".
                self.driver.find_element(
                    By.XPATH, create_element_position["profileListPage_createButton_posXpath"]
                ).click()
                time.sleep(1)
                # Type the profile name.
                sff_name = sff_profile_data["name"]
                self.driver.find_element(
                    By.XPATH, create_element_position["profilePge_inputName_posXpath"]
                ).send_keys(sff_name)
                # Fill in every option other than the name.
                self.operate_sff_page(sff_profile_data, sf_name_list, sff_profile_element_position)
                # Confirm creation.
                self.driver.find_element(
                    By.XPATH, create_element_position["profilePage_okButton_posXpath"]
                ).click()
                # NOTE(review): returning here means only the first entry of
                # test_data["profile"] is ever processed — confirm this is intended.
                return 200, self.sc_info
            except Exception as e:
                print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"Exception: {e}", flush=True)
                raise

    def operate_sff_page(self, data, sf_name_list, sff_profile_element_position):
        """Fill in all SFF form options except the name and attach service functions.

        :param data: SFF profile dict; reads "type", "load_balance_method",
            "load_balance_localization" and "failure_action".
        :param sf_name_list: names of the service functions to attach.
        :param sff_profile_element_position: dict whose "create" entry holds the
            XPath templates of the SFF create page.
        :return: None
        """
        create_element_position = sff_profile_element_position["create"]
        # Lookup tables used to rebuild the visible label / radio text for the locators.
        label_dict = {'type': 'Type', 'load_balance_method': 'Load Balance Method', 'load_balance_localization': 'Load Balance Localization',
                      'failure_action': 'Failure Action', "unavailability_action": "Unavailability Action"}
        radio_name_dict = {'steering': 'Steering', 'mirroring': 'Mirroring', 'hash_int-ip': 'Hash Int-IP', 'hash_ext-ip': 'Hash Ext-IP',
                           'hash_int-ip_and_ext-ip': 'Hash Int-IP and Ext-IP', 'hash_innermost_int-ip': 'Hash Innermost Int-IP',
                           'nearby': 'Nearby', 'global': 'Global', 'bypass': 'Bypass', 're-dispatch': 'Re-dispatch', 'block': 'Block'}

        # Type
        sff_type = self.format_from_data_lower_and_underline(data, key="type")
        type_codes = {"steering": 1, "mirroring": 2}
        if sff_type in type_codes:
            self.sc_info["type"] = type_codes[sff_type]
        self.sff_operate_radio(create_element_position, key="type", value=sff_type, label_dict=label_dict, radio_name_dict=radio_name_dict)

        # Load Balance Method
        load_balance_method = self.format_from_data_lower_and_underline(data, key="load_balance_method")
        self.sff_operate_radio(create_element_position, key="load_balance_method", value=load_balance_method, label_dict=label_dict, radio_name_dict=radio_name_dict)
        time.sleep(1)

        # Load Balance Localization
        load_balance_localization = self.format_from_data_lower_and_underline(data, key="load_balance_localization")
        self.sc_info["load_balance_localization"] = load_balance_localization
        self.sff_operate_radio(create_element_position, key="load_balance_localization", value=load_balance_localization, label_dict=label_dict, radio_name_dict=radio_name_dict)
        time.sleep(1)

        # Failure Action: the value is colon-separated; the first part is the radio to click.
        failure_action_list = self.format_from_data_lower_and_underline(data, key="failure_action").split(":")
        failure_action = failure_action_list[0]
        self.sff_operate_radio(create_element_position, key="failure_action", value=failure_action, label_dict=label_dict, radio_name_dict=radio_name_dict)
        time.sleep(1)
        if "re-dispatch" in failure_action_list:
            # Unavailability Action: the second part names the radio to click.
            ep_unavailability_action = create_element_position["profilePage_operateUnavailabilityAction_Radio_posXpath"].format(
                replaceLabel=label_dict["unavailability_action"],
                replaceRadioName=radio_name_dict[failure_action_list[1]],
            )
            time.sleep(1.5)
            self.driver.find_element(By.XPATH, ep_unavailability_action).click()
            # NOTE(review): assumed to apply only under Re-dispatch (a bare
            # "bypass" failure action carries no number to parse) — confirm.
            if "bypass" in failure_action_list:
                # Minimum Health Service Functions: the last part is the number to type.
                data_min_health_service_functions = int(failure_action_list[-1])
                self.operate_input(
                    data_input=data_min_health_service_functions,
                    ep_input=create_element_position["profilePage_inputMiniHealthServerFunctions_posXpath"],
                )

        # Attach the service functions.
        self.driver.find_element(By.XPATH, create_element_position["profilePage_addSF_posXpath"]).click()
        time.sleep(1)
        for name in sf_name_list:
            search_xpath = create_element_position["profilePage_searchSF_posXpath"]
            # The search box is looked up again before each interaction.
            self.driver.find_element(By.XPATH, search_xpath).click()
            self.driver.find_element(By.XPATH, search_xpath).send_keys(name)
            time.sleep(0.5)
            self.driver.find_element(By.XPATH, search_xpath).send_keys(Keys.ENTER)
            time.sleep(1)
            self.driver.find_element(By.XPATH, create_element_position["profilePage_chooseSF_posXpath"].format(replaceName=name)).click()
            time.sleep(1)
        self.driver.find_element(By.XPATH, create_element_position["profilePage_closeSFProfile_posXpath"]).click()

    def sff_operate_radio(self, create_element_position, key="", value="", label_dict=None, radio_name_dict=None):
        """Click one radio button (Type, Load Balance Method/Localization, Failure Action).

        :param create_element_position: locator map holding the
            "profilePage_operateRadio_posXpath" template.
        :param key: key into ``label_dict`` selecting the field label.
        :param value: key into ``radio_name_dict`` selecting the radio text.
        :param label_dict: dict mapping field keys to visible labels.
        :param radio_name_dict: dict mapping option keys to visible radio names.
        :return: None
        """
        # Substitute the label and radio text into the locator template.
        ep_radio = create_element_position["profilePage_operateRadio_posXpath"].format(
            replaceLabel=label_dict[key],
            replaceRadioName=radio_name_dict[value],
        )
        self.driver.find_element(By.XPATH, ep_radio).click()

    def format_lower_and_underline(self, str_a: str):
        """Lower-case ``str_a`` and replace each whitespace run with one underscore.

        Example: ``"A b C"`` becomes ``"a_b_c"``.
        """
        return re.sub(r"\s+", "_", str_a.lower())

    def format_from_data_lower_and_underline(self, data, key=""):
        """Return ``data[key]`` normalised by :meth:`format_lower_and_underline`."""
        return self.format_lower_and_underline(data[key])

    def get_sf_info(self, service_functions, sf_profile_element_position):
        """Create every service function in ``service_functions`` through the UI.

        :param service_functions: list of service function data dicts.
        :param sf_profile_element_position: locator maps of the service
            function profile page.
        :return: list with the name of each created service function, in order.
        :raises Exception: whatever :meth:`create_sf_profile` raises propagates.
        """
        sf_name_list = []
        for sf_data in service_functions:
            sf_name = self.create_sf_profile(sf_data, sf_profile_element_position)
            print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Create service function profile successfully.", flush=True)
            sf_name_list.append(sf_name)
        return sf_name_list

    def create_sf_profile(self, data, profile_element_position):
        """Create a single service function profile and return its name.

        :param data: service function dict; "name" is typed here, the remaining
            keys are consumed by :meth:`operate_sf_page`.
        :param profile_element_position: dict with the "page_jump" and "create"
            locator maps of the service function profile page.
        :return: the name that was typed into the form.
        """
        page_jump_element_position = profile_element_position["page_jump"]
        create_element_position = profile_element_position["create"]
        # Jump to the service function profile list page.
        page_jump = PageJump(self.driver)
        page_jump.jump_sub_profile_page(page_jump_element_position)
        time.sleep(1)
        # Click "create".
        self.driver.find_element(By.XPATH, create_element_position["profileListPage_createButton_posXpath"]).click()
        # Type the name on the opened page.
        sf_name = data["name"]
        self.driver.find_element(By.XPATH, create_element_position["profileListPage_inputName_posXpath"]).send_keys(sf_name)
        # Fill in the remaining options.
        self.operate_sf_page(data, create_element_position)
        # Click OK.
        self.driver.find_element(By.XPATH, create_element_position["profilePage_okButton_posXpath"]).click()
        time.sleep(2)
        return sf_name

    def operate_sf_page(self, data, create_element_position):
        """Fill in device group, connectivity, health check and enable switch.

        :param data: service function dict; reads "device_group",
            "connectivity", "health_check" (colon-separated strings) and "enable".
        :param create_element_position: locator map of the service function
            create page.
        :return: None
        """
        # Device Group: "<type>:<name>" after normalisation.
        device_group_list = self.format_from_data_lower_and_underline(data, key="device_group").split(":")
        self.sc_info["device_tag"] = device_group_list[-1]
        # Map the normalised values back to the text shown in the UI.
        effective_devices_group_dict = {"device_group": "Group", "data_center": "Center"}
        effective_devices_name_dict = {'group-xxg-tsgxsfcos': 'group-xxg-tsgxSfcOs', 'group-xxg-7400': 'group-xxg-7400', 'group-xxg-9140': 'group-xxg-9140', 'group-xxg-tsgx': 'group-xxg-tsgx',
                                       'center-xxg-7400': 'center-xxg-7400', 'center-xxg-9140': 'center-xxg-9140', 'center-xxg-tsgx': 'center-xxg-tsgx', 'center_02': 'Center 02', 'city_a': 'City A',
                                       'group_华严3楼':'GROUP 华严3楼', 'group_华严4楼':'GROUP 华严4楼', 'center_华严3楼':'Center 华严3楼', 'center_华严4楼':'Center 华严4楼'}
        replace_device_type = effective_devices_group_dict[device_group_list[0]]
        replace_device_name = effective_devices_name_dict[device_group_list[-1]]
        # Open the "add device" side panel.
        self.driver.find_element(By.XPATH, create_element_position["profilePage_addDeviceTag_posXpath"]).click()
        time.sleep(0.5)
        # Pick the device type inside the side panel.
        self.driver.find_element(By.XPATH, create_element_position["profilePage_selectDeviceType_posXpath"].format(deviceType=replace_device_type)).click()
        time.sleep(0.5)
        # Search for the device and select it. The click is now actually
        # invoked (the call parentheses were missing before).
        search_device_xpath = create_element_position["profilePage_searchDevice_inputName_posXpath"]
        self.driver.find_element(By.XPATH, search_device_xpath).click()
        self.driver.find_element(By.XPATH, search_device_xpath).send_keys(replace_device_name)
        self.driver.find_element(By.XPATH, search_device_xpath).send_keys(Keys.ENTER)
        time.sleep(1)
        self.driver.find_element(By.XPATH, create_element_position["profilePage_selectDevice_posXpath"]).click()
        # Close the side panel.
        self.driver.find_element(By.XPATH, create_element_position["profilePage_closeSidePage_posXpath"]).click()
        time.sleep(0.5)

        # Connectivity: colon-separated; any value containing "layer" selects Layer 2 Switch.
        connectivity = self.format_from_data_lower_and_underline(data, key="connectivity")
        connectivity_list = connectivity.split(":")
        if "layer" in connectivity:
            self.sc_info["sf_method"] = "layer2_switch"
            self.driver.find_element(By.XPATH, create_element_position["profilePage_layer2Switch_posXpath"]).click()
            # "random" asks for a generated VLAN id instead of a literal one.
            data_internal_vlan = connectivity_list[1]
            if data_internal_vlan == "random":
                data_internal_vlan = random.randint(2048, 4000)
            self.driver.find_element(By.XPATH, create_element_position["profilePage_intInternalVlan_posXpath"]).send_keys(data_internal_vlan)
            data_external_vlan = connectivity_list[2]
            if data_external_vlan == "random":
                data_external_vlan = random.randint(2048, 4000)
            self.driver.find_element(By.XPATH, create_element_position["profilePage_intExternalVlan_posXpath"]).send_keys(data_external_vlan)
        else:
            # Anything else is handled as VXLAN-G with a destination IP.
            self.sc_info["sf_method"] = "vxlan_g"
            self.driver.find_element(By.XPATH, create_element_position["profilePage_Vxlan_G_posXpath"]).click()
            sf_dest_ip = connectivity_list[1]
            self.sc_info["sf_dest_ip"] = sf_dest_ip
            self.operate_input(sf_dest_ip, create_element_position["profilePage_inputDestIP_posXpath"], True)

        # Health Check: the first colon-separated part selects the radio.
        health_check = self.format_from_data_lower_and_underline(data, key="health_check")
        time.sleep(1)
        health_check_radio_index_dict = {'none': 'None', 'in-band_bfd': 'In-band BFD', 'bfd': 'BFD', 'http': 'HTTP'}
        health_check_list = health_check.split(":")
        replace_value = health_check_radio_index_dict[health_check_list[0]]
        self.driver.find_element(By.XPATH, create_element_position["profilePage_operateHealthCheckRadio_posXpath"].format(type=replace_value)).click()
        time.sleep(1)
        if health_check.startswith("bfd:"):
            # "bfd:<interval>:<retries>"
            data_interval = int(health_check_list[1])
            data_retries = int(health_check_list[2])
            self.operate_input(data_interval, create_element_position["profilePage_inputInterval_posXpath"], True)
            self.operate_input(data_retries, create_element_position["profilePage_inputRetries_posXpath"], True)

        # Enable switch: it is clicked only when the data asks for "off".
        if data["enable"] == "off":
            self.driver.find_element(By.XPATH, create_element_position["profilePage_switch_enableButton_posXpath"]).click()
            self.sc_info["admin_status"] = 0
        else:
            self.sc_info["admin_status"] = 1

    def operate_input(self, data_input="", ep_input="", is_clear=True):
        """Type a value into an input box, optionally clearing it first.

        :param data_input: value to type.
        :param ep_input: XPath of the input element.
        :param is_clear: clear the existing content before typing (default True).
        :return: None
        """
        if is_clear:
            self.driver.find_element(By.XPATH, ep_input).clear()
        # The element is looked up again after the optional clear.
        self.driver.find_element(By.XPATH, ep_input).send_keys(data_input)
# def operate_drawer_box(self, ep_dropdown_input="", ep_dropdown_item=""):
# """
# 侧滑选择点击操作
# :param ep_dropdown_input: 下拉菜单输入框定位
# :param ep_dropdown_item: 下拉菜单列表选项定位
# :return:
# """
# #点击添加框
# self.driver.find_element(By.XPATH, ep_dropdown_input).click()
# #点击侧滑里列表中按钮
# self.driver.find_element(By.XPATH, ep_dropdown_item).click()
# self.driver.find_element(By.XPATH, serviceFunctions_button_effectiveDevicesDrawerOk_posXpath).click()
|