import copy # pylint: disable=too-many-lines
import os
import typing
import warnings
from contextlib import contextmanager
import dask
import harmonica as hm
import numpy as np
import pandas as pd
import polartoolkit as ptk
import pygmt
import pyproj
import sklearn
import verde as vd
import xarray as xr
import xrft
from numpy.typing import NDArray
from pykdtree.kdtree import KDTree # pylint: disable=no-name-in-module
from invert4geom import logger, plotting
@contextmanager
def _log_level(level, log_instance=logger): # type: ignore[no-untyped-def, has-type]
"Run body with logger at a different level"
saved_logger_level = log_instance.level
log_instance.setLevel(level)
try:
yield saved_logger_level
finally:
log_instance.setLevel(saved_logger_level)
@contextmanager
def _environ(**env): # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring
"""temporarily set/reset an environment variable"""
originals = {k: os.environ.get(k) for k in env}
for k, val in env.items():
if val is None:
os.environ.pop(k, None)
else:
os.environ[k] = val
try:
yield
finally:
for k, val in originals.items():
if val is None:
os.environ.pop(k, None)
else:
os.environ[k] = val
class DuplicateFilter:
"""
Filters away duplicate log messages.
Adapted from https://stackoverflow.com/a/60462619/18686384
"""
def __init__(self, log): # type: ignore[no-untyped-def]
self.msgs = set()
self.log = log
def filter(self, record): # type: ignore[no-untyped-def] # pylint: disable=missing-function-docstring
msg = str(record.msg)
is_duplicate = msg in self.msgs
if not is_duplicate:
self.msgs.add(msg)
return not is_duplicate
def __enter__(self): # type: ignore[no-untyped-def]
self.log.addFilter(self)
def __exit__(self, exc_type, exc_val, exc_tb): # type: ignore[no-untyped-def]
self.log.removeFilter(self)
def get_epsg(coast: bool) -> tuple[str, bool]:
"""
Get the EPSG code for plotting coastlines. If the environment variable
'POLARTOOLKIT_EPSG' is not set, default to EPSG of 3857 and skip plotting
coastlines.
Parameters
----------
coast : bool
whether to plot coastlines, if True, will attempt to get EPSG code for plotting
coastlines, if False, will skip plotting coastlines and return EPSG of 3857
Returns
-------
tuple[str, bool]
epsg code and whether to plot coastlines
"""
try:
epsg = ptk.default_epsg(None, None)
except KeyError:
if coast:
msg = (
"couldn't find environment variable 'POLARTOOLKIT_EPSG' with the EPSG "
"code needed for plotting elements with geographic coordinates, such as"
"coastlines. You can set the environment variable "
"temporarily with `import os; os.environ['POLARTOOLKIT_EPSG'] = '3031' "
"where 3031 is the EPSG projection of your region. For now defaulting to "
"EPSG of 3857 and skipping plotting coastlines."
)
warnings.warn(
msg,
UserWarning,
stacklevel=2,
)
logger.warning(msg)
epsg = "3857"
coast = False
epsg = "3857"
return epsg, coast
def _check_constraints_inside_gravity_region(
constraints_df: pd.DataFrame,
grav_ds: xr.Dataset,
) -> None:
"""check that all constraints are inside the region of the gravity data"""
coord_names = grav_ds.coord_names
grav_df = grav_ds.inv.df
grav_region = vd.get_region((grav_df[coord_names[0]], grav_df[coord_names[1]]))
inside = vd.inside(
(constraints_df[coord_names[0]], constraints_df[coord_names[1]]),
region=grav_region,
)
if not inside.all():
msg = (
"Some constraints are outside the region of the gravity data. "
"This may result in unexpected behavior."
)
logger.warning(msg)
def rmse(data: NDArray, as_median: bool = False) -> float:
"""
function to give the root mean/median squared error (RMSE) of data
Parameters
----------
data : numpy.ndarray
input data
as_median : bool, optional
choose to give root median squared error instead, by default False
Returns
-------
float
RMSE value
"""
if as_median:
value: float = np.sqrt(np.nanmedian(data**2).item())
else:
value = np.sqrt(np.nanmean(data**2).item())
return value
def _nearest_grid_fill(
grid: xr.DataArray,
method: str = "verde",
crs: str | None = None,
) -> xr.DataArray:
"""
fill missing values in a grid with the nearest value.
Parameters
----------
grid : xarray.DataArray
grid with missing values
method : str, optional
choose method of filling, by default "verde"
crs : str | None, optional
if method is 'rioxarray', provide the crs of the grid, in format 'epsg:xxxx',
by default None
Returns
-------
xarray.DataArray
filled grid
"""
# TODO: also check out rasterio fillnodata() https://rasterio.readthedocs.io/en/latest/api/rasterio.fill.html#rasterio.fill.fillnodata
# uses https://gdal.org/en/stable/api/gdal_alg.html#_CPPv414GDALFillNodata15GDALRasterBandH15GDALRasterBandHdiiPPc16GDALProgressFuncPv
# can fill with nearest neighbor or inverse distance weighting
# get coordinate names
original_dims = list(grid.sizes.keys())
# get original grid name
original_name = grid.name
if method == "rioxarray":
filled: xr.DataArray = (
grid.rio.write_crs(crs)
.rio.set_spatial_dims(original_dims[1], original_dims[0])
.rio.write_nodata(np.nan)
.rio.interpolate_na(method="nearest")
.rename(original_name)
)
elif method == "verde":
df = vd.grid_to_table(grid)
df_dropped = df[df[grid.name].notna()]
coords = (df_dropped[grid.dims[1]], df_dropped[grid.dims[0]])
region = vd.get_region((df[grid.dims[1]], df[grid.dims[0]]))
filled = (
vd.KNeighbors()
.fit(coords, df_dropped[grid.name])
.grid(
region=region,
shape=grid.shape,
data_names=original_name,
dims=(original_dims[1], original_dims[0]),
)[original_name]
)
# elif method == "pygmt":
# filled = pygmt.grdfill(grid, mode="n", verbose="q").rename(original_name)
else:
msg = "method must be 'rioxarray', or 'verde'"
raise ValueError(msg)
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
return filled.rename(
{
next(iter(filled.dims)): original_dims[0],
list(filled.dims)[1]: original_dims[1],
}
)
def region_mask(
grid: xr.DataArray,
region: tuple[float, float, float, float],
) -> xr.DataArray:
"""
Make a mask with values of 1 inside a region and 0 outside the region.
"""
# get coordinate names
coord_names = list(grid.sizes.keys())
mask_easting = (grid[coord_names[0]] >= region[0]) & (
grid[coord_names[0]] <= region[1]
)
mask_northing = (grid[coord_names[1]] >= region[2]) & (
grid[coord_names[1]] <= region[3]
)
return xr.where(mask_easting & mask_northing, 1, 0)
def filter_grid(
grid: xr.DataArray,
filter_width: float | None = None,
height_displacement: float | None = None,
filter_type: str = "lowpass",
pad_width_factor: int = 3,
pad_mode: str = "linear_ramp",
pad_constant: float | None = None,
pad_end_values: float | None = None,
) -> xr.DataArray:
"""
Apply a spatial filter to a grid.
Parameters
----------
grid : xarray.DataArray
grid to filter the values of
filter_width : float, optional
width of the filter in meters, by default None
height_displacement : float, optional
height displacement for upward continuation, relative to observation height, by
default None
filter_type : str, optional
type of filter to use from 'lowpass', 'highpass' 'up_deriv', 'easting_deriv',
'northing_deriv', 'up_continue', or 'total_gradient', by default "lowpass"
pad_width_factor : int, optional
factor of grid width to pad the grid by, by default 3, which equates to a pad
with a width of 1/3 of the grid width.
pad_mode : str, optional
mode of padding, can be "linear", by default "linear_ramp"
pad_constant : float | None, optional
constant value to use for padding, by default None
pad_end_values : float | None, optional
value to use for end of padding if pad_mode is "linear_ramp", by default None
Returns
-------
xarray.DataArray
a filtered grid
"""
# get coordinate names
original_dims = list(grid.sizes.keys())
# get original grid name
original_name = grid.name
# if there are nan's, fill them with nearest neighbor
if grid.isnull().any(): # noqa: PD003
filled = _nearest_grid_fill(grid, method="verde")
else:
filled = grid.copy()
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
filled = filled.rename(
{
next(iter(filled.dims)): original_dims[0],
list(filled.dims)[1]: original_dims[1],
}
)
# define width of padding in each direction
pad_width = {
original_dims[1]: grid[original_dims[1]].size // pad_width_factor,
original_dims[0]: grid[original_dims[0]].size // pad_width_factor,
}
if pad_mode == "constant":
if pad_constant is None:
pad_constant = filled.median()
pad_end_values = None
if (pad_mode == "linear_ramp") and (pad_end_values is None):
pad_end_values = filled.median()
if pad_mode != "constant":
pad_constant = (
None # needed until https://github.com/xgcm/xrft/issues/211 is fixed
)
# apply padding
pad_kwargs = {
**pad_width,
"mode": pad_mode,
"constant_values": pad_constant,
"end_values": pad_end_values,
}
padded = xrft.pad(
filled,
**pad_kwargs,
)
if filter_type == "lowpass":
if filter_width is None:
msg = "filter_width must be provided if filter_type is 'lowpass'"
raise ValueError(msg)
filt = hm.gaussian_lowpass(padded, wavelength=filter_width).rename("filt")
elif filter_type == "highpass":
if filter_width is None:
msg = "filter_width must be provided if filter_type is 'highpass'"
raise ValueError(msg)
filt = hm.gaussian_highpass(padded, wavelength=filter_width).rename("filt")
elif filter_type == "up_deriv":
filt = hm.derivative_upward(padded).rename("filt")
elif filter_type == "easting_deriv":
filt = hm.derivative_easting(padded).rename("filt")
elif filter_type == "northing_deriv":
filt = hm.derivative_northing(padded).rename("filt")
elif filter_type == "up_continue":
if height_displacement is None:
msg = "height_displacement must be provided if filter_type is 'up_continue'"
raise ValueError(msg)
filt = hm.upward_continuation(
padded, height_displacement=height_displacement
).rename("filt")
elif filter_type == "total_gradient":
filt = hm.total_gradient_amplitude(padded).rename("filt")
else:
msg = (
"filter_type must be 'lowpass', 'highpass' 'up_deriv', 'easting_deriv', "
"'northing_deriv', 'up_continue', or 'total_gradient'"
)
raise ValueError(msg)
unpadded = xrft.unpad(filt, pad_width)
# reset coordinate values to original (avoid rounding errors)
unpadded = unpadded.assign_coords(
{
original_dims[0]: grid[original_dims[0]].to_numpy(),
original_dims[1]: grid[original_dims[1]].to_numpy(),
}
)
if grid.isnull().any(): # noqa: PD003
result: xr.DataArray = xr.where(grid.notnull(), unpadded, grid) # noqa: PD004
else:
result = unpadded.copy()
# reset coordinate names if changed
with warnings.catch_warnings():
warnings.filterwarnings("ignore", message="rename '")
result = result.rename(
{
next(iter(result.dims)): original_dims[0],
# list(result.dims)[0]: original_dims[0],
list(result.dims)[1]: original_dims[1],
}
)
return result.rename(original_name)
def dist_nearest_points(
targets: pd.DataFrame,
data: pd.DataFrame | xr.DataArray | xr.Dataset,
coord_names: tuple[str, str] | None = None,
) -> typing.Any:
"""
for all grid cells calculate to the distance to the nearest target.
Parameters
----------
targets : pandas.DataFrame
contains the coordinates of the targets
data : pandas.DataFrame | xarray.DataArray | xarray.Dataset
the grid data, in either gridded or tabular form
coord_names : tuple[str, str] | None, optional
the names of the coordinates for both the targets and the data, by default None
Returns
-------
typing.Any
the distance to the nearest target for each gridcell, in the same format as the
input for `data`.
"""
if coord_names is None:
coord_names = ("easting", "northing")
df_targets = targets[[coord_names[0], coord_names[1]]].copy()
df_data: pd.DataFrame | xr.DataArray | xr.Dataset
if isinstance(data, pd.DataFrame):
df_data = data[list(coord_names)].copy()
elif isinstance(data, xr.DataArray):
df_grid = vd.grid_to_table(data).dropna()
df_data = df_grid[[coord_names[0], coord_names[1]]].copy() # pylint: disable=unsubscriptable-object
elif isinstance(data, xr.Dataset):
try:
df_grid = vd.grid_to_table(data[next(iter(data.variables))]).dropna()
# df_grid = vd.grid_to_table(data[list(data.variables)[0]]).dropna()
except IndexError:
df_grid = vd.grid_to_table(data).dropna()
df_data = df_grid[[coord_names[0], coord_names[1]]].copy() # pylint: disable=unsubscriptable-object
min_dist, _ = KDTree(df_targets.to_numpy()).query(df_data.to_numpy(), 1)
df_data["min_dist"] = min_dist
if isinstance(data, pd.DataFrame):
return df_data
return df_data.set_index([coord_names[0], coord_names[1]][::-1]).to_xarray()
def normalize(
x: NDArray,
low: float = 0,
high: float = 1,
) -> NDArray:
"""
Normalize a list of numbers between provided values
Parameters
----------
x : NDArray
numbers to normalize
low : float, optional
lower value for normalization, by default 0
high : float, optional
higher value for normalization, by default 1
Returns
-------
NDArray
a normalized list of numbers
"""
min_val = np.min(x)
max_val = np.max(x)
norm = (x - min_val) / (max_val - min_val)
return norm * (high - low) + low
def normalize_xarray(
da: xr.DataArray,
low: float = 0,
high: float = 1,
) -> xr.DataArray:
"""
Normalize a grid between provided values
Parameters
----------
da : xarray.DataArray
grid to normalize
low : float, optional
lower value for normalization, by default 0
high : float, optional
higher value for normalization, by default 1
Returns
-------
xarray.DataArray
a normalized grid
"""
# min_val = da.to_numpy().min()
# max_val = da.to_numpy().max()
da = da.copy()
min_val = da.quantile(0)
max_val = da.quantile(1)
da2: xr.DataArray = (high - low) * (
((da - min_val) / (max_val - min_val)).clip(0, 1)
) + low
return da2.drop_vars("quantile")
def scale_normalized(
sample: NDArray,
bounds: NDArray,
) -> NDArray:
"""
Rescales the sample space into the unit hypercube, bounds = [0,1]
Parameters
----------
sample : NDArray
sampled values
bounds : NDArray
bounds of the sampling
Returns
-------
NDArray
sampled values normalized from 0 to 1
"""
scaled_sample = np.zeros(sample.shape)
for j in range(sample.shape[1]):
scaled_sample[:, j] = (sample[:, j] - bounds[j][0]) / (
bounds[j][1] - bounds[j][0]
)
return scaled_sample
[docs]
def normalized_mindist(
points: pd.DataFrame,
grid: xr.DataArray,
low: float | None = None,
high: float | None = None,
mindist: float | None = None,
region: list[float] | None = None,
) -> xr.DataArray:
"""
Find the minimum distance between each grid cell and the nearest point. If low and
high are provided, normalize the min dists grid between these values. If region is
provided, all grid cells outside region are set to a distance of 0.
Parameters
----------
points : pandas.DataFrame
coordinates of the points
grid : xarray.DataArray
gridded data to find min dists for each grid cell
low : float | None, optional
lower value for normalization, by default None
high : float | None, optional
higher value for normalization, by default None
mindist : float | None, optional
the minimum allowed distance, all values below are set equal to, by default None
region : list[float] | None, optional
bounding region for which all grid cells outside will be set to low, by default
None
Returns
-------
xarray.DataArray
grid of normalized minimum distances
"""
grid = copy.deepcopy(grid)
# get coordinate names
original_dims = list(grid.sizes.keys())
constraints_df = points.copy()
min_dist: xr.DataArray = dist_nearest_points(
targets=constraints_df,
data=grid,
coord_names=(str(original_dims[1]), str(original_dims[0])),
).min_dist
# set points < mindist to low
if mindist is not None:
min_dist = xr.where(min_dist < mindist, 0, min_dist)
# set points outside of region to low
if region is not None:
df = vd.grid_to_table(min_dist)
df["are_inside"] = vd.inside(
(df[original_dims[1]], df[original_dims[0]]),
region=region,
)
new_min_dist = df.set_index([original_dims[0], original_dims[1]]).to_xarray()
new_min_dist = xr.where(new_min_dist.are_inside, new_min_dist, 0)
# add nans back
new_min_dist = xr.where(min_dist.isnull(), np.nan, new_min_dist) # noqa: PD003
min_dist = new_min_dist.min_dist
# normalize from low to high
if (low is None) & (high is None):
pass
else:
assert low is not None
assert high is not None
min_dist = normalize_xarray(min_dist, low=low, high=high)
return min_dist
def sample_grids(
df: pd.DataFrame,
grid: str | xr.DataArray,
sampled_name: str,
**kwargs: typing.Any,
) -> pd.DataFrame:
"""
Sample data at every point along a line
Parameters
----------
df : pandas.DataFrame
Dataframe containing columns 'easting', 'northing', 'longitude', 'latitude' or
columns with names defined by kwarg "coord_names".
grid : str or xarray.DataArray
Grid to sample, either file name or xarray.DataArray
sampled_name : str,
Name for sampled column
Returns
-------
pandas.DataFrame
Dataframe with new column (sampled_name) of sample values from (grid)
"""
return ptk.sample_grids(
df=df,
grid=grid,
sampled_name=sampled_name,
**kwargs,
)
def get_spacing(prisms_df: pd.DataFrame) -> None: # noqa: ARG001
"""
DEPRECATED: function has been removed
"""
# pylint: disable=W0613
msg = "Function `get_spacing` deprecated"
raise DeprecationWarning(msg)
[docs]
def create_topography(
method: str,
region: tuple[float, float, float, float],
spacing: float,
dampings: list[float] | None = None,
registration: str = "g",
upward: float | None = None,
upwards: float | None = None,
coord_names: tuple[str, str] = ("easting", "northing"),
projection: pyproj.Proj | None = None,
constraints_df: pd.DataFrame | None = None,
weights: pd.Series | NDArray | None = None,
weights_col: str | None = None,
block_size: float | None = None,
block_reduction: str = "median",
upper_confining_layer: xr.DataArray | None = None,
lower_confining_layer: xr.DataArray | None = None,
dataset_to_add: xr.Dataset | None = None,
) -> xr.Dataset:
"""
Create a grid of topography data from either the interpolation (with splines) of
point data or creating a grid of constant value. Optionally, a subset of point data
can be interpolated and then merged with an existing grid. To do this,
``constraints_df`` must contain two additional columns of booleans, ``inside`` which
is True for points inside the region of interest, and False otherwise, and
``buffer`` which is True for points within a buffer region around the region of
interest, and False otherwise. Inside and Buffer points are used to interpolated the
data, and then the interpolated data (without the buffer zone) is merged with the
points outside the region of interest. For interpolations, ``block_size`` can be
supplied to perform a block-median filtering of the points before fitting the
spline, reducing the computational cost.
Parameters
----------
method : str
method to use, either ``flat`` or ``splines``
region : tuple[float, float, float, float]
bounding region to use for the output grid. If you are using projected
coordinates (meters), this is in the format (min_easting, max_easting,
min_northing, max_northing). If using geographic coordinates (latitude and
longitude), this is in the format (min_longitude, max_longitude, min_latitude,
max_latitude)
spacing : float
spacing of the grid in meters if using projected coordinates, or in decimal
degrees if using geographic coordinates
dampings : list[float] | None, optional
damping values to use in spline cross validation for method ``spline``, by default
None
registration : str, optional
choose between gridline ``g`` or pixel ``p`` registration, by default ``g``
upward : float | None, optional
constant elevation in meters to use for method ``flat``, by default None
upwards : float | None, optional
deprecated, use `upward` instead, by default None
coord_names : tuple[str, str], optional
names to use for coordinates, should be either (easting, northing) or
(longitude, latitude), by default (easting, northing)
projection : pyproj.Proj | None, optional
if using geographic coordinates (latitude and longitude) and the splines method,
provide a pyproj Proj object to convert to cartesian coordinates, by default
None
constraints_df : pandas.DataFrame | None, optional
dataframe with column 'upward' to use for method ``splines``, and optionally
columns ``inside`` and ``buffer``, by default None
weights : pandas.Series | numpy.ndarray | None, optional
weight to use for fitting the spline. Typically, this should be 1 over the data
uncertainty squared, by default None
weights_col : str | None, optional
instead of passing the weights, pass the name of the column containing the
weights, by default None
block_size : float | None, optional
block size to use for block-reduction of constraint points before fitting
splines. In meters if using projected coordinates, or in decimal degrees if
using geographic coordinates. If None, no block-reduction is applied, by
default None
block_reduction: str, optional
type of block reduction to apply, if ``median``, weights will be ignored, of
``mean``, and weights are provided, they will be used in the block reduction.
Defaults to ``median``.
upper_confining_layer : xarray.DataArray | None, optional
layer which the inverted topography should always be below, by default None
lower_confining_layer : xarray.DataArray | None, optional
layer which the inverted topography should always be above, by default None
dataset_to_add : xarray.Dataset | None, optional
additional variables to add to the topography dataset, such as variables `mask`
or `geocentric_radius`, by default None
Returns
-------
xarray.Dataset
a topography grid
"""
if upwards is not None:
warnings.warn(
"`upwards` is deprecated, please use `upward` instead.",
UserWarning,
stacklevel=2,
)
upward = upwards
if method == "flat":
if registration == "g":
pixel_register = False
elif registration == "p":
pixel_register = True
else:
msg = "registration must be 'g' or 'p'"
raise ValueError(msg)
if upward is None:
msg = "upward must be provided if method is `flat`"
raise ValueError(msg)
# create grid of coordinates
(x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking
region=region,
spacing=spacing,
pixel_register=pixel_register,
)
# make flat topography of value = upward
grid = vd.make_xarray_grid(
(x, y),
np.ones_like(x) * upward,
data_names="upward",
dims=(coord_names[1], coord_names[0]),
).upward
elif method == "splines": # pylint: disable=too-many-nested-blocks
# get coordinates of the constraint points
if constraints_df is None:
msg = "constraints_df must be provided if method is `splines`"
raise ValueError(msg)
df = constraints_df.copy()
# if only 1 point, return a flat topography
if len(df) == 1:
# create grid of coordinates
(x, y) = vd.grid_coordinates( # pylint: disable=unbalanced-tuple-unpacking
region=region,
spacing=spacing,
)
# make flat topography of value = upward
grid = vd.make_xarray_grid(
(x, y),
np.ones_like(x) * df.upward.to_numpy(),
data_names="upward",
dims=(coord_names[1], coord_names[0]),
).upward
else:
if (coord_names == ("longitude", "latitude")) & (projection is None):
msg = (
"If using geographic coordinates (latitude and longitude) and the "
"spline method, a pyproj.Proj projection must be provided to "
"convert to projected coordinates."
)
raise ValueError(msg)
# convert to projected coordinates if needed
if projection is not None:
projected_points = [
projection(lon, lat)
for (lon, lat) in zip(df.longitude, df.latitude, strict=True)
]
eastings, northings = zip(*projected_points, strict=True)
df["easting"] = eastings
df["northing"] = northings
# convert region in lat/lon to projected easting/northing
corners_lonlat = [
(region[0], region[2]),
(region[1], region[2]),
(region[1], region[3]),
(region[0], region[3]),
]
projected_corners = [
projection(lon, lat) for (lon, lat) in corners_lonlat
]
eastings, northings = zip(*projected_corners, strict=True)
cartesian_region = (
min(eastings),
max(eastings),
min(northings),
max(northings),
)
cartesian_spacing = (
spacing * 111139
) # approx conversion from degrees latitude to meters
if block_size is not None:
block_size = (
block_size * 111139
) # approx conversion from degrees latitude to meters
else:
cartesian_region = region
cartesian_spacing = spacing
if pd.Series(["inside", "buffer"]).isin(df.columns).all():
df_to_interpolate = df[df.inside | df.buffer]
df_outside_buffer = df[(df.inside == False) & (df.buffer == False)] # noqa: E712 # pylint: disable=singleton-comparison
coords = (
df_to_interpolate[coord_names[0]],
df_to_interpolate[coord_names[1]],
)
data = df_to_interpolate.upward
if weights_col is not None:
weights = df_to_interpolate[weights_col]
if block_size is not None:
if block_reduction == "mean":
reduction = np.mean
elif block_reduction == "median":
reduction = np.median
if weights is not None:
msg = "weights are ignored when block_reduction is 'median'"
logger.warning(msg)
else:
msg = "block_reduction must be 'mean' or 'median'"
raise ValueError(msg)
reducer = vd.BlockReduce(
reduction=reduction,
spacing=block_size,
region=cartesian_region,
center_coordinates=True,
)
coords, data = reducer.filter(
coordinates=coords,
data=data,
weights=weights,
)
# run CV for fitting a spline to the data
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="The mindist parameter of verde.Spline"
)
warnings.filterwarnings(
"ignore", message="The default scoring will change"
)
spline = optimal_spline_damping(
coordinates=coords,
data=data,
weights=weights,
dampings=dampings,
)
# grid the fitted spline at desired spacing and region
inside_grid = spline.grid(
region=region,
spacing=spacing,
projection=projection,
dims=(coord_names[1], coord_names[0]),
).scalars
# merge interpolation of inner / buffer points with outside grid
# outside_grid = df_outside_buffer.set_index(
# ["northing", "easting"]).to_xarray().upward
# outside_grid = vd.make_xarray_grid(
# (df_outside_buffer.easting, df_outside_buffer.northing),
# df_outside_buffer.upward,
# data_names="upward",
# )
outside_grid = pygmt.xyz2grd(
x=df_outside_buffer[coord_names[0]],
y=df_outside_buffer[coord_names[1]],
z=df_outside_buffer.upward,
region=cartesian_region,
spacing=cartesian_spacing,
).rename({"x": coord_names[0], "y": coord_names[1]})
grid = inside_grid.where(
outside_grid.isnull(), # noqa: PD003
outside_grid,
)
else:
coords = (df.easting, df.northing)
data = df.upward
if weights_col is not None:
weights = df[weights_col]
if block_size is not None:
if block_reduction == "mean" and weights is not None:
reducer = vd.BlockMean(
spacing=block_size,
region=cartesian_region,
center_coordinates=True,
)
coords, data, weights = reducer.filter(
coordinates=coords,
data=data,
weights=weights,
)
else:
if block_reduction == "mean":
reduction = np.mean
elif block_reduction == "median":
reduction = np.median
if weights is not None:
msg = "weights are ignored when block_reduction is 'median', to use weights, set block_reduction to 'mean'"
logger.warning(msg)
else:
msg = "block_reduction must be 'mean' or 'median'"
raise ValueError(msg)
reducer = vd.BlockReduce(
reduction=reduction,
spacing=block_size,
region=cartesian_region,
center_coordinates=True,
)
coords, data = reducer.filter(
coordinates=coords,
data=data,
weights=weights,
)
# run CV for fitting a spline to the data
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="The mindist parameter of verde.Spline"
)
warnings.filterwarnings(
"ignore", message="The default scoring will change"
)
spline = optimal_spline_damping(
coordinates=coords,
data=data,
weights=weights,
dampings=dampings,
)
# grid the fitted spline at desired spacing and region
grid = spline.grid(
region=region,
spacing=spacing,
projection=projection,
dims=(coord_names[1], coord_names[0]),
).scalars
try:
grid = grid.assign_attrs(damping=spline.damping_)
except UnboundLocalError:
grid = grid.assign_attrs(damping=None)
else:
msg = "method must be 'flat' or 'splines'"
raise ValueError(msg)
# ensure grid doesn't cross supplied confining layers
if upper_confining_layer is None or np.all(upper_confining_layer.isnull()): # noqa: PD003
pass
else:
da = ptk.resample_grid(
upper_confining_layer,
spacing=spacing,
region=region,
registration=registration,
)
original_dims = list(da.sizes.keys())
da = da.rename(
{original_dims[1]: coord_names[0], original_dims[0]: coord_names[1]}
)
grid = xr.where(grid > da, da, grid)
if lower_confining_layer is None or np.all(lower_confining_layer.isnull()): # noqa: PD003
pass
else:
da = ptk.resample_grid(
lower_confining_layer,
spacing=spacing,
region=region,
registration=registration,
)
original_dims = list(da.sizes.keys())
da = da.rename(
{original_dims[1]: coord_names[0], original_dims[0]: coord_names[1]}
)
grid = xr.where(grid < da, da, grid)
ds = grid.to_dataset(name="upward")
if dataset_to_add is not None:
# check datasets are aligned
try:
_ = xr.align(ds, dataset_to_add, join="exact")
except xr.AlignmentError as err:
msg = "`dataset_to_add` should be exactly aligned with the create topography dataset"
raise xr.AlignmentError(msg) from err
ds = ds.merge(dataset_to_add)
return ds
def grid_to_model(
surface: xr.DataArray,
reference: float | xr.DataArray,
density: float | int | xr.DataArray,
model_type: str,
) -> xr.Dataset:
"""
create a Harmonica layer of prisms or tesseroids with assigned densities.
Parameters
----------
surface : xarray.DataArray
data to use for model surface
reference : float | xarray.DataArray
data or constant to use for model reference, if value is below surface,
prism/tesseroid will be inverted
density : float | int | xarray.DataArray
data or constant to use for model densities, should be in the form of a density
contrast across a surface (i.e. between air and rock).
model_type : str
type of model to create, either 'prisms' or 'tesseroids'
Returns
-------
xarray.Dataset
a prism or tesseroid layer with assigned densities
"""
# if density provided as a single number, use it for all prisms/tesseroids
if isinstance(density, (float, int)):
dens = density * np.ones_like(surface)
# if density provided as a dataarray, map each density to the correct prisms/tesseroids
elif isinstance(density, xr.DataArray):
dens = density
else:
msg = "invalid density type, should be a number or DataArray"
raise ValueError(msg)
# create layer of prisms/tesseroids based off input dataarrays
if model_type == "tesseroids":
model = hm.tesseroid_layer(
coordinates=(
surface["longitude"].to_numpy(),
surface["latitude"].to_numpy(),
),
surface=surface,
reference=reference,
properties={
"density": dens,
},
)
elif model_type == "prisms":
model = hm.prism_layer(
coordinates=(
surface["easting"].to_numpy(),
surface["northing"].to_numpy(),
),
surface=surface,
reference=reference,
properties={
"density": dens,
},
)
else:
msg = "model_type must be 'prisms' or 'tesseroids'"
raise ValueError(msg)
model["thickness"] = model.top - model.bottom
# add zref as an attribute
return model.assign_attrs(zref=reference, model_type=model_type)
def grids_to_prisms(
surface: xr.DataArray, # noqa: ARG001
reference: float | xr.DataArray, # noqa: ARG001
density: float | int | xr.DataArray, # noqa: ARG001
input_coord_names: tuple[str, str] = ("easting", "northing"), # noqa: ARG001
) -> None:
"""
DEPRECATED: use the function `grid_to_model` instead
"""
# pylint: disable=W0613
msg = "Function `grids_to_prisms` deprecated, use `grid_to_model` instead"
raise DeprecationWarning(msg)
def best_spline_cv(
coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray], # noqa: ARG001 # pylint: disable=unused-argument
data: pd.Series | NDArray, # noqa: ARG001 # pylint: disable=unused-argument
weights: pd.Series | NDArray | None = None, # noqa: ARG001 # pylint: disable=unused-argument
**kwargs: typing.Any, # noqa: ARG001 # pylint: disable=unused-argument
) -> None:
"""
DEPRECATED: use the function `optimal_spline_damping` instead
"""
msg = "Function `best_spline_cv` deprecated, use `optimal_spline_damping` instead"
raise DeprecationWarning(msg)
[docs]
def optimal_spline_damping(
coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray],
data: pd.Series | NDArray,
weights: pd.Series | NDArray | None = None,
**kwargs: typing.Any,
) -> vd.Spline:
"""
Find the best damping parameter for a verde.SplineCV() fit. All kwargs are passed to
the verde.SplineCV class.
Parameters
----------
coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | \
numpy.ndarray]
easting and northing coordinates of the data
data : pandas.Series | numpy.ndarray
data for fitting the spline to
weights : pandas.Series | numpy.ndarray | None, optional
if not None, then the weights assigned to each data point. Typically, this
should be 1 over the data uncertainty squared, by default None
Keyword Arguments
-----------------
dampings : float | None
The positive damping regularization parameter. Controls how much smoothness is
imposed on the estimated forces. If None, no regularization is used, by default
None
force_coords : bool
The easting and northing coordinates of the point forces. If None (default),
then will be set to the data coordinates.
cv : None | cross-validation generator
Any scikit-learn cross-validation generator. If not given, will use the
default set by :func:`verde.cross_val_score`.
delayed : bool
If True, will use :func:`dask.delayed.delayed` to dispatch computations and
allow :mod:`dask` to execute the grid search in parallel (see note
above).
scoring : None | str | Callable
The scoring function (or name of a function) used for cross-validation.
Must be known to scikit-learn. See the description of *scoring* in
:func:`sklearn.model_selection.cross_val_score` for details. If None,
will fall back to the :meth:`verde.Spline.score` method.
Returns
-------
verde.Spline
the spline which best fits the data
"""
kwargs = copy.deepcopy(kwargs)
dampings = kwargs.pop("dampings", None)
# if single damping value provided, convert to list
if isinstance(dampings, typing.Iterable):
pass
else:
dampings = [dampings]
n_splits = 5
while n_splits > 0:
try:
spline = vd.SplineCV(
dampings=dampings,
cv=sklearn.model_selection.KFold(
n_splits=n_splits,
shuffle=True,
random_state=0,
),
**kwargs,
)
spline.fit(
coordinates,
data,
weights=weights,
)
break
except ValueError as e:
logger.error(e)
msg = "decreasing number of splits by 1 until ValueError is resolved"
logger.warning(msg)
if n_splits == 1:
msg = "ValueError not resolved, fitting spline with no damping"
logger.warning(msg)
spline = vd.Spline(
damping=None,
**kwargs,
)
spline.fit(
coordinates,
data,
weights=weights,
)
n_splits -= 1
if len(dampings) > 1:
try:
logger.info("Best SplineCV score: %s", spline.scores_.max())
except AttributeError:
logger.info("Best SplineCV score: %s", max(dask.compute(spline.scores_)[0]))
logger.info("Best damping: %s", spline.damping_)
dampings_without_none = [i for i in dampings if i is not None]
try:
if spline.damping_ is None:
pass
elif len(dampings) > 2 and spline.damping_ in [
np.min(dampings_without_none),
np.max(dampings_without_none),
]:
logger.warning(
"Best damping value (%s) is at the limit of provided values (%s, %s) "
"and thus is likely not a global minimum, expand the range of values "
"test to ensure the best parameter value value is found.",
spline.damping_,
np.nanmin(dampings_without_none),
np.nanmax(dampings_without_none),
)
except AttributeError:
pass
return spline
def best_equivalent_source_damping(
coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray],
data: pd.Series | NDArray,
delayed: bool = False,
weights: pd.Series | NDArray | None = None,
**kwargs: typing.Any,
) -> hm.EquivalentSources:
"""
Find the best damping parameter for a harmonica.EquivalentSource() fit. All kwargs
are passed to the harmonica.EquivalentSource class.
Parameters
----------
coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | numpy.ndarray, \
pandas.Series | numpy.ndarray]
tuple of easting, northing, and upward coordinates of the gravity data
data : pandas.Series | numpy.ndarray
the gravity data
delayed : bool, optional
compute the scores in parallel if True, by default False
weights : numpy.ndarray | None, optional
optional weight values for each gravity data point, by default None
Keyword Arguments
-----------------
damping : float | None
The positive damping regularization parameter. Controls how much
smoothness is imposed on the estimated coefficients.
If None, no regularization is used.
points : list[numpy.ndarray] | None
List containing the coordinates of the equivalent point sources.
Coordinates are assumed to be in the following order:
(``easting``, ``northing``, ``upward``).
If None, will place one point source below each observation point at
a fixed relative depth below the observation point.
Defaults to None.
depth : float or str
Parameter used to control the depth at which the point sources will be
located.
If a value is provided, each source is located beneath each data point
(or block-averaged location) at a depth equal to its elevation minus
the ``depth`` value.
If set to ``"default"``, the depth of the sources will be estimated as
4.5 times the mean distance between first neighboring sources.
This parameter is ignored if *points* is specified.
Defaults to ``"default"``.
block_size: float | tuple[float, float] | None
Size of the blocks used on block-averaged equivalent sources.
If a single value is passed, the blocks will have a square shape.
Alternatively, the dimensions of the blocks in the South-North and
West-East directions can be specified by passing a tuple.
If None, no block-averaging is applied.
This parameter is ignored if *points* are specified.
Default to None.
parallel : bool
If True any predictions and Jacobian building is carried out in
parallel through Numba's ``jit.prange``, reducing the computation time.
If False, these tasks will be run on a single CPU. Default to True.
dtype : str
The desired data-type for the predictions and the Jacobian matrix.
Default to ``"float64"``.
Returns
-------
harmonica.EquivalentSources
the best fitted equivalent sources
"""
kwargs = copy.deepcopy(kwargs)
dampings = kwargs.pop("dampings", None)
kwargs.pop("damping", None)
# if single damping value provided, convert to list
if isinstance(dampings, typing.Iterable):
pass
else:
dampings = [dampings]
# pylint: disable=duplicate-code
if np.isnan(coordinates).any():
msg = "coordinates contain NaN"
raise ValueError(msg)
if np.isnan(data).any():
msg = "data contains is NaN"
raise ValueError(msg)
scores = []
for d in dampings:
eqs = hm.EquivalentSources(
damping=d,
**kwargs,
)
score = np.mean(
vd.cross_val_score(
eqs, coordinates, data, delayed=delayed, weights=weights, scoring="r2"
)
)
# pylint: enable=duplicate-code
scores.append(score)
if delayed:
scores = dask.compute(scores)[0]
else:
pass
best = np.argmax(scores)
logger.info("Best EqSources score: %s", scores[best])
logger.info("Best damping: %s", dampings[best])
dampings_without_none = [i for i in dampings if i is not None]
if dampings[best] is None:
pass
elif len(dampings) > 2 and dampings[best] in [
np.min(dampings_without_none),
np.max(dampings_without_none),
]:
logger.warning(
"Best damping value (%s) is at the limit of provided values (%s, %s) and "
"thus is likely not a global minimum, expand the range of values test to "
"ensure the best parameter value value is found.",
dampings[best],
np.nanmin(dampings_without_none),
np.nanmax(dampings_without_none),
)
return hm.EquivalentSources(damping=dampings[best]).fit(
coordinates, data, weights=weights
)
def gravity_decay_buffer(
buffer_perc: float,
spacing: float,
inner_region: tuple[float, float, float, float],
top: float,
zref: float,
obs_height: float,
density: float,
amplitude: float | None = None,
wavelength: float | None = None,
checkerboard: bool = False,
as_density_contrast: bool = False,
plot: bool = True,
plot_profile: bool = True,
progressbar: bool = False,
) -> tuple[float, float, int, xr.Dataset]:
"""
For a given buffer zone width (as percentage of x or y range) and domain parameters,
calculate the max percent decay of the gravity anomaly within the region of
interest.
Parameters
----------
buffer_perc : float
percentage of the widest dimension of inner_region to use as buffer zone
spacing : float
spacing of the prism layer and gravity observation points in meters
inner_region : tuple[float, float, float, float]
region boundaries for the region of interest in meters
top : float
height in meters for the top of the prisms
zref : float
reference level in meters for the prisms
obs_height : float
gravity observation height in meters
density : float
density value for the prisms in kg/m^3
amplitude : float | None, optional
if using `checkerboard`, this is the amplitude of each undulation, by default
None
wavelength : float | None, optional
if using `checkerboard`, this is the wavelength of each undulation, by default
None
checkerboard : bool, optional
use an undulating checkerboard for the topography instead of a flat surface, by
default False
as_density_contrast : bool, optional
discretize the topography as a density contrast, resulting in no edge effects,
by default False
plot : bool, optional
plot the results, by default True
plot_profile : bool, optional
plot a profile across the prism layer, by default True
progressbar : bool, optional
show a progressbar for the forward gravity calculation, by default False
Returns
-------
max_decay : float
the maximum percentage decay of the gravity anomaly within the region of
interest
buffer_width : float
width of the buffer zone in meters
buffer_cells : int
number of cells in the buffer zone
grav_ds : xarray.Dataset
dataset of the forward gravity calculations
"""
if (checkerboard is False) & (top == zref):
msg = "top and zref must be different if checkerboard is False"
raise ValueError(msg)
# get x and y range of interest region
x_diff = np.abs(inner_region[0] - inner_region[1])
y_diff = np.abs(inner_region[2] - inner_region[3])
# pick the bigger range
max_diff = max(x_diff, y_diff)
# calc buffer as percentage of width
buffer_width = max_diff * (buffer_perc / 100)
# round to nearest multiple of spacing
def round_to_input(num: float, multiple: float) -> float:
return round(num / multiple) * multiple
# round buffer width to nearest spacing interval
buffer_width = round_to_input(buffer_width, spacing)
# define buffer region
buffer_region = vd.pad_region(inner_region, buffer_width)
# calculate buffer width in terms of number of cells
buffer_cells = buffer_width / spacing
# create topography
if checkerboard:
synth = vd.synthetic.CheckerBoard(
amplitude=amplitude,
region=buffer_region,
w_east=wavelength,
w_north=wavelength,
)
surface = synth.grid(
spacing=spacing, data_names="upward", dims=("northing", "easting")
).upward
surface += top
else:
surface = create_topography(
method="flat",
upward=top,
region=buffer_region,
spacing=spacing,
).upward
# create prism layer
if as_density_contrast:
# create prisms around mean value to compare to to calculate decay
zref = surface.to_numpy().mean()
# positive densities above, negative below
dens = surface.copy()
dens.to_numpy()[:] = density
dens = dens.where(surface >= zref, -density)
# create prism layer with a mean zref
model = grid_to_model(
surface,
zref,
density=dens,
model_type="prisms",
)
else:
model = grid_to_model(
surface,
zref,
density=density,
model_type="prisms",
)
# create prisms around mean value to compare to to calculate decay
zref = surface.to_numpy().mean()
# positive densities above, negative below
dens = surface.copy()
dens.to_numpy()[:] = density
dens = dens.where(surface >= zref, -density)
# create prism layer with a mean zref
model_mean_zref = grid_to_model(
surface,
zref,
density=dens,
model_type="prisms",
)
# create set of observation points
data = vd.grid_coordinates(
inner_region,
spacing=spacing,
extra_coords=obs_height,
)
forward_df = pd.DataFrame(
{
"easting": data[0].ravel(),
"northing": data[1].ravel(),
"upward": data[2].ravel(),
}
)
# calculate forward gravity of layer
forward_df["forward"] = model.prism_layer.gravity(
coordinates=(
forward_df.easting,
forward_df.northing,
forward_df.upward,
),
field="g_z",
progressbar=progressbar,
)
# if checkerboard:
# calculate forward gravity of layer
forward_df["forward_no_edge_effects"] = model_mean_zref.prism_layer.gravity(
coordinates=(
forward_df.easting,
forward_df.northing,
forward_df.upward,
),
field="g_z",
progressbar=progressbar,
)
grav_ds = forward_df.set_index(["northing", "easting"]).to_xarray()
# shift forward gravity with and without edge effects so max value are equal
shift = grav_ds.forward_no_edge_effects.max() - grav_ds.forward.max()
grav_ds["forward_no_edge_effects"] -= shift
dif = grav_ds.forward - grav_ds.forward_no_edge_effects
max_grav = grav_ds.forward.to_numpy().max()
max_decay = 100 * (max_grav - (max_grav + dif.to_numpy().min())) / max_grav
if plot:
try:
plotting.plot_edge_effects(
grav_ds=grav_ds,
layer=model,
inner_region=inner_region,
plot_profile=plot_profile,
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return max_decay, buffer_width, int(buffer_cells), grav_ds