Skip to content

Commit 343b0b4

Browse files
committed
add multiple threads and use C11 threads
1 parent 42d5e69 commit 343b0b4

File tree

5 files changed

+67
-49
lines changed

5 files changed

+67
-49
lines changed

modules/turn/alloc.c

+15-12
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ static void destructor(void *arg)
5252
{
5353
struct allocation *al = arg;
5454

55-
pthread_mutex_lock(&turndp()->mutex);
55+
mtx_lock(&turndp()->mutex);
5656
list_unlink(&al->le_map);
57-
pthread_mutex_unlock(&turndp()->mutex);
57+
mtx_unlock(&turndp()->mutex);
5858

5959
hash_flush(al->perms);
6060
mem_deref(al->perms);
@@ -64,8 +64,11 @@ static void destructor(void *arg)
6464
tmr_cancel(&al->tmr);
6565
mem_deref(al->username);
6666
mem_deref(al->cli_sock);
67+
68+
/* @TODO check fd deref cleanup on turn worker thread */
6769
mem_deref(al->rel_us);
6870
mem_deref(al->rsv_us);
71+
6972
turndp()->allocc_cur--;
7073
}
7174

@@ -94,17 +97,17 @@ static void udp_recv(const struct sa *src, struct mbuf *mb, void *arg)
9497
}
9598
}
9699

97-
pthread_mutex_lock(&al->mutex);
100+
mtx_lock(&al->mutex);
98101
perm = perm_find(al->perms, src);
99-
pthread_mutex_unlock(&al->mutex);
102+
mtx_unlock(&al->mutex);
100103
if (!perm) {
101104
++al->dropc_rx;
102105
return;
103106
}
104107

105-
pthread_mutex_lock(&al->mutex);
108+
mtx_lock(&al->mutex);
106109
chan = chan_peer_find(al->chans, src);
107-
pthread_mutex_unlock(&al->mutex);
110+
mtx_unlock(&al->mutex);
108111
if (chan) {
109112
uint16_t len = mbuf_get_left(mb);
110113
size_t start;
@@ -197,9 +200,9 @@ static int relay_listen(const struct sa *rel_addr, struct allocation *al,
197200
udp_thread_detach(al->rel_us);
198201
udp_thread_detach(al->rsv_us);
199202

200-
pthread_mutex_lock(&turndp()->mutex);
203+
mtx_lock(&turndp()->mutex);
201204
list_append(&turndp()->re_map, &al->le_map, al);
202-
pthread_mutex_unlock(&turndp()->mutex);
205+
mtx_unlock(&turndp()->mutex);
203206

204207
return (i == PORT_TRY_MAX) ? EADDRINUSE : err;
205208
}
@@ -263,7 +266,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
263266
goto reply;
264267
}
265268

