"""Structured logging utilities for the heterodyne package.
Provides a lightweight but flexible logging system that matches the CMC
reimplementation requirements: contextual log prefixes, configurable console
and rotating file handlers, and helpers for performance monitoring.
"""
from __future__ import annotations
import functools
import logging
import sys
import threading
import time
import traceback
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import (
Any,
Literal,
TypeVar,
)
# Type variables for decorators
F = TypeVar("F", bound=Callable[..., Any])
# Type alias for logger types
LoggerType = logging.Logger | logging.LoggerAdapter[logging.Logger]
DEFAULT_FORMAT_DETAILED = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
DEFAULT_FORMAT_SIMPLE = "%(levelname)-8s | %(message)s"
def _resolve_level(level: str | int | None) -> int | None:
"""Convert string/int log level to logging level constant."""
if level is None:
return None
if isinstance(level, int):
return level
return getattr(logging, str(level).upper(), logging.INFO)
class _ColorFormatter(logging.Formatter):
"""Optional ANSI color formatter for console logging."""
COLOR_MAP = {
"DEBUG": "\033[36m", # Cyan
"INFO": "\033[32m", # Green
"WARNING": "\033[33m", # Yellow
"ERROR": "\033[31m", # Red
"CRITICAL": "\033[35m", # Magenta
}
RESET = "\033[0m"
def __init__(self, fmt: str, datefmt: str | None, use_color: bool) -> None:
super().__init__(fmt=fmt, datefmt=datefmt)
self.use_color = use_color
def format(self, record: logging.LogRecord) -> str:
original_levelname = record.levelname
if self.use_color and original_levelname in self.COLOR_MAP:
record.levelname = (
f"{self.COLOR_MAP[original_levelname]}{original_levelname}{self.RESET}"
)
try:
return super().format(record)
finally:
record.levelname = original_levelname
class _ContextAdapter(logging.LoggerAdapter):
"""Logger adapter that prefixes messages with structured context."""
def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
if not self.extra:
return msg, kwargs
context_parts = [
f"{key}={value}"
for key, value in self.extra.items()
if value is not None and value != ""
]
if context_parts:
msg = f"[{' '.join(context_parts)}] {msg}"
return msg, kwargs
[docs]
@dataclass
class LogConfiguration:
"""Programmatic logging configuration.
Alternative to ``configure_logging()`` for programmatic control over
logging settings.
Attributes:
console_level: Console log level (default "INFO").
console_format: Console format ("simple" or "detailed").
console_colors: Enable ANSI colors in console (default False).
file_enabled: Enable file logging (default True).
file_path: Log file path (None = auto-generate).
file_level: File log level (default "DEBUG").
file_format: File format ("simple" or "detailed").
file_rotation_mb: Max file size before rotation (default 10).
file_backup_count: Number of backup files to keep (default 5).
module_overrides: Per-module log level overrides.
Example:
>>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
>>> config.apply()
>>> config = LogConfiguration(
... console_level="INFO",
... file_level="DEBUG",
... module_overrides={"jax": "WARNING", "heterodyne.optimization": "DEBUG"}
... )
>>> config.apply()
"""
console_level: str = "INFO"
console_format: str = "simple"
console_colors: bool = False
file_enabled: bool = True
file_path: str | Path | None = None
file_level: str = "DEBUG"
file_format: str = "detailed"
file_rotation_mb: int = 10
file_backup_count: int = 5
module_overrides: dict[str, str] = field(default_factory=dict)
[docs]
def apply(self) -> Path | None:
"""Apply this configuration to the logging system.
Returns:
Path to log file if file logging is enabled, None otherwise.
"""
# Suppress external library logging by default
default_suppressions = {
"jax": "WARNING",
"numpy": "WARNING",
"numba": "WARNING",
"h5py": "WARNING",
}
# Merge default suppressions with user overrides (user overrides win)
merged_overrides = {**default_suppressions, **self.module_overrides}
# Determine file path
file_path = None
if self.file_enabled:
if self.file_path is not None:
file_path = Path(self.file_path)
else:
# Auto-generate timestamped log file
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
file_path = Path(f"heterodyne_{timestamp}.log")
return _logger_manager.configure(
level="DEBUG", # Root level should be lowest to allow filtering
console_level=self.console_level,
console_format=self.console_format,
console_colors=self.console_colors,
file_path=file_path,
file_level=self.file_level if self.file_enabled else False,
max_size_mb=self.file_rotation_mb,
backup_count=self.file_backup_count,
module_levels=merged_overrides,
force=True,
)
[docs]
@classmethod
def from_dict(cls, config: dict[str, Any]) -> LogConfiguration:
"""Create configuration from dictionary.
Args:
config: Dictionary with configuration values.
Returns:
LogConfiguration instance.
"""
return cls(
console_level=config.get("console_level", "INFO"),
console_format=config.get("console_format", "simple"),
console_colors=config.get("console_colors", False),
file_enabled=config.get("file_enabled", True),
file_path=config.get("file_path"),
file_level=config.get("file_level", "DEBUG"),
file_format=config.get("file_format", "detailed"),
file_rotation_mb=config.get("file_rotation_mb", 10),
file_backup_count=config.get("file_backup_count", 5),
module_overrides=config.get("module_overrides", {}),
)
[docs]
@classmethod
def from_cli_args(
cls,
verbose: bool = False,
quiet: bool = False,
log_file: str | None = None,
) -> LogConfiguration:
"""Create configuration from CLI flags.
Args:
verbose: Enable DEBUG level console logging.
quiet: Enable ERROR-only console logging.
log_file: Path to log file (None = auto-generate if file logging enabled).
Returns:
LogConfiguration instance.
"""
# Determine console level from flags
if quiet:
console_level = "ERROR"
elif verbose:
console_level = "DEBUG"
else:
console_level = "INFO"
return cls(
console_level=console_level,
console_format="detailed" if verbose else "simple",
console_colors=False,
file_enabled=True,
file_path=log_file,
file_level="DEBUG",
file_format="detailed",
)
@dataclass
class _PhaseRecord:
"""Internal record for phase timing."""
name: str
start_time: float | None = None
end_time: float | None = None
memory_peak_gb: float | None = None
@property
def duration(self) -> float | None:
if self.start_time is None or self.end_time is None:
return None
return self.end_time - self.start_time
[docs]
class AnalysisSummaryLogger:
"""Structured logging for analysis completion summaries.
Tracks phase timings, metrics, output files, and convergence status
for logging a structured summary at analysis completion.
Example:
>>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="two_component")
>>> summary.start_phase("loading")
>>> data = load_data(config)
>>> summary.end_phase("loading", memory_peak_gb=2.1)
>>> summary.record_metric("chi_squared", result.chi_squared)
>>> summary.set_convergence_status("converged")
>>> summary.log_summary(logger)
"""
[docs]
def __init__(self, run_id: str, analysis_mode: str) -> None:
"""Initialize summary logger for an analysis run.
Args:
run_id: Unique identifier for this analysis run.
analysis_mode: Analysis mode (e.g., "two_component", "single_component").
"""
self.run_id = run_id
self.analysis_mode = analysis_mode
self._phases: dict[str, _PhaseRecord] = {}
self._metrics: dict[str, float] = {}
self._output_files: list[Path] = []
self._convergence_status: str | None = None
self._start_time = time.perf_counter()
self._warning_count = 0
self._error_count = 0
self._config_summary: dict[str, Any] = {}
[docs]
def start_phase(self, name: str) -> None:
"""Mark phase start for timing.
Args:
name: Phase name (e.g., "loading", "optimization").
"""
self._phases[name] = _PhaseRecord(name=name, start_time=time.perf_counter())
[docs]
def end_phase(self, name: str, memory_peak_gb: float | None = None) -> None:
"""Mark phase completion.
Args:
name: Phase name that was started.
memory_peak_gb: Optional peak memory usage during phase.
"""
if name in self._phases:
self._phases[name].end_time = time.perf_counter()
self._phases[name].memory_peak_gb = memory_peak_gb
[docs]
def record_metric(self, name: str, value: float) -> None:
"""Record a named metric (e.g., chi_squared).
Args:
name: Metric name.
value: Metric value.
"""
self._metrics[name] = value
[docs]
def add_output_file(self, path: Path | str) -> None:
"""Record an output file path.
Args:
path: Path to output file.
"""
self._output_files.append(Path(path))
# Statuses that indicate a failure outcome. Once any of these has been
# recorded, calls with a non-failure status are ignored so a partial
# success cannot silently mask the failure. Defends against a future bug
# where some code path optimistically calls
# ``set_convergence_status("converged")`` after a failure was already
# recorded — the het_a10cf27e regression mode would have caught itself
# at the logger boundary even before the dispatch-side fix.
_FAILURE_STATUSES: frozenset[str] = frozenset(
{"failed", "not_converged", "max_iter"}
)
[docs]
def set_convergence_status(self, status: str) -> None:
"""Set final convergence status (failure is sticky).
Once a failure status ("failed", "not_converged", "max_iter") has
been recorded, subsequent calls with a non-failure status are
ignored. Failure-to-failure transitions and success-to-failure
transitions are always allowed.
Args:
status: Convergence status (e.g., "converged", "max_iter", "failed").
"""
current = self._convergence_status
if current is None:
self._convergence_status = status
return
# If the current status is a failure, only another failure may overwrite.
if current in self._FAILURE_STATUSES and status not in self._FAILURE_STATUSES:
return
self._convergence_status = status
@property
def convergence_status(self) -> str | None:
"""Final convergence status recorded for this run (read-only)."""
return self._convergence_status
[docs]
def is_failure(self) -> bool:
"""Return True when the recorded status indicates a failed run."""
return self._convergence_status in self._FAILURE_STATUSES
[docs]
def increment_warning_count(self) -> None:
"""Increment warning counter."""
self._warning_count += 1
[docs]
def increment_error_count(self) -> None:
"""Increment error counter."""
self._error_count += 1
[docs]
def set_config_summary(
self,
optimizer: str | None = None,
n_params: int | None = None,
n_data_points: int | None = None,
n_phi_angles: int | None = None,
data_file: str | None = None,
**kwargs: Any,
) -> None:
"""Set configuration summary for logging.
Args:
optimizer: Optimizer used (e.g., "nlsq", "cmc").
n_params: Number of parameters being optimized.
n_data_points: Total number of data points.
n_phi_angles: Number of phi angles.
data_file: Path to data file.
**kwargs: Additional key-value pairs to include.
"""
if optimizer is not None:
self._config_summary["optimizer"] = optimizer
if n_params is not None:
self._config_summary["n_params"] = n_params
if n_data_points is not None:
self._config_summary["n_data_points"] = n_data_points
if n_phi_angles is not None:
self._config_summary["n_phi_angles"] = n_phi_angles
if data_file is not None:
self._config_summary["data_file"] = data_file
self._config_summary.update(kwargs)
[docs]
def log_summary(self, logger: logging.Logger | logging.LoggerAdapter[Any]) -> None:
"""Log the complete analysis summary.
Args:
logger: Logger to use for output.
"""
total_runtime = time.perf_counter() - self._start_time
lines = [
"=" * 60,
"ANALYSIS SUMMARY",
"=" * 60,
f"Run ID: {self.run_id}",
f"Mode: {self.analysis_mode}",
f"Status: {self._convergence_status or 'unknown'}",
f"Total runtime: {total_runtime:.2f}s",
]
# Configuration summary
if self._config_summary:
lines.append("")
lines.append("Configuration:")
for key, value in self._config_summary.items():
if isinstance(value, int) and value > 1000:
lines.append(f" {key}: {value:,}")
else:
lines.append(f" {key}: {value}")
# Phase timings
if self._phases:
lines.append("")
lines.append("Phase Timings:")
for name, record in self._phases.items():
duration = record.duration
if duration is not None:
mem_str = (
f" (peak: {record.memory_peak_gb:.1f} GB)"
if record.memory_peak_gb is not None
else ""
)
lines.append(f" {name}: {duration:.2f}s{mem_str}")
# Metrics
if self._metrics:
lines.append("")
lines.append("Metrics:")
for name, value in self._metrics.items():
lines.append(f" {name}: {value:.6g}")
# Output files
if self._output_files:
lines.append("")
lines.append("Output files:")
for path in self._output_files:
lines.append(f" {path}")
# Warning/error counts
if self._warning_count > 0 or self._error_count > 0:
lines.append("")
lines.append(
f"Warnings: {self._warning_count}, Errors: {self._error_count}"
)
lines.append("=" * 60)
logger.info("\n".join(lines))
[docs]
def as_dict(self) -> dict[str, Any]:
"""Export summary as dictionary for JSON serialization.
Returns:
Dictionary with all summary data.
"""
total_runtime = time.perf_counter() - self._start_time
phases_dict: dict[str, Any] = {}
for name, record in self._phases.items():
phases_dict[name] = {
"duration_s": record.duration,
"memory_peak_gb": record.memory_peak_gb,
}
# Sanitize metrics so NaN/Inf floats do not cause json.dump to fail.
try:
from heterodyne.io.json_utils import json_safe as _json_safe
except ImportError:
def _json_safe(value: Any) -> Any: # type: ignore[misc]
"""Minimal fallback when io module unavailable."""
if isinstance(value, float):
import math
if math.isnan(value):
return None
if math.isinf(value):
return str(value)
return value
return {
"run_id": self.run_id,
"analysis_mode": self.analysis_mode,
"convergence_status": self._convergence_status,
"total_runtime_s": total_runtime,
"config_summary": self._config_summary,
"phases": phases_dict,
"metrics": _json_safe(self._metrics),
"output_files": [str(p) for p in self._output_files],
"warning_count": self._warning_count,
"error_count": self._error_count,
}
[docs]
class MinimalLogger:
"""Configurable logger manager for the heterodyne package.
Thread-safe singleton for managing heterodyne logging configuration.
"""
_instance: MinimalLogger | None = None
_initialized: bool
_configured: bool
_root_logger_name: str
_lock: threading.Lock
[docs]
def __new__(cls) -> MinimalLogger:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._initialized = False
cls._instance._lock = threading.Lock()
return cls._instance
[docs]
def __init__(self) -> None:
if self._initialized:
return
self._configured = False
self._root_logger_name = "heterodyne"
self._initialized = True
@staticmethod
def _build_formatter(
format_name: str = "detailed",
use_color: bool = False,
) -> logging.Formatter:
fmt = (
DEFAULT_FORMAT_SIMPLE
if format_name == "simple"
else DEFAULT_FORMAT_DETAILED
)
return _ColorFormatter(
fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S", use_color=use_color
)
def _clear_managed_handlers(self, logger: logging.Logger) -> None:
for handler in list(logger.handlers):
if getattr(handler, "_heterodyne_managed", False):
logger.removeHandler(handler)
handler.close()
def _configure_impl(
self,
level: str | int = "INFO",
*,
console_level: str | int | None = None,
console_format: str = "detailed",
console_colors: bool = False,
file_path: str | Path | None = None,
file_level: str | int | None = None,
max_size_mb: int = 10,
backup_count: int = 5,
module_levels: Mapping[str, str | int] | None = None,
force: bool = False,
) -> Path | None:
"""Internal implementation of configure (called under lock)."""
root_logger = logging.getLogger(self._root_logger_name)
if force:
self._clear_managed_handlers(root_logger)
root_level_candidates = [_resolve_level(level)]
if console_level is not False:
root_level_candidates.append(_resolve_level(console_level))
if file_level is not False:
root_level_candidates.append(_resolve_level(file_level))
root_level = min(lvl for lvl in root_level_candidates if lvl is not None)
root_logger.setLevel(root_level)
root_logger.propagate = False
# Console handler
console_handler: logging.Handler | None = None
for handler in root_logger.handlers:
if isinstance(handler, logging.StreamHandler) and not isinstance(
handler,
logging.FileHandler,
):
console_handler = handler
break
if console_level is not False:
if console_handler is None:
console_handler = logging.StreamHandler(sys.stdout)
console_handler._heterodyne_managed = True # type: ignore[attr-defined]
root_logger.addHandler(console_handler)
console_handler.setLevel(_resolve_level(console_level) or root_level)
console_handler.setFormatter(
self._build_formatter(console_format, use_color=console_colors)
)
# File handler
created_file: Path | None = None
if file_path:
resolved_path = Path(file_path)
try:
resolved_path.parent.mkdir(parents=True, exist_ok=True)
except OSError as e:
warn_logger = logging.getLogger(self._root_logger_name)
warn_logger.warning(
"Cannot create log directory %s: %s. File logging disabled.",
resolved_path.parent,
e,
)
resolved_path = None # type: ignore[assignment]
if resolved_path is not None:
created_file = resolved_path
max_bytes = int(max_size_mb * 1024 * 1024)
file_handler: logging.Handler
if max_bytes > 0:
file_handler = RotatingFileHandler(
resolved_path,
maxBytes=max_bytes,
backupCount=backup_count,
)
else:
file_handler = logging.FileHandler(resolved_path)
file_handler._heterodyne_managed = True # type: ignore[attr-defined]
file_handler.setLevel(_resolve_level(file_level) or root_level)
file_handler.setFormatter(
logging.Formatter(
DEFAULT_FORMAT_DETAILED,
datefmt="%Y-%m-%d %H:%M:%S",
)
)
root_logger.addHandler(file_handler)
# Default suppression for external libraries
default_suppressions = {
"jax": "WARNING",
"numpy": "WARNING",
"numba": "WARNING",
"h5py": "WARNING",
}
for lib_name, lib_level in default_suppressions.items():
lib_logger = logging.getLogger(lib_name)
if lib_logger.level == logging.NOTSET:
lib_logger.setLevel(_resolve_level(lib_level) or logging.WARNING)
# Module-specific overrides (user overrides win over defaults)
if module_levels:
for module_name, module_level in module_levels.items():
logging.getLogger(module_name).setLevel(
_resolve_level(module_level) or root_level
)
self._configured = True
return created_file
[docs]
def get_logger(self, name: str) -> logging.Logger:
"""Get or create a logger with hierarchical naming."""
if not name.startswith(self._root_logger_name):
if name == "__main__":
full_name = f"{self._root_logger_name}.main"
elif "." in name and name.startswith("heterodyne"):
full_name = name
else:
full_name = f"{self._root_logger_name}.{name}"
else:
full_name = name
if not self._configured:
self.configure()
return logging.getLogger(full_name)
# Global logger manager instance
_logger_manager = MinimalLogger()
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
[docs]
def get_logger(
name: str | None = None,
*,
context: Mapping[str, Any] | None = None,
) -> logging.Logger | logging.LoggerAdapter[logging.Logger]:
"""Get a logger instance with automatic naming and optional context.
Delegates to ``logging.getLogger`` via ``MinimalLogger``, which caches
logger instances by name. Callers that previously used::
get_logger("heterodyne")
or::
get_logger(__name__)
continue to work unchanged. The optional *context* keyword adds structured
key-value prefixes to every message emitted through the returned adapter.
Args:
name: Logger name, typically ``__name__``. When ``None`` the caller's
``__name__`` is inferred automatically via frame inspection.
context: Optional mapping of key-value pairs to include as a prefix
on every log message (e.g. ``{"run_id": "abc", "q_bin": 5}``).
Returns:
A plain :class:`logging.Logger` when *context* is ``None``, or a
:class:`_ContextAdapter` that prepends ``[key=value ...]`` to every
message when *context* is provided.
"""
if name is None:
name = "heterodyne"
base_logger = _logger_manager.get_logger(name)
if context:
return _ContextAdapter(base_logger, dict(context))
return base_logger
[docs]
def with_context(
logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
**context: Any,
) -> logging.LoggerAdapter[logging.Logger]:
"""Create a contextual logger with key-value prefixes.
Context is formatted as ``[key=value key2=value2] message``.
Nested calls merge contexts (inner overrides outer on key conflicts).
Thread-safe for use in multiprocessing.
Args:
logger: Base logger or existing contextual adapter to wrap.
**context: Key-value pairs to include as prefix.
Returns:
A logger adapter that prefixes all messages with context.
Example:
>>> logger = get_logger(__name__)
>>> ctx_logger = with_context(logger, run_id="abc123", mode="two_component")
>>> ctx_logger.info("Starting analysis")
# Output: [run_id=abc123 mode=two_component] Starting analysis
>>> # Nested context
>>> shard_logger = with_context(ctx_logger, q_bin=5)
>>> shard_logger.info("Processing bin")
# Output: [run_id=abc123 mode=two_component q_bin=5] Processing bin
"""
# Filter out None values from new context
new_context = {k: v for k, v in context.items() if v is not None}
# If wrapping an existing _ContextAdapter, merge contexts (inner overrides outer)
if isinstance(logger, _ContextAdapter):
merged_context = dict(logger.extra) if logger.extra else {}
merged_context.update(new_context)
# Get the underlying logger to avoid nested adapters
base_logger = logger.logger
return _ContextAdapter(base_logger, merged_context)
# If wrapping a LoggerAdapter (not our _ContextAdapter), extract base logger
if isinstance(logger, logging.LoggerAdapter):
base_logger = logger.logger
return _ContextAdapter(base_logger, new_context)
# Wrapping a plain Logger
return _ContextAdapter(logger, new_context)
[docs]
@dataclass
class PhaseContext:
"""Context object returned by :func:`log_phase` with timing and memory info."""
name: str
duration: float = 0.0
memory_peak_gb: float | None = None
memory_delta_gb: float | None = None
def _get_memory_gb() -> float | None:
"""Get current process memory usage in GB, or None if unavailable."""
try:
import resource
rusage = resource.getrusage(resource.RUSAGE_SELF)
# maxrss is in KB on Linux, bytes on macOS
if sys.platform == "darwin":
return rusage.ru_maxrss / (1024**3) # bytes to GB
return rusage.ru_maxrss / (1024**2) # KB to GB
except (ImportError, AttributeError):
return None
[docs]
@contextmanager
def log_phase(
name: str,
logger: LoggerType | None = None,
level: int = logging.INFO,
track_memory: bool = False,
threshold_s: float = 0.0,
) -> Generator[PhaseContext]:
"""Context manager for phase-level timing with optional memory tracking.
Args:
name: Phase name for logging.
logger: Logger to use. If ``None``, uses the caller's module logger.
level: Log level for phase messages.
track_memory: Track memory usage during phase.
threshold_s: Only log if duration > threshold (0 = always log).
Yields:
:class:`PhaseContext` with ``name``, ``duration``,
``memory_peak_gb``, and ``memory_delta_gb``. Duration and memory
values are populated after the context exits.
Example:
>>> with log_phase("optimization", track_memory=True) as phase:
... result = run_optimization(data)
>>> print(f"Took {phase.duration:.1f}s")
# Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
"""
resolved_logger = get_logger() if logger is None else logger
context = PhaseContext(name=name)
memory_start: float | None = None
if track_memory:
memory_start = _get_memory_gb()
if threshold_s <= 0:
resolved_logger.log(level, "Phase '%s' started", name)
start_time = time.perf_counter()
try:
yield context
finally:
context.duration = time.perf_counter() - start_time
if track_memory:
memory_end = _get_memory_gb()
if memory_end is not None:
context.memory_peak_gb = memory_end
if memory_start is not None:
context.memory_delta_gb = memory_end - memory_start
if context.duration >= threshold_s:
failed = sys.exc_info()[1] is not None
outcome = "failed" if failed else "completed"
msg_parts = [f"Phase '{name}' {outcome} in {context.duration:.2f}s"]
if context.memory_peak_gb is not None:
msg_parts.append(f"(peak memory: {context.memory_peak_gb:.1f} GB)")
log_level = logging.ERROR if failed else level
resolved_logger.log(log_level, " ".join(msg_parts))
[docs]
def log_exception(
logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
exc: BaseException,
context: dict[str, Any] | None = None,
level: int = logging.ERROR,
include_traceback: bool = True,
) -> None:
"""Log an exception with full context for debugging.
Extracts module, function, and line number from exception traceback.
Formats context as key-value pairs in the message.
Args:
logger: Logger to use.
exc: Exception to log.
context: Additional context (e.g., parameter values).
level: Log level (default ERROR).
include_traceback: Include full traceback (default True).
Example:
>>> try:
... result = compute_jacobian(params)
... except ValueError as e:
... log_exception(logger, e, context={
... "iteration": 45,
... "params": params.tolist()[:5]
... })
... raise
# Logs:
# ERROR | heterodyne.optimization.nlsq.core | Exception in compute_jacobian:
# ValueError: invalid value
# Context: iteration=45, params=[1.2e-11, 0.85, ...]
# Traceback (most recent call last):
# ...
"""
tb = exc.__traceback__
location_info = ""
if tb is not None:
while tb.tb_next is not None:
tb = tb.tb_next
frame = tb.tb_frame
func_name = frame.f_code.co_name
line_no = tb.tb_lineno
module_name = frame.f_globals.get("__name__", "unknown")
location_info = f" in {module_name}.{func_name}:{line_no}"
exc_type = type(exc).__name__
exc_msg = str(exc)
msg_parts = [f"Exception{location_info}: {exc_type}: {exc_msg}"]
if context:
context_str = ", ".join(f"{k}={v!r}" for k, v in context.items())
msg_parts.append(f"Context: {context_str}")
if include_traceback:
tb_str = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
msg_parts.append(f"Traceback:\n{tb_str}")
logger.log(level, "\n".join(msg_parts))
[docs]
def log_calls(
logger: LoggerType | None = None,
level: int = logging.DEBUG,
include_args: bool = False,
include_result: bool = False,
) -> Callable[[F], F]:
"""Decorator to log function calls.
Args:
logger: Logger to use. If ``None``, creates one for the decorated
function's module.
level: Logging level to use.
include_args: Whether to log function arguments.
include_result: Whether to log the function return value.
Returns:
Decorator that wraps *func* with entry/exit logging.
"""
resolved_logger: LoggerType | None = logger
def decorator(func: F) -> F:
nonlocal resolved_logger
if resolved_logger is None:
resolved_logger = get_logger(func.__module__)
@functools.wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
# ``resolved_logger`` is guaranteed non-None: the enclosing
# ``decorator`` populates it via ``get_logger`` if the caller
# passed ``None``.
assert resolved_logger is not None # noqa: S101 — narrowing only
rlog = resolved_logger
log_enabled = rlog.isEnabledFor(level)
# Always compute func_name so unbound-var paths can't trip pyright.
func_name = f"{func.__module__}.{func.__qualname__}"
if log_enabled:
if include_args:
args_str = ", ".join([repr(arg) for arg in args])
kwargs_str = ", ".join([f"{k}={v!r}" for k, v in kwargs.items()])
all_args = ", ".join(filter(None, [args_str, kwargs_str]))
rlog.log(level, "Calling %s(%s)", func_name, all_args)
else:
rlog.log(level, "Calling %s", func_name)
try:
result = func(*args, **kwargs)
if log_enabled:
if include_result:
rlog.log(level, "Completed %s -> %r", func_name, result)
else:
rlog.log(level, "Completed %s", func_name)
return result
except Exception as e:
rlog.log(logging.ERROR, "Exception in %s: %s", func_name, e)
raise
return wrapper # type: ignore[return-value]
return decorator
[docs]
@contextmanager
def log_operation(
operation_name: str,
logger: LoggerType | None = None,
level: int = logging.INFO,
) -> Generator[LoggerType]:
"""Context manager for logging named operations with timing.
Args:
operation_name: Human-readable name of the operation.
logger: Logger to use. If ``None``, uses the caller's module logger.
level: Logging level to use.
Yields:
The resolved logger so callers can emit additional messages inside
the ``with`` block without holding a separate reference.
Example:
>>> with log_operation("jacobian computation") as log:
... J = compute_jacobian(params)
... log.debug("Jacobian shape: %s", J.shape)
# Logs: Starting operation: jacobian computation
# Logs: Completed operation: jacobian computation in 0.042s
"""
resolved_logger = get_logger() if logger is None else logger
resolved_logger.log(level, "Starting operation: %s", operation_name)
start_time = time.perf_counter()
try:
yield resolved_logger
duration = time.perf_counter() - start_time
resolved_logger.log(
level, "Completed operation: %s in %.3fs", operation_name, duration
)
except Exception as e:
duration = time.perf_counter() - start_time
resolved_logger.log(
logging.ERROR,
"Failed operation: %s after %.3fs: %s",
operation_name,
duration,
e,
)
raise
[docs]
class ConvergenceLogger:
"""Structured logger for optimization convergence diagnostics."""
[docs]
def __init__(self, logger: logging.Logger | None = None) -> None:
self.logger = logger or get_logger("heterodyne.optimization")
self._iteration = 0
[docs]
def log_iteration(
self,
iteration: int,
loss: float,
gradient_norm: float | None = None,
step_size: float | None = None,
) -> None:
"""Log optimization iteration metrics."""
self._iteration = iteration
msg = f"iter={iteration:4d} | loss={loss:.6e}"
if gradient_norm is not None:
msg += f" | grad_norm={gradient_norm:.3e}"
if step_size is not None:
msg += f" | step={step_size:.3e}"
self.logger.debug(msg)
[docs]
def log_convergence(self, reason: str, final_loss: float) -> None:
"""Log convergence result."""
self.logger.info(
"Converged: %s at iteration %d | final_loss=%.6e",
reason,
self._iteration,
final_loss,
)
[docs]
def log_diagnostic(
self,
metric_name: str,
value: float,
threshold: float,
higher_is_better: bool = True,
) -> None:
"""Log diagnostic metric with pass/fail status.
Args:
metric_name: Name of the diagnostic metric.
value: Observed value.
threshold: Threshold for pass/fail.
higher_is_better: If ``True``, values above threshold pass (e.g. ESS).
If ``False``, values below threshold pass (e.g. R-hat).
"""
if higher_is_better:
status = "PASS" if value >= threshold else "WARN"
else:
status = "PASS" if value <= threshold else "WARN"
self.logger.info(
"[%s] %s: %.4f (threshold: %.4f)", status, metric_name, value, threshold
)
# Configure default logging on import
_logger_manager.configure()