diff options
| author | zhaokun <[email protected]> | 2024-09-03 13:40:34 +0800 |
|---|---|---|
| committer | zhaokun <[email protected]> | 2024-09-03 13:40:34 +0800 |
| commit | f4b3efea6dca0630b63e82c70f41afcc716dfbbf (patch) | |
| tree | 99e135aab70ec84894998e3765eba5622804d44f | |
| parent | 25470ac0d8a4c49029c36754ea78681b24c1d5b5 (diff) | |
add if logic for player
| -rw-r--r-- | dns_player.py | 42 | ||||
| -rw-r--r-- | ftp_player.py | 56 | ||||
| -rw-r--r-- | http_player.py | 22 | ||||
| -rw-r--r-- | mail_player.py | 82 | ||||
| -rw-r--r-- | ssl_player.py | 22 |
5 files changed, 112 insertions, 112 deletions
diff --git a/dns_player.py b/dns_player.py index 3bf5ca1..5e19905 100644 --- a/dns_player.py +++ b/dns_player.py @@ -6,27 +6,27 @@ import dns.resolver class DnsPlayer: def send_dns_query(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send dns traffic for effect verification.", flush=True) - p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error - - # try: - # domain = traffic_data["domain"] - # query_type = traffic_data["query_type"] - # # 发送DNS查询请求 - # answers = dns.resolver.query(domain, query_type) - # # 获取查询结果 - # result = [str(rdata) for rdata in answers] - # return result - # except dns.resolver.NXDOMAIN: - # return "Domain name does not exist." - # except dns.resolver.NoAnswer: - # return "No record found." - # except dns.resolver.NoNameservers: - # return "The DNS server cannot be found." - # except Exception as e: - # return f"Exception: {e}" - + if "command" in traffic_data["traffic"]: + p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + output, error = p.communicate() + result = output + error + elif "command" not in traffic_data["traffic"]: + try: + domain = traffic_data["domain"] + query_type = traffic_data["query_type"] + # 发送DNS查询请求 + answers = dns.resolver.query(domain, query_type) + # 获取查询结果 + result = [str(rdata) for rdata in answers] + return result + except dns.resolver.NXDOMAIN: + return "Domain name does not exist." + except dns.resolver.NoAnswer: + return "No record found." + except dns.resolver.NoNameservers: + return "The DNS server cannot be found." + except Exception as e: + return f"Exception: {e}" return result if __name__ == "__main__": diff --git a/ftp_player.py b/ftp_player.py index 965b3a4..f2486a2 100644 --- a/ftp_player.py +++ b/ftp_player.py @@ -6,34 +6,34 @@ from ftplib import FTP class FtpPlayer: def send_ftp_traffic(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ftp traffic for effect verification.", flush=True) - p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error - - # try: - # result = "" - # ftp_server = traffic_data["host"] - # user = traffic_data["user"] - # password = traffic_data["password"] - # action = traffic_data["action"] - # path = traffic_data["path"] - # if len(traffic_data["local_file"]) > 0: - # local_file = traffic_data["local_file"] - # if len(traffic_data["remote_file"]) > 0: - # remote_file = traffic_data["remote_file"] - # ftp = self.ftp_connect(ftp_server, user, password) - # if action == "list": - # self.list_directory(ftp, path) - # elif action == "upload": - # self.upload_file(ftp, local_file, remote_file) - # elif action == "download": - # self.download_file(ftp, remote_file, local_file) - # result += "FTP traffic sent successfully" - # except Exception as e: - # result += f"Exception: {e}" - # finally: - # ftp.quit() - + if "command" in traffic_data["traffic"]: + p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + output, error = p.communicate() + result = output + error + elif "command" not in traffic_data["traffic"]: + try: + result = "" + ftp_server = traffic_data["host"] + user = traffic_data["user"] + password = traffic_data["password"] + action = traffic_data["action"] + path = traffic_data["path"] + if len(traffic_data["local_file"]) > 0: + local_file = traffic_data["local_file"] + if len(traffic_data["remote_file"]) > 0: + remote_file = traffic_data["remote_file"] + ftp = self.ftp_connect(ftp_server, user, password) + if action == "list": + self.list_directory(ftp, path) + elif action == "upload": + self.upload_file(ftp, local_file, remote_file) + elif action == "download": + self.download_file(ftp, remote_file, local_file) + result += "FTP traffic sent successfully" + except Exception as e: + result += f"Exception: {e}" + finally: + ftp.quit() return result def ftp_connect(self, host, user, password): diff --git a/http_player.py b/http_player.py index 270ee02..73ab889 100644 --- a/http_player.py +++ b/http_player.py @@ -6,17 +6,17 @@ import requests class HttpPlayer: def send_http_traffic(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send http traffic for effect verification.", flush=True) - p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error - - # action = traffic_data["action"] - # url = traffic_data["url"] - # if action == "get": - # result = self.send_get_request(url) - # elif action == "post": - # result = self.send_post_request(url) - + if "command" in traffic_data["traffic"]: + p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + output, error = p.communicate() + result = output + error + elif "command" not in traffic_data["traffic"]: + action = traffic_data["action"] + url = traffic_data["url"] + if action == "get": + result = self.send_get_request(url) + elif action == "post": + result = self.send_post_request(url) return result def send_get_request(self, url): diff --git a/mail_player.py b/mail_player.py index 13d9e5e..0e2a4e7 100644 --- a/mail_player.py +++ b/mail_player.py @@ -11,47 +11,47 @@ from email import encoders class MailPlayer: def send_mail_traffic(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send mail traffic for effect verification.", flush=True) - p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error - - # mail_type = traffic_data["mail_type"] - # mail_port = traffic_data["mail_port"] - # sender = traffic_data["sender"] - # password = traffic_data["password"] - # receiver = traffic_data["receiver"] - # subject = traffic_data["subject"] - # body = traffic_data["body"] - # attach = traffic_data["attach"] - # msg = MIMEMultipart() - # msg['From'] = Header(sender, 'utf-8') - # msg['To'] = Header(receiver, 'utf-8') - # msg['Subject'] = Header(subject, 'utf-8') - # msg['Date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - # # 邮件正文内容,MIMEText是纯文本 - # msg.attach(MIMEText(body, 'plain', 'utf-8')) - # # 附件 - # with open(attach, "rb") as attachment: - # part = MIMEBase("application", "octet-stream") - # part.set_payload((attachment).read()) - # encoders.encode_base64(part) - # part.add_header("Content-Disposition", f"attachment; filename= {attach}") - # msg.attach(part) - # try: - # if mail_type == "gmail": - # smtp_obj = smtplib.SMTP_SSL("smtp.gmail.com", mail_port) - # else: - # smtp_obj = smtplib.SMTP_SSL("192.168.40.206", mail_port) - # # 如果服务器不支持STARTTLS,需注释掉下面这行 - # # smtp_obj.starttls() - # smtp_obj.login(sender, password) - # smtp_obj.sendmail(sender, receiver, msg.as_string()) - # return "Email sent successfully" - # except smtplib.SMTPException as e: - # return f"SMTPException: {e}" - # finally: - # smtp_obj.quit() - + if "command" in traffic_data["traffic"]: + p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + output, error = p.communicate() + result = output + error + elif "command" not in traffic_data["traffic"]: + mail_type = traffic_data["mail_type"] + mail_port = traffic_data["mail_port"] + sender = traffic_data["sender"] + password = traffic_data["password"] + receiver = traffic_data["receiver"] + subject = traffic_data["subject"] + body = traffic_data["body"] + attach = traffic_data["attach"] + msg = MIMEMultipart() + msg['From'] = Header(sender, 'utf-8') + msg['To'] = Header(receiver, 'utf-8') + msg['Subject'] = Header(subject, 'utf-8') + msg['Date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # 邮件正文内容,MIMEText是纯文本 + msg.attach(MIMEText(body, 'plain', 'utf-8')) + # 附件 + with open(attach, "rb") as attachment: + part = MIMEBase("application", "octet-stream") + part.set_payload((attachment).read()) + encoders.encode_base64(part) + part.add_header("Content-Disposition", f"attachment; filename= {attach}") + msg.attach(part) + try: + if mail_type == "gmail": + smtp_obj = smtplib.SMTP_SSL("smtp.gmail.com", mail_port) + else: + smtp_obj = smtplib.SMTP_SSL("192.168.40.206", mail_port) + # 如果服务器不支持STARTTLS,需注释掉下面这行 + # smtp_obj.starttls() + smtp_obj.login(sender, password) + smtp_obj.sendmail(sender, receiver, msg.as_string()) + return "Email sent successfully" + except smtplib.SMTPException as e: + return f"SMTPException: {e}" + finally: + smtp_obj.quit() return result if __name__ == "__main__": diff --git a/ssl_player.py b/ssl_player.py index 139b57d..0048bd6 100644 --- a/ssl_player.py +++ b/ssl_player.py @@ -9,17 +9,17 @@ from scapy.layers.inet import TCP class SSLPlayer: def send_ssl_traffic(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ssl traffic for effect verification.", flush=True) - p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error - - # action = traffic_data["action"] - # url = traffic_data["url"] - # if action == "get": - # result = self.send_get_request(url) - # elif action == "post": - # result = self.send_post_request(url) - + if "command" in traffic_data["traffic"]: + p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + output, error = p.communicate() + result = output + error + elif "command" not in traffic_data["traffic"]: + action = traffic_data["action"] + url = traffic_data["url"] + if action == "get": + result = self.send_get_request(url) + elif action == "post": + result = self.send_post_request(url) return result def send_get_request(self, url): |
