import copy # pylint: disable=too-many-lines
import itertools
import logging
import math
import pathlib
import pickle
import random
import time
import typing
import warnings
import harmonica as hm
import matplotlib as mpl
import matplotlib.pyplot as plt
import numba
import numpy as np
import optuna
import pandas as pd
import polartoolkit as ptk
import scipy as sp
import seaborn as sns
import verde as vd
import xarray as xr
from IPython.display import clear_output
from numpy.typing import NDArray
from tqdm.autonotebook import tqdm
from invert4geom import (
cross_validation,
logger,
optimization,
plotting,
regional,
utils,
)
@numba.jit(cache=True, nopython=True)
def grav_column_der(
grav_easting: NDArray,
grav_northing: NDArray,
grav_upward: NDArray,
prism_easting: NDArray,
prism_northing: NDArray,
prism_top: NDArray,
prism_spacing: float,
prism_density: NDArray,
) -> NDArray:
"""
Function to calculate the derivative of the vertical gravitational acceleration with
respect to height (thickness) of a right-rectangular prism at an observation point.
The equation of the vertical gravitational acceleration of an annulus from
:footcite:p:`hammerterrain1939` is differentiated with respect to height to get the
derivative of vertical gravity with respect to height of an annulus, and then this
is multiplied by the ratio of the area of the prism to the area of the full annulus
from :footcite:p:`mccubbineairborne2016`to the approximation for a prism.
Parameters
----------
grav_easting, grav_northing, grav_upward : numpy.ndarray
coordinates of gravity observation points.
prism_easting, prism_northing, prism_top : numpy.ndarray
coordinates of prism's center in northing, easting, and upward directions,
respectively
prism_spacing : float
resolution of prism layer in meters
prism_density : numpy.ndarray
density of prisms, in kg/m^3
Returns
-------
numpy.ndarray
array of derivative of vertical gravitational acceleration with respect to prism
height at observation point for series of prisms
References
----------
.. footbibliography::
"""
# distance from gravity observation to prism center in horizontal plane
r = np.sqrt(
np.square(grav_northing - prism_northing)
+ np.square(grav_easting - prism_easting)
)
# get inner (r1) and outer (r2) radius of annulus
# McCubbine 2016 Thesis definitions
# this results in larger prisms, not sure why he chose this
# r1 = r - np.sqrt(np.square(prism_spacing)/2)
# r2 = r + np.sqrt(np.square(prism_spacing)/2)
# instead, we just add/subtract half the prism size to get r1 and r2
r1 = r - 0.5 * prism_spacing # eq. 2.17 in McCubbine 2016 Thesis
r2 = r + 0.5 * prism_spacing # eq. 2.18 in McCubbine 2016 Thesis
# gravity observation point can't be within prism
# if it is, shift gravity point to be on prism edge
r1[r1 < 0] = 0
# shifting gravity point decreases prism size and thus gravity effect, so shift r2
# to maintain prism size
r2[r2 < prism_spacing] = prism_spacing
# ratio of area of prism to area of full annulus; eq 2.19 in McCubbine 2016 Thesis
f = np.square(prism_spacing) / (np.pi * (np.square(r2) - np.square(r1)))
# get height from prism top to gravity observation point
height = prism_top - grav_upward
# equation for the vertical gravity effect of the full annulus
# from eq. 2.13 in McCubbine 2016 Thesis or eq. 2 in Hammer (1939)
# g_annulus = (
# 2 * np.pi * 6.6743e-11
# * prism_density
# * (
# r2 - r1
# + np.sqrt(r1**2 + (height**2))
# - np.sqrt(r2**2 + (height**2))
# )
# )
# take the derivative w.r.t height to get dg/dh of the full annulus
dg_dh_annulus = (
2
* np.pi
* 6.6743e-11
* prism_density
* height
* (
1 / np.sqrt(np.square(r2) + np.square(height))
- 1 / np.sqrt(np.square(r1) + np.square(height))
)
)
# convert from m/s^2 to mGal
dg_dh_annulus *= 1e5
# multiply by f to get approximate dg/dh of prism (sector of annulus)
return dg_dh_annulus * f
@numba.njit(parallel=True)
def jacobian_geometry_annular(
grav_easting: NDArray,
grav_northing: NDArray,
grav_upward: NDArray,
prism_easting: NDArray,
prism_northing: NDArray,
prism_top: NDArray,
prism_density: NDArray,
prism_spacing: float,
jac: NDArray,
) -> NDArray:
"""
Function to calculate the Jacobian matrix using the annular cylinder
approximation. The resulting Jacobian is a matrix (numpy array) with a row per
gravity observation and a column per prism. This approximates the prisms as an
annulus :footcite:p:`mccubbineairborne2016`, and calculates it's derivative of
vertical gravity with respect to thickness of the prisms/tesseroids. Takes arrays
from `jacobian`, feeds them into `grav_column_der`, and returns the jacobian.
Parameters
----------
grav_easting, grav_northing, grav_upward : numpy.ndarray
coordinates of gravity observation points
prism_easting, prism_northing, prism_top : numpy.ndarray
coordinates of prism's center in northing, easting, and upward directions,
respectively
prism_density : numpy.ndarray
density of prisms, in kg/m^3
prism_spacing : float
resolution of prism layer in meters
jac : numpy.ndarray
empty jacobian matrix with a row per gravity observation and a column per prism
Returns
-------
numpy.ndarray
returns a jacobian matrix of shape (number of gravity points, number of prisms)
References
----------
.. footbibliography::
"""
for i in numba.prange(len(grav_easting)): # pylint: disable=not-an-iterable
jac[i, :] = grav_column_der(
grav_easting[i],
grav_northing[i],
grav_upward[i],
prism_easting,
prism_northing,
prism_top,
prism_spacing,
prism_density,
)
return jac
def _prism_properties(
prisms_layer: xr.Dataset, # noqa: ARG001
method: str = "itertools", # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `_model_properties` function instead
"""
# pylint: disable=W0613
msg = (
"Function `_prism_properties` deprecated, use the `_model_properties` function "
"instead"
)
raise DeprecationWarning(msg)
def _model_properties(
layer: xr.Dataset,
method: str = "itertools",
) -> NDArray:
"""
extract properties from prism or tesseroid layer
Parameters
----------
layer : xarray.Dataset
harmonica prism or tesseroid layer
method : str, optional
choice of method to extract properties, by default "itertools"
Returns
-------
numpy.ndarray
array of layer properties
"""
coord_names = layer.coord_names
if method == "itertools":
layer_properties = []
for (
y,
x,
) in itertools.product(
range(layer[coord_names[1]].size), range(layer[coord_names[0]].size)
):
if layer.model_type == "prisms":
layer_properties.append(
[
*list(layer.prism_layer.get_prism((y, x))),
layer.density.to_numpy()[y, x],
]
)
elif layer.model_type == "tesseroids":
layer_properties.append(
[
*list(layer.tesseroid_layer.get_tesseroid((y, x))),
layer.density.to_numpy()[y, x],
]
)
layer_properties = np.array(layer_properties)
elif method == "forloops":
layer_properties = []
for y in range(layer[coord_names[1]].size):
for x in range(layer[coord_names[0]].size):
if layer.model_type == "prisms":
layer_properties.append(
[
*list(layer.prism_layer.get_prism((y, x))),
layer.density.to_numpy()[y, x],
]
)
elif layer.model_type == "tesseroids":
layer_properties.append(
[
*list(layer.tesseroid_layer.get_tesseroid((y, x))),
layer.density.to_numpy()[y, x],
]
)
np.asarray(layer_properties)
elif method == "generator":
# slower, but doesn't allocate memory
if layer.model_type == "prisms":
layer_properties = [
list(layer.prism_layer.get_prism((y, x))) # noqa: RUF005
+ [layer.density.to_numpy()[y, x]]
for y in range(layer[coord_names[1]].size)
for x in range(layer[coord_names[0]].size)
]
elif layer.model_type == "tesseroids":
layer_properties = [
list(layer.tesseroid_layer.get_tesseroid((y, x))) # noqa: RUF005
+ [layer.density.to_numpy()[y, x]]
for y in range(layer[coord_names[1]].size)
for x in range(layer[coord_names[0]].size)
]
else:
msg = "method must be one of 'itertools', 'forloops', or 'generator'"
raise ValueError(msg)
return layer_properties
def jacobian_prism(
prisms_properties: NDArray, # noqa: ARG001
grav_easting: NDArray, # noqa: ARG001
grav_northing: NDArray, # noqa: ARG001
grav_upward: NDArray, # noqa: ARG001
delta: float, # noqa: ARG001
jac: NDArray, # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `jacobian_finite_difference_prisms` function instead
"""
# pylint: disable=W0613
msg = (
"Function `jacobian_prism` deprecated, use the `jacobian_geometry_finite_difference_prisms` function "
"instead"
)
raise DeprecationWarning(msg)
def jacobian_density_finite_difference_prisms(
model_properties: NDArray,
grav_easting: NDArray,
grav_northing: NDArray,
grav_upward: NDArray,
delta: float,
jac: NDArray,
) -> NDArray:
"""
Function to calculate the Jacobian matrix where each entry is the derivative of
vertical gravity with respect to the density of the prisms, calculated
using a numerical approximation with changing the density of the prisms of the
existing model.
Takes arrays from `jacobian` and calculates the jacobian.
Parameters
----------
model_properties : numpy.ndarray
array of prism properties of shape (number of prisms, 7) with the 7 entries for
each prism being: west, east, south, north, bottom, top, density
grav_easting, grav_northing,grav_upward : numpy.ndarray
coordinates of gravity observation points.
delta : float
small change in density of the prisms used to calculate vertical derivative
jac : numpy.ndarray
empty jacobian matrix with a row per gravity observation and a column per prism
Returns
-------
numpy.ndarray
returns a numpy.ndarray of shape (number of gravity points, number of prisms)
"""
# Finite difference approx for a first order derivative is:
# f'(x) = (f(x+Δx) - f(x))/Δx
# where x is the density of the prism
# we can change the density of the original prism (p1) by Δx giving a new prism (p2)
# f'(x) = (f(p2) - f(p1))/Δx
# the gravity effect of p2 can be split into the effect of p1 and the effect of
# prism with a density of Δx (p_Δx):
# f(p2) = f(p1) + f(p_Δx)
# simplifying: f'(x) = f(p_Δx)/Δx
# where p_Δx is a prism the same dimensions as p1 but with a small density value Δx
for i in numba.prange(len(model_properties)): # pylint: disable=not-an-iterable
element = model_properties[i]
jac[:, i] = (
hm.prism_gravity(
coordinates=(grav_easting, grav_northing, grav_upward),
prisms=element[0:6], # prism boundaries
density=delta, # new density is delta,
field="g_z",
parallel=True,
)
/ delta
)
return jac
def jacobian_density_finite_difference_tesseroids(
model_properties: NDArray,
grav_easting: NDArray,
grav_northing: NDArray,
grav_upward: NDArray,
delta: float,
jac: NDArray,
) -> NDArray:
"""
Function to calculate the Jacobian matrix where each entry is the derivative of
vertical gravity with respect to the density of the tesseroids, calculated
using a numerical approximation with changing the density of the tesseroids of the
existing model.
Takes arrays from `jacobian` and calculates the jacobian.
Parameters
----------
model_properties : numpy.ndarray
array of tesseroids properties of shape (number of tesseroids, 7) with the 7 entries for
each tesseroid being: west, east, south, north, bottom, top, density
grav_easting, grav_northing,grav_upward : numpy.ndarray
coordinates of gravity observation points.
delta : float
small change in density of the tesseroids used to calculate vertical derivative
jac : numpy.ndarray
empty jacobian matrix with a row per gravity observation and a column per tesseroid
Returns
-------
numpy.ndarray
returns a numpy.ndarray of shape (number of gravity points, number of tesseroids)
"""
# Finite difference approx for a first order derivative is:
# f'(x) = (f(x+Δx) - f(x))/Δx
# where x is the density of the tesseroids
# we can change the density of the original tesseroids (p1) by Δx giving a new tesseroid (p2)
# f'(x) = (f(p2) - f(p1))/Δx
# the gravity effect of p2 can be split into the effect of p1 and the effect of
# tesseroid with a density of Δx (p_Δx):
# f(p2) = f(p1) + f(p_Δx)
# simplifying: f'(x) = f(p_Δx)/Δx
# where p_Δx is a tesseroid the same dimensions as p1 but with a small density value Δx
for i in numba.prange(len(model_properties)): # pylint: disable=not-an-iterable
element = model_properties[i]
jac[:, i] = (
hm.tesseroid_gravity(
coordinates=(grav_easting, grav_northing, grav_upward),
tesseroids=element[0:6], # tesseroid boundaries
density=delta, # new density is delta,
field="g_z",
parallel=True,
)
/ delta
)
return jac
def jacobian_geometry_finite_difference_prisms(
model_properties: NDArray,
grav_easting: NDArray,
grav_northing: NDArray,
grav_upward: NDArray,
delta: float,
jac: NDArray,
) -> NDArray:
"""
Function to calculate the Jacobian matrix where each entry is the derivative of
vertical gravity with respect to thickness of the prisms, calculated
using a numerical approximation with small prisms added on top of the
existing model.
Takes arrays from `jacobian` and calculates the jacobian.
Parameters
----------
model_properties : numpy.ndarray
array of prism properties of shape (number of prisms, 7) with the 7 entries for
each prism being: west, east, south, north, bottom, top, density
grav_easting, grav_northing,grav_upward : numpy.ndarray
coordinates of gravity observation points.
delta : float
thickness in meters of small prisms used to calculate vertical derivative
jac : numpy.ndarray
empty jacobian matrix with a row per gravity observation and a column per prism
Returns
-------
numpy.ndarray
returns a numpy.ndarray of shape (number of gravity points, number of prisms)
"""
# Finite difference approx for a first order derivative is:
# f'(x) = (f(x+Δx) - f(x))/Δx
# where x is the thickness of the prism
# we can change the thickness of the original prism (p1) by adding a small prism
# (p2) of thickness Δx on top.
# f'(x) = ((f(p1)+f(p2)) - f(p1))/Δx
# simplifying: f'(x) = f(p2)/Δx
# where p2 is the small prism of thickness Δx
# Add a small model element on top of existing model element with thickness of delta
for i in numba.prange(len(model_properties)): # pylint: disable=not-an-iterable
element = model_properties[i]
density = element[6]
bottom = element[5] # new prism bottom is top of old prism
top = element[5] + delta # new prism top is old prism top + delta
delta_element = (element[0], element[1], element[2], element[3], bottom, top)
jac[:, i] = (
hm.prism_gravity(
coordinates=(grav_easting, grav_northing, grav_upward),
prisms=delta_element,
density=density,
field="g_z",
parallel=True,
)
/ delta
)
return jac
def jacobian_geometry_finite_difference_tesseroids(
model_properties: NDArray,
grav_longitude: NDArray,
grav_latitude: NDArray,
grav_upward: NDArray,
delta: float,
jac: NDArray,
) -> NDArray:
"""
Function to calculate the Jacobian matrix where each entry is the derivative of
vertical gravity with respect to thickness of the tesseroids, calculated
using a numerical approximation with small tesseroids added on top of the
existing model.
Takes arrays from `jacobian` and calculates the jacobian.
Parameters
----------
model_properties : numpy.ndarray
array of tesseroid properties of shape (number of tesseroids, 7) with the 7 entries for
each tesseroid being: west, east, south, north, bottom, top, density
grav_longitude, grav_latitude, grav_upward : numpy.ndarray
coordinates of gravity observation points.
delta : float
thickness in meters of small tesseroids used to calculate vertical derivative
jac : numpy.ndarray
empty jacobian matrix with a row per gravity observation and a column per
tesseroid
Returns
-------
numpy.ndarray
returns a numpy.ndarray of shape (number of gravity points, number of tesseroids)
"""
# Finite difference approx for a first order derivative is:
# f'(x) = (f(x+Δx) - f(x))/Δx
# where x is the thickness of the tesseroid
# we can change the thickness of the original tesseroid (t1) by adding a small
# tesseroid (t2) of thickness Δx on top.
# f'(x) = ((f(t1)+f(t2)) - f(t1))/Δx
# simplifying: f'(x) = f(t2)/Δx
# where t2 is the small tesseroid of thickness Δx
# Add a small model element on top of existing model element with thickness equal
# to delta
for i in numba.prange(len(model_properties)): # pylint: disable=not-an-iterable
element = model_properties[i]
density = element[6]
bottom = element[5]
top = element[5] + delta
delta_element = (element[0], element[1], element[2], element[3], bottom, top)
jac[:, i] = (
hm.tesseroid_gravity(
coordinates=(grav_longitude, grav_latitude, grav_upward),
tesseroids=delta_element,
density=density,
field="g_z",
parallel=True,
)
/ delta
)
return jac
def jacobian(
deriv_type: str, # noqa: ARG001
coordinates: pd.DataFrame, # noqa: ARG001
empty_jac: NDArray | None = None, # noqa: ARG001
prisms_layer: xr.Dataset | None = None, # noqa: ARG001
prism_spacing: float | None = None, # noqa: ARG001
prism_size: float | None = None, # noqa: ARG001
prisms_properties_method: str = "itertools", # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `jacobian` instead
"""
# pylint: disable=W0613
msg = (
"Function `jacobian` deprecated, use the `Inversion` class method "
"`jacobian_geometry` instead"
)
raise DeprecationWarning(msg)
def solver(
jac: NDArray, # noqa: ARG001
residuals: NDArray, # noqa: ARG001
damping: float | None = None, # noqa: ARG001
solver_type: str = "scipy least squares", # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `solver` instead
"""
# pylint: disable=W0613
msg = (
"Function `solver` deprecated, use the `Inversion` class method `solver` "
"instead"
)
raise DeprecationWarning(msg)
def update_l2_norms(
current_rmse: float, # noqa: ARG001
last_l2_norm: float, # noqa: ARG001
) -> None:
"""
DEPRECATED: function deprecated
"""
# pylint: disable=W0613
msg = "Function `update_l2_norms` has been deprecated"
raise DeprecationWarning(msg)
def end_inversion(
iteration_number: int, # noqa: ARG001
max_iterations: int, # noqa: ARG001
l2_norms: list[float], # noqa: ARG001
l2_norm_tolerance: float, # noqa: ARG001
delta_l2_norm: float, # noqa: ARG001
previous_delta_l2_norm: float, # noqa: ARG001
delta_l2_norm_tolerance: float, # noqa: ARG001
perc_increase_limit: float, # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `end_inversion` instead
"""
# pylint: disable=W0613
msg = (
"Function `end_inversion` deprecated, use the `Inversion` class method "
"`end_inversion` instead"
)
raise DeprecationWarning(msg)
def update_gravity_and_misfit(
gravity_df: pd.DataFrame, # noqa: ARG001
prisms_ds: xr.Dataset, # noqa: ARG001
iteration_number: int, # noqa: ARG001
) -> None:
"""
DEPRECATED: function deprecated
"""
# pylint: disable=W0613
msg = "Function `update_gravity_and_misfit` has been deprecated"
raise DeprecationWarning(msg)
def run_inversion(
grav_df: pd.DataFrame, # noqa: ARG001
prism_layer: xr.Dataset, # noqa: ARG001
max_iterations: int, # noqa: ARG001
l2_norm_tolerance: float = 0.2, # noqa: ARG001
delta_l2_norm_tolerance: float = 1.001, # noqa: ARG001
perc_increase_limit: float = 0.20, # noqa: ARG001
deriv_type: str = "annulus", # noqa: ARG001
jacobian_prism_size: float = 1, # noqa: ARG001
solver_type: str = "scipy least squares", # noqa: ARG001
solver_damping: float | None = None, # noqa: ARG001
upper_confining_layer: xr.DataArray | None = None, # noqa: ARG001
lower_confining_layer: xr.DataArray | None = None, # noqa: ARG001
apply_weighting_grid: bool = False, # noqa: ARG001
weighting_grid: xr.DataArray | None = None, # noqa: ARG001
plot_convergence: bool = False, # noqa: ARG001
plot_dynamic_convergence: bool = False, # noqa: ARG001
results_fname: str | None = None, # noqa: ARG001
progressbar: bool = True, # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `run_inversion` instead
"""
# pylint: disable=W0613
msg = (
"Function `run_inversion` deprecated, use the `Inversion` class method "
"`invert` instead"
)
raise DeprecationWarning(msg)
[docs]
@xr.register_dataset_accessor("inv")
class DatasetAccessorInvert4Geom:
"""
A class which allows adding properties and methods as xarray dataset accessors.
"""
[docs]
def __init__(
self,
ds: xr.Dataset,
) -> None:
self._ds = ds
@property
def df(self) -> pd.DataFrame:
"""return the dataframe representation of the xarray dataset without nans"""
if self._ds.dataset_type == "data":
return (
self._ds.to_dataframe()
.reset_index()
.dropna(how="any", axis=0)
.reset_index(drop=True)
)
if self._ds.dataset_type == "model":
return self._ds.to_dataframe().reset_index()
msg = "dataset must have attribute 'dataset_type' which is either 'data' or 'model'"
raise ValueError(msg)
@property
def inner_df(self) -> pd.DataFrame:
"""
return the dataframe representation of the inner region of the xarray dataset
without nans
"""
if self._ds.dataset_type == "data":
return (
self.inner.to_dataframe()
.reset_index()
.dropna(how="any", axis=0)
.reset_index(drop=True)
)
if self._ds.dataset_type == "model":
return self.inner.to_dataframe().reset_index()
msg = "dataset must have attribute 'dataset_type' which is either 'data' or 'model'"
raise ValueError(msg)
@property
def inner(self) -> xr.Dataset:
"""return only the inside region of the xarray dataset"""
self._check_dataset_initialized()
if self._ds.model_type == "tesseroids":
return self._ds.sel(
longitude=slice(self._ds.inner_region[0], self._ds.inner_region[1]),
latitude=slice(self._ds.inner_region[2], self._ds.inner_region[3]),
)
if self._ds.model_type == "prisms":
return self._ds.sel(
easting=slice(self._ds.inner_region[0], self._ds.inner_region[1]),
northing=slice(self._ds.inner_region[2], self._ds.inner_region[3]),
)
msg = "dataset must have attribute 'model_type' which is either 'prisms' or 'tesseroids'"
raise ValueError(msg)
@property
def masked_df(self) -> xr.Dataset:
"""
return the dataframe representation of the masked xarray dataset without nans
"""
self._check_correct_dataset_type("model")
return (
self.masked.to_dataframe()
.reset_index()
.dropna(subset=["mask"], axis=0)
.reset_index(drop=True)
)
@property
def masked(self) -> xr.Dataset:
"""return only the model elements with a non-nan mask value"""
self._check_correct_dataset_type("model")
return self._ds.where(np.isfinite(self._ds.mask), drop=True)
###
###
# Methods for the gravity data dataset
###
###
[docs]
def forward_gravity(
self,
layer: xr.Dataset,
name: str = "forward_gravity",
field: str = "g_z",
progressbar: bool = False,
**kwargs: typing.Any,
) -> None:
"""
Calculate the forward gravity of the model at each point of the gravity grid.
Add the calculated gravity effect as a new dataset variable.
Parameters
----------
layer : xarray.Dataset
a prism or tesseroid layer
name : str, optional
Name to assigned the variable for the calculated gravity, by default
"forward_gravity"
field : str, optional
Choose which gravitational field to be calculated, by default "g_z" which is
the downward acceleration.
progressbar : bool, optional
Display a progress bar of the calculation, by default False
kwargs : typing.Any
Additional keyword arguments to pass to
:meth:`harmonica.DatasetAccessorPrismLayer.gravity` or
:meth:`harmonica.DatasetAccessorTesseroidLayer.gravity` depending on the model type.
"""
self._check_correct_dataset_type("data")
df = self.df
coord_names = layer.coord_names
if layer.model_type == "prisms":
df[name] = layer.prism_layer.gravity(
coordinates=(df[coord_names[0]], df[coord_names[1]], df.upward),
field=field,
progressbar=progressbar,
**kwargs,
)
elif layer.model_type == "tesseroids":
df[name] = layer.tesseroid_layer.gravity(
coordinates=(
df[coord_names[0]],
df[coord_names[1]],
df.upward + df.geocentric_radius,
),
field=field,
progressbar=progressbar,
**kwargs,
)
else:
msg = "layer must have attribute 'model_type' which is either 'prisms' or 'tesseroids'"
raise ValueError(msg)
ds = df.set_index([coord_names[1], coord_names[0]]).to_xarray()
self._ds[name] = ds[name]
[docs]
def regional_separation(self, method: str, **kwargs: typing.Any) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Choose 1 of 6 methods for estimating the regional field:
``constant``, ``filter``, ``trend``, ``eq_sources``, ``constraints``,
``constraints_cv``
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
method : str
choose method to apply; one of ``constant``, ``filter``, ``trend``,
``eq_sources``, ``constraints`` or ``constraints_cv``
kwargs : typing.Any
Additional keyword arguments to pass to the various regional separation
methods:
:meth:`DatasetAccessorInvert4Geom.regional_constant`
:meth:`DatasetAccessorInvert4Geom.regional_filter`
:meth:`DatasetAccessorInvert4Geom.regional_trend`
:meth:`DatasetAccessorInvert4Geom.regional_eq_sources`
:meth:`DatasetAccessorInvert4Geom.regional_constraints`
:meth:`DatasetAccessorInvert4Geom.regional_constraints_cv`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
if method == "constant":
self._ds.inv.regional_constant(**kwargs)
elif method == "filter":
self._ds.inv.regional_filter(**kwargs)
elif method == "trend":
self._ds.inv.regional_trend(**kwargs)
elif method == "eq_sources":
self._ds.inv.regional_eq_sources(**kwargs)
elif method == "constraints":
self._ds.inv.regional_constraints(**kwargs)
elif method == "constraints_cv":
self._ds.inv.regional_constraints_cv(**kwargs)
else:
msg = "invalid string for regional method"
raise ValueError(msg)
[docs]
def regional_constant(
self,
constant: float | None = None,
constraints_df: pd.DataFrame | None = None,
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Approximate the regional field with a constant value supplied with
``constant``. If ``constraints_df`` is supplied, the constant value will instead
be the median misfit value at the constraint points.
The resulting regional field can be shifted with ``regional_shift``, and the
calculated residual field can be multiplied by the values in ``mask_column``.
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
constant : float
value to use for the regional field.
constraints_df : pandas.DataFrame
a dataframe of constraint points with columns ``easting`` and ``northing``
(or ``longitude`` and ``latitude``).
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_constant(
grav_ds=self._ds,
constant=constant,
constraints_df=constraints_df,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
)
self._save_starting_anomalies()
[docs]
def regional_filter(
self,
filter_width: float,
filter_type: str = "lowpass",
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Approximate the regional field by filtering the gravity misfit with a low-pass
gaussian filter with a supplied filter width,
using :func:`harmonica.gaussian_lowpass`. The grid will automatically be
padded to reduce edge effects.
The resulting regional field can be shifted with ``regional_shift``, and the
calculated residual field can be multiplied by the values in ``mask_column``.
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
filter_width : float
width in meters to use for the low-pass filter
filter_type : str, optional
type of filter to apply, by default "lowpass"
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_filter(
grav_ds=self._ds,
filter_width=filter_width,
filter_type=filter_type,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
)
self._save_starting_anomalies()
[docs]
def regional_trend(
self,
trend: int,
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Approximate the regional field by fitting a polynomial trend to the gravity
misfit using :class:`verde.Trend`.
The resulting regional field can be shifted with ``regional_shift``, and the
calculated residual field can be multiplied by the values in ``mask_column``.
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
trend : int
order of the polynomial trend to fit to the data
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_trend(
grav_ds=self._ds,
trend=trend,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
)
self._save_starting_anomalies()
[docs]
def regional_eq_sources(
self,
depth: float | str = "default",
damping: float | None = None,
block_size: float | None = None,
grav_obs_height: float | None = None,
cv: bool = False,
weights_column: str | None = None,
cv_kwargs: dict[str, typing.Any] | None = None,
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Approximate the regional field by fitting deep equivalent sources to the the
gravity misfit, using :class:`harmonica.EquivalentSources`. During fitting of
the equivalent sources, the source depth can be chosen with ``depth``, the
results can be smoothed with ``damping``, the gravity points can block-reduced
with ``block_size``, and to simulate upward continuation, the gravity
observation height can be set with ``grav_obs_height``. Instead of specifying
the equivalent source parameters ``depth`` and ``damping``, optimal values can
be chosen through a cross-validated optimization routine by setting ``cv`` to
True, and providing ``cv_kwargs`` which is passed to the function
:func:`optimize_eq_source_params`.
The resulting regional field can be shifted with ``regional_shift``, and the
calculated residual field can be multiplied by the values in ``mask_column``.
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
depth : float
depth of each source relative to the data elevation
damping : float | None, optional
smoothness to impose on estimated coefficients, by default None
block_size : float | None, optional
block reduce the data to speed up, by default None
grav_obs_height: float, optional
Observation height to use predicting the eq sources, by default None and will
use the data height from grav_ds.
cv : bool, optional
use cross-validation to find the best equivalent source parameters, by default
False, provide dictionary ``cv_kwargs`` which is passed to
:func:`optimize_eq_source_params` and can contain:
``n_trials``, ``damping_limits``, ``depth_limits``,
``block_size_limits``, ``sampler``, ``plot``, ``progressbar``,
``parallel``, ``dtype``, or ``delayed``.
weights_column: str | None, optional
column name for weighting values of each gravity point.
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
:func:`optimize_eq_source_params`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_eq_sources(
grav_ds=self._ds,
depth=depth,
damping=damping,
block_size=block_size,
grav_obs_height=grav_obs_height,
cv=cv,
weights_column=weights_column,
cv_kwargs=cv_kwargs,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
)
self._save_starting_anomalies()
[docs]
def regional_constraints(
self,
constraints_df: pd.DataFrame,
grid_method: str = "eq_sources",
constraints_block_size: float | None = None,
constraints_weights_column: str | None = None,
tension_factor: float = 1,
spline_dampings: float | list[float] | None = None,
depth: float | str | None = None,
damping: float | None = None,
cv: bool = False,
block_size: float | None = None,
grav_obs_height: float | None = None,
cv_kwargs: dict[str, typing.Any] | None = None,
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
) -> None:
"""
Calculate the gravity misfit as the difference between dataset variables
``gravity_anomaly`` and ``forward_gravity``. Then separate the misfit into
regional and residual components where ``residual = misfit - regional``.
Approximate the regional field by sampling the gravity misfit at constraint
points (points of known elevation of the layer of interest) and interpolating
those sampled ``values``. The interpolation can be accomplished with the
following methods:
Equivalent sources (grid_method="eq_sources")
- uses :class:`harmonica.EquivalentSources`
- ``depth`` and ``damping`` can be set, or optimal values found through a cross-validation procedure using :func:`optimize_eq_source_params`
Tensioned splines (grid_method="pygmt")
- uses :func:`pygmt.surface`
- amount of tension (0-1) can be set with ``tension_factor``
Bi-harmonic splines (grid_method="verde")
- uses :class:`verde.Spline`
- amount of damping can be set with ``spline_dampings``, or a list of damping values can be provided to find the optimal damping through cross-validation using :func:`optimal_spline_damping`
If there are many constraints, they can be block reduced with
``constraints_block_size`` and optional ``constraints_weights_column`` can be
used to weight the constraints during block reduction.
The resulting regional field can be shifted with ``regional_shift``, and the
calculated residual field can be multiplied by the values in ``mask_column``.
The following new variables are added to the dataset:
``misfit``, ``reg``, ``res``, ``starting_forward_gravity``,
``starting_misfit``, ``starting_reg``, ``starting_res``
Parameters
----------
constraints_df : pandas.DataFrame
dataframe of constraints with columns ``easting``, ``northing`` (or
``longitude``, ``latitude``), and ``upward``.
grid_method : str, optional
method used to grid the sampled gravity data at the constraint points. Choose
between ``verde``, ``pygmt``, or ``eq_sources``, by default ``eq_sources``
constraints_block_size : float | None, optional
size of block used in a block-mean reduction of the constraints points, by
default None
constraints_weights_column : str | None, optional
column name for weighting values of each constraint point. Used if
``constraint_block_size`` is not None or if ``grid_method`` is ``verde`` or
``eq_sources``, by default None
tension_factor : float, optional
Tension factor used if ``grid_method`` is ``pygmt``, by default 1
spline_dampings : float | list[float] | None, optional
damping values used if ``grid_method`` is ``verde``, by default None
depth : float | str | None, optional
depth of each source relative to the data elevation, positive downwards in
meters, by default None
damping : float | None, optional
damping values used if ``grid_method`` is ``eq_sources``, by default None
cv : bool, optional
use cross-validation to find the best equivalent source parameters, by
default False, provide dictionary ``cv_kwargs`` which is passed to
``optimization.optimize_eq_source_params`` and can contain:
``n_trials``, ``damping_limits``, ``depth_limits``,
``block_size_limits``, and ``progressbar``.
block_size : float | None, optional
block size used if ``grid_method`` is ``eq_sources``, by default None
grav_obs_height : float, optional
Observation height to use if ``grid_method`` is ``eq_sources``, by default None
cv_kwargs : dict[str, typing.Any] | None, optional
additional keyword arguments to be passed to
:func:`optimize_eq_source_params`, by default None. Can contain:
``n_trials``, ``damping_limits``, ``depth_limits``,
``block_size_limits``, ``sampler``, ``plot``, ``progressbar``,
``parallel``, ``fname``, ``dtype``, or ``delayed``.
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_constraints(
grav_ds=self._ds,
constraints_df=constraints_df,
grid_method=grid_method,
constraints_block_size=constraints_block_size,
constraints_weights_column=constraints_weights_column,
tension_factor=tension_factor,
spline_dampings=spline_dampings,
depth=depth,
damping=damping,
cv=cv,
block_size=block_size,
grav_obs_height=grav_obs_height,
cv_kwargs=cv_kwargs,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
)
self._save_starting_anomalies()
[docs]
def regional_constraints_cv(
self,
constraints_df: pd.DataFrame,
split_kwargs: dict[str, typing.Any] | None = None,
regional_shift: float = 0,
mask_column: str | None = None,
reverse_regional_residual: bool = False,
**kwargs: typing.Any,
) -> None:
"""
This is a convenience function to wrap
:func:`optimize_regional_constraint_point_minimization`. It takes a
full constraints dataframe and dictionary ``split_kwargs``, to split the
constraints into testing and training sets (with K-folds), uses these folds in
a K-Folds hyperparameter optimization to find the set of parameter values
(tension factor, spline damping, or equivalent source depth and damping) which
estimates the best regional field. It then uses the optimal parameter values and
all of the constraint points to re-calculate the best regional field. All kwargs
are passed to the function
:func:`optimize_regional_constraint_point_minimization`.
Parameters
----------
constraints_df : pandas.DataFrame
dataframe of constraints with columns ``easting``, ``northing`` (or
``longitude``, ``latitude``), and ``upward``.
split_kwargs : dict[str, typing.Any] | None, optional
kwargs to be passed to :func:`split_test_train`, by default None
regional_shift : float, optional
shift to add to the regional field, by default 0
mask_column : str | None, optional
Name of optional dataset variable with values to multiply the calculated
residual gravity field by, should have values of 1 or 0, by default None.
reverse_regional_residual : bool, optional
if True, reverse the regional and residual fields after calculation, by
default False
**kwargs : typing.Any
kwargs to be passed to :func:`optimize_regional_constraint_point_minimization`
See also
--------
:meth:`DatasetAccessorInvert4Geom.regional_separation`
:meth:`DatasetAccessorInvert4Geom.regional_constraints`
"""
self._check_correct_dataset_type("data")
self._check_grav_vars_for_regional()
regional.regional_constraints_cv(
grav_ds=self._ds,
constraints_df=constraints_df,
split_kwargs=split_kwargs,
regional_shift=regional_shift,
mask_column=mask_column,
reverse_regional_residual=reverse_regional_residual,
**kwargs,
)
self._save_starting_anomalies()
[docs]
def plot_observed(
self,
coast: bool = False,
) -> None:
"""plot observed gravity"""
self._check_correct_dataset_type("data")
epsg, coast = utils.get_epsg(coast=coast)
fig = ptk.plot_grid(
self._ds.gravity_anomaly,
title="Observed gravity",
cbar_label="mGal",
cmap="balance+h0",
robust=True,
absolute=True,
hist=True,
coast=coast,
epsg=epsg,
)
if self._ds.buffer_width is not None:
fig.plot(
x=[
self._ds.inner_region[0],
self._ds.inner_region[1],
self._ds.inner_region[1],
self._ds.inner_region[0],
self._ds.inner_region[0],
],
y=[
self._ds.inner_region[2],
self._ds.inner_region[2],
self._ds.inner_region[3],
self._ds.inner_region[3],
self._ds.inner_region[2],
],
pen="2p,black",
label="Inner region",
region=fig.reg,
projection=fig.proj,
)
fig.legend()
fig.show()
[docs]
def plot_anomalies(
self,
points: pd.DataFrame | None = None,
points_style: str | None = None,
coast: bool = False,
) -> None:
"""plot gravity anomalies"""
self._check_correct_dataset_type("data")
epsg, coast = utils.get_epsg(coast=coast)
ds = self.inner
grids = [
ds.gravity_anomaly,
ds.forward_gravity,
ds.misfit,
ds.reg,
ds.res,
]
titles = [
"Input anomaly",
"Forward gravity",
"Misfit",
"Regional misfit",
"Residual misfit",
]
min_max = ptk.get_combined_min_max(
grids[0:2],
absolute=True,
robust=True,
)
cpt_limits = [
min_max,
min_max,
None,
None,
None,
]
with utils._log_level(logging.CRITICAL, logging.getLogger("polartoolkit")): # noqa: SIM117 # pylint: disable=protected-access
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="Since limits were passed to `cpt_lims`"
)
fig = ptk.subplots(
grids,
dims=(1, 5),
region=self._ds.inner_region,
fig_title="Gravity anomalies",
titles=titles,
cbar_label="mGal",
cmap="balance+h0",
cpt_limits=cpt_limits,
absolute=True,
robust=True,
hist=True,
points=points,
points_style=points_style,
coast=coast,
epsg=epsg,
)
fig.show()
[docs]
def plot_regional_separation(
self,
points: pd.DataFrame | None = None,
points_style: str | None = None,
coast: bool = False,
) -> None:
"""plot gravity misfit and estimate regional and residual components"""
self._check_correct_dataset_type("data")
epsg, coast = utils.get_epsg(coast=coast)
ds = self.inner
grids = [
ds.misfit,
ds.reg,
ds.res,
]
titles = [
"Misfit",
"Regional misfit",
"Residual misfit",
]
cmaps = [
"balance+h0",
"balance+h0",
"balance+h0",
]
fig = ptk.subplots(
grids,
dims=(1, 3),
region=self._ds.inner_region,
fig_title="Gravity misfit grids",
titles=titles,
cbar_label="mGal",
cmaps=cmaps,
robust=True,
absolute=True,
hist=True,
points=points,
points_style=points_style,
coast=coast,
epsg=epsg,
)
fig.show()
###
###
# Methods for the model dataset
###
###
def add_density_correction(self, step: NDArray) -> xr.Dataset:
"""
update the model dataset with the density corrections.
"""
self._check_correct_dataset_type("model")
coord_names = self._ds.coord_names
# get dataframe of model layer (optionally masked)
df = self.masked_df
# add column of density correction values
df = pd.concat(
[
df.drop(columns=["density_correction"], errors="ignore"),
pd.DataFrame({"density_correction": step}),
],
axis=1,
)
# df is only for masked prisms, so fill in 0 for unmasked model elements
df_full = self.df.drop(columns=["density_correction"], errors="ignore")
df_full = df_full.merge(
df[[coord_names[1], coord_names[0], "density_correction"]],
how="left",
on=[coord_names[1], coord_names[0]],
)
df_full["density_correction"] = df_full["density_correction"].fillna(0)
# add the correction values to the model layer dataset
ds = df_full.set_index([coord_names[1], coord_names[0]]).to_xarray()
ds.attrs.update(self._ds.attrs)
return ds
def add_topography_correction(self, step: NDArray) -> xr.Dataset:
"""
update the model dataset with the surface corrections. Ensure
that the updated surface doesn't intersect the optional confining surfaces.
"""
self._check_correct_dataset_type("model")
# get dataframe of model layer (optionally masked)
df = self.masked_df
# add column of topography correction values
df = pd.concat(
[
df.drop(columns=["topography_correction"], errors="ignore"),
pd.DataFrame({"topography_correction": step}),
],
axis=1,
)
# for negative densities, negate the correction
df.loc[df.density < 0, "topography_correction"] *= -1
# optionally constrain the surface correction with bounding surfaces
# alter the surface correction values to ensure when added to the current iteration's
# topography it doesn't intersect optional confining layers.
if df.upper_confining_layer.notna().any():
# get max upward change allowed for each prism
# positive values indicate max allowed upward change
# negative values indicate topography is already too far above upper bound
df["max_change_above"] = df.upper_confining_layer - df.topography
number_enforced = 0
for i, j in enumerate(df.topography_correction):
if j > df.max_change_above[i]:
number_enforced += 1
df.loc[i, "topography_correction"] = df.max_change_above[i]
logger.info(
"enforced upper confining surface at %s prisms", number_enforced
)
if df.lower_confining_layer.notna().any():
# get max downward change allowed for each prism
# negative values indicate max allowed downward change
# positive values indicate topography is already too far below lower bound
df["max_change_below"] = df.lower_confining_layer - df.topography
number_enforced = 0
for i, j in enumerate(df.topography_correction):
if j < df.max_change_below[i]:
number_enforced += 1
df.loc[i, "topography_correction"] = df.max_change_below[i]
logger.info(
"enforced lower confining surface at %s prisms", number_enforced
)
# check that when constrained correction is added to topography it doesn't intersect
# either bounding layer
df["updated_topo"] = df.topography_correction + df.topography
df["diff_upper_confining_layer_and_new_topo"] = (
df.upper_confining_layer - df.updated_topo
)
df["diff_lower_confining_layer_and_new_topo"] = (
df.updated_topo - df.lower_confining_layer
)
if np.any(df.diff_upper_confining_layer_and_new_topo < -0.01):
msg = (
"Constraining didn't work and updated topography intersects upper "
"constraining surface"
)
raise ValueError(msg)
if np.any(df.diff_lower_confining_layer_and_new_topo < -0.01):
msg = (
"Constraining didn't work and updated topography intersects lower "
"constraining surface"
)
raise ValueError(msg)
# df is only for masked model elements, so fill in 0 for unmasked model elements
df_full = self.df.drop(columns=["topography_correction"], errors="ignore")
coord_names = self._ds.coord_names
df_full = df_full.merge(
df[[coord_names[1], coord_names[0], "topography_correction"]],
how="left",
on=[coord_names[1], coord_names[0]],
)
df_full["topography_correction"] = df_full["topography_correction"].fillna(0)
# add the correction values to the model layer dataset
ds = df_full.set_index([coord_names[1], coord_names[0]]).to_xarray()
ds.attrs.update(self._ds.attrs)
return ds
def update_model_ds(
self,
style: str,
) -> xr.Dataset:
"""
apply the corrections (density or topography) and update the model tops,
bottoms, topo, and densities.
Parameters
----------
style : str
choose which correction to apply; either "density" or "geometry"
Returns
-------
xr.Dataset
updated model dataset
"""
self._check_correct_dataset_type("model")
ds = self._ds
# update the topography and model
if style == "geometry":
ds["topography"] = ds.topography + ds.topography_correction
if self._ds.model_type == "prisms":
zref = ds.zref
ds.prism_layer.update_top_bottom(surface=ds.topography, reference=zref)
elif self._ds.model_type == "tesseroids":
zref = ds.geocentric_radius + ds.zref
topography = ds.topography + ds.geocentric_radius
ds.tesseroid_layer.update_top_bottom(surface=topography, reference=zref)
# update the density variable
ds["density"] = xr.where(
ds.top > zref,
ds.density_contrast,
-ds.density_contrast,
)
elif style == "density":
ds["density"] = ds.density + ds.density_correction
# add new density_contrast variable
ds["density_contrast"] = ds.density.where(ds.top > ds.zref, -ds.density)
else:
msg = "style must be either 'density' or 'geometry'"
raise ValueError(msg)
ds.attrs.update(self._ds.attrs)
return ds
[docs]
def plot_model(
self,
**kwargs: typing.Any,
) -> None:
"""
Use :mod:`pyvista` to plot the prism model. All ``kwargs`` are passed to
:func:`plotting.plot_prism_layers`.
"""
if (hasattr(self._ds, "model_type")) and (self._ds.model_type != "prisms"):
msg = "Plotting tesseroid models with PyVista is not support yet"
raise NotImplementedError(msg)
plotting.plot_prism_layers(self._ds, **kwargs)
###
###
# Private methods
###
###
def _save_starting_anomalies(self) -> None:
"""
save starting anomalies to be able to reset inversion later
"""
self._ds["starting_forward_gravity"] = self._ds.forward_gravity.copy()
self._ds["starting_misfit"] = self._ds.misfit.copy()
self._ds["starting_reg"] = self._ds.reg.copy()
self._ds["starting_res"] = self._ds.res.copy()
# self._ds.attrs.update(ds.attrs)
def _check_dataset_initialized(self) -> None:
assert hasattr(self._ds, "dataset_type"), (
"dataset must be passed through `create_data` or `create_model` function."
)
def _check_correct_dataset_type(self, dataset_type: str) -> None:
self._check_dataset_initialized()
if self._ds.dataset_type != dataset_type:
msg = f"Method is only available for the {dataset_type} dataset."
raise ValueError(msg)
def _check_grav_vars_for_regional(self) -> None:
"""
ensure the gravity dataset has the required variables for performing regional
estimation.
"""
self._check_correct_dataset_type("data")
if self._ds.model_type == "tesseroids":
variables = [
"longitude",
"latitude",
"geocentric_radius",
"upward",
"gravity_anomaly",
"forward_gravity",
]
else:
variables = [
"easting",
"northing",
"upward",
"gravity_anomaly",
"forward_gravity",
]
assert all(i in self._ds for i in variables), (
f"`gravity dataset` needs all the following variables: {variables}"
)
def _check_grav_vars(self) -> None:
"""
ensure the gravity dataset has the required variables for performing the
inversion.
"""
self._check_correct_dataset_type("data")
if self._ds.model_type == "tesseroids":
variables = [
"longitude",
"latitude",
"geocentric_radius",
"upward",
"gravity_anomaly",
"forward_gravity",
"misfit",
"reg",
"res",
]
else:
variables = [
"easting",
"northing",
"upward",
"gravity_anomaly",
"forward_gravity",
"misfit",
"reg",
"res",
]
assert all(i in self._ds for i in variables), (
f"`gravity dataset` needs all the following variables: {variables}"
)
def _check_gravity_inside_topography_region(
self,
topography: xr.DataArray,
) -> None:
"""check that all gravity data is inside the region of the topography grid"""
self._check_correct_dataset_type("data")
coord_names = self._ds.coord_names
topo_region = vd.get_region(
(
topography[coord_names[0]].to_numpy(),
topography[coord_names[1]].to_numpy(),
)
)
df = self.df
inside = vd.inside((df[coord_names[0]], df[coord_names[1]]), region=topo_region)
if not inside.all():
msg = (
"Some gravity data are outside the region of the topography grid. "
"This may result in unexpected behavior."
)
raise ValueError(msg)
def _update_gravity_and_residual(
self,
model: xr.Dataset,
) -> None:
"""
calculate the forward gravity of the supplied prism layer, add the results to a
new dataframe column, and update the residual misfit. The supplied gravity
dataframe needs a 'reg' column, which describes the regional component and can
be 0.
"""
self._check_correct_dataset_type("data")
# update the forward gravity
self.forward_gravity(model)
# each iteration updates the topography of the layer to minimize the residual
# portion of the misfit. We then want to recalculate the forward gravity of the
# new layer, use the same original regional misfit, and re-calculate the
# residual.
# Gmisfit = Gobs - Gforward
# Gres = Gmisfit - Greg
# Gres = Gobs - Gforward - Greg
# update the residual misfit with the new forward gravity and the same regional
self._ds["res"] = (
self._ds.gravity_anomaly - self._ds.forward_gravity - self._ds.reg
)
[docs]
def create_data(
gravity: xr.Dataset,
buffer_width: float | None = None,
model_type: str = "prisms",
) -> xr.Dataset:
"""
Convert a dataset of gravity data into the format needed for the inversion. This
includes adding various attributes to the dataset, and defining an inner region
which will be used for plotting and statistics to avoid edge effects.
Parameters
----------
gravity : xarray.Dataset
A dataset with coordinates ``easting`` and ``northing`` (if using prisms) or
``longitude`` and ``latitude`` (if using tesseroids), as well as variables
``upward`` defining the gravity observation height (with same vertical
reference as the model, (i.e. WGS84) in meters and ``gravity_anomaly`` with the
observed gravity values in mGals. If using tesseroids, the dataset must also
contain a variable ``geocentric_radius`` defining the geocentric radius at each
observation point in meters. This can be created using the Python package Boule.
buffer_width : float | None, optional
The width in meters or decimal degrees of a buffer zone used to zoom-in on the
provided data creating an inner region. This inner region will be used for
plotting and calculating statistics for processes such as cross validation, and
l2-norms, this avoids skewing plots and values by edge effects, by default will
use a width of the 10% of the shortest dimension length, to the nearest multiple
of grid spacing.
model_type : str, optional
Choose between ``prisms`` and ``tesseroids``, which affects whether geographic
or projected coordinates are expect, by default ``prisms``
Returns
-------
xarray.Dataset
A dataset with new attributes ``model_type``, ``dataset_type``, ``spacing``,
"buffer_width``, ``spacing``, ``region``, and ``inner_region``.
"""
gravity = gravity.copy()
assert "upward" in gravity, "gravity dataset must contain an 'upward' variable"
if model_type == "prisms":
coord_names = ("easting", "northing")
elif model_type == "tesseroids":
coord_names = ("longitude", "latitude")
assert "geocentric_radius" in gravity, (
"for tesseroid models, gravity dataset must contain a 'geocentric_radius' variable, which can be created using the Python package Boule."
)
else:
msg = "model_type must be either 'prisms' or 'tesseroids'"
raise AssertionError(msg)
assert all(s in gravity.upward.dims for s in coord_names), (
f"gravity dataset must have dims {coord_names}, you can rename your dimensions with `.rename({{'old_name':'new_name'}})`"
)
# set region and spacing from provided grid
spacing, region = ptk.get_grid_info(gravity.upward)[0:2]
# default buffer width is 10% of the shortest dimension of the region, rounded to
# nearest multiple of spacing
if buffer_width is None:
# check that buffer width is a multiple of the grid spacing
min_dimension = min(region[1] - region[0], region[3] - region[2])
buffer_width = min_dimension * 0.1
buffer_width = round(buffer_width / spacing) * spacing
assert buffer_width % spacing == 0, (
f"buffer_width ({buffer_width}) must be a multiple of the grid spacing ({spacing}) of the provided gravity dataframe"
)
min_region_width = min(region[1] - region[0], region[3] - region[2])
assert buffer_width * 2 < min_region_width, (
"buffer_width must be smaller than half the smallest dimension of the region"
)
inner_region = vd.pad_region(region, -buffer_width)
# Append some attributes to the xr.Dataset
attrs = {
"region": region,
"spacing": spacing,
"buffer_width": buffer_width,
"inner_region": inner_region,
"dataset_type": "data",
"model_type": model_type,
"coord_names": coord_names,
}
gravity.attrs = attrs
return gravity
[docs]
def create_model(
zref: float,
density_contrast: xr.DataArray | float,
topography: xr.Dataset,
buffer_width: float = 0,
model_type: str = "prisms",
upper_confining_layer: xr.DataArray | None = None,
lower_confining_layer: xr.DataArray | None = None,
) -> xr.Dataset:
"""
Convert a topography grid into a model, which can be used as the starting model for
an inversion. Choose between using prisms or tesseroids, and constraining the
topography during the inversion with upper and lower confining layers.
Parameters
----------
zref : float
The reference elevation to build prisms or tesseroids around. Model elements
above this reference are assigned positive density contrasts while those below
are assigned negative density contrasts. This value should be relative to the
same reference frame as the ``upward`` variable in the ``topography`` Dataset
(e.g., WGS84 ellipsoidal height, mean sea level, etc).
density_contrast : xarray.DataArray | float
The density contrast to use for the prisms or tesseroids. This can be
a constant value, or a DataArray with the same dimensions as the
``topography`` Dataset.
topography : xarray.Dataset
A dataset with coordinates ``easting`` and ``northing`` (if using prisms) or
``longitude`` and ``latitude`` (if using tesseroids), as well as variables
``upward`` defining the topography (with same vertical reference as the gravity
data, i.e. WGS84) in meters. For tesseroid models, the Dataset must also contain
a ``geocentric_radius`` variable, which can be created using the Python package
Boule. The variable ``upward`` is then assumed to be the ellipsoidal height,
and the geocentric height is calculated as ``upward + geocentric_radius``.
The dataset can optionally contain a ``mask`` variable. Mask values of NaN
won't be altered during the inversion, while non-NaN (finite) mask values are
free to change.
If you don't have a topography grid, you can create one with
:func:`invert4geom.create_topography`.
buffer_width : float, optional
The width in meters or decimal degrees of a buffer zone used to zoom-in on the
provided data creating an inner region. This inner region will be used for
plotting and calculating statistics, this avoids skewing plots and values by
edge effects, by default is None.
model_type : str, optional
The type of model to create, either ``prisms`` or ``tesseroids``, by default
``prisms``.
upper_confining_layer : xarray.DataArray | None, optional
The upper confining layer for the model, should be elevations relative to the
same reference frame as the ``upward`` variable in the ``topography`` Dataset,
by default None
lower_confining_layer : xarray.DataArray | None, optional
The lower confining layer for the model, should be elevations relative to the
same reference frame as the ``upward`` variable in the ``topography`` Dataset,
by default None
Returns
-------
xarray.Dataset
A dataset containing the variables associated with the prism / tesseroid layer
(``top``, ``bottom``, ``density``) as well as variables ``topography``,
``starting_topography``, ``mask``, ``upper_confining_layer``,
``lower_confining_layer`` , and attributes ``model_type``, ``dataset_type``,
``zref``, ``density_contrast``, ``spacing``, ``buffer_width``, ``region``, and
``inner_region``.
"""
topography = topography.copy()
# warn if zref has too many decimals
# d = decimal.Decimal(str(zref))
# result = abs(d.as_tuple().exponent)
# assert result < 10, "to avoid issues with float point precision, please limit the precision of zref to have less than 10 decimal place (or just use an integer)"
assert "upward" in topography, (
"topography dataset must contain an 'upward' variable"
)
if model_type == "prisms":
coord_names = ("easting", "northing")
elif model_type == "tesseroids":
coord_names = ("longitude", "latitude")
assert "geocentric_radius" in topography, (
"for tesseroid models, topography dataset must contain a 'geocentric_radius' variable, which can be created using the Python package Boule."
)
else:
msg = "model_type must be either 'prisms' or 'tesseroids'"
raise AssertionError(msg)
assert all(s in topography.upward.dims for s in coord_names), (
f"topography dataset must have dims {coord_names}, you can rename your dimensions with `.rename({{'old_name':'new_name'}})`"
)
# set region and spacing from provided grid
spacing, region = ptk.get_grid_info(topography.upward)[0:2]
if isinstance(density_contrast, xr.DataArray):
assert all(s in density_contrast.dims for s in coord_names), (
f"`density_contrast` dataarray must have dims {coord_names}, you can rename your dimensions with `.rename({{'old_name':'new_name'}})`"
)
elif isinstance(density_contrast, float | int):
pass
else:
msg = "`density_contrast` must be a float or xarray.DataArray"
raise AssertionError(msg)
if model_type == "tesseroids":
zref_used = topography.geocentric_radius + zref
topography_used = topography.upward + topography.geocentric_radius
else:
zref_used = zref
topography_used = topography.upward
# create grid of density values, positive above zref, negative below
density_grid = xr.where(
topography_used >= zref_used,
density_contrast,
-density_contrast,
)
# create prism / tesseroid layer from topography and density grid
model = utils.grid_to_model(
topography_used,
reference=zref_used,
density=density_grid,
model_type=model_type,
)
# add starting topography and current topography to prism / tesseroid layer dataset
model["starting_topography"] = topography.upward
model["topography"] = topography.upward
model["mask"] = (
topography.mask if "mask" in topography else xr.ones_like(topography.upward)
)
if model_type == "tesseroids":
model["geocentric_radius"] = topography.geocentric_radius
# add optional confining layers as variables
if upper_confining_layer is not None:
# check dataarray has same coord names
if sorted(upper_confining_layer.dims) != sorted(coord_names):
msg = f"upper_confining_layer must have coordinate names {coord_names}, use `.rename({{'old_name':'new_name'}})` to rename your coordinates"
raise ValueError(msg)
# check datarrays are aligned
try:
_ = xr.align(model.topography, upper_confining_layer, join="exact")
except xr.AlignmentError as err:
msg = "`upper_confining_layer` should be exactly aligned with the topography dataset"
raise xr.AlignmentError(msg) from err
model["upper_confining_layer"] = upper_confining_layer
else:
model["upper_confining_layer"] = xr.full_like(
model.topography,
np.nan,
dtype=np.double,
)
if lower_confining_layer is not None:
# check dataarray has same coord names
if sorted(lower_confining_layer.dims) != sorted(coord_names):
msg = f"lower_confining_layer must have coordinate names {coord_names}, use `.rename({{'old_name':'new_name'}})` to rename your coordinates"
raise ValueError(msg)
# check datarrays are aligned
try:
_ = xr.align(model.topography, lower_confining_layer, join="exact")
except xr.AlignmentError as err:
msg = "`lower_confining_layer` should be exactly aligned with the topography dataset"
raise xr.AlignmentError(msg) from err
model["lower_confining_layer"] = lower_confining_layer
else:
model["lower_confining_layer"] = xr.full_like(
model.topography,
np.nan,
dtype=np.double,
)
# default buffer width is 10% of the shortest dimension of the region, rounded to
# nearest multiple of spacing
if buffer_width == 0:
inner_region = region
elif buffer_width > 0:
assert buffer_width % spacing == 0, (
f"buffer_width ({buffer_width}) must be a multiple of the grid spacing ({spacing}) of the provided gravity dataframe"
)
min_region_width = min(region[1] - region[0], region[3] - region[2])
assert buffer_width * 2 < min_region_width, (
"buffer_width must be smaller than half the smallest dimension of the region"
)
inner_region = vd.pad_region(region, -buffer_width)
else:
msg = "buffer_width must be >= 0"
raise ValueError(msg)
# use extent of un-masked cells to define a region to use for plotting
masked_extent = model.mask.where(np.isfinite(model.mask), drop=True)
masked_region = ptk.get_grid_info(masked_extent)[1]
# get inner combinattion of masked_extent and inner_region
inner_region = ptk.regions_overlap(masked_region, inner_region)
# Append some attributes to the xr.Dataset
attrs = {
"zref": zref,
"density_contrast": density_contrast,
"region": region,
"spacing": spacing,
"buffer_width": buffer_width,
"inner_region": inner_region,
"dataset_type": "model",
"model_type": model_type,
"coord_names": coord_names,
}
model.attrs = attrs
return model
[docs]
class Inversion:
"""
A class which holds the gravity dataset, the model dataset, inversion stopping
criteria, parameters which control the inversion, and methods used to run the
inversion and cross-validations.
Data and model are both deep copied, so changes to the original datasets will not be
reflected by the datasets assigned as attributes to this instance, and changes to
the data and model made during an inversion will not affect the original objects.
Parameters
----------
data : xarray.Dataset
A dataset containing the gravity data, which has been initialized with
:func:`create_data`, contains variable ``forward_gravity`` calculated with
:meth:`DatasetAccessorInvert4Geom.forward_gravity`, and contains variables
``misfit``, ``reg``, and ``res``, calculated with
:meth:`DatasetAccessorInvert4Geom.regional_separation`.
model : xarray.Dataset
A dataset containing the prism or tesseroid layer, which has been initialized
with :func:`create_model`.
style : str, optional
style of inversion to run, 'geometry' for changing the topography of the
model, or 'density' for changing the density contrast of the model, by
default 'geometry'
max_iterations : int, optional
Stop the inversion once this number of iterations is reached, by default 100
l2_norm_tolerance : float, optional
Stop the inversion once the L2-norm (square root of the RMS residual misfit) is
less than this L2 norm tolerance, by default 0.2 mGal
delta_l2_norm_tolerance : float, optional
Stop the inversion once the relative change in L2-norm is less than this
tolerance, by default 1.001, which means the L2 norm must decrease by at least
0.1% each iteration.
perc_increase_limit : float, optional
Stop the inversion if the L2-norm ever increases above the minimum L2-norm (of
an iteration) by this decimal percentage amount, by default 0.20, which means if
some iteration had an L2-norm of 1 mGal, if any subsequent iteration has an
L2-norm greater than 1.2 mGal the inversion will terminate.
deriv_type : str, optional
The method to use for calculated the derivative for the Jacobian matrix. Choose
between ``annulus``, for an annular approximation or ``finite_difference``,
for a finite difference approximation, by default ``annulus``
jacobian_finite_step_size : float, optional
small change in density or thickness of the prisms/tesseroids used to calculate
entries of the jacobian, by default 1
or tesseroid for the finite difference approximation, by default 1
model_properties_method : str, optional
method to use to extract prism properties while calculating the Jacobian. Choose
between ``itertools``, ``generator``, or ``forloops``, by default ``itertools``
solver_type : str, optional
method to use for solving Ax=b to find each iteration's topographic correction
grid, by default ``scipy least squares``
solver_damping : float | None, optional
Damping factor for the solver, typically between 0 and 1, by default None
apply_weighting_grid : bool, optional
Whether to apply a weighting grid to the inversion, by default False
weighting_grid : xarray.DataArray | None, optional
Weighting grid to apply to the inversion, created through
:func:`normalized_mindist`, by default None
"""
[docs]
def __init__(
self,
data: xr.Dataset,
model: xr.Dataset,
style: str = "geometry",
max_iterations: int = 100,
l2_norm_tolerance: float = 0.2,
delta_l2_norm_tolerance: float = 1.001,
perc_increase_limit: float = 0.20,
deriv_type: str = "annulus",
jacobian_finite_step_size: float = 1,
model_properties_method: str = "itertools",
solver_type: str = "scipy least squares",
solver_damping: float | None = None,
apply_weighting_grid: bool = False,
weighting_grid: xr.DataArray | None = None,
) -> None:
"""
Initialize the inversion object.
"""
self.data = copy.deepcopy(data)
self.model = copy.deepcopy(model)
self.style = style
self.max_iterations = max_iterations
self.l2_norm_tolerance = l2_norm_tolerance
self.delta_l2_norm_tolerance = delta_l2_norm_tolerance
self.perc_increase_limit = perc_increase_limit
self.deriv_type = deriv_type
self.jacobian_finite_step_size = jacobian_finite_step_size
self.model_properties_method = model_properties_method
self.solver_type = solver_type
self.solver_damping = solver_damping
self.apply_weighting_grid = apply_weighting_grid
self.weighting_grid = weighting_grid
self.end = None
self.jac = None
self.step = None
self.iteration = None
self.stats_df = None
self.time_start = None
self.iter_time_start = None
self.iter_time_end = None
self.past_l2_norm = None
self.elapsed_time = None
self.termination_reason = None
self.params = None
self.results_fname = None
self.gravity_best_score = None
self.constraints_best_score = None
self.best_trial = None
self.study = None
self.damping_cv_results_fname = None
self.damping_cv_study_fname = None
self.zref_density_optimization_study_fname = None
self.zref_density_optimization_results_fname = None
# check invalid deriv type
valid_deriv_types = ["annulus", "finite_difference"]
if self.deriv_type not in valid_deriv_types:
msg = f"deriv_type must be one of {valid_deriv_types}"
raise ValueError(msg)
# check that gravity dataset has necessary dimensions
self.data.inv._check_grav_vars()
# check that gravity data is inside topography region
self.data.inv._check_gravity_inside_topography_region(self.model.topography)
@property
def already_inverted(self) -> bool:
"""
check if an inversion has already be performed on this instance.
"""
return self.iteration is not None
@property
def rmse(self) -> float:
"""
return the root mean square error of the current residual misfit
"""
return utils.rmse(self.data.inv.inner.res)
@property
def l2_norm(self) -> float:
"""
return the l2 norm of the current residual misfit
"""
x: float = np.sqrt(self.rmse)
return x
@property
def delta_l2_norm(self) -> float:
"""
return the current iterations delta l2 norm
"""
x: float = self.past_l2_norm / self.l2_norm # type: ignore[operator]
return x
def end_inversion(self) -> None:
"""
check if the inversion should be terminated
"""
end = False
termination_reason = []
l2_norms = self.stats_df.l2_norm.to_numpy() # type: ignore[attr-defined]
l2_norm = self.stats_df.l2_norm.iloc[self.iteration] # type: ignore[attr-defined]
delta_l2_norm = self.stats_df.delta_l2_norm.iloc[self.iteration] # type: ignore[attr-defined]
# ignore for first iteration
if self.iteration == 1:
pass
else:
if l2_norm > np.min(l2_norms) * (1 + self.perc_increase_limit):
logger.info(
"\nInversion terminated after %s iterations because L2 norm (%s) \n"
"was over %s times greater than minimum L2 norm (%s) \n"
"Change parameter 'perc_increase_limit' if desired.",
self.iteration,
l2_norm,
1 + self.perc_increase_limit,
np.min(l2_norms),
)
end = True
termination_reason.append("l2-norm increasing")
if (delta_l2_norm <= self.delta_l2_norm_tolerance) & (
self.stats_df.delta_l2_norm.iloc[self.iteration - 1] # type: ignore[operator, attr-defined]
<= self.delta_l2_norm_tolerance
):
logger.info(
"\nInversion terminated after %s iterations because there was no "
"significant variation in the L2-norm over 2 iterations \n"
"Change parameter 'delta_l2_norm_tolerance' if desired.",
self.iteration,
)
end = True
termination_reason.append("delta l2-norm tolerance")
if l2_norm < self.l2_norm_tolerance:
logger.info(
"\nInversion terminated after %s iterations because L2-norm (%s) was "
"less then set tolerance: %s \nChange parameter "
"'l2_norm_tolerance' if desired.",
self.iteration,
l2_norm,
self.l2_norm_tolerance,
)
end = True
termination_reason.append("l2-norm tolerance")
if self.iteration >= self.max_iterations: # type: ignore[operator]
logger.info(
"\nInversion terminated after %s iterations with L2-norm=%s because "
"maximum number of iterations (%s) reached.",
self.iteration,
round(l2_norm, 2),
self.max_iterations,
)
end = True
termination_reason.append("max iterations")
if "max iterations" in termination_reason:
msg = (
"Inversion terminated due to max_iterations limit. Consider increasing "
"this limit."
)
logger.warning(msg)
self.end = end # type: ignore[assignment]
self.termination_reason = termination_reason # type: ignore[assignment]
def jacobian_density(self) -> None:
"""
dispatcher for creating the jacobian matrix for a density inversion
"""
if self.deriv_type != "finite_difference":
msg = "For density inversions, only 'finite_difference' deriv_type is supported"
raise ValueError(msg)
# convert gravity dataframe to numpy arrays
coordinates = self.data.inv.df.select_dtypes(include=["number"])
coordinates_array = coordinates.to_numpy()
# get various arrays based on gravity column names
grav_easting = coordinates_array[
:, coordinates.columns.get_loc(self.model.coord_names[0])
]
grav_northing = coordinates_array[
:, coordinates.columns.get_loc(self.model.coord_names[1])
]
grav_upward = coordinates_array[:, coordinates.columns.get_loc("upward")]
if self.model.model_type == "tesseroids":
grav_upward += coordinates_array[
:, coordinates.columns.get_loc("geocentric_radius")
]
assert len(grav_easting) == len(grav_northing) == len(grav_upward)
# create empty jacobian to fill in
# first discard prisms based on mask
jac = np.empty(
(len(grav_easting), self.model.inv.masked_df.density.size),
dtype=np.float64,
)
# get prisms info in following format, 3 methods:
# ((west, east, south, north, bottom, top), density)
model_properties = _model_properties(
self.model.inv.masked, # only use non-masked prisms
method=self.model_properties_method,
)
if np.abs(model_properties[:, 4] - model_properties[:, 5]).max() == 0:
msg = (
"All model elements have zero thickness so we can't perform a density "
"inversion. Either include a starting model with non-zero thicknesses "
"or change inversion style to 'geometry'."
)
raise ValueError(msg)
if self.model.model_type == "prisms":
jac = jacobian_density_finite_difference_prisms(
model_properties,
grav_easting,
grav_northing,
grav_upward,
self.jacobian_finite_step_size,
jac,
)
elif self.model.model_type == "tesseroids":
jac = jacobian_density_finite_difference_tesseroids(
model_properties,
grav_easting,
grav_northing,
grav_upward,
self.jacobian_finite_step_size,
jac,
)
# log Jacobian values
logger.info("Jacobian shape: %s", np.shape(jac))
logger.info(
"Jacobian median: %s m, RMS:%s m",
round(np.nanmedian(jac), 10),
round(utils.rmse(jac), 10),
)
assert ~np.isnan(jac).any(), "Jacobian contains NaN values"
self.jac = jac
def jacobian_geometry(self) -> None:
"""
dispatcher for creating the jacobian matrix for a geometry inversion with 2
method options
"""
# convert gravity dataframe to numpy arrays
coordinates = self.data.inv.df.select_dtypes(include=["number"])
coordinates_array = coordinates.to_numpy()
coord_names = self.model.coord_names
# get various arrays based on gravity column names
grav_easting = coordinates_array[:, coordinates.columns.get_loc(coord_names[0])]
grav_northing = coordinates_array[
:, coordinates.columns.get_loc(coord_names[1])
]
grav_upward = coordinates_array[:, coordinates.columns.get_loc("upward")]
if self.model.model_type == "tesseroids":
grav_upward += coordinates_array[
:, coordinates.columns.get_loc("geocentric_radius")
]
assert len(grav_easting) == len(grav_northing) == len(grav_upward)
# create empty jacobian to fill in
# first discard prisms based on mask
jac = np.empty(
(len(grav_easting), self.model.inv.masked_df.top.size),
dtype=np.float64,
)
if self.deriv_type == "annulus":
if self.model.model_type != "prisms":
msg = "Annulus technique is only available for prism models, use deriv_type='finite_difference' instead"
raise ValueError(msg)
# convert dataframe to arrays
df = self.model.inv.masked_df # only use non-masked prisms
model_element_easting = df[coord_names[0]].to_numpy()
model_element_northing = df[coord_names[1]].to_numpy()
model_element_top = df.top.to_numpy()
model_element_density = df.density.to_numpy()
if np.all((model_element_top - grav_upward.mean()) == 0):
msg = (
"All prism tops coincides exactly with the elevation of the gravity "
"observation points, leading to issues with calculating the derivative "
"of gravity with respect to prism thickness using the annulus technique. "
"Either slightly change the prism tops or gravity elevations, or use "
"the small-prisms vertical derivative technique."
)
raise ValueError(msg)
jac = jacobian_geometry_annular(
grav_easting,
grav_northing,
grav_upward,
model_element_easting,
model_element_northing,
model_element_top,
model_element_density,
self.model.spacing,
jac,
)
elif self.deriv_type == "finite_difference":
# get prisms info in following format, 3 methods:
# ((west, east, south, north, bottom, top), density)
model_properties = _model_properties(
self.model.inv.masked, # only use non-masked prisms
method=self.model_properties_method,
)
if self.model.model_type == "prisms":
jac = jacobian_geometry_finite_difference_prisms(
model_properties,
grav_easting,
grav_northing,
grav_upward,
self.jacobian_finite_step_size,
jac,
)
elif self.model.model_type == "tesseroids":
jac = jacobian_geometry_finite_difference_tesseroids(
model_properties,
grav_easting,
grav_northing,
grav_upward,
self.jacobian_finite_step_size,
jac,
)
else:
msg = "invalid string for deriv_type"
raise ValueError(msg)
# log Jacobian values
logger.info("Jacobian shape: %s", np.shape(jac))
logger.info(
"Jacobian median: %s m, RMS:%s m",
round(np.nanmedian(jac), 10),
round(utils.rmse(jac), 10),
)
assert ~np.isnan(jac).any(), "Jacobian contains NaN values"
self.jac = jac
def solver(self) -> None:
"""
Calculate shift to add to prism top or density for each iteration of the
inversion. Finds the least-squares solution to the Jacobian and the gravity
residual.
"""
if self.solver_type == "scipy least squares":
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.linalg.lsqr.html
damping = 0 if self.solver_damping is None else self.solver_damping
step = sp.sparse.linalg.lsqr(
A=self.jac,
b=self.data.inv.df.res.to_numpy(),
show=False,
damp=damping, # float, typically 0-1
# atol= ,
# btol=1e-4, # if 1e-6, residuals should be accurate to ~6 digits
iter_lim=5000, # limit of iterations, just in case of issues
)[0]
else:
msg = "invalid string for solver_type"
raise ValueError(msg)
# log correction values
logger.info(
"Topography or density correction (before weighting) median: %s m, RMS:%s m",
round(np.nanmedian(step), 6),
round(utils.rmse(step), 6),
)
assert ~np.isnan(step).any(), "correction contains NaN values"
self.step = step
def reinitialize_inversion(self) -> None:
"""
reset inversion object attributes, gravity data misfit, and model topography to
what it was before the inversion.
"""
self.iteration = None
self.data["misfit"] = self.data["starting_misfit"]
self.data["res"] = self.data["starting_res"]
self.data["reg"] = self.data["starting_reg"]
self.data["forward_gravity"] = self.data["starting_forward_gravity"]
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="gravity dataframe does not contain a 'test' column"
)
self.data = cross_validation.remove_test_points(self.data)
self.data = self.data.drop_vars(
[v for v in list(self.data.keys()) if v.startswith("iter_")],
errors="ignore",
)
topography = self.model.starting_topography.to_dataset(name="upward")
topography["mask"] = self.model.mask
if self.model.model_type == "tesseroids":
topography["geocentric_radius"] = self.model.geocentric_radius
model = create_model(
zref=self.model.zref,
density_contrast=self.model.density_contrast,
topography=topography,
buffer_width=self.model.buffer_width,
model_type=self.model.model_type,
upper_confining_layer=self.model.upper_confining_layer,
lower_confining_layer=self.model.lower_confining_layer,
)
self.model = model
def invert(
self,
results_fname: str | None = None,
progressbar: bool = True,
plot_convergence: bool = False,
plot_dynamic_convergence: bool = False,
) -> None:
"""
Start the inversion, saving the results and parameters as attributes and new
variables in the data and model datasets attributes.
Parameters
----------
results_fname : str | None, optional
file name to save results to as pickle file, by default None
progressbar : bool, optional
whether to show an iteration progress bar, by default True
plot_convergence : bool, optional
whether to plot convergence, by default False
plot_dynamic_convergence : bool, optional
whether to plot convergence iteration-by-iteration, by default False
"""
if self.already_inverted is True:
msg = "this inversion object has already been used to run an inversion, re-initialize the object or run `reinitialize_inversion` to run another inversion"
raise ValueError(msg)
# warn if weighting grid supplied by unused
if (self.weighting_grid is not None) & (self.apply_weighting_grid is False):
msg = (
"weighting grid supplied but not used because apply_weighting_grid is "
"False"
)
raise ValueError(msg)
if (self.apply_weighting_grid is True) & (self.weighting_grid is None):
msg = "must supply weighting grid if apply_weighting_grid is True"
raise ValueError(msg)
self.iteration = 0 # type: ignore[assignment]
# create empty dataframe to hold iteration stats
self.stats_df = pd.DataFrame(
columns=[
"iteration",
"rmse",
"l2_norm",
"delta_l2_norm",
"iter_time_sec",
]
)
# add row for iteration 0 (initial model)
self.stats_df.loc[self.iteration] = [ # type: ignore[attr-defined]
self.iteration,
self.rmse,
self.l2_norm,
np.inf,
np.nan,
]
# start timing of overall inversion
self.time_start = time.perf_counter() # type: ignore[assignment]
if progressbar is True:
pbar = tqdm(range(self.max_iterations), initial=1, desc="Iteration")
elif progressbar is False:
pbar = range(self.max_iterations)
else:
msg = "progressbar must be a boolean" # type: ignore[unreachable]
raise ValueError(msg)
for iteration, _ in enumerate(pbar, start=1):
logger.info(
"\n #################################### \n iteration %s", iteration
)
# start iteration timer
self.iter_time_start = time.perf_counter() # type: ignore[assignment]
self.iteration = iteration # type: ignore[assignment]
# calculate jacobian sensitivity matrix of non-nan gravity points and
# non-masked model elements
if self.style == "geometry":
self.jacobian_geometry()
elif self.style == "density":
self.jacobian_density()
else:
msg = "invalid string for style, should be 'geometry' or 'density'"
raise ValueError(msg)
# calculate array of topographic correction for each prism
self.solver()
# add topography correction array to model dataset, optionally enforcing
# confining layers
if self.style == "geometry":
self.model = self.model.inv.add_topography_correction(self.step)
# add density correction array to model dataset
elif self.style == "density":
self.model = self.model.inv.add_density_correction(self.step)
# optionally apply weights to the topo correction grid
if self.apply_weighting_grid is True:
if self.style == "geometry":
self.model["topography_correction"] = (
self.model.topography_correction * self.weighting_grid
)
elif self.style == "density":
self.model["density_correction"] = (
self.model.density_correction * self.weighting_grid
)
# add the corrections and update the prisms dataset
self.model = self.model.inv.update_model_ds(style=self.style)
# save results with iteration number
self.model[f"iter_{self.iteration}_top"] = self.model.top
self.model[f"iter_{self.iteration}_bottom"] = self.model.bottom
self.model[f"iter_{self.iteration}_density"] = self.model.density
self.model[f"iter_{self.iteration}_layer"] = self.model.topography
if self.style == "geometry":
self.model[f"iter_{self.iteration}_correction"] = (
self.model.topography_correction
)
elif self.style == "density":
self.model[f"iter_{self.iteration}_correction"] = (
self.model.density_correction
)
# save current residual with iteration number
self.data[f"iter_{self.iteration}_initial_residual"] = self.data.res
# update the forward gravity, residual, and the l2 / delta l2 norms
self.data.inv._update_gravity_and_residual(self.model) # pylint: disable=protected-access
# end iteration timer
self.iter_time_end = time.perf_counter() # type: ignore[assignment]
self.past_l2_norm = self.stats_df.loc[self.iteration - 1, "l2_norm"] # type: ignore[attr-defined, operator]
# add row for current iteration
self.stats_df.loc[self.iteration] = [ # type: ignore[attr-defined]
self.iteration,
self.rmse,
self.l2_norm,
self.delta_l2_norm,
self.iter_time_end - self.iter_time_start, # type: ignore[operator]
]
# decide if to end the inversion
self.end_inversion()
if plot_dynamic_convergence is True:
self.plot_dynamic_convergence()
if self.end is True:
# self.already_inverted = True
if progressbar is True: # type: ignore[unreachable]
pbar.set_description(
f"Inversion ended due to {self.termination_reason}"
)
break
# end of inversion loop
self.elapsed_time = time.perf_counter() - self.time_start # type: ignore[assignment, operator]
if plot_convergence is True:
try:
self.plot_convergence()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
self.params = { # type: ignore[assignment]
# first column
"Density contrast(s)": "Spatially variable"
if isinstance(self.model.density_contrast, xr.DataArray)
else f"{np.unique(np.abs(self.model.density_contrast))} kg/m3",
"Inversion style": self.style,
"Reference level": f"{self.model.zref} m",
"Max iterations": self.max_iterations,
"L2 norm tolerance": f"{self.l2_norm_tolerance}",
"Delta L2 norm tolerance": f"{self.delta_l2_norm_tolerance}",
# second column
"Deriv type": self.deriv_type,
"Solver type": self.solver_type,
"Solver damping": self.solver_damping,
"Upper confining layer": "Not enabled"
if np.isnan(self.model.upper_confining_layer.values).all()
else "Enabled",
"Lower confining layer": "Not enabled"
if np.isnan(self.model.lower_confining_layer.values).all()
else "Enabled",
"Regularization weighting grid": "Not enabled"
if self.apply_weighting_grid is False
else "Enabled",
# third column
"Time elapsed": f"{int(self.elapsed_time)} seconds", # type: ignore[call-overload]
"Avg. iteration time": f"{round(np.mean(self.stats_df.iter_time_sec[1:].to_numpy()), 2)} seconds", # type: ignore[attr-defined]
"Final misfit RMSE / L2-norm": (
f"{round(self.rmse, 4)} /{round(self.l2_norm, 4)} mGal"
),
"Termination reason": self.termination_reason,
"Iteration times": self.stats_df.iter_time_sec[1:].to_numpy(), # type: ignore[attr-defined]
}
if results_fname is not None:
self.results_fname = results_fname # type: ignore[assignment]
# remove if exists
pathlib.Path(f"{results_fname}.pickle").unlink(missing_ok=True)
with pathlib.Path(f"{results_fname}.pickle").open("wb") as f:
pickle.dump(self, f)
logger.info("results saved to %s.pickle", results_fname)
def grav_cv_score(
self,
results_fname: str | None = None, # noqa: ARG002
rmse_as_median: bool = False, # noqa: ARG002
plot: bool = False, # noqa: ARG002
) -> "Inversion":
"""
DEPRECATED: use the `gravity_score` function instead
"""
# pylint: disable=W0613
msg = "Function `grav_cv_score` renamed to `gravity_score`"
raise DeprecationWarning(msg)
[docs]
def gravity_score(
self,
results_fname: str | None = None,
rmse_as_median: bool = False,
plot: bool = False,
) -> "Inversion":
"""
Find the score, represented by the root mean (or median) squared error (RMSE),
between the testing gravity data, and the predict gravity data after an
inversion. Follows methods of :footcite:t:`uiedafast2017`. Used in
:func:`optimize_inversion_damping`.
Parameters
----------
rmse_as_median : bool, optional
calculate the RMSE as the median as opposed to the mean, by default False
plot : bool, optional
choose to plot the observed and predicted data grids, and their difference,
located at the testing points, by
default False
Returns
-------
inv_copy : Inversion
a copy of the Inversion object after running the inversion on the training
References
----------
:footcite:t:`uiedafast2017`
"""
coord_names = self.model.coord_names
# make copies of Inversion and underlying data and model so as not
# to alter the original
inv_copy = copy.deepcopy(self)
df = inv_copy.data.inv.df
inv_copy.results_fname = results_fname # type: ignore[assignment]
test = df[df.test == True].copy() # noqa: E712 # pylint: disable=singleton-comparison
# temporarily set the gravity dataframe to only the training data
train = df[df.test == False].copy() # noqa: E712 # pylint: disable=singleton-comparison
inv_copy.data = train.set_index([coord_names[1], coord_names[0]]).to_xarray()
# inv_copy.data = inv_copy.data.where(inv_copy.data.test == False).copy()
# retrain attributes
inv_copy.data.attrs.update(self.data.attrs) # pylint: disable=protected-access
with utils._log_level(logging.WARN): # pylint: disable=protected-access
# run inversion
inv_copy.invert(
results_fname=inv_copy.results_fname,
progressbar=False,
plot_convergence=False,
plot_dynamic_convergence=False,
)
if inv_copy.model.model_type == "tesseroids":
zref_used = inv_copy.model.geocentric_radius + inv_copy.model.zref
topography_used = (
inv_copy.model.topography + inv_copy.model.geocentric_radius
)
elif inv_copy.model.model_type == "prisms":
zref_used = inv_copy.model.zref
topography_used = inv_copy.model.topography
else:
msg = "model must have attribute 'model_type' which is either 'prisms' or 'tesseroids'"
raise ValueError(msg)
density_grid = xr.where(
topography_used >= zref_used,
inv_copy.model.density_contrast,
-inv_copy.model.density_contrast,
)
# create new layer of prisms / tesseroids
layer = utils.grid_to_model(
topography_used,
reference=zref_used,
density=density_grid,
model_type=inv_copy.model.model_type,
)
# calculate forward gravity of inverted prism layer
if layer.model_type == "prisms":
test["test_point_grav"] = layer.prism_layer.gravity(
coordinates=(
test[coord_names[0]],
test[coord_names[1]],
test.upward,
),
field="g_z",
progressbar=False,
)
elif layer.model_type == "tesseroids":
test["test_point_grav"] = layer.tesseroid_layer.gravity(
coordinates=(
test[coord_names[0]],
test[coord_names[1]],
test.upward + test.geocentric_radius,
),
field="g_z",
progressbar=False,
)
else:
msg = "layer must have attribute 'model_type' which is either 'prisms' or 'tesseroids'"
raise ValueError(msg)
# subset to only points inside the inner region so edge effects have less effect
# on scores
test = ptk.points_inside_region(
test,
self.data.inner_region,
names=(coord_names[0], coord_names[1]),
)
# compare forward of inverted layer with observed
observed = test.gravity_anomaly - test.reg
predicted = test.test_point_grav
self.gravity_best_score = utils.rmse( # type: ignore[assignment]
predicted - observed, as_median=rmse_as_median
)
if plot:
try:
test_grid = test.set_index([coord_names[1], coord_names[0]]).to_xarray()
obs = test_grid.gravity_anomaly - test_grid.reg
pred = test_grid.test_point_grav.rename("")
epsg, coast = utils.get_epsg(coast=False)
_ = ptk.grid_compare(
pred,
obs,
grid1_name="Predicted gravity",
grid2_name="Observed gravity",
robust=True,
title=f"Score={self.gravity_best_score}",
rmse_in_title=False,
hist=True,
inset=False,
coast=coast,
epsg=epsg,
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return inv_copy
def constraints_cv_score(
self,
constraints_df: pd.DataFrame, # noqa: ARG002
results_fname: str | None = None, # noqa: ARG002
rmse_as_median: bool = False, # noqa: ARG002
) -> "Inversion":
"""
DEPRECATED: use the `constraints_score` function instead
"""
# pylint: disable=W0613
msg = "Function `constraints_cv_score` renamed to `constraints_score`"
raise DeprecationWarning(msg)
[docs]
def constraints_score(
self,
constraints_df: pd.DataFrame,
results_fname: str | None = None,
rmse_as_median: bool = False,
) -> "Inversion":
"""
Find the score, represented by the root mean squared error (RMSE), between the
constraint point elevation, and the inverted topography at the constraint points.
Follows methods of :footcite:t:`uiedafast2017`. Used in
:func:`optimize_inversion_zref_density_contrast`.
Parameters
----------
constraints_df : pandas.DataFrame
a dataframe with columns "easting", "northing", and "upward" for
coordinates and elevation of the constraint points.
results_fname : str | None, optional
file name to save results to as pickle file, by default fname is None
rmse_as_median : bool, optional
calculate the RMSE as the median of the , as opposed to the mean, by default
False
Returns
-------
inv_copy : Inversion
a copy of the Inversion object after running the inversion on the training
data and sampling the inverted topography at the constraint points
References
----------
.. footbibliography::
"""
# make copies of Inversion and underlying data and model dataset so as not
# to alter the original
inv_copy = copy.deepcopy(self)
inv_copy.results_fname = results_fname # type: ignore[assignment]
with utils._log_level(logging.WARN): # pylint: disable=protected-access
# run inversion
inv_copy.invert(
results_fname=inv_copy.results_fname,
progressbar=False,
plot_convergence=False,
plot_dynamic_convergence=False,
)
coord_names = self.model.coord_names
# sample the inverted topography at the constraint points
constraints_df = utils.sample_grids(
df=constraints_df,
grid=inv_copy.model.topography,
sampled_name="inverted_topo",
coord_names=coord_names,
)
# calculate the difference between the inverted topography and the constraint
# elevations
dif = constraints_df.upward - constraints_df.inverted_topo
self.constraints_best_score: float = utils.rmse( # type: ignore[no-redef, assignment]
dif, as_median=rmse_as_median
)
return inv_copy
[docs]
def optimize_inversion_damping(
self,
n_trials: int,
damping_limits: tuple[float, float],
n_startup_trials: int | None = None,
score_as_median: bool = False,
sampler: optuna.samplers.BaseSampler | None = None,
grid_search: bool = False,
fname: str | None = None,
plot_scores: bool = True,
plot_cv: bool | None = None,
plot_grids: bool = False,
logx: bool = True,
logy: bool = True,
progressbar: bool = True,
parallel: bool = False,
seed: int = 0,
) -> "Inversion":
"""
Use Optuna to find the optimal damping regularization parameter for a gravity
inversion. The optimization aims to minimize the cross-validation score,
represented by the root mean (or median) squared error (RMSE), between the testing
gravity data, and the predict gravity data after and inversion. Follows methods of
:footcite:t:`uiedafast2017`.
Provide upper and low damping values, number of trials to run, and specify to let
Optuna choose the best damping value for each trial or to use a grid search. The
results are saved to a pickle file with the best inversion results and the study.
Parameters
----------
n_trials : int
number of damping values to try
n_startup_trials : int | None, optional
number of startup trials, by default is automatically determined
damping_limits : tuple[float, float]
upper and lower limits
score_as_median : bool, optional
if True, changes the scoring from the root mean square to the root median
square, by default False
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default either GPsampler or GridSampler
depending on if grid_search is True or False
grid_search : bool, optional
search the entire parameter space between damping_limits in n_trial steps, by
default False
fname : str, optional
file name to save both study and inversion results to as pickle files, by
default fname is `tmp_x_damping_cv` where x is a random integer between 0 and
999 and will save study to <fname>_study.pickle and tuple of inversion results
to <fname>.pickle.
plot_scores : bool, optional
plot the cross-validation results, by default True
plot_grids : bool, optional
for each damping value, plot comparison of predicted and testing gravity data,
by default False
logx : bool, optional
make x axis of CV result plot on log scale, by default True
logy : bool, optional
make y axis of CV result plot on log scale, by default True
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
seed : int, optional
random seed for the samplers, by default 0
Returns
-------
inv_copy : Inversion
a copy of the Inversion object after running the inversion with the best
damping value
"""
if plot_cv is not None:
msg = "`plot_cv` parameter renamed to `plot_scores`."
warnings.warn(msg, UserWarning, stacklevel=2)
plot_scores = plot_cv
# make copies of Inversion and underlying data and model dataset so as not
# to alter the original
inv_copy = copy.deepcopy(self)
optuna.logging.set_verbosity(optuna.logging.WARN)
# set file name for saving results with random number between 0 and 999
if fname is None:
inv_copy.results_fname = f"tmp_{random.randint(0, 999)}_damping_cv" # type: ignore[assignment]
else:
inv_copy.results_fname = fname # type: ignore[assignment]
if parallel:
pathlib.Path(f"{inv_copy.results_fname}.log").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.lock").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.log.lock").unlink(missing_ok=True)
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(
f"{inv_copy.results_fname}.log"
),
)
else:
storage = None
# define the objective function
objective = optimization.OptimalInversionDamping(
inversion_obj=inv_copy,
damping_limits=damping_limits,
rmse_as_median=score_as_median,
fname=inv_copy.results_fname, # type: ignore[arg-type]
plot_grids=plot_grids,
)
if grid_search:
if n_trials < 4:
msg = (
"if grid_search is True, n_trials must be at least 4, "
"resetting n_trials to 4 now."
)
logger.warning(msg)
n_trials = 4
space = np.logspace(
np.log10(damping_limits[0]), np.log10(damping_limits[1]), n_trials
)
sampler = optuna.samplers.GridSampler(
search_space={"damping": space},
seed=seed,
)
study = optuna.create_study(
direction="minimize",
sampler=sampler,
load_if_exists=False,
study_name=inv_copy.results_fname,
storage=storage,
pruner=optimization.DuplicateIterationPruner,
)
# run optimization
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_trials,
# callbacks=[_warn_limits_better_than_trial_1_param],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
else:
# define number of startup trials, whichever is bigger; 1/4 of trials or 4
if n_startup_trials is None:
n_startup_trials = max(4, int(n_trials / 4))
n_startup_trials = min(n_startup_trials, n_trials)
logger.info("using %s startup trials", n_startup_trials)
if n_startup_trials >= n_trials:
logger.warning(
"n_startup_trials is >= n_trials resulting in all trials sampled from "
"a QMC sampler instead of the GP sampler",
)
# create study
study = optuna.create_study(
direction="minimize",
sampler=optuna.samplers.QMCSampler(
seed=seed,
qmc_type="halton",
scramble=True,
),
load_if_exists=False,
study_name=inv_copy.results_fname,
storage=storage,
pruner=optimization.DuplicateIterationPruner,
)
# explicitly add the limits as trials
study.enqueue_trial({"damping": damping_limits[0]}, skip_if_exists=True)
study.enqueue_trial({"damping": damping_limits[1]}, skip_if_exists=True)
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_startup_trials,
# callbacks=[_warn_limits_better_than_trial_1_param],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
# continue with remaining trials with user-defined sampler
# if sampler not provided, used GPsampler as default
if sampler is None:
sampler = optuna.samplers.GPSampler(
n_startup_trials=0,
seed=seed,
deterministic_objective=True,
)
study.sampler = sampler
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_trials - n_startup_trials,
# callbacks=[_warn_limits_better_than_trial_1_param],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
inv_copy.best_trial = study.best_trial
# warn if any best parameter values are at their limits
optimization.warn_parameter_at_limits(inv_copy.best_trial)
# log the results of the best trial
optimization.log_optuna_results(inv_copy.best_trial)
# get best inversion result of each set
with pathlib.Path(
f"{inv_copy.results_fname}_trial_{inv_copy.best_trial.number}.pickle" # type: ignore[attr-defined]
).open("rb") as f:
inv_results = pickle.load(f)
# remove if exists
pathlib.Path(f"{inv_copy.results_fname}_study.pickle").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.pickle").unlink(missing_ok=True)
# save study to pickle
with pathlib.Path(f"{inv_copy.results_fname}_study.pickle").open("wb") as f:
pickle.dump(study, f)
# save inversion results tuple to pickle
with pathlib.Path(f"{inv_copy.results_fname}.pickle").open("wb") as f:
pickle.dump(inv_results, f)
# delete all inversion results
for i in range(n_trials):
pathlib.Path(f"{inv_copy.results_fname}_trial_{i}.pickle").unlink(
missing_ok=True
)
inv_copy.damping_cv_study_fname = f"{inv_copy.results_fname}_study.pickle" # type: ignore[assignment]
inv_copy.damping_cv_results_fname = f"{inv_copy.results_fname}.pickle" # type: ignore[assignment]
inv_copy.solver_damping = inv_copy.best_trial.params["damping"] # type: ignore[attr-defined]
inv_copy.study = study
# update the inversion object with the best inversion results
self.__dict__.update(inv_results.__dict__)
if plot_scores is True:
try:
plotting.plot_scores(
study.trials_dataframe().value.to_numpy(),
study.trials_dataframe().params_damping.to_numpy(),
param_name="Damping",
logx=logx,
logy=logy,
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return inv_copy
[docs]
def optimize_inversion_zref_density_contrast(
self,
n_trials: int,
constraints_df: pd.DataFrame,
n_startup_trials: int | None = None,
zref_limits: tuple[float, float] | None = None,
density_contrast_limits: tuple[float, float] | None = None,
starting_topography: xr.Dataset | None = None,
starting_topography_kwargs: dict[str, typing.Any] | None = None,
regional_grav_kwargs: dict[str, typing.Any] | None = None,
score_as_median: bool = False,
sampler: optuna.samplers.BaseSampler | None = None,
grid_search: bool = False,
fname: str | None = None,
plot_scores: bool = True,
plot_cv: bool | None = None,
logx: bool = False,
logy: bool = False,
progressbar: bool = True,
parallel: bool = False,
fold_progressbar: bool = True,
seed: int = 0,
) -> "Inversion":
"""
Run an Optuna optimization to find the optimal zref and or density contrast values
for a gravity inversion. The optimization aims to minimize the cross-validation
score, represented by the root mean (or median) squared error (RMSE), between
points of known topography and the inverted topography. Follows methods of
:footcite:t:`uiedafast2017`. This can optimize for either zref, density contrast,
or both at the same time. Provide upper and low limits for each parameter, number of
trials and let Optuna choose the best parameter values for each trial or use a grid
search to test all values between the limits in intervals of n_trials. The results
are saved to a pickle file with the best inversion results and the study. Since each
new set of zref and density values changes the starting model, for each set of
parameters this function re-calculates the starting gravity, the gravity misfit
and its regional and residual components. `regional_grav_kwargs` are passed to
:meth:`DatasetAccessorInvert4Geom.regional_separation`. Once the optimal parameters are found, the regional
separation and inversion are performed again and saved to <fname>.pickle and
the study is saved to <fname>_study.pickle.
The constraint point minimization regional separation technique uses constraints
points to estimate the regional field, and since constraints are used to calculating
the scoring metric of this function, the constraints need to be separated into
training (regional estimation) and testing (scoring) sets. To do this, supply the
training constraints to`regional_grav_kwargs` via `method="constraint"` or
`method="constraint_cv"` and `constraints_df`, and the testing constraints to this
function as `constraints_df`.
Typically there are not many constraints and omitting some of them from the training
set will significantly impact the regional estimation. To help with this, we can use
a K-Folds approach, where for each set of parameter values, we perform this entire
procedure K times, each time with a different separation of training and testing
points, called a fold. The score associated with that parameter set is the mean of
the K scores. Once the optimal parameter values are found, we then repeat the
inversion using all of the constraints in the regional estimation. For a K-folds
approach, supply lists of dataframes containing only each fold's testing or training
points to the two `constraints_df` arguments. To automatically perform the
test/train split and K-folds optimization, you can also use the convenience function
`optimize_inversion_zref_density_contrast_kfolds`.
Parameters
----------
n_trials : int
number of trials, if grid_search is True, needs to be a perfect square and >=16.
n_startup_trials : int | None, optional
number of startup trials, by default is automatically determined
zref_limits : tuple[float, float] | None, optional
upper and lower limits for the reference level, in meters, by default None
density_contrast_limits : tuple[float, float] | None, optional
upper and lower limits for the density contrast, in kg/m^-3, by default None
starting_topography_kwargs : dict[str, typing.Any] | None, optional
dictionary of kwargs to pass to function `create_topography` which must
include "method". kwargs "region", "spacing", "dataset_to_add",
"upper_confining_layer" and "lower_confining_layer" will automatically be
collected from the model object and passed to `create_topography`.
provided, by default None
regional_grav_kwargs : dict[str, typing.Any] | None, optional
dictionary with kwargs to supply to :meth:`DatasetAccessorInvert4Geom.regional_separation`, by default
None
score_as_median : bool, optional
change scoring metric from root mean square to root median square, by default
False
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default uses GPsampler unless grid_search
is True, then uses GridSampler.
grid_search : bool, optional
Switch the sampler to GridSampler and search entire parameter space between
provided limits in intervals set by n_trials (for 1 parameter optimizations), or
by the square root of n_trials (for 2 parameter optimizations), by default False
fname : str | None, optional
file name to save both study and inversion results to as pickle files, by
default fname is `tmp_x_zref_density_optimization` where x is a random
integer between 0 and 999 and will save study to <fname>_study.pickle and
the inversion object to <fname>.pickle.
plot_scores : bool, optional
plot the cross-validation results, by default True
logx : bool, optional
use a log scale for the cross-validation plot x-axis, by default False
logy : bool, optional
use a log scale for the cross-validation plot y-axis, by default False
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fold_progressbar : bool, optional
show a progress bar for each fold of the constraint-point minimization
cross-validation, by default True
seed : int, optional
random seed for the samplers, by default 0
Returns
-------
Inversion
Inversion object with best inversion results and the optimally determined
zref and or density contrast values as attributes of the `model` attribute.
"""
if plot_cv is not None:
msg = "`plot_cv` parameter renamed to `plot_scores`."
warnings.warn(msg, UserWarning, stacklevel=2)
plot_scores = plot_cv
# make copies of Inversion and underlying data and model dataset so as not
# to alter the original
inv_copy = copy.deepcopy(self)
coord_names = inv_copy.model.coord_names
if regional_grav_kwargs is not None:
regional_grav_kwargs = copy.deepcopy(regional_grav_kwargs)
if starting_topography_kwargs is not None:
starting_topography_kwargs = copy.deepcopy(starting_topography_kwargs)
upper_confining_layer = inv_copy.model.upper_confining_layer
lower_confining_layer = inv_copy.model.lower_confining_layer
if inv_copy.model.model_type == "tesseroids":
dataset_to_add = inv_copy.model[
["mask", "geocentric_radius"]
].drop_vars(["top", "bottom"])
else:
dataset_to_add = inv_copy.model[["mask"]].drop_vars(["top", "bottom"])
starting_topography_kwargs["dataset_to_add"] = dataset_to_add
starting_topography_kwargs["upper_confining_layer"] = upper_confining_layer
starting_topography_kwargs["lower_confining_layer"] = lower_confining_layer
starting_topography_kwargs["region"] = inv_copy.model.region
starting_topography_kwargs["spacing"] = inv_copy.model.spacing
starting_topography_kwargs["coord_names"] = inv_copy.model.coord_names
optuna.logging.set_verbosity(optuna.logging.WARN)
# set file name for saving results with random number between 0 and 999
if fname is None:
inv_copy.results_fname = (
f"tmp_{random.randint(0, 999)}_zref_density_optimization" # type: ignore[assignment]
)
else:
inv_copy.results_fname = fname # type: ignore[assignment]
if "test" in inv_copy.data.inv.df.columns:
assert inv_copy.data.inv.df.test.any(), (
"test column contains True value, not needed except for during damping CV"
)
if parallel:
pathlib.Path(f"{inv_copy.results_fname}.log").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.lock").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.log.lock").unlink(missing_ok=True)
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(
f"{inv_copy.results_fname}.log"
),
)
else:
storage = None
# get number of parameters included in optimization
num_params = sum(x is not None for x in [zref_limits, density_contrast_limits])
# define the objective function
objective = optimization.OptimalInversionZrefDensity(
inversion_obj=inv_copy,
constraints_df=constraints_df,
fname=inv_copy.results_fname, # type: ignore[arg-type]
regional_grav_kwargs=regional_grav_kwargs, # type: ignore[arg-type]
starting_topography=starting_topography,
starting_topography_kwargs=starting_topography_kwargs,
zref_limits=zref_limits,
density_contrast_limits=density_contrast_limits,
rmse_as_median=score_as_median,
progressbar=fold_progressbar,
)
if grid_search:
if num_params == 1:
if n_trials < 4:
msg = (
"if grid_search is True, n_trials must be at least 4, "
"resetting n_trials to 4 now."
)
logger.warning(msg)
n_trials = 4
if zref_limits is None:
space = np.linspace(
int(density_contrast_limits[0]), # type: ignore[index]
int(density_contrast_limits[1]), # type: ignore[index]
n_trials,
dtype=int,
)
sampler = optuna.samplers.GridSampler(
search_space={"density_contrast": space},
seed=seed,
)
if density_contrast_limits is None:
space = np.linspace(zref_limits[0], zref_limits[1], n_trials) # type: ignore[index]
sampler = optuna.samplers.GridSampler(
search_space={"zref": space},
seed=seed,
)
else:
if n_trials < 16:
msg = (
"if grid_search is True, n_trials must be at least 16, "
"resetting n_trials to 16 now."
)
logger.warning(msg)
n_trials = 16
# n_trials needs to be square for 2 param grid search so each param has
# sqrt(n_trials).
if np.sqrt(n_trials).is_integer() is False:
# get next largest square number
old_n_trials = n_trials
n_trials = (math.floor(math.sqrt(n_trials)) + 1) ** 2
msg = (
"if grid_search is True with provided limits for both zref and "
"density contrast, n_trials (%s) must have an integer square "
"root. Resetting n_trials to to next largest compatible value "
"now (%s)"
)
logger.warning(msg, old_n_trials, n_trials)
zref_space = np.linspace(
zref_limits[0], # type: ignore[index]
zref_limits[1], # type: ignore[index]
int(np.sqrt(n_trials)),
)
density_contrast_space = np.linspace(
int(density_contrast_limits[0]), # type: ignore[index]
int(density_contrast_limits[1]), # type: ignore[index]
int(np.sqrt(n_trials)),
dtype=int,
)
sampler = optuna.samplers.GridSampler(
search_space={
"zref": zref_space,
"density_contrast": density_contrast_space,
},
seed=seed,
)
study = optuna.create_study(
direction="minimize",
sampler=sampler,
load_if_exists=False,
study_name=inv_copy.results_fname,
storage=storage,
pruner=optimization.DuplicateIterationPruner,
)
# run optimization
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_trials,
# callbacks=[_warn_limits_better_than_trial_multi_params],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
else:
# define number of startup trials, whichever is bigger between 1/4 of trials, or
# 4 x the number of parameters
if n_startup_trials is None:
n_startup_trials = max(num_params * 4, int(n_trials / 4))
n_startup_trials = min(n_startup_trials, n_trials)
logger.info("using %s startup trials", n_startup_trials)
if n_startup_trials >= n_trials:
logger.warning(
"n_startup_trials is >= n_trials resulting in all trials sampled from "
"a QMC sampler instead of the GP sampler",
)
# if sampler not provided, use GPsampler as default unless grid_search is True
if sampler is None:
sampler = optuna.samplers.GPSampler(
n_startup_trials=0,
seed=seed,
deterministic_objective=True,
)
# create study
study = optuna.create_study(
direction="minimize",
sampler=optuna.samplers.QMCSampler(
seed=seed,
qmc_type="halton",
scramble=True,
),
load_if_exists=False,
study_name=inv_copy.results_fname,
storage=storage,
pruner=optimization.DuplicateIterationPruner,
)
# explicitly add the limits as trials
to_enqueue = []
if zref_limits is not None:
zref_trials = [
{"zref": zref_limits[0]},
{"zref": zref_limits[1]},
]
to_enqueue.append(zref_trials)
if density_contrast_limits is not None:
density_contrast_trials = [
{"density_contrast": density_contrast_limits[0]},
{"density_contrast": density_contrast_limits[1]},
]
to_enqueue.append(density_contrast_trials)
# get 2 lists of lists of dicts to enqueue (2 trials)
to_enqueue = np.array(to_enqueue).transpose()
for i in to_enqueue:
# turn list of dicts into single dict
x = {k: v for d in i for k, v in d.items()}
study.enqueue_trial(x, skip_if_exists=True)
# run optimization
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_startup_trials,
# callbacks=[_warn_limits_better_than_trial_multi_params],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
# continue with remaining trials with user-defined sampler
# if sampler not provided, used GPsampler as default
if sampler is None:
sampler = optuna.samplers.GPSampler(
n_startup_trials=0,
seed=seed,
deterministic_objective=True,
)
study.sampler = sampler
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = optimization.run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_trials - n_startup_trials,
# callbacks=[_warn_limits_better_than_trial_multi_params],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
# warn if any best parameter values are at their limits
optimization.warn_parameter_at_limits(study.best_trial)
# log the results of the best trial
optimization.log_optuna_results(study.best_trial)
# combine testing and training to get a full constraints dataframe
reg_constraints = regional_grav_kwargs.pop("constraints_df", None) # type: ignore[union-attr]
if isinstance(constraints_df, pd.DataFrame):
constraints_df = (
pd.concat([constraints_df, reg_constraints])
.drop_duplicates(subset=[coord_names[0], coord_names[1], "upward"])
.sort_index()
)
else:
constraints_df = (
pd.concat(constraints_df + reg_constraints)
.drop_duplicates(subset=[coord_names[0], coord_names[1], "upward"])
.sort_index()
)
# add to regional grav kwargs
if reg_constraints is not None:
regional_grav_kwargs["constraints_df"] = constraints_df # type: ignore[index]
if starting_topography_kwargs is not None:
starting_topography_kwargs["constraints_df"] = constraints_df
if "weights" in starting_topography_kwargs:
starting_topography_kwargs["weights_col"] = (
starting_topography_kwargs["weights"].name
)
# redo inversion with best parameters
inv_copy.model = inv_copy.model.assign_attrs(
{
"zref": study.best_trial.params.get("zref", inv_copy.model.zref),
}
)
inv_copy.model = inv_copy.model.assign_attrs(
{
"density_contrast": study.best_trial.params.get(
"density_contrast", inv_copy.model.density_contrast
)
}
)
if starting_topography_kwargs is not None:
starting_topography_kwargs["upward"] = inv_copy.model.zref
# run the inversion workflow with the new best parameters
with utils._log_level(logging.WARN): # pylint: disable=protected-access
create_starting_topography = True if starting_topography is None else False # noqa: SIM210 # pylint: disable=simplifiable-if-expression
inv_copy = run_inversion_workflow(
grav_ds=inv_copy.data,
model_type=inv_copy.model.model_type,
create_starting_topography=create_starting_topography,
starting_topography=starting_topography,
starting_topography_kwargs=starting_topography_kwargs,
regional_grav_kwargs=regional_grav_kwargs,
calculate_regional_misfit=True,
zref=inv_copy.model.zref,
density_contrast=inv_copy.model.density_contrast,
upper_confining_layer=inv_copy.model.upper_confining_layer,
lower_confining_layer=inv_copy.model.lower_confining_layer,
buffer_width=inv_copy.model.buffer_width,
fname=inv_copy.results_fname,
inversion_kwargs={
"max_iterations": inv_copy.max_iterations,
"l2_norm_tolerance": inv_copy.l2_norm_tolerance,
"delta_l2_norm_tolerance": inv_copy.delta_l2_norm_tolerance,
"perc_increase_limit": inv_copy.perc_increase_limit,
"deriv_type": inv_copy.deriv_type,
"jacobian_finite_step_size": inv_copy.jacobian_finite_step_size,
"solver_type": inv_copy.solver_type,
"solver_damping": inv_copy.solver_damping,
"apply_weighting_grid": inv_copy.apply_weighting_grid,
"weighting_grid": inv_copy.weighting_grid,
},
progressbar=False,
)
used_zref = float(inv_copy.params["Reference level"][:-2]) # type: ignore[index]
used_density_contrast = float(inv_copy.params["Density contrast(s)"][1:-7]) # type: ignore[index]
assert math.isclose(
used_density_contrast, inv_copy.model.density_contrast, rel_tol=0.02
)
assert math.isclose(used_zref, inv_copy.model.zref, rel_tol=0.02)
# remove if exists
pathlib.Path(f"{inv_copy.results_fname}_study.pickle").unlink(missing_ok=True)
pathlib.Path(f"{inv_copy.results_fname}.pickle").unlink(missing_ok=True)
# save study to pickle
with pathlib.Path(f"{inv_copy.results_fname}_study.pickle").open("wb") as f:
pickle.dump(study, f)
# save inversion results tuple to pickle
with pathlib.Path(f"{inv_copy.results_fname}.pickle").open("wb") as f:
pickle.dump(inv_copy, f)
# delete all inversion results
for i in range(n_trials):
pathlib.Path(f"{inv_copy.results_fname}_trial_{i}.pickle").unlink(
missing_ok=True
)
inv_copy.zref_density_optimization_study_fname = (
f"{inv_copy.results_fname}_study.pickle" # type: ignore[assignment]
)
inv_copy.zref_density_optimization_results_fname = (
f"{inv_copy.results_fname}.pickle" # type: ignore[assignment]
)
inv_copy.study = study
inv_copy.best_trial = study.best_trial
# update the inversion object with the best inversion results
self.__dict__.update(inv_copy.__dict__)
if plot_scores is True:
try:
if zref_limits is None:
plotting.plot_scores(
study.trials_dataframe().value.to_numpy(),
study.trials_dataframe().params_density_contrast.to_numpy(),
param_name="Density contrast (kg/m$^3$)",
plot_title="Density contrast Cross-validation",
logx=logx,
logy=logy,
)
elif density_contrast_limits is None:
plotting.plot_scores(
study.trials_dataframe().value.to_numpy(),
study.trials_dataframe().params_zref.to_numpy(),
param_name="Reference level (m)",
plot_title="Reference level Cross-validation",
logx=logx,
logy=logy,
)
elif grid_search is True:
parameter_pairs = list(
zip(
study.trials_dataframe().params_zref,
study.trials_dataframe().params_density_contrast,
strict=False,
)
)
plotting.plot_2_parameter_scores(
study.trials_dataframe().value.to_numpy(),
parameter_pairs,
param_names=(
"Reference level (m)",
"Density contrast (kg/m$^3$)",
),
)
else:
plotting.plot_2_parameter_scores_uneven(
study,
param_names=(
"params_zref",
"params_density_contrast",
),
plot_param_names=(
"Reference level (m)",
"Density contrast (kg/m$^3$)",
),
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return inv_copy
[docs]
def optimize_inversion_zref_density_contrast_kfolds(
self,
constraints_df: pd.DataFrame,
split_kwargs: dict[str, typing.Any] | None = None,
**kwargs: typing.Any,
) -> "Inversion":
"""
Perform an optimization for zref and density contrast values same as
function `optimize_inversion_zref_density_contrast`, but pass a dataframe of
constraint points and `split_kwargs` which are both passed `split_test_train` create
K-folds of testing and training constraints. For each set of zref/density values,
regional separation and inversion are performed for each of the K-folds in the
constraints dataframe. The score for each parameter set will be the mean of the
K-folds scores.
This then repeats for all parameters. Within each parameter set and fold, the
training constraints are used for the regional separation and the testing
constraints are used for scoring. This optimization performs a total number of
inversions equal to K-folds * number of parameter sets. For 20 parameter sets and 5
K-folds, this is 100 inversions. This extra computational expense is only useful if
the regional separation technique you supply via `regional_grav_kwargs` uses
constraints points for the estimations, such as constraint point minimization
(method='constraints_cv' or method='constraints'). It is more
efficient, but less accurate, to simple use a different regional estimation
technique, which doesn't require constraint points, to find the optimal zref and
density values. Then use these again in another inversion with the desired regional
separation technique. Using the regional method of "constraints" will simply use the
training points and supplied `grid_method` parameter values to calculate a regional
field. Using the regional method of "constraints_cv" will take the training points
and split these into a secondary set of training and testing points. These will be
used internally in the regional separation to find the optimal `grid_method`
parameters.
Parameters
----------
constraints_df
constraints dataframe with columns "easting", "northing", and "upward".
split_kwargs : dict[str, typing.Any] | None, optional
kwargs to be passed to `split_test_train` for splitting constraints_df into
test and train sets, by default None
**kwargs : typing.Any
kwargs to be passed to `optimize_inversion_zref_density_contrast`
Returns
-------
Inversion
Inversion object with best inversion results and the optimally determined
zref and or density contrast values as attributes of the `model` attribute.
"""
# make copies of Inversion and underlying data and model dataset so as not
# to alter the original
inv_copy = copy.deepcopy(self)
# drop any existing fold columns
df = constraints_df.copy()
df = df[df.columns.drop(list(df.filter(regex="fold_")))]
kwargs = copy.deepcopy(kwargs)
# split into test and training sets
testing_training_df = cross_validation.split_test_train(
df,
coord_names=inv_copy.data.coord_names,
**split_kwargs, # type: ignore[arg-type]
)
# get list of training and testing dataframes
test_dfs, train_dfs = cross_validation.kfold_df_to_lists(testing_training_df)
logger.info("Constraints split into %s folds", len(test_dfs))
regional_grav_kwargs = kwargs.pop("regional_grav_kwargs", None)
starting_topography_kwargs = kwargs.pop("starting_topography_kwargs", None)
if regional_grav_kwargs is None:
msg = "must provide regional_grav_kwargs"
raise ValueError(msg)
if starting_topography_kwargs is None:
msg = "must provide starting_topography_kwargs"
raise ValueError(msg)
regional_grav_kwargs["constraints_df"] = train_dfs
starting_topography_kwargs["constraints_df"] = train_dfs
if "weights" in starting_topography_kwargs:
starting_topography_kwargs["weights_col"] = starting_topography_kwargs[
"weights"
].name
inv_copy = inv_copy.optimize_inversion_zref_density_contrast(
constraints_df=test_dfs,
regional_grav_kwargs=regional_grav_kwargs,
starting_topography_kwargs=starting_topography_kwargs,
**kwargs,
)
# update the inversion object with the best inversion results
self.__dict__.update(inv_copy.__dict__)
return inv_copy
###
###
# PLOTTING METHODS
###
###
[docs]
def plot_dynamic_convergence(self) -> None:
"""
plot a dynamic graph of L2-norm and delta L2-norm vs iteration number.
"""
sns.set_theme()
clear_output(wait=True)
# create figure instance
_fig, ax1 = plt.subplots(figsize=(5, 3.5))
# make second y axis for delta l2 norm
ax2 = ax1.twinx()
# plot L2-norm convergence
ax1.plot(
list(range(self.iteration + 1)), # type: ignore[operator]
self.stats_df.l2_norm.to_numpy(), # type: ignore[attr-defined]
"b-",
)
# plot delta L2-norm convergence
if self.iteration > 1: # type: ignore[operator]
ax2.plot(
list(range(self.iteration + 1)), # type: ignore[operator]
self.stats_df.delta_l2_norm.to_numpy(), # type: ignore[attr-defined]
"g-",
)
# set axis labels, ticks and gridlines
ax1.set_xlabel("Iteration")
ax1.set_ylabel("L2-norm", color="b")
ax1.tick_params(axis="y", colors="b", which="both")
ax2.set_ylabel("Δ L2-norm", color="g")
ax2.tick_params(axis="y", colors="g", which="both")
ax2.grid(False)
# add buffer to y axis limits
ax1.set_ylim(0.9 * self.l2_norm_tolerance, self.stats_df.l2_norm.iloc[0]) # type: ignore[attr-defined]
if self.iteration > 1: # type: ignore[operator]
ax2.set_ylim(
self.delta_l2_norm_tolerance,
np.nanmax(self.stats_df.delta_l2_norm.to_numpy()[1:]), # type: ignore[attr-defined]
)
# set x axis to integer values
ax1.xaxis.set_major_locator(mpl.ticker.MaxNLocator(integer=True))
# plot current L2-norm and Δ L2-norm
ax1.plot(
self.iteration,
self.l2_norm,
"^",
markersize=6,
color=sns.color_palette()[3],
# label="current L2-norm",
)
if self.iteration > 1: # type: ignore[operator]
ax2.plot(
self.iteration,
self.delta_l2_norm,
"^",
markersize=6,
color=sns.color_palette()[3],
# label="current Δ L2-norm",
)
# make both y axes align at tolerance levels
plotting.align_yaxis(
ax1, self.l2_norm_tolerance, ax2, self.delta_l2_norm_tolerance
)
# plot horizontal line of tolerances
ax2.axhline(
y=self.delta_l2_norm_tolerance,
linewidth=1,
color="r",
linestyle="dashed",
label="tolerances",
)
# ask matplotlib for the plotted objects and their labels
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc="upper right")
plt.title("Inversion convergence")
plt.tight_layout()
plt.show()
[docs]
def plot_convergence(self) -> None:
"""
plot a graph of L2-norm and delta L2-norm vs iteration number.
"""
sns.set_theme()
# create figure instance
_fig, ax1 = plt.subplots(figsize=(5, 3.5))
# make second y axis for delta l2 norm
ax2 = ax1.twinx()
# plot L2-norm convergence
ax1.plot(range(self.iteration + 1), self.stats_df.l2_norm.to_numpy(), "b-") # type: ignore[attr-defined, operator]
# plot delta L2-norm convergence
ax2.plot(
range(self.iteration + 1), # type: ignore[operator]
self.stats_df.delta_l2_norm.to_numpy(), # type: ignore[attr-defined]
"g-",
)
# set axis labels, ticks and gridlines
ax1.set_xlabel("Iteration")
ax1.set_ylabel("L2-norm", color="b")
ax1.tick_params(axis="y", colors="b", which="both")
ax2.set_ylabel("Δ L2-norm", color="g")
ax2.tick_params(axis="y", colors="g", which="both")
ax2.grid(False)
# add buffer to y axis limits
ax1.set_ylim(0.9 * self.l2_norm_tolerance, self.stats_df.l2_norm.iloc[0]) # type: ignore[attr-defined]
ax2.set_ylim(
self.delta_l2_norm_tolerance,
np.nanmax(self.stats_df.delta_l2_norm.to_numpy()[1:]), # type: ignore[attr-defined]
)
# set x axis to integer values
ax1.xaxis.set_major_locator(mpl.ticker.MaxNLocator(integer=True))
# make both y axes align at tolerance levels
plotting.align_yaxis(
ax1, self.l2_norm_tolerance, ax2, self.delta_l2_norm_tolerance
)
# plot horizontal line of tolerances
ax2.axhline(
y=self.delta_l2_norm_tolerance,
linewidth=1,
color="r",
linestyle="dashed",
label="tolerances",
)
# ask matplotlib for the plotted objects and their labels
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc="upper right")
plt.title("Inversion convergence")
plt.tight_layout()
plt.show()
def plot_inversion_results(
self,
iters_to_plot: int | None = None,
plot_iter_results: bool = True,
plot_topo_results: bool = True,
plot_grav_results: bool = True,
constraints_df: pd.DataFrame | None = None,
coast: bool = False,
**kwargs: typing.Any,
) -> None:
"""
plot various results from the inversion
Parameters
----------
iters_to_plot : int | None, optional
number of iterations to plot, including the first and last, by default None
plot_iter_results : bool, optional
plot the iteration results, by default True
plot_topo_results : bool, optional
plot the topography results, by default True
plot_grav_results : bool, optional
plot the gravity results, by default True
constraints_df : pandas.DataFrame, optional
constraint points to include in the plots
coast : bool, optional
whether to plot coastlines, by default False
"""
# get lists of columns to grid
misfits = [
s
for s in self.data.inv.inner_df.columns.to_list()
if "initial_residual" in s
]
if self.style == "geometry":
topos = [
s
for s in self.model.inv.inner_df.columns.to_list()
if "_layer" in s and "confining" not in s
]
elif self.style == "density":
densities = [
s for s in self.model.inv.inner_df.columns.to_list() if "_density" in s
]
corrections = [
s for s in self.model.inv.inner_df.columns.to_list() if "_correction" in s
]
corrections = corrections[1:]
# list of iterations, e.g. [1,2,3,4]
its = list(range(1, self.iteration + 1)) # type: ignore[operator]
# get on x amount of iterations to plot
if iters_to_plot is not None:
if iters_to_plot > max(its):
iterations = its
else:
iterations = list(np.linspace(1, max(its), iters_to_plot, dtype=int))
else:
iterations = its
# subset columns based on iterations to plot
misfits = [misfits[i] for i in [x - 1 for x in iterations]]
if self.style == "geometry":
topos = [topos[i] for i in [x - 1 for x in iterations]]
elif self.style == "density":
densities = [densities[i] for i in [x - 1 for x in iterations]]
corrections = [corrections[i] for i in [x - 1 for x in iterations]]
# grid all results
ds = self.data.inv.inner
misfit_grids = [ds[g] for g in misfits]
ds = self.model.inv.inner
if self.style == "geometry":
updated_grids = [ds[g] for g in topos]
elif self.style == "density":
updated_grids = [ds[g] for g in densities]
else:
msg = "invalid string for style, should be 'geometry' or 'density'"
raise ValueError(msg)
correction_grids = [ds[g] for g in corrections]
grids = (misfit_grids, updated_grids, correction_grids)
if plot_iter_results is True:
plotting.plot_inversion_iteration_results(
grids,
self.data.inv.inner_df,
self.model.inv.masked_df,
self.params, # type: ignore[arg-type]
iterations,
style=self.style,
topo_cmap_perc=kwargs.get("topo_cmap_perc", 1),
misfit_cmap_perc=kwargs.get("misfit_cmap_perc", 1),
corrections_cmap_perc=kwargs.get("corrections_cmap_perc", 1),
constraints_df=constraints_df,
constraint_size=kwargs.get("constraint_size", 1),
)
if self.style == "geometry" and plot_topo_results is True:
plotting.plot_inversion_topo_results(
self.model.inv.inner,
constraints_df=constraints_df,
constraint_style=kwargs.get("constraint_style", "x.3c"),
fig_height=kwargs.get("fig_height", 12),
coast=coast,
)
if plot_grav_results is True:
plotting.plot_inversion_grav_results(
self.data.inv.inner_df,
self.data.inner_region,
constraints_df=constraints_df,
fig_height=kwargs.get("fig_height", 12),
constraint_style=kwargs.get("constraint_style", "x.3c"),
coast=coast,
)
[docs]
def run_inversion_workflow(
grav_ds: xr.Dataset,
create_starting_topography: bool = False,
calculate_starting_gravity: bool = False,
calculate_regional_misfit: bool = False,
run_damping_cv: bool = False,
run_zref_or_density_optimization: bool = False,
run_zref_or_density_cv: bool | None = None,
run_kfolds_zref_or_density_optimization: bool = False,
run_kfolds_zref_or_density_cv: bool | None = None,
fname: str | None = None,
starting_topography: xr.Dataset | None = None,
starting_topography_kwargs: dict[str, typing.Any] | None = None,
density_contrast: float | None = None,
zref: float | None = None,
model_type: str = "prisms",
upper_confining_layer: xr.Dataset | None = None,
lower_confining_layer: xr.Dataset | None = None,
buffer_width: float = 0,
regional_grav_kwargs: dict[str, typing.Any] | None = None,
constraints_df: pd.DataFrame | None = None,
inversion_kwargs: dict[str, typing.Any] | None = None,
damping_cv_kwargs: dict[str, typing.Any] | None = None,
zref_density_optimization_kwargs: dict[str, typing.Any] | None = None,
zref_density_cv_kwargs: dict[str, typing.Any] | None = None,
progressbar: bool = True,
) -> "Inversion":
"""
This function runs the full inversion workflow. Depending on the input parameters,
it will:
1) create a starting topography model
2) create a starting prism model
3) calculate the starting gravity of the prism model
4) calculate the gravity misfit
5) calculate the regional and residual components of the misfit
6) run the inversion to update the prism model
7) run a cross-validation for determining optimal values for damping, density, and
zref
Parameters
----------
grav_ds : xarray.Dataset
gravity data with variables 'upward' and 'gravity_anomaly'.
create_starting_topography : bool, optional
Choose whether to create starting topography model. If True, must provide
`starting_topography_kwargs`, if False must provide `starting_topography` by
default False
calculate_starting_gravity : bool, optional
Choose whether to calculate starting gravity from prisms model. If False, must
provide column "forward_gravity" in grav_df , by default False
calculate_regional_misfit : bool, optional
Choose whether to calculate regional misfit. If False, must provide column "reg"
in grav_df, if True, must provide`regional_grav_kwargs`, by default False
run_damping_cv : bool, optional
Choose whether to run cross validation for damping, if True, must provide
dictionary `damping_cv_kwargs` with parameters `n_trials` and `damping_limits`,
by default False
run_zref_or_density_optimization : bool, optional
Choose whether to run cross validation for zref or density, if True, must
provide dictionary `zref_density_optimization_kwargs` with parameters `n_trials`, and
either `zref_values` or `density_values`, by default False
run_kfolds_zref_or_density_optimization : bool, optional
Choose whether to run internal kfolds cross validation for zref or density, if
True, must provide `split_kwargs` as argument to `zref_density_optimization_kwargs`, by
default False
fname : str | None, optional
filename and path to use for saving results. If running a damping
CV, will save the study to <fname>_damping_cv_study.pickle and the tuple of the
best inversion results to <fname>_damping_cv.pickle. If running a
density/zref optimization, will save the study to
<fname>_zref_density_optimization_study.pickle and the tuple of the best
inversion results to <fname>_zref_density_optimization.pickle. The final
inversion result for all methods will be saved to <fname>.pickle, by default
will be "tmp_<x>"
where x is a random integer between 0 and 999.
starting_topography : xarray.Dataset | None, optional
a starting topography model with variable `upward`, by default None
starting_topography_kwargs : dict[str, typing.Any] | None, optional
kwargs needed for create a starting topography grid, passed to
`create_topography()`, by default None
density_contrast : float | None, optional
density contrast for the starting prisms, by default None
zref : float | None, optional
reference depth for the starting prisms, by default None
model_type : str, optional
type of model to create, either "prisms" or "tesseroids", by default "prisms"
upper_confining_layer : xarray.Dataset | None, optional
an upper confining layer model with variable `upward`, by default None
lower_confining_layer : xarray.Dataset | None, optional
a lower confining layer model with variable `upward`, by default None
buffer_width : float, optional
The width in meters of a buffer zone used to zoom-in on the provided data
creating an inner region. This inner region will be used for plotting and
calculating statistics, this avoids skewing plots and values by edge effects,
by default is None.
regional_grav_kwargs : dict[str, typing.Any] | None, optional
kwargs needed for estimating regional gravity, passed to
:meth:`DatasetAccessorInvert4Geom.regional_separation`, by default None
constraints_df : pandas.DataFrame | None, optional
Dataframe of constraint points, by default None
inversion_kwargs : dict[str, typing.Any] | None, optional
kwargs to be passed to `Inversion()`, by default None
damping_cv_kwargs : dict[str, typing.Any] | None, optional
kwargs to be passed to `optimize_inversion_damping()`, by default None
zref_density_optimization_kwargs : dict[str, typing.Any] | None, optional
kwargs to be passed to `optimize_inversion_zref_density_contrast()` or
`optimize_inversion_zref_density_contrast_kfolds()`, by default None
progressbar : bool, optional
whether to show progress bar during inversion, by default True
Returns
-------
Inversion
a inversion object with all results and optimal values as attributes.
"""
if run_zref_or_density_cv is not None:
msg = "`run_zref_or_density_cv` parameter has been renamed to `run_zref_or_density_optimization`"
raise DeprecationWarning(msg)
if run_kfolds_zref_or_density_cv is not None:
msg = "`run_kfolds_zref_or_density_cv` parameter has been renamed to `run_kfolds_zref_or_density_optimization`"
raise DeprecationWarning(msg)
if zref_density_cv_kwargs is not None:
msg = "`zref_density_cv_kwargs` parameter has been renamed to `zref_density_optimization_kwargs`"
raise DeprecationWarning(msg)
if isinstance(grav_ds, pd.DataFrame):
msg = "`run_inversion_workflow` function has been updated, gravity data must be provided to parameter `grav_ds` created through the `create_data` function"
raise DeprecationWarning(msg)
# set file name for saving results with random number between 0 and 999
if fname is None:
fname = f"tmp_{random.randint(0, 999)}"
logger.info("saving all results with root name '%s'", fname)
###
###
# figure out what needs to be done
###
###
if starting_topography is None:
create_starting_topography = True
# if creating starting topo, must also create starting prisms
if create_starting_topography is True:
calculate_starting_gravity = True
# if calculating starting gravity, must also calculate gravity misfit
if calculate_starting_gravity is True:
calculate_regional_misfit = True
logger.debug("creating starting topo: %s", create_starting_topography)
logger.debug("calculating starting gravity: %s", calculate_starting_gravity)
logger.debug("calculating regional misfit: %s", calculate_regional_misfit)
grav_ds = grav_ds.copy()
###
###
# check needed inputs are provided at the beginning
###
###
if (calculate_regional_misfit is True) or ( # noqa: SIM102
run_zref_or_density_optimization is True
):
if regional_grav_kwargs is None:
msg = (
"regional_grav_kwargs must be provided if recalculating regional "
"gravity or performing zref or density optimization"
)
raise ValueError(msg)
if (
run_kfolds_zref_or_density_optimization is True
and run_zref_or_density_optimization is False
):
msg = "run_zref_or_density_optimization must be True if run_kfolds_zref_or_density_optimization is True"
raise ValueError(msg)
if run_zref_or_density_optimization is True:
if constraints_df is None:
msg = "must provide constraints_df if run_zref_or_density_optimization is True"
raise ValueError(msg)
if zref_density_optimization_kwargs is None:
msg = "must provide zref_density_optimization_kwargs with parameters `n_trials`, and 1 or both of `zref_limits` and `density_contrast_limits` if run_zref_or_density_optimization is True"
raise ValueError(msg)
if run_kfolds_zref_or_density_optimization is True:
if zref_density_optimization_kwargs.get("split_kwargs") is None:
msg = "split_kwargs must be provided if performing internal kfolds CV"
raise ValueError(msg)
elif "constraints_df" in regional_grav_kwargs: # type: ignore[operator]
msg = (
"if performing density/zref optimization, it's best to not use constraints "
"in the regional separation"
)
logger.warning(msg)
if zref_density_optimization_kwargs.get("density_contrast_limits") is None:
assert density_contrast is not None
if zref_density_optimization_kwargs.get("zref_limits") is None:
assert zref is not None
if (zref_density_optimization_kwargs.get("density_contrast_limits") is None) & (
zref_density_optimization_kwargs.get("zref_limits") is None
):
msg = (
"must provide density_contrast_limits or zref_limits if run_zref_or_"
"density_optimization is True"
)
raise ValueError(msg)
if run_damping_cv is True:
if (
("test" in grav_ds)
and (False in grav_ds.inv.df.test.unique())
and (True in grav_ds.inv.df.test.unique())
):
pass
else:
# resample data at 1/2 spacing to include test points for cross-validation
grav_ds = cross_validation.add_test_points(grav_ds)
if damping_cv_kwargs is None:
msg = "must provide damping_cv_kwargs with parameters `damping_limits` and `n_trials` if run_damping_cv is True"
raise ValueError(msg)
# Starting Topography
if create_starting_topography is False:
if (starting_topography is None) & (run_zref_or_density_optimization is False):
msg = (
"starting_topography must be provided since create_starting_topography "
"is False."
)
raise ValueError(msg)
logger.debug("not creating starting topo because it is provided")
elif create_starting_topography is True:
if starting_topography is not None:
msg = (
"starting_topography provided but unused since "
"create_starting_topography is True"
)
logger.warning(msg)
if starting_topography_kwargs is None:
msg = (
"starting_topography_kwargs must be provided if "
"create_starting_topography is True"
)
raise ValueError(msg)
starting_topography_kwargs["upper_confining_layer"] = upper_confining_layer
starting_topography_kwargs["lower_confining_layer"] = lower_confining_layer
with utils._log_level(logging.WARN): # pylint: disable=protected-access
# create the starting topography
starting_topography = utils.create_topography(
**starting_topography_kwargs,
)
logger.debug("starting topo created")
# starting prism model
model = create_model(
zref=zref, # type: ignore[arg-type]
density_contrast=density_contrast,
topography=starting_topography,
buffer_width=buffer_width,
model_type=model_type,
upper_confining_layer=upper_confining_layer,
lower_confining_layer=lower_confining_layer,
)
logger.debug("starting prisms created")
# starting gravity of prism model
if calculate_starting_gravity is False:
if "forward_gravity" not in grav_ds:
msg = (
"'forward_gravity' must be a variable of `grav_ds` if "
"calculate_starting_gravity is False"
)
raise ValueError(msg)
logger.debug("not calculating starting forward gravity because it is provided")
elif calculate_starting_gravity is True:
if "forward_gravity" in grav_ds:
msg = (
"'forward_gravity' already a variable of `grav_ds`, but is being "
"overwritten since calculate_starting_gravity is True"
)
logger.warning(msg)
logger.debug("calculating starting forward gravity")
grav_ds.inv.forward_gravity(
model,
progressbar=False,
)
logger.debug("starting forward gravity calculated")
# Regional Component of Misfit
if calculate_regional_misfit is False:
if ("misfit" not in grav_ds) & (run_zref_or_density_optimization is False):
msg = (
"'misfit' must be a column of `grav_df` if calculate_regional_misfit is"
" False"
)
raise ValueError(msg)
if ("reg" not in grav_ds) & (run_zref_or_density_optimization is False):
msg = (
"'reg' must be a column of `grav_df` if calculate_regional_misfit is"
" False"
)
raise ValueError(msg)
logger.debug("not calculating regional misfit because it is provided")
elif calculate_regional_misfit is True:
if "reg" in grav_ds:
msg = (
"'reg' already a column of `grav_df`, but is being overwritten since"
" calculate_regional_misfit is True"
)
logger.warning(msg)
logger.debug("calculating regional misfit")
logger.debug("regional_grav_kwargs: %s", regional_grav_kwargs)
grav_ds.inv.regional_separation(**regional_grav_kwargs)
logger.debug("regional misfit calculated")
# initialize inversion
inv = Inversion(
grav_ds,
model,
**inversion_kwargs, # type: ignore[arg-type]
)
inv.results_fname = fname # type: ignore[assignment]
###
###
# SINGLE INVERSION
###
###
# run only the inversion with specified damping, density, and zref values
if (run_damping_cv is False) & (run_zref_or_density_optimization is False):
logger.info("running individual inversion")
with utils._log_level(logging.WARN): # pylint: disable=protected-access
inv.invert(results_fname=fname, progressbar=progressbar)
logger.info("inversion complete, results saved to '%s'", fname)
return inv
###
###
# DAMPING CV
###
###
if run_damping_cv is True:
logger.info("running damping cross validation")
_damping_cv_obj = inv.optimize_inversion_damping(
fname=f"{fname}_damping_cv",
**damping_cv_kwargs, # type: ignore[arg-type]
)
assert inv.solver_damping is not None
logger.info(
"damping cross validation complete with optimal damping: %f",
inv.solver_damping,
)
###
###
# DENSITY / ZREF OPTIMIZATION
###
###
if run_zref_or_density_optimization is True:
logger.info("running zref and/or density contrast cross validation")
# drop the testing data
inv.data = cross_validation.remove_test_points(inv.data)
# if chosen, run an internal K-folds CV for regional separation within the
# density/Zref optimization
if run_kfolds_zref_or_density_optimization is True:
logger.info("running internal K-folds CV for regional separation")
_zref_density_optimization_obj = (
inv.optimize_inversion_zref_density_contrast_kfolds( # type: ignore[arg-type]
fname=f"{fname}_zref_density_optimization",
constraints_df=constraints_df,
fold_progressbar=True,
**zref_density_optimization_kwargs, # typing: ignore[arg-type]
)
)
# run the normal non-kfolds optimization
else:
_zref_density_optimization_obj = (
inv.optimize_inversion_zref_density_contrast(
fname=f"{fname}_zref_density_optimization",
constraints_df=constraints_df,
**zref_density_optimization_kwargs, # type: ignore[arg-type]
)
)
logger.info(
"zref and/or density contrast cross validation complete with density contrast %s kg/m3 and zref %s m",
inv.model.density_contrast,
inv.model.zref,
)
# save inversion results to pickle
pathlib.Path(f"{fname}.pickle").unlink(missing_ok=True)
with pathlib.Path(f"{fname}.pickle").open("wb") as f:
pickle.dump(inv, f)
logger.info("results saved to %s", f"{fname}.pickle")
# ensure final inversion used the best parameters
if run_damping_cv is True:
assert inv.solver_damping == _damping_cv_obj.study.best_trial.params["damping"] # type: ignore[attr-defined]
assert inv.params["Solver damping"] == inv.solver_damping # type: ignore[index]
if run_zref_or_density_optimization is True:
assert (
inv.model.density_contrast
== _zref_density_optimization_obj.study.best_trial.params.get( # type: ignore[attr-defined]
"density_contrast", density_contrast
)
)
assert (
inv.model.zref
== _zref_density_optimization_obj.study.best_trial.params.get( # type: ignore[attr-defined]
"zref", zref
)
)
used_zref = float(inv.params["Reference level"][:-2]) # type: ignore[index]
used_density_contrast = float(inv.params["Density contrast(s)"][1:-7]) # type: ignore[index]
assert math.isclose(
used_density_contrast, inv.model.density_contrast, rel_tol=0.02
)
assert math.isclose(used_zref, inv.model.zref, rel_tol=0.02)
return inv