"""Visualization for NLSQ fitting results."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
import matplotlib.pyplot as plt
import numpy as np
from heterodyne.utils.logging import get_logger
if TYPE_CHECKING:
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from heterodyne.optimization.nlsq.results import NLSQResult
logger = get_logger(__name__)
try:
from heterodyne.viz.datashader_backend import HAS_DATASHADER as _HAS_DATASHADER
DATASHADER_AVAILABLE = bool(_HAS_DATASHADER)
except ImportError: # pragma: no cover
DATASHADER_AVAILABLE = False
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
def _resolve_color_limits(
matrix: np.ndarray,
percentile_min: float = 1.0,
percentile_max: float = 99.0,
) -> tuple[float, float]:
"""Percentile-based colour limits with fallback for empty/flat data."""
if matrix.size == 0 or not np.any(np.isfinite(matrix)):
return 1.0, 1.5
vmin = float(np.nanpercentile(matrix, percentile_min))
vmax = float(np.nanpercentile(matrix, percentile_max))
if not np.isfinite(vmin):
vmin = 1.0
if not np.isfinite(vmax):
vmax = 1.5
if vmin >= vmax:
vmax = vmin + 1.0
return vmin, vmax
def _save_fig(
fig: Figure,
save_path: Path | str | None,
dpi: int = 150,
) -> None:
"""Save and close a figure when a path is provided."""
if save_path is not None:
fig.savefig(save_path, dpi=dpi, bbox_inches="tight")
logger.info("Figure saved to %s", save_path)
plt.close(fig)
# ---------------------------------------------------------------------------
# Existing plots (kept intact)
# ---------------------------------------------------------------------------
[docs]
def plot_nlsq_fit(
c2_data: np.ndarray,
result: NLSQResult,
t: np.ndarray | None = None,
save_path: Path | str | None = None,
figsize: tuple[float, float] = (15, 5),
) -> Figure:
"""Plot NLSQ fit comparison.
Creates three-panel figure:
1. Experimental data
2. Fitted model
3. Residuals
Args:
c2_data: Experimental correlation data
result: NLSQ fitting result
t: Optional time array
save_path: Optional path to save figure
figsize: Figure size
Returns:
Matplotlib figure
"""
fig, axes = plt.subplots(1, 3, figsize=figsize)
if c2_data.size == 0:
fig.suptitle("No data available")
return fig
t_arr: np.ndarray = t if t is not None else np.arange(c2_data.shape[0])
if t_arr.size == 0:
fig.suptitle("No data available")
return fig
# Shared color scale for exp and fitted panels (hexp convention: clamp to
# [max(1.0, data_min), min(1.5, data_max)] over union of both arrays).
# This makes a poor fit visually obvious — both panels use the same scale.
if result.fitted_correlation is not None:
combined_vals = np.concatenate(
[c2_data.ravel(), result.fitted_correlation.ravel()]
)
else:
combined_vals = c2_data.ravel()
_finite = combined_vals[np.isfinite(combined_vals)]
_data_min = float(np.nanmin(_finite)) if _finite.size > 0 else 1.0
_data_max = float(np.nanmax(_finite)) if _finite.size > 0 else 1.5
vmin_shared = max(1.0, _data_min)
vmax_shared = min(1.5, _data_max)
if vmin_shared >= vmax_shared:
vmax_shared = vmin_shared + 0.5
t_extent = (float(t_arr[0]), float(t_arr[-1]), float(t_arr[0]), float(t_arr[-1]))
# Data plot — origin='lower' places t=0 at bottom-left; extent=[left,right,bottom,top]
im0 = axes[0].imshow(
c2_data,
origin="lower",
extent=t_extent,
aspect="auto",
cmap="jet",
vmin=vmin_shared,
vmax=vmax_shared,
)
axes[0].set_title("Experimental Data")
axes[0].set_xlabel("t₂")
axes[0].set_ylabel("t₁")
plt.colorbar(im0, ax=axes[0], label="c₂")
# Model plot — same color scale as experimental for direct comparison
if result.fitted_correlation is not None:
im1 = axes[1].imshow(
result.fitted_correlation,
origin="lower",
extent=t_extent,
aspect="auto",
cmap="jet",
vmin=vmin_shared,
vmax=vmax_shared,
)
axes[1].set_title("Fitted Model")
axes[1].set_xlabel("t₂")
axes[1].set_ylabel("t₁")
plt.colorbar(im1, ax=axes[1], label="c₂")
else:
axes[1].text(0.5, 0.5, "No fitted correlation", ha="center", va="center")
axes[1].set_title("Fitted Model")
# Residual plot — both arrays are N×N (orchestrator passes full t arrays;
# boundary exclusion is handled at the residual mask, not by truncation).
if result.fitted_correlation is not None:
fc = np.asarray(result.fitted_correlation)
residual_2d = c2_data - fc
_finite = residual_2d[np.isfinite(residual_2d)]
vmax = float(np.percentile(np.abs(_finite), 99)) if _finite.size > 0 else 1.0
if vmax == 0.0:
vmax = 1.0
im2 = axes[2].imshow(
residual_2d,
origin="lower",
extent=t_extent,
aspect="auto",
cmap="RdBu_r",
vmin=-vmax,
vmax=vmax,
)
axes[2].set_title("Residuals")
axes[2].set_xlabel("t₂")
axes[2].set_ylabel("t₁")
plt.colorbar(im2, ax=axes[2], label="Residual")
else:
axes[2].text(0.5, 0.5, "No residuals", ha="center", va="center")
axes[2].set_title("Residuals")
# Add fit statistics
chi2 = result.reduced_chi_squared
stats_text = f"χ²_red = {chi2:.3f}" if chi2 is not None else ""
fig.suptitle(f"NLSQ Fit Results {stats_text}", fontsize=12, fontweight="bold")
plt.tight_layout()
if save_path is not None:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return fig
[docs]
def plot_residual_map(
result: NLSQResult,
c2_data: np.ndarray,
t: np.ndarray | None = None,
save_path: Path | str | None = None,
) -> Figure:
"""Plot detailed residual analysis.
Args:
result: NLSQ result
c2_data: Original data
t: Time array
save_path: Optional save path
Returns:
Matplotlib figure
"""
fig, axes = plt.subplots(2, 2, figsize=(10, 10))
if result.fitted_correlation is None:
fig.suptitle("No fitted correlation available")
return fig
# Both arrays are N×N (orchestrator passes full t arrays; boundary
# exclusion is handled at the residual mask, not by truncation).
fc: np.ndarray = np.asarray(result.fitted_correlation)
residuals: np.ndarray = c2_data - fc
n_t: int = int(residuals.shape[0])
t_arr: np.ndarray = t if (t is not None and len(t) == n_t) else np.arange(n_t)
# 2D residual map
_finite_r = residuals[np.isfinite(residuals)]
vmax = float(np.percentile(np.abs(_finite_r), 99)) if _finite_r.size > 0 else 1.0
if vmax == 0.0:
vmax = 1.0
t_extent_r = (float(t_arr[0]), float(t_arr[-1]), float(t_arr[0]), float(t_arr[-1]))
im = axes[0, 0].imshow(
residuals,
origin="lower",
extent=t_extent_r,
aspect="auto",
cmap="RdBu_r",
vmin=-vmax,
vmax=vmax,
)
axes[0, 0].set_title("Residual Map")
axes[0, 0].set_xlabel("t₂")
axes[0, 0].set_ylabel("t₁")
plt.colorbar(im, ax=axes[0, 0])
# Histogram of residuals (skip if no finite values, e.g. diagonal-masked fallback)
_flat_finite = residuals.ravel()[np.isfinite(residuals.ravel())]
if _flat_finite.size > 0:
axes[0, 1].hist(_flat_finite, bins=50, density=True, alpha=0.7)
else:
axes[0, 1].text(
0.5,
0.5,
"No finite residuals",
ha="center",
va="center",
transform=axes[0, 1].transAxes,
)
axes[0, 1].set_xlabel("Residual Value")
axes[0, 1].set_ylabel("Density")
axes[0, 1].set_title("Residual Distribution")
# Add normal distribution overlay
mu, sigma = np.nanmean(residuals), np.nanstd(residuals)
if sigma > 0 and np.isfinite(sigma):
x = np.linspace(mu - 4 * sigma, mu + 4 * sigma, 100)
axes[0, 1].plot(
x,
np.exp(-((x - mu) ** 2) / (2 * sigma**2)) / (sigma * np.sqrt(2 * np.pi)),
"r-",
lw=2,
label=f"Normal(μ={mu:.2e}, σ={sigma:.2e})",
)
# Only call legend() when an artist with a label was actually
# added — otherwise matplotlib emits a UserWarning.
axes[0, 1].legend()
# Residual along diagonal
diag_residuals = np.diag(residuals)
axes[1, 0].plot(t_arr, diag_residuals, "b-", lw=1)
axes[1, 0].axhline(0, color="k", linestyle="--", alpha=0.5)
axes[1, 0].set_xlabel("Time")
axes[1, 0].set_ylabel("Residual")
axes[1, 0].set_title("Diagonal Residuals")
# Residual vs fitted value
axes[1, 1].scatter(
result.fitted_correlation.ravel(),
residuals.ravel(),
alpha=0.1,
s=1,
)
axes[1, 1].axhline(0, color="r", linestyle="--")
axes[1, 1].set_xlabel("Fitted Value")
axes[1, 1].set_ylabel("Residual")
axes[1, 1].set_title("Residuals vs Fitted")
plt.tight_layout()
if save_path is not None:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return fig
[docs]
def plot_parameter_uncertainties(
result: NLSQResult,
save_path: Path | str | None = None,
) -> Figure:
"""Plot parameter values with uncertainties.
Args:
result: NLSQ result
save_path: Optional save path
Returns:
Matplotlib figure
"""
fig, ax = plt.subplots(figsize=(10, 6))
n_params = len(result.parameter_names)
x = np.arange(n_params)
# Normalize parameters for visualization
params = result.parameters
if result.uncertainties is not None:
errors = np.maximum(result.uncertainties, 0.0)
else:
errors = np.zeros(n_params)
# Plot as errorbar
ax.errorbar(x, params, yerr=errors, fmt="o", capsize=5, markersize=8)
ax.set_xticks(x)
ax.set_xticklabels(result.parameter_names, rotation=45, ha="right")
ax.set_ylabel("Parameter Value")
ax.set_title("Fitted Parameters with Uncertainties")
ax.grid(True, alpha=0.3)
# Use log scale if values span many orders of magnitude
nonzero_params = params[params != 0]
if (
len(nonzero_params) > 0
and np.max(np.abs(params)) / np.min(np.abs(nonzero_params)) > 100
):
ax.set_yscale("symlog")
plt.tight_layout()
if save_path is not None:
fig.savefig(save_path, dpi=150, bbox_inches="tight")
plt.close(fig)
return fig
# ---------------------------------------------------------------------------
# New plots
# ---------------------------------------------------------------------------
[docs]
def plot_fit_surface(
c2_exp: np.ndarray,
c2_fit: np.ndarray,
times: np.ndarray,
phi_angle: float | None = None,
save_path: Path | str | None = None,
figsize: tuple[float, float] = (14, 5),
dpi: int = 150,
) -> Figure:
"""Side-by-side 2D heatmaps: experimental data, fit, and residuals.
Three panels are rendered with per-panel percentile-based colour
scaling so that structure is visible in both the experimental and
fitted matrices even when their absolute ranges differ.
Args:
c2_exp: Experimental correlation matrix, shape ``(n_t, n_t)``.
c2_fit: Fitted model correlation matrix, shape ``(n_t, n_t)``.
times: 1-D time axis of length ``n_t`` (seconds).
phi_angle: Optional azimuthal angle in degrees for the title.
save_path: Optional path to save the figure.
figsize: Figure size ``(width, height)``.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If ``c2_exp`` and ``c2_fit`` have different shapes.
"""
c2_exp = np.asarray(c2_exp)
c2_fit = np.asarray(c2_fit)
if c2_exp.shape != c2_fit.shape:
raise ValueError(
f"c2_exp shape {c2_exp.shape} does not match c2_fit shape {c2_fit.shape}"
)
residual = c2_exp - c2_fit
# extent=[left, right, bottom, top]; with origin='lower' and .T, maps (0,0) to bottom-left
t_extent = [times[0], times[-1], times[0], times[-1]]
fig, axes = plt.subplots(1, 3, figsize=figsize)
vmin_exp, vmax_exp = _resolve_color_limits(c2_exp)
vmin_fit, vmax_fit = _resolve_color_limits(c2_fit)
vmax_res = float(np.nanpercentile(np.abs(residual), 99))
if not np.isfinite(vmax_res) or vmax_res == 0:
vmax_res = 0.01
im0 = axes[0].imshow(
c2_exp.T,
origin="lower",
aspect="equal",
cmap="jet",
extent=t_extent,
vmin=vmin_exp,
vmax=vmax_exp,
)
phi_str = f" φ={phi_angle:.1f}°" if phi_angle is not None else ""
axes[0].set_title(f"Experimental C₂{phi_str}", fontsize=11)
axes[0].set_xlabel("t₁ (s)")
axes[0].set_ylabel("t₂ (s)")
plt.colorbar(im0, ax=axes[0], label="C₂", shrink=0.85)
im1 = axes[1].imshow(
c2_fit.T,
origin="lower",
aspect="equal",
cmap="jet",
extent=t_extent,
vmin=vmin_fit,
vmax=vmax_fit,
)
axes[1].set_title(f"Fitted C₂{phi_str}", fontsize=11)
axes[1].set_xlabel("t₁ (s)")
axes[1].set_ylabel("t₂ (s)")
plt.colorbar(im1, ax=axes[1], label="C₂", shrink=0.85)
im2 = axes[2].imshow(
residual.T,
origin="lower",
aspect="equal",
cmap="RdBu_r",
extent=t_extent,
vmin=-vmax_res,
vmax=vmax_res,
)
axes[2].set_title(f"Residuals (exp - fit){phi_str}", fontsize=11)
axes[2].set_xlabel("t₁ (s)")
axes[2].set_ylabel("t₂ (s)")
plt.colorbar(im2, ax=axes[2], label="ΔC₂", shrink=0.85)
rms = float(np.sqrt(np.nanmean(residual**2)))
fig.suptitle(
f"NLSQ Fit Surface [RMS residual = {rms:.4f}]",
fontsize=12,
fontweight="bold",
)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
[docs]
def plot_per_angle_residuals(
residuals: np.ndarray,
phi_angles: np.ndarray,
times: np.ndarray,
save_path: Path | str | None = None,
figsize: tuple[float, float] | None = None,
dpi: int = 150,
) -> Figure:
"""Per-angle 2D residual heatmap.
Renders one heatmap panel per phi angle, showing the signed residual
``(experimental - fitted)`` on a symmetric diverging colour scale.
Args:
residuals: 3-D array of shape ``(n_angles, n_t, n_t)``.
phi_angles: 1-D array of azimuthal angles in degrees, length
``n_angles``.
times: 1-D time axis of length ``n_t`` (seconds).
save_path: Optional path to save the figure.
figsize: Figure size. Auto-computed when ``None``.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If ``residuals.shape[0]`` differs from
``len(phi_angles)``.
"""
residuals = np.asarray(residuals)
phi_angles = np.asarray(phi_angles)
if residuals.ndim != 3:
raise ValueError(
f"residuals must be 3-D (n_angles, n_t, n_t), got shape {residuals.shape}"
)
if residuals.shape[0] != len(phi_angles):
raise ValueError(
f"residuals first dimension {residuals.shape[0]} "
f"does not match phi_angles length {len(phi_angles)}"
)
n_angles = len(phi_angles)
ncols = min(4, n_angles)
nrows = (n_angles + ncols - 1) // ncols
if figsize is None:
figsize = (4 * ncols, 3.8 * nrows)
fig, axes = plt.subplots(nrows, ncols, figsize=figsize, squeeze=False)
axes_flat = axes.ravel()
# Shared colour scale across all angles
global_vmax = float(np.nanpercentile(np.abs(residuals), 99))
if not np.isfinite(global_vmax) or global_vmax == 0:
global_vmax = 0.01
t_extent = (times[0], times[-1], times[-1], times[0])
for angle_idx in range(n_angles):
ax: Axes = axes_flat[angle_idx]
im = ax.imshow(
residuals[angle_idx].T,
origin="lower",
aspect="equal",
cmap="RdBu_r",
extent=t_extent,
vmin=-global_vmax,
vmax=global_vmax,
)
ax.set_title(f"φ = {phi_angles[angle_idx]:.1f}°", fontsize=10)
ax.set_xlabel("t₁ (s)", fontsize=8)
ax.set_ylabel("t₂ (s)", fontsize=8)
plt.colorbar(im, ax=ax, label="ΔC₂", shrink=0.8)
for idx in range(n_angles, len(axes_flat)):
axes_flat[idx].set_visible(False)
fig.suptitle(
"Per-angle residuals (experimental − fitted)",
fontsize=13,
fontweight="bold",
)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
[docs]
def plot_parameter_evolution(
history: list[dict[str, Any]],
param_names: list[str],
save_path: Path | str | None = None,
figsize: tuple[float, float] | None = None,
dpi: int = 150,
) -> Figure:
"""Parameter values across multistart optimisation attempts.
Displays the trajectory of each parameter's best-so-far value and
the individual starting-point results as scatter dots, making it
easy to see whether multistart has converged on a consistent
solution.
Args:
history: List of result records, one per multistart attempt
(ordered by attempt index). Each record must contain:
- ``"params"`` – 1-D array of parameter values, same length
as ``param_names``.
- ``"loss"`` – scalar loss value for this attempt.
- ``"converged"`` – bool flag (optional, default ``True``).
param_names: Parameter names corresponding to the ``"params"``
arrays.
save_path: Optional save path.
figsize: Figure size. Auto-computed when ``None``.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If ``history`` is empty or ``param_names`` is empty.
"""
if not history:
raise ValueError("history must be non-empty")
if not param_names:
raise ValueError("param_names must be non-empty")
n_params = len(param_names)
n_attempts = len(history)
attempt_idx = np.arange(n_attempts)
ncols = min(3, n_params)
nrows = (n_params + ncols - 1) // ncols
if figsize is None:
figsize = (5 * ncols, 3 * nrows)
fig, axes = plt.subplots(nrows, ncols, figsize=figsize, squeeze=False)
axes_flat = axes.ravel()
losses = np.array([float(r.get("loss", np.nan)) for r in history])
converged_flags = np.array([bool(r.get("converged", True)) for r in history])
for p_idx, pname in enumerate(param_names):
ax: Axes = axes_flat[p_idx]
param_vals = np.array(
[
float(np.asarray(r["params"])[p_idx]) if "params" in r else np.nan
for r in history
]
)
# Best-so-far trajectory
best_so_far = np.full(n_attempts, np.nan)
best_loss = np.inf
best_val = np.nan
for i in range(n_attempts):
if np.isfinite(losses[i]) and losses[i] < best_loss:
best_loss = losses[i]
best_val = param_vals[i]
best_so_far[i] = best_val
ax.plot(attempt_idx, best_so_far, "k-", lw=1.5, label="Best so far", zorder=3)
# Individual attempts
conv_mask = converged_flags
div_mask = ~converged_flags
if np.any(conv_mask):
ax.scatter(
attempt_idx[conv_mask],
param_vals[conv_mask],
s=20,
alpha=0.6,
color="steelblue",
zorder=4,
label="Converged",
)
if np.any(div_mask):
ax.scatter(
attempt_idx[div_mask],
param_vals[div_mask],
s=20,
alpha=0.6,
color="tomato",
marker="x",
zorder=4,
label="Not converged",
)
ax.set_xlabel("Attempt index")
ax.set_ylabel(pname)
ax.set_title(f"{pname}", fontweight="bold")
ax.grid(True, alpha=0.3)
if p_idx == 0:
ax.legend(fontsize=7)
for idx in range(n_params, len(axes_flat)):
axes_flat[idx].set_visible(False)
fig.suptitle(
f"Parameter evolution across {n_attempts} multistart attempts",
fontsize=12,
fontweight="bold",
)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
[docs]
def plot_scaling_comparison(
solver_scaling: np.ndarray,
lstsq_scaling: np.ndarray,
phi_angles: np.ndarray,
param_labels: tuple[str, str] = ("contrast", "offset"),
save_path: Path | str | None = None,
figsize: tuple[float, float] = (10, 5),
dpi: int = 150,
) -> Figure:
"""Contrast and offset comparison: solver vs least-squares post-hoc.
Side-by-side bar charts compare the contrast and offset values
produced by the nonlinear solver with those derived from the
post-hoc linear least-squares scaling step. Significant
disagreement may indicate a poorly conditioned solver or
overfitting.
Args:
solver_scaling: Array of shape ``(n_angles, 2)`` containing
``[contrast, offset]`` from the nonlinear solver for each
phi angle.
lstsq_scaling: Array of shape ``(n_angles, 2)`` containing
``[contrast, offset]`` from the least-squares scaling step.
phi_angles: 1-D array of phi angles in degrees, length
``n_angles``.
param_labels: Labels for the two scaling parameters. Default
is ``("contrast", "offset")``.
save_path: Optional save path.
figsize: Figure size.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If array shapes are inconsistent.
"""
solver_scaling = np.asarray(solver_scaling)
lstsq_scaling = np.asarray(lstsq_scaling)
phi_angles = np.asarray(phi_angles)
n_angles = len(phi_angles)
for arr, name in (
(solver_scaling, "solver_scaling"),
(lstsq_scaling, "lstsq_scaling"),
):
if arr.shape != (n_angles, 2):
raise ValueError(f"{name} must have shape ({n_angles}, 2), got {arr.shape}")
fig, axes = plt.subplots(1, 2, figsize=figsize)
x = np.arange(n_angles)
width = 0.35
for col_idx, label in enumerate(param_labels):
ax: Axes = axes[col_idx]
solver_vals = solver_scaling[:, col_idx]
lstsq_vals = lstsq_scaling[:, col_idx]
ax.bar(
x - width / 2,
solver_vals,
width,
label="Solver",
alpha=0.75,
color="steelblue",
)
ax.bar(
x + width / 2,
lstsq_vals,
width,
label="Least-squares",
alpha=0.75,
color="orange",
)
ax.set_xticks(x)
ax.set_xticklabels(
[f"{p:.1f}°" for p in phi_angles], rotation=45, ha="right", fontsize=8
)
ax.set_xlabel("phi angle")
ax.set_ylabel(label)
ax.set_title(f"{label} — solver vs least-squares", fontweight="bold")
ax.legend(fontsize=8)
ax.grid(True, axis="y", alpha=0.3)
fig.suptitle(
"Scaling parameter comparison per angle", fontsize=12, fontweight="bold"
)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
[docs]
def plot_chi_squared_landscape(
chi2_values: np.ndarray,
param_values: np.ndarray,
param_name: str,
best_value: float | None = None,
save_path: Path | str | None = None,
figsize: tuple[float, float] = (8, 5),
dpi: int = 150,
) -> Figure:
"""1-D chi-squared profile plot for a single parameter.
Displays how the reduced chi-squared changes as one parameter is
swept through a range of values (all other parameters fixed at
their best-fit values). The minimum of the profile locates the
best-fit value; the width at ``Δχ² = 1`` gives the 68% confidence
interval for that parameter.
Args:
chi2_values: 1-D array of reduced chi-squared values, length
``n_sweep``.
param_values: 1-D array of parameter values swept, length
``n_sweep``.
param_name: Human-readable parameter name for axis labels.
best_value: Optional known best-fit value; if provided, a
vertical dashed line is drawn at this position.
save_path: Optional save path.
figsize: Figure size.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If ``chi2_values`` and ``param_values`` have
different lengths.
"""
chi2_values = np.asarray(chi2_values, dtype=float)
param_values = np.asarray(param_values, dtype=float)
if chi2_values.shape != param_values.shape:
raise ValueError(
f"chi2_values length {chi2_values.shape} does not match "
f"param_values length {param_values.shape}"
)
fig, ax = plt.subplots(figsize=figsize)
ax.plot(param_values, chi2_values, "o-", color="steelblue", ms=4, lw=1.5)
# Mark the minimum
min_idx = int(np.nanargmin(chi2_values))
ax.scatter(
param_values[min_idx],
chi2_values[min_idx],
color="red",
zorder=5,
s=80,
label=f"Min χ² = {chi2_values[min_idx]:.3f} at {param_values[min_idx]:.4g}",
)
# Delta chi2 = 1 band
chi2_min = float(np.nanmin(chi2_values))
ax.axhline(
chi2_min + 1, color="green", linestyle="--", lw=1.5, label="χ²_min + 1 (68% CI)"
)
if best_value is not None:
ax.axvline(
best_value,
color="orange",
linestyle=":",
lw=2,
label=f"Best fit = {best_value:.4g}",
)
ax.set_xlabel(param_name)
ax.set_ylabel("Reduced χ²")
ax.set_title(f"χ² landscape — {param_name}", fontweight="bold")
ax.legend(fontsize=8)
ax.grid(True, alpha=0.3)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
[docs]
def plot_multistart_summary(
results: list[dict[str, Any]],
save_path: Path | str | None = None,
figsize: tuple[float, float] = (12, 5),
dpi: int = 150,
) -> Figure:
"""Loss distribution and convergence summary across multistart attempts.
Three panels:
1. **Loss histogram** — distribution of final loss values across all
starts, with the best loss marked.
2. **Loss rank plot** — loss values sorted in ascending order to
reveal the shape of the basin landscape.
3. **Convergence fraction** — pie chart of converged vs non-converged
starts.
Args:
results: List of result dictionaries, one per multistart
attempt. Each record must contain:
- ``"loss"`` – scalar final loss.
- ``"converged"`` – bool convergence flag.
save_path: Optional save path.
figsize: Figure size.
dpi: Resolution for saved figures.
Returns:
Matplotlib figure.
Raises:
ValueError: If ``results`` is empty.
"""
if not results:
raise ValueError("results must be non-empty")
losses = np.array([float(r.get("loss", np.nan)) for r in results])
converged = np.array([bool(r.get("converged", True)) for r in results])
finite_losses = losses[np.isfinite(losses)]
n_total = len(results)
n_converged = int(np.sum(converged))
n_not_converged = n_total - n_converged
fig, axes = plt.subplots(1, 3, figsize=figsize)
# Panel 1: loss histogram
ax0: Axes = axes[0]
if len(finite_losses) > 0:
bins = min(30, max(5, len(finite_losses) // 2))
ax0.hist(
finite_losses, bins=bins, color="steelblue", alpha=0.75, edgecolor="white"
)
best_loss = float(np.nanmin(finite_losses))
ax0.axvline(
best_loss,
color="red",
lw=2,
linestyle="--",
label=f"Best = {best_loss:.4g}",
)
ax0.legend(fontsize=8)
ax0.set_xlabel("Final loss")
ax0.set_ylabel("Count")
ax0.set_title("Loss distribution", fontweight="bold")
ax0.grid(True, alpha=0.3)
# Panel 2: sorted loss rank plot
ax1: Axes = axes[1]
if len(finite_losses) > 0:
sorted_losses = np.sort(finite_losses)
ax1.plot(
np.arange(1, len(sorted_losses) + 1),
sorted_losses,
"o-",
ms=4,
color="steelblue",
)
ax1.set_xlabel("Rank (1 = best)")
ax1.set_ylabel("Loss")
ax1.set_title("Sorted loss (rank plot)", fontweight="bold")
ax1.grid(True, alpha=0.3)
else:
ax1.text(
0.5,
0.5,
"No finite losses",
ha="center",
va="center",
transform=ax1.transAxes,
)
# Panel 3: convergence pie
ax2: Axes = axes[2]
if n_total > 0:
pie_vals = [n_converged, n_not_converged]
pie_labels = [
f"Converged\n({n_converged})",
f"Not converged\n({n_not_converged})",
]
pie_colors = ["green", "tomato"]
non_zero = [
(v, lab, c)
for v, lab, c in zip(pie_vals, pie_labels, pie_colors, strict=True)
if v > 0
]
if non_zero:
nz_vals, nz_labels, nz_colors = zip(*non_zero, strict=True)
ax2.pie(
nz_vals,
labels=nz_labels,
colors=nz_colors,
autopct="%1.0f%%",
startangle=90,
)
ax2.set_title(
f"Convergence ({n_converged}/{n_total})",
fontweight="bold",
)
fig.suptitle(
f"Multistart summary [{n_total} attempts]",
fontsize=12,
fontweight="bold",
)
plt.tight_layout()
_save_fig(fig, save_path, dpi=dpi)
return fig
# ---------------------------------------------------------------------------
# Homodyne-parity functions: simulated data, 3-panel heatmaps, fitted sims
# ---------------------------------------------------------------------------
[docs]
def plot_simulated_data(
config: dict[str, Any],
contrast: float,
offset: float,
phi_angles_str: str | None,
plots_dir: Path,
data: dict[str, Any] | None = None,
) -> None:
"""Generate heatmap plots of theoretical/simulated C2 data.
Constructs the heterodyne model from *config*, evaluates the forward
model for each phi angle, and saves per-angle heatmaps plus a
combined diagonal plot.
Args:
config: Configuration dictionary with model parameters.
contrast: Optical contrast for scaling.
offset: Baseline offset.
phi_angles_str: Comma-separated phi angles (e.g. ``"0,45,90"``).
plots_dir: Output directory for plots.
data: Optional experimental data dict for extracting phi angles.
"""
plots_dir.mkdir(parents=True, exist_ok=True)
from heterodyne.core.heterodyne_model import HeterodyneModel
try:
model = HeterodyneModel.from_config(config)
except Exception:
logger.exception("Cannot create model from config — skipping simulated plots")
return
# Determine phi angles
if phi_angles_str is not None:
phi_list = [float(x.strip()) for x in phi_angles_str.split(",")]
elif data is not None:
phi_raw = data.get("phi_angles_list", data.get("phi_angles"))
if phi_raw is not None:
phi_list = list(np.asarray(phi_raw, dtype=float))
else:
phi_list = list(np.linspace(0, 180, 8))
else:
phi_list = list(np.linspace(0, 180, 8))
# Evaluate the model on its configured fitting grid. Experimental elapsed
# axes may start at t=0 and are display-only; feeding them into the model can
# create artificial values for time-dependent parameters.
if hasattr(model, "t"):
t_model = np.asarray(model.t, dtype=float)
else:
t_model = np.arange(1, 101, dtype=float)
# Use display time arrays for extent/labels only.
t_extent = t_model
if data is not None:
t1_orig = data.get("t1_original")
if t1_orig is not None:
t_extent = np.asarray(t1_orig, dtype=float)
else:
t1_raw = data.get("t1")
if t1_raw is not None:
t_extent = np.asarray(t1_raw, dtype=float)
n_t = len(t_model)
# extent: [left=t1_min, right=t1_max, bottom=t2_min, top=t2_max]
# Used with .T so x-axis=t1, y-axis=t2 (parity with homodyne)
extent = (
float(t_extent[0]),
float(t_extent[-1]),
float(t_extent[0]),
float(t_extent[-1]),
)
# Collect simulated C2 for each angle
c2_all: list[np.ndarray] = []
for phi_deg in phi_list:
try:
c2_scaled = np.asarray(
model.compute_correlation(
phi_angle=phi_deg, contrast=contrast, offset=offset
)
)
except Exception:
logger.warning("Forward model failed for phi=%.1f; using zeros", phi_deg)
c2_scaled = np.ones((n_t, n_t)) * offset
c2_all.append(c2_scaled)
# Global color scale (parity with homodyne: clamp to [1.0, 1.6])
all_vals = np.concatenate([c.ravel() for c in c2_all])
c2_min = float(np.nanmin(all_vals)) if all_vals.size > 0 else 1.0
c2_max = float(np.nanmax(all_vals)) if all_vals.size > 0 else 1.6
if not np.isfinite(c2_min) or not np.isfinite(c2_max):
c2_min, c2_max = 1.0, 1.6
vmin = max(1.0, c2_min)
vmax = min(1.6, c2_max)
logger.debug(
"Simulated C2 range [%.4f, %.4f] -> color scale [%.4f, %.4f]",
c2_min,
c2_max,
vmin,
vmax,
)
# Per-angle heatmaps
for _i, (phi_deg, c2_mat) in enumerate(zip(phi_list, c2_all, strict=True)):
fig, ax = plt.subplots(figsize=(8, 7))
im = ax.imshow(
c2_mat.T,
extent=extent,
aspect="equal",
cmap="jet",
vmin=vmin,
vmax=vmax,
origin="lower",
)
ax.set_xlabel("t₁ (s)", fontsize=11)
ax.set_ylabel("t₂ (s)", fontsize=11)
ax.set_title(
f"Simulated C₂(t₁, t₂) at φ={phi_deg:.1f}°",
fontsize=13,
fontweight="bold",
)
cbar = plt.colorbar(im, ax=ax, label="C₂", shrink=0.9)
cbar.ax.tick_params(labelsize=9)
mean_val = np.nanmean(c2_mat)
max_val = np.nanmax(c2_mat)
min_val = np.nanmin(c2_mat)
stats_text = f"Mean: {mean_val:.4f}\nRange: [{min_val:.4f}, {max_val:.4f}]"
ax.text(
0.02,
0.98,
stats_text,
transform=ax.transAxes,
fontsize=9,
verticalalignment="top",
bbox={"boxstyle": "round", "facecolor": "white", "alpha": 0.8},
)
mode_text = f"Contrast: {contrast:.3f}\nOffset: {offset:.3f}"
ax.text(
0.02,
0.02,
mode_text,
transform=ax.transAxes,
fontsize=8,
verticalalignment="bottom",
bbox={"boxstyle": "round", "facecolor": "lightblue", "alpha": 0.7},
)
plt.tight_layout()
fig.savefig(
plots_dir / f"simulated_data_phi_{phi_deg:.1f}.png",
dpi=150,
bbox_inches="tight",
)
plt.close(fig)
# Diagonal comparison plot (t1=t2 diagonal)
fig, ax = plt.subplots(figsize=(10, 6))
for idx, (phi_deg, c2_mat) in enumerate(zip(phi_list, c2_all, strict=True)):
if idx >= 10:
break
diag = np.diag(c2_mat)
t_diag = (
t_extent[: len(diag)]
if len(t_extent) >= len(diag)
else t_model[: len(diag)]
)
ax.plot(t_diag, diag, label=f"φ={phi_deg:.1f}°", alpha=0.7, linewidth=2)
ax.set_xlabel("Time t (s)", fontsize=12)
ax.set_ylabel("C₂(t, t)", fontsize=12)
ax.set_title(
f"Simulated C₂ Along Diagonal (t₁=t₂)\n(contrast={contrast:.3f}, offset={offset:.3f})",
fontsize=13,
fontweight="bold",
)
ax.legend(loc="best", fontsize=9, ncol=2)
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(plots_dir / "simulated_data_diagonal.png", dpi=150, bbox_inches="tight")
plt.close(fig)
logger.info("Generated diagonal plot: simulated_data_diagonal.png")
logger.info("Saved %d simulated C2 heatmaps + diagonal", len(phi_list))
[docs]
def generate_nlsq_plots(
phi_angles: np.ndarray,
c2_exp: np.ndarray,
c2_theoretical_scaled: np.ndarray,
residuals: np.ndarray,
t1: np.ndarray,
t2: np.ndarray,
output_dir: Path,
config: Any = None,
use_datashader: bool = True,
parallel: bool = True,
*,
c2_solver_scaled: np.ndarray | None = None,
) -> None:
"""Generate 3-panel heatmaps (experimental | fit | residuals) per angle.
For each phi angle, creates a ``1x3`` subplot figure showing
experimental data, theoretical fit, and residuals side by side.
Args:
phi_angles: Scattering angles in degrees, shape ``(n_angles,)``.
c2_exp: Experimental correlation, shape ``(n_angles, n_t1, n_t2)``.
c2_theoretical_scaled: Scaled theoretical fits, same shape.
residuals: Residuals ``exp - fit``, same shape.
t1: First time array in seconds.
t2: Second time array in seconds.
output_dir: Directory for output PNG files.
config: Optional config for color scaling options.
use_datashader: When True and Datashader backend is available,
emit a rasterized ``nlsq_overview_datashader.png`` covering
all angles (fast preview for large matrices). The per-angle
matplotlib panels are still produced regardless.
parallel: Reserved for future parallel generation.
c2_solver_scaled: Optional solver-computed C2 to display instead
of *c2_theoretical_scaled*.
"""
output_dir.mkdir(parents=True, exist_ok=True)
phi_angles = np.asarray(phi_angles)
c2_fit = c2_solver_scaled if c2_solver_scaled is not None else c2_theoretical_scaled
# Datashader overview for large matrices (single rasterized grid across all angles).
if (
use_datashader
and DATASHADER_AVAILABLE
and c2_exp.ndim == 3
and c2_exp.size >= 512 * 512
):
try:
from heterodyne.viz.datashader_backend import render_multi_angle_grid
img: Any = render_multi_angle_grid(c2_exp, phi_angles, t1)
overview_path = output_dir / "nlsq_overview_datashader.png"
if hasattr(img, "save"):
img.save(overview_path) # PIL Image path
else:
plt.imsave(overview_path, np.asarray(img))
logger.info("Datashader overview saved to %s", overview_path)
except (ValueError, RuntimeError, OSError, MemoryError) as exc:
logger.warning("Datashader overview failed (%s); skipping", exc)
# extent with .T: [left=t1_min, right=t1_max, bottom=t2_min, top=t2_max]
extent = [float(t1[0]), float(t1[-1]), float(t2[0]), float(t2[-1])]
for i, phi_deg in enumerate(phi_angles):
if i >= c2_exp.shape[0]:
break
exp_i = c2_exp[i]
fit_i = c2_fit[i] if c2_fit.ndim == 3 else c2_fit
res_i = residuals[i] if residuals.ndim == 3 else residuals
fig, axes = plt.subplots(1, 3, figsize=(15, 5))
# Shared color scale for experimental and fitted panels (hexp convention:
# clamp to [1.0, 1.5] using the union of both data ranges for direct comparison)
combined = np.concatenate([exp_i.ravel(), fit_i.ravel()])
_data_min = (
float(np.nanmin(combined[np.isfinite(combined)]))
if np.isfinite(combined).any()
else 1.0
)
_data_max = (
float(np.nanmax(combined[np.isfinite(combined)]))
if np.isfinite(combined).any()
else 1.5
)
vmin_shared = max(1.0, _data_min)
vmax_shared = min(1.5, _data_max)
if vmin_shared >= vmax_shared:
vmax_shared = vmin_shared + 0.5
# Panel 1: Experimental
im0 = axes[0].imshow(
exp_i.T,
extent=extent,
aspect="equal",
cmap="jet",
vmin=vmin_shared,
vmax=vmax_shared,
origin="lower",
)
axes[0].set_title(f"Experimental C₂ (φ={float(phi_deg):.1f}°)", fontsize=12)
axes[0].set_xlabel("t₁ (s)", fontsize=10)
axes[0].set_ylabel("t₂ (s)", fontsize=10)
cbar0 = plt.colorbar(im0, ax=axes[0], label="C₂(t₁,t₂)")
cbar0.ax.tick_params(labelsize=8)
# Panel 2: Fit — same scale as experimental for direct comparison
im1 = axes[1].imshow(
fit_i.T,
extent=extent,
aspect="equal",
cmap="jet",
vmin=vmin_shared,
vmax=vmax_shared,
origin="lower",
)
axes[1].set_title(f"Fitted C₂ (φ={float(phi_deg):.1f}°)", fontsize=12)
axes[1].set_xlabel("t₁ (s)", fontsize=10)
axes[1].set_ylabel("t₂ (s)", fontsize=10)
cbar1 = plt.colorbar(im1, ax=axes[1], label="C₂(t₁,t₂)")
cbar1.ax.tick_params(labelsize=8)
# Panel 3: Residuals — symmetric RdBu_r scale (matching hexp/plot_fit_surface)
vmax_res = float(np.nanpercentile(np.abs(res_i), 99))
if not np.isfinite(vmax_res) or vmax_res == 0:
vmax_res = 0.01
im2 = axes[2].imshow(
res_i.T,
extent=extent,
aspect="equal",
cmap="RdBu_r",
vmin=-vmax_res,
vmax=vmax_res,
origin="lower",
)
axes[2].set_title(f"Residuals (φ={float(phi_deg):.1f}°)", fontsize=12)
axes[2].set_xlabel("t₁ (s)", fontsize=10)
axes[2].set_ylabel("t₂ (s)", fontsize=10)
cbar2 = plt.colorbar(im2, ax=axes[2], label="ΔC₂")
cbar2.ax.tick_params(labelsize=8)
plt.tight_layout()
fig.savefig(
output_dir / f"c2_heatmaps_phi_{float(phi_deg):.1f}deg.png",
dpi=300,
bbox_inches="tight",
)
plt.close(fig)
logger.info("Saved %d 3-panel heatmap(s) to %s", len(phi_angles), output_dir)
[docs]
def generate_and_plot_fitted_simulations(
result: Any,
data: dict[str, Any],
config: dict[str, Any],
output_dir: Path,
angle_filter_func: Any | None = None,
) -> None:
"""Generate forward simulations from fitted parameters and plot.
Uses the fitted parameters from *result* to evaluate the heterodyne
model, saves the simulated C2 to NPZ, and generates per-angle
heatmap plots.
Args:
result: Optimization result with fitted parameters.
data: Experimental data dictionary.
config: Configuration dictionary.
output_dir: Root output directory (``simulated_data/`` subdirectory
is created automatically).
angle_filter_func: Optional angle filter applied to data before
simulation.
"""
sim_dir = output_dir / "simulated_data"
sim_dir.mkdir(parents=True, exist_ok=True)
from heterodyne.core.heterodyne_model import HeterodyneModel
try:
model = HeterodyneModel.from_config(config)
except Exception:
logger.exception("Cannot create model — skipping fitted simulation plots")
return
# Apply optional angle filtering
if angle_filter_func is not None:
try:
phi_raw = data.get("phi_angles_list", data.get("phi_angles"))
c2_raw = data.get("c2_exp", data.get("c2"))
if phi_raw is not None and c2_raw is not None:
_, filtered_phi, filtered_c2 = angle_filter_func(
np.asarray(phi_raw), np.asarray(c2_raw), data
)
data = {**data, "phi_angles_list": filtered_phi, "c2_exp": filtered_c2}
except Exception:
logger.warning("Angle filtering failed; using unfiltered data")
phi_angles = data.get("phi_angles_list", data.get("phi_angles"))
if phi_angles is None:
phi_angles = [0.0]
phi_angles = np.asarray(phi_angles, dtype=float)
t = np.asarray(model.t) if hasattr(model, "t") else None
if t is None:
t1_raw = data.get("t1")
t = np.asarray(t1_raw) if t1_raw is not None else np.arange(1, 101, dtype=float)
n_t = len(t)
# extent with .T: [left=t1_min, right=t1_max, bottom=t2_min, top=t2_max]
extent = (float(t[0]), float(t[-1]), float(t[0]), float(t[-1]))
# Extract fitted parameters from NLSQResult (no dedicated contrast/offset attrs)
params = getattr(result, "parameters", None)
if params is None:
params = getattr(result, "mean_params", None)
if params is None:
logger.warning("No fitted parameters found in result — skipping")
return
params_d = (
dict(zip(result.parameter_names, result.parameters, strict=True))
if hasattr(result, "parameter_names") and result.parameter_names
else {}
)
# Contrast/offset live in result.metadata (set by the multi-angle NLSQ solver),
# not in parameter_names (which only contains the 14 physics params).
# Fall back to params_d in case a CMC result embeds them there, then to defaults.
metadata: dict[str, Any] = getattr(result, "metadata", None) or {}
_c_raw = params_d.get("contrast")
contrast = float(_c_raw if _c_raw is not None else metadata.get("contrast", 0.3))
_o_raw = params_d.get("offset")
offset_val = float(_o_raw if _o_raw is not None else metadata.get("offset", 1.0))
# Physics params: all fitted params except scaling
physics_names = [
n
for n in getattr(result, "parameter_names", [])
if n not in ("contrast", "offset")
]
physics_params = (
np.array([float(params_d[n]) for n in physics_names])
if physics_names
else np.asarray(params)
)
logger.info(
"Using fitted parameters: contrast=%.4f, offset=%.4f", contrast, offset_val
)
# Warn if the velocity creates aliased strip oscillations (phase > π per time step).
# When q·cos(φ)·v·dt > π the heterodyne strips are Nyquist-undersampled and invisible.
_dt_val = float(t[1] - t[0]) if len(t) > 1 else 1.0
_q_val = float(getattr(model, "q", 0.0))
_v0 = float(params_d.get("v0") or metadata.get("v0") or 0.0)
_beta = float(params_d.get("beta") or metadata.get("beta") or 0.0)
if _q_val > 0 and _v0 != 0 and len(phi_angles) > 0:
_t0 = float(t[0])
_v_t0 = abs(_v0) * (_t0**_beta) if _beta < 0 and _t0 > 0 else abs(_v0)
_max_phi_cos = float(np.max(np.abs(np.cos(np.deg2rad(phi_angles)))))
_phase_per_dt = _q_val * _max_phi_cos * _v_t0 * _dt_val
if _phase_per_dt > np.pi:
logger.warning(
"Heterodyne strip pattern is Nyquist-aliased: max phase/dt ≈ %.2f rad "
"(> π). With v(t₀)≈%.0f Å/s, q=%.4f Å⁻¹, dt=%.4f s, strips are "
"sub-pixel at the first time point and may be invisible in the plot. "
"Consider using a finer time grid (smaller dt) to resolve the strips.",
_phase_per_dt,
_v_t0,
_q_val,
_dt_val,
)
# Phase 1: compute all fitted C2 matrices
c2_fitted_all: list[np.ndarray] = []
for phi_deg in phi_angles:
try:
c2_scaled = np.asarray(
model.compute_correlation(
phi_angle=float(phi_deg),
params=physics_params,
contrast=contrast,
offset=offset_val,
)
)
except Exception:
logger.warning("Forward model failed for phi=%.1f", phi_deg)
c2_scaled = np.ones((n_t, n_t)) * offset_val
c2_fitted_all.append(c2_scaled)
# Global color scale: use percentile-based limits over the actual data range so
# that small heterodyne strip oscillations get the full dynamic range of the
# colormap. The hard [1.0, 1.5] hexp convention is appropriate for experimental
# data display but compresses the strip contrast in forward-model diagnostics.
_all_vals = np.concatenate([c.ravel() for c in c2_fitted_all])
vmin, vmax = _resolve_color_limits(_all_vals)
logger.debug(
"Fitted C2 percentile color scale [%.4f, %.4f]",
vmin,
vmax,
)
# Phase 2: plot with the global color scale
for phi_deg, c2_scaled in zip(phi_angles, c2_fitted_all, strict=True):
fig, ax = plt.subplots(figsize=(8, 7))
im = ax.imshow(
c2_scaled.T,
extent=extent,
aspect="equal",
cmap="jet",
vmin=vmin,
vmax=vmax,
origin="lower",
)
ax.set_xlabel("t₁ (s)", fontsize=11)
ax.set_ylabel("t₂ (s)", fontsize=11)
ax.set_title(
f"Fitted C₂(t₁, t₂) at φ={float(phi_deg):.1f}°",
fontsize=13,
fontweight="bold",
)
cbar = plt.colorbar(im, ax=ax, label="C₂", shrink=0.9)
cbar.ax.tick_params(labelsize=9)
mean_val = float(np.nanmean(c2_scaled))
max_val = float(np.nanmax(c2_scaled))
min_val = float(np.nanmin(c2_scaled))
stats_text = f"Mean: {mean_val:.4f}\nRange: [{min_val:.4f}, {max_val:.4f}]"
ax.text(
0.02,
0.98,
stats_text,
transform=ax.transAxes,
fontsize=9,
verticalalignment="top",
bbox={"boxstyle": "round", "facecolor": "white", "alpha": 0.8},
)
fit_text = (
f"Fitted Parameters\nContrast: {contrast:.3f}\nOffset: {offset_val:.3f}"
)
ax.text(
0.02,
0.02,
fit_text,
transform=ax.transAxes,
fontsize=8,
verticalalignment="bottom",
bbox={"boxstyle": "round", "facecolor": "lightgreen", "alpha": 0.7},
)
plt.tight_layout()
fig.savefig(
sim_dir / f"simulated_c2_fitted_phi_{float(phi_deg):.1f}deg.png",
dpi=150,
bbox_inches="tight",
)
plt.close(fig)
# Save NPZ
try:
np.savez(
sim_dir / "fitted_simulations.npz",
c2_fitted=np.array(c2_fitted_all),
phi_angles=phi_angles,
t=t,
)
logger.info("Saved %d fitted simulation(s) to %s", len(phi_angles), sim_dir)
except Exception:
logger.exception("Failed to save fitted simulation NPZ")