diff options
| author | dongxiaoyan <[email protected]> | 2020-04-01 12:42:05 +0800 |
|---|---|---|
| committer | dongxiaoyan <[email protected]> | 2020-04-01 12:42:05 +0800 |
| commit | acc676857bd85512f344a8d06aa1ae8846e7c0db (patch) | |
| tree | f2a7e19139d7f763e699ae10a997184c168fed76 /04-CustomLibrary | |
Diffstat (limited to '04-CustomLibrary')
| -rw-r--r-- | 04-CustomLibrary/Custometest/MD5.py | 40 | ||||
| -rw-r--r-- | 04-CustomLibrary/Custometest/__init__.py | 16 | ||||
| -rw-r--r-- | 04-CustomLibrary/Custometest/certificate.yaml | 0 | ||||
| -rw-r--r-- | 04-CustomLibrary/Custometest/cmd_cer.py | 164 | ||||
| -rw-r--r-- | 04-CustomLibrary/Custometest/printlog.py | 11 | ||||
| -rw-r--r-- | 04-CustomLibrary/FileLibrary/__init__.py | 7 | ||||
| -rw-r--r-- | 04-CustomLibrary/FileLibrary/filetool.py | 27 | ||||
| -rw-r--r-- | 04-CustomLibrary/Pop3Library/__init__.py | 200 | ||||
| -rw-r--r-- | 04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc | bin | 0 -> 4589 bytes | |||
| -rw-r--r-- | 04-CustomLibrary/Pop3Library/readme.txt | 26 | ||||
| -rw-r--r-- | 04-CustomLibrary/Smtp4Library/__init__.py | 417 | ||||
| -rw-r--r-- | 04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc | bin | 0 -> 11414 bytes | |||
| -rw-r--r-- | 04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc | bin | 0 -> 11425 bytes | |||
| -rw-r--r-- | 04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc | bin | 0 -> 207 bytes | |||
| -rw-r--r-- | 04-CustomLibrary/Smtp4Library/version.py | 11 |
15 files changed, 919 insertions, 0 deletions
diff --git a/04-CustomLibrary/Custometest/MD5.py b/04-CustomLibrary/Custometest/MD5.py new file mode 100644 index 0000000..697ae83 --- /dev/null +++ b/04-CustomLibrary/Custometest/MD5.py @@ -0,0 +1,40 @@ +# 由于MD5模块在python3中被移除 +# 在python3中使用hashlib模块进行md5操作 + +import hashlib + +class MD5: + def MD5(data,langer,md5_types): + # 创建md5对象 + # m = hashlib.md5() + # Tips + # 此处必须encode + # 若写法为m.update(str) 报错为: Unicode-objects must be encoded before hashing + # 因为python3里默认的str是unicode + # 或者 b = bytes(str, encoding='utf-8'),作用相同,都是encode为bytes + # b = str.encode(encoding='utf-8') + # m.update(b) + # str_md5 = m.hexdigest() + if langer == "英文": + # str_md5 = hashlib.md5(b'this is a md5 test.').hexdigest() + str_md5 = hashlib.md5("b'"+data+"'").hexdigest() + print('MD5加密前为 :' + data) + print('MD5加密后为 :' + str_md5) + return str_md5 + elif langer == "中文": + str_md5 = hashlib.md5('你好'.encode(encoding=md5_types)).hexdigest() + return str_md5 + # utf8 和gbk 加密结构不一样 + # hashlib.md5('你好'.encode(encoding='GBK')).hexdigest() + # hashlib.md5('你好'.encode(encoding='GB2312')).hexdigest() + # hashlib.md5('你好'.encode(encoding='GB18030')).hexdigest() +if __name__ == '__main__': + data = '小猪' + langer = '中文' + md5_types = 'GBK' + a =MD5(data,langer,md5_types) + print(a) + b=r'C:\Users\小猪\AppData\Local\Programs\Python\Python37\Lib\site-packages\custometest\MD5.py' + with open(b, encoding='utf-8') as f: + text = f.read() + print(text)
\ No newline at end of file diff --git a/04-CustomLibrary/Custometest/__init__.py b/04-CustomLibrary/Custometest/__init__.py new file mode 100644 index 0000000..a4b4d83 --- /dev/null +++ b/04-CustomLibrary/Custometest/__init__.py @@ -0,0 +1,16 @@ + +#-*- coding:utf-8 -*- +''' + created by hch 2019-06-26 +''' + +from custometest.printlog import printlog +from custometest.MD5 import MD5 +from custometest.cmd_cer import Order +# from custometest.printlog import printlog + + +__version__ = '1.0' + +class custometest(printlog,Order,MD5): + ROBOT_LIBRARY_SCOPE = 'GLOBAL' diff --git a/04-CustomLibrary/Custometest/certificate.yaml b/04-CustomLibrary/Custometest/certificate.yaml new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/04-CustomLibrary/Custometest/certificate.yaml diff --git a/04-CustomLibrary/Custometest/cmd_cer.py b/04-CustomLibrary/Custometest/cmd_cer.py new file mode 100644 index 0000000..6e6e636 --- /dev/null +++ b/04-CustomLibrary/Custometest/cmd_cer.py @@ -0,0 +1,164 @@ +import os +import subprocess +from time import sleep + + +class Order: + def CMD(self,data): + result = os.popen(data) + # res = result.read().encoding('GBK') + res = result.read() + result.close() + # res = res.decode("unicode-escape") + return res + def Linux(self): + pass + # 根据证书颁发者名字判断证书是否替换 + def Cert_Verification(self,data): + c = [] + print(1) + #with open(r'C:\Users\iiesoft\AppData\Local\Programs\Python\Python36\Lib\site-packages\custometest\certificate.yaml', 'r') as foo: + with open(r'certificate.yaml', 'r') as foo: + print(2) + for line in foo.readlines(): + if data in line: + print(line) + c.append('证书已替换') + else: + pass + if '证书已替换' in c: + # print('证书已替换') + foo.close() + return '证书已替换' + else: + # print('证书未替换') + foo.close() + return '证书未替换' + + def Content_Type(self,data): + d = [] + with open('certificate.yaml', 'r') as foo: + for line in foo.readlines(): + if data in line: + # print(line) + d.append('Content_Type已替换') + else: + pass + if 'Content_Type已替换' in d: + # print('证书已替换') + foo.close() + return 'Content_Type已替换' + else: + # print('证书未替换') + foo.close() + return 'Content_Type未替换' + # curl路由内容设置 + def curl_name(self,data): + #curl_name = 'curl -kv -m 10 -1 --trace C:/Users/iiesoft/AppData/Local/Programs/Python/Python36/Lib/site-packages/custometest/certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + curl_name = 'curl -kv -m 10 -1 --trace certificate.yaml '+data+'| iconv -f utf-8 -t gbk' + return curl_name + # 控制器 + def manu(self,url,Certificate): + # print(data['url']) + n = 0 + while n != len(url): + b = self.curl_name(url[n]) + d = self.CMD(b) + # print(d) + sleep(1) + if Certificate != "": + c =self.Cert_Verification(Certificate) + # f = self.Content_Type(data["Content_Type"]) + sleep(1) + assert_cer = url[n]+c + # assert_Content_Type = data['Content_Type']+f + n+=1 + return d,assert_cer + + def FTP(self, ftp_type): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + if ftp_type == "下载": + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" -o zmmtext123.txt' + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/zmmtext123.txt") # 435814 + if fsize == 435814: + return "Success" + else: + return "Fail" + elif ftp_type == "登录": + data = 'curl -m 10 ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" | iconv -f utf-8 -t gbk' + d = self.CMD(data) + # print(d) + if "Graphical (Xorg) program starter for ADRIANE" in d: + return "Success" + else: + return "Fail" + # FTP 下载 + def FTP_down(self, ftp_url,file_size,fiel_name): + windows_path = os.getcwd() + linux_path = os.getcwd().replace('\\', '/') + # 判断FTP执行类型:(下载/登录) + # 调用cmd执行FTP下载文件 + data = 'curl -m 20 '+ftp_url+ '-o '+ fiel_name + " ' " + print(data) + d = self.CMD(data) + sleep(5) + fsize = os.path.getsize(linux_path + "/"+fiel_name) # 435814 + print(fsize) + if fsize == file_size: + return "Success" + else: + return "Fail" + + # FTP 登录 + def FTP_login(self, ftp_url,file_content): + data = 'curl -m 10 '+ftp_url+' | iconv -f utf-8 -t gbk' + d = self.CMD(data) + # print(d) + if file_content in d: + return "Success" + else: + return "Fail" + +if __name__ == '__main__': + datas = {"url":['https://www.baidu.com'], +"Certificate":"Tango Secure Gateway CA", +# "Content_Type":"text/html", +'log':'Security Event Logs', +"sni":['baidu'], +"intercept_code":"200", +"log_code":"200", +"certifucate":"1", +"log_content":"true" +} +# data= {"url":['https://www.baidu.com'], +# "Certificate":"Tango Secure Gateway CA" +# } + # url = ['https://www.baidu.com'] + # url = ['https://www.baidu.com'] + # url = ['https://www.baidu.com'] + # # Certificate1 = "Tango Secure Gateway CA" + # Certificate = "baidu" + # a='Tango Secure Gateway CA' + # s = Order() + # b = s.manu(url,Certificate) + # print(b[1]) + # FTP下载 传入ftp的路径和文件大小 + ftp_url = 'ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' + ftp_size = 435814 + ftp_issue = s.FTP_down(ftp_url,ftp_size) + # FTP登录 传入ftp的路径和文件内容 + ftp_url ='ftp://202.38.97.230/pub/iso/linux/knoppix/KNOPPIX_V7.7.1DVD-2016-10-22-EN/dpkg-l-dvd-771.txt -u"anonymous:[email protected]" ' + file_content = "Graphical (Xorg) program starter for ADRIANE" + ftp_issue = s.FTP_login(ftp_url,file_content) + # for i in b: + # print(i) + # dd = s.CMD('curl -I https://www.baidu.com') + # print(dd) + # if "private, no-cache, no-store, proxy-revalidate, no-transform"in dd: + # print("ok") + # a ='curl -kv -1 --trace certificate.yaml https://www.baidu.com | iconv -f utf-8 -t gbk'
\ No newline at end of file diff --git a/04-CustomLibrary/Custometest/printlog.py b/04-CustomLibrary/Custometest/printlog.py new file mode 100644 index 0000000..02f8ced --- /dev/null +++ b/04-CustomLibrary/Custometest/printlog.py @@ -0,0 +1,11 @@ + +#-*- coding:utf-8 -*- +''' + created by hch 2019-06-26 +''' + + +class printlog(): + + def printA(): + print("hello word")
\ No newline at end of file diff --git a/04-CustomLibrary/FileLibrary/__init__.py b/04-CustomLibrary/FileLibrary/__init__.py new file mode 100644 index 0000000..29db0ed --- /dev/null +++ b/04-CustomLibrary/FileLibrary/__init__.py @@ -0,0 +1,7 @@ +#coding=utf-8 +from filetool import filetool + +version = '1.0' + +class FileLibrary(filetool): + ROBOT_LIBRARY_SCOPE = 'GLOBAL'
\ No newline at end of file diff --git a/04-CustomLibrary/FileLibrary/filetool.py b/04-CustomLibrary/FileLibrary/filetool.py new file mode 100644 index 0000000..56eadcf --- /dev/null +++ b/04-CustomLibrary/FileLibrary/filetool.py @@ -0,0 +1,27 @@ +#coding=utf-8 +import sys +reload(sys) +sys.setdefaultencoding("utf-8") + +""" 变量文件操作 """ + +class filetool(): + def __init__(self): + pass + + def alter_dict(self, path, k, v): + data = '' + flag = True + key = '${%s}' % (k) + add = key + '\t%s' % (v) + '\n' + with open(path, 'r+') as f: + for line in f.readlines(): + if(line.find(key + '\t') == 0): + line = add + flag = False + data += line + if(flag): + data += add + print data + with open(path, 'w+') as f: + f.writelines(data)
\ No newline at end of file diff --git a/04-CustomLibrary/Pop3Library/__init__.py b/04-CustomLibrary/Pop3Library/__init__.py new file mode 100644 index 0000000..66219ac --- /dev/null +++ b/04-CustomLibrary/Pop3Library/__init__.py @@ -0,0 +1,200 @@ +import poplib +import base64 +import time +from email.parser import Parser +# 用来解析邮件主题 +from email.header import decode_header +# 用来解析邮件来源 +from email.utils import parseaddr + +from robot.api.deco import keyword +from robot.api import logger + + +class AcceptEmail(object): + + def __init__(self, user_email, password, pop3_server='serverDemon'): + self.user_email = user_email + self.password = password + self.pop3_server = pop3_server + + self.connect_email_server() + + def connect_email_server(self): + self.server = poplib.POP3(self.pop3_server) + # 打印POP3服务器的欢迎文字,验证是否正确连接到了邮件服务器 + # print('连接成功 -- ', self.server.getwelcome().decode('utf8')) + # +OK QQMail POP3 Server v1.0 Service Ready(QQMail v2.0) + + # 开始进行身份验证 + self.server.user(self.user_email) + self.server.pass_(self.password) + + def __del__(self): + # 关闭与服务器的连接,释放资源 + self.server.close() + + def get_email_count(self): + # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可 + email_num, email_size = self.server.stat() + # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size)) + return email_num + + def receive_email_info(self, now_count=None): + # 返回邮件总数目和占用服务器的空间大小(字节数), 通过stat()方法即可 + email_num, email_size = self.server.stat() + # print("消息的数量: {0}, 消息的总大小: {1}".format(email_num, email_size)) + self.email_count = email_num + self.email_sumsize = email_size + + # 使用list()返回所有邮件的编号,默认为字节类型的串 + rsp, msg_list, rsp_siz = self.server.list() + # print(msg_list, '邮件数量',len(msg_list)) + # print("服务器的响应: {0},\n消息列表: {1},\n返回消息的大小: {2}".format(rsp, msg_list, rsp_siz)) + # print('邮件总数: {}'.format(len(msg_list))) + self.response_status = rsp + self.response_size = rsp_siz + + # 下面获取最新的一封邮件,某个邮件下标(1开始算) + # total_mail_numbers = len(msg_list) + + # 动态取消息 + total_mail_numbers = now_count + + rsp, msglines, msgsiz = self.server.retr(total_mail_numbers) + # rsp, msglines, msgsiz = self.server.retr(1) + # print("服务器的响应: {0},\n原始邮件内容: {1},\n该封邮件所占字节大小: {2}".format(rsp, msglines, msgsiz)) + + # 从邮件原内容中解析 + msg_content = b'\r\n'.join(msglines).decode('utf-8')#gbk + msg = Parser().parsestr(text=msg_content) + self.msg = msg + # print('解码后的邮件信息:\n{}'.format(msg)) + + def recv(self, now_count=None): + self.receive_email_info(now_count) + self.parser() + + def get_email_title(self): + subject = self.msg['Subject'] + value, charset = decode_header(subject)[0] + if charset: + value = value.decode(charset) + # print('邮件主题: {0}'.format(value)) + self.email_title = value + + def get_sender_info(self): + hdr, addr = parseaddr(self.msg['From']) + # name 发送人邮箱名称, addr 发送人邮箱地址 + name, charset = decode_header(hdr)[0] + if charset: + name = name.decode(charset) + self.sender_qq_name = name + self.sender_qq_email = addr + # print('发送人邮箱名称: {0},发送人邮箱地址: {1}'.format(name, addr)) + + def get_email_content(self): + content = self.msg.get_payload() + # 文本信息 + content_charset = content[0].get_content_charset() # 获取编码格式 + text = content[0].as_string().split('base64')[-1] + text_content = base64.b64decode(text).decode(content_charset) # base64解码 + self.email_content = text_content + # print('邮件内容: {0}'.format(text_content)) + + # 添加了HTML代码的信息 + content_charset = content[1].get_content_charset() + text = content[1].as_string().split('base64')[-1] + # html_content = base64.b64decode(text).decode(content_charset) + + # print('文本信息: {0}\n添加了HTML代码的信息: {1}'.format(text_content, html_content)) + + def parser(self): + self.get_email_title() + self.get_sender_info() + #self.get_email_content() + + +def get_new_mail(user_name, pwd, pop3_server, second=5): + t = AcceptEmail(user_name, pwd, pop3_server) + now_count = t.get_email_count() + #print('开启的时候的邮件数量为:%s' % now_count) + logger.info("开启的时候的邮件数量为:"+str(now_count)) + # 每次需要重新连接邮箱服务器,才能获取到最新的消息 + # 默认每隔5秒看一次是否有新内容 + num = 0 + while True: + obj = AcceptEmail(user_name, pwd, pop3_server) + count = obj.get_email_count() + if count > now_count: + new_mail_count = count - now_count + #print('有新的邮件数量:%s' % new_mail_count) + logger.info("有新的邮件数量:"+str(new_mail_count)) + now_count += 1 + obj.recv(now_count) + + yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email} + #yield {"title": obj.email_title, "sender": obj.sender_qq_name, "sender_email": obj.sender_qq_email, + # "email_content": obj.email_content} + if new_mail_count > 0: + return + # print('-' * 30) + # print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % ( + # obj.email_title, obj.sender_qq_name, obj.sender_qq_email, obj.email_content)) + # print('-' * 30) + + #else: + #print('没有任何新消息.') + #logger.info("没有任何新消息.") + time.sleep(second) + num += 1 + if num == 36:#等待时间粒度,一个粒度是5s,如果num=10意思就是等待接收邮件50s,若没有邮件就返回;36是等待3min + return + + +@keyword('Recv Email') +def recv_email(user_name, pwd, pop3_server, send_user, subj): + ''' + 参数说明: + [user_name]:用户名 + [pwd]:密码,第三方登入密码 + [pop server]:pop服务器 + [sender]:发送者邮箱,注意全称 + [subj_sub]:主题的部分内容,这里是测主题是否包含该参数 + [return]:返回值,成功返回success,失败返回n其他 + 其他问题请阅读该库的readme.txt + ''' + dic = {} + logger.info("正在监听邮件服务器端是否有新消息---") + #print('正在监听邮件服务器端是否有新消息---') + try: + iterator = get_new_mail(user_name, pwd, pop3_server) + except TypeError: + #print('监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容') + logger.info("监听的内容有误,有图片数据等,无法解析而报错,不是纯文本内容") + return "fail" + else: + for dic in iterator: + #print("邮件主题:%s\n发件人:%s\n发件人邮箱:%s\n邮件内容:%s" % ( + # dic["title"], dic["sender"], dic["sender_email"], dic["email_content"])) + #logger.info("邮件主题: " + str(dic["title"]) + " 发件人: " + str(dic["sender"])+"发件人邮箱:"+str(dic["sender_email"])) + if dic["sender"] == send_user: + #logger.info("发送者一样") + if subj in dic["title"]: + #logger.info("主题也包含") + return "success" + else: + #logger.info("主题不包含") + return "fail" + else: + return "fail" + + +#if __name__ == '__main__': +# user = '[email protected]' +# pwd = 'xxxx' +# pop3_server = 'pop.qq.com' +# send_user = '[email protected]' +# subj = 'or2020' +# result = recv_email(user, pwd, pop3_server, send_user, subj) +# print(result)
\ No newline at end of file diff --git a/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc b/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..ecd3bf5 --- /dev/null +++ b/04-CustomLibrary/Pop3Library/__pycache__/__init__.cpython-36.pyc diff --git a/04-CustomLibrary/Pop3Library/readme.txt b/04-CustomLibrary/Pop3Library/readme.txt new file mode 100644 index 0000000..42c7c9b --- /dev/null +++ b/04-CustomLibrary/Pop3Library/readme.txt @@ -0,0 +1,26 @@ +导入方法: +1.将该目录放到....\Python\Lib\site-packages 下 +2.在测试夹具里面要根据绝对路径导入此包 + + +注意: +1.使用该包的关键字时不要用qq邮箱,qq邮箱测试发现时长会失灵现象,最好用163邮箱等 +2.注意关闭邮箱的加密传送方式SSL协议 +3.注意邮箱是否支持POP3的协议以及不同邮箱pop服务器的写法 + + +关键字: +[return] Recv Email [user_name] [pwd] [pop server] [sender] [subj_sub] +参数说明: +[user_name]:用户名 +[pwd]:密码,第三方登入密码 +[pop server]:pop服务器 +[sender]:发送者邮箱,注意全称 +[subj_sub]:主题的部分内容,这里是测主题是否包含该参数 +[return]:返回值,成功返回success,失败返回n其他 +该关键字默认等待3min,3min内每5s检测邮箱是否收到邮件, +若收到邮件与[sender]参数进行完全匹配,与[subj_sub]部分主题内容参数进行是否包含匹配,匹配成功则返回success; +若收到邮件匹配失败或者超时则返回其他, + + +若修改等待时间,可查找源码中的num变量将36改为其他值即可,注意时间粒度是5s,若num=3,则您修改的等待时间是15s diff --git a/04-CustomLibrary/Smtp4Library/__init__.py b/04-CustomLibrary/Smtp4Library/__init__.py new file mode 100644 index 0000000..dff36a4 --- /dev/null +++ b/04-CustomLibrary/Smtp4Library/__init__.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This file is part of robotframework-Smtp3Library. +# https://github.io/lucamaro/robotframework-Smtp3Library + +# Licensed under the Apache License 2.0 license: +# http://www.opensource.org/licenses/Apache-2.0 +# Copyright (c) 2016, Luca Maragnani <[email protected]> + +""" +Library implementation +""" +import ast +import smtplib +import email +import random +import string +import mimetypes +import quopri +from email.message import Message +from email.mime.audio import MIMEAudio +from email.mime.base import MIMEBase +from email.mime.image import MIMEImage +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText +from email import encoders +import os.path +import socket +from robot.api.deco import keyword +from robot.api import logger + +from Smtp3Library.version import __version__ # NOQA + +COMMASPACE = ', ' + +class Smtp3Library(object): + """ + SMTP Client class + """ + + def __init__(self): + """ + Constructor + """ + self.message = self._MailMessage() + self.host = None + self.port = None + self.user = None + self.password = None + self.smtp = None + + def _prepare_connection(self, host, port, user=None, password=None): + """ + Private method to collect connection informations + """ + self.host = host + self.port = int(port) + self.user = user + self.password = password + self.client_hostname = socket.gethostname() + + + def prepare_ssl_connection(self, host, port=465, user=None, password=None): + """ + Collect connection informations for SSL channel + """ + self._prepare_connection(host, port, user, password) + self.smtp = smtplib.SMTP_SSL() + + def prepare_connection(self, host, port=25, user=None, password=None): + """ + Collect connection informations for unencrypted channel + """ + self._prepare_connection(host, port, user, password) + self.smtp = smtplib.SMTP() + + def add_to_recipient(self, recipient): + """ + Add a recipient to "To:" list + """ + self.message.mail_to.append(recipient) + + def add_cc_recipient(self, recipient): + """ + Add a recipient to "Cc:" list + """ + self.message.mail_cc.append(recipient) + + def add_bcc_recipient(self, recipient): + """ + Add a recipient to "Bcc:" list + """ + self.message.mail_bcc.append(recipient) + + def set_subject(self, subj): + """ + Set email subject + """ + self.message.subject = subj + + def set_from(self, from_recipient): + """ + Set from address of message and envelope + """ + self.message.mail_from = from_recipient + + def set_body(self, body): + """ + Set email body + """ + self.message.body = body + + def set_random_body(self, size): + """ + Set a random body of <size> length + """ + body = '' + for i in range(0, size): + body += ''.join(random.choice(string.uppercase + string.digits)) + if i % 80 == 0: + body += "\n" + self.message.body = body + + def add_attachment(self, attach): + """ + Add attachment to a list of filenames + """ + self.message.attachments.append(attach) + + def add_header(self, name, value): + """ + Add a custom header to headers list + """ + self.message.headers[name] = value + + def connect(self): + ''' + Open connection to server + Returns tuple (smtp status code, message) + ''' + return self.smtp.connect(self.host, self.port) + + def present_client_as(self, client_hostname): + ''' + Set helo/ehlo client identity + ''' + self.client_hostname = client_hostname + + def helo(self): + ''' + Send HELO command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.helo(self.client_hostname) + logger.info(result) + return result + + def ehlo(self): + ''' + Send EHLO command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.ehlo(self.client_hostname) + logger.info(result) + return result + + def get_esmtp_features(self): + ''' + Returns hashmap with ESMTP feature received with EHLO + ''' + logger.info(self.smtp.esmtp_features) + return self.smtp.esmtp_features + + def logins(self): + try: + ''' + Login user + Returns tuple (smtp status code, message) + ''' + logger.info("Login with user " + self.user + " and password " + self.password) + '''try: + subuser=bytes.decode(self.user) + subpassword=bytes.decode(self.password) + result = self.smtp.login(subuser.encode('ascii'), subpassword.encode('ascii')) + logger.info(result) + return result + except: + logger.info("本身就是str类型不需要bytes to str!") + subuser=str(self.user).encode('ascii') + subpassword=str(self.password).encode('ascii') + result = self.smtp.login(subuser, subpassword) + logger.info(result) + return result''' + result = self.smtp.login(self.user, self.password) + logger.info(result) + return "success" + except: + return "fail" + + + def starttls(self, keyfile=None, certfile=None): + ''' + sends STARTTLS + optional: keyfile certfile + Returns tuple (smtp status code, message) + ''' + logger.info("STARTTLS") + if keyfile is None and certfile is None: + result = self.smtp.starttls() + else: + result = self.smtp.starttls(keyfile, certfile) + logger.info(result) + return result + + def data(self): + ''' + Data command send email body with "MAIL FROM:", "RCPT TO:" and "DATA" commands + Returns tuple (smtp status code, message) + ''' + result = self.smtp.mail(self.message.mail_from) + result += self.smtp.rcpt(self.message.get_message_recipients()) + + result += self.smtp.data(self.message.get_message_as_string()) + logger.info(result) + return result + + def sendmail(self): + ''' + Send email with "MAIL FROM:", "RCPT TO:" and "DATA" commands + Returns tuple (smtp status code, message) + ''' + result = self.smtp.sendmail(self.message.mail_from, self.message.get_message_recipients(), self.message.get_message_as_string()) + logger.info(result) + return result + + def quit(self): + ''' + Send QUIT command + Returns tuple (smtp status code, message) + ''' + result = self.smtp.quit() + logger.info(result) + return result + + def close_connection(self): + ''' + Close connection to server + ''' + return self.smtp.close() + + def send_message(self): + """ + Send the message, from connection establishment to quit and close connection. + All the connection and email parameters must be already set before invocation. + Returns sendmail response (code, message) + """ + + # Send the message + try: + self.connect() + + if self.user is not None: + self.ehlo() + self.logins() + + send_result = self.sendmail() + + self.quit() + self.close_connection() + # return send_result + return "success" + except: + return "fail" + + @keyword('Send Message With All Parameters') + def send_message_full(self, host, user, password, subj, + from_recipient, to_recipient, cc_recipient=None, bcc_recipient=None, + body=None, attach=None): + """ + Send a message specifing all parameters on the same linecc + cc, bcc and attach parameters may be strings or array of strings + host, user, password, subj, fromadd, toadd - are mandatory parameters + to use the optional paramaters pleas specify the name fo the parameter in the call + user and password even if mandatory could be set to None so no authentication will be made + Example: + sendMail("smtp.mail.com", None, None, "The subject", "[email protected]", "[email protected]", body="Hello World body") + + sendMail("smtp.mail.com", "scott", "tiger", "The subject", "[email protected]", "[email protected]", body="Hello World body", attach=attaches + where could be: + attaches = ["c:\\desktop\\file1.zip", "c:\\desktop\\file2.zip"] or + attaches = "c:\\desktop\\file1.zip" + Returns sendmail response (code, message) + """ + + self.host = host + self.user = user + self.password = password + + self.set_subject(subj) + self.set_from(from_recipient) + self.message.mail_to = to_recipient + if cc_recipient != None: + self.message.mail_cc = cc_recipient + if bcc_recipient != None: + self.message.mail_bcc = bcc_recipient + #Fill the message + if body != None: + self.set_body(body) + # Part two is attachment + if attach != None: + attachlist = ast.literal_eval(attach) + self.message.attachments = attachlist + #logger.info("self.message.attachments:"+str(type(self.message.attachments))) + #logger.info("attachtype:"+str(type(attachlist))) + #logger.info("attachlist:"+str(attachlist)) + + return self.send_message() + + + class _MailMessage: + """ + Simplified email message + This class represent email headers and payload content, not envelope data + """ + + def __init__(self): + """ + init object variables + """ + self.mail_from = None + self.mail_to = [] + self.mail_cc = [] + self.mail_bcc = [] + self.subject = '' + self.body = '' + self.attachments = [] + self.headers = {} + + def get_message_recipients(self): + ''' + Get all message recipients (to, cc, bcc) + ''' + recipients = [] + tolist = ast.literal_eval(self.mail_to) + cclist = ast.literal_eval(self.mail_cc) + bcclist = ast.literal_eval(self.mail_bcc) + recipients.extend(tolist) + recipients.extend(cclist) + recipients.extend(bcclist) + #logger.info("recipientslist:"+str(recipients)) + return recipients + + def get_message_as_string(self): + ''' + Get message as string to be sent with smtplib.sendmail api + ''' + if len(self.attachments) > 0: + #logger.info("attachments:"+str(self.attachments)) + #logger.info("attachmentstype:"+str(type(self.attachments))) + #logger.info("attachmentsnum:"+str(len(self.attachments))) + + envelope = MIMEMultipart() + envelope.attach(MIMEText(self.body)) + else: + envelope = MIMEText(self.body) + + recipients = self.get_message_recipients() + + + tolist = ast.literal_eval(self.mail_to) + cclist = ast.literal_eval(self.mail_cc) + envelope['From'] = self.mail_from + envelope['To'] = COMMASPACE.join(tolist) + envelope['Cc'] = COMMASPACE.join(cclist) + envelope['Subject'] = self.subject + #logger.info("envelope111:"+str(self.attachments)) + for attachment in list(self.attachments): + ctype, encoding = mimetypes.guess_type(attachment) + #logger.info("attachment:"+attachment+" ctype:"+str(ctype)+" encoding:"+str(encoding)) + if ctype is None or encoding is not None: + # No guess could be made, or the file is encoded (compressed), so + # use a generic bag-of-bits type. + ctype = 'application/octet-stream' + maintype, subtype = ctype.split('/', 1) + #logger.info("maintype:"+str(maintype)+" subtype:"+str(subtype)) + + msg = None + if maintype == 'text': + attach_file = open(attachment,'rb') + # TODO: we should handle calculating the charset + msg = MIMEText(attach_file.read(), _subtype=subtype, _charset='utf-8') + attach_file.close() + elif maintype == 'image': + attach_file = open(attachment, 'rb') + msg = MIMEImage(attach_file.read(), _subtype=subtype) + attach_file.close() + elif maintype == 'audio': + attach_file = open(attachment, 'rb') + msg = MIMEAudio(attach_file.read(), _subtype=subtype) + attach_file.close() + else: + attach_file = open(attachment, 'rb') + msg = MIMEBase(maintype, subtype) + msg.set_payload(attach_file.read()) + attach_file.close() + # Encode the payload using Base64 + encoders.encode_base64(msg) + + # Set the filename parameter + msg.add_header('Content-Disposition', 'attachment', + filename=os.path.basename(attachment)) + envelope.attach(msg) + + + #logger.info("envelope.as_string:"+envelope.as_string()) + return envelope.as_string() diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc Binary files differnew file mode 100644 index 0000000..15fa648 --- /dev/null +++ b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-36.pyc diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..29c241c --- /dev/null +++ b/04-CustomLibrary/Smtp4Library/__pycache__/__init__.cpython-37.pyc diff --git a/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc b/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..d83a4bb --- /dev/null +++ b/04-CustomLibrary/Smtp4Library/__pycache__/version.cpython-37.pyc diff --git a/04-CustomLibrary/Smtp4Library/version.py b/04-CustomLibrary/Smtp4Library/version.py new file mode 100644 index 0000000..8f9fcb8 --- /dev/null +++ b/04-CustomLibrary/Smtp4Library/version.py @@ -0,0 +1,11 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# This file is part of robotframework-Smtp3Library. +# https://github.io/lucamaro/robotframework-SmtpLibrary + +# Licensed under the Apache License 2.0 license: +# http://www.opensource.org/licenses/Apache-2.0 +# Copyright (c) 2016, Luca Maragnani <[email protected]> + +__version__ = '0.1.3' # NOQA |
