Data

XPCS data loading, runtime validation, preprocessing, quality control, and memory management for large datasets.

Data Loader

XPCS data loading from HDF5, NPZ, NPY, and MAT files.

class heterodyne.data.xpcs_loader.XPCSData[source]

Bases: object

Container for loaded XPCS data.

c2: ndarray
t1: ndarray
t2: ndarray
q: float | None = None
phi_angles: ndarray | None = None
uncertainties: ndarray | None = None
q_values: ndarray | None = None
metadata: dict[str, Any]
property shape: tuple[int, ...]

Shape of correlation data.

property n_times: int

Number of time points.

property has_multi_phi: bool

Whether data has multiple phi angles.

property has_multi_q: bool

Whether data contains multiple q-bins (q_values is set).

__init__(c2, t1, t2, q=None, phi_angles=None, uncertainties=None, q_values=None, metadata=<factory>)
exception heterodyne.data.xpcs_loader.DataValidationError[source]

Bases: ValueError

Raised when loaded XPCS data fails validation checks.

heterodyne.data.xpcs_loader.validate_loaded_data(data)[source]

Validate an XPCSData container and return a list of warning strings.

Performs the following checks:

  1. NaN / Inf - c2, t1, t2 must be finite.

  2. Shape consistency - t1 and t2 lengths must match the time dimensions of c2. If q_values is set, its length must match c2.shape[0].

  3. Symmetry - For 2-D c2 the matrix should be approximately symmetric (max |c2 - c2.T| / max(|c2|) < 1e-6). A warning is issued but no exception is raised.

  4. Positive diagonal - All diagonal elements of c2 (or each slice for 3-D) must be positive.

  5. Time monotonicity - t1 and t2 must be strictly increasing.

Parameters:

data (XPCSData) – Loaded XPCSData to validate.

Return type:

list[str]

Returns:

List of human-readable warning strings. An empty list means all checks passed.

Raises:

DataValidationError – If any hard constraint is violated (NaN/Inf, shape mismatch, non-positive diagonal, non-monotonic time).
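The five checks above can be sketched with plain NumPy as follows. This is a minimal illustration, not the library's implementation: the function name check_correlation is hypothetical, it raises plain ValueError (DataValidationError subclasses ValueError), it assumes the last two axes of c2 are the (t1, t2) time axes, and it omits the q_values length check.

```python
import numpy as np


def check_correlation(c2: np.ndarray, t1: np.ndarray, t2: np.ndarray) -> list[str]:
    """Hypothetical sketch of the five validation checks; returns soft warnings."""
    warnings: list[str] = []

    # 1. NaN / Inf: hard failure.
    for name, arr in (("c2", c2), ("t1", t1), ("t2", t2)):
        if not np.all(np.isfinite(arr)):
            raise ValueError(f"{name} contains NaN or Inf")

    # 2. Shape consistency (assumption: the last two axes of c2 are (t1, t2)).
    if c2.shape[-2:] != (t1.size, t2.size):
        raise ValueError(f"c2 time dims {c2.shape[-2:]} != ({t1.size}, {t2.size})")

    # 3. Symmetry (2-D only): a warning, never an exception.
    if c2.ndim == 2:
        scale = np.max(np.abs(c2))
        if scale > 0 and np.max(np.abs(c2 - c2.T)) / scale >= 1e-6:
            warnings.append("c2 is not symmetric")

    # 4. Positive diagonal: np.diagonal over the last two axes covers 2-D and 3-D.
    if np.any(np.diagonal(c2, axis1=-2, axis2=-1) <= 0):
        raise ValueError("c2 has a non-positive diagonal element")

    # 5. Strictly increasing time axes.
    for name, t in (("t1", t1), ("t2", t2)):
        if np.any(np.diff(t) <= 0):
            raise ValueError(f"{name} is not strictly increasing")

    return warnings
```

Note the split between soft and hard checks: only the symmetry test appends to the returned list; every other failure raises.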

heterodyne.data.xpcs_loader.probe_hdf5_structure(file_path)[source]

Inspect and report the structure of an HDF5 file.

Recursively walks the HDF5 tree and collects dataset names, shapes, dtypes, and top-level attributes. Useful for discovering key names before loading.

Parameters:

file_path (Path | str) – Path to the HDF5 file.

Returns:

Dictionary with the following keys:

  • "datasets": list of dicts, each with "path", "shape", "dtype" for every dataset in the file.

  • "groups": list of str paths for all groups.

  • "root_attrs": dict of attributes on the root group ("/").

  • "n_datasets": total dataset count.

  • "n_groups": total group count.

Return type:

dict[str, Any]

Raises:

heterodyne.utils.path_validation.PathValidationError – If the file does not exist or is not readable.

heterodyne.data.xpcs_loader.load_xpcs_batch(file_paths, c2_key='c2', time_key='t', format=None, use_cache=False, validate=False, apply_diag_correction=False, diag_correction_width=1, diag_correction_method='interpolate', frame_range=None, select_q=None, q_tolerance=None, allow_partial=False)[source]

Load multiple XPCS data files and return them as a list.

Each file is loaded independently using XPCSDataLoader. By default, any per-file failure raises, because silently returning a partial batch can mask configuration errors and corrupted files. Pass allow_partial=True to opt into the legacy skip-on-failure behaviour; failed paths are still logged at ERROR level and omitted from the returned list.

Parameters:
  • file_paths (list[Path | str]) – Sequence of paths to data files.

  • c2_key (str) – Key for correlation data in each file.

  • time_key (str) – Key for time array in each file.

  • format (str | None) – File format (auto-detected per file if None).

  • use_cache (bool) – If True, enable NPZ caching for each file.

  • validate (bool) – If True, run validate_loaded_data() on each result.

  • apply_diag_correction (bool) – If True, apply diagonal correction to each c2.

  • diag_correction_width (int) – Band width passed to diagonal correction.

  • diag_correction_method (str) – Method passed to diagonal correction.

  • frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based inclusive frame indices applied uniformly to every file.

  • select_q (float | None) – Target wavevector in Å⁻¹ applied uniformly to every file.

  • q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹.

  • allow_partial (bool) – If True, individual file failures are logged and skipped instead of raising. Default False — strict batches.

Return type:

list[XPCSData]

Returns:

List of XPCSData objects, one per successfully loaded file.

Raises:

OSError, ValueError, KeyError, RuntimeError – From the first per-file failure when allow_partial=False (default).
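The strict-by-default versus allow_partial behaviour can be illustrated with a small, library-independent sketch. The names load_batch and load_one are hypothetical; load_one stands in for a per-file loader such as XPCSDataLoader(path).load().

```python
import logging
from collections.abc import Callable, Sequence
from typing import TypeVar

T = TypeVar("T")
logger = logging.getLogger(__name__)


def load_batch(
    paths: Sequence[str],
    load_one: Callable[[str], T],
    allow_partial: bool = False,
) -> list[T]:
    """Hypothetical sketch of strict-by-default batch loading."""
    results: list[T] = []
    for path in paths:
        try:
            results.append(load_one(path))
        except (OSError, ValueError, KeyError, RuntimeError):
            if not allow_partial:
                raise  # strict mode: the first failure propagates unchanged
            # partial mode: log at ERROR level and omit the file from the result
            logger.error("Failed to load %s; skipping", path, exc_info=True)
    return results
```

Re-raising with a bare raise keeps the original exception type and traceback, which is what makes the first failing file easy to identify in strict mode.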

class heterodyne.data.xpcs_loader.XPCSDataLoader[source]

Bases: object

Loader for XPCS correlation data from various file formats.

__init__(file_path, format=None)[source]

Initialize loader.

Parameters:
  • file_path (Path | str) – Path to data file.

  • format (str | None) – File format (‘hdf5’, ‘npz’, ‘mat’, ‘npy’), or None to auto-detect from extension.

load(c2_key='c2', time_key='t', q_key='q', phi_key='phi', use_cache=False, frame_range=None, select_q=None, q_tolerance=None, cache_dir=None, cache_template=None, template_vars=None, cache_compression=True)[source]

Load XPCS data from file.

Parameters:
  • c2_key (str) – Key/path for correlation data.

  • time_key (str) – Key/path for time array.

  • q_key (str | None) – Optional key for scalar wavevector.

  • phi_key (str | None) – Optional key for phi angles.

  • use_cache (bool) – If True and the format supports caching (hdf5, mat), attempt to load from an NPZ cache first. The cache is co-located with the source file unless cache_dir is given, and it is invalidated automatically when the source file's mtime changes.

  • frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based indexing (matching homodyne convention). If provided, only frames start through end (inclusive) are retained after loading. Validation is performed before slicing.

  • select_q (float | None) – Target wavevector in Å⁻¹. When set and the loaded data has q_values (multi-q 3-D), the q-bin(s) closest to this value are selected. If q_tolerance is also given, all bins within that tolerance are kept; otherwise only the single nearest bin is kept.

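  • q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹. Ignored when select_q is None.

  • cache_dir (Path | None) – Directory for cache files (None = collocate with source).

  • cache_template (str | None) – Filename template with ${key} placeholders.

  • template_vars (dict[str, str] | None) – Substitution values for the template.

  • cache_compression (bool) – Whether to compress cache files.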

Return type:

XPCSData

Returns:

XPCSData container.
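The 1-based, inclusive frame_range convention maps onto a 0-based, half-open Python slice as slice(start - 1, end). The sketch below shows that conversion; apply_frame_range is a hypothetical helper, and it assumes both time axes share a single time vector t.

```python
import numpy as np


def apply_frame_range(
    c2: np.ndarray, t: np.ndarray, frame_range: tuple[int, int]
) -> tuple[np.ndarray, np.ndarray]:
    """Keep frames start..end (1-based, inclusive) on both time axes."""
    start, end = frame_range
    if not 1 <= start <= end <= t.size:
        raise ValueError(f"invalid frame_range {frame_range} for {t.size} frames")
    # 1-based inclusive [start, end] == 0-based half-open [start - 1, end).
    sel = slice(start - 1, end)
    # The ellipsis leaves any leading batch axis (phi or q) untouched.
    return c2[..., sel, sel], t[sel]
```

For example, frame_range=(3, 5) keeps three frames: 0-based indices 2, 3, and 4.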

heterodyne.data.xpcs_loader.select_optimal_wavevector(q_values, target_q, tolerance=None)[source]

Select q-bin indices closest to a target wavevector.

Parameters:
  • q_values (ndarray) – Available q-values array (Å⁻¹).

  • target_q (float) – Target wavevector in Å⁻¹.

  • tolerance (float | None) – Maximum absolute deviation from target_q in Å⁻¹. If None, only the single closest q-value is selected.

Return type:

tuple[ndarray, ndarray]

Returns:

Tuple of (selected_indices, selected_q_values) where both are 1-D NumPy arrays. selected_indices contains integer indices into q_values; selected_q_values contains the corresponding q-values.

Raises:

ValueError – If q_values is empty.

heterodyne.data.xpcs_loader.load_xpcs_data(file_path, c2_key='c2', time_key='t', format=None, use_cache=False, frame_range=None, select_q=None, q_tolerance=None, cache_dir=None, cache_template=None, template_vars=None, cache_compression=True)[source]

Convenience function to load XPCS data.

Parameters:
  • file_path (Path | str) – Path to data file.

  • c2_key (str) – Key for correlation data.

  • time_key (str) – Key for time array.

  • format (str | None) – File format (auto-detected if None).

  • use_cache (bool) – Enable NPZ caching to avoid re-reading large source files.

  • frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based inclusive frame indices. See XPCSDataLoader.load() for details.

  • select_q (float | None) – Target wavevector in Å⁻¹ for q-bin selection. Applied only when the loaded data has multiple q-bins.

  • q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹. None selects only the single closest bin.

  • cache_dir (Path | None) – Directory for cache files (None = collocate with source).

  • cache_template (str | None) – Filename template with ${key} placeholders.

  • template_vars (dict[str, str] | None) – Substitution values for the template.

  • cache_compression (bool) – Whether to compress cache files.

Return type:

XPCSData

Returns:

XPCSData container.

Validators

Input validation utilities for XPCS data arrays.

Complements the higher-level validation.py module by providing fine-grained, composable checks on individual arrays. Each function returns a list of error strings; an empty list indicates valid input.

heterodyne.data.validators.validate_correlation_shape(c2, expected_shape=None)[source]

Validate shape of a correlation matrix.

Checks that c2 is 2D (single angle) or 3D (multi-angle batch), and optionally matches an expected shape.

Parameters:
  • c2 (ndarray) – Correlation array to validate.

  • expected_shape (tuple[int, ...] | None) – If provided, the exact expected shape.

Return type:

list[str]

Returns:

List of error messages (empty if valid).

heterodyne.data.validators.validate_time_arrays(t1, t2)[source]

Validate time arrays for monotonicity and matching lengths.

Parameters:
  • t1 (ndarray) – First time axis array.

  • t2 (ndarray) – Second time axis array.

Return type:

list[str]

Returns:

List of error messages (empty if valid).

heterodyne.data.validators.validate_q_range(q, q_min, q_max)[source]

Validate that wavevector values fall within the specified range.

Parameters:
  • q (ndarray) – Array of wavevector values.

  • q_min (float) – Minimum allowed wavevector.

  • q_max (float) – Maximum allowed wavevector.

Return type:

list[str]

Returns:

List of error messages (empty if valid).

heterodyne.data.validators.validate_weights(weights, data_shape)[source]

Validate weight array for non-negativity and shape compatibility.

Parameters:
  • weights (ndarray) – Weight array to validate.

  • data_shape (tuple[int, ...]) – Expected shape (must match weights shape).

Return type:

list[str]

Returns:

List of error messages (empty if valid).

heterodyne.data.validators.validate_no_nan(arr, name)[source]

Check that an array contains no NaN or Inf values.

Parameters:
  • arr (ndarray) – Array to check.

  • name (str) – Descriptive name for error messages.

Return type:

list[str]

Returns:

List of error messages (empty if valid).
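Because every validator returns a list of error strings, validators compose by simple list concatenation. The functions below are hypothetical stand-ins that mirror the documented contracts, not the library's code; treating "monotonic" as strictly increasing is an assumption carried over from validate_loaded_data.

```python
import numpy as np


def check_shape(c2: np.ndarray, expected_shape=None) -> list[str]:
    errors: list[str] = []
    if c2.ndim not in (2, 3):
        errors.append(f"c2 must be 2D or 3D, got {c2.ndim}D")
    if expected_shape is not None and c2.shape != tuple(expected_shape):
        errors.append(f"c2 shape {c2.shape} != expected {tuple(expected_shape)}")
    return errors


def check_no_nan(arr: np.ndarray, name: str) -> list[str]:
    n_bad = int(np.count_nonzero(~np.isfinite(arr)))
    return [f"{name} contains {n_bad} NaN/Inf value(s)"] if n_bad else []


def check_time_arrays(t1: np.ndarray, t2: np.ndarray) -> list[str]:
    errors: list[str] = []
    if t1.shape != t2.shape:
        errors.append(f"t1 shape {t1.shape} != t2 shape {t2.shape}")
    for name, t in (("t1", t1), ("t2", t2)):
        if np.any(np.diff(t) <= 0):
            errors.append(f"{name} is not strictly increasing")
    return errors


def check_all(c2: np.ndarray, t1: np.ndarray, t2: np.ndarray) -> list[str]:
    # An empty combined list means every check passed.
    return check_shape(c2) + check_no_nan(c2, "c2") + check_time_arrays(t1, t2)
```

Collecting all messages instead of raising on the first one lets a caller report every problem in a file at once.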

Preprocessing

Preprocessing pipeline for XPCS correlation data.

class heterodyne.data.preprocessing.PreprocessingResult[source]

Bases: object

Result of preprocessing operations.

c2: ndarray
applied_steps: list[str]
statistics: dict[str, float]
__init__(c2, applied_steps=<factory>, statistics=<factory>)
class heterodyne.data.preprocessing.PreprocessingPipeline[source]

Bases: object

Pipeline for preprocessing XPCS correlation data.

Supports operations such as:

  • Baseline subtraction

  • Normalization

  • Outlier removal

  • Smoothing

__init__()[source]

Initialize empty pipeline.

add_step(name, func)[source]

Add a preprocessing step.

Parameters:
  • name (str) – Name of the step.

  • func – Callable applied to the correlation array.

Return type:

PreprocessingPipeline

Returns:

Self for chaining

normalize_diagonal()[source]

Add diagonal normalization step.

Normalizes c2 so that diagonal values are 1.

Return type:

PreprocessingPipeline

subtract_baseline(baseline=1.0)[source]

Add baseline subtraction step.

Parameters:

baseline (float) – Baseline value to subtract

Return type:

PreprocessingPipeline

clip_values(min_val=None, max_val=None)[source]

Add value clipping step.

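Parameters:
  • min_val (float | None) – Lower bound; None leaves values unbounded below.

  • max_val (float | None) – Upper bound; None leaves values unbounded above.

Return type: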

PreprocessingPipeline

remove_outliers(n_sigma=5.0, replace_with='median')[source]

Add outlier removal step.

Parameters:
  • n_sigma (float) – Number of standard deviations for outlier threshold

  • replace_with (str) – Replacement strategy (‘median’, ‘nan’, ‘clip’)

Return type:

PreprocessingPipeline

symmetrize()[source]

Add symmetrization step for correlation matrices.

Makes c2(t1, t2) = c2(t2, t1). Handles both 2D and 3D (batch) data.

Return type:

PreprocessingPipeline
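One common way to enforce c2(t1, t2) = c2(t2, t1) is to average the matrix with its transpose. The sketch below assumes that averaging strategy (the library may instead mirror one triangle) and swaps only the last two axes, so a 3-D batch of shape (n_phi, n_t, n_t) is symmetrized slice by slice.

```python
import numpy as np


def symmetrize(c2: np.ndarray) -> np.ndarray:
    """Sketch: average each (n_t, n_t) slice with its transpose."""
    # swapaxes(-1, -2) transposes only the two time axes, leaving any
    # leading batch axis untouched; the diagonal is preserved exactly.
    return 0.5 * (c2 + np.swapaxes(c2, -1, -2))
```

Using c2.T on a 3-D array would reverse all three axes, which is why the batch-safe swapaxes form is used here.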

crop_time(t_start=0, t_end=None)[source]

Add time cropping step.

Parameters:
  • t_start (int) – Starting index

  • t_end (int | None) – Ending index (exclusive), None for end

Return type:

PreprocessingPipeline

process(c2)[source]

Apply all preprocessing steps.

Parameters:

c2 (ndarray) – Input correlation array

Return type:

PreprocessingResult

Returns:

PreprocessingResult with processed data
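The chainable add_step()/process() pattern can be sketched with a tiny stand-in class. MiniPipeline and MiniResult are hypothetical names; the sketch only shows why returning self enables chaining and how applied_steps accumulates, not how the library implements its built-in steps.

```python
from collections.abc import Callable
from dataclasses import dataclass, field

import numpy as np


@dataclass
class MiniResult:
    c2: np.ndarray
    applied_steps: list[str] = field(default_factory=list)


class MiniPipeline:
    """Hypothetical stand-in illustrating the add_step/process pattern."""

    def __init__(self) -> None:
        self._steps: list[tuple[str, Callable[[np.ndarray], np.ndarray]]] = []

    def add_step(self, name: str, func: Callable[[np.ndarray], np.ndarray]) -> "MiniPipeline":
        self._steps.append((name, func))
        return self  # returning self is what makes chaining possible

    def process(self, c2: np.ndarray) -> MiniResult:
        out = np.array(c2, dtype=float)  # np.array copies, so the input is not mutated
        applied: list[str] = []
        for name, func in self._steps:
            out = func(out)
            applied.append(name)
        return MiniResult(c2=out, applied_steps=applied)
```

Usage: MiniPipeline().add_step("subtract_baseline", lambda a: a - 1.0).add_step("clip_values", lambda a: np.clip(a, 0.0, None)).process(raw) applies the steps in the order they were added.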

process_with_provenance(c2, source_file=None)[source]

Apply all preprocessing steps and record full provenance.

Generates a unique pipeline ID and computes per-step input/output hashes so that every transformation can be audited.

Parameters:
  • c2 (ndarray) – Input correlation array.

  • source_file (str | None) – Optional path to the source data file, stored in the provenance record.

Return type:

tuple[PreprocessingResult, PreprocessingProvenance]

Returns:

A tuple of (PreprocessingResult, PreprocessingProvenance).

heterodyne.data.preprocessing.preprocess_correlation(c2, normalize=True, remove_outliers=True, symmetrize=True)[source]

Convenience function for standard preprocessing.

Parameters:
  • c2 (ndarray) – Input correlation array

  • normalize (bool) – Whether to normalize diagonal to 1

  • remove_outliers (bool) – Whether to remove outliers

  • symmetrize (bool) – Whether to symmetrize

Return type:

PreprocessingResult

Returns:

PreprocessingResult

class heterodyne.data.preprocessing.PreprocessingStage[source]

Bases: Enum

Stages in the preprocessing pipeline.

LOAD_RAW = 'load_raw'
VALIDATE_INPUT = 'validate_input'
NORMALIZE = 'normalize'
CORRECT_BASELINE = 'correct_baseline'
REDUCE_NOISE = 'reduce_noise'
TRANSFORM = 'transform'
VALIDATE_OUTPUT = 'validate_output'
class heterodyne.data.preprocessing.NormalizationMethod[source]

Bases: Enum

Available normalization methods.

DIAGONAL = 'diagonal'
BASELINE = 'baseline'
ZSCORE = 'zscore'
MINMAX = 'minmax'
ROBUST = 'robust'
PHYSICS_BASED = 'physics_based'
NONE = 'none'
class heterodyne.data.preprocessing.NoiseReductionMethod[source]

Bases: Enum

Available noise reduction methods.

MEDIAN_FILTER = 'median_filter'
GAUSSIAN_SMOOTH = 'gaussian_smooth'
WIENER_FILTER = 'wiener_filter'
SAVGOL_FILTER = 'savgol_filter'
WAVELET = 'wavelet'
NONE = 'none'
class heterodyne.data.preprocessing.TransformationRecord[source]

Bases: object

Record of a single preprocessing transformation for provenance tracking.

stage

Which preprocessing stage this belongs to.

method

Name of the method applied.

parameters

Parameters used for the transformation.

timestamp

ISO-format timestamp of when the transformation was applied.

input_hash

SHA-256 hash of the input array.

output_hash

SHA-256 hash of the output array.

stage: PreprocessingStage
method: str
parameters: dict[str, Any]
timestamp: str
input_hash: str
output_hash: str
__init__(stage, method, parameters, timestamp, input_hash, output_hash)
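The input_hash/output_hash fields hold SHA-256 digests of arrays. Exactly which bytes the library hashes is not documented here. The sketch below assumes dtype, shape, and contiguous data bytes are all included, so that arrays with identical bytes but different shapes get different digests. make_record is a hypothetical helper returning a plain dict with the same fields as TransformationRecord.

```python
import hashlib
from datetime import datetime, timezone

import numpy as np


def array_sha256(arr: np.ndarray) -> str:
    """Sketch: SHA-256 over dtype, shape, and C-contiguous data bytes."""
    a = np.ascontiguousarray(arr)
    h = hashlib.sha256()
    h.update(str(a.dtype).encode())
    h.update(str(a.shape).encode())
    h.update(a.tobytes())
    return h.hexdigest()


def make_record(stage: str, method: str, params: dict,
                before: np.ndarray, after: np.ndarray) -> dict:
    # Hypothetical: mirrors the TransformationRecord fields as a dict.
    return {
        "stage": stage,
        "method": method,
        "parameters": params,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "input_hash": array_sha256(before),
        "output_hash": array_sha256(after),
    }
```

Re-running the same step on the same input should reproduce both hashes, which is what makes the trail useful for reproducibility checks.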
class heterodyne.data.preprocessing.PreprocessingProvenance[source]

Bases: object

Audit trail for preprocessing operations.

Tracks every transformation applied to data, including hashes of input/output arrays for reproducibility verification.

records

List of transformation records.

source_file

Path to the source data file, if applicable.

created_at

ISO-format creation timestamp.

pipeline_id

Unique identifier for this pipeline run.

config_hash

Hash of the pipeline configuration for reproducibility.

records: list[TransformationRecord]
source_file: str | None = None
created_at: str
pipeline_id: str = ''
config_hash: str = ''
__post_init__()[source]

Set created_at to current UTC time if not provided.

Return type:

None

add_record(record)[source]

Append a transformation record to the provenance trail.

Parameters:

record (TransformationRecord) – The transformation record to add.

Return type:

None

to_dict()[source]

Serialize provenance to a dictionary.

Return type:

dict[str, Any]

Returns:

Dictionary representation suitable for JSON serialization.

classmethod from_dict(d)[source]

Deserialize provenance from a dictionary.

Parameters:

d (dict[str, Any]) – Dictionary as produced by to_dict.

Return type:

PreprocessingProvenance

Returns:

Reconstructed PreprocessingProvenance instance.

__init__(records=<factory>, source_file=None, created_at=<factory>, pipeline_id='', config_hash='')
heterodyne.data.preprocessing.normalize_zscore(c2)[source]

Z-score normalization: (c2 - mean) / std.

Handles zero standard deviation by returning a zero-centred array.

Parameters:

c2 (ndarray) – Input correlation array.

Return type:

ndarray

Returns:

Z-score normalized array.

heterodyne.data.preprocessing.normalize_minmax(c2)[source]

Min-max normalization: (c2 - min) / (max - min).

Handles degenerate case (max == min) by returning zeros.

Parameters:

c2 (ndarray) – Input correlation array.

Return type:

ndarray

Returns:

Array scaled to [0, 1].

heterodyne.data.preprocessing.normalize_robust(c2)[source]

Robust normalization: (c2 - median) / IQR.

Uses the interquartile range (Q3 - Q1) as the scale factor. Handles IQR == 0 by returning (c2 - median).

Parameters:

c2 (ndarray) – Input correlation array.

Return type:

ndarray

Returns:

Robustly normalized array.
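The three normalizations above, including their degenerate-case handling, can be sketched in a few lines. These are illustrative re-implementations that compute statistics over the whole array; whether the library computes them per slice for 3-D input is not stated and is not assumed here.

```python
import numpy as np


def zscore(c2: np.ndarray) -> np.ndarray:
    centred = c2 - c2.mean()
    std = c2.std()
    return centred / std if std > 0 else centred  # zero std: zero-centred array


def minmax(c2: np.ndarray) -> np.ndarray:
    lo, hi = c2.min(), c2.max()
    if hi > lo:
        return (c2 - lo) / (hi - lo)
    return np.zeros_like(c2, dtype=float)  # max == min: all zeros


def robust(c2: np.ndarray) -> np.ndarray:
    q1, med, q3 = np.percentile(c2, [25, 50, 75])
    iqr = q3 - q1
    centred = c2 - med
    return centred / iqr if iqr > 0 else centred  # IQR == 0: (c2 - median)
```

For x = [1, 2, 3, 4, 5] the median is 3 and the IQR is 4 - 2 = 2, so robust(x) = [-1, -0.5, 0, 0.5, 1].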

heterodyne.data.preprocessing.apply_baseline_correction(c2, baseline=None, method='subtract')[source]

Apply baseline correction to a correlation matrix.

Parameters:
  • c2 (ndarray) – Input correlation array (2D or 3D).

  • baseline (ndarray | float | None) – Baseline value(s). If None, the baseline is estimated from the last 10% of off-diagonal elements (far from the diagonal, corresponding to long time delays).

  • method (str) – Correction strategy — "subtract", "divide", or "polynomial".

Return type:

ndarray

Returns:

Baseline-corrected array.

Raises:

ValueError – If method is not one of the supported strategies.
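A sketch of baseline estimation from the long-lag region, under an explicit interpretation: "the last 10% of off-diagonal elements" is read here as the 10% of strictly upper-triangular elements with the largest lag |i - j|. The helpers are hypothetical, handle 2-D input only, and cover the "subtract" and "divide" strategies but not "polynomial".

```python
import numpy as np


def estimate_baseline(c2: np.ndarray, fraction: float = 0.1) -> float:
    """Mean of the `fraction` of off-diagonal elements with the longest lags (2-D only)."""
    i, j = np.triu_indices(c2.shape[-1], k=1)  # strictly upper triangle
    lag = j - i
    n_keep = max(1, int(fraction * lag.size))
    # Stable argsort on -lag puts the longest lags first.
    far = np.argsort(-lag, kind="stable")[:n_keep]
    return float(np.mean(c2[i[far], j[far]]))


def baseline_correct(c2: np.ndarray, baseline=None, method: str = "subtract") -> np.ndarray:
    b = estimate_baseline(c2) if baseline is None else baseline
    if method == "subtract":
        return c2 - b
    if method == "divide":
        return c2 / b
    raise ValueError(f"unsupported method in this sketch: {method!r}")
```

The long-lag region is used because, for a decorrelating sample, c2 approaches its baseline as the delay grows; for a 10 × 10 matrix the sketch averages the 4 elements with lags 9, 8, 8, and 7.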

heterodyne.data.preprocessing.apply_noise_reduction(c2, method=NoiseReductionMethod.GAUSSIAN_SMOOTH, **kwargs)[source]

Apply noise reduction to a correlation matrix.

Parameters:
  • c2 (ndarray) – Input correlation array (2D or 3D).

  • method (NoiseReductionMethod) – Noise reduction method to apply.

  • **kwargs (Any) – Method-specific parameters:

      - kernel_size (int): Kernel size for median filter (default 3).

      - sigma (float): Standard deviation for Gaussian smooth (default 1.0).

Return type:

ndarray

Returns:

Noise-reduced array.

heterodyne.data.preprocessing.process_chunked(c2, pipeline, chunk_size=100)[source]

Process data in chunks along the first (batch) axis.

For 3D data (n_phi, n_t, n_t), slices are processed in groups of chunk_size along axis 0 and concatenated. For 2D data the pipeline is applied directly.

Steps that use array-wide statistics (outlier removal, baseline subtraction, z-score / min-max / robust normalization) are rejected with ValueError when chunking would be required — per-chunk statistics would produce mismatched scales between chunks. Run the pipeline whole or remove those steps before chunking.

Parameters:
  • c2 (ndarray) – Input correlation array (2D or 3D).

  • pipeline (PreprocessingPipeline) – Configured PreprocessingPipeline.

  • chunk_size (int) – Number of slices per chunk (along axis 0).

Return type:

PreprocessingResult

Returns:

Combined PreprocessingResult.

Raises:

ValueError – If the pipeline contains steps requiring global statistics and chunking would actually be applied (i.e. c2.ndim == 3 and n_phi > chunk_size).
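The chunking rule can be sketched as follows: chunking is only needed for 3-D input whose batch axis exceeds chunk_size, and only in that case are global-statistics steps rejected. The step names in GLOBAL_STAT_STEPS and the (name, func) step representation are hypothetical stand-ins for the pipeline's internals.

```python
import numpy as np

# Hypothetical names for steps that depend on whole-array statistics.
GLOBAL_STAT_STEPS = {"remove_outliers", "subtract_baseline", "zscore", "minmax", "robust"}


def process_in_chunks(c2: np.ndarray, steps, chunk_size: int = 100) -> np.ndarray:
    """Sketch: `steps` is a list of (name, func) pairs applied in order."""
    if c2.ndim == 2 or c2.shape[0] <= chunk_size:
        chunks = [c2]  # no chunking needed: one pass over the whole array
    else:
        bad = [name for name, _ in steps if name in GLOBAL_STAT_STEPS]
        if bad:
            raise ValueError(f"steps {bad} need global statistics; cannot chunk")
        chunks = [c2[s:s + chunk_size] for s in range(0, c2.shape[0], chunk_size)]

    out = []
    for chunk in chunks:
        for _, func in steps:
            chunk = func(chunk)
        out.append(chunk)
    # Reassemble along the batch axis (axis 0).
    return np.concatenate(out, axis=0) if len(out) > 1 else out[0]
```

With 250 phi slices and chunk_size=100 the data is processed as chunks of 100, 100, and 50 slices; a per-slice operation such as symmetrization gives the same result as a single pass.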

heterodyne.data.preprocessing.preprocess_xpcs_data(c2, normalize_method=NormalizationMethod.DIAGONAL, noise_reduction=NoiseReductionMethod.NONE, remove_outliers=False, symmetrize=True, baseline_correction=False, **kwargs)[source]

Convenience function for comprehensive XPCS preprocessing.

Builds a PreprocessingPipeline with the requested steps and applies it to c2. The processing order is:

  1. Outlier removal (optional)

  2. Symmetrization (optional)

  3. Baseline correction (optional)

  4. Normalization (configurable method)

  5. Noise reduction (configurable method)

Parameters:
  • c2 (ndarray) – Input correlation array.

  • normalize_method (NormalizationMethod) – Which normalization to apply.

  • noise_reduction (NoiseReductionMethod) – Which noise reduction to apply.

  • remove_outliers (bool) – Whether to remove outliers before other steps.

  • symmetrize (bool) – Whether to symmetrize the correlation matrix.

  • baseline_correction (bool) – Whether to apply baseline correction.

  • **kwargs (Any) – Extra keyword arguments forwarded to noise reduction (e.g. kernel_size, sigma).

Return type:

PreprocessingResult

Returns:

PreprocessingResult with processed data, step list, and summary statistics.

Quality Controller

Comprehensive data quality assessment for XPCS correlation data.

class heterodyne.data.quality_controller.QualityLevel[source]

Bases: Enum

Quality classification levels, ordered from best to worst.

GOOD = 'good'
ACCEPTABLE = 'acceptable'
WARNING = 'warning'
CRITICAL = 'critical'
class heterodyne.data.quality_controller.QualityMetric[source]

Bases: object

A single quality assessment metric.

name

Human-readable metric name.

value

Measured value of the metric.

threshold

Threshold used for classification.

level

Quality level assigned to this metric.

message

Descriptive message explaining the assessment.

name: str
value: float
threshold: float
level: QualityLevel
message: str
__init__(name, value, threshold, level, message)
class heterodyne.data.quality_controller.QualityReport[source]

Bases: object

Aggregated quality report from multiple metrics.

metrics

Individual quality metrics.

overall_level

Worst-case level across all metrics.

recommendations

Actionable suggestions for improving data quality.

metrics: list[QualityMetric]
overall_level: QualityLevel = QualityLevel.GOOD
recommendations: list[str]
summary()[source]

Generate a human-readable quality report.

Return type:

str

Returns:

Multi-line summary string.

__init__(metrics=<factory>, overall_level=QualityLevel.GOOD, recommendations=<factory>)
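The worst-case aggregation behind overall_level can be sketched by ranking levels in their declaration order (best to worst) and taking the maximum. Level, Metric, and overall_level below are hypothetical stand-ins for QualityLevel, QualityMetric, and the report's aggregation logic.

```python
from dataclasses import dataclass
from enum import Enum


class Level(Enum):
    GOOD = "good"
    ACCEPTABLE = "acceptable"
    WARNING = "warning"
    CRITICAL = "critical"


# Enum iteration follows declaration order: GOOD (rank 0) ... CRITICAL (rank 3).
_RANK = {level: rank for rank, level in enumerate(Level)}


@dataclass
class Metric:
    name: str
    value: float
    threshold: float
    level: Level
    message: str


def overall_level(metrics: list[Metric]) -> Level:
    if not metrics:
        return Level.GOOD  # same as the report's default
    return max((m.level for m in metrics), key=_RANK.__getitem__)
```

An explicit rank table is needed because Enum members with string values do not support < or > comparisons.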
class heterodyne.data.quality_controller.QualityController[source]

Bases: object

Assess quality of XPCS correlation data.

Runs a battery of checks and produces a QualityReport.

Parameters:

config (DataConfig | None) – Optional data configuration; thresholds may be drawn from it in the future.

__init__(config=None)[source]
assess(c2, t, q=None, phi_angles=None)[source]

Run all quality checks and produce a report.

Parameters:
  • c2 (ndarray) – Correlation data, shape (n_t, n_t) or (n_phi, n_t, n_t).

  • t (ndarray) – 1D time array.

  • q (ndarray | None) – Optional wavevector array.

  • phi_angles (ndarray | None) – Optional phi angle array.

Return type:

QualityReport

Returns:

QualityReport with per-metric assessments and recommendations.

class heterodyne.data.quality_controller.QualityControlStage[source]

Bases: Enum

Processing stage at which quality assessment is performed.

RAW = 'raw'
FILTERED = 'filtered'
PREPROCESSED = 'preprocessed'
FINAL = 'final'
class heterodyne.data.quality_controller.QualityControlConfig[source]

Bases: object

Configuration for the 4-stage quality control pipeline.

nan_threshold

Maximum acceptable NaN fraction (0-1).

snr_threshold

Minimum acceptable signal-to-noise ratio.

symmetry_threshold

Maximum acceptable relative asymmetry.

value_range_max

Maximum acceptable absolute value.

min_time_points

Minimum required number of time points.

auto_repair_nans

Whether to automatically interpolate NaN values.

auto_repair_outliers

Whether to automatically clip outlier values.

outlier_sigma

Number of standard deviations beyond which values are considered outliers (used when auto_repair_outliers is True).

report_format

Output format for reports, "text" or "json".

nan_threshold: float = 0.05
snr_threshold: float = 5.0
symmetry_threshold: float = 0.01
value_range_max: float = 100.0
min_time_points: int = 10
auto_repair_nans: bool = False
auto_repair_outliers: bool = False
outlier_sigma: float = 5.0
report_format: str = 'text'
__init__(nan_threshold=0.05, snr_threshold=5.0, symmetry_threshold=0.01, value_range_max=100.0, min_time_points=10, auto_repair_nans=False, auto_repair_outliers=False, outlier_sigma=5.0, report_format='text')
class heterodyne.data.quality_controller.QualityControlResult[source]

Bases: object

Result of a single-stage quality assessment.

stage

The pipeline stage this result corresponds to.

report

The underlying QualityReport produced by the controller.

auto_corrections

Descriptions of any automatic corrections applied.

recommendations

Actionable suggestions derived from the assessment.

quality_score

Aggregate quality score in [0, 1] (1 = perfect).

metadata

Arbitrary key-value metadata for downstream consumers.

stage: QualityControlStage
report: QualityReport
auto_corrections: list[str]
recommendations: list[str]
quality_score: float = 1.0
metadata: dict[str, Any]
__init__(stage, report, auto_corrections=<factory>, recommendations=<factory>, quality_score=1.0, metadata=<factory>)
heterodyne.data.quality_controller.assess_stage(controller, c2, t, stage, config=None)[source]

Run a quality assessment for a specific pipeline stage.

Parameters:
  • controller (QualityController) – An existing QualityController instance.

  • c2 (ndarray) – Correlation data array.

  • t (ndarray) – 1-D time array.

  • stage (QualityControlStage) – Pipeline stage being assessed.

  • config (QualityControlConfig | None) – Optional configuration; defaults are used when None.

Return type:

QualityControlResult

Returns:

A QualityControlResult with score and recommendations.

heterodyne.data.quality_controller.suggest_fixes(report)[source]

Analyse a QualityReport and suggest concrete corrective actions.

Parameters:

report (QualityReport) – Quality report to analyse.

Return type:

list[dict[str, Any]]

Returns:

List of fix suggestion dicts, each with action, description, and priority keys.

heterodyne.data.quality_controller.apply_auto_corrections(c2, t, report, config)[source]

Apply automatic corrections to data based on quality report.

The input arrays are not mutated; a corrected copy of c2 is returned.

Parameters:
  • c2 (ndarray) – Correlation data array.

  • t (ndarray) – 1-D time array.

  • report (QualityReport) – Quality report from a prior assessment.

  • config (QualityControlConfig) – Configuration controlling which corrections are enabled.

Return type:

tuple[ndarray, list[str]]

Returns:

Tuple of (corrected_c2, list_of_correction_descriptions).
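The copy-then-correct contract can be illustrated with a sketch of the two documented repairs, NaN interpolation and outlier clipping. The correction details are assumptions, not the library's rules: NaNs are filled by linear interpolation along the last (time) axis using t as the x coordinate, and outliers are clipped to mean ± outlier_sigma × std of the whole array.

```python
import numpy as np


def auto_correct(c2, t, repair_nans=True, repair_outliers=True, outlier_sigma=5.0):
    """Sketch: returns (corrected copy, list of correction descriptions)."""
    fixed = np.array(c2, dtype=float)  # contiguous copy; the input is never mutated
    notes: list[str] = []

    if repair_nans:
        n_fixed = 0
        for row in fixed.reshape(-1, fixed.shape[-1]):  # rows are views into `fixed`
            bad = np.isnan(row)
            if bad.any() and not bad.all():
                row[bad] = np.interp(t[bad], t[~bad], row[~bad])
                n_fixed += int(bad.sum())
        if n_fixed:
            notes.append(f"interpolated {n_fixed} NaN value(s)")

    if repair_outliers:
        mu, sd = np.nanmean(fixed), np.nanstd(fixed)
        lo, hi = mu - outlier_sigma * sd, mu + outlier_sigma * sd
        n_out = int(np.count_nonzero((fixed < lo) | (fixed > hi)))
        if n_out:
            np.clip(fixed, lo, hi, out=fixed)
            notes.append(f"clipped {n_out} outlier(s) beyond {outlier_sigma} sigma")

    return fixed, notes
```

A caveat on the sigma rule: a single extreme value inflates the standard deviation it is measured against. Among n values its z-score can be at most (n - 1) / √n, which is 4.8 for a 5 × 5 matrix, so a 5-sigma threshold can never flag it there.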

heterodyne.data.quality_controller.export_report(result, format='text')[source]

Export a QualityControlResult in the requested format.

Parameters:
  • result (QualityControlResult) – Quality control result to export.

  • format (str) – "text" for a human-readable string, "json" for a JSON-safe dictionary.

Return type:

str | dict[str, Any]

Returns:

Formatted report as a string or dict.

Raises:

ValueError – If format is not "text" or "json".

heterodyne.data.quality_controller.track_quality_history(history, new_result)[source]

Append a new result to the quality history and log the trend.

Parameters:
  • history (list[QualityControlResult]) – Existing list of results (may be empty).

  • new_result (QualityControlResult) – The latest quality control result.

Return type:

list[QualityControlResult]

Returns:

The updated history list (same object, mutated in place).

heterodyne.data.quality_controller.run_4_stage_pipeline(controller, c2, t, config=None)[source]

Execute the full 4-stage quality control pipeline.

Stages are run in order: RAW -> FILTERED -> PREPROCESSED -> FINAL. Between stages, automatic corrections are applied when enabled in config. Adaptive thresholds are computed once from the raw data.

Parameters:
  • controller (QualityController) – A QualityController instance.

  • c2 (ndarray) – Correlation data array.

  • t (ndarray) – 1-D time array.

  • config (QualityControlConfig | None) – Pipeline configuration; defaults are used when None.

Return type:

list[QualityControlResult]

Returns:

List of four QualityControlResult objects, one per stage.

Memory Manager

Memory budget tracking for large XPCS datasets.

Provides allocation tracking, budget enforcement, and chunk-size suggestions so that downstream code can stay within a configurable memory envelope.

class heterodyne.data.memory_manager.MemoryBudget[source]

Bases: object

Snapshot of memory budget state.

total_bytes

Total budget available.

allocated_bytes

Currently tracked allocations.

peak_bytes

Highest allocated_bytes observed.

total_bytes: int
allocated_bytes: int
peak_bytes: int
__init__(total_bytes, allocated_bytes, peak_bytes)
class heterodyne.data.memory_manager.MemoryManager[source]

Bases: object

Track memory allocations against a configurable budget.

When budget_bytes is None the manager auto-detects available system memory via psutil.virtual_memory(), falling back to 8 GB if psutil is not installed.

All public methods are thread-safe.

Parameters:

budget_bytes (int | None) – Explicit budget in bytes, or None for auto-detect.

__init__(budget_bytes=None)[source]
request(n_bytes, label)[source]

Request an allocation of n_bytes tracked under label.

Any existing allocation tracked under the same label is released first, so repeated requests with one label replace rather than accumulate (idempotent re-allocation). If the new allocation would then exceed the budget, the request is denied and False is returned.

Parameters:
  • n_bytes (int) – Number of bytes to allocate.

  • label (str) – Human-readable label for the allocation.

Return type:

bool

Returns:

True if the allocation fits within the budget.

release(label)[source]

Release a tracked allocation.

No-op if the label is not currently tracked.

Parameters:

label (str) – The label passed to request().

Return type:

None
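The request()/release() semantics can be sketched as a small lock-protected tracker. BudgetTracker is a hypothetical class, not the library's MemoryManager. It follows the documented order, releasing the old allocation under a label before checking the budget; it does not restore that allocation when the new request is denied, because the documentation does not say either way.

```python
import threading


class BudgetTracker:
    """Hypothetical sketch of label-based budget tracking."""

    def __init__(self, budget_bytes: int) -> None:
        self._budget = budget_bytes
        self._allocs: dict[str, int] = {}
        self._peak = 0
        self._lock = threading.Lock()  # every public method takes this lock

    def request(self, n_bytes: int, label: str) -> bool:
        with self._lock:
            self._allocs.pop(label, None)  # release any allocation under this label first
            in_use = sum(self._allocs.values())
            if in_use + n_bytes > self._budget:
                return False
            self._allocs[label] = n_bytes
            self._peak = max(self._peak, in_use + n_bytes)
            return True

    def release(self, label: str) -> None:
        with self._lock:
            self._allocs.pop(label, None)  # no-op for unknown labels

    @property
    def allocated(self) -> int:
        with self._lock:
            return sum(self._allocs.values())

    @property
    def peak(self) -> int:
        with self._lock:
            return self._peak
```

Re-requesting a label therefore replaces its size rather than adding to it, which is what makes repeated requests idempotent.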

static estimate_array_size(shape, dtype=np.float64)[source]

Estimate the memory footprint of an array.

Parameters:
  • shape (tuple[int, ...]) – Array dimensions.

  • dtype (dtype | type) – NumPy dtype (or type convertible to one).

Return type:

int

Returns:

Size in bytes.

get_budget()[source]

Return a snapshot of the current budget state.

Return type:

MemoryBudget

Returns:

MemoryBudget dataclass.

suggest_chunk_size(total_elements, element_bytes)[source]

Suggest an optimal chunk size that fits within available budget.

The returned chunk size will use at most half of the remaining budget so that headroom is preserved for intermediate buffers.

Parameters:
  • total_elements (int) – Total number of elements to process.

  • element_bytes (int) – Bytes per element.

Return type:

int

Returns:

Suggested number of elements per chunk (always >= 1).
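The size estimate is the element count times the dtype's item size. For example, a (23, 1000, 1000) float64 c2 array needs 23 × 10⁶ × 8 = 184,000,000 bytes. The chunk rule ("at most half of the remaining budget, never below 1") can be sketched as below. The remaining_bytes argument is hypothetical, since the real method reads the remaining budget from the manager's own state. Capping the chunk at total_elements is also an assumption of this sketch.

```python
import numpy as np


def estimate_array_size(shape, dtype=np.float64) -> int:
    # Element count times bytes per element.
    return int(np.prod(shape, dtype=np.int64)) * np.dtype(dtype).itemsize


def suggest_chunk_size(total_elements: int, element_bytes: int, remaining_bytes: int) -> int:
    # Spend at most half of what is left, keeping the rest as headroom.
    usable = remaining_bytes // 2
    chunk = usable // max(element_bytes, 1)
    return int(max(1, min(chunk, total_elements)))
```

With 1.6 MB remaining and 8-byte elements, half the budget (800,000 bytes) holds 100,000 elements per chunk.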

class heterodyne.data.memory_manager.MemoryMapManager[source]

Bases: object

Manage memory-mapped access to large HDF5 datasets.

For datasets that exceed available RAM, this provides read-only, memory-mapped-like access through h5py's on-demand chunk reading, avoiding full materialization of arrays in memory.

Parameters:

max_resident_bytes (int) – Maximum bytes to keep resident in memory at any time. Defaults to 2 GB.

__init__(max_resident_bytes=2 * 1024 * 1024 * 1024)[source]
open_dataset(file_path, dataset_path)[source]

Open an HDF5 dataset for memory-mapped-like access.

Returns the raw h5py.Dataset proxy — supports NumPy-style slicing (ds[:, 100:200]) and reads chunks on demand without materializing the full array. Use read_slice() for explicit partial reads or materialize() when the caller truly needs an in-memory ndarray.

Parameters:
  • file_path (Path | str) – Path to HDF5 file.

  • dataset_path (str) – Internal HDF5 dataset path (e.g., “/exchange/C2T_all/c2_00001”).

Return type:

Any

Returns:

h5py.Dataset proxy with lazy chunk access.

Raises:

MemoryError – If the dataset size exceeds max_resident_bytes and the caller would have to materialize it — guard against accidental OOM by calling read_slice() instead.

materialize(file_path, dataset_path)[source]

Eagerly load an HDF5 dataset into memory as an ndarray.

Raises MemoryError if the estimated size exceeds max_resident_bytes. Callers must either explicitly increase max_resident_bytes or use read_slice() for partial loads.

Return type:

ndarray
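The guard described for materialize() can be illustrated as a pre-flight check that compares the estimated size against the limit before any data is read. This is a hypothetical sketch. `check_can_materialize` is not part of the library, and the strict "greater than" comparison is an assumption.

```python
import math

import numpy as np

DEFAULT_MAX_RESIDENT = 2 * 1024**3  # 2 GiB, the documented default


def check_can_materialize(shape, dtype, max_resident_bytes=DEFAULT_MAX_RESIDENT) -> int:
    """Hypothetical pre-flight check mirroring the documented MemoryError rule."""
    nbytes = math.prod(shape) * np.dtype(dtype).itemsize
    if nbytes > max_resident_bytes:
        raise MemoryError(
            f"dataset needs {nbytes} bytes, limit is {max_resident_bytes}; "
            "raise max_resident_bytes or read a slice instead"
        )
    return nbytes


check_can_materialize((10_000, 10_000), np.float64)  # 800 MB: fits under 2 GiB
try:
    check_can_materialize((20_000, 20_000), np.float64)  # 3.2 GB: over the limit
except MemoryError as exc:
    print("refused:", exc)
```

Checking the size from shape and dtype metadata first means the failure happens before any I/O, rather than partway through a large read.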

read_slice(file_path, dataset_path, slices)[source]

Read a specific slice from an HDF5 dataset without loading the full array.

Parameters:
  • file_path (Path | str) – Path to HDF5 file.

  • dataset_path (str) – Internal HDF5 dataset path.

  • slices (tuple[slice, ...]) – Tuple of slice objects defining the region to read.

Return type:

ndarray

Returns:

NumPy array of the requested slice.
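The `slices` argument is a tuple with one `slice` per axis, using ordinary NumPy basic-slicing semantics. h5py.Dataset objects accept the same tuple-of-slices indexing and read only the selected region from disk. The sketch below uses an in-memory NumPy array as a stand-in for the dataset. It assumes that read_slice() forwards the tuple unchanged.

```python
import numpy as np

# Stand-in for an HDF5 dataset: a 3-D stack of 4 correlation matrices (100 x 100).
stack = np.arange(4 * 100 * 100, dtype=np.float64).reshape(4, 100, 100)

# Region: second matrix only, rows 0-9, columns 20-49 (stop indices are exclusive).
slices = (slice(1, 2), slice(0, 10), slice(20, 50))
block = stack[slices]
print(block.shape)  # (1, 10, 30)
```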

estimate_dataset_size(file_path, dataset_path)[source]

Estimate the in-memory size of an HDF5 dataset without loading it.

Parameters:
  • file_path (Path | str) – Path to HDF5 file.

  • dataset_path (str) – Internal HDF5 dataset path.

Return type:

int

Returns:

Estimated size in bytes.

close_all()[source]

Close all open HDF5 file handles.

Return type:

None

__enter__()[source]

Support use as a context manager.

Return type:

MemoryMapManager

__exit__(*args)[source]

Close all handles on context exit.

Return type:

None
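close_all() and the context-manager methods suggest that the manager keeps file handles open between calls and closes them together. The sketch below illustrates that pattern under this assumption. `HandleCache` and `_FakeHandle` are hypothetical names. The fake handle stands in for an h5py.File so that the example runs without HDF5 files.

```python
from typing import Any


class _FakeHandle:
    """Stand-in for an h5py.File handle; records whether close() was called."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.closed = False

    def close(self) -> None:
        self.closed = True


class HandleCache:
    """Hypothetical sketch of the open / close_all / context-manager pattern."""

    def __init__(self, opener=_FakeHandle) -> None:
        self._opener = opener
        self._handles: dict[str, Any] = {}

    def get(self, path: str) -> Any:
        # Reuse one handle per file instead of reopening it on every read.
        if path not in self._handles:
            self._handles[path] = self._opener(path)
        return self._handles[path]

    def close_all(self) -> None:
        for handle in self._handles.values():
            handle.close()
        self._handles.clear()

    def __enter__(self) -> "HandleCache":
        return self

    def __exit__(self, *args) -> None:
        # Runs even if the with-body raised; returning None lets exceptions propagate.
        self.close_all()


with HandleCache() as cache:
    h = cache.get("run_001.h5")
    assert cache.get("run_001.h5") is h  # cached, not reopened
print(h.closed)  # True
```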

class heterodyne.data.memory_manager.ChunkInfo[source]

Bases: object

Metadata for a single processing chunk.

start: int

Start index along the batch axis.

end: int

End index (exclusive) along the batch axis.

size_bytes: int

Estimated memory footprint of this chunk, in bytes.

priority: int = 0

Processing priority (lower value = higher priority).
__init__(start, end, size_bytes, priority=0)
class heterodyne.data.memory_manager.AdaptiveChunker[source]

Bases: object

Compute chunk sizes that adapt to available memory and data characteristics.

Unlike fixed chunking, this class monitors memory pressure and adjusts chunk sizes dynamically. When prioritize_near_diagonal is enabled in compute_chunks(), chunks near the diagonal of correlation matrices (small time lags) are given higher priority, since they carry more signal.

Parameters:
  • memory_manager (MemoryManager) – MemoryManager instance for budget awareness.

  • safety_factor (float) – Fraction of available memory to actually use (default 0.5).

__init__(memory_manager, safety_factor=0.5)[source]
compute_chunks(total_elements, element_bytes, prioritize_near_diagonal=False)[source]

Compute adaptive chunk boundaries.

Parameters:
  • total_elements (int) – Total number of elements along the batch axis.

  • element_bytes (int) – Memory per element in bytes.

  • prioritize_near_diagonal (bool) – If True, assign lower priority numbers (= higher priority) to chunks covering small indices.

Return type:

list[ChunkInfo]

Returns:

List of ChunkInfo objects defining the chunking strategy.
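The sketch below shows the shape of compute_chunks() output. It is a hypothetical simplification, not the library's algorithm. `available_bytes` stands in for the MemoryManager budget query. Unlike the documented class, which adapts to memory pressure, this version sizes every chunk from a single memory snapshot. The `priority = chunk index` rule is an assumed reading of "lower priority numbers for small indices".

```python
from dataclasses import dataclass


@dataclass
class ChunkInfo:
    # Mirrors the documented fields of heterodyne.data.memory_manager.ChunkInfo.
    start: int
    end: int
    size_bytes: int
    priority: int = 0


def compute_chunks_sketch(
    total_elements: int,
    element_bytes: int,
    available_bytes: int,
    safety_factor: float = 0.5,
    prioritize_near_diagonal: bool = False,
) -> list[ChunkInfo]:
    """Hypothetical sketch: equal-size chunks sized from one memory snapshot."""
    usable = int(available_bytes * safety_factor)
    per_chunk = max(1, usable // element_bytes)
    chunks = []
    for index, start in enumerate(range(0, total_elements, per_chunk)):
        end = min(start + per_chunk, total_elements)  # last chunk may be shorter
        chunks.append(
            ChunkInfo(
                start=start,
                end=end,
                size_bytes=(end - start) * element_bytes,
                # Small-index (near-diagonal) chunks get lower numbers = higher priority.
                priority=index if prioritize_near_diagonal else 0,
            )
        )
    return chunks


for c in compute_chunks_sketch(10, 8, available_bytes=64, prioritize_near_diagonal=True):
    print(c)
```

In this run, 64 bytes times a safety factor of 0.5 leaves 32 bytes, or 4 elements of 8 bytes each. The result is three chunks: [0, 4), [4, 8), and a shorter [8, 10).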

class heterodyne.data.memory_manager.MemoryPressureLevel[source]

Bases: Enum

System memory pressure classification.

LOW = 'low'
MODERATE = 'moderate'
HIGH = 'high'
CRITICAL = 'critical'
class heterodyne.data.memory_manager.MemoryPressureMonitor[source]

Bases: object

Monitor system memory pressure and trigger adaptive responses.

Polls system memory usage via psutil (with graceful fallback) and classifies pressure into levels that downstream code can use to adjust batch sizes, enable compression, or skip optional caching.

Parameters:

poll_interval_seconds (float) – Minimum seconds between actual system polls (cached between polls). Defaults to 5.0.

__init__(poll_interval_seconds=5.0)[source]
current_pressure()[source]

Return the current memory pressure level.

Uses cached value if polled recently (within poll_interval_seconds).

Return type:

MemoryPressureLevel

Returns:

Current MemoryPressureLevel.
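The caching behaviour of current_pressure() is sketched below. The system is polled again only when the cached value is older than poll_interval_seconds. The source does not document the psutil thresholds that map usage to a level, so the classifier here is injected as a `poll` callable. `CachedPressure` is a hypothetical name. The injectable clock exists only so the example can be run deterministically.

```python
import time
from enum import Enum
from typing import Callable, Optional


class MemoryPressureLevel(Enum):
    LOW = "low"
    MODERATE = "moderate"
    HIGH = "high"
    CRITICAL = "critical"


class CachedPressure:
    """Hypothetical sketch of poll-interval caching; classification is injected."""

    def __init__(
        self,
        poll: Callable[[], MemoryPressureLevel],
        poll_interval_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._poll = poll
        self._interval = poll_interval_seconds
        self._clock = clock
        self._last_time: Optional[float] = None
        self._cached: Optional[MemoryPressureLevel] = None

    def current_pressure(self) -> MemoryPressureLevel:
        now = self._clock()
        # Re-poll only if we have never polled or the cached value is stale.
        if self._last_time is None or now - self._last_time >= self._interval:
            self._cached = self._poll()
            self._last_time = now
        return self._cached


levels = iter([MemoryPressureLevel.LOW, MemoryPressureLevel.HIGH])
fake_now = [0.0]
monitor = CachedPressure(lambda: next(levels), clock=lambda: fake_now[0])
print(monitor.current_pressure())  # MemoryPressureLevel.LOW
fake_now[0] = 3.0
print(monitor.current_pressure())  # MemoryPressureLevel.LOW (cached)
fake_now[0] = 6.0
print(monitor.current_pressure())  # MemoryPressureLevel.HIGH
```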

available_bytes()[source]

Return available system memory in bytes.

Return type:

int

Returns:

Available memory in bytes, or a conservative default if psutil is unavailable.
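The psutil-with-fallback pattern can be sketched as follows. `psutil.virtual_memory().available` is psutil's estimate of memory that can be handed to processes without swapping. The library's actual default value is not documented here, so the 1 GiB fallback is an illustrative assumption. `available_bytes_sketch` is a hypothetical name.

```python
def available_bytes_sketch(fallback: int = 1 * 1024**3) -> int:
    """Hypothetical sketch: available RAM via psutil, else a fixed fallback."""
    try:
        import psutil  # optional dependency
    except ImportError:
        # Assumed conservative default; not the library's documented value.
        return fallback
    return int(psutil.virtual_memory().available)


print(available_bytes_sketch() > 0)  # True
```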

should_reduce_allocation()[source]

Return True if memory pressure suggests reducing allocations.

Returns True when pressure is HIGH or CRITICAL.

Return type:

bool

recommended_budget_fraction()[source]

Return recommended fraction of total memory to use.

Returns:

1.0 for LOW, 0.75 for MODERATE, 0.5 for HIGH, 0.25 for CRITICAL.

Return type:

float
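The documented level-to-fraction mapping and the should_reduce_allocation() rule translate directly into a lookup table and a membership test. The `budget_bytes` helper, which scales a total memory figure by the fraction, is a hypothetical addition for illustration.

```python
from enum import Enum


class MemoryPressureLevel(Enum):
    LOW = "low"
    MODERATE = "moderate"
    HIGH = "high"
    CRITICAL = "critical"


# Documented mapping from pressure level to recommended budget fraction.
BUDGET_FRACTION = {
    MemoryPressureLevel.LOW: 1.0,
    MemoryPressureLevel.MODERATE: 0.75,
    MemoryPressureLevel.HIGH: 0.5,
    MemoryPressureLevel.CRITICAL: 0.25,
}


def should_reduce_allocation(level: MemoryPressureLevel) -> bool:
    # Documented rule: reduce when pressure is HIGH or CRITICAL.
    return level in (MemoryPressureLevel.HIGH, MemoryPressureLevel.CRITICAL)


def budget_bytes(level: MemoryPressureLevel, total_bytes: int) -> int:
    """Hypothetical helper: scale a total memory figure by the recommended fraction."""
    return int(total_bytes * BUDGET_FRACTION[level])


total = 16 * 1024**3  # e.g. a 16 GiB machine
print(budget_bytes(MemoryPressureLevel.HIGH, total) / 1024**3)  # 8.0
print(should_reduce_allocation(MemoryPressureLevel.MODERATE))  # False
```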