Source code for invert4geom.utils

import copy  # pylint: disable=too-many-lines
import os
import typing
import warnings
from contextlib import contextmanager

import dask
import harmonica as hm
import numpy as np
import pandas as pd
import polartoolkit as ptk
import pygmt
import pyproj
import sklearn
import verde as vd
import xarray as xr
import xrft
from numpy.typing import NDArray
from pykdtree.kdtree import KDTree  # pylint: disable=no-name-in-module

from invert4geom import logger, plotting


@contextmanager
def _log_level(level, log_instance=logger):  # type: ignore[no-untyped-def, has-type]
    "Run body with logger at a different level"
    saved_logger_level = log_instance.level
    log_instance.setLevel(level)
    try:
        yield saved_logger_level
    finally:
        log_instance.setLevel(saved_logger_level)


@contextmanager
def _environ(**env):  # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring
    """temporarily set/reset an environment variable"""
    originals = {k: os.environ.get(k) for k in env}
    for k, val in env.items():
        if val is None:
            os.environ.pop(k, None)
        else:
            os.environ[k] = val
    try:
        yield
    finally:
        for k, val in originals.items():
            if val is None:
                os.environ.pop(k, None)
            else:
                os.environ[k] = val


class DuplicateFilter:
    """
    Filters away duplicate log messages.
    Adapted from https://stackoverflow.com/a/60462619/18686384
    """

    def __init__(self, log):  # type: ignore[no-untyped-def]
        self.msgs = set()
        self.log = log

    def filter(self, record):  # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring
        msg = str(record.msg)
        is_duplicate = msg in self.msgs
        if not is_duplicate:
            self.msgs.add(msg)
        return not is_duplicate

    def __enter__(self):  # type: ignore[no-untyped-def]
        self.log.addFilter(self)

    def __exit__(self, exc_type, exc_val, exc_tb):  # type: ignore[no-untyped-def]
        self.log.removeFilter(self)


def get_epsg(coast: bool) -> tuple[str, bool]:
    """
    Get the EPSG code for plotting coastlines. If the environment variable
    'POLARTOOLKIT_EPSG' is not set, default to EPSG of 3857 and skip plotting
    coastlines.

    Parameters
    ----------
    coast : bool
        whether to plot coastlines, if True, will attempt to get EPSG code for plotting
        coastlines, if False, will skip plotting coastlines and return EPSG of 3857

    Returns
    -------
    tuple[str, bool]
        epsg code and whether to plot coastlines
    """
    try:
        epsg = ptk.default_epsg(None, None)
    except KeyError:
        if coast:
            msg = (
                "couldn't find environment variable 'POLARTOOLKIT_EPSG' with the EPSG "
                "code needed for plotting elements with geographic coordinates, such as"
                "coastlines. You can set the environment variable "
                "temporarily with `import os; os.environ['POLARTOOLKIT_EPSG'] = '3031' "
                "where 3031 is the EPSG projection of your region. For now defaulting to "
                "EPSG of 3857 and skipping plotting coastlines."
            )
            warnings.warn(
                msg,
                UserWarning,
                stacklevel=2,
            )
            logger.warning(msg)
            epsg = "3857"
            coast = False
        epsg = "3857"
    return epsg, coast


def _check_constraints_inside_gravity_region(
    constraints_df: pd.DataFrame,
    grav_ds: xr.Dataset,
) -> None:
    """check that all constraints are inside the region of the gravity data"""
    coord_names = grav_ds.coord_names

    grav_df = grav_ds.inv.df

    grav_region = vd.get_region((grav_df[coord_names[0]], grav_df[coord_names[1]]))
    inside = vd.inside(
        (constraints_df[coord_names[0]], constraints_df[coord_names[1]]),
        region=grav_region,
    )
    if not inside.all():
        msg = (
            "Some constraints are outside the region of the gravity data. "
            "This may result in unexpected behavior."
        )
        logger.warning(msg)


def rmse(data: NDArray, as_median: bool = False) -> float:
    """
    function to give the root mean/median squared error (RMSE) of data

    Parameters
    ----------
    data : numpy.ndarray
        input data
    as_median : bool, optional
        choose to give root median squared error instead, by default False

    Returns
    -------
    float
        RMSE value
    """
    if as_median:
        value: float = np.sqrt(np.nanmedian(data**2).item())
    else:
        value = np.sqrt(np.nanmean(data**2).item())

    return value


