| author | wujiating <[email protected]> | 2022-05-19 06:55:44 +0000 |
|---|---|---|
| committer | wujiating <[email protected]> | 2022-05-19 06:55:44 +0000 |
| commit | 2ad145fbbca39e7b4fd654722eabe4749d5960d0 (patch) | |
| tree | c52286eebb41fc5acda6a278bbab91fd01afbc3e | |
| parent | 0811da0d38513b1776948f6e209403fd57c7bdfd (diff) | |
| parent | 8533fa17cfe826361abfc59ba2bf513091f10637 (diff) | |
Merge branch 'master' into 'main'
abc
See merge request wujiating/diamondv!1
29 files changed, 2467 insertions, 0 deletions
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..eaf91e2
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# Default ignored files
+/shelf/
+/workspace.xml
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+<component name="InspectionProjectProfileManager">
+ <settings>
+ <option name="USE_PROJECT_PROFILE" value="false" />
+ <version value="1.0" />
+ </settings>
+</component>
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..abf7b39
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,4 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7" project-jdk-type="Python SDK" />
+</project>
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..ec86cd7
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+ <component name="ProjectModuleManager">
+ <modules>
+ <module fileurl="file://$PROJECT_DIR$/.idea/quickScan.iml" filepath="$PROJECT_DIR$/.idea/quickScan.iml" />
+ </modules>
+ </component>
+</project>
\ No newline at end of file
diff --git a/.idea/quickScan.iml b/.idea/quickScan.iml
new file mode 100644
index 0000000..d1bb9af
--- /dev/null
+++ b/.idea/quickScan.iml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<module type="PYTHON_MODULE" version="4">
+ <component name="NewModuleRootManager">
+ <content url="file://$MODULE_DIR$" />
+ <orderEntry type="jdk" jdkName="Python 3.7" jdkType="Python SDK" />
+ <orderEntry type="sourceFolder" forTests="false" />
+ </component>
+ <component name="PyDocumentationSettings">
+ <option name="format" value="PLAIN" />
+ <option name="myDocStringFormat" value="Plain" />
+ </component>
+ <component name="TestRunnerService">
+ <option name="PROJECT_TEST_RUNNER" value="pytest" />
+ </component>
+</module>
\ No newline at end of file
diff --git a/__pycache__/main.cpython-38.pyc b/__pycache__/main.cpython-38.pyc
Binary files differ
new file mode 100644
index 0000000..00ce3b9
--- /dev/null
+++ b/__pycache__/main.cpython-38.pyc
diff --git a/config.ini b/config.ini
new file mode 100644
index 0000000..019ddc2
--- /dev/null
+++ b/config.ini
@@ -0,0 +1,13 @@
+[DetectConfigs]
+cpu_num = 1
+thread_num = 32
+;csv or kafka
+seed_read_model = kafka
+;only needed when seed_read_model = csv
+union_detect_num = 100
+;only needed when seed_read_model = csv
+filename = ./data/443.csv
+;csv or kafka
+write_model = kafka
+;only needed when write_model = csv
+save_filename = ./result.csv
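For reference, main.py (added below) consumes these keys with configparser; in csv mode the seed file named by `filename` is read in chunks with pandas and is expected to expose `saddr` and `sport` columns (see run_with_csv_model). A minimal sketch of that read path, assuming a config.ini and seed CSV laid out as above:

```python
import configparser
import pandas as pd

config = configparser.ConfigParser()
config.read("config.ini")

thread_num = config.getint("DetectConfigs", "thread_num", fallback=32)
seed_read_model = config.get("DetectConfigs", "seed_read_model")

if seed_read_model == "csv":
    chunk_size = config.getint("DetectConfigs", "union_detect_num", fallback=100)
    chunks = pd.read_csv(config.get("DetectConfigs", "filename"), iterator=True)
    # each row supplies one (ip, port) probe target
    for row in chunks.get_chunk(chunk_size).itertuples():
        print(row.saddr, row.sport)
```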
diff --git a/main.py b/main.py
new file mode 100644
--- /dev/null
+++ b/main.py
@@ -0,0 +1,551 @@
+from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert
+from dns.message import from_wire
+import dns
+from dns.name import BadLabelType
+from dns.exception import FormError
+import time
+import configparser
+import pandas as pd
+from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer
+from hyper import HTTP20Connection, HTTP11Connection
+from hyper.http20.exceptions import ConnectionError, StreamResetError
+import socket, h2.exceptions, ssl
+from util.concurrent.futures import ThreadPoolExecutor
+import threading
+from util.thread_timeout import time_limited
+import os
+import json
+from hyper.http20.response import HTTPHeaderMap
+from hyper.http11.parser import ParseError
+import zlib
+import sys
+import csv
+from _csv import writer
+
+# Error messages to match in except blocks; these errors need special-case handling
+NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found"
+CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]"
+
+# HTTP connection types
+UN_CONNECTED = 0
+CONNECTED_HTTP1_1 = 1
+CONNECTED_HTTP2_0 = 2
+
+# Timeout base value
+TIMEOUT = 2
+
+
+def write_data(data: dict, kafka_topic: str = ""):
+ lock.acquire()
+ if write_model == "csv":
+ data["status"] = kafka_topic
+ writer.writerow([data.__str__()])
+ else:
+ try:
+ writer.send(kafka_topic, data)
+ except Exception as e:
+ ...
+ lock.release()
+
+
+def make_doh_request_body(q_name="baidu.com"):
+ dns_query = build_dns_query(q_name)
+ dns_query_wire_format = dns_query.to_wire()
+ return dns_query_wire_format
+
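The GET probes below reuse a fixed `dns=` value, which looks like the RFC 8484 GET encoding of this same baidu.com query (unpadded base64url of the DNS wire message). A minimal sketch of deriving such a value dynamically; it calls dnspython's make_query directly because the repo's build_dns_query helper is not shown in this diff, and a regenerated value will only match the hard-coded one if the DNS message ID is pinned:

```python
import base64
import dns.message

def make_doh_get_param(q_name="baidu.com"):
    # RFC 8484: GET requests carry the query as unpadded base64url in the dns= parameter
    query = dns.message.make_query(q_name, "A")
    query.id = 0  # a fixed ID keeps the encoded value stable (and cache-friendly)
    wire = query.to_wire()
    return base64.urlsafe_b64encode(wire).rstrip(b"=").decode()
```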
+
+def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0):
+ headers = {
+ "content-type": "application/dns-message",
+ "accept": "application/dns-message",
+ "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36 '
+ }
+ if hostname:
+ if conn_status == CONNECTED_HTTP2_0:
+ headers[":authority"] = hostname
+ elif conn_status == CONNECTED_HTTP1_1:
+ headers["host"] = hostname
+ else:
+ ...
+ return headers
+
+
+def decode_header(header: HTTPHeaderMap):
+ # print(header.__str__())
+ ret = {}
+ for key, value in header.items():
+ key = key.decode()
+ try:
+ value = value.decode()
+ except UnicodeDecodeError:
+ key = "UnicodeDecodeError"
+ if key not in ret.keys(): # handle a single key with multiple values
+ ret[key] = value
+ else:
+ ret[key] += (";" + value)
+ return ret
+
+
+def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods):
+ stream_ids = {}
+ for hostname in host_group:
+ headers = make_doh_request_header(hostname)
+ for path in path_group:
+ for method in methods:
+ if len(stream_ids.keys()) >= 100: # limit each IP/port pair to 100 probes
+ continue
+ if method == "GET":
+ full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
+ try:
+ stream_id = conn.request(method=method, url=full_path, headers=headers)
+ except Exception as e:
+ continue
+ else:
+ try:
+ stream_id = conn.request(method=method, url=path, body=body, headers=headers)
+ except Exception as e:
+ continue
+ stream_ids[stream_id] = (hostname, path, method)
+
+ for stream_id, conn_info in stream_ids.items():
+ hostname, path, method = conn_info
+ try:
+ rep = conn.get_response(stream_id)
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP2_0,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ continue
+ rep_code = rep.status
+ rep_header = decode_header(rep.headers)
+ try:
+ rep_body = rep.read()
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP2_0,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "info": e.__str__()
+ }
+ write_data(data, kafka_topic_fail)
+ return
+ # print(rep_body)
+ try:
+ rep_body = from_wire(rep_body)
+ if rep_body:
+ # produce sucInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP2_0,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "rep_body": rep_body.__str__()
+ }
+ # print(rep_body.__str__())
+ write_data(data, kafka_topic_suc)
+ else:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP2_0,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "info": rep_body.__str__()
+ }
+ write_data(data, kafka_topic_fail)
+ except Exception:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP2_0,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "info": rep_body.__str__()
+ }
+ write_data(data, kafka_topic_fail)
+
+
+def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods):
+ # start_time = time.time()
+ for hostname in host_group:
+ headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1)
+ for path in path_group:
+ for method in methods:
+ if method == "GET":
+ full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path
+ try:
+ conn.request(method=method, url=full_path, headers=headers)
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ # print("GET方法执行失败,错误信息如下:", e.__str__)
+ continue # 此处直接
+ else:
+ try:
+ conn.request(method=method, url=path, body=body, headers=headers)
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ # print("POST方法执行失败,错误信息如下:", e.__str__)
+ continue
+
+ try:
+ rep = conn.get_response()
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ # print("get_response方法执行失败,错误信息如下:", e)
+ continue
+ rep_code = rep.status
+ rep_header = decode_header(rep.headers)
+ try:
+ rep_body = rep.read()
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "rep_body": e.__str__()
+ }
+ write_data(data, kafka_topic_fail)
+ # print("read方法执行失败,错误信息如下:", e)
+ continue
+ # print(rep_body)
+ try:
+ rep_body = from_wire(rep_body)
+ # # print(hostname, path, method)
+ # # print(rep_body)
+ if rep_body:
+ # produce sucInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "rep_body": rep_body.__str__()
+ }
+ # print(rep_body.__str__())
+ write_data(data, kafka_topic_suc)
+ else:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "info": None
+ }
+ write_data(data, kafka_topic_fail)
+ except Exception as e:
+ # produce failInfo to kafka
+ data = {
+ "ip": conn.host,
+ "port": conn.port,
+ "host": hostname,
+ "path": path,
+ "method": method,
+ "connect_type": CONNECTED_HTTP1_1,
+ "status_code": rep_code,
+ "rep_header": rep_header,
+ "info": e.__str__()
+ }
+ write_data(data, kafka_topic_fail)
+ # print("from_wire方法执行失败,错误信息如下:", e.__str__)
+
+
+@time_limited(TIMEOUT * 10)
+def doh_probe(ip, port, host_group=None, path_group=None):
+ verify_mode = ssl.CERT_OPTIONAL
+ ctx_2_0 = make_ssl_context()
+ ctx_2_0.set_alpn_protocols(['h2'])
+ conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+ conn_status = UN_CONNECTED
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP2_0
+ # print("成功建立HTTP2.0连接!")
+ except Exception as e:
+ if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2): # expected error; fall back and try HTTP/1.1
+ # print("Connection failed: the server does not support HTTP/2.0, trying HTTP/1.1")
+ ...
+ elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED): # certificate error; set the sslContext to CERT_NONE
+ # print("Connection failed because of the certificate, setting sslContext to CERT_NONE")
+ ctx_2_0.verify_mode = ssl.CERT_NONE
+ verify_mode = ssl.CERT_NONE
+ conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP2_0
+ # print("成功建立HTTP2.0连接!")
+ except Exception as e:
+ # print("建立连接失败!服务端不支持HTTP2.0,继续尝试HTTP1.1")
+ ...
+ else:
+ # print("建立连接失败!错误信息如下:", e)
+ # produce failInfo to kafka
+ data = {
+ "ip": ip,
+ "port": port,
+ "connect_type": UN_CONNECTED,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ return
+
+ if conn_status == UN_CONNECTED:
+ ctx_1_1 = make_ssl_context()
+ ctx_1_1.verify_mode = verify_mode
+ conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP1_1
+ # print("成功建立HTTP1.1连接!")
+ except Exception as e: # 如使用HTTP1.1仍然无法建立连接,则放弃
+ # print("建立连接失败!错误信息如下:", e)
+ # produce failInfo to kafka
+ data = {
+ "ip": ip,
+ "port": port,
+ "connect_type": UN_CONNECTED,
+ "info": e.__str__(),
+ }
+ write_data(data, kafka_topic_fail)
+ return
+
+ try:
+ cert = conn._sock.getpeercert()
+ # produce certInfo to kafka
+ data = {
+ "ip": ip,
+ "port": port,
+ "certificate": cert.__str__()
+ }
+ write_data(data, kafka_topic_cert)
+ except Exception:
+ cert = None
+
+ if host_group:
+ host_group += get_domain_from_cert(cert)
+ else:
+ host_group = get_domain_from_cert(cert)
+ host_group += [None, ip] # host_group always contains at least None and the original IP address
+ host_group = list(set(host_group)) # deduplicate
+
+ default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter",
+ "/doh/secure-filter", "/query", "/ads", "/uncensored", "adblock"]
+ if path_group:
+ path_group += default_path_group
+ path_group = list(set(path_group)) # deduplicate
+ else:
+ path_group = default_path_group
+
+ methods = ["POST", "GET"]
+
+ if conn_status == CONNECTED_HTTP2_0:
+ probe_http2_0(conn, host_group, path_group, methods)
+ elif conn_status == CONNECTED_HTTP1_1:
+ probe_http1_1(conn, host_group, path_group, methods)
+ else:
+ return
+
+ try:
+ conn.close()
+ except Exception as e:
+ ...
+
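doh_probe above is wrapped with @time_limited(TIMEOUT * 10) from util.thread_timeout, whose source is not part of the excerpt shown here. A minimal sketch of what such a thread-based timeout decorator could look like (an assumption, not the repo's actual implementation); note that a plain Python thread cannot be killed, so a timed-out probe keeps running in the background:

```python
import functools
import threading

def time_limited(timeout):
    """Run the wrapped call in a daemon thread and stop waiting after `timeout` seconds."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            result = {}

            def target():
                result["value"] = fn(*args, **kwargs)

            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            worker.join(timeout)
            # Returns None if the call did not finish in time; the worker thread
            # is abandoned rather than terminated.
            return result.get("value")
        return wrapper
    return decorator
```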
+
+def save_status(count):
+ data = {"count": count}
+ with open(save_status_file, "w") as fp:
+ json.dump(data, fp)
+
+
+def load_status():
+ if not os.path.exists(save_status_file):
+ return 0
+ with open(save_status_file, "r") as fp:
+ a = json.load(fp)
+ return a["count"]
+
+
+def run_with_csv_model():
+ chunks = pd.read_csv(seed_filename, iterator=True)
+ count = cpu_num * thread_num * union_detect_num
+ save_point = load_status()
+ count_scan = 0
+ while True:
+ print(1)
+ start_time = time.time()
+ data_group = []
+ try:
+ chunk_data = chunks.get_chunk(count)
+ except StopIteration: # end of input; any trailing data is lost
+ break
+ for data in chunk_data.itertuples():
+ if count_scan < save_point: # this seed has already been probed
+ count_scan += 1
+ continue
+ ip = data.saddr
+ port = data.sport
+ data_group.append((ip, port))
+ count_scan += 1
+
+ with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool:
+ for data in data_group:
+ threadPool.submit(doh_probe, *data)
+
+ if count_scan > save_point:
+ save_status(count_scan)
+
+ if len(data_group) != 0:
+ end_time = time.time()
+ print(time.strftime('%y%m%d%H', time.localtime(time.time())))
+ print("CPU核数:", cpu_num, end=" ")
+ print("单核线程数:", thread_num, end=" ")
+ print("本轮完成的探测量为:", len(data_group), end=" ")
+ print("总耗时:", end_time - start_time, end=" ")
+ print("平均每秒探测量", len(data_group) / (end_time - start_time))
+
+
+def run_with_kafka_model():
+ consumer = make_seed_consumer()
+ print_num = 10000 # print progress after every print_num IPs scanned
+ max_queue_size = thread_num * 10 # the task queue is 10x the length of the thread pool
+ with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool:
+ count = 0
+ start_time = time.time()
+ for message in consumer:
+ data = message.value
+ ip = data["ip"]
+ port = int(data["port"])
+ threadPool.submit(doh_probe, ip, port)
+ # print("成功从seed中消费数据:", ip, port)
+
+ count += 1
+ if count % print_num == 0:
+ end_time = time.time()
+ print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ")
+ print("CPU核数:", cpu_num, end=" ")
+ print("单核线程数:", thread_num, end=" ")
+ print("本轮完成的探测量为:", print_num, end=" ")
+ print("总耗时:", end_time - start_time, end=" ")
+ print("平均每秒探测量", print_num / (end_time - start_time))
+
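run_with_kafka_model passes max_queue_size to ThreadPoolExecutor, an argument the standard concurrent.futures executor does not accept; the vendored copy under util/concurrent/futures presumably adds it so the Kafka consumer cannot race far ahead of the probe threads. A rough equivalent using only the standard library, shown as a sketch of the same back-pressure idea rather than the vendored implementation:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedThreadPool:
    """submit() blocks once max_queue_size tasks are queued beyond the running workers."""

    def __init__(self, max_workers, max_queue_size):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._slots = threading.BoundedSemaphore(max_workers + max_queue_size)

    def submit(self, fn, *args, **kwargs):
        self._slots.acquire()  # block the producer while the pool is saturated
        future = self._pool.submit(fn, *args, **kwargs)
        future.add_done_callback(lambda _f: self._slots.release())
        return future

    def shutdown(self, wait=True):
        self._pool.shutdown(wait=wait)
```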
+
+if __name__ == '__main__':
+ # Define the root directory so the program is easy to relocate
+ root_dir = sys.path[0]
+
+ # Read the configuration file
+ config = configparser.ConfigParser()
+ config_file = os.path.join(root_dir, "config.ini")
+ config.read(config_file)
+
+ cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1) # the program is not CPU-bound, so multiprocessing is not used for now
+ thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32)
+ write_model = config.get('DetectConfigs', 'write_model')
+ if write_model == "csv":
+ save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename'))
+ writer = csv.writer(open(save_filename, "w", newline=""))
+ elif write_model == "kafka":
+ writer = make_kafka_producer()
+ else:
+ print("error write model!")
+ sys.exit()
+
+ seed_read_model = config.get('DetectConfigs', 'seed_read_model')
+ if seed_read_model == "csv":
+ union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100) # probes per thread per round
+ seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename'))
+ elif seed_read_model == "kafka":
+ union_detect_num = None
+ seed_filename = None
+ else:
+ print("error seed_read_model!")
+ sys.exit()
+
+ # Probe-status file: records scanning progress so a crash does not cause seeds to be probed twice
+ save_status_path = os.path.join(root_dir, "./status")
+ save_status_file = os.path.join(save_status_path, "status.json")
+ if not os.path.exists(save_status_path):
+ os.mkdir(save_status_path)
+
+ lock = threading.Lock()
+ body = make_doh_request_body()
+
+ # producers = [None] * thread_num
+ print("开始探测:")
+ print("-----", "进程数:", cpu_num)
+ print("-----", "线程数:", thread_num)
+ if seed_read_model == "csv":
+ print("-----", "单线程探测量:", union_detect_num)
+ run_with_csv_model()
+ elif seed_read_model == "kafka":
+ run_with_kafka_model()
diff --git a/requires.txt b/requires.txt
new file mode 100644
index 0000000..f2bd99e
--- /dev/null
+++ b/requires.txt
@@ -0,0 +1,4 @@
+hyper
+dnspython
+kafka-python
+django
\ No newline at end of file diff --git a/util/__pycache__/thread_timeout.cpython-37.pyc b/util/__pycache__/thread_timeout.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..540637f --- /dev/null +++ b/util/__pycache__/thread_timeout.cpython-37.pyc diff --git a/util/__pycache__/thread_timeout.cpython-38.pyc b/util/__pycache__/thread_timeout.cpython-38.pyc Binary files differnew file mode 100644 index 0000000..18d132f --- /dev/null +++ b/util/__pycache__/thread_timeout.cpython-38.pyc diff --git a/util/__pycache__/util_http.cpython-37.pyc b/util/__pycache__/util_http.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..94a1b83 --- /dev/null +++ b/util/__pycache__/util_http.cpython-37.pyc diff --git a/util/__pycache__/util_http.cpython-38.pyc b/util/__pycache__/util_http.cpython-38.pyc Binary files differnew file mode 100644 index 0000000..1701ab5 --- /dev/null +++ b/util/__pycache__/util_http.cpython-38.pyc diff --git a/util/__pycache__/util_kafka.cpython-37.pyc b/util/__pycache__/util_kafka.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..aec748a --- /dev/null +++ b/util/__pycache__/util_kafka.cpython-37.pyc diff --git a/util/__pycache__/util_kafka.cpython-38.pyc b/util/__pycache__/util_kafka.cpython-38.pyc Binary files differnew file mode 100644 index 0000000..a321295 --- /dev/null +++ b/util/__pycache__/util_kafka.cpython-38.pyc diff --git a/util/concurrent/__init__.py b/util/concurrent/__init__.py new file mode 100644 index 0000000..196d378 --- /dev/null +++ b/util/concurrent/__init__.py @@ -0,0 +1 @@ +# This directory is a Python package. diff --git a/util/concurrent/__pycache__/__init__.cpython-37.pyc b/util/concurrent/__pycache__/__init__.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..324fd91 --- /dev/null +++ b/util/concurrent/__pycache__/__init__.cpython-37.pyc diff --git a/util/concurrent/futures/__init__.py b/util/concurrent/futures/__init__.py new file mode 100644 index 0000000..8434fcf --- /dev/null +++ b/util/concurrent/futures/__init__.py @@ -0,0 +1,52 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. 
+ +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan ([email protected])' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + BrokenExecutor, + Future, + Executor, + wait, + as_completed) + +__all__ = ( + 'FIRST_COMPLETED', + 'FIRST_EXCEPTION', + 'ALL_COMPLETED', + 'CancelledError', + 'TimeoutError', + 'BrokenExecutor', + 'Future', + 'Executor', + 'wait', + 'as_completed', + 'ProcessPoolExecutor', + 'ThreadPoolExecutor', +) + + +def __dir__(): + return __all__ + ('__author__', '__doc__') + + +def __getattr__(name): + global ProcessPoolExecutor, ThreadPoolExecutor + + if name == 'ProcessPoolExecutor': + from .process import ProcessPoolExecutor as pe + ProcessPoolExecutor = pe + return pe + + if name == 'ThreadPoolExecutor': + from .thread import ThreadPoolExecutor as te + ThreadPoolExecutor = te + return te + + raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc b/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..c00c793 --- /dev/null +++ b/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc diff --git a/util/concurrent/futures/__pycache__/_base.cpython-37.pyc b/util/concurrent/futures/__pycache__/_base.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..2d048ba --- /dev/null +++ b/util/concurrent/futures/__pycache__/_base.cpython-37.pyc diff --git a/util/concurrent/futures/__pycache__/process.cpython-37.pyc b/util/concurrent/futures/__pycache__/process.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..c7193a5 --- /dev/null +++ b/util/concurrent/futures/__pycache__/process.cpython-37.pyc diff --git a/util/concurrent/futures/__pycache__/thread.cpython-37.pyc b/util/concurrent/futures/__pycache__/thread.cpython-37.pyc Binary files differnew file mode 100644 index 0000000..5165f41 --- /dev/null +++ b/util/concurrent/futures/__pycache__/thread.cpython-37.pyc diff --git a/util/concurrent/futures/_base.py b/util/concurrent/futures/_base.py new file mode 100644 index 0000000..46d30a5 --- /dev/null +++ b/util/concurrent/futures/_base.py @@ -0,0 +1,630 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +__author__ = 'Brian Quinlan ([email protected])' + +import collections +import logging +import threading +import time + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. 
+LOGGER = logging.getLogger("concurrent.futures") + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super().add_result(future) + self.event.set() + + def add_exception(self, future): + super().add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super().add_cancelled(future) + self.event.set() + +class _AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() + super().__init__() + + def _decrement_pending_calls(self): + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super().add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super().add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super().add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in 
+ reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). If any given Futures are duplicated, they will be returned + once. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = set(fs) + total_futures = len(fs) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = fs - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + finished = list(finished) + try: + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs,)) + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.monotonic() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), total_futures)) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + # reverse to keep finishing order + finished.reverse() + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)) + + finally: + # Remove waiter from unfinished futures + for f in fs: + with f._condition: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. 
+ """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + with f._condition: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<%s at %#x state=%s raised %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<%s at %#x state=%s returned %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<%s at %#x state=%s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True + + def cancelled(self): + """Return True if the future was cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + """Return True if the future is currently executing.""" + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. + + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. 
+ """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + try: + fn(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. + """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. 
+ return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self), + self._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(*args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + if len(args) >= 2: + pass + elif not args: + raise TypeError("descriptor 'submit' of 'Executor' object " + "needs an argument") + elif 'fn' not in kwargs: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + + raise NotImplementedError() + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: The size of the chunks the iterable will be broken into + before being passed to a child process. This argument is only + used by ProcessPoolExecutor; it is ignored by + ThreadPoolExecutor. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + yield fs.pop().result() + else: + yield fs.pop().result(end_time - time.monotonic()) + finally: + for future in fs: + future.cancel() + return result_iterator() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. + + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. 
+ """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False + + +class BrokenExecutor(RuntimeError): + """ + Raised when a executor has become non-functional after a severe failure. + """ diff --git a/util/concurrent/futures/process.py b/util/concurrent/futures/process.py new file mode 100644 index 0000000..9106552 --- /dev/null +++ b/util/concurrent/futures/process.py @@ -0,0 +1,699 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The follow diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Result Q" +""" + +__author__ = 'Brian Quinlan ([email protected])' + +import atexit +import os +from concurrent.futures import _base +import queue +from queue import Full +import multiprocessing as mp +from multiprocessing.connection import wait +from multiprocessing.queues import Queue +import threading +import weakref +from functools import partial +import itertools +import sys +import traceback + +# Workers are created as daemon threads and processes. This is done to allow the +# interpreter to exit when there are still idle processes in a +# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, +# allowing workers to die with the interpreter has two undesirable properties: +# - The workers would still be running during interpreter shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads/processes finish. 
+ +_threads_wakeups = weakref.WeakKeyDictionary() +_global_shutdown = False + + +class _ThreadWakeup: + def __init__(self): + self._reader, self._writer = mp.Pipe(duplex=False) + + def close(self): + self._writer.close() + self._reader.close() + + def wakeup(self): + self._writer.send_bytes(b"") + + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() + + +def _python_exit(): + global _global_shutdown + _global_shutdown = True + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + thread_wakeup.wakeup() + for t, _ in items: + t.join() + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + + +# On Windows, WaitForMultipleObjects is used to wait for processes to finish. +# It can wait on, at most, 63 objects. There is an overhead of two objects: +# - the result queue reader +# - the thread wakeup reader +_MAX_WINDOWS_WORKERS = 63 - 2 + +# Hack to embed stringification of remote traceback in local traceback + +class _RemoteTraceback(Exception): + def __init__(self, tb): + self.tb = tb + def __str__(self): + return self.tb + +class _ExceptionWithTraceback: + def __init__(self, exc, tb): + tb = traceback.format_exception(type(exc), exc, tb) + tb = ''.join(tb) + self.exc = exc + self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): + return _rebuild_exc, (self.exc, self.tb) + +def _rebuild_exc(exc, tb): + exc.__cause__ = _RemoteTraceback(tb) + return exc + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. + + This function is run in a separate process. 
+ + """ + return [fn(*args) for args in chunk] + + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + +def _process_worker(call_queue, result_queue, initializer, initargs): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A ctx.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A ctx.Queue of _ResultItems that will written + to by the worker. + initializer: A callable initializer, or None + initargs: A tuple of args for the initializer + """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + # The parent will notice that the process stopped and + # mark the pool broken + return + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(os.getpid()) + return + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + _sendback_result(result_queue, call_item.work_id, exception=exc) + else: + _sendback_result(result_queue, call_item.work_id, result=r) + + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + + +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + """Fills call_queue with _WorkItems from pending_work_items. + + This function never blocks. + + Args: + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids + are consumed and the corresponding _WorkItems from + pending_work_items are transformed into _CallItems and put in + call_queue. + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems. + """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + + +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue, + thread_wakeup): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the ctx.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). 
+ call_queue: A ctx.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A ctx.SimpleQueue of _ResultItems generated by the + process workers. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by permanently locked queues. + """ + executor = None + + def shutting_down(): + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + def shutdown_worker(): + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + + # Release the queue's resources as soon as possible. + call_queue.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in processes.values(): + p.join() + + result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] + + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + worker_sentinels = [p.sentinel for p in processes.values()] + ready = wait(readers + worker_sentinels) + + cause = None + is_broken = True + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = traceback.format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + result_item = None + thread_wakeup.clear() + if is_broken: + # Mark the process pool broken so that submits fail right now. + executor = executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. 
+ for p in processes.values(): + p.terminate() + shutdown_worker() + return + if isinstance(result_item, int): + # Clean shutdown of a worker using its PID + # (avoids marking the executor broken) + assert shutting_down() + p = processes.pop(result_item) + p.join() + if not processes: + shutdown_worker() + return + elif result_item is not None: + work_item = pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Delete references to object. See issue16284 + del work_item + # Delete reference to result_item + del result_item + + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if shutting_down(): + try: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown_thread = True + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_worker() + return + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again. + pass + executor = None + + +_system_limits_checked = False +_system_limited = None + + +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermined limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) + raise NotImplementedError(_system_limited) + + +def _chain_from_iterable_of_lists(iterable): + """ + Specialized implementation of itertools.chain.from_iterable. + Each item in *iterable* should be a list. This function is + careful not to keep references to yielded objects. + """ + for element in iterable: + element.reverse() + while element: + yield element.pop() + + +class BrokenProcessPool(_base.BrokenExecutor): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None, mp_context=None, + initializer=None, initargs=()): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + mp_context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. + initializer: A callable used to initialize worker processes. + initargs: A tuple of arguments to pass to the initializer. 
+ """ + _check_system_limits() + + if max_workers is None: + self._max_workers = os.cpu_count() or 1 + if sys.platform == 'win32': + self._max_workers = min(_MAX_WINDOWS_WORKERS, + self._max_workers) + else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + elif (sys.platform == 'win32' and + max_workers > _MAX_WINDOWS_WORKERS): + raise ValueError( + f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") + + self._max_workers = max_workers + + if mp_context is None: + mp_context = mp.get_context() + self._mp_context = mp_context + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._initializer = initializer + self._initargs = initargs + + # Management thread + self._queue_management_thread = None + + # Map of pids to processes + self._processes = {} + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._broken = False + self._queue_count = 0 + self._pending_work_items = {} + + # Create communication channels for the executor + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = mp_context.SimpleQueue() + self._work_ids = queue.Queue() + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() + + def _start_queue_management_thread(self): + if self._queue_management_thread is None: + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() + # Start the processes so that their sentinels are known. 
+ self._adjust_process_count() + self._queue_management_thread = threading.Thread( + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue, + self._queue_management_thread_wakeup), + name="QueueManagerThread") + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + _threads_wakeups[self._queue_management_thread] = \ + self._queue_management_thread_wakeup + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs)) + p.start() + self._processes[p.pid] = p + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + + with self._shutdown_lock: + if self._broken: + raise BrokenProcessPool(self._broken) + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._queue_management_thread_wakeup.wakeup() + + self._start_queue_management_thread() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return _chain_from_iterable_of_lists(results) + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown_thread = True + if self._queue_management_thread: + # Wake up queue management thread + self._queue_management_thread_wakeup.wakeup() + if wait: + self._queue_management_thread.join() + # To reduce the risk of opening too many files, remove references to + # objects that use file descriptors. 
+ self._queue_management_thread = None + if self._call_queue is not None: + self._call_queue.close() + if wait: + self._call_queue.join_thread() + self._call_queue = None + self._result_queue = None + self._processes = None + + if self._queue_management_thread_wakeup: + self._queue_management_thread_wakeup.close() + self._queue_management_thread_wakeup = None + + shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +atexit.register(_python_exit) diff --git a/util/concurrent/futures/thread.py b/util/concurrent/futures/thread.py new file mode 100644 index 0000000..c773c24 --- /dev/null +++ b/util/concurrent/futures/thread.py @@ -0,0 +1,226 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan ([email protected])' + +import atexit +from concurrent.futures import _base +import itertools +import queue +import threading +import weakref +import os + +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpreter shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + + +atexit.register(_python_exit) + + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None + else: + self.future.set_result(result) + + +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return + try: + while True: + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + # Delete references to object. See issue16284 + del work_item + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. 
+ if executor is not None: + executor._shutdown = True + # Notice other workers + work_queue.put(None) + return + del executor + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) + + +class BrokenThreadPool(_base.BrokenExecutor): + """ + Raised when a worker thread in a ThreadPoolExecutor failed initializing. + """ + + +class ThreadPoolExecutor(_base.Executor): + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().__next__ + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=(), max_queue_size=None): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. + """ + if max_workers is None: + # Use this number because ThreadPoolExecutor is often + # used to overlap I/O instead of CPU work. + max_workers = (os.cpu_count() or 1) * 5 + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + + self._max_workers = max_workers + if not max_queue_size: + self._work_queue = queue.Queue() + else: + self._work_queue = queue.Queue(self._max_workers * 2) + # print(max_queue_size) + self._threads = set() + self._broken = False + self._shutdown = False + self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args) - 1)) + + with self._shutdown_lock: + if self._broken: + raise BrokenThreadPool(self._broken) + + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. 
+ num_threads = len(self._threads) + # print("num_threads", num_threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, + args=(weakref.ref(self, weakref_cb), + self._work_queue, + self._initializer, + self._initargs)) + t.daemon = True + t.start() + self._threads.add(t) + # _threads_queues[t] = self._work_queue + + def _initializer_failed(self): + with self._shutdown_lock: + self._broken = ('A thread initializer failed, the thread pool ' + 'is not usable anymore') + # Drain work queue and mark pending futures failed + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + break + if work_item is not None: + work_item.future.set_exception(BrokenThreadPool(self._broken)) + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown = True + self._work_queue.put(None) + if wait: + for t in self._threads: + t.join() + + shutdown.__doc__ = _base.Executor.shutdown.__doc__ diff --git a/util/get_cert.py b/util/get_cert.py new file mode 100644 index 0000000..2bfb60c --- /dev/null +++ b/util/get_cert.py @@ -0,0 +1,118 @@ +import time
+from hyper import HTTP20Connection, HTTP11Connection
+from util.thread_timeout import time_limited
+from main import TIMEOUT, UN_CONNECTED, CONNECTED_HTTP1_1, CONNECTED_HTTP2_0, NOT_SUPPORT_FOR_HTTP2, \
+ make_doh_request_body, CERTIFICATE_VERIFY_FAILED
+import threading
+from util.util_http import make_ssl_context
+import ssl, socket, sys, csv
+from hyper.http20.exceptions import ConnectionError
+from concurrent.futures import ThreadPoolExecutor
+import os
+
+root_dir = sys.path[0]
+
+# hourly output file, time format %y%m%d%H (e.g. 22051906.csv)
+datetime_str = time.strftime('%y%m%d%H', time.localtime(time.time()))
+save_dir = os.path.join(root_dir, "verify")
+if not os.path.exists(save_dir):
+ os.mkdir(save_dir)
+save_file = os.path.join(save_dir, f"{datetime_str}.csv")
+f = open(save_file, "w", newline="")
+f_verify = csv.writer(f)
+
+lock = threading.Lock()
+
+body = make_doh_request_body()
+
+
+@time_limited(TIMEOUT * 10)
+def doh_verify(ip, port):
+ # print(ip, port, host, path, method)
+ verify_mode = ssl.CERT_OPTIONAL
+
+ ctx_2_0 = make_ssl_context()
+ ctx_2_0.set_alpn_protocols(['h2'])
+ # ctx_2_0.verify_mode = ssl.CERT_NONE
+ conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+ conn_status = UN_CONNECTED
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP2_0
+ # print("成功建立HTTP2.0连接!")
+ except (AssertionError, TimeoutError, FileNotFoundError, ConnectionResetError, ConnectionRefusedError, OSError,
+ ssl.SSLCertVerificationError, ssl.SSLError,
+ socket.timeout,
+ ConnectionError, # hyper error
+ ) as e:
+ if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):  # expected failure: the server does not support HTTP/2, fall back to HTTP/1.1
+ # print("Connection failed: server does not support HTTP/2.0, falling back to HTTP/1.1")
+ ...
+ elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):  # certificate problem: retry with the ssl context set to CERT_NONE
+ # print("Connection failed because of the certificate, setting the ssl context to CERT_NONE")
+ ctx_2_0.verify_mode = ssl.CERT_NONE
+ verify_mode = ssl.CERT_NONE
+ conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP2_0
+ # print("成功建立HTTP2.0连接!")
+ except (
+ AssertionError, TimeoutError, FileNotFoundError, ConnectionResetError, ConnectionRefusedError,
+ OSError,
+ ssl.SSLCertVerificationError, ssl.SSLError,
+ socket.timeout,
+ ConnectionError, # hyper error
+ ) as e:
+ # print("建立连接失败!服务端不支持HTTP2.0,继续尝试HTTP1.1")
+ ...
+ else:
+ # print("建立连接失败!错误信息如下:", e)
+ return False
+
+ if conn_status == UN_CONNECTED:
+ ctx_1_1 = make_ssl_context()
+ ctx_1_1.verify_mode = verify_mode
+ conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
+ try:
+ conn.connect()
+ conn_status = CONNECTED_HTTP1_1
+ # print("成功建立HTTP1.1连接!")
+ except (TimeoutError, ConnectionRefusedError, FileNotFoundError, ConnectionAbortedError,
+ ssl.SSLCertVerificationError, ssl.SSLError,
+ socket.timeout, OSError
+ ) as e:  # if the HTTP/1.1 connection also fails, give up on this host
+ # print("Connection failed, error:", e)
+ return False
+
+ try:
+ cert = conn._sock.getpeercert()
+ except AttributeError:
+ cert = None
+ lock.acquire()
+ try:
+ f_verify.writerow([ip, str(cert)])
+ except UnicodeEncodeError:
+ print(ip, str(cert))
+ finally:
+ # always release the lock, even if writerow raises, so other workers are not blocked
+ lock.release()
+
+
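For comparison, the protocol check that doh_verify performs with hyper can also be expressed with nothing but the standard library: offer both h2 and http/1.1 via ALPN and read back what the server selected. This is a minimal sketch under those assumptions, not part of the scanner; probe_alpn is a hypothetical helper and it only inspects ALPN negotiation, it does not send a DoH query.

import socket
import ssl

def probe_alpn(ip, port=443, timeout=5):
    """Return the ALPN protocol negotiated with ip:port ('h2', 'http/1.1' or None)."""
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE  # only probing protocol support; certificate errors are ignored here
    ctx.set_alpn_protocols(['h2', 'http/1.1'])
    with socket.create_connection((ip, port), timeout=timeout) as sock:
        with ctx.wrap_socket(sock) as tls:
            return tls.selected_alpn_protocol()

A None result means the server did not negotiate ALPN at all, which is roughly the case where doh_verify falls back to HTTP11Connection.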
+def run_with_thread_pool(max_workers, data_group, fn):
+ # print("Before ThreadPool!")
+ with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="") as threadPool:
+ for data in data_group:
+ threadPool.submit(fn, *data)
+
+
+if __name__ == '__main__':
+ data_group = []
+ with open("./suspect/no_cert_ips.csv") as fp:
+ csv_reader = csv.reader(fp)
+ for row in csv_reader:
+ ip = row[0]
+ data_group.append((ip, 443))
+ print(len(data_group))
+ run_with_thread_pool(64, data_group, doh_verify)
+ f.close()
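The __main__ block only reads the first CSV column, so ./suspect/no_cert_ips.csv can be as simple as one address per line; extra columns are ignored. The addresses below are documentation placeholders, not real seeds:

203.0.113.10
198.51.100.7,optional,extra,columns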
diff --git a/util/thread_timeout.py b/util/thread_timeout.py new file mode 100644 index 0000000..9a2637c --- /dev/null +++ b/util/thread_timeout.py @@ -0,0 +1,65 @@ +from threading import Thread
+import inspect
+import ctypes
+from functools import wraps
+
+
+def _async_raise(tid, exctype):
+ """raises the exception, performs cleanup if needed"""
+ tid = ctypes.c_long(tid)
+ if not inspect.isclass(exctype):
+ exctype = type(exctype)
+ res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+ if res == 0:
+ raise ValueError("invalid thread id")
+ elif res != 1:
+ # """if it returns a number greater than one, you're in trouble,
+ # and you should call it again with exc=NULL to revert the effect"""
+ ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+ raise SystemError("PyThreadState_SetAsyncExc failed")
+
+
+def stop_thread(thread):
+ _async_raise(thread.ident, SystemExit)
+
+
+class TimeoutException(Exception):
+ # print("timeout!")
+ pass
+
+
+ThreadStop = stop_thread
+
+
+def time_limited(timeout):
+ def decorator(function):
+ @wraps(function)
+ def wrapped_function(*args, **kwargs):
+ class TimeLimited(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.error = None
+ self.result = None
+
+ def run(self):
+ try:
+ self.result = function(*args, **kwargs)
+ except BaseException as exc:
+ # keep the exception so the caller can re-raise it after join()
+ self.error = exc
+
+ def stop(self):
+ if self.is_alive():
+ ThreadStop(self)
+
+ t = TimeLimited()
+ t.start()
+ t.join(timeout)
+ if t.is_alive():
+ # still running after the timeout: kill the worker thread and report a timeout
+ t.stop()
+ raise TimeoutException('timeout for %s' % (repr(function)))
+ if t.error is not None:
+ # the wrapped function raised; surface its exception instead of returning None
+ raise t.error
+ return t.result
+
+ return wrapped_function
+
+ return decorator
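One caveat: PyThreadState_SetAsyncExc only delivers the exception the next time the target thread executes Python bytecode, so a thread blocked inside a C-level call (for example a socket read) is not interrupted immediately; that is worth remembering when choosing budgets such as the TIMEOUT * 10 used by doh_verify. A small usage sketch of the decorator (the function and the 2-second limit are made up for illustration):

import time
from util.thread_timeout import time_limited, TimeoutException

@time_limited(2)
def slow_lookup():
    time.sleep(5)  # stands in for a hanging network call
    return "done"

try:
    slow_lookup()
except TimeoutException as exc:
    print("gave up:", exc)  # raised after roughly 2 seconds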
diff --git a/util/util_http.py b/util/util_http.py new file mode 100644 index 0000000..853b351 --- /dev/null +++ b/util/util_http.py @@ -0,0 +1,45 @@ +from dns.message import make_query, from_wire
+import ssl
+
+
+def make_ssl_context(check_cert=False):
+ ssl_ctx = ssl.create_default_context()
+ ssl_ctx.check_hostname = False
+ if check_cert:
+ ssl_ctx.verify_mode = ssl.CERT_REQUIRED
+ else:
+ # ssl_ctx.verify_mode = ssl.CERT_NONE
+ ssl_ctx.verify_mode = ssl.CERT_OPTIONAL
+ # ssl_ctx.minimum_version = ssl.TLSVersion.TLSv1_1
+ return ssl_ctx
+
+
+def build_dns_query(qname="example.com", rtype="A"):
+ dns_query = make_query(
+ qname=qname,
+ rdtype=rtype,
+ want_dnssec=False,
+ )
+ return dns_query
+
+
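A DoH POST body is just the wire form of such a query. Assuming dnspython (already imported above) behaves as usual, converting the query into the bytes of an application/dns-message request body looks like this:

query = build_dns_query(qname="example.com", rtype="A")
wire_body = query.to_wire()  # raw bytes suitable as an application/dns-message POST body
print(len(wire_body), wire_body[:12].hex())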
+def get_domain_from_cert(cert):
+ cert_info_domain_related = []
+
+ if not cert:
+ return cert_info_domain_related
+
+ for key, value in cert.items():
+ if key == "subject":
+ for (sub_key, sub_value), in value:
+ if sub_key == "commonName":
+ # print(sub_value)
+ cert_info_domain_related.append(sub_value)
+ if key == "subjectAltName":
+ for (sub_key, sub_value) in value:
+ cert_info_domain_related.append(sub_value)
+ return list(set(cert_info_domain_related))
+
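get_domain_from_cert expects the dictionary shape returned by SSLSocket.getpeercert(); a small illustration with a made-up certificate dict:

sample_cert = {
    "subject": ((("commonName", "dns.example"),),),
    "subjectAltName": (("DNS", "dns.example"), ("DNS", "doh.example")),
}
print(get_domain_from_cert(sample_cert))  # -> ['dns.example', 'doh.example'] (order varies because of the set())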
+# def build_dns_query_wireformat(qname="example.com", rtype="A"):
+# dns_query = build_dns_query()
+# return dns_query.to_wire()
diff --git a/util/util_kafka.py b/util/util_kafka.py new file mode 100644 index 0000000..07e5302 --- /dev/null +++ b/util/util_kafka.py @@ -0,0 +1,27 @@ +from kafka import KafkaProducer, KafkaConsumer
+import json
+
+kafka_topic_suc = 'sucInfo'
+kafka_topic_fail = 'failInfo'
+kafka_topic_cert = 'certInfo'
+broker_list = ['0.kafka.my:9092', '1.kafka.my:9092']
+
+seed_kafka_topic = "seed"
+seed_consumer_group_id = "hyConsumer"
+seed_broker_list = ["172.19.0.6:9092"]
+
+
+def make_kafka_producer():
+ return KafkaProducer(
+ bootstrap_servers=broker_list,
+ value_serializer=lambda v: json.dumps(v).encode('utf-8'),
+ )
+
+
+def make_seed_consumer():
+ return KafkaConsumer(seed_kafka_topic,
+ group_id=seed_consumer_group_id,
+ auto_commit_interval_ms=1000,
+ bootstrap_servers=seed_broker_list,
+ value_deserializer=lambda v: json.loads(v.decode('utf-8'))
+ )
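Wiring the two helpers together with kafka-python is straightforward: iterate the consumer (each message's value is already JSON-decoded by the deserializer) and push results with the producer. A hedged sketch; the field names inside the seed messages are assumptions, not taken from this repository:

from util.util_kafka import make_seed_consumer, make_kafka_producer, kafka_topic_suc

consumer = make_seed_consumer()
producer = make_kafka_producer()

for message in consumer:
    seed = message.value                      # dict, decoded by value_deserializer
    ip = seed.get("ip")                       # hypothetical field name
    result = {"ip": ip, "status": "checked"}  # stand-in for a real scan result
    producer.send(kafka_topic_suc, value=result)
    break  # handle a single seed in this sketch
producer.flush()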
