summaryrefslogtreecommitdiff
path: root/server/apps/task.py
diff options
context:
space:
mode:
authorshihaoyue <[email protected]>2024-09-20 09:07:10 +0800
committershihaoyue <[email protected]>2024-09-20 09:07:10 +0800
commit5d07e2a4e2f5e93c9f4699c49cbcb52c38aebbee (patch)
tree8f756f0c014cdfc87412224d9569f1e21bb5ff19 /server/apps/task.py
parent78575c5a7322693359d35c4f3d6e9d9698c5188e (diff)
# 重大更新 自动化任务,极其不稳定
Diffstat (limited to 'server/apps/task.py')
-rw-r--r--server/apps/task.py141
1 files changed, 130 insertions, 11 deletions
diff --git a/server/apps/task.py b/server/apps/task.py
index 430b38f..3717e76 100644
--- a/server/apps/task.py
+++ b/server/apps/task.py
@@ -5,14 +5,17 @@ import uuid
from apiflask import APIBlueprint, Schema
from apiflask.fields import String, Integer, IP, DateTime, List, Nested
from apiflask.validators import OneOf
-from sqlalchemy import and_, desc
+from flask import current_app
+from sqlalchemy import and_, desc, select
from sqlalchemy.exc import SQLAlchemyError
-from model import Task, Agent
-from .agentcomm import deliver_task, stop_task
+from .target import start_task_monitoring, target_GZ, task_monitoring, stop_task_monitoring
+from model import Target, Task, Agent, TargetStatus
+from .agentcomm import deliver_task, stop_task_deliver
from .policy import *
-from .util import error
+from .util import error, is_ipaddress, debug
+from exts import db, scheduler
bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task")
@@ -101,7 +104,6 @@ def valid_task_info(task_param: dict):
"code": Integer(),
"msg": String()
})
-# TODO:TARGET处理
# TODO: 需要更新接口,created_by
def make_task(json_data):
task = Task(
@@ -116,7 +118,7 @@ def make_task(json_data):
task_delay=json_data.get("run_time"),
target_scan=json_data.get("scan"),
target_domain=json_data.get("target_domain"),
- target_rtype="",
+ # target_rtype=,
target_rr=json_data.get("target_rr")
)
if task.ptype == "sjqp":
@@ -134,6 +136,8 @@ def make_task(json_data):
target = db.session.query(Target).filter(Target.addrv4==Ip).first()
elif ip_version == 6:
target = db.session.query(Target).filter(Target.addrv6==Ip).first()
+
+
# 不存在该目标的相关记录,自动探测并存入数据库
if not target:
target = target_GZ(Ip)
@@ -158,6 +162,32 @@ def make_task(json_data):
# 任务策略初始化
task_policy = init_task_policy(task)
+ # 创建锚点
+ task_monitoring(task)
+
+ # 初始化决策中心
+ db.session.close()
+ task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first()
+ status = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first()
+
+ req = {
+ "clxz": task_policy.Policy.p_type,
+ "script": task_policy.Policy.p_name,
+ "mbgz": json.loads(task.target.protect),
+ "ztgz": {
+ "icmp": status.icmp,
+ "tcp": status.tcp,
+ "dns": status.dns,
+ "record": status.recorde,
+ },
+ "para": task_policy.policy_param
+ }
+ url = f"http://localhost:12535/initial"
+ response = requests.post(url, json=req)
+
+ # 开启监控
+ start_task_monitoring(task)
+ start_policy_change_timer(task)
# 根据状态判断是否立刻执行
if task.status == "working":
@@ -168,6 +198,84 @@ def make_task(json_data):
return {"code": 200, "msg": "ok"}
+def policy_change_timer(task):
+ with scheduler.app.app_context():
+ task_policy=db.session.query(TaskPolicy).get(task.task_policies[-1].tp_id)
+ if not effective_detection(task_policy=task_policy):
+ adjust_task(task=task)
+
+# 任务切换倒计时
+# 从任务下发开始计算,倒计时三百秒切换任务
+def start_policy_change_timer(task):
+ with scheduler.app.app_context():
+ scheduler.add_job(
+ func=policy_change_timer, # 要执行的函数
+ trigger='interval', # 触发器类型为间隔
+ args = (task, ), # 传递给函数的参数
+ id = f"{task.task_id}chnage",
+ seconds = 300
+ )
+
+def stop_policy_change_timer(task):
+ scheduler.remove_job(f"{task.task_id}chnage")
+
+# 任务成功检测
+def effective_detection(task_policy):
+ with scheduler.app.app_context():
+ debug("检测中")
+ base = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.asc()).first()
+ now = db.session.query(TargetStatus).filter_by(tp_id=task_policy.tp_id).order_by(TargetStatus.time.desc()).first()
+
+ rec = None
+ p_type = db.session.query(Policy).filter_by(p_id=task_policy.policy).first().p_type
+ if p_type=="ddos":
+ debug("是ddos")
+ target_scan = db.session.query(Task).filter_by(task_id = task_policy.for_task).first().target_scan
+ if target_scan=="auto" or target_scan=="icmp":
+ rec = base.icmp*5 < now.icmp
+ elif target_scan=="tcp":
+ rec = base.tcp*5 < now.tcp
+ elif target_scan=="dns":
+ rec = base.dns*5 < now.dns
+ elif p_type=="sjqp":
+ rec = base.recorde!=now.recorde
+ pass
+ debug(f"能行吗{rec}:是{p_type}")
+ return rec
+
+def adjust_task(task):
+ with scheduler.app.app_context():
+ debug("再试试")
+ task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first()
+ stop_task_deliver(task_policy)
+ center_process_unit(task)
+ task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first()
+ task.status = "working"
+ db.session.commit()
+ deliver_task(task_policy)
+
+
+def stop_task(task):
+ with scheduler.app.app_context():
+ debug("暂停啦啦")
+ task_policy = db.session.query(TaskPolicy).filter_by(for_task = task.task_id).order_by(TaskPolicy.tp_time.desc()).first()
+ task.status = "stopped"
+ stop_task_monitoring(task)
+ stop_policy_change_timer(task)
+ stop_task_deliver(task_policy)
+ db.session.commit()
+
+def finish_task(task):
+ with scheduler.app.app_context():
+ task = db.session.query(Task).filter_by(task_id = task.task_id).first()
+ debug("成功啦|失败!")
+ task_policy = task.task_policies[-1]
+ task.status = "finish"
+ stop_task_monitoring(task)
+ stop_policy_change_timer(task)
+ stop_task_deliver(task_policy)
+ db.session.commit()
+
operation_map = {"start": "开始", "stop": "暂停"}
status_map = {"working": "执行中", "stopped": "暂停"}
# 合法的状态转移逻辑,例如,暂停的任务不能开始,正在执行的任务不能停止
@@ -220,7 +328,8 @@ def handle_task_operation(json_data):
@bp.doc("任务列表信息获取接口")
@bp.input({
"page": Integer(load_default=1),
- "per_page": Integer(load_default=10)
+ "per_page": Integer(load_default=10),
+ "task_id":String(required=False)
}, location="query")
@bp.output({
"code": Integer(),
@@ -231,8 +340,11 @@ def handle_task_operation(json_data):
def tasks_state(query_data):
per_page = query_data["per_page"]
page = query_data["page"]
-
- query = db.session.query(Task).filter().order_by(Task.created_time.desc())
+ task_id = query_data.get("task_id")
+ if task_id:
+ query = db.session.query(Task).filter_by(task_id = task_id)
+ else:
+ query = db.session.query(Task).filter().order_by(Task.created_time.desc())
task_count = query.count()
tasks = query.offset((page - 1) * per_page).limit(per_page).all()
@@ -243,7 +355,7 @@ def tasks_state(query_data):
task_r["target"] = f"{task.target.addrv4}|{task.target.addrv6}"
task_r["name"] = task.task_name
task_r["agent"] = task.agent_id
- task_r["target_domain"] = task.target_domain,
+ task_r["target_domain"] = task.target_domain
task_r["target_rr"] = task.target_rr
task_r["policy"] = task.ptype
task_r["create_time"] = task.created_time
@@ -316,7 +428,14 @@ def task_info(query_data):
"record": delay.recorde
})
-
+ 策略 = {
+ "应答率": {
+ "策略名": 60
+ },
+ "篡改成功率": {
+ "策略名": 25
+ }
+ }
return {"code": 200, "data": task_state_list}