Source code for invert4geom.cross_validation

import copy  # pylint: disable=too-many-lines
import typing
import warnings

import harmonica as hm
import numpy as np
import pandas as pd
import polartoolkit as ptk
import sklearn
import verde as vd
import xarray as xr
from numpy.typing import NDArray

from invert4geom import logger, utils

# pylint: enable=duplicate-code


[docs] def remove_test_points(ds: xr.Dataset) -> xr.Dataset: """ Remove the test rows and the column denoting test points Parameters ---------- ds : xarray.Dataset gravity dataset with a boolean variable "test". Returns ------- xarray.Dataset gravity dataset with test points removed and no "test" column. """ df = ds.inv.df try: df = df[df.test == False].copy() # noqa: E712 # pylint: disable=singleton-comparison df = df.drop(columns=["test"]) ds_new = df.set_index(list(ds.dims)).to_xarray() # retrain attributes ds_new.attrs.update(ds.attrs) # pylint: disable=protected-access return ds_new except AttributeError: msg = "gravity dataframe does not contain a 'test' column, cannot remove test points." warnings.warn(msg, stacklevel=2) return ds
[docs] def add_test_points( ds: xr.Dataset, ) -> xr.Dataset: """ take a dataframe of coordinates and make all rows that fall on the data_spacing grid training points. Add rows at each point which falls on the grid points of half the data_spacing, assign these with label "test". If other data is present in dataframe, will sample at each new location. Parameters ---------- ds : xarray.Dataset gravity dataset to be resampled. Returns ------- xarray.Dataset gravity dataset with a boolean variable "test" denoting whether the row is a training or testing point. """ df = ds.inv.df if "test" in df.columns: msg = "gravity dataframe already contains a 'test' column, not resampling. If you'd like to remove the test points, use `remove_test_points()." raise ValueError(msg) coord_names = ds.coord_names # create coords for full data at half spacing coords = vd.grid_coordinates( region=ds.region, spacing=ds.spacing / 2, pixel_register=False, ) # turn coordinates into dataarray full_points = vd.make_xarray_grid( (coords[0], coords[1]), data=np.ones_like(coords[0]), data_names="tmp", dims=[coord_names[1], coord_names[0]], ) # turn dataarray in dataframe full_df: pd.DataFrame = vd.grid_to_table(full_points).drop(columns="tmp") # set all points to test full_df["test"] = True # pylint: disable=unsupported-assignment-operation # subset training points, every other value train_df = full_df[ # pylint: disable=unsubscriptable-object (full_df[coord_names[0]].isin(full_points[coord_names[0]].to_numpy()[::2])) # pylint: disable=unsubscriptable-object & (full_df[coord_names[1]].isin(full_points[coord_names[1]].to_numpy()[::2])) # pylint: disable=unsubscriptable-object ].copy() # set training points to not be test points train_df["test"] = False # merge training and testing dfs df = full_df.set_index([coord_names[1], coord_names[0]]) df.update(train_df.set_index([coord_names[1], coord_names[0]])) df2 = df.reset_index() df2["test"] = df2.test.astype(bool) # sample any other columns in original df at new locations for i in list(ds): if i == "test" or ds[i].dtype == object: pass else: if not bool(ds[i].coords): msg = ("Issue with dataset variable '%s'.", i) # type: ignore[assignment] raise ValueError(msg) try: df2[i] = utils.sample_grids( df=df2, grid=ds[i], sampled_name=i, coord_names=coord_names, )[i].astype(ds[i].dtype) except pd.errors.IntCastingNaNError as e: logger.error(e) df2[i] = utils.sample_grids( df=df2, grid=ds[i], sampled_name=i, coord_names=coord_names, )[i].astype(ds[i].dtype) # retain original data types dtypes = {k: v for k, v in ds.inv.df.dtypes.items() if k in ds.inv.df} df2 = df2.astype(dtypes) # test with this, using same input spacing as original # pd.testing.assert_frame_equal(df2, full_res_grav, check_like=True,) ds_new = df2.set_index([coord_names[1], coord_names[0]]).to_xarray() # retain attributes ds_new.attrs.update(ds.attrs) # pylint: disable=protected-access return ds_new
def resample_with_test_points( data_spacing: typing.Any, # noqa: ARG001 data: typing.Any, # noqa: ARG001 region: typing.Any, # noqa: ARG001 ) -> None: """ DEPRECATED: use function `add_test_points` instead """ # pylint: disable=W0613 msg = "Function `resample_with_test_points` deprecated, use function `add_test_points` instead" raise DeprecationWarning(msg) def grav_cv_score( training_data: pd.DataFrame, # noqa: ARG001 testing_data: pd.DataFrame, # noqa: ARG001 progressbar: bool = True, # noqa: ARG001 rmse_as_median: bool = False, # noqa: ARG001 plot: bool = False, # noqa: ARG001 **kwargs: typing.Any, # noqa: ARG001 ) -> None: """ DEPRECATED: use the `Inversion` class method `gravity_score` instead """ # pylint: disable=W0613 msg = ( "Function `grav_cv_score` deprecated, use the `Inversion` class method " "`gravity_score` instead" ) raise DeprecationWarning(msg) def constraints_cv_score( grav_df: pd.DataFrame, # noqa: ARG001 constraints_df: pd.DataFrame, # noqa: ARG001 rmse_as_median: bool = False, # noqa: ARG001 **kwargs: typing.Any, # noqa: ARG001 ) -> None: """ DEPRECATED: use the `Inversion` class method `constraints_score` instead """ # pylint: disable=W0613 msg = ( "Function `constraints_cv_score` deprecated, use the `Inversion` class method " "`constraints_score` instead" ) raise DeprecationWarning(msg)
[docs] def random_split_test_train( data_df: pd.DataFrame, test_size: float = 0.3, random_state: int = 10, coord_names: tuple[str, str] = ("easting", "northing"), plot: bool = False, ) -> pd.DataFrame: """ split data into training and testing sets randomly with a specified percentage of points to be in the test set set by test_size. Parameters ---------- data_df : pandas.DataFrame data to be split, must have columns set by parameter coord_names test_size : float, optional decimal percentage of points to put in the testing set, by default 0.3 random_state : int, optional number to set th random splitting, by default 10 coord_names : tuple[str, str], optional names of the coordinate columns in the dataframe, by default ("easting", "northing") plot : bool, optional choose to plot the results, by default False Returns ------- pandas.DataFrame dataframe with a new column "test" which is a boolean value of whether the row is in the training or testing set. """ data_df = data_df.copy() train, test = vd.train_test_split( coordinates=(data_df[coord_names[0]], data_df[coord_names[1]]), data=data_df.index, test_size=test_size, random_state=random_state, ) train_df = pd.DataFrame( data={ coord_names[0]: train[0][0], coord_names[1]: train[0][1], "index": train[1][0], "test": False, } ) test_df = pd.DataFrame( data={ coord_names[0]: test[0][0], coord_names[1]: test[0][1], "index": test[1][0], "test": True, } ) random_split_df = pd.concat([train_df, test_df]) random_split_df = pd.merge(data_df, random_split_df, on=list(coord_names)) # noqa: PD015 random_split_df = random_split_df.drop(columns=["index"]) if plot is True: try: df_train = random_split_df[random_split_df.test == False] # noqa: E712 # pylint: disable=singleton-comparison df_test = random_split_df[random_split_df.test == True] # noqa: E712 # pylint: disable=singleton-comparison region = vd.get_region( (random_split_df[coord_names[0]], random_split_df[coord_names[1]]) ) plot_region = vd.pad_region(region, (region[1] - region[0]) / 10) fig = ptk.basemap( region=plot_region, title="Random split", ) fig.plot( x=df_train[coord_names[0]], y=df_train[coord_names[1]], style="c.3c", fill="blue", label="Train", ) fig.plot( x=df_test[coord_names[0]], y=df_test[coord_names[1]], style="t.5c", fill="red", label="Test", ) fig.add_box(box=region) fig.legend() fig.show() except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return random_split_df
[docs] def split_test_train( data_df: pd.DataFrame, method: str, spacing: float | tuple[float, float] | None = None, shape: tuple[float, float] | None = None, n_splits: int = 5, random_state: int = 10, coord_names: tuple[str, str] = ("easting", "northing"), plot: bool = False, ) -> pd.DataFrame: """ Split data into training or testing sets either using KFold (optional blocked) or LeaveOneOut methods. Parameters ---------- data_df : pandas.DataFrame dataframe with coordinate columns set by coord_names method : str choose between "LeaveOneOut" or "KFold" methods. spacing : float | tuple[float, float] | None, optional grid spacing to use for Block K-Folds, by default None shape : tuple[float, float] | None, optional number of blocks to use for Block K-Folds, by default None n_splits : int, optional number for folds to make for K-Folds method, by default 5 random_state : int, optional random state used for both methods, by default 10 coord_names : tuple[str, str], optional names of the coordinate columns in the dataframe, by default ("easting", "northing") plot : bool, optional plot the separated training and testing dataset, by default False Returns ------- pandas.DataFrame a dataset with a new column for each fold in the form fold_0, fold_1 etc., with the value "train" or "test" """ df = data_df.copy() if method == "LeaveOneOut": kfold = sklearn.model_selection.LeaveOneOut() elif method == "KFold": if n_splits > len(df): msg = ( "n_splits must be less than or equal to the number of data points, " "decreasing n_splits" ) logger.warning(msg) n_splits = len(df) if n_splits == 1: msg = "n_splits must be greater than 1" raise ValueError(msg) if spacing or shape is None: kfold = sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=random_state, ) else: kfold = vd.BlockKFold( spacing=spacing, shape=shape, n_splits=n_splits, shuffle=True, random_state=random_state, ) # kfold = vd.BlockShuffleSplit( # spacing=spacing, # shape=shape, # n_splits=n_splits, # test_size=test_size, # random_state = random_state, # ) else: msg = "invalid string for `method`" raise ValueError(msg) coords = (df[coord_names[0]], df[coord_names[1]]) feature_matrix = np.transpose(coords) coord_shape = coords[0].shape mask = np.full(shape=coord_shape, fill_value=" ") for iteration, (train, test) in enumerate(kfold.split(feature_matrix)): mask[np.unravel_index(train, coord_shape)] = "train" mask[np.unravel_index(test, coord_shape)] = "_test" df = pd.concat( [df, pd.DataFrame({f"fold_{iteration}": mask}, index=df.index)], axis=1 ) df = df.replace("_test", "test") if plot is True: try: folds = list(df.columns[df.columns.str.startswith("fold_")]) _, ncols = ptk.square_subplots(len(folds)) df = df.copy() for i in range(len(folds)): if i == 0: fig = None origin_shift = "initialize" xshift_amount = None yshift_amount = None elif i % ncols == 0: origin_shift = "both" xshift_amount = -ncols + 1 yshift_amount = -1 else: origin_shift = "x" xshift_amount = 1 yshift_amount = 1 df_test = df[df[f"fold_{i}"] == "test"] df_train = df[df[f"fold_{i}"] == "train"] region = vd.get_region((df[coord_names[0]], df[coord_names[1]])) plot_region = vd.pad_region(region, (region[1] - region[0]) / 10) fig = ptk.basemap( region=plot_region, title=f"Fold {i} ({len(df_test)} testing points)", origin_shift=origin_shift, xshift_amount=xshift_amount, yshift_amount=yshift_amount, fig=fig, ) fig.add_box(box=region) fig.plot( x=df_train[coord_names[0]], y=df_train[coord_names[1]], style="c.4c", fill="blue", label="Train", ) fig.plot( x=df_test[coord_names[0]], y=df_test[coord_names[1]], style="t.7c", fill="red", label="Test", ) fig.legend() fig.show() # type: ignore[union-attr] except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return df
[docs] def kfold_df_to_lists( df: pd.DataFrame, ) -> tuple[list[pd.DataFrame], list[pd.DataFrame]]: """ convert a single dataframe with fold columns in the form fold_0, fold_1 etc. into a list of testing dataframes for each fold and a list of training dataframes for each fold. Parameters ---------- df : pandas.DataFrame dataframe with fold columns in the form fold_0, fold_1 etc., as output by function `split_test_train()`. Returns ------- test_dfs : list[pandas.DataFrame] a list of testing dataframes for each fold train_dfs : list[pandas.DataFrame] a list of training dataframes for each fold """ # get list of fold column names folds = list(df.columns[df.columns.str.startswith("fold_")]) test_dfs = [] train_dfs = [] # add train and test df for each fold to each own list for f in folds: # remove other fold column for clarity cols_to_remove = [i for i in folds if i != f] df1 = df.drop(cols_to_remove, axis=1) # append new dfs to lists test_dfs.append(df1[df1[f] == "test"]) train_dfs.append(df1[df1[f] == "train"]) return test_dfs, train_dfs
[docs] def eq_sources_score( coordinates: tuple[NDArray, NDArray, NDArray], data: pd.Series | NDArray, delayed: bool = False, weights: NDArray | None = None, **kwargs: typing.Any, ) -> float: """ Calculate the cross-validation score for fitting gravity data to equivalent sources. Uses Verde's cross_val_score function to calculate the score. All kwargs are passed to the harmonica.EquivalentSources class. Parameters ---------- coordinates : tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] tuple of easting, northing, and upward coordinates of the gravity data data : pandas.Series | numpy.ndarray the gravity data delayed : bool, optional compute the scores in parallel if True, by default False weights : numpy.ndarray | None, optional optional weight values for each gravity data point, by default None Keyword Arguments ----------------- damping : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated coefficients. If None, no regularization is used. points : list[numpy.ndarray] | None List containing the coordinates of the equivalent point sources. Coordinates are assumed to be in the following order: (``easting``, ``northing``, ``upward``). If None, will place one point source below each observation point at a fixed relative depth below the observation point. Defaults to None. depth : float | str Parameter used to control the depth at which the point sources will be located. If a value is provided, each source is located beneath each data point (or block-averaged location) at a depth equal to its elevation minus the ``depth`` value. If set to ``"default"``, the depth of the sources will be estimated as 4.5 times the mean distance between first neighboring sources. This parameter is ignored if *points* is specified. Defaults to ``"default"``. block_size: float | tuple[float, float] | None Size of the blocks used on block-averaged equivalent sources. If a single value is passed, the blocks will have a square shape. Alternatively, the dimensions of the blocks in the South-North and West-East directions can be specified by passing a tuple. If None, no block-averaging is applied. This parameter is ignored if *points* are specified. Default to None. parallel : bool If True any predictions and Jacobian building is carried out in parallel through Numba's ``jit.prange``, reducing the computation time. If False, these tasks will be run on a single CPU. Default to True. dtype : str The desired data-type for the predictions and the Jacobian matrix. Default to ``"float64"``. Returns ------- float a float of the score, the higher the value to better the fit. """ if np.isnan(coordinates).any(): msg = "coordinates contain NaN" raise ValueError(msg) if np.isnan(data).any(): msg = "data contains is NaN" raise ValueError(msg) eqs = hm.EquivalentSources( **kwargs, ) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=sklearn.exceptions.UndefinedMetricWarning ) score = np.nan n_splits = 5 while np.isnan(score): try: score = np.mean( vd.cross_val_score( eqs, coordinates, data, delayed=delayed, weights=weights, cv=sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=0, ), scoring="r2", ) ) except ValueError: score = np.nan if (n_splits == 5) and (np.isnan(score)): msg = ( "eq sources score is NaN, reducing n_splits (5) by 1 until " "scoring metric is defined" ) logger.warning(msg) n_splits -= 1 if n_splits == 0: break if np.isnan(score): msg = ( "score is still NaN after reducing n_splits, make sure you're supplying " "enough points for the equivalent sources" ) raise ValueError(msg) return score # type: ignore[no-any-return]
[docs] def regional_separation_score( grav_ds: xr.Dataset, testing_df: pd.DataFrame, score_as_median: bool = False, **kwargs: typing.Any, ) -> tuple[float, float, float | None, xr.Dataset]: """ Evaluate the effectiveness of the gravity regional-residual separation. The optimal regional component is that which results in a residual component which is lowest at constraint points, while still contains a high amplitude elsewhere. Parameters ---------- grav_ds : xarray.Dataset gravity dataset with variables "gravity_anomaly" and "forward_gravity". testing_df : pandas.DataFrame dataframe containing a priori measurements of the topography of interest with columns "upward", "easting", and "northing" score_as_median : bool, optional switch from using the root mean square to the root median square for the score, by default is False., by default False **kwargs: typing.Any, additional keyword arguments for the specified method. Returns ------- residual_constraint_score : float the RMS of the residual at constraint points residual_amplitude_score : float the RMS of the residuals amplitude at all grid points true_reg_score : float | None the RMSE between the true regional field and the estimated field, if provided, otherwise None ds_anomalies : xarray.Dataset the dataframe of the regional and residual gravity anomalies """ # pull out kwargs kwargs = copy.deepcopy(kwargs) method = kwargs.pop("method") true_regional = kwargs.pop("true_regional", None) if method == "constraints_cv": msg = ( "method `constraints_cv` internally calculated regional separation scores " "so it should not be used here." ) raise ValueError(msg) # estimate the regional field grav_ds.inv.regional_separation( method=method, **kwargs, ) # sample the residual at the constraint points df = utils.sample_grids( df=testing_df, grid=grav_ds.res, sampled_name="res", coord_names=grav_ds.coord_names, ) # calculate scores residual_constraint_score = utils.rmse(df.res, as_median=score_as_median) if np.isnan(residual_constraint_score): msg = "residual_constraint_score is NaN" raise ValueError(msg) residual_amplitude_score = utils.rmse(grav_ds.res, as_median=score_as_median) if np.isnan(residual_amplitude_score): msg = "residual_amplitude_score is NaN" raise ValueError(msg) if true_regional is not None: true_reg_score = utils.rmse( np.abs(true_regional - grav_ds.reg), as_median=score_as_median ) else: true_reg_score = None # misfit = reg + res # want residual at constraint to be low # don't want this accomplished by having reg = misfit everywhere # optimize on # 1) low residual at constraints # 2) high residual everywhere else return ( residual_constraint_score, residual_amplitude_score, true_reg_score, grav_ds, )