Source code for flood_forecast.evaluator

from datetime import datetime
from typing import Callable, Dict, List, Tuple, Type, Union

import numpy as np
import pandas as pd
import sklearn.metrics
import torch

from flood_forecast.explain_model_output import (
    deep_explain_model_heatmap,
    deep_explain_model_summary_plot,
)
from flood_forecast.model_dict_function import decoding_functions
from flood_forecast.custom.custom_opt import MASELoss, GaussianLoss
from flood_forecast.preprocessing.pytorch_loaders import CSVTestLoader, TemporalTestLoader
from flood_forecast.time_model import TimeSeriesModel
from flood_forecast.utils import flatten_list_function
from flood_forecast.temporal_decoding import decoding_function


def stream_baseline(
    river_flow_df: pd.DataFrame, forecast_column: str, hours_forecast=336
) -> Tuple[pd.DataFrame, float]:
    """
    Compute a naive baseline forecast and its MSE on the held-out tail of the data.

    The last ``hours_forecast`` rows form the test split and every row before them
    the train split. The baseline prediction is the median of ``forecast_column``
    over the train split, repeated for every row of the test split.

    :param river_flow_df: Data-frame holding the train rows followed by the test rows
    :type river_flow_df: pd.DataFrame
    :param forecast_column: Name of the column that is being forecast
    :type forecast_column: str
    :param hours_forecast: Number of trailing rows held out as test data, defaults to 336
    :type hours_forecast: int, optional
    :return: The test split with an added ``predicted_baseline`` column, and the
        baseline MSE rounded to three decimal places
    :rtype: Tuple[pd.DataFrame, float]
    """
    total_length = len(river_flow_df.index)
    split_idx = total_length - hours_forecast
    train_river_data = river_flow_df[:split_idx]
    # Work on a copy: adding a column to a slice of the caller's frame raises
    # SettingWithCopyWarning and makes the result depend on view-vs-copy semantics.
    test_river_data = river_flow_df[split_idx:].copy()
    # Select the column as a Series so ``median`` yields a scalar directly; the former
    # ``[[col]].median()[0]`` relied on positional ``Series[int]`` lookup, which is
    # deprecated for label-indexed Series in pandas 2.x.
    baseline_value = train_river_data[forecast_column].median()
    test_river_data["predicted_baseline"] = baseline_value
    mse_baseline = sklearn.metrics.mean_squared_error(
        test_river_data[forecast_column], test_river_data["predicted_baseline"]
    )
    return test_river_data, round(mse_baseline, ndigits=3)
def plot_r2(river_flow_preds: pd.DataFrame) -> float:
    """
    Placeholder for plotting the R2 of the model predictions.

    Not implemented yet: the body does nothing and the call evaluates to ``None``.

    :param river_flow_preds: Assumed to already contain a ``predicted_baseline`` and a
        ``predicted_model`` column
    :type river_flow_preds: pd.DataFrame
    """
    return None
def get_model_r2_score(
    river_flow_df: pd.DataFrame,
    model_evaluate_function: Callable,
    forecast_column: str,
    hours_forecast=336,
):
    """
    Compute the baseline needed for an R2 score of a model (unfinished).

    Only the baseline is computed at the moment: ``model_evaluate_function`` is not
    called and nothing is returned.

    :param river_flow_df: Data-frame holding the train rows followed by the test rows
    :type river_flow_df: pd.DataFrame
    :param model_evaluate_function: Should call any necessary preprocessing (currently unused)
    :type model_evaluate_function: Callable
    :param forecast_column: Name of the column that is being forecast
    :type forecast_column: str
    :param hours_forecast: Number of trailing rows held out as test data, defaults to 336
    :type hours_forecast: int, optional
    """
    # Forward hours_forecast so the baseline is computed on the split the caller asked
    # for; it used to be dropped, silently falling back to stream_baseline's default.
    test_river_data, baseline_mse = stream_baseline(
        river_flow_df, forecast_column, hours_forecast
    )
