Source code for roiextractors.extractors.tiffimagingextractors.brukertiffimagingextractor

"""ImagingExtractors for the TIFF image format produced by Bruker.

Classes
-------
BrukerTiffSinglePlaneImagingExtractor
    An ImagingExtractor for TIFF files produced by Bruker with only 1 plane.
BrukerTiffMultiPlaneImagingExtractor
    A MultiImagingExtractor for TIFF files produced by Bruker with multiple planes.
"""

import logging
import re
import warnings
from collections import Counter
from itertools import islice
from pathlib import Path
from types import ModuleType
from xml.etree import ElementTree

import numpy as np
from lxml import etree

from ...extraction_tools import (
    DtypeType,
    PathType,
    calculate_regular_series_rate,
    get_package,
)
from ...imagingextractor import ImagingExtractor
from ...multiimagingextractor import MultiImagingExtractor


def filter_read_uic_tag_warnings(record):
    """Filter out the warnings from tifffile.read_uic_tag() that are not relevant to the user.

    Parameters
    ----------
    record : logging.LogRecord
        The log record being considered by the logger this filter is attached to.

    Returns
    -------
    bool
        False when the record's message starts with "<tifffile.read_uic_tag>"
        (the record is dropped), True otherwise (the record is kept).
    """
    # ``record.msg`` is whatever object was passed to the logging call, not
    # necessarily a string; coerce it so the filter never raises on e.g. an
    # exception instance being logged.
    return not str(record.msg).startswith("<tifffile.read_uic_tag>")


# Attach the filter at import time so the tifffile logger stops emitting read_uic_tag messages.
logging.getLogger("tifffile.tifffile").addFilter(filter_read_uic_tag_warnings)


def _get_tiff_reader() -> ModuleType:
    """Return the tifffile module."""
    return get_package(package_name="tifffile", installation_instructions="pip install tifffile")


def _determine_frame_rate(element: etree.Element, file_names: list[str] | None = None) -> float | None:
    """Determine the frame rate from the difference in relative timestamps of the frame elements.

    Parameters
    ----------
    element : etree.Element
        The XML element whose descendant ``Frame`` elements are inspected.
    file_names : list of str, optional
        When given (and non-empty), only ``Frame`` elements that contain a ``File`` child
        whose ``filename`` attribute is in this list are considered.

    Returns
    -------
    frame_rate : float or None
        The value returned by ``calculate_regular_series_rate`` for the collected
        ``relativeTime`` values, or None when no frame was selected.

    Raises
    ------
    ValueError
        If a selected ``Frame`` has no ``relativeTime`` attribute or its value is not a float.
    """
    if file_names:
        # Set membership keeps the per-frame check O(1) regardless of how many files there are.
        file_names_set = set(file_names)
        frame_elements = [
            frame
            for frame in element.xpath(".//Frame[File/@filename]")
            if any(file.attrib.get("filename") in file_names_set for file in frame.xpath("File"))
        ]
    else:
        frame_elements = element.xpath(".//Frame")

    # Extract relativeTime attributes and convert to float
    try:
        relative_times = [float(frame.attrib["relativeTime"]) for frame in frame_elements]
    except KeyError as error:
        # Chain the cause so the offending lookup stays visible in the traceback.
        raise ValueError("One or more Frame elements are missing the 'relativeTime' attribute.") from error
    except ValueError as error:
        raise ValueError("One or more 'relativeTime' attributes cannot be converted to float.") from error

    if not relative_times:
        return None
    return calculate_regular_series_rate(np.array(relative_times))


