Skip to content

Commit 681beac

Browse files
committed
feat: support multiple juice_mux_listen callbacks
Supports multiple callbacks for unhandled STUN requests. Stores multiple registries, one per local listening port in a `conn_registry_t **registries;` field on `conn_mode_entry_t`. The list of registries grows in the same way as the list of agents in `conn_mode_entry_t`. Adds a `port` property to `conn_registry_t` which is the port the registry is bound to, and an `index` property which is the index of the registry in the containing `registries` list in `conn_mode_entry_t`.
1 parent 2fb91a3 commit 681beac

File tree

2 files changed

+83
-18
lines changed

2 files changed

+83
-18
lines changed

src/conn.c

Lines changed: 81 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,30 +33,54 @@ typedef struct conn_mode_entry {
3333
int (*get_addrs_func)(juice_agent_t *agent, addr_record_t *records, size_t size);
3434

3535
mutex_t mutex;
36-
conn_registry_t *registry;
36+
conn_registry_t **registries;
37+
int registries_size;
38+
int registries_count;
39+
3740
} conn_mode_entry_t;
3841

42+
typedef struct conn_impl {
43+
conn_registry_t *registry;
44+
timestamp_t next_timestamp;
45+
bool finished;
46+
} conn_impl_t;
47+
3948
#define MODE_ENTRIES_SIZE 3
4049

4150
static conn_mode_entry_t mode_entries[MODE_ENTRIES_SIZE] = {
4251
{conn_poll_registry_init, conn_poll_registry_cleanup, conn_poll_init, conn_poll_cleanup,
4352
conn_poll_lock, conn_poll_unlock, conn_poll_interrupt, conn_poll_send, conn_poll_get_addrs,
44-
MUTEX_INITIALIZER, NULL},
53+
MUTEX_INITIALIZER, NULL, 0, 0},
4554
{conn_mux_registry_init, conn_mux_registry_cleanup, conn_mux_init, conn_mux_cleanup,
4655
conn_mux_lock, conn_mux_unlock, conn_mux_interrupt, conn_mux_send, conn_mux_get_addrs,
47-
MUTEX_INITIALIZER, NULL},
56+
MUTEX_INITIALIZER, NULL, 0, 0},
4857
{NULL, NULL, conn_thread_init, conn_thread_cleanup, conn_thread_lock, conn_thread_unlock,
49-
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL}};
58+
conn_thread_interrupt, conn_thread_send, conn_thread_get_addrs, MUTEX_INITIALIZER, NULL, 0, 0}};
5059

5160
static conn_mode_entry_t *get_mode_entry(juice_agent_t *agent) {
5261
juice_concurrency_mode_t mode = agent->config.concurrency_mode;
5362
assert(mode >= 0 && mode < MODE_ENTRIES_SIZE);
5463
return mode_entries + (int)mode;
5564
}
5665

66+
static conn_registry_t *get_registry(conn_mode_entry_t *entry, uint16_t port) {
67+
for (int i = 0; i < entry->registries_size; i++) {
68+
if (!entry->registries[i]) {
69+
continue;
70+
}
71+
72+
if (entry->registries[i]->port == port) {
73+
return entry->registries[i];
74+
}
75+
}
76+
77+
return NULL;
78+
}
79+
5780
static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *config) {
5881
// entry must be locked
59-
conn_registry_t *registry = entry->registry;
82+
conn_registry_t *registry = get_registry(entry, config->port_begin);
83+
6084
if (!registry) {
6185
if (!entry->registry_init_func)
6286
return 0;
@@ -91,8 +115,38 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
91115
return -1;
92116
}
93117

94-
entry->registry = registry;
118+
int i = 0;
119+
while (i < entry->registries_size && entry->registries[i])
120+
++i;
121+
122+
if (i == entry->registries_size) {
123+
int new_size = entry->registries_size * 2;
124+
125+
if (new_size == 0) {
126+
new_size = 1;
127+
}
95128

129+
JLOG_DEBUG("Reallocating cb_mux_incomings array, new_size=%d", new_size);
130+
assert(new_size > 0);
131+
132+
conn_registry_t **new_registries =
133+
realloc(entry->registries, new_size * sizeof(conn_registry_t *));
134+
if (!new_registries) {
135+
JLOG_FATAL("Memory reallocation failed for cb_mux_incomings array");
136+
mutex_unlock(&registry->mutex);
137+
mutex_unlock(&entry->mutex);
138+
return -1;
139+
}
140+
141+
entry->registries = new_registries;
142+
entry->registries_size = new_size;
143+
memset(entry->registries + i, 0, (new_size - i) * sizeof(conn_registry_t *));
144+
}
145+
146+
registry->index = i;
147+
registry->port = config->port_begin;
148+
149+
entry->registries[i] = registry;
96150
} else {
97151
mutex_lock(&registry->mutex);
98152
}
@@ -101,24 +155,31 @@ static int acquire_registry(conn_mode_entry_t *entry, udp_socket_config_t *confi
101155
return 0;
102156
}
103157

