Skip to content

Commit a900c78

Browse files
authored
fix: Report more accurate time to APM on heavy used deployments (#9667)
The idea behind this change: When an async function is called in an async function with await, the real execution can be delayed to microtasking in Node.js. It results in incorrect timestamps, which I see in APM.
1 parent 0914e1e commit a900c78

File tree

6 files changed

+30
-18
lines changed

6 files changed

+30
-18
lines changed

packages/cubejs-backend-shared/src/track.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@ import { internalExceptions } from './errors';
55

66
export type BaseEvent = {
77
event: string,
8+
// It's possible to fill timestamp at the place of logging, otherwise, it will be filled in automatically
9+
timestamp?: string,
810
[key: string]: any,
911
};
1012

11-
export type Event = BaseEvent & {
13+
export type Event = {
1214
id: string,
1315
clientTimestamp: string,
1416
anonymousId: string,
1517
platform: string,
18+
arch: string,
1619
nodeVersion: string,
1720
sentFrom: 'backend';
1821
};
@@ -79,8 +82,8 @@ export async function track(opts: BaseEvent) {
7982

8083
trackEvents.push({
8184
...opts,
85+
clientTimestamp: opts.timestamp || new Date().toJSON(),
8286
id: crypto.randomBytes(16).toString('hex'),
83-
clientTimestamp: new Date().toJSON(),
8487
platform: process.platform,
8588
arch: process.arch,
8689
nodeVersion: process.version,

packages/cubejs-server-core/src/core/DevServer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import jwt from 'jsonwebtoken';
1111
import isDocker from 'is-docker';
1212
import type { Application as ExpressApplication, Request, Response } from 'express';
1313
import type { ChildProcess } from 'child_process';
14-
import { executeCommand, getEnv, keyByDataSource, packageExists } from '@cubejs-backend/shared';
14+
import { executeCommand, getAnonymousId, getEnv, keyByDataSource, packageExists } from '@cubejs-backend/shared';
1515
import crypto from 'crypto';
1616

1717
import type { BaseDriver } from '@cubejs-backend/query-orchestrator';
@@ -115,7 +115,7 @@ export class DevServer {
115115
res.json({
116116
cubejsToken,
117117
basePath: options.basePath,
118-
anonymousId: this.cubejsServer.anonymousId,
118+
anonymousId: getAnonymousId(),
119119
coreServerVersion: this.cubejsServer.coreServerVersion,
120120
dockerVersion: this.options.dockerVersion || null,
121121
projectFingerprint: this.cubejsServer.projectFingerprint,

packages/cubejs-server-core/src/core/agentCollect.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import crypto from 'crypto';
88
import WebSocket from 'ws';
99
import zlib from 'zlib';
1010
import { promisify } from 'util';
11+
import { LoggerFnParams, LoggerFn } from './types';
1112

1213
const deflate = promisify(zlib.deflate);
1314
interface AgentTransport {
@@ -175,11 +176,11 @@ const clearTransport = () => {
175176
agentInterval = null;
176177
};
177178

178-
export default async (event: Record<string, any>, endpointUrl: string, logger: any) => {
179+
export const agentCollect = async (event: LoggerFnParams, endpointUrl: string, logger: LoggerFn) => {
179180
trackEvents.push({
181+
timestamp: new Date().toJSON(),
180182
...event,
181183
id: crypto.randomBytes(16).toString('hex'),
182-
timestamp: new Date().toJSON(),
183184
instanceId: getEnv('instanceId'),
184185
});
185186
lastEvent = new Date();
@@ -227,3 +228,5 @@ export default async (event: Record<string, any>, endpointUrl: string, logger: a
227228
}, getEnv('agentFlushInterval'));
228229
}
229230
};
231+
232+
export default agentCollect;

packages/cubejs-server-core/src/core/server.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import {
1414
CancelableInterval,
1515
createCancelableInterval,
1616
formatDuration,
17-
getAnonymousId,
1817
getEnv,
1918
assertDataSource,
2019
getRealType,
@@ -31,7 +30,7 @@ import { RefreshScheduler, ScheduledRefreshOptions } from './RefreshScheduler';
3130
import { OrchestratorApi, OrchestratorApiOptions } from './OrchestratorApi';
3231
import { CompilerApi } from './CompilerApi';
3332
import { DevServer } from './DevServer';
34-
import agentCollect from './agentCollect';
33+
import { agentCollect } from './agentCollect';
3534
import { OrchestratorStorage } from './OrchestratorStorage';
3635
import { prodLogger, devLogger } from './logger';
3736
import { OptsHandler } from './OptsHandler';
@@ -60,6 +59,7 @@ import type {
6059
DriverConfig,
6160
ScheduledRefreshTimeZonesFn,
6261
ContextToCubeStoreRouterIdFn,
62+
LoggerFnParams,
6363
} from './types';
6464
import {
6565
ContextToOrchestratorIdFn,
@@ -171,8 +171,6 @@ export class CubejsServerCore {
171171

172172
public projectFingerprint: string | null = null;
173173

174-
public anonymousId: string | null = null;
175-
176174
public coreServerVersion: string | null = null;
177175

178176
protected contextAcceptor: ContextAcceptor;
@@ -233,7 +231,7 @@ export class CubejsServerCore {
233231

234232
this.startScheduledRefreshTimer();
235233

236-
this.event = async (name, props) => {
234+
this.event = async (event, props: LoggerFnParams) => {
237235
if (!this.options.telemetry) {
238236
return;
239237
}
@@ -248,15 +246,12 @@ export class CubejsServerCore {
248246
}
249247
}
250248

251-
if (!this.anonymousId) {
252-
this.anonymousId = getAnonymousId();
253-
}
254-
255249
const internalExceptionsEnv = getEnv('internalExceptions');
256250

257251
try {
258252
await track({
259-
event: name,
253+
timestamp: new Date().toJSON(),
254+
event,
260255
projectFingerprint: this.projectFingerprint,
261256
coreServerVersion: this.coreServerVersion,
262257
dockerVersion: getEnv('dockerImageVersion'),
@@ -410,7 +405,12 @@ export class CubejsServerCore {
410405
if (agentEndpointUrl) {
411406
const oldLogger = this.logger;
412407
this.preAgentLogger = oldLogger;
408+
413409
this.logger = (msg, params) => {
410+
// Filling timestamp as much as earlier as we can, otherwise it can be incorrect. Because next code is async
411+
// with await points which can be delayed with Node.js micro-tasking.
412+
params.timestamp = params.timestamp || new Date().toJSON();
413+
414414
oldLogger(msg, params);
415415
agentCollect(
416416
{

packages/cubejs-server-core/src/core/types.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,12 @@ export type ExternalDbTypeFn = (context: RequestContext) => DatabaseType;
167167
export type ExternalDriverFactoryFn = (context: RequestContext) => Promise<BaseDriver> | BaseDriver;
168168
export type ExternalDialectFactoryFn = (context: RequestContext) => BaseQuery;
169169

170-
export type LoggerFn = (msg: string, params: Record<string, any>) => void;
170+
export type LoggerFnParams = {
171+
// It's possible to fill timestamp at the place of logging, otherwise, it will be filled in automatically
172+
timestamp?: string,
173+
[key: string]: any,
174+
};
175+
export type LoggerFn = (msg: string, params: LoggerFnParams) => void;
171176

172177
export type BiToolSyncConfig = {
173178
type: string;

rust/cubesql/cubesql/src/sql/compiler_cache.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,8 @@ impl CompilerCache for CompilerCacheImpl {
191191
.get(&(compiler_id, protocol.clone()))
192192
.cloned()
193193
};
194-
// Double checked locking
194+
195+
// Double-checked locking
195196
let cache_entry = if let Some(cache_entry) = cache_entry {
196197
cache_entry
197198
} else {

0 commit comments

Comments
 (0)