def _nearest_grid_fill(
    grid: xr.DataArray,
    method: str = "verde",
    crs: str | None = None,
) -> xr.DataArray:
    """
    fill missing values in a grid with the nearest value.

    Parameters
    ----------
    grid : xarray.DataArray
        grid with missing values
    method : str, optional
        choose method of filling, by default "verde"
    crs : str | None, optional
        if method is 'rioxarray', provide the crs of the grid, in format 'epsg:xxxx',
        by default None
    Returns
    -------
    xarray.DataArray
        filled grid
    """

    # TODO: also check out rasterio fillnodata() https://rasterio.readthedocs.io/en/latest/api/rasterio.fill.html#rasterio.fill.fillnodata
    # uses https://gdal.org/en/stable/api/gdal_alg.html#_CPPv414GDALFillNodata15GDALRasterBandH15GDALRasterBandHdiiPPc16GDALProgressFuncPv
    # can fill with nearest neighbor or inverse distance weighting

    # get coordinate names
    original_dims = list(grid.sizes.keys())

    # get original grid name
    original_name = grid.name

    if method == "rioxarray":
        filled: xr.DataArray = (
            grid.rio.write_crs(crs)
            .rio.set_spatial_dims(original_dims[1], original_dims[0])
            .rio.write_nodata(np.nan)
            .rio.interpolate_na(method="nearest")
            .rename(original_name)
        )
    elif method == "verde":
        df = vd.grid_to_table(grid)
        df_dropped = df[df[grid.name].notna()]
        coords = (df_dropped[grid.dims[1]], df_dropped[grid.dims[0]])
        region = vd.get_region((df[grid.dims[1]], df[grid.dims[0]]))
        filled = (
            vd.KNeighbors()
            .fit(coords, df_dropped[grid.name])
            .grid(
                region=region,
                shape=grid.shape,
                data_names=original_name,
                dims=(original_dims[1], original_dims[0]),
            )[original_name]
        )
    # elif method == "pygmt":
    #     filled = pygmt.grdfill(grid, mode="n", verbose="q").rename(original_name)
    else:
        msg = "method must be 'rioxarray', or 'verde'"
        raise ValueError(msg)

    # reset coordinate names if changed
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="rename '")
        return filled.rename(
            {
                next(iter(filled.dims)): original_dims[0],
                list(filled.dims)[1]: original_dims[1],
            }
        )


def region_mask(
    grid: xr.DataArray,
    region: tuple[float, float, float, float],
) -> xr.DataArray:
    """
    Make a mask with values of 1 inside a region and 0 outside the region.
    """

    # get coordinate names
    coord_names = list(grid.sizes.keys())

    mask_easting = (grid[coord_names[0]] >= region[0]) & (
        grid[coord_names[0]] <= region[1]
    )
    mask_northing = (grid[coord_names[1]] >= region[2]) & (
        grid[coord_names[1]] <= region[3]
    )

    return xr.where(mask_easting & mask_northing, 1, 0)


def filter_grid(
    grid: xr.DataArray,
    filter_width: float | None = None,
    height_displacement: float | None = None,
    filter_type: str = "lowpass",
    pad_width_factor: int = 3,
    pad_mode: str = "linear_ramp",
    pad_constant: float | None = None,
    pad_end_values: float | None = None,
) -> xr.DataArray:
    """
    Apply a spatial filter to a grid.

    Parameters
    ----------
    grid : xarray.DataArray
        grid to filter the values of
    filter_width : float, optional
        width of the filter in meters, by default None
    height_displacement : float, optional
        height displacement for upward continuation, relative to observation height, by
        default None
    filter_type : str, optional
        type of filter to use from 'lowpass', 'highpass' 'up_deriv', 'easting_deriv',
        'northing_deriv', 'up_continue', or 'total_gradient', by default "lowpass"
    pad_width_factor : int, optional
        factor of grid width to pad the grid by, by default 3, which equates to a pad
        with a width of 1/3 of the grid width.
    pad_mode : str, optional
        mode of padding, can be "linear", by default "linear_ramp"
    pad_constant : float | None, optional
        constant value to use for padding, by default None
    pad_end_values : float | None, optional
        value to use for end of padding if pad_mode is "linear_ramp", by default None

    Returns
    -------
    xarray.DataArray
        a filtered grid
    """
    # get coordinate names
    original_dims = list(grid.sizes.keys())

    # get original grid name
    original_name = grid.name

    # if there are nan's, fill them with nearest neighbor
    if grid.isnull().any():  # noqa: PD003
        filled = _nearest_grid_fill(grid, method="verde")
    else:
        filled = grid.copy()

    # reset coordinate names if changed
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="rename '")
        filled = filled.rename(
            {
                next(iter(filled.dims)): original_dims[0],
                list(filled.dims)[1]: original_dims[1],
            }
        )

    # define width of padding in each direction
    pad_width = {
        original_dims[1]: grid[original_dims[1]].size // pad_width_factor,
        original_dims[0]: grid[original_dims[0]].size // pad_width_factor,
    }

    if pad_mode == "constant":
        if pad_constant is None:
            pad_constant = filled.median()
        pad_end_values = None

    if (pad_mode == "linear_ramp") and (pad_end_values is None):
        pad_end_values = filled.median()

    if pad_mode != "constant":
        pad_constant = (
            None  # needed until https://github.com/xgcm/xrft/issues/211 is fixed
        )

    # apply padding
    pad_kwargs = {
        **pad_width,
        "mode": pad_mode,
        "constant_values": pad_constant,
        "end_values": pad_end_values,
    }

    padded = xrft.pad(
        filled,
        **pad_kwargs,
    )

    if filter_type == "lowpass":
        if filter_width is None:
            msg = "filter_width must be provided if filter_type is 'lowpass'"
            raise ValueError(msg)
        filt = hm.gaussian_lowpass(padded, wavelength=filter_width).rename("filt")
    elif filter_type == "highpass":
        if filter_width is None:
            msg = "filter_width must be provided if filter_type is 'highpass'"
            raise ValueError(msg)
        filt = hm.gaussian_highpass(padded, wavelength=filter_width).rename("filt")
    elif filter_type == "up_deriv":
        filt = hm.derivative_upward(padded).rename("filt")
    elif filter_type == "easting_deriv":
        filt = hm.derivative_easting(padded).rename("filt")
    elif filter_type == "northing_deriv":
        filt = hm.derivative_northing(padded).rename("filt")
    elif filter_type == "up_continue":
        if height_displacement is None:
            msg = "height_displacement must be provided if filter_type is 'up_continue'"
            raise ValueError(msg)
        filt = hm.upward_continuation(
            padded, height_displacement=height_displacement
        ).rename("filt")
    elif filter_type == "total_gradient":
        filt = hm.total_gradient_amplitude(padded).rename("filt")
    else:
        msg = (
            "filter_type must be 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', "
            "'northing_deriv', 'up_continue', or 'total_gradient'"
        )
        raise ValueError(msg)

    unpadded = xrft.unpad(filt, pad_width)

    # reset coordinate values to original (avoid rounding errors)
    unpadded = unpadded.assign_coords(
        {
            original_dims[0]: grid[original_dims[0]].to_numpy(),
            original_dims[1]: grid[original_dims[1]].to_numpy(),
        }
    )

    if grid.isnull().any():  # noqa: PD003
        result: xr.DataArray = xr.where(grid.notnull(), unpadded, grid)  # noqa: PD004
    else:
        result = unpadded.copy()

    # reset coordinate names if changed
    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", message="rename '")
        result = result.rename(
            {
                next(iter(result.dims)): original_dims[0],
                # list(result.dims)[0]: original_dims[0],
                list(result.dims)[1]: original_dims[1],
            }
        )

    return result.rename(original_name)


