Skip to content

Commit fd539ac

Browse files
authored
Allow logging queries validators during replay (#1927)
1 parent 42b1664 commit fd539ac

10 files changed

Lines changed: 168 additions & 22 deletions

File tree

packages/test/src/run-activation-perf-tests.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ if (!wf.inWorkflowContext()) {
8585
historyLength: 3,
8686
historySize: 300,
8787
continueAsNewSuggested: false,
88-
unsafe: { isReplaying: false, now: Date.now },
88+
unsafe: { isReplaying: false, isReplayingHistoryEvents: false, now: Date.now },
8989
startTime: new Date(),
9090
runStartTime: new Date(),
9191
};

packages/test/src/test-integration-split-one.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -752,7 +752,7 @@ test.serial('Workflow can read WorkflowInfo', configMacro, async (t, config) =>
752752
currentBuildId: res.currentBuildId, // eslint-disable-line deprecation/deprecation
753753
currentDeploymentVersion: res.currentDeploymentVersion,
754754
// unsafe.now is a function, so doesn't make it through serialization, but .now is required, so we need to cast
755-
unsafe: { isReplaying: false } as UnsafeWorkflowInfo,
755+
unsafe: { isReplaying: false, isReplayingHistoryEvents: false } as UnsafeWorkflowInfo,
756756
priority: {},
757757
});
758758
});

packages/test/src/test-sinks.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ if (RUN_INTEGRATION_TESTS) {
146146
// unsafe.now() doesn't make it through serialization, but .now is required, so we need to cast
147147
unsafe: {
148148
isReplaying: false,
149+
isReplayingHistoryEvents: false,
149150
} as UnsafeWorkflowInfo,
150151
priority: {
151152
fairnessKey: undefined,
@@ -554,4 +555,72 @@ if (RUN_INTEGRATION_TESTS) {
554555
},
555556
]);
556557
});
558+
559+
test('Logging is allowed in query handlers and update validators', async (t) => {
560+
const taskQueue = `${__filename}-${t.title}`;
561+
562+
let recordedMessages: { message: string; isReplaying: boolean }[] = [];
563+
const sinks: InjectedSinks<workflows.CustomLoggerSinks> = {
564+
customLogger: {
565+
info: {
566+
fn: async (info, message) => {
567+
recordedMessages.push({
568+
message,
569+
isReplaying: info.unsafe.isReplaying,
570+
});
571+
},
572+
},
573+
},
574+
};
575+
576+
const client = new WorkflowClient();
577+
const handle = await client.start(workflows.queryAndValidatorLogging, { taskQueue, workflowId: uuid4() });
578+
579+
const worker = await Worker.create({
580+
...defaultOptions,
581+
taskQueue,
582+
sinks,
583+
// Avoid waiting for sticky execution timeout on worker transition
584+
stickyQueueScheduleToStartTimeout: '1s',
585+
});
586+
587+
await worker.runUntil(async () => {
588+
await handle.query(workflows.loggingQuery);
589+
await handle.executeUpdate(workflows.loggingUpdate, { args: ['good'] });
590+
});
591+
592+
let messages = recordedMessages.map((m) => m.message);
593+
t.true(messages.includes('Query handler called'), 'Query handler log should be emitted');
594+
t.true(messages.includes('Update validator called'), 'Update validator log should be emitted');
595+
t.true(messages.includes('Update handler called'), 'Update handler log should be emitted');
596+
597+
const worker2 = await Worker.create({
598+
...defaultOptions,
599+
taskQueue,
600+
sinks,
601+
// Avoid waiting for sticky execution timeout on worker transition
602+
stickyQueueScheduleToStartTimeout: '1s',
603+
});
604+
605+
// Empty recorded messages
606+
recordedMessages = [];
607+
608+
// Run the entire workflow through worker 2 (will replay).
609+
await worker2.runUntil(async () => {
610+
await handle.query(workflows.loggingQuery);
611+
// No update - it will be replayed
612+
await handle.signal(workflows.unblockSignal);
613+
const res = await handle.result();
614+
// Assert that the update replayed (expect initial update arg as result)
615+
t.is(res, 'good');
616+
});
617+
618+
messages = recordedMessages.map((m) => m.message);
619+
// Query is a live operation even during replay — log should be emitted
620+
t.true(messages.includes('Query handler called'), 'Query handler log should be emitted on replay');
621+
// Validator does not re-run during replay (Core sends runValidator: false for accepted updates)
622+
t.false(messages.includes('Update validator called'), 'Update validator log should not be emitted on replay');
623+
// Update handler re-runs during replay to rebuild state, but its logs should be suppressed
624+
t.false(messages.includes('Update handler called'), 'Update handler log should not be emitted on replay');
625+
});
557626
}

