Source code for flood_forecast.evaluator

from datetime import datetime
from typing import Callable, Dict, List, Tuple, Type

import numpy as np
import pandas as pd
import sklearn.metrics
import torch

from flood_forecast.explain_model_output import (
    deep_explain_model_heatmap,
    deep_explain_model_summary_plot,
)
from flood_forecast.model_dict_function import decoding_functions
from flood_forecast.preprocessing.pytorch_loaders import CSVTestLoader
from flood_forecast.time_model import TimeSeriesModel
from flood_forecast.utils import flatten_list_function


def stream_baseline(
    river_flow_df: pd.DataFrame, forecast_column: str, hours_forecast=336
) -> Tuple[pd.DataFrame, float]:
    """
    Compute the baseline MSE of predicting the training-period median
    for every step of the test period.
    """
    total_length = len(river_flow_df.index)
    train_river_data = river_flow_df[: total_length - hours_forecast]
    test_river_data = river_flow_df[total_length - hours_forecast:]
    median_value = train_river_data[[forecast_column]].median()[0]
    test_river_data["predicted_baseline"] = median_value
    mse_baseline = sklearn.metrics.mean_squared_error(
        test_river_data[forecast_column], test_river_data["predicted_baseline"]
    )
    print(mse_baseline)
    return test_river_data, round(mse_baseline, ndigits=3)

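# Illustrative usage sketch (not part of the library): calling stream_baseline
# on a synthetic river-flow DataFrame. The column name "cfs" and the 48-hour
# horizon are assumptions chosen for this example.
def _demo_stream_baseline() -> None:
    rng = np.random.default_rng(0)
    flow_df = pd.DataFrame({"cfs": rng.normal(100.0, 10.0, size=500)})
    # The last 48 rows become the test period; every test step is predicted
    # with the training median.
    test_df, baseline_mse = stream_baseline(flow_df, "cfs", hours_forecast=48)
    print(test_df[["cfs", "predicted_baseline"]].head())
    print(baseline_mse)
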
def plot_r2(river_flow_preds: pd.DataFrame) -> float:
    """
    Assumes river_flow_preds already has ``predicted_baseline`` and
    ``predicted_model`` columns. Not yet implemented.
    """
    pass

def get_model_r2_score(
    river_flow_df: pd.DataFrame,
    model_evaluate_function: Callable,
    forecast_column: str,
    hours_forecast=336,
):
    """
    model_evaluate_function should call any necessary preprocessing.
    """
    test_river_data, baseline_mse = stream_baseline(
        river_flow_df, forecast_column, hours_forecast
    )
    # NOTE: the model-side evaluation via model_evaluate_function is not yet
    # implemented in this source; only the baseline is computed.

def get_r2_value(model_mse, baseline_mse):
    return 1 - model_mse / baseline_mse

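# Sanity check for get_r2_value: with a model MSE of 0.12 and a baseline MSE
# of 0.48 (values chosen only for illustration), the skill score is
# 1 - 0.12 / 0.48 = 0.75, i.e. the model removes 75% of the baseline error.
#
#     >>> get_r2_value(0.12, 0.48)
#     0.75
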
def get_value(the_path: str) -> None:
    df = pd.read_csv(the_path)
    res = stream_baseline(df, "cfs", 336)
    print(get_r2_value(0.120, res[1]))

def metric_dict(metric: str) -> Callable:
    dic = {"MSE": torch.nn.MSELoss(), "L1": torch.nn.L1Loss()}
    return dic[metric]

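# Illustrative sketch: looking up a loss function by name and applying it to
# toy tensors. The values are arbitrary.
def _demo_metric_dict() -> None:
    criterion = metric_dict("MSE")  # a torch.nn.MSELoss instance
    preds = torch.tensor([1.0, 2.0, 3.0])
    target = torch.tensor([1.0, 2.0, 4.0])
    # Mean squared error: (0 + 0 + 1) / 3 = 0.3333...
    print(criterion(preds, target).item())
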
def evaluate_model(
    model: Type[TimeSeriesModel],
    model_type: str,
    target_col: List[str],
    evaluation_metrics: List,
    inference_params: Dict,
    eval_log: Dict,
) -> Tuple[Dict, pd.DataFrame, int, pd.DataFrame]:
    """
    A function to evaluate a model. Requires a model of type TimeSeriesModel.
    """
    if model_type == "PyTorch":
        (
            df_train_and_test,
            end_tensor,
            forecast_history,
            forecast_start_idx,
            test_data,
            df_predictions,
        ) = infer_on_torch_model(model, **inference_params)
        # Un-scale the test data if a scaler was applied during preprocessing
        if test_data.scale:
            print("Un-transforming data")
            end_tensor = test_data.inverse_scale(end_tensor.detach().reshape(-1, 1))
            end_tensor_list = flatten_list_function(end_tensor.numpy().tolist())
            history_length = model.params["dataset_params"]["forecast_history"]
            df_train_and_test["preds"][history_length:] = end_tensor_list
            end_tensor = end_tensor.squeeze(1)
            df_predictions = pd.DataFrame(
                test_data.inverse_scale(df_predictions).numpy(),
                index=df_predictions.index,
            )
        print("Current historical dataframe")
        print(df_train_and_test)
    for evaluation_metric in evaluation_metrics:
        for target in target_col:
            evaluation_metric_function = metric_dict(evaluation_metric)
            s = evaluation_metric_function(
                torch.from_numpy(
                    df_train_and_test[target][forecast_history:].to_numpy()
                ),
                end_tensor,
            )
            eval_log[target + "_" + evaluation_metric] = s
    # Explain model behaviour using SHAP
    deep_explain_model_summary_plot(
        model, test_data, inference_params["datetime_start"]
    )
    deep_explain_model_heatmap(model, test_data, inference_params["datetime_start"])
    return eval_log, df_train_and_test, forecast_start_idx, df_predictions

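# Illustrative sketch (not part of the library): one way evaluate_model might
# be invoked. The target column "cfs" and the inference_params values are
# assumptions for the example, not library defaults.
def _demo_evaluate_model(trained_model: TimeSeriesModel) -> None:
    inference_params = {
        "datetime_start": datetime(2018, 9, 22, 0),
        "hours_to_forecast": 336,
    }
    eval_log, df_train_and_test, forecast_start_idx, df_preds = evaluate_model(
        trained_model, "PyTorch", ["cfs"], ["MSE", "L1"], inference_params, {}
    )
    # eval_log keys are "<target>_<metric>", e.g. "cfs_MSE" and "cfs_L1"
    print(eval_log["cfs_MSE"])
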
def infer_on_torch_model(
    model,
    test_csv_path: str = None,
    datetime_start: datetime = datetime(2018, 9, 22, 0),
    hours_to_forecast: int = 336,
    decoder_params=None,
    dataset_params: Dict = {},
    num_prediction_samples: int = None,
) -> Tuple[pd.DataFrame, torch.Tensor, int, int, CSVTestLoader, pd.DataFrame]:
    """
    Function to handle both test evaluation and inference on a test dataframe.

    :returns:
        df: dataframe including training and test data
        end_tensor: the final tensor after the model has finished predictions
        history_length: num rows to use in training
        forecast_start_idx: row index to start forecasting
        test_data: CSVTestLoader instance
        df_prediction_samples: has the same index as df and num columns equal
            to num_prediction_samples (or no columns if num_prediction_samples
            is None)
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    if isinstance(datetime_start, str):
        datetime_start = datetime.strptime(datetime_start, "%Y-%m-%d")
    history_length = model.params["dataset_params"]["forecast_history"]
    forecast_length = model.params["dataset_params"]["forecast_length"]
    # If no test dataframe is supplied, use the default one from the params
    if test_csv_path is None:
        csv_test_loader = model.test_data
    else:
        csv_test_loader = CSVTestLoader(
            test_csv_path,
            hours_to_forecast,
            **dataset_params,
            interpolate=dataset_params["interpolate_param"]
        )
    model.model.eval()
    (
        history,
        df_train_and_test,
        forecast_start_idx,
    ) = csv_test_loader.get_from_start_date(datetime_start)
    end_tensor = generate_predictions(
        model,
        df_train_and_test,
        csv_test_loader,
        history,
        device,
        forecast_start_idx,
        forecast_length,
        hours_to_forecast,
        decoder_params,
    )
    df_train_and_test["preds"] = 0
    df_train_and_test["preds"][history_length:] = end_tensor.numpy().tolist()
    print(end_tensor.shape)
    df_prediction_samples = pd.DataFrame(index=df_train_and_test.index)
    if num_prediction_samples is not None:
        # Switch to train mode so dropout layers stay active (Monte Carlo dropout)
        model.model.train()
        assert num_prediction_samples > 1
        prediction_samples = generate_prediction_samples(
            model,
            df_train_and_test,
            csv_test_loader,
            history,
            device,
            forecast_start_idx,
            forecast_length,
            hours_to_forecast,
            decoder_params,
            num_prediction_samples,
        )
        df_prediction_samples = pd.DataFrame(
            index=df_train_and_test.index,
            columns=list(range(num_prediction_samples)),
            dtype="float",
        )
        df_prediction_samples.iloc[history_length:] = prediction_samples
    return (
        df_train_and_test,
        end_tensor,
        history_length,
        forecast_start_idx,
        csv_test_loader,
        df_prediction_samples,
    )

def generate_predictions(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history: torch.Tensor,
    device: torch.device,
    forecast_start_idx: int,
    forecast_length: int,
    hours_to_forecast: int,
    decoder_params: Dict,
) -> torch.Tensor:
    # Add a batch dimension: (seq_len, n_features) -> (1, seq_len, n_features)
    history_dim = history.unsqueeze(0).to(model.device)
    print(history_dim.shape)
    if decoder_params is None:
        end_tensor = generate_predictions_non_decoded(
            model, df, test_data, history_dim, forecast_length, hours_to_forecast,
        )
    else:
        # Decoder-based models (e.g. greedy decoding) are dispatched through
        # decoding_functions in generate_decoded_predictions below.
        end_tensor = generate_decoded_predictions(
            model,
            test_data,
            forecast_start_idx,
            device,
            history_dim,
            hours_to_forecast,
            decoder_params,
        )
    return end_tensor

def generate_predictions_non_decoded(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history_dim: torch.Tensor,
    forecast_length: int,
    hours_to_forecast: int,
) -> torch.Tensor:
    full_history = [history_dim]
    all_tensor = []
    if test_data.use_real_precip:
        precip_cols = test_data.convert_real_batches("precip", df[forecast_length:])
    if test_data.use_real_temp:
        temp_cols = test_data.convert_real_batches("temp", df[forecast_length:])
    for i in range(0, int(np.ceil(hours_to_forecast / forecast_length).item())):
        output = model.model(full_history[i].to(model.device))
        all_tensor.append(output.view(-1))
        if i == int(np.ceil(hours_to_forecast / forecast_length).item()) - 1:
            break
        rel_cols = model.params["dataset_params"]["relevant_cols"]
        if test_data.use_real_precip and test_data.use_real_temp:
            # The stack order [output, precip, temp] must match the order of
            # relevant_cols; if relevant_cols is ordered differently (e.g.
            # [precip, temp, output]) the columns below will be mislabeled.
            initial_numpy = (
                torch.stack(
                    [
                        output.view(-1).float().to(model.device),
                        precip_cols[i].float().to(model.device),
                        temp_cols[i].float().to(model.device),
                    ]
                )
                .to("cpu")
                .detach()
                .numpy()
            )
            temp_df = pd.DataFrame(initial_numpy.T, columns=rel_cols)
            revised_np = temp_df[rel_cols].to_numpy()
            full_history.append(
                torch.from_numpy(revised_np).to(model.device).unsqueeze(0)
            )
    remainder = forecast_length - hours_to_forecast % forecast_length
    if remainder != forecast_length:
        # Trim the overshoot from the final batch of predictions
        end_tensor = torch.cat(all_tensor, axis=0).to("cpu").detach()[:-remainder]
    else:
        end_tensor = torch.cat(all_tensor, axis=0).to("cpu").detach()
    print(end_tensor.shape)
    return end_tensor

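# Worked example of the remainder trim at the end of
# generate_predictions_non_decoded: with hours_to_forecast=336 and
# forecast_length=20, the loop runs ceil(336 / 20) = 17 times and emits
# 17 * 20 = 340 predictions; remainder = 20 - 336 % 20 = 4, so the last 4
# predictions are dropped to leave exactly 336. When forecast_length divides
# hours_to_forecast evenly, remainder equals forecast_length and nothing is
# trimmed.
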
def generate_decoded_predictions(
    model: Type[TimeSeriesModel],
    test_data: CSVTestLoader,
    forecast_start_idx: int,
    device: torch.device,
    history_dim: torch.Tensor,
    hours_to_forecast: int,
    decoder_params: Dict,
) -> torch.Tensor:
    real_target_tensor = (
        torch.from_numpy(test_data.df[forecast_start_idx:].to_numpy())
        .to(device)
        .unsqueeze(0)
        .to(model.device)
    )
    end_tensor = decoding_functions[decoder_params["decoder_function"]](
        model.model,
        history_dim,
        hours_to_forecast,
        real_target_tensor,
        decoder_params["unsqueeze_dim"],
        device=model.device,
    )
    end_tensor = end_tensor[:, :, 0].view(-1).to("cpu").detach()
    return end_tensor

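# Illustrative sketch of a decoder_params dict as consumed by
# generate_decoded_predictions above. The keys are taken from the code; the
# "greedy_decode" name is an assumption about how the decoding function is
# registered in flood_forecast.model_dict_function.decoding_functions.
#
#     decoder_params = {
#         "decoder_function": "greedy_decode",
#         "unsqueeze_dim": 1,
#     }
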
def generate_prediction_samples(
    model: Type[TimeSeriesModel],
    df: pd.DataFrame,
    test_data: CSVTestLoader,
    history: torch.Tensor,
    device: torch.device,
    forecast_start_idx: int,
    forecast_length: int,
    hours_to_forecast: int,
    decoder_params: Dict,
    num_prediction_samples: int,
) -> np.ndarray:
    pred_samples = []
    for _ in range(num_prediction_samples):
        end_tensor = generate_predictions(
            model,
            df,
            test_data,
            history,
            device,
            forecast_start_idx,
            forecast_length,
            hours_to_forecast,
            decoder_params,
        )
        pred_samples.append(end_tensor.numpy())
    return np.array(pred_samples).T  # each column is one full set of predictions

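# Illustrative sketch (not part of the library): summarizing the Monte Carlo
# dropout samples from generate_prediction_samples as an uncertainty band.
# The quantile levels are arbitrary choices for the example.
def _demo_uncertainty_band(pred_samples: np.ndarray) -> pd.DataFrame:
    # pred_samples has shape (forecast_steps, num_prediction_samples)
    return pd.DataFrame(
        {
            "lower": np.quantile(pred_samples, 0.05, axis=1),
            "median": np.quantile(pred_samples, 0.50, axis=1),
            "upper": np.quantile(pred_samples, 0.95, axis=1),
        }
    )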