def get_r2_value(model_mse, baseline_mse):
    """
    Return ``1 - model_mse / baseline_mse``.

    :param model_mse: MSE of the model predictions
    :param baseline_mse: MSE of the baseline predictions (used as the divisor)
    :return: One minus the ratio of the model MSE to the baseline MSE
    """
    error_ratio = model_mse / baseline_mse
    return 1 - error_ratio
def get_value(the_path: str) -> None:
    """
    Print the R2 value of a fixed model MSE (0.120) against the baseline of a CSV file.

    The CSV at ``the_path`` is loaded, the baseline MSE is computed for its ``cfs``
    column with a 336 row test split, and the resulting R2 value is printed.

    :param the_path: Path of the CSV file to read
    :type the_path: str
    """
    river_flow_df = pd.read_csv(the_path)
    _, baseline_mse = stream_baseline(river_flow_df, "cfs", 336)
    print(get_r2_value(0.120, baseline_mse))
def evaluate_model(
    model: Type[TimeSeriesModel],
    model_type: str,
    target_col: List[str],
    evaluation_metrics: List,
    inference_params: Dict,
    eval_log: Dict,
) -> Tuple[Dict, pd.DataFrame, int, pd.DataFrame]:
    """
    A function to evaluate a model. Called automatically at end of training.
    Can be imported for continuing to evaluate a model in other places as well.

    Runs ``infer_on_torch_model``, optionally un-scales the predictions, writes them
    into the returned data-frame and scores them with every criterion in ``model.crit``.

    .. highlight:: python
    .. code-block:: python

        from flood_forecast.evaluator import evaluate_model
        forecast_model = PyTorchForecast(config_file)
        e_log, df_train_test, f_idx, df_preds = evaluate_model(
            forecast_model, "PyTorch", ["cfs"], ["MSE", "MAPE"], inference_params, {})
        print(e_log) # {"MSE":0.2, "MAPE":0.1}
        print(df_train_test) # ...

    :param model: The wrapped model; ``model.params`` and ``model.crit`` are read here
    :type model: Type[TimeSeriesModel]
    :param model_type: Inference only runs when this equals ``"PyTorch"``
    :type model_type: str
    :param target_col: Names of the target columns that are scored
    :type target_col: List[str]
    :param evaluation_metrics: Not read by this function; the criteria come from ``model.crit``
    :type evaluation_metrics: List
    :param inference_params: Unpacked as keyword arguments into ``infer_on_torch_model``;
        the presence of a ``"probabilistic"`` key switches to the probabilistic handling and
        ``"datetime_start"`` is read for the explainability plots
    :type inference_params: Dict
    :param eval_log: Updated in place with one ``<target>_<CriterionClassName>`` entry per
        target and criterion
    :type eval_log: Dict
    :return: ``(eval_log, df_train_and_test, forecast_start_idx, df_predictions)``
    :rtype: Tuple[Dict, pd.DataFrame, int, pd.DataFrame]
    """
    # NOTE(review): every local used after this branch is only bound inside it, so any
    # other model_type fails further down with an unbound-name error.
    if model_type == "PyTorch":
        (
            df_train_and_test,
            end_tensor,
            forecast_history,
            forecast_start_idx,
            test_data,
            df_predictions,
            # df_prediction_samples_std_dev,
        ) = infer_on_torch_model(model, **inference_params)
        # To-do turn this into a general function
        g_loss = False
        probablistic = True if "probabilistic" in inference_params else False
        # A tuple result without the probabilistic flag is split in two; the second part is
        # only used below as the second argument of GaussianLoss (presumably mean / std-dev).
        if isinstance(end_tensor, tuple) and not probablistic:
            end_tensor_0 = end_tensor[1]
            end_tensor = end_tensor[0]
            g_loss = True
        print("transform end tens preform")
        if test_data.scale:
            print("Un-transforming data")
            if probablistic:
                print('probabilistic running on infer_on_torch_model')
                # Only the first element is un-scaled into end_tensor_mean; end_tensor itself
                # is left as returned and is what the probabilistic metric below receives.
                end_tensor_mean = test_data.inverse_scale(end_tensor[0].detach().reshape(-1, 1))
                end_tensor_list = flatten_list_function(end_tensor_mean.numpy().tolist())
                end_tensor_mean = end_tensor_mean.squeeze(1)
            else:
                if "n_targets" in model.params:
                    end_tensor = test_data.inverse_scale(end_tensor.detach())
                else:
                    end_tensor = test_data.inverse_scale(end_tensor.detach().reshape(-1, 1))
                end_tensor_list = flatten_list_function(end_tensor.numpy().tolist())
                end_tensor = end_tensor.squeeze(1)  # Removing extra dim from reshape?
            history_length = model.params["dataset_params"]["forecast_history"]
            # Write the un-scaled predictions into the rows after the history window.
            if "n_targets" in model.params:
                df_train_and_test.loc[df_train_and_test.index[history_length:], "preds"] = end_tensor[:, 0].numpy().tolist()
                for i, target in enumerate(target_col):
                    df_train_and_test["pred_" + target] = 0
                    df_train_and_test.loc[df_train_and_test.index[history_length:], "pred_" + target] = end_tensor[:, i].numpy().tolist()
            else:
                df_train_and_test.loc[df_train_and_test.index[history_length:], "preds"] = end_tensor_list
                df_train_and_test["pred_" + target_col[0]] = 0
                df_train_and_test.loc[df_train_and_test.index[history_length:], "pred_" + target_col[0]] = end_tensor_list
        print("Current historical dataframe ")
        print(df_train_and_test)
    # Score every target with every criterion attached to the model.
    for evaluation_metric in model.crit:
        idx = 0
        for target in target_col:
            # Labels are the target values after the first forecast_history rows.
            labels = torch.from_numpy(df_train_and_test[target][forecast_history:].to_numpy())
            evaluation_metric_function = evaluation_metric
            if "probabilistic" in inference_params:
                s = evaluation_metric_function(
                    torch.distributions.Normal(end_tensor[0], end_tensor[1][0]),
                    labels,
                )
            elif isinstance(evaluation_metric_function, MASELoss):
                # MASELoss additionally receives the first forecast_history target values.
                s = evaluation_metric_function(
                    labels,
                    end_tensor,
                    torch.from_numpy(
                        df_train_and_test[target][:forecast_history].to_numpy()
                    )
                )
            elif g_loss:
                g = GaussianLoss(end_tensor.unsqueeze(1), end_tensor_0.unsqueeze(1))
                s = g(labels.unsqueeze(1))
            else:
                if "n_targets" in model.params:
                    # idx selects the prediction column that belongs to the current target.
                    s = evaluation_metric_function(
                        labels,
                        end_tensor[:, idx],
                    )
                else:
                    s = evaluation_metric_function(
                        labels,
                        end_tensor,
                    )
            idx += 1
            eval_log[target + "_" + evaluation_metric.__class__.__name__] = s

    # Explain model behaviour using shap
    if "probabilistic" in inference_params:
        print("Probabilistic explainability currently not supported.")
    elif "n_targets" in model.params:
        print("Multitask forecasting support coming soon")
    elif g_loss:
        print("SHAP not yet supported for these models with multiple outputs")
    else:
        deep_explain_model_summary_plot(
            model, test_data, inference_params["datetime_start"]
        )
        deep_explain_model_heatmap(model, test_data, inference_params["datetime_start"])

    return eval_log, df_train_and_test, forecast_start_idx, df_predictions
