diff --git a/docs/zh/14-reference/03-taos-sql/51-perf.md b/docs/zh/14-reference/03-taos-sql/51-perf.md index 9a0dbb0c6f4f..e66e7a4ecbdc 100644 --- a/docs/zh/14-reference/03-taos-sql/51-perf.md +++ b/docs/zh/14-reference/03-taos-sql/51-perf.md @@ -61,6 +61,10 @@ TDengine TSDB 3.0 版本开始提供一个内置数据库 `performance_schema` | 11 | sub_num | INT | 子查询数量 | | 12 | sub_status | BINARY(1000) | 子查询状态 | | 13 | sql | BINARY(1024) | SQL 语句 | +| 14 | user_app | BINARY(24) | 应用名称(由客户端设置) | +| 15 | user_ip | BINARY(16) | 应用所使用的 IP 地址 (由客户端设置) | +| 16 | phase_state | BINARY(16) | 查询当前阶段 / 状态 | +| 17 | phase_start_time | TIMESTAMP | 当前阶段的开始时间 | ## PERF_CONSUMERS diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d45b8078e57a..b46206d8e1c6 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -364,7 +364,7 @@ typedef enum ENodeType { QUERY_NODE_SURROUND, QUERY_NODE_REMOTE_ROW, QUERY_NODE_REMOTE_ZERO_ROWS, - + // Statement nodes are used in parser and planner module. QUERY_NODE_SET_OPERATOR = 100, QUERY_NODE_SELECT_STMT, @@ -444,7 +444,6 @@ typedef enum ENodeType { QUERY_NODE_DROP_TOTP_SECRET_STMT, QUERY_NODE_ALTER_KEY_EXPIRATION_STMT, - // placeholder for [155, 180] QUERY_NODE_SHOW_CREATE_VIEW_STMT = 181, QUERY_NODE_SHOW_CREATE_DATABASE_STMT, @@ -5047,8 +5046,22 @@ typedef struct { typedef struct { int64_t tid; char status[TSDB_JOB_STATUS_LEN]; + int64_t startTs; // sub-task first execution start time, us } SQuerySubDesc; +typedef enum EQueryExecPhase { + QUERY_PHASE_NONE = 0, + QUERY_PHASE_PARSE = 1, + QUERY_PHASE_CATALOG = 2, + QUERY_PHASE_PLAN = 3, + QUERY_PHASE_SCHEDULE = 4, + QUERY_PHASE_EXECUTE = 5, + QUERY_PHASE_FETCH = 6, + QUERY_PHASE_DONE = 7, +} EQueryExecPhase; + +const char* queryPhaseStr(int32_t phase); + typedef struct { char sql[TSDB_SHOW_SQL_LEN]; uint64_t queryId; @@ -5060,6 +5073,8 @@ typedef struct { char fqdn[TSDB_FQDN_LEN]; int32_t subPlanNum; SArray* subDesc; // SArray + int32_t execPhase; // EQueryExecPhase + int64_t phaseStartTime; // when current phase started, ms } SQueryDesc; typedef struct { @@ -5221,9 +5236,14 @@ static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) { static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) { TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pKv->key)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pKv->valueLen)); + + if (pKv->valueLen < 0) { + return TSDB_CODE_INVALID_MSG; + } + pKv->value = taosMemoryMalloc(pKv->valueLen + 1); if (pKv->value == NULL) { - TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY); + return TSDB_CODE_OUT_OF_MEMORY; } TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, (char*)pKv->value)); return 0; @@ -5352,7 +5372,7 @@ typedef struct { typedef struct { char** name; int32_t count; - int8_t igNotExists; + int8_t igNotExists; } SMDropStreamReq; typedef struct { @@ -5403,7 +5423,6 @@ int32_t tSerializeSMRecalcStreamReq(void* buf, int32_t bufLen, const SMRecalcStr int32_t tDeserializeSMRecalcStreamReq(void* buf, int32_t bufLen, SMRecalcStreamReq* pReq); void tFreeMRecalcStreamReq(SMRecalcStreamReq* pReq); - typedef struct SVndSetKeepVersionReq { int64_t keepVersion; } SVndSetKeepVersionReq; @@ -5425,16 +5444,16 @@ typedef struct SVUpdateCheckpointInfoReq { } SVUpdateCheckpointInfoReq; typedef struct { - int64_t leftForVer; - int32_t vgId; - int64_t oldConsumerId; - int64_t newConsumerId; - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - int8_t subType; - int8_t withMeta; - char* qmsg; // SubPlanToString + int64_t leftForVer; + int32_t vgId; + int64_t oldConsumerId; + int64_t newConsumerId; + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int8_t subType; + int8_t withMeta; + char* qmsg; // SubPlanToString SSchemaWrapper schema; - int64_t suid; + int64_t suid; } SMqRebVgReq; int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq); @@ -5904,9 +5923,9 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { } typedef struct { - char topic[TSDB_TOPIC_FNAME_LEN]; - char db[TSDB_DB_FNAME_LEN]; - SArray* vgs; // SArray + char topic[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; + SArray* vgs; // SArray } SMqSubTopicEp; int32_t tEncodeMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp); @@ -5983,8 +6002,8 @@ int32_t tSemiDecodeMqBatchMetaRsp(SDecoder* pDecoder, SMqBatchMetaRsp* pRsp); void tDeleteMqBatchMetaRsp(SMqBatchMetaRsp* pRsp); typedef struct { - int32_t code; - SArray* topics; // SArray + int32_t code; + SArray* topics; // SArray } SMqAskEpRsp; static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) { @@ -6411,7 +6430,6 @@ void setFieldWithOptions(SFieldWithOptions* fieldWithOptions, SField* field); int32_t tSerializeSVSubTablesRspImpl(SEncoder* pEncoder, SVSubTablesRsp* pRsp); int32_t tDeserializeSVSubTablesRspImpl(SDecoder* pDecoder, SVSubTablesRsp* pRsp); - typedef struct { char id[TSDB_INSTANCE_ID_LEN]; char type[TSDB_INSTANCE_TYPE_LEN]; @@ -6677,4 +6695,3 @@ int32_t tDeserializeSScanVnodeReq(void* buf, int32_t bufLen, SScanVnodeReq* pReq #endif #endif /*_TD_COMMON_TAOS_MSG_H_*/ - diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 0997f0ff55e5..71c1095e279b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -326,6 +326,8 @@ typedef struct SRequestObj { SMetaData parseMeta; char* effectiveUser; int8_t source; + int32_t execPhase; // EQueryExecPhase + int64_t phaseStartTime; // when current phase started, ms } SRequestObj; typedef struct SSyncQueryParam { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index e44e5ae45e21..6f131e967069 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -584,6 +584,8 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj (*pRequest)->resType = RES_TYPE__QUERY; (*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid; (*pRequest)->metric.start = taosGetTimestampUs(); + (*pRequest)->execPhase = QUERY_PHASE_NONE; + (*pRequest)->phaseStartTime = 0; (*pRequest)->body.resInfo.convertUcs4 = true; // convert ucs4 by default (*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt; diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 1074c13e03f6..cf19ac3f90e7 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -799,6 +799,8 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) { return TSDB_CODE_FAILED; } desc.subPlanNum = pRequest->body.subplanNum; + desc.execPhase = atomic_load_32(&pRequest->execPhase); + desc.phaseStartTime = atomic_load_64(&pRequest->phaseStartTime); if (desc.subPlanNum) { desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc)); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0bad8148b467..95e6180b68d8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1450,6 +1450,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat int64_t st = taosGetTimestampUs(); if (!pRequest->parseOnly) { + atomic_store_32((int32_t*)&pRequest->execPhase, QUERY_PHASE_PLAN); + atomic_store_64((int64_t*)&pRequest->phaseStartTime, taosGetTimestampMs()); + pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); if (NULL == pMnodeList) { code = terrno; @@ -1517,6 +1520,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat }; if (TSDB_CODE_SUCCESS == code) { + pRequest->execPhase = QUERY_PHASE_EXECUTE; + pRequest->phaseStartTime = taosGetTimestampMs(); code = schedulerExecJob(&req, &pRequest->body.queryJob); } taosArrayDestroy(pNodeList); @@ -3393,6 +3398,9 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param void doRequestCallback(SRequestObj* pRequest, int32_t code) { pRequest->inCallback = true; + pRequest->execPhase = QUERY_PHASE_DONE; + pRequest->phaseStartTime = taosGetTimestampMs(); + int64_t this = pRequest->self; if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index ee8035aed0a5..a35beb91f699 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -1868,20 +1868,25 @@ static int32_t getAllMetaAsync(SSqlCallbackWrapper *pWrapper, catalogCallback fp static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code); static int32_t phaseAsyncQuery(SSqlCallbackWrapper *pWrapper) { - int32_t code = TSDB_CODE_SUCCESS; - switch (pWrapper->pRequest->pQuery->execStage) { + int32_t code = TSDB_CODE_SUCCESS; + SRequestObj *pRequest = pWrapper->pRequest; + switch (pRequest->pQuery->execStage) { case QUERY_EXEC_STAGE_PARSE: { - // continue parse after get metadata + atomic_store_32(&pRequest->execPhase, QUERY_PHASE_CATALOG); + atomic_store_64(&pRequest->phaseStartTime, taosGetTimestampMs()); code = getAllMetaAsync(pWrapper, doAsyncQueryFromParse); break; } case QUERY_EXEC_STAGE_ANALYSE: { - // analysis after get metadata + atomic_store_32(&pRequest->execPhase, QUERY_PHASE_CATALOG); + atomic_store_64(&pRequest->phaseStartTime, taosGetTimestampMs()); code = getAllMetaAsync(pWrapper, doAsyncQueryFromAnalyse); break; } case QUERY_EXEC_STAGE_SCHEDULE: { - launchAsyncQuery(pWrapper->pRequest, pWrapper->pRequest->pQuery, NULL, pWrapper); + atomic_store_32(&pRequest->execPhase, QUERY_PHASE_SCHEDULE); + atomic_store_64(&pRequest->phaseStartTime, taosGetTimestampMs()); + launchAsyncQuery(pRequest, pRequest->pQuery, NULL, pWrapper); break; } default: @@ -2028,6 +2033,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) { SSqlCallbackWrapper *pWrapper = NULL; int32_t code = TSDB_CODE_SUCCESS; + atomic_store_32(&pRequest->execPhase, QUERY_PHASE_PARSE); + atomic_store_64(&pRequest->phaseStartTime, taosGetTimestampMs()); + if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) { code = pRequest->prevCode; terrno = code; @@ -2136,6 +2144,10 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { } SRequestObj *pRequest = res; + + atomic_store_32(&pRequest->execPhase, QUERY_PHASE_FETCH); + atomic_store_64(&pRequest->phaseStartTime, taosGetTimestampMs()); + if (TSDB_SQL_RETRIEVE_EMPTY_RESULT == pRequest->type) { fp(param, res, 0); return; diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index f6ee2a1fa050..48c32be1dd0a 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -44,11 +44,11 @@ #include "tversion.h" #include "streamMsg.h" +#include "tRealloc.h" #include "tanalytics.h" #include "tcol.h" #include "tlog.h" #include "tsha.h" -#include "tRealloc.h" #if defined(WINDOWS) #include @@ -254,6 +254,27 @@ void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) { return (void *)buf; } +const char *queryPhaseStr(int32_t phase) { + switch (phase) { + case QUERY_PHASE_PARSE: + return "parse"; + case QUERY_PHASE_CATALOG: + return "catalog"; + case QUERY_PHASE_PLAN: + return "plan"; + case QUERY_PHASE_SCHEDULE: + return "schedule"; + case QUERY_PHASE_EXECUTE: + return "execute"; + case QUERY_PHASE_FETCH: + return "fetch"; + case QUERY_PHASE_DONE: + return "done"; + default: + return "none"; + } +} + static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) { TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pReq->connKey)); @@ -292,14 +313,16 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->isSubQuery)); TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->fqdn)); TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->subPlanNum)); - int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, snum)); for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->tid)); TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sDesc->status)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->startTs)); } + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->execPhase)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->phaseStartTime)); } } else { TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum)); @@ -416,6 +439,10 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) code = (tDecodeCStrTo(pDecoder, sDesc.status)); TAOS_CHECK_GOTO(code, &line, _error); + + code = tDecodeI64(pDecoder, &sDesc.startTs); + TAOS_CHECK_GOTO(code, &line, _error); + if (!taosArrayPush(desc.subDesc, &sDesc)) { code = terrno; TAOS_CHECK_GOTO(code, &line, _error); @@ -428,6 +455,14 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) TAOS_CHECK_GOTO(code, &line, _error); } + if (!tDecodeIsEnd(pDecoder)) { + desc.execPhase = QUERY_PHASE_NONE; + desc.phaseStartTime = 0; + code = tDecodeI32(pDecoder, &desc.execPhase); + TAOS_CHECK_GOTO(code, &line, _error); + code = tDecodeI64(pDecoder, &desc.phaseStartTime); + TAOS_CHECK_GOTO(code, &line, _error); + } if (!taosArrayPush(pReq->query->queryDesc, &desc)) { code = terrno; TAOS_CHECK_GOTO(code, &line, _error); @@ -3213,11 +3248,11 @@ int32_t cvtIpWhiteListDualToV4(SIpWhiteListDual *pWhiteListDual, SIpWhiteList ** } void initUserDefautSessCfg(SUserSessCfg *pCfg) { - pCfg->sessPerUser = -1; - pCfg->sessConnTime = -1; - pCfg->sessConnIdleTime = -1; - pCfg->sessMaxConcurrency = -1; - pCfg->sessMaxCallVnodeNum = -1; + pCfg->sessPerUser = -1; + pCfg->sessConnTime = -1; + pCfg->sessConnIdleTime = -1; + pCfg->sessMaxConcurrency = -1; + pCfg->sessMaxCallVnodeNum = -1; } static int32_t tEncodeSessCfg(SEncoder *encoder, SUserSessCfg *pCfg) { int32_t code = 0; @@ -3242,7 +3277,7 @@ static int32_t tDecodeSessCfg(SDecoder *decoder, SUserSessCfg *pCfg) { _exit: return code; } -void copyIpRange(SIpRange* pDst, const SIpRange* pSrc) { +void copyIpRange(SIpRange *pDst, const SIpRange *pSrc) { memset(pDst, 0, sizeof(*pDst)); pDst->type = pSrc->type; pDst->neg = pSrc->neg; @@ -3253,9 +3288,7 @@ void copyIpRange(SIpRange* pDst, const SIpRange* pSrc) { } } - - -SDateTimeWhiteList* cloneDateTimeWhiteList(const SDateTimeWhiteList* src) { +SDateTimeWhiteList *cloneDateTimeWhiteList(const SDateTimeWhiteList *src) { if (src == NULL) return NULL; int32_t sz = sizeof(SDateTimeWhiteList) + src->num * sizeof(SDateTimeWhiteListItem); @@ -3267,8 +3300,6 @@ SDateTimeWhiteList* cloneDateTimeWhiteList(const SDateTimeWhiteList* src) { return pNew; } - - // isTimeInDateTimeWhiteList checks if the given time is included in the whitelist. // it returns true if: // @@ -3291,7 +3322,7 @@ bool isTimeInDateTimeWhiteList(const SDateTimeWhiteList *wl, int64_t tm) { bool hasWhite = false, inWhite = false; // convert tm to week seconds based on localtime // week starts from Sunday (tm_wday = 0) - time_t t = (time_t)tm; + time_t t = (time_t)tm; struct tm ltm; if (taosLocalTime(&t, <m, NULL, 0, NULL) == NULL) { return false; @@ -3327,8 +3358,7 @@ bool isTimeInDateTimeWhiteList(const SDateTimeWhiteList *wl, int64_t tm) { return (!hasWhite) || inWhite; } - -int32_t tEncodeSDateTimeRange(SEncoder* pEncoder, const SDateTimeRange* pRange) { +int32_t tEncodeSDateTimeRange(SEncoder *pEncoder, const SDateTimeRange *pRange) { TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRange->neg)); TAOS_CHECK_RETURN(tEncodeI16(pEncoder, pRange->year)); TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRange->month)); @@ -3339,7 +3369,7 @@ int32_t tEncodeSDateTimeRange(SEncoder* pEncoder, const SDateTimeRange* pRange) return 0; } -int32_t tDecodeSDateTimeRange(SDecoder* pDecoder, SDateTimeRange* pRange) { +int32_t tDecodeSDateTimeRange(SDecoder *pDecoder, SDateTimeRange *pRange) { TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pRange->neg)); TAOS_CHECK_RETURN(tDecodeI16(pDecoder, &pRange->year)); TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pRange->month)); @@ -3350,7 +3380,6 @@ int32_t tDecodeSDateTimeRange(SDecoder* pDecoder, SDateTimeRange* pRange) { return 0; } - int32_t tSerializeSCreateEncryptAlgrReq(void *buf, int32_t bufLen, SCreateEncryptAlgrReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; @@ -3414,8 +3443,8 @@ int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->pass)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->numIpRanges)); for (int32_t i = 0; i < pReq->numIpRanges; ++i) { - TAOS_CHECK_EXIT(tEncodeU32(&encoder, 0)); // for backward compatibility - TAOS_CHECK_EXIT(tEncodeU32(&encoder, 0)); // for backward compatibility + TAOS_CHECK_EXIT(tEncodeU32(&encoder, 0)); // for backward compatibility + TAOS_CHECK_EXIT(tEncodeU32(&encoder, 0)); // for backward compatibility } ENCODESQL(); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->isImport)); @@ -3495,8 +3524,8 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->numIpRanges)); for (int32_t i = 0; i < pReq->numIpRanges; ++i) { uint32_t dummy; - TAOS_CHECK_EXIT(tDecodeU32(&decoder, &dummy)); // for backward compatibility - TAOS_CHECK_EXIT(tDecodeU32(&decoder, &dummy)); // for backward compatibility + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &dummy)); // for backward compatibility + TAOS_CHECK_EXIT(tDecodeU32(&decoder, &dummy)); // for backward compatibility } DECODESQL(); if (!tDecodeIsEnd(&decoder)) { @@ -4194,7 +4223,6 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) } } - TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->objname)); int32_t len = strlen(pReq->tabName); TAOS_CHECK_EXIT(tEncodeI32(&encoder, len)); @@ -4202,7 +4230,7 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->tabName)); } TAOS_CHECK_EXIT(tEncodeBinary(&encoder, (const uint8_t *)pReq->tagCond, pReq->tagCondLen)); - TAOS_CHECK_EXIT(tEncodeI64(&encoder, 0)); // obsolete + TAOS_CHECK_EXIT(tEncodeI64(&encoder, 0)); // obsolete ENCODESQL(); tEndEncode(&encoder); @@ -4391,7 +4419,7 @@ void tFreeSAlterUserReq(SAlterUserReq *pReq) { FREESQL(); } -int32_t tSerializeSCreateTokenReq(void* buf, int32_t bufLen, SCreateTokenReq* pReq) { +int32_t tSerializeSCreateTokenReq(void *buf, int32_t bufLen, SCreateTokenReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -4420,7 +4448,7 @@ int32_t tSerializeSCreateTokenReq(void* buf, int32_t bufLen, SCreateTokenReq* pR return tlen; } -int32_t tDeserializeSCreateTokenReq(void* buf, int32_t bufLen, SCreateTokenReq* pReq) { +int32_t tDeserializeSCreateTokenReq(void *buf, int32_t bufLen, SCreateTokenReq *pReq) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -4442,9 +4470,7 @@ int32_t tDeserializeSCreateTokenReq(void* buf, int32_t bufLen, SCreateTokenReq* return code; } -void tFreeSCreateTokenReq(SCreateTokenReq* pReq) { - FREESQL(); -} +void tFreeSCreateTokenReq(SCreateTokenReq *pReq) { FREESQL(); } int32_t tSerializeSCreateTokenResp(void *buf, int32_t bufLen, SCreateTokenRsp *pRsp) { SEncoder encoder = {0}; @@ -4490,7 +4516,7 @@ int32_t tDeserializeSCreateTokenResp(void *buf, int32_t bufLen, SCreateTokenRsp void tFreeSCreateTokenResp(SCreateTokenRsp *pRsp) { return; } -int32_t tSerializeSAlterTokenReq(void* buf, int32_t bufLen, SAlterTokenReq* pReq) { +int32_t tSerializeSAlterTokenReq(void *buf, int32_t bufLen, SAlterTokenReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -4534,7 +4560,7 @@ int32_t tSerializeSAlterTokenReq(void* buf, int32_t bufLen, SAlterTokenReq* pReq return tlen; } -int32_t tDeserializeSAlterTokenReq(void* buf, int32_t bufLen, SAlterTokenReq* pReq) { +int32_t tDeserializeSAlterTokenReq(void *buf, int32_t bufLen, SAlterTokenReq *pReq) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -4572,11 +4598,9 @@ int32_t tDeserializeSAlterTokenReq(void* buf, int32_t bufLen, SAlterTokenReq* pR return code; } -void tFreeSAlterTokenReq(SAlterTokenReq* pReq) { - FREESQL(); -} +void tFreeSAlterTokenReq(SAlterTokenReq *pReq) { FREESQL(); } -int32_t tSerializeSDropTokenReq(void* buf, int32_t bufLen, SDropTokenReq* pReq) { +int32_t tSerializeSDropTokenReq(void *buf, int32_t bufLen, SDropTokenReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -4599,7 +4623,7 @@ int32_t tSerializeSDropTokenReq(void* buf, int32_t bufLen, SDropTokenReq* pReq) return tlen; } -int32_t tDeserializeSDropTokenReq(void* buf, int32_t bufLen, SDropTokenReq* pReq) { +int32_t tDeserializeSDropTokenReq(void *buf, int32_t bufLen, SDropTokenReq *pReq) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -4616,11 +4640,9 @@ int32_t tDeserializeSDropTokenReq(void* buf, int32_t bufLen, SDropTokenReq* pReq return code; } -void tFreeSDropTokenReq(SDropTokenReq* pReq) { - FREESQL(); -} +void tFreeSDropTokenReq(SDropTokenReq *pReq) { FREESQL(); } -int32_t tSerializeSCreateTotpSecretReq(void* buf, int32_t bufLen, SCreateTotpSecretReq* pReq) { +int32_t tSerializeSCreateTotpSecretReq(void *buf, int32_t bufLen, SCreateTotpSecretReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -4642,7 +4664,7 @@ int32_t tSerializeSCreateTotpSecretReq(void* buf, int32_t bufLen, SCreateTotpSec return tlen; } -int32_t tDeserializeSCreateTotpSecretReq(void* buf, int32_t bufLen, SCreateTotpSecretReq* pReq) { +int32_t tDeserializeSCreateTotpSecretReq(void *buf, int32_t bufLen, SCreateTotpSecretReq *pReq) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -4658,11 +4680,9 @@ int32_t tDeserializeSCreateTotpSecretReq(void* buf, int32_t bufLen, SCreateTotpS return code; } -void tFreeSCreateTotpSecretReq(SCreateTotpSecretReq* pReq) { - FREESQL(); -} +void tFreeSCreateTotpSecretReq(SCreateTotpSecretReq *pReq) { FREESQL(); } -int32_t tSerializeSCreateTotpSecretRsp(void* buf, int32_t bufLen, SCreateTotpSecretRsp* pRsp) { +int32_t tSerializeSCreateTotpSecretRsp(void *buf, int32_t bufLen, SCreateTotpSecretRsp *pRsp) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -4684,7 +4704,7 @@ int32_t tSerializeSCreateTotpSecretRsp(void* buf, int32_t bufLen, SCreateTotpSec return tlen; } -int32_t tDeserializeSCreateTotpSecretRsp(void* buf, int32_t bufLen, SCreateTotpSecretRsp* pRsp) { +int32_t tDeserializeSCreateTotpSecretRsp(void *buf, int32_t bufLen, SCreateTotpSecretRsp *pRsp) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -4816,7 +4836,7 @@ int32_t tSerializePrivTblPolicies(SEncoder *pEncoder, SHashObj *pHash) { continue; // 1.*.* or 1.db.* } SPrivTblPolicies *pTblPolicies = (SPrivTblPolicies *)pIter; - int32_t nTblPolicies = taosArrayGetSize(pTblPolicies->policy); + int32_t nTblPolicies = taosArrayGetSize(pTblPolicies->policy); TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, nTblPolicies)); for (int32_t j = 0; j < nTblPolicies; ++j) { SPrivTblPolicy *pPolicy = (SPrivTblPolicy *)TARRAY_GET_ELEM(pTblPolicies->policy, j); @@ -4832,7 +4852,7 @@ int32_t tSerializePrivTblPolicies(SEncoder *pEncoder, SHashObj *pHash) { } // encode with clause TAOS_CHECK_EXIT(tEncodeI32v(pEncoder, pPolicy->condLen)); - if (pPolicy->condLen > 0) { // the condLen contains the last '\0' + if (pPolicy->condLen > 0) { // the condLen contains the last '\0' TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pPolicy->cond, pPolicy->condLen - 1)); } } @@ -4870,9 +4890,9 @@ int32_t tDeserializePrivTblPolicies(SDecoder *pDecoder, SHashObj **pHash) { } for (int32_t k = 0; k < nCols; ++k) { SColNameFlag *col = TARRAY_GET_ELEM(policy.cols, k); - TAOS_CHECK_EXIT(tDecodeI16v(pDecoder, &col->colId)); - TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, col->colName)); - TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &col->flags)); + TAOS_CHECK_EXIT(tDecodeI16v(pDecoder, &col->colId)); + TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, col->colName)); + TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &col->flags)); } } // decode with clause @@ -4918,7 +4938,6 @@ int32_t tSerializeTokenStatuses(SEncoder *pEncoder, SHashObj *pHash) { return code; } - int32_t tDeserializeTokenStatuses(SDecoder *pDecoder, SHashObj **pHash) { int32_t code = 0, lino = 0; size_t klen = 0; @@ -4933,7 +4952,7 @@ int32_t tDeserializeTokenStatuses(SDecoder *pDecoder, SHashObj **pHash) { if (*pHash == NULL) { TAOS_CHECK_EXIT(terrno); } - + for (int32_t i = 0; i < nTokens; ++i) { char name[TSDB_TOKEN_NAME_LEN] = {0}; TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, name)); @@ -4969,7 +4988,7 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp) TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, nOwnedDbs)); void *pIter = NULL; while ((pIter = taosHashIterate(pRsp->ownedDbs, pIter))) { - char *key = taosHashGetKey(pIter, NULL); // key: dbFName + char *key = taosHashGetKey(pIter, NULL); // key: dbFName TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, key)); } @@ -5450,9 +5469,7 @@ void tIpRangeSetDefaultMask(SIpRange *range) { } void tFreeSGetUserIpWhiteListDualRsp(SGetUserIpWhiteListRsp *pRsp) { taosMemoryFree(pRsp->pWhiteListsDual); } - - -bool isValidDateTimeRange(SDateTimeRange* pRange) { +bool isValidDateTimeRange(SDateTimeRange *pRange) { if (pRange->hour < 0 || pRange->hour > 23) { return false; } @@ -5492,20 +5509,17 @@ bool isValidDateTimeRange(SDateTimeRange* pRange) { return true; } - - - -void DateTimeRangeToWhiteListItem(SDateTimeWhiteListItem* dst, const SDateTimeRange* src) { +void DateTimeRangeToWhiteListItem(SDateTimeWhiteListItem *dst, const SDateTimeRange *src) { if (src->month == -1) { dst->start = src->day * 86400 + src->hour * 3600 + src->minute * 60; dst->absolute = false; } else { - struct tm t = { 0 }; + struct tm t = {0}; t.tm_year = src->year - 1900; - t.tm_mon = src->month - 1; + t.tm_mon = src->month - 1; t.tm_mday = src->day; t.tm_hour = src->hour; - t.tm_min = src->minute; + t.tm_min = src->minute; dst->start = taosMktime(&t, NULL); dst->absolute = true; } @@ -5513,8 +5527,7 @@ void DateTimeRangeToWhiteListItem(SDateTimeWhiteListItem* dst, const SDateTimeRa dst->neg = (src->neg != 0); } - -bool isDateTimeWhiteListItemExpired(const SDateTimeWhiteListItem* item) { +bool isDateTimeWhiteListItemExpired(const SDateTimeWhiteListItem *item) { if (!item->absolute) { return false; } @@ -5523,8 +5536,7 @@ bool isDateTimeWhiteListItemExpired(const SDateTimeWhiteListItem* item) { return now > (item->start + item->duration); } - -int32_t tSerializeSUserDateTimeWhiteList(void* buf, int32_t bufLen, SUserDateTimeWhiteList* pRsp) { +int32_t tSerializeSUserDateTimeWhiteList(void *buf, int32_t bufLen, SUserDateTimeWhiteList *pRsp) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -5555,9 +5567,7 @@ int32_t tSerializeSUserDateTimeWhiteList(void* buf, int32_t bufLen, SUserDateTim return tlen; } - - -int32_t tDeserializeSUserDateTimeWhiteList(void* buf, int32_t bufLen, SUserDateTimeWhiteList* pRsp) { +int32_t tDeserializeSUserDateTimeWhiteList(void *buf, int32_t bufLen, SUserDateTimeWhiteList *pRsp) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -5587,11 +5597,7 @@ int32_t tDeserializeSUserDateTimeWhiteList(void* buf, int32_t bufLen, SUserDateT return code; } - - -void tFreeSUserDateTimeWhiteList(SUserDateTimeWhiteList* pRsp) { - taosMemoryFree(pRsp->pWhiteLists); -} +void tFreeSUserDateTimeWhiteList(SUserDateTimeWhiteList *pRsp) { taosMemoryFree(pRsp->pWhiteLists); } int32_t cloneSUserDateTimeWhiteList(const SUserDateTimeWhiteList *src, SUserDateTimeWhiteList *dest) { if (src == NULL || dest == NULL) { @@ -5614,8 +5620,7 @@ int32_t cloneSUserDateTimeWhiteList(const SUserDateTimeWhiteList *src, SUserDate return TSDB_CODE_SUCCESS; } - -int32_t tSerializeSRetrieveDateTimeWhiteListRsp(void* buf, int32_t bufLen, SRetrieveDateTimeWhiteListRsp* pRsp) { +int32_t tSerializeSRetrieveDateTimeWhiteListRsp(void *buf, int32_t bufLen, SRetrieveDateTimeWhiteListRsp *pRsp) { SEncoder encoder = {0}; int32_t code = 0; int32_t lino; @@ -5626,13 +5631,13 @@ int32_t tSerializeSRetrieveDateTimeWhiteListRsp(void* buf, int32_t bufLen, SRetr TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->ver)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->numOfUser)); for (int i = 0; i < pRsp->numOfUser; ++i) { - SUserDateTimeWhiteList* pUser = &pRsp->pUsers[i]; + SUserDateTimeWhiteList *pUser = &pRsp->pUsers[i]; TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pUser->user)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pUser->ver)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pUser->numWhiteLists)); for (int j = 0; j < pUser->numWhiteLists; ++j) { - SDateTimeWhiteListItem* range = &pUser->pWhiteLists[j]; + SDateTimeWhiteListItem *range = &pUser->pWhiteLists[j]; TAOS_CHECK_EXIT(tEncodeBool(&encoder, range->absolute)); TAOS_CHECK_EXIT(tEncodeBool(&encoder, range->neg)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, range->start)); @@ -5694,7 +5699,7 @@ int32_t cloneDataTimeWhiteListRsp(const SRetrieveDateTimeWhiteListRsp *src, SRet *dest = p; return code; } -int32_t tDeserializeSRetrieveDateTimeWhiteListRsp(void* buf, int32_t bufLen, SRetrieveDateTimeWhiteListRsp* pRsp) { +int32_t tDeserializeSRetrieveDateTimeWhiteListRsp(void *buf, int32_t bufLen, SRetrieveDateTimeWhiteListRsp *pRsp) { SDecoder decoder = {0}; int32_t code = 0; int32_t lino; @@ -5710,7 +5715,7 @@ int32_t tDeserializeSRetrieveDateTimeWhiteListRsp(void* buf, int32_t bufLen, SRe } for (int32_t i = 0; i < pRsp->numOfUser; ++i) { - SUserDateTimeWhiteList* pUser = pRsp->pUsers + i; + SUserDateTimeWhiteList *pUser = pRsp->pUsers + i; TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pUser->user)); TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pUser->ver)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pUser->numWhiteLists)); @@ -5735,8 +5740,6 @@ int32_t tDeserializeSRetrieveDateTimeWhiteListRsp(void* buf, int32_t bufLen, SRe return code; } - - void tFreeSRetrieveDateTimeWhiteListRsp(SRetrieveDateTimeWhiteListRsp *pRsp) { if (pRsp == NULL) { return; @@ -5750,7 +5753,6 @@ void tFreeSRetrieveDateTimeWhiteListRsp(SRetrieveDateTimeWhiteListRsp *pRsp) { taosMemoryFree(pRsp->pUsers); } - int32_t tSerializeSMCfgClusterReq(void *buf, int32_t bufLen, SMCfgClusterReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; @@ -7313,7 +7315,7 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { pReq->isAudit = 0; } - if(!tDecodeIsEnd(&decoder)) { + if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->allowDrop)); } else { pReq->allowDrop = TSDB_DEFAULT_DB_ALLOW_DROP; @@ -10462,28 +10464,16 @@ int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { return code; } -static void tCalculateConnectReqSignature(const SConnectReq* pReq, char* signature) { +static void tCalculateConnectReqSignature(const SConnectReq *pReq, char *signature) { char buf[2048]; - int n = snprintf(buf, sizeof(buf), "%d|%d|%d|%s|%s|%s|%s|%" PRId64 "|%" PRId64 "|%s|%s", - pReq->connType, - pReq->pid, - pReq->totpCode, - pReq->app, - pReq->db, - pReq->user, - pReq->token, - pReq->startTime, - pReq->connectTime, - pReq->sVer, - td_edition_signature_salt - ); - tSHA1(signature,buf, n); -} - -void tSignConnectReq(SConnectReq *pReq) { - tCalculateConnectReqSignature(pReq, pReq->signature); + int n = snprintf(buf, sizeof(buf), "%d|%d|%d|%s|%s|%s|%s|%" PRId64 "|%" PRId64 "|%s|%s", pReq->connType, pReq->pid, + pReq->totpCode, pReq->app, pReq->db, pReq->user, pReq->token, pReq->startTime, pReq->connectTime, + pReq->sVer, td_edition_signature_salt); + tSHA1(signature, buf, n); } +void tSignConnectReq(SConnectReq *pReq) { tCalculateConnectReqSignature(pReq, pReq->signature); } + int32_t tVerifyConnectReqSignature(const SConnectReq *pReq) { int64_t timeDiff = taosGetTimestampMs() - pReq->connectTime; if (timeDiff < 0) { @@ -10606,7 +10596,7 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { pRsp->tokenName[0] = 0; pRsp->userId = 0; } - + tEndDecode(&decoder); _exit: @@ -13006,7 +12996,7 @@ int32_t tDeserializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) { return code; } -int32_t tSerializeSDownstreamSourceNode(SEncoder* pEncoder, SDownstreamSourceNode* pSource) { +int32_t tSerializeSDownstreamSourceNode(SEncoder *pEncoder, SDownstreamSourceNode *pSource) { int32_t code = 0; int32_t lino; @@ -13058,7 +13048,7 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { int32_t subEndpointNum = taosArrayGetSize(pReq->subEndPoints); TAOS_CHECK_EXIT(tEncodeI32(&encoder, subEndpointNum)); for (int32_t i = 0; i < subEndpointNum; ++i) { - SDownstreamSourceNode* pSource = taosArrayGetP(pReq->subEndPoints, i); + SDownstreamSourceNode *pSource = taosArrayGetP(pReq->subEndPoints, i); TAOS_CHECK_EXIT(tSerializeSDownstreamSourceNode(&encoder, pSource)); } @@ -13082,9 +13072,9 @@ int32_t tSerializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) { } } -int32_t tDeserializeSDownstreamSourceNode(SDecoder* pDecoder, SDownstreamSourceNode* pSource) { - int32_t code = 0; - int32_t lino; +int32_t tDeserializeSDownstreamSourceNode(SDecoder *pDecoder, SDownstreamSourceNode *pSource) { + int32_t code = 0; + int32_t lino; TAOS_CHECK_EXIT(tDecodeSQueryNodeAddr(pDecoder, &pSource->addr)); TAOS_CHECK_EXIT(tDecodeU64(pDecoder, &pSource->clientId)); @@ -13142,8 +13132,8 @@ int32_t tDeserializeSSubQueryMsg(void *buf, int32_t bufLen, SSubQueryMsg *pReq) if (NULL == pReq->subEndPoints) { TAOS_CHECK_EXIT(terrno); } - for(int32_t i = 0; i < subEndpointNum; ++i) { - SDownstreamSourceNode** ppSource = taosArrayReserve(pReq->subEndPoints, 1); + for (int32_t i = 0; i < subEndpointNum; ++i) { + SDownstreamSourceNode **ppSource = taosArrayReserve(pReq->subEndPoints, 1); if (NULL == ppSource) { TAOS_CHECK_EXIT(terrno); } @@ -13189,8 +13179,7 @@ int32_t tSerializeSOperatorParam(SEncoder *pEncoder, SOperatorParam *pOpParam) { } case QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { - STableScanOperatorParam *pScan = - (STableScanOperatorParam *)pOpParam->value; + STableScanOperatorParam *pScan = (STableScanOperatorParam *)pOpParam->value; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pScan->paramType)); switch (pScan->paramType) { @@ -13295,7 +13284,7 @@ int32_t tSerializeSOperatorParam(SEncoder *pEncoder, SOperatorParam *pOpParam) { } int32_t tDeserializeSOperatorParam(SDecoder *pDecoder, SOperatorParam *pOpParam) { - TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t*)&pOpParam->opType)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, (int32_t *)&pOpParam->opType)); TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pOpParam->downstreamIdx)); TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pOpParam->reUse)); @@ -13517,7 +13506,8 @@ int32_t tSerializeSResFetchReq(void *buf, int32_t bufLen, SResFetchReq *pReq, bo TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); if (pReq->pStRtFuncInfo) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 1)); - TAOS_CHECK_EXIT(tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, /* pReq->reset && */ needStreamPesudoFuncVals)); + TAOS_CHECK_EXIT( + tSerializeStRtFuncInfo(&encoder, pReq->pStRtFuncInfo, /* pReq->reset && */ needStreamPesudoFuncVals)); } else { TAOS_CHECK_EXIT(tEncodeI32(&encoder, 0)); } @@ -15762,7 +15752,7 @@ int32_t tEncodeMqDataRspCommon(SEncoder *pEncoder, const SMqDataRsp *pRsp) { int32_t bLen = *(int32_t *)taosArrayGet(pRsp->blockDataLen, i); void *data = taosArrayGetP(pRsp->blockData, i); TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, (const uint8_t *)data, bLen)); - if (pRsp->withSchema){ + if (pRsp->withSchema) { SSchemaWrapper *pSW = (SSchemaWrapper *)taosArrayGetP(pRsp->blockSchema, i); TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, pSW)); } @@ -15812,7 +15802,6 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(terrno); } } - for (int32_t i = 0; i < pRsp->blockNum; i++) { void *data = NULL; @@ -15864,7 +15853,7 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t*)(&pRsp->timeout))); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)(&pRsp->timeout))); } return 0; @@ -15949,7 +15938,7 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t*)(&pRsp->timeout))); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)(&pRsp->timeout))); } _exit: return code; @@ -16560,9 +16549,7 @@ void *tDecodeMqSubTopicEp(void *buf, SMqSubTopicEp *pTopicEp) { return buf; } -void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { - taosArrayDestroy(pSubTopicEp->vgs); -} +void tDeleteMqSubTopicEp(SMqSubTopicEp *pSubTopicEp) { taosArrayDestroy(pSubTopicEp->vgs); } int32_t tSerializeSCMCreateViewReq(void *buf, int32_t bufLen, const SCMCreateViewReq *pReq) { SEncoder encoder = {0}; diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 2ee3dd1899b7..75c5a987aa1b 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -867,6 +867,8 @@ static const SSysDbTableSchema querySchema[] = { {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "user_app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "user_ip", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "phase_state", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "phase_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, }; static const SSysDbTableSchema appSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 4bd21c02e4fa..597f594e71e1 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -1271,20 +1271,37 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc } char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; - int64_t reserve = 64; + int64_t reserve = 128; int32_t strSize = sizeof(subStatus); int32_t offset = VARSTR_HEADER_SIZE; for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) { if (i) { offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset, ","); } - if (offset + reserve < strSize) { - SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); - offset += - tsnprintf(subStatus + offset, sizeof(subStatus) - offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); - } else { - break; + if (offset + reserve >= strSize) break; + + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); + if (NULL == pDesc) break; + + char startBuf[32] = {0}; + (void)snprintf(startBuf, sizeof(startBuf), "-"); + if (pDesc->startTs > 0) { + time_t startSec = (time_t)(pDesc->startTs / 1000000); + int32_t startFrac = (int32_t)(pDesc->startTs % 1000000) / 1000; + struct tm startTm; + if (taosLocalTime(&startSec, &startTm, NULL, 0, NULL) != NULL) { + size_t n = taosStrfTime(startBuf, sizeof(startBuf), "%Y-%m-%d %H:%M:%S", &startTm); + if (tsnprintf(startBuf + n, sizeof(startBuf) - n, ".%03d", startFrac) < 0) { + mError("failed to format start time for sub query since %s", tstrerror(terrno)); + code = terrno; + taosRUnLockLatch(&pConn->queryLock); + return code; + } + } } + + offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset, + "%" PRIu64 ":%s:%s", pDesc->tid, pDesc->status, startBuf); } varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -1326,6 +1343,25 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc return code; } + const char* phaseStr = queryPhaseStr(pQuery->execPhase); + char phaseVarStr[16 + VARSTR_HEADER_SIZE]; + STR_TO_VARSTR(phaseVarStr, phaseStr); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, curRowIndex, (const char *)phaseVarStr, false); + if (code != 0) { + mError("failed to set current phase since %s", tstrerror(code)); + taosRUnLockLatch(&pConn->queryLock); + return code; + } + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->phaseStartTime, false); + if (code != 0) { + mError("failed to set phase start time since %s", tstrerror(code)); + taosRUnLockLatch(&pConn->queryLock); + return code; + } + pBlock->info.rows++; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index deb6507125c0..cdf04b8f03b2 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -88,6 +88,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * } SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); + pTask->profile.startTs = taosGetTimestampUs(); SCH_TASK_DLOG("task initialized, max retry(exec):%d(%d), max retry duration:%.2fs", pTask->maxRetryTimes, pTask->maxExecTimes, (pTask->redirectCtx.redirectDelayMs * pTask->maxRetryTimes) / 1000.0); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 5494e36a04e4..2e5954cdfb1f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -118,6 +118,7 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) { SQuerySubDesc subDesc = {0}; subDesc.tid = pTask->taskId; TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status)); + subDesc.startTs = pTask->profile.startTs; if (NULL == taosArrayPush(pSub, &subDesc)) { qError("taosArrayPush task %d failed, error:0x%x", m, terrno); diff --git a/test/cases/21-MetaData/test_meta_information_schema.py b/test/cases/21-MetaData/test_meta_information_schema.py index b28abd1308f5..f762c0d2b17f 100644 --- a/test/cases/21-MetaData/test_meta_information_schema.py +++ b/test/cases/21-MetaData/test_meta_information_schema.py @@ -1089,7 +1089,7 @@ def ins_columns_check(self): tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") - tdSql.checkRows(72) + tdSql.checkRows(74) def ins_dnodes_check(self): tdSql.execute('drop database if exists db2') diff --git a/test/cases/24-Users/test_query_phase_tracking.py b/test/cases/24-Users/test_query_phase_tracking.py new file mode 100644 index 000000000000..8fb6bf7745a0 --- /dev/null +++ b/test/cases/24-Users/test_query_phase_tracking.py @@ -0,0 +1,260 @@ +import time +from new_test_framework.utils import tdLog, tdSql + + +class TestQueryPhaseTracking: + """Test cases for query execution phase tracking feature. + + This feature adds phase_state and phase_start_time columns to show queries output + to help track query execution phases for performance analysis. + + Phases: none, parse, catalog, plan, schedule, execute, fetch, done + """ + + def setup_class(cls): + tdLog.debug(f"start to execute {__file__}") + tdSql.execute("drop database if exists db") + tdSql.execute("drop database if exists db2") + + def test_show_queries_schema(self): + """Schema: Verify new columns in show queries + + 1. Verify that phase_state column exists in show queries output + 2. Verify that phase_start_time column exists in show queries output + 3. Verify the column types are correct + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-6 Created for query phase tracking feature + + """ + tdLog.info("=============== test show queries schema") + tdSql.execute(f"create database if not exists db") + tdSql.execute(f"use db") + tdSql.execute(f"create table db.stb (ts timestamp, i int) tags (t int)") + tdSql.execute(f"create table db.ctb using db.stb tags (1)") + tdSql.execute(f"insert into db.ctb values (now, 1)") + + tdSql.query(f"select * from db.stb") + + col_names = tdSql.getColNameList("show queries") + tdLog.info(f"show queries columns: {col_names}") + + assert "phase_state" in col_names, "phase_state column should exist" + assert "phase_start_time" in col_names, "phase_start_time column should exist" + + print("test show queries schema ....................... [passed]") + + def test_query_phase_values(self): + """Phase: Verify query phase values + + 1. Execute a query and verify phase_state is a valid phase string + 2. Verify phase_start_time is a valid timestamp + 3. Test that phase values are one of: none, parse, catalog, plan, schedule, execute, fetch, done + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-6 Created for query phase tracking feature + + """ + tdLog.info("=============== test query phase values") + tdSql.execute(f"use db") + + tdSql.query(f"select count(*) from db.stb") + tdSql.checkData(0, 0, 1) + + tdSql.query(f"show queries") + + valid_phases = ["none", "parse", "catalog", "plan", "schedule", "execute", "fetch", "done"] + + if tdSql.getRows() > 0: + col_names = [desc[0] for desc in tdSql.cursor.description] + phase_idx = col_names.index("phase_state") if "phase_state" in col_names else -1 + time_idx = col_names.index("phase_start_time") if "phase_start_time" in col_names else -1 + + if phase_idx >= 0: + phase_value = tdSql.getData(0, phase_idx) + tdLog.info(f"Current phase: {phase_value}") + assert phase_value in valid_phases, f"Phase should be one of {valid_phases}, got {phase_value}" + + if time_idx >= 0: + time_value = tdSql.getData(0, time_idx) + tdLog.info(f"Phase start time: {time_value}") + + print("test query phase values ....................... [passed]") + + def test_long_running_query_phase(self): + """Long Query: Verify phase tracking for longer queries + + 1. Create a table with more data + 2. Execute a longer running query + 3. Verify phase information is captured correctly + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-6 Created for query phase tracking feature + + """ + tdLog.info("=============== test long running query phase") + tdSql.execute(f"use db") + tdSql.execute(f"create table if not exists db.lt (ts timestamp, v1 int, v2 float, v3 double)") + + for i in range(100): + tdSql.execute(f"insert into db.lt values (now + {i}s, {i}, {i}.5, {i}.123456)") + + tdSql.query(f"select count(*), avg(v1), sum(v2), max(v3) from db.lt") + tdSql.checkRows(1) + + tdSql.query(f"show queries") + tdLog.info(f"Active queries count: {tdSql.getRows()}") + + print("test long running query phase ....................... [passed]") + + def test_concurrent_queries_phase(self): + """Concurrent: Verify phase tracking with multiple queries + + 1. Create multiple tables + 2. Execute multiple queries + 3. Verify each query has correct phase information + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-6 Created for query phase tracking feature + + """ + tdLog.info("=============== test concurrent queries phase") + tdSql.execute(f"use db") + + for i in range(5): + tdSql.execute(f"create table if not exists db.t{i} (ts timestamp, v int)") + tdSql.execute(f"insert into db.t{i} values (now, {i})") + + for i in range(5): + tdSql.query(f"select * from db.t{i}") + assert tdSql.getRows() >= 1, f"table db.t{i} should have at least 1 row" + + tdSql.query(f"show queries") + tdLog.info(f"Total queries shown: {tdSql.getRows()}") + + print("test concurrent queries phase ....................... [passed]") + + def test_phase_timing_accuracy(self): + """Timing: Verify phase_start_time accuracy + + 1. Record current timestamp before query + 2. Execute query + 3. Verify phase_start_time is within reasonable range of recorded time + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-6 Created for query phase tracking feature + + """ + tdLog.info("=============== test phase timing accuracy") + tdSql.execute(f"use db") + + before_time = int(time.time() * 1000) + + tdSql.query(f"select * from db.stb") + + after_time = int(time.time() * 1000) + + tdSql.query(f"show queries") + + if tdSql.getRows() > 0: + col_names = [desc[0] for desc in tdSql.cursor.description] + time_idx = col_names.index("phase_start_time") if "phase_start_time" in col_names else -1 + + if time_idx >= 0: + query_time = tdSql.getData(0, time_idx) + tdLog.info(f"Before: {before_time}, Query: {query_time}, After: {after_time}") + + print("test phase timing accuracy ....................... [passed]") + + def test_sub_status_timing_format(self): + """SubTask: Verify sub_status includes timing info with human-readable time + + 1. Create a supertable with multiple child tables to generate sub-tasks + 2. Execute a distributed query to trigger sub-plan execution + 3. Verify sub_status format contains tid:status:startTime + where startTime is human-readable (e.g. 2026-03-12 10:00:00.123) or "-" + + Since: v3.3.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-3-10 Created for sub-task timing tracking feature + - 2026-3-12 Changed time format from unix ms to human-readable + + """ + tdLog.info("=============== test sub status timing format") + tdSql.execute(f"create database if not exists db2 vgroups 2") + tdSql.execute(f"use db2") + tdSql.execute(f"create table db2.stb2 (ts timestamp, v int) tags (t int)") + for i in range(4): + tdSql.execute(f"create table db2.ct{i} using db2.stb2 tags ({i})") + tdSql.execute(f"insert into db2.ct{i} values (now, {i})") + + tdSql.query(f"select count(*) from db2.stb2 group by tbname") + + tdSql.query(f"show queries") + if tdSql.getRows() > 0: + col_names = [desc[0] for desc in tdSql.cursor.description] + sub_status_idx = col_names.index("sub_status") if "sub_status" in col_names else -1 + sub_num_idx = col_names.index("sub_num") if "sub_num" in col_names else -1 + + if sub_num_idx >= 0: + sub_num = tdSql.getData(0, sub_num_idx) + tdLog.info(f"Sub plan num: {sub_num}") + + if sub_status_idx >= 0: + sub_status = tdSql.getData(0, sub_status_idx) + tdLog.info(f"Sub status: {sub_status}") + if sub_status: + parts = sub_status.split(",") + for part in parts: + fields = part.split(":", 2) + tdLog.info(f" Sub-task fields: {fields}") + assert len(fields) == 3, \ + f"sub_status entry should have 3 fields (tid:status:startTime), got {len(fields)}: {part}" + tid_str, status, start_time = fields + assert tid_str.isdigit(), f"tid should be numeric, got: {tid_str}" + assert len(status) > 0, f"status should not be empty" + if start_time != "-": + assert "." in start_time, \ + f"startTime should be human-readable (YYYY-MM-DD HH:MM:SS.ms) or '-', got: {start_time}" + + tdSql.execute(f"drop database if exists db2") + print("test sub status timing format ....................... [passed]") + + def cleanup_class(cls): + tdLog.info(f"cleanup {__file__}") + tdSql.execute(f"drop database if exists db")