diff options
| author | shihaoyue <[email protected]> | 2024-09-20 09:07:10 +0800 |
|---|---|---|
| committer | shihaoyue <[email protected]> | 2024-09-20 09:07:10 +0800 |
| commit | 5d07e2a4e2f5e93c9f4699c49cbcb52c38aebbee (patch) | |
| tree | 8f756f0c014cdfc87412224d9569f1e21bb5ff19 /server/apps/task.py | |
| parent | 78575c5a7322693359d35c4f3d6e9d9698c5188e (diff) | |
# 重大更新 自动化任务,极其不稳定
Diffstat (limited to 'server/apps/task.py')
| -rw-r--r-- | server/apps/task.py | 141 |
1 files changed, 130 insertions, 11 deletions
diff --git a/server/apps/task.py b/server/apps/task.py index 430b38f..3717e76 100644 --- a/server/apps/task.py +++ b/server/apps/task.py @@ -5,14 +5,17 @@ import uuid from apiflask import APIBlueprint, Schema from apiflask.fields import String, Integer, IP, DateTime, List, Nested from apiflask.validators import OneOf -from sqlalchemy import and_, desc +from flask import current_app +from sqlalchemy import and_, desc, select from sqlalchemy.exc import SQLAlchemyError -from model import Task, Agent -from .agentcomm import deliver_task, stop_task +from .target import start_task_monitoring, target_GZ, task_monitoring, stop_task_monitoring +from model import Target, Task, Agent, TargetStatus +from .agentcomm import deliver_task, stop_task_deliver from .policy import * -from .util import error +from .util import error, is_ipaddress, debug +from exts import db, scheduler bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task") @@ -101,7 +104,6 @@ def valid_task_info(task_param: dict): "code": Integer(), "msg": String() }) -# TODO:TARGET处理 # TODO: 需要更新接口,created_by def make_task(json_data): task = Task( @@ -116,7 +118,7 @@ def make_task(json_data): task_delay=json_data.get("run_time"), target_scan=json_data.get("scan"), target_domain=json_data.get("target_domain"), - target_rtype="", + # target_rtype=, target_rr=json_data.get("target_rr") ) if task.ptype == "sjqp": @@ -134,6 +136,8 @@ def make_task(json_data): target = db.session.query(Target).filter(Target.addrv4==Ip).first() elif ip_version == 6: target = db.session.query(Target).filter(Target.addrv6==Ip).first() + + # 不存在该目标的相关记录,自动探测并存入数据库 if not target: target = target_GZ(Ip) @@ -158,6 +162,32 @@ def make_task(json_data): # 任务策略初始化 task_policy = init_task_policy(task) + # 创建锚点 + task_monitoring(task) + + # 初始化决策中心 + db.session.close() + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + status = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first() + + req = { + "clxz": task_policy.Policy.p_type, + "script": task_policy.Policy.p_name, + "mbgz": json.loads(task.target.protect), + "ztgz": { + "icmp": status.icmp, + "tcp": status.tcp, + "dns": status.dns, + "record": status.recorde, + }, + "para": task_policy.policy_param + } + url = f"http://localhost:12535/initial" + response = requests.post(url, json=req) + + # 开启监控 + start_task_monitoring(task) + start_policy_change_timer(task) # 根据状态判断是否立刻执行 if task.status == "working": @@ -168,6 +198,84 @@ def make_task(json_data): return {"code": 200, "msg": "ok"} +def policy_change_timer(task): + with scheduler.app.app_context(): + task_policy=db.session.query(TaskPolicy).get(task.task_policies[-1].tp_id) + if not effective_detection(task_policy=task_policy): + adjust_task(task=task) + +# 任务切换倒计时 +# 从任务下发开始计算,倒计时三百秒切换任务 +def start_policy_change_timer(task): + with scheduler.app.app_context(): + scheduler.add_job( + func=policy_change_timer, # 要执行的函数 + trigger='interval', # 触发器类型为间隔 + args = (task, ), # 传递给函数的参数 + id = f"{task.task_id}chnage", + seconds = 300 + ) + +def stop_policy_change_timer(task): + scheduler.remove_job(f"{task.task_id}chnage") + +# 任务成功检测 +def effective_detection(task_policy): + with scheduler.app.app_context(): + debug("检测中") + base = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.asc()).first() + now = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first() + + rec = None + p_type = db.session.query(Policy).filter_by(p_id=task_policy.policy).first().p_type + if p_type=="ddos": + debug("是ddos") + target_scan = db.session.query(Task).filter_by(task_id = task_policy.for_task).first().target_scan + if target_scan=="auto" or target_scan=="icmp": + rec = base.icmp*5 < now.icmp + elif target_scan=="tcp": + rec = base.tcp*5 < now.tcp + elif target_scan=="dns": + rec = base.dns*5 < now.dns + elif p_type=="sjqp": + rec = base.recorde!=now.recorde + pass + debug(f"能行吗{rec}:是{p_type}") + return rec + +def adjust_task(task): + with scheduler.app.app_context(): + debug("再试试") + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + stop_task_deliver(task_policy) + center_process_unit(task) + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + task.status = "working" + db.session.commit() + deliver_task(task_policy) + + +def stop_task(task): + with scheduler.app.app_context(): + debug("暂停啦啦") + task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first() + task.status = "stopped" + stop_task_monitoring(task) + stop_policy_change_timer(task) + stop_task_deliver(task_policy) + db.session.commit() + +def finish_task(task): + with scheduler.app.app_context(): + task = db.session.query(Task).filter_by(task_id = task.task_id).first() + debug("成功啦|失败!") + task_policy = task.task_policies[-1] + task.status = "finish" + stop_task_monitoring(task) + stop_policy_change_timer(task) + stop_task_deliver(task_policy) + db.session.commit() + operation_map = {"start": "开始", "stop": "暂停"} status_map = {"working": "执行中", "stopped": "暂停"} # 合法的状态转移逻辑,例如,暂停的任务不能开始,正在执行的任务不能停止 @@ -220,7 +328,8 @@ def handle_task_operation(json_data): @bp.doc("任务列表信息获取接口") @bp.input({ "page": Integer(load_default=1), - "per_page": Integer(load_default=10) + "per_page": Integer(load_default=10), + "task_id":String(required=False) }, location="query") @bp.output({ "code": Integer(), @@ -231,8 +340,11 @@ def handle_task_operation(json_data): def tasks_state(query_data): per_page = query_data["per_page"] page = query_data["page"] - - query = db.session.query(Task).filter().order_by(Task.created_time.desc()) + task_id = query_data.get("task_id") + if task_id: + query = db.session.query(Task).filter_by(task_id = task_id) + else: + query = db.session.query(Task).filter().order_by(Task.created_time.desc()) task_count = query.count() tasks = query.offset((page - 1) * per_page).limit(per_page).all() @@ -243,7 +355,7 @@ def tasks_state(query_data): task_r["target"] = f"{task.target.addrv4}|{task.target.addrv6}" task_r["name"] = task.task_name task_r["agent"] = task.agent_id - task_r["target_domain"] = task.target_domain, + task_r["target_domain"] = task.target_domain task_r["target_rr"] = task.target_rr task_r["policy"] = task.ptype task_r["create_time"] = task.created_time @@ -316,7 +428,14 @@ def task_info(query_data): "record": delay.recorde }) - + 策略 = { + "应答率": { + "策略名": 60 + }, + "篡改成功率": { + "策略名": 25 + } + } return {"code": 200, "data": task_state_list} |
