Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 12 additions & 11 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ export class WsClient {
private readonly _url: URL;
private static readonly _minVersion = "3.3.2.0";
private _version?: string | undefined | null;
private _bearerToken?: string | undefined | null;

constructor(url: URL, timeout?: number | undefined | null) {
this.checkURL(url);
this._url = url;
this._timeout = timeout;
if (this._url.searchParams.has("timezone")) {
this._timezone =
this._url.searchParams.get("timezone") || undefined;
this._timezone = this._url.searchParams.get("timezone") || undefined;
this._url.searchParams.delete("timezone");
}
if (this._url.searchParams.has("bearer_token")) {
this._bearerToken = this._url.searchParams.get("bearer_token") || undefined;
}
}

async connect(database?: string | undefined | null): Promise<void> {
Expand All @@ -42,16 +45,14 @@ export class WsClient {
password: safeDecodeURIComponent(this._url.password),
db: database,
...(this._timezone && { tz: this._timezone }),
...(this._bearerToken && { bearer_token: this._bearerToken }),
},
};
logger.debug(
"[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg)
logger.debug("[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg));
this._wsConnector = await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
this._wsConnector =
await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
if (this._wsConnector.readyState() === w3cwebsocket.OPEN) {
return;
}
Expand Down Expand Up @@ -351,8 +352,8 @@ export class WsClient {
}

checkURL(url: URL) {
// Assert is cloud url
if (!url.searchParams.has("token")) {
// Assert token or bearer_token exists, otherwise username and password must exist.
if (!url.searchParams.has("token") && !url.searchParams.has("bearer_token")) {
if (!(url.username || url.password)) {
throw new WebSocketInterfaceError(
ErrorCode.ERR_INVALID_AUTHENTICATION,
Expand Down
18 changes: 5 additions & 13 deletions nodejs/src/client/wsConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ export class WebSocketConnector {
private _wsURL: URL;
_timeout = 5000;

// create ws
constructor(url: URL, timeout: number | undefined | null) {
// return w3bsocket3
if (url) {
this._wsURL = url;
let origin = url.origin;
Expand All @@ -36,13 +34,9 @@ export class WebSocketConnector {
}
);
this._wsConn.onerror = function (err: Error) {
logger.error(
`webSocket connection failed, url: ${this.url}, error: ${err.message}`
);
logger.error(`webSocket connection failed, url: ${this.url}, error: ${err.message}`);
};

this._wsConn.onclose = this._onclose;

this._wsConn.onmessage = this._onmessage;
this._wsConn._binaryType = "arraybuffer";
} else {
Expand Down Expand Up @@ -88,9 +82,7 @@ export class WebSocketConnector {

private _onmessage(event: any) {
let data = event.data;
logger.debug(
"wsClient._onMessage()====" + Object.prototype.toString.call(data)
);
logger.debug("wsClient._onMessage()====" + Object.prototype.toString.call(data));
if (Object.prototype.toString.call(data) === "[object ArrayBuffer]") {
let id = new DataView(data, 26, 8).getBigUint64(0, true);
WsEventCallback.instance().handleEventCallback(
Expand Down Expand Up @@ -144,7 +136,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down Expand Up @@ -181,7 +173,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down Expand Up @@ -219,7 +211,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down
48 changes: 11 additions & 37 deletions nodejs/src/client/wsConnectorPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,18 @@ export class WebSocketConnectionPool {
private constructor(maxConnections: number = -1) {
this._maxConnections = maxConnections;
WebSocketConnectionPool.sharedBuffer = new SharedArrayBuffer(4);
WebSocketConnectionPool.sharedArray = new Int32Array(
WebSocketConnectionPool.sharedBuffer
);
WebSocketConnectionPool.sharedArray = new Int32Array(WebSocketConnectionPool.sharedBuffer);
Atomics.store(WebSocketConnectionPool.sharedArray, 0, 0);
}

public static instance(
maxConnections: number = -1
): WebSocketConnectionPool {
public static instance(maxConnections: number = -1): WebSocketConnectionPool {
if (!WebSocketConnectionPool._instance) {
WebSocketConnectionPool._instance = new WebSocketConnectionPool(
maxConnections
);
WebSocketConnectionPool._instance = new WebSocketConnectionPool(maxConnections);
}
return WebSocketConnectionPool._instance;
}

async getConnection(
url: URL,
timeout: number | undefined | null
): Promise<WebSocketConnector> {
async getConnection(url: URL, timeout: number | undefined | null): Promise<WebSocketConnector> {
let connectAddr = url.origin.concat(url.pathname).concat(url.search);
let connector: WebSocketConnector | undefined;
const unlock = await mutex.acquire();
Expand All @@ -48,18 +39,13 @@ export class WebSocketConnectionPool {
if (!candidate) {
continue;
}
if (
candidate &&
candidate.readyState() === w3cwebsocket.OPEN
) {
if (candidate && candidate.readyState() === w3cwebsocket.OPEN) {
connector = candidate;
break;
} else if (candidate) {
Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1);
candidate.close();
logger.error(
`getConnection, current connection status fail, url: ${connectAddr}`
);
logger.error(`getConnection, current connection status fail, url: ${connectAddr}`);
}
}
}
Expand All @@ -74,8 +60,7 @@ export class WebSocketConnectionPool {

if (
this._maxConnections != -1 &&
Atomics.load(WebSocketConnectionPool.sharedArray, 0) >
this._maxConnections
Atomics.load(WebSocketConnectionPool.sharedArray, 0) > this._maxConnections
) {
throw new TDWebSocketClientError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_ARRIVED_LIMIT,
Expand All @@ -102,9 +87,7 @@ export class WebSocketConnectionPool {
try {
if (connector.readyState() === w3cwebsocket.OPEN) {
let url = connector.getWsURL();
let connectAddr = url.origin
.concat(url.pathname)
.concat(url.search);
let connectAddr = url.origin.concat(url.pathname).concat(url.search);
let connectors = this.pool.get(connectAddr);
if (!connectors) {
connectors = new Array();
Expand All @@ -113,10 +96,7 @@ export class WebSocketConnectionPool {
} else {
connectors.push(connector);
}
logger.info(
"releaseConnection, current connection count:" +
connectors.length
);
logger.info("releaseConnection, current connection count:" + connectors.length);
} else {
Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1);
connector.close();
Expand Down Expand Up @@ -159,19 +139,13 @@ process.on("exit", (code) => {
});

process.on("SIGINT", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect..."
);
logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

process.on("SIGTERM", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect"
);
logger.info("Received SIGTERM. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

// process.kill(process.pid, 'SIGINT');
5 changes: 3 additions & 2 deletions nodejs/src/client/wsEventCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ export enum OnMessageType {
}

const eventMutex = new Mutex();

export class WsEventCallback {
private static _instance?: WsEventCallback;
private static _msgActionRegister: Map<MessageId, MessageAction> =
new Map();
private static _msgActionRegister: Map<MessageId, MessageAction> = new Map();

private constructor() { }

public static instance(): WsEventCallback {
Expand Down
7 changes: 3 additions & 4 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* define ws Response type|class, for query?
*/

import { MessageResp, readVarchar } from "../common/taosResult";

export class WSVersionResponse {
Expand All @@ -10,6 +6,7 @@ export class WSVersionResponse {
message: string;
action: string;
totalTime: number;

constructor(resp: MessageResp) {
this.version = resp.msg.version;
this.code = resp.msg.code;
Expand Down Expand Up @@ -41,6 +38,7 @@ export class WSQueryResponse {
this.totalTime = resp.totalTime;
this.initMsg(resp.msg);
}

private initMsg(msg: any) {
this.code = msg.code;
this.message = msg.message;
Expand Down Expand Up @@ -81,6 +79,7 @@ export class WSFetchBlockResponse {
finished: number | undefined;
metaType: number | undefined;
textDecoder: TextDecoder;

constructor(msg: ArrayBuffer) {
let dataView = new DataView(msg);
this.action = dataView.getBigUint64(8, true);
Expand Down
9 changes: 9 additions & 0 deletions nodejs/src/common/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export class WSConfig {
private _token: string | undefined | null;
private _timezone: string | undefined | null;
private _minStmt2Version: string;
private _bearerToken: string | undefined | null;

constructor(url: string, minStmt2Version?: string) {
this._url = url;
Expand Down Expand Up @@ -75,6 +76,14 @@ export class WSConfig {
this._timezone = timezone;
}

public getBearerToken(): string | undefined | null {
return this._bearerToken;
}

public setBearerToken(token: string) {
this._bearerToken = token;
}

public getMinStmt2Version() {
return this._minStmt2Version;
}
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/common/taosResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
import { appendRune } from "./ut8Helper";
import logger from "./log";
import { decimalToString } from "./utils";
import { TMQRawDataSchema } from "../tmq/constant";

export interface TDengineMeta {
name: string;
Expand Down
8 changes: 8 additions & 0 deletions nodejs/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ export function getUrl(wsConfig: WSConfig): URL {
wsConfig.setTimezone(url.searchParams.get("timezone") || "");
}

let bearerToken = wsConfig.getBearerToken();
if (bearerToken) {
url.searchParams.set("bearer_token", bearerToken);
}
if (url.searchParams.has("bearer_token")) {
wsConfig.setBearerToken(url.searchParams.get("bearer_token") || "");
}

url.pathname = "/ws";
return url;
}
Expand Down
47 changes: 43 additions & 4 deletions nodejs/test/bulkPulling/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ beforeAll(async () => {
let wsSql = await WsSql.open(conf);
await wsSql.exec("drop database if exists sql_test");
await wsSql.exec("drop database if exists sql_create");
await wsSql.exec(`CREATE USER user1 PASS '${password1}'`);
await wsSql.exec(`CREATE USER user2 PASS '${password2}'`);
await wsSql.exec(`create user user1 pass '${password1}'`);
await wsSql.exec(`create user user2 pass '${password2}'`);
await wsSql.exec("create user token_user pass 'token_pass_1'");
await wsSql.exec(
"create database if not exists sql_test KEEP 3650 DURATION 10 BUFFER 16 WAL_LEVEL 1;"
);
Expand Down Expand Up @@ -265,6 +266,43 @@ describe("TDWebSocket.WsSql()", () => {

await wsSql.close();
});

test("bearer token connect", async () => {
const conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
const wsSql = await WsSql.open(conf);
const wsRows = await wsSql.query("create token test_bearer_token from user token_user");
await wsRows.next();
const token = wsRows.getData()?.[0] as string;
expect(token).toBeTruthy();
await wsRows.close();
await wsSql.close();

const assertServerVersionWithConfig = async (config: WSConfig) => {
const client = await WsSql.open(config);
const rows = await client.query("select server_version()");
await rows.next();
const version = rows.getData()?.[0] as string;
expect(version).toBeTruthy();
await rows.close();
await client.close();
};

const conf1 = new WSConfig(dns);
conf1.setBearerToken(token);
await assertServerVersionWithConfig(conf1);

const conf2 = new WSConfig("ws://localhost:6041?bearer_token=" + token);
await assertServerVersionWithConfig(conf2);
});

test("bearer token connect error", async () => {
const conf = new WSConfig("ws://localhost:6041?bearer_token=invalid_token");
await expect(WsSql.open(conf)).rejects.toMatchObject({
message: expect.stringMatching(/invalid token/i),
});
});
});

afterAll(async () => {
Expand All @@ -274,8 +312,9 @@ afterAll(async () => {
let wsSql = await WsSql.open(conf);
await wsSql.exec("drop database sql_test");
await wsSql.exec("drop database sql_create");
await wsSql.exec("DROP USER user1;");
await wsSql.exec("DROP USER user2;");
await wsSql.exec("drop user user1");
await wsSql.exec("drop user user2");
await wsSql.exec("drop user token_user");
await wsSql.close();
WebSocketConnectionPool.instance().destroyed();
});
Loading