Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions application/backend/app/runtime/core/components/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,6 @@ def initialise(self) -> None:
@abstractmethod
def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
    """Run inference on a batch of inputs; must be implemented by subclasses.

    Args:
        inputs: Input items to run inference on.

    Returns:
        One dict of named numpy arrays per input item.
    """
    pass

def cleanup(self) -> None:
    """Release model resources and free device memory.

    Default implementation is a no-op; concrete handlers override this to
    drop their model references and clear device memory caches.
    """
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
encoder_model=config.encoder_model,
use_nms=config.use_nms,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case PerDinoConfig() as config:
model = PerDino(
sam=config.sam_model,
Expand All @@ -48,7 +48,7 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
precision=config.precision,
device=settings.device,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case SoftMatcherConfig() as config:
model = SoftMatcher(
sam=config.sam_model,
Expand All @@ -65,6 +65,6 @@ def create(cls, reference_batch: Batch | None, config: ModelConfig | None) -> Mo
precision=config.precision,
device=settings.device,
)
return TorchModelHandler(model, reference_batch)
return TorchModelHandler(model, reference_batch, device=settings.device)
case _:
return PassThroughModelHandler()
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import gc
import logging

import numpy as np
Expand All @@ -22,3 +23,17 @@ def initialise(self) -> None: ...

def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
    """Placeholder for the OpenVINO inference path.

    Raises:
        NotImplementedError: always — OpenVINO inference is not implemented yet.
    """
    raise NotImplementedError("OpenVINO inference is not yet implemented")

def cleanup(self) -> None:
    """Release model and reference batch.

    Rebinds both handler references to ``None`` and forces a garbage
    collection pass so the objects can be reclaimed immediately. Safe to
    call more than once.
    """
    logger.info("Cleaning up OpenVINOModelHandler")

    # Rebinding to None drops the handler-held references; the explicit
    # gc pass below reclaims any reference cycles right away.
    self._model = None
    self._reference_batch = None

    gc.collect()
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

import gc
import logging

import numpy as np
Expand All @@ -16,10 +17,33 @@
logger = logging.getLogger(__name__)


def release_device_memory(device: str) -> None:
    """Return cached allocator memory to the device after model teardown.

    Invokes the cache-clearing routine that matches the device family
    (CUDA, XPU, or nothing for CPU). Intended to run after all model
    references have been dropped and ``gc.collect()`` has executed, so the
    freed tensors are actually handed back to the device allocator.

    Args:
        device: Device string such as ``"cpu"``, ``"cuda"``, ``"cuda:0"``,
            or ``"xpu"`` — any ``:index`` suffix is ignored.
    """
    family = device.split(":", 1)[0]

    if family == "cuda" and torch.cuda.is_available():
        torch.cuda.empty_cache()
        logger.info("CUDA memory cache cleared")
        return

    # XPU support is not present in every torch build, hence the hasattr guard.
    if family == "xpu" and hasattr(torch, "xpu") and torch.xpu.is_available():
        torch.xpu.empty_cache()
        logger.info("XPU memory cache cleared")
        return

    logger.debug("No device cache to clear for device=%s", family)


class TorchModelHandler(ModelHandler):
def __init__(self, model: Model, reference_batch: Batch) -> None:
def __init__(self, model: Model, reference_batch: Batch, device: str = "cpu") -> None:
self._model = model
self._reference_batch = reference_batch
self._device = device

def initialise(self) -> None:
logger.info(
Expand Down Expand Up @@ -48,3 +72,15 @@ def predict(self, inputs: list[InputData]) -> list[dict[str, np.ndarray]]:
def _build_batch(inputs: list[InputData]) -> Batch:
    """Wrap raw numpy frames as image tensors and collate them into a Batch.

    Args:
        inputs: Input items whose ``frame`` attribute holds a numpy image.

    Returns:
        A collated ``Batch`` of ``Sample`` objects.
    """
    # permute(2, 0, 1) moves the channel axis first — assumes data.frame is
    # HWC (height, width, channels); TODO confirm against the frame producer.
    samples = [Sample(image=tv_tensors.Image(torch.from_numpy(data.frame).permute(2, 0, 1))) for data in inputs]
    return Batch.collate(samples)

def cleanup(self) -> None:
    """Release model and reference batch, then free device memory.

    Drops the handler's references, forces a garbage-collection pass, and
    asks the device allocator to return cached memory. Safe to call more
    than once; later calls are no-ops apart from logging.
    """
    # Explicit None check: a model object may define __bool__/__len__ and
    # evaluate falsy while still being a live model, which the original
    # truthiness test would have mislabeled as "unknown".
    model_name = type(self._model).__name__ if self._model is not None else "unknown"
    logger.info("Cleaning up TorchModelHandler (model=%s, device=%s)", model_name, self._device)

    self._model = None
    self._reference_batch = None

    gc.collect()
    release_device_memory(self._device)
    logger.debug("TorchModelHandler cleanup complete")
20 changes: 20 additions & 0 deletions application/backend/app/runtime/core/components/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ def stop(self) -> None:
if thread and thread.is_alive():
thread.join(timeout=5)

self._components.clear()
self._threads.clear()
self._is_running = False
logger.debug(f"Pipeline stopped for project_id={self._project_id}")

Expand All @@ -129,6 +131,24 @@ def set_processor(self, processor: Processor, start: bool = False) -> Self:
self._register_component(processor, start)
return self

def stop_component(self, component_cls: type[PipelineComponent]) -> None:
    """Stop and remove a single component, releasing its resources.

    Args:
        component_cls: The component class to stop (e.g. ``Processor``).
    """
    # NOTE(review): the component is stopped and its thread joined while
    # self._lock is held — confirm the component's run loop never acquires
    # this lock, otherwise the join below will always hit its timeout.
    with self._lock:
        component = self._components.pop(component_cls, None)
        if component is None:
            # Nothing registered under this class; treat as a no-op.
            return
        component.stop()
        thread = self._threads.pop(component_cls, None)
        if thread and thread.is_alive():
            thread.join(timeout=5)
            if thread.is_alive():
                # Join timed out: the thread reference is already dropped,
                # so it is leaked until its run loop exits on its own.
                logger.warning("%s thread did not stop cleanly", component_cls.__name__)
        logger.debug("Stopped and removed %s", component_cls.__name__)

def _register_component(self, new_component: PipelineComponent, start: bool = True) -> None:
"""
A method to replace a component with a new one.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def __init__(
self._model_handler = model_handler
self._batch_size = batch_size
self._category_id_to_label_id = category_id_to_label_id or {}
self._inbound_broadcaster: FrameBroadcaster[InputData] | None = None
self._in_queue: Queue[InputData] | None = None

def setup(
self,
Expand Down Expand Up @@ -85,4 +87,6 @@ def run(self) -> None:
logger.debug("Stopping the pipeline runner loop")

def _stop(self) -> None:
self._inbound_broadcaster.unregister(self.__class__.__name__)
if self._inbound_broadcaster is not None:
self._inbound_broadcaster.unregister(self.__class__.__name__)
self._model_handler.cleanup()
16 changes: 11 additions & 5 deletions application/backend/app/runtime/pipeline_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from runtime.core.components.broadcaster import FrameBroadcaster, FrameSlot
from runtime.core.components.errors import UnsupportedOperationError
from runtime.core.components.pipeline import Pipeline
from runtime.core.components.processor import Processor
from runtime.errors import PipelineNotActiveError, PipelineProjectMismatchError, SourceNotSeekableError

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -139,6 +140,7 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
case ProjectActivationEvent() as e:
if self._pipeline:
self._pipeline.stop()
self._pipeline = None
self._pipeline = self._create_pipeline(e.project_id)
self._refresh_visualization_info(e.project_id)
self._pipeline.start()
Expand All @@ -147,6 +149,7 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
case ProjectDeactivationEvent() as e:
if self._pipeline and self._pipeline.project_id == e.project_id:
self._pipeline.stop()
self._pipeline = None
self._current_config = None
with self._visualization_lock:
self._visualization_info = None
Expand All @@ -160,11 +163,10 @@ def on_config_change(self, event: ConfigChangeEvent) -> None:
logger.info("Pipeline components updated for project %s", e.project_id)

def _create_pipeline(self, project_id: UUID) -> Pipeline:
"""
Create a new Pipeline instance with components built from the given configuration.
"""Create a new Pipeline with all components for the given project.

Args:
config: The pipeline configuration.
project_id: The project ID to build the pipeline for.

Returns:
A fully initialized Pipeline instance (not yet started).
Expand All @@ -187,8 +189,7 @@ def _create_pipeline(self, project_id: UUID) -> Pipeline:
)

def _update_pipeline_components(self, project_id: UUID, component_type: ComponentType) -> None:
"""
Compare current and new configurations, updating only changed components.
"""Recreate and install the component for the given type.

Args:
project_id: The project ID for the pipeline.
Expand All @@ -202,6 +203,11 @@ def _update_pipeline_components(self, project_id: UUID, component_type: Componen
source = self._component_factory.create_source(project_id)
self._pipeline.set_source(source, True)
case ComponentType.PROCESSOR:
# Unload-before-load: stop the current processor and free its
# model memory before loading the replacement. This avoids
# having two models in device memory simultaneously.
self._pipeline.stop_component(Processor)

reference_batch, category_id_to_label_id = self.get_reference_batch(project_id, PromptType.VISUAL) or (
None,
{},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def test_factory_creates_matcher_model_with_config(self, mock_reference_batch, m
encoder_model="dinov3_small",
use_nms=True,
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_creates_perdino_model_with_config(self, mock_reference_batch, mock_settings):
config = PerDinoConfig(
Expand Down Expand Up @@ -89,7 +89,7 @@ def test_factory_creates_perdino_model_with_config(self, mock_reference_batch, m
precision="bf16",
device="cpu",
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_creates_softmatcher_model_with_config(self, mock_reference_batch, mock_settings):
config = SoftMatcherConfig(
Expand Down Expand Up @@ -130,7 +130,7 @@ def test_factory_creates_softmatcher_model_with_config(self, mock_reference_batc
precision="bf16",
device="cpu",
)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch)
mock_handler.assert_called_once_with(mock_model_instance, mock_reference_batch, device="cpu")

def test_factory_returns_passthrough_for_none_reference_batch(self):
config = MatcherConfig(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import MagicMock, patch

from runtime.core.components.models.openvino_model import OpenVINOModelHandler


class TestOpenVINOModelHandler:
    """Unit tests for the OpenVINO handler's resource cleanup."""

    def test_cleanup_frees_references_and_collects_garbage(self):
        handler = OpenVINOModelHandler(MagicMock(), MagicMock())

        # Patch gc so the collect call can be asserted without running a
        # real collection pass.
        with patch("runtime.core.components.models.openvino_model.gc") as gc_mock:
            handler.cleanup()

        assert handler._model is None
        assert handler._reference_batch is None
        gc_mock.collect.assert_called_once()

    def test_cleanup_is_safe_to_call_twice(self):
        handler = OpenVINOModelHandler(MagicMock(), MagicMock())

        # A second call must be a harmless no-op.
        for _ in range(2):
            handler.cleanup()

        assert handler._model is None
        assert handler._reference_batch is None
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# Copyright (C) 2026 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import numpy as np
import pytest
import torch

from domain.services.schemas.processor import InputData
from runtime.core.components.models.torch_model import TorchModelHandler
from runtime.core.components.models.torch_model import TorchModelHandler, release_device_memory


class TestTorchModelHandler:
Expand Down Expand Up @@ -57,3 +57,72 @@ def test_predict_handles_standard_tensors(self, mock_model, mock_reference_batch

assert len(results) == 1
assert results[0]["scores"].dtype == np.float32

def test_cleanup_frees_references_and_collects_garbage(self, mock_model, mock_reference_batch):
    """cleanup() must drop both references and trigger a gc pass."""
    handler = TorchModelHandler(mock_model, mock_reference_batch, device="cpu")

    # Patch gc so the collect call can be asserted without a real collection.
    with patch("runtime.core.components.models.torch_model.gc") as mock_gc:
        handler.cleanup()

    mock_gc.collect.assert_called_once()
    assert handler._model is None
    assert handler._reference_batch is None

def test_cleanup_calls_cuda_empty_cache_for_cuda_device(self, mock_model, mock_reference_batch):
    """cleanup() on a CUDA handler must clear the CUDA allocator cache."""
    handler = TorchModelHandler(mock_model, mock_reference_batch, device="cuda")

    # torch is patched so the CUDA path runs even on machines without a GPU.
    with (
        patch("runtime.core.components.models.torch_model.gc"),
        patch("runtime.core.components.models.torch_model.torch") as mock_torch,
    ):
        mock_torch.cuda.is_available.return_value = True
        handler.cleanup()

    mock_torch.cuda.empty_cache.assert_called_once()

def test_cleanup_calls_xpu_empty_cache_for_xpu_device(self, mock_model, mock_reference_batch):
    """cleanup() on an XPU handler must clear the XPU allocator cache."""
    handler = TorchModelHandler(mock_model, mock_reference_batch, device="xpu")

    # torch is patched so the XPU path runs regardless of the host's torch build.
    with (
        patch("runtime.core.components.models.torch_model.gc"),
        patch("runtime.core.components.models.torch_model.torch") as mock_torch,
    ):
        mock_torch.xpu.is_available.return_value = True
        handler.cleanup()

    mock_torch.xpu.empty_cache.assert_called_once()

def test_cleanup_is_safe_to_call_twice(self, mock_model, mock_reference_batch):
    """Repeated cleanup() calls must be idempotent no-ops."""
    handler = TorchModelHandler(mock_model, mock_reference_batch, device="cpu")

    handler.cleanup()
    # Second call should not raise even though _model is already None
    handler.cleanup()

    assert handler._model is None
    assert handler._reference_batch is None


class TestReleaseDeviceMemory:
    """Unit tests for the per-device cache-clearing helper."""

    def test_release_cuda_memory(self):
        with patch("runtime.core.components.models.torch_model.torch") as torch_mock:
            torch_mock.cuda.is_available.return_value = True
            release_device_memory("cuda")
        torch_mock.cuda.empty_cache.assert_called_once()

    def test_release_cuda_with_device_index(self):
        # The ":0" index suffix must not prevent the CUDA branch from matching.
        with patch("runtime.core.components.models.torch_model.torch") as torch_mock:
            torch_mock.cuda.is_available.return_value = True
            release_device_memory("cuda:0")
        torch_mock.cuda.empty_cache.assert_called_once()

    def test_release_xpu_memory(self):
        with patch("runtime.core.components.models.torch_model.torch") as torch_mock:
            torch_mock.xpu.is_available.return_value = True
            release_device_memory("xpu")
        torch_mock.xpu.empty_cache.assert_called_once()

    def test_release_cpu_is_noop(self):
        # CPU has no device cache, so no empty_cache call may happen.
        with patch("runtime.core.components.models.torch_model.torch") as torch_mock:
            release_device_memory("cpu")
        torch_mock.cuda.empty_cache.assert_not_called()
Loading
Loading