summaryrefslogtreecommitdiff
path: root/detection
diff options
context:
space:
mode:
author尹姜谊 <[email protected]>2024-01-23 10:12:59 +0800
committer尹姜谊 <[email protected]>2024-01-23 10:12:59 +0800
commit6ba8db34d295086a294c15f888b6ec0a928e87f4 (patch)
tree141adfd9ace9af8f4e69045355edcbb42e798aa5 /detection
parente9188b4443008917e71b81cd5221346af809cf8c (diff)
Add: ActiveObtainer
Diffstat (limited to 'detection')
-rw-r--r--detection/tool/ActiveObtainer.py252
-rw-r--r--detection/tool/Config.py18
-rw-r--r--detection/tool/Functions.py79
-rw-r--r--detection/tool/__pycache__/Config.cpython-39.pycbin1190 -> 1513 bytes
4 files changed, 346 insertions, 3 deletions
diff --git a/detection/tool/ActiveObtainer.py b/detection/tool/ActiveObtainer.py
new file mode 100644
index 0000000..56c00e0
--- /dev/null
+++ b/detection/tool/ActiveObtainer.py
@@ -0,0 +1,252 @@
+import argparse
+import subprocess
+import os
+import requests
+import json
+import csv
+import ipaddress
+
+
+
+def get_asn_ip_ranges(asn):
+ """
+ Get IP ranges for a given ASN using RIPEstat API.
+
+ Args:
+ asn (str): The Autonomous System Number (ASN).
+
+ Returns:
+ list: A list of IP ranges for the given ASN.
+ """
+ url = f"https://stat.ripe.net/data/announced-prefixes/data.json?resource=AS{asn}"
+ response = requests.get(url)
+ data = response.json()
+
+ ip_ranges = []
+ if "data" in data and "prefixes" in data["data"]:
+ for item in data["data"]["prefixes"]:
+ ip_ranges.append(item["prefix"])
+ # 创建一个空列表来存储仅 IPv4 地址
+ ipv4_list = []
+
+ for ip in ip_ranges:
+ try:
+ # 尝试将字符串解析为 IP 网络地址
+ network = ipaddress.ip_network(ip, strict=False)
+ # 检查是否为 IPv4 地址
+ if type(network) is ipaddress.IPv4Network:
+ ipv4_list.append(ip)
+ except ValueError:
+ # 如果地址无效,跳过此地址
+ continue
+ return ipv4_list
+
+
+def run_masscan(ip_list_path, ports, rate='10000', output_path='data/tmp/masscan_output.json'):
+ # 将端口列表转换为逗号分隔的字符串
+ # ports = ','.join(str(port) for port in ports)
+
+ # 设置 Masscan 命令
+ masscan_cmd = [
+ 'masscan',
+ '-p', ports,
+ '--rate', rate,
+ '-iL', ip_list_path, # 使用文件路径
+ '--wait', '5',
+ '-oJ', output_path # 输出为 JSON 格式
+ ]
+
+ # 执行 Masscan 扫描
+ print(f"Running masscan with command: {' '.join(masscan_cmd)}")
+ subprocess.run(masscan_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ with open(output_path, 'r') as file:
+ data = json.load(file)
+ port_ip_dict={}
+
+ # 遍历 JSON 数据中的每个条目
+ for entry in data:
+ ip = entry['ip']
+ for port_info in entry['ports']:
+ port = port_info['port']
+ # 将 IP 地址添加到对应端口的列表中
+ if port in port_ip_dict:
+ port_ip_dict[port].append(ip)
+ else:
+ port_ip_dict[port] = [ip]
+
+ # 打印每个端口及其对应的 IP 地址
+ for port, ips in port_ip_dict.items():
+ print(f"Port {port}: {ips}")
+ # 读取 Masscan 输出文件并解析 JSON
+ #temp_output_path.unlink()
+ # try:
+ # subprocess.run(['rm', output_path], check=True)
+ # print(f"The file {output_path} has been deleted.")
+ # except subprocess.CalledProcessError:
+ # print(f"Failed to delete the file {output_path}.")
+ #subprocess.run(f'rm {temp_output_path}')
+ return port_ip_dict
+
+
+
+def run_zgrab2(port_ip_dict, outfile, tmp_path='/tmp/'):
+ # 将端口列表转换为逗号分隔的字符串
+ # ports = ','.join(str(port) for port in ports)
+
+ # 设置 Zgrab 命令
+ for port, ips in port_ip_dict.items():
+ with open(tmp_path+f'ips_with_{port}.txt', 'w') as file:
+ for ip in ips:
+ file.write(ip + '\n')
+ strport=str(port)
+ zgrab2_cmd = [
+ 'zgrab2',
+ 'tls',
+ '-p', strport,
+ '-f', tmp_path+f'ips_with_{port}.txt',
+ '-o', tmp_path+f'zgrab_ips_with_{port}.json', # 使用文件路径
+ ]
+ # 执行 zgrab2
+ print(f"Running zgrab2 with command: {' '.join(zgrab2_cmd)}")
+ result=subprocess.run(zgrab2_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ # 打印输出和错误信息
+ if result.stdout:
+ print("ZGrab2 Output:")
+ print(result.stdout.decode())
+ if result.stderr:
+ print("ZGrab2 Error:")
+ print(result.stderr.decode())
+ json2csv(port_ip_dict,tmp_path+f'zgrab_ips_with_{port}.json','./'+ outfile)
+ subprocess.run(['rm',tmp_path+f'zgrab_ips_with_{port}.json'])
+ subprocess.run(['rm',tmp_path+f'ips_with_{port}.txt'])
+
+
+
+def json2csv(port_ip_dict,input_json_file,output_csv_file):
+ def extract_json_objects(file_path):
+ with open(file_path, 'r', encoding='utf-8') as file:
+ obj_str = '' # 用于存储当前读取的 JSON 对象字符串
+ depth = 0 # 用于跟踪括号深度
+ for char in file.read():
+ if char == '{':
+ depth += 1
+ if depth > 0:
+ obj_str += char
+ if char == '}':
+ depth -= 1
+ if depth == 0:
+ # 当达到闭合的大括号时,尝试解析 JSON 对象
+ try:
+ yield json.loads(obj_str)
+ obj_str = '' # 重置字符串以用于下一个对象
+ except json.JSONDecodeError as e:
+ print(f"Error decoding JSON object: {e}")
+ # 如果出现解码错误,跳过当前对象
+ obj_str = ''
+
+ for port, ips in port_ip_dict.items():
+ # 准备 CSV 文件
+ with open(output_csv_file, 'w', newline='', encoding='utf-8') as csvfile:
+ fieldnames = ['ip','port',
+ 'cert_sha1', 'cert_serial_num', 'cert_san', 'cert_cn',
+ 'cert_issuer', 'cert_issuer_country', 'cert_issuer_organization',
+ 'cert_subject', 'cert_subject_country', 'cert_subject_organization',
+ 'cert_start_time', 'cert_expire_time'
+ ]
+ writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+ writer.writeheader()
+
+ for item in extract_json_objects(input_json_file):
+ # 处理 json_obj
+
+ tls_data = item.get('data', {}).get('tls', {})
+ if tls_data.get('status') == 'success':
+ result = tls_data.get('result', {})
+ cert_info = result.get('handshake_log', {}).get('server_certificates', {}).get('certificate', {}).get('parsed', {})
+
+ # 提取所需字段
+ cert_row = {
+ 'ip': item.get('ip'),
+ 'port': port,
+ 'cert_sha1': cert_info.get('fingerprint_sha1'),
+ 'cert_serial_num': cert_info.get('serial_number'),
+ 'cert_san': ','.join(cert_info.get('extensions', {}).get('subject_alt_name', {}).get('dns_names', [])),
+ 'cert_cn': ','.join(cert_info.get('subject', {}).get('common_name', [])),
+ 'cert_issuer': ','.join(cert_info.get('issuer', {}).get('common_name', [])),
+ 'cert_issuer_country': ','.join(cert_info.get('issuer', {}).get('country', [])),
+ 'cert_issuer_organization': ','.join(cert_info.get('issuer', {}).get('organization', [])),
+ 'cert_subject': cert_info.get('subject_dn'),
+ 'cert_subject_country': ','.join(cert_info.get('subject', {}).get('country', [])),
+ 'cert_subject_organization': ','.join(cert_info.get('subject', {}).get('organization', [])),
+ 'cert_start_time': cert_info.get('validity', {}).get('start'),
+ 'cert_expire_time': cert_info.get('validity', {}).get('end')
+ }
+
+ # 写入 CSV
+ writer.writerow(cert_row)
+ print(f"{output_csv_file} has been created.")
+
+
+def search_ip_by_ports(ports: object, outfile: object, ip_file: object = None, asns: object = None, rate: object = 10000, port_logic='or') -> object:
+ """
+ Get IP with specific opening ports for a given ASN or a given file
+ """
+ # if ip_file is not None, get iplist from ip_file; if asns is not None, get iplist from asns; else, return error
+ tmp_path = 'data/tmp/iplist.txt'
+ ip_ranges = []
+ if ip_file is not None:
+ if not os.path.isfile(ip_file):
+ print(f"IP list file not found: {ip_file}")
+ exit(1)
+ # 读取文件中的 IP 地址
+ with open(ip_file, 'r', encoding='utf-8') as file:
+ ip_ranges = file.read().splitlines()
+ print("Complete getting ip ranges from file!")
+
+ elif asns is not None:
+ for asn in asns:
+ ip_ranges.extend(get_asn_ip_ranges(asn))
+ # 检查文件是否存在以及是否以换行符结尾
+ print("Complete getting ip ranges from asn!")
+
+ else:
+ print("Please provide an IP list file or an AS number. Using parameter ip_file or asns")
+ exit(1)
+
+ # ip_ranges 保存
+ with open(tmp_path, 'w', encoding='utf-8') as file:
+ for ip_range in ip_ranges:
+ file.write(ip_range + '\n')
+
+ if rate:
+ port_ip_dict=run_masscan(tmp_path, ports, str(rate))
+ else:
+ port_ip_dict=run_masscan(tmp_path, ports)
+
+ # get values in port_ip_dict and get intersection
+ selected_ip_list = []
+ for port, ips in port_ip_dict.items():
+ if len(selected_ip_list) == 0:
+ selected_ip_list = ips
+ continue
+ else:
+ if port_logic == 'and':
+ selected_ip_list = set(selected_ip_list).intersection(ips)
+ elif port_logic == 'or':
+ selected_ip_list = set(selected_ip_list).union(ips)
+ selected_ip_list = list(selected_ip_list)
+
+ if outfile is not None:
+ # 创建文件并保存
+ with open(outfile, 'w', encoding='utf-8') as file:
+ for ip in selected_ip_list:
+ file.write(ip + '\n')
+
+ return selected_ip_list
+
+
+
+
+
+if __name__ == '__main__':
+ ip_filtered_by_ports = search_ip_by_ports(ports='554, 22, 53', port_logic='or', asns=['14061'], outfile='data/tmp/filtered_ip.csv', rate='5000') \ No newline at end of file
diff --git a/detection/tool/Config.py b/detection/tool/Config.py
index 214c3c9..ca23e5f 100644
--- a/detection/tool/Config.py
+++ b/detection/tool/Config.py
@@ -3,19 +3,31 @@
# @Time : 2024/1/8 12:36
# @author : yinjinagyi
# @File : Config.py
-# @Function: read config.yaml file and manage all configurations
+# @Function: read config23.10.yaml file and manage all configurations
import yaml
import os
import sys
+
+import LoggingTool
+
sys.path.append('..')
from tool.Functions import get_project_path
+logger = LoggingTool.Logger().getLogger()
+
class Config:
def __init__(self):
- # read config.yaml file
- self.config = self.read_yaml(get_project_path() + '/config.yaml')
+ # read version information from version.txt, and choose yaml file according to version
+ with open(get_project_path() + '/version.txt', 'r') as f:
+ version = f.read().strip()
+ # 如果文件不存在则报错,日志写入日志文件
+ if not os.path.exists(get_project_path() + '/config{}.yaml'.format(version)):
+ logger.error('config{}.yaml file does not exist'.format(version))
+ sys.exit()
+
+ self.config = self.read_yaml(get_project_path() + '/config{}.yaml'.format(version))
# read yaml file
diff --git a/detection/tool/Functions.py b/detection/tool/Functions.py
index abcee51..e8dc191 100644
--- a/detection/tool/Functions.py
+++ b/detection/tool/Functions.py
@@ -462,6 +462,85 @@ def get_project_path():
return os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
+def get_start_end_ip(ip_range_string):
+ if '-' in ip_range_string:
+ start_ip, end_ip = ip_range_string.split('-')
+ elif '/' in ip_range_string:
+ network = ipaddress.IPv4Network(ip_range_string, strict=False)
+ start_ip = str(network.network_address)
+ end_ip = str(network.broadcast_address)
+ elif ipaddress.ip_address(ip_range_string):
+ start_ip = ip_range_string
+ end_ip = ip_range_string
+
+ if is_valid_public_ip(start_ip) & is_valid_public_ip(end_ip):
+ return start_ip, end_ip
+ return
+
+
+
+def is_valid_public_ip(ip):
+ try:
+ ip_obj = ipaddress.ip_address(ip)
+ if (ip_obj!=None) & (ip_obj.is_global):
+ return True
+ else:
+ return False
+ except ValueError:
+ return False
+
+
+def get_all_ips(start_ip, end_ip):
+ result = []
+ start_ip_obj = ipaddress.ip_address(start_ip)
+ end_ip_obj = ipaddress.ip_address(end_ip)
+
+ # 遍历起始IP和结束IP之间的所有IP地址
+ current_ip = start_ip_obj
+ while current_ip <= end_ip_obj:
+ result.append(str(current_ip))
+ current_ip += 1
+ return result
+
+
+def ipranges_to_ips(ip_ranges):
+ """
+ Discription: 将ipranges转换为ip
+ :param file_path: ipranges
+ :return: ip列表
+ """
+ ip_list = []
+ for ip_range in ip_ranges:
+ result = get_start_end_ip(ip_range)
+ if len(result) == 2:
+ start_ip, end_ip = result
+ ip_list.extend(get_all_ips(start_ip, end_ip))
+ ip_list = list(set(ip_list))
+ ip_list.sort(key=lambda x: struct.unpack("!L", socket.inet_aton(x))[0])
+
+ return ip_list
+
+
+
+if __name__ == '__main__':
+ # 读取数据
+ ipranges_file_path = '/Users/joy/work/iie/project/cyber_narrator/CN/3-新功能研发/vpn-detection/2.分析脚本/主动爬取/AS14061_IPrange.txt'
+ output_file_path = '/Users/joy/work/iie/project/cyber_narrator/CN/3-新功能研发/vpn-detection/2.分析脚本/主动爬取/AS14061_IPlists.txt'
+
+ with open(ipranges_file_path, 'r') as file:
+ lines = file.readlines()
+ ranges = [i.strip('\n') for i in lines]
+ ip_ranges = [get_start_end_ip(ip_range) for ip_range in ranges]
+ ip_list = ipranges_to_ips(ip_ranges)
+
+ # ip_list保存到文件
+ with open(output_file_path, 'w') as file:
+ file.write('\n'.join(ip_list))
+
+
+
+
+
diff --git a/detection/tool/__pycache__/Config.cpython-39.pyc b/detection/tool/__pycache__/Config.cpython-39.pyc
index f1f042c..5a14ac8 100644
--- a/detection/tool/__pycache__/Config.cpython-39.pyc
+++ b/detection/tool/__pycache__/Config.cpython-39.pyc
Binary files differ