Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/en/06-advanced/03-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ For detailed usage instructions, see [SQL Manual](../14-reference/03-taos-sql/41
## Create a Stream

```sql
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]

options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
Expand Down
5 changes: 3 additions & 2 deletions docs/en/14-reference/03-taos-sql/41-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ TDengine TSDB’s stream processing engine also offers additional usability bene
## Create a Stream

```sql
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]

options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
Expand Down Expand Up @@ -300,7 +300,7 @@ Specifies the columns used for trigger grouping. Multiple columns are supported,
By default, the results of a stream are stored in an output table. Each output table contains only the results that have been triggered and computed up to the current time. You can define the structure of the output table, and if grouping is used, you can also specify the tag values for each subtable.

```sql
[INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]
[INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]

tag_definition:
tag_name type_name [COMMENT 'string_value'] AS expr
Expand All @@ -312,6 +312,7 @@ Details are as follows:
- If trigger grouping is used, this table will be a supertable.
- If no trigger grouping is used, this table will be a regular table.
- If the trigger only sends notifications without computation, or if computation results are only sent as notifications without being stored, this option does not need to be specified.
- `[NODELAY_CREATE_SUBTABLE]`: Optional. Specifies that the calculation output subtables for each group are created immediately when the stream is created. By default, output subtables are created only when the first calculated data is written. If this option is added, subtables are created asynchronously after the stream is created. If not all subtables are created successfully, the stream status remains `Idle`; if creation succeeds, the status changes to `Running`. For regular tables and supertables as output tables, they are created automatically when the stream is created by default, and no configuration is needed.
- `[OUTPUT_SUBTABLE(tbname_expr)]`: Optional. Specifies the name of the calculation output table (subtable) for each trigger group. This cannot be specified if there is no trigger grouping. If not specified, a unique output table (subtable) name will be automatically generated for each group. tbname_expr can be any output string expression, and may include trigger group partition columns (from [PARTITION BY col1[, ...]]). The output length must not exceed the maximum table name length; if it does, it will be truncated. If you do not want different groups to output to the same subtable, you must ensure each group's output table name is unique.
- `[(column_name1, column_name2 [COMPOSITE KEY][, ...])]`: Optional. Specifies the column names for each column in the output table. If not specified, each column name will be the same as the corresponding column name in the calculation result. You can use [COMPOSITE KEY] to indicate that the second column is a primary key column, forming a composite primary key together with the first column.
- `[TAGS (tag_definition [, ...])]`: Optional. Specifies the list of tag column definitions and values for the output supertable. This can only be specified if trigger grouping is present. If not specified, the tag column definitions and values are derived from all grouping columns, and in this case, grouping columns cannot have duplicate names. When grouping by subtable, the default generated tag column name is tag_tbname, with the type VARCHAR(270). The tag_definition parameters are as follows:
Expand Down
1 change: 1 addition & 0 deletions docs/en/14-reference/03-taos-sql/92-keywords.md
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ The list of keywords is as follows:
| NE | |
| NEXT | |
| NMATCH | |
| NODELAY_CREATE_SUBTABLE | 3.5.0.0+ |
| NONE | |
| NORMAL | |
| NOT | |
Expand Down
71 changes: 71 additions & 0 deletions docs/stream_create_deploy_sequence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# 建流与部署时序:正常 vs 异常

## 1. 正常情况(流先落 SDB,再被部署)

```mermaid
sequenceDiagram
participant Client
participant MND as Mnode(trans)
participant SDB as SDB
participant MSTM as Mnode(StreamMgmt)
participant Snode

Client->>MND: create stream 请求
MND->>MND: trans 创建 (prepare)
MND->>MND: redoAction: vnode-create-stb (x6)
Note over MND: 所有 redoAction 完成
MND->>MND: stage → commit, propose(seq:40)
MND->>MND: 其他节点 apply commit
MND->>SDB: commitAction:0 (stb)
MND->>SDB: commitAction:1 (stream 写入)
Note over MND,SDB: 流已持久化到 SDB
MND->>MSTM: (建流事务 commit 完成后) post DEPLOY 入队
MND->>Client: create-stream-rsp

Snode->>MSTM: stream 心跳
MSTM->>MSTM: 处理 deploy action
MSTM->>SDB: mndAcquireStream(streamName)
SDB-->>MSTM: 返回 stream 对象 (存在)
MSTM->>MSTM: msmDeployStreamTasks()
MSTM->>Snode: 部署 / 调度
```

**要点**:DEPLOY 入队发生在 **commitAction 执行之后**(即 stream 已写入 SDB 之后),心跳处理 deploy 时 `mndAcquireStream` 能查到流。

---

## 2. 本次异常情况(DEPLOY 先入队,流尚未落 SDB)

```mermaid
sequenceDiagram
participant Client
participant MND as Mnode(trans)
participant SDB as SDB
participant MSTM as Mnode(StreamMgmt)
participant Snode

