Skip to content

Commit f800ce8

Browse files
committed
fix(stream): handle const boundary in external window range
1 parent 75f9b32 commit f800ce8

File tree

4 files changed

+150
-22
lines changed

4 files changed

+150
-22
lines changed

source/libs/executor/src/externalwindowoperator.c

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1784,7 +1784,7 @@ static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* p
17841784
SExprSupp* pSup = &pOperator->exprSupp;
17851785
int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);
17861786

1787-
if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
1787+
if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->winOutIdx >= 0)) {
17881788
goto _exit;
17891789
}
17901790

@@ -1860,7 +1860,9 @@ static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock)
18601860
scalarCalc = true;
18611861
}
18621862

1863-
if (pWin->tw.skey != pExtW->lastSKey || pWin->tw.skey == INT64_MIN) {
1863+
// Use window output index to determine whether this window has been initialized.
1864+
// Different windows can share the same skey when lower bound is a constant.
1865+
if (pWin->winOutIdx < 0) {
18641866
TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
18651867
}
18661868

source/libs/scalar/src/scalar.c

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,6 +1268,21 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
12681268
int32_t code = 0;
12691269
int64_t timeValue = 0;
12701270
SNode* pNode = NULL;
1271+
int64_t* pTsValList = NULL;
1272+
int32_t tsValRows = 0;
1273+
int32_t winNum = 0;
1274+
1275+
if (!ctx->stream.pStreamRuntimeFuncInfo || !ctx->stream.pStreamRuntimeFuncInfo->pStreamPesudoFuncVals) {
1276+
sclError("invalid stream runtime func info for ext window range calc");
1277+
return TSDB_CODE_QRY_INVALID_INPUT;
1278+
}
1279+
1280+
winNum = taosArrayGetSize(ctx->stream.pStreamRuntimeFuncInfo->pStreamPesudoFuncVals);
1281+
if (winNum <= 0) {
1282+
sclError("invalid ext window count:%d", winNum);
1283+
return TSDB_CODE_QRY_INVALID_INPUT;
1284+
}
1285+
12711286
if (sclIsPrimTimeStampCol(node->pRight)) {
12721287
pNode = node->pLeft;
12731288
} else if (sclIsPrimTimeStampCol(node->pLeft)) {
@@ -1278,29 +1293,49 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
12781293
return TSDB_CODE_STREAM_INTERNAL_ERROR;
12791294
}
12801295

1281-
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&pNode, POINTER_BYTES);
1282-
if (res == NULL || res->columnData == NULL) {
1283-
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
1284-
return TSDB_CODE_QRY_INVALID_INPUT;
1285-
}
1286-
if (res->columnData->pData == NULL) {
1287-
sclError("invalid column data for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
1288-
return TSDB_CODE_QRY_INVALID_INPUT;
1289-
}
1290-
if (res->columnData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
1291-
sclError("invalid type for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
1292-
return TSDB_CODE_QRY_INVALID_INPUT;
1296+
if (nodeType(pNode) == QUERY_NODE_VALUE) {
1297+
SValueNode* pVal = (SValueNode*)pNode;
1298+
if (!IS_INTEGER_TYPE(pVal->node.resType.type) && !IS_TIMESTAMP_TYPE(pVal->node.resType.type)) {
1299+
sclError("invalid value type for ts range expr, type:%d, node:%p", pVal->node.resType.type, pNode);
1300+
return TSDB_CODE_QRY_INVALID_INPUT;
1301+
}
1302+
timeValue = pVal->datum.i;
1303+
pTsValList = &timeValue;
1304+
tsValRows = 1;
1305+
} else {
1306+
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&pNode, POINTER_BYTES);
1307+
if (res == NULL || res->columnData == NULL) {
1308+
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
1309+
return TSDB_CODE_QRY_INVALID_INPUT;
1310+
}
1311+
if (res->columnData->pData == NULL) {
1312+
sclError("invalid column data for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
1313+
return TSDB_CODE_QRY_INVALID_INPUT;
1314+
}
1315+
if (res->columnData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
1316+
sclError("invalid type for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
1317+
return TSDB_CODE_QRY_INVALID_INPUT;
1318+
}
1319+
1320+
pTsValList = (int64_t*)res->columnData->pData;
1321+
tsValRows = res->numOfRows;
1322+
if (tsValRows != 1 && tsValRows != winNum) {
1323+
sclError("invalid ts range rows:%d, ext window count:%d, node:%p", tsValRows, winNum, pNode);
1324+
return TSDB_CODE_QRY_INVALID_INPUT;
1325+
}
12931326
}
12941327

12951328
if (1 == ctx->stream.extWinType) {
12961329
if (node->opType == OP_TYPE_GREATER_THAN) {
1297-
for (int32_t i = 0; i < res->numOfRows; ++i) {
1298-
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i] + 1);
1330+
for (int32_t i = 0; i < winNum; ++i) {
1331+
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
1332+
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(tsVal + 1, ctx->stream.pWins[i].tw.skey) : (tsVal + 1);
12991333
ctx->stream.pWins[i].winOutIdx = -1;
13001334
}
13011335
} else if (node->opType == OP_TYPE_GREATER_EQUAL) {
1302-
for (int32_t i = 0; i < res->numOfRows; ++i) {
1303-
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i]);
1336+
for (int32_t i = 0; i < winNum; ++i) {
1337+
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
1338+
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(tsVal, ctx->stream.pWins[i].tw.skey) : tsVal;
13041339
ctx->stream.pWins[i].winOutIdx = -1;
13051340
}
13061341
} else {
@@ -1313,13 +1348,15 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
13131348
//if (ctx->stream.pStreamRuntimeFuncInfo->triggerType != STREAM_TRIGGER_SLIDING) {
13141349
// consider triggerType and keep the ekey exclude
13151350
if (node->opType == OP_TYPE_LOWER_THAN) {
1316-
for (int32_t i = 0; i < res->numOfRows; ++i) {
1317-
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i]);
1351+
for (int32_t i = 0; i < winNum; ++i) {
1352+
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
1353+
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(tsVal, ctx->stream.pWins[i].tw.ekey) : tsVal;
13181354
ctx->stream.pWins[i].winOutIdx = -2;
13191355
}
13201356
} else if (node->opType == OP_TYPE_LOWER_EQUAL) {
1321-
for (int32_t i = 0; i < res->numOfRows; ++i) {
1322-
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i] + 1);
1357+
for (int32_t i = 0; i < winNum; ++i) {
1358+
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
1359+
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(tsVal + 1, ctx->stream.pWins[i].tw.ekey) : (tsVal + 1);
13231360
ctx->stream.pWins[i].winOutIdx = -2;
13241361
}
13251362
} else {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import time
2+
from new_test_framework.utils import tdLog, tdSql, tdStream
3+
4+
5+
class TestStreamIntervalConstFilterTwstart:
6+
7+
def setup_class(cls):
8+
tdLog.debug(f"start to execute {__file__}")
9+
10+
def test_stream_interval_const_filter_twstart(self):
11+
"""Trigger mode sliding: const lower-bound with _twstart
12+
13+
Reproduce bug where stream runtime fails when the query filter mixes:
14+
- constant timestamp lower-bound
15+
- window placeholder upper-bound (_twstart)
16+
17+
Catalog:
18+
- Streams:03-TriggerMode
19+
20+
Since: v3.4.0.9
21+
22+
Labels: common,ci
23+
24+
Feishu: https://project.feishu.cn/taosdata_td/defect/detail/6766024000
25+
26+
History:
27+
- 2026-03-03 Jinqing Kuang Created
28+
29+
"""
30+
tdStream.dropAllStreamsAndDbs()
31+
tdStream.createSnode()
32+
33+
tdSql.prepare(dbname="sdb_const_twstart", vgroups=1)
34+
tdSql.execute("use sdb_const_twstart")
35+
tdSql.execute("create stable stb (ts timestamp, c1 int) tags(gid int);")
36+
37+
tdSql.execute(
38+
"create stream s_const_twstart interval(1s) sliding(1s) from stb into s_const_twstart_res "
39+
"as select _twstart, _twend, _twduration, count(*) from stb "
40+
"where ts >= '2026-01-01 00:00:00.000' and ts < _twstart;"
41+
)
42+
tdStream.checkStreamStatus("s_const_twstart")
43+
44+
tdSql.execute("create table ctb_1 using stb tags (1);")
45+
tdSql.execute(
46+
"insert into ctb_1 values "
47+
"('2026-01-01 00:00:00.000', 0),"
48+
"('2026-01-01 00:00:01.000', 1),"
49+
"('2026-01-01 00:00:02.000', 2),"
50+
"('2026-01-01 00:00:03.000', 3),"
51+
"('2026-01-01 00:00:04.000', 4),"
52+
"('2026-01-01 00:00:05.000', 5);"
53+
)
54+
55+
tdSql.checkResultsByFunc(
56+
sql="select * from s_const_twstart_res;",
57+
func=lambda: tdSql.getRows() == 5
58+
and tdSql.compareData(0, 0, "2026-01-01 00:00:00.000")
59+
and tdSql.compareData(0, 1, "2026-01-01 00:00:01.000")
60+
and tdSql.compareData(0, 2, 1000)
61+
and tdSql.compareData(0, 3, 0)
62+
and tdSql.compareData(1, 0, "2026-01-01 00:00:01.000")
63+
and tdSql.compareData(1, 1, "2026-01-01 00:00:02.000")
64+
and tdSql.compareData(1, 2, 1000)
65+
and tdSql.compareData(1, 3, 1)
66+
and tdSql.compareData(2, 0, "2026-01-01 00:00:02.000")
67+
and tdSql.compareData(2, 1, "2026-01-01 00:00:03.000")
68+
and tdSql.compareData(2, 2, 1000)
69+
and tdSql.compareData(2, 3, 2)
70+
and tdSql.compareData(3, 0, "2026-01-01 00:00:03.000")
71+
and tdSql.compareData(3, 1, "2026-01-01 00:00:04.000")
72+
and tdSql.compareData(3, 2, 1000)
73+
and tdSql.compareData(3, 3, 3)
74+
and tdSql.compareData(4, 0, "2026-01-01 00:00:04.000")
75+
and tdSql.compareData(4, 1, "2026-01-01 00:00:05.000")
76+
and tdSql.compareData(4, 2, 1000)
77+
and tdSql.compareData(4, 3, 4)
78+
)
79+
80+
tdSql.checkResultsByFunc(
81+
sql=(
82+
"select status from information_schema.ins_stream_tasks "
83+
"where stream_name='s_const_twstart' and type='Runner';"
84+
),
85+
func=lambda: tdSql.getRows() > 0
86+
and all(tdSql.getData(i, 0) != "Failed" for i in range(tdSql.getRows())),
87+
)
88+

test/ci/cases.task

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,7 @@
697697
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_event_new.py
698698
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_event.py
699699
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_fill_history.py
700+
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_interval_const_filter_twstart.py
700701
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_period_1.py
701702
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_sliding.py
702703
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_state_new.py

0 commit comments

Comments
 (0)