1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
# -*- coding: utf-8 -*-
import asyncio
import aioquic.asyncio
from datetime import datetime
from aioquic.quic.configuration import QuicConfiguration
from aioquic.asyncio.protocol import QuicConnectionProtocol
async def send_quic_traffic(traffic_data, script_type, debug_flag):
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "Begin to send quic traffic for effect verification.", flush=True)
configuration = QuicConfiguration(is_client=True)
# protocol = QuicConnectionProtocol("quic")
try:
async with aioquic.asyncio.connect(
host=traffic_data["host"],
port=traffic_data["port"],
configuration=configuration,
# create_protocol=protocol
) as quic_instance:
await quic_instance.connect(host=traffic_data["host"], port=traffic_data["port"])
stream = quic_instance.create_stream(False)
writer = asyncio.StreamWriter(stream)
writer.write("headers")
await writer.drain()
# await asyncio.sleep(10)
except Exception as e:
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], f"Error: {e}.", flush=True)
return f"Error: No handshake operation.."
if __name__ == "__main__":
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), str(datetime.now().microsecond)[:3], "test")
test_data = {
"traffic": {
"protocol": "quic",
"type": "",
"host": "www.youtube.com",
"port": 443
}
}
traffic_data = test_data["traffic"]
# quic_player = QuicPlayer()
asyncio.run(send_quic_traffic(traffic_data, "ui", False))
|