Source code for invert4geom.utils

from __future__ import annotations  # pylint: disable=too-many-lines

import copy
import os
import typing
import warnings
from contextlib import contextmanager

import dask
import deprecation
import harmonica as hm
import numpy as np
import pandas as pd
import pygmt
import sklearn
import verde as vd
import xarray as xr
import xrft
from numpy.typing import NDArray
from pykdtree.kdtree import KDTree  # pylint: disable=no-name-in-module

import invert4geom
from invert4geom import cross_validation, log, plotting


@contextmanager
[docs] def _log_level(level): # type: ignore[no-untyped-def] "Run body with logger at a different level" saved_logger_level = log.level log.setLevel(level) try: yield saved_logger_level finally: log.setLevel(saved_logger_level)
@contextmanager
[docs] def environ(**env): # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring """temporarily set/reset an environment variable""" originals = {k: os.environ.get(k) for k in env} for k, val in env.items(): if val is None: os.environ.pop(k, None) else: os.environ[k] = val try: yield finally: for k, val in originals.items(): if val is None: os.environ.pop(k, None) else: os.environ[k] = val
[docs] class DuplicateFilter: """ Filters away duplicate log messages. Adapted from https://stackoverflow.com/a/60462619/18686384 """ def __init__(self, logger): # type: ignore[no-untyped-def]
[docs] self.msgs = set()
[docs] self.logger = logger
[docs] def filter(self, record): # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring msg = str(record.msg) is_duplicate = msg in self.msgs if not is_duplicate: self.msgs.add(msg) return not is_duplicate
[docs] def __enter__(self): # type: ignore[no-untyped-def] self.logger.addFilter(self)
[docs] def __exit__(self, exc_type, exc_val, exc_tb): # type: ignore[no-untyped-def] self.logger.removeFilter(self)
[docs] def _check_constraints_inside_gravity_region( constraints_df: pd.DataFrame, grav_df: pd.DataFrame, ) -> None: """check that all constraints are inside the region of the gravity data""" grav_region = vd.get_region((grav_df.easting, grav_df.northing)) inside = vd.inside( (constraints_df.easting, constraints_df.northing), region=grav_region ) if not inside.all(): msg = ( "Some constraints are outside the region of the gravity data. " "This may result in unexpected behavior." ) log.warning(msg)
[docs] def _check_gravity_inside_topography_region( grav_df: pd.DataFrame, topography: xr.DataArray, ) -> None: """check that all gravity data is inside the region of the topography grid""" topo_region = vd.get_region((topography.easting.values, topography.northing.values)) inside = vd.inside((grav_df.easting, grav_df.northing), region=topo_region) if not inside.all(): msg = ( "Some gravity data are outside the region of the topography grid. " "This may result in unexpected behavior." ) raise ValueError(msg)
[docs] def rmse(data: NDArray, as_median: bool = False) -> float: """ function to give the root mean/median squared error (RMSE) of data Parameters ---------- data : numpy.ndarray input data as_median : bool, optional choose to give root median squared error instead, by default False Returns ------- float RMSE value """ if as_median: value: float = np.sqrt(np.nanmedian(data**2).item()) else: value = np.sqrt(np.nanmean(data**2).item()) return value
[docs] def nearest_grid_fill( grid: xr.DataArray, method: str = "verde", crs: str | None = None, ) -> xr.DataArray: """ fill missing values in a grid with the nearest value. Parameters ---------- grid : xarray.DataArray grid with missing values method : str, optional choose method of filling, by default "verde" crs : str | None, optional if method is 'rioxarray', provide the crs of the grid, in format 'epsg:xxxx', by default None Returns ------- xarray.DataArray filled grid """ # get coordinate names original_dims = list(grid.sizes.keys()) # get original grid name original_name = grid.name if method == "rioxarray": filled: xr.DataArray = ( grid.rio.write_crs(crs) .rio.set_spatial_dims(original_dims[1], original_dims[0]) .rio.write_nodata(np.nan) .rio.interpolate_na(method="nearest") .rename(original_name) ) elif method == "verde": df = vd.grid_to_table(grid) df_dropped = df[df[grid.name].notnull()] coords = (df_dropped[grid.dims[1]], df_dropped[grid.dims[0]]) region = vd.get_region((df[grid.dims[1]], df[grid.dims[0]])) filled = ( vd.KNeighbors() .fit(coords, df_dropped[grid.name]) .grid(region=region, shape=grid.shape, data_names=original_name)[ original_name ] ) # elif method == "pygmt": # filled = pygmt.grdfill(grid, mode="n", verbose="q").rename(original_name) else: msg = "method must be 'rioxarray', or 'verde'" raise ValueError(msg) # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") return filled.rename( { next(iter(filled.dims)): original_dims[0], list(filled.dims)[1]: original_dims[1], } )
[docs] def filter_grid( grid: xr.DataArray, filter_width: float | None = None, height_displacement: float | None = None, filt_type: str = "lowpass", pad_width_factor: int = 3, pad_mode: str = "linear_ramp", pad_constant: float | None = None, pad_end_values: float | None = None, ) -> xr.DataArray: """ Apply a spatial filter to a grid. Parameters ---------- grid : xarray.DataArray grid to filter the values of filter_width : float, optional width of the filter in meters, by default None height_displacement : float, optional height displacement for upward continuation, relative to observation height, by default None filt_type : str, optional type of filter to use from 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', 'northing_deriv', 'up_continue', or 'total_gradient', by default "lowpass" pad_width_factor : int, optional factor of grid width to pad the grid by, by default 3, which equates to a pad with a width of 1/3 of the grid width. pad_mode : str, optional mode of padding, can be "linear", by default "linear_ramp" pad_constant : float | None, optional constant value to use for padding, by default None pad_end_values : float | None, optional value to use for end of padding if pad_mode is "linear_ramp", by default None Returns ------- xarray.DataArray a filtered grid """ # get coordinate names original_dims = list(grid.sizes.keys()) # get original grid name original_name = grid.name # if there are nan's, fill them with nearest neighbor if grid.isnull().any(): filled = nearest_grid_fill(grid, method="verde") else: filled = grid.copy() # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") filled = filled.rename( { next(iter(filled.dims)): original_dims[0], list(filled.dims)[1]: original_dims[1], } ) # define width of padding in each direction pad_width = { original_dims[1]: grid[original_dims[1]].size // pad_width_factor, original_dims[0]: grid[original_dims[0]].size // pad_width_factor, } if pad_mode == "constant": if pad_constant is None: pad_constant = filled.median() pad_end_values = None if (pad_mode == "linear_ramp") and (pad_end_values is None): pad_end_values = filled.median() if pad_mode != "constant": pad_constant = ( None # needed until https://github.com/xgcm/xrft/issues/211 is fixed ) # apply padding pad_kwargs = { **pad_width, "mode": pad_mode, "constant_values": pad_constant, "end_values": pad_end_values, } padded = xrft.pad( filled, **pad_kwargs, ) if filt_type == "lowpass": if filter_width is None: msg = "filter_width must be provided if filt_type is 'lowpass'" raise ValueError(msg) filt = hm.gaussian_lowpass(padded, wavelength=filter_width).rename("filt") elif filt_type == "highpass": if filter_width is None: msg = "filter_width must be provided if filt_type is 'highpass'" raise ValueError(msg) filt = hm.gaussian_highpass(padded, wavelength=filter_width).rename("filt") elif filt_type == "up_deriv": filt = hm.derivative_upward(padded).rename("filt") elif filt_type == "easting_deriv": filt = hm.derivative_easting(padded).rename("filt") elif filt_type == "northing_deriv": filt = hm.derivative_northing(padded).rename("filt") elif filt_type == "up_continue": if height_displacement is None: msg = "height_displacement must be provided if filt_type is 'up_continue'" raise ValueError(msg) filt = hm.upward_continuation( padded, height_displacement=height_displacement ).rename("filt") elif filt_type == "total_gradient": filt = hm.total_gradient_amplitude(padded).rename("filt") else: msg = ( "filt_type must be 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', " "'northing_deriv', 'up_continue', or 'total_gradient'" ) raise ValueError(msg) unpadded = xrft.unpad(filt, pad_width) # reset coordinate values to original (avoid rounding errors) unpadded = unpadded.assign_coords( { original_dims[0]: grid[original_dims[0]].to_numpy(), original_dims[1]: grid[original_dims[1]].to_numpy(), } ) if grid.isnull().any(): result: xr.DataArray = xr.where(grid.notnull(), unpadded, grid) else: result = unpadded.copy() # reset coordinate names if changed with warnings.catch_warnings(): warnings.filterwarnings("ignore", message="rename '") result = result.rename( { next(iter(result.dims)): original_dims[0], # list(result.dims)[0]: original_dims[0], list(result.dims)[1]: original_dims[1], } ) return result.rename(original_name)
[docs] def dist_nearest_points( targets: pd.DataFrame, data: pd.DataFrame | xr.DataArray | xr.Dataset, coord_names: tuple[str, str] | None = None, ) -> typing.Any: """ for all grid cells calculate to the distance to the nearest target. Parameters ---------- targets : pandas.DataFrame contains the coordinates of the targets data : pandas.DataFrame | xarray.DataArray | xarray.Dataset the grid data, in either gridded or tabular form coord_names : tuple[str, str] | None, optional the names of the coordinates for both the targets and the data, by default None Returns ------- typing.Any the distance to the nearest target for each gridcell, in the same format as the input for `data`. """ if coord_names is None: coord_names = ("easting", "northing") df_targets = targets[[coord_names[0], coord_names[1]]].copy() df_data: pd.DataFrame | xr.DataArray | xr.Dataset if isinstance(data, pd.DataFrame): df_data = data[list(coord_names)].copy() elif isinstance(data, xr.DataArray): df_grid = vd.grid_to_table(data).dropna() df_data = df_grid[[coord_names[0], coord_names[1]]].copy() # pylint: disable=unsubscriptable-object elif isinstance(data, xr.Dataset): try: df_grid = vd.grid_to_table(data[next(iter(data.variables))]).dropna() # df_grid = vd.grid_to_table(data[list(data.variables)[0]]).dropna() except IndexError: df_grid = vd.grid_to_table(data).dropna() df_data = df_grid[[coord_names[0], coord_names[1]]].copy() # pylint: disable=unsubscriptable-object min_dist, _ = KDTree(df_targets.values).query(df_data.values, 1) df_data["min_dist"] = min_dist if isinstance(data, pd.DataFrame): return df_data return df_data.set_index([coord_names[0], coord_names[1]][::-1]).to_xarray()
[docs] def normalize( x: NDArray, low: float = 0, high: float = 1, ) -> NDArray: """ Normalize a list of numbers between provided values Parameters ---------- x : NDArray numbers to normalize low : float, optional lower value for normalization, by default 0 high : float, optional higher value for normalization, by default 1 Returns ------- NDArray a normalized list of numbers """ min_val = np.min(x) max_val = np.max(x) norm = (x - min_val) / (max_val - min_val) return norm * (high - low) + low
[docs] def normalize_xarray( da: xr.DataArray, low: float = 0, high: float = 1, ) -> xr.DataArray: """ Normalize a grid between provided values Parameters ---------- da : xarray.DataArray grid to normalize low : float, optional lower value for normalization, by default 0 high : float, optional higher value for normalization, by default 1 Returns ------- xarray.DataArray a normalized grid """ # min_val = da.values.min() # max_val = da.values.max() da = da.copy() min_val = da.quantile(0) max_val = da.quantile(1) da2: xr.DataArray = (high - low) * ( ((da - min_val) / (max_val - min_val)).clip(0, 1) ) + low return da2.drop("quantile")
[docs] def scale_normalized( sample: NDArray, bounds: NDArray, ) -> NDArray: """ Rescales the sample space into the unit hypercube, bounds = [0,1] Parameters ---------- sample : NDArray sampled values bounds : NDArray bounds of the sampling Returns ------- NDArray sampled values normalized from 0 to 1 """ scaled_sample = np.zeros(sample.shape) for j in range(sample.shape[1]): scaled_sample[:, j] = (sample[:, j] - bounds[j][0]) / ( bounds[j][1] - bounds[j][0] ) return scaled_sample
[docs] def normalized_mindist( points: pd.DataFrame, grid: xr.DataArray, low: float | None = None, high: float | None = None, mindist: float | None = None, region: list[float] | None = None, ) -> xr.DataArray: """ Find the minimum distance between each grid cell and the nearest point. If low and high are provided, normalize the min dists grid between these values. If region is provided, all grid cells outside region are set to a distance of 0. Parameters ---------- points : pandas.DataFrame coordinates of the points grid : xarray.DataArray gridded data to find min dists for each grid cell low : float | None, optional lower value for normalization, by default None high : float | None, optional higher value for normalization, by default None mindist : float | None, optional the minimum allowed distance, all values below are set equal to, by default None region : list[float] | None, optional bounding region for which all grid cells outside will be set to low, by default None Returns ------- xarray.DataArray grid of normalized minimum distances """ grid = copy.deepcopy(grid) # get coordinate names original_dims = list(grid.sizes.keys()) constraints_df = points.copy() min_dist: xr.DataArray = dist_nearest_points( targets=constraints_df, data=grid, coord_names=(str(original_dims[1]), str(original_dims[0])), ).min_dist # set points < mindist to low if mindist is not None: min_dist = xr.where(min_dist < mindist, 0, min_dist) # set points outside of region to low if region is not None: df = vd.grid_to_table(min_dist) df["are_inside"] = vd.inside( (df[original_dims[1]], df[original_dims[0]]), region=region, ) new_min_dist = df.set_index([original_dims[0], original_dims[1]]).to_xarray() new_min_dist = xr.where(new_min_dist.are_inside, new_min_dist, 0) # add nans back new_min_dist = xr.where(min_dist.isnull(), np.nan, new_min_dist) min_dist = new_min_dist.min_dist # normalize from low to high if (low is None) & (high is None): pass else: assert low is not None assert high is not None min_dist = normalize_xarray(min_dist, low=low, high=high) return min_dist
[docs] def sample_grids( df: pd.DataFrame, grid: str | xr.DataArray, sampled_name: str, **kwargs: typing.Any, ) -> pd.DataFrame: """ Sample data at every point along a line Parameters ---------- df : pandas.DataFrame Dataframe containing columns 'x', 'y', or columns with names defined by kwarg "coord_names". grid : str or xarray.DataArray Grid to sample, either file name or xarray.DataArray sampled_name : str, Name for sampled column Returns ------- pandas.DataFrame Dataframe with new column (sampled_name) of sample values from (grid) """ # drop name column if it already exists try: df1 = df.drop(columns=sampled_name) except KeyError: df1 = df.copy() if "index" in df1.columns: msg = "index column must be removed or renamed before sampling" raise ValueError(msg) df2 = df1.copy() # reset the index df3 = df2.reset_index() # get x and y column names x, y = kwargs.get("coord_names", ("easting", "northing")) # check column names exist, if not, use other common names if (x in df3.columns) and (y in df3.columns): pass elif ("x" in df3.columns) and ("y" in df3.columns): x, y = ("x", "y") # get points to sample at points = df3[[x, y]].copy() # sample the grid at all x,y points sampled = pygmt.grdtrack( points=points, grid=grid, newcolname=sampled_name, # radius=kwargs.get("radius", None), no_skip=True, # if false causes issues verbose=kwargs.get("verbose", "w"), interpolation=kwargs.get("interpolation", "c"), ) df3[sampled_name] = sampled[sampled_name] # reset index to previous df4 = df3.set_index("index") # reset index name to be same as originals df4.index.name = df1.index.name # check that dataframe is identical to original except for new column pd.testing.assert_frame_equal(df4.drop(columns=sampled_name), df1) return df4
[docs] def extract_prism_data( prism_layer: xr.Dataset, ) -> tuple[ pd.DataFrame, xr.Dataset, float, xr.DataArray, ]: """ extract the grid spacing from the starting prism layer and adds variables 'topo' and 'starting_topo', which are the both the starting topography elevation. 'starting_topo' remains unchanged, while 'topo' is updated at each iteration. Parameters ---------- prism_layer : xarray.Dataset starting model prism layer Returns ------- prisms_df : pandas.DataFrame dataframe of prism layer prisms_ds : xarray.Dataset prism layer with added variables 'topo' and 'starting_topo' spacing : float spacing of prisms topo_grid : xarray.DataArray grid of starting topography """ prisms_ds = copy.deepcopy(prism_layer.load()) # add starting topo to dataset topo_grid = xr.where(prisms_ds.density > 0, prisms_ds.top, prisms_ds.bottom) prisms_ds["topo"] = topo_grid prisms_ds["starting_topo"] = topo_grid # turn dataset into dataframe prisms_df = prisms_ds.to_dataframe().reset_index().dropna().astype(float) spacing = get_spacing(prisms_df) return prisms_df, prisms_ds, spacing, topo_grid
[docs] def get_spacing(prisms_df: pd.DataFrame) -> float: """ Extract spacing of harmonica prism layer using a dataframe representation. Parameters ---------- prisms_df : pandas.DataFrame dataframe of harmonica prism layer Returns ------- float spacing of prisms """ return float(abs(prisms_df.northing.unique()[1] - prisms_df.northing.unique()[0]))
[docs] def sample_bounding_surfaces( prisms_df: pd.DataFrame, upper_confining_layer: xr.DataArray | None = None, lower_confining_layer: xr.DataArray | None = None, ) -> pd.DataFrame: """ sample upper and/or lower confining layers into prisms dataframe Parameters ---------- prisms_df : pandas.DataFrame dataframe of prism properties upper_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be below, by default None lower_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be above, by default None Returns ------- pandas.DataFrame a dataframe with added columns 'upper_bounds' and 'lower_bounds', which are the sampled values of the supplied confining grids. """ df = prisms_df.copy() if upper_confining_layer is not None: df = sample_grids( df=df, grid=upper_confining_layer, sampled_name="upper_bounds", ) assert len(df.upper_bounds) != 0 if lower_confining_layer is not None: df = sample_grids( df=df, grid=lower_confining_layer, sampled_name="lower_bounds", ) assert len(df.lower_bounds) != 0 return df
[docs] def enforce_confining_surface( prisms_df: pd.DataFrame, iteration_number: int, ) -> pd.DataFrame: """ alter the surface correction values to ensure when added to the current iteration's topography it doesn't intersect optional confining layers. Parameters ---------- prisms_df : pandas.DataFrame prism layer dataframe with optional 'upper_bounds' or 'lower_bounds' columns, and current iteration's topography. iteration_number : int number of the current iteration, starting at 1 not 0 Returns ------- pandas.DataFrame a dataframe with added column 'iter_{iteration_number}_correction """ df = prisms_df.copy() if "upper_bounds" in df: # get max upward change allowed for each prism # positive values indicate max allowed upward change # negative values indicate topography is already too far above upper bound df["max_change_above"] = df.upper_bounds - df.topo number_enforced = 0 for i, j in enumerate(df[f"iter_{iteration_number}_correction"]): if j > df.max_change_above[i]: number_enforced += 1 df.loc[i, f"iter_{iteration_number}_correction"] = df.max_change_above[ i ] log.info("enforced upper confining surface at %s prisms", number_enforced) if "lower_bounds" in df: # get max downward change allowed for each prism # negative values indicate max allowed downward change # positive values indicate topography is already too far below lower bound df["max_change_below"] = df.lower_bounds - df.topo number_enforced = 0 for i, j in enumerate(df[f"iter_{iteration_number}_correction"]): if j < df.max_change_below[i]: number_enforced += 1 df.loc[i, f"iter_{iteration_number}_correction"] = df.max_change_below[ i ] log.info("enforced lower confining surface at %s prisms", number_enforced) # check that when constrained correction is added to topo it doesn't intersect # either bounding layer updated_topo: pd.Series[float] = df[f"iter_{iteration_number}_correction"] + df.topo if "upper_bounds" in df and np.any((df.upper_bounds - updated_topo) < -0.001): msg = ( "Constraining didn't work and updated topography intersects upper " "constraining surface" ) raise ValueError(msg) if "lower_bounds" in df and np.any((updated_topo - df.lower_bounds) < -0.001): msg = ( "Constraining didn't work and updated topography intersects lower " "constraining surface" ) raise ValueError(msg) return df
[docs] def apply_surface_correction( prisms_df: pd.DataFrame, iteration_number: int, ) -> tuple[pd.DataFrame, xr.DataArray]: """ update the prisms dataframe and dataset with the surface correction. Ensure that the updated surface doesn't intersect the optional confining surfaces. Parameters ---------- prisms_df : pandas.DataFrame dataframe of prism properties iteration_number : int the iteration number, starting at 1 not 0 Returns ------- tuple[pandas.DataFrame, xarray.DataArray] updated prisms dataframe and correction grid """ df = prisms_df.copy() # for negative densities, negate the correction df.loc[df.density < 0, f"iter_{iteration_number}_correction"] *= -1 # optionally constrain the surface correction with bounding surfaces df = enforce_confining_surface(df, iteration_number) # grid the corrections correction_grid = ( df.rename(columns={f"iter_{iteration_number}_correction": "z"}) .set_index(["northing", "easting"]) .to_xarray() .z ) return df, correction_grid
[docs] def update_prisms_ds( prisms_ds: xr.Dataset, correction_grid: xr.DataArray, ) -> xr.Dataset: """ apply the corrections grid and update the prism tops, bottoms, topo, and densities. Parameters ---------- prisms_ds : xarray.Dataset harmonica prism layer correction_grid : xarray.DataArray grid of corrections to apply to the prism layer Returns ------- xarray.Dataset updated prism layer with new tops, bottoms, topo, and densities """ ds = prisms_ds.copy() # extract the reference value used to create the prisms zref = ds.attrs.get("zref") # extract the element-wise absolute value of the density contrast density_contrast = np.fabs(ds.density) # create topo from top and bottom topo_grid = xr.where(ds.density > 0, ds.top, ds.bottom) # apply correction to topo topo_grid += correction_grid # update the prism layer ds.prism_layer.update_top_bottom(surface=topo_grid, reference=zref) # update the density ds["density"] = xr.where(ds.top > zref, density_contrast, -density_contrast) # update the topo ds["topo"] = topo_grid return ds
[docs] def add_updated_prism_properties( prisms_df: pd.DataFrame, prisms_ds: xr.Dataset, iteration_number: int, ) -> pd.DataFrame: """ update the prisms dataframe the the new prism tops, bottoms, topo, and densities the iteration number, starting at 1 not 0 Parameters ---------- prisms_df : pandas.DataFrame dataframe of prism properties prisms_ds : xarray.Dataset dataset of prism properties iteration_number : int the iteration number, starting at 1 not 0 Returns ------- pandas.DataFrame updated prism dataframe with new tops, bottoms, topo, and densities """ df = prisms_df.copy() ds = prisms_ds.copy() # turn back into dataframe prisms_iter = ds.to_dataframe().reset_index().dropna().astype(float) # add new cols to dict dict_of_cols = { f"iter_{iteration_number}_top": prisms_iter.top, f"iter_{iteration_number}_bottom": prisms_iter.bottom, f"iter_{iteration_number}_density": prisms_iter.density, f"iter_{iteration_number}_layer": prisms_iter.topo, } df = pd.concat([df, pd.DataFrame(dict_of_cols)], axis=1) df["topo"] = prisms_iter.topo return df
[docs] def create_topography( method: str, region: tuple[float, float, float, float], spacing: float, dampings: list[float] | None = None, registration: str = "g", upwards: float | None = None, constraints_df: pd.DataFrame | None = None, weights: pd.Series | NDArray | None = None, weights_col: str | None = None, upper_confining_layer: xr.DataArray | None = None, lower_confining_layer: xr.DataArray | None = None, ) -> xr.DataArray: """ Create a grid of topography data from either the interpolation of point data or creating a grid of constant value. Optionally, a subset of point data can be interpolated and then merged with an existing grid. The this, constraints_df must contain two additional columns of booleans, `inside` which is True for points inside the region of interest, and False otherwise, and `buffer` which is True for points within a buffer region around the region of interest, and False otherwise. Inside and Buffer points are used to interpolated the data, and then the interpolated data (without the buffer zone) is merged with the points outside the region of interest. Parameters ---------- method : str method to use, either 'flat' or 'splines' region : tuple[float, float, float, float] region of the grid spacing : float spacing of the grid dampings : list[float] | None, optional damping values to use in spline cross validation for method "spline", by default None registration : str, optional choose between gridline "g" or pixel "p" registration, by default "g" upwards : float | None, optional constant value to use for method "flat", by default None constraints_df : pandas.DataFrame | None, optional dataframe with column 'upwards' to use for method "splines", and optionally columns 'inside' and 'buffer', by default None weights : pandas.Series | numpy.ndarray | None, optional weight to use for fitting the spline. Typically, this should be 1 over the data uncertainty squared, by default None weights_col : str | None, optional instead of passing the weights, pass the name of the column containing the weights, by default None upper_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be below, by default None lower_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be above, by default None Returns ------- xarray.DataArray a topography grid """ if method == "flat": if registration == "g": pixel_register = False elif registration == "p": pixel_register = True else: msg = "registration must be 'g' or 'p'" raise ValueError(msg) if upwards is None: msg = "upwards must be provided if method is `flat`" raise ValueError(msg) # create grid of coordinates (x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking region=region, spacing=spacing, pixel_register=pixel_register, ) # make flat topography of value = upwards grid = vd.make_xarray_grid( (x, y), np.ones_like(x) * upwards, data_names="upward", dims=("northing", "easting"), ).upward elif method == "splines": # get coordinates of the constraint points if constraints_df is None: msg = "constraints_df must be provided if method is `splines`" raise ValueError(msg) df = constraints_df.copy() # if only 1 point, return a flat topography if len(df) == 1: # create grid of coordinates (x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking region=region, spacing=spacing, ) # make flat topography of value = upwards grid = vd.make_xarray_grid( (x, y), np.ones_like(x) * df.upward.values, data_names="upward", dims=("northing", "easting"), ).upward if pd.Series(["inside", "buffer"]).isin(df.columns).all(): df_to_interpolate = df[df.inside | df.buffer] df_outside_buffer = df[(df.inside == False) & (df.buffer == False)] # noqa: E712 # pylint: disable=singleton-comparison coords = (df_to_interpolate.easting, df_to_interpolate.northing) if weights_col is not None: weights = df_to_interpolate[weights_col] # run CV for fitting a spline to the data spline = best_spline_cv( coordinates=coords, data=df_to_interpolate.upward, weights=weights, dampings=dampings, ) # grid the fitted spline at desired spacing and region inside_grid = spline.grid( region=region, spacing=spacing, ).scalars # merge interpolation of inner / buffer points with outside grid # outside_grid = df_outside_buffer.set_index( # ["northing", "easting"]).to_xarray().upward # outside_grid = vd.make_xarray_grid( # (df_outside_buffer.easting, df_outside_buffer.northing), # df_outside_buffer.upward, # data_names="upward", # ) outside_grid = pygmt.xyz2grd( x=df_outside_buffer.easting, y=df_outside_buffer.northing, z=df_outside_buffer.upward, region=region, spacing=spacing, ).rename({"x": "easting", "y": "northing"}) grid = inside_grid.where( outside_grid.isnull(), outside_grid, ) else: coords = (df.easting, df.northing) if weights_col is not None: weights = df[weights_col] # run CV for fitting a spline to the data spline = best_spline_cv( coordinates=coords, data=df.upward, weights=weights, dampings=dampings, ) # grid the fitted spline at desired spacing and region grid = spline.grid( region=region, spacing=spacing, ).scalars try: grid = grid.assign_attrs(damping=spline.damping_) except AttributeError: grid = grid.assign_attrs(damping=None) else: msg = "method must be 'flat' or 'splines'" raise ValueError(msg) # ensure grid doesn't cross supplied confining layers if upper_confining_layer is not None: grid = grid.where(grid <= upper_confining_layer, upper_confining_layer) if lower_confining_layer is not None: grid = grid.where(grid >= lower_confining_layer, lower_confining_layer) return grid
[docs] def grids_to_prisms( surface: xr.DataArray, reference: float | xr.DataArray, density: float | int | xr.DataArray, input_coord_names: tuple[str, str] = ("easting", "northing"), ) -> xr.Dataset: """ create a Harmonica layer of prisms with assigned densities. Parameters ---------- surface : xarray.DataArray data to use for prism surface reference : float | xarray.DataArray data or constant to use for prism reference, if value is below surface, prism will be inverted density : float | int | xarray.DataArray data or constant to use for prism densities, should be in the form of a density contrast across a surface (i.e. between air and rock). input_coord_names : tuple[str, str], optional names of the coordinates in the input dataarray, by default ("easting", "northing") Returns ------- xarray.Dataset a prisms layer with assigned densities """ # if density provided as a single number, use it for all prisms if isinstance(density, (float, int)): dens = density * np.ones_like(surface) # if density provided as a dataarray, map each density to the correct prisms elif isinstance(density, xr.DataArray): dens = density else: msg = "invalid density type, should be a number or DataArray" raise ValueError(msg) # create layer of prisms based off input dataarrays prisms = hm.prism_layer( coordinates=( surface[input_coord_names[0]].values, surface[input_coord_names[1]].values, ), surface=surface, reference=reference, properties={ "density": dens, }, ) prisms["thickness"] = prisms.top - prisms.bottom # add zref as an attribute return prisms.assign_attrs(zref=reference)
[docs] def best_spline_cv( coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray], data: pd.Series | NDArray, weights: pd.Series | NDArray | None = None, **kwargs: typing.Any, ) -> vd.Spline: """ Find the best damping parameter for a verde.SplineCV() fit. All kwargs are passed to the verde.SplineCV class. Parameters ---------- coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | \ numpy.ndarray] easting and northing coordinates of the data data : pandas.Series | numpy.ndarray data for fitting the spline to weights : pandas.Series | numpy.ndarray | None, optional if not None, then the weights assigned to each data point. Typically, this should be 1 over the data uncertainty squared, by default None Keyword Arguments ----------------- dampings : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated forces. If None, no regularization is used, by default None force_coords : bool The easting and northing coordinates of the point forces. If None (default), then will be set to the data coordinates. cv : None | cross-validation generator Any scikit-learn cross-validation generator. If not given, will use the default set by :func:`verde.cross_val_score`. delayed : bool If True, will use :func:`dask.delayed.delayed` to dispatch computations and allow :mod:`dask` to execute the grid search in parallel (see note above). scoring : None | str | Callable The scoring function (or name of a function) used for cross-validation. Must be known to scikit-learn. See the description of *scoring* in :func:`sklearn.model_selection.cross_val_score` for details. If None, will fall back to the :meth:`verde.Spline.score` method. Returns ------- verde.Spline the spline which best fits the data """ kwargs = copy.deepcopy(kwargs) dampings = kwargs.pop("dampings", None) # if single damping value provided, convert to list if isinstance(dampings, typing.Iterable): pass else: dampings = [dampings] n_splits = 5 while n_splits > 0: try: spline = vd.SplineCV( dampings=dampings, cv=sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=0, ), **kwargs, ) spline.fit( coordinates, data, weights=weights, ) break except ValueError as e: log.error(e) msg = "decreasing number of splits by 1 until ValueError is resolved" log.warning(msg) if n_splits == 1: msg = "ValueError not resolved, fitting spline with no damping" log.warning(msg) spline = vd.Spline( damping=None, **kwargs, ) spline.fit( coordinates, data, weights=weights, ) n_splits -= 1 if len(dampings) > 1: try: log.info("Best SplineCV score: %s", spline.scores_.max()) except AttributeError: log.info("Best SplineCV score: %s", max(dask.compute(spline.scores_)[0])) log.info("Best damping: %s", spline.damping_) dampings_without_none = [i for i in dampings if i is not None] try: if spline.damping_ is None: pass elif len(dampings) > 2 and spline.damping_ in [ np.min(dampings_without_none), np.max(dampings_without_none), ]: log.warning( "Best damping value (%s) is at the limit of provided values (%s, %s) " "and thus is likely not a global minimum, expand the range of values " "test to ensure the best parameter value value is found.", spline.damping_, np.nanmin(dampings_without_none), np.nanmax(dampings_without_none), ) except AttributeError: pass return spline
[docs] def best_equivalent_source_damping( coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray], data: pd.Series | NDArray, delayed: bool = False, weights: pd.Series | NDArray | None = None, **kwargs: typing.Any, ) -> hm.EquivalentSources: """ Find the best damping parameter for a harmonica.EquivalentSource() fit. All kwargs are passed to the harmonica.EquivalentSource class. Parameters ---------- coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | numpy.ndarray, \ pandas.Series | numpy.ndarray] tuple of easting, northing, and upward coordinates of the gravity data data : pandas.Series | numpy.ndarray the gravity data delayed : bool, optional compute the scores in parallel if True, by default False weights : numpy.ndarray | None, optional optional weight values for each gravity data point, by default None Keyword Arguments ----------------- damping : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated coefficients. If None, no regularization is used. points : list[numpy.ndarray] | None List containing the coordinates of the equivalent point sources. Coordinates are assumed to be in the following order: (``easting``, ``northing``, ``upward``). If None, will place one point source below each observation point at a fixed relative depth below the observation point. Defaults to None. depth : float or str Parameter used to control the depth at which the point sources will be located. If a value is provided, each source is located beneath each data point (or block-averaged location) at a depth equal to its elevation minus the ``depth`` value. If set to ``"default"``, the depth of the sources will be estimated as 4.5 times the mean distance between first neighboring sources. This parameter is ignored if *points* is specified. Defaults to ``"default"``. block_size: float | tuple[float, float] | None Size of the blocks used on block-averaged equivalent sources. If a single value is passed, the blocks will have a square shape. Alternatively, the dimensions of the blocks in the South-North and West-East directions can be specified by passing a tuple. If None, no block-averaging is applied. This parameter is ignored if *points* are specified. Default to None. parallel : bool If True any predictions and Jacobian building is carried out in parallel through Numba's ``jit.prange``, reducing the computation time. If False, these tasks will be run on a single CPU. Default to True. dtype : str The desired data-type for the predictions and the Jacobian matrix. Default to ``"float64"``. Returns ------- harmonica.EquivalentSources the best fitted equivalent sources """ kwargs = copy.deepcopy(kwargs) dampings = kwargs.pop("dampings", None) kwargs.pop("damping", None) # if single damping value provided, convert to list if isinstance(dampings, typing.Iterable): pass else: dampings = [dampings] # pylint: disable=duplicate-code if np.isnan(coordinates).any(): msg = "coordinates contain NaN" raise ValueError(msg) if np.isnan(data).any(): msg = "data contains is NaN" raise ValueError(msg) scores = [] for d in dampings: eqs = hm.EquivalentSources( damping=d, **kwargs, ) score = np.mean( vd.cross_val_score( eqs, coordinates, data, delayed=delayed, weights=weights, scoring="r2" ) ) # pylint: enable=duplicate-code scores.append(score) if delayed: scores = dask.compute(scores)[0] else: pass best = np.argmax(scores) log.info("Best EqSources score: %s", scores[best]) log.info("Best damping: %s", dampings[best]) dampings_without_none = [i for i in dampings if i is not None] if dampings[best] is None: pass elif len(dampings) > 2 and dampings[best] in [ np.min(dampings_without_none), np.max(dampings_without_none), ]: log.warning( "Best damping value (%s) is at the limit of provided values (%s, %s) and " "thus is likely not a global minimum, expand the range of values test to " "ensure the best parameter value value is found.", dampings[best], np.nanmin(dampings_without_none), np.nanmax(dampings_without_none), ) return hm.EquivalentSources(damping=dampings[best]).fit( coordinates, data, weights=weights )
@deprecation.deprecated( # type: ignore[misc] deprecated_in="0.8.0", removed_in="0.14.0", current_version=invert4geom.__version__, details="function eq_sources_score has been moved to the cross_validation model.", )
[docs] def eq_sources_score(kwargs: typing.Any) -> float: """ deprecated function, use cross_validation.eq_sources_score instead. """ return cross_validation.eq_sources_score(**kwargs)
[docs] def gravity_decay_buffer( buffer_perc: float, spacing: float, inner_region: tuple[float, float, float, float], top: float, zref: float, obs_height: float, density: float, amplitude: float | None = None, wavelength: float | None = None, checkerboard: bool = False, as_density_contrast: bool = False, plot: bool = True, plot_profile: bool = True, progressbar: bool = False, ) -> tuple[float, float, int, xr.Dataset]: """ For a given buffer zone width (as percentage of x or y range) and domain parameters, calculate the max percent decay of the gravity anomaly within the region of interest. Parameters ---------- buffer_perc : float percentage of the widest dimension of inner_region to use as buffer zone spacing : float spacing of the prism layer and gravity observation points inner_region : tuple[float, float, float, float] region boundaries for the region of interest top : float height for the top of the prisms zref : float reference level for the prisms obs_height : float gravity observation height density : float density value for the prisms amplitude : float | None, optional if using `checkerboard`, this is the amplitude of each undulation, by default None wavelength : float | None, optional if using `checkerboard`, this is the wavelength of each undulation, by default None checkerboard : bool, optional use an undulating checkerboard for the topography instead of a flat surface, by default False as_density_contrast : bool, optional discretize the topography as a density contrast, resulting in no edge effects, by default False plot : bool, optional plot the results, by default True plot_profile : bool, optional plot a profile across the prism layer, by default True progressbar : bool, optional show a progressbar for the forward gravity calculation, by default False Returns ------- max_decay : float the maximum percentage decay of the gravity anomaly within the region of interest buffer_width : float width of the buffer zone buffer_cells : int number of cells in the buffer zone grav_ds : xarray.Dataset dataset of the forward gravity calculations """ if (checkerboard is False) & (top == zref): msg = "top and zref must be different if checkerboard is False" raise ValueError(msg) # get x and y range of interest region x_diff = np.abs(inner_region[0] - inner_region[1]) y_diff = np.abs(inner_region[2] - inner_region[3]) # pick the bigger range max_diff = max(x_diff, y_diff) # calc buffer as percentage of width buffer_width = max_diff * (buffer_perc / 100) # round to nearest multiple of spacing def round_to_input(num: float, multiple: float) -> float: return round(num / multiple) * multiple # round buffer width to nearest spacing interval buffer_width = round_to_input(buffer_width, spacing) # define buffer region buffer_region = vd.pad_region(inner_region, buffer_width) # calculate buffer width in terms of number of cells buffer_cells = buffer_width / spacing # create topography if checkerboard: synth = vd.synthetic.CheckerBoard( amplitude=amplitude, region=buffer_region, w_east=wavelength, w_north=wavelength, ) surface = synth.grid( spacing=spacing, data_names="upward", dims=("northing", "easting") ).upward surface += top else: surface = create_topography( method="flat", upwards=top, region=buffer_region, spacing=spacing, ) # create prism layer if as_density_contrast: # create prisms around mean value to compare to to calculate decay zref = surface.values.mean() # positive densities above, negative below dens = surface.copy() dens.values[:] = density dens = dens.where(surface >= zref, -density) # create prism layer with a mean zref prisms = grids_to_prisms( surface, zref, density=dens, ) else: prisms = grids_to_prisms( surface, zref, density=density, ) # create prisms around mean value to compare to to calculate decay zref = surface.values.mean() # positive densities above, negative below dens = surface.copy() dens.values[:] = density dens = dens.where(surface >= zref, -density) # create prism layer with a mean zref prisms_mean_zref = grids_to_prisms( surface, zref, density=dens, ) # create set of observation points data = vd.grid_coordinates( inner_region, spacing=spacing, extra_coords=obs_height, ) forward_df = pd.DataFrame( { "easting": data[0].ravel(), "northing": data[1].ravel(), "upward": data[2].ravel(), } ) # calculate forward gravity of prism layer forward_df["forward"] = prisms.prism_layer.gravity( coordinates=( forward_df.easting, forward_df.northing, forward_df.upward, ), field="g_z", progressbar=progressbar, ) # if checkerboard: forward_df["forward_no_edge_effects"] = prisms_mean_zref.prism_layer.gravity( coordinates=( forward_df.easting, forward_df.northing, forward_df.upward, ), field="g_z", progressbar=progressbar, ) grav_ds = forward_df.set_index(["northing", "easting"]).to_xarray() # shift forward gravity with and without edge effects so max value are equal shift = grav_ds.forward_no_edge_effects.max() - grav_ds.forward.max() grav_ds["forward_no_edge_effects"] -= shift dif = grav_ds.forward - grav_ds.forward_no_edge_effects max_grav = grav_ds.forward.values.max() max_decay = 100 * (max_grav - (max_grav + dif.values.min())) / max_grav if plot: try: plotting.edge_effects( grav_ds=grav_ds, prism_layer=prisms, inner_region=inner_region, plot_profile=plot_profile, ) except Exception as e: # pylint: disable=broad-exception-caught log.error("plotting failed with error: %s", e) return max_decay, buffer_width, int(buffer_cells), grav_ds