diff options
| author | shihaoyue <[email protected]> | 2024-10-21 16:53:10 +0800 |
|---|---|---|
| committer | shihaoyue <[email protected]> | 2024-10-21 16:57:50 +0800 |
| commit | 97715cc22c0aea492897d01bdc0441e08dc1e79f (patch) | |
| tree | ef9356b9e6a2227d1f6a9a7fff63d058044eade1 | |
| parent | 6af369b1f4342aba8080fe40c74733cf3ba04f6d (diff) | |
first commit
| -rw-r--r-- | .gitkeep | 0 | ||||
| -rw-r--r-- | __pycache__/kimi_main.cpython-311.pyc | bin | 0 -> 13830 bytes | |||
| -rw-r--r-- | __pycache__/kimi_main2.cpython-311.pyc | bin | 0 -> 14168 bytes | |||
| -rw-r--r-- | content_role_system.txt | 106 | ||||
| -rw-r--r-- | kimi_main.py | 313 | ||||
| -rw-r--r-- | kimi_main2.py | 383 | ||||
| -rw-r--r-- | kimi_strategy.py | 31 |
7 files changed, 833 insertions, 0 deletions
diff --git a/.gitkeep b/.gitkeep new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/.gitkeep diff --git a/__pycache__/kimi_main.cpython-311.pyc b/__pycache__/kimi_main.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..c858a2d --- /dev/null +++ b/__pycache__/kimi_main.cpython-311.pyc diff --git a/__pycache__/kimi_main2.cpython-311.pyc b/__pycache__/kimi_main2.cpython-311.pyc Binary files differnew file mode 100644 index 0000000..b0ee635 --- /dev/null +++ b/__pycache__/kimi_main2.cpython-311.pyc diff --git a/content_role_system.txt b/content_role_system.txt new file mode 100644 index 0000000..1a9c54b --- /dev/null +++ b/content_role_system.txt @@ -0,0 +1,106 @@ +为了响应国家要求对网络基础资源设施的安全能力建设,我需要构建一套面向DNS体系的自动化渗透测试系统,我希望借助你的能力来实现自动化策略调整的部分内容。在这项任务中,你的身份是一位DNS方向的安全技术专家,擅长网络分析和渗透测试及结果分析。我已经实现了11个具有针对性的渗透测试脚本,在最后会给你详细介绍他们的功能。 +你需要根据任务最终目标和目标DNS服务器的当前状态,挑选合适的脚本,可能还要给出对应参数值,我来执行实际的脚本。每次我将告诉你当前针对目标DNS节点执行的脚本信息和目标DNS节点状态,你可以根据这些状态信息来判断渗透脚本是否有效,以及是否需要调整参数等等。 +现在让我们来定义一下策略调整部分的整体宏观思路:1. 尽可能完成渗透测试的最终目的及预期效果;2. 在未达成目的之前确保所有合适的脚本都尝试一遍,若判断当前状态无法完成渗透测试最终目的则主动结束策略调整;3. 尽可能用较少的策略调整步骤实现最终目的;4. 考虑各种脚本及脚本参数设置时的实际影响,不设置明显不合理的参数;5. 按照规定的流程执行,不需要输出多余的响应;6. 按照整体目标描述与脚本介绍中基本功能中的描述,选择合适的拒绝服务或安全性降低两种类型中的一种;7. 若攻击目的是拒绝服务,则你应该通过调整攻击策略让目标节点服务应答率降低。 +总共有两种渗透目标:1. 拒绝服务,使用ddos脚本,使得目标DNS服务器不能响应正常DNS请求;2. 数据欺骗,使用inject或poisoning脚本,使得目标DNS服务器响应错误的数据。 +当前渗透目标节点的特点如下:%s +当前任务目标:%s +每次我会向你发送当前执行的脚本名称,以及目标节点的状态,你告诉我是否需要调整,调整理由。 +我的输入为json格式:{ + "script": <当前执行的脚本名称>, + "params": { + "<当前参数名>": "<当前参数值>", + ... + }, + "target_status": { + "response_rate": <目标节点的服务应答率>, + "icmp_rr": <目标节点的icmp响应时延>, + "udp_rr": <目标节点的udp响应时延>, + "tcp_rr": <目标节点的tcp响应时延>, + "pps": <无实际意义,可以忽略> + }, + "last_exe_desc": <上次执行结果的描述,此处为空时,你需要根据目标节点状态判断> +} +你的返回为json格式:{ + "next_step": "adjust_params"/"adjust_script"/"keep", + "next_script": <推荐执行的脚本名称> + "params": { + "<参数名>": "<推荐的参数值>", + ... + }, + "reason": <做出此判断的依据> +} +最后,我将告诉你目前我已经有的11个可选的渗透测试脚本,如下: +脚本1: +- 名称:DNSSEC DDoS +- 脚本限制:支持DNSSEC功能的DNS解析器可使用此脚本 +- 脚本功能:使得目标节点拒绝服务。通过向目标域名解析服务器发送特定请求,使目标服务器服务质量下降或无法提供服务,形成拒绝服务攻击 +- 脚本参数:{"r": 发送请求速率(默认为300), "n": 发送请求总量(默认为60000)} + +脚本2: +- 名称:DNSSEC downgrade +- 脚本限制:开启DNSSEC功能的DNS解析器可使用此脚本 +- 脚本功能:使目标DNS解析器不对DNSSEC记录进行验证,若目标DNS解析器被降级成功,则可以使用其它DDoS脚本 +- 脚本可调参数:无 + +脚本3: +- 名称:DNS IPv6 DDoS +- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本 +- 脚本功能:构造数据包,发送给目标IPv6 DNS解析器,使其拒绝服务。 +- 脚本可调参数:{"n", 发送数据包总量} + +脚本4: +- 名称:DNS IPv6 inject +- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本 +- 脚本功能:DNS注入,向目标解析器注入一条目标域名的伪造AAAA记录 +- 脚本可调参数:无 +- 执行后状态集合:{注入成功、注入失败、 脚本执行中} + +脚本5: +- 名称:DNS IPv6 poisoning +- 脚本限制:开启IPv6地址的DNS解析器可使用此脚本 +- 脚本功能:DNSv6篡改,将目标解析器中目标域名的AAAA记录篡改成伪造的地址 +- 脚本可调参数:无 +- 执行后状态集合:{篡改成功、篡改失败、脚本执行中} + +脚本6: +- 名称:DoT DDoS +- 脚本限制:开启DoT功能的DNS解析器可使用此脚本 +- 脚本功能:使得目标DoT解析器拒绝服务,若脚本执行成功则目标响应率应该很低且响应时延很高 +- 脚本可调参数:{"n": 使用进程数(默认为4), "r": 每个进程发送请求次数(默认为5),"w": 脉冲等待时长(默认为300秒)} +- 执行后状态集合:{实时目标状态感知结果、脚本执行中} + +脚本7: +- 名称:DoH DDoS +- 脚本限制:开启DoH功能的DNS解析器可使用此脚本 +- 脚本功能:使得目标DoH解析器拒绝服务(利用HTTP2快速重置),若脚本执行成功则目标响应率应该很低且响应时延很高 +- 脚本可调参数:{"n": 使用进程数(默认为4)} +- 执行后状态集合:{实时目标状态感知结果、脚本执行中} + +脚本8: +- 名称:DoH poisoning +- 脚本限制:开启DoH功能的DNS解析器可使用此脚本 +- 脚本功能:(安全性降低)使目标DoH解析器的响应结果被篡改 +- 脚本可调参数:无 +- 执行后状态集合:{篡改成功、篡改失败、脚本执行中} + +脚本9: +- 名称:DoT poisoning +- 脚本限制:开启DoT功能的DNS解析器可使用此脚本 +- 脚本功能:(安全性降低)使目标DoT解析器的响应结果被篡改 +- 脚本可调参数:无 +- 执行后状态集合:{篡改成功、篡改失败、脚本执行中} + +脚本10: +- 名称:DoH inject +- 脚本限制:开启DoH功能的DNS解析器可使用此脚本 +- 脚本功能:(安全性降低)向目标解析器注入一条目标域名的伪造记录 +- 脚本可调参数:无 +- 执行后状态集合:{注入成功、注入失败、脚本执行中、脚本执行失败} + +脚本11: +- 名称:DoT inject +- 脚本限制:开启DoT功能的DNS解析器可使用此脚本 +- 脚本功能:(安全性降低)向目标解析器注入一条目标域名的伪造记录 +- 脚本可调参数:无 +- 执行后状态集合:{注入成功、注入失败、脚本执行中、脚本执行失败} + diff --git a/kimi_main.py b/kimi_main.py new file mode 100644 index 0000000..3fef6d8 --- /dev/null +++ b/kimi_main.py @@ -0,0 +1,313 @@ +from openai import OpenAI + +def convert_json_to_text(data): + text = [] + + # 判断 DNSSEC 启用情况 + if data.get("dnssec_enabled", False): + text.append("(2)开启DNSSEC") + else: + text.append("(2)未开启DNSSEC") + + # 是否使用 0x20 编码(域名随机化大小写) + if data.get("0x20_encoding", False): + text.append("(3)不可以使用缓存投毒") + else: + text.append("(3)可以使用缓存投毒") + + # 判断是否支持DDoS + if data.get("merge_dup", False): + text.append("(4)不适用DDoS") + else: + text.append("(4)可使用DDoS") + + # 判断是否适用 DNSSEC_DDoS + if data.get("dnssec_enabled", False) and not data.get("random_port", False): + text.append("(5)不适用DNSSEC_DDoS") + else: + text.append("(5)适用DNSSEC_DDoS") + + # 判断是否适用放大攻击 + if data.get("max_ns_cnt", 0) > 4: + text.append("(6)适用放大攻击") + else: + text.append("(6)不适用放大攻击") + + # 是否开启 DoH 和 DoT + if data.get("edns_support", False): + text.append("(7)开启了DoH") + else: + text.append("(7)未开启DoH") + + text.append("(8)开启了DoT") # 假设DoT始终开启 + + # IPv6 启用情况 + text.append("(9)开启了IPv6功能") # 假设IPv6始终开启 + + # 返回拼接好的文本 + return ";".join(text) + + +client = OpenAI( + api_key="sk-wFEcyFj8sS2mbncCEqYK5uxYa6VyQyJrQ8reKPFYzxQcwt3i", + base_url="https://api.moonshot.cn/v1", +) + +# 加载背景介绍信息 +with open('./content_role_system.txt', 'r', encoding='utf-8') as f: + content_system = f.read() + +output_constraints = "请按下面规定的格式输出策略调整结果,从下面的集合中选择一种决策输出(不需要其他任何文字反馈,我需要处理格式化输出的内容):{保持当前策略执行、调整脚本参数:参数代称 原始的数值->新的数值、更换攻击脚本:脚本名称}" + +mode_mapping = {"保持当前策略执行": "keep", + "调整脚本参数": "parameter", + "更换攻击脚本": "script"} + +history = [ + {"role": "system", + "content": content_system} +] + + +def chat(query, history): + history.append({ + "role": "user", + "content": query + }) + completion = client.chat.completions.create( + model="moonshot-v1-32k", + messages=history, + temperature=0.3, + ) + result = completion.choices[0].message.content + history.append({ + "role": "assistant", + "content": result + }) + return result + + +def initial(content): + global history + history = [ + {"role": "system", + "content": content_system} + ] + + clxz_map = { + "ddos": "拒绝服务", + "sjqp": "数据欺骗" + } + + try: + clxz = clxz_map.get(content["clxz"], None) # 策略选择 + # _mbgz = # 目标感知 + mbgz = convert_json_to_text(content["mbgz"]) + ztgz = content["ztgz"] # 状态感知 + _script = content["script"] # 脚本 + _param = content["para"] # 传来的脚本参数是字典形式的 + param = "" # 将参数转换成字符串 + for p, v in _param.items(): + if param == "": + param += f"{p}-{v}" + else: + param += f"-{p}-{v}" + script = f"{_script.replace(' ', '_')};{_script.replace(' ', '_').lower()}_{param}" + except Exception as e: + raise Exception('post上传的数据key解析失败', e) + + # 构建待上传ai的策略初始化信息 + assert clxz in ["拒绝服务", "数据欺骗"], "策略选择只能是'拒绝服务'或者'数据欺骗'" + clxz_context = f"1. 测试的最终目的是:{clxz}" + + mbgz_context = f"2. 当前目标节点的18维不变特性是:{mbgz}" + + script = script.replace(';', ';').split(';') + script_context = f"3. 当前执行的脚本名称为{script[0]};" + ("脚本参数选择为" + ";".join(script[1:]) if len(script) > 1 else "") + + ztgz_context = f"4. 当前(未执行脚本前)目标节点的状态参数为:DNS服务应答率{ztgz['record']*100}%;icmp时延{ztgz['icmp']}ms;udp时延{ztgz['dns']}ms;tcp时延{ztgz['tcp']}ms;pps为200kpps" + + context = '\n'.join([clxz_context, mbgz_context, script_context, ztgz_context]) + + # 初始化交互不需要输出响应 + chat(context, history) + return 1 + + +def adjust_to_dict(adjust: str): + """ + 将大模型返回的script字符串转换成字典,如 doh_ddos-n-100-200-abc-1000-f, 将被转换为 + { + "script": "doh_ddos", (此处doh_ddos也可能被映射成其它的,具体跟mp中的映射关系有关) + "parameter": { + "n": ["100", "200"], + "abc": ["1000"], + "f": [] + } + } + """ + print(adjust) + if adjust == "none": + return { + "script": "none", + "parameter": { + }, + } + # 这个字典负责将大模型返回的策略名称映射到规定的名称 + mp = { + "v6_数据篡改": "V6 数据篡改", + "<v6_数据篡改>": "V6 数据篡改", + "v6_poisoning": "V6 数据篡改", + "<v6_poisoning>": "V6 数据篡改", + "dnssec_ddos": "DNSSEC ddos", + "<dnssec_ddos>": "DNSSEC ddos", + "DNSSEC_DDoS": "DNSSEC ddos", + "<DNSSEC_DDoS>": "DNSSEC ddos", + "dnssec_降级": "DNSSEC 降级", + "<dnssec_降级>": "DNSSEC 降级", + "DNSSEC_downgrade": "DNSSEC 降级", + "<DNSSEC_downgrade>": "DNSSEC 降级", + "dnssec_downgrade": "DNSSEC 降级", + "<dnssec_downgrade>": "DNSSEC 降级", + "DNS_IPv6_DDoS": "V6 DDoS", + "<DNS_IPv6_DDoS>": "V6 DDoS", + "ipv6_ddos": "V6 DDoS", + "<ipv6_ddos>": "V6 DDoS", + "v6_ddos": "V6 DDoS", + "<v6_ddos>": "V6 DDoS", + "DNS_IPv6_inject": "V6 数据篡改", + "<DNS_IPv6_inject>": "V6 数据篡改", + "v6_数据篡改": "V6 数据篡改", + "<v6_数据篡改>": "V6 数据篡改", + "dns_ipv6_inject": "V6 数据篡改", + "<dns_ipv6_inject>": "V6 数据篡改", + "DNS_IPv6_poisoning": "V6 数据篡改", + "<DNS_IPv6_poisoning>": "V6 数据篡改", + "dot_ddos": "DoT DDoS", + "<dot_ddos>": "DoT DDoS", + "DoT_DDoS": "DoT DDoS", + "<DoT_DDoS>": "DoT DDoS", + "dot_poisoning": "DoT 数据篡改", + "<dot_poisoning>": "DoT 数据篡改", + "DoT_poisoning": "DoT 数据篡改", + "<DoT_poisoning>": "DoT 数据篡改", + "dot_数据篡改": "DoT 数据篡改", + "<dot_数据篡改>": "DoT 数据篡改", + "dot_数据注入": "DoT 数据注入", + "<dot_数据注入>": "DoT 数据注入", + "dot_inject": "DoT 数据注入", + "<dot_inject>": "DoT 数据注入", + "DoT_inject": "DoT 数据注入", + "<DoT_inject>": "DoT 数据注入", + "DoH_DDoS": "DoH DDoS", + "<DoH_DDoS>": "DoH DDoS", + "doh_ddos": "DoH DDoS", + "<doh_ddos>": "DoH DDoS", + "doh_数据注入": "DoH 数据注入", + "<doh_数据注入>": "DoH 数据注入", + "doh_inject": "DoH 数据注入", + "<doh_inject>": "DoH 数据注入", + "DoH_inject": "DoH 数据注入", + "<DoH_inject>": "DoH 数据注入", + "doh_数据篡改": "DoH 数据篡改", + "<doh_数据篡改>": "DoH 数据篡改", + "doh_poisoning": "DoH 数据篡改", + "<doh_poisoning>": "DoH 数据篡改", + "DoH_poisoning": "DoH 数据篡改", + "<DoH_poisoning>": "DoH 数据篡改" + } + adjust = adjust.replace("<", "") + adjust = adjust.replace(">", "") + index = 0 + for i in range(len(adjust)): + if adjust[i] == '_': + index = i + if adjust[i] == '-': + index = i + break + if '-' not in adjust: + script = adjust + param_list = [] + else: + script = adjust[:index] + param_list = adjust[index+1:].split('-') + + res_dict = {} + left, right = 0, 0 + def is_parma_value(s): + """ + 这个函数只会在下面的while中使用,作用是判断一个字符串,是数值串还是字符串 + """ + try: + int(s) + except: + return False + return True + while len(param_list): + right += 1 + while right < len(param_list) and is_parma_value(param_list[right]): + right += 1 + curr_param = param_list[left] + res_dict[curr_param] = [] + left += 1 + while left < right: + res_dict[curr_param].append(int(param_list[left])) + left += 1 + # 将数组处理成一个数值,如果脚本参数存在一参对多值的情况如 -n 1 2 3 4, 则删除此部分 + if len(res_dict[curr_param]): + res_dict[curr_param] = sum(res_dict[curr_param])//len(res_dict[curr_param]) + else: + res_dict[curr_param] = None + if left >= len(param_list): + break + + try: + return { + "script": mp[script], + "parameter": res_dict + } + except: + return { + "script": script, + "parameter": res_dict + } + # raise f"在将大模型返回的脚本字符串转换成字典时出现错误,原因是 mp 中并没有键: {script}" + + +def adjusting(content): + try: + ztgz = content["ztgz"] # 状态感知 + except Exception as e: + raise Exception("post上传的数据key解析失败", e) + + context = f"当前目标节点的状态感知结果为:DNS服务应答率{ztgz['record']*100}%;icmp时延{ztgz['icmp']}ms;udp时延{ztgz['dns']}ms;tcp时延{ztgz['tcp']}ms;pps为200kpps\n{output_constraints}" + + response = chat(context, history) + + mode = response.replace(":", ":").split(":")[0] + assert mode in ["保持当前策略执行", "调整脚本参数", "更换攻击脚本"], f"响应出现非标情况,当前模式响应为{mode}" + mode_context = mode_mapping[mode] + + try: + if mode_context == "keep": + adjust_context = "none" + elif mode_context == "parameter": + # 例如:模型响应:调整脚本参数:doh_ddos_n 4->8 输出adjust参数为doh_ddos_n-8 + para = response.replace(":", ":").split(":")[1] + adjust_context = "-".join([para.split(" ")[0], para.split(">")[-1]]) + else: + # 例如:模型响应:更换攻击脚本:DoT_DDoS 输出adjust参数为DoT_DDoS + adjust_context = response.replace(":", ":").split(":")[1] + except Exception as e: + raise Exception("处理模型规范化输出错误", e) + + # # debug + # print(history) + res = adjust_to_dict(adjust_context) + res.update({"mode": mode_context}) + + return res + + +if __name__ == "__main__": + print(adjust_to_dict('dddd')) diff --git a/kimi_main2.py b/kimi_main2.py new file mode 100644 index 0000000..cf97307 --- /dev/null +++ b/kimi_main2.py @@ -0,0 +1,383 @@ +import json +from datetime import datetime +from openai import OpenAI +from loguru import logger + + +# logger初始化 +# 获取当前日期,并用作日志文件名的一部分 +log_filename = f"{datetime.now().strftime('%Y-%m-%d')}_run.log" + +# 添加日志记录,设置日志文件的旋转时间、日志格式以及日志级别 +logger.add(log_filename, rotation="1 week", format="{time:YYYY-MM-DD HH:mm:ss} - {level} - {message}", level="INFO") + +client = OpenAI( + api_key="sk-wFEcyFj8sS2mbncCEqYK5uxYa6VyQyJrQ8reKPFYzxQcwt3i", + base_url="https://api.moonshot.cn/v1", +) +sessions_with_boss : dict[str,list]={} # 每个session负责记录每个task与gpt交互的历史记录 {session_id/task_id: history} + +# 加载背景介绍信息(提示词) +try: + with open('./strategy_adjust_code/content_role_system.txt', 'r', encoding='utf-8') as f: + system_prompt = f.read() +except: + logger.error("系统提示词加载失败,请检查提示词文件路径") + exit(0) + +def chat(query: str, session_id: list): + global sessions_with_boss + # 检验 session 是否存在 + if session_id not in sessions_with_boss.keys(): + # 在别已经有log记录了 + return None + + # 获得历史记录,处理最多保留10次会话内容 + history = keep_len_history(history=sessions_with_boss[session_id][0], length=10) + history.append({ + "role": "user", + "content": query + }) + # 将提示词(历史记录)发送给GPT,询问结果 + completion = client.chat.completions.create( + model="moonshot-v1-32k", + messages=history, + response_format={'type': 'json_object'}, # 使用此值需要在提示词中给出json格式 + # temperature=0.3, # 回答的随机程度0-2,越低越确定(不随机) + ) + # GPT的响应结果,可能有多个,默认使用第一个 choices[0] + result = completion.choices[0].message.content + # 更新到每个session的历史记录中, 并更新会话存活时间 + sessions_with_boss[session_id][0].append({ + "role": "user", + "content": query + }) + sessions_with_boss[session_id][0].append({ + "role": "assistant", + "content": result + }) + sessions_with_boss[session_id][1] = datetime.now() + + return result + + +def keep_len_history(history: list[dict], length: int=10): + """ + 保留length长度的历史记录 + param length: 需要保留的历史记录的长度,一问一答为一个长度,为0则保留所有 + """ + res_history = list() + # 保留系统提示词 + for item in history: + if item['role'] == 'system': + # ?注意,这里是否会出现深浅拷贝的问题呢,查一下? + res_history.append(item) + # 切片,保留后length的聊天记录 + res_history.extend(history[-2*min(length, len(history)):]) + return res_history + + +def mbgz_to_desc(data): + """ + 将目标节点的特点,转换成文本描述,initial()会使用此函数,其它地方未使用 + """ + text = [] + + # 判断 DNSSEC 启用情况 + if data.get("dnssec_enabled", False): + text.append("(2)开启DNSSEC") + else: + text.append("(2)未开启DNSSEC") + + # 是否使用 0x20 编码(域名随机化大小写) + if data.get("0x20_encoding", False): + text.append("(3)不可以使用缓存投毒") + else: + text.append("(3)可以使用缓存投毒") + + # 判断是否支持DDoS + if data.get("merge_dup", False): + text.append("(4)不适用DDoS") + else: + text.append("(4)可使用DDoS") + + # 判断是否适用 DNSSEC_DDoS + if data.get("dnssec_enabled", False) and not data.get("random_port", False): + text.append("(5)不适用DNSSEC_DDoS") + else: + text.append("(5)适用DNSSEC_DDoS") + + # 判断是否适用放大攻击 + if data.get("max_ns_cnt", 0) > 4: + text.append("(6)适用放大攻击") + else: + text.append("(6)不适用放大攻击") + + # 是否开启 DoH 和 DoT + if data.get("edns_support", False): + text.append("(7)开启了DoH") + else: + text.append("(7)未开启DoH") + + text.append("(8)开启了DoT") # 假设DoT始终开启 + + # IPv6 启用情况 + text.append("(9)未开启了IPv6功能") # 假设IPv6始终开启 + + # 返回拼接好的文本 + return ";".join(text) + + +def parse_query_content(content: dict): + """ + 解析主控端(boss)发送来的content,转换成GPT需要的描述格式 + """ + if content['clxz'] == 'ddos': + clxz_desc = "拒绝服务攻击,使用ddos脚本,使得目标DNS服务器不能响应正常DNS请求" + elif content['clxz'] == 'sjqp': + clxz_desc = "数据欺骗攻击,使用inject或poisoning脚本,使得目标DNS服务器响应错误的数据" + elif content['clxz'] == 'auto': + clxz_desc = "根据渗透节点的特点,选择 拒绝服务攻击 或 数据欺骗攻击" + else: + logger.error(f"parameter error: clxz is not ddos or sjqp, but {content['clxz']}") + return {'error': f'clxz策略选择只能为ddos/sjqp/auto,收到了{content["clxz"]}'} + mbgz_desc = mbgz_to_desc(content['mbgz']) + try: + ztgz_desc = { + "response_rate": f"{float(content['ztgz']['record']) * 100}%", + "icmp_rr": f"{content['ztgz']['icmp']}ms", + "udp_rr": f"{content['ztgz']['dns']}ms", + "tcp_rr": f"{content['ztgz']['tcp']}ms", + "pps": '200kbps' + } + except Exception as e: + return {'error': 'ztgz中存在参数错误, {e}'} + + script = content['script'] + legal_script_name = ['DNSSEC DDoS', 'DNSSEC downgrade', 'DNS IPv6 DDoS', 'DNS IPv6 inject', 'DNS IPv6 poisoning', + 'DoT DDoS', 'DoH DDoS', 'DoH poisoning', 'DoT poisoning', 'DoH inject', 'DoT inject'] + if script not in legal_script_name: + logger.error(f'script value error: {script}, we need it is one of {legal_script_name}') + return {'error': f'script value error: {script}, we need it is one of {legal_script_name}'} + params = content['para'] + # TODO 检查 params + if params: + pass + # TODO session_id/task_id的检查 + session_id = content['task_id'] + + return { + 'clxz': clxz_desc, + 'mbgz': mbgz_desc, + 'ztgz': ztgz_desc, + 'script': script, + 'params': params, + 'session_id': session_id + } + + +def parse_gpt_response(gpt_res_txt: dict, query: dict): + """ + gpt_res_txt格式为:{ + "next_step": "adjust_param"/"adjust_script"/"keep", + "next_script": <推荐执行的脚本名称> + "params": { + "<参数名>": "<推荐的参数值>", + ... + }, + "reason": <做出此判断的依据> + } + 此函数将gpt的返回格式处理成主控端需要的格式 + """ + mp = { + "DNSSEC DDoS": "DNSSEC ddos", + "DNSSEC downgrade": "DNSSEC 降级", + "DNS IPv6 DDoS": "V6 DDoS", + "DNS IPv6 inject": "V6 数据注入", + "DNS IPv6 poisoning": "V6 数据篡改", + "DoT DDoS": "DoT DDoS", + "DoT poisoning": "DoT 数据篡改", + "DoT inject": "DoT 数据注入", + "DoH DDoS": "DoH DDoS", + "DoH inject": "DoH 数据注入", + "DoH poisoning": "DoH 数据篡改", + } + gpt_res = json.loads(gpt_res_txt) + next_step = gpt_res.get("next_step", None) + if not next_step or next_step not in ["keep", "adjust_params", "adjust_script"]: + return {'error': 'GPT 返回格式错误'} + if next_step == "keep": + next_script = query.get("script") + new_params = query.get("params") + elif next_step == "adjust_params": + next_script = query.get("script") + new_params = gpt_res.get("params", None) + elif next_step == "adjust_script": + next_script = gpt_res.get("next_script", None) + new_params = gpt_res.get("params", None) + else: + next_script, new_params = None, None + # 如果没有给出下一步脚本 + if not next_script: + return {'error': 'GPT 推荐调整脚本,但未给出具体的脚本名称'} + # 如果给出了下一步脚本,但是脚本需要的参数未给出 + if next_script in ['DNSSEC DDoS', 'DNS IPv6 DDoS', 'DoT DDoS', 'DoH DDoS'] and not new_params: + return {'error': 'GPT 推荐调整参数,但未给出具体的值'} + + next_script_desc = mp.get(next_script, None) + return { + 'mode': next_step, + 'script': next_script_desc if next_script_desc else next_script, + 'parameter': new_params, + 'reason': gpt_res.get('reason', 'GPT未给出原因'), + 'error': None + } + +################# 下面是接口部分 ################# + +""" + 接口的标准输入: + type: dict/json + format: { + "clxz": "ddos"/"sjqp", + "mbgz": { + ... 太多了省略,和以前一样 + }, + "ztgz": { + "icmp": <vaule>, + "dns": <value>, + "tcp": <value>, + ... 其它无所谓 + }, + "script": <脚本名称>, + "para": { + ... 脚本参数,为空时推荐填入空字典 + }, + "task_id": <任务id ,确保唯一即可> + } + + 接口标准返回: + type: dict/json + format: { + "mode": "adjust_params"/"adjust_script"/"keep", + "script": <推荐执行的脚本名称> + "parameter": { + "<参数名>": "<推荐的参数值>", + ... + }, + "reason": <做出此判断的依据>, + "error": None + } + 接口出错返回: + type: dict/json + format: { + "mode": "keep", + "script": <原脚本> + "parameter": { + "<参数名>": "<原参数值>", + ... + }, + "reason": <出错原因> + "error": <出错简述,现在只有"parameter error"/"gpt error"> + } +""" [email protected] # 只有没被处理的异常才会被记录 +def initial(content: dict): + """ + 为一个新的需要大模型辅助的task创建session。session主要记录task和此task与大模型对话的历史记录 + """ + global sessions_with_boss + # 1. 解析传来的请求 + content_body = parse_query_content(content=content) + if 'error' in content_body.keys(): + return content_body['error'] + # 2. 确定最终攻击目标 + + # 3. 创建session id + new_session_id = content_body['session_id'] + + # 4. 创建新的session会话 + new_history = [ + { + "role": "system", + "content": system_prompt % (content_body['clxz'], content_body['mbgz']) + }, + ] + active_time = datetime.now() # 类似cookie的有效时间,超时此会话可能会被删除 + sessions_with_boss.update({new_session_id: [new_history, active_time]}) + logger.info(f"为task: {new_session_id}创建了新的session") + + try: + # 4. 与GPT对话(第一次提问)(似乎不需要) + # query = { + # "script": content_body['script'], + # "params": content_body['params'], + # "target_status": content_body['ztgz'], + # "last_exe_desc": "无" + # } + # chat(query=json.dumps(query), session_id=new_session_id) + return 1 + except Exception as e: + return -1 + [email protected] # 只有没被处理的异常才会被记录 +def adjusting(content: dict): + """ + 调整策略 + """ + global sessions_with_boss + # 1. 解析传来的请求 + content_body = parse_query_content(content=content) + if 'error' in content_body.keys(): + return { + "mode": "keep", + "script": None, + "parameter": None, + 'error': 'parameter error', + "reason": content['error'] + } + curr_session_id = content_body['session_id'] + # 2. 与GPT对话 + query = { + "script": content_body['script'], + "params": content_body['params'], + "target_status": content_body['ztgz'], + "last_exe_desc": "无" + } + try: + gpt_response = chat(query=json.dumps(query), session_id=curr_session_id) + if gpt_response: + # 3. 处理GPT返回的结果 + res = parse_gpt_response(gpt_res_txt=gpt_response, query=query) + if 'error' in res.keys() and res['error']: + logger.error(f"请求gpt时出现错误: {res['error']}") + return { + "mode": "keep", + "script": query.get('script'), + "parameter": query.get('params', {}), + 'error': 'gpt error', + "reason": res['error'] + } + else: + logger.info(f"当前节点状态为:\n" + + f"{content_body['ztgz']} \n" + + f"策略调整,script: {query['script']}, params: {query['params']} --> script: {res['script']}, params: {res['parameter']}. reason: {res['reason']}") + return res + else: + logger.error(f"session id: {curr_session_id} not exist, please call initial-api first.") + return { + "mode": "keep", + "script": query.get('script'), + "parameter": query.get('params', {}), + 'error': 'parameter error', + "reason": "调用initial接口传入了一个无效的session_id/task_id, 你应该先调用initial接口" + } + except Exception as e: + logger.error(f"请求gpt时出现错误: {e}") + return { + "mode": "keep", + "script": query.get('script', ''), + "parameter": query.get('params', {}), + 'error': 'gpt error', + "reason": "调用gpt时出现错误,可能时Token消耗完了" + } diff --git a/kimi_strategy.py b/kimi_strategy.py new file mode 100644 index 0000000..8c5be16 --- /dev/null +++ b/kimi_strategy.py @@ -0,0 +1,31 @@ +# 执行流程 +# 1. 接收初始化指令,获得参数包括策略选择<拒绝服务;数据欺骗>;目标感知结果;状态感知结果;初始的攻击脚本及参数 --> 初始化接口调用 +# 1.1 创建会话 +# 1.2 发送第一条信息,介绍任务背景信息(与下面的合并) +# 1.3 发送第一条信息,格式化上述参数,并发送当前任务介绍 +# 2. 接收询问策略调整命令,获得参数包括状态感知结果 --> 询问策略调整接口 + +from flask import Flask, request, jsonify +import kimi_main2 + +app = Flask(__name__) + + [email protected]('/initial', methods=['POST']) +def process_initial(): + data = request.get_json() + if kimi_main2.initial(data): + return jsonify({"message": "Task processed"}), 200 + else: + return jsonify({"error": "Service is currently unavailable"}), 503 + + [email protected]('/adjustment', methods=['POST']) +def process_adjustment(): + data = request.get_json() + kimi_response = kimi_main2.adjusting(data) + return jsonify(kimi_response), 200 + + +if __name__ == '__main__': + app.run(port=5006)
\ No newline at end of file |