def _determine_imaging_is_volumetric(folder_path: PathType) -> bool:
    """Determine whether imaging is volumetric.

    Parameters
    ----------
    folder_path : PathType
        The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).

    Returns
    -------
    is_volumetric: bool
        True if the imaging is volumetric (multiplane), False otherwise (single plane).
        False is also returned when the XML file contains no ``Sequence`` element.

    Raises
    ------
    AssertionError
        If the XML configuration file ``<folder_name>.xml`` is not in the folder.
    ValueError
        If the first ``Sequence`` element has a ``type`` that is not in the lookup table below.
    """
    folder_path = Path(folder_path)
    xml_file_path = folder_path / f"{folder_path.name}.xml"
    assert xml_file_path.is_file(), f"The XML configuration file is not found at '{xml_file_path}'."

    is_series_type_volumetric = {
        "TSeries ZSeries Element": True,  # XYZT
        "TSeries Timed Element": False,  # XYT
        "ZSeries": True,  # ZT (not a time series)
        "Single": False,  # Single image (not a time series)
        "BrightnessOverTime": False,  # XYT (not a volumetric series)
        "TSeries Brightness Over Time Element": False,  # XYT
    }

    # Only the attributes of the first Sequence are needed, so "start" events are enough and
    # parsing stops as soon as that element is opened.
    for _, elem in etree.iterparse(xml_file_path, events=("start",)):
        if elem.tag != "Sequence":
            continue
        series_type = elem.attrib.get("type")
        if series_type not in is_series_type_volumetric:
            raise ValueError(
                f"Unknown series type: {series_type}, please raise an issue in the roiextractor repository"
            )
        return is_series_type_volumetric[series_type]

    return False


def _parse_xml(folder_path: PathType) -> etree.Element:
    """Parse the XML configuration file into element tree and returns the root Element.

    The file is expected at ``<folder_path>/<folder_name>.xml``; an AssertionError is
    raised when it is not there.
    """
    folder_path = Path(folder_path)
    xml_file_path = folder_path / f"{folder_path.name}.xml"
    assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'."
    tree = etree.parse(str(xml_file_path))
    return tree.getroot()


