author    董晓燕 <[email protected]>  2021-06-03 09:55:45 +0000
committer 董晓燕 <[email protected]>  2021-06-03 09:55:45 +0000
commit    ac68e65f508799a0e555a240ae374d313a0a8d75 (patch)
tree      2a339bbd8acd65e2fb235159cc9c5303ae5725b7 /keyword/common
parent    2f39b56d617e5fba2b8d73d81cd5e6d894f85352 (diff)
parent    4667c668725ff7cb673c637a297c67283876d4d4 (diff)
Merge branch 'develop' into 'master' (HEAD, master)
Develop
See merge request dongxiaoyan/gap_tsg_api!4
Diffstat (limited to 'keyword/common')
-rw-r--r--  keyword/common/api_request.robot  52
-rw-r--r--  keyword/common/clear_data.robot  68
-rw-r--r--  keyword/common/command.robot  103
-rw-r--r--  keyword/common/common.robot  255
-rw-r--r--  keyword/common/common_interface.robot  47
-rw-r--r--  keyword/common/customlibrary/Custometest/Common.py  51
-rw-r--r--  keyword/common/customlibrary/Custometest/JsonDiff.py  6
-rw-r--r--  keyword/common/customlibrary/Custometest/LogResponseVAL.py  203
-rw-r--r--  keyword/common/customlibrary/Custometest/LogSchema.py  513
-rw-r--r--  keyword/common/customlibrary/Custometest/MD5.py  40
-rw-r--r--  keyword/common/customlibrary/Custometest/ReportSchema.py  718
-rw-r--r--  keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py  871
-rw-r--r--  keyword/common/customlibrary/Custometest/Schema.py  350
-rw-r--r--  keyword/common/customlibrary/Custometest/StringManipulation.py  7
-rw-r--r--  keyword/common/customlibrary/Custometest/UIAssert.py  22
-rw-r--r--  keyword/common/customlibrary/Custometest/__init__.py  16
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc  bin 0 -> 1315 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc  bin 0 -> 343 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc  bin 0 -> 3619 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc  bin 0 -> 6947 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc  bin 0 -> 339 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc  bin 0 -> 898 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc  bin 0 -> 415 bytes
-rw-r--r--  keyword/common/customlibrary/Custometest/certificate.yaml  0
-rw-r--r--  keyword/common/customlibrary/Custometest/cmd_cer.py  290
-rw-r--r--  keyword/common/customlibrary/Custometest/log_contrast.py  8
-rw-r--r--  keyword/common/customlibrary/Custometest/printlog.py  11
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py  7
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py  26
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py  7
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py  22
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py  85
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py  6
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth  2
-rw-r--r--  keyword/common/customlibrary/ExtensionPackages/readme.txt  4
-rw-r--r--  keyword/common/customlibrary/Library/VerifyPolicy.py  38
-rw-r--r--  keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc  bin 0 -> 953 bytes
-rw-r--r--  keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc  bin 0 -> 745 bytes
-rw-r--r--  keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc  bin 0 -> 625 bytes
-rw-r--r--  keyword/common/customlibrary/Library/delUseless.py  45
-rw-r--r--  keyword/common/customlibrary/Library/fileOperations.py  26
-rw-r--r--  keyword/common/customlibrary/Pop3Library/__init__.py  200
-rw-r--r--  keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc  bin 0 -> 4589 bytes
-rw-r--r--  keyword/common/customlibrary/Pop3Library/readme.txt  26
-rw-r--r--  keyword/common/customlibrary/Smtp3Library/__init__.py  417
-rw-r--r--  keyword/common/customlibrary/Smtp3Library/version.py  11
-rw-r--r--  keyword/common/customlibrary/ipandstring/__init__.py  4
-rw-r--r--  keyword/common/customlibrary/ipandstring/stringip.py  68
-rw-r--r--  keyword/common/file_operation.robot  36
-rw-r--r--  keyword/common/functional_keywords.robot  260
-rw-r--r--  keyword/common/log_variable.robot  146
-rw-r--r--  keyword/common/login_logout.robot  180
-rw-r--r--  keyword/common/login_logout_switch.robot  25
-rw-r--r--  keyword/common/logschema.robot  69
-rw-r--r--  keyword/common/systemcommand.robot  63
55 files changed, 5404 insertions, 0 deletions
diff --git a/keyword/common/api_request.robot b/keyword/common/api_request.robot
new file mode 100644
index 0000000..3325e2a
--- /dev/null
+++ b/keyword/common/api_request.robot
@@ -0,0 +1,52 @@
+*** Settings ***
+Library Collections
+Library json
+Library RequestsLibrary
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+PostRequest1
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ #${data1} json.dumps ${data}
+ ${remoteResponse} Post Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+GetRequest1
+ [Arguments] ${url}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Get Request api ${url} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+DeleteRequest1
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Delete Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+UpFilePostRequest
+ [Arguments] ${url} ${data} ${files} ${fileDesc}
+ ${header} Set To Dictionary ${fileDesc} Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Post Request api ${url} data=${data} files=${files} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
+
+UpFilePutRequest
+ [Arguments] ${url} ${data} ${files} ${fileDesc}
+ ${header} Set To Dictionary ${fileDesc} Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Put Request api ${url} params=${data} files=${files} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ [Return] ${response}
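The five keywords above all follow the same pattern: build an Authorization header from ${token}, open a RequestsLibrary session against http://${host}:${port}, send the request, assert HTTP 200, and return the parsed JSON body. A minimal Python sketch of that pattern follows; the host, port, token and payload values are placeholders, not taken from this repository:

```python
# Minimal sketch of the request pattern wrapped by PostRequest1 and friends.
# host, port, token and the payload are placeholders for illustration.
import json
import requests

host, port, token = "192.168.0.1", "8080", "example-token"

def post_request(url_path, payload):
    headers = {"Content-Type": "application/json", "Authorization": token}
    resp = requests.post(f"http://{host}:{port}{url_path}",
                         data=json.dumps(payload), headers=headers)
    assert resp.status_code == 200   # mirrors "Should Be Equal As Strings ... 200"
    return resp.json()               # mirrors "to json ${remoteResponse.content}"
```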
diff --git a/keyword/common/clear_data.robot b/keyword/common/clear_data.robot
new file mode 100644
index 0000000..1c933fb
--- /dev/null
+++ b/keyword/common/clear_data.robot
@@ -0,0 +1,68 @@
+*** Settings ***
+Library Collections
+Library RequestsLibrary
+Resource ../../variable/common_variable.txt
+#Resource functional_keywords.robot
+Resource ../policys/policy.robot
+Resource ../objects/object.robot
+Resource ../objects/application.robot
+Resource common.robot
+*** Variables ***
+${policyUrl} /policy/compile
+*** Keywords ***
+DeletePolicyAndObjectAndOther
+ [Documentation] delete: policy, object, category, app, signature, profile ...
+ [Arguments] ${policyIds}=${EMPTY} ${objectIds}=${EMPTY} ${categoryIds}=${EMPTY} ${profiledId}=${EMPTY} ${appIds}=${EMPTY} ${signatureId}=${EMPTY}
+ #Run Keyword If "${policyIds}"=="${EMPTY}" log no policyIds to del
+ #... ELSE DeletePoliciebyList ${policyIds}
+ Run Keyword If "${createPolicyIds}"=="${EMPTY}" log no policyIds to del
+ ... ELSE DeletePoliciebyList ${createPolicyIds}
+
+ #Run Keyword If "${objectIds}"=="${EMPTY}" log no objectIds to del
+ #... ELSE DeleteObjectByIds ${objectIds}
+ Run Keyword If "${createObjectIds}"=="${EMPTY}" log no objectIds to del
+ ... ELSE DeleteObjectByIds ${createObjectIds}
+
+ Run Keyword If "${categoryIds}"=="${EMPTY}" log no categoryIds to del
+ ... ELSE DeleteCategoryByIds ${categoryIds}
+ #delete profiles
+ Run Keyword If "${profiledId}"=="${EMPTY}" log no profiledId to del
+ ... ELSE DeleteProfileByIds ${url} ${profiledId}
+
+ Run Keyword If "${appids}"=="${EMPTY}" log no appids to del
+ ... ELSE DeleteApplicationByIds ${appids}
+
+ Run Keyword If "${signatureId}"=="${EMPTY}" log no signatureId to del
+ ... ELSE DeleteSignatureByIds ${signatureId}
+
+DeletePolicyAndGroupObject
+ [Documentation] delete: policy, object
+ ... policyList
+ [Arguments] ${policyList}=${EMPTY} ${objectIds}=${EMPTY}
+ Run Keyword If "${policyList}"=="${EMPTY}" log no policyList to del
+ ... ELSE DeletePoliciebyList ${policyList}
+
+ Run Keyword If "${objectIds}"=="${EMPTY}" log no objectIds to del
+ ... ELSE DeleteObjectByIds ${objectIds}
+DeletePolicyAndObject
+ [Arguments] ${policyids} ${objectids} ${categoriesId}=null
+ ${objectId1} Create List [${objectids}]
+ ${categoryId1} Create List [${categoriesId}]
+ DeletePolicyAndGroupObject1 ${policyIds} ${objectId1} ${categoryId1}
+
+DeletePolicyAndGroupObject1
+ [Arguments] ${policyids} ${objectids} ${categoriesId}
+ #When calling this keyword, the ${objectids} argument must be a two-level list, e.g. ['[1]','[2,3]','[4,5,6,7]'] (a parsing sketch follows this file's diff)
+ #delete policies and objects
+ log toDeletePolicy_DeletePolicyAndObject
+ ${listlenth}= Get Length ${policyids}
+ Run Keyword If "${listlenth}"=="0" log no Policys to del
+ ... ELSE DeletePoliciebyList ${policyids}
+ #delete objects
+ Run Keyword If "${objectids}"=="${EMPTY}" log no Objects to del
+ ... ELSE IF ${objectids}==['[]'] log no Objects to del
+ ... ELSE DeleteGroupObjects ${objectids}
+ log ${categoriesId}
+ Run Keyword If "${categoriesId}"=="${EMPTY}" or ${categoriesId}==['[null]'] or ${categoriesId}==['[]'] log no Categories to del
+ ... ELSE DeleteGroupCategories ${categoriesId}
+ \ No newline at end of file
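As noted in DeletePolicyAndGroupObject1, ${objectids} must be a two-level list in which every element is itself a JSON-encoded id list. A small, purely illustrative Python sketch of that format; the ids are made up:

```python
# Illustration of the documented two-level list format: each element of the
# outer list is a JSON-encoded list of object ids.
import json

objectids = ['[1]', '[2,3]', '[4,5,6,7]']           # example from the keyword's comment
groups = [json.loads(item) for item in objectids]   # -> [[1], [2, 3], [4, 5, 6, 7]]
print(groups)
```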
diff --git a/keyword/common/command.robot b/keyword/common/command.robot
new file mode 100644
index 0000000..fac6643
--- /dev/null
+++ b/keyword/common/command.robot
@@ -0,0 +1,103 @@
+*** Settings ***
+Library OperatingSystem
+Library Selenium2Library
+Library RequestsLibrary
+Library Collections
+Library ../../customlib/common/common.py
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+SystemCommands
+ [Documentation] Run a system command and match the output against the given list of strings; the keyword passes only if every string is found.
+ [Arguments] ${commandstr} ${stringlist}
+ log toSystemCommand_SystemCommandTest
+ ${commandreturn} OperatingSystem.Run ${commandstr}
+ #Create File ${path}/test.txt ${commandreturn}
+ #Append To File ${path}/write_file.txt ${commandstr}
+ #Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat
+ #Append To File ${path}/write_file.txt ${commandreturn}
+ ${listlenth}= Get Length ${stringlist}
+ FOR ${var} IN RANGE ${listlenth}
+ #log ${var}
+ Should Contain ${commandreturn} ${stringlist}[${var}]
+ END
+ ${rescode} Set Variable 200
+ log ${rescode}
+ [Return] ${rescode}
+
+SystemCommandsRetry
+ [Documentation] Run a system command and retry on failure.
+ [Arguments] ${commandstr} ${stringlist} ${retryNum}=1
+ log to_SystemCommandsRetry
+ ${trueCounter} Set Variable 0
+ ${falseCounter} Set Variable 0
+ FOR ${var} IN RANGE ${retryNum}
+ ${rescode} SystemCommandContains ${commandstr} ${stringlist}
+ ${trueCounter} ${falseCounter} Counter ${trueCounter} ${falseCounter} ${rescode}
+ END
+ [Return] ${trueCounter}
+IsContain
+ [Documentation] Check whether one string contains another; returns True if it does, otherwise False.
+ ... Intended for multi-line content such as command output.
+ [Arguments] ${sourceStr} ${findContent}
+ #${rescode} Evaluate "abc" in "abcd"
+ ${rescode} Evaluate """${findContent}""" in """${sourceStr}"""
+ log ${rescode}
+ #https://blog.csdn.net/sun_977759/article/details/107815615
+ #${file} Evaluate open(r'${filepath})
+ # Evaluate ${file}.close()
+ #[Robot Framework] Check whether a string contains a given substring, or that at least one of two conditions holds
+#${tWarningMessage} Run Keyword If ${tIfExist} AutoItLibrary.Win Get Text Generate Fee Data warning message ELSE Set Variable ""
+
+#${tIfDuplicateFee} Evaluate "Duplicate Fee Warning" in """${tWarningMessage}"""
+
+#${rCheckResult} Evaluate ${totalNumberNew}==${totalNumberOld} or ${totalNumberNew}>${totalNumberOld}
+
+ [Return] ${rescode}
+
+Counter
+ [Documentation] Update the pass/fail counters.
+ [Arguments] ${trueCounter} ${falseCounter} ${value}
+ ${trueCounter} Run Keyword If "${value}" == "True" Evaluate ${trueCounter}+1
+ ... ELSE Set Variable ${trueCounter}
+ ${falseCounter} Run Keyword If "${value}" == "False" Evaluate ${falseCounter}+1
+ ... ELSE Set Variable ${falseCounter}
+ [Return] ${trueCounter} ${falseCounter}
+
+SystemCommandContains
+ [Documentation] Run a system command and match the output against the given list of strings; the keyword passes when the number of matches reaches the given threshold (a Python sketch follows this file's diff).
+ [Arguments] ${commandStr} ${stringList} ${passNum}=1
+ log toSystemCommand_SystemCommandTest
+ ${commandreturn} OperatingSystem.Run ${commandStr}
+ ${listlenth}= Get Length ${stringList}
+ ${trueCounter} Set Variable 0
+ ${falseCounter} Set Variable 0
+ FOR ${var} IN RANGE ${listlenth}
+ #log ${var}
+ ${rescode} aisincludeb ${stringList}[${var}] ${commandreturn}
+ ${trueCounter} ${falseCounter} Counter ${trueCounter} ${falseCounter} ${rescode}
+ END
+ #${rescode} Evaluate ${totalNumberNew}==${totalNumberOld} or ${totalNumberNew}>${totalNumberOld}
+ ${rescode} Evaluate ${trueCounter}==${passNum} or ${trueCounter}>${passNum}
+ log ${rescode}
+ [Return] ${rescode}
+
+
+SystemCommandReturnCompare
+ [Documentation] Run a command and check its output: the system command to run, the list of strings the output must contain, and the list of strings the output must not contain.
+ [Arguments] ${commandStr} ${stringList} ${stringListNotIn}
+ log toSystemCommand_SystemCommandTest
+ ${commandreturn} OperatingSystem.Run ${commandStr}
+ ${listlenth}= Get Length ${stringList}
+ FOR ${var} IN RANGE ${listlenth}
+ log ${var}
+ Should Contain ${commandreturn} ${stringList}[${var}]
+ END
+ ${listnotin}= Get Length ${stringlistnotin}
+ FOR ${varn} IN RANGE ${listnotin}
+ log ${varn}
+ Should Not Contain ${commandreturn} ${stringListNotIn}[${varn}]
+ END
+ ${rescode} Set Variable 200
+ log ${rescode}
+ [Return] ${rescode}
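SystemCommandContains runs a command, counts how many of the expected strings occur in its output, and passes once that count reaches ${passNum}. A rough Python equivalent of the counting logic; the command and expected strings below are placeholders:

```python
# Rough Python equivalent of SystemCommandContains: run a shell command, count
# how many expected substrings appear in its output, pass if at least pass_num match.
import subprocess

def system_command_contains(command, expected, pass_num=1):
    output = subprocess.run(command, shell=True, capture_output=True, text=True).stdout
    matches = sum(1 for s in expected if s in output)
    return matches >= pass_num

# Placeholder usage:
print(system_command_contains("echo hello world", ["hello", "missing"], 1))   # True
```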
diff --git a/keyword/common/common.robot b/keyword/common/common.robot
new file mode 100644
index 0000000..4a585a6
--- /dev/null
+++ b/keyword/common/common.robot
@@ -0,0 +1,255 @@
+*** Settings ***
+Resource ../../variable/common_variable.txt
+Library REST http://${host}:${port}
+Library Collections
+Library RequestsLibrary
+Library json
+
+*** Keywords ***
+ManageApistr
+ [Documentation]
+ ... Description: entry point; apistr is the API path to call.
+ ... This keyword checks whether the path already carries a version prefix and adds one if it is missing (a Python sketch of these rules follows this file's diff):
+ ... policy/compile -> /v1/policy/compile
+ ... /policy/compile -> /v1/policy/compile
+ ... v1/policy/compile -> /v1/policy/compile
+ ... /v1/policy/compile -> /v1/policy/compile
+ [Arguments] ${apistr}
+ ${apiStart} Evaluate '${apistr}'[0:1]
+ ${apiStart1} Evaluate '${apistr}'[0:2]
+ ${apistr} Run Keyword If "${apiStart}"!="/" and "${apiStart}"!="v"
+ ... Set Variable /${version}/${apistr}
+ ... ELSE IF "${apiStart}"=="/" and "${apiStart1}"!="/v"
+ ... Set Variable /${version}${apistr}
+ ... ELSE IF "${apiStart}"=="v" Set Variable /${apistr}
+ ... ELSE IF "${apiStart1}"=="/v" Set Variable ${apistr}
+ ... ELSE Set Variable ${apistr}
+ log ${apistr}
+ [Return] ${apistr}
+BasePostRequest
+ [Arguments] ${apistr} ${body}
+ Set Headers {"Authorization":"${token}","Content-Type":"application/json"}
+ log ${apistr}
+ &{httpResponse} Post ${apistr} ${body}
+ #Output response body
+ Object response body
+ #Integer $.code 200
+ #Array $.data.policyList
+ ${response} Set Variable ${httpResponse.body}
+ [Return] ${response}
+
+BasePostRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BasePostRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+
+BasePostRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseGetRequest
+ [Arguments] ${apistr} ${body}
+ Set Headers {"Authorization":"${token}","Content-Type":"application/json"}
+ &{httpResponse} Get ${apistr}?${body}
+ #Output response body
+ Object response body
+ #Integer $.code 200
+ #Array $.data.policyList
+ ${response} Set Variable ${httpResponse.body}
+ [Return] ${response}
+BaseGetRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseGetRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+BaseGetRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Get Request api ${requestUri}?${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseDeleteRequest
+ [Arguments] ${requestUri} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port} ${headers}
+ ${response}= Delete Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+BaseDeleteRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseDeleteRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+
+BaseEditRequest
+ [Arguments] ${requestUri} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port} ${headers}
+ ${response}= Put Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+BaseEditRequestOK
+ [Arguments] ${apistr} ${body}
+ ${response} BaseEditRequest ${apistr} ${body}
+ log ${response['code']}
+ Should Be Equal As Strings ${response['code']} 200
+ [Return] ${response}
+BaseEditRequestForV2
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${apiStart} Evaluate '${requestUri}'[0:1]
+ ${requestUri} Run Keyword If "${apiStart}"=="/" set variable /${apiVersion}${requestUri}
+ ... ELSE set variable /${apiVersion}/${requestUri}
+ ${response} BaseEditRequest ${requestUri} ${data}
+ [Return] ${response}
+#Join a dictionary of GET parameters into a query string
+DictionaryToQueryParams
+ [Documentation]
+ ... Takes a dictionary of GET parameters and converts it into a query string.
+ [Arguments] ${params}
+ ${paramsString} = Set Variable ${EMPTY}
+ Run Keyword And Return If "${params}" == "${EMPTY}" Set Variable ${EMPTY}
+ FOR ${key} IN @{params}
+ ${value} = Get From Dictionary ${params} ${key}
+ ${paramStr} = Catenate SEPARATOR=\= ${key} ${value}
+ ${len} = Get Length ${paramsString}
+ ${paramsString} = Run Keyword If ${len} != 0 Catenate SEPARATOR=& ${paramsString} ${paramStr}
+ ... ELSE Set Variable ${paramStr}
+ END
+ Log To Console ${paramsString}
+ [Return] ${paramsString}
+BaseFormRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response} Run Keyword If "${data}"=="${EMPTY}" Get Request api ${requestUri}
+ ... ELSE Get Request api ${requestUri}?${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseFormRequest1
+ [Documentation]
+ ... For file downloads only.
+ ... The downloaded json contains special characters,
+ ... so response.encoding must be set to 'utf-8-sig'.
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response} Run Keyword If "${data}"=="${EMPTY}" Get Request api ${requestUri}
+ ... ELSE Get Request api ${requestUri}?${data}
+ log return data =${response}
+ Evaluate setattr($response, 'encoding', 'utf-8-sig')
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} json.Loads ${response.content}
+ [Return] ${response}
+BaseMultipartPostRequest
+ [Arguments] ${requestUri} ${data} ${files} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}"}
+ ${dataString} DictionaryToQueryParams ${data}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri}?${dataString} files=${files}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BasePostRequestReturnBinary
+ [Documentation]
+ ... For file downloads only.
+ ... Returns the raw binary content.
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ Log Call BasePostRequestReturnBinary
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ Log ${data}
+ ${response} Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ [Return] ${response.content}
+BaseGetRequestReturnBinary
+ [Documentation]
+ ... For file downloads only.
+ ... Returns the raw binary content.
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ Log Call BaseGetRequestReturnBinary
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/json"}
+ ${dataString} DictionaryToQueryParams ${data}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ Log ${data}
+ ${response} Get Request api ${requestUri}?${dataString}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ [Return] ${response.content}
+
+BaseFormEditRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Put Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+BaseFormPostRequest
+ [Arguments] ${requestUri} ${data} ${apiVersion}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"application/x-www-form-urlencoded"}
+ create session api http://${host}:${port}/${apiVersion} ${headers}
+ ${response}= Post Request api ${requestUri} data=${data}
+ log return data =${response}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+OamRequest
+ [Arguments] ${requestUri} ${path} ${method} ${contentType} ${data}
+ ${headers} set variable {"Authorization":"${token}","Content-Type":"${contentType}","path":"${path}"}
+ create session api http://${host}:${port} ${headers}
+ ${response} Run Keyword If "${method}" == "GET" Get Request api ${requestUri}?${data}
+ ... ELSE IF "${method}" == "POST" Post Request api ${requestUri} data=${data}
+ ... ELSE IF "${method}" == "PUT" Put Request api ${requestUri} data=${data}
+ ... ELSE IF "${method}" == "DELETE" Delete Request api ${requestUri} data=${data}
+ ... ELSE Set Variable ${EMPTY}
+ Should Be Equal As Strings ${response.status_code} 200
+ ${response} to json ${response.content}
+ [Return] ${response}
+
+ResultGetRequestOK
+ [Arguments] ${apistr} ${body}
+ sleep 35
+ ${response} BaseGetRequest ${apistr} ${body}
+ log ${response['code']}
+ log ${response}[data][list][0][status]
+ Should Be Equal As Strings ${response['code']} 200
+ Should Be Equal As Strings ${response}[data][list][0][status] 2
+
+##################################################################################################
+AppendListToList
+ [Tags]
+ [Documentation] Append each element of ${insertList} to ${souceList}, then return ${souceList}.
+ ... Input: ${souceList} ${insertList}
+ ... Output: ${souceList}
+ [Arguments] ${souceList} ${insertList}
+ Comment ${objids} is the concatenated list; ${objectIds} holds the object ids
+ Comment if ${objectIds} contains several ids, they are appended one by one
+ FOR ${objdict} IN @{insertList}
+ Append To List ${souceList} ${objdict}
+ END
+ [Return] ${souceList}
+ \ No newline at end of file
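ManageApistr's documented mappings (see its [Documentation] above) can be summarised in a few lines of Python. This is a simplified sketch of the documented behaviour, not a line-by-line port, and it assumes the version prefix is "v1":

```python
# Simplified sketch of the ManageApistr normalization rules, assuming version = "v1":
#   policy/compile     -> /v1/policy/compile
#   /policy/compile    -> /v1/policy/compile
#   v1/policy/compile  -> /v1/policy/compile
#   /v1/policy/compile -> /v1/policy/compile
def manage_apistr(apistr, version="v1"):
    if apistr.startswith("/" + version):
        return apistr
    if apistr.startswith(version):
        return "/" + apistr
    if apistr.startswith("/"):
        return "/" + version + apistr
    return "/" + version + "/" + apistr

for p in ("policy/compile", "/policy/compile", "v1/policy/compile", "/v1/policy/compile"):
    print(manage_apistr(p))   # every line prints /v1/policy/compile
```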
diff --git a/keyword/common/common_interface.robot b/keyword/common/common_interface.robot
new file mode 100644
index 0000000..7029c3a
--- /dev/null
+++ b/keyword/common/common_interface.robot
@@ -0,0 +1,47 @@
+*** Settings ***
+Library Collections
+Library RequestsLibrary
+Resource ../../../variable/common_variable.txt
+#Resource functional_keywords.robot
+Resource ../policys/policy.robot
+Resource ../objects/object.robot
+Resource ../objects/application.robot
+Resource common.robot
+*** Variables ***
+${policyUrl} /policy/compile
+*** Keywords ***
+GetJsonFromModeAndData
+ [Documentation] Build the request payload for an interface from its template and the corresponding test data (a Python sketch follows this file's diff).
+ [Arguments] ${modleFilePath} ${dataFilePath} ${keyword} ${datalistname}
+ #Online YAML formatter: https://www.bejson.com/validators/yaml_editor/
+ ${yamlMode}= Get File ${modleFilePath}
+ ${loadedMode}= yaml.Safe Load ${yamlMode}
+ #${retkeys} evaluate [one for one in ${ip_secuirty_allow_dns_001}]
+ #${dictType} = Evaluate type(${retkeys})
+ ${yamlData}= Get File ${dataFilePath}
+ ${loadedData}= yaml.Safe Load ${yamlData}
+ ${dataJson} Get From Dictionary ${loadedData} ${keyword}_data
+ ${dataKey} Get From Dictionary ${loadedData} keywords
+ ${dataKeyType} = Evaluate type(${dataKey})
+ FOR ${key} IN @{dataKey}
+ LOG passssssssss
+ ${data} Get From Dictionary ${dataJson}[0] ${key}
+ #Continue For Loop If Dictionary does not contain key ${key}
+ Continue For Loop If "${data}" == "Empty"
+ Remove From Dictionary ${loadedMode}[${keyword}_mode][${datalistname}][0] ${key}
+ Set To Dictionary ${loadedMode}[${keyword}_mode][${datalistname}][0] ${key} ${data}
+ END
+ ${modeJson} Get From Dictionary ${loadedMode} ${keyword}_mode
+ ${returnData} Get From Dictionary ${modeJson} returnData
+ ${returnJson} json.Dumps ${modeJson}
+
+ #Set To Dictionary ${LOADED}[patch_id_bw_data][pronghornResponseBody][responseBody][0][value][0] value=200
+ #${addItemList1} Create Dictionary isSession=endpoint ip=${testClentIP} port=0-65535 direction=0 protocol=0 isInitialize=0
+ #${addItemLists} Create list ${addItemList1}
+ #${objectDict} Create Dictionary objectType=ip objectSubType=endpoint isValid=${1} addItemList=${addItemLists}
+ #${rescode} ${objectId} AddObjects ${1} ${objectDict}
+ #${objectids} set Variable ${objectId}
+ [Return] ${returnJson}
+
+
+ \ No newline at end of file
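GetJsonFromModeAndData reads a YAML template ("mode") file and a YAML data file, overwrites the keys listed under keywords in the template with the values from the test data, and returns the result as a JSON string. A simplified Python sketch of that flow; the paths and exact key layout are assumptions for illustration:

```python
# Simplified sketch of GetJsonFromModeAndData: merge a YAML template with test
# data and return the request payload as JSON. Paths and key layout are assumptions.
import json
import yaml

def build_request_json(mode_path, data_path, keyword, datalist_name):
    with open(mode_path) as f:
        mode = yaml.safe_load(f)
    with open(data_path) as f:
        data = yaml.safe_load(f)
    values = data[f"{keyword}_data"][0]
    for key in data["keywords"]:
        if values.get(key) == "Empty":      # "Empty" marks keys to leave untouched
            continue
        mode[f"{keyword}_mode"][datalist_name][0][key] = values[key]
    return json.dumps(mode[f"{keyword}_mode"])
```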
diff --git a/keyword/common/customlibrary/Custometest/Common.py b/keyword/common/customlibrary/Custometest/Common.py
new file mode 100644
index 0000000..d009a7a
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/Common.py
@@ -0,0 +1,51 @@
+import json
+import random
+import hashlib
+import os
+
+# Check whether a character or string a is contained in string b: returns True if a is in b, otherwise False
+def aisincludeb(a,b):
+ result = a in b
+ print(result)
+ return result
+
+# Strip characters from the front and/or back of a string: sourcestr is the source, a is a slice such as slice(2, -2) to drop the first two and last two characters (slice(None, -2) drops only the last two)
+def removeBeforOrAfter(sourcestr,a):
+ #a = "16541616584984"
+ #a = a[2:-2]
+ sourcestr = sourcestr[a]
+ return sourcestr
+
+# Split a string on the given separator
+def string2list(str,split):
+ return str.split(split)
+
+# Generate a random integer within the given range [a, b]
+def randomint(a,b):
+ return random.randint(a,b)
+
+# For smaller files:
+def get_md5_01(file_path):
+ md5 = None
+ if os.path.isfile(file_path):
+ f = open(file_path,'rb')
+ md5_obj = hashlib.md5()
+ md5_obj.update(f.read())
+ hash_code = md5_obj.hexdigest()
+ f.close()
+ md5 = str(hash_code).lower()
+ return md5
+
+# For larger files (read in 8096-byte chunks; see the sketch after this file's diff):
+def get_md5_02(file_path):
+ f = open(file_path,'rb')
+ md5_obj = hashlib.md5()
+ while True:
+ d = f.read(8096)
+ if not d:
+ break
+ md5_obj.update(d)
+ hash_code = md5_obj.hexdigest()
+ f.close()
+ md5 = str(hash_code).lower()
+ return md5 \ No newline at end of file
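get_md5_02 hashes the file in 8096-byte chunks so large files are never loaded into memory at once. The same chunked approach can be written more compactly with a context manager; this is an equivalent sketch, not a drop-in replacement:

```python
# Equivalent sketch of the chunked MD5 calculation using a context manager.
import hashlib

def md5_of_file(path, chunk_size=8096):
    md5_obj = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5_obj.update(chunk)
    return md5_obj.hexdigest().lower()
```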
diff --git a/keyword/common/customlibrary/Custometest/JsonDiff.py b/keyword/common/customlibrary/Custometest/JsonDiff.py
new file mode 100644
index 0000000..ef50112
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/JsonDiff.py
@@ -0,0 +1,6 @@
+from json_compare import Jcompare
+
+def json_diff(a,b):
+ cp=Jcompare()
+ results = cp.compare(a,b)
+ return results \ No newline at end of file
diff --git a/keyword/common/customlibrary/Custometest/LogResponseVAL.py b/keyword/common/customlibrary/Custometest/LogResponseVAL.py
new file mode 100644
index 0000000..dbf6474
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/LogResponseVAL.py
@@ -0,0 +1,203 @@
+import re
+import time
+
+import jsonpath
+# 1. Purpose: this module checks the fields and values in the data returned by the log interfaces.
+# 2. Inputs: responsedict - the JSON data returned by the interface
+# targetlist - the check conditions; several may be passed, separated by Tab in Robot Framework. Each condition is "field(key) operator value(value)", with the parts separated by spaces, e.g. common_log_id = 238734003578214400 (a usage sketch follows this file's diff)
+# Supported operators: =, !=, >, <, >=, <=, in, notin, like, notlike, notEmpty, empty.
+# (1) notin, notlike and notEmpty are written without an inner space
+# (2) notEmpty and empty take no value
+# (3) for in and notin, multiple values are comma-separated, e.g. common_log_id notin 238734003578214400,238734003578214402,238734003578214403
+
+def FieldValidation(responsedict, targetlist):
+ responselist = responsedict["data"]["list"]
+ strlist = []
+ if responselist:
+ # iterate over the returned data list
+ sum = 1
+ for response in responselist:
+ # iterate over the target condition list
+ for t in targetlist:
+ # split the condition on spaces into (key, operator, value)
+ target = t.split(" ")
+ #get all keys in the json object, returned as a list
+ responsekeys = getKeys(response)
+ # check whether the condition's key exists in the data
+ if target[0] in responsekeys:
+ if len(target) != 1:
+ #targetkey: the field being checked
+ targetkey = target[0]
+ # the comparison operator
+ conditions = target[1]
+ # list of values for this key in the returned data
+ responsevaluelist = getjsonvalue(response,target[0])
+ for responsevalue in responsevaluelist:
+ #if the value is a list, convert it to a string
+ if isinstance(responsevalue, list):
+ responsevalue=str(responsevalue)
+ if len(target) == 3:
+ targetvalue = target[2]
+ torf=is_valid_date(responsevalue)
+ if torf == True:
+ timeArray = time.strptime(responsevalue, "%Y-%m-%d %H:%M:%S")
+ timeStamp = str(int(time.mktime(timeArray)))
+ p = conditional(conditions, timeStamp, targetkey, sum, targetvalue)
+ if p != "":
+ strlist.append(p)
+ else:
+ p = conditional(conditions, responsevalue, targetkey, sum, targetvalue)
+ if p != "":
+ strlist.append(p)
+ elif len(target) == 2:
+ p = conditional(conditions, responsevalue, targetkey, sum)
+ if p != "":
+ strlist.append(p)
+ else:
+ str2 = "返回数据第" + str(sum) + "组数据中不存在该字段:" + target[0]
+ print(str2)
+ strlist.append(str2)
+ sum += 1
+ else:
+ str3 = "返回数据中无数据"
+ strlist.append(str3)
+ Assertresults(strlist)
+ return strlist
+
+def getjsonvalue(json_data, key_name):
+ '''Get the values of any key in the json; the result is a list'''
+ keyvalue = jsonpath.jsonpath(json_data, '$..{key_name}'.format(key_name=key_name))
+ # if the key's value is not an empty string, return the matching values, otherwise return empty (an empty value is always written as 'empty' in the test cases)
+ return keyvalue
+
+def getKeys(data):
+ # collect every key in the json object
+ keysAll_list = []
+ def getkeys(data): # walk all keys in the json
+ if (type(data) == type({})):
+ keys = data.keys()
+ for key in keys:
+ value = data.get(key)
+ if (type(value) != type({}) and type(value) != type([])):
+ keysAll_list.append(key)
+ elif (type(value) == type({})):
+ keysAll_list.append(key)
+ getkeys(value)
+ elif (type(value) == type([])):
+ keysAll_list.append(key)
+ for para in value:
+ if (type(para) == type({}) or type(para) == type([])):
+ getkeys(para)
+ else:
+ keysAll_list.append(para)
+ getkeys(data)
+ return keysAll_list
+
+# Check the given value against the condition
+def conditional(conditions, value2, targetkey, sum, value=None):
+ str1 = ""
+ if conditions == "=":
+ if value != value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符"
+
+ if conditions == "!=":
+ if value == value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+ print(str1)
+
+ if conditions == ">":
+ if int(value2) <= int(value):
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "<":
+ if int(value2) >= int(value):
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == ">=":
+ if int(value2) < int(value):
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "<=":
+ if int(value2) > int(value):
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "in":
+ value = value.split(",")
+ if value2 not in value:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "notin":
+ value = value.split(",")
+ if value2 in value:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "like":
+ left = value[0]
+ right = value[-1]
+ if left == "%" and right == "%":
+ value = value[1:len(value) - 1]
+ if value not in value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ elif left == "%" and right != "%":
+ v = len(value)
+ _value = value[1:]
+ _value2 = value2[-(v - 1):]
+ print(_value, _value2)
+ if _value != _value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ elif left != "%" and right == "%":
+ v = len(value)
+ _value = value[0:-1]
+ _value2 = value2[0:v - 1]
+ if _value != _value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "notlike":
+ left = value[0]
+ right = value[-1]
+ if left == "%" and right == "%":
+ value = value[1:len(value) - 1]
+ if value in value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ elif left == "%" and right != "%":
+ v = len(value)
+ _value = value[1:]
+ _value2 = value2[-(v - 1):]
+ if _value == _value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ elif left != "%" and right == "%":
+ v = len(value)
+ _value = value[0:-1]
+ _value2 = value2[0:v - 1]
+ if _value == _value2:
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "notEmpty":
+ if value2 == "":
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+
+ if conditions == "Empty":
+ if value2 != "":
+ str1 = "返回数据第" + str(sum) + "组数据中," + targetkey + "的值与和条件不符。"
+ return str1
+
+def Assertresults(resultslist):
+ print(resultslist)
+ for i in resultslist:
+ if i != "":
+ assert 1 == 2
+
+
+def is_valid_date(strdate):
+ '''Check whether the string is a valid date string'''
+ a = re.findall(":", strdate)
+ b = re.findall("-", strdate)
+ if len(a) ==2 and len(b) == 2:
+ return True
+ else:
+ return False
+
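The header comments above define the targetlist format that FieldValidation expects. A hedged usage example with a made-up response body and ids (not real data):

```python
# Hypothetical example of the documented targetlist format; the response body
# and ids below are placeholders, not data from a real system.
response = {"data": {"list": [{"common_log_id": "238734003578214400",
                               "common_policy_name": "demo"}]}}
targets = [
    "common_log_id = 238734003578214400",                        # key operator value
    "common_policy_name notEmpty",                               # no value for notEmpty
    "common_log_id in 238734003578214400,238734003578214402",    # comma-separated values for in
]
FieldValidation(response, targets)   # collects mismatch messages and asserts if any remain
```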
diff --git a/keyword/common/customlibrary/Custometest/LogSchema.py b/keyword/common/customlibrary/Custometest/LogSchema.py
new file mode 100644
index 0000000..d190ff1
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/LogSchema.py
@@ -0,0 +1,513 @@
+# !/user/bin/python
+# -*-coding:utf-8-*-
+import requests
+import random
+import json
+import LogResponseVAL
+import time, datetime
+# import allure
+
+
+
+
+# Call the schema interface and return its data for use by the other interfaces
+def schema(schemauerl, token):
+ url = schemauerl
+ headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ return response.json()
+
+
+# From the schema interface response, build the list of comparison operators supported by every attribute:
+# 1. columns whose [doc][allow_query] value is true support searching;
+# 2. if [doc][constraints][operator_functions] is present, those operators take priority;
+# 3. if [doc][data] is present, the attribute's values are the code values listed in data;
+# 4. int and long have different ranges;
+# 5. strings should include special characters;
+# 6. when assigning query values, cover both boundary and normal values;
+# 7. IP (v4, v6) and URL values need dedicated generator functions.
+
+import ipaddress
+
+# Generate a random IPv4 or IPv6 address
+MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
+MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
+
+
+def random_ipv4():
+ return ipaddress.IPv4Address._string_from_ip_int(
+ random.randint(0, MAX_IPV4)
+ )
+
+
+def random_ipv6():
+ return ipaddress.IPv6Address._string_from_ip_int(
+ random.randint(0, MAX_IPV6)
+ )
+
+
+from random import Random
+
+
+# Generate a random URL address
+def randrom_url():
+ str = ''
+ str1 = ''
+ chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
+ chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
+ length = len(chars)
+ length1 = len(chars1)
+ random = Random()
+ for x in range(random.randint(8, 16)):
+ str += chars[random.randint(0, length - 1)]
+ for pp in range(random.randint(8, 16)):
+ str1 += chars1[random.randint(0, length1 - 1)]
+ url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1
+ print(url)
+ return url
+
+
+def Filter1(schemauerl, token):
+ list = []
+ json_str = schema(schemauerl, token)
+ print("schemauerl",json_str)
+ print(type(json_str))
+ # get the log field definitions
+ fields = json_str["data"]["fields"]
+ print("1111111111",fields)
+ # 获取不同属性支持的部不同操作
+ operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
+ for i in fields:
+ number = random.randint(0, 2147483647)
+ maxnumber = 2147483647
+ minnumber = -2147483648
+ str = random.choice('abcdefghijklmnopqrstuvwxyz!@#%^&*')
+ name = i["name"]
+ doc = i["doc"]
+ # columns without any special doc:
+ if doc == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "notEmpty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ if i["doc"]["constraints"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "notEmpty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+
+ else:
+ if i["doc"]["constraints"]["operator_functions"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "notEmpty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ type1 = i["type"]
+ operator1 = i["doc"]["constraints"]["operator_functions"]
+ operator2 = operator1.split(",")
+ data = i["doc"]["data"]
+ for d in data:
+ code = d["code"]
+ if type1 == "int" or type1 == "long":
+ for o in operator2:
+ str1 = name + " " + o + " " + code
+ list.append(str1)
+ else:
+ for o in operator2:
+ str1 = name + " " + o + " " + " '" + code + " '"
+ list.append(str1)
+
+
+ print("22222222222",list)
+ return list
+
+
+# Feed the filter strings produced by Filter1 into the log query interface to verify it
+def logapiverify(schemauerl,logurl, token, starttime, endtime,logtype):
+ filter2 = Filter1(schemauerl, token)
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print("333333333333",filter2)
+ for i in filter2:
+ print("条件:", i)
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": i
+ }
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ assert code == 200
+ print(response1.json()["code"])
+ return response1.json()
+ # print("111111111111111111111111111111111111111111111111111111111111111111111111111111111111111")
+ # print(str2)
+ # str3 = str2[0:-4]
+ # print(str3)
+ # url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ # headers = {"Content-Type": "application/json",
+ # "Authorization": token}
+ # data = {
+ # "start_common_recv_time": starttime,
+ # "end_common_recv_time": endtime,
+ # "logType": logtype,
+ # "fields": fields,
+ # "filter": str3
+ # }
+ # print(data)
+ # print(json.dumps(data))
+ # response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ # code = response1.json()["code"]
+ # print(response1.json())
+ # assert code == 200
+ # print(response1.json()["code"])
+
+
+# Query the log interface with an exact filter
+def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ # print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ assert code == 200
+ print(response1.json()["code"])
+ return response1.json()
+
+# Targeted verification: loop over every field in the returned list and query with each one
+def loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, datajson):
+ nullkey = []
+ data = datajson
+ keylist = LogResponseVAL.getKeys(data)
+ a = schema(schemaurl, token)
+ fields = a["data"]["fields"]
+ for i in keylist:
+ conditions = data[i]
+ for field in fields:
+ name = field["name"]
+ if field["doc"] == None or field["doc"]["visibility"] == None:
+ if i == name:
+ if conditions != None and conditions != "":
+ if field["type"] == "string":
+ if conditions[0] == "'" and conditions[-1] == "'":
+ filtervalue = i + " = " + conditions
+ VasserValue=i + " = " + conditions[1:-1]
+
+ else:
+ filtervalue = i + " = " + "'" + conditions + "'"
+ VasserValue= i + " = " + conditions
+ else:
+ if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
+ timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
+ timeStamp = str(int(time.mktime(timeArray)))
+ filtervalue = i + " = " + timeStamp
+ VasserValue = filtervalue
+
+ else:
+ filtervalue = i + " = " + str(conditions)
+ VasserValue = filtervalue
+ print("filtervalue",filtervalue)
+ #query the log list using the extracted filter condition
+ responsebody = loglistverify(logurl, schemaurl, token, starttime, endtime, logtype,
+ filtervalue)
+ filterlist=[VasserValue]
+ print(VasserValue)
+ LogResponseVAL.FieldValidation(responsebody,filterlist)
+
+ else:
+ nullkey.append(i) #all fields whose value is None or ""
+ return nullkey
+
+ # loop over multiple records with shared variables; if a field has no value in this pass, continue with the next pass
+def logAllFieldsListInterface(logurl, schemaurl, token, starttime, endtime, logtype, datajson,lognumber,logcycles):
+ datalist = datajson["data"]["list"]
+ keylist=[]
+ number=0
+ print(lognumber)
+ print(type(lognumber))
+ print(logcycles)
+ print(type(logcycles))
+ for i in range(0, len(datalist), int(lognumber)):# take lognumber list elements per pass
+ number+=1
+ nullkeylist=[]
+ ret=datalist[i:i + int(lognumber)]
+ for data in ret:
+ nullkey=loglistverifys(logurl, schemaurl, token, starttime, endtime, logtype, data)
+ nullkeylist.append(nullkey)
+ print(nullkeylist)
+ for j in nullkeylist:
+ #intersect the keys reported as empty
+ if len(keylist) == 0:
+ keylist=j
+ else:
+ #take the intersection of the two lists
+ keylist=list(set(keylist).intersection(set(j)))
+ if len(keylist) == 0 or number >= int(logcycles):
+ break
+ print("最终数据中没有值的字段为:",keylist)
+
+
+# Time-distribution query for event logs and connection logs; log search condition validation (filter content check)
+def distributed_query(logurl, token):
+ url = logurl # example url: http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M
+ headers = {"Content-Type": "application/json", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ code = response.json()["code"]
+ print(response.json())
+ assert code == 200
+ print(response.json()["code"])
+ return response.json()
+
+# Log search condition validation, pure interface test
+def LogRetrieve(schemaurl,host,port,token,logType,datajson):
+ number = random.randint(0, 2147483647)
+ str1 = random.choice('abcdefghijklmnopqrstuvwxyz')
+ data=datajson["data"]["list"][0]
+ keylist = LogResponseVAL.getKeys(data)
+ a = schema(schemaurl, token)
+ fields=a["data"]["fields"]
+ for i in keylist:
+ conditions = data[i]
+ for field in fields:
+ name = field["name"]
+ if i == name:
+ if field["type"] == "string":
+ filter = "logType=" + logType + "&" + "filter=" + i + "=" + "'" + str1 + "'"
+ else:
+ if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
+ timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
+ timeStamp = str(int(time.mktime(timeArray)))
+ filter = "logType=" + logType + "&" + "filter=" + i + "=" + timeStamp
+ else:
+ filter = "logType=" + logType + "&" + "filter=" + i + "=" + str(number)
+ Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + filter
+ print(Logurl)
+ responsebody = distributed_query(Logurl, token)
+
+# Log search condition validation with a complex sql filter
+def LogRetrieveSql(schemaurl,host,port,token,logType,datajson):
+ data = datajson["data"]["list"][0]
+ keylist = LogResponseVAL.getKeys(data)
+ sqllist=random.sample(keylist, 4)
+ number = 45585
+ str1 = random.choice('abcdefghijklmnopqrstuvwxyz')
+ print(sqllist)
+ a = schema(schemaurl, token)
+ filterlist=[]
+ fields=a["data"]["fields"]
+ for i in sqllist:
+ conditions = data[i]
+ for field in fields:
+ name = field["name"]
+ if i == name:
+ if field["type"] == "string":
+ if conditions == "" or conditions == None:
+ conditions=str1
+ filter = i + "=" + "'" + conditions + "'"
+ else:
+ if i == "common_recv_time" or i == "common_start_time" or i == "common_end_time" or i == "common_processing_time":
+ timeArray = time.strptime(conditions, "%Y-%m-%d %H:%M:%S")
+ timeStamp = str(int(time.mktime(timeArray)))
+ filter =i + "=" + timeStamp
+ else:
+ if conditions == "" or conditions == None:
+ conditions = number
+ filter = i + "=" + str(conditions)
+ print(filter)
+ filterlist.append(filter)
+ sqlfilter = "(("+filterlist[0]+" OR "+filterlist[1]+") AND "+filterlist[2]+") OR "+filterlist[3]
+ _filter = "logType=" + logType + "&" + "filter=" + sqlfilter
+ Logurl = "http://" + host + ":" + port + "/v1/interface/gateway/sql/galaxy/log/filter/validation?" + _filter
+ print(Logurl)
+ responsebody = distributed_query(Logurl, token)
+ print(sqlfilter)
+
+ # Time-distribution calculation for raw log search
+def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
+ url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "startTime": starttime,
+ "endTime": endtime,
+ "logType": logtype,
+ "granularity": granularity,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# Query the total log count
+def countlog_query(logurl, token, starttime, endtime, logtype):
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "pageSize": 20,
+ "logType": logtype,
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "filter": ""
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# Log export interface
+def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ a=type(response1)
+ if a != "class 'requests.models.Response'":
+ assert 1 == 1
+ else:
+ assert 1 == 2
+
+# Check the detail fields inside a log entry
+def LogFieldValidation(schemauerl,token,datajson):
+ Schemajson = schema(schemauerl, token)
+ fields=Schemajson["data"]["fields"]
+ keylist= LogResponseVAL.getKeys(datajson["data"]["list"][0])
+ schema_typedict=Schemajson["data"]["doc"]["schema_type"]
+ schema_typelistkey=schema_typedict.keys()
+ for schema_typekey in schema_typelistkey: #iterate over every key in schema_type
+ for i in schema_typedict[schema_typekey]["columns"]:
+ for filter in fields:
+ if filter["name"] == i:
+ if filter["doc"] == None:
+ if i not in keylist:
+ print("该字段未存在日志详情内",i)
+ assert 1==2
+ else:
+ print("该字段通过在日志详情内",i)
+ else:
+ if filter["doc"]["visibility"] != "disabled":
+ if i not in keylist:
+ print("该字段未存在日志详情内",i)
+ assert 1==2
+ else:
+ print("该字段通过在日志详情内",i)
+
+
+
+
+
+
+
+# if __name__ == '__main__':
+# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log") \ No newline at end of file
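Several helpers in this module (loglistverifys, LogRetrieve, LogRetrieveSql) convert "%Y-%m-%d %H:%M:%S" strings into epoch-second strings before putting them into a filter. A standalone sketch of that conversion; the timestamp is a placeholder and the result depends on the local timezone:

```python
# Sketch of the time-string -> epoch-seconds conversion used when building filters.
import time

value = "2021-03-20 16:36:41"                         # placeholder timestamp
time_array = time.strptime(value, "%Y-%m-%d %H:%M:%S")
epoch = str(int(time.mktime(time_array)))
print(epoch)   # epoch seconds as a string; exact value depends on the local timezone
```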
diff --git a/keyword/common/customlibrary/Custometest/MD5.py b/keyword/common/customlibrary/Custometest/MD5.py
new file mode 100644
index 0000000..697ae83
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/MD5.py
@@ -0,0 +1,40 @@
+# The MD5 module was removed in Python 3,
+# so the hashlib module is used for md5 operations instead.
+
+import hashlib
+
+class MD5:
+ def MD5(data,langer,md5_types):
+ # create the md5 object
+ # m = hashlib.md5()
+ # Tips
+ # the input must be encoded here
+ # writing m.update(str) raises: Unicode-objects must be encoded before hashing
+ # because str is unicode by default in python3
+ # alternatively b = bytes(str, encoding='utf-8') has the same effect, encoding to bytes
+ # b = str.encode(encoding='utf-8')
+ # m.update(b)
+ # str_md5 = m.hexdigest()
+ if langer == "英文":
+ # str_md5 = hashlib.md5(b'this is a md5 test.').hexdigest()
+ str_md5 = hashlib.md5(data.encode(encoding='utf-8')).hexdigest()
+ print('MD5加密前为 :' + data)
+ print('MD5加密后为 :' + str_md5)
+ return str_md5
+ elif langer == "中文":
+ str_md5 = hashlib.md5('你好'.encode(encoding=md5_types)).hexdigest()
+ return str_md5
+ # utf8 and gbk produce different digests
+ # hashlib.md5('你好'.encode(encoding='GBK')).hexdigest()
+ # hashlib.md5('你好'.encode(encoding='GB2312')).hexdigest()
+ # hashlib.md5('你好'.encode(encoding='GB18030')).hexdigest()
+if __name__ == '__main__':
+ data = '小猪'
+ langer = '中文'
+ md5_types = 'GBK'
+ a =MD5(data,langer,md5_types)
+ print(a)
+ b=r'C:\Users\小猪\AppData\Local\Programs\Python\Python37\Lib\site-packages\custometest\MD5.py'
+ with open(b, encoding='utf-8') as f:
+ text = f.read()
+ print(text) \ No newline at end of file
diff --git a/keyword/common/customlibrary/Custometest/ReportSchema.py b/keyword/common/customlibrary/Custometest/ReportSchema.py
new file mode 100644
index 0000000..ac2d947
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/ReportSchema.py
@@ -0,0 +1,718 @@
+import requests
+import random
+import json
+import time
+import ipaddress
+from builtins import list
+
+# Positive, interface-only Report test helpers: they do not check statistical accuracy, only that the interfaces respond
+
+# Generate a random IPv4 or IPv6 address
+MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
+MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
+def random_ipv4():
+ return ipaddress.IPv4Address._string_from_ip_int(
+ random.randint(0, MAX_IPV4))
+def random_ipv6():
+ return ipaddress.IPv6Address._string_from_ip_int(
+ random.randint(0, MAX_IPV6))
+
+# Generate a random email address
+def RandomEmail( emailType=None, rang=None):
+ __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"]
+ # if no mail domain is given, pick one at random from __emailtype
+ if emailType == None:
+ __randomEmail = random.choice(__emailtype)
+ else:
+ __randomEmail = emailType
+ # if no length is given, use a random length between 4 and 10
+ if rang == None:
+ __rang = random.randint(4, 10)
+ else:
+ __rang = int(rang)
+ __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ"
+ __randomNumber = "".join(random.choice(__Number) for i in range(__rang))
+ _email = __randomNumber + __randomEmail
+ return _email
+
+# Get the schema
+def schema(schemauerl,token,logtype):
+ url ="http://192.168.44.72:8080/v1/log/schema?logType="+logtype
+ headers = {"Content-Type":"application/x-www-form-urlencoded","Authorization":token}
+ response = requests.get(url=url,headers=headers)
+ return response.json()
+
+# Build the groupColumnList value for the request json
+def groupby(schemajson,logtype,testpoint,field):
+ dimensions=schemajson["data"]["doc"]["schema_query"]["dimensions"]
+ dimensions.append("common_recv_time");
+ randomstr_1=[]
+ if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log":
+ dimensions.remove("common_start_time")
+ dimensions.remove("common_end_time")
+ if testpoint == "DataBindings":
+ randomstr_1.append("common_recv_time")
+ elif testpoint == "GroupBy":
+ randomstr_1.append(field)
+ else:
+ randomstr_1=random.sample(dimensions, 4)
+
+ #grp is the group list to return
+ grp=[]
+ for i in randomstr_1:
+ a={"name":i}
+ grp.append(a)
+
+ re=[grp,randomstr_1]
+ print("groupby",re)
+ return re
+
+# Build the queryColumnList value for the request json
+def DataBindings(schemajson,randomstr_1,testpoint,field):
+ #build the queryColumnList list
+ metrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
+ metrics.append("common_log_id")
+ #在列表里随机元素
+ randomstr_2=[]
+ if testpoint == "DataBindings":
+ randomstr_2.append(field)
+ else:
+ randomstr_2=random.sample(metrics,6)
+ #remove elements from the aggregation list that already appear in groupby
+ randomstr_3=array_diff(randomstr_2,randomstr_1)
+ #add the groupby elements to the payload
+ qul=[]
+ for i in randomstr_1:
+ a={"name":i}
+ qul.append(a)
+
+ fields = schemajson["data"]["fields"]
+ list_1=["sum","min","max","avg","count"]
+ list_2=["count","count_distinct"]
+ if testpoint == "DataBindings":
+ for i in randomstr_3:
+ for j in fields:
+ if i == j["name"] :
+ jtype=j["type"]
+ label=i
+ sun=1
+ if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
+ for Aggregate in list_1:
+ randomstr_4={"name":i,"expression":Aggregate,"label":label}
+ qul.append(randomstr_4)
+ label=label+str(sun)
+ sun+=1
+ elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
+ for Aggregate in list_2:
+ randomstr_4={"name":i,"expression":Aggregate,"label":label}
+ qul.append(randomstr_4)
+ label = label + str(sun)
+ sun += 1
+
+ else:
+ for i in randomstr_3:
+ for j in fields:
+ if i == j["name"]:
+ jtype = j["type"]
+ if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
+ radomlist = random.sample(list_1, 1)
+ randomstr_4 = {"name": i, "expression": radomlist[0]}
+ qul.append(randomstr_4)
+ elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
+ randomlist = random.sample(list_2, 1)
+ randomstr_4 = {"name": i, "expression": randomlist[0]}
+ qul.append(randomstr_4)
+ print("DataBindings",qul)
+ return qul
+
+# Remove from list a every element that also appears in list b
+def array_diff(a, b):
+ #start with an empty result list
+ c=[]
+ #range(len(a)) iterates over the indexes of list a
+ for i in range(len(a)):
+ #take the value at this index
+ t=a[i]
+ #check whether the value occurs in sequence b
+ if t not in b:
+ #if it does not, keep it in c
+ c.append(t)
+ #return c: list a with every element of list b removed
+ return c
+
+def filterCondition(schemajson,testpoint,field):
+ number = random.randint(0,100000)
+ randomstr= random.choice('abcdefghijklmnopqrstuvwxyz')
+ schemafilters=schemajson["data"]["doc"]["schema_query"]["filters"]
+ list1=[]
+ if testpoint=="Filter":
+ list1.append(field)
+ else:
+ list1=random.sample(schemafilters, 4)
+ #get the operators supported by the different attribute types
+ fields = schemajson["data"]["fields"]
+ operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"]
+ andConditions=[]
+ for i in list1:
+ #iterate over the fields list
+ for k in fields:
+ #when the filters entry equals the field name
+ if i == k["name"]:
+ name = k["name"]
+ doc = k["doc"]
+                # fields without any special doc constraints:
+ if doc == None:
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list=[]
+ Operator=["=","!=",">","<",">=","<="]
+ if testpoint=="Filter":
+ for op in Operator:
+ value=[str(number)]
+ Field={"name":name,"expression":op,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator= random.sample(Operator, 1)
+ value=[str(number)]
+ Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list=[]
+ Operator=["=","!=","Like","Not Like","notEmpty","empty"]
+ if testpoint=="Filter":
+ for op in Operator:
+ value=[]
+ if op == "=" or op == "!=":
+ value.append(str(number))
+ elif op == "Like" or op == "Not Like":
+ value.append(randomstr)
+ elif op=="notEmpty" or op == "empty":
+ value=[]
+ Field={"name":name,"expression":op,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+
+ else:
+ if k["doc"]["constraints"]== None:
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list=[]
+ Operator=["=","!=",">","<",">=","<="]
+ if testpoint == "Filter":
+ for op in Operator:
+ value=[str(number)]
+ Field={"name":name,"expression":op,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator= random.sample(Operator, 1)
+ value=[str(number)]
+ Field={"name":name,"expression":randomOperator[0],"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list=[]
+ Operator=["=","!=","Like","Not Like","notEmpty","empty"]
+ if testpoint == "Filter":
+ for op in Operator:
+ randomOperator = op
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1= random.sample(Operator, 1)
+ randomOperator=randomOperator_1[0]
+ value=[]
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator=="notEmpty":
+ value=[]
+ Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+
+ else:
+ if k["doc"]["constraints"]["operator_functions"]==None:
+ conrandomstraints=k["doc"]["constraints"]
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list = []
+ Operator=["=","!=",">","<",">=","<="]
+ if testpoint == "Filter":
+ for op in Operator:
+ randomOperator = op
+ if conrandomstraints["type"] == "timestamp":
+                                            # current Unix timestamp
+ t = int(time.time())
+ value = [str(t)]
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1= random.sample(Operator, 1)
+ randomOperator=randomOperator_1[0]
+ if conrandomstraints["type"] == "timestamp":
+                                    # current Unix timestamp
+ t = int(time.time())
+ value=[str(t)]
+ Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list = []
+ Operator=["=","!=","Like","Not Like","notEmpty","empty"]
+ if testpoint == "Filter":
+ if conrandomstraints["type"] == "ip":
+ for op in Operator:
+                                            # generate a random IP address
+ ip = random_ipv4()
+ value = []
+ if op == "=" or op == "!=":
+ value.append(ip)
+ elif op == "Like" or op == "Not Like":
+ value.append(ip)
+ elif op == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": op, "value": value,"type": type1}
+ orConditions_list.append(Field)
+                                    elif conrandomstraints["type"] == "email":
+                                        for op in Operator:
+                                            # generate a random email address
+                                            email = RandomEmail()
+                                            value = []
+                                            if op == "=" or op == "!=":
+                                                value.append(email)
+                                            elif op == "Like" or op == "Not Like":
+                                                value.append(email)
+                                            elif op == "notEmpty":
+                                                value = []
+                                            Field = {"name": name, "expression": op, "value": value, "type": type1}
+                                            orConditions_list.append(Field)
+ else:
+ randomOperator_1= random.sample(Operator, 1)
+ randomOperator=randomOperator_1[0]
+ if conrandomstraints["type"] == "ip":
+                                        # generate a random IP address
+ ip =random_ipv4()
+ value=[]
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(ip)
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(ip)
+ elif randomOperator=="notEmpty":
+ value=[]
+ Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ elif conrandomstraints["type"] == "email":
+ Operator=["=","!=","Like","Not Like","notEmpty","empty"]
+ randomOperator_1= random.sample(Operator, 1)
+ randomOperator=randomOperator_1[0]
+                                        # generate a random email address
+ emil =RandomEmail()
+ value=[]
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(emil)
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(emil)
+ elif randomOperator=="notEmpty":
+ value=[]
+ Field={"name":name,"expression":randomOperator,"value":value,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ else:
+ type1 = k["type"]
+ orConditions_list=[]
+ operator1 = k["doc"]["constraints"]["operator_functions"]
+ operator2 = operator1.split(",")
+ if testpoint == "Filter":
+ for op in operator2:
+ operatordata = k["doc"]["data"]
+ code = []
+ for i in operatordata:
+ code_1 = i["code"]
+ code.append(code_1)
+ for co in code:
+ Field = {"name": name, "expression": op, "value": co, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ operator3=random.sample(operator2,1)
+ operatordata=k["doc"]["data"]
+ code=[]
+ for i in operatordata:
+ code_1=i["code"]
+ code.append(code_1)
+ code2=random.sample(code, 1)
+ Field={"name":name,"expression":operator3[0],"value":code2,"type":type1}
+ orConditions_list.append(Field)
+ orConditions={"orConditions":orConditions_list}
+ andConditions.append(orConditions)
+ filterCondition={"andConditions":andConditions}
+ print("filterCondition",filterCondition)
+ return filterCondition
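+# For illustration (the field name is hypothetical): the result nests or-conditions inside and-conditions, e.g.
+# {"andConditions": [{"orConditions": [{"name": "common_src_port", "expression": ">", "value": ["42"], "type": "int"}]}]}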
+
+# Build the having condition
+def havingjson(schemajson,testpoint,field):
+ number = random.randint(0,100000)
+ schemametrics=schemajson["data"]["doc"]["schema_query"]["metrics"]
+ aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"]
+ schemametrics.append("common_log_id")
+ metricslist=[]
+ if testpoint == "Having":
+ metricslist.append(field)
+ else:
+ metricslist=random.sample(schemametrics, 4)
+ fields = schemajson["data"]["fields"]
+ operator=["=","!=",">","<",">=","<="]
+ Aggregate=["COUNT","AVG","SUM","MAX","MIN"]
+ andConditions_list=[]
+    # iterate over the selected having metrics
+ for i in metricslist:
+ for j in fields:
+ if i == j["name"]:
+ name = j["name"]
+ type1=j["type"]
+ for v in aggregation:
+ if type1 == v["type"]:
+ orConditions_list=[]
+ if v["type"] != "string":
+ functionslist=Aggregate
+ else:
+ functionsstr=v["functions"]
+ functionslist = functionsstr.split(",")
+ if field == "common_log_id":
+ functionslist=["COUNT"]
+ if testpoint == "Having":
+ for functions_1 in functionslist:
+ for operator_1 in operator:
+ havingdict = {"name": name, "function": str.lower(functions_1),
+ "expression": operator_1, "value": str(number)}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+ else:
+ functions_1=random.sample(functionslist, 1)
+                            if functions_1[0] == "COUNT_DISTINCT" and type1 != "string":
+ functions_1=random.sample(functionslist, 1)
+ operator_1=random.sample(operator, 1)
+
+ havingdict={"name":name,"function":str.lower(functions_1[0]),"expression":operator_1[0],"value":str(number)}
+ orConditions_list.append(havingdict)
+ orConditions={"orConditions":orConditions_list}
+ andConditions_list.append(orConditions)
+ havingCondition={"andConditions":andConditions_list}
+ print("having",havingCondition)
+ return havingCondition
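+# For illustration: each having entry has the shape
+# {"name": "<metric>", "function": "count", "expression": ">", "value": "123"}, nested in or-/and-conditions.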
+
+# Assemble the dataset request JSON
+def datasetjson(schemauerl,token,testname,logtype,testpoint,field):
+ schema_new=schema(schemauerl,token,logtype)
+ group_re=groupby(schema_new,logtype,testpoint,field)
+ groupColumnList=group_re[0]
+ group_randomstr=group_re[1]
+ queryColumnList=DataBindings(schema_new,group_randomstr,testpoint,field)
+ filterCondition_1=filterCondition(schema_new,testpoint,field)
+ havingjson_1=havingjson(schema_new,testpoint,field)
+ datasetdict = {
+ "list": {
+ "name":testname,
+ "logType": logtype,
+ "groupColumnList":groupColumnList,
+ "queryColumnList":queryColumnList,
+ "filterCondition":filterCondition_1,
+ "havingCondition":havingjson_1
+ }
+ }
+ print(datasetdict)
+ print("datasetjson",json.dumps(datasetdict))
+ return json.dumps(datasetdict)
+
+# Assemble the chart JSON
+def charjson(schemaurl,token,queryColumnList,groupColumnList,datasetid,testname,logtype):
+ print("queryColumnList",queryColumnList)
+ schema_new=schema(schemaurl,token,logtype)
+ fields = schema_new["data"]["fields"]
+    # collect the labels of the query columns
+ namelist=[]
+ for i in queryColumnList:
+ for j in fields:
+ if i["name"] == j["name"]:
+ j_label=j["label"]
+ namelist.append(j_label)
+ print("namelist",namelist)
+    # collect the labels of the group-by columns
+ groupColumnlaberList=[]
+ for i in groupColumnList:
+ for j in fields:
+ if i["name"] == j["name"]:
+ j_label=j["label"]
+ groupColumnlaberList.append(j_label)
+ print("groupColumnlaberList",groupColumnlaberList)
+    # candidate chart types
+ chartType_1=["line","pie","bar","area","table"]
+ chartType_2=["pie","bar","table"]
+ chartType=[]
+    # pick a chart type at random
+ s=1
+ for i in namelist:
+ if i == "Receive Time" or i == "Start Time" or i == "End Time":
+ s+=1
+ if s != 1:
+ chartType=random.sample(chartType_1, 1)
+ else:
+ chartType=random.sample(chartType_2, 1)
+ chardict={}
+ print("chartType",chartType)
+ if chartType[0] == "line" or chartType[0] == "area":
+ dataBinding=[]
+        # put the time column into dataBinding
+ for j in namelist:
+ if j == "Receive Time" or j == "Start Time" or j == "End Time":
+ dataBinding.append(j)
+ timelin={
+ "dataBinding": dataBinding[0],
+ "format": "Time"
+ }
+ print("timelin",timelin)
+        namelist.remove(dataBinding[0])  # drop the time column from the query column labels
+        groupColumnlaberList.remove(dataBinding[0])  # drop the time column from the group-by labels
+        for i in groupColumnlaberList:  # drop the group-by labels from the query column labels
+ namelist.remove(i)
+ print("namelistrome",namelist)
+ linlist=[]
+ for i in namelist:
+ lindict={
+ "dataBinding": i,
+ "type": "Line Up",
+ "format": "Default",
+ }
+ linlist.append(lindict)
+ listdict={
+ "name": testname,
+ "datasetId": datasetid,
+ "datasetName": "",
+ "chartType": chartType[0],
+ "dataTop": 0,
+ "orderBy": "",
+ "orderDesc": 0,
+ "drilldownTop": 0,
+ "timeline": timelin,
+ "line": linlist
+ }
+ chardict={"list": listdict}
+ elif chartType[0] == "pie" or chartType[0] == "bar":
+ xAxisdataBinding=random.sample(groupColumnlaberList, 1)
+ xAxisdict={
+ "dataBinding": xAxisdataBinding[0],
+ "dataTop": 5,
+ "dataType": ""
+ }
+ for i in groupColumnlaberList:
+ namelist.remove(i)
+ yAxisBinding=random.sample(namelist, 1)
+ yAxisdict={
+ "dataBinding": yAxisBinding[0],
+ "format": "Default",
+ }
+ yAxislist=[yAxisdict]
+ listdict={
+ "name": testname,
+ "datasetId": datasetid,
+ "datasetName": "",
+ "chartType": chartType[0],
+ "dataTop": 0,
+ "orderBy": "",
+ "orderDesc": "",
+ "xAxis": xAxisdict,
+ "yAxis": yAxislist
+ }
+ chardict={"list": listdict}
+ elif chartType[0] == "table":
+ columnslist=[]
+ for i in namelist:
+ dataBindings={
+ "dataType": "",
+ "dataBinding": i,
+ "format": "Default",
+ }
+ dataBindingslist=[]
+ dataBindingslist.append(dataBindings)
+ columnsdict={
+ "title": i,
+ "width": 0,
+ "dataBindings": dataBindingslist
+ }
+ columnslist.append(columnsdict)
+
+ listdict={
+ "name": testname,
+ "datasetId": datasetid,
+ "datasetName": "",
+ "chartType": "table",
+ "dataTop": 5,
+ "orderBy": "",
+ "orderDesc": "",
+ "drilldownTop": 5,
+ "tableType": "Regular",
+ "columns": columnslist
+ }
+ chardict={"list": listdict}
+ print("charjson",json.dumps(chardict))
+ return json.dumps(chardict)
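+# For illustration: the chart payload is {"list": {...}} and, depending on the sampled chart type, carries
+# either a "timeline"/"line" pair, an "xAxis"/"yAxis" pair, or a "columns" list for tables.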
+
+def Reportsjson(chartId,testname):
+ charlist=[]
+ chardict={
+ "chartId": chartId,
+ "timeGranulartiy": 1,
+ "timeUnit": "",
+# "disabled": true
+ }
+ charlist.append(chardict)
+ reportJobList=[]
+ reportJobdct_1={
+ "rangeType": "last",
+ "rangeInterval": 1,
+ "rangeUnit": "week",
+ "jobName": testname,
+ "scheduleId": "",
+ "chartList": charlist,
+ "isNotice": 0,
+ "noticeMethod": "",
+ "startTime": "",
+ "endTime": "",
+ "filterCondition": None,
+ "isDisplayTrafficTrend": 1
+ }
+ reportJobdct_2={"reportJobList": reportJobdct_1}
+ print("reportjson",json.dumps(reportJobdct_2))
+ return json.dumps(reportJobdct_2)
+
+def ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype,testpoint,field):
+ headers = {"Content-Type": "application/json","Authorization": token}
+    # build the dataset JSON and send the request
+ _datasetjson=datasetjson(schemaurl,token,testname,logtype,testpoint,field)
+ response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers)
+    print("response 1", response1)
+ code = response1.json()["code"]
+ print("datasetcode:",code)
+ assert code == 200
+
+
+# fetch the id of the dataset just created
+ datasetget=requests.get(url=datasetgeturl,headers=headers)
+ dasetget=datasetget.json()
+ datesetid=dasetget["data"]["list"][0]["id"]
+ Deleteinterfaces(dataseturl,token,datesetid)
+ # _datasetjson=json.loads(_datasetjson)
+ # queryColumnList=_datasetjson["list"]["queryColumnList"]
+ # groupColumnList=_datasetjson["list"]["groupColumnList"]
+    # build the chart-library JSON and send the request (currently disabled)
+ # charlibrariesjson=charjson(schemaurl, token,queryColumnList,groupColumnList,datesetid,testname,logtype)
+ # response2 = requests.post(url=charurl, data=charlibrariesjson, headers=headers)
+ # code = response2.json()["code"]
+ # assert code == 200
+#
+# # fetch the chart library id
+# charget=requests.get(url=chargeturl,headers=headers)
+# charget=charget.json()
+# charid=charget["data"]["list"][0]["id"]
+#
+# # build the report JSON and send the request
+# reportjson=Reportsjson(charid,testname)
+# response3 = requests.post(url=repporturl, data=reportjson, headers=headers)
+# code = response3.json()["code"]
+# assert code == 200
+#
+
+# # call ReportInterfaceTest in a loop over all log types
+# def ReportTest(host,token,dataseturl,charurl,repporturl,logtypelist):
+# for logtype in logtypelist:
+# testname="Report"+logtype
+# datasetgeturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser="
+# chargeturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser="
+# schemaurl="http://"+host+":8080/v1/log/schema?logType="+logtype
+# ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,datasetgeturl,chargeturl,testname,logtype)
+
+
+def Deleteinterfaces(url,token,id):
+ headers = {"Content-Type": "application/json","Authorization": token}
+ datedict={"ids":[id]}
+ datajson=json.dumps(datedict)
+ response1 = requests.delete(url=url, data=datajson, headers=headers)
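+# For illustration: Deleteinterfaces(dataseturl, token, datesetid) sends a DELETE with body {"ids": [datesetid]}
+# and is used above to clean up the dataset created by each positive case.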
+
+
+def ReportPositiveTest(host,port,token,dataseturl,charurl,repporturl,logtypelist):
+ testpoint=["GroupBy","DataBindings","Filter","Having"]
+ for logtype in logtypelist:
+ schemaurl="http://"+host+":"+port+"/v1/log/schema?logType="+logtype
+ schema_new=schema(schemaurl,token,logtype)
+ metrics = schema_new["data"]["doc"]["schema_query"]["metrics"]
+ schemafilters = schema_new["data"]["doc"]["schema_query"]["filters"]
+ dimensions = schema_new["data"]["doc"]["schema_query"]["dimensions"]
+        dimensions.append("common_recv_time")
+ metrics.append("common_log_id")
+ for j in testpoint:
+ if j == "GroupBy":
+ for filter in dimensions:
+ testname="Report"+logtype+j+filter
+ dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser="
+ ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter)
+ if j == "DataBindings":
+ for filter in metrics:
+ testname="Report"+logtype+j+filter
+ dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser="
+ ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter)
+
+ if j == "Filter" :
+ for filter in schemafilters:
+ testname="Report"+logtype+j+filter
+ dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser="
+ ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter)
+
+ if j == "Having" :
+ for filter in metrics:
+ testname="Report"+logtype+j+filter
+ dataset_geturl=dataseturl+"?pageSize=20&pageNo=1&id=&name="+testname+"&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl=charurl+"?pageSize=20&pageNo=1&id=&name="+testname+"&opUser="
+ ReportInterfaceTest(schemaurl,token,dataseturl,charurl,repporturl,dataset_geturl,char_geturl,testname,logtype,j,filter)
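+# Illustrative invocation only; the token and the three API urls are placeholders supplied by the Robot keywords:
+# ReportPositiveTest("192.168.44.72", "8080", "<token>", "<dataset url>", "<chart url>", "<report url>",
+#                    ["security_event_log", "connection_record_log"])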
+
+
+
+
+
diff --git a/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py b/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py
new file mode 100644
index 0000000..dc466a9
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/ReportSchema_Negtive.py
@@ -0,0 +1,871 @@
+import requests
+import random
+import json
+import time
+import ipaddress
+
+# Pure API negative tests for Report: they only verify the interface, not the accuracy of the statistics
+
+# Generate a random IPv4 or IPv6 address
+MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
+MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
+
+
+def random_ipv4():
+ return ipaddress.IPv4Address._string_from_ip_int(
+ random.randint(0, MAX_IPV4))
+
+
+def random_ipv6():
+ return ipaddress.IPv6Address._string_from_ip_int(
+ random.randint(0, MAX_IPV6))
+
+
+# Generate a random email address
+def RandomEmail(emailType=None, rang=None):
+ __emailtype = ["@qq.com", "@163.com", "@126.com", "@189.com"]
+    # if no email domain is specified, pick one at random from __emailtype
+ if emailType == None:
+ __randomEmail = random.choice(__emailtype)
+ else:
+ __randomEmail = emailType
+    # if no length is specified, pick a random length between 4 and 10
+ if rang == None:
+ __rang = random.randint(4, 10)
+ else:
+ __rang = int(rang)
+ __Number = "0123456789qbcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPWRSTUVWXYZ"
+ __randomNumber = "".join(random.choice(__Number) for i in range(__rang))
+ _email = __randomNumber + __randomEmail
+ return _email
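+# For illustration: RandomEmail() may return something like "x7kq2f@163.com";
+# RandomEmail("@qq.com", 6) fixes the domain and the local-part length.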
+
+
+# Fetch the schema
+def schema(schemauerl, token, logtype):
+    url = schemauerl
+ headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ return response.json()
+
+
+# Build the groupColumnList part of the request JSON
+def groupby(schemajson, logtype, testpoint):
+ dimensions = schemajson["data"]["doc"]["schema_query"]["dimensions"]
+    dimensions.append("common_recv_time")
+ if logtype == "security_event_log" or logtype == "connection_record_log" or logtype == "voip_record_log":
+ dimensions.remove("common_start_time")
+ dimensions.remove("common_end_time")
+ randomstr_1 = []
+ if testpoint == "GroupBy":
+ randomstr_1.append("GroupBy_Negtive")
+ else:
+ randomstr_1 = random.sample(dimensions, 4)
+
+    # grp holds the group entries that are returned
+ grp = []
+ for i in randomstr_1:
+ a = {"name": i}
+ grp.append(a)
+
+ re = [grp, randomstr_1]
+ print("groupby", re)
+ return re
+
+
+# Build the queryColumnList part of the request JSON
+def DataBindings(schemajson, randomstr_1, testpoint, field):
+    # build the queryColumnList entries
+ print("field", field)
+ metrics = schemajson["data"]["doc"]["schema_query"]["metrics"]
+ metrics.append("common_log_id")
+    # pick random elements from the metrics list
+ randomstr_2 = []
+ if testpoint == "DataBindings_Field" or testpoint == "DataBindings_Aggregate":
+ randomstr_2.append(field)
+ randomstr_3 = randomstr_2
+ else:
+ randomstr_2 = random.sample(metrics, 6)
+        # drop metrics that already appear in the groupby list
+ randomstr_3 = array_diff(randomstr_2, randomstr_1)
+    # add the groupby elements to the payload
+ qul = []
+ for i in randomstr_1:
+ a = {"name": i}
+ qul.append(a)
+
+ fields = schemajson["data"]["fields"]
+ if testpoint == "DataBindings_Aggregate":
+ list_1 = ["countdistinct"]
+ list_2 = ["summ"]
+ else:
+ list_1 = ["sum", "min", "max", "avg", "count"]
+ list_2 = ["count", "count_distinct"]
+
+ if testpoint == "DataBindings_Field":
+ Aggregate = "sum"
+ randomstr_4 = {"name": randomstr_2[0], "expression": Aggregate}
+ qul.append(randomstr_4)
+
+ elif testpoint == "DataBindings_Aggregate":
+ for i in randomstr_3:
+ for j in fields:
+ if i == j["name"]:
+ jtype = j["type"]
+ label = i
+ sun = 1
+ if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
+ for Aggregate in list_1:
+ randomstr_4 = {"name": i, "expression": Aggregate, "label": label}
+ qul.append(randomstr_4)
+ label = label + str(sun)
+ sun += 1
+ elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
+ for Aggregate in list_2:
+ randomstr_4 = {"name": i, "expression": Aggregate, "label": label}
+ qul.append(randomstr_4)
+ label = label + str(sun)
+ sun += 1
+
+ else:
+ for i in randomstr_3:
+ for j in fields:
+ if i == j["name"]:
+ jtype = j["type"]
+ if jtype == "int" or jtype == "long" or jtype == "float" or jtype == "double":
+ radomlist = random.sample(list_1, 1)
+ randomstr_4 = {"name": i, "expression": radomlist[0]}
+ qul.append(randomstr_4)
+ elif jtype == "randomstring" or jtype == "date" or jtype == "timestamp" or jtype == "string":
+ randomlist = random.sample(list_2, 1)
+ randomstr_4 = {"name": i, "expression": randomlist[0]}
+ qul.append(randomstr_4)
+ print("DataBindings", qul)
+ return qul
+
+
+# Remove from list a every element that also appears in list b
+def array_diff(a, b):
+    # keep only the elements of a that are not present in b
+    return [x for x in a if x not in b]
+
+
+def filterCondition(schemajson, testpoint, field):
+ number = random.randint(0, 100000)
+ randomstr = random.choice('abcdefghijklmnopqrstuvwxyz')
+ schemafilters = schemajson["data"]["doc"]["schema_query"]["filters"]
+ list1 = []
+ if testpoint == "Filter_Field" or testpoint == "Filter_Operator":
+ list1.append(field)
+ else:
+ list1 = random.sample(schemafilters, 4)
+    # operators supported by each field type
+ fields = schemajson["data"]["fields"]
+ operator = schemajson["data"]["doc"]["schema_query"]["references"]["operator"]
+ andConditions = []
+ if testpoint == "Filter_Field":
+ orConditions_list = []
+ Field = {"name": field, "expression": "!=", "value": [1], "type": "int"}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif testpoint == "Filter_Operator":
+ for i in list1:
+            # iterate over the fields list
+ for k in fields:
+                # when the filter entry matches a field name
+ if i == k["name"]:
+ name = k["name"]
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list = []
+ Operator = ["=="]
+ randomOperator = random.sample(Operator, 1)
+ value = [str(number)]
+ Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list = []
+ Operator = ["=="]
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ value = []
+ value.append(str(number))
+ Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ #
+ # else:
+ # if k["doc"]["constraints"] == None:
+ # type1 = k["type"]
+ # if type1 == "int" or type1 == "long":
+ # orConditions_list = []
+ # Operator = ["=", "!=", ">", "<", ">=", "<="]
+ # if testpoint == "Filter":
+ # for op in Operator:
+ # value = [str(number)]
+ # Field = {"name": name, "expression": op, "value": value, "type": type1}
+ # orConditions_list.append(Field)
+ # else:
+ # randomOperator = random.sample(Operator, 1)
+ # value = [str(number)]
+ # Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ # elif type1 == "string":
+ # orConditions_list = []
+ # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ # if testpoint == "Filter":
+ # for op in Operator:
+ # randomOperator = op
+ # value = []
+ # if randomOperator == "=" or randomOperator == "!=":
+ # value.append(str(number))
+ # elif randomOperator == "Like" or randomOperator == "Not Like":
+ # value.append(randomstr)
+ # elif randomOperator == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ # orConditions_list.append(Field)
+ # else:
+ # randomOperator_1 = random.sample(Operator, 1)
+ # randomOperator = randomOperator_1[0]
+ # value = []
+ # if randomOperator == "=" or randomOperator == "!=":
+ # value.append(str(number))
+ # elif randomOperator == "Like" or randomOperator == "Not Like":
+ # value.append(randomstr)
+ # elif randomOperator == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ #
+ # else:
+ # if k["doc"]["constraints"]["operator_functions"] == None:
+ # conrandomstraints = k["doc"]["constraints"]
+ # type1 = k["type"]
+ # if type1 == "int" or type1 == "long":
+ # orConditions_list = []
+ # Operator = ["=", "!=", ">", "<", ">=", "<="]
+ # if testpoint == "Filter":
+ # for op in Operator:
+ # randomOperator = op
+ # if conrandomstraints["type"] == "timestamp":
+ # # 获取当前时间戳
+ # t = int(time.time())
+ # value = [str(t)]
+ # Field = {"name": name, "expression": randomOperator, "value": value,
+ # "type": type1}
+ # orConditions_list.append(Field)
+ # else:
+ # randomOperator_1 = random.sample(Operator, 1)
+ # randomOperator = randomOperator_1[0]
+ # if conrandomstraints["type"] == "timestamp":
+ # # 获取当前时间戳
+ # t = int(time.time())
+ # value = [str(t)]
+ # Field = {"name": name, "expression": randomOperator, "value": value,
+ # "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ # elif type1 == "string":
+ # orConditions_list = []
+ # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ # if testpoint == "Filter":
+ # if conrandomstraints["type"] == "ip":
+ # for op in Operator:
+ # # 获取ip
+ # ip = random_ipv4()
+ # value = []
+ # if op == "=" or op == "!=":
+ # value.append(ip)
+ # elif op == "Like" or op == "Not Like":
+ # value.append(ip)
+ # elif op == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": op, "value": value, "type": type1}
+ # orConditions_list.append(Field)
+ # elif conrandomstraints["type"] == "email":
+ # for op in Operator:
+ # randomOperator = op
+ # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ # randomOperator_1 = random.sample(Operator, 1)
+ # randomOperator = randomOperator_1[0]
+ # # 获取ip
+ # emil = RandomEmail()
+ # value = []
+ # if randomOperator == "=" or randomOperator == "!=":
+ # value.append(emil)
+ # elif randomOperator == "Like" or randomOperator == "Not Like":
+ # value.append(emil)
+ # elif randomOperator == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": randomOperator, "value": value,
+ # "type": type1}
+ # orConditions_list.append(Field)
+ # else:
+ # randomOperator_1 = random.sample(Operator, 1)
+ # randomOperator = randomOperator_1[0]
+ # if conrandomstraints["type"] == "ip":
+ # # 获取ip
+ # ip = random_ipv4()
+ # value = []
+ # if randomOperator == "=" or randomOperator == "!=":
+ # value.append(ip)
+ # elif randomOperator == "Like" or randomOperator == "Not Like":
+ # value.append(ip)
+ # elif randomOperator == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": randomOperator, "value": value,
+ # "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ # elif conrandomstraints["type"] == "email":
+ # Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ # randomOperator_1 = random.sample(Operator, 1)
+ # randomOperator = randomOperator_1[0]
+ # # 获取ip
+ # emil = RandomEmail()
+ # value = []
+ # if randomOperator == "=" or randomOperator == "!=":
+ # value.append(emil)
+ # elif randomOperator == "Like" or randomOperator == "Not Like":
+ # value.append(emil)
+ # elif randomOperator == "notEmpty":
+ # value = []
+ # Field = {"name": name, "expression": randomOperator, "value": value,
+ # "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ # else:
+ # type1 = k["type"]
+ # orConditions_list = []
+ # operator1 = k["doc"]["constraints"]["operator_functions"]
+ # operator2 = operator1.split(",")
+ # if testpoint == "Filter":
+ # for op in operator2:
+ # operatordata = k["doc"]["data"]
+ # code = []
+ # for i in operatordata:
+ # code_1 = i["code"]
+ # code.append(code_1)
+ # for co in code:
+ # Field = {"name": name, "expression": op, "value": co, "type": type1}
+ # orConditions_list.append(Field)
+ # else:
+ # operator3 = random.sample(operator2, 1)
+ # operatordata = k["doc"]["data"]
+ # code = []
+ # for i in operatordata:
+ # code_1 = i["code"]
+ # code.append(code_1)
+ # code2 = random.sample(code, 1)
+ # Field = {"name": name, "expression": operator3[0], "value": code2, "type": type1}
+ # orConditions_list.append(Field)
+ # orConditions = {"orConditions": orConditions_list}
+ # andConditions.append(orConditions)
+ else:
+ for i in list1:
+            # iterate over the fields list
+ for k in fields:
+                # when the filter entry matches a field name
+ if i == k["name"]:
+ name = k["name"]
+ doc = k["doc"]
+                    # fields without any special doc constraints:
+ if doc == None:
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list = []
+ Operator = ["=", "!=", ">", "<", ">=", "<="]
+ if testpoint == "Filter":
+ for op in Operator:
+ value = [str(number)]
+ Field = {"name": name, "expression": op, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator = random.sample(Operator, 1)
+ value = [str(number)]
+ Field = {"name": name, "expression": randomOperator[0], "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list = []
+ Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ if testpoint == "Filter":
+ for op in Operator:
+ value = []
+ if op == "=" or op == "!=":
+ value.append(str(number))
+ elif op == "Like" or op == "Not Like":
+ value.append(randomstr)
+ elif op == "notEmpty" or op == "empty":
+ value = []
+ Field = {"name": name, "expression": op, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+
+ else:
+ if k["doc"]["constraints"] == None:
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list = []
+ Operator = ["=", "!=", ">", "<", ">=", "<="]
+ if testpoint == "Filter":
+ for op in Operator:
+ value = [str(number)]
+ Field = {"name": name, "expression": op, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator = random.sample(Operator, 1)
+ value = [str(number)]
+ Field = {"name": name, "expression": randomOperator[0], "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list = []
+ Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ if testpoint == "Filter":
+ for op in Operator:
+ randomOperator = op
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(str(number))
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(randomstr)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+
+ else:
+ if k["doc"]["constraints"]["operator_functions"] == None:
+ conrandomstraints = k["doc"]["constraints"]
+ type1 = k["type"]
+ if type1 == "int" or type1 == "long":
+ orConditions_list = []
+ Operator = ["=", "!=", ">", "<", ">=", "<="]
+ if testpoint == "Filter":
+ for op in Operator:
+ randomOperator = op
+ if conrandomstraints["type"] == "timestamp":
+                                        # current Unix timestamp
+ t = int(time.time())
+ value = [str(t)]
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ if conrandomstraints["type"] == "timestamp":
+                                    # current Unix timestamp
+ t = int(time.time())
+ value = [str(t)]
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif type1 == "string":
+ orConditions_list = []
+ Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ if testpoint == "Filter":
+ if conrandomstraints["type"] == "ip":
+ for op in Operator:
+                                            # generate a random IP address
+ ip = random_ipv4()
+ value = []
+ if op == "=" or op == "!=":
+ value.append(ip)
+ elif op == "Like" or op == "Not Like":
+ value.append(ip)
+ elif op == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": op, "value": value, "type": type1}
+ orConditions_list.append(Field)
+ elif conrandomstraints["type"] == "email":
+ for op in Operator:
+ randomOperator = op
+ Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+                                            # generate a random email address
+ emil = RandomEmail()
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(emil)
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(emil)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ else:
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+ if conrandomstraints["type"] == "ip":
+                                        # generate a random IP address
+ ip = random_ipv4()
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(ip)
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(ip)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ elif conrandomstraints["type"] == "email":
+ Operator = ["=", "!=", "Like", "Not Like", "notEmpty", "empty"]
+ randomOperator_1 = random.sample(Operator, 1)
+ randomOperator = randomOperator_1[0]
+                                # generate a random email address
+ emil = RandomEmail()
+ value = []
+ if randomOperator == "=" or randomOperator == "!=":
+ value.append(emil)
+ elif randomOperator == "Like" or randomOperator == "Not Like":
+ value.append(emil)
+ elif randomOperator == "notEmpty":
+ value = []
+ Field = {"name": name, "expression": randomOperator, "value": value,
+ "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ else:
+ type1 = k["type"]
+ orConditions_list = []
+ operator1 = k["doc"]["constraints"]["operator_functions"]
+ operator2 = operator1.split(",")
+ if testpoint == "Filter":
+ for op in operator2:
+ operatordata = k["doc"]["data"]
+ code = []
+ for i in operatordata:
+ code_1 = i["code"]
+ code.append(code_1)
+ for co in code:
+ Field = {"name": name, "expression": op, "value": co, "type": type1}
+ orConditions_list.append(Field)
+ else:
+ operator3 = random.sample(operator2, 1)
+ operatordata = k["doc"]["data"]
+ code = []
+ for i in operatordata:
+ code_1 = i["code"]
+ code.append(code_1)
+ code2 = random.sample(code, 1)
+ Field = {"name": name, "expression": operator3[0], "value": code2, "type": type1}
+ orConditions_list.append(Field)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions.append(orConditions)
+ filterCondition = {"andConditions": andConditions}
+ print("filterCondition", filterCondition)
+ return filterCondition
+
+
+# Build the having condition
+def havingjson(schemajson, testpoint, field):
+ number = random.randint(0, 100000)
+ schemametrics = schemajson["data"]["doc"]["schema_query"]["metrics"]
+ aggregation = schemajson["data"]["doc"]["schema_query"]["references"]["aggregation"]
+ schemametrics.append("common_log_id")
+ metricslist = []
+ if testpoint == "Having_Field" or testpoint == "Having_Aggregate" or testpoint == "Having_Operator":
+ metricslist.append(field)
+ else:
+ metricslist = random.sample(schemametrics, 4)
+ fields = schemajson["data"]["fields"]
+
+ if testpoint == "Having_Aggregate":
+ Aggregate = ["COUNTT"]
+ else:
+ Aggregate = ["COUNT", "AVG", "SUM", "MAX", "MIN"]
+
+ if testpoint == "Having_Operator":
+ operator = ["=="]
+ else:
+ operator = ["=", "!=", ">", "<", ">=", "<="]
+
+ andConditions_list = []
+    # iterate over the selected having metrics
+ if testpoint == "Having_Field":
+ orConditions_list=[]
+ havingdict = {"name": field, "function": "count","expression": "=", "value": 11}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+ elif testpoint == "Having_Aggregate":
+ for j in fields:
+ if field == j["name"]:
+ name = j["name"]
+ type1 = j["type"]
+ for v in aggregation:
+ if type1 == v["type"]:
+ orConditions_list = []
+ if v["type"] != "string":
+ functionslist = Aggregate
+ else:
+ functionslist = ["COUNTT"]
+ if field == "common_log_id":
+ functionslist = ["COUNTT"]
+ functions_1 = random.sample(functionslist, 1)
+ operator_1 = random.sample(operator, 1)
+ havingdict = {"name": name, "function": str.lower(functions_1[0]),
+ "expression": operator_1[0], "value": str(number)}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+ elif testpoint == "Having_Operator":
+ for j in fields:
+ if field == j["name"]:
+ name = j["name"]
+ type1 = j["type"]
+ for v in aggregation:
+ if type1 == v["type"]:
+ orConditions_list = []
+ if v["type"] != "string":
+ functionslist = Aggregate
+ else:
+ functionsstr = v["functions"]
+ functionslist = functionsstr.split(",")
+ if field == "common_log_id":
+ functionslist = ["COUNT"]
+ functions_1 = random.sample(functionslist, 1)
+                        if functions_1[0] == "COUNT_DISTINCT" and type1 != "string":
+ functions_1 = random.sample(functionslist, 1)
+ operator_1 = random.sample(operator, 1)
+
+ havingdict = {"name": name, "function": str.lower(functions_1[0]),
+ "expression": operator_1[0], "value": str(number)}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+
+ else:
+ for i in metricslist:
+ for j in fields:
+ if i == j["name"]:
+ name = j["name"]
+ type1 = j["type"]
+ for v in aggregation:
+ if type1 == v["type"]:
+ orConditions_list = []
+ if v["type"] != "string":
+ functionslist = Aggregate
+ else:
+ functionsstr = v["functions"]
+ functionslist = functionsstr.split(",")
+ if field == "common_log_id":
+ functionslist = ["COUNT"]
+ if testpoint == "Having":
+ for functions_1 in functionslist:
+ for operator_1 in operator:
+ havingdict = {"name": name, "function": str.lower(functions_1),
+ "expression": operator_1, "value": str(number)}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+ else:
+ functions_1 = random.sample(functionslist, 1)
+                                if functions_1[0] == "COUNT_DISTINCT" and type1 != "string":
+ functions_1 = random.sample(functionslist, 1)
+ operator_1 = random.sample(operator, 1)
+
+ havingdict = {"name": name, "function": str.lower(functions_1[0]),
+ "expression": operator_1[0], "value": str(number)}
+ orConditions_list.append(havingdict)
+ orConditions = {"orConditions": orConditions_list}
+ andConditions_list.append(orConditions)
+ havingCondition = {"andConditions": andConditions_list}
+ print("having", havingCondition)
+ return havingCondition
+
+# Assemble the dataset request JSON
+def datasetjson(schemauerl, token, testname, logtype, testpoint, field):
+ schema_new = schema(schemauerl, token, logtype)
+ group_re = groupby(schema_new, logtype, testpoint)
+ groupColumnList = group_re[0]
+ group_randomstr = group_re[1]
+ queryColumnList = DataBindings(schema_new, group_randomstr, testpoint, field)
+ filterCondition_1 = filterCondition(schema_new, testpoint, field)
+ havingjson_1 = havingjson(schema_new, testpoint, field)
+ if testpoint == "LogType":
+ logtype = field
+ datasetdict = {
+ "list": {
+ "name": testname,
+ "logType": logtype,
+ "groupColumnList": groupColumnList,
+ "queryColumnList": queryColumnList,
+ "filterCondition": filterCondition_1,
+ "havingCondition": havingjson_1
+ }
+ }
+ print(datasetdict)
+ print("datasetjson", json.dumps(datasetdict))
+ return json.dumps(datasetdict)
+
+def ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, datasetgeturl, chargeturl, testname, logtype,
+ testpoint, field=None):
+ headers = {"Content-Type": "application/json", "Authorization": token}
+    # build the dataset JSON and send the request
+ _datasetjson = datasetjson(schemaurl, token, testname, logtype, testpoint, field)
+ response1 = requests.post(url=dataseturl, data=_datasetjson, headers=headers)
+    print("response 1", response1)
+ code = response1.json()["code"]
+ print("datasetcode:", code)
+ if testpoint == "LogType":
+ assert code == 40040002
+ elif testpoint == "GroupBy":
+ assert code == 40040008
+ elif testpoint == "DataBindings_Field":
+ assert code == 40040004
+ elif testpoint == "DataBindings_Aggregate":
+ assert code == 40040006
+ elif testpoint == "Filter_Field":
+ assert code == 40040007
+ elif testpoint == "Filter_Operator":
+ assert code == 40040010
+ elif testpoint == "Having_Field":
+ assert code == 40040074
+ elif testpoint == "Having_Aggregate":
+ assert code == 40040072
+ elif testpoint == "Having_Operator":
+ assert code == 40040073
+
+def ReportPositiveTest_Negtive(host, port, token, dataseturl, charurl, repporturl, logtypelist):
+ testpoint=["LogType","GroupBy","DataBindings_Field","DataBindings_Aggregate","Filter_Field","Filter_Operator","Having_Field","Having_Aggregate","Having_Operator"]
+ for logtype in logtypelist:
+ schemaurl = "http://" + host + ":" + port + "/v1/log/schema?logType=" + logtype
+ schema_new = schema(schemaurl, token, logtype)
+ metrics = schema_new["data"]["doc"]["schema_query"]["metrics"]
+ schemafilters = schema_new["data"]["doc"]["schema_query"]["filters"]
+ metrics.append("common_log_id")
+ for j in testpoint:
+ print(j)
+ if j == "LogType":
+ testname = "Report" + logtype + j
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ filter = "Negtive_log"
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "GroupBy":
+ testname = "Report" + logtype + j
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ filter = "GroupByNegtive"
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "DataBindings_Field":
+ testname = "Report" + logtype + j
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ filter = "DataBindingsFieldNegtive"
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "DataBindings_Aggregate":
+ for filter in metrics:
+ testname = "Report" + logtype + j + filter
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "Filter_Field":
+ testname = "Report" + logtype + j
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ filter = "FilterFieldNegtive"
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "Filter_Operator":
+ for filter in schemafilters:
+ testname = "Report" + logtype + j + filter
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+ if j == "Having_Field":
+ testname = "Report" + logtype + j
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ filter="HavingFieldNegtive"
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "Having_Aggregate":
+ for filter in metrics:
+ testname = "Report" + logtype + j + filter
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
+
+ if j == "Having_Operator":
+ for filter in metrics:
+ testname = "Report" + logtype + j + filter
+ dataset_geturl = dataseturl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&logType=&opStartTime=&opEndTime=&opUser="
+ char_geturl = charurl + "?pageSize=20&pageNo=1&id=&name=" + testname + "&opUser="
+ ReportInterfaceTest(schemaurl, token, dataseturl, charurl, repporturl, dataset_geturl, char_geturl,
+ testname, logtype, j, filter)
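+# Illustrative invocation only; the token and the three API urls are placeholders supplied by the Robot keywords:
+# ReportPositiveTest_Negtive("192.168.44.72", "8080", "<token>", "<dataset url>", "<chart url>", "<report url>",
+#                            ["security_event_log"])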
diff --git a/keyword/common/customlibrary/Custometest/Schema.py b/keyword/common/customlibrary/Custometest/Schema.py
new file mode 100644
index 0000000..835d96a
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/Schema.py
@@ -0,0 +1,350 @@
+# !/user/bin/python
+# -*-coding:utf-8-*-
+import requests
+import random
+import json
+
+# import allure
+
+list = []
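+# note: this module-level list shadows the built-in name and keeps accumulating across Filter1() calls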
+
+
+# Request the schema endpoint; the response is reused by the other interface tests
+def schema(schemauerl, token):
+ url = schemauerl # "http://192.168.44.72:8080/v1/log/schema?logType=security_event_log"
+ headers = {"Content-Type": "application/x-www-form-urlencoded", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ return response.json()
+
+
+# From the schema response, derive the list of comparison expressions supported by every field:
+# 1. a column supports searching when [doc][allow_query] is true;
+# 2. if [doc][constraints][operator_functions] is set, those operators take precedence;
+# 3. if [doc][data] is set, the field value must be one of the listed code values;
+# 4. int and long have different value ranges;
+# 5. string values should include special characters;
+# 6. when assigning query values, cover both boundary and normal values;
+# 7. IP (v4/v6) and URL values are generated by dedicated helpers.
+
+import ipaddress
+
+# Generate a random IPv4 or IPv6 address
+MAX_IPV4 = ipaddress.IPv4Address._ALL_ONES # 2 ** 32 - 1
+MAX_IPV6 = ipaddress.IPv6Address._ALL_ONES # 2 ** 128 - 1
+
+
+def random_ipv4():
+ return ipaddress.IPv4Address._string_from_ip_int(
+ random.randint(0, MAX_IPV4)
+ )
+
+
+def random_ipv6():
+ return ipaddress.IPv6Address._string_from_ip_int(
+ random.randint(0, MAX_IPV6)
+ )
+
+
+from random import Random
+
+
+# Generate a random URL
+def randrom_url():
+ str = ''
+ str1 = ''
+ chars = 'abcdefghijklmnopqrstuvwxyz0123456789'
+ chars1 = 'abcdefghijklmnopqrstuvwxyz0123456789!#$%^&*()'
+ length = len(chars)
+ length1 = len(chars1)
+ random = Random()
+ for x in range(random.randint(8, 16)):
+ str += chars[random.randint(0, length - 1)]
+ for pp in range(random.randint(8, 16)):
+ str1 += chars1[random.randint(0, length1 - 1)]
+ url = str[0:-5] + "." + str[0:-6] + "." + str[0:-7] + "/" + str1
+ print(url)
+ return url
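+# For illustration: the three dot-separated segments are shrinking prefixes of one random string,
+# e.g. "k3fd9.k3fd.k3f/a8#fz0q1" (lengths vary with the random string length).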
+
+
+def Filter1(schemauerl, token):
+ json_str = schema(schemauerl, token)
+ print(type(json_str))
+    # field definitions of the log
+ fields = json_str["data"]["fields"]
+    # operators supported by each field type
+ operator = json_str["data"]["doc"]["schema_query"]["references"]["operator"]
+ for i in fields:
+ number = random.randint(-2147483648, 2147483647)
+ maxnumber = 2147483647
+ minnumber = -2147483648
+ str = random.choice('abcdefghijklmnopqrstuvwxyz!@#$%^&*')
+ name = i["name"]
+ doc = i["doc"]
+        # fields without any special doc constraints:
+ if doc == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ if i["doc"]["constraints"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+
+ else:
+ if i["doc"]["constraints"]["operator_functions"] == None:
+ type1 = i["type"]
+ for j in operator:
+ if type1 == j["type"]:
+ if type1 == "int" or type1 == "long":
+ value1 = number
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + f"{value1}" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + f"{value1}"
+ list.append(str1)
+ elif type1 == "string":
+ value1 = str
+ functions = j["functions"]
+ functions1 = functions.split(",")
+ for v in functions1:
+ if v == "not empty" or v == "empty":
+ str1 = v + "(" + " '" + name + " '" + ")"
+ list.append(str1)
+ elif v == "in" or v == "not in":
+ str1 = name + " " + v + " " + "(" + " '" + value1 + " '" + ")"
+ list.append(str1)
+ else:
+ str1 = name + " " + v + " " + " '" + value1 + " '"
+ list.append(str1)
+ else:
+ type1 = i["type"]
+ operator1 = i["doc"]["constraints"]["operator_functions"]
+ operator2 = operator1.split(",")
+ data = i["doc"]["data"]
+ for d in data:
+ code = d["code"]
+ if type1 == "int" or type1 == "long":
+ for o in operator2:
+ str1 = name + " " + o + " " + code
+ list.append(str1)
+ else:
+ for o in operator2:
+ str1 = name + " " + o + " " + " '" + code + " '"
+ list.append(str1)
+
+
+ print(list)
+ return list
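+# For illustration (field names are hypothetical): the returned list contains expression strings such as
+# "common_src_port > 1024" or "empty( 'common_domain ')".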
+
+
+# Feed the filters produced by Filter1 into the log list API to verify the log query interface
+def logapiverify(logurl, schemauerl, token, starttime, endtime, logtype):
+ filter2 = Filter1(schemauerl, token)
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ str2 = ""
+ for i in filter2:
+ str2 = str2 + i + " " + "and" + " "
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": i
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ assert code == 200
+ print(response1.json()["code"])
+ print(str2)
+ str3 = str2[0:-4]
+ print(str3)
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": str3
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ assert code == 200
+ print(response1.json()["code"])
+
+
+# Query the log list API with a specific filter
+def loglistverify(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ url = logurl # "http://192.168.44.72:8080/v1/log/list"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ assert code == 200
+ print(response1.json()["code"])
+ return response1.json()
+
+
+# Time-distribution query for event logs and connection logs; also validates the log search condition (filter content)
+def distributed_query(logurl, token):
+    url = logurl # example URL: http://192.168.44.72:8080/v1/interface/gateway/sql/galaxy/security_event_hits_log/timedistribution?logType=security_event_hits_log&startTime=2021-03-26 12:27:03&endTime=2021-03-29 12:27:03&granularity=PT5M
+ headers = {"Content-Type": "application/json", "Authorization": token}
+ response = requests.get(url=url, headers=headers)
+ code = response.json()["code"]
+ print(response.json())
+ assert code == 200
+ print(response.json()["code"])
+ return response.json()
+
+
+# Time-distribution calculation for raw log search
+def timedistribution(logurl, token, starttime, endtime, logtype, granularity, filtervalue):
+ url = logurl # "http://192.168.44.72:8080/v1/log/timedistribution"
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "startTime": starttime,
+ "endTime": endtime,
+ "logType": logtype,
+ "granularity": granularity,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# Query the total number of logs
+def countlog_query(logurl, token, starttime, endtime, logtype):
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "pageSize": 20,
+ "logType": logtype,
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "filter": ""
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+ code = response1.json()["code"]
+ print(response1.json())
+ print(response1.json()["code"])
+ assert code == 200
+ return response1.json()
+
+# Log export API
+def exportlog(logurl, schemauerl, token, starttime, endtime, logtype, filtervalue):
+ a = schema(schemauerl, token)
+ fields = a["data"]["fields"]
+ print(fields)
+ url = logurl
+ headers = {"Content-Type": "application/json",
+ "Authorization": token}
+ data = {
+ "start_common_recv_time": starttime,
+ "end_common_recv_time": endtime,
+ "logType": logtype,
+ "fields": fields,
+ "filter": filtervalue
+ }
+ print(data)
+ print(json.dumps(data))
+ response1 = requests.post(url=url, data=json.dumps(data), headers=headers)
+    # The export is considered successful as long as a requests Response object is returned
+    assert isinstance(response1, requests.models.Response)
+
+
+
+
+# if __name__ == '__main__':
+# logapiverify("http://192.168.32.59:8080/v1/log/list","http://192.168.32.59:8080/v1/log/schema?logType=security_event_log","d475b20d-e2b8-4f24-87ee-d54af46e6aff&807&",'2021-03-20 16:36:41','2021-03-21 17:36:41',"security_event_log") \ No newline at end of file
diff --git a/keyword/common/customlibrary/Custometest/StringManipulation.py b/keyword/common/customlibrary/Custometest/StringManipulation.py
new file mode 100644
index 0000000..858d6fc
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/StringManipulation.py
@@ -0,0 +1,7 @@
+from builtins import len
+
+# String slicing helper used by the Settings and Administrator keywords: return a[k:] (drop the first k characters)
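+# e.g. StringSegmentation("policyId=123", 9) returns "123" (illustrative value)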
+def StringSegmentation(a,k):
+ b=len(a)
+ c=a[k:b]
+ return c \ No newline at end of file
diff --git a/keyword/common/customlibrary/Custometest/UIAssert.py b/keyword/common/customlibrary/Custometest/UIAssert.py
new file mode 100644
index 0000000..267c100
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/UIAssert.py
@@ -0,0 +1,22 @@
+import ssl, socket
+
+
+
+# Fetch the TLS certificate of a page (the hostname is currently hard-coded) and print the issuer common name
+def ttt():
+ hostname = 'vip.com'
+ ctx = ssl.create_default_context()
+ with ctx.wrap_socket(socket.socket(), server_hostname=hostname) as s:
+ s.connect((hostname, 443))
+ cert = s.getpeercert()
+
+ subject = dict(x[0] for x in cert['subject'])
+ issued_to = subject['commonName']
+ issuer = dict(x[0] for x in cert['issuer'])
+ issued_by = issuer['commonName']
+ print(issued_by)
+
+
+
diff --git a/keyword/common/customlibrary/Custometest/__init__.py b/keyword/common/customlibrary/Custometest/__init__.py
new file mode 100644
index 0000000..4081636
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__init__.py
@@ -0,0 +1,16 @@
+
+#-*- coding:utf-8 -*-
+'''
+ created by hch 2019-06-26
+'''
+
+from Custometest.printlog import printlog
+from Custometest.MD5 import MD5
+from Custometest.cmd_cer import Order
+# from custometest.printlog import printlog
+
+
+__version__ = '1.0'
+
+class Custometest(printlog,Order,MD5):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL'
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc
new file mode 100644
index 0000000..190d80c
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/Common.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc
new file mode 100644
index 0000000..38716f5
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/JsonDiff.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc
new file mode 100644
index 0000000..02d7ce7
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/LogResponseVAL.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc
new file mode 100644
index 0000000..5c242a7
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/Schema.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc
new file mode 100644
index 0000000..44a26a1
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/StringManipulation.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc
new file mode 100644
index 0000000..30038ed
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/UIAssert.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc b/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc
new file mode 100644
index 0000000..f6ad333
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/__pycache__/log_contrast.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Custometest/certificate.yaml b/keyword/common/customlibrary/Custometest/certificate.yaml
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/certificate.yaml
diff --git a/keyword/common/customlibrary/Custometest/cmd_cer.py b/keyword/common/customlibrary/Custometest/cmd_cer.py
new file mode 100644
index 0000000..50cdf08
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/cmd_cer.py
@@ -0,0 +1,290 @@
+import os
+import subprocess
+from time import sleep
+import platform
+
+
+
+class Order:
+ def CMD(self,data):
+ result = os.popen(data)
+ # res = result.read().encoding('GBK')
+ res = result.read()
+ result.close()
+ # res = res.decode("unicode-escape")
+ return res
+ def Linux(self):
+ pass
+    # Check whether the certificate has been replaced, based on the issuer name
+ def Cert_Verification(self,data):
+ c = []
+ print(1)
+ #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo:
+ with open(r'certificate.yaml', 'r') as foo:
+ print(2)
+ for line in foo.readlines():
+ if data in line:
+ print(line)
+ c.append('证书已替换')
+ else:
+ pass
+ if '证书已替换' in c:
+ # print('证书已替换')
+ foo.close()
+ return '证书已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return '证书未替换'
+
+ def Content_Type(self,data):
+ d = []
+ with open('certificate.yaml', 'r') as foo:
+ for line in foo.readlines():
+ if data in line:
+ # print(line)
+ d.append('Content_Type已替换')
+ else:
+ pass
+ if 'Content_Type已替换' in d:
+ # print('证书已替换')
+ foo.close()
+ return 'Content_Type已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return 'Content_Type未替换'
+    # Build the curl command line for the given URL
+ def curl_name(self,data):
+ #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ return curl_name
+    # Controller: run curl against each URL and verify the certificate
+ def manu(self,url,Certificate):
+ # print(data['url'])
+ n = 0
+ while n != len(url):
+ b = self.curl_name(url[n])
+ d = self.CMD(b)
+ # print(d)
+ sleep(1)
+ if Certificate != "":
+ c =self.Cert_Verification(Certificate)
+ # f = self.Content_Type(data["Content_Type"])
+ sleep(1)
+ assert_cer = url[n]+c
+ # assert_Content_Type = data['Content_Type']+f
+ n+=1
+ return d,assert_cer
+
+ def FTP(self, ftp_type):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+        # Determine the FTP operation type (download / login)
+ if ftp_type == "下载":
+            # Download the file over FTP via the command line
+ data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt'
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814
+ if fsize == 435814:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+ elif ftp_type == "登录":
+ data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ # print(d)
+ if "Graphical (Xorg) program starter for ADRIANE" in d:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+    # FTP download
+ def FTP_down(self, ftp_url,file_size,file_name):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+        # Download the file over FTP via the command line
+        data = 'curl -m 20 ' + ftp_url + ' -o ' + file_name
+ print(data)
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/"+file_name) # 435814
+ print(fsize)
+ if fsize == file_size:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+
+    # FTP login
+ def FTP_login(self, ftp_url,file_content):
+ SYS = self.Operating_System()
+ if SYS == "Windows":
+ data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ else:
+ data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+
+ if file_content in d:
+ return "ftp_success"
+ else:
+ return "ftp_fail"
+
+    # Return the name of the current operating system
+ def Operating_System(self):
+ os_name = platform.system()
+ return os_name
+
+
+    # Recursively replace values in a JSON structure: jsons is the default JSON, datas holds the replacements
+    # Module-level name null: maps Java's null to Python's empty value ("") when JSON-like text is eval'd
+    global null
+    null = ''
+
+    # Recursively replace values: jsons is the default JSON, datas holds the replacements, header supplies a fully custom JSON
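+    # e.g. Order().Jsoneditmanu('{"a": 1, "b": {"a": 2}}', {"a": 9}) returns {'a': 9, 'b': {'a': 9}} (illustrative values)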
+ def Jsoneditmanu(self, jsons, datas=None,header=None):
+        # Check whether a fully custom JSON header was supplied
+        if header != None:
+            header = eval(header)
+            # Return the custom header JSON
+            return header
+        # Check whether the JSON content needs to be modified
+        elif datas != None:
+            # datas = eval(datas)
+            jsons = eval(jsons)
+            # Walk the structure and replace every matching key
+            for k, v in datas.items():
+                Order.UpdateAllvalues(self,jsons, k, v)
+            return jsons
+        else:
+            # Return the original JSON unchanged
+ return jsons
+
+    # Recursive nested replacement
+ def UpdateAllvalues(self,mydict, key, value):
+        if isinstance(mydict, dict):  # if it is a dict
+            if key in mydict.keys():  # replace every top-level key that matches the given key
+                mydict[key] = value
+            for k in mydict.keys():  # recurse into every child value, replace inside it, then write it back to the current key
+                chdict = mydict[k]
+                Order.UpdateAllvalues(self,chdict, key, value)
+                mydict[k] = chdict
+        elif isinstance(mydict, list):  # if it is a list
+            for element in mydict:  # iterate over the list elements and repeat the same handling
+ if isinstance(element, dict):
+ if key in element.keys():
+ element[key] = value
+ for k in element.keys():
+ chdict = element[k]
+ Order.UpdateAllvalues(self,chdict, key, value)
+ element[k] = chdict
+
+
+
+    # Recursively extract the values of matching keys from a JSON structure
+ import json
+ def get_dict_allkeys(self,dict_a):
+ """
+        Walk a nested dict/list and collect the values of every key that ends with "Id".
+ :param dict_a:
+ :return: key_list
+ """
+        if isinstance(dict_a, dict):  # check the data type
+            # for dicts, collect the values of matching keys into key_list
+ for x in range(len(dict_a)):
+ temp_key = list(dict_a.keys())[x]
+ temp_value = dict_a[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+            Order.get_dict_allkeys(self,temp_value)  # recurse for unlimited depth
+ elif isinstance(dict_a, list):
+            # for lists, apply the same extraction to every dict element
+ for k in dict_a:
+ if isinstance(k, dict):
+ for x in range(len(k)):
+ temp_key = list(k.keys())[x]
+ temp_value = k[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+                        Order.get_dict_allkeys(self,temp_value)  # recurse for unlimited depth
+ return key_list
+
+    # Check whether the extracted id values cover the given comma-separated list
+ def VerifyProxy(self,data,lists):
+ global key_list
+ key_list = []
+ datas = Order.get_dict_allkeys(self,data)
+ print(type(datas))
+ lists=lists.split(",")
+ print(type(lists))
+ print("gsd")
+ datas2=list(map(str,datas))
+ print(datas2)
+ print(datas)
+ print(lists)
+
+ if set(datas2) > set(lists):
+ return "true"
+ else:
+ return "flase"
+
+
+if __name__ == '__main__':
+# datas = {"url":['https://www.baidu.com'],
+# "Certificate":"Tango Secure Gateway CA",
+# # "Content_Type":"text/html",
+# 'log':'Security Event Logs',
+# "sni":['baidu'],
+# "intercept_code":"200",
+# "log_code":"200",
+# "certifucate":"1",
+# "log_content":"true"
+# }
+# # data= {"url":['https://www.baidu.com'],
+# # "Certificate":"Tango Secure Gateway CA"
+# # }
+# # url = ['https://www.baidu.com']
+# # url = ['https://www.baidu.com']
+# # url = ['https://www.baidu.com']
+# # # Certificate1 = "Tango Secure Gateway CA"
+# # Certificate = "baidu"
+# # a='Tango Secure Gateway CA'
+# # s = Order()
+# # b = s.manu(url,Certificate)
+# # print(b[1])
+# # FTP下载 传入ftp的路径和文件大小
+# ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+# ftp_size = 435814
+# ftp_issue = s.FTP_down(ftp_url,ftp_size)
+# # FTP登录 传入ftp的路径和文件内容
+# ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+# file_content = "Graphical (Xorg) program starter for ADRIANE"
+# ftp_issue = s.FTP_login(ftp_url,file_content)
+ # for i in b:
+ # print(i)
+ # dd = s.CMD('curl -I https://www.baidu.com')
+ # print(dd)
+ # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd:
+ # print("ok")
+ # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk'
+
+
+    # Quick manual test of the methods above with a hand-written dict
+    null = ''  # the sample dict below uses the null placeholder
+ jsons = {"opAction":"add","policyList":{"policyId":"","policyName":"2324242423","policyType":"tsg_security","action":"intercept","userTags":"","doBlacklist":0,"doLog":1,"policyDesc":"","effectiveRange":{"tag_sets":[[]]},"userRegion":{"protocol":"SSL","keyring":1,"decryption":1,"decrypt_mirror":{"enable":0,"mirror_profile":null}},"referenceObject":[{"objectId":28054,"protocolFields":["TSG_SECURITY_SOURCE_ADDR"]}],"isValid":0,"scheduleId":[],"appObjectIdArray":[3]}}
+ datas = {"protocol":"edit","opAction":"edit","policyId":123,'protocolFields':1}
+
+ print("替换前:\n %s" % jsons)
+
+
+    a = Order()
+    b = a.Jsoneditmanu(str(jsons), datas)
+    # print("Before replacement:\n %s" % jsons)
+    print("After replacement:\n %s" % b)
+
+ data = {"aid":[{'bid':2},{'cid':3}]}
+ print(type(data))
+ # data="""{}"""
+ # data1 = json.loads(data)
+    key_list = []
+    get_keys = a.get_dict_allkeys(data)
+ print(get_keys)
diff --git a/keyword/common/customlibrary/Custometest/log_contrast.py b/keyword/common/customlibrary/Custometest/log_contrast.py
new file mode 100644
index 0000000..96e5e35
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/log_contrast.py
@@ -0,0 +1,8 @@
+#!/user/bin/python
+#-*-coding:utf-8-*-
+def log_contrast(logs,client_ip,policy_id,parmkey,parmvalue):
+ if (str(client_ip) in str(logs))and (str(policy_id) in str(logs)) and (str(parmkey) in str(logs)) and (str(parmvalue) in str(logs)):
+ print(logs)
+ return "true"
+ else:
+ return "false"
diff --git a/keyword/common/customlibrary/Custometest/printlog.py b/keyword/common/customlibrary/Custometest/printlog.py
new file mode 100644
index 0000000..02f8ced
--- /dev/null
+++ b/keyword/common/customlibrary/Custometest/printlog.py
@@ -0,0 +1,11 @@
+
+#-*- coding:utf-8 -*-
+'''
+ created by hch 2019-06-26
+'''
+
+
+class printlog():
+
+    def printA(self):
+        print("hello world") \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py
new file mode 100644
index 0000000..e60bf56
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/__init__.py
@@ -0,0 +1,7 @@
+#coding=utf-8
+from mytool import mytool
+
+version = '1.0'
+
+class ExtensionLibrary(mytool):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL' \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py
new file mode 100644
index 0000000..03440cb
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/ExtensionLibrary/mytool.py
@@ -0,0 +1,26 @@
+#coding=utf-8
+import socket
+
+""" 获取主机信息 """
+class mytool():
+ def __init__(self):
+ pass
+
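+    # Return the first local IPv4 address whose dotted form contains ".<flag>." (e.g. flag="50" matches 192.168.50.x); returns "" when nothing matches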
+ def get_host_IP(self, flag="50"):
+ #try:
+ #s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ #s.connect(('8.8.8.8', 80))
+ #ip = s.getsockname()[0]
+ #finally:
+ #s.close()
+ hostname = socket.gethostname()
+ ipTriple = socket.gethostbyname_ex(hostname)
+ ips = list(ipTriple[2:])
+ ip = ""
+ for i in ips:
+ ipList = list(i)
+ for ip1 in ipList:
+ if "."+flag+"." in ip1:
+ ip = ip1
+ break
+ return ip \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py
new file mode 100644
index 0000000..29db0ed
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/__init__.py
@@ -0,0 +1,7 @@
+#coding=utf-8
+from filetool import filetool
+
+version = '1.0'
+
+class FileLibrary(filetool):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL' \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py
new file mode 100644
index 0000000..8ad1ef7
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/FileLibrary/filetool.py
@@ -0,0 +1,22 @@
+#coding=utf-8
+
+""" 变量文件操作 """
+class filetool():
+ def __init__(self):
+ pass
+
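+    # Update the '${k}<TAB>value' line in a Robot variable file: replace the line when the key already exists, otherwise append it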
+ def alter_dict(self, path, k, v):
+ data = ''
+ flag = True
+ key = '${%s}' % (k)
+ add = key + '\t%s' % (v) + '\n'
+ with open(path, 'r+') as f:
+ for line in f.readlines():
+ if(line.find(key + '\t') == 0):
+ line = add
+ flag = False
+ data += line
+ if(flag):
+ data += add
+ with open(path, 'w+') as f:
+ f.writelines(data) \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py
new file mode 100644
index 0000000..7df32d2
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/GetTime.py
@@ -0,0 +1,85 @@
+#coding=utf-8
+import datetime
+import time
+import string
+class GetTime():
+ def __init__(self):
+ pass
+ def time1(self,t):
+ if t=="m":
+ print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
+ if time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[-4] >= "5":
+ time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-4]+"5:00"
+ return time2
+ else:
+ time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-4] + "0:00"
+ return time2
+ elif t=="s":
+ if time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[-2] >= "3":
+ time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-2]+"30"
+ return time2
+ else:
+ time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-2] + "00"
+ return time2
+ elif t=="h":
+ time2=time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))[0:-7]+":00:00"
+ startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S")
+ startTime2 = (startTime + datetime.timedelta(hours=-1)).strftime("%Y-%m-%d %H:%M:%S")
+ return startTime2
+ elif t=="5m":
+ time2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+ print(time2)
+ startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S")
+ startTime2 = (startTime + datetime.timedelta(minutes=-5)).strftime("%Y-%m-%d %H:%M:%S")
+ if startTime2[-4]>="5":
+ time3 = startTime2[0:-4] + "5:00"
+ print(time3)
+ else:
+ time3 = startTime2[0:-4] + "0:00"
+ print(time3)
+ return time3
+ elif t== "30s":
+ time2 = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+ print(time2)
+ startTime = datetime.datetime.strptime(time2, "%Y-%m-%d %H:%M:%S")
+ startTime2 = (startTime + datetime.timedelta(seconds=-30)).strftime("%Y-%m-%d %H:%M:%S")
+ if startTime2[-2] >= "3":
+ time3 = startTime2[0:-2]+"30"
+ print(time3)
+ return time3
+ else:
+ time3 = startTime2[0:-2]+"00"
+ print(time3)
+ return time3
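+    # e.g. str2sec("2021-03-29 12:27:03") returns (44823, '29') (illustrative value)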
+ def str2sec(self,x):
+ a, b = x.strip().split(' ')
+ y, m, d = a.strip().split('-')
+
+        h, m, s = b.strip().split(':')  # strip whitespace, then split on ':'
+
+        return int(h)*3600 + int(m)*60 + int(s), d  # convert to seconds, and also return the day of month
+ def intersection(self,a,b):
+ c= 0
+ f = 0
+ for j in a:
+
+ for i in j:
+ if i in b[f]:
+ print(i)
+ print(b[f])
+ c= c+1
+ f = f + 1
+ d = len(b)*len(b[0])
+ if d == 0:
+ e=0
+ elif c==0:
+ e = 0
+ else:
+ e = c/d
+ if e >= 0.8 :
+ return "ture"
+ else:
+ return "false"
+
+
+
diff --git a/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py
new file mode 100644
index 0000000..c0f47eb
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/GetTimeLibrary/__init__.py
@@ -0,0 +1,6 @@
+#coding=utf-8
+from GetTimeLibrary.GetTime import GetTime
+
+__version__ = '1.0'
+class GetTimeLibrary(GetTime):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL' \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth b/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth
new file mode 100644
index 0000000..73e1268
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/extensionLibrary.pth
@@ -0,0 +1,2 @@
+ExtensionLibrary
+FileLibrary \ No newline at end of file
diff --git a/keyword/common/customlibrary/ExtensionPackages/readme.txt b/keyword/common/customlibrary/ExtensionPackages/readme.txt
new file mode 100644
index 0000000..e3b091b
--- /dev/null
+++ b/keyword/common/customlibrary/ExtensionPackages/readme.txt
@@ -0,0 +1,4 @@
+# How to install:
+# 1. Put the 'extensionLibrary.pth' file into the ...\Python\Lib\site-packages directory
+# 2. Put the required package directories (currently 'ExtensionLibrary' and 'FileLibrary') into the same ...\Python\Lib\site-packages directory together with 'extensionLibrary.pth'
+# 3. Import the package in your test suite to use the extension keywords; an illustrative example follows below
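+#
+# Example (illustrative Robot Framework suite settings):
+# *** Settings ***
+# Library    ExtensionLibrary
+# Library    FileLibrary \ No newline at end of file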
diff --git a/keyword/common/customlibrary/Library/VerifyPolicy.py b/keyword/common/customlibrary/Library/VerifyPolicy.py
new file mode 100644
index 0000000..a0bce71
--- /dev/null
+++ b/keyword/common/customlibrary/Library/VerifyPolicy.py
@@ -0,0 +1,38 @@
+import json
+def get_dict_allkeys(dict_a):
+    if isinstance(dict_a, dict):  # check the data type
+        # for dicts, collect the values of matching keys into key_list
+ for x in range(len(dict_a)):
+ temp_key = list(dict_a.keys())[x]
+ temp_value = dict_a[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+            get_dict_allkeys(temp_value)  # recurse for unlimited depth
+ elif isinstance(dict_a, list):
+        # for lists, apply the same extraction to every dict element
+ for k in dict_a:
+ if isinstance(k, dict):
+ for x in range(len(k)):
+ temp_key = list(k.keys())[x]
+ temp_value = k[temp_key]
+ if temp_key.endswith("Id"):
+ key_list.append(temp_value)
+                    get_dict_allkeys(temp_value)  # recurse for unlimited depth
+ return key_list
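+# Return "true" when the values of keys ending in "Id" extracted from data form a proper superset of the comma-separated id list, otherwise "false"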
+def VerifyProxy(data,lists):
+ global key_list
+ key_list = []
+ datas = get_dict_allkeys(data)
+ print(type(datas))
+ lists=lists.split(",")
+ print(type(lists))
+ print("gsd")
+ datas2=list(map(str,datas))
+ print(datas2)
+ print(datas)
+ print(lists)
+
+ if set(datas2) > set(lists):
+ return "true"
+ else:
+ return "flase"
diff --git a/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc
new file mode 100644
index 0000000..12341de
--- /dev/null
+++ b/keyword/common/customlibrary/Library/__pycache__/VerifyPolicy.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc
new file mode 100644
index 0000000..694ad83
--- /dev/null
+++ b/keyword/common/customlibrary/Library/__pycache__/delUseless.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc b/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc
new file mode 100644
index 0000000..62d758f
--- /dev/null
+++ b/keyword/common/customlibrary/Library/__pycache__/fileOperations.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Library/delUseless.py b/keyword/common/customlibrary/Library/delUseless.py
new file mode 100644
index 0000000..3039794
--- /dev/null
+++ b/keyword/common/customlibrary/Library/delUseless.py
@@ -0,0 +1,45 @@
+import json
+def dict_del(key, obj):
+ if isinstance(obj, dict):
+ if key in obj:
+ obj.pop(key)
+ #print(obj.items())
+ for k, v in obj.items():
+ #print(obj.items())
+ if v is None:
+ v = '666'
+ else:
+ pass
+ dict_del(key, v)
+ elif isinstance(obj, list):
+ for x in obj:
+ dict_del(key, x)
+ else:
+ pass
+ #print(type(obj))
+ obj = json.dumps(obj)
+ return obj
+def deal(jsondata, keylist):
+ jsondata = json.loads(jsondata)
+ if "data" in jsondata.keys():
+ jsondata = jsondata["data"]
+ else:
+ pass
+ # jsondata = '{"opAction":"add","refuseCode":true,"returnData":1,"objectList":{"objectType":"fqdn","objectSubType":"fqdn","isValid":1,"isInitialize":0,"isExclusion":0,"objectName":"hbn_test_fqdn","objectDesc":"","subObjectIds":[],"addItemList":[{"keywordArray":["*abcds"],"t":"16191477536650","itemId":"","isHexbin":0,"state":2}],"updateItemList":[],"deleteItemIds":[],"iconColor":"#31739C"}}'
+ # keylist = ["objectType","objectSubType","isValid","isInitialize"]
+ # len2 = len(keylist)
+ # print("aaaaaaaaaaaaaaaaaaaaaaaaaa"+str(len2))
+ # for num in range(len2):
+ # retstr = dict_del(keylist[num], jsondata)
+ # print("#############################"+retstr)
+ # return retstr
+#for num in range(len2):
+ #print("$$$$$$$$$$$$$$$$$$"+dict_del(keylist[num],jsondata))
+ #key1 = keylist[num]
+ #print(num)
+ #print("#############################"+dict_del(key1,jsondata))
+ len1 = len(keylist)
+ for num in range(len1):
+ retstr = dict_del(keylist[num], jsondata)
+ dict_del(keylist[num], jsondata)
+ return retstr \ No newline at end of file
diff --git a/keyword/common/customlibrary/Library/fileOperations.py b/keyword/common/customlibrary/Library/fileOperations.py
new file mode 100644
index 0000000..4029e26
--- /dev/null
+++ b/keyword/common/customlibrary/Library/fileOperations.py
@@ -0,0 +1,26 @@
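+# Count the effective lines of a file, skipping blank lines and lines that start with '-->'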
+def CountLines(fname):
+ count = 0
+ with open(fname, 'rb') as f:
+ for file_line in f:
+ file_line = file_line.strip()
+ # print(file_line)
+            # blank line
+ if file_line == b'':
+ pass
+
+            # comment line (starts with '-->')
+ elif file_line.startswith(b'-->'):
+ pass
+
+            # code line
+ else:
+ count += 1
+ print(fname + '----', count)
+    # line count of this single file
+ # print(fname,'----count:',count)
+ return count
+def WriteBinary(response,path1):
+ with open(path1,"wb") as f2:
+ strb = response
+ f2.write(strb)
+
diff --git a/keyword/common/customlibrary/Pop3Library/__init__.py b/keyword/common/customlibrary/Pop3Library/__init__.py
new file mode 100644
index 0000000..66219ac
--- /dev/null
+++ b/keyword/common/customlibrary/Pop3Library/__init__.py
@@ -0,0 +1,200 @@
+import poplib
+import base64
+import time
+from email.parser import Parser
+# used to decode the mail subject
+from email.header import decode_header
+# used to parse the mail sender address
+from email.utils import parseaddr
+
+from robot.api.deco import keyword
+from robot.api import logger
+
+
+class AcceptEmail(object):
+
+ def __init__(self, user_email, password, pop3_server='serverDemon'):
+ self.user_email = user_email
+ self.password = password
+ self.pop3_server = pop3_server
+
+ self.connect_email_server()
+
+ def connect_email_server(self):
+ self.server = poplib.POP3(self.pop3_server)
+        # Print the POP3 server greeting to verify that the mail server connection works
+        # print('Connected -- ', self.server.getwelcome().decode('utf8'))
+ # +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0)
+
+        # Authenticate with the server
+ self.server.user(self.user_email)
+ self.server.pass_(self.password)
+
+ def __del__(self):
+        # Close the connection to the server and release resources
+ self.server.close()
+
+ def get_email_count(self):
+        # stat() returns the total message count and the mailbox size in bytes
+ email_num, email_size = self.server.stat()
+ # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
+ return email_num
+
+ def receive_email_info(self, now_count=None):
+        # stat() returns the total message count and the mailbox size in bytes
+ email_num, email_size = self.server.stat()
+ # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
+ self.email_count = email_num
+ self.email_sumsize = email_size
+
+        # list() returns the index of every message, as byte strings by default
+ rsp, msg_list, rsp_siz = self.server.list()
+ # print(msg_list, '邮件数量',len(msg_list))
+ # print("服务器的响应: {0},\n消息列表: {1},\n返回消息的大小: {2}".format(rsp, msg_list, rsp_siz))
+ # print('邮件总数: {}'.format(len(msg_list)))
+ self.response_status = rsp
+ self.response_size = rsp_siz
+
+        # Fetch one message by its index (indices start at 1)
+ # total_mail_numbers = len(msg_list)
+
+        # Fetch the message at the requested index
+ total_mail_numbers = now_count
+
+ rsp, msglines, msgsiz = self.server.retr(total_mail_numbers)
+ # rsp, msglines, msgsiz = self.server.retr(1)
+ # print("服务器的响应: {0},\n原始邮件内容: {1},\n该封邮件所占字节大小: {2}".format(rsp, msglines, msgsiz))
+
+        # Parse the raw message content
+ msg_content = b'\r\n'.join(msglines).decode('utf-8')#gbk
+ msg = Parser().parsestr(text=msg_content)
+ self.msg = msg
+ # print('解码后的邮件信息:\n{}'.format(msg))
+
+ def recv(self, now_count=None):
+ self.receive_email_info(now_count)
+ self.parser()
+
+ def get_email_title(self):
+ subject = self.msg['Subject']
+ value, charset = decode_header(subject)[0]
+ if charset:
+ value = value.decode(charset)
+ # print('邮件主题: {0}'.format(value))
+ self.email_title = value
+
+ def get_sender_info(self):
+ hdr, addr = parseaddr(self.msg['From'])
+        # name: sender display name, addr: sender email address
+ name, charset = decode_header(hdr)[0]
+ if charset:
+ name = name.decode(charset)
+ self.sender_qq_name = name
+ self.sender_qq_email = addr
+ # print('发送人邮箱名称: {0},发送人邮箱地址: {1}'.format(name, addr))
+
+ def get_email_content(self):
+ content = self.msg.get_payload()
+        # plain-text part
+        content_charset = content[0].get_content_charset()  # get the charset
+ text = content[0].as_string().split('base64')[-1]
+        text_content = base64.b64decode(text).decode(content_charset)  # base64 decode
+ self.email_content = text_content
+ # print('邮件内容: {0}'.format(text_content))
+
+        # HTML part
+ content_charset = content[1].get_content_charset()
+ text = content[1].as_string().split('base64')[-1]
+ # html_content = base64.b64decode(text).decode(content_charset)
+
+ # print('文本信息: {0}\n添加了HTML代码的信息: {1}'.format(text_content, html_content))
+
+ def parser(self):
+ self.get_email_title()
+ self.get_sender_info()
+ #self.get_email_content()
+
+
+def get_new_mail(user_name, pwd, pop3_server, second=5):
+ t = AcceptEmail(user_name, pwd, pop3_server)
+ now_count = t.get_email_count()
+ #print('开启的时候的邮件数量为:%s' % now_count)
+ logger.info("开启的时候的邮件数量为:"+str(now_count))
+ # 每次需要重新连接邮箱服务器,才能获取到最新的消息
+ # 默认每隔5秒看一次是否有新内容
+ num = 0
+ while True:
+ obj = AcceptEmail(user_name, pwd, pop3_server)
+ count = obj.get_email_count()
+ if count > now_count:
+ new_mail_count = count - now_count
+ #print('有新的邮件数量:%s' % new_mail_count)
+ logger.info("有新的邮件数量:"+str(new_mail_count))
+ now_count += 1
+ obj.recv(now_count)
+
+ yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email}
+ #yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email,
+ # "email_content": obj.email_content}
+ if new_mail_count > 0:
+ return
+ # print('-' * 30)
+ # print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
+ # obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content))
+ # print('-' * 30)
+
+ #else:
+ #print('没有任何新消息.')
+ #logger.info("没有任何新消息.")
+ time.sleep(second)
+ num += 1
+        if num == 36:  # polling granularity is 5s; num=10 would mean waiting 50s for mail before returning; 36 waits 3min
+ return
+
+
+@keyword('Recv Email')
+def recv_email(user_name, pwd, pop3_server, send_user, subj):
+ '''
+    Parameters:
+    [user_name]: user name
+    [pwd]: password (third-party/app login password)
+    [pop server]: POP server address
+    [sender]: sender email address; use the full address
+    [subj_sub]: part of the subject; the keyword checks that the subject contains it
+    [return]: returns success on a match, otherwise another value
+    See this library's readme.txt for other details
+ '''
+ dic = {}
+ logger.info("正在监听邮件服务器端是否有新消息---")
+ #print('正在监听邮件服务器端是否有新消息---')
+ try:
+ iterator = get_new_mail(user_name, pwd, pop3_server)
+ except TypeError:
+ #print('监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容')
+ logger.info("监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容")
+ return "fail"
+ else:
+ for dic in iterator:
+ #print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
+ # dic["title"], dic["sender"], dic["sender_email"], dic["email_content"]))
+ #logger.info("邮件主题: " + str(dic["title"]) + " 发件人: " + str(dic["sender"])+"发件人邮箱:"+str(dic["sender_email"]))
+ if dic["sender"] == send_user:
+ #logger.info("发送者一样")
+ if subj in dic["title"]:
+ #logger.info("主题也包含")
+ return "success"
+ else:
+ #logger.info("主题不包含")
+ return "fail"
+ else:
+ return "fail"
+
+
+#if __name__ == '__main__':
+# user = '[email protected]'
+# pwd = 'xxxx'
+# pop3_server = 'pop.qq.com'
+# send_user = '[email protected]'
+# subj = 'or2020'
+# result = recv_email(user, pwd, pop3_server, send_user, subj)
+# print(result) \ No newline at end of file
diff --git a/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc b/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..ecd3bf5
--- /dev/null
+++ b/keyword/common/customlibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc
Binary files differ
diff --git a/keyword/common/customlibrary/Pop3Library/readme.txt b/keyword/common/customlibrary/Pop3Library/readme.txt
new file mode 100644
index 0000000..42c7c9b
--- /dev/null
+++ b/keyword/common/customlibrary/Pop3Library/readme.txt
@@ -0,0 +1,26 @@
+How to install:
+1. Put this directory under ....\Python\Lib\site-packages
+2. Import the package in your test fixture using its absolute path
+
+
+Notes:
+1. Do not use a QQ mailbox with this keyword; it proved unreliable in testing, a 163 mailbox or similar works better
+2. Disable the mailbox's encrypted transfer mode (SSL)
+3. Make sure the mailbox supports POP3 and mind the different POP server hostnames of the various providers
+
+
+Keyword:
+[return] Recv Email [user_name] [pwd] [pop server] [sender] [subj_sub]
+Parameters:
+[user_name]: user name
+[pwd]: password (third-party/app login password)
+[pop server]: POP server address
+[sender]: sender email address; use the full address
+[subj_sub]: part of the subject; the keyword checks whether the subject contains it
+[return]: returns success on a match, otherwise another value
+The keyword waits up to 3 minutes by default, checking the mailbox every 5 seconds for new mail;
+when a mail arrives the sender is matched exactly against [sender] and the subject is checked to contain [subj_sub]; on a match it returns success;
+if the match fails or the wait times out another value is returned.
+
+
+To change the wait time, change the num variable in the source from 36 to another value; the granularity is 5s, so num=3 means a 15s wait
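+
+Example (illustrative account values, pipe-separated Robot Framework syntax):
+| ${result}= | Recv Email | [email protected] | app_password | pop.163.com | [email protected] | subject keyword |
+| Should Be Equal | ${result} | success |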
diff --git a/keyword/common/customlibrary/Smtp3Library/__init__.py b/keyword/common/customlibrary/Smtp3Library/__init__.py
new file mode 100644
index 0000000..c86362a
--- /dev/null
+++ b/keyword/common/customlibrary/Smtp3Library/__init__.py
@@ -0,0 +1,417 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# This file is part of robotframework-Smtp3Library.
+# https://github.io/lucamaro/robotframework-Smtp3Library
+
+# Licensed under the Apache License 2.0 license:
+# http://www.opensource.org/licenses/Apache-2.0
+# Copyright (c) 2016, Luca Maragnani <[email protected]>
+
+"""
+Library implementation
+"""
+import ast
+import smtplib
+import email
+import random
+import string
+import mimetypes
+import quopri
+from email.message import Message
+from email.mime.audio import MIMEAudio
+from email.mime.base import MIMEBase
+from email.mime.image import MIMEImage
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email import encoders
+import os.path
+import socket
+from robot.api.deco import keyword
+from robot.api import logger
+
+from Smtp3Library.version import __version__ # NOQA
+
+COMMASPACE = ', '
+
+class Smtp3Library(object):
+ """
+ SMTP Client class
+ """
+
+ def __init__(self):
+ """
+ Constructor
+ """
+ self.message = self._MailMessage()
+ self.host = None
+ self.port = None
+ self.user = None
+ self.password = None
+ self.smtp = None
+
+ def _prepare_connection(self, host, port, user=None, password=None):
+ """
+ Private method to collect connection informations
+ """
+ self.host = host
+ self.port = int(port)
+ self.user = user
+ self.password = password
+ self.client_hostname = socket.gethostname()
+
+
+ def prepare_ssl_connection(self, host, port=465, user=None, password=None):
+ """
+ Collect connection informations for SSL channel
+ """
+ self._prepare_connection(host, port, user, password)
+ self.smtp = smtplib.SMTP_SSL()
+
+ def prepare_connection(self, host, port=25, user=None, password=None):
+ """
+ Collect connection informations for unencrypted channel
+ """
+ self._prepare_connection(host, port, user, password)
+ self.smtp = smtplib.SMTP()
+
+ def add_to_recipient(self, recipient):
+ """
+ Add a recipient to "To:" list
+ """
+ self.message.mail_to.append(recipient)
+
+ def add_cc_recipient(self, recipient):
+ """
+ Add a recipient to "Cc:" list
+ """
+ self.message.mail_cc.append(recipient)
+
+ def add_bcc_recipient(self, recipient):
+ """
+ Add a recipient to "Bcc:" list
+ """
+ self.message.mail_bcc.append(recipient)
+
+ def set_subject(self, subj):
+ """
+ Set email subject
+ """
+ self.message.subject = subj
+
+ def set_from(self, from_recipient):
+ """
+ Set from address of message and envelope
+ """
+ self.message.mail_from = from_recipient
+
+ def set_body(self, body):
+ """
+ Set email body
+ """
+ self.message.body = body
+
+ def set_random_body(self, size):
+ """
+ Set a random body of <size> length
+ """
+ body = ''
+ for i in range(0, size):
+            body += random.choice(string.ascii_uppercase + string.digits)
+ if i % 80 == 0:
+ body += "\n"
+ self.message.body = body
+
+ def add_attachment(self, attach):
+ """
+ Add attachment to a list of filenames
+ """
+ self.message.attachments.append(attach)
+
+ def add_header(self, name, value):
+ """
+ Add a custom header to headers list
+ """
+ self.message.headers[name] = value
+
+ def connect(self):
+ '''
+ Open connection to server
+ Returns tuple (smtp status code, message)
+ '''
+ return self.smtp.connect(self.host, self.port)
+
+ def present_client_as(self, client_hostname):
+ '''
+ Set helo/ehlo client identity
+ '''
+ self.client_hostname = client_hostname
+
+ def helo(self):
+ '''
+ Send HELO command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.helo(self.client_hostname)
+ logger.info(result)
+ return result
+
+ def ehlo(self):
+ '''
+ Send EHLO command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.ehlo(self.client_hostname)
+ logger.info(result)
+ return result
+
+ def get_esmtp_features(self):
+ '''
+ Returns hashmap with ESMTP feature received with EHLO
+ '''
+ logger.info(self.smtp.esmtp_features)
+ return self.smtp.esmtp_features
+
+ def logins(self):
+ try:
+ '''
+ Login user
+ Returns tuple (smtp status code, message)
+ '''
+ logger.info("Login with user " + self.user + " and password " + self.password)
+ '''try:
+ subuser=bytes.decode(self.user)
+ subpassword=bytes.decode(self.password)
+ result = self.smtp.login(subuser.encode('ascii'), subpassword.encode('ascii'))
+ logger.info(result)
+ return result
+ except:
+ logger.info("本身就是str类型不需要bytes to str!")
+ subuser=str(self.user).encode('ascii')
+ subpassword=str(self.password).encode('ascii')
+ result = self.smtp.login(subuser, subpassword)
+ logger.info(result)
+ return result'''
+ result = self.smtp.login(self.user, self.password)
+ logger.info(result)
+ return "mail_success"
+ except:
+ return "mail_fail"
+
+
+ def starttls(self, keyfile=None, certfile=None):
+ '''
+ sends STARTTLS
+ optional: keyfile certfile
+ Returns tuple (smtp status code, message)
+ '''
+ logger.info("STARTTLS")
+ if keyfile is None and certfile is None:
+ result = self.smtp.starttls()
+ else:
+ result = self.smtp.starttls(keyfile, certfile)
+ logger.info(result)
+ return result
+
+ def data(self):
+ '''
+ Data command send email body with "MAIL FROM:", "RCPT TO:" and "DATA" commands
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.mail(self.message.mail_from)
+ result += self.smtp.rcpt(self.message.get_message_recipients())
+
+ result += self.smtp.data(self.message.get_message_as_string())
+ logger.info(result)
+ return result
+
+ def sendmail(self):
+ '''
+ Send email with "MAIL FROM:", "RCPT TO:" and "DATA" commands
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.sendmail(self.message.mail_from, self.message.get_message_recipients(), self.message.get_message_as_string())
+ logger.info(result)
+ return result
+
+ def quit(self):
+ '''
+ Send QUIT command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.quit()
+ logger.info(result)
+ return result
+
+ def close_connection(self):
+ '''
+ Close connection to server
+ '''
+ return self.smtp.close()
+
+ def send_message(self):
+ """
+ Send the message, from connection establishment to quit and close connection.
+ All the connection and email parameters must be already set before invocation.
+ Returns sendmail response (code, message)
+ """
+
+ # Send the message
+ try:
+ self.connect()
+
+ if self.user is not None:
+ self.ehlo()
+ self.logins()
+
+ send_result = self.sendmail()
+
+ self.quit()
+ self.close_connection()
+ # return send_result
+ return "mail_success"
+ except:
+ return "mail_fail"
+
+ @keyword('Send Message With All Parameters')
+ def send_message_full(self, host, user, password, subj,
+ from_recipient, to_recipient, cc_recipient=None, bcc_recipient=None,
+ body=None, attach=None):
+ """
+        Send a message specifying all parameters on the same line.
+ cc, bcc and attach parameters may be strings or array of strings
+ host, user, password, subj, fromadd, toadd - are mandatory parameters
+        to use the optional parameters please specify the name of the parameter in the call
+ user and password even if mandatory could be set to None so no authentication will be made
+ Example:
+ sendMail("smtp.mail.com", None, None, "The subject", "[email protected]", "[email protected]", body="Hello World body")
+
+ sendMail("smtp.mail.com", "scott", "tiger", "The subject", "[email protected]", "[email protected]", body="Hello World body", attach=attaches
+ where could be:
+ attaches = ["c:\\desktop\\file1.zip", "c:\\desktop\\file2.zip"] or
+ attaches = "c:\\desktop\\file1.zip"
+ Returns sendmail response (code, message)
+ """
+
+ self.host = host
+ self.user = user
+ self.password = password
+
+ self.set_subject(subj)
+ self.set_from(from_recipient)
+ self.message.mail_to = to_recipient
+ if cc_recipient != None:
+ self.message.mail_cc = cc_recipient
+ if bcc_recipient != None:
+ self.message.mail_bcc = bcc_recipient
+ #Fill the message
+ if body != None:
+ self.set_body(body)
+ # Part two is attachment
+ if attach != None:
+ attachlist = ast.literal_eval(attach)
+ self.message.attachments = attachlist
+ #logger.info("self.message.attachments:"+str(type(self.message.attachments)))
+ #logger.info("attachtype:"+str(type(attachlist)))
+ #logger.info("attachlist:"+str(attachlist))
+
+ return self.send_message()
+
+
+ class _MailMessage:
+ """
+ Simplified email message
+ This class represent email headers and payload content, not envelope data
+ """
+
+ def __init__(self):
+ """
+ init object variables
+ """
+ self.mail_from = None
+ self.mail_to = []
+ self.mail_cc = []
+ self.mail_bcc = []
+ self.subject = ''
+ self.body = ''
+ self.attachments = []
+ self.headers = {}
+
+ def get_message_recipients(self):
+ '''
+ Get all message recipients (to, cc, bcc)
+ '''
+ recipients = []
+ tolist = ast.literal_eval(self.mail_to)
+ cclist = ast.literal_eval(self.mail_cc)
+ bcclist = ast.literal_eval(self.mail_bcc)
+ recipients.extend(tolist)
+ recipients.extend(cclist)
+ recipients.extend(bcclist)
+ #logger.info("recipientslist:"+str(recipients))
+ return recipients
+
+ def get_message_as_string(self):
+ '''
+ Get message as string to be sent with smtplib.sendmail api
+ '''
+ if len(self.attachments) > 0:
+ #logger.info("attachments:"+str(self.attachments))
+ #logger.info("attachmentstype:"+str(type(self.attachments)))
+ #logger.info("attachmentsnum:"+str(len(self.attachments)))
+
+ envelope = MIMEMultipart()
+ envelope.attach(MIMEText(self.body))
+ else:
+ envelope = MIMEText(self.body)
+
+ recipients = self.get_message_recipients()
+
+
+ tolist = ast.literal_eval(self.mail_to)
+ cclist = ast.literal_eval(self.mail_cc)
+ envelope['From'] = self.mail_from
+ envelope['To'] = COMMASPACE.join(tolist)
+ envelope['Cc'] = COMMASPACE.join(cclist)
+ envelope['Subject'] = self.subject
+ #logger.info("envelope111:"+str(self.attachments))
+ for attachment in list(self.attachments):
+ ctype, encoding = mimetypes.guess_type(attachment)
+ #logger.info("attachment:"+attachment+" ctype:"+str(ctype)+" encoding:"+str(encoding))
+ if ctype is None or encoding is not None:
+ # No guess could be made, or the file is encoded (compressed), so
+ # use a generic bag-of-bits type.
+ ctype = 'application/octet-stream'
+ maintype, subtype = ctype.split('/', 1)
+ #logger.info("maintype:"+str(maintype)+" subtype:"+str(subtype))
+
+ msg = None
+ if maintype == 'text':
+ attach_file = open(attachment,'rb')
+ # TODO: we should handle calculating the charset
+ msg = MIMEText(attach_file.read(), _subtype=subtype, _charset='utf-8')
+ attach_file.close()
+ elif maintype == 'image':
+ attach_file = open(attachment, 'rb')
+ msg = MIMEImage(attach_file.read(), _subtype=subtype)
+ attach_file.close()
+ elif maintype == 'audio':
+ attach_file = open(attachment, 'rb')
+ msg = MIMEAudio(attach_file.read(), _subtype=subtype)
+ attach_file.close()
+ else:
+ attach_file = open(attachment, 'rb')
+ msg = MIMEBase(maintype, subtype)
+ msg.set_payload(attach_file.read())
+ attach_file.close()
+ # Encode the payload using Base64
+ encoders.encode_base64(msg)
+
+ # Set the filename parameter
+ msg.add_header('Content-Disposition', 'attachment',
+ filename=os.path.basename(attachment))
+ envelope.attach(msg)
+
+
+ #logger.info("envelope.as_string:"+envelope.as_string())
+ return envelope.as_string()
diff --git a/keyword/common/customlibrary/Smtp3Library/version.py b/keyword/common/customlibrary/Smtp3Library/version.py
new file mode 100644
index 0000000..8f9fcb8
--- /dev/null
+++ b/keyword/common/customlibrary/Smtp3Library/version.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# This file is part of robotframework-Smtp3Library.
+# https://github.io/lucamaro/robotframework-SmtpLibrary
+
+# Licensed under the Apache License 2.0 license:
+# http://www.opensource.org/licenses/Apache-2.0
+# Copyright (c) 2016, Luca Maragnani <[email protected]>
+
+__version__ = '0.1.3' # NOQA
diff --git a/keyword/common/customlibrary/ipandstring/__init__.py b/keyword/common/customlibrary/ipandstring/__init__.py
new file mode 100644
index 0000000..e7adf5c
--- /dev/null
+++ b/keyword/common/customlibrary/ipandstring/__init__.py
@@ -0,0 +1,4 @@
+from ipandstring.stringip import stringandip
+
+class ipandstring(stringandip):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL' \ No newline at end of file
diff --git a/keyword/common/customlibrary/ipandstring/stringip.py b/keyword/common/customlibrary/ipandstring/stringip.py
new file mode 100644
index 0000000..0ce0c44
--- /dev/null
+++ b/keyword/common/customlibrary/ipandstring/stringip.py
@@ -0,0 +1,68 @@
+import random
+import struct
+import socket
+
+class stringandip (object):
+ def __init__(self):
+ pass
+ def ipv4(self,m, n, x):
+ if m == '-1':
+ m = random.randint(0, 255)
+ if n == '-1':
+ n = random.randint(0, 255)
+ if x == '-1':
+ x = random.randint(0, 255)
+ y = random.randint(0, 255)
+ print(str(m) + '.' + str(n) + '.' + str(x) + '.' + str(y))
+ return str(m) + '.' + str(n) + '.' + str(x) + '.' + str(y)
+
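+    # e.g. dec2hex("255") returns 'FF' (illustrative value)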
+ def dec2hex(self,string_num):
+ base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 6)]
+ num = int(string_num)
+ mid = []
+ while True:
+ if num == 0: break
+ num, rem = divmod(num, 16)
+ mid.append(base[rem])
+
+ return ''.join([str(x) for x in mid[::-1]])
+
+ def ipv6(self):
+ ipInt = random.randint(0, 400000000000000000000000000000000000)
+ ipStr = ''
+ leftValue = ipInt
+
+ for i in [7, 6, 5, 4, 3, 2, 1, 0]:
+            string_num = leftValue // 65536 ** i
+ base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'), ord('A') + 6)]
+ num = int(string_num)
+ mid = []
+ while True:
+ if num == 0: break
+ num, rem = divmod(num, 16)
+ mid.append(base[rem])
+
+ ipTokenInt = ''.join([str(x) for x in mid[::-1]])
+ if (ipTokenInt == ''):
+ ipTokenInt = 0
+ ipStr = ipStr + str(ipTokenInt)
+ if i != 0:
+ ipStr = ipStr + ':'
+ leftValue %= 65536 ** i
+ print(ipStr)
+ return ipStr
+
+    def getstring(self,randomlength=16,base_str='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'):
+ """
+        Generate a random string of the given length
+ """
+ random_str = ''
+ #base_str = 'ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
+ length = len(base_str) - 1
+ for i in range(randomlength):
+ random_str += base_str[random.randint(0, length)]
+ print(random_str)
+ return random_str
+# if __name__ == '__main__':
+# ipandstring = ipandstring()
+# print(ipandstring.ipv6()) \ No newline at end of file
diff --git a/keyword/common/file_operation.robot b/keyword/common/file_operation.robot
new file mode 100644
index 0000000..ee5ebed
--- /dev/null
+++ b/keyword/common/file_operation.robot
@@ -0,0 +1,36 @@
+*** Settings ***
+Library String
+Library json
+Library OperatingSystem
+Library RequestsLibrary
+Library Selenium2Library
+Library Collections
+Library FileLibrary
+Resource common.robot
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+InsertPolicyIdToFile
+ [Arguments] ${key} ${policyId} ${objectids}
+ ${dict} Create Dictionary policyId=${policyId} objectId=${objectids}
+ ${json} json.Dumps ${dict}
+ Alter Dict ${path}/variable/AllFlowCaseVariable.txt ${key} ${json}
+
+InsertTimeToFile
+ [Arguments] ${key} ${starttime} ${endtime}
+ ${value} json.Loads ${${key}}
+ ${dict} Create Dictionary policyId=${value}[policyId] objectId=${value}[objectId] starttime=${starttime} endtime=${endtime}
+ ${json} json.Dumps ${dict}
+ Alter Dict ${path}/variable/AllFlowCaseVariable.txt ${key} ${json}
+
+InsertObjectIdToFile
+ [Arguments] ${key} ${objectids}
+ Alter Dict ${path}/all_flow_case_variable.txt ${key} ${objectids}
+
+InsertStartTimeToFile
+ [Arguments] ${key} ${starttime}
+ Alter Dict ${path}/all_flow_case_variable1.txt ${key} ${starttime}
+
+InsertReportToFile
+ [Arguments] ${key} ${objectids}
+ Alter Dict ${path}/ReportCaseVariable.txt ${key} ${objectids}
diff --git a/keyword/common/functional_keywords.robot b/keyword/common/functional_keywords.robot
new file mode 100644
index 0000000..d49c862
--- /dev/null
+++ b/keyword/common/functional_keywords.robot
@@ -0,0 +1,260 @@
+*** Settings ***
+Library String
+Library OperatingSystem
+Library RequestsLibrary
+Library Selenium2Library
+Library Collections
+Resource common.robot
+Resource api_request.robot
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+QueryPolicyFile
+ [Arguments] ${url} ${suffix}
+ ${content_quary} GetRequest1 ${url}?isValid=1&${suffix}
+ ${msg_quary} Set Variable ${content_quary['msg']}
+ ${length} Get Length ${content_quary['data']['list']}
+ Should Be True ${length}>0
+    Log    query operation:${msg_quary}
+ Log data:${content_quary['data']['list']}
+
+QueryPolicyFile2
+ [Arguments] ${url} ${suffix}
+ ${content_quary} GetRequest1 ${url}?${suffix}
+ ${msg_quary} Set Variable ${content_quary['msg']}
+ ${length} Get Length ${content_quary['data']['list']}
+ Should Be True ${length}>0
+    Log    query operation:${msg_quary}
+ Log data:${content_quary['data']['list']}
+ ${certId} Set Variable ${content_quary['data']['list'][0]['certId']}
+ [Return] ${certId}
+
+CreatePolicyFile
+ [Documentation]
+    ...    Required arguments: url, filePath (file path), fileName (file name)
+    ...    Optional argument: header (a default value is used when omitted)
+ [Arguments] ${url} ${filePath} ${fileName} @{header}
+ ${suffix} Generate Random String
+ ${certName} Catenate SEPARATOR=_ test ${suffix}
+ ${header} Run Keyword If ${header}==[] Set Variable {"isValid":1,"opAction":"add","certName":"${certName}","certId":null,"returnData":1}
+ ... ELSE Get From List ${header} 0
+
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${cerId} Set Variable ${list[0]['certId']}
+ ${certName} Set Variable ${list[0]['certName']}
+ ${response} Create Dictionary msg=${msg} certId=${cerId} certName=${certName}
+ Log add operation:${msg}
+ Log cerId:${cerId}
+ [Return] ${response}
+
+CreatePolicyFile2
+ [Documentation]
+    ...    Required arguments: url, filePath (file path), fileName (file name), flag (module identifier)
+    ...    Optional argument: header (a default value is used when omitted)
+ [Arguments] ${url} ${filePath} ${fileName} ${flag} @{header}
+ ${suffix} Generate Random String
+ ${randomName} Catenate SEPARATOR=_ test ${suffix}
+ ${value} Run Keyword If '${flag}'=='resPages' Set Variable {"isValid":1,"format":"html","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='hijack' Set Variable {"isValid":1,"contentType":"text/html","opAction":"add","profileName":"${randomName}","contentName":"${fileName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='insert' Set Variable {"isValid":1,"format":"js","insertOn":"after_page_load","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+ ... ELSE IF '${flag}'=='insertcss' Set Variable {"isValid":1,"format":"css","insertOn":"after_page_load","opAction":"add","profileName":"${randomName}","profileId":null,"returnData":1}
+
+ ${header} Run Keyword If ${header}==[] Set Variable ${value}
+ ... ELSE Get From List ${header} 0
+
+ ${binFile} Evaluate open(r"${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyFile3
+ [Documentation]
+    ...    Required argument: url
+    ...    Optional argument: data (a default value is used when omitted)
+ [Arguments] ${url} @{data}
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ ${data} Run Keyword If ${data}==[] Set Variable {"opAction":"add","returnData":1,"trafficMirrorList":[{"profileName":"${profileName}","addrType":"mac","isValid":1,"addrArray":["00:A1:B2:06:C3:29"]}]}
+ ... ELSE Get From List ${data} 0
+
+ ${content} PostRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyFileNoFile
+ [Documentation]
+    ...    Required arguments: url, requestbody (the JSON request body; no default is applied)
+ [Arguments] ${url} ${requestbody}
+ ${content} PostRequest1 ${url} ${requestbody}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreatePolicyMutipartFile
+ [Arguments] ${url} ${filePath} ${pubFileName} ${priFileName} ${keyringType} @{header}
+ [Documentation] Required arguments: url, filePath (file path), pubFileName (certificate file name), priFileName (private key file name), keyringType (certificate type)
+ ... Optional argument: header (the default value is used when it is omitted)
+ ${suffix} Generate Random String
+ ${certName} Catenate SEPARATOR=_ test ${suffix}
+ ${header} Run Keyword If ${header}==[] Set Variable {"isValid":1,"opAction":"add","returnData":1,"keyringName":"${certName}","keyringType":"${keyringType}","reissueExpiryHour":0,"crl":"null","publicKeyAlgo":"rsa1024","keyringId":null,"includeRoot":0}
+ ... ELSE Get From List ${header} 0
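+ # Open the certificate and private key in binary mode and build a two-part multipart payload (publicFile and privateFile).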
+ ${pubFile} Evaluate open(r"${path}/${filePath}${pubFileName}",'rb')
+ ${priFile} Evaluate open(r"${path}/${filePath}${priFileName}",'rb')
+ ${fileDict} Create Dictionary publicFile ${pubFile}
+ Set To Dictionary ${fileDict} privateFile ${priFile}
+ ${requestData} Create Dictionary name="publicFile" filename="${pubFileName}" Content-Type=application/octet-stream
+ Set To Dictionary ${requestData} name privateFile
+ Set To Dictionary ${requestData} filename ${priFileName}
+ Set To Dictionary ${requestData} Content-Type application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${header}
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${keyringId} Set Variable ${list[0]['keyringId']}
+ ${keyringName} Set Variable ${list[0]['keyringName']}
+ ${response} Create Dictionary msg=${msg} keyringId=${keyringId} keyringName=${keyringName}
+ Log add operation:${msg}
+ Log keyringId:${keyringId}
+ [Return] ${response}
+
+UpdatePolicyMutipartFile
+ [Arguments] ${url} ${filePath} ${pubFileName} ${priFileName} ${reqHeader}
+ ${pubFile} Evaluate open(r"${path}/${filePath}${pubFileName}",'rb')
+ ${priFile} Evaluate open(r"${path}/${filePath}${priFileName}",'rb')
+ ${fileDict} Create Dictionary publicFile ${pubFile}
+ Set To Dictionary ${fileDict} privateFile ${priFile}
+
+ ${requestData} Create Dictionary name="publicFile" filename="${pubFileName}" Content-Type=application/octet-stream
+ Set To Dictionary ${requestData} name privateFile
+ Set To Dictionary ${requestData} filename ${priFileName}
+ Set To Dictionary ${requestData} Content-Type application/octet-stream
+
+ ${fileDesc} Create Dictionary File-Desc ${reqHeader}
+ ${content} UpFilePutRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ Log update operation:${msg}
+ Log update condition:${reqHeader}
+
+UpdatePolicyFile
+ [Arguments] ${url} ${filePath} ${fileName} ${reqHeader}
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${fileDesc} Create Dictionary File-Desc=${reqHeader}
+ ${content} UpFilePutRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ Log update operation:${msg}
+ Log update condition:${reqHeader}
+
+UpdatePolicyFile2
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Put Request api ${url} data=${data} headers=${header}
+ ${response} to json ${remoteResponse.content}
+ Should Be Equal As Strings ${remoteResponse.status_code} 200
+ ${msg} Set Variable ${response['msg']}
+ Log update operation:${msg}
+ Log update condition:${data}
+
+DeletePolicyFile
+ [Arguments] ${url} ${data}
+ ${content} DeleteRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ Log delete operation:${msg}
+ Log delete condition:${data}
+
+TeardownDelete
+ [Arguments] ${url} ${key} ${value}
+ ${ids} Create List ${value}
+ ${data} Create Dictionary ${key}=${ids}
+ ${content} DeleteRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ Log teardown operation:${msg}
+ Log teardown condition:${data}
+
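+# Illustrative sketch only: the URL below is a placeholder. TeardownDelete is typically wired as a test teardown
+# that removes whatever profile the test created, for example:
+TeardownDeleteExample
+ [Documentation] Example teardown that deletes one profile by id (placeholder URL).
+ [Arguments] ${profileId}
+ TeardownDelete /${version}/policy/profile/resPages profileIds ${profileId}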
+
+CreatePolicyFile4
+ [Arguments] ${url} ${filePath} ${fileName} ${objectDict}
+ ${binFile} Evaluate open(r"${path}/${filePath}${fileName}",'rb')
+ ${fileDict} Create Dictionary file=${binFile}
+ ${requestData} Create Dictionary name="file" filename="${fileName}" Content-Type=application/octet-stream
+ ${suffix} Generate Random String
+ ${profileName} Catenate SEPARATOR=_ test ${suffix}
+ log ${objectDict}
+ ${string} Convert To String ${objectDict}
+ ${fileDesc} Create Dictionary File-Desc=${string}
+ log ${fileDesc}[File-Desc]
+ ${content} UpFilePostRequest ${url} ${requestData} ${fileDict} ${fileDesc}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+
+CreateRequest
+ [Arguments] ${url} ${data}
+ [Documentation] Required arguments: url, data (request body of the create call)
+ ${content} PostRequest1 ${url} ${data}
+ ${msg} Set Variable ${content['msg']}
+ ${list} Set Variable ${content['data']['list']}
+ ${profileId} Set Variable ${list[0]['profileId']}
+ ${profileName} Set Variable ${list[0]['profileName']}
+ ${response} Create Dictionary msg=${msg} profileId=${profileId} profileName=${profileName}
+ Log add operation:${msg}
+ Log profileId:${profileId}
+ [Return] ${response}
+DeletePolicyFile1
+ [Arguments] ${url} ${profileId}
+ # Delete the uploaded file
+ log todeleteobj
+ ${response} BaseDeleteRequest ${url} {"profileIds":[${profileId}]}
+ ${response_code} Get From Dictionary ${response} code
+ Should Be Equal As Strings ${response_code} 200
+ ${response} Convert to String ${response}
+ log ${response}
+DeleteProfileByIds
+ [Arguments] ${typeUrl} ${profileIds}
+ # Delete objects by profileIds
+ log DeleteProfile
+ ${response} BaseDeleteRequest /${version}/policy/profile/${typeUrl} {"profileIds":[${profileIds}]}
+ ${response_code} Get From Dictionary ${response} code
+ Should Be Equal As Strings ${response_code} 200
+ #Integer ${response_code} 200
+ ${response} Convert to String ${response}
+ log ${response} \ No newline at end of file
diff --git a/keyword/common/log_variable.robot b/keyword/common/log_variable.robot
new file mode 100644
index 0000000..dc12fb2
--- /dev/null
+++ b/keyword/common/log_variable.robot
@@ -0,0 +1,146 @@
+*** Settings ***
+Resource ../../variable/common_variable.txt
+Resource logschema.robot
+Library REST http://${host}:${port}
+Library RequestsLibrary
+Library OperatingSystem
+Library Collections
+Library string
+Library customlibrary/Custometest/log_contrast.py
+
+
+*** Keywords ***
+GetLogSchemaByType
+ [Documentation] Get the field schema for the given log type.
+ ... ${logType} log types:
+ ... security_event_log: security policy logs
+ ... proxy_event_log: proxy policy logs
+ ... connection_record_log: protocol capture logs
+ ... radius_record_log: RADIUS capture logs
+ ... active_defence_event_log: active defence logs
+ ... voip_record_log: VoIP protocol logs
+ ... transaction_record_log: transaction logs
+ ... live_session_record_log: live session logs
+ ... gtpc_record_log: GTP-C protocol logs
+ [Arguments] ${logType}
+ Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"}
+ &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logType}
+ log ${logType}
+ #Output Schema response body
+ Object response body
+ #Integer $.code 200
+ log ${LogSchemaResponse.body['data']}
+ #${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ [Return] ${LogSchemaResponse.body['data']}
+
+GetLogContentByKeyword
+ [Documentation] Get the content for the given keyword from a log schema.
+ ... ${logSchema}: schema content returned by GetLogSchemaByType
+ ... ${keyword}: name of the schema section to extract (for example field)
+ [Arguments] ${logSchema} ${keyword}
+ ${keyword} Run Keyword If "${keyword}"=="field" Set Variable ${logSchema}[fields]
+ ... ELSE Set Variable ${logSchema}[${keyword}]
+ ${content} Evaluate json.dumps(eval(str(${keyword}))) json
+ #${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ [Return] ${content}
+
+OrganizeLogCondition
+ [Documentation] Build the log query condition from the schema fields and the optional filter.
+ [Arguments] ${logname} ${startTime} ${endTime} ${field} ${filter}=
+ #${logname} ${startTime} ${endTime} ${client_ip} ${policy_id}
+ ${pageSize} Set Variable 30
+ ${pageNo} Set Variable 1
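+ # Assemble the /log/list request body: the schema fields plus the common_recv_time window and the optional filter string.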
+ ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}", "filter":"${filter}"}
+ log this time query condition:${logCondition}
+ [Return] ${logCondition}
+
+LogVeriable
+ [Arguments] ${logType} ${startTime} ${endTime} ${filter}
+ ${logSchema} GetLogSchemaByType ${logType}
+ ${filds} GetLogContentByKeyword ${logSchema} field
+ ${logCondition} OrganizeLogCondition ${logType} ${startTime} ${endTime} ${filds} ${filter}
+ #common_client_ips":"${client_ip}" and "common_policy_ids":"${policy_id}
+ ${logs} GetLogList ${logType} ${logCondition}
+ ${returnvalue} log_contrast ${logs} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${trueorfalse} Run Keyword If "${returnvalue}"=="true" set variable true
+ ... ELSE set variable false
+ Run Keyword If "${returnvalue}"=="true" Exit for loop
+ [Return] ${trueorfalse}
+
+GetLogList
+ [Arguments] ${logType} ${logCondition}
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${responseCode} Get From Dictionary ${returnData} code
+ Log ${responseCode}
+ Should Be Equal ${responseCode} ${200} ${logType} query list failed
+ log this time request ${logType} table logRecord : ${LogListResponse.content}
+ ${log} Set Variable ${LogListResponse.json()}[data][list]
+ FOR ${logs} IN ${log}
+ log ${logs}
+ END
+ [Return] ${logs}
+
+GetLogList1
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id}
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${responseCode} Get From Dictionary ${returnData} code
+ Log ${responseCode}
+ Should Be Equal ${responseCode} ${200} ${logType} query list failed
+ log this time request ${logType} table logRecord : ${LogListResponse.content}
+ ${log} Set Variable ${LogListResponse.json()}[data][list]
+ FOR ${logs} IN ${log}
+ log ${logs}
+ END
+ ${logstr} Convert To String ${logs}
+ log ${logstr}
+ Should Contain ${logstr} ${client_ip}
+ Should Contain ${logstr} ${policy_id}
+ Should Contain ${logstr} ${parmkey}
+ Should Contain ${logstr} ${parmvalue}
+
+GetLogListSize
+ [Documentation] Return the number of log records matching the query (used by the ProxyPinning cases).
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetALLLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} 10000 1
+ log ${logCondition}
+ ${LogListResponse} PostRemoteData /${version}/log/list ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${data} Get From Dictionary ${returnData} data
+ ${len} Get Length ${data}[list]
+ [Return] ${len}
+
+
+GetLogCount
+ [Arguments] ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} ${parmkey} ${parmvalue}
+ ${logCondition} GetALLLogCondition ${logType} ${startTime} ${endTime} ${client_ip} ${policy_id} 10000 1
+ ${LogListResponse} PostRemoteData /${version}/log/count ${logCondition}
+ Should Be Equal As Strings ${LogListResponse.status_code} 200
+ ${returnData} To Json ${LogListResponse.content}
+ ${len} Set Variable ${LogListResponse.json()}[data][total]
+ #${len} Get From Dictionary ${returnData} total
+ [Return] ${len}
+ \ No newline at end of file
diff --git a/keyword/common/login_logout.robot b/keyword/common/login_logout.robot
new file mode 100644
index 0000000..218ac31
--- /dev/null
+++ b/keyword/common/login_logout.robot
@@ -0,0 +1,180 @@
+*** Settings ***
+Resource ../../variable/common_variable.txt
+Library REST http://${host}:${port}
+Library Collections
+#Library SSHLibrary
+Library yaml
+#Library json
+Library OperatingSystem
+Resource ../policys/policy.robot
+Resource ../objects/object.robot
+Resource clear_data.robot
+
+*** Keywords ***
+InitPotocol
+ ${appDict} Create Dictionary
+ ${appVDict} Create Dictionary
+ Connect To Database Using Custom Params pymysql ${mysqlHost}
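+ # Map each protocol region name to its group id (used when creating policies) and its low boundary (used when verifying them).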
+ ${app_id} query SELECT group_id,low_boundary,region_name FROM tsg_obj_app_id WHERE is_valid=1 AND region_name IN ('http','ssl','dns','ftp','mail','doh','rtp','sip')
+ # ... SELECT group_id,low_boundary,region_name FROM tsg_obj_app_id WHERE is_valid=1 and region_name in('http','ftp','https','ssl','dns','doh','quic','mail')
+ ${app_length} Get Length ${app_id}
+ FOR ${n} IN RANGE ${app_length}
+ log ${n}
+ Set To Dictionary ${appDict} ${app_id}[${n}][2]=${app_id}[${n}][0]
+ Set To Dictionary ${appVDict} ${app_id}[${n}][2]=${app_id}[${n}][1]
+ log ${appDict}
+ log ${appVDict}
+ END
+ Disconnect From Database
+ # app ids used when creating policies
+ SET GLOBAL VARIABLE ${objprotol} ${appDict}
+ # app ids used when verifying policies
+ SET GLOBAL VARIABLE ${appportol} ${appVDict}
+ GetProtocol
+
+GetProtocol
+ ${HTTP_ID1} Get From Dictionary ${objprotol} http
+ ${RTP_ID1} Get From Dictionary ${objprotol} rtp
+ ${DNS_ID1} Get From Dictionary ${objprotol} dns
+ ${MAIL_ID1} Get From Dictionary ${objprotol} mail
+ ${FTP_ID1} Get From Dictionary ${objprotol} ftp
+ ${SIP_ID1} Get From Dictionary ${objprotol} sip
+ ${SSL_ID1} Get From Dictionary ${objprotol} ssl
+ ${DOH_ID1} Get From Dictionary ${objprotol} doh
+ SET GLOBAL VARIABLE ${HTTP_ID} ${HTTP_ID1}
+ SET GLOBAL VARIABLE ${RTP_ID} ${RTP_ID1}
+ SET GLOBAL VARIABLE ${DNS_ID} ${DNS_ID1}
+ SET GLOBAL VARIABLE ${MAIL_ID} ${MAIL_ID1}
+ SET GLOBAL VARIABLE ${FTP_ID} ${FTP_ID1}
+ SET GLOBAL VARIABLE ${SIP_ID} ${SIP_ID1}
+ SET GLOBAL VARIABLE ${SSL_ID} ${SSL_ID1}
+ SET GLOBAL VARIABLE ${DOH_ID} ${DOH_ID1}
+
+ ${HTTP_VID1} Get From Dictionary ${appportol} http
+ ${RTP_VID1} Get From Dictionary ${appportol} rtp
+ ${DNS_VID1} Get From Dictionary ${appportol} dns
+ ${MAIL_VID1} Get From Dictionary ${appportol} mail
+ ${FTP_VID1} Get From Dictionary ${appportol} ftp
+ ${SIP_VID1} Get From Dictionary ${appportol} sip
+ ${SSL_VID1} Get From Dictionary ${appportol} ssl
+ ${DOH_VID1} Get From Dictionary ${appportol} doh
+ SET GLOBAL VARIABLE ${HTTP_VID} ${HTTP_VID1}
+ SET GLOBAL VARIABLE ${RTP_VID} ${RTP_VID1}
+ SET GLOBAL VARIABLE ${DNS_VID} ${DNS_VID1}
+ SET GLOBAL VARIABLE ${MAIL_VID} ${MAIL_VID1}
+ SET GLOBAL VARIABLE ${FTP_VID} ${FTP_VID1}
+ SET GLOBAL VARIABLE ${SIP_VID} ${SIP_VID1}
+ SET GLOBAL VARIABLE ${SSL_VID} ${SSL_VID1}
+ SET GLOBAL VARIABLE ${DOH_VID} ${DOH_VID1}
+
+
+InitTemplate
+ # Load the object template
+ ${YAML}= Get File ${path}/data/template/template.yaml
+ ${LOADED}= yaml.Safe Load ${YAML}
+ ${objMode} Get From Dictionary ${LOADED} ip_batch_mode
+ ${objList} Get From Dictionary ${objMode} objectList
+ # Convert to JSON for later substitution
+ ${toJson} json.Dumps ${objMode}
+ ${objList} json.Dumps ${objList}
+ SET GLOBAL VARIABLE ${objModeJson} ${toJson}
+ SET GLOBAL VARIABLE ${objListMode} ${objList}
+
+ #${YAML}= Get File ${path}/data/policy_template.yaml
+ #${LOADED}= yaml.Safe Load ${YAML}
+ ${policyMode} Get From Dictionary ${LOADED} policy_template
+ ${policyList} Get From Dictionary ${policyMode} policyList
+ ${toJson} json.Dumps ${policyMode}
+ ${policyList} json.Dumps ${policyList}
+ SET GLOBAL VARIABLE ${policyModeJson} ${toJson}
+ SET GLOBAL VARIABLE ${policyListMode} ${policyList}
+
+ApiLogin
+ [Tags]
+ # Encrypt the password via the Bifang API
+ GET /${version}/user/encryptpwd?password=${password}
+ Object response body
+ Integer $.code 200
+ #log ${rescode}
+ ${pwd} String $.data.encryptpwd
+ #log ${pwd}
+ ${pwdstr} Get From List ${pwd} 0
+ log ${pwdstr}
+ SET GLOBAL VARIABLE ${encodePassword} ${pwdstr}
+ log ${encodePassword}
+ #log ${username}
+ #log ${pwdstr}
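+ # Log in with the encrypted password; the returned token is stored globally and reused by all later requests.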
+ POST /${version}/user/login?username=${username}&password=${encodePassword}&authMode=${authmode}
+ Object response body
+ #OUTPUT response body
+ Integer $.code 200
+ ${rescode} Integer $.code
+ log ${rescode}
+ ${tokenGlobal} String $.data.token
+ ${tokenStr} Get From List ${tokenGlobal} 0
+ log ${tokenStr}
+ SET GLOBAL VARIABLE ${token} ${tokenStr}
+ log ${token}
+ SET GLOBAL VARIABLE ${headers} {"Content-Type":"application/json","Authorization":"${token}"}
+ # Initialize the protocol ids referenced by policies and used for policy verification
+ Run Keyword If ${addPolicy}==1 InitPotocol
+ # Load the object template
+ InitTemplate
+ # Initialize the deletion id lists
+ SET GLOBAL VARIABLE ${createObjectIds} ${EMPTY}
+ SET GLOBAL VARIABLE ${createPolicyIds} ${EMPTY}
+ #Return ${rescode}
+ApiLogout
+ [Tags] tsg_adc tsg_bf_api
+ POST /${version}/user/logout headers=${headers}
+ Object response body
+ Integer $.code 200
+ ${rescode} Integer $.code
+ #[Return] ${rescode}
+
+BifangLoginAndAddLocalIP
+ [Tags] tsg_adc tsg_bf_api
+ log ApiLoginAndAddLocalIP
+ ApiLogin
+ #log **********
+
+ Run Keyword If ${addTestClentIPFlag}==1 AddLocalIPObject
+ log ApiLoginAndAddLocalIP
+ # Add the TSG UI/API allow policy
+ Run Keyword If ${addTsgUIAPIFlag}==1 SecurityPolicyAllowTSGUIAPIAdd
+
+BifangLogoutAndDelLocalIP
+ [Tags] tsg_adc tsg_bf_api
+ log ApiLogoutAndDelLocalIP
+ log to_LogoutAndDelLocalIP_LogoutAndDelLocalIP
+ # Delete the TSG UI/API allow policy first, because it also references the local IP object
+ Run Keyword If ${addTsgUIAPIFlag}==1 SecurityPolicyAllowTSGUIAPIDEL
+ Run Keyword If ${addTestClentIPFlag}==1 DelLocalIPObject
+ #ApiDeleteAutoTagsCase
+ ApiLogout
+ #[Return] ${rescode}
+
+SecurityPolicyAllowTSGUIAPIAdd
+ [Tags] uiallow
+ log toAddTSGUIAPI
+ ${addItemList1} Create Dictionary isSession=endpoint ip=${host}/32 port=0-0 direction=0 protocol=0 isInitialize=0
+ # Multiple address items can be added
+ ${addItemLists} Create list ${addItemList1}
+ # objectList entry for the local IP object
+ ${objectDict} Create Dictionary objectType=ip objectSubType=endpoint isValid=${1} addItemList=${addItemLists}
+ ${rescode} ${objectId} AddObjects ${1} ${objectDict}
+ SET GLOBAL VARIABLE ${testBifangIP} ${objectId}
+ ${HTTP_ID} Get From Dictionary ${objprotol} http
+ ${SSL_ID} Get From Dictionary ${objprotol} ssl
+ Comment Create the security policy
+ ${policyDict} Create Dictionary policyName=SecurityPolicy-Allow-TSGUIAPI policyType=tsg_security policyDesc=autotest action=allow source=${testClentID}|TSG_SECURITY_SOURCE_ADDR destination=${objectId}|TSG_SECURITY_DESTINATION_ADDR userRegion={} isValid=${1} appIdObjects=${HTTP_ID},${SSL_ID}
+ log ${policyDict}
+ ${rescode} ${policyId} AddPolicies 1 ${policyDict} v2
+ SET GLOBAL VARIABLE ${testBifangPolicy} ${policyId}
+ log addTSGUIAPISuccess
+SecurityPolicyAllowTSGUIAPIDEL
+ [Tags] uiallow
+ log toDelTSGUIAPI
+ ${objectIds} Create List ${testBifangIP}
+ DeletePolicyAndGroupObject ${testBifangPolicy} ${objectIds}
+ log delTSGUIAPISuccess \ No newline at end of file
diff --git a/keyword/common/login_logout_switch.robot b/keyword/common/login_logout_switch.robot
new file mode 100644
index 0000000..38e9d7b
--- /dev/null
+++ b/keyword/common/login_logout_switch.robot
@@ -0,0 +1,25 @@
+*** Settings ***
+Resource ../../variable/common_variable.txt
+Library Collections
+Resource login_logout.robot
+#Resource logout.robot
+#Resource login.robot
+
+
+*** Keywords ***
+LoginAndAddLocalIP
+ [Tags] tsg_adc tsg_bf_api tsg_device tsg_adc_wp adc_api adc_verify adc_log
+ #[Tags] full flow beyond the stepwise runs, Bifang API, device related, all stepwise, stepwise policy, stepwise function verification, stepwise log verification
+ # Get the host IP
+ #${ip} Get Host IP
+ #Run Keyword If '${ip}' != '${EMPTY}' Set Global Variable ${testClentIP} ${ip}
+
+ Run Keyword If '${loginType}' == 'api' BifangLoginAndAddLocalIP
+ #... ELSE IF '${loginType}' == 'cli' CliLogin
+ ... ELSE IF '${loginType}' != '${None}' UiLoginAndAddLocalIP
+
+LogoutAndDelLocalIP
+ [Tags] tsg_adc tsg_bf_api tsg_device tsg_adc_wp adc_api adc_verify adc_log
+ Run Keyword If '${loginType}' == 'api' BifangLogoutAndDelLocalIP
+ #... ELSE IF '${loginType}' == 'cli' CliLogout
+ ... ELSE IF '${loginType}' != '${None}' UiLogoutAndDelLocalIP
diff --git a/keyword/common/logschema.robot b/keyword/common/logschema.robot
new file mode 100644
index 0000000..f10094c
--- /dev/null
+++ b/keyword/common/logschema.robot
@@ -0,0 +1,69 @@
+*** Settings ***
+Resource ../../other/all_flow_case_variable.txt
+Library RequestsLibrary
+Library OperatingSystem
+Library Collections
+Library string
+Library REST http://${host}:${port}
+
+*** Keywords ***
+GetLogCondition
+ [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id}
+ Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"}
+ &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname}
+ log ${logname}
+ #Output Schema response body
+ Object response body
+ #Integer $.code 200
+ log ${LogSchemaResponse.body['data']}
+ ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ log ${field}
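+ # Build a paged query: the schema fields, a between condition on common_recv_time, and the client ip / policy id filters.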
+ ${pageSize} Set Variable 30
+ ${pageNo} Set Variable 1
+ ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}]
+ ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"}
+ log this time query condition:${logCondition}
+ [Return] ${logCondition}
+
+PostRemoteData
+ [Arguments] ${url} ${data}
+ ${header} Create Dictionary Content-Type=application/json Authorization=${token}
+ Create Session api http://${host}:${port} headers=${header}
+ ${remoteResponse} Post Request api ${url} data=${data} headers=${header}
+ [Return] ${remoteResponse}
+
+GetALLLogCondition
+ [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id} ${pageSize} ${pageNo}
+ Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"}
+ &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname}
+ log ${logname}
+ #Output Schema response body
+ Object response body
+ #Integer $.code 200
+ log ${LogSchemaResponse.body['data']}
+ ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ log ${field}
+ #${pageSize} Set Variable 30
+ #${pageNo} Set Variable 1
+ ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}]
+ ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"}
+ log this time query condition:${logCondition}
+ [Return] ${logCondition}
+
+GetLogCountConditon
+ [Arguments] ${logname} ${startTime} ${endTime} ${client_ip} ${policy_id} ${pageSize} ${pageNo}
+ Set Headers {"Content-Type":"application/x-www-form-urlencoded","Authorization":"${token}"}
+ &{LogSchemaResponse}= GET /${version}/log/schema?logType=${logname}
+ log ${logname}
+ #Output Schema response body
+ Object response body
+ #Integer $.code 200
+ log ${LogSchemaResponse.body['data']}
+ ${field} Evaluate json.dumps(eval(str(${LogSchemaResponse.body['data']['fields']}))) json
+ log ${field}
+ #${pageSize} Set Variable 30
+ #${pageNo} Set Variable 1
+ ${condition} Set Variable [{"value":["${startTime}","${endTime}"],"symbol":"between","field":"common_recv_time","type":"timestamp"}]
+ ${logCondition} Set Variable {"pageNo":${pageNo},"pageSize":${pageSize},"logType":"${logname}","fields":${field},"start_common_recv_time":"${startTime}","end_common_recv_time":"${endTime}","conditions":${condition} ,"common_client_ips":"${client_ip}","common_policy_ids":"${policy_id}"}
+ log this time query condition:${logCondition}
+ [Return] ${logCondition} \ No newline at end of file
diff --git a/keyword/common/systemcommand.robot b/keyword/common/systemcommand.robot
new file mode 100644
index 0000000..cc96c8c
--- /dev/null
+++ b/keyword/common/systemcommand.robot
@@ -0,0 +1,63 @@
+*** Settings ***
+Library OperatingSystem
+Library Selenium2Library
+Library RequestsLibrary
+Library Collections
+Resource ../../variable/common_variable.txt
+
+*** Keywords ***
+SystemCommands
+ [Arguments] ${commandstr} ${stringlist}
+ log toSystemCommand_SystemCommandTest
+ ${commandreturn} OperatingSystem.Run ${commandstr}
+ Append To File ${path}/write_file.txt ${commandstr}
+ Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat
+ Append To File ${path}/write_file.txt ${commandreturn}
+ ${listlenth}= Get Length ${stringlist}
+ FOR ${var} IN RANGE ${listlenth}
+ #log ${var}
+ Should Contain ${commandreturn} ${stringlist}[${var}]
+ END
+ ${rescode} Set Variable 200
+ log ${rescode}
+ [Return] ${rescode}
+
+SystemCommand
+ [Arguments] ${commandstr} @{stringlist}
+ log dxytest${commandstr}
+ ${commandreturn} OperatingSystem.Run ${commandstr}
+ #nslookup -d www.jd.com
+ log ${commandreturn}
+ FOR ${var} IN @{stringlist}
+ log dxytest
+ log ${var}
+ Should Contain ${commandreturn} ${var}
+ END
+ #Should Contain ${commandreturn} ${qatype}
+ ${rescode} Set Variable 200
+ log ${rescode}
+ [Return] ${rescode}
+
+
+SystemCommandReturnCompare
+ # Execute a command and compare its output: the command to run, the strings its output must contain, and the strings its output must not contain
+ [Arguments] ${commandstr} ${stringlist} ${stringlistnotin}
+ log toSystemCommand_SystemCommandTest
+ ${commandreturn} OperatingSystem.Run ${commandstr}
+ Append To File ${path}/write_file.txt ${commandstr}
+ Append To File ${path}/write_file.txt %%%%%%%%%%%%%%newbat
+ Append To File ${path}/write_file.txt ${commandreturn}
+ #${commandreturn} Set Variable abcdeConnection was reset
+ ${listlenth}= Get Length ${stringlist}
+ FOR ${var} IN RANGE ${listlenth}
+ log ${var}
+ Should Contain ${commandreturn} ${stringlist}[${var}]
+ END
+ ${listnotin}= Get Length ${stringlistnotin}
+ FOR ${varn} IN RANGE ${listnotin}
+ log ${varn}
+ Should Not Contain ${commandreturn} ${stringlistnotin}[${varn}]
+ END
+ ${rescode} Set Variable 200
+ log ${rescode}
+ [Return] ${rescode}
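+
+# Illustrative sketch only: the command and the expected strings below are placeholders.
+# A minimal example of checking that a lookup succeeds and that the output does not show a reset connection:
+SystemCommandReturnCompareExample
+ ${mustContain} Create List Name: Address
+ ${mustNotContain} Create List Connection was reset
+ SystemCommandReturnCompare nslookup www.example.com ${mustContain} ${mustNotContain}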