Source code for flood_forecast.transformer_xl.transformer_basic
import torch
import math
from torch.nn.modules import Transformer, TransformerEncoder, TransformerEncoderLayer, LayerNorm
from flood_forecast.transformer_xl.masks import generate_square_subsequent_mask
from flood_forecast.meta_models.merging_model import MergingModel
from flood_forecast.transformer_xl.lower_upper_config import activation_dict
class SimpleTransformer(torch.nn.Module):
def __init__(
self,
number_time_series: int,
seq_length: int = 48,
output_seq_len: int = None,
d_model: int = 128,
n_heads: int = 8,
dropout=0.1,
forward_dim=2048,
sigmoid=False):
"""A full transformer model.
:param number_time_series: The total number of time series present
(e.g. n_feature_time_series + n_targets)
:type number_time_series: int
:param seq_length: The length of your input sequence, defaults to 48
:type seq_length: int, optional
:param output_seq_len: The length of your output sequence, defaults
to None
:type output_seq_len: int, optional
:param d_model: The dimensions of your model, defaults to 128
:type d_model: int, optional
:param n_heads: The number of heads in each encoder/decoder block,
defaults to 8
:type n_heads: int, optional
:param dropout: The fraction of dropout you wish to apply during
training, defaults to 0.1 (currently not functional)
:type dropout: float, optional
:param forward_dim: Currently not functional, defaults to 2048
:type forward_dim: int, optional
:param sigmoid: Whether to apply a sigmoid activation to the final
layer (useful for binary classification), defaults to False
:type sigmoid: bool, optional
"""
super().__init__()
if output_seq_len is None:
output_seq_len = seq_length
self.out_seq_len = output_seq_len
self.mask = generate_square_subsequent_mask(seq_length)
self.dense_shape = torch.nn.Linear(number_time_series, d_model)
self.pe = SimplePositionalEncoding(d_model)
self.transformer = Transformer(d_model, nhead=n_heads)
self.final_layer = torch.nn.Linear(d_model, 1)
self.sequence_size = seq_length
self.tgt_mask = generate_square_subsequent_mask(output_seq_len)
self.sigmoid = None
if sigmoid:
self.sigmoid = torch.nn.Sigmoid()
    def forward(self, x: torch.Tensor, t: torch.Tensor, tgt_mask=None, src_mask=None):
        # Encode every source time-step except the last, then decode against t.
        x = self.encode_sequence(x[:, :-1, :], src_mask)
        return self.decode_seq(x, t, tgt_mask)
    def basic_feature(self, x: torch.Tensor):
        x = self.dense_shape(x)  # project number_time_series -> d_model
        x = self.pe(x)
        x = x.permute(1, 0, 2)  # (seq_len, batch_size, d_model)
        return x
def encode_sequence(self, x, src_mask=None):
x = self.basic_feature(x)
x = self.transformer.encoder(x, src_mask)
return x
def decode_seq(self, mem, t, tgt_mask=None, view_number=None) -> torch.Tensor:
if view_number is None:
view_number = self.out_seq_len
if tgt_mask is None:
tgt_mask = self.tgt_mask
t = self.basic_feature(t)
x = self.transformer.decoder(t, mem, tgt_mask=tgt_mask)
x = self.final_layer(x)
if self.sigmoid:
x = self.sigmoid(x)
return x.view(-1, view_number)
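
# A minimal usage sketch (editor's illustration, not part of the original
# module; the batch size, horizon, and series count below are assumptions).
def _example_simple_transformer() -> torch.Tensor:
    # Three series (e.g. two features plus one target), 48 steps of history,
    # and a 10-step forecast horizon.
    model = SimpleTransformer(number_time_series=3, seq_length=48, output_seq_len=10)
    src = torch.rand(4, 48, 3)  # (batch_size, seq_length, number_time_series)
    tgt = torch.rand(4, 10, 3)  # (batch_size, output_seq_len, number_time_series)
    # forward() encodes src (minus its final step) and decodes against tgt,
    # returning a tensor viewed as (batch_size, output_seq_len).
    return model(src, tgt)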
class CustomTransformerDecoder(torch.nn.Module):
def __init__(
self,
seq_length: int,
output_seq_length: int,
n_time_series: int,
d_model=128,
output_dim=1,
n_layers_encoder=6,
forward_dim=2048,
dropout=0.1,
use_mask=False,
meta_data=None,
final_act=None,
squashed_embedding=False,
n_heads=8):
"""Uses a number of encoder layers with simple linear decoder layer.
:param seq_length: The number of historical time-steps fed into the model in each forward pass.
:type seq_length: int
:param output_seq_length: The number of forecasted time-steps outputted by the model.
:type output_seq_length: int
:param n_time_series: The total number of time series present (targets + features)
:type n_time_series: int
        :param d_model: The embedding dimension of the model, defaults to 128
:type d_model: int, optional
:param output_dim: The output dimension (should correspond to n_targets), defaults to 1
:type output_dim: int, optional
:param n_layers_encoder: The number of encoder layers, defaults to 6
:type n_layers_encoder: int, optional
:param forward_dim: The forward embedding dim, defaults to 2048
:type forward_dim: int, optional
:param dropout: How much dropout to use, defaults to 0.1
:type dropout: float, optional
        :param use_mask: Whether to use a subsequent (causal) sequence mask during training, defaults to False
:type use_mask: bool, optional
        :param meta_data: A dict with ``method`` and ``params`` keys describing how to merge static meta-data (see MergingModel), defaults to None
        :type meta_data: dict, optional
:param final_act: Whether to use a final activation function, defaults to None
:type final_act: str, optional
        :param squashed_embedding: Whether to squash the temporal embedding into a single 1-D embedding, defaults to False
:type squashed_embedding: bool, optional
        :param n_heads: The number of attention heads in each encoder layer, defaults to 8
:type n_heads: int, optional
"""
super().__init__()
self.dense_shape = torch.nn.Linear(n_time_series, d_model)
self.pe = SimplePositionalEncoding(d_model)
        encoder_layer = TransformerEncoderLayer(d_model, n_heads, forward_dim, dropout)
encoder_norm = LayerNorm(d_model)
self.transformer_enc = TransformerEncoder(encoder_layer, n_layers_encoder, encoder_norm)
self.output_dim_layer = torch.nn.Linear(d_model, output_dim)
self.output_seq_length = output_seq_length
self.out_length_lay = torch.nn.Linear(seq_length, output_seq_length)
self.mask = generate_square_subsequent_mask(seq_length)
self.out_dim = output_dim
self.mask_it = use_mask
self.final_act = None
self.squashed = None
if final_act:
self.final_act = activation_dict[final_act]
if meta_data:
self.meta_merger = MergingModel(meta_data["method"], meta_data["params"])
if squashed_embedding:
self.squashed = torch.nn.Linear(seq_length, 1)
self.unsquashed = torch.nn.Linear(1, seq_length)
def make_embedding(self, x: torch.Tensor):
x = self.dense_shape(x)
x = self.pe(x)
# (L, B, N)
x = x.permute(1, 0, 2)
if self.mask_it:
x = self.transformer_enc(x, self.mask)
else:
# Allow no mask
x = self.transformer_enc(x)
if self.squashed:
x = x.permute(1, 2, 0)
x = self.squashed(x)
return x
def __squashed__embedding(self, x: torch.Tensor):
x = x.permute(1, 2, 0) # (B, N, L)
x = self.squashed(x)
x = self.unsquashed(x)
x = x.permute(0, 2, 1) # (B, L, N)
x = x.permute(1, 0, 2) # (L, B, N)
return x
def forward(self, x: torch.Tensor, meta_data=None) -> torch.Tensor:
"""Performs forward pass on tensor of (batch_size, sequence_length, n_time_series) Return tensor of dim
(batch_size, output_seq_length)"""
x = self.dense_shape(x)
        if isinstance(meta_data, torch.Tensor):
# batch_size = x.shape[0]
# meta_data = meta_data.repeat(batch_size, 1).unsqueeze(2)
# x = x.permute(0, 2, 1).contiguous()
x = self.meta_merger(x, meta_data)
# x = x.permute(0, 2, 1)
x = self.pe(x)
# (L, B, N)
x = x.permute(1, 0, 2)
if self.mask_it:
x = self.transformer_enc(x, self.mask)
else:
# Allow no mask
x = self.transformer_enc(x)
if self.squashed:
x = self.__squashed__embedding(x)
x = self.output_dim_layer(x)
# (B, N, L)
x = x.permute(1, 2, 0)
x = self.out_length_lay(x)
if self.final_act:
x = self.final_act(x)
if self.out_dim > 1:
return x.permute(0, 2, 1)
return x.view(-1, self.output_seq_length)
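
# A minimal usage sketch (editor's illustration; shapes are assumptions).
def _example_custom_transformer_decoder() -> torch.Tensor:
    # 20 historical steps of 3 series, forecasting 5 steps of a single target.
    model = CustomTransformerDecoder(seq_length=20, output_seq_length=5, n_time_series=3)
    x = torch.rand(4, 20, 3)  # (batch_size, seq_length, n_time_series)
    # With the default output_dim=1 the result has shape (batch_size, output_seq_length).
    return model(x)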
class SimplePositionalEncoding(torch.nn.Module):
    def __init__(self, d_model: int, dropout=0.1, max_len=5000):
        """The standard sinusoidal positional encoding from "Attention Is All You Need".

        :param d_model: The embedding dimension the encoding is added to
        :param dropout: The dropout applied after adding the encoding, defaults to 0.1
        :param max_len: The maximum supported sequence length, defaults to 5000
        """
        super().__init__()
self.dropout = torch.nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x: torch.Tensor) -> torch.Tensor:
"""Creates a basic positional encoding."""
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
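
# A small shape check for SimplePositionalEncoding (editor's illustration).
# The encoding is added along the first dimension of the input tensor.
def _example_positional_encoding() -> torch.Tensor:
    pe = SimplePositionalEncoding(d_model=128)
    x = torch.zeros(48, 4, 128)  # (seq_len, batch_size, d_model)
    return pe(x)  # same shape, with sinusoidal offsets added along dim 0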
def greedy_decode(
model,
src: torch.Tensor,
max_len: int,
real_target: torch.Tensor,
unsqueeze_dim=1,
output_len=1,
device='cpu',
multi_targets=1,
probabilistic=False,
scaler=None):
"""Mechanism to sequentially decode the model.
:src The Historical time series values
:real_target The real values (they should be masked), however if you want can include known real values.
:returns torch.Tensor
"""
src = src.float()
real_target = real_target.float()
    src_mask = model.mask if hasattr(model, "mask") else None
memory = model.encode_sequence(src, src_mask)
# Get last element of src array to forecast from
ys = src[:, -1, :].unsqueeze(unsqueeze_dim)
for i in range(max_len):
mask = generate_square_subsequent_mask(i + 1).to(device)
with torch.no_grad():
            out = model.decode_seq(memory, ys, mask, i + 1)
real_target[:, i, 0] = out[:, i]
src = torch.cat((src, real_target[:, i, :].unsqueeze(1)), 1)
ys = torch.cat((ys, real_target[:, i, :].unsqueeze(1)), 1)
memory = model.encode_sequence(src[:, i + 1:, :], src_mask)
return ys[:, 1:, :]
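
# A minimal usage sketch for greedy_decode with the SimpleTransformer above
# (editor's illustration; shapes are assumptions).
def _example_greedy_decode() -> torch.Tensor:
    model = SimpleTransformer(number_time_series=3, seq_length=48, output_seq_len=10)
    model.eval()
    src = torch.rand(4, 48, 3)  # (batch_size, seq_length, number_time_series)
    placeholder = torch.zeros(4, 10, 3)  # masked "real" targets, filled in-place
    # Decodes one step at a time, feeding each prediction back into the source
    # window; returns a (4, 10, 3) tensor of the generated steps.
    return greedy_decode(model, src, max_len=10, real_target=placeholder)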