Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Type: Package
Package: nanonext
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
Version: 1.5.2.9009
Version: 1.5.2.9010
Authors@R: c(
person("Charlie", "Gao", , "[email protected]", role = c("aut", "cre"),
comment = c(ORCID = "0000-0002-0750-061X")),
Expand Down
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ export(nng_version)
export(opt)
export(parse_url)
export(pipe_notify)
export(pipe_register)
export(random)
export(read_monitor)
export(reap)
Expand Down
40 changes: 40 additions & 0 deletions R/sync.R
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@
#' socket. The underlying transport may be closed at this point, and it is not
#' possible to communicate using this pipe.
#'
#' Note: this function cannot be used in conjunction with [pipe_register()].
#'
#' @param socket a Socket.
#' @param cv a 'conditionVariable' to signal, or NULL to cancel a previously set
#' signal.
Expand Down Expand Up @@ -197,6 +199,44 @@
pipe_notify <- function(socket, cv, add = FALSE, remove = FALSE, flag = FALSE)
invisible(.Call(rnng_pipe_notify, socket, cv, add, remove, flag))


#' Pipe Register
#'
#' Register a callback to run whenever pipes (individual connections) are
#' added or removed at a socket.
#'
#' For add: this event occurs after the pipe is fully added to the socket. Prior
#' to this time, it is not possible to communicate over the pipe with the
#' socket.
#'
#' For remove: this event occurs after the pipe has been removed from the
#' socket. The underlying transport may be closed at this point, and it is not
#' possible to communicate using this pipe.
#'
#' Note: this function cannot be used in conjunction with [pipe_notify()].
#'
#' @param socket a Socket.
#' @param add \[default NULL\] an R function callback to be run whenever a pipe
#' is added. If NULL, any previously-registered callback is cancelled.
#' @param remove \[default NULL\] an R function callback to be run whenever a
#' pipe is removed. If NULL, any previously-registered callback is cancelled.
#'
#' @return Invisibly, zero on success (will otherwise error).
#'
#' @examplesIf requireNamespace("later", quietly = TRUE)
#' s <- socket(listen = "inproc://nanopipecb")
#' pipe_register(s, function() print("hi"), function() print("bye"))
#'
#' s1 <- socket(dial = "inproc://nanopipecb")
#' close(s1)
#'
#' close(s)
#'
#' @export
#'
pipe_register <- function(socket, add = NULL, remove = NULL)
invisible(.Call(rnng_pipe_register, socket, add, remove))

Check warning on line 238 in R/sync.R

View check run for this annotation

Codecov / codecov/patch

R/sync.R#L237-L238

Added lines #L237 - L238 were not covered by tests

#' Signal Forwarder
#'
#' Forwards signals from one 'conditionVariable' to another.
Expand Down
2 changes: 2 additions & 0 deletions man/pipe_notify.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 46 additions & 0 deletions man/pipe_register.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/init.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ static const R_CallMethodDef callMethods[] = {
{"rnng_ncurl_session_close", (DL_FUNC) &rnng_ncurl_session_close, 1},
{"rnng_ncurl_transact", (DL_FUNC) &rnng_ncurl_transact, 1},
{"rnng_pipe_notify", (DL_FUNC) &rnng_pipe_notify, 5},
{"rnng_pipe_register", (DL_FUNC) &rnng_pipe_register, 3},
{"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 6},
{"rnng_random", (DL_FUNC) &rnng_random, 2},
{"rnng_reap", (DL_FUNC) &rnng_reap, 1},
Expand Down
2 changes: 2 additions & 0 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ extern int R_interrupts_pending;

#define ERROR_OUT(xc) Rf_error("%d | %s", xc, nng_strerror(xc))
#define ERROR_RET(xc) { Rf_warning("%d | %s", xc, nng_strerror(xc)); return mk_error(xc); }
#define NANONEXT_ENSURE_LATER if (eln2 == NULL) nano_load_later()
#define NANONEXT_INIT_BUFSIZE 4096
#define NANONEXT_SERIAL_VER 3
#define NANONEXT_SERIAL_THR 134217728
Expand Down Expand Up @@ -345,6 +346,7 @@ SEXP rnng_ncurl_session(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_ncurl_session_close(SEXP);
SEXP rnng_ncurl_transact(SEXP);
SEXP rnng_pipe_notify(SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_pipe_register(SEXP, SEXP, SEXP);
SEXP rnng_protocol_open(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
SEXP rnng_random(SEXP, SEXP);
SEXP rnng_reap(SEXP);
Expand Down
84 changes: 76 additions & 8 deletions src/sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,29 @@
#define NANONEXT_SIGNALS
#include "nanonext.h"

// internals -------------------------------------------------------------------

static void nano_load_later(void) {

SEXP str, call;
PROTECT(str = Rf_mkString("later"));
PROTECT(call = Rf_lang2(Rf_install("loadNamespace"), str));
Rf_eval(call, R_BaseEnv);
UNPROTECT(2);
eln2 = (void (*)(void (*)(void *), void *, double, int)) R_GetCCallable("later", "execLaterNative2");

}

static void nano_eval_later (void *arg) {

Check warning on line 19 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L19

Added line #L19 was not covered by tests

SEXP call, node;
node = R_WeakRefValue((SEXP) arg);
PROTECT(call = Rf_lcons(node, R_NilValue));
Rf_eval(call, R_GlobalEnv);
UNPROTECT(1);

Check warning on line 25 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L22-L25

Added lines #L22 - L25 were not covered by tests

}

Check warning on line 27 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L27

Added line #L27 was not covered by tests

// aio completion callbacks ----------------------------------------------------

static void sendaio_complete(void *arg) {
Expand Down Expand Up @@ -101,6 +124,12 @@

}

void pipe_cb_eval(nng_pipe p, nng_pipe_ev ev, void *arg) {

Check warning on line 127 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L127

Added line #L127 was not covered by tests

later2(nano_eval_later, arg);

Check warning on line 129 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L129

Added line #L129 was not covered by tests

}

Check warning on line 131 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L131

Added line #L131 was not covered by tests

static void pipe_cb_monitor(nng_pipe p, nng_pipe_ev ev, void *arg) {

nano_monitor *monitor = (nano_monitor *) arg;
Expand Down Expand Up @@ -495,14 +524,7 @@

nano_aio *raio = (nano_aio *) NANO_PTR(aio);

if (eln2 == NULL) {
SEXP str, call;
PROTECT(str = Rf_mkString("later"));
PROTECT(call = Rf_lang2(Rf_install("loadNamespace"), str));
Rf_eval(call, R_BaseEnv);
UNPROTECT(2);
eln2 = (void (*)(void (*)(void *), void *, double, int)) R_GetCCallable("later", "execLaterNative2");
}
NANONEXT_ENSURE_LATER;

switch (raio->type) {
case REQAIO:
Expand Down Expand Up @@ -569,6 +591,52 @@

}

SEXP rnng_pipe_register(SEXP socket, SEXP add, SEXP remove) {

Check warning on line 594 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L594

Added line #L594 was not covered by tests

if (NANO_PTR_CHECK(socket, nano_SocketSymbol))
Rf_error("'socket' is not a valid Socket");

Check warning on line 597 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L596-L597

Added lines #L596 - L597 were not covered by tests

int xc;
nng_socket *sock = (nng_socket *) NANO_PTR(socket);

Check warning on line 600 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L600

Added line #L600 was not covered by tests

NANONEXT_ENSURE_LATER;

Check warning on line 602 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L602

Added line #L602 was not covered by tests

if (add != R_NilValue) {

Check warning on line 604 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L604

Added line #L604 was not covered by tests

if (TYPEOF(add) != CLOSXP)
Rf_error("'add' is not a valid R closure function");

Check warning on line 607 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L606-L607

Added lines #L606 - L607 were not covered by tests

SEXP ref = R_MakeWeakRef(socket, add, R_NilValue, FALSE);
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, pipe_cb_eval, ref)))
ERROR_OUT(xc);

Check warning on line 611 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L609-L611

Added lines #L609 - L611 were not covered by tests

} else {

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_ADD_POST, NULL, NULL)))
ERROR_OUT(xc);

Check warning on line 616 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L615-L616

Added lines #L615 - L616 were not covered by tests

}

if (remove != R_NilValue) {

Check warning on line 620 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L620

Added line #L620 was not covered by tests

if (TYPEOF(remove) != CLOSXP)
Rf_error("'remove' is not a valid R closure function");

Check warning on line 623 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L622-L623

Added lines #L622 - L623 were not covered by tests

SEXP ref = R_MakeWeakRef(socket, remove, R_NilValue, FALSE);
if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, pipe_cb_eval, ref)))
ERROR_OUT(xc);

Check warning on line 627 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L625-L627

Added lines #L625 - L627 were not covered by tests

} else {

if ((xc = nng_pipe_notify(*sock, NNG_PIPE_EV_REM_POST, NULL, NULL)))
ERROR_OUT(xc);

Check warning on line 632 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L631-L632

Added lines #L631 - L632 were not covered by tests

}

return nano_success;

Check warning on line 636 in src/sync.c

View check run for this annotation

Codecov / codecov/patch

src/sync.c#L636

Added line #L636 was not covered by tests

}

// monitors --------------------------------------------------------------------

SEXP rnng_monitor_create(SEXP socket, SEXP cv) {
Expand Down
Loading