diff options
Diffstat (limited to 'utils.py')
| -rw-r--r-- | utils.py | 318 |
1 files changed, 318 insertions, 0 deletions
# Utilities for building dataloaders, scoring detections, and running the
# train / test loop of an anomaly-detection model.
import os
import time
from typing import List, Optional, Tuple

import numpy
import torch
import torch.nn as nn
import torch.utils.data as tud
from loguru import logger
from tqdm import tqdm

import evaluation
import preprocess


def _resolve(package, dotted_name: str, attribute: str):
    """
    Return ``package.<dotted_name>.<attribute>`` using plain attribute lookup.

    Replaces ``eval(f"package.{dotted_name}.{attribute}")``: the same object is
    returned for ordinary names, but a name string can no longer execute
    arbitrary code.

    :param package: already-imported module to start the lookup from
    :param dotted_name: attribute path below ``package`` (may contain dots)
    :param attribute: final attribute to fetch
    :return: the resolved object
    :raises AttributeError: if any step of the path does not exist
    """
    target = package
    for part in dotted_name.split("."):
        target = getattr(target, part)
    return getattr(target, attribute)


def create_dataloader(dataset_name: str, input_size: int = 1, output_size: int = 1, step: int = 1, batch_size: int = 1,
                      time_index: bool = True, del_column_name: bool = True,
                      preprocess_name: str = "standardization") -> Tuple[tud.DataLoader, tud.DataLoader]:
    """
    Build the dataloaders for a single dataset.

    :param dataset_name: name of the dataset (a directory below ./dataset)
    :param input_size: length of the input data
    :param output_size: length of the output data
    :param step: stride of the sliding window used to cut the data
    :param batch_size: batch size
    :param time_index: True if the first column is a timestamp, False otherwise
    :param del_column_name: use True when the first row of the file holds column names
    :param preprocess_name: preprocessing method (a member of the ``preprocess`` package)
    :return: training dataloader and test dataloader
    """
    dataset_cls = _resolve(preprocess, preprocess_name, "MyDataset")
    ds = dataset_cls(name=dataset_name.replace("/", "-"),
                     train_path=f"./dataset/{dataset_name}/train.csv",
                     test_path=f"./dataset/{dataset_name}/test.csv",
                     input_size=input_size, output_size=output_size,
                     step=step, time_index=time_index,
                     del_column_name=del_column_name)
    # Both loaders wrap the SAME dataset object; train_model / test_model set
    # ``dataset.mode`` before iterating, so the mode left here is only a default.
    normal_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=True)
    ds.mode = "test"
    attack_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=False)
    return normal_dl, attack_dl


def create_all_dataloader(datasets: List[str], input_size: int = 1, output_size: int = 1, step: int = 1,
                          batch_size: int = 1, time_index: bool = True, del_column_name: bool = True,
                          preprocess_name: str = "standardization") -> List[List[dict]]:
    """
    Build dataloaders for every dataset in ``datasets``.

    A dataset directory that directly contains ``train.csv`` yields a one-element
    list; otherwise every sub-directory is treated as a sub-dataset.

    :param datasets: list of dataset names
    :param input_size: length of the input data
    :param output_size: length of the output data
    :param step: stride of the sliding window used to cut the data
    :param batch_size: batch size
    :param time_index: True if the first column is a timestamp, False otherwise
    :param del_column_name: use True when the first row of the file holds column names
    :param preprocess_name: preprocessing method
    :return: one list per dataset, each holding dicts with the keys
             'dataset_name', 'normal' and 'attack'
    """
    all_dataloader = []
    for dataset_name in datasets:
        logger.info(f'开始建立 dataloader {dataset_name}')
        dataset_dir = f"./dataset/{dataset_name}"
        entries = os.listdir(dataset_dir)
        if "train.csv" in entries:
            normal_dl, attack_dl = create_dataloader(dataset_name=dataset_name, input_size=input_size,
                                                     output_size=output_size, step=step, batch_size=batch_size,
                                                     time_index=time_index, del_column_name=del_column_name,
                                                     preprocess_name=preprocess_name)
            all_dataloader.append([{
                'dataset_name': dataset_name,
                'normal': normal_dl,
                'attack': attack_dl
            }])
        else:
            all_sub_dataloader = []
            # Sorted so the order of sub-datasets is reproducible across runs.
            for sub_dataset_dir in sorted(entries):
                # Stray files (e.g. hidden OS files) are not sub-datasets.
                if not os.path.isdir(os.path.join(dataset_dir, sub_dataset_dir)):
                    continue
                sub_dataset_name = f"{dataset_name}/{sub_dataset_dir}"
                # Forward the same options as the single-dataset branch. The
                # previous call passed an undefined ``del_time`` keyword and
                # silently dropped ``time_index`` / ``preprocess_name``.
                normal_dl, attack_dl = create_dataloader(dataset_name=sub_dataset_name, input_size=input_size,
                                                         output_size=output_size, step=step, batch_size=batch_size,
                                                         time_index=time_index, del_column_name=del_column_name,
                                                         preprocess_name=preprocess_name)
                all_sub_dataloader.append({
                    'dataset_name': sub_dataset_name.replace("/", "-"),
                    'normal': normal_dl,
                    'attack': attack_dl
                })
            all_dataloader.append(all_sub_dataloader)

        logger.info(f'完成建立 dataloader {dataset_name}')
    return all_dataloader