def infer_on_torch_model(
    model,
    test_csv_path: str = None,
    datetime_start: datetime = datetime(2018, 9, 22, 0),
    hours_to_forecast: int = 336,
    decoder_params=None,
    dataset_params: Dict = None,
    num_prediction_samples: int = None,
    probabilistic: bool = False,
    criterion_params: Dict = None
) -> (pd.DataFrame, torch.Tensor, int, int, CSVTestLoader, List[pd.DataFrame]):
    """
    Function to handle both test evaluation and inference on a test data-frame.

    :param model: The wrapped model; ``model.params``, ``model.model`` and (when no CSV path is
        given) ``model.test_data`` are read
    :param test_csv_path: Path of the test CSV; when ``None`` the loader in ``model.test_data`` is used
    :type test_csv_path: str, optional
    :param datetime_start: Date the forecast starts from; a string is parsed with ``"%Y-%m-%d"``
    :type datetime_start: datetime, optional
    :param hours_to_forecast: Number of time steps to forecast, defaults to 336
    :type hours_to_forecast: int, optional
    :param decoder_params: Parameters of the decoder function, ``None`` for models without a decoder
    :param dataset_params: Extra keyword arguments for the test loader, defaults to an empty dict
    :type dataset_params: Dict, optional
    :param num_prediction_samples: When given, that many additional predictions are drawn with the
        model switched to train mode
    :type num_prediction_samples: int, optional
    :param probabilistic: Not read in this function
    :type probabilistic: bool, optional
    :param criterion_params: Not read in this function
    :type criterion_params: Dict, optional
    :return:
        df: df including training and test data
        end_tensor: the final tensor after the model has finished predictions
        history_length: num rows to use in training
        forecast_start_idx: row index to start forecasting
        test_data: CSVTestLoader instance
        df_prediction_arr: list of data-frames built by ``handle_ci_multi`` from the prediction
        samples, or an empty list if num_prediction_samples is None
    :rtype: tuple()
    """
    # A fresh dict per call: the former ``{}`` default was one shared object that is handed
    # to the loader (as "kwargs") and could therefore leak state between calls.
    if dataset_params is None:
        dataset_params = {}
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if isinstance(datetime_start, str):
        datetime_start = datetime.strptime(datetime_start, "%Y-%m-%d")
    multi_params = 1
    if "n_targets" in model.params:
        multi_params = model.params["n_targets"]
    print("This model is currently forecasting for : " + str(multi_params) + " targets")
    history_length = model.params["dataset_params"]["forecast_history"]
    forecast_length = model.params["dataset_params"]["forecast_length"]
    sort_column2 = None
    is_temporal = model.params["dataset_params"]["class"] == "TemporalLoader"
    # If the test dataframe is none use default one supplied in params
    if test_csv_path is None:
        csv_test_loader = model.test_data
    elif is_temporal:
        input_dict = {
            "df_path": test_csv_path,
            "forecast_total": hours_to_forecast,
            "kwargs": dataset_params
        }
        test_idx = None
        if "label_len" in model.params["model_params"]:
            test_idx = model.params["model_params"]["label_len"] - model.params["dataset_params"]["forecast_length"]
        csv_test_loader = TemporalTestLoader(model.params["dataset_params"]["temporal_feats"], input_dict, test_idx)
    else:
        # NOTE(review): this path requires an "interpolate_param" key in dataset_params.
        csv_test_loader = CSVTestLoader(
            test_csv_path,
            hours_to_forecast,
            **dataset_params,
            sort_column_clone=sort_column2,
            interpolate=dataset_params["interpolate_param"]
        )
    model.model.eval()
    targ = False
    if is_temporal:
        history, targ, df_train_and_test, forecast_start_idx = csv_test_loader.get_from_start_date(datetime_start)
    else:
        (
            history,
            df_train_and_test,
            forecast_start_idx,
        ) = csv_test_loader.get_from_start_date(datetime_start)
    end_tensor = generate_predictions(
        model,
        df_train_and_test,
        csv_test_loader,
        history,
        device,
        forecast_start_idx,
        forecast_length,
        hours_to_forecast,
        decoder_params,
        multi_params=multi_params,
        targs=targ
    )
    df_train_and_test["preds"] = 0
    forecast_rows = df_train_and_test.index[history_length:]
    if decoder_params is not None:
        if "probabilistic" in decoder_params:
            df_train_and_test.loc[forecast_rows, "preds"] = end_tensor[0].numpy().tolist()
            df_train_and_test["std_dev"] = 0
            print('end_tensor[1][0].numpy().tolist()', end_tensor[1][0].numpy().tolist())
            try:
                df_train_and_test.loc[forecast_rows, "std_dev"] = end_tensor[1][0].numpy().tolist()
            except Exception as e:
                # Fallback for nested values: keep the first entry of each row.
                df_train_and_test.loc[forecast_rows, "std_dev"] = [x[0] for x in end_tensor[1][0].numpy().tolist()]
                print(e)
    else:
        df_train_and_test.loc[forecast_rows, "preds"] = end_tensor.numpy().tolist()
    df_prediction_arr = []
    df_prediction_samples = pd.DataFrame(index=df_train_and_test.index)
    # df_prediction_samples_std_dev = pd.DataFrame(index=df_train_and_test.index)
    if num_prediction_samples is not None:
        # NOTE(review): the model is left in train mode when this function returns.
        model.model.train()  # sets mode to train so the dropout layers will be touched
        assert num_prediction_samples > 0
        prediction_samples = generate_prediction_samples(
            model,
            df_train_and_test,
            csv_test_loader,
            history,
            device,
            forecast_start_idx,
            forecast_length,
            hours_to_forecast,
            decoder_params,
            num_prediction_samples,
            multi_params=multi_params,
            targs=targ
        )
        df_prediction_samples = pd.DataFrame(
            index=df_train_and_test.index,
            columns=list(range(num_prediction_samples)),
            dtype="float",
        )
        # Use the count that actually sized prediction_samples instead of re-reading it from
        # model.params["inference_params"], which may be missing or disagree with the argument.
        df_prediction_arr = handle_ci_multi(prediction_samples, csv_test_loader, multi_params,
                                            df_prediction_samples, decoder_params, history_length,
                                            num_prediction_samples)
    return (
        df_train_and_test,
        end_tensor,
        history_length,
        forecast_start_idx,
        csv_test_loader,
        df_prediction_arr,
        # df_prediction_samples_std_dev
    )
