Skip to content

Commit e44f89f

Browse files
authored
Implement stop_request() to request cancellation when a request is stopped (#185)
* request_stop() concept * Make safe for all return types * More tests * Update docs
1 parent a9a16d5 commit e44f89f

File tree

8 files changed

+104
-2
lines changed

8 files changed

+104
-2
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Type: Package
22
Package: nanonext
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 1.6.2.9002
4+
Version: 1.6.2.9003
55
Authors@R: c(
66
person("Charlie", "Gao", , "[email protected]", role = c("aut", "cre"),
77
comment = c(ORCID = "0000-0002-0750-061X")),

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export(socket)
9696
export(stat)
9797
export(status_code)
9898
export(stop_aio)
99+
export(stop_request)
99100
export(stream)
100101
export(subscribe)
101102
export(survey_time)

R/aio.R

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,23 @@ collect_aio_ <- function(x) .Call(rnng_aio_collect_safe, x)
264264
#'
265265
stop_aio <- function(x) invisible(.Call(rnng_aio_stop, x))
266266

267+
#' Stop Request Operation
268+
#'
269+
#' Stop an asynchronous Aio operation, or a list of Aio operations, created by
270+
#' [request()]. This is an augmented version of [stop_aio()] that additionally
271+
#' requests cancellation by sending an integer zero followed by the context ID
272+
#' over the context, and waiting for the response.
273+
#'
274+
#' @param x an Aio or list of Aios (objects of class 'recvAio' returned by
275+
#' [request()]).
276+
#'
277+
#' @return Invisibly, a logical vector.
278+
#'
279+
#' @keywords internal
280+
#' @export
281+
#'
282+
stop_request <- function(x) invisible(.Call(rnng_request_stop, x))
283+
267284
#' Query if an Aio is Unresolved
268285
#'
269286
#' Query whether an Aio, Aio value or list of Aios remains unresolved. Unlike

man/stop_request.Rd

Lines changed: 22 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aio.c

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,61 @@ SEXP rnng_aio_stop(SEXP x) {
469469

470470
}
471471

472+
SEXP rnng_request_stop(SEXP x) {
473+
474+
SEXP out;
475+
switch (TYPEOF(x)) {
476+
case ENVSXP: ;
477+
SEXP coreaio;
478+
nng_msg *msgp = NULL;
479+
int res = 0;
480+
PROTECT(coreaio = Rf_findVarInFrame(x, nano_AioSymbol));
481+
if (NANO_PTR_CHECK(coreaio, nano_AioSymbol)) goto fail;
482+
nano_aio *aiop = (nano_aio *) NANO_PTR(coreaio);
483+
if (aiop->type != REQAIOS && aiop->type != REQAIO) goto fail;
484+
nng_aio_stop(aiop->aio);
485+
nano_saio *saio = (nano_saio *) aiop->cb;
486+
if (saio->id == 0) goto fail;
487+
488+
const SEXP context = Rf_getAttrib(coreaio, nano_ContextSymbol);
489+
if (NANO_PTR_CHECK(context, nano_ContextSymbol)) goto fail;
490+
nng_ctx *ctx = (nng_ctx *) NANO_PTR(context);
491+
const nng_duration dur = NANONEXT_WAIT_DUR;
492+
if (nng_ctx_set_ms(*ctx, "send-timeout", dur) ||
493+
nng_ctx_set_ms(*ctx, "recv-timeout", dur) ||
494+
nng_msg_alloc(&msgp, 0) ||
495+
nng_msg_append_u32(msgp, 0) ||
496+
nng_msg_append(msgp, &saio->id, sizeof(int)) ||
497+
nng_ctx_sendmsg(*ctx, msgp, 0)) {
498+
goto fail;
499+
}
500+
msgp = NULL;
501+
if (nng_ctx_recvmsg(*ctx, &msgp, 0))
502+
goto fail;
503+
memcpy(&res, nng_msg_body(msgp), sizeof(int));
504+
505+
fail:
506+
nng_msg_free(msgp);
507+
UNPROTECT(1);
508+
out = Rf_ScalarLogical(res != 0);
509+
break;
510+
case VECSXP: ;
511+
const R_xlen_t xlen = Rf_xlength(x);
512+
PROTECT(out = Rf_allocVector(LGLSXP, xlen));
513+
for (R_xlen_t i = xlen - 1; i >= 0; i--) {
514+
SEXP item = rnng_request_stop(NANO_VECTOR(x)[i]);
515+
SET_LOGICAL_ELT(out, i, NANO_INTEGER(item));
516+
}
517+
UNPROTECT(1);
518+
break;
519+
default:
520+
out = Rf_ScalarLogical(0);
521+
}
522+
523+
return out;
524+
525+
}
526+
472527
static int rnng_unresolved_impl(SEXP x) {
473528

474529
int xc;

src/init.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ static const R_CallMethodDef callMethods[] = {
153153
{"rnng_recv", (DL_FUNC) &rnng_recv, 4},
154154
{"rnng_recv_aio", (DL_FUNC) &rnng_recv_aio, 6},
155155
{"rnng_request", (DL_FUNC) &rnng_request, 8},
156+
{"rnng_request_stop", (DL_FUNC) &rnng_request_stop, 1},
156157
{"rnng_send", (DL_FUNC) &rnng_send, 5},
157158
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 6},
158159
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 3},

src/nanonext.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ extern int R_interrupts_pending;
123123
#define NANONEXT_SERIAL_THR 134217728
124124
#define NANONEXT_CHUNK_SIZE INT_MAX // must be <= INT_MAX
125125
#define NANONEXT_STR_SIZE 40
126+
#define NANONEXT_WAIT_DUR 1000
126127
#define NANO_ALLOC(x, sz) \
127128
(x)->buf = calloc(sz, sizeof(unsigned char)); \
128129
if ((x)->buf == NULL) Rf_error("memory allocation failed"); \
@@ -390,6 +391,7 @@ SEXP rnng_reap(SEXP);
390391
SEXP rnng_recv(SEXP, SEXP, SEXP, SEXP);
391392
SEXP rnng_recv_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
392393
SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
394+
SEXP rnng_request_stop(SEXP);
393395
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP, SEXP);
394396
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
395397
SEXP rnng_serial_config(SEXP, SEXP, SEXP);

tests/tests.R

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ test_identical(call_aio(msg), msg)
114114
test_class("recvAio", msg <- n1$recv_aio(mode = "complex", timeout = 500))
115115
test_null(stop_aio(msg))
116116
test_null(stop_aio(n))
117+
test_false(stop_request(n))
117118
test_identical(call_aio(msg), msg)
118119
test_class("errorValue", msg$data)
119120
test_identical(call_aio(n), n)
@@ -300,6 +301,7 @@ test_notnull(cs$data)
300301
test_true(recv(ctxn, block = 500))
301302
test_zero(send(ctxn, TRUE, mode = 1L, block = 500))
302303
test_class("recvAio", cs <- request(.context(req$socket), data = TRUE, timeout = 5, id = TRUE))
304+
test_false(stop_request(cs))
303305
test_zero(reap(ctxn))
304306
test_equal(reap(ctxn), 7L)
305307
test_zero(pipe_notify(rep, cv, add = TRUE, flag = TRUE))
@@ -450,6 +452,7 @@ test_type("integer", call_aio_(put1)$status)
450452
if (put1$status == 200L) test_notnull(put1$headers)
451453
if (put1$status == 200L) test_notnull(put1$data)
452454
test_null(stop_aio(put1))
455+
test_false(stop_request(put1))
453456
test_class("ncurlAio", haio <- ncurl_aio("https://i.i"))
454457
test_class("errorValue", call_aio(haio)$data)
455458
test_print(haio$data)
@@ -588,8 +591,9 @@ test_error(collect_aio_("a"), "object is not an Aio or list of Aios")
588591
test_error(collect_aio_(list("a")), "object is not an Aio or list of Aios")
589592
test_error(collect_aio(list(fakesock)), "object is not an Aio or list of Aios")
590593
test_null(stop_aio("a"))
594+
test_false(stop_request("a"))
591595
test_null(stop_aio(list("a")))
592-
test_null(stop_aio(list("a")))
596+
test_false(stop_request(list("a")))
593597
test_null(.keep(NULL, new.env()))
594598
test_null(.keep(new.env(), new.env()))
595599

0 commit comments

Comments
 (0)