Skip to content

Commit 6c521e2

Browse files
committed
turn: add multithreading support
1 parent 0d055bd commit 6c521e2

File tree

5 files changed

+132
-9
lines changed

5 files changed

+132
-9
lines changed

modules/turn/alloc.c

+23-1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ static void destructor(void *arg)
5252
{
5353
struct allocation *al = arg;
5454

55+
mtx_lock(&turndp()->mutex);
56+
list_unlink(&al->le_map);
57+
mtx_unlock(&turndp()->mutex);
58+
5559
hash_flush(al->perms);
5660
mem_deref(al->perms);
5761
mem_deref(al->chans);
@@ -60,8 +64,11 @@ static void destructor(void *arg)
6064
tmr_cancel(&al->tmr);
6165
mem_deref(al->username);
6266
mem_deref(al->cli_sock);
67+
68+
/* @TODO check fd deref cleanup on turn worker thread */
6369
mem_deref(al->rel_us);
6470
mem_deref(al->rsv_us);
71+
6572
turndp()->allocc_cur--;
6673
}
6774

@@ -90,13 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg)
9097
}
9198
}
9299

100+
mtx_lock(&al->mutex);
93101
perm = perm_find(al->perms, src);
102+
mtx_unlock(&al->mutex);
94103
if (!perm) {
95104
++al->dropc_rx;
96105
return;
97106
}
98107

108+
mtx_lock(&al->mutex);
99109
chan = chan_peer_find(al->chans, src);
110+
mtx_unlock(&al->mutex);
100111
if (chan) {
101112
uint16_t len = mbuf_get_left(mb);
102113
size_t start;
@@ -185,6 +196,14 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
185196
break;
186197
}
187198

199+
/* Release fd for new thread and re_map*/
200+
udp_thread_detach(al->rel_us);
201+
udp_thread_detach(al->rsv_us);
202+
203+
mtx_lock(&turndp()->mutex);
204+
list_append(&turndp()->re_map, &al->le_map, al);
205+
mtx_unlock(&turndp()->mutex);
206+
188207
return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
189208
}
190209

@@ -247,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
247266
goto reply;
248267
}
249268

250-
restund_debug("turn: allocation already exists (%J)\n", src);
269+
restund_warning("turn: allocation already exists (%J)\n", src);
251270
++turnd->reply.scode_437;
252271
rerr = stun_ereply(proto, sock, src, 0, msg,
253272
437, "Allocation TID Mismatch",
@@ -351,6 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
351370
al->cli_addr = *src;
352371
al->srv_addr = *dst;
353372
al->proto = proto;
373+
mtx_init(&al->mutex, mtx_plain);
354374
sa_init(&al->rsv_addr, AF_UNSPEC);
355375
turndp()->allocc_tot++;
356376
turndp()->allocc_cur++;
@@ -466,7 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al,
466486
lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0;
467487
lifetime = MIN(lifetime, turnd->lifetime_max);
468488

489+
mtx_lock(&al->mutex);
469490
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
491+
mtx_unlock(&al->mutex);
470492

471493
restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime);
472494

modules/turn/chan.c

+11-3
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ struct chan {
2727
struct le he_numb;
2828
struct le he_peer;
2929
struct sa peer;
30-
const struct allocation *al;
30+
struct allocation *al;
3131
time_t expires;
3232
uint16_t numb;
3333
};
@@ -50,8 +50,10 @@ static void destructor(void *arg)
5050
restund_debug("turn: allocation %p channel 0x%x %J destroyed\n",
5151
chan->al, chan->numb, &chan->peer);
5252

53+
mtx_lock(&chan->al->mutex);
5354
hash_unlink(&chan->he_numb);
5455
hash_unlink(&chan->he_peer);
56+
mtx_unlock(&chan->al->mutex);
5557
}
5658

5759

@@ -185,17 +187,19 @@ void chan_status(const struct chanlist *cl, struct mbuf *mb)
185187

