Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/cubejs-api-gateway/src/jwk.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
/* eslint-disable no-restricted-syntax */
import crypto from 'crypto';
import {
asyncMemoizeBackground,
asyncRetry,
BackgroundMemoizeOptions,
defaultHasher,
getHttpAgentForProxySettings
} from '@cubejs-backend/shared';
import fetch from 'node-fetch';
Expand Down Expand Up @@ -91,7 +91,7 @@ export const createJWKsFetcher = (jwtOptions: JWTOptions, options: JWKsFetcherOp
result,
};
}, {
extractKey: (url) => crypto.createHash('md5').update(url).digest('hex'),
extractKey: (url) => defaultHasher().update(url).digest('hex'),
extractCacheLifetime: ({ lifeTime }) => {
if (lifeTime) {
return lifeTime;
Expand Down
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"typescript": "~5.2.2"
},
"dependencies": {
"@node-rs/xxhash": "^1.7.6",
"@oclif/color": "^0.1.2",
"bytes": "^3.1.2",
"cli-progress": "^3.9.0",
Expand Down
85 changes: 85 additions & 0 deletions packages/cubejs-backend-shared/src/hasher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { xxh3 } from '@node-rs/xxhash';

/**
 * Minimal hashing interface modeled on Node's crypto.Hash chaining API
 * (`update(...).digest(...)`), so implementations can be dropped in where
 * `crypto.createHash()` was previously used in non-cryptographic contexts.
 */
export interface Hasher {
  /**
   * Feeds more data into the hash state; may be called multiple times.
   *
   * @param data - The data to hash (string or Buffer)
   * @returns this, to allow call chaining
   */
  update(data: string | Buffer): this;

  /**
   * @returns The hash value in hex format
   */
  digest(encoding: 'hex'): string;

  /**
   * @returns The hash value as a Buffer
   */
  digest(): Buffer;
}

/**
 * xxHash-backed implementation of the Hasher interface.
 *
 * Accumulates all `update()` chunks and hashes their concatenation on
 * `digest()`, so interleaved string/Buffer updates hash the same as one
 * equivalent single update.
 */
class XxHasher implements Hasher {
  // Chunks collected by update(); concatenated lazily in digest().
  private data: Buffer[] = [];

  public update(data: string | Buffer): this {
    if (typeof data === 'string') {
      this.data.push(Buffer.from(data));
    } else {
      this.data.push(data);
    }
    return this;
  }

  public digest(): Buffer;

  public digest(encoding: 'hex'): string;

  public digest(encoding?: 'hex'): Buffer | string {
    const combined = Buffer.concat(this.data);
    // xxh3.xxh128 returns the 128-bit hash as a bigint.
    const hash = xxh3.xxh128(combined);

    if (encoding === 'hex') {
      // Fix: BigInt#toString(16) omits leading zeros, so roughly 1 in 16
      // digests would come back shorter than 32 characters. Pad to the full
      // 128-bit width so the hex form is always fixed-length — consistent
      // with the 16-byte Buffer branch below and with the fixed 32-char
      // output of the MD5 implementation this replaces (callers embed the
      // digest in cache keys and may rely on a stable width).
      return hash.toString(16).padStart(32, '0');
    }

    /*
     * This ensures the Buffer format matches what the old MD5 implementation
     * would have returned, maintaining compatibility with code that reads the
     * digest as a Buffer.
     */
    const buffer = Buffer.alloc(16);
    // eslint-disable-next-line no-bitwise
    buffer.writeBigUInt64BE(hash >> BigInt(64), 0);
    // eslint-disable-next-line no-bitwise
    buffer.writeBigUInt64BE(hash & BigInt('0xFFFFFFFFFFFFFFFF'), 8);
    return buffer;
  }
}

/**
* Creates a new default hasher instance.
*
* This follows Rust's DefaultHasher pattern and provides a consistent
* hashing interface throughout the Cube.js codebase. The implementation
* uses xxHash (xxh128) for fast, non-cryptographic hashing.
*
* The hasher can be used as a drop-in replacement for crypto.createHash()
* in non-cryptographic contexts.
*
* @example
* ```typescript
* const hash = defaultHasher().update('data').digest('hex');
* ```
*
* @example
* ```typescript
* const buffer = defaultHasher().update(JSON.stringify(obj)).digest();
* ```
*
* @returns A new Hasher instance
*/
export function defaultHasher(): Hasher {
// Future: could check environment variable here to switch implementations
// e.g., process.env.CUBEJS_HASHER_ALGORITHM
return new XxHasher();
}
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ export * from './FileRepository';
export * from './decorators';
export * from './PerfTracker';
export * from './disposedProxy';
export * from './hasher';
8 changes: 4 additions & 4 deletions packages/cubejs-backend-shared/src/promises.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable arrow-body-style,no-restricted-syntax */
import crypto from 'crypto';
import { defaultHasher } from './hasher';
import { LRUCache } from 'lru-cache';

import { Optional } from './type-helpers';
Expand Down Expand Up @@ -282,9 +282,9 @@ export const asyncDebounceFn = <Ret, Arguments>(
});

return async (...args: Arguments[]) => {
const key = crypto.createHash('md5')
.update(args.map((v) => JSON.stringify(v)).join(','))
.digest('hex');
const key = defaultHasher()
.update(args.map((v) => JSON.stringify(v)).join(','))
.digest('hex');

const existing = cache.get(key);
if (existing) {
Expand Down
158 changes: 158 additions & 0 deletions packages/cubejs-backend-shared/test/hasher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { defaultHasher } from '../src';

// Unit tests for the shared defaultHasher() factory.
describe('defaultHasher', () => {
  // Helper: hex digest of a single string/Buffer input.
  const hexOf = (input: string | Buffer): string => defaultHasher().update(input).digest('hex');

  test('should create a hasher instance', () => {
    const instance = defaultHasher();
    expect(instance).toBeDefined();
    expect(typeof instance.update).toBe('function');
    expect(typeof instance.digest).toBe('function');
  });

  test('should return consistent hex hash for the same input', () => {
    const payload = 'test data';
    const first = hexOf(payload);
    const second = hexOf(payload);

    expect(first).toBe(second);
    expect(typeof first).toBe('string');
    expect(first.length).toBeGreaterThan(0);
  });

  test('should return different hashes for different inputs', () => {
    expect(hexOf('input1')).not.toBe(hexOf('input2'));
  });

  test('should support chaining update calls', () => {
    const chained = defaultHasher()
      .update('part1')
      .update('part2')
      .digest('hex');

    expect(chained).toBe(hexOf('part1part2'));
  });

  test('should handle Buffer inputs', () => {
    const digest = hexOf(Buffer.from('test data'));

    expect(digest).toBeDefined();
    expect(typeof digest).toBe('string');
  });

  test('should return Buffer when digest is called without encoding', () => {
    const digest = defaultHasher().update('test').digest();

    expect(Buffer.isBuffer(digest)).toBe(true);
    expect(digest.length).toBe(16); // 128 bits = 16 bytes
  });

  test('should handle JSON stringified objects', () => {
    const serialized = JSON.stringify({ key: 'value', nested: { prop: 123 } });

    expect(hexOf(serialized)).toBe(hexOf(serialized));
  });

  test('should handle empty strings', () => {
    const digest = hexOf('');

    expect(digest).toBeDefined();
    expect(typeof digest).toBe('string');
  });

  test('should handle large inputs', () => {
    const digest = hexOf('x'.repeat(10000));

    expect(digest).toBeDefined();
    expect(typeof digest).toBe('string');
  });

  test('should handle unicode characters', () => {
    const digest = hexOf('你好世界 🌍 مرحبا');

    expect(digest).toBeDefined();
    expect(typeof digest).toBe('string');
  });

  test('should produce consistent hashes for mixed string and Buffer updates', () => {
    const stringFirst = defaultHasher()
      .update('hello')
      .update(Buffer.from('world'))
      .digest('hex');

    const bufferFirst = defaultHasher()
      .update(Buffer.from('hello'))
      .update('world')
      .digest('hex');

    expect(stringFirst).toBe(bufferFirst);
  });
});

// Verifies the hasher exposes the same call pattern as crypto.createHash,
// so it can replace the old MD5-based call sites without other changes.
describe('Hasher interface compatibility', () => {
  test('should be compatible with crypto.createHash API pattern', () => {
    // This tests that the API matches the pattern used to replace crypto.createHash('md5')
    const payload = JSON.stringify({ test: 'data' });

    // Old pattern: crypto.createHash('md5').update(data).digest('hex')
    // New pattern: defaultHasher().update(data).digest('hex')
    const hexDigest = defaultHasher().update(payload).digest('hex');

    expect(hexDigest).toBeDefined();
    expect(typeof hexDigest).toBe('string');
  });

  test('should support digest() without encoding for Buffer result', () => {
    // Old pattern: crypto.createHash('md5').update(data).digest()
    // New pattern: defaultHasher().update(data).digest()
    const payload = JSON.stringify({ test: 'data' });
    const bufferDigest = defaultHasher().update(payload).digest();

    expect(Buffer.isBuffer(bufferDigest)).toBe(true);
    expect(bufferDigest.length).toBe(16);
  });

  test('should handle the version() function pattern from PreAggregations', () => {
    // Testing the pattern: defaultHasher().update(JSON.stringify(cacheKey)).digest()
    const cacheKey = ['2024', '01', 'users'];
    const bufferDigest = defaultHasher().update(JSON.stringify(cacheKey)).digest();

    expect(Buffer.isBuffer(bufferDigest)).toBe(true);

    // Should be able to read bytes from the buffer like the old code did
    const leadingByte = bufferDigest.readUInt8(0);
    expect(typeof leadingByte).toBe('number');
    expect(leadingByte).toBeGreaterThanOrEqual(0);
    expect(leadingByte).toBeLessThanOrEqual(255);
  });
});

// Checks that equivalent inputs hash identically regardless of how they
// are supplied (string vs Buffer) and which digest form is requested.
describe('Hash consistency across different data types', () => {
  test('string vs Buffer with same content should produce same hash', () => {
    const content = 'test content';

    const fromString = defaultHasher().update(content).digest('hex');
    const fromBuffer = defaultHasher().update(Buffer.from(content)).digest('hex');

    expect(fromString).toBe(fromBuffer);
  });

  test('Buffer digest should be consistent', () => {
    const content = 'consistent test';

    const first = defaultHasher().update(content).digest();
    const second = defaultHasher().update(content).digest();

    expect(first.equals(second)).toBe(true);
  });
});
23 changes: 11 additions & 12 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import crypto from 'crypto';
import {
QueueDriverInterface,
QueueDriverConnectionInterface,
Expand All @@ -16,13 +15,13 @@ import {
GetActiveAndToProcessResponse,
QueryKeysTuple,
} from '@cubejs-backend/base-driver';
import { getProcessUid } from '@cubejs-backend/shared';
import { defaultHasher, getProcessUid } from '@cubejs-backend/shared';

import { CubeStoreDriver } from './CubeStoreDriver';

function hashQueryKey(queryKey: QueryKey, processUid?: string): QueryKeyHash {
processUid = processUid || getProcessUid();
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
const hash = defaultHasher().update(JSON.stringify(queryKey)).digest('hex');

if (typeof queryKey === 'object' && queryKey.persistent) {
return `${hash}@${processUid}` as any;
Expand Down Expand Up @@ -83,7 +82,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
values.push(JSON.stringify(data));

const rows = await this.driver.query(`QUEUE ADD PRIORITY ?${options.orphanedTimeout ? ' ORPHANED ?' : ''} ? ?`, values);
if (rows && rows.length) {
if (rows?.length) {
return [
rows[0].added === 'true' ? 1 : 0,
rows[0].id ? parseInt(rows[0].id, 10) : null,
Expand All @@ -104,7 +103,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// queryKeyHash as compatibility fallback
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'cancelQuery');
}

Expand Down Expand Up @@ -149,7 +148,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('CACHE INCR ?', [
`${this.options.redisQueuePrefix}:PROCESSING_COUNTER`
]);
if (rows && rows.length) {
if (rows?.length) {
return rows[0].value;
}

Expand Down Expand Up @@ -184,7 +183,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('QUEUE RESULT ?', [
this.prefixKey(this.redisHash(queryKey)),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getResult');
}

Expand Down Expand Up @@ -243,7 +242,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('QUEUE GET ?', [
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getQueryDef');
}

Expand All @@ -269,7 +268,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
this.options.concurrency,
this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
const active = rows[0].active ? (rows[0].active).split(',') as unknown as QueryKeyHash[] : [];
const pending = parseInt(rows[0].pending, 10);

Expand Down Expand Up @@ -300,7 +299,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// queryKeyHash as compatibility fallback
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getResultBlocking');
}

Expand Down Expand Up @@ -346,8 +345,8 @@ export class CubeStoreQueueDriver implements QueueDriverInterface {
return this.connection;
}

// eslint-disable-next-line no-return-assign
return this.connection = await this.driverFactory();
this.connection = await this.driverFactory();
return this.connection;
}

public async createConnection(): Promise<CubestoreQueueDriverConnection> {
Expand Down
Loading