summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzhaokun <[email protected]>2024-09-03 14:39:45 +0800
committerzhaokun <[email protected]>2024-09-03 14:39:45 +0800
commit57edbf5b10a1a3d9565ebff66de198d1660bfa02 (patch)
tree4b2d7dff1e798fd4b230089c38764c1eb2758e87
parent29691d17ad0fb2fd2861dc339288b33763d590b3 (diff)
add flag for curl
-rw-r--r--app.py16
-rw-r--r--ssl_player.py20
2 files changed, 29 insertions, 7 deletions
diff --git a/app.py b/app.py
index 8b1ab49..e0a67d8 100644
--- a/app.py
+++ b/app.py
@@ -26,39 +26,49 @@ def run_traffic_env():
type = json_data.get("type")
protocol = json_data.get("protocol")
temp_data = json_data
+ flag = False
if protocol == "http":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate http traffic.", flush=True)
+ flag = True
player = HttpPlayer()
result = player.send_http_traffic(temp_data, "", "")
elif protocol == "https" or protocol == "ssl":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate ssl traffic.", flush=True)
+ flag = True
player = SSLPlayer()
result = player.send_ssl_traffic(temp_data, "", "")
elif protocol == "ftp":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate ftp traffic.", flush=True)
+ flag = True
player = FtpPlayer()
result = player.send_ftp_traffic(temp_data, "", "")
elif protocol == "dns":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate dns traffic.", flush=True)
+ flag = True
player = DnsPlayer()
result = player.send_dns_query(temp_data, "", "")
elif protocol == "mail":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate mail traffic.", flush=True)
+ flag = True
player = MailPlayer()
result = player.send_mail_traffic(temp_data, "", "")
elif protocol == "doh":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate doh traffic.", flush=True)
+ flag = True
player = DohPlayer()
result = player.send_doh_query(temp_data, "", "")
elif protocol == "dtls":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate dtls traffic by client.", flush=True)
+ flag = True
player = DtlsPlayer()
result = player.send_dtls_traffic(temp_data, "", "")
elif protocol == "quic":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate quic traffic by client.", flush=True)
+ flag = True
result = quic_player.asyncio.run(quic_player.send_quic_traffic(temp_data, "", ""))
elif protocol == "mpls":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate mpls traffic by tcpreplay.", flush=True)
+ flag = True
if type == "one_label":
player = MplsOneLabelPlayer()
result = player.send_mpls_one_label_traffic(temp_data, "", "")
@@ -66,6 +76,7 @@ def run_traffic_env():
print("to be added")
elif protocol == "gre":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate gre traffic by tcpreplay.", flush=True)
+ flag = True
if type == "gre_over_gre":
player = GreOverGrePlayer()
result = player.send_gre_over_gre_traffic(temp_data, "", "")
@@ -73,13 +84,16 @@ def run_traffic_env():
print("to be added")
elif protocol == "unordered_tcp":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate unordered tcp by tcpreplay.", flush=True)
+ flag = True
player = UnorderedTcpPlayer()
result = player.send_unordered_tcp_traffic(temp_data, "", "")
elif protocol == "vxlan":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate vxlan traffic by tcpreplay.", flush=True)
+ flag = True
print("to be added")
elif protocol == "":
print("Execute {} operation.".format(type))
+ flag = True
pcap_name = json_data.get("pcap_name") + ".pcap"
yaml_name = json_data.get("yaml_name") + ".yaml"
m = json_data.get("m")
@@ -94,7 +108,7 @@ def run_traffic_env():
}
tr = trex_replay.TrafficReplay(path_dict)
result = tr.trex_playback(yaml_name, pcap_name, m, d, clients_start_ip, clients_end_ip, servers_start_ip, servers_end_ip)
- if type == "curl" or type == "wget" or type == "nslookup":
+ if flag == False and (type == "curl" or type == "wget" or type == "nslookup"):
print("Execute {} operation.".format(type))
command = json_data.get("command")
p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
diff --git a/ssl_player.py b/ssl_player.py
index 82c868d..9558927 100644
--- a/ssl_player.py
+++ b/ssl_player.py
@@ -10,9 +10,10 @@ class SSLPlayer:
def send_ssl_traffic(self, traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ssl traffic for effect verification.", flush=True)
if "command" in traffic_data:
- p = subprocess.Popen(traffic_data["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
- output, error = p.communicate()
- result = output + error
+ # p = subprocess.Popen(traffic_data["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
+ # output, error = p.communicate()
+ # result = output + error
+ result = ""
elif "command" not in traffic_data:
action = traffic_data["action"]
url = traffic_data["url"]
@@ -66,11 +67,18 @@ if __name__ == "__main__":
test_data = {
"traffic": {
"protocol": "ssl",
- "type": "client", # client/curl
- "action": "post", # get/post
- "url": "https://www.google.com"
+ "type": "curl",
+ "command": "curl --connect-timeout 30 -m 60 -H \"Content-Type:application/json;charset=UTF-8\" -X POST -d \"{\\\"requestbody\\\":\\\"test_request_body\\\",\\\"setcook\\\":\\\"test_setcook\\\",\\\"contenttype\\\": \\\"test_cont\\\",\\\"responsebody\\\": \\\"test_resbody\\\"}\" -kv --user-agent \"Wget (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36\" https://192.168.40.206:1443/go"
}
}
+ # test_data = {
+ # "traffic": {
+ # "protocol": "ssl",
+ # "type": "client", # client/curl
+ # "action": "post", # get/post
+ # "url": "https://www.google.com"
+ # }
+ # }
traffic_data = test_data["traffic"]
ssl_player = SSLPlayer()
ssl_player.send_ssl_traffic(traffic_data, "ui", False)