Diffstat (limited to 'utils.py')
-rw-r--r-- utils.py | 318 ++++++++++
1 file changed, 318 insertions(+), 0 deletions(-)
diff --git a/utils.py b/utils.py
new file mode 100644
index 0000000..59202da
--- /dev/null
+++ b/utils.py
@@ -0,0 +1,318 @@
+import os
+import time
+from typing import List, Optional, Tuple
+
+import numpy
+import torch
+import torch.nn as nn
+import torch.utils.data as tud
+from loguru import logger
+from tqdm import tqdm
+
+import evaluation
+import preprocess
+
+
+def create_dataloader(dataset_name: str, input_size: int = 1, output_size: int = 1, step: int = 1, batch_size: int = 1,
+ time_index: bool = True, del_column_name: bool = True,
+                      preprocess_name: str = "standardization") -> Tuple[tud.DataLoader, tud.DataLoader]:
+ """
+ 针对一个数据集,构建Dataloader
+ :param dataset_name: 数据集名称
+ :param input_size: 输入数据长度
+ :param output_size: 输出数据长度
+ :param step: 截取数据的窗口移动间隔
+ :param batch_size: batch的大小
+ :param time_index: True为第一列是时间戳,False为不
+ :param del_column_name: 文件中第一行为列名时,使用True
+ :param preprocess_name: 预处理方法
+ :return: 训练数据与测试数据
+ """
+ ds = eval(f"preprocess.{preprocess_name}.MyDataset")(name=dataset_name.replace("/", "-"),
+ train_path=f"./dataset/{dataset_name}/train.csv",
+ test_path=f"./dataset/{dataset_name}/test.csv",
+ input_size=input_size, output_size=output_size,
+ step=step, time_index=time_index,
+ del_column_name=del_column_name)
+ normal_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=True)
+ ds.mode = "test"
+ attack_dl = tud.DataLoader(dataset=ds, batch_size=batch_size, shuffle=False)
+ return normal_dl, attack_dl
+
+
+def create_all_dataloader(datasets: List[str], input_size: int = 1, output_size: int = 1, step: int = 1,
+ batch_size: int = 1, time_index: bool = True, del_column_name: bool = True,
+                          preprocess_name: str = "standardization") -> List[List[dict]]:
+ """
+ 对所有数据集构建dataloader
+ :param datasets: 数据集列表
+ :param input_size: 输入数据长度
+ :param output_size: 输出数据长度
+ :param step: 截取数据的窗口移动间隔
+ :param batch_size: batch的大小
+ :param time_index: True为第一列是时间戳,False为不。
+ :param del_column_name: 文件中第一行为列名时,使用True
+ :param preprocess_name: 预处理方法
+ :return: 所有数据集的dataloader构建结果
+ """
+ all_dataloader = []
+ for dataset_name in datasets:
+        logger.info(f'Building dataloader for {dataset_name}')
+ if "train.csv" in os.listdir(f"./dataset/{dataset_name}"):
+ normal_dl, attack_dl = create_dataloader(dataset_name=dataset_name, input_size=input_size,
+ output_size=output_size, step=step, batch_size=batch_size,
+ time_index=time_index, del_column_name=del_column_name,
+ preprocess_name=preprocess_name)
+ all_dataloader.append([{
+ 'dataset_name': dataset_name,
+ 'normal': normal_dl,
+ 'attack': attack_dl
+ }])
+ else:
+ all_sub_dataloader = []
+ for sub_dataset_dir in os.listdir(f"./dataset/{dataset_name}"):
+ sub_dataset_name = f"{dataset_name}/{sub_dataset_dir}"
+                normal_dl, attack_dl = create_dataloader(dataset_name=sub_dataset_name, input_size=input_size,
+                                                         output_size=output_size, step=step, batch_size=batch_size,
+                                                         time_index=time_index, del_column_name=del_column_name,
+                                                         preprocess_name=preprocess_name)
+ all_sub_dataloader.append({
+ 'dataset_name': sub_dataset_name.replace("/", "-"),
+ 'normal': normal_dl,
+ 'attack': attack_dl
+ })
+ all_dataloader.append(all_sub_dataloader)
+
+        logger.info(f'Finished building dataloader for {dataset_name}')
+ return all_dataloader
+
+
+class EvaluationScore:
+    def __init__(self, evaluations: List[str], attack: int = 1):
+ """
+ 用于自动划分阈值并进行批量评估
+ :param evaluations: 使用的评估方法名称(需在evaluation文件夹中进行定义)
+ :param attack: 异常的标签,0 or 1
+ """
+ self.time = 0
+ self.f1 = 0
+ self.f1_pa = 0
+ self.f_tad = 0
+ self.attack = attack
+ self.normal = 1 - attack
+ self._total_y_loss = None
+ self._total_label = None
+ self._total_pred_label = None
+ self.true_pred_df = None
+ self.true_pred_dict = None
+ self.evaluations = evaluations
+ self.scores = {}
+
+ def add_data(self, y_loss, true_label, pred_label=None):
+ """
+ 添加每个batch的数据
+ :param y_loss: 数据偏差
+ :param true_label: 真实数据标签
+ :param pred_label: 预测标签
+ """
+ if pred_label is not None:
+ if self._total_label is None and self._total_pred_label is None:
+ self._total_label = true_label
+ self._total_pred_label = pred_label
+ else:
+ self._total_label = torch.cat([self._total_label, true_label], dim=0)
+ self._total_pred_label = torch.cat([self._total_pred_label, pred_label], dim=0)
+ return
+
+ y_loss = y_loss.view(-1).cpu().detach().numpy()
+ true_label = true_label.view(-1).cpu().detach().numpy()
+
+ if self._total_y_loss is None and self._total_label is None:
+ self._total_y_loss = y_loss
+ self._total_label = true_label
+ return
+ self._total_y_loss = numpy.concatenate((self._total_y_loss, y_loss), axis=0)
+ self._total_label = numpy.concatenate((self._total_label, true_label), axis=0)
+
+ def best_threshold(self, true_label: list, y_loss: list) -> dict:
+ ret = {}
+ for func_name in self.evaluations:
+ threshold_max = max(y_loss)
+ threshold_min = 0
+ best_threshold = 0
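+            # Coarse-to-fine search: score 11 evenly spaced thresholds in
+            # [threshold_min, threshold_max], shrink the range around the best one,
+            # and repeat for 5 rounds.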
+ for _ in range(5):
+ threshold_list = [threshold_max - i * (threshold_max - threshold_min) / 10 for i in range(11)]
+ f1_list = []
+ for threshold_one in threshold_list:
+ prediction_loss = numpy.where(numpy.array(y_loss) > threshold_one, self.attack, self.normal)
+ f1 = eval(f"evaluation.{func_name}.evaluate")(y_true=true_label, y_pred=prediction_loss.tolist())
+ f1_list.append(f1)
+ ind = f1_list.index(max(f1_list))
+ best_threshold = threshold_list[ind]
+ if ind == 0:
+ threshold_max = threshold_list[ind]
+ threshold_min = threshold_list[ind+1]
+ elif ind == len(threshold_list)-1:
+ threshold_max = threshold_list[ind-1]
+ threshold_min = threshold_list[ind]
+ else:
+ threshold_max = threshold_list[ind-1]
+ threshold_min = threshold_list[ind+1]
+ ret[func_name] = best_threshold
+ return ret
+
+    def auto_threshold(self):
+        if self._total_pred_label is not None:
+            # labels were predicted directly: score them as-is instead of searching thresholds
+            self.true_pred_dict = {'true': self._total_label.reshape(-1).tolist()}
+            for func_name in self.evaluations:
+                self.true_pred_dict[func_name] = self._total_pred_label.reshape(-1).tolist()
+                self.scores[func_name] = self.get_score(func_name)
+            return
+ self._total_y_loss[numpy.isnan(self._total_y_loss)] = 0
+ self._total_y_loss = self._total_y_loss / max(self._total_y_loss)
+ thresholds = self.best_threshold(
+ self._total_label.reshape(-1).data.tolist(), self._total_y_loss.reshape(-1).data.tolist())
+ self.true_pred_dict = {
+ 'true': self._total_label.squeeze().tolist()
+ }
+ for func_name in thresholds:
+ self.true_pred_dict[func_name] = \
+ numpy.where(self._total_y_loss > thresholds[func_name], self.attack, self.normal).squeeze().tolist()
+ # self.true_pred_df = pandas.DataFrame(self.true_pred_dict)
+ for func_name in self.true_pred_dict:
+ if func_name == "true":
+ continue
+ self.scores[func_name] = self.get_score(func_name)
+
+ def get_score(self, func_name):
+ if self._total_pred_label is not None:
+ return eval(f"evaluation.{func_name}.evaluate")(self._total_label.reshape(-1).tolist(),
+ self._total_pred_label.reshape(-1).tolist())
+ return eval(f"evaluation.{func_name}.evaluate")(self._total_label.reshape(-1).tolist(),
+ self.true_pred_dict[f"{func_name}"])
+
+ def __str__(self):
+ res = ""
+ for func_name in self.scores:
+ res += f"{func_name}={self.scores[func_name]:.3f} "
+ return res[:-1]
+
+
+def train_model(epoch: int, optimizer: torch.optim.Optimizer, dataloader: tud.DataLoader, model: nn.Module,
+                device: str = "cpu") -> Tuple[nn.Module, str]:
+ """
+ 训练模型
+ :param epoch: 当前训练轮数
+ :param optimizer: 优化器
+ :param dataloader: 数据集
+ :param model: 模型
+ :param device: 训练设备使用cpu还是gpu
+ :return: 训练完成的模型;训练完成需要输出的信息
+ """
+ model.train()
+ avg_loss = []
+ dataloader.dataset.mode = "train"
+ start_time = time.time()
+ with tqdm(total=len(dataloader), ncols=150) as _tqdm:
+        _tqdm.set_description(f'epoch {epoch}: training (progress bar is not written to the local log)')
+ for data in dataloader:
+ x = data[0].to(device)
+ y_true = data[2].to(device)
+ optimizer.zero_grad()
+            loss = model.loss(x=x, y_true=y_true, epoch=epoch, device=device)
+            # backpropagate before the optimizer step (assuming model.loss does not call backward itself)
+            loss.backward()
+            optimizer.step()
+            avg_loss.append(loss.item())
+            _tqdm.set_postfix(loss='{:.6f}'.format(sum(avg_loss) / len(avg_loss)))
+ _tqdm.update(1)
+ end_time = time.time()
+ info = f"epoch={epoch}, average loss={'{:.6f}'.format(sum(avg_loss) / len(avg_loss))}, " \
+ f"train time={'{:.1f}'.format(end_time-start_time)}s"
+ return model, info
+
+
+def test_model(dataloader: tud.DataLoader, model: nn.Module, evaluations: List[str], device: str = "cpu") -> \
+        Tuple[EvaluationScore, str]:
+ """
+ 测试模型
+ :param dataloader: 数据集
+ :param model: 模型
+ :param device: 训练设备使用cpu还是gpu
+ :return: 评估分数;测试完成需要输出的信息
+ """
+ es = EvaluationScore(evaluations)
+ model.eval()
+ dataloader.dataset.mode = "test"
+ start_time = time.time()
+ with tqdm(total=len(dataloader), ncols=150) as _tqdm:
+        _tqdm.set_description('testing (progress bar is not written to the local log)')
+ with torch.no_grad():
+ for data in dataloader:
+ x = data[0].to(device)
+ y_true = data[2].to(device)
+ label_true = data[1].int().to(device)
+ y_loss, label_pred = model.detection(x=x, y_true=y_true, device=device)
+ if label_pred is not None:
+ es.add_data(y_loss=None, true_label=label_true, pred_label=label_pred)
+ else:
+ es.add_data(y_loss=y_loss, true_label=label_true, pred_label=None)
+ _tqdm.update(1)
+ end_time = time.time()
+ es.auto_threshold()
+    es_score = str(es).replace(" ", ", ")
+ info = f"{es_score}, test time={'{:.1f}'.format(end_time-start_time)}s"
+ return es, info
+
+
+def train_and_test_model(start_time: str, epochs: int, normal_dataloader: tud.DataLoader, attack_dataloader: tud.DataLoader,
+                         model: nn.Module, evaluations: List[str], device: str = "cpu", lr: float = 1e-4,
+                         model_path: Optional[str] = None, train: bool = True) -> Tuple[dict, dict]:
+ """
+ 训练与测试
+ :param start_time: 实验的开始时间。此处用于寻找存放路径。
+ :param epochs: 总共训练轮数
+ :param normal_dataloader: 训练数据集
+ :param attack_dataloader: 测试数据集
+ :param model: 模型
+ :param evaluations: 评估方法
+ :param device: 设备
+ :param lr: 学习率
+ :param model_path: 模型参数文件路径
+ :param train: 是否训练,如果为否,则仅进行测试
+ :return: 各个评估方法的最佳分数、各个评估方法最佳情况下的检测标签
+ """
+ dataset_name = normal_dataloader.dataset.name
+ optimizer = torch.optim.Adam(model.parameters(), lr=lr)
+ if model_path:
+ try:
+ checkpoint = torch.load(model_path)
+ model.load_state_dict(checkpoint['model_state_dict'])
+ optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
+ logger.info(f"模型参数文件{model_path}加载成功")
+ except:
+ logger.warning(f"模型参数文件{model_path}加载失败,将训练新模型")
+ logger.info(f"模型:{model.name},数据集:{dataset_name},设备:{device},训练开始")
+ best_score = {}
+ best_detection = {}
+ if train:
+ logger.info(f"模式:训练并测试")
+ for epoch in range(1, epochs+1):
+ model, train_info = train_model(epoch=epoch, optimizer=optimizer, dataloader=normal_dataloader, model=model,
+ device=device)
+ es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations,
+ device=device)
+ logger.info(f"{train_info}, {test_info}")
+            es_score = str(es).replace(" ", "_")
+ torch.save({
+ 'model_state_dict': model.state_dict(),
+ 'optimizer_state_dict': optimizer.state_dict()
+ }, f'./records/{start_time}/model/model={model.name}_dataset={dataset_name}_epoch={epoch}_{es_score}.pth')
+ for func_name in es.scores:
+ if func_name not in best_score or es.scores[func_name] > best_score[func_name]:
+ best_score[func_name] = es.scores[func_name]
+ best_detection[func_name] = es.true_pred_dict[func_name]
+ best_detection["true"] = es.true_pred_dict["true"]
+ else:
+ logger.info(f"模式:仅进行测试")
+ es, test_info = test_model(dataloader=attack_dataloader, model=model, evaluations=evaluations, device=device)
+ logger.info(test_info)
+ for func_name in es.scores:
+ best_score[func_name] = es.scores[func_name]
+ best_detection[func_name] = es.true_pred_dict[func_name]
+ best_detection["true"] = es.true_pred_dict["true"]
+ return best_score, best_detection
+
+
+
+