class BrukerTiffMultiPlaneImagingExtractor(MultiImagingExtractor):
    """A MultiImagingExtractor for TIFF files produced by Bruker with multiple planes.

    This format consists of multiple TIF image files (.ome.tif) and configuration files (.xml, .env).
    """

    extractor_name = "BrukerTiffMultiPlaneImaging"
    mode = "folder"

    @classmethod
    def get_streams(cls, folder_path: PathType) -> dict:
        """Get the available streams from the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).

        Parameters
        ----------
        folder_path : PathType
            The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).

        Returns
        -------
        streams: dict
            The dictionary of available streams. ``streams["channel_streams"]`` is the naturally
            sorted list of channel names; ``streams["plane_streams"]`` maps each channel name to the
            naturally sorted plane stream names (``Ch<channel>_<digits>``) found in the file names.
            ``plane_streams`` is empty when the imaging is not volumetric.

        Raises
        ------
        AssertionError
            If the XML configuration file is missing from the folder.
        ValueError
            If the XML configuration file has no ``Sequence`` element.
        """
        natsort = get_package(package_name="natsort", installation_instructions="pip install natsort")

        folder_path = Path(folder_path)
        xml_file_path = folder_path / f"{folder_path.name}.xml"
        assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'."

        # Parse the XML file iteratively and stop at the first completed Sequence element
        first_sequence_element = None
        for _, elem in ElementTree.iterparse(xml_file_path, events=("end",)):
            if elem.tag == "Sequence":
                first_sequence_element = elem
                break

        if first_sequence_element is None:
            raise ValueError("No Sequence element found in the XML configuration file. Can't get streams")

        channel_names = set()
        # Channel id -> channel name, recorded from the same File element so that ids and names
        # stay paired (sorting ids and names independently can mis-pair them).
        channel_id_to_name = {}
        file_names = []
        for frame_element in first_sequence_element.findall(".//Frame"):
            for file_element in frame_element.findall("File"):
                channel_name = file_element.attrib["channelName"]
                channel_names.add(channel_name)
                channel_id_to_name.setdefault(file_element.attrib["channel"], channel_name)
                file_names.append(file_element.attrib["filename"])

        streams = dict(channel_streams=natsort.natsorted(channel_names))
        streams["plane_streams"] = dict()
        if not _determine_imaging_is_volumetric(folder_path=folder_path):
            return streams

        for channel_id in natsort.natsorted(channel_id_to_name):
            plane_naming_pattern = re.compile(rf"(?P<stream_name>Ch{channel_id}_\d+)")
            regular_expression_matches = (plane_naming_pattern.search(file_name) for file_name in file_names)
            plane_stream_names = {matches["stream_name"] for matches in regular_expression_matches if matches}
            streams["plane_streams"][channel_id_to_name[channel_id]] = natsort.natsorted(plane_stream_names)
        return streams

    def __init__(
        self,
        folder_path: PathType,
        stream_name: str | None = None,
    ):
        """Create a BrukerTiffMultiPlaneImagingExtractor instance from a folder path that contains the image files.

        Parameters
        ----------
        folder_path : PathType
            The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).
        stream_name: str, optional
            The name of a plane stream (e.g. "Ch2_000001") of the recording channel to load.
            When omitted and only one channel is available, the first plane stream of that channel is used.

        Raises
        ------
        ValueError
            If more than one recording stream is detected.
        ValueError
            If the selected stream is not in the available plane_streams.
        AssertionError
            If the TIF image files are missing from the folder.
        AssertionError
            If the imaging is not volumetric.
        """
        self._tifffile = _get_tiff_reader()

        folder_path = Path(folder_path)
        tif_file_paths = list(folder_path.glob("*.ome.tif"))
        assert tif_file_paths, f"The TIF image files are missing from '{folder_path}'."

        streams = self.get_streams(folder_path=folder_path)
        plane_streams = streams["plane_streams"]

        assert len(plane_streams) > 0, (
            f"{self.extractor_name}Extractor is for volumetric imaging. "
            "For single imaging plane data use BrukerTiffSinglePlaneImagingExtractor."
        )

        if stream_name is None:
            if len(streams["channel_streams"]) > 1:
                raise ValueError(
                    "More than one recording stream is detected! Please specify which stream you wish to load with the `stream_name` argument. "
                    "The following channel streams are available: \n"
                    f"{streams['channel_streams']}"
                )
            channel_stream_name = streams["channel_streams"][0]
            stream_name = plane_streams[channel_stream_name][0]

        # Plane stream names look like "Ch2_000001"; the prefix is the channel when channels are
        # named "Ch<N>". Channel names are read from the XML and need not follow that pattern, so
        # fall back to searching every channel's planes instead of failing with a KeyError.
        channel_stream_name = stream_name.split("_")[0]
        if channel_stream_name in plane_streams:
            plane_stream_names = plane_streams[channel_stream_name]
        else:
            plane_stream_names = None
            for candidate_plane_stream_names in plane_streams.values():
                if stream_name in candidate_plane_stream_names:
                    plane_stream_names = candidate_plane_stream_names
                    break
            if plane_stream_names is None:
                # Nothing matched: report every available plane stream in the error below.
                plane_stream_names = [name for names in plane_streams.values() for name in names]

        if stream_name not in plane_stream_names:
            raise ValueError(
                f"The selected stream '{stream_name}' is not in the available plane_streams '{plane_stream_names}'!"
            )

        self.folder_path = Path(folder_path)

        self.stream_name = stream_name
        self._num_planes_per_channel_stream = len(plane_stream_names)

        # One single-plane extractor per depth plane of the selected channel.
        imaging_extractors = [
            BrukerTiffSinglePlaneImagingExtractor(folder_path=folder_path, stream_name=plane_stream_name)
            for plane_stream_name in plane_stream_names
        ]

        super().__init__(imaging_extractors=imaging_extractors)

        self._num_samples = self._imaging_extractors[0].get_num_samples()
        self._image_size = *self._imaging_extractors[0].get_frame_shape(), self._num_planes_per_channel_stream
        self.xml_metadata = self._imaging_extractors[0].xml_metadata

        # Every plane extractor covers the same sample range [0, num_samples).
        self._start_frames = [0] * self._num_planes_per_channel_stream
        self._end_frames = [self._num_samples] * self._num_planes_per_channel_stream
        self.is_volumetric = True

    def get_image_shape(self) -> tuple[int, int]:
        """Get the shape of the video frame (num_rows, num_columns).

        Returns
        -------
        image_shape: tuple
            Shape of the video frame (num_rows, num_columns).
        """
        return self._image_size[0], self._image_size[1]

    # TODO: fix this method so that it is consistent with base multiimagingextractor method (i.e. num_rows, num_columns)
    def get_num_samples(self) -> int:
        """Return the number of samples reported by the first plane's extractor."""
        return self._imaging_extractors[0].get_num_samples()

    def get_sampling_frequency(self) -> float:
        """Return the first plane's sampling frequency multiplied by the number of planes."""
        return self._imaging_extractors[0].get_sampling_frequency() * self._num_planes_per_channel_stream

    def get_series(self, start_sample: int | None = None, end_sample: int | None = None) -> np.ndarray:
        """Return the samples in [start_sample, end_sample) with the planes stacked on the last axis.

        Parameters
        ----------
        start_sample : int, optional
            First sample to return; defaults to 0.
        end_sample : int, optional
            One past the last sample to return; defaults to ``get_num_samples()``.

        Returns
        -------
        series : np.ndarray
            Array of shape ``(end_sample - start_sample,) + self.get_sample_shape()`` where the last
            axis is filled from each plane extractor in turn.
        """
        start = start_sample if start_sample is not None else 0
        stop = end_sample if end_sample is not None else self.get_num_samples()

        series_shape = (stop - start,) + self.get_sample_shape()
        series = np.empty(shape=series_shape, dtype=self.get_dtype())

        for plane_ind, extractor in enumerate(self._imaging_extractors):
            series[..., plane_ind] = extractor.get_series(start_sample=start, end_sample=stop)

        return series

    def get_num_planes(self) -> int:
        """Get the number of depth planes.

        Returns
        -------
        num_planes: int
            The number of depth planes.
        """
        return self._num_planes_per_channel_stream

    def get_volume_shape(self) -> tuple[int, int, int]:
        """Get the shape of the volumetric video (num_rows, num_columns, num_planes).

        Returns
        -------
        video_shape: tuple
            Shape of the volumetric video (num_rows, num_columns, num_planes).
        """
        return (self._image_size[0], self._image_size[1], self.get_num_planes())


