Source code for radioviz.tools.tool_discovery
# Copyright 2025–2026 European Union
# Author: Bulgheroni Antonio (antonio.bulgheroni@ec.europa.eu)
# SPDX-License-Identifier: EUPL-1.2
"""
Tool discovery utilities for RadioViz.
This module discovers Tool implementations via Python entry points and
fallback built-ins. External developers can register tools using the
"radioviz.tools" entry point group.
"""
from __future__ import annotations
import sys
from importlib import import_module
from importlib.metadata import EntryPoint, entry_points
from typing import Callable, Iterable, Optional, cast
from radioviz.tools.base_tool import Tool
from radioviz.typing.tool_types import ToolProtocol
# Name of the entry-point group that discover_tools() queries for externally
# provided tools.
ENTRY_POINT_GROUP = 'radioviz.tools'

# Built-in tools, each written as "module.path:AttributeName". The part before
# the colon is imported and the part after it is looked up on that module
# (see _load_tool_from_path). They are loaded in the order listed here.
_BUILTIN_TOOL_PATHS = [
    'radioviz.tools.level_tool:LevelTool',
    'radioviz.tools.profile_tool:ProfileTool',
    'radioviz.tools.region_tool:RegionTool',
    'radioviz.tools.measurement_tool:MeasurementTool',
    'radioviz.tools.autocrop_tool:AutocropTool',
    'radioviz.tools.manual_crop_tool:ManualCropTool',
    'radioviz.tools.align_image_tool:AlignImageTool',
    'radioviz.tools.flip_image_tool:FlipImageTool',
]
"""List of built-ins tools."""
[docs]
def discover_tools(
    include_builtins: bool = True,
    include_entry_points: bool = True,
    progress_cb: Optional[Callable[[str, int, int], None]] = None,
) -> list[ToolProtocol]:
    """
    Collect every available tool as a ready-to-use instance.

    Entry-point tools are loaded first and built-in tools afterwards. Since
    both phases share the same set of registered identifiers, a built-in tool
    whose ``tool_id`` was already registered by an entry point is not added
    a second time.

    Before each load attempt *progress_cb* (when given) is invoked with the
    phase name (``'external'`` or ``'builtin'``), the 1-based position of the
    tool within that phase and the number of tools in that phase.

    :param include_builtins: Whether to load the built-in tools
    :param include_entry_points: Whether to load tools registered as entry points
    :param progress_cb: Optional callback notified before each tool is loaded
    :return: The tools that could be loaded, in load order
    """
    discovered: list[ToolProtocol] = []
    registered_ids: set[str] = set()

    if include_entry_points:
        # Materialise the entry points so the total is known up front.
        plugin_eps = list(_iter_entry_points(ENTRY_POINT_GROUP))
        plugin_count = len(plugin_eps)
        for position, entry_point in enumerate(plugin_eps, start=1):
            _report_progress(progress_cb, 'external', position, plugin_count)
            _load_tool_from_ep(entry_point, discovered, registered_ids)

    if include_builtins:
        builtin_count = len(_BUILTIN_TOOL_PATHS)
        for position, tool_path in enumerate(_BUILTIN_TOOL_PATHS, start=1):
            _report_progress(progress_cb, 'builtin', position, builtin_count)
            _load_tool_from_path(tool_path, discovered, registered_ids)

    return discovered
[docs]
def _iter_entry_points(group: str) -> Iterable[EntryPoint]:
"""
Retrieve entry points for the specified *group*.
This helper abstracts the differences between the modern ``select`` API
(available in Python 3.10+) and the legacy ``dict``-based API, returning an
iterable of :class:`importlib.metadata.EntryPoint` objects.
:param group: The entry‑point group name (e.g. ``"radioviz.tools"``).
:return: An iterable of matching :class:`EntryPoint` objects; empty if none.
"""
eps = entry_points()
if hasattr(eps, 'select'):
return eps.select(group=group)
get = getattr(eps, 'get', None)
if get is None:
return []
return cast(Iterable[EntryPoint], get(group, []))
[docs]
def _load_tool_from_ep(ep: EntryPoint, tools: list[ToolProtocol], seen_ids: set[str]) -> None:
    """
    Resolve the entry point *ep* and try to register the tool it refers to.

    Any exception raised while loading the entry point is reported through
    :func:`_warn` and the entry point is skipped. A successfully loaded object
    is handed over to :func:`_instantiate_tool`, which validates it and
    appends the resulting tool to *tools*.

    :param ep: The entry point to load.
    :param tools: The mutable list collecting instantiated tools.
    :param seen_ids: Set of already-registered tool identifiers to avoid duplicates.
    """
    try:
        loaded = ep.load()
    except Exception as exc:
        _warn(f'Failed to load tool entry point "{ep.name}": {exc}')
    else:
        origin = f'entry point "{ep.name}"'
        _instantiate_tool(loaded, tools, seen_ids, source=origin)
