Source code for roiextractors.extractors.hdf5imagingextractor.hdf5imagingextractor

"""An imaging extractor for HDF5.

Classes
-------
Hdf5ImagingExtractor
    An imaging extractor for HDF5.
"""

from pathlib import Path
from warnings import warn

import h5py
import numpy as np
from lazy_ops import DatasetView
from numpy.typing import ArrayLike

from ...extraction_tools import PathType
from ...imagingextractor import ImagingExtractor


class Hdf5ImagingExtractor(ImagingExtractor):
    """An imaging extractor for HDF5.

    The video dataset is wrapped in a ``lazy_ops.DatasetView`` so that frames
    are only read from disk when ``get_series`` is called. The underlying
    HDF5 file is kept open for the lifetime of the extractor and closed when
    the extractor is garbage collected.
    """

    extractor_name = "Hdf5Imaging"

    def __init__(
        self,
        file_path: PathType,
        mov_field: str = "mov",
        sampling_frequency: float | None = None,
        start_time: float | None = None,
        metadata: dict | None = None,
        channel_names: ArrayLike | None = None,
    ):
        """Create an ImagingExtractor from an HDF5 file.

        Parameters
        ----------
        file_path : str or Path
            Path to the HDF5 file.
        mov_field : str, optional
            Name of the dataset in the HDF5 file that contains the imaging data.
            The default is "mov".
        sampling_frequency : float, optional
            Sampling frequency of the video. If None, it is read from the
            ``fr`` attribute of the dataset. The default is None.
        start_time : float, optional
            Start time of the video. If None, it is read from the
            ``start_time`` attribute of the dataset when present.
            The default is None.
        metadata : dict, optional
            Metadata dictionary. If None, it is read from the ``metadata``
            attribute of the dataset when present. The default is None.
        channel_names : array-like, optional
            List of channel names. If None, names of the form ``channel_<i>``
            are generated. The default is None.

        Raises
        ------
        Exception
            If ``mov_field`` is not a dataset of the file.
        AssertionError
            If no sampling frequency is given and the dataset has no ``fr``
            attribute, or if ``channel_names`` does not have one entry per
            channel.
        ValueError
            If the dataset is not four-dimensional.
        """
        super().__init__()

        self.filepath = Path(file_path)
        self._mov_field = mov_field
        self._channel_names = channel_names

        if self.filepath.suffix not in [".h5", ".hdf5"]:
            warn("'file_path' file is not an .hdf5 or .h5 file")

        self._file = h5py.File(file_path, "r")
        try:
            if mov_field not in self._file:
                raise Exception(f"{file_path} does not contain the '{mov_field}' dataset")
            self._video = DatasetView(self._file[mov_field])
            attrs = self._video.attrs

            # An explicit argument always wins over the value stored in the file.
            if sampling_frequency is None:
                # Kept as an assert so callers that catch AssertionError keep working.
                assert "fr" in attrs, (
                    "Sampling frequency is unavailable as a dataset attribute! \n"
                    "Please set the keyword argument 'sampling_frequency'"
                )
                self._sampling_frequency = float(attrs["fr"])
            else:
                self._sampling_frequency = sampling_frequency

            # Always define these attributes, even when neither the argument
            # nor the dataset attribute is available.
            self._start_time = start_time
            if start_time is None and "start_time" in attrs:
                self._start_time = attrs["start_time"]

            self.metadata = metadata
            if metadata is None and "metadata" in attrs:
                self.metadata = attrs["metadata"]

            # The dataset is expected to be laid out as
            # (channels, samples, rows, columns).
            video_shape = self._video.shape
            if len(video_shape) != 4:
                raise ValueError(
                    f"Dataset '{mov_field}' must have 4 dimensions "
                    f"(channels, samples, rows, columns), got shape {tuple(video_shape)}"
                )
            self._num_channels, self._num_samples, self._num_rows, self._num_cols = video_shape
            # Move channels to the last axis: (samples, rows, columns, channels).
            self._video = self._video.lazy_transpose([1, 2, 3, 0])

            if self._channel_names is not None:
                assert len(self._channel_names) == self._num_channels, (
                    "'channel_names' length is different than number " "of channels"
                )
            else:
                self._channel_names = [f"channel_{ch}" for ch in range(self._num_channels)]
        except Exception:
            # Do not leak the open file handle when initialisation fails.
            self._file.close()
            raise

        self._kwargs = {
            "file_path": str(Path(file_path).absolute()),
            "mov_field": mov_field,
            "sampling_frequency": sampling_frequency,
            "channel_names": channel_names,
        }

    def __del__(self):
        """Close the HDF5 file."""
        # ``_file`` is missing when ``__init__`` failed before opening the file.
        file = getattr(self, "_file", None)
        if file is not None:
            file.close()

    def get_series(self, start_sample: int | None = None, end_sample: int | None = None) -> np.ndarray:
        """Read a range of samples of the first channel from the file.

        Parameters
        ----------
        start_sample : int, optional
            First sample to read (inclusive). None means the first sample.
        end_sample : int, optional
            Last sample to read (exclusive). None means up to the last sample.

        Returns
        -------
        series: numpy.ndarray
            The requested samples of channel 0, indexed as
            (samples, rows, columns).
        """
        return self._video.lazy_slice[start_sample:end_sample, :, :, 0].dsetread()

    def get_image_shape(self) -> tuple[int, int]:
        """Get the shape of the video frame (num_rows, num_columns).

        Returns
        -------
        image_shape: tuple
            Shape of the video frame (num_rows, num_columns).
        """
        return self._num_rows, self._num_cols

    def get_num_samples(self) -> int:
        """Get the number of samples (second axis of the stored dataset).

        Returns
        -------
        num_samples: int
            Number of samples in the video.
        """
        return self._num_samples

    def get_sampling_frequency(self) -> float:
        """Get the sampling frequency of the video.

        Returns
        -------
        sampling_frequency: float
            The value passed to the constructor, or the ``fr`` dataset
            attribute when none was passed.
        """
        return self._sampling_frequency

    def get_native_timestamps(
        self, start_sample: int | None = None, end_sample: int | None = None
    ) -> np.ndarray | None:
        """Retrieve the original unaltered timestamps for the data in this interface.

        Parameters
        ----------
        start_sample : int, optional
            Unused; accepted for interface compatibility.
        end_sample : int, optional
            Unused; accepted for interface compatibility.

        Returns
        -------
        timestamps: numpy.ndarray or None
            The timestamps for the data stream, or None if native timestamps are not available.
        """
        # HDF5 imaging data does not have native timestamps
        return None