Source code for heterodyne.optimization.nlsq.multistart

"""Multi-start optimization with Latin Hypercube Sampling.

Supports parallel execution via ProcessPoolExecutor with automatic
fallback to sequential when JAX functions cannot be serialized for
inter-process communication.

NOTE: Subsampling is explicitly NOT supported per project requirements.
Numerical precision and reproducibility take priority over computational speed.
"""

from __future__ import annotations

import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures import TimeoutError as FuturesTimeoutError
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import numpy as np

from heterodyne.optimization.nlsq.config import NLSQConfig
from heterodyne.optimization.nlsq.results import NLSQResult
from heterodyne.utils.logging import get_logger

if TYPE_CHECKING:
    from collections.abc import Callable

    from numpy.typing import NDArray

    from heterodyne.optimization.nlsq.adapter_base import NLSQAdapterBase

logger = get_logger(__name__)

_WORKER_TIMEOUT = 1800.0  # 30 minutes default
_MAX_POINTS_FOR_PARALLEL = 500_000


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------


[docs] @dataclass class MultiStartConfig: """Configuration for multi-start optimization. Attributes: enable: Whether to use multi-start optimization. Default: False. n_starts: Number of starting points (including the user-provided one). seed: Random seed for Latin Hypercube Sampling reproducibility. sampling_strategy: Method for generating starting points. ``"latin_hypercube"`` or ``"random"``. custom_starts: User-provided custom starting points to include alongside generated starts. n_workers: Number of parallel workers. 0 = auto (cpu_count). Default: 0. use_screening: Whether to pre-screen starting points by cost. screen_keep_fraction: Fraction of starts to keep after screening. refine_top_k: Refine only the top-k starts after screening. refinement_ftol: Function tolerance for refinement phase. degeneracy_threshold: Relative chi-squared threshold for degeneracy detection across basins. parallel: Whether to attempt parallel execution via ProcessPoolExecutor. Defaults to False because JAX closures cannot be sent across process boundaries. max_workers: Maximum number of worker processes. Defaults to ``min(n_starts, os.cpu_count() or 4)``. worker_timeout: Per-worker timeout in seconds. max_data_points_for_parallel: Auto-disable parallel when ``n_data * n_starts`` exceeds this threshold. """ enable: bool = False n_starts: int = 10 seed: int | None = None sampling_strategy: str = "latin_hypercube" custom_starts: list[list[float]] | None = None n_workers: int = 0 use_screening: bool = True screen_keep_fraction: float = 0.5 refine_top_k: int = 3 refinement_ftol: float = 1e-12 degeneracy_threshold: float = 0.1 parallel: bool = ( False # Default False — JAX closures cannot cross process boundaries ) max_workers: int | None = None worker_timeout: float = _WORKER_TIMEOUT max_data_points_for_parallel: int = _MAX_POINTS_FOR_PARALLEL def __post_init__(self) -> None: if self.n_starts < 1: raise ValueError(f"n_starts must be >= 1, got {self.n_starts}") if self.worker_timeout <= 0: raise ValueError(f"worker_timeout must be > 0, got {self.worker_timeout}") if self.max_workers is None: self.max_workers = min(self.n_starts, os.cpu_count() or 4)
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> MultiStartConfig: """Create from dictionary, ignoring unknown keys. Args: d: Dictionary of configuration values. Returns: Populated ``MultiStartConfig`` instance. """ known = set(cls.__dataclass_fields__) return cls(**{k: v for k, v in d.items() if k in known})
[docs] @classmethod def from_nlsq_config(cls, nlsq_config: Any) -> MultiStartConfig: """Create MultiStartConfig from NLSQConfig. Parameters ---------- nlsq_config : NLSQConfig NLSQ configuration object. Returns ------- MultiStartConfig Multi-start configuration. """ custom_starts = getattr(nlsq_config, "multi_start_custom_starts", None) return cls( enable=getattr(nlsq_config, "enable_multi_start", False), n_starts=getattr(nlsq_config, "multi_start_n_starts", 10), seed=getattr(nlsq_config, "multi_start_seed", None), sampling_strategy=getattr( nlsq_config, "multi_start_sampling_strategy", "latin_hypercube" ), custom_starts=custom_starts, n_workers=getattr(nlsq_config, "multi_start_n_workers", 0), use_screening=getattr(nlsq_config, "multi_start_use_screening", True), screen_keep_fraction=getattr( nlsq_config, "multi_start_screen_keep_fraction", 0.5 ), refine_top_k=getattr(nlsq_config, "multi_start_refine_top_k", 3), refinement_ftol=getattr(nlsq_config, "multi_start_refinement_ftol", 1e-12), degeneracy_threshold=getattr( nlsq_config, "multi_start_degeneracy_threshold", 0.1 ), )
[docs] def to_nlsq_global_config(self) -> Any: """Convert to NLSQ's GlobalOptimizationConfig. Returns ------- GlobalOptimizationConfig NLSQ global optimization configuration. Raises ------ ImportError If NLSQ GlobalOptimizationConfig is not available. Notes ----- Maps heterodyne's multi-start configuration to NLSQ's GlobalOptimizationConfig: - sampling_strategy -> sampler (lhs, sobol, halton) - use_screening -> elimination_rounds (0 if disabled) - screen_keep_fraction -> elimination_fraction (inverted) """ try: from nlsq.global_optimization import GlobalOptimizationConfig except ImportError as e: raise ImportError( "NLSQ GlobalOptimizationConfig not available. " "Please install NLSQ >= 0.4.0: pip install nlsq>=0.4.0" ) from e sampler_map = { "latin_hypercube": "lhs", "lhs": "lhs", "sobol": "sobol", "halton": "halton", "random": "lhs", } sampler_str = sampler_map.get(self.sampling_strategy, "lhs") elimination_fraction = 1.0 - self.screen_keep_fraction elimination_rounds = 3 if self.use_screening else 0 return GlobalOptimizationConfig( n_starts=self.n_starts, sampler=sampler_str, # type: ignore[arg-type] elimination_rounds=elimination_rounds, elimination_fraction=elimination_fraction, )
# --------------------------------------------------------------------------- # Result containers # ---------------------------------------------------------------------------
[docs] @dataclass class SingleStartResult: """Result from a single optimization start. Attributes: result: The ``NLSQResult`` returned by the adapter. start_index: Zero-based index of this start within the run. initial_params: Starting parameter vector used for this run. wall_time: Wall-clock time in seconds for this single start. """ result: NLSQResult start_index: int initial_params: np.ndarray wall_time: float
[docs] @dataclass class MultiStartResult: """Aggregated result from a multi-start optimization run. Attributes: best_result: The ``NLSQResult`` with the lowest final cost. all_starts: Ordered list of ``SingleStartResult`` for every start. n_successful: Number of starts that reported ``success=True``. n_total: Total number of starts attempted. config: The ``MultiStartConfig`` used for this run. strategy_used: Strategy that was used (always ``"full"``). n_unique_basins: Number of distinct local minima found. degeneracy_detected: Whether parameter degeneracy was detected. total_wall_time: Total execution time in seconds (homodyne-parity alias). wall_time_total: Total elapsed wall-clock time in seconds. screening_costs: Initial costs from screening phase. basin_labels: Cluster labels for each result. """ best_result: NLSQResult all_starts: list[SingleStartResult] n_successful: int n_total: int config: MultiStartConfig strategy_used: str = "full" n_unique_basins: int = 1 degeneracy_detected: bool = False wall_time_total: float = 0.0 screening_costs: NDArray[np.float64] | None = None basin_labels: NDArray[np.int64] | None = None @property def best(self) -> NLSQResult: """Homodyne-parity alias: best result by cost.""" return self.best_result @property def total_wall_time(self) -> float: """Homodyne-parity alias for :attr:`wall_time_total`. Was a duplicate dataclass field; now a derived property so callers using either name always see the same value (previously ``total_wall_time`` was always 0.0 because ``fit()`` only set ``wall_time_total``). """ return self.wall_time_total @property def all_results(self) -> list[NLSQResult]: """Backward-compatible accessor returning all ``NLSQResult`` objects.""" return [s.result for s in self.all_starts]
[docs] def to_nlsq_result(self) -> NLSQResult: """Convert the best result to an ``NLSQResult`` with multistart metadata. Attaches a ``"multistart"`` key to ``result.metadata`` containing summary statistics for the full multi-start run. Returns: The best ``NLSQResult`` with metadata populated. """ best_index = next( (s.start_index for s in self.all_starts if s.result is self.best_result), 0, ) self.best_result.metadata["multistart"] = { "n_starts": self.n_total, "n_successful": self.n_successful, "wall_time_total": self.wall_time_total, "best_start_index": best_index, "strategy_used": self.strategy_used, "n_unique_basins": self.n_unique_basins, "degeneracy_detected": self.degeneracy_detected, } return self.best_result
[docs] def to_optimization_result(self) -> Any: """Convert MultiStartResult to a result dict for CLI compatibility. Returns a dictionary containing the best solution with multi-start metadata, compatible with downstream reporting. Returns ------- dict[str, Any] Result dict with best parameters and multi-start diagnostics. """ best = self.best_result multistart_diagnostics: dict[str, Any] = { "strategy_used": self.strategy_used, "n_starts": self.n_total, "n_successful": self.n_successful, "n_unique_basins": self.n_unique_basins, "degeneracy_detected": self.degeneracy_detected, "total_wall_time": self.total_wall_time, } return { "best_result": best, "multistart_diagnostics": multistart_diagnostics, }
# --------------------------------------------------------------------------- # Module-level worker function (required for ProcessPoolExecutor) # --------------------------------------------------------------------------- def _multistart_worker_sequential( adapter: NLSQAdapterBase, residual_fn: Callable[[np.ndarray], np.ndarray], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], config: NLSQConfig, jacobian_fn: Callable[[np.ndarray], np.ndarray] | None, ) -> NLSQResult: """Run a single optimization start inside a worker process. This function is defined at module level so that ProcessPoolExecutor can submit it across process boundaries. Note: JAX-based residual functions cannot be sent across process boundaries, so this function is only callable in practice when a serialization-safe residual is supplied. The ``MultiStartOptimizer.fit()`` method always falls back to sequential execution for JAX residuals. Args: adapter: Adapter instance carrying solver configuration. residual_fn: Residual callable (must be serialization-safe). initial_params: Starting parameter vector. bounds: ``(lower, upper)`` bound arrays. config: NLSQ solver configuration. jacobian_fn: Optional analytic Jacobian (must be serialization-safe). Returns: ``NLSQResult`` for this start. """ return adapter.fit( residual_fn=residual_fn, initial_params=initial_params, bounds=bounds, config=config, jacobian_fn=jacobian_fn, ) # --------------------------------------------------------------------------- # Optimizer # ---------------------------------------------------------------------------
[docs] class MultiStartOptimizer: """Multi-start optimizer using Latin Hypercube Sampling. Runs optimization from multiple starting points to improve the chances of finding the global minimum. The first starting point is always the user-supplied ``initial_params``; the remaining ``n_starts - 1`` points are drawn via Latin Hypercube Sampling within the supplied bounds. Parallel execution is available via ``MultiStartConfig.parallel=True`` but is disabled by default because JAX residual closures cannot be transmitted across process boundaries. When parallel is requested, the optimizer logs a warning and falls back to sequential execution. """
[docs] def __init__( self, adapter: NLSQAdapterBase, config: MultiStartConfig | None = None, *, # Legacy keyword arguments for backward compatibility n_starts: int | None = None, seed: int | None = None, ) -> None: """Initialize the multi-start optimizer. Args: adapter: NLSQ adapter used to run each individual fit. config: Full ``MultiStartConfig``. When provided, ``n_starts`` and ``seed`` keyword arguments are ignored. n_starts: Legacy argument — number of starting points. seed: Legacy argument — random seed. """ self._adapter = adapter if config is not None: self._config = config else: self._config = MultiStartConfig( n_starts=n_starts if n_starts is not None else 10, seed=seed, ) self._rng = np.random.default_rng(self._config.seed)
# ------------------------------------------------------------------ # Public helpers # ------------------------------------------------------------------
[docs] def generate_starting_points( self, initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], ) -> np.ndarray: """Generate starting points using Latin Hypercube Sampling. The first point is always ``initial_params``. The remaining ``n_starts - 1`` points are drawn via LHS within ``bounds``. Args: initial_params: User-provided initial values (used as first point). bounds: ``(lower, upper)`` bound arrays of shape ``(n_params,)``. Returns: Array of shape ``(n_starts, n_params)``. """ lower, upper = bounds n_params = len(initial_params) # First point is user-provided starting_points = [initial_params.copy()] # Generate LHS points for remaining starts n_lhs = self._config.n_starts - 1 if n_lhs > 0: lhs_points = self._latin_hypercube_sample(n_lhs, n_params, lower, upper) starting_points.extend(lhs_points) return np.array(starting_points)
# ------------------------------------------------------------------ # Sampling internals # ------------------------------------------------------------------ def _latin_hypercube_sample( self, n_samples: int, n_dims: int, lower: np.ndarray, upper: np.ndarray, ) -> list[np.ndarray]: """Generate Latin Hypercube samples. Divides each dimension into ``n_samples`` equal strata, places one sample per stratum with a uniform random offset, then shuffles the stratum assignment independently across dimensions. Args: n_samples: Number of samples to generate. n_dims: Dimensionality of the parameter space. lower: Lower bounds, shape ``(n_dims,)``. upper: Upper bounds, shape ``(n_dims,)``. Returns: List of ``n_samples`` arrays each of shape ``(n_dims,)``. """ # Proper Latin Hypercube Sampling: # Divide each dimension into n_samples equal strata, # place one sample per stratum, then shuffle across dimensions. result = np.zeros((n_samples, n_dims)) for d in range(n_dims): # Create one sample per stratum with random offset within stratum perm = self._rng.permutation(n_samples) for i in range(n_samples): stratum = perm[i] low_edge = lower[d] + stratum * (upper[d] - lower[d]) / n_samples high_edge = lower[d] + (stratum + 1) * (upper[d] - lower[d]) / n_samples result[i, d] = low_edge + self._rng.random() * (high_edge - low_edge) return [result[i] for i in range(n_samples)] # ------------------------------------------------------------------ # Main entry point # ------------------------------------------------------------------
[docs] def fit( self, residual_fn: Callable[[np.ndarray], np.ndarray], initial_params: np.ndarray, bounds: tuple[np.ndarray, np.ndarray], config: NLSQConfig, jacobian_fn: Callable[[np.ndarray], np.ndarray] | None = None, ) -> MultiStartResult: """Run multi-start optimization. Generates starting points via LHS, then runs the adapter from each point sequentially. If ``MultiStartConfig.parallel=True``, a warning is emitted and execution falls back to sequential because JAX residual closures cannot be transmitted across process boundaries. Args: residual_fn: Residual function mapping parameters to residual vector. initial_params: Initial guess; always used as the first start. bounds: ``(lower, upper)`` bound arrays. config: Per-run NLSQ solver configuration. jacobian_fn: Optional analytic Jacobian. Returns: ``MultiStartResult`` with the best result and per-start details. """ t_start = time.perf_counter() starting_points = self.generate_starting_points(initial_params, bounds) n_points = len(starting_points) # JAX closures cannot cross process boundaries — the production path # is always sequential. ``_force_parallel`` is the explicit test-only # opt-in for exercising ``_run_parallel`` with a process-safe # residual function (no JAX tracers, no live config managers). force_parallel = bool(getattr(self, "_force_parallel", False)) use_parallel = force_parallel and self._config.parallel logger.info( "Multi-start: %d starts, parallel=%s%s", n_points, use_parallel, " (force-flag set)" if force_parallel else "", ) if self._config.parallel and not force_parallel: logger.warning( "Parallel multi-start requested but JAX closures cannot be " "transmitted across process boundaries. " "Falling back to sequential execution. " "(Set ``optimizer._force_parallel = True`` to exercise the " "parallel path explicitly — test-only.)" ) if use_parallel: all_starts = self._run_parallel( starting_points=starting_points, residual_fn=residual_fn, bounds=bounds, config=config, jacobian_fn=jacobian_fn, ) else: all_starts = self._run_sequential( starting_points=starting_points, residual_fn=residual_fn, bounds=bounds, config=config, jacobian_fn=jacobian_fn, ) # Identify best result best: NLSQResult | None = None best_cost = np.inf n_successful = 0 for s in all_starts: if s.result.success: n_successful += 1 cost = s.result.final_cost if s.result.final_cost is not None else np.inf if cost < best_cost: best_cost = cost best = s.result # Fallback: if nothing succeeded, take the run with the lowest cost if best is None: best = min( (s.result for s in all_starts), key=lambda r: r.final_cost if r.final_cost is not None else np.inf, ) wall_total = time.perf_counter() - t_start logger.info( "Multi-start complete: %d/%d successful, best cost=%.4e, total time=%.1fs", n_successful, n_points, best_cost, wall_total, ) return MultiStartResult( best_result=best, all_starts=all_starts, n_successful=n_successful, n_total=n_points, config=self._config, wall_time_total=wall_total, )
# ------------------------------------------------------------------ # Execution backends # ------------------------------------------------------------------ def _run_sequential( self, starting_points: np.ndarray, residual_fn: Callable[[np.ndarray], np.ndarray], bounds: tuple[np.ndarray, np.ndarray], config: NLSQConfig, jacobian_fn: Callable[[np.ndarray], np.ndarray] | None, ) -> list[SingleStartResult]: """Execute all starts sequentially, logging progress after each. Args: starting_points: Array of shape ``(n_starts, n_params)``. residual_fn: Residual callable. bounds: ``(lower, upper)`` bound arrays. config: NLSQ solver configuration. jacobian_fn: Optional analytic Jacobian. Returns: Ordered list of ``SingleStartResult`` for every start. """ all_starts: list[SingleStartResult] = [] best_cost = np.inf n_total = len(starting_points) for i, start in enumerate(starting_points): logger.debug("Start %d/%d", i + 1, n_total) t0 = time.perf_counter() result = self._adapter.fit( residual_fn=residual_fn, initial_params=start, bounds=bounds, config=config, jacobian_fn=jacobian_fn, ) wall = time.perf_counter() - t0 all_starts.append( SingleStartResult( result=result, start_index=i, initial_params=start, wall_time=wall, ) ) cost = result.final_cost if result.final_cost is not None else np.inf if cost < best_cost: best_cost = cost logger.debug( " Start %d/%d: new best cost=%.4e (%.2fs)", i + 1, n_total, best_cost, wall, ) else: logger.debug( " Start %d/%d: cost=%.4e (%.2fs)", i + 1, n_total, cost, wall, ) return all_starts def _run_parallel( self, starting_points: np.ndarray, residual_fn: Callable[[np.ndarray], np.ndarray], bounds: tuple[np.ndarray, np.ndarray], config: NLSQConfig, jacobian_fn: Callable[[np.ndarray], np.ndarray] | None, ) -> list[SingleStartResult]: """Execute starts in parallel via ProcessPoolExecutor. This method requires that ``residual_fn`` (and ``jacobian_fn``) are safe to transmit across process boundaries. If any worker raises an error or times out, the failed starts are logged and excluded from the results. If all workers fail, raises ``RuntimeError``. Note: This method is present for completeness and future use. It is not invoked by ``fit()`` for JAX-based residuals. Use ``MultiStartConfig.parallel=True`` only when supplying a residual function that can be transmitted across process boundaries. Args: starting_points: Array of shape ``(n_starts, n_params)``. residual_fn: Residual callable (must be transmittable across process boundaries). bounds: ``(lower, upper)`` bound arrays. config: NLSQ solver configuration. jacobian_fn: Optional analytic Jacobian (must be transmittable across process boundaries). Returns: Ordered list of ``SingleStartResult`` for every completed start. Raises: RuntimeError: If every worker fails or times out. """ n_total = len(starting_points) max_workers = self._config.max_workers or min(n_total, os.cpu_count() or 4) timeout = self._config.worker_timeout # Map from future -> (start_index, initial_params, t0) future_meta: dict[Any, tuple[int, np.ndarray, float]] = {} results_map: dict[int, SingleStartResult] = {} logger.info( "Parallel multi-start: %d starts, %d workers, timeout=%.0fs", n_total, max_workers, timeout, ) try: with ProcessPoolExecutor(max_workers=max_workers) as executor: for i, start in enumerate(starting_points): t0 = time.perf_counter() future = executor.submit( _multistart_worker_sequential, self._adapter, residual_fn, start, bounds, config, jacobian_fn, ) future_meta[future] = (i, start, t0) best_cost = np.inf for future in as_completed(future_meta, timeout=timeout): idx, start, t0 = future_meta[future] wall = time.perf_counter() - t0 try: result: NLSQResult = future.result(timeout=0) cost = ( result.final_cost if result.final_cost is not None else np.inf ) if cost < best_cost: best_cost = cost logger.debug( " Worker %d/%d done: new best cost=%.4e (%.2fs)", idx + 1, n_total, best_cost, wall, ) else: logger.debug( " Worker %d/%d done: cost=%.4e (%.2fs)", idx + 1, n_total, cost, wall, ) results_map[idx] = SingleStartResult( result=result, start_index=idx, initial_params=start, wall_time=wall, ) except FuturesTimeoutError: logger.warning( "Worker %d/%d timed out after %.0fs — skipping", idx + 1, n_total, timeout, ) except Exception as exc: # noqa: BLE001 logger.warning( "Worker %d/%d failed: %s — skipping", idx + 1, n_total, exc, ) except FuturesTimeoutError: logger.warning( "Global as_completed timeout reached after %.0fs — " "proceeding with %d completed workers", timeout, len(results_map), ) if not results_map: raise RuntimeError( "All parallel workers failed or timed out. " "Consider using sequential execution." ) # Return in original start order return [results_map[i] for i in sorted(results_map)]
# --------------------------------------------------------------------------- # Standalone LHS utilities # ---------------------------------------------------------------------------
[docs] def check_zero_volume_bounds( bounds_lower: np.ndarray, bounds_upper: np.ndarray, ) -> list[int]: """Identify parameter dimensions with zero sampling volume. A dimension has zero volume when its lower bound equals its upper bound, meaning the parameter is effectively fixed. LHS sampling should skip these dimensions and keep all starts at the fixed value. Args: bounds_lower: Lower bound array of shape ``(n_params,)``. bounds_upper: Upper bound array of shape ``(n_params,)``. Returns: Sorted list of dimension indices where ``bounds_lower[i] == bounds_upper[i]``. Raises: ValueError: If arrays have different lengths. """ lower = np.asarray(bounds_lower, dtype=np.float64) upper = np.asarray(bounds_upper, dtype=np.float64) if lower.shape != upper.shape: raise ValueError( f"bounds_lower and bounds_upper must have the same shape, " f"got {lower.shape} vs {upper.shape}" ) fixed_dims = [int(i) for i in np.where(lower == upper)[0]] if fixed_dims: logger.debug( "check_zero_volume_bounds: %d fixed dimension(s) detected: %s", len(fixed_dims), fixed_dims, ) return fixed_dims
[docs] def generate_lhs_starts( n_starts: int, bounds_lower: np.ndarray, bounds_upper: np.ndarray, seed: int = 42, ) -> np.ndarray: """Generate Latin Hypercube starting points, excluding fixed dimensions. Fixed dimensions (where ``bounds_lower[i] == bounds_upper[i]``) are detected via :func:`check_zero_volume_bounds` and excluded from LHS sampling. All starts receive the fixed value for those dimensions. Args: n_starts: Number of starting points to generate. bounds_lower: Lower bound array of shape ``(n_params,)``. bounds_upper: Upper bound array of shape ``(n_params,)``. seed: Random seed for reproducibility. Returns: Array of shape ``(n_starts, n_params)`` containing starting points. Raises: ValueError: If ``n_starts < 1`` or bound arrays have mismatched shapes. """ if n_starts < 1: raise ValueError(f"n_starts must be >= 1, got {n_starts}") lower = np.asarray(bounds_lower, dtype=np.float64) upper = np.asarray(bounds_upper, dtype=np.float64) n_params = len(lower) fixed_dims = check_zero_volume_bounds(lower, upper) fixed_set = set(fixed_dims) free_dims = [i for i in range(n_params) if i not in fixed_set] # Initialise output; fixed dimensions get their constant value immediately starts = np.empty((n_starts, n_params), dtype=np.float64) for i in fixed_dims: starts[:, i] = lower[i] if not free_dims: logger.warning( "generate_lhs_starts: all %d dimensions are fixed — " "all starts are identical", n_params, ) return starts # LHS over free dimensions only rng = np.random.default_rng(seed) n_free = len(free_dims) lhs_block = np.zeros((n_starts, n_free), dtype=np.float64) for col_idx, dim in enumerate(free_dims): lo = lower[dim] hi = upper[dim] perm = rng.permutation(n_starts).astype(np.float64) u = rng.random(n_starts) lhs_block[:, col_idx] = lo + (perm + u) * (hi - lo) / n_starts free_dims_arr = np.array(free_dims) starts[:, free_dims_arr] = lhs_block logger.debug( "generate_lhs_starts: generated %d starts across %d free / %d fixed dims " "(seed=%d)", n_starts, n_free, len(fixed_dims), seed, ) return starts
[docs] def validate_n_starts_for_lhs( n_starts: int, n_params: int, warn: bool = True, ) -> int: """Validate n_starts for Latin Hypercube Sampling coverage. For LHS to provide meaningful coverage, n_starts should be at least n_params. Very large n_starts relative to parameter space may produce redundant samples. Parameters ---------- n_starts : int Requested number of starting points. n_params : int Number of parameters (dimensions). warn : bool Whether to emit warnings for suboptimal settings. Returns ------- int Validated n_starts (unchanged if valid). """ if n_starts < n_params and warn: logger.warning( "n_starts (%d) < n_params (%d): " "LHS coverage may be inadequate. Consider n_starts >= %d.", n_starts, n_params, n_params, ) max_meaningful = n_params * 1000 if n_starts > max_meaningful and warn: logger.warning( "n_starts (%d) is very large for %d parameters. " "This may produce redundant samples with diminishing returns. " "Consider n_starts <= %d.", n_starts, n_params, max_meaningful, ) return n_starts
[docs] def generate_random_starts( bounds: NDArray[np.float64], n_starts: int, seed: int = 42, ) -> NDArray[np.float64]: """Generate starting points via random uniform sampling. Parameters ---------- bounds : NDArray[np.float64] Parameter bounds as (n_params, 2) array. n_starts : int Number of starting points to generate. seed : int Random seed for reproducibility. Returns ------- NDArray[np.float64] Starting points as (n_starts, n_params) array. """ rng = np.random.default_rng(seed) n_params = bounds.shape[0] lower = bounds[:, 0] upper = bounds[:, 1] samples = rng.uniform(lower, upper, size=(n_starts, n_params)) logger.debug( "Generated %d random starting points for %d parameters", n_starts, n_params ) return samples
[docs] def include_custom_starts( generated_starts: NDArray[np.float64], custom_starts: list[list[float]] | NDArray[np.float64] | None, bounds: NDArray[np.float64], ) -> NDArray[np.float64]: """Include user-provided custom starting points alongside generated starts. Custom starting points are prepended to the generated starts so they are always included (not filtered by screening). Parameters ---------- generated_starts : NDArray[np.float64] Starting points generated by LHS or random sampling. custom_starts : list[list[float]] | NDArray[np.float64] | None User-provided custom starting points. bounds : NDArray[np.float64] Parameter bounds as (n_params, 2) array, used for validation. Returns ------- NDArray[np.float64] Combined starting points with custom starts first. """ if custom_starts is None or len(custom_starts) == 0: return generated_starts custom_array = np.asarray(custom_starts, dtype=np.float64) n_params = bounds.shape[0] if custom_array.ndim == 1: custom_array = custom_array.reshape(1, -1) if custom_array.shape[1] != n_params: logger.warning( "Custom starts have wrong dimension: %d != %d. Ignoring custom starts.", custom_array.shape[1], n_params, ) return generated_starts lower = bounds[:, 0] upper = bounds[:, 1] n_custom = len(custom_array) valid_mask = np.all((custom_array >= lower) & (custom_array <= upper), axis=1) n_valid = int(np.sum(valid_mask)) if n_valid < n_custom: logger.warning( "%d custom starting point(s) are outside bounds and will be skipped.", n_custom - n_valid, ) custom_array = custom_array[valid_mask] if len(custom_array) == 0: return generated_starts logger.info("Including %d custom starting point(s)", len(custom_array)) return np.vstack([custom_array, generated_starts])
[docs] def screen_starts( cost_func: Callable[[NDArray[np.float64]], float], starts: NDArray[np.float64], keep_fraction: float = 0.5, min_keep: int = 3, n_workers: int = 0, ) -> tuple[NDArray[np.float64], NDArray[np.float64]]: """Pre-filter starting points by initial cost. Parameters ---------- cost_func : Callable Function that computes cost (chi-squared) for a parameter vector. starts : NDArray[np.float64] Starting points as (n_starts, n_params) array. keep_fraction : float Fraction of starting points to keep (0, 1]. min_keep : int Minimum number of starting points to keep. n_workers : int Number of parallel workers for cost evaluation. 0 = auto (cpu_count - 1). Returns ------- tuple[NDArray[np.float64], NDArray[np.float64]] Filtered starting points and their initial costs (all costs, not just kept ones). """ from concurrent.futures import ThreadPoolExecutor n_total_starts = len(starts) n_keep = max(min_keep, int(n_total_starts * keep_fraction)) n_keep = min(n_keep, n_total_starts) if n_workers == 0: n_workers = max(1, (os.cpu_count() or 1) - 1) if n_total_starts >= 4 and n_workers > 1: try: with ThreadPoolExecutor(max_workers=n_workers) as executor: costs = np.array(list(executor.map(cost_func, starts))) except (RuntimeError, OSError, ValueError) as exc: logger.warning( "Parallel screening failed, falling back to sequential: %s", exc ) costs = np.array([cost_func(start) for start in starts]) else: costs = np.array([cost_func(start) for start in starts]) sorted_indices = np.argsort(costs) keep_indices = sorted_indices[:n_keep] filtered_starts = starts[keep_indices] filtered_costs = costs[keep_indices] logger.info( "Screening: kept %d/%d starts (best cost: %.4g, worst kept: %.4g)", n_keep, n_total_starts, filtered_costs[0], filtered_costs[-1], ) return filtered_starts, costs
[docs] def get_n_workers(config: MultiStartConfig, n_starts: int) -> int: """Determine number of parallel workers. Parameters ---------- config : MultiStartConfig Multi-start configuration. n_starts : int Number of starting points. Returns ------- int Number of workers to use. """ if config.n_workers > 0: n_workers = config.n_workers else: n_workers = os.cpu_count() or 4 n_workers = min(n_workers, n_starts) logger.debug("Using %d parallel workers for %d starts", n_workers, n_starts) return n_workers
[docs] def detect_degeneracy( results: list[SingleStartResult], chi_sq_threshold: float = 0.1, param_threshold: float = 0.2, ) -> tuple[bool, int, NDArray[np.int64] | None]: """Detect parameter degeneracy from multiple optimization results. Parameters ---------- results : list[SingleStartResult] List of optimization results (heterodyne ``SingleStartResult`` wrapping an inner ``NLSQResult``). chi_sq_threshold : float Maximum relative chi-squared difference to consider similar. param_threshold : float Maximum relative parameter distance to consider same basin. Returns ------- tuple[bool, int, NDArray[np.int64] | None] (degeneracy_detected, n_unique_basins, basin_labels) """ successful = [r for r in results if r.result.success] if len(successful) < 2: return False, 1, None # Sort by cost successful.sort( key=lambda r: r.result.final_cost if r.result.final_cost is not None else np.inf ) best_cost = ( successful[0].result.final_cost if successful[0].result.final_cost is not None else 0.0 ) basins: list[list[SingleStartResult]] = [] basin_assignments: list[int] = [] for r in successful: r_cost = r.result.final_cost if r.result.final_cost is not None else np.inf cost_diff = abs(r_cost - best_cost) / (abs(best_cost) + 1e-10) if cost_diff > chi_sq_threshold: basin_assignments.append(-1) continue r_params = r.result.parameters found_basin = False for basin_idx, basin in enumerate(basins): center_params = basin[0].result.parameters if center_params is None: continue param_dist = np.linalg.norm(r_params - center_params) / ( np.linalg.norm(center_params) + 1e-10 ) if param_dist < param_threshold: basin.append(r) basin_assignments.append(basin_idx) found_basin = True break if not found_basin: basins.append([r]) basin_assignments.append(len(basins) - 1) n_unique_basins = len(basins) degeneracy_detected = n_unique_basins > 1 labels = np.array(basin_assignments, dtype=np.int64) if degeneracy_detected: logger.warning( "Parameter degeneracy detected: %d distinct basins " "with similar chi-squared values", n_unique_basins, ) return degeneracy_detected, n_unique_basins, labels