Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions source/libs/executor/src/externalwindowoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ typedef struct SExternalWindowOperator {
SResultRow* pResultRow;

int64_t lastSKey;
int64_t lastEKey;
int32_t lastWinId;
SSDataBlock* pEmptyInputBlock;
bool hasCountFunc;
Expand Down Expand Up @@ -866,6 +867,7 @@ static int32_t resetExternalWindowOperator(SOperatorInfo* pOperator) {

pExtW->outWinIdx = 0;
pExtW->lastSKey = INT64_MIN;
pExtW->lastEKey = INT64_MIN;
pExtW->isDynWindow = false;

qDebug("%s ext window stat at reset, created:%" PRId64 ", destroyed:%" PRId64 ", recycled:%" PRId64 ", reused:%" PRId64 ", append:%" PRId64,
Expand Down Expand Up @@ -1784,7 +1786,8 @@ static int32_t extWinAggHandleEmptyWins(SOperatorInfo* pOperator, SSDataBlock* p
SExprSupp* pSup = &pOperator->exprSupp;
int32_t currIdx = extWinGetCurWinIdx(pOperator->pTaskInfo);

if (NULL == pExtW->pEmptyInputBlock || (pWin && pWin->tw.skey == pExtW->lastSKey)) {
if (NULL == pExtW->pEmptyInputBlock ||
(pWin && pWin->tw.skey == pExtW->lastSKey && pWin->tw.ekey == pExtW->lastEKey)) {
goto _exit;
}

Expand Down Expand Up @@ -1860,14 +1863,16 @@ static int32_t extWinAggOpen(SOperatorInfo* pOperator, SSDataBlock* pInputBlock)
scalarCalc = true;
}

if (pWin->tw.skey != pExtW->lastSKey || pWin->tw.skey == INT64_MIN) {
TAOS_CHECK_EXIT(extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
if (pWin->tw.skey != pExtW->lastSKey || pWin->tw.ekey != pExtW->lastEKey || pWin->tw.skey == INT64_MIN) {
TAOS_CHECK_EXIT(
extWinAggSetWinOutputBuf(pOperator, pWin, &pOperator->exprSupp, &pExtW->aggSup, pOperator->pTaskInfo));
}

updateTimeWindowInfo(&pExtW->twAggSup.timeWindowData, &pWin->tw, 1);
TAOS_CHECK_EXIT(extWinAggDo(pOperator, startPos, winRows, pInputBlock));

pExtW->lastSKey = pWin->tw.skey;
pExtW->lastEKey = pWin->tw.ekey;
pExtW->lastWinId = extWinGetCurWinIdx(pOperator->pTaskInfo);
startPos += winRows;
}
Expand Down Expand Up @@ -2143,6 +2148,7 @@ static int32_t extWinOpen(SOperatorInfo* pOperator) {
pExtW->blkWinStartIdx = 0;
pExtW->outWinIdx = 0;
pExtW->lastSKey = INT64_MIN;
pExtW->lastEKey = INT64_MIN;
pExtW->isDynWindow = true;
pExtW->orgTableTimeRange.skey = INT64_MAX;
pExtW->orgTableTimeRange.ekey = INT64_MIN;
Expand Down Expand Up @@ -2422,6 +2428,7 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo
QUERY_CHECK_CODE(code, lino, _error);

pExtW->lastSKey = INT64_MIN;
pExtW->lastEKey = INT64_MIN;
} else {
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;

Expand Down
77 changes: 57 additions & 20 deletions source/libs/scalar/src/scalar.c
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,21 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
int32_t code = 0;
int64_t timeValue = 0;
SNode* pNode = NULL;
int64_t* pTsValList = NULL;
int32_t tsValRows = 0;
int32_t winNum = 0;

if (!ctx->stream.pStreamRuntimeFuncInfo || !ctx->stream.pStreamRuntimeFuncInfo->pStreamPesudoFuncVals) {
sclError("invalid stream runtime func info for ext window range calc");
return TSDB_CODE_QRY_INVALID_INPUT;
}

winNum = taosArrayGetSize(ctx->stream.pStreamRuntimeFuncInfo->pStreamPesudoFuncVals);
if (winNum <= 0) {
sclError("invalid ext window count:%d", winNum);
return TSDB_CODE_QRY_INVALID_INPUT;
}

if (sclIsPrimTimeStampCol(node->pRight)) {
pNode = node->pLeft;
} else if (sclIsPrimTimeStampCol(node->pLeft)) {
Expand All @@ -1278,29 +1293,49 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}

SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL || res->columnData == NULL) {
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (res->columnData->pData == NULL) {
sclError("invalid column data for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (res->columnData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
sclError("invalid type for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
if (nodeType(pNode) == QUERY_NODE_VALUE) {
SValueNode* pVal = (SValueNode*)pNode;
if (!IS_INTEGER_TYPE(pVal->node.resType.type) && !IS_TIMESTAMP_TYPE(pVal->node.resType.type)) {
sclError("invalid value type for ts range expr, type:%d, node:%p", pVal->node.resType.type, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
timeValue = pVal->datum.i;
pTsValList = &timeValue;
tsValRows = 1;
} else {
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL || res->columnData == NULL) {
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (res->columnData->pData == NULL) {
sclError("invalid column data for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (res->columnData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
sclError("invalid type for ts range expr, type:%d, node:%p", res->columnData->info.type, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}

pTsValList = (int64_t*)res->columnData->pData;
tsValRows = res->numOfRows;
if (tsValRows != 1 && tsValRows != winNum) {
sclError("invalid ts range rows:%d, ext window count:%d, node:%p", tsValRows, winNum, pNode);
return TSDB_CODE_QRY_INVALID_INPUT;
}
}

if (1 == ctx->stream.extWinType) {
if (node->opType == OP_TYPE_GREATER_THAN) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i] + 1);
for (int32_t i = 0; i < winNum; ++i) {
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(tsVal + 1, ctx->stream.pWins[i].tw.skey) : (tsVal + 1);
ctx->stream.pWins[i].winOutIdx = -1;
}
} else if (node->opType == OP_TYPE_GREATER_EQUAL) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.skey) : (((int64_t*)res->columnData->pData)[i]);
for (int32_t i = 0; i < winNum; ++i) {
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
ctx->stream.pWins[i].tw.skey = (-1 == ctx->stream.pWins[i].winOutIdx) ? TMAX(tsVal, ctx->stream.pWins[i].tw.skey) : tsVal;
ctx->stream.pWins[i].winOutIdx = -1;
}
} else {
Expand All @@ -1313,13 +1348,15 @@ static int32_t sclCalcStreamExtWinsTimeRange(SScalarCtx *ctx, SOperator
//if (ctx->stream.pStreamRuntimeFuncInfo->triggerType != STREAM_TRIGGER_SLIDING) {
// consider triggerType and keep the ekey exclude
if (node->opType == OP_TYPE_LOWER_THAN) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i], ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i]);
for (int32_t i = 0; i < winNum; ++i) {
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(tsVal, ctx->stream.pWins[i].tw.ekey) : tsVal;
ctx->stream.pWins[i].winOutIdx = -2;
}
} else if (node->opType == OP_TYPE_LOWER_EQUAL) {
for (int32_t i = 0; i < res->numOfRows; ++i) {
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(((int64_t*)res->columnData->pData)[i] + 1, ctx->stream.pWins[i].tw.ekey) : (((int64_t*)res->columnData->pData)[i] + 1);
for (int32_t i = 0; i < winNum; ++i) {
int64_t tsVal = pTsValList[(tsValRows == 1) ? 0 : i];
ctx->stream.pWins[i].tw.ekey = (-2 == ctx->stream.pWins[i].winOutIdx) ? TMIN(tsVal + 1, ctx->stream.pWins[i].tw.ekey) : (tsVal + 1);
ctx->stream.pWins[i].winOutIdx = -2;
}
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import time
from new_test_framework.utils import tdLog, tdSql, tdStream


class TestStreamIntervalConstFilterTwstart:

def setup_class(cls):
tdLog.debug(f"start to execute {__file__}")

def test_stream_interval_const_filter_twstart(self):
"""Trigger mode sliding: const lower-bound with _twstart

Reproduce bug where stream runtime fails when the query filter mixes:
- constant timestamp lower-bound
- window placeholder upper-bound (_twstart)

Catalog:
- Streams:03-TriggerMode

Since: v3.4.0.9

Labels: common,ci

Feishu: https://project.feishu.cn/taosdata_td/defect/detail/6766024000

History:
- 2026-03-03 Jinqing Kuang Created

"""
tdStream.dropAllStreamsAndDbs()
tdStream.createSnode()

tdSql.prepare(dbname="sdb_const_twstart", vgroups=1)
tdSql.execute("use sdb_const_twstart")
tdSql.execute("create stable stb (ts timestamp, c1 int) tags(gid int);")

tdSql.execute(
"create stream s_const_twstart interval(1s) sliding(1s) from stb into s_const_twstart_res "
"as select _twstart, _twend, _twduration, count(*) from stb "
"where ts >= '2026-01-01 00:00:00.000' and ts < _twstart;"
)
tdStream.checkStreamStatus("s_const_twstart")

tdSql.execute("create table ctb_1 using stb tags (1);")
tdSql.execute(
"insert into ctb_1 values "
"('2026-01-01 00:00:00.000', 0),"
"('2026-01-01 00:00:01.000', 1),"
"('2026-01-01 00:00:02.000', 2),"
"('2026-01-01 00:00:03.000', 3),"
"('2026-01-01 00:00:04.000', 4),"
"('2026-01-01 00:00:05.000', 5);"
)

tdSql.checkResultsByFunc(
sql="select * from s_const_twstart_res;",
func=lambda: tdSql.getRows() == 5
and tdSql.compareData(0, 0, "2026-01-01 00:00:00.000")
and tdSql.compareData(0, 1, "2026-01-01 00:00:01.000")
and tdSql.compareData(0, 2, 1000)
and tdSql.compareData(0, 3, 0)
and tdSql.compareData(1, 0, "2026-01-01 00:00:01.000")
and tdSql.compareData(1, 1, "2026-01-01 00:00:02.000")
and tdSql.compareData(1, 2, 1000)
and tdSql.compareData(1, 3, 1)
and tdSql.compareData(2, 0, "2026-01-01 00:00:02.000")
and tdSql.compareData(2, 1, "2026-01-01 00:00:03.000")
and tdSql.compareData(2, 2, 1000)
and tdSql.compareData(2, 3, 2)
and tdSql.compareData(3, 0, "2026-01-01 00:00:03.000")
and tdSql.compareData(3, 1, "2026-01-01 00:00:04.000")
and tdSql.compareData(3, 2, 1000)
and tdSql.compareData(3, 3, 3)
and tdSql.compareData(4, 0, "2026-01-01 00:00:04.000")
and tdSql.compareData(4, 1, "2026-01-01 00:00:05.000")
and tdSql.compareData(4, 2, 1000)
and tdSql.compareData(4, 3, 4)
)

tdSql.checkResultsByFunc(
sql=(
"select status from information_schema.ins_stream_tasks "
"where stream_name='s_const_twstart' and type='Runner';"
),
func=lambda: tdSql.getRows() > 0
and all(tdSql.getData(i, 0) != "Failed" for i in range(tdSql.getRows())),
)

1 change: 1 addition & 0 deletions test/ci/cases.task
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,7 @@
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_event_new.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_event.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_fill_history.py
,,y,.,./ci/pytest.sh pytest cases/18-StreamProcessing/03-TriggerMode/test_interval_const_filter_twstart.py
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_period_1.py
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_sliding.py
,,n,.,pytest cases/18-StreamProcessing/03-TriggerMode/test_state_new.py
Expand Down
Loading