Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/nng/exchange/exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ struct exchange_s {
unsigned int rb_count;
uint8_t streamType;
uint32_t chunk_size;
/* Optional per-exchange streaming AIO hint for webhook */
nng_aio *streamAio;
};

NNG_DECL int exchange_client_get_msg_by_key(void *arg, uint64_t key, nng_msg **msg);
Expand Down
13 changes: 13 additions & 0 deletions include/nng/exchange/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@
// found online at https://opensource.org/licenses/MIT.
#ifndef STREAM_H
#define STREAM_H
#include "nng/nng.h"
#include "nng/supplemental/util/idhash.h"
#include "nng/supplemental/nanolib/log.h"
#ifdef SUPP_PARQUET
#include "nng/supplemental/nanolib/parquet.h"
#endif

#ifdef __cplusplus
extern "C" {
#endif

// Registry entry for one stream type: its identity plus the callbacks that
// implement it. stream_register() fills one of these from its arguments.
typedef struct stream_node {
char *name; // Stream name passed to stream_register()
uint8_t id; // Stream id passed to stream_register(); lookups are keyed by it
void *(*decode)(void *); // Callback invoked by stream_decode() — argument/return layout not visible here
void *(*encode)(void *); // Callback invoked by stream_encode(); called as encode(buf)
// Optional streaming encoder (may be NULL). Called by
// stream_encode_stream() as encode_stream(buf, aio, chunk_bytes).
void (*encode_stream)(void *, nng_aio *, size_t);
void *(*cmd_parser)(void *); // Callback invoked by stream_cmd_parser()
} stream_node;

Expand Down Expand Up @@ -89,18 +95,25 @@ struct cmd_data {
char **schema;
};

// Unified registration API: supports optional streaming encode callback (may be NULL)
// The name, id and all four callbacks are stored on the stream_node that is
// registered under id. Callers treat a non-zero return as failure.
int stream_register(char *name, uint8_t id,
void *(*decode)(void *),
void *(*encode)(void *),
void (*encode_stream)(void *, nng_aio *, size_t),
void *(*cmd_parser)(void *));
int stream_unregister(uint8_t id);
void *stream_decode(uint8_t id, void *buf);
void *stream_encode(uint8_t id, void *buf);
// Runs the encode_stream callback registered for id, forwarding buf, aio and
// chunk_bytes unchanged. Returns NNG_ENOTSUP when id is not registered or the
// stream has no encode_stream callback, and 0 once the callback has returned.
int stream_encode_stream(uint8_t id, void *buf, nng_aio *aio, size_t chunk_bytes);
void *stream_cmd_parser(uint8_t id, void *buf);
int stream_sys_init(void);
void stream_sys_fini(void);

void stream_decoded_data_free(struct stream_decoded_data *data);
void stream_data_out_free(struct stream_data_out *data);
void stream_data_in_free(struct stream_data_in *sdata);

#ifdef __cplusplus
}
#endif
#endif
19 changes: 16 additions & 3 deletions include/nng/supplemental/nanolib/conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ struct ringBuffer_node {
char *name;
uint32_t fullOp;
uint64_t cap;
// Streaming related options for ringbus full operations.
// All are optional; 0 means "use built-in default".
uint32_t stream_chunk_bytes; // Max bytes per parquet streaming chunk
uint32_t stream_throttle_ms; // Sleep interval between streaming rowgroups
uint32_t stream_lmq_cap; // In-memory queue size for pending streaming flush tasks
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused configuration field: The stream_lmq_cap field is added to the ringBuffer_node structure and parsed from configuration (line 1643 in conf_ver2.c), but there's no code in the implementation that actually uses this value to limit queue capacity. Either implement the queue capacity limiting logic or remove this unused configuration field.

Suggested change
uint32_t stream_lmq_cap; // In-memory queue size for pending streaming flush tasks

Copilot uses AI. Check for mistakes.
};

