Code Review (Gemini Code Assist)
This pull request introduces lag and lead functions. No security vulnerabilities were found. However, a significant performance issue was identified in the logic that groups functions for processing, due to nested loops leading to quadratic time complexity. A more efficient approach using a hash map has been suggested to improve performance and readability.
```c
if (processByRowFunctionCtx && taosArrayGetSize(processByRowFunctionCtx) > 0) {
  SqlFunctionCtx** pfCtx = taosArrayGet(processByRowFunctionCtx, 0);
  TSDB_CHECK_NULL(pfCtx, code, lino, _exit, terrno);
  int32_t processByRowSize = taosArrayGetSize(processByRowFunctionCtx);
  pProcessedFuncIds = taosArrayInit(4, sizeof(int32_t));
  TSDB_CHECK_NULL(pProcessedFuncIds, code, lino, _exit, terrno);

  for (int32_t i = 0; i < processByRowSize; ++i) {
    SqlFunctionCtx** ppCurrCtx = taosArrayGet(processByRowFunctionCtx, i);
    TSDB_CHECK_NULL(ppCurrCtx, code, lino, _exit, terrno);
    TSDB_CHECK_NULL(*ppCurrCtx, code, lino, _exit, terrno);

    bool    processed = false;
    int32_t processedNum = taosArrayGetSize(pProcessedFuncIds);
    for (int32_t j = 0; j < processedNum; ++j) {
      int32_t* pFuncId = taosArrayGet(pProcessedFuncIds, j);
      TSDB_CHECK_NULL(pFuncId, code, lino, _exit, terrno);
      if (*pFuncId == (*ppCurrCtx)->functionId) {
        processed = true;
        break;
      }
    }

    if (processed) {
      continue;
    }

    pGroupedCtxArray = taosArrayInit(2, sizeof(SqlFunctionCtx*));
    TSDB_CHECK_NULL(pGroupedCtxArray, code, lino, _exit, terrno);

    for (int32_t j = i; j < processByRowSize; ++j) {
      SqlFunctionCtx** ppTmpCtx = taosArrayGet(processByRowFunctionCtx, j);
      TSDB_CHECK_NULL(ppTmpCtx, code, lino, _exit, terrno);
      TSDB_CHECK_NULL(*ppTmpCtx, code, lino, _exit, terrno);

      if ((*ppTmpCtx)->functionId == (*ppCurrCtx)->functionId) {
        void* px = taosArrayPush(pGroupedCtxArray, ppTmpCtx);
        TSDB_CHECK_NULL(px, code, lino, _exit, terrno);
      }
    }

    TAOS_CHECK_EXIT((*ppCurrCtx)->fpSet.processFuncByRow(pGroupedCtxArray));
    taosArrayDestroy(pGroupedCtxArray);
    pGroupedCtxArray = NULL;

    void* px = taosArrayPush(pProcessedFuncIds, &(*ppCurrCtx)->functionId);
    TSDB_CHECK_NULL(px, code, lino, _exit, terrno);

    numOfRows = (*ppCurrCtx)->resultInfo->numOfRes;
  }

  taosArrayDestroy(pProcessedFuncIds);
  pProcessedFuncIds = NULL;
}
```
The current implementation for grouping functions by functionId has a time complexity that can approach O(N^2), where N is the number of process-by-row functions. This is due to nested loops, including a linear scan to check whether a function has already been processed, and can degrade performance when a query uses many such functions.
A more efficient approach would be to use a hash map to group the function contexts. You could iterate through processByRowFunctionCtx once to populate a hash map where keys are functionIds and values are arrays of SqlFunctionCtx*. Then, iterate through the hash map to process each group. This would reduce the complexity to approximately O(N).
For example (pseudo-code):

```c
// 1. Create a hash map: HMap<int32_t, SArray*>
// 2. Iterate through processByRowFunctionCtx
//    - Get or create SArray for functionId
//    - Add SqlFunctionCtx* to SArray
// 3. Iterate through hash map
//    - For each SArray (group), call processFuncByRow
// 4. Clean up hash map and SArrays
```

