Source code for heterodyne.utils.logging

"""Structured logging utilities for the heterodyne package.

Provides a lightweight but flexible logging system that matches the CMC
reimplementation requirements: contextual log prefixes, configurable console
and rotating file handlers, and helpers for performance monitoring.
"""

from __future__ import annotations

import functools
import logging
import sys
import threading
import time
import traceback
from collections.abc import Callable, Generator, Mapping
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime
from logging.handlers import RotatingFileHandler
from pathlib import Path
from typing import (
    Any,
    Literal,
    TypeVar,
)

# Type variable bound to "any callable"; intended for decorator signatures so
# the decorated function's type can be preserved.
F = TypeVar("F", bound=Callable[..., Any])

# Type alias covering both plain loggers and logger adapters, so helpers can
# accept either interchangeably.
LoggerType = logging.Logger | logging.LoggerAdapter[logging.Logger]

# Detailed layout: timestamp, level name padded to 8 columns, logger name, message.
DEFAULT_FORMAT_DETAILED = "%(asctime)s | %(levelname)-8s | %(name)s | %(message)s"
# Simple layout: level name padded to 8 columns and the message only.
DEFAULT_FORMAT_SIMPLE = "%(levelname)-8s | %(message)s"


def _resolve_level(level: str | int | None) -> int | None:
    """Convert string/int log level to logging level constant."""
    if level is None:
        return None
    if isinstance(level, int):
        return level
    return getattr(logging, str(level).upper(), logging.INFO)


class _ColorFormatter(logging.Formatter):
    """Optional ANSI color formatter for console logging."""

    COLOR_MAP = {
        "DEBUG": "\033[36m",  # Cyan
        "INFO": "\033[32m",  # Green
        "WARNING": "\033[33m",  # Yellow
        "ERROR": "\033[31m",  # Red
        "CRITICAL": "\033[35m",  # Magenta
    }
    RESET = "\033[0m"

    def __init__(self, fmt: str, datefmt: str | None, use_color: bool) -> None:
        super().__init__(fmt=fmt, datefmt=datefmt)
        self.use_color = use_color

    def format(self, record: logging.LogRecord) -> str:
        original_levelname = record.levelname
        if self.use_color and original_levelname in self.COLOR_MAP:
            record.levelname = (
                f"{self.COLOR_MAP[original_levelname]}{original_levelname}{self.RESET}"
            )
        try:
            return super().format(record)
        finally:
            record.levelname = original_levelname


class _ContextAdapter(logging.LoggerAdapter):
    """Logger adapter that prefixes messages with structured context."""

    def process(self, msg: str, kwargs: Any) -> tuple[str, Any]:
        if not self.extra:
            return msg, kwargs

        context_parts = [
            f"{key}={value}"
            for key, value in self.extra.items()
            if value is not None and value != ""
        ]
        if context_parts:
            msg = f"[{' '.join(context_parts)}] {msg}"
        return msg, kwargs


@dataclass
class LogConfiguration:
    """Programmatic logging configuration.

    Alternative to ``configure_logging()`` for programmatic control over
    logging settings.

    Attributes:
        console_level: Console log level (default "INFO").
        console_format: Console format ("simple" or "detailed").
        console_colors: Enable ANSI colors in console (default False).
        file_enabled: Enable file logging (default True).
        file_path: Log file path (None = auto-generate).
        file_level: File log level (default "DEBUG").
        file_format: File format ("simple" or "detailed").
        file_rotation_mb: Max file size before rotation (default 10).
        file_backup_count: Number of backup files to keep (default 5).
        module_overrides: Per-module log level overrides.

    Example:
        >>> config = LogConfiguration.from_cli_args(verbose=True, log_file="analysis.log")
        >>> config.apply()

        >>> config = LogConfiguration(
        ...     console_level="INFO",
        ...     file_level="DEBUG",
        ...     module_overrides={"jax": "WARNING", "heterodyne.optimization": "DEBUG"}
        ... )
        >>> config.apply()
    """

    console_level: str = "INFO"
    console_format: str = "simple"
    console_colors: bool = False
    file_enabled: bool = True
    file_path: str | Path | None = None
    file_level: str = "DEBUG"
    # NOTE(review): ``apply()`` does not forward ``file_format``; confirm
    # whether the manager is expected to honour it.
    file_format: str = "detailed"
    file_rotation_mb: int = 10
    file_backup_count: int = 5
    module_overrides: dict[str, str] = field(default_factory=dict)

    def apply(self) -> Path | None:
        """Apply this configuration to the logging system.

        Returns:
            Path to log file if file logging is enabled, None otherwise.
        """
        # Suppress external library logging by default.
        default_suppressions = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        # User overrides win over the defaults; tolerate an explicit ``None``.
        merged_overrides = {**default_suppressions, **(self.module_overrides or {})}

        file_path: Path | None = None
        if self.file_enabled:
            if self.file_path is not None:
                file_path = Path(self.file_path)
            else:
                # Auto-generate a timestamped log file in the working directory.
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                file_path = Path(f"heterodyne_{timestamp}.log")

        return _logger_manager.configure(
            level="DEBUG",  # Root level should be lowest to allow filtering
            console_level=self.console_level,
            console_format=self.console_format,
            console_colors=self.console_colors,
            file_path=file_path,
            file_level=self.file_level if self.file_enabled else False,
            max_size_mb=self.file_rotation_mb,
            backup_count=self.file_backup_count,
            module_levels=merged_overrides,
            force=True,
        )

    @classmethod
    def from_dict(cls, config: dict[str, Any]) -> LogConfiguration:
        """Create configuration from dictionary.

        Missing keys fall back to the dataclass defaults. ``module_overrides``
        is copied so later changes to the instance never mutate the caller's
        dictionary, and an explicit ``None`` is treated as "no overrides".

        Args:
            config: Dictionary with configuration values.

        Returns:
            LogConfiguration instance.
        """
        return cls(
            console_level=config.get("console_level", "INFO"),
            console_format=config.get("console_format", "simple"),
            console_colors=config.get("console_colors", False),
            file_enabled=config.get("file_enabled", True),
            file_path=config.get("file_path"),
            file_level=config.get("file_level", "DEBUG"),
            file_format=config.get("file_format", "detailed"),
            file_rotation_mb=config.get("file_rotation_mb", 10),
            file_backup_count=config.get("file_backup_count", 5),
            module_overrides=dict(config.get("module_overrides") or {}),
        )

    @classmethod
    def from_cli_args(
        cls,
        verbose: bool = False,
        quiet: bool = False,
        log_file: str | None = None,
    ) -> LogConfiguration:
        """Create configuration from CLI flags.

        Args:
            verbose: Enable DEBUG level console logging.
            quiet: Enable ERROR-only console logging (takes precedence over
                *verbose* for the console level).
            log_file: Path to log file (None = auto-generate if file logging enabled).

        Returns:
            LogConfiguration instance.
        """
        if quiet:
            console_level = "ERROR"
        elif verbose:
            console_level = "DEBUG"
        else:
            console_level = "INFO"

        return cls(
            console_level=console_level,
            console_format="detailed" if verbose else "simple",
            console_colors=False,
            file_enabled=True,
            file_path=log_file,
            file_level="DEBUG",
            file_format="detailed",
        )
