Source code for invert4geom.cross_validation

import copy  # pylint: disable=too-many-lines
import itertools
import logging
import pathlib
import pickle
import random
import typing
import warnings

import deprecation
import harmonica as hm
import numpy as np
import pandas as pd
import sklearn
import verde as vd
import xarray as xr
from numpy.typing import NDArray
from polartoolkit import maps
from polartoolkit import utils as polar_utils
from tqdm.autonotebook import tqdm

import invert4geom
from invert4geom import inversion, logger, plotting, regional, utils


[docs] def resample_with_test_points( data_spacing: float, data: pd.DataFrame, region: tuple[float, float, float, float], ) -> pd.DataFrame: """ take a dataframe of coordinates and make all rows that fall on the data_spacing grid training points. Add rows at each point which falls on the grid points of half the data_spacing, assign these with label "test". If other data is present in dataframe, will sample at each new location. Parameters ---------- data_spacing : float full spacing size which will be halved data : pandas.DataFrame dataframe with coordinate columns "easting" and "northing", all other columns will be sampled at new grid spacing region : tuple[float, float, float, float] region to create grid over, in the form (min_easting, max_easting, min_northing, max_northing) Returns ------- pandas.DataFrame a new dataframe with new column "test" of booleans which shows whether each row is a testing or training point. """ # create coords for full data coords = vd.grid_coordinates( region=region, spacing=data_spacing / 2, pixel_register=False, ) # turn coordinates into dataarray full_points = vd.make_xarray_grid( (coords[0], coords[1]), data=np.ones_like(coords[0]), data_names="tmp", dims=("northing", "easting"), ) # turn dataarray in dataframe full_df = vd.grid_to_table(full_points).drop(columns="tmp") # set all points to test full_df["test"] = True # pylint: disable=unsupported-assignment-operation # subset training points, every other value train_df = full_df[ # pylint: disable=unsubscriptable-object (full_df.easting.isin(full_points.easting.to_numpy()[::2])) & (full_df.northing.isin(full_points.northing.to_numpy()[::2])) ].copy() # set training points to not be test points train_df["test"] = False # merge training and testing dfs df = full_df.set_index(["northing", "easting"]) df.update(train_df.set_index(["northing", "easting"])) df2 = df.reset_index() df2["test"] = df2.test.astype(bool) grid = data.set_index(["northing", "easting"]).to_xarray() for i in list(grid): if i == "test" or grid[i].dtype == object: pass else: try: df2[i] = utils.sample_grids( df2, grid[i], i, )[i].astype(data[i].dtype) except pd.errors.IntCastingNaNError as e: logger.error(e) df2[i] = utils.sample_grids( df2, grid[i], i, )[i] # test with this, using same input spacing as original # pd.testing.assert_frame_equal(df2, full_res_grav, check_like=True,) return df2.dropna()
[docs] def grav_cv_score( training_data: pd.DataFrame, testing_data: pd.DataFrame, progressbar: bool = True, rmse_as_median: bool = False, plot: bool = False, **kwargs: typing.Any, ) -> tuple[float, tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float]]: """ Find the score, represented by the root mean (or median) squared error (RMSE), between the testing gravity data, and the predict gravity data after an inversion. Follows methods of :footcite:t:`uiedafast2017`. Used in `optimization.optimize_inversion_damping()`. Parameters ---------- training_data : pandas.DataFrame rows of the gravity data frame which are just the training data testing_data : pandas.DataFrame rows of the gravity data frame which are just the testing data rmse_as_median : bool, optional calculate the RMSE as the median as opposed to the mean, by default False progressbar : bool, optional choose to show the progress bar for the forward gravity calculation, by default True plot : bool, optional choose to plot the observed and predicted data grids, and their difference, located at the testing points, by default False Returns ------- score : float the root mean squared error, between the testing gravity data and the predicted gravity data results : tuple[pandas.DataFrame, pandas.DataFrame, dict[str, typing.Any], float] a tuple of the inversion results. References ---------- :footcite:t:`uiedafast2017` """ train = training_data.copy() test = testing_data.copy() # extract density contrast and zref from prism layer prism_layer: xr.Dataset = kwargs.get("prism_layer") density_contrast = np.fabs(prism_layer.density) zref = prism_layer.attrs.get("zref") # make sure dynamic plotting of inversion iterations is off kwargs["plot_dynamic_convergence"] = False with utils._log_level(logging.WARN): # pylint: disable=protected-access # run inversion results = inversion.run_inversion( grav_df=train, progressbar=False, **kwargs, ) prism_results, _, _, _ = results # get last iteration's layer result final_topography = prism_results.set_index(["northing", "easting"]).to_xarray().topo density_grid = xr.where( final_topography >= zref, density_contrast, -density_contrast, ) # create new prism layer prism_layer = utils.grids_to_prisms( final_topography, reference=zref, density=density_grid, ) # calculate forward gravity of inverted prism layer test["test_point_grav"] = prism_layer.prism_layer.gravity( coordinates=( test.easting, test.northing, test.upward, ), field="g_z", progressbar=progressbar, ) # compare forward of inverted layer with observed observed = test.gravity_anomaly - test.reg predicted = test.test_point_grav dif = predicted - observed score = utils.rmse(dif, as_median=rmse_as_median) if plot: try: test_grid = test.set_index(["northing", "easting"]).to_xarray() obs = test_grid.gravity_anomaly - test_grid.reg pred = test_grid.test_point_grav.rename("") _ = polar_utils.grd_compare( pred, obs, grid1_name="Predicted gravity", grid2_name="Observed gravity", plot_type="xarray", robust=True, title=f"Score={score}", rmse_in_title=False, hist=True, inset=False, ) except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return score, results
@deprecation.deprecated( deprecated_in="0.8.0", removed_in="0.14.0", current_version=invert4geom.__version__, details=( "Use the new function `optimization.optimize_inversion_damping()`" "instead, which uses Optuna for optimization. If you would still like to " "conduct a grid search, set `grid_search=True` in the new function.", ), )
[docs] def grav_optimal_parameter( training_data: pd.DataFrame, testing_data: pd.DataFrame, param_to_test: tuple[str, list[float]], rmse_as_median: bool = False, progressbar: bool = True, plot_grids: bool = False, plot_cv: bool = False, results_fname: str | None = None, **kwargs: typing.Any, ) -> tuple[ tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float], float, float, list[float], list[float], ]: """ Calculate the cross validation scores for a set of parameter values and return the best score and value. Parameters ---------- training_data : pandas.DataFrame just the training data rows testing_data : pandas.DataFrame just the testing data rows param_to_test : tuple[str, list[float]] first value is a string of the parameter that is being tested, and the second value is a list of the values to test rmse_as_median : bool, optional calculate the RMSE as the median as opposed to the mean, by default False progressbar : bool, optional display a progress bar for the number of tested values, by default True plot_grids : bool, optional plot all the grids of observed and predicted data for each parameter value, by default False plot_cv : bool, optional plot a graph of scores vs parameter values, by default False results_fname : str, optional file name to save results to, by default "tmp" with an attached random number Returns ------- tuple[ tuple[pandas.DataFrame, pandas.DataFrame, dict[str, typing.Any], float], float, float, list[float], list[float], ] the inversion results, the optimal parameter value, the score associated with it, the parameter values and the scores for each parameter value """ train = training_data.copy() test = testing_data.copy() # pull parameter out of kwargs param_name = param_to_test[0] param_values = param_to_test[1] # set file name for saving results with random number between 0 and 999 if results_fname is None: results_fname = f"tmp_{random.randint(0, 999)}" # run inversions and collect scores scores = [] if progressbar is True: pbar = tqdm( param_values, desc=f"{param_name} values", ) elif progressbar is False: pbar = param_values else: msg = "progressbar must be a boolean" # type: ignore[unreachable] raise ValueError(msg) for i, value in enumerate(pbar): # update parameter value in kwargs kwargs[param_name] = value # run cross validation score, _ = grav_cv_score( training_data=train, testing_data=test, rmse_as_median=rmse_as_median, plot=plot_grids, results_fname=f"{results_fname}_trial_{i}", progressbar=False, **kwargs, ) scores.append(score) if (i == 1) and (score > scores[0]): msg = ( "first score was lower than second, consider changing the lower" " parameter value range" ) logger.warning(msg) # print value and score pairs for value, score in zip(param_values, scores, strict=False): logger.info("%s value: %s -> Score: %s", param_name, value, score) best_idx = np.argmin(scores) best_score = scores[best_idx] best_param_value = param_values[best_idx] logger.info("Best score of %s with %s=%s", best_score, param_name, best_param_value) # get best inversion result of each set with pathlib.Path(f"{results_fname}_trial_{best_idx}.pickle").open("rb") as f: inv_results = pickle.load(f) # delete other inversion results for i in range(len(scores)): if i == best_idx: pass else: pathlib.Path(f"{results_fname}_trial_{i}.pickle").unlink(missing_ok=True) # put scores and parameter values into dict results = { "scores": scores, "param_values": param_values, } if best_param_value in [np.min(param_values), np.max(param_values)]: logger.warning( "Best parameter value (%s) for %s CV is at the limit of provided " "values (%s, %s) and thus is likely not a global minimum, expand the range " "of values tested to ensure the best parameter value is found.", best_param_value, param_name, np.nanmin(param_values), np.nanmax(param_values), ) # remove if exists pathlib.Path(results_fname).unlink(missing_ok=True) # save scores and dampings to pickle with pathlib.Path(f"{results_fname}.pickle").open("wb") as f: pickle.dump(results, f) if plot_cv: try: # plot scores plotting.plot_cv_scores( scores, param_values, param_name=param_name, # logx=True, # logy=True, ) except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return inv_results, best_param_value, best_score, param_values, scores
[docs] def constraints_cv_score( grav_df: pd.DataFrame, constraints_df: pd.DataFrame, rmse_as_median: bool = False, **kwargs: typing.Any, ) -> tuple[float, tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float]]: """ Find the score, represented by the root mean squared error (RMSE), between the constraint point elevation, and the inverted topography at the constraint points. Follows methods of :footcite:t:`uiedafast2017`. Used in `optimization.optimize_inversion_zref_density_contrast()`. Parameters ---------- grav_df : pandas.DataFrame gravity dataframe with columns "res", "reg", and "gravity_anomaly" constraints_df : pandas.DataFrame constraints dataframe with columns "easting", "northing", and "upward" rmse_as_median : bool, optional calculate the RMSE as the median of the , as opposed to the mean, by default False Returns ------- score : float the root mean squared error, between the constraint point elevation and the inverted topography at the constraint points results : tuple[pandas.DataFrame, pandas.DataFrame, dict[str, typing.Any], float] a tuple of the inversion results. References ---------- .. footbibliography:: """ constraints_df = constraints_df.copy() with utils._log_level(logging.WARN): # pylint: disable=protected-access # run inversion results = inversion.run_inversion( grav_df=grav_df, progressbar=False, **kwargs, ) prism_results, _, _, _ = results # get last iteration's layer result final_topography = prism_results.set_index(["northing", "easting"]).to_xarray().topo # sample the inverted topography at the constraint points constraints_df = utils.sample_grids( constraints_df, final_topography, "inverted_topo", ) dif = constraints_df.upward - constraints_df.inverted_topo return utils.rmse(dif, as_median=rmse_as_median), results
# pylint: disable=duplicate-code @deprecation.deprecated( deprecated_in="0.8.0", removed_in="0.14.0", current_version=invert4geom.__version__, details=( "Use the new function `optimization.optimize_inversion_zref_density_contrast()`" "instead, which uses Optuna for optimization. If you would still like to " "conduct a grid search, set `grid_search=True` in the new function.", ), )
[docs] def zref_density_optimal_parameter( grav_df: pd.DataFrame, constraints_df: pd.DataFrame, starting_topography: xr.DataArray | None = None, zref_values: list[float] | None = None, density_contrast_values: list[float] | None = None, starting_topography_kwargs: dict[str, typing.Any] | None = None, regional_grav_kwargs: dict[str, typing.Any] | None = None, rmse_as_median: bool = False, progressbar: bool = True, plot_cv: bool = False, results_fname: str | None = None, **kwargs: typing.Any, ) -> tuple[ tuple[pd.DataFrame, pd.DataFrame, dict[str, typing.Any], float], float, float, float, list[typing.Any], list[float], ]: """ Calculate the cross validation scores for a set of zref and density values and return the best score and values. If only 1 parameter is needed to be test, can pass a single value of the other parameter. This uses constraint points, where the target topography is known. The inverted topography at each of these points is compared to the known value and used to calculate the score. Parameters ---------- grav_df : pandas.DataFrame dataframe with gravity data and coordinates, must have coordinate columns "easting", "northing", and "upward", and gravity data column "gravity_anomaly" constraints_df : pandas.DataFrame dataframe with points where the topography of interest has been previously measured, to be used for score, must have coordinate columns "easting", "northing", and "upward". starting_topography : xarray.DataArray | None, optional starting topography to use to create the starting prism model. If not provided, will make a flat surface at each provided zref value using the region and spacing values provided in starting_topography_kwargs. zref_values : list[float] | None, optional Reference level values to test, by default None density_contrast_values : list[float] | None, optional Density contrast values to test, by default None starting_topography_kwargs : dict[str, typing.Any] | None, optional region, spacing and dampings used to create a flat starting topography for each zref value, by default None. regional_grav_kwargs : dict[str, typing.Any] | None, optional Keywords used to calculate the regional field, by default None. If method is `constraints` for constraint point minimization, must separate the constraints into testing and training sets and provide the training set to this argument and the testing set to `constraints_df` to avoid biasing the scores. rmse_as_median : bool, optional Use the median instead of the root mean square as the scoring metric, by default False progressbar : bool, optional display a progress bar for the number of tested values, by default True plot_cv : bool, optional plot a graph of scores vs parameter values, by default False results_fname : str, optional file name to save results to, by default "tmp" with an attached random number Returns ------- tuple[ tuple[pandas.DataFrame, pandas.DataFrame, dict[str, typing.Any], float], float, float, float, list[typing.Any], list[float], ] the inversion results, the optimal parameter value, the score associated with it, the parameter values and the scores for each parameter value """ # set file name for saving results with random number between 0 and 999 if results_fname is None: results_fname = f"tmp_{random.randint(0, 999)}" if (zref_values is None) & (density_contrast_values is None): msg = "must provide either or both zref_values and density_contrast_values" raise ValueError(msg) if zref_values is None: zref = kwargs.get("zref") if zref is None: msg = "must provide zref_values or zref in kwargs" raise ValueError(msg) zref_values = [zref] elif density_contrast_values is None: density_contrast = kwargs.get("density_contrast") if density_contrast is None: msg = "must provide density_contrast_values or density_contrast in kwargs" raise ValueError(msg) density_contrast_values = [density_contrast] if starting_topography is None: msg = ( "starting_topography not provided, will create a flat surface at each zref " "value to be the starting topography." ) logger.warning(msg) if starting_topography_kwargs is None: msg = ( "must provide `starting_topography_kwargs` with items `region` " "`spacing`, and `dampings` to create the starting topography for each " "zref level." ) raise ValueError(msg) # raise warning about using constraint point minimization for regional estimation if ( (regional_grav_kwargs is not None) and (regional_grav_kwargs.get("method") == "constraints") and (len(regional_grav_kwargs.get("constraints_df")) == len(constraints_df)) # type: ignore[arg-type] ): msg = ( "Using constraint point minimization technique for regional field " "estimation. This is not recommended as the constraint points are used for " "the density / reference level cross-validation scoring, which biases the " "scoring. Consider using a different method for regional field estimation, " "or set separate constraints in training and testing sets and provide the " "training set to `regional_grav_kwargs` and the testing set to " "constraints_df to use for scoring." ) logger.warning(msg) # create all possible combinations of zref and density contrast parameter_pairs = list(itertools.product(zref_values, density_contrast_values)) # type: ignore[arg-type] if "test" in grav_df.columns: assert grav_df.test.any(), ( "test column contains True value, not needed except for during damping CV" ) # run inversions and collect scores scores = [] if progressbar is True: pbar = tqdm( parameter_pairs, desc="Zref/Density pairs", ) elif progressbar is False: pbar = parameter_pairs else: msg = "progressbar must be a boolean" # type: ignore[unreachable] raise ValueError(msg) for i, (zref, density_contrast) in enumerate(pbar): if starting_topography is None: starting_topo = utils.create_topography( method="flat", dampings=starting_topography_kwargs.get("dampings"), # type: ignore[union-attr] region=starting_topography_kwargs.get("region"), # type: ignore[union-attr, arg-type] spacing=starting_topography_kwargs.get("spacing"), # type: ignore[union-attr, arg-type] upwards=zref, ) else: starting_topo = starting_topography.copy() # re-calculate density grid with new density contrast density_grid = xr.where( starting_topo >= zref, density_contrast, -density_contrast ) # create layer of prisms starting_prisms = utils.grids_to_prisms( starting_topo, reference=zref, density=density_grid, ) # pylint: disable=duplicate-code # calculate forward gravity of starting prism layer grav_df["starting_gravity"] = starting_prisms.prism_layer.gravity( coordinates=( grav_df.easting, grav_df.northing, grav_df.upward, ), field="g_z", progressbar=False, ) # pylint: enable=duplicate-code # calculate regional field reg_kwargs = copy.deepcopy(regional_grav_kwargs) grav_df = regional.regional_separation( grav_df=grav_df, **reg_kwargs, # type: ignore[arg-type] ) # update starting model in kwargs kwargs["prism_layer"] = starting_prisms # pylint: disable=duplicate-code new_kwargs = { key: value for key, value in kwargs.items() if key not in [ "zref", "density_contrast", ] } # pylint: enable=duplicate-code # run cross validation score, _ = constraints_cv_score( grav_df=grav_df, constraints_df=constraints_df, results_fname=f"{results_fname}_trial_{i}", rmse_as_median=rmse_as_median, **new_kwargs, ) scores.append(score) # print parameter and score pairs for (zref, density_contrast), score in zip(parameter_pairs, scores, strict=False): logger.info( "Reference level: %s, Density contrast: %s -> Score: %s", zref, density_contrast, score, ) best_idx = np.argmin(scores) best_score = scores[best_idx] best_zref = parameter_pairs[best_idx][0] best_density = parameter_pairs[best_idx][1] logger.info( "Best score of %s with reference level=%s and density contrast=%s", best_score, best_zref, best_density, ) # get best inversion result of each set with pathlib.Path(f"{results_fname}_trial_{best_idx}.pickle").open("rb") as f: inv_results = pickle.load(f) # delete other inversion results for i in range(len(scores)): if i == best_idx: pass else: pathlib.Path(f"{results_fname}_trial_{i}.pickle").unlink(missing_ok=True) # put scores and parameter pairs into dict results = { "scores": scores, "zref_values": parameter_pairs[0], "density_contrast_values": parameter_pairs[1], } # remove if exists pathlib.Path(results_fname).unlink(missing_ok=True) # save scores and parameter pairs to pickle with pathlib.Path(f"{results_fname}.pickle").open("wb") as f: pickle.dump(results, f) if plot_cv: try: if len(zref_values) == 1: plotting.plot_cv_scores( scores, density_contrast_values, # type: ignore[arg-type] param_name="Density contrast (kg/m$^3$)", plot_title="Density contrast Cross-validation", # logx=True, # logy=True, ) elif len(density_contrast_values) == 1: # type: ignore[arg-type] plotting.plot_cv_scores( scores, zref_values, param_name="Reference level (m)", plot_title="Reference level Cross-validation", # logx=True, # logy=True, ) else: plotting.plot_2_parameter_cv_scores( scores, parameter_pairs, param_names=("Reference level (m)", "Density contrast (kg/m$^3$)"), # logx=True, # logy=True, ) except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return inv_results, best_zref, best_density, best_score, parameter_pairs, scores
# pylint: enable=duplicate-code
[docs] def random_split_test_train( data_df: pd.DataFrame, test_size: float = 0.3, random_state: int = 10, plot: bool = False, ) -> pd.DataFrame: """ split data into training and testing sets randomly with a specified percentage of points to be in the test set set by test_size. Parameters ---------- data_df : pandas.DataFrame data to be split, must have columns "easting" and "northing". test_size : float, optional decimal percentage of points to put in the testing set, by default 0.3 random_state : int, optional number to set th random splitting, by default 10 plot : bool, optional choose to plot the results, by default False Returns ------- pandas.DataFrame dataframe with a new column "test" which is a boolean value of whether the row is in the training or testing set. """ data_df = data_df.copy() train, test = vd.train_test_split( coordinates=(data_df.easting, data_df.northing), data=data_df.index, test_size=test_size, random_state=random_state, ) train_df = pd.DataFrame( data={ "easting": train[0][0], "northing": train[0][1], "index": train[1][0], "test": False, } ) test_df = pd.DataFrame( data={ "easting": test[0][0], "northing": test[0][1], "index": test[1][0], "test": True, } ) random_split_df = pd.concat([train_df, test_df]) random_split_df = pd.merge(data_df, random_split_df, on=["easting", "northing"]) # noqa: PD015 random_split_df = random_split_df.drop(columns=["index"]) if plot is True: try: df_train = random_split_df[random_split_df.test == False] # noqa: E712 # pylint: disable=singleton-comparison df_test = random_split_df[random_split_df.test == True] # noqa: E712 # pylint: disable=singleton-comparison region = vd.get_region((random_split_df.easting, random_split_df.northing)) plot_region = vd.pad_region(region, (region[1] - region[0]) / 10) fig = maps.basemap( region=plot_region, title="Random split", ) fig.plot( x=df_train.easting, y=df_train.northing, style="c.3c", fill="blue", label="Train", ) fig.plot( x=df_test.easting, y=df_test.northing, style="t.5c", fill="red", label="Test", ) maps.add_box(fig, box=region) fig.legend() fig.show() except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return random_split_df
[docs] def split_test_train( data_df: pd.DataFrame, method: str, spacing: float | tuple[float, float] | None = None, shape: tuple[float, float] | None = None, n_splits: int = 5, random_state: int = 10, plot: bool = False, ) -> pd.DataFrame: """ Split data into training or testing sets either using KFold (optional blocked) or LeaveOneOut methods. Parameters ---------- data_df : pandas.DataFrame dataframe with coordinate columns "easting" and "northing" method : str choose between "LeaveOneOut" or "KFold" methods. spacing : float | tuple[float, float] | None, optional grid spacing to use for Block K-Folds, by default None shape : tuple[float, float] | None, optional number of blocks to use for Block K-Folds, by default None n_splits : int, optional number for folds to make for K-Folds method, by default 5 random_state : int, optional random state used for both methods, by default 10 plot : bool, optional plot the separated training and testing dataset, by default False Returns ------- pandas.DataFrame a dataset with a new column for each fold in the form fold_0, fold_1 etc., with the value "train" or "test" """ df = data_df.copy() if method == "LeaveOneOut": kfold = sklearn.model_selection.LeaveOneOut() elif method == "KFold": if n_splits > len(df): msg = ( "n_splits must be less than or equal to the number of data points, " "decreasing n_splits" ) logger.warning(msg) n_splits = len(df) if n_splits == 1: msg = "n_splits must be greater than 1" raise ValueError(msg) if spacing or shape is None: kfold = sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=random_state, ) else: kfold = vd.BlockKFold( spacing=spacing, shape=shape, n_splits=n_splits, shuffle=True, random_state=random_state, ) # kfold = vd.BlockShuffleSplit( # spacing=spacing, # shape=shape, # n_splits=n_splits, # test_size=test_size, # random_state = random_state, # ) else: msg = "invalid string for `method`" raise ValueError(msg) coords = (df.easting, df.northing) feature_matrix = np.transpose(coords) coord_shape = coords[0].shape mask = np.full(shape=coord_shape, fill_value=" ") for iteration, (train, test) in enumerate(kfold.split(feature_matrix)): mask[np.unravel_index(train, coord_shape)] = "train" mask[np.unravel_index(test, coord_shape)] = "_test" df = pd.concat( [df, pd.DataFrame({f"fold_{iteration}": mask}, index=df.index)], axis=1 ) df = df.replace("_test", "test") if plot is True: try: folds = list(df.columns[df.columns.str.startswith("fold_")]) _, ncols = polar_utils.square_subplots(len(folds)) df = df.copy() for i in range(len(folds)): if i == 0: fig = None origin_shift = "initialize" xshift_amount = None yshift_amount = None elif i % ncols == 0: origin_shift = "both" xshift_amount = -ncols + 1 yshift_amount = -1 else: origin_shift = "x" xshift_amount = 1 yshift_amount = 1 df_test = df[df[f"fold_{i}"] == "test"] df_train = df[df[f"fold_{i}"] == "train"] region = vd.get_region((df.easting, df.northing)) plot_region = vd.pad_region(region, (region[1] - region[0]) / 10) fig = maps.basemap( region=plot_region, title=f"Fold {i} ({len(df_test)} testing points)", origin_shift=origin_shift, xshift_amount=xshift_amount, yshift_amount=yshift_amount, fig=fig, ) maps.add_box(fig, box=region) fig.plot( x=df_train.easting, y=df_train.northing, style="c.4c", fill="blue", label="Train", ) fig.plot( x=df_test.easting, y=df_test.northing, style="t.7c", fill="red", label="Test", ) fig.legend() fig.show() # type: ignore[union-attr] except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return df
[docs] def kfold_df_to_lists( df: pd.DataFrame, ) -> tuple[list[pd.DataFrame], list[pd.DataFrame]]: """ convert a single dataframe with fold columns in the form fold_0, fold_1 etc. into a list of testing dataframes for each fold and a list of training dataframes for each fold. Parameters ---------- df : pandas.DataFrame dataframe with fold columns in the form fold_0, fold_1 etc., as output by function `split_test_train()`. Returns ------- test_dfs : list[pandas.DataFrame] a list of testing dataframes for each fold train_dfs : list[pandas.DataFrame] a list of training dataframes for each fold """ # get list of fold column names folds = list(df.columns[df.columns.str.startswith("fold_")]) test_dfs = [] train_dfs = [] # add train and test df for each fold to each own list for f in folds: # remove other fold column for clarity cols_to_remove = [i for i in folds if i != f] df1 = df.drop(cols_to_remove, axis=1) # append new dfs to lists test_dfs.append(df1[df1[f] == "test"]) train_dfs.append(df1[df1[f] == "train"]) return test_dfs, train_dfs
[docs] def eq_sources_score( coordinates: tuple[NDArray, NDArray, NDArray], data: pd.Series | NDArray, delayed: bool = False, weights: NDArray | None = None, **kwargs: typing.Any, ) -> float: """ Calculate the cross-validation score for fitting gravity data to equivalent sources. Uses Verde's cross_val_score function to calculate the score. All kwargs are passed to the harmonica.EquivalentSources class. Parameters ---------- coordinates : tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] tuple of easting, northing, and upward coordinates of the gravity data data : pandas.Series | numpy.ndarray the gravity data delayed : bool, optional compute the scores in parallel if True, by default False weights : numpy.ndarray | None, optional optional weight values for each gravity data point, by default None Keyword Arguments ----------------- damping : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated coefficients. If None, no regularization is used. points : list[numpy.ndarray] | None List containing the coordinates of the equivalent point sources. Coordinates are assumed to be in the following order: (``easting``, ``northing``, ``upward``). If None, will place one point source below each observation point at a fixed relative depth below the observation point. Defaults to None. depth : float | str Parameter used to control the depth at which the point sources will be located. If a value is provided, each source is located beneath each data point (or block-averaged location) at a depth equal to its elevation minus the ``depth`` value. If set to ``"default"``, the depth of the sources will be estimated as 4.5 times the mean distance between first neighboring sources. This parameter is ignored if *points* is specified. Defaults to ``"default"``. block_size: float | tuple[float, float] | None Size of the blocks used on block-averaged equivalent sources. If a single value is passed, the blocks will have a square shape. Alternatively, the dimensions of the blocks in the South-North and West-East directions can be specified by passing a tuple. If None, no block-averaging is applied. This parameter is ignored if *points* are specified. Default to None. parallel : bool If True any predictions and Jacobian building is carried out in parallel through Numba's ``jit.prange``, reducing the computation time. If False, these tasks will be run on a single CPU. Default to True. dtype : str The desired data-type for the predictions and the Jacobian matrix. Default to ``"float64"``. Returns ------- float a float of the score, the higher the value to better the fit. """ if np.isnan(coordinates).any(): msg = "coordinates contain NaN" raise ValueError(msg) if np.isnan(data).any(): msg = "data contains is NaN" raise ValueError(msg) eqs = hm.EquivalentSources( **kwargs, ) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", category=sklearn.exceptions.UndefinedMetricWarning ) score = np.nan n_splits = 5 while np.isnan(score): try: score = np.mean( vd.cross_val_score( eqs, coordinates, data, delayed=delayed, weights=weights, cv=sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=0, ), scoring="r2", ) ) except ValueError: score = np.nan if (n_splits == 5) and (np.isnan(score)): msg = ( "eq sources score is NaN, reducing n_splits (5) by 1 until " "scoring metric is defined" ) logger.warning(msg) n_splits -= 1 if n_splits == 0: break if np.isnan(score): msg = ( "score is still NaN after reducing n_splits, makes sure you're supplying " "enough points for the equivalent sources" ) raise ValueError(msg) return score # type: ignore[no-any-return]
[docs] def regional_separation_score( testing_df: pd.DataFrame, score_as_median: bool = False, **kwargs: typing.Any, ) -> tuple[float, float, float | None, pd.DataFrame]: """ Evaluate the effectiveness of the gravity regional-residual separation. The optimal regional component is that which results in a residual component which is lowest at constraint points, while still contains a high amplitude elsewhere. Parameters ---------- testing_df : pandas.DataFrame dataframe containing a priori measurements of the topography of interest with columns "upward", "easting", and "northing" score_as_median : bool, optional switch from using the root mean square to the root median square for the score, by default is False., by default False **kwargs: typing.Any, additional keyword arguments for the specified method. Returns ------- residual_constraint_score : float the RMS of the residual at constraint points residual_amplitude_score : float the RMS of the residuals amplitude at all grid points true_reg_score : float | None the RMSE between the true regional field and the estimated field, if provided, otherwise None df_anomalies : pandas.DataFrame the dataframe of the regional and residual gravity anomalies """ # pull out kwargs kwargs = copy.deepcopy(kwargs) method = kwargs.pop("method") grav_df = kwargs.pop("grav_df") true_regional = kwargs.pop("true_regional", None) remove_starting_grav_mean = kwargs.pop("remove_starting_grav_mean", False) if method == "constraints_cv": msg = ( "method `constraints_cv` internally calculated regional separation scores " "so it should not be used here." ) raise ValueError(msg) df_anomalies = regional.regional_separation( method=method, grav_df=grav_df, remove_starting_grav_mean=remove_starting_grav_mean, **kwargs, ) # grid the anomalies grid = df_anomalies.set_index(["northing", "easting"]).to_xarray() # sample the residual and regional at the constraint points df = utils.sample_grids( df=testing_df, grid=grid.res, sampled_name="res", ) residual_constraint_score = utils.rmse(df.res, as_median=score_as_median) if np.isnan(residual_constraint_score): msg = "residual_constraint_score is NaN" raise ValueError(msg) residual_amplitude_score = utils.rmse(grid.res, as_median=score_as_median) if np.isnan(residual_amplitude_score): msg = "residual_amplitude_score is NaN" raise ValueError(msg) if true_regional is not None: true_reg_score = utils.rmse( np.abs(true_regional - grid.reg), as_median=score_as_median ) else: true_reg_score = None # misfit = reg + res # want residual at constraint to be low # don't want this accomplished by having reg = misfit everywhere # optimize on # 1) low residual at constraints # 2) high residual everywhere else return ( residual_constraint_score, residual_amplitude_score, true_reg_score, df_anomalies, )