Data
XPCS data loading, runtime validation, preprocessing, quality control, and memory management for large datasets.
Data Loader
XPCS data loading from HDF5, NPZ, and MAT files.
- class heterodyne.data.xpcs_loader.XPCSData
Bases: object
Container for loaded XPCS data.
- c2: ndarray
- t1: ndarray
- t2: ndarray
- property n_times: int
Number of time points.
- property has_multi_phi: bool
Whether data has multiple phi angles.
- property has_multi_q: bool
Whether data contains multiple q-bins (q_values is set).
- __init__(c2, t1, t2, q=None, phi_angles=None, uncertainties=None, q_values=None, metadata=<factory>)
- exception heterodyne.data.xpcs_loader.DataValidationError
Bases: ValueError
Raised when loaded XPCS data fails validation checks.
- heterodyne.data.xpcs_loader.validate_loaded_data(data)
Validate an XPCSData container and return a list of warning strings.
Performs the following checks:
NaN / Inf - c2, t1, and t2 must be finite.
Shape consistency - t1 and t2 lengths must match the time dimensions of c2. If q_values is set, its length must match c2.shape[0].
Symmetry - For 2-D c2 the matrix should be approximately symmetric (max|c2 - c2.T| / max|c2| < 1e-6). A warning is issued but no exception is raised.
Positive diagonal - All diagonal elements of c2 (or of each slice for 3-D) must be positive.
Time monotonicity - t1 and t2 must be strictly increasing.
- Parameters:
data (XPCSData) – Loaded XPCSData to validate.
- Return type:
list[str]
- Returns:
List of human-readable warning strings. An empty list means all checks passed.
- Raises:
DataValidationError – If any hard constraint is violated (NaN/Inf, shape mismatch, non-positive diagonal, non-monotonic time).
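The hard and soft checks listed above can be approximated with plain NumPy. The sketch below is illustrative only: `validate_c2_sketch` is a hypothetical helper, not part of heterodyne. It works on bare arrays instead of an XPCSData container, raises the built-in ValueError where the library raises DataValidationError, and omits the q_values length check.

```python
import numpy as np


def validate_c2_sketch(c2, t1, t2):
    """Hypothetical re-implementation of the documented checks.

    Hard failures raise ValueError; the symmetry check only appends a warning.
    """
    warnings = []
    # Hard constraint: every array must be finite.
    for name, arr in (("c2", c2), ("t1", t1), ("t2", t2)):
        if not np.all(np.isfinite(arr)):
            raise ValueError(f"{name} contains NaN or Inf")
    # Hard constraint: t1/t2 lengths must match the last two (time) axes of c2.
    if c2.shape[-2:] != (len(t1), len(t2)):
        raise ValueError(f"shape mismatch: c2 {c2.shape}, t1 {len(t1)}, t2 {len(t2)}")
    # Hard constraint: time arrays must be strictly increasing.
    for name, t in (("t1", t1), ("t2", t2)):
        if np.any(np.diff(t) <= 0):
            raise ValueError(f"{name} is not strictly increasing")
    # Hard constraint: positive diagonal in every 2-D slice.
    slices = c2 if c2.ndim == 3 else c2[np.newaxis]
    if np.any(np.diagonal(slices, axis1=1, axis2=2) <= 0):
        raise ValueError("c2 has non-positive diagonal elements")
    # Soft check: relative asymmetry of a 2-D matrix.
    if c2.ndim == 2:
        asym = np.max(np.abs(c2 - c2.T)) / np.max(np.abs(c2))
        if asym >= 1e-6:
            warnings.append(f"c2 is not symmetric (relative asymmetry {asym:.2e})")
    return warnings
```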
- heterodyne.data.xpcs_loader.probe_hdf5_structure(file_path)
Inspect and report the structure of an HDF5 file.
Recursively walks the HDF5 tree and collects dataset names, shapes, dtypes, and top-level attributes. Useful for discovering key names before loading.
- Parameters:
file_path – Path to the HDF5 file to inspect.
- Returns:
Dictionary with the following keys:
"datasets": list of dicts, each with "path", "shape", and "dtype", one for every dataset in the file.
"groups": list of str paths for all groups.
"root_attrs": dict of attributes on the root (/) group.
"n_datasets": total dataset count.
"n_groups": total group count.
- Return type:
dict
- Raises:
heterodyne.utils.path_validation.PathValidationError – If the file does not exist or is not readable.
- heterodyne.data.xpcs_loader.load_xpcs_batch(file_paths, c2_key='c2', time_key='t', format=None, use_cache=False, validate=False, apply_diag_correction=False, diag_correction_width=1, diag_correction_method='interpolate', frame_range=None, select_q=None, q_tolerance=None, allow_partial=False)
Load multiple XPCS data files and return them as a list.
Each file is loaded independently using XPCSDataLoader. By default, any per-file failure raises, because silently returning a partial batch can mask configuration errors and corrupted files. Pass allow_partial=True to opt into the legacy skip-on-failure behaviour; failed paths are still logged at ERROR level and omitted from the returned list.
- Parameters:
file_paths (list[Path | str]) – Sequence of paths to data files.
c2_key (str) – Key for correlation data in each file.
time_key (str) – Key for time array in each file.
format (str | None) – File format (auto-detected per file if None).
use_cache (bool) – If True, enable NPZ caching for each file.
validate (bool) – If True, run validate_loaded_data() on each result.
apply_diag_correction (bool) – If True, apply diagonal correction to each c2.
diag_correction_width (int) – Band width passed to diagonal correction.
diag_correction_method (str) – Method passed to diagonal correction.
frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based inclusive frame indices, applied uniformly to every file.
select_q (float | None) – Target wavevector in Å⁻¹, applied uniformly to every file.
q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹.
allow_partial (bool) – If True, individual file failures are logged and skipped instead of raising. Default False (strict batches).
- Return type:
list[XPCSData]
- Returns:
List of XPCSData objects, one per successfully loaded file.
- Raises:
OSError, ValueError, KeyError, RuntimeError – Propagated from the first per-file failure when allow_partial=False (the default).
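The strict-versus-partial contract of allow_partial can be illustrated with a small loop. In this sketch `load_batch_sketch` and its `load_one` callback are hypothetical stand-ins for load_xpcs_batch and XPCSDataLoader; only the control flow (re-raise by default, log at ERROR and skip when allow_partial=True) mirrors the description above.

```python
import logging

logger = logging.getLogger("batch_sketch")


def load_batch_sketch(file_paths, load_one, allow_partial=False):
    """Hypothetical batch loader: strict by default, skip-on-failure if allow_partial."""
    results = []
    for path in file_paths:
        try:
            results.append(load_one(path))
        except (OSError, ValueError, KeyError, RuntimeError):
            if not allow_partial:
                raise  # strict mode: the first failure propagates unchanged
            # Partial mode: log at ERROR level and omit the failed path.
            logger.error("Failed to load %s; skipping", path, exc_info=True)
    return results
```

Keeping strict mode as the default means a corrupted file stops the run immediately instead of silently shrinking the batch.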
- class heterodyne.data.xpcs_loader.XPCSDataLoader
Bases: object
Loader for XPCS correlation data from various file formats.
- __init__(file_path, format=None)
Initialize loader.
- load(c2_key='c2', time_key='t', q_key='q', phi_key='phi', use_cache=False, frame_range=None, select_q=None, q_tolerance=None, cache_dir=None, cache_template=None, template_vars=None, cache_compression=True)
Load XPCS data from file.
- Parameters:
c2_key (str) – Key/path for correlation data.
time_key (str) – Key/path for time array.
use_cache (bool) – If True and the format supports caching (hdf5, mat), attempt to load from a collocated NPZ cache first. The cache is invalidated automatically when the source file’s mtime changes.
frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based indexing (matching the homodyne convention). If provided, only frames start through end (inclusive) are retained after loading. Validation is performed before slicing.
select_q (float | None) – Target wavevector in Å⁻¹. When set and the loaded data has q_values (multi-q 3-D), the q-bin(s) closest to this value are selected. If q_tolerance is also given, all bins within that tolerance are kept; otherwise only the single nearest bin is kept.
q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹. Ignored when select_q is None.
- Return type:
XPCSData
- Returns:
XPCSData container.
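Because frame_range is 1-based and inclusive while NumPy slicing is 0-based and half-open, the conversion is `slice(start - 1, end)`. A minimal sketch, assuming the time axes are the last two axes of c2; `apply_frame_range` and its bounds check are hypothetical, not the loader's code.

```python
import numpy as np


def apply_frame_range(c2, t, frame_range):
    """Keep frames start..end (1-based, inclusive) along both time axes."""
    start, end = frame_range
    # Hypothetical bounds check; the loader's own error handling may differ.
    if not 1 <= start <= end <= len(t):
        raise ValueError(f"frame_range {frame_range} outside 1..{len(t)}")
    sl = slice(start - 1, end)  # 1-based inclusive -> 0-based half-open
    return c2[..., sl, sl], t[sl]
```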
- heterodyne.data.xpcs_loader.select_optimal_wavevector(q_values, target_q, tolerance=None)
Select q-bin indices closest to a target wavevector.
- Parameters:
q_values – Available q-bin values.
target_q – Target wavevector.
tolerance – Maximum absolute deviation from target_q. If None, only the single closest bin is selected.
- Return type:
tuple[ndarray, ndarray]
- Returns:
Tuple of (selected_indices, selected_q_values), where both are 1-D NumPy arrays. selected_indices contains integer indices into q_values; selected_q_values contains the corresponding q-values.
- Raises:
ValueError – If q_values is empty.
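The selection rule (nearest bin when tolerance is None, otherwise every bin within tolerance) can be sketched in a few lines of NumPy. `select_q_sketch` is a hypothetical re-implementation. The real function's behaviour when no bin lies within the tolerance is not documented here; this sketch simply returns empty arrays in that case.

```python
import numpy as np


def select_q_sketch(q_values, target_q, tolerance=None):
    """Hypothetical nearest-bin / within-tolerance q selection."""
    q = np.asarray(q_values, dtype=float)
    if q.size == 0:
        raise ValueError("q_values is empty")
    deviation = np.abs(q - target_q)
    if tolerance is None:
        idx = np.array([np.argmin(deviation)])  # single nearest bin
    else:
        idx = np.flatnonzero(deviation <= tolerance)  # every bin within tolerance
    return idx, q[idx]
```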
- heterodyne.data.xpcs_loader.load_xpcs_data(file_path, c2_key='c2', time_key='t', format=None, use_cache=False, frame_range=None, select_q=None, q_tolerance=None, cache_dir=None, cache_template=None, template_vars=None, cache_compression=True)
Convenience function to load XPCS data.
- Parameters:
c2_key (str) – Key for correlation data.
time_key (str) – Key for time array.
use_cache (bool) – Enable NPZ caching to avoid re-reading large source files.
frame_range (tuple[int, int] | None) – Optional (start, end) with 1-based inclusive frame indices. See XPCSDataLoader.load() for details.
select_q (float | None) – Target wavevector in Å⁻¹ for q-bin selection. Applied only when the loaded data has multiple q-bins.
q_tolerance (float | None) – Maximum absolute deviation from select_q in Å⁻¹. None selects only the single closest bin.
cache_dir (Path | None) – Directory for cache files (None = collocate with source).
cache_template (str | None) – Filename template with ${key} placeholders.
template_vars (dict[str, str] | None) – Substitution values for the template.
cache_compression (bool) – Whether to compress cache files.
- Return type:
XPCSData
- Returns:
XPCSData container.
Validators
Input validation utilities for XPCS data arrays.
Complements the higher-level validation.py module by providing
fine-grained, composable checks on individual arrays. Each function
returns a list of error strings; an empty list indicates valid input.
- heterodyne.data.validators.validate_correlation_shape(c2, expected_shape=None)
Validate shape of a correlation matrix.
Checks that c2 is 2D (single angle) or 3D (multi-angle batch), and optionally matches an expected shape.
- heterodyne.data.validators.validate_time_arrays(t1, t2)
Validate time arrays for monotonicity and matching lengths.
- heterodyne.data.validators.validate_q_range(q, q_min, q_max)
Validate that wavevector values fall within the specified range.
- heterodyne.data.validators.validate_weights(weights, data_shape)
Validate weight array for non-negativity and shape compatibility.
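Because every validator returns a list of error strings, checks compose by list concatenation. The two functions below are hypothetical stand-ins for validate_time_arrays and validate_weights. They assume "monotonicity" means strictly increasing (matching validate_loaded_data) and may differ from the library's exact rules and messages.

```python
import numpy as np


def check_time_arrays(t1, t2):
    """Hypothetical validator: matching lengths and strictly increasing values."""
    errors = []
    if len(t1) != len(t2):
        errors.append(f"length mismatch: len(t1)={len(t1)}, len(t2)={len(t2)}")
    for name, t in (("t1", t1), ("t2", t2)):
        if np.any(np.diff(t) <= 0):
            errors.append(f"{name} is not strictly increasing")
    return errors


def check_weights(weights, data_shape):
    """Hypothetical validator: shape compatibility and non-negativity."""
    errors = []
    if weights.shape != tuple(data_shape):
        errors.append(f"weights shape {weights.shape} != data shape {tuple(data_shape)}")
    if np.any(weights < 0):
        errors.append("weights contain negative values")
    return errors
```

A caller can then gather every problem in one pass, for example `errors = check_time_arrays(t, t) + check_weights(w, c2.shape)`, and treat an empty list as valid input.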
Preprocessing
Preprocessing pipeline for XPCS correlation data.
- class heterodyne.data.preprocessing.PreprocessingResult
Bases: object
Result of preprocessing operations.
- c2: ndarray
- __init__(c2, applied_steps=<factory>, statistics=<factory>)
- class heterodyne.data.preprocessing.PreprocessingPipeline
Bases: object
Pipeline for preprocessing XPCS correlation data.
Supports operations such as baseline subtraction, normalization, outlier removal, and smoothing.
- __init__()
Initialize empty pipeline.
- add_step(name, func)
Add a preprocessing step.
- normalize_diagonal()
Add diagonal normalization step.
Normalizes c2 so that diagonal values are 1.
- Return type:
PreprocessingPipeline
- subtract_baseline(baseline=1.0)
Add baseline subtraction step.
- Parameters:
baseline (float) – Baseline value to subtract.
- Return type:
PreprocessingPipeline
- clip_values(min_val=None, max_val=None)
Add value clipping step.
- remove_outliers(n_sigma=5.0, replace_with='median')
Add outlier removal step.
- symmetrize()
Add symmetrization step for correlation matrices.
Makes c2(t1, t2) = c2(t2, t1). Handles both 2D and 3D (batch) data.
- Return type:
PreprocessingPipeline
- crop_time(t_start=0, t_end=None)
Add time cropping step.
- process(c2)
Apply all preprocessing steps.
- Parameters:
c2 (ndarray) – Input correlation array.
- Return type:
PreprocessingResult
- Returns:
PreprocessingResult with processed data.
- process_with_provenance(c2, source_file=None)
Apply all preprocessing steps and record full provenance.
Generates a unique pipeline ID and computes per-step input/output hashes so that every transformation can be audited.
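PreprocessingPipeline follows a builder pattern: each step-adding method appends a named callable and returns the pipeline (the documented return type is PreprocessingPipeline), so calls can be chained before process(). The class below is a hypothetical miniature of that pattern, not the library's code. It returns a plain (array, step names) tuple instead of a PreprocessingResult, and dividing by sqrt(c2[i, i] * c2[j, j]) is only one common way to make the diagonal equal to 1, assumed here for illustration.

```python
import numpy as np


class MiniPipeline:
    """Hypothetical stand-in for PreprocessingPipeline: an ordered list of named steps."""

    def __init__(self):
        self.steps = []  # (name, callable) pairs, applied in insertion order

    def add_step(self, name, func):
        self.steps.append((name, func))
        return self  # returning self enables method chaining

    def normalize_diagonal(self):
        def _norm(c2):
            diag = np.diagonal(c2, axis1=-2, axis2=-1)
            # Assumed scheme: divide by sqrt(d_i * d_j) so every diagonal entry becomes 1.
            scale = np.sqrt(diag[..., :, None] * diag[..., None, :])
            return c2 / scale

        return self.add_step("normalize_diagonal", _norm)

    def symmetrize(self):
        # Average with the transpose of the last two axes (works for 2-D and 3-D).
        return self.add_step("symmetrize", lambda c2: 0.5 * (c2 + np.swapaxes(c2, -1, -2)))

    def process(self, c2):
        applied = []
        for name, func in self.steps:
            c2 = func(c2)
            applied.append(name)
        return c2, applied
```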
- heterodyne.data.preprocessing.preprocess_correlation(c2, normalize=True, remove_outliers=True, symmetrize=True)
Convenience function for standard preprocessing.
- class heterodyne.data.preprocessing.PreprocessingStage
Bases: Enum
Stages in the preprocessing pipeline.
- LOAD_RAW = 'load_raw'
- VALIDATE_INPUT = 'validate_input'
- NORMALIZE = 'normalize'
- CORRECT_BASELINE = 'correct_baseline'
- REDUCE_NOISE = 'reduce_noise'
- TRANSFORM = 'transform'
- VALIDATE_OUTPUT = 'validate_output'
- class heterodyne.data.preprocessing.NormalizationMethod
Bases: Enum
Available normalization methods.
- DIAGONAL = 'diagonal'
- BASELINE = 'baseline'
- ZSCORE = 'zscore'
- MINMAX = 'minmax'
- ROBUST = 'robust'
- PHYSICS_BASED = 'physics_based'
- NONE = 'none'
- class heterodyne.data.preprocessing.NoiseReductionMethod
Bases: Enum
Available noise reduction methods.
- MEDIAN_FILTER = 'median_filter'
- GAUSSIAN_SMOOTH = 'gaussian_smooth'
- WIENER_FILTER = 'wiener_filter'
- SAVGOL_FILTER = 'savgol_filter'
- WAVELET = 'wavelet'
- NONE = 'none'
- class heterodyne.data.preprocessing.TransformationRecord
Bases: object
Record of a single preprocessing transformation for provenance tracking.
- stage
Which preprocessing stage this belongs to.
- method
Name of the method applied.
- parameters
Parameters used for the transformation.
- timestamp
ISO-format timestamp of when the transformation was applied.
- input_hash
SHA-256 hash of the input array.
- output_hash
SHA-256 hash of the output array.
- stage: PreprocessingStage
- method: str
- timestamp: str
- input_hash: str
- output_hash: str
- __init__(stage, method, parameters, timestamp, input_hash, output_hash)
- class heterodyne.data.preprocessing.PreprocessingProvenance
Bases: object
Audit trail for preprocessing operations.
Tracks every transformation applied to data, including hashes of input/output arrays for reproducibility verification.
- records
List of transformation records.
- source_file
Path to the source data file, if applicable.
- created_at
ISO-format creation timestamp.
- pipeline_id
Unique identifier for this pipeline run.
- config_hash
Hash of the pipeline configuration for reproducibility.
- records: list[TransformationRecord]
- created_at: str
- pipeline_id: str = ''
- config_hash: str = ''
- add_record(record)
Append a transformation record to the provenance trail.
- Parameters:
record (TransformationRecord) – The transformation record to add.
- Return type:
None
- to_dict()
Serialize provenance to a dictionary.
- classmethod from_dict(d)
Deserialize provenance from a dictionary.
- __init__(records=<factory>, source_file=None, created_at=<factory>, pipeline_id='', config_hash='')
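Each provenance record stores SHA-256 hashes of a step's input and output arrays. The sketch below shows one way to produce such a trail with hashlib. Hashing the dtype and shape along with the raw bytes, and the `Record` / `run_with_provenance` names, are assumptions for illustration rather than heterodyne's actual scheme.

```python
import hashlib
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

import numpy as np


def array_sha256(arr):
    """SHA-256 over dtype, shape, and raw bytes (assumed hashing scheme)."""
    arr = np.ascontiguousarray(arr)
    h = hashlib.sha256()
    h.update(str(arr.dtype).encode())
    h.update(str(arr.shape).encode())
    h.update(arr.tobytes())
    return h.hexdigest()


@dataclass
class Record:
    """Hypothetical analogue of TransformationRecord (stage omitted)."""
    method: str
    parameters: dict
    input_hash: str
    output_hash: str
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


def run_with_provenance(c2, steps):
    """Apply (method, func, params) steps in order, hashing each input and output."""
    records = []
    for method, func, params in steps:
        out = func(c2, **params)
        records.append(Record(method, params, array_sha256(c2), array_sha256(out)))
        c2 = out
    return c2, [asdict(r) for r in records]
```

Re-running the same steps on the same input reproduces the same hashes, which is what makes the trail usable for reproducibility checks.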
- heterodyne.data.preprocessing.normalize_zscore(c2)
Z-score normalization: (c2 - mean) / std.
Handles zero standard deviation by returning a zero-centred array.
- heterodyne.data.preprocessing.normalize_minmax(c2)
Min-max normalization: (c2 - min) / (max - min).
Handles the degenerate case (max == min) by returning zeros.
- heterodyne.data.preprocessing.normalize_robust(c2)
Robust normalization: (c2 - median) / IQR.
Uses the interquartile range (Q3 - Q1) as the scale factor. Handles IQR == 0 by returning (c2 - median).
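The three array-wide normalizations, including their degenerate-case handling, follow directly from the formulas above. These are hypothetical sketches that use global statistics over the whole array; NumPy's default linear interpolation is assumed for the quartiles.

```python
import numpy as np


def zscore(c2):
    std = np.std(c2)
    centred = c2 - np.mean(c2)
    return centred / std if std > 0 else centred  # zero std: zero-centred array


def minmax(c2):
    lo, hi = np.min(c2), np.max(c2)
    return (c2 - lo) / (hi - lo) if hi > lo else np.zeros_like(c2)  # max == min: zeros


def robust(c2):
    q1, med, q3 = np.percentile(c2, [25, 50, 75])
    iqr = q3 - q1
    return (c2 - med) / iqr if iqr > 0 else c2 - med  # IQR == 0: median-centred only
```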
- heterodyne.data.preprocessing.apply_baseline_correction(c2, baseline=None, method='subtract')
Apply baseline correction to a correlation matrix.
- Parameters:
c2 (ndarray) – Input correlation array (2D or 3D).
baseline (ndarray | float | None) – Baseline value(s). If None, the baseline is estimated from the last 10% of off-diagonal elements (those farthest from the diagonal, corresponding to long time delays).
method (str) – Correction strategy: "subtract", "divide", or "polynomial".
- Return type:
ndarray
- Returns:
Baseline-corrected array.
- Raises:
ValueError – If method is not one of the supported strategies.
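When baseline is None, the baseline comes from the long-lag region of c2. The sketch below reads "the last 10% of off-diagonal elements" as the off-diagonal entries whose lag |i - j| is at or above the 90th percentile of all off-diagonal lags, and averages them into one scalar; both choices are assumptions, and `estimate_baseline` / `baseline_correct` are hypothetical names. The "polynomial" strategy is not sketched.

```python
import numpy as np


def estimate_baseline(c2, fraction=0.1):
    """Mean of the largest-lag ~fraction of off-diagonal entries (assumed reading)."""
    n = c2.shape[-1]
    i, j = np.indices((n, n))
    lag = np.abs(i - j)
    cutoff = np.quantile(lag[lag > 0], 1.0 - fraction)
    far = lag >= cutoff  # boolean mask over the last two (time) axes
    return float(np.mean(c2[..., far]))


def baseline_correct(c2, baseline=None, method="subtract"):
    if baseline is None:
        baseline = estimate_baseline(c2)
    if method == "subtract":
        return c2 - baseline
    if method == "divide":
        return c2 / baseline
    raise ValueError(f"unsupported method {method!r}")
```

For a typical g2-like matrix that decays toward 1 at long lags, the estimate lands close to 1, so "subtract" isolates the decaying contrast term.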
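- heterodyne.data.preprocessing.apply_noise_reduction(c2, method=NoiseReductionMethod.GAUSSIAN_SMOOTH, **kwargs)
Apply noise reduction to a correlation matrix.
- Parameters:
c2 (ndarray) – Input correlation array.
method (NoiseReductionMethod) – Noise reduction method to apply.
**kwargs – Method-specific options (for example kernel_size or sigma).
- Return type:
ndarray
- Returns:
Noise-reduced array.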
- heterodyne.data.preprocessing.process_chunked(c2, pipeline, chunk_size=100)
Process data in chunks along the first (batch) axis.
For 3D data (n_phi, n_t, n_t), slices are processed in groups of chunk_size along axis 0 and concatenated. For 2D data the pipeline is applied directly.
Steps that use array-wide statistics (outlier removal, baseline subtraction, z-score / min-max / robust normalization) are rejected with ValueError when chunking would be required, because per-chunk statistics would produce mismatched scales between chunks. Run the pipeline on the whole array, or remove those steps before chunking.
- Parameters:
c2 (ndarray) – Input correlation array (2D or 3D).
pipeline (PreprocessingPipeline) – Pipeline to apply to each chunk.
chunk_size (int) – Number of slices per chunk along axis 0.
- Return type:
PreprocessingResult
- Returns:
Combined PreprocessingResult.
- Raises:
ValueError – If the pipeline contains steps requiring global statistics and chunking would actually be applied (i.e. c2.ndim == 3 and n_phi > chunk_size).
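The chunking rule reduces to: apply directly unless c2 is 3-D with more than chunk_size slices, and refuse to chunk pipelines that contain global-statistic steps. A minimal sketch with a plain list of (name, func) steps; `process_chunked_sketch` is hypothetical, and the step names in GLOBAL_STAT_STEPS are illustrative labels, not the library's identifiers.

```python
import numpy as np

# Hypothetical labels for steps that depend on array-wide statistics.
GLOBAL_STAT_STEPS = {"remove_outliers", "subtract_baseline", "zscore", "minmax", "robust"}


def process_chunked_sketch(c2, steps, chunk_size=100):
    """Apply (name, func) steps, chunking along axis 0 only when required."""

    def run(block):
        for _, func in steps:
            block = func(block)
        return block

    needs_chunking = c2.ndim == 3 and c2.shape[0] > chunk_size
    if not needs_chunking:
        return run(c2)  # 2-D input, or a 3-D batch that fits in one chunk
    bad = [name for name, _ in steps if name in GLOBAL_STAT_STEPS]
    if bad:
        raise ValueError(f"steps {bad} need global statistics; cannot chunk")
    chunks = [run(c2[s:s + chunk_size]) for s in range(0, c2.shape[0], chunk_size)]
    return np.concatenate(chunks, axis=0)
```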
- heterodyne.data.preprocessing.preprocess_xpcs_data(c2, normalize_method=NormalizationMethod.DIAGONAL, noise_reduction=NoiseReductionMethod.NONE, remove_outliers=False, symmetrize=True, baseline_correction=False, **kwargs)
Convenience function for comprehensive XPCS preprocessing.
Builds a PreprocessingPipeline with the requested steps and applies it to c2. The processing order is:
Outlier removal (optional)
Symmetrization (optional)
Baseline correction (optional)
Normalization (configurable method)
Noise reduction (configurable method)
- Parameters:
c2 (ndarray) – Input correlation array.
normalize_method (NormalizationMethod) – Which normalization to apply.
noise_reduction (NoiseReductionMethod) – Which noise reduction to apply.
remove_outliers (bool) – Whether to remove outliers before other steps.
symmetrize (bool) – Whether to symmetrize the correlation matrix.
baseline_correction (bool) – Whether to apply baseline correction.
**kwargs (Any) – Extra keyword arguments forwarded to noise reduction (e.g. kernel_size, sigma).
- Return type:
PreprocessingResult
- Returns:
PreprocessingResult with processed data, step list, and summary statistics.
Quality Controller
Comprehensive data quality assessment for XPCS correlation data.
- class heterodyne.data.quality_controller.QualityLevel
Bases: Enum
Quality classification levels, ordered from best to worst.
- GOOD = 'good'
- ACCEPTABLE = 'acceptable'
- WARNING = 'warning'
- CRITICAL = 'critical'
- class heterodyne.data.quality_controller.QualityMetric
Bases: object
A single quality assessment metric.
- name
Human-readable metric name.
- value
Measured value of the metric.
- threshold
Threshold used for classification.
- level
Quality level assigned to this metric.
- message
Descriptive message explaining the assessment.
- name: str
- value: float
- threshold: float
- level: QualityLevel
- message: str
- __init__(name, value, threshold, level, message)
- class heterodyne.data.quality_controller.QualityReport
Bases: object
Aggregated quality report from multiple metrics.
- metrics
Individual quality metrics.
- overall_level
Worst-case level across all metrics.
- recommendations
Actionable suggestions for improving data quality.
- metrics: list[QualityMetric]
- overall_level: QualityLevel = 'good'
- summary()
Generate a human-readable quality report.
- Return type:
str
- Returns:
Multi-line summary string.
- __init__(metrics=<factory>, overall_level=QualityLevel.GOOD, recommendations=<factory>)
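overall_level is the worst case across all metrics. Since QualityLevel values are strings, comparing them directly would order them alphabetically ("acceptable" < "critical" < "good" < "warning"), so a sketch should rank levels by declaration order instead. `Level` mirrors QualityLevel; `overall_level` is a hypothetical helper, and treating an empty metric list as GOOD follows the documented default.

```python
from enum import Enum


class Level(Enum):
    GOOD = "good"
    ACCEPTABLE = "acceptable"
    WARNING = "warning"
    CRITICAL = "critical"


# Enum iteration follows declaration order, i.e. best to worst.
SEVERITY = {level: rank for rank, level in enumerate(Level)}


def overall_level(levels):
    """Worst-case aggregation: the most severe level wins; no metrics means GOOD."""
    return max(levels, key=SEVERITY.__getitem__, default=Level.GOOD)
```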
- class heterodyne.data.quality_controller.QualityController
Bases: object
Assess quality of XPCS correlation data.
Runs a battery of checks and produces a QualityReport.
- Parameters:
config (DataConfig | None) – Optional data configuration; thresholds may be drawn from it in the future.
- __init__(config=None)
- assess(c2, t, q=None, phi_angles=None)
Run all quality checks and produce a report.
- Parameters:
c2 – Correlation data to assess.
t – Time array.
q – Optional wavevector value(s).
phi_angles – Optional phi angles.
- Return type:
QualityReport
- Returns:
QualityReport with per-metric assessments and recommendations.
- class heterodyne.data.quality_controller.QualityControlStage
Bases: Enum
Processing stage at which quality assessment is performed.
- RAW = 'raw'
- FILTERED = 'filtered'
- PREPROCESSED = 'preprocessed'
- FINAL = 'final'
- class heterodyne.data.quality_controller.QualityControlConfig
Bases: object
Configuration for the 4-stage quality control pipeline.
- nan_threshold
Maximum acceptable NaN fraction (0-1).
- snr_threshold
Minimum acceptable signal-to-noise ratio.
- symmetry_threshold
Maximum acceptable relative asymmetry.
- value_range_max
Maximum acceptable absolute value.
- min_time_points
Minimum required number of time points.
- auto_repair_nans
Whether to automatically interpolate NaN values.
- auto_repair_outliers
Whether to automatically clip outlier values.
- outlier_sigma
Number of standard deviations beyond which values are considered outliers (used when auto_repair_outliers is True).
- report_format
Output format for reports, "text" or "json".
- nan_threshold: float = 0.05
- snr_threshold: float = 5.0
- symmetry_threshold: float = 0.01
- value_range_max: float = 100.0
- min_time_points: int = 10
- auto_repair_nans: bool = False
- auto_repair_outliers: bool = False
- outlier_sigma: float = 5.0
- report_format: str = 'text'
- __init__(nan_threshold=0.05, snr_threshold=5.0, symmetry_threshold=0.01, value_range_max=100.0, min_time_points=10, auto_repair_nans=False, auto_repair_outliers=False, outlier_sigma=5.0, report_format='text')
- class heterodyne.data.quality_controller.QualityControlResult
Bases: object
Result of a single-stage quality assessment.
- stage
The pipeline stage this result corresponds to.
- report
The underlying QualityReport produced by the controller.
- auto_corrections
Descriptions of any automatic corrections applied.
- recommendations
Actionable suggestions derived from the assessment.
- quality_score
Aggregate quality score in [0, 1] (1 = perfect).
- metadata
Arbitrary key-value metadata for downstream consumers.
- stage: QualityControlStage
- report: QualityReport
- quality_score: float = 1.0
- __init__(stage, report, auto_corrections=<factory>, recommendations=<factory>, quality_score=1.0, metadata=<factory>)
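- heterodyne.data.quality_controller.assess_stage(controller, c2, t, stage, config=None)
Run a quality assessment for a specific pipeline stage.
- Parameters:
controller (QualityController) – Controller used to run the checks.
c2 – Correlation data to assess.
t – Time array.
stage (QualityControlStage) – Pipeline stage being assessed.
config (QualityControlConfig | None) – Optional quality control configuration.
- Return type:
QualityControlResult
- Returns:
A QualityControlResult with score and recommendations.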
- heterodyne.data.quality_controller.suggest_fixes(report)
Analyse a QualityReport and suggest concrete corrective actions.
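- heterodyne.data.quality_controller.apply_auto_corrections(c2, t, report, config)
Apply automatic corrections to data based on a quality report.
The input arrays are not mutated; a corrected copy of c2 is returned.
- Parameters:
c2 – Correlation data to correct.
t – Time array.
report (QualityReport) – Quality report driving the corrections.
config (QualityControlConfig) – Quality control configuration (see auto_repair_nans, auto_repair_outliers, and outlier_sigma).
- Return type:
tuple[ndarray, list[str]]
- Returns:
Tuple of (corrected_c2, list_of_correction_descriptions).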
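A hedged sketch of non-mutating auto-correction driven by settings analogous to auto_repair_nans, auto_repair_outliers, and outlier_sigma. The repair strategies are assumptions: NaNs are filled by linear interpolation over the flattened array, and outliers are clipped to mean ± outlier_sigma·std. `auto_correct_sketch` takes plain flags, whereas the real function takes (c2, t, report, config) and may behave differently.

```python
import numpy as np


def auto_correct_sketch(c2, repair_nans=True, repair_outliers=True, outlier_sigma=5.0):
    """Hypothetical auto-correction returning (corrected copy, descriptions)."""
    corrected = np.array(c2, dtype=float, copy=True)  # never mutate the caller's array
    notes = []
    if repair_nans:
        nan_mask = np.isnan(corrected)
        if nan_mask.any():
            # Assumed strategy: 1-D linear interpolation over the flattened array.
            flat = corrected.ravel()  # view into the contiguous copy
            idx = np.arange(flat.size)
            good = ~np.isnan(flat)
            flat[~good] = np.interp(idx[~good], idx[good], flat[good])
            notes.append(f"interpolated {int(nan_mask.sum())} NaN values")
    if repair_outliers:
        mu, sd = np.nanmean(corrected), np.nanstd(corrected)
        lo, hi = mu - outlier_sigma * sd, mu + outlier_sigma * sd
        n_out = int(np.sum((corrected < lo) | (corrected > hi)))
        if n_out:
            np.clip(corrected, lo, hi, out=corrected)
            notes.append(f"clipped {n_out} values beyond {outlier_sigma} sigma")
    return corrected, notes
```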
- heterodyne.data.quality_controller.export_report(result, format='text')
Export a QualityControlResult in the requested format.
- Parameters:
result (QualityControlResult) – Quality control result to export.
format (str) – "text" for a human-readable string, "json" for a JSON-safe dictionary.
- Return type:
str | dict
- Returns:
Formatted report as a string or dict.
- Raises:
ValueError – If format is not "text" or "json".
- heterodyne.data.quality_controller.track_quality_history(history, new_result)
Append a new result to the quality history and log the trend.
- heterodyne.data.quality_controller.run_4_stage_pipeline(controller, c2, t, config=None)
Execute the full 4-stage quality control pipeline.
Stages are run in order: RAW -> FILTERED -> PREPROCESSED -> FINAL. Between stages, automatic corrections are applied when enabled in config. Adaptive thresholds are computed once from the raw data.
- Parameters:
controller (QualityController) – Controller used to run each stage.
c2 – Raw correlation data.
t – Time array.
config (QualityControlConfig | None) – Optional quality control configuration, including the auto-correction flags.
- Return type:
list[QualityControlResult]
- Returns:
List of four QualityControlResult objects, one per stage.
Memory Manager
Memory budget tracking for large XPCS datasets.
Provides allocation tracking, budget enforcement, and chunk-size suggestions so that downstream code can stay within a configurable memory envelope.
- class heterodyne.data.memory_manager.MemoryBudget
Bases: object
Snapshot of memory budget state.
- total_bytes
Total budget available.
- allocated_bytes
Currently tracked allocations.
- peak_bytes
Highest allocated_bytes observed.
- total_bytes: int
- allocated_bytes: int
- peak_bytes: int
- __init__(total_bytes, allocated_bytes, peak_bytes)
- class heterodyne.data.memory_manager.MemoryManager
Bases: object
Track memory allocations against a configurable budget.
When budget_bytes is None, the manager auto-detects available system memory via psutil.virtual_memory(), falling back to 8 GB if psutil is not installed.
All public methods are thread-safe.
- __init__(budget_bytes=None)
- request(n_bytes, label)
Request an allocation of n_bytes tracked under label.
If the allocation would exceed the budget, the request is denied and False is returned. Existing allocations with the same label are released first (idempotent re-allocation).
- release(label)
Release a tracked allocation.
No-op if the label is not currently tracked.
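The request/release bookkeeping can be sketched with a dict guarded by a lock. `BudgetSketch` is hypothetical; it follows the documented rules (deny and return False when over budget, release any same-label allocation first, ignore unknown labels on release) and tracks the peak that MemoryBudget.peak_bytes reports. The budget is passed explicitly rather than auto-detected.

```python
import threading


class BudgetSketch:
    """Hypothetical stand-in for MemoryManager's request/release bookkeeping."""

    def __init__(self, budget_bytes):
        self.total = budget_bytes
        self.peak = 0
        self._allocs = {}  # label -> bytes
        self._lock = threading.Lock()

    def request(self, n_bytes, label):
        with self._lock:
            # Idempotent re-allocation: drop any existing entry for this label first.
            self._allocs.pop(label, None)
            if sum(self._allocs.values()) + n_bytes > self.total:
                return False  # denied; nothing new is tracked
            self._allocs[label] = n_bytes
            self.peak = max(self.peak, sum(self._allocs.values()))
            return True

    def release(self, label):
        with self._lock:
            self._allocs.pop(label, None)  # no-op for unknown labels
```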
- static estimate_array_size(shape, dtype=np.float64)
Estimate the memory footprint of an array.
- get_budget()
Return a snapshot of the current budget state.
- Return type:
MemoryBudget
- Returns:
MemoryBudget dataclass.
- suggest_chunk_size(total_elements, element_bytes)
Suggest an optimal chunk size that fits within available budget.
The returned chunk size will use at most half of the remaining budget so that headroom is preserved for intermediate buffers.
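The size estimate is element count times item size, and the chunk suggestion spends at most half of the remaining budget. In this sketch the remaining budget is passed explicitly as `remaining_bytes` (the real method reads it from the manager's state), and the lower bound of one element per chunk is an assumption.

```python
import numpy as np


def estimate_array_size(shape, dtype=np.float64):
    """Bytes needed for an array of this shape and dtype."""
    return int(np.prod(shape, dtype=np.int64)) * np.dtype(dtype).itemsize


def suggest_chunk_size(total_elements, element_bytes, remaining_bytes):
    """Elements per chunk using at most half of the remaining budget (hypothetical signature)."""
    usable = remaining_bytes // 2  # keep the other half as headroom for intermediates
    chunk = usable // element_bytes
    return int(max(1, min(total_elements, chunk)))
```

For example, a (100, 1000, 1000) float64 stack needs 800,000,000 bytes, which is why chunking along the batch axis matters for large multi-angle datasets.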
- class heterodyne.data.memory_manager.MemoryMapManager
Bases: object
Manage memory-mapped access to large HDF5 datasets.
For datasets that exceed available RAM, this provides read-only memory-mapped access via h5py’s direct chunk reading, avoiding full materialization of arrays into memory.
- Parameters:
max_resident_bytes (int) – Maximum bytes to keep resident in memory at any time. Defaults to 2 GB.
- __init__(max_resident_bytes=2 * 1024 * 1024 * 1024)
- open_dataset(file_path, dataset_path)
Open an HDF5 dataset for memory-mapped-like access.
Returns the raw h5py.Dataset proxy, which supports NumPy-style slicing (ds[:, 100:200]) and reads chunks on demand without materializing the full array. Use read_slice() for explicit partial reads, or materialize() when the caller truly needs an in-memory ndarray.
- Parameters:
file_path – Path to the HDF5 file.
dataset_path – Path of the dataset inside the file.
- Return type:
h5py.Dataset
- Returns:
h5py.Dataset proxy with lazy chunk access.
- Raises:
MemoryError – If the dataset size exceeds max_resident_bytes and the caller would have to materialize it. To guard against accidental out-of-memory errors, call read_slice() instead.
- materialize(file_path, dataset_path)
Eagerly load an HDF5 dataset into memory as an ndarray.
Raises MemoryError if the estimated size exceeds max_resident_bytes. Callers must explicitly opt out of the limit by raising it, or use read_slice() for partial loads.
- Return type:
ndarray
- read_slice(file_path, dataset_path, slices)
Read a specific slice from an HDF5 dataset without loading the full array.
- estimate_dataset_size(file_path, dataset_path)
Estimate the in-memory size of an HDF5 dataset without loading it.
- __enter__()
Support use as a context manager.
- Return type:
MemoryMapManager
- class heterodyne.data.memory_manager.ChunkInfo
Bases: object
Metadata for a single processing chunk.
- start
Start index along the batch axis.
- end
End index (exclusive) along the batch axis.
- size_bytes
Estimated memory footprint of this chunk.
- priority
Processing priority (lower = higher priority).
- start: int
- end: int
- size_bytes: int
- priority: int = 0
- __init__(start, end, size_bytes, priority=0)
- class heterodyne.data.memory_manager.AdaptiveChunker
Bases: object
Compute chunk sizes that adapt to available memory and data characteristics.
Unlike fixed chunking, this class monitors memory pressure and adjusts chunk sizes dynamically. Chunks near the diagonal of correlation matrices (small time lag) are given higher priority since they carry more signal.
- Parameters:
memory_manager (MemoryManager) – MemoryManager instance for budget awareness.
safety_factor (float) – Fraction of available memory to actually use (default 0.5).
- __init__(memory_manager, safety_factor=0.5)
- class heterodyne.data.memory_manager.MemoryPressureLevel
Bases: Enum
System memory pressure classification.
- LOW = 'low'
- MODERATE = 'moderate'
- HIGH = 'high'
- CRITICAL = 'critical'
- class heterodyne.data.memory_manager.MemoryPressureMonitor
Bases: object
Monitor system memory pressure and trigger adaptive responses.
Polls system memory usage via psutil (with graceful fallback) and classifies pressure into levels that downstream code can use to adjust batch sizes, enable compression, or skip optional caching.
- Parameters:
poll_interval_seconds (float) – Minimum seconds between actual system polls (cached between polls). Defaults to 5.0.
- __init__(poll_interval_seconds=5.0)
- current_pressure()
Return the current memory pressure level.
Uses the cached value if polled recently (within poll_interval_seconds).
- Return type:
MemoryPressureLevel
- Returns:
Current MemoryPressureLevel.
- available_bytes()
Return available system memory in bytes.
- Return type:
int
- Returns:
Available memory, or a conservative default if psutil is unavailable.
- should_reduce_allocation()
Return True if memory pressure suggests reducing allocations.
Returns True when pressure is HIGH or CRITICAL.
- Return type:
bool
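current_pressure() caches its reading for poll_interval_seconds. The sketch below shows that caching plus threshold classification. The 70/85/95% thresholds are hypothetical, `PressureMonitorSketch` is not the library class, and the probe and clock are injectable so the logic can be tested without psutil. When psutil is missing the sketch reports LOW; the library's actual fallback is not documented here.

```python
import time
from enum import Enum


class Pressure(Enum):
    LOW = "low"
    MODERATE = "moderate"
    HIGH = "high"
    CRITICAL = "critical"


def psutil_used_fraction():
    """Fraction of system memory in use, or None if psutil is not installed."""
    try:
        import psutil
    except ImportError:
        return None
    return psutil.virtual_memory().percent / 100.0


class PressureMonitorSketch:
    # Hypothetical thresholds on the used-memory fraction, checked worst first.
    THRESHOLDS = ((0.95, Pressure.CRITICAL), (0.85, Pressure.HIGH), (0.70, Pressure.MODERATE))

    def __init__(self, poll_interval_seconds=5.0, probe=psutil_used_fraction, clock=time.monotonic):
        self.poll_interval = poll_interval_seconds
        self._probe = probe
        self._clock = clock
        self._last_poll = None
        self._cached = Pressure.LOW

    def current_pressure(self):
        now = self._clock()
        stale = self._last_poll is None or now - self._last_poll >= self.poll_interval
        if stale:  # only query the system when the cached reading has expired
            used = self._probe()
            level = Pressure.LOW  # also the fallback when the probe returns None
            if used is not None:
                for limit, candidate in self.THRESHOLDS:
                    if used >= limit:
                        level = candidate
                        break
            self._cached = level
            self._last_poll = now
        return self._cached

    def should_reduce_allocation(self):
        return self.current_pressure() in (Pressure.HIGH, Pressure.CRITICAL)
```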