import concurrent.futures
import datetime
import fractions
import json
import logging
import math
import os
import re
import struct
import threading
import time
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version as _importlib_version
from tempfile import TemporaryDirectory
import numpy as np
import tifftools
import large_image
from large_image.tilesource.utilities import (_gdalParameters,
_newFromFileLock, _vipsCast,
_vipsParameters)
from . import format_aperio
pyvips = None
try:
__version__ = _importlib_version(__name__)
except PackageNotFoundError:
# package is not installed
pass
logger = logging.getLogger('large-image-converter')
FormatModules = {
'aperio': format_aperio,
}
# Estimated maximum memory use per frame conversion. Used to limit concurrent
# frame conversions.
FrameMemoryEstimate = 3 * 1024 ** 3
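# format_hook is referenced throughout the conversion functions below; this is
# a hedged, minimal sketch of the dispatcher implied by those call sites: look
# up the module registered in FormatModules under kwargs['format'] and invoke
# the named function on it if present. The exact upstream implementation may
# differ in detail.
def format_hook(funcname, *args, **kwargs):
    """
    Call a format-specific function, if any, from the module registered for
    the format keyword argument.
    :param funcname: the name of the function to call.
    :returns: the hook's return value, or None if no hook exists. By
        convention, returning False aborts further processing.
    """
    fmt = str(kwargs.get('format')).lower()
    if fmt in FormatModules:
        func = getattr(FormatModules[fmt], funcname, None)
        if callable(func):
            return func(*args, **kwargs)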
def _use_associated_image(key, **kwargs):
"""
    Check if an associated image key should be used. If a list of images to
    keep was specified, the key must match at least one regex in that list.
    If a list of images to exclude was specified, it must not match any regex
    in that list. The exclude list takes priority.
"""
if kwargs.get('_exclude_associated'):
for exp in kwargs['_exclude_associated']:
if re.match(exp, key):
return False
if kwargs.get('_keep_associated'):
for exp in kwargs['_keep_associated']:
if re.match(exp, key):
return True
return False
return True
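# For example (hypothetical keys and expressions): with
# _keep_associated=['label|macro'] and _exclude_associated=['macro'], the key
# 'macro' is rejected because the exclude list takes priority, 'label' is
# kept, and 'thumbnail' is rejected because it matches no keep expression.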
def _data_from_large_image(path, outputPath, **kwargs):
"""
Check if the input file can be read by installed large_image tile sources.
If so, return the metadata, internal metadata, and extract each associated
image.
:param path: path of the file.
:param outputPath: the name of a temporary output file.
:returns: a dictionary of metadata, internal_metadata, and images. images
is a dictionary of keys and paths. Returns None if the path is not
readable by large_image.
"""
_import_pyvips()
if not path.startswith('large_image://test'):
try:
ts = large_image.open(path, noCache=True)
except Exception:
return
else:
import urllib.parse
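        # For instance (hypothetical parameters), a path such as
        # 'large_image://test?sizeX=100000&sizeY=80000' yields tsparams of
        # {'sizeX': 100000, 'sizeY': 80000}; purely numeric values become
        # ints, everything else stays a string.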
tsparams = {
k: int(v[0]) if v[0].isdigit() else v[0]
for k, v in urllib.parse.parse_qs(
path.split('?', 1)[1] if '?' in path else '').items()}
ts = large_image.open('large_image://test', **tsparams)
results = {
'metadata': ts.getMetadata(),
'internal_metadata': ts.getInternalMetadata(),
'images': {},
'tilesource': ts,
}
tasks = []
pool = _get_thread_pool(**kwargs)
for key in ts.getAssociatedImagesList():
if not _use_associated_image(key, **kwargs):
continue
try:
img, mime = ts.getAssociatedImage(key)
except Exception:
continue
savePath = outputPath + '-%s-%s.tiff' % (key, time.strftime('%Y%m%d-%H%M%S'))
# TODO: allow specifying quality separately from main image quality
_pool_add(tasks, (pool.submit(
_convert_via_vips, img, savePath, outputPath, mime=mime, forTiled=False), ))
results['images'][key] = savePath
_drain_pool(pool, tasks, 'associated images')
return results
def _generate_geotiff(inputPath, outputPath, **kwargs):
"""
Take a source input file, readable by gdal, and output a cloud-optimized
geotiff file. See https://gdal.org/drivers/raster/cog.html.
:param inputPath: the path to the input file or base file of a set.
:param outputPath: the path of the output file.
Optional parameters that can be specified in kwargs:
:param tileSize: the horizontal and vertical tile size.
:param compression: one of 'jpeg', 'deflate' (zip), 'lzw', or 'zstd'.
:param quality: a jpeg quality passed to vips. 0 is small, 100 is high
quality. 90 or above is recommended.
:param level: compression level for zstd, 1-22 (default is 10).
:param predictor: one of 'none', 'horizontal', 'float', or 'yes' used for
lzw and deflate.
"""
from osgeo import gdal, gdalconst
cmdopt = _gdalParameters(**kwargs)
cmd = ['gdal_translate', inputPath, outputPath] + cmdopt
logger.info('Convert to geotiff: %r', cmd)
try:
# subprocess.check_call(cmd)
ds = gdal.Open(inputPath, gdalconst.GA_ReadOnly)
gdal.Translate(outputPath, ds, options=cmdopt)
except Exception:
os.unlink(outputPath)
raise
def _generate_multiframe_tiff(inputPath, outputPath, tempPath, lidata, **kwargs):
"""
Take a source input file with multiple frames and output a multi-pyramidal
tiff file.
:param inputPath: the path to the input file or base file of a set.
:param outputPath: the path of the output file.
:param tempPath: a temporary file in a temporary directory.
:param lidata: data from a large_image tilesource including associated
images.
Optional parameters that can be specified in kwargs:
:param tileSize: the horizontal and vertical tile size.
:param compression: one of 'jpeg', 'deflate' (zip), 'lzw', 'packbits',
'zstd', or 'jp2k'.
:param quality: a jpeg quality passed to vips. 0 is small, 100 is high
quality. 90 or above is recommended.
:param level: compression level for zstd, 1-22 (default is 10).
:param predictor: one of 'none', 'horizontal', or 'float' used for lzw and
deflate.
"""
_import_pyvips()
with _newFromFileLock:
image = pyvips.Image.new_from_file(inputPath)
width = image.width
height = image.height
pages = 1
if 'n-pages' in image.get_fields():
pages = image.get_value('n-pages')
# Now check if there are other images we need to convert or preserve
outputList = []
imageSizes = []
tasks = []
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate, **kwargs)
onlyFrame = int(kwargs.get('onlyFrame')) if str(kwargs.get('onlyFrame')).isdigit() else None
frame = 0
# Process each image separately to pyramidize it
for page in range(pages):
subInputPath = inputPath + '[page=%d]' % page
with _newFromFileLock:
subImage = pyvips.Image.new_from_file(subInputPath)
imageSizes.append((subImage.width, subImage.height, subInputPath, page))
if subImage.width != width or subImage.height != height:
if subImage.width * subImage.height <= width * height:
continue
logger.info('Bigger image found (was %dx%d, now %dx%d)',
width, height, subImage.width, subImage.height)
for path in outputList:
os.unlink(path)
width = subImage.width
height = subImage.height
frame += 1
if onlyFrame is not None and onlyFrame + 1 != frame:
continue
subOutputPath = tempPath + '-%d-%s.tiff' % (
page + 1, time.strftime('%Y%m%d-%H%M%S'))
_pool_add(tasks, (pool.submit(
_convert_via_vips, subInputPath, subOutputPath, tempPath,
status='%d/%d' % (page, pages), **kwargs), ))
outputList.append(subOutputPath)
extraImages = {}
if not lidata or not len(lidata['images']):
        # If we couldn't extract images from li, try to detect non-primary
        # images from the original file. These are any images whose size is
        # not a power-of-two division of the primary image size
possibleSizes = _list_possible_sizes(width, height)
for w, h, subInputPath, page in imageSizes:
if (w, h) not in possibleSizes:
key = 'image_%d' % page
if not _use_associated_image(key, **kwargs):
continue
savePath = tempPath + '-%s-%s.tiff' % (key, time.strftime('%Y%m%d-%H%M%S'))
_pool_add(tasks, (pool.submit(
_convert_via_vips, subInputPath, savePath, tempPath, False), ))
extraImages[key] = savePath
_drain_pool(pool, tasks, 'subpage')
_output_tiff(outputList, outputPath, tempPath, lidata, extraImages, **kwargs)
def _generate_tiff(inputPath, outputPath, tempPath, lidata, **kwargs):
"""
Take a source input file, readable by vips, and output a pyramidal tiff
file.
:param inputPath: the path to the input file or base file of a set.
:param outputPath: the path of the output file.
:param tempPath: a temporary file in a temporary directory.
:param lidata: data from a large_image tilesource including associated
images.
Optional parameters that can be specified in kwargs:
:param tileSize: the horizontal and vertical tile size.
:param compression: one of 'jpeg', 'deflate' (zip), 'lzw', 'packbits',
'zstd', or 'jp2k'.
:param quality: a jpeg quality passed to vips. 0 is small, 100 is high
quality. 90 or above is recommended.
:param level: compression level for zstd, 1-22 (default is 10).
:param predictor: one of 'none', 'horizontal', or 'float' used for lzw and
deflate.
"""
_import_pyvips()
subOutputPath = tempPath + '-%s.tiff' % (time.strftime('%Y%m%d-%H%M%S'))
_convert_via_vips(inputPath, subOutputPath, tempPath, **kwargs)
_output_tiff([subOutputPath], outputPath, tempPath, lidata, **kwargs)
def _convert_via_vips(inputPathOrBuffer, outputPath, tempPath, forTiled=True,
status=None, **kwargs):
"""
Convert a file, buffer, or vips image to a tiff file. This is equivalent
to a vips command line of
vips tiffsave <input path> <output path>
    followed by the convert params in the form of --<key>[=<value>], where no
    value needs to be specified for parameters that are True.
:param inputPathOrBuffer: a file path, bytes object, or vips image.
:param outputPath: the name of the file to save.
    :param tempPath: a directory where temporary files may be stored. vips
        also stores temporary files in TMPDIR.
:param forTiled: True if the output should be tiled, false if not.
:param status: an optional additional string to add to log messages.
    :param kwargs: additional arguments that get passed to _vipsParameters
        and _convert_to_jp2k.
"""
_import_pyvips()
convertParams = _vipsParameters(forTiled, **kwargs)
status = (', ' + status) if status else ''
if isinstance(inputPathOrBuffer, pyvips.vimage.Image):
source = 'vips image'
image = inputPathOrBuffer
elif isinstance(inputPathOrBuffer, bytes):
source = 'buffer'
image = pyvips.Image.new_from_buffer(inputPathOrBuffer, '')
else:
source = inputPathOrBuffer
with _newFromFileLock:
image = pyvips.Image.new_from_file(inputPathOrBuffer)
logger.info('Input: %s, Output: %s, Options: %r%s',
source, outputPath, convertParams, status)
image = image.autorot()
adjusted = format_hook('modify_vips_image_before_output', image, convertParams, **kwargs)
if adjusted is False:
return
elif adjusted:
image = adjusted
if (convertParams['compression'] not in {'jpeg'} or
image.interpretation != pyvips.Interpretation.SCRGB):
# jp2k compression supports more than 8-bits per sample, but the
# decompressor claims this is unsupported.
image = _vipsCast(
image,
convertParams['compression'] in {'webp', 'jpeg'} or
kwargs.get('compression') in {'jp2k'})
# TODO: revisit the TMPDIR override; this is not thread safe
# oldtmpdir = os.environ.get('TMPDIR')
# os.environ['TMPDIR'] = os.path.dirname(tempPath)
# try:
# image.write_to_file(outputPath, **convertParams)
# finally:
# if oldtmpdir is not None:
# os.environ['TMPDIR'] = oldtmpdir
# else:
# del os.environ['TMPDIR']
image.write_to_file(outputPath, **convertParams)
if kwargs.get('compression') == 'jp2k':
_convert_to_jp2k(outputPath, **kwargs)
def _convert_to_jp2k_tile(lock, fptr, dest, offset, length, shape, dtype, jp2kargs):
"""
    Read an uncompressed tile from a file and save it as a JPEG 2000 (jp2k) file.
:param lock: a lock to ensure exclusive access to the file.
:param fptr: a pointer to the open file.
:param dest: the output path for the jp2k file.
:param offset: the location in the input file with the data.
:param length: the number of bytes to read.
:param shape: a tuple with the shape of the tile to read. This is usually
(height, width, channels).
:param dtype: the numpy dtype of the data in the tile.
:param jp2kargs: arguments to pass to the compression, such as psnr or
cratios.
"""
import glymur
with lock:
fptr.seek(offset)
data = fptr.read(length)
data = np.frombuffer(data, dtype=dtype)
data = np.reshape(data, shape)
glymur.Jp2k(dest, data=data, **jp2kargs)
def _concurrency_to_value(_concurrency=None, **kwargs):
"""
Convert the _concurrency value to a number of cpus.
    :param _concurrency: a positive value for a set number of cpus. A value
        <= 0 uses the number of logical cpus plus that value (e.g., -2 means
        two fewer than the number of logical cpus). None is the same as 0.
:returns: the number of cpus.
"""
    try:
        _concurrency = int(_concurrency or 0)
    except (TypeError, ValueError):
        # Non-numeric values fall back to using all logical cpus. Unlike an
        # isdigit check, int() also accepts negative values such as '-2'.
        _concurrency = 0
if _concurrency > 0:
return _concurrency
return max(1, large_image.config.cpu_count(logical=True) + _concurrency)
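# Illustrative results, assuming a machine with 8 logical cpus:
#   _concurrency_to_value(_concurrency=3)     # 3
#   _concurrency_to_value(_concurrency=None)  # 8
#   _concurrency_to_value(_concurrency=-2)    # 6 (logical cpus less two)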
def _get_thread_pool(memoryLimit=None, parentConcurrency=None, numItems=None, **kwargs):
"""
Allocate a thread pool based on the specific concurrency.
    :param memoryLimit: if not None, limit the concurrency to no more than one
        worker per memoryLimit bytes of total memory.
"""
concurrency = _concurrency_to_value(**kwargs)
if parentConcurrency and parentConcurrency > 1 and concurrency > 1:
concurrency = max(1, int(math.ceil(concurrency / parentConcurrency)))
if memoryLimit:
if parentConcurrency:
memoryLimit *= parentConcurrency
concurrency = min(concurrency, large_image.config.total_memory() // memoryLimit)
if numItems and numItems >= 1 and concurrency > numItems:
concurrency = numItems
concurrency = max(1, concurrency)
return concurrent.futures.ThreadPoolExecutor(max_workers=concurrency)
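# A worked example with hypothetical numbers: on a machine with 8 cpus and
# 32 GiB of total memory, memoryLimit=FrameMemoryEstimate (3 GiB) allows
# 32 GiB // 3 GiB = 10 workers, so the pool gets min(8, 10) = 8 workers.
# With parentConcurrency=4, the cpu share drops to ceil(8 / 4) = 2, while
# the memory limit is scaled up by the parent's concurrency before the check.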
def _pool_log(left, total, label):
"""
Log processing within a pool.
:param left: units left to process.
    :param total: total units to process.
:param label: label to log describing what is being processed.
"""
if not hasattr(logger, '_pool_log_starttime'):
logger._pool_log_starttime = time.time()
if not hasattr(logger, '_pool_log_lastlog'):
logger._pool_log_lastlog = time.time()
if time.time() - logger._pool_log_lastlog < 10:
return
elapsed = time.time() - logger._pool_log_starttime
logger.debug('%d/%d %s left %4.2fs', left, total, label, elapsed)
logger._pool_log_lastlog = time.time()
def _pool_add(tasks, newtask):
"""
    Add a new task to a pool, then drain any finished tasks from the front of
    the task list.
    :param tasks: a list containing either lists or tuples, the last element
        of which is a task submitted to the pool. Altered.
    :param newtask: a list or tuple to add to the task list.
"""
tasks.append(newtask)
while len(tasks):
try:
tasks[0][-1].result(0)
except concurrent.futures.TimeoutError:
break
tasks.pop(0)
def _drain_pool(pool, tasks, label=''):
"""
    Wait for all tasks in a pool to complete, then shut down the pool.
:param pool: a concurrent futures pool.
:param tasks: a list containing either lists or tuples, the last element
of which is a task submitted to the pool. Altered.
"""
numtasks = len(tasks)
_pool_log(len(tasks), numtasks, label)
while len(tasks):
# This allows better stopping on a SIGTERM
try:
tasks[0][-1].result(0.1)
except concurrent.futures.TimeoutError:
continue
tasks.pop(0)
_pool_log(len(tasks), numtasks, label)
pool.shutdown(False)
def _convert_to_jp2k(path, **kwargs):
"""
    Given a tiled tiff file without compression, convert it to jp2k compression
    using the glymur library. This expects a tiff as written by vips without
    any subifds.
    :param path: the path of the tiff file. The file is altered.
    :param psnr: if set, the target psnr. 0 for lossless.
    :param cr: if set, the target compression ratio. 1 for lossless.
"""
info = tifftools.read_tiff(path)
jp2kargs = {}
if 'psnr' in kwargs:
jp2kargs['psnr'] = [int(kwargs['psnr'])]
elif 'cr' in kwargs:
jp2kargs['cratios'] = [int(kwargs['cr'])]
tilecount = sum(len(ifd['tags'][tifftools.Tag.TileOffsets.value]['data'])
for ifd in info['ifds'])
processed = 0
lastlog = 0
tasks = []
lock = threading.Lock()
pool = _get_thread_pool(**kwargs)
with open(path, 'r+b') as fptr:
for ifd in info['ifds']:
ifd['tags'][tifftools.Tag.Compression.value]['data'][0] = (
tifftools.constants.Compression.JP2000)
            # numpy shape is (height, width, channels); TileLength is rows
            shape = (
                ifd['tags'][tifftools.Tag.TileLength.value]['data'][0],
                ifd['tags'][tifftools.Tag.TileWidth.value]['data'][0],
                len(ifd['tags'][tifftools.Tag.BitsPerSample.value]['data']))
dtype = np.uint16 if ifd['tags'][
tifftools.Tag.BitsPerSample.value]['data'][0] == 16 else np.uint8
for idx, offset in enumerate(ifd['tags'][tifftools.Tag.TileOffsets.value]['data']):
tmppath = path + '%d.jp2k' % processed
tasks.append((ifd, idx, processed, tmppath, pool.submit(
_convert_to_jp2k_tile, lock, fptr, tmppath, offset,
ifd['tags'][tifftools.Tag.TileByteCounts.value]['data'][idx],
shape, dtype, jp2kargs)))
processed += 1
while len(tasks):
try:
tasks[0][-1].result(0.1)
except concurrent.futures.TimeoutError:
continue
ifd, idx, processed, tmppath, task = tasks.pop(0)
data = open(tmppath, 'rb').read()
os.unlink(tmppath)
            # Remove the first comment marker, since it adds needless bytes.
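            # In a JP2K codestream, 0xff64 is the comment (COM) marker. The
            # two bytes that follow are a big-endian segment length that
            # includes itself but not the marker, so the whole segment spans
            # data[compos:compos + 2 + comlen], and the next marker (0xff...)
            # should begin immediately after it.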
compos = data.find(b'\xff\x64')
if compos >= 0 and compos + 4 < len(data):
comlen = struct.unpack('>H', data[compos + 2:compos + 4])[0]
if compos + 2 + comlen + 1 < len(data) and data[compos + 2 + comlen] == 0xff:
data = data[:compos] + data[compos + 2 + comlen:]
with lock:
fptr.seek(0, os.SEEK_END)
ifd['tags'][tifftools.Tag.TileOffsets.value]['data'][idx] = fptr.tell()
ifd['tags'][tifftools.Tag.TileByteCounts.value]['data'][idx] = len(data)
fptr.write(data)
if time.time() - lastlog >= 10 and tilecount > 1:
logger.debug('Converted %d of %d tiles to jp2k', processed + 1, tilecount)
lastlog = time.time()
pool.shutdown(False)
fptr.seek(0, os.SEEK_END)
for ifd in info['ifds']:
ifd['size'] = fptr.tell()
info['size'] = fptr.tell()
tmppath = path + '.jp2k.tiff'
tifftools.write_tiff(info, tmppath, bigtiff=False, allowExisting=True)
os.unlink(path)
os.rename(tmppath, path)
def _convert_large_image_tile(tilelock, strips, tile):
"""
Add a single tile to a list of strips for a vips image so that they can be
composited together.
:param tilelock: a lock for thread safety.
    :param strips: a list of strips to add to the final vips image.
:param tile: a tileIterator tile.
"""
data = tile['tile']
if data.dtype.char not in large_image.constants.dtypeToGValue:
data = data.astype('d')
vimg = pyvips.Image.new_from_memory(
np.ascontiguousarray(data).data,
data.shape[1], data.shape[0], data.shape[2],
large_image.constants.dtypeToGValue[data.dtype.char])
vimgTemp = pyvips.Image.new_temp_file('%s.v')
vimg.write(vimgTemp)
vimg = vimgTemp
x = tile['x']
ty = tile['tile_position']['level_y']
with tilelock:
while len(strips) <= ty:
strips.append(None)
if strips[ty] is None:
strips[ty] = vimg
if not x:
return
if vimg.bands > strips[ty].bands:
vimg = vimg[:strips[ty].bands]
elif strips[ty].bands > vimg.bands:
strips[ty] = strips[ty][:vimg.bands]
strips[ty] = strips[ty].insert(vimg, x, 0, expand=True)
def _convert_large_image_frame(frame, numFrames, ts, frameOutputPath, tempPath,
parentConcurrency=None, **kwargs):
"""
    Convert a single frame from a large_image source. This parallelizes tile
    reads. Once all tiles are composited into a single vips image, a tiff
    file is generated.
:param frame: the 0-based frame number.
:param numFrames: the total number of frames; used for logging.
:param ts: the open tile source.
:param frameOutputPath: the destination name for the tiff file.
:param tempPath: a temporary file in a temporary directory.
:param parentConcurrency: amount of concurrency used by parent task.
"""
    # The iterator tile size is a balance between memory use and the overhead
    # of additional calls and file handles.
_iterTileSize = 4096
logger.info('Processing frame %d/%d', frame + 1, numFrames)
strips = []
pool = _get_thread_pool(
memoryLimit=FrameMemoryEstimate,
# allow multiple tiles even if we are using all the cores, as it
# balances I/O and computation
        parentConcurrency=(parentConcurrency // 2 if parentConcurrency else None),
**kwargs)
tasks = []
tilelock = threading.Lock()
for tile in ts.tileIterator(tile_size=dict(width=_iterTileSize), frame=frame):
_pool_add(tasks, (pool.submit(_convert_large_image_tile, tilelock, strips, tile), ))
_drain_pool(pool, tasks, f'tiles from frame {frame + 1}/{numFrames}')
minbands = min(strip.bands for strip in strips)
maxbands = max(strip.bands for strip in strips)
if minbands != maxbands:
strips = [strip[:minbands] for strip in strips]
img = strips[0]
for stripidx in range(1, len(strips)):
img = img.insert(strips[stripidx], 0, stripidx * _iterTileSize, expand=True)
_convert_via_vips(
img, frameOutputPath, tempPath, status='%d/%d' % (frame + 1, numFrames), **kwargs)
def _convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs):
"""
    Take a large_image source and convert it by resaving each tile's image with
    vips.
:param inputPath: the path to the input file or base file of a set.
:param outputPath: the path of the output file.
:param tempPath: a temporary file in a temporary directory.
:param lidata: data from a large_image tilesource including associated
images.
"""
ts = lidata['tilesource']
numFrames = len(lidata['metadata'].get('frames', [0]))
outputList = []
tasks = []
startFrame = 0
endFrame = numFrames
if kwargs.get('onlyFrame') is not None and str(kwargs.get('onlyFrame')):
startFrame = int(kwargs.get('onlyFrame'))
endFrame = startFrame + 1
pool = _get_thread_pool(memoryLimit=FrameMemoryEstimate,
numItems=endFrame - startFrame, **kwargs)
for frame in range(startFrame, endFrame):
frameOutputPath = tempPath + '-%d-%s.tiff' % (
frame + 1, time.strftime('%Y%m%d-%H%M%S'))
_pool_add(tasks, (pool.submit(
_convert_large_image_frame, frame, numFrames, ts, frameOutputPath,
tempPath, pool._max_workers, **kwargs), ))
outputList.append(frameOutputPath)
_drain_pool(pool, tasks, 'frames')
_output_tiff(outputList, outputPath, tempPath, lidata, **kwargs)
def _output_tiff(inputs, outputPath, tempPath, lidata, extraImages=None, **kwargs):
"""
Given a list of input tiffs and data as parsed by _data_from_large_image,
generate an output tiff file with the associated images, correct scale, and
other metadata.
:param inputs: a list of pyramidal input files.
:param outputPath: the final destination.
:param tempPath: a temporary file in a temporary directory.
:param lidata: large_image data including metadata and associated images.
:param extraImages: an optional dictionary of keys and paths to add as
extra associated images.
"""
logger.debug('Reading %s', inputs[0])
info = tifftools.read_tiff(inputs[0])
ifdIndices = [0]
imgDesc = info['ifds'][0]['tags'].get(tifftools.Tag.ImageDescription.value)
description = _make_li_description(
len(info['ifds']), len(inputs), lidata,
(len(extraImages) if extraImages else 0) + (len(lidata['images']) if lidata else 0),
imgDesc['data'] if imgDesc else None, **kwargs)
info['ifds'][0]['tags'][tifftools.Tag.ImageDescription.value] = {
'data': description,
'datatype': tifftools.Datatype.ASCII,
}
if lidata:
_set_resolution(info['ifds'], lidata['metadata'])
if len(inputs) > 1:
if kwargs.get('subifds') is not False:
info['ifds'][0]['tags'][tifftools.Tag.SubIFD.value] = {
'ifds': info['ifds'][1:],
}
info['ifds'][1:] = []
for idx, inputPath in enumerate(inputs):
if not idx:
continue
logger.debug('Reading %s', inputPath)
nextInfo = tifftools.read_tiff(inputPath)
if lidata:
_set_resolution(nextInfo['ifds'], lidata['metadata'])
if len(lidata['metadata'].get('frames', [])) > idx:
nextInfo['ifds'][0]['tags'][tifftools.Tag.ImageDescription.value] = {
'data': json.dumps(
{'frame': lidata['metadata']['frames'][idx]},
separators=(',', ':'), sort_keys=True, default=json_serial),
'datatype': tifftools.Datatype.ASCII,
}
ifdIndices.append(len(info['ifds']))
if kwargs.get('subifds') is not False:
nextInfo['ifds'][0]['tags'][tifftools.Tag.SubIFD.value] = {
'ifds': nextInfo['ifds'][1:],
}
info['ifds'].append(nextInfo['ifds'][0])
else:
info['ifds'].extend(nextInfo['ifds'])
ifdIndices.append(len(info['ifds']))
assocList = []
if lidata:
assocList += list(lidata['images'].items())
if extraImages:
assocList += list(extraImages.items())
for key, assocPath in assocList:
assocInfo = tifftools.read_tiff(assocPath)
assocInfo['ifds'][0]['tags'][tifftools.Tag.ImageDescription.value] = {
'data': key,
'datatype': tifftools.Datatype.ASCII,
}
info['ifds'] += assocInfo['ifds']
if format_hook('modify_tiff_before_write', info, ifdIndices, tempPath,
lidata, **kwargs) is False:
return
logger.debug('Writing %s', outputPath)
tifftools.write_tiff(info, outputPath, bigEndian=False, bigtiff=False, allowExisting=True)
def _set_resolution(ifds, metadata):
"""
Given metadata with a scale in mm_x and/or mm_y, set the resolution for
each ifd, assuming that each one is half the scale of the previous one.
:param ifds: a list of ifds from a single pyramid. The resolution may be
set on each one.
:param metadata: metadata with a scale specified by mm_x and/or mm_y.
"""
if metadata.get('mm_x') or metadata.get('mm_y'):
for idx, ifd in enumerate(ifds):
ifd['tags'][tifftools.Tag.ResolutionUnit.value] = {
'data': [tifftools.constants.ResolutionUnit.Centimeter],
'datatype': tifftools.Datatype.SHORT,
}
for mkey, tkey in (('mm_x', 'XResolution'), ('mm_y', 'YResolution')):
if metadata[mkey]:
val = fractions.Fraction(
10.0 / (metadata[mkey] * 2 ** idx)).limit_denominator()
if val.numerator >= 2**32 or val.denominator >= 2**32:
origval = val
denom = 1000000
                        while ((val.numerator >= 2**32 or
                                val.denominator >= 2**32) and denom > 1):
denom = int(denom / 10)
val = origval.limit_denominator(denom)
if val.numerator >= 2**32 or val.denominator >= 2**32:
continue
ifd['tags'][tifftools.Tag[tkey].value] = {
'data': [val.numerator, val.denominator],
'datatype': tifftools.Datatype.RATIONAL,
}
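# A worked example with a hypothetical scale: mm_x of 0.00025 (0.25 microns
# per pixel) yields an XResolution of 10.0 / 0.00025 = 40000 pixels per
# centimeter for the full-resolution ifd, 20000 for the next pyramid level,
# and so on.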
def _import_pyvips():
"""
Import pyvips on demand.
"""
global pyvips
if pyvips is None:
import pyvips
def _is_eightbit(path, tiffinfo=None):
"""
    Check if a path has an unsigned 8-bit per sample data size. If any known
    sample is otherwise, or if this cannot be determined, this returns False.
:param path: The path to the file
:param tiffinfo: data extracted from tifftools.read_tiff(path).
:returns: True if known to be 8 bits per sample.
"""
if not tiffinfo:
return False
try:
if (tifftools.Tag.SampleFormat.value in tiffinfo['ifds'][0]['tags'] and
not all(val == tifftools.constants.SampleFormat.uint for val in
tiffinfo['ifds'][0]['tags'][tifftools.Tag.SampleFormat.value]['data'])):
return False
if tifftools.Tag.BitsPerSample.value in tiffinfo['ifds'][0]['tags'] and not all(
val == 8 for val in
tiffinfo['ifds'][0]['tags'][tifftools.Tag.BitsPerSample.value]['data']):
return False
except Exception:
return False
return True
def _is_lossy(path, tiffinfo=None):
"""
    Check if a path uses lossy compression. This is imperfect: it only checks
    whether the file is a TIFF stored with one of the JPEG compression schemes.
:param path: The path to the file
:param tiffinfo: data extracted from tifftools.read_tiff(path).
:returns: True if known to be lossy.
"""
if not tiffinfo:
return False
try:
return bool(tifftools.constants.Compression[
tiffinfo['ifds'][0]['tags'][
tifftools.Tag.Compression.value]['data'][0]].lossy)
except Exception:
return False
def _is_multiframe(path):
"""
Check if a path is a multiframe file.
:param path: The path to the file
:returns: True if multiframe.
"""
_import_pyvips()
try:
with _newFromFileLock:
image = pyvips.Image.new_from_file(path)
except Exception:
try:
open(path, 'rb').read(1)
raise
except Exception:
logger.warning('Is the file reachable and readable? (%r)', path)
raise OSError(path) from None
pages = 1
if 'n-pages' in image.get_fields():
pages = image.get_value('n-pages')
return pages > 1
def _list_possible_sizes(width, height):
"""
    Given a width and height, return a list of sizes that could be reasonable
    power-of-two reductions of that size. This includes the halved values
    rounded both up and down.
"""
results = [(width, height)]
pos = 0
while pos < len(results):
w, h = results[pos]
if w > 1 or h > 1:
w2f = int(math.floor(w / 2))
h2f = int(math.floor(h / 2))
w2c = int(math.ceil(w / 2))
h2c = int(math.ceil(h / 2))
for w2, h2 in [(w2f, h2f), (w2f, h2c), (w2c, h2f), (w2c, h2c)]:
if (w2, h2) not in results:
results.append((w2, h2))
pos += 1
return results
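# For example, _list_possible_sizes(4, 4) returns [(4, 4), (2, 2), (1, 1)];
# an odd size such as (5, 5) also includes the floor/ceil combinations
# (2, 2), (2, 3), (3, 2), and (3, 3) for the first halving.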
def json_serial(obj):
"""
    Fallback serializer for json. This serializes datetime objects to ISO
    format.
:param obj: an object to serialize.
:returns: a serialized string.
"""
if isinstance(obj, (datetime.datetime, datetime.date)):
return obj.isoformat()
return str(obj)
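# For example,
#   json.dumps({'when': datetime.date(2021, 6, 1)}, default=json_serial)
# produces '{"when": "2021-06-01"}'.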
def _make_li_description(
framePyramidSize, numFrames, lidata=None, numAssociatedImages=0,
imageDescription=None, **kwargs):
"""
Given the number of frames, the number of levels per frame, the associated
image list, and any metadata from large_image, construct a json string with
information about the whole image.
:param framePyramidSize: the number of layers per frame.
:param numFrames: the number of frames.
:param lidata: the data returned from _data_from_large_image.
:param numAssociatedImages: the number of associated images.
:param imageDescription: if present, the original description.
:returns: a json string
"""
results = {
'large_image_converter': {
'conversion_epoch': time.time(),
'version': __version__,
'levels': framePyramidSize,
'frames': numFrames,
'associated': numAssociatedImages,
'arguments': {
k: v for k, v in kwargs.items()
if not k.startswith('_') and ('_' + k) not in kwargs and
k not in {'overwrite'}},
},
}
if lidata:
results['metadata'] = lidata['metadata']
if len(lidata['metadata'].get('frames', [])) >= 1:
results['frame'] = lidata['metadata']['frames'][0]
if len(lidata['metadata'].get('channels', [])) >= 1:
results['channels'] = lidata['metadata']['channels']
results['internal'] = lidata['internal_metadata']
if imageDescription:
results['image_description'] = imageDescription
return json.dumps(results, separators=(',', ':'), sort_keys=True, default=json_serial)
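# The resulting description is compact, sorted JSON shaped roughly like this
# (values are illustrative):
#   {"large_image_converter": {"arguments": {...}, "associated": 2,
#     "conversion_epoch": 1700000000.0, "frames": 1, "levels": 7,
#     "version": "..."}, "metadata": {...}}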
def convert(inputPath, outputPath=None, **kwargs): # noqa: C901
"""
Take a source input file and output a pyramidal tiff file.
:param inputPath: the path to the input file or base file of a set.
:param outputPath: the path of the output file.
Optional parameters that can be specified in kwargs:
:param tileSize: the horizontal and vertical tile size.
:param format: one of 'tiff' or 'aperio'. Default is 'tiff'.
:param onlyFrame: None for all frames or the 0-based frame number to just
convert a single frame of the source.
:param compression: one of 'jpeg', 'deflate' (zip), 'lzw', 'packbits',
'zstd', or 'none'.
:param quality: a jpeg or webp quality passed to vips. 0 is small, 100 is
high quality. 90 or above is recommended. For webp, 0 is lossless.
:param level: compression level for zstd, 1-22 (default is 10) and deflate,
1-9.
:param predictor: one of 'none', 'horizontal', 'float', or 'yes' used for
lzw and deflate. Default is horizontal for non-geospatial data and yes
for geospatial.
    :param psnr: psnr value for jp2k; higher values result in larger files.
        0 is lossless.
:param cr: jp2k compression ratio. 1 is lossless, 100 will try to make
a file 1% the size of the original, etc.
:param subifds: if True (the default), when creating a multi-frame file,
store lower resolution tiles in sub-ifds. If False, store all data in
primary ifds.
    :param overwrite: if not True, raise an exception if the output path
        already exists.
Additional optional parameters:
:param geospatial: if not None, a boolean indicating if this file is
geospatial. If not specified or None, this will be checked.
:param _concurrency: the number of cpus to use during conversion. None to
use the logical cpu count.
:returns: outputPath if successful
"""
logger._pool_log_starttime = time.time()
if kwargs.get('_concurrency'):
os.environ['VIPS_CONCURRENCY'] = str(_concurrency_to_value(**kwargs))
geospatial = kwargs.get('geospatial')
if geospatial is None:
geospatial = is_geospatial(inputPath)
logger.debug('Is file geospatial: %r', geospatial)
suffix = format_hook('adjust_params', geospatial, kwargs, **kwargs)
if suffix is False:
return
suffix = suffix or ('.tiff' if not geospatial else '.geo.tiff')
if not outputPath:
outputPath = os.path.splitext(inputPath)[0] + suffix
if outputPath.endswith('.geo' + suffix):
outputPath = outputPath[:len(outputPath) - len(suffix) - 4] + suffix
if outputPath == inputPath:
outputPath = (os.path.splitext(inputPath)[0] + '.' +
time.strftime('%Y%m%d-%H%M%S') + suffix)
if os.path.exists(outputPath) and not kwargs.get('overwrite'):
msg = 'Output file already exists.'
raise Exception(msg)
try:
tiffinfo = tifftools.read_tiff(inputPath)
except Exception:
tiffinfo = None
eightbit = _is_eightbit(inputPath, tiffinfo)
if not kwargs.get('compression', None):
kwargs = kwargs.copy()
lossy = _is_lossy(inputPath, tiffinfo)
logger.debug('Is file lossy: %r', lossy)
        logger.debug('Is file 8 bits per sample: %r', eightbit)
kwargs['_compression'] = None
kwargs['compression'] = 'jpeg' if lossy and eightbit else 'lzw'
if geospatial:
_generate_geotiff(inputPath, outputPath, eightbit=eightbit or None, **kwargs)
else:
with TemporaryDirectory() as tempDir:
tempPath = os.path.join(tempDir, os.path.basename(outputPath))
lidata = _data_from_large_image(str(inputPath), tempPath, **kwargs)
logger.log(logging.DEBUG - 1, 'large_image information for %s: %r',
inputPath, lidata)
if lidata and (not is_vips(
inputPath, (lidata['metadata']['sizeX'], lidata['metadata']['sizeY'])) or (
len(lidata['metadata'].get('frames', [])) >= 2 and
not _is_multiframe(inputPath))):
_convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs)
elif _is_multiframe(inputPath):
_generate_multiframe_tiff(inputPath, outputPath, tempPath, lidata, **kwargs)
else:
try:
_generate_tiff(inputPath, outputPath, tempPath, lidata, **kwargs)
except Exception:
                    if lidata:
                        _convert_large_image(inputPath, outputPath, tempPath, lidata, **kwargs)
                    else:
                        raise
return outputPath
def is_geospatial(path):
"""
Check if a path is likely to be a geospatial file.
:param path: The path to the file
:returns: True if geospatial.
"""
try:
from osgeo import gdal, gdalconst
except ImportError:
logger.warning('Cannot import GDAL.')
return False
gdal.UseExceptions()
try:
ds = gdal.Open(path, gdalconst.GA_ReadOnly)
except Exception:
return False
if ds and (
(ds.GetGCPs() and ds.GetGCPProjection()) or
ds.GetProjection() or
ds.GetDriver().ShortName in {'NITF', 'netCDF'}):
return True
return False
def is_vips(path, matchSize=None):
"""
Check if a path is readable by vips.
:param path: The path to the file
:param matchSize: if not None, the image read by vips must be the specified
(width, height) tuple in pixels.
:returns: True if readable by vips.
"""
_import_pyvips()
try:
with _newFromFileLock:
image = pyvips.Image.new_from_file(path)
# image(0, 0) will throw if vips can't decode the image
if not image.width or not image.height or image(0, 0) is None:
return False
if matchSize and (matchSize[0] != image.width or matchSize[1] != image.height):
return False
except Exception:
return False
return True
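# A minimal usage sketch (hypothetical paths; assumes this package is
# installed as large_image_converter):
#
#   import large_image_converter
#   out = large_image_converter.convert(
#       '/data/slide.svs', '/data/slide.tiff',
#       compression='jpeg', quality=90, tileSize=256)
#   # out == '/data/slide.tiff' on success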