summaryrefslogtreecommitdiff
path: root/utils.py
blob: 59202da9aadda59e3cce762f83bdb84b349ddec7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
import os
import numpy
import torch.nn as nn
import torch
import torch.utils.data as tud
import preprocess
import evaluation
from loguru import logger
import time
from tqdm import tqdm


def create_dataloader(dataset_name: str, input_size: int = 1, output_size: int = 1, step: int = 1, batch_size: int = 1,
                      time_index: bool = True, del_column_name: bool = True,
                      preprocess_name: str = "standardization") -> (tud.DataLoader, tud.DataLoader):
    """
    针对一个数据集,构建Dataloader
    :param dataset_name: 数据集名称
    :param input_size: 输入数据长度
    :param output_size: 输出数据长度
    :param step: 截取数据的窗口移动间隔
    :param batch_size: batch的大小
    :param time_index: True为第一列是时间戳,False为不
    :param del_column_name: 文件中第一行为列名时,使用True
    :param preprocess_name: 预处理方法
    :return: 训练数据与测试数据
    """
    ds = eval(f"preprocess.{preprocess_name}.MyDataset")(name=dataset_name.replace("/", "-"),
                                                         train_path=f"./dataset/{dataset_name}/train.csv",
                                                         test_path=f"./dataset/{dataset_name}/test.csv",
                                                         input_size=input_size, output_size=output_size,
                                                         step=step, time_index=time_index,
                                                         del_column_name=del_column_name)
    normal_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=True)
    ds.mode = "test"
    attack_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=False)
    return normal_dl, attack_dl


def create_all_dataloader(datasets: [str], input_size: int = 1, output_size: int = 1, step: int = 1,
                          batch_size: int = 1, time_index: bool = True, del_column_name: bool = True,
                          preprocess_name: str = "standardization") -> [{}]:
    """
    对所有数据集构建dataloader
    :param datasets: 数据集列表
    :param input_size: 输入数据长度
    :param output_size: 输出数据长度
    :param step: 截取数据的窗口移动间隔
    :param batch_size: batch的大小
    :param time_index: True为第一列是时间戳,False为不。
    :param del_column_name: 文件中第一行为列名时,使用True
    :param preprocess_name: 预处理方法
    :return: 所有数据集的dataloader构建结果
    """
    all_dataloader = []
    for dataset_name in datasets:
        logger.info(f'开始建立 dataloader {dataset_name}')
        if "train.csv" in os.listdir(f"./dataset/{dataset_name}"):
            normal_dl, attack_dl = create_dataloader(dataset_name=dataset_name, input_size=input_size,
                                                     output_size=output_size, step=step, batch_size=batch_size,
                                                     time_index=time_index, del_column_name=del_column_name,
                                                     preprocess_name=preprocess_name)
            all_dataloader.append([{
                'dataset_name': dataset_name,
                'normal': normal_dl,
                'attack': attack_dl
            }])
        else:
            all_sub_dataloader = []
            for sub_dataset_dir in os.listdir(f"./dataset/{dataset_name}"):
                sub_dataset_name = f"{dataset_name}/{sub_dataset_dir}"
                normal_dl, attack_dl = create_dataloader(dataset_name=sub_dataset_name, input_size=input_size,
                                                         output_size=output_size, step=step, batch_size=batch_size,
                                                         del_time=del_time, del_column_name=del_column_name)
                all_sub_dataloader.append({
                    'dataset_name': sub_dataset_name.replace("/", "-"),
                    'normal': normal_dl,
                    'attack': attack_dl
                })
            all_dataloader.append(all_sub_dataloader)

        logger.info(f'完成建立 dataloader {dataset_name}')
    return all_dataloader