[docs]
def _load_tool_from_path(path: str, tools: list[ToolProtocol], seen_ids: set[str]) -> None:
    """
    Import a built-in tool given as ``module:attribute`` and try to register it.

    The module part is imported and the attribute part is looked up on it; any
    failure in this step is reported through :func:`_warn` and the tool is
    skipped. When the resolved object is a :class:`Tool` subclass whose
    ``tool_id`` is already registered, the function returns silently without
    emitting a warning. Everything else is forwarded to
    :func:`_instantiate_tool`.

    :param path: String in the form ``"module.submodule:ClassName"``.
    :param tools: The mutable list collecting instantiated tools.
    :param seen_ids: Set of already-registered tool identifiers.
    """
    try:
        module_path, attribute = path.split(':', 1)
        candidate = getattr(import_module(module_path), attribute)
    except Exception as exc:
        _warn(f'Failed to load built-in tool "{path}": {exc}')
        return

    is_tool_class = isinstance(candidate, type) and issubclass(candidate, Tool)
    if is_tool_class:
        declared_id = getattr(candidate, 'tool_id', None)
        # An already registered identifier is skipped quietly here, without
        # the duplicate warning that _instantiate_tool would emit.
        if declared_id and declared_id in seen_ids:
            return

    _instantiate_tool(candidate, tools, seen_ids, source=f'builtin "{path}"')
[docs]
def _instantiate_tool(
    obj: object,
    tools: list[ToolProtocol],
    seen_ids: set[str],
    source: str,
) -> None:
    """
    Instantiate *obj* as a :class:`Tool` and register it.

    *obj* may be a concrete subclass of :class:`Tool` or a callable factory that
    returns a :class:`Tool` instance. Both are called without arguments. The
    function ensures that the resulting tool has a unique ``tool_id``;
    duplicates are ignored with a warning.

    The function never raises because of a faulty constructor or factory: an
    exception raised while calling *obj* is reported through :func:`_warn` and
    the tool is skipped, so that one broken tool cannot abort the discovery of
    all the others.

    :param obj: The object to instantiate (class or factory).
    :param tools: List to which a successfully instantiated tool is appended.
    :param seen_ids: Set tracking already-registered ``tool_id`` values.
    :param source: Human-readable description of the origin (used in warnings).
    """
    is_tool_class = isinstance(obj, type) and issubclass(obj, Tool)

    if is_tool_class:
        # Check the class-level identifier first, so that a duplicate is
        # rejected without paying for (or suffering from) its construction.
        class_tool_id = getattr(obj, 'tool_id', None)
        if class_tool_id and class_tool_id in seen_ids:
            _warn(f'Duplicate tool_id "{class_tool_id}" from {source}; skipping.')
            return
    elif not callable(obj):
        _warn(f'Tool from {source} is not a Tool subclass or factory.')
        return

    try:
        candidate = obj()  # type: ignore[operator]
    except Exception as exc:
        # Third-party constructors/factories are untrusted code: report the
        # failure and move on, as done for failures at import time.
        _warn(f'Failed to instantiate tool from {source}: {exc}')
        return

    # A factory may return anything; only genuine Tool instances are accepted.
    if not is_tool_class and not isinstance(candidate, Tool):
        _warn(f'Tool from {source} is not a Tool subclass or factory.')
        return

    tool = cast(ToolProtocol, candidate)
    # The instance identifier is checked as well: factories expose no
    # identifier before being called.
    if tool.tool_id in seen_ids:
        _warn(f'Duplicate tool_id "{tool.tool_id}" from {source}; skipping.')
        return

    seen_ids.add(tool.tool_id)
    tools.append(tool)
[docs]
def _warn(message: str) -> None:
"""Prints a message to the standard error.
This helper function is invoked during the tool loading phase and thus the normal error handling interface might
be not yet ready.
"""
print(f'[radioviz] {message}', file=sys.stderr)
def _report_progress(
progress_cb: Optional[Callable[[str, int, int], None]],
group: str,
idx: int,
total: int,
) -> None:
if progress_cb is None:
return
try:
progress_cb(group, idx, total)
except Exception:
pass