def handle_ci_multi(prediction_samples: torch.Tensor, csv_test_loader: CSVTestLoader, multi_params: int,
                    df_pred, decoder_param: bool, history_length: int, num_samples: int) -> List[pd.DataFrame]:
    """Un-scale the sampled predictions and write them into per-target data-frames.

    :param prediction_samples: The sampled predictions. With ``"probabilistic"`` in ``decoder_param``
        only element ``0`` is used. For more than one target it is indexed as a 4-D array whose first
        axis is iterated over targets and whose last axis is iterated over samples.
    :type prediction_samples: torch.Tensor
    :param csv_test_loader: Loader whose ``inverse_scale`` is applied to the samples
    :type csv_test_loader: CSVTestLoader
    :param multi_params: Number of targets
    :type multi_params: int
    :param df_pred: Data-frame whose rows from ``history_length`` onward are overwritten with the samples
    :type df_pred: pd.DataFrame
    :param decoder_param: Annotated as ``bool`` but used as the decoder parameters: compared with
        ``None`` and tested for a ``"probabilistic"`` key. When ``None`` the samples are written
        without calling ``inverse_scale``.
    :type decoder_param: bool
    :param history_length: Row position of the first forecast row in ``df_pred``
    :type history_length: int
    :param num_samples: Number of samples iterated over in the multi-target case
    :type num_samples: int
    :raises ValueError: If two consecutive targets hold identical samples
    :raises ValueError: If no data-frame was produced
    :return: Returns an array with different CI predictions
    :rtype: List[pd.DataFrame]
    """
    df_prediction_arr = []
    if decoder_param is not None:
        if "probabilistic" in decoder_param:
            prediction_samples = prediction_samples[0]
        if multi_params == 1:
            print(type(prediction_samples))
            predict = csv_test_loader.inverse_scale(prediction_samples).numpy()
            prediction_samples = predict
            df_pred.iloc[history_length:] = prediction_samples
            df_prediction_arr.append(df_pred)
        else:
            print(prediction_samples.shape)
            # Un-scale one sample at a time, writing the result back in place.
            for i in range(0, num_samples):
                tra = prediction_samples[:, :, 0, i]
                prediction_samples[:, :, 0, i] = csv_test_loader.inverse_scale(tra.transpose(1, 0)).transpose(1, 0)
                if i > 0:
                    # Identical consecutive samples only produce a warning.
                    if np.equal(tra, prediction_samples[:, :, 0, i - 1]).all():
                        print("WARNING model values are the same. Try varying dropout or other mechanism")
            for i in range(0, multi_params):
                if i > 0:
                    if np.equal(prediction_samples[i, :, 0, :], prediction_samples[i - 1, :, 0, :]).all():
                        raise ValueError("Something is wrong data for the targets is equal")
                df_pred.iloc[history_length:] = prediction_samples[i, :, 0, :]
                # df_pred is reused for every target, so each target stores its own copy.
                df_prediction_arr.append(df_pred.copy())
    else:
        df_pred.iloc[history_length:] = prediction_samples
        df_prediction_arr.append(df_pred)
    if len(df_prediction_arr) < 1:
        raise ValueError("Error length of the prediction array must be one or greater")
    return df_prediction_arr
