"""CMA-ES optimization wrapper for heterodyne parameter fitting.
Provides a derivative-free global optimizer as an alternative to
gradient-based NLSQ methods. Useful when the cost landscape is
multi-modal or the Jacobian is unreliable.
The ``cma`` package is an optional dependency. If it is not installed,
importing this module succeeds but :meth:`CMAESWrapper.fit` raises
:class:`ImportError` at call time.
Parity note
-----------
This module matches the homodyne ``cmaes_wrapper`` public API:
- :class:`CMAESWrapperConfig` (v2.16.0 normalization + refinement settings)
- :class:`CMAESResult` (parameters, covariance, chi_squared, …)
- :class:`CMAESWrapper` (compute_scale_ratio, is_available, should_use_cmaes, fit)
Heterodyne-specific additions (not in homodyne):
- :class:`CMAESConfig` — low-level ``cma`` library settings
- :func:`fit_with_cmaes` — objective-based convenience (returns NLSQResult)
- :func:`build_anti_degeneracy_objective`
- :func:`normalize_to_unit_cube` / :func:`denormalize_from_unit_cube`
- :func:`compute_adaptive_cmaes_params`
- :func:`adjust_covariance_for_bounds`
D3 divergence: heterodyne uses the ``cma`` library backend instead of
NLSQ's evosax backend (CPU-only; NLSQ evosax not required).
"""
from __future__ import annotations
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import numpy as np
from heterodyne.optimization.nlsq.result_builder import build_result_from_arrays
from heterodyne.utils.logging import get_logger
if TYPE_CHECKING:
from heterodyne.optimization.nlsq.config import NLSQConfig
from heterodyne.optimization.nlsq.results import NLSQResult
logger = get_logger(__name__)
try:
import cma as _cma # noqa: F401 # type: ignore[import-untyped]
_HAS_CMA = True
except ImportError:
_HAS_CMA = False
#: Public availability flag consumed by ``core.py``.
CMAES_AVAILABLE: bool = _HAS_CMA
# Skip L-M refinement when CMA-ES chi2 exceeds this multiple of the
# warm-start chi2 — the comparison in core.py will discard it anyway.
_REFINEMENT_SKIP_CHI2_RATIO = 10.0
# =============================================================================
# Bounds summary helper (homodyne parity)
# =============================================================================
def _format_bounds_summary(bounds: tuple[np.ndarray, np.ndarray]) -> str:
"""Format bounds summary for logging.
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray]
Lower and upper bounds as (lower, upper) arrays.
Returns
-------
str
Human-readable bounds summary.
"""
lower, upper = bounds
ranges = upper - lower
n_params = len(lower)
valid_ranges = ranges[ranges > 0]
if len(valid_ranges) == 0:
return f"{n_params} params (no valid ranges)"
min_range = np.min(valid_ranges)
max_range = np.max(valid_ranges)
return f"{n_params} params, range=[{min_range:.2e}, {max_range:.2e}]"
# =============================================================================
# Parameter Normalization Utilities (v2.16.0 — homodyne parity)
# =============================================================================
# These functions implement bounds-based normalization to improve CMA-ES
# convergence for multi-scale problems (e.g., D₀ ~ 1e4 vs alpha near 1).
def _compute_normalization_factors(
bounds: tuple[np.ndarray, np.ndarray],
epsilon: float = 1e-12,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Compute normalization factors for bounds-based normalization.
Normalization: x_norm = (x - lower) / (upper - lower)
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray]
Lower and upper bounds as (lower, upper) arrays.
epsilon : float
Small value to prevent division by zero for fixed parameters.
Returns
-------
tuple[np.ndarray, np.ndarray, np.ndarray]
(scale, offset, range) where:
- scale = 1 / (upper - lower + epsilon) for safe division
- offset = lower (subtracted before scaling)
- range = upper - lower (for covariance adjustment)
"""
lower, upper = bounds
param_range = upper - lower
safe_range = np.where(param_range > epsilon, param_range, 1.0)
scale = 1.0 / safe_range
return scale, lower, param_range
def _normalize_params(
params: np.ndarray,
scale: np.ndarray,
offset: np.ndarray,
) -> np.ndarray:
"""Normalize parameters from physical space to [0, 1] space.
Parameters
----------
params : np.ndarray
Parameters in physical space.
scale : np.ndarray
Scale factors (1 / range).
offset : np.ndarray
Offset values (lower bounds).
Returns
-------
np.ndarray
Parameters normalized to [0, 1] space.
"""
return (params - offset) * scale
def _denormalize_params(
params_norm: np.ndarray,
scale: np.ndarray,
offset: np.ndarray,
) -> np.ndarray:
"""Denormalize parameters from [0, 1] space back to physical space.
Parameters
----------
params_norm : np.ndarray
Parameters in normalized [0, 1] space.
scale : np.ndarray
Scale factors (1 / range).
offset : np.ndarray
Offset values (lower bounds).
Returns
-------
np.ndarray
Parameters in physical space.
"""
return params_norm / scale + offset
def _normalize_bounds(
bounds: tuple[np.ndarray, np.ndarray],
) -> tuple[np.ndarray, np.ndarray]:
"""Normalize bounds to [0, 1] space.
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray]
Physical bounds as (lower, upper) arrays.
Returns
-------
tuple[np.ndarray, np.ndarray]
Normalized bounds (zeros, ones).
"""
n_params = len(bounds[0])
return (np.zeros(n_params), np.ones(n_params))
def _adjust_covariance_for_normalization(
covariance: np.ndarray | None,
param_range: np.ndarray,
) -> np.ndarray | None:
"""Transform covariance from normalized to physical space.
Uses Jacobian scaling: Cov_phys = J @ Cov_norm @ J.T
where J is diagonal with elements = param_range.
Parameters
----------
covariance : np.ndarray | None
Covariance matrix in normalized space.
param_range : np.ndarray
Parameter ranges (upper - lower bounds).
Returns
-------
np.ndarray | None
Covariance matrix in physical space, or None if input is None.
"""
if covariance is None:
return None
jacobian_outer = np.outer(param_range, param_range)
return covariance * jacobian_outer
# =============================================================================
# CMAESConfig (heterodyne-specific: cma library settings)
# =============================================================================
[docs]
@dataclass
class CMAESConfig:
"""Configuration for CMA-ES optimization via the ``cma`` library.
Attributes:
sigma0: Initial step-size (standard deviation) for the search
distribution. A good default is ~1/4 of the expected parameter
range.
popsize: Population size. ``None`` lets cma choose automatically.
maxiter: Maximum number of CMA-ES generations.
tolx: Termination tolerance on parameter changes.
tolfun: Termination tolerance on cost function changes.
seed: Random seed for reproducibility.
restart_strategy: Restart strategy for CMA-ES. ``"bipop"`` alternates
large/small population restarts for robust global search. ``"none"``
runs a single CMA-ES run with no restarts. Callers should override
to ``"none"`` when a warmstart sigma is already small (warmstart mode),
because BIPOP large-population restarts are incoherent with a tight
initial distribution.
max_restarts: Maximum number of BIPOP restarts. Ignored when
``restart_strategy="none"``.
"""
sigma0: float = 0.5
popsize: int | None = None
maxiter: int = 1000
tolx: float = 1e-11
tolfun: float = 1e-11
seed: int = 42
diagonal_filtering: str = "none"
restart_strategy: str = "bipop"
max_restarts: int = 9
[docs]
def __post_init__(self) -> None:
"""Validate configuration values."""
if self.sigma0 <= 0:
raise ValueError("sigma0 must be positive")
if self.maxiter < 1:
raise ValueError("maxiter must be >= 1")
if self.popsize is not None and self.popsize < 2:
raise ValueError("popsize must be >= 2 when specified")
if self.diagonal_filtering not in ("remove", "none"):
raise ValueError(
f"diagonal_filtering must be 'remove' or 'none', "
f"got {self.diagonal_filtering!r}"
)
if self.restart_strategy not in ("bipop", "none"):
raise ValueError(
f"restart_strategy must be 'bipop' or 'none', "
f"got {self.restart_strategy!r}"
)
if self.max_restarts < 0:
raise ValueError("max_restarts must be >= 0")
# =============================================================================
# CMAESWrapperConfig (homodyne parity)
# =============================================================================
[docs]
@dataclass
class CMAESWrapperConfig:
"""Configuration for :class:`CMAESWrapper`.
Mirrors homodyne's ``CMAESWrapperConfig`` for structural parity.
Attributes
----------
preset : str
CMA-ES preset: "cmaes-fast" (50 gen), "cmaes" (100 gen),
"cmaes-global" (200 gen).
max_generations : int | None
Maximum CMA-ES generations. None = use preset default.
popsize : int | None
Population size (None = auto from 4+3*ln(n)).
sigma : float
Initial step size as fraction of search range (0, 1].
sigma_warmstart : float
Reduced sigma for warm-start (local refinement).
tol_fun : float
Function value tolerance for convergence.
tol_x : float
Parameter tolerance for convergence.
restart_strategy : str
Restart strategy: "none" or "bipop".
max_restarts : int
Maximum number of BIPOP restarts.
refine_with_nlsq : bool
Whether to refine CMA-ES solution with NLSQ TRF.
normalize : bool
Enable bounds-based parameter normalization.
normalization_epsilon : float
Prevent division by zero in normalization.
"""
# CMA-ES global search settings
preset: str = "cmaes"
max_generations: int | None = None
popsize: int | None = None
sigma: float = 0.5
sigma_warmstart: float = 0.05
tol_fun: float = 1e-8
tol_x: float = 1e-8
restart_strategy: str = "bipop"
max_restarts: int = 9
# NLSQ TRF refinement settings (post-CMA-ES)
refine_with_nlsq: bool = True
refinement_ftol: float = 1e-10
refinement_xtol: float = 1e-10
refinement_gtol: float = 1e-10
refinement_max_nfev: int = 500
refinement_loss: str = "linear"
# Parameter normalization (v2.16.0)
normalize: bool = True
normalization_epsilon: float = 1e-12
[docs]
@classmethod
def from_nlsq_config(cls, config: NLSQConfig) -> CMAESWrapperConfig:
"""Create :class:`CMAESWrapperConfig` from :class:`NLSQConfig`.
Parameters
----------
config : NLSQConfig
NLSQ configuration object.
Returns
-------
CMAESWrapperConfig
CMA-ES wrapper configuration.
"""
return cls(
preset=getattr(config, "cmaes_preset", "cmaes"),
max_generations=getattr(config, "cmaes_max_generations", None),
popsize=getattr(config, "cmaes_popsize", None),
sigma=getattr(config, "cmaes_sigma", 0.5),
sigma_warmstart=getattr(config, "cmaes_sigma_warmstart", 0.05),
tol_fun=getattr(config, "cmaes_tol_fun", 1e-8),
tol_x=getattr(config, "cmaes_tol_x", 1e-8),
restart_strategy=getattr(config, "cmaes_restart_strategy", "bipop"),
max_restarts=getattr(config, "cmaes_max_restarts", 9),
refine_with_nlsq=getattr(config, "cmaes_refine_with_nlsq", True),
refinement_ftol=getattr(config, "cmaes_refinement_ftol", 1e-10),
refinement_xtol=getattr(config, "cmaes_refinement_xtol", 1e-10),
refinement_gtol=getattr(config, "cmaes_refinement_gtol", 1e-10),
refinement_max_nfev=getattr(config, "cmaes_refinement_max_nfev", 500),
refinement_loss=getattr(config, "cmaes_refinement_loss", "linear"),
normalize=getattr(config, "cmaes_normalize", True),
normalization_epsilon=getattr(config, "cmaes_normalization_epsilon", 1e-12),
)
# =============================================================================
# CMAESResult (homodyne parity)
# =============================================================================
[docs]
@dataclass
class CMAESResult:
"""Result from CMA-ES optimization.
Matches homodyne's ``CMAESResult`` for structural parity.
Attributes
----------
parameters : np.ndarray
Optimized parameter values.
covariance : np.ndarray | None
Parameter covariance matrix (if computed).
chi_squared : float
Final chi-squared value.
success : bool
Whether optimization converged successfully.
diagnostics : dict
CMA-ES diagnostics (generations, evaluations, etc.).
method_used : str
Method used: "cmaes" or "multi-start".
nlsq_refined : bool
Whether result was refined with NLSQ L-M.
message : str
Convergence message.
"""
parameters: np.ndarray
covariance: np.ndarray | None
chi_squared: float
success: bool
diagnostics: dict = field(default_factory=dict)
method_used: str = "cmaes"
nlsq_refined: bool = False
message: str = ""
# =============================================================================
# CMAESWrapper (homodyne parity API + heterodyne cma backend)
# =============================================================================
[docs]
class CMAESWrapper:
"""Wrapper around ``cma.fmin2`` for bounded parameter optimization.
Public API matches homodyne's ``CMAESWrapper``:
- :meth:`is_available` — check if ``cma`` library is installed
- :meth:`compute_scale_ratio` — max/min bound range ratio
- :meth:`should_use_cmaes` — scale-ratio based selection
- :meth:`fit` — run CMA-ES, return :class:`CMAESResult`
Heterodyne-specific internal method:
- :meth:`fit_with_objective` — objective-fn based fit, returns NLSQResult
Parameters
----------
config : CMAESWrapperConfig | None
CMA-ES wrapper configuration. Uses defaults if None.
parameter_names : list[str] | None
Ordered parameter names (used in returned NLSQResult from
:meth:`fit_with_objective`).
"""
[docs]
def __init__(
self,
config: CMAESWrapperConfig | None = None,
parameter_names: list[str] | None = None,
) -> None:
self.config = config or CMAESWrapperConfig()
self._parameter_names = parameter_names or []
# ------------------------------------------------------------------
# Homodyne parity properties / methods
# ------------------------------------------------------------------
@property
def is_available(self) -> bool:
"""Check if CMA-ES is available (``cma`` library installed)."""
return CMAES_AVAILABLE
[docs]
def compute_scale_ratio(
self,
bounds: tuple[np.ndarray, np.ndarray],
) -> float:
"""Compute scale ratio from parameter bounds.
The scale ratio is the ratio of the largest to smallest parameter
range. High scale ratios (> 1000) indicate multi-scale problems
where CMA-ES excels.
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray]
Lower and upper bounds as (lower, upper) arrays.
Returns
-------
float
Scale ratio (max_range / min_range).
"""
lower, upper = bounds
ranges = upper - lower
valid_ranges = ranges[ranges > 0]
if len(valid_ranges) < 2:
return 1.0
min_range = float(np.min(valid_ranges))
max_range = float(np.max(valid_ranges))
return max_range / min_range if min_range > 0 else 1.0
[docs]
def should_use_cmaes(
self,
bounds: tuple[np.ndarray, np.ndarray],
scale_threshold: float = 1000.0,
) -> bool:
"""Determine if CMA-ES should be used based on scale ratio.
Parameters
----------
bounds : tuple[np.ndarray, np.ndarray]
Parameter bounds as (lower, upper) arrays.
scale_threshold : float
Scale ratio threshold for CMA-ES selection. Default: 1000.
Returns
-------
bool
True if CMA-ES should be used.
"""
if not self.is_available:
logger.info(
"[CMA-ES] Method unavailable: 'cma' package not installed. "
"Install with: uv add cma"
)
return False
scale_ratio = self.compute_scale_ratio(bounds)
should_use = scale_ratio >= scale_threshold
bounds_summary = _format_bounds_summary(bounds)
if should_use:
logger.info(
"[CMA-ES] Auto-selected: scale_ratio=%.2e (threshold=%.2e), %s",
scale_ratio,
scale_threshold,
bounds_summary,
)
else:
logger.debug(
"[CMA-ES] Not selected: scale_ratio=%.2e < threshold=%.2e",
scale_ratio,
scale_threshold,
)
return should_use
[docs]
def fit(
self,
model_func: Callable,
xdata: np.ndarray,
ydata: np.ndarray,
p0: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray],
sigma: np.ndarray | None = None,
warmstart_chi2: float | None = None,
) -> CMAESResult:
"""Run CMA-ES global optimization.
Matches homodyne's ``CMAESWrapper.fit`` signature exactly.
Uses the ``cma`` library backend (heterodyne CPU-only path).
Parameters
----------
model_func : Callable
Model function: ``y = f(x, *params)``.
xdata : np.ndarray
Independent variable data.
ydata : np.ndarray
Dependent variable data to fit.
p0 : np.ndarray
Initial parameter guess.
bounds : tuple[np.ndarray, np.ndarray]
Parameter bounds as (lower, upper).
sigma : np.ndarray | None
Data uncertainties (optional).
warmstart_chi2 : float | None
Chi-squared from NLSQ warm-start. If provided and CMA-ES chi2
exceeds ``_REFINEMENT_SKIP_CHI2_RATIO`` × this value, refinement
is skipped. Also triggers ``sigma_warmstart`` instead of ``sigma``
for the CMA-ES search.
Returns
-------
CMAESResult
Optimization result with parameters, covariance, diagnostics.
Raises
------
ImportError
If the ``cma`` package is not installed.
"""
if not _HAS_CMA:
raise ImportError(
"CMA-ES requires the 'cma' package. Install it with: uv add cma"
)
import cma # noqa: F811 # type: ignore[import-untyped]
p0 = np.asarray(p0, dtype=np.float64)
lower, upper = (
np.asarray(bounds[0], dtype=np.float64),
np.asarray(bounds[1], dtype=np.float64),
)
n_params = len(p0)
scale_ratio = self.compute_scale_ratio(bounds)
bounds_summary = _format_bounds_summary(bounds)
logger.info(
"[CMA-ES] Optimization starting: n_params=%d, n_data=%d",
n_params,
len(ydata),
)
logger.info(
"[CMA-ES] Problem characteristics: scale_ratio=%.2e, %s",
scale_ratio,
bounds_summary,
)
# Select sigma based on warm-start state (v2.20.0)
warmstart_active = warmstart_chi2 is not None and warmstart_chi2 < float("inf")
if warmstart_active:
effective_sigma = self.config.sigma_warmstart
logger.info(
"[CMA-ES] Warm-start active: using sigma_warmstart=%.3f "
"(default sigma=%.3f)",
effective_sigma,
self.config.sigma,
)
else:
effective_sigma = self.config.sigma
# Build cma opts
preset_generations = {
"cmaes-fast": 50,
"cmaes": 100,
"cmaes-global": 200,
}
max_gen = preset_generations.get(
self.config.preset, self.config.max_generations or 100
)
opts: dict[str, Any] = {
"bounds": [lower.tolist(), upper.tolist()],
"maxiter": max_gen,
"tolx": self.config.tol_x,
"tolfun": self.config.tol_fun,
"verbose": -9,
}
if self.config.popsize is not None:
opts["popsize"] = self.config.popsize
# Override BIPOP -> none when warm-start is active (v2.21.0)
restart_strategy = self.config.restart_strategy
max_restarts = self.config.max_restarts
if warmstart_active and restart_strategy == "bipop":
restart_strategy = "none"
max_restarts = 0
logger.info(
"[CMA-ES] Warm-start: overriding restart_strategy='bipop' -> 'none'"
)
# Build objective from model_func
def _objective(x: np.ndarray) -> float:
pred = model_func(xdata, *x)
residuals = np.asarray(ydata) - np.asarray(pred)
if sigma is not None:
residuals = residuals / np.asarray(sigma)
return float(np.sum(residuals**2))
logger.info("[CMA-ES] Global search phase starting...")
t0 = time.perf_counter()
if restart_strategy == "bipop" and max_restarts > 0:
logger.info("[CMA-ES] BIPOP: max_restarts=%d", max_restarts)
xbest, es = cma.fmin2(
_objective,
p0.tolist(),
effective_sigma,
opts,
restarts=max_restarts,
bipop=True,
)
best_x = np.asarray(xbest, dtype=np.float64)
else:
es = cma.CMAEvolutionStrategy(p0.tolist(), effective_sigma, opts)
es.optimize(_objective)
best_x = np.asarray(es.result.xbest, dtype=np.float64)
search_duration = time.perf_counter() - t0
cmaes_chi_squared = float(es.result.fbest)
n_evals = int(es.result.evaluations)
n_iters = int(es.result.iterations)
stop_conds = es.stop()
convergence_reason = "; ".join(f"{k}={v}" for k, v in stop_conds.items())
logger.info(
"[CMA-ES] Global search completed: chi2=%.4e, "
"generations=%d, evals=%d, wall=%.2fs",
cmaes_chi_squared,
n_iters,
n_evals,
search_duration,
)
diagnostics: dict[str, Any] = {
"generations": n_iters,
"evaluations": n_evals,
"convergence_reason": convergence_reason,
"global_search_time_s": search_duration,
}
# Early-exit: skip refinement if CMA-ES chi2 is much worse than warm-start
skip_refinement = False
if (
warmstart_chi2 is not None
and cmaes_chi_squared > _REFINEMENT_SKIP_CHI2_RATIO * warmstart_chi2
):
skip_refinement = True
logger.warning(
"[CMA-ES] Skipping refinement: CMA-ES chi2=%.4e is >%.0fx "
"worse than warm-start chi2=%.4e.",
cmaes_chi_squared,
_REFINEMENT_SKIP_CHI2_RATIO,
warmstart_chi2,
)
# Optional NLSQ refinement (uses nlsq.curve_fit if available)
best_params = best_x
best_covariance: np.ndarray | None = None
best_chi_squared = cmaes_chi_squared
nlsq_refined = False
if self.config.refine_with_nlsq and not skip_refinement:
try:
from nlsq import curve_fit as _nlsq_curve_fit
popt, pcov = _nlsq_curve_fit(
f=model_func,
xdata=xdata,
ydata=ydata,
p0=best_x,
sigma=sigma,
bounds=bounds,
ftol=self.config.refinement_ftol,
xtol=self.config.refinement_xtol,
gtol=self.config.refinement_gtol,
max_nfev=self.config.refinement_max_nfev,
loss=self.config.refinement_loss,
)
popt_arr = np.asarray(popt, dtype=np.float64)
refined_pred = model_func(xdata, *popt_arr)
refined_res = np.asarray(ydata) - np.asarray(refined_pred)
if sigma is not None:
refined_res = refined_res / np.asarray(sigma)
refined_chi2 = float(np.sum(refined_res**2))
best_params = popt_arr
best_covariance = (
np.asarray(pcov, dtype=np.float64) if pcov is not None else None
)
best_chi_squared = refined_chi2
nlsq_refined = True
diagnostics["cmaes_chi_squared"] = cmaes_chi_squared
diagnostics["refined_chi_squared"] = refined_chi2
logger.info(
"[CMA-ES] Refinement improved chi2: %.4e -> %.4e",
cmaes_chi_squared,
refined_chi2,
)
except Exception as exc: # noqa: BLE001
logger.warning(
"[CMA-ES] Refinement failed: %s. Using CMA-ES result.", exc
)
diagnostics["refinement_failed"] = True
converged_reasons = {"tol_fun", "tol_x", "tol_fun_hist", "ftarget"}
raw_reason = (
convergence_reason.split("=")[0] if "=" in convergence_reason else ""
)
success = raw_reason in converged_reasons or nlsq_refined
return CMAESResult(
parameters=best_params,
covariance=best_covariance,
chi_squared=best_chi_squared,
success=success,
diagnostics=diagnostics,
method_used="cmaes",
nlsq_refined=nlsq_refined,
message=f"CMA-ES: {convergence_reason}",
)
# ------------------------------------------------------------------
# Heterodyne-specific: objective-based fit → NLSQResult
# ------------------------------------------------------------------
[docs]
def fit_with_objective(
self,
objective_fn: Callable[[np.ndarray], float],
x0: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray],
*,
residual_fn: Callable[[np.ndarray], np.ndarray] | None = None,
n_data: int | None = None,
parameter_names: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> NLSQResult:
"""Run CMA-ES optimization from a scalar objective function.
Heterodyne-specific; not in homodyne. Used by :func:`fit_with_cmaes`.
Args:
objective_fn: Scalar objective ``f(x) -> float`` to minimize.
x0: Initial guess, shape ``(n_params,)``.
bounds: ``(lower, upper)`` arrays, each shape ``(n_params,)``.
residual_fn: Optional residual function for building a full
:class:`NLSQResult` with covariance information.
n_data: Number of data points (for reduced chi-squared).
parameter_names: Override parameter names for this call.
metadata: Extra metadata attached to the result.
Returns:
An :class:`NLSQResult` populated from the CMA-ES solution.
Raises:
ImportError: If the ``cma`` package is not installed.
"""
if not _HAS_CMA:
raise ImportError(
"CMA-ES requires the 'cma' package. Install it with: uv add cma"
)
import cma # noqa: F811 # type: ignore[import-untyped]
# Build a CMAESConfig from CMAESWrapperConfig for cma opts
preset_generations = {
"cmaes-fast": 50,
"cmaes": 100,
"cmaes-global": 200,
}
max_gen = preset_generations.get(
self.config.preset, self.config.max_generations or 100
)
names = parameter_names or self._parameter_names
if not names:
names = [f"p{i}" for i in range(len(x0))]
x0 = np.asarray(x0, dtype=np.float64)
lower = np.asarray(bounds[0], dtype=np.float64)
upper = np.asarray(bounds[1], dtype=np.float64)
opts: dict[str, Any] = {
"bounds": [lower.tolist(), upper.tolist()],
"maxiter": max_gen,
"tolx": self.config.tol_x,
"tolfun": self.config.tol_fun,
"verbose": -9,
}
if self.config.popsize is not None:
opts["popsize"] = self.config.popsize
logger.info(
"Starting CMA-ES (objective): sigma=%.3g, maxiter=%d, n_params=%d",
self.config.sigma,
max_gen,
len(x0),
)
t0 = time.perf_counter()
if self.config.restart_strategy == "bipop" and self.config.max_restarts > 0:
logger.info("CMA-ES BIPOP: max_restarts=%d", self.config.max_restarts)
xbest, es = cma.fmin2(
objective_fn,
x0.tolist(),
self.config.sigma,
opts,
restarts=self.config.max_restarts,
bipop=True,
)
best_x = np.asarray(xbest, dtype=np.float64)
else:
es = cma.CMAEvolutionStrategy(x0.tolist(), self.config.sigma, opts)
es.optimize(objective_fn)
best_x = np.asarray(es.result.xbest, dtype=np.float64)
wall_time = time.perf_counter() - t0
best_cost = float(es.result.fbest)
n_evals = int(es.result.evaluations)
n_iters = int(es.result.iterations)
logger.info(
"CMA-ES finished: cost=%.6e, evals=%d, iters=%d, wall=%.2fs",
best_cost,
n_evals,
n_iters,
wall_time,
)
if residual_fn is not None:
residuals = np.asarray(residual_fn(best_x), dtype=np.float64)
else:
residuals = np.array([np.sqrt(max(best_cost, 0.0))], dtype=np.float64)
effective_n_data = n_data if n_data is not None else max(len(residuals), 1)
stop_reason = "; ".join(f"{k}={v}" for k, v in es.stop().items())
success = best_cost < np.inf and not np.isnan(best_cost)
result_metadata: dict[str, Any] = {
"optimizer": "CMA-ES",
"stop_conditions": es.stop(),
}
if metadata:
result_metadata.update(metadata)
return build_result_from_arrays(
parameters=best_x,
parameter_names=names,
residuals=residuals,
n_data=effective_n_data,
success=success,
message=stop_reason if stop_reason else "CMA-ES completed",
n_iterations=n_iters,
n_function_evals=n_evals,
wall_time=wall_time,
metadata=result_metadata,
)
# =============================================================================
# Coordinate transformation utilities (heterodyne-specific)
# =============================================================================
[docs]
def normalize_to_unit_cube(
x: np.ndarray,
lower: np.ndarray,
upper: np.ndarray,
) -> np.ndarray:
"""Map parameter vector from physical bounds to the unit hypercube [0, 1].
Args:
x: Parameter vector of shape ``(n_params,)``.
lower: Lower bound array of shape ``(n_params,)``.
upper: Upper bound array of shape ``(n_params,)``.
Returns:
Transformed array of shape ``(n_params,)`` with values in [0, 1].
Dimensions where ``lower == upper`` are mapped to 0.0.
Raises:
ValueError: If arrays have mismatched shapes.
"""
x = np.asarray(x, dtype=np.float64)
lower = np.asarray(lower, dtype=np.float64)
upper = np.asarray(upper, dtype=np.float64)
if not (x.shape == lower.shape == upper.shape):
raise ValueError(
f"x, lower, and upper must have the same shape; "
f"got {x.shape}, {lower.shape}, {upper.shape}"
)
span = upper - lower
safe_span = np.where(span > 0, span, 1.0)
x_norm = np.where(span > 0, (x - lower) / safe_span, 0.0)
return x_norm
[docs]
def denormalize_from_unit_cube(
x_norm: np.ndarray,
lower: np.ndarray,
upper: np.ndarray,
) -> np.ndarray:
"""Map parameter vector from the unit hypercube [0, 1] to physical bounds.
This is the inverse of :func:`normalize_to_unit_cube`.
Args:
x_norm: Normalized parameter vector of shape ``(n_params,)``
with values in [0, 1].
lower: Lower bound array of shape ``(n_params,)``.
upper: Upper bound array of shape ``(n_params,)``.
Returns:
De-normalized array of shape ``(n_params,)`` in physical units.
Raises:
ValueError: If arrays have mismatched shapes.
"""
x_norm = np.asarray(x_norm, dtype=np.float64)
lower = np.asarray(lower, dtype=np.float64)
upper = np.asarray(upper, dtype=np.float64)
if not (x_norm.shape == lower.shape == upper.shape):
raise ValueError(
f"x_norm, lower, and upper must have the same shape; "
f"got {x_norm.shape}, {lower.shape}, {upper.shape}"
)
return lower + x_norm * (upper - lower)
[docs]
def compute_adaptive_cmaes_params(
bounds: tuple[np.ndarray, np.ndarray],
) -> tuple[int, int]:
"""Compute adaptive population size and max generations from bound ranges.
Scales popsize from 50 to 200 and max_generations from 200 to 500 based
on the ratio between the largest and smallest parameter ranges.
Args:
bounds: ``(lower, upper)`` arrays, each shape ``(n_params,)``.
Returns:
``(popsize, max_generations)`` tuple.
"""
lower = np.asarray(bounds[0], dtype=np.float64)
upper = np.asarray(bounds[1], dtype=np.float64)
spans = upper - lower
active_spans = spans[spans > 0]
if len(active_spans) < 2:
return 50, 200
scale_ratio = float(active_spans.max() / active_spans.min())
log_ratio = np.log10(max(scale_ratio, 1.0))
t = min(log_ratio / 6.0, 1.0)
popsize = int(50 + t * 150)
max_generations = int(200 + t * 300)
logger.debug(
"Adaptive CMA-ES: scale_ratio=%.1f, popsize=%d, max_generations=%d",
scale_ratio,
popsize,
max_generations,
)
return popsize, max_generations
[docs]
def build_anti_degeneracy_objective(
base_objective: Callable[[np.ndarray], float],
bounds: tuple[np.ndarray, np.ndarray],
parameter_names: list[str],
penalty_weight: float = 0.01,
) -> Callable[[np.ndarray], float]:
"""Wrap an objective with penalty terms for known degenerate parameter pairs.
For each known degenerate pair (e.g. v0/v_offset, D0_ref/D0_sample),
a soft penalty encourages separation of their normalized values.
Args:
base_objective: Original scalar objective ``f(x) -> float``.
bounds: ``(lower, upper)`` arrays.
parameter_names: Parameter names matching the indices in ``x``.
penalty_weight: Strength of the penalty terms relative to the
base cost.
Returns:
Wrapped objective function with degeneracy penalties.
"""
from heterodyne.optimization.nlsq.anti_degeneracy_controller import (
_KNOWN_DEGENERATE_PAIRS,
)
lower = np.asarray(bounds[0], dtype=np.float64)
upper = np.asarray(bounds[1], dtype=np.float64)
spans = upper - lower
safe_spans = np.where(spans > 0, spans, 1.0)
name_to_idx = {n: i for i, n in enumerate(parameter_names)}
active_pairs: list[tuple[int, int]] = []
for name_a, name_b, _desc in _KNOWN_DEGENERATE_PAIRS:
if name_a in name_to_idx and name_b in name_to_idx:
active_pairs.append((name_to_idx[name_a], name_to_idx[name_b]))
if not active_pairs:
return base_objective
def penalized_objective(x: np.ndarray) -> float:
cost = base_objective(x)
x_norm = (x - lower) / safe_spans
penalty = 0.0
for idx_a, idx_b in active_pairs:
diff = abs(x_norm[idx_a] - x_norm[idx_b])
penalty += 1.0 / (diff + 0.01)
return cost + penalty_weight * penalty
return penalized_objective
[docs]
def fit_with_cmaes(
objective_fn: Callable[[np.ndarray], float],
initial_params: np.ndarray,
bounds: tuple[np.ndarray, np.ndarray],
parameter_names: list[str] | None = None,
*,
config: CMAESConfig | None = None,
residual_fn: Callable[[np.ndarray], np.ndarray] | None = None,
n_data: int | None = None,
anti_degeneracy: bool = False,
metadata: dict[str, Any] | None = None,
) -> NLSQResult:
"""Convenience function to run CMA-ES from a scalar objective.
Applies adaptive population sizing and max_generations based on the
parameter scale ratio when the user hasn't explicitly set these values.
Args:
objective_fn: Scalar objective ``f(x) -> float`` to minimize.
initial_params: Initial guess, shape ``(n_params,)``.
bounds: ``(lower, upper)`` arrays.
parameter_names: Ordered parameter names.
config: Low-level :class:`CMAESConfig`; ``None`` for defaults.
residual_fn: Optional residual function for covariance estimation.
n_data: Number of data points (for reduced chi-squared).
anti_degeneracy: Apply degeneracy-penalty weighting to objective.
metadata: Extra metadata attached to the result.
Returns:
An :class:`NLSQResult` from the CMA-ES solution.
"""
cfg = config or CMAESConfig()
# Adaptive population sizing
if cfg.popsize is None or cfg.maxiter == CMAESConfig.maxiter:
adaptive_pop, adaptive_gen = compute_adaptive_cmaes_params(bounds)
if cfg.popsize is None:
cfg = CMAESConfig(
sigma0=cfg.sigma0,
popsize=adaptive_pop,
maxiter=cfg.maxiter
if cfg.maxiter != CMAESConfig.maxiter
else adaptive_gen,
tolx=cfg.tolx,
tolfun=cfg.tolfun,
seed=cfg.seed,
diagonal_filtering=cfg.diagonal_filtering,
)
elif cfg.maxiter == CMAESConfig.maxiter:
cfg = CMAESConfig(
sigma0=cfg.sigma0,
popsize=cfg.popsize,
maxiter=adaptive_gen,
tolx=cfg.tolx,
tolfun=cfg.tolfun,
seed=cfg.seed,
diagonal_filtering=cfg.diagonal_filtering,
)
names = parameter_names or [f"p{i}" for i in range(len(initial_params))]
effective_objective = objective_fn
if anti_degeneracy:
effective_objective = build_anti_degeneracy_objective(
objective_fn,
bounds,
names,
)
# Convert CMAESConfig → CMAESWrapperConfig for the wrapper
wrapper_cfg = CMAESWrapperConfig(
sigma=cfg.sigma0,
tol_fun=cfg.tolfun,
tol_x=cfg.tolx,
restart_strategy=cfg.restart_strategy,
max_restarts=cfg.max_restarts,
refine_with_nlsq=False, # no model_func available here
)
if cfg.popsize is not None:
wrapper_cfg.popsize = cfg.popsize
wrapper = CMAESWrapper(config=wrapper_cfg, parameter_names=names)
return wrapper.fit_with_objective(
objective_fn=effective_objective,
x0=initial_params,
bounds=bounds,
residual_fn=residual_fn,
n_data=n_data,
parameter_names=names,
metadata=metadata,
)
[docs]
def adjust_covariance_for_bounds(
cov: np.ndarray,
lower: np.ndarray,
upper: np.ndarray,
) -> np.ndarray:
"""Scale a covariance matrix to account for parameter bounds ranges.
When parameters have very different dynamic ranges, a unit-cube
covariance should be scaled to physical space. The adjustment is:
cov_adjusted[i, j] = cov[i, j] * (upper[i] - lower[i])
* (upper[j] - lower[j])
Args:
cov: Covariance matrix of shape ``(n_params, n_params)``.
lower: Lower bound array of shape ``(n_params,)``.
upper: Upper bound array of shape ``(n_params,)``.
Returns:
Adjusted covariance matrix of shape ``(n_params, n_params)``.
Raises:
ValueError: If array shapes are inconsistent.
"""
cov = np.asarray(cov, dtype=np.float64)
lower = np.asarray(lower, dtype=np.float64)
upper = np.asarray(upper, dtype=np.float64)
n = len(lower)
if cov.shape != (n, n):
raise ValueError(
f"cov must be square with side length matching bounds ({n}), "
f"got shape {cov.shape}"
)
if lower.shape != (n,) or upper.shape != (n,):
raise ValueError(
f"lower and upper must have shape ({n},), "
f"got {lower.shape} and {upper.shape}"
)
spans = upper - lower
scale = np.outer(spans, spans)
result = cov * scale
min_eig = float(np.linalg.eigvalsh(result).min())
if min_eig < 0:
logger.warning(
"adjust_covariance_for_bounds: result is not PSD "
"(min eigenvalue=%.2e). Downstream consumers may need "
"to apply a PSD projection.",
min_eig,
)
return result