Skip to content
Merged
105 changes: 16 additions & 89 deletions src/model_api/models/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,11 @@
from typing import TYPE_CHECKING

import numpy as np
from openvino import Model, Type
from openvino import opset10 as opset
from openvino.preprocess import PrePostProcessor

from model_api.models.image_model import ImageModel
from model_api.models.parameters import ParameterRegistry
from model_api.models.result import ClassificationResult, Label
from model_api.models.utils import softmax
from model_api.models.utils import is_softmaxed, softmax, top_k

if TYPE_CHECKING:
from model_api.adapters.inference_adapter import InferenceAdapter
Expand Down Expand Up @@ -95,27 +92,8 @@ def _setup_multilabel(self) -> None:

def _setup_single_label(self) -> None:
"""Configure model for single-label classification with TopK."""
try:
addOrFindSoftmaxAndTopkOutputs(
self.inference_adapter,
self.params.topk,
self.params.output_raw_scores,
)
self.embedded_topk = True
self.out_layer_names = ["indices", "scores"]
if self.params.output_raw_scores:
self.out_layer_names.append(self.raw_scores_name)
except (RuntimeError, AttributeError):
# exception means we have a non-ov model
# with already inserted softmax and topk
if self.params.embedded_processing and len(self.outputs) >= 2:
self.embedded_topk = True
self.out_layer_names = ["indices", "scores"]
self.raw_scores_name = _raw_scores_name
else: # likely a non-ov model
self.embedded_topk = False
self.out_layer_names = _get_non_xai_names(self.outputs.keys())
self.raw_scores_name = self.out_layer_names[0]
self.out_layer_names = _get_non_xai_names(self.outputs.keys())
self.raw_scores_name = self.out_layer_names[0]

self.embedded_processing = True

Expand Down Expand Up @@ -228,10 +206,9 @@ def get_all_probs(self, logits: np.ndarray) -> np.ndarray:
if cls_heads_info["num_multilabel_classes"]:
logits_begin = cls_heads_info["num_single_label_classes"]
probs[logits_begin:] = sigmoid_numpy(logits[logits_begin:])
elif self.embedded_topk:
probs = logits.reshape(-1)
else:
probs = softmax(logits.reshape(-1))
logits_flattened = logits.reshape(-1)
probs = logits_flattened if is_softmaxed(logits_flattened, axis=0) else softmax(logits_flattened)
return probs

def get_hierarchical_predictions(self, logits: np.ndarray) -> list[Label]:
Expand Down Expand Up @@ -277,68 +254,18 @@ def get_multilabel_predictions(self, logits: np.ndarray) -> list[Label]:
return list(starmap(Label, zip(indices, labels, scores)))

def get_multiclass_predictions(self, outputs: dict) -> list[Label]:
axis = 1
logits = outputs[self.out_layer_names[0]]
if not is_softmaxed(logits, axis=axis):
logits = softmax(logits, axis=axis)
top_k_result = top_k(logits, self.params.topk, axis=axis)
scores = top_k_result.values[0] # noqa: PD011 # silencing false positive - it's not pandas code
indices = top_k_result.indices[0]

labels_list = self.params.labels
if self.embedded_topk:
indicesTensor = outputs[self.out_layer_names[0]][0]
scoresTensor = outputs[self.out_layer_names[1]][0]
labels = [labels_list[i] if labels_list else "" for i in indicesTensor]
else:
scoresTensor = softmax(outputs[self.out_layer_names[0]][0])
indicesTensor = [int(np.argmax(scoresTensor))]
labels = [labels_list[i] if labels_list else "" for i in indicesTensor]
return list(starmap(Label, zip(indicesTensor, labels, scoresTensor)))


def addOrFindSoftmaxAndTopkOutputs(inference_adapter: InferenceAdapter, topk: int, output_raw_scores: bool) -> None:
softmaxNode = None
for i in range(len(inference_adapter.model.outputs)):
output_node = inference_adapter.model.get_output_op(i).input(0).get_source_output().get_node()
if output_node.get_type_name() == "Softmax":
softmaxNode = output_node
elif output_node.get_type_name() == "TopK":
return

if softmaxNode is None:
logitsNode = inference_adapter.model.get_output_op(0).input(0).get_source_output().get_node()
softmaxNode = opset.softmax(logitsNode.output(0), 1)
k = opset.constant(topk, np.int32)
topkNode = opset.topk(softmaxNode, k, 1, "max", "value")

