summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author韩丁康 <[email protected]>2024-04-01 09:38:14 +0800
committer韩丁康 <[email protected]>2024-04-01 09:38:14 +0800
commitbe6e8df0266ee6126000b6fcf270d376ea04b503 (patch)
tree370d2712d12a711f3d34aabefb62481be62d1bbb
parent38d397681518173398a86003f7f30db5aaa448a2 (diff)
代理状态感知功能完善
-rw-r--r--agent/app.py34
-rw-r--r--agent/apps/datacheck.py52
-rw-r--r--agent/apps/delay.py51
3 files changed, 92 insertions, 45 deletions
diff --git a/agent/app.py b/agent/app.py
index d929dca..eaca222 100644
--- a/agent/app.py
+++ b/agent/app.py
@@ -1,21 +1,20 @@
import argparse
+import ipaddress
import platform
+import socket
-import requests
-from apiflask import APIFlask, Schema, PaginationSchema
-from apiflask.fields import List, Nested
-from flask import request
-from apps.script import bp as scriptbp
-import psutil
import geocoder
-import socket
-import ipaddress
+import psutil
+import requests
+from apiflask import APIFlask
-import pandas as pd
+from apps.datacheck import bp as datacheckbp
+from apps.delay import bp as scriptbp
# 注册蓝图
app = APIFlask(__name__, template_folder='./static/templates')
app.register_blueprint(scriptbp)
+app.register_blueprint(datacheckbp)
@app.get('/')
@@ -55,20 +54,21 @@ def nodeinfo():
# 注册代理
-def registernode(port=2525,atype="stgj",server="127.0.0.1:8888"):
- info=nodeinfo()
- info["port"]=port
- info["type"]=atype
- requests.post("http://"+server)
+def registernode(port=2525, atype="stgj", server="127.0.0.1:8888"):
+ info = nodeinfo()
+ info["port"] = port
+ info["type"] = atype
+ requests.post("http://" + server)
if __name__ == '__main__':
# 命令行参数设置
parser = argparse.ArgumentParser()
parser.add_argument("-p", "--port", type=int, default=2525, help="代理的开放通信端口")
- parser.add_argument("-t", "--type", type=str, default="stgj", help="代理的工作类型 {stgj(渗透攻击) / mbgz(目标感知) / ztgz(状态感知)}")
- parser.add_argument("-s", "--server", type=str, default="127.0.0.1:8888",help="主控端访问地址+端口号")
+ parser.add_argument("-t", "--type", type=str, default="stgj",
+ help="代理的工作类型 {stgj(渗透攻击) / mbgz(目标感知) / ztgz(状态感知)}")
+ parser.add_argument("-s", "--server", type=str, default="127.0.0.1:8888", help="主控端访问地址+端口号")
args = parser.parse_args()
- registernode(args.port,server=args.server)
+ registernode(args.port, server=args.server)
app.run(host="0.0.0.0", debug=True, port=args.port)
diff --git a/agent/apps/datacheck.py b/agent/apps/datacheck.py
index 3c066cf..260c496 100644
--- a/agent/apps/datacheck.py
+++ b/agent/apps/datacheck.py
@@ -1,54 +1,50 @@
-# DNS记录测试接口
+# 状态感知——DNS记录测试接口
import dns.nameserver
-from apiflask import APIFlask,APIBlueprint
-from apiflask.fields import List,String,Nested
+from apiflask import APIBlueprint
+from apiflask.fields import String
from apiflask.validators import OneOf
from dns import resolver
-bp=APIBlueprint("check",__name__,url_prefix="/check")
+bp = APIBlueprint("check", __name__, url_prefix="/check")
+
@bp.route("/")
@bp.doc("通过指定的解析器获取指定域名的A/AAAA记录")
@bp.input({
- 'rev':String(required=True),
- 'domain':String(required=True),
- 'qtype':String(required=True,validate=OneOf(['A','AAAA',"CNAME","NS"]))
- },location='query')
+ 'rev': String(required=True),
+ 'domain': String(required=True),
+ 'qtype': String(required=True, validate=OneOf(['A', 'AAAA', "CNAME", "NS"]))
+}, location='query')
def record(query_data):
# 特殊协议头
- protols=["https","tls"]
- ans=[]
+ protols = ["https", "tls"]
+ ans = []
# 参数读取
- rev=query_data['rev']
- domain=query_data['domain']
- qtype=query_data['qtype']
+ rev = query_data['rev']
+ domain = query_data['domain']
+ qtype = query_data['qtype']
# 解析器配置
- myResolver=resolver.Resolver()
+ myResolver = resolver.Resolver()
# 根据rev参数配置解析器
- is_Do53=any(proto if proto in rev else False for proto in protols)
+ is_Do53 = any(proto if proto in rev else False for proto in protols)
if not is_Do53:
# 存在端口指定
if ":" in rev:
- res=rev.split(":")
- myResolver.nameservers=[dns.nameserver.Do53Nameserver(res[0],port=res[1])]
+ res = rev.split(":")
+ myResolver.nameservers = [dns.nameserver.Do53Nameserver(res[0], port=res[1])]
else:
- myResolver.nameservers=[rev]
+ myResolver.nameservers = [rev]
else:
# doh服务
if "https" in rev:
- myResolver.nameservers=[dns.nameserver.DoHNameserver(url=rev)]
+ myResolver.nameservers = [dns.nameserver.DoHNameserver(url=rev)]
# dot服务
if "tls" in rev:
- res=rev.split("//")
- myResolver.nameservers=[dns.nameserver.DoTNameserver(address=res[1])]
+ res = rev.split("//")
+ myResolver.nameservers = [dns.nameserver.DoTNameserver(address=res[1])]
myAnswers = myResolver.resolve(domain, qtype)
if myAnswers.rrset is not None:
for r in myAnswers.rrset:
- ans.append({"rrset":str(r)})
- return {'ans':ans}
-
-
-
-
-
+ ans.append({"rrset": str(r)})
+ return {'ans': ans}
diff --git a/agent/apps/delay.py b/agent/apps/delay.py
new file mode 100644
index 0000000..578860b
--- /dev/null
+++ b/agent/apps/delay.py
@@ -0,0 +1,51 @@
+# 状态感知——时延测试
+import time
+
+import apps.utiltcping as utping
+import dns.nameserver
+from dns import resolver
+from flask import Blueprint, request
+from icmplib import ping
+
+bp = Blueprint("delay", __name__, url_prefix="/delay")
+
+
+def hello():
+ return "hello"
+
+
+# icmp时延测试
+def icmpdelay():
+ addr = request.args.get("ip")
+ host = ping(addr, count=2, interval=0.1, timeout=2)
+ return str(host.avg_rtt)
+
+
+def tcpdelay():
+ addr = request.args.get("ip")
+ port = request.args.get("port", default=53)
+ tdelay = utping.cli(addr, port, 2, 2)
+ return str(tdelay)
+
+
+# dns时延测试函数,返回执行完成时的时间戳
+def dnsdelay():
+ addr = request.args.get("ip")
+ port = request.args.get("port", default=53)
+ domain = request.args.get("domain", default="www.baidu.com")
+ qtype = request.args.get("qtype", default="A")
+
+ start_time = time.time()
+ # 解析器配置
+ myResolver = resolver.Resolver()
+ myResolver.nameservers = [dns.nameserver.Do53Nameserver(addr, port=port)]
+
+ # 开始解析
+ ans = myResolver.resolve(domain, qtype, lifetime=4)
+ stop_time = time.time()
+ t = stop_time - start_time
+ return str(format(t * 1000, ".3f"))