1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# 渗透、目标参数感知脚本
import os
import subprocess
from concurrent.futures import ThreadPoolExecutor
import threading
import time
from flask import request, jsonify
from apiflask import APIBlueprint
from apiflask.fields import String
import requests
import yaml
from .util import debug
bp = APIBlueprint('script', __name__, url_prefix='/script')
# 线程池
executor = ThreadPoolExecutor(5)
BASE_PATH = "./apps/code/"
# config
config = {}
@bp.post('/')
@bp.doc("渗透任务参数接收接口", "返回任务执行状态")
@bp.input({
'policy': String(required=True),
'param': String(required=True)
})
def start_script(json_data):
# 执行命令
exe = [BASE_PATH + json_data['policy']]
# 执行参数
params = json_data['param'].split()
if '.py' in json_data['policy']:
exe = ["python"] + exe
# 通过
executor.submit(process_script, exe + params)
return {"code": 200, "msg": "ok"}
def process_script(command):
# 开始执行命令,不等待其执行完毕
process = subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
output = process.stdout.readline().rstrip().decode('utf-8')
if output == '' and process.poll() is not None:
break
if output == '':
continue
if output:
debug(output.strip())
# 获取子进程的返回值
rc = process.poll()
return rc
# 用于存储命令结果和控制信息的字典
tasks = {}
tasks_lock = threading.Lock()
def execute_command(cmd, job_id, stop_event):
with open("./config.yaml", "r") as f:
config = yaml.safe_load(f)
server_url = config['server']
try:
proc = subprocess.Popen(cmd,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
text=True,
encoding='utf-8')
tasks[job_id]['output'] = ''
while proc.poll() is None:
if stop_event.is_set():
proc.terminate()
tasks[job_id]['output'] += "\nExecution terminated."
break
line = proc.stdout.readline()
print(line)
if line:
with tasks_lock:
tasks[job_id]['output'] += line
# 发送每行日志到服务器
response = requests.post(f"http://{server_url}/agent/res",
json={"id" : config['id'],
"taskpolicy" : job_id,
"level" : "INFO",
"info" : line})
time.sleep(1)
if response.status_code != 200:
print(f"Failed to send log to server for job {job_id}: {response.text}")
# proc.stdout.close()
if not stop_event.is_set():
remaining_output = proc.communicate()[0]
with tasks_lock:
tasks[job_id]['output'] += remaining_output
# 发送剩余输出到服务器
if remaining_output:
response = requests.post(f"http://{server_url}/agent/res", json={"id" : config['id'],
"taskpolicy" : job_id,
"level" : "INFO",
"info" : remaining_output})
if response.status_code != 200:
print(f"Failed to send remaining log to server for job {job_id}: {response.text}")
response = requests.post(f"http://{server_url}/agent/res", json={"id" : config['id'],
"taskpolicy" : job_id,
"level" : "INFO",
"info" : "finish"})
except subprocess.CalledProcessError as e:
with tasks_lock:
tasks[job_id]['output'] = e.output
response = requests.post(f"http://{server_url}/agent/res", json={"id" : config['id'],
"taskpolicy" : job_id,
"level" : "ERROR",
"info" : e.output})
if response.status_code != 200:
print(f"Failed to send error log to server for job {job_id}: {response.text}")
except Exception as e:
print(e)
@bp.route('/execute', methods=['POST'])
def execute():
data = request.json
script_path = BASE_PATH + data.get('policy')
# 检查脚本文件是否存在
if not os.path.isfile(script_path):
return jsonify({'error': f'Script file {script_path} not found'}), 404
# 根据脚本类型构建命令
if '.py' in script_path:
command = f"python3 -u {script_path} {data.get('param')}"
elif '.go' in script_path:
command = f"go run {script_path} {data.get('param')}"
else:
command = f"{script_path} {data.get('param')}"
# 生成一个唯一的job_id
job_id = data.get('id')
stop_event = threading.Event()
with tasks_lock:
tasks[job_id] = {'output': None, 'stop_event': stop_event}
# 提交任务到线程池
executor.submit(execute_command, command, job_id, stop_event)
return '202'
@bp.route('/results/<job_id>', methods=['GET'])
def get_result(job_id):
with tasks_lock:
if job_id not in tasks:
return jsonify({'error': 'Job ID not found or still running'}), 404
return jsonify({'job_id': job_id, 'output': tasks[job_id]['output']})
@bp.route('/stop/<job_id>', methods=['POST'])
def stop_job(job_id):
with tasks_lock:
if job_id not in tasks:
return jsonify({'error': 'Job ID not found'}), 404
tasks[job_id]['stop_event'].set()
return jsonify({'message': f'Job {job_id} termination initiated'}), 200
|