indices = topkNode.output(0)
scores = topkNode.output(1)
results_descr = [indices, scores]
if output_raw_scores:
raw_scores = softmaxNode.output(0)
results_descr.append(raw_scores)
for output in inference_adapter.model.outputs:
if _saliency_map_name in output.get_names() or _feature_vector_name in output.get_names():
results_descr.append(output)

source_rt_info = inference_adapter.get_model().get_rt_info()
inference_adapter.model = Model(
results_descr,
inference_adapter.model.get_parameters(),
"classification",
)

if "model_info" in source_rt_info:
source_rt_info = source_rt_info["model_info"]
for k in source_rt_info:
inference_adapter.model.set_rt_info(source_rt_info[k], ["model_info", k])

# manually set output tensors name for created topK node
inference_adapter.model.outputs[0].tensor.set_names({"scores"})
inference_adapter.model.outputs[1].tensor.set_names({"indices"})
if output_raw_scores:
inference_adapter.model.outputs[2].tensor.set_names({_raw_scores_name})

# set output precisions
ppp = PrePostProcessor(inference_adapter.model)
ppp.output("indices").tensor().set_element_type(Type.i32)
ppp.output("scores").tensor().set_element_type(Type.f32)
if output_raw_scores:
ppp.output(_raw_scores_name).tensor().set_element_type(Type.f32)
inference_adapter.model = ppp.build()
labels = [labels_list[i] if labels_list else "" for i in indices]

return list(starmap(Label, zip(indices, labels, scores)))


def sigmoid_numpy(x: np.ndarray) -> np.ndarray:
Expand Down
27 changes: 27 additions & 0 deletions src/model_api/models/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

from collections import namedtuple
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
Expand All @@ -14,6 +15,9 @@

from model_api.models.result import Contour, InstanceSegmentationResult, RotatedSegmentationResult

topk_namedtuple = namedtuple("topk_namedtuple", ["values", "indices"])


if TYPE_CHECKING:
from model_api.models.result.detection import DetectionResult

Expand Down Expand Up @@ -284,6 +288,29 @@ def multiclass_nms(
return det, keep


def is_softmaxed(array: np.ndarray, axis: int, atol: float = 1e-5) -> bool:
"""Check if the input array is softmaxed along the specified axis."""
# Check values are in [0, 1]
if not np.all((array >= 0) & (array <= 1)):
return False
# Check sum along axis is close to 1
sums = np.sum(array, axis=axis)
return np.allclose(sums, 1.0, atol=atol)


def softmax(logits: np.ndarray, eps: float = 1e-9, axis=None, keepdims: bool = False) -> np.ndarray:
exp = np.exp(logits - np.max(logits))
return exp / (np.sum(exp, axis=axis, keepdims=keepdims) + eps)


def top_k(array: np.ndarray, k: int, axis: int) -> topk_namedtuple:
"""Returns the top k values and their indices along the specified axis."""
# Get indices of the top k elements
indices = np.take(np.argpartition(array, -k, axis=axis), range(-k, 0), axis=axis)
# Gather the top k values
topk_values = np.take_along_axis(array, indices, axis=axis)
# Sort the top k values and indices in descending order
sorted_order = np.argsort(-topk_values, axis=axis)
topk_values = np.take_along_axis(topk_values, sorted_order, axis=axis)
indices = np.take_along_axis(indices, sorted_order, axis=axis)
return topk_namedtuple(values=topk_values, indices=indices)
2 changes: 1 addition & 1 deletion tests/accuracy/public_scope.json
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@
{
"id": 105,
"name": "194",
"confidence": 0.06216677650809288
"confidence": 0.4564049541950226
}
],
"raw_scores": [
Expand Down
2 changes: 1 addition & 1 deletion tests/accuracy/test_accuracy.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ def create_models(model_type, model_path, download_dir, force_onnx_adapter=False
model = create_core().read_model(model_path)
if model.has_rt_info(["model_info", "model_type"]):
wrapper_type = model_type.get_model_class(
create_core().read_model(model_path).get_rt_info(["model_info", "model_type"]).astype(str),
model.get_rt_info(["model_info", "model_type"]).astype(str),
)
model = wrapper_type(OpenvinoAdapter(create_core(), model_path, device=device))
model.load()
Expand Down