def generate_predictions(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history: torch.Tensor,
    device: torch.device,
    forecast_start_idx: int,
    forecast_length: int,
    hours_to_forecast: int,
    decoder_params: Dict,
    targs=False,
    multi_params: int = 1
) -> torch.Tensor:
    """Generate the model prediction, with or without a decoder function.

    :param model: A PyTorchForecast
    :type model: Type[TimeSeriesModel]
    :param df: The main dataframe containing data
    :type df: pd.DataFrame
    :param test_data: The test data loader
    :type test_data: CSVTestLoader
    :param history: The forecast historical data
    :type history: torch.Tensor
    :param device: The device usually cpu or cuda
    :type device: torch.device
    :param forecast_start_idx: The index you want the forecast to begin
    :type forecast_start_idx: int
    :param forecast_length: The length of the forecast the model outputs per time step
    :type forecast_length: int
    :param hours_to_forecast: The number of time_steps to forecast in future
    :type hours_to_forecast: int
    :param decoder_params: The parameters the decoder function takes; ``None`` selects the
        non-decoded path
    :type decoder_params: Dict
    :param targs: When truthy, ``history`` is passed on unchanged instead of being given a
        leading dimension and moved to ``model.device``
    :param multi_params: n_targets, defaults to 1
    :type multi_params: int, optional
    :return: The forecasted tensor
    :rtype: torch.Tensor
    """
    history_dim = history if targs else history.unsqueeze(0).to(model.device)
    print("Add debugging crap below")
    if decoder_params is not None:
        return generate_decoded_predictions(
            model,
            test_data,
            forecast_start_idx,
            device,
            history_dim,
            hours_to_forecast,
            decoder_params,
            multi_targets=multi_params,
            targs=targs
        )
    return generate_predictions_non_decoded(
        model, df, test_data, history_dim, forecast_length, hours_to_forecast,
    )
