Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions include/common/tmsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -5062,8 +5062,23 @@ typedef struct {
typedef struct {
int64_t tid;
char status[TSDB_JOB_STATUS_LEN];
int64_t startTs; // sub-task first execution start time, us
int64_t endTs; // sub-task end time, us (0 if not finished)
} SQuerySubDesc;

typedef enum EQueryExecPhase {
QUERY_PHASE_NONE = 0,
QUERY_PHASE_PARSE = 1,
QUERY_PHASE_CATALOG = 2,
QUERY_PHASE_PLAN = 3,
QUERY_PHASE_SCHEDULE = 4,
QUERY_PHASE_EXECUTE = 5,
QUERY_PHASE_FETCH = 6,
QUERY_PHASE_DONE = 7,
} EQueryExecPhase;

const char* queryPhaseStr(int32_t phase);

typedef struct {
char sql[TSDB_SHOW_SQL_LEN];
uint64_t queryId;
Expand All @@ -5075,6 +5090,8 @@ typedef struct {
char fqdn[TSDB_FQDN_LEN];
int32_t subPlanNum;
SArray* subDesc; // SArray<SQuerySubDesc>
int32_t execPhase; // EQueryExecPhase
int64_t phaseStartTime; // when current phase started, ms
} SQueryDesc;

typedef struct {
Expand Down
2 changes: 2 additions & 0 deletions source/client/inc/clientInt.h
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ typedef struct SRequestObj {
SMetaData parseMeta;
char* effectiveUser;
int8_t source;
int32_t execPhase; // EQueryExecPhase
int64_t phaseStartTime; // when current phase started, ms
} SRequestObj;

typedef struct SSyncQueryParam {
Expand Down
2 changes: 2 additions & 0 deletions source/client/src/clientEnv.c
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,8 @@ int32_t createRequest(uint64_t connId, int32_t type, int64_t reqid, SRequestObj
(*pRequest)->resType = RES_TYPE__QUERY;
(*pRequest)->requestId = reqid == 0 ? generateRequestId() : reqid;
(*pRequest)->metric.start = taosGetTimestampUs();
(*pRequest)->execPhase = QUERY_PHASE_NONE;
(*pRequest)->phaseStartTime = 0;

(*pRequest)->body.resInfo.convertUcs4 = true; // convert ucs4 by default
(*pRequest)->body.resInfo.charsetCxt = pTscObj->optionInfo.charsetCxt;
Expand Down
2 changes: 2 additions & 0 deletions source/client/src/clientHb.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,8 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
return TSDB_CODE_FAILED;
}
desc.subPlanNum = pRequest->body.subplanNum;
desc.execPhase = pRequest->execPhase;
desc.phaseStartTime = pRequest->phaseStartTime;

if (desc.subPlanNum) {
desc.subDesc = taosArrayInit(desc.subPlanNum, sizeof(SQuerySubDesc));
Expand Down
8 changes: 8 additions & 0 deletions source/client/src/clientImpl.c
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,9 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
int64_t st = taosGetTimestampUs();

if (!pRequest->parseOnly) {
pRequest->execPhase = QUERY_PHASE_PLAN;
pRequest->phaseStartTime = taosGetTimestampMs();

pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
if (NULL == pMnodeList) {
code = terrno;
Expand Down Expand Up @@ -1517,6 +1520,8 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat
};

if (TSDB_CODE_SUCCESS == code) {
pRequest->execPhase = QUERY_PHASE_EXECUTE;
pRequest->phaseStartTime = taosGetTimestampMs();
code = schedulerExecJob(&req, &pRequest->body.queryJob);
}
taosArrayDestroy(pNodeList);
Expand Down Expand Up @@ -3393,6 +3398,9 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param

void doRequestCallback(SRequestObj* pRequest, int32_t code) {
pRequest->inCallback = true;
pRequest->execPhase = QUERY_PHASE_DONE;
pRequest->phaseStartTime = taosGetTimestampMs();

int64_t this = pRequest->self;
if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
(code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
Expand Down
22 changes: 17 additions & 5 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1868,20 +1868,25 @@ static int32_t getAllMetaAsync(SSqlCallbackWrapper *pWrapper, catalogCallback fp
static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t code);

static int32_t phaseAsyncQuery(SSqlCallbackWrapper *pWrapper) {
int32_t code = TSDB_CODE_SUCCESS;
switch (pWrapper->pRequest->pQuery->execStage) {
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj *pRequest = pWrapper->pRequest;
switch (pRequest->pQuery->execStage) {
case QUERY_EXEC_STAGE_PARSE: {
// continue parse after get metadata
pRequest->execPhase = QUERY_PHASE_CATALOG;
pRequest->phaseStartTime = taosGetTimestampMs();
code = getAllMetaAsync(pWrapper, doAsyncQueryFromParse);
break;
}
case QUERY_EXEC_STAGE_ANALYSE: {
// analysis after get metadata
pRequest->execPhase = QUERY_PHASE_CATALOG;
pRequest->phaseStartTime = taosGetTimestampMs();
code = getAllMetaAsync(pWrapper, doAsyncQueryFromAnalyse);
break;
}
case QUERY_EXEC_STAGE_SCHEDULE: {
launchAsyncQuery(pWrapper->pRequest, pWrapper->pRequest->pQuery, NULL, pWrapper);
pRequest->execPhase = QUERY_PHASE_SCHEDULE;
pRequest->phaseStartTime = taosGetTimestampMs();
launchAsyncQuery(pRequest, pRequest->pQuery, NULL, pWrapper);
break;
}
default:
Expand Down Expand Up @@ -2028,6 +2033,9 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
SSqlCallbackWrapper *pWrapper = NULL;
int32_t code = TSDB_CODE_SUCCESS;

pRequest->execPhase = QUERY_PHASE_PARSE;
pRequest->phaseStartTime = taosGetTimestampMs();

if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
code = pRequest->prevCode;
terrno = code;
Expand Down Expand Up @@ -2136,6 +2144,10 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
}

SRequestObj *pRequest = res;

pRequest->execPhase = QUERY_PHASE_FETCH;
pRequest->phaseStartTime = taosGetTimestampMs();

if (TSDB_SQL_RETRIEVE_EMPTY_RESULT == pRequest->type) {
fp(param, res, 0);
return;
Expand Down
44 changes: 43 additions & 1 deletion source/common/src/msg/tmsg.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,19 @@ void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) {
return (void *)buf;
}

const char* queryPhaseStr(int32_t phase) {
switch (phase) {
case QUERY_PHASE_PARSE: return "parse";
case QUERY_PHASE_CATALOG: return "catalog";
case QUERY_PHASE_PLAN: return "plan";
case QUERY_PHASE_SCHEDULE: return "schedule";
case QUERY_PHASE_EXECUTE: return "execute";
case QUERY_PHASE_FETCH: return "fetch";
case QUERY_PHASE_DONE: return "done";
default: return "none";
}
}

static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) {
TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pReq->connKey));

Expand Down Expand Up @@ -292,14 +305,20 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->isSubQuery));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->fqdn));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->subPlanNum));

int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0;
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, snum));
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->tid));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sDesc->status));
}
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->execPhase));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->phaseStartTime));
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->startTs));
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->endTs));
}
}
} else {
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum));
Expand Down Expand Up @@ -428,6 +447,29 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
TAOS_CHECK_GOTO(code, &line, _error);
}

