summaryrefslogtreecommitdiff
path: root/strategy_adjust_code/kimi_main.py
blob: 87d0282750a800ff811832867293d7bc8680753f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from openai import OpenAI

client = OpenAI(
    api_key="sk-wFEcyFj8sS2mbncCEqYK5uxYa6VyQyJrQ8reKPFYzxQcwt3i",
    base_url="https://api.moonshot.cn/v1",
)

# 加载背景介绍信息
with open('./content_role_system.txt', 'r', encoding='utf-8') as f:
    content_system = f.read()

output_constraints = "请按下面规定的格式输出策略调整结果,从下面的集合中选择一种决策输出(不需要其他任何文字反馈,我需要处理格式化输出的内容):{保持当前策略执行、调整脚本参数:参数代称 原始的数值->新的数值、更换攻击脚本:脚本名称}"

mode_mapping = {"保持当前策略执行": "keep",
                "调整脚本参数": "parameter",
                "更换攻击脚本": "script"}

history = [
    {"role": "system",
     "content": content_system}
]


def chat(query, history):
    history.append({
        "role": "user",
        "content": query
    })
    completion = client.chat.completions.create(
        model="moonshot-v1-32k",
        messages=history,
        temperature=0.3,
    )
    result = completion.choices[0].message.content
    history.append({
        "role": "assistant",
        "content": result
    })
    return result


def initial(content):
    global history
    history = [
        {"role": "system",
         "content": content_system}
    ]

    try:
        clxz = content["clxz"]      # 策略选择
        mbgz = content["mbgz"]      # 目标感知
        ztgz = content["ztgz"]      # 状态感知
        script = content["script"]  # 脚本及参数
    except Exception as e:
        raise Exception('post上传的数据key解析失败', e)

    # 构建待上传ai的策略初始化信息
    assert clxz in ["拒绝服务", "数据欺骗"], "策略选择只能是'拒绝服务'或者'数据欺骗'"
    clxz_context = f"1. 测试的最终目的是:{clxz}"

    mbgz_context = f"2. 当前目标节点的18维不变特性是:{mbgz}"

    script = script.replace(';', ';').split(';')
    script_context = f"3. 当前执行的脚本名称为{script[0]};" + ("脚本参数选择为" + ";".join(script[1:]) if len(script) > 1 else "")

    ztgz_context = f"4. 当前(未执行脚本前)目标节点的状态参数为:{ztgz}"

    context = '\n'.join([clxz_context, mbgz_context, script_context, ztgz_context])

    # 初始化交互不需要输出响应
    chat(context, history)
    return 1


def adjusting(content):
    try:
        ztgz = content["ztgz"]      # 状态感知
    except Exception as e:
        raise Exception("post上传的数据key解析失败", e)

    context = f"当前目标节点的状态感知结果为:{ztgz}\n{output_constraints}"

    response = chat(context, history)

    mode = response.replace(":", ":").split(":")[0]
    assert mode in ["保持当前策略执行", "调整脚本参数", "更换攻击脚本"], f"响应出现非标情况,当前模式响应为{mode}"
    mode_context = mode_mapping[mode]

    try:
        if mode_context == "keep":
            adjust_context = "none"
        elif mode_context == "parameter":
            # 例如:模型响应:调整脚本参数:doh_ddos_n 4->8  输出adjust参数为doh_ddos_n-8
            para = response.replace(":", ":").split(":")[1]
            adjust_context = "-".join([para.split(" ")[0], para.split(">")[-1]])
        else:
            # 例如:模型响应:更换攻击脚本:DoT_DDoS   输出adjust参数为DoT_DDoS
            adjust_context = response.replace(":", ":").split(":")[1]
    except Exception as e:
        raise Exception("处理模型规范化输出错误", e)

    # # debug
    # print(history)

    return {"mode": mode_context, "adjust": adjust_context}