summaryrefslogtreecommitdiff
path: root/server/apps/target.py
diff options
context:
space:
mode:
authorhandingkang <[email protected]>2024-06-18 22:50:42 +0800
committerhandingkang <[email protected]>2024-06-18 22:50:42 +0800
commit1d4200da7064fe41e027f49f967b6cfe795cd88c (patch)
tree438016f2b548e5caa455e2220bf4391e57656b08 /server/apps/target.py
parent4339ecd79f54d1cb1cdd129be7fdfe51a9d3cb14 (diff)
1. 实现主控端实际调用代理探测目标时延的功能
2. 修复部分数据格式问题
Diffstat (limited to 'server/apps/target.py')
-rw-r--r--server/apps/target.py117
1 files changed, 62 insertions, 55 deletions
diff --git a/server/apps/target.py b/server/apps/target.py
index c056a82..9ae34e6 100644
--- a/server/apps/target.py
+++ b/server/apps/target.py
@@ -1,7 +1,7 @@
# 目标状态感知
# 时延测试接口
import ipaddress
-import random
+import json
import threading
from operator import or_
@@ -12,6 +12,7 @@ from apiflask.validators import OneOf
from requests.exceptions import Timeout
from sqlalchemy import distinct, func, case
+from apps.util import debug
from exts import db
from model import Target, Task, Agent
@@ -55,7 +56,7 @@ class TestNode(Schema):
class Delay(Schema):
- Id = Integer()
+ Id = String()
CurrDelay = String()
# MeanDelay=Integer()
# MaxDelay=Integer()
@@ -89,13 +90,13 @@ class CouInfo(Schema):
def get_nodes(query_data):
node_list = []
query_session = db.session
- # 查询所有的isp
+ # 查询负责该任务的所有状态感知节点简要信息
node_data = query_session.query(Task.SCAN_AGENT_ID_LIST).filter(Task.task_id == query_data['taskid']).all()
# 存在数据
if len(node_data) > 0:
# 负责该任务目标的状态感知的节点
- nodes_info = node_data[0].SCAN_AGENT_ID_LIST
+ nodes_info = json.loads(node_data[0].SCAN_AGENT_ID_LIST)
# 查询每一个节点的信息
nodes = query_session.query(Agent).filter(Agent.agent_id.in_(nodes_info.keys())).all()
for node in nodes:
@@ -108,10 +109,11 @@ def get_nodes(query_data):
"Lat": node.lat,
"Lng": node.lng
})
+ query_session.close()
else:
+ query_session.close()
return {"code": 500, "nodes": []}
- query_session.close()
# 废弃代码
# num = 10
# for i in range(num):
@@ -137,9 +139,10 @@ def get_nodes(query_data):
@bp.output(DelayOut)
# TODO:和实际节点联调测试
def get_pernode_delay(query_data):
+ query_session = db.session
# TODO:DoH处理
# 探测地址
- addr = query_data['ip']
+ addr = query_data['target']
taskid = query_data['taskid']
scan_type = query_data['type']
@@ -147,95 +150,99 @@ def get_pernode_delay(query_data):
ans = []
# 线程池
threads = []
- # # 检索探测节点信息
- # sql = """
- # SELECT SCAN_AGENT_ID_LIST as node_info
- # FROM %s
- # WHERE TASK_ID='%s'
- # """ % (MYSQL_TAB_TASK, taskid)
- # da.cursor.execute(sql)
- # # 探测节点ID与地址
- # nodes = json.loads(da.cursor.fetchall()[0]["node_info"].replace('"', "\""))
- #
- # for id, ip in nodes.items():
- # mythread = threading.Thread(target=task, args=[ans, addr, {'id': id, 'ip': ip}, type])
- # mythread.start()
- # threads.append(mythread)
- # for t in threads:
- # t.join()
+
+ # 检索探测节点信息
+ # 查询负责该任务的所有状态感知节点简要信息
+ node_data = query_session.query(Task.SCAN_AGENT_ID_LIST).filter(Task.task_id == taskid).all()
+
+ # 存在数据
+ if len(node_data) > 0:
+ # 负责该任务目标的状态感知的节点ID和IP地址+端口号组成的列表
+ nodes_info = json.loads(node_data[0].SCAN_AGENT_ID_LIST)
+ # 向每个节点查询时延信息
+ for id, ip_port in nodes_info.items():
+ mythread = threading.Thread(target=task, args=[ans, addr, {'id': id, 'ip_port': ip_port}, scan_type])
+ mythread.start()
+ threads.append(mythread)
+ for t in threads:
+ t.join()
+ query_session.close()
+ else:
+ query_session.close()
+ return {"code": 500, "delay_data": []}
# 暂未部署实际代理节点,以假数据返回
- ans = []
- for i in range(10):
- ans.append({"Id": str(i), "Type": scan_type, "CurrDelay": random.randint(1, 1000)})
+ # ans = []
+ # for i in range(10):
+ # ans.append({"Id": str(i), "Type": scan_type, "CurrDelay": random.randint(1, 1000)})
return {"code": 200, 'delay_data': ans}
threadLock = threading.Lock()
-def task(ans, addr, agent, type):
+def task(ans, addr, agent, scan_type):
res = 0
- if type == "icmp":
- res = icmp_delay_query(addr, agent['ip'])
- if type == "tcp":
- res = tcp_delay_query(addr, agent['ip'])
- if type == "dns":
- res = dns_delay_query(addr, agent['ip'])
+ if scan_type == "icmp":
+ res = icmp_delay_query(addr, agent['ip_port'])
+ if scan_type == "tcp":
+ res = tcp_delay_query(addr, agent['ip_port'])
+ if scan_type == "dns":
+ res = dns_delay_query(addr, agent['ip_port'])
threadLock.acquire()
ans.append({
'Id': agent['id'],
'CurrDelay': res,
- 'Type': type})
+ 'Type': scan_type})
threadLock.release()
-def icmp_delay_query(target, addr):
+def icmp_delay_query(target, addr_port):
try:
- res = requests.get(url="http://" + addr + ":2525/script/icmpdelay", params={'ip': target}, timeout=5)
- print("icmp ok:" + addr + "-------" + res.text + "-------" + str(res.elapsed.total_seconds()))
- icmp_delaytable[str(addr) + str(target)] = res.text
+ res = requests.get(url="http://" + addr_port + "/delay/icmp", params={'ip': target}, timeout=5)
+ debug("icmp ok:" + addr_port + "-------" + res.text + "-------" + str(res.elapsed.total_seconds()))
+ icmp_delaytable[str(addr_port) + str(target)] = res.text
return res.text
except Timeout:
# 如果存在旧数据
- if str(addr) + str(target) in icmp_delaytable.keys():
+ if str(addr_port) + str(target) in icmp_delaytable.keys():
pass
# 不存在则设0
else:
- icmp_delaytable[str(addr) + str(target)] = 0
- return icmp_delaytable[str(addr) + str(target)]
+ icmp_delaytable[str(addr_port) + str(target)] = 0
+ return icmp_delaytable[str(addr_port) + str(target)]
-def tcp_delay_query(target, addr):
+def tcp_delay_query(target, addr_port):
try:
- res = requests.get(url="http://" + addr + ":2525/script/tcpdelay", params={'ip': target, 'port': 53}, timeout=5)
- print("tcp ok:" + addr + "-------" + res.text)
- tcp_delaytable[str(addr) + str(target)] = res.text
+ res = requests.get(url="http://" + addr_port + "/delay/tcp", params={'ip': target, 'port': 53}, timeout=5)
+ debug("tcp ok:" + addr_port + "-------" + res.text)
+ tcp_delaytable[str(addr_port) + str(target)] = res.text
return res.text
except Timeout:
# 如果存在旧数据
- if str(addr) + str(target) in tcp_delaytable.keys():
+ if str(addr_port) + str(target) in tcp_delaytable.keys():
pass
# 不存在则设0
else:
- tcp_delaytable[str(addr) + str(target)] = 0
- return tcp_delaytable[str(addr) + str(target)]
+ tcp_delaytable[str(addr_port) + str(target)] = 0
+ return tcp_delaytable[str(addr_port) + str(target)]
-def dns_delay_query(target, addr):
+def dns_delay_query(target, addr_port):
try:
- res = requests.get(url="http://" + addr + ":2525/script/dnsdelay", params={'ip': target}, timeout=5)
- print("dns ok:" + addr + "-------" + res.text)
- dns_delaytable[str(addr) + str(target)] = res.text
- return dns_delaytable[str(addr) + str(target)]
+ res = requests.get(url="http://" + addr_port + "/delay/dns", params={'ip': target}, timeout=5)
+ debug("dns ok:" + addr_port + "-------" + res.text)
+ dns_delaytable[str(addr_port) + str(target)] = res.text
+ return dns_delaytable[str(addr_port) + str(target)]
except Timeout:
# 如果存在旧数据
- if str(addr) + str(target) in dns_delaytable.keys():
+ if str(addr_port) + str(target) in dns_delaytable.keys():
pass
# 不存在则设0
else:
- dns_delaytable[str(addr) + str(target)] = 0
- return dns_delaytable[str(addr) + str(target)]
+ dns_delaytable[str(addr_port) + str(target)] = 0
+ return dns_delaytable[str(addr_port) + str(target)]
# 状态感知——DNS记录测试接口