Source code for flood_forecast.preprocessing.pytorch_loaders

from torch.utils.data import Dataset
import numpy as np
import pandas as pd
import torch
from typing import List, Union
from datetime import datetime
from flood_forecast.preprocessing.interpolate_preprocess import (
    interpolate_missing_values,
    fix_timezones,
)


class CSVDataLoader(Dataset):
    def __init__(
        self,
        file_path: str,
        forecast_history: int,
        forecast_length: int,
        target_col: List,
        relevant_cols: List,
        scaling=None,
        start_stamp: int = 0,
        end_stamp: int = None,
        interpolate_param=True,
    ):
        """
        A data loader that takes a CSV file and properly batches it for use in
        training/evaluating a PyTorch model.

        :param file_path: The path to the CSV file you wish to use.
        :param forecast_history: The length of the historical time series data
            you wish to utilize for forecasting.
        :param forecast_length: The number of time steps to forecast ahead
            (for the transformer this must equal forecast_history).
        :param relevant_cols: Supply the column names you wish to use in the
            forecast (others will not be used).
        :param target_col: The target column or columns you wish to predict.
            If you only have one, still use a list, e.g. ['cfs'].
        :param scaling: (highly recommended) If provided, should be a subclass
            of sklearn.base.BaseEstimator and sklearn.base.TransformerMixin
            (i.e. StandardScaler, MaxAbsScaler, MinMaxScaler, etc.). Note that
            without a scaler the loss is likely to explode, causing infinite
            loss values that will corrupt the weights.
        :param int start_stamp: Optional. Supply if you only want to use part
            of a CSV for training, validation, or testing.
        :param int end_stamp: Optional. Supply if you only want to use part of
            a CSV for training, validation, or testing.
        """
        super().__init__()
        self.forecast_history = forecast_history
        self.forecast_length = forecast_length
        # TODO allow other filling methods
        if interpolate_param:
            print("Now filling missing values")
            df = fix_timezones(file_path)
            df = interpolate_missing_values(df)
        else:
            df = pd.read_csv(file_path)
        print("Now loading and scaling " + file_path)
        self.df = df.sort_values(by="datetime")[relevant_cols]
        self.scale = None
        if start_stamp != 0 and end_stamp is not None:
            self.df = self.df[start_stamp:end_stamp]
        elif start_stamp != 0:
            self.df = self.df[start_stamp:]
        elif end_stamp is not None:
            self.df = self.df[:end_stamp]
        if scaling is not None:
            self.scale = scaling
            temp_df = self.scale.fit_transform(self.df)
            # We define a second scaler to scale the final output back to the
            # original units, as models will not necessarily predict the other
            # present time series values.
            targ_scale_class = self.scale.__class__
            self.targ_scaler = targ_scale_class()
            self.targ_scaler.fit_transform(
                self.df[target_col[0]].values.reshape(-1, 1)
            )
            self.df = pd.DataFrame(
                temp_df, index=self.df.index, columns=self.df.columns
            )
        if (len(self.df) - self.df.count()).max() != 0:
            # A bare string cannot be raised; use a proper exception type.
            raise ValueError(
                "NaN values detected in the data. Please run interpolate,"
                " ffill, or bfill on the data."
            )
        self.targ_col = target_col

    def __getitem__(self, idx):
        rows = self.df.iloc[idx: self.forecast_history + idx]
        targs_idx_start = self.forecast_history + idx
        targ_rows = self.df.iloc[
            targs_idx_start: self.forecast_length + targs_idx_start
        ]
        src_data = torch.from_numpy(rows.to_numpy()).float()
        trg_data = torch.from_numpy(targ_rows.to_numpy()).float()
        return src_data, trg_data

    def __len__(self) -> int:
        return (
            len(self.df.index) - self.forecast_history - self.forecast_length - 1
        )
    def inverse_scale(
        self,
        result_data: Union[torch.Tensor, pd.Series, pd.DataFrame, np.ndarray],
    ) -> torch.Tensor:
        """Un-scales model output back to the original units of the target
        column using the dedicated target scaler."""
        if isinstance(result_data, torch.Tensor):
            result_data_np = result_data.numpy()
        elif isinstance(result_data, (pd.Series, pd.DataFrame)):
            result_data_np = result_data.values
        elif isinstance(result_data, np.ndarray):
            result_data_np = result_data
        return torch.from_numpy(
            self.targ_scaler.inverse_transform(result_data_np)
        )
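
# A minimal usage sketch (an illustration, not part of the original module):
# the CSV path, column names, and window sizes below are hypothetical and must
# match your own data, which needs a "datetime" column plus the relevant
# columns listed. Any sklearn scaler works; StandardScaler is shown here.
def _example_csv_data_loader():
    from sklearn.preprocessing import StandardScaler

    loader = CSVDataLoader(
        file_path="data/example_flow.csv",  # hypothetical CSV path
        forecast_history=20,   # feed 20 historical steps to the model
        forecast_length=1,     # predict a single step ahead
        target_col=["cfs"],    # the target column, always passed as a list
        relevant_cols=["cfs", "precip", "temp"],  # hypothetical columns
        scaling=StandardScaler(),
    )
    src, trg = loader[0]  # src: (20, 3) float tensor, trg: (1, 3) float tensor
    # Map a scaled target column back to its original units
    return loader.inverse_scale(trg[:, 0].reshape(-1, 1))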
class CSVTestLoader(CSVDataLoader):
    def __init__(
        self,
        df_path: str,
        forecast_total: int,
        use_real_precip=True,
        use_real_temp=True,
        target_supplied=True,
        interpolate=False,
        **kwargs
    ):
        """
        A data loader for the test data.

        :param str df_path: The path to the CSV file containing the test data.
        :param int forecast_total: The total number of time steps to forecast.
        """
        super().__init__(**kwargs)
        self.original_df = pd.read_csv(df_path)
        if interpolate:
            self.original_df = fix_timezones(df_path)
            self.original_df = interpolate_missing_values(self.original_df)
        print("CSV path below")
        print(df_path)
        self.forecast_total = forecast_total
        self.use_real_temp = use_real_temp
        self.use_real_precip = use_real_precip
        self.target_supplied = target_supplied
        # Convert the timestamp column back to datetime and save the original index
        self.original_df["datetime"] = self.original_df["datetime"].astype(
            "datetime64[ns]"
        )
        self.original_df["original_index"] = self.original_df.index
    def get_from_start_date(self, forecast_start: datetime):
        dt_row = self.original_df[
            self.original_df["datetime"] == forecast_start
        ]
        revised_index = dt_row.index[0]
        return self.__getitem__(revised_index - self.forecast_history)
    def __getitem__(self, idx):
        if self.target_supplied:
            historical_rows = self.df.iloc[idx: self.forecast_history + idx]
            target_idx_start = self.forecast_history + idx
            # Why aren't we using these?
            # targ_rows = self.df.iloc[
            #     target_idx_start: self.forecast_total + target_idx_start
            # ]
            all_rows_orig = self.original_df.iloc[
                idx: self.forecast_total + target_idx_start
            ]
            historical_rows = torch.from_numpy(historical_rows.to_numpy())
            return historical_rows.float(), all_rows_orig, target_idx_start
    def convert_real_batches(self, the_col: str, rows_to_convert):
        """
        A helper function that returns precipitation and temperature values
        divided into forecast_length-sized chunks, ready to be stacked with
        the forecasted cfs.
        """
        the_column = torch.from_numpy(rows_to_convert[the_col].to_numpy())
        chunks = [
            the_column[
                self.forecast_length * i: self.forecast_length * (i + 1)
            ]
            for i in range(len(the_column) // self.forecast_length + 1)
        ]
        return chunks
    def convert_history_batches(
        self, the_col: Union[str, List[str]], rows_to_convert: pd.DataFrame
    ):
        """A helper function that returns a dataframe in batches of size
        (history_len, num_features).

        Args:
            the_col (str): the column name(s) to convert
            rows_to_convert (pd.DataFrame): the rows of the dataframe to be
                converted into batches
        """
        the_column = torch.from_numpy(rows_to_convert[the_col].to_numpy())
        chunks = [
            the_column[
                self.forecast_history * i: self.forecast_history * (i + 1)
            ]
            for i in range(len(the_column) // self.forecast_history + 1)
        ]
        return chunks
    def __len__(self) -> int:
        return (
            len(self.df.index) - self.forecast_history - self.forecast_total - 1
        )
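
# A minimal usage sketch for the test loader (an illustration, not part of the
# original module). CSVTestLoader forwards **kwargs to CSVDataLoader, so the
# parent's arguments travel alongside the test-specific ones; every path and
# column name below is hypothetical.
def _example_csv_test_loader():
    test_loader = CSVTestLoader(
        df_path="data/example_flow_test.csv",  # hypothetical test CSV
        forecast_total=336,  # e.g. two weeks of hourly steps
        # kwargs consumed by the parent CSVDataLoader:
        file_path="data/example_flow_test.csv",
        forecast_history=20,
        forecast_length=1,
        target_col=["cfs"],
        relevant_cols=["cfs", "precip", "temp"],
    )
    # Fetch the window whose forecast period begins at a given timestamp
    hist, all_rows_orig, target_idx_start = test_loader.get_from_start_date(
        pd.Timestamp("2020-01-15 00:00:00")
    )
    # Split the real precipitation readings into forecast_length-sized chunks
    precip_chunks = test_loader.convert_real_batches("precip", all_rows_orig)
    return hist, precip_chunks, target_idx_start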