1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
|
# 任务管理接口
import datetime
import random
from apiflask import APIBlueprint, Schema
from apiflask.fields import String, Integer, IP, DateTime, List, Nested
from apiflask.validators import OneOf
from .util import fake, da
bp = APIBlueprint("任务管理接口集合", __name__, url_prefix="/task")
# 数据库列与返回值的键对应关系
task_response_map = {
"TASK_ID": "id",
"TARGET_IP": "target",
"TASK_NAME": "name",
"AGENT_ID": "agent",
"TARGET_DOMAIN": "target_domain",
"TARGET_RR": "target_rr",
"POLICY": "policy",
"CREATE_TIME": "create_time",
"STATUS": "status",
}
class Task(Schema):
id = Integer()
target = String()
name = String()
agent = String()
target_domain = String()
target_rr = String()
policy = String()
create_time = DateTime()
status = String(validate=OneOf(["working", "stop", "finish"]))
# 执行输出日志
class TaskLog(Schema):
# 输出时间
time = DateTime()
# 输出来源IP
ip = String()
# 该策略的目标IP
targetip = String()
# 日志等级
level = String(validate=OneOf(["INFO", "WARNING", "ERROR"]))
# 日志内容
info = String()
# 任务状态时间轴信息
class TaskState(Schema):
# 策略开始执行时间
start_time = DateTime()
# 策略名称
policy_name = String()
# 策略参数
policy_param = String()
# 执行策略编号
policy_id = String()
# 效果评估
policy_status = String()
# 创建任务接口
@bp.post("/create")
@bp.doc("任务创建接口", "部分字段值的映射关系:</br>"
+ "policy 期望策略,可选参数范围及对应含义为:,auto-自动,ddos-拒绝服务,sjqp-数据欺骗</br>"
+ "scan 状态感知方式,可选参数范围及对应含义为:auto-自动,icmp-icmp/v6时延,tcp-tcp时延,dns-dns时延,record-记录正确性验证")
@bp.input({
# 任务名称
"name": String(),
# 目标IP
"target": IP(),
# 执行代理
"agent": String(),
# 目标域名
"target_domain": String(),
# 期望注入记录
"target_rr": String(),
# 期望策略
"policy": String(validate=OneOf(["auto", "ddos", "sjqp"])),
# 状态感知方式
"scan": String(validate=OneOf(["auto", "icmp", "dns", "tcp", "record"])),
# 策略切换时限
"policy_time": Integer(),
# 任务执行时限
"run_time": Integer(),
# 运行配置
"run_flag": String(validate=OneOf(["now", "man"]))
})
@bp.output({
"code": Integer(),
"msg": String()
})
# TODO:创建任务接口具体实现
def make_task(json_data):
print(json_data)
return {"code": 200, "msg": "ok"}
opsmap = {"start": "开始", "stop": "暂停", "cancel": "停止"}
# 操作任务开始停止控制接口
@bp.post("/ops")
@bp.doc("任务操作接口")
@bp.input({
"taskid": String(required=True),
"ops": String(required=True, validate=OneOf(["start", "stop", "cancel"]))
})
@bp.output({
"code": Integer(),
"msg": String(),
})
# TODO:操作任务开始停止控制接口具体实现
def ops_task(json_data):
ops = opsmap[json_data["ops"]]
return {"code": 200, "msg": "任务" + json_data["taskid"] + "已" + ops}
# 查询任务列表接口
@bp.get("/")
@bp.doc("任务列表信息获取接口")
@bp.input({
"page": Integer(load_default=1),
"per_page": Integer(load_default=10)
}, location="query")
@bp.output({
"code": Integer(),
"data": List(Nested(Task())),
"total": Integer()
})
# TODO:查询任务状态接口具体实现
def tasks_state(query_data):
per_page = query_data["per_page"]
page = query_data["page"]
# 任务列表
task_list = []
res = da.get_data(data_type="task",
offset=(page - 1) * per_page, limit=per_page)
res_count = da.count_data(data_type="task")
for r in res:
task = {}
for key, value in r.items():
task[task_response_map[key]] = value
task_list.append(task)
return {"code": 200, "data": task_list, "total": res_count}
# 任务详情接口
@bp.get("/detail")
@bp.doc("任务执行状态时间轴信息获取接口")
@bp.input({
"taskid": String(required=True),
}, location="query")
@bp.output({
"code": Integer(),
"data": List(Nested(TaskState()))
})
# TODO:任务详情接口具体实现
def task_info(query_data):
print(query_data)
round = random.randint(1, 10)
task_state_list = []
# 过往记录
for _ in range(round):
task_state_list.append({
"start_time": fake.date_time_between(start_date="-1y"),
"policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]),
"policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]),
"policy_id": str(fake.random.randint(1, 10000)),
"policy_status": "无效;原因为:超时未成功"
})
# 当前正在执行的策略
task_state_list.append({
"start_time": datetime.datetime.now(),
"policy_name": random.choice(["IPv6", "DNSSEC", "DoT", "DoH"]) + " " + random.choice(["DDoS", "数据欺骗"]),
"policy_param": random.choice(["攻击速率: 1000pps", "目标域名: www.google.com | 目标记录: NS attack.com"]),
"policy_id": str(fake.random.randint(1, 10000)),
"policy_status": "评估中"
})
return {"code": 200, "data": task_state_list}
@bp.get("/tp")
@bp.doc("任务策略执行日志获取接口")
@bp.input({
"id": String(required=True),
"per_page": Integer(load_default=10),
"page": Integer(load_default=1)
}, location="query")
@bp.output({
"code": Integer(),
"data": List(Nested(TaskLog())),
"total": Integer()
})
def taskpolicy_log(query_data):
per_page = query_data["per_page"]
page = query_data["page"]
policy_output = [{
"time": fake.date_time_between(start_date="-1y"),
"ip": "192.168.1.1",
"targetip": "2406:1234:1234:1234:1234:1234:1234:1234",
"level": random.choice(["INFO", "WARNING", "ERROR"]),
"info": fake.text(max_nb_chars=20, ext_word_list=None)
} for _ in range(per_page)]
return {"code": 200, "data": policy_output, "total": 10 * per_page}
|