Skip to content

Commit efdcd81

Browse files
committed
Use nng_msg_append() directly
1 parent 80407ad commit efdcd81

File tree

7 files changed

+137
-101
lines changed

7 files changed

+137
-101
lines changed

src/aio.c

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -625,26 +625,27 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
625625
const int raw = nano_encode_mode(mode);
626626
SEXP aio, env, fun;
627627
nano_aio *saio = NULL;
628-
nano_buf buf;
629628
int sock, xc;
630629

631630
if ((sock = !NANO_PTR_CHECK(con, nano_SocketSymbol)) || !NANO_PTR_CHECK(con, nano_ContextSymbol)) {
632631

633632
const int pipeid = sock ? nano_integer(pipe) : 0;
634-
if (raw) {
635-
nano_encode(&buf, data);
636-
} else {
637-
nano_serialize(&buf, data, NANO_PROT(con), 0);
638-
}
639633
nng_msg *msg = NULL;
640634

641635
saio = calloc(1, sizeof(nano_aio));
642636
NANO_ENSURE_ALLOC(saio);
643637
saio->type = SENDAIO;
644638

645-
if ((xc = nng_msg_alloc(&msg, 0)) ||
646-
(xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
647-
(xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
639+
if ((xc = nng_msg_alloc(&msg, 0)))
640+
goto fail;
641+
642+
if (raw) {
643+
nano_encode(msg, data);
644+
} else {
645+
nano_serialize(msg, data, NANO_PROT(con), 0);
646+
}
647+
648+
if ((xc = nng_aio_alloc(&saio->aio, saio_complete, saio))) {
648649
nng_msg_free(msg);
649650
goto fail;
650651
}
@@ -659,14 +660,14 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
659660
nng_aio_set_timeout(saio->aio, dur);
660661
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), saio->aio) :
661662
nng_ctx_send(*(nng_ctx *) NANO_PTR(con), saio->aio);
662-
NANO_FREE(buf);
663663

664664
PROTECT(aio = R_MakeExternalPtr(saio, nano_AioSymbol, R_NilValue));
665665
R_RegisterCFinalizerEx(aio, saio_finalizer, TRUE);
666666

667667
} else if (!NANO_PTR_CHECK(con, nano_StreamSymbol)) {
668668

669-
nano_encode(&buf, data);
669+
nano_buf buf;
670+
nano_encode_buf(&buf, data);
670671

671672
nano_stream *nst = (nano_stream *) NANO_PTR(con);
672673
nng_stream *sp = nst->stream;
@@ -683,8 +684,10 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
683684
};
684685

685686
if ((xc = nng_aio_alloc(&saio->aio, isaio_complete, saio)) ||
686-
(xc = nng_aio_set_iov(saio->aio, 1u, &iov)))
687-
goto fail;
687+
(xc = nng_aio_set_iov(saio->aio, 1u, &iov))) {
688+
NANO_FREE(buf);
689+
goto fail;
690+
}
688691