@dataclass class _PhaseRecord: """Internal record for phase timing.""" name: str start_time: float | None = None end_time: float | None = None memory_peak_gb: float | None = None @property def duration(self) -> float | None: if self.start_time is None or self.end_time is None: return None return self.end_time - self.start_time
class AnalysisSummaryLogger:
    """Structured logging for analysis completion summaries.

    Tracks phase timings, metrics, output files, and convergence status for
    logging a structured summary at analysis completion.

    Example:
        >>> summary = AnalysisSummaryLogger(run_id="analysis_001", analysis_mode="two_component")
        >>> summary.start_phase("loading")
        >>> data = load_data(config)
        >>> summary.end_phase("loading", memory_peak_gb=2.1)
        >>> summary.record_metric("chi_squared", result.chi_squared)
        >>> summary.set_convergence_status("converged")
        >>> summary.log_summary(logger)
    """

    # Statuses that indicate a failure outcome. Once any of these has been
    # recorded, calls with a non-failure status are ignored so a partial
    # success cannot silently mask the failure. Defends against a future bug
    # where some code path optimistically calls
    # ``set_convergence_status("converged")`` after a failure was already
    # recorded — the het_a10cf27e regression mode would have caught itself
    # at the logger boundary even before the dispatch-side fix.
    _FAILURE_STATUSES: frozenset[str] = frozenset(
        {"failed", "not_converged", "max_iter"}
    )

    def __init__(self, run_id: str, analysis_mode: str) -> None:
        """Initialize summary logger for an analysis run.

        The total-runtime clock starts when the instance is created.

        Args:
            run_id: Unique identifier for this analysis run.
            analysis_mode: Analysis mode (e.g., "two_component", "single_component").
        """
        self.run_id = run_id
        self.analysis_mode = analysis_mode
        self._phases: dict[str, _PhaseRecord] = {}
        self._metrics: dict[str, float] = {}
        self._output_files: list[Path] = []
        self._convergence_status: str | None = None
        self._start_time = time.perf_counter()
        self._warning_count = 0
        self._error_count = 0
        self._config_summary: dict[str, Any] = {}

    def start_phase(self, name: str) -> None:
        """Mark phase start for timing (restarting replaces any earlier record).

        Args:
            name: Phase name (e.g., "loading", "optimization").
        """
        self._phases[name] = _PhaseRecord(name=name, start_time=time.perf_counter())

    def end_phase(self, name: str, memory_peak_gb: float | None = None) -> None:
        """Mark phase completion; unknown phase names are ignored.

        Args:
            name: Phase name that was started.
            memory_peak_gb: Optional peak memory usage during phase.
        """
        record = self._phases.get(name)
        if record is None:
            return
        record.end_time = time.perf_counter()
        record.memory_peak_gb = memory_peak_gb

    def record_metric(self, name: str, value: float) -> None:
        """Record a named metric (e.g., chi_squared).

        Args:
            name: Metric name.
            value: Metric value.
        """
        self._metrics[name] = value

    def add_output_file(self, path: Path | str) -> None:
        """Record an output file path.

        Args:
            path: Path to output file.
        """
        self._output_files.append(Path(path))

    def set_convergence_status(self, status: str) -> None:
        """Set final convergence status (failure is sticky).

        Once a failure status ("failed", "not_converged", "max_iter") has been
        recorded, subsequent calls with a non-failure status are ignored.
        Failure-to-failure transitions and success-to-failure transitions are
        always allowed.

        Args:
            status: Convergence status (e.g., "converged", "max_iter", "failed").
        """
        current = self._convergence_status
        # A recorded failure may only be replaced by another failure.
        if (
            current is not None
            and current in self._FAILURE_STATUSES
            and status not in self._FAILURE_STATUSES
        ):
            return
        self._convergence_status = status

    @property
    def convergence_status(self) -> str | None:
        """Final convergence status recorded for this run (read-only)."""
        return self._convergence_status

    def is_failure(self) -> bool:
        """Return True when the recorded status indicates a failed run."""
        return self._convergence_status in self._FAILURE_STATUSES

    def increment_warning_count(self) -> None:
        """Increment warning counter."""
        self._warning_count += 1

    def increment_error_count(self) -> None:
        """Increment error counter."""
        self._error_count += 1

    def set_config_summary(
        self,
        optimizer: str | None = None,
        n_params: int | None = None,
        n_data_points: int | None = None,
        n_phi_angles: int | None = None,
        data_file: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Set configuration summary for logging.

        Named arguments left as ``None`` are not recorded; extra keyword
        arguments are stored as given.

        Args:
            optimizer: Optimizer used (e.g., "nlsq", "cmc").
            n_params: Number of parameters being optimized.
            n_data_points: Total number of data points.
            n_phi_angles: Number of phi angles.
            data_file: Path to data file.
            **kwargs: Additional key-value pairs to include.
        """
        named_values = (
            ("optimizer", optimizer),
            ("n_params", n_params),
            ("n_data_points", n_data_points),
            ("n_phi_angles", n_phi_angles),
            ("data_file", data_file),
        )
        for key, value in named_values:
            if value is not None:
                self._config_summary[key] = value
        self._config_summary.update(kwargs)

    def log_summary(self, logger: logging.Logger | logging.LoggerAdapter[Any]) -> None:
        """Log the complete analysis summary as one multi-line INFO message.

        Args:
            logger: Logger to use for output.
        """
        total_runtime = time.perf_counter() - self._start_time
        rule = "=" * 60

        lines = [
            rule,
            "ANALYSIS SUMMARY",
            rule,
            f"Run ID: {self.run_id}",
            f"Mode: {self.analysis_mode}",
            f"Status: {self._convergence_status or 'unknown'}",
            f"Total runtime: {total_runtime:.2f}s",
        ]

        # Configuration summary (large integers get thousands separators).
        if self._config_summary:
            lines.append("")
            lines.append("Configuration:")
            for key, value in self._config_summary.items():
                if isinstance(value, int) and value > 1000:
                    lines.append(f"  {key}: {value:,}")
                else:
                    lines.append(f"  {key}: {value}")

        # Phase timings (phases that never ended have no duration and are skipped).
        if self._phases:
            lines.append("")
            lines.append("Phase Timings:")
            for name, record in self._phases.items():
                duration = record.duration
                if duration is None:
                    continue
                mem_str = (
                    f" (peak: {record.memory_peak_gb:.1f} GB)"
                    if record.memory_peak_gb is not None
                    else ""
                )
                lines.append(f"  {name}: {duration:.2f}s{mem_str}")

        # Metrics
        if self._metrics:
            lines.append("")
            lines.append("Metrics:")
            for name, value in self._metrics.items():
                lines.append(f"  {name}: {value:.6g}")

        # Output files
        if self._output_files:
            lines.append("")
            lines.append("Output files:")
            for path in self._output_files:
                lines.append(f"  {path}")

        # Warning/error counts
        if self._warning_count > 0 or self._error_count > 0:
            lines.append("")
            lines.append(
                f"Warnings: {self._warning_count}, Errors: {self._error_count}"
            )

        lines.append(rule)
        logger.info("\n".join(lines))

    def as_dict(self) -> dict[str, Any]:
        """Export summary as dictionary for JSON serialization.

        Metrics are sanitized so NaN/Inf floats do not break strict JSON
        encoding. The project's ``json_safe`` helper is used when importable;
        otherwise a local fallback maps NaN to ``None`` and infinities to their
        string form, recursing into mappings and sequences.

        Returns:
            Dictionary with all summary data.
        """
        total_runtime = time.perf_counter() - self._start_time

        phases_dict: dict[str, Any] = {
            name: {
                "duration_s": record.duration,
                "memory_peak_gb": record.memory_peak_gb,
            }
            for name, record in self._phases.items()
        }

        try:
            from heterodyne.io.json_utils import json_safe as _json_safe
        except ImportError:
            import math

            def _json_safe(value: Any) -> Any:  # type: ignore[misc]
                """Minimal fallback when io module unavailable."""
                if isinstance(value, float):
                    if math.isnan(value):
                        return None
                    if math.isinf(value):
                        return str(value)
                    return value
                # The metrics container itself is passed in, so containers
                # must be walked for the scalar handling above to apply.
                if isinstance(value, Mapping):
                    return {key: _json_safe(item) for key, item in value.items()}
                if isinstance(value, (list, tuple)):
                    return [_json_safe(item) for item in value]
                return value

        return {
            "run_id": self.run_id,
            "analysis_mode": self.analysis_mode,
            "convergence_status": self._convergence_status,
            "total_runtime_s": total_runtime,
            "config_summary": self._config_summary,
            "phases": phases_dict,
            "metrics": _json_safe(self._metrics),
            "output_files": [str(p) for p in self._output_files],
            "warning_count": self._warning_count,
            "error_count": self._error_count,
        }
class MinimalLogger:
    """Configurable logger manager for the heterodyne package.

    Process-wide singleton: every ``MinimalLogger()`` call returns the same
    instance. Configuration calls are serialized through an internal lock.
    """

    _instance: MinimalLogger | None = None
    _initialized: bool
    _configured: bool
    _root_logger_name: str
    _lock: threading.Lock

    def __new__(cls) -> MinimalLogger:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
            cls._instance._lock = threading.Lock()
        return cls._instance

    def __init__(self) -> None:
        """Initialize state once; repeated construction is a no-op."""
        if self._initialized:
            return
        self._configured = False
        self._root_logger_name = "heterodyne"
        self._initialized = True

    @staticmethod
    def _build_formatter(
        format_name: str = "detailed",
        use_color: bool = False,
    ) -> logging.Formatter:
        """Build the console formatter; any name other than "simple" is detailed."""
        fmt = (
            DEFAULT_FORMAT_SIMPLE
            if format_name == "simple"
            else DEFAULT_FORMAT_DETAILED
        )
        return _ColorFormatter(
            fmt=fmt, datefmt="%Y-%m-%d %H:%M:%S", use_color=use_color
        )

    def _clear_managed_handlers(self, logger: logging.Logger) -> None:
        """Remove and close the handlers this manager attached to *logger*."""
        for handler in list(logger.handlers):
            if getattr(handler, "_heterodyne_managed", False):
                logger.removeHandler(handler)
                handler.close()

    def configure(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Configure heterodyne logging.

        Thread-safe configuration of the logging system.

        Args:
            level: Root logger level.
            console_level: Console handler level. Pass ``False`` to skip
                creating/updating the console handler.
            console_format: "simple" or "detailed".
            console_colors: Enable ANSI color output on the console.
            file_path: Path to log file. ``None`` disables file logging.
            file_level: File handler level. Pass ``False`` to suppress the
                file handler even when *file_path* is set.
            max_size_mb: Rotating log file maximum size in MB.
            backup_count: Number of backup files to keep.
            module_levels: Per-module level overrides.
            force: Remove existing managed handlers before reconfiguring.

        Returns:
            Path to the log file if a file handler was created, else ``None``.
        """
        with self._lock:
            return self._configure_impl(
                level=level,
                console_level=console_level,
                console_format=console_format,
                console_colors=console_colors,
                file_path=file_path,
                file_level=file_level,
                max_size_mb=max_size_mb,
                backup_count=backup_count,
                module_levels=module_levels,
                force=force,
            )

    def _configure_impl(
        self,
        level: str | int = "INFO",
        *,
        console_level: str | int | None = None,
        console_format: str = "detailed",
        console_colors: bool = False,
        file_path: str | Path | None = None,
        file_level: str | int | None = None,
        max_size_mb: int = 10,
        backup_count: int = 5,
        module_levels: Mapping[str, str | int] | None = None,
        force: bool = False,
    ) -> Path | None:
        """Internal implementation of configure (called under lock)."""
        root_logger = logging.getLogger(self._root_logger_name)

        if force:
            self._clear_managed_handlers(root_logger)

        # The package logger must be at least as permissive as every active
        # handler, otherwise records are dropped before handlers see them.
        root_level_candidates = [_resolve_level(level)]
        if console_level is not False:
            root_level_candidates.append(_resolve_level(console_level))
        if file_level is not False:
            root_level_candidates.append(_resolve_level(file_level))
        root_level = min(lvl for lvl in root_level_candidates if lvl is not None)
        root_logger.setLevel(root_level)
        root_logger.propagate = False

        # Console handler: reuse an existing stream (non-file) handler if present.
        console_handler: logging.Handler | None = None
        for handler in root_logger.handlers:
            if isinstance(handler, logging.StreamHandler) and not isinstance(
                handler,
                logging.FileHandler,
            ):
                console_handler = handler
                break

        if console_level is not False:
            if console_handler is None:
                console_handler = logging.StreamHandler(sys.stdout)
                console_handler._heterodyne_managed = True  # type: ignore[attr-defined]
                root_logger.addHandler(console_handler)
            console_handler.setLevel(_resolve_level(console_level) or root_level)
            console_handler.setFormatter(
                self._build_formatter(console_format, use_color=console_colors)
            )

        # File handler. ``file_level=False`` suppresses it even when a path is
        # given, as promised by ``configure``'s documentation.
        created_file: Path | None = None
        if file_path and file_level is not False:
            resolved_path = Path(file_path)
            try:
                resolved_path.parent.mkdir(parents=True, exist_ok=True)
            except OSError as e:
                # Best effort: keep console logging alive without a file.
                root_logger.warning(
                    "Cannot create log directory %s: %s. File logging disabled.",
                    resolved_path.parent,
                    e,
                )
            else:
                max_bytes = int(max_size_mb * 1024 * 1024)
                file_handler: logging.Handler
                if max_bytes > 0:
                    file_handler = RotatingFileHandler(
                        resolved_path,
                        maxBytes=max_bytes,
                        backupCount=backup_count,
                    )
                else:
                    # Non-positive size means "no rotation".
                    file_handler = logging.FileHandler(resolved_path)
                file_handler._heterodyne_managed = True  # type: ignore[attr-defined]
                file_handler.setLevel(_resolve_level(file_level) or root_level)
                file_handler.setFormatter(
                    logging.Formatter(
                        DEFAULT_FORMAT_DETAILED,
                        datefmt="%Y-%m-%d %H:%M:%S",
                    )
                )
                root_logger.addHandler(file_handler)
                created_file = resolved_path

        # Default suppression for external libraries; only applied when the
        # library logger has no explicit level yet.
        default_suppressions = {
            "jax": "WARNING",
            "numpy": "WARNING",
            "numba": "WARNING",
            "h5py": "WARNING",
        }
        for lib_name, lib_level in default_suppressions.items():
            lib_logger = logging.getLogger(lib_name)
            if lib_logger.level == logging.NOTSET:
                lib_logger.setLevel(_resolve_level(lib_level) or logging.WARNING)

        # Module-specific overrides (user overrides win over defaults)
        if module_levels:
            for module_name, module_level in module_levels.items():
                logging.getLogger(module_name).setLevel(
                    _resolve_level(module_level) or root_level
                )

        self._configured = True
        return created_file

    def configure_from_dict(
        self,
        logging_config: Mapping[str, Any] | None,
        *,
        verbose: bool = False,
        quiet: bool = False,
        output_dir: Path | str | None = None,
        run_id: str | None = None,
    ) -> Path | None:
        """Configure logging from a ``logging:`` config section.

        Args:
            logging_config: Mapping with logging configuration (may be ``None``
                or contain ``enabled: false`` to skip configuration).
            verbose: Override console level to DEBUG.
            quiet: Override console level to ERROR (wins over *verbose*).
            output_dir: Base directory for auto-generated log file paths.
            run_id: Run identifier used for auto-generated log filenames.

        Returns:
            Path to the log file if a file handler was created, else ``None``.
        """
        if not logging_config or not logging_config.get("enabled", True):
            return None

        level = logging_config.get("level", "INFO")
        console_cfg: Mapping[str, Any] = logging_config.get("console", {}) or {}
        file_cfg: Mapping[str, Any] = logging_config.get("file", {}) or {}

        console_enabled = console_cfg.get("enabled", True)
        console_level: str | int | None = (
            console_cfg.get("level", level) if console_enabled else False
        )
        if console_enabled:
            if quiet:
                console_level = "ERROR"
            elif verbose:
                console_level = "DEBUG"

        file_path: Path | None = None
        if file_cfg.get("enabled", False):
            if "path" in file_cfg:
                base_dir = Path(file_cfg.get("path", "./logs/"))
                if not base_dir.is_absolute():
                    base_dir = base_dir.resolve()
            else:
                base_dir = Path(output_dir) / "logs" if output_dir else Path("./logs")
                base_dir = base_dir.resolve()

            filename = file_cfg.get("filename", "heterodyne_analysis.log")
            run_suffix = run_id or datetime.now().strftime("%Y%m%d_%H%M%S")
            if "{run_id}" in filename:
                filename = filename.format(run_id=run_suffix)
            else:
                # No placeholder: append the run suffix before the extension.
                stem = Path(filename).stem or "heterodyne_analysis"
                suffix = Path(filename).suffix or ".log"
                filename = f"{stem}_{run_suffix}{suffix}"
            file_path = base_dir / filename

        return self.configure(
            level=level,
            console_level=console_level,
            console_format=console_cfg.get("format", "detailed"),
            console_colors=bool(console_cfg.get("colors", False)),
            file_path=file_path,
            file_level=file_cfg.get("level", "DEBUG"),
            max_size_mb=int(file_cfg.get("max_size_mb", 10)),
            backup_count=int(file_cfg.get("backup_count", 5)),
            module_levels=logging_config.get("modules"),
            force=True,
        )

    def get_logger(self, name: str) -> logging.Logger:
        """Get or create a logger with hierarchical naming.

        Names that are the package root or dotted children of it are used
        as-is; ``"__main__"`` maps to ``"<root>.main"``; anything else is
        nested under the root so it inherits the managed handlers. Logging is
        configured with defaults on first use if nothing configured it yet.
        """
        root = self._root_logger_name
        # Compare on the dotted boundary: a bare prefix match would treat
        # e.g. "heterodyne_tools" as already belonging to the hierarchy.
        if name == root or name.startswith(f"{root}."):
            full_name = name
        elif name == "__main__":
            full_name = f"{root}.main"
        else:
            full_name = f"{root}.{name}"

        if not self._configured:
            self.configure()

        return logging.getLogger(full_name)
# Global logger manager instance _logger_manager = MinimalLogger() # --------------------------------------------------------------------------- # Public API # ---------------------------------------------------------------------------
def get_logger(
    name: str | None = None,
    *,
    context: Mapping[str, Any] | None = None,
) -> logging.Logger | logging.LoggerAdapter[logging.Logger]:
    """Return a package logger, optionally wrapped with structured context.

    The lookup is delegated to the module's ``MinimalLogger`` instance, so
    existing call sites such as ``get_logger("heterodyne")`` or
    ``get_logger(__name__)`` keep working unchanged.

    Args:
        name: Logger name, typically ``__name__``. When ``None`` the package
            root name ``"heterodyne"`` is used.
        context: Optional mapping of key-value pairs to include as a prefix
            on every log message (e.g. ``{"run_id": "abc", "q_bin": 5}``).
            The mapping is copied.

    Returns:
        A plain :class:`logging.Logger` when *context* is ``None`` or empty,
        otherwise a :class:`_ContextAdapter` that prepends ``[key=value ...]``
        to every message.
    """
    logger_name = "heterodyne" if name is None else name
    resolved = _logger_manager.get_logger(logger_name)
    if not context:
        return resolved
    return _ContextAdapter(resolved, dict(context))
def configure_logging(
    level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO",
    log_file: Path | str | None = None,
    format_string: str | None = None,
) -> None:
    """Set up console (and optionally file) logging for the heterodyne package.

    This is the simple entry point. For rotating files, per-module overrides
    or CLI flag integration use :class:`LogConfiguration` instead.

    All handlers currently attached to the ``"heterodyne"`` logger are closed
    and replaced, and propagation to the root logger is turned off.

    Args:
        level: Logging level.
        log_file: Optional path to log file; parent directories are created
            and the file is opened in append mode as UTF-8.
        format_string: Custom format string, or ``None`` for the default
            detailed format.
    """
    fmt = DEFAULT_FORMAT_DETAILED if format_string is None else format_string

    # Build the replacement handlers first so a failure here (e.g. an
    # unwritable log directory) leaves the existing setup untouched.
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(logging.Formatter(fmt))
    replacement_handlers: list[logging.Handler] = [stream_handler]

    if log_file is not None:
        target = Path(log_file)
        target.parent.mkdir(parents=True, exist_ok=True)
        disk_handler = logging.FileHandler(target, mode="a", encoding="utf-8")
        disk_handler.setFormatter(logging.Formatter(fmt))
        replacement_handlers.append(disk_handler)

    package_logger = logging.getLogger("heterodyne")
    package_logger.setLevel(getattr(logging, level))

    # Swap handlers: close and detach the old ones, then attach the new set.
    for stale in list(package_logger.handlers):
        stale.close()
        package_logger.removeHandler(stale)
    for fresh in replacement_handlers:
        package_logger.addHandler(fresh)

    # Keep records from also reaching the root logger.
    package_logger.propagate = False

    # Mark configured so MinimalLogger does not auto-reconfigure.
    _logger_manager._configured = True
def with_context(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    **context: Any,
) -> logging.LoggerAdapter[logging.Logger]:
    """Create a contextual logger with key-value prefixes.

    Context is rendered as ``[key=value key2=value2] message``. Wrapping an
    existing contextual adapter merges the contexts (new keys override old
    ones) and returns a single flat adapter rather than nesting adapters.
    Any other ``LoggerAdapter`` is unwrapped to its underlying logger.

    Args:
        logger: Base logger or existing contextual adapter to wrap.
        **context: Key-value pairs to include as prefix; ``None`` values
            are dropped.

    Returns:
        A logger adapter that prefixes all messages with context.

    Example:
        >>> logger = get_logger(__name__)
        >>> ctx_logger = with_context(logger, run_id="abc123", mode="two_component")
        >>> ctx_logger.info("Starting analysis")
        # Output: [run_id=abc123 mode=two_component] Starting analysis

        >>> # Nested context
        >>> shard_logger = with_context(ctx_logger, q_bin=5)
        >>> shard_logger.info("Processing bin")
        # Output: [run_id=abc123 mode=two_component q_bin=5] Processing bin
    """
    extras = {key: value for key, value in context.items() if value is not None}

    # Plain logger: wrap it directly.
    if not isinstance(logger, logging.LoggerAdapter):
        return _ContextAdapter(logger, extras)

    # Existing contextual adapter: start from its context, new keys win.
    if isinstance(logger, _ContextAdapter) and logger.extra:
        extras = {**logger.extra, **extras}

    # Any adapter: re-wrap the underlying logger to avoid nested adapters.
    return _ContextAdapter(logger.logger, extras)
@dataclass
class PhaseContext:
    """Timing and memory information handed out by :func:`log_phase`."""

    # Phase label supplied by the caller.
    name: str
    # Elapsed time of the phase; stays 0.0 until it is filled in.
    duration: float = 0.0
    # Memory figures in GB; ``None`` when not tracked or unavailable.
    memory_peak_gb: float | None = None
    memory_delta_gb: float | None = None
def _get_memory_gb() -> float | None: """Get current process memory usage in GB, or None if unavailable.""" try: import resource rusage = resource.getrusage(resource.RUSAGE_SELF) # maxrss is in KB on Linux, bytes on macOS if sys.platform == "darwin": return rusage.ru_maxrss / (1024**3) # bytes to GB return rusage.ru_maxrss / (1024**2) # KB to GB except (ImportError, AttributeError): return None
@contextmanager
def log_phase(
    name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
    track_memory: bool = False,
    threshold_s: float = 0.0,
) -> Generator[PhaseContext]:
    """Context manager for phase-level timing with optional memory tracking.

    A "started" message is emitted only when ``threshold_s <= 0``. On exit a
    "completed" message is logged at *level*, or a "failed" message at
    ``ERROR`` if the body raised; the exception always propagates.

    Args:
        name: Phase name for logging.
        logger: Logger to use. If ``None``, ``get_logger()`` is called with
            no arguments to obtain one.
        level: Log level for the start and completion messages.
        track_memory: Take a memory reading before and after the phase.
        threshold_s: Emit the exit message only if the phase lasted at least
            this many seconds (0 = always log).

    Yields:
        :class:`PhaseContext` with ``name``, ``duration``, ``memory_peak_gb``,
        and ``memory_delta_gb``. Duration and memory values are populated
        after the context exits.

    Example:
        >>> with log_phase("optimization", track_memory=True) as phase:
        ...     result = run_optimization(data)
        >>> print(f"Took {phase.duration:.1f}s")
        # Logs: Phase 'optimization' completed in 45.3s (peak memory: 12.4 GB)
    """
    resolved_logger = get_logger() if logger is None else logger
    context = PhaseContext(name=name)

    memory_start: float | None = _get_memory_gb() if track_memory else None

    if threshold_s <= 0:
        resolved_logger.log(level, "Phase '%s' started", name)

    # Track failure explicitly. Inspecting ``sys.exc_info()`` in ``finally``
    # would also pick up an exception being handled by the *caller* (e.g. a
    # phase run inside an ``except`` block) and misreport success as failure.
    failed = False
    start_time = time.perf_counter()
    try:
        yield context
    except BaseException:
        failed = True
        raise
    finally:
        context.duration = time.perf_counter() - start_time

        if track_memory:
            memory_end = _get_memory_gb()
            if memory_end is not None:
                context.memory_peak_gb = memory_end
                if memory_start is not None:
                    context.memory_delta_gb = memory_end - memory_start

        if context.duration >= threshold_s:
            outcome = "failed" if failed else "completed"
            msg_parts = [f"Phase '{name}' {outcome} in {context.duration:.2f}s"]
            if context.memory_peak_gb is not None:
                msg_parts.append(f"(peak memory: {context.memory_peak_gb:.1f} GB)")
            log_level = logging.ERROR if failed else level
            resolved_logger.log(log_level, " ".join(msg_parts))
def log_exception(
    logger: logging.Logger | logging.LoggerAdapter[logging.Logger],
    exc: BaseException,
    context: dict[str, Any] | None = None,
    level: int = logging.ERROR,
    include_traceback: bool = True,
) -> None:
    """Log an exception together with where it was raised and caller context.

    The emitted message is a single multi-line record:

    * ``Exception in <module>.<function>:<line>: <Type>: <message>`` — the
      location comes from the innermost frame of ``exc.__traceback__`` and
      is omitted when the exception carries no traceback;
    * ``Context: key=value, ...`` with values shown via ``repr`` (only when
      *context* is non-empty);
    * ``Traceback:`` followed by the formatted traceback (only when
      *include_traceback* is true).

    Args:
        logger: Logger to use.
        exc: Exception to log.
        context: Additional context (e.g., parameter values).
        level: Log level (default ERROR).
        include_traceback: Include full traceback (default True).

    Example:
        >>> try:
        ...     result = compute_jacobian(params)
        ... except ValueError as e:
        ...     log_exception(logger, e, context={"iteration": 45})
        ...     raise
    """
    origin = ""
    innermost = exc.__traceback__
    if innermost is not None:
        # Follow the chain to the frame where the exception was raised.
        while innermost.tb_next is not None:
            innermost = innermost.tb_next
        raising_frame = innermost.tb_frame
        module = raising_frame.f_globals.get("__name__", "unknown")
        function = raising_frame.f_code.co_name
        origin = f" in {module}.{function}:{innermost.tb_lineno}"

    lines = [f"Exception{origin}: {type(exc).__name__}: {exc!s}"]

    if context:
        pairs = ", ".join(f"{key}={value!r}" for key, value in context.items())
        lines.append(f"Context: {pairs}")

    if include_traceback:
        rendered = "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
        lines.append(f"Traceback:\n{rendered}")

    logger.log(level, "\n".join(lines))
def log_calls(
    logger: LoggerType | None = None,
    level: int = logging.DEBUG,
    include_args: bool = False,
    include_result: bool = False,
) -> Callable[[F], F]:
    """Decorator to log function calls.

    Entry ("Calling ...") and exit ("Completed ...") records are emitted at
    *level* when that level is enabled on the logger. If the wrapped
    function raises an ``Exception``, a record is logged at ``ERROR`` and the
    exception is re-raised.

    Args:
        logger: Logger to use. If ``None``, one is obtained via
            ``get_logger`` for each decorated function's module.
        level: Logging level to use for entry/exit records.
        include_args: Whether to log function arguments (via ``repr``).
        include_result: Whether to log the function return value
            (via ``repr``).

    Returns:
        Decorator that wraps *func* with entry/exit logging.
    """

    def decorator(func: F) -> F:
        # Resolve the logger per decorated function. Caching it in the
        # enclosing factory scope would make a decorator object that is
        # reused across modules log everything under the first module's name.
        rlog: LoggerType = get_logger(func.__module__) if logger is None else logger
        func_name = f"{func.__module__}.{func.__qualname__}"

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Checked once up front so argument/result reprs are only built
            # when the records would actually be emitted.
            log_enabled = rlog.isEnabledFor(level)

            if log_enabled:
                if include_args:
                    args_str = ", ".join(repr(arg) for arg in args)
                    kwargs_str = ", ".join(f"{k}={v!r}" for k, v in kwargs.items())
                    all_args = ", ".join(filter(None, [args_str, kwargs_str]))
                    rlog.log(level, "Calling %s(%s)", func_name, all_args)
                else:
                    rlog.log(level, "Calling %s", func_name)

            try:
                result = func(*args, **kwargs)
                if log_enabled:
                    if include_result:
                        rlog.log(level, "Completed %s -> %r", func_name, result)
                    else:
                        rlog.log(level, "Completed %s", func_name)
                return result
            except Exception as e:
                rlog.log(logging.ERROR, "Exception in %s: %s", func_name, e)
                raise

        return wrapper  # type: ignore[return-value]

    return decorator
def log_performance(
    logger: LoggerType | None = None,
    level: int = logging.INFO,
    threshold: float = 0.1,
) -> Callable[[F], F]:
    """Decorator to log function performance.

    A record is emitted at *level* only when the function's wall-clock time
    meets or exceeds *threshold* seconds. If the function raises an
    ``Exception``, the elapsed time and the error are logged at ``ERROR``
    regardless of the threshold, and the exception is re-raised.

    Args:
        logger: Logger to use. If ``None``, one is obtained via
            ``get_logger`` for each decorated function's module.
        level: Logging level to use for the timing record.
        threshold: Minimum duration in seconds required to emit a log record.

    Returns:
        Decorator that wraps *func* with performance logging.
    """

    def decorator(func: F) -> F:
        # Resolve the logger per decorated function. Caching it in the
        # enclosing factory scope would make a decorator object that is
        # reused across modules log everything under the first module's name.
        rlog: LoggerType = get_logger(func.__module__) if logger is None else logger
        func_name = f"{func.__module__}.{func.__qualname__}"

        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
                duration = time.perf_counter() - start_time
                if duration >= threshold:
                    rlog.log(
                        level,
                        "Performance: %s completed in %.3fs",
                        func_name,
                        duration,
                    )
                return result
            except Exception as e:
                duration = time.perf_counter() - start_time
                rlog.log(
                    logging.ERROR,
                    "Performance: %s failed after %.3fs: %s",
                    func_name,
                    duration,
                    e,
                )
                raise

        return wrapper  # type: ignore[return-value]

    return decorator
@contextmanager
def log_operation(
    operation_name: str,
    logger: LoggerType | None = None,
    level: int = logging.INFO,
) -> Generator[LoggerType]:
    """Log the start, completion, and duration of a named operation.

    A "Starting operation" record is emitted on entry. On normal exit a
    "Completed operation" record with the elapsed time follows at *level*.
    If the body raises an ``Exception``, a "Failed operation" record is
    logged at ``ERROR`` instead and the exception is re-raised.

    Args:
        operation_name: Human-readable name of the operation.
        logger: Logger to use. If ``None``, ``get_logger()`` is called with
            no arguments to obtain one.
        level: Logging level for the start and completion records.

    Yields:
        The resolved logger, so the ``with`` body can emit further messages
        without holding a separate reference.

    Example:
        >>> with log_operation("jacobian computation") as log:
        ...     J = compute_jacobian(params)
        ...     log.debug("Jacobian shape: %s", J.shape)
        # Logs: Starting operation: jacobian computation
        # Logs: Completed operation: jacobian computation in 0.042s
    """
    active_logger = logger if logger is not None else get_logger()
    active_logger.log(level, "Starting operation: %s", operation_name)
    began = time.perf_counter()

    def elapsed() -> float:
        """Seconds since the operation started."""
        return time.perf_counter() - began

    try:
        yield active_logger
        active_logger.log(
            level, "Completed operation: %s in %.3fs", operation_name, elapsed()
        )
    except Exception as error:
        active_logger.log(
            logging.ERROR,
            "Failed operation: %s after %.3fs: %s",
            operation_name,
            elapsed(),
            error,
        )
        raise
class ConvergenceLogger:
    """Structured logger for optimization convergence diagnostics.

    Per-iteration metrics are logged at ``DEBUG``; convergence summaries and
    pass/fail diagnostics at ``INFO``. The most recent iteration number is
    remembered so the convergence summary can report it.
    """

    def __init__(self, logger: logging.Logger | None = None) -> None:
        """Bind to *logger*, defaulting to the ``heterodyne.optimization`` logger."""
        self.logger = logger or get_logger("heterodyne.optimization")
        self._iteration = 0

    def log_iteration(
        self,
        iteration: int,
        loss: float,
        gradient_norm: float | None = None,
        step_size: float | None = None,
    ) -> None:
        """Log optimization iteration metrics at ``DEBUG``.

        Args:
            iteration: Iteration number; also stored for
                :meth:`log_convergence`.
            loss: Loss value at this iteration.
            gradient_norm: Optional gradient norm to include.
            step_size: Optional step size to include.
        """
        self._iteration = iteration

        # This runs once per optimizer iteration; skip the string formatting
        # entirely when DEBUG records would be discarded anyway.
        if not self.logger.isEnabledFor(logging.DEBUG):
            return

        parts = [f"iter={iteration:4d}", f"loss={loss:.6e}"]
        if gradient_norm is not None:
            parts.append(f"grad_norm={gradient_norm:.3e}")
        if step_size is not None:
            parts.append(f"step={step_size:.3e}")
        self.logger.debug(" | ".join(parts))

    def log_convergence(self, reason: str, final_loss: float) -> None:
        """Log the convergence result, including the last recorded iteration.

        Args:
            reason: Why the optimization stopped.
            final_loss: Loss value at termination.
        """
        self.logger.info(
            "Converged: %s at iteration %d | final_loss=%.6e",
            reason,
            self._iteration,
            final_loss,
        )

    def log_diagnostic(
        self,
        metric_name: str,
        value: float,
        threshold: float,
        higher_is_better: bool = True,
    ) -> None:
        """Log diagnostic metric with pass/fail status.

        Args:
            metric_name: Name of the diagnostic metric.
            value: Observed value.
            threshold: Threshold for pass/fail (the comparison is inclusive).
            higher_is_better: If ``True``, values at or above the threshold
                pass (e.g. ESS). If ``False``, values at or below the
                threshold pass (e.g. R-hat).
        """
        if higher_is_better:
            status = "PASS" if value >= threshold else "WARN"
        else:
            status = "PASS" if value <= threshold else "WARN"
        self.logger.info(
            "[%s] %s: %.4f (threshold: %.4f)", status, metric_name, value, threshold
        )
# Configure default logging on import _logger_manager.configure()