104-
static void release_registry(conn_mode_entry_t *entry) {
158+
static void release_registry(conn_mode_entry_t *entry, conn_registry_t *registry) {
105159
// entry must be locked
106-
conn_registry_t *registry = entry->registry;
107160
if (!registry)
108161
return;
109162

110163
// registry must be locked
111-
112164
if (registry->agents_count == 0 && registry->cb_mux_incoming == NULL) {
113165
JLOG_DEBUG("No connection left, destroying connections registry");
114166
mutex_unlock(&registry->mutex);
115167

116168
if (entry->registry_cleanup_func)
117169
entry->registry_cleanup_func(registry);
118170

171+
if (registry->index > -1) {
172+
int i = registry->index;
173+
assert(entry->registries[i] == registry);
174+
entry->registries[i] = NULL;
175+
registry->index = -1;
176+
}
177+
178+
assert(entry->registries_count > 0);
179+
--entry->registries_count;
180+
119181
free(registry->agents);
120182
free(registry);
121-
entry->registry = NULL;
122183
return;
123184
}
124185

@@ -136,7 +197,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
136197
return -1;
137198
}
138199

139-
conn_registry_t *registry = entry->registry;
200+
conn_registry_t *registry = get_registry(entry, config->port_begin);
140201

141202
JLOG_DEBUG("Creating connection");
142203
if (registry) {
@@ -164,7 +225,7 @@ int conn_create(juice_agent_t *agent, udp_socket_config_t *config) {
164225
}
165226

166227
if (get_mode_entry(agent)->init_func(agent, registry, config)) {
167-
release_registry(entry); // unlocks the registry
228+
release_registry(entry, registry); // unlocks the registry
168229
mutex_unlock(&entry->mutex);
169230
return -1;
170231
}
@@ -194,7 +255,9 @@ void conn_destroy(juice_agent_t *agent) {
194255
mutex_lock(&entry->mutex);
195256

196257
JLOG_DEBUG("Destroying connection");
197-
conn_registry_t *registry = entry->registry;
258+
conn_impl_t *conn_impl = agent->conn_impl;
259+
260+
conn_registry_t *registry = conn_impl->registry;
198261
if (registry) {
199262
mutex_lock(&registry->mutex);
200263

@@ -210,7 +273,7 @@ void conn_destroy(juice_agent_t *agent) {
210273
assert(registry->agents_count > 0);
211274
--registry->agents_count;
212275

213-
release_registry(entry); // unlocks the registry
276+
release_registry(entry, registry); // unlocks the registry
214277

215278
} else {
216279
entry->cleanup_func(agent);
@@ -264,7 +327,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {
264327

265328
mutex_lock(&entry->mutex);
266329

267-
conn_registry_t *registry = entry->registry;
330+
conn_registry_t *registry = get_registry(entry, local_port);
268331
if (!registry) {
269332
mutex_unlock(&entry->mutex);
270333
return -1;
@@ -276,7 +339,7 @@ static int juice_mux_stop_listen(const char *bind_address, int local_port) {
276339
registry->mux_incoming_user_ptr = NULL;
277340
conn_mux_interrupt_registry(registry);
278341

279-
release_registry(entry);
342+
release_registry(entry, registry);
280343

281344
mutex_unlock(&entry->mutex);
282345

@@ -296,7 +359,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco
296359

297360
mutex_lock(&entry->mutex);
298361

299-
if (entry->registry) {
362+
if (get_registry(entry, local_port)) {
300363
mutex_unlock(&entry->mutex);
301364
JLOG_DEBUG("juice_mux_listen needs to be called before establishing any mux connection.");
302365
return -1;
@@ -307,7 +370,7 @@ int juice_mux_listen(const char *bind_address, int local_port, juice_cb_mux_inco
307370
return -1;
308371
}
309372

310-
conn_registry_t *registry = entry->registry;
373+
conn_registry_t *registry = get_registry(entry, local_port);
311374
if(!registry) {
312375
mutex_unlock(&entry->mutex);
313376
return -1;

src/conn.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ typedef struct juice_agent juice_agent_t;
2525
// See include/juice/juice.h for implemented concurrency modes
2626

2727
typedef struct conn_registry {
28+
int index;
29+
uint16_t port;
2830
void *impl;
2931
mutex_t mutex;
3032
juice_agent_t **agents;

0 commit comments

Comments
 (0)