Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ jobs:
repository: "taosdata/TDengine"
path: "TDengine"
ref: "main"
submodules: "recursive"

- name: Install system dependencies
run: |
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ jobs:
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }}
TEST_3360: true
run: |
npm install
npm list
Expand Down
4 changes: 2 additions & 2 deletions nodejs/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion nodejs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@tdengine/websocket",
"version": "3.2.2",
"version": "3.2.3",
"description": "The websocket Node.js connector for TDengine. TDengine versions 3.3.2.0 and above are recommended to use this connector.",
"source": "index.ts",
"main": "lib/index.js",
Expand Down
1 change: 1 addition & 0 deletions nodejs/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const path = require("path");
const { resolve, parse } = path;

const tsFile = /\/src\/.*.ts$/;

async function dir(folder, ts = []) {
let files = await readdir(folder);
for (let f of files) {
Expand Down
3 changes: 2 additions & 1 deletion nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { WSVersionResponse, WSQueryResponse } from "./wsResponse";
import { ReqId } from "../common/reqid";
import logger from "../common/log";
import { safeDecodeURIComponent, compareVersions } from "../common/utils";
import { safeDecodeURIComponent, compareVersions, CONNECTOR_INFO } from "../common/utils";
import { w3cwebsocket } from "websocket";
import { TSDB_OPTION_CONNECTION } from "../common/constant";

Expand Down Expand Up @@ -41,6 +41,7 @@ export class WsClient {
user: safeDecodeURIComponent(this._url.username),
password: safeDecodeURIComponent(this._url.password),
db: database,
connector: CONNECTOR_INFO,
...(this._timezone && { tz: this._timezone }),
},
};
Expand Down
10 changes: 2 additions & 8 deletions nodejs/src/client/wsConnectorPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,13 @@ process.on("exit", (code) => {
});

process.on("SIGINT", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect..."
);
logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

process.on("SIGTERM", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect"
);
logger.info("Received SIGTERM. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

// process.kill(process.pid, 'SIGINT');
4 changes: 2 additions & 2 deletions nodejs/src/client/wsEventCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ export enum OnMessageType {
}

const eventMutex = new Mutex();

export class WsEventCallback {
private static _instance?: WsEventCallback;
private static _msgActionRegister: Map<MessageId, MessageAction> =
new Map();
private static _msgActionRegister: Map<MessageId, MessageAction> = new Map();
private constructor() { }

public static instance(): WsEventCallback {
Expand Down
7 changes: 3 additions & 4 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* define ws Response type|class, for query?
*/

import { MessageResp, readVarchar } from "../common/taosResult";

export class WSVersionResponse {
Expand All @@ -10,6 +6,7 @@ export class WSVersionResponse {
message: string;
action: string;
totalTime: number;

constructor(resp: MessageResp) {
this.version = resp.msg.version;
this.code = resp.msg.code;
Expand Down Expand Up @@ -41,6 +38,7 @@ export class WSQueryResponse {
this.totalTime = resp.totalTime;
this.initMsg(resp.msg);
}

private initMsg(msg: any) {
this.code = msg.code;
this.message = msg.message;
Expand Down Expand Up @@ -81,6 +79,7 @@ export class WSFetchBlockResponse {
finished: number | undefined;
metaType: number | undefined;
textDecoder: TextDecoder;

constructor(msg: ArrayBuffer) {
let dataView = new DataView(msg);
this.action = dataView.getBigUint64(8, true);
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/common/taosResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
import { appendRune } from "./ut8Helper";
import logger from "./log";
import { decimalToString } from "./utils";
import { TMQRawDataSchema } from "../tmq/constant";

export interface TDengineMeta {
name: string;
Expand Down
3 changes: 3 additions & 0 deletions nodejs/src/common/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { WSConfig } from "./config";
import { ErrorCode, TDWebSocketClientError } from "./wsError";
import pkg from "../../package.json";

export const CONNECTOR_INFO = `nodejs-ws-v${pkg.version}-ncid000`;

export function getUrl(wsConfig: WSConfig): URL {
let url = new URL(wsConfig.getUrl());
Expand Down
1 change: 1 addition & 0 deletions nodejs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ let setLogLevel = (level: string) => {
let destroy = () => {
WebSocketConnectionPool.instance().destroyed();
};

export { sqlConnect, tmqConnect, setLogLevel, destroy };
2 changes: 1 addition & 1 deletion nodejs/src/tmq/config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { TMQConstants } from "./constant";

export class TmqConfig {
// req_id: number;
url: URL | null = null;
sql_url: URL | null = null;
user: string | null = null;
Expand Down Expand Up @@ -57,6 +56,7 @@ export class TmqConfig {
} else {
this.user = this.url.username;
}

if (this.password) {
this.url.password = this.password;
} else {
Expand Down
3 changes: 0 additions & 3 deletions nodejs/src/tmq/tmqResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,8 @@ export class WSTmqFetchBlockInfo {
this.withTableName = dataView.getUint8(4) == 1 ? true : false;
this.withSchema = dataView.getUint8(5) == 1 ? true : false;

// let dataBuffer = dataView.buffer.slice(6)
let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6);
let rows = 0;
// const parseStartTime = new Date().getTime();
// console.log("parseBlockInfos blockNum="+ blockNum)
for (let i = 0; i < blockNum; i++) {
let variableInfo = this.parseVariableByteInteger(dataBuffer);
this.taosResult.setPrecision(variableInfo[1].getUint8(17));
Expand Down
4 changes: 3 additions & 1 deletion nodejs/src/tmq/wsTmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ import {
TopicPartition,
WSTmqFetchBlockInfo,
WsPollResponse,
WsTmqQueryResponse,
} from "./tmqResponse";
import { ReqId } from "../common/reqid";
import logger from "../common/log";
import { WSFetchBlockResponse } from "../client/wsResponse";
import { CONNECTOR_INFO } from "../common/utils";

export class WsConsumer {
private _wsClient: WsClient;
private _wsConfig: TmqConfig;
private _topics?: string[];
private _commitTime?: number;
private _lastMessageID?: bigint;

private constructor(wsConfig: Map<string, any>) {
this._wsConfig = new TmqConfig(wsConfig);
logger.debug(this._wsConfig);
Expand Down Expand Up @@ -106,6 +107,7 @@ export class WsConsumer {
auto_commit: this._wsConfig.auto_commit,
auto_commit_interval_ms: this._wsConfig.auto_commit_interval_ms,
config: this._wsConfig.otherConfigs,
connector: CONNECTOR_INFO,
},
};
this._topics = topics;
Expand Down
22 changes: 22 additions & 0 deletions nodejs/test/bulkPulling/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,28 @@ describe("TDWebSocket.WsSql()", () => {

await wsSql.close();
});

const maybeConnectorVersionTest = process.env.TEST_3360 ? test.skip : test;

maybeConnectorVersionTest("connector version info", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
let wsSql = await WsSql.open(conf);
await Sleep(2000);
let wsRows = await wsSql.query("show connections");
let hasNodejsWs = false;
while (await wsRows.next()) {
const data = wsRows.getData();
if (Array.isArray(data) && data.some(v => typeof v === "string" && v.includes("nodejs-ws"))) {
hasNodejsWs = true;
break;
}
}
expect(hasNodejsWs).toBe(true);
await wsRows.close();
await wsSql.close();
});
});

afterAll(async () => {
Expand Down
32 changes: 30 additions & 2 deletions nodejs/test/bulkPulling/tmq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ import { TMQConstants } from "../../src/tmq/constant";
import { WsConsumer } from "../../src/tmq/wsTmq";
import { WSConfig } from "../../src/common/config";
import { WsSql } from "../../src/sql/wsSql";
import { createSTable, insertStable } from "../utils";
import { createSTable, insertStable, Sleep } from "../utils";
import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool";
import logger, { setLevel } from "../../src/common/log";
import { setLevel } from "../../src/common/log";

setLevel("debug");
const stable = "st";
Expand Down Expand Up @@ -398,6 +398,34 @@ describe("TDWebSocket.Tmq()", () => {
await consumer.unsubscribe();
await consumer.close();
});

const maybeConnectorVersionTest = process.env.TEST_3360 ? test.skip : test;

maybeConnectorVersionTest("connector version info", async () => {
let consumer = await WsConsumer.newConsumer(configMap);
await consumer.subscribe(topics);

let conf: WSConfig = new WSConfig("ws://localhost:6041");
conf.setUser("root");
conf.setPwd("taosdata");
let wsSql = await WsSql.open(conf);
await Sleep(2000);

let wsRows = await wsSql.query("show connections");
let count = 0;
while (await wsRows.next()) {
const data = wsRows.getData();
if (Array.isArray(data) && data.some(v => typeof v === "string" && v.includes("nodejs-ws"))) {
count++;
}
}
expect(count).toBeGreaterThanOrEqual(2);
await wsRows.close();
await wsSql.close();

await consumer.unsubscribe();
await consumer.close();
});
});

afterAll(async () => {
Expand Down
2 changes: 1 addition & 1 deletion nodejs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
// "typeRoots": [], /* Specify multiple folders that act like `./node_modules/@types`. */
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
// "resolveJsonModule": true, /* Enable importing .json files */
"resolveJsonModule": true, /* Enable importing .json files */
// "noResolve": true, /* Disallow `import`s, `require`s or `<reference>`s from expanding the number of files TypeScript should add to a project. */

/* JavaScript Support */
Expand Down
Loading