-
Notifications
You must be signed in to change notification settings - Fork 5k
Feat/6550634959/external window #34605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 3.0
Are you sure you want to change the base?
Changes from 10 commits
1cb043a
b79be7d
c7c45c7
bc1385f
753f498
c92f09e
23e245a
941e6a0
8b3f4a9
bb2db18
8791ebf
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -2242,6 +2242,7 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo | |||||||||||||||||||||||||||||||||||||||||||||||||
| QRY_PARAM_CHECK(pOptrOut); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int32_t code = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int32_t lino = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| bool isInStream = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| SExternalWindowOperator* pExtW = taosMemoryCalloc(1, sizeof(SExternalWindowOperator)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pOperator->pPhyNode = pNode; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -2271,6 +2272,65 @@ int32_t createExternalWindowOperator(SOperatorInfo* pDownstream, SPhysiNode* pNo | |||||||||||||||||||||||||||||||||||||||||||||||||
| // pExtW->limitInfo = (SLimitInfo){0}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // initLimitInfo(pPhynode->window.node.pLimit, pPhynode->window.node.pSlimit, &pExtW->limitInfo); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pPhynode->pSubquery) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // todo xs | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Initialize minimal stream runtime info to reuse external-window logic. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Note: full subquery execution to populate multiple windows can be added later. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| isInStream = false; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pTaskInfo->pStreamRuntimeInfo == NULL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pTaskInfo->pStreamRuntimeInfo = (SStreamRuntimeInfo*)taosMemoryCalloc(1, sizeof(SStreamRuntimeInfo)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pTaskInfo->pStreamRuntimeInfo == NULL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| code = terrno; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| goto _error; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| SStreamRuntimeFuncInfo* pRt = &pTaskInfo->pStreamRuntimeInfo->funcInfo; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->withExternalWindow = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->isWindowTrigger = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->triggerType = STREAM_TRIGGER_SESSION; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->precision = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->curIdx = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->curOutIdx = 0; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pRt->pStreamPesudoFuncVals == NULL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| pRt->pStreamPesudoFuncVals = taosArrayInit(4, sizeof(SSTriggerCalcParam)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (pRt->pStreamPesudoFuncVals == NULL) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| code = terrno; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| goto _error; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| taosArrayClear(pRt->pStreamPesudoFuncVals); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| SSTriggerCalcParam one = {0}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Use provided timeRange as a minimal single-window placeholder | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wstart = 1589335200000; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wend = 1589338140000; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wduration = one.wend - one.wstart; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+2507
to
+2511
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| SValue val = {0}; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.pExternalWindowData = taosArrayInit(1, sizeof(SValue)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| QUERY_CHECK_NULL(one.pExternalWindowData, code, lino, _error, terrno); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.type = TSDB_DATA_TYPE_BIGINT; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.val = 10; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(one.pExternalWindowData, &val); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.val = 11; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(one.pExternalWindowData, &val); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(pRt->pStreamPesudoFuncVals, &one); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wstart = 1589338140001; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wend = 1589340110000; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.wduration = one.wend - one.wstart; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| one.pExternalWindowData = taosArrayInit(1, sizeof(SValue)); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| QUERY_CHECK_NULL(one.pExternalWindowData, code, lino, _error, terrno); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.type = TSDB_DATA_TYPE_BIGINT; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.val = 20; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(one.pExternalWindowData, &val); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| val.val = 21; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(one.pExternalWindowData, &val); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(pRt->pStreamPesudoFuncVals, &one); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| (void)taosArrayPush(one.pExternalWindowData, &val); | |
| (void)taosArrayPush(pRt->pStreamPesudoFuncVals, &one); | |
| one.wstart = 1589338140001; | |
| one.wend = 1589340110000; | |
| one.wduration = one.wend - one.wstart; | |
| one.pExternalWindowData = taosArrayInit(1, sizeof(SValue)); | |
| QUERY_CHECK_NULL(one.pExternalWindowData, code, lino, _error, terrno); | |
| val.type = TSDB_DATA_TYPE_BIGINT; | |
| val.val = 20; | |
| (void)taosArrayPush(one.pExternalWindowData, &val); | |
| val.val = 21; | |
| (void)taosArrayPush(one.pExternalWindowData, &val); | |
| (void)taosArrayPush(pRt->pStreamPesudoFuncVals, &one); | |
| /* External window execution with subquery input (non-stream execution) | |
| * previously fabricated window time ranges and column values here, | |
| * which produced incorrect and non-deterministic results. | |
| * | |
| * Proper subquery execution/materialization is not implemented yet, | |
| * so we currently treat this scenario as unsupported instead of | |
| * returning bogus data. | |
| */ | |
| code = TSDB_CODE_FAILED; | |
| goto _error; |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -1203,6 +1203,10 @@ static int32_t translatePlaceHolderPseudoColumn(SFunctionNode* pFunc, char* pErr | |||||||||||||||||||||||||||
| return TSDB_CODE_SUCCESS; | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| static int32_t translateExternalWindowColumnFunc(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { | ||||||||||||||||||||||||||||
| return TSDB_CODE_SUCCESS; | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| static int32_t translateTimePseudoColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { | ||||||||||||||||||||||||||||
| // pseudo column do not need to check parameters | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -7243,6 +7247,20 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { | |||||||||||||||||||||||||||
| .estimateReturnRowsFunc = fillforwardEstReturnRows, | ||||||||||||||||||||||||||||
| .processFuncByRow = fillforwardFunctionByRow, | ||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||
| .name = "_external_window_column", | ||||||||||||||||||||||||||||
| .type = FUNCTION_TYPE_EXTERNAL_WINDOW_COLUMN, | ||||||||||||||||||||||||||||
| .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_PLACE_HOLDER_FUNC | FUNC_MGT_SKIP_SCAN_CHECK_FUNC, | ||||||||||||||||||||||||||||
| .parameters = {.minParamNum = 0, | ||||||||||||||||||||||||||||
| .maxParamNum = 0, | ||||||||||||||||||||||||||||
| .paramInfoPattern = 0, | ||||||||||||||||||||||||||||
|
Comment on lines
+7254
to
+7256
|
||||||||||||||||||||||||||||
| .parameters = {.minParamNum = 0, | |
| .maxParamNum = 0, | |
| .paramInfoPattern = 0, | |
| .parameters = {.minParamNum = 1, | |
| .maxParamNum = 1, | |
| .paramInfoPattern = 1, | |
| .inputParaInfo[0][0] = {.isLastParam = true, | |
| .startParam = 1, | |
| .endParam = 1, | |
| .validDataType = FUNC_PARAM_SUPPORT_NUMERIC_TYPE, | |
| .validNodeType = FUNC_PARAM_SUPPORT_EXPR_NODE, | |
| .paramAttribute = FUNC_PARAM_NO_SPECIFIC_ATTRIBUTE, | |
| .valueRangeFlag = FUNC_PARAM_NO_SPECIFIC_VALUE,}, |
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -33,6 +33,14 @@ static SFuncMgtService gFunMgtService; | |||||||||||
| static TdThreadOnce functionHashTableInit = PTHREAD_ONCE_INIT; | ||||||||||||
| static int32_t initFunctionCode = 0; | ||||||||||||
|
|
||||||||||||
| static void InitSpecialFuncId() { | ||||||||||||
| // just to make sure the funcId of these functions are initialized before used. | ||||||||||||
| int32_t tmp = fmGetTwstartFuncId(); | ||||||||||||
| tmp = fmGetTwendFuncId(); | ||||||||||||
| tmp = fmGetTwdurationFuncId(); | ||||||||||||
| tmp = fmGetExternalWindowColumnFuncId(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| static void doInitFunctionTable() { | ||||||||||||
| gFunMgtService.pFuncNameHashTable = | ||||||||||||
| taosHashInit(funcMgtBuiltinsNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); | ||||||||||||
|
|
@@ -48,6 +56,7 @@ static void doInitFunctionTable() { | |||||||||||
| return; | ||||||||||||
| } | ||||||||||||
| } | ||||||||||||
| InitSpecialFuncId(); | ||||||||||||
| } | ||||||||||||
|
|
||||||||||||
| static bool isSpecificClassifyFunc(int32_t funcId, uint64_t classification) { | ||||||||||||
|
|
@@ -416,6 +425,11 @@ bool fmIsPrimaryKeyFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, | |||||||||||
|
|
||||||||||||
| bool fmIsPlaceHolderFunc(int32_t funcId) {return isSpecificClassifyFunc(funcId, FUNC_MGT_PLACE_HOLDER_FUNC); } | ||||||||||||
|
|
||||||||||||
| bool fmIsPlaceHolderFuncForExternalWin(int32_t funcId) { | ||||||||||||
|
||||||||||||
| bool fmIsPlaceHolderFuncForExternalWin(int32_t funcId) { | |
| bool fmIsPlaceHolderFuncForExternalWin(int32_t funcId) { | |
| if (funcId < 0 || funcId >= funcMgtBuiltinsNum) { | |
| return false; | |
| } |
Copilot
AI
Feb 27, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fmSetStreamPseudoFuncParamVal() the FUNCTION_TYPE_EXTERNAL_WINDOW_COLUMN branch sets the value node using ((SValue*)pVal)->val only. This will be wrong for variable-length types stored in SValue.pData/nData (and also ignores the SValue.type). Either restrict _external_window_column to fixed-size numeric types or set the value node using the correct representation for the external column's real type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SExternalWindowNodenow containspSubquery,pFill, andaliasName. To avoid leaks and incorrect behavior when cloning/serializing/destroying AST nodes, the corresponding clone, JSON/msg serialization, andnodesDestroyNode()cases forQUERY_NODE_EXTERNAL_WINDOWneed to be updated to handle these new fields as well.