diff --git a/docs/en/08-operation/16-security.md b/docs/en/08-operation/16-security.md index 73e69146ca74..c9b69dd3787e 100644 --- a/docs/en/08-operation/16-security.md +++ b/docs/en/08-operation/16-security.md @@ -99,6 +99,8 @@ Additionally, for an audit database: Audit databases created before version 3.4.0.0 are incompatible with audit features in version 3.4.0.0 and later. In older versions, the `is_audit` parameter was not enforced, so there were no mandatory requirements for `DURATION`, `WAL_LEVEL`, and `ENCRYPT_ALGORITHM`. To enable new audit features for an old audit database, it must be dropped and recreated. As a workaround to access data from a pre-3.4.0.0 audit database in a newer version (without new audit features), you can disable `auditUseToken` by setting it to 0. +In versions 3.4.1.0 and later, audit information can be saved locally rather than sent to taoskeeper. To use this functionality, you need to set the parameter auditSaveInSelf to 1, and the number of vgroups in the audit database created must be limited to one. + ### taosKeeper Configuration Configure the related parameters for audit logs in the taosKeeper configuration file `keeper.toml`, as shown in the table below diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index 85b69744e33f..6a770557a40a 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -243,6 +243,7 @@ The effective value of charset is UTF-8. | auditHttps | After 3.4.0.0 | Supported, effective immediately | Whether to use https to report audit data; Enterprise parameter; range 0 - 1, default value 0 (1: use https, 0: do not use). | | auditUseToken | After 3.4.0.0 | Supported, effective immediately | Whether to use token to report audit data; Enterprise parameter; range 0 - 1, default value 1 (1: use token, 0: do not use). | | auditCreateTable | | Supported, effective immediately | Whether to enable audit feature for creating subtables; Enterprise parameter | +| auditSaveInSelf | After 3.4.1.0 | Supported, effective immediately | Whether to save audit information locally instead of sending it to taoskeeper. Range: 0-1, default: 0 (1: enabled, 0: disabled). | | encryptAlgorithm | | Not supported | Data encryption algorithm; Enterprise parameter | | encryptScope | | Not supported | Encryption scope; Enterprise parameter | | encryptExtDir | v3.4.0.0 | Not supported | User-defined encryption algorithms extensions path; Enterprise parameter | diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 131e21b5ac1e..9bcf11b560e6 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -683,11 +683,12 @@ Below are the business error codes for each module. | Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users | |------------|---------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------| -| 0x80006103 | Audit database must be encrypted | Invalid param,eter | Check and correct the SQL statement | -| 0x80006104 | Audit database wal_level must be 2 | Invalid param,eter | Check and correct the SQL statement | -| 0x80006105 | Audit database keep2 must be greater than 1825d | Invalid param,eter | Check and correct the SQL statement | -| 0x80006106 | Audit database already exist | Invalid param,eter | Check and correct the SQL statement | -| 0x80006107 | Audit database is not allowed to change | Invalid param,eter | Check and correct the SQL statement | +| 0x80006103 | Audit database must be encrypted | Invalid parameter | Check and correct the SQL statement | +| 0x80006104 | Audit database wal_level must be 2 | Invalid parameter | Check and correct the SQL statement | +| 0x80006105 | Audit database keep2 must be greater than 1825d | Invalid parameter | Check and correct the SQL statement | +| 0x80006106 | Audit database already exist | Invalid parameter | Check and correct the SQL statement | +| 0x80006107 | Audit database is not allowed to change | Invalid parameter | Check and correct the SQL statement | +| 0x80006108 | Audit database is not allowed to keep multiple vgroups | Invalid parameter | Check and correct the SQL statement | #### virtual table diff --git a/docs/zh/08-operation/16-security.md b/docs/zh/08-operation/16-security.md index 6e8f524380b5..2fb504523bd1 100644 --- a/docs/zh/08-operation/16-security.md +++ b/docs/zh/08-operation/16-security.md @@ -92,6 +92,8 @@ database_option: { 在 3.4.0.0 之前版本创建的审计库,与 3.4.0.0 及之后版本的审计库不兼容。3.4.0.0 之前版本的审计库无法开启 is_audit 参数,因此不会对 DURATION、WAL_LEVEL、ENCRYPT_ALGORITHM 做强制要求。对于 3.4.0.0 之前创建的审计库,如需使用新版本的审计能力,建议先 drop 该审计库后再重新创建。如果要在 3.4.0.0 之后的版本中继续使用由 3.4.0.0 之前版本创建的审计库,则需要将 auditUseToken 关闭(设置为 0)。 +在 3.4.1.0 之后的版本可以将审计信息保存在自身,而不发送给 taoskeeper,若要使用该功能,需要将参数 auditSaveInSelf 设置为 1,并且在使用该功能时,创建的审计库的 vgroups 的数量只能为 1。 + ### taosKeeper 配置 在 taosKeeper 的配置文件 keeper.toml 中配置与审计日志有关的配置参数,如下表所示 diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index c47be59f5f49..e4dc8c588279 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -1264,6 +1264,17 @@ charset 的有效值是 UTF-8。 - 动态修改:支持通过 SQL 修改,立即生效。 - 支持版本:从 v3.1.0.0 版本开始引入 +#### auditSaveInSelf + +- 说明:审计数据保存在自身,而不发送给 taoskeeper +- 类型:整数;0:关闭,1:开启。 +- 默认值:0 +- 最小值:0 +- 最大值:1 +- 参数类型:全局配置参数 +- 动态修改:支持通过 SQL 修改,立即生效。 +- 支持版本:从 v3.4.1.0 版本开始引入 + #### encryptAlgorithm - 说明:数据加密算法 **`企业版参数`** diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index a3c168bdda31..04d681e5d825 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -670,6 +670,7 @@ TSDB 错误码包括 taosc 客户端和服务端,所有语言的连接器无 | 0x80006105 | Audit database keep2 must be greater than 1825d | 参数不正确 | 检查并修正 SQL 语句 | | 0x80006106 | Audit database already exist | 参数不正确 | 检查并修正 SQL 语句 | | 0x80006107 | Audit database is not allowed to change | 参数不正确 | 检查并修正 SQL 语句 | +| 0x80006108 | Audit database is not allowed to keep multiple vgroups | 参数不正确 | 检查并修正 SQL 语句 | #### virtual table diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 75a7d7711eeb..3451d0f3d26e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -245,6 +245,7 @@ extern int32_t tsAuditLevel; extern int32_t tsAuditInterval; extern bool tsAuditHttps; extern bool tsAuditUseToken; +extern bool tsAuditSaveInSelf; // telem extern bool tsEnableTelem; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d0a9af7a799f..35b250274d74 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2813,6 +2813,8 @@ typedef struct { int64_t timestamp; char auditDB[TSDB_DB_FNAME_LEN]; char auditToken[TSDB_TOKEN_LEN]; + SEpSet auditEpSet; + int32_t auditVgId; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); @@ -2912,6 +2914,8 @@ typedef struct { int64_t timeWhiteVer; char auditDB[TSDB_DB_FNAME_LEN]; char auditToken[TSDB_TOKEN_LEN]; + SEpSet auditEpSet; + int32_t auditVgId; } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); @@ -4069,6 +4073,13 @@ int32_t tSerializeSVArbSetAssignedLeaderReq(void* buf, int32_t bufLen, SVArbSetA int32_t tDeserializeSVArbSetAssignedLeaderReq(void* buf, int32_t bufLen, SVArbSetAssignedLeaderReq* pReq); void tFreeSVArbSetAssignedLeaderReq(SVArbSetAssignedLeaderReq* pReq); +typedef struct { + char* data; +} SVAuditRecordReq; +int32_t tSerializeSVAuditRecordReq(void* buf, int32_t bufLen, SVAuditRecordReq* pReq); +int32_t tDeserializeSVAuditRecordReq(void* buf, int32_t bufLen, SVAuditRecordReq* pReq); +void tFreeSVAuditRecordReq(SVAuditRecordReq* pReq); + typedef struct { char* arbToken; char* memberToken; diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index e78005ef98e8..02ea17999fb3 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -43,6 +43,7 @@ typedef enum { STREAM_RUNNER_QUEUE, STREAM_READER_QUEUE, STREAM_TRIGGER_QUEUE, + AUDIT_QUEUE, QUEUE_MAX, } EQueueType; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 3c666aea5c2f..31bd5ad4e4eb 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -366,6 +366,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_QUERY_TRIM_PROGRESS, "vnode-query-trim-progress", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SET_KEEP_VERSION, "vnode-set-keep-version", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TRIM_WAL, "vnode-trim-wal", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_AUDIT_RECORD, "vnode-audit-record", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 36cb8aec9fb9..90ebc02e6ef6 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -30,6 +30,8 @@ extern "C" { #define AUDIT_DETAIL_MAX 65472 +typedef SEpSet (*mndGetDnodeEpsetByIdFn)(void *pMnode, int32_t dnodeId); + typedef struct { const char *server; uint16_t port; @@ -39,12 +41,12 @@ typedef struct { typedef struct { int64_t curTime; char strClusterId[TSDB_CLUSTER_ID_LEN]; - char clientAddress[50]; + char clientAddress[AUDIT_CLIENT_ADD_LEN]; char user[TSDB_USER_LEN]; char operation[AUDIT_OPERATION_LEN]; char target1[TSDB_DB_NAME_LEN]; //put db name char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max - char* detail; + char *detail; double duration; int64_t affectedRows; } SAuditRecord; @@ -53,11 +55,11 @@ int32_t auditInit(const SAuditCfg *pCfg); void auditSetDnodeId(int32_t dnodeId); void auditCleanup(); -void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, +void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, + int32_t len, double duration, int64_t affectedRows); +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len, double duration, int64_t affectedRows); -void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, - int32_t len, double duration, int64_t affectedRows); -void auditSendRecordsInBatch(); +void auditSendRecordsInBatch(); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8dfb06fcc4b4..06b62abf0da1 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1255,6 +1255,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_AUDIT_MUST_KEEPFORCE TAOS_DEF_ERROR_CODE(0, 0x6105) #define TSDB_CODE_AUDIT_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x6106) #define TSDB_CODE_AUDIT_DB_NOT_ALLOW_CHANGE TAOS_DEF_ERROR_CODE(0, 0x6107) +#define TSDB_CODE_AUDIT_DB_NOT_MULTI_VGROUP TAOS_DEF_ERROR_CODE(0, 0x6108) // VTABLE #define TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x6200) diff --git a/include/util/tdef.h b/include/util/tdef.h index 8382524505b1..704ad8bef202 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -815,6 +815,7 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define MONITOR_METRIC_NAME_LEN 100 #define AUDIT_OPERATION_LEN 20 +#define AUDIT_CLIENT_ADD_LEN 256 #define CONNECTOR_INFO_LEN 256 typedef enum { @@ -836,6 +837,8 @@ typedef enum { ANALY_ALGO_TYPE_END = 10, } EAnalyAlgoType; +#define AUDIT_STABLE_NAME "operations" + typedef enum { TSDB_VERSION_UNKNOWN = 0, TSDB_VERSION_OSS, diff --git a/include/util/tjson.h b/include/util/tjson.h index b4cd34f44b57..2dff399b29f4 100644 --- a/include/util/tjson.h +++ b/include/util/tjson.h @@ -72,6 +72,12 @@ void tjsonGetObjectValueBigInt(const SJson* pJson, int64_t* pVal); void tjsonGetObjectValueDouble(const SJson* pJson, double* pVal); int32_t tjsonGetStringValue(const SJson* pJson, const char* pName, char* pVal); int32_t tjsonGetStringValue2(const SJson* pJson, const char* pName, char* pVal, int32_t maxLen); +/** + * Returns a pointer to an internal string buffer owned by the underlying JSON object. + * The returned pointer must not be modified or freed by the caller and remains valid + * only as long as the associated SJson object (and its parent tree) is alive and unchanged. + */ +const char* tjsonGetStringPointer(const SJson* pJson, const char* pName); int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal); int32_t tjsonGetBigIntValue(const SJson* pJson, const char* pName, int64_t* pVal); int32_t tjsonGetIntValue(const SJson* pJson, const char* pName, int32_t* pVal); diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c index f6ee2a1fa050..915d82a0f042 100644 --- a/source/common/src/msg/tmsg.c +++ b/source/common/src/msg/tmsg.c @@ -2105,6 +2105,14 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { TAOS_CHECK_EXIT(tEncodeI64(&encoder, pload->syncTotalIndex)); } + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->auditEpSet.numOfEps)); + for (int32_t i = 0; i < pReq->auditEpSet.numOfEps; ++i) { + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->auditEpSet.eps[i].fqdn)); + TAOS_CHECK_EXIT(tEncodeI16(&encoder, pReq->auditEpSet.eps[i].port)); + // do not need to encode InUse here, because inUse is not accurate at every time + } + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->auditVgId)); + tEndEncode(&encoder); _exit: @@ -2281,6 +2289,20 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { } } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &(pReq->auditEpSet.numOfEps))); + int32_t numEps = pReq->auditEpSet.numOfEps; + if (numEps < 0 || numEps > TSDB_MAX_REPLICA) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + for (int32_t i = 0; i < pReq->auditEpSet.numOfEps; ++i) { + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->auditEpSet.eps[i].fqdn)); + TAOS_CHECK_EXIT(tDecodeI16(&decoder, &(pReq->auditEpSet.eps[i].port))); + } + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &(pReq->auditVgId))); + } + tEndDecode(&decoder); _exit: @@ -2546,6 +2568,14 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->timeWhiteVer)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pRsp->auditDB)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pRsp->auditToken)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRsp->auditEpSet.numOfEps)); + for (int32_t i = 0; i < pRsp->auditEpSet.numOfEps; ++i) { + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pRsp->auditEpSet.eps[i].fqdn)); + TAOS_CHECK_EXIT(tEncodeI16(&encoder, pRsp->auditEpSet.eps[i].port)); + // do not need to encode InUse here, because inUse is not accurate at every time + } + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->auditVgId)); + tEndEncode(&encoder); _exit: @@ -2610,6 +2640,20 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pRsp->auditToken)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &(pRsp->auditEpSet.numOfEps))); + int32_t numEps = pRsp->auditEpSet.numOfEps; + if (numEps < 0 || numEps > TSDB_MAX_REPLICA) { + code = TSDB_CODE_INVALID_MSG; + goto _exit; + } + for (int32_t i = 0; i < pRsp->auditEpSet.numOfEps; ++i) { + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pRsp->auditEpSet.eps[i].fqdn)); + TAOS_CHECK_EXIT(tDecodeI16(&decoder, &(pRsp->auditEpSet.eps[i].port))); + } + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &(pRsp->auditVgId))); + } + tEndDecode(&decoder); _exit: tDecoderClear(&decoder); @@ -12239,6 +12283,55 @@ void tFreeSVArbCheckSyncRsp(SVArbCheckSyncRsp *pRsp) { taosMemoryFreeClear(pRsp->member1Token); } +int32_t tSerializeSVAuditRecordReq(void *buf, int32_t bufLen, SVAuditRecordReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino = 0; + int32_t tlen = 0; + + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->data)); + + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSVAuditRecordReq(void *buf, int32_t bufLen, SVAuditRecordReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino = 0; + + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + + TAOS_CHECK_EXIT(tDecodeCStrAlloc(&decoder, &(pReq->data))); + + tEndDecode(&decoder); + +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSVAuditRecordReq(SVAuditRecordReq *pReq) { + if (NULL == pReq) { + return; + } + taosMemoryFreeClear(pReq->data); +} + int32_t tSerializeSVArbSetAssignedLeaderReq(void *buf, int32_t bufLen, SVArbSetAssignedLeaderReq *pReq) { SEncoder encoder = {0}; int32_t code = 0; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3057fc93f767..111491a6b285 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -241,6 +241,7 @@ int32_t tsAuditInterval = 5000; int32_t tsAuditLevel = AUDIT_LEVEL_DATABASE; bool tsAuditHttps = false; bool tsAuditUseToken = true; +bool tsAuditSaveInSelf = false; #else bool tsEnableAudit = false; bool tsEnableAuditCreateTable = false; @@ -251,6 +252,7 @@ int32_t tsAuditInterval = 200000; int32_t tsAuditLevel = AUDIT_LEVEL_NONE; bool tsAuditHttps = false; bool tsAuditUseToken = true; +bool tsAuditSaveInSelf = false; #endif // telem @@ -1041,6 +1043,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditHttps", tsAuditHttps, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditUseToken", tsAuditUseToken, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditSaveInSelf", tsAuditSaveInSelf, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER, CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_SERVER, CFG_DYN_BOTH,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); @@ -1883,6 +1886,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditUseToken"); tsAuditUseToken = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditSaveInSelf"); + tsAuditSaveInSelf = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditInterval"); tsAuditInterval = pItem->i32; #endif @@ -2924,6 +2930,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"auditLevel", &tsAuditLevel}, {"auditHttps", &tsAuditHttps}, {"auditUseToken", &tsAuditUseToken}, + {"auditSaveInSelf", &tsAuditSaveInSelf}, {"slowLogThreshold", &tsSlowLogThreshold}, {"compressMsgSize", &tsCompressMsgSize}, {"compressor", &tsCompressor}, diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index 90d5c7a220dd..a6856d443ec1 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -31,10 +31,10 @@ #endif extern SConfig *tsCfg; -extern void setAuditDbNameToken(char *pDb, char *pToken); +extern void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId); #ifndef TD_ENTERPRISE -void setAuditDbNameToken(char *pDb, char *pToken) {} +void setAuditDbNameToken(char *pDb, char *pToken, SEpSet *ep, int32_t auditVgId) {} #endif extern void getAuditDbNameToken(char *pDb, char *pToken); @@ -43,6 +43,12 @@ extern void getAuditDbNameToken(char *pDb, char *pToken); void getAuditDbNameToken(char *pDb, char *pToken) {} #endif +extern void getAuditEpSet(SEpSet *ep, int32_t *pVgId); + +#ifndef TD_ENTERPRISE +void getAuditEpSet(SEpSet *ep, int32_t *pVgId) {} +#endif + SMonVloadInfo tsVinfo = {0}; SMnodeLoad tsMLoad = {0}; SDnodeData tsDnodeData = {0}; @@ -227,7 +233,7 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); } - setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken); + setAuditDbNameToken(statusRsp.auditDB, statusRsp.auditToken, &(statusRsp.auditEpSet), statusRsp.auditVgId); dmMayShouldUpdateIpWhiteList(pMgmt, statusRsp.ipWhiteVer); dmMayShouldUpdateTimeWhiteList(pMgmt, statusRsp.timeWhiteVer); dmMayShouldUpdateAnalyticsFunc(pMgmt, statusRsp.analVer); @@ -312,6 +318,10 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { getAuditDbNameToken(req.auditDB, req.auditToken); } + if (tsAuditSaveInSelf) { + getAuditEpSet(&req.auditEpSet, &req.auditVgId); + } + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); if (contLen < 0) { dError("failed to serialize status req since %s", tstrerror(contLen)); diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 5cb3d6356db3..d2a9a4437054 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -39,6 +39,7 @@ typedef struct SMnodeMgmt { SSingleWorker arbWorker; SSingleWorker syncWorker; SSingleWorker syncRdWorker; + SSingleWorker auditWorker; bool stopped; int32_t refCount; TdThreadRwlock lock; @@ -71,6 +72,7 @@ int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc); int32_t mmPutMsgToStreamMgmtQueue(SMnodeMgmt* pMgmt, SRpcMsg* pMsg); int32_t mmPutMsgToStreamReaderQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutMsgToAuditQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mndProcessStreamHb(SRpcMsg *pReq); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index c96823126e61..8a803aac25e8 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -371,6 +371,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_PREP_SNAPSHOT_REPLY, mmPutMsgToSyncRdQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, mmPutMsgToStreamReaderQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD_RSP, mmPutMsgToAuditQueue, 0) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index ae47245eb9f1..86d6c7278269 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -144,6 +144,10 @@ int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->arbWorker, pMsg); } +int32_t mmPutMsgToAuditQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + return mmPutMsgToWorker(pMgmt, &pMgmt->auditWorker, pMsg); +} + int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg); } @@ -243,6 +247,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { case SYNC_RD_QUEUE: pWorker = &pMgmt->syncRdWorker; break; + case AUDIT_QUEUE: + pWorker = &pMgmt->auditWorker; + break; default: code = TSDB_CODE_INVALID_PARA; } @@ -521,6 +528,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { return code; } + SSingleWorkerCfg auditCfg = { + .min = 1, + .max = 1, + .name = "mnode-audit", + .fp = (FItem)mmProcessRpcMsg, + .param = pMgmt, + }; + if ((code = tSingleWorkerInit(&pMgmt->auditWorker, &auditCfg)) != 0) { + dError("failed to start mnode mnode-audit worker since %s", tstrerror(code)); + return code; + } + SDispatchWorkerPool* pPool = &pMgmt->streamMgmtWorkerPool; pPool->max = tsNumOfMnodeStreamMgmtThreads; pPool->name = "mnode-stream-mgmt"; @@ -561,6 +580,7 @@ void mmStopWorker(SMnodeMgmt *pMgmt) { tDispatchWorkerCleanup(&pMgmt->streamMgmtWorkerPool); tWWorkerFreeQueue(&pMgmt->streamReaderPool, pMgmt->pStreamReaderQ); tWWorkerCleanup(&pMgmt->streamReaderPool); - + tSingleWorkerCleanup(&pMgmt->auditWorker); + dDebug("mnode workers are closed"); } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 95d72b04bf91..0f932cc095d0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1963,6 +1963,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_FETCH, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRIGGER_PULL, vmPutMsgToStreamReaderQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_AUDIT_RECORD, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + code = 0; _OVER: diff --git a/source/dnode/mnode/impl/inc/mndStb.h b/source/dnode/mnode/impl/inc/mndStb.h index bd3e4b8549a3..07479bf6ec0f 100644 --- a/source/dnode/mnode/impl/inc/mndStb.h +++ b/source/dnode/mnode/impl/inc/mndStb.h @@ -52,6 +52,7 @@ void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, in int32_t alterOriDataLen); int32_t mndSetForceDropCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, SStbObj *pStb); +int32_t mndCreateAuditStb(SMnode *pMnode, SDbObj *pDb, SUserObj *pOperUser, STrans *pTrans, SVgObj *pVgroup); #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4abc365db78e..80f9dafc98f4 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1132,6 +1132,13 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, } } + if (dbObj.cfg.isAudit) { + if (dbObj.cfg.numOfVgroups > 1) { + code = TSDB_CODE_AUDIT_DB_NOT_MULTI_VGROUP; + TAOS_CHECK_GOTO(code, NULL, _OVER); + } + } + mndTransSetOper(pTrans, MND_OPER_CREATE_DB); TAOS_CHECK_GOTO(mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj), NULL, _OVER); TAOS_CHECK_GOTO(mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups), NULL, _OVER); @@ -1139,6 +1146,13 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, TAOS_CHECK_GOTO(mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups), NULL, _OVER); TAOS_CHECK_GOTO(mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped, auditOwnedDbs), NULL, _OVER); TAOS_CHECK_GOTO(mndSetCreateDbUndoActions(pMnode, pTrans, &dbObj, pVgroups), NULL, _OVER); + + if (dbObj.cfg.isAudit) { + if (tsAuditSaveInSelf) { + // when create audit database, pVgroups num always is 1 + TAOS_CHECK_GOTO(mndCreateAuditStb(pMnode, &dbObj, pUser, pTrans, pVgroups), NULL, _OVER); + } + } TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); _OVER: diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index eae2d06f4e14..9444a65cbe51 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -780,15 +780,16 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { SStatusReq statusReq = {0}; SDnodeObj *pDnode = NULL; int32_t code = -1; + int32_t lino = 0; - TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), NULL, _OVER); + TAOS_CHECK_GOTO(tDeserializeSStatusReq(pReq->pCont, pReq->contLen, &statusReq), &lino, _OVER); int64_t clusterid = mndGetClusterId(pMnode); if (statusReq.clusterId != 0 && statusReq.clusterId != clusterid) { code = TSDB_CODE_MND_DNODE_DIFF_CLUSTER; mWarn("dnode:%d, %s, its clusterid:%" PRId64 " differ from current clusterid:%" PRId64 ", code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, statusReq.clusterId, clusterid, code); - goto _OVER; + TAOS_CHECK_GOTO(code, &lino, _OVER); } if (statusReq.dnodeId == 0) { @@ -797,7 +798,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mInfo("dnode:%s, not created yet", statusReq.dnodeEp); code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; - goto _OVER; + TAOS_CHECK_GOTO(code, &lino, _OVER); } } else { pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId); @@ -806,14 +807,14 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; - terrno = err; - goto _OVER; + code = err; + TAOS_CHECK_GOTO(code, &lino, _OVER); } mWarn("dnode:%d, %s not exist, code:0x%x", statusReq.dnodeId, statusReq.dnodeEp, err); if (err == TSDB_CODE_MND_DNODE_NOT_EXIST) { - terrno = err; - goto _OVER; + code = err; + TAOS_CHECK_GOTO(code, &lino, _OVER); } else { pDnode = mndAcquireDnodeAllStatusByEp(pMnode, statusReq.dnodeEp); if (pDnode == NULL) goto _OVER; @@ -836,17 +837,19 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { bool analVerChanged = (analVer != statusReq.analVer); bool auditDBChanged = false; char auditDB[TSDB_DB_FNAME_LEN] = {0}; - bool auditTokenChanged = false; + bool auditInfoChanged = false; char auditToken[TSDB_TOKEN_LEN] = {0}; + SDbObj *pDb = NULL; + if (tsAuditUseToken || tsAuditSaveInSelf) { + pDb = mndAcquireAuditDb(pMnode); + } if (tsAuditUseToken) { - SDbObj *pDb = mndAcquireAuditDb(pMnode); if (pDb != NULL) { SName name = {0}; if (tNameFromString(&name, pDb->name, T_NAME_ACCT | T_NAME_DB) < 0) mError("db:%s, failed to parse db name", pDb->name); tstrncpy(auditDB, name.dbname, TSDB_DB_FNAME_LEN); - mndReleaseDb(pMnode, pDb); } if (strncmp(statusReq.auditDB, auditDB, TSDB_DB_FNAME_LEN) != 0) auditDBChanged = true; @@ -858,18 +861,66 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mTrace("dnode:%d, get audit user:%s", pDnode->id, auditUser); int32_t ret = 0; if ((ret = mndGetUserActiveToken("audit", auditToken)) != 0) { - mTrace("dnode:%d, failed to get audit user active token, token:%s, since %s", pDnode->id, auditToken, - tstrerror(ret)); + mTrace("dnode:%d, failed to get audit user active token, token:xxxx, since %s", pDnode->id, tstrerror(ret)); } else { - mTrace("dnode:%d, get audit user active token:%s", pDnode->id, auditToken); - if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditTokenChanged = true; + mTrace("dnode:%d, get audit user active token:xxxx", pDnode->id); + if (strncmp(statusReq.auditToken, auditToken, TSDB_TOKEN_LEN) != 0) auditInfoChanged = true; + } + } + } + + SEpSet auditVnodeEpSet = {0}; + int32_t auditVgId = 0; + if (tsAuditSaveInSelf) { + if (pDb != NULL) { + void *pIter = NULL; + SVgObj *pVgroup = NULL; + while (1) { + pIter = sdbFetch(pMnode->pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (mndVgroupInDb(pVgroup, pDb->uid)) { + auditVnodeEpSet = mndGetVgroupEpset(pMnode, pVgroup); + auditVgId = pVgroup->vgId; + sdbCancelFetch(pMnode->pSdb, pIter); + sdbRelease(pMnode->pSdb, pVgroup); + break; + } + sdbRelease(pMnode->pSdb, pVgroup); + } + } + + if (auditVnodeEpSet.numOfEps != statusReq.auditEpSet.numOfEps) { + auditInfoChanged = true; + mTrace("dnode:%d, audit epset num changed, auditNum:%d, inReq:%d", pDnode->id, auditVnodeEpSet.numOfEps, + statusReq.auditEpSet.numOfEps); + } else { + for (int32_t i = 0; i < auditVnodeEpSet.numOfEps; i++) { + if (strncmp(auditVnodeEpSet.eps[i].fqdn, statusReq.auditEpSet.eps[i].fqdn, TSDB_FQDN_LEN) != 0 || + auditVnodeEpSet.eps[i].port != statusReq.auditEpSet.eps[i].port) { + // do not need to check InUse here, because inUse is not accurate at every time + auditInfoChanged = true; + mTrace("dnode:%d, audit epset changed at item:%d, fqdn:%s:%d:, inReq:%s:%d", pDnode->id, i, + auditVnodeEpSet.eps[i].fqdn, auditVnodeEpSet.eps[i].port, statusReq.auditEpSet.eps[i].fqdn, + statusReq.auditEpSet.eps[i].port); + break; + } } } - } + + if (auditVgId != statusReq.auditVgId) { + auditInfoChanged = true; + mTrace("dnode:%d, audit vgId changed, auditVgId:%d, inReq:%d", pDnode->id, auditVgId, statusReq.auditVgId); + } + } + + if (pDb != NULL) { + mndReleaseDb(pMnode, pDb); + } bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged || pMnode->ipWhiteVer != statusReq.ipWhiteVer || pMnode->timeWhiteVer != statusReq.timeWhiteVer || - encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditTokenChanged; + encryptKeyChanged || enableWhiteListChanged || auditDBChanged || auditInfoChanged; const STraceId *trace = &pReq->info.traceId; char timestamp[TD_TIME_STR_LEN] = {0}; if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI); @@ -890,7 +941,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->offlineReason = DND_REASON_TIME_UNSYNC; mError("dnode:%d, not sync with cluster:%"PRId64" since %s, limit %"PRId64"s", statusReq.dnodeId, pMnode->clusterId, tstrerror(code), tsTimestampDeltaLimit); - goto _OVER; + TAOS_CHECK_GOTO(code, &lino, _OVER); } for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) { SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); @@ -955,8 +1006,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, tsVersion); - terrno = TSDB_CODE_VERSION_NOT_COMPATIBLE; - goto _OVER; + code = TSDB_CODE_VERSION_NOT_COMPATIBLE; + TAOS_CHECK_GOTO(code, &lino, _OVER); } if (statusReq.dnodeId == 0) { @@ -968,8 +1019,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId, pMnode->clusterId); - terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; - goto _OVER; + code = TSDB_CODE_MND_INVALID_CLUSTER_ID; + TAOS_CHECK_GOTO(code, &lino, _OVER); } } @@ -987,10 +1038,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } else { mInfo("dnode:%d, do check in status req, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d, dnodeChanged:%d supportVnodesChanged:%d analVerChanged:%d encryptKeyChanged:%d " - "enableWhiteListChanged:%d auditDBChanged:%d auditTokenChanged:%d pMnode->ipWhiteVer:%" PRId64 + "enableWhiteListChanged:%d auditDBChanged:%d auditInfoChanged:%d pMnode->ipWhiteVer:%" PRId64 " statusReq.ipWhiteVer:%" PRId64 " pMnode->timeWhiteVer:%" PRId64 " statusReq.timeWhiteVer:%" PRId64, pDnode->id, online, statusReq.dnodeVer, dnodeVer, reboot, dnodeChanged, supportVnodesChanged, - analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditTokenChanged, + analVerChanged, encryptKeyChanged, enableWhiteListChanged, auditDBChanged, auditInfoChanged, pMnode->ipWhiteVer, statusReq.ipWhiteVer, pMnode->timeWhiteVer, statusReq.timeWhiteVer); } @@ -1004,8 +1055,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->encryptionKeyChksum = statusReq.clusterCfg.encryptionKeyChksum; if (memcmp(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN) != 0) { tstrncpy(pDnode->machineId, statusReq.machineId, TSDB_MACHINE_ID_LEN + 1); - if ((terrno = mndUpdateDnodeObj(pMnode, pDnode)) != 0) { - goto _OVER; + if ((code = mndUpdateDnodeObj(pMnode, pDnode)) != 0) { + TAOS_CHECK_GOTO(code, &lino, _OVER); } } @@ -1017,21 +1068,31 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); if (statusRsp.pDnodeEps == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _OVER; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_CHECK_GOTO(code, &lino, _OVER); } mndGetDnodeEps(pMnode, statusRsp.pDnodeEps); statusRsp.ipWhiteVer = pMnode->ipWhiteVer; statusRsp.timeWhiteVer = pMnode->timeWhiteVer; - if (auditDB[0] != '\0') { - mInfo("dnode:%d, set audit db %s in process status rsp", statusReq.dnodeId, auditDB); - tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN); - } - if (auditToken[0] != '\0') { - mInfo("dnode:%d, set audit token %s in process status rsp", statusReq.dnodeId, auditToken); - tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN); + if (auditInfoChanged || auditDBChanged) { + if (tsAuditUseToken) { + if (auditDB[0] != '\0') { + mInfo("dnode:%d, set audit db:%s in process status rsp", statusReq.dnodeId, auditDB); + tstrncpy(statusRsp.auditDB, auditDB, TSDB_DB_FNAME_LEN); + } + if (auditToken[0] != '\0') { + mInfo("dnode:%d, set audit token:xxxx in process status rsp", statusReq.dnodeId); + tstrncpy(statusRsp.auditToken, auditToken, TSDB_TOKEN_LEN); + } + } + + if (tsAuditSaveInSelf) { + mInfo("dnode:%d, set audit epset and vgId:%d in process status rsp", statusReq.dnodeId, auditVgId); + statusRsp.auditEpSet = auditVnodeEpSet; + statusRsp.auditVgId = auditVgId; + } } int32_t contLen = tSerializeSStatusRsp(NULL, 0, &statusRsp); @@ -1040,7 +1101,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { taosArrayDestroy(statusRsp.pDnodeEps); if (contLen < 0) { code = contLen; - goto _OVER; + TAOS_CHECK_GOTO(code, &lino, _OVER); } pReq->info.rspLen = contLen; @@ -1058,7 +1119,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseDnode(pMnode, pDnode); taosArrayDestroy(statusReq.pVloads); if (code != 0) { - mError("dnode:%d, failed to process status req since %s", statusReq.dnodeId, tstrerror(code)); + mError("dnode:%d, failed to process status req at line:%d since %s", statusReq.dnodeId, lino, tstrerror(code)); return code; } diff --git a/source/dnode/mnode/impl/src/mndIndex.c b/source/dnode/mnode/impl/src/mndIndex.c index 50f2c2e5217b..4ac1075959f7 100644 --- a/source/dnode/mnode/impl/src/mndIndex.c +++ b/source/dnode/mnode/impl/src/mndIndex.c @@ -370,6 +370,7 @@ int32_t mndSetCreateIdxCommitLogs(SMnode *pMnode, STrans *pTrans, SIdxObj *pIdx) if (terrno != 0) code = terrno; return -1; } + mInfo("trans:%d, add create idx to commit log", pTrans->id); TAOS_CHECK_RETURN(mndTransAppendCommitlog(pTrans, pCommitRaw)); TAOS_CHECK_RETURN(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY)); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 371728574564..a948ecde3dd6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -67,6 +67,7 @@ static int32_t mndProcessDropIndexReq(SRpcMsg *pReq); static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq); static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq); static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq); +static int32_t mndProcessAuditRecordRsp(SRpcMsg *pRsp); int32_t mndInitStb(SMnode *pMnode) { SSdbTable table = { @@ -103,6 +104,7 @@ int32_t mndInitStb(SMnode *pMnode) { // mndSetMsgHandle(pMnode, TDMT_MND_DROP_INDEX, mndProcessDropIndexReq); // mndSetMsgHandle(pMnode, TDMT_VND_CREATE_INDEX_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_VND_DROP_INDEX_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_AUDIT_RECORD_RSP, mndProcessAuditRecordRsp); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb); @@ -757,6 +759,7 @@ int32_t mndSetCreateStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, S if (terrno != 0) code = terrno; TAOS_RETURN(code); } + mInfo("trans:%d, add stb to commit log", pTrans->id); if ((code = mndTransAppendCommitlog(pTrans, pCommitRaw)) != 0) { sdbFreeRaw(pCommitRaw); TAOS_RETURN(code); @@ -798,6 +801,7 @@ static int32_t mndSetCreateStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.msgType = TDMT_VND_CREATE_STB; action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST; action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST; + mInfo("trans:%d, add create stb to redo action", pTrans->id); if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); sdbCancelFetch(pSdb, pIter); @@ -868,6 +872,7 @@ static int32_t mndSetCreateStbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.contLen = contLen; action.msgType = TDMT_VND_DROP_STB; action.acceptableCode = TSDB_CODE_TDB_STB_NOT_EXIST; + mInfo("trans:%d, add drop stb to undo action", pTrans->id); if ((code = mndTransAppendUndoAction(pTrans, &action)) != 0) { taosMemoryFree(pReq); sdbCancelFetch(pSdb, pIter); @@ -1038,6 +1043,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea SSchema *pSchema = &(stbObj.pTags[0]); if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) { + code = terrno; goto _OVER; } SSIdx idx = {0}; @@ -1071,6 +1077,219 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea TAOS_RETURN(code); } +typedef struct { + const char *name; + uint8_t type; + int32_t bytes; + uint32_t alg; +} AuditColumnDef; + +// column is consistent with vnodePrepareRow process in vnodeSvr.c +static const AuditColumnDef audit_columns[] = { + {"ts", TSDB_DATA_TYPE_TIMESTAMP, 8, 0x2000102}, + {"details", TSDB_DATA_TYPE_VARCHAR, 50000 + VARSTR_HEADER_SIZE, 0xFF000302}, + {"user_name", TSDB_DATA_TYPE_VARCHAR, 25 + VARSTR_HEADER_SIZE, 0xFF000302}, + {"operation", TSDB_DATA_TYPE_VARCHAR, 20 + VARSTR_HEADER_SIZE, 0xFF000302}, + {"db", TSDB_DATA_TYPE_VARCHAR, TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE, 0xFF000302}, + {"resource", TSDB_DATA_TYPE_VARCHAR, TSDB_STREAM_NAME_LEN + VARSTR_HEADER_SIZE, 0xFF000302}, + {"client_address", TSDB_DATA_TYPE_VARCHAR, AUDIT_CLIENT_ADD_LEN + VARSTR_HEADER_SIZE, 0xFF000302}, + {"duration", TSDB_DATA_TYPE_DOUBLE, 8, 0x5000102}, + {"affected_rows", TSDB_DATA_TYPE_UBIGINT, 8, 0x1000102}}; + +static int32_t mndBuildAuditStb(SMnode *pMnode, SStbObj *pDst, SDbObj *pDb) { + int32_t code = 0; + char *name = AUDIT_STABLE_NAME; + (void)tsnprintf(pDst->name, TSDB_TABLE_FNAME_LEN, "%s.%s", pDb->name, name); + memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); + pDst->createdTime = taosGetTimestampMs(); + pDst->updateTime = pDst->createdTime; + pDst->uid = mndGenerateUid(pDst->name, strlen(pDst->name)); + pDst->dbUid = pDb->uid; + pDst->tagVer = 1; + pDst->colVer = 1; + pDst->smaVer = 1; + pDst->nextColId = 1; + pDst->maxdelay[0] = -1; + pDst->maxdelay[1] = -1; + pDst->watermark[0] = 5000; + pDst->watermark[1] = 5000; + pDst->ttl = 0; + pDst->keep = -1; + pDst->source = 0; + pDst->virtualStb = 0; + pDst->numOfColumns = sizeof(audit_columns) / sizeof(AuditColumnDef); + pDst->numOfTags = 1; + pDst->numOfFuncs = 0; + pDst->commentLen = -1; + pDst->pFuncs = NULL; + + pDst->ast1Len = 0; + pDst->ast2Len = 0; + + pDst->pColumns = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SSchema)); + pDst->pTags = taosMemoryCalloc(1, pDst->numOfTags * sizeof(SSchema)); + pDst->pCmpr = taosMemoryCalloc(1, pDst->numOfColumns * sizeof(SCmprObj)); + if (pDst->pColumns == NULL || pDst->pTags == NULL || pDst->pCmpr == NULL) { + code = terrno; + TAOS_RETURN(code); + } + + if (pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags) { + code = TSDB_CODE_OUT_OF_RANGE; + TAOS_RETURN(code); + } + + SSchema *pSchema = NULL; + for (int32_t i = 0; i < sizeof(audit_columns) / sizeof(AuditColumnDef); ++i) { + pSchema = &pDst->pColumns[pDst->nextColId - 1]; + pSchema->type = audit_columns[i].type; + pSchema->bytes = audit_columns[i].bytes; + pSchema->flags = 1; + tstrncpy(pSchema->name, audit_columns[i].name, TSDB_COL_NAME_LEN); + pSchema->colId = pDst->nextColId; + // hasTypeMods = hasTypeMods || HAS_TYPE_MOD(pSchema); + SColCmpr *pColCmpr = &pDst->pCmpr[pDst->nextColId - 1]; + pColCmpr->id = pSchema->colId; + pColCmpr->alg = audit_columns[i].alg; + pDst->nextColId++; + } + + // tag + pSchema = &pDst->pTags[0]; + pSchema->type = TSDB_DATA_TYPE_VARCHAR; + pSchema->bytes = 64 + VARSTR_HEADER_SIZE; + SSCHMEA_SET_IDX_ON(pSchema); + tstrncpy(pSchema->name, "cluster_id", TSDB_COL_NAME_LEN); + pSchema->colId = pDst->nextColId; + pDst->nextColId++; + + /* + if (hasTypeMods) { + pDst->pExtSchemas = taosMemoryCalloc(pDst->numOfColumns, sizeof(SExtSchema)); + if (!pDst->pExtSchemas) { + code = terrno; + TAOS_RETURN(code); + } + for (int32_t i = 0; i < pDst->numOfColumns; ++i) { + SFieldWithOptions *pField = taosArrayGet(pCreate->pColumns, i); + pDst->pExtSchemas[i].typeMod = pField->typeMod; + } + } + */ + TAOS_RETURN(code); +} + +static int32_t mndSetCreateAuditStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, + SVgObj *pVgroup) { + int32_t code = 0; + SSdb *pSdb = pMnode->pSdb; + + int32_t contLen; + + if (pVgroup == NULL) { + code = TSDB_CODE_INVALID_PARA; + TAOS_RETURN(code); + } + + void *pReq = mndBuildVCreateStbReq(pMnode, pVgroup, pStb, &contLen, NULL, 0); + if (pReq == NULL) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } + + STransAction action = {0}; + action.mTraceId = pTrans->mTraceId; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_CREATE_STB; + action.acceptableCode = TSDB_CODE_TDB_STB_ALREADY_EXIST; + action.retryCode = TSDB_CODE_TDB_STB_NOT_EXIST; + mInfo("trans:%d, add create stb to redo action", pTrans->id); + if ((code = mndTransAppendRedoAction(pTrans, &action)) != 0) { + taosMemoryFree(pReq); + TAOS_RETURN(code); + } + + TAOS_RETURN(code); +} + +// Note: pVgroup is expected to point to the first element of a vgroup array (e.g. pVgroups at the call site). +// Only the first element is used for creating the audit super table. +int32_t mndCreateAuditStb(SMnode *pMnode, SDbObj *pDb, SUserObj *pOperUser, STrans *pTrans, SVgObj *pVgroup) { + SStbObj stbObj = {0}; + int32_t code = -1; + + char fullIdxName[TSDB_INDEX_FNAME_LEN * 2] = {0}; + + TAOS_CHECK_GOTO(mndBuildAuditStb(pMnode, &stbObj, pDb), NULL, _OVER); + memcpy(stbObj.createUser, pOperUser->name, TSDB_USER_LEN); + stbObj.ownerId = pOperUser->uid; + + SSchema *pSchema = &(stbObj.pTags[0]); + if (mndGenIdxNameForFirstTag(fullIdxName, pDb->name, stbObj.name, pSchema->name) < 0) { + code = terrno; + goto _OVER; + } + + SSIdx idx = {0}; + if (mndAcquireGlobalIdx(pMnode, fullIdxName, SDB_IDX, &idx) == 0 && idx.pIdx != NULL) { + code = TSDB_CODE_MND_TAG_INDEX_ALREADY_EXIST; + mndReleaseIdx(pMnode, idx.pIdx); + goto _OVER; + } + + SIdxObj idxObj = {0}; + memcpy(idxObj.name, fullIdxName, TSDB_INDEX_FNAME_LEN); + memcpy(idxObj.stb, stbObj.name, TSDB_TABLE_FNAME_LEN); + memcpy(idxObj.db, stbObj.db, TSDB_DB_FNAME_LEN); + memcpy(idxObj.colName, pSchema->name, TSDB_COL_NAME_LEN); + memcpy(idxObj.createUser, pOperUser->name, TSDB_USER_LEN); + idxObj.ownerId = pOperUser->uid; + idxObj.createdTime = taosGetTimestampMs(); + idxObj.uid = mndGenerateUid(fullIdxName, strlen(fullIdxName)); + idxObj.stbUid = stbObj.uid; + idxObj.dbUid = stbObj.dbUid; + + mndTransSetDbName(pTrans, pDb->name, stbObj.name); + TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); + TAOS_CHECK_GOTO(mndSetCreateIdxCommitLogs(pMnode, pTrans, &idxObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, &stbObj), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetCreateAuditStbRedoActions(pMnode, pTrans, pDb, &stbObj, pVgroup), NULL, _OVER); + + code = 0; + +_OVER: + if (mndStbActionDelete(pMnode->pSdb, &stbObj) != 0) mError("failed to mndStbActionDelete"); + TAOS_RETURN(code); +} + +static int32_t mndProcessAuditRecordRsp(SRpcMsg *pRsp) { + int32_t code = 0; + + if (pRsp == NULL) { + mError("audit record rsp, null response message"); + return -1; + } + + if (pRsp->code != 0) { + mError("audit record rsp failed, code:%d", pRsp->code); + return pRsp->code; + } + + SMnode *pMnode = pRsp->info.node; + SSdb *pSdb = pMnode->pSdb; + (void)pMnode; // currently unused, kept for potential future use + (void)pSdb; // currently unused, kept for potential future use + + mDebug("audit record rsp succeeded, code:%d", pRsp->code); + + // no need to implement this rsp, since we do not care about the result of audit record insertion + + return code; +} + int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { mndTransSetDbName(pTrans, pDb->name, pStb->name); TAOS_CHECK_RETURN(mndTransCheckConflict(pMnode, pTrans)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 294917155d53..36673cc4bf66 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1201,7 +1201,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pNew->conflict == TRN_CONFLICT_NOTHING) return conflict; int32_t size = sdbGetSize(pMnode->pSdb, SDB_TRANS); - mInfo("trans:%d, trans hash size %d", pNew->id, size); + mDebug("trans:%d, trans hash size %d", pNew->id, size); while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index c6e529943bc4..221c65516ed3 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -33,6 +33,8 @@ #include "mndToken.h" #include "tbase64.h" #include "totp.h" +#include "mndDnode.h" +#include "mndVgroup.h" // clang-format on @@ -3461,6 +3463,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) { int64_t tse = taosGetTimestampMs(); double duration = (double)(tse - tss); duration = duration / 1000; + auditRecord(pReq, pMnode->clusterId, operation, "", createReq.user, detail, strlen(detail), duration, 0); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index db64a156e095..f37cfb68de9e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -44,6 +44,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in SRpcMsg *pOriginRpc); static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, SRpcMsg *pOriginalMsg); +static int32_t vnodeProcessAuditRecordReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pOriginalMsg); static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterConfigReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -657,6 +658,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { code = vnodePreProcessSsMigrateFileSetReq(pVnode, pMsg); } break; #endif + case TDMT_VND_AUDIT_RECORD: { + vTrace("vgId:%d, pre process TDMT_VND_AUDIT_RECORD", TD_VID(pVnode)); + } break; default: break; } @@ -982,6 +986,17 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_ARB_CHECK_SYNC: vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp); break; + case TDMT_VND_AUDIT_RECORD: + vTrace("vgId:%d, processed audit msg", TD_VID(pVnode)); + code = vnodeProcessAuditRecordReq(pVnode, ver, pReq, len, pMsg); + pRsp->code = code; + pRsp->msgType = TDMT_VND_AUDIT_RECORD_RSP; + pRsp->pCont = NULL; + pRsp->contLen = 0; + if (code) { + goto _err; + } + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return TSDB_CODE_INVALID_MSG; @@ -2661,6 +2676,524 @@ static int32_t vnodeScanSubmitReq(SVnode *pVnode, int64_t version, SSubmitReq2 * return code; } +static int32_t vnodeDecodeAuditRecord(const SJson *pJson, void *pObj) { + SAuditRecord *record = (SAuditRecord *)pObj; + int32_t code = 0; + + tjsonGetNumberValue(pJson, "timestamp", record->curTime, code); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "cluster_id", record->strClusterId, TSDB_CLUSTER_ID_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "client_add", record->clientAddress, AUDIT_CLIENT_ADD_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "user", record->user, TSDB_USER_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "operation", record->operation, AUDIT_OPERATION_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "db", record->target1, TSDB_DB_NAME_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetStringValue2(pJson, "resource", record->target2, TSDB_STREAM_NAME_LEN); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + record->detail = (char *)tjsonGetStringPointer(pJson, "details"); + if (record->detail == NULL) return TSDB_CODE_INVALID_JSON_FORMAT; + code = tjsonGetDoubleValue(pJson, "duration", &record->duration); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + tjsonGetNumberValue(pJson, "affected_rows", record->affectedRows, code); + if (code < 0) return TSDB_CODE_INVALID_JSON_FORMAT; + + return 0; +} + +static int32_t vnodeBuildCreateTbReq(SVCreateTbReq *pTbReq, const char *tname, STag *pTag, int64_t suid, + const char *sname, SArray *tagName, uint8_t tagNum, int32_t ttl) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + + if (tname == NULL || sname == NULL) { + return TSDB_CODE_INVALID_PARA; + } + + pTbReq->name = taosStrdup(tname); + if (!pTbReq->name) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + pTbReq->ctb.stbName = taosStrdup(sname); + if (!pTbReq->ctb.stbName) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + pTbReq->type = TD_CHILD_TABLE; + pTbReq->ctb.suid = suid; + pTbReq->ctb.tagNum = tagNum; + pTbReq->ttl = ttl; + pTbReq->commentLen = -1; + pTbReq->ctb.tagName = tagName; + pTbReq->ctb.pTag = (uint8_t *)pTag; + + return code; +_exit: + if (code != 0) { + if (pTbReq->name != NULL) { + taosMemoryFreeClear(pTbReq->name); + } + if (pTbReq->ctb.stbName != NULL) { + taosMemoryFreeClear(pTbReq->ctb.stbName); + } + vError("failed to build create tb at %d since %s", lino, tstrerror(code)); + } + return code; +} + +static int32_t vnodePrepareCreateTb(SVCreateTbReq *pTbReq, char *tbName, int64_t suid, SSchemaWrapper *pTagSchema, + SAuditRecord *record) { + int32_t code = 0; + int32_t lino = 0; + + SArray *TagNames = NULL; + SArray *pTagVals = NULL; + STag *pTag = NULL; + + SSchema *tSchema = &pTagSchema->pSchema[0]; + vTrace("schema name:%s", tSchema->name); + + TagNames = taosArrayInit(1, TSDB_COL_NAME_LEN); + if (!TagNames) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + if (NULL == taosArrayPush(TagNames, tSchema->name)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + STagVal tv = (STagVal){.cid = tSchema->colId, + .type = tSchema->type, + .nData = strlen(record->strClusterId), + .pData = record->strClusterId}; // address copy, no value + if ((pTagVals = taosArrayInit(1, sizeof(STagVal))) == NULL) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + if (NULL == taosArrayPush(pTagVals, &tv)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + // version hardcode to 1 according to other module + code = tTagNew(pTagVals, 1, false, &pTag); + if (code != TSDB_CODE_SUCCESS) { + vError("failed to create tag, error:%s", tstrerror(code)); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + code = vnodeBuildCreateTbReq(pTbReq, tbName, pTag, suid, AUDIT_STABLE_NAME, TagNames, pTagSchema->nCols, + TSDB_DEFAULT_TABLE_TTL); +_exit: + if (code != 0) { + vError("failed to prepare create tb at %d since %s", lino, tstrerror(code)); + if (TagNames != NULL) { + taosArrayDestroy(TagNames); + TagNames = NULL; + pTbReq->ctb.tagName = NULL; + } + if (pTag != NULL) { + tTagFree(pTag); + pTag = NULL; + pTbReq->ctb.pTag = NULL; + } + } + + if (pTagVals != NULL) { + taosArrayDestroy(pTagVals); + pTagVals = NULL; + } + + return code; +} + +static SArray *vnodePrepareRow(SVnode *pVnode, STSchema *pSchema, SAuditRecord *record) { + int32_t code = 0; + int32_t lino = 0; + + SArray *aRows = NULL; + SArray *pVals = NULL; + SRow *pRow = NULL; + + if (!(aRows = taosArrayInit(1, sizeof(SRow *)))) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + if ((pVals = taosArrayInit(1, sizeof(SColVal))) == NULL) { + code = terrno; + goto _exit; + } + + for (int32_t k = 0; k < pSchema->numOfCols; ++k) { + const STColumn *pCol = &pSchema->columns[k]; + vTrace("vgId:%d, schema column id:%d, type:%d", TD_VID(pVnode), pCol->colId, pCol->type); + + // colId is consistent with audit_columns in mndStb.c + void *data = NULL; + if (pCol->colId == 1) { + data = &record->curTime; + } else if (pCol->colId == 2) { + data = record->detail; + } else if (pCol->colId == 3) { + data = record->user; + } else if (pCol->colId == 4) { + data = record->operation; + } else if (pCol->colId == 5) { + data = record->target1; + } else if (pCol->colId == 6) { + data = record->target2; + } else if (pCol->colId == 7) { + data = record->clientAddress; + } else if (pCol->colId == 8) { + data = &record->duration; + } else if (pCol->colId == 9) { + data = &record->affectedRows; + } else { + vError("the column id %" PRIi16 " is not defined in audit record table", pCol->colId); + code = TSDB_CODE_APP_ERROR; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + switch (pCol->type) { + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_VARCHAR: { + // if (colDataIsNull_s(pColInfoData, j)) { + // SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); + // if (NULL == taosArrayPush(pVals, &cv)) { + // goto _end; + // } + // } else { + char *str = data; + int32_t len = strlen(str); + vTrace("set svalue id:%d %s, len:%d, bytes:%d", pCol->colId, str, len, pCol->bytes); + if (len > pCol->bytes - VARSTR_HEADER_SIZE) { + vWarn("vgId:%d, audit record string field with colId %d is too long, truncated from %d to %d bytes", + TD_VID(pVnode), pCol->colId, len, (int32_t)(pCol->bytes - VARSTR_HEADER_SIZE)); + len = pCol->bytes - VARSTR_HEADER_SIZE; + } + SValue sv = {0}; + sv.type = pCol->type; + sv.nData = len; + sv.pData = str; + SColVal cv = COL_VAL_VALUE(pCol->colId, sv); + if (NULL == taosArrayPush(pVals, &cv)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + //} + break; + } + case TSDB_DATA_TYPE_BLOB: + case TSDB_DATA_TYPE_MEDIUMBLOB: + case TSDB_DATA_TYPE_JSON: { + vError("the column type %" PRIi16 " is defined but not implemented yet", pCol->type); + code = TSDB_CODE_APP_ERROR; + TAOS_CHECK_GOTO(code, &lino, _exit); + break; + } + default: { + /* + if (colDataIsNull_s(pColInfoData, j)) { + if (PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId) { + qError("Primary timestamp column should not be null"); + terrno = TSDB_CODE_PAR_INCORRECT_TIMESTAMP_VAL; + goto _end; + } + + SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); // should use pCol->type + if (NULL == taosArrayPush(pVals, &cv)) { + goto _end; + } + } else { + */ + SValue sv = {0}; + sv.type = pCol->type; + valueSetDatum(&sv, sv.type, data, pCol->bytes); + SColVal cv = COL_VAL_VALUE(pCol->colId, sv); + if (NULL == taosArrayPush(pVals, &cv)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + //} + break; + } + } + } + + SRowBuildScanInfo sinfo = {0}; + if ((code = tRowBuild(pVals, pSchema, &pRow, &sinfo)) < 0) { + TAOS_CHECK_GOTO(code, &lino, _exit); + } + if (NULL == taosArrayPush(aRows, &pRow)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + taosArrayDestroy(pVals); + pVals = NULL; + +_exit: + if (code != 0) { + vError("vgId:%d, failed to prepare row at:%d since %s", TD_VID(pVnode), lino, tstrerror(code)); + taosArrayDestroy(pVals); + tRowDestroy(pRow); + taosArrayDestroy(aRows); + terrno = code; + aRows = NULL; + } + return aRows; +} + +static SArray *vnodePrepareSubmitTb(SVnode *pVnode, SAuditRecord *record, STSchema *pSchema, SSchemaWrapper *pTagSchema, + int64_t suid) { + int32_t code = 0; + int32_t lino = 0; + + SArray *aSubmitTbData = NULL; + SSubmitTbData tbData = {0}; + + if (!(aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + char tbName[TSDB_TABLE_NAME_LEN] = {0}; + (void)tsnprintf(tbName, TSDB_TABLE_NAME_LEN, "t_operations_%s", record->strClusterId); + + SMetaReader merTb = {0}; + metaReaderDoInit(&merTb, pVnode->pMeta, META_READER_LOCK); + if ((code = metaGetTableEntryByName(&merTb, tbName)) == 0) { + vTrace("vgId:%d, get table entry, table:%s uid:%" PRId64 ", suid:% " PRId64 ", version:%" PRId64 ", api:%p", + TD_VID(pVnode), tbName, merTb.me.uid, merTb.me.ctbEntry.suid, merTb.me.version, merTb.pAPI); + tbData.uid = merTb.me.uid; + metaReaderClear(&merTb); + } else if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + vTrace("vgId:%d, %s table not exist", TD_VID(pVnode), tbName); + + tbData.uid = tGenIdPI64(); + tbData.flags |= SUBMIT_REQ_AUTO_CREATE_TABLE; + + tbData.pCreateTbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); + + if (tbData.pCreateTbReq == NULL) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + tbData.pCreateTbReq->uid = tbData.uid; + tbData.pCreateTbReq->btime = taosGetTimestampMs(); + + code = vnodePrepareCreateTb(tbData.pCreateTbReq, tbName, suid, pTagSchema, record); + + metaReaderClear(&merTb); + TAOS_CHECK_GOTO(code, &lino, _exit); + } else { + metaReaderClear(&merTb); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + tbData.suid = suid; + tbData.sver = pSchema->version; + + tbData.aRowP = vnodePrepareRow(pVnode, pSchema, record); + if (tbData.aRowP == NULL) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + if (NULL == taosArrayPush(aSubmitTbData, &tbData)) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + +_exit: + if (code != 0) { + vError("vgId:%d, failed to prepare submitTb at:%d since %s", TD_VID(pVnode), lino, tstrerror(code)); + tDestroySubmitTbData(&tbData, TSDB_MSG_FLG_ENCODE); + if (aSubmitTbData != NULL) { + taosArrayDestroy(aSubmitTbData); + } + terrno = code; + aSubmitTbData = NULL; + } + return aSubmitTbData; +} + +static int32_t vnodeSaveOneAuditRecord(SVnode *pVnode, int64_t ver, SJson *pJson, SSchemaWrapper *pTagSchema, + int64_t suid, STSchema *pSchema, SRpcMsg *pOriginalMsg) { + int32_t code = 0; + int32_t lino = 0; + terrno = 0; + + SAuditRecord record = {0}; + SSubmitReq2 *pSubmitReq = NULL; + SSubmitRsp2 *pSubmitRsp = &(SSubmitRsp2){0}; + + TAOS_CHECK_GOTO(vnodeDecodeAuditRecord(pJson, &record), &lino, _exit); + + vTrace("vgId:%d, start to audit operation:%s", TD_VID(pVnode), record.operation); + + if (!(pSubmitReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + pSubmitReq->aSubmitTbData = vnodePrepareSubmitTb(pVnode, &record, pSchema, pTagSchema, suid); + if (pSubmitReq->aSubmitTbData == NULL) { + code = terrno; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + vTrace("vgId:%d, going to create table", TD_VID(pVnode)); + code = vnodeHandleAutoCreateTable(pVnode, ver, pSubmitReq, pSubmitRsp); + if (code) { + vError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pVnode), __func__, __FILE__, __LINE__, + tstrerror(code), ver); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + vTrace("vgId:%d, going to data write", TD_VID(pVnode)); + code = vnodeHandleDataWrite(pVnode, ver, pSubmitReq, pSubmitRsp); + if (code) { + vError("vgId:%d, %s failed at %s:%d since %s, version:%" PRId64, TD_VID(pVnode), __func__, __FILE__, __LINE__, + tstrerror(code), ver); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + vTrace("vgId:%d, affectedRow:%d", TD_VID(pVnode), pSubmitRsp->affectedRows); + +_exit: + if (code != 0) + vError("vgId:%d, failed to save one AuditRecord at line:%d, since %s", TD_VID(pVnode), lino, tstrerror(code)); + + // update statistics + (void)atomic_add_fetch_64(&pVnode->statis.nInsert, pSubmitRsp->affectedRows); + (void)atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); + if (code == 0) { + (void)atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows); + (void)atomic_add_fetch_64(&pVnode->statis.nBatchInsertSuccess, 1); + } + + // update metrics + METRICS_UPDATE(pVnode->writeMetrics.total_requests, METRIC_LEVEL_LOW, 1); + METRICS_UPDATE(pVnode->writeMetrics.total_rows, METRIC_LEVEL_HIGH, pSubmitRsp->affectedRows); + // METRICS_UPDATE(pVnode->writeMetrics.total_bytes, METRIC_LEVEL_LOW, pMsg->header.contLen); + + if (code == 0 && tsEnableMonitor && tsMonitorFqdn[0] != 0 && tsMonitorPort != 0 && pSubmitRsp->affectedRows > 0 && + strlen(RPC_MSG_USER(pOriginalMsg)) > 0 && tsInsertCounter != NULL) { + const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, + pVnode->monitor.strClusterId, + pVnode->monitor.strDnodeId, + tsLocalEp, + pVnode->monitor.strVgId, + RPC_MSG_USER(pOriginalMsg), + "Success"}; + int tv = taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels); + if (tv != 0) vError("vgId:%d, failed to taos counter add since return is %d", TD_VID(pVnode), tv); + } + + // clear + if (pSubmitReq != NULL) { + tDestroySubmitReq(pSubmitReq, TSDB_MSG_FLG_ENCODE); + taosMemoryFree(pSubmitReq); + } + if (pSubmitRsp != NULL) { + tDestroySSubmitRsp2(pSubmitRsp, TSDB_MSG_FLG_ENCODE); + } + + return code; +} + +static int32_t vnodeProcessAuditRecordReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pOriginalMsg) { + int32_t code = 0; + int32_t lino = 0; + terrno = 0; + + SVAuditRecordReq req = {0}; + if (tDeserializeSVAuditRecordReq(pReq, len, &req) != 0) { + vError("vgId:%d, failed to deserialize SVAuditRecordReq", TD_VID(pVnode)); + code = TSDB_CODE_INVALID_MSG; + return code; + } + + vTrace("vgId:%d, start to process AuditRecord Req", TD_VID(pVnode)); + + SJson *pJson = NULL; + STSchema *pSchema = NULL; + SSchemaWrapper *pTagSchema = NULL; + int64_t suid = 0; + + pJson = tjsonParse(req.data); + if (pJson == NULL) { + code = TSDB_CODE_INVALID_JSON_FORMAT; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + + SMetaReader merStb = {0}; + metaReaderDoInit(&merStb, pVnode->pMeta, META_READER_LOCK); + code = metaGetTableEntryByName(&merStb, AUDIT_STABLE_NAME); + if (code != 0) { + metaReaderClear(&merStb); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + vTrace("vgId:%d, get audit stable entry, uid:%" PRId64 ", suid:% " PRId64 ", version:%" PRId64 ", api:%p", + TD_VID(pVnode), merStb.me.uid, merStb.me.ctbEntry.suid, merStb.me.version, merStb.pAPI); + + code = vnodeGetTableSchema(pVnode, merStb.me.uid, &pSchema, &suid, &pTagSchema); + if (code != 0) { + metaReaderClear(&merStb); + TAOS_CHECK_GOTO(code, &lino, _exit); + } + vTrace("vgId:%d, get audit stable schema, version:%d, suid:%" PRId64, TD_VID(pVnode), pSchema->version, suid); + metaReaderClear(&merStb); + // pSchema pTagSchema can be cached in the future + + SJson *pRecords = tjsonGetObjectItem(pJson, "records"); + if (pRecords == NULL) { + TAOS_CHECK_GOTO(vnodeSaveOneAuditRecord(pVnode, ver, pJson, pTagSchema, suid, pSchema, pOriginalMsg), &lino, _exit); + } else { + int32_t size = tjsonGetArraySize(pRecords); + vTrace("%d items in records", size); + for (int32_t i = 0; i < size; ++i) { + SJson *pRecord = tjsonGetArrayItem(pRecords, i); + if (pRecord == NULL) { + vError("vgId:%d, failed to get array item %d", TD_VID(pVnode), i); + code = TSDB_CODE_INVALID_JSON_FORMAT; + TAOS_CHECK_GOTO(code, &lino, _exit); + } + TAOS_CHECK_GOTO(vnodeSaveOneAuditRecord(pVnode, ver, pRecord, pTagSchema, suid, pSchema, pOriginalMsg), &lino, + _exit); + } + } + +_exit: + if (code != 0) + vError("vgId:%d, failed to process AuditRecordReq at line:%d, since %s", TD_VID(pVnode), lino, tstrerror(code)); + + // clear + if (pSchema) taosMemoryFree(pSchema); + if (pTagSchema) { + taosMemoryFreeClear(pTagSchema->pSchema); + taosMemoryFree(pTagSchema); + } + if (pJson != NULL) { + cJSON_Delete(pJson); + pJson = NULL; + } + tFreeSVAuditRecordReq(&req); + + return code; +} + static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, SRpcMsg *pOriginalMsg) { int32_t code = 0; @@ -2756,6 +3289,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in RPC_MSG_USER(pOriginalMsg), "Success"}; int tv = taos_counter_add(tsInsertCounter, pSubmitRsp->affectedRows, sample_labels); + if (tv != 0) vError("vgId:%d, failed to taos counter add since return is %d", TD_VID(pVnode), tv); } if (code == 0) { diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index 7e269747fe18..400696f4dbf6 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -27,6 +27,8 @@ typedef struct { TdThreadRwlock infoLock; char auditDB[TSDB_DB_FNAME_LEN]; char auditToken[TSDB_TOKEN_LEN]; + SEpSet auditEpSet; + int32_t auditVgId; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 1355bd3ee77e..7a7375e35c2f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -1087,6 +1087,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_MUST_WALFORCE, "Audit database wal_ TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_MUST_KEEPFORCE, "Audit database keep2 must be greater than 1825d") TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_DB_ALREADY_EXIST, "Audit database already exist") TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_DB_NOT_ALLOW_CHANGE, "Audit database is not allowed to change") +TAOS_DEFINE_ERROR(TSDB_CODE_AUDIT_DB_NOT_MULTI_VGROUP, "Audit database is not allowed to keep multiple vgroups") // VTABLE TAOS_DEFINE_ERROR(TSDB_CODE_VTABLE_SCAN_INTERNAL_ERROR, "Virtual table scan internal error") diff --git a/source/util/src/tjson.c b/source/util/src/tjson.c index 8570aa6a8c62..ccb9219fa50c 100644 --- a/source/util/src/tjson.c +++ b/source/util/src/tjson.c @@ -216,10 +216,18 @@ int32_t tjsonGetStringValue2(const SJson* pJson, const char* pName, char* pVal, if (len >= maxLen - 1) { return TSDB_CODE_OUT_OF_MEMORY; } - strcpy(pVal, p); + if (pVal == NULL) { + return TSDB_CODE_INVALID_PARA; + } else { + strcpy(pVal, p); + } return TSDB_CODE_SUCCESS; } +const char* tjsonGetStringPointer(const SJson* pJson, const char* pName) { + return cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); +} + int32_t tjsonDupStringValue(const SJson* pJson, const char* pName, char** pVal) { char* p = cJSON_GetStringValue(tjsonGetObjectItem((cJSON*)pJson, pName)); if (NULL == p) { diff --git a/test/cases/80-Components/01-Taosd/test_com_taosd_audit.py b/test/cases/80-Components/01-Taosd/test_com_taosd_audit.py index 97732132b5e2..0b630a227598 100644 --- a/test/cases/80-Components/01-Taosd/test_com_taosd_audit.py +++ b/test/cases/80-Components/01-Taosd/test_com_taosd_audit.py @@ -152,7 +152,7 @@ class TestTaosdAudit: } - print ("===================: ", updatecfgDict) + tdLog.info("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) @@ -163,7 +163,7 @@ def init(self, conn, logSql, replicaVar=1): def test_taosd_audit(self): """Taosd telemetry audit - 1. Create database with vgroups 4 + 1. Create database with vgroups 1 2. Create super table and table 3. Insert data into table 4. Delete data from table @@ -186,7 +186,7 @@ def test_taosd_audit(self): # time.sleep(2) tdLog.info("create audit database") - sql = "create database audit is_audit 1 wal_level 2 ENCRYPT_ALGORITHM 'SM4-CBC';" + sql = "create database audit is_audit 1 wal_level 2 vgroups 1 ENCRYPT_ALGORITHM 'SM4-CBC';" tdSql.query(sql) tdLog.info("create user audit pass '123456Ab@' sysinfo 0;") diff --git a/test/cases/80-Components/01-Taosd/test_com_taosd_self_audit.py b/test/cases/80-Components/01-Taosd/test_com_taosd_self_audit.py new file mode 100644 index 000000000000..ec875c83006e --- /dev/null +++ b/test/cases/80-Components/01-Taosd/test_com_taosd_self_audit.py @@ -0,0 +1,158 @@ +from new_test_framework.utils import tdLog, tdSql, tdDnodes, cluster +import time +import platform + +serverPort = '6030' +hostname = "localhost" #socket.gethostname() + +class TestTaosdSelfAudit: + global hostname + global serverPort + if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""): + try: + config = eval(tdDnodes.dnodes[0].remoteIP ) + hostname = config["host"] + except Exception: + hostname = tdDnodes.dnodes[0].remoteIP + clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + clientCfgDict["serverPort"] = serverPort + clientCfgDict["firstEp"] = hostname + ':' + serverPort + clientCfgDict["secondEp"] = hostname + ':' + serverPort + clientCfgDict["fqdn"] = hostname + + updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'debugFlag':'131', 'fqdn':''} + updatecfgDict["clientCfg"] = clientCfgDict + updatecfgDict["serverPort"] = serverPort + updatecfgDict["firstEp"] = hostname + ':' + serverPort + updatecfgDict["secondEp"] = hostname + ':' + serverPort + updatecfgDict["fqdn"] = hostname + + updatecfgDict["debugFlag"] = '131' + updatecfgDict["vdebugFlag"] = '143' + updatecfgDict["ddebugFlag"] = '131' + updatecfgDict["mdebugFlag"] = '131' + updatecfgDict["rpcdebugFlag"] = '131' + updatecfgDict["qDebugFlag"] = '131' + updatecfgDict["smaDebugFlag"] = '131' + updatecfgDict["stDebugFlag"] = '131' + + updatecfgDict["audit"] = '1' + updatecfgDict["monitorFqdn"] = hostname + updatecfgDict["auditLevel"] = '5' + updatecfgDict["auditHttps"] = '0' + updatecfgDict["auditSaveInSelf"] = '1' + updatecfgDict["auditUseToken"] = '0' + + + encryptConfig = { + "svrKey": "sdfsadfasdfasfas", + "dbKey": "sdfsadfasdfasfas", + "dataKey": "sdfsadfasdfasfas", + "generateConfig": True, + "generateMeta": True, + "generateData": True + + } + + print ("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), logSql) + self.dnodes = cluster.dnodes + + def test_taosd_self_audit(self): + """Taosd self audit + + 1. Create database with vgroups 1 + 2. Create super table and table + 3. Insert data into table + 4. Delete data from table + 5. Check info content valid + + Since: v3.4.0.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-1-15 dmchen init + + """ + tdSql.prepare() + # time.sleep(2) + + tdLog.info("create audit database") + sql = "create database audit is_audit 1 wal_level 2 vgroups 1 ENCRYPT_ALGORITHM 'SM4-CBC' PRECISION 'ns';" + tdSql.execute(sql, queryTimes = 1) + + time.sleep(3) + + # 1 create user + tdLog.info("create user") + sql = "create user test pass '123456Ab@' sysinfo 0;" + tdSql.execute(sql, queryTimes = 1) + + tdLog.info("check create user") + sql = "select * from audit.operations where operation = 'createUser' and resource = 'test';" + tdSql.query(sql) + tdSql.checkRows(1) + + + # 2 create stb + tdLog.info("create stb") + sql = "create table db.stb (ts timestamp, f int) tags (t int)" + tdSql.execute(sql, queryTimes = 1) + + tdLog.info("check create stb") + sql = "select * from audit.operations where operation = 'createStb' and resource = 'stb';" + tdSql.query(sql) + tdSql.checkRows(1) + + + # 3 create tb, insert tb, select stb + tdLog.info("create tb") + sql = "create table db.tb using db.stb tags (1)" + tdSql.execute(sql, queryTimes = 1) + + tdLog.info("insert tb") + sql = "INSERT INTO db.tb USING db.stb TAGS (1) VALUES (NOW, 2);" + tdSql.execute(sql, queryTimes = 1) + + tdLog.info("select stb") + sql = "SELECT * FROM db.stb;" + tdSql.query(sql) + + time.sleep(6) + + tdLog.info("check create tb") + sql = "select * from audit.operations where operation = 'createTable';" + tdSql.query(sql) + tdSql.checkRows(1) + + tdLog.info("check insert tb") + sql = "select * from audit.operations where operation = 'insert' and resource = 'tb';" + tdSql.query(sql) + tdSql.checkRows(1) + + tdLog.info("check select stb") + sql = "select * from audit.operations where operation = 'select' and resource = 'stb';" + tdSql.query(sql) + tdSql.checkRows(1) + + # 4 delete tb + tdLog.info("delete tb") + sql = "delete from db.tb" + tdSql.execute(sql, queryTimes = 1) + + time.sleep(6) + + tdLog.info("check delete tb") + sql = "select * from audit.operations where operation = 'delete' and resource = 'tb';" + tdSql.query(sql) + tdSql.checkRows(1) + + tdLog.success(f"{__file__} successfully executed") + diff --git a/test/ci/cases.task b/test/ci/cases.task index e598f9823deb..c34ba3504c04 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -1004,6 +1004,7 @@ ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_config_refresh.py -N 3 -M 3 ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_persisit_config.py ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_taosd_audit.py +,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_taosd_self_audit.py ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_taosd_monitor.py ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_taosd_restart.py ,,y,.,./ci/pytest.sh pytest cases/80-Components/01-Taosd/test_com_telemetry.py