From a0b054bbf9d3c4d97e1403740be3905ebdd56eff Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Tue, 10 Mar 2026 11:49:12 +0800 Subject: [PATCH 1/2] feat: add multi-address load balancing and failover support - Add multi-address URL parser (common/urlParser.ts) supporting IPv4/IPv6 Format: ws://user:pass@host1:port1,host2:port2,[::1]:port3?retries=5 - Add ConnectionManager (client/wsConnectionManager.ts) with: - Random host selection for initial connection - Failover with round-robin across hosts - Exponential backoff retry (retries, retry_backoff_ms, retry_backoff_max_ms) - Inflight request tracking and resend on reconnect - Enhance WebSocketConnector with onClose callback for fault detection - Refactor WsClient to use ConnectionManager for multi-address connections - Update WSConfig to carry ParsedMultiAddress - Update getUrl() to parse multi-address URLs - Update TmqConfig to support multi-address URLs - Add unit tests for URL parser and ConnectionManager Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- nodejs/src/client/wsClient.ts | 353 +++++++++++++---------- nodejs/src/client/wsConnectionManager.ts | 314 ++++++++++++++++++++ nodejs/src/client/wsConnector.ts | 43 ++- nodejs/src/common/config.ts | 12 +- nodejs/src/common/urlParser.ts | 250 ++++++++++++++++ nodejs/src/common/utils.ts | 46 ++- nodejs/src/sql/wsSql.ts | 7 +- nodejs/src/tmq/config.ts | 19 +- nodejs/src/tmq/wsTmq.ts | 7 +- nodejs/test/connectionManager.test.ts | 63 ++++ nodejs/test/urlParser.test.ts | 137 +++++++++ 11 files changed, 1081 insertions(+), 170 deletions(-) create mode 100644 nodejs/src/client/wsConnectionManager.ts create mode 100644 nodejs/src/common/urlParser.ts create mode 100644 nodejs/test/connectionManager.test.ts create mode 100644 nodejs/test/urlParser.test.ts diff --git a/nodejs/src/client/wsClient.ts b/nodejs/src/client/wsClient.ts index aee82d93..b64ed8a6 100644 --- a/nodejs/src/client/wsClient.ts +++ b/nodejs/src/client/wsClient.ts @@ -1,6 +1,7 @@ import JSONBig from "json-bigint"; import { WebSocketConnector } from "./wsConnector"; import { WebSocketConnectionPool } from "./wsConnectorPool"; +import { ConnectionManager } from "./wsConnectionManager"; import { ErrorCode, TDWebSocketClientError, @@ -11,6 +12,7 @@ import { WSVersionResponse, WSQueryResponse } from "./wsResponse"; import { ReqId } from "../common/reqid"; import logger from "../common/log"; import { safeDecodeURIComponent, compareVersions, maskSensitiveForLog, maskUrlForLog } from "../common/utils"; +import { ParsedMultiAddress, buildUrlForHost } from "../common/urlParser"; import { w3cwebsocket } from "websocket"; import { ConnectorInfo, TSDB_OPTION_CONNECTION } from "../common/constant"; @@ -22,11 +24,16 @@ export class WsClient { private static readonly _minVersion = "3.3.2.0"; private _version?: string | undefined | null; private _bearerToken?: string | undefined | null; + private _connMgr?: ConnectionManager; + private _parsedMultiAddress?: ParsedMultiAddress; + private _database?: string | null; + private _inflightIdCounter = 0; - constructor(url: URL, timeout?: number | undefined | null) { + constructor(url: URL, timeout?: number | undefined | null, parsedMultiAddress?: ParsedMultiAddress) { this.checkURL(url); this._url = url; this._timeout = timeout; + this._parsedMultiAddress = parsedMultiAddress; if (this._url.searchParams.has("timezone")) { this._timezone = this._url.searchParams.get("timezone") || undefined; this._url.searchParams.delete("timezone"); @@ -34,9 +41,90 @@ export class WsClient { if (this._url.searchParams.has("bearer_token")) { this._bearerToken = this._url.searchParams.get("bearer_token") || undefined; } + + // Initialize ConnectionManager if multi-address is provided + if (this._parsedMultiAddress && this._parsedMultiAddress.hosts.length > 0) { + this._connMgr = new ConnectionManager(this._parsedMultiAddress, this._timeout); + } + } + + private _nextInflightId(): string { + return `inflight_${++this._inflightIdCounter}`; + } + + /** + * Ensure we have an active WebSocket connector, performing failover if needed. + */ + private async _ensureConnector(): Promise { + // If ConnectionManager is available, use it + if (this._connMgr) { + if (this._connMgr.isConnected()) { + return this._connMgr.getConnector()!; + } + // Try reconnect with failover + const connector = await this._connMgr.reconnect(); + this._wsConnector = connector; + // Re-authenticate after reconnect + await this._sendConnMsg(this._database); + return connector; + } + + // Fallback: original single-URL behavior + if (this._wsConnector && this._wsConnector.readyState() === w3cwebsocket.OPEN) { + return this._wsConnector; + } + throw new TDWebSocketClientError( + ErrorCode.ERR_CONNECTION_CLOSED, + "invalid websocket connect" + ); + } + + private async _sendConnMsg(database?: string | null): Promise { + const connector = this._connMgr ? this._connMgr.getConnector()! : this._wsConnector!; + let connMsg = { + action: "conn", + args: { + req_id: ReqId.getReqID(), + user: safeDecodeURIComponent(this._url.username), + password: safeDecodeURIComponent(this._url.password), + db: database, + connector: ConnectorInfo, + ...(this._timezone && { tz: this._timezone }), + ...(this._bearerToken && { bearer_token: this._bearerToken }), + }, + }; + if (logger.isDebugEnabled()) { + logger.debug("[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg, (key, value) => + (key === "password" || key === "bearer_token") ? "[REDACTED]" : value + )); + } + let result: any = await connector.sendMsg(JSON.stringify(connMsg)); + if (result.msg.code != 0) { + throw new WebSocketQueryError(result.msg.code, result.msg.message); + } } async connect(database?: string | undefined | null): Promise { + this._database = database; + + if (this._connMgr) { + // Multi-address: use ConnectionManager + try { + this._wsConnector = await this._connMgr.connect(); + await this._sendConnMsg(database); + return; + } catch (e: any) { + await this.close(); + const maskedUrl = maskUrlForLog(this._url); + logger.error(`connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}`); + throw new TDWebSocketClientError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}` + ); + } + } + + // Original single-URL path let connMsg = { action: "conn", args: { @@ -104,58 +192,51 @@ export class WsClient { async execNoResp(queryMsg: string): Promise { logger.debug("[wsQueryInterface.query.queryMsg]===>" + queryMsg); - if ( - this._wsConnector && - this._wsConnector.readyState() === w3cwebsocket.OPEN - ) { - await this._wsConnector.sendMsgNoResp(queryMsg); - return; - } - throw new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "invalid websocket connect" - ); + const connector = await this._ensureConnector(); + await connector.sendMsgNoResp(queryMsg); } // Need to construct Response async exec(queryMsg: string, bSqlQuery: boolean = true): Promise { + if (logger.isDebugEnabled()) { + logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskSensitiveForLog(queryMsg)); + } + const connector = await this._ensureConnector(); + const inflightId = this._nextInflightId(); + return new Promise((resolve, reject) => { - if (logger.isDebugEnabled()) { - logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskSensitiveForLog(queryMsg)); - } - if ( - this._wsConnector && - this._wsConnector.readyState() === w3cwebsocket.OPEN - ) { - this._wsConnector - .sendMsg(queryMsg) - .then((e: any) => { - if (e.msg.code == 0) { - if (bSqlQuery) { - resolve(new WSQueryResponse(e)); - } else { - resolve(e); - } - } else { - reject( - new WebSocketInterfaceError( - e.msg.code, - e.msg.message - ) - ); - } - }) - .catch((e) => { - reject(e); - }); - } else { - reject( - new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "invalid websocket connect" - ) - ); + const wrappedResolve = (e: any) => { + if (this._connMgr) this._connMgr.completeRequest(inflightId); + if (e.msg.code == 0) { + if (bSqlQuery) { + resolve(new WSQueryResponse(e)); + } else { + resolve(e); + } + } else { + reject(new WebSocketInterfaceError(e.msg.code, e.msg.message)); + } + }; + const wrappedReject = (e: any) => { + if (this._connMgr) this._connMgr.completeRequest(inflightId); + reject(e); + }; + + if (this._connMgr) { + this._connMgr.trackRequest(inflightId, { + id: inflightId, + type: "text", + message: queryMsg, + resolve: wrappedResolve, + reject: wrappedReject, + register: true, + }); } + + connector + .sendMsg(queryMsg) + .then(wrappedResolve) + .catch(wrappedReject); }); } @@ -167,48 +248,55 @@ export class WsClient { bSqlQuery: boolean = true, bResultBinary: boolean = false ): Promise { + const connector = await this._ensureConnector(); + const inflightId = this._nextInflightId(); + return new Promise((resolve, reject) => { - if ( - this._wsConnector && - this._wsConnector.readyState() === w3cwebsocket.OPEN - ) { - this._wsConnector - .sendBinaryMsg(reqId, action, message) - .then((e: any) => { - if (bResultBinary) { - resolve(e); - } + const wrappedResolve = (e: any) => { + if (this._connMgr) this._connMgr.completeRequest(inflightId); + if (bResultBinary) { + resolve(e); + return; + } + if (e.msg.code == 0) { + if (bSqlQuery) { + resolve(new WSQueryResponse(e)); + } else { + resolve(e); + } + } else { + reject(new WebSocketInterfaceError(e.msg.code, e.msg.message)); + } + }; + const wrappedReject = (e: any) => { + if (this._connMgr) this._connMgr.completeRequest(inflightId); + reject(e); + }; - if (e.msg.code == 0) { - if (bSqlQuery) { - resolve(new WSQueryResponse(e)); - } else { - resolve(e); - } - } else { - reject( - new WebSocketInterfaceError( - e.msg.code, - e.msg.message - ) - ); - } - }) - .catch((e) => { - reject(e); - }); - } else { - reject( - new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "invalid websocket connect" - ) - ); + if (this._connMgr) { + this._connMgr.trackRequest(inflightId, { + id: inflightId, + type: "binary", + reqId, + action, + binaryData: message, + resolve: wrappedResolve, + reject: wrappedReject, + register: true, + }); } + + connector + .sendBinaryMsg(reqId, action, message) + .then(wrappedResolve) + .catch(wrappedReject); }); } getState() { + if (this._connMgr) { + return this._connMgr.isConnected() ? w3cwebsocket.OPEN : w3cwebsocket.CLOSED; + } if (this._wsConnector) { return this._wsConnector.readyState(); } @@ -216,6 +304,13 @@ export class WsClient { } async ready(): Promise { + if (this._connMgr) { + if (!this._connMgr.isConnected()) { + this._wsConnector = await this._connMgr.reconnect(); + } + return; + } + try { this._wsConnector = await WebSocketConnectionPool.instance().getConnection( this._url, @@ -241,27 +336,9 @@ export class WsClient { } async sendMsg(msg: string): Promise { - return new Promise((resolve, reject) => { - logger.debug("[wsQueryInterface.sendMsg]===>" + msg); - if ( - this._wsConnector && - this._wsConnector.readyState() === w3cwebsocket.OPEN - ) { - this._wsConnector - .sendMsg(msg) - .then((e: any) => { - resolve(e); - }) - .catch((e) => reject(e)); - } else { - reject( - new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "invalid websocket connect" - ) - ); - } - }); + logger.debug("[wsQueryInterface.sendMsg]===>" + msg); + const connector = await this._ensureConnector(); + return connector.sendMsg(msg); } async freeResult(res: WSQueryResponse) { @@ -272,30 +349,12 @@ export class WsClient { id: res.id, }, }; - return new Promise((resolve, reject) => { - let jsonStr = JSONBig.stringify(freeResultMsg); - logger.debug( - "[wsQueryInterface.freeResult.freeResultMsg]===>" + jsonStr - ); - if ( - this._wsConnector && - this._wsConnector.readyState() === w3cwebsocket.OPEN - ) { - this._wsConnector - .sendMsgNoResp(jsonStr) - .then((e: any) => { - resolve(e); - }) - .catch((e) => reject(e)); - } else { - reject( - new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "invalid websocket connect" - ) - ); - } - }); + let jsonStr = JSONBig.stringify(freeResultMsg); + logger.debug( + "[wsQueryInterface.freeResult.freeResultMsg]===>" + jsonStr + ); + const connector = await this._ensureConnector(); + await connector.sendMsgNoResp(jsonStr); } async version(): Promise { @@ -310,31 +369,31 @@ export class WsClient { }, }; - if (this._wsConnector) { - try { - if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) { - await this._wsConnector.ready(); - } - let result: any = await this._wsConnector.sendMsg(JSONBig.stringify(versionMsg)); - if (result.msg.code == 0) { - return new WSVersionResponse(result).version; - } - throw new WebSocketInterfaceError(result.msg.code, result.msg.message); - } catch (e: any) { - const maskedUrl = maskUrlForLog(this._url); - logger.error( - `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` - ); - throw new TDWebSocketClientError( - ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` - ); + try { + const connector = await this._ensureConnector(); + let result: any = await connector.sendMsg(JSONBig.stringify(versionMsg)); + if (result.msg.code == 0) { + return new WSVersionResponse(result).version; } + throw new WebSocketInterfaceError(result.msg.code, result.msg.message); + } catch (e: any) { + const maskedUrl = maskUrlForLog(this._url); + logger.error( + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` + ); + throw new TDWebSocketClientError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` + ); } - throw (ErrorCode.ERR_CONNECTION_CLOSED, "invalid websocket connect"); } async close(): Promise { + if (this._connMgr) { + await this._connMgr.close(); + this._wsConnector = undefined; + return; + } if (this._wsConnector) { await WebSocketConnectionPool.instance().releaseConnection( this._wsConnector diff --git a/nodejs/src/client/wsConnectionManager.ts b/nodejs/src/client/wsConnectionManager.ts new file mode 100644 index 00000000..1b4a2a86 --- /dev/null +++ b/nodejs/src/client/wsConnectionManager.ts @@ -0,0 +1,314 @@ +import { Mutex } from "async-mutex"; +import { w3cwebsocket, ICloseEvent } from "websocket"; +import { WebSocketConnector } from "./wsConnector"; +import { WebSocketConnectionPool } from "./wsConnectorPool"; +import { WsEventCallback, OnMessageType } from "./wsEventCallback"; +import { + ParsedMultiAddress, + buildUrlForHost, +} from "../common/urlParser"; +import { + ErrorCode, + TDWebSocketClientError, +} from "../common/wsError"; +import { ReqId } from "../common/reqid"; +import logger from "../common/log"; +import { maskUrlForLog } from "../common/utils"; + +export interface InflightRequest { + id: string; + type: "text" | "binary"; + // for text requests + message?: string; + // for binary requests + reqId?: bigint; + action?: string; + binaryData?: ArrayBuffer; + // callback + resolve: (args: unknown) => void; + reject: (reason: any) => void; + register: boolean; +} + +export class ConnectionManager { + private _parsed: ParsedMultiAddress; + private _currentIndex: number; + private _timeout: number; + private _connector: WebSocketConnector | null = null; + private _inflightRequests: Map = new Map(); + private _isReconnecting: boolean = false; + private _reconnectPromise: Promise | null = null; + private _reconnectMutex = new Mutex(); + private _closed: boolean = false; + + constructor(parsed: ParsedMultiAddress, timeout?: number | null) { + this._parsed = parsed; + this._currentIndex = Math.floor(Math.random() * parsed.hosts.length); + this._timeout = timeout || 5000; + } + + /** + * Establish the initial connection using a random host. + */ + async connect(): Promise { + const connector = await this._createConnector(this._currentIndex); + this._connector = connector; + this._setupCloseHandler(connector); + return connector; + } + + /** + * Get current connector, or null if disconnected. + */ + getConnector(): WebSocketConnector | null { + return this._connector; + } + + /** + * Check if the current connection is open. + */ + isConnected(): boolean { + return ( + this._connector !== null && + this._connector.readyState() === w3cwebsocket.OPEN + ); + } + + /** + * Perform failover: retry current host, then round-robin through others. + * Returns a connected WebSocketConnector or throws if all fail. + */ + async reconnect(): Promise { + const release = await this._reconnectMutex.acquire(); + try { + // If already reconnected by another caller, return existing connector + if (this.isConnected()) { + return this._connector!; + } + + if (this._closed) { + throw new TDWebSocketClientError( + ErrorCode.ERR_CONNECTION_CLOSED, + "ConnectionManager is closed" + ); + } + + this._isReconnecting = true; + const totalHosts = this._parsed.hosts.length; + + // Try each host starting from current + for (let i = 0; i < totalHosts; i++) { + const hostIndex = + (this._currentIndex + i) % totalHosts; + const connector = await this._tryConnectWithRetries(hostIndex); + if (connector) { + // Clean up old connector + if (this._connector) { + try { this._connector.close(); } catch (_) {} + } + this._connector = connector; + this._currentIndex = hostIndex; + this._setupCloseHandler(connector); + this._isReconnecting = false; + + // Resend inflight requests + await this._resendInflightRequests(connector); + return connector; + } + } + + this._isReconnecting = false; + throw new TDWebSocketClientError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `Failover failed: all ${totalHosts} hosts exhausted after retries` + ); + } finally { + release(); + } + } + + /** + * Track an inflight request for potential resend on reconnect. + */ + trackRequest(id: string, request: InflightRequest): void { + this._inflightRequests.set(id, request); + } + + /** + * Remove a completed request from inflight tracking. + */ + completeRequest(id: string): void { + this._inflightRequests.delete(id); + } + + /** + * Get inflight request count (for diagnostics). + */ + getInflightCount(): number { + return this._inflightRequests.size; + } + + /** + * Close the connection manager and underlying connector. + */ + async close(): Promise { + this._closed = true; + if (this._connector) { + await WebSocketConnectionPool.instance().releaseConnection( + this._connector + ); + this._connector = null; + } + this._inflightRequests.clear(); + } + + /** + * Get the current host URL. + */ + getCurrentUrl(): URL { + return buildUrlForHost(this._parsed, this._currentIndex); + } + + /** + * Get parsed multi-address info. + */ + getParsed(): ParsedMultiAddress { + return this._parsed; + } + + /** + * Try connecting to a specific host with exponential backoff retries. + */ + private async _tryConnectWithRetries( + hostIndex: number + ): Promise { + const retries = this._parsed.retries; + const baseBackoff = this._parsed.retryBackoffMs; + const maxBackoff = this._parsed.retryBackoffMaxMs; + const url = buildUrlForHost(this._parsed, hostIndex); + + for (let attempt = 0; attempt <= retries; attempt++) { + if (this._closed) return null; + + try { + logger.info( + `Attempting connection to ${maskUrlForLog(url)}, attempt ${attempt + 1}/${retries + 1}` + ); + const connector = await this._createConnector(hostIndex); + logger.info( + `Successfully connected to ${maskUrlForLog(url)}` + ); + return connector; + } catch (e: any) { + logger.warn( + `Connection attempt ${attempt + 1} to ${maskUrlForLog(url)} failed: ${e.message}` + ); + if (attempt < retries) { + const backoff = Math.min( + baseBackoff * Math.pow(2, attempt), + maxBackoff + ); + await this._sleep(backoff); + } + } + } + return null; + } + + /** + * Create a WebSocketConnector for a specific host and wait until it's ready. + */ + private async _createConnector( + hostIndex: number + ): Promise { + const url = buildUrlForHost(this._parsed, hostIndex); + const connector = await WebSocketConnectionPool.instance().getConnection( + url, + this._timeout + ); + if (connector.readyState() !== w3cwebsocket.OPEN) { + await connector.ready(); + } + return connector; + } + + /** + * Set up the onclose handler to detect disconnections and trigger failover. + */ + private _setupCloseHandler(connector: WebSocketConnector): void { + connector.onClose((_event: ICloseEvent) => { + if (this._closed) return; + logger.warn( + "WebSocket connection closed, failover will be triggered on next operation" + ); + }); + } + + /** + * Resend all inflight requests after a successful reconnect. + */ + private async _resendInflightRequests( + connector: WebSocketConnector + ): Promise { + if (this._inflightRequests.size === 0) return; + + logger.info( + `Resending ${this._inflightRequests.size} inflight requests after reconnect` + ); + + const requests = Array.from(this._inflightRequests.entries()); + for (const [id, req] of requests) { + try { + if (req.type === "text" && req.message) { + if (req.register) { + const msg = JSON.parse(req.message); + WsEventCallback.instance().registerCallback( + { + action: msg.action, + req_id: msg.args.req_id, + timeout: connector._timeout, + id: + msg.args.id === undefined + ? msg.args.id + : BigInt(msg.args.id), + }, + req.resolve, + req.reject + ); + } + connector.sendMsgNoResp(req.message); + } else if ( + req.type === "binary" && + req.binaryData && + req.reqId !== undefined && + req.action + ) { + if (req.register) { + WsEventCallback.instance().registerCallback( + { + action: req.action, + req_id: req.reqId, + timeout: connector._timeout, + id: req.reqId, + }, + req.resolve, + req.reject + ); + } + connector.sendBinaryMsgRaw(req.binaryData); + } + logger.debug(`Resent inflight request ${id}`); + } catch (e: any) { + logger.error( + `Failed to resend inflight request ${id}: ${e.message}` + ); + req.reject(e); + this._inflightRequests.delete(id); + } + } + } + + private _sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); + } +} diff --git a/nodejs/src/client/wsConnector.ts b/nodejs/src/client/wsConnector.ts index eb49e8a4..347877d8 100644 --- a/nodejs/src/client/wsConnector.ts +++ b/nodejs/src/client/wsConnector.ts @@ -10,9 +10,11 @@ import { ReqId } from "../common/reqid"; import { maskSensitiveForLog, maskUrlForLog } from "../common/utils"; export class WebSocketConnector { + // 底层 ws 连接 private _wsConn: w3cwebsocket; private _wsURL: URL; _timeout = 5000; + private _onCloseCallbacks: Array<(event: ICloseEvent) => void> = []; constructor(url: URL, timeout: number | undefined | null) { if (url) { @@ -37,7 +39,12 @@ export class WebSocketConnector { this._wsConn.onerror = function (err: Error) { logger.error(`webSocket connection failed, url: ${maskUrlForLog(new URL(this.url))}, error: ${err.message}`); }; - this._wsConn.onclose = this._onclose; + this._wsConn.onclose = (e: ICloseEvent) => { + this._onclose(e); + for (const cb of this._onCloseCallbacks) { + try { cb(e); } catch (_) { /* ignore callback errors */ } + } + }; this._wsConn.onmessage = this._onmessage; this._wsConn._binaryType = "arraybuffer"; } else { @@ -48,6 +55,15 @@ export class WebSocketConnector { } } + /** + * Register a callback to be invoked when the underlying WebSocket closes. + */ + onClose(callback: (event: ICloseEvent) => void): void { + this._onCloseCallbacks.push(callback); + } + + // ??? + // 连接建立成功后,发送连接成功的消息,触发后续的事件回调 async ready() { return new Promise((resolve, reject) => { let reqId = ReqId.getReqID(); @@ -107,6 +123,7 @@ export class WebSocketConnector { } } + // 关闭底层 ws 连接 close() { if (this._wsConn) { this._wsConn.close(); @@ -154,6 +171,13 @@ export class WebSocketConnector { } return new Promise((resolve, reject) => { + /* + w3c 连接有状态 + CONNECTING: number; + OPEN: number; + CLOSING: number; + CLOSED: number; + */ if (this._wsConn && this._wsConn.readyState === w3cwebsocket.OPEN) { if (register) { WsEventCallback.instance().registerCallback( @@ -172,6 +196,9 @@ export class WebSocketConnector { } this._wsConn.send(message); } else { + // 连接重建,重试机制 + // reconnect 后重新发送请求 + // 有没有 inflight 请求?如果有需要添加缓存机制,重试时重新发送 reject( new WebSocketQueryError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, @@ -220,6 +247,20 @@ export class WebSocketConnector { }); } + /** + * Send raw binary data without callback registration (used for inflight resend). + */ + sendBinaryMsgRaw(message: ArrayBuffer): void { + if (this._wsConn && this._wsConn.readyState === w3cwebsocket.OPEN) { + this._wsConn.send(message); + } else { + throw new WebSocketQueryError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `WebSocket connection is not ready, status: ${this._wsConn?.readyState}` + ); + } + } + public getWsURL(): URL { return this._wsURL; } diff --git a/nodejs/src/common/config.ts b/nodejs/src/common/config.ts index 34ebb318..deb66ed5 100644 --- a/nodejs/src/common/config.ts +++ b/nodejs/src/common/config.ts @@ -1,15 +1,17 @@ import { MinStmt2Version } from "./constant"; +import { ParsedMultiAddress } from "./urlParser"; export class WSConfig { private _user: string | undefined | null; private _password: string | undefined | null; private _db: string | undefined | null; - private _url: string; + private _url: string; // 支持多地址 private _timeout: number | undefined | null; private _token: string | undefined | null; private _timezone: string | undefined | null; private _minStmt2Version: string; private _bearerToken: string | undefined | null; + private _parsedMultiAddress: ParsedMultiAddress | undefined; constructor(url: string, minStmt2Version?: string) { this._url = url; @@ -87,4 +89,12 @@ export class WSConfig { public getMinStmt2Version() { return this._minStmt2Version; } + + public getParsedMultiAddress(): ParsedMultiAddress | undefined { + return this._parsedMultiAddress; + } + + public setParsedMultiAddress(parsed: ParsedMultiAddress) { + this._parsedMultiAddress = parsed; + } } diff --git a/nodejs/src/common/urlParser.ts b/nodejs/src/common/urlParser.ts new file mode 100644 index 00000000..b134c348 --- /dev/null +++ b/nodejs/src/common/urlParser.ts @@ -0,0 +1,250 @@ +import { ErrorCode, TDWebSocketClientError } from "./wsError"; + +export interface HostPort { + host: string; + port: number; +} + +export interface ParsedMultiAddress { + scheme: string; + username: string; + password: string; + hosts: HostPort[]; + pathname: string; + searchParams: URLSearchParams; + retries: number; + retryBackoffMs: number; + retryBackoffMaxMs: number; +} + +const DEFAULT_RETRIES = 5; +const DEFAULT_RETRY_BACKOFF_MS = 200; +const DEFAULT_RETRY_BACKOFF_MAX_MS = 2000; + +/** + * Parse a multi-address URL string. + * Format: ws://user:pass@host1:port1,host2:port2,[::1]:port3/path?key=val + * Supports IPv4, IPv6 (bracketed), and hostnames. + */ +export function parseMultiAddressUrl(urlStr: string): ParsedMultiAddress { + if (!urlStr) { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_URL, + "URL string is empty" + ); + } + + // Extract scheme + const schemeMatch = urlStr.match(/^(wss?):\/\//i); + if (!schemeMatch) { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_URL, + `Invalid URL scheme: ${urlStr}` + ); + } + const scheme = schemeMatch[1].toLowerCase(); + let remaining = urlStr.slice(schemeMatch[0].length); + + // Extract userinfo (user:pass@) + let username = ""; + let password = ""; + const atIndex = findUserInfoEnd(remaining); + if (atIndex !== -1) { + const userInfo = remaining.slice(0, atIndex); + remaining = remaining.slice(atIndex + 1); + const colonIndex = userInfo.indexOf(":"); + if (colonIndex !== -1) { + username = userInfo.slice(0, colonIndex); + password = userInfo.slice(colonIndex + 1); + } else { + username = userInfo; + } + } + + // Split host part from path+query + // Find the first '/' or '?' that is NOT inside brackets + let hostEnd = findHostEnd(remaining); + const hostPart = remaining.slice(0, hostEnd); + remaining = remaining.slice(hostEnd); + + // Parse hosts + const hosts = parseHosts(hostPart); + if (hosts.length === 0) { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_URL, + `No valid host found in URL: ${urlStr}` + ); + } + + // Parse path and query + let pathname = "/"; + let search = ""; + const queryIndex = remaining.indexOf("?"); + if (queryIndex !== -1) { + pathname = remaining.slice(0, queryIndex) || "/"; + search = remaining.slice(queryIndex + 1); + } else { + pathname = remaining || "/"; + } + + const searchParams = new URLSearchParams(search); + + // Extract failover params + const retries = extractIntParam(searchParams, "retries", DEFAULT_RETRIES); + const retryBackoffMs = extractIntParam(searchParams, "retry_backoff_ms", DEFAULT_RETRY_BACKOFF_MS); + const retryBackoffMaxMs = extractIntParam(searchParams, "retry_backoff_max_ms", DEFAULT_RETRY_BACKOFF_MAX_MS); + + // Remove failover params from searchParams so they don't leak to ws URL + searchParams.delete("retries"); + searchParams.delete("retry_backoff_ms"); + searchParams.delete("retry_backoff_max_ms"); + + return { + scheme, + username, + password, + hosts, + pathname, + searchParams, + retries, + retryBackoffMs, + retryBackoffMaxMs, + }; +} + +/** + * Build a standard URL object for a specific host index from ParsedMultiAddress. + */ +export function buildUrlForHost(parsed: ParsedMultiAddress, hostIndex: number): URL { + const hp = parsed.hosts[hostIndex]; + const isIPv6 = hp.host.includes(":"); + const hostStr = isIPv6 ? `[${hp.host}]` : hp.host; + const base = `${parsed.scheme}://${encodeURIComponent(parsed.username)}:${encodeURIComponent(parsed.password)}@${hostStr}:${hp.port}${parsed.pathname}`; + const url = new URL(base); + parsed.searchParams.forEach((value, key) => { + url.searchParams.set(key, value); + }); + return url; +} + +/** + * Find the '@' that ends userinfo. Must not be inside [] brackets. + */ +function findUserInfoEnd(str: string): number { + let inBracket = false; + for (let i = 0; i < str.length; i++) { + const ch = str[i]; + if (ch === "[") inBracket = true; + else if (ch === "]") inBracket = false; + else if (ch === "@" && !inBracket) return i; + else if (ch === "/" || ch === "?" || ch === "#") { + // No userinfo + return -1; + } + } + return -1; +} + +/** + * Find where the host section ends (first '/' or '?' not inside brackets). + */ +function findHostEnd(str: string): number { + let inBracket = false; + for (let i = 0; i < str.length; i++) { + const ch = str[i]; + if (ch === "[") inBracket = true; + else if (ch === "]") inBracket = false; + else if ((ch === "/" || ch === "?") && !inBracket) return i; + } + return str.length; +} + +/** + * Parse comma-separated host:port entries. + * Supports: hostname:port, IPv4:port, [IPv6]:port + * If port is omitted, defaults to 6041. + */ +function parseHosts(hostPart: string): HostPort[] { + if (!hostPart) return []; + + const results: HostPort[] = []; + const DEFAULT_PORT = 6041; + + // Split by comma, respecting brackets + const segments = splitHostSegments(hostPart); + + for (const seg of segments) { + const trimmed = seg.trim(); + if (!trimmed) continue; + + if (trimmed.startsWith("[")) { + // IPv6: [::1]:port or [::1] + const closeBracket = trimmed.indexOf("]"); + if (closeBracket === -1) { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_URL, + `Invalid IPv6 address: ${trimmed}` + ); + } + const host = trimmed.slice(1, closeBracket); + const afterBracket = trimmed.slice(closeBracket + 1); + let port = DEFAULT_PORT; + if (afterBracket.startsWith(":")) { + port = parseInt(afterBracket.slice(1), 10); + if (isNaN(port)) { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_URL, + `Invalid port in: ${trimmed}` + ); + } + } + results.push({ host, port }); + } else { + // IPv4 or hostname + const lastColon = trimmed.lastIndexOf(":"); + if (lastColon === -1) { + results.push({ host: trimmed, port: DEFAULT_PORT }); + } else { + const host = trimmed.slice(0, lastColon); + const portStr = trimmed.slice(lastColon + 1); + const port = parseInt(portStr, 10); + if (isNaN(port)) { + // Might be hostname without port that contains something odd, treat whole as host + results.push({ host: trimmed, port: DEFAULT_PORT }); + } else { + results.push({ host, port }); + } + } + } + } + + return results; +} + +/** + * Split host part by commas, but not inside brackets. + */ +function splitHostSegments(hostPart: string): string[] { + const segments: string[] = []; + let current = ""; + let inBracket = false; + for (const ch of hostPart) { + if (ch === "[") inBracket = true; + else if (ch === "]") inBracket = false; + if (ch === "," && !inBracket) { + segments.push(current); + current = ""; + } else { + current += ch; + } + } + if (current) segments.push(current); + return segments; +} + +function extractIntParam(params: URLSearchParams, key: string, defaultVal: number): number { + const val = params.get(key); + if (val === null) return defaultVal; + const parsed = parseInt(val, 10); + return isNaN(parsed) ? defaultVal : parsed; +} diff --git a/nodejs/src/common/utils.ts b/nodejs/src/common/utils.ts index a6294c8a..85b0b5e8 100644 --- a/nodejs/src/common/utils.ts +++ b/nodejs/src/common/utils.ts @@ -1,48 +1,66 @@ import { TmqConfig } from "../tmq/config"; import { WSConfig } from "./config"; import { ErrorCode, TDWebSocketClientError } from "./wsError"; +import { parseMultiAddressUrl, buildUrlForHost } from "./urlParser"; export function getUrl(wsConfig: WSConfig): URL { - let url = new URL(wsConfig.getUrl()); + const rawUrl = wsConfig.getUrl(); + + // Try multi-address parsing first + const parsed = parseMultiAddressUrl(rawUrl); + wsConfig.setParsedMultiAddress(parsed); + + // Apply user/password overrides from WSConfig if (wsConfig.getUser()) { - url.username = wsConfig.getUser() || ""; + parsed.username = wsConfig.getUser() || ""; } if (wsConfig.getPwd()) { - url.password = wsConfig.getPwd() || ""; + parsed.password = wsConfig.getPwd() || ""; } + // Apply token let token = wsConfig.getToken(); if (token) { - url.searchParams.set("token", token); + parsed.searchParams.set("token", token); } + // Apply timezone let timezone = wsConfig.getTimezone(); if (timezone) { - url.searchParams.set("timezone", timezone); + parsed.searchParams.set("timezone", timezone); } - if (url.pathname && url.pathname !== "/") { - wsConfig.setDb(url.pathname.slice(1)); + // Extract db from path + if (parsed.pathname && parsed.pathname !== "/") { + wsConfig.setDb(parsed.pathname.slice(1)); } - if (url.searchParams.has("timezone")) { - wsConfig.setTimezone(url.searchParams.get("timezone") || ""); + // Read timezone from searchParams + if (parsed.searchParams.has("timezone")) { + wsConfig.setTimezone(parsed.searchParams.get("timezone") || ""); } + // Apply bearer token const bearerToken = wsConfig.getBearerToken(); if (bearerToken) { - url.searchParams.set("bearer_token", bearerToken); + parsed.searchParams.set("bearer_token", bearerToken); } else { - const bearerTokenFromUrl = url.searchParams.get("bearer_token"); + const bearerTokenFromUrl = parsed.searchParams.get("bearer_token"); if (bearerTokenFromUrl) { wsConfig.setBearerToken(bearerTokenFromUrl); } else { - url.searchParams.delete("bearer_token"); + parsed.searchParams.delete("bearer_token"); } } - url.pathname = "/ws"; - return url; + // Set pathname to /ws for WebSocket endpoint + parsed.pathname = "/ws"; + + // Update the stored parsed multi-address + wsConfig.setParsedMultiAddress(parsed); + + // Return URL for first host (backward compatibility) + return buildUrlForHost(parsed, 0); } export function isEmpty(value: any): boolean { diff --git a/nodejs/src/sql/wsSql.ts b/nodejs/src/sql/wsSql.ts index 650e3482..843f2fa0 100644 --- a/nodejs/src/sql/wsSql.ts +++ b/nodejs/src/sql/wsSql.ts @@ -27,10 +27,14 @@ export class WsSql { private wsConfig: WSConfig; private _wsClient: WsClient; + // 没有建立连接 constructor(wsConfig: WSConfig) { + // string -> url + // 解析并校验,支持多地址 let url = getUrl(wsConfig); this.wsConfig = wsConfig; - this._wsClient = new WsClient(url, wsConfig.getTimeOut()); + // 传递 ParsedMultiAddress 给 WsClient,支持多地址连接 + this._wsClient = new WsClient(url, wsConfig.getTimeOut(), wsConfig.getParsedMultiAddress()); } static async open(wsConfig: WSConfig): Promise { @@ -128,7 +132,6 @@ export class WsSql { "'"; let result = await this.exec(sql); let data = result.getData(); - if (data && data[0] && data[0][0]) { precision = PrecisionLength[data[0][0]]; } diff --git a/nodejs/src/tmq/config.ts b/nodejs/src/tmq/config.ts index eea62338..c110a02a 100644 --- a/nodejs/src/tmq/config.ts +++ b/nodejs/src/tmq/config.ts @@ -1,4 +1,5 @@ import { TMQConstants } from "./constant"; +import { parseMultiAddressUrl, buildUrlForHost, ParsedMultiAddress } from "../common/urlParser"; export class TmqConfig { url: URL | null = null; @@ -14,13 +15,15 @@ export class TmqConfig { auto_commit_interval_ms: number = 5 * 1000; timeout: number = 5000; otherConfigs: Map; + parsedMultiAddress: ParsedMultiAddress | null = null; constructor(wsConfig: Map) { this.otherConfigs = new Map(); + let rawUrl: string | null = null; for (const [key, value] of wsConfig) { switch (key) { case TMQConstants.WS_URL: - this.url = new URL(value); + rawUrl = value; break; case TMQConstants.CONNECT_USER: this.user = value; @@ -55,20 +58,30 @@ export class TmqConfig { } } - if (this.url) { + if (rawUrl) { + // Parse multi-address URL + const parsed = parseMultiAddressUrl(rawUrl); + this.parsedMultiAddress = parsed; + + // Build URL from first host for compatibility + this.url = buildUrlForHost(parsed, 0); + if (this.user) { this.url.username = this.user; + parsed.username = this.user; } else { this.user = this.url.username; } if (this.password) { this.url.password = this.password; + parsed.password = this.password; } else { this.password = this.url.password; } if (this.token) { this.url.searchParams.set("bearer_token", this.token); + parsed.searchParams.set("bearer_token", this.token); } else { const bearerToken = this.url.searchParams.get("bearer_token"); if (bearerToken) { @@ -79,7 +92,7 @@ export class TmqConfig { } } - this.sql_url = new URL(this.url); + this.sql_url = new URL(this.url.toString()); this.sql_url.pathname = "/ws"; this.url.pathname = "/rest/tmq"; } diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index d95a6e99..bbc649e0 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -45,7 +45,8 @@ export class WsConsumer { } this._wsClient = new WsClient( this._wsConfig.url, - this._wsConfig.timeout + this._wsConfig.timeout, + this._wsConfig.parsedMultiAddress || undefined ); this._lastMessageID = BigInt(0); } @@ -56,10 +57,12 @@ export class WsConsumer { if (this._wsConfig.sql_url) { wsSql = new WsClient( this._wsConfig.sql_url, - this._wsConfig.timeout + this._wsConfig.timeout, + this._wsConfig.parsedMultiAddress || undefined ); await wsSql.connect(); await wsSql.checkVersion(); + // 创建 ws 连接 await this._wsClient.ready(); } else { throw new TDWebSocketClientError( diff --git a/nodejs/test/connectionManager.test.ts b/nodejs/test/connectionManager.test.ts new file mode 100644 index 00000000..8f2c4242 --- /dev/null +++ b/nodejs/test/connectionManager.test.ts @@ -0,0 +1,63 @@ +import { parseMultiAddressUrl, buildUrlForHost } from "../src/common/urlParser"; +import { ConnectionManager } from "../src/client/wsConnectionManager"; + +describe("ConnectionManager", () => { + test("constructor initializes with random index", () => { + const parsed = parseMultiAddressUrl( + "ws://root:taosdata@host1:6041,host2:6042,host3:6043?retries=2&retry_backoff_ms=100" + ); + const mgr = new ConnectionManager(parsed, 5000); + expect(mgr).toBeDefined(); + expect(mgr.getParsed()).toBe(parsed); + expect(mgr.isConnected()).toBe(false); + expect(mgr.getConnector()).toBeNull(); + expect(mgr.getInflightCount()).toBe(0); + }); + + test("getCurrentUrl returns valid URL", () => { + const parsed = parseMultiAddressUrl( + "ws://root:taosdata@localhost:6041,localhost:6042" + ); + const mgr = new ConnectionManager(parsed, 5000); + const url = mgr.getCurrentUrl(); + expect(url.protocol).toBe("ws:"); + expect(url.username).toBe("root"); + expect(["6041", "6042"]).toContain(url.port); + }); + + test("trackRequest and completeRequest", () => { + const parsed = parseMultiAddressUrl("ws://root:taosdata@localhost:6041"); + const mgr = new ConnectionManager(parsed, 5000); + + mgr.trackRequest("req1", { + id: "req1", + type: "text", + message: '{"action":"test"}', + resolve: () => {}, + reject: () => {}, + register: true, + }); + expect(mgr.getInflightCount()).toBe(1); + + mgr.completeRequest("req1"); + expect(mgr.getInflightCount()).toBe(0); + }); + + test("close clears inflight requests", async () => { + const parsed = parseMultiAddressUrl("ws://root:taosdata@localhost:6041"); + const mgr = new ConnectionManager(parsed, 5000); + + mgr.trackRequest("req1", { + id: "req1", + type: "text", + message: '{"action":"test"}', + resolve: () => {}, + reject: () => {}, + register: true, + }); + expect(mgr.getInflightCount()).toBe(1); + + await mgr.close(); + expect(mgr.getInflightCount()).toBe(0); + }); +}); diff --git a/nodejs/test/urlParser.test.ts b/nodejs/test/urlParser.test.ts new file mode 100644 index 00000000..78a109c6 --- /dev/null +++ b/nodejs/test/urlParser.test.ts @@ -0,0 +1,137 @@ +import { parseMultiAddressUrl, buildUrlForHost, ParsedMultiAddress } from "../src/common/urlParser"; + +describe("parseMultiAddressUrl", () => { + test("single host with all params", () => { + const result = parseMultiAddressUrl( + "ws://root:taosdata@localhost:6041?retries=3&retry_backoff_ms=100&retry_backoff_max_ms=1000" + ); + expect(result.scheme).toBe("ws"); + expect(result.username).toBe("root"); + expect(result.password).toBe("taosdata"); + expect(result.hosts).toEqual([{ host: "localhost", port: 6041 }]); + expect(result.retries).toBe(3); + expect(result.retryBackoffMs).toBe(100); + expect(result.retryBackoffMaxMs).toBe(1000); + // failover params should be removed from searchParams + expect(result.searchParams.has("retries")).toBe(false); + expect(result.searchParams.has("retry_backoff_ms")).toBe(false); + }); + + test("multiple hosts", () => { + const result = parseMultiAddressUrl( + "ws://root:taosdata@localhost:6041,localhost:6042,localhost:6043?retries=5" + ); + expect(result.hosts).toEqual([ + { host: "localhost", port: 6041 }, + { host: "localhost", port: 6042 }, + { host: "localhost", port: 6043 }, + ]); + expect(result.retries).toBe(5); + }); + + test("IPv6 addresses", () => { + const result = parseMultiAddressUrl( + "ws://root:taosdata@[::1]:6041,[2001:db8::1]:6042?retries=2" + ); + expect(result.hosts).toEqual([ + { host: "::1", port: 6041 }, + { host: "2001:db8::1", port: 6042 }, + ]); + }); + + test("mixed IPv4 and IPv6", () => { + const result = parseMultiAddressUrl( + "ws://root:taosdata@192.168.1.1:6041,[::1]:6042,myhost:6043" + ); + expect(result.hosts).toEqual([ + { host: "192.168.1.1", port: 6041 }, + { host: "::1", port: 6042 }, + { host: "myhost", port: 6043 }, + ]); + }); + + test("wss scheme", () => { + const result = parseMultiAddressUrl("wss://user:pass@host1:6041"); + expect(result.scheme).toBe("wss"); + expect(result.username).toBe("user"); + expect(result.password).toBe("pass"); + }); + + test("default port when omitted", () => { + const result = parseMultiAddressUrl("ws://root:taosdata@myhost"); + expect(result.hosts).toEqual([{ host: "myhost", port: 6041 }]); + }); + + test("default failover params", () => { + const result = parseMultiAddressUrl("ws://root:taosdata@localhost:6041"); + expect(result.retries).toBe(5); + expect(result.retryBackoffMs).toBe(200); + expect(result.retryBackoffMaxMs).toBe(2000); + }); + + test("with pathname", () => { + const result = parseMultiAddressUrl("ws://root:taosdata@localhost:6041/mydb"); + expect(result.pathname).toBe("/mydb"); + }); + + test("with extra query params preserved", () => { + const result = parseMultiAddressUrl( + "ws://root:taosdata@localhost:6041?token=abc&retries=3&timezone=UTC" + ); + expect(result.searchParams.get("token")).toBe("abc"); + expect(result.searchParams.get("timezone")).toBe("UTC"); + expect(result.searchParams.has("retries")).toBe(false); + expect(result.retries).toBe(3); + }); + + test("empty URL throws", () => { + expect(() => parseMultiAddressUrl("")).toThrow(); + }); + + test("invalid scheme throws", () => { + expect(() => parseMultiAddressUrl("http://localhost:6041")).toThrow(); + }); + + test("no userinfo", () => { + const result = parseMultiAddressUrl("ws://localhost:6041?token=abc"); + expect(result.username).toBe(""); + expect(result.password).toBe(""); + expect(result.hosts).toEqual([{ host: "localhost", port: 6041 }]); + }); + + test("IPv6 without port uses default", () => { + const result = parseMultiAddressUrl("ws://root:taosdata@[::1]"); + expect(result.hosts).toEqual([{ host: "::1", port: 6041 }]); + }); +}); + +describe("buildUrlForHost", () => { + test("builds correct URL for IPv4 host", () => { + const parsed = parseMultiAddressUrl( + "ws://root:taosdata@localhost:6041,localhost:6042/ws?token=abc" + ); + const url0 = buildUrlForHost(parsed, 0); + expect(url0.hostname).toBe("localhost"); + expect(url0.port).toBe("6041"); + expect(url0.username).toBe("root"); + expect(url0.password).toBe("taosdata"); + expect(url0.searchParams.get("token")).toBe("abc"); + + const url1 = buildUrlForHost(parsed, 1); + expect(url1.hostname).toBe("localhost"); + expect(url1.port).toBe("6042"); + }); + + test("builds correct URL for IPv6 host", () => { + const parsed = parseMultiAddressUrl("ws://root:taosdata@[::1]:6041"); + const url = buildUrlForHost(parsed, 0); + expect(url.hostname).toBe("[::1]"); + expect(url.port).toBe("6041"); + }); + + test("preserves pathname", () => { + const parsed = parseMultiAddressUrl("ws://root:taosdata@localhost:6041/mydb"); + const url = buildUrlForHost(parsed, 0); + expect(url.pathname).toBe("/mydb"); + }); +}); From 1162db48ba79188507cecc08c0eb1ea7ad762b26 Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Tue, 10 Mar 2026 17:38:48 +0800 Subject: [PATCH 2/2] refactor: simplify URL parser, unify connection management in WebSocketConnector MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Simplify urlParser: replace bracket-aware scanning functions with straightforward @/slash/query position-based host:port extraction. All existing test cases still pass. 2. Refactor WebSocketConnector: change constructor from URL to string, parse multi-address internally via parseMultiAddressUrl(). Merge ConnectionManager failover (round-robin + exponential backoff), reconnect, and inflight request tracking directly into WebSocketConnector. 3. Simplify WsClient: remove ConnectionManager dependency and dual code paths (multi-address vs single-address). WsClient now passes raw URL string to WebSocketConnector, which handles everything uniformly. 4. Update consumers: wsSql, wsTmq/TmqConfig, wsConnectorPool, config, utils all updated to use string URLs instead of URL objects. 5. Delete wsConnectionManager.ts — all its behavior now lives in WebSocketConnector. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- nodejs/src/client/wsClient.ts | 249 +++++------- nodejs/src/client/wsConnectionManager.ts | 314 --------------- nodejs/src/client/wsConnector.ts | 424 +++++++++++++++------ nodejs/src/client/wsConnectorPool.ts | 24 +- nodejs/src/common/config.ts | 10 - nodejs/src/common/urlParser.ts | 111 +++--- nodejs/src/common/utils.ts | 30 +- nodejs/src/sql/wsSql.ts | 4 +- nodejs/src/tmq/config.ts | 49 ++- nodejs/src/tmq/wsTmq.ts | 8 +- nodejs/test/bulkPulling/tmq.config.test.ts | 20 +- nodejs/test/connectionManager.test.ts | 64 ++-- 12 files changed, 563 insertions(+), 744 deletions(-) delete mode 100644 nodejs/src/client/wsConnectionManager.ts diff --git a/nodejs/src/client/wsClient.ts b/nodejs/src/client/wsClient.ts index b64ed8a6..72b5b845 100644 --- a/nodejs/src/client/wsClient.ts +++ b/nodejs/src/client/wsClient.ts @@ -1,7 +1,6 @@ import JSONBig from "json-bigint"; import { WebSocketConnector } from "./wsConnector"; import { WebSocketConnectionPool } from "./wsConnectorPool"; -import { ConnectionManager } from "./wsConnectionManager"; import { ErrorCode, TDWebSocketClientError, @@ -12,40 +11,34 @@ import { WSVersionResponse, WSQueryResponse } from "./wsResponse"; import { ReqId } from "../common/reqid"; import logger from "../common/log"; import { safeDecodeURIComponent, compareVersions, maskSensitiveForLog, maskUrlForLog } from "../common/utils"; -import { ParsedMultiAddress, buildUrlForHost } from "../common/urlParser"; import { w3cwebsocket } from "websocket"; import { ConnectorInfo, TSDB_OPTION_CONNECTION } from "../common/constant"; export class WsClient { private _wsConnector?: WebSocketConnector; - private _timeout?: number | undefined | null; - private _timezone?: string | undefined | null; - private readonly _url: URL; + private _timeout?: number | null; + private _timezone?: string | null; + private readonly _url: string; private static readonly _minVersion = "3.3.2.0"; - private _version?: string | undefined | null; - private _bearerToken?: string | undefined | null; - private _connMgr?: ConnectionManager; - private _parsedMultiAddress?: ParsedMultiAddress; + private _version?: string | null; + private _bearerToken?: string | null; private _database?: string | null; private _inflightIdCounter = 0; - constructor(url: URL, timeout?: number | undefined | null, parsedMultiAddress?: ParsedMultiAddress) { - this.checkURL(url); - this._url = url; - this._timeout = timeout; - this._parsedMultiAddress = parsedMultiAddress; - if (this._url.searchParams.has("timezone")) { - this._timezone = this._url.searchParams.get("timezone") || undefined; - this._url.searchParams.delete("timezone"); - } - if (this._url.searchParams.has("bearer_token")) { - this._bearerToken = this._url.searchParams.get("bearer_token") || undefined; - } + // Parsed URL info (extracted from the raw URL string) + private _username: string = ""; + private _password: string = ""; + private _token?: string | null; - // Initialize ConnectionManager if multi-address is provided - if (this._parsedMultiAddress && this._parsedMultiAddress.hosts.length > 0) { - this._connMgr = new ConnectionManager(this._parsedMultiAddress, this._timeout); + constructor(url: string, timeout?: number | null) { + if (!url) { + throw new WebSocketInterfaceError( + ErrorCode.ERR_INVALID_URL, + `invalid url, provide non-empty URL` + ); } + this._url = url; + this._timeout = timeout; } private _nextInflightId(): string { @@ -56,21 +49,14 @@ export class WsClient { * Ensure we have an active WebSocket connector, performing failover if needed. */ private async _ensureConnector(): Promise { - // If ConnectionManager is available, use it - if (this._connMgr) { - if (this._connMgr.isConnected()) { - return this._connMgr.getConnector()!; - } + if (this._wsConnector && this._wsConnector.isConnected()) { + return this._wsConnector; + } + if (this._wsConnector) { // Try reconnect with failover - const connector = await this._connMgr.reconnect(); - this._wsConnector = connector; + await this._wsConnector.reconnect(); // Re-authenticate after reconnect await this._sendConnMsg(this._database); - return connector; - } - - // Fallback: original single-URL behavior - if (this._wsConnector && this._wsConnector.readyState() === w3cwebsocket.OPEN) { return this._wsConnector; } throw new TDWebSocketClientError( @@ -80,13 +66,13 @@ export class WsClient { } private async _sendConnMsg(database?: string | null): Promise { - const connector = this._connMgr ? this._connMgr.getConnector()! : this._wsConnector!; + if (!this._wsConnector) return; let connMsg = { action: "conn", args: { req_id: ReqId.getReqID(), - user: safeDecodeURIComponent(this._url.username), - password: safeDecodeURIComponent(this._url.password), + user: this._username, + password: this._password, db: database, connector: ConnectorInfo, ...(this._timezone && { tz: this._timezone }), @@ -98,68 +84,50 @@ export class WsClient { (key === "password" || key === "bearer_token") ? "[REDACTED]" : value )); } - let result: any = await connector.sendMsg(JSON.stringify(connMsg)); + let result: any = await this._wsConnector.sendMsg(JSON.stringify(connMsg)); if (result.msg.code != 0) { throw new WebSocketQueryError(result.msg.code, result.msg.message); } } - async connect(database?: string | undefined | null): Promise { + async connect(database?: string | null): Promise { this._database = database; - if (this._connMgr) { - // Multi-address: use ConnectionManager - try { - this._wsConnector = await this._connMgr.connect(); - await this._sendConnMsg(database); - return; - } catch (e: any) { - await this.close(); - const maskedUrl = maskUrlForLog(this._url); - logger.error(`connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}`); - throw new TDWebSocketClientError( - ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}` + // Create connector from raw URL string + this._wsConnector = new WebSocketConnector(this._url, this._timeout); + const parsed = this._wsConnector.getParsed(); + + // Extract credentials from parsed URL + this._username = safeDecodeURIComponent(parsed.username); + this._password = safeDecodeURIComponent(parsed.password); + + // Extract timezone/bearerToken from searchParams + if (parsed.searchParams.has("timezone")) { + this._timezone = parsed.searchParams.get("timezone") || undefined; + } + if (parsed.searchParams.has("bearer_token")) { + this._bearerToken = parsed.searchParams.get("bearer_token") || undefined; + } + if (parsed.searchParams.has("token")) { + this._token = parsed.searchParams.get("token") || undefined; + } + + // Check authentication + if (!this._token && !this._bearerToken) { + if (!parsed.username && !parsed.password) { + throw new WebSocketInterfaceError( + ErrorCode.ERR_INVALID_AUTHENTICATION, + `invalid url, provide non-empty "token" or "bearer_token", or provide username/password` ); } } - // Original single-URL path - let connMsg = { - action: "conn", - args: { - req_id: ReqId.getReqID(), - user: safeDecodeURIComponent(this._url.username), - password: safeDecodeURIComponent(this._url.password), - db: database, - connector: ConnectorInfo, - ...(this._timezone && { tz: this._timezone }), - ...(this._bearerToken && { bearer_token: this._bearerToken }), - }, - }; - if (logger.isDebugEnabled()) { - logger.debug("[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg, (key, value) => - (key === "password" || key === "bearer_token") ? "[REDACTED]" : value - )); - } - this._wsConnector = await WebSocketConnectionPool.instance().getConnection( - this._url, - this._timeout - ); - if (this._wsConnector.readyState() === w3cwebsocket.OPEN) { - return; - } try { - await this._wsConnector.ready(); - let result: any = await this._wsConnector.sendMsg(JSON.stringify(connMsg)); - if (result.msg.code == 0) { - return; - } - await this.close(); - throw new WebSocketQueryError(result.msg.code, result.msg.message); + await this._wsConnector.connect(); + await this._sendConnMsg(database); } catch (e: any) { await this.close(); - const maskedUrl = maskUrlForLog(this._url); + const maskedUrl = maskUrlForLog(this._wsConnector?.getCurrentUrl() ?? null); logger.error(`connection creation failed, url: ${maskedUrl}, code:${e.code}, msg:${e.message}`); throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, @@ -196,7 +164,6 @@ export class WsClient { await connector.sendMsgNoResp(queryMsg); } - // Need to construct Response async exec(queryMsg: string, bSqlQuery: boolean = true): Promise { if (logger.isDebugEnabled()) { logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskSensitiveForLog(queryMsg)); @@ -206,7 +173,7 @@ export class WsClient { return new Promise((resolve, reject) => { const wrappedResolve = (e: any) => { - if (this._connMgr) this._connMgr.completeRequest(inflightId); + connector.completeRequest(inflightId); if (e.msg.code == 0) { if (bSqlQuery) { resolve(new WSQueryResponse(e)); @@ -218,20 +185,18 @@ export class WsClient { } }; const wrappedReject = (e: any) => { - if (this._connMgr) this._connMgr.completeRequest(inflightId); + connector.completeRequest(inflightId); reject(e); }; - if (this._connMgr) { - this._connMgr.trackRequest(inflightId, { - id: inflightId, - type: "text", - message: queryMsg, - resolve: wrappedResolve, - reject: wrappedReject, - register: true, - }); - } + connector.trackRequest(inflightId, { + id: inflightId, + type: "text", + message: queryMsg, + resolve: wrappedResolve, + reject: wrappedReject, + register: true, + }); connector .sendMsg(queryMsg) @@ -240,7 +205,6 @@ export class WsClient { }); } - // need to construct Response. async sendBinaryMsg( reqId: bigint, action: string, @@ -253,7 +217,7 @@ export class WsClient { return new Promise((resolve, reject) => { const wrappedResolve = (e: any) => { - if (this._connMgr) this._connMgr.completeRequest(inflightId); + connector.completeRequest(inflightId); if (bResultBinary) { resolve(e); return; @@ -269,22 +233,20 @@ export class WsClient { } }; const wrappedReject = (e: any) => { - if (this._connMgr) this._connMgr.completeRequest(inflightId); + connector.completeRequest(inflightId); reject(e); }; - if (this._connMgr) { - this._connMgr.trackRequest(inflightId, { - id: inflightId, - type: "binary", - reqId, - action, - binaryData: message, - resolve: wrappedResolve, - reject: wrappedReject, - register: true, - }); - } + connector.trackRequest(inflightId, { + id: inflightId, + type: "binary", + reqId, + action, + binaryData: message, + resolve: wrappedResolve, + reject: wrappedReject, + register: true, + }); connector .sendBinaryMsg(reqId, action, message) @@ -294,9 +256,6 @@ export class WsClient { } getState() { - if (this._connMgr) { - return this._connMgr.isConnected() ? w3cwebsocket.OPEN : w3cwebsocket.CLOSED; - } if (this._wsConnector) { return this._wsConnector.readyState(); } @@ -304,27 +263,32 @@ export class WsClient { } async ready(): Promise { - if (this._connMgr) { - if (!this._connMgr.isConnected()) { - this._wsConnector = await this._connMgr.reconnect(); - } + if (this._wsConnector && this._wsConnector.isConnected()) { return; } + if (this._wsConnector) { + try { + await this._wsConnector.reconnect(); + return; + } catch (e: any) { + const maskedUrl = maskUrlForLog(this._wsConnector?.getCurrentUrl() ?? null); + logger.error( + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` + ); + throw new TDWebSocketClientError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` + ); + } + } + // No connector yet — create one and connect + this._wsConnector = new WebSocketConnector(this._url, this._timeout); try { - this._wsConnector = await WebSocketConnectionPool.instance().getConnection( - this._url, - this._timeout - ); - if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) { - await this._wsConnector.ready(); - } - if (logger.isDebugEnabled()) { - logger.debug("ready status ", maskUrlForLog(this._url), this._wsConnector.readyState()); - } + await this._wsConnector.connect(); return; } catch (e: any) { - const maskedUrl = maskUrlForLog(this._url); + const maskedUrl = maskUrlForLog(this._wsConnector?.getCurrentUrl() ?? null); logger.error( `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); @@ -377,7 +341,7 @@ export class WsClient { } throw new WebSocketInterfaceError(result.msg.code, result.msg.message); } catch (e: any) { - const maskedUrl = maskUrlForLog(this._url); + const maskedUrl = maskUrlForLog(this._wsConnector?.getCurrentUrl() ?? null); logger.error( `connection creation failed, url: ${maskedUrl}, code: ${e.code}, message: ${e.message}` ); @@ -389,11 +353,6 @@ export class WsClient { } async close(): Promise { - if (this._connMgr) { - await this._connMgr.close(); - this._wsConnector = undefined; - return; - } if (this._wsConnector) { await WebSocketConnectionPool.instance().releaseConnection( this._wsConnector @@ -402,18 +361,6 @@ export class WsClient { } } - checkURL(url: URL) { - // Assert token or bearer_token exists, otherwise username and password must exist. - if (!url.searchParams.get("token") && !url.searchParams.get("bearer_token")) { - if (!(url.username || url.password)) { - throw new WebSocketInterfaceError( - ErrorCode.ERR_INVALID_AUTHENTICATION, - `invalid url, provide non-empty "token" or "bearer_token", or provide username/password` - ); - } - } - } - async checkVersion() { this._version = await this.version(); let result = compareVersions(this._version, WsClient._minVersion); diff --git a/nodejs/src/client/wsConnectionManager.ts b/nodejs/src/client/wsConnectionManager.ts deleted file mode 100644 index 1b4a2a86..00000000 --- a/nodejs/src/client/wsConnectionManager.ts +++ /dev/null @@ -1,314 +0,0 @@ -import { Mutex } from "async-mutex"; -import { w3cwebsocket, ICloseEvent } from "websocket"; -import { WebSocketConnector } from "./wsConnector"; -import { WebSocketConnectionPool } from "./wsConnectorPool"; -import { WsEventCallback, OnMessageType } from "./wsEventCallback"; -import { - ParsedMultiAddress, - buildUrlForHost, -} from "../common/urlParser"; -import { - ErrorCode, - TDWebSocketClientError, -} from "../common/wsError"; -import { ReqId } from "../common/reqid"; -import logger from "../common/log"; -import { maskUrlForLog } from "../common/utils"; - -export interface InflightRequest { - id: string; - type: "text" | "binary"; - // for text requests - message?: string; - // for binary requests - reqId?: bigint; - action?: string; - binaryData?: ArrayBuffer; - // callback - resolve: (args: unknown) => void; - reject: (reason: any) => void; - register: boolean; -} - -export class ConnectionManager { - private _parsed: ParsedMultiAddress; - private _currentIndex: number; - private _timeout: number; - private _connector: WebSocketConnector | null = null; - private _inflightRequests: Map = new Map(); - private _isReconnecting: boolean = false; - private _reconnectPromise: Promise | null = null; - private _reconnectMutex = new Mutex(); - private _closed: boolean = false; - - constructor(parsed: ParsedMultiAddress, timeout?: number | null) { - this._parsed = parsed; - this._currentIndex = Math.floor(Math.random() * parsed.hosts.length); - this._timeout = timeout || 5000; - } - - /** - * Establish the initial connection using a random host. - */ - async connect(): Promise { - const connector = await this._createConnector(this._currentIndex); - this._connector = connector; - this._setupCloseHandler(connector); - return connector; - } - - /** - * Get current connector, or null if disconnected. - */ - getConnector(): WebSocketConnector | null { - return this._connector; - } - - /** - * Check if the current connection is open. - */ - isConnected(): boolean { - return ( - this._connector !== null && - this._connector.readyState() === w3cwebsocket.OPEN - ); - } - - /** - * Perform failover: retry current host, then round-robin through others. - * Returns a connected WebSocketConnector or throws if all fail. - */ - async reconnect(): Promise { - const release = await this._reconnectMutex.acquire(); - try { - // If already reconnected by another caller, return existing connector - if (this.isConnected()) { - return this._connector!; - } - - if (this._closed) { - throw new TDWebSocketClientError( - ErrorCode.ERR_CONNECTION_CLOSED, - "ConnectionManager is closed" - ); - } - - this._isReconnecting = true; - const totalHosts = this._parsed.hosts.length; - - // Try each host starting from current - for (let i = 0; i < totalHosts; i++) { - const hostIndex = - (this._currentIndex + i) % totalHosts; - const connector = await this._tryConnectWithRetries(hostIndex); - if (connector) { - // Clean up old connector - if (this._connector) { - try { this._connector.close(); } catch (_) {} - } - this._connector = connector; - this._currentIndex = hostIndex; - this._setupCloseHandler(connector); - this._isReconnecting = false; - - // Resend inflight requests - await this._resendInflightRequests(connector); - return connector; - } - } - - this._isReconnecting = false; - throw new TDWebSocketClientError( - ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `Failover failed: all ${totalHosts} hosts exhausted after retries` - ); - } finally { - release(); - } - } - - /** - * Track an inflight request for potential resend on reconnect. - */ - trackRequest(id: string, request: InflightRequest): void { - this._inflightRequests.set(id, request); - } - - /** - * Remove a completed request from inflight tracking. - */ - completeRequest(id: string): void { - this._inflightRequests.delete(id); - } - - /** - * Get inflight request count (for diagnostics). - */ - getInflightCount(): number { - return this._inflightRequests.size; - } - - /** - * Close the connection manager and underlying connector. - */ - async close(): Promise { - this._closed = true; - if (this._connector) { - await WebSocketConnectionPool.instance().releaseConnection( - this._connector - ); - this._connector = null; - } - this._inflightRequests.clear(); - } - - /** - * Get the current host URL. - */ - getCurrentUrl(): URL { - return buildUrlForHost(this._parsed, this._currentIndex); - } - - /** - * Get parsed multi-address info. - */ - getParsed(): ParsedMultiAddress { - return this._parsed; - } - - /** - * Try connecting to a specific host with exponential backoff retries. - */ - private async _tryConnectWithRetries( - hostIndex: number - ): Promise { - const retries = this._parsed.retries; - const baseBackoff = this._parsed.retryBackoffMs; - const maxBackoff = this._parsed.retryBackoffMaxMs; - const url = buildUrlForHost(this._parsed, hostIndex); - - for (let attempt = 0; attempt <= retries; attempt++) { - if (this._closed) return null; - - try { - logger.info( - `Attempting connection to ${maskUrlForLog(url)}, attempt ${attempt + 1}/${retries + 1}` - ); - const connector = await this._createConnector(hostIndex); - logger.info( - `Successfully connected to ${maskUrlForLog(url)}` - ); - return connector; - } catch (e: any) { - logger.warn( - `Connection attempt ${attempt + 1} to ${maskUrlForLog(url)} failed: ${e.message}` - ); - if (attempt < retries) { - const backoff = Math.min( - baseBackoff * Math.pow(2, attempt), - maxBackoff - ); - await this._sleep(backoff); - } - } - } - return null; - } - - /** - * Create a WebSocketConnector for a specific host and wait until it's ready. - */ - private async _createConnector( - hostIndex: number - ): Promise { - const url = buildUrlForHost(this._parsed, hostIndex); - const connector = await WebSocketConnectionPool.instance().getConnection( - url, - this._timeout - ); - if (connector.readyState() !== w3cwebsocket.OPEN) { - await connector.ready(); - } - return connector; - } - - /** - * Set up the onclose handler to detect disconnections and trigger failover. - */ - private _setupCloseHandler(connector: WebSocketConnector): void { - connector.onClose((_event: ICloseEvent) => { - if (this._closed) return; - logger.warn( - "WebSocket connection closed, failover will be triggered on next operation" - ); - }); - } - - /** - * Resend all inflight requests after a successful reconnect. - */ - private async _resendInflightRequests( - connector: WebSocketConnector - ): Promise { - if (this._inflightRequests.size === 0) return; - - logger.info( - `Resending ${this._inflightRequests.size} inflight requests after reconnect` - ); - - const requests = Array.from(this._inflightRequests.entries()); - for (const [id, req] of requests) { - try { - if (req.type === "text" && req.message) { - if (req.register) { - const msg = JSON.parse(req.message); - WsEventCallback.instance().registerCallback( - { - action: msg.action, - req_id: msg.args.req_id, - timeout: connector._timeout, - id: - msg.args.id === undefined - ? msg.args.id - : BigInt(msg.args.id), - }, - req.resolve, - req.reject - ); - } - connector.sendMsgNoResp(req.message); - } else if ( - req.type === "binary" && - req.binaryData && - req.reqId !== undefined && - req.action - ) { - if (req.register) { - WsEventCallback.instance().registerCallback( - { - action: req.action, - req_id: req.reqId, - timeout: connector._timeout, - id: req.reqId, - }, - req.resolve, - req.reject - ); - } - connector.sendBinaryMsgRaw(req.binaryData); - } - logger.debug(`Resent inflight request ${id}`); - } catch (e: any) { - logger.error( - `Failed to resend inflight request ${id}: ${e.message}` - ); - req.reject(e); - this._inflightRequests.delete(id); - } - } - } - - private _sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); - } -} diff --git a/nodejs/src/client/wsConnector.ts b/nodejs/src/client/wsConnector.ts index 347877d8..ac11417b 100644 --- a/nodejs/src/client/wsConnector.ts +++ b/nodejs/src/client/wsConnector.ts @@ -1,3 +1,4 @@ +import { Mutex } from "async-mutex"; import { ICloseEvent, w3cwebsocket } from "websocket"; import { ErrorCode, @@ -5,147 +6,149 @@ import { WebSocketQueryError, } from "../common/wsError"; import { OnMessageType, WsEventCallback } from "./wsEventCallback"; +import { + ParsedMultiAddress, + buildUrlForHost, + parseMultiAddressUrl, +} from "../common/urlParser"; import logger from "../common/log"; import { ReqId } from "../common/reqid"; import { maskSensitiveForLog, maskUrlForLog } from "../common/utils"; +export interface InflightRequest { + id: string; + type: "text" | "binary"; + message?: string; + reqId?: bigint; + action?: string; + binaryData?: ArrayBuffer; + resolve: (args: unknown) => void; + reject: (reason: any) => void; + register: boolean; +} + export class WebSocketConnector { - // 底层 ws 连接 - private _wsConn: w3cwebsocket; - private _wsURL: URL; + private _wsConn: w3cwebsocket | null = null; + private _rawUrl: string; + private _parsed: ParsedMultiAddress; + private _currentIndex: number; _timeout = 5000; - private _onCloseCallbacks: Array<(event: ICloseEvent) => void> = []; - constructor(url: URL, timeout: number | undefined | null) { - if (url) { - this._wsURL = url; - let origin = url.origin; - let pathname = url.pathname; - let search = url.search; - if (timeout) { - this._timeout = timeout; - } - this._wsConn = new w3cwebsocket( - origin.concat(pathname).concat(search), - undefined, - undefined, - undefined, - undefined, - { - maxReceivedFrameSize: 0x60000000, - maxReceivedMessageSize: 0x60000000, - } - ); - this._wsConn.onerror = function (err: Error) { - logger.error(`webSocket connection failed, url: ${maskUrlForLog(new URL(this.url))}, error: ${err.message}`); - }; - this._wsConn.onclose = (e: ICloseEvent) => { - this._onclose(e); - for (const cb of this._onCloseCallbacks) { - try { cb(e); } catch (_) { /* ignore callback errors */ } - } - }; - this._wsConn.onmessage = this._onmessage; - this._wsConn._binaryType = "arraybuffer"; - } else { + // Failover state + private _inflightRequests: Map = new Map(); + private _reconnectMutex = new Mutex(); + private _closed: boolean = false; + + constructor(url: string, timeout?: number | null) { + if (!url) { throw new WebSocketQueryError( ErrorCode.ERR_INVALID_URL, "websocket URL must be defined" ); } + this._rawUrl = url; + this._parsed = parseMultiAddressUrl(url); + this._currentIndex = this._parsed.hosts.length > 1 + ? Math.floor(Math.random() * this._parsed.hosts.length) + : 0; + if (timeout) { + this._timeout = timeout; + } } - /** - * Register a callback to be invoked when the underlying WebSocket closes. - */ - onClose(callback: (event: ICloseEvent) => void): void { - this._onCloseCallbacks.push(callback); + /** Parsed multi-address info. */ + getParsed(): ParsedMultiAddress { + return this._parsed; } - // ??? - // 连接建立成功后,发送连接成功的消息,触发后续的事件回调 - async ready() { - return new Promise((resolve, reject) => { - let reqId = ReqId.getReqID(); - WsEventCallback.instance().registerCallback( - { - action: "websocket_connection", - req_id: BigInt(reqId), - timeout: this._timeout, - id: BigInt(reqId), - }, - resolve, - reject - ); + /** URL object for the current host. */ + getCurrentUrl(): URL { + return buildUrlForHost(this._parsed, this._currentIndex); + } - this._wsConn.onopen = () => { - logger.debug("websocket connection opened"); - WsEventCallback.instance().handleEventCallback( - { - id: BigInt(reqId), - action: "websocket_connection", - req_id: BigInt(reqId), - }, - OnMessageType.MESSAGE_TYPE_CONNECTION, - this - ); - }; - }); + /** Establish the initial WebSocket connection. */ + async connect(): Promise { + this._wsConn = this._createWsConnection(this._currentIndex); + await this._ready(); } - private async _onclose(e: ICloseEvent) { - logger.info("websocket connection closed"); + /** Check if the current connection is open. */ + isConnected(): boolean { + return ( + this._wsConn !== null && + this._wsConn.readyState === w3cwebsocket.OPEN + ); } - private _onmessage(event: any) { - let data = event.data; - logger.debug("wsClient._onMessage()====" + Object.prototype.toString.call(data)); - if (Object.prototype.toString.call(data) === "[object ArrayBuffer]") { - let id = new DataView(data, 26, 8).getBigUint64(0, true); - WsEventCallback.instance().handleEventCallback( - { id: id, action: "", req_id: BigInt(0) }, - OnMessageType.MESSAGE_TYPE_ARRAYBUFFER, - data - ); - } else if (Object.prototype.toString.call(data) === "[object String]") { - let msg = JSON.parse(data); - logger.debug("[_onmessage.stringType]==>:" + data); - WsEventCallback.instance().handleEventCallback( - { id: BigInt(0), action: msg.action, req_id: msg.req_id }, - OnMessageType.MESSAGE_TYPE_STRING, - msg - ); - } else { + readyState(): number { + return this._wsConn ? this._wsConn.readyState : w3cwebsocket.CLOSED; + } + + /** + * Perform failover: retry current host, then round-robin through others. + * Returns once reconnected or throws if all hosts fail. + */ + async reconnect(): Promise { + const release = await this._reconnectMutex.acquire(); + try { + if (this.isConnected()) return; + if (this._closed) { + throw new TDWebSocketClientError( + ErrorCode.ERR_CONNECTION_CLOSED, + "WebSocketConnector is closed" + ); + } + + const totalHosts = this._parsed.hosts.length; + for (let i = 0; i < totalHosts; i++) { + const hostIndex = (this._currentIndex + i) % totalHosts; + const conn = await this._tryConnectWithRetries(hostIndex); + if (conn) { + if (this._wsConn) { + try { this._wsConn.close(); } catch (_) { /* ignore */ } + } + this._wsConn = conn; + this._currentIndex = hostIndex; + await this._resendInflightRequests(); + return; + } + } + throw new TDWebSocketClientError( - ErrorCode.ERR_INVALID_MESSAGE_TYPE, - `invalid message type ${Object.prototype.toString.call(data)}` + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + `Failover failed: all ${totalHosts} hosts exhausted after retries` ); + } finally { + release(); } } - // 关闭底层 ws 连接 + /** Track an inflight request for potential resend on reconnect. */ + trackRequest(id: string, request: InflightRequest): void { + this._inflightRequests.set(id, request); + } + + /** Remove a completed request from inflight tracking. */ + completeRequest(id: string): void { + this._inflightRequests.delete(id); + } + + /** Get inflight request count (for diagnostics). */ + getInflightCount(): number { + return this._inflightRequests.size; + } + close() { + this._closed = true; if (this._wsConn) { this._wsConn.close(); - } else { - throw new TDWebSocketClientError( - ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - "WebSocket connection is undefined." - ); + this._wsConn = null; } - } - - readyState(): number { - return this._wsConn.readyState; + this._inflightRequests.clear(); } async sendMsgNoResp(message: string): Promise { logger.debug("[wsClient.sendMsgNoResp()]===>" + message); - let msg = JSON.parse(message); - if (msg.args.id !== undefined) { - msg.args.id = BigInt(msg.args.id); - } - return new Promise((resolve, reject) => { if (this._wsConn && this._wsConn.readyState === w3cwebsocket.OPEN) { this._wsConn.send(message); @@ -171,13 +174,6 @@ export class WebSocketConnector { } return new Promise((resolve, reject) => { - /* - w3c 连接有状态 - CONNECTING: number; - OPEN: number; - CLOSING: number; - CLOSED: number; - */ if (this._wsConn && this._wsConn.readyState === w3cwebsocket.OPEN) { if (register) { WsEventCallback.instance().registerCallback( @@ -196,9 +192,6 @@ export class WebSocketConnector { } this._wsConn.send(message); } else { - // 连接重建,重试机制 - // reconnect 后重新发送请求 - // 有没有 inflight 请求?如果有需要添加缓存机制,重试时重新发送 reject( new WebSocketQueryError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, @@ -247,9 +240,7 @@ export class WebSocketConnector { }); } - /** - * Send raw binary data without callback registration (used for inflight resend). - */ + /** Send raw binary data without callback registration (used for inflight resend). */ sendBinaryMsgRaw(message: ArrayBuffer): void { if (this._wsConn && this._wsConn.readyState === w3cwebsocket.OPEN) { this._wsConn.send(message); @@ -261,7 +252,196 @@ export class WebSocketConnector { } } - public getWsURL(): URL { - return this._wsURL; + // ── private helpers ── + + /** Create a raw w3cwebsocket connection for a specific host. */ + private _createWsConnection(hostIndex: number): w3cwebsocket { + const url = buildUrlForHost(this._parsed, hostIndex); + const addr = url.origin + url.pathname + url.search; + const conn = new w3cwebsocket( + addr, + undefined, + undefined, + undefined, + undefined, + { + maxReceivedFrameSize: 0x60000000, + maxReceivedMessageSize: 0x60000000, + } + ); + conn.onerror = function (err: Error) { + logger.error(`webSocket connection failed, url: ${maskUrlForLog(url)}, error: ${err.message}`); + }; + conn.onclose = (e: ICloseEvent) => { + logger.info("websocket connection closed"); + if (!this._closed) { + logger.warn( + "WebSocket connection closed, failover will be triggered on next operation" + ); + } + }; + conn.onmessage = this._onmessage; + conn._binaryType = "arraybuffer"; + return conn; + } + + /** Wait for the current WebSocket to open and fire the connection callback. */ + private _ready(): Promise { + return new Promise((resolve, reject) => { + if (!this._wsConn) { + return reject( + new TDWebSocketClientError( + ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, + "WebSocket connection is undefined." + ) + ); + } + const reqId = ReqId.getReqID(); + WsEventCallback.instance().registerCallback( + { + action: "websocket_connection", + req_id: BigInt(reqId), + timeout: this._timeout, + id: BigInt(reqId), + }, + resolve, + reject + ); + + this._wsConn.onopen = () => { + logger.debug("websocket connection opened"); + WsEventCallback.instance().handleEventCallback( + { + id: BigInt(reqId), + action: "websocket_connection", + req_id: BigInt(reqId), + }, + OnMessageType.MESSAGE_TYPE_CONNECTION, + this + ); + }; + }); + } + + private _onmessage(event: any) { + let data = event.data; + logger.debug("wsClient._onMessage()====" + Object.prototype.toString.call(data)); + if (Object.prototype.toString.call(data) === "[object ArrayBuffer]") { + let id = new DataView(data, 26, 8).getBigUint64(0, true); + WsEventCallback.instance().handleEventCallback( + { id: id, action: "", req_id: BigInt(0) }, + OnMessageType.MESSAGE_TYPE_ARRAYBUFFER, + data + ); + } else if (Object.prototype.toString.call(data) === "[object String]") { + let msg = JSON.parse(data); + logger.debug("[_onmessage.stringType]==>:" + data); + WsEventCallback.instance().handleEventCallback( + { id: BigInt(0), action: msg.action, req_id: msg.req_id }, + OnMessageType.MESSAGE_TYPE_STRING, + msg + ); + } else { + throw new TDWebSocketClientError( + ErrorCode.ERR_INVALID_MESSAGE_TYPE, + `invalid message type ${Object.prototype.toString.call(data)}` + ); + } + } + + /** Try connecting to a specific host with exponential backoff retries. */ + private async _tryConnectWithRetries( + hostIndex: number + ): Promise { + const retries = this._parsed.retries; + const baseBackoff = this._parsed.retryBackoffMs; + const maxBackoff = this._parsed.retryBackoffMaxMs; + const url = buildUrlForHost(this._parsed, hostIndex); + + for (let attempt = 0; attempt <= retries; attempt++) { + if (this._closed) return null; + try { + logger.info( + `Attempting connection to ${maskUrlForLog(url)}, attempt ${attempt + 1}/${retries + 1}` + ); + const conn = this._createWsConnection(hostIndex); + this._wsConn = conn; + await this._ready(); + logger.info(`Successfully connected to ${maskUrlForLog(url)}`); + return conn; + } catch (e: any) { + logger.warn( + `Connection attempt ${attempt + 1} to ${maskUrlForLog(url)} failed: ${e.message}` + ); + if (attempt < retries) { + const backoff = Math.min( + baseBackoff * Math.pow(2, attempt), + maxBackoff + ); + await new Promise((r) => setTimeout(r, backoff)); + } + } + } + return null; + } + + /** Resend all inflight requests after a successful reconnect. */ + private async _resendInflightRequests(): Promise { + if (this._inflightRequests.size === 0) return; + + logger.info( + `Resending ${this._inflightRequests.size} inflight requests after reconnect` + ); + + const requests = Array.from(this._inflightRequests.entries()); + for (const [id, req] of requests) { + try { + if (req.type === "text" && req.message) { + if (req.register) { + const msg = JSON.parse(req.message); + WsEventCallback.instance().registerCallback( + { + action: msg.action, + req_id: msg.args.req_id, + timeout: this._timeout, + id: + msg.args.id === undefined + ? msg.args.id + : BigInt(msg.args.id), + }, + req.resolve, + req.reject + ); + } + this.sendMsgNoResp(req.message); + } else if ( + req.type === "binary" && + req.binaryData && + req.reqId !== undefined && + req.action + ) { + if (req.register) { + WsEventCallback.instance().registerCallback( + { + action: req.action, + req_id: req.reqId, + timeout: this._timeout, + id: req.reqId, + }, + req.resolve, + req.reject + ); + } + this.sendBinaryMsgRaw(req.binaryData); + } + logger.debug(`Resent inflight request ${id}`); + } catch (e: any) { + logger.error( + `Failed to resend inflight request ${id}: ${e.message}` + ); + req.reject(e); + this._inflightRequests.delete(id); + } + } } } diff --git a/nodejs/src/client/wsConnectorPool.ts b/nodejs/src/client/wsConnectorPool.ts index c87586c9..aae06920 100644 --- a/nodejs/src/client/wsConnectorPool.ts +++ b/nodejs/src/client/wsConnectorPool.ts @@ -3,7 +3,6 @@ import { WebSocketConnector } from "./wsConnector"; import { ErrorCode, TDWebSocketClientError } from "../common/wsError"; import logger from "../common/log"; import { w3cwebsocket } from "websocket"; -import { maskUrlForLog } from "../common/utils"; const mutex = new Mutex(); @@ -28,13 +27,12 @@ export class WebSocketConnectionPool { return WebSocketConnectionPool._instance; } - async getConnection(url: URL, timeout: number | undefined | null): Promise { - let connectAddr = url.origin.concat(url.pathname).concat(url.search); + async getConnection(url: string, timeout?: number | null): Promise { let connector: WebSocketConnector | undefined; const unlock = await mutex.acquire(); try { - if (this.pool.has(connectAddr)) { - const connectors = this.pool.get(connectAddr); + if (this.pool.has(url)) { + const connectors = this.pool.get(url); while (connectors && connectors.length > 0) { const candidate = connectors.pop(); if (!candidate) { @@ -46,7 +44,7 @@ export class WebSocketConnectionPool { } else if (candidate) { Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1); candidate.close(); - logger.error(`getConnection, current connection status fail, url: ${maskUrlForLog(new URL(connectAddr))}`); + logger.error(`getConnection, current connection status fail, url: ${url}`); } } } @@ -74,8 +72,8 @@ export class WebSocketConnectionPool { logger.info( "getConnection, new connection count:" + Atomics.load(WebSocketConnectionPool.sharedArray, 0) + - ", connectAddr:" + - connectAddr.replace(/(token=)[^&]*/i, "$1[REDACTED]") + ", url:" + + url.replace(/(token=)[^&]*/i, "$1[REDACTED]") ); } return new WebSocketConnector(url, timeout); @@ -89,13 +87,13 @@ export class WebSocketConnectionPool { const unlock = await mutex.acquire(); try { if (connector.readyState() === w3cwebsocket.OPEN) { - let url = connector.getWsURL(); - let connectAddr = url.origin.concat(url.pathname).concat(url.search); - let connectors = this.pool.get(connectAddr); + const url = connector.getCurrentUrl(); + const key = url.origin + url.pathname + url.search; + let connectors = this.pool.get(key); if (!connectors) { - connectors = new Array(); + connectors = []; connectors.push(connector); - this.pool.set(connectAddr, connectors); + this.pool.set(key, connectors); } else { connectors.push(connector); } diff --git a/nodejs/src/common/config.ts b/nodejs/src/common/config.ts index deb66ed5..5d3a0323 100644 --- a/nodejs/src/common/config.ts +++ b/nodejs/src/common/config.ts @@ -1,5 +1,4 @@ import { MinStmt2Version } from "./constant"; -import { ParsedMultiAddress } from "./urlParser"; export class WSConfig { private _user: string | undefined | null; @@ -11,7 +10,6 @@ export class WSConfig { private _timezone: string | undefined | null; private _minStmt2Version: string; private _bearerToken: string | undefined | null; - private _parsedMultiAddress: ParsedMultiAddress | undefined; constructor(url: string, minStmt2Version?: string) { this._url = url; @@ -89,12 +87,4 @@ export class WSConfig { public getMinStmt2Version() { return this._minStmt2Version; } - - public getParsedMultiAddress(): ParsedMultiAddress | undefined { - return this._parsedMultiAddress; - } - - public setParsedMultiAddress(parsed: ParsedMultiAddress) { - this._parsedMultiAddress = parsed; - } } diff --git a/nodejs/src/common/urlParser.ts b/nodejs/src/common/urlParser.ts index b134c348..949dd5bf 100644 --- a/nodejs/src/common/urlParser.ts +++ b/nodejs/src/common/urlParser.ts @@ -17,6 +17,7 @@ export interface ParsedMultiAddress { retryBackoffMaxMs: number; } +const DEFAULT_PORT = 6041; const DEFAULT_RETRIES = 5; const DEFAULT_RETRY_BACKOFF_MS = 200; const DEFAULT_RETRY_BACKOFF_MAX_MS = 2000; @@ -43,31 +44,60 @@ export function parseMultiAddressUrl(urlStr: string): ParsedMultiAddress { ); } const scheme = schemeMatch[1].toLowerCase(); - let remaining = urlStr.slice(schemeMatch[0].length); + const afterScheme = schemeMatch[0].length; // position after "ws://" or "wss://" - // Extract userinfo (user:pass@) + // Locate host:port section boundaries + // If user:pass@ exists, @ marks the end of userinfo; host starts after @ + // Otherwise host starts right after "//" let username = ""; let password = ""; - const atIndex = findUserInfoEnd(remaining); + let hostStart: number; + + const atIndex = urlStr.indexOf("@", afterScheme); if (atIndex !== -1) { - const userInfo = remaining.slice(0, atIndex); - remaining = remaining.slice(atIndex + 1); - const colonIndex = userInfo.indexOf(":"); - if (colonIndex !== -1) { - username = userInfo.slice(0, colonIndex); - password = userInfo.slice(colonIndex + 1); + // Verify @ comes before any / or ? (i.e. it's part of userinfo, not path) + const slashIndex = urlStr.indexOf("/", afterScheme); + const questionIndex = urlStr.indexOf("?", afterScheme); + const pathStart = Math.min( + slashIndex === -1 ? urlStr.length : slashIndex, + questionIndex === -1 ? urlStr.length : questionIndex + ); + if (atIndex < pathStart) { + const userInfo = urlStr.slice(afterScheme, atIndex); + const colonIndex = userInfo.indexOf(":"); + if (colonIndex !== -1) { + username = userInfo.slice(0, colonIndex); + password = userInfo.slice(colonIndex + 1); + } else { + username = userInfo; + } + hostStart = atIndex + 1; } else { - username = userInfo; + // @ is in path/query, not userinfo + hostStart = afterScheme; + } + } else { + hostStart = afterScheme; + } + + // Find end of host:port section: first "/" or "?" after hostStart + // For IPv6 brackets we only need to skip content inside [] + let hostEnd = urlStr.length; + let inBracket = false; + for (let i = hostStart; i < urlStr.length; i++) { + const ch = urlStr[i]; + if (ch === "[") inBracket = true; + else if (ch === "]") inBracket = false; + else if ((ch === "/" || ch === "?") && !inBracket) { + hostEnd = i; + break; } } - // Split host part from path+query - // Find the first '/' or '?' that is NOT inside brackets - let hostEnd = findHostEnd(remaining); - const hostPart = remaining.slice(0, hostEnd); - remaining = remaining.slice(hostEnd); + const hostPart = urlStr.slice(hostStart, hostEnd); + const remaining = urlStr.slice(hostEnd); - // Parse hosts + // Parse hosts from the comma-separated host:port section const hosts = parseHosts(hostPart); if (hosts.length === 0) { throw new TDWebSocketClientError( @@ -76,7 +106,7 @@ export function parseMultiAddressUrl(urlStr: string): ParsedMultiAddress { ); } - // Parse path and query + // Parse path and query from remaining let pathname = "/"; let search = ""; const queryIndex = remaining.indexOf("?"); @@ -127,38 +157,6 @@ export function buildUrlForHost(parsed: ParsedMultiAddress, hostIndex: number): return url; } -/** - * Find the '@' that ends userinfo. Must not be inside [] brackets. - */ -function findUserInfoEnd(str: string): number { - let inBracket = false; - for (let i = 0; i < str.length; i++) { - const ch = str[i]; - if (ch === "[") inBracket = true; - else if (ch === "]") inBracket = false; - else if (ch === "@" && !inBracket) return i; - else if (ch === "/" || ch === "?" || ch === "#") { - // No userinfo - return -1; - } - } - return -1; -} - -/** - * Find where the host section ends (first '/' or '?' not inside brackets). - */ -function findHostEnd(str: string): number { - let inBracket = false; - for (let i = 0; i < str.length; i++) { - const ch = str[i]; - if (ch === "[") inBracket = true; - else if (ch === "]") inBracket = false; - else if ((ch === "/" || ch === "?") && !inBracket) return i; - } - return str.length; -} - /** * Parse comma-separated host:port entries. * Supports: hostname:port, IPv4:port, [IPv6]:port @@ -168,10 +166,8 @@ function parseHosts(hostPart: string): HostPort[] { if (!hostPart) return []; const results: HostPort[] = []; - const DEFAULT_PORT = 6041; - // Split by comma, respecting brackets - const segments = splitHostSegments(hostPart); + const segments = splitByComma(hostPart); for (const seg of segments) { const trimmed = seg.trim(); @@ -200,7 +196,7 @@ function parseHosts(hostPart: string): HostPort[] { } results.push({ host, port }); } else { - // IPv4 or hostname + // IPv4 or hostname: use lastIndexOf(":") to find port separator const lastColon = trimmed.lastIndexOf(":"); if (lastColon === -1) { results.push({ host: trimmed, port: DEFAULT_PORT }); @@ -209,7 +205,6 @@ function parseHosts(hostPart: string): HostPort[] { const portStr = trimmed.slice(lastColon + 1); const port = parseInt(portStr, 10); if (isNaN(port)) { - // Might be hostname without port that contains something odd, treat whole as host results.push({ host: trimmed, port: DEFAULT_PORT }); } else { results.push({ host, port }); @@ -221,14 +216,12 @@ function parseHosts(hostPart: string): HostPort[] { return results; } -/** - * Split host part by commas, but not inside brackets. - */ -function splitHostSegments(hostPart: string): string[] { +/** Split string by commas, but not inside [] brackets. */ +function splitByComma(str: string): string[] { const segments: string[] = []; let current = ""; let inBracket = false; - for (const ch of hostPart) { + for (const ch of str) { if (ch === "[") inBracket = true; else if (ch === "]") inBracket = false; if (ch === "," && !inBracket) { diff --git a/nodejs/src/common/utils.ts b/nodejs/src/common/utils.ts index 85b0b5e8..594abfc3 100644 --- a/nodejs/src/common/utils.ts +++ b/nodejs/src/common/utils.ts @@ -3,12 +3,11 @@ import { WSConfig } from "./config"; import { ErrorCode, TDWebSocketClientError } from "./wsError"; import { parseMultiAddressUrl, buildUrlForHost } from "./urlParser"; -export function getUrl(wsConfig: WSConfig): URL { +export function getUrl(wsConfig: WSConfig): string { const rawUrl = wsConfig.getUrl(); - // Try multi-address parsing first + // Parse multi-address URL to apply overrides const parsed = parseMultiAddressUrl(rawUrl); - wsConfig.setParsedMultiAddress(parsed); // Apply user/password overrides from WSConfig if (wsConfig.getUser()) { @@ -56,11 +55,25 @@ export function getUrl(wsConfig: WSConfig): URL { // Set pathname to /ws for WebSocket endpoint parsed.pathname = "/ws"; - // Update the stored parsed multi-address - wsConfig.setParsedMultiAddress(parsed); + // Rebuild the URL string with all overrides applied + const url = buildUrlForHost(parsed, 0); + // For multi-address, reconstruct the full multi-address URL string + if (parsed.hosts.length > 1) { + const hosts = parsed.hosts.map(hp => { + const isIPv6 = hp.host.includes(":"); + return isIPv6 ? `[${hp.host}]:${hp.port}` : `${hp.host}:${hp.port}`; + }).join(","); + let result = `${parsed.scheme}://`; + if (parsed.username || parsed.password) { + result += `${encodeURIComponent(parsed.username)}:${encodeURIComponent(parsed.password)}@`; + } + result += hosts + parsed.pathname; + const search = parsed.searchParams.toString(); + if (search) result += "?" + search; + return result; + } - // Return URL for first host (backward compatibility) - return buildUrlForHost(parsed, 0); + return url.toString(); } export function isEmpty(value: any): boolean { @@ -245,9 +258,6 @@ export function maskTmqConfigForLog(config: TmqConfig): string { }; return JSON.stringify(masked, (key, value) => { switch (key) { - case 'url': - case 'sql_url': - return maskUrlForLog(value); case 'token': case 'password': case 'bearer_token': diff --git a/nodejs/src/sql/wsSql.ts b/nodejs/src/sql/wsSql.ts index 843f2fa0..d92d3d37 100644 --- a/nodejs/src/sql/wsSql.ts +++ b/nodejs/src/sql/wsSql.ts @@ -33,8 +33,8 @@ export class WsSql { // 解析并校验,支持多地址 let url = getUrl(wsConfig); this.wsConfig = wsConfig; - // 传递 ParsedMultiAddress 给 WsClient,支持多地址连接 - this._wsClient = new WsClient(url, wsConfig.getTimeOut(), wsConfig.getParsedMultiAddress()); + // 传递 string URL 给 WsClient,支持多地址连接 + this._wsClient = new WsClient(url, wsConfig.getTimeOut()); } static async open(wsConfig: WSConfig): Promise { diff --git a/nodejs/src/tmq/config.ts b/nodejs/src/tmq/config.ts index c110a02a..74f4922a 100644 --- a/nodejs/src/tmq/config.ts +++ b/nodejs/src/tmq/config.ts @@ -2,8 +2,8 @@ import { TMQConstants } from "./constant"; import { parseMultiAddressUrl, buildUrlForHost, ParsedMultiAddress } from "../common/urlParser"; export class TmqConfig { - url: URL | null = null; - sql_url: URL | null = null; + url: string | null = null; + sql_url: string | null = null; user: string | null = null; password: string | null = null; token: string | null = null; @@ -15,7 +15,6 @@ export class TmqConfig { auto_commit_interval_ms: number = 5 * 1000; timeout: number = 5000; otherConfigs: Map; - parsedMultiAddress: ParsedMultiAddress | null = null; constructor(wsConfig: Map) { this.otherConfigs = new Map(); @@ -61,40 +60,56 @@ export class TmqConfig { if (rawUrl) { // Parse multi-address URL const parsed = parseMultiAddressUrl(rawUrl); - this.parsedMultiAddress = parsed; - - // Build URL from first host for compatibility - this.url = buildUrlForHost(parsed, 0); if (this.user) { - this.url.username = this.user; parsed.username = this.user; } else { - this.user = this.url.username; + this.user = parsed.username || null; } if (this.password) { - this.url.password = this.password; parsed.password = this.password; } else { - this.password = this.url.password; + this.password = parsed.password || null; } if (this.token) { - this.url.searchParams.set("bearer_token", this.token); parsed.searchParams.set("bearer_token", this.token); } else { - const bearerToken = this.url.searchParams.get("bearer_token"); + const bearerToken = parsed.searchParams.get("bearer_token"); if (bearerToken) { this.token = bearerToken; this.otherConfigs.set(TMQConstants.CONNECT_TOKEN, bearerToken); } else { - this.url.searchParams.delete("bearer_token"); + parsed.searchParams.delete("bearer_token"); } } - this.sql_url = new URL(this.url.toString()); - this.sql_url.pathname = "/ws"; - this.url.pathname = "/rest/tmq"; + // Build the multi-address URL strings for TMQ and SQL endpoints + const buildMultiAddrUrl = (parsed: ParsedMultiAddress): string => { + if (parsed.hosts.length > 1) { + const hosts = parsed.hosts.map(hp => { + const isIPv6 = hp.host.includes(":"); + return isIPv6 ? `[${hp.host}]:${hp.port}` : `${hp.host}:${hp.port}`; + }).join(","); + let result = `${parsed.scheme}://`; + if (parsed.username || parsed.password) { + result += `${encodeURIComponent(parsed.username)}:${encodeURIComponent(parsed.password)}@`; + } + result += hosts + parsed.pathname; + const search = parsed.searchParams.toString(); + if (search) result += "?" + search; + return result; + } + return buildUrlForHost(parsed, 0).toString(); + }; + + // SQL endpoint + const sqlParsed = { ...parsed, pathname: "/ws" }; + this.sql_url = buildMultiAddrUrl(sqlParsed); + + // TMQ endpoint + parsed.pathname = "/rest/tmq"; + this.url = buildMultiAddrUrl(parsed); } } } diff --git a/nodejs/src/tmq/wsTmq.ts b/nodejs/src/tmq/wsTmq.ts index bbc649e0..7e7abacd 100644 --- a/nodejs/src/tmq/wsTmq.ts +++ b/nodejs/src/tmq/wsTmq.ts @@ -45,8 +45,7 @@ export class WsConsumer { } this._wsClient = new WsClient( this._wsConfig.url, - this._wsConfig.timeout, - this._wsConfig.parsedMultiAddress || undefined + this._wsConfig.timeout ); this._lastMessageID = BigInt(0); } @@ -57,8 +56,7 @@ export class WsConsumer { if (this._wsConfig.sql_url) { wsSql = new WsClient( this._wsConfig.sql_url, - this._wsConfig.timeout, - this._wsConfig.parsedMultiAddress || undefined + this._wsConfig.timeout ); await wsSql.connect(); await wsSql.checkVersion(); @@ -67,7 +65,7 @@ export class WsConsumer { } else { throw new TDWebSocketClientError( ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL, - `connection creation failed, url: ${maskUrlForLog(this._wsConfig.url)}` + `connection creation failed, url: ${this._wsConfig.url}` ); } } catch (e: any) { diff --git a/nodejs/test/bulkPulling/tmq.config.test.ts b/nodejs/test/bulkPulling/tmq.config.test.ts index a1f84c2d..96ba3122 100644 --- a/nodejs/test/bulkPulling/tmq.config.test.ts +++ b/nodejs/test/bulkPulling/tmq.config.test.ts @@ -26,44 +26,44 @@ describe("TmqConfig - td.connect.token", () => { expect(cfg.token).toBe("mytoken123"); }); - test("bearer_token is appended to url search params when token is provided", () => { + test("bearer_token is appended to url when token is provided", () => { const configMap = new Map([ [TMQConstants.WS_URL, baseUrl], [TMQConstants.CONNECT_TOKEN, "mytoken123"], [TMQConstants.GROUP_ID, "g1"], ]); const cfg = new TmqConfig(configMap); - expect(cfg.url?.searchParams.get("bearer_token")).toBe("mytoken123"); + expect(cfg.url).toContain("bearer_token=mytoken123"); }); - test("bearer_token is appended to sql_url search params when token is provided", () => { + test("bearer_token is appended to sql_url when token is provided", () => { const configMap = new Map([ [TMQConstants.WS_URL, baseUrl], [TMQConstants.CONNECT_TOKEN, "mytoken123"], [TMQConstants.GROUP_ID, "g1"], ]); const cfg = new TmqConfig(configMap); - expect(cfg.sql_url?.searchParams.get("bearer_token")).toBe("mytoken123"); + expect(cfg.sql_url).toContain("bearer_token=mytoken123"); }); - test("sql_url pathname is /ws when token is provided", () => { + test("sql_url contains /ws path when token is provided", () => { const configMap = new Map([ [TMQConstants.WS_URL, baseUrl], [TMQConstants.CONNECT_TOKEN, "mytoken123"], [TMQConstants.GROUP_ID, "g1"], ]); const cfg = new TmqConfig(configMap); - expect(cfg.sql_url?.pathname).toBe("/ws"); + expect(cfg.sql_url).toContain("/ws"); }); - test("url pathname is /rest/tmq when token is provided", () => { + test("url contains /rest/tmq path when token is provided", () => { const configMap = new Map([ [TMQConstants.WS_URL, baseUrl], [TMQConstants.CONNECT_TOKEN, "mytoken123"], [TMQConstants.GROUP_ID, "g1"], ]); const cfg = new TmqConfig(configMap); - expect(cfg.url?.pathname).toBe("/rest/tmq"); + expect(cfg.url).toContain("/rest/tmq"); }); test("bearer_token not set on urls when token is not provided", () => { @@ -74,8 +74,8 @@ describe("TmqConfig - td.connect.token", () => { [TMQConstants.GROUP_ID, "g1"], ]); const cfg = new TmqConfig(configMap); - expect(cfg.url?.searchParams.has("bearer_token")).toBe(false); - expect(cfg.sql_url?.searchParams.has("bearer_token")).toBe(false); + expect(cfg.url).not.toContain("bearer_token"); + expect(cfg.sql_url).not.toContain("bearer_token"); }); test("CONNECT_TOKEN constant value is td.connect.token", () => { diff --git a/nodejs/test/connectionManager.test.ts b/nodejs/test/connectionManager.test.ts index 8f2c4242..4f1e0af1 100644 --- a/nodejs/test/connectionManager.test.ts +++ b/nodejs/test/connectionManager.test.ts @@ -1,35 +1,35 @@ -import { parseMultiAddressUrl, buildUrlForHost } from "../src/common/urlParser"; -import { ConnectionManager } from "../src/client/wsConnectionManager"; +import { WebSocketConnector } from "../src/client/wsConnector"; -describe("ConnectionManager", () => { - test("constructor initializes with random index", () => { - const parsed = parseMultiAddressUrl( - "ws://root:taosdata@host1:6041,host2:6042,host3:6043?retries=2&retry_backoff_ms=100" +describe("WebSocketConnector", () => { + test("constructor initializes with parsed multi-address", () => { + const connector = new WebSocketConnector( + "ws://root:taosdata@host1:6041,host2:6042,host3:6043?retries=2&retry_backoff_ms=100", + 5000 ); - const mgr = new ConnectionManager(parsed, 5000); - expect(mgr).toBeDefined(); - expect(mgr.getParsed()).toBe(parsed); - expect(mgr.isConnected()).toBe(false); - expect(mgr.getConnector()).toBeNull(); - expect(mgr.getInflightCount()).toBe(0); + expect(connector).toBeDefined(); + expect(connector.getParsed().hosts.length).toBe(3); + expect(connector.isConnected()).toBe(false); + expect(connector.getInflightCount()).toBe(0); }); test("getCurrentUrl returns valid URL", () => { - const parsed = parseMultiAddressUrl( - "ws://root:taosdata@localhost:6041,localhost:6042" + const connector = new WebSocketConnector( + "ws://root:taosdata@localhost:6041,localhost:6042", + 5000 ); - const mgr = new ConnectionManager(parsed, 5000); - const url = mgr.getCurrentUrl(); + const url = connector.getCurrentUrl(); expect(url.protocol).toBe("ws:"); expect(url.username).toBe("root"); expect(["6041", "6042"]).toContain(url.port); }); test("trackRequest and completeRequest", () => { - const parsed = parseMultiAddressUrl("ws://root:taosdata@localhost:6041"); - const mgr = new ConnectionManager(parsed, 5000); - - mgr.trackRequest("req1", { + const connector = new WebSocketConnector( + "ws://root:taosdata@localhost:6041", + 5000 + ); + + connector.trackRequest("req1", { id: "req1", type: "text", message: '{"action":"test"}', @@ -37,17 +37,19 @@ describe("ConnectionManager", () => { reject: () => {}, register: true, }); - expect(mgr.getInflightCount()).toBe(1); + expect(connector.getInflightCount()).toBe(1); - mgr.completeRequest("req1"); - expect(mgr.getInflightCount()).toBe(0); + connector.completeRequest("req1"); + expect(connector.getInflightCount()).toBe(0); }); - test("close clears inflight requests", async () => { - const parsed = parseMultiAddressUrl("ws://root:taosdata@localhost:6041"); - const mgr = new ConnectionManager(parsed, 5000); - - mgr.trackRequest("req1", { + test("close clears inflight requests", () => { + const connector = new WebSocketConnector( + "ws://root:taosdata@localhost:6041", + 5000 + ); + + connector.trackRequest("req1", { id: "req1", type: "text", message: '{"action":"test"}', @@ -55,9 +57,9 @@ describe("ConnectionManager", () => { reject: () => {}, register: true, }); - expect(mgr.getInflightCount()).toBe(1); + expect(connector.getInflightCount()).toBe(1); - await mgr.close(); - expect(mgr.getInflightCount()).toBe(0); + connector.close(); + expect(connector.getInflightCount()).toBe(0); }); });