def generate_predictions_non_decoded(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history_dim: torch.Tensor,
    forecast_length: int,
    hours_to_forecast: int,
) -> torch.Tensor:
    """Generates predictions for the models that do not use a decoder

    The model is called ``ceil(hours_to_forecast / forecast_length)`` times. Each output is
    flattened and collected, and the concatenation is trimmed to ``hours_to_forecast`` values.

    :param model: A PyTorchForecast; ``model.model``, ``model.device`` and ``model.params`` are used
    :type model: Type[TimeSeriesModel]
    :param df: Data-frame whose rows from ``forecast_length`` onward are handed to
        ``test_data.convert_real_batches``
    :type df: pd.DataFrame
    :param test_data: The test loader; its ``use_real_precip`` / ``use_real_temp`` flags are read
    :type test_data: CSVTestLoader
    :param history_dim: The input of the first model call
    :type history_dim: torch.Tensor
    :param forecast_length: Number of values the model outputs per call
    :type forecast_length: int
    :param hours_to_forecast: Total number of values to forecast
    :type hours_to_forecast: int
    :return: The flattened predictions as a detached CPU tensor
    :rtype: torch.Tensor
    """
    full_history = [history_dim]
    all_tensor = []
    if test_data.use_real_precip:
        precip_cols = test_data.convert_real_batches("precip", df[forecast_length:])
    if test_data.use_real_temp:
        temp_cols = test_data.convert_real_batches("temp", df[forecast_length:])
    for i in range(0, int(np.ceil(hours_to_forecast / forecast_length).item())):
        output = model.model(full_history[i].to(model.device))
        all_tensor.append(output.view(-1))
        # No further input has to be built after the last step.
        if i == int(np.ceil(hours_to_forecast / forecast_length).item()) - 1:
            break
        rel_cols = model.params["dataset_params"]["relevant_cols"]
        if test_data.use_real_precip and test_data.use_real_temp:
            # Order here should match order of original tensor... But what is the best way todo that...?
            # Hmm right now this will create a bug if for some reason the order [precip, temp, output]
            intial_numpy = (
                torch.stack(
                    [
                        output.view(-1).float().to(model.device),
                        precip_cols[i].float().to(model.device),
                        temp_cols[i].float().to(model.device),
                    ]
                )
                .to("cpu")
                .detach()
                .numpy()
            )
            temp_df = pd.DataFrame(intial_numpy.T, columns=rel_cols)
        # NOTE(review): temp_df is only bound when both real-value flags are set; with either
        # flag off a multi-step forecast cannot build its next input here - confirm intent.
        revised_np = temp_df[rel_cols].to_numpy()
        full_history.append(
            torch.from_numpy(revised_np).to(model.device).unsqueeze(0)
        )
    # Number of surplus values produced by the last (partial) step.
    remainder = forecast_length - hours_to_forecast % forecast_length
    if remainder != forecast_length:
        # Subtract remainder from array
        end_tensor = torch.cat(all_tensor, axis=0).to("cpu").detach()[:-remainder]
    else:
        end_tensor = torch.cat(all_tensor, axis=0).to("cpu").detach()

    print(end_tensor.shape)  # Dimension now is (n_time_steps_to_forecast_steps)!! i.e [16]
    return end_tensor
