summaryrefslogtreecommitdiff
path: root/04-CustomLibrary
diff options
context:
space:
mode:
Diffstat (limited to '04-CustomLibrary')
-rw-r--r--04-CustomLibrary/Custometest/MD5.py40
-rw-r--r--04-CustomLibrary/Custometest/__init__.py16
-rw-r--r--04-CustomLibrary/Custometest/certificate.yaml0
-rw-r--r--04-CustomLibrary/Custometest/cmd_cer.py164
-rw-r--r--04-CustomLibrary/Custometest/printlog.py11
-rw-r--r--04-CustomLibrary/FileLibrary/__init__.py7
-rw-r--r--04-CustomLibrary/FileLibrary/filetool.py27
-rw-r--r--04-CustomLibrary/Pop3Library/__init__.py200
-rw-r--r--04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pycbin0 -> 4589 bytes
-rw-r--r--04-CustomLibrary/Pop3Library/readme.txt26
-rw-r--r--04-CustomLibrary/Smtp4Library/__init__.py417
-rw-r--r--04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pycbin0 -> 11414 bytes
-rw-r--r--04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pycbin0 -> 11425 bytes
-rw-r--r--04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pycbin0 -> 207 bytes
-rw-r--r--04-CustomLibrary/Smtp4Library/version.py11
15 files changed, 919 insertions, 0 deletions
diff --git a/04-CustomLibrary/Custometest/MD5.py b/04-CustomLibrary/Custometest/MD5.py
new file mode 100644
index 0000000..697ae83
--- /dev/null
+++ b/04-CustomLibrary/Custometest/MD5.py
@@ -0,0 +1,40 @@
+# 由于MD5模块在python3中被移除
+# 在python3中使用hashlib模块进行md5操作
+
+import hashlib
+
+class MD5:
+ def MD5(data,langer,md5_types):
+ # 创建md5对象
+ # m = hashlib.md5()
+ # Tips
+ # 此处必须encode
+ # 若写法为m.update(str) 报错为: Unicode-objects must be encoded before hashing
+ # 因为python3里默认的str是unicode
+ # 或者 b = bytes(str, encoding='utf-8'),作用相同,都是encode为bytes
+ # b = str.encode(encoding='utf-8')
+ # m.update(b)
+ # str_md5 = m.hexdigest()
+ if langer == "英文":
+ # str_md5 = hashlib.md5(b'this is a md5 test.').hexdigest()
+ str_md5 = hashlib.md5("b'"+data+"'").hexdigest()
+ print('MD5加密前为 :' + data)
+ print('MD5加密后为 :' + str_md5)
+ return str_md5
+ elif langer == "中文":
+ str_md5 = hashlib.md5('你好'.encode(encoding=md5_types)).hexdigest()
+ return str_md5
+ # utf8 和gbk 加密结构不一样
+ # hashlib.md5('你好'.encode(encoding='GBK')).hexdigest()
+ # hashlib.md5('你好'.encode(encoding='GB2312')).hexdigest()
+ # hashlib.md5('你好'.encode(encoding='GB18030')).hexdigest()
+if __name__ == '__main__':
+ data = '小猪'
+ langer = '中文'
+ md5_types = 'GBK'
+ a =MD5(data,langer,md5_types)
+ print(a)
+ b=r'C:\Users\小猪\AppData\Local\Programs\Python\Python37\Lib\site-packages\custometest\MD5.py'
+ with open(b, encoding='utf-8') as f:
+ text = f.read()
+ print(text) \ No newline at end of file
diff --git a/04-CustomLibrary/Custometest/__init__.py b/04-CustomLibrary/Custometest/__init__.py
new file mode 100644
index 0000000..a4b4d83
--- /dev/null
+++ b/04-CustomLibrary/Custometest/__init__.py
@@ -0,0 +1,16 @@
+
+#-*- coding:utf-8 -*-
+'''
+ created by hch 2019-06-26
+'''
+
+from custometest.printlog import printlog
+from custometest.MD5 import MD5
+from custometest.cmd_cer import Order
+# from custometest.printlog import printlog
+
+
+__version__ = '1.0'
+
+class custometest(printlog,Order,MD5):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL'
diff --git a/04-CustomLibrary/Custometest/certificate.yaml b/04-CustomLibrary/Custometest/certificate.yaml
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/04-CustomLibrary/Custometest/certificate.yaml
diff --git a/04-CustomLibrary/Custometest/cmd_cer.py b/04-CustomLibrary/Custometest/cmd_cer.py
new file mode 100644
index 0000000..6e6e636
--- /dev/null
+++ b/04-CustomLibrary/Custometest/cmd_cer.py
@@ -0,0 +1,164 @@
+import os
+import subprocess
+from time import sleep
+
+
+class Order:
+ def CMD(self,data):
+ result = os.popen(data)
+ # res = result.read().encoding('GBK')
+ res = result.read()
+ result.close()
+ # res = res.decode("unicode-escape")
+ return res
+ def Linux(self):
+ pass
+ # 根据证书颁发者名字判断证书是否替换
+ def Cert_Verification(self,data):
+ c = []
+ print(1)
+ #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo:
+ with open(r'certificate.yaml', 'r') as foo:
+ print(2)
+ for line in foo.readlines():
+ if data in line:
+ print(line)
+ c.append('证书已替换')
+ else:
+ pass
+ if '证书已替换' in c:
+ # print('证书已替换')
+ foo.close()
+ return '证书已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return '证书未替换'
+
+ def Content_Type(self,data):
+ d = []
+ with open('certificate.yaml', 'r') as foo:
+ for line in foo.readlines():
+ if data in line:
+ # print(line)
+ d.append('Content_Type已替换')
+ else:
+ pass
+ if 'Content_Type已替换' in d:
+ # print('证书已替换')
+ foo.close()
+ return 'Content_Type已替换'
+ else:
+ # print('证书未替换')
+ foo.close()
+ return 'Content_Type未替换'
+ # curl路由内容设置
+ def curl_name(self,data):
+ #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk'
+ return curl_name
+ # 控制器
+ def manu(self,url,Certificate):
+ # print(data['url'])
+ n = 0
+ while n != len(url):
+ b = self.curl_name(url[n])
+ d = self.CMD(b)
+ # print(d)
+ sleep(1)
+ if Certificate != "":
+ c =self.Cert_Verification(Certificate)
+ # f = self.Content_Type(data["Content_Type"])
+ sleep(1)
+ assert_cer = url[n]+c
+ # assert_Content_Type = data['Content_Type']+f
+ n+=1
+ return d,assert_cer
+
+ def FTP(self, ftp_type):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+ # 判断FTP执行类型:(下载/登录)
+ if ftp_type == "下载":
+ # 调用cmd执行FTP下载文件
+ data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt'
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814
+ if fsize == 435814:
+ return "Success"
+ else:
+ return "Fail"
+ elif ftp_type == "登录":
+ data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ # print(d)
+ if "Graphical (Xorg) program starter for ADRIANE" in d:
+ return "Success"
+ else:
+ return "Fail"
+ # FTP 下载
+ def FTP_down(self, ftp_url,file_size,fiel_name):
+ windows_path = os.getcwd()
+ linux_path = os.getcwd().replace('\\', '/')
+ # 判断FTP执行类型:(下载/登录)
+ # 调用cmd执行FTP下载文件
+ data = 'curl -m 20 '+ftp_url+ '-o '+ fiel_name + " ' "
+ print(data)
+ d = self.CMD(data)
+ sleep(5)
+ fsize = os.path.getsize(linux_path + "/"+fiel_name) # 435814
+ print(fsize)
+ if fsize == file_size:
+ return "Success"
+ else:
+ return "Fail"
+
+ # FTP 登录
+ def FTP_login(self, ftp_url,file_content):
+ data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk'
+ d = self.CMD(data)
+ # print(d)
+ if file_content in d:
+ return "Success"
+ else:
+ return "Fail"
+
+if __name__ == '__main__':
+ datas = {"url":['https://www.baidu.com'],
+"Certificate":"Tango Secure Gateway CA",
+# "Content_Type":"text/html",
+'log':'Security Event Logs',
+"sni":['baidu'],
+"intercept_code":"200",
+"log_code":"200",
+"certifucate":"1",
+"log_content":"true"
+}
+# data= {"url":['https://www.baidu.com'],
+# "Certificate":"Tango Secure Gateway CA"
+# }
+ # url = ['https://www.baidu.com']
+ # url = ['https://www.baidu.com']
+ # url = ['https://www.baidu.com']
+ # # Certificate1 = "Tango Secure Gateway CA"
+ # Certificate = "baidu"
+ # a='Tango Secure Gateway CA'
+ # s = Order()
+ # b = s.manu(url,Certificate)
+ # print(b[1])
+ # FTP下载 传入ftp的路径和文件大小
+ ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+ ftp_size = 435814
+ ftp_issue = s.FTP_down(ftp_url,ftp_size)
+ # FTP登录 传入ftp的路径和文件内容
+ ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" '
+ file_content = "Graphical (Xorg) program starter for ADRIANE"
+ ftp_issue = s.FTP_login(ftp_url,file_content)
+ # for i in b:
+ # print(i)
+ # dd = s.CMD('curl -I https://www.baidu.com')
+ # print(dd)
+ # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd:
+ # print("ok")
+ # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk' \ No newline at end of file
diff --git a/04-CustomLibrary/Custometest/printlog.py b/04-CustomLibrary/Custometest/printlog.py
new file mode 100644
index 0000000..02f8ced
--- /dev/null
+++ b/04-CustomLibrary/Custometest/printlog.py
@@ -0,0 +1,11 @@
+
+#-*- coding:utf-8 -*-
+'''
+ created by hch 2019-06-26
+'''
+
+
+class printlog():
+
+ def printA():
+ print("hello word") \ No newline at end of file
diff --git a/04-CustomLibrary/FileLibrary/__init__.py b/04-CustomLibrary/FileLibrary/__init__.py
new file mode 100644
index 0000000..29db0ed
--- /dev/null
+++ b/04-CustomLibrary/FileLibrary/__init__.py
@@ -0,0 +1,7 @@
+#coding=utf-8
+from filetool import filetool
+
+version = '1.0'
+
+class FileLibrary(filetool):
+ ROBOT_LIBRARY_SCOPE = 'GLOBAL' \ No newline at end of file
diff --git a/04-CustomLibrary/FileLibrary/filetool.py b/04-CustomLibrary/FileLibrary/filetool.py
new file mode 100644
index 0000000..56eadcf
--- /dev/null
+++ b/04-CustomLibrary/FileLibrary/filetool.py
@@ -0,0 +1,27 @@
+#coding=utf-8
+import sys
+reload(sys)
+sys.setdefaultencoding("utf-8")
+
+""" 变量文件操作 """
+
+class filetool():
+ def __init__(self):
+ pass
+
+ def alter_dict(self, path, k, v):
+ data = ''
+ flag = True
+ key = '${%s}' % (k)
+ add = key + '\t%s' % (v) + '\n'
+ with open(path, 'r+') as f:
+ for line in f.readlines():
+ if(line.find(key + '\t') == 0):
+ line = add
+ flag = False
+ data += line
+ if(flag):
+ data += add
+ print data
+ with open(path, 'w+') as f:
+ f.writelines(data) \ No newline at end of file
diff --git a/04-CustomLibrary/Pop3Library/__init__.py b/04-CustomLibrary/Pop3Library/__init__.py
new file mode 100644
index 0000000..66219ac
--- /dev/null
+++ b/04-CustomLibrary/Pop3Library/__init__.py
@@ -0,0 +1,200 @@
+import poplib
+import base64
+import time
+from email.parser import Parser
+# 用来解析邮件主题
+from email.header import decode_header
+# 用来解析邮件来源
+from email.utils import parseaddr
+
+from robot.api.deco import keyword
+from robot.api import logger
+
+
+class AcceptEmail(object):
+
+ def __init__(self, user_email, password, pop3_server='serverDemon'):
+ self.user_email = user_email
+ self.password = password
+ self.pop3_server = pop3_server
+
+ self.connect_email_server()
+
+ def connect_email_server(self):
+ self.server = poplib.POP3(self.pop3_server)
+ # 打印POP3服务器的欢迎文字,验证是否正确连接到了邮件服务器
+ # print('连接成功 -- ', self.server.getwelcome().decode('utf8'))
+ # +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0)
+
+ # 开始进行身份验证
+ self.server.user(self.user_email)
+ self.server.pass_(self.password)
+
+ def __del__(self):
+ # 关闭与服务器的连接,释放资源
+ self.server.close()
+
+ def get_email_count(self):
+ # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可
+ email_num, email_size = self.server.stat()
+ # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
+ return email_num
+
+ def receive_email_info(self, now_count=None):
+ # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可
+ email_num, email_size = self.server.stat()
+ # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size))
+ self.email_count = email_num
+ self.email_sumsize = email_size
+
+ # 使用list()返回所有邮件的编号,默认为字节类型的串
+ rsp, msg_list, rsp_siz = self.server.list()
+ # print(msg_list, '邮件数量',len(msg_list))
+ # print("服务器的响应: {0},\n消息列表: {1},\n返回消息的大小: {2}".format(rsp, msg_list, rsp_siz))
+ # print('邮件总数: {}'.format(len(msg_list)))
+ self.response_status = rsp
+ self.response_size = rsp_siz
+
+ # 下面获取最新的一封邮件,某个邮件下标(1开始算)
+ # total_mail_numbers = len(msg_list)
+
+ # 动态取消息
+ total_mail_numbers = now_count
+
+ rsp, msglines, msgsiz = self.server.retr(total_mail_numbers)
+ # rsp, msglines, msgsiz = self.server.retr(1)
+ # print("服务器的响应: {0},\n原始邮件内容: {1},\n该封邮件所占字节大小: {2}".format(rsp, msglines, msgsiz))
+
+ # 从邮件原内容中解析
+ msg_content = b'\r\n'.join(msglines).decode('utf-8')#gbk
+ msg = Parser().parsestr(text=msg_content)
+ self.msg = msg
+ # print('解码后的邮件信息:\n{}'.format(msg))
+
+ def recv(self, now_count=None):
+ self.receive_email_info(now_count)
+ self.parser()
+
+ def get_email_title(self):
+ subject = self.msg['Subject']
+ value, charset = decode_header(subject)[0]
+ if charset:
+ value = value.decode(charset)
+ # print('邮件主题: {0}'.format(value))
+ self.email_title = value
+
+ def get_sender_info(self):
+ hdr, addr = parseaddr(self.msg['From'])
+ # name 发送人邮箱名称, addr 发送人邮箱地址
+ name, charset = decode_header(hdr)[0]
+ if charset:
+ name = name.decode(charset)
+ self.sender_qq_name = name
+ self.sender_qq_email = addr
+ # print('发送人邮箱名称: {0},发送人邮箱地址: {1}'.format(name, addr))
+
+ def get_email_content(self):
+ content = self.msg.get_payload()
+ # 文本信息
+ content_charset = content[0].get_content_charset() # 获取编码格式
+ text = content[0].as_string().split('base64')[-1]
+ text_content = base64.b64decode(text).decode(content_charset) # base64解码
+ self.email_content = text_content
+ # print('邮件内容: {0}'.format(text_content))
+
+ # 添加了HTML代码的信息
+ content_charset = content[1].get_content_charset()
+ text = content[1].as_string().split('base64')[-1]
+ # html_content = base64.b64decode(text).decode(content_charset)
+
+ # print('文本信息: {0}\n添加了HTML代码的信息: {1}'.format(text_content, html_content))
+
+ def parser(self):
+ self.get_email_title()
+ self.get_sender_info()
+ #self.get_email_content()
+
+
+def get_new_mail(user_name, pwd, pop3_server, second=5):
+ t = AcceptEmail(user_name, pwd, pop3_server)
+ now_count = t.get_email_count()
+ #print('开启的时候的邮件数量为:%s' % now_count)
+ logger.info("开启的时候的邮件数量为:"+str(now_count))
+ # 每次需要重新连接邮箱服务器,才能获取到最新的消息
+ # 默认每隔5秒看一次是否有新内容
+ num = 0
+ while True:
+ obj = AcceptEmail(user_name, pwd, pop3_server)
+ count = obj.get_email_count()
+ if count > now_count:
+ new_mail_count = count - now_count
+ #print('有新的邮件数量:%s' % new_mail_count)
+ logger.info("有新的邮件数量:"+str(new_mail_count))
+ now_count += 1
+ obj.recv(now_count)
+
+ yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email}
+ #yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email,
+ # "email_content": obj.email_content}
+ if new_mail_count > 0:
+ return
+ # print('-' * 30)
+ # print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
+ # obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content))
+ # print('-' * 30)
+
+ #else:
+ #print('没有任何新消息.')
+ #logger.info("没有任何新消息.")
+ time.sleep(second)
+ num += 1
+ if num == 36:#等待时间粒度,一个粒度是5s,如果num=10意思就是等待接收邮件50s,若没有邮件就返回;36是等待3min
+ return
+
+
+@keyword('Recv Email')
+def recv_email(user_name, pwd, pop3_server, send_user, subj):
+ '''
+ 参数说明:
+ [user_name]:用户名
+ [pwd]:密码,第三方登入密码
+ [pop server]:pop服务器
+ [sender]:发送者邮箱,注意全称
+ [subj_sub]:主题的部分内容,这里是测主题是否包含该参数
+ [return]:返回值,成功返回success,失败返回n其他
+ 其他问题请阅读该库的readme.txt
+ '''
+ dic = {}
+ logger.info("正在监听邮件服务器端是否有新消息---")
+ #print('正在监听邮件服务器端是否有新消息---')
+ try:
+ iterator = get_new_mail(user_name, pwd, pop3_server)
+ except TypeError:
+ #print('监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容')
+ logger.info("监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容")
+ return "fail"
+ else:
+ for dic in iterator:
+ #print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % (
+ # dic["title"], dic["sender"], dic["sender_email"], dic["email_content"]))
+ #logger.info("邮件主题: " + str(dic["title"]) + " 发件人: " + str(dic["sender"])+"发件人邮箱:"+str(dic["sender_email"]))
+ if dic["sender"] == send_user:
+ #logger.info("发送者一样")
+ if subj in dic["title"]:
+ #logger.info("主题也包含")
+ return "success"
+ else:
+ #logger.info("主题不包含")
+ return "fail"
+ else:
+ return "fail"
+
+
+#if __name__ == '__main__':
+# user = '[email protected]'
+# pwd = 'xxxx'
+# pop3_server = 'pop.qq.com'
+# send_user = '[email protected]'
+# subj = 'or2020'
+# result = recv_email(user, pwd, pop3_server, send_user, subj)
+# print(result) \ No newline at end of file
diff --git a/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc b/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..ecd3bf5
--- /dev/null
+++ b/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc
Binary files differ
diff --git a/04-CustomLibrary/Pop3Library/readme.txt b/04-CustomLibrary/Pop3Library/readme.txt
new file mode 100644
index 0000000..42c7c9b
--- /dev/null
+++ b/04-CustomLibrary/Pop3Library/readme.txt
@@ -0,0 +1,26 @@
+导入方法:
+1.将该目录放到....\Python\Lib\site-packages 下
+2.在测试夹具里面要根据绝对路径导入此包
+
+
+注意:
+1.使用该包的关键字时不要用qq邮箱,qq邮箱测试发现时长会失灵现象,最好用163邮箱等
+2.注意关闭邮箱的加密传送方式SSL协议
+3.注意邮箱是否支持POP3的协议以及不同邮箱pop服务器的写法
+
+
+关键字:
+[return] Recv Email [user_name] [pwd] [pop server] [sender] [subj_sub]
+参数说明:
+[user_name]:用户名
+[pwd]:密码,第三方登入密码
+[pop server]:pop服务器
+[sender]:发送者邮箱,注意全称
+[subj_sub]:主题的部分内容,这里是测主题是否包含该参数
+[return]:返回值,成功返回success,失败返回n其他
+该关键字默认等待3min,3min内每5s检测邮箱是否收到邮件,
+若收到邮件与[sender]参数进行完全匹配,与[subj_sub]部分主题内容参数进行是否包含匹配,匹配成功则返回success;
+若收到邮件匹配失败或者超时则返回其他,
+
+
+若修改等待时间,可查找源码中的num变量将36改为其他值即可,注意时间粒度是5s,若num=3,则您修改的等待时间是15s
diff --git a/04-CustomLibrary/Smtp4Library/__init__.py b/04-CustomLibrary/Smtp4Library/__init__.py
new file mode 100644
index 0000000..dff36a4
--- /dev/null
+++ b/04-CustomLibrary/Smtp4Library/__init__.py
@@ -0,0 +1,417 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# This file is part of robotframework-Smtp3Library.
+# https://github.io/lucamaro/robotframework-Smtp3Library
+
+# Licensed under the Apache License 2.0 license:
+# http://www.opensource.org/licenses/Apache-2.0
+# Copyright (c) 2016, Luca Maragnani <[email protected]>
+
+"""
+Library implementation
+"""
+import ast
+import smtplib
+import email
+import random
+import string
+import mimetypes
+import quopri
+from email.message import Message
+from email.mime.audio import MIMEAudio
+from email.mime.base import MIMEBase
+from email.mime.image import MIMEImage
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+from email import encoders
+import os.path
+import socket
+from robot.api.deco import keyword
+from robot.api import logger
+
+from Smtp3Library.version import __version__ # NOQA
+
+COMMASPACE = ', '
+
+class Smtp3Library(object):
+ """
+ SMTP Client class
+ """
+
+ def __init__(self):
+ """
+ Constructor
+ """
+ self.message = self._MailMessage()
+ self.host = None
+ self.port = None
+ self.user = None
+ self.password = None
+ self.smtp = None
+
+ def _prepare_connection(self, host, port, user=None, password=None):
+ """
+ Private method to collect connection informations
+ """
+ self.host = host
+ self.port = int(port)
+ self.user = user
+ self.password = password
+ self.client_hostname = socket.gethostname()
+
+
+ def prepare_ssl_connection(self, host, port=465, user=None, password=None):
+ """
+ Collect connection informations for SSL channel
+ """
+ self._prepare_connection(host, port, user, password)
+ self.smtp = smtplib.SMTP_SSL()
+
+ def prepare_connection(self, host, port=25, user=None, password=None):
+ """
+ Collect connection informations for unencrypted channel
+ """
+ self._prepare_connection(host, port, user, password)
+ self.smtp = smtplib.SMTP()
+
+ def add_to_recipient(self, recipient):
+ """
+ Add a recipient to "To:" list
+ """
+ self.message.mail_to.append(recipient)
+
+ def add_cc_recipient(self, recipient):
+ """
+ Add a recipient to "Cc:" list
+ """
+ self.message.mail_cc.append(recipient)
+
+ def add_bcc_recipient(self, recipient):
+ """
+ Add a recipient to "Bcc:" list
+ """
+ self.message.mail_bcc.append(recipient)
+
+ def set_subject(self, subj):
+ """
+ Set email subject
+ """
+ self.message.subject = subj
+
+ def set_from(self, from_recipient):
+ """
+ Set from address of message and envelope
+ """
+ self.message.mail_from = from_recipient
+
+ def set_body(self, body):
+ """
+ Set email body
+ """
+ self.message.body = body
+
+ def set_random_body(self, size):
+ """
+ Set a random body of <size> length
+ """
+ body = ''
+ for i in range(0, size):
+ body += ''.join(random.choice(string.uppercase + string.digits))
+ if i % 80 == 0:
+ body += "\n"
+ self.message.body = body
+
+ def add_attachment(self, attach):
+ """
+ Add attachment to a list of filenames
+ """
+ self.message.attachments.append(attach)
+
+ def add_header(self, name, value):
+ """
+ Add a custom header to headers list
+ """
+ self.message.headers[name] = value
+
+ def connect(self):
+ '''
+ Open connection to server
+ Returns tuple (smtp status code, message)
+ '''
+ return self.smtp.connect(self.host, self.port)
+
+ def present_client_as(self, client_hostname):
+ '''
+ Set helo/ehlo client identity
+ '''
+ self.client_hostname = client_hostname
+
+ def helo(self):
+ '''
+ Send HELO command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.helo(self.client_hostname)
+ logger.info(result)
+ return result
+
+ def ehlo(self):
+ '''
+ Send EHLO command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.ehlo(self.client_hostname)
+ logger.info(result)
+ return result
+
+ def get_esmtp_features(self):
+ '''
+ Returns hashmap with ESMTP feature received with EHLO
+ '''
+ logger.info(self.smtp.esmtp_features)
+ return self.smtp.esmtp_features
+
+ def logins(self):
+ try:
+ '''
+ Login user
+ Returns tuple (smtp status code, message)
+ '''
+ logger.info("Login with user " + self.user + " and password " + self.password)
+ '''try:
+ subuser=bytes.decode(self.user)
+ subpassword=bytes.decode(self.password)
+ result = self.smtp.login(subuser.encode('ascii'), subpassword.encode('ascii'))
+ logger.info(result)
+ return result
+ except:
+ logger.info("本身就是str类型不需要bytes to str!")
+ subuser=str(self.user).encode('ascii')
+ subpassword=str(self.password).encode('ascii')
+ result = self.smtp.login(subuser, subpassword)
+ logger.info(result)
+ return result'''
+ result = self.smtp.login(self.user, self.password)
+ logger.info(result)
+ return "success"
+ except:
+ return "fail"
+
+
+ def starttls(self, keyfile=None, certfile=None):
+ '''
+ sends STARTTLS
+ optional: keyfile certfile
+ Returns tuple (smtp status code, message)
+ '''
+ logger.info("STARTTLS")
+ if keyfile is None and certfile is None:
+ result = self.smtp.starttls()
+ else:
+ result = self.smtp.starttls(keyfile, certfile)
+ logger.info(result)
+ return result
+
+ def data(self):
+ '''
+ Data command send email body with "MAIL FROM:", "RCPT TO:" and "DATA" commands
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.mail(self.message.mail_from)
+ result += self.smtp.rcpt(self.message.get_message_recipients())
+
+ result += self.smtp.data(self.message.get_message_as_string())
+ logger.info(result)
+ return result
+
+ def sendmail(self):
+ '''
+ Send email with "MAIL FROM:", "RCPT TO:" and "DATA" commands
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.sendmail(self.message.mail_from, self.message.get_message_recipients(), self.message.get_message_as_string())
+ logger.info(result)
+ return result
+
+ def quit(self):
+ '''
+ Send QUIT command
+ Returns tuple (smtp status code, message)
+ '''
+ result = self.smtp.quit()
+ logger.info(result)
+ return result
+
+ def close_connection(self):
+ '''
+ Close connection to server
+ '''
+ return self.smtp.close()
+
+ def send_message(self):
+ """
+ Send the message, from connection establishment to quit and close connection.
+ All the connection and email parameters must be already set before invocation.
+ Returns sendmail response (code, message)
+ """
+
+ # Send the message
+ try:
+ self.connect()
+
+ if self.user is not None:
+ self.ehlo()
+ self.logins()
+
+ send_result = self.sendmail()
+
+ self.quit()
+ self.close_connection()
+ # return send_result
+ return "success"
+ except:
+ return "fail"
+
+ @keyword('Send Message With All Parameters')
+ def send_message_full(self, host, user, password, subj,
+ from_recipient, to_recipient, cc_recipient=None, bcc_recipient=None,
+ body=None, attach=None):
+ """
+ Send a message specifing all parameters on the same linecc
+ cc, bcc and attach parameters may be strings or array of strings
+ host, user, password, subj, fromadd, toadd - are mandatory parameters
+ to use the optional paramaters pleas specify the name fo the parameter in the call
+ user and password even if mandatory could be set to None so no authentication will be made
+ Example:
+ sendMail("smtp.mail.com", None, None, "The subject", "[email protected]", "[email protected]", body="Hello World body")
+
+ sendMail("smtp.mail.com", "scott", "tiger", "The subject", "[email protected]", "[email protected]", body="Hello World body", attach=attaches
+ where could be:
+ attaches = ["c:\\desktop\\file1.zip", "c:\\desktop\\file2.zip"] or
+ attaches = "c:\\desktop\\file1.zip"
+ Returns sendmail response (code, message)
+ """
+
+ self.host = host
+ self.user = user
+ self.password = password
+
+ self.set_subject(subj)
+ self.set_from(from_recipient)
+ self.message.mail_to = to_recipient
+ if cc_recipient != None:
+ self.message.mail_cc = cc_recipient
+ if bcc_recipient != None:
+ self.message.mail_bcc = bcc_recipient
+ #Fill the message
+ if body != None:
+ self.set_body(body)
+ # Part two is attachment
+ if attach != None:
+ attachlist = ast.literal_eval(attach)
+ self.message.attachments = attachlist
+ #logger.info("self.message.attachments:"+str(type(self.message.attachments)))
+ #logger.info("attachtype:"+str(type(attachlist)))
+ #logger.info("attachlist:"+str(attachlist))
+
+ return self.send_message()
+
+
+ class _MailMessage:
+ """
+ Simplified email message
+ This class represent email headers and payload content, not envelope data
+ """
+
+ def __init__(self):
+ """
+ init object variables
+ """
+ self.mail_from = None
+ self.mail_to = []
+ self.mail_cc = []
+ self.mail_bcc = []
+ self.subject = ''
+ self.body = ''
+ self.attachments = []
+ self.headers = {}
+
+ def get_message_recipients(self):
+ '''
+ Get all message recipients (to, cc, bcc)
+ '''
+ recipients = []
+ tolist = ast.literal_eval(self.mail_to)
+ cclist = ast.literal_eval(self.mail_cc)
+ bcclist = ast.literal_eval(self.mail_bcc)
+ recipients.extend(tolist)
+ recipients.extend(cclist)
+ recipients.extend(bcclist)
+ #logger.info("recipientslist:"+str(recipients))
+ return recipients
+
+ def get_message_as_string(self):
+ '''
+ Get message as string to be sent with smtplib.sendmail api
+ '''
+ if len(self.attachments) > 0:
+ #logger.info("attachments:"+str(self.attachments))
+ #logger.info("attachmentstype:"+str(type(self.attachments)))
+ #logger.info("attachmentsnum:"+str(len(self.attachments)))
+
+ envelope = MIMEMultipart()
+ envelope.attach(MIMEText(self.body))
+ else:
+ envelope = MIMEText(self.body)
+
+ recipients = self.get_message_recipients()
+
+
+ tolist = ast.literal_eval(self.mail_to)
+ cclist = ast.literal_eval(self.mail_cc)
+ envelope['From'] = self.mail_from
+ envelope['To'] = COMMASPACE.join(tolist)
+ envelope['Cc'] = COMMASPACE.join(cclist)
+ envelope['Subject'] = self.subject
+ #logger.info("envelope111:"+str(self.attachments))
+ for attachment in list(self.attachments):
+ ctype, encoding = mimetypes.guess_type(attachment)
+ #logger.info("attachment:"+attachment+" ctype:"+str(ctype)+" encoding:"+str(encoding))
+ if ctype is None or encoding is not None:
+ # No guess could be made, or the file is encoded (compressed), so
+ # use a generic bag-of-bits type.
+ ctype = 'application/octet-stream'
+ maintype, subtype = ctype.split('/', 1)
+ #logger.info("maintype:"+str(maintype)+" subtype:"+str(subtype))
+
+ msg = None
+ if maintype == 'text':
+ attach_file = open(attachment,'rb')
+ # TODO: we should handle calculating the charset
+ msg = MIMEText(attach_file.read(), _subtype=subtype, _charset='utf-8')
+ attach_file.close()
+ elif maintype == 'image':
+ attach_file = open(attachment, 'rb')
+ msg = MIMEImage(attach_file.read(), _subtype=subtype)
+ attach_file.close()
+ elif maintype == 'audio':
+ attach_file = open(attachment, 'rb')
+ msg = MIMEAudio(attach_file.read(), _subtype=subtype)
+ attach_file.close()
+ else:
+ attach_file = open(attachment, 'rb')
+ msg = MIMEBase(maintype, subtype)
+ msg.set_payload(attach_file.read())
+ attach_file.close()
+ # Encode the payload using Base64
+ encoders.encode_base64(msg)
+
+ # Set the filename parameter
+ msg.add_header('Content-Disposition', 'attachment',
+ filename=os.path.basename(attachment))
+ envelope.attach(msg)
+
+
+ #logger.info("envelope.as_string:"+envelope.as_string())
+ return envelope.as_string()
diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc
new file mode 100644
index 0000000..15fa648
--- /dev/null
+++ b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc
Binary files differ
diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc
new file mode 100644
index 0000000..29c241c
--- /dev/null
+++ b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc
Binary files differ
diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc
new file mode 100644
index 0000000..d83a4bb
--- /dev/null
+++ b/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc
Binary files differ
diff --git a/04-CustomLibrary/Smtp4Library/version.py b/04-CustomLibrary/Smtp4Library/version.py
new file mode 100644
index 0000000..8f9fcb8
--- /dev/null
+++ b/04-CustomLibrary/Smtp4Library/version.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+# This file is part of robotframework-Smtp3Library.
+# https://github.io/lucamaro/robotframework-SmtpLibrary
+
+# Licensed under the Apache License 2.0 license:
+# http://www.opensource.org/licenses/Apache-2.0
+# Copyright (c) 2016, Luca Maragnani <[email protected]>
+
+__version__ = '0.1.3' # NOQA