class BrukerTiffSinglePlaneImagingExtractor(MultiImagingExtractor):
    """A MultiImagingExtractor for TIFF files produced by Bruker with only 1 plane."""

    extractor_name = "BrukerTiffSinglePlaneImaging"
    mode = "folder"

    @classmethod
    def get_streams(cls, folder_path: PathType) -> dict:
        """
        Get the available streams from the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).

        Parameters
        ----------
        folder_path : PathType
            The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).

        Returns
        -------
        streams: dict
            The dictionary of available streams, with the naturally sorted channel names
            under the "channel_streams" key.
        """
        # The XML is read once; the channel set is then sorted once.
        channel_names = cls.get_available_channels(folder_path=folder_path)
        natsort = get_package(package_name="natsort", installation_instructions="pip install natsort")
        return dict(channel_streams=natsort.natsorted(channel_names))

    @staticmethod
    def get_available_channels(folder_path: PathType) -> set[str]:
        """
        Extract set of available channel names from the XML configuration file in the specified folder.

        Parameters
        ----------
        folder_path : PathType
            The path to the folder containing the XML configuration file. It can be either a string or a Path object.

        Returns
        -------
        Set[str]
            A set of channel names available in the first 'Frame' element found in the XML configuration file.

        Raises
        ------
        AssertionError
            If the XML configuration file ``<folder_name>.xml`` is not in the folder.
        """
        folder_path = Path(folder_path)
        xml_file_path = folder_path / f"{folder_path.name}.xml"
        assert xml_file_path.is_file(), f"The XML configuration file is not found at '{folder_path}'."

        channel_names = set()
        # Children of an element are only guaranteed to be parsed on its "end" event, so wait
        # for the first Frame to close before reading its File children.
        for _, elem in etree.iterparse(xml_file_path, events=("end",)):
            if elem.tag == "Frame":
                channel_names.update(
                    subelem.attrib["channelName"] for subelem in elem if subelem.tag == "File"
                )
                break  # Exit after processing the first "Frame" element

        return channel_names

    def __init__(self, folder_path: PathType, stream_name: str | None = None):
        """Create a BrukerTiffSinglePlaneImagingExtractor instance from a folder path that contains the image files.

        Parameters
        ----------
        folder_path : PathType
            The path to the folder that contains the Bruker TIF image files (.ome.tif) and configuration files (.xml, .env).
        stream_name: str, optional
            The name of the recording channel (e.g. "Ch2" or "Green").
            A name that is not a channel name is treated as a plane stream name and matched as a
            substring of the file names listed in the XML.

        Raises
        ------
        AssertionError
            If the TIF image files are missing from the folder, or the frame rate cannot be determined.
        ValueError
            If more than one channel is available and ``stream_name`` is not given.
        ValueError
            If no file listed in the XML matches the selected stream.
        """
        self._tifffile = _get_tiff_reader()

        folder_path = Path(folder_path)
        tif_file_paths = list(folder_path.glob("*.ome.tif"))
        assert tif_file_paths, f"The TIF image files are missing from '{folder_path}'."

        streams = self.get_streams(folder_path=folder_path)
        channel_streams = streams["channel_streams"]
        if stream_name is None:
            if len(channel_streams) > 1:
                raise ValueError(
                    "More than one recording stream is detected! Please specify which stream you wish to load with the `stream_name` argument. "
                    "To see what streams are available, call `BrukerTiffSinglePlaneImagingExtractor.get_streams(folder_path=...)`."
                )
            stream_name = channel_streams[0]

        self.stream_name = stream_name

        self._xml_root = _parse_xml(folder_path=folder_path)
        file_elements = self._xml_root.findall(".//File")

        if stream_name in channel_streams:
            # This is the case when stream_name is a channel name (e.g. "Green" or "Ch2")
            file_names_for_stream = [
                f.attrib["filename"] for f in file_elements if f.attrib["channelName"] == stream_name
            ]
        else:
            # This is the case for when stream_name is a plane_stream
            file_names = [file.attrib["filename"] for file in file_elements]
            file_names_for_stream = [file for file in file_names if self.stream_name in file]

        if not file_names_for_stream:
            raise ValueError(
                f"The selected stream '{self.stream_name}' is not in the available channel_streams '{streams['channel_streams']}'!"
            )

        # determine image shape and data type from first file
        with self._tifffile.TiffFile(folder_path / file_names_for_stream[0], _multifile=False) as tif:
            self._height, self._width = tif.pages[0].shape
            self._dtype = tif.pages[0].dtype

        sequence_elements = self._xml_root.findall("Sequence")
        # determine the true sampling frequency
        # the "framePeriod" in the XML is not trusted (usually higher than the true frame rate)
        frame_rate = _determine_frame_rate(element=self._xml_root, file_names=file_names_for_stream)
        if frame_rate is None and len(sequence_elements) > 1:
            frame_rate = _determine_frame_rate(element=sequence_elements[0], file_names=file_names_for_stream)
        assert frame_rate is not None, "Could not determine the frame rate from the XML file."
        self._sampling_frequency = frame_rate
        self._channel_names = [self.stream_name.split("_")[0]]

        # count the number of occurrences of each file path and their names
        # files that contain stacks of images (multi-page tiffs) will appear repeated (number of repetition is the number of frames in the tif file)
        file_counts = Counter(file_names_for_stream)

        imaging_extractors = []
        for file_name, num_samples in file_counts.items():
            extractor = _BrukerTiffSinglePlaneImagingExtractor(file_path=str(Path(folder_path) / file_name))
            # The per-file extractor does not read these itself; they are injected here.
            extractor._num_samples = num_samples
            extractor._image_size = (self._height, self._width)
            extractor._dtype = self._dtype

            imaging_extractors.append(extractor)

        self.xml_metadata = self._get_xml_metadata()

        super().__init__(imaging_extractors=imaging_extractors)

    def _get_xml_metadata(self) -> dict[str, str | list[dict[str, str]]]:
        """Parse the metadata in the root element that are under "PVStateValue" tag into a dictionary.

        Returns
        -------
        xml_metadata: dict
            The dictionary of metadata extracted from the XML file. It starts from the root
            element's attributes. A ``PVStateValue`` with a ``value`` attribute is stored as a
            string (the first occurrence of a key wins); one without is stored as a list of
            single-entry dictionaries built from its children.
        """
        xml_metadata = dict()
        xml_metadata.update(self._xml_root.attrib)

        # Use a single XPath to get all PVStateValue elements
        pv_state_values = self._xml_root.xpath(".//PVStateValue")

        for child in pv_state_values:
            metadata_root_key = child.attrib["key"]
            if "value" in child.attrib:
                if metadata_root_key not in xml_metadata:
                    xml_metadata[metadata_root_key] = child.attrib["value"]
            else:
                xml_metadata[metadata_root_key] = []
                for indexed_value in child:
                    if "description" in indexed_value.attrib:
                        xml_metadata[metadata_root_key].append(
                            {indexed_value.attrib["description"]: indexed_value.attrib["value"]}
                        )
                    elif "value" in indexed_value.attrib:
                        xml_metadata[metadata_root_key].append(
                            {indexed_value.attrib["index"]: indexed_value.attrib["value"]}
                        )
                    else:
                        # No value on this level: the values live one level deeper.
                        for subindexed_value in indexed_value:
                            if "description" in subindexed_value.attrib:
                                xml_metadata[metadata_root_key].append(
                                    {subindexed_value.attrib["description"]: subindexed_value.attrib["value"]}
                                )
                            else:
                                xml_metadata[metadata_root_key].append(
                                    {indexed_value.attrib["index"]: subindexed_value.attrib["value"]}
                                )
        return xml_metadata

    def _check_consistency_between_imaging_extractors(self):
        """Override the parent class method as none of the properties that are checked are from the sub-imaging extractors."""
        return True

    def get_image_shape(self) -> tuple[int, int]:
        """Get the shape of the video frame (num_rows, num_columns).

        Returns
        -------
        image_shape: tuple
            Shape of the video frame (num_rows, num_columns).
        """
        return self._height, self._width

    def get_sampling_frequency(self) -> float:
        """Return the frame rate determined from the XML file at construction."""
        return self._sampling_frequency

    def get_channel_names(self) -> list[str]:
        """Return the channel name derived from the stream name (deprecated; emits a FutureWarning)."""
        warnings.warn(
            "get_channel_names is deprecated and will be removed in May 2026 or after.",
            category=FutureWarning,
            stacklevel=2,
        )
        return self._channel_names

    def get_dtype(self) -> DtypeType:
        """Return the data type read from the first page of the first TIFF file of the stream."""
        return self._dtype


