path: root/method/MtadGat.py
author     ZHENG Yanqin <[email protected]>   2023-05-25 07:37:53 +0000
committer  ZHENG Yanqin <[email protected]>   2023-05-25 07:37:53 +0000
commit     e9896bd62bb29da00ec00a121374167ad91bfe47 (patch)
tree       d94845574c8ef7473d0204d28b4efd4038035463 /method/MtadGat.py
parent     fad9aa875c84b38cbb5a6010e104922b1eea7291 (diff)
parent     4c5734c624705449c6b21c4b2bc5554e7259fdba (diff)
Merge branch 'master' into 'main' (HEAD, main)
readme

See merge request zyq/time_series_anomaly_detection!1
Diffstat (limited to 'method/MtadGat.py')
-rw-r--r--  method/MtadGat.py  414
1 file changed, 414 insertions, 0 deletions
diff --git a/method/MtadGat.py b/method/MtadGat.py
new file mode 100644
index 0000000..1ed679f
--- /dev/null
+++ b/method/MtadGat.py
@@ -0,0 +1,414 @@
+"""
+Third-party implementation of MTAD-GAT found on GitHub.
+Because the original code used torch.empty, the loss often became NaN, so it was replaced with torch.zeros.
+"""
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.utils.data as tud
+
+
+class ConvLayer(nn.Module):
+ """1-D Convolution layer to extract high-level features of each time-series input
+ :param n_features: Number of input features/nodes
+ :param window_size: length of the input sequence
+ :param kernel_size: size of kernel to use in the convolution operation
+ """
+
+ def __init__(self, n_features, kernel_size=7):
+ super(ConvLayer, self).__init__()
+ self.padding = nn.ConstantPad1d((kernel_size - 1) // 2, 0.0)
+ self.conv = nn.Conv1d(in_channels=n_features, out_channels=n_features, kernel_size=kernel_size)
+ self.relu = nn.ReLU()
+
+ def forward(self, x):
+ x = x.permute(0, 2, 1)
+ x = self.padding(x)
+ x = self.relu(self.conv(x))
+ return x.permute(0, 2, 1) # Permute back
+
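+# Illustrative shape check (a sketch added for documentation; not used by the model itself):
+# the symmetric padding keeps the sequence length unchanged, so ConvLayer maps (b, n, k) -> (b, n, k).
+def _demo_conv_layer():
+    conv = ConvLayer(n_features=5, kernel_size=7)
+    y = conv(torch.randn(8, 100, 5))  # 8 windows of length 100 with 5 features
+    assert y.shape == (8, 100, 5)
+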
+
+class FeatureAttentionLayer(nn.Module):
+ """Single Graph Feature/Spatial Attention Layer
+ :param n_features: Number of input features/nodes
+ :param window_size: length of the input sequence
+ :param dropout: dropout rate applied to the attention weights
+ :param alpha: negative slope used in the leaky ReLU activation function
+ :param embed_dim: embedding dimension (output dimension of linear transformation)
+ :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+ :param use_bias: whether to include a bias term in the attention layer
+ """
+
+ def __init__(self, n_features, window_size, dropout, alpha, embed_dim=None, use_gatv2=True, use_bias=True):
+ super(FeatureAttentionLayer, self).__init__()
+ self.n_features = n_features
+ self.window_size = window_size
+ self.dropout = dropout
+ self.embed_dim = embed_dim if embed_dim is not None else window_size
+ self.use_gatv2 = use_gatv2
+ self.num_nodes = n_features
+ self.use_bias = use_bias
+
+ # Because linear transformation is done after concatenation in GATv2
+ if self.use_gatv2:
+ self.embed_dim *= 2
+ lin_input_dim = 2 * window_size
+ a_input_dim = self.embed_dim
+ else:
+ lin_input_dim = window_size
+ a_input_dim = 2 * self.embed_dim
+
+ self.lin = nn.Linear(lin_input_dim, self.embed_dim)
+ # self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
+ self.a = nn.Parameter(torch.zeros((a_input_dim, 1)))
+ nn.init.xavier_uniform_(self.a.data, gain=1.414)
+
+ if self.use_bias:
+ # self.bias = nn.Parameter(torch.empty(n_features, n_features))
+ self.bias = nn.Parameter(torch.zeros(n_features, n_features))
+
+ self.leakyrelu = nn.LeakyReLU(alpha)
+ self.sigmoid = nn.Sigmoid()
+
+ def forward(self, x):
+ # x shape (b, n, k): b - batch size, n - window size, k - number of features
+ # For feature attention we represent a node as the values of a particular feature across all timestamps
+
+ x = x.permute(0, 2, 1)
+
+ # 'Dynamic' GAT attention
+ # Proposed by Brody et al., 2021 (https://arxiv.org/pdf/2105.14491.pdf)
+ # Linear transformation applied after concatenation and attention layer applied after leakyrelu
+ if self.use_gatv2:
+ a_input = self._make_attention_input(x) # (b, k, k, 2*window_size)
+ a_input = self.leakyrelu(self.lin(a_input)) # (b, k, k, embed_dim)
+ e = torch.matmul(a_input, self.a).squeeze(3) # (b, k, k)
+
+ # Original GAT attention
+ else:
+ Wx = self.lin(x) # (b, k, embed_dim)
+ a_input = self._make_attention_input(Wx) # (b, k, k, 2*embed_dim)
+ e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3) # (b, k, k)
+
+ if self.use_bias:
+ e += self.bias
+
+ # Attention weights
+ attention = torch.softmax(e, dim=2)
+ attention = torch.dropout(attention, self.dropout, train=self.training)
+
+ # Computing new node features using the attention
+ h = self.sigmoid(torch.matmul(attention, x))
+
+ return h.permute(0, 2, 1)
+
+ def _make_attention_input(self, v):
+ """Preparing the feature attention mechanism.
+ Creating matrix with all possible combinations of concatenations of node.
+ Each node consists of all values of that node within the window
+ v1 || v1,
+ ...
+ v1 || vK,
+ v2 || v1,
+ ...
+ v2 || vK,
+ ...
+ ...
+ vK || v1,
+ ...
+ vK || vK,
+ """
+
+ K = self.num_nodes
+ blocks_repeating = v.repeat_interleave(K, dim=1) # Left-side of the matrix
+ blocks_alternating = v.repeat(1, K, 1) # Right-side of the matrix
+ combined = torch.cat((blocks_repeating, blocks_alternating), dim=2) # (b, K*K, 2*window_size)
+
+ if self.use_gatv2:
+ return combined.view(v.size(0), K, K, 2 * self.window_size)
+ else:
+ return combined.view(v.size(0), K, K, 2 * self.embed_dim)
+
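+# Illustrative shape check (sketch, not part of the original code): feature attention treats
+# each of the k features as a graph node and returns a tensor of the same shape as its input.
+def _demo_feature_attention():
+    gat = FeatureAttentionLayer(n_features=5, window_size=100, dropout=0.2, alpha=0.2)
+    h = gat(torch.randn(8, 100, 5))
+    assert h.shape == (8, 100, 5)
+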
+
+class TemporalAttentionLayer(nn.Module):
+ """Single Graph Temporal Attention Layer
+ :param n_features: number of input features/nodes
+ :param window_size: length of the input sequence
+ :param dropout: dropout rate applied to the attention weights
+ :param alpha: negative slope used in the leaky ReLU activation function
+ :param embed_dim: embedding dimension (output dimension of linear transformation)
+ :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+ :param use_bias: whether to include a bias term in the attention layer
+
+ """
+
+ def __init__(self, n_features, window_size, dropout, alpha, embed_dim=None, use_gatv2=True, use_bias=True):
+ super(TemporalAttentionLayer, self).__init__()
+ self.n_features = n_features
+ self.window_size = window_size
+ self.dropout = dropout
+ self.use_gatv2 = use_gatv2
+ self.embed_dim = embed_dim if embed_dim is not None else n_features
+ self.num_nodes = window_size
+ self.use_bias = use_bias
+
+ # Because linear transformation is performed after concatenation in GATv2
+ if self.use_gatv2:
+ self.embed_dim *= 2
+ lin_input_dim = 2 * n_features
+ a_input_dim = self.embed_dim
+ else:
+ lin_input_dim = n_features
+ a_input_dim = 2 * self.embed_dim
+
+ self.lin = nn.Linear(lin_input_dim, self.embed_dim)
+ # self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
+ self.a = nn.Parameter(torch.zeros((a_input_dim, 1)))
+ nn.init.xavier_uniform_(self.a.data, gain=1.414)
+
+ if self.use_bias:
+ # self.bias = nn.Parameter(torch.empty(window_size, window_size))
+ self.bias = nn.Parameter(torch.zeros(window_size, window_size))
+ self.leakyrelu = nn.LeakyReLU(alpha)
+ self.sigmoid = nn.Sigmoid()
+
+ def forward(self, x):
+ # x shape (b, n, k): b - batch size, n - window size, k - number of features
+ # For temporal attention a node is represented as all feature values at a specific timestamp
+
+ # 'Dynamic' GAT attention
+ # Proposed by Brody et al., 2021 (https://arxiv.org/pdf/2105.14491.pdf)
+ # Linear transformation applied after concatenation and attention layer applied after leakyrelu
+ if self.use_gatv2:
+ a_input = self._make_attention_input(x) # (b, n, n, 2*n_features)
+ a_input = self.leakyrelu(self.lin(a_input)) # (b, n, n, embed_dim)
+ e = torch.matmul(a_input, self.a).squeeze(3) # (b, n, n)
+
+ # Original GAT attention
+ else:
+ Wx = self.lin(x) # (b, n, embed_dim)
+ a_input = self._make_attention_input(Wx) # (b, n, n, 2*embed_dim)
+ e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3) # (b, n, n)
+
+ if self.use_bias:
+ e += self.bias # (b, n, n)
+
+ # Attention weights
+ attention = torch.softmax(e, dim=2)
+ attention = torch.dropout(attention, self.dropout, train=self.training)
+
+ h = self.sigmoid(torch.matmul(attention, x)) # (b, n, k)
+
+ return h
+
+ def _make_attention_input(self, v):
+ """Preparing the temporal attention mechanism.
+ Creating matrix with all possible combinations of concatenations of node values:
+ (v1, v2..)_t1 || (v1, v2..)_t1
+ (v1, v2..)_t1 || (v1, v2..)_t2
+
+ ...
+ ...
+
+ (v1, v2..)_tn || (v1, v2..)_t1
+ (v1, v2..)_tn || (v1, v2..)_t2
+
+ """
+
+ K = self.num_nodes
+ blocks_repeating = v.repeat_interleave(K, dim=1) # Left-side of the matrix
+ blocks_alternating = v.repeat(1, K, 1) # Right-side of the matrix
+ combined = torch.cat((blocks_repeating, blocks_alternating), dim=2)
+
+ if self.use_gatv2:
+ return combined.view(v.size(0), K, K, 2 * self.n_features)
+ else:
+ return combined.view(v.size(0), K, K, 2 * self.embed_dim)
+
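+# Illustrative shape check (sketch): temporal attention treats each of the n timestamps as a
+# node, attending over timestamps instead of features, and also preserves the input shape.
+def _demo_temporal_attention():
+    gat = TemporalAttentionLayer(n_features=5, window_size=100, dropout=0.2, alpha=0.2)
+    h = gat(torch.randn(8, 100, 5))
+    assert h.shape == (8, 100, 5)
+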
+
+class GRULayer(nn.Module):
+ """Gated Recurrent Unit (GRU) Layer
+ :param in_dim: number of input features
+ :param hid_dim: hidden size of the GRU
+ :param n_layers: number of layers in GRU
+ :param dropout: dropout rate
+ """
+
+ def __init__(self, in_dim, hid_dim, n_layers, dropout):
+ super(GRULayer, self).__init__()
+ self.hid_dim = hid_dim
+ self.n_layers = n_layers
+ self.dropout = 0.0 if n_layers == 1 else dropout
+ self.gru = nn.GRU(in_dim, hid_dim, num_layers=n_layers, batch_first=True, dropout=self.dropout)
+
+ def forward(self, x):
+ out, h = self.gru(x)
+ # With batch_first=True, out[:, -1, :] is the last-timestep output; h[-1, :, :] is the last layer's hidden state
+ out, h = out[:, -1, :], h[-1, :, :]
+ return out, h
+
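+# Illustrative shape check (sketch): with batch_first=True the GRU consumes (b, n, in_dim) and
+# the hidden state h of the last layer, which is what the model uses downstream, is (b, hid_dim).
+def _demo_gru_layer():
+    gru = GRULayer(in_dim=15, hid_dim=150, n_layers=1, dropout=0.2)
+    out, h = gru(torch.randn(8, 100, 15))
+    assert h.shape == (8, 150)  # hidden state of the last GRU layer
+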
+
+class RNNDecoder(nn.Module):
+ """GRU-based Decoder network that converts latent vector into output
+ :param in_dim: number of input features
+ :param n_layers: number of layers in RNN
+ :param hid_dim: hidden size of the RNN
+ :param dropout: dropout rate
+ """
+
+ def __init__(self, in_dim, hid_dim, n_layers, dropout):
+ super(RNNDecoder, self).__init__()
+ self.in_dim = in_dim
+ self.dropout = 0.0 if n_layers == 1 else dropout
+ self.rnn = nn.GRU(in_dim, hid_dim, n_layers, batch_first=True, dropout=self.dropout)
+
+ def forward(self, x):
+ decoder_out, _ = self.rnn(x)
+ return decoder_out
+
+
+class ReconstructionModel(nn.Module):
+ """Reconstruction Model
+ :param window_size: length of the input sequence
+ :param in_dim: number of input features
+ :param n_layers: number of layers in RNN
+ :param hid_dim: hidden size of the RNN
+ :param out_dim: number of output features
+ :param dropout: dropout rate
+ """
+
+ def __init__(self, window_size, in_dim, hid_dim, out_dim, n_layers, dropout):
+ super(ReconstructionModel, self).__init__()
+ self.window_size = window_size
+ self.decoder = RNNDecoder(in_dim, hid_dim, n_layers, dropout)
+ self.fc = nn.Linear(hid_dim, out_dim)
+
+ def forward(self, x):
+ # x will be last hidden state of the GRU layer
+ h_end = x
+ # Repeat the latent vector across the window so every decoder timestep sees the same hidden state
+ h_end_rep = h_end.unsqueeze(1).repeat(1, self.window_size, 1) # (b, window_size, hid_dim)
+
+ decoder_out = self.decoder(h_end_rep)
+ out = self.fc(decoder_out)
+ return out
+
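+# Illustrative shape check (sketch): the reconstruction model expands a latent vector of size
+# hid_dim into a full window, mapping (b, hid_dim) -> (b, window_size, out_dim).
+def _demo_reconstruction_model():
+    recon = ReconstructionModel(window_size=100, in_dim=150, hid_dim=150, out_dim=5, n_layers=1, dropout=0.2)
+    x_hat = recon(torch.randn(8, 150))
+    assert x_hat.shape == (8, 100, 5)
+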
+
+class Forecasting_Model(nn.Module):
+ """Forecasting model (fully-connected network)
+ :param in_dim: number of input features
+ :param hid_dim: hidden size of the FC network
+ :param out_dim: number of output features
+ :param n_layers: number of FC layers
+ :param dropout: dropout rate
+ """
+
+ def __init__(self, in_dim, hid_dim, out_dim, n_layers, dropout):
+ super(Forecasting_Model, self).__init__()
+ layers = [nn.Linear(in_dim, hid_dim)]
+ for _ in range(n_layers - 1):
+ layers.append(nn.Linear(hid_dim, hid_dim))
+
+ layers.append(nn.Linear(hid_dim, out_dim))
+
+ self.layers = nn.ModuleList(layers)
+ self.dropout = nn.Dropout(dropout)
+ self.relu = nn.ReLU()
+
+ def forward(self, x):
+ for i in range(len(self.layers) - 1):
+ x = self.relu(self.layers[i](x))
+ x = self.dropout(x)
+ return self.layers[-1](x)
+
+
+class Model(nn.Module):
+ """ MTAD_GAT model class.
+
+ :param n_features: Number of input features
+ :param window_size: Length of the input sequence
+ :param out_dim: Number of features to output
+ :param kernel_size: size of kernel to use in the 1-D convolution
+ :param feat_gat_embed_dim: embedding dimension (output dimension of linear transformation)
+ in feat-oriented GAT layer
+ :param time_gat_embed_dim: embedding dimension (output dimension of linear transformation)
+ in time-oriented GAT layer
+ :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+ :param gru_n_layers: number of layers in the GRU layer
+ :param gru_hid_dim: hidden dimension in the GRU layer
+ :param forecast_n_layers: number of layers in the FC-based Forecasting Model
+ :param forecast_hid_dim: hidden dimension in the FC-based Forecasting Model
+ :param recon_n_layers: number of layers in the GRU-based Reconstruction Model
+ :param recon_hid_dim: hidden dimension in the GRU-based Reconstruction Model
+ :param dropout: dropout rate
+ :param alpha: negative slope used in the leaky ReLU activation function
+
+ """
+
+ def __init__(self, customs: dict, dataloader: tud.DataLoader):
+ super(Model, self).__init__()
+ n_features = dataloader.dataset.train_inputs.shape[-1]
+ window_size = int(customs["input_size"])
+ out_dim = n_features
+ kernel_size = 7
+ feat_gat_embed_dim = None
+ time_gat_embed_dim = None
+ use_gatv2 = True
+ gru_n_layers = 1
+ gru_hid_dim = 150
+ forecast_n_layers = 1
+ forecast_hid_dim = 150
+ recon_n_layers = 1
+ recon_hid_dim = 150
+ dropout = 0.2
+ alpha = 0.2
+
+ self.name = "MtadGat"
+ self.conv = ConvLayer(n_features, kernel_size)
+ self.feature_gat = FeatureAttentionLayer(
+ n_features, window_size, dropout, alpha, feat_gat_embed_dim, use_gatv2)
+ self.temporal_gat = TemporalAttentionLayer(n_features, window_size, dropout, alpha, time_gat_embed_dim,
+ use_gatv2)
+ self.gru = GRULayer(3 * n_features, gru_hid_dim, gru_n_layers, dropout)
+ self.forecasting_model = Forecasting_Model(
+ gru_hid_dim, forecast_hid_dim, out_dim, forecast_n_layers, dropout)
+ self.recon_model = ReconstructionModel(window_size, gru_hid_dim, recon_hid_dim, out_dim, recon_n_layers,
+ dropout)
+
+ def forward(self, x):
+ # x shape (b, n, k): b - batch size, n - window size, k - number of features
+
+ x = self.conv(x)
+ h_feat = self.feature_gat(x)
+ h_temp = self.temporal_gat(x)
+
+ h_cat = torch.cat([x, h_feat, h_temp], dim=2) # (b, n, 3k)
+
+ _, h_end = self.gru(h_cat)
+ h_end = h_end.view(x.shape[0], -1) # Hidden state for last timestamp
+
+ predictions = self.forecasting_model(h_end)
+ recons = self.recon_model(h_end)
+
+ return predictions, recons
+
+ def loss(self, x, y_true, epoch: int = None, device: str = "cpu"):
+ preds, recons = self.forward(x)
+ if preds.ndim == 3:
+ preds = preds.squeeze(1)
+ if y_true.ndim == 3:
+ y_true = y_true.squeeze(1)
+ forecast_criterion = nn.MSELoss()
+ recon_criterion = nn.MSELoss()
+ forecast_loss = torch.sqrt(forecast_criterion(y_true, preds))
+ recon_loss = torch.sqrt(recon_criterion(x, recons))
+ loss = forecast_loss + recon_loss
+ loss.backward()
+ return loss.item()
+
+ def detection(self, x, y_true, epoch: int = None, device: str = "cpu"):
+ preds, recons = self.forward(x)
+ score = F.pairwise_distance(recons.reshape(recons.size(0), -1), x.reshape(x.size(0), -1)) + \
+ F.pairwise_distance(y_true.reshape(y_true.size(0), -1), preds.reshape(preds.size(0), -1))
+ return score, None
+
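+if __name__ == "__main__":
+    # Minimal smoke test (sketch): Model.__init__ reads the feature count from
+    # dataloader.dataset.train_inputs and the window length from customs["input_size"].
+    # The toy dataset below is purely illustrative, not part of the original project.
+    class _ToyDataset(tud.Dataset):
+        def __init__(self):
+            self.train_inputs = torch.randn(32, 100, 5)
+
+        def __len__(self):
+            return self.train_inputs.size(0)
+
+        def __getitem__(self, idx):
+            return self.train_inputs[idx]
+
+    model = Model({"input_size": 100}, tud.DataLoader(_ToyDataset(), batch_size=8))
+    preds, recons = model(torch.randn(8, 100, 5))
+    print(preds.shape)   # torch.Size([8, 5])      -- one forecast per feature
+    print(recons.shape)  # torch.Size([8, 100, 5]) -- reconstruction of the window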
+