Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions application/backend/app/runtime/core/components/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,6 @@ def initialise(self) -> None:
@abstractmethod
def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
pass

def cleanup(self) -> None:
    """Release model resources and free device memory.

    Default implementation is intentionally a no-op; handler subclasses
    that actually hold a model or device state override this to drop
    their references and empty device caches.
    """
    return None
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
encoder_model=config.encoder_model,
use_nms=config.use_nms,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case PerDinoConfig() as config:
model = PerDino(
sam=config.sam_model,
Expand All @@ -48,7 +48,7 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
precision=config.precision,
device=settings.device,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case SoftMatcherConfig() as config:
model = SoftMatcher(
sam=config.sam_model,
Expand All @@ -65,6 +65,6 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
precision=config.precision,
device=settings.device,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case _:
return PassThroughModelHandler()
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import gc
import logging

import numpy as np
Expand All @@ -22,3 +23,17 @@ def initialise(self) -> None: ...

def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
raise NotImplementedError("OpenVINO inference is not yet implemented")

def cleanup(self) -> None:
    """Release the model and reference batch, then run garbage collection.

    Idempotent: calling it again is a no-op apart from the extra
    ``gc.collect()`` pass.
    """
    logger.info("Cleaning up OpenVINOModelHandler")

    # Rebinding the attributes to None already drops the references;
    # the previous `del` followed by reassignment (and the None guards)
    # were redundant.
    self._model = None
    self._reference_batch = None

    gc.collect()
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import gc
import logging

import numpy as np
Expand All @@ -16,10 +17,36 @@
logger = logging.getLogger(__name__)


def release_device_memory(device: str) -> None:
    """Clear the device memory cache after model cleanup.

    Calls the appropriate cache-clearing function depending on the device
    type (CUDA, XPU, MPS, or CPU). The device string may include an index
    (e.g. ``"cuda:0"``); only the base type is used to select the cache
    to clear. This should be called after deleting model references and
    running ``gc.collect()`` to ensure freed tensors are returned to the
    device allocator.

    Args:
        device: The device string, e.g. ``"cpu"``, ``"cuda"``,
            ``"cuda:0"``, ``"xpu"``, or ``"mps"``.
    """
    device_type = device.split(":")[0]
    if device_type == "cuda" and torch.cuda.is_available():
        torch.cuda.empty_cache()
        logger.info("CUDA memory cache cleared")
    elif device_type == "xpu" and hasattr(torch, "xpu") and torch.xpu.is_available():
        torch.xpu.empty_cache()
        logger.info("XPU memory cache cleared")
    elif device_type == "mps" and hasattr(torch, "mps") and torch.backends.mps.is_available():
        # Apple-silicon builds expose torch.mps; hasattr guards older/other builds.
        torch.mps.empty_cache()
        logger.info("MPS memory cache cleared")
    else:
        logger.debug("No device cache to clear for device=%s", device_type)


class TorchModelHandler(ModelHandler):
def __init__(self, model: Model, reference_batch: Batch) -> None:
def __init__(self, model: Model, reference_batch: Batch, device: str = "cpu") -> None:
    """Store the model, its reference batch, and the target device.

    Args:
        model: The torch model to run inference with.
        reference_batch: Batch of reference samples used by the model.
        device: Device string (e.g. ``"cpu"``, ``"cuda:0"``) used later
            when releasing device memory on cleanup.
    """
    self._device = device
    self._reference_batch = reference_batch
    self._model = model

def initialise(self) -> None:
logger.info(
Expand Down Expand Up @@ -48,3 +75,15 @@ def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
def _build_batch(inputs: list[InputData]) -> Batch:
    """Wrap raw input frames as image samples and collate them into a Batch.

    Each ``data.frame`` numpy array is converted to a torch tensor and
    permuted (2, 0, 1) — i.e. assumes frames arrive channel-last (HWC)
    and the model expects channel-first (CHW); TODO confirm with the
    source component.
    """
    samples = [Sample(image=tv_tensors.Image(torch.from_numpy(data.frame).permute(2, 0, 1))) for data in inputs]
    return Batch.collate(samples)

def cleanup(self) -> None:
    """Release model and reference batch, then free device memory.

    Drops the handler's strong references, forces a garbage-collection
    pass, and asks the device allocator to return cached memory.
    """
    # Explicit None check: truthiness would misreport a present-but-falsy
    # model (e.g. an empty container defining __len__) as "unknown".
    model_name = type(self._model).__name__ if self._model is not None else "unknown"
    logger.info("Cleaning up TorchModelHandler (model=%s, device=%s)", model_name, self._device)

    self._model = None
    self._reference_batch = None

    # Collect first so freed tensors are actually released before the
    # device cache is emptied.
    gc.collect()
    release_device_memory(self._device)
    logger.debug("TorchModelHandler cleanup complete")
20 changes: 20 additions & 0 deletions application/backend/app/runtime/core/components/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def stop(self) -> None:
if thread and thread.is_alive():
thread.join(timeout=5)

self._components.clear()
self._threads.clear()
self._is_running = False
logger.debug(f"Pipeline stopped for project_id={self._project_id}")

Expand All @@ -129,6 +131,24 @@ def set_processor(self, processor: Processor, start: bool = False) -> Self:
self._register_component(processor, start)
return self

def stop_component(self, component_cls: type[PipelineComponent]) -> None:
    """Stop and remove a single component, releasing its resources.

    No-op if no component of the given class is registered. The component
    and its worker thread are removed from the registries even if the
    thread fails to stop within the timeout.

    Args:
        component_cls: The component class to stop (e.g. ``Processor``).
    """
    with self._lock:
        # Pop before stopping so concurrent callers cannot grab a
        # component that is mid-shutdown.
        component = self._components.pop(component_cls, None)
        if component is None:
            return
        component.stop()
        thread = self._threads.pop(component_cls, None)
        if thread and thread.is_alive():
            # NOTE(review): join happens while holding self._lock — if a
            # component's run loop ever needs this lock to exit, this will
            # stall for the full 5 s timeout; confirm shutdown paths are
            # lock-free.
            thread.join(timeout=5)
            if thread.is_alive():
                logger.warning("%s thread did not stop cleanly", component_cls.__name__)
        logger.debug("Stopped and removed %s", component_cls.__name__)

def _register_component(self, new_component: PipelineComponent, start: bool = True) -> None:
"""
A method to replace a component with a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(
self._model_handler = model_handler
self._batch_size = batch_size
self._category_id_to_label_id = category_id_to_label_id or {}
self._inbound_broadcaster: FrameBroadcaster[InputData] | None = None
self._in_queue: Queue[InputData] | None = None

def setup(
self,
Expand Down Expand Up @@ -85,4 +87,6 @@ def run(self) -> None:
logger.debug("Stopping the pipeline runner loop")

def _stop(self) -> None:
self._inbound_broadcaster.unregister(self.__class__.__name__)
if self._inbound_broadcaster is not None:
self._inbound_broadcaster.unregister(self.__class__.__name__)
self._model_handler.cleanup()
42 changes: 29 additions & 13 deletions application/backend/app/runtime/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from runtime.core.components.broadcaster import FrameBroadcaster, FrameSlot
from runtime.core.components.errors import UnsupportedOperationError
from runtime.core.components.pipeline import Pipeline
from runtime.core.components.processor import Processor
from runtime.errors import PipelineNotActiveError, PipelineProjectMismatchError, SourceNotSeekableError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,6 +140,7 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
case ProjectActivationEvent() as e:
if self._pipeline:
self._pipeline.stop()
self._pipeline = None
self._pipeline = self._create_pipeline(e.project_id)
self._refresh_visualization_info(e.project_id)
self._pipeline.start()
Expand All @@ -147,6 +149,7 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
case ProjectDeactivationEvent() as e:
if self._pipeline and self._pipeline.project_id == e.project_id:
self._pipeline.stop()
self._pipeline = None
self._current_config = None
with self._visualization_lock:
self._visualization_info = None
Expand All @@ -160,11 +163,10 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
logger.info("Pipeline components updated for project %s", e.project_id)

def _create_pipeline(self, project_id: UUID) -> Pipeline:
"""
Create a new Pipeline instance with components built from the given configuration.
"""Create a new Pipeline with all components for the given project.

Args:
config: The pipeline configuration.
project_id: The project ID to build the pipeline for.

Returns:
A fully initialized Pipeline instance (not yet started).
Expand All @@ -187,8 +189,7 @@ def _create_pipeline(self, project_id: UUID) -> Pipeline:
)

def _update_pipeline_components(self, project_id: UUID, component_type: ComponentType) -> None:
"""
Compare current and new configurations, updating only changed components.
"""Recreate and install the component for the given type.

Args:
project_id: The project ID for the pipeline.
Expand All @@ -202,14 +203,29 @@ def _update_pipeline_components(self, project_id: UUID, component_type: Componen
source = self._component_factory.create_source(project_id)
self._pipeline.set_source(source, True)
case ComponentType.PROCESSOR:
reference_batch, category_id_to_label_id = self.get_reference_batch(project_id, PromptType.VISUAL) or (
None,
{},
)
processor = self._component_factory.create_processor(
project_id, reference_batch, category_id_to_label_id
)
self._pipeline.set_processor(processor, True)
# Unload-before-load: stop the current processor and free its
# model memory before loading the replacement. This avoids
# having two models in device memory simultaneously.
self._pipeline.stop_component(Processor)

try:
reference_batch, category_id_to_label_id = self.get_reference_batch(
project_id, PromptType.VISUAL
) or (
None,
{},
)
processor = self._component_factory.create_processor(
project_id, reference_batch, category_id_to_label_id
)
self._pipeline.set_processor(processor, True)
except Exception:
logger.exception(
"Failed to create replacement processor for project %s; stopping pipeline",
project_id,
)
self._pipeline.stop()
self._pipeline = None
case ComponentType.SINK:
sink = self._component_factory.create_sink(project_id)
self._pipeline.set_sink(sink, True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def test_factory_creates_matcher_model_with_config(self, mock_reference_batch, m
encoder_model="dinov3_small",
use_nms=True,
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_creates_perdino_model_with_config(self, mock_reference_batch, mock_settings):
config = PerDinoConfig(
Expand Down Expand Up @@ -89,7 +89,7 @@ def test_factory_creates_perdino_model_with_config(self, mock_reference_batch, m
precision="bf16",
device="cpu",
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_creates_softmatcher_model_with_config(self, mock_reference_batch, mock_settings):
config = SoftMatcherConfig(
Expand Down Expand Up @@ -130,7 +130,7 @@ def test_factory_creates_softmatcher_model_with_config(self, mock_reference_batc
precision="bf16",
device="cpu",
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_returns_passthrough_for_none_reference_batch(self):
config = MatcherConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import MagicMock, patch

from runtime.core.components.models.openvino_model import OpenVINOModelHandler


class TestOpenVINOModelHandler:
    """Unit tests for ``OpenVINOModelHandler.cleanup``."""

    def test_cleanup_frees_references_and_collects_garbage(self):
        subject = OpenVINOModelHandler(MagicMock(), MagicMock())

        with patch("runtime.core.components.models.openvino_model.gc") as gc_mock:
            subject.cleanup()

        # References must be dropped and exactly one GC pass triggered.
        assert subject._model is None
        assert subject._reference_batch is None
        gc_mock.collect.assert_called_once()

    def test_cleanup_is_safe_to_call_twice(self):
        subject = OpenVINOModelHandler(MagicMock(), MagicMock())

        for _ in range(2):
            subject.cleanup()

        assert subject._model is None
        assert subject._reference_batch is None
Loading