class _BrukerTiffSinglePlaneImagingExtractor(ImagingExtractor):
    """A private ImagingExtractor for TIFF files produced by Bruker with only 1 plane.

    The private imaging extractor for OME-TIF image format produced by Bruker,
    which defines the get_series() method to return the requested frames from a given file.
    This extractor is not meant to be used as a standalone ImagingExtractor.
    """

    extractor_name = "_BrukerTiffSinglePlaneImaging"
    mode = "file"

    SAMPLING_FREQ_ERROR = "The {}Extractor does not support retrieving the imaging rate."
    CHANNEL_NAMES_ERROR = "The {}Extractor does not support retrieving the name of the channels."
    DATA_TYPE_ERROR = "The {}Extractor does not support retrieving the data type."

    def __init__(self, file_path: PathType):
        """Create a _BrukerTiffSinglePlaneImagingExtractor instance from a TIFF image file (.ome.tif).

        Parameters
        ----------
        file_path : PathType
            The path to the TIF image file (.ome.tif)
        """
        self.tifffile = _get_tiff_reader()
        self.file_path = file_path

        super().__init__()

        # Left unset here; the code that creates this extractor is expected to assign them.
        self._num_samples = None
        self._image_size = None
        self._dtype = None

    def get_num_samples(self) -> int:
        """Return the number of samples assigned to this file's extractor."""
        return self._num_samples

    def get_image_shape(self) -> tuple[int, int]:
        """Get the shape of the video frame (num_rows, num_columns).

        Returns
        -------
        image_shape: tuple
            Shape of the video frame (num_rows, num_columns).
        """
        return self._image_size

    def get_sampling_frequency(self):
        """Not supported by this private extractor; always raises NotImplementedError."""
        raise NotImplementedError(self.SAMPLING_FREQ_ERROR.format(self.extractor_name))

    def get_channel_names(self) -> list:
        """Not supported by this private extractor; warns about deprecation, then raises NotImplementedError."""
        warnings.warn(
            "get_channel_names is deprecated and will be removed in May 2026 or after.",
            category=FutureWarning,
            stacklevel=2,
        )
        raise NotImplementedError(self.CHANNEL_NAMES_ERROR.format(self.extractor_name))

    def get_dtype(self):
        """Not supported by this private extractor; always raises NotImplementedError."""
        raise NotImplementedError(self.DATA_TYPE_ERROR.format(self.extractor_name))

    def get_series(self, start_sample: int | None = None, end_sample: int | None = None) -> np.ndarray:
        """Read the pages in [start_sample, end_sample) from the TIFF file.

        Parameters
        ----------
        start_sample : int, optional
            Index of the first page to read; defaults to 0.
        end_sample : int, optional
            One past the index of the last page to read; defaults to ``get_num_samples()``.

        Returns
        -------
        series : np.ndarray
            Array of shape ``(end_sample - start_sample, *get_image_shape())``. When both bounds
            are given and equal, the single page at that index is returned instead.
        """
        with self.tifffile.TiffFile(self.file_path, _multifile=False) as tif:
            pages = tif.pages

            # Existing special case: equal bounds return that one page rather than an empty series.
            if start_sample is not None and end_sample is not None and start_sample == end_sample:
                return pages[start_sample].asarray()

            # Compare against None explicitly: an ``or`` fallback would turn end_sample=0
            # into "read to the end of the file".
            if end_sample is None:
                end_sample = self.get_num_samples()
            if start_sample is None:
                start_sample = 0

            image_shape = (end_sample - start_sample, *self.get_image_shape())
            series = np.zeros(shape=image_shape, dtype=self._dtype)
            for page_ind, page in enumerate(islice(pages, start_sample, end_sample)):
                series[page_ind] = page.asarray()

        return series

    def get_native_timestamps(
        self, start_sample: int | None = None, end_sample: int | None = None
    ) -> np.ndarray | None:
        """Return None: no timestamps are read from the TIFF file by this extractor."""
        # Bruker TIFF data does not have native timestamps in the TIFF files themselves
        # The timestamps are in the XML configuration files which are handled by the parent extractors
        return None