typedef struct conf_exchange_node conf_exchange_node;
Expand Down Expand Up @@ -591,9 +596,17 @@ struct conf_web_hook {
uint16_t rule_count;
conf_web_hook_rule **rules;

nng_mtx *ex_mtx; // mutex for saios
nng_aio *ex_aio; // Await flush
nng_aio **saios; // Aios for sending message
nng_mtx *ex_mtx; // mutex for saios
nng_aio *ex_aio; // Await flush (batch)
nng_aio *ex_stream_aio; // Legacy single streaming AIO (fallback)
nng_aio **saios; // Aios for sending message

// Per-exchange streaming AIOs, one for each conf_exchange_node when
// parquet streaming is enabled. This allows different topics/streams
// to have their own asynchronous pipeline instead of sharing a single
// global ex_stream_aio.
nng_aio **ex_stream_aios;
size_t ex_stream_aios_cnt;

// TODO not support yet
conf_tls tls;
Expand Down
20 changes: 20 additions & 0 deletions include/nng/supplemental/nanolib/parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ typedef struct parquet_data_ret parquet_data_ret;
typedef struct parquet_payload parquet_payload;
typedef void (*parquet_cb)(parquet_object *arg);

// Streaming input wrapper for decoupling parquet from webhook internals.
// The magic value spells the ASCII bytes 'P','K','S','T' (0x50 0x4B 0x53 0x54).
#define PARQUET_STREAM_IN_MAGIC 0x504B5354 /* 'PKST' */
// Forward declaration only: this header needs the pointer type, not the layout.
struct stream_data_in;
typedef struct parquet_stream_in {
// NOTE(review): presumably set to PARQUET_STREAM_IN_MAGIC so a receiver can
// recognise this wrapper — confirm against the producer/consumer code.
uint32_t magic;
struct stream_data_in *sdata; // Input rows
void *user_cbdata; // Opaque pointer passed back to caller
uint8_t stream_id; // Stream type used by encode_stream
} parquet_stream_in;

typedef enum {
WRITE_RAW,
WRITE_CAN,
Expand Down Expand Up @@ -82,6 +92,10 @@ int parquet_write_batch_async(parquet_object *elem);
int parquet_write_batch_tmp_async(parquet_object *elem);
int parquet_write_launcher(conf_exchange *conf);

// Hint streaming writer of given topic to finish as soon as possible
// (used by webhook when a new flush arrives while previous is still running).
void parquet_stream_force_flush(const char *topic);

const char *parquet_find(const char *topic, uint64_t key);
const char **parquet_find_span(
const char *topic, uint64_t start_key, uint64_t end_key, uint32_t *size);
Expand All @@ -105,6 +119,12 @@ parquet_data_packet **parquet_find_data_span_packets_specify_file(
parquet_filename_range **parquet_get_file_ranges(
uint64_t start_key, uint64_t end_key, char *topic);

// Streaming parameters customization (configured from ringbus section).
// bytes: max payload bytes per parquet streaming chunk (0 keeps default).
void parquet_stream_set_chunk_bytes(size_t bytes);
// ms: sleep interval between streaming row groups (0 disables throttling).
void parquet_stream_set_throttle_ms(uint32_t ms);

// If filename in range is NULL return all results, else one parquet file results.
parquet_data_ret **parquet_get_data_packets_in_range_by_column(
parquet_filename_range *range, const char *topic, const char **schema,
Expand Down
2 changes: 2 additions & 0 deletions include/nng/supplemental/nanolib/ringbuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ enum fullOption {
RB_FULL_NONE,
RB_FULL_DROP,
RB_FULL_RETURN,
/* Return messages immediately and indicate streaming write */
RB_FULL_RETURN_STREAM,
RB_FULL_FILE,

RB_FULL_MAX
Expand Down
1 change: 1 addition & 0 deletions src/mqtt/protocol/exchange/exchange.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ exchange_init(exchange_t **ex, char *name, char *topic, uint8_t streamType, uint

newEx->streamType = streamType;
newEx->chunk_size = chunk_size;
newEx->streamAio = NULL;

for (unsigned int i = 0; i < RINGBUFFER_MAX; i++) {
newEx->rbs[i] = NULL;
Expand Down
13 changes: 12 additions & 1 deletion src/mqtt/protocol/exchange/exchange_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,18 @@ exchange_do_send(exchange_node_t *ex_node, nni_msg *msg, nni_aio *user_aio)
char *topic = ex_node->ex->topic;
nng_msg_set_conn_param(tmsg, topic);
nng_msg_set_cmd_type(tmsg, ex_node->ex->streamType);
nng_msg_set_payload_ptr(tmsg, (uint8_t *)ex_node->ex->chunk_size);
/* Embed exchange pointer for optional downstream use */
nng_msg_set_payload_ptr(tmsg, (uint8_t *)ex_node->ex);
/* Also pass current fullOp via aio input[1] for clarity */
if (ex_node->ex->rb_count > 0 && ex_node->ex->rbs[0] != NULL) {
nni_aio_set_input(user_aio, 1,
(void *)(uintptr_t)ex_node->ex->rbs[0]->fullOp);
}
/* If webhook provided a preferred streaming AIO in input[2], store it */
void *stream_aio = nni_aio_get_input(user_aio, 2);
if (stream_aio != NULL) {
ex_node->ex->streamAio = (nng_aio *)stream_aio;
}
nni_aio_set_msg(user_aio, tmsg);
}
nni_aio_finish(user_aio, 0, 0);
Expand Down
110 changes: 109 additions & 1 deletion src/mqtt/protocol/exchange/stream/raw_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "nng/supplemental/nanolib/parquet.h"
#endif

void raw_encode_stream(void *data, nng_aio *aio, size_t chunk_bytes);
static char **raw_schema_init()
{
char **schema = nng_alloc(2 * sizeof(char *));
Expand Down Expand Up @@ -369,11 +370,118 @@ int raw_stream_register()

strcpy(name, RAW_STREAM_NAME);

ret = stream_register(name, RAW_STREAM_ID, raw_decode, raw_encode, raw_cmd_parser);
ret = stream_register(name, RAW_STREAM_ID, raw_decode, raw_encode, raw_encode_stream, raw_cmd_parser);
if (ret != 0) {
nng_free(name, strlen(name) + 1);
return ret;
}

return 0;
}

// Simple streaming encoder: push input data to aio in ~5KB batches.
void raw_encode_stream(void *data, nng_aio *aio, size_t chunk_bytes)
{
// Only support RAW stream: 1 data column + ts column.
struct stream_data_in *input_stream = (struct stream_data_in *)data;
if (input_stream == NULL || aio == NULL || input_stream->len == 0) {
return;
}
if (chunk_bytes == 0) {
chunk_bytes = 5 * 1024;
}

uint32_t start = 0;
while (start < input_stream->len) {
uint32_t rows = 0;
size_t acc = 0;
// Calculate the number of rows in this batch.
for (uint32_t i = start; i < input_stream->len; i++) {
size_t one = input_stream->lens[i];
if (rows > 0 && acc + one > chunk_bytes) {
break;
}
acc += one;
rows++;
}
if (rows == 0) {
rows = 1;
}

// Build parquet_data with a single \"data\" column.
char **schema = nng_alloc(2 * sizeof(char *));
if (schema == NULL) {
return;
}
schema[0] = nng_alloc(strlen("ts") + 1);
if (schema[0] == NULL) {
nng_free(schema, 2 * sizeof(char *));
return;
}
strcpy(schema[0], "ts");
schema[1] = nng_alloc(strlen("data") + 1);
if (schema[1] == NULL) {
nng_free(schema[0], strlen("ts") + 1);
nng_free(schema, 2 * sizeof(char *));
return;
}
strcpy(schema[1], "data");

parquet_data_packet ***payload_arr = nng_alloc(sizeof(parquet_data_packet **) * 1);
if (payload_arr == NULL) {
nng_free(schema[0], strlen("ts") + 1);
nng_free(schema[1], strlen("data") + 1);
nng_free(schema, 2 * sizeof(char *));
return;
}
payload_arr[0] = nng_alloc(sizeof(parquet_data_packet *) * rows);
if (payload_arr[0] == NULL) {
nng_free(payload_arr, sizeof(parquet_data_packet **) * 1);
nng_free(schema[0], strlen("ts") + 1);
nng_free(schema[1], strlen("data") + 1);
nng_free(schema, 2 * sizeof(char *));
return;
}
uint64_t *ts = nng_alloc(sizeof(uint64_t) * rows);
if (ts == NULL) {
nng_free(payload_arr[0], sizeof(parquet_data_packet *) * rows);
nng_free(payload_arr, sizeof(parquet_data_packet **) * 1);
nng_free(schema[0], strlen("ts") + 1);
nng_free(schema[1], strlen("data") + 1);
nng_free(schema, 2 * sizeof(char *));
return;
}
Comment on lines +412 to +453
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak on early return: When any allocation fails in the function (lines 413-453), the function returns early without cleaning up previously allocated resources. For example, if allocation of 'ts' fails at line 445, the function returns without freeing schema[0], schema[1], schema, payload_arr[0], and payload_arr that were allocated earlier. All early return paths should properly free previously allocated memory.

Copilot uses AI. Check for mistakes.

for (uint32_t r = 0; r < rows; r++) {
uint32_t idx = start + r;
ts[r] = input_stream->keys[idx];
payload_arr[0][r] = nng_alloc(sizeof(parquet_data_packet));
if (payload_arr[0][r] == NULL) {
// Memory leak acceptable in debug stage; refine free logic later.
continue;
}
payload_arr[0][r]->size = input_stream->lens[idx];
if (payload_arr[0][r]->size > 0) {
payload_arr[0][r]->data = nng_alloc(payload_arr[0][r]->size);
if (payload_arr[0][r]->data != NULL) {
memcpy(payload_arr[0][r]->data, input_stream->datas[idx], payload_arr[0][r]->size);
}
} else {
payload_arr[0][r]->data = NULL;
}
}
Comment on lines +455 to +472
Copy link

Copilot AI Dec 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Memory leak on allocation failure: When allocation of payload_arr[0][r] fails at line 459, the code continues to the next iteration without freeing previously allocated payload_arr[0][0..r-1] entries. The comment acknowledges this is acceptable in "debug stage", but this should be fixed before production. A proper cleanup path should free all previously allocated resources when any allocation in the loop fails.

Copilot uses AI. Check for mistakes.

parquet_data *chunk = parquet_data_alloc(schema, payload_arr, ts, 1, rows);

// Submit this batch via aio.
nng_aio_set_msg(aio, NULL);
nng_aio_set_prov_data(aio, chunk);
nng_aio_set_output(aio, 0,
(void *)(uintptr_t) (start + rows >= input_stream->len)); // last flag
nng_aio_finish(aio, 0);
// Synchronously wait callback before next batch to avoid AIO re-entrance.
nng_aio_wait(aio);

start += rows;
}
}
13 changes: 13 additions & 0 deletions src/mqtt/protocol/exchange/stream/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ int stream_register(char *name,
uint8_t id,
void *(*decode)(void *),
void *(*encode)(void *),
void (*encode_stream)(void *, nng_aio *, size_t),
void *(*cmd_parser)(void *))
{
stream_node *snode = NULL;
Expand All @@ -37,6 +38,7 @@ int stream_register(char *name,
snode->id = id;
snode->decode = decode;
snode->encode = encode;
snode->encode_stream = encode_stream;
snode->cmd_parser = cmd_parser;

nng_id_set(stream_node_map, id, snode);
Expand Down Expand Up @@ -82,6 +84,17 @@ void *stream_encode(uint8_t id, void *buf)
return snode->encode(buf);
}

// Dispatch a streaming encode request to the stream registered under id.
// buf, aio and chunk_bytes are forwarded to the callback unchanged.
// Returns 0 after the callback has run, or NNG_ENOTSUP when no stream is
// registered under id or the stream has no streaming encoder.
int stream_encode_stream(uint8_t id, void *buf, nng_aio *aio, size_t chunk_bytes)
{
	stream_node *node = nng_id_get(stream_node_map, id);

	if (node != NULL && node->encode_stream != NULL) {
		node->encode_stream(buf, aio, chunk_bytes);
		return 0;
	}
	return NNG_ENOTSUP;
}

void *stream_cmd_parser(uint8_t id, void *buf)
{
stream_node *snode = NULL;
Expand Down
2 changes: 1 addition & 1 deletion src/mqtt/protocol/exchange/stream/stream_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void test_stream_sys_register(void)
ret = stream_sys_init();
NUTS_TRUE(ret == 0);

ret = stream_register("raw", RAW_STREAM_ID, NULL, NULL, NULL);
ret = stream_register("raw", RAW_STREAM_ID, NULL, NULL, NULL, NULL);
NUTS_TRUE(ret != 0);

stream_sys_fini();
Expand Down
9 changes: 6 additions & 3 deletions src/supplemental/nanolib/conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -1252,9 +1252,12 @@ print_exchange_conf(conf_exchange *exchange)
log_info("exchange limit_frequency %d", n->limit_frequency);
for (int j=0; j< (int) n->rbufs_sz; j++) {
ringBuffer_node *r = n->rbufs[j];
log_info("exchange ringbus name %s", r->name);
log_info("exchange ringbus cap %d", r->cap);
log_info("exchange ringbus fullOp %d", r->fullOp);
log_info("exchange ringbus name %s", r->name);
log_info("exchange ringbus cap %d", r->cap);
log_info("exchange ringbus fullOp %d", r->fullOp);
log_info("exchange ringbus stream_chunk_bytes %u", r->stream_chunk_bytes);
log_info("exchange ringbus stream_throttle_ms %u", r->stream_throttle_ms);
log_info("exchange ringbus stream_lmq_cap %u", r->stream_lmq_cap);
}

if (n->parquet != NULL) {
Expand Down
10 changes: 8 additions & 2 deletions src/supplemental/nanolib/conf_ver2.c
Original file line number Diff line number Diff line change
Expand Up @@ -1626,15 +1626,21 @@ conf_exchange_node_parse(conf_exchange_node *node, cJSON *exchange)
return;
}

rb_node->cap = 0;
rb_node->fullOp = 0;
rb_node->cap = 0;
rb_node->fullOp = 0;
rb_node->stream_chunk_bytes = 0;
rb_node->stream_throttle_ms = 0;
rb_node->stream_lmq_cap = 0;

hocon_read_str(rb_node, name, rb);
hocon_read_num(rb_node, cap, rb);
if ((uint64_t)rb_node->cap > (uint64_t)RINGBUFFER_MAX_SIZE) {
log_error("exchange: ringbuffer: name/cap is exceeding limit");
}
hocon_read_num(rb_node, fullOp, rb);
hocon_read_num(rb_node, stream_chunk_bytes, rb);
hocon_read_num(rb_node, stream_throttle_ms, rb);
hocon_read_num(rb_node, stream_lmq_cap, rb);

if (rb_node->name == NULL || rb_node->cap == 0) {
log_error("exchange: ringbuffer: name/cap not found");
Expand Down
Loading
Loading