summaryrefslogtreecommitdiff
path: root/server/apps
diff options
context:
space:
mode:
authorshihaoyue <[email protected]>2024-06-19 20:42:55 +0800
committershihaoyue <[email protected]>2024-06-19 20:42:55 +0800
commitbafae949d421cbff2a1cadaa62b6105fea09df99 (patch)
treea88c763045ded9be17ab967975a9d19540e2f2ca /server/apps
parent088d364c5cafce49e1aea5c6bbef65c64c5390e2 (diff)
优化server下发任务流程
Diffstat (limited to 'server/apps')
-rw-r--r--server/apps/agentcomm.py45
-rw-r--r--server/apps/policy.py2
-rw-r--r--server/apps/task.py15
3 files changed, 38 insertions, 24 deletions
diff --git a/server/apps/agentcomm.py b/server/apps/agentcomm.py
index c52d224..103b3f1 100644
--- a/server/apps/agentcomm.py
+++ b/server/apps/agentcomm.py
@@ -90,24 +90,35 @@ def register_agent(json_data):
# 代理任务下发
-def deliver_task(agent_id, policy, policy_param):
- if policy == "ops":
- # TODO:控制代理
- return None
- else:
- agent = db.session.query(Agent).get(agent_id)
- ip, port = agent.ipaddr.split("|")[0], agent.port
- try:
- res = requests.post("http://%s:%s/script" % (ip, port), timeout=3,
- json={'policy': policy, 'param': policy_param})
- if res.status_code == 200:
- return None
- error("任务分发错误,code: " + str(res.status_code) + " err: " + str(res.text))
- return res.text
- except Exception as e:
- error(str(e))
- return str(e)
+def deliver_task(task_policy):
+ agent = db.session.query(Agent).get(task_policy.task.agent_id)
+ ip, port = agent.ipaddr.split("|")[0], agent.port
+ try:
+ res = requests.post("http://%s:%s/script/execute" % (ip, port), timeout=3,
+ json={'policy': task_policy.policy_ref.p_exe,
+ 'param': task_policy.policy_param,
+ 'id' : task_policy.tp_id,
+ })
+ if res.status_code == 200:
+ return None
+ error("任务分发错误,code: " + str(res.status_code) + " err: " + str(res.text))
+ return res.text
+ except Exception as e:
+ error(str(e))
+ return str(e)
+def stop_task(agent_id):
+ agent = db.session.query(Agent).get(agent_id)
+ ip, port = agent.ipaddr.split("|")[0], agent.port
+ try:
+ res = requests.post("http://%s:%s/script" % (ip, port), timeout=3,)
+ if res.status_code == 200:
+ return None
+ error("任务停止错误,code: " + str(res.status_code) + " err: " + str(res.text))
+ return res.text
+ except Exception as e:
+ error(str(e))
+ return str(e)
@bp.post("/res")
@bp.doc("代理输出消息接收接口", hide=True)
diff --git a/server/apps/policy.py b/server/apps/policy.py
index e834fd2..e1de69b 100644
--- a/server/apps/policy.py
+++ b/server/apps/policy.py
@@ -63,4 +63,4 @@ def init_task_policy(ptype, target, task):
db.session.add(task_policy)
db.session.commit()
- return task_policy.tp_id, p_exe, p_param
+ return task_policy
diff --git a/server/apps/task.py b/server/apps/task.py
index 6f65000..d5d508e 100644
--- a/server/apps/task.py
+++ b/server/apps/task.py
@@ -9,7 +9,7 @@ from sqlalchemy import and_
from sqlalchemy.exc import SQLAlchemyError
from model import Task, Agent
-from .agentcomm import deliver_task
+from .agentcomm import deliver_task, stop_task
from .policy import *
from .util import error
@@ -141,11 +141,11 @@ def make_task(json_data):
return {"code": 500, "msg": str(e)}
# 任务策略初始化
- tp_id, p_exe, p_param = init_task_policy(json_data["policy"], task.target_ip, task.task_id)
+ task_policy = init_task_policy(json_data["policy"], task.target_ip, task.task_id)
# 根据状态判断是否立刻执行
if task.status == "working":
- err = deliver_task(task.agent_id, p_exe, p_param)
+ err = deliver_task(task_policy)
if err is not None:
error(str(err))
return {"code": 500, "msg": str(err)}
@@ -182,9 +182,12 @@ def ops_task(json_data):
return {"code": 400, "msg": "任务" + id + "不存在!!"}
# 判断操作是否是逻辑上可执行的
if task.status in ops_state[ops]:
- err = deliver_task(task.agent_id, "ops", ops)
- if err is not None:
- return {"code": 500, "msg": str(err)}
+ if (ops=="stop" or ops == "cancel"):
+ stop_task()
+ elif (ops == "start"):
+ deliver_task(task, )
+ # if err is not None:
+ # return {"code": 500, "msg": str(err)}
task.status = ops_state_result[ops]
db.session.commit()
return {"code": 200, "msg": "任务" + task.task_id + "已" + ops_cn}