Source code for roiextractors.extractors.hdf5imagingextractor.hdf5imagingextractor
"""An imaging extractor for HDF5.
Classes
-------
Hdf5ImagingExtractor
An imaging extractor for HDF5.
"""
from pathlib import Path
from warnings import warn
import h5py
import numpy as np
from lazy_ops import DatasetView
from numpy.typing import ArrayLike
from ...extraction_tools import PathType
from ...imagingextractor import ImagingExtractor
[docs]
class Hdf5ImagingExtractor(ImagingExtractor):
"""An imaging extractor for HDF5."""
extractor_name = "Hdf5Imaging"
def __init__(
self,
file_path: PathType,
mov_field: str = "mov",
sampling_frequency: float = None,
start_time: float = None,
metadata: dict = None,
channel_names: ArrayLike = None,
):
"""Create an ImagingExtractor from an HDF5 file.
Parameters
----------
file_path : str or Path
Path to the HDF5 file.
mov_field : str, optional
Name of the dataset in the HDF5 file that contains the imaging data. The default is "mov".
sampling_frequency : float, optional
Sampling frequency of the video. The default is None.
start_time : float, optional
Start time of the video. The default is None.
metadata : dict, optional
Metadata dictionary. The default is None.
channel_names : array-like, optional
List of channel names. The default is None.
"""
ImagingExtractor.__init__(self)
self.filepath = Path(file_path)
self._sampling_frequency = sampling_frequency
self._mov_field = mov_field
if self.filepath.suffix not in [".h5", ".hdf5"]:
warn("'file_path' file is not an .hdf5 or .h5 file")
self._channel_names = channel_names
self._file = h5py.File(file_path, "r")
if mov_field in self._file.keys():
self._video = DatasetView(self._file[self._mov_field])
if sampling_frequency is None:
assert "fr" in self._video.attrs, (
"Sampling frequency is unavailable as a dataset attribute! "
"Please set the keyword argument 'sampling_frequency'"
)
self._sampling_frequency = float(self._video.attrs["fr"])
else:
self._sampling_frequency = sampling_frequency
else:
raise Exception(f"{file_path} does not contain the 'mov' dataset")
if start_time is None:
if "start_time" in self._video.attrs.keys():
self._start_time = self._video.attrs["start_time"]
else:
self._start_time = start_time
if metadata is None:
if "metadata" in self._video.attrs:
self.metadata = self._video.attrs["metadata"]
else:
self.metadata = metadata
# The test data has four dimensions and the first axis is channels
self._num_channels, self._num_samples, self._num_rows, self._num_cols = self._video.shape
self._video = self._video.lazy_transpose([1, 2, 3, 0])
if self._channel_names is not None:
assert len(self._channel_names) == self._num_channels, (
"'channel_names' length is different than number " "of channels"
)
else:
self._channel_names = [f"channel_{ch}" for ch in range(self._num_channels)]
self._kwargs = {
"file_path": str(Path(file_path).absolute()),
"mov_field": mov_field,
"sampling_frequency": sampling_frequency,
"channel_names": channel_names,
}
[docs]
def get_series(self, start_sample=None, end_sample=None) -> np.ndarray:
return self._video.lazy_slice[start_sample:end_sample, :, :, 0].dsetread()
[docs]
def get_image_shape(self) -> tuple[int, int]:
"""Get the shape of the video frame (num_rows, num_columns).
Returns
-------
image_shape: tuple
Shape of the video frame (num_rows, num_columns).
"""
return self._num_rows, self._num_cols
[docs]
def get_native_timestamps(
self, start_sample: int | None = None, end_sample: int | None = None
) -> np.ndarray | None:
"""Retrieve the original unaltered timestamps for the data in this interface.
Returns
-------
timestamps: numpy.ndarray or None
The timestamps for the data stream, or None if native timestamps are not available.
"""
# HDF5 imaging data does not have native timestamps
return None