def dist_nearest_points(
    targets: pd.DataFrame,
    data: pd.DataFrame | xr.DataArray | xr.Dataset,
    coord_names: tuple[str, str] | None = None,
) -> typing.Any:
    """
    for all grid cells calculate to the distance to the nearest target.

    Parameters
    ----------
    targets : pandas.DataFrame
        contains the coordinates of the targets
    data : pandas.DataFrame | xarray.DataArray | xarray.Dataset
        the grid data, in either gridded or tabular form
    coord_names : tuple[str, str] | None, optional
        the names of the coordinates for both the targets and the data, by default None

    Returns
    -------
    typing.Any
        the distance to the nearest target for each gridcell, in the same format as the
        input for `data`.
    """

    if coord_names is None:
        coord_names = ("easting", "northing")

    df_targets = targets[[coord_names[0], coord_names[1]]].copy()

    df_data: pd.DataFrame | xr.DataArray | xr.Dataset
    if isinstance(data, pd.DataFrame):
        df_data = data[list(coord_names)].copy()
    elif isinstance(data, xr.DataArray):
        df_grid = vd.grid_to_table(data).dropna()
        df_data = df_grid[[coord_names[0], coord_names[1]]].copy()  # pylint: disable=unsubscriptable-object
    elif isinstance(data, xr.Dataset):
        try:
            df_grid = vd.grid_to_table(data[next(iter(data.variables))]).dropna()
            # df_grid = vd.grid_to_table(data[list(data.variables)[0]]).dropna()
        except IndexError:
            df_grid = vd.grid_to_table(data).dropna()
        df_data = df_grid[[coord_names[0], coord_names[1]]].copy()  # pylint: disable=unsubscriptable-object

    min_dist, _ = KDTree(df_targets.to_numpy()).query(df_data.to_numpy(), 1)

    df_data["min_dist"] = min_dist

    if isinstance(data, pd.DataFrame):
        return df_data
    return df_data.set_index([coord_names[0], coord_names[1]][::-1]).to_xarray()


def normalize(
    x: NDArray,
    low: float = 0,
    high: float = 1,
) -> NDArray:
    """
    Normalize a list of numbers between provided values

    Parameters
    ----------
    x : NDArray
        numbers to normalize
    low : float, optional
        lower value for normalization, by default 0
    high : float, optional
        higher value for normalization, by default 1

    Returns
    -------
    NDArray
        a normalized list of numbers
    """
    min_val = np.min(x)
    max_val = np.max(x)

    norm = (x - min_val) / (max_val - min_val)

    return norm * (high - low) + low


def normalize_xarray(
    da: xr.DataArray,
    low: float = 0,
    high: float = 1,
) -> xr.DataArray:
    """
    Normalize a grid between provided values

    Parameters
    ----------
    da : xarray.DataArray
        grid to normalize
    low : float, optional
        lower value for normalization, by default 0
    high : float, optional
        higher value for normalization, by default 1

    Returns
    -------
    xarray.DataArray
        a normalized grid
    """
    # min_val = da.to_numpy().min()
    # max_val = da.to_numpy().max()

    da = da.copy()

    min_val = da.quantile(0)
    max_val = da.quantile(1)

    da2: xr.DataArray = (high - low) * (
        ((da - min_val) / (max_val - min_val)).clip(0, 1)
    ) + low

    return da2.drop_vars("quantile")


def scale_normalized(
    sample: NDArray,
    bounds: NDArray,
) -> NDArray:
    """
    Rescales the sample space into the unit hypercube, bounds = [0,1]

    Parameters
    ----------
    sample : NDArray
        sampled values
    bounds : NDArray
        bounds of the sampling

    Returns
    -------
    NDArray
        sampled values normalized from 0 to 1
    """

    scaled_sample = np.zeros(sample.shape)

    for j in range(sample.shape[1]):
        scaled_sample[:, j] = (sample[:, j] - bounds[j][0]) / (
            bounds[j][1] - bounds[j][0]
        )

    return scaled_sample


