Skip to content
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
with:
node-version: ${{ matrix.node-version }}

- name: Test nodejs websocket
- name: Run tests
working-directory: nodejs-connector/nodejs
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: nodejs Compatibility
name: Node.js Compatibility

on:
push:
Expand Down
59 changes: 59 additions & 0 deletions .github/workflows/enterprise.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
name: Node.js Enterprise

on:
push:
branches:
- "main"
- "3.0"
pull_request:
branches:
- "main"
- "3.0"

jobs:
build:
runs-on: ubuntu-22.04
strategy:
matrix:
node-version: [16.x, 20.x]
steps:
- name: Get TDengine
run: wget "${{ secrets.NIGHTLY_TDENGINE_ENTERPRISE_BASE_URL }}/tsdb-nightly-3.0.tar.gz?v=$(date +%s)" -O tsdb-nightly-3.0.tar.gz

- name: Install TDengine
run: |
tar -zxf tsdb-nightly-3.0.tar.gz
rm -rf tsdb-nightly-3.0.tar.gz
cd tsdb-nightly-3.0
sudo ./install.sh

- name: Start TDengine
run: |
sudo mkdir -p /etc/taos
sudo mkdir -p /var/log/taos
nohup sudo taosd &
nohup sudo taosadapter &

- name: Checkout
uses: actions/checkout@v4
with:
path: "nodejs-connector"
clean: true
set-safe-directory: true

- name: Use Node.js ${{ matrix.node-version }}
uses: actions/setup-node@v3
with:
node-version: ${{ matrix.node-version }}