Client->>MND: create stream 请求
MND->>MND: trans 创建 (prepare), propose(seq:39)
MND->>MND: redoAction: vnode-create-stb (x6)
Note over MND: 部分/全部 redoAction 完成
MND->>MSTM: "half completed" 时 post DEPLOY 入队
Note over MND,MSTM: 此时 stream 尚未写入 SDB
MND->>MND: redoAction 全部完成 → stage commit
MND->>MND: propose(seq:40),等待 apply

Snode->>MSTM: stream 心跳 (约 230ms 后)
MSTM->>MSTM: 处理 deploy action
MSTM->>SDB: mndAcquireStream(streamName)
SDB-->>MSTM: NULL (stream 不存在)
MSTM->>MSTM: "stream no longer exists, ignore deploy"
MSTM->>Snode: 心跳响应 (未部署)

Note over MND: 约 110ms 后
MND->>MND: process sync proposal, apply index:40
MND->>SDB: commitAction:0 (stb)
MND->>SDB: commitAction:1 (stream 写入)
Note over MND,SDB: 流此时才落库,但 deploy 已被忽略
MND->>Client: create-stream-rsp
```

**要点**:DEPLOY 在 **redoAction 阶段 / half completed** 就入队,而 stream 要等到 **commit 被 apply 后** 才写入 SDB。心跳先于 commit 处理了 deploy,此时 SDB 里还没有该流,于是 “no longer exists, ignore deploy”,导致该流本次不会再被部署。
2 changes: 1 addition & 1 deletion docs/zh/06-advanced/03-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ TDengine TSDB 的流计算引擎还提供了其他使用上的便利。针对结
## 流式计算的创建

```sql
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]

options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
Expand Down
5 changes: 3 additions & 2 deletions docs/zh/14-reference/03-taos-sql/41-stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ TDengine TSDB 的流计算引擎还提供了其他使用上的便利。针对结
## 创建流式计算

```sql
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]
CREATE STREAM [IF NOT EXISTS] [db_name.]stream_name options [INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])] [AS subquery]

options: {
trigger_type [FROM [db_name.]table_name] [PARTITION BY col1 [, ...]] [STREAM_OPTIONS(stream_option [|...])] [notification_definition]
Expand Down Expand Up @@ -299,7 +299,7 @@ COUNT_WINDOW(count_val[, sliding_val][, col1[, ...]])
流计算的计算结果默认会保存到输出表中,每个输出表中的计算结果是截至当前时刻已经触发和计算完成的输出。可以指定输出表的结构定义,如果存在分组还可以指定子表的标签值。

```sql
[INTO [db_name.]table_name] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]
[INTO [db_name.]table_name] [NODELAY_CREATE_SUBTABLE] [OUTPUT_SUBTABLE(tbname_expr)] [(column_name1, column_name2 [COMPOSITE KEY][, ...])] [TAGS (tag_definition [, ...])]

tag_definition:
tag_name type_name [COMMENT 'string_value'] AS expr
Expand All @@ -311,6 +311,7 @@ tag_definition:
- 存在触发分组时该表为超级表。
- 不存在触发分组时该表为普通表。
- 只触发通知不计算,或计算结果只通知不保存时,不需要指定。
- [NODELAY_CREATE_SUBTABLE]:可选,指定在建流的时候立即创建每个分组的计算输出子表,默认情况下计算输出子表在有一条计算数据写入时才创建。如果添加该选项,创建流之后,子表会异步的创建,如果未全部创建成功,则流的状态会是 `Idle` ;如果创建成功,则状态会变更为 `Running` 。输出表为普通表和超级表默认会在建流的时候自动建立,无需进行配置。
- [OUTPUT_SUBTABLE(tbname_expr)]:可选,指定每个触发分组的计算输出表(子表)名,没有触发分组时不可以指定。未指定时自动为每个分组生成唯一的输出表(子表)名。`tbname_expr` 为任意输出字符串的表达式,可根据需要选择触发表分组列(来自 `[PARTITION BY col1[, ...]]`),输出长度不能超过表名最大长度,超过时截断处理。如果不希望不同分组输出到同一子表中,用户需确保每个分组输出表名都是唯一的。
- [(column_name1, column_name2 [COMPOSITE KEY][, ...])]:可选,指定输出表的每列列名,未指定时每列列名与计算结果的每列列名相同。可以通过 `[COMPOSITE KEY]` 指定第二列为主键列,与第一列共同组成复合主键。
- [TAGS (tag_definition [, ...])]:可选,指定输出超级表的标签列定义与值的列表,只有存在触发分组时才可以指定。未指定时,标签列的定义和值来自于所有分组列,此时分组列中不可以存在相同的列名。当按子表分组时,默认产生的标签列名为 `tag_tbname`,类型为 `VARCHAR(270)`。具体的 `tag_definition` 参数说明如下:
Expand Down
1 change: 1 addition & 0 deletions docs/zh/14-reference/03-taos-sql/92-keywords.md
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ description: TDengine TSDB 保留关键字的详细列表
| NE | |
| NEXT | |
| NMATCH | |
| NODELAY_CREATE_SUBTABLE | 3.5.0.0+ |
| NONE | |
| NORMAL | |
| NOT | |
Expand Down
4 changes: 3 additions & 1 deletion include/common/streamMsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ typedef struct {
SArray* forceOutCols; // array of SStreamOutCol, only available when forceOutput is true
SArray* colCids; // array of SStreamCidCol, only available when colCids is not empty
SArray* tagCids; // array of SStreamCidTag, only available when tagCids is not empty
int8_t nodelayCreateSubtable; // 1 = create sub-tables at stream create time; 0 = default
} SCMCreateStreamReq;

