| author | nanwct <[email protected]> | 2022-05-19 14:55:02 +0800 |
|---|---|---|
| committer | nanwct <[email protected]> | 2022-05-19 14:55:02 +0800 |
| commit | 8533fa17cfe826361abfc59ba2bf513091f10637 (patch) | |
| tree | cfef7ec7aef6b1bd0b5ca48d00910d20dfc289e3 /main.py | |
abc
Diffstat (limited to 'main.py')
| -rw-r--r-- | main.py | 551 |
1 files changed, 551 insertions, 0 deletions
@@ -0,0 +1,551 @@
+from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert
+from dns.message import from_wire
+import dns
+from dns.name import BadLabelType
+from dns.exception import FormError
+import time
+import configparser
+import pandas as pd
+from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer
+from hyper import HTTP20Connection, HTTP11Connection
+from hyper.http20.exceptions import ConnectionError, StreamResetError
+import socket, h2.exceptions, ssl
+from util.concurrent.futures import ThreadPoolExecutor
+import threading
+from util.thread_timeout import time_limited
+import os
+import json
+from hyper.http20.response import HTTPHeaderMap
+from hyper.http11.parser import ParseError
+import zlib
+import sys
+import csv
+from _csv import writer
+
+# Error messages matched in except blocks; these errors need special handling
+NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found"
+CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]"
+
+# HTTP connection states
+UN_CONNECTED = 0
+CONNECTED_HTTP1_1 = 1
+CONNECTED_HTTP2_0 = 2
+
+# Base timeout (seconds)
+TIMEOUT = 2
+
+
+def write_data(data: dict, kafka_topic: str = ""):
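+    """Write one probe record to the CSV file or send it to the given Kafka topic, guarded by the global lock."""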
+    lock.acquire()
+    if write_model == "csv":
+        data["status"] = kafka_topic
+        writer.writerow([data.__str__()])
+    else:
+        try:
+            writer.send(kafka_topic, data)
+        except Exception as e:
+            ...
+    lock.release()
+
+
+def make_doh_request_body(q_name="baidu.com"):
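+    """Build a DNS query for q_name and return it in wire format, used as the body of DoH POST requests."""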
+    dns_query = build_dns_query(q_name)
+    dns_query_wire_format = dns_query.to_wire()
+    return dns_query_wire_format
+
+
+def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0):
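+    """Build DoH request headers; when a hostname is given, set ":authority" (HTTP/2) or "host" (HTTP/1.1)."""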
+    headers = {
+        "content-type": "application/dns-message",
+        "accept": "application/dns-message",
+        "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36 '
+    }
+    if hostname:
+        if conn_status == CONNECTED_HTTP2_0:
+            headers[":authority"] = hostname
+        elif conn_status == CONNECTED_HTTP1_1:
+            headers["host"] = hostname
+        else:
+            ...
+    return headers
+
+
+def decode_header(header: HTTPHeaderMap):
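+    """Convert an HTTPHeaderMap of bytes into a plain dict of strings, joining repeated header values with ";"."""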
+    # print(header.__str__())
+    ret = {}
+    for key, value in header.items():
+        key = key.decode()
+        try:
+            value = value.decode()
+        except UnicodeDecodeError:
+            key = "UnicodeDecodeError"
+        if key not in ret.keys():  # join multiple values that share the same key
+            ret[key] = value
+        else:
+            ret[key] += (";" + value)
+    return ret
+
+
+def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods):
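+    """Send DoH requests for every host/path/method combination over an HTTP/2 connection, then read each response and record it as a success or failure."""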
+    stream_ids = {}
+    for hostname in host_group:
+        headers = make_doh_request_header(hostname)
+        for path in path_group:
+            for method in methods:
+                if len(stream_ids.keys()) >= 100:  # limit each IP/port to 100 probes
+                    continue
+                if method == "GET":
+                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
+                    try:
+                        stream_id = conn.request(method=method, url=full_path, headers=headers)
+                    except Exception as e:
+                        continue
+                else:
+                    try:
+                        stream_id = conn.request(method=method, url=path, body=body, headers=headers)
+                    except Exception as e:
+                        continue
+                stream_ids[stream_id] = (hostname, path, method)
+
+    for stream_id, conn_info in stream_ids.items():
+        hostname, path, method = conn_info
+        try:
+            rep = conn.get_response(stream_id)
+        except Exception as e:
+            # produce failInfo to kafka
+            data = {
+                "ip": conn.host,
+                "port": conn.port,
+                "host": hostname,
+                "path": path,
+                "method": method,
+                "connect_type": CONNECTED_HTTP2_0,
+                "info": e.__str__(),
+            }
+            write_data(data, kafka_topic_fail)
+            continue
+        rep_code = rep.status
+        rep_header = decode_header(rep.headers)
+        try:
+            rep_body = rep.read()
+        except Exception as e:
+            # produce failInfo to kafka
+            data = {
+                "ip": conn.host,
+                "port": conn.port,
+                "host": hostname,
+                "path": path,
+                "method": method,
+                "connect_type": CONNECTED_HTTP2_0,
+                "status_code": rep_code,
+                "rep_header": rep_header,
+                "info": e.__str__()
+            }
+            write_data(data, kafka_topic_fail)
+            return
+        # print(rep_body)
+        try:
+            rep_body = from_wire(rep_body)
+            if rep_body:
+                # produce sucInfo to kafka
+                data = {
+                    "ip": conn.host,
+                    "port": conn.port,
+                    "host": hostname,
+                    "path": path,
+                    "method": method,
+                    "connect_type": CONNECTED_HTTP2_0,
+                    "status_code": rep_code,
+                    "rep_header": rep_header,
+                    "rep_body": rep_body.__str__()
+                }
+                # print(rep_body.__str__())
+                write_data(data, kafka_topic_suc)
+            else:
+                # produce failInfo to kafka
+                data = {
+                    "ip": conn.host,
+                    "port": conn.port,
+                    "host": hostname,
+                    "path": path,
+                    "method": method,
+                    "connect_type": CONNECTED_HTTP2_0,
+                    "status_code": rep_code,
+                    "rep_header": rep_header,
+                    "info": rep_body.__str__()
+                }
+                write_data(data, kafka_topic_fail)
+        except Exception:
+            # produce failInfo to kafka
+            data = {
+                "ip": conn.host,
+                "port": conn.port,
+                "host": hostname,
+                "path": path,
+                "method": method,
+                "connect_type": CONNECTED_HTTP2_0,
+                "status_code": rep_code,
+                "rep_header": rep_header,
+                "info": rep_body.__str__()
+            }
+            write_data(data, kafka_topic_fail)
+
+
+def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods):
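+    """Probe every host/path/method combination over an HTTP/1.1 connection, one request at a time, recording each response as a success or failure."""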
+    # start_time = time.time()
+    for hostname in host_group:
+        headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1)
+        for path in path_group:
+            for method in methods:
+                if method == "GET":
+                    full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
+                    try:
+                        conn.request(method=method, url=full_path, headers=headers)
+                    except Exception as e:
+                        # produce failInfo to kafka
+                        data = {
+                            "ip": conn.host,
+                            "port": conn.port,
+                            "host": hostname,
+                            "path": path,
+                            "method": method,
+                            "connect_type": CONNECTED_HTTP1_1,
+                            "info": e.__str__(),
+                        }
+                        write_data(data, kafka_topic_fail)
+                        # print("GET request failed, error:", e)
+                        continue  # move straight on to the next probe
+                else:
+                    try:
+                        conn.request(method=method, url=path, body=body, headers=headers)
+                    except Exception as e:
+                        # produce failInfo to kafka
+                        data = {
+                            "ip": conn.host,
+                            "port": conn.port,
+                            "host": hostname,
+                            "path": path,
+                            "method": method,
+                            "connect_type": CONNECTED_HTTP1_1,
+                            "info": e.__str__(),
+                        }
+                        write_data(data, kafka_topic_fail)
+                        # print("POST request failed, error:", e)
+                        continue
+
+                try:
+                    rep = conn.get_response()
+                except Exception as e:
+                    # produce failInfo to kafka
+                    data = {
+                        "ip": conn.host,
+                        "port": conn.port,
+                        "host": hostname,
+                        "path": path,
+                        "method": method,
+                        "connect_type": CONNECTED_HTTP1_1,
+                        "info": e.__str__(),
+                    }
+                    write_data(data, kafka_topic_fail)
+                    # print("get_response failed, error:", e)
+                    continue
+                rep_code = rep.status
+                rep_header = decode_header(rep.headers)
+                try:
+                    rep_body = rep.read()
+                except Exception as e:
+                    # produce failInfo to kafka
+                    data = {
+                        "ip": conn.host,
+                        "port": conn.port,
+                        "host": hostname,
+                        "path": path,
+                        "method": method,
+                        "connect_type": CONNECTED_HTTP1_1,
+                        "status_code": rep_code,
+                        "rep_header": rep_header,
+                        "rep_body": e.__str__()
+                    }
+                    write_data(data, kafka_topic_fail)
+                    # print("read failed, error:", e)
+                    continue
+                # print(rep_body)
+                try:
+                    rep_body = from_wire(rep_body)
+                    # # print(hostname, path, method)
+                    # # print(rep_body)
+                    if rep_body:
+                        # produce sucInfo to kafka
+                        data = {
+                            "ip": conn.host,
+                            "port": conn.port,
+                            "host": hostname,
+                            "path": path,
+                            "method": method,
+                            "connect_type": CONNECTED_HTTP1_1,
+                            "status_code": rep_code,
+                            "rep_header": rep_header,
+                            "rep_body": rep_body.__str__()
+                        }
+                        # print(rep_body.__str__())
+                        write_data(data, kafka_topic_suc)
+                    else:
+                        # produce failInfo to kafka
+                        data = {
+                            "ip": conn.host,
+                            "port": conn.port,
+                            "host": hostname,
+                            "path": path,
+                            "method": method,
+                            "connect_type": CONNECTED_HTTP1_1,
+                            "status_code": rep_code,
+                            "rep_header": rep_header,
+                            "info": None
+                        }
+                        write_data(data, kafka_topic_fail)
+                except Exception as e:
+                    # produce failInfo to kafka
+                    data = {
+                        "ip": conn.host,
+                        "port": conn.port,
+                        "host": hostname,
+                        "path": path,
+                        "method": method,
+                        "connect_type": CONNECTED_HTTP1_1,
+                        "status_code": rep_code,
+                        "rep_header": rep_header,
+                        "info": e.__str__()
+                    }
+                    write_data(data, kafka_topic_fail)
+                    # print("from_wire failed, error:", e)
+
+
+@time_limited(TIMEOUT * 10)
+def doh_probe(ip, port, host_group=None, path_group=None):
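+    """Probe a single ip:port for DoH support: try HTTP/2 first, fall back to HTTP/1.1 (optionally without
+    certificate verification), record the peer certificate, then probe all host/path/method combinations."""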
+    verify_mode = ssl.CERT_OPTIONAL
+    ctx_2_0 = make_ssl_context()
+    ctx_2_0.set_alpn_protocols(['h2'])
+    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+    conn_status = UN_CONNECTED
+    try:
+        conn.connect()
+        conn_status = CONNECTED_HTTP2_0
+        # print("HTTP/2 connection established!")
+    except Exception as e:
+        if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):  # expected error: the server does not support HTTP/2, try HTTP/1.1 next
+            # print("Connection failed! Server does not support HTTP/2, trying HTTP/1.1")
+            ...
+        elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):  # certificate error: retry with verify_mode set to CERT_NONE
+            # print("Connection failed because certificate verification failed, setting the ssl context to CERT_NONE")
+            ctx_2_0.verify_mode = ssl.CERT_NONE
+            verify_mode = ssl.CERT_NONE
+            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+            try:
+                conn.connect()
+                conn_status = CONNECTED_HTTP2_0
+                # print("HTTP/2 connection established!")
+            except Exception as e:
+                # print("Connection failed! Server does not support HTTP/2, trying HTTP/1.1")
+                ...
+        else:
+            # print("Connection failed! Error:", e)
+            # produce failInfo to kafka
+            data = {
+                "ip": ip,
+                "port": port,
+                "connect_type": UN_CONNECTED,
+                "info": e.__str__(),
+            }
+            write_data(data, kafka_topic_fail)
+            return
+
+    if conn_status == UN_CONNECTED:
+        ctx_1_1 = make_ssl_context()
+        ctx_1_1.verify_mode = verify_mode
+        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
+        try:
+            conn.connect()
+            conn_status = CONNECTED_HTTP1_1
+            # print("HTTP/1.1 connection established!")
+        except Exception as e:  # give up if the HTTP/1.1 connection also fails
+            # print("Connection failed! Error:", e)
+            # produce failInfo to kafka
+            data = {
+                "ip": ip,
+                "port": port,
+                "connect_type": UN_CONNECTED,
+                "info": e.__str__(),
+            }
+            write_data(data, kafka_topic_fail)
+            return
+
+    try:
+        cert = conn._sock.getpeercert()
+        # produce certInfo to kafka
+        data = {
+            "ip": ip,
+            "port": port,
+            "certificate": cert.__str__()
+        }
+        write_data(data, kafka_topic_cert)
+    except Exception:
+        cert = None
+
+    if host_group:
+        host_group += get_domain_from_cert(cert)
+    else:
+        host_group = get_domain_from_cert(cert)
+    host_group += [None, ip]  # host_group always contains at least None and the original IP address
+    host_group = list(set(host_group))  # deduplicate
+
+    default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter",
+                          "/doh/secure-filter", "/query", "/ads", "/uncensored", "/adblock"]
+    if path_group:
+        path_group += default_path_group
+        path_group = list(set(path_group))  # deduplicate
+    else:
+        path_group = default_path_group
+
+    methods = ["POST", "GET"]
+
+    if conn_status == CONNECTED_HTTP2_0:
+        probe_http2_0(conn, host_group, path_group, methods)
+    elif conn_status == CONNECTED_HTTP1_1:
+        probe_http1_1(conn, host_group, path_group, methods)
+    else:
+        return
+
+    try:
+        conn.close()
+    except Exception as e:
+        ...
+
+
+def save_status(count):
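+    """Persist the number of seeds processed so far to the status file."""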
+    data = {"count": count}
+    with open(save_status_file, "w") as fp:
+        json.dump(data, fp)
+
+
+def load_status():
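+    """Return the previously saved progress count, or 0 if no status file exists."""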
+    if not os.path.exists(save_status_file):
+        return 0
+    with open(save_status_file, "r") as fp:
+        a = json.load(fp)
+    return a["count"]
+
+
+def run_with_csv_model():
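+    """Read seeds from the CSV file in chunks and probe them with a thread pool, resuming from the saved progress."""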
+    chunks = pd.read_csv(seed_filename, iterator=True)
+    count = cpu_num * thread_num * union_detect_num
+    save_point = load_status()
+    count_scan = 0
+    while True:
+        print(1)
+        start_time = time.time()
+        data_group = []
+        try:
+            chunk_data = chunks.get_chunk(count)
+        except StopIteration:  # end of the seed file (trailing data, if any, is lost)
+            break
+        for data in chunk_data.itertuples():
+            if count_scan < save_point:  # this seed has already been probed
+                count_scan += 1
+                continue
+            ip = data.saddr
+            port = data.sport
+            data_group.append((ip, port))
+            count_scan += 1
+
+        with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool:
+            for data in data_group:
+                threadPool.submit(doh_probe, *data)
+
+        if count_scan > save_point:
+            save_status(count_scan)
+
+        if len(data_group) != 0:
+            end_time = time.time()
+            print(time.strftime('%y%m%d%H', time.localtime(time.time())))
+            print("CPU cores:", cpu_num, end=" ")
+            print("Threads per core:", thread_num, end=" ")
+            print("Probes completed this round:", len(data_group), end=" ")
+            print("Total time (s):", end_time - start_time, end=" ")
+            print("Average probes per second:", len(data_group) / (end_time - start_time))
+
+
+def run_with_kafka_model():
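+    """Consume seeds from Kafka and probe them with a thread pool, printing throughput every print_num probes."""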
+    consumer = make_seed_consumer()
+    print_num = 10000  # print progress after every print_num IPs scanned
+    max_queue_size = thread_num * 10  # the task queue is 10x the number of worker threads
+    with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool:
+        count = 0
+        start_time = time.time()
+        for message in consumer:
+            data = message.value
+            ip = data["ip"]
+            port = int(data["port"])
+            threadPool.submit(doh_probe, ip, port)
+            # print("Consumed seed:", ip, port)
+
+            count += 1
+            if count % print_num == 0:
+                end_time = time.time()
+                print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ")
+                print("CPU cores:", cpu_num, end=" ")
+                print("Threads per core:", thread_num, end=" ")
+                print("Probes completed this round:", print_num, end=" ")
+                print("Total time (s):", end_time - start_time, end=" ")
+                print("Average probes per second:", print_num / (end_time - start_time))
+
+
+if __name__ == '__main__':
+    # Root directory, to make the program easy to relocate
+    root_dir = sys.path[0]
+
+    # Read the configuration file
+    config = configparser.ConfigParser()
+    config_file = os.path.join(root_dir, "config.ini")
+    config.read(config_file)
+
+    cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1)  # CPU usage is modest; multiprocessing is not used for now
+    thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32)
+    write_model = config.get('DetectConfigs', 'write_model')
+    if write_model == "csv":
+        save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename'))
+        writer = csv.writer(open(save_filename, "w", newline=""))
+    elif write_model == "kafka":
+        writer = make_kafka_producer()
+    else:
+        print("invalid write_model!")
+        sys.exit()
+
+    seed_read_model = config.get('DetectConfigs', 'seed_read_model')
+    if seed_read_model == "csv":
+        union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100)  # number of probes per thread per round
+        seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename'))
+    elif seed_read_model == "kafka":
+        union_detect_num = None
+        seed_filename = None
+    else:
+        print("invalid seed_read_model!")
+        sys.exit()
+
+    # Status file recording probe progress, so a crash does not cause seeds to be probed twice
+    save_status_path = os.path.join(root_dir, "./status")
+    save_status_file = os.path.join(save_status_path, "status.json")
+    if not os.path.exists(save_status_path):
+        os.mkdir(save_status_path)
+
+    lock = threading.Lock()
+    body = make_doh_request_body()
+
+    # producers = [None] * thread_num
+    print("Starting probes:")
+    print("-----", "processes:", cpu_num)
+    print("-----", "threads:", thread_num)
+    if seed_read_model == "csv":
+        print("-----", "probes per thread:", union_detect_num)
+        run_with_csv_model()
+    elif seed_read_model == "kafka":
+        run_with_kafka_model()