diff options
| author | 董晓燕 <[email protected]> | 2021-06-03 09:55:45 +0000 |
|---|---|---|
| committer | 董晓燕 <[email protected]> | 2021-06-03 09:55:45 +0000 |
| commit | ac68e65f508799a0e555a240ae374d313a0a8d75 (patch) | |
| tree | 2a339bbd8acd65e2fb235159cc9c5303ae5725b7 /keyword/common | |
| parent | 2f39b56d617e5fba2b8d73d81cd5e6d894f85352 (diff) | |
| parent | 4667c668725ff7cb673c637a297c67283876d4d4 (diff) | |
Develop
See merge request dongxiaoyan/gap_tsg_api!4
Diffstat (limited to 'keyword/common')
55 files changed, 5404 insertions, 0 deletions
diff --git a/keyword/common/api_request.robot b/keyword/common/api_request.robot new file mode 100644 index 0000000..3325e2a --- /dev/null +++ b/keyword/common/api_request.robot @@ -0,0 +1,52 @@ +*** Settings ***
+Library Collections
+Library json
+Library RequestsLibrary
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+PostRequest1
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ #${data1} json.dumps ${data}
+ ${remoteResponse} Post Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+GetRequest1
+ [Arguments] ${url}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Get Request api ${url} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+DeleteRequest1
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Delete Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+UpFilePostRequest
+ [Arguments] ${url} ${data} ${files} ${fileDesc}
+ ${header} Set To Dictionary ${fileDesc} Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Post Request api ${url} data=${data} files=${files} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+UpFilePutRequest
+ [Arguments] ${url} ${data} ${files} ${fileDesc}
+ ${header} Set To Dictionary ${fileDesc} Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Put Request api ${url} params=${data} files=${files} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
diff --git a/keyword/common/clear_data.robot b/keyword/common/clear_data.robot new file mode 100644 index 0000000..1c933fb --- /dev/null +++ b/keyword/common/clear_data.robot @@ -0,0 +1,68 @@ +*** Settings ***
+Library Collections
+Library RequestsLibrary
+Resource ../../variable/common_variable.txt
+#Resource functional_keywords.robot
+Resource ../policys/policy.robot
+Resource ../objects/object.robot
+Resource ../objects/application.robot
+Resource common.robot
+*** Variables ***
+${policyUrl} /policy/compile
+*** Keywords ***
+DeletePolicyAndObjectAndOther
+ [Documentation] delete :policy object category app signature profile ...
+ [Arguments] ${policyIds}=${EMPTY} ${objectIds}=${EMPTY} ${categoryIds}=${EMPTY} ${profiledId}=${EMPTY} ${appIds}=${EMPTY} ${signatureId}=${EMPTY}
+ #Run Keyword If "${policyIds}"=="${EMPTY}" log no policyIds to del
+ #... ELSE DeletePoliciebyList ${policyIds}
+ Run Keyword If "${createPolicyIds}"=="${EMPTY}" log no policyIds to del
+ ... ELSE DeletePoliciebyList ${createPolicyIds}
+
+ #Run Keyword If "${objectIds}"=="${EMPTY}" log no objectIds to del
+ #... ELSE DeleteObjectByIds ${objectIds}
+ Run Keyword If "${createObjectIds}"=="${EMPTY}" log no objectIds to del
+ ... ELSE DeleteObjectByIds ${createObjectIds}
+
+ Run Keyword If "${categoryIds}"=="${EMPTY}" log no categoryIds to del
+ ... ELSE DeleteCategoryByIds ${categoryIds}
+ #删除文件
+ Run Keyword If "${profiledId}"=="${EMPTY}" log no profiledId to del
+ ... ELSE DeleteProfileByIds ${url} ${profiledId}
+
+ Run Keyword If "${appids}"=="${EMPTY}" log no appids to del
+ ... ELSE DeleteApplicationByIds ${appids}
+
+ Run Keyword If "${signatureId}"=="${EMPTY}" log no signatureId to del
+ ... ELSE DeleteSignatureByIds ${signatureId}
+
+DeletePolicyAndGroupObject
+ [Documentation] delete :policy object
+ ... policyList
+ [Arguments] ${policyList}=${EMPTY} ${objectIds}=${EMPTY}
+ Run Keyword If "${policyList}"=="${EMPTY}" log no policyList to del
+ ... ELSE DeletePoliciebyList ${policyList}
+
+ Run Keyword If "${objectIds}"=="${EMPTY}" log no objectIds to del
+ ... ELSE DeleteObjectByIds ${objectIds}
+DeletePolicyAndObject
+ [Arguments] ${policyids} ${objectids} ${categoriesId}=null
+ ${objectId1} Create List [${objectids}]
+ ${categoryId1} Create List [${categoriesId}]
+ DeletePolicyAndGroupObject1 ${policyIds} ${objectId1} ${categoryId1}
+
+DeletePolicyAndGroupObject1
+ [Arguments] ${policyids} ${objectids} ${categoriesId}
+ #引用此关键字,${objectids}参数必须是双层列表,eg:['[1]','[2,3]','[4,5,6,7]']
+ #删除策略和对象
+ log toDeletePolicy_DeletePolicyAndObject
+ ${listlenth}= Get Length ${policyids}
+ Run Keyword If "${listlenth}"=="0" log no Policys to del
+ ... ELSE DeletePoliciebyList ${policyids}
+ #删除对象
+ Run Keyword If "${objectids}"=="${EMPTY}" log no Objects to del
+ ... ELSE IF ${objectids}==['[]'] log no Objects to del
+ ... ELSE DeleteGroupObjects ${objectids}
+ log ${categoriesId}
+ Run Keyword If "${categoriesId}"=="${EMPTY}" or ${categoriesId}==['[null]'] or ${categoriesId}==['[]'] log no Categories to del
+ ... ELSE DeleteGroupCategories ${categoriesId}
+
\ No newline at end of file diff --git a/keyword/common/command.robot b/keyword/common/command.robot new file mode 100644 index 0000000..fac6643 --- /dev/null +++ b/keyword/common/command.robot @@ -0,0 +1,103 @@ +*** Settings *** +Library OperatingSystem +Library Selenium2Library +Library RequestsLibrary +Library Collections +Library ../../customlib/common/common.py +Resource ../../variable/common_variable.txt + +*** Keywords *** +SystemCommands + [Documentation] 执行系统命令,在命令结果中匹配指定字符串列表,需匹配到所有字符串才为通过 + [Arguments] ${commandstr} ${stringlist} + log toSystemCommand_SystemCommandTest + ${commandreturn} OperatingSystem.Run ${commandstr} + #Create File ${path}/test.txt ${commandreturn} + #Append To File ${path}/write_file.txt ${commandstr} + #Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat + #Append To File ${path}/write_file.txt ${commandreturn} + ${listlenth}= Get Length ${stringlist} + FOR ${var} IN RANGE ${listlenth} + #log ${var} + Should Contain ${commandreturn} ${stringlist}[${var}] + END + ${rescode} Set Variable 200 + log ${rescode} + [Return] ${rescode} + +SystemCommandsRetry + [Documentation] 执行系统命令,失败后重试 + [Arguments] ${commandstr} ${stringlist} ${retryNum}=1 + log to_SystemCommandsRetry + ${trueCounter} Set Variable 0 + ${falseCounter} Set Variable 0 + FOR ${var} IN RANGE ${retryNum} + ${rescode} SystemCommandContains ${commandstr} ${stringlist} + ${trueCounter} ${falseCounter} Counter ${trueCounter} ${falseCounter} ${rescode} + END + [Return] ${trueCounter} +IsContain + [Documentation] 比较一个字符串是否包含另一个字符串,包含返回true,否则返回false + ... 多行内容,例如命令指向结果有问题。 + [Arguments] ${sourceStr} ${findContent} + ${rescode} Evaluate "abc" in "abcd" + ${rescode} Evaluate ${findContent} in ${sourceStr} + log ${rescode} + #https://blog.csdn.net/sun_977759/article/details/107815615 + #${file} Evaluate open(r'${filepath}) + # Evaluate ${file}.close() + #[Robot Framework] 校验字符串中是否包含某个子字符串,校验同时满足两个条件中任意一个 +#${tWarningMessage} Run Keyword If ${tIfExist} AutoItLibrary.Win Get Text Generate Fee Data warning message ELSE Set Variable "" + +#${tIfDuplicateFee} Evaluate "Duplicate Fee Warning" in """${tWarningMessage}""" + +#${rCheckResult} Evaluate ${totalNumberNew}==${totalNumberOld} or ${totalNumberNew}>${totalNumberOld} + + [Return] ${rescode} + +Counter + [Documentation] 给计数器赋值 + [Arguments] ${trueCounter} ${falseCounter} ${value} + ${trueCounter} Run Keyword If "${value}" == "True" Evaluate ${trueCounter}+1 + ... ELSE Set Variable ${trueCounter} + ${falseCounter} Run Keyword If "${value}" == "False" Evaluate ${falseCounter}+1 + ... ELSE Set Variable ${falseCounter} + [Return] ${trueCounter} ${falseCounter} + +SystemCommandContains + [Documentation] 执行系统命令,在命令结果中匹配指定字符串列表,匹配大于指定个数即为通过 + [Arguments] ${commandStr} ${stringList} ${passNum}=1 + log toSystemCommand_SystemCommandTest + ${commandreturn} OperatingSystem.Run ${commandStr} + ${listlenth}= Get Length ${stringList} + ${trueCounter} Set Variable 0 + ${falseCounter} Set Variable 0 + FOR ${var} IN RANGE ${listlenth} + #log ${var} + ${rescode} aisincludeb ${stringList}[${var}] ${commandreturn} + ${trueCounter} ${falseCounter} Counter ${trueCounter} ${falseCounter} ${rescode} + END + #${rescode} Evaluate ${totalNumberNew}==${totalNumberOld} or ${totalNumberNew}>${totalNumberOld} + ${rescode} Evaluate ${trueCounter}==${passNum} or ${trueCounter}>${passNum} + log ${rescode} + [Return] ${rescode} + + +SystemCommandReturnCompare + [Documentation] 执行命令并比对命令返回结果 需要执行的系统命令 命令返回结果要包含的字符串列表 命令返回结果不能包含的字符串列表 + [Arguments] ${commandStr} ${stringList} ${stringListNotIn} + log toSystemCommand_SystemCommandTest + ${commandreturn} OperatingSystem.Run ${commandStr} + ${listlenth}= Get Length ${stringList} + FOR ${var} IN RANGE ${listlenth} + log ${var} + Should Contain ${commandreturn} ${stringList}[${var}] + END + ${listnotin}= Get Length ${stringlistnotin} + FOR ${varn} IN RANGE ${listnotin} + log ${varn} + Should Not Contain ${commandreturn} ${stringListNotIn}[${varn}] + END + ${rescode} Set Variable 200 + log ${rescode} + [Return] ${rescode} diff --git a/keyword/common/common.robot b/keyword/common/common.robot new file mode 100644 index 0000000..4a585a6 --- /dev/null +++ b/keyword/common/common.robot @@ -0,0 +1,255 @@ +*** Settings ***
+Resource ../../variable/common_variable.txt
+Library REST http://${host}:${port}
+Library Collections
+Library RequestsLibrary
+Library json
+
+*** Keywords ***
+ManageApistr
+ [Documentation]
+ ... 描述:入口,apistr 为调用api地址
+ ... 本关键字的作用是判断地址前有没有加版本号,如果没有加上
+ ... policy/compile -> /v1/policy/compile
+ ... /policy/compile -> /v1/policy/compile
+ ... v1/policy/compile -> /v1/policy/compile
+ ... /v1/policy/compile -> /v1/policy/compile
+ [Arguments] ${apistr}
+ ${apiStart} Evaluate '${apistr}'[0:1]
+ ${apiStart1} Evaluate '${apistr}'[0:2]
+ ${apistr} Run Keyword If "${apiStart}"!="/" and "${apiStart}"!="v"
+ ... Set Variable /${version}/${apistr}
+ ... ELSE IF "${apiStart}"=="/" and "${apiStart1}"!="/v"
+ ... Set Variable /${version}${apistr}
+ ... ELSE IF "${apiStart}"=="v" Set Variable /${apistr}
+ ... ELSE IF "${apiStart1}"=="/v" Set Variable ${apistr}
+ ... ELSE Set Variable ${apistr}
+ log ${apistr}
+ [Return] ${apistr}
+BasePostRequest
+ [Arguments] ${apistr} ${body}
+ Set Headers {"Authorization":"${token}","Content-Type":"application/json"}
+ log ${apistr}
+ &{httpResponse} Post ${apistr} ${body}
+ #Output response body
+ Object response body
+ #Integer $.code 200
+ #Array $.data.policyList
+ ${response} Set Variable ${httpResponse.body}
+ [Return] ${response}
+
+BasePostRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BasePostRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+
+BasePostRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseGetRequest
+ [Arguments] ${apistr} ${body}
+ Set Headers {"Authorization":"${token}","Content-Type":"application/json"}
+ &{httpResponse} Get ${apistr}?${body}
+ #Output response body
+ Object response body
+ #Integer $.code 200
+ #Array $.data.policyList
+ ${response} Set Variable ${httpResponse.body}
+ [Return] ${response}
+BaseGetRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseGetRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+BaseGetRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Get Request api ${requestUri}?${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseDeleteRequest
+ [Arguments] ${requestUri} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port} ${headers}
+ ${response}= Delete Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+BaseDeleteRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseDeleteRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+
+BaseEditRequest
+ [Arguments] ${requestUri} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port} ${headers}
+ ${response}= Put Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+BaseEditRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseEditRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+BaseEditRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${apiStart} Evaluate '${requestUri}'[0:1]
+ ${requestUri} Run Keyword If "${apiStart}"=="/" set variable /${apiVersion}${requestUri}
+ ... ELSE set variable /${apiVersion}/${requestUri}
+ ${response} BaseEditRequest ${requestUri} ${data}
+ [Return] ${response}
+#拼接字典类型数据为get请求的字符串
+DictionaryToQueryParams
+ [Documentation]
+ ... 接收get参数字典,转换为请求字符串
+ [Arguments] ${params}
+ ${paramsString} = Set Variable ${EMPTY}
+ Run Keyword And Return If "${params}" == "${EMPTY}" Set Variable ${EMPTY}
+ FOR ${key} IN @{params}
+ ${value} = Get From Dictionary ${params} ${key}
+ ${paramStr} = Catenate SEPARATOR=\= ${key} ${value}
+ ${len} = Get Length ${paramsString}
+ ${paramsString} = Run Keyword If ${len} != 0 Catenate SEPARATOR=& ${paramsString} ${paramStr}
+ ... ELSE Set Variable ${paramStr}
+ END
+ Log To Console ${paramsString}
+ [Return] ${paramsString}
+BaseFormRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response} Run Keyword If "${data}"=="${EMPTY}" Get Request api ${requestUri}
+ ... ELSE Get Request api ${requestUri}?${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseFormRequest1
+ [Documentation]
+ ... 下载文件专用
+ ... 由于下载的json存在特殊字符
+ ... 隐藏需要设置response.encoding='utf-8-sig'
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response} Run Keyword If "${data}"=="${EMPTY}" Get Request api ${requestUri}
+ ... ELSE Get Request api ${requestUri}?${data}
+ log return data =${response}
+ ${response.encoding} set variable utf-8-sig
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} json.Loads ${response.content}
+ [Return] ${response}
+BaseMultipartPostRequest
+ [Arguments] ${requestUri} ${data} ${files} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}"}
+ ${dataString} DictionaryToQueryParams ${data}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri}?${dataString} files=${files}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BasePostRequestReturnBinary
+ [Documentation]
+ ... 下载文件专用
+ ... 返回二进制数据
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ Log Call BasePostRequestReturnBinary
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ Log ${data}
+ ${response} Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ [Return] ${response.content}
+BaseGetRequestReturnBinary
+ [Documentation]
+ ... 下载文件专用
+ ... 返回二进制数据
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ Log Call BasePostRequestReturnBinary
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ ${dataString} DictionaryToQueryParams ${data}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ Log ${data}
+ ${response} Get Request api ${requestUri}?${dataString}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ [Return] ${response.content}
+
+BaseFormEditRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Put Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseFormPostRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+OamRequest
+ [Arguments] ${requestUri} ${path} ${method} ${contentType} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"${contentType}","path":"${path}"}
+ create session api http://${host}:${port} ${headers}
+ ${response} Run Keyword If "${method}" == "GET" Get Request api ${requestUri}?${data}
+ ... ELSE IF "${method}" == "POST" Post Request api ${requestUri} data=${data}
+ ... ELSE IF "${method}" == "PUT" Put Request api ${requestUri} data=${data}
+ ... ELSE IF "${method}" == "DELETE" Delete Request api ${requestUri} data=${data}
+ ... ELSE Set Variable ${EMPTY}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+ResultGetRequestOK
+ [Arguments] ${apistr} ${body}
+ sleep 35
+ ${response} BaseGetRequest ${apistr} ${body}
+ log ${response['code']}
+ log ${response}[data][list][0][status]
+ Should Be Equal As Strings ${response['code']} 200
+ Should Be Equal As Strings ${response}[data][list][0][status] 2
+
+##################################################################################################
+AppendListToList
+ [Tags]
+ [Documentation] 把${addList}逐个添加到${souceList},然后返回 ${souceList}
+ ... 入参:${souceList} ${insertList}
+ ... 出:参${souceList}
+ [Arguments] ${souceList} ${insertList}
+ Comment 参数${objids}拼接后的list;参数${objectIds}对象ids
+ Comment 如果参数${objectIds}返回多个id则逐个添加
+ FOR ${objdict} IN @{insertList}
+ Append To List ${souceList} ${objdict}
+ END
+ [Return] ${souceList}
+
\ No newline at end of file diff --git a/keyword/common/common_interface.robot b/keyword/common/common_interface.robot new file mode 100644 index 0000000..7029c3a --- /dev/null +++ b/keyword/common/common_interface.robot @@ -0,0 +1,47 @@ +*** Settings ***
+Library Collections
+Library RequestsLibrary
+Resource ../../../variable/common_variable.txt
+#Resource functional_keywords.robot
+Resource ../policys/policy.robot
+Resource ../objects/object.robot
+Resource ../objects/application.robot
+Resource common.robot
+*** Variables ***
+${policyUrl} /policy/compile
+*** Keywords ***
+GetJsonFromModeAndData
+ [Documentation] 根据接口模板和对应测试数据返回请求接口所需数据
+ [Arguments] ${modleFilePath} ${dataFilePath} ${keyword} ${datalistname}
+ #yaml在线格式化:https://www.bejson.com/validators/yaml_editor/
+ ${yamlMode}= Get File ${modleFilePath}
+ ${loadedMode}= yaml.Safe Load ${yamlMode}
+ #${retkeys} evaluate [one for one in ${ip_secuirty_allow_dns_001}]
+ #${dictType} = Evaluate type(${retkeys})
+ ${yamlData}= Get File ${dataFilePath}
+ ${loadedData}= yaml.Safe Load ${yamlData}
+ ${dataJson} Get From Dictionary ${loadedData} ${keyword}_data
+ ${dataKey} Get From Dictionary ${loadedData} keywords
+ ${dataKeyType} = Evaluate type(${dataKey})
+ FOR ${key} IN @{dataKey}
+ LOG passssssssss
+ ${data} Get From Dictionary ${dataJson}[0] ${key}
+ #Continue For Loop If Dictionary does not contain key ${key}
+ Continue For Loop If "${data}" == "Empty"
+ Remove From Dictionary ${loadedMode}[${keyword}_mode][${datalistname}][0] ${key}
+ Set To Dictionary ${loadedMode}[${keyword}_mode][${datalistname}][0] ${key} ${data}
+ END
+ ${modeJson} Get From Dictionary ${loadedMode} ${keyword}_mode
+ ${returnData} Get From Dictionary ${modeJson} returnData
+ ${returnJson} json.Dumps ${modeJson}
+
+ #Set To Dictionary ${LOADED}[patch_id_bw_data][pronghornResponseBody][responseBody][0][value][0] value=200
+ #${addItemList1} Create Dictionary isSession=endpoint ip=${testClentIP} port=0-65535 direction=0 protocol=0 isInitialize=0
+ #${addItemLists} Create list ${addItemList1}
+ #${objectDict} Create Dictionary objectType=ip objectSubType=endpoint isValid=${1} addItemList=${addItemLists}
+ #${rescode} ${objectId} AddObjects ${1} ${objectDict}
+ #${objectids} set Variable ${objectId}
+ [Return] ${returnJson}
+
+
+
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/Common.py b/keyword/common/customlibrary/Custometest/Common.py new file mode 100644 index 0000000..d009a7a --- /dev/null +++ b/keyword/common/customlibrary/Custometest/Common.py @@ -0,0 +1,51 @@ +import json +import random +import hashlib +import os + +#判断一个字符或字符串是否包含于另一个字符串:a是否再b中,是否则返回True,否则返回Falsle +def aisincludeb(a,b): + result = a in b + print(result) + return result + +#删除字符串当前前几个,或后几个:sourcestr源串,a[2:-2] 表示去掉前面两个和后面两个,如果光去掉后面的a[:-2] +def removeBeforOrAfter(sourcestr,a): + #a = "16541616584984" + #a = a[2:-2] + sourcestr = sourcestr[a] + return result + +#分离字符串 +def string2list(str,split): + return str.split(split) + +#用于生成一个指定范围内的整数 +def randomint(a,b): + return random.randint(a,b) + +#较小文件处理方法: +def get_md5_01(file_path): + md5 = None + if os.path.isfile(file_path): + f = open(file_path,'rb') + md5_obj = hashlib.md5() + md5_obj.update(f.read()) + hash_code = md5_obj.hexdigest() + f.close() + md5 = str(hash_code).lower() + return md5 + +#较大文件处理方法: +def get_md5_02(file_path): + f = open(file_path,'rb') + md5_obj = hashlib.md5() + while True: + d = f.read(8096) + if not d: + break + md5_obj.update(d) + hash_code = md5_obj.hexdigest() + f.close() + md5 = str(hash_code).lower() + return md5
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/JsonDiff.py b/keyword/common/customlibrary/Custometest/JsonDiff.py new file mode 100644 index 0000000..ef50112 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/JsonDiff.py @@ -0,0 +1,6 @@ +from json_compare import Jcompare + +def json_diff(a,b): + cp=Jcompare() + results = cp.compare(a,b) + return results
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/LogResponseVAL.py b/keyword/common/customlibrary/Custometest/LogResponseVAL.py new file mode 100644 index 0000000..dbf6474 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/LogResponseVAL.py @@ -0,0 +1,203 @@ +import re +import time + +import jsonpath +# 1.说明:本方法用于对日志接口返回数据中的字段和数据进行判断 +# 2.传入数据说明:responsedict - 接口返回数据的json数据 +# targetlist - 判断参数 ,可传入多个条件,在robotfromwork中条件以Tab分隔 。参数格式为: 字段(key)_判断条件_数据(value) ,一个条件内各参数以空格分隔 例:common_log_id = 238734003578214400 +# 判断条件包括:= 、 != 、> 、< 、>= 、<= 、in 、notin 、like、 notlike 、notEmpty 、 empty 。 +# (1)其中notin、notlike、notEmpty传入时中间无空格 +# (2)notEmpty、empty 不传数据(value) +# (3) in 、notin 多个字段以逗号分隔 例 : common_log_id notin 238734003578214400,238734003578214402,238734003578214403 + +def FieldValidation(responsedict, targetlist): + responselist = responsedict["data"]["list"] + strlist = [] + if responselist: + # 循环返回数据列表 + sum = 1 + for response in responselist: + # 循环目地列表 + for t in targetlist: + # 将目的根据空格分割成列表 (“key值”,“判断条件”,“value值”) + target = t.split(" ") + #获取json串中所有的key,返回列表 + responsekeys = getKeys(response) + # 判断目的条件的Key在数据中是否存在 + if target[0] in responsekeys: + if len(target) != 1: + #targetkey 判断的字段 + targetkey = target[0] + # 判断条件 + conditions = target[1] + # 返回数据中对应key的Value列表 + responsevaluelist = getjsonvalue(response,target[0]) + for responsevalue in responsevaluelist: + #判断value值是否为列表,转化为字符串 + if isinstance(responsevalue, list): + responsevalue=str(responsevalue) + if len(target) == 3: + targetvalue = target[2] + torf=is_valid_date(responsevalue) + if torf == True: + timeArray = time.strptime(responsevalue, "%Y-%m-%d %H:%M:%S") + timeStamp = str(int(time.mktime(timeArray))) + p = conditional(conditions, timeStamp, targetkey, sum, targetvalue) + if p != "": + strlist.append(p) + else: + p = conditional(conditions, responsevalue, targetkey, sum, targetvalue) + if p != "": + strlist.append(p) + elif len(target) == 2: + p = conditional(conditions, responsevalue, targetkey, sum) + if p != "": + strlist.append(p) + else: + str2 = "返回数据第" + str(sum) + "组数据中不存在该字段:" + target[0] + print(str2) + strlist.append(str2) + sum += 1 + else: + str3 = "返回数据中无数据" + strlist.append(str3) + Assertresults(strlist) + return strlist + +def getjsonvalue(json_data, key_name): + '''获取到json中任意key的值,结果为list格式''' + keyvalue = jsonpath.jsonpath(json_data, '$..{key_name}'.format(key_name=key_name)) + # key的值不为空字符串或者为empty(用例中空固定写为empty)返回对应值,否则返回empty + return keyvalue + +def getKeys(data): + # 获取json串中所有的key + keysAll_list = [] + def getkeys(data): # 遍历json所有key + if (type(data) == type({})): + keys = data.keys() + for key in keys: + value = data.get(key) + if (type(value) != type({}) and type(value) != type([])): + keysAll_list.append(key) + elif (type(value) == type({})): + keysAll_list.append(key) + getkeys(value) + elif (type(value) == type([])): + keysAll_list.append(key) + for para in value: + if (type(para) == type({}) or type(para) == type([])): + getkeys(para) + else: + keysAll_list.append(para) + getkeys(data) + return keysAll_list + +# 对传入的数据根据条件进行判断 +def conditional(conditions, value2, targetkey, sum, value=None): + str1 = "" + if conditions == "=": + if value != value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符" + + if conditions == "!=": + if value == value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + print(str1) + + if conditions == ">": + if int(value2) <= int(value): + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "<": + if int(value2) >= int(value): + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == ">=": + if int(value2) < int(value): + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "<=": + if int(value2) > int(value): + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "in": + value = value.split(",") + if value2 not in value: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "notin": + value = value.split(",") + if value2 in value: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "like": + left = value[0] + right = value[-1] + if left == "%" and right == "%": + value = value[1:len(value) - 1] + if value not in value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + elif left == "%" and right != "%": + v = len(value) + _value = value[1:] + _value2 = value2[-(v - 1):] + print(_value, _value2) + if _value != _value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + elif left != "%" and right == "%": + v = len(value) + _value = value[0:-1] + _value2 = value2[0:v - 1] + if _value != _value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "notlike": + left = value[0] + right = value[-1] + if left == "%" and right == "%": + value = value[1:len(value) - 1] + if value in value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + elif left == "%" and right != "%": + v = len(value) + _value = value[1:] + _value2 = value2[-(v - 1):] + if _value == _value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + elif left != "%" and right == "%": + v = len(value) + _value = value[0:-1] + _value2 = value2[0:v - 1] + if _value == _value2: + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "notEmpty": + if value2 == "": + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + + if conditions == "Empty": + if value2 != "": + str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。" + return str1 + +def Assertresults(resultslist): + print(resultslist) + for i in resultslist: + if i != "": + assert 1 == 2 + + +def is_valid_date(strdate): + '''判断是否是一个有效的日期字符串''' + a = re.findall(":", strdate) + b = re.findall("-", strdate) + if len(a) ==2 and len(b) == 2: + return True + else: + return False + diff --git a/keyword/common/customlibrary/Custometest/LogSchema.py b/keyword/common/customlibrary/Custometest/LogSchema.py new file mode 100644 index 0000000..d190ff1 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/LogSchema.py @@ -0,0 +1,513 @@ +# !/user/bin/python +# -*-coding:utf-8-*- +import requests +import random +import json +import LogResponseVAL +import time, datetime +# import allure + + + + +# 请求schema接口得到返回数据,用于其他接口 +def schema(schemauerl, token): + url = schemauerl + headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token} + response = requests.get(url=url, headers=headers) + return response.json() + + +# 根据schema接口返回数据,得出所有属性所支持的比较类型的列表 +# 1、根据[doc][allow_query]值为true列支持搜索; +# 2、如有[doc][constraints][operator_functions]值,操作优先; +# 3、如有[doc][data]值则对应属性取值为data所列code值; +# 4、int和long的范围不一致; +# 5、string要包含特殊字符 +# 6、给查询条件赋值,要给出边界和正常值 +# 7、IP(V4、V6)和URL要给出专门的方法生成 + +import ipaddress + +# 生成随机ipv4或ipv6 +MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 +MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 + + +def random_ipv4(): + return ipaddress.IPv4Address._string_from_ip_int( + random.randint(0, MAX_IPV4) + ) + + +def random_ipv6(): + return ipaddress.IPv6Address._string_from_ip_int( + random.randint(0, MAX_IPV6) + ) + + +from random import Random + + +# 生成 12 位随机 URL 地址 +def randrom_url(): + str = '' + str1 = '' + chars = 'abcdefghijklmnopqrstuvwxyz0123456789' + chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()' + length = len(chars) + length1 = len(chars1) + random = Random() + for x in range(random.randint(8, 16)): + str += chars[random.randint(0, length - 1)] + for pp in range(random.randint(8, 16)): + str1 += chars1[random.randint(0, length1 - 1)] + url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1 + print(url) + return url + + +def Filter1(schemauerl, token): + list = [] + json_str = schema(schemauerl, token) + print("schemauerl",json_str) + print(type(json_str)) + # 获取日志属性定义 + fields = json_str["data"]["fields"] + print("1111111111",fields) + # 获取不同属性支持的部不同操作 + operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"] + for i in fields: + number = random.randint(0, 2147483647) + maxnumber = 2147483647 + minnumber = -2147483648 + str = random.choice('abcdefghijklmnopqrstuvwxyz!@#%^&*') + name = i["name"] + doc = i["doc"] + # 获取无任何特殊说明列: + if doc == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "notEmpty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + else: + if i["doc"]["constraints"] == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "notEmpty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + + else: + if i["doc"]["constraints"]["operator_functions"] == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "notEmpty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + else: + type1 = i["type"] + operator1 = i["doc"]["constraints"]["operator_functions"] + operator2 = operator1.split(",") + data = i["doc"]["data"] + for d in data: + code = d["code"] + if type1 == "int" or type1 == "long": + for o in operator2: + str1 = name + " " + o + " " + code + list.append(str1) + else: + for o in operator2: + str1 = name + " " + o + " " + " '" + code + " '" + list.append(str1) + + + print("22222222222",list) + return list + + +# 根据Filter1方法中的的数据,写入log请求接口中,来验证log请求接口 +def logapiverify(schemauerl,logurl, token, starttime, endtime,logtype): + filter2 = Filter1(schemauerl, token) + a = schema(schemauerl, token) + fields = a["data"]["fields"] + print("333333333333",filter2) + for i in filter2: + print("条件:", i) + url = logurl # "http://192.168.44.72:8080/v1/log/list" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": i + } + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + assert code == 200 + print(response1.json()["code"]) + return response1.json() + # print("111111111111111111111111111111111111111111111111111111111111111111111111111111111111111") + # print(str2) + # str3 = str2[0:-4] + # print(str3) + # url = logurl # "http://192.168.44.72:8080/v1/log/list" + # headers = {"Content-Type": "application/json", + # "Authorization": token} + # data = { + # "start_common_recv_time": starttime, + # "end_common_recv_time": endtime, + # "logType": logtype, + # "fields": fields, + # "filter": str3 + # } + # print(data) + # print(json.dumps(data)) + # response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + # code = response1.json()["code"] + # print(response1.json()) + # assert code == 200 + # print(response1.json()["code"]) + + +# 精确filter,请求日志接口 +def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue): + a = schema(schemauerl, token) + fields = a["data"]["fields"] + url = logurl # "http://192.168.44.72:8080/v1/log/list" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": filtervalue + } + # print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + assert code == 200 + print(response1.json()["code"]) + return response1.json() + +#目的性验证,循坏返回列表中所有字段进行查询 +def loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, datajson): + nullkey = [] + data = datajson + keylist = LogResponseVAL.getKeys(data) + a = schema(schemaurl, token) + fields = a["data"]["fields"] + for i in keylist: + conditions = data[i] + for field in fields: + name = field["name"] + if field["doc"] == None or field["doc"]["visibility"] == None: + if i == name: + if conditions != None and conditions != "": + if field["type"] == "string": + if conditions[0] == "'" and conditions[-1] == "'": + filtervalue = i + " = " + conditions + VasserValue=i + " = " + conditions[1:-1] + + else: + filtervalue = i + " = " + "'" + conditions + "'" + VasserValue= i + " = " + conditions + else: + if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time": + timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S") + timeStamp = str(int(time.mktime(timeArray))) + filtervalue = i + " = " + timeStamp + VasserValue = filtervalue + + else: + filtervalue = i + " = " + str(conditions) + VasserValue = filtervalue + print("filtervalue",filtervalue) + #根据提取条件进行查询日志列表 + responsebody = loglistverify(logurl, schemaurl, token, starttime, endtime, logtype, + filtervalue) + filterlist=[VasserValue] + print(VasserValue) + LogResponseVAL.FieldValidation(responsebody,filterlist) + + else: + nullkey.append(i) #所有为None或者“”的字段 + return nullkey + + # 多条循环 变量设置为公共参数 若循环内一个字段没有值 进行下次循坏 +def logAllFieldsListInterface(logurl, schemaurl, token, starttime, endtime, logtype, datajson,lognumber,logcycles): + datalist = datajson["data"]["list"] + keylist=[] + number=0 + print(lognumber) + print(type(lognumber)) + print(logcycles) + print(type(logcycles)) + for i in range(0, len(datalist), int(lognumber)):# 循环取出count个列表元素 + number+=1 + nullkeylist=[] + ret=datalist[i:i + int(lognumber)] + for data in ret: + nullkey=loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, data) + nullkeylist.append(nullkey) + print(nullkeylist) + for j in nullkeylist: + #对返回的为空的key进行取交集 + if len(keylist) == 0: + keylist=j + else: + #取两个列表的交集 + keylist=list(set(keylist).intersection(set(j))) + if len(keylist) == 0 or number >= int(logcycles): + break + print("最终数据中没有值的字段为:",keylist) + + +# 事件日志和通联日志时间分布查询 ,日志检索条件校验(filter内容验证) +def distributed_query(logurl, token): + url = logurl # url示例:http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M + headers = {"Content-Type": "application/json", "Authorization": token} + response = requests.get(url=url, headers=headers) + code = response.json()["code"] + print(response.json()) + assert code == 200 + print(response.json()["code"]) + return response.json() + +#日志检索条件校验 纯接口 +def LogRetrieve(schemaurl,host,port,token,logType,datajson): + number = random.randint(0, 2147483647) + str1 = random.choice('abcdefghijklmnopqrstuvwxyz') + data=datajson["data"]["list"][0] + keylist = LogResponseVAL.getKeys(data) + a = schema(schemaurl, token) + fields=a["data"]["fields"] + for i in keylist: + conditions = data[i] + for field in fields: + name = field["name"] + if i == name: + if field["type"] == "string": + filter = "logType=" + logType + "&" + "filter=" + i + "=" + "'" + str1 + "'" + else: + if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time": + timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S") + timeStamp = str(int(time.mktime(timeArray))) + filter = "logType=" + logType + "&" + "filter=" + i + "=" + timeStamp + else: + filter = "logType=" + logType + "&" + "filter=" + i + "=" + str(number) + Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + filter + print(Logurl) + responsebody = distributed_query(Logurl, token) + +# 日志检索条件校验 复杂sql +def LogRetrieveSql(schemaurl,host,port,token,logType,datajson): + data = datajson["data"]["list"][0] + keylist = LogResponseVAL.getKeys(data) + sqllist=random.sample(keylist, 4) + number = 45585 + str1 = random.choice('abcdefghijklmnopqrstuvwxyz') + print(sqllist) + a = schema(schemaurl, token) + filterlist=[] + fields=a["data"]["fields"] + for i in sqllist: + conditions = data[i] + for field in fields: + name = field["name"] + if i == name: + if field["type"] == "string": + if conditions == "" or conditions == None: + conditions=str1 + filter = i + "=" + "'" + conditions + "'" + else: + if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time": + timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S") + timeStamp = str(int(time.mktime(timeArray))) + filter =i + "=" + timeStamp + else: + if conditions == "" or conditions == None: + conditions = number + filter = i + "=" + str(conditions) + print(filter) + filterlist.append(filter) + sqlfilter = "(("+filterlist[0]+" OR "+filterlist[1]+") AND "+filterlist[2]+") OR "+filterlist[3] + _filter = "logType=" + logType + "&" + "filter=" + sqlfilter + Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + _filter + print(Logurl) + responsebody = distributed_query(Logurl, token) + print(sqlfilter) + + # 原始日志检索时间分布计算 +def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue): + url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "startTime": starttime, + "endTime": endtime, + "logType": logtype, + "granularity": granularity, + "filter": filtervalue + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + print(response1.json()["code"]) + assert code == 200 + return response1.json() + +# 日志总数查询 +def countlog_query(logurl, token, starttime, endtime, logtype): + url = logurl + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "pageSize": 20, + "logType": logtype, + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "filter": "" + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + print(response1.json()["code"]) + assert code == 200 + return response1.json() + +# 日志导出接口 +def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue): + a = schema(schemauerl, token) + fields = a["data"]["fields"] + print(fields) + url = logurl + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": filtervalue + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + a=type(response1) + if a != "class 'requests.models.Response'": + assert 1 == 1 + else: + assert 1 == 2 + +#判断日志内详情字段 +def LogFieldValidation(schemauerl,token,datajson): + Schemajson = schema(schemauerl, token) + fields=Schemajson["data"]["fields"] + keylist= LogResponseVAL.getKeys(datajson["data"]["list"][0]) + schema_typedict=Schemajson["data"]["doc"]["schema_type"] + schema_typelistkey=schema_typedict.keys() + for schema_typekey in schema_typelistkey: #取出schema_type内的每一个key + for i in schema_typedict[schema_typekey]["columns"]: + for filter in fields: + if filter["name"] == i: + if filter["doc"] == None: + if i not in keylist: + print("该字段未存在日志详情内",i) + assert 1==2 + else: + print("该字段通过在日志详情内",i) + else: + if filter["doc"]["visibility"] != "disabled": + if i not in keylist: + print("该字段未存在日志详情内",i) + assert 1==2 + else: + print("该字段通过在日志详情内",i) + + + + + + + +# if __name__ == '__main__': +# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/MD5.py b/keyword/common/customlibrary/Custometest/MD5.py new file mode 100644 index 0000000..697ae83 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/MD5.py @@ -0,0 +1,40 @@ +# 由于MD5模块在python3中被移除 +# 在python3中使用hashlib模块进行md5操作 + +import hashlib + +class MD5: + def MD5(data,langer,md5_types): + # 创建md5对象 + # m = hashlib.md5() + # Tips + # 此处必须encode + # 若写法为m.update(str) 报错为: Unicode-objects must be encoded before hashing + # 因为python3里默认的str是unicode + # 或者 b = bytes(str, encoding='utf-8'),作用相同,都是encode为bytes + # b = str.encode(encoding='utf-8') + # m.update(b) + # str_md5 = m.hexdigest() + if langer == "英文": + # str_md5 = hashlib.md5(b'this is a md5 test.').hexdigest() + str_md5 = hashlib.md5("b'"+data+"'").hexdigest() + print('MD5加密前为 :' + data) + print('MD5加密后为 :' + str_md5) + return str_md5 + elif langer == "中文": + str_md5 = hashlib.md5('你好'.encode(encoding=md5_types)).hexdigest() + return str_md5 + # utf8 和gbk 加密结构不一样 + # hashlib.md5('你好'.encode(encoding='GBK')).hexdigest() + # hashlib.md5('你好'.encode(encoding='GB2312')).hexdigest() + # hashlib.md5('你好'.encode(encoding='GB18030')).hexdigest() +if __name__ == '__main__': + data = '小猪' + langer = '中文' + md5_types = 'GBK' + a =MD5(data,langer,md5_types) + print(a) + b=r'C:\Users\小猪\AppData\Local\Programs\Python\Python37\Lib\site-packages\custometest\MD5.py' + with open(b, encoding='utf-8') as f: + text = f.read() + print(text)
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/ReportSchema.py b/keyword/common/customlibrary/Custometest/ReportSchema.py new file mode 100644 index 0000000..ac2d947 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/ReportSchema.py @@ -0,0 +1,718 @@ +import requests +import random +import json +import time +import ipaddress +from builtins import list + +# Report纯接口测试正向用例方法,不验证数据统计准确性,单纯验证接口 + +#生成随机ipv4或ipv6 +MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 +MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 +def random_ipv4(): + return ipaddress.IPv4Address._string_from_ip_int( + random.randint(0, MAX_IPV4)) +def random_ipv6(): + return ipaddress.IPv6Address._string_from_ip_int( + random.randint(0, MAX_IPV6)) + +#随机生成邮箱地址 +def RandomEmail( emailType=None, rang=None): + __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"] + # 如果没有指定邮箱类型,默认在 __emailtype中随机一个 + if emailType == None: + __randomEmail = random.choice(__emailtype) + else: + __randomEmail = emailType + # 如果没有指定邮箱长度,默认在4-10之间随机 + if rang == None: + __rang = random.randint(4, 10) + else: + __rang = int(rang) + __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ" + __randomNumber = "".join(random.choice(__Number) for i in range(__rang)) + _email = __randomNumber + __randomEmail + return _email + +#获取Schema +def schema(schemauerl,token,logtype): + url ="http://192.168.44.72:8080/v1/log/schema?logType="+logtype + headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token} + response = requests.get(url=url,headers=headers) + return response.json() + +#获取json串中groupColumnList的值 +def groupby(schemajson,logtype,testpoint,field): + dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"] + dimensions.append("common_recv_time"); + randomstr_1=[] + if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log": + dimensions.remove("common_start_time") + dimensions.remove("common_end_time") + if testpoint == "DataBindings": + randomstr_1.append("common_recv_time") + elif testpoint == "GroupBy": + randomstr_1.append(field) + else: + randomstr_1=random.sample(dimensions, 4) + + #定义grp为返回值group的列表 + grp=[] + for i in randomstr_1: + a={"name":i} + grp.append(a) + + re=[grp,randomstr_1] + print("groupby",re) + return re + +#获取json串中queryColumnList的值 +def DataBindings(schemajson,randomstr_1,testpoint,field): + #生成queryColumnList列表 + metrics=schemajson["data"]["doc"]["schema_query"]["metrics"] + metrics.append("common_log_id") + #在列表里随机元素 + randomstr_2=[] + if testpoint == "DataBindings": + randomstr_2.append(field) + else: + randomstr_2=random.sample(metrics,6) + #在聚合列表中去掉groupby中的重复的元素 + randomstr_3=array_diff(randomstr_2,randomstr_1) + #将groupby中元素添加到串中 + qul=[] + for i in randomstr_1: + a={"name":i} + qul.append(a) + + fields = schemajson["data"]["fields"] + list_1=["sum","min","max","avg","count"] + list_2=["count","count_distinct"] + if testpoint == "DataBindings": + for i in randomstr_3: + for j in fields: + if i == j["name"] : + jtype=j["type"] + label=i + sun=1 + if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": + for Aggregate in list_1: + randomstr_4={"name":i,"expression":Aggregate,"label":label} + qul.append(randomstr_4) + label=label+str(sun) + sun+=1 + elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string": + for Aggregate in list_2: + randomstr_4={"name":i,"expression":Aggregate,"label":label} + qul.append(randomstr_4) + label = label + str(sun) + sun += 1 + + else: + for i in randomstr_3: + for j in fields: + if i == j["name"]: + jtype = j["type"] + if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": + radomlist = random.sample(list_1, 1) + randomstr_4 = {"name": i, "expression": radomlist[0]} + qul.append(randomstr_4) + elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string": + randomlist = random.sample(list_2, 1) + randomstr_4 = {"name": i, "expression": randomlist[0]} + qul.append(randomstr_4) + print("DataBindings",qul) + return qul + +# #去除a列表中存在的b的元素 +def array_diff(a, b): + #定义空列表 + c=[] + #range(len(a))取的为列表a的索引,根据a的 + for i in range(len(a)): + #取出索引对应的值 + t=a[i] + #判断值是否存在在序列b中 + if t not in b: + #如果序列不在b中,则写入序列c + c.append(t) + #返回序列c,c就是列表a去除列表b之后的元素 + return c + +def filterCondition(schemajson,testpoint,field): + number = random.randint(0,100000) + randomstr= random.choice('abcdefghijklmnopqrstuvwxyz') + schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"] + list1=[] + if testpoint=="Filter": + list1.append(field) + else: + list1=random.sample(schemafilters, 4) + #获取不同属性支持的部不同操作 + fields = schemajson["data"]["fields"] + operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"] + andConditions=[] + for i in list1: + #遍历fields列表 + for k in fields: + #当filters列表值等于fields的name时 + if i == k["name"]: + name = k["name"] + doc = k["doc"] + #获取无任何特殊说明列: + if doc == None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list=[] + Operator=["=","!=",">","<",">=","<="] + if testpoint=="Filter": + for op in Operator: + value=[str(number)] + Field={"name":name,"expression":op,"value":value,"type":type1} + orConditions_list.append(Field) + else: + randomOperator= random.sample(Operator, 1) + value=[str(number)] + Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list=[] + Operator=["=","!=","Like","Not Like","notEmpty","empty"] + if testpoint=="Filter": + for op in Operator: + value=[] + if op == "=" or op == "!=": + value.append(str(number)) + elif op == "Like" or op == "Not Like": + value.append(randomstr) + elif op=="notEmpty" or op == "empty": + value=[] + Field={"name":name,"expression":op,"value":value,"type":type1} + orConditions_list.append(Field) + else: + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"]== None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list=[] + Operator=["=","!=",">","<",">=","<="] + if testpoint == "Filter": + for op in Operator: + value=[str(number)] + Field={"name":name,"expression":op,"value":value,"type":type1} + orConditions_list.append(Field) + else: + randomOperator= random.sample(Operator, 1) + value=[str(number)] + Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list=[] + Operator=["=","!=","Like","Not Like","notEmpty","empty"] + if testpoint == "Filter": + for op in Operator: + randomOperator = op + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"]["operator_functions"]==None: + conrandomstraints=k["doc"]["constraints"] + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list = [] + Operator=["=","!=",">","<",">=","<="] + if testpoint == "Filter": + for op in Operator: + randomOperator = op + if conrandomstraints["type"] == "timestamp": + # 获取当前时间戳 + t = int(time.time()) + value = [str(t)] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + if conrandomstraints["type"] == "timestamp": + #获取当前时间戳 + t = int(time.time()) + value=[str(t)] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list = [] + Operator=["=","!=","Like","Not Like","notEmpty","empty"] + if testpoint == "Filter": + if conrandomstraints["type"] == "ip": + for op in Operator: + # 获取ip + ip = random_ipv4() + value = [] + if op == "=" or op == "!=": + value.append(ip) + elif op == "Like" or op == "Not Like": + value.append(ip) + elif op == "notEmpty": + value = [] + Field = {"name": name, "expression": op, "value": value,"type": type1} + orConditions_list.append(Field) + elif conrandomstraints["type"] == "email": + for op in Operator: + randomOperator = op + Operator = ["=", "!=", "Like", "Not Like", "notEmpty","empty"] + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + # 获取ip + emil = RandomEmail() + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(emil) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(emil) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + if conrandomstraints["type"] == "ip": + #获取ip + ip =random_ipv4() + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(ip) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(ip) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + elif conrandomstraints["type"] == "email": + Operator=["=","!=","Like","Not Like","notEmpty","empty"] + randomOperator_1= random.sample(Operator, 1) + randomOperator=randomOperator_1[0] + #获取ip + emil =RandomEmail() + value=[] + if randomOperator == "=" or randomOperator == "!=": + value.append(emil) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(emil) + elif randomOperator=="notEmpty": + value=[] + Field={"name":name,"expression":randomOperator,"value":value,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + else: + type1 = k["type"] + orConditions_list=[] + operator1 = k["doc"]["constraints"]["operator_functions"] + operator2 = operator1.split(",") + if testpoint == "Filter": + for op in operator2: + operatordata = k["doc"]["data"] + code = [] + for i in operatordata: + code_1 = i["code"] + code.append(code_1) + for co in code: + Field = {"name": name, "expression": op, "value": co, "type": type1} + orConditions_list.append(Field) + else: + operator3=random.sample(operator2,1) + operatordata=k["doc"]["data"] + code=[] + for i in operatordata: + code_1=i["code"] + code.append(code_1) + code2=random.sample(code, 1) + Field={"name":name,"expression":operator3[0],"value":code2,"type":type1} + orConditions_list.append(Field) + orConditions={"orConditions":orConditions_list} + andConditions.append(orConditions) + filterCondition={"andConditions":andConditions} + print("filterCondition",filterCondition) + return filterCondition + +#获取having条件的串 +def havingjson(schemajson,testpoint,field): + number = random.randint(0,100000) + schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"] + aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"] + schemametrics.append("common_log_id") + metricslist=[] + if testpoint == "Having": + metricslist.append(field) + else: + metricslist=random.sample(schemametrics, 4) + fields = schemajson["data"]["fields"] + operator=["=","!=",">","<",">=","<="] + Aggregate=["COUNT","AVG","SUM","MAX","MIN"] + andConditions_list=[] + #遍历的到的having条件列表 + for i in metricslist: + for j in fields: + if i == j["name"]: + name = j["name"] + type1=j["type"] + for v in aggregation: + if type1 == v["type"]: + orConditions_list=[] + if v["type"] != "string": + functionslist=Aggregate + else: + functionsstr=v["functions"] + functionslist = functionsstr.split(",") + if field == "common_log_id": + functionslist=["COUNT"] + if testpoint == "Having": + for functions_1 in functionslist: + for operator_1 in operator: + havingdict = {"name": name, "function": str.lower(functions_1), + "expression": operator_1, "value": str(number)} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + else: + functions_1=random.sample(functionslist, 1) + if functions_1=="COUNT_DISTINCT" and type1 != "string": + functions_1=random.sample(functionslist, 1) + operator_1=random.sample(operator, 1) + + havingdict={"name":name,"function":str.lower(functions_1[0]),"expression":operator_1[0],"value":str(number)} + orConditions_list.append(havingdict) + orConditions={"orConditions":orConditions_list} + andConditions_list.append(orConditions) + havingCondition={"andConditions":andConditions_list} + print("having",havingCondition) + return havingCondition + +#拼接字符串 +def datasetjson(schemauerl,token,testname,logtype,testpoint,field): + schema_new=schema(schemauerl,token,logtype) + group_re=groupby(schema_new,logtype,testpoint,field) + groupColumnList=group_re[0] + group_randomstr=group_re[1] + queryColumnList=DataBindings(schema_new,group_randomstr,testpoint,field) + filterCondition_1=filterCondition(schema_new,testpoint,field) + havingjson_1=havingjson(schema_new,testpoint,field) + datasetdict = { + "list": { + "name":testname, + "logType": logtype, + "groupColumnList":groupColumnList, + "queryColumnList":queryColumnList, + "filterCondition":filterCondition_1, + "havingCondition":havingjson_1 + } + } + print(datasetdict) + print("datasetjson",json.dumps(datasetdict)) + return json.dumps(datasetdict) + +#拼接char的json串 +def charjson(schemaurl,token,queryColumnList,groupColumnList,datasetid,testname,logtype): + print("queryColumnList",queryColumnList) + schema_new=schema(schemaurl,token,logtype) + fields = schema_new["data"]["fields"] + # 获取条件的label + namelist=[] + for i in queryColumnList: + for j in fields: + if i["name"] == j["name"]: + j_label=j["label"] + namelist.append(j_label) + print("namelist",namelist) + #获取聚合条件的label + groupColumnlaberList=[] + for i in groupColumnList: + for j in fields: + if i["name"] == j["name"]: + j_label=j["label"] + groupColumnlaberList.append(j_label) + print("groupColumnlaberList",groupColumnlaberList) + #图表类型列表 + chartType_1=["line","pie","bar","area","table"] + chartType_2=["pie","bar","table"] + chartType=[] +# #随机选择图表类型 + s=1 + for i in namelist: + if i == "Receive Time" or i == "Start Time" or i == "End Time": + s+=1 + if s != 1: + chartType=random.sample(chartType_1, 1) + else: + chartType=random.sample(chartType_2, 1) + chardict={} + print("chartType",chartType) + if chartType[0] == "line" or chartType[0] == "area": + dataBinding=[] + #将时间条件赋值给dataBinding + for j in namelist: + if j == "Receive Time" or j == "Start Time" or j == "End Time": + dataBinding.append(j) + timelin={ + "dataBinding": dataBinding[0], + "format": "Time" + } + print("timelin",timelin) + namelist.remove(dataBinding[0]) #从统计查询数据列对象内去掉时间条件 + groupColumnlaberList.remove(dataBinding[0]) #从聚合条件内去掉时间的条件 + for i in groupColumnlaberList: #从统计查询条件内去掉聚合条件内的值 + namelist.remove(i) + print("namelistrome",namelist) + linlist=[] + for i in namelist: + lindict={ + "dataBinding": i, + "type": "Line Up", + "format": "Default", + } + linlist.append(lindict) + listdict={ + "name": testname, + "datasetId": datasetid, + "datasetName": "", + "chartType": chartType[0], + "dataTop": 0, + "orderBy": "", + "orderDesc": 0, + "drilldownTop": 0, + "timeline": timelin, + "line": linlist + } + chardict={"list": listdict} + elif chartType[0] == "pie" or chartType[0] == "bar": + xAxisdataBinding=random.sample(groupColumnlaberList, 1) + xAxisdict={ + "dataBinding": xAxisdataBinding[0], + "dataTop": 5, + "dataType": "" + } + for i in groupColumnlaberList: + namelist.remove(i) + yAxisBinding=random.sample(namelist, 1) + yAxisdict={ + "dataBinding": yAxisBinding[0], + "format": "Default", + } + yAxislist=[yAxisdict] + listdict={ + "name": testname, + "datasetId": datasetid, + "datasetName": "", + "chartType": chartType[0], + "dataTop": 0, + "orderBy": "", + "orderDesc": "", + "xAxis": xAxisdict, + "yAxis": yAxislist + } + chardict={"list": listdict} + elif chartType[0] == "table": + columnslist=[] + for i in namelist: + dataBindings={ + "dataType": "", + "dataBinding": i, + "format": "Default", + } + dataBindingslist=[] + dataBindingslist.append(dataBindings) + columnsdict={ + "title": i, + "width": 0, + "dataBindings": dataBindingslist + } + columnslist.append(columnsdict) + + listdict={ + "name": testname, + "datasetId": datasetid, + "datasetName": "", + "chartType": "table", + "dataTop": 5, + "orderBy": "", + "orderDesc": "", + "drilldownTop": 5, + "tableType": "Regular", + "columns": columnslist + } + chardict={"list": listdict} + print("charjson",json.dumps(chardict)) + return json.dumps(chardict) + +def Reportsjson(chartId,testname): + charlist=[] + chardict={ + "chartId": chartId, + "timeGranulartiy": 1, + "timeUnit": "", +# "disabled": true + } + charlist.append(chardict) + reportJobList=[] + reportJobdct_1={ + "rangeType": "last", + "rangeInterval": 1, + "rangeUnit": "week", + "jobName": testname, + "scheduleId": "", + "chartList": charlist, + "isNotice": 0, + "noticeMethod": "", + "startTime": "", + "endTime": "", + "filterCondition": None, + "isDisplayTrafficTrend": 1 + } + reportJobdct_2={"reportJobList": reportJobdct_1} + print("reportjson",json.dumps(reportJobdct_2)) + return json.dumps(reportJobdct_2) + +def ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype,testpoint,field): + headers = {"Content-Type": "application/json","Authorization": token} + #dataset生成json串并发送请求 + _datasetjson=datasetjson(schemaurl,token,testname,logtype,testpoint,field) + response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers) + print("返回数据1",response1) + code = response1.json()["code"] + print("datasetcode:",code) + assert code == 200 + + +# 获取dataset的id + datasetget=requests.get(url=datasetgeturl,headers=headers) + dasetget=datasetget.json() + datesetid=dasetget["data"]["list"][0]["id"] + Deleteinterfaces(dataseturl,token,datesetid) + # _datasetjson=json.loads(_datasetjson) + # queryColumnList=_datasetjson["list"]["queryColumnList"] + # groupColumnList=_datasetjson["list"]["groupColumnList"] + #生成charlibrariesjson串 + # charlibrariesjson=charjson(schemaurl, token,queryColumnList,groupColumnList,datesetid,testname,logtype) + # response2 = requests.post(url=charurl, data=charlibrariesjson, headers=headers) + # code = response2.json()["code"] + # assert code == 200 +# +# #获取char libraries的id +# charget=requests.get(url=chargeturl,headers=headers) +# charget=charget.json() +# charid=charget["data"]["list"][0]["id"] +# +# #report生成json串并发送请求 +# reportjson=Reportsjson(charid,testname) +# response3 = requests.post(url=repporturl, data=reportjson, headers=headers) +# code = response3.json()["code"] +# assert code == 200 +# + +# #循环调用ReportInterfaceTest方法 +# def ReportTest(host,token,dataseturl,charurl,repporturl,logtypelist): +# for logtype in logtypelist: +# testname="Report"+logtype +# datasetgeturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser=" +# chargeturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser=" +# schemaurl="http://"+host+":8080/v1/log/schema?logType="+logtype +# ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype) + + +def Deleteinterfaces(url,token,id): + headers = {"Content-Type": "application/json","Authorization": token} + datedict={"ids":[id]} + datajson=json.dumps(datedict) + response1 = requests.delete(url=url, data=datajson, headers=headers) + + +def ReportPositiveTest(host,port,token,dataseturl,charurl,repporturl,logtypelist): + testpoint=["GroupBy","DataBindings","Filter","Having"] + for logtype in logtypelist: + schemaurl="http://"+host+":"+port+"/v1/log/schema?logType="+logtype + schema_new=schema(schemaurl,token,logtype) + metrics = schema_new["data"]["doc"]["schema_query"]["metrics"] + schemafilters = schema_new["data"]["doc"]["schema_query"]["filters"] + dimensions = schema_new["data"]["doc"]["schema_query"]["dimensions"] + dimensions.append("common_recv_time"); + metrics.append("common_log_id") + for j in testpoint: + if j == "GroupBy": + for filter in dimensions: + testname="Report"+logtype+j+filter + dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser=" + ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter) + if j == "DataBindings": + for filter in metrics: + testname="Report"+logtype+j+filter + dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser=" + ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter) + + if j == "Filter" : + for filter in schemafilters: + testname="Report"+logtype+j+filter + dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser=" + ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter) + + if j == "Having" : + for filter in metrics: + testname="Report"+logtype+j+filter + dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser=" + ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter) + + + + + diff --git a/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py b/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py new file mode 100644 index 0000000..dc466a9 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py @@ -0,0 +1,871 @@ +import requests +import random +import json +import time +import ipaddress + +# Report纯接口测试 反向用例方法,不验证数据统计准确性,单纯验证接口 + +# 生成随机ipv4或ipv6 +MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 +MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 + + +def random_ipv4(): + return ipaddress.IPv4Address._string_from_ip_int( + random.randint(0, MAX_IPV4)) + + +def random_ipv6(): + return ipaddress.IPv6Address._string_from_ip_int( + random.randint(0, MAX_IPV6)) + + +# 随机生成邮箱地址 +def RandomEmail(emailType=None, rang=None): + __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"] + # 如果没有指定邮箱类型,默认在 __emailtype中随机一个 + if emailType == None: + __randomEmail = random.choice(__emailtype) + else: + __randomEmail = emailType + # 如果没有指定邮箱长度,默认在4-10之间随机 + if rang == None: + __rang = random.randint(4, 10) + else: + __rang = int(rang) + __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ" + __randomNumber = "".join(random.choice(__Number) for i in range(__rang)) + _email = __randomNumber + __randomEmail + return _email + + +# 获取Schema +def schema(schemauerl, token, logtype): + url = "http://192.168.44.72:8080/v1/log/schema?logType=" + logtype + headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token} + response = requests.get(url=url, headers=headers) + return response.json() + + +# 获取json串中groupColumnList的值 +def groupby(schemajson, logtype, testpoint): + dimensions = schemajson["data"]["doc"]["schema_query"]["dimensions"] + dimensions.append("common_recv_time"); + if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log": + dimensions.remove("common_start_time") + dimensions.remove("common_end_time") + randomstr_1 = [] + if testpoint == "GroupBy": + randomstr_1.append("GroupBy_Negtive") + else: + randomstr_1 = random.sample(dimensions, 4) + + # 定义grp为返回值group的列表 + grp = [] + for i in randomstr_1: + a = {"name": i} + grp.append(a) + + re = [grp, randomstr_1] + print("groupby", re) + return re + + +# 获取json串中queryColumnList的值 +def DataBindings(schemajson, randomstr_1, testpoint, field): + # 生成queryColumnList列表 + print("field", field) + metrics = schemajson["data"]["doc"]["schema_query"]["metrics"] + metrics.append("common_log_id") + # 在列表里随机元素 + randomstr_2 = [] + if testpoint == "DataBindings_Field" or testpoint == "DataBindings_Aggregate": + randomstr_2.append(field) + randomstr_3 = randomstr_2 + else: + randomstr_2 = random.sample(metrics, 6) + # 在聚合列表中去掉groupby中的重复的元素 + randomstr_3 = array_diff(randomstr_2, randomstr_1) + # 将groupby中元素添加到串中 + qul = [] + for i in randomstr_1: + a = {"name": i} + qul.append(a) + + fields = schemajson["data"]["fields"] + if testpoint == "DataBindings_Aggregate": + list_1 = ["countdistinct"] + list_2 = ["summ"] + else: + list_1 = ["sum", "min", "max", "avg", "count"] + list_2 = ["count", "count_distinct"] + + if testpoint == "DataBindings_Field": + Aggregate = "sum" + randomstr_4 = {"name": randomstr_2[0], "expression": Aggregate} + qul.append(randomstr_4) + + elif testpoint == "DataBindings_Aggregate": + for i in randomstr_3: + for j in fields: + if i == j["name"]: + jtype = j["type"] + label = i + sun = 1 + if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": + for Aggregate in list_1: + randomstr_4 = {"name": i, "expression": Aggregate, "label": label} + qul.append(randomstr_4) + label = label + str(sun) + sun += 1 + elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string": + for Aggregate in list_2: + randomstr_4 = {"name": i, "expression": Aggregate, "label": label} + qul.append(randomstr_4) + label = label + str(sun) + sun += 1 + + else: + for i in randomstr_3: + for j in fields: + if i == j["name"]: + jtype = j["type"] + if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double": + radomlist = random.sample(list_1, 1) + randomstr_4 = {"name": i, "expression": radomlist[0]} + qul.append(randomstr_4) + elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string": + randomlist = random.sample(list_2, 1) + randomstr_4 = {"name": i, "expression": randomlist[0]} + qul.append(randomstr_4) + print("DataBindings", qul) + return qul + + +# #去除a列表中存在的b的元素 +def array_diff(a, b): + # 定义空列表 + c = [] + # range(len(a))取的为列表a的索引,根据a的 + for i in range(len(a)): + # 取出索引对应的值 + t = a[i] + # 判断值是否存在在序列b中 + if t not in b: + # 如果序列不在b中,则写入序列c + c.append(t) + # 返回序列c,c就是列表a去除列表b之后的元素 + return c + + +def filterCondition(schemajson, testpoint, field): + number = random.randint(0, 100000) + randomstr = random.choice('abcdefghijklmnopqrstuvwxyz') + schemafilters = schemajson["data"]["doc"]["schema_query"]["filters"] + list1 = [] + if testpoint == "Filter_Field" or testpoint == "Filter_Operator": + list1.append(field) + else: + list1 = random.sample(schemafilters, 4) + # 获取不同属性支持的部不同操作 + fields = schemajson["data"]["fields"] + operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"] + andConditions = [] + if testpoint == "Filter_Field": + orConditions_list = [] + Field = {"name": field, "expression": "!=", "value": [1], "type": "int"} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif testpoint == "Filter_Operator": + for i in list1: + # 遍历fields列表 + for k in fields: + # 当filters列表值等于fields的name时 + if i == k["name"]: + name = k["name"] + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list = [] + Operator = ["=="] + randomOperator = random.sample(Operator, 1) + value = [str(number)] + Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list = [] + Operator = ["=="] + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + value = [] + value.append(str(number)) + Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + # + # else: + # if k["doc"]["constraints"] == None: + # type1 = k["type"] + # if type1 == "int" or type1 == "long": + # orConditions_list = [] + # Operator = ["=", "!=", ">", "<", ">=", "<="] + # if testpoint == "Filter": + # for op in Operator: + # value = [str(number)] + # Field = {"name": name, "expression": op, "value": value, "type": type1} + # orConditions_list.append(Field) + # else: + # randomOperator = random.sample(Operator, 1) + # value = [str(number)] + # Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + # elif type1 == "string": + # orConditions_list = [] + # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + # if testpoint == "Filter": + # for op in Operator: + # randomOperator = op + # value = [] + # if randomOperator == "=" or randomOperator == "!=": + # value.append(str(number)) + # elif randomOperator == "Like" or randomOperator == "Not Like": + # value.append(randomstr) + # elif randomOperator == "notEmpty": + # value = [] + # Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + # orConditions_list.append(Field) + # else: + # randomOperator_1 = random.sample(Operator, 1) + # randomOperator = randomOperator_1[0] + # value = [] + # if randomOperator == "=" or randomOperator == "!=": + # value.append(str(number)) + # elif randomOperator == "Like" or randomOperator == "Not Like": + # value.append(randomstr) + # elif randomOperator == "notEmpty": + # value = [] + # Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + # + # else: + # if k["doc"]["constraints"]["operator_functions"] == None: + # conrandomstraints = k["doc"]["constraints"] + # type1 = k["type"] + # if type1 == "int" or type1 == "long": + # orConditions_list = [] + # Operator = ["=", "!=", ">", "<", ">=", "<="] + # if testpoint == "Filter": + # for op in Operator: + # randomOperator = op + # if conrandomstraints["type"] == "timestamp": + # # 获取当前时间戳 + # t = int(time.time()) + # value = [str(t)] + # Field = {"name": name, "expression": randomOperator, "value": value, + # "type": type1} + # orConditions_list.append(Field) + # else: + # randomOperator_1 = random.sample(Operator, 1) + # randomOperator = randomOperator_1[0] + # if conrandomstraints["type"] == "timestamp": + # # 获取当前时间戳 + # t = int(time.time()) + # value = [str(t)] + # Field = {"name": name, "expression": randomOperator, "value": value, + # "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + # elif type1 == "string": + # orConditions_list = [] + # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + # if testpoint == "Filter": + # if conrandomstraints["type"] == "ip": + # for op in Operator: + # # 获取ip + # ip = random_ipv4() + # value = [] + # if op == "=" or op == "!=": + # value.append(ip) + # elif op == "Like" or op == "Not Like": + # value.append(ip) + # elif op == "notEmpty": + # value = [] + # Field = {"name": name, "expression": op, "value": value, "type": type1} + # orConditions_list.append(Field) + # elif conrandomstraints["type"] == "email": + # for op in Operator: + # randomOperator = op + # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + # randomOperator_1 = random.sample(Operator, 1) + # randomOperator = randomOperator_1[0] + # # 获取ip + # emil = RandomEmail() + # value = [] + # if randomOperator == "=" or randomOperator == "!=": + # value.append(emil) + # elif randomOperator == "Like" or randomOperator == "Not Like": + # value.append(emil) + # elif randomOperator == "notEmpty": + # value = [] + # Field = {"name": name, "expression": randomOperator, "value": value, + # "type": type1} + # orConditions_list.append(Field) + # else: + # randomOperator_1 = random.sample(Operator, 1) + # randomOperator = randomOperator_1[0] + # if conrandomstraints["type"] == "ip": + # # 获取ip + # ip = random_ipv4() + # value = [] + # if randomOperator == "=" or randomOperator == "!=": + # value.append(ip) + # elif randomOperator == "Like" or randomOperator == "Not Like": + # value.append(ip) + # elif randomOperator == "notEmpty": + # value = [] + # Field = {"name": name, "expression": randomOperator, "value": value, + # "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + # elif conrandomstraints["type"] == "email": + # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + # randomOperator_1 = random.sample(Operator, 1) + # randomOperator = randomOperator_1[0] + # # 获取ip + # emil = RandomEmail() + # value = [] + # if randomOperator == "=" or randomOperator == "!=": + # value.append(emil) + # elif randomOperator == "Like" or randomOperator == "Not Like": + # value.append(emil) + # elif randomOperator == "notEmpty": + # value = [] + # Field = {"name": name, "expression": randomOperator, "value": value, + # "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + # else: + # type1 = k["type"] + # orConditions_list = [] + # operator1 = k["doc"]["constraints"]["operator_functions"] + # operator2 = operator1.split(",") + # if testpoint == "Filter": + # for op in operator2: + # operatordata = k["doc"]["data"] + # code = [] + # for i in operatordata: + # code_1 = i["code"] + # code.append(code_1) + # for co in code: + # Field = {"name": name, "expression": op, "value": co, "type": type1} + # orConditions_list.append(Field) + # else: + # operator3 = random.sample(operator2, 1) + # operatordata = k["doc"]["data"] + # code = [] + # for i in operatordata: + # code_1 = i["code"] + # code.append(code_1) + # code2 = random.sample(code, 1) + # Field = {"name": name, "expression": operator3[0], "value": code2, "type": type1} + # orConditions_list.append(Field) + # orConditions = {"orConditions": orConditions_list} + # andConditions.append(orConditions) + else: + for i in list1: + # 遍历fields列表 + for k in fields: + # 当filters列表值等于fields的name时 + if i == k["name"]: + name = k["name"] + doc = k["doc"] + # 获取无任何特殊说明列: + if doc == None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list = [] + Operator = ["=", "!=", ">", "<", ">=", "<="] + if testpoint == "Filter": + for op in Operator: + value = [str(number)] + Field = {"name": name, "expression": op, "value": value, "type": type1} + orConditions_list.append(Field) + else: + randomOperator = random.sample(Operator, 1) + value = [str(number)] + Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list = [] + Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + if testpoint == "Filter": + for op in Operator: + value = [] + if op == "=" or op == "!=": + value.append(str(number)) + elif op == "Like" or op == "Not Like": + value.append(randomstr) + elif op == "notEmpty" or op == "empty": + value = [] + Field = {"name": name, "expression": op, "value": value, "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"] == None: + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list = [] + Operator = ["=", "!=", ">", "<", ">=", "<="] + if testpoint == "Filter": + for op in Operator: + value = [str(number)] + Field = {"name": name, "expression": op, "value": value, "type": type1} + orConditions_list.append(Field) + else: + randomOperator = random.sample(Operator, 1) + value = [str(number)] + Field = {"name": name, "expression": randomOperator[0], "value": value, + "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list = [] + Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + if testpoint == "Filter": + for op in Operator: + randomOperator = op + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(str(number)) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(randomstr) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + + else: + if k["doc"]["constraints"]["operator_functions"] == None: + conrandomstraints = k["doc"]["constraints"] + type1 = k["type"] + if type1 == "int" or type1 == "long": + orConditions_list = [] + Operator = ["=", "!=", ">", "<", ">=", "<="] + if testpoint == "Filter": + for op in Operator: + randomOperator = op + if conrandomstraints["type"] == "timestamp": + # 获取当前时间戳 + t = int(time.time()) + value = [str(t)] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + if conrandomstraints["type"] == "timestamp": + # 获取当前时间戳 + t = int(time.time()) + value = [str(t)] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif type1 == "string": + orConditions_list = [] + Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + if testpoint == "Filter": + if conrandomstraints["type"] == "ip": + for op in Operator: + # 获取ip + ip = random_ipv4() + value = [] + if op == "=" or op == "!=": + value.append(ip) + elif op == "Like" or op == "Not Like": + value.append(ip) + elif op == "notEmpty": + value = [] + Field = {"name": name, "expression": op, "value": value, "type": type1} + orConditions_list.append(Field) + elif conrandomstraints["type"] == "email": + for op in Operator: + randomOperator = op + Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + # 获取ip + emil = RandomEmail() + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(emil) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(emil) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + else: + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + if conrandomstraints["type"] == "ip": + # 获取ip + ip = random_ipv4() + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(ip) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(ip) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + elif conrandomstraints["type"] == "email": + Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"] + randomOperator_1 = random.sample(Operator, 1) + randomOperator = randomOperator_1[0] + # 获取ip + emil = RandomEmail() + value = [] + if randomOperator == "=" or randomOperator == "!=": + value.append(emil) + elif randomOperator == "Like" or randomOperator == "Not Like": + value.append(emil) + elif randomOperator == "notEmpty": + value = [] + Field = {"name": name, "expression": randomOperator, "value": value, + "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + else: + type1 = k["type"] + orConditions_list = [] + operator1 = k["doc"]["constraints"]["operator_functions"] + operator2 = operator1.split(",") + if testpoint == "Filter": + for op in operator2: + operatordata = k["doc"]["data"] + code = [] + for i in operatordata: + code_1 = i["code"] + code.append(code_1) + for co in code: + Field = {"name": name, "expression": op, "value": co, "type": type1} + orConditions_list.append(Field) + else: + operator3 = random.sample(operator2, 1) + operatordata = k["doc"]["data"] + code = [] + for i in operatordata: + code_1 = i["code"] + code.append(code_1) + code2 = random.sample(code, 1) + Field = {"name": name, "expression": operator3[0], "value": code2, "type": type1} + orConditions_list.append(Field) + orConditions = {"orConditions": orConditions_list} + andConditions.append(orConditions) + filterCondition = {"andConditions": andConditions} + print("filterCondition", filterCondition) + return filterCondition + + +# 获取having条件的串 +def havingjson(schemajson, testpoint, field): + number = random.randint(0, 100000) + schemametrics = schemajson["data"]["doc"]["schema_query"]["metrics"] + aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"] + schemametrics.append("common_log_id") + metricslist = [] + if testpoint == "Having_Field" or testpoint == "Having_Aggregate" or testpoint == "Having_Operator": + metricslist.append(field) + else: + metricslist = random.sample(schemametrics, 4) + fields = schemajson["data"]["fields"] + + if testpoint == "Having_Aggregate": + Aggregate = ["COUNTT"] + else: + Aggregate = ["COUNT", "AVG", "SUM", "MAX", "MIN"] + + if testpoint == "Having_Operator": + operator = ["=="] + else: + operator = ["=", "!=", ">", "<", ">=", "<="] + + andConditions_list = [] + # 遍历的到的having条件列表 + if testpoint == "Having_Field": + orConditions_list=[] + havingdict = {"name": field, "function": "count","expression": "=", "value": 11} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + elif testpoint == "Having_Aggregate": + for j in fields: + if field == j["name"]: + name = j["name"] + type1 = j["type"] + for v in aggregation: + if type1 == v["type"]: + orConditions_list = [] + if v["type"] != "string": + functionslist = Aggregate + else: + functionslist = ["COUNTT"] + if field == "common_log_id": + functionslist = ["COUNTT"] + functions_1 = random.sample(functionslist, 1) + operator_1 = random.sample(operator, 1) + havingdict = {"name": name, "function": str.lower(functions_1[0]), + "expression": operator_1[0], "value": str(number)} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + elif testpoint == "Having_Operator": + for j in fields: + if field == j["name"]: + name = j["name"] + type1 = j["type"] + for v in aggregation: + if type1 == v["type"]: + orConditions_list = [] + if v["type"] != "string": + functionslist = Aggregate + else: + functionsstr = v["functions"] + functionslist = functionsstr.split(",") + if field == "common_log_id": + functionslist = ["COUNT"] + functions_1 = random.sample(functionslist, 1) + if functions_1 == "COUNT_DISTINCT" and type1 != "string": + functions_1 = random.sample(functionslist, 1) + operator_1 = random.sample(operator, 1) + + havingdict = {"name": name, "function": str.lower(functions_1[0]), + "expression": operator_1[0], "value": str(number)} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + + else: + for i in metricslist: + for j in fields: + if i == j["name"]: + name = j["name"] + type1 = j["type"] + for v in aggregation: + if type1 == v["type"]: + orConditions_list = [] + if v["type"] != "string": + functionslist = Aggregate + else: + functionsstr = v["functions"] + functionslist = functionsstr.split(",") + if field == "common_log_id": + functionslist = ["COUNT"] + if testpoint == "Having": + for functions_1 in functionslist: + for operator_1 in operator: + havingdict = {"name": name, "function": str.lower(functions_1), + "expression": operator_1, "value": str(number)} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + else: + functions_1 = random.sample(functionslist, 1) + if functions_1 == "COUNT_DISTINCT" and type1 != "string": + functions_1 = random.sample(functionslist, 1) + operator_1 = random.sample(operator, 1) + + havingdict = {"name": name, "function": str.lower(functions_1[0]), + "expression": operator_1[0], "value": str(number)} + orConditions_list.append(havingdict) + orConditions = {"orConditions": orConditions_list} + andConditions_list.append(orConditions) + havingCondition = {"andConditions": andConditions_list} + print("having", havingCondition) + return havingCondition + +# 拼接字符串 +def datasetjson(schemauerl, token, testname, logtype, testpoint, field): + schema_new = schema(schemauerl, token, logtype) + group_re = groupby(schema_new, logtype, testpoint) + groupColumnList = group_re[0] + group_randomstr = group_re[1] + queryColumnList = DataBindings(schema_new, group_randomstr, testpoint, field) + filterCondition_1 = filterCondition(schema_new, testpoint, field) + havingjson_1 = havingjson(schema_new, testpoint, field) + if testpoint == "LogType": + logtype = field + datasetdict = { + "list": { + "name": testname, + "logType": logtype, + "groupColumnList": groupColumnList, + "queryColumnList": queryColumnList, + "filterCondition": filterCondition_1, + "havingCondition": havingjson_1 + } + } + print(datasetdict) + print("datasetjson", json.dumps(datasetdict)) + return json.dumps(datasetdict) + +def ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, datasetgeturl, chargeturl, testname, logtype, + testpoint, field=None): + headers = {"Content-Type": "application/json", "Authorization": token} + # dataset生成json串并发送请求 + _datasetjson = datasetjson(schemaurl, token, testname, logtype, testpoint, field) + response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers) + print("返回数据1", response1) + code = response1.json()["code"] + print("datasetcode:", code) + if testpoint == "LogType": + assert code == 40040002 + elif testpoint == "GroupBy": + assert code == 40040008 + elif testpoint == "DataBindings_Field": + assert code == 40040004 + elif testpoint == "DataBindings_Aggregate": + assert code == 40040006 + elif testpoint == "Filter_Field": + assert code == 40040007 + elif testpoint == "Filter_Operator": + assert code == 40040010 + elif testpoint == "Having_Field": + assert code == 40040074 + elif testpoint == "Having_Aggregate": + assert code == 40040072 + elif testpoint == "Having_Operator": + assert code == 40040073 + +def ReportPositiveTest_Negtive(host, port, token, dataseturl, charurl, repporturl, logtypelist): + testpoint=["LogType","GroupBy","DataBindings_Field","DataBindings_Aggregate","Filter_Field","Filter_Operator","Having_Field","Having_Aggregate","Having_Operator"] + for logtype in logtypelist: + schemaurl = "http://" + host + ":" + port + "/v1/log/schema?logType=" + logtype + schema_new = schema(schemaurl, token, logtype) + metrics = schema_new["data"]["doc"]["schema_query"]["metrics"] + schemafilters = schema_new["data"]["doc"]["schema_query"]["filters"] + metrics.append("common_log_id") + for j in testpoint: + print(j) + if j == "LogType": + testname = "Report" + logtype + j + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + filter = "Negtive_log" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "GroupBy": + testname = "Report" + logtype + j + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + filter = "GroupByNegtive" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "DataBindings_Field": + testname = "Report" + logtype + j + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + filter = "DataBindingsFieldNegtive" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "DataBindings_Aggregate": + for filter in metrics: + testname = "Report" + logtype + j + filter + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "Filter_Field": + testname = "Report" + logtype + j + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + filter = "FilterFieldNegtive" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "Filter_Operator": + for filter in schemafilters: + testname = "Report" + logtype + j + filter + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + if j == "Having_Field": + testname = "Report" + logtype + j + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + filter="HavingFieldNegtive" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "Having_Aggregate": + for filter in metrics: + testname = "Report" + logtype + j + filter + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) + + if j == "Having_Operator": + for filter in metrics: + testname = "Report" + logtype + j + filter + dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser=" + char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser=" + ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl, + testname, logtype, j, filter) diff --git a/keyword/common/customlibrary/Custometest/Schema.py b/keyword/common/customlibrary/Custometest/Schema.py new file mode 100644 index 0000000..835d96a --- /dev/null +++ b/keyword/common/customlibrary/Custometest/Schema.py @@ -0,0 +1,350 @@ +# !/user/bin/python +# -*-coding:utf-8-*- +import requests +import random +import json + +# import allure + +list = [] + + +# 请求schema接口得到返回数据,用于其他接口 +def schema(schemauerl, token): + url = schemauerl # "http://192.168.44.72:8080/v1/log/schema?logType=security_event_log" + headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token} + response = requests.get(url=url, headers=headers) + return response.json() + + +# 根据schema接口返回数据,得出所有属性所支持的比较类型的列表 +# 1、根据[doc][allow_query]值为true列支持搜索; +# 2、如有[doc][constraints][operator_functions]值,操作优先; +# 3、如有[doc][data]值则对应属性取值为data所列code值; +# 4、int和long的范围不一致; +# 5、string要包含特殊字符 +# 6、给查询条件赋值,要给出边界和正常值 +# 7、IP(V4、V6)和URL要给出专门的方法生成 + +import ipaddress + +# 生成随机ipv4或ipv6 +MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1 +MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1 + + +def random_ipv4(): + return ipaddress.IPv4Address._string_from_ip_int( + random.randint(0, MAX_IPV4) + ) + + +def random_ipv6(): + return ipaddress.IPv6Address._string_from_ip_int( + random.randint(0, MAX_IPV6) + ) + + +from random import Random + + +# 生成 12 位随机 URL 地址 +def randrom_url(): + str = '' + str1 = '' + chars = 'abcdefghijklmnopqrstuvwxyz0123456789' + chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()' + length = len(chars) + length1 = len(chars1) + random = Random() + for x in range(random.randint(8, 16)): + str += chars[random.randint(0, length - 1)] + for pp in range(random.randint(8, 16)): + str1 += chars1[random.randint(0, length1 - 1)] + url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1 + print(url) + return url + + +def Filter1(schemauerl, token): + json_str = schema(schemauerl, token) + print(type(json_str)) + # 获取日志属性定义 + fields = json_str["data"]["fields"] + # 获取不同属性支持的部不同操作 + operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"] + for i in fields: + number = random.randint(-2147483648, 2147483647) + maxnumber = 2147483647 + minnumber = -2147483648 + str = random.choice('abcdefghijklmnopqrstuvwxyz!@#$%^&*') + name = i["name"] + doc = i["doc"] + # 获取无任何特殊说明列: + if doc == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "not empty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + else: + if i["doc"]["constraints"] == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "not empty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + + else: + if i["doc"]["constraints"]["operator_functions"] == None: + type1 = i["type"] + for j in operator: + if type1 == j["type"]: + if type1 == "int" or type1 == "long": + value1 = number + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + f"{value1}" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + f"{value1}" + list.append(str1) + elif type1 == "string": + value1 = str + functions = j["functions"] + functions1 = functions.split(",") + for v in functions1: + if v == "not empty" or v == "empty": + str1 = v + "(" + " '" + name + " '" + ")" + list.append(str1) + elif v == "in" or v == "not in": + str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")" + list.append(str1) + else: + str1 = name + " " + v + " " + " '" + value1 + " '" + list.append(str1) + else: + type1 = i["type"] + operator1 = i["doc"]["constraints"]["operator_functions"] + operator2 = operator1.split(",") + data = i["doc"]["data"] + for d in data: + code = d["code"] + if type1 == "int" or type1 == "long": + for o in operator2: + str1 = name + " " + o + " " + code + list.append(str1) + else: + for o in operator2: + str1 = name + " " + o + " " + " '" + code + " '" + list.append(str1) + + + print(list) + return list + + +# 根据Filter1方法中的的数据,写入log请求接口中,来验证log请求接口 +def logapiverify(logurl, schemauerl, token, starttime, endtime, logtype): + filter2 = Filter1(schemauerl, token) + a = schema(schemauerl, token) + fields = a["data"]["fields"] + print(fields) + str2 = "" + for i in filter2: + str2 = str2 + i + " " + "and" + " " + url = logurl # "http://192.168.44.72:8080/v1/log/list" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": i + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + assert code == 200 + print(response1.json()["code"]) + return response1.json() + print(str2) + str3 = str2[0:-4] + print(str3) + url = logurl # "http://192.168.44.72:8080/v1/log/list" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": str3 + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + assert code == 200 + print(response1.json()["code"]) + + +# 精确filter,请求日志接口 +def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue): + a = schema(schemauerl, token) + fields = a["data"]["fields"] + print(fields) + url = logurl # "http://192.168.44.72:8080/v1/log/list" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": filtervalue + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + assert code == 200 + print(response1.json()["code"]) + return response1.json() + + +# 事件日志和通联日志时间分布查询 ,日志检索条件校验(filter内容验证) +def distributed_query(logurl, token): + url = logurl # url示例:http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M + headers = {"Content-Type": "application/json", "Authorization": token} + response = requests.get(url=url, headers=headers) + code = response.json()["code"] + print(response.json()) + assert code == 200 + print(response.json()["code"]) + return response.json() + + +# 原始日志检索时间分布计算 +def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue): + url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution" + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "startTime": starttime, + "endTime": endtime, + "logType": logtype, + "granularity": granularity, + "filter": filtervalue + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + print(response1.json()["code"]) + assert code == 200 + return response1.json() + +# 日志总数查询 +def countlog_query(logurl, token, starttime, endtime, logtype): + url = logurl + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "pageSize": 20, + "logType": logtype, + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "filter": "" + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + code = response1.json()["code"] + print(response1.json()) + print(response1.json()["code"]) + assert code == 200 + return response1.json() + +# 日志导出接口 +def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue): + a = schema(schemauerl, token) + fields = a["data"]["fields"] + print(fields) + url = logurl + headers = {"Content-Type": "application/json", + "Authorization": token} + data = { + "start_common_recv_time": starttime, + "end_common_recv_time": endtime, + "logType": logtype, + "fields": fields, + "filter": filtervalue + } + print(data) + print(json.dumps(data)) + response1 = requests.post(url=url, data=json.dumps(data), headers=headers) + a=type(response1) + if a != "class 'requests.models.Response'": + assert 1 == 1 + else: + assert 1 == 2 + + + + +# if __name__ == '__main__': +# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log")
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/StringManipulation.py b/keyword/common/customlibrary/Custometest/StringManipulation.py new file mode 100644 index 0000000..858d6fc --- /dev/null +++ b/keyword/common/customlibrary/Custometest/StringManipulation.py @@ -0,0 +1,7 @@ +from builtins import len + +#用于Settings和Administrator关键字对字符串的切割 +def StringSegmentation(a,k): + b=len(a) + c=a[k:b] + return c
\ No newline at end of file diff --git a/keyword/common/customlibrary/Custometest/UIAssert.py b/keyword/common/customlibrary/Custometest/UIAssert.py new file mode 100644 index 0000000..267c100 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/UIAssert.py @@ -0,0 +1,22 @@ +import ssl, socket + +# content of test_json_schema.py +from jsonschema import validators + + +#获取页面证书信息 +def ttt(): + hostname = 'vip.com' + ctx = ssl.create_default_context() + with ctx.wrap_socket(socket.socket(), server_hostname=hostname) as s: + s.connect((hostname, 443)) + cert = s.getpeercert() + + subject = dict(x[0] for x in cert['subject']) + issued_to = subject['commonName'] + issuer = dict(x[0] for x in cert['issuer']) + issued_by = issuer['commonName'] + print(issued_by) + + + diff --git a/keyword/common/customlibrary/Custometest/__init__.py b/keyword/common/customlibrary/Custometest/__init__.py new file mode 100644 index 0000000..4081636 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__init__.py @@ -0,0 +1,16 @@ + +#-*- coding:utf-8 -*- +''' + created by hch 2019-06-26 +''' + +from Custometest.printlog import printlog +from Custometest.MD5 import MD5 +from Custometest.cmd_cer import Order +# from custometest.printlog import printlog + + +__version__ = '1.0' + +class Custometest(printlog,Order,MD5): + ROBOT_LIBRARY_SCOPE = 'GLOBAL' diff --git a/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..190d80c --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..38716f5 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..02d7ce7 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..5c242a7 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..44a26a1 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..30038ed --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..f6ad333 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc diff --git a/keyword/common/customlibrary/Custometest/certificate.yaml b/keyword/common/customlibrary/Custometest/certificate.yaml new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/certificate.yaml diff --git a/keyword/common/customlibrary/Custometest/cmd_cer.py b/keyword/common/customlibrary/Custometest/cmd_cer.py new file mode 100644 index 0000000..50cdf08 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/cmd_cer.py @@ -0,0 +1,290 @@ +import os +import subprocess +from time import sleep +import platform + + + +class Order: + def CMD(self,data): + result = os.popen(data) + # res = result.read().encoding('GBK') + res = result.read() + result.close() + # res = res.decode("unicode-escape") + return res + def Linux(self): + pass + # 根据证书颁发者名字判断证书是否替换 + def Cert_Verification(self,data): + c = [] + print(1) + #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo: + with open(r'certificate.yaml', 'r') as foo: + print(2) + for line in foo.readlines(): + if data in line: + print(line) + c.append('证书已替换') + else: + pass + if '证书已替换' in c: + # print('证书已替换') + foo.close() + return '证书已替换' + else: + # print('证书未替换') + foo.close() + return '证书未替换' + + def Content_Type(self,data): + d = [] + with open('certificate.yaml', 'r') as foo: + for line in foo.readlines(): + if data in line: + # print(line) + d.append('Content_Type已替换') + else: + pass + if 'Content_Type已替换' in d: + # print('证书已替换') + foo.close() + return 'Content_Type已替换' + else: + # print('证书未替换') + foo.close() + return 'Content_Type未替换' + # curl路由内容设置 + def curl_name(self,data): + #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + return curl_name + # 控制器 + def manu(self,url,Certificate): + # print(data['url']) + n = 0 + while n != len(url): + b = self.curl_name(url[n]) + d = self.CMD(b) + # print(d) + sleep(1) + if Certificate != "": + c =self.Cert_Verification(Certificate) + # f = self.Content_Type(data["Content_Type"]) + sleep(1) + assert_cer = url[n]+c + # assert_Content_Type = data['Content_Type']+f + n+=1 + return d,assert_cer + + def FTP(self, ftp_type): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + if ftp_type == "下载": + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt' + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814 + if fsize == 435814: + return "ftp_success" + else: + return "ftp_fail" + elif ftp_type == "登录": + data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk' + d = self.CMD(data) + # print(d) + if "Graphical (Xorg) program starter for ADRIANE" in d: + return "ftp_success" + else: + return "ftp_fail" + # FTP 下载 + def FTP_down(self, ftp_url,file_size,file_name): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 '+ftp_url+ '-o '+ file_name + " ' " + print(data) + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/"+file_name) # 435814 + print(fsize) + if fsize == file_size: + return "ftp_success" + else: + return "ftp_fail" + + # FTP 登录 + def FTP_login(self, ftp_url,file_content): + SYS = self.Operating_System() + if SYS == "Windows": + data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk' + d = self.CMD(data) + else: + data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk' + d = self.CMD(data) + + if file_content in d: + return "ftp_success" + else: + return "ftp_fail" + + # 判断当前操作系统 + def Operating_System(self): + os_name = platform.system() + return os_name + + + #需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容 + # 全局变量 null,将java中的空值(null),装换位python中空值("") + global null + null = '' + + # 对需要替换的内容进行循环替换 jsons为初始默认json datas为需要替换的内容,header 启用自定义json + def Jsoneditmanu(self, jsons, datas=None,header=None): + #判断是否启用自定义json + if header != None: + header = eval(header) + # 返回 header + return header + # 判断是否需要更改json内容 + elif datas != None: + # datas = eval(datas) + jsons = eval(jsons) + # 循环遍历替换json内容 + for k, v in datas.items(): + Order.UpdateAllvalues(self,jsons, k, v) + return jsons + else: + # 返回初始json + return jsons + + # 循环嵌套替换 + def UpdateAllvalues(self,mydict, key, value): + if isinstance(mydict, dict): # 使用isinstance检测数据类型,如果是字典 + if key in mydict.keys(): # 替换字典第一层中所有key与传参一致的key + mydict[key] = value + for k in mydict.keys(): # 遍历字典的所有子层级,将子层级赋值为变量chdict,分别替换子层级第一层中所有key对应的value,最后在把替换后的子层级赋值给当前处理的key + chdict = mydict[k] + Order.UpdateAllvalues(self,chdict, key, value) + mydict[k] = chdict + elif isinstance(mydict, list): # 如是list + for element in mydict: # 遍历list元素,以下重复上面的操作 + if isinstance(element, dict): + if key in element.keys(): + element[key] = value + for k in element.keys(): + chdict = element[k] + Order.UpdateAllvalues(self,chdict, key, value) + element[k] = chdict + + + + #递归提取json中符合要求键的值 + import json + def get_dict_allkeys(self,dict_a): + """ + 遍历嵌套字典,获取json返回结果的所有key值 + :param dict_a: + :return: key_list + """ + if isinstance(dict_a, dict): # 使用isinstance检测数据类型 + # 如果为字典类型,则提取key存放到key_list中 + for x in range(len(dict_a)): + temp_key = list(dict_a.keys())[x] + temp_value = dict_a[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历 + elif isinstance(dict_a, list): + # 如果为列表类型,则遍历列表里的元素,将字典类型的按照上面的方法提取key + for k in dict_a: + if isinstance(k, dict): + for x in range(len(k)): + temp_key = list(k.keys())[x] + temp_value = k[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + Order.get_dict_allkeys(self,temp_value) # 自我调用实现无限遍历 + return key_list + + # 判断值是否在列表中 + def VerifyProxy(self,data,lists): + global key_list + key_list = [] + datas = Order.get_dict_allkeys(self,data) + print(type(datas)) + lists=lists.split(",") + print(type(lists)) + print("gsd") + datas2=list(map(str,datas)) + print(datas2) + print(datas) + print(lists) + + if set(datas2) > set(lists): + return "true" + else: + return "flase" + + +if __name__ == '__main__': +# datas = {"url":['https://www.baidu.com'], +# "Certificate":"Tango Secure Gateway CA", +# # "Content_Type":"text/html", +# 'log':'Security Event Logs', +# "sni":['baidu'], +# "intercept_code":"200", +# "log_code":"200", +# "certifucate":"1", +# "log_content":"true" +# } +# # data= {"url":['https://www.baidu.com'], +# # "Certificate":"Tango Secure Gateway CA" +# # } +# # url = ['https://www.baidu.com'] +# # url = ['https://www.baidu.com'] +# # url = ['https://www.baidu.com'] +# # # Certificate1 = "Tango Secure Gateway CA" +# # Certificate = "baidu" +# # a='Tango Secure Gateway CA' +# # s = Order() +# # b = s.manu(url,Certificate) +# # print(b[1]) +# # FTP下载 传入ftp的路径和文件大小 +# ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' +# ftp_size = 435814 +# ftp_issue = s.FTP_down(ftp_url,ftp_size) +# # FTP登录 传入ftp的路径和文件内容 +# ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' +# file_content = "Graphical (Xorg) program starter for ADRIANE" +# ftp_issue = s.FTP_login(ftp_url,file_content) + # for i in b: + # print(i) + # dd = s.CMD('curl -I https://www.baidu.com') + # print(dd) + # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd: + # print("ok") + # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk' + + + # 自己写一个字典测试一下上面的方法好用不好用 + jsons = {"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}} + datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1} + + print("替换前:\n %s" % jsons) + + + a = Order() + b = a.Jsoneditmanu(jsons,datas) + # print("替换前:\n %s" % jsons) + print("替换后:\n %s" % b) + + data = {"aid":[{'bid':2},{'cid':3}]} + print(type(data)) + # data="""{}""" + # data1 = json.loads(data) + get_keys = get_dict_allkeys(data) + print(get_keys) diff --git a/keyword/common/customlibrary/Custometest/log_contrast.py b/keyword/common/customlibrary/Custometest/log_contrast.py new file mode 100644 index 0000000..96e5e35 --- /dev/null +++ b/keyword/common/customlibrary/Custometest/log_contrast.py @@ -0,0 +1,8 @@ +#!/user/bin/python +#-*-coding:utf-8-*- +def log_contrast(logs,client_ip,policy_id,parmkey,parmvalue): + if (str(client_ip) in str(logs))and (str(policy_id) in str(logs)) and (str(parmkey) in str(logs)) and (str(parmvalue) in str(logs)): + print(logs) + return "true" + else: + return "false" diff --git a/keyword/common/customlibrary/Custometest/printlog.py b/keyword/common/customlibrary/Custometest/printlog.py new file mode 100644 index 0000000..02f8ced --- /dev/null +++ b/keyword/common/customlibrary/Custometest/printlog.py @@ -0,0 +1,11 @@ + +#-*- coding:utf-8 -*- +''' + created by hch 2019-06-26 +''' + + +class printlog(): + + def printA(): + print("hello word")
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py new file mode 100644 index 0000000..e60bf56 --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py @@ -0,0 +1,7 @@ +#coding=utf-8 +from mytool import mytool + +version = '1.0' + +class ExtensionLibrary(mytool): + ROBOT_LIBRARY_SCOPE = 'GLOBAL'
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py new file mode 100644 index 0000000..03440cb --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py @@ -0,0 +1,26 @@ +#coding=utf-8 +import socket + +""" 获取主机信息 """ +class mytool(): + def __init__(self): + pass + + def get_host_IP(self, flag="50"): + #try: + #s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + #s.connect(('8.8.8.8', 80)) + #ip = s.getsockname()[0] + #finally: + #s.close() + hostname = socket.gethostname() + ipTriple = socket.gethostbyname_ex(hostname) + ips = list(ipTriple[2:]) + ip = "" + for i in ips: + ipList = list(i) + for ip1 in ipList: + if "."+flag+"." in ip1: + ip = ip1 + break + return ip
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py new file mode 100644 index 0000000..29db0ed --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py @@ -0,0 +1,7 @@ +#coding=utf-8 +from filetool import filetool + +version = '1.0' + +class FileLibrary(filetool): + ROBOT_LIBRARY_SCOPE = 'GLOBAL'
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py new file mode 100644 index 0000000..8ad1ef7 --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py @@ -0,0 +1,22 @@ +#coding=utf-8 + +""" 变量文件操作 """ +class filetool(): + def __init__(self): + pass + + def alter_dict(self, path, k, v): + data = '' + flag = True + key = '${%s}' % (k) + add = key + '\t%s' % (v) + '\n' + with open(path, 'r+') as f: + for line in f.readlines(): + if(line.find(key + '\t') == 0): + line = add + flag = False + data += line + if(flag): + data += add + with open(path, 'w+') as f: + f.writelines(data)
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py new file mode 100644 index 0000000..7df32d2 --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py @@ -0,0 +1,85 @@ +#coding=utf-8 +import datetime +import time +import string +class GetTime(): + def __init__(self): + pass + def time1(self,t): + if t=="m": + print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) + if time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[-4] >= "5": + time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-4]+"5:00" + return time2 + else: + time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-4] + "0:00" + return time2 + elif t=="s": + if time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[-2] >= "3": + time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-2]+"30" + return time2 + else: + time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-2] + "00" + return time2 + elif t=="h": + time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-7]+":00:00" + startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S") + startTime2 = (startTime + datetime.timedelta(hours=-1)).strftime("%Y-%m-%d %H:%M:%S") + return startTime2 + elif t=="5m": + time2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + print(time2) + startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S") + startTime2 = (startTime + datetime.timedelta(minutes=-5)).strftime("%Y-%m-%d %H:%M:%S") + if startTime2[-4]>="5": + time3 = startTime2[0:-4] + "5:00" + print(time3) + else: + time3 = startTime2[0:-4] + "0:00" + print(time3) + return time3 + elif t== "30s": + time2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) + print(time2) + startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S") + startTime2 = (startTime + datetime.timedelta(seconds=-30)).strftime("%Y-%m-%d %H:%M:%S") + if startTime2[-2] >= "3": + time3 = startTime2[0:-2]+"30" + print(time3) + return time3 + else: + time3 = startTime2[0:-2]+"00" + print(time3) + return time3 + def str2sec(self,x): + a, b = x.strip().split(' ') + y, m, d = a.strip().split('-') + + h, m, s = b.strip().split(':') #.split()函数将其通过':'分隔开,.strip()函数用来除去空格 + + return int(h)*3600 + int(m)*60 + int(s),d #int()函数转换成整数运算 + def intersection(self,a,b): + c= 0 + f = 0 + for j in a: + + for i in j: + if i in b[f]: + print(i) + print(b[f]) + c= c+1 + f = f + 1 + d = len(b)*len(b[0]) + if d == 0: + e=0 + elif c==0: + e = 0 + else: + e = c/d + if e >= 0.8 : + return "ture" + else: + return "false" + + + diff --git a/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py new file mode 100644 index 0000000..c0f47eb --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py @@ -0,0 +1,6 @@ +#coding=utf-8 +from GetTimeLibrary.GetTime import GetTime + +__version__ = '1.0' +class GetTimeLibrary(GetTime): + ROBOT_LIBRARY_SCOPE = 'GLOBAL'
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth b/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth new file mode 100644 index 0000000..73e1268 --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth @@ -0,0 +1,2 @@ +ExtensionLibrary +FileLibrary
\ No newline at end of file diff --git a/keyword/common/customlibrary/ExtensionPackages/readme.txt b/keyword/common/customlibrary/ExtensionPackages/readme.txt new file mode 100644 index 0000000..e3b091b --- /dev/null +++ b/keyword/common/customlibrary/ExtensionPackages/readme.txt @@ -0,0 +1,4 @@ +# 导入方法: +# 1、将‘extensionLibrary.pth’文件放到...\Python\Lib\site-packages目录下 +# 2、根据需要将包目录同‘extensionLibrary.pth’文件一同放到...\Python\Lib\site-packages目录下(目前有‘ExtensionLibrary’和‘FileLibrary’) +# 3、在测试夹中导入此包即可使用扩展功能
\ No newline at end of file diff --git a/keyword/common/customlibrary/Library/VerifyPolicy.py b/keyword/common/customlibrary/Library/VerifyPolicy.py new file mode 100644 index 0000000..a0bce71 --- /dev/null +++ b/keyword/common/customlibrary/Library/VerifyPolicy.py @@ -0,0 +1,38 @@ +import json +def get_dict_allkeys(dict_a): + if isinstance(dict_a, dict): # 使用isinstance检测数据类型 + # 如果为字典类型,则提取key存放到key_list中 + for x in range(len(dict_a)): + temp_key = list(dict_a.keys())[x] + temp_value = dict_a[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + get_dict_allkeys(temp_value) # 自我调用实现无限遍历 + elif isinstance(dict_a, list): + # 如果为列表类型,则遍历列表里的元素,将字典类型的按照上面的方法提取key + for k in dict_a: + if isinstance(k, dict): + for x in range(len(k)): + temp_key = list(k.keys())[x] + temp_value = k[temp_key] + if temp_key.endswith("Id"): + key_list.append(temp_value) + get_dict_allkeys(temp_value) # 自我调用实现无限遍历 + return key_list +def VerifyProxy(data,lists): + global key_list + key_list = [] + datas = get_dict_allkeys(data) + print(type(datas)) + lists=lists.split(",") + print(type(lists)) + print("gsd") + datas2=list(map(str,datas)) + print(datas2) + print(datas) + print(lists) + + if set(datas2) > set(lists): + return "true" + else: + return "flase" diff --git a/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..12341de --- /dev/null +++ b/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc diff --git a/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..694ad83 --- /dev/null +++ b/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc diff --git a/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..62d758f --- /dev/null +++ b/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc diff --git a/keyword/common/customlibrary/Library/delUseless.py b/keyword/common/customlibrary/Library/delUseless.py new file mode 100644 index 0000000..3039794 --- /dev/null +++ b/keyword/common/customlibrary/Library/delUseless.py @@ -0,0 +1,45 @@ +import json +def dict_del(key, obj): + if isinstance(obj, dict): + if key in obj: + obj.pop(key) + #print(obj.items()) + for k, v in obj.items(): + #print(obj.items()) + if v is None: + v = '666' + else: + pass + dict_del(key, v) + elif isinstance(obj, list): + for x in obj: + dict_del(key, x) + else: + pass + #print(type(obj)) + obj = json.dumps(obj) + return obj +def deal(jsondata, keylist): + jsondata = json.loads(jsondata) + if "data" in jsondata.keys(): + jsondata = jsondata["data"] + else: + pass + # jsondata = '{"opAction":"add","refuseCode":true,"returnData":1,"objectList":{"objectType":"fqdn","objectSubType":"fqdn","isValid":1,"isInitialize":0,"isExclusion":0,"objectName":"hbn_test_fqdn","objectDesc":"","subObjectIds":[],"addItemList":[{"keywordArray":["*abcds"],"t":"16191477536650","itemId":"","isHexbin":0,"state":2}],"updateItemList":[],"deleteItemIds":[],"iconColor":"#31739C"}}' + # keylist = ["objectType","objectSubType","isValid","isInitialize"] + # len2 = len(keylist) + # print("aaaaaaaaaaaaaaaaaaaaaaaaaa"+str(len2)) + # for num in range(len2): + # retstr = dict_del(keylist[num], jsondata) + # print("#############################"+retstr) + # return retstr +#for num in range(len2): + #print("$$$$$$$$$$$$$$$$$$"+dict_del(keylist[num],jsondata)) + #key1 = keylist[num] + #print(num) + #print("#############################"+dict_del(key1,jsondata)) + len1 = len(keylist) + for num in range(len1): + retstr = dict_del(keylist[num], jsondata) + dict_del(keylist[num], jsondata) + return retstr
\ No newline at end of file diff --git a/keyword/common/customlibrary/Library/fileOperations.py b/keyword/common/customlibrary/Library/fileOperations.py new file mode 100644 index 0000000..4029e26 --- /dev/null +++ b/keyword/common/customlibrary/Library/fileOperations.py @@ -0,0 +1,26 @@ +def CountLines(fname): + count = 0 + with open(fname, 'rb') as f: + for file_line in f: + file_line = file_line.strip() + # print(file_line) + # 空行 + if file_line == b'': + pass + + # 注释 # 开头 + elif file_line.startswith(b'-->'): + pass + + # 代码 + else: + count += 1 + print(fname + '----', count) + # 单个文件行数 + # print(fname,'----count:',count) + return count +def WriteBinary(response,path1): + with open(path1,"wb") as f2: + strb = response + f2.write(strb) + diff --git a/keyword/common/customlibrary/Pop3Library/__init__.py b/keyword/common/customlibrary/Pop3Library/__init__.py new file mode 100644 index 0000000..66219ac --- /dev/null +++ b/keyword/common/customlibrary/Pop3Library/__init__.py @@ -0,0 +1,200 @@ +import poplib +import base64 +import time +from email.parser import Parser +# 用来解析邮件主题 +from email.header import decode_header +# 用来解析邮件来源 +from email.utils import parseaddr + +from robot.api.deco import keyword +from robot.api import logger + + +class AcceptEmail(object): + + def __init__(self, user_email, password, pop3_server='serverDemon'): + self.user_email = user_email + self.password = password + self.pop3_server = pop3_server + + self.connect_email_server() + + def connect_email_server(self): + self.server = poplib.POP3(self.pop3_server) + # 打印POP3服务器的欢迎文字,验证是否正确连接到了邮件服务器 + # print('连接成功 -- ', self.server.getwelcome().decode('utf8')) + # +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0) + + # 开始进行身份验证 + self.server.user(self.user_email) + self.server.pass_(self.password) + + def __del__(self): + # 关闭与服务器的连接,释放资源 + self.server.close() + + def get_email_count(self): + # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可 + email_num, email_size = self.server.stat() + # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size)) + return email_num + + def receive_email_info(self, now_count=None): + # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可 + email_num, email_size = self.server.stat() + # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size)) + self.email_count = email_num + self.email_sumsize = email_size + + # 使用list()返回所有邮件的编号,默认为字节类型的串 + rsp, msg_list, rsp_siz = self.server.list() + # print(msg_list, '邮件数量',len(msg_list)) + # print("服务器的响应: {0},\n消息列表: {1},\n返回消息的大小: {2}".format(rsp, msg_list, rsp_siz)) + # print('邮件总数: {}'.format(len(msg_list))) + self.response_status = rsp + self.response_size = rsp_siz + + # 下面获取最新的一封邮件,某个邮件下标(1开始算) + # total_mail_numbers = len(msg_list) + + # 动态取消息 + total_mail_numbers = now_count + + rsp, msglines, msgsiz = self.server.retr(total_mail_numbers) + # rsp, msglines, msgsiz = self.server.retr(1) + # print("服务器的响应: {0},\n原始邮件内容: {1},\n该封邮件所占字节大小: {2}".format(rsp, msglines, msgsiz)) + + # 从邮件原内容中解析 + msg_content = b'\r\n'.join(msglines).decode('utf-8')#gbk + msg = Parser().parsestr(text=msg_content) + self.msg = msg + # print('解码后的邮件信息:\n{}'.format(msg)) + + def recv(self, now_count=None): + self.receive_email_info(now_count) + self.parser() + + def get_email_title(self): + subject = self.msg['Subject'] + value, charset = decode_header(subject)[0] + if charset: + value = value.decode(charset) + # print('邮件主题: {0}'.format(value)) + self.email_title = value + + def get_sender_info(self): + hdr, addr = parseaddr(self.msg['From']) + # name 发送人邮箱名称, addr 发送人邮箱地址 + name, charset = decode_header(hdr)[0] + if charset: + name = name.decode(charset) + self.sender_qq_name = name + self.sender_qq_email = addr + # print('发送人邮箱名称: {0},发送人邮箱地址: {1}'.format(name, addr)) + + def get_email_content(self): + content = self.msg.get_payload() + # 文本信息 + content_charset = content[0].get_content_charset() # 获取编码格式 + text = content[0].as_string().split('base64')[-1] + text_content = base64.b64decode(text).decode(content_charset) # base64解码 + self.email_content = text_content + # print('邮件内容: {0}'.format(text_content)) + + # 添加了HTML代码的信息 + content_charset = content[1].get_content_charset() + text = content[1].as_string().split('base64')[-1] + # html_content = base64.b64decode(text).decode(content_charset) + + # print('文本信息: {0}\n添加了HTML代码的信息: {1}'.format(text_content, html_content)) + + def parser(self): + self.get_email_title() + self.get_sender_info() + #self.get_email_content() + + +def get_new_mail(user_name, pwd, pop3_server, second=5): + t = AcceptEmail(user_name, pwd, pop3_server) + now_count = t.get_email_count() + #print('开启的时候的邮件数量为:%s' % now_count) + logger.info("开启的时候的邮件数量为:"+str(now_count)) + # 每次需要重新连接邮箱服务器,才能获取到最新的消息 + # 默认每隔5秒看一次是否有新内容 + num = 0 + while True: + obj = AcceptEmail(user_name, pwd, pop3_server) + count = obj.get_email_count() + if count > now_count: + new_mail_count = count - now_count + #print('有新的邮件数量:%s' % new_mail_count) + logger.info("有新的邮件数量:"+str(new_mail_count)) + now_count += 1 + obj.recv(now_count) + + yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email} + #yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email, + # "email_content": obj.email_content} + if new_mail_count > 0: + return + # print('-' * 30) + # print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % ( + # obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content)) + # print('-' * 30) + + #else: + #print('没有任何新消息.') + #logger.info("没有任何新消息.") + time.sleep(second) + num += 1 + if num == 36:#等待时间粒度,一个粒度是5s,如果num=10意思就是等待接收邮件50s,若没有邮件就返回;36是等待3min + return + + +@keyword('Recv Email') +def recv_email(user_name, pwd, pop3_server, send_user, subj): + ''' + 参数说明: + [user_name]:用户名 + [pwd]:密码,第三方登入密码 + [pop server]:pop服务器 + [sender]:发送者邮箱,注意全称 + [subj_sub]:主题的部分内容,这里是测主题是否包含该参数 + [return]:返回值,成功返回success,失败返回n其他 + 其他问题请阅读该库的readme.txt + ''' + dic = {} + logger.info("正在监听邮件服务器端是否有新消息---") + #print('正在监听邮件服务器端是否有新消息---') + try: + iterator = get_new_mail(user_name, pwd, pop3_server) + except TypeError: + #print('监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容') + logger.info("监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容") + return "fail" + else: + for dic in iterator: + #print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % ( + # dic["title"], dic["sender"], dic["sender_email"], dic["email_content"])) + #logger.info("邮件主题: " + str(dic["title"]) + " 发件人: " + str(dic["sender"])+"发件人邮箱:"+str(dic["sender_email"])) + if dic["sender"] == send_user: + #logger.info("发送者一样") + if subj in dic["title"]: + #logger.info("主题也包含") + return "success" + else: + #logger.info("主题不包含") + return "fail" + else: + return "fail" + + +#if __name__ == '__main__': +# user = '[email protected]' +# pwd = 'xxxx' +# pop3_server = 'pop.qq.com' +# send_user = '[email protected]' +# subj = 'or2020' +# result = recv_email(user, pwd, pop3_server, send_user, subj) +# print(result)
\ No newline at end of file diff --git a/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc b/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..ecd3bf5 --- /dev/null +++ b/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc diff --git a/keyword/common/customlibrary/Pop3Library/readme.txt b/keyword/common/customlibrary/Pop3Library/readme.txt new file mode 100644 index 0000000..42c7c9b --- /dev/null +++ b/keyword/common/customlibrary/Pop3Library/readme.txt @@ -0,0 +1,26 @@ +导入方法: +1.将该目录放到....\Python\Lib\site-packages 下 +2.在测试夹具里面要根据绝对路径导入此包 + + +注意: +1.使用该包的关键字时不要用qq邮箱,qq邮箱测试发现时长会失灵现象,最好用163邮箱等 +2.注意关闭邮箱的加密传送方式SSL协议 +3.注意邮箱是否支持POP3的协议以及不同邮箱pop服务器的写法 + + +关键字: +[return] Recv Email [user_name] [pwd] [pop server] [sender] [subj_sub] +参数说明: +[user_name]:用户名 +[pwd]:密码,第三方登入密码 +[pop server]:pop服务器 +[sender]:发送者邮箱,注意全称 +[subj_sub]:主题的部分内容,这里是测主题是否包含该参数 +[return]:返回值,成功返回success,失败返回n其他 +该关键字默认等待3min,3min内每5s检测邮箱是否收到邮件, +若收到邮件与[sender]参数进行完全匹配,与[subj_sub]部分主题内容参数进行是否包含匹配,匹配成功则返回success; +若收到邮件匹配失败或者超时则返回其他, + + +若修改等待时间,可查找源码中的num变量将36改为其他值即可,注意时间粒度是5s,若num=3,则您修改的等待时间是15s diff --git a/keyword/common/customlibrary/Smtp3Library/__init__.py b/keyword/common/customlibrary/Smtp3Library/__init__.py new file mode 100644 index 0000000..c86362a --- /dev/null +++ b/keyword/common/customlibrary/Smtp3Library/__init__.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This file is part of robotframework-Smtp3Library. +# https://github.io/lucamaro/robotframework-Smtp3Library + +# Licensed under the Apache License 2.0 license: +# http://www.opensource.org/licenses/Apache-2.0 +# Copyright (c) 2016, Luca Maragnani <[email protected]> + +""" +Library implementation +""" +import ast +import smtplib +import email +import random +import string +import mimetypes +import quopri +from email.message import Message +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email import encoders +import os.path +import socket +from robot.api.deco import keyword +from robot.api import logger + +from Smtp3Library.version import __version__ # NOQA + +COMMASPACE = ', ' + +class Smtp3Library(object): + """ + SMTP Client class + """ + + def __init__(self): + """ + Constructor + """ + self.message = self._MailMessage() + self.host = None + self.port = None + self.user = None + self.password = None + self.smtp = None + + def _prepare_connection(self, host, port, user=None, password=None): + """ + Private method to collect connection informations + """ + self.host = host + self.port = int(port) + self.user = user + self.password = password + self.client_hostname = socket.gethostname() + + + def prepare_ssl_connection(self, host, port=465, user=None, password=None): + """ + Collect connection informations for SSL channel + """ + self._prepare_connection(host, port, user, password) + self.smtp = smtplib.SMTP_SSL() + + def prepare_connection(self, host, port=25, user=None, password=None): + """ + Collect connection informations for unencrypted channel + """ + self._prepare_connection(host, port, user, password) + self.smtp = smtplib.SMTP() + + def add_to_recipient(self, recipient): + """ + Add a recipient to "To:" list + """ + self.message.mail_to.append(recipient) + + def add_cc_recipient(self, recipient): + """ + Add a recipient to "Cc:" list + """ + self.message.mail_cc.append(recipient) + + def add_bcc_recipient(self, recipient): + """ + Add a recipient to "Bcc:" list + """ + self.message.mail_bcc.append(recipient) + + def set_subject(self, subj): + """ + Set email subject + """ + self.message.subject = subj + + def set_from(self, from_recipient): + """ + Set from address of message and envelope + """ + self.message.mail_from = from_recipient + + def set_body(self, body): + """ + Set email body + """ + self.message.body = body + + def set_random_body(self, size): + """ + Set a random body of <size> length + """ + body = '' + for i in range(0, size): + body += ''.join(random.choice(string.uppercase + string.digits)) + if i % 80 == 0: + body += "\n" + self.message.body = body + + def add_attachment(self, attach): + """ + Add attachment to a list of filenames + """ + self.message.attachments.append(attach) + + def add_header(self, name, value): + """ + Add a custom header to headers list + """ + self.message.headers[name] = value + + def connect(self): + ''' + Open connection to server + Returns tuple (smtp status code, message) + ''' + return self.smtp.connect(self.host, self.port) + + def present_client_as(self, client_hostname): + ''' + Set helo/ehlo client identity + ''' + self.client_hostname = client_hostname + + def helo(self): + ''' + Send HELO command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.helo(self.client_hostname) + logger.info(result) + return result + + def ehlo(self): + ''' + Send EHLO command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.ehlo(self.client_hostname) + logger.info(result) + return result + + def get_esmtp_features(self): + ''' + Returns hashmap with ESMTP feature received with EHLO + ''' + logger.info(self.smtp.esmtp_features) + return self.smtp.esmtp_features + + def logins(self): + try: + ''' + Login user + Returns tuple (smtp status code, message) + ''' + logger.info("Login with user " + self.user + " and password " + self.password) + '''try: + subuser=bytes.decode(self.user) + subpassword=bytes.decode(self.password) + result = self.smtp.login(subuser.encode('ascii'), subpassword.encode('ascii')) + logger.info(result) + return result + except: + logger.info("本身就是str类型不需要bytes to str!") + subuser=str(self.user).encode('ascii') + subpassword=str(self.password).encode('ascii') + result = self.smtp.login(subuser, subpassword) + logger.info(result) + return result''' + result = self.smtp.login(self.user, self.password) + logger.info(result) + return "mail_success" + except: + return "mail_fail" + + + def starttls(self, keyfile=None, certfile=None): + ''' + sends STARTTLS + optional: keyfile certfile + Returns tuple (smtp status code, message) + ''' + logger.info("STARTTLS") + if keyfile is None and certfile is None: + result = self.smtp.starttls() + else: + result = self.smtp.starttls(keyfile, certfile) + logger.info(result) + return result + + def data(self): + ''' + Data command send email body with "MAIL FROM:", "RCPT TO:" and "DATA" commands + Returns tuple (smtp status code, message) + ''' + result = self.smtp.mail(self.message.mail_from) + result += self.smtp.rcpt(self.message.get_message_recipients()) + + result += self.smtp.data(self.message.get_message_as_string()) + logger.info(result) + return result + + def sendmail(self): + ''' + Send email with "MAIL FROM:", "RCPT TO:" and "DATA" commands + Returns tuple (smtp status code, message) + ''' + result = self.smtp.sendmail(self.message.mail_from, self.message.get_message_recipients(), self.message.get_message_as_string()) + logger.info(result) + return result + + def quit(self): + ''' + Send QUIT command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.quit() + logger.info(result) + return result + + def close_connection(self): + ''' + Close connection to server + ''' + return self.smtp.close() + + def send_message(self): + """ + Send the message, from connection establishment to quit and close connection. + All the connection and email parameters must be already set before invocation. + Returns sendmail response (code, message) + """ + + # Send the message + try: + self.connect() + + if self.user is not None: + self.ehlo() + self.logins() + + send_result = self.sendmail() + + self.quit() + self.close_connection() + # return send_result + return "mail_success" + except: + return "mail_fail" + + @keyword('Send Message With All Parameters') + def send_message_full(self, host, user, password, subj, + from_recipient, to_recipient, cc_recipient=None, bcc_recipient=None, + body=None, attach=None): + """ + Send a message specifing all parameters on the same linecc + cc, bcc and attach parameters may be strings or array of strings + host, user, password, subj, fromadd, toadd - are mandatory parameters + to use the optional paramaters pleas specify the name fo the parameter in the call + user and password even if mandatory could be set to None so no authentication will be made + Example: + sendMail("smtp.mail.com", None, None, "The subject", "[email protected]", "[email protected]", body="Hello World body") + + sendMail("smtp.mail.com", "scott", "tiger", "The subject", "[email protected]", "[email protected]", body="Hello World body", attach=attaches + where could be: + attaches = ["c:\\desktop\\file1.zip", "c:\\desktop\\file2.zip"] or + attaches = "c:\\desktop\\file1.zip" + Returns sendmail response (code, message) + """ + + self.host = host + self.user = user + self.password = password + + self.set_subject(subj) + self.set_from(from_recipient) + self.message.mail_to = to_recipient + if cc_recipient != None: + self.message.mail_cc = cc_recipient + if bcc_recipient != None: + self.message.mail_bcc = bcc_recipient + #Fill the message + if body != None: + self.set_body(body) + # Part two is attachment + if attach != None: + attachlist = ast.literal_eval(attach) + self.message.attachments = attachlist + #logger.info("self.message.attachments:"+str(type(self.message.attachments))) + #logger.info("attachtype:"+str(type(attachlist))) + #logger.info("attachlist:"+str(attachlist)) + + return self.send_message() + + + class _MailMessage: + """ + Simplified email message + This class represent email headers and payload content, not envelope data + """ + + def __init__(self): + """ + init object variables + """ + self.mail_from = None + self.mail_to = [] + self.mail_cc = [] + self.mail_bcc = [] + self.subject = '' + self.body = '' + self.attachments = [] + self.headers = {} + + def get_message_recipients(self): + ''' + Get all message recipients (to, cc, bcc) + ''' + recipients = [] + tolist = ast.literal_eval(self.mail_to) + cclist = ast.literal_eval(self.mail_cc) + bcclist = ast.literal_eval(self.mail_bcc) + recipients.extend(tolist) + recipients.extend(cclist) + recipients.extend(bcclist) + #logger.info("recipientslist:"+str(recipients)) + return recipients + + def get_message_as_string(self): + ''' + Get message as string to be sent with smtplib.sendmail api + ''' + if len(self.attachments) > 0: + #logger.info("attachments:"+str(self.attachments)) + #logger.info("attachmentstype:"+str(type(self.attachments))) + #logger.info("attachmentsnum:"+str(len(self.attachments))) + + envelope = MIMEMultipart() + envelope.attach(MIMEText(self.body)) + else: + envelope = MIMEText(self.body) + + recipients = self.get_message_recipients() + + + tolist = ast.literal_eval(self.mail_to) + cclist = ast.literal_eval(self.mail_cc) + envelope['From'] = self.mail_from + envelope['To'] = COMMASPACE.join(tolist) + envelope['Cc'] = COMMASPACE.join(cclist) + envelope['Subject'] = self.subject + #logger.info("envelope111:"+str(self.attachments)) + for attachment in list(self.attachments): + ctype, encoding = mimetypes.guess_type(attachment) + #logger.info("attachment:"+attachment+" ctype:"+str(ctype)+" encoding:"+str(encoding)) + if ctype is None or encoding is not None: + # No guess could be made, or the file is encoded (compressed), so + # use a generic bag-of-bits type. + ctype = 'application/octet-stream' + maintype, subtype = ctype.split('/', 1) + #logger.info("maintype:"+str(maintype)+" subtype:"+str(subtype)) + + msg = None + if maintype == 'text': + attach_file = open(attachment,'rb') + # TODO: we should handle calculating the charset + msg = MIMEText(attach_file.read(), _subtype=subtype, _charset='utf-8') + attach_file.close() + elif maintype == 'image': + attach_file = open(attachment, 'rb') + msg = MIMEImage(attach_file.read(), _subtype=subtype) + attach_file.close() + elif maintype == 'audio': + attach_file = open(attachment, 'rb') + msg = MIMEAudio(attach_file.read(), _subtype=subtype) + attach_file.close() + else: + attach_file = open(attachment, 'rb') + msg = MIMEBase(maintype, subtype) + msg.set_payload(attach_file.read()) + attach_file.close() + # Encode the payload using Base64 + encoders.encode_base64(msg) + + # Set the filename parameter + msg.add_header('Content-Disposition', 'attachment', + filename=os.path.basename(attachment)) + envelope.attach(msg) + + + #logger.info("envelope.as_string:"+envelope.as_string()) + return envelope.as_string() diff --git a/keyword/common/customlibrary/Smtp3Library/version.py b/keyword/common/customlibrary/Smtp3Library/version.py new file mode 100644 index 0000000..8f9fcb8 --- /dev/null +++ b/keyword/common/customlibrary/Smtp3Library/version.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This file is part of robotframework-Smtp3Library. +# https://github.io/lucamaro/robotframework-SmtpLibrary + +# Licensed under the Apache License 2.0 license: +# http://www.opensource.org/licenses/Apache-2.0 +# Copyright (c) 2016, Luca Maragnani <[email protected]> + +__version__ = '0.1.3' # NOQA diff --git a/keyword/common/customlibrary/ipandstring/__init__.py b/keyword/common/customlibrary/ipandstring/__init__.py new file mode 100644 index 0000000..e7adf5c --- /dev/null +++ b/keyword/common/customlibrary/ipandstring/__init__.py @@ -0,0 +1,4 @@ +from ipandstring.stringip import stringandip + +class ipandstring(stringandip): + ROBOT_LIBRARY_SCOPE = 'GLOBAL'
\ No newline at end of file diff --git a/keyword/common/customlibrary/ipandstring/stringip.py b/keyword/common/customlibrary/ipandstring/stringip.py new file mode 100644 index 0000000..0ce0c44 --- /dev/null +++ b/keyword/common/customlibrary/ipandstring/stringip.py @@ -0,0 +1,68 @@ +import random +import struct +import socket + +class stringandip (object): + def __init__(self): + pass + def ipv4(self,m, n, x): + if m == '-1': + m = random.randint(0, 255) + if n == '-1': + n = random.randint(0, 255) + if x == '-1': + x = random.randint(0, 255) + y = random.randint(0, 255) + print(str(m) + '.' + str(n) + '.' + str(x) + '.' + str(y)) + return str(m) + '.' + str(n) + '.' + str(x) + '.' + str(y) + + def dec2hex(self,string_num): + base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 6)] + num = int(string_num) + mid = [] + while True: + if num == 0: break + num, rem = divmod(num, 16) + mid.append(base[rem]) + + return ''.join([str(x) for x in mid[::-1]]) + + def ipv6(self): + ipInt = random.randint(0, 400000000000000000000000000000000000) + ipStr = '' + leftValue = ipInt + + for i in [7, 6, 5, 4, 3, 2, 1, 0]: + string_num = leftValue / 65536 ** i + base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 6)] + num = int(string_num) + mid = [] + while True: + if num == 0: break + num, rem = divmod(num, 16) + mid.append(base[rem]) + + ipTokenInt = ''.join([str(x) for x in mid[::-1]]) + if (ipTokenInt == ''): + ipTokenInt = 0 + ipStr = ipStr + str(ipTokenInt) + if i != 0: + ipStr = ipStr + ':' + leftValue %= 65536 ** i + print(ipStr) + return ipStr + + def getstring(self,randomlength=16,base_str='ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'): + """ + 生成一个指定长度的随机字符串 + """ + random_str = '' + #base_str = 'ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789' + length = len(base_str) - 1 + for i in range(randomlength): + random_str += base_str[random.randint(0, length)] + print(random_str) + return random_str +# if __name__ == '__main__': +# ipandstring = ipandstring() +# print(ipandstring.ipv6())
\ No newline at end of file diff --git a/keyword/common/file_operation.robot b/keyword/common/file_operation.robot new file mode 100644 index 0000000..ee5ebed --- /dev/null +++ b/keyword/common/file_operation.robot @@ -0,0 +1,36 @@ +*** Settings *** +Library String +Library json +Library OperatingSystem +Library RequestsLibrary +Library Selenium2Library +Library Collections +Library FileLibrary +Resource common.robot +Resource ../../variable/common_variable.txt + +*** Keywords *** +InsertPolicyIdToFile + [Arguments] ${key} ${policyId} ${objectids} + ${dict} Create Dictionary policyId=${policyId} objectId=${objectids} + ${json} json.Dumps ${dict} + Alter Dict ${path}/variable/AllFlowCaseVariable.txt ${key} ${json} + +InsertTimeToFile + [Arguments] ${key} ${starttime} ${endtime} + ${value} json.Loads ${${key}} + ${dict} Create Dictionary policyId=${value}[policyId] objectId=${value}[objectId] starttime=${starttime} endtime=${endtime} + ${json} json.Dumps ${dict} + Alter Dict ${path}/variable/AllFlowCaseVariable.txt ${key} ${json} + +InsertObjectIdToFile + [Arguments] ${key} ${objectids} + Alter Dict ${path}/all_flow_case_variable.txt ${key} ${objectids} + +InsertStartTimeToFile + [Arguments] ${key} ${starttime} + Alter Dict ${path}/all_flow_case_variable1.txt ${key} ${starttime} + +InsertReportToFile + [Arguments] ${key} ${objectids} + Alter Dict ${path}/ReportCaseVariable.txt ${key} ${objectids} diff --git a/keyword/common/functional_keywords.robot b/keyword/common/functional_keywords.robot new file mode 100644 index 0000000..d49c862 --- /dev/null +++ b/keyword/common/functional_keywords.robot @@ -0,0 +1,260 @@ +*** Settings ***
+Library String
+Library OperatingSystem
+Library RequestsLibrary
+Library Selenium2Library
+Library Collections
+Resource common.robot
+Resource api_request.robot
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+QueryPolicyFile
+ [Arguments] ${url} ${suffix}
+ ${content_quary} GetRequest1 ${url}?isValid=1&${suffix}
+ ${msg_quary} Set Variable ${content_quary['msg']}
+ ${length} Get Length ${content_quary['data']['list']}
+ Should Be True ${length}>0
+ Log quary operation:${msg_quary}
+ Log data:${content_quary['data']['list']}
+
+QueryPolicyFile2
+ [Arguments] ${url} ${suffix}
+ ${content_quary} GetRequest1 ${url}?${suffix}
+ ${msg_quary} Set Variable ${content_quary['msg']}
+ ${length} Get Length ${content_quary['data']['list']}
+ Should Be True ${length}>0
+ Log quary operation:${msg_quary}
+ Log data:${content_quary['data']['list']}
+ ${certId} Set Variable ${content_quary['data']['list'][0]['certId']}
+ [Return] ${certId}
+
+CreatePolicyFile
+ [Documentation]
+ ... 必传参数:url、filePath(文件路径)、fileName(文件名称)
+ ... 可选参数:header(不传时使用默认值)
+ [Arguments] ${url} ${filePath} ${fileName} @{header}
+ ${suffix} Generate Random String
+ ${certName} Catenate SEPARATOR=_ test ${suffix}
+ ${header} Run Keyword If ${header}==[] Set Variable {"isValid":1,"opAction":"add","certName":"${certName}","certId":null,"returnData":1}
+ ... ELSE Get From List ${header} 0
+
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${cerId} Set Variable ${list[0]['certId']}
+ ${certName} Set Variable ${list[0]['certName']}
+ ${response} Create Dictionary msg=${msg} certId=${cerId} certName=${certName}
+ Log add operation:${msg}
+ Log cerId:${cerId}
+ [Return] ${response}
+
+CreatePolicyFile2
+ [Documentation]
+ ... 必传参数:url、filePath(文件路径)、fileName(文件名称)、flag(模块标识)
+ ... 可选参数:header(不传时使用默认值)
+ [Arguments] ${url} ${filePath} ${fileName} ${flag} @{header}
+ ${suffix} Generate Random String
+ ${randomName} Catenate SEPARATOR=_ test ${suffix}
+ ${value} Run Keyword If '${flag}'=='resPages' Set Variable {"isValid":1,"format":"html","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='hijack' Set Variable {"isValid":1,"contentType":"text/html","opAction":"add","profileName":"${randomName}","contentName":"${fileName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='insert' Set Variable {"isValid":1,"format":"js","insertOn":"after_page_load","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='insertcss' Set Variable {"isValid":1,"format":"css","insertOn":"after_page_load","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+
+ ${header} Run Keyword If ${header}==[] Set Variable ${value}
+ ... ELSE Get From List ${header} 0
+
+ ${binFile} Evaluate open(r"${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyFile3
+ [Documentation]
+ ... 必传参数:url
+ ... 可选参数:data(不传时使用默认值)
+ [Arguments] ${url} @{data}
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ ${data} Run Keyword If ${data}==[] Set Variable {"opAction":"add","returnData":1,"trafficMirrorList":[{"profileName":"${profileName}","addrType":"mac","isValid":1,"addrArray":["00:A1:B2:06:C3:29"]}]}
+ ... ELSE Get From List ${data} 0
+
+ ${content} PostRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyFileNoFile
+ [Documentation]
+ ... 必传参数:url
+ ... 可选参数:data(不传时使用默认值)
+ [Arguments] ${url} ${requestbody}
+ ${content} PostRequest1 ${url} ${requestbody}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyMutipartFile
+ [Arguments] ${url} ${filePath} ${pubFileName} ${priFileName} ${keyringType} @{header}
+ [Documentation] 必传参数:url、filePath(文件路径)、pubFileName(证书文件名)、priFileName(私钥文件名),keyringType(证书类型)
+ ... 可选参数:header(不传时使用默认值)
+ ${suffix} Generate Random String
+ ${certName} Catenate SEPARATOR=_ test ${suffix}
+ ${header} Run Keyword If ${header}==[] Set Variable {"isValid":1,"opAction":"add","returnData":1,"keyringName":"${certName}","keyringType":"${keyringType}","reissueExpiryHour":0,"crl":"null","publicKeyAlgo":"rsa1024","keyringId":null,"includeRoot":0}
+ ... ELSE Get From List ${header} 0
+ ${pubFile} Evaluate open(r"${path}/${filePath}${pubFileName}",'rb')
+ ${priFile} Evaluate open(r"${path}/${filePath}${priFileName}",'rb')
+ ${fileDict} Create Dictionary publicFile ${pubFile}
+ Set To Dictionary ${fileDict} privateFile ${priFile}
+ ${requestData} Create Dictionary name="publicFile" filename="${pubFileName}" Content-Type=application/octet-stream
+ Set To Dictionary ${requestData} name privateFile
+ Set To Dictionary ${requestData} filename ${priFileName}
+ Set To Dictionary ${requestData} Content-Type application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${keyringId} Set Variable ${list[0]['keyringId']}
+ ${keyringName} Set Variable ${list[0]['keyringName']}
+ ${response} Create Dictionary msg=${msg} keyringId=${keyringId} keyringName=${keyringName}
+ Log add operation:${msg}
+ Log keyringId:${keyringId}
+ [Return] ${response}
+
+UpdatePolicyMutipartFile
+ [Arguments] ${url} ${filePath} ${pubFileName} ${priFileName} ${reqHeader}
+ ${pubFile} Evaluate open(r"${path}/${filePath}${pubFileName}",'rb')
+ ${priFile} Evaluate open(r"${path}/${filePath}${priFileName}",'rb')
+ ${fileDict} Create Dictionary publicFile ${pubFile}
+ Set To Dictionary ${fileDict} privateFile ${priFile}
+
+ ${requestData} Create Dictionary name="publicFile" filename="${pubFileName}" Content-Type=application/octet-stream
+ Set To Dictionary ${requestData} name privateFile
+ Set To Dictionary ${requestData} filename ${priFileName}
+ Set To Dictionary ${requestData} Content-Type application/octet-stream
+
+ ${fileDesc} Create Dictionary File-Desc ${reqHeader}
+ ${content} UpFilePutRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ Log update operation:${msg}
+ Log update condition:${reqHeader}
+
+UpdatePolicyFile
+ [Arguments] ${url} ${filePath} ${fileName} ${reqHeader}
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${reqHeader}
+ ${content} UpFilePutRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ Log update operation:${msg}
+ Log update condition:${reqHeader}
+
+UpdatePolicyFile2
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Put Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ ${msg} Set Variable ${response['msg']}
+ Log update operation:${msg}
+ Log update condition:${data}
+
+DeletePolicyFile
+ [Arguments] ${url} ${data}
+ ${content} DeleteRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ Log delete operation:${msg}
+ Log delete condition:${data}
+
+TeardownDelete
+ [Arguments] ${url} ${key} ${value}
+ ${ids} Create List ${value}
+ ${data} Create Dictionary ${key}=${ids}
+ ${content} DeleteRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ Log teardown operation:${msg}
+ Log teardown condition:${data}
+
+
+CreatePolicyFile4
+ [Arguments] ${url} ${filePath} ${fileName} ${objectDict}
+ [Timeout]
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ log ${objectDict}
+ ${string} Convert To String ${objectDict}
+ ${fileDesc} Create Dictionary File-Desc=${string}
+ log ${fileDesc}[File-Desc]
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreateRequest
+ [Arguments] ${url} ${data}
+ [Documentation] 必传参数:url
+ ... 可选参数:data(不传时使用默认值)
+ ${content} PostRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+DeletePolicyFile1
+ [Arguments] ${url} ${profileId}
+ #删除文件
+ log todeleteobj
+ ${response} BaseDeleteRequest ${url} {"profileIds":[${profileId}]}
+ ${response_code} Get From Dictionary ${response} code
+ Should Be Equal As Strings ${response_code} 200
+ ${response} Convert to String ${response}
+ log ${response}
+DeleteProfileByIds
+ [Arguments] ${typeUrl} ${profileIds}
+ #删除对象
+ log DeleteProfile
+ ${response} BaseDeleteRequest /${version}/policy/profile/${typeUrl} {"profileIds":[${profileIds}]}
+ ${response_code} Get From Dictionary ${response} code
+ #log aaaaaaaaaa:${response_code}
+ Should Be Equal As Strings ${response_code} 200
+ #Integer ${response_code} 200
+ ${response} Convert to String ${response}
+ log ${response}
\ No newline at end of file diff --git a/keyword/common/log_variable.robot b/keyword/common/log_variable.robot new file mode 100644 index 0000000..dc12fb2 --- /dev/null +++ b/keyword/common/log_variable.robot @@ -0,0 +1,146 @@ +*** Settings ***
+Resource ../../variable/common_variable.txt
+Resource logschema.robot
+Library REST http://${host}:${port}
+Library RequestsLibrary
+Library OperatingSystem
+Library Collections
+Library string
+Library customlibrary/Custometest/log_contrast.py
+
+
+*** Keywords ***
+GetLogSchemaByType
+ [Documentation] 根据日志类型获取对应schema
+ ... ${logType}日志类型:
+ ... security_event_log:安全策略
+ ... proxy_event_log:代理策略
+ ... connection_record_log: 协议采集日志
+ ... radius_record_log radius:采集日志
+ ... active_defence_event_log:主动防御日志
+ ... voip_record_log:voip协议日志
+ ... transaction_record_log:事务日志
+ ... live_session_record_log:活跃会话日志
+ ... gtpc_record_log:gtp协议日志
+ ...
+ [Arguments] ${logType}
+ Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"}
+ &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logType}
+ log ${logType}
+ #Output Schema response body
+ Object response body
+ #Integer $.code 200
+ log ${LogSchemaResponse.body['data']}
+ #${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ [Return] ${LogSchemaResponse.body['data']}
+
+GetLogContentByKeyword
+ [Documentation] 根据关键字重schema获取对应内容
+ ... ${logSchema}schema内容
+ ... security_event_log:安全策略
+ ... proxy_event_log:代理策略
+ ... connection_record_log: 协议采集日志
+ ... radius_record_log radius:采集日志
+ ... active_defence_event_log:主动防御日志
+ ... voip_record_log:voip协议日志
+ ... transaction_record_log:事务日志
+ ... live_session_record_log:活跃会话日志
+ ... gtpc_record_log:gtp协议日志
+ ...
+ [Arguments] ${logSchema} ${keyword}
+ ${keyword} Run Keyword If "${keyword}"=="field" Set Veriable ${LogSchemaResponse.body['data']['fields']}
+ ... ELSE IF "${keyword}"=="field" Set Veriable ${LogSchemaResponse.body['data']['fields']}
+ ... ELSE Set Veriable ${LogSchemaResponse.body['data']['fields']}
+ ${content} Evaluate json.dumps(eval(str(${keyword}))) json
+ #${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ [Return] ${content}
+
+OrganizeLogCondition
+ [Documentation] 根据条件list组织条件
+ [Arguments] ${logname} ${startTime} ${endTime} ${field} ${filter}=
+ #${logname} ${startTime} ${endTime} ${client_ip} ${policy_id}
+ ${pageSize} Set Variable 30
+ ${pageNo} Set Variable 1
+ ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}", "filter":"${filter}"}
+ log this time query condition:${logCondition}
+ [Return] ${filds}
+
+LogVeriable
+ [Arguments] ${logType} ${startTime} ${endTime} ${filter}
+ ${logSchema} GetLogSchemaByType ${logType}
+ ${filds} GetLogContentByKeyword ${logSchema} field
+ ${logCondition} OrganizeLogCondition ${logname} ${startTime} ${endTime} ${filds} ${filter}
+ #common_client_ips":"${client_ip}" and "common_policy_ids":"${policy_id}
+ ${logs} GetLogList ${logType} ${logCondition}
+ ${returnvalue} log_contrast ${logs} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${trueorfalse} Run Keyword If "${returnvalue}"=="true" set variable true
+ ... ELSE set variable false
+ Run Keyword If "${returnvalue}"=="true" Exit for loop
+ [Return] ${trueorfalse}
+
+GetLogList
+ [Arguments] ${logType} ${logCondition}
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${responseCode} Get From Dictionary ${returnData} code
+ Log ${responseCode}
+ Should Be Equal ${responseCode} ${200} security_event_log \ \ test query list failed
+ log this time request security_event_log \ table logRecord \ : ${LogListResponse.content}
+ ${a} Set Variable this time request security_event_log \ table logRecord \ : ${LogListResponse.content}
+ log ${a}
+ ${log} Set Variable ${LogListResponse.json()}[data][list]
+ FOR ${logs} IN ${log}
+ log ${logs}
+ END
+ [Return] ${logs}
+
+GetLogList1
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id}
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${responseCode} Get From Dictionary ${returnData} code
+ Log ${responseCode}
+ Should Be Equal ${responseCode} ${200} security_event_log \ \ test query list failed
+ log this time request security_event_log \ table logRecord \ : ${LogListResponse.content}
+ ${a} Set Variable this time request security_event_log \ table logRecord \ : ${LogListResponse.content}
+ log ${a}
+ ${log} Set Variable ${LogListResponse.json()}[data][list]
+ FOR ${logs} IN ${log}
+ log ${logs}
+ END
+ log %%%%%%%%%%%%%%%%%%%%%%%%${logs}
+ Should Contain ${logs}"" ${client_ip}
+ Should Contain ${logs}"" ${policy_id}
+ Should Contain ${logs}"" ${parmkey}
+ Should Contain ${logs}"${parmkey}" ${parmvalue}
+
+GetLogListSize
+ [Documentation]
+ ... 描述:ProxyPinning
+ ...
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetALLLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} 10000 1
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${data} Get From Dictionary ${returnData} data
+ ${len} Get Length ${data}[list]
+ [Return] ${len}
+
+
+GetLogCount
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetALLLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} 10000 1
+ ${LogListResponse} PostRemoteData /${version}/log/count ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${len} Set Variable ${LogListResponse.json()}[data][total]
+ #${len} Get From Dictionary ${returnData} total
+ [Return] ${len}
+
\ No newline at end of file diff --git a/keyword/common/login_logout.robot b/keyword/common/login_logout.robot new file mode 100644 index 0000000..218ac31 --- /dev/null +++ b/keyword/common/login_logout.robot @@ -0,0 +1,180 @@ +*** Settings *** +Resource ../../variable/common_variable.txt +Library REST http://${host}:${port} +Library Collections +#Library SSHLibrary +Library yaml +#Library json +Library OperatingSystem +Resource ../policys/policy.robot +Resource ../objects/object.robot +Resource clear_data.robot + +*** Keywords *** +InitPotocol + ${appDict} Create Dictionary + ${appVDict} Create Dictionary + Connect To Database Using Custom Params pymysql ${mysqlHost} + ${app_id} query SELECT group_id,low_boundary,region_name FROM tsg_obj_app_id WHERE is_valid=1 AND region_name='http' OR region_name='ssl' OR region_name='dns' OR region_name='ftp' OR region_name='mail' OR region_name='doh' OR region_name='rtp' OR region_name='sip' + # ... SELECT group_id,low_boundary,region_name FROM tsg_obj_app_id WHERE is_valid=1 and region_name in('http','ftp','https','ssl','dns','doh','quic','mail') + ${app_length} Get Length ${app_id} + FOR ${n} IN RANGE ${app_length} + log ${n} + Set To Dictionary ${appDict} ${app_id}[${n}][2]=${app_id}[${n}][0] + Set To Dictionary ${appVDict} ${app_id}[${n}][2]=${app_id}[${n}][1] + log ${appDict} + log ${appVDict} + END + Disconnect From Database + #供策略创建使用的appid + SET GLOBAL VARIABLE ${objprotol} ${appDict} + #供策略验证使用的appid + SET GLOBAL VARIABLE ${appportol} ${appVDict} + GetProtocol + +GetProtocol + ${HTTP_ID1} Get From Dictionary ${objprotol} http + ${RTP_ID1} Get From Dictionary ${objprotol} rtp + ${DNS_ID1} Get From Dictionary ${objprotol} dns + ${MAIL_ID1} Get From Dictionary ${objprotol} MAIL + ${FTP_ID1} Get From Dictionary ${objprotol} ftp + ${SIP_ID1} Get From Dictionary ${objprotol} sip + ${SSL_ID1} Get From Dictionary ${objprotol} ssl + ${DOH_ID1} Get From Dictionary ${objprotol} DoH + SET GLOBAL VARIABLE ${HTTP_ID} ${HTTP_ID1} + SET GLOBAL VARIABLE ${RTP_ID} ${RTP_ID1} + SET GLOBAL VARIABLE ${DNS_ID} ${DNS_ID1} + SET GLOBAL VARIABLE ${MAIL_ID} ${MAIL_ID1} + SET GLOBAL VARIABLE ${FTP_ID} ${FTP_ID1} + SET GLOBAL VARIABLE ${SIP_ID} ${SIP_ID1} + SET GLOBAL VARIABLE ${SSL_ID} ${SSL_ID1} + SET GLOBAL VARIABLE ${DOH_ID} ${DOH_ID1} + + ${HTTP_VID1} Get From Dictionary ${appportol} http + ${RTP_VID1} Get From Dictionary ${appportol} rtp + ${DNS_VID1} Get From Dictionary ${appportol} dns + ${MAIL_VID1} Get From Dictionary ${appportol} MAIL + ${FTP_VID1} Get From Dictionary ${appportol} ftp + ${SIP_VID1} Get From Dictionary ${appportol} sip + ${SSL_VID1} Get From Dictionary ${appportol} ssl + ${DOH_VID1} Get From Dictionary ${appportol} DoH + SET GLOBAL VARIABLE ${HTTP_VID} ${HTTP_VID1} + SET GLOBAL VARIABLE ${RTP_VID} ${RTP_VID1} + SET GLOBAL VARIABLE ${DNS_VID} ${DNS_VID1} + SET GLOBAL VARIABLE ${MAIL_VID} ${MAIL_VID1} + SET GLOBAL VARIABLE ${FTP_VID} ${FTP_VID1} + SET GLOBAL VARIABLE ${SIP_VID} ${SIP_VID1} + SET GLOBAL VARIABLE ${SSL_VID} ${SSL_VID1} + SET GLOBAL VARIABLE ${DOH_VID} ${DOH_VID1} + + +InitTemplate + #加载对象mode + ${YAML}= Get File ${path}/data/template/template.yaml + ${LOADED}= yaml.Safe Load ${YAML} + ${objMode} Get From Dictionary ${LOADED} ip_batch_mode + ${objList} Get From Dictionary ${objMode} objectList + #转json替换 + ${toJson} json.Dumps ${objMode} + ${objList} json.Dumps ${objList} + SET GLOBAL VARIABLE ${objModeJson} ${toJson} + SET GLOBAL VARIABLE ${objListMode} ${objList} + + #${YAML}= Get File ${path}/data/policy_template.yaml + #${LOADED}= yaml.Safe Load ${YAML} + ${policyMode} Get From Dictionary ${LOADED} policy_template + ${policyList} Get From Dictionary ${policyMode} policyList + ${toJson} json.Dumps ${policyMode} + ${policyList} json.Dumps ${policyList} + SET GLOBAL VARIABLE ${policyModeJson} ${toJson} + SET GLOBAL VARIABLE ${policyListMode} ${policyList} + +ApiLogin + [Tags] + # 毕方接口密码加密 + GET /${version}/user/encryptpwd?password=${password} + Object response body + Integer $.code 200 + #log ${rescode} + ${pwd} String $.data.encryptpwd + #log ${pwd} + ${pwdstr} Get From List ${pwd} 0 + log ${pwdstr} + SET GLOBAL VARIABLE ${encodePassword} ${pwdstr} + log ${encodePassword} + #log ${username} + #log ${pwdstr} + POST /${version}/user/login?username=${username}&password=${encodePassword}&authMode=${authmode} + Object response body + #OUTPUT response body + Integer $.code 200 + ${rescode} Integer $.code + log ${rescode} + ${tokenGlobal} String $.data.token + ${tokenStr} Get From List ${tokenGlobal} 0 + log ${tokenStr} + SET GLOBAL VARIABLE ${token} ${tokenStr} + log ${token} + SET GLOBAL VARIABLE ${headers} {"Contest-Type":"application/json","Authorization":"${token}"} + #初始化接口中策略中引用协议和策略校验协议 + Run Keyword If ${addPolicy}==1 InitPotocol + #加载对象mode + InitTemplate + #初始化删除参数 + SET GLOBAL VARIABLE ${createObjectIds} ${EMPTY} + SET GLOBAL VARIABLE ${createPolicyIds} ${EMPTY} + #Return ${rescode} +ApiLogout + [Tags] tsg_adc tsg_bf_api + POST /${version}/user/logout headers=${headers} + Object response body + Integer $.code 200 + ${rescode} Integer $.code + #[Return] ${rescode} + +BifangLoginAndAddLocalIP + [Tags] tsg_adc tsg_bf_api + log ApiLoginAndAddLocalIP + ApiLogin + #log ********** + + Run Keyword If ${addTestClentIPFlag}==1 AddLocalIPObject + log ApiLoginAndAddLocalIP + #添加tsgUIAPI + Run Keyword If ${addTsgUIAPIFlag}==1 SecurityPolicyAllowTSGUIAPIAdd + +BifangLogoutAndDelLocalIP + [Tags] tsg_adc tsg_bf_api + log ApiLogoutAndDelLocalIP + log to_LogoutAndDelLocalIP_LogoutAndDelLocalIP + #删除tsgUIAPI 先删除白名单,是因为此策略也引用了本机IP + Run Keyword If ${addTsgUIAPIFlag}==1 SecurityPolicyAllowTSGUIAPIDEL + Run Keyword If ${addTestClentIPFlag}==1 DelLocalIPObject + #ApiDeleteAutoTagsCase + ApiLogout + #[Return] ${rescode} + +SecurityPolicyAllowTSGUIAPIAdd + [Tags] uiallow + log toAddTSGUIAPI + ${addItemList1} Create Dictionary isSession=endpoint ip=${host}/32 port=0-0 direction=0 protocol=0 isInitialize=0 + #可以添加多个 + ${addItemLists} Create list ${addItemList1} + #objectList对象 + ${objectDict} Create Dictionary objectType=ip objectSubType=endpoint isValid=${1} addItemList=${addItemLists} + ${rescode} ${objectId} AddObjects ${1} ${objectDict} + SET GLOBAL VARIABLE ${testBifangIP} ${objectId} + ${HTTP_ID} Get From Dictionary ${objprotol} http + ${SSL_ID} Get From Dictionary ${objprotol} ssl + Comment 创建安全策略 + ${policyDict} Create Dictionary policyName=SecurityPolicy-Allow-TSGUIAPI policyType=tsg_security policyDesc=autotest action=allow source=${testClentID}|TSG_SECURITY_SOURCE_ADDR destination=${objectId}|TSG_SECURITY_DESTINATION_ADDR userRegion={} isValid=${1} appIdObjects=${HTTP_ID},${SSL_ID} + log ${policyDict} + ${rescode} ${policyId} AddPolicies 1 ${policyDict} v2 + SET GLOBAL VARIABLE ${testBifangPolicy} ${policyId} + log addTSGUIAPISucess +SecurityPolicyAllowTSGUIAPIDEL + [Tags] uiallow + log toDelTSGUIAPI + ${objectIds} Create List ${testBifangIP} + DeletePolicyAndGroupObject ${testBifangPolicy} ${objectIds} + log delTSGUIAPISucess
\ No newline at end of file diff --git a/keyword/common/login_logout_switch.robot b/keyword/common/login_logout_switch.robot new file mode 100644 index 0000000..38e9d7b --- /dev/null +++ b/keyword/common/login_logout_switch.robot @@ -0,0 +1,25 @@ +*** Settings *** +Resource ../../variable/common_variable.txt +Library Collections +Resource login_logout.robot +#Resource logout.robot +#Resource login.robot + + +*** Keywords *** +LoginAndAddLocalIP + [Tags] tsg_adc tsg_bf_api tsg_device tsg_adc_wp adc_api adc_verify adc_log + #[Tags] 分步骤之外的全流程 毕方接口 设备相关 分步骤全部 分步骤策略 分步骤功能端验证 分步骤日志验证 + # 获取主机IP + #${ip} Get Host IP + #Run Keyword If '${ip}' != '${EMPTY}' Set Global Variable ${testClentIP} ${ip} + + Run Keyword If '${loginType}' == 'api' BifangLoginAndAddLocalIP + #... ELSE IF '${loginType}' == 'cli' CliLogin + ... ELSE IF '${loginType}' != '${None}' UiLoginAndAddLocalIP + +LogoutAndDelLocalIP + [Tags] tsg_adc tsg_bf_api tsg_device tsg_adc_wp adc_api adc_verify adc_log + Run Keyword If '${loginType}' == 'api' BifangLogoutAndDelLocalIP + #... ELSE IF '${loginType}' == 'cli' CliLogout + ... ELSE IF '${loginType}' != '${None}' UiLogoutAndDelLocalIP diff --git a/keyword/common/logschema.robot b/keyword/common/logschema.robot new file mode 100644 index 0000000..f10094c --- /dev/null +++ b/keyword/common/logschema.robot @@ -0,0 +1,69 @@ +*** Settings *** +Resource ../../other/all_flow_case_variable.txt +Library RequestsLibrary +Library OperatingSystem +Library Collections +Library string +Library REST http://${host}:${port} + +*** Keywords *** +GetLogCondition + [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id} + Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"} + &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname} + log ${logname} + #Output Schema response body + Object response body + #Integer $.code 200 + log ${LogSchemaResponse.body['data']} + ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json + log ${field} + ${pageSize} Set Variable 30 + ${pageNo} Set Variable 1 + ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}] + ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"} + log this time query condition:${logCondition} + [Return] ${logCondition} + +PostRemoteData + [Arguments] ${url} ${data} + ${header} Create Dictionary Content-Type=application/json Authorization=${token} + Create Session api http://${host}:${port} headers=${header} + ${remoteResponse} Post Request api ${url} data=${data} headers=${header} + [Return] ${remoteResponse} + +GetALLLogCondition + [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id} ${pageSize} ${pageNo} + Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"} + &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname} + log ${logname} + #Output Schema response body + Object response body + #Integer $.code 200 + log ${LogSchemaResponse.body['data']} + ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json + log ${field} + #${pageSize} Set Variable 30 + #${pageNo} Set Variable 1 + ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}] + ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"} + log this time query condition:${logCondition} + [Return] ${logCondition} + +GetLogCountConditon + [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id} ${pageSize} ${pageNo} + Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"} + &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname} + log ${logname} + #Output Schema response body + Object response body + #Integer $.code 200 + log ${LogSchemaResponse.body['data']} + ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json + log ${field} + #${pageSize} Set Variable 30 + #${pageNo} Set Variable 1 + ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}] + ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"} + log this time query condition:${logCondition} + [Return] ${logCondition}
\ No newline at end of file diff --git a/keyword/common/systemcommand.robot b/keyword/common/systemcommand.robot new file mode 100644 index 0000000..cc96c8c --- /dev/null +++ b/keyword/common/systemcommand.robot @@ -0,0 +1,63 @@ +*** Settings *** +Library OperatingSystem +Library Selenium2Library +Library RequestsLibrary +Library Collections +Resource ../../variable/common_variable.txt + +*** Keywords *** +SystemCommands + [Arguments] ${commandstr} ${stringlist} + log toSystemCommand_SystemCommandTest + ${commandreturn} OperatingSystem.Run ${commandstr} + Append To File ${path}/write_file.txt ${commandstr} + Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat + Append To File ${path}/write_file.txt ${commandreturn} + ${listlenth}= Get Length ${stringlist} + FOR ${var} IN RANGE ${listlenth} + #log ${var} + Should Contain ${commandreturn} ${stringlist}[${var}] + END + ${rescode} Set Variable 200 + log ${rescode} + [Return] ${rescode} + +SystemCommand + [Arguments] ${commandstr} @{stringlist} + log dxytest${commandstr} + ${commandreturn} OperatingSystem.Run ${commandstr} + #nslookup -d www.jd.com + log ${commandreturn} + FOR ${var} IN @{stringlist} + log dxytest + log ${var} + Should Contain ${commandreturn} ${var} + END + #Should Contain ${commandreturn} ${qatype} + ${rescode} Set Variable 200 + log ${rescode} + [Return] ${rescode} + + +SystemCommandReturnCompare + #执行命令并比对命令返回结果 需要执行的系统命令 命令返回结果要包含的字符串列表 命令返回结果不能包含的字符串列表 + [Arguments] ${commandstr} ${stringlist} ${stringlistnotin} + log toSystemCommand_SystemCommandTest + ${commandreturn} OperatingSystem.Run ${commandstr} + Append To File ${path}/write_file.txt ${commandstr} + Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat + Append To File ${path}/write_file.txt ${commandreturn} + #${commandreturn} Set Variable abcdeConnection was reset + ${listlenth}= Get Length ${stringlist} + FOR ${var} IN RANGE ${listlenth} + log ${var} + Should Contain ${commandreturn} ${stringlist}[${var}] + END + ${listnotin}= Get Length ${stringlistnotin} + FOR ${varn} IN RANGE ${listnotin} + log ${varn} + Should Not Contain ${commandreturn} ${stringlistnotin}[${varn}] + END + ${rescode} Set Variable 200 + log ${rescode} + [Return] ${rescode} |
