diff options
| author | 韩丁康 <[email protected]> | 2024-05-11 20:03:24 +0800 |
|---|---|---|
| committer | 韩丁康 <[email protected]> | 2024-05-11 20:03:24 +0800 |
| commit | 840d64c187899afd7dab123f461dd0ac275c1a9c (patch) | |
| tree | 2e9761b6a772af10e81e7a9daebee68a8cc6594e | |
| parent | 6208efa535f5ba7aba079cb50a8b9b871edacb48 (diff) | |
已实现接口同步
| -rw-r--r-- | server/apps/policy.py | 76 | ||||
| -rw-r--r-- | server/apps/target.py | 43 | ||||
| -rw-r--r-- | server/apps/task.py | 232 | ||||
| -rw-r--r-- | server/apps/util.py | 25 |
4 files changed, 314 insertions, 62 deletions
diff --git a/server/apps/policy.py b/server/apps/policy.py new file mode 100644 index 0000000..3c75175 --- /dev/null +++ b/server/apps/policy.py @@ -0,0 +1,76 @@ +# 策略生成与调整,效果评估模块 +import random + +from settings import * +from .util import da, is_ipaddress + + +# 初始策略创建,输入参数为期望策略类型、目标(IP或域名)以及任务编号 +def init_task_policy(ptype, target, task): + ip_version = is_ipaddress(target) + # 非ip地址,默认为DoH域名 + if ip_version is None: + # TODO:DoH处理 + res = da.get_data(data_type="target", search={""}) + # IPv4 + elif ip_version == 4: + res = da.get_data(data_type="target", search={"ADDRv4": target})[0] + # IPv6 + elif ip_version == 6: + res = da.get_data(data_type="target", search={"ADDRv6": target})[0] + else: + exit(1) + + # 目标支持协议范围 + proto = [] + # 不存在该目标的相关记录 + # TODO:后续对该目标进行探测 + if len(res) == 0: + proto = ["IPv6", "DOH", "DOT", "DNSSEC"] + # 存在该目标的记录 + else: + for k in ["IPv6", "DOH", "DOT", "DNSSEC"]: + if res[k]: + proto.append(k) + + # 可选策略范围 policy + # 根据用户期望手段与目标协议寻找初始化策略 + # auto自动包含两类策略 + if ptype == "auto": + policy = ["ddos", "sjqp"] + # 否则仅支持用户选定的策略类型 + else: + policy = [ptype] + + # 策略记录,将上述两个范围列表proto,policy中的值组合进sql语句中 + sql = """ + SELECT P_ID,P_NAME,P_EXE,P_PAYLOAD + FROM %s + WHERE P_TYPE IN (%s) AND P_PROTO IN (%s) + """ % (MYSQL_TAB_POLICY, "\'" + "\',\'".join(policy) + "\'", "\'" + "\',\'".join(proto) + "\'") + da.cursor.execute(sql) + policy_list = da.cursor.fetchall() + # 随机选择一个作为初始策略 + first_policy = random.choice(policy_list) + # 策略对应的执行文件路径 + p_exe = first_policy["P_EXE"] + # 策略对应的初始参数 + p_param = first_policy["P_PAYLOAD"] + + task_policy_info = {"policy": first_policy["P_ID"], "param": p_param, "task": task, "tab": MYSQL_TAB_TASKPOLICY} + + # 记录该任务策略 + task_policy_sql = """ + INSERT INTO %(tab)s + (POLICY, + POLICY_PARAM, + FOR_TASK) + VALUES('%(policy)s', '%(param)s', '%(task)s') + """ % task_policy_info + + # 获取任务策略的主键值 + da.cursor.execute(task_policy_sql) + tp_id = da.conn.insert_id() + da.conn.commit() + + return tp_id, p_exe, p_param diff --git a/server/apps/target.py b/server/apps/target.py index 5a1875e..b80e130 100644 --- a/server/apps/target.py +++ b/server/apps/target.py @@ -1,6 +1,7 @@ # 目标状态感知 # 时延测试接口 import threading +import json import pandas as pd import requests @@ -95,34 +96,20 @@ def get_pernode_delay(query_data, type): threadLock = threading.Lock() -def task(ans, addr, row, type): +def task(ans, addr, agent, type): + res = 0 if type == "icmp": - res = icmp_delay_query(addr, row['ip']) - threadLock.acquire() - ans.append({ - 'Id': row['id'], - 'CurrDelay': res, - 'Type': type}) - threadLock.release() - return + res = icmp_delay_query(addr, agent['ip']) if type == "tcp": - res = tcp_delay_query(addr, row['ip']) - threadLock.acquire() - ans.append({ - 'Id': row['id'], - 'CurrDelay': res, - 'Type': type}) - threadLock.release() - return + res = tcp_delay_query(addr, agent['ip']) if type == "dns": - res = dns_delay_query(addr, row['ip']) - threadLock.acquire() - ans.append({ - 'Id': row['id'], - 'CurrDelay': res, - 'Type': type}) - threadLock.release() - + res = dns_delay_query(addr, agent['ip']) + threadLock.acquire() + ans.append({ + 'Id': agent['id'], + 'CurrDelay': res, + 'Type': type}) + threadLock.release() def icmp_delay_query(target, addr): try: @@ -180,7 +167,10 @@ from dns import resolver @bp.get("/check") [email protected]("通过指定的解析器获取指定域名的A/AAAA记录", hide=True) [email protected]("通过指定的解析器获取指定域名的A/AAAA记录", description="参数说明:</br>" + + "rev:解析器的IP地址</br>" + + "domain:查询的目标域名</br>" + + "qtype:查询的记录类型") @bp.input({ 'rev': String(required=True), 'domain': String(required=True), @@ -225,6 +215,7 @@ def record(query_data): return {"code": 200, 'ans': ans} + @bp.get("/") @bp.doc("(表格)目标信息获取接口", "返回目标信息") @bp.input({ diff --git a/server/apps/task.py b/server/apps/task.py index 5993ee5..d448eb3 100644 --- a/server/apps/task.py +++ b/server/apps/task.py @@ -1,12 +1,15 @@ # 任务管理接口 -import datetime +import json +import uuid import random from apiflask import APIBlueprint, Schema from apiflask.fields import String, Integer, IP, DateTime, List, Nested from apiflask.validators import OneOf -from .util import fake, da +from .agentcomm import deliver_task +from .policy import * +from .util import fake, da, error, debug bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task") @@ -63,6 +66,12 @@ class TaskState(Schema): # 效果评估 policy_status = String() +# 任务创建参数逻辑合法性检查 +def valid_task_info(task_param: dict): + if task_param["policy"] == "sjqp": + if task_param["target_rr"] == "" or task_param["tdomain"] == "": + return "数据欺骗缺乏目标域名或注入参数" + return None # 创建任务接口 @bp.post("/create") @@ -97,12 +106,131 @@ class TaskState(Schema): }) # TODO:创建任务接口具体实现 def make_task(json_data): - print(json_data) + # 请求参数转储到变量 + name = json_data["name"] + target = str(json_data["target"]) + agent_id = json_data["agent"] + tdomain = json_data["target_domain"] + target_rr = json_data["target_rr"] + policy_type = json_data["policy"] + scan = json_data["scan"] + policy_time = json_data["policy_time"] + run_flag = json_data["run_flag"] + run_time = json_data["run_time"] + + # 任务编号生成 + t_id = str(uuid.uuid1()) + + # 任务信息生成 + + task_info = { + "t_id": t_id, + "name": name, + "target": target, + "ag_id": agent_id, + "p_delay": policy_time, + "t_delay": run_time, + "scan": scan, + "status": "working" if run_flag == "now" else "stop", + "policy": policy_type, + "tdomain": tdomain, + "target_rr": target_rr, + "target_rtype": "" + } + + # 输入参数检查处理 + warn = valid_task_info(task_info) + if warn is not None: + return {"code": 400, "msg": warn} + if task_info["target_rr"] != "": + task_info["target_rtype"], task_info["target_rr"] = tuple(task_info["target_rr"].split(" ")) + + # 添加表名 + task_info["tab"] = MYSQL_TAB_TASK + # 若目标为IP地址 + if is_ipaddress(target) is not None: + task_info["TARGET"] = "TARGET_IP" + # 若目标为域名,视为DoH服务 + else: + task_info["TARGET"] = "TARGET_DOH" + + # 添加状态探测节点,用于状态感知 + scan_node_sql = """ + SELECT AGENT_ID,IPADDR + FROM %s + WHERE STATUS='1' AND AGENT_TYPE='ztgz' + LIMIT 50; + """ % MYSQL_TAB_AGENT + da.cursor.execute(scan_node_sql) + # 所有在线的支持状态感知的代理 + all_nodes = da.cursor.fetchall() + # 随机选择10个 + selected_nodes = random.choices(all_nodes, k=10 if len(all_nodes) > 10 else len(all_nodes)) + # 将选择的代理节点ID和对应IPv4地址存储为字典,键为代理ID,值为代理的IPv4地址 + selected_nodes_info = {'{}'.format(n['AGENT_ID']): '{}'.format(str(n['IPADDR']).split("|")[0]) for n in + selected_nodes} + task_info["nodes"] = json.dumps(selected_nodes_info) + + # sql语句组合 + sql = """ + INSERT INTO %(tab)s ( + TASK_ID, + TASK_NAME, + AGENT_ID, + STATUS, + POLICY_DELAY, + TASK_DELAY, + TARGET_SCAN, + TARGET_DOMAIN, + TARGET_RTYPE, + TARGET_RR, + %(TARGET)s, + SCAN_AGENT_ID_LIST) + VALUES ( + '%(t_id)s', + '%(name)s', + '%(ag_id)s', + '%(status)s', + '%(p_delay)s', + '%(t_delay)s', + '%(scan)s', + '%(tdomain)s', + '%(target_rtype)s', + '%(target_rr)s', + '%(target)s', + '%(nodes)s' + ); + """ % task_info + debug(sql) + da.cursor.execute(sql) + da.conn.commit() + + # 任务策略初始化 + tp_id, p_exe, p_param = init_task_policy(policy_type, target, t_id) + update_policy_sql = """ + UPDATE %s + SET POLICY='%s' + WHERE TASK_ID='%s'; + """ % (MYSQL_TAB_TASK, tp_id, t_id) + da.cursor.execute(update_policy_sql) + da.conn.commit() + + # 根据run_flag判断是否立刻执行 + if run_flag == "now": + err = deliver_task(agent_id, p_exe, p_param) + if err is not None: + error(str(err)) + return {"code": 500, "msg": str(err)} return {"code": 200, "msg": "ok"} -opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"} +opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"} +statemap = {"working": "执行中", "stop": "暂停", "finish": "结束"} +# 合法逻辑映射,例如不能让一个暂停的任务开始,或是让一个正在运行的任务停止 +ops_state = {"start": ["stop"], "stop": ["working"], "cancel": ["working", "stop"]} +# 操作与状态结果的映射关系 +ops_state_result = {"start": "working", "stop": "stop", "cancel": "finish"} # 操作任务开始停止控制接口 @bp.post("/ops") @@ -117,8 +245,36 @@ opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"} }) # TODO:操作任务开始停止控制接口具体实现 def ops_task(json_data): - ops = opsmap[json_data["ops"]] - return {"code": 200, "msg": "任务" + json_data["taskid"] + "已" + ops} + ops = json_data["ops"] + # 操作中文名 + ops_cn = opsmap[json_data["ops"]] + id = json_data["taskid"] + + # 查询目标任务状态 + sql = """ + SELECT AGENT_ID,STATUS + FROM %s + WHERE TASK_ID='%s'; + """ % (MYSQL_TAB_TASK, id) + da.cursor.execute(sql) + data = da.cursor.fetchall() + if len(data) == 0: + return {"code": 400, "msg": "任务" + id + "不存在!!"} + # 判断操作是否是逻辑上可执行的 + if data[0]["STATUS"] in ops_state[ops]: + # 向对应代理下发控制指令 + err = deliver_task(data[0]["AGENT_ID"], "ops", ops) + if err is not None: + return {"code": 500, "msg": str(err)} + task_update_status_sql = """ + UPDATE %s + SET STATUS='%s' + WHERE TASK_ID='%s';""" % (MYSQL_TAB_TASK, ops_state_result[ops], id) + da.cursor.execute(task_update_status_sql) + da.conn.commit() + return {"code": 200, "msg": "任务" + json_data["taskid"] + "已" + ops_cn} + else: + return {"code": 400, "msg": "无法对一个" + statemap[data[0]["STATUS"]] + "的任务执行" + ops_cn + "操作"} # 查询任务列表接口 @@ -146,7 +302,10 @@ def tasks_state(query_data): for r in res: task = {} for key, value in r.items(): - task[task_response_map[key]] = value + try: + task[task_response_map[key]] = value + except KeyError: + continue task_list.append(task) return {"code": 200, "data": task_list, "total": res_count} @@ -161,28 +320,30 @@ def tasks_state(query_data): "code": Integer(), "data": List(Nested(TaskState())) }) -# TODO:任务详情信息获取接口具体实现 def task_info(query_data): - print(query_data) - round = random.randint(1, 10) + taskid = query_data["taskid"] + + # 任务状态列表 + sql = """ + SELECT tp.TP_TIME as tp_time, p.P_NAME as p_name, tp.POLICY_PARAM as tp_param,tp.TP_ID as tp_id + FROM %s as tp,%s as p + WHERE tp.POLICY=p.P_ID AND tp.FOR_TASK='%s' + ORDER BY TP_TIME; + """ % (MYSQL_TAB_TASKPOLICY, MYSQL_TAB_POLICY, taskid) + da.cursor.execute(sql) + data = da.cursor.fetchall() task_state_list = [] # 过往记录 - for _ in range(round): + for d in data: task_state_list.append({ - "start_time": fake.date_time_between(start_date="-1y"), - "policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]), - "policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]), - "policy_id": str(fake.random.randint(1, 10000)), + "start_time": d["tp_time"], + "policy_name": d["p_name"], + "policy_param": d["tp_param"], + "policy_id": d["tp_id"], "policy_status": "无效;原因为:超时未成功" }) - # 当前正在执行的策略 - task_state_list.append({ - "start_time": datetime.datetime.now(), - "policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]), - "policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]), - "policy_id": str(fake.random.randint(1, 10000)), - "policy_status": "评估中" - }) + # 最后一项为当前正在执行的策略 + task_state_list[-1]["policy_status"] = "评估中" return {"code": 200, "data": task_state_list} @@ -212,3 +373,28 @@ def taskpolicy_log(query_data): } for _ in range(per_page)] return {"code": 200, "data": policy_output, "total": 10 * per_page} + [email protected]("任务删除接口", "输入参数说明:</br>" + + "id: 任务编号") [email protected]("/del") + "id": String(required=True) +}) + "code": Integer(), + "msg": String() +}) +def del_task(json_data): + taskid = json_data["id"] + sql = """ + DELETE + FROM %s + WHERE TASK_ID='%s'; + """ % (MYSQL_TAB_TASK, taskid) + try: + da.cursor.execute(sql) + da.conn.commit() + except Exception as e: + error(str(e)) + return {"code": 500, "msg": str(e)} + return {"code": 200, "msg": "ok"} diff --git a/server/apps/util.py b/server/apps/util.py index 337c946..ad5bec2 100644 --- a/server/apps/util.py +++ b/server/apps/util.py @@ -1,4 +1,5 @@ # 工具包 +import ipaddress import sys import pymysql @@ -179,7 +180,7 @@ class DataHandler: # 完全一致 if len(differ) == 0: if not count: - sql = """SELECT * FROM %s LIMIT %s, %s""" % (tabname, offset, offset + limit) + sql = """SELECT * FROM %s LIMIT %s, %s;""" % (tabname, offset, offset + limit) self.cursor.execute(sql) return self.cursor.fetchall() else: @@ -195,7 +196,7 @@ class DataHandler: # 参数在数据表中对应的字段名 tab_key = model.keymapping[data_type][key] if model.typemapping[key] == "str": - condition[tab_key] = "\"".join(["", str(val), ""]) + condition[tab_key] = "\'".join(["", str(val), ""]) else: condition[tab_key] = str(val) if not count: @@ -228,17 +229,6 @@ class DataHandler: da = DataHandler() -# 将插入mysql中的字符串处理为满足mysql形式的单引号字符串,输入为字典形式 -def string_to_mysql(data: dict): - res = {} - for key, val in data.items(): - if type(val) == type("text"): - res[key] = "\'" + val + "\'" - else: - res[key] = val - return res - - def debug(message, *args, **kwargs): logger.debug(message, *args, **kwargs) @@ -249,3 +239,12 @@ def info(message, *args, **kwargs): def error(message, *args, **kwargs): logger.error(message, *args, **kwargs) + + +# 判断是否为IP地址,若是返回4(IPv4)或6(IPv6),否则返回None +def is_ipaddress(address: str): + try: + ip = ipaddress.ip_address(address) + return int(ip.version) + except ValueError: + return None |
