Source code for h3.dataloading.general_df_utils

from __future__ import annotations

import geopy.distance
import pandas as pd
import shapely.geometry

from h3 import utils


def standardise_df(
    df: pd.DataFrame,
    date_cols: list[str] | None = None,
    new_point_col_name: str = 'geometry'
) -> pd.DataFrame:
    """Apply various formatting functions to make any df behave as you'd expect.

    Parameters
    ----------
    df : pd.DataFrame
        any pandas df
    date_cols : list[str], optional
        list of column names containing date values. Default is None.
    new_point_col_name : str, optional
        name of the column of shapely Point objects to generate. Default is
        'geometry'.

    Returns
    -------
    pd.DataFrame
        reformatted pd.DataFrame object
    """
    # make all headers lower case
    df.columns = df.columns.str.lower()
    # replace any whitespace in headers with underscores
    df.columns = df.columns.str.replace(' ', '_')
    # if any columns with dates provided
    if date_cols:
        df[date_cols] = df[date_cols].apply(pd.to_datetime)
    if 'geometry' in df.columns:
        # if geometry column does not already contain shapely Point objects
        if not isinstance(df.geometry.iloc[0], shapely.geometry.Point):
            df.geometry = df.geometry.apply(
                utils.geometry_ops.convert_point_string_to_point)
        # generate lat-lon columns from the Point objects
        df = generate_lat_lon_from_points_cols(df, ['geometry'])
    if {'lat', 'lon'}.issubset(df.columns):
        # make geometry column of shapely Point objects
        df = points_from_df_lat_lon_cols(df, point_col_name=new_point_col_name)
    # common variation
    elif {'lat', 'long'}.issubset(df.columns):
        df.rename(columns={'long': 'lon'}, inplace=True)
        df = points_from_df_lat_lon_cols(df, point_col_name=new_point_col_name)
    return df
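
# A minimal usage sketch (illustrative toy frame, not part of the module):
# headers are lower-cased, 'long' is renamed to 'lon', the 'date' column is
# parsed, and a 'geometry' column of shapely Points is added.
#
# >>> df = pd.DataFrame({'Lat': [51.5], 'Long': [-0.13], 'Date': ['2020-01-01']})
# >>> df = standardise_df(df, date_cols=['date'])
# >>> list(df.columns)
# ['lat', 'lon', 'date', 'geometry']
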
def points_from_df_lat_lon_cols(
    df: pd.DataFrame,
    point_col_name: str = 'geometry'
) -> pd.DataFrame:
    """Generate a column of shapely Point objects from 'lat' and 'lon' columns.

    Parameters
    ----------
    df : pd.DataFrame
        df containing 'lat' and 'lon' columns
    point_col_name : str, optional
        name of new column of shapely Point objects. Defaults to 'geometry'.

    Returns
    -------
    pd.DataFrame
        df with an additional column of shapely Point objects
    """
    df[point_col_name] = df.apply(
        lambda row: shapely.geometry.Point(row['lon'], row['lat']), axis=1)
    return df
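
# A minimal usage sketch (illustrative values):
#
# >>> df = pd.DataFrame({'lat': [51.5, 48.9], 'lon': [-0.13, 2.35]})
# >>> df = points_from_df_lat_lon_cols(df)
# >>> (df['geometry'].iloc[0].x, df['geometry'].iloc[0].y)
# (-0.13, 51.5)
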
def exclude_df_rows_symmetrically_around_value(
    df: pd.DataFrame,
    col_names: list[str],
    poi: list[float] | list[pd.Timestamp],
    buffer_val: list[float] | list[tuple[float, str]]
) -> pd.DataFrame:
    """Return a pd.DataFrame which excludes rows outside a range of +/- a buffer.

    Buffers can be floats, or can specify a period of time. Handy e.g. for
    excluding stations for which there is no weather data within the period
    of interest.

    Parameters
    ----------
    df : pd.DataFrame
        pd.DataFrame containing values to potentially exclude
    col_names : list[str]
        list of strings specifying the names of the columns of interest
    poi : list[float] or list[pd.Timestamp]
        points of interest (values about which any exclusion will be centred).
        One value for each relevant column.
    buffer_val : list[float] or list[tuple[float, str]]
        distance from poi to be excluded. In the case that poi is a Timestamp
        object, a string specifying the unit of time is necessary e.g. 'h' for
        hours (either as a tuple or list). See
        https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html
        for more info. One value for each relevant column.

    Returns
    -------
    pd.DataFrame
        excluding values outside of the provided ranges
    """
    # check lists same lengths
    utils.simple_functions.checklistLengthsEqual([col_names, poi, buffer_val])
    for i, col in enumerate(col_names):
        if isinstance(poi[i], pd.Timestamp):
            # specify the buffer as a Timedelta object (separating time and unit)
            buffer = pd.Timedelta(buffer_val[i][0], buffer_val[i][1])
        else:
            buffer = buffer_val[i]
        # restrict to only observations within the range
        df = df[df[col].between(poi[i] - buffer, poi[i] + buffer)]
    return df
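
# A minimal usage sketch (hypothetical df with 'lat' and 'date' columns):
# keep rows with 'lat' within +/- 2 degrees of 50.0 and 'date' within
# +/- 6 hours of midday on 2020-01-01.
#
# >>> df = exclude_df_rows_symmetrically_around_value(
# ...     df, ['lat', 'date'],
# ...     [50.0, pd.Timestamp('2020-01-01 12:00')],
# ...     [2.0, (6, 'h')])
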
def exclude_df_rows_by_range(
    df: pd.DataFrame,
    col_names: list[str],
    value_bounds: list[tuple[float]] | list[list[float]],
    buffer: float | tuple[float, str] = 0
) -> pd.DataFrame:
    """Return a pd.DataFrame of only the rows whose values in the columns
    listed in col_names fall within the range of value_bounds, +/- an optional
    buffer amount.

    Handy for restricting large dataframes based on date ranges (must specify
    bounds as pd.Timestamp objects), or lat/lon ranges.

    Parameters
    ----------
    df : pd.DataFrame
        df to limit
    col_names : list[str]
        e.g. ['col1', ..., 'colN']. List of column names to be restricted by
        their relevant value_bounds.
    value_bounds : list[tuple[float]] or list[list[float]]
        e.g. [(start_val1, end_val1), ..., (start_valN, end_valN)]. List of
        tuples (or lists) specifying minimum and maximum values to allow.
    buffer : float or tuple[float, str], optional
        buffer added on either side of value_bounds. Defaults to no buffer.
        For pd.Timestamp bounds, supply (value, unit) e.g. (12, 'h'). Useful
        for specifying that weather station observations must exist some time
        before and after the event of interest.

    Returns
    -------
    pd.DataFrame
        restricted pd.DataFrame object (sub-set of original df)
    """
    # check lists same lengths
    utils.simple_functions.checklistLengthsEqual([col_names, value_bounds])
    for i, col in enumerate(col_names):
        if isinstance(value_bounds[i][0], pd.Timestamp):
            # specify the buffer as a Timedelta object (separating time and unit)
            buf = pd.Timedelta(buffer[0], buffer[1])
        else:
            buf = buffer
        df = df[df[col].between(
            min(value_bounds[i]) - buf, max(value_bounds[i]) + buf)]
    return df
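
# A minimal usage sketch (hypothetical df with a 'date' column): keep rows
# whose 'date' lies within the bounds, padded by 12 hours either side.
#
# >>> bounds = (pd.Timestamp('2020-01-01'), pd.Timestamp('2020-01-07'))
# >>> df = exclude_df_rows_by_range(df, ['date'], [bounds], buffer=(12, 'h'))
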
def concat_df_cols(
    df: pd.DataFrame,
    concatted_col_name: str,
    cols_to_concat: list[str],
    delimiter: str = ""
) -> pd.DataFrame:
    """Concatenate columns in a pd.DataFrame into a new column of strings
    linked by delimiter.

    Parameters
    ----------
    df : pd.DataFrame
        df containing columns to concatenate
    concatted_col_name : str
        name of new concatenated column
    cols_to_concat : list[str]
        names of columns to concatenate (in desired order)
    delimiter : str, optional
        character to insert in between column values. Defaults to empty string.

    Returns
    -------
    pd.DataFrame
        with additional concatted column
    """
    df[concatted_col_name] = df[cols_to_concat].astype(str).apply(
        delimiter.join, axis=1)
    return df
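
# A minimal usage sketch (illustrative column names):
#
# >>> df = pd.DataFrame({'usaf': ['010010'], 'wban': ['99999']})
# >>> df = concat_df_cols(df, 'station_id', ['usaf', 'wban'], delimiter='-')
# >>> df['station_id'].iloc[0]
# '010010-99999'
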
def generate_lat_lon_from_points_cols(
    df: pd.DataFrame,
    points_cols: list[str]
) -> pd.DataFrame:
    """Generate column(s) of lat and lon from column(s) of shapely Point
    objects. Column(s) are added to the df being processed.

    Parameters
    ----------
    df : pd.DataFrame
        pd.DataFrame containing column(s) of shapely Point objects
    points_cols : list[str]
        names of columns to convert to lat/lon. Chosen not to find the columns
        by default (by dtype) since the caller might only want to convert one.

    Returns
    -------
    pd.DataFrame
        containing new lat and lon columns
    """
    for i, col in enumerate(points_cols):
        if len(points_cols) == 1:
            lon_col_name = 'lon'
            lat_col_name = 'lat'
        else:
            lon_col_name = f'lon{i+1}'
            lat_col_name = f'lat{i+1}'
        # shapely Points store lon as x and lat as y
        df[lon_col_name] = df[col].apply(lambda p: p.x)
        df[lat_col_name] = df[col].apply(lambda p: p.y)
    return df
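
# A minimal usage sketch (illustrative values): the inverse of
# points_from_df_lat_lon_cols for a single points column.
#
# >>> df = pd.DataFrame({'pts': [shapely.geometry.Point(-0.13, 51.5)]})
# >>> df = generate_lat_lon_from_points_cols(df, ['pts'])
# >>> (df['lat'].iloc[0], df['lon'].iloc[0])
# (51.5, -0.13)
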
def calc_distance_between_df_cols(
    df: pd.DataFrame,
    cols_compare: list[tuple[str]] | list[list[str]],
    new_col_name: str = "distance",
) -> pd.DataFrame:
    """Calculate the geodesic distance in km between two pairs of lat/lon
    columns. See https://geopy.readthedocs.io/en/stable/#module-geopy.distance
    for more info.

    Parameters
    ----------
    df : pd.DataFrame
        df containing two pairs of lat/lon values
    cols_compare : list[tuple[str]] or list[list[str]]
        two pairs of lat/lon column names, each pair given as a tuple or list
        ordered (lat, lon)
    new_col_name : str, optional
        name of the new distance column. The default is 'distance'.

    Returns
    -------
    pd.DataFrame
        copy of df with an extra 'distance' column
    """
    if len(cols_compare) != 2:
        raise ValueError(
            'Cannot compare more or fewer than two sets of lat/lon values at a time')
    df[new_col_name] = df.apply(
        lambda x: geopy.distance.geodesic(
            (x[cols_compare[0][0]], x[cols_compare[0][1]]),
            (x[cols_compare[1][0]], x[cols_compare[1][1]])).km,
        axis=1)
    return df
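
# A minimal usage sketch (illustrative values; each pair is (lat, lon), the
# order geopy.distance.geodesic expects):
#
# >>> df = pd.DataFrame({'lat1': [51.5], 'lon1': [-0.13],
# ...                    'lat2': [48.9], 'lon2': [2.35]})
# >>> df = calc_distance_between_df_cols(df, [('lat1', 'lon1'),
# ...                                         ('lat2', 'lon2')])
# >>> df['distance'].iloc[0]  # roughly the London-Paris distance, ~340 km
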
def find_index_closest_point_in_col(
    poi: shapely.Point,
    points_df: pd.DataFrame,
    points_df_geom_col: str,
    which_closest: int = 0
) -> int:
    """Find the df index of the closest point object to poi in the df object.

    Parameters
    ----------
    poi : shapely.Point
        point of interest (shapely Point object)
    points_df : pd.DataFrame
        dataframe containing a column of shapely Point objects
    points_df_geom_col : str
        name of column of shapely Point objects
    which_closest : int, optional
        if 0 (default), find the closest. For any other N, find the (N+1)th
        closest.

    Returns
    -------
    int
        index of the matching row in points_df
    """
    distances = points_df[points_df_geom_col].apply(lambda x: poi.distance(x))
    s = sorted(set(distances))
    return distances[distances == s[which_closest]].index[0]
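
# A minimal usage sketch (illustrative values). Note that distances are
# computed with shapely's planar Point.distance, i.e. in degrees rather than
# metres.
#
# >>> df = points_from_df_lat_lon_cols(
# ...     pd.DataFrame({'lat': [51.5, 48.9], 'lon': [-0.1, 2.35]}))
# >>> find_index_closest_point_in_col(
# ...     shapely.geometry.Point(-0.13, 51.5), df, 'geometry')
# 0
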
def calculate_first_last_dates_from_df(
    df: pd.DataFrame,
    time_buffer: tuple[float, str] = (0, 'h'),
    date_col_name: str | None = None
) -> tuple[pd.Timestamp, pd.Timestamp]:
    """Calculate the first and last dates from a df, with a time buffer before
    and after.

    Parameters
    ----------
    df : pd.DataFrame
        should contain at least one datetime column. If multiple datetime
        columns, the column to be used should be specified. Will default to
        the first occurrence of such a column.
    time_buffer : tuple[float, str], optional
        defaults to (0, 'h') (no buffer). Extra time to remove from the first
        occurrence and add to the last occurrence. A string specifying the
        unit of time is necessary e.g. 'h' for hours (either as a tuple or
        list). See
        https://pandas.pydata.org/docs/reference/api/pandas.Timedelta.html
        for more info.
    date_col_name : str, optional
        defaults to None. Name of column containing datetime objects to be
        processed.

    Returns
    -------
    tuple[pd.Timestamp, pd.Timestamp]
        detailing start and end time/date

    N.B. currently discarding any timezone information
    """
    # if no date_col_name provided
    if not date_col_name:
        try:
            # try to find the first occurrence of a column containing dates
            date_col = df.columns[df.apply(
                pd.api.types.is_datetime64_any_dtype)].tolist()[0]
        except IndexError:
            raise ValueError('No column containing datetime64 objects found')
    else:
        # check that the column provided contains datetime objects
        if pd.api.types.is_datetime64_any_dtype(df[date_col_name]):
            date_col = date_col_name
        else:
            raise ValueError(
                'Column provided as date_col_name does not contain datetime objects')
    # if date column type has a timezone detailed
    if pd.api.types.is_datetime64tz_dtype(df[date_col]):
        # convert the dates column to naive timestamps
        df[date_col] = df[date_col].dt.tz_localize(None)
    # generate time buffer
    delta = pd.Timedelta(time_buffer[0], time_buffer[1])
    # find minimum and maximum date values
    start = df[date_col].min() - delta
    end = df[date_col].max() + delta
    return start, end
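
# A minimal usage sketch (illustrative values): pad the observed date range by
# 6 hours either side.
#
# >>> df = pd.DataFrame(
# ...     {'date': pd.to_datetime(['2020-01-01 00:00', '2020-01-03 12:00'])})
# >>> calculate_first_last_dates_from_df(df, time_buffer=(6, 'h'))
# (Timestamp('2019-12-31 18:00:00'), Timestamp('2020-01-03 18:00:00'))
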
def calc_means_df_cols(
    df: pd.DataFrame,
    col_names: list[str]
) -> list[float]:
    """Return the mean values of the prescribed columns in df.

    Parameters
    ----------
    df : pd.DataFrame
    col_names : list[str]
        list of columns for which to calculate the mean

    Returns
    -------
    list[float]
        mean value of each column
    """
    return [df[col].mean() for col in col_names]
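
# A minimal usage sketch (illustrative values):
#
# >>> calc_means_df_cols(pd.DataFrame({'a': [1, 3], 'b': [2, 4]}), ['a', 'b'])
# [2.0, 3.0]
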
def limit_df_spatial_range(
    df: pd.DataFrame,
    centre_coords: list[float] | tuple[float],
    min_number: int | None = None,
    distance_buffer: float | None = None,
    verbose: bool = False
) -> pd.DataFrame:
    """Restrict df to within +/- a lat-lon distance, or to at least min_number
    rows.

    Parameters
    ----------
    df : pd.DataFrame
        df containing 'lat' and 'lon' columns
    centre_coords : list[float] or tuple[float]
        geographical centre (lat, lon) about which to restrict df
    min_number : int, optional
        minimum number of rows in the df to be returned
    distance_buffer : float, optional
        distance in degrees from the geographical centre within which points
        in df should be returned
    verbose : bool, optional
        whether to report that distance_buffer was expanded. Defaults to False.

    Returns
    -------
    pd.DataFrame
        spatially limited df
    """
    if not {'lat', 'lon'}.issubset(df.columns):
        raise ValueError('Columns by name of lat and lon not found in df')
    # if choosing to find the closest N points at any distance away
    if distance_buffer is None:
        if min_number is None:
            raise ValueError(
                'At least one of min_number and distance_buffer must be provided')
        # set arbitrarily small distance buffer
        distance_buffer = 1
        df_spatial_lim = exclude_df_rows_symmetrically_around_value(
            df, ['lat', 'lon'], centre_coords,
            [distance_buffer, distance_buffer])
        # expand the distance buffer until the minimum number is reached
        while len(df_spatial_lim) < min_number:
            distance_buffer += 1
            df_spatial_lim = exclude_df_rows_symmetrically_around_value(
                df, ['lat', 'lon'], centre_coords,
                [distance_buffer, distance_buffer])
        if verbose:
            print(f'Spatial search expanded to +/- {distance_buffer} degrees')
    # if choosing to find the closest stations only up to a certain distance
    else:
        df_spatial_lim = exclude_df_rows_symmetrically_around_value(
            df, ['lat', 'lon'], centre_coords,
            [distance_buffer, distance_buffer])
    return df_spatial_lim
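
# A minimal usage sketch (hypothetical df_stations with 'lat'/'lon' columns):
# expand the +/- degree window around the centre until at least five rows
# survive.
#
# >>> nearby = limit_df_spatial_range(
# ...     df_stations, centre_coords=[51.5, -0.13], min_number=5, verbose=True)
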
def station_availability(
    df_stations: pd.DataFrame,
    df_noaa_weather_event: pd.DataFrame,
    time_buffer: tuple[float, str] = (0, 'h'),
    available: bool = True
) -> pd.DataFrame:
    """Filter the stations df by time, returning only stations with (or,
    if available=False, without) observations spanning the event. Defaults
    to available.
    """
    start, end = calculate_first_last_dates_from_df(
        df_noaa_weather_event, time_buffer)
    if available:
        # return stations operational from at least time_buffer before the
        # event until at least time_buffer after it
        df_station_time_lim = df_stations[
            ((df_stations['begin'] <= start) & (df_stations['end'] >= end))]
    else:
        # return the complement: stations not operational over the whole window
        df_station_time_lim = df_stations[
            ~((df_stations['begin'] <= start) & (df_stations['end'] >= end))]
    return df_station_time_lim
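
# A minimal usage sketch (hypothetical frames: df_stations needs datetime
# 'begin'/'end' columns; df_noaa_weather_event needs a datetime column): keep
# only stations whose records span the event, padded by 24 hours either side.
#
# >>> usable = station_availability(
# ...     df_stations, df_noaa_weather_event, time_buffer=(24, 'h'))
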