packages/test/src/test-workflows.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ async function createWorkflow(
122122
historyLength: 3,
123123
historySize: 300,
124124
continueAsNewSuggested: false,
125-
unsafe: { isReplaying: false, now: Date.now },
125+
unsafe: { isReplaying: false, isReplayingHistoryEvents: false, now: Date.now },
126126
startTime: new Date(),
127127
runStartTime: new Date(),
128128
},

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ export * from './patched';
5454
export * from './patched-top-level';
5555
export * from './priority';
5656
export * from './promise-all';
57+
export * from './query-and-validator-logging';
5758
export * from './promise-race';
5859
export * from './promise-then-promise';
5960
export * from './race';
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import * as wf from '@temporalio/workflow';
2+
import { CustomLoggerSinks } from './log-sink-tester';
3+
import { unblockSignal } from './definitions';
4+
5+
const { customLogger } = wf.proxySinks<CustomLoggerSinks>();
6+
7+
export const loggingQuery = wf.defineQuery<string>('loggingQuery');
8+
export const loggingUpdate = wf.defineUpdate<string, [string]>('loggingUpdate');
9+
10+
export async function queryAndValidatorLogging(): Promise<string> {
11+
let lastSignal = '';
12+
let updateArg = '';
13+
14+
wf.setHandler(loggingQuery, () => {
15+
customLogger.info('Query handler called');
16+
return lastSignal;
17+
});
18+
19+
wf.setHandler(
20+
loggingUpdate,
21+
(arg: string) => {
22+
customLogger.info('Update handler called');
23+
updateArg = arg;
24+
return `update-result: ${arg}`;
25+
},
26+
{
27+
validator: (arg: string) => {
28+
customLogger.info('Update validator called');
29+
if (arg === 'bad') {
30+
throw new Error('Validation failed');
31+
}
32+
},
33+
}
34+
);
35+
36+
wf.setHandler(unblockSignal, () => {
37+
lastSignal = 'unblocked';
38+
});
39+
40+
await wf.condition(() => lastSignal === 'unblocked');
41+
return updateArg;
42+
}

packages/worker/src/worker.ts

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,12 +1459,10 @@ export class Worker {
14591459
// When processing workflows through runReplayHistories, Core may still send non-replay
14601460
// activations on the very last Workflow Task in some cases. Though Core is technically exact
14611461
// here, the fact that sinks marked with callDuringReplay = false may get called on a replay
1462-
// worker is definitely a surprising behavior. For that reason, we extend the isReplaying flag in
1463-
// this case to also include anything running under in a replay worker.
1464-
const isReplaying = activation.isReplaying || this.isReplayWorker;
1465-
1462+
// worker is definitely a surprising behavior. For that reason, processSinkCalls checks
1463+
// this.isReplayWorker to suppress all non-callDuringReplay sinks on replay workers.
14661464
const calls = await workflow.workflow.getAndResetSinkCalls();
1467-
await this.processSinkCalls(calls, isReplaying, workflow.logAttributes);
1465+
await this.processSinkCalls(calls, workflow.logAttributes);
14681466
}
14691467
this.logger.trace('Completed activation', workflow.logAttributes);
14701468
}
@@ -1572,6 +1570,7 @@ export class Worker {
15721570
unsafe: {
15731571
now: () => Date.now(), // re-set in initRuntime
15741572
isReplaying: activation.isReplaying,
1573+
isReplayingHistoryEvents: activation.isReplaying,
15751574
},
15761575
priority: decodePriority(priority),
15771576
};
@@ -1597,11 +1596,7 @@ export class Worker {
15971596
* This function does not throw, it will log in case of missing sinks
15981597
* or failed sink function invocations.
15991598
*/
1600-
protected async processSinkCalls(
1601-
externalCalls: SinkCall[],
1602-
isReplaying: boolean,
1603-
logAttributes: Record<string, unknown>
1604-
): Promise<void> {
1599+
protected async processSinkCalls(externalCalls: SinkCall[], logAttributes: Record<string, unknown>): Promise<void> {
16051600
const { sinks } = this.options;
16061601

16071602
const filteredCalls = externalCalls
@@ -1619,8 +1614,15 @@ export class Worker {
16191614
});
16201615
return false;
16211616
})
1622-
// If appropriate, reject calls to sink functions not configured with `callDuringReplay = true`
1623-
.filter(({ sink }) => sink?.callDuringReplay || !isReplaying);
1617+
// If appropriate, reject calls to sink functions not configured with `callDuringReplay = true`.
1618+
// Use per-call isReplayingHistoryEvents (which is false during queries and update validators)
1619+
// rather than per-activation isReplaying, so that logging is permitted during live read-only operations.
1620+
// Replay workers still suppress all non-callDuringReplay sinks regardless.
1621+
.filter(({ call, sink }) => {
1622+
if (sink?.callDuringReplay) return true;
1623+
if (this.isReplayWorker) return false;
1624+
return !call.workflowInfo.unsafe.isReplayingHistoryEvents;
1625+
});
16241626