This refactoring would improve both performance and code readability.
Pull request overview
This PR introduces initial support for lag() / lead() selection functions, including parser/executor plumbing to allow multiple process-by-row functions to coexist, plus CI coverage via a new regression test.
Changes:
- Add `lag`/`lead` builtins with translation/type validation and process-by-row execution hooks.
- Update the executor project operator to group and run process-by-row functions by function id (enabling `lag` + `lead` in the same SELECT).
- Add a dedicated CI test case validating basic semantics, partition behavior, and default-value type rules.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `test/ci/cases.task` | Registers the new lag/lead selection test in CI. |
| `test/cases/11-Functions/03-Selection/test_fun_select_lag_lead.py` | Adds regression coverage for `lag()` / `lead()` behavior and parameter validation. |
| `source/libs/parser/src/parTranslater.c` | Relaxes the "different by-row func" restriction via a coexistence check to allow `lag` + `lead`. |
| `source/libs/function/src/functionMgt.c` | Adds the `canCoexistIndefiniteRowsFunc()` helper used by the parser. |
| `source/libs/function/src/builtinsimpl.c` | Implements lag/lead process-by-row evaluation and default handling. |
| `source/libs/function/src/builtins.c` | Adds builtin definitions for lag/lead and validates default parameter type compatibility. |
| `source/libs/function/inc/builtinsimpl.h` | Exposes lag/lead exec hooks for the function runtime. |
| `source/libs/executor/src/projectoperator.c` | Runs process-by-row functions in grouped batches by function id. |
| `source/libs/executor/src/executil.c` | Ensures process-by-row funcs get their exec `fpSet` populated. |
| `include/libs/function/functionMgt.h` | Introduces `FUNCTION_TYPE_LEAD` and declares the coexistence helper. |
```c
SColumnInfoData* pInputCol = pCtx->input.pData[0];
int32_t dataLen = IS_VAR_DATA_TYPE(pInputCol->info.type) ? varDataTLen(pRow->pData) : pInputCol->info.bytes;
```
copyLagLeadRowValue uses varDataTLen(pRow->pData) for all IS_VAR_DATA_TYPE inputs. IS_VAR_DATA_TYPE includes BLOB/MEDIUMBLOB (see include/common/ttypes.h), whose on-wire representation uses a 4-byte length header (via blobDataTLen), not the 2-byte VarDataLenT assumed by varDataTLen. This will copy the wrong length (and can read past the buffer) for lag/lead over BLOB columns. Please branch on IS_STR_DATA_BLOB(type) (or explicit BLOB types) and use the correct blob total-length helper when allocating/copying.
Suggested change:

```c
int32_t dataLen = pInputCol->info.bytes;
if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
  if (IS_STR_DATA_BLOB(pInputCol->info.type)) {
    dataLen = blobDataTLen(pRow->pData);
  } else {
    dataLen = varDataTLen(pRow->pData);
  }
}
```
```c
int32_t numOfElems = 0;
SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), colNum);
if (NULL == pRows) {
  return terrno;
}

SArray* pValueArrays = taosArrayInit(colNum, sizeof(SArray*));
if (NULL == pValueArrays) {
  code = terrno;
  goto _exit;
}

for (int32_t i = 0; i < colNum; ++i) {
  SArray* pValues = taosArrayInit(32, sizeof(SLagLeadRowValue));
  if (NULL == pValues) {
    code = terrno;
    goto _exit;
  }

  if (NULL == taosArrayPush(pValueArrays, &pValues)) {
    cleanupLagLeadRowValueArray(pValues);
```
lagLeadFunctionByRowImpl materializes all input rows into per-function arrays and deep-copies every cell value before producing any output. This makes lag/lead O(N) additional memory per selected lag/lead expression (and can become very large for long queries / var-length columns). Consider a streaming approach (e.g., ring buffer for lag; bounded lookahead buffer for lead) so memory usage scales with offset rather than total rows.
Pull request overview
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
source/libs/executor/src/projectoperator.c:668
- Setting noSplitOutput for any lag/lead query disables the existing threshold-based result splitting, which can force the operator to buffer arbitrarily large result sets in memory for large scans. Consider keeping splitting enabled and making lag/lead state management work across output chunks (or introduce an upper bound / fallback) to avoid potential OOM and latency spikes.
```c
if (noSplitOutput || pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
  while (1) {
    // The downstream exec may change the value of the newgroup, so use a local variable instead.
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
```
```python
vs = f"s{v}"
values.append(f"('{ts}', {v}, {vb}, '{vs}')")

tdSql.execute("insert into ct_big values " + "".join(values))
```
The batch INSERT for ct_big concatenates value tuples without commas (`"".join(values)`), producing invalid SQL like `(...)(...)`. Use a comma-separated join (e.g., `", ".join(values)`) so the INSERT statement is syntactically correct.
Suggested change:

```python
tdSql.execute("insert into ct_big values " + ", ".join(values))
```
```c
bool canCoexistIndefiniteRowsFunc(int32_t funcId1, int32_t funcId2) {
  if (funcId1 == funcId2) {
    return true;
  }
  if ((funcMgtBuiltins[funcId1].type == FUNCTION_TYPE_LAG || funcMgtBuiltins[funcId1].type == FUNCTION_TYPE_LEAD) &&
      (funcMgtBuiltins[funcId2].type == FUNCTION_TYPE_LAG || funcMgtBuiltins[funcId2].type == FUNCTION_TYPE_LEAD)) {
    return true;
  }
  return false;
}
```
canCoexistIndefiniteRowsFunc indexes funcMgtBuiltins[funcIdX] without validating that funcId1/funcId2 are within [0, funcMgtBuiltinsNum). Unlike other helpers in this file, this can read out of bounds if an invalid/udf id ever reaches this path. Add bounds checks (and return false) before indexing.
Description
Issue(s)
Checklist
Please check the items in the checklist if applicable.