186188
static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
187189
const struct sa *peer,
188-
const struct allocation *al)
190+
struct allocation *al)
189191
{
190192
struct chan *chan;
191193

192-
if (!cl || !peer)
194+
if (!cl || !peer || !al)
193195
return NULL;
194196

195197
chan = mem_zalloc(sizeof(*chan), destructor);
196198
if (!chan)
197199
return NULL;
198200

201+
mtx_lock(&al->mutex);
202+
199203
hash_append(cl->ht_numb, numb, &chan->he_numb, chan);
200204
hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan);
201205

@@ -204,6 +208,8 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
204208
chan->al = al;
205209
chan->expires = time(NULL) + CHAN_LIFETIME;
206210

211+
mtx_unlock(&al->mutex);
212+
207213
restund_debug("turn: allocation %p channel 0x%x %J created\n",
208214
chan->al, chan->numb, &chan->peer);
209215

@@ -216,7 +222,9 @@ static void chan_refresh(struct chan *chan)
216222
if (!chan)
217223
return;
218224

225+
mtx_lock(&chan->al->mutex);
219226
chan->expires = time(NULL) + CHAN_LIFETIME;
227+
mtx_unlock(&chan->al->mutex);
220228

221229
restund_debug("turn: allocation %p channel 0x%x %J refreshed\n",
222230
chan->al, chan->numb, &chan->peer);

modules/turn/perm.c