def generate_decoded_predictions(
    model: Type[TimeSeriesModel],
    test_data: CSVTestLoader,
    forecast_start_idx: int,
    device: torch.device,
    history_dim: torch.Tensor,
    hours_to_forecast: int,
    decoder_params: Dict,
    multi_targets: int = 1,
    targs: Union[bool, torch.Tensor] = False
) -> torch.Tensor:
    """Generate predictions for the models that forecast through a decoding function.

    :param model: A PyTorchForecast; ``model.model``, ``model.device`` and ``model.params`` are used
    :type model: Type[TimeSeriesModel]
    :param test_data: The test loader; its ``df`` and ``no_scale`` attributes are read
    :type test_data: CSVTestLoader
    :param forecast_start_idx: Row of ``test_data.df`` where the real target values start
    :type forecast_start_idx: int
    :param device: The device usually cpu or cuda
    :type device: torch.device
    :param history_dim: The history handed to the decoding function
    :type history_dim: torch.Tensor
    :param hours_to_forecast: The number of time steps to forecast
    :type hours_to_forecast: int
    :param decoder_params: Decoder settings; ``"decoder_function"`` and ``"unsqueeze_dim"`` are read
        when ``targs`` is falsy and a ``"probabilistic"`` key enables the probabilistic output
    :type decoder_params: Dict
    :param multi_targets: Number of targets, defaults to 1
    :type multi_targets: int, optional
    :param targs: When truthy, ``decoding_function`` is called with elements of ``history_dim``
        and ``targs`` instead of the function named in ``decoder_params``
    :type targs: Union[bool, torch.Tensor], optional
    :return: The detached CPU predictions; a 2-tuple when probabilistic or when the decoding
        function itself returned a tuple
    :rtype: torch.Tensor
    """

    def first_feature_flat(tensor):
        # Keep feature 0 of the last axis and flatten onto the CPU.
        return tensor[:, :, 0].view(-1).to("cpu").detach()

    scaler = test_data if test_data.no_scale else None
    probabilistic = decoder_params is not None and "probabilistic" in decoder_params

    target_values = test_data.df[forecast_start_idx:].to_numpy()
    real_target_tensor = torch.from_numpy(target_values).to(device).unsqueeze(0).to(model.device)

    if not targs:
        decode = decoding_functions[decoder_params["decoder_function"]]
        end_tensor = decode(
            model.model,
            history_dim,
            hours_to_forecast,
            real_target_tensor,
            decoder_params["unsqueeze_dim"],
            output_len=model.params["dataset_params"]["forecast_length"],
            multi_targets=multi_targets,
            device=model.device,
            probabilistic=probabilistic,
            scaler=scaler
        )
    else:
        src0 = history_dim[0]
        decoder_seq_len = model.params["model_params"]["label_len"]
        end_tensor = decoding_function(
            model.model,
            src0,
            targs[1],
            model.params["dataset_params"]["forecast_length"],
            history_dim[1],
            targs[0],
            1,
            decoder_seq_len,
            hours_to_forecast,
            device,
        )

    if probabilistic:
        return first_feature_flat(end_tensor[0]), end_tensor[1]
    if isinstance(end_tensor, tuple):
        return first_feature_flat(end_tensor[0]), first_feature_flat(end_tensor[1])
    if multi_targets == 1:
        end_tensor = end_tensor[:, :, 0].view(-1)
    return end_tensor.to("cpu").detach()