266-
restund_debug("turn: allocation already exists (%J)\n", src);
269+
restund_warning("turn: allocation already exists (%J)\n", src);
267270
++turnd->reply.scode_437;
268271
rerr = stun_ereply(proto, sock, src, 0, msg,
269272
437, "Allocation TID Mismatch",
@@ -367,7 +370,7 @@ void allocate_request(struct turnd *turnd, struct allocation *alx,
367370
al->cli_addr = *src;
368371
al->srv_addr = *dst;
369372
al->proto = proto;
370-
pthread_mutex_init(&al->mutex, NULL);
373+
mtx_init(&al->mutex, mtx_plain);
371374
sa_init(&al->rsv_addr, AF_UNSPEC);
372375
turndp()->allocc_tot++;
373376
turndp()->allocc_cur++;
@@ -483,9 +486,9 @@ void refresh_request(struct turnd *turnd, struct allocation *al,
483486
lifetime = lifetime ? MAX(lifetime, TURN_DEFAULT_LIFETIME) : 0;
484487
lifetime = MIN(lifetime, turnd->lifetime_max);
485488

486-
pthread_mutex_lock(&al->mutex);
489+
mtx_lock(&al->mutex);
487490
tmr_start(&al->tmr, lifetime * 1000, timeout, al);
488-
pthread_mutex_unlock(&al->mutex);
491+
mtx_unlock(&al->mutex);
489492

490493
restund_debug("turn: allocation %p refresh (%us)\n", al, lifetime);
491494

modules/turn/chan.c

+6-6
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ static void destructor(void *arg)
5050
restund_debug("turn: allocation %p channel 0x%x %J destroyed\n",
5151
chan->al, chan->numb, &chan->peer);
5252

53-
pthread_mutex_lock(&chan->al->mutex);
53+
mtx_lock(&chan->al->mutex);
5454
hash_unlink(&chan->he_numb);
5555
hash_unlink(&chan->he_peer);
56-
pthread_mutex_unlock(&chan->al->mutex);
56+
mtx_unlock(&chan->al->mutex);
5757
}
5858

5959

@@ -198,7 +198,7 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
198198
if (!chan)
199199
return NULL;
200200

201-
pthread_mutex_lock(&al->mutex);
201+
mtx_lock(&al->mutex);
202202

203203
hash_append(cl->ht_numb, numb, &chan->he_numb, chan);
204204
hash_append(cl->ht_peer, sa_hash(peer, SA_ALL), &chan->he_peer, chan);
@@ -208,7 +208,7 @@ static struct chan *chan_create(struct chanlist *cl, uint16_t numb,
208208
chan->al = al;
209209
chan->expires = time(NULL) + CHAN_LIFETIME;
210210

211-
pthread_mutex_unlock(&al->mutex);
211+
mtx_unlock(&al->mutex);
212212

213213
restund_debug("turn: allocation %p channel 0x%x %J created\n",
214214
chan->al, chan->numb, &chan->peer);
@@ -222,9 +222,9 @@ static void chan_refresh(struct chan *chan)
222222
if (!chan)
223223
return;
224224

225-
pthread_mutex_lock(&chan->al->mutex);
225+
mtx_lock(&chan->al->mutex);
226226
chan->expires = time(NULL) + CHAN_LIFETIME;
227-
pthread_mutex_unlock(&chan->al->mutex);
227+
mtx_unlock(&chan->al->mutex);
228228

229229
restund_debug("turn: allocation %p channel 0x%x %J refreshed\n",
230230
chan->al, chan->numb, &chan->peer);

modules/turn/perm.c

+4-4
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,9 @@ static void destructor(void *arg)
3838
struct perm *perm = arg;
3939
int err;
4040

41-
pthread_mutex_lock(&perm->al->mutex);
41+
mtx_lock(&perm->al->mutex);
4242
hash_unlink(&perm->he);
43-
pthread_mutex_unlock(&perm->al->mutex);
43+
mtx_unlock(&perm->al->mutex);
4444

4545
restund_debug("turn: allocation %p permission %j destroyed "
4646
"(%llu/%llu %llu/%llu)\n",
@@ -104,7 +104,7 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
104104
if (!perm)
105105
return NULL;
106106

107-
pthread_mutex_lock(&al->mutex);
107+
mtx_lock(&al->mutex);
108108

109109
hash_append(ht, sa_hash(peer, SA_ADDR), &perm->he, perm);
110110

@@ -113,7 +113,7 @@ struct perm *perm_create(struct hash *ht, const struct sa *peer,
113113
perm->expires = now + PERM_LIFETIME;
114114
perm->start = now;
115115

116-
pthread_mutex_unlock(&al->mutex);
116+
mtx_unlock(&al->mutex);
117117

118118
restund_debug("turn: allocation %p permission %j created\n", al, peer);
119119

modules/turn/turn.c

+40-24
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111

1212

1313
enum {
14-
ALLOC_DEFAULT_BSIZE = 512,
14+
ALLOC_DEFAULT_BSIZE = 1024,
15+
TURN_THREADS = 4
1516
};
1617

1718

@@ -23,8 +24,8 @@ struct tuple {
2324

2425

2526
static struct turnd turnd;
26-
static struct tmr tmr;
27-
27+
static struct tmr timers[TURN_THREADS];
28+
static thrd_t tid[TURN_THREADS];
2829

2930
struct turnd *turndp(void)
3031
{
@@ -338,53 +339,53 @@ static struct restund_cmdsub cmd_turnreply = {
338339

339340
static void tmr_handler(void *arg)
340341
{
341-
struct turnd *td = arg;
342+
struct tmr *tmr = arg;
342343
struct le *le;
343344

344-
pthread_mutex_lock(&td->mutex);
345-
if (!td->run)
345+
mtx_lock(&turndp()->mutex);
346+
if (!turndp()->run)
346347
re_cancel();
347348

348-
LIST_FOREACH(&td->re_map, le)
349+
/* Reassign one allocation by time */
350+
LIST_FOREACH(&turndp()->re_map, le)
349351
{
350352
struct allocation *al = le->data;
351-
pthread_mutex_lock(&al->mutex);
353+
mtx_lock(&al->mutex);
352354
udp_thread_attach(al->rel_us);
353355
udp_thread_attach(al->rsv_us);
354-
pthread_mutex_unlock(&al->mutex);
356+
mtx_unlock(&al->mutex);
355357
}
356-
list_clear(&td->re_map);
357-
pthread_mutex_unlock(&td->mutex);
358+
list_clear(&turndp()->re_map);
358359

360+
mtx_unlock(&turndp()->mutex);
359361

360-
tmr_start(&tmr, 10, tmr_handler, td);
362+
tmr_start(tmr, 10, tmr_handler, tmr);
361363
}
362364

363365

364-
static void *thread_handler(void *arg)
366+
static int thread_handler(void *arg)
365367
{
366-
struct turnd *td = arg;
368+
struct tmr *tmr = arg;
367369
int err;
368370

369371
err = re_thread_init();
370372
if (err) {
371373
restund_error("turn: re_thread_init failed %m\n", err);
372-
return NULL;
374+
return 0;
373375
}
374376

375-
tmr_start(&tmr, 10, tmr_handler, td);
377+
tmr_start(tmr, 10, tmr_handler, tmr);
376378

377379
err = re_main(NULL);
378380
if (err)
379381
restund_error("turn: re_main failed %m\n", err);
380382

381-
restund_warning("EXIT turn thread\n");
382-
tmr_cancel(&tmr);
383+
tmr_cancel(tmr);
383384

384385
tmr_debug();
385386
re_thread_close();
386387

387-
return NULL;
388+
return 0;
388389
}
389390

390391

@@ -462,8 +463,20 @@ static int module_init(void)
462463
list_init(&turnd.re_map);
463464

464465
turnd.run = true;
465-
pthread_mutex_init(&turnd.mutex, NULL);
466-
err = pthread_create(&turnd.tid, NULL, thread_handler, &turnd);
466+
err = mtx_init(&turnd.mutex, mtx_plain);
467+
if (err) {
468+
restund_error("turn: mtx_init err: %d\n", err);
469+
goto out;
470+
}
471+
472+
for (int i = 0; i < TURN_THREADS; i++) {
473+
err = thrd_create(&tid[i], thread_handler,
474+
&timers[i]);
475+
if (err) {
476+
restund_error("turn: thrd_create err: %m\n", err);
477+
goto out;
478+
}
479+
}
467480

468481
restund_debug("turn: lifetime=%u ext=%j ext6=%j bsz=%u\n",
469482
turnd.lifetime_max, &turnd.rel_addr, &turnd.rel_addr6,
@@ -476,10 +489,13 @@ static int module_init(void)
476489

477490
static int module_close(void)
478491
{
479-
pthread_mutex_lock(&turnd.mutex);
492+
mtx_lock(&turnd.mutex);
480493
turnd.run = false;
481-
pthread_mutex_unlock(&turnd.mutex);
482-
pthread_join(turnd.tid, NULL);
494+
mtx_unlock(&turnd.mutex);
495+
496+
for (int i = 0; i < TURN_THREADS; i++) {
497+
thrd_join(tid[i], NULL);
498+
}
483499

484500
hash_flush(turnd.ht_alloc);
485501
turnd.ht_alloc = mem_deref(turnd.ht_alloc);

modules/turn/turn.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ struct turnd {
1919
uint32_t allocc_cur;
2020
uint32_t lifetime_max;
2121
uint32_t udp_sockbuf_size;
22-
pthread_t tid;
23-
pthread_mutex_t mutex;
22+
mtx_t mutex;
2423
bool run;
2524
struct list re_map;
2625

@@ -42,7 +41,7 @@ struct chanlist;
4241
struct allocation {
4342
struct le he;
4443
struct le le_map;
45-
pthread_mutex_t mutex;
44+
mtx_t mutex;
4645
struct tmr tmr;
4746
uint8_t tid[STUN_TID_SIZE];
4847
struct sa cli_addr;

0 commit comments

Comments
 (0)