class EvaluationScore:
    def __init__(self, evaluations: List[str], attack=1):
        """
        Collect per-batch results, pick thresholds automatically and evaluate in bulk.

        :param evaluations: names of the evaluation methods to use
                            (each must be defined in the ``evaluation`` package)
        :param attack: label used for anomalies, 0 or 1
        """
        self.time = 0
        self.f1 = 0
        self.f1_pa = 0
        self.f_tad = 0
        self.attack = attack
        self.normal = 1 - attack
        self._total_y_loss = None
        self._total_label = None
        self._total_pred_label = None
        self.true_pred_df = None
        self.true_pred_dict = None
        self.evaluations = evaluations
        self.scores = {}

    def add_data(self, y_loss, true_label, pred_label=None):
        """
        Add the data of one batch.

        When ``pred_label`` is given the labels are accumulated as tensors and
        ``y_loss`` is ignored; otherwise losses and labels are flattened and
        accumulated as numpy arrays.

        :param y_loss: deviation (loss) of the data
        :param true_label: ground-truth labels
        :param pred_label: predicted labels, if the model produces them directly
        """
        if pred_label is not None:
            if self._total_label is None and self._total_pred_label is None:
                self._total_label = true_label
                self._total_pred_label = pred_label
            else:
                self._total_label = torch.cat([self._total_label, true_label], dim=0)
                self._total_pred_label = torch.cat([self._total_pred_label, pred_label], dim=0)
            return

        y_loss = y_loss.view(-1).cpu().detach().numpy()
        true_label = true_label.view(-1).cpu().detach().numpy()

        if self._total_y_loss is None and self._total_label is None:
            self._total_y_loss = y_loss
            self._total_label = true_label
            return
        self._total_y_loss = numpy.concatenate((self._total_y_loss, y_loss), axis=0)
        self._total_label = numpy.concatenate((self._total_label, true_label), axis=0)

    def best_threshold(self, true_label: list, y_loss: list) -> dict:
        """
        Search, for every evaluation method, the loss threshold with the best score.

        The range [0, max(y_loss)] is split into 10 intervals; the interval
        around the best candidate is then refined, five rounds in total.

        :param true_label: flat list of ground-truth labels
        :param y_loss: flat list of losses, same length as ``true_label``
        :return: mapping evaluation name -> best threshold found
        """
        ret = {}
        if not self.evaluations:
            return ret
        # Loop-invariant work, hoisted out of the per-threshold loop.
        losses = numpy.array(y_loss)
        loss_max = max(y_loss)
        for func_name in self.evaluations:
            evaluate = _resolve(evaluation, func_name, "evaluate")
            threshold_max = loss_max
            threshold_min = 0
            best_threshold = 0
            for _ in range(5):
                span = threshold_max - threshold_min
                # Candidates run from threshold_max (index 0) down to threshold_min.
                threshold_list = [threshold_max - i * span / 10 for i in range(11)]
                f1_list = [
                    evaluate(y_true=true_label,
                             y_pred=numpy.where(losses > candidate, self.attack, self.normal).tolist())
                    for candidate in threshold_list
                ]
                ind = f1_list.index(max(f1_list))
                best_threshold = threshold_list[ind]
                # Narrow the search window to the neighbours of the best
                # candidate, clamped at both ends of the list.
                threshold_max = threshold_list[max(ind - 1, 0)]
                threshold_min = threshold_list[min(ind + 1, len(threshold_list) - 1)]
            ret[func_name] = best_threshold
        return ret

    def auto_threshold(self):
        """
        Fill ``self.scores`` and ``self.true_pred_dict`` from the accumulated data.

        With predicted labels, every evaluation is scored on them directly.
        With losses, NaNs are zeroed, losses are scaled by their maximum, the
        best threshold per evaluation is searched and the resulting labels are
        scored. Does nothing if no data was added.
        """
        if self._total_pred_label is not None:
            # The model supplied labels itself: no threshold to search, but the
            # scores still have to be computed (previously this path left
            # ``scores`` empty and ``true_pred_dict`` as None).
            predicted = self._total_pred_label.reshape(-1).tolist()
            self.true_pred_dict = {
                'true': self._total_label.reshape(-1).tolist()
            }
            for func_name in self.evaluations:
                self.true_pred_dict[func_name] = predicted
                self.scores[func_name] = self.get_score(func_name)
            return
        if self._total_y_loss is None:
            return
        self._total_y_loss[numpy.isnan(self._total_y_loss)] = 0
        peak = self._total_y_loss.max()
        if peak != 0:
            # Skip scaling for all-zero losses; dividing would produce NaN.
            self._total_y_loss = self._total_y_loss / peak
        flat_labels = self._total_label.reshape(-1).tolist()
        thresholds = self.best_threshold(flat_labels, self._total_y_loss.reshape(-1).tolist())
        self.true_pred_dict = {
            'true': flat_labels
        }
        for func_name in thresholds:
            self.true_pred_dict[func_name] = \
                numpy.where(self._total_y_loss > thresholds[func_name], self.attack, self.normal).reshape(-1).tolist()
        for func_name in self.true_pred_dict:
            if func_name == "true":
                continue
            self.scores[func_name] = self.get_score(func_name)

    def get_score(self, func_name):
        """
        Score the accumulated labels with one evaluation method.

        :param func_name: name of the evaluation method
        :return: value returned by ``evaluation.<func_name>.evaluate``
        """
        evaluate = _resolve(evaluation, func_name, "evaluate")
        if self._total_pred_label is not None:
            return evaluate(self._total_label.reshape(-1).tolist(),
                            self._total_pred_label.reshape(-1).tolist())
        return evaluate(self._total_label.reshape(-1).tolist(),
                        self.true_pred_dict[f"{func_name}"])

    def __str__(self):
        """Return the scores as space-separated ``name=value`` pairs (3 decimals)."""
        return " ".join(f"{func_name}={score:.3f}" for func_name, score in self.scores.items())


