import copy # pylint: disable=too-many-lines
import logging
import math
import multiprocessing
import os
import pathlib
import pickle
import random
import re
import subprocess
import typing
import warnings
import harmonica as hm
import joblib
import numpy as np
import optuna
import pandas as pd
import psutil
import verde as vd
import xarray as xr
from numpy.typing import NDArray
from tqdm.autonotebook import tqdm
from invert4geom import (
cross_validation,
inversion,
logger,
plotting,
utils,
)
if typing.TYPE_CHECKING:
from invert4geom.inversion import Inversion
warnings.simplefilter(
"ignore",
category=optuna.exceptions.ExperimentalWarning,
)
def available_cpu_count() -> typing.Any:
"""
Number of available virtual or physical CPUs on this system, i.e.
user/real as output by time(1) when called with an optimally scaling
userspace-only program
Adapted from https://stackoverflow.com/a/1006301/18686384
"""
# cpuset
# cpuset may restrict the number of *available* processors
try:
# m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", open("/proc/self/status").read())
with pathlib.Path("/proc/self/status").open(encoding="utf8") as f:
m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", f.read())
if m:
res = bin(int(m.group(1).replace(",", ""), 16)).count("1")
if res > 0:
return res
except OSError:
pass
# Python 2.6+
try:
return multiprocessing.cpu_count()
except (ImportError, NotImplementedError):
pass
# https://github.com/giampaolo/psutil
try:
return psutil.cpu_count() # psutil.NUM_CPUS on old versions
except (ImportError, AttributeError):
pass
# POSIX
try:
res = int(os.sysconf("SC_NPROCESSORS_ONLN"))
if res > 0:
return res
except (AttributeError, ValueError):
pass
# Windows
try:
res = int(os.environ["NUMBER_OF_PROCESSORS"])
if res > 0:
return res
except (KeyError, ValueError):
pass
# BSD
try:
with subprocess.Popen(
["sysctl", "-n", "hw.ncpu"], stdout=subprocess.PIPE
) as sysctl:
# sysctl = subprocess.Popen(["sysctl", "-n", "hw.ncpu"],
# stdout=subprocess.PIPE)
sc_std_out = sysctl.communicate()[0]
res = int(sc_std_out)
if res > 0:
return res
except (OSError, ValueError):
pass
# Linux
try:
# res = open("/proc/cpuinfo").read().count("processor\t:")
with pathlib.Path("/proc/cpuinfo").open(encoding="utf8") as f:
res = f.read().count("processor\t:")
if res > 0:
return res
except OSError:
pass
# Solaris
try:
pseudo_devices = os.listdir("/devices/pseudo/") # noqa: PTH208
res = 0
for pds in pseudo_devices:
if re.match(r"^cpuid@[0-9]+$", pds):
res += 1
if res > 0:
return res
except OSError:
pass
# Other UNIXes (heuristic)
try:
try:
# dmesg = open("/var/run/dmesg.boot").read()
with pathlib.Path("/var/run/dmesg.boot").open(encoding="utf8") as f:
dmesg = f.read()
# dmesg = pathlib.Path("/var/run/dmesg.boot").open().read()
except OSError:
with subprocess.Popen(["dmesg"], stdout=subprocess.PIPE) as dmesg_process:
# dmesg_process = subprocess.Popen(["dmesg"], stdout=subprocess.PIPE)
dmesg = dmesg_process.communicate()[0] # type: ignore[assignment]
res = 0
while "\ncpu" + str(res) + ":" in dmesg:
res += 1
if res > 0:
return res
except OSError:
pass
msg = "Can not determine number of CPUs on this system"
raise Exception(msg) # pylint: disable=broad-exception-raised
def run_optuna(
study: optuna.study.Study,
objective: typing.Callable[..., float],
n_trials: int,
storage: optuna.storages.BaseStorage | None = None,
maximize_cpus: bool = True,
parallel: bool = False,
progressbar: bool | None = None,
callbacks: typing.Any | None = None,
) -> optuna.study.Study:
"""
Run optuna optimization, optionally in parallel. Pre-define the study, and objective
function, and if parallel is True, the storage (preferably with JournalStorage) and
study name.
"""
optuna.logging.set_verbosity(optuna.logging.WARN)
# set up parallel processing and run optimization
if parallel is True:
if progressbar is None:
progressbar = False
if storage is None:
msg = "if running in parallel, must provide an Optuna storage object"
raise ValueError(msg)
study_name = study.study_name
def optimize_study(
study_name: str,
storage: optuna.storages.BaseStorage,
objective: typing.Callable[..., float],
n_trials: int,
) -> None:
study = optuna.load_study(study_name=study_name, storage=storage)
optuna.logging.set_verbosity(optuna.logging.WARN)
study.optimize(
objective,
n_trials=n_trials,
)
_optuna_set_cores(
n_trials=n_trials,
optimize_study=optimize_study,
study_name=study_name,
storage=storage,
objective=objective,
max_cores=maximize_cpus,
)
# reload the study
return optuna.load_study(
study_name=study_name,
storage=storage,
)
# run in normal, non-parallel mode
if progressbar is None:
progressbar = True
study.optimize(
objective,
n_trials=n_trials,
callbacks=callbacks,
show_progress_bar=progressbar,
)
return study
def _optuna_set_cores(
n_trials: int,
optimize_study: typing.Callable[..., None],
study_name: str,
storage: typing.Any,
objective: typing.Callable[..., float],
max_cores: bool = True,
) -> None:
"""
Set up optuna optimization in parallel splitting up the number of trials over either
all available cores or giving each available core 1 trial.
"""
if max_cores:
# get available cores (UNIX and Windows)
# num_cores = len(psutil.Process().cpu_affinity())
num_cores = available_cpu_count()
# set trials per job
trials_per_job = math.ceil(n_trials / num_cores)
# set number of jobs
n_jobs = num_cores if n_trials >= num_cores else n_trials
else:
trials_per_job = 1
n_jobs = int(n_trials / trials_per_job)
logger.info(
"Running %s trials with %s jobs with up to %s trials per job",
n_trials,
n_jobs,
trials_per_job,
)
try:
joblib.Parallel(n_jobs=n_jobs)(
joblib.delayed(optimize_study)(
study_name,
storage,
objective,
n_trials=trials_per_job,
)
for i in range(n_trials)
)
except FileNotFoundError:
logger.exception("FileNotFoundError occurred in parallel optimization")
pathlib.Path(f"{study_name}.log.lock").unlink(missing_ok=True)
pathlib.Path(f"{study_name}.lock").unlink(missing_ok=True)
def warn_parameter_at_limits(
trial: optuna.trial.FrozenTrial,
) -> None:
"""
Warn if any best parameter values are at their limits
Parameters
----------
trial : optuna.trial.FrozenTrial
optuna trial, most likely should be the best trial.
"""
for k, v in trial.params.items():
dist = trial.distributions.get(k)
lims = (dist.high, dist.low)
if v in lims:
logger.warning(
"Best %s value (%s) is at the limit of provided values "
"%s and thus is likely not a global minimum, expand the range of "
"values tested to ensure the best parameter value is found.",
k,
v,
lims,
)
def log_optuna_results(
trial: optuna.trial.FrozenTrial,
) -> None:
"""
Log the results of an optuna trial
Parameters
----------
trial : optuna.trial.FrozenTrial
optuna trial
"""
logger.info("Trial with best score: ")
logger.info("\ttrial number: %s", trial.number)
logger.info("\tparameter: %s", trial.params)
logger.info("\tscores: %s", trial.values)
def _create_regional_separation_study(
optimize_on_true_regional_misfit: bool,
separate_metrics: bool,
sampler: optuna.samplers.BaseSampler,
true_regional: xr.DataArray | None = None,
parallel: bool = True,
fname: str | None = None,
) -> tuple[optuna.study.Study, optuna.storages.BaseStorage | None]:
"""
Creates a study, sets directions and metric names based on the input parameters.
Parameters
----------
optimize_on_true_regional_misfit : bool
choose to optimize on the true regional misfit instead of the residual misfit at
constraints and the residual misfit amplitude.
separate_metrics : bool
choose to optimize on the residual misfit at constraints and the residual misfit
amplitude as separate metrics, as opposed to them as a ratio.
sampler : optuna.samplers.BaseSampler
sampler object
true_regional : xarray.DataArray | None, optional
grid of true regional values, by default None
parallel : bool, optional
inform whether the study should be run in run in parallel, by default True. If
True, uses file storage, which slows down the optimization, but allows for
running in parallel.
fname : str | None, optional
file name to save the study to, by default None
Returns
-------
study : optuna.study.Study
return a study object with direction, sampler, and metric names set
storage : optuna.storages.BaseStorage | None
return an optuna storage object if parallel is True, otherwise None
"""
direction = None
directions = None
if fname is None:
fname = f"tmp_{random.randint(0, 999)}"
if parallel:
pathlib.Path(f"{fname}.log").unlink(missing_ok=True)
pathlib.Path(f"{fname}.lock").unlink(missing_ok=True)
pathlib.Path(f"{fname}.log.lock").unlink(missing_ok=True)
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(f"{fname}.log"),
)
else:
storage = None
if optimize_on_true_regional_misfit is True:
if true_regional is None:
msg = (
"if optimizing on true regional misfit, must provide true_regional grid"
)
raise ValueError(msg)
direction = "minimize"
metric_names = ["difference with true regional"]
logger.info("optimizing on minimizing the true regional misfit")
elif separate_metrics is True:
directions = ["minimize", "maximize"]
metric_names = ["residual at constraints", "amplitude of residual"]
else:
direction = "minimize"
metric_names = ["combined scores"]
study = optuna.create_study(
direction=direction,
directions=directions,
sampler=sampler,
study_name=fname,
storage=storage,
load_if_exists=False,
pruner=DuplicateIterationPruner,
)
study.set_metric_names(metric_names)
return study, storage
def _logging_callback(
study: optuna.study.Study,
frozen_trial: optuna.trial.FrozenTrial,
) -> None:
"""
custom optuna callback, only log trial info if it's the best value yet.
Parameters
----------
study : optuna.study.Study
optuna study
frozen_trial : optuna.trial.FrozenTrial
current trial
"""
optuna.logging.set_verbosity(optuna.logging.WARN)
if study._is_multi_objective() is False: # pylint: disable=protected-access
previous_best_value = study.user_attrs.get("previous_best_value", None)
if previous_best_value != study.best_value:
study.set_user_attr("previous_best_value", study.best_value)
logger.info(
"Trial %s finished with best value: %s and parameters: %s.",
frozen_trial.number,
frozen_trial.value,
frozen_trial.params,
)
elif frozen_trial.number in [n.number for n in study.best_trials]:
msg = (
"Trial %s is on the Pareto front with value 1: %s, value 2: %s and "
"parameters: %s."
)
logger.info(
msg,
frozen_trial.number,
frozen_trial.values[0], # noqa: PD011
frozen_trial.values[1], # noqa: PD011
frozen_trial.params,
)
def _warn_limits_better_than_trial_1_param(
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
) -> None:
"""
custom optuna callback, warn if limits provide better score than current trial
Parameters
----------
study : optuna.study.Study
optuna study
trial : optuna.trial.FrozenTrial
current trial
"""
# exit if one of first 2 trials (lower and upper limits)
if trial.number < 2:
return
# get scores of lower and upper limits
# this assumes that the first two trials are the lower and upper limits set by
# study.enqueue_trial()
lower_limit_score = study.trials[0].value
upper_limit_score = study.trials[1].value
msg = (
"Current trial (#%s, %s) has a worse score (%s) than either of the lower "
"(%s) or upper (%s) parameter value limits, it might be best to stop the "
"study and expand the limits."
)
if lower_limit_score is None:
return
if upper_limit_score is None:
return
# if study direction is minimize
if study.direction == optuna.study.StudyDirection.MINIMIZE:
# if current trial is worse than either limit, log a warning
if trial.values[0] > max(lower_limit_score, upper_limit_score): # noqa: PD011
logger.info(
msg,
trial.number,
trial.params,
trial.values[0], # noqa: PD011
lower_limit_score,
upper_limit_score,
)
else:
pass
# if study direction is maximize
if study.direction == optuna.study.StudyDirection.MAXIMIZE:
# if current trial is worse than either limit, log a warning
if trial.values[0] < min(lower_limit_score, upper_limit_score): # noqa: PD011
logger.info(
msg,
trial.number,
trial.params,
trial.values[0], # noqa: PD011
lower_limit_score,
upper_limit_score,
)
else:
pass
def _warn_limits_better_than_trial_multi_params(
study: optuna.study.Study,
trial: optuna.trial.FrozenTrial,
) -> None:
"""
custom optuna callback, warn if limits provide better score than current trial for
multiple parameter optimization
Parameters
----------
study : optuna.study.Study
optuna study
trial : optuna.trial.FrozenTrial
current trial
"""
# number of parameters in the study
num_params = len(trial.params)
# get number of combos (2 params->4 trials, 3 params->8 trials etc.)
num_combos = 2**num_params
# exit if one of enqueued trials
if trial.number < num_combos:
return
# get scores of combos of upper and lower limits of both parameters
# this assumes that the first four trials are set by study.enqueue_trial()
scores = []
for i in range(num_combos):
scores.append(study.trials[i].value)
msg = (
"Current trial (#%s, %s) has a worse score (%s) than any of the combinations "
"of parameter value limits, it might be best to stop the study and expand the "
"limits."
)
# if study direction is minimize
if study.direction == optuna.study.StudyDirection.MINIMIZE:
# if current trial is worse than either limit, log a warning
if trial.values[0] > max(scores): # noqa: PD011
logger.info(
msg,
trial.number,
trial.params,
trial.values[0], # noqa: PD011
)
else:
pass
# if study direction is maximize
if study.direction == optuna.study.StudyDirection.MAXIMIZE:
# if current trial is worse than either limit, log a warning
try:
if trial.values[0] < min(scores): # noqa: PD011
logger.info(
msg,
trial.number,
trial.params,
trial.values[0], # noqa: PD011
)
except TypeError:
pass
else:
pass
class OptimalInversionDamping:
"""
Objective function to use in an Optuna optimization for finding the optimal damping
regularization value for a gravity inversion. Used within function
:func:`optimize_inversion_damping`.
"""
def __init__(
self,
inversion_obj: "Inversion",
fname: str,
damping_limits: tuple[float, float],
rmse_as_median: bool = False,
plot_grids: bool = False,
) -> None:
self.inversion_obj = inversion_obj
self.fname = fname
self.rmse_as_median = rmse_as_median
self.damping_limits = damping_limits
self.plot_grids = plot_grids
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the score of the eq_sources fit
"""
self.inversion_obj.solver_damping = trial.suggest_float(
"damping",
self.damping_limits[0],
self.damping_limits[1],
log=True,
)
trial.set_user_attr("fname", f"{self.fname}_trial_{trial.number}")
_new_inversion_obj = self.inversion_obj.gravity_score(
rmse_as_median=self.rmse_as_median,
plot=self.plot_grids,
results_fname=trial.user_attrs.get("fname"),
)
return self.inversion_obj.gravity_best_score # type: ignore[return-value]
def optimize_inversion_damping(
training_df: pd.DataFrame, # noqa: ARG001
testing_df: pd.DataFrame, # noqa: ARG001
n_trials: int, # noqa: ARG001
damping_limits: tuple[float, float], # noqa: ARG001
n_startup_trials: int | None = None, # noqa: ARG001
score_as_median: bool = False, # noqa: ARG001
sampler: optuna.samplers.BaseSampler | None = None, # noqa: ARG001
grid_search: bool = False, # noqa: ARG001
fname: str | None = None, # noqa: ARG001
plot_scores: bool = True, # noqa: ARG001
plot_grids: bool = False, # noqa: ARG001
logx: bool = True, # noqa: ARG001
logy: bool = True, # noqa: ARG001
progressbar: bool = True, # noqa: ARG001
parallel: bool = False, # noqa: ARG001
seed: int = 0, # noqa: ARG001
**kwargs: typing.Any, # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `optimize_inversion_damping` instead
"""
# pylint: disable=W0613
msg = (
"Function `optimize_inversion_damping` deprecated, use the `Inversion` class method "
"`optimize_inversion_damping` instead"
)
raise DeprecationWarning(msg)
class OptimalInversionZrefDensity:
"""
Objective function to use in an Optuna optimization for finding the optimal values
for zref and or density contrast values for a gravity inversion. This class is used
within the function `optimize_inversion_zref_density_contrast`. If using constraint
point minimization for the regional separation, split constraints into testing and
training sets and provide the testing set to argument `constraints_df` and the
training set to the `constraints_df` argument of `regional_grav_kwargs`. To perform
K-folds cross-validation, provide lists of constraints dataframes to the parameters
where each dataframe in each list corresponds to fold.
"""
def __init__(
self,
inversion_obj: "Inversion",
constraints_df: pd.DataFrame,
fname: str,
regional_grav_kwargs: dict[str, typing.Any],
starting_topography: xr.Dataset | None = None,
starting_topography_kwargs: dict[str, typing.Any] | None = None,
zref_limits: tuple[float, float] | None = None,
density_contrast_limits: tuple[float, float] | None = None,
rmse_as_median: bool = False,
progressbar: bool = True,
) -> None:
self.inversion_obj = inversion_obj
self.constraints_df = constraints_df.copy()
self.fname = fname
self.regional_grav_kwargs = copy.deepcopy(regional_grav_kwargs)
self.zref_limits = zref_limits
self.density_contrast_limits = density_contrast_limits
self.starting_topography = starting_topography
self.starting_topography_kwargs = copy.deepcopy(starting_topography_kwargs)
self.rmse_as_median = rmse_as_median
self.progressbar = progressbar
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the score of the eq_sources fit
"""
if self.inversion_obj.apply_weighting_grid is True:
msg = (
"Using the weighting grid within the inversion for regularization. "
"This makes the inversion not update the topography at constraint "
"points. Since constraint points are used to determine the scores "
"within this cross validation, the scores will not be useful, giving "
"biased results. Set `apply_weighting_grid` to False to continue."
)
raise ValueError(msg)
if (self.zref_limits is None) & (self.density_contrast_limits is None):
msg = "must provide either or both zref_limits and density_contrast_limits"
raise ValueError(msg)
if self.zref_limits is not None:
self.inversion_obj.model = self.inversion_obj.model.assign_attrs(
{
"zref": trial.suggest_float(
"zref",
self.zref_limits[0],
self.zref_limits[1],
)
}
)
if self.density_contrast_limits is not None:
self.inversion_obj.model = self.inversion_obj.model.assign_attrs(
{
"density_contrast": trial.suggest_int(
"density_contrast",
self.density_contrast_limits[0],
self.density_contrast_limits[1],
)
}
)
reg_kwargs = copy.deepcopy(self.regional_grav_kwargs)
constraints_warning = (
"Using constraint point minimization technique for regional field "
"estimation. This is not recommended as the constraint points are used "
"for the density / reference level cross-validation scoring, which "
"biases the scoring. Consider using a different method for regional "
"field estimation, or separate constraints in training and testing "
"sets and provide the training set to `regional_grav_kwargs` and the "
"testing set to `constraints_df` to use for scoring."
)
logger.debug("prism model created and forward gravity calculated")
###
###
# Single optimization
###
###
if isinstance(self.constraints_df, pd.DataFrame):
logger.debug("running single optimization")
# raise warning about using constraint point minimization for regional
# estimation
if (reg_kwargs.get("method") in ["constraints", "constraints_cv"]) and (
len(reg_kwargs.get("constraints_df")) == len(self.constraints_df) # type: ignore[arg-type]
):
assert isinstance(reg_kwargs.get("constraints_df"), pd.DataFrame)
logger.warning(constraints_warning)
# create starting topography model if not provided
if self.starting_topography is None:
msg = (
"starting_topography not provided, creating a starting topography "
"model with the supplied starting_topography_kwargs"
)
logger.info(msg)
if self.starting_topography_kwargs is None:
msg = (
"must provide `starting_topography_kwargs` to be passed to the "
"function `utils.create_topography`."
)
raise ValueError(msg)
if self.starting_topography_kwargs["method"] == "flat":
msg = "using zref to create a flat starting topography model"
logger.info(msg)
self.starting_topography_kwargs["upward"] = (
self.inversion_obj.model.zref
)
# create starting topography model if not provided
if self.starting_topography is None:
msg = (
"starting_topography not provided, creating a starting topography "
"model with the supplied starting_topography_kwargs"
)
logger.info(msg)
if self.starting_topography_kwargs is None:
msg = (
"must provide `starting_topography_kwargs` to be passed to the "
"function `utils.create_topography`."
)
raise ValueError(msg)
if (
self.starting_topography_kwargs["method"] == "flat"
and self.starting_topography_kwargs.get("upward", None) is None
):
msg = "using zref to create a flat starting topography model"
logger.info(msg)
self.starting_topography_kwargs["upward"] = (
self.inversion_obj.model.zref
)
starting_topo = utils.create_topography(
**self.starting_topography_kwargs,
)
else:
if self.starting_topography_kwargs is not None:
msg = (
"starting_topography and starting_topography_kwargs provided, "
"please only provide one or the other."
)
raise ValueError(msg)
starting_topo = self.starting_topography.copy()
# update model with new topography
self.inversion_obj.model = inversion.create_model(
zref=self.inversion_obj.model.zref,
density_contrast=self.inversion_obj.model.density_contrast,
topography=starting_topo,
buffer_width=self.inversion_obj.model.buffer_width,
model_type=self.inversion_obj.model.model_type,
upper_confining_layer=self.inversion_obj.model.upper_confining_layer,
lower_confining_layer=self.inversion_obj.model.lower_confining_layer,
)
# calculate forward gravity of starting prism layer
self.inversion_obj.data.inv.forward_gravity(self.inversion_obj.model)
# calculate regional field
with utils._log_level(logging.WARN): # pylint: disable=protected-access
self.inversion_obj.data.inv.regional_separation(**reg_kwargs)
trial.set_user_attr("fname", f"{self.fname}_trial_{trial.number}")
# run cross validation
constraints_optimization_object = self.inversion_obj.constraints_score(
results_fname=trial.user_attrs.get("fname"),
constraints_df=self.constraints_df,
)
# log the termination reason
logger.debug(
"Trial %s termination reason: %s",
trial.number,
constraints_optimization_object.params.get("Termination reason"), # type: ignore[attr-defined]
)
###
###
# K-Folds optimization
###
###
else:
logger.debug("running k-folds optimization")
training_constraints = reg_kwargs.pop("constraints_df", None)
if self.starting_topography_kwargs is None:
msg = (
"must provide `starting_topography_kwargs` to be passed to the "
"function `utils.create_topography`."
)
raise ValueError(msg)
self.starting_topography_kwargs.pop("constraints_df", None)
testing_constraints = self.constraints_df
if training_constraints is None:
msg = (
"must provide training constraints dataframes for regional "
"separation"
)
raise ValueError(msg)
if isinstance(training_constraints, pd.DataFrame):
msg = (
"must provide a list of training constraints dataframes for "
"cross-validation to parameter `constraints_df` of "
"`regional_grav_kwargs`."
)
raise ValueError(msg)
assert len(training_constraints) == len(testing_constraints)
# get list of folds
folds = testing_constraints
# progressbar for folds
if self.progressbar is True:
pbar = tqdm(
folds,
desc="Regional Estimation CV folds",
)
elif self.progressbar is False:
pbar = folds
else:
msg = "progressbar must be a boolean" # type: ignore[unreachable]
raise ValueError(msg)
logger.debug("Running %s folds", len(folds))
# logger.debug(testing_constraints)
# logger.debug(training_constraints)
# for each fold, run CV
scores: list[float] = []
for i, _ in enumerate(pbar):
logger.debug(training_constraints[i])
# create starting topography model if not provided
if self.starting_topography is None:
msg = (
"starting_topography not provided, creating a starting "
"topography model with the supplied starting_topography_kwargs"
)
logger.info(msg)
if self.starting_topography_kwargs["method"] == "flat":
msg = "using zref to create a flat starting topography model"
logger.info(msg)
self.starting_topography_kwargs["upward"] = (
self.inversion_obj.model.zref
)
elif self.starting_topography_kwargs["method"] == "splines":
self.starting_topography_kwargs["constraints_df"] = (
training_constraints[i]
)
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
starting_topo = utils.create_topography(
**self.starting_topography_kwargs,
)
else:
starting_topo = self.starting_topography.copy()
# update model with new topography
self.inversion_obj.model = inversion.create_model(
zref=self.inversion_obj.model.zref,
density_contrast=self.inversion_obj.model.density_contrast,
topography=starting_topo,
buffer_width=self.inversion_obj.model.buffer_width,
model_type=self.inversion_obj.model.model_type,
upper_confining_layer=self.inversion_obj.model.upper_confining_layer,
lower_confining_layer=self.inversion_obj.model.lower_confining_layer,
)
# calculate forward gravity of starting prism layer
self.inversion_obj.data.inv.forward_gravity(self.inversion_obj.model)
# calculate regional field
with utils._log_level(logging.WARN): # pylint: disable=protected-access
self.inversion_obj.data.inv.regional_separation(
constraints_df=training_constraints[i],
**reg_kwargs,
)
trial.set_user_attr("fname", f"{self.fname}_trial_{trial.number}")
# run cross validation
constraints_optimization_object = self.inversion_obj.constraints_score(
constraints_df=testing_constraints[i],
results_fname=trial.user_attrs.get("fname"),
)
scores.append(self.inversion_obj.constraints_best_score) # type: ignore[arg-type]
# log the termination reason
logger.debug(
"Trial %s termination reason: %s",
trial.number,
constraints_optimization_object.params.get("Termination reason"), # type: ignore[attr-defined]
)
# get mean of scores of all folds
self.inversion_obj.constraints_best_score = np.mean(scores)
return self.inversion_obj.constraints_best_score # type: ignore[return-value]
def optimize_inversion_zref_density_contrast(
grav_df: pd.DataFrame, # noqa: ARG001
constraints_df: pd.DataFrame | list[pd.DataFrame], # noqa: ARG001
n_trials: int, # noqa: ARG001
n_startup_trials: int | None = None, # noqa: ARG001
starting_topography: xr.DataArray | None = None, # noqa: ARG001
zref_limits: tuple[float, float] | None = None, # noqa: ARG001
density_contrast_limits: tuple[float, float] | None = None, # noqa: ARG001
zref: float | None = None, # noqa: ARG001
density_contrast: float | None = None, # noqa: ARG001
starting_topography_kwargs: dict[str, typing.Any] | None = None, # noqa: ARG001
regional_grav_kwargs: dict[str, typing.Any] | None = None, # noqa: ARG001
score_as_median: bool = False, # noqa: ARG001
sampler: optuna.samplers.BaseSampler | None = None, # noqa: ARG001
grid_search: bool = False, # noqa: ARG001
fname: str | None = None, # noqa: ARG001
plot_scores: bool = True, # noqa: ARG001
logx: bool = False, # noqa: ARG001
logy: bool = False, # noqa: ARG001
progressbar: bool = True, # noqa: ARG001
parallel: bool = False, # noqa: ARG001
fold_progressbar: bool = True, # noqa: ARG001
seed: int = 0, # noqa: ARG001
**kwargs: typing.Any, # noqa: ARG001
) -> None:
"""
DEPRECATED: use the `Inversion` class method `optimize_inversion_zref_density_contrast` instead
"""
# pylint: disable=W0613
msg = (
"Function `optimize_inversion_zref_density_contrast` deprecated, use the `Inversion` class method "
"`optimize_inversion_zref_density_contrast` instead"
)
raise DeprecationWarning(msg)
class OptimalEqSourceParams:
"""
Objective function to use in an Optuna optimization for finding the optimal
equivalent source parameters for fitting to gravity data.
"""
def __init__(
self,
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
damping_limits: tuple[float, float] | None = None,
**kwargs: typing.Any,
) -> None:
self.depth_limits = depth_limits
self.block_size_limits = block_size_limits
self.damping_limits = damping_limits
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the score of the eq_sources fit
"""
kwargs = copy.deepcopy(self.kwargs)
# get parameters provided not as limits
depth = kwargs.pop("depth", "default")
# calculate 4.5 times the mean distance between points
if depth == "default":
depth = 4.5 * np.mean(
vd.median_distance(
(kwargs.get("coordinates")[0], kwargs.get("coordinates")[1]), # type: ignore[unused-ignore, index]
k_nearest=1,
)
)
block_size = kwargs.pop("block_size", None)
damping = kwargs.pop("damping", None)
# replace with suggested values if limits provided
if self.depth_limits is not None:
depth = trial.suggest_float(
"depth",
self.depth_limits[0],
self.depth_limits[1],
)
if self.block_size_limits is not None:
block_size = trial.suggest_float(
"block_size",
self.block_size_limits[0],
self.block_size_limits[1],
)
if self.damping_limits is not None:
damping = trial.suggest_float(
"damping",
self.damping_limits[0],
self.damping_limits[1],
log=True,
)
try:
score = cross_validation.eq_sources_score(
damping=damping,
depth=depth,
block_size=block_size,
**kwargs,
)
except ValueError as e:
logger.error(e)
msg = "score could not be calculated, returning NaN"
logger.warning(msg)
score = np.nan
return score
[docs]
def optimize_eq_source_params(
coordinates: tuple[pd.Series | NDArray, pd.Series | NDArray, pd.Series | NDArray],
data: pd.Series | NDArray,
n_trials: int = 100,
damping_limits: tuple[float, float] | None = None,
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
sampler: optuna.samplers.BaseSampler | None = None,
plot: bool = False,
progressbar: bool = True,
parallel: bool = False,
fname: str | None = None,
seed: int = 0,
**kwargs: typing.Any,
) -> tuple[optuna.study.Study, hm.EquivalentSources]:
"""
Use Optuna to find the optimal parameters for fitting equivalent sources to gravity
data. The 3 parameters are damping, depth, and block size. Any or all of these can
be optimized at the same time. Provide upper and lower limits for each parameter,
or if you don't want to optimize a parameter, provide a constant value of the
parameter in the kwargs.
Parameters
----------
coordinates : tuple[pandas.Series | numpy.ndarray, pandas.Series | numpy.ndarray, \
pandas.Series | numpy.ndarray]
tuple of coordinates in the order (easting, northing, upward) for the gravity
observation locations.
data : pandas.Series | numpy.ndarray
gravity data values
n_trials : int, optional
number of trials to run, by default 100
damping_limits : tuple[float, float], optional
damping parameter limits, by default (0, 10**3)
depth_limits : tuple[float, float], optional
source depth limits (positive downwards) in meters, by default (0, 10e6)
block_size_limits : tuple[float, float] | None, optional
block size limits in meters, by default None
sampler : optuna.samplers.BaseSampler | None, optional
specify which Optuna sampler to use, by default GPsampler
plot : bool, optional
plot the resulting optimization figures, by default False
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fname : str | None, optional
file name to save the study to, by default None
seed : int, optional
random seed for the samplers, by default 0
kwargs : typing.Any
additional keyword arguments to pass to `OptimalEqSourceParams`, which are
passed to `eq_sources_score`. These can include parameters to pass to
`harmonica.EquivalentSources`; "damping", "depth", "block_size",
"parallel", and "dtype", or parameters to pass to `vd.cross_val_score`;
"delayed", or "weights".
Returns
-------
study : optuna.study.Study
the completed optuna study
eqs : harmonica.EquivalentSources
the fitted equivalent sources model
"""
optuna.logging.set_verbosity(optuna.logging.WARN)
kwargs = copy.deepcopy(kwargs)
study_fname = f"tmp_{random.randint(0, 999)}" if fname is None else fname
if parallel:
pathlib.Path(f"{study_fname}.log").unlink(missing_ok=True)
pathlib.Path(f"{study_fname}.lock").unlink(missing_ok=True)
pathlib.Path(f"{study_fname}.log.lock").unlink(missing_ok=True)
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(f"{study_fname}.log"),
)
else:
storage = None
# get number of parameters included in optimization
num_params = sum(
x is not None for x in [depth_limits, block_size_limits, damping_limits]
)
if num_params == 0:
msg = (
"No parameters to optimize, must provide at least one set of limits for "
"damping, depth, or block size."
)
raise ValueError(msg)
# define number of startup trials, whichever is bigger between 1/4 of trials, or
# 4 x the number of parameters
n_startup_trials = max(num_params * 4, int(n_trials / 4))
logger.info("using %s startup trials", n_startup_trials)
if n_startup_trials >= n_trials:
logger.warning(
"n_startup_trials is >= n_trials resulting in all trials sampled from "
"a QMC sampler instead of the GP sampler",
)
# QMC's Sobol' sequence is best with n_trials as a power of 2
# e.g. 2, 4, 8, 16, ...
# def next_power_of_2(x):
# return 1 if x == 0 else 2**(x - 1).bit_length()
# n_startup_trials = next_power_of_2(n_startup_trials)
# create study
study = optuna.create_study(
direction="maximize",
sampler=optuna.samplers.QMCSampler(
seed=seed,
qmc_type="halton",
scramble=True,
),
load_if_exists=False,
study_name=study_fname,
storage=storage,
pruner=DuplicateIterationPruner,
)
# explicitly add the limits as trials
to_enqueue = []
if block_size_limits is not None:
block_size_trials = [
{"block_size": block_size_limits[0]},
{"block_size": block_size_limits[1]},
]
to_enqueue.append(block_size_trials)
if damping_limits is not None:
damping_trials = [
{"damping": damping_limits[0]},
{"damping": damping_limits[1]},
]
to_enqueue.append(damping_trials)
if depth_limits is not None:
depth_trials = [{"depth": depth_limits[0]}, {"depth": depth_limits[1]}]
to_enqueue.append(depth_trials)
# get 2 lists of lists of dicts to enqueue (2 trials)
to_enqueue = np.array(to_enqueue).transpose()
for i in to_enqueue:
# turn list of dicts into single dict
x = {k: v for d in i for k, v in d.items()}
study.enqueue_trial(x, skip_if_exists=True)
# define the objective function
objective = OptimalEqSourceParams(
depth_limits=depth_limits,
block_size_limits=block_size_limits,
damping_limits=damping_limits,
coordinates=coordinates,
data=data,
**kwargs,
)
logger.debug("starting eq_source parameter optimization")
# pylint: enable=duplicate-code
# ignore skLearn LinAlg warnings
with (utils._environ(PYTHONWARNINGS="ignore")) and (utils.DuplicateFilter(logger)): # type: ignore[no-untyped-call, truthy-bool] # pylint: disable=protected-access
# run startup trials with QMC low-discrepancy sampling
study = run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_startup_trials,
# callbacks=callbacks,
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
# continue with remaining trials with user-defined sampler
# if sampler not provided, used GPsampler as default
if sampler is None:
sampler = optuna.samplers.GPSampler(
n_startup_trials=0,
seed=seed,
deterministic_objective=True,
)
study.sampler = sampler
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = run_optuna(
study=study,
storage=storage,
objective=objective,
n_trials=n_trials - n_startup_trials,
# callbacks=callbacks,
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
# pylint: disable=duplicate-code
best_trial = study.best_trial
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
best_damping = best_trial.params.get("damping", None)
best_depth = best_trial.params.get("depth", None)
best_block_size = best_trial.params.get("block_size", None)
if best_damping is None:
try:
best_damping = kwargs["damping"]
except KeyError:
msg = (
"No damping parameter value found in best params or kwargs, setting to "
"'None'"
)
logger.warning(msg)
best_damping = None
if best_depth is None:
try:
best_depth = kwargs["depth"]
except KeyError:
msg = (
"No depth parameter value found in best params or kwargs, setting to "
"'default' (4.5 times mean distance between points)"
)
logger.warning(msg)
best_depth = "default"
if best_depth == "default":
best_depth = 4.5 * np.mean(
vd.median_distance((coordinates[0], coordinates[1]), k_nearest=1)
)
if best_block_size is None:
try:
best_block_size = kwargs["block_size"]
except KeyError:
msg = (
"No block size parameter value found in best params or kwargs, setting "
"to 'None'"
)
logger.warning(msg)
best_block_size = None
# refit EqSources with best parameters
eqs = hm.EquivalentSources(
damping=best_damping,
depth=best_depth,
block_size=best_block_size,
parallel=kwargs.pop("parallel", True),
dtype=kwargs.pop("dtype", "float64"),
)
eqs.fit(coordinates, data, weights=kwargs.pop("weights", None))
# save study
if study_fname is not None:
# remove if exists
pathlib.Path(f"{study_fname}.pickle").unlink(missing_ok=True)
# save study to pickle
with pathlib.Path(f"{study_fname}.pickle").open("wb") as f:
pickle.dump(study, f)
if plot is True:
try:
plotting.plot_optuna_figures(
study,
target_names=["score"],
plot_history=False,
plot_slice=True,
plot_importance=True,
include_duration=False,
)
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, eqs
class OptimizeRegionalTrend:
"""
Objective function to use in an Optuna optimization for finding the optimal trend
order for estimation the regional component of gravity misfit.
"""
def __init__(
self,
trend_limits: tuple[int, int],
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
**kwargs: typing.Any,
) -> None:
self.trend_limits = trend_limits
self.optimize_on_true_regional_misfit = optimize_on_true_regional_misfit
self.separate_metrics = separate_metrics
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> tuple[float, float] | float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the scores
"""
trend = trial.suggest_int(
"trend",
self.trend_limits[0],
self.trend_limits[1],
)
with utils._log_level(logging.WARN): # pylint: disable=protected-access
residual_constraint_score, residual_amplitude_score, true_reg_score, _ = (
cross_validation.regional_separation_score(
method="trend",
trend=trend,
**self.kwargs,
)
)
trial.set_user_attr("true_reg_score", true_reg_score)
if self.optimize_on_true_regional_misfit is True:
trial.set_user_attr("residual constraint score", residual_constraint_score)
trial.set_user_attr("residual amplitude score", residual_amplitude_score)
return true_reg_score # type: ignore[return-value]
if self.separate_metrics is True:
return residual_constraint_score, residual_amplitude_score
# combine the two metrics into one
return residual_constraint_score / residual_amplitude_score
class OptimizeRegionalFilter:
"""
Objective function to use in an Optuna optimization for finding the optimal filter
width for estimation the regional component of gravity misfit.
"""
def __init__(
self,
filter_width_limits: tuple[float, float],
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
**kwargs: typing.Any,
) -> None:
self.filter_width_limits = filter_width_limits
self.optimize_on_true_regional_misfit = optimize_on_true_regional_misfit
self.separate_metrics = separate_metrics
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the scores
"""
filter_width = trial.suggest_float(
"filter_width",
self.filter_width_limits[0],
self.filter_width_limits[1],
)
with utils._log_level(logging.WARN): # pylint: disable=protected-access
residual_constraint_score, residual_amplitude_score, true_reg_score, _ = (
cross_validation.regional_separation_score(
method="filter",
filter_width=filter_width,
**self.kwargs,
)
)
trial.set_user_attr("true_reg_score", true_reg_score)
if self.optimize_on_true_regional_misfit is True:
trial.set_user_attr("residual constraint score", residual_constraint_score)
trial.set_user_attr("residual amplitude score", residual_amplitude_score)
return true_reg_score # type: ignore[return-value]
if self.separate_metrics is True:
return residual_constraint_score, residual_amplitude_score # type: ignore[return-value]
# combine the two metrics into one
return residual_constraint_score / residual_amplitude_score
class OptimizeRegionalEqSources:
"""
Objective function to use in an Optuna optimization for finding the optimal
equivalent source parameters for estimation the regional component of gravity
misfit.
"""
def __init__(
self,
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
damping_limits: tuple[float, float] | None = None,
grav_obs_height_limits: tuple[float, float] | None = None,
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
**kwargs: typing.Any,
) -> None:
self.depth_limits = depth_limits
self.block_size_limits = block_size_limits
self.damping_limits = damping_limits
self.grav_obs_height_limits = grav_obs_height_limits
self.optimize_on_true_regional_misfit = optimize_on_true_regional_misfit
self.separate_metrics = separate_metrics
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the scores
"""
if self.depth_limits is not None:
depth = trial.suggest_float(
"depth",
self.depth_limits[0],
self.depth_limits[1],
)
else:
depth = self.kwargs.get("depth", None)
if depth is None:
msg = "must provide depth if depth_limits not provided"
raise ValueError(msg)
if self.block_size_limits is not None:
block_size = trial.suggest_float(
"block_size",
self.block_size_limits[0],
self.block_size_limits[1],
)
else:
block_size = self.kwargs.get("block_size", None)
if self.damping_limits is not None:
damping = trial.suggest_float(
"damping",
self.damping_limits[0],
self.damping_limits[1],
log=True,
)
else:
damping = self.kwargs.get("damping", None)
if self.grav_obs_height_limits is not None:
grav_obs_height = trial.suggest_float(
"grav_obs_height",
self.grav_obs_height_limits[0],
self.grav_obs_height_limits[1],
)
else:
grav_obs_height = self.kwargs.get("grav_obs_height", None)
new_kwargs = {
key: value
for key, value in self.kwargs.items()
if key not in ["depth", "block_size", "damping", "grav_obs_height"]
}
with utils._log_level(logging.WARN): # pylint: disable=protected-access
residual_constraint_score, residual_amplitude_score, true_reg_score, _ = (
cross_validation.regional_separation_score(
method="eq_sources",
depth=depth,
block_size=block_size,
damping=damping,
grav_obs_height=grav_obs_height,
**new_kwargs,
)
)
trial.set_user_attr("true_reg_score", true_reg_score)
if self.optimize_on_true_regional_misfit is True:
trial.set_user_attr("residual constraint score", residual_constraint_score)
trial.set_user_attr("residual amplitude score", residual_amplitude_score)
return true_reg_score # type: ignore[return-value]
if self.separate_metrics is True:
return residual_constraint_score, residual_amplitude_score # type: ignore[return-value]
# combine the two metrics into one
return residual_constraint_score / residual_amplitude_score
class OptimizeRegionalConstraintsPointMinimization:
"""
Objective function to use in an Optuna optimization for finding the optimal
hyperparameter values the Constraint Point Minimization technique for estimation the
regional component of gravity misfit. If single dataframes are supplied to
`training_df` and `testing_df`, for each parameter value a regional field will be
estimated using the `training_df`, and a score calculated used the `testing_df`. If
lists of dataframes are supplied, a score will be calculated for each item in the
list and the mean of the scores will be the metric returned. This class is used with
the function `optimize_regional_constraint_point_minimization`.
"""
def __init__(
self,
training_df: pd.DataFrame | list[pd.DataFrame],
testing_df: pd.DataFrame | list[pd.DataFrame],
grid_method: str,
# for tensioned minimum curvature gridding
tension_factor_limits: tuple[float, float] = (0, 1),
# for bi-harmonic spline gridding
spline_damping_limits: tuple[float, float] | None = None,
# for eq source gridding
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
damping_limits: tuple[float, float] | None = None,
grav_obs_height_limits: tuple[float, float] | None = None,
# other args
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
progressbar: bool = False,
**kwargs: typing.Any,
) -> None:
self.training_df = training_df
self.testing_df = testing_df
self.grid_method = grid_method
self.tension_factor_limits = tension_factor_limits
self.spline_damping_limits = spline_damping_limits
self.depth_limits = depth_limits
self.block_size_limits = block_size_limits
self.damping_limits = damping_limits
self.grav_obs_height_limits = grav_obs_height_limits
self.optimize_on_true_regional_misfit = optimize_on_true_regional_misfit
self.separate_metrics = separate_metrics
self.progressbar = progressbar
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the scores
"""
new_kwargs = copy.deepcopy(self.kwargs)
if self.grid_method == "pygmt":
new_kwargs["tension_factor"] = trial.suggest_float(
"tension_factor",
self.tension_factor_limits[0],
self.tension_factor_limits[1],
)
elif self.grid_method == "verde":
if self.spline_damping_limits is None:
msg = "if grid_method is 'verde' must provide spline_damping_limits"
raise ValueError(msg)
new_kwargs["spline_dampings"] = trial.suggest_float(
"spline_dampings",
self.spline_damping_limits[0],
self.spline_damping_limits[1],
log=True,
)
elif self.grid_method == "eq_sources":
if self.block_size_limits is not None:
new_kwargs["block_size"] = trial.suggest_float(
"block_size",
self.block_size_limits[0],
self.block_size_limits[1],
)
else:
new_kwargs["block_size"] = self.kwargs.get("block_size", None)
if self.damping_limits is not None:
new_kwargs["damping"] = trial.suggest_float(
"damping",
self.damping_limits[0],
self.damping_limits[1],
log=True,
)
else:
new_kwargs["damping"] = self.kwargs.get("damping", None)
if self.grav_obs_height_limits is not None:
new_kwargs["grav_obs_height"] = trial.suggest_float(
"grav_obs_height",
self.grav_obs_height_limits[0],
self.grav_obs_height_limits[1],
)
else:
new_kwargs["grav_obs_height"] = self.kwargs.get("grav_obs_height", None)
else:
msg = "invalid gridding method"
raise ValueError(msg)
if isinstance(self.training_df, pd.DataFrame):
if self.grid_method == "eq_sources":
if self.depth_limits is not None:
new_kwargs["depth"] = trial.suggest_float(
"depth",
self.depth_limits[0],
self.depth_limits[1],
)
else:
eq_depth = self.kwargs.get("depth", "default")
if eq_depth == "default":
# calculate 4.5 times the mean distance between points
eq_depth = 4.5 * np.mean(
vd.median_distance(
(self.training_df.easting, self.training_df.northing),
k_nearest=1,
)
)
new_kwargs["depth"] = eq_depth
with utils._log_level(logging.WARN): # pylint: disable=protected-access
(
residual_constraint_score,
residual_amplitude_score,
true_reg_score,
_df,
) = cross_validation.regional_separation_score(
constraints_df=self.training_df,
testing_df=self.testing_df,
method="constraints",
grid_method=self.grid_method,
**new_kwargs,
)
else:
# get list of folds
folds = [
list(df.columns[df.columns.str.startswith("fold_")])[0] # noqa: RUF015
for df in self.testing_df
]
# progressbar for folds
if self.progressbar is True:
pbar = tqdm(
folds,
desc="Cross-validation folds",
)
elif self.progressbar is False:
pbar = folds
else:
msg = "progressbar must be a boolean" # type: ignore[unreachable]
raise ValueError(msg)
with utils._log_level(logging.WARN): # pylint: disable=protected-access
# for each fold, run CV
results = []
for i, _ in enumerate(pbar):
if self.grid_method == "eq_sources":
if self.depth_limits is not None:
new_kwargs["depth"] = trial.suggest_float(
"depth",
self.depth_limits[0],
self.depth_limits[1],
)
else:
eq_depth = self.kwargs.get("depth", "default")
if eq_depth == "default":
# calculate 4.5 times the mean distance between points
eq_depth = 4.5 * np.mean(
vd.median_distance(
(
self.training_df[i].easting,
self.training_df[i].northing,
),
k_nearest=1,
)
)
new_kwargs["depth"] = eq_depth
fold_results = cross_validation.regional_separation_score(
constraints_df=self.training_df[i],
testing_df=self.testing_df[i],
method="constraints",
grid_method=self.grid_method,
**new_kwargs,
)
results.append(fold_results)
# get mean of scores of all folds
residual_constraint_score = np.mean([r[0] for r in results])
residual_amplitude_score = np.mean([r[1] for r in results])
try:
true_reg_score = np.mean([r[2] for r in results])
except TypeError:
true_reg_score = None
logger.debug("separate_metrics: %s", self.separate_metrics)
logger.debug(
"optimize_on_true_regional_misfit: %s",
self.optimize_on_true_regional_misfit,
)
trial.set_user_attr("true_reg_score", true_reg_score)
if self.optimize_on_true_regional_misfit is True:
trial.set_user_attr("residual constraint score", residual_constraint_score)
trial.set_user_attr("residual amplitude score", residual_amplitude_score)
return true_reg_score # type: ignore[return-value]
if self.separate_metrics is True:
return residual_constraint_score, residual_amplitude_score # type: ignore[return-value]
# combine the two metrics into one
return residual_constraint_score / residual_amplitude_score
[docs]
def optimize_regional_filter(
testing_df: pd.DataFrame,
grav_ds: xr.Dataset,
filter_width_limits: tuple[float, float],
score_as_median: bool = False,
true_regional: xr.DataArray | None = None,
n_trials: int = 100,
sampler: optuna.samplers.BaseSampler | None = None,
plot: bool = False,
plot_grid: bool = False,
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
progressbar: bool = True,
parallel: bool = False,
fname: str | None = None,
seed: int = 0,
) -> tuple[optuna.study.Study, xr.Dataset, optuna.trial.FrozenTrial]:
"""
Run an Optuna optimization to find the optimal filter width for estimating the
regional component of gravity misfit. For synthetic testing, if the true regional
grid is provided, the optimization can be set to optimize on the RMSE of the
predicted and true regional gravity, by setting
`optimize_on_true_regional_misfit=True`. By default this will perform a
multi-objective optimization to find the best trade-off between the lowest RMSE of
the residual at the constraints and the highest RMSE of the residual at all
locations.
Parameters
----------
testing_df : pandas.DataFrame
constraint points to use for calculating the score with columns "easting",
"northing" and "upward".
grav_ds : xarray.Dataset
gravity dataset with coordinates "easting", "northing", and variables "reg" and
`gravity_anomaly`.
filter_width_limits : tuple[float, float]
limits to use for the filter width in meters.
score_as_median : bool, optional
use the root median square instead of the root mean square for the scoring
metric, by default False
true_regional : xarray.DataArray | None, optional
if the true regional gravity is known (in synthetic models), supply this as a
grid to include a user_attr of the RMSE between this and the estimated regional
for each trial, or set `optimize_on_true_regional_misfit=True` to have the
optimization optimize on the RMSE, by default None
n_trials : int, optional
number of trials to run, by default 100
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default TPE sampler
plot : bool, optional
plot the resulting optimization figures, by default False
plot_grid : bool, optional
plot the resulting regional gravity grid, by default False
optimize_on_true_regional_misfit : bool, optional
if true_regional grid is provide, choose to perform optimization on the RMSE
between the true regional and the estimated region, by default False
separate_metrics : bool, optional
if False, returns the scores combined with the formula
residual_constraints_score / residual_amplitude_score, by default is True and
returns both the residual and regional scores separately.
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fname : str | None, optional
file name to save the study to, by default None
seed : int, optional
random seed for the samplers, by default 0
Returns
-------
study : optuna.study.Study,
the completed Optuna study
resulting_grav_ds : xarray.Dataset
the resulting gravity dataset of the best trial
best_trial : optuna.trial.FrozenTrial
the best trial
"""
if isinstance(grav_ds, xr.Dataset) is False:
msg = "Function `optimize_regional_filter` has been changed, data must be provided to parameter `grav_ds` as an xarray dataset initialized through function `create_data`"
raise DeprecationWarning(msg)
optuna.logging.set_verbosity(optuna.logging.WARN)
# if sampler not provided, use TPE as default
if sampler is None:
sampler = optuna.samplers.TPESampler(
n_startup_trials=int(n_trials / 4),
seed=seed,
)
results_fname = f"tmp_{random.randint(0, 999)}" if fname is None else fname
# create study and set directions / metric names depending on optimization type
study, storage = _create_regional_separation_study(
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
sampler=sampler,
true_regional=true_regional,
fname=results_fname,
)
# run optimization
study = run_optuna(
study=study,
storage=storage,
objective=OptimizeRegionalFilter(
filter_width_limits=filter_width_limits,
testing_df=testing_df,
grav_ds=grav_ds,
true_regional=true_regional,
score_as_median=score_as_median,
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
),
n_trials=n_trials,
progressbar=progressbar,
parallel=parallel,
)
if study._is_multi_objective() is False: # pylint: disable=protected-access
best_trial = study.best_trial
else:
best_trial = min(study.best_trials, key=lambda t: t.values[0]) # noqa: PD011
# best_trial = max(study.best_trials, key=lambda t: t.values[1])
logger.info("Number of trials on the Pareto front: %s", len(study.best_trials))
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
# redo the regional separation with ALL constraint points
grav_ds.inv.regional_separation(
method="filter",
filter_width=best_trial.params["filter_width"],
)
if plot is True:
try:
if study._is_multi_objective() is False: # pylint: disable=protected-access
if optimize_on_true_regional_misfit is True:
plotting.plot_optimization_combined_slice(
study,
attribute_names=[
"residual constraint score",
"residual amplitude score",
],
).show()
else:
optuna.visualization.plot_slice(study).show()
else:
optuna.visualization.plot_pareto_front(study).show()
for i, j in enumerate(study.metric_names):
optuna.visualization.plot_slice(
study,
target=lambda t: t.values[i], # noqa: B023 PD011 # pylint: disable=cell-var-from-loop
target_name=j,
).show()
if plot_grid is True:
grav_ds.reg.plot()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, grav_ds, best_trial
[docs]
def optimize_regional_trend(
testing_df: pd.DataFrame,
grav_ds: xr.Dataset,
trend_limits: tuple[int, int],
score_as_median: bool = False,
true_regional: xr.DataArray | None = None,
sampler: optuna.samplers.BaseSampler | None = None,
plot: bool = False,
plot_grid: bool = False,
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
progressbar: bool = True,
parallel: bool = False,
fname: str | None = None,
seed: int = 0,
) -> tuple[optuna.study.Study, xr.Dataset, optuna.trial.FrozenTrial]:
"""
Run an Optuna optimization to find the optimal trend order for estimating the
regional component of gravity misfit. For synthetic testing, if the true regional
grid is provided, the optimization can be set to optimize on the RMSE of the
predicted and true regional gravity, by setting
`optimize_on_true_regional_misfit=True`. By default this will perform a
multi-objective optimization to find the best trade-off between the lowest RMSE of
the residual at the constraints and the highest RMSE of the residual at all
locations.
Parameters
----------
testing_df : pandas.DataFrame
constraint points to use for calculating the score with columns "easting",
"northing" and "upward".
grav_ds : xarray.Dataset
gravity dataset with coordinates "easting", "northing", and variables "reg" and
`gravity_anomaly`.
trend_limits : tuple[int, int]
limits to use for the trend order in degrees.
score_as_median : bool, optional
use the root median square instead of the root mean square for the scoring
metric, by default False
true_regional : xarray.DataArray | None, optional
if the true regional gravity is known (in synthetic models), supply this as a
grid to include a user_attr of the RMSE between this and the estimated regional
for each trial, or set `optimize_on_true_regional_misfit=True` to have the
optimization optimize on the RMSE, by default None
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default GridSampler
plot : bool, optional
plot the resulting optimization figures, by default False
plot_grid : bool, optional
plot the resulting regional gravity grid, by default False
optimize_on_true_regional_misfit : bool, optional
if true_regional grid is provide, choose to perform optimization on the RMSE
between the true regional and the estimated region, by default False
separate_metrics : bool, optional
if False, returns the scores combined with the formula
residual_constraints_score / residual_amplitude_score, by default is True and
returns both the residual and regional scores separately.
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fname : str | None, optional
file name to save the study to, by default None
seed : int, optional
random seed for the samplers, by default 0
Returns
-------
study : optuna.study.Study,
the completed Optuna study
resulting_grav_ds : xarray.Dataset
the resulting gravity dataset of the best trial
best_trial : optuna.trial.FrozenTrial
the best trial
"""
if isinstance(grav_ds, xr.Dataset) is False:
msg = "Function `optimize_regional_trend` has been changed, data must be provided to parameter `grav_ds` as an xarray dataset initialized through function `create_data`"
raise DeprecationWarning(msg)
optuna.logging.set_verbosity(optuna.logging.WARN)
optuna.logging.set_verbosity(optuna.logging.WARN)
# if sampler not provided, use GridSampler as default
if sampler is None:
sampler = optuna.samplers.GridSampler(
search_space={"trend": list(range(trend_limits[0], trend_limits[1] + 1))},
seed=seed,
)
results_fname = f"tmp_{random.randint(0, 999)}" if fname is None else fname
# create study and set directions / metric names depending on optimization type
study, storage = _create_regional_separation_study(
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
sampler=sampler,
true_regional=true_regional,
fname=results_fname,
)
# run optimization
study = run_optuna(
study=study,
storage=storage,
objective=OptimizeRegionalTrend( # type: ignore[arg-type]
trend_limits=trend_limits,
testing_df=testing_df,
grav_ds=grav_ds,
true_regional=true_regional,
score_as_median=score_as_median,
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
),
n_trials=len(list(range(trend_limits[0], trend_limits[1] + 1))),
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
if study._is_multi_objective() is False: # pylint: disable=protected-access
best_trial = study.best_trial
else:
best_trial = min(study.best_trials, key=lambda t: t.values[0]) # noqa: PD011
# best_trial = max(study.best_trials, key=lambda t: t.values[1])
logger.info("Number of trials on the Pareto front: %s", len(study.best_trials))
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
# redo the regional separation with ALL constraint points
grav_ds.inv.regional_separation(
method="trend",
trend=best_trial.params["trend"],
)
if plot is True:
try:
if study._is_multi_objective() is False: # pylint: disable=protected-access
if optimize_on_true_regional_misfit is True:
plotting.plot_optimization_combined_slice(
study,
attribute_names=[
"residual constraint score",
"residual amplitude score",
],
).show()
else:
optuna.visualization.plot_slice(study).show()
else:
optuna.visualization.plot_pareto_front(study).show()
for i, j in enumerate(study.metric_names):
optuna.visualization.plot_slice(
study,
target=lambda t: t.values[i], # noqa: B023 PD011 # pylint: disable=cell-var-from-loop
target_name=j,
).show()
if plot_grid is True:
grav_ds.reg.plot()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, grav_ds, best_trial
[docs]
def optimize_regional_eq_sources(
testing_df: pd.DataFrame,
grav_ds: xr.Dataset,
score_as_median: bool = False,
true_regional: xr.DataArray | None = None,
n_trials: int = 100,
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
damping_limits: tuple[float, float] | None = None,
grav_obs_height_limits: tuple[float, float] | None = None,
sampler: optuna.samplers.BaseSampler | None = None,
plot: bool = False,
plot_grid: bool = False,
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
progressbar: bool = True,
parallel: bool = False,
fname: str | None = None,
seed: int = 0,
**kwargs: typing.Any,
) -> tuple[optuna.study.Study, xr.Dataset, optuna.trial.FrozenTrial]:
"""
Run an Optuna optimization to find the optimal equivalent source parameters for
estimating the regional component of gravity misfit. For synthetic testing, if the
true regional grid is provided, the optimization can be set to optimize on the
RMSE of the predicted and true regional gravity, by setting
`optimize_on_true_regional_misfit=True`. By default this will perform a
multi-objective optimization to find the best trade-off between the lowest RMSE of
the residual at the constraints and the highest RMSE of the residual at all
locations.
Parameters
----------
testing_df : pandas.DataFrame
constraint points to use for calculating the score with columns "easting",
"northing" and "upward".
grav_ds : xarray.Dataset
gravity dataset with coordinates "easting", "northing", and variables "reg" and
`gravity_anomaly`.
score_as_median : bool, optional
use the root median square instead of the root mean square for the scoring
metric, by default False
true_regional : xarray.DataArray | None, optional
if the true regional gravity is known (in synthetic models), supply this as a
grid to include a user_attr of the RMSE between this and the estimated regional
for each trial, or set `optimize_on_true_regional_misfit=True` to have the
optimization optimize on the RMSE, by default None
n_trials : int, optional
number of trials to run, by default 100
depth_limits : tuple[float, float] | None, optional
limits to use for source depths, positive down in meters, by default None
block_size_limits : tuple[float, float] | None, optional
limits to use for block size in meters, by default None
damping_limits : tuple[float, float] | None, optional
limits to use for the damping parameter, by default None
grav_obs_height_limits : tuple[float, float] | None, optional
limits to use for the gravity observation height in meters, by default None
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default TPE sampler
plot : bool, optional
plot the resulting optimization figures, by default False
plot_grid : bool, optional
plot the resulting regional gravity grid, by default False
optimize_on_true_regional_misfit : bool, optional
if true_regional grid is provide, choose to perform optimization on the RMSE
between the true regional and the estimated region, by default False
separate_metrics : bool, optional
if False, returns the scores combined with the formula
residual_constraints_score / residual_amplitude_score, by default is True and
returns both the residual and regional scores separately.
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fname : str | None, optional
file name to save the study to, by default None
seed : int, optional
random seed for the samplers, by default 0
kwargs : typing.Any
additional keyword arguments to pass to the :meth:`DatasetAccessorInvert4Geom.regional_separation`
Returns
-------
study : optuna.study.Study
the completed Optuna study
resulting_grav_ds : xarray.Dataset
the resulting gravity dataset of the best trial
best_trial : optuna.trial.FrozenTrial
the best trial
"""
if isinstance(grav_ds, xr.Dataset) is False:
msg = "Function `optimize_regional_eq_sources` has been changed, data must be provided to parameter `grav_ds` as an xarray dataset initialized through function `create_data`"
raise DeprecationWarning(msg)
optuna.logging.set_verbosity(optuna.logging.WARN)
optuna.logging.set_verbosity(optuna.logging.WARN)
kwargs = copy.deepcopy(kwargs)
# if sampler not provided, use TPE as default
if sampler is None:
sampler = optuna.samplers.TPESampler(
n_startup_trials=int(n_trials / 4),
seed=seed,
)
results_fname = f"tmp_{random.randint(0, 999)}" if fname is None else fname
# create study and set directions / metric names depending on optimization type
study, storage = _create_regional_separation_study(
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
sampler=sampler,
true_regional=true_regional,
fname=results_fname,
)
# run optimization
study = run_optuna(
study=study,
storage=storage,
objective=OptimizeRegionalEqSources(
depth_limits=depth_limits,
block_size_limits=block_size_limits,
damping_limits=damping_limits,
grav_obs_height_limits=grav_obs_height_limits,
testing_df=testing_df,
grav_ds=grav_ds,
true_regional=true_regional,
score_as_median=score_as_median,
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
**kwargs,
),
n_trials=n_trials,
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
if study._is_multi_objective() is False: # pylint: disable=protected-access
best_trial = study.best_trial
else:
best_trial = min(study.best_trials, key=lambda t: t.values[0]) # noqa: PD011
# best_trial = max(study.best_trials, key=lambda t: t.values[1])
logger.info("Number of trials on the Pareto front: %s", len(study.best_trials))
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
# get optimal hyperparameter values
depth = best_trial.params.get("depth", kwargs.pop("depth", "default"))
if depth == "default":
# calculate 4.5 times the mean distance between points
depth = 4.5 * np.mean(
vd.median_distance((grav_ds.easting, grav_ds.northing), k_nearest=1)
)
damping = best_trial.params.get("damping", kwargs.pop("damping", None))
block_size = best_trial.params.get("block_size", kwargs.pop("block_size", None))
grav_obs_height = best_trial.params.get(
"grav_obs_height",
kwargs.pop("grav_obs_height", None),
)
# redo the regional separation with best parameters
grav_ds.inv.regional_separation(
method="eq_sources",
depth=depth,
damping=damping,
block_size=block_size,
grav_obs_height=grav_obs_height,
**kwargs,
)
if plot is True:
try:
if study._is_multi_objective() is False: # pylint: disable=protected-access
if optimize_on_true_regional_misfit is True:
for p in best_trial.params:
plotting.plot_optimization_combined_slice(
study,
attribute_names=[
"residual constraint score",
"residual amplitude score",
],
parameter_name=[p], # type: ignore[arg-type]
).show()
else:
optuna.visualization.plot_slice(study).show()
else:
optuna.visualization.plot_pareto_front(study).show()
for i, j in enumerate(study.metric_names):
optuna.visualization.plot_slice(
study,
target=lambda t: t.values[i], # noqa: B023 PD011 # pylint: disable=cell-var-from-loop
target_name=j,
).show()
if plot_grid is True:
grav_ds.reg.plot()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, grav_ds, best_trial
[docs]
def optimize_regional_constraint_point_minimization(
testing_training_df: pd.DataFrame,
grid_method: str,
grav_ds: xr.Dataset,
n_trials: int,
tension_factor_limits: tuple[float, float] = (0, 1),
spline_damping_limits: tuple[float, float] | None = None,
depth_limits: tuple[float, float] | None = None,
block_size_limits: tuple[float, float] | None = None,
damping_limits: tuple[float, float] | None = None,
grav_obs_height_limits: tuple[float, float] | None = None,
sampler: optuna.samplers.BaseSampler | None = None,
plot: bool = False,
plot_grid: bool = False,
fold_progressbar: bool = False,
optimize_on_true_regional_misfit: bool = False,
separate_metrics: bool = True,
score_as_median: bool = False,
true_regional: xr.DataArray | None = None,
progressbar: bool = True,
parallel: bool = False,
fname: str | None = None,
seed: int = 0,
**kwargs: typing.Any,
) -> tuple[optuna.study.Study, xr.Dataset, optuna.trial.FrozenTrial]:
"""
Run an Optuna optimization to find the optimal hyperparameters for the Constraint
Point Minimization technique for estimating the regional component of gravity
misfit. Since constraints are used both for determining the regional field, and for
the scoring of the performance, we must split the constraints into testing and
training sets. This function can perform both single and K-Folds cross validations,
determined by the number of "fold_x" columns in testing_training_df. If using more
than one fold, the score for each parameter set is the mean of the scores of each
fold. The total number of regional separation this will perform is n_trials*K-folds.
This function then uses the optimal parameter values to redo the regional
estimation using all the constraints points, not just the training points, and
returns the results.
By default this will perform a multi-objective optimization to
find the best trade-off between the lowest RMSE of the residual misfit at the
constraints and the highest RMS amplitude of the residual at all locations.
Choose which Constraint Point Minimization gridding method with the `grid_method`
parameter, and supplied the associated method parameter limits via parameters
<parameter>_limits. For grid method "eq_sources" which has multiple parameters, if
limits aren't provided for one of the parameters, supply a constant value for the
parameter in the keyword arguments, which are past direction to
:meth:`DatasetAccessorInvert4Geom.regional_separation`. For synthetic testing, if the true regional grid is
provided, the optimization can be set to optimize on the RMSE of the predicted and
true regional gravity, by setting `optimize_on_true_regional_misfit=True`.
Parameters
----------
testing_training_df : pandas.DataFrame
constraints dataframe with columns "easting", "northing", "upward", and a column
for each fold in the format "fold_0", "fold_1", etc. This can be created with
function `cross_validation.split_test_train()`. Each fold column should have
strings of "test" or "train" to indicate which rows are testing or training
points. If more than one fold is provided, this function will perform a K-Folds
cross validation and the score for each set of parameters will be the mean of
the K-scores.
grid_method : str
constraint point minimization method to use, choose between "verde" for
bi-harmonic spline gridding, "pygmt" for tensioned minimum curvature gridding,
or "eq_sources" for equivalent sources gridding.
grav_ds : xarray.Dataset
gravity dataset with coordinates "easting", "northing", and variables "reg" and
`gravity_anomaly`.
n_trials : int
number of trials to run
tension_factor_limits : tuple[float, float], optional
limits to use for the PyGMT tension factor gridding, by default (0, 1)
spline_damping_limits : tuple[float, float] | None, optional
limits to use for the Verde bi-harmonic spline damping, by default None
depth_limits : tuple[float, float] | None, optional
limits to use for the equivalent sources' depths, by default None
block_size_limits : tuple[float, float] | None, optional
limits to use for the block size for fitting equivalent sources, by default None
damping_limits : tuple[float, float] | None, optional
limits to use for the damping value for fitting equivalent sources, by default
None
grav_obs_height_limits : tuple[float, float] | None, optional
limits to use for the gravity observation height for fitting equivalent sources,
by default None
sampler : optuna.samplers.BaseSampler | None, optional
customize the optuna sampler, by default TPE sampler
plot : bool, optional
plot the resulting optimization figures, by default False
plot_grid : bool, optional
plot the resulting regional gravity grid, by default False
fold_progressbar : bool, optional
turn on or off a progress bar for the optimization of each fold if performing
a K-Folds cross-validation within the optimization, by default False
optimize_on_true_regional_misfit : bool, optional
if true_regional grid is provide, choose to perform optimization on the RMSE
between the true regional and the estimated region, by default False
separate_metrics : bool, optional
if False, returns the scores combined with the formula
residual_constraints_score / residual_amplitude_score, by default is True and
returns both the residual and regional scores separately.
score_as_median : bool, optional
use the root median square instead of the root mean square for the scoring
metric, by default False
true_regional : xarray.DataArray | None, optional
if the true regional gravity is known (in synthetic models), supply this as a
grid to include a user_attr of the RMSE between this and the estimated regional
for each trial, or set `optimize_on_true_regional_misfit=True` to have the
optimization optimize on the RMSE, by default None
progressbar : bool, optional
add a progressbar, by default True
parallel : bool, optional
run the optimization in parallel, by default False
fname : str | None, optional
file name to save the study to, by default None
seed : int, optional
random seed for the samplers, by default 0
kwargs : typing.Any
additional keyword arguments to pass to the :meth:`DatasetAccessorInvert4Geom.regional_separation`
Returns
-------
study : optuna.study.Study,
the completed Optuna study
resulting_grav_ds : xarray.Dataset
the resulting gravity dataset of the best trial
best_trial : optuna.trial.FrozenTrial
the best trial
"""
if isinstance(grav_ds, xr.Dataset) is False:
msg = "Function `optimize_regional_constraint_point_minimization` has been changed, data must be provided to parameter `grav_ds` as an xarray dataset initialized through function `create_data`"
raise DeprecationWarning(msg)
optuna.logging.set_verbosity(optuna.logging.WARN)
kwargs = copy.deepcopy(kwargs)
# if sampler not provided, use TPE as default
if sampler is None:
sampler = optuna.samplers.TPESampler(
n_startup_trials=int(n_trials / 4),
seed=seed,
)
results_fname = f"tmp_{random.randint(0, 999)}" if fname is None else fname
# create study and set directions / metric names depending on optimization type
study, storage = _create_regional_separation_study(
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
sampler=sampler,
true_regional=true_regional,
parallel=parallel,
fname=results_fname,
)
# get folds from constraints_df
test_dfs, train_dfs = cross_validation.kfold_df_to_lists(testing_training_df)
assert len(test_dfs) == len(train_dfs)
logger.info("Number of folds: %s", len(test_dfs))
# combine testing and training to get a full constraints dataframe
constraints_df = (
pd.concat(test_dfs + train_dfs)
.drop_duplicates(subset=["easting", "northing", "upward"])
.sort_index()
)
if len(test_dfs) == 1:
test_dfs = test_dfs[0]
train_dfs = train_dfs[0]
logger.debug("separate_metrics: %s", separate_metrics)
logger.debug(
"optimize_on_true_regional_misfit: %s", optimize_on_true_regional_misfit
)
# enqueue limits as trials
if grid_method == "pygmt":
study.enqueue_trial(
{"tension_factor": tension_factor_limits[0]}, skip_if_exists=True
)
study.enqueue_trial(
{"tension_factor": tension_factor_limits[1]}, skip_if_exists=True
)
elif grid_method == "verde":
study.enqueue_trial(
{"spline_dampings": spline_damping_limits[0]}, # type: ignore[index]
skip_if_exists=True,
)
study.enqueue_trial(
{"spline_dampings": spline_damping_limits[1]}, # type: ignore[index]
skip_if_exists=True,
)
elif grid_method == "eq_sources":
if depth_limits is not None:
study.enqueue_trial({"depth": depth_limits[0]}, skip_if_exists=True)
study.enqueue_trial({"depth": depth_limits[1]}, skip_if_exists=True)
if block_size_limits is not None:
study.enqueue_trial(
{"block_size": block_size_limits[0]}, skip_if_exists=True
)
study.enqueue_trial(
{"block_size": block_size_limits[1]}, skip_if_exists=True
)
if damping_limits is not None:
study.enqueue_trial({"damping": damping_limits[0]}, skip_if_exists=True)
study.enqueue_trial({"damping": damping_limits[1]}, skip_if_exists=True)
if grav_obs_height_limits is not None:
study.enqueue_trial(
{"grav_obs_height": grav_obs_height_limits[0]}, skip_if_exists=True
)
study.enqueue_trial(
{"grav_obs_height": grav_obs_height_limits[1]}, skip_if_exists=True
)
# run optimization
study = run_optuna(
study=study,
storage=storage,
objective=OptimizeRegionalConstraintsPointMinimization(
training_df=train_dfs,
testing_df=test_dfs,
# kwargs for regional_constraints:
grav_ds=grav_ds,
grid_method=grid_method,
tension_factor_limits=tension_factor_limits,
spline_damping_limits=spline_damping_limits,
depth_limits=depth_limits,
block_size_limits=block_size_limits,
damping_limits=damping_limits,
grav_obs_height_limits=grav_obs_height_limits,
# optimization kwargs
true_regional=true_regional,
score_as_median=score_as_median,
optimize_on_true_regional_misfit=optimize_on_true_regional_misfit,
separate_metrics=separate_metrics,
progressbar=fold_progressbar,
**kwargs,
),
n_trials=n_trials,
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
if study._is_multi_objective() is False: # pylint: disable=protected-access
best_trial = study.best_trial
else:
best_trial = min(study.best_trials, key=lambda t: t.values[0]) # noqa: PD011
# best_trial = max(study.best_trials, key=lambda t: t.values[1])
logger.info("Number of trials on the Pareto front: %s", len(study.best_trials))
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
logger.info(
"re-running regional separation with best parameters and all constraints"
)
# get optimal hyperparameter values
# if not included in optimization, get from kwargs
tension_factor = best_trial.params.get("tension_factor", 1)
spline_dampings = best_trial.params.get("spline_dampings", None)
depth = best_trial.params.get("depth", kwargs.pop("depth", "default"))
damping = best_trial.params.get("damping", kwargs.pop("damping", None))
block_size = best_trial.params.get("block_size", kwargs.pop("block_size", None))
grav_obs_height = best_trial.params.get(
"grav_obs_height", kwargs.pop("grav_obs_height", None)
)
# redo the regional separation with ALL constraint points
grav_ds.inv.regional_separation(
method="constraints",
constraints_df=constraints_df,
grid_method=grid_method,
tension_factor=tension_factor,
spline_dampings=spline_dampings,
depth=depth,
damping=damping,
block_size=block_size,
grav_obs_height=grav_obs_height,
**kwargs,
)
# save study
if results_fname is not None:
# remove if exists
pathlib.Path(f"{results_fname}_study.pickle").unlink(missing_ok=True)
# save study to pickle
with pathlib.Path(f"{results_fname}_study.pickle").open("wb") as f:
pickle.dump(study, f)
if plot is True:
try:
if study._is_multi_objective() is False: # pylint: disable=protected-access
if optimize_on_true_regional_misfit is True:
for p in best_trial.params:
plotting.plot_optimization_combined_slice(
study,
attribute_names=[
"residual constraint score",
"residual amplitude score",
],
parameter_name=[p], # type: ignore[arg-type]
).show()
else:
optuna.visualization.plot_slice(study).show()
else:
optuna.visualization.plot_pareto_front(study).show()
for i, j in enumerate(study.metric_names):
optuna.visualization.plot_slice(
study,
target=lambda t: t.values[i], # noqa: B023 PD011 # pylint: disable=cell-var-from-loop
target_name=j,
).show()
if plot_grid is True:
grav_ds.reg.plot()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, grav_ds, best_trial
[docs]
def optimal_buffer(
target: float,
buffer_perc_limits: tuple[float, float] = (1, 50),
n_trials: int = 25,
sampler: optuna.samplers.BaseSampler | None = None,
grid_search: bool = False,
fname: str | None = None,
progressbar: bool = True,
parallel: bool = False,
plot: bool = True,
seed: int = 0,
**kwargs: typing.Any,
) -> tuple[optuna.study.Study, tuple[float, float, int, xr.Dataset]]:
"""
Run an optimization to find best buffer zone width.
"""
optuna.logging.set_verbosity(optuna.logging.WARN)
# pylint: enable=duplicate-code
# if sampler not provided, use GPSampler as default unless grid_search is True
if sampler is None:
if grid_search is True:
if n_trials < 4:
msg = (
"if grid_search is True, n_trials must be at least 4, "
"resetting n_trials to 4 now."
)
logger.warning(msg)
n_trials = 4
space = np.linspace(buffer_perc_limits[0], buffer_perc_limits[1], n_trials)
# omit first and last since they will be enqueued separately
space = space[1:-1]
sampler = optuna.samplers.GridSampler(
search_space={"buffer_perc": space},
seed=seed,
)
else:
with warnings.catch_warnings():
sampler = optuna.samplers.GPSampler(
n_startup_trials=int(n_trials / 4),
seed=seed,
deterministic_objective=True,
)
# pylint: disable=duplicate-code
# set file name for saving results with random number between 0 and 999
if fname is None:
fname = f"tmp_{random.randint(0, 999)}"
if parallel:
pathlib.Path(f"{fname}.log").unlink(missing_ok=True)
pathlib.Path(f"{fname}.lock").unlink(missing_ok=True)
pathlib.Path(f"{fname}.log.lock").unlink(missing_ok=True)
storage = optuna.storages.JournalStorage(
optuna.storages.journal.JournalFileBackend(f"{fname}.log"),
)
else:
storage = None
study = optuna.create_study(
direction="minimize",
sampler=sampler,
load_if_exists=False,
study_name=fname,
storage=storage,
pruner=DuplicateIterationPruner,
)
# explicitly add the limits as trials
study.enqueue_trial({"damping": buffer_perc_limits[0]}, skip_if_exists=True)
study.enqueue_trial({"damping": buffer_perc_limits[1]}, skip_if_exists=True)
# run optimization
with warnings.catch_warnings():
warnings.filterwarnings(
"ignore", message="logei_candidates_func is experimental"
)
with utils.DuplicateFilter(logger): # type: ignore[no-untyped-call]
study = run_optuna(
study=study,
storage=storage,
objective=OptimalBuffer(
buffer_perc_limits=buffer_perc_limits,
fname=fname,
target=target,
**kwargs,
),
n_trials=n_trials,
# callbacks=[_warn_limits_better_than_trial_1_param],
maximize_cpus=True,
parallel=parallel,
progressbar=progressbar,
)
best_trial = study.best_trial
# warn if any best parameter values are at their limits
warn_parameter_at_limits(best_trial)
# log the results of the best trial
log_optuna_results(best_trial)
# re-run decay calculation with optimal buffer
results = utils.gravity_decay_buffer(
buffer_perc=best_trial.params["buffer_perc"],
plot=plot,
**kwargs,
)
if plot:
try:
plot1 = optuna.visualization.plot_optimization_history(study)
plot2 = optuna.visualization.plot_slice(study)
plot1.show()
plot2.show()
except Exception as e: # pylint: disable=broad-exception-caught
logger.error("plotting failed with error: %s", e)
return study, results
class OptimalBuffer:
"""
Objective function to use in an Optuna optimization for finding the buffer zone
width as a percentage of region width which limits the gravity decay (edge effects)
to a specified amount within a region of interest. Used within function
func:`optimal_buffer`.
"""
def __init__(
self,
buffer_perc_limits: tuple[float, float],
target: float,
fname: str,
**kwargs: typing.Any,
) -> None:
self.fname = fname
self.buffer_perc_limits = buffer_perc_limits
self.target = target
self.kwargs = kwargs
def __call__(self, trial: optuna.trial) -> float:
"""
Parameters
----------
trial : optuna.trial
the trial to run
Returns
-------
float
the score of the eq_sources fit
"""
buffer_perc = trial.suggest_float(
"buffer_perc",
self.buffer_perc_limits[0],
self.buffer_perc_limits[1],
)
new_kwargs = {
key: value
for key, value in self.kwargs.items()
if key
not in [
"buffer_perc",
"progressbar",
"results_fname",
"plot",
]
}
trial.set_user_attr("fname", f"{self.fname}_trial_{trial.number}")
score = utils.gravity_decay_buffer(
buffer_perc=buffer_perc,
progressbar=False,
plot=False,
**new_kwargs,
)[0]
return np.abs((self.target) - score) # type: ignore[no-any-return]
class DuplicateIterationPruner(optuna.pruners.BasePruner): # type: ignore[misc]
"""
DuplicatePruner
Pruner to detect duplicate trials based on the parameters.
This pruner is used to identify and prune trials that have the same set of
parameters as a previously completed trial.
"""
def prune(self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial) -> bool:
completed_trials = study.get_trials(states=[optuna.trial.TrialState.COMPLETE])
for completed_trial in completed_trials:
if completed_trial.params == trial.params:
return True
return False