Skip to content

Commit 8c3856d

Browse files
committed
allows wait thread to be re-launched after shutdown
1 parent 4c33e6f commit 8c3856d

File tree

3 files changed

+58
-56
lines changed

3 files changed

+58
-56
lines changed

DESCRIPTION

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 1.3.0.9006
4+
Version: 1.3.0.9007
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# nanonext 1.3.0.9006 (development)
1+
# nanonext 1.3.0.9007 (development)
22

33
#### Updates
44

src/thread.c

+56-54
Original file line numberDiff line numberDiff line change
@@ -284,60 +284,6 @@ void single_wait_thread_create(SEXP x) {
284284

285285
// # nocov end
286286

287-
static void thread_duo_finalizer(SEXP xptr) {
288-
289-
if (NANO_PTR(xptr) == NULL) return;
290-
nano_thread_duo *xp = (nano_thread_duo *) NANO_PTR(xptr);
291-
nano_cv *ncv = xp->cv;
292-
if (ncv != NULL) {
293-
nng_mtx *mtx = ncv->mtx;
294-
nng_cv *cv = ncv->cv;
295-
nng_mtx_lock(mtx);
296-
ncv->condition = -1;
297-
nng_cv_wake(cv);
298-
nng_mtx_unlock(mtx);
299-
}
300-
nng_thread_destroy(xp->thr);
301-
R_Free(xp);
302-
303-
}
304-
305-
static void thread_disp_finalizer(SEXP xptr) {
306-
307-
if (NANO_PTR(xptr) == NULL) return;
308-
nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr);
309-
nano_cv *ncv = xp->cv;
310-
nng_mtx *mtx = ncv->mtx;
311-
nng_cv *cv = ncv->cv;
312-
nng_mtx_lock(mtx);
313-
ncv->condition = -1;
314-
nng_cv_wake(cv);
315-
nng_mtx_unlock(mtx);
316-
if (xp->tls != NULL)
317-
nng_tls_config_free(xp->tls);
318-
nng_thread_destroy(xp->thr);
319-
nng_url_free(xp->up);
320-
for (int i = 0; i < xp->n; i++) {
321-
nng_aio_free(xp->saio[i]->aio);
322-
nng_aio_free(xp->raio[i]->aio);
323-
nng_aio_free(xp->haio[i]->aio);
324-
R_Free(xp->saio[i]);
325-
R_Free(xp->raio[i]);
326-
R_Free(xp->haio[i]);
327-
R_Free(xp->url[i]);
328-
}
329-
R_Free(xp->saio);
330-
R_Free(xp->raio);
331-
R_Free(xp->haio);
332-
R_Free(xp->url);
333-
R_Free(xp->online);
334-
nng_cv_free(ncv->cv);
335-
nng_mtx_free(ncv->mtx);
336-
R_Free(xp->cv);
337-
R_Free(xp);
338-
339-
}
340-
341287
static void rnng_wait_thread(void *args) {
342288

343289
while (1) {
@@ -468,6 +414,7 @@ SEXP rnng_thread_shutdown(void) {
468414
nng_thread_destroy(nano_wait_thr);
469415
nng_cv_free(nano_wait_cv);
470416
nng_mtx_free(nano_wait_mtx);
417+
nano_wait_thr = NULL;
471418
}
472419
return R_NilValue;
473420
}
@@ -487,6 +434,24 @@ static void nano_record_pipe(nng_pipe p, nng_pipe_ev ev, void *arg) {
487434

488435
}
489436

437+
static void thread_duo_finalizer(SEXP xptr) {
438+
439+
if (NANO_PTR(xptr) == NULL) return;
440+
nano_thread_duo *xp = (nano_thread_duo *) NANO_PTR(xptr);
441+
nano_cv *ncv = xp->cv;
442+
if (ncv != NULL) {
443+
nng_mtx *mtx = ncv->mtx;
444+
nng_cv *cv = ncv->cv;
445+
nng_mtx_lock(mtx);
446+
ncv->condition = -1;
447+
nng_cv_wake(cv);
448+
nng_mtx_unlock(mtx);
449+
}
450+
nng_thread_destroy(xp->thr);
451+
R_Free(xp);
452+
453+
}
454+
490455
static void rnng_signal_thread(void *args) {
491456

492457
nano_thread_duo *duo = (nano_thread_duo *) args;
@@ -574,6 +539,43 @@ SEXP rnng_signal_thread_create(SEXP cv, SEXP cv2) {
574539

575540
}
576541

542+
543+
static void thread_disp_finalizer(SEXP xptr) {
544+
545+
if (NANO_PTR(xptr) == NULL) return;
546+
nano_thread_disp *xp = (nano_thread_disp *) NANO_PTR(xptr);
547+
nano_cv *ncv = xp->cv;
548+
nng_mtx *mtx = ncv->mtx;
549+
nng_cv *cv = ncv->cv;
550+
nng_mtx_lock(mtx);
551+
ncv->condition = -1;
552+
nng_cv_wake(cv);
553+
nng_mtx_unlock(mtx);
554+
if (xp->tls != NULL)
555+
nng_tls_config_free(xp->tls);
556+
nng_thread_destroy(xp->thr);
557+
nng_url_free(xp->up);
558+
for (int i = 0; i < xp->n; i++) {
559+
nng_aio_free(xp->saio[i]->aio);
560+
nng_aio_free(xp->raio[i]->aio);
561+
nng_aio_free(xp->haio[i]->aio);
562+
R_Free(xp->saio[i]);
563+
R_Free(xp->raio[i]);
564+
R_Free(xp->haio[i]);
565+
R_Free(xp->url[i]);
566+
}
567+
R_Free(xp->saio);
568+
R_Free(xp->raio);
569+
R_Free(xp->haio);
570+
R_Free(xp->url);
571+
R_Free(xp->online);
572+
nng_cv_free(ncv->cv);
573+
nng_mtx_free(ncv->mtx);
574+
R_Free(xp->cv);
575+
R_Free(xp);
576+
577+
}
578+
577579
static void rnng_dispatch_thread(void *args) {
578580

579581
nano_thread_disp *disp = (nano_thread_disp *) args;

0 commit comments

Comments
 (0)