Skip to content
Merged
8 changes: 5 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ jobs:
repository: "taosdata/TDengine"
path: "TDengine"
ref: "main"
submodules: "recursive"

- name: Install system dependencies
run: |
Expand Down Expand Up @@ -71,9 +70,12 @@ jobs:

- name: Test nodejs websocket
working-directory: nodejs-connector/nodejs
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }}
TDENGINE_TEST_USERNAME: ${{ secrets.TDENGINE_TEST_USERNAME }}
TDENGINE_TEST_PASSWORD: ${{ secrets.TDENGINE_TEST_PASSWORD }}
run: |
export TDENGINE_CLOUD_URL=${{ secrets.TDENGINE_CLOUD_URL }}
export TDENGINE_CLOUD_TOKEN=${{ secrets.TDENGINE_CLOUD_TOKEN }}
ls -al
npm install
npm list
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ jobs:
env:
TDENGINE_CLOUD_URL: ${{ secrets.TDENGINE_CLOUD_URL }}
TDENGINE_CLOUD_TOKEN: ${{ secrets.TDENGINE_CLOUD_TOKEN }}
TDENGINE_TEST_USERNAME: ${{ secrets.TDENGINE_TEST_USERNAME }}
TDENGINE_TEST_PASSWORD: ${{ secrets.TDENGINE_TEST_PASSWORD }}
run: |
npm install
npm list
Expand Down
6,080 changes: 3,042 additions & 3,038 deletions nodejs/package-lock.json

Large diffs are not rendered by default.

23 changes: 13 additions & 10 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { WSVersionResponse, WSQueryResponse } from "./wsResponse";
import { ReqId } from "../common/reqid";
import logger from "../common/log";
import { safeDecodeURIComponent, compareVersions } from "../common/utils";
import { safeDecodeURIComponent, compareVersions, maskPasswordForLog } from "../common/utils";
import { w3cwebsocket } from "websocket";
import { TSDB_OPTION_CONNECTION } from "../common/constant";

Expand Down Expand Up @@ -44,14 +44,15 @@ export class WsClient {
...(this._timezone && { tz: this._timezone }),
},
};
logger.debug(
"[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg)
if (logger.isDebugEnabled()) {
logger.debug("[wsClient.connect.connMsg]===>" + JSONBig.stringify(connMsg, (key, value) =>
key === "password" ? "[REDACTED]" : value
));
}
this._wsConnector = await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
this._wsConnector =
await WebSocketConnectionPool.instance().getConnection(
this._url,
this._timeout
);
if (this._wsConnector.readyState() === w3cwebsocket.OPEN) {
return;
}
Expand Down Expand Up @@ -120,10 +121,12 @@ export class WsClient {
);
}