- name: Run tests
working-directory: nodejs-connector/nodejs
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }}
TEST_ENTERPRISE: true
run: |
npm install
npm list
npm run example
npm run test
36 changes: 15 additions & 21 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ export class WsClient {
private readonly _url: URL;
private static readonly _minVersion = "3.3.2.0";
private _version?: string | undefined | null;
private _bearerToken?: string | undefined | null;

constructor(url: URL, timeout?: number | undefined | null) {
this.checkURL(url);
this._url = url;
this._timeout = timeout;
if (this._url.searchParams.has("timezone")) {
this._timezone =
this._url.searchParams.get("timezone") || undefined;
this._timezone = this._url.searchParams.get("timezone") || undefined;
this._url.searchParams.delete("timezone");
}
if (this._url.searchParams.has("bearer_token")) {
this._bearerToken = this._url.searchParams.get("bearer_token") || undefined;
}
}

async connect(database?: string | undefined | null): Promise<void> {
Expand All @@ -42,6 +45,7 @@ export class WsClient {
password: safeDecodeURIComponent(this._url.password),
db: database,
...(this._timezone && { tz: this._timezone }),
...(this._bearerToken && { bearer_token: this._bearerToken }),
},
};
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -221,19 +225,14 @@ export class WsClient {

async ready(): Promise<void> {
try {
this._wsConnector =
await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
this._wsConnector = await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) {
await this._wsConnector.ready();
}
logger.debug(
"ready status ",
this._url,
this._wsConnector.readyState()
);
logger.debug("ready status ", this._url, this._wsConnector.readyState());
return;
} catch (e: any) {
logger.error(
Expand Down Expand Up @@ -321,16 +320,11 @@ export class WsClient {
if (this._wsConnector.readyState() !== w3cwebsocket.OPEN) {
await this._wsConnector.ready();
}
let result: any = await this._wsConnector.sendMsg(
JSONBig.stringify(versionMsg)
);
let result: any = await this._wsConnector.sendMsg(JSONBig.stringify(versionMsg));
if (result.msg.code == 0) {
return new WSVersionResponse(result).version;
}
throw new WebSocketInterfaceError(
result.msg.code,
result.msg.message
);
throw new WebSocketInterfaceError(result.msg.code, result.msg.message);
} catch (e: any) {
logger.error(
`connection creation failed, url: ${this._url}, code: ${e.code}, message: ${e.message}`
Expand All @@ -354,8 +348,8 @@ export class WsClient {
}

checkURL(url: URL) {
// Assert is cloud url
if (!url.searchParams.has("token")) {
// Assert token or bearer_token exists, otherwise username and password must exist.
if (!url.searchParams.has("token") && !url.searchParams.has("bearer_token")) {
if (!(url.username || url.password)) {
throw new WebSocketInterfaceError(
ErrorCode.ERR_INVALID_AUTHENTICATION,
Expand Down
16 changes: 5 additions & 11 deletions nodejs/src/client/wsConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ export class WebSocketConnector {
}
);
this._wsConn.onerror = function (err: Error) {
logger.error(
`webSocket connection failed, url: ${this.url}, error: ${err.message}`
);
logger.error(`webSocket connection failed, url: ${this.url.split('?')[0]}, error: ${err.message}`);
};

this._wsConn.onclose = this._onclose;

this._wsConn.onmessage = this._onmessage;
this._wsConn._binaryType = "arraybuffer";
} else {
Expand Down Expand Up @@ -87,9 +83,7 @@ export class WebSocketConnector {

private _onmessage(event: any) {
let data = event.data;
logger.debug(
"wsClient._onMessage()====" + Object.prototype.toString.call(data)
);
logger.debug("wsClient._onMessage()====" + Object.prototype.toString.call(data));
if (Object.prototype.toString.call(data) === "[object ArrayBuffer]") {
let id = new DataView(data, 26, 8).getBigUint64(0, true);
WsEventCallback.instance().handleEventCallback(
Expand Down Expand Up @@ -143,7 +137,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down Expand Up @@ -181,7 +175,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down Expand Up @@ -219,7 +213,7 @@ export class WebSocketConnector {
reject(
new WebSocketQueryError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_FAIL,
`WebSocket connection is not ready,status :${this._wsConn?.readyState}`
`WebSocket connection is not ready, status: ${this._wsConn?.readyState}`
)
);
}
Expand Down
44 changes: 10 additions & 34 deletions nodejs/src/client/wsConnectorPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,18 @@ export class WebSocketConnectionPool {
private constructor(maxConnections: number = -1) {
this._maxConnections = maxConnections;
WebSocketConnectionPool.sharedBuffer = new SharedArrayBuffer(4);
WebSocketConnectionPool.sharedArray = new Int32Array(
WebSocketConnectionPool.sharedBuffer
);
WebSocketConnectionPool.sharedArray = new Int32Array(WebSocketConnectionPool.sharedBuffer);
Atomics.store(WebSocketConnectionPool.sharedArray, 0, 0);
}

public static instance(
maxConnections: number = -1
): WebSocketConnectionPool {
public static instance(maxConnections: number = -1): WebSocketConnectionPool {
if (!WebSocketConnectionPool._instance) {
WebSocketConnectionPool._instance = new WebSocketConnectionPool(maxConnections);
}
return WebSocketConnectionPool._instance;
}

async getConnection(
url: URL,
timeout: number | undefined | null
): Promise<WebSocketConnector> {
async getConnection(url: URL, timeout: number | undefined | null): Promise<WebSocketConnector> {
let connectAddr = url.origin.concat(url.pathname).concat(url.search);
let connector: WebSocketConnector | undefined;
const unlock = await mutex.acquire();
Expand All @@ -46,18 +39,13 @@ export class WebSocketConnectionPool {
if (!candidate) {
continue;
}
if (
candidate &&
candidate.readyState() === w3cwebsocket.OPEN
) {
if (candidate && candidate.readyState() === w3cwebsocket.OPEN) {
connector = candidate;
break;
} else if (candidate) {
Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1);
candidate.close();
logger.error(
`getConnection, current connection status fail, url: ${connectAddr}`
);
logger.error(`getConnection, current connection status fail, url: ${connectAddr.split('?')[0]}`);
}
}
}
Expand All @@ -72,8 +60,7 @@ export class WebSocketConnectionPool {

if (
this._maxConnections != -1 &&
Atomics.load(WebSocketConnectionPool.sharedArray, 0) >
this._maxConnections
Atomics.load(WebSocketConnectionPool.sharedArray, 0) > this._maxConnections
) {
throw new TDWebSocketClientError(
ErrorCode.ERR_WEBSOCKET_CONNECTION_ARRIVED_LIMIT,
Expand Down Expand Up @@ -102,9 +89,7 @@ export class WebSocketConnectionPool {
try {
if (connector.readyState() === w3cwebsocket.OPEN) {
let url = connector.getWsURL();
let connectAddr = url.origin
.concat(url.pathname)
.concat(url.search);
let connectAddr = url.origin.concat(url.pathname).concat(url.search);
let connectors = this.pool.get(connectAddr);
if (!connectors) {
connectors = new Array();
Expand All @@ -113,10 +98,7 @@ export class WebSocketConnectionPool {
} else {
connectors.push(connector);
}
logger.info(
"releaseConnection, current connection count:" +
connectors.length
);
logger.info("releaseConnection, current connection count:" + connectors.length);
} else {
Atomics.add(WebSocketConnectionPool.sharedArray, 0, -1);
connector.close();
Expand Down Expand Up @@ -159,19 +141,13 @@ process.on("exit", (code) => {
});

process.on("SIGINT", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect..."
);
logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

process.on("SIGTERM", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect"
);
logger.info("Received SIGTERM. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

// process.kill(process.pid, 'SIGINT');
5 changes: 3 additions & 2 deletions nodejs/src/client/wsEventCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,11 @@ export enum OnMessageType {
}

const eventMutex = new Mutex();

export class WsEventCallback {
private static _instance?: WsEventCallback;
private static _msgActionRegister: Map<MessageId, MessageAction> =
new Map();
private static _msgActionRegister: Map<MessageId, MessageAction> = new Map();

private constructor() { }

public static instance(): WsEventCallback {
Expand Down
7 changes: 3 additions & 4 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* define ws Response type|class, for query?
*/

import { MessageResp, readVarchar } from "../common/taosResult";

export class WSVersionResponse {
Expand All @@ -10,6 +6,7 @@ export class WSVersionResponse {
message: string;
action: string;
totalTime: number;

constructor(resp: MessageResp) {
this.version = resp.msg.version;
this.code = resp.msg.code;
Expand Down Expand Up @@ -41,6 +38,7 @@ export class WSQueryResponse {
this.totalTime = resp.totalTime;
this.initMsg(resp.msg);
}

private initMsg(msg: any) {
this.code = msg.code;
this.message = msg.message;
Expand Down Expand Up @@ -81,6 +79,7 @@ export class WSFetchBlockResponse {
finished: number | undefined;
metaType: number | undefined;
textDecoder: TextDecoder;

constructor(msg: ArrayBuffer) {
let dataView = new DataView(msg);
this.action = dataView.getBigUint64(8, true);
Expand Down
9 changes: 9 additions & 0 deletions nodejs/src/common/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export class WSConfig {
private _token: string | undefined | null;
private _timezone: string | undefined | null;
private _minStmt2Version: string;
private _bearerToken: string | undefined | null;

constructor(url: string, minStmt2Version?: string) {
this._url = url;
Expand Down Expand Up @@ -75,6 +76,14 @@ export class WSConfig {
this._timezone = timezone;
}

public getBearerToken(): string | undefined | null {
return this._bearerToken;
}

public setBearerToken(token: string) {
this._bearerToken = token;
}

public getMinStmt2Version() {
return this._minStmt2Version;
}
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/common/taosResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
import { appendRune } from "./ut8Helper";
import logger from "./log";
import { decimalToString } from "./utils";
import { TMQRawDataSchema } from "../tmq/constant";

export interface TDengineMeta {
name: string;
Expand Down
Loading
Loading