Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions src/anomalib/deploy/inferencers/openvino_inferencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

from anomalib.data import NumpyImageBatch
from anomalib.data.utils import read_image
from anomalib.deploy.metadata import load_metadata

if TYPE_CHECKING:
from openvino.utils.data_helpers.wrappers import OVDict
Expand All @@ -79,6 +80,11 @@ class OpenVINOInferencer:
Defaults to ``"AUTO"``.
config (dict | None, optional): OpenVINO configuration parameters.
Defaults to ``None``.
image_sensitivity (float | None, optional): Default image-level
sensitivity override. If ``None``, reads from metadata.json or
falls back to 0.5. Defaults to ``None``.
pixel_sensitivity (float | None, optional): Default pixel-level
sensitivity override. Defaults to ``None``.

Example:
>>> from anomalib.deploy import OpenVINOInferencer
Expand All @@ -94,6 +100,8 @@ def __init__(
path: str | Path | tuple[bytes, bytes],
device: str | None = "AUTO",
config: dict | None = None,
image_sensitivity: float | None = None,
pixel_sensitivity: float | None = None,
) -> None:
if not module_available("openvino"):
msg = "OpenVINO is not installed. Please install OpenVINO to use OpenVINOInferencer."
Expand All @@ -103,6 +111,22 @@ def __init__(
self.config = config
self.input_blob, self.output_blob, self.model = self.load_model(path)

self.metadata = self._load_metadata(path)
self._default_image_sensitivity = (
image_sensitivity
if image_sensitivity is not None
else self.metadata.get("postprocess", {}).get("image_sensitivity", 0.5)
if self.metadata
else 0.5
)
self._default_pixel_sensitivity = (
pixel_sensitivity
if pixel_sensitivity is not None
else self.metadata.get("postprocess", {}).get("pixel_sensitivity", 0.5)
if self.metadata
else 0.5
)

def load_model(self, path: str | Path | tuple[bytes, bytes]) -> tuple[Any, Any, Any]:
"""Load OpenVINO model from file or bytes.

Expand Down Expand Up @@ -193,19 +217,28 @@ def post_process(predictions: "OVDict") -> dict:
values = predictions.to_tuple()
return dict(zip(names, values, strict=False))

def predict(self, image: str | Path | np.ndarray | PILImage | torch.Tensor) -> NumpyImageBatch:
def predict(
self,
image: str | Path | np.ndarray | PILImage | torch.Tensor,
*,
image_sensitivity: float | None = None,
pixel_sensitivity: float | None = None,
) -> NumpyImageBatch:
"""Run inference on an input image.

Args:
image (str | Path | np.ndarray | PILImage | torch.Tensor): Input image as file path or array.
image_sensitivity (float | None): Per-call image sensitivity override.
Defaults to None (uses constructor/metadata default).
pixel_sensitivity (float | None): Per-call pixel sensitivity override.
Defaults to None (uses constructor/metadata default).

Returns:
NumpyImageBatch: Batch containing the predictions.

Raises:
TypeError: If image input is invalid type.
"""
# Convert file path or string to image if necessary
if isinstance(image, str | Path):
image = read_image(image, as_tensor=False)
elif isinstance(image, PILImage):
Expand All @@ -214,7 +247,36 @@ def predict(self, image: str | Path | np.ndarray | PILImage | torch.Tensor) -> N
image = image.cpu().numpy()

image = self.pre_process(image)
predictions = self.model({self.input_blob.any_name: image})

img_sens = image_sensitivity if image_sensitivity is not None else self._default_image_sensitivity
pix_sens = pixel_sensitivity if pixel_sensitivity is not None else self._default_pixel_sensitivity

input_names = {inp.any_name for inp in self.model.inputs}
feed_dict: dict[str, np.ndarray] = {self.input_blob.any_name: image}
if "image_sensitivity" in input_names:
feed_dict["image_sensitivity"] = np.array(img_sens, dtype=np.float32)
if "pixel_sensitivity" in input_names:
feed_dict["pixel_sensitivity"] = np.array(pix_sens, dtype=np.float32)
predictions = self.model(feed_dict)

pred_dict = self.post_process(predictions)

return NumpyImageBatch(image=image, **pred_dict)

@staticmethod
def _load_metadata(path: str | Path | tuple[bytes, bytes]) -> dict | None:
"""Load metadata.json sidecar if it exists next to the model.

Args:
path (str | Path | tuple[bytes, bytes]): Model path.

Returns:
dict | None: Parsed metadata or None if not found.
"""
if isinstance(path, tuple):
return None
path = Path(path) if not isinstance(path, Path) else path
metadata_path = path.parent / "metadata.json"
if metadata_path.exists():
return load_metadata(metadata_path)
return None
70 changes: 67 additions & 3 deletions src/anomalib/deploy/inferencers/torch_inferencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

from anomalib.data import ImageBatch
from anomalib.data.utils import read_image
from anomalib.deploy.metadata import load_metadata

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -96,6 +97,11 @@ class TorchInferencer:
device (str, optional): Device to use for inference.
Options are ``"auto"``, ``"cpu"``, ``"cuda"``, ``"xpu"``.
Defaults to ``"auto"``.
image_sensitivity (float | None, optional): Override image-level sensitivity.
Takes precedence over ``metadata.json`` value. Falls back to 0.5 if
neither this nor metadata provides a value. Defaults to ``None``.
pixel_sensitivity (float | None, optional): Override pixel-level sensitivity.
Same precedence as ``image_sensitivity``. Defaults to ``None``.

Example:
>>> from anomalib.deploy import TorchInferencer
Expand All @@ -113,16 +119,33 @@ def __init__(
self,
path: str | Path,
device: str = "auto",
image_sensitivity: float | None = None,
pixel_sensitivity: float | None = None,
) -> None:
logger.warning(
"TorchInferencer is a legacy inferencer. Consider using Engine.predict() instead, "
"which provides a more modern and feature-rich interface for model inference.",
)
self.device = self._get_device(device)

# Load the model weights and metadata
self.model = self.load_model(path)

self.metadata = self._load_metadata(path)
self._default_image_sensitivity = (
image_sensitivity
if image_sensitivity is not None
else self.metadata.get("postprocess", {}).get("image_sensitivity", 0.5)
if self.metadata
else 0.5
)
self._default_pixel_sensitivity = (
pixel_sensitivity
if pixel_sensitivity is not None
else self.metadata.get("postprocess", {}).get("pixel_sensitivity", 0.5)
if self.metadata
else 0.5
)
Comment on lines +133 to +147

@staticmethod
def _get_device(device: str) -> torch.device:
"""Get the device to use for inference.
Expand Down Expand Up @@ -246,12 +269,22 @@ def load_model(self, path: str | Path) -> nn.Module:
model.eval()
return model.to(self.device)

def predict(self, image: str | Path | np.ndarray | PILImage | torch.Tensor) -> ImageBatch:
def predict(
self,
image: str | Path | np.ndarray | PILImage | torch.Tensor,
*,
image_sensitivity: float | None = None,
pixel_sensitivity: float | None = None,
) -> ImageBatch:
"""Predict anomalies for an input image.

Args:
image (str | Path | np.ndarray | PILImage | torch.Tensor): Input image to predict.
Can be a file path or PyTorch tensor.
image_sensitivity (float | None): Per-call image sensitivity override.
Defaults to None (uses constructor/metadata default).
pixel_sensitivity (float | None): Per-call pixel sensitivity override.
Defaults to None (uses constructor/metadata default).

Returns:
ImageBatch: Prediction results containing anomaly maps and scores.
Expand All @@ -268,7 +301,22 @@ def predict(self, image: str | Path | np.ndarray | PILImage | torch.Tensor) -> I
image = to_dtype(to_image(image), torch.float32, scale=True)

image = self.pre_process(image)
predictions = self.model(image)

img_sens = image_sensitivity if image_sensitivity is not None else self._default_image_sensitivity
pix_sens = pixel_sensitivity if pixel_sensitivity is not None else self._default_pixel_sensitivity

try:
predictions = self.model(
image,
torch.tensor(img_sens, device=self.device),
torch.tensor(pix_sens, device=self.device),
)
except TypeError:
logger.warning(
"Image only API is deprecated and will be removed in Anomalib 2.7.0."
" Models exported from current version of Anomalib already support the new API.",
)
predictions = self.model(image)
Comment on lines +308 to +319

return ImageBatch(image=image, **predictions._asdict())

Expand All @@ -292,3 +340,19 @@ def pre_process(self, image: torch.Tensor) -> torch.Tensor:
image = image.unsqueeze(0) # model expects [B, C, H, W]

return image.to(self.device)

@staticmethod
def _load_metadata(path: str | Path) -> dict | None:
"""Load metadata.json sidecar if it exists next to the model.

Args:
path (str | Path): Model file path.

Returns:
dict | None: Parsed metadata or None if not found.
"""
path = Path(path) if not isinstance(path, Path) else path
metadata_path = path.parent / "metadata.json"
if metadata_path.exists():
return load_metadata(metadata_path)
return None
13 changes: 13 additions & 0 deletions src/anomalib/deploy/metadata/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Metadata module for exported anomalib models.

Provides load/dump helpers and schema migration for ``metadata.json`` sidecar
files produced during model export.
"""

from anomalib.deploy.metadata._core import SchemaValidationError, dump_metadata, load_metadata
from anomalib.deploy.metadata.migration import CURRENT_SCHEMA_VERSION

__all__ = ["CURRENT_SCHEMA_VERSION", "SchemaValidationError", "dump_metadata", "load_metadata"]
66 changes: 66 additions & 0 deletions src/anomalib/deploy/metadata/_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Metadata loading, dumping, and validation for exported anomalib models."""

from __future__ import annotations

import json
from typing import TYPE_CHECKING

from anomalib.deploy.metadata.migration import upgrade_to_latest

if TYPE_CHECKING:
from pathlib import Path


class SchemaValidationError(ValueError):
"""Raised when metadata fails schema validation."""


def load_metadata(path: Path) -> dict:
"""Read metadata.json, auto-upgrade to current schema, and validate.

Args:
path (Path): Path to the metadata JSON file.

Returns:
dict: Validated and upgraded metadata dictionary.

Raises:
SchemaValidationError: If required fields are missing.
"""
raw = json.loads(path.read_text(encoding="utf-8"))
metadata = upgrade_to_latest(raw)
_validate(metadata)
return metadata


def dump_metadata(metadata: dict, path: Path) -> None:
"""Validate and write metadata.json.

Args:
metadata (dict): Metadata dictionary to write.
path (Path): Destination file path.

Raises:
SchemaValidationError: If required fields are missing.
"""
_validate(metadata)
path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")


def _validate(metadata: dict) -> None:
"""Validate metadata against expected schema fields.

Args:
metadata (dict): Metadata dictionary to validate.

Raises:
SchemaValidationError: If required fields are missing.
"""
required = {"schema_version", "anomalib_version", "model", "postprocess"}
missing = required - metadata.keys()
if missing:
msg = f"Metadata missing required fields: {missing}"
raise SchemaValidationError(msg)
66 changes: 66 additions & 0 deletions src/anomalib/deploy/metadata/migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Schema migration for metadata.json files."""

from __future__ import annotations

import logging

logger = logging.getLogger(__name__)

CURRENT_SCHEMA_VERSION = "1.0"


def _parse_version(version_str: str) -> tuple[int, ...]:
"""Parse a dotted version string into a comparable tuple of ints."""
return tuple(int(x) for x in version_str.split("."))


def upgrade_to_latest(metadata: dict) -> dict:
"""Upgrade metadata dict to the current schema version.

Applies each migration step in sequence. Pure function β€” does not mutate input.

Args:
metadata (dict): Raw metadata dictionary, possibly from an older schema.

Returns:
dict: Metadata upgraded to ``CURRENT_SCHEMA_VERSION``.
"""
metadata = metadata.copy()
version = metadata.get("schema_version")

if version is None:
metadata = _legacy_to_v1(metadata)

if _parse_version(metadata["schema_version"]) > _parse_version(CURRENT_SCHEMA_VERSION):
logger.warning(
"Metadata schema %s is newer than supported (%s). "
"Unknown fields will be ignored. Consider upgrading anomalib.",
metadata["schema_version"],
CURRENT_SCHEMA_VERSION,
)

return metadata


def _legacy_to_v1(raw: dict) -> dict:
"""Convert legacy (pre-schema) metadata to schema 1.0.

Args:
raw (dict): Legacy metadata dictionary without ``schema_version``.

Returns:
dict: Schema 1.0 metadata with sensible defaults.
"""
return {
"schema_version": "1.0",
"anomalib_version": "unknown",
"model": raw.get("model", "unknown"),
"preprocess": [],
"postprocess": {
"image_sensitivity": 0.5,
"pixel_sensitivity": 0.5,
},
}
Loading
Loading