// need to construct Response.
// Need to construct Response
async exec(queryMsg: string, bSqlQuery: boolean = true): Promise<any> {
return new Promise((resolve, reject) => {
logger.debug("[wsQueryInterface.query.queryMsg]===>" + queryMsg);
if (logger.isDebugEnabled()) {
logger.debug("[wsQueryInterface.query.queryMsg]===>" + maskPasswordForLog(queryMsg));
}
if (
this._wsConnector &&
this._wsConnector.readyState() === w3cwebsocket.OPEN
Expand Down
16 changes: 8 additions & 8 deletions nodejs/src/client/wsConnector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ import {
import { OnMessageType, WsEventCallback } from "./wsEventCallback";
import logger from "../common/log";
import { ReqId } from "../common/reqid";
import { maskPasswordForLog } from "../common/utils";

export class WebSocketConnector {
private _wsConn: w3cwebsocket;
private _wsURL: URL;
_timeout = 5000;

// create ws
constructor(url: URL, timeout: number | undefined | null) {
// return w3bsocket3
if (url) {
this._wsURL = url;
let origin = url.origin;
Expand Down Expand Up @@ -152,7 +151,9 @@ export class WebSocketConnector {
}

async sendMsg(message: string, register: Boolean = true) {
logger.debug("[wsClient.sendMessage()]===>" + message);
if (logger.isDebugEnabled()) {
logger.debug("[wsClient.sendMessage()]===>" + maskPasswordForLog(message));
}
let msg = JSON.parse(message);
if (msg.args.id !== undefined) {
msg.args.id = BigInt(msg.args.id);
Expand All @@ -166,16 +167,15 @@ export class WebSocketConnector {
action: msg.action,
req_id: msg.args.req_id,
timeout: this._timeout,
id:
msg.args.id === undefined
? msg.args.id
: BigInt(msg.args.id),
id: msg.args.id === undefined ? msg.args.id : BigInt(msg.args.id),
},
resolve,
reject
);
}
logger.debug(`[wsClient.sendMessage.msg]===> ${message}`);
if (logger.isDebugEnabled()) {
logger.debug("[wsClient.sendMessage.msg]===>" + maskPasswordForLog(message));
}
this._wsConn.send(message);
} else {
reject(
Expand Down
18 changes: 9 additions & 9 deletions nodejs/src/client/wsConnectorPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ export class WebSocketConnectionPool {
maxConnections: number = -1
): WebSocketConnectionPool {
if (!WebSocketConnectionPool._instance) {
WebSocketConnectionPool._instance = new WebSocketConnectionPool(
maxConnections
);
WebSocketConnectionPool._instance = new WebSocketConnectionPool(maxConnections);
}
return WebSocketConnectionPool._instance;
}
Expand Down Expand Up @@ -84,12 +82,14 @@ export class WebSocketConnectionPool {
);
}
Atomics.add(WebSocketConnectionPool.sharedArray, 0, 1);
logger.info(
"getConnection, new connection count:" +
Atomics.load(WebSocketConnectionPool.sharedArray, 0) +
", connectAddr:" +
connectAddr
);
if (logger.isInfoEnabled()) {
logger.info(
"getConnection, new connection count:" +
Atomics.load(WebSocketConnectionPool.sharedArray, 0) +
", connectAddr:" +
connectAddr.replace(/(token=)[^&]*/i, "$1[REDACTED]")
);
}
return new WebSocketConnector(url, timeout);
} finally {
unlock();
Expand Down
10 changes: 6 additions & 4 deletions nodejs/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ export function getUrl(wsConfig: WSConfig): URL {

export function isEmpty(value: any): boolean {
if (value === null || value === undefined) return true;
// if (typeof value === 'string' && value.trim() === '') return true;
if (Array.isArray(value) && value.length === 0) return true;
// if (typeof value === 'object' && Object.keys(value).length === 0) return true;
return false;
}

Expand All @@ -46,8 +44,6 @@ export function getBinarySql(
resultId: bigint,
sql?: string
): ArrayBuffer {
// construct msg

if (sql) {
const encoder = new TextEncoder();
const buffer = encoder.encode(sql);
Expand Down Expand Up @@ -187,3 +183,9 @@ export function decimalToString(
}
return decimalStr;
}

const PASSWORD_FIELD_REGEX = /("password"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g;

export function maskPasswordForLog(message: string): string {
return message.replace(PASSWORD_FIELD_REGEX, '$1"[REDACTED]"');
}
5 changes: 1 addition & 4 deletions nodejs/src/tmq/tmqResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,10 @@ export class WSTmqFetchBlockInfo {
this.withTableName = dataView.getUint8(4) == 1 ? true : false;
this.withSchema = dataView.getUint8(5) == 1 ? true : false;

// let dataBuffer = dataView.buffer.slice(6)
let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6);
let rows = 0;
// const parseStartTime = new Date().getTime();
// console.log("parseBlockInfos blockNum="+ blockNum)
for (let i = 0; i < blockNum; i++) {
let variableInfo = this.parseVariableByteInteger(dataBuffer);
const variableInfo = this.parseVariableByteInteger(dataBuffer);
this.taosResult.setPrecision(variableInfo[1].getUint8(17));
dataView = new DataView(
variableInfo[1].buffer,
Expand Down
7 changes: 4 additions & 3 deletions nodejs/test/bulkPulling/cloud.tmq.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { TMQConstants } from "../../src/tmq/constant";
import { WsConsumer } from "../../src/tmq/wsTmq";
import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool";
import logger, { setLevel } from "../../src/common/log";
import { setLevel } from "../../src/common/log";
import { WSConfig } from "../../src/common/config";
import { WsSql } from "../../src/sql/wsSql";
import { testPassword, testUsername } from "../utils";

beforeAll(async () => {
const url = `wss://${process.env.TDENGINE_CLOUD_URL}?token=${process.env.TDENGINE_CLOUD_TOKEN}`;
let wsSql = null;
try {
const conf = new WSConfig(url);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
wsSql = await WsSql.open(conf);
let sql = `INSERT INTO dmeters.d1001 USING dmeters.meters (groupid, location) TAGS(2, 'SanFrancisco')
VALUES (NOW + 1a, 10.30000, 219, 0.31000) (NOW + 2a, 12.60000, 218, 0.33000) (NOW + 3a, 12.30000, 221, 0.31000)
Expand Down
20 changes: 10 additions & 10 deletions nodejs/test/bulkPulling/decimal.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool";
import { WSConfig } from "../../src/common/config";
import { WsSql } from "../../src/sql/wsSql";
import { Sleep } from "../utils";
import logger, { setLevel } from "../../src/common/log";
import { Sleep, testPassword, testUsername } from "../utils";
import { setLevel } from "../../src/common/log";
import { TMQConstants } from "../../src/tmq/constant";
import { WsConsumer } from "../../src/tmq/wsTmq";

Expand All @@ -12,8 +12,8 @@ let dropTopic = `DROP TOPIC IF EXISTS topic_decimal_test;`;
setLevel("debug");
beforeAll(async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
let wsSql = await WsSql.open(conf);
await wsSql.exec(dropTopic);
await wsSql.exec("drop database if exists power");
Expand Down Expand Up @@ -64,8 +64,8 @@ describe("TDWebSocket.WsSql()", () => {

test("insert recoder", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
let wsSql = await WsSql.open(conf);
let taosResult = await wsSql.exec("use power");
console.log(taosResult);
Expand Down Expand Up @@ -108,8 +108,8 @@ describe("TDWebSocket.WsSql()", () => {
test("normal Subscribe", async () => {
let configMap = new Map([
[TMQConstants.GROUP_ID, "gId"],
[TMQConstants.CONNECT_USER, "root"],
[TMQConstants.CONNECT_PASS, "taosdata"],
[TMQConstants.CONNECT_USER, testUsername()],
[TMQConstants.CONNECT_PASS, testPassword()],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.CLIENT_ID, "test_tmq_client"],
[TMQConstants.WS_URL, dns],
Expand Down Expand Up @@ -159,8 +159,8 @@ test("normal Subscribe", async () => {

afterAll(async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
let wsSql = await WsSql.open(conf);
await wsSql.exec(dropTopic);
await wsSql.exec("drop database power");
Expand Down
4 changes: 3 additions & 1 deletion nodejs/test/bulkPulling/queryTables.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@ import {
jsonMeta,
tableMeta,
tagMeta,
testPassword,
testUsername,
} from "../utils";

let dsn = "ws://root:taosdata@localhost:6041";
let dsn = `ws://${testUsername()}:${testPassword()}@localhost:6041`;
let conf: WSConfig = new WSConfig(dsn);
const resultMap: Map<string, any> = new Map();
resultMap.set(
Expand Down
30 changes: 16 additions & 14 deletions nodejs/test/bulkPulling/schemaless.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ import { WebSocketConnectionPool } from "../../src/client/wsConnectorPool";
import { WSConfig } from "../../src/common/config";
import { Precision, SchemalessProto } from "../../src/sql/wsProto";
import { WsSql } from "../../src/sql/wsSql";
import { testPassword, testUsername } from "../utils";

let dns = "ws://localhost:6041";

beforeAll(async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());

let wsSql = await WsSql.open(conf);
await wsSql.exec("drop database if exists power_schemaless;");
Expand All @@ -30,8 +32,8 @@ describe("TDWebSocket.WsSchemaless()", () => {

test("normal connect", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
conf.setDb("power_schemaless");
let wsSchemaless = await WsSql.open(conf);
expect(wsSchemaless.state()).toBeGreaterThan(0);
Expand All @@ -43,8 +45,8 @@ describe("TDWebSocket.WsSchemaless()", () => {
let wsSchemaless = null;
try {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
conf.setDb("jest");
wsSchemaless = await WsSql.open(conf);
} catch (e: any) {
Expand All @@ -59,8 +61,8 @@ describe("TDWebSocket.WsSchemaless()", () => {

test("normal insert", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
conf.setDb("power_schemaless");
let wsSchemaless = await WsSql.open(conf);
expect(wsSchemaless.state()).toBeGreaterThan(0);
Expand All @@ -87,8 +89,8 @@ describe("TDWebSocket.WsSchemaless()", () => {

test("normal wsSql insert", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
conf.setDb("power_schemaless");
let wsSchemaless = await WsSql.open(conf);
expect(wsSchemaless.state()).toBeGreaterThan(0);
Expand Down Expand Up @@ -122,8 +124,8 @@ describe("TDWebSocket.WsSchemaless()", () => {

test("SchemalessProto error", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());
conf.setDb("power_schemaless");
let wsSchemaless = await WsSql.open(conf);
expect(wsSchemaless.state()).toBeGreaterThan(0);
Expand All @@ -144,8 +146,8 @@ describe("TDWebSocket.WsSchemaless()", () => {

afterAll(async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
conf.setUser(testUsername());
conf.setPwd(testPassword());

let wsSql = await WsSql.open(conf);
await wsSql.exec("drop database if exists power_schemaless;");
Expand Down
Loading
Loading