[docs] def normalized_mindist( points: pd.DataFrame, grid: xr.DataArray, low: float | None = None, high: float | None = None, mindist: float | None = None, region: list[float] | None = None, ) -> xr.DataArray: """ Find the minimum distance between each grid cell and the nearest point. If low and high are provided, normalize the min dists grid between these values. If region is provided, all grid cells outside region are set to a distance of 0. Parameters ---------- points : pandas.DataFrame coordinates of the points grid : xarray.DataArray gridded data to find min dists for each grid cell low : float | None, optional lower value for normalization, by default None high : float | None, optional higher value for normalization, by default None mindist : float | None, optional the minimum allowed distance, all values below are set equal to, by default None region : list[float] | None, optional bounding region for which all grid cells outside will be set to low, by default None Returns ------- xarray.DataArray grid of normalized minimum distances """ grid = copy.deepcopy(grid) # get coordinate names original_dims = list(grid.sizes.keys()) constraints_df = points.copy() min_dist: xr.DataArray = dist_nearest_points( targets=constraints_df, data=grid, coord_names=(str(original_dims[1]), str(original_dims[0])), ).min_dist # set points < mindist to low if mindist is not None: min_dist = xr.where(min_dist < mindist, 0, min_dist) # set points outside of region to low if region is not None: df = vd.grid_to_table(min_dist) df["are_inside"] = vd.inside( (df[original_dims[1]], df[original_dims[0]]), region=region, ) new_min_dist = df.set_index([original_dims[0], original_dims[1]]).to_xarray() new_min_dist = xr.where(new_min_dist.are_inside, new_min_dist, 0) # add nans back new_min_dist = xr.where(min_dist.isnull(), np.nan, new_min_dist) # noqa: PD003 min_dist = new_min_dist.min_dist # normalize from low to high if (low is None) & (high is None): pass else: assert low is not None assert high is not None min_dist = normalize_xarray(min_dist, low=low, high=high) return min_dist
def sample_grids( df: pd.DataFrame, grid: str | xr.DataArray, sampled_name: str, **kwargs: typing.Any, ) -> pd.DataFrame: """ Sample data at every point along a line Parameters ---------- df : pandas.DataFrame Dataframe containing columns 'easting', 'northing', 'longitude', 'latitude' or columns with names defined by kwarg "coord_names". grid : str or xarray.DataArray Grid to sample, either file name or xarray.DataArray sampled_name : str, Name for sampled column Returns ------- pandas.DataFrame Dataframe with new column (sampled_name) of sample values from (grid) """ return ptk.sample_grids( df=df, grid=grid, sampled_name=sampled_name, **kwargs, ) def get_spacing(prisms_df: pd.DataFrame) -> None: # noqa: ARG001 """ DEPRECATED: function has been removed """ # pylint: disable=W0613 msg = "Function `get_spacing` deprecated" raise DeprecationWarning(msg)
[docs] def create_topography( method: str, region: tuple[float, float, float, float], spacing: float, dampings: list[float] | None = None, registration: str = "g", upward: float | None = None, upwards: float | None = None, coord_names: tuple[str, str] = ("easting", "northing"), projection: pyproj.Proj | None = None, constraints_df: pd.DataFrame | None = None, weights: pd.Series | NDArray | None = None, weights_col: str | None = None, block_size: float | None = None, block_reduction: str = "median", upper_confining_layer: xr.DataArray | None = None, lower_confining_layer: xr.DataArray | None = None, dataset_to_add: xr.Dataset | None = None, ) -> xr.Dataset: """ Create a grid of topography data from either the interpolation (with splines) of point data or creating a grid of constant value. Optionally, a subset of point data can be interpolated and then merged with an existing grid. To do this, ``constraints_df`` must contain two additional columns of booleans, ``inside`` which is True for points inside the region of interest, and False otherwise, and ``buffer`` which is True for points within a buffer region around the region of interest, and False otherwise. Inside and Buffer points are used to interpolated the data, and then the interpolated data (without the buffer zone) is merged with the points outside the region of interest. For interpolations, ``block_size`` can be supplied to perform a block-median filtering of the points before fitting the spline, reducing the computational cost. Parameters ---------- method : str method to use, either ``flat`` or ``splines`` region : tuple[float, float, float, float] bounding region to use for the output grid. If you are using projected coordinates (meters), this is in the format (min_easting, max_easting, min_northing, max_northing). If using geographic coordinates (latitude and longitude), this is in the format (min_longitude, max_longitude, min_latitude, max_latitude) spacing : float spacing of the grid in meters if using projected coordinates, or in decimal degrees if using geographic coordinates dampings : list[float] | None, optional damping values to use in spline cross validation for method ``spline``, by default None registration : str, optional choose between gridline ``g`` or pixel ``p`` registration, by default ``g`` upward : float | None, optional constant elevation in meters to use for method ``flat``, by default None upwards : float | None, optional deprecated, use `upward` instead, by default None coord_names : tuple[str, str], optional names to use for coordinates, should be either (easting, northing) or (longitude, latitude), by default (easting, northing) projection : pyproj.Proj | None, optional if using geographic coordinates (latitude and longitude) and the splines method, provide a pyproj Proj object to convert to cartesian coordinates, by default None constraints_df : pandas.DataFrame | None, optional dataframe with column 'upward' to use for method ``splines``, and optionally columns ``inside`` and ``buffer``, by default None weights : pandas.Series | numpy.ndarray | None, optional weight to use for fitting the spline. Typically, this should be 1 over the data uncertainty squared, by default None weights_col : str | None, optional instead of passing the weights, pass the name of the column containing the weights, by default None block_size : float | None, optional block size to use for block-reduction of constraint points before fitting splines. In meters if using projected coordinates, or in decimal degrees if using geographic coordinates. If None, no block-reduction is applied, by default None block_reduction: str, optional type of block reduction to apply, if ``median``, weights will be ignored, of ``mean``, and weights are provided, they will be used in the block reduction. Defaults to ``median``. upper_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be below, by default None lower_confining_layer : xarray.DataArray | None, optional layer which the inverted topography should always be above, by default None dataset_to_add : xarray.Dataset | None, optional additional variables to add to the topography dataset, such as variables `mask` or `geocentric_radius`, by default None Returns ------- xarray.Dataset a topography grid """ if upwards is not None: warnings.warn( "`upwards` is deprecated, please use `upward` instead.", UserWarning, stacklevel=2, ) upward = upwards if method == "flat": if registration == "g": pixel_register = False elif registration == "p": pixel_register = True else: msg = "registration must be 'g' or 'p'" raise ValueError(msg) if upward is None: msg = "upward must be provided if method is `flat`" raise ValueError(msg) # create grid of coordinates (x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking region=region, spacing=spacing, pixel_register=pixel_register, ) # make flat topography of value = upward grid = vd.make_xarray_grid( (x, y), np.ones_like(x) * upward, data_names="upward", dims=(coord_names[1], coord_names[0]), ).upward elif method == "splines": # pylint: disable=too-many-nested-blocks # get coordinates of the constraint points if constraints_df is None: msg = "constraints_df must be provided if method is `splines`" raise ValueError(msg) df = constraints_df.copy() # if only 1 point, return a flat topography if len(df) == 1: # create grid of coordinates (x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking region=region, spacing=spacing, ) # make flat topography of value = upward grid = vd.make_xarray_grid( (x, y), np.ones_like(x) * df.upward.to_numpy(), data_names="upward", dims=(coord_names[1], coord_names[0]), ).upward else: if (coord_names == ("longitude", "latitude")) & (projection is None): msg = ( "If using geographic coordinates (latitude and longitude) and the " "spline method, a pyproj.Proj projection must be provided to " "convert to projected coordinates." ) raise ValueError(msg) # convert to projected coordinates if needed if projection is not None: projected_points = [ projection(lon, lat) for (lon, lat) in zip(df.longitude, df.latitude, strict=True) ] eastings, northings = zip(*projected_points, strict=True) df["easting"] = eastings df["northing"] = northings # convert region in lat/lon to projected easting/northing corners_lonlat = [ (region[0], region[2]), (region[1], region[2]), (region[1], region[3]), (region[0], region[3]), ] projected_corners = [ projection(lon, lat) for (lon, lat) in corners_lonlat ] eastings, northings = zip(*projected_corners, strict=True) cartesian_region = ( min(eastings), max(eastings), min(northings), max(northings), ) cartesian_spacing = ( spacing * 111139 ) # approx conversion from degrees latitude to meters if block_size is not None: block_size = ( block_size * 111139 ) # approx conversion from degrees latitude to meters else: cartesian_region = region cartesian_spacing = spacing if pd.Series(["inside", "buffer"]).isin(df.columns).all(): df_to_interpolate = df[df.inside | df.buffer] df_outside_buffer = df[(df.inside == False) & (df.buffer == False)] # noqa: E712 # pylint: disable=singleton-comparison coords = ( df_to_interpolate[coord_names[0]], df_to_interpolate[coord_names[1]], ) data = df_to_interpolate.upward if weights_col is not None: weights = df_to_interpolate[weights_col] if block_size is not None: if block_reduction == "mean": reduction = np.mean elif block_reduction == "median": reduction = np.median if weights is not None: msg = "weights are ignored when block_reduction is 'median'" logger.warning(msg) else: msg = "block_reduction must be 'mean' or 'median'" raise ValueError(msg) reducer = vd.BlockReduce( reduction=reduction, spacing=block_size, region=cartesian_region, center_coordinates=True, ) coords, data = reducer.filter( coordinates=coords, data=data, weights=weights, ) # run CV for fitting a spline to the data with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="The mindist parameter of verde.Spline" ) warnings.filterwarnings( "ignore", message="The default scoring will change" ) spline = optimal_spline_damping( coordinates=coords, data=data, weights=weights, dampings=dampings, ) # grid the fitted spline at desired spacing and region inside_grid = spline.grid( region=region, spacing=spacing, projection=projection, dims=(coord_names[1], coord_names[0]), ).scalars # merge interpolation of inner / buffer points with outside grid # outside_grid = df_outside_buffer.set_index( # ["northing", "easting"]).to_xarray().upward # outside_grid = vd.make_xarray_grid( # (df_outside_buffer.easting, df_outside_buffer.northing), # df_outside_buffer.upward, # data_names="upward", # ) outside_grid = pygmt.xyz2grd( x=df_outside_buffer[coord_names[0]], y=df_outside_buffer[coord_names[1]], z=df_outside_buffer.upward, region=cartesian_region, spacing=cartesian_spacing, ).rename({"x": coord_names[0], "y": coord_names[1]}) grid = inside_grid.where( outside_grid.isnull(), # noqa: PD003 outside_grid, ) else: coords = (df.easting, df.northing) data = df.upward if weights_col is not None: weights = df[weights_col] if block_size is not None: if block_reduction == "mean" and weights is not None: reducer = vd.BlockMean( spacing=block_size, region=cartesian_region, center_coordinates=True, ) coords, data, weights = reducer.filter( coordinates=coords, data=data, weights=weights, ) else: if block_reduction == "mean": reduction = np.mean elif block_reduction == "median": reduction = np.median if weights is not None: msg = "weights are ignored when block_reduction is 'median', to use weights, set block_reduction to 'mean'" logger.warning(msg) else: msg = "block_reduction must be 'mean' or 'median'" raise ValueError(msg) reducer = vd.BlockReduce( reduction=reduction, spacing=block_size, region=cartesian_region, center_coordinates=True, ) coords, data = reducer.filter( coordinates=coords, data=data, weights=weights, ) # run CV for fitting a spline to the data with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message="The mindist parameter of verde.Spline" ) warnings.filterwarnings( "ignore", message="The default scoring will change" ) spline = optimal_spline_damping( coordinates=coords, data=data, weights=weights, dampings=dampings, ) # grid the fitted spline at desired spacing and region grid = spline.grid( region=region, spacing=spacing, projection=projection, dims=(coord_names[1], coord_names[0]), ).scalars try: grid = grid.assign_attrs(damping=spline.damping_) except UnboundLocalError: grid = grid.assign_attrs(damping=None) else: msg = "method must be 'flat' or 'splines'" raise ValueError(msg) # ensure grid doesn't cross supplied confining layers if upper_confining_layer is None or np.all(upper_confining_layer.isnull()): # noqa: PD003 pass else: da = ptk.resample_grid( upper_confining_layer, spacing=spacing, region=region, registration=registration, ) original_dims = list(da.sizes.keys()) da = da.rename( {original_dims[1]: coord_names[0], original_dims[0]: coord_names[1]} ) grid = xr.where(grid > da, da, grid) if lower_confining_layer is None or np.all(lower_confining_layer.isnull()): # noqa: PD003 pass else: da = ptk.resample_grid( lower_confining_layer, spacing=spacing, region=region, registration=registration, ) original_dims = list(da.sizes.keys()) da = da.rename( {original_dims[1]: coord_names[0], original_dims[0]: coord_names[1]} ) grid = xr.where(grid < da, da, grid) ds = grid.to_dataset(name="upward") if dataset_to_add is not None: # check datasets are aligned try: _ = xr.align(ds, dataset_to_add, join="exact") except xr.AlignmentError as err: msg = "`dataset_to_add` should be exactly aligned with the create topography dataset" raise xr.AlignmentError(msg) from err ds = ds.merge(dataset_to_add) return ds
def grid_to_model( surface: xr.DataArray, reference: float | xr.DataArray, density: float | int | xr.DataArray, model_type: str, ) -> xr.Dataset: """ create a Harmonica layer of prisms or tesseroids with assigned densities. Parameters ---------- surface : xarray.DataArray data to use for model surface reference : float | xarray.DataArray data or constant to use for model reference, if value is below surface, prism/tesseroid will be inverted density : float | int | xarray.DataArray data or constant to use for model densities, should be in the form of a density contrast across a surface (i.e. between air and rock). model_type : str type of model to create, either 'prisms' or 'tesseroids' Returns ------- xarray.Dataset a prism or tesseroid layer with assigned densities """ # if density provided as a single number, use it for all prisms/tesseroids if isinstance(density, (float, int)): dens = density * np.ones_like(surface) # if density provided as a dataarray, map each density to the correct prisms/tesseroids elif isinstance(density, xr.DataArray): dens = density else: msg = "invalid density type, should be a number or DataArray" raise ValueError(msg) # create layer of prisms/tesseroids based off input dataarrays if model_type == "tesseroids": model = hm.tesseroid_layer( coordinates=( surface["longitude"].to_numpy(), surface["latitude"].to_numpy(), ), surface=surface, reference=reference, properties={ "density": dens, }, ) elif model_type == "prisms": model = hm.prism_layer( coordinates=( surface["easting"].to_numpy(), surface["northing"].to_numpy(), ), surface=surface, reference=reference, properties={ "density": dens, }, ) else: msg = "model_type must be 'prisms' or 'tesseroids'" raise ValueError(msg) model["thickness"] = model.top - model.bottom # add zref as an attribute return model.assign_attrs(zref=reference, model_type=model_type) def grids_to_prisms( surface: xr.DataArray, # noqa: ARG001 reference: float | xr.DataArray, # noqa: ARG001 density: float | int | xr.DataArray, # noqa: ARG001 input_coord_names: tuple[str, str] = ("easting", "northing"), # noqa: ARG001 ) -> None: """ DEPRECATED: use the function `grid_to_model` instead """ # pylint: disable=W0613 msg = "Function `grids_to_prisms` deprecated, use `grid_to_model` instead" raise DeprecationWarning(msg) def best_spline_cv( coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray], # noqa: ARG001 # pylint: disable=unused-argument data: pd.Series | NDArray, # noqa: ARG001 # pylint: disable=unused-argument weights: pd.Series | NDArray | None = None, # noqa: ARG001 # pylint: disable=unused-argument **kwargs: typing.Any, # noqa: ARG001 # pylint: disable=unused-argument ) -> None: """ DEPRECATED: use the function `optimal_spline_damping` instead """ msg = "Function `best_spline_cv` deprecated, use `optimal_spline_damping` instead" raise DeprecationWarning(msg)
[docs] def optimal_spline_damping( coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray], data: pd.Series | NDArray, weights: pd.Series | NDArray | None = None, **kwargs: typing.Any, ) -> vd.Spline: """ Find the best damping parameter for a verde.SplineCV() fit. All kwargs are passed to the verde.SplineCV class. Parameters ---------- coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | \ numpy.ndarray] easting and northing coordinates of the data data : pandas.Series | numpy.ndarray data for fitting the spline to weights : pandas.Series | numpy.ndarray | None, optional if not None, then the weights assigned to each data point. Typically, this should be 1 over the data uncertainty squared, by default None Keyword Arguments ----------------- dampings : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated forces. If None, no regularization is used, by default None force_coords : bool The easting and northing coordinates of the point forces. If None (default), then will be set to the data coordinates. cv : None | cross-validation generator Any scikit-learn cross-validation generator. If not given, will use the default set by :func:`verde.cross_val_score`. delayed : bool If True, will use :func:`dask.delayed.delayed` to dispatch computations and allow :mod:`dask` to execute the grid search in parallel (see note above). scoring : None | str | Callable The scoring function (or name of a function) used for cross-validation. Must be known to scikit-learn. See the description of *scoring* in :func:`sklearn.model_selection.cross_val_score` for details. If None, will fall back to the :meth:`verde.Spline.score` method. Returns ------- verde.Spline the spline which best fits the data """ kwargs = copy.deepcopy(kwargs) dampings = kwargs.pop("dampings", None) # if single damping value provided, convert to list if isinstance(dampings, typing.Iterable): pass else: dampings = [dampings] n_splits = 5 while n_splits > 0: try: spline = vd.SplineCV( dampings=dampings, cv=sklearn.model_selection.KFold( n_splits=n_splits, shuffle=True, random_state=0, ), **kwargs, ) spline.fit( coordinates, data, weights=weights, ) break except ValueError as e: logger.error(e) msg = "decreasing number of splits by 1 until ValueError is resolved" logger.warning(msg) if n_splits == 1: msg = "ValueError not resolved, fitting spline with no damping" logger.warning(msg) spline = vd.Spline( damping=None, **kwargs, ) spline.fit( coordinates, data, weights=weights, ) n_splits -= 1 if len(dampings) > 1: try: logger.info("Best SplineCV score: %s", spline.scores_.max()) except AttributeError: logger.info("Best SplineCV score: %s", max(dask.compute(spline.scores_)[0])) logger.info("Best damping: %s", spline.damping_) dampings_without_none = [i for i in dampings if i is not None] try: if spline.damping_ is None: pass elif len(dampings) > 2 and spline.damping_ in [ np.min(dampings_without_none), np.max(dampings_without_none), ]: logger.warning( "Best damping value (%s) is at the limit of provided values (%s, %s) " "and thus is likely not a global minimum, expand the range of values " "test to ensure the best parameter value value is found.", spline.damping_, np.nanmin(dampings_without_none), np.nanmax(dampings_without_none), ) except AttributeError: pass return spline
def best_equivalent_source_damping( coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray], data: pd.Series | NDArray, delayed: bool = False, weights: pd.Series | NDArray | None = None, **kwargs: typing.Any, ) -> hm.EquivalentSources: """ Find the best damping parameter for a harmonica.EquivalentSource() fit. All kwargs are passed to the harmonica.EquivalentSource class. Parameters ---------- coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | numpy.ndarray, \ pandas.Series | numpy.ndarray] tuple of easting, northing, and upward coordinates of the gravity data data : pandas.Series | numpy.ndarray the gravity data delayed : bool, optional compute the scores in parallel if True, by default False weights : numpy.ndarray | None, optional optional weight values for each gravity data point, by default None Keyword Arguments ----------------- damping : float | None The positive damping regularization parameter. Controls how much smoothness is imposed on the estimated coefficients. If None, no regularization is used. points : list[numpy.ndarray] | None List containing the coordinates of the equivalent point sources. Coordinates are assumed to be in the following order: (``easting``, ``northing``, ``upward``). If None, will place one point source below each observation point at a fixed relative depth below the observation point. Defaults to None. depth : float or str Parameter used to control the depth at which the point sources will be located. If a value is provided, each source is located beneath each data point (or block-averaged location) at a depth equal to its elevation minus the ``depth`` value. If set to ``"default"``, the depth of the sources will be estimated as 4.5 times the mean distance between first neighboring sources. This parameter is ignored if *points* is specified. Defaults to ``"default"``. block_size: float | tuple[float, float] | None Size of the blocks used on block-averaged equivalent sources. If a single value is passed, the blocks will have a square shape. Alternatively, the dimensions of the blocks in the South-North and West-East directions can be specified by passing a tuple. If None, no block-averaging is applied. This parameter is ignored if *points* are specified. Default to None. parallel : bool If True any predictions and Jacobian building is carried out in parallel through Numba's ``jit.prange``, reducing the computation time. If False, these tasks will be run on a single CPU. Default to True. dtype : str The desired data-type for the predictions and the Jacobian matrix. Default to ``"float64"``. Returns ------- harmonica.EquivalentSources the best fitted equivalent sources """ kwargs = copy.deepcopy(kwargs) dampings = kwargs.pop("dampings", None) kwargs.pop("damping", None) # if single damping value provided, convert to list if isinstance(dampings, typing.Iterable): pass else: dampings = [dampings] # pylint: disable=duplicate-code if np.isnan(coordinates).any(): msg = "coordinates contain NaN" raise ValueError(msg) if np.isnan(data).any(): msg = "data contains is NaN" raise ValueError(msg) scores = [] for d in dampings: eqs = hm.EquivalentSources( damping=d, **kwargs, ) score = np.mean( vd.cross_val_score( eqs, coordinates, data, delayed=delayed, weights=weights, scoring="r2" ) ) # pylint: enable=duplicate-code scores.append(score) if delayed: scores = dask.compute(scores)[0] else: pass best = np.argmax(scores) logger.info("Best EqSources score: %s", scores[best]) logger.info("Best damping: %s", dampings[best]) dampings_without_none = [i for i in dampings if i is not None] if dampings[best] is None: pass elif len(dampings) > 2 and dampings[best] in [ np.min(dampings_without_none), np.max(dampings_without_none), ]: logger.warning( "Best damping value (%s) is at the limit of provided values (%s, %s) and " "thus is likely not a global minimum, expand the range of values test to " "ensure the best parameter value value is found.", dampings[best], np.nanmin(dampings_without_none), np.nanmax(dampings_without_none), ) return hm.EquivalentSources(damping=dampings[best]).fit( coordinates, data, weights=weights ) def gravity_decay_buffer( buffer_perc: float, spacing: float, inner_region: tuple[float, float, float, float], top: float, zref: float, obs_height: float, density: float, amplitude: float | None = None, wavelength: float | None = None, checkerboard: bool = False, as_density_contrast: bool = False, plot: bool = True, plot_profile: bool = True, progressbar: bool = False, ) -> tuple[float, float, int, xr.Dataset]: """ For a given buffer zone width (as percentage of x or y range) and domain parameters, calculate the max percent decay of the gravity anomaly within the region of interest. Parameters ---------- buffer_perc : float percentage of the widest dimension of inner_region to use as buffer zone spacing : float spacing of the prism layer and gravity observation points in meters inner_region : tuple[float, float, float, float] region boundaries for the region of interest in meters top : float height in meters for the top of the prisms zref : float reference level in meters for the prisms obs_height : float gravity observation height in meters density : float density value for the prisms in kg/m^3 amplitude : float | None, optional if using `checkerboard`, this is the amplitude of each undulation, by default None wavelength : float | None, optional if using `checkerboard`, this is the wavelength of each undulation, by default None checkerboard : bool, optional use an undulating checkerboard for the topography instead of a flat surface, by default False as_density_contrast : bool, optional discretize the topography as a density contrast, resulting in no edge effects, by default False plot : bool, optional plot the results, by default True plot_profile : bool, optional plot a profile across the prism layer, by default True progressbar : bool, optional show a progressbar for the forward gravity calculation, by default False Returns ------- max_decay : float the maximum percentage decay of the gravity anomaly within the region of interest buffer_width : float width of the buffer zone in meters buffer_cells : int number of cells in the buffer zone grav_ds : xarray.Dataset dataset of the forward gravity calculations """ if (checkerboard is False) & (top == zref): msg = "top and zref must be different if checkerboard is False" raise ValueError(msg) # get x and y range of interest region x_diff = np.abs(inner_region[0] - inner_region[1]) y_diff = np.abs(inner_region[2] - inner_region[3]) # pick the bigger range max_diff = max(x_diff, y_diff) # calc buffer as percentage of width buffer_width = max_diff * (buffer_perc / 100) # round to nearest multiple of spacing def round_to_input(num: float, multiple: float) -> float: return round(num / multiple) * multiple # round buffer width to nearest spacing interval buffer_width = round_to_input(buffer_width, spacing) # define buffer region buffer_region = vd.pad_region(inner_region, buffer_width) # calculate buffer width in terms of number of cells buffer_cells = buffer_width / spacing # create topography if checkerboard: synth = vd.synthetic.CheckerBoard( amplitude=amplitude, region=buffer_region, w_east=wavelength, w_north=wavelength, ) surface = synth.grid( spacing=spacing, data_names="upward", dims=("northing", "easting") ).upward surface += top else: surface = create_topography( method="flat", upward=top, region=buffer_region, spacing=spacing, ).upward # create prism layer if as_density_contrast: # create prisms around mean value to compare to to calculate decay zref = surface.to_numpy().mean() # positive densities above, negative below dens = surface.copy() dens.to_numpy()[:] = density dens = dens.where(surface >= zref, -density) # create prism layer with a mean zref model = grid_to_model( surface, zref, density=dens, model_type="prisms", ) else: model = grid_to_model( surface, zref, density=density, model_type="prisms", ) # create prisms around mean value to compare to to calculate decay zref = surface.to_numpy().mean() # positive densities above, negative below dens = surface.copy() dens.to_numpy()[:] = density dens = dens.where(surface >= zref, -density) # create prism layer with a mean zref model_mean_zref = grid_to_model( surface, zref, density=dens, model_type="prisms", ) # create set of observation points data = vd.grid_coordinates( inner_region, spacing=spacing, extra_coords=obs_height, ) forward_df = pd.DataFrame( { "easting": data[0].ravel(), "northing": data[1].ravel(), "upward": data[2].ravel(), } ) # calculate forward gravity of layer forward_df["forward"] = model.prism_layer.gravity( coordinates=( forward_df.easting, forward_df.northing, forward_df.upward, ), field="g_z", progressbar=progressbar, ) # if checkerboard: # calculate forward gravity of layer forward_df["forward_no_edge_effects"] = model_mean_zref.prism_layer.gravity( coordinates=( forward_df.easting, forward_df.northing, forward_df.upward, ), field="g_z", progressbar=progressbar, ) grav_ds = forward_df.set_index(["northing", "easting"]).to_xarray() # shift forward gravity with and without edge effects so max value are equal shift = grav_ds.forward_no_edge_effects.max() - grav_ds.forward.max() grav_ds["forward_no_edge_effects"] -= shift dif = grav_ds.forward - grav_ds.forward_no_edge_effects max_grav = grav_ds.forward.to_numpy().max() max_decay = 100 * (max_grav - (max_grav + dif.to_numpy().min())) / max_grav if plot: try: plotting.plot_edge_effects( grav_ds=grav_ds, layer=model, inner_region=inner_region, plot_profile=plot_profile, ) except Exception as e: # pylint: disable=broad-exception-caught logger.error("plotting failed with error: %s", e) return max_decay, buffer_width, int(buffer_cells), grav_ds