Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nodejs/prepare.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const path = require("path");
const { resolve, parse } = path;

const tsFile = /\/src\/.*.ts$/;

async function dir(folder, ts = []) {
let files = await readdir(folder);
for (let f of files) {
Expand Down
4 changes: 4 additions & 0 deletions nodejs/src/client/wsClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import logger from "../common/log";
import { safeDecodeURIComponent, compareVersions } from "../common/utils";
import { w3cwebsocket } from "websocket";
import { TSDB_OPTION_CONNECTION } from "../common/constant";
import pkg from "../../package.json";

const connectorInfo = `nodejs-ws-v${pkg.version}-ncid000`;

export class WsClient {
private _wsConnector?: WebSocketConnector;
Expand Down Expand Up @@ -41,6 +44,7 @@ export class WsClient {
user: safeDecodeURIComponent(this._url.username),
password: safeDecodeURIComponent(this._url.password),
db: database,
connector: connectorInfo,
...(this._timezone && { tz: this._timezone }),
},
};
Expand Down
10 changes: 2 additions & 8 deletions nodejs/src/client/wsConnectorPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,13 @@ process.on("exit", (code) => {
});

process.on("SIGINT", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect..."
);
logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect...");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

process.on("SIGTERM", () => {
logger.info(
"Received SIGINT. Press Control-D to exit, begin destroy connect"
);
logger.info("Received SIGINT. Press Control-D to exit, begin destroy connect");
WebSocketConnectionPool.instance().destroyed();
process.exit();
});

// process.kill(process.pid, 'SIGINT');
4 changes: 2 additions & 2 deletions nodejs/src/client/wsEventCallback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ export enum OnMessageType {
}

const eventMutex = new Mutex();

export class WsEventCallback {
private static _instance?: WsEventCallback;
private static _msgActionRegister: Map<MessageId, MessageAction> =
new Map();
private static _msgActionRegister: Map<MessageId, MessageAction> = new Map();
private constructor() { }

public static instance(): WsEventCallback {
Expand Down
7 changes: 3 additions & 4 deletions nodejs/src/client/wsResponse.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
/**
* define ws Response type|class, for query?
*/

import { MessageResp, readVarchar } from "../common/taosResult";

export class WSVersionResponse {
Expand All @@ -10,6 +6,7 @@ export class WSVersionResponse {
message: string;
action: string;
totalTime: number;

constructor(resp: MessageResp) {
this.version = resp.msg.version;
this.code = resp.msg.code;
Expand Down Expand Up @@ -41,6 +38,7 @@ export class WSQueryResponse {
this.totalTime = resp.totalTime;
this.initMsg(resp.msg);
}

private initMsg(msg: any) {
this.code = msg.code;
this.message = msg.message;
Expand Down Expand Up @@ -81,6 +79,7 @@ export class WSFetchBlockResponse {
finished: number | undefined;
metaType: number | undefined;
textDecoder: TextDecoder;

constructor(msg: ArrayBuffer) {
let dataView = new DataView(msg);
this.action = dataView.getBigUint64(8, true);
Expand Down
22 changes: 22 additions & 0 deletions nodejs/src/common/log.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
import winston from "winston";
import DailyRotateFile from "winston-daily-rotate-file";

export function redactMessage(msg: any): any {
if (typeof msg === "string") {
return msg
.replace(/("password"\s*:\s*")([^"]*)(")/gi, '$1[REDACTED]$3')
.replace(/(token=)([^&\s]+)/gi, "$1[REDACTED]")
.replace(/(\/\/[^:@\s]+:)([^@/\s]+)(@)/gi, "$1[REDACTED]$3");
}
if (msg && typeof msg === "object") {
const clone: any = Array.isArray(msg) ? [] : {};
for (const [k, v] of Object.entries(msg)) {
if (["password", "token", "bearer_token"].includes(k.toLowerCase())) {
clone[k] = "[REDACTED]";
} else {
clone[k] = redactMessage(v);
}
}
return clone;
}
return msg;
}

const customFormat = winston.format.printf(
({ level, message, label, timestamp }) => {
if (
Expand All @@ -10,6 +31,7 @@ const customFormat = winston.format.printf(
) {
message = (message as any).toJSON();
}
message = redactMessage(message);
return `${timestamp} [${label}] ${level}: ${message}`;
}
);
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/common/taosResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import {
import { appendRune } from "./ut8Helper";
import logger from "./log";
import { decimalToString } from "./utils";
import { TMQRawDataSchema } from "../tmq/constant";

export interface TDengineMeta {
name: string;
Expand Down
1 change: 0 additions & 1 deletion nodejs/src/tmq/config.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { TMQConstants } from "./constant";

export class TmqConfig {
// req_id: number;
url: URL | null = null;
sql_url: URL | null = null;
user: string | null = null;
Expand Down
3 changes: 0 additions & 3 deletions nodejs/src/tmq/tmqResponse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,8 @@ export class WSTmqFetchBlockInfo {
this.withTableName = dataView.getUint8(4) == 1 ? true : false;
this.withSchema = dataView.getUint8(5) == 1 ? true : false;

// let dataBuffer = dataView.buffer.slice(6)
let dataBuffer = new DataView(dataView.buffer, dataView.byteOffset + 6);
let rows = 0;
// const parseStartTime = new Date().getTime();
// console.log("parseBlockInfos blockNum="+ blockNum)
for (let i = 0; i < blockNum; i++) {
let variableInfo = this.parseVariableByteInteger(dataBuffer);
this.taosResult.setPrecision(variableInfo[1].getUint8(17));
Expand Down
2 changes: 1 addition & 1 deletion nodejs/src/tmq/wsTmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
TopicPartition,
WSTmqFetchBlockInfo,
WsPollResponse,
WsTmqQueryResponse,
} from "./tmqResponse";
import { ReqId } from "../common/reqid";
import logger from "../common/log";
Expand All @@ -30,6 +29,7 @@ export class WsConsumer {
private _topics?: string[];
private _commitTime?: number;
private _lastMessageID?: bigint;

private constructor(wsConfig: Map<string, any>) {
this._wsConfig = new TmqConfig(wsConfig);
logger.debug(this._wsConfig);
Expand Down
102 changes: 101 additions & 1 deletion nodejs/test/bulkPulling/log.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import logger, { setLevel } from "../../src/common/log";
import logger, { setLevel, redactMessage } from "../../src/common/log";

describe("log level print", () => {
test("normal connect", async () => {
Expand All @@ -21,3 +21,103 @@ describe("log level print", () => {
expect(isLevel).toEqual(true);
});
});

describe("redact message", () => {
test("redacts password field in JSON-like string", () => {
const input = '{"password": "secret123"}';
const output = redactMessage(input);
expect(output).toBe('{"password": "[REDACTED]"}');
});

test("redacts token in query string", () => {
const input = "https://example.com?token=abcdef123456&other=1";
const output = redactMessage(input);
expect(output).toBe(
"https://example.com?token=[REDACTED]&other=1"
);
});

test("redacts bearer token in query string", () => {
const input = "https://example.com?bearer_token=abcdef123456&other=1";
const output = redactMessage(input);
expect(output).toBe(
"https://example.com?bearer_token=[REDACTED]&other=1"
);
});

test("is case-insensitive for password key", () => {
const input = '{"PassWord": "secret123"}';
const output = redactMessage(input);
expect(output).toBe('{"PassWord": "[REDACTED]"}');
});

test("leaves string without sensitive data unchanged", () => {
const input = "normal message without secrets";
const output = redactMessage(input);
expect(output).toBe(input);
});

test("redacts password and token fields on plain object", () => {
const input = {
user: "u1",
password: "secret",
token: "abc123",
bearer_token: "def456",
other: "keep",
};
const output = redactMessage(input) as any;

expect(output.user).toBe("u1");
expect(output.password).toBe("[REDACTED]");
expect(output.token).toBe("[REDACTED]");
expect(output.bearer_token).toBe("[REDACTED]");
expect(output.other).toBe("keep");
});

test("redacts nested objects recursively", () => {
const input = {
level1: {
password: "p1",
nested: {
token: "t1",
value: 42,
},
},
};
const output = redactMessage(input) as any;

expect(output.level1.password).toBe("[REDACTED]");
expect(output.level1.nested.token).toBe("[REDACTED]");
expect(output.level1.nested.value).toBe(42);
});

test("redacts objects inside array", () => {
const input = [
{ password: "p1" },
{ token: "t2", ok: true },
];
const output = redactMessage(input) as any[];

expect(output[0].password).toBe("[REDACTED]");
expect(output[1].token).toBe("[REDACTED]");
expect(output[1].ok).toBe(true);
});

test("returns same primitive for non-object/non-string", () => {
expect(redactMessage(123)).toBe(123);
expect(redactMessage(null)).toBeNull();
expect(redactMessage(undefined)).toBeUndefined();
});

test("redacts password in ws url user:password@", () => {
const input = "ws://root:taosdata@localhost:6041/ws";
const output = redactMessage(input);
expect(output).toBe("ws://root:[REDACTED]@localhost:6041/ws");
});

test("does not change url without credentials", () => {
const input = "ws://localhost:6041/ws";
const output = redactMessage(input);
expect(output).toBe(input);
});
});
20 changes: 20 additions & 0 deletions nodejs/test/bulkPulling/sql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,26 @@ describe("TDWebSocket.WsSql()", () => {

await wsSql.close();
});

test("connector version info", async () => {
let conf: WSConfig = new WSConfig(dns);
conf.setUser("root");
conf.setPwd("taosdata");
let wsSql = await WsSql.open(conf);
await Sleep(2000);
let wsRows = await wsSql.query("show connections");
let hasNodejsWs = false;
while (await wsRows.next()) {
const data = wsRows.getData();
if (Array.isArray(data) && data.some(v => typeof v === "string" && v.includes("nodejs-ws"))) {
hasNodejsWs = true;
break;
}
}
expect(hasNodejsWs).toBe(true);
await wsRows.close();
await wsSql.close();
});
});

afterAll(async () => {
Expand Down
2 changes: 1 addition & 1 deletion nodejs/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
// "typeRoots": [], /* Specify multiple folders that act like `./node_modules/@types`. */
// "types": [], /* Specify type package names to be included without being referenced in a source file. */
// "allowUmdGlobalAccess": true, /* Allow accessing UMD globals from modules. */
// "resolveJsonModule": true, /* Enable importing .json files */
"resolveJsonModule": true, /* Enable importing .json files */
// "noResolve": true, /* Disallow `import`s, `require`s or `<reference>`s from expanding the number of files TypeScript should add to a project. */

/* JavaScript Support */
Expand Down
Loading