Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions src/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -625,26 +625,27 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
const int raw = nano_encode_mode(mode);
SEXP aio, env, fun;
nano_aio *saio = NULL;
nano_buf buf;
int sock, xc;

if ((sock = !NANO_PTR_CHECK(con, nano_SocketSymbol)) || !NANO_PTR_CHECK(con, nano_ContextSymbol)) {

const int pipeid = sock ? nano_integer(pipe) : 0;
if (raw) {
nano_encode(&buf, data);
} else {
nano_serialize(&buf, data, NANO_PROT(con), 0);
}
nng_msg *msg = NULL;

saio = calloc(1, sizeof(nano_aio));
NANO_ENSURE_ALLOC(saio);
saio->type = SENDAIO;

if ((xc = nng_msg_alloc(&msg, 0)) ||
(xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
if ((xc = nng_msg_alloc(&msg, 0)))
goto fail;

if (raw) {
nano_encode(msg, data);
} else {
nano_serialize(msg, data, NANO_PROT(con), 0);
}

if ((xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
nng_msg_free(msg);
goto fail;
}
Expand All @@ -659,14 +660,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
nng_aio_set_timeout(saio->aio, dur);
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), saio->aio) :
nng_ctx_send(*(nng_ctx *) NANO_PTR(con), saio->aio);
NANO_FREE(buf);

PROTECT(aio = R_MakeExternalPtr(saio, nano_AioSymbol, R_NilValue));
R_RegisterCFinalizerEx(aio, saio_finalizer, TRUE);

} else if (!NANO_PTR_CHECK(con, nano_StreamSymbol)) {

nano_encode(&buf, data);
nano_buf buf;
nano_encode_buf(&buf, data);

nano_stream *nst = (nano_stream *) NANO_PTR(con);
nng_stream *sp = nst->stream;
Expand All @@ -683,8 +684,10 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
};

if ((xc = nng_aio_alloc(&saio->aio, isaio_complete, saio)) ||
(xc = nng_aio_set_iov(saio->aio, 1u, &iov)))
goto fail;
(xc = nng_aio_set_iov(saio->aio, 1u, &iov))) {
NANO_FREE(buf);
goto fail;
}

nng_aio_set_timeout(saio->aio, dur);
nng_stream_send(sp, saio->aio);
Expand All @@ -711,7 +714,6 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
nng_aio_free(saio->aio);
free(saio->data);
failmem:
NANO_FREE(buf);
free(saio);
return mk_error_data(-xc);

