# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Image loading service for RadioViz application.
This module provides asynchronous image loading capabilities supporting multiple
file formats including TIFF, XYZ, and standard image formats. It implements
background thread workers with progress reporting and cancellation support.
"""
from __future__ import annotations

import logging
from functools import partial
from io import BytesIO
from pathlib import Path
from typing import Any, cast

import numpy as np
import tifffile
from imageio.v3 import imread
from PySide6.QtCore import QObject, QThread, Signal
from skimage.transform import downscale_local_mean
from superqt.utils import thread_worker

from radioviz.services.tiff_metadata import extract_tiff_metadata, extract_xyz_dat_metadata
from radioviz.services.xyz_format import decode_xyz_bytes


@thread_worker
def downscale_image(input_image: np.ndarray, factor: int) -> np.ndarray:
    """
    Downscale an image with :func:`skimage.transform.downscale_local_mean`.

    The function body simply forwards both arguments to scikit-image and
    returns the result typed as a numpy array.

    .. note::
        NOTE(review): the function is wrapped by superqt's ``thread_worker``
        decorator, so callers presumably receive a worker object that runs
        this body in a background thread rather than the array itself --
        confirm against the superqt documentation.

    :param input_image: Image data to downscale
    :type input_image: numpy.ndarray
    :param factor: Downscaling factor forwarded unchanged to scikit-image
    :type factor: int
    :return: The downscaled image
    :rtype: numpy.ndarray
    """
    # scikit-image ships without type information for this call, hence the
    # explicit cast below and the mypy suppression on this line.
    reduced = downscale_local_mean(input_image, factor)  # type: ignore[no-untyped-call]
    return cast(np.ndarray, reduced)
class _ImageLoaderWorker(QThread):
    """
    Worker thread for loading images asynchronously.

    The worker reads the file in fixed-size chunks (reporting progress and
    checking for an interruption request before each chunk), decodes the
    bytes according to the file suffix and finally emits either
    :attr:`image_loaded` or :attr:`failed`.

    .. seealso::
        :class:`ImageLoaderService` for the main service interface
    """

    image_loaded = Signal(Path, object, dict)
    """Signal emitted when image loading is successful.

    :param path: Path to the loaded image file
    :type path: Path
    :param array: Loaded image data as numpy array
    :type array: numpy.ndarray
    :param metadata: Extracted image metadata
    :type metadata: dict[str, Any]
    """

    failed = Signal(Path, Exception)
    """Signal emitted when image loading fails.

    :param path: Path to the image file that failed to load
    :type path: Path
    :param exception: Exception that occurred during loading
    :type exception: Exception
    """

    progress = Signal(int)
    """Signal emitted to report loading progress.

    :param percentage: Loading progress percentage (0-100)
    :type percentage: int
    """

    # Number of bytes requested from the file per read call (64 KiB).
    _CHUNK_SIZE = 1024 * 64
    # Lower-case suffixes that are decoded with tifffile.
    _TIFF_SUFFIXES = frozenset({'.tif', '.tiff'})

    def __init__(self, path: Path) -> None:
        """
        Initialize the image loader worker.

        :param path: Path to the image file to load
        :type path: Path
        """
        super().__init__()
        self.path: Path = Path(path)

    def run(self) -> None:
        """
        Execute the image loading operation.

        Reads the file, decodes it and emits :attr:`image_loaded` on success.
        Any exception raised along the way is reported through :attr:`failed`.
        If an interruption was requested (see :meth:`cancel`), :attr:`failed`
        is emitted with ``RuntimeError('Load cancelled')`` instead of the
        image, whether the request arrived while reading or while decoding.
        """
        try:
            data = self._read_file()
            if data is None:
                self._emit_cancelled()
                return

            image, metadata = self._decode(data)

            # Decoding cannot be interrupted, so re-check afterwards: a load
            # cancelled during decoding must not deliver an image.
            if self.isInterruptionRequested():
                self._emit_cancelled()
                return

            self.image_loaded.emit(self.path, image, metadata)
        except Exception as exc:
            self.failed.emit(self.path, exc)

    def cancel(self) -> None:
        """
        Request cancellation of the image loading operation.

        This method requests interruption of the worker thread, which will cause
        the loading to stop and emit a failure signal with a RuntimeError.
        """
        self.requestInterruption()

    def _emit_cancelled(self) -> None:
        """Report a cancelled load through the :attr:`failed` signal."""
        self.failed.emit(self.path, RuntimeError('Load cancelled'))

    def _read_file(self) -> bytes | None:
        """
        Read the whole file in chunks while reporting progress.

        :return: The file content, or ``None`` if an interruption was
            requested before the file was completely read
        :rtype: bytes | None
        """
        file_size = self.path.stat().st_size
        buffer = bytearray()
        last_percent = -1
        with open(self.path, 'rb') as stream:
            while True:
                if self.isInterruptionRequested():
                    return None
                chunk = stream.read(self._CHUNK_SIZE)
                if not chunk:
                    break
                buffer.extend(chunk)
                if file_size > 0:
                    # Clamp: the file may have grown since stat() was called.
                    percent = min(100, len(buffer) * 100 // file_size)
                else:
                    # Reported size is 0 although data was read; a percentage
                    # cannot be computed, so avoid dividing by zero.
                    percent = 0
                # Only notify observers when the displayed value changes.
                if percent != last_percent:
                    last_percent = percent
                    self.progress.emit(percent)
        # Convert once so every decoder below shares the same immutable buffer.
        return bytes(buffer)

    def _decode(self, data: bytes) -> tuple[np.ndarray, dict[str, Any]]:
        """
        Decode raw file content into an image array plus metadata.

        The decoder is selected from the lower-cased suffix of :attr:`path`:
        ``.xyz`` uses ``decode_xyz_bytes``, ``.tif``/``.tiff`` use ``tifffile``
        and everything else goes through ``imageio.v3.imread``.

        :param data: Complete content of the image file
        :type data: bytes
        :return: The decoded image and its metadata (empty when none is available)
        :rtype: tuple[numpy.ndarray, dict[str, Any]]
        """
        suffix = self.path.suffix.lower()
        if suffix == '.xyz':
            image = decode_xyz_bytes(data)
            return image, self._load_xyz_sidecar_metadata(image.shape)
        if suffix in self._TIFF_SUFFIXES:
            # Metadata is extracted before decoding, as in the original flow.
            metadata = extract_tiff_metadata(data)
            return tifffile.imread(BytesIO(data)), metadata
        return imread(BytesIO(data)), {}

    def _load_xyz_sidecar_metadata(self, image_shape: Any) -> dict[str, Any]:
        """
        Load metadata from the ``.dat`` file sitting next to an XYZ image.

        This is best effort: a missing sidecar or any error while reading it
        yields empty metadata instead of failing the image load. Errors are
        logged so they do not go unnoticed.

        :param image_shape: Shape of the decoded XYZ image, forwarded to
            ``extract_xyz_dat_metadata``
        :return: The sidecar metadata, with any reported warnings stored
            under the ``'_warnings'`` key, or an empty dict
        :rtype: dict[str, Any]
        """
        dat_path = self.path.with_suffix('.dat')
        if not dat_path.exists():
            return {}
        try:
            metadata, warnings = extract_xyz_dat_metadata(dat_path, image_shape=image_shape)
            if warnings:
                metadata['_warnings'] = warnings
        except Exception:
            logging.getLogger(__name__).warning(
                'Could not read XYZ sidecar metadata from %s', dat_path, exc_info=True
            )
            return {}
        return cast('dict[str, Any]', metadata)
class ImageLoaderService(QObject):
    """
    Service for managing asynchronous image loading operations.

    This service provides methods to start image loading operations in background
    threads, track their progress, and handle completion or failure events. It
    supports cancellation of ongoing operations and manages the lifecycle of
    worker threads.

    Workers are tracked by path. All paths are normalised with
    :class:`pathlib.Path` so that tracking, cancellation and progress
    filtering use the same key as the worker's own ``path`` attribute. If
    :meth:`load` is called again for a path whose worker is still running,
    the new worker replaces the old one in the tracking table; the old worker
    keeps running but can no longer be cancelled through the service.

    .. seealso::
        :class:`_ImageLoaderWorker` for the internal worker implementation
    """

    image_loaded = Signal(Path, object, dict)
    """Signal emitted when an image is successfully loaded.

    :param path: Path to the loaded image file
    :type path: Path
    :param array: Loaded image data as numpy array
    :type array: numpy.ndarray
    :param metadata: Extracted image metadata
    :type metadata: dict[str, Any]
    """

    image_load_failed = Signal(Path, Exception)
    """Signal emitted when image loading fails.

    :param path: Path to the image file that failed to load
    :type path: Path
    :param exception: Exception that occurred during loading
    :type exception: Exception
    """

    progress = Signal(Path, int)
    """Signal emitted to report loading progress for a specific image.

    :param path: Path to the image file being loaded
    :type path: Path
    :param percentage: Loading progress percentage (0-100)
    :type percentage: int
    """

    def __init__(self) -> None:
        """
        Initialize the image loader service.

        Creates internal data structures to track active workers and canceled
        operations.
        """
        super().__init__()
        self._workers: dict[Path, _ImageLoaderWorker] = {}
        self._canceled: set[Path] = set()

    def load(self, path: Path) -> None:
        """
        Start loading an image in a background thread.

        This method creates a new worker thread to load the specified image file
        asynchronously. Progress updates and completion/failure signals are
        forwarded through the service's signals.

        :param path: Path to the image file to load
        :type path: Path
        """
        # Normalise so the key matches worker.path, which clean_up() and
        # _forward_progress() rely on.
        path = Path(path)
        self._canceled.discard(path)  # a fresh load clears any earlier cancel flag

        worker = _ImageLoaderWorker(path)
        worker.setParent(self)
        worker.image_loaded.connect(self.image_loaded)
        worker.progress.connect(partial(self._forward_progress, worker.path))
        worker.failed.connect(self.image_load_failed)
        worker.finished.connect(partial(self.clean_up, worker))

        # Register before starting so the worker is already tracked whenever
        # its finished signal is handled.
        self._workers[path] = worker
        worker.start()

    def _forward_progress(self, path: Path, progress: int) -> None:
        """
        Forward progress signal from worker to service observers.

        This internal method ensures that progress updates are only forwarded
        if the operation hasn't been canceled.

        :param path: Path to the image file being loaded
        :type path: Path
        :param progress: Loading progress percentage (0-100)
        :type progress: int
        """
        if path not in self._canceled:
            self.progress.emit(path, progress)

    def clean_up(self, worker: _ImageLoaderWorker) -> None:
        """
        Clean up resources after worker thread finishes.

        Removes the worker from tracking structures and schedules it for deletion.
        The tracking entry and cancel flag are only cleared when they still
        belong to this worker, so a superseded worker finishing late does not
        untrack (or un-cancel) a newer load of the same path.

        :param worker: The worker thread that finished
        :type worker: _ImageLoaderWorker
        """
        path = worker.path
        if self._workers.get(path) is worker:
            del self._workers[path]
            self._canceled.discard(path)
        worker.deleteLater()

    def cancel(self, path: Path) -> None:
        """
        Cancel an ongoing image loading operation.

        If there is an active worker for the specified path, this method will
        request cancellation of that operation. Nothing happens when no worker
        is tracked for the path.

        :param path: Path to the image file whose loading should be cancelled
        :type path: Path
        """
        path = Path(path)
        worker = self._workers.get(path)
        if worker is not None:
            self._canceled.add(path)
            worker.cancel()