summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaokun <[email protected]>2024-09-03 13:40:34 +0800
committerzhaokun <[email protected]>2024-09-03 13:40:34 +0800
commitf4b3efea6dca0630b63e82c70f41afcc716dfbbf (patch)
tree99e135aab70ec84894998e3765eba5622804d44f
parent25470ac0d8a4c49029c36754ea78681b24c1d5b5 (diff)
add if logic for player
-rw-r--r--dns_player.py42
-rw-r--r--ftp_player.py56
-rw-r--r--http_player.py22
-rw-r--r--mail_player.py82
-rw-r--r--ssl_player.py22
5 files changed, 112 insertions, 112 deletions
diff --git a/dns_player.py b/dns_player.py
index 3bf5ca1..5e19905 100644
--- a/dns_player.py
+++ b/dns_player.py
@@ -6,27 +6,27 @@ import dns.resolver
class DnsPlayer:
def send_dns_query(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send dns traffic for effect verification.", flush=True)
- p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
-
- # try:
- # domain = traffic_data["domain"]
- # query_type = traffic_data["query_type"]
- # # 发送DNS查询请求
- # answers = dns.resolver.query(domain, query_type)
- # # 获取查询结果
- # result = [str(rdata) for rdata in answers]
- # return result
- # except dns.resolver.NXDOMAIN:
- # return "Domain name does not exist."
- # except dns.resolver.NoAnswer:
- # return "No record found."
- # except dns.resolver.NoNameservers:
- # return "The DNS server cannot be found."
- # except Exception as e:
- # return f"Exception: {e}"
-
+ if "command" in traffic_data["traffic"]:
+ p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ output, error = p.communicate()
+ result = output + error
+ elif "command" not in traffic_data["traffic"]:
+ try:
+ domain = traffic_data["domain"]
+ query_type = traffic_data["query_type"]
+ # 发送DNS查询请求
+ answers = dns.resolver.query(domain, query_type)
+ # 获取查询结果
+ result = [str(rdata) for rdata in answers]
+ return result
+ except dns.resolver.NXDOMAIN:
+ return "Domain name does not exist."
+ except dns.resolver.NoAnswer:
+ return "No record found."
+ except dns.resolver.NoNameservers:
+ return "The DNS server cannot be found."
+ except Exception as e:
+ return f"Exception: {e}"
return result
if __name__ == "__main__":
diff --git a/ftp_player.py b/ftp_player.py
index 965b3a4..f2486a2 100644
--- a/ftp_player.py
+++ b/ftp_player.py
@@ -6,34 +6,34 @@ from ftplib import FTP
class FtpPlayer:
def send_ftp_traffic(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ftp traffic for effect verification.", flush=True)
- p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
-
- # try:
- # result = ""
- # ftp_server = traffic_data["host"]
- # user = traffic_data["user"]
- # password = traffic_data["password"]
- # action = traffic_data["action"]
- # path = traffic_data["path"]
- # if len(traffic_data["local_file"]) > 0:
- # local_file = traffic_data["local_file"]
- # if len(traffic_data["remote_file"]) > 0:
- # remote_file = traffic_data["remote_file"]
- # ftp = self.ftp_connect(ftp_server, user, password)
- # if action == "list":
- # self.list_directory(ftp, path)
- # elif action == "upload":
- # self.upload_file(ftp, local_file, remote_file)
- # elif action == "download":
- # self.download_file(ftp, remote_file, local_file)
- # result += "FTP traffic sent successfully"
- # except Exception as e:
- # result += f"Exception: {e}"
- # finally:
- # ftp.quit()
-
+ if "command" in traffic_data["traffic"]:
+ p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ output, error = p.communicate()
+ result = output + error
+ elif "command" not in traffic_data["traffic"]:
+ try:
+ result = ""
+ ftp_server = traffic_data["host"]
+ user = traffic_data["user"]
+ password = traffic_data["password"]
+ action = traffic_data["action"]
+ path = traffic_data["path"]
+ if len(traffic_data["local_file"]) > 0:
+ local_file = traffic_data["local_file"]
+ if len(traffic_data["remote_file"]) > 0:
+ remote_file = traffic_data["remote_file"]
+ ftp = self.ftp_connect(ftp_server, user, password)
+ if action == "list":
+ self.list_directory(ftp, path)
+ elif action == "upload":
+ self.upload_file(ftp, local_file, remote_file)
+ elif action == "download":
+ self.download_file(ftp, remote_file, local_file)
+ result += "FTP traffic sent successfully"
+ except Exception as e:
+ result += f"Exception: {e}"
+ finally:
+ ftp.quit()
return result
def ftp_connect(self, host, user, password):
diff --git a/http_player.py b/http_player.py
index 270ee02..73ab889 100644
--- a/http_player.py
+++ b/http_player.py
@@ -6,17 +6,17 @@ import requests
class HttpPlayer:
def send_http_traffic(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send http traffic for effect verification.", flush=True)
- p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
-
- # action = traffic_data["action"]
- # url = traffic_data["url"]
- # if action == "get":
- # result = self.send_get_request(url)
- # elif action == "post":
- # result = self.send_post_request(url)
-
+ if "command" in traffic_data["traffic"]:
+ p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ output, error = p.communicate()
+ result = output + error
+ elif "command" not in traffic_data["traffic"]:
+ action = traffic_data["action"]
+ url = traffic_data["url"]
+ if action == "get":
+ result = self.send_get_request(url)
+ elif action == "post":
+ result = self.send_post_request(url)
return result
def send_get_request(self, url):
diff --git a/mail_player.py b/mail_player.py
index 13d9e5e..0e2a4e7 100644
--- a/mail_player.py
+++ b/mail_player.py
@@ -11,47 +11,47 @@ from email import encoders
class MailPlayer:
def send_mail_traffic(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send mail traffic for effect verification.", flush=True)
- p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
-
- # mail_type = traffic_data["mail_type"]
- # mail_port = traffic_data["mail_port"]
- # sender = traffic_data["sender"]
- # password = traffic_data["password"]
- # receiver = traffic_data["receiver"]
- # subject = traffic_data["subject"]
- # body = traffic_data["body"]
- # attach = traffic_data["attach"]
- # msg = MIMEMultipart()
- # msg['From'] = Header(sender, 'utf-8')
- # msg['To'] = Header(receiver, 'utf-8')
- # msg['Subject'] = Header(subject, 'utf-8')
- # msg['Date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- # # 邮件正文内容,MIMEText是纯文本
- # msg.attach(MIMEText(body, 'plain', 'utf-8'))
- # # 附件
- # with open(attach, "rb") as attachment:
- # part = MIMEBase("application", "octet-stream")
- # part.set_payload((attachment).read())
- # encoders.encode_base64(part)
- # part.add_header("Content-Disposition", f"attachment; filename= {attach}")
- # msg.attach(part)
- # try:
- # if mail_type == "gmail":
- # smtp_obj = smtplib.SMTP_SSL("smtp.gmail.com", mail_port)
- # else:
- # smtp_obj = smtplib.SMTP_SSL("192.168.40.206", mail_port)
- # # 如果服务器不支持STARTTLS,需注释掉下面这行
- # # smtp_obj.starttls()
- # smtp_obj.login(sender, password)
- # smtp_obj.sendmail(sender, receiver, msg.as_string())
- # return "Email sent successfully"
- # except smtplib.SMTPException as e:
- # return f"SMTPException: {e}"
- # finally:
- # smtp_obj.quit()
-
+ if "command" in traffic_data["traffic"]:
+ p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ output, error = p.communicate()
+ result = output + error
+ elif "command" not in traffic_data["traffic"]:
+ mail_type = traffic_data["mail_type"]
+ mail_port = traffic_data["mail_port"]
+ sender = traffic_data["sender"]
+ password = traffic_data["password"]
+ receiver = traffic_data["receiver"]
+ subject = traffic_data["subject"]
+ body = traffic_data["body"]
+ attach = traffic_data["attach"]
+ msg = MIMEMultipart()
+ msg['From'] = Header(sender, 'utf-8')
+ msg['To'] = Header(receiver, 'utf-8')
+ msg['Subject'] = Header(subject, 'utf-8')
+ msg['Date'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+ # 邮件正文内容,MIMEText是纯文本
+ msg.attach(MIMEText(body, 'plain', 'utf-8'))
+ # 附件
+ with open(attach, "rb") as attachment:
+ part = MIMEBase("application", "octet-stream")
+ part.set_payload((attachment).read())
+ encoders.encode_base64(part)
+ part.add_header("Content-Disposition", f"attachment; filename= {attach}")
+ msg.attach(part)
+ try:
+ if mail_type == "gmail":
+ smtp_obj = smtplib.SMTP_SSL("smtp.gmail.com", mail_port)
+ else:
+ smtp_obj = smtplib.SMTP_SSL("192.168.40.206", mail_port)
+ # 如果服务器不支持STARTTLS,需注释掉下面这行
+ # smtp_obj.starttls()
+ smtp_obj.login(sender, password)
+ smtp_obj.sendmail(sender, receiver, msg.as_string())
+ return "Email sent successfully"
+ except smtplib.SMTPException as e:
+ return f"SMTPException: {e}"
+ finally:
+ smtp_obj.quit()
return result
if __name__ == "__main__":
diff --git a/ssl_player.py b/ssl_player.py
index 139b57d..0048bd6 100644
--- a/ssl_player.py
+++ b/ssl_player.py
@@ -9,17 +9,17 @@ from scapy.layers.inet import TCP
class SSLPlayer:
def send_ssl_traffic(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ssl traffic for effect verification.", flush=True)
- p = subprocess.Popen(traffic_data, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
-
- # action = traffic_data["action"]
- # url = traffic_data["url"]
- # if action == "get":
- # result = self.send_get_request(url)
- # elif action == "post":
- # result = self.send_post_request(url)
-
+ if "command" in traffic_data["traffic"]:
+ p = subprocess.Popen(traffic_data["traffic"]["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ output, error = p.communicate()
+ result = output + error
+ elif "command" not in traffic_data["traffic"]:
+ action = traffic_data["action"]
+ url = traffic_data["url"]
+ if action == "get":
+ result = self.send_get_request(url)
+ elif action == "post":
+ result = self.send_post_request(url)
return result
def send_get_request(self, url):