Source code for roiextractors.extractors.tiffimagingextractors.scanimagetiffimagingextractor
"""Specialized extractor for reading TIFF files produced via ScanImage.
Classes
-------
ScanImageLegacyImagingExtractor
Specialized extractor for reading TIFF files produced via ScanImage.
"""
import warnings
from pathlib import Path
from warnings import warn
import numpy as np
from .scanimagetiff_utils import (
_get_scanimage_reader,
)
from ...extraction_tools import DtypeType, FloatType, PathType, get_package
from ...imagingextractor import ImagingExtractor
[docs]
class ScanImageImagingExtractor(ImagingExtractor):
"""
Specialized extractor for reading TIFF files produced via ScanImage software.
This extractor is designed to handle the structure of ScanImage TIFF files, which can contain
multi channel and both planar and volumetric data. It also supports both single-file and multi-file datasets generated
by ScanImage in various acquisition modes (grab, focus, loop).
The extractor creates a mapping between each frame in the dataset and its corresponding physical file
and IFD (Image File Directory) location. This mapping enables efficient retrieval of specific frames
without loading the entire dataset into memory, making it suitable for large datasets.
For datasets with multiple frames per slice, either a slice_sample parameter must be provided
or interleave_slice_samples must be set to True to explicitly opt into interleaving behavior.
Key features:
- Handles multi-channel data with channel selection
- Supports volumetric (multi-plane) imaging data
- Automatically detects and loads multi-file datasets based on ScanImage naming conventions
- Extracts and provides access to ScanImage metadata
- Efficiently retrieves frames using lazy loading
- Handles flyback frames in volumetric data by ignoring them in the mapping
"""
extractor_name = "ScanImageImagingExtractor"
def __init__(
self,
file_path: PathType | None = None,
channel_name: str | None = None,
file_paths: list[PathType] | None = None,
slice_sample: int | None = None,
plane_index: int | None = None,
interleave_slice_samples: bool = False,
):
"""
Initialize the ScanImageImagingExtractor.
Parameters
----------
file_path : PathType, optional
Path to the ScanImage TIFF file. If this is part of a multi-file series, this should be the first file.
Either `file_path` or `file_paths` must be provided.
channel_name : str, optional
Name of the channel to extract (e.g., "Channel 1", "Channel 2").
- If None and only one channel is available, that channel will be used.
- If None and multiple channels are available, an error will be raised.
- Use `get_available_channel_names(file_path)` to see available channels before creating the extractor.
file_paths : list[PathType], optional
List of file paths to use. If provided, this overrides the automatic file detection heuristics.
Use this parameter when:
- Automatic detection doesn't work correctly
- You need to specify a custom subset of files
- You need to control the exact order of files
The file paths must be provided in the temporal order of the frames in the dataset.
slice_sample : int, optional
Controls how to handle multiple frames per slice in volumetric data:
- If an integer (0 to frames_per_slice-1): Uses only that specific frame for each slice,
effectively selecting a single sample from each acquisition.
- If None (default): Requires interleave_slice_samples=True when frames_per_slice > 1.
- This parameter has no effect when frames_per_slice = 1.
- Use `get_frames_per_slice(file_path)` to check the number of frames per slice.
interleave_slice_samples : bool, optional
Controls whether to interleave all slice samples as separate time points when frames_per_slice > 1:
- If True: Interleaves all slice samples as separate time points, increasing the effective
number of samples by frames_per_slice. This treats each slice_sample as a distinct sample.
- If False (default): Requires a specific slice_sample to be provided when frames_per_slice > 1.
- This parameter has no effect when frames_per_slice = 1 or when slice_sample is provided.
plane_index : int, optional
Must be between 0 and num_planes-1. Used to extract a specific plane from volumetric data.
When provided:
- The resulting extractor will be planar (is_volumetric = False)
- Each sample will contain only data for the specified plane
- The shape of returned data will be (samples, height, width) instead of (samples, height, width, planes)
- This parameter has no effect on planar (non-volumetric) data.
Examples
--------
# Basic usage with a single file, single channel
>>> extractor = ScanImageImagingExtractor(file_path='path/to/file.tif')
# Multi-channel data, selecting a specific channel
>>> channel_names = ScanImageImagingExtractor.get_available_channel_names('path/to/file.tif')
>>> extractor = ScanImageImagingExtractor(file_path='path/to/file.tif', channel_name=channel_names[0])
# Volumetric data with multiple frames per slice, selecting a specific slice sample
>>> frames_per_slice = ScanImageImagingExtractor.get_frames_per_slice('path/to/file.tif')
>>> extractor = ScanImageImagingExtractor(file_path='path/to/file.tif', slice_sample=0)
# Volumetric data, extracting a specific plane
>>> extractor = ScanImageImagingExtractor(file_path='path/to/file.tif', plane_index=2)
# Explicitly specifying multiple files
>>> extractor = ScanImageImagingExtractor(
... file_paths=['path/to/file1.tif', 'path/to/file2.tif', 'path/to/file3.tif'],
... channel_name='Channel 1'
... )
"""
super().__init__()
self.file_path = file_paths[0] if file_paths is not None else file_path
assert self.file_path is not None, "file_path or file_paths must be provided"
self.file_path = Path(self.file_path)
# Validate file suffix
valid_suffixes = [".tiff", ".tif", ".TIFF", ".TIF"]
if self.file_path.suffix not in valid_suffixes:
suffix_string = ", ".join(valid_suffixes[:-1]) + f", or {valid_suffixes[-1]}"
warn(
f"Suffix ({self.file_path.suffix}) is not of type {suffix_string}! "
f"The {self.extractor_name} Extractor may not be appropriate for the file."
)
# Open the TIFF file
tifffile = get_package(package_name="tifffile")
tiff_reader = tifffile.TiffReader(self.file_path)
self._general_metadata = tiff_reader.scanimage_metadata
non_valid_metadata = self._general_metadata is None or len(self._general_metadata) == 0
if non_valid_metadata:
error_msg = (
f"Invalid metadata for file with name {file_path.name}. \n"
"The metadata is either None or empty which probably indicates that the tiff file "
"Is not a ScanImage file or it could be an older version."
)
raise ValueError("Invalid metadata: The metadata is either None or empty.")
self._metadata = self._general_metadata["FrameData"]
self._num_rows, self._num_columns = tiff_reader.pages[0].shape
self._dtype = tiff_reader.pages[0].dtype
# Check if stack manager is enabled and if there are multiple slices
# This criteria was confirmed by Lawrence Niu, a developer of ScanImage
# but we need to also check numSlices > 1 because some planar datasets
# have SI.hStackManager.enable = True but only one slice
stack_enabled = self._metadata["SI.hStackManager.enable"]
num_slices = self._metadata["SI.hStackManager.numSlices"]
self.is_volumetric = stack_enabled and num_slices > 1
if self.is_volumetric:
self._sampling_frequency = self._metadata["SI.hRoiManager.scanVolumeRate"]
self._num_planes = self._metadata["SI.hStackManager.numSlices"]
self._frames_per_slice = self._metadata["SI.hStackManager.framesPerSlice"]
if self._frames_per_slice == 1:
self._slice_sample = None
elif slice_sample is not None:
if not (0 <= slice_sample < self._frames_per_slice):
error_msg = f"slice_sample must be between 0 and {self._frames_per_slice - 1} (frames_per_slice - 1), but got {slice_sample}."
raise ValueError(error_msg)
self._slice_sample = slice_sample
# Case: multiple frames per slice, no slice_sample, but interleaving explicitly enabled
elif interleave_slice_samples:
self._slice_sample = None
# Error case: multiple frames per slice, no slice_sample, interleaving not enabled
else:
error_msg = (
f"Multiple frames per slice detected ({self._frames_per_slice}), but no slice_sample specified. "
f"Either provide a specific slice_sample (0 to {self._frames_per_slice - 1}) or set "
f"interleave_slice_samples=True to explicitly opt into interleaving all slice samples as separate time points."
)
raise ValueError(error_msg)
self._frames_per_volume_per_channel = self._metadata["SI.hStackManager.numFramesPerVolume"]
self._frames_per_volume_with_flyback = self._metadata["SI.hStackManager.numFramesPerVolumeWithFlyback"]
self.num_flyback_frames_per_channel = (
self._frames_per_volume_with_flyback - self._frames_per_volume_per_channel
)
else:
self._sampling_frequency = self._metadata["SI.hRoiManager.scanFrameRate"]
self._num_planes = 1
self._frames_per_slice = 1
self.num_flyback_frames_per_channel = 0
# This piece of the metadata is the indication that the channel is saved on the data
channels_available = self._metadata["SI.hChannels.channelSave"]
channels_available = [channels_available] if isinstance(channels_available, int) else channels_available
self._num_channels = len(channels_available)
# Determine their name and use matlab 1-indexing
all_channel_names = self._metadata["SI.hChannels.channelName"]
self.channel_names = [all_channel_names[channel_index - 1] for channel_index in channels_available]
# Channel selection checks
self._is_multi_channel_data = len(self.channel_names) > 1
if self._is_multi_channel_data and channel_name is None:
error_msg = (
f"Multiple channels available in the data {self.channel_names}"
"Please specify a channel name to extract data from."
)
raise ValueError(error_msg)
elif self._is_multi_channel_data and channel_name is not None:
if channel_name not in self.channel_names:
error_msg = (
f"Channel name ({channel_name}) not found in available channels ({self.channel_names}). "
"Please specify a valid channel name."
)
raise ValueError(error_msg)
self.channel_name = channel_name
self._channel_index = self.channel_names.index(channel_name)
else: # single channel data
self.channel_name = self.channel_names[0]
self._channel_index = 0
# Check if this is a multi-file dataset
if file_paths is None:
self.file_paths = self._find_data_files()
else:
self.file_paths = file_paths
# Open all TIFF files and store only file readers for lazy loading
total_ifds = 0
self._tiff_readers = []
for file_path in self.file_paths:
try:
tiff_reader = tifffile.TiffFile(file_path)
self._tiff_readers.append(tiff_reader)
total_ifds += len(tiff_reader.pages)
except Exception as e:
for tiff_reader in self._tiff_readers:
tiff_reader.close()
raise RuntimeError(f"Error opening TIFF file {file_path}: {e}")
# Calculate total IFDs and samples
self._ifds_per_file = [len(tiff_reader.pages) for tiff_reader in self._tiff_readers]
# Note that this includes all the frames for all the channels including flyback frames
self._num_frames_in_dataset = sum(self._ifds_per_file)
image_frames_per_cycle = self._num_planes * self._num_channels * self._frames_per_slice
total_frames_per_cycle = image_frames_per_cycle + self.num_flyback_frames_per_channel * self._num_channels
# Note that the acquisition might end without completing the last cycle and we discard those frames
num_acquisition_cycles = self._num_frames_in_dataset // (total_frames_per_cycle)
# Every cycle is a full channel sample either volume or planar
self._num_samples = num_acquisition_cycles
# Map IFDs and files to frames, channel, depth, and acquisition cycle
full_frames_to_ifds_table = self._create_frame_to_ifd_table(
num_channels=self._num_channels,
num_planes=self._num_planes,
num_acquisition_cycles=num_acquisition_cycles,
num_frames_per_slice=self._frames_per_slice,
num_flyback_frames_per_channel=self.num_flyback_frames_per_channel,
ifds_per_file=self._ifds_per_file,
)
# Filter mapping for the specified channel
channel_mask = full_frames_to_ifds_table["channel_index"] == self._channel_index
channel_frames_to_ifd_table = full_frames_to_ifds_table[channel_mask]
self._frames_to_ifd_table = channel_frames_to_ifd_table
# Filter mapping for the specified slice_sample or reorder for all slice samples
if self.is_volumetric and interleave_slice_samples:
# Re-order to interleave samples from different slice_samples
# For each acquisition cycle, include all slice_samples in sequence
sorted_indices = np.lexsort(
(
channel_frames_to_ifd_table["depth_index"],
channel_frames_to_ifd_table["slice_sample_index"],
channel_frames_to_ifd_table["acquisition_cycle_index"],
)
)
self._frames_to_ifd_table = channel_frames_to_ifd_table[sorted_indices]
# Adjust the number of samples to account for interleaving of slice samples
# Each acquisition cycle now produces frames_per_slice x samples (one for each slice_sample)
self._num_samples = self._num_samples * self._frames_per_slice
if self.is_volumetric and self._slice_sample is not None:
# Filter for the specified slice_sample
slice_sample_mask = channel_frames_to_ifd_table["slice_sample_index"] == self._slice_sample
self._frames_to_ifd_table = channel_frames_to_ifd_table[slice_sample_mask]
# Finally, if a planar extractor is requested, we filter the samples for that plane
if self.is_volumetric and plane_index is not None:
# Validate plane_index
if plane_index < 0 or plane_index >= self._num_planes:
raise ValueError(f"plane_index ({plane_index}) must be between 0 and {self._num_planes - 1}")
# Filter the frames_to_ifd_table to only include entries for the specified depth plane
depth_mask = self._frames_to_ifd_table["depth_index"] == plane_index
self._frames_to_ifd_table = self._frames_to_ifd_table[depth_mask]
# Override the is_volumetric flag and num_planes
self.is_volumetric = False
self._num_planes = 1
@staticmethod
def _create_frame_to_ifd_table(
num_channels: int,
num_planes: int,
num_acquisition_cycles: int,
ifds_per_file: list[int],
num_frames_per_slice: int = 1,
num_flyback_frames_per_channel: int = 0,
) -> np.ndarray:
"""
Create a table that describes the data layout of the dataset.
Every row in the table corresponds to a frame in the dataset and contains:
- file_index: The index of the file in the series
- IFD_index: The index of the IFD in the file
- channel_index: The index of the channel
- depth_index: The index of the depth
- acquisition_cycle_index: The index of the time
The table is represented as a structured numpy array that maps each combination of time,
channel, and depth to its corresponding physical location in the TIFF files.
Parameters
----------
num_channels : int
Number of channels.
num_planes: int
The number of planes which corresponds to the depth index or the number of frames per volume
per channel.
num_acquisition_cycles : int
Number of acquisition cycles. For ScanImage, this is the number of samples.
ifds_per_file : list[int]
Number of IFDs in each file.
num_frames_per_slice : int
Number of frames per slice. This is used to determine the slice_sample index.
num_flyback_frames_per_channel : int
Number of flyback frames.
Returns
-------
np.ndarray
A structured array mapping all combinations of time, channel, and depth to file
and IFD indices.
"""
# Create structured dtype for the table
mapping_dtype = np.dtype(
[
("file_index", np.uint16),
("IFD_index", np.uint16),
("channel_index", np.uint8),
("depth_index", np.uint8),
("slice_sample_index", np.uint8),
("acquisition_cycle_index", np.uint16),
]
)
# Calculate total number of entries
image_frames_per_cycle = num_planes * num_frames_per_slice * num_channels
flyback_frames = num_flyback_frames_per_channel * num_channels
total_frames_per_cycle = image_frames_per_cycle + flyback_frames
# Generate global ifd indices for complete cycles only
# This ensures we only include frames from complete acquisition cycles
num_frames_in_complete_cycles = num_acquisition_cycles * total_frames_per_cycle
global_ifd_indices = np.arange(num_frames_in_complete_cycles, dtype=np.uint32)
# We need to filter out the flyback frames, we create an index within each acquisition cycle
# And then filter out the non-image frames (flyback frames)
index_in_acquisition_cycle = global_ifd_indices % total_frames_per_cycle
is_imaging_frame = index_in_acquisition_cycle < image_frames_per_cycle
global_ifd_indices = global_ifd_indices[is_imaging_frame]
index_in_acquisition_cycle = index_in_acquisition_cycle[is_imaging_frame]
# To find their file index we need file boundaries
file_boundaries = np.zeros(len(ifds_per_file) + 1, dtype=np.uint32)
file_boundaries[1:] = np.cumsum(ifds_per_file)
# Find which file each global index belongs to
file_indices = np.searchsorted(file_boundaries, global_ifd_indices, side="right") - 1
# Now, we offset the global IFD indices by the starting position of the file
# to get local IFD indices that start at 0 for each file
ifd_indices = global_ifd_indices - file_boundaries[file_indices]
# Calculate indices for each dimension based on the frame position within the cycle
# For ScanImage, the order is always CZT which means that the channel index comes first,
# followed by the frames per slice, then depth and finally the acquisition cycle
channel_indices = index_in_acquisition_cycle % num_channels
slice_sample_indices = (index_in_acquisition_cycle // num_channels) % num_frames_per_slice
depth_indices = (index_in_acquisition_cycle // (num_channels * num_frames_per_slice)) % num_planes
acquisition_cycle_indices = global_ifd_indices // total_frames_per_cycle
# Create the structured array with the correct size (number of imaging frames after filtering)
mapping = np.zeros(len(global_ifd_indices), dtype=mapping_dtype)
mapping["file_index"] = file_indices
mapping["IFD_index"] = ifd_indices
mapping["channel_index"] = channel_indices
mapping["slice_sample_index"] = slice_sample_indices
mapping["depth_index"] = depth_indices
mapping["acquisition_cycle_index"] = acquisition_cycle_indices
return mapping
def _find_data_files(self) -> list[PathType]:
"""Find additional files in the series based on the file naming pattern.
This method determines which files to include in the dataset using one of these approaches:
1. If file_paths is provided: Uses the provided list of file paths directly
2. If file_pattern is provided: Uses the provided pattern to glob for files
3. Otherwise, analyzes the file name and ScanImage metadata to determine if the current file
is part of a multi-file dataset. It uses different strategies based on the acquisition mode:
- For 'grab' mode with finite frames per file: Uses base_name_acquisition_* pattern
- For 'loop' mode: Uses base_name_* pattern
- For 'slow' stack mode with volumetric data: Uses base_name_* pattern
- Otherwise: Returns only the current file
This method also checks for missing files in the sequence and warns the user if any are detected.
It also identifies and removes files with non-integer indices, warning the user that they can be
included explicitly using the file_paths parameter.
This information about ScanImage file naming was shared in a private conversation with
Lawrence Niu, who is a developer of ScanImage.
Returns
-------
list[PathType]
list of paths to all files in the series, sorted naturally (e.g., file_1, file_2, file_10)
"""
# Parse the file name to extract base name, acquisition number, and file index
file_stem = self.file_path.stem
# Can be grab, focus or loop, see
# https://docs.scanimage.org/Basic+Features/Acquisitions.html
acquisition_state = self._metadata["SI.acqState"]
frames_per_file = self._metadata["SI.hScan2D.logFramesPerFile"]
stack_mode = self._metadata["SI.hStackManager.stackMode"]
extension = self.file_path.suffix
# This is the happy path that is well specified in the documentation
if acquisition_state == "grab" and frames_per_file != float("inf"):
name_parts = file_stem.split("_")
base_name, acquisition, file_index = "_".join(name_parts[:-2]), name_parts[-2], name_parts[-1]
pattern_prefix = f"{base_name}_{acquisition}_"
# Looped acquisitions also divides the files according to Lawrence Niu in private conversation
elif acquisition_state == "loop": # This also separates the files
base_name = "_".join(file_stem.split("_")[:-1]) # Everything before the last _
pattern_prefix = f"{base_name}_"
# This also divided the files according to Lawrence Niu in private conversation
elif stack_mode == "slow" and self.is_volumetric:
base_name = "_".join(file_stem.split("_")[:-1]) # Everything before the last _
pattern_prefix = f"{base_name}_"
else:
file_paths_found = [self.file_path]
return file_paths_found
from natsort import natsorted
glob_pattern = f"{pattern_prefix}*{extension}"
file_paths_found = natsorted(self.file_path.parent.glob(glob_pattern))
# Early return if only one file is found
if len(file_paths_found) == 1:
return file_paths_found
file_paths_found_filtered = self._check_for_missing_and_excess_files(
file_paths_found,
pattern_prefix,
)
return file_paths_found_filtered
def _check_for_missing_and_excess_files(
self,
file_paths_found: list[PathType],
pattern_prefix: str,
) -> list[PathType]:
"""Check for missing and/or excess files in the sequences of files that was found."""
# Extract the varying part from each filename using the pattern_prefix
suffix = self.file_path.suffix
excess_files = []
valid_indices = []
valid_file_paths = []
# First we exclude excess files that are not part of the sequence
for file_path in file_paths_found:
file_name = file_path.name
# Extract the part between the pattern_prefix and suffix
varying_part = file_name[len(pattern_prefix) : -len(suffix)]
if varying_part.isdigit():
file_index = int(varying_part)
valid_indices.append(file_index)
valid_file_paths.append(file_path)
else:
excess_files.append(file_name)
continue
# Warn about files that don't belong in the sequence
if excess_files:
warnings.warn(
f"Non-sequence files detected: {', '.join(excess_files)}. "
f"These files will be excluded from the dataset. "
f"If you need to include these files, use the file_paths parameter.",
UserWarning,
)
# Check for gaps in the sequence
if len(valid_indices) > 1:
valid_indices.sort()
min_index = min(valid_indices)
max_index = max(valid_indices)
expected_indices = set(range(min_index, max_index + 1))
missing_indices = expected_indices - set(valid_indices)
if missing_indices:
# Determine the format of the index (e.g., 00001, 01, etc.)
# by looking at the first file's index format
first_file = file_paths_found[0]
varying_part = first_file.name[len(pattern_prefix) : -len(suffix)]
# Format the missing file names
missing_files = []
for index in missing_indices:
# Format the index with the same number of digits
formatted_index = f"{index:0{len(varying_part)}d}"
missing_file = f"{pattern_prefix}{formatted_index}{suffix}"
missing_files.append(missing_file)
warnings.warn(
f"Missing files detected in the sequence: {', '.join(missing_files)}. "
f"This may affect data integrity and analysis results.",
UserWarning,
)
return valid_file_paths
[docs]
def get_series(self, start_sample: int | None = None, end_sample: int | None = None) -> np.ndarray:
"""
Get data as a time series from start_sample to end_sample.
This method retrieves frames at the specified range from the ScanImage TIFF file(s).
It uses the mapping created during initialization to efficiently locate and load only
the requested frames, without loading the entire dataset into memory.
For volumetric data (multiple planes), the returned array will have an additional dimension
for the planes. For planar data (single plane), the plane dimension is squeezed out.
Parameters
----------
start_sample : int
end_sample : int
Returns
-------
numpy.ndarray
Array of data with shape (num_samples, height, width) if num_planes is 1,
or (num_samples, height, width, num_planes) if num_planes > 1.
For example, for a non-volumetric dataset with 512x512 frames, requesting 3 samples
would return an array with shape (3, 512, 512).
For a volumetric dataset with 5 planes and 512x512 frames, requesting 3 samples
would return an array with shape (3, 512, 512, 5).
"""
start_sample = int(start_sample) if start_sample is not None else 0
end_sample = int(end_sample) if end_sample is not None else self.get_num_samples()
samples_in_series = end_sample - start_sample
# Preallocate output array as volumetric and squeeze if not volumetric before returning
num_rows, num_columns, num_planes = self.get_volume_shape()
dtype = self.get_dtype()
samples = np.empty((samples_in_series, num_rows, num_columns, num_planes), dtype=dtype)
for return_index, sample_index in enumerate(range(start_sample, end_sample)):
for depth_position in range(num_planes):
# Calculate the index in the mapping table array
frame_index = sample_index * num_planes + depth_position
table_row = self._frames_to_ifd_table[frame_index]
file_index = table_row["file_index"]
ifd_index = table_row["IFD_index"]
tiff_reader = self._tiff_readers[file_index]
image_file_directory = tiff_reader.pages[ifd_index]
samples[return_index, :, :, depth_position] = image_file_directory.asarray()
# Squeeze the depth dimension if not volumetric
if not self.is_volumetric:
samples = samples.squeeze(axis=3)
return samples
[docs]
def get_image_shape(self) -> tuple[int, int]:
"""Get the shape of the video frame (num_rows, num_columns).
Returns
-------
tuple
Shape of the video frame (num_rows, num_columns).
"""
return (self._num_rows, self._num_columns)
[docs]
def get_frame_shape(self) -> tuple[int, int]:
"""Get the shape of a single frame (num_rows, num_columns).
Returns
-------
tuple
Shape of a single frame (num_rows, num_columns).
"""
return (self._num_rows, self._num_columns)
[docs]
def get_sample_shape(self):
"""
Get the shape of a sample.
Returns
-------
tuple of int
Shape of a single sample. If the data is volumetric, the shape is hape of a single sample (num_rows, num_columns).
(num_rows, num_columns, num_planes). Otherwise, the shape is
(num_rows, num_columns).
"""
if self.is_volumetric:
return (self._num_rows, self._num_columns, self._num_planes)
else:
return (self._num_rows, self._num_columns)
[docs]
def get_volume_shape(self) -> tuple[int, int, int]:
"""Get the shape of a single volume (num_rows, num_columns, num_planes).
Returns
-------
tuple
Shape of a single volume (num_rows, num_columns, num_planes).
"""
return (self._num_rows, self._num_columns, self._num_planes)
[docs]
def get_num_samples(self) -> int:
"""Get the number of samples in the video.
Returns
-------
int
Number of samples in the video.
"""
return self._num_samples
[docs]
def get_sampling_frequency(self) -> float:
"""Get the sampling frequency in Hz.
Returns
-------
float
Sampling frequency in Hz.
"""
return self._sampling_frequency
[docs]
def get_channel_names(self):
warnings.warn(
"get_channel_names is deprecated and will be removed in May 2026 or after.",
category=FutureWarning,
stacklevel=2,
)
return self.channel_names
[docs]
def get_num_planes(self) -> int:
"""Get the number of depth planes.
For volumetric data, this returns the number of Z-planes in each volume.
For planar data, this returns 1.
Returns
-------
int
Number of depth planes.
"""
return self._num_planes
[docs]
@staticmethod
def get_available_channel_names(file_path: PathType) -> list:
"""Get the channel names available in a ScanImage TIFF file.
This static method extracts the channel names from a ScanImage TIFF file
without needing to create an extractor instance. This is useful for
determining which channels are available before creating an extractor.
Parameters
----------
file_path : PathType
Path to the ScanImage TIFF file.
Returns
-------
list
list of channel names available in the file.
Examples
--------
>>> channel_names = ScanImageImagingExtractor.get_channel_names('path/to/file.tif')
>>> print(f"Available channels: {channel_names}")
"""
from tifffile import read_scanimage_metadata
with open(file_path, "rb") as fh:
all_metadata = read_scanimage_metadata(fh)
non_varying_frame_metadata = all_metadata[0]
# `channelSave` indicates whether the channel is saved
# We check `channelSave` first but keep the `channelsActive` check for backward compatibility
channel_availability_keys = ["SI.hChannels.channelSave", "SI.hChannels.channelsActive"]
for channel_availability in channel_availability_keys:
if channel_availability in non_varying_frame_metadata.keys():
break
available_channels = non_varying_frame_metadata[channel_availability]
available_channels = [available_channels] if not isinstance(available_channels, list) else available_channels
channel_indices = np.array(available_channels) - 1 # Account for MATLAB indexing
channel_names = non_varying_frame_metadata["SI.hChannels.channelName"]
channel_names_available = [channel_names[i] for i in channel_indices]
return channel_names_available
[docs]
def get_dtype(self) -> DtypeType:
"""Get the data type of the video.
Returns
-------
dtype
Data type of the video.
"""
return self._dtype
[docs]
def get_times(self) -> np.ndarray:
"""Get the timestamps for each frame.
Returns
-------
numpy.ndarray
Array of timestamps in seconds for each frame.
Notes
-----
This method extracts timestamps from the ScanImage TIFF file(s) for the selected channel.
It uses the mapping created during initialization to efficiently locate and extract
timestamps for each sample.
"""
if self._times is not None:
return self._times
# Initialize array to store timestamps
num_samples = self.get_num_samples()
num_planes = self.get_num_planes()
timestamps = np.zeros(num_samples, dtype=np.float64)
# For each sample, extract its timestamp from the corresponding file and IFD
for sample_index in range(num_samples):
# Get the last frame in this sample to get the timestamps
frame_index = sample_index * num_planes + (num_planes - 1)
table_row = self._frames_to_ifd_table[frame_index]
file_index = table_row["file_index"]
ifd_index = table_row["IFD_index"]
tiff_reader = self._tiff_readers[file_index]
image_file_directory = tiff_reader.pages[ifd_index]
# Extract timestamp using the static method
timestamp = self.extract_timestamp_from_page(image_file_directory)
if timestamp is not None:
timestamps[sample_index] = timestamp
else:
# If no timestamp found, throw a warning and use sample index / sampling frequency as fallback
warnings.warn(
f"No frameTimestamps_sec found for sample {sample_index}. Using calculated timestamp instead.",
UserWarning,
)
timestamps[sample_index] = sample_index / self._sampling_frequency
# Cache the timestamps
self._times = timestamps
return timestamps
[docs]
def get_native_timestamps(
self, start_sample: int | None = None, end_sample: int | None = None
) -> np.ndarray | None:
"""
Retrieve the original unaltered timestamps for the data in this interface.
Parameters
----------
start_sample : int, optional
The starting sample index. If None, starts from the beginning.
end_sample : int, optional
The ending sample index. If None, goes to the end.
Returns
-------
timestamps: numpy.ndarray or None
The timestamps for the data stream, or None if native timestamps are not available.
"""
timestamps = self.get_times()
if start_sample is None:
start_sample = 0
if end_sample is None:
end_sample = len(timestamps)
return timestamps[start_sample:end_sample]
[docs]
@staticmethod
def extract_timestamp_from_page(page) -> float:
"""
Extract timestamp from a ScanImage TIFF page.
Parameters
----------
page : tifffile.TiffPage
The TIFF page to extract the timestamp from.
Returns
-------
float
The timestamp in seconds or None if no timestamp is found.
"""
if "ImageDescription" not in page.tags:
return None
description = page.tags["ImageDescription"].value
description_lines = description.split("\n")
# Find the frameTimestamps_sec line
timestamp_line = next((line for line in description_lines if "frameTimestamps_sec" in line), None)
if timestamp_line is not None:
# Extract the value part after " = "
_, value_str = timestamp_line.split(" = ", 1)
try:
timestamp = float(value_str.strip())
return timestamp
except ValueError:
return None
return None
[docs]
@staticmethod
def get_available_num_planes(file_path: PathType) -> int:
"""
Get the number of depth planes from a ScanImage TIFF file.
For volumetric data, this returns the number of Z-planes in each volume.
For planar data, this returns 1.
Parameters
----------
file_path : PathType
Path to the ScanImage TIFF file.
Returns
-------
int
Number of depth planes.
"""
from tifffile import read_scanimage_metadata
with open(file_path, "rb") as fh:
all_metadata = read_scanimage_metadata(fh)
non_varying_frame_metadata = all_metadata[0]
num_planes = non_varying_frame_metadata.get("SI.hStackManager.numSlices", 1)
return num_planes
[docs]
@staticmethod
def get_frames_per_slice(file_path: PathType) -> int:
"""
Get the number of frames per slice from a ScanImage TIFF file.
ScanImage can sample multiple frames per each slice.
Parameters
----------
file_path : PathType
Path to the ScanImage TIFF file.
Returns
-------
int
Number of frames per slice.
"""
from tifffile import read_scanimage_metadata
with open(file_path, "rb") as fh:
all_metadata = read_scanimage_metadata(fh)
non_varying_frame_metadata = all_metadata[0]
frames_per_slice = non_varying_frame_metadata.get("SI.hStackManager.framesPerSlice", 1)
return frames_per_slice
[docs]
def get_original_frame_indices(self, plane_index: int | None = None) -> np.ndarray:
"""
Get the original frame indices for each sample.
Returns the index of the original frame for each sample, mapping processed samples
back to their corresponding frames in the raw microscopy data. This accounts for
any filtering, subsampling, or exclusions (such as flyback frames) performed by
the extractor.
Parameters
----------
plane_index : int, optional
Which plane to use for frame index calculation in volumetric data.
If None, plane_index is set to the last plane in the volume. This is because the timestamp of the acquisition of the last plane in a volume is typically set as the timestamp of the volume as a whole. It must be less than the total number of planes.
Returns
-------
np.ndarray
Array of original frame indices (dtype: int64) with length equal to the
number of samples. Each element represents the index of the original
microscopy frame that corresponds to that sample.
Notes
-----
**Frame Index Calculation:**
- **Planar data**: Frame indices are sequential (0, 1, 2, ...)
- **Multi-channel data**: Accounts for channel interleaving
- **Volumetric data**: Uses the specified plane (default: last plane)
- **Multi-file data**: Includes file offsets for global indexing
- **Flyback frames**: Automatically excluded from indexing
**Common Use Cases:**
- Synchronizing with external timing systems
- Mapping back to original acquisition timestamps
- Data provenance and traceability
- Cross-referencing with raw data files
**Examples:**
For a 3-sample volumetric dataset with 5 planes per volume:
- Default behavior returns indices [4, 9, 14] (last plane of each volumetric sample)
- With plane_index=0 returns indices [0, 5, 10] (first plane of each volumetric sample)
"""
num_planes = self.get_num_planes()
if plane_index is not None:
assert plane_index < num_planes, f"Plane index {plane_index} exceeds number of planes {num_planes}."
else:
plane_index = num_planes - 1
# Initialize array to store timestamps
num_samples = self.get_num_samples()
frame_indices = np.zeros(num_samples, dtype=np.int64)
# For each sample, extract its timestamp from the corresponding file and IFD
for sample_index in range(num_samples):
# Get the last frame in this sample to get the timestamps
frame_index = sample_index * num_planes + plane_index
table_row = self._frames_to_ifd_table[frame_index]
file_index = int(table_row["file_index"])
ifd_index = int(table_row["IFD_index"])
# The ifds are local within a file, so we need to add and offset
# equal to the number of IFDs in the previous files
file_offset = sum(self._ifds_per_file[:file_index]) if file_index > 0 else 0
frame_indices[sample_index] = ifd_index + file_offset
return frame_indices
[docs]
def __del__(self):
"""Close file handles when the extractor is garbage collected."""
if hasattr(self, "_tiff_readers"):
for handle in self._tiff_readers:
try:
handle.close()
except Exception as e:
warnings.warn(f"Error closing TIFF file handle {handle} with error: {e}", UserWarning)
pass
[docs]
class ScanImageLegacyImagingExtractor(ImagingExtractor):
"""Specialized extractor for reading TIFF files produced via ScanImage.
This implementation is for legacy purposes and is not recommended for use.
Please use ScanImageTiffSinglePlaneImagingExtractor or ScanImageTiffMultiPlaneImagingExtractor instead.
"""
extractor_name = "ScanImageLegacyImagingExtractor"
def __init__(
self,
file_path: PathType,
sampling_frequency: FloatType,
):
"""Create a ScanImageLegacyImagingExtractor instance from a TIFF file produced by ScanImage.
This extractor allows for lazy accessing of slices, unlike
:py:class:`~roiextractors.extractors.tiffimagingextractors.TiffImagingExtractor`.
However, direct slicing of the underlying data structure is not equivalent to a numpy memory map.
Parameters
----------
file_path : PathType
Path to the TIFF file.
sampling_frequency : float
The frequency at which the frames were sampled, in Hz.
"""
ScanImageTiffReader = _get_scanimage_reader()
super().__init__()
self.file_path = Path(file_path)
self._sampling_frequency = sampling_frequency
valid_suffixes = [".tiff", ".tif", ".TIFF", ".TIF"]
if self.file_path.suffix not in valid_suffixes:
suffix_string = ", ".join(valid_suffixes[:-1]) + f", or {valid_suffixes[-1]}"
warning_message = (
f"Suffix ({self.file_path.suffix}) is not of type {suffix_string}! "
f"The {self.extractor_name} may not be appropriate for the file."
)
warn(warning_message, UserWarning, stacklevel=2)
with ScanImageTiffReader(str(self.file_path)) as io:
shape = io.shape() # [frames, rows, columns]
if len(shape) == 3:
self._num_samples, self._num_rows, self._num_columns = shape
self._num_channels = 1
else: # no example file for multiple color channels or depths
raise NotImplementedError(
"Extractor cannot handle 4D TIFF data. Please raise an issue to request this feature: "
"https://github.com/catalystneuro/roiextractors/issues "
)
# Data accessed through an open ScanImageTiffReader io gets scrambled if there are multiple calls.
# Thus, open fresh io in context each time something is needed.
def _get_single_frame(self, idx: int) -> np.ndarray:
"""Get a single frame of data from the TIFF file.
Parameters
----------
idx : int
The index of the frame to retrieve.
Returns
-------
frame: numpy.ndarray
The frame of data.
"""
ScanImageTiffReader = _get_scanimage_reader()
with ScanImageTiffReader(str(self.file_path)) as io:
return io.data(beg=idx, end=idx + 1)
[docs]
def get_series(self, start_sample=None, end_sample=None) -> np.ndarray:
ScanImageTiffReader = _get_scanimage_reader()
with ScanImageTiffReader(filename=str(self.file_path)) as io:
return io.data(beg=start_sample, end=end_sample)
[docs]
def get_image_shape(self) -> tuple[int, int]:
"""Get the shape of the video frame (num_rows, num_columns).
Returns
-------
image_shape: tuple
Shape of the video frame (num_rows, num_columns).
"""
return (self._num_rows, self._num_columns)
[docs]
def get_channel_names(self) -> list:
warnings.warn(
"get_channel_names is deprecated and will be removed in May 2026 or after.",
category=FutureWarning,
stacklevel=2,
)
pass
[docs]
def get_native_timestamps(
self, start_sample: int | None = None, end_sample: int | None = None
) -> np.ndarray | None:
# Legacy ScanImage files do not have native timestamps
return None