def generate_prediction_samples(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history: torch.Tensor,
    device: torch.device,
    forecast_start_idx: int,
    forecast_length: int,
    hours_to_forecast: int,
    decoder_params: Dict,
    num_prediction_samples: int,
    multi_params=1,
    targs=False
) -> np.ndarray:
    """Call ``generate_predictions`` repeatedly and stack the results.

    All arguments other than ``num_prediction_samples`` are forwarded unchanged to
    ``generate_predictions``.

    :param num_prediction_samples: Number of times ``generate_predictions`` is called
    :type num_prediction_samples: int
    :return: The transposed array of the collected predictions. With ``"probabilistic"`` in
        ``decoder_params`` a 2-tuple is returned instead, built from elements ``0`` and ``1``
        of every prediction.
    :rtype: np.ndarray
    """
    probabilistic = decoder_params is not None and "probabilistic" in decoder_params
    first_parts, second_parts = [], []
    for _ in range(num_prediction_samples):
        sample = generate_predictions(
            model,
            df,
            test_data,
            history,
            device,
            forecast_start_idx,
            forecast_length,
            hours_to_forecast,
            decoder_params,
            multi_params=multi_params,
            targs=targs
        )
        if not probabilistic:
            first_parts.append(sample.numpy())
            continue
        first_parts.append(sample[0].numpy())
        second_parts.append(sample[1].numpy())
    # Transposing turns every collected prediction into one column.
    stacked = np.array(first_parts).T
    if probabilistic:
        return stacked, np.array(second_parts).T
    return stacked