from __future__ import annotations # pylint: disable=too-many-lines
import copy
import typing
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import optuna
import pandas as pd
import plotly
import plotly.io as pio
import pyvista
import scipy as sp
import seaborn as sns
import verde as vd
import xarray as xr
from IPython.display import clear_output
from numpy.typing import NDArray
from polartoolkit import maps, profiles
from polartoolkit import utils as polar_utils
from invert4geom import log, utils
# This ensures Plotly output works in multiple places:
# plotly_mimetype: VS Code notebook UI
# notebook: "Jupyter: Export to HTML" command in VS Code
# See https://plotly.com/python/renderers/#multiple-renderers
pio.renderers.default = "plotly_mimetype+notebook"
[docs]
def plot_2_parameter_cv_scores(
scores: list[float],
parameter_pairs: list[tuple[float, float]],
param_names: tuple[str, str] = ("Hyperparameter 1", "Hyperparameter 2"),
figsize: tuple[float, float] = (5, 3.5),
cmap: str | None = None,
) -> None:
"""
plot a scatter plot graph with x axis equal to parameter 1, y axis equal to
parameter 2, and points colored by cross-validation scores.
Parameters
----------
scores : list[float]
score values
parameter_pairs : list[float]
parameter values
param_names : tuple[str, str], optional
name to give for the parameters, by default "Hyperparameter"
figsize : tuple[float, float], optional
size of the figure, by default (5, 3.5)
cmap : str, optional
matplotlib colormap for scores, by default "viridis"
"""
sns.set_theme()
if cmap is None:
cmap = sns.color_palette("mako", as_cmap=True)
df = pd.DataFrame(
{
"scores": scores,
param_names[0]: [
parameter_pairs[i][0] for i in list(range(len(parameter_pairs)))
],
param_names[1]: [
parameter_pairs[i][1] for i in list(range(len(parameter_pairs)))
],
}
)
df = df.sort_values(by="scores")
best = df.iloc[0]
plt.figure(figsize=figsize)
plt.title("Two parameter cross-validation")
grid = df.set_index([param_names[1], param_names[0]]).to_xarray().scores
grid.plot(
cmap=cmap,
# norm=plt.Normalize(df.scores.min(), df.scores.max()),
)
# plt.contourf(
# df[param_names[0]],
# df[param_names[1]],
# Z = grid,
# cmap = cmap,
# )
plt.scatter(
df[param_names[0]], # pylint: disable=unsubscriptable-object
df[param_names[1]], # pylint: disable=unsubscriptable-object
# c = df.scores,
# cmap = cmap,
# marker="o",
marker=".",
color="gray",
)
plt.plot(
best[param_names[0]],
best[param_names[1]],
"s",
markersize=10,
color=sns.color_palette()[3],
label="Minimum",
)
plt.legend(
loc="upper right",
)
plt.xlabel(param_names[0])
plt.ylabel(param_names[1])
# plt.colorbar()
plt.tight_layout()
[docs]
def plot_2_parameter_cv_scores_uneven(
study: optuna.study.Study,
param_names: tuple[str, str],
plot_param_names: tuple[str, str] = ("Hyperparameter 1", "Hyperparameter 2"),
figsize: tuple[float, float] = (5, 3.5),
cmap: str | None = None,
best: str = "min",
logx: bool = False,
logy: bool = False,
robust: bool = False,
) -> None:
"""
plot a scatter plot graph with x axis equal to parameter 1, y axis equal to
parameter 2, and points colored by cross-validation scores.
Parameters
----------
study : optuna.study.Study
param_names : tuple[str, str], optional
name to give for the parameters, by default "Hyperparameter"
figsize : tuple[float, float], optional
size of the figure, by default (5, 3.5)
cmap : str, optional
matplotlib colormap for scores, by default "viridis"
best : str, optional
whether the 'min' or 'max' score is considered best, by default 'min'
logx, logy : bool, optional
make the x or y axes log scale, by default False
robust: bool, optional
use robust color limits
"""
sns.set_theme()
if cmap is None:
cmap = sns.color_palette("mako", as_cmap=True)
df = study.trials_dataframe()
df = df[[param_names[0], param_names[1], "value"]]
if best == "min":
best_ind = df.value.idxmin()
label = "Minimum"
elif best == "max":
best_ind = df.value.idxmax()
label = "Maximum"
else:
msg = "best must be either 'min' or 'max'"
raise ValueError(msg)
plt.figure(figsize=figsize)
plt.title("Two parameter cross-validation")
x = df[param_names[0]].values
y = df[param_names[1]].values
z = df.value.values
x_buffer = (max(x) - min(x)) / 50
y_buffer = (max(y) - min(y)) / 50
# 2D grid for interpolation
xi = np.linspace(min(x), max(x))
yi = np.linspace(min(y), max(y))
xi, yi = np.meshgrid(xi, yi)
try:
interp = sp.interpolate.CloughTocher2DInterpolator(list(zip(x, y)), z)
except ValueError as e:
log.error(
"Error interpolating value in plot_2_parameter_cv_scores_uneven: %s", e
)
return
zi = interp(xi, yi)
vmin, vmax = polar_utils.get_min_max(
df.value,
robust=robust,
# robust_percentiles=(.89,.9)
)
plt.pcolormesh(
xi,
yi,
zi,
cmap=cmap,
vmin=vmin,
vmax=vmax,
# shading='auto',
)
# plt.contourf(
# xi,
# yi,
# zi,
# 30,
# cmap=cmap,
# vmin=vmin,
# vmax=vmax,
# )
plt.colorbar().set_label("Scores")
plt.plot(
df.iloc[best_ind][param_names[0]],
df.iloc[best_ind][param_names[1]],
"s",
markersize=10,
color=sns.color_palette()[3],
label=label,
)
plt.scatter(
x,
y,
marker=".",
color="lightgray",
edgecolor="black",
)
plt.legend(
loc="best",
)
plt.xlim([min(x) - x_buffer, max(x) + x_buffer])
plt.ylim([min(y) - y_buffer, max(y) + y_buffer])
plt.xlabel(plot_param_names[0])
plt.ylabel(plot_param_names[1])
plt.xticks(rotation=20)
if logx:
plt.xscale("log")
if logy:
plt.yscale("log")
plt.tight_layout()
[docs]
def plot_cv_scores(
scores: list[float],
parameters: list[float],
logx: bool = False,
logy: bool = False,
param_name: str = "Hyperparameter",
figsize: tuple[float, float] = (5, 3.5),
plot_title: str | None = None,
fname: str | None = None,
best: str = "min",
) -> typing.Any:
"""
plot a graph of cross-validation scores vs hyperparameter values
Parameters
----------
scores : list[float]
score values
parameters : list[float]
parameter values
logx, logy : bool, optional
make the x or y axes log scale, by default False
param_name : str, optional
name to give for the parameters, by default "Hyperparameter"
figsize : tuple[float, float], optional
size of the figure, by default (5, 3.5)
plot_title : str | None, optional
title of figure, by default None
fname : str | None, optional
filename to save figure, by default None
best : str, optional
which value to plot as the best, 'min' or 'max', by default "min"
Returns
-------
a matplotlib figure instance
"""
sns.set_theme()
df0 = pd.DataFrame({"scores": scores, "parameters": parameters})
df = df0.sort_values(by="parameters")
if best == "min":
best_score = df.scores.argmin()
label = "Minimum"
elif best == "max":
best_score = df.scores.argmax()
label = "Maximum"
else:
msg = f"best must be 'min' or 'max', not {best}"
raise ValueError(msg)
fig, ax = plt.subplots(figsize=figsize)
if plot_title is not None:
ax.set_title(plot_title)
else:
ax.set_title(f"{param_name} Cross-validation")
ax.plot(
df.parameters.iloc[best_score],
df.scores.iloc[best_score],
"s",
markersize=10,
color=sns.color_palette()[3],
label=label,
)
ax.plot(df.parameters, df.scores, marker="o")
ax.scatter(
df.parameters,
df.scores,
s=1,
marker=".",
color="black",
edgecolors="black",
zorder=10,
)
ax.legend(loc="best")
if logx:
ax.set_xscale("log")
if logy:
ax.set_yscale("log")
ax.set_xlabel(f"{param_name} value")
ax.set_ylabel("Root Mean Square Error")
plt.tight_layout()
if fname is not None:
plt.savefig(fname)
return fig
[docs]
def plot_convergence(
results: pd.DataFrame,
params: dict[str, typing.Any],
inversion_region: tuple[float, float, float, float] | None = None,
figsize: tuple[float, float] = (5, 3.5),
fname: str | None = None,
) -> None:
"""
plot a graph of L2-norm and delta L2-norm vs iteration number.
Parameters
----------
results : pandas.DataFrame
gravity result dataframe
params : dict[str, typing.Any]
inversion parameters output from function `run_inversion()`
inversion_region : tuple[float, float, float, float] | None, optional
inside region of inversion, by default None
figsize : tuple[float, float], optional
width and height of figure, by default (5, 3.5)
fname : str | None, optional
filename to save figure, by default None
"""
sns.set_theme()
# get misfit data at end of each iteration
cols = [s for s in results.columns.to_list() if "_final_misfit" in s]
iters = len(cols)
if inversion_region is not None:
l2_norms = [np.sqrt(utils.rmse(results[results.inside][i])) for i in cols]
starting_misfit = utils.rmse(results[results.inside]["iter_1_initial_misfit"])
starting_l2_norm = np.sqrt(starting_misfit)
else:
l2_norms = [np.sqrt(utils.rmse(results[i])) for i in cols]
starting_misfit = utils.rmse(results["iter_1_initial_misfit"])
starting_l2_norm = np.sqrt(starting_misfit)
# add starting l2 norm to the beginning of the list
l2_norms.insert(0, starting_l2_norm)
# calculate delta L2-norms
delta_l2_norms = []
for i, m in enumerate(l2_norms):
if i == 0:
delta_l2_norms.append(np.nan)
else:
delta_l2_norms.append(l2_norms[i - 1] / m)
# get tolerance values
l2_norm_tolerance = float(params["L2 norm tolerance"])
delta_l2_norm_tolerance = float(params["Delta L2 norm tolerance"])
# create figure instance
_fig, ax1 = plt.subplots(figsize=figsize)
# make second y axis for delta l2 norm
ax2 = ax1.twinx()
# plot L2-norm convergence
ax1.plot(range(iters + 1), l2_norms, "b-")
# plot delta L2-norm convergence
ax2.plot(range(iters + 1), delta_l2_norms, "g-")
# set axis labels, ticks and gridlines
ax1.set_xlabel("Iteration")
ax1.set_ylabel("L2-norm", color="b")
ax1.tick_params(axis="y", colors="b", which="both")
ax2.set_ylabel("Î L2-norm", color="g")
ax2.tick_params(axis="y", colors="g", which="both")
ax2.grid(False)
# add buffer to y axis limits
ax1.set_ylim(0.9 * l2_norm_tolerance, starting_l2_norm)
ax2.set_ylim(delta_l2_norm_tolerance, np.nanmax(delta_l2_norms))
# set x axis to integer values
ax1.xaxis.set_major_locator(mpl.ticker.MaxNLocator(integer=True))
# make both y axes align at tolerance levels
align_yaxis(ax1, l2_norm_tolerance, ax2, delta_l2_norm_tolerance)
# plot horizontal line of tolerances
ax2.axhline(
y=delta_l2_norm_tolerance,
linewidth=1,
color="r",
linestyle="dashed",
label="tolerances",
)
# ask matplotlib for the plotted objects and their labels
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc="upper right")
plt.title("Inversion convergence")
plt.tight_layout()
if fname is not None:
plt.savefig(fname)
plt.show()
[docs]
def plot_dynamic_convergence(
l2_norms: list[float],
l2_norm_tolerance: float,
delta_l2_norms: list[float],
delta_l2_norm_tolerance: float,
starting_misfit: float,
figsize: tuple[float, float] = (5, 3.5),
) -> None:
"""
plot a dynamic graph of L2-norm and delta L2-norm vs iteration number.
Parameters
----------
l2_norms : list[float]
list of l2 norm values
l2_norm_tolerance : float
l2 norm tolerance
delta_l2_norms : list[float]
list of delta l2 norm values
delta_l2_norm_tolerance : float
delta l2 norm tolerance
starting_misfit : float
starting misfit rmse
figsize : tuple[float, float], optional
width and height of figure, by default (5, 3.5)
"""
sns.set_theme()
clear_output(wait=True)
l2_norms = copy.deepcopy(l2_norms)
delta_l2_norms = copy.deepcopy(delta_l2_norms)
assert len(delta_l2_norms) == len(l2_norms)
l2_norms.insert(0, np.sqrt(starting_misfit))
delta_l2_norms.insert(0, np.nan)
iters = len(l2_norms)
# create figure instance
_fig, ax1 = plt.subplots(figsize=figsize)
# make second y axis for delta l2 norm
ax2 = ax1.twinx()
# plot L2-norm convergence
ax1.plot(list(range(len(l2_norms))), l2_norms, "b-")
# plot delta L2-norm convergence
if iters > 1:
ax2.plot(list(range(len(delta_l2_norms))), delta_l2_norms, "g-")
# set axis labels, ticks and gridlines
ax1.set_xlabel("Iteration")
ax1.set_ylabel("L2-norm", color="b")
ax1.tick_params(axis="y", colors="b", which="both")
ax2.set_ylabel("Î L2-norm", color="g")
ax2.tick_params(axis="y", colors="g", which="both")
ax2.grid(False)
# add buffer to y axis limits
ax1.set_ylim(0.9 * (l2_norm_tolerance), np.sqrt(starting_misfit))
if iters > 1:
ax2.set_ylim(delta_l2_norm_tolerance, np.nanmax(delta_l2_norms))
# set x axis to integer values
ax1.xaxis.set_major_locator(mpl.ticker.MaxNLocator(integer=True))
# plot current L2-norm and Î L2-norm
ax1.plot(
iters - 1,
l2_norms[-1],
"^",
markersize=6,
color=sns.color_palette()[3],
# label="current L2-norm",
)
if iters > 1:
ax2.plot(
iters - 1,
delta_l2_norms[-1],
"^",
markersize=6,
color=sns.color_palette()[3],
# label="current Î L2-norm",
)
# make both y axes align at tolerance levels
align_yaxis(ax1, l2_norm_tolerance, ax2, delta_l2_norm_tolerance)
# plot horizontal line of tolerances
ax2.axhline(
y=delta_l2_norm_tolerance,
linewidth=1,
color="r",
linestyle="dashed",
label="tolerances",
)
# ask matplotlib for the plotted objects and their labels
lines, labels = ax1.get_legend_handles_labels()
lines2, labels2 = ax2.get_legend_handles_labels()
ax2.legend(lines + lines2, labels + labels2, loc="upper right")
plt.title("Inversion convergence")
plt.tight_layout()
plt.show()
[docs]
def align_yaxis(
ax1: mpl.axes.Axes,
v1: float,
ax2: mpl.axes.Axes,
v2: float,
) -> None:
"""
adjust ax2 ylimit so that v2 in ax2 is aligned to v1 in ax1.
From https://stackoverflow.com/a/10482477/18686384
"""
_, y1 = ax1.transData.transform((0, v1))
_, y2 = ax2.transData.transform((0, v2))
inv = ax2.transData.inverted()
_, dy = inv.transform((0, 0)) - inv.transform((0, y1 - y2))
miny, maxy = ax2.get_ylim()
ax2.set_ylim(miny + dy, maxy + dy)
[docs]
def grid_inversion_results(
misfits: list[str],
topos: list[str],
corrections: list[str],
prisms_ds: xr.Dataset,
grav_results: pd.DataFrame,
region: tuple[float, float, float, float],
) -> tuple[list[xr.DataArray], list[xr.DataArray], list[xr.DataArray]]:
"""
create grids from the various data variables of the supplied gravity dataframe and
prism dataset
Parameters
----------
misfits : list[str]
list of misfit column names in the gravity results dataframe
topos : list[str]
list of topography variable names in the prism dataset
corrections : list[str]
list of correction variable names in the prism dataset
prisms_ds : xarray.Dataset
resulting dataset of prism layer from the inversion
grav_results : pandas.DataFrame
resulting dataframe of gravity data from the inversion
region : tuple[float, float, float, float]
region to use for gridding in format (xmin, xmax, ymin, ymax)
Returns
-------
misfit_grids : list[xarray.DataArray]
list of misfit grids
topo_grids : list[xarray.DataArray]
list of topography grids
corrections_grids : list[xarray.DataArray]
list of correction grids
"""
misfit_grids = []
for m in misfits:
grid = grav_results.set_index(["northing", "easting"]).to_xarray()[m]
misfit_grids.append(grid)
topo_grids = []
for t in topos:
topo_grids.append(
prisms_ds[t].sel(
easting=slice(region[0], region[1]),
northing=slice(region[2], region[3]),
)
)
corrections_grids = []
for m in corrections:
corrections_grids.append(
prisms_ds[m].sel(
easting=slice(region[0], region[1]),
northing=slice(region[2], region[3]),
)
)
return (misfit_grids, topo_grids, corrections_grids)
[docs]
def plot_inversion_topo_results(
prisms_ds: xr.Dataset,
region: tuple[float, float, float, float] | None = None,
constraints_df: pd.DataFrame | None = None,
constraint_style: str = "x.3c",
fig_height: float = 12,
) -> None:
"""
plot the initial and final topography grids from the inversion and their difference
Parameters
----------
prisms_ds : xarray.Dataset
dataset resulting from inversion
topo_cmap_perc : float, optional
value to multiple min and max values by for colorscale, by default 1
region : tuple[float, float, float, float], optional
clip grids to this region before plotting
constraints_df : pandas.DataFrame, optional
constraint points to include in the plots
constraint_style : str, optional
pygmt style string for for constraint points, by default 'x.3c'
fig_height : float, optional
height of the figure, by default 12
"""
initial_topo = prisms_ds.starting_topo
final_topo = prisms_ds.topo
points = constraints_df if constraints_df is not None else None
# pylint: disable=duplicate-code
_ = polar_utils.grd_compare(
initial_topo,
final_topo,
fig_height=fig_height,
region=region,
plot=True,
grid1_name="Initial topography",
grid2_name="Inverted topography",
robust=True,
hist=True,
inset=False,
verbose="q",
title="difference",
grounding_line=False,
reverse_cpt=True,
cmap="rain",
points=points,
points_style=constraint_style,
)
# pylint: enable=duplicate-code
[docs]
def plot_inversion_grav_results(
grav_results: pd.DataFrame,
region: tuple[float, float, float, float],
iterations: list[int],
constraints_df: pd.DataFrame | None = None,
fig_height: float = 12,
constraint_style: str = "x.3c",
) -> None:
"""
plot the initial and final misfit grids from the inversion and their difference
Parameters
----------
grav_results : pandas.DataFrame
resulting dataframe of gravity data from the inversion
region : tuple[float, float, float, float]
region to use for gridding in format (xmin, xmax, ymin, ymax)
iterations : list[int]
list of all the iteration numbers
constraints_df : pandas.DataFrame, optional
constraint points to include in the plots
fig_height : float, optional
height of the figure, by default 12
constraint_style : str, optional
pygmt style string for for constraint points, by default 'x.3c'
"""
grid = grav_results.set_index(["northing", "easting"]).to_xarray()
initial_misfit = grid["iter_1_initial_misfit"]
final_misfit = grid[f"iter_{max(iterations)}_final_misfit"]
initial_rmse = utils.rmse(grav_results["iter_1_initial_misfit"])
final_rmse = utils.rmse(grav_results[f"iter_{max(iterations)}_final_misfit"])
points = constraints_df if constraints_df is not None else None
dif, initial, final = polar_utils.grd_compare(
initial_misfit,
final_misfit,
plot=False,
)
robust = True
diff_maxabs = vd.maxabs(polar_utils.get_min_max(dif, robust=robust))
initial_maxabs = vd.maxabs(polar_utils.get_min_max(initial, robust=robust))
final_maxabs = vd.maxabs(polar_utils.get_min_max(final, robust=robust))
fig = maps.plot_grd(
initial,
fig_height=fig_height,
region=region,
cmap="balance+h0",
# robust=True,
cpt_lims=(-initial_maxabs, initial_maxabs),
hist=True,
cbar_label="mGal",
title=f"Initial misfit: RMSE:{round(initial_rmse, 2)} mGal",
points=points,
points_style=constraint_style,
)
fig = maps.plot_grd(
dif,
fig=fig,
origin_shift="x",
fig_height=fig_height,
region=region,
cmap="balance+h0",
cpt_lims=(-diff_maxabs, diff_maxabs),
hist=True,
cbar_label="mGal",
title=f"difference: RMSE:{round(utils.rmse(dif), 2)} mGal",
points=points,
points_style=constraint_style,
)
fig = maps.plot_grd(
final,
fig=fig,
origin_shift="x",
fig_height=fig_height,
region=region,
cmap="balance+h0",
# robust=True,
cpt_lims=(-final_maxabs, final_maxabs),
hist=True,
cbar_label="mGal",
title=f"Final misfit: RMSE:{round(final_rmse, 2)} mGal",
points=points,
points_style=constraint_style,
)
fig.show()
[docs]
def plot_inversion_iteration_results(
grids: tuple[list[xr.DataArray], list[xr.DataArray], list[xr.DataArray]],
grav_results: pd.DataFrame,
topo_results: pd.DataFrame,
parameters: dict[str, typing.Any],
iterations: list[int],
topo_cmap_perc: float = 1,
misfit_cmap_perc: float = 1,
corrections_cmap_perc: float = 1,
constraints_df: pd.DataFrame | None = None,
constraint_size: float = 1,
) -> None:
"""
plot the starting misfit, updated topography, and correction grids for a specified
number of the iterations of an inversion
Parameters
----------
grids : tuple[list[xarray.DataArray], list[xarray.DataArray],
list[xarray.DataArray]]
lists of misfit, topography, and correction grids
grav_results : pandas.DataFrame
gravity dataframe resulting from the inversion
topo_results : pandas.DataFrame
topography dataframe resulting from the inversion
parameters : dict[str, typing.Any]
inversion parameters resulting from the inversion
iterations : list[int]
list of all the iteration numbers which occurred in the inversion
topo_cmap_perc : float, optional
value to multiply the max and min colorscale values by, by default 1
misfit_cmap_perc : float, optional
value to multiply the max and min colorscale values by, by default 1
corrections_cmap_perc : float, optional
value to multiply the max and min colorscale values by, by default 1
constraints_df : pandas.DataFrame, optional
constraint points to include in the plots
constraint_size : float, optional
size for constraint points, by default 1
"""
misfit_grids, topo_grids, corrections_grids = grids
params = copy.deepcopy(parameters)
# set figure parameters
sub_width = 5
nrows, ncols = len(iterations), 3
# setup subplot figure
_fig, ax = plt.subplots(
nrows=nrows,
ncols=ncols,
figsize=(sub_width * ncols, sub_width * nrows),
)
# set color limits for each column
misfit_lims = []
topo_lims = []
corrections_lims = []
for g in misfit_grids:
misfit_lims.append(polar_utils.get_min_max(g))
for g in topo_grids:
topo_lims.append(polar_utils.get_min_max(g))
for g in corrections_grids:
corrections_lims.append(polar_utils.get_min_max(g))
misfit_min = min([i[0] for i in misfit_lims]) # pylint: disable=consider-using-generator
misfit_max = max([i[1] for i in misfit_lims]) # pylint: disable=consider-using-generator
misfit_lim = vd.maxabs(misfit_min, misfit_max) * misfit_cmap_perc
topo_min = min([i[0] for i in topo_lims]) * topo_cmap_perc # pylint: disable=consider-using-generator
topo_max = max([i[1] for i in topo_lims]) * topo_cmap_perc # pylint: disable=consider-using-generator
corrections_min = min([i[0] for i in corrections_lims]) # pylint: disable=consider-using-generator
corrections_max = max([i[1] for i in corrections_lims]) # pylint: disable=consider-using-generator
corrections_lim = (
vd.maxabs(corrections_min, corrections_max) * corrections_cmap_perc
)
for column, j in enumerate(grids):
for row, _y in enumerate(j):
# if only 1 iteration
axes = ax[column] if max(iterations) == 1 else ax[row, column]
# add iteration number as text
plt.text(
-0.1,
0.5,
f"Iteration #{iterations[row]}",
transform=axes.transAxes,
rotation="vertical",
ha="center",
va="center",
fontsize=20,
)
# set colormaps and limits
if column == 0: # misfit grids
cmap = "RdBu_r"
lims = (-misfit_lim, misfit_lim)
robust = True
norm = None
elif column == 1: # topography grids
cmap = "gist_earth"
lims = (topo_min, topo_max)
robust = True
norm = None
elif column == 2: # correction grids
cmap = "RdBu_r"
lims = (-corrections_lim, corrections_lim)
robust = True
norm = None
# plot grids
j[row].plot(
ax=axes,
cmap=cmap, # pylint: disable=possibly-used-before-assignment
norm=norm, # pylint: disable=possibly-used-before-assignment
robust=robust, # pylint: disable=possibly-used-before-assignment
vmin=lims[0], # pylint: disable=possibly-used-before-assignment
vmax=lims[1], # pylint: disable=possibly-used-before-assignment
cbar_kwargs={
"orientation": "horizontal",
"anchor": (1, 1),
"fraction": 0.05,
"pad": 0.04,
},
)
# add subplot titles
if column == 0: # misfit grids
rmse = utils.rmse(
grav_results[f"iter_{iterations[row]}_initial_misfit"]
)
axes.set_title(f"initial misfit RMSE = {round(rmse, 2)} mGal")
elif column == 1: # topography grids
axes.set_title("updated topography")
elif column == 2: # correction grids
rmse = utils.rmse(topo_results[f"iter_{iterations[row]}_correction"])
axes.set_title(f"iteration correction RMSE = {round(rmse, 2)} m")
if (constraints_df is not None) & (column in (0, 1, 2)): # misfit grids
axes.plot(
constraints_df.easting, # type: ignore[union-attr]
constraints_df.northing, # type: ignore[union-attr]
"k.",
markersize=constraint_size,
markeredgewidth=1,
)
# set axes labels and make proportional
axes.set_xticklabels([])
axes.set_yticklabels([])
axes.set_xlabel("")
axes.set_ylabel("")
axes.set_aspect("equal")
# add text with inversion parameter info
text1, text2, text3 = [], [], []
params.pop("Iteration times")
for i, (k, v) in enumerate(params.items(), start=1):
if i <= 5:
text1.append(f"{k}: {v}\n")
elif i <= 11:
text2.append(f"{k}: {v}\n")
else:
text3.append(f"{k}: {v}\n")
text1 = "".join(text1) # type: ignore[assignment]
text2 = "".join(text2) # type: ignore[assignment]
text3 = "".join(text3) # type: ignore[assignment]
# if only 1 iteration
if max(iterations) == 1:
plt.text(
x=0.0,
y=1.1,
s=text1,
transform=ax[0].transAxes,
)
plt.text(
x=0.0,
y=1.1,
s=text2,
transform=ax[1].transAxes,
)
plt.text(
x=0.0,
y=1.1,
s=text3,
transform=ax[2].transAxes,
)
else:
plt.text(
x=0.0,
y=1.1,
s=text1,
transform=ax[0, 0].transAxes,
)
plt.text(
x=0.0,
y=1.1,
s=text2,
transform=ax[0, 1].transAxes,
)
plt.text(
x=0.0,
y=1.1,
s=text3,
transform=ax[0, 2].transAxes,
)
[docs]
def plot_inversion_results(
grav_results: pd.DataFrame | str,
topo_results: pd.DataFrame | str,
parameters: dict[str, typing.Any] | str,
grav_region: tuple[float, float, float, float] | None,
iters_to_plot: int | None = None,
plot_iter_results: bool = True,
plot_topo_results: bool = True,
plot_grav_results: bool = True,
constraints_df: pd.DataFrame | None = None,
**kwargs: typing.Any,
) -> None:
"""
plot various results from the inversion
Parameters
----------
grav_results : pandas.DataFrame | str
gravity results dataframe or filename
topo_results : pandas.DataFrame | str
topography results dataframe or filename
parameters : dict[str, typing.Any] | str
inversion parameters dictionary or filename
grav_region : tuple[float, float, float, float] | None
region to use for gridding in format (xmin, xmax, ymin, ymax), by default None
iters_to_plot : int | None, optional
number of iterations to plot, including the first and last, by default None
plot_iter_results : bool, optional
plot the iteration results, by default True
plot_topo_results : bool, optional
plot the topography results, by default True
plot_grav_results : bool, optional
plot the gravity results, by default True
constraints_df : pandas.DataFrame, optional
constraint points to include in the plots
"""
# if results are given as filenames (strings), load them
if isinstance(grav_results, str):
grav_results = pd.read_csv(
grav_results,
sep=",",
header="infer",
index_col=None,
compression="gzip",
)
if isinstance(topo_results, str):
topo_results = pd.read_csv(
topo_results,
sep=",",
header="infer",
index_col=None,
compression="gzip",
)
if isinstance(parameters, str):
params = np.load(parameters, allow_pickle="TRUE").item()
else:
params = parameters
prisms_ds = topo_results.set_index(["northing", "easting"]).to_xarray()
# either set input inversion region or get from input gravity data extent
if grav_region is None:
grav_region = vd.get_region((grav_results.easting, grav_results.northing))
# get lists of columns to grid
misfits = [s for s in grav_results.columns.to_list() if "initial_misfit" in s]
topos = [s for s in topo_results.columns.to_list() if "_layer" in s]
corrections = [s for s in topo_results.columns.to_list() if "_correction" in s]
# list of iterations, e.g. [1,2,3,4]
its = [int(s[5:][:-15]) for s in misfits]
# get on x amount of iterations to plot
if iters_to_plot is not None:
if iters_to_plot > max(its):
iterations = its
else:
iterations = list(np.linspace(1, max(its), iters_to_plot, dtype=int))
else:
iterations = its
# subset columns based on iterations to plot
misfits = [misfits[i] for i in [x - 1 for x in iterations]]
topos = [topos[i] for i in [x - 1 for x in iterations]]
corrections = [corrections[i] for i in [x - 1 for x in iterations]]
# grid all results
grids = grid_inversion_results(
misfits,
topos,
corrections,
prisms_ds,
grav_results,
grav_region,
)
if plot_iter_results is True:
plot_inversion_iteration_results(
grids,
grav_results,
topo_results,
params,
iterations,
topo_cmap_perc=kwargs.get("topo_cmap_perc", 1),
misfit_cmap_perc=kwargs.get("misfit_cmap_perc", 1),
corrections_cmap_perc=kwargs.get("corrections_cmap_perc", 1),
constraints_df=constraints_df,
constraint_size=kwargs.get("constraint_size", 1),
)
if plot_topo_results is True:
plot_inversion_topo_results(
prisms_ds,
region=grav_region,
constraints_df=constraints_df,
constraint_style=kwargs.get("constraint_style", "x.3c"),
fig_height=kwargs.get("fig_height", 12),
)
if plot_grav_results is True:
plot_inversion_grav_results(
grav_results,
grav_region,
iterations,
constraints_df=constraints_df,
fig_height=kwargs.get("fig_height", 12),
constraint_style=kwargs.get("constraint_style", "x.3c"),
)
[docs]
def add_light(
plotter: pyvista.Plotter,
prisms: xr.Dataset,
) -> None:
"""
add a light to a pyvista plotter object
Parameters
----------
plotter : pyvista.Plotter
pyvista plotter object
prisms : xarray.Dataset
harmonica prisms layer
"""
# Add a ceiling light
west, east, south, north = vd.get_region((prisms.easting, prisms.northing))
easting_center, northing_center = (east + west) / 2, (north + south) / 2
light = pyvista.Light(
position=(easting_center, northing_center, 100e3),
focal_point=(easting_center, northing_center, 0),
intensity=1, # 0 to 1
light_type="scene light", # the light doesn't move with the camera
positional=False, # the light comes from infinity
shadow_attenuation=0, # 0 to 1,
)
plotter.add_light(light)
[docs]
def show_prism_layers(
prisms: list[xr.Dataset] | xr.Dataset,
cmap: str = "viridis",
color_by: str = "density",
region: tuple[float, float, float, float] | None = None,
clip_box: bool = False,
**kwargs: typing.Any,
) -> None:
"""
show prism layers using PyVista
Parameters
----------
prisms : list | xarray.Dataset
either a single harmonica prism layer of list of layers,
cmap : str, optional
matplotlib colorscale to use, by default "viridis"
color_by : str, optional
either use a variable of the prism_layer dataset, typically 'density' or
'thickness', or choose 'constant' to have each layer colored by a unique color
use kwarg `colors` to alter these colors, by default is "density"
region : tuple[float, float, float, float], optional
region to clip the model to, by default None
clip_box : bool, optional
clip a corner out of the model to help visualize, by default False
"""
# Plot with pyvista
plotter = pyvista.Plotter(
lighting="three_lights",
notebook=True,
)
opacity = kwargs.get("opacity")
if isinstance(prisms, xr.Dataset):
prisms = [prisms]
for i, j in enumerate(prisms):
# if region is given, clip model
if region is not None:
j = j.sel( # noqa: PLW2901
easting=slice(region[0], region[1]),
northing=slice(region[2], region[3]),
)
# turn prisms into pyvista object
pv_grid = j.prism_layer.to_pyvista()
# clip corner out of model to help visualize
if clip_box is True:
# extract region from first prism layer
reg = vd.get_region((j.easting.values, j.northing.values))
# box_buffer used make box slightly bigger
box_buffer = kwargs.get("box_buffer", 5e3)
# set 6 edges of cube to clip out
bounds = [
reg[0] - box_buffer,
reg[0] + box_buffer + ((reg[1] - reg[0]) / 2),
reg[2] - box_buffer,
reg[2] + box_buffer + ((reg[3] - reg[2]) / 2),
np.nanmin(j.bottom),
np.nanmax(j.top),
]
pv_grid = pv_grid.clip_box(
bounds,
invert=True,
)
trans = opacity[i] if opacity is not None else None
if color_by == "constant":
colors = kwargs.get(
"colors", ["lavender", "aqua", "goldenrod", "saddlebrown", "black"]
)
plotter.add_mesh(
pv_grid,
color=colors[i],
smooth_shading=kwargs.get("smooth_shading", False),
style=kwargs.get("style", "surface"),
show_edges=kwargs.get("show_edges", False),
opacity=trans,
scalar_bar_args=kwargs.get("scalar_bar_args"),
)
else:
plotter.add_mesh(
pv_grid,
scalars=color_by,
cmap=cmap,
flip_scalars=kwargs.get("flip_scalars", False),
smooth_shading=kwargs.get("smooth_shading", False),
style=kwargs.get("style", "surface"),
show_edges=kwargs.get("show_edges", False),
log_scale=kwargs.get("log_scale", True),
opacity=trans,
scalar_bar_args=kwargs.get("scalar_bar_args"),
)
plotter.set_scale(
zscale=kwargs.get("zscale", 75)
) # exaggerate the vertical coordinate
plotter.camera_position = kwargs.get("camera_position", "xz")
plotter.camera.elevation = kwargs.get("elevation", 20)
plotter.camera.azimuth = kwargs.get("azimuth", -25)
plotter.camera.zoom(kwargs.get("zoom", 1.2))
# Add a ceiling light
add_light(plotter, prisms[i]) # pylint: disable=undefined-loop-variable
if kwargs.get("show_axes", True):
plotter.show_axes()
plotter.show(jupyter_backend=kwargs.get("backend", "client"))
[docs]
def combined_slice(
study: optuna.study.Study,
attribute_names: list[str],
parameter_name: str | None = None,
) -> plotly.graph_objects.Figure:
"""
plot combined slice plots for optimizations.
Parameters
----------
study : optuna.study.Study
the optuna study object
target_names : list[str]
list of names for parameters in the study
Returns
-------
plotly.graph_objects.Figure
a plotly figure
"""
figs = []
names = []
for i, j in enumerate(study.metric_names):
f = optuna.visualization.plot_slice(
study,
params=parameter_name,
target=lambda t: t.values[i], # noqa: B023 # pylint: disable=cell-var-from-loop
target_name=j,
)
if i == 0:
figs.append(f)
names.append(j)
for i in attribute_names: # type: ignore[assignment]
f = optuna.visualization.plot_slice(
study,
params=parameter_name,
target=lambda t: t.user_attrs[i], # noqa: B023 # pylint: disable=cell-var-from-loop
target_name=i,
)
figs.append(f)
names.append(i)
yaxes = {}
for i, j in enumerate(names, start=1):
if i == 1:
pass
else:
yax = plotly.graph_objs.layout.YAxis(
title=j,
overlaying="y",
side="left",
anchor="free",
autoshift=True,
)
yaxes[f"yaxis{i}"] = yax
layout = plotly.graph_objects.Layout(
yaxis1=plotly.graph_objs.layout.YAxis(
title=names[0],
side="right",
),
**yaxes,
)
# Create figure with secondary x-axis
fig = plotly.graph_objects.Figure(layout=layout) # pylint: disable=possibly-used-before-assignment
# Add traces
for i, j in enumerate(names):
fig.add_trace(
plotly.graph_objects.Scatter(
x=figs[i].data[0]["x"],
y=figs[i].data[0]["y"],
name=j,
mode="markers",
yaxis=f"y{i + 1}",
)
)
fig.update_layout(
xaxis=f.layout.xaxis,
title=f.layout.title.text,
)
return fig
[docs]
def plot_stochastic_results(
stats_ds: xr.Dataset,
points: pd.DataFrame | None = None,
region: tuple[float, float, float, float] | None = None,
**kwargs: typing.Any,
) -> None:
"""
Plot the (weighted) standard deviation (uncertainty) and mean of the stochastic
ensemble. Optionally, plot points as well.
Parameters
----------
stats_ds : xarray.Dataset
dataset with the merged inversion results, generate from function
`uncertainty.model_ensemble_stats`.
points : pandas.DataFrame | None, optional
dataframe with points to plot, by default None
Keyword Arguments
-----------------
cmap : str, optional
colormap to use for the ensemble mean, by default "rain"
unit : str, optional
unit of the data, by default "m"
reverse_cpt : bool, optional
reverse the ensemble mean colormap, by default True
label : str, optional
label for the colorbar, by default "ensemble mean"
points_label : str, optional
label for the points, by default None
fig_height : float, optional
height of the figure, by default 12
"""
cmap = kwargs.get("cmap", "viridis")
unit = kwargs.get("unit", "m")
reverse_cpt = kwargs.get("reverse_cpt", True)
label = kwargs.get("label", "ensemble mean")
points_label = kwargs.get("points_label")
fig_height = kwargs.get("fig_height", 12)
try:
stdev = stats_ds.weighted_stdev
weighted = "weighted"
except AttributeError:
stdev = stats_ds.z_stdev
weighted = ""
if region is not None:
stdev = stdev.sel(
easting=slice(region[0], region[1]),
northing=slice(region[2], region[3]),
)
fig = maps.plot_grd(
stdev,
fig_height=fig_height,
cmap="thermal",
robust=True,
hist=True,
cbar_label=f"{label}: {weighted} standard deviation, {unit}",
title="Ensemble uncertainty",
)
if points is not None:
fig.plot(
x=points.easting,
y=points.northing,
fill="black",
style="x.3c",
pen="1p",
label=points_label,
)
fig.legend()
try:
mean = stats_ds.weighted_mean
except AttributeError:
mean = stats_ds.z_mean
if region is not None:
mean = mean.sel(
easting=slice(region[0], region[1]),
northing=slice(region[2], region[3]),
)
fig = maps.plot_grd(
mean,
fig_height=fig_height,
cmap=cmap,
reverse_cpt=reverse_cpt,
robust=True,
hist=True,
cbar_label=f"{label}: {weighted} mean ({unit})",
title="Ensemble mean",
fig=fig,
origin_shift="x",
)
if points is not None:
fig.plot(
x=points.easting,
y=points.northing,
fill="black",
style="x.3c",
pen="1p",
label=points_label,
)
fig.legend()
fig.show()
[docs]
def remove_df_from_hoverdata(
plot: plotly.graph_objects.Figure,
) -> plotly.graph_objects.Figure:
"""
Remove the dataframe from the hoverdata of a plotly plot
Parameters
----------
plot : plotly.graph_objects.Figure
plotly figure
Returns
-------
plotly.graph_objects.Figure
plotly figure with the dataframe removed from the hoverdata
"""
text = []
for s in plot.data[1].text:
sub1 = '<br> "results"'
sub2 = "<br> "
start = s.split(sub1)[0]
end = s.split(sub1)[1]
new_string = start + end.split(sub2)[1]
text.append(new_string)
text = tuple(text) # type: ignore[assignment]
return plot.update_traces(text=text)
[docs]
def plot_latin_hypercube(
params_dict: dict[str, dict[str, typing.Any]],
plot_individual_dists: bool = True,
plot_2d_projections: bool = True,
) -> None:
"""
With a dictionary of parameters and their sampled values, plot the individual
distributions and or the 2D projections of the parameter pairs.
Parameters
----------
params_dict : dict[str, dict[str, typing.Any]]
dictionary of sampled parameter values, can be created manually or from the
output of func:`.uncertainty.create_lhc`
plot_individual_dists : bool, optional
choose to plot distribution of each parameter, by default True
plot_2d_projections : bool, optional
choose to plot the 2D projection of each parameter pair, by default True
"""
df = pd.DataFrame(
[params_dict[x]["sampled_values"] for x in params_dict],
).transpose()
df.columns = params_dict.keys()
# plot individual variables
if plot_individual_dists is True:
_, axes = plt.subplots(
1,
len(df.columns),
figsize=(3 * len(df.columns), 1.8),
)
if len(df.columns) == 1:
axes = [axes]
for i, j in enumerate(df.columns):
sns.kdeplot(
ax=axes[i],
data=df,
x=j,
)
sns.rugplot(ax=axes[i], data=df, x=j, linewidth=2.5, height=0.07)
axes[i].set_xlabel(j.replace("_", " ").capitalize())
axes[i].ticklabel_format(
axis="y",
style="sci",
scilimits=(0, 0),
)
axes[i].set_ylabel(None)
plt.show()
dim = np.shape(df)[1]
param_values = df.values
problem = {
"num_vars": dim,
"names": [i.replace("_", " ") for i in df.columns],
"bounds": [[-1, 1]] * dim,
}
# Rescale to the unit hypercube for the analysis
sample = utils.scale_normalized(param_values, problem["bounds"])
# 2D projection
if plot_2d_projections:
if len(df.columns) == 1:
pass
else:
projection_2d(sample, problem["names"])
[docs]
def projection_2d(
sample: NDArray,
var_names: list[str],
) -> None:
"""
Plots the sample projected on each 2D plane
Parameters
----------
sample : NDArray
The sampled values
var_names : list[str]
The names of the variables
"""
dim = sample.shape[1]
for i in range(dim):
for j in range(dim):
plt.subplot(dim, dim, i * dim + j + 1)
plt.scatter(
sample[:, j],
sample[:, i],
s=2,
)
if j == 0:
plt.ylabel(var_names[i], rotation=0, ha="right")
if i == dim - 1:
plt.xlabel(var_names[j], rotation=20, ha="right")
plt.xticks([])
plt.yticks([])
plt.show()
[docs]
def edge_effects(
grav_ds: xr.Dataset,
prism_layer: xr.DataArray,
inner_region: tuple[float, float, float, float],
plot_profile: bool = True,
) -> None:
"""
Show the gravity edge effects and the percentage decay within the inner region and
optionally a profile across the region.
Parameters
----------
grav_ds : xr.Dataset
the gravity dataset
prism_layer : xr.DataArray
the prism layer
inner_region : tuple[float, float, float, float]
the inside region, where forward gravity is calculated
plot_profile : bool, optional
plot a profile across the region, by default True
"""
# plot profiles
if plot_profile:
data_dict = profiles.make_data_dict(
["forward gravity", "without edge effects"],
[grav_ds.forward, grav_ds.forward_no_edge_effects],
["black", "red"],
)
layers_dict = profiles.make_data_dict(
["surface", "reference"],
[prism_layer.top, prism_layer.bottom],
["blue", "darkorange"],
)
fig, _, _ = profiles.plot_profile(
"points",
start=(inner_region[0], (inner_region[3] - inner_region[2]) / 2),
stop=(inner_region[1], (inner_region[3] - inner_region[2]) / 2),
layers_dict=layers_dict,
data_dict=data_dict,
fill_layers=False,
fig_width=10,
fig_height=8,
data_height=6,
)
fig.show()
dif = grav_ds.forward - grav_ds.forward_no_edge_effects
max_grav = grav_ds.forward.values.max()
percent_decay = 100 * (max_grav - (max_grav + dif)) / max_grav
hist_vals = vd.grid_to_table(percent_decay).reset_index().scalars
# plot histogram of gravity decay values
sns.displot(hist_vals, kde=True, stat="percent")
plt.xlabel("Percent of max forward gravity")
plt.ylabel("Percent")
plt.title("Percent gravity decay within inner region")
plt.show()
# plot gravity and percentage contours
fig = maps.plot_grd(
grav_ds.forward,
cmap="viridis",
region=inner_region,
title="Forward gravity",
cbar_label="mGal",
scalebar=False,
hist=True,
)
fig = maps.plot_grd(
percent_decay,
fig=fig,
origin_shift="x",
cmap="thermal",
region=inner_region,
title="Gravity edge effect",
cbar_label="Percentage decay",
scalebar=False,
hist=True,
)
fig.grdcontour(grid=percent_decay)
fig.show()