-
Notifications
You must be signed in to change notification settings - Fork 464
Expand file tree
/
Copy pathtest_manager.py
More file actions
111 lines (89 loc) · 3.5 KB
/
test_manager.py
File metadata and controls
111 lines (89 loc) · 3.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
import socket
import numpy as np
import pytest
from aiortc import RTCConfiguration
from app.models.webrtc import InputData, Offer
from app.webrtc import SDPHandler
from app.webrtc.broadcaster import FrameBroadcaster
from app.webrtc.manager import WebRTCManager, WebRTCSettings
VALID_SDP = (
"v=0\n"
"o=- 0 0 IN IP4 127.0.0.1\n"
"s=-\n"
"t=0 0\n"
"m=video 9 UDP/TLS/RTP/SAVPF 96\n"
"c=IN IP4 0.0.0.0\n"
"a=rtpmap:96 VP8/90000\n"
"a=ice-ufrag:someufrag\n"
"a=ice-pwd:somepassword\n"
"a=setup:actpass\n"
"a=mid:0\n"
"a=rtcp-mux\n"
"a=recvonly\n"
)
@pytest.fixture
def fxt_frame_broadcaster():
return FrameBroadcaster[np.ndarray]()
@pytest.fixture
def fxt_settings():
return WebRTCSettings(config=RTCConfiguration(iceServers=[]))
@pytest.fixture
def fxt_sdp_handler():
return SDPHandler()
@pytest.fixture
def fxt_manager(fxt_frame_broadcaster, fxt_settings, fxt_sdp_handler):
return WebRTCManager(fxt_frame_broadcaster, fxt_settings, fxt_sdp_handler)
@pytest.fixture
def fxt_offer():
return Offer(webrtc_id="test_id", sdp=VALID_SDP, type="offer")
def get_local_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# Doesn't need to be reachable
s.connect(("10.255.255.255", 1))
ip = s.getsockname()[0]
except Exception:
ip = "127.0.0.1"
finally:
s.close()
return ip
class TestWebRTCManager:
@pytest.mark.asyncio
async def test_handle_offer_creates_connection(self, fxt_manager, fxt_offer):
answer = await fxt_manager.handle_offer(fxt_offer)
assert get_local_ip() in answer.sdp
assert answer.type == "answer"
assert "test_id" in fxt_manager._pcs
assert fxt_manager._frame_broadcaster.is_registered("test_id")
@pytest.mark.asyncio
async def test_handle_offer_with_host_resolution(self, fxt_frame_broadcaster, fxt_sdp_handler, fxt_offer):
settings = WebRTCSettings(config=RTCConfiguration(iceServers=[]), advertise_ip="localhost")
manager = WebRTCManager(fxt_frame_broadcaster, settings, fxt_sdp_handler)
answer = await manager.handle_offer(fxt_offer)
assert "127.0.0.1" in answer.sdp
assert answer.type == "answer"
assert "test_id" in manager._pcs
assert manager._frame_broadcaster.is_registered("test_id")
@pytest.mark.asyncio
async def test_cleanup_connection_removes_pc(self, fxt_manager, fxt_offer):
await fxt_manager.handle_offer(fxt_offer)
assert "test_id" in fxt_manager._pcs
assert fxt_manager._frame_broadcaster.is_registered("test_id")
await fxt_manager.cleanup_connection("test_id")
assert "test_id" not in fxt_manager._pcs
assert not fxt_manager._frame_broadcaster.is_registered("test_id")
@pytest.mark.asyncio
def test_set_input_stores_data(self, fxt_manager):
data = InputData(webrtc_id="test_id", conf_threshold=0.5)
fxt_manager.set_input(data)
assert fxt_manager._input_data["test_id"]["conf_threshold"] == 0.5
@pytest.mark.asyncio
async def test_cleanup_removes_all(self, fxt_manager, fxt_offer):
await fxt_manager.handle_offer(fxt_offer)
await fxt_manager.handle_offer(Offer(webrtc_id="id2", sdp=VALID_SDP, type="offer"))
await fxt_manager.cleanup()
assert not fxt_manager._pcs
assert not fxt_manager._input_data
assert fxt_manager._frame_broadcaster.consumer_count() == 0