def train_model(epoch: int, optimizer: torch.optim.Optimizer, dataloader: tud.DataLoader, model: nn.Module,
                device: str = "cpu") -> Tuple[nn.Module, str]:
    """
    Train the model for one epoch.

    :param epoch: current epoch number
    :param optimizer: optimizer
    :param dataloader: training data
    :param model: model
    :param device: device to train on (cpu or gpu)
    :return: the trained model; the message to output once training is done
    """
    model.train()
    dataloader.dataset.mode = "train"
    # Running float total instead of a list of loss objects: avoids re-summing
    # the whole list on every batch and avoids holding on to loss tensors.
    running_loss = 0.0
    batches = 0
    start_time = time.time()
    with tqdm(total=len(dataloader), ncols=150) as _tqdm:
        _tqdm.set_description(f'(进度条部分不会写进本地日志)epoch:{epoch},训练进度')
        for data in dataloader:
            x = data[0].to(device)
            y_true = data[2].to(device)
            optimizer.zero_grad()
            # NOTE(review): no backward() is called here; presumably
            # model.loss() runs the backward pass itself -- confirm.
            loss = model.loss(x=x, y_true=y_true, epoch=epoch, device=device)
            running_loss += float(loss)
            batches += 1
            optimizer.step()
            _tqdm.set_postfix(loss='{:.6f}'.format(running_loss / batches))
            _tqdm.update(1)
    end_time = time.time()
    # An empty dataloader reports nan rather than raising ZeroDivisionError.
    mean_loss = running_loss / batches if batches else float("nan")
    info = f"epoch={epoch}, average loss={mean_loss:.6f}, " \
           f"train time={end_time - start_time:.1f}s"
    return model, info


