1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
# -*- coding: utf-8 -*-
import subprocess
from datetime import datetime
import requests
# import http.client
from scapy.all import *
from scapy.layers.inet import TCP
class SSLPlayer:
    """Generate SSL/HTTPS traffic for effect verification.

    Traffic is produced either by running a shell command supplied by the
    caller or by issuing an HTTP GET/POST request with ``requests``.
    """

    # Seconds to wait on the server before giving up. Without a timeout,
    # ``requests`` may block indefinitely on an unresponsive peer.
    DEFAULT_TIMEOUT = 30

    def send_ssl_traffic(self, traffic_data):
        """Send traffic described by *traffic_data* and return its output.

        Args:
            traffic_data: Mapping that contains either a ``"command"`` key
                (a shell command line to execute) or both ``"action"``
                (``"get"`` or ``"post"``) and ``"url"`` keys.

        Returns:
            For a command: its captured stdout followed by its stderr.
            For a request: the response body, or a
            ``"RequestException: ..."`` string if the request failed.

        Raises:
            KeyError: If neither ``"command"`` nor both ``"action"`` and
                ``"url"`` are present.
            ValueError: If ``"action"`` is neither ``"get"`` nor ``"post"``.
        """
        # Read the clock once so the date and millisecond parts agree, and
        # zero-pad the milliseconds (5 ms must print as "005", not "500").
        now = datetime.now()
        print(
            now.strftime("%Y-%m-%d %H:%M:%S"),
            f"{now.microsecond // 1000:03d}",
            "Begin to send ssl traffic for effect verification.",
            flush=True,
        )

        if "command" in traffic_data:
            # NOTE(review): shell=True executes the string through the shell;
            # only pass commands that come from trusted configuration.
            completed = subprocess.run(
                traffic_data["command"],
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding="utf-8",
                # Tools that emit non-UTF-8 bytes should not crash the player.
                errors="replace",
            )
            return completed.stdout + completed.stderr

        action = traffic_data["action"]
        url = traffic_data["url"]
        if action == "get":
            return self.send_get_request(url)
        if action == "post":
            return self.send_post_request(url)
        # Fail with a clear message instead of an unbound-variable error.
        raise ValueError(
            f"Unsupported action {action!r}; expected 'get' or 'post'."
        )

    def send_get_request(self, url, timeout=DEFAULT_TIMEOUT):
        """Issue an HTTP GET to *url* and return the response body.

        Args:
            url: Target URL.
            timeout: Seconds to wait for the server before giving up.

        Returns:
            The response text on success, otherwise a
            ``"RequestException: ..."`` string describing the failure
            (including HTTP error statuses and timeouts).
        """
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            return f"RequestException: {e}"

    def send_post_request(self, url, data=None, json=None,
                          timeout=DEFAULT_TIMEOUT):
        """Issue an HTTP POST to *url* and return the response body.

        Args:
            url: Target URL.
            data: Form data to send; used when *json* is ``None``.
            json: JSON-serialisable payload; takes precedence over *data*
                whenever it is not ``None`` (so ``{}`` is sent as JSON).
            timeout: Seconds to wait for the server before giving up.

        Returns:
            The response text on success, otherwise a
            ``"RequestException: ..."`` string describing the failure.
        """
        try:
            # Send either data (form data) or json (JSON data) as needed.
            if json is not None:
                response = requests.post(url, json=json, timeout=timeout)
            else:
                response = requests.post(url, data=data, timeout=timeout)
            response.raise_for_status()
            return response.text
        except requests.RequestException as e:
            return f"RequestException: {e}"

    def get_http_method(self, pcap_file="d:/zk.pcap"):
        """Collect HTTP methods found in the packets of a capture file.

        Args:
            pcap_file: Path of the capture file to read with ``rdpcap``.

        Returns:
            The list of values appended while walking the capture.

        NOTE(review): the loop nesting below was reconstructed from a copy
        of the file without indentation, and the layer lookups by the names
        "handshake", "ClientHello" and "ApplicationData" have not been
        verified against scapy's TLS layers -- confirm before relying on it.
        """
        packets = rdpcap(pcap_file)
        http_methods = []
        for p in packets:
            if TCP not in p:
                continue
            for tls_record in p[TCP].payload:
                if "handshake" in tls_record.payload:
                    handshake = tls_record.payload["handshake"]
                    if "ClientHello" in handshake:
                        tls_app_data = p[TCP:].payload.payload
                        if tls_app_data:
                            ssl_record = tls_app_data[0]
                            if "ApplicationData" in ssl_record.payload:
                                http_payload = ssl_record.payload["ApplicationData"]
                                # NOTE(review): ``load`` is presumably raw
                                # bytes, which has no ``method`` attribute --
                                # verify this line actually works.
                                http_request = http_payload.load
                                http_methods.append(http_request.method)
        return http_methods
if __name__ == "__main__":
    # Manual smoke test: send one POST request through SSLPlayer.
    # Read the clock once and zero-pad the milliseconds so the stamp is
    # consistent (5 ms prints as "005").
    now = datetime.now()
    print(now.strftime("%Y-%m-%d %H:%M:%S"), f"{now.microsecond // 1000:03d}", "test")
    test_data = {
        "traffic": {
            "protocol": "ssl",
            "type": "client",  # client/curl
            "action": "post",  # get/post
            "url": "https://www.google.com",
        }
    }
    traffic_data = test_data["traffic"]
    ssl_player = SSLPlayer()
    # send_ssl_traffic() takes the traffic description as its only argument.
    ssl_player.send_ssl_traffic(traffic_data)
    # ssl_player.get_http_method()
|