forked from open-edge-platform/geti-instant-learn
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline.py
More file actions
218 lines (183 loc) · 8.65 KB
/
pipeline.py
File metadata and controls
218 lines (183 loc) · 8.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import logging
import uuid
from threading import Lock, Thread
from typing import Self
from uuid import UUID
from domain.repositories.frame import FrameRepository
from domain.services.schemas.processor import InputData, OutputData
from domain.services.schemas.reader import FrameListResponse
from runtime.core.components.base import PipelineComponent
from runtime.core.components.broadcaster import FrameBroadcaster, FrameSlot
from runtime.core.components.processor import Processor
from runtime.core.components.sink import Sink
from runtime.core.components.source import Source
# Logger named after this module, so pipeline lifecycle messages can be filtered by logger name.
logger: logging.Logger = logging.getLogger(name=__name__)
class Pipeline:
    """
    Orchestrates a multithreaded streaming pipeline and manages component lifecycle.

    This class manages the lifecycle of three core pipeline components (Source, Processor, Sink),
    each running in a separate thread and communicating through broadcasters:

        Source -> InboundBroadcaster -> Processor -> OutboundBroadcaster -> Sink

    The Pipeline is responsible for:
    - Starting/stopping all components
    - Gracefully replacing individual components at runtime
    - Managing thread lifecycle and broadcaster communication

    The caller (typically PipelineManager) is responsible for:
    - Configuration management and comparison
    - Creating component instances
    - Deciding when to update components

    Components are attached with ``set_source`` / ``set_processor`` / ``set_sink`` and are
    tracked by their pipeline role (``Source``, ``Processor`` or ``Sink``). A subclass of a
    role is therefore found by ``stop``, ``stop_component``, ``seek``, ``get_frame_index``
    and ``list_frames`` exactly like the base class itself.

    Args:
        project_id (UUID): The project ID associated with this pipeline.
        frame_repository (FrameRepository): Repository that ``capture_frame`` saves frames to.
        inbound_broadcaster (FrameBroadcaster[InputData] | None, optional): Broadcaster for raw
            frames. Defaults to a new instance owned by this pipeline.
        outbound_broadcaster (FrameBroadcaster[OutputData] | None, optional): Broadcaster for
            processed frames. Defaults to a new instance owned by this pipeline.
    """

    # Pipeline roles in data-flow order; stop() shuts components down in this order.
    _ROLES: tuple[type[PipelineComponent], ...] = (Source, Processor, Sink)
    # Seconds to wait for a component thread to exit after asking the component to stop.
    _JOIN_TIMEOUT_S: float = 5

    def __init__(
        self,
        project_id: UUID,
        frame_repository: FrameRepository,
        inbound_broadcaster: FrameBroadcaster[InputData] | None = None,
        outbound_broadcaster: FrameBroadcaster[OutputData] | None = None,
    ):
        # todo: remove project id from the pipeline as it is the application impl details
        self._project_id = project_id
        self._frame_repository = frame_repository
        # Build the default broadcasters per instance. Default-argument expressions are
        # evaluated once at definition time, so constructing them in the signature made every
        # Pipeline created without explicit broadcasters share the same two instances.
        self._inbound_broadcaster = (
            inbound_broadcaster if inbound_broadcaster is not None else FrameBroadcaster[InputData]("inbound")
        )
        self._outbound_broadcaster = (
            outbound_broadcaster if outbound_broadcaster is not None else FrameBroadcaster[OutputData]("outbound")
        )
        # Both maps are keyed by pipeline role (see _role_of), never by the concrete class.
        self._threads: dict[type[PipelineComponent], Thread] = {}
        self._components: dict[type[PipelineComponent], PipelineComponent] = {}
        self._lock = Lock()
        self._is_running = False
        logger.debug("Pipeline created for project_id=%s", project_id)

    @property
    def project_id(self) -> UUID:
        """Get the project ID associated with this pipeline."""
        return self._project_id

    @property
    def is_running(self) -> bool:
        """Check if the pipeline is currently running."""
        return self._is_running

    @property
    def outbound_slot(self) -> FrameSlot[OutputData]:
        """Shared slot holding the latest processed frame for external consumers."""
        return self._outbound_broadcaster.slot

    def start(self) -> None:
        """Start a thread for every registered component and mark the pipeline as running.

        Logs a warning and does nothing if the pipeline is already running. A component whose
        thread is already alive (registered with ``start=True``) is not started a second time.
        """
        with self._lock:
            if self._is_running:
                logger.warning("Pipeline already running for project_id=%s", self._project_id)
                return
            logger.debug("Starting pipeline for project_id=%s", self._project_id)
            for role, component in self._components.items():
                existing = self._threads.get(role)
                if existing is not None and existing.is_alive():
                    # Never run the same component callable on two threads at once.
                    continue
                self._threads[role] = self._spawn(component)
            self._is_running = True
            logger.debug("Pipeline started for project_id=%s", self._project_id)

    def stop(self) -> None:
        """Stop every component, join its thread, and forget all registered components.

        Components are stopped in data-flow order (Source, Processor, Sink), followed by any
        other registered component. Each thread is joined with a timeout, and a warning is
        logged for any thread still alive afterwards. Logs a warning and does nothing if the
        pipeline is not running.
        """
        with self._lock:
            if not self._is_running:
                logger.warning("Pipeline already stopped for project_id=%s", self._project_id)
                return
            logger.debug("Stopping pipeline for project_id=%s", self._project_id)
            ordered = [role for role in self._ROLES if role in self._components]
            ordered += [role for role in self._components if role not in self._ROLES]
            for role in ordered:
                component = self._components[role]
                component.stop()
                self._join_thread(self._threads.get(role), component.__class__.__name__)
            self._components.clear()
            self._threads.clear()
            self._is_running = False
            logger.debug("Pipeline stopped for project_id=%s", self._project_id)

    def set_source(self, source: Source, start: bool = False) -> Self:
        """Wire ``source`` to the inbound broadcaster and register it, replacing any current source.

        Args:
            source: The source component.
            start: Start the component's thread immediately.

        Returns:
            This pipeline, for chaining.
        """
        source.setup(self._inbound_broadcaster)
        self._register_component(source, start)
        return self

    def set_sink(self, sink: Sink, start: bool = False) -> Self:
        """Wire ``sink`` to the outbound broadcaster and register it, replacing any current sink.

        Args:
            sink: The sink component.
            start: Start the component's thread immediately.

        Returns:
            This pipeline, for chaining.
        """
        sink.setup(self._outbound_broadcaster)
        self._register_component(sink, start)
        return self

    def set_processor(self, processor: Processor, start: bool = False) -> Self:
        """Wire ``processor`` between both broadcasters and register it, replacing any current one.

        Args:
            processor: The processor component.
            start: Start the component's thread immediately.

        Returns:
            This pipeline, for chaining.
        """
        processor.setup(self._inbound_broadcaster, self._outbound_broadcaster)
        self._register_component(processor, start)
        return self

    def stop_component(self, component_cls: type[PipelineComponent]) -> None:
        """Stop and remove a single component, releasing its resources.

        Does nothing if no component is registered for that role.

        Args:
            component_cls: The component class to stop (e.g. ``Processor``). A subclass of a
                role resolves to that role.
        """
        with self._lock:
            role = self._role_of(component_cls)
            component = self._components.pop(role, None)
            if component is None:
                return
            component.stop()
            self._join_thread(self._threads.pop(role, None), component_cls.__name__)
            logger.debug("Stopped and removed %s", component_cls.__name__)

    def _register_component(self, new_component: PipelineComponent, start: bool = True) -> None:
        """
        Register ``new_component`` under its pipeline role, replacing any existing component.

        When a component already occupies the role, it is stopped, its thread is joined (with a
        warning if it outlives the timeout), and both broadcasters are cleared before the new
        component is stored.

        Args:
            new_component: The new component instance.
            start: Start ``new_component`` on a new thread immediately.
        """
        component_name = new_component.__class__.__name__
        role = self._role_of(new_component.__class__)
        with self._lock:
            current_component = self._components.get(role)
            if current_component is not None:
                current_component.stop()
                # Pop, rather than get, so no stale (dead) thread stays recorded for this role.
                self._join_thread(self._threads.pop(role, None), component_name)
                self._inbound_broadcaster.clear()
                self._outbound_broadcaster.clear()
            self._components[role] = new_component
            if start:
                self._threads[role] = self._spawn(new_component)
                logger.debug("Started new %s", component_name)

    @classmethod
    def _role_of(cls, component_cls: type[PipelineComponent]) -> type[PipelineComponent]:
        """Map a component class to the pipeline role it is stored under (itself if none matches)."""
        for role in cls._ROLES:
            if issubclass(component_cls, role):
                return role
        return component_cls

    @staticmethod
    def _spawn(component: PipelineComponent) -> Thread:
        """Run ``component`` (called with no arguments) on a new, already-started non-daemon thread."""
        thread = Thread(target=component, daemon=False)
        thread.start()
        return thread

    def _join_thread(self, thread: Thread | None, component_name: str) -> None:
        """Join ``thread`` with the pipeline timeout; warn if it is still alive afterwards."""
        if thread is None or not thread.is_alive():
            return
        thread.join(timeout=self._JOIN_TIMEOUT_S)
        if thread.is_alive():
            logger.warning("%s thread did not stop cleanly", component_name)

    def seek(self, index: int) -> None:
        """Seek to a specific frame in the source; does nothing when no source is registered."""
        with self._lock:
            source = self._components.get(Source)
            if source is not None:
                source.seek(index)

    def get_frame_index(self) -> int:
        """Get current frame position from the source, or 0 when no source is registered."""
        with self._lock:
            source = self._components.get(Source)
            if source is not None:
                return source.index()
            return 0

    def capture_frame(self) -> UUID:
        """
        Capture the latest frame from the inbound stream and save it to the frame repository.

        Returns:
            The newly generated ID under which the frame was saved.

        Raises:
            RuntimeError: If the inbound broadcaster has no frame yet.
        """
        input_data = self._inbound_broadcaster.latest_frame
        if input_data is None:
            raise RuntimeError("No frame available from source")
        frame_id = uuid.uuid4()
        self._frame_repository.save_frame(self._project_id, frame_id, input_data.frame)
        logger.info("Captured frame %s for project %s", frame_id, self._project_id)
        return frame_id

    def list_frames(self, offset: int = 0, limit: int = 30) -> FrameListResponse:
        """Get paginated list of frames from the source.

        Raises:
            ValueError: If no source component is registered.
        """
        with self._lock:
            source = self._components.get(Source)
            if source is not None:
                return source.list_frames(offset, limit)
            raise ValueError("No source component available")