summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author韩丁康 <[email protected]>2024-05-11 20:03:24 +0800
committer韩丁康 <[email protected]>2024-05-11 20:03:24 +0800
commit840d64c187899afd7dab123f461dd0ac275c1a9c (patch)
tree2e9761b6a772af10e81e7a9daebee68a8cc6594e
parent6208efa535f5ba7aba079cb50a8b9b871edacb48 (diff)
已实现接口同步
-rw-r--r--server/apps/policy.py76
-rw-r--r--server/apps/target.py43
-rw-r--r--server/apps/task.py232
-rw-r--r--server/apps/util.py25
4 files changed, 314 insertions, 62 deletions
diff --git a/server/apps/policy.py b/server/apps/policy.py
new file mode 100644
index 0000000..3c75175
--- /dev/null
+++ b/server/apps/policy.py
@@ -0,0 +1,76 @@
+# 策略生成与调整,效果评估模块
+import random
+
+from settings import *
+from .util import da, is_ipaddress
+
+
+# 初始策略创建,输入参数为期望策略类型、目标(IP或域名)以及任务编号
+def init_task_policy(ptype, target, task):
+ ip_version = is_ipaddress(target)
+ # 非ip地址,默认为DoH域名
+ if ip_version is None:
+ # TODO:DoH处理
+ res = da.get_data(data_type="target", search={""})
+ # IPv4
+ elif ip_version == 4:
+ res = da.get_data(data_type="target", search={"ADDRv4": target})[0]
+ # IPv6
+ elif ip_version == 6:
+ res = da.get_data(data_type="target", search={"ADDRv6": target})[0]
+ else:
+ exit(1)
+
+ # 目标支持协议范围
+ proto = []
+ # 不存在该目标的相关记录
+ # TODO:后续对该目标进行探测
+ if len(res) == 0:
+ proto = ["IPv6", "DOH", "DOT", "DNSSEC"]
+ # 存在该目标的记录
+ else:
+ for k in ["IPv6", "DOH", "DOT", "DNSSEC"]:
+ if res[k]:
+ proto.append(k)
+
+ # 可选策略范围 policy
+ # 根据用户期望手段与目标协议寻找初始化策略
+ # auto自动包含两类策略
+ if ptype == "auto":
+ policy = ["ddos", "sjqp"]
+ # 否则仅支持用户选定的策略类型
+ else:
+ policy = [ptype]
+
+ # 策略记录,将上述两个范围列表proto,policy中的值组合进sql语句中
+ sql = """
+ SELECT P_ID,P_NAME,P_EXE,P_PAYLOAD
+ FROM %s
+ WHERE P_TYPE IN (%s) AND P_PROTO IN (%s)
+ """ % (MYSQL_TAB_POLICY, "\'" + "\',\'".join(policy) + "\'", "\'" + "\',\'".join(proto) + "\'")
+ da.cursor.execute(sql)
+ policy_list = da.cursor.fetchall()
+ # 随机选择一个作为初始策略
+ first_policy = random.choice(policy_list)
+ # 策略对应的执行文件路径
+ p_exe = first_policy["P_EXE"]
+ # 策略对应的初始参数
+ p_param = first_policy["P_PAYLOAD"]
+
+ task_policy_info = {"policy": first_policy["P_ID"], "param": p_param, "task": task, "tab": MYSQL_TAB_TASKPOLICY}
+
+ # 记录该任务策略
+ task_policy_sql = """
+ INSERT INTO %(tab)s
+ (POLICY,
+ POLICY_PARAM,
+ FOR_TASK)
+ VALUES('%(policy)s', '%(param)s', '%(task)s')
+ """ % task_policy_info
+
+ # 获取任务策略的主键值
+ da.cursor.execute(task_policy_sql)
+ tp_id = da.conn.insert_id()
+ da.conn.commit()
+
+ return tp_id, p_exe, p_param
diff --git a/server/apps/target.py b/server/apps/target.py
index 5a1875e..b80e130 100644
--- a/server/apps/target.py
+++ b/server/apps/target.py
@@ -1,6 +1,7 @@
# 目标状态感知
# 时延测试接口
import threading
+import json
import pandas as pd
import requests
@@ -95,34 +96,20 @@ def get_pernode_delay(query_data, type):
threadLock = threading.Lock()
-def task(ans, addr, row, type):
+def task(ans, addr, agent, type):
+ res = 0
if type == "icmp":
- res = icmp_delay_query(addr, row['ip'])
- threadLock.acquire()
- ans.append({
- 'Id': row['id'],
- 'CurrDelay': res,
- 'Type': type})
- threadLock.release()
- return
+ res = icmp_delay_query(addr, agent['ip'])
if type == "tcp":
- res = tcp_delay_query(addr, row['ip'])
- threadLock.acquire()
- ans.append({
- 'Id': row['id'],
- 'CurrDelay': res,
- 'Type': type})
- threadLock.release()
- return
+ res = tcp_delay_query(addr, agent['ip'])
if type == "dns":
- res = dns_delay_query(addr, row['ip'])
- threadLock.acquire()
- ans.append({
- 'Id': row['id'],
- 'CurrDelay': res,
- 'Type': type})
- threadLock.release()
-
+ res = dns_delay_query(addr, agent['ip'])
+ threadLock.acquire()
+ ans.append({
+ 'Id': agent['id'],
+ 'CurrDelay': res,
+ 'Type': type})
+ threadLock.release()
def icmp_delay_query(target, addr):
try:
@@ -180,7 +167,10 @@ from dns import resolver
@bp.get("/check")
[email protected]("通过指定的解析器获取指定域名的A/AAAA记录", hide=True)
[email protected]("通过指定的解析器获取指定域名的A/AAAA记录", description="参数说明:</br>" +
+ "rev:解析器的IP地址</br>"
+ + "domain:查询的目标域名</br>"
+ + "qtype:查询的记录类型")
@bp.input({
'rev': String(required=True),
'domain': String(required=True),
@@ -225,6 +215,7 @@ def record(query_data):
return {"code": 200, 'ans': ans}
+
@bp.get("/")
@bp.doc("(表格)目标信息获取接口", "返回目标信息")
@bp.input({
diff --git a/server/apps/task.py b/server/apps/task.py
index 5993ee5..d448eb3 100644
--- a/server/apps/task.py
+++ b/server/apps/task.py
@@ -1,12 +1,15 @@
# 任务管理接口
-import datetime
+import json
+import uuid
import random
from apiflask import APIBlueprint, Schema
from apiflask.fields import String, Integer, IP, DateTime, List, Nested
from apiflask.validators import OneOf
-from .util import fake, da
+from .agentcomm import deliver_task
+from .policy import *
+from .util import fake, da, error, debug
bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task")
@@ -63,6 +66,12 @@ class TaskState(Schema):
# 效果评估
policy_status = String()
+# 任务创建参数逻辑合法性检查
+def valid_task_info(task_param: dict):
+ if task_param["policy"] == "sjqp":
+ if task_param["target_rr"] == "" or task_param["tdomain"] == "":
+ return "数据欺骗缺乏目标域名或注入参数"
+ return None
# 创建任务接口
@bp.post("/create")
@@ -97,12 +106,131 @@ class TaskState(Schema):
})
# TODO:创建任务接口具体实现
def make_task(json_data):
- print(json_data)
+ # 请求参数转储到变量
+ name = json_data["name"]
+ target = str(json_data["target"])
+ agent_id = json_data["agent"]
+ tdomain = json_data["target_domain"]
+ target_rr = json_data["target_rr"]
+ policy_type = json_data["policy"]
+ scan = json_data["scan"]
+ policy_time = json_data["policy_time"]
+ run_flag = json_data["run_flag"]
+ run_time = json_data["run_time"]
+
+ # 任务编号生成
+ t_id = str(uuid.uuid1())
+
+ # 任务信息生成
+
+ task_info = {
+ "t_id": t_id,
+ "name": name,
+ "target": target,
+ "ag_id": agent_id,
+ "p_delay": policy_time,
+ "t_delay": run_time,
+ "scan": scan,
+ "status": "working" if run_flag == "now" else "stop",
+ "policy": policy_type,
+ "tdomain": tdomain,
+ "target_rr": target_rr,
+ "target_rtype": ""
+ }
+
+ # 输入参数检查处理
+ warn = valid_task_info(task_info)
+ if warn is not None:
+ return {"code": 400, "msg": warn}
+ if task_info["target_rr"] != "":
+ task_info["target_rtype"], task_info["target_rr"] = tuple(task_info["target_rr"].split(" "))
+
+ # 添加表名
+ task_info["tab"] = MYSQL_TAB_TASK
+ # 若目标为IP地址
+ if is_ipaddress(target) is not None:
+ task_info["TARGET"] = "TARGET_IP"
+ # 若目标为域名,视为DoH服务
+ else:
+ task_info["TARGET"] = "TARGET_DOH"
+
+ # 添加状态探测节点,用于状态感知
+ scan_node_sql = """
+ SELECT AGENT_ID,IPADDR
+ FROM %s
+ WHERE STATUS='1' AND AGENT_TYPE='ztgz'
+ LIMIT 50;
+ """ % MYSQL_TAB_AGENT
+ da.cursor.execute(scan_node_sql)
+ # 所有在线的支持状态感知的代理
+ all_nodes = da.cursor.fetchall()
+ # 随机选择10个
+ selected_nodes = random.choices(all_nodes, k=10 if len(all_nodes) > 10 else len(all_nodes))
+ # 将选择的代理节点ID和对应IPv4地址存储为字典,键为代理ID,值为代理的IPv4地址
+ selected_nodes_info = {'{}'.format(n['AGENT_ID']): '{}'.format(str(n['IPADDR']).split("|")[0]) for n in
+ selected_nodes}
+ task_info["nodes"] = json.dumps(selected_nodes_info)
+
+ # sql语句组合
+ sql = """
+ INSERT INTO %(tab)s (
+ TASK_ID,
+ TASK_NAME,
+ AGENT_ID,
+ STATUS,
+ POLICY_DELAY,
+ TASK_DELAY,
+ TARGET_SCAN,
+ TARGET_DOMAIN,
+ TARGET_RTYPE,
+ TARGET_RR,
+ %(TARGET)s,
+ SCAN_AGENT_ID_LIST)
+ VALUES (
+ '%(t_id)s',
+ '%(name)s',
+ '%(ag_id)s',
+ '%(status)s',
+ '%(p_delay)s',
+ '%(t_delay)s',
+ '%(scan)s',
+ '%(tdomain)s',
+ '%(target_rtype)s',
+ '%(target_rr)s',
+ '%(target)s',
+ '%(nodes)s'
+ );
+ """ % task_info
+ debug(sql)
+ da.cursor.execute(sql)
+ da.conn.commit()
+
+ # 任务策略初始化
+ tp_id, p_exe, p_param = init_task_policy(policy_type, target, t_id)
+ update_policy_sql = """
+ UPDATE %s
+ SET POLICY='%s'
+ WHERE TASK_ID='%s';
+ """ % (MYSQL_TAB_TASK, tp_id, t_id)
+ da.cursor.execute(update_policy_sql)
+ da.conn.commit()
+
+ # 根据run_flag判断是否立刻执行
+ if run_flag == "now":
+ err = deliver_task(agent_id, p_exe, p_param)
+ if err is not None:
+ error(str(err))
+ return {"code": 500, "msg": str(err)}
return {"code": 200, "msg": "ok"}
-opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"}
+opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"}
+statemap = {"working": "执行中", "stop": "暂停", "finish": "结束"}
+# 合法逻辑映射,例如不能让一个暂停的任务开始,或是让一个正在运行的任务停止
+ops_state = {"start": ["stop"], "stop": ["working"], "cancel": ["working", "stop"]}
+# 操作与状态结果的映射关系
+ops_state_result = {"start": "working", "stop": "stop", "cancel": "finish"}
# 操作任务开始停止控制接口
@bp.post("/ops")
@@ -117,8 +245,36 @@ opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"}
})
# TODO:操作任务开始停止控制接口具体实现
def ops_task(json_data):
- ops = opsmap[json_data["ops"]]
- return {"code": 200, "msg": "任务" + json_data["taskid"] + "已" + ops}
+ ops = json_data["ops"]
+ # 操作中文名
+ ops_cn = opsmap[json_data["ops"]]
+ id = json_data["taskid"]
+
+ # 查询目标任务状态
+ sql = """
+ SELECT AGENT_ID,STATUS
+ FROM %s
+ WHERE TASK_ID='%s';
+ """ % (MYSQL_TAB_TASK, id)
+ da.cursor.execute(sql)
+ data = da.cursor.fetchall()
+ if len(data) == 0:
+ return {"code": 400, "msg": "任务" + id + "不存在!!"}
+ # 判断操作是否是逻辑上可执行的
+ if data[0]["STATUS"] in ops_state[ops]:
+ # 向对应代理下发控制指令
+ err = deliver_task(data[0]["AGENT_ID"], "ops", ops)
+ if err is not None:
+ return {"code": 500, "msg": str(err)}
+ task_update_status_sql = """
+ UPDATE %s
+ SET STATUS='%s'
+ WHERE TASK_ID='%s';""" % (MYSQL_TAB_TASK, ops_state_result[ops], id)
+ da.cursor.execute(task_update_status_sql)
+ da.conn.commit()
+ return {"code": 200, "msg": "任务" + json_data["taskid"] + "已" + ops_cn}
+ else:
+ return {"code": 400, "msg": "无法对一个" + statemap[data[0]["STATUS"]] + "的任务执行" + ops_cn + "操作"}
# 查询任务列表接口
@@ -146,7 +302,10 @@ def tasks_state(query_data):
for r in res:
task = {}
for key, value in r.items():
- task[task_response_map[key]] = value
+ try:
+ task[task_response_map[key]] = value
+ except KeyError:
+ continue
task_list.append(task)
return {"code": 200, "data": task_list, "total": res_count}
@@ -161,28 +320,30 @@ def tasks_state(query_data):
"code": Integer(),
"data": List(Nested(TaskState()))
})
-# TODO:任务详情信息获取接口具体实现
def task_info(query_data):
- print(query_data)
- round = random.randint(1, 10)
+ taskid = query_data["taskid"]
+
+ # 任务状态列表
+ sql = """
+ SELECT tp.TP_TIME as tp_time, p.P_NAME as p_name, tp.POLICY_PARAM as tp_param,tp.TP_ID as tp_id
+ FROM %s as tp,%s as p
+ WHERE tp.POLICY=p.P_ID AND tp.FOR_TASK='%s'
+ ORDER BY TP_TIME;
+ """ % (MYSQL_TAB_TASKPOLICY, MYSQL_TAB_POLICY, taskid)
+ da.cursor.execute(sql)
+ data = da.cursor.fetchall()
task_state_list = []
# 过往记录
- for _ in range(round):
+ for d in data:
task_state_list.append({
- "start_time": fake.date_time_between(start_date="-1y"),
- "policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]),
- "policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]),
- "policy_id": str(fake.random.randint(1, 10000)),
+ "start_time": d["tp_time"],
+ "policy_name": d["p_name"],
+ "policy_param": d["tp_param"],
+ "policy_id": d["tp_id"],
"policy_status": "无效;原因为:超时未成功"
})
- # 当前正在执行的策略
- task_state_list.append({
- "start_time": datetime.datetime.now(),
- "policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]),
- "policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]),
- "policy_id": str(fake.random.randint(1, 10000)),
- "policy_status": "评估中"
- })
+ # 最后一项为当前正在执行的策略
+ task_state_list[-1]["policy_status"] = "评估中"
return {"code": 200, "data": task_state_list}
@@ -212,3 +373,28 @@ def taskpolicy_log(query_data):
} for _ in range(per_page)]
return {"code": 200, "data": policy_output, "total": 10 * per_page}
+
[email protected]("任务删除接口", "输入参数说明:</br>"
+ + "id: 任务编号")
+ "id": String(required=True)
+})
+ "code": Integer(),
+ "msg": String()
+})
+def del_task(json_data):
+ taskid = json_data["id"]
+ sql = """
+ DELETE
+ FROM %s
+ WHERE TASK_ID='%s';
+ """ % (MYSQL_TAB_TASK, taskid)
+ try:
+ da.cursor.execute(sql)
+ da.conn.commit()
+ except Exception as e:
+ error(str(e))
+ return {"code": 500, "msg": str(e)}
+ return {"code": 200, "msg": "ok"}
diff --git a/server/apps/util.py b/server/apps/util.py
index 337c946..ad5bec2 100644
--- a/server/apps/util.py
+++ b/server/apps/util.py
@@ -1,4 +1,5 @@
# 工具包
+import ipaddress
import sys
import pymysql
@@ -179,7 +180,7 @@ class DataHandler:
# 完全一致
if len(differ) == 0:
if not count:
- sql = """SELECT * FROM %s LIMIT %s, %s""" % (tabname, offset, offset + limit)
+ sql = """SELECT * FROM %s LIMIT %s, %s;""" % (tabname, offset, offset + limit)
self.cursor.execute(sql)
return self.cursor.fetchall()
else:
@@ -195,7 +196,7 @@ class DataHandler:
# 参数在数据表中对应的字段名
tab_key = model.keymapping[data_type][key]
if model.typemapping[key] == "str":
- condition[tab_key] = "\"".join(["", str(val), ""])
+ condition[tab_key] = "\'".join(["", str(val), ""])
else:
condition[tab_key] = str(val)
if not count:
@@ -228,17 +229,6 @@ class DataHandler:
da = DataHandler()
-# 将插入mysql中的字符串处理为满足mysql形式的单引号字符串,输入为字典形式
-def string_to_mysql(data: dict):
- res = {}
- for key, val in data.items():
- if type(val) == type("text"):
- res[key] = "\'" + val + "\'"
- else:
- res[key] = val
- return res
-
-
def debug(message, *args, **kwargs):
logger.debug(message, *args, **kwargs)
@@ -249,3 +239,12 @@ def info(message, *args, **kwargs):
def error(message, *args, **kwargs):
logger.error(message, *args, **kwargs)
+
+
+# 判断是否为IP地址,若是返回4(IPv4)或6(IPv6),否则返回None
+def is_ipaddress(address: str):
+ try:
+ ip = ipaddress.ip_address(address)
+ return int(ip.version)
+ except ValueError:
+ return None