Source code for flood_forecast.preprocessing.buil_dataset

import os
from flood_forecast.preprocessing.closest_station import get_weather_data, process_asos_data
from flood_forecast.preprocessing.process_usgs import make_usgs_data, process_intermediate_csv
from flood_forecast.gcp_integration.basic_utils import get_storage_client, upload_file
from flood_forecast.preprocessing.eco_gage_set import eco_gage_set
import json
from datetime import datetime
import pytz
import pandas as pd


def build_weather_csv(json_full_path, asos_base_url, base_url_2, econet_data,
                      visited_gages_path, start=0, end_index=100):
    directory = os.fsencode(json_full_path)
    sorted_list = sorted(os.listdir(directory))
    for i in range(start, end_index):
        file = sorted_list[i]
        filename = os.fsdecode(file)
        get_weather_data(
            os.path.join(json_full_path, filename),
            econet_data,
            asos_base_url,
            visited_gages_path)
        process_asos_data(os.path.join(json_full_path, filename), base_url_2, visited_gages_path)
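
# A hedged usage sketch for build_weather_csv. Every path and URL below is a
# hypothetical placeholder, not a value from this repository; the function is
# assumed to be pointed at a directory of per-gage JSON metadata files.
#
# build_weather_csv(
#     json_full_path="gage_meta_json/",              # hypothetical metadata dir
#     asos_base_url="https://mesonet.example/asos",  # hypothetical ASOS endpoint
#     base_url_2="https://mesonet.example/asos2",    # hypothetical second endpoint
#     econet_data=get_eco_netset("econet_csvs/"),    # station ids, see below
#     visited_gages_path="visited_gages.json",       # produced by create_visited()
#     start=0,
#     end_index=10)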
# todo fix this function so it does more than open files
# def make_usgs(meta_data_path: str, start, end_index: int):
#     meta_directory = os.fsencode(meta_data_path)
#     sorted_list = sorted(os.listdir(meta_directory))
#     for i in range(start, end_index):
#         with open(sorted_list[i]) as d:
#             data = json.load(d)  # json.load, not json.loads: d is a file object
#             make_usgs_data(datetime(2014, 1, 1), datetime(2019, 1, 1), data["gage_id"])
def join_data(weather_csv, meta_json_file, flow_csv):
    pass
def create_visited():
    visited_gages = {"stations_visited": {}, "saved_complete": {}}
    with open("visited_gages.json", "w+") as f:
        json.dump(visited_gages, f)
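
# For reference, create_visited() writes this skeleton tracking file to the
# current working directory, presumably so the weather-download helpers that
# take visited_gages_path can record progress in it:
#
# {"stations_visited": {}, "saved_complete": {}}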
def get_eco_netset(directory_path: str) -> set:
    """
    Econet data was supplied to us by the NC State Climate Office. They gave us
    a directory of CSV files in the following format:
    ``LastName_First_station_id_Hourly.txt``. This code simply constructs a set
    of stations based on what is in the folder.
    """
    directory = os.fsencode(directory_path)
    print(sorted(os.listdir(directory)))
    for file in sorted(os.listdir(directory)):
        filename = os.fsdecode(file)
        try:
            eco_gage_set.add(filename.split("c_")[1].split("_H")[0])
        except BaseException:
            print(filename)
    return eco_gage_set
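
# The parsing above assumes the literal substring "c_" appears immediately
# before the station id and "_H" (from "Hourly") immediately after it; names
# that do not match raise IndexError, which is caught and printed. A made-up
# filename fitting that assumption:
#
# >>> "Mc_31902_Hourly.txt".split("c_")[1].split("_H")[0]
# '31902'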
def combine_data(flow_df: pd.DataFrame, precip_df: pd.DataFrame):
    tz = pytz.timezone("UTC")
    # Parse the precipitation timestamps and localize them to UTC so they can
    # be merged against the timezone-aware flow timestamps.
    precip_df['hour_updated'] = precip_df['hour_updated'].map(
        lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S"))
    precip_df['hour_updated'] = precip_df['hour_updated'].map(lambda x: tz.localize(x))
    # Outer-join on the timestamp columns and drop the first and last four
    # rows of the result.
    joined_df = precip_df.merge(
        flow_df, left_on='hour_updated', right_on='datetime', how='outer')[4:-4]
    nan_precip = sum(pd.isnull(joined_df['p01m']))
    nan_flow = sum(pd.isnull(joined_df['cfs']))
    return joined_df, nan_flow, nan_precip
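
# A minimal sketch of calling combine_data on toy frames. The column names
# ('hour_updated'/'p01m' for precipitation, 'datetime'/'cfs' for flow) mirror
# what the function expects; the values are fabricated. Note the [4:-4] trim
# above means frames this small merge to an empty result, so real inputs need
# more than eight overlapping rows.
#
# precip = pd.DataFrame({
#     "hour_updated": ["2014-01-01 00:00:00", "2014-01-01 01:00:00"],
#     "p01m": [0.0, 2.5]})
# flow = pd.DataFrame({
#     "datetime": pd.to_datetime(["2014-01-01 00:00:00"], utc=True),
#     "cfs": [13.2]})
# joined, nan_flow, nan_precip = combine_data(flow, precip)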
def create_usgs(meta_data_dir: str, precip_path: str, start: int, end: int):
    gage_list = sorted(os.listdir(meta_data_dir))
    exceptions = {}
    client = get_storage_client()
    for i in range(start, end):
        try:
            file_name = gage_list[i]
            gage_id = file_name.split("stations")[0]
            with open(os.path.join(meta_data_dir, file_name)) as f:
                print(os.path.join(meta_data_dir, file_name))
                data = json.load(f)
                # Restore the leading zero dropped from what is presumably an
                # eight-digit USGS gage id.
                if len(gage_id) == 7:
                    gage_id = "0" + gage_id
                raw_df = make_usgs_data(datetime(2014, 1, 1), datetime(2019, 1, 1), gage_id)
                df, max_flow, min_flow = process_intermediate_csv(raw_df)
                data["time_zone_code"] = df["tz_cd"].iloc[0]
                data["max_flow"] = max_flow
                data["min_flow"] = min_flow
                precip_df = pd.read_csv(
                    os.path.join(precip_path, data["stations"][0]["station_id"] + ".csv"))
                fixed_df, nan_flow, nan_precip = combine_data(df, precip_df)
                data["nan_flow"] = nan_flow
                data["nan_precip"] = nan_precip
                joined_name = str(gage_id) + data["stations"][0]["station_id"] + "_flow.csv"
                joined_upload = "joined/" + joined_name
                meta_path = os.path.join(meta_data_dir, file_name)
                data["files"] = [joined_name]
                fixed_df.to_csv(joined_name)
            # Write the enriched metadata back out and push both artifacts to GCS.
            with open(meta_path, 'w') as f:
                json.dump(data, f)
            upload_file("predict_cfs", "meta2/" + file_name, meta_path, client)
            upload_file("predict_cfs", joined_upload, joined_name, client)
        except Exception as e:
            exceptions[str(gage_id)] = str(e)
            with open("exceptions.json", "w+") as a:
                json.dump(exceptions, a)
            print("exception")
    upload_file("predict_cfs", "meta2/" + "exceptions.json", "exceptions.json", client)
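
# A hedged end-to-end sketch. The directory names are hypothetical; the GCS
# bucket "predict_cfs" is hard-coded above, so get_storage_client() is assumed
# to find credentials with write access to it in the environment.
#
# create_visited()
# create_usgs(
#     meta_data_dir="gage_meta_json/",  # hypothetical dir of "<gage_id>stations*.json" files
#     precip_path="precip_csvs/",       # hypothetical dir of "<station_id>.csv" files
#     start=0,
#     end=10)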