Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions nodejs/src/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ export function decimalToString(
return decimalStr;
}

const SENSITIVE_FIELD_REGEX = /("(?:password|bearer_token)"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g;
const SENSITIVE_FIELD_REGEX =
/("(?:password|bearer_token|td\.connect\.token)"\s*:\s*)"([^"\\]*(?:\\.[^"\\]*)*)"/g;

export function maskSensitiveForLog(message: string): string {
return message.replace(SENSITIVE_FIELD_REGEX, '$1"[REDACTED]"');
Expand All @@ -219,19 +220,19 @@ export function maskUrlForLog(url: URL | null): string {
return masked.toString().replace(/%5BREDACTED%5D/g, "[REDACTED]");
}

export function maskTmqConfigForLog(config: TmqConfig): object {
const masked = { ...config, otherConfigs: Object.fromEntries(config.otherConfigs) };
if (masked.url) {
masked.url = new URL(maskUrlForLog(masked.url));
}
if (masked.sql_url) {
masked.sql_url = new URL(maskUrlForLog(masked.sql_url));
}
export function maskTmqConfigForLog(config: TmqConfig): string {
const masked = {
...config,
otherConfigs: Object.fromEntries(config.otherConfigs)
};
if (masked.token) {
masked.token = "[REDACTED]";
}
if (masked.password) {
masked.password = "[REDACTED]";
}
return masked;
return JSON.stringify(masked, (key, value) =>
(key === "url" || key === "sql_url")
? maskUrlForLog(value) : (key === "td.connect.token" ? "[REDACTED]" : value)
);
}
2 changes: 2 additions & 0 deletions nodejs/src/tmq/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class TmqConfig {
break;
case TMQConstants.CONNECT_TOKEN:
this.token = value;
this.otherConfigs.set(key, value);
break;
case TMQConstants.GROUP_ID:
this.group_id = value;
Expand Down Expand Up @@ -71,6 +72,7 @@ export class TmqConfig {
const bearerToken = this.url.searchParams.get("bearer_token");
if (bearerToken) {
this.token = bearerToken;
this.otherConfigs.set(TMQConstants.CONNECT_TOKEN, bearerToken);
} else {
this.url.searchParams.delete("bearer_token");
}
Expand Down
5 changes: 2 additions & 3 deletions nodejs/src/tmq/wsTmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class WsConsumer {
private constructor(wsConfig: Map<string, any>) {
this._wsConfig = new TmqConfig(wsConfig);
if (logger.isDebugEnabled()) {
logger.debug(maskTmqConfigForLog(this._wsConfig));
logger.debug("WsConsumer config: " + maskTmqConfigForLog(this._wsConfig));
}
if (wsConfig.size == 0 || !this._wsConfig.url) {
throw new WebSocketInterfaceError(
Expand Down Expand Up @@ -102,14 +102,13 @@ export class WsConsumer {
req_id: ReqId.getReqID(reqId),
user: this._wsConfig.user,
password: this._wsConfig.password,
...(this._wsConfig.token && { bearer_token: this._wsConfig.token }),
group_id: this._wsConfig.group_id,
client_id: this._wsConfig.client_id,
topics: topics,
offset_rest: this._wsConfig.offset_rest,
auto_commit: this._wsConfig.auto_commit,
auto_commit_interval_ms: this._wsConfig.auto_commit_interval_ms,
config: this._wsConfig.otherConfigs,
config: Object.fromEntries(this._wsConfig.otherConfigs),
},
};
this._topics = topics;
Expand Down
93 changes: 71 additions & 22 deletions nodejs/test/bulkPulling/tmq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ beforeAll(async () => {
let insertRes = await ws.exec(insert);
insert = insertStable(tableCNValues, stableTags, stable);
insertRes = await ws.exec(insert);
await ws.exec("create user tmq_token_user pass 'token_pass_1'");
await ws.exec(`create topic if not exists ${tokenTopic} as select * from ${db}.${stable}`);
await ws.close();
});
Expand Down Expand Up @@ -398,20 +397,69 @@ describe("TDWebSocket.Tmq()", () => {

testEnterprise("connect with token", async () => {
const conf = new WSConfig(dsn);
conf.setUser(testUsername());
conf.setPwd(testPassword());
const wsSql = await WsSql.open(conf);
const wsRows = await wsSql.query("create token test_tmq_token from user tmq_token_user");
await wsSql.exec("drop token if exists test_tmq_token");
const wsRows = await wsSql.query(`create token test_tmq_token from user ${testUsername()}`);
await wsRows.next();
const token = wsRows.getData()?.[0] as string;
expect(token).toBeTruthy();
await wsRows.close();

const tmqConf = new Map([
[TMQConstants.WS_URL, "ws://localhost:6041"],
[TMQConstants.CONNECT_USER, "invalid_user"],
[TMQConstants.CONNECT_PASS, "invalid_pass"],
[TMQConstants.CONNECT_TOKEN, token],
[TMQConstants.GROUP_ID, "g1101"],
[TMQConstants.CLIENT_ID, "c1101"],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.ENABLE_AUTO_COMMIT, "false"],
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, "1000"],
]);
const consumer = await WsConsumer.newConsumer(tmqConf);
await consumer.subscribe([tokenTopic]);

let count: number = 0;
for (let i = 0; i < 5; i++) {
const res = await consumer.poll(500);
for (const [, value] of res) {
const data = value.getData();
if (data == null || data.length == 0) {
break;
}
count += data.length;
}
}
expect(count).toEqual(10);

await Sleep(3000);
await consumer.unsubscribe();
await consumer.close();
await wsSql.exec("drop token if exists test_tmq_token");
await wsSql.close();
});

testEnterprise("connect with token url", async () => {
const conf = new WSConfig(dsn);
const wsSql = await WsSql.open(conf);
await wsSql.exec("drop token if exists test_tmq_token_url");
const wsRows = await wsSql.query(`create token test_tmq_token_url from user ${testUsername()}`);
await wsRows.next();
const token = wsRows.getData()?.[0] as string;
expect(token).toBeTruthy();
await wsRows.close();

const tokenConfigMap = new Map(configMap);
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, token);
tokenConfigMap.set(TMQConstants.GROUP_ID, "token_group");
const consumer = await WsConsumer.newConsumer(tokenConfigMap);
const tmqConf = new Map([
[TMQConstants.WS_URL, `ws://localhost:6041?bearer_token=${token}`],
[TMQConstants.CONNECT_USER, "invalid_user"],
[TMQConstants.CONNECT_PASS, "invalid_pass"],
[TMQConstants.GROUP_ID, "g1103"],
[TMQConstants.CLIENT_ID, "c1103"],
[TMQConstants.AUTO_OFFSET_RESET, "earliest"],
[TMQConstants.ENABLE_AUTO_COMMIT, "false"],
[TMQConstants.AUTO_COMMIT_INTERVAL_MS, "1000"],
]);
const consumer = await WsConsumer.newConsumer(tmqConf);
await consumer.subscribe([tokenTopic]);

let count: number = 0;
Expand All @@ -430,33 +478,35 @@ describe("TDWebSocket.Tmq()", () => {
await Sleep(3000);
await consumer.unsubscribe();
await consumer.close();
await wsSql.exec("drop token if exists test_tmq_token_url");
await wsSql.close();
});

testEnterprise("connect with invalid token", async () => {
const tokenConfigMap = new Map([
[TMQConstants.GROUP_ID, "token_group1"],
[TMQConstants.CLIENT_ID, "token_client1"],
const conf = new Map([
[TMQConstants.GROUP_ID, "g1102"],
[TMQConstants.CLIENT_ID, "c1102"],
[TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=invalid_token"],
]);
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
message: expect.stringMatching(/invalid token/i),
});

tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041");
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, "invalid_token1");
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
conf.set(TMQConstants.WS_URL, "ws://localhost:6041");
conf.set(TMQConstants.CONNECT_TOKEN, "invalid_token1");
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
message: expect.stringMatching(/invalid token/i),
});

tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=");
tokenConfigMap.delete(TMQConstants.CONNECT_TOKEN);
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
conf.set(TMQConstants.WS_URL, "ws://localhost:6041?bearer_token=");
conf.delete(TMQConstants.CONNECT_TOKEN);
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
message: expect.stringMatching(/invalid url/i),
});

tokenConfigMap.set(TMQConstants.WS_URL, "ws://localhost:6041");
tokenConfigMap.set(TMQConstants.CONNECT_TOKEN, "");
await expect(WsConsumer.newConsumer(tokenConfigMap)).rejects.toMatchObject({
conf.set(TMQConstants.WS_URL, "ws://localhost:6041");
conf.set(TMQConstants.CONNECT_TOKEN, "");
await expect(WsConsumer.newConsumer(conf)).rejects.toMatchObject({
message: expect.stringMatching(/invalid url/i),
});
});
Expand All @@ -469,7 +519,6 @@ afterAll(async () => {
await ws.exec(dropTopic);
await ws.exec(`drop topic if exists ${tokenTopic}`);
await ws.exec(dropDB);
await ws.exec("drop user tmq_token_user");
await ws.close();
WebSocketConnectionPool.instance().destroyed();
});
Loading