typedef enum SStreamMsgType {
Expand Down Expand Up @@ -605,7 +606,8 @@ typedef struct {
SArray* runnerList; // SArray<SStreamRunnerTarget>

int32_t leaderSnodeId;
char* streamName;
char* streamName;
int8_t nodelayCreateSubtable; // 1 = create sub-tables at stream create time; 0 = create on the fly during trigger
} SStreamTriggerDeployMsg;

typedef struct SStreamRunnerDeployMsg {
Expand Down
1 change: 1 addition & 0 deletions include/libs/nodes/cmdnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,7 @@ typedef struct SCreateStreamStmt {
SNode* pSubtable;
SNodeList* pTags; // SStreamTagDefNode
SNodeList* pCols; // SColumnDefNode
int8_t nodelayCreateSubtable; // 1 = create sub-tables at stream create; 0 = default
} SCreateStreamStmt;

typedef struct SDropStreamStmt {
Expand Down
1 change: 1 addition & 0 deletions include/libs/nodes/querynodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,7 @@ typedef struct SStreamOutTableNode {
SNode* pSubtable;
SNodeList* pTags; // SStreamTagDefNode
SNodeList* pCols; // SColumnDefNode
int8_t nodelayCreateSubtable; // 1 = create sub-tables at stream create time; 0 = default, do not
} SStreamOutTableNode;

typedef struct SStreamCalcRangeNode {
Expand Down
4 changes: 4 additions & 0 deletions source/common/src/msg/streamJson.c
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,7 @@ static const char* jkCreateStreamReqForceOutCols = "forceOutCols";

static const char* jkCreateStreamReqColCids = "colCids";
static const char* jkCreateStreamReqTagCids = "tagCids";
static const char* jkCreateStreamReqNodelayCreateSubtable = "nodelayCreateSubtable";

static int32_t scmCreateStreamReqToJsonImpl(const void* pObj, void* pJson) {
const SCMCreateStreamReq* pReq = (const SCMCreateStreamReq*)pObj;
Expand Down Expand Up @@ -706,6 +707,8 @@ static int32_t scmCreateStreamReqToJsonImpl(const void* pObj, void* pJson) {
pReq->tagCids ? TARRAY_GET_ELEM(pReq->tagCids, 0) : NULL,
pReq->tagCids ? pReq->tagCids->elemSize : 0,
pReq->tagCids ? pReq->tagCids->size : 0));
TAOS_CHECK_RETURN(
tjsonAddIntegerToObject(pJson, jkCreateStreamReqNodelayCreateSubtable, pReq->nodelayCreateSubtable));

return TSDB_CODE_SUCCESS;
}
Expand Down Expand Up @@ -926,6 +929,7 @@ int32_t jsonToSCMCreateStreamReq(const void* pJson, void* pObj) {
jsonToSStreamOutCol, &pReq->forceOutCols, sizeof(SStreamOutCol)));
TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqColCids, jsonToInt16, &pReq->colCids, sizeof(int16_t)));
TAOS_CHECK_RETURN(tjsonToTArray(pJson, jkCreateStreamReqTagCids, jsonToInt16, &pReq->tagCids, sizeof(int16_t)));
(void)tjsonGetTinyIntValue(pJson, jkCreateStreamReqNodelayCreateSubtable, &pReq->nodelayCreateSubtable);

return TSDB_CODE_SUCCESS;
}
4 changes: 4 additions & 0 deletions source/common/src/msg/streamMsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,7 @@ int32_t tEncodeSStreamTriggerDeployMsg(SEncoder* pEncoder, const SStreamTriggerD
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->leaderSnodeId));
TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, pMsg->streamName, (int32_t)strlen(pMsg->streamName) + 1));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->precision));
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pMsg->nodelayCreateSubtable));

_exit:

Expand Down Expand Up @@ -1686,6 +1687,9 @@ int32_t tDecodeSStreamTriggerDeployMsg(SDecoder* pDecoder, SStreamTriggerDeployM
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->precision));
}
if (!tDecodeIsEnd(pDecoder)) {
TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pMsg->nodelayCreateSubtable));
}

_exit:

Expand Down
1 change: 1 addition & 0 deletions source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_DB_INFO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MOUNT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MOUNT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
Expand Down
2 changes: 2 additions & 0 deletions source/dnode/mnode/impl/src/mndStb.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_TABLE_RSP, mndTransProcessRsp);

// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);

// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
Expand Down
Loading
Loading