diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e64c0b09..c2dec3ce 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -63,7 +63,7 @@ jobs: with: node-version: ${{ matrix.node-version }} - - name: Test nodejs websocket + - name: Run tests working-directory: nodejs-connector/nodejs env: TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }} diff --git a/.github/workflows/compatibility.yml b/.github/workflows/compatibility.yml index fd70d4f0..e60d8893 100644 --- a/.github/workflows/compatibility.yml +++ b/.github/workflows/compatibility.yml @@ -1,4 +1,4 @@ -name: nodejs Compatibility +name: Node.js Compatibility on: push: diff --git a/.github/workflows/enterprise.yml b/.github/workflows/enterprise.yml new file mode 100644 index 00000000..32328ebf --- /dev/null +++ b/.github/workflows/enterprise.yml @@ -0,0 +1,59 @@ +name: Node.js Enterprise + +on: + push: + branches: + - "main" + - "3.0" + pull_request: + branches: + - "main" + - "3.0" + +jobs: + build: + runs-on: ubuntu-22.04 + strategy: + matrix: + node-version: [16.x, 20.x] + steps: + - name: Get TDengine + run: wget "${{ secrets.NIGHTLY_TDENGINE_ENTERPRISE_BASE_URL }}/tsdb-nightly-3.0.tar.gz?v=$(date +%s)" -O tsdb-nightly-3.0.tar.gz + + - name: Install TDengine + run: | + tar -zxf tsdb-nightly-3.0.tar.gz + rm -rf tsdb-nightly-3.0.tar.gz + cd tsdb-nightly-3.0 + sudo ./install.sh + + - name: Start TDengine + run: | + sudo mkdir -p /etc/taos + sudo mkdir -p /var/log/taos + nohup sudo taosd & + nohup sudo taosadapter & + + - name: Checkout + uses: actions/checkout@v4 + with: + path: "nodejs-connector" + clean: true + set-safe-directory: true + + - name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v3 + with: + node-version: ${{ matrix.node-version }} + + - name: Run tests + working-directory: nodejs-connector/nodejs + env: + TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }} + TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }} + TEST_ENTERPRISE: true + run: | + npm install + npm list + npm run example + npm run test diff --git a/nodejs/src/client/wsClient.ts b/nodejs/src/client/wsClient.ts index 743ee1e0..fd19cda1 100644 --- a/nodejs/src/client/wsClient.ts +++ b/nodejs/src/client/wsClient.ts @@ -10,7 +10,7 @@ import { import { WSVersionResponse, WSQueryResponse } from "./wsResponse"; import { ReqId } from "../common/reqid"; import logger from "../common/log"; -import { safeDecodeURIComponent, compareVersions, maskPasswordForLog } from "../common/utils"; +import { safeDecodeURIComponent, compareVersions, maskSensitiveForLog, maskUrlForLog } from "../common/utils"; import { w3cwebsocket } from "websocket"; import { TSDB_OPTION_CONNECTION } from "../common/constant"; @@ -21,16 +21,19 @@ export class WsClient { private readonly _url: URL; private static readonly _minVersion = "3.3.2.0"; private _version?: string | undefined | null; + private _bearerToken?: string | undefined | null; constructor(url: URL, timeout?: number | undefined | null) { this.checkURL(url); this._url = url; this._timeout = timeout; if (this._url.searchParams.has("timezone")) { - this._timezone = - this._url.searchParams.get("timezone") || undefined; + this._timezone = this._url.searchParams.get("timezone") || undefined; this._url.searchParams.delete("timezone"); } + if (this._url.searchParams.has("bearer_token")) { + this._bearerToken = this._url.searchParams.get("bearer_token") || undefined; + } } async connect(database?: string | undefined | null): Promise { @@ -42,11 +45,12 @@ export class WsClient { password: safeDecodeURIComponent(this._url.password), db: database, ...(this._timezone && { tz: this._timezone }), + ...(this._bearerToken && { bearer_token: this._bearerToken }), }, }; if (logger.isDebugEnabled()) { logger.debug("[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg, (key, value) => - key === "password" ? "[REDACTED]" : value + (key === "password" || key === "bearer_token") ? "[REDACTED]" : value )); } this._wsConnector = await WebSocketConnectionPool.instance().getConnection( @@ -58,9 +62,7 @@ export class WsClient { } try { await this._wsConnector.ready(); - let result: any = await this._wsConnector.sendMsg( - JSON.stringify(connMsg) - ); + let result: any = await this._wsConnector.sendMsg(JSON.stringify(connMsg)); if (result.msg.code == 0) { return; } @@ -68,24 +70,17 @@ export class WsClient { throw new WebSocketQueryError(result.msg.code, result.msg.message); } catch (e: any) { await this.close(); - logger.error( - `connection creation failed, url: ${this._url}, code:${e.code}, msg:${e.message}` - ); + const maskedUrl = maskUrlForLog(this._url); + logger.error(`connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}`); throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${this._url}, code:${e.code}, msg:${e.message}` + `connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}` ); } } - async setOptionConnection( - option: TSDB_OPTION_CONNECTION, - value: string | null - ): Promise { - logger.debug( - "[wsClient.setOptionConnection]===>" + option + ", " + value - ); - + async setOptionConnection(option: TSDB_OPTION_CONNECTION, value: string | null): Promise { + logger.debug("[wsClient.setOptionConnection]===>" + option + ", " + value); let connMsg = { action: "options_connection", args: { @@ -125,7 +120,7 @@ export class WsClient { async exec(queryMsg: string, bSqlQuery: boolean = true): Promise { return new Promise((resolve, reject) => { if (logger.isDebugEnabled()) { - logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskPasswordForLog(queryMsg)); + logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskSensitiveForLog(queryMsg)); } if ( this._wsConnector && @@ -221,27 +216,25 @@ export class WsClient { async ready(): Promise { try { - this._wsConnector = - await WebSocketConnectionPool.instance().getConnection( - this._url, - this._timeout - ); + this._wsConnector = await WebSocketConnectionPool.instance().getConnection( + this._url, + this._timeout + ); if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) { await this._wsConnector.ready(); } - logger.debug( - "ready status ", - this._url, - this._wsConnector.readyState() - ); + if (logger.isDebugEnabled()) { + logger.debug("ready status ", maskUrlForLog(this._url), this._wsConnector.readyState()); + } return; } catch (e: any) { + const maskedUrl = maskUrlForLog(this._url); logger.error( - `connection creation failed, url: ${this._url}, code: ${e.code}, message: ${e.message}` + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${this._url}, code: ${e.code}, message: ${e.message}` + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); } } @@ -321,23 +314,19 @@ export class WsClient { if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) { await this._wsConnector.ready(); } - let result: any = await this._wsConnector.sendMsg( - JSONBig.stringify(versionMsg) - ); + let result: any = await this._wsConnector.sendMsg(JSONBig.stringify(versionMsg)); if (result.msg.code == 0) { return new WSVersionResponse(result).version; } - throw new WebSocketInterfaceError( - result.msg.code, - result.msg.message - ); + throw new WebSocketInterfaceError(result.msg.code, result.msg.message); } catch (e: any) { + const maskedUrl = maskUrlForLog(this._url); logger.error( - `connection creation failed, url: ${this._url}, code: ${e.code}, message: ${e.message}` + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${this._url}, code: ${e.code}, message: ${e.message}` + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); } } @@ -354,12 +343,12 @@ export class WsClient { } checkURL(url: URL) { - // Assert is cloud url - if (!url.searchParams.has("token")) { + // Assert token or bearer_token exists, otherwise username and password must exist. + if (!url.searchParams.get("token") && !url.searchParams.get("bearer_token")) { if (!(url.username || url.password)) { throw new WebSocketInterfaceError( ErrorCode.ERR_INVALID_AUTHENTICATION, - "invalid url, password or username needed." + `invalid url, provide non-empty "token" or "bearer_token", or provide username/password` ); } } diff --git a/nodejs/src/client/wsConnector.ts b/nodejs/src/client/wsConnector.ts index 09d38b71..eb49e8a4 100644 --- a/nodejs/src/client/wsConnector.ts +++ b/nodejs/src/client/wsConnector.ts @@ -7,7 +7,7 @@ import { import { OnMessageType, WsEventCallback } from "./wsEventCallback"; import logger from "../common/log"; import { ReqId } from "../common/reqid"; -import { maskPasswordForLog } from "../common/utils"; +import { maskSensitiveForLog, maskUrlForLog } from "../common/utils"; export class WebSocketConnector { private _wsConn: w3cwebsocket; @@ -35,13 +35,9 @@ export class WebSocketConnector { } ); this._wsConn.onerror = function (err: Error) { - logger.error( - `webSocket connection failed, url: ${this.url}, error: ${err.message}` - ); + logger.error(`webSocket connection failed, url: ${maskUrlForLog(new URL(this.url))}, error: ${err.message}`); }; - this._wsConn.onclose = this._onclose; - this._wsConn.onmessage = this._onmessage; this._wsConn._binaryType = "arraybuffer"; } else { @@ -87,9 +83,7 @@ export class WebSocketConnector { private _onmessage(event: any) { let data = event.data; - logger.debug( - "wsClient._onMessage()====" + Object.prototype.toString.call(data) - ); + logger.debug("wsClient._onMessage()====" + Object.prototype.toString.call(data)); if (Object.prototype.toString.call(data) === "[object ArrayBuffer]") { let id = new DataView(data, 26, 8).getBigUint64(0, true); WsEventCallback.instance().handleEventCallback( @@ -143,7 +137,7 @@ export class WebSocketConnector { reject( new WebSocketQueryError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `WebSocket connection is not ready,status :${this._wsConn?.readyState}` + `WebSocket connection is not ready, status: ${this._wsConn?.readyState}` ) ); } @@ -152,7 +146,7 @@ export class WebSocketConnector { async sendMsg(message: string, register: Boolean = true) { if (logger.isDebugEnabled()) { - logger.debug("[wsClient.sendMessage()]===>" + maskPasswordForLog(message)); + logger.debug("[wsClient.sendMessage()]===>" + maskSensitiveForLog(message)); } let msg = JSON.parse(message); if (msg.args.id !== undefined) { @@ -174,14 +168,14 @@ export class WebSocketConnector { ); } if (logger.isDebugEnabled()) { - logger.debug("[wsClient.sendMessage.msg]===>" + maskPasswordForLog(message)); + logger.debug("[wsClient.sendMessage.msg]===>" + maskSensitiveForLog(message)); } this._wsConn.send(message); } else { reject( new WebSocketQueryError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `WebSocket connection is not ready,status :${this._wsConn?.readyState}` + `WebSocket connection is not ready, status: ${this._wsConn?.readyState}` ) ); } @@ -219,7 +213,7 @@ export class WebSocketConnector { reject( new WebSocketQueryError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `WebSocket connection is not ready,status :${this._wsConn?.readyState}` + `WebSocket connection is not ready, status: ${this._wsConn?.readyState}` ) ); } diff --git a/nodejs/src/client/wsConnectorPool.ts b/nodejs/src/client/wsConnectorPool.ts index 549916e0..c87586c9 100644 --- a/nodejs/src/client/wsConnectorPool.ts +++ b/nodejs/src/client/wsConnectorPool.ts @@ -3,6 +3,7 @@ import { WebSocketConnector } from "./wsConnector"; import { ErrorCode, TDWebSocketClientError } from "../common/wsError"; import logger from "../common/log"; import { w3cwebsocket } from "websocket"; +import { maskUrlForLog } from "../common/utils"; const mutex = new Mutex(); @@ -16,25 +17,18 @@ export class WebSocketConnectionPool { private constructor(maxConnections: number = -1) { this._maxConnections = maxConnections; WebSocketConnectionPool.sharedBuffer = new SharedArrayBuffer(4); - WebSocketConnectionPool.sharedArray = new Int32Array( - WebSocketConnectionPool.sharedBuffer - ); + WebSocketConnectionPool.sharedArray = new Int32Array(WebSocketConnectionPool.sharedBuffer); Atomics.store(WebSocketConnectionPool.sharedArray, 0, 0); } - public static instance( - maxConnections: number = -1 - ): WebSocketConnectionPool { + public static instance(maxConnections: number = -1): WebSocketConnectionPool { if (!WebSocketConnectionPool._instance) { WebSocketConnectionPool._instance = new WebSocketConnectionPool(maxConnections); } return WebSocketConnectionPool._instance; } - async getConnection( - url: URL, - timeout: number | undefined | null - ): Promise { + async getConnection(url: URL, timeout: number | undefined | null): Promise { let connectAddr = url.origin.concat(url.pathname).concat(url.search); let connector: WebSocketConnector | undefined; const unlock = await mutex.acquire(); @@ -46,18 +40,13 @@ export class WebSocketConnectionPool { if (!candidate) { continue; } - if ( - candidate && - candidate.readyState() === w3cwebsocket.OPEN - ) { + if (candidate && candidate.readyState() === w3cwebsocket.OPEN) { connector = candidate; break; } else if (candidate) { Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1); candidate.close(); - logger.error( - `getConnection, current connection status fail, url: ${connectAddr}` - ); + logger.error(`getConnection, current connection status fail, url: ${maskUrlForLog(new URL(connectAddr))}`); } } } @@ -72,8 +61,7 @@ export class WebSocketConnectionPool { if ( this._maxConnections != -1 && - Atomics.load(WebSocketConnectionPool.sharedArray, 0) > - this._maxConnections + Atomics.load(WebSocketConnectionPool.sharedArray, 0) > this._maxConnections ) { throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_ARRIVED_LIMIT, @@ -102,9 +90,7 @@ export class WebSocketConnectionPool { try { if (connector.readyState() === w3cwebsocket.OPEN) { let url = connector.getWsURL(); - let connectAddr = url.origin - .concat(url.pathname) - .concat(url.search); + let connectAddr = url.origin.concat(url.pathname).concat(url.search); let connectors = this.pool.get(connectAddr); if (!connectors) { connectors = new Array(); @@ -113,10 +99,7 @@ export class WebSocketConnectionPool { } else { connectors.push(connector); } - logger.info( - "releaseConnection, current connection count:" + - connectors.length - ); + logger.info("releaseConnection, current connection count:" + connectors.length); } else { Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1); connector.close(); @@ -159,19 +142,13 @@ process.on("exit", (code) => { }); process.on("SIGINT", () => { - logger.info( - "Received SIGINT. Press Control-D to exit, begin destroy connect..." - ); + logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect..."); WebSocketConnectionPool.instance().destroyed(); process.exit(); }); process.on("SIGTERM", () => { - logger.info( - "Received SIGINT. Press Control-D to exit, begin destroy connect" - ); + logger.info("Received SIGTERM. Press Control-D to exit, begin destroy connect..."); WebSocketConnectionPool.instance().destroyed(); process.exit(); }); - -// process.kill(process.pid, 'SIGINT'); diff --git a/nodejs/src/client/wsEventCallback.ts b/nodejs/src/client/wsEventCallback.ts index 860513e6..a08259b4 100644 --- a/nodejs/src/client/wsEventCallback.ts +++ b/nodejs/src/client/wsEventCallback.ts @@ -29,10 +29,11 @@ export enum OnMessageType { } const eventMutex = new Mutex(); + export class WsEventCallback { private static _instance?: WsEventCallback; - private static _msgActionRegister: Map = - new Map(); + private static _msgActionRegister: Map = new Map(); + private constructor() { } public static instance(): WsEventCallback { diff --git a/nodejs/src/client/wsResponse.ts b/nodejs/src/client/wsResponse.ts index 78197460..e58892ea 100644 --- a/nodejs/src/client/wsResponse.ts +++ b/nodejs/src/client/wsResponse.ts @@ -1,7 +1,3 @@ -/** - * define ws Response type|class, for query? - */ - import { MessageResp, readVarchar } from "../common/taosResult"; export class WSVersionResponse { @@ -10,6 +6,7 @@ export class WSVersionResponse { message: string; action: string; totalTime: number; + constructor(resp: MessageResp) { this.version = resp.msg.version; this.code = resp.msg.code; @@ -41,6 +38,7 @@ export class WSQueryResponse { this.totalTime = resp.totalTime; this.initMsg(resp.msg); } + private initMsg(msg: any) { this.code = msg.code; this.message = msg.message; @@ -81,6 +79,7 @@ export class WSFetchBlockResponse { finished: number | undefined; metaType: number | undefined; textDecoder: TextDecoder; + constructor(msg: ArrayBuffer) { let dataView = new DataView(msg); this.action = dataView.getBigUint64(8, true); diff --git a/nodejs/src/common/config.ts b/nodejs/src/common/config.ts index 445a9582..34ebb318 100644 --- a/nodejs/src/common/config.ts +++ b/nodejs/src/common/config.ts @@ -9,6 +9,7 @@ export class WSConfig { private _token: string | undefined | null; private _timezone: string | undefined | null; private _minStmt2Version: string; + private _bearerToken: string | undefined | null; constructor(url: string, minStmt2Version?: string) { this._url = url; @@ -75,6 +76,14 @@ export class WSConfig { this._timezone = timezone; } + public getBearerToken(): string | undefined | null { + return this._bearerToken; + } + + public setBearerToken(token: string) { + this._bearerToken = token; + } + public getMinStmt2Version() { return this._minStmt2Version; } diff --git a/nodejs/src/common/taosResult.ts b/nodejs/src/common/taosResult.ts index 117892d4..b98b0c74 100644 --- a/nodejs/src/common/taosResult.ts +++ b/nodejs/src/common/taosResult.ts @@ -12,7 +12,6 @@ import { import { appendRune } from "./ut8Helper"; import logger from "./log"; import { decimalToString } from "./utils"; -import { TMQRawDataSchema } from "../tmq/constant"; export interface TDengineMeta { name: string; diff --git a/nodejs/src/common/utils.ts b/nodejs/src/common/utils.ts index 9f3de675..c7a2e53f 100644 --- a/nodejs/src/common/utils.ts +++ b/nodejs/src/common/utils.ts @@ -1,3 +1,4 @@ +import { TmqConfig } from "../tmq/config"; import { WSConfig } from "./config"; import { ErrorCode, TDWebSocketClientError } from "./wsError"; @@ -28,6 +29,18 @@ export function getUrl(wsConfig: WSConfig): URL { wsConfig.setTimezone(url.searchParams.get("timezone") || ""); } + const bearerToken = wsConfig.getBearerToken(); + if (bearerToken) { + url.searchParams.set("bearer_token", bearerToken); + } else { + const bearerTokenFromUrl = url.searchParams.get("bearer_token"); + if (bearerTokenFromUrl) { + wsConfig.setBearerToken(bearerTokenFromUrl); + } else { + url.searchParams.delete("bearer_token"); + } + } + url.pathname = "/ws"; return url; } @@ -184,8 +197,41 @@ export function decimalToString( return decimalStr; } -const PASSWORD_FIELD_REGEX = /("password"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g; +const SENSITIVE_FIELD_REGEX = /("(?:password|bearer_token)"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g; + +export function maskSensitiveForLog(message: string): string { + return message.replace(SENSITIVE_FIELD_REGEX, '$1"[REDACTED]"'); +} + +export function maskUrlForLog(url: URL | null): string { + if (!url) { + return ""; + } + + const masked = new URL(url.toString()); + masked.password = "[REDACTED]"; + if (masked.searchParams.has("token")) { + masked.searchParams.set("token", "[REDACTED]"); + } + if (masked.searchParams.has("bearer_token")) { + masked.searchParams.set("bearer_token", "[REDACTED]"); + } + return masked.toString().replace(/%5BREDACTED%5D/g, "[REDACTED]"); +} -export function maskPasswordForLog(message: string): string { - return message.replace(PASSWORD_FIELD_REGEX, '$1"[REDACTED]"'); +export function maskTmqConfigForLog(config: TmqConfig): object { + const masked = { ...config, otherConfigs: Object.fromEntries(config.otherConfigs) }; + if (masked.url) { + masked.url = new URL(maskUrlForLog(masked.url)); + } + if (masked.sql_url) { + masked.sql_url = new URL(maskUrlForLog(masked.sql_url)); + } + if (masked.token) { + masked.token = "[REDACTED]"; + } + if (masked.password) { + masked.password = "[REDACTED]"; + } + return masked; } diff --git a/nodejs/src/sql/wsSql.ts b/nodejs/src/sql/wsSql.ts index 53295770..650e3482 100644 --- a/nodejs/src/sql/wsSql.ts +++ b/nodejs/src/sql/wsSql.ts @@ -166,17 +166,16 @@ export class WsSql { ): Promise { try { let bigintReqId = BigInt(ReqId.getReqID(reqId)); - let wsQueryResponse: WSQueryResponse = - await this._wsClient.sendBinaryMsg( + let wsQueryResponse: WSQueryResponse = await this._wsClient.sendBinaryMsg( + bigintReqId, + action, + getBinarySql( + BinaryQueryMessage, bigintReqId, - action, - getBinarySql( - BinaryQueryMessage, - bigintReqId, - BigInt(0), - sql - ) - ); + BigInt(0), + sql + ) + ); let taosResult = new TaosResult(wsQueryResponse); if (wsQueryResponse.is_update) { return taosResult; diff --git a/nodejs/src/tmq/config.ts b/nodejs/src/tmq/config.ts index 4e954076..21886ab8 100644 --- a/nodejs/src/tmq/config.ts +++ b/nodejs/src/tmq/config.ts @@ -1,11 +1,11 @@ import { TMQConstants } from "./constant"; export class TmqConfig { - // req_id: number; url: URL | null = null; sql_url: URL | null = null; user: string | null = null; password: string | null = null; + token: string | null = null; group_id: string | null = null; client_id: string | null = null; offset_rest: string | null = null; @@ -28,6 +28,9 @@ export class TmqConfig { case TMQConstants.CONNECT_PASS: this.password = value; break; + case TMQConstants.CONNECT_TOKEN: + this.token = value; + break; case TMQConstants.GROUP_ID: this.group_id = value; break; @@ -62,6 +65,16 @@ export class TmqConfig { } else { this.password = this.url.password; } + if (this.token) { + this.url.searchParams.set("bearer_token", this.token); + } else { + const bearerToken = this.url.searchParams.get("bearer_token"); + if (bearerToken) { + this.token = bearerToken; + } else { + this.url.searchParams.delete("bearer_token"); + } + } this.sql_url = new URL(this.url); this.sql_url.pathname = "/ws"; diff --git a/nodejs/src/tmq/constant.ts b/nodejs/src/tmq/constant.ts index d701ba84..399c56da 100644 --- a/nodejs/src/tmq/constant.ts +++ b/nodejs/src/tmq/constant.ts @@ -59,6 +59,11 @@ export class TMQConstants { */ public static CONNECT_PASS: string = "td.connect.pass"; + /** + * connection token + */ + public static CONNECT_TOKEN: string = "td.connect.token"; + /** * connect type websocket or jni, default is jni */ diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index 49d61a9c..824e05fe 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -18,11 +18,11 @@ import { TopicPartition, WSTmqFetchBlockInfo, WsPollResponse, - WsTmqQueryResponse, } from "./tmqResponse"; import { ReqId } from "../common/reqid"; import logger from "../common/log"; import { WSFetchBlockResponse } from "../client/wsResponse"; +import { maskTmqConfigForLog, maskUrlForLog } from "../common/utils"; export class WsConsumer { private _wsClient: WsClient; @@ -30,9 +30,12 @@ export class WsConsumer { private _topics?: string[]; private _commitTime?: number; private _lastMessageID?: bigint; + private constructor(wsConfig: Map) { this._wsConfig = new TmqConfig(wsConfig); - logger.debug(this._wsConfig); + if (logger.isDebugEnabled()) { + logger.debug(maskTmqConfigForLog(this._wsConfig)); + } if (wsConfig.size == 0 || !this._wsConfig.url) { throw new WebSocketInterfaceError( ErrorCode.ERR_INVALID_URL, @@ -60,7 +63,7 @@ export class WsConsumer { } else { throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${this._wsConfig.url}` + `connection creation failed, url: ${maskUrlForLog(this._wsConfig.url)}` ); } } catch (e: any) { @@ -99,6 +102,7 @@ export class WsConsumer { req_id: ReqId.getReqID(reqId), user: this._wsConfig.user, password: this._wsConfig.password, + ...(this._wsConfig.token && { bearer_token: this._wsConfig.token }), group_id: this._wsConfig.group_id, client_id: this._wsConfig.client_id, topics: topics, @@ -122,10 +126,7 @@ export class WsConsumer { return await this._wsClient.exec(JSON.stringify(queryMsg)); } - async poll( - timeoutMs: number, - reqId?: number - ): Promise> { + async poll(timeoutMs: number, reqId?: number): Promise> { if (this._wsConfig.auto_commit) { if (this._commitTime) { let currTime = new Date().getTime(); @@ -148,7 +149,6 @@ export class WsConsumer { req_id: ReqId.getReqID(reqId), }, }; - let resp = await this._wsClient.exec(JSON.stringify(queryMsg), false); return new SubscriptionResp(resp).topics; } @@ -166,7 +166,6 @@ export class WsConsumer { message_id: 0, }, }; - await this._wsClient.exec(JSON.stringify(queryMsg)); } @@ -205,9 +204,7 @@ export class WsConsumer { return new CommittedResp(resp).setTopicPartitions(offsets); } - async commitOffsets( - partitions: Array - ): Promise> { + async commitOffsets(partitions: Array): Promise> { if (!partitions || partitions.length == 0) { throw new TaosResultError( ErrorCode.ERR_INVALID_PARAMS, @@ -222,10 +219,7 @@ export class WsConsumer { return await this.committed(partitions); } - async commitOffset( - partition: TopicPartition, - reqId?: number - ): Promise { + async commitOffset(partition: TopicPartition, reqId?: number): Promise { if (!partition) { throw new TaosResultError( ErrorCode.ERR_INVALID_PARAMS, @@ -313,7 +307,6 @@ export class WsConsumer { "WsTmq SeekToEnd params is error!" ); } - return await this.seekToBeginOrEnd(partitions, false); } @@ -349,7 +342,6 @@ export class WsConsumer { return true; } } - return false; } diff --git a/nodejs/test/bulkPulling/sql.test.ts b/nodejs/test/bulkPulling/sql.test.ts index 8e874add..33522cb1 100644 --- a/nodejs/test/bulkPulling/sql.test.ts +++ b/nodejs/test/bulkPulling/sql.test.ts @@ -1,22 +1,23 @@ import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool"; import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; -import { Sleep, testPassword, testUsername } from "../utils"; +import { Sleep, testPassword, testUsername, testEnterprise } from "../utils"; import { setLevel } from "../../src/common/log"; -let dns = "ws://localhost:6041"; +let dsn = "ws://localhost:6041"; let password1 = "Ab1!@#$%,.:?<>;~"; let password2 = "Bc%^&*()-_+=[]{}"; setLevel("debug"); beforeAll(async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); await wsSql.exec("drop database if exists sql_test"); await wsSql.exec("drop database if exists sql_create"); - await wsSql.exec(`CREATE USER user1 PASS '${password1}'`); - await wsSql.exec(`CREATE USER user2 PASS '${password2}'`); + await wsSql.exec(`create user user1 pass '${password1}'`); + await wsSql.exec(`create user user2 pass '${password2}'`); + await wsSql.exec("create user token_user pass 'token_pass_1'"); await wsSql.exec( "create database if not exists sql_test KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;" ); @@ -33,7 +34,7 @@ describe("TDWebSocket.WsSql()", () => { test("normal connect", async () => { let wsSql = null; let conf: WSConfig = new WSConfig(""); - conf.setUrl(dns); + conf.setUrl(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); conf.setDb("sql_test"); @@ -53,7 +54,7 @@ describe("TDWebSocket.WsSql()", () => { test("special characters connect1", async () => { let wsSql = null; - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser("user1"); conf.setPwd(password1); wsSql = await WsSql.open(conf); @@ -65,7 +66,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("special characters connect2", async () => { let wsSql = null; - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser("user2"); conf.setPwd(password2); wsSql = await WsSql.open(conf); @@ -80,7 +81,7 @@ describe("TDWebSocket.WsSql()", () => { expect.assertions(1); let wsSql = null; try { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); conf.setDb("jest"); @@ -114,7 +115,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("get taosc version", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -125,7 +126,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("show databases", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -136,7 +137,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("create databases", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -149,7 +150,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("create stable", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -166,7 +167,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("insert recoder", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -186,7 +187,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("query sql", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -210,7 +211,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("query sql no getdata", async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); @@ -223,7 +224,7 @@ describe("TDWebSocket.WsSql()", () => { }); test("timestamp order check", async () => { - const conf: WSConfig = new WSConfig(dns); + const conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); @@ -265,17 +266,78 @@ describe("TDWebSocket.WsSql()", () => { await wsSql.close(); }); + + testEnterprise("connect with token", async () => { + const conf = new WSConfig(dsn); + conf.setUser(testUsername()); + conf.setPwd(testPassword()); + const wsSql = await WsSql.open(conf); + const wsRows = await wsSql.query("create token test_bearer_token from user token_user"); + await wsRows.next(); + const token = wsRows.getData()?.[0] as string; + expect(token).toBeTruthy(); + await wsRows.close(); + await wsSql.close(); + + const assertServerVersionWithConfig = async (config: WSConfig) => { + const client = await WsSql.open(config); + const rows = await client.query("select server_version()"); + await rows.next(); + const version = rows.getData()?.[0] as string; + expect(version).toBeTruthy(); + await rows.close(); + await client.close(); + }; + + const conf1 = new WSConfig(dsn); + conf1.setBearerToken(token); + await assertServerVersionWithConfig(conf1); + + const conf2 = new WSConfig("ws://localhost:6041?bearer_token=" + token); + await assertServerVersionWithConfig(conf2); + }); + + testEnterprise("connect with invalid token", async () => { + let conf = new WSConfig("ws://localhost:6041?bearer_token=invalid_token"); + await expect(WsSql.open(conf)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid token/i), + }); + + conf = new WSConfig("ws://localhost:6041"); + conf.setBearerToken("invalid_token1"); + await expect(WsSql.open(conf)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid token/i), + }); + + conf = new WSConfig("ws://localhost:6041"); + conf.setBearerToken(" "); + await expect(WsSql.open(conf)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid token/i), + }); + + conf = new WSConfig("ws://localhost:6041?bearer_token="); + await expect(WsSql.open(conf)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid url/i), + }); + + conf = new WSConfig("ws://localhost:6041"); + conf.setBearerToken(""); + await expect(WsSql.open(conf)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid url/i), + }); + }); }); afterAll(async () => { - let conf: WSConfig = new WSConfig(dns); + let conf: WSConfig = new WSConfig(dsn); conf.setUser(testUsername()); conf.setPwd(testPassword()); let wsSql = await WsSql.open(conf); await wsSql.exec("drop database sql_test"); await wsSql.exec("drop database sql_create"); - await wsSql.exec("DROP USER user1;"); - await wsSql.exec("DROP USER user2;"); + await wsSql.exec("drop user user1"); + await wsSql.exec("drop user user2"); + await wsSql.exec("drop user token_user"); await wsSql.close(); WebSocketConnectionPool.instance().destroyed(); }); diff --git a/nodejs/test/bulkPulling/tmq.config.test.ts b/nodejs/test/bulkPulling/tmq.config.test.ts new file mode 100644 index 00000000..a1f84c2d --- /dev/null +++ b/nodejs/test/bulkPulling/tmq.config.test.ts @@ -0,0 +1,84 @@ +import { TmqConfig } from "../../src/tmq/config"; +import { TMQConstants } from "../../src/tmq/constant"; +import { testPassword, testUsername } from "../utils"; + +describe("TmqConfig - td.connect.token", () => { + const baseUrl = "ws://localhost:6041"; + + test("token field is null when CONNECT_TOKEN not provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_USER, testUsername()], + [TMQConstants.CONNECT_PASS, testPassword()], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.token).toBeNull(); + }); + + test("token field is set when CONNECT_TOKEN is provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_TOKEN, "mytoken123"], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.token).toBe("mytoken123"); + }); + + test("bearer_token is appended to url search params when token is provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_TOKEN, "mytoken123"], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.url?.searchParams.get("bearer_token")).toBe("mytoken123"); + }); + + test("bearer_token is appended to sql_url search params when token is provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_TOKEN, "mytoken123"], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.sql_url?.searchParams.get("bearer_token")).toBe("mytoken123"); + }); + + test("sql_url pathname is /ws when token is provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_TOKEN, "mytoken123"], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.sql_url?.pathname).toBe("/ws"); + }); + + test("url pathname is /rest/tmq when token is provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_TOKEN, "mytoken123"], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.url?.pathname).toBe("/rest/tmq"); + }); + + test("bearer_token not set on urls when token is not provided", () => { + const configMap = new Map([ + [TMQConstants.WS_URL, baseUrl], + [TMQConstants.CONNECT_USER, testUsername()], + [TMQConstants.CONNECT_PASS, testPassword()], + [TMQConstants.GROUP_ID, "g1"], + ]); + const cfg = new TmqConfig(configMap); + expect(cfg.url?.searchParams.has("bearer_token")).toBe(false); + expect(cfg.sql_url?.searchParams.has("bearer_token")).toBe(false); + }); + + test("CONNECT_TOKEN constant value is td.connect.token", () => { + expect(TMQConstants.CONNECT_TOKEN).toBe("td.connect.token"); + }); +}); diff --git a/nodejs/test/bulkPulling/tmq.test.ts b/nodejs/test/bulkPulling/tmq.test.ts index 447bc613..7f844ff3 100644 --- a/nodejs/test/bulkPulling/tmq.test.ts +++ b/nodejs/test/bulkPulling/tmq.test.ts @@ -2,20 +2,19 @@ import { TMQConstants } from "../../src/tmq/constant"; import { WsConsumer } from "../../src/tmq/wsTmq"; import { WSConfig } from "../../src/common/config"; import { WsSql } from "../../src/sql/wsSql"; -import { createSTable, insertStable, testPassword, testUsername } from "../utils"; +import { createSTable, insertStable, testPassword, testUsername, Sleep, testEnterprise } from "../utils"; import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool"; import { setLevel } from "../../src/common/log"; setLevel("debug"); + const stable = "st"; const db = "ws_tmq_test"; const topics: string[] = ["topic_ws_bean"]; -// const topic2 = 'topic_ws_bean_2' -// let createTopic = `create topic if not exists ${topic} as select ts, c1, c2, c3, c4, c5, t1 from ${db}.${stable}` -// let createTopic2 = `create topic if not exists ${topic2} as select ts, c1, c4, c5, t1 from ${db}.${stable}` +const tokenTopic = "topic_token_test"; + let createTopic = `create topic if not exists ${topics[0]} as select * from ${db}.${stable}`; let dropTopic = `DROP TOPIC IF EXISTS ${topics[0]};`; -// let dropTopic2 = `DROP TOPIC IF EXISTS ${topic2};` let dsn = `ws://${testUsername()}:${testPassword()}@localhost:6041`; let tmqDsn = "ws://localhost:6041"; @@ -240,17 +239,17 @@ beforeAll(async () => { let ws = await WsSql.open(conf); await ws.exec(dropTopic); - // await ws.Exec(dropTopic2); await ws.exec(dropDB); await ws.exec(createDB); await ws.exec(useDB); await ws.exec(createSTable(stable)); await ws.exec(createTopic); - // await ws.Exec(createTopic2); let insert = insertStable(tableValues, stableTags, stable); let insertRes = await ws.exec(insert); insert = insertStable(tableCNValues, stableTags, stable); insertRes = await ws.exec(insert); + await ws.exec("create user tmq_token_user pass 'token_pass_1'"); + await ws.exec(`create topic if not exists ${tokenTopic} as select * from ${db}.${stable}`); await ws.close(); }); @@ -331,7 +330,6 @@ describe("TDWebSocket.Tmq()", () => { console.log("-----===>>", record); } } - // await Sleep(100) } await consumer.seekToBeginning(assignment); @@ -350,7 +348,6 @@ describe("TDWebSocket.Tmq()", () => { counts += data.length; } - // await Sleep(100) } let topicArray = await consumer.subscription(); expect(topics.length).toEqual(topicArray.length); @@ -398,14 +395,81 @@ describe("TDWebSocket.Tmq()", () => { await consumer.unsubscribe(); await consumer.close(); }); + + testEnterprise("connect with token", async () => { + const conf = new WSConfig(dsn); + conf.setUser(testUsername()); + conf.setPwd(testPassword()); + const wsSql = await WsSql.open(conf); + const wsRows = await wsSql.query("create token test_tmq_token from user tmq_token_user"); + await wsRows.next(); + const token = wsRows.getData()?.[0] as string; + expect(token).toBeTruthy(); + await wsRows.close(); + await wsSql.close(); + + const tokenConfigMap = new Map(configMap); + tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, token); + tokenConfigMap.set(TMQConstants.GROUP_ID, "token_group"); + const consumer = await WsConsumer.newConsumer(tokenConfigMap); + await consumer.subscribe([tokenTopic]); + + let count: number = 0; + for (let i = 0; i < 5; i++) { + const res = await consumer.poll(500); + for (const [, value] of res) { + const data = value.getData(); + if (data == null || data.length == 0) { + break; + } + count += data.length; + } + } + expect(count).toEqual(10); + + await Sleep(3000); + await consumer.unsubscribe(); + await consumer.close(); + }); + + testEnterprise("connect with invalid token", async () => { + const tokenConfigMap = new Map([ + [TMQConstants.GROUP_ID, "token_group1"], + [TMQConstants.CLIENT_ID, "token_client1"], + [TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=invalid_token"], + ]); + await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid token/i), + }); + + tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041"); + tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, "invalid_token1"); + await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid token/i), + }); + + tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041?bearer_token="); + tokenConfigMap.delete(TMQConstants.CONNECT_TOKEN); + await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid url/i), + }); + + tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041"); + tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, ""); + await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({ + message: expect.stringMatching(/invalid url/i), + }); + }); }); afterAll(async () => { const dropDB = `drop database if exists ${db}`; - let conf: WSConfig = new WSConfig(dsn); - let ws = await WsSql.open(conf); + const conf = new WSConfig(dsn); + const ws = await WsSql.open(conf); await ws.exec(dropTopic); + await ws.exec(`drop topic if exists ${tokenTopic}`); await ws.exec(dropDB); + await ws.exec("drop user tmq_token_user"); await ws.close(); WebSocketConnectionPool.instance().destroyed(); }); diff --git a/nodejs/test/utils.ts b/nodejs/test/utils.ts index f785876d..2f7a258e 100644 --- a/nodejs/test/utils.ts +++ b/nodejs/test/utils.ts @@ -181,6 +181,7 @@ export const jsonMeta: Array = [ length: 4095, }, ]; + export const tagMeta: Array = [ { name: "tb", @@ -330,3 +331,5 @@ export function testUsername(): string { export function testPassword(): string { return process.env.TDENGINE_TEST_PASSWORD || "taosdata"; } + +export const testEnterprise = process.env.TEST_ENTERPRISE?.toLowerCase() === "true" ? test : test.skip;