summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorshihaoyue <[email protected]>2024-10-21 16:53:10 +0800
committershihaoyue <[email protected]>2024-10-21 16:57:50 +0800
commit97715cc22c0aea492897d01bdc0441e08dc1e79f (patch)
treeef9356b9e6a2227d1f6a9a7fff63d058044eade1
parent6af369b1f4342aba8080fe40c74733cf3ba04f6d (diff)
first commit
-rw-r--r--.gitkeep0
-rw-r--r--__pycache__/kimi_main.cpython-311.pycbin0 -> 13830 bytes
-rw-r--r--__pycache__/kimi_main2.cpython-311.pycbin0 -> 14168 bytes
-rw-r--r--content_role_system.txt106
-rw-r--r--kimi_main.py313
-rw-r--r--kimi_main2.py383
-rw-r--r--kimi_strategy.py31
7 files changed, 833 insertions, 0 deletions
diff --git a/.gitkeep b/.gitkeep
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/.gitkeep
diff --git a/__pycache__/kimi_main.cpython-311.pyc b/__pycache__/kimi_main.cpython-311.pyc
new file mode 100644
index 0000000..c858a2d
--- /dev/null
+++ b/__pycache__/kimi_main.cpython-311.pyc
Binary files differ
diff --git a/__pycache__/kimi_main2.cpython-311.pyc b/__pycache__/kimi_main2.cpython-311.pyc
new file mode 100644
index 0000000..b0ee635
--- /dev/null
+++ b/__pycache__/kimi_main2.cpython-311.pyc
Binary files differ
diff --git a/content_role_system.txt b/content_role_system.txt
new file mode 100644
index 0000000..1a9c54b
--- /dev/null
+++ b/content_role_system.txt
@@ -0,0 +1,106 @@
+为了响应国家要求对网络基础资源设施的安全能力建设,我需要构建一套面向DNS体系的自动化渗透测试系统,我希望借助你的能力来实现自动化策略调整的部分内容。在这项任务中,你的身份是一位DNS方向的安全技术专家,擅长网络分析和渗透测试及结果分析。我已经实现了11个具有针对性的渗透测试脚本,在最后会给你详细介绍他们的功能。
+你需要根据任务最终目标和目标DNS服务器的当前状态,挑选合适的脚本,可能还要给出对应参数值,我来执行实际的脚本。每次我将告诉你当前针对目标DNS节点执行的脚本信息和目标DNS节点状态,你可以根据这些状态信息来判断渗透脚本是否有效,以及是否需要调整参数等等。
+现在让我们来定义一下策略调整部分的整体宏观思路:1. 尽可能完成渗透测试的最终目的及预期效果;2. 在未达成目的之前确保所有合适的脚本都尝试一遍,若判断当前状态无法完成渗透测试最终目的则主动结束策略调整;3. 尽可能用较少的策略调整步骤实现最终目的;4. 考虑各种脚本及脚本参数设置时的实际影响,不设置明显不合理的参数;5. 按照规定的流程执行,不需要输出多余的响应;6. 按照整体目标描述与脚本介绍中基本功能中的描述,选择合适的拒绝服务或安全性降低两种类型中的一种;7. 若攻击目的是拒绝服务,则你应该通过调整攻击策略让目标节点服务应答率降低。
+总共有两种渗透目标:1. 拒绝服务,使用ddos脚本,使得目标DNS服务器不能响应正常DNS请求;2. 数据欺骗,使用inject或poisoning脚本,使得目标DNS服务器响应错误的数据。
+当前渗透目标节点的特点如下:%s
+当前任务目标:%s
+每次我会向你发送当前执行的脚本名称,以及目标节点的状态,你告诉我是否需要调整,调整理由。
+我的输入为json格式:{
+ "script": <当前执行的脚本名称>,
+ "params": {
+ "<当前参数名>": "<当前参数值>",
+ ...
+ },
+ "target_status": {
+ "response_rate": <目标节点的服务应答率>,
+ "icmp_rr": <目标节点的icmp响应时延>,
+ "udp_rr": <目标节点的udp响应时延>,
+ "tcp_rr": <目标节点的tcp响应时延>,
+ "pps": <无实际意义,可以忽略>
+ },
+ "last_exe_desc": <上次执行结果的描述,此处为空时,你需要根据目标节点状态判断>
+}
+你的返回为json格式:{
+ "next_step": "adjust_params"/"adjust_script"/"keep",
+ "next_script": <推荐执行的脚本名称>
+ "params": {
+ "<参数名>": "<推荐的参数值>",
+ ...
+ },
+ "reason": <做出此判断的依据>
+}
+最后,我将告诉你目前我已经有的11个可选的渗透测试脚本,如下:
+脚本1:
+- 名称:DNSSEC DDoS
+- 脚本限制:支持DNSSEC功能的DNS解析器可使用此脚本
+- 脚本功能:使得目标节点拒绝服务。通过向目标域名解析服务器发送特定请求,使目标服务器服务质量下降或无法提供服务,形成拒绝服务攻击
+- 脚本参数:{"r": 发送请求速率(默认为300), "n": 发送请求总量(默认为60000)}
+
+脚本2:
+- 名称:DNSSEC downgrade
+- 脚本限制:开启DNSSEC功能的DNS解析器可使用此脚本
+- 脚本功能:使目标DNS解析器不对DNSSEC记录进行验证,若目标DNS解析器被降级成功,则可以使用其它DDoS脚本
+- 脚本可调参数:无
+
+脚本3:
+- 名称:DNS IPv6 DDoS
+- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本
+- 脚本功能:构造数据包,发送给目标IPv6 DNS解析器,使其拒绝服务。
+- 脚本可调参数:{"n", 发送数据包总量}
+
+脚本4:
+- 名称:DNS IPv6 inject
+- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本
+- 脚本功能:DNS注入,向目标解析器注入一条目标域名的伪造AAAA记录
+- 脚本可调参数:无
+- 执行后状态集合:{注入成功、注入失败、 脚本执行中}
+
+脚本5:
+- 名称:DNS IPv6 poisoning
+- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本
+- 脚本功能:DNSv6篡改,将目标解析器中目标域名的AAAA记录篡改成伪造的地址
+- 脚本可调参数:无
+- 执行后状态集合:{篡改成功、篡改失败、脚本执行中}
+
+脚本6:
+- 名称:DoT DDoS
+- 脚本限制:开启DoT功能的DNS解析器可使用此脚本
+- 脚本功能:使得目标DoT解析器拒绝服务,若脚本执行成功则目标响应率应该很低且响应时延很高
+- 脚本可调参数:{"n": 使用进程数(默认为4), "r": 每个进程发送请求次数(默认为5),"w": 脉冲等待时长(默认为300秒)}
+- 执行后状态集合:{实时目标状态感知结果、脚本执行中}
+
+脚本7:
+- 名称:DoH DDoS
+- 脚本限制:开启DoH功能的DNS解析器可使用此脚本
+- 脚本功能:使得目标DoH解析器拒绝服务(利用HTTP2快速重置),若脚本执行成功则目标响应率应该很低且响应时延很高
+- 脚本可调参数:{"n": 使用进程数(默认为4)}
+- 执行后状态集合:{实时目标状态感知结果、脚本执行中}
+
+脚本8:
+- 名称:DoH poisoning
+- 脚本限制:开启DoH功能的DNS解析器可使用此脚本
+- 脚本功能:(安全性降低)使目标DoH解析器的响应结果被篡改
+- 脚本可调参数:无
+- 执行后状态集合:{篡改成功、篡改失败、脚本执行中}
+
+脚本9:
+- 名称:DoT poisoning
+- 脚本限制:开启DoT功能的DNS解析器可使用此脚本
+- 脚本功能:(安全性降低)使目标DoT解析器的响应结果被篡改
+- 脚本可调参数:无
+- 执行后状态集合:{篡改成功、篡改失败、脚本执行中}
+
+脚本10:
+- 名称:DoH inject
+- 脚本限制:开启DoH功能的DNS解析器可使用此脚本
+- 脚本功能:(安全性降低)向目标解析器注入一条目标域名的伪造记录
+- 脚本可调参数:无
+- 执行后状态集合:{注入成功、注入失败、脚本执行中、脚本执行失败}
+
+脚本11:
+- 名称:DoT inject
+- 脚本限制:开启DoT功能的DNS解析器可使用此脚本
+- 脚本功能:(安全性降低)向目标解析器注入一条目标域名的伪造记录
+- 脚本可调参数:无
+- 执行后状态集合:{注入成功、注入失败、脚本执行中、脚本执行失败}
+
diff --git a/kimi_main.py b/kimi_main.py
new file mode 100644
index 0000000..3fef6d8
--- /dev/null
+++ b/kimi_main.py
@@ -0,0 +1,313 @@
+from openai import OpenAI
+
+def convert_json_to_text(data):
+ text = []
+
+ # 判断 DNSSEC 启用情况
+ if data.get("dnssec_enabled", False):
+ text.append("(2)开启DNSSEC")
+ else:
+ text.append("(2)未开启DNSSEC")
+
+ # 是否使用 0x20 编码(域名随机化大小写)
+ if data.get("0x20_encoding", False):
+ text.append("(3)不可以使用缓存投毒")
+ else:
+ text.append("(3)可以使用缓存投毒")
+
+ # 判断是否支持DDoS
+ if data.get("merge_dup", False):
+ text.append("(4)不适用DDoS")
+ else:
+ text.append("(4)可使用DDoS")
+
+ # 判断是否适用 DNSSEC_DDoS
+ if data.get("dnssec_enabled", False) and not data.get("random_port", False):
+ text.append("(5)不适用DNSSEC_DDoS")
+ else:
+ text.append("(5)适用DNSSEC_DDoS")
+
+ # 判断是否适用放大攻击
+ if data.get("max_ns_cnt", 0) > 4:
+ text.append("(6)适用放大攻击")
+ else:
+ text.append("(6)不适用放大攻击")
+
+ # 是否开启 DoH 和 DoT
+ if data.get("edns_support", False):
+ text.append("(7)开启了DoH")
+ else:
+ text.append("(7)未开启DoH")
+
+ text.append("(8)开启了DoT") # 假设DoT始终开启
+
+ # IPv6 启用情况
+ text.append("(9)开启了IPv6功能") # 假设IPv6始终开启
+
+ # 返回拼接好的文本
+ return ";".join(text)
+
+
+client = OpenAI(
+ api_key="sk-wFEcyFj8sS2mbncCEqYK5uxYa6VyQyJrQ8reKPFYzxQcwt3i",
+ base_url="https://api.moonshot.cn/v1",
+)
+
+# 加载背景介绍信息
+with open('./content_role_system.txt', 'r', encoding='utf-8') as f:
+ content_system = f.read()
+
+output_constraints = "请按下面规定的格式输出策略调整结果,从下面的集合中选择一种决策输出(不需要其他任何文字反馈,我需要处理格式化输出的内容):{保持当前策略执行、调整脚本参数:参数代称 原始的数值->新的数值、更换攻击脚本:脚本名称}"
+
+mode_mapping = {"保持当前策略执行": "keep",
+ "调整脚本参数": "parameter",
+ "更换攻击脚本": "script"}
+
+history = [
+ {"role": "system",
+ "content": content_system}
+]
+
+
+def chat(query, history):
+ history.append({
+ "role": "user",
+ "content": query
+ })
+ completion = client.chat.completions.create(
+ model="moonshot-v1-32k",
+ messages=history,
+ temperature=0.3,
+ )
+ result = completion.choices[0].message.content
+ history.append({
+ "role": "assistant",
+ "content": result
+ })
+ return result
+
+
+def initial(content):
+ global history
+ history = [
+ {"role": "system",
+ "content": content_system}
+ ]
+
+ clxz_map = {
+ "ddos": "拒绝服务",
+ "sjqp": "数据欺骗"
+ }
+
+ try:
+ clxz = clxz_map.get(content["clxz"], None) # 策略选择
+ # _mbgz = # 目标感知
+ mbgz = convert_json_to_text(content["mbgz"])
+ ztgz = content["ztgz"] # 状态感知
+ _script = content["script"] # 脚本
+ _param = content["para"] # 传来的脚本参数是字典形式的
+ param = "" # 将参数转换成字符串
+ for p, v in _param.items():
+ if param == "":
+ param += f"{p}-{v}"
+ else:
+ param += f"-{p}-{v}"
+ script = f"{_script.replace(' ', '_')};{_script.replace(' ', '_').lower()}_{param}"
+ except Exception as e:
+ raise Exception('post上传的数据key解析失败', e)
+
+ # 构建待上传ai的策略初始化信息
+ assert clxz in ["拒绝服务", "数据欺骗"], "策略选择只能是'拒绝服务'或者'数据欺骗'"
+ clxz_context = f"1. 测试的最终目的是:{clxz}"
+
+ mbgz_context = f"2. 当前目标节点的18维不变特性是:{mbgz}"
+
+ script = script.replace(';', ';').split(';')
+ script_context = f"3. 当前执行的脚本名称为{script[0]};" + ("脚本参数选择为" + ";".join(script[1:]) if len(script) > 1 else "")
+
+ ztgz_context = f"4. 当前(未执行脚本前)目标节点的状态参数为:DNS服务应答率{ztgz['record']*100}%;icmp时延{ztgz['icmp']}ms;udp时延{ztgz['dns']}ms;tcp时延{ztgz['tcp']}ms;pps为200kpps"
+
+ context = '\n'.join([clxz_context, mbgz_context, script_context, ztgz_context])
+
+ # 初始化交互不需要输出响应
+ chat(context, history)
+ return 1
+
+
+def adjust_to_dict(adjust: str):
+ """
+ 将大模型返回的script字符串转换成字典,如 doh_ddos-n-100-200-abc-1000-f, 将被转换为
+ {
+ "script": "doh_ddos", (此处doh_ddos也可能被映射成其它的,具体跟mp中的映射关系有关)
+ "parameter": {
+ "n": ["100", "200"],
+ "abc": ["1000"],
+ "f": []
+ }
+ }
+ """
+ print(adjust)
+ if adjust == "none":
+ return {
+ "script": "none",
+ "parameter": {
+ },
+ }
+ # 这个字典负责将大模型返回的策略名称映射到规定的名称
+ mp = {
+ "v6_数据篡改": "V6 数据篡改",
+ "<v6_数据篡改>": "V6 数据篡改",
+ "v6_poisoning": "V6 数据篡改",
+ "<v6_poisoning>": "V6 数据篡改",
+ "dnssec_ddos": "DNSSEC ddos",
+ "<dnssec_ddos>": "DNSSEC ddos",
+ "DNSSEC_DDoS": "DNSSEC ddos",
+ "<DNSSEC_DDoS>": "DNSSEC ddos",
+ "dnssec_降级": "DNSSEC 降级",
+ "<dnssec_降级>": "DNSSEC 降级",
+ "DNSSEC_downgrade": "DNSSEC 降级",
+ "<DNSSEC_downgrade>": "DNSSEC 降级",
+ "dnssec_downgrade": "DNSSEC 降级",
+ "<dnssec_downgrade>": "DNSSEC 降级",
+ "DNS_IPv6_DDoS": "V6 DDoS",
+ "<DNS_IPv6_DDoS>": "V6 DDoS",
+ "ipv6_ddos": "V6 DDoS",
+ "<ipv6_ddos>": "V6 DDoS",
+ "v6_ddos": "V6 DDoS",
+ "<v6_ddos>": "V6 DDoS",
+ "DNS_IPv6_inject": "V6 数据篡改",
+ "<DNS_IPv6_inject>": "V6 数据篡改",
+ "v6_数据篡改": "V6 数据篡改",
+ "<v6_数据篡改>": "V6 数据篡改",
+ "dns_ipv6_inject": "V6 数据篡改",
+ "<dns_ipv6_inject>": "V6 数据篡改",
+ "DNS_IPv6_poisoning": "V6 数据篡改",
+ "<DNS_IPv6_poisoning>": "V6 数据篡改",
+ "dot_ddos": "DoT DDoS",
+ "<dot_ddos>": "DoT DDoS",
+ "DoT_DDoS": "DoT DDoS",
+ "<DoT_DDoS>": "DoT DDoS",
+ "dot_poisoning": "DoT 数据篡改",
+ "<dot_poisoning>": "DoT 数据篡改",
+ "DoT_poisoning": "DoT 数据篡改",
+ "<DoT_poisoning>": "DoT 数据篡改",
+ "dot_数据篡改": "DoT 数据篡改",
+ "<dot_数据篡改>": "DoT 数据篡改",
+ "dot_数据注入": "DoT 数据注入",
+ "<dot_数据注入>": "DoT 数据注入",
+ "dot_inject": "DoT 数据注入",
+ "<dot_inject>": "DoT 数据注入",
+ "DoT_inject": "DoT 数据注入",
+ "<DoT_inject>": "DoT 数据注入",
+ "DoH_DDoS": "DoH DDoS",
+ "<DoH_DDoS>": "DoH DDoS",
+ "doh_ddos": "DoH DDoS",
+ "<doh_ddos>": "DoH DDoS",
+ "doh_数据注入": "DoH 数据注入",
+ "<doh_数据注入>": "DoH 数据注入",
+ "doh_inject": "DoH 数据注入",
+ "<doh_inject>": "DoH 数据注入",
+ "DoH_inject": "DoH 数据注入",
+ "<DoH_inject>": "DoH 数据注入",
+ "doh_数据篡改": "DoH 数据篡改",
+ "<doh_数据篡改>": "DoH 数据篡改",
+ "doh_poisoning": "DoH 数据篡改",
+ "<doh_poisoning>": "DoH 数据篡改",
+ "DoH_poisoning": "DoH 数据篡改",
+ "<DoH_poisoning>": "DoH 数据篡改"
+ }
+ adjust = adjust.replace("<", "")
+ adjust = adjust.replace(">", "")
+ index = 0
+ for i in range(len(adjust)):
+ if adjust[i] == '_':
+ index = i
+ if adjust[i] == '-':
+ index = i
+ break
+ if '-' not in adjust:
+ script = adjust
+ param_list = []
+ else:
+ script = adjust[:index]
+ param_list = adjust[index+1:].split('-')
+
+ res_dict = {}
+ left, right = 0, 0
+ def is_parma_value(s):
+ """
+ 这个函数只会在下面的while中使用,作用是判断一个字符串,是数值串还是字符串
+ """
+ try:
+ int(s)
+ except:
+ return False
+ return True
+ while len(param_list):
+ right += 1
+ while right < len(param_list) and is_parma_value(param_list[right]):
+ right += 1
+ curr_param = param_list[left]
+ res_dict[curr_param] = []
+ left += 1
+ while left < right:
+ res_dict[curr_param].append(int(param_list[left]))
+ left += 1
+ # 将数组处理成一个数值,如果脚本参数存在一参对多值的情况如 -n 1 2 3 4, 则删除此部分
+ if len(res_dict[curr_param]):
+ res_dict[curr_param] = sum(res_dict[curr_param])//len(res_dict[curr_param])
+ else:
+ res_dict[curr_param] = None
+ if left >= len(param_list):
+ break
+
+ try:
+ return {
+ "script": mp[script],
+ "parameter": res_dict
+ }
+ except:
+ return {
+ "script": script,
+ "parameter": res_dict
+ }
+ # raise f"在将大模型返回的脚本字符串转换成字典时出现错误,原因是 mp 中并没有键: {script}"
+
+
+def adjusting(content):
+ try:
+ ztgz = content["ztgz"] # 状态感知
+ except Exception as e:
+ raise Exception("post上传的数据key解析失败", e)
+
+ context = f"当前目标节点的状态感知结果为:DNS服务应答率{ztgz['record']*100}%;icmp时延{ztgz['icmp']}ms;udp时延{ztgz['dns']}ms;tcp时延{ztgz['tcp']}ms;pps为200kpps\n{output_constraints}"
+
+ response = chat(context, history)
+
+ mode = response.replace(":", ":").split(":")[0]
+ assert mode in ["保持当前策略执行", "调整脚本参数", "更换攻击脚本"], f"响应出现非标情况,当前模式响应为{mode}"
+ mode_context = mode_mapping[mode]
+
+ try:
+ if mode_context == "keep":
+ adjust_context = "none"
+ elif mode_context == "parameter":
+ # 例如:模型响应:调整脚本参数:doh_ddos_n 4->8 输出adjust参数为doh_ddos_n-8
+ para = response.replace(":", ":").split(":")[1]
+ adjust_context = "-".join([para.split(" ")[0], para.split(">")[-1]])
+ else:
+ # 例如:模型响应:更换攻击脚本:DoT_DDoS 输出adjust参数为DoT_DDoS
+ adjust_context = response.replace(":", ":").split(":")[1]
+ except Exception as e:
+ raise Exception("处理模型规范化输出错误", e)
+
+ # # debug
+ # print(history)
+ res = adjust_to_dict(adjust_context)
+ res.update({"mode": mode_context})
+
+ return res
+
+
+if __name__ == "__main__":
+ print(adjust_to_dict('dddd'))
diff --git a/kimi_main2.py b/kimi_main2.py
new file mode 100644
index 0000000..cf97307
--- /dev/null
+++ b/kimi_main2.py
@@ -0,0 +1,383 @@
+import json
+from datetime import datetime
+from openai import OpenAI
+from loguru import logger
+
+
+# logger初始化
+# 获取当前日期,并用作日志文件名的一部分
+log_filename = f"{datetime.now().strftime('%Y-%m-%d')}_run.log"
+
+# 添加日志记录,设置日志文件的旋转时间、日志格式以及日志级别
+logger.add(log_filename, rotation="1 week", format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}", level="INFO")
+
+client = OpenAI(
+ api_key="sk-wFEcyFj8sS2mbncCEqYK5uxYa6VyQyJrQ8reKPFYzxQcwt3i",
+ base_url="https://api.moonshot.cn/v1",
+)
+sessions_with_boss : dict[str,list]={} # 每个session负责记录每个task与gpt交互的历史记录 {session_id/task_id: history}
+
+# 加载背景介绍信息(提示词)
+try:
+ with open('./strategy_adjust_code/content_role_system.txt', 'r', encoding='utf-8') as f:
+ system_prompt = f.read()
+except:
+ logger.error("系统提示词加载失败,请检查提示词文件路径")
+ exit(0)
+
+def chat(query: str, session_id: list):
+ global sessions_with_boss
+ # 检验 session 是否存在
+ if session_id not in sessions_with_boss.keys():
+ # 在别已经有log记录了
+ return None
+
+ # 获得历史记录,处理最多保留10次会话内容
+ history = keep_len_history(history=sessions_with_boss[session_id][0], length=10)
+ history.append({
+ "role": "user",
+ "content": query
+ })
+ # 将提示词(历史记录)发送给GPT,询问结果
+ completion = client.chat.completions.create(
+ model="moonshot-v1-32k",
+ messages=history,
+ response_format={'type': 'json_object'}, # 使用此值需要在提示词中给出json格式
+ # temperature=0.3, # 回答的随机程度0-2,越低越确定(不随机)
+ )
+ # GPT的响应结果,可能有多个,默认使用第一个 choices[0]
+ result = completion.choices[0].message.content
+ # 更新到每个session的历史记录中, 并更新会话存活时间
+ sessions_with_boss[session_id][0].append({
+ "role": "user",
+ "content": query
+ })
+ sessions_with_boss[session_id][0].append({
+ "role": "assistant",
+ "content": result
+ })
+ sessions_with_boss[session_id][1] = datetime.now()
+
+ return result
+
+
+def keep_len_history(history: list[dict], length: int=10):
+ """
+ 保留length长度的历史记录
+ param length: 需要保留的历史记录的长度,一问一答为一个长度,为0则保留所有
+ """
+ res_history = list()
+ # 保留系统提示词
+ for item in history:
+ if item['role'] == 'system':
+ # ?注意,这里是否会出现深浅拷贝的问题呢,查一下?
+ res_history.append(item)
+ # 切片,保留后length的聊天记录
+ res_history.extend(history[-2*min(length, len(history)):])
+ return res_history
+
+
+def mbgz_to_desc(data):
+ """
+ 将目标节点的特点,转换成文本描述,initial()会使用此函数,其它地方未使用
+ """
+ text = []
+
+ # 判断 DNSSEC 启用情况
+ if data.get("dnssec_enabled", False):
+ text.append("(2)开启DNSSEC")
+ else:
+ text.append("(2)未开启DNSSEC")
+
+ # 是否使用 0x20 编码(域名随机化大小写)
+ if data.get("0x20_encoding", False):
+ text.append("(3)不可以使用缓存投毒")
+ else:
+ text.append("(3)可以使用缓存投毒")
+
+ # 判断是否支持DDoS
+ if data.get("merge_dup", False):
+ text.append("(4)不适用DDoS")
+ else:
+ text.append("(4)可使用DDoS")
+
+ # 判断是否适用 DNSSEC_DDoS
+ if data.get("dnssec_enabled", False) and not data.get("random_port", False):
+ text.append("(5)不适用DNSSEC_DDoS")
+ else:
+ text.append("(5)适用DNSSEC_DDoS")
+
+ # 判断是否适用放大攻击
+ if data.get("max_ns_cnt", 0) > 4:
+ text.append("(6)适用放大攻击")
+ else:
+ text.append("(6)不适用放大攻击")
+
+ # 是否开启 DoH 和 DoT
+ if data.get("edns_support", False):
+ text.append("(7)开启了DoH")
+ else:
+ text.append("(7)未开启DoH")
+
+ text.append("(8)开启了DoT") # 假设DoT始终开启
+
+ # IPv6 启用情况
+ text.append("(9)未开启了IPv6功能") # 假设IPv6始终开启
+
+ # 返回拼接好的文本
+ return ";".join(text)
+
+
+def parse_query_content(content: dict):
+ """
+ 解析主控端(boss)发送来的content,转换成GPT需要的描述格式
+ """
+ if content['clxz'] == 'ddos':
+ clxz_desc = "拒绝服务攻击,使用ddos脚本,使得目标DNS服务器不能响应正常DNS请求"
+ elif content['clxz'] == 'sjqp':
+ clxz_desc = "数据欺骗攻击,使用inject或poisoning脚本,使得目标DNS服务器响应错误的数据"
+ elif content['clxz'] == 'auto':
+ clxz_desc = "根据渗透节点的特点,选择 拒绝服务攻击 或 数据欺骗攻击"
+ else:
+ logger.error(f"parameter error: clxz is not ddos or sjqp, but {content['clxz']}")
+ return {'error': f'clxz策略选择只能为ddos/sjqp/auto,收到了{content["clxz"]}'}
+ mbgz_desc = mbgz_to_desc(content['mbgz'])
+ try:
+ ztgz_desc = {
+ "response_rate": f"{float(content['ztgz']['record']) * 100}%",
+ "icmp_rr": f"{content['ztgz']['icmp']}ms",
+ "udp_rr": f"{content['ztgz']['dns']}ms",
+ "tcp_rr": f"{content['ztgz']['tcp']}ms",
+ "pps": '200kbps'
+ }
+ except Exception as e:
+ return {'error': 'ztgz中存在参数错误, {e}'}
+
+ script = content['script']
+ legal_script_name = ['DNSSEC DDoS', 'DNSSEC downgrade', 'DNS IPv6 DDoS', 'DNS IPv6 inject', 'DNS IPv6 poisoning',
+ 'DoT DDoS', 'DoH DDoS', 'DoH poisoning', 'DoT poisoning', 'DoH inject', 'DoT inject']
+ if script not in legal_script_name:
+ logger.error(f'script value error: {script}, we need it is one of {legal_script_name}')
+ return {'error': f'script value error: {script}, we need it is one of {legal_script_name}'}
+ params = content['para']
+ # TODO 检查 params
+ if params:
+ pass
+ # TODO session_id/task_id的检查
+ session_id = content['task_id']
+
+ return {
+ 'clxz': clxz_desc,
+ 'mbgz': mbgz_desc,
+ 'ztgz': ztgz_desc,
+ 'script': script,
+ 'params': params,
+ 'session_id': session_id
+ }
+
+
+def parse_gpt_response(gpt_res_txt: dict, query: dict):
+ """
+ gpt_res_txt格式为:{
+ "next_step": "adjust_param"/"adjust_script"/"keep",
+ "next_script": <推荐执行的脚本名称>
+ "params": {
+ "<参数名>": "<推荐的参数值>",
+ ...
+ },
+ "reason": <做出此判断的依据>
+ }
+ 此函数将gpt的返回格式处理成主控端需要的格式
+ """
+ mp = {
+ "DNSSEC DDoS": "DNSSEC ddos",
+ "DNSSEC downgrade": "DNSSEC 降级",
+ "DNS IPv6 DDoS": "V6 DDoS",
+ "DNS IPv6 inject": "V6 数据注入",
+ "DNS IPv6 poisoning": "V6 数据篡改",
+ "DoT DDoS": "DoT DDoS",
+ "DoT poisoning": "DoT 数据篡改",
+ "DoT inject": "DoT 数据注入",
+ "DoH DDoS": "DoH DDoS",
+ "DoH inject": "DoH 数据注入",
+ "DoH poisoning": "DoH 数据篡改",
+ }
+ gpt_res = json.loads(gpt_res_txt)
+ next_step = gpt_res.get("next_step", None)
+ if not next_step or next_step not in ["keep", "adjust_params", "adjust_script"]:
+ return {'error': 'GPT 返回格式错误'}
+ if next_step == "keep":
+ next_script = query.get("script")
+ new_params = query.get("params")
+ elif next_step == "adjust_params":
+ next_script = query.get("script")
+ new_params = gpt_res.get("params", None)
+ elif next_step == "adjust_script":
+ next_script = gpt_res.get("next_script", None)
+ new_params = gpt_res.get("params", None)
+ else:
+ next_script, new_params = None, None
+ # 如果没有给出下一步脚本
+ if not next_script:
+ return {'error': 'GPT 推荐调整脚本,但未给出具体的脚本名称'}
+ # 如果给出了下一步脚本,但是脚本需要的参数未给出
+ if next_script in ['DNSSEC DDoS', 'DNS IPv6 DDoS', 'DoT DDoS', 'DoH DDoS'] and not new_params:
+ return {'error': 'GPT 推荐调整参数,但未给出具体的值'}
+
+ next_script_desc = mp.get(next_script, None)
+ return {
+ 'mode': next_step,
+ 'script': next_script_desc if next_script_desc else next_script,
+ 'parameter': new_params,
+ 'reason': gpt_res.get('reason', 'GPT未给出原因'),
+ 'error': None
+ }
+
+################# 下面是接口部分 #################
+
+"""
+ 接口的标准输入:
+ type: dict/json
+ format: {
+ "clxz": "ddos"/"sjqp",
+ "mbgz": {
+ ... 太多了省略,和以前一样
+ },
+ "ztgz": {
+ "icmp": <vaule>,
+ "dns": <value>,
+ "tcp": <value>,
+ ... 其它无所谓
+ },
+ "script": <脚本名称>,
+ "para": {
+ ... 脚本参数,为空时推荐填入空字典
+ },
+ "task_id": <任务id ,确保唯一即可>
+ }
+
+ 接口标准返回:
+ type: dict/json
+ format: {
+ "mode": "adjust_params"/"adjust_script"/"keep",
+ "script": <推荐执行的脚本名称>
+ "parameter": {
+ "<参数名>": "<推荐的参数值>",
+ ...
+ },
+ "reason": <做出此判断的依据>,
+ "error": None
+ }
+ 接口出错返回:
+ type: dict/json
+ format: {
+ "mode": "keep",
+ "script": <原脚本>
+ "parameter": {
+ "<参数名>": "<原参数值>",
+ ...
+ },
+ "reason": <出错原因>
+ "error": <出错简述,现在只有"parameter error"/"gpt error">
+ }
+"""
[email protected] # 只有没被处理的异常才会被记录
+def initial(content: dict):
+ """
+ 为一个新的需要大模型辅助的task创建session。session主要记录task和此task与大模型对话的历史记录
+ """
+ global sessions_with_boss
+ # 1. 解析传来的请求
+ content_body = parse_query_content(content=content)
+ if 'error' in content_body.keys():
+ return content_body['error']
+ # 2. 确定最终攻击目标
+
+ # 3. 创建session id
+ new_session_id = content_body['session_id']
+
+ # 4. 创建新的session会话
+ new_history = [
+ {
+ "role": "system",
+ "content": system_prompt % (content_body['clxz'], content_body['mbgz'])
+ },
+ ]
+ active_time = datetime.now() # 类似cookie的有效时间,超时此会话可能会被删除
+ sessions_with_boss.update({new_session_id: [new_history, active_time]})
+ logger.info(f"为task: {new_session_id}创建了新的session")
+
+ try:
+ # 4. 与GPT对话(第一次提问)(似乎不需要)
+ # query = {
+ # "script": content_body['script'],
+ # "params": content_body['params'],
+ # "target_status": content_body['ztgz'],
+ # "last_exe_desc": "无"
+ # }
+ # chat(query=json.dumps(query), session_id=new_session_id)
+ return 1
+ except Exception as e:
+ return -1
+
[email protected] # 只有没被处理的异常才会被记录
+def adjusting(content: dict):
+ """
+ 调整策略
+ """
+ global sessions_with_boss
+ # 1. 解析传来的请求
+ content_body = parse_query_content(content=content)
+ if 'error' in content_body.keys():
+ return {
+ "mode": "keep",
+ "script": None,
+ "parameter": None,
+ 'error': 'parameter error',
+ "reason": content['error']
+ }
+ curr_session_id = content_body['session_id']
+ # 2. 与GPT对话
+ query = {
+ "script": content_body['script'],
+ "params": content_body['params'],
+ "target_status": content_body['ztgz'],
+ "last_exe_desc": "无"
+ }
+ try:
+ gpt_response = chat(query=json.dumps(query), session_id=curr_session_id)
+ if gpt_response:
+ # 3. 处理GPT返回的结果
+ res = parse_gpt_response(gpt_res_txt=gpt_response, query=query)
+ if 'error' in res.keys() and res['error']:
+ logger.error(f"请求gpt时出现错误: {res['error']}")
+ return {
+ "mode": "keep",
+ "script": query.get('script'),
+ "parameter": query.get('params', {}),
+ 'error': 'gpt error',
+ "reason": res['error']
+ }
+ else:
+ logger.info(f"当前节点状态为:\n" +
+ f"{content_body['ztgz']} \n" +
+ f"策略调整,script: {query['script']}, params: {query['params']} --> script: {res['script']}, params: {res['parameter']}. reason: {res['reason']}")
+ return res
+ else:
+ logger.error(f"session id: {curr_session_id} not exist, please call initial-api first.")
+ return {
+ "mode": "keep",
+ "script": query.get('script'),
+ "parameter": query.get('params', {}),
+ 'error': 'parameter error',
+ "reason": "调用initial接口传入了一个无效的session_id/task_id, 你应该先调用initial接口"
+ }
+ except Exception as e:
+ logger.error(f"请求gpt时出现错误: {e}")
+ return {
+ "mode": "keep",
+ "script": query.get('script', ''),
+ "parameter": query.get('params', {}),
+ 'error': 'gpt error',
+ "reason": "调用gpt时出现错误,可能时Token消耗完了"
+ }
diff --git a/kimi_strategy.py b/kimi_strategy.py
new file mode 100644
index 0000000..8c5be16
--- /dev/null
+++ b/kimi_strategy.py
@@ -0,0 +1,31 @@
+# 执行流程
+# 1. 接收初始化指令,获得参数包括策略选择<拒绝服务;数据欺骗>;目标感知结果;状态感知结果;初始的攻击脚本及参数 --> 初始化接口调用
+# 1.1 创建会话
+# 1.2 发送第一条信息,介绍任务背景信息(与下面的合并)
+# 1.3 发送第一条信息,格式化上述参数,并发送当前任务介绍
+# 2. 接收询问策略调整命令,获得参数包括状态感知结果 --> 询问策略调整接口
+
+from flask import Flask, request, jsonify
+import kimi_main2
+
+app = Flask(__name__)
+
+
[email protected]('/initial', methods=['POST'])
+def process_initial():
+ data = request.get_json()
+ if kimi_main2.initial(data):
+ return jsonify({"message": "Task processed"}), 200
+ else:
+ return jsonify({"error": "Service is currently unavailable"}), 503
+
+
[email protected]('/adjustment', methods=['POST'])
+def process_adjustment():
+ data = request.get_json()
+ kimi_response = kimi_main2.adjusting(data)
+ return jsonify(kimi_response), 200
+
+
+if __name__ == '__main__':
+ app.run(port=5006) \ No newline at end of file