689692
nng_aio_set_timeout(saio->aio, dur);
690693
nng_stream_send(sp, saio->aio);
@@ -711,7 +714,6 @@ SEXP rnng_send_aio(SEXP con, SEXP data, SEXP mode, SEXP timeout, SEXP pipe, SEXP
711714
nng_aio_free(saio->aio);
712715
free(saio->data);
713716
failmem:
714-
NANO_FREE(buf);
715717
free(saio);
716718
return mk_error_data(-xc);
717719

src/comms.c

Lines changed: 21 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -335,54 +335,41 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
335335

336336
const int flags = block == R_NilValue ? NNG_DURATION_DEFAULT : TYPEOF(block) == LGLSXP ? 0 : nano_integer(block);
337337
const int raw = nano_encode_mode(mode);
338-
nano_buf buf;
339338
int sock, xc;
340339

341340
if ((sock = !NANO_PTR_CHECK(con, nano_SocketSymbol)) || !NANO_PTR_CHECK(con, nano_ContextSymbol)) {
342341

343342
const int pipeid = sock ? nano_integer(pipe) : 0;
343+
nng_msg *msgp = NULL;
344+
345+
if ((xc = nng_msg_alloc(&msgp, 0)))
346+
goto fail;
347+
344348
if (raw) {
345-
nano_encode(&buf, data);
349+
nano_encode(msgp, data);
346350
} else {
347-
nano_serialize(&buf, data, NANO_PROT(con), 0);
351+
nano_serialize(msgp, data, NANO_PROT(con), 0);
348352
}
349-
nng_msg *msgp = NULL;
350353

351-
if (flags <= 0) {
352-
353-
if ((xc = nng_msg_alloc(&msgp, 0)))
354-
goto fail;
354+
if (pipeid) {
355+
nng_pipe p;
356+
p.id = (uint32_t) pipeid;
357+
nng_msg_set_pipe(msgp, p);
358+
}
355359

356-
if (pipeid) {
357-
nng_pipe p;
358-
p.id = (uint32_t) pipeid;
359-
nng_msg_set_pipe(msgp, p);
360-
}
360+
if (flags <= 0) {
361361

362-
if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
363-
(xc = sock ? nng_sendmsg(*(nng_socket *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK) :
362+
if ((xc = sock ? nng_sendmsg(*(nng_socket *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK) :
364363
nng_ctx_sendmsg(*(nng_ctx *) NANO_PTR(con), msgp, flags ? NNG_FLAG_NONBLOCK : (NANO_INTEGER(block) != 1) * NNG_FLAG_NONBLOCK))) {
365364
nng_msg_free(msgp);
366365
goto fail;
367366
}
368367

369-
NANO_FREE(buf);
370-
371368
} else {
372369

373370
nng_aio *aiop = NULL;
374371

375-
if ((xc = nng_msg_alloc(&msgp, 0)))
376-
goto fail;
377-
378-
if (pipeid) {
379-
nng_pipe p;
380-
p.id = (uint32_t) pipeid;
381-
nng_msg_set_pipe(msgp, p);
382-
}
383-
384-
if ((xc = nng_msg_append(msgp, buf.buf, buf.cur)) ||
385-
(xc = nng_aio_alloc(&aiop, NULL, NULL))) {
372+
if ((xc = nng_aio_alloc(&aiop, NULL, NULL))) {
386373
nng_msg_free(msgp);
387374
goto fail;
388375
}
@@ -391,7 +378,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
391378
nng_aio_set_timeout(aiop, flags);
392379
sock ? nng_send_aio(*(nng_socket *) NANO_PTR(con), aiop) :
393380
nng_ctx_send(*(nng_ctx *) NANO_PTR(con), aiop);
394-
NANO_FREE(buf);
395381
nng_aio_wait(aiop);
396382
if ((xc = nng_aio_result(aiop)))
397383
nng_msg_free(nng_aio_get_msg(aiop));
@@ -401,7 +387,8 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
401387

402388
} else if (!NANO_PTR_CHECK(con, nano_StreamSymbol)) {
403389

404-
nano_encode(&buf, data);
390+
nano_buf buf;
391+
nano_encode_buf(&buf, data);
405392

406393
nano_stream *nst = (nano_stream *) NANO_PTR(con);
407394
nng_stream *sp = nst->stream;
@@ -411,11 +398,14 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
411398
.iov_len = buf.cur - nst->textframes
412399
};
413400

414-
if ((xc = nng_aio_alloc(&aiop, NULL, NULL)))
401+
if ((xc = nng_aio_alloc(&aiop, NULL, NULL))) {
402+
NANO_FREE(buf);
415403
goto fail;
404+
}
416405

417406
if ((xc = nng_aio_set_iov(aiop, 1u, &iov))) {
418407
nng_aio_free(aiop);
408+
NANO_FREE(buf);
419409
goto fail;
420410
}
421411

@@ -436,7 +426,6 @@ SEXP rnng_send(SEXP con, SEXP data, SEXP mode, SEXP block, SEXP pipe) {
436426
return nano_success;
437427

438428
fail:
439-
NANO_FREE(buf);
440429
return mk_error(xc);
441430

442431
}

src/core.c

Lines changed: 73 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ static SEXP nano_eval_prot (void *call) {
1313
}
1414

1515
static void nano_cleanup(void *data, Rboolean jump) {
16-
if (jump)
17-
free(data);
16+
if (jump) {
17+
nng_msg_free((nng_msg *) data);
18+
}
1819
}
1920

2021
static void nano_eval_safe (void *call) {
@@ -23,28 +24,12 @@ static void nano_eval_safe (void *call) {
2324

2425
static void nano_write_bytes(R_outpstream_t stream, void *src, int len) {
2526

26-
nano_buf *buf = (nano_buf *) stream->data;
27-
28-
size_t req = buf->cur + (size_t) len;
29-
if (req > buf->len) {
30-
if (req > R_XLEN_T_MAX) {
31-
if (buf->len) free(buf->buf);
32-
Rf_error("serialization exceeds max length of raw vector");
33-
}
34-
do {
35-
buf->len += buf->len > NANONEXT_SERIAL_THR ? NANONEXT_SERIAL_THR : buf->len;
36-
} while (buf->len < req);
37-
unsigned char *nbuf = realloc(buf->buf, buf->len);
38-
if (nbuf == NULL) {
39-
free(buf->buf);
40-
Rf_error("memory allocation failed");
41-
}
42-
buf->buf = nbuf;
27+
nng_msg *msg = (nng_msg *) stream->data;
28+
if (nng_msg_append(msg, src, (size_t) len)) {
29+
nng_msg_free(msg);
30+
Rf_error("serialization failed");
4331
}
4432

45-
memcpy(buf->buf + buf->cur, src, len);
46-
buf->cur += len;
47-
4833
}
4934

5035
static void nano_read_bytes(R_inpstream_t stream, void *dst, int len) {
@@ -111,10 +96,10 @@ static SEXP nano_serialize_hook(SEXP x, SEXP hook_func) {
11196

11297
SEXP out, call;
11398
PROTECT(call = Rf_lcons(NANO_VECTOR(hook_func)[i], Rf_cons(x, R_NilValue)));
114-
out = R_UnwindProtect(nano_eval_prot, call, nano_cleanup, nano_bundle.buf, NULL);
99+
out = R_UnwindProtect(nano_eval_prot, call, nano_cleanup, nano_bundle.msg, NULL);
115100
UNPROTECT(1);
116101
if (TYPEOF(out) != RAWSXP) {
117-
free(nano_bundle.buf);
102+
nng_msg_free(nano_bundle.msg);
118103
Rf_error("Serialization function for `%s` did not return a raw vector", NANO_STR_N(klass, i));
119104
}
120105

@@ -259,28 +244,28 @@ SEXP nano_raw_char(const unsigned char *buf, const size_t sz) {
259244

260245
}
261246

262-
void nano_serialize(nano_buf *buf, SEXP object, SEXP hook, int header) {
247+
void nano_serialize(nng_msg *msg, SEXP object, SEXP hook, int header) {
263248

264-
NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
265249
struct R_outpstream_st output_stream;
266250

267251
if (header || special_marker) {
268-
buf->buf[0] = 0x7;
269-
buf->buf[3] = (uint8_t) special_marker;
270-
if (header)
271-
memcpy(buf->buf + 4, &header, sizeof(int));
272-
buf->cur += 8;
252+
unsigned char magic[4] = {0x7, 0x0, 0x0, (uint8_t) special_marker};
253+
if (nng_msg_append(msg, magic, sizeof(magic)) ||
254+
nng_msg_append(msg, &header, sizeof(int))) {
255+
nng_msg_free(msg);
256+
Rf_error("serialization failed");
257+
}
273258
}
274259

275260
if (hook != R_NilValue) {
276261
nano_bundle.klass = NANO_VECTOR(hook)[0];
277262
nano_bundle.outpstream = &output_stream;
278-
nano_bundle.buf = buf->buf;
263+
nano_bundle.msg = msg;
279264
}
280265

281266
R_InitOutPStream(
282267
&output_stream,
283-
(R_pstream_data_t) buf,
268+
(R_pstream_data_t) msg,
284269
R_pstream_binary_format,
285270
NANONEXT_SERIAL_VER,
286271
NULL,
@@ -429,10 +414,62 @@ SEXP nano_decode(unsigned char *buf, const size_t sz, const uint8_t mod, SEXP ho
429414

430415
}
431416

432-
void nano_encode(nano_buf *enc, const SEXP object) {
417+
void nano_encode(nng_msg *msg, const SEXP object) {
433418

434419
switch (TYPEOF(object)) {
435-
case STRSXP: ;
420+
case STRSXP: {
421+
R_xlen_t xlen = XLENGTH(object);
422+
if (xlen == 1) {
423+
const char *s = NANO_STRING(object);
424+
size_t slen = strlen(s) + 1;
425+
if (nng_msg_append(msg, s, slen))
426+
goto fail;
427+
} else {
428+
for (R_xlen_t i = 0; i < xlen; i++) {
429+
const char *s = NANO_STR_N(object, i);
430+
size_t slen = strlen(s) + 1;
431+
if (nng_msg_append(msg, s, slen))
432+
goto fail;
433+
}
434+
}
435+
break;
436+
}
437+
case REALSXP:
438+
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * sizeof(double)))
439+
goto fail;
440+
break;
441+
case INTSXP:
442+
case LGLSXP:
443+
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * sizeof(int)))
444+
goto fail;
445+
break;
446+
case CPLXSXP:
447+
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object) * 2 * sizeof(double)))
448+
goto fail;
449+
break;
450+
case RAWSXP:
451+
if (nng_msg_append(msg, DATAPTR_RO(object), XLENGTH(object)))
452+
goto fail;
453+
break;
454+
case NILSXP:
455+
break;
456+
default:
457+
nng_msg_free(msg);
458+
Rf_error("`data` must be an atomic vector type or NULL to send in mode 'raw'");
459+
}
460+
461+
return;
462+
463+
fail:
464+
nng_msg_free(msg);
465+
Rf_error("encode failed");
466+
467+
}
468+
469+
void nano_encode_buf(nano_buf *enc, const SEXP object) {
470+
471+
switch (TYPEOF(object)) {
472+
case STRSXP: {
436473
const char *s;
437474
R_xlen_t xlen = XLENGTH(object);
438475
if (xlen == 1) {
@@ -452,6 +489,7 @@ void nano_encode(nano_buf *enc, const SEXP object) {
452489
enc->cur += slen;
453490
}
454491
break;
492+
}
455493
case REALSXP:
456494
NANO_INIT(enc, (unsigned char *) DATAPTR_RO(object), XLENGTH(object) * sizeof(double));
457495
break;

src/nanonext.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ typedef struct nano_serial_bundle_s {
230230
R_outpstream_t outpstream;
231231
R_inpstream_t inpstream;
232232
SEXP klass;
233-
unsigned char *buf;
233+
nng_msg *msg;
234234
} nano_serial_bundle;
235235

236236
typedef enum nano_list_op {
@@ -320,10 +320,11 @@ void haio_invoke_cb(void *);
320320
SEXP mk_error(const int);
321321
SEXP mk_error_data(const int);
322322
SEXP nano_raw_char(const unsigned char *, const size_t);
323-
void nano_serialize(nano_buf *, const SEXP, SEXP, int);
323+
void nano_serialize(nng_msg *, const SEXP, SEXP, int);
324324
SEXP nano_unserialize(unsigned char *, const size_t, SEXP);
325325
SEXP nano_decode(unsigned char *, const size_t, const uint8_t, SEXP);
326-
void nano_encode(nano_buf *, const SEXP);
326+
void nano_encode(nng_msg *, const SEXP);
327+
void nano_encode_buf(nano_buf *, const SEXP);
327328
int nano_encode_mode(const SEXP);
328329
int nano_matcharg(const SEXP);
329330
SEXP nano_aio_result(SEXP);

0 commit comments

Comments
 (0)