+8-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ struct perm {
1919
struct le he;
2020
struct sa peer;
2121
struct restund_trafstat ts;
22-
const struct allocation *al;
22+
struct allocation *al;
2323
time_t expires;
2424
time_t start;
2525
bool new;
@@ -38,7 +38,9 @@ static void destructor(void *arg)
3838
struct perm *perm = arg;
3939
int err;
4040

41+
mtx_lock(&perm->al->mutex);
4142
hash_unlink(&perm->he);
43+
mtx_unlock(&perm->al->mutex);
4244

4345
restund_debug("turn: allocation %p permission %j destroyed "
4446
"(%llu/%llu %llu/%llu)\n",
@@ -90,7 +92,7 @@ struct perm *perm_find(const struct hash *ht, const struct sa *peer)
9092

9193

9294
struct perm *perm_create(struct hash *ht, const struct sa *peer,
93-
const struct allocation *al)
95+
struct allocation *al)
9496
{
9597
const time_t now = time(NULL);
9698
struct perm *perm;
@@ -102,13 +104,17 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
102104
if (!perm)
103105
return NULL;
104106

107+
mtx_lock(&al->mutex);
108+
105109
hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm);
106110

107111
perm->peer = *peer;
108112
perm->al = al;
109113
perm->expires = now + PERM_LIFETIME;
110114
perm->start = now;
111115

116+
mtx_unlock(&al->mutex);
117+
112118
restund_debug("turn: allocation %p permission %j created\n", al, peer);
113119

114120
return perm;

modules/turn/turn.c

+84-2
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212

1313
enum {
14-
ALLOC_DEFAULT_BSIZE = 512,
14+
ALLOC_DEFAULT_BSIZE = 1024,
15+
TURN_THREADS = 4
1516
};
1617

1718

@@ -23,7 +24,8 @@ struct tuple {
2324

2425

2526
static struct turnd turnd;
26-
27+
static struct tmr timers[TURN_THREADS];
28+
static thrd_t tid[TURN_THREADS];
2729

2830
struct turnd *turndp(void)
2931
{
@@ -335,6 +337,60 @@ static struct restund_cmdsub cmd_turnreply = {
335337
};
336338

337339

340+
static void tmr_handler(void *arg)
341+
{
342+
struct tmr *tmr = arg;
343+
struct le *le;
344+
345+
mtx_lock(&turndp()->mutex);
346+
if (!turndp()->run)
347+
re_cancel();
348+
349+
/* Reassign one allocation by time */
350+
LIST_FOREACH(&turndp()->re_map, le)
351+
{
352+
struct allocation *al = le->data;
353+
mtx_lock(&al->mutex);
354+
udp_thread_attach(al->rel_us);
355+
udp_thread_attach(al->rsv_us);
356+
mtx_unlock(&al->mutex);
357+
}
358+
list_clear(&turndp()->re_map);
359+
360+
mtx_unlock(&turndp()->mutex);
361+
362+
tmr_start(tmr, 10, tmr_handler, tmr);
363+
}
364+
365+
366+
static int thread_handler(void *arg)
367+
{
368+
struct tmr *tmr = arg;
369+
int err;
370+
371+
err = re_thread_init();
372+
if (err) {
373+
restund_error("turn: re_thread_init failed %m\n", err);
374+
return 0;
375+
}
376+
377+
fd_setsize(-1);
378+
379+
tmr_start(tmr, 10, tmr_handler, tmr);
380+
381+
err = re_main(NULL);
382+
if (err)
383+
restund_error("turn: re_main failed %m\n", err);
384+
385+
tmr_cancel(tmr);
386+
387+
tmr_debug();
388+
re_thread_close();
389+
390+
return 0;
391+
}
392+
393+
338394
static int module_init(void)
339395
{
340396
uint32_t x, bsize = ALLOC_DEFAULT_BSIZE;
@@ -406,6 +462,24 @@ static int module_init(void)
406462
goto out;
407463
}
408464

465+
list_init(&turnd.re_map);
466+
467+
turnd.run = true;
468+
err = mtx_init(&turnd.mutex, mtx_plain);
469+
if (err) {
470+
restund_error("turn: mtx_init err: %d\n", err);
471+
goto out;
472+
}
473+
474+
for (int i = 0; i < TURN_THREADS; i++) {
475+
err = thrd_create(&tid[i], thread_handler,
476+
&timers[i]);
477+
if (err) {
478+
restund_error("turn: thrd_create err: %m\n", err);
479+
goto out;
480+
}
481+
}
482+
409483
restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n",
410484
turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6,
411485
bsize);
@@ -417,6 +491,14 @@ static int module_init(void)
417491

418492
static int module_close(void)
419493
{
494+
mtx_lock(&turnd.mutex);
495+
turnd.run = false;
496+
mtx_unlock(&turnd.mutex);
497+
498+
for (int i = 0; i < TURN_THREADS; i++) {
499+
thrd_join(tid[i], NULL);
500+
}
501+
420502
hash_flush(turnd.ht_alloc);
421503
turnd.ht_alloc = mem_deref(turnd.ht_alloc);
422504
restund_cmd_unsubscribe(&cmd_turnreply);

modules/turn/turn.h

+6-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ struct turnd {
1717
uint32_t allocc_cur;
1818
uint32_t lifetime_max;
1919
uint32_t udp_sockbuf_size;
20+
mtx_t mutex;
21+
bool run;
22+
struct list re_map;
2023

2124
struct {
2225
uint64_t scode_400;
@@ -35,6 +38,8 @@ struct chanlist;
3538

3639
struct allocation {
3740
struct le he;
41+
struct le le_map;
42+
mtx_t mutex;
3843
struct tmr tmr;
3944
uint8_t tid[STUN_TID_SIZE];
4045
struct sa cli_addr;
@@ -73,7 +78,7 @@ struct perm;
7378

7479
struct perm *perm_find(const struct hash *ht, const struct sa *addr);
7580
struct perm *perm_create(struct hash *ht, const struct sa *peer,
76-
const struct allocation *al);
81+
struct allocation *al);
7782
void perm_refresh(struct perm *perm);
7883
void perm_tx_stat(struct perm *perm, size_t bytc);
7984
void perm_rx_stat(struct perm *perm, size_t bytc);

0 commit comments

Comments
 (0)