enh:create stream auto create output table#34723
Conversation
There was a problem hiding this comment.
Pull request overview
This PR adds support for automatically creating stream output tables (including normal output tables) and introduces a NODELAY_CREATE_SUBTABLE option to eagerly create output subtables at stream creation time (notably for partition by tbname streams). It wires the new option from SQL parsing → AST/JSON → create-stream request → mnode deploy messaging → trigger runtime behavior, and adds a new stream test.
Changes:
- Add
NODELAY_CREATE_SUBTABLEsyntax/AST fields and propagate it through stream request serialization and deploy messages. - Implement mnode-side creation of output normal tables during
CREATE STREAM, and trigger-side “create-table-only” requests for eager subtable creation. - Add a new test case covering auto creation behavior for output stables/normal tables.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| test/cases/18-StreamProcessing/99-Others/auto_create_output_table.py | Adds test coverage for auto-creation of output stables/normal tables and eager subtable creation. |
| source/libs/parser/src/parTranslater.c | Propagates nodelayCreateSubtable into the create-stream request (plus formatting changes). |
| source/libs/parser/src/parTokenizer.c | Registers NODELAY_CREATE_SUBTABLE as a SQL keyword. |
| source/libs/parser/src/parAstCreater.c | Extends stream out-table node creation to carry nodelayCreateSubtable into the create-stream stmt. |
| source/libs/parser/inc/sql.y | Adds nodelay_create_subtable_opt grammar for INTO ... NODELAY_CREATE_SUBTABLE .... |
| source/libs/parser/inc/parAst.h | Updates createStreamOutTableNode signature to include nodelayCreateSubtable. |
| source/libs/nodes/src/nodesCodeFuncs.c | Adds JSON encode/decode for SCreateStreamStmt.nodelayCreateSubtable. |
| include/libs/nodes/querynodes.h | Adds nodelayCreateSubtable field to SStreamOutTableNode. |
| include/libs/nodes/cmdnodes.h | Adds nodelayCreateSubtable field to SCreateStreamStmt. |
| include/common/streamMsg.h | Adds nodelayCreateSubtable to SCMCreateStreamReq and SStreamTriggerDeployMsg. |
| source/common/src/msg/streamMsg.c | Encodes/decodes nodelayCreateSubtable in trigger deploy messages. |
| source/common/src/msg/streamJson.c | Adds JSON encode/decode for SCMCreateStreamReq.nodelayCreateSubtable. |
| source/dnode/mnode/impl/src/mndStreamMgmt.c | Includes nodelayCreateSubtable in trigger deploy info; improves taskMap missing logging. |
| source/libs/new-stream/inc/streamTriggerTask.h | Adds runtime fields for eager subtable creation coordination. |
| source/libs/new-stream/src/streamTriggerTask.c | Implements eager subtable creation flow: pull group info per gid then send “create-table-only” calc requests. |
| source/dnode/vnode/src/vnd/vnodeStream.c | Returns gid list even when no data for groupByTbname (to support eager creation). |
| source/dnode/mnode/impl/src/mndStream.c | Creates output normal table during CREATE STREAM when missing; adds vgroup header include. |
| source/dnode/mnode/impl/src/mndStreamUtil.c | Fixes hash key usage for streamMap lookup; adds debug logging around show streams status. |
| source/dnode/mnode/impl/src/mndStb.c | Registers handler for TDMT_VND_CREATE_TABLE_RSP to be processed by transaction machinery. |
| source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | Routes TDMT_VND_CREATE_TABLE_RSP into mnode write queue. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
test/cases/18-StreamProcessing/99-Others/auto_create_output_table.py
Outdated
Show resolved
Hide resolved
| if (pTask->nodelayCreateSubtable) { | ||
| if (nrows > 0 && pGidData != NULL) { | ||
| pContext->pPendingCreateTableGids = taosArrayInit(0, sizeof(int64_t)); | ||
| QUERY_CHECK_NULL(pContext->pPendingCreateTableGids, code, lino, _end, terrno); | ||
| pContext->pPendingCreateTableProgress = pProgress; | ||
| if (pTask->isVirtualTable) { |
There was a problem hiding this comment.
pPendingCreateTableGids is unconditionally re-initialized on every STRIGGER_PULL_LAST_TS response when nodelayCreateSubtable is enabled. With multiple readers, later responses can overwrite/leak the existing pending list and lose work. Guard this block (e.g., only initialize when pPendingCreateTableGids == NULL) and define how to merge gids across readers before sending the first GROUP_COL_VALUE pull.
| // Build schema from outCols (same logic as buildNormalTableCreateReq) | ||
| int32_t numOfCols = taosArrayGetSize(pStream->outCols); | ||
| createReq.ntb.schemaRow.nCols = numOfCols; | ||
| createReq.ntb.schemaRow.version = 1; | ||
| createReq.ntb.schemaRow.pSchema = taosMemoryCalloc(numOfCols, sizeof(SSchema)); | ||
| if (createReq.ntb.schemaRow.pSchema == NULL) { |
There was a problem hiding this comment.
When creating the output normal table, SVCreateTbReq requires ntb.userId to be set (it becomes the table owner in vnode meta). This request currently never sets createReq.ntb.userId, so the created table will have ownerId=0, which can break privilege/ownership semantics. Set createReq.ntb.userId to the stream owner/user (consistent with other create-table paths).
test/cases/18-StreamProcessing/99-Others/auto_create_output_table.py
Outdated
Show resolved
Hide resolved
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a significant enhancement to stream processing by allowing users to explicitly control the timing of output table creation. It adds a Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a feature to automatically create output tables for streams using the NODELAY_CREATE_SUBTABLE keyword, including the new function mndStreamCreateOutTable. A high-severity Broken Access Control vulnerability has been identified in the new table creation logic, as it fails to verify user permissions on the target database, potentially allowing authenticated users to create tables in databases they lack write access to. Additionally, the review highlights areas for improving error logging, enhancing code maintainability in the new C functions, and correcting issues in the new Python test file.
| static int32_t mndStreamCreateOutTable(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream) { | ||
| int32_t code = 0; | ||
| int32_t lino = 0; | ||
| SVgObj *pVgroup = NULL; | ||
| SDbObj *pDb = NULL; | ||
| SName name = {0}; | ||
| char dbFName[TSDB_DB_FNAME_LEN] = {0}; | ||
|
|
||
| // Parse database and table name | ||
| if ((code = tNameFromString(&name, pStream->outDB, T_NAME_ACCT | T_NAME_DB)) != 0) { | ||
| mError("stream:%s failed to parse outDB:%s, code:%s", pStream->name, pStream->outDB, tstrerror(code)); | ||
| return code; | ||
| } | ||
| if ((code = tNameGetFullDbName(&name, dbFName)) != 0) { | ||
| mError("stream:%s failed to get full db name, code:%s", pStream->name, tstrerror(code)); | ||
| return code; | ||
| } | ||
|
|
||
| // Get database object | ||
| pDb = mndAcquireDb(pMnode, dbFName); | ||
| if (pDb == NULL) { | ||
| code = TSDB_CODE_MND_DB_NOT_SELECTED; | ||
| mError("stream:%s failed to acquire db:%s, code:%s", pStream->name, dbFName, tstrerror(code)); | ||
| return code; | ||
| } | ||
|
|
||
| // Set transaction db name and check conflict (similar to mndAddStbToTrans) | ||
| mndTransSetDbName(pTrans, pDb->name, pStream->outTblName); | ||
| code = mndTransCheckConflict(pMnode, pTrans); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| mError("stream:%s failed to check conflict, code:%s", pStream->name, tstrerror(code)); | ||
| goto _OVER; | ||
| } |
There was a problem hiding this comment.
The mndStreamCreateOutTable function, while handling the automatic creation of output tables, has a critical security vulnerability: it lacks authorization checks for user permissions on the target database (pStream->outDB). This could allow authenticated users to create tables in databases they do not have write access to, leading to unauthorized data injection or resource exhaustion. Additionally, the function is currently over 250 lines long, which impacts readability and maintainability. Consider refactoring it into smaller, more focused helper functions after addressing the security vulnerability.
static int32_t mndStreamCreateOutTable(SMnode *pMnode, STrans *pTrans, const SCMCreateStreamReq *pStream, const char *pUser) {
int32_t code = 0;
int32_t lino = 0;
SVgObj *pVgroup = NULL;
SDbObj *pDb = NULL;
SName name = {0};
char dbFName[TSDB_DB_FNAME_LEN] = {0};
// Parse database and table name
if ((code = tNameFromString(&name, pStream->outDB, T_NAME_ACCT | T_NAME_DB)) != 0) {
mError("stream:%s failed to parse outDB:%s, code:%s", pStream->name, pStream->outDB, tstrerror(code));
return code;
}
if ((code = tNameGetFullDbName(&name, dbFName)) != 0) {
mError("stream:%s failed to get full db name, code:%s", pStream->name, tstrerror(code));
return code;
}
// Get database object
pDb = mndAcquireDb(pMnode, dbFName);
if (pDb == NULL) {
code = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stream:%s failed to acquire db:%s, code:%s", pStream->name, dbFName, tstrerror(code));
return code;
}
// Check if user has privilege to create table in the target database
code = mndCheckDbPrivilegeByName(pMnode, pUser, MND_OPER_WRITE_DB, dbFName);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s user:%s has no privilege to create table in db:%s", pStream->name, pUser, dbFName);
goto _OVER;
}
// Set transaction db name and check conflict (similar to mndAddStbToTrans)
mndTransSetDbName(pTrans, pDb->name, pStream->outTblName);
code = mndTransCheckConflict(pMnode, pTrans);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s failed to check conflict, code:%s", pStream->name, tstrerror(code));
goto _OVER;
}There was a problem hiding this comment.
有建流权限一定有建表权限,否则,没有建流权限,就不会走到这里的逻辑
| mstsInfo("stream:%s created output super table:%s", pStream->pCreate->name, pStream->pCreate->outTblName); | ||
| } else if (TSDB_NORMAL_TABLE == pStream->pCreate->outTblType) { | ||
| // Create normal table in vnode | ||
| code = mndStreamCreateOutTable(pMnode, pTrans, pStream->pCreate); |
There was a problem hiding this comment.
The call to mndStreamCreateOutTable is missing the user context required for authorization checks. Unlike the parallel call to mndStreamCreateOutStb on line 1326, this call does not pass RPC_MSG_USER(pReq), which prevents the function from verifying if the user has permission to create a table in the target database.
code = mndStreamCreateOutTable(pMnode, pTrans, pStream->pCreate, pUser);| mError("stream:%s failed to create output normal table:%s, line:%d code:%s", pStream->name, pStream->outTblName, | ||
| lino, tstrerror(code)); | ||
| } |
There was a problem hiding this comment.
The lino variable is initialized to 0 and is never updated within this function. Consequently, if an error occurs, the logged line number will always be 0, which is not helpful for debugging. You should update lino with __LINE__ before each goto _OVER; that doesn't use a macro like TSDB_CHECK_CODE to ensure meaningful error messages.
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.