desc.execPhase = QUERY_PHASE_NONE;
desc.phaseStartTime = 0;
if (!tDecodeIsEnd(pDecoder)) {
code = tDecodeI32(pDecoder, &desc.execPhase);
TAOS_CHECK_GOTO(code, &line, _error);
code = tDecodeI64(pDecoder, &desc.phaseStartTime);
TAOS_CHECK_GOTO(code, &line, _error);

if (!tDecodeIsEnd(pDecoder) && snum > 0) {
for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc.subDesc, m);
if (NULL == sDesc) {
code = TSDB_CODE_INVALID_MSG;
TAOS_CHECK_GOTO(code, &line, _error);
}
code = tDecodeI64(pDecoder, &sDesc->startTs);
TAOS_CHECK_GOTO(code, &line, _error);
code = tDecodeI64(pDecoder, &sDesc->endTs);
TAOS_CHECK_GOTO(code, &line, _error);
}
}
}

if (!taosArrayPush(pReq->query->queryDesc, &desc)) {
code = terrno;
TAOS_CHECK_GOTO(code, &line, _error);
Expand Down
2 changes: 2 additions & 0 deletions source/common/src/systable.c
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,8 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "user_app", .bytes = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "user_ip", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "current_phase", .bytes = 16 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "phase_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
};

static const SSysDbTableSchema appSchema[] = {
Expand Down
25 changes: 23 additions & 2 deletions source/dnode/mnode/impl/src/mndProfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -1280,8 +1280,10 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
}
if (offset + reserve < strSize) {
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset +=
tsnprintf(subStatus + offset, sizeof(subStatus) - offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
int64_t startMs = pDesc->startTs / 1000;
int64_t endMs = pDesc->endTs / 1000;
offset += tsnprintf(subStatus + offset, sizeof(subStatus) - offset,
"%" PRIu64 ":%s:%" PRId64 ":%" PRId64, pDesc->tid, pDesc->status, startMs, endMs);
} else {
break;
}
Expand Down Expand Up @@ -1326,6 +1328,25 @@ static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBloc
return code;
}

const char* phaseStr = queryPhaseStr(pQuery->execPhase);
char phaseVarStr[16 + VARSTR_HEADER_SIZE];
STR_TO_VARSTR(phaseVarStr, phaseStr);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)phaseVarStr, false);
if (code != 0) {
mError("failed to set current phase since %s", tstrerror(code));
taosRUnLockLatch(&pConn->queryLock);
return code;
}

pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
code = colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->phaseStartTime, false);
if (code != 0) {
mError("failed to set phase start time since %s", tstrerror(code));
taosRUnLockLatch(&pConn->queryLock);
return code;
}

pBlock->info.rows++;
}

Expand Down
2 changes: 2 additions & 0 deletions source/libs/scheduler/src/scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ int32_t schedulerGetTasksStatus(int64_t jobId, SArray *pSub) {
SQuerySubDesc subDesc = {0};
subDesc.tid = pTask->taskId;
TAOS_STRCPY(subDesc.status, jobTaskStatusStr(pTask->status));
subDesc.startTs = pTask->profile.startTs;
subDesc.endTs = pTask->profile.endTs;

if (NULL == taosArrayPush(pSub, &subDesc)) {
qError("taosArrayPush task %d failed, error:0x%x", m, terrno);
Expand Down
2 changes: 1 addition & 1 deletion test/cases/21-MetaData/test_meta_information_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ def ins_columns_check(self):
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")

tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkRows(72)
tdSql.checkRows(74)

def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2')
Expand Down
Loading
Loading