diff options
| author | zhaokun <[email protected]> | 2024-09-03 14:39:45 +0800 |
|---|---|---|
| committer | zhaokun <[email protected]> | 2024-09-03 14:39:45 +0800 |
| commit | 57edbf5b10a1a3d9565ebff66de198d1660bfa02 (patch) | |
| tree | 4b2d7dff1e798fd4b230089c38764c1eb2758e87 | |
| parent | 29691d17ad0fb2fd2861dc339288b33763d590b3 (diff) | |
add flag for curl
| -rw-r--r-- | app.py | 16 | ||||
| -rw-r--r-- | ssl_player.py | 20 |
2 files changed, 29 insertions, 7 deletions
@@ -26,39 +26,49 @@ def run_traffic_env(): type = json_data.get("type") protocol = json_data.get("protocol") temp_data = json_data + flag = False if protocol == "http": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate http traffic.", flush=True) + flag = True player = HttpPlayer() result = player.send_http_traffic(temp_data, "", "") elif protocol == "https" or protocol == "ssl": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate ssl traffic.", flush=True) + flag = True player = SSLPlayer() result = player.send_ssl_traffic(temp_data, "", "") elif protocol == "ftp": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate ftp traffic.", flush=True) + flag = True player = FtpPlayer() result = player.send_ftp_traffic(temp_data, "", "") elif protocol == "dns": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate dns traffic.", flush=True) + flag = True player = DnsPlayer() result = player.send_dns_query(temp_data, "", "") elif protocol == "mail": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate mail traffic.", flush=True) + flag = True player = MailPlayer() result = player.send_mail_traffic(temp_data, "", "") elif protocol == "doh": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate doh traffic.", flush=True) + flag = True player = DohPlayer() result = player.send_doh_query(temp_data, "", "") elif protocol == "dtls": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate dtls traffic by client.", flush=True) + flag = True player = DtlsPlayer() result = player.send_dtls_traffic(temp_data, "", "") elif protocol == "quic": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate quic traffic by client.", flush=True) + flag = True result = quic_player.asyncio.run(quic_player.send_quic_traffic(temp_data, "", "")) elif protocol == "mpls": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate mpls traffic by tcpreplay.", flush=True) + flag = True if type == "one_label": player = MplsOneLabelPlayer() result = player.send_mpls_one_label_traffic(temp_data, "", "") @@ -66,6 +76,7 @@ def run_traffic_env(): print("to be added") elif protocol == "gre": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate gre traffic by tcpreplay.", flush=True) + flag = True if type == "gre_over_gre": player = GreOverGrePlayer() result = player.send_gre_over_gre_traffic(temp_data, "", "") @@ -73,13 +84,16 @@ def run_traffic_env(): print("to be added") elif protocol == "unordered_tcp": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate unordered tcp by tcpreplay.", flush=True) + flag = True player = UnorderedTcpPlayer() result = player.send_unordered_tcp_traffic(temp_data, "", "") elif protocol == "vxlan": print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Simulate vxlan traffic by tcpreplay.", flush=True) + flag = True print("to be added") elif protocol == "": print("Execute {} operation.".format(type)) + flag = True pcap_name = json_data.get("pcap_name") + ".pcap" yaml_name = json_data.get("yaml_name") + ".yaml" m = json_data.get("m") @@ -94,7 +108,7 @@ def run_traffic_env(): } tr = trex_replay.TrafficReplay(path_dict) result = tr.trex_playback(yaml_name, pcap_name, m, d, clients_start_ip, clients_end_ip, servers_start_ip, servers_end_ip) - if type == "curl" or type == "wget" or type == "nslookup": + if flag == False and (type == "curl" or type == "wget" or type == "nslookup"): print("Execute {} operation.".format(type)) command = json_data.get("command") p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") diff --git a/ssl_player.py b/ssl_player.py index 82c868d..9558927 100644 --- a/ssl_player.py +++ b/ssl_player.py @@ -10,9 +10,10 @@ class SSLPlayer: def send_ssl_traffic(self, traffic_data, script_type, debug_flag): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send ssl traffic for effect verification.", flush=True) if "command" in traffic_data: - p = subprocess.Popen(traffic_data["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") - output, error = p.communicate() - result = output + error + # p = subprocess.Popen(traffic_data["command"], shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8") + # output, error = p.communicate() + # result = output + error + result = "" elif "command" not in traffic_data: action = traffic_data["action"] url = traffic_data["url"] @@ -66,11 +67,18 @@ if __name__ == "__main__": test_data = { "traffic": { "protocol": "ssl", - "type": "client", # client/curl - "action": "post", # get/post - "url": "https://www.google.com" + "type": "curl", + "command": "curl --connect-timeout 30 -m 60 -H \"Content-Type:application/json;charset=UTF-8\" -X POST -d \"{\\\"requestbody\\\":\\\"test_request_body\\\",\\\"setcook\\\":\\\"test_setcook\\\",\\\"contenttype\\\": \\\"test_cont\\\",\\\"responsebody\\\": \\\"test_resbody\\\"}\" -kv --user-agent \"Wget (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.70 Safari/537.36\" https://192.168.40.206:1443/go" } } + # test_data = { + # "traffic": { + # "protocol": "ssl", + # "type": "client", # client/curl + # "action": "post", # get/post + # "url": "https://www.google.com" + # } + # } traffic_data = test_data["traffic"] ssl_player = SSLPlayer() ssl_player.send_ssl_traffic(traffic_data, "ui", False) |
