diff options
| author | shihaoyue <[email protected]> | 2024-09-20 09:07:10 +0800 |
|---|---|---|
| committer | shihaoyue <[email protected]> | 2024-09-20 09:07:10 +0800 |
| commit | 5d07e2a4e2f5e93c9f4699c49cbcb52c38aebbee (patch) | |
| tree | 8f756f0c014cdfc87412224d9569f1e21bb5ff19 | |
| parent | 78575c5a7322693359d35c4f3d6e9d9698c5188e (diff) | |
# 重大更新 自动化任务,极其不稳定
| -rw-r--r-- | agent/apps/target_gz.py | 7 | ||||
| -rw-r--r-- | server/apps/agentcomm.py | 6 | ||||
| -rw-r--r-- | server/apps/policy.py | 131 | ||||
| -rw-r--r-- | server/apps/target.py | 205 | ||||
| -rw-r--r-- | server/apps/task.py | 141 |
5 files changed, 404 insertions, 86 deletions
diff --git a/agent/apps/target_gz.py b/agent/apps/target_gz.py index d3287d8..19bd912 100644 --- a/agent/apps/target_gz.py +++ b/agent/apps/target_gz.py @@ -20,5 +20,8 @@ def execute_command(IP): encoding='utf-8') output = proc.communicate()[0].strip() - - return output
\ No newline at end of file + try: + json.loads(output) + return output, 200 + except: + return 500
\ No newline at end of file diff --git a/server/apps/agentcomm.py b/server/apps/agentcomm.py index f9b11d1..1277768 100644 --- a/server/apps/agentcomm.py +++ b/server/apps/agentcomm.py @@ -8,7 +8,7 @@ from apiflask.fields import String, Integer, List, Nested, Boolean, DateTime, Fl from apiflask.validators import OneOf from flask import request -from .util import error +from .util import error, debug from model import Agent, TaskLog, Policy, TaskPolicy from exts import db @@ -114,13 +114,13 @@ def deliver_task(task_policy): error(str(e)) return str(e) -def stop_task(task_policy): +def stop_task_deliver(task_policy): agent = db.session.query(Agent).get(task_policy.task.agent_id) ip, port = agent.ipaddr.split("|")[0], agent.port try: res = requests.post(f"http://{ip}:{port}/script/stop/{str(task_policy.tp_id)}", timeout=3) if res.status_code == 200: - task_policy.task.status = "stopped" + debug("停止任务") return None error("任务停止错误,code: " + str(res.status_code) + " err: " + str(res.text)) return res.text diff --git a/server/apps/policy.py b/server/apps/policy.py index f74267c..4dbc3bb 100644 --- a/server/apps/policy.py +++ b/server/apps/policy.py @@ -3,7 +3,8 @@ import random import requests -from exts import db +from apps.util import debug +from exts import db, scheduler from model import Policy, TargetStatus, TaskPolicy # --------------------------策略----------------------------- @@ -31,10 +32,10 @@ def chiose_policy(task): def copy_task_policy(task): - last_policy = task.task_policies[-1].Policy + last_policy = task.task_policies[-1] task_policy = TaskPolicy( - policy = last_policy.p_id, + policy = last_policy.policy, policy_param = last_policy.policy_param, for_task = last_policy.for_task ) @@ -69,6 +70,7 @@ def init_task_policy(task): # 自动task_policy调整 def center_process_unit(task): + db.session.close() task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() status = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first() req = { @@ -83,16 +85,34 @@ def center_process_unit(task): }, "para": task_policy.policy_param } - url = f"http://localhost:12535" - response = requests.get(url, data = req) - # data = response.json() - data = { - "mode": "keep/parameter/script", - "script": None, - "parameter":{ - "n": 6, + url = f"http://localhost:12535/adjustment" + response = requests.post(url, json = req) + data = response.json() + import random + + data_options = [ + { + "mode": "script", + "script": "V6 DDOS", + "parameter": { + "n": str(random.randint(10000, 30000)), + }, }, - } + { + "mode": "script", + "script": "V6 数据篡改", + "parameter": { + }, + } + ] + + # 随机选择一个 data + data = random.choices(data_options, weights = [0.6, 0.4])[0] + + debug(data) + policy = "" + p_payload = {} + db.session.close() if data["mode"]== "keep": copy_task_policy(task) elif data["mode"] == "parameter": @@ -101,6 +121,9 @@ def center_process_unit(task): elif data["mode"] == "script": policy = db.session.query(Policy).filter_by(p_name = data["script"]).first().p_id p_payload = db.session.query(Policy).filter_by(p_name = data["script"]).first().p_payload + debug("新策略或者新参数!!!!!!!!!!!!!!!") + debug(policy) + debug(p_payload) task_policy = TaskPolicy( policy = policy, policy_param = p_payload, @@ -116,49 +139,61 @@ def center_process_unit(task): # -----------------------------------------不可调整参数------------------------------------------------ def target2polocy(task_policy): - commmand = task_policy.Policy.p_payload - # 固定 - # <target> - commmand.replace("<target>", task_policy.task.target.addrv4) - # 篡改 - # <tamper> 目标域名 (DoH DoT) - commmand.replace("<target_domain>", task_policy.task.target_domain) - # 注入 - # <inject> 目标域名 (DoH DoT) - # <ns> 要注入的记录 (DoH DoT) - commmand.replace("<record>", task_policy.task.target_rr) - task_policy.policy_param = commmand - - db.session.add(task_policy) - db.session.commit() + with scheduler.app.app_context(): + task_policy = db.session.query(TaskPolicy).filter_by(tp_id = task_policy.tp_id).first() + commmand = task_policy.Policy.p_payload + # 固定 + # <target> + commmand = commmand.replace("<target>", task_policy.task.target.addrv4) + # 篡改 + # <tamper> 目标域名 (DoH DoT) + commmand = commmand.replace("<target_domain>", task_policy.task.target_domain) + # 注入 + # <inject> 目标域名 (DoH DoT) + # <ns> 要注入的记录 (DoH DoT) + commmand = commmand.replace("<record>", task_policy.task.target_rr) + task_policy.policy_param = commmand + + # db.session.add(task_policy) + db.session.commit() # ----------------------------------------可调整参数----------------------------------------------------- # 生成初始参数 def generate_parameters(task_policy): - commmand = task_policy.Policy.p_payload - # 调整 - # DDoS - # <n> 启动请求发送的进程数(DoT DoH) 包数 (DNSSEC v6) - if task_policy.Policy.p_proto=="DOH" or task_policy.Policy.p_proto=="DOT": - commmand.replace("<n>", '4') - elif task_policy.Policy.p_proto=="DNSSEC" : - commmand.replace("<n>", '30000') - elif task_policy.Policy.p_proto=="IPv6": - commmand.replace("<n>", '10000') - # <r> 发包速率(DNSSEC) - if task_policy.Policy.p_proto=="DNSSEC" : - commmand.replace("<r>", '300') - # <round> 向CND节点发送的请求次数 (DoT) + with scheduler.app.app_context(): - # <wait> pending 总时长 (DoT) - db.session.add(task_policy) - db.session.commit() + task_policy = db.session.query(TaskPolicy).filter_by(tp_id = task_policy.tp_id).first() + + commmand = task_policy.policy_param + # 调整 + # DDoS + # <n> 启动请求发送的进程数(DoT DoH) 包数 (DNSSEC v6) + if task_policy.Policy.p_proto=="DOH" or task_policy.Policy.p_proto=="DOT": + commmand = commmand.replace("<n>", '4') + elif task_policy.Policy.p_proto=="DNSSEC" : + commmand = commmand.replace("<n>", '30000') + elif task_policy.Policy.p_proto=="IPv6": + commmand = commmand.replace("<n>", '10000') + # <r> 发包速率(DNSSEC) + if task_policy.Policy.p_proto=="DNSSEC" : + commmand = commmand.replace("<r>", '300') + # <round> 向CND节点发送的请求次数 (DoT) + + # <wait> pending 总时长 (DoT) + task_policy.policy_param = commmand + # db.session.add(task_policy) + db.session.commit() # 生成调整参数 def adjust_parameters(task_policy, param): - commmand = task_policy.Policy.p_payload - - for k, v in param.items(): - commmand.replace(f"<{k}>", v)
\ No newline at end of file + with scheduler.app.app_context(): + task_policy = db.session.query(TaskPolicy).filter_by(tp_id = task_policy.tp_id).first() + commmand = task_policy.policy_param + for k, v in param.items(): + commmand = commmand.replace(f"<{k}>", v) + + task_policy.policy_param = commmand + db.session.add(task_policy) + db.session.commit()
\ No newline at end of file diff --git a/server/apps/target.py b/server/apps/target.py index 62aca9a..ffd00f2 100644 --- a/server/apps/target.py +++ b/server/apps/target.py @@ -7,6 +7,7 @@ import threading import asyncio from operator import or_ from concurrent.futures import ThreadPoolExecutor +import time from flask import current_app import requests from apiflask import APIBlueprint, Schema @@ -122,6 +123,121 @@ class CouInfo(Schema): title = String() # 总数量 value = Integer() +# from apiflask.fields import Integer, String, List, Nested, Float + +class ClusterInfo(Schema): + lat = Float(required=True) + lng = Float(required=True) + count = Integer(required=True) + +class TargetInfo(Schema): + ipv4 = String(allow_none=True) + ipv6 = String(allow_none=True) + cou = String() + isp = String() + lat = Float() + lng = Float() + time = String() + protocol = List(String()) + protect = String() + + + +def task_monitoring(task): + with scheduler.app.app_context(): + debug("taskmonitor") + target_status = task.task_policies[-1].target_status + debug(target_status) + addr = task.target.addrv4 if task.target.addrv4 else task.target.addrv6 + + nodes_info = json.loads(task.SCAN_AGENT_ID_LIST) + + # 初始化延迟结果列表 + icmp_results = [] + tcp_results = [] + dns_results = [] + + # 执行 ICMP 查询并收集结果 + with ThreadPoolExecutor() as executor: + for id, ip_port in nodes_info.items(): + future = executor.submit(icmp_delay_query, addr, f"{ip_port}") + icmp_results.append(float(future.result())) + + # 执行 TCP 查询并收集结果 + with ThreadPoolExecutor() as executor: + for id, ip_port in nodes_info.items(): + future = executor.submit(tcp_delay_query, addr, f"{ip_port}") + tcp_results.append(float(future.result())) + + # 执行 DNS 查询并收集结果 + with ThreadPoolExecutor() as executor: + for id, ip_port in nodes_info.items(): + future = executor.submit(dns_delay_query, addr, f"{ip_port}") + dns_results.append(float(future.result())) + + # 计算平均值 + icmp_avg = sum(icmp_results) / len(icmp_results) if icmp_results else 0 + tcp_avg = sum(tcp_results) / len(tcp_results) if tcp_results else 0 + dns_avg = sum(dns_results) / len(dns_results) if dns_results else 0 + # id, ip_port = nodes_info.items()[0] + query_data = { + 'rev' : task.target.addrv4 if task.target.addrv4 else task.target.addrv6, + 'domain' : task.target_domain, + 'qtype' :'A' if task.target.addrv4 else "AAAA" + } + target_domain = get_record(query_data) + try: + first_ip = target_domain[0]["rrset"] if target_domain else None + except: + first_ip = "" + target_status = TargetStatus( + tp_id = task.task_policies[-1].tp_id, + icmp = icmp_avg, + tcp = tcp_avg, + dns = dns_avg, + recorde = first_ip, + ) + + db.session.add(target_status) + db.session.commit() + task = db.session.query(Task).get(task.task_id) + last_task_policy = task.task_policies[-1] + + # 现在可以安全地访问 target_status + target_status = last_task_policy.target_status + debug(target_status) + +def dida_task(task, ): + from .task import effective_detection, finish_task + from .task import adjust_task + with scheduler.app.app_context(): + task = db.session.query(Task).filter_by(task_id = task.task_id).first() + task_policy=db.session.query(TaskPolicy).get(task.task_policies[-1].tp_id) + task_monitoring(task) + # 如果任务没有成功 + if not effective_detection(task_policy=task_policy): + debug(task.status) + if task.status == "stopped": + adjust_task(task=task) + else: + finish_task(task) + pass + debug("didadida") + +def start_task_monitoring(task): + + with scheduler.app.app_context(): + scheduler.add_job( + func=dida_task, # 要执行的函数 + trigger='interval', # 触发器类型为间隔 + args = (task, ), # 传递给函数的参数 + id = task.task_id, # 任务的唯一标识符 + seconds = 30, # 触发器的参数,表示每 5 秒执行一次 + max_instances = 100 + ) + +def stop_task_monitoring(task): + scheduler.remove_job(task.task_id) @bp.get("/nodes") @@ -281,7 +397,7 @@ from apiflask.validators import OneOf, ContainsOnly from dns import resolver def get_record(query_data): - # 特殊协议头 + # 特殊协议头 protols = ["https", "tls"] ans = [] # 参数读取 @@ -329,7 +445,7 @@ def get_record(query_data): }) def record(query_data): ans = get_record(query_data) - return {"code": 200, 'ans': ans} + return {"code": 200, 'ans': ans} @bp.get("/") @@ -451,6 +567,10 @@ def map_info(query_data): max_lat = query_data.get("max_lat") # 获取最大纬度 min_lng = query_data.get("min_lng") # 获取最小经度 max_lng = query_data.get("max_lng") # 获取最大经度 + if zoom_level == 0: + res = db.session.query(Target.target_id, Target.lat, Target.lng).all() + res_dict = [{"target_id": row.target_id, "lat": row.lat, "lng": row.lng} for row in res] + return {"code": 200, "data": res_dict, "total": len(res_dict)} if zoom_level <= 10: # 查询目标数据 @@ -531,35 +651,85 @@ def map_info(query_data): @bp.get("/gz") @bp.doc("目标感知") @bp.input({ - "ip": IP(required=True) + "ip": String(required=True) }, location="query") @bp.output({ "code": Integer(), - "dataObject": List(Nested(TargetSchema())), + "dataObject": Nested(TargetSchema()) }) def target_GZ_API(query_data): - target_GZ(query_data["ip"]) + ip = query_data["ip"] + target = target_GZ(ip) + target_dict = { + "addrv4": target.addrv4, + "addrv6": target.addrv6, + "ipv6": target.ipv6, + "dnssec": target.dnssec, + "dot": target.dot, + "doh": target.doh, + "cou": target.cou, + "isp": target.isp, + "lat": target.lat, + "lng": target.lng, + "protect": target.protect, + "doh_domain": target.doh_domain + } return { 'code': 200, - 'MSG': "success" + 'dataObject': target_dict } def target_GZ(IP_addr): - # 获取 - csgz = db.session.query(Agent).filter_by(agent_type = 'gjst').all() + + existing_obj = db.session.query(Target).filter( + (Target.addrv4 == IP_addr) | (Target.addrv6 == IP_addr) + ).first() + if existing_obj: + return existing_obj + + ipv6 = None + ipv4 = None + + # 判断 IP 地址的版本并存储 + if 6 == is_ipaddress(IP_addr): + ipv6 = IP_addr + elif 4 == is_ipaddress(IP_addr): + ipv4 = IP_addr + + # 获取随机的 agent + csgz = db.session.query(Agent).filter_by(agent_type='gjst').all() csgz = random.choice(csgz) - url = f"http://{csgz.ipaddr}:{csgz.port}/target_gz/{IP_addr}" - protect = requests.get(url) - + # 根据 IP 地址类型构建 URL + if ipv6: + # IPv6 地址需要加上中括号 + url = f"http://{csgz.ipaddr}:{csgz.port}/target_gz/[{ipv6}]" + elif ipv4: + url = f"http://{csgz.ipaddr}:{csgz.port}/target_gz/{ipv4}" + else: + raise ValueError("Invalid IP address") + + # 发送请求 + i = 0 + while i < 30: + protect = requests.get(url) + status_code = protect.status_code + debug(f"目标感知:重试{i}次") + i+=1 + if status_code == 200: + break + else: + time.sleep(0.5) + + url = f'https://ipinfo.io/{IP_addr}/json?token=2c3db02b7ffce3' response = requests.get(url) data = response.json() # 存数据库 target = Target( - addrv4 = IP_addr, - addrv6 = False, + addrv4 = ipv4, + addrv6 = ipv6, ipv6 = (6 == is_ipaddress(IP_addr)), dnssec = json.loads(protect.text)['dnssec_enabled'], dot = False, @@ -571,15 +741,6 @@ def target_GZ(IP_addr): protect = protect.text, doh_domain = None ) - if 6 == is_ipaddress(IP_addr): - target.addrv6 = IP_addr - target.ipv6 = True - elif 4== is_ipaddress(IP_addr): - target.addrv4 = IP_addr - - existing_obj = db.session.query(Target).filter_by(addrv4 = target.addrv4).first() - if existing_obj: - db.session.delete(existing_obj) db.session.add(target) db.session.commit() return target
\ No newline at end of file diff --git a/server/apps/task.py b/server/apps/task.py index 430b38f..3717e76 100644 --- a/server/apps/task.py +++ b/server/apps/task.py @@ -5,14 +5,17 @@ import uuid from apiflask import APIBlueprint, Schema from apiflask.fields import String, Integer, IP, DateTime, List, Nested from apiflask.validators import OneOf -from sqlalchemy import and_, desc +from flask import current_app +from sqlalchemy import and_, desc, select from sqlalchemy.exc import SQLAlchemyError -from model import Task, Agent -from .agentcomm import deliver_task, stop_task +from .target import start_task_monitoring, target_GZ, task_monitoring, stop_task_monitoring +from model import Target, Task, Agent, TargetStatus +from .agentcomm import deliver_task, stop_task_deliver from .policy import * -from .util import error +from .util import error, is_ipaddress, debug +from exts import db, scheduler bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task") @@ -101,7 +104,6 @@ def valid_task_info(task_param: dict): "code": Integer(), "msg": String() }) -# TODO:TARGET处理 # TODO: 需要更新接口,created_by def make_task(json_data): task = Task( @@ -116,7 +118,7 @@ def make_task(json_data): task_delay=json_data.get("run_time"), target_scan=json_data.get("scan"), target_domain=json_data.get("target_domain"), - target_rtype="", + # target_rtype=, target_rr=json_data.get("target_rr") ) if task.ptype == "sjqp": @@ -134,6 +136,8 @@ def make_task(json_data): target = db.session.query(Target).filter(Target.addrv4==Ip).first() elif ip_version == 6: target = db.session.query(Target).filter(Target.addrv6==Ip).first() + + # 不存在该目标的相关记录,自动探测并存入数据库 if not target: target = target_GZ(Ip) @@ -158,6 +162,32 @@ def make_task(json_data): # 任务策略初始化 task_policy = init_task_policy(task) + # 创建锚点 + task_monitoring(task) + + # 初始化决策中心 + db.session.close() + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + status = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first() + + req = { + "clxz": task_policy.Policy.p_type, + "script": task_policy.Policy.p_name, + "mbgz": json.loads(task.target.protect), + "ztgz": { + "icmp": status.icmp, + "tcp": status.tcp, + "dns": status.dns, + "record": status.recorde, + }, + "para": task_policy.policy_param + } + url = f"http://localhost:12535/initial" + response = requests.post(url, json=req) + + # 开启监控 + start_task_monitoring(task) + start_policy_change_timer(task) # 根据状态判断是否立刻执行 if task.status == "working": @@ -168,6 +198,84 @@ def make_task(json_data): return {"code": 200, "msg": "ok"} +def policy_change_timer(task): + with scheduler.app.app_context(): + task_policy=db.session.query(TaskPolicy).get(task.task_policies[-1].tp_id) + if not effective_detection(task_policy=task_policy): + adjust_task(task=task) + +# 任务切换倒计时 +# 从任务下发开始计算,倒计时三百秒切换任务 +def start_policy_change_timer(task): + with scheduler.app.app_context(): + scheduler.add_job( + func=policy_change_timer, # 要执行的函数 + trigger='interval', # 触发器类型为间隔 + args = (task, ), # 传递给函数的参数 + id = f"{task.task_id}chnage", + seconds = 300 + ) + +def stop_policy_change_timer(task): + scheduler.remove_job(f"{task.task_id}chnage") + +# 任务成功检测 +def effective_detection(task_policy): + with scheduler.app.app_context(): + debug("检测中") + base = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.asc()).first() + now = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first() + + rec = None + p_type = db.session.query(Policy).filter_by(p_id=task_policy.policy).first().p_type + if p_type=="ddos": + debug("是ddos") + target_scan = db.session.query(Task).filter_by(task_id = task_policy.for_task).first().target_scan + if target_scan=="auto" or target_scan=="icmp": + rec = base.icmp*5 < now.icmp + elif target_scan=="tcp": + rec = base.tcp*5 < now.tcp + elif target_scan=="dns": + rec = base.dns*5 < now.dns + elif p_type=="sjqp": + rec = base.recorde!=now.recorde + pass + debug(f"能行吗{rec}:是{p_type}") + return rec + +def adjust_task(task): + with scheduler.app.app_context(): + debug("再试试") + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + stop_task_deliver(task_policy) + center_process_unit(task) + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + task.status = "working" + db.session.commit() + deliver_task(task_policy) + + +def stop_task(task): + with scheduler.app.app_context(): + debug("暂停啦啦") + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + task.status = "stopped" + stop_task_monitoring(task) + stop_policy_change_timer(task) + stop_task_deliver(task_policy) + db.session.commit() + +def finish_task(task): + with scheduler.app.app_context(): + task = db.session.query(Task).filter_by(task_id = task.task_id).first() + debug("成功啦|失败!") + task_policy = task.task_policies[-1] + task.status = "finish" + stop_task_monitoring(task) + stop_policy_change_timer(task) + stop_task_deliver(task_policy) + db.session.commit() + operation_map = {"start": "开始", "stop": "暂停"} status_map = {"working": "执行中", "stopped": "暂停"} # 合法的状态转移逻辑,例如,暂停的任务不能开始,正在执行的任务不能停止 @@ -220,7 +328,8 @@ def handle_task_operation(json_data): @bp.doc("任务列表信息获取接口") @bp.input({ "page": Integer(load_default=1), - "per_page": Integer(load_default=10) + "per_page": Integer(load_default=10), + "task_id":String(required=False) }, location="query") @bp.output({ "code": Integer(), @@ -231,8 +340,11 @@ def handle_task_operation(json_data): def tasks_state(query_data): per_page = query_data["per_page"] page = query_data["page"] - - query = db.session.query(Task).filter().order_by(Task.created_time.desc()) + task_id = query_data.get("task_id") + if task_id: + query = db.session.query(Task).filter_by(task_id = task_id) + else: + query = db.session.query(Task).filter().order_by(Task.created_time.desc()) task_count = query.count() tasks = query.offset((page - 1) * per_page).limit(per_page).all() @@ -243,7 +355,7 @@ def tasks_state(query_data): task_r["target"] = f"{task.target.addrv4}|{task.target.addrv6}" task_r["name"] = task.task_name task_r["agent"] = task.agent_id - task_r["target_domain"] = task.target_domain, + task_r["target_domain"] = task.target_domain task_r["target_rr"] = task.target_rr task_r["policy"] = task.ptype task_r["create_time"] = task.created_time @@ -316,7 +428,14 @@ def task_info(query_data): "record": delay.recorde }) - + 策略 = { + "应答率": { + "策略名": 60 + }, + "篡改成功率": { + "策略名": 25 + } + } return {"code": 200, "data": task_state_list} |
