Conversation
Signed-off-by: Moi Ran <maoyi.ran@emqx.io>
Pull request overview
This work-in-progress PR implements parquet streaming functionality for NanoMQ, allowing data to be written to parquet files incrementally in chunks rather than all at once. The implementation adds a new ring buffer full operation mode (RB_FULL_RETURN_STREAM) and introduces per-topic worker threads that handle streaming writes asynchronously.
Key Changes:
- Added streaming infrastructure with per-topic worker threads and streaming contexts to handle incremental parquet writes
- Extended the stream registration API to support optional streaming encode callbacks
- Implemented cross-device file copy fallback for rename operations and added streaming configuration parameters (chunk size, throttle interval)
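The cross-device fallback mentioned above is a common POSIX pattern: `rename(2)` fails with `EXDEV` when source and target live on different filesystems, so the move is completed by copy-then-remove. A minimal standalone sketch of that pattern (function names here are illustrative, not the PR's actual symbols):

```c
#include <errno.h>
#include <stdio.h>

/* Illustrative helper: copy src to dst, then unlink src.
 * The close of the destination is checked so a failed flush
 * (disk full, I/O error) is not silently ignored. */
static int copy_then_remove(const char *src, const char *dst)
{
	FILE *in = fopen(src, "rb");
	if (in == NULL)
		return -1;
	FILE *out = fopen(dst, "wb");
	if (out == NULL) {
		fclose(in);
		return -1;
	}
	char   buf[8192];
	size_t n;
	int    rv = 0;
	while ((n = fread(buf, 1, sizeof(buf), in)) > 0) {
		if (fwrite(buf, 1, n, out) != n) {
			rv = -1;
			break;
		}
	}
	if (ferror(in))
		rv = -1;
	fclose(in);
	if (fclose(out) != 0) /* flush failure also counts as an error */
		rv = -1;
	if (rv == 0 && remove(src) != 0)
		rv = -1;
	return rv;
}

/* Try rename(2) first; fall back to copy+remove only when the
 * failure is the cross-device case (EXDEV). */
int rename_with_fallback(const char *src, const char *dst)
{
	if (rename(src, dst) == 0)
		return 0;
	if (errno != EXDEV)
		return -1;
	return copy_then_remove(src, dst);
}
```

Within a single filesystem the `rename` branch is taken and the copy path never runs, so the fallback adds no cost to the common case.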
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 18 comments.
| File | Description |
|---|---|
| src/supplemental/nanolib/ringbuffer/ringbuffer.c | Added RB_FULL_RETURN_STREAM enum value and passes fullOp to webhook via aio |
| src/supplemental/nanolib/parquet/parquet.cc | Core streaming implementation: added StreamingCtrl structure, per-topic worker threads, streaming write context, AIO callbacks, and cross-device rename fallback |
| src/supplemental/nanolib/conf_ver2.c | Added parsing for new streaming configuration parameters (stream_chunk_bytes, stream_throttle_ms, stream_lmq_cap) |
| src/supplemental/nanolib/conf.c | Added logging for new streaming configuration fields |
| src/mqtt/protocol/exchange/stream/stream_test.c | Updated test to match new stream_register signature with encode_stream parameter |
| src/mqtt/protocol/exchange/stream/stream.c | Added encode_stream callback support to stream registration and new stream_encode_stream function |
| src/mqtt/protocol/exchange/stream/raw_stream.c | Implemented raw_encode_stream function to batch input data into parquet chunks |
| src/mqtt/protocol/exchange/exchange_server.c | Modified to pass exchange pointer and fullOp via aio for downstream use |
| src/mqtt/protocol/exchange/exchange.c | Initialized streamAio field in exchange structure |
| include/nng/supplemental/nanolib/ringbuffer.h | Added RB_FULL_RETURN_STREAM enum value |
| include/nng/supplemental/nanolib/parquet.h | Added parquet_stream_in structure, force_flush API, and streaming parameter setters |
| include/nng/supplemental/nanolib/conf.h | Added streaming configuration fields to ringBuffer_node and conf_web_hook structures |
| include/nng/exchange/stream/stream.h | Added encode_stream callback to stream_node and stream_encode_stream function declaration |
| include/nng/exchange/exchange.h | Added streamAio field to exchange structure |
```cpp
parquet_stream_in *sin = (parquet_stream_in *) elem->aio_arg;
if (sin == nullptr || sin->magic != PARQUET_STREAM_IN_MAGIC || sin->sdata == nullptr) {
	log_error("streaming cbdata invalid");
	parquet_object_free(elem);
	return -1;
}
```
Unclear variable name: The variable 'sin' (parquet_stream_in) uses an abbreviated name that could be confused with the mathematical sine function. Consider using a more descriptive name like 'stream_in' or 'pstream_in' to improve code clarity.
```diff
 }

 static char *
-get_random_file_name(char *prefix, uint64_t key_start, uint64_t key_end)
+get_random_file_name(const char *dir, char *prefix, uint64_t key_start, uint64_t key_end)
 {
 	char *file_name = NULL;
-	char dir[] = "/tmp";

 	file_name = (char *) malloc(strlen(prefix) + strlen(dir) +
 	    UINT64_MAX_DIGITS + UINT64_MAX_DIGITS + 16);
```
Incorrect log level: This appears to be a debug or info message showing the generated filename, but it's logged with log_error severity. This should be changed to log_debug or log_info to accurately reflect the nature of the message.
```cpp
}

// Mark this topic streaming as running and observe flush-now flag if set later
StreamingCtrl *ctrl = get_stream_ctrl(elem->topic);
```
Missing null check: The get_stream_ctrl function can theoretically return nullptr if memory allocation fails for the new StreamingCtrl, but line 974 does not check for this before dereferencing ctrl. While unlikely, this could cause a null pointer dereference if allocation fails.
Suggested change:

```diff
 StreamingCtrl *ctrl = get_stream_ctrl(elem->topic);
+if (ctrl == nullptr) {
+	log_error("Failed to get streaming control for topic: %s", elem->topic);
+	parquet_object_free(elem);
+	return -1;
+}
```
```cpp
nng_mtx_alloc(&ctx.mtx);
nng_cv_alloc(&ctx.cv, ctx.mtx);
```
Inconsistent error handling: Lines 1011-1012 allocate nng_mtx and nng_cv without checking the return values. If these allocations fail, ctx.mtx or ctx.cv will be NULL, leading to undefined behavior when they're used later (e.g., line 1040). These allocations should be checked, and the function should handle failure appropriately.
Suggested change:

```diff
-nng_mtx_alloc(&ctx.mtx);
-nng_cv_alloc(&ctx.cv, ctx.mtx);
+if (nng_mtx_alloc(&ctx.mtx) != 0) {
+	if (ctx.tmp_name) {
+		free(ctx.tmp_name);
+		ctx.tmp_name = NULL;
+		tmp_name = NULL;
+	}
+	parquet_stream_release_ctrl(ctrl);
+	parquet_object_free(elem);
+	return -1;
+}
+if (nng_cv_alloc(&ctx.cv, ctx.mtx) != 0) {
+	nng_mtx_free(ctx.mtx);
+	ctx.mtx = NULL;
+	if (ctx.tmp_name) {
+		free(ctx.tmp_name);
+		ctx.tmp_name = NULL;
+		tmp_name = NULL;
+	}
+	parquet_stream_release_ctrl(ctrl);
+	parquet_object_free(elem);
+	return -1;
+}
```
```cpp
}
// run streaming
nng_aio *laio = NULL;
nng_aio_alloc(&laio, parquet_stream_aio_cb, &ctx);
```
Missing error check for nng_aio_alloc: The nng_aio_alloc call at line 1023 may fail, but the code does not check if laio is NULL before using it at line 1025. If allocation fails, this will cause a null pointer dereference when stream_encode_stream is called with a NULL aio.
Suggested change:

```diff
-nng_aio_alloc(&laio, parquet_stream_aio_cb, &ctx);
+int aio_alloc_ret = nng_aio_alloc(&laio, parquet_stream_aio_cb, &ctx);
+if (aio_alloc_ret != 0 || laio == NULL) {
+	nng_cv_free(ctx.cv);
+	nng_mtx_free(ctx.mtx);
+	if (ctx.tmp_name) {
+		free(ctx.tmp_name);
+		ctx.tmp_name = NULL;
+		tmp_name = NULL;
+	}
+	parquet_stream_release_ctrl(ctrl);
+	parquet_object_free(elem);
+	return aio_alloc_ret != 0 ? aio_alloc_ret : -1;
+}
```
```cpp
if (src && dst) {
	dst << src.rdbuf();
	dst.flush();
	src.close();
	dst.close();
	if (remove(filename.c_str()) != 0) {
		log_error("Failed to remove temp file %s errno: %d",
		    filename.c_str(), errno);
	}
```
Missing error check after file copy: After copying the file and flushing (lines 816-819), the code does not verify that the copy operation succeeded. The dst.flush() could fail due to disk full or I/O errors, but there's no check of the stream state before removing the source file. This could result in data loss if the copy fails silently. Check dst.good() or similar before removing the source file.
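No suggested change accompanies this comment. One way to implement the reviewer's point is to gate the `remove` of the source on the stream state after the copy and flush; a self-contained sketch, where `copy_file_checked` is a hypothetical name rather than the PR's actual helper:

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Sketch: only remove the source file once the destination stream
// reports a healthy state after the copy and flush, so a silent
// I/O failure (disk full, write error) cannot lose data.
bool copy_file_checked(const std::string &src_path, const std::string &dst_path)
{
	std::ifstream src(src_path, std::ios::binary);
	std::ofstream dst(dst_path, std::ios::binary);
	if (!src || !dst)
		return false;
	dst << src.rdbuf(); // inserting a streambuf sets failbit on dst if it fails
	dst.flush();        // a failed flush sets badbit on dst
	if (!dst.good() || src.bad())
		return false; // copy not verified: keep the source file
	src.close();
	dst.close();
	// Source is removed only after a verified copy.
	return std::remove(src_path.c_str()) == 0;
}
```

Note that `dst << src.rdbuf()` also sets failbit when zero characters are inserted, so an empty source file would be reported as a failure by this sketch; the real code may want to special-case that.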
```cpp
// All are optional; 0 means "use built-in default".
uint32_t stream_chunk_bytes; // Max bytes per parquet streaming chunk
uint32_t stream_throttle_ms; // Sleep interval between streaming rowgroups
uint32_t stream_lmq_cap;     // In-memory queue size for pending streaming flush tasks
```
Unused configuration field: The stream_lmq_cap field is added to the ringBuffer_node structure and parsed from configuration (line 1643 in conf_ver2.c), but there's no code in the implementation that actually uses this value to limit queue capacity. Either implement the queue capacity limiting logic or remove this unused configuration field.
Suggested change:

```diff
-uint32_t stream_lmq_cap; // In-memory queue size for pending streaming flush tasks
```
```cpp
	parquet_stream_release_ctrl(ctrl);
	return -1;
}

parquet_stream_release_ctrl(ctrl);

parquet_object_free(elem);
```
Potential double-free on error: If parquet_stream_finalize returns -1 at line 1061, the function releases ctrl and returns, but elem may already have been freed inside parquet_stream_finalize: when compute_and_rename_file_withMD5 returns nullptr (line 462), parquet_object_free(elem) is called at line 463 before the error is propagated. The caller could then double-free elem if it tries to clean up after this function returns an error. The ownership and cleanup responsibility needs to be made explicit.
Suggested change:

```diff
+	// If parquet_stream_finalize frees elem on error, set elem to NULL to avoid double-free
+	elem = NULL;
 	parquet_stream_release_ctrl(ctrl);
 	return -1;
 }
 parquet_stream_release_ctrl(ctrl);
-parquet_object_free(elem);
+if (elem)
+	parquet_object_free(elem);
```
```cpp
}

// Streaming write: when data is NULL and aio_arg is provided.
if (elem->data == nullptr && elem->aio != nullptr) {
```
Logic error in streaming detection: The condition checks for elem->data == nullptr && elem->aio != nullptr, but based on line 966, the streaming path actually expects elem->aio_arg (not elem->aio) to contain the parquet_stream_in structure. This condition should be checking elem->aio_arg instead of elem->aio, otherwise the streaming path may not be triggered correctly.
Suggested change:

```diff
-if (elem->data == nullptr && elem->aio != nullptr) {
+if (elem->data == nullptr && elem->aio_arg != nullptr) {
```
```cpp
// Streaming control: per-topic flags & worker context for streaming writes.
struct StreamingCtrl {
	nng_mtx *mtx;
	bool running;
	bool flush_now;
	// Dedicated per-topic worker thread & queue for streaming parquet writes.
	std::queue<parquet_object *> q;
	pthread_mutex_t q_mtx;
	pthread_cond_t q_cv;
	bool thread_started;
	bool stop;
	pthread_t thread;

	StreamingCtrl()
	    : mtx(nullptr)
	    , running(false)
	    , flush_now(false)
	    , thread_started(false)
	    , stop(false)
	{
		nng_mtx_alloc(&mtx);
		pthread_mutex_init(&q_mtx, NULL);
		pthread_cond_init(&q_cv, NULL);
	}
	~StreamingCtrl() {
		if (mtx) {
			nng_mtx_free(mtx);
		}
		pthread_mutex_destroy(&q_mtx);
		pthread_cond_destroy(&q_cv);
	}
};
```
Missing thread cleanup mechanism: The StreamingCtrl structure has a 'stop' flag and worker thread, but there's no code path that sets ctrl->stop = true to gracefully shut down the worker thread. When the application terminates or a topic is removed, the worker threads created by pthread_create (line 202) will continue running indefinitely since they're detached. A cleanup function should be added to set stop=true and signal the condition variable to allow graceful thread termination.
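The shutdown path the review asks for can be sketched with the same queue/condvar shape as StreamingCtrl. All names below are illustrative, not the PR's symbols, and the sketch deliberately swaps the detached thread for a joinable one, since joining is what makes "drain pending items, then terminate" possible:

```cpp
#include <pthread.h>
#include <queue>

// Minimal per-topic worker with a graceful shutdown path.
struct Worker {
	std::queue<int> q;       // stand-in for queued parquet_object pointers
	pthread_mutex_t q_mtx;
	pthread_cond_t  q_cv;
	bool            stop = false;
	pthread_t       thread;
	int             processed = 0;
};

static void *worker_main(void *arg)
{
	Worker *w = static_cast<Worker *>(arg);
	pthread_mutex_lock(&w->q_mtx);
	for (;;) {
		while (w->q.empty() && !w->stop)
			pthread_cond_wait(&w->q_cv, &w->q_mtx);
		if (w->q.empty() && w->stop)
			break; // queue drained and shutdown requested
		w->q.pop();
		w->processed++; // real code would flush the parquet chunk here
	}
	pthread_mutex_unlock(&w->q_mtx);
	return nullptr;
}

void worker_start(Worker *w)
{
	pthread_mutex_init(&w->q_mtx, nullptr);
	pthread_cond_init(&w->q_cv, nullptr);
	pthread_create(&w->thread, nullptr, worker_main, w);
}

void worker_submit(Worker *w, int item)
{
	pthread_mutex_lock(&w->q_mtx);
	w->q.push(item);
	pthread_cond_signal(&w->q_cv);
	pthread_mutex_unlock(&w->q_mtx);
}

// The cleanup path the review says is missing: set stop under the
// lock, wake the worker, and join so pending items are flushed
// before the condvar and mutex are destroyed.
void worker_shutdown(Worker *w)
{
	pthread_mutex_lock(&w->q_mtx);
	w->stop = true;
	pthread_cond_signal(&w->q_cv);
	pthread_mutex_unlock(&w->q_mtx);
	pthread_join(w->thread, nullptr);
	pthread_cond_destroy(&w->q_cv);
	pthread_mutex_destroy(&w->q_mtx);
}
```

Destroying the mutex and condvar only after the join is what lets the StreamingCtrl destructor stay safe; with a detached thread there is no point at which the destructor can know the worker is done touching them.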