16251627
// Make a wrapper function, to make things easier afterward
16261628
await Promise.all(

packages/worker/src/workflow/vm-shared.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ export function injectGlobals(context: vm.Context): void {
116116
console[level](`[not in workflow context]`, ...args);
117117
} else {
118118
const { info } = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__!;
119-
if (info.unsafe.isReplaying) return;
119+
if (info.unsafe.isReplayingHistoryEvents) return;
120120
console[level](`[${info.workflowType}(${info.workflowId})]`, ...args);
121121
}
122122
};
@@ -394,6 +394,7 @@ export abstract class BaseVMWorkflow implements Workflow {
394394
unsafe: {
395395
...info.unsafe,
396396
isReplaying: activation.isReplaying ?? false,
397+
isReplayingHistoryEvents: activation.isReplaying ?? false,
397398
},
398399
}));
399400
this.activator.addKnownFlags(activation.availableInternalFlags ?? []);
@@ -470,6 +471,8 @@ export abstract class BaseVMWorkflow implements Workflow {
470471
unsafe: {
471472
...info.unsafe,
472473
isReplaying: true,
474+
// Queries are live read-only operations, not replay of history events
475+
isReplayingHistoryEvents: false,
473476
},
474477
}));
475478
this.workflowModule.activate(activation);

packages/workflow/src/interfaces.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,22 @@ export interface UnsafeWorkflowInfo {
233233
*/
234234
readonly now: () => number;
235235

236+
/**
237+
* Whether the workflow is currently replaying.
238+
*/
236239
readonly isReplaying: boolean;
240+
241+
/**
242+
* Whether the workflow is currently replaying history events.
243+
*
244+
* This is similar to {@link isReplaying}, but returns `false` during query handlers and update
245+
* validators, which are live read-only operations that should not be considered as replaying
246+
* history events.
247+
*
248+
* When this property is true, workflow log messages are suppressed and sinks defined with
249+
* callDuringReplay=false won't get processed.
250+
*/
251+
readonly isReplayingHistoryEvents: boolean;
237252
}
238253

239254
/**

packages/workflow/src/internals.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -858,12 +858,26 @@ export class Activator implements ActivationHandler {
858858
let input: UpdateInput;
859859
try {
860860
if (runValidator && entry.validator) {
861-
const validate = composeInterceptors(
862-
interceptors,
863-
'validateUpdate',
864-
this.validateUpdateNextHandler.bind(this, entry.validator)
865-
);
866-
validate(makeInput());
861+
// Temporarily mark as not replaying history events during validator execution
862+
// so that logging is permitted. Validators are live read-only operations.
863+
const wasReplayingHistoryEvents = this.info.unsafe.isReplayingHistoryEvents;
864+
this.mutateWorkflowInfo((info) => ({
865+
...info,
866+
unsafe: { ...info.unsafe, isReplayingHistoryEvents: false },
867+
}));
868+
try {
869+
const validate = composeInterceptors(
870+
interceptors,
871+
'validateUpdate',
872+
this.validateUpdateNextHandler.bind(this, entry.validator)
873+
);
874+
validate(makeInput());
875+
} finally {
876+
this.mutateWorkflowInfo((info) => ({
877+
...info,
878+
unsafe: { ...info.unsafe, isReplayingHistoryEvents: wasReplayingHistoryEvents },
879+
}));
880+
}
867881
}
868882
input = makeInput();
869883
} catch (error) {

0 commit comments

Comments
 (0)