class EvaluationScore:
    def __init__(self, evaluations: [str], attack=1):
        """
        用于自动划分阈值并进行批量评估
        :param evaluations: 使用的评估方法名称(需在evaluation文件夹中进行定义)
        :param attack: 异常的标签,0 or 1
        """
        self.time = 0
        self.f1 = 0
        self.f1_pa = 0
        self.f_tad = 0
        self.attack = attack
        self.normal = 1 - attack
        self._total_y_loss = None
        self._total_label = None
        self._total_pred_label = None
        self.true_pred_df = None
        self.true_pred_dict = None
        self.evaluations = evaluations
        self.scores = {}

    def add_data(self, y_loss, true_label, pred_label=None):
        """
        添加每个batch的数据
        :param y_loss: 数据偏差
        :param true_label: 真实数据标签
        :param pred_label: 预测标签
        """
        if pred_label is not None:
            if self._total_label is None and self._total_pred_label is None:
                self._total_label = true_label
                self._total_pred_label = pred_label
            else:
                self._total_label = torch.cat([self._total_label, true_label], dim=0)
                self._total_pred_label = torch.cat([self._total_pred_label, pred_label], dim=0)
            return

        y_loss = y_loss.view(-1).cpu().detach().numpy()
        true_label = true_label.view(-1).cpu().detach().numpy()

        if self._total_y_loss is None and self._total_label is None:
            self._total_y_loss = y_loss
            self._total_label = true_label
            return
        self._total_y_loss = numpy.concatenate((self._total_y_loss, y_loss), axis=0)
        self._total_label = numpy.concatenate((self._total_label, true_label), axis=0)

    def best_threshold(self, true_label: list, y_loss: list) -> dict:
        ret = {}
        for func_name in self.evaluations:
            threshold_max = max(y_loss)
            threshold_min = 0
            best_threshold = 0
            for _ in range(5):
                threshold_list = [threshold_max - i * (threshold_max - threshold_min) / 10 for i in range(11)]
                f1_list = []
                for threshold_one in threshold_list:
                    prediction_loss = numpy.where(numpy.array(y_loss) > threshold_one, self.attack, self.normal)
                    f1 = eval(f"evaluation.{func_name}.evaluate")(y_true=true_label, y_pred=prediction_loss.tolist())
                    f1_list.append(f1)
                ind = f1_list.index(max(f1_list))
                best_threshold = threshold_list[ind]
                if ind == 0:
                    threshold_max = threshold_list[ind]
                    threshold_min = threshold_list[ind+1]
                elif ind == len(threshold_list)-1:
                    threshold_max = threshold_list[ind-1]
                    threshold_min = threshold_list[ind]
                else:
                    threshold_max = threshold_list[ind-1]
                    threshold_min = threshold_list[ind+1]
            ret[func_name] = best_threshold
        return ret

    def auto_threshold(self):
        if self._total_pred_label is not None:
            return
        self._total_y_loss[numpy.isnan(self._total_y_loss)] = 0
        self._total_y_loss = self._total_y_loss / max(self._total_y_loss)
        thresholds = self.best_threshold(
            self._total_label.reshape(-1).data.tolist(), self._total_y_loss.reshape(-1).data.tolist())
        self.true_pred_dict = {
            'true': self._total_label.squeeze().tolist()
        }
        for func_name in thresholds:
            self.true_pred_dict[func_name] = \
                numpy.where(self._total_y_loss > thresholds[func_name], self.attack, self.normal).squeeze().tolist()
        # self.true_pred_df = pandas.DataFrame(self.true_pred_dict)
        for func_name in self.true_pred_dict:
            if func_name == "true":
                continue
            self.scores[func_name] = self.get_score(func_name)

    def get_score(self, func_name):
        if self._total_pred_label is not None:
            return eval(f"evaluation.{func_name}.evaluate")(self._total_label.reshape(-1).tolist(),
                                                            self._total_pred_label.reshape(-1).tolist())
        return eval(f"evaluation.{func_name}.evaluate")(self._total_label.reshape(-1).tolist(),
                                                        self.true_pred_dict[f"{func_name}"])

    def __str__(self):
        res = ""
        for func_name in self.scores:
            res += f"{func_name}={self.scores[func_name]:.3f} "
        return res[:-1]


def train_model(epoch: int, optimizer: torch.optim, dataloader: tud.DataLoader, model: nn.Module,
                device: str = "cpu") -> (nn.Module, str):
    """
    训练模型
    :param epoch: 当前训练轮数
    :param optimizer: 优化器
    :param dataloader: 数据集
    :param model: 模型
    :param device: 训练设备使用cpu还是gpu
    :return: 训练完成的模型;训练完成需要输出的信息
    """
    model.train()
    avg_loss = []
    dataloader.dataset.mode = "train"
    start_time = time.time()
    with tqdm(total=len(dataloader), ncols=150) as _tqdm:
        _tqdm.set_description(f'(进度条部分不会写进本地日志)epoch:{epoch},训练进度')
        for data in dataloader:
            x = data[0].to(device)
            y_true = data[2].to(device)
            optimizer.zero_grad()
            loss = model.loss(x=x, y_true=y_true, epoch=epoch, device=device)
            avg_loss.append(loss)
            optimizer.step()
            _tqdm.set_postfix(loss='{:.6f}'.format(sum(avg_loss) / len(avg_loss)))
            _tqdm.update(1)
    end_time = time.time()
    info = f"epoch={epoch}, average loss={'{:.6f}'.format(sum(avg_loss) / len(avg_loss))}, " \
           f"train time={'{:.1f}'.format(end_time-start_time)}s"
    return model, info