Expand Down
53 changes: 21 additions & 32 deletions src/comms.c
Original file line number Diff line number Diff line change
Expand Up @@ -335,54 +335,41 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {

const int flags = block == R_NilValue ? NNG_DURATION_DEFAULT : TYPEOF(block) == LGLSXP ? 0 : nano_integer(block);
const int raw = nano_encode_mode(mode);
nano_buf buf;
int sock, xc;

if ((sock = !NANO_PTR_CHECK(con, nano_SocketSymbol)) || !NANO_PTR_CHECK(con, nano_ContextSymbol)) {

const int pipeid = sock ? nano_integer(pipe) : 0;
nng_msg *msgp = NULL;

if ((xc = nng_msg_alloc(&msgp, 0)))
goto fail;

if (raw) {
nano_encode(&buf, data);
nano_encode(msgp, data);
} else {
nano_serialize(&buf, data, NANO_PROT(con), 0);
nano_serialize(msgp, data, NANO_PROT(con), 0);
}
nng_msg *msgp = NULL;

if (flags <= 0) {

if ((xc = nng_msg_alloc(&msgp, 0)))
goto fail;
if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msgp, p);
}

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msgp, p);
}
if (flags <= 0) {

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = sock ? nng_sendmsg(*(nng_socket *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK) :
if ((xc = sock ? nng_sendmsg(*(nng_socket *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK) :
nng_ctx_sendmsg(*(nng_ctx *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
nng_msg_free(msgp);
goto fail;
}

NANO_FREE(buf);

} else {

nng_aio *aiop = NULL;

if ((xc = nng_msg_alloc(&msgp, 0)))
goto fail;

if (pipeid) {
nng_pipe p;
p.id = (uint32_t) pipeid;
nng_msg_set_pipe(msgp, p);
}

if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
(xc = nng_aio_alloc(&aiop, NULL, NULL))) {
if ((xc = nng_aio_alloc(&aiop, NULL, NULL))) {
nng_msg_free(msgp);
goto fail;
}
Expand All @@ -391,7 +378,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
nng_aio_set_timeout(aiop, flags);
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), aiop) :
nng_ctx_send(*(nng_ctx *) NANO_PTR(con), aiop);
NANO_FREE(buf);
nng_aio_wait(aiop);
if ((xc = nng_aio_result(aiop)))
nng_msg_free(nng_aio_get_msg(aiop));
Expand All @@ -401,7 +387,8 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {

} else if (!NANO_PTR_CHECK(con, nano_StreamSymbol)) {

nano_encode(&buf, data);
nano_buf buf;
nano_encode_buf(&buf, data);

nano_stream *nst = (nano_stream *) NANO_PTR(con);
nng_stream *sp = nst->stream;
Expand All @@ -411,11 +398,14 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
.iov_len = buf.cur - nst->textframes
};

if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
if ((xc = nng_aio_alloc(&aiop, NULL, NULL))) {
NANO_FREE(buf);
goto fail;
}

if ((xc = nng_aio_set_iov(aiop, 1u, &iov))) {
nng_aio_free(aiop);
NANO_FREE(buf);
goto fail;
}

Expand All @@ -436,7 +426,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
return nano_success;

fail:
NANO_FREE(buf);
return mk_error(xc);

}
Expand Down
108 changes: 73 additions & 35 deletions src/core.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ static SEXP nano_eval_prot (void *call) {
}

static void nano_cleanup(void *data, Rboolean jump) {
if (jump)
free(data);
if (jump) {
nng_msg_free((nng_msg *) data);
}
}

static void nano_eval_safe (void *call) {
Expand All @@ -23,28 +24,12 @@ static void nano_eval_safe (void *call) {

static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {

nano_buf *buf = (nano_buf *) stream->data;

size_t req = buf->cur + (size_t) len;
if (req > buf->len) {
if (req > R_XLEN_T_MAX) {
if (buf->len) free(buf->buf);
Rf_error("serialization exceeds max length of raw vector");
}
do {
buf->len += buf->len > NANONEXT_SERIAL_THR ? NANONEXT_SERIAL_THR : buf->len;
} while (buf->len < req);
unsigned char *nbuf = realloc(buf->buf, buf->len);
if (nbuf == NULL) {
free(buf->buf);
Rf_error("memory allocation failed");
}
buf->buf = nbuf;
nng_msg *msg = (nng_msg *) stream->data;
if (nng_msg_append(msg, src, (size_t) len)) {
nng_msg_free(msg);
Rf_error("serialization failed");
}

memcpy(buf->buf + buf->cur, src, len);
buf->cur += len;

}

static void nano_read_bytes(R_inpstream_t stream, void *dst, int len) {
Expand Down Expand Up @@ -111,10 +96,10 @@ static SEXP nano_serialize_hook(SEXP x, SEXP hook_func) {

SEXP out, call;
PROTECT(call = Rf_lcons(NANO_VECTOR(hook_func)[i], Rf_cons(x, R_NilValue)));
out = R_UnwindProtect(nano_eval_prot, call, nano_cleanup, nano_bundle.buf, NULL);
out = R_UnwindProtect(nano_eval_prot, call, nano_cleanup, nano_bundle.msg, NULL);
UNPROTECT(1);
if (TYPEOF(out) != RAWSXP) {
free(nano_bundle.buf);
nng_msg_free(nano_bundle.msg);
Rf_error("Serialization function for `%s` did not return a raw vector", NANO_STR_N(klass, i));
}

Expand Down Expand Up @@ -259,28 +244,28 @@ SEXP nano_raw_char(const unsigned char *buf, const size_t sz) {

}

void nano_serialize(nano_buf *buf, SEXP object, SEXP hook, int header) {
void nano_serialize(nng_msg *msg, SEXP object, SEXP hook, int header) {

NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
struct R_outpstream_st output_stream;

if (header || special_marker) {
buf->buf[0] = 0x7;
buf->buf[3] = (uint8_t) special_marker;
if (header)
memcpy(buf->buf + 4, &header, sizeof(int));
buf->cur += 8;
unsigned char magic[4] = {0x7, 0x0, 0x0, (uint8_t) special_marker};
if (nng_msg_append(msg, magic, sizeof(magic)) ||
nng_msg_append(msg, &header, sizeof(int))) {
nng_msg_free(msg);
Rf_error("serialization failed");
}
}

if (hook != R_NilValue) {
nano_bundle.klass = NANO_VECTOR(hook)[0];
nano_bundle.outpstream = &output_stream;
nano_bundle.buf = buf->buf;
nano_bundle.msg = msg;
}

R_InitOutPStream(
&output_stream,
(R_pstream_data_t) buf,
(R_pstream_data_t) msg,
R_pstream_binary_format,
NANONEXT_SERIAL_VER,
NULL,
Expand Down Expand Up @@ -429,10 +414,62 @@ SEXP nano_decode(unsigned char *buf, const size_t sz, const uint8_t mod, SEXP ho

}

void nano_encode(nano_buf *enc, const SEXP object) {
void nano_encode(nng_msg *msg, const SEXP object) {

switch (TYPEOF(object)) {
case STRSXP: ;
case STRSXP: {
R_xlen_t xlen = XLENGTH(object);
if (xlen == 1) {
const char *s = NANO_STRING(object);
size_t slen = strlen(s) + 1;
if (nng_msg_append(msg, s, slen))
goto fail;
} else {
for (R_xlen_t i = 0; i < xlen; i++) {
const char *s = NANO_STR_N(object, i);
size_t slen = strlen(s) + 1;
if (nng_msg_append(msg, s, slen))
goto fail;
}
}
break;
}
case REALSXP:
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * sizeof(double)))
goto fail;
break;
case INTSXP:
case LGLSXP:
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * sizeof(int)))
goto fail;
break;
case CPLXSXP:
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * 2 * sizeof(double)))
goto fail;
break;
case RAWSXP:
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object)))
goto fail;
break;
case NILSXP:
break;
default:
nng_msg_free(msg);
Rf_error("`data` must be an atomic vector type or NULL to send in mode 'raw'");
}

return;

fail:
nng_msg_free(msg);
Rf_error("encode failed");

}

void nano_encode_buf(nano_buf *enc, const SEXP object) {

switch (TYPEOF(object)) {
case STRSXP: {
const char *s;
R_xlen_t xlen = XLENGTH(object);
if (xlen == 1) {
Expand All @@ -452,6 +489,7 @@ void nano_encode(nano_buf *enc, const SEXP object) {
enc->cur += slen;
}
break;
}
case REALSXP:
NANO_INIT(enc, (unsigned char *) DATAPTR_RO(object), XLENGTH(object) * sizeof(double));
break;
Expand Down
7 changes: 4 additions & 3 deletions src/nanonext.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ typedef struct nano_serial_bundle_s {
R_outpstream_t outpstream;
R_inpstream_t inpstream;
SEXP klass;
unsigned char *buf;
nng_msg *msg;
} nano_serial_bundle;

typedef enum nano_list_op {
Expand Down Expand Up @@ -320,10 +320,11 @@ void haio_invoke_cb(void *);
SEXP mk_error(const int);
SEXP mk_error_data(const int);
SEXP nano_raw_char(const unsigned char *, const size_t);
void nano_serialize(nano_buf *, const SEXP, SEXP, int);
void nano_serialize(nng_msg *, const SEXP, SEXP, int);
SEXP nano_unserialize(unsigned char *, const size_t, SEXP);
SEXP nano_decode(unsigned char *, const size_t, const uint8_t, SEXP);
void nano_encode(nano_buf *, const SEXP);
void nano_encode(nng_msg *, const SEXP);
void nano_encode_buf(nano_buf *, const SEXP);
int nano_encode_mode(const SEXP);
int nano_matcharg(const SEXP);
SEXP nano_aio_result(SEXP);
Expand Down
Loading
Loading