##############################################################################
# Copyright Kitware Inc.
#
# Licensed under the Apache License, Version 2.0 ( the "License" );
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##############################################################################
import copy
import math
import os
from collections import OrderedDict
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _importlib_version
import numpy as np
import PIL.Image
from large_image_source_tiff import TiffFileTileSource
from large_image_source_tiff.exceptions import InvalidOperationTiffError, IOTiffError, TiffError
from large_image.cache_util import LruCacheMetaclass, methodcache
from large_image.constants import TILE_FORMAT_NUMPY, TILE_FORMAT_PIL, SourcePriority
from large_image.exceptions import TileSourceError, TileSourceFileNotFoundError
# Publish the installed distribution's version as ``__version__``.  When no
# distribution metadata can be found for this module's name, the attribute
# is simply left undefined.
try:
    __version__ = _importlib_version(__name__)
except PackageNotFoundError:
    # The package is not installed, so there is no version to report.
    pass
# Multipliers that convert a length expressed in one of the listed unit
# symbols to meters.  The keys are the unit strings looked up from the
# ``PhysicalSizeXUnit`` / ``PhysicalSizeYUnit`` attributes of the OME record.
_omeUnitsToMeters = {
    # Multiples of a meter.
    'Ym': 1e24,      # yottameter
    'Zm': 1e21,      # zettameter
    'Em': 1e18,      # exameter
    'Pm': 1e15,      # petameter
    'Tm': 1e12,      # terameter
    'Gm': 1e9,       # gigameter
    'Mm': 1e6,       # megameter
    'km': 1e3,       # kilometer
    'hm': 1e2,       # hectometer
    'dam': 1e1,      # decameter
    # The base unit.
    'm': 1,
    # Fractions of a meter.
    'dm': 1e-1,      # decimeter
    'cm': 1e-2,      # centimeter
    'mm': 1e-3,      # millimeter
    '\u00b5m': 1e-6,  # micrometer (written with the U+00B5 micro sign)
    'nm': 1e-9,      # nanometer
    'pm': 1e-12,     # picometer
    'fm': 1e-15,     # femtometer
    'am': 1e-18,     # attometer
    'zm': 1e-21,     # zeptometer
    'ym': 1e-24,     # yoctometer
    # Non-prefixed unit.
    '\u00c5': 1e-10,  # angstrom (U+00C5)
}
class OMETiffFileTileSource(TiffFileTileSource, metaclass=LruCacheMetaclass):
    """
    Provides tile access to OME TIFF files.

    The OME record found in the description of the first TIFF directory is
    used to work out which TIFF directory holds each frame at each level.
    """

    cacheName = 'tilesource'
    name = 'ometiff'
    extensions = {
        None: SourcePriority.LOW,
        'tif': SourcePriority.MEDIUM,
        'tiff': SourcePriority.MEDIUM,
        'ome': SourcePriority.PREFERRED,
    }
    mimeTypes = {
        'image/tiff': SourcePriority.MEDIUM,
        'image/x-tiff': SourcePriority.MEDIUM,
    }

    # The expected number of pixels that would need to be read to read the
    # worst-case tile.
    _maxUntiledChunk = 512 * 1024 * 1024

    def __init__(self, path, **kwargs):
        """
        Initialize the tile class.  See the base class for other available
        parameters.

        :param path: a filesystem path for the tile source.
        :raises TileSourceFileNotFoundError: if the first TIFF directory
            cannot be read and the path is not an existing file.
        :raises TileSourceError: if the file is not an OME Tiff that this
            source can handle, or is untiled and too large to read.
        """
        # Note this is the super of the parent class, not of this class.
        super(TiffFileTileSource, self).__init__(path, **kwargs)

        self._largeImagePath = str(self._getLargeImagePath())

        try:
            base = self.getTiffDir(0, mustBeTiled=None)
        except TiffError as exc:
            if not os.path.isfile(self._largeImagePath):
                raise TileSourceFileNotFoundError(self._largeImagePath) from None
            msg = 'Not a recognized OME Tiff'
            raise TileSourceError(msg) from exc
        info = getattr(base, '_description_record', None)
        if not info or not info.get('OME'):
            msg = 'Not an OME Tiff'
            raise TileSourceError(msg)
        self._omeinfo = info['OME']
        self._checkForOMEZLoop()
        try:
            self._parseOMEInfo()
        except KeyError as exc:
            msg = 'Not a recognized OME Tiff'
            raise TileSourceError(msg) from exc
        # Only images that list as many TiffData entries as the base image
        # are considered as possible resolution levels.
        omeimages = [
            entry['Pixels'] for entry in self._omeinfo['Image'] if
            len(entry['Pixels']['TiffData']) == len(self._omebase['TiffData'])]
        # The level of an image is ceil(log2(size / tile size)) of its larger
        # dimension, with a minimum of 0.
        levels = [max(0, int(math.ceil(math.log(max(
            float(entry['SizeX']) / base.tileWidth,
            float(entry['SizeY']) / base.tileHeight)) / math.log(2))))
            for entry in omeimages]
        omebylevel = dict(zip(levels, omeimages))
        # Levels with no matching image are recorded as None.
        self._omeLevels = [omebylevel.get(key) for key in range(max(omebylevel.keys()) + 1)]
        if base._tiffInfo.get('istiled'):
            self._tiffDirectories = [
                self.getTiffDir(int(entry['TiffData'][0].get('IFD', 0)))
                if entry else None
                for entry in self._omeLevels]
        else:
            self._tiffDirectories = [
                self.getTiffDir(0, mustBeTiled=None)
                if entry else None
                for entry in self._omeLevels]
            # NOTE(review): the size check below is placed in the untiled
            # branch because its error message only concerns untiled images;
            # confirm against the canonical source.
            self._checkForInefficientDirectories(warn=False)
            _maxChunk = min(base.imageWidth, base.tileWidth * self._skippedLevels ** 2) * \
                min(base.imageHeight, base.tileHeight * self._skippedLevels ** 2)
            if _maxChunk > self._maxUntiledChunk:
                msg = 'Untiled image is too large to access with the OME Tiff source'
                raise TileSourceError(msg)
        self.tileWidth = base.tileWidth
        self.tileHeight = base.tileHeight
        self.levels = len(self._tiffDirectories)
        self.sizeX = base.imageWidth
        self.sizeY = base.imageHeight

        # We can get the embedded images, but we don't currently use non-tiled
        # images as associated images.  This would require enumerating tiff
        # directories not mentioned by the ome list.
        self._associatedImages = {}
        self._checkForInefficientDirectories()

    def _checkForOMEZLoop(self):
        """
        Check if the OME description lists a Z-loop that isn't referenced by
        the frames or TiffData list and is present based on the number of tiff
        directories.  This can modify self._omeinfo.

        Any failure to parse the description or to read the expected last
        directory leaves self._omeinfo untouched.
        """
        info = self._omeinfo
        try:
            # The description is expected to contain text of the form
            # 'Z Stack Loop: <count> ... Step: <value> <unit>'.
            zloopinfo = info['Image']['Description'].split('Z Stack Loop: ')[1]
            zloop = int(zloopinfo.split()[0])
            stepinfo = zloopinfo.split('Step: ')[1].split()
            stepmm = float(stepinfo[0])
            # Convert the step to millimeters; other units raise and abort.
            stepmm *= {'mm': 1, '\xb5m': 0.001}[stepinfo[1]]
            planes = len(info['Image']['Pixels']['Plane'])
            # If any plane already has a non-zero Z, or the image already
            # reports more than one Z, the loop is already described.
            for plane in info['Image']['Pixels']['Plane']:
                if int(plane.get('TheZ', 0)) != 0:
                    return
            if int(info['Image']['Pixels']['SizeZ']) != 1:
                return
        except Exception:
            return
        if zloop <= 1 or not stepmm or not planes:
            return
        if len(info['Image']['Pixels'].get('TiffData', {})):
            return
        # The file must contain exactly planes * zloop directories.
        expecteddir = planes * zloop
        try:
            lastdir = self.getTiffDir(expecteddir - 1, mustBeTiled=None)
            if not lastdir._tiffInfo.get('lastdirectory'):
                return
        except Exception:
            return
        # Synthesize one entry per plane per Z value.
        tiffdata = []
        for z in range(zloop):
            for plane in info['Image']['Pixels']['Plane']:
                td = plane.copy()
                td['TheZ'] = str(z)
                # This position is probably wrong -- it seems like the
                # listed position is likely to be the center of the stack, not
                # the bottom, but we'd have to confirm it.
                td['PositionZ'] = str(float(td.get('PositionZ', 0)) + z * stepmm * 1000)
                tiffdata.append(td)
        info['Image']['Pixels']['TiffData'] = tiffdata
        info['Image']['Pixels']['Plane'] = tiffdata
        info['Image']['Pixels']['PlanesFromZloop'] = 'true'
        info['Image']['Pixels']['SizeZ'] = str(zloop)

    def _parseOMEInfo(self):  # noqa
        """
        Normalize self._omeinfo and select the base image record.

        Single Image, TiffData, and Plane records are wrapped in lists, the
        Pixels record of the first image is stored as self._omebase, and when
        the base image has at most one TiffData entry its TiffData list is
        rebuilt from the Plane (or Channel) records.

        :raises TileSourceError: if the record references multiple files,
            has frames that contain multiple planes, or lacks an expected
            entry.
        """
        if isinstance(self._omeinfo['Image'], dict):
            self._omeinfo['Image'] = [self._omeinfo['Image']]
        for img in self._omeinfo['Image']:
            if isinstance(img['Pixels'].get('TiffData'), dict):
                img['Pixels']['TiffData'] = [img['Pixels']['TiffData']]
            if isinstance(img['Pixels'].get('Plane'), dict):
                img['Pixels']['Plane'] = [img['Pixels']['Plane']]
            # NOTE(review): the rest of this method reads the 'Channel' key,
            # not 'Channels'.  Left unchanged on purpose: the code below
            # relies on a single 'Channel' record still being a dict.
            if isinstance(img['Pixels'].get('Channels'), dict):
                img['Pixels']['Channels'] = [img['Pixels']['Channels']]
        try:
            self._omebase = self._omeinfo['Image'][0]['Pixels']
            if isinstance(self._omebase.get('Plane'), dict):
                self._omebase['Plane'] = [self._omebase['Plane']]
            if ((not len(self._omebase['TiffData']) or
                    len(self._omebase['TiffData']) == 1) and
                    (len(self._omebase.get('Plane', [])) or
                     len(self._omebase.get('Channel', [])))):
                if (not len(self._omebase['TiffData']) or
                        self._omebase['TiffData'][0] == {} or
                        int(self._omebase['TiffData'][0].get('PlaneCount', 0)) == 1):
                    # Use the planes (or channels) themselves as the list of
                    # frames, numbering them in order.
                    planes = copy.deepcopy(self._omebase.get(
                        'Plane', self._omebase.get('Channel')))
                    if isinstance(planes, dict):
                        planes = [planes]
                        # NOTE(review): nesting reconstructed from a
                        # flattened source -- SizeC is assumed to be reset
                        # only for a single record; confirm.
                        self._omebase['SizeC'] = 1
                    for idx, plane in enumerate(planes):
                        plane['IndexC'] = idx
                    self._omebase['TiffData'] = planes
                elif (int(self._omebase['TiffData'][0].get('PlaneCount', 0)) ==
                        len(self._omebase.get('Plane', self._omebase.get('Channel', [])))):
                    # One TiffData entry covers every plane; planes without
                    # an IFD are numbered from that entry's IFD.
                    planes = copy.deepcopy(self._omebase.get('Plane', self._omebase.get('Channel')))
                    for idx, plane in enumerate(planes):
                        plane['IFD'] = plane.get(
                            'IFD', int(self._omebase['TiffData'][0].get('IFD', 0)) + idx)
                    self._omebase['TiffData'] = planes
            if isinstance(self._omebase['TiffData'], dict):
                self._omebase['TiffData'] = [self._omebase['TiffData']]
            if len({entry.get('UUID', {}).get('FileName', '')
                    for entry in self._omebase['TiffData']}) > 1:
                msg = 'OME Tiff references multiple files'
                raise TileSourceError(msg)
            if (len(self._omebase['TiffData']) != int(self._omebase['SizeC']) *
                    int(self._omebase['SizeT']) * int(self._omebase['SizeZ']) or
                    len(self._omebase['TiffData']) != len(
                        self._omebase.get('Plane', self._omebase['TiffData']))):
                msg = 'OME Tiff contains frames that contain multiple planes'
                raise TileSourceError(msg)
        except (KeyError, ValueError, IndexError, TypeError) as exc:
            msg = 'OME Tiff does not contain an expected record'
            raise TileSourceError(msg) from exc

    def getNativeMagnification(self):
        """
        Get the magnification for the highest-resolution level.

        Pixel sizes that the base class does not provide are taken from the
        PhysicalSizeX / PhysicalSizeY entries of the base OME record.  A
        size whose unit has no entry in the conversion table is left unset.

        :return: magnification, width of a pixel in mm, height of a pixel in mm.
        """
        result = super().getNativeMagnification()
        for axis in ('x', 'y'):
            mmKey = 'mm_' + axis
            sizeKey = 'PhysicalSize' + axis.upper()
            if result[mmKey] is not None or sizeKey not in self._omebase:
                continue
            # Micrometers are assumed when the record gives no unit.
            unit = self._omebase.get(sizeKey + 'Unit', '\u00b5m')
            metersPerUnit = _omeUnitsToMeters.get(unit)
            if metersPerUnit is None:
                # A unit without a conversion factor (for instance 'pixel')
                # cannot be expressed in mm; report no size rather than
                # failing with a KeyError.
                continue
            result[mmKey] = float(self._omebase[sizeKey]) * 1e3 * metersPerUnit
        if not result.get('magnification') and result.get('mm_x'):
            result['magnification'] = 0.01 / result['mm_x']
        return result

    @methodcache()
    def getTile(self, x, y, z, pilImageAllowed=False, numpyAllowed=False,
                sparseFallback=False, **kwargs):
        """
        Get a tile, reading it from the TIFF directory of the requested
        frame.

        The base class implementation is used instead when no style is set
        and either z is outside the known levels or the level exists and the
        first frame is requested.  It is also used when a lower-resolution
        sub-directory does not match the expected tile or image size.

        :param x: the tile's x position, passed to the directory's getTile.
        :param y: the tile's y position, passed to the directory's getTile.
        :param z: the level of the tile.
        :param pilImageAllowed: passed through to the tile output routine.
        :param numpyAllowed: passed through to the tile output routine; the
            value 'always' requests the tile as an array.
        :param sparseFallback: passed through to the base class and to the
            IO error handler.
        :param kwargs: may contain 'frame'; passed through otherwise.
        :returns: the tile as produced by the tile output routine.
        :raises TileSourceError: if the frame does not exist or the
            directory reports an invalid operation.
        """
        if ((z < 0 or z >= len(self._omeLevels) or (
                self._omeLevels[z] is not None and kwargs.get('frame') in (None, 0, '0', ''))) and
                not getattr(self, '_style', None)):
            return super().getTile(
                x, y, z, pilImageAllowed=pilImageAllowed,
                numpyAllowed=numpyAllowed, sparseFallback=sparseFallback,
                **kwargs)
        frame = self._getFrame(**kwargs)
        if frame < 0 or frame >= len(self._omebase['TiffData']):
            msg = 'Frame does not exist'
            raise TileSourceError(msg)
        subdir = None
        if self._omeLevels[z] is not None:
            dirnum = int(self._omeLevels[z]['TiffData'][frame].get('IFD', frame))
        else:
            # This level has no image of its own; look for a sub-directory of
            # the highest level's directory instead.
            dirnum = int(self._omeLevels[-1]['TiffData'][frame].get('IFD', frame))
            subdir = self.levels - 1 - z
        tiffDir = self._getDirFromCache(dirnum, subdir)
        if subdir:
            scale = int(2 ** subdir)
            # Only use the sub-directory if its tiles and dimensions are
            # consistent with a power-of-two reduction of the full image.
            if (tiffDir is None or
                    (tiffDir.tileWidth != self.tileWidth and
                     tiffDir.tileWidth != tiffDir.imageWidth) or
                    (tiffDir.tileHeight != self.tileHeight and
                     tiffDir.tileHeight != tiffDir.imageHeight) or
                    abs(tiffDir.imageWidth * scale - self.sizeX) > scale or
                    abs(tiffDir.imageHeight * scale - self.sizeY) > scale):
                return super().getTile(
                    x, y, z, pilImageAllowed=pilImageAllowed,
                    numpyAllowed=numpyAllowed, sparseFallback=sparseFallback,
                    **kwargs)
        try:
            tile = tiffDir.getTile(x, y, asarray=numpyAllowed == 'always')
            # Report the format according to the type of object returned.
            tileFormat = 'JPEG'
            if isinstance(tile, PIL.Image.Image):
                tileFormat = TILE_FORMAT_PIL
            if isinstance(tile, np.ndarray):
                tileFormat = TILE_FORMAT_NUMPY
            return self._outputTile(tile, tileFormat, x, y, z, pilImageAllowed,
                                    numpyAllowed, **kwargs)
        except InvalidOperationTiffError as exc:
            raise TileSourceError(exc.args[0]) from exc
        except IOTiffError as exc:
            return self.getTileIOTiffError(
                x, y, z, pilImageAllowed=pilImageAllowed,
                numpyAllowed=numpyAllowed, sparseFallback=sparseFallback,
                exception=exc, **kwargs)

    def getPreferredLevel(self, level):
        """
        Given a desired level (0 is minimum resolution, self.levels - 1 is max
        resolution), return the level that contains actual data that is no
        lower resolution.

        :param level: desired level
        :returns level: a level with actual data that is no lower resolution.
        """
        level = max(0, min(level, self.levels - 1))
        baselevel = level
        while self._tiffDirectories[level] is None and level < self.levels - 1:
            try:
                # A level without its own directory is still usable if the
                # highest level's directory has a matching sub-directory.
                dirnum = int(self._omeLevels[-1]['TiffData'][0].get('IFD', 0))
                subdir = self.levels - 1 - level
                if self._getDirFromCache(dirnum, subdir):
                    break
            except Exception:
                # Best effort: treat any failure as "no data at this level".
                pass
            level += 1
        # Don't move further from the requested level than necessary in steps
        # of the maximum number of skipped levels.
        while level - baselevel > self._maxSkippedLevels:
            level -= self._maxSkippedLevels
        return level
def open(*args, **kwargs):
    """
    Create an instance of the module class.

    :param args: positional arguments handed to OMETiffFileTileSource.
    :param kwargs: keyword arguments handed to OMETiffFileTileSource.
    :returns: the result of calling OMETiffFileTileSource.
    """
    source = OMETiffFileTileSource(*args, **kwargs)
    return source
def canRead(*args, **kwargs):
    """
    Check if an input can be read by the module class.

    :param args: positional arguments handed to
        OMETiffFileTileSource.canRead.
    :param kwargs: keyword arguments handed to
        OMETiffFileTileSource.canRead.
    :returns: whatever OMETiffFileTileSource.canRead returns.
    """
    readable = OMETiffFileTileSource.canRead(*args, **kwargs)
    return readable