def test_model(dataloader: tud.DataLoader, model: nn.Module, evaluations: [str], device: str = "cpu") -> \
        (EvaluationScore, str):
    """
    测试模型
    :param dataloader: 数据集
    :param model: 模型
    :param device: 训练设备使用cpu还是gpu
    :return: 评估分数;测试完成需要输出的信息
    """
    es = EvaluationScore(evaluations)
    model.eval()
    dataloader.dataset.mode = "test"
    start_time = time.time()
    with tqdm(total=len(dataloader), ncols=150) as _tqdm:
        _tqdm.set_description(f'(进度条部分不会写进本地日志)测试进度')
        with torch.no_grad():
            for data in dataloader:
                x = data[0].to(device)
                y_true = data[2].to(device)
                label_true = data[1].int().to(device)
                y_loss, label_pred = model.detection(x=x, y_true=y_true, device=device)
                if label_pred is not None:
                    es.add_data(y_loss=None, true_label=label_true, pred_label=label_pred)
                else:
                    es.add_data(y_loss=y_loss, true_label=label_true, pred_label=None)
                _tqdm.update(1)
    end_time = time.time()
    es.auto_threshold()
    es_score = es.__str__().replace(" ", ", ")
    info = f"{es_score}, test time={'{:.1f}'.format(end_time-start_time)}s"
    return es, info


def train_and_test_model(start_time: str, epochs: int, normal_dataloader: tud.DataLoader, attack_dataloader: tud.DataLoader,
                         model: nn.Module, evaluations: [str], device: str = "cpu", lr: float = 1e-4,
                         model_path: str = None, train: bool = True) -> (dict, dict):
    """
    训练与测试
    :param start_time: 实验的开始时间。此处用于寻找存放路径。
    :param epochs: 总共训练轮数
    :param normal_dataloader: 训练数据集
    :param attack_dataloader: 测试数据集
    :param model: 模型
    :param evaluations: 评估方法
    :param device: 设备
    :param lr: 学习率
    :param model_path: 模型参数文件路径
    :param train: 是否训练,如果为否,则仅进行测试
    :return: 各个评估方法的最佳分数、各个评估方法最佳情况下的检测标签
    """
    dataset_name = normal_dataloader.dataset.name
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if model_path:
        try:
            checkpoint = torch.load(model_path)
            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            logger.info(f"模型参数文件{model_path}加载成功")
        except:
            logger.warning(f"模型参数文件{model_path}加载失败,将训练新模型")
    logger.info(f"模型:{model.name},数据集:{dataset_name},设备:{device},训练开始")
    best_score = {}
    best_detection = {}
    if train:
        logger.info(f"模式:训练并测试")
        for epoch in range(1, epochs+1):
            model, train_info = train_model(epoch=epoch, optimizer=optimizer, dataloader=normal_dataloader, model=model,
                                            device=device)
            es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations,
                                       device=device)
            logger.info(f"{train_info}, {test_info}")
            es_score = es.__str__().replace(" ", "_")
            torch.save({
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict()
            }, f'./records/{start_time}/model/model={model.name}_dataset={dataset_name}_epoch={epoch}_{es_score}.pth')
            for func_name in es.scores:
                if func_name not in best_score or es.scores[func_name] > best_score[func_name]:
                    best_score[func_name] = es.scores[func_name]
                    best_detection[func_name] = es.true_pred_dict[func_name]
                    best_detection["true"] = es.true_pred_dict["true"]
    else:
        logger.info(f"模式:仅进行测试")
        es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations, device=device)
        logger.info(test_info)
        for func_name in es.scores:
            best_score[func_name] = es.scores[func_name]
            best_detection[func_name] = es.true_pred_dict[func_name]
            best_detection["true"] = es.true_pred_dict["true"]
    return best_score, best_detection