From 8533fa17cfe826361abfc59ba2bf513091f10637 Mon Sep 17 00:00:00 2001 From: nanwct <674129850@qq.com> Date: Thu, 19 May 2022 14:55:02 +0800 Subject: abc --- .idea/.gitignore | 3 + .idea/inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 4 + .idea/modules.xml | 8 + .idea/quickScan.iml | 15 + __pycache__/main.cpython-38.pyc | Bin 0 -> 10347 bytes config.ini | 13 + main.py | 551 ++++++++++++++++ requires.txt | 4 + util/__pycache__/thread_timeout.cpython-37.pyc | Bin 0 -> 2618 bytes util/__pycache__/thread_timeout.cpython-38.pyc | Bin 0 -> 2615 bytes util/__pycache__/util_http.cpython-37.pyc | Bin 0 -> 1113 bytes util/__pycache__/util_http.cpython-38.pyc | Bin 0 -> 1084 bytes util/__pycache__/util_kafka.cpython-37.pyc | Bin 0 -> 1219 bytes util/__pycache__/util_kafka.cpython-38.pyc | Bin 0 -> 719 bytes util/concurrent/__init__.py | 1 + .../concurrent/__pycache__/__init__.cpython-37.pyc | Bin 0 -> 113 bytes util/concurrent/futures/__init__.py | 52 ++ .../futures/__pycache__/__init__.cpython-37.pyc | Bin 0 -> 1053 bytes .../futures/__pycache__/_base.cpython-37.pyc | Bin 0 -> 21151 bytes .../futures/__pycache__/process.cpython-37.pyc | Bin 0 -> 19960 bytes .../futures/__pycache__/thread.cpython-37.pyc | Bin 0 -> 5496 bytes util/concurrent/futures/_base.py | 630 +++++++++++++++++++ util/concurrent/futures/process.py | 699 +++++++++++++++++++++ util/concurrent/futures/thread.py | 226 +++++++ util/get_cert.py | 118 ++++ util/thread_timeout.py | 65 ++ util/util_http.py | 45 ++ util/util_kafka.py | 27 + 29 files changed, 2467 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/quickScan.iml create mode 100644 __pycache__/main.cpython-38.pyc create mode 100644 config.ini create mode 100644 main.py create mode 100644 requires.txt create mode 100644 util/__pycache__/thread_timeout.cpython-37.pyc create mode 100644 util/__pycache__/thread_timeout.cpython-38.pyc create mode 100644 util/__pycache__/util_http.cpython-37.pyc create mode 100644 util/__pycache__/util_http.cpython-38.pyc create mode 100644 util/__pycache__/util_kafka.cpython-37.pyc create mode 100644 util/__pycache__/util_kafka.cpython-38.pyc create mode 100644 util/concurrent/__init__.py create mode 100644 util/concurrent/__pycache__/__init__.cpython-37.pyc create mode 100644 util/concurrent/futures/__init__.py create mode 100644 util/concurrent/futures/__pycache__/__init__.cpython-37.pyc create mode 100644 util/concurrent/futures/__pycache__/_base.cpython-37.pyc create mode 100644 util/concurrent/futures/__pycache__/process.cpython-37.pyc create mode 100644 util/concurrent/futures/__pycache__/thread.cpython-37.pyc create mode 100644 util/concurrent/futures/_base.py create mode 100644 util/concurrent/futures/process.py create mode 100644 util/concurrent/futures/thread.py create mode 100644 util/get_cert.py create mode 100644 util/thread_timeout.py create mode 100644 util/util_http.py create mode 100644 util/util_kafka.py diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..eaf91e2 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end 
of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..abf7b39 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..ec86cd7 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/quickScan.iml b/.idea/quickScan.iml new file mode 100644 index 0000000..d1bb9af --- /dev/null +++ b/.idea/quickScan.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/__pycache__/main.cpython-38.pyc b/__pycache__/main.cpython-38.pyc new file mode 100644 index 0000000..00ce3b9 Binary files /dev/null and b/__pycache__/main.cpython-38.pyc differ diff --git a/config.ini b/config.ini new file mode 100644 index 0000000..019ddc2 --- /dev/null +++ b/config.ini @@ -0,0 +1,13 @@ +[DetectConfigs] +cpu_num = 1 +thread_num = 32 +;csv or kafka +seed_read_model = kafka +;only seed_read_model=csv this parameter need +union_detect_num = 100 +;only seed_read_model=csv this parameter need +filename = ./data/443.csv +;csv or kafka +write_model = kafka +;only write_model=csv this parameter need +save_filename = ./result.csv diff --git a/main.py b/main.py new file mode 100644 index 0000000..20f21bc --- /dev/null +++ b/main.py @@ -0,0 +1,551 @@ +from util.util_http import build_dns_query, make_ssl_context, get_domain_from_cert +from dns.message import from_wire +import dns +from dns.name import BadLabelType +from dns.exception import FormError +import time +import configparser +import pandas as pd +from util.util_kafka import make_kafka_producer, kafka_topic_fail, kafka_topic_cert, kafka_topic_suc, make_seed_consumer +from hyper import HTTP20Connection, HTTP11Connection +from hyper.http20.exceptions import ConnectionError, StreamResetError +import socket, h2.exceptions, ssl +from util.concurrent.futures import ThreadPoolExecutor +import threading +from util.thread_timeout import time_limited +import os +import json +from hyper.http20.response import HTTPHeaderMap +from hyper.http11.parser import ParseError +import zlib +import sys +import csv +from _csv import writer + +# 定义except的错误信息,遇到这类错误需要特例处理 +NOT_SUPPORT_FOR_HTTP2 = "No suitable protocol found" +CERTIFICATE_VERIFY_FAILED = "[SSL: CERTIFICATE_VERIFY_FAILED]" + +# HTTP连接类型 +UN_CONNECTED = 0 +CONNECTED_HTTP1_1 = 1 +CONNECTED_HTTP2_0 = 2 + +# 超时基数 +TIMEOUT = 2 + + +def write_data(data: dict, kafka_topic: str = ""): + lock.acquire() + if write_model == "csv": + data["status"] = kafka_topic + writer.writerow([data.__str__()]) + else: + try: + writer.send(kafka_topic, data) + except Exception as e: + ... + lock.release() + + +def make_doh_request_body(q_name="baidu.com"): + dns_query = build_dns_query(q_name) + dns_query_wire_format = dns_query.to_wire() + return dns_query_wire_format + + +def make_doh_request_header(hostname=None, conn_status=CONNECTED_HTTP2_0): + headers = { + "content-type": "application/dns-message", + "accept": "application/dns-message", + "user-agent": 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36 ' + } + if hostname: + if conn_status == CONNECTED_HTTP2_0: + headers[":authority"] = hostname + elif conn_status == CONNECTED_HTTP1_1: + headers["host"] = hostname + else: + ... 
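    # HTTP/2 names the target host in the ":authority" pseudo-header, while HTTP/1.1
    # uses the "Host" header, which is why the branch above keys on conn_status.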
+ return headers + + +def decode_header(header: HTTPHeaderMap): + # print(header.__str__()) + ret = {} + for key, value in header.items(): + key = key.decode() + try: + value = value.decode() + except UnicodeDecodeError: + key = "UnicodeDecodeError" + if key not in ret.keys(): # 解决单key,多value情况 + ret[key] = value + else: + ret[key] += (";" + value) + return ret + + +def probe_http2_0(conn: HTTP20Connection, host_group, path_group, methods): + stream_ids = {} + for hostname in host_group: + headers = make_doh_request_header(hostname) + for path in path_group: + for method in methods: + if len(stream_ids.keys()) >= 100: # 单IP、port限制探测100次 + continue + if method == "GET": + full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path + try: + stream_id = conn.request(method=method, url=full_path, headers=headers) + except Exception as e: + continue + else: + try: + stream_id = conn.request(method=method, url=path, body=body, headers=headers) + except Exception as e: + continue + stream_ids[stream_id] = (hostname, path, method) + + for stream_id, conn_info in stream_ids.items(): + hostname, path, method = conn_info + try: + rep = conn.get_response(stream_id) + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP2_0, + "info": e.__str__(), + } + write_data(data, kafka_topic_fail) + continue + rep_code = rep.status + rep_header = decode_header(rep.headers) + try: + rep_body = rep.read() + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP2_0, + "status_code": rep_code, + "rep_header": rep_header, + "info": e.__str__() + } + write_data(data, kafka_topic_fail) + return + # print(rep_body) + try: + rep_body = from_wire(rep_body) + if rep_body: + # produce sucInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP2_0, + "status_code": rep_code, + "rep_header": rep_header, + "rep_body": rep_body.__str__() + } + # print(rep_body.__str__()) + write_data(data, kafka_topic_suc) + else: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP2_0, + "status_code": rep_code, + "rep_header": rep_header, + "info": rep_body.__str__() + } + write_data(data, kafka_topic_fail) + except Exception: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP2_0, + "status_code": rep_code, + "rep_header": rep_header, + "info": rep_body.__str__() + } + write_data(data, kafka_topic_fail) + + +def probe_http1_1(conn: HTTP11Connection, host_group, path_group, methods): + # start_time = time.time() + for hostname in host_group: + headers = make_doh_request_header(hostname, CONNECTED_HTTP1_1) + for path in path_group: + for method in methods: + if method == "GET": + full_path = "%s?dns=DUIBAAABAAAAAAAABWJhaWR1A2NvbQAAAQAB" % path + try: + conn.request(method=method, url=full_path, headers=headers) + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "info": e.__str__(), + } + 
write_data(data, kafka_topic_fail) + # print("GET方法执行失败,错误信息如下:", e.__str__) + continue # 此处直接 + else: + try: + conn.request(method=method, url=path, body=body, headers=headers) + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "info": e.__str__(), + } + write_data(data, kafka_topic_fail) + # print("POST方法执行失败,错误信息如下:", e.__str__) + continue + + try: + rep = conn.get_response() + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "info": e.__str__(), + } + write_data(data, kafka_topic_fail) + # print("get_response方法执行失败,错误信息如下:", e) + continue + rep_code = rep.status + rep_header = decode_header(rep.headers) + try: + rep_body = rep.read() + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "status_code": rep_code, + "rep_header": rep_header, + "rep_body": e.__str__() + } + write_data(data, kafka_topic_fail) + # print("read方法执行失败,错误信息如下:", e) + continue + # print(rep_body) + try: + rep_body = from_wire(rep_body) + # # print(hostname, path, method) + # # print(rep_body) + if rep_body: + # produce sucInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "status_code": rep_code, + "rep_header": rep_header, + "rep_body": rep_body.__str__() + } + # print(rep_body.__str__()) + write_data(data, kafka_topic_suc) + else: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "status_code": rep_code, + "rep_header": rep_header, + "info": None + } + write_data(data, kafka_topic_fail) + except Exception as e: + # produce failInfo to kafka + data = { + "ip": conn.host, + "port": conn.port, + "host": hostname, + "path": path, + "method": method, + "connect_type": CONNECTED_HTTP1_1, + "status_code": rep_code, + "rep_header": rep_header, + "info": e.__str__() + } + write_data(data, kafka_topic_fail) + # print("from_wire方法执行失败,错误信息如下:", e.__str__) + + +@time_limited(TIMEOUT * 10) +def doh_probe(ip, port, host_group=None, path_group=None): + verify_mode = ssl.CERT_OPTIONAL + ctx_2_0 = make_ssl_context() + ctx_2_0.set_alpn_protocols(['h2']) + conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0) + conn_status = UN_CONNECTED + try: + conn.connect() + conn_status = CONNECTED_HTTP2_0 + # print("成功建立HTTP2.0连接!") + except Exception as e: + if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2): # 预期内的错误,继续尝试HTTP1.1 + # print("建立连接失败!服务端不支持HTTP2.0,继续尝试HTTP1.1") + ... + elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED): # 证书导致的错误,将sslContext设置为CERT_NONE + # print("证书原因导致连接建立失败,设置sslContext为CERT_NONE") + ctx_2_0.verify_mode = ssl.CERT_NONE + verify_mode = ssl.CERT_NONE + conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0) + try: + conn.connect() + conn_status = CONNECTED_HTTP2_0 + # print("成功建立HTTP2.0连接!") + except Exception as e: + # print("建立连接失败!服务端不支持HTTP2.0,继续尝试HTTP1.1") + ... 
+ else: + # print("建立连接失败!错误信息如下:", e) + # produce failInfo to kafka + data = { + "ip": ip, + "port": port, + "connect_type": UN_CONNECTED, + "info": e.__str__(), + } + write_data(data, kafka_topic_fail) + return + + if conn_status == UN_CONNECTED: + ctx_1_1 = make_ssl_context() + ctx_1_1.verify_mode = verify_mode + conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1) + try: + conn.connect() + conn_status = CONNECTED_HTTP1_1 + # print("成功建立HTTP1.1连接!") + except Exception as e: # 如使用HTTP1.1仍然无法建立连接,则放弃 + # print("建立连接失败!错误信息如下:", e) + # produce failInfo to kafka + data = { + "ip": ip, + "port": port, + "connect_type": UN_CONNECTED, + "info": e.__str__(), + } + write_data(data, kafka_topic_fail) + return + + try: + cert = conn._sock.getpeercert() + # produce certInfo to kafka + data = { + "ip": ip, + "port": port, + "certificate": cert.__str__() + } + write_data(data, kafka_topic_cert) + except Exception: + cert = None + + if host_group: + host_group += get_domain_from_cert(cert) + else: + host_group = get_domain_from_cert(cert) + host_group += [None, ip] # host_group中至少包含None和原始IP地址 + host_group = list(set(host_group)) # 去重 + + default_path_group = ["/dns-query", "/", "/resolve", "/doh", "/doh/family-filter", + "/doh/secure-filter", "/query", "/ads", "/uncensored", "adblock"] + if path_group: + path_group += default_path_group + path_group = list(set(path_group)) # 去重 + else: + path_group = default_path_group + + methods = ["POST", "GET"] + + if conn_status == CONNECTED_HTTP2_0: + probe_http2_0(conn, host_group, path_group, methods) + elif conn_status == CONNECTED_HTTP1_1: + probe_http1_1(conn, host_group, path_group, methods) + else: + return + + try: + conn.close() + except Exception as e: + ... + + +def save_status(count): + data = {"count": count} + with open(save_status_file, "w") as fp: + json.dump(data, fp) + + +def load_status(): + if not os.path.exists(save_status_file): + return 0 + with open(save_status_file, "r") as fp: + a = json.load(fp) + return a["count"] + + +def run_with_csv_model(): + chunks = pd.read_csv(seed_filename, iterator=True) + count = cpu_num * thread_num * union_detect_num + save_point = load_status() + count_scan = 0 + while True: + print(1) + start_time = time.time() + data_group = [] + try: + chunk_data = chunks.get_chunk(count) + except StopIteration: # 尾部数据丢失 + break + for data in chunk_data.itertuples(): + if count_scan < save_point: # 表示种子已经被探测过 + count_scan += 1 + continue + ip = data.saddr + port = data.sport + data_group.append((ip, port)) + count_scan += 1 + + with ThreadPoolExecutor(max_workers=thread_num, thread_name_prefix="") as threadPool: + for data in data_group: + threadPool.submit(doh_probe, *data) + + if count_scan > save_point: + save_status(count_scan) + + if len(data_group) != 0: + end_time = time.time() + print(time.strftime('%y%m%d%H', time.localtime(time.time()))) + print("CPU核数:", cpu_num, end=" ") + print("单核线程数:", thread_num, end=" ") + print("本轮完成的探测量为:", len(data_group), end=" ") + print("总耗时:", end_time - start_time, end=" ") + print("平均每秒探测量", len(data_group) / (end_time - start_time)) + + +def run_with_kafka_model(): + consumer = make_seed_consumer() + print_num = 10000 # 扫描print_num个IP后打印输出 + max_queue_size = thread_num * 10 # 任务队列的长度是线程队列长度的10倍 + with ThreadPoolExecutor(max_workers=thread_num, max_queue_size=max_queue_size, thread_name_prefix="") as threadPool: + count = 0 + start_time = time.time() + for message in consumer: + data = message.value + ip = data["ip"] + port = int(data["port"]) + 
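            # max_queue_size comes from the vendored ThreadPoolExecutor in
            # util.concurrent.futures; presumably it bounds the internal work queue so this
            # submit() applies back-pressure instead of buffering the whole Kafka topic in
            # memory when workers fall behind.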
threadPool.submit(doh_probe, ip, port) + # print("成功从seed中消费数据:", ip, port) + + count += 1 + if count % print_num == 0: + end_time = time.time() + print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())), end=" ") + print("CPU核数:", cpu_num, end=" ") + print("单核线程数:", thread_num, end=" ") + print("本轮完成的探测量为:", print_num, end=" ") + print("总耗时:", end_time - start_time, end=" ") + print("平均每秒探测量", print_num / (end_time - start_time)) + + +if __name__ == '__main__': + # 定义根目录,方便程序移植 + root_dir = sys.path[0] + + # 读取配置文件 + config = configparser.ConfigParser() + config_file = os.path.join(root_dir, "config.ini") + config.read(config_file) + + cpu_num = config.getint('DetectConfigs', 'cpu_num', fallback=1) # 程序对CPU要求不高,暂不考虑多进程 + thread_num = config.getint('DetectConfigs', 'thread_num', fallback=32) + write_model = config.get('DetectConfigs', 'write_model') + if write_model == "csv": + save_filename = os.path.join(root_dir, config.get('DetectConfigs', 'save_filename')) + writer = csv.writer(open(save_filename, "w", newline="")) + elif write_model == "kafka": + writer = make_kafka_producer() + else: + print("error write model!") + sys.exit() + + seed_read_model = config.get('DetectConfigs', 'seed_read_model') + if seed_read_model == "csv": + union_detect_num = config.getint('DetectConfigs', 'union_detect_num', fallback=100) # 单线程单次探测量 + seed_filename = os.path.join(root_dir, config.get('DetectConfigs', 'filename')) + elif seed_read_model == "kafka": + union_detect_num = None + seed_filename = None + else: + print("error seed_read_model!") + sys.exit() + + # 定义探测状态记录文件,用作记录探测进度,防止程序崩溃导致重复探测 + save_status_path = os.path.join(root_dir, "./status") + save_status_file = os.path.join(save_status_path, "status.json") + if not os.path.exists(save_status_path): + os.mkdir(save_status_path) + + lock = threading.Lock() + body = make_doh_request_body() + + # producers = [None] * thread_num + print("开始探测:") + print("-----", "进程数:", cpu_num) + print("-----", "线程数:", thread_num) + if seed_read_model == "csv": + print("-----", "单线程探测量:", union_detect_num) + run_with_csv_model() + elif seed_read_model == "kafka": + run_with_kafka_model() diff --git a/requires.txt b/requires.txt new file mode 100644 index 0000000..f2bd99e --- /dev/null +++ b/requires.txt @@ -0,0 +1,4 @@ +hyper +dnspython +kafka-python +django \ No newline at end of file diff --git a/util/__pycache__/thread_timeout.cpython-37.pyc b/util/__pycache__/thread_timeout.cpython-37.pyc new file mode 100644 index 0000000..540637f Binary files /dev/null and b/util/__pycache__/thread_timeout.cpython-37.pyc differ diff --git a/util/__pycache__/thread_timeout.cpython-38.pyc b/util/__pycache__/thread_timeout.cpython-38.pyc new file mode 100644 index 0000000..18d132f Binary files /dev/null and b/util/__pycache__/thread_timeout.cpython-38.pyc differ diff --git a/util/__pycache__/util_http.cpython-37.pyc b/util/__pycache__/util_http.cpython-37.pyc new file mode 100644 index 0000000..94a1b83 Binary files /dev/null and b/util/__pycache__/util_http.cpython-37.pyc differ diff --git a/util/__pycache__/util_http.cpython-38.pyc b/util/__pycache__/util_http.cpython-38.pyc new file mode 100644 index 0000000..1701ab5 Binary files /dev/null and b/util/__pycache__/util_http.cpython-38.pyc differ diff --git a/util/__pycache__/util_kafka.cpython-37.pyc b/util/__pycache__/util_kafka.cpython-37.pyc new file mode 100644 index 0000000..aec748a Binary files /dev/null and b/util/__pycache__/util_kafka.cpython-37.pyc differ diff --git 
a/util/__pycache__/util_kafka.cpython-38.pyc b/util/__pycache__/util_kafka.cpython-38.pyc new file mode 100644 index 0000000..a321295 Binary files /dev/null and b/util/__pycache__/util_kafka.cpython-38.pyc differ diff --git a/util/concurrent/__init__.py b/util/concurrent/__init__.py new file mode 100644 index 0000000..196d378 --- /dev/null +++ b/util/concurrent/__init__.py @@ -0,0 +1 @@ +# This directory is a Python package. diff --git a/util/concurrent/__pycache__/__init__.cpython-37.pyc b/util/concurrent/__pycache__/__init__.cpython-37.pyc new file mode 100644 index 0000000..324fd91 Binary files /dev/null and b/util/concurrent/__pycache__/__init__.cpython-37.pyc differ diff --git a/util/concurrent/futures/__init__.py b/util/concurrent/futures/__init__.py new file mode 100644 index 0000000..8434fcf --- /dev/null +++ b/util/concurrent/futures/__init__.py @@ -0,0 +1,52 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Execute computations asynchronously using threads or processes.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +from concurrent.futures._base import (FIRST_COMPLETED, + FIRST_EXCEPTION, + ALL_COMPLETED, + CancelledError, + TimeoutError, + BrokenExecutor, + Future, + Executor, + wait, + as_completed) + +__all__ = ( + 'FIRST_COMPLETED', + 'FIRST_EXCEPTION', + 'ALL_COMPLETED', + 'CancelledError', + 'TimeoutError', + 'BrokenExecutor', + 'Future', + 'Executor', + 'wait', + 'as_completed', + 'ProcessPoolExecutor', + 'ThreadPoolExecutor', +) + + +def __dir__(): + return __all__ + ('__author__', '__doc__') + + +def __getattr__(name): + global ProcessPoolExecutor, ThreadPoolExecutor + + if name == 'ProcessPoolExecutor': + from .process import ProcessPoolExecutor as pe + ProcessPoolExecutor = pe + return pe + + if name == 'ThreadPoolExecutor': + from .thread import ThreadPoolExecutor as te + ThreadPoolExecutor = te + return te + + raise AttributeError(f"module {__name__} has no attribute {name}") diff --git a/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc b/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc new file mode 100644 index 0000000..c00c793 Binary files /dev/null and b/util/concurrent/futures/__pycache__/__init__.cpython-37.pyc differ diff --git a/util/concurrent/futures/__pycache__/_base.cpython-37.pyc b/util/concurrent/futures/__pycache__/_base.cpython-37.pyc new file mode 100644 index 0000000..2d048ba Binary files /dev/null and b/util/concurrent/futures/__pycache__/_base.cpython-37.pyc differ diff --git a/util/concurrent/futures/__pycache__/process.cpython-37.pyc b/util/concurrent/futures/__pycache__/process.cpython-37.pyc new file mode 100644 index 0000000..c7193a5 Binary files /dev/null and b/util/concurrent/futures/__pycache__/process.cpython-37.pyc differ diff --git a/util/concurrent/futures/__pycache__/thread.cpython-37.pyc b/util/concurrent/futures/__pycache__/thread.cpython-37.pyc new file mode 100644 index 0000000..5165f41 Binary files /dev/null and b/util/concurrent/futures/__pycache__/thread.cpython-37.pyc differ diff --git a/util/concurrent/futures/_base.py b/util/concurrent/futures/_base.py new file mode 100644 index 0000000..46d30a5 --- /dev/null +++ b/util/concurrent/futures/_base.py @@ -0,0 +1,630 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. 
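# NOTE: util/concurrent/futures is a vendored copy of CPython's concurrent.futures
# package; it appears to be carried here so that ThreadPoolExecutor can accept the
# max_queue_size argument that main.py passes, which the standard-library executor
# does not support.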
+ +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +import collections +import logging +import threading +import time + +FIRST_COMPLETED = 'FIRST_COMPLETED' +FIRST_EXCEPTION = 'FIRST_EXCEPTION' +ALL_COMPLETED = 'ALL_COMPLETED' +_AS_COMPLETED = '_AS_COMPLETED' + +# Possible future states (for internal use by the futures package). +PENDING = 'PENDING' +RUNNING = 'RUNNING' +# The future was cancelled by the user... +CANCELLED = 'CANCELLED' +# ...and _Waiter.add_cancelled() was called by a worker. +CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' +FINISHED = 'FINISHED' + +_FUTURE_STATES = [ + PENDING, + RUNNING, + CANCELLED, + CANCELLED_AND_NOTIFIED, + FINISHED +] + +_STATE_TO_DESCRIPTION_MAP = { + PENDING: "pending", + RUNNING: "running", + CANCELLED: "cancelled", + CANCELLED_AND_NOTIFIED: "cancelled", + FINISHED: "finished" +} + +# Logger for internal use by the futures package. +LOGGER = logging.getLogger("concurrent.futures") + +class Error(Exception): + """Base class for all future-related exceptions.""" + pass + +class CancelledError(Error): + """The Future was cancelled.""" + pass + +class TimeoutError(Error): + """The operation exceeded the given deadline.""" + pass + +class _Waiter(object): + """Provides the event that wait() and as_completed() block on.""" + def __init__(self): + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + + def add_exception(self, future): + self.finished_futures.append(future) + + def add_cancelled(self, future): + self.finished_futures.append(future) + +class _AsCompletedWaiter(_Waiter): + """Used by as_completed().""" + + def __init__(self): + super(_AsCompletedWaiter, self).__init__() + self.lock = threading.Lock() + + def add_result(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_result(future) + self.event.set() + + def add_exception(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_exception(future) + self.event.set() + + def add_cancelled(self, future): + with self.lock: + super(_AsCompletedWaiter, self).add_cancelled(future) + self.event.set() + +class _FirstCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_COMPLETED).""" + + def add_result(self, future): + super().add_result(future) + self.event.set() + + def add_exception(self, future): + super().add_exception(future) + self.event.set() + + def add_cancelled(self, future): + super().add_cancelled(future) + self.event.set() + +class _AllCompletedWaiter(_Waiter): + """Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" + + def __init__(self, num_pending_calls, stop_on_exception): + self.num_pending_calls = num_pending_calls + self.stop_on_exception = stop_on_exception + self.lock = threading.Lock() + super().__init__() + + def _decrement_pending_calls(self): + with self.lock: + self.num_pending_calls -= 1 + if not self.num_pending_calls: + self.event.set() + + def add_result(self, future): + super().add_result(future) + self._decrement_pending_calls() + + def add_exception(self, future): + super().add_exception(future) + if self.stop_on_exception: + self.event.set() + else: + self._decrement_pending_calls() + + def add_cancelled(self, future): + super().add_cancelled(future) + self._decrement_pending_calls() + +class _AcquireFutures(object): + """A context manager that does an ordered acquire of Future conditions.""" + + def __init__(self, futures): + self.futures = sorted(futures, key=id) + + def __enter__(self): + for future in self.futures: + 
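            # The futures were sorted by id() in __init__, so every caller acquires the
            # per-future condition locks in the same global order; this prevents
            # lock-ordering deadlocks when threads wait on overlapping sets of futures.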
future._condition.acquire() + + def __exit__(self, *args): + for future in self.futures: + future._condition.release() + +def _create_and_install_waiters(fs, return_when): + if return_when == _AS_COMPLETED: + waiter = _AsCompletedWaiter() + elif return_when == FIRST_COMPLETED: + waiter = _FirstCompletedWaiter() + else: + pending_count = sum( + f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) + + if return_when == FIRST_EXCEPTION: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) + elif return_when == ALL_COMPLETED: + waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) + else: + raise ValueError("Invalid return condition: %r" % return_when) + + for f in fs: + f._waiters.append(waiter) + + return waiter + + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + +def as_completed(fs, timeout=None): + """An iterator over the given futures that yields each as it completes. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + iterate over. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + + Returns: + An iterator that yields the given Futures as they complete (finished or + cancelled). If any given Futures are duplicated, they will be returned + once. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = set(fs) + total_futures = len(fs) + with _AcquireFutures(fs): + finished = set( + f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + pending = fs - finished + waiter = _create_and_install_waiters(fs, _AS_COMPLETED) + finished = list(finished) + try: + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs,)) + + while pending: + if timeout is None: + wait_timeout = None + else: + wait_timeout = end_time - time.monotonic() + if wait_timeout < 0: + raise TimeoutError( + '%d (of %d) futures unfinished' % ( + len(pending), total_futures)) + + waiter.event.wait(wait_timeout) + + with waiter.lock: + finished = waiter.finished_futures + waiter.finished_futures = [] + waiter.event.clear() + + # reverse to keep finishing order + finished.reverse() + yield from _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)) + + finally: + # Remove waiter from unfinished futures + for f in fs: + with f._condition: + f._waiters.remove(waiter) + +DoneAndNotDoneFutures = collections.namedtuple( + 'DoneAndNotDoneFutures', 'done not_done') +def wait(fs, timeout=None, return_when=ALL_COMPLETED): + """Wait for the futures in the given sequence to complete. + + Args: + fs: The sequence of Futures (possibly created by different Executors) to + wait upon. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + return_when: Indicates when this function should return. 
The options + are: + + FIRST_COMPLETED - Return when any future finishes or is + cancelled. + FIRST_EXCEPTION - Return when any future finishes by raising an + exception. If no future raises an exception + then it is equivalent to ALL_COMPLETED. + ALL_COMPLETED - Return when all futures finish or are cancelled. + + Returns: + A named 2-tuple of sets. The first set, named 'done', contains the + futures that completed (is finished or cancelled) before the wait + completed. The second set, named 'not_done', contains uncompleted + futures. + """ + with _AcquireFutures(fs): + done = set(f for f in fs + if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) + not_done = set(fs) - done + + if (return_when == FIRST_COMPLETED) and done: + return DoneAndNotDoneFutures(done, not_done) + elif (return_when == FIRST_EXCEPTION) and done: + if any(f for f in done + if not f.cancelled() and f.exception() is not None): + return DoneAndNotDoneFutures(done, not_done) + + if len(done) == len(fs): + return DoneAndNotDoneFutures(done, not_done) + + waiter = _create_and_install_waiters(fs, return_when) + + waiter.event.wait(timeout) + for f in fs: + with f._condition: + f._waiters.remove(waiter) + + done.update(waiter.finished_futures) + return DoneAndNotDoneFutures(done, set(fs) - done) + +class Future(object): + """Represents the result of an asynchronous computation.""" + + def __init__(self): + """Initializes the future. Should not be called by clients.""" + self._condition = threading.Condition() + self._state = PENDING + self._result = None + self._exception = None + self._waiters = [] + self._done_callbacks = [] + + def _invoke_callbacks(self): + for callback in self._done_callbacks: + try: + callback(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def __repr__(self): + with self._condition: + if self._state == FINISHED: + if self._exception: + return '<%s at %#x state=%s raised %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._exception.__class__.__name__) + else: + return '<%s at %#x state=%s returned %s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state], + self._result.__class__.__name__) + return '<%s at %#x state=%s>' % ( + self.__class__.__name__, + id(self), + _STATE_TO_DESCRIPTION_MAP[self._state]) + + def cancel(self): + """Cancel the future if possible. + + Returns True if the future was cancelled, False otherwise. A future + cannot be cancelled if it is running or has already completed. + """ + with self._condition: + if self._state in [RUNNING, FINISHED]: + return False + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + return True + + self._state = CANCELLED + self._condition.notify_all() + + self._invoke_callbacks() + return True + + def cancelled(self): + """Return True if the future was cancelled.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] + + def running(self): + """Return True if the future is currently executing.""" + with self._condition: + return self._state == RUNNING + + def done(self): + """Return True of the future was cancelled or finished executing.""" + with self._condition: + return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] + + def __get_result(self): + if self._exception: + raise self._exception + else: + return self._result + + def add_done_callback(self, fn): + """Attaches a callable that will be called when the future finishes. 
+ + Args: + fn: A callable that will be called with this future as its only + argument when the future completes or is cancelled. The callable + will always be called by a thread in the same process in which + it was added. If the future has already completed or been + cancelled then the callable will be called immediately. These + callables are called in the order that they were added. + """ + with self._condition: + if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: + self._done_callbacks.append(fn) + return + try: + fn(self) + except Exception: + LOGGER.exception('exception calling callback for %r', self) + + def result(self, timeout=None): + """Return the result of the call that the future represents. + + Args: + timeout: The number of seconds to wait for the result if the future + isn't done. If None, then there is no limit on the wait time. + + Returns: + The result of the call that the future represents. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + Exception: If the call raised then that exception will be raised. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self.__get_result() + else: + raise TimeoutError() + + def exception(self, timeout=None): + """Return the exception raised by the call that the future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception + else: + raise TimeoutError() + + # The following methods should only be used by Executors and in tests. + def set_running_or_notify_cancel(self): + """Mark the future as running or process any cancel notifications. + + Should only be used by Executor implementations and unit tests. + + If the future has been cancelled (cancel() was called and returned + True) then any threads waiting on the future completing (though calls + to as_completed() or wait()) are notified and False is returned. + + If the future was not cancelled then it is put in the running state + (future calls to running() will return True) and True is returned. + + This method should be called by Executor implementations before + executing the work associated with this future. If this method returns + False then the work should not be executed. + + Returns: + False if the Future was cancelled, True otherwise. + + Raises: + RuntimeError: if this method was already called or if set_result() + or set_exception() was called. 
+ """ + with self._condition: + if self._state == CANCELLED: + self._state = CANCELLED_AND_NOTIFIED + for waiter in self._waiters: + waiter.add_cancelled(self) + # self._condition.notify_all() is not necessary because + # self.cancel() triggers a notification. + return False + elif self._state == PENDING: + self._state = RUNNING + return True + else: + LOGGER.critical('Future %s in unexpected state: %s', + id(self), + self._state) + raise RuntimeError('Future in unexpected state') + + def set_result(self, result): + """Sets the return value of work associated with the future. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._result = result + self._state = FINISHED + for waiter in self._waiters: + waiter.add_result(self) + self._condition.notify_all() + self._invoke_callbacks() + + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + with self._condition: + self._exception = exception + self._state = FINISHED + for waiter in self._waiters: + waiter.add_exception(self) + self._condition.notify_all() + self._invoke_callbacks() + +class Executor(object): + """This is an abstract base class for concrete asynchronous executors.""" + + def submit(*args, **kwargs): + """Submits a callable to be executed with the given arguments. + + Schedules the callable to be executed as fn(*args, **kwargs) and returns + a Future instance representing the execution of the callable. + + Returns: + A Future representing the given call. + """ + if len(args) >= 2: + pass + elif not args: + raise TypeError("descriptor 'submit' of 'Executor' object " + "needs an argument") + elif 'fn' not in kwargs: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + + raise NotImplementedError() + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: The size of the chunks the iterable will be broken into + before being passed to a child process. This argument is only + used by ProcessPoolExecutor; it is ignored by + ThreadPoolExecutor. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if timeout is not None: + end_time = timeout + time.monotonic() + + fs = [self.submit(fn, *args) for args in zip(*iterables)] + + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + yield fs.pop().result() + else: + yield fs.pop().result(end_time - time.monotonic()) + finally: + for future in fs: + future.cancel() + return result_iterator() + + def shutdown(self, wait=True): + """Clean-up the resources associated with the Executor. + + It is safe to call this method several times. Otherwise, no other + methods can be called after this one. 
+ + Args: + wait: If True then shutdown will not return until all running + futures have finished executing and the resources used by the + executor have been reclaimed. + """ + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.shutdown(wait=True) + return False + + +class BrokenExecutor(RuntimeError): + """ + Raised when a executor has become non-functional after a severe failure. + """ diff --git a/util/concurrent/futures/process.py b/util/concurrent/futures/process.py new file mode 100644 index 0000000..9106552 --- /dev/null +++ b/util/concurrent/futures/process.py @@ -0,0 +1,699 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ProcessPoolExecutor. + +The follow diagram and text describe the data-flow through the system: + +|======================= In-process =====================|== Out-of-process ==| + ++----------+ +----------+ +--------+ +-----------+ +---------+ +| | => | Work Ids | | | | Call Q | | Process | +| | +----------+ | | +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | +| | | 7 | | | | ... | | | +| Process | | ... | | Local | +-----------+ | Process | +| Pool | +----------+ | Worker | | #1..n | +| Executor | | Thread | | | +| | +----------- + | | +-----------+ | | +| | <=> | Work Items | <=> | | <= | Result Q | <= | | +| | +------------+ | | +-----------+ | | +| | | 6: call() | | | | ... | | | +| | | future | | | | 4, result | | | +| | | ... | | | | 3, except | | | ++----------+ +------------+ +--------+ +-----------+ +---------+ + +Executor.submit() called: +- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict +- adds the id of the _WorkItem to the "Work Ids" queue + +Local worker thread: +- reads work ids from the "Work Ids" queue and looks up the corresponding + WorkItem from the "Work Items" dict: if the work item has been cancelled then + it is simply removed from the dict, otherwise it is repackaged as a + _CallItem and put in the "Call Q". New _CallItems are put in the "Call Q" + until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because + calls placed in the "Call Q" can no longer be cancelled with Future.cancel(). +- reads _ResultItems from "Result Q", updates the future stored in the + "Work Items" dict and deletes the dict entry + +Process #1..n: +- reads _CallItems from "Call Q", executes the calls, and puts the resulting + _ResultItems in "Result Q" +""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +import atexit +import os +from concurrent.futures import _base +import queue +from queue import Full +import multiprocessing as mp +from multiprocessing.connection import wait +from multiprocessing.queues import Queue +import threading +import weakref +from functools import partial +import itertools +import sys +import traceback + +# Workers are created as daemon threads and processes. This is done to allow the +# interpreter to exit when there are still idle processes in a +# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, +# allowing workers to die with the interpreter has two undesirable properties: +# - The workers would still be running during interpreter shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. 
+# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads/processes finish. + +_threads_wakeups = weakref.WeakKeyDictionary() +_global_shutdown = False + + +class _ThreadWakeup: + def __init__(self): + self._reader, self._writer = mp.Pipe(duplex=False) + + def close(self): + self._writer.close() + self._reader.close() + + def wakeup(self): + self._writer.send_bytes(b"") + + def clear(self): + while self._reader.poll(): + self._reader.recv_bytes() + + +def _python_exit(): + global _global_shutdown + _global_shutdown = True + items = list(_threads_wakeups.items()) + for _, thread_wakeup in items: + thread_wakeup.wakeup() + for t, _ in items: + t.join() + +# Controls how many more calls than processes will be queued in the call queue. +# A smaller number will mean that processes spend more time idle waiting for +# work while a larger number will make Future.cancel() succeed less frequently +# (Futures in the call queue cannot be cancelled). +EXTRA_QUEUED_CALLS = 1 + + +# On Windows, WaitForMultipleObjects is used to wait for processes to finish. +# It can wait on, at most, 63 objects. There is an overhead of two objects: +# - the result queue reader +# - the thread wakeup reader +_MAX_WINDOWS_WORKERS = 63 - 2 + +# Hack to embed stringification of remote traceback in local traceback + +class _RemoteTraceback(Exception): + def __init__(self, tb): + self.tb = tb + def __str__(self): + return self.tb + +class _ExceptionWithTraceback: + def __init__(self, exc, tb): + tb = traceback.format_exception(type(exc), exc, tb) + tb = ''.join(tb) + self.exc = exc + self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): + return _rebuild_exc, (self.exc, self.tb) + +def _rebuild_exc(exc, tb): + exc.__cause__ = _RemoteTraceback(tb) + return exc + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + +class _ResultItem(object): + def __init__(self, work_id, exception=None, result=None): + self.work_id = work_id + self.exception = exception + self.result = result + +class _CallItem(object): + def __init__(self, work_id, fn, args, kwargs): + self.work_id = work_id + self.fn = fn + self.args = args + self.kwargs = kwargs + + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated. In this case, + # the queue_manager_thread fails all work_items with BrokenProcessPool + if work_item is not None: + work_item.future.set_exception(e) + else: + super()._on_queue_feeder_error(e, obj) + + +def _get_chunks(*iterables, chunksize): + """ Iterates over zip()ed iterables in chunks. """ + it = zip(*iterables) + while True: + chunk = tuple(itertools.islice(it, chunksize)) + if not chunk: + return + yield chunk + +def _process_chunk(fn, chunk): + """ Processes a chunk of an iterable passed to map. + + Runs the function passed to map() on a chunk of the + iterable passed to map. 
+ + This function is run in a separate process. + + """ + return [fn(*args) for args in chunk] + + +def _sendback_result(result_queue, work_id, result=None, exception=None): + """Safely send back the given result or exception""" + try: + result_queue.put(_ResultItem(work_id, result=result, + exception=exception)) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(work_id, exception=exc)) + + +def _process_worker(call_queue, result_queue, initializer, initargs): + """Evaluates calls from call_queue and places the results in result_queue. + + This worker is run in a separate process. + + Args: + call_queue: A ctx.Queue of _CallItems that will be read and + evaluated by the worker. + result_queue: A ctx.Queue of _ResultItems that will written + to by the worker. + initializer: A callable initializer, or None + initargs: A tuple of args for the initializer + """ + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + # The parent will notice that the process stopped and + # mark the pool broken + return + while True: + call_item = call_queue.get(block=True) + if call_item is None: + # Wake up queue management thread + result_queue.put(os.getpid()) + return + try: + r = call_item.fn(*call_item.args, **call_item.kwargs) + except BaseException as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + _sendback_result(result_queue, call_item.work_id, exception=exc) + else: + _sendback_result(result_queue, call_item.work_id, result=r) + + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + + +def _add_call_item_to_queue(pending_work_items, + work_ids, + call_queue): + """Fills call_queue with _WorkItems from pending_work_items. + + This function never blocks. + + Args: + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids + are consumed and the corresponding _WorkItems from + pending_work_items are transformed into _CallItems and put in + call_queue. + call_queue: A multiprocessing.Queue that will be filled with _CallItems + derived from _WorkItems. + """ + while True: + if call_queue.full(): + return + try: + work_id = work_ids.get(block=False) + except queue.Empty: + return + else: + work_item = pending_work_items[work_id] + + if work_item.future.set_running_or_notify_cancel(): + call_queue.put(_CallItem(work_id, + work_item.fn, + work_item.args, + work_item.kwargs), + block=True) + else: + del pending_work_items[work_id] + continue + + +def _queue_management_worker(executor_reference, + processes, + pending_work_items, + work_ids_queue, + call_queue, + result_queue, + thread_wakeup): + """Manages the communication between this process and the worker processes. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + process: A list of the ctx.Process instances used as + workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). 
+ call_queue: A ctx.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + result_queue: A ctx.SimpleQueue of _ResultItems generated by the + process workers. + thread_wakeup: A _ThreadWakeup to allow waking up the + queue_manager_thread from the main Thread and avoid deadlocks + caused by permanently locked queues. + """ + executor = None + + def shutting_down(): + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + def shutdown_worker(): + # This is an upper bound on the number of children alive. + n_children_alive = sum(p.is_alive() for p in processes.values()) + n_children_to_stop = n_children_alive + n_sentinels_sent = 0 + # Send the right number of sentinels, to make sure all children are + # properly terminated. + while n_sentinels_sent < n_children_to_stop and n_children_alive > 0: + for i in range(n_children_to_stop - n_sentinels_sent): + try: + call_queue.put_nowait(None) + n_sentinels_sent += 1 + except Full: + break + n_children_alive = sum(p.is_alive() for p in processes.values()) + + # Release the queue's resources as soon as possible. + call_queue.close() + # If .join() is not called on the created processes then + # some ctx.Queue methods may deadlock on Mac OS X. + for p in processes.values(): + p.join() + + result_reader = result_queue._reader + wakeup_reader = thread_wakeup._reader + readers = [result_reader, wakeup_reader] + + while True: + _add_call_item_to_queue(pending_work_items, + work_ids_queue, + call_queue) + + # Wait for a result to be ready in the result_queue while checking + # that all worker processes are still running, or for a wake up + # signal send. The wake up signals come either from new tasks being + # submitted, from the executor being shutdown/gc-ed, or from the + # shutdown of the python interpreter. + worker_sentinels = [p.sentinel for p in processes.values()] + ready = wait(readers + worker_sentinels) + + cause = None + is_broken = True + if result_reader in ready: + try: + result_item = result_reader.recv() + is_broken = False + except BaseException as e: + cause = traceback.format_exception(type(e), e, e.__traceback__) + + elif wakeup_reader in ready: + is_broken = False + result_item = None + thread_wakeup.clear() + if is_broken: + # Mark the process pool broken so that submits fail right now. + executor = executor_reference() + if executor is not None: + executor._broken = ('A child process terminated ' + 'abruptly, the process pool is not ' + 'usable anymore') + executor._shutdown_thread = True + executor = None + bpe = BrokenProcessPool("A process in the process pool was " + "terminated abruptly while the future was " + "running or pending.") + if cause is not None: + bpe.__cause__ = _RemoteTraceback( + f"\n'''\n{''.join(cause)}'''") + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception(bpe) + # Delete references to object. See issue16284 + del work_item + pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. 
+ for p in processes.values(): + p.terminate() + shutdown_worker() + return + if isinstance(result_item, int): + # Clean shutdown of a worker using its PID + # (avoids marking the executor broken) + assert shutting_down() + p = processes.pop(result_item) + p.join() + if not processes: + shutdown_worker() + return + elif result_item is not None: + work_item = pending_work_items.pop(result_item.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + if result_item.exception: + work_item.future.set_exception(result_item.exception) + else: + work_item.future.set_result(result_item.result) + # Delete references to object. See issue16284 + del work_item + # Delete reference to result_item + del result_item + + # Check whether we should start shutting down. + executor = executor_reference() + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that owns this worker has been collected OR + # - The executor that owns this worker has been shutdown. + if shutting_down(): + try: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. + if executor is not None: + executor._shutdown_thread = True + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_worker() + return + except Full: + # This is not a problem: we will eventually be woken up (in + # result_queue.get()) and be able to send a sentinel again. + pass + executor = None + + +_system_limits_checked = False +_system_limited = None + + +def _check_system_limits(): + global _system_limits_checked, _system_limited + if _system_limits_checked: + if _system_limited: + raise NotImplementedError(_system_limited) + _system_limits_checked = True + try: + nsems_max = os.sysconf("SC_SEM_NSEMS_MAX") + except (AttributeError, ValueError): + # sysconf not available or setting not available + return + if nsems_max == -1: + # indetermined limit, assume that limit is determined + # by available memory only + return + if nsems_max >= 256: + # minimum number of semaphores available + # according to POSIX + return + _system_limited = ("system provides too few semaphores (%d" + " available, 256 necessary)" % nsems_max) + raise NotImplementedError(_system_limited) + + +def _chain_from_iterable_of_lists(iterable): + """ + Specialized implementation of itertools.chain.from_iterable. + Each item in *iterable* should be a list. This function is + careful not to keep references to yielded objects. + """ + for element in iterable: + element.reverse() + while element: + yield element.pop() + + +class BrokenProcessPool(_base.BrokenExecutor): + """ + Raised when a process in a ProcessPoolExecutor terminated abruptly + while a future was in the running state. + """ + + +class ProcessPoolExecutor(_base.Executor): + def __init__(self, max_workers=None, mp_context=None, + initializer=None, initargs=()): + """Initializes a new ProcessPoolExecutor instance. + + Args: + max_workers: The maximum number of processes that can be used to + execute the given calls. If None or not given then as many + worker processes will be created as the machine has processors. + mp_context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. + initializer: A callable used to initialize worker processes. + initargs: A tuple of arguments to pass to the initializer. 
+ """ + _check_system_limits() + + if max_workers is None: + self._max_workers = os.cpu_count() or 1 + if sys.platform == 'win32': + self._max_workers = min(_MAX_WINDOWS_WORKERS, + self._max_workers) + else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + elif (sys.platform == 'win32' and + max_workers > _MAX_WINDOWS_WORKERS): + raise ValueError( + f"max_workers must be <= {_MAX_WINDOWS_WORKERS}") + + self._max_workers = max_workers + + if mp_context is None: + mp_context = mp.get_context() + self._mp_context = mp_context + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + self._initializer = initializer + self._initargs = initargs + + # Management thread + self._queue_management_thread = None + + # Map of pids to processes + self._processes = {} + + # Shutdown is a two-step process. + self._shutdown_thread = False + self._shutdown_lock = threading.Lock() + self._broken = False + self._queue_count = 0 + self._pending_work_items = {} + + # Create communication channels for the executor + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, ctx=self._mp_context, + pending_work_items=self._pending_work_items) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + self._result_queue = mp_context.SimpleQueue() + self._work_ids = queue.Queue() + + # _ThreadWakeup is a communication channel used to interrupt the wait + # of the main loop of queue_manager_thread from another thread (e.g. + # when calling executor.submit or executor.shutdown). We do not use the + # _result_queue to send the wakeup signal to the queue_manager_thread + # as it could result in a deadlock if a worker process dies with the + # _result_queue write lock still acquired. + self._queue_management_thread_wakeup = _ThreadWakeup() + + def _start_queue_management_thread(self): + if self._queue_management_thread is None: + # When the executor gets garbarge collected, the weakref callback + # will wake up the queue management thread so that it can terminate + # if there is no pending work item. + def weakref_cb(_, + thread_wakeup=self._queue_management_thread_wakeup): + mp.util.debug('Executor collected: triggering callback for' + ' QueueManager wakeup') + thread_wakeup.wakeup() + # Start the processes so that their sentinels are known. 
+ self._adjust_process_count() + self._queue_management_thread = threading.Thread( + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue, + self._queue_management_thread_wakeup), + name="QueueManagerThread") + self._queue_management_thread.daemon = True + self._queue_management_thread.start() + _threads_wakeups[self._queue_management_thread] = \ + self._queue_management_thread_wakeup + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = self._mp_context.Process( + target=_process_worker, + args=(self._call_queue, + self._result_queue, + self._initializer, + self._initargs)) + p.start() + self._processes[p.pid] = p + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ProcessPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args)-1)) + + with self._shutdown_lock: + if self._broken: + raise BrokenProcessPool(self._broken) + if self._shutdown_thread: + raise RuntimeError('cannot schedule new futures after shutdown') + if _global_shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._pending_work_items[self._queue_count] = w + self._work_ids.put(self._queue_count) + self._queue_count += 1 + # Wake up queue management thread + self._queue_management_thread_wakeup.wakeup() + + self._start_queue_management_thread() + return f + submit.__doc__ = _base.Executor.submit.__doc__ + + def map(self, fn, *iterables, timeout=None, chunksize=1): + """Returns an iterator equivalent to map(fn, iter). + + Args: + fn: A callable that will take as many arguments as there are + passed iterables. + timeout: The maximum number of seconds to wait. If None, then there + is no limit on the wait time. + chunksize: If greater than one, the iterables will be chopped into + chunks of size chunksize and submitted to the process pool. + If set to one, the items in the list will be sent one at a time. + + Returns: + An iterator equivalent to: map(func, *iterables) but the calls may + be evaluated out-of-order. + + Raises: + TimeoutError: If the entire result iterator could not be generated + before the given timeout. + Exception: If fn(*args) raises for any values. + """ + if chunksize < 1: + raise ValueError("chunksize must be >= 1.") + + results = super().map(partial(_process_chunk, fn), + _get_chunks(*iterables, chunksize=chunksize), + timeout=timeout) + return _chain_from_iterable_of_lists(results) + + def shutdown(self, wait=True): + with self._shutdown_lock: + self._shutdown_thread = True + if self._queue_management_thread: + # Wake up queue management thread + self._queue_management_thread_wakeup.wakeup() + if wait: + self._queue_management_thread.join() + # To reduce the risk of opening too many files, remove references to + # objects that use file descriptors. 
+ self._queue_management_thread = None + if self._call_queue is not None: + self._call_queue.close() + if wait: + self._call_queue.join_thread() + self._call_queue = None + self._result_queue = None + self._processes = None + + if self._queue_management_thread_wakeup: + self._queue_management_thread_wakeup.close() + self._queue_management_thread_wakeup = None + + shutdown.__doc__ = _base.Executor.shutdown.__doc__ + +atexit.register(_python_exit) diff --git a/util/concurrent/futures/thread.py b/util/concurrent/futures/thread.py new file mode 100644 index 0000000..c773c24 --- /dev/null +++ b/util/concurrent/futures/thread.py @@ -0,0 +1,226 @@ +# Copyright 2009 Brian Quinlan. All Rights Reserved. +# Licensed to PSF under a Contributor Agreement. + +"""Implements ThreadPoolExecutor.""" + +__author__ = 'Brian Quinlan (brian@sweetapp.com)' + +import atexit +from concurrent.futures import _base +import itertools +import queue +import threading +import weakref +import os + +# Workers are created as daemon threads. This is done to allow the interpreter +# to exit when there are still idle threads in a ThreadPoolExecutor's thread +# pool (i.e. shutdown() was not called). However, allowing workers to die with +# the interpreter has two undesirable properties: +# - The workers would still be running during interpreter shutdown, +# meaning that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could +# be bad if the callable being evaluated has external side-effects e.g. +# writing to a file. +# +# To work around this problem, an exit handler is installed which tells the +# workers to exit when their work queues are empty and then waits until the +# threads finish. + +_threads_queues = weakref.WeakKeyDictionary() +_shutdown = False + + +def _python_exit(): + global _shutdown + _shutdown = True + items = list(_threads_queues.items()) + for t, q in items: + q.put(None) + for t, q in items: + t.join() + + +atexit.register(_python_exit) + + +class _WorkItem(object): + def __init__(self, future, fn, args, kwargs): + self.future = future + self.fn = fn + self.args = args + self.kwargs = kwargs + + def run(self): + if not self.future.set_running_or_notify_cancel(): + return + + try: + result = self.fn(*self.args, **self.kwargs) + except BaseException as exc: + self.future.set_exception(exc) + # Break a reference cycle with the exception 'exc' + self = None + else: + self.future.set_result(result) + + +def _worker(executor_reference, work_queue, initializer, initargs): + if initializer is not None: + try: + initializer(*initargs) + except BaseException: + _base.LOGGER.critical('Exception in initializer:', exc_info=True) + executor = executor_reference() + if executor is not None: + executor._initializer_failed() + return + try: + while True: + work_item = work_queue.get(block=True) + if work_item is not None: + work_item.run() + # Delete references to object. See issue16284 + del work_item + continue + executor = executor_reference() + # Exit if: + # - The interpreter is shutting down OR + # - The executor that owns the worker has been collected OR + # - The executor that owns the worker has been shutdown. + if _shutdown or executor is None or executor._shutdown: + # Flag the executor as shutting down as early as possible if it + # is not gc-ed yet. 
+ if executor is not None: + executor._shutdown = True + # Notice other workers + work_queue.put(None) + return + del executor + except BaseException: + _base.LOGGER.critical('Exception in worker', exc_info=True) + + +class BrokenThreadPool(_base.BrokenExecutor): + """ + Raised when a worker thread in a ThreadPoolExecutor failed initializing. + """ + + +class ThreadPoolExecutor(_base.Executor): + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().__next__ + + def __init__(self, max_workers=None, thread_name_prefix='', + initializer=None, initargs=(), max_queue_size=None): + """Initializes a new ThreadPoolExecutor instance. + + Args: + max_workers: The maximum number of threads that can be used to + execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. + initializer: A callable used to initialize worker threads. + initargs: A tuple of arguments to pass to the initializer. + """ + if max_workers is None: + # Use this number because ThreadPoolExecutor is often + # used to overlap I/O instead of CPU work. + max_workers = (os.cpu_count() or 1) * 5 + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + + if initializer is not None and not callable(initializer): + raise TypeError("initializer must be a callable") + + self._max_workers = max_workers + if not max_queue_size: + self._work_queue = queue.Queue() + else: + self._work_queue = queue.Queue(self._max_workers * 2) + # print(max_queue_size) + self._threads = set() + self._broken = False + self._shutdown = False + self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) + self._initializer = initializer + self._initargs = initargs + + def submit(*args, **kwargs): + if len(args) >= 2: + self, fn, *args = args + elif not args: + raise TypeError("descriptor 'submit' of 'ThreadPoolExecutor' object " + "needs an argument") + elif 'fn' in kwargs: + fn = kwargs.pop('fn') + self, *args = args + else: + raise TypeError('submit expected at least 1 positional argument, ' + 'got %d' % (len(args) - 1)) + + with self._shutdown_lock: + if self._broken: + raise BrokenThreadPool(self._broken) + + if self._shutdown: + raise RuntimeError('cannot schedule new futures after shutdown') + if _shutdown: + raise RuntimeError('cannot schedule new futures after ' + 'interpreter shutdown') + + f = _base.Future() + w = _WorkItem(f, fn, args, kwargs) + + self._work_queue.put(w) + self._adjust_thread_count() + return f + + submit.__doc__ = _base.Executor.submit.__doc__ + + def _adjust_thread_count(self): + # When the executor gets lost, the weakref callback will wake up + # the worker threads. + def weakref_cb(_, q=self._work_queue): + q.put(None) + + # TODO(bquinlan): Should avoid creating new threads if there are more + # idle threads than items in the work queue. 
+        num_threads = len(self._threads)
+        # print("num_threads", num_threads)
+        if num_threads < self._max_workers:
+            thread_name = '%s_%d' % (self._thread_name_prefix or self,
+                                     num_threads)
+            t = threading.Thread(name=thread_name, target=_worker,
+                                 args=(weakref.ref(self, weakref_cb),
+                                       self._work_queue,
+                                       self._initializer,
+                                       self._initargs))
+            t.daemon = True
+            t.start()
+            self._threads.add(t)
+            # _threads_queues[t] = self._work_queue
+
+    def _initializer_failed(self):
+        with self._shutdown_lock:
+            self._broken = ('A thread initializer failed, the thread pool '
+                            'is not usable anymore')
+            # Drain work queue and mark pending futures failed
+            while True:
+                try:
+                    work_item = self._work_queue.get_nowait()
+                except queue.Empty:
+                    break
+                if work_item is not None:
+                    work_item.future.set_exception(BrokenThreadPool(self._broken))
+
+    def shutdown(self, wait=True):
+        with self._shutdown_lock:
+            self._shutdown = True
+            self._work_queue.put(None)
+        if wait:
+            for t in self._threads:
+                t.join()
+
+    shutdown.__doc__ = _base.Executor.shutdown.__doc__
diff --git a/util/get_cert.py b/util/get_cert.py
new file mode 100644
index 0000000..2bfb60c
--- /dev/null
+++ b/util/get_cert.py
@@ -0,0 +1,118 @@
+import time
+from hyper import HTTP20Connection, HTTP11Connection
+from util.thread_timeout import time_limited
+from main import TIMEOUT, UN_CONNECTED, CONNECTED_HTTP1_1, CONNECTED_HTTP2_0, NOT_SUPPORT_FOR_HTTP2, \
+    make_doh_request_body, CERTIFICATE_VERIFY_FAILED
+import threading
+from util.util_http import make_ssl_context
+import ssl, socket, sys, csv
+from hyper.http20.exceptions import ConnectionError
+from concurrent.futures import ThreadPoolExecutor
+import os
+
+root_dir = sys.path[0]
+
+# %Y-%m-%d %H:%M:%S
+datetime_str = time.strftime('%y%m%d%H', time.localtime(time.time()))
+save_dir = "./verify"
+if not os.path.exists(save_dir):
+    os.mkdir(save_dir)
+save_file = os.path.join(root_dir, save_dir, f"{datetime_str}.csv")
+# f_verify = csv.writer(open(f"./verify/{datetime_str}.csv", "w", newline=""))
+f = open(save_file, "w", newline="")
+f_verify = csv.writer(f)
+# csv.writer(f_verify)
+
+lock = threading.Lock()
+
+body = make_doh_request_body()
+
+
+@time_limited(TIMEOUT * 10)
+def doh_verify(ip, port):
+    # print(ip, port, host, path, method)
+    verify_mode = ssl.CERT_OPTIONAL
+
+    ctx_2_0 = make_ssl_context()
+    ctx_2_0.set_alpn_protocols(['h2'])
+    # ctx_2_0.verify_mode = ssl.CERT_NONE
+    conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+    conn_status = UN_CONNECTED
+    try:
+        conn.connect()
+        conn_status = CONNECTED_HTTP2_0
+        # print("HTTP/2 connection established successfully!")
+    except (AssertionError, TimeoutError, FileNotFoundError, ConnectionResetError, ConnectionRefusedError, OSError,
+            ssl.SSLCertVerificationError, ssl.SSLError,
+            socket.timeout,
+            ConnectionError,  # hyper error
+            ) as e:
+        if e.__str__().startswith(NOT_SUPPORT_FOR_HTTP2):  # expected error: server does not speak HTTP/2, fall back to HTTP/1.1
+            # print("Connection failed! Server does not support HTTP/2, falling back to HTTP/1.1")
+            ...
+        elif e.__str__().startswith(CERTIFICATE_VERIFY_FAILED):  # certificate error: retry with the ssl context set to CERT_NONE
+            # print("Connection failed because of the certificate, setting the ssl context to CERT_NONE")
+            ctx_2_0.verify_mode = ssl.CERT_NONE
+            verify_mode = ssl.CERT_NONE
+            conn = HTTP20Connection(ip, port=port, ssl_context=ctx_2_0)
+            try:
+                conn.connect()
+                conn_status = CONNECTED_HTTP2_0
+                # print("HTTP/2 connection established successfully!")
+            except (
+                    AssertionError, TimeoutError, FileNotFoundError, ConnectionResetError, ConnectionRefusedError,
+                    OSError,
+                    ssl.SSLCertVerificationError, ssl.SSLError,
+                    socket.timeout,
+                    ConnectionError,  # hyper error
+            ) as e:
+                # print("Connection failed! Server does not support HTTP/2, falling back to HTTP/1.1")
+                ...
+        else:
+            # print("Connection failed! Error:", e)
+            return False
+
+    if conn_status == UN_CONNECTED:
+        ctx_1_1 = make_ssl_context()
+        ctx_1_1.verify_mode = verify_mode
+        conn = HTTP11Connection(ip, port=port, ssl_context=ctx_1_1)
+        try:
+            conn.connect()
+            conn_status = CONNECTED_HTTP1_1
+            # print("HTTP/1.1 connection established successfully!")
+        except (TimeoutError, ConnectionRefusedError, FileNotFoundError, ConnectionAbortedError,
+                ssl.SSLCertVerificationError, ssl.SSLError,
+                socket.timeout, OSError
+                ) as e:  # give up if the HTTP/1.1 connection cannot be established either
+            # print("Connection failed! Error:", e)
+            return False
+
+    try:
+        cert = conn._sock.getpeercert()
+    except AttributeError:
+        cert = None
+    lock.acquire()
+    try:
+        f_verify.writerow([ip, cert.__str__()])
+    except UnicodeEncodeError:
+        print(ip, cert.__str__())
+    lock.release()
+
+
+def run_with_thread_pool(max_workers, data_group, fn):
+    # print("Before ThreadPool!")
+    with ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="") as threadPool:
+        for data in data_group:
+            threadPool.submit(fn, *data)
+
+
+if __name__ == '__main__':
+    data_group = []
+    with open("./suspect/no_cert_ips.csv") as fp:
+        csv_reader = csv.reader(fp)
+        for row in csv_reader:
+            ip = row[0]
+            data_group.append((ip, 443))
+    print(len(data_group))
+    run_with_thread_pool(64, data_group, doh_verify)
+    f.close()
diff --git a/util/thread_timeout.py b/util/thread_timeout.py
new file mode 100644
index 0000000..9a2637c
--- /dev/null
+++ b/util/thread_timeout.py
@@ -0,0 +1,65 @@
+from threading import Thread
+import inspect
+import ctypes
+from functools import wraps
+
+
+def _async_raise(tid, exctype):
+    """raises the exception, performs cleanup if needed"""
+    tid = ctypes.c_long(tid)
+    if not inspect.isclass(exctype):
+        exctype = type(exctype)
+    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(exctype))
+    if res == 0:
+        raise ValueError("invalid thread id")
+    elif res != 1:
+        # """if it returns a number greater than one, you're in trouble,
+        # and you should call it again with exc=NULL to revert the effect"""
+        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
+        raise SystemError("PyThreadState_SetAsyncExc failed")
+
+
+def stop_thread(thread):
+    _async_raise(thread.ident, SystemExit)
+
+
+class TimeoutException(Exception):
+    pass
+
+
+ThreadStop = stop_thread
+
+
+def time_limited(timeout):
+    def decorator(function):
+        @wraps(function)
+        def wrapped_function(*args, **kwargs):
+            class TimeLimited(Thread):
+                def __init__(self):
+                    Thread.__init__(self)
+                    self.error = None
+                    self.result = None
+
+                def run(self):
+                    try:
+                        self.result = function(*args, **kwargs)
+                    except Exception as e:
+                        # remember the exception so the caller can re-raise it
+                        self.error = e
+
+                def stop(self):
+                    if self.is_alive():
+                        ThreadStop(self)
+
+            t = TimeLimited()
+            t.start()
+            t.join(timeout)
+            # re-raise any exception captured inside the worker thread
+            if t.error is not None:
+                raise t.error
+            if t.is_alive():
+                t.stop()
+                raise TimeoutException('timeout for %s' %
(repr(function))) + if t.error is None: + return t.result + + return wrapped_function + + return decorator diff --git a/util/util_http.py b/util/util_http.py new file mode 100644 index 0000000..853b351 --- /dev/null +++ b/util/util_http.py @@ -0,0 +1,45 @@ +from dns.message import make_query, from_wire +import ssl + + +def make_ssl_context(check_cert=False): + ssl_ctx = ssl.create_default_context() + ssl_ctx.check_hostname = False + if check_cert: + ssl_ctx.verify_mode = ssl.CERT_REQUIRED + else: + # ssl_ctx.verify_mode = ssl.CERT_NONE + ssl_ctx.verify_mode = ssl.CERT_OPTIONAL + # ssl_ctx.minimum_version = ssl.TLSVersion.TLSv1_1 + return ssl_ctx + + +def build_dns_query(qname="example.com", rtype="A"): + dns_query = make_query( + qname=qname, + rdtype=rtype, + want_dnssec=False, + ) + return dns_query + + +def get_domain_from_cert(cert): + cert_info_domain_related = [] + + if not cert: + return cert_info_domain_related + + for key, value in cert.items(): + if key == "subject": + for (sub_key, sub_value), in value: + if sub_key == "commonName": + # print(sub_value) + cert_info_domain_related.append(sub_value) + if key == "subjectAltName": + for (sub_key, sub_value) in value: + cert_info_domain_related.append(sub_value) + return list(set(cert_info_domain_related)) + +# def build_dns_query_wireformat(qname="example.com", rtype="A"): +# dns_query = build_dns_query() +# return dns_query.to_wire() diff --git a/util/util_kafka.py b/util/util_kafka.py new file mode 100644 index 0000000..07e5302 --- /dev/null +++ b/util/util_kafka.py @@ -0,0 +1,27 @@ +from kafka import KafkaProducer, KafkaConsumer +import json + +kafka_topic_suc = 'sucInfo' +kafka_topic_fail = 'failInfo' +kafka_topic_cert = 'certInfo' +broker_list = ['0.kafka.my:9092', '1.kafka.my:9092'] + +seed_kafka_topic = "seed" +seed_consumer_group_id = "hyConsumer" +seed_broker_list = ["172.19.0.6:9092"] + + +def make_kafka_producer(): + return KafkaProducer( + bootstrap_servers=broker_list, + value_serializer=lambda v: json.dumps(v).encode('utf-8'), + ) + + +def make_seed_consumer(): + return KafkaConsumer(seed_kafka_topic, + group_id=seed_consumer_group_id, + auto_commit_interval_ms=1000, + bootstrap_servers=seed_broker_list, + value_deserializer=lambda v: json.loads(v.decode('utf-8')) + ) -- cgit v1.2.3
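
Usage sketch (illustrative only, not code from the patch): the pieces added above are meant to be combined by decorating a per-host probe with util.thread_timeout.time_limited and submitting it to the vendored util.concurrent.futures.ThreadPoolExecutor, mirroring how get_cert.py drives doh_verify. probe(), TARGETS and the 5-second budget below are placeholders; per the __init__ shown above, any truthy max_queue_size switches the executor to a work queue bounded at 2 * max_workers, so submit() blocks and applies back-pressure instead of queueing without limit.

# sketch.py -- hypothetical caller built only from helpers added in this patch
from util.concurrent.futures import ThreadPoolExecutor
from util.thread_timeout import time_limited

TARGETS = [("8.8.8.8", 443), ("1.1.1.1", 443)]   # placeholder seed list


@time_limited(5)          # kill the probe thread and raise TimeoutException after ~5s
def probe(ip, port):
    ...                   # connect, send a DoH query, record the result


if __name__ == '__main__':
    # truthy max_queue_size -> bounded work queue, submit() blocks when it is full
    with ThreadPoolExecutor(max_workers=32, max_queue_size=1) as pool:
        for ip, port in TARGETS:
            pool.submit(probe, ip, port)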
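Similarly, get_domain_from_cert in util/util_http.py walks the nested-tuple dict returned by ssl's getpeercert(): 'subject' is a tuple of RDN tuples of (key, value) pairs, while 'subjectAltName' is a flat tuple of (type, value) pairs. A small illustration with made-up hostnames:

from util.util_http import get_domain_from_cert

# minimal stand-in for ssl.SSLSocket.getpeercert() output (hostnames are invented)
cert = {
    'subject': ((('commonName', 'doh.example'),),),
    'subjectAltName': (('DNS', 'doh.example'), ('DNS', '*.doh.example')),
}
print(get_domain_from_cert(cert))
# -> ['doh.example', '*.doh.example']  (deduplicated via set(), so order may vary)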