def test_model(dataloader: tud.DataLoader, model: nn.Module, evaluations: List[str], device: str = "cpu") -> \
        Tuple[EvaluationScore, str]:
    """
    Test the model.

    :param dataloader: test data
    :param model: model
    :param evaluations: names of the evaluation methods to use
    :param device: device to run on (cpu or gpu)
    :return: the evaluation scores; the message to output once testing is done
    """
    es = EvaluationScore(evaluations)
    model.eval()
    dataloader.dataset.mode = "test"
    start_time = time.time()
    with tqdm(total=len(dataloader), ncols=150) as _tqdm:
        _tqdm.set_description('(进度条部分不会写进本地日志)测试进度')
        with torch.no_grad():
            for data in dataloader:
                x = data[0].to(device)
                y_true = data[2].to(device)
                label_true = data[1].int().to(device)
                y_loss, label_pred = model.detection(x=x, y_true=y_true, device=device)
                if label_pred is not None:
                    es.add_data(y_loss=None, true_label=label_true, pred_label=label_pred)
                else:
                    es.add_data(y_loss=y_loss, true_label=label_true, pred_label=None)
                _tqdm.update(1)
    end_time = time.time()
    es.auto_threshold()
    es_score = str(es).replace(" ", ", ")
    timing = f"test time={end_time - start_time:.1f}s"
    # Avoid a leading ", " when no score could be computed.
    info = f"{es_score}, {timing}" if es_score else timing
    return es, info


def _record_best(es: EvaluationScore, best_score: dict, best_detection: dict) -> None:
    """Update ``best_score`` / ``best_detection`` in place with any score of ``es`` that improves on them."""
    for func_name in es.scores:
        if func_name not in best_score or es.scores[func_name] > best_score[func_name]:
            best_score[func_name] = es.scores[func_name]
            best_detection[func_name] = es.true_pred_dict[func_name]
    if es.true_pred_dict is not None:
        best_detection["true"] = es.true_pred_dict["true"]


def train_and_test_model(start_time: str, epochs: int, normal_dataloader: tud.DataLoader, attack_dataloader: tud.DataLoader,
                         model: nn.Module, evaluations: List[str], device: str = "cpu", lr: float = 1e-4,
                         model_path: Optional[str] = None, train: bool = True) -> Tuple[dict, dict]:
    """
    Train and test a model.

    :param start_time: start time of the experiment; used to locate the output directory
    :param epochs: total number of training epochs
    :param normal_dataloader: training data
    :param attack_dataloader: test data
    :param model: model
    :param evaluations: evaluation methods
    :param device: device
    :param lr: learning rate
    :param model_path: path of a model parameter file to load first
    :param train: whether to train; if False only testing is performed
    :return: best score per evaluation method; detection labels of the best case per evaluation method
    """
    dataset_name = normal_dataloader.dataset.name
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if model_path:
        try:
            # map_location lets a checkpoint saved on another device be loaded here.
            checkpoint = torch.load(model_path, map_location=device)
            model.load_state_dict(checkpoint['model_state_dict'])
            optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
            logger.info(f"模型参数文件{model_path}加载成功")
        except Exception:
            # Best effort: fall back to training from scratch, but keep the
            # reason for the failure in the log instead of discarding it.
            logger.opt(exception=True).warning(f"模型参数文件{model_path}加载失败,将训练新模型")
    logger.info(f"模型:{model.name},数据集:{dataset_name},设备:{device},训练开始")
    best_score = {}
    best_detection = {}
    if train:
        logger.info("模式:训练并测试")
        save_dir = f'./records/{start_time}/model'
        os.makedirs(save_dir, exist_ok=True)
        for epoch in range(1, epochs + 1):
            model, train_info = train_model(epoch=epoch, optimizer=optimizer, dataloader=normal_dataloader, model=model,
                                            device=device)
            es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations,
                                       device=device)
            logger.info(f"{train_info}, {test_info}")
            es_score = str(es).replace(" ", "_")
            torch.save({
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict()
            }, f'{save_dir}/model={model.name}_dataset={dataset_name}_epoch={epoch}_{es_score}.pth')
            _record_best(es, best_score, best_detection)
    else:
        logger.info("模式:仅进行测试")
        es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations, device=device)
        logger.info(test_info)
        _record_best(es, best_score, best_detection)
    return best_score, best_detection
