| author | ZHENG Yanqin <[email protected]> | 2023-05-25 07:37:53 +0000 |
|---|---|---|
| committer | ZHENG Yanqin <[email protected]> | 2023-05-25 07:37:53 +0000 |
| commit | e9896bd62bb29da00ec00a121374167ad91bfe47 (patch) | |
| tree | d94845574c8ef7473d0204d28b4efd4038035463 /method/MtadGat.py | |
| parent | fad9aa875c84b38cbb5a6010e104922b1eea7291 (diff) | |
| parent | 4c5734c624705449c6b21c4b2bc5554e7259fdba (diff) | |
readme
See merge request zyq/time_series_anomaly_detection!1
Diffstat (limited to 'method/MtadGat.py')
| -rw-r--r-- | method/MtadGat.py | 414 |
1 file changed, 414 insertions, 0 deletions
diff --git a/method/MtadGat.py b/method/MtadGat.py
new file mode 100644
index 0000000..1ed679f
--- /dev/null
+++ b/method/MtadGat.py
@@ -0,0 +1,414 @@
+"""
+Third-party MTAD-GAT implementation found on GitHub.
+The original code used torch.empty, which often made the loss NaN, so it was replaced with torch.zeros.
+"""
+
+import torch
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.utils.data as tud
+
+
+class ConvLayer(nn.Module):
+    """1-D Convolution layer to extract high-level features of each time-series input
+    :param n_features: Number of input features/nodes
+    :param window_size: length of the input sequence
+    :param kernel_size: size of kernel to use in the convolution operation
+    """
+
+    def __init__(self, n_features, kernel_size=7):
+        super(ConvLayer, self).__init__()
+        self.padding = nn.ConstantPad1d((kernel_size - 1) // 2, 0.0)
+        self.conv = nn.Conv1d(in_channels=n_features, out_channels=n_features, kernel_size=kernel_size)
+        self.relu = nn.ReLU()
+
+    def forward(self, x):
+        x = x.permute(0, 2, 1)
+        x = self.padding(x)
+        x = self.relu(self.conv(x))
+        return x.permute(0, 2, 1)  # Permute back
+
+
+class FeatureAttentionLayer(nn.Module):
+    """Single Graph Feature/Spatial Attention Layer
+    :param n_features: Number of input features/nodes
+    :param window_size: length of the input sequence
+    :param dropout: percentage of nodes to dropout
+    :param alpha: negative slope used in the leaky relu activation function
+    :param embed_dim: embedding dimension (output dimension of linear transformation)
+    :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+    :param use_bias: whether to include a bias term in the attention layer
+    """
+
+    def __init__(self, n_features, window_size, dropout, alpha, embed_dim=None, use_gatv2=True, use_bias=True):
+        super(FeatureAttentionLayer, self).__init__()
+        self.n_features = n_features
+        self.window_size = window_size
+        self.dropout = dropout
+        self.embed_dim = embed_dim if embed_dim is not None else window_size
+        self.use_gatv2 = use_gatv2
+        self.num_nodes = n_features
+        self.use_bias = use_bias
+
+        # Because linear transformation is done after concatenation in GATv2
+        if self.use_gatv2:
+            self.embed_dim *= 2
+            lin_input_dim = 2 * window_size
+            a_input_dim = self.embed_dim
+        else:
+            lin_input_dim = window_size
+            a_input_dim = 2 * self.embed_dim
+
+        self.lin = nn.Linear(lin_input_dim, self.embed_dim)
+        # self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
+        self.a = nn.Parameter(torch.zeros((a_input_dim, 1)))
+        nn.init.xavier_uniform_(self.a.data, gain=1.414)
+
+        if self.use_bias:
+            # self.bias = nn.Parameter(torch.empty(n_features, n_features))
+            self.bias = nn.Parameter(torch.zeros(n_features, n_features))
+
+        self.leakyrelu = nn.LeakyReLU(alpha)
+        self.sigmoid = nn.Sigmoid()
+
+    def forward(self, x):
+        # x shape (b, n, k): b - batch size, n - window size, k - number of features
+        # For feature attention we represent a node as the values of a particular feature across all timestamps
+
+        x = x.permute(0, 2, 1)
+
+        # 'Dynamic' GAT attention
+        # Proposed by Brody et al., 2021 (https://arxiv.org/pdf/2105.14491.pdf)
+        # Linear transformation applied after concatenation and attention layer applied after leakyrelu
+        if self.use_gatv2:
+            a_input = self._make_attention_input(x)  # (b, k, k, 2*window_size)
+            a_input = self.leakyrelu(self.lin(a_input))  # (b, k, k, embed_dim)
+            e = torch.matmul(a_input, self.a).squeeze(3)  # (b, k, k, 1)
+
+        # Original GAT attention
+        else:
+            Wx = self.lin(x)  # (b, k, k, embed_dim)
+            a_input = self._make_attention_input(Wx)  # (b, k, k, 2*embed_dim)
+            e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3)  # (b, k, k, 1)
+
+        if self.use_bias:
+            e += self.bias
+
+        # Attention weights
+        attention = torch.softmax(e, dim=2)
+        attention = torch.dropout(attention, self.dropout, train=self.training)
+
+        # Computing new node features using the attention
+        h = self.sigmoid(torch.matmul(attention, x))
+
+        return h.permute(0, 2, 1)
+
+    def _make_attention_input(self, v):
+        """Preparing the feature attention mechanism.
+        Creating matrix with all possible combinations of concatenations of nodes.
+        Each node consists of all values of that node within the window
+            v1 || v1,
+            ...
+            v1 || vK,
+            v2 || v1,
+            ...
+            v2 || vK,
+            ...
+            ...
+            vK || v1,
+            ...
+            vK || vK,
+        """
+
+        K = self.num_nodes
+        blocks_repeating = v.repeat_interleave(K, dim=1)  # Left-side of the matrix
+        blocks_alternating = v.repeat(1, K, 1)  # Right-side of the matrix
+        combined = torch.cat((blocks_repeating, blocks_alternating), dim=2)  # (b, K*K, 2*window_size)
+
+        if self.use_gatv2:
+            return combined.view(v.size(0), K, K, 2 * self.window_size)
+        else:
+            return combined.view(v.size(0), K, K, 2 * self.embed_dim)
+
+
+class TemporalAttentionLayer(nn.Module):
+    """Single Graph Temporal Attention Layer
+    :param n_features: number of input features/nodes
+    :param window_size: length of the input sequence
+    :param dropout: percentage of nodes to dropout
+    :param alpha: negative slope used in the leaky relu activation function
+    :param embed_dim: embedding dimension (output dimension of linear transformation)
+    :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+    :param use_bias: whether to include a bias term in the attention layer
+    """
+
+    def __init__(self, n_features, window_size, dropout, alpha, embed_dim=None, use_gatv2=True, use_bias=True):
+        super(TemporalAttentionLayer, self).__init__()
+        self.n_features = n_features
+        self.window_size = window_size
+        self.dropout = dropout
+        self.use_gatv2 = use_gatv2
+        self.embed_dim = embed_dim if embed_dim is not None else n_features
+        self.num_nodes = window_size
+        self.use_bias = use_bias
+
+        # Because linear transformation is performed after concatenation in GATv2
+        if self.use_gatv2:
+            self.embed_dim *= 2
+            lin_input_dim = 2 * n_features
+            a_input_dim = self.embed_dim
+        else:
+            lin_input_dim = n_features
+            a_input_dim = 2 * self.embed_dim
+
+        self.lin = nn.Linear(lin_input_dim, self.embed_dim)
+        # self.a = nn.Parameter(torch.empty((a_input_dim, 1)))
+        self.a = nn.Parameter(torch.zeros((a_input_dim, 1)))
+        nn.init.xavier_uniform_(self.a.data, gain=1.414)
+
+        if self.use_bias:
+            # self.bias = nn.Parameter(torch.empty(window_size, window_size))
+            self.bias = nn.Parameter(torch.zeros(window_size, window_size))
+
+        self.leakyrelu = nn.LeakyReLU(alpha)
+        self.sigmoid = nn.Sigmoid()
+
+    def forward(self, x):
+        # x shape (b, n, k): b - batch size, n - window size, k - number of features
+        # For temporal attention a node is represented as all feature values at a specific timestamp
+
+        # 'Dynamic' GAT attention
+        # Proposed by Brody et al., 2021 (https://arxiv.org/pdf/2105.14491.pdf)
+        # Linear transformation applied after concatenation and attention layer applied after leakyrelu
+        if self.use_gatv2:
+            a_input = self._make_attention_input(x)  # (b, n, n, 2*n_features)
+            a_input = self.leakyrelu(self.lin(a_input))  # (b, n, n, embed_dim)
+            e = torch.matmul(a_input, self.a).squeeze(3)  # (b, n, n, 1)
+
+        # Original GAT attention
+        else:
+            Wx = self.lin(x)  # (b, n, n, embed_dim)
+            a_input = self._make_attention_input(Wx)  # (b, n, n, 2*embed_dim)
+            e = self.leakyrelu(torch.matmul(a_input, self.a)).squeeze(3)  # (b, n, n, 1)
+
+        if self.use_bias:
+            e += self.bias  # (b, n, n, 1)
+
+        # Attention weights
+        attention = torch.softmax(e, dim=2)
+        attention = torch.dropout(attention, self.dropout, train=self.training)
+
+        h = self.sigmoid(torch.matmul(attention, x))  # (b, n, k)
+
+        return h
+
+    def _make_attention_input(self, v):
+        """Preparing the temporal attention mechanism.
+        Creating matrix with all possible combinations of concatenations of node values:
+            (v1, v2..)_t1 || (v1, v2..)_t1
+            (v1, v2..)_t1 || (v1, v2..)_t2
+
+            ...
+            ...
+
+            (v1, v2..)_tn || (v1, v2..)_t1
+            (v1, v2..)_tn || (v1, v2..)_t2
+        """
+
+        K = self.num_nodes
+        blocks_repeating = v.repeat_interleave(K, dim=1)  # Left-side of the matrix
+        blocks_alternating = v.repeat(1, K, 1)  # Right-side of the matrix
+        combined = torch.cat((blocks_repeating, blocks_alternating), dim=2)
+
+        if self.use_gatv2:
+            return combined.view(v.size(0), K, K, 2 * self.n_features)
+        else:
+            return combined.view(v.size(0), K, K, 2 * self.embed_dim)
+
+
+class GRULayer(nn.Module):
+    """Gated Recurrent Unit (GRU) Layer
+    :param in_dim: number of input features
+    :param hid_dim: hidden size of the GRU
+    :param n_layers: number of layers in GRU
+    :param dropout: dropout rate
+    """
+
+    def __init__(self, in_dim, hid_dim, n_layers, dropout):
+        super(GRULayer, self).__init__()
+        self.hid_dim = hid_dim
+        self.n_layers = n_layers
+        self.dropout = 0.0 if n_layers == 1 else dropout
+        self.gru = nn.GRU(in_dim, hid_dim, num_layers=n_layers, batch_first=True, dropout=self.dropout)
+
+    def forward(self, x):
+        out, h = self.gru(x)
+        # The GRU is batch-first, so the last timestep is out[:, -1, :]; h[-1] extracts the last layer
+        out, h = out[:, -1, :], h[-1, :, :]
+        return out, h
+
+
+class RNNDecoder(nn.Module):
+    """GRU-based Decoder network that converts latent vector into output
+    :param in_dim: number of input features
+    :param n_layers: number of layers in RNN
+    :param hid_dim: hidden size of the RNN
+    :param dropout: dropout rate
+    """
+
+    def __init__(self, in_dim, hid_dim, n_layers, dropout):
+        super(RNNDecoder, self).__init__()
+        self.in_dim = in_dim
+        self.dropout = 0.0 if n_layers == 1 else dropout
+        self.rnn = nn.GRU(in_dim, hid_dim, n_layers, batch_first=True, dropout=self.dropout)
+
+    def forward(self, x):
+        decoder_out, _ = self.rnn(x)
+        return decoder_out
+
+
+class ReconstructionModel(nn.Module):
+    """Reconstruction Model
+    :param window_size: length of the input sequence
+    :param in_dim: number of input features
+    :param n_layers: number of layers in RNN
+    :param hid_dim: hidden size of the RNN
+    :param out_dim: number of output features
+    :param dropout: dropout rate
+    """
+
+    def __init__(self, window_size, in_dim, hid_dim, out_dim, n_layers, dropout):
+        super(ReconstructionModel, self).__init__()
+        self.window_size = window_size
+        self.decoder = RNNDecoder(in_dim, hid_dim, n_layers, dropout)
+        self.fc = nn.Linear(hid_dim, out_dim)
+
+    def forward(self, x):
+        # x will be last hidden state of the GRU layer
+        h_end = x
+        h_end_rep = h_end.repeat_interleave(self.window_size, dim=1).view(x.size(0), self.window_size, -1)
+
+        decoder_out = self.decoder(h_end_rep)
+        out = self.fc(decoder_out)
+        return out
+
+
+class Forecasting_Model(nn.Module):
+    """Forecasting model (fully-connected network)
+    :param in_dim: number of input features
+    :param hid_dim: hidden size of the FC network
+    :param out_dim: number of output features
+    :param n_layers: number of FC layers
+    :param dropout: dropout rate
+    """
+
+    def __init__(self, in_dim, hid_dim, out_dim, n_layers, dropout):
+        super(Forecasting_Model, self).__init__()
+        layers = [nn.Linear(in_dim, hid_dim)]
+        for _ in range(n_layers - 1):
+            layers.append(nn.Linear(hid_dim, hid_dim))
+
+        layers.append(nn.Linear(hid_dim, out_dim))
+
+        self.layers = nn.ModuleList(layers)
+        self.dropout = nn.Dropout(dropout)
+        self.relu = nn.ReLU()
+
+    def forward(self, x):
+        for i in range(len(self.layers) - 1):
+            x = self.relu(self.layers[i](x))
+            x = self.dropout(x)
+        return self.layers[-1](x)
+
+
+class Model(nn.Module):
+    """MTAD_GAT model class.
+
+    :param n_features: Number of input features
+    :param window_size: Length of the input sequence
+    :param out_dim: Number of features to output
+    :param kernel_size: size of kernel to use in the 1-D convolution
+    :param feat_gat_embed_dim: embedding dimension (output dimension of linear transformation)
+           in feat-oriented GAT layer
+    :param time_gat_embed_dim: embedding dimension (output dimension of linear transformation)
+           in time-oriented GAT layer
+    :param use_gatv2: whether to use the modified attention mechanism of GATv2 instead of standard GAT
+    :param gru_n_layers: number of layers in the GRU layer
+    :param gru_hid_dim: hidden dimension in the GRU layer
+    :param forecast_n_layers: number of layers in the FC-based Forecasting Model
+    :param forecast_hid_dim: hidden dimension in the FC-based Forecasting Model
+    :param recon_n_layers: number of layers in the GRU-based Reconstruction Model
+    :param recon_hid_dim: hidden dimension in the GRU-based Reconstruction Model
+    :param dropout: dropout rate
+    :param alpha: negative slope used in the leaky relu activation function
+    """
+
+    def __init__(self, customs: dict, dataloader: tud.DataLoader):
+        super(Model, self).__init__()
+        n_features = dataloader.dataset.train_inputs.shape[-1]
+        window_size = int(customs["input_size"])
+        out_dim = n_features
+        kernel_size = 7
+        feat_gat_embed_dim = None
+        time_gat_embed_dim = None
+        use_gatv2 = True
+        gru_n_layers = 1
+        gru_hid_dim = 150
+        forecast_n_layers = 1
+        forecast_hid_dim = 150
+        recon_n_layers = 1
+        recon_hid_dim = 150
+        dropout = 0.2
+        alpha = 0.2
+
+        self.name = "MtadGat"
+        self.conv = ConvLayer(n_features, kernel_size)
+        self.feature_gat = FeatureAttentionLayer(
+            n_features, window_size, dropout, alpha, feat_gat_embed_dim, use_gatv2)
+        self.temporal_gat = TemporalAttentionLayer(n_features, window_size, dropout, alpha, time_gat_embed_dim,
+                                                   use_gatv2)
+        self.gru = GRULayer(3 * n_features, gru_hid_dim, gru_n_layers, dropout)
+        self.forecasting_model = Forecasting_Model(
+            gru_hid_dim, forecast_hid_dim, out_dim, forecast_n_layers, dropout)
+        self.recon_model = ReconstructionModel(window_size, gru_hid_dim, recon_hid_dim, out_dim, recon_n_layers,
+                                               dropout)
+
+    def forward(self, x):
+        # x shape (b, n, k): b - batch size, n - window size, k - number of features
+
+        x = self.conv(x)
+        h_feat = self.feature_gat(x)
+        h_temp = self.temporal_gat(x)
+
+        h_cat = torch.cat([x, h_feat, h_temp], dim=2)  # (b, n, 3k)
+
+        _, h_end = self.gru(h_cat)
+        h_end = h_end.view(x.shape[0], -1)  # Hidden state for last timestamp
+
+        predictions = self.forecasting_model(h_end)
+        recons = self.recon_model(h_end)
+
+        return predictions, recons
+
+    def loss(self, x, y_true, epoch: int = None, device: str = "cpu"):
+        preds, recons = self.forward(x)
+        if preds.ndim == 3:
+            preds = preds.squeeze(1)
+        if y_true.ndim == 3:
+            y_true = y_true.squeeze(1)
+        forecast_criterion = nn.MSELoss()
+        recon_criterion = nn.MSELoss()
+        forecast_loss = torch.sqrt(forecast_criterion(y_true, preds))
+        recon_loss = torch.sqrt(recon_criterion(x, recons))
+        loss = forecast_loss + recon_loss
+        loss.backward()
+        return loss.item()
+
+    def detection(self, x, y_true, epoch: int = None, device: str = "cpu"):
+        preds, recons = self.forward(x)
+        score = F.pairwise_distance(recons.reshape(recons.size(0), -1), x.reshape(x.size(0), -1)) + \
+                F.pairwise_distance(y_true.reshape(y_true.size(0), -1), preds.reshape(preds.size(0), -1))
+        return score, None
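The file-level docstring above records the one local change made to the upstream code: torch.empty allocations were swapped for torch.zeros because the loss frequently became NaN. A plausible reading of the code (my inference; the docstring only reports the symptom) is that `self.a` was safe either way, since it is immediately re-initialized with Xavier, whereas `self.bias` gets no explicit initialization, so an empty allocation keeps arbitrary memory contents that can include extreme or non-finite values and then flow into `e += self.bias`. A minimal sketch of the two allocation patterns, with placeholder sizes:

```python
import torch
import torch.nn as nn

# Placeholder sizes for illustration only (not values taken from this repository).
a_input_dim, n_features = 64, 5

# Safe even with torch.empty: the tensor is overwritten right away by Xavier init.
a = nn.Parameter(torch.empty((a_input_dim, 1)))
nn.init.xavier_uniform_(a.data, gain=1.414)

# Risky with torch.empty: no explicit init follows, so the parameter keeps whatever
# bytes happened to be in memory, which can occasionally be huge or non-finite.
bias_unsafe = nn.Parameter(torch.empty(n_features, n_features))

# The commit's workaround: allocate zeros so the parameter starts small and finite.
bias_safe = nn.Parameter(torch.zeros(n_features, n_features))

print(a.shape, bias_unsafe.shape, bias_safe.shape)
```

Allocating the bias with torch.zeros (or giving it an explicit initializer) guarantees every parameter starts finite.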
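For reference, here is a minimal sketch of how this `Model` could be exercised end to end. Everything around the `method.MtadGat` import is an assumption made for illustration: `_ToyWindowDataset`, its `train_inputs` attribute, and the `customs` dict are hypothetical stand-ins for the project's real data pipeline, chosen only because the constructor reads `customs["input_size"]` and `dataloader.dataset.train_inputs.shape[-1]`.

```python
import torch
import torch.utils.data as tud

from method.MtadGat import Model  # assumes the repository root is on sys.path


class _ToyWindowDataset(tud.Dataset):
    """Hypothetical stand-in for the project's dataset class."""

    def __init__(self, series: torch.Tensor, window_size: int):
        self.train_inputs = series        # (T, k); attribute read by Model.__init__
        self.window_size = window_size

    def __len__(self):
        return self.train_inputs.shape[0] - self.window_size

    def __getitem__(self, i):
        x = self.train_inputs[i:i + self.window_size]   # (n, k) input window
        y = self.train_inputs[i + self.window_size]     # (k,) next-step target
        return x, y


if __name__ == "__main__":
    torch.manual_seed(0)
    series = torch.randn(1000, 5)                 # T=1000 timestamps, k=5 features
    customs = {"input_size": 100}                 # window size read by Model.__init__
    loader = tud.DataLoader(_ToyWindowDataset(series, customs["input_size"]),
                            batch_size=32, shuffle=True)

    model = Model(customs, loader)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

    x, y = next(iter(loader))                     # x: (b, n, k), y: (b, k)
    optimizer.zero_grad()
    loss_value = model.loss(x, y)                 # Model.loss() runs backward() itself
    optimizer.step()

    with torch.no_grad():
        scores, _ = model.detection(x, y)         # (b,) anomaly score per window
    print(loss_value, scores.shape)
```

Note that `Model.loss()` calls `loss.backward()` internally, so the surrounding loop only zeroes gradients and steps the optimizer.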
