Skip to content

Commit bec2ecd

Browse files
committed
feat(web_ui): add expose_camera to expose an MJPEG stream
1 parent b9ff178 commit bec2ecd

2 files changed

Lines changed: 95 additions & 0 deletions

File tree

src/arduino/app_bricks/web_ui/web_ui.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from fastapi.responses import FileResponse
1515
from fastapi_socketio import SocketManager
1616

17+
from arduino.app_peripherals.camera.base_camera import BaseCamera
1718
from arduino.app_utils import brick, Logger
1819

1920
logger = Logger("WebUI")
@@ -175,6 +176,44 @@ def expose_api(self, method: str, path: str, function: Callable):
175176
"""
176177
self.app.add_api_route(self._api_path_prefix + path, function, methods=[method])
177178

179+
def expose_camera(self, path: str, camera: BaseCamera, quality: int = 80):
180+
"""
181+
Expose a camera stream at the specified URL path in MJPEG format.
182+
183+
This can be consumed for example using `<img>` tags in a web application.
184+
185+
Args:
186+
path (str): URL path for the MJPEG stream endpoint.
187+
camera (BaseCamera): A camera instance, will be started if not already running.
188+
quality (int, optional): JPEG compression quality (0-100). Default: 80.
189+
"""
190+
from fastapi.responses import StreamingResponse
191+
from arduino.app_utils.image import compress_to_jpeg
192+
193+
if not camera.is_started:
194+
camera.start()
195+
196+
def generate_frames():
197+
try:
198+
while True:
199+
frame = camera.capture()
200+
if frame is None:
201+
continue
202+
jpeg = compress_to_jpeg(frame, quality=quality)
203+
if jpeg is None:
204+
continue
205+
yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + jpeg.tobytes() + b"\r\n"
206+
except Exception as e:
207+
logger.error(f"Terminating stream on camera error: {e}")
208+
209+
def stream_route():
210+
return StreamingResponse(
211+
generate_frames(),
212+
media_type="multipart/x-mixed-replace; boundary=frame",
213+
)
214+
215+
self.expose_api("GET", path, stream_route)
216+
178217
def on_connect(self, callback: Callable[[str], None]):
179218
"""Register a callback for WebSocket connection events.
180219

tests/arduino/app_bricks/web_ui/test_web_ui.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,59 @@ def test_stop_sets_should_exit():
7979
ui._server = dummy_server
8080
ui.stop()
8181
assert dummy_server.should_exit is True
82+
83+
84+
def test_expose_camera_starts_camera_if_not_started():
85+
from unittest.mock import Mock, patch
86+
import numpy as np
87+
88+
ui = WebUI()
89+
mock_camera = Mock()
90+
mock_camera.is_started = False
91+
mock_camera.capture = Mock(side_effect=[np.zeros((2, 2, 3), dtype=np.uint8), RuntimeError("end")])
92+
93+
with patch("arduino.app_utils.image.compress_to_jpeg", return_value=np.array([0], dtype=np.uint8)):
94+
ui.expose_camera("/stream", mock_camera)
95+
TestClient(ui.app, raise_server_exceptions=False).get("/stream")
96+
97+
mock_camera.start.assert_called_once()
98+
99+
100+
def test_expose_camera_streams_mjpeg_response():
101+
from unittest.mock import Mock, patch
102+
import numpy as np
103+
104+
ui = WebUI()
105+
mock_camera = Mock()
106+
mock_camera.is_started = True
107+
108+
fake_frame = np.zeros((2, 2, 3), dtype=np.uint8)
109+
mock_camera.capture = Mock(side_effect=[fake_frame, RuntimeError("end")])
110+
111+
fake_jpeg = b"\xff\xd8jpeg"
112+
113+
with patch("arduino.app_utils.image.compress_to_jpeg", return_value=np.frombuffer(fake_jpeg, dtype=np.uint8)):
114+
ui.expose_camera("/stream", mock_camera)
115+
response = TestClient(ui.app).get("/stream")
116+
117+
assert response.status_code == 200
118+
assert "multipart/x-mixed-replace" in response.headers["content-type"]
119+
assert fake_jpeg in response.content
120+
121+
122+
def test_expose_camera_passes_quality_to_compress():
123+
from unittest.mock import Mock, patch
124+
import numpy as np
125+
126+
ui = WebUI()
127+
mock_camera = Mock()
128+
mock_camera.is_started = True
129+
130+
fake_frame = np.zeros((2, 2, 3), dtype=np.uint8)
131+
mock_camera.capture = Mock(side_effect=[fake_frame, RuntimeError("end")])
132+
133+
with patch("arduino.app_utils.image.compress_to_jpeg", return_value=np.array([0], dtype=np.uint8)) as mock_compress:
134+
ui.expose_camera("/stream", mock_camera, quality=95)
135+
TestClient(ui.app).get("/stream")
136+
137+
mock_compress.assert_called_with(fake_frame, quality=95)

0 commit comments

Comments
 (0)