
feat(server-core, query-orchestrator): Use the fastest xxhash for cache keys and model versions #9397

Draft · wants to merge 7 commits into master
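Across the hunks below, MD5 (and one SHA-256) digests used for cache keys, request-context hashes, and model versions are replaced with xxh3 hashes from @node-rs/xxhash, rendered as hex strings. A condensed, illustrative sketch of the before/after pattern (variable names here are examples, not code from the PR):

```ts
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';

const queryKey = { query: 'SELECT 1', persistent: false }; // example input
const payload = JSON.stringify(queryKey);

// Before: 128-bit MD5 digest rendered as 32 hex chars
const md5Key = crypto.createHash('md5').update(payload).digest('hex');

// After: non-cryptographic xxh3 — 64-bit for short keys,
// 128-bit where a wider key is kept (e.g. queue hashes)
const fastKey = xxh3.xxh64(payload).toString(16);
const wideKey = xxh3.xxh128(payload).toString(16);
```

xxh3 is not a cryptographic hash, which is fine for cache keys and version fingerprints where only speed and reasonable collision resistance matter.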
Changes from all commits
1 change: 1 addition & 0 deletions packages/cubejs-backend-shared/package.json
@@ -35,6 +35,7 @@
"typescript": "~5.2.2"
},
"dependencies": {
"@node-rs/xxhash": "^1.7.6",
"@oclif/color": "^0.1.2",
"bytes": "^3.1.2",
"cli-progress": "^3.9.0",
7 changes: 2 additions & 5 deletions packages/cubejs-backend-shared/src/promises.ts
@@ -1,6 +1,5 @@
/* eslint-disable arrow-body-style,no-restricted-syntax */
import crypto from 'crypto';

import { xxh3 } from '@node-rs/xxhash';
import { Optional } from './type-helpers';

export type PromiseLock = {
@@ -273,9 +272,7 @@ export const asyncDebounce = <Ret, Arguments>(
const cache = new Map<string, Promise<Ret>>();

return async (...args: Arguments[]) => {
const key = crypto.createHash('md5')
.update(args.map((v) => JSON.stringify(v)).join(','))
.digest('hex');
const key = xxh3.xxh64(args.map((v) => JSON.stringify(v)).join(',')).toString(16);
Member commented:
Let's make a function somewhere in @cubejs-backend-shared, with a name like defaultHasherHex16 (or defaultHasher and a tiny interface over it) and use it everywhere in the code instead of defining and requiring @node-rs/xxhash in each package.

This is the approach the Rust standard library takes with DefaultHasher. It would let us swap the hasher on the fly for testing via env vars, or replace the implementation whenever necessary.

Member Author replied:
Absolutely agree! Great point!
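
A minimal sketch of the suggested shared helper (the names defaultHasherHex16 / DefaultHasher follow the comment above; the CUBEJS_DEFAULT_HASHER env switch and the MD5 fallback are assumptions for illustration, not part of this PR):

```ts
// Hypothetical packages/cubejs-backend-shared/src/hasher.ts — a sketch, not PR code
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';

export type DefaultHasher = (input: string | Buffer) => string;

// xxh3-64 rendered as a fixed-width 16-char hex string.
const xxh64Hex16: DefaultHasher = (input) =>
  xxh3.xxh64(input).toString(16).padStart(16, '0');

// Legacy MD5 (32 hex chars), kept here only to show how an implementation swap could work.
const md5Hex32: DefaultHasher = (input) =>
  crypto.createHash('md5').update(input).digest('hex');

// CUBEJS_DEFAULT_HASHER is an assumed env var name for test-time switching.
export const defaultHasherHex16: DefaultHasher =
  process.env.CUBEJS_DEFAULT_HASHER === 'md5' ? md5Hex32 : xxh64Hex16;

// Usage: const key = defaultHasherHex16(JSON.stringify(queryKey));
```

Other packages would then import this from @cubejs-backend/shared rather than declaring @node-rs/xxhash as a dependency individually.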


if (cache.has(key)) {
return <Promise<Ret>>cache.get(key);
1 change: 1 addition & 0 deletions packages/cubejs-cubestore-driver/package.json
@@ -30,6 +30,7 @@
"@cubejs-backend/cubestore": "1.2.30",
"@cubejs-backend/native": "1.2.30",
"@cubejs-backend/shared": "1.2.30",
"@node-rs/xxhash": "^1.7.6",
"csv-write-stream": "^2.0.0",
"flatbuffers": "23.3.3",
"fs-extra": "^9.1.0",
22 changes: 11 additions & 11 deletions packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts
@@ -1,4 +1,4 @@
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';
import {
QueueDriverInterface,
QueueDriverConnectionInterface,
@@ -22,7 +22,7 @@ import { CubeStoreDriver } from './CubeStoreDriver';

function hashQueryKey(queryKey: QueryKey, processUid?: string): QueryKeyHash {
processUid = processUid || getProcessUid();
const hash = crypto.createHash('md5').update(JSON.stringify(queryKey)).digest('hex');
const hash = xxh3.xxh128(JSON.stringify(queryKey)).toString(16);

if (typeof queryKey === 'object' && queryKey.persistent) {
return `${hash}@${processUid}` as any;
@@ -83,7 +83,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
values.push(JSON.stringify(data));

const rows = await this.driver.query(`QUEUE ADD PRIORITY ?${options.orphanedTimeout ? ' ORPHANED ?' : ''} ? ?`, values);
if (rows && rows.length) {
if (rows?.length) {
return [
rows[0].added === 'true' ? 1 : 0,
rows[0].id ? parseInt(rows[0].id, 10) : null,
@@ -104,7 +104,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// queryKeyHash as compatibility fallback
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'cancelQuery');
}

@@ -149,7 +149,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('CACHE INCR ?', [
`${this.options.redisQueuePrefix}:PROCESSING_COUNTER`
]);
if (rows && rows.length) {
if (rows?.length) {
return rows[0].value;
}

@@ -186,7 +186,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('QUEUE RESULT ?', [
this.prefixKey(this.redisHash(queryKey)),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getResult');
}

@@ -245,7 +245,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
const rows = await this.driver.query('QUEUE GET ?', [
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getQueryDef');
}

@@ -271,7 +271,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
this.options.concurrency,
this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
const active = rows[0].active ? (rows[0].active).split(',') as unknown as QueryKeyHash[] : [];
const pending = parseInt(rows[0].pending, 10);

@@ -302,7 +302,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
// queryKeyHash as compatibility fallback
queueId || this.prefixKey(hash),
]);
if (rows && rows.length) {
if (rows?.length) {
return this.decodeQueryDefFromRow(rows[0], 'getResultBlocking');
}

@@ -348,8 +348,8 @@ export class CubeStoreQueueDriver implements QueueDriverInterface {
return this.connection;
}

// eslint-disable-next-line no-return-assign
return this.connection = await this.driverFactory();
this.connection = await this.driverFactory();
return this.connection;
}

public async createConnection(): Promise<CubestoreQueueDriverConnection> {
1 change: 1 addition & 0 deletions packages/cubejs-query-orchestrator/package.json
@@ -32,6 +32,7 @@
"@cubejs-backend/base-driver": "1.2.30",
"@cubejs-backend/cubestore-driver": "1.2.30",
"@cubejs-backend/shared": "1.2.30",
"@node-rs/xxhash": "^1.7.6",
"csv-write-stream": "^2.0.0",
"generic-pool": "^3.8.2",
"lru-cache": "^5.1.1",
@@ -29,6 +29,7 @@ export function version(cacheKey) {
let result = '';

const hashCharset = 'abcdefghijklmnopqrstuvwxyz012345';
// TODO: switch to use '@node-rs/xxhash' instead of crypto
const digestBuffer = crypto.createHash('md5').update(JSON.stringify(cacheKey)).digest();

let residue = 0;
15 changes: 5 additions & 10 deletions packages/cubejs-query-orchestrator/src/orchestrator/utils.ts
@@ -1,4 +1,4 @@
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';

import { getProcessUid } from '@cubejs-backend/shared';
import { QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver';
@@ -18,16 +18,11 @@ export function getCacheHash(queryKey: QueryKey | CacheKey, processUid?: string)
return queryKey as any;
}

const hash = xxh3.xxh128(JSON.stringify(queryKey)).toString(16);

if (typeof queryKey === 'object' && 'persistent' in queryKey && queryKey.persistent) {
return `${crypto
.createHash('md5')
.update(JSON.stringify(queryKey))
.digest('hex')
}@${processUid}` as any;
return `${hash}@${processUid}` as any;
} else {
return crypto
.createHash('md5')
.update(JSON.stringify(queryKey))
.digest('hex') as any;
return hash as any;
}
}
@@ -323,7 +323,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
expect(processUidRE.test(key3.split('@')[1])).toBeTruthy();

if (options.cacheAndQueueDriver === 'cubestore') {
expect(queue.redisHash('string')).toBe('095d71cf12556b9d5e330ad575b3df5d');
expect(queue.redisHash('string')).toBe('7aeaa28c6474df82129b9fe0d65c3ae4');
} else {
expect(queue.redisHash('string')).toBe('string');
}
1 change: 1 addition & 0 deletions packages/cubejs-server-core/package.json
@@ -37,6 +37,7 @@
"@cubejs-backend/schema-compiler": "1.2.30",
"@cubejs-backend/shared": "1.2.30",
"@cubejs-backend/templates": "1.2.30",
"@node-rs/xxhash": "^1.7.6",
"codesandbox-import-utils": "^2.1.12",
"cross-spawn": "^7.0.1",
"fs-extra": "^8.1.0",
17 changes: 7 additions & 10 deletions packages/cubejs-server-core/src/core/CompilerApi.js
@@ -1,4 +1,4 @@
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';
import R from 'ramda';
import { createQuery, compile, queryClass, PreAggregations, QueryFactory } from '@cubejs-backend/schema-compiler';
import { v4 as uuidv4, parse as uuidParse, stringify as uuidStringify } from 'uuid';
@@ -55,7 +55,7 @@

if (this.options.devServer || this.options.fastReload) {
const files = await this.repository.dataSchemaFiles();
compilerVersion += `_${crypto.createHash('md5').update(JSON.stringify(files)).digest('hex')}`;
compilerVersion += `_${xxh3.xxh64(JSON.stringify(files)).toString(16)}`;
}

if (!this.compilers || this.compilerVersion !== compilerVersion) {
@@ -224,7 +224,7 @@

hashRequestContext(context) {
if (!context.__hash) {
context.__hash = crypto.createHash('md5').update(JSON.stringify(context)).digest('hex');
context.__hash = xxh3.xxh64(JSON.stringify(context)).toString(16);

Codecov (codecov/patch) warning on line 227 in packages/cubejs-server-core/src/core/CompilerApi.js: added line #L227 was not covered by tests.
}
return context.__hash;
}
@@ -505,7 +505,7 @@
const visibiliyMask = JSON.stringify(isMemberVisibleInContext, Object.keys(isMemberVisibleInContext).sort());
// This hash will be returned along the modified meta config and can be used
// to distinguish between different "schema versions" after DAP visibility is applied
const visibilityMaskHash = crypto.createHash('sha256').update(visibiliyMask).digest('hex');
const visibilityMaskHash = xxh3.xxh64(visibiliyMask).toString(16);

Codecov (codecov/patch) warning on line 508 in packages/cubejs-server-core/src/core/CompilerApi.js: added line #L508 was not covered by tests.

return {
cubes: cubes
@@ -523,10 +523,7 @@
}

mixInVisibilityMaskHash(compilerId, visibilityMaskHash) {
const uuidBytes = uuidParse(compilerId);
const hashBytes = Buffer.from(visibilityMaskHash, 'hex');
return uuidv4({ random: crypto.createHash('sha256').update(uuidBytes).update(hashBytes).digest()
.subarray(0, 16) });
return uuidv4({ random: Buffer.from(xxh3.xxh64(`${compilerId}${visibilityMaskHash}`).toString()) });

Codecov (codecov/patch) warning on line 526 in packages/cubejs-server-core/src/core/CompilerApi.js: added line #L526 was not covered by tests.
}

async metaConfig(requestContext, options = {}) {
Expand All @@ -542,8 +539,8 @@
return {
cubes: patchedCubes,
// This compilerId is primarily used by the cubejs-backend-native or caching purposes.
// By default it doesn't account for member visibility changes introduced above by DAP.
// Here we're modifying the originila compilerId in a way that it's distinct for
// By default, it doesn't account for member visibility changes introduced above by DAP.
// Here we're modifying the original compilerId in a way that it's distinct for
// distinct schema versions while still being a valid UUID.
compilerId: visibilityMaskHash ? this.mixInVisibilityMaskHash(compilers.compilerId, visibilityMaskHash) : compilers.compilerId,
};
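The comments above describe the intent: keep compilerId a valid UUID while making it distinct per visibility mask. For reference, a standalone sketch of that idea using uuid's random option (illustrative only — the helper name and byte layout are assumptions, not this PR's implementation):

```ts
import { v4 as uuidv4 } from 'uuid';
import { xxh3 } from '@node-rs/xxhash';

// Derive 16 deterministic bytes from the 128-bit hash of the combined inputs
// and pass them to uuidv4, which still sets the version/variant bits,
// so the result stays a well-formed UUID.
function mixHashIntoUuid(compilerId: string, visibilityMaskHash: string): string {
  const digest = xxh3.xxh128(`${compilerId}${visibilityMaskHash}`); // 128-bit bigint
  const bytes = Buffer.alloc(16);
  bytes.writeBigUInt64BE(digest >> 64n, 0);                 // high 64 bits
  bytes.writeBigUInt64BE(digest & 0xffffffffffffffffn, 8);  // low 64 bits
  return uuidv4({ random: bytes });
}
```

Distinct masks then map to distinct but stable UUIDs for the same compilerId.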
7 changes: 2 additions & 5 deletions packages/cubejs-server-core/src/core/RefreshScheduler.ts
@@ -1,7 +1,7 @@
import R from 'ramda';
import pLimit from 'p-limit';
import { v4 as uuidv4 } from 'uuid';
import crypto from 'crypto';
import { xxh3 } from '@node-rs/xxhash';
import { Required } from '@cubejs-backend/shared';
import {
PreAggregationDescription,
@@ -127,10 +127,7 @@ function getPreAggsJobsList(
* Returns MD5 hash token of the job object.
*/
function getPreAggJobToken(job: PreAggJob) {
return crypto
.createHash('md5')
.update(JSON.stringify(job))
.digest('hex');
return xxh3.xxh64(JSON.stringify(job)).toString(16);
}

export class RefreshScheduler {
10 changes: 5 additions & 5 deletions packages/cubejs-testing/test/smoke-cubesql.test.ts
@@ -179,23 +179,23 @@ describe('SQL API', () => {
}

it('regular query', async () => {
expect(await generateSql(`SELECT SUM(totalAmount) AS total FROM Orders;`)).toMatchSnapshot();
expect(await generateSql('SELECT SUM(totalAmount) AS total FROM Orders;')).toMatchSnapshot();
});

it('regular query with missing column', async () => {
expect(await generateSql(`SELECT SUM(foobar) AS total FROM Orders;`)).toMatchSnapshot();
expect(await generateSql('SELECT SUM(foobar) AS total FROM Orders;')).toMatchSnapshot();
});

it('regular query with parameters', async () => {
expect(await generateSql(`SELECT SUM(totalAmount) AS total FROM Orders WHERE status = 'foo';`)).toMatchSnapshot();
expect(await generateSql('SELECT SUM(totalAmount) AS total FROM Orders WHERE status = \'foo\';')).toMatchSnapshot();
});

it('strictly post-processing', async () => {
expect(await generateSql(`SELECT version();`)).toMatchSnapshot();
expect(await generateSql('SELECT version();')).toMatchSnapshot();
});

it('strictly post-processing with disabled post-processing', async () => {
expect(await generateSql(`SELECT version();`, true)).toMatchSnapshot();
expect(await generateSql('SELECT version();', true)).toMatchSnapshot();
});

it('double aggregation post-processing', async () => {
2 changes: 1 addition & 1 deletion packages/cubejs-testing/test/smoke-duckdb.test.ts
@@ -90,7 +90,7 @@ describe('duckdb', () => {
]
});

// There are 2 'processed' orders
// There are 2 'processed' orders
expect(response.rawData()[0]['Orders.count']).toBe('2');
});
