Skip to content

Commit 3039377

Browse files
fix(otel-v2): provide id generator
1 parent 5541c8d commit 3039377

8 files changed

Lines changed: 950 additions & 74 deletions

File tree

packages/interceptors-opentelemetry-v2/src/__tests__/history_files/otel_v2_smorgasbord.json

Lines changed: 796 additions & 0 deletions
Large diffs are not rendered by default.

packages/interceptors-opentelemetry-v2/src/__tests__/test-otel.ts

Lines changed: 75 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
/**
22
* Manual tests to inspect tracing output
33
*/
4-
import * as http from 'http';
54
import * as http2 from 'http2';
65
import * as path from 'path';
76
import * as otelApi from '@opentelemetry/api';
@@ -14,7 +13,6 @@ import { ATTR_SERVICE_NAME } from '@opentelemetry/semantic-conventions';
1413
import test from 'ava';
1514
import { v4 as uuid4 } from 'uuid';
1615
import { WorkflowClient, WithStartWorkflowOperation, WorkflowClientInterceptor, Client } from '@temporalio/client';
17-
import { TestWorkflowEnvironment } from '@temporalio/testing';
1816
import {
1917
ActivityInboundCallsInterceptor,
2018
ActivityOutboundCallsInterceptor,
@@ -147,15 +145,15 @@ if (RUN_INTEGRATION_TESTS) {
147145
const worker = await Worker.create({
148146
workflowsPath: require.resolve('./workflows'),
149147
activities,
150-
taskQueue: 'test-otel',
148+
taskQueue: 'test-otel-v2',
151149
plugins: [plugin],
152150
});
153151

154152
const client = new Client({
155153
plugins: [plugin],
156154
});
157155
await worker.runUntil(
158-
client.workflow.execute(workflows.smorgasbord, { taskQueue: 'test-otel', workflowId: uuid4() })
156+
client.workflow.execute(workflows.smorgasbord, { taskQueue: 'test-otel-v2', workflowId: uuid4() })
159157
);
160158
await otel.shutdown();
161159
const originalSpan = spans.find(({ name }) => name === `${SpanName.WORKFLOW_START}${SPAN_DELIMITER}smorgasbord`);
@@ -301,7 +299,7 @@ if (RUN_INTEGRATION_TESTS) {
301299
workflowsPath: require.resolve('./workflows'),
302300
activities,
303301
enableSDKTracing: true,
304-
taskQueue: 'test-otel',
302+
taskQueue: 'test-otel-v2',
305303
interceptors: {
306304
workflowModules: [require.resolve('./workflows/otel-interceptors')],
307305
activity: [(ctx) => ({ inbound: new OpenTelemetryActivityInboundInterceptor(ctx) })],
@@ -312,7 +310,7 @@ if (RUN_INTEGRATION_TESTS) {
312310
const client = new WorkflowClient({
313311
interceptors: [new OpenTelemetryWorkflowClientInterceptor()],
314312
});
315-
await worker.runUntil(client.execute(workflows.smorgasbord, { taskQueue: 'test-otel', workflowId: uuid4() }));
313+
await worker.runUntil(client.execute(workflows.smorgasbord, { taskQueue: 'test-otel-v2', workflowId: uuid4() }));
316314
// Allow some time to ensure spans are flushed out to collector
317315
await new Promise<void>((resolve) => setTimeout(resolve, 5000));
318316
t.pass();
@@ -327,7 +325,7 @@ if (RUN_INTEGRATION_TESTS) {
327325
t.falsy((globalThis as any).window);
328326
});
329327

330-
test('instrumentation: Error status includes message and records exception', async (t) => {
328+
test.serial('instrumentation: Error status includes message and records exception', async (t) => {
331329
const memoryExporter = new InMemorySpanExporter();
332330
const provider = new BasicTracerProvider({
333331
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
@@ -360,7 +358,7 @@ if (RUN_INTEGRATION_TESTS) {
360358
t.is(exceptionEvents.length, 1);
361359
});
362360

363-
test('Otel workflow omits ApplicationError with BENIGN category', async (t) => {
361+
test.serial('Otel workflow omits ApplicationError with BENIGN category', async (t) => {
364362
const memoryExporter = new InMemorySpanExporter();
365363
const provider = new BasicTracerProvider({
366364
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
@@ -370,7 +368,7 @@ if (RUN_INTEGRATION_TESTS) {
370368
const worker = await Worker.create({
371369
workflowsPath: require.resolve('./workflows'),
372370
activities,
373-
taskQueue: 'test-otel-benign-err',
371+
taskQueue: 'test-otel-v2-benign-err',
374372
interceptors: {
375373
activity: [
376374
(ctx) => {
@@ -384,7 +382,7 @@ if (RUN_INTEGRATION_TESTS) {
384382

385383
await worker.runUntil(
386384
client.execute(workflows.throwMaybeBenignErr, {
387-
taskQueue: 'test-otel-benign-err',
385+
taskQueue: 'test-otel-v2-benign-err',
388386
workflowId: uuid4(),
389387
retry: { maximumAttempts: 3 },
390388
})
@@ -399,7 +397,7 @@ if (RUN_INTEGRATION_TESTS) {
399397
t.is(spans[2]!.status.code, SpanStatusCode.OK);
400398
});
401399

402-
test('executeUpdateWithStart works correctly with OTEL interceptors', async (t) => {
400+
test.serial('executeUpdateWithStart works correctly with OTEL interceptors', async (t) => {
403401
const staticResource = opentelemetry.resources.resourceFromAttributes({
404402
[ATTR_SERVICE_NAME]: 'ts-test-otel-worker',
405403
});
@@ -420,15 +418,15 @@ if (RUN_INTEGRATION_TESTS) {
420418
plugins: [plugin],
421419
}),
422420
activities,
423-
taskQueue: 'test-otel-update-start',
421+
taskQueue: 'test-otel-v2-update-start',
424422
plugins: [plugin],
425423
});
426424

427425
const client = new WorkflowClient();
428426

429427
const startWorkflowOperation = new WithStartWorkflowOperation(workflows.updateStartOtel, {
430428
workflowId: uuid4(),
431-
taskQueue: 'test-otel-update-start',
429+
taskQueue: 'test-otel-v2-update-start',
432430
workflowIdConflictPolicy: 'FAIL',
433431
});
434432

@@ -450,8 +448,8 @@ if (RUN_INTEGRATION_TESTS) {
450448

451449
// These tests verify makeWorkflowExporter's handling of async resource attributes:
452450
// https://github.com/temporalio/sdk-typescript/issues/1779
453-
test(`makeWorkflowExporter with SpanProcessor does await async resource attributes`, async (t) => {
454-
const taskQueue = `test-otel-async-processor`;
451+
test.serial(`makeWorkflowExporter with SpanProcessor does await async resource attributes`, async (t) => {
452+
const taskQueue = `test-otel-v2-async-processor`;
455453
const serviceName = `ts-test-otel-async-attributes`;
456454

457455
// In OTel v2, async resource attributes are created by passing Promise
@@ -523,7 +521,7 @@ if (RUN_INTEGRATION_TESTS) {
523521
const spans = Array<opentelemetry.tracing.ReadableSpan>();
524522

525523
const staticResource = opentelemetry.resources.resourceFromAttributes({
526-
[ATTR_SERVICE_NAME]: 'ts-test-otel-prebundled-worker',
524+
[ATTR_SERVICE_NAME]: 'ts-test-otel-v2-prebundled-worker',
527525
});
528526
const traceExporter: opentelemetry.tracing.SpanExporter = {
529527
export(spans_, resultCallback) {
@@ -561,7 +559,7 @@ if (RUN_INTEGRATION_TESTS) {
561559
const worker = await Worker.create({
562560
workflowBundle,
563561
activities,
564-
taskQueue: 'test-otel-prebundled',
562+
taskQueue: 'test-otel-v2-prebundled',
565563
plugins: [plugin],
566564
});
567565

@@ -570,7 +568,7 @@ if (RUN_INTEGRATION_TESTS) {
570568
plugins: [plugin],
571569
});
572570
await worker.runUntil(
573-
client.workflow.execute(workflows.smorgasbord, { taskQueue: 'test-otel-prebundled', workflowId: uuid4() })
571+
client.workflow.execute(workflows.smorgasbord, { taskQueue: 'test-otel-v2-prebundled', workflowId: uuid4() })
574572
);
575573
await provider.shutdown();
576574

@@ -649,7 +647,7 @@ if (RUN_INTEGRATION_TESTS) {
649647
const worker = await Worker.create({
650648
workflowsPath: require.resolve('./workflows'),
651649
activities,
652-
taskQueue: 'test-otel-tracestate',
650+
taskQueue: 'test-otel-v2-tracestate',
653651
interceptors: {
654652
workflowModules: [require.resolve('./workflows/otel-interceptors')],
655653
},
@@ -682,7 +680,7 @@ if (RUN_INTEGRATION_TESTS) {
682680
// Execute the workflow within this context so the traceState is propagated
683681
await otelApi.context.with(contextWithTraceState, async () => {
684682
await client.execute(workflows.successString, {
685-
taskQueue: 'test-otel-tracestate',
683+
taskQueue: 'test-otel-v2-tracestate',
686684
workflowId: uuid4(),
687685
});
688686
});
@@ -707,6 +705,61 @@ if (RUN_INTEGRATION_TESTS) {
707705
t.is(traceState!.get('vendor2'), 'value2');
708706
}
709707
});
708+
709+
test.serial('OTel span creation does not affect workflow PRNG sequence', async (t) => {
710+
const plugin = new OpenTelemetryPlugin({
711+
resource: opentelemetry.resources.resourceFromAttributes({
712+
[ATTR_SERVICE_NAME]: 'ts-test-prng-isolation',
713+
}),
714+
spanProcessor: new SimpleSpanProcessor({
715+
export(_spans, resultCallback) {
716+
resultCallback({ code: ExportResultCode.SUCCESS });
717+
},
718+
async shutdown() {},
719+
}),
720+
});
721+
722+
const otelBundle = await createOtelTestWorkflowBundle({
723+
workflowsPath: require.resolve('./workflows'),
724+
plugins: [plugin],
725+
});
726+
727+
// Run the workflow with OTel interceptors and capture uuid4() values
728+
const taskQueue1 = 'test-otel-v2-prng-isolation-1';
729+
const workerWithOtel = await Worker.create({
730+
workflowBundle: otelBundle,
731+
activities,
732+
taskQueue: taskQueue1,
733+
plugins: [plugin],
734+
});
735+
const client = new WorkflowClient();
736+
const workflowId = uuid4();
737+
const idsWithOtel = await workerWithOtel.runUntil(
738+
client.execute(workflows.prngIsolation, { taskQueue: taskQueue1, workflowId })
739+
);
740+
741+
t.is(idsWithOtel.length, 10, 'should produce 10 uuid4 values');
742+
t.is(new Set(idsWithOtel).size, 10, 'all uuid4 values should be unique');
743+
744+
// Replay the same workflow — if the DeterministicIdGenerator properly
745+
// isolates OTel's PRNG from the workflow's Math.random(), uuid4()
746+
// produces the same sequence on replay.
747+
const handle = client.getHandle(workflowId);
748+
const history = await handle.fetchHistory();
749+
await t.notThrowsAsync(
750+
Worker.runReplayHistory(
751+
{
752+
workflowBundle: otelBundle,
753+
interceptors: {
754+
workflowModules: [require.resolve('./workflows/otel-interceptors')],
755+
},
756+
},
757+
history
758+
)
759+
);
760+
761+
t.log('uuid4() values with OTel:', idsWithOtel);
762+
});
710763
}
711764

712765
test('Can replay otel history from 1.11.3', async (t) => {
@@ -755,60 +808,8 @@ test('Can replay otel history from 1.13.1', async (t) => {
755808
});
756809
});
757810

758-
// These histories were recorded with OTel v1 libraries. The v2 OTel library's
759-
// different module structure changes the webpack module initialization order in
760-
// the workflow sandbox, which shifts the deterministic Math.random() state and
761-
// produces different uuid4() values for child workflow IDs.
762-
// Workflows without auto-generated IDs (otel_1_11_3, otel_1_13_1) replay fine.
763-
test.skip('Can replay smorgasbord from 1.13.1', async (t) => {
764-
// This test will trigger NDE if yield points for `scheduleActivity` and `startChildWorkflowExecution` are not inserted
765-
const hist = await loadHistory('otel_smorgasbord_1_13_1.json');
766-
await t.notThrowsAsync(async () => {
767-
await Worker.runReplayHistory(
768-
{
769-
workflowBundle: await createOtelTestWorkflowBundle({
770-
workflowsPath: require.resolve('./workflows'),
771-
workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')],
772-
}),
773-
interceptors: {
774-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
775-
activity: [
776-
(ctx) => ({
777-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
778-
}),
779-
],
780-
},
781-
},
782-
hist
783-
);
784-
});
785-
});
786-
787-
test.skip('Can replay signal workflow from 1.13.1', async (t) => {
788-
const hist = await loadHistory('signal_workflow_1_13_1.json');
789-
await t.notThrowsAsync(async () => {
790-
await Worker.runReplayHistory(
791-
{
792-
workflowBundle: await createOtelTestWorkflowBundle({
793-
workflowsPath: require.resolve('./workflows/signal-workflow'),
794-
workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')],
795-
}),
796-
interceptors: {
797-
workflowModules: [require.resolve('./workflows/otel-interceptors')],
798-
activity: [
799-
(ctx) => ({
800-
inbound: new OpenTelemetryActivityInboundInterceptor(ctx),
801-
}),
802-
],
803-
},
804-
},
805-
hist
806-
);
807-
});
808-
});
809-
810-
test.skip('Can replay smorgasbord from 1.13.2', async (t) => {
811-
const hist = await loadHistory('otel_smorgasbord_1_13_2.json');
811+
test('Can replay smorgasbord history from 1.15.0', async (t) => {
812+
const hist = await loadHistory('otel_v2_smorgasbord.json');
812813
await t.notThrowsAsync(async () => {
813814
await Worker.runReplayHistory(
814815
{

packages/interceptors-opentelemetry-v2/src/__tests__/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ export * from './smorgasbord';
44
export * from './success-string';
55
export * from './throw-maybe-benign';
66
export * from './update-start-otel';
7+
export * from './prng-isolation';
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
/**
2+
* Workflow that captures uuid4() values to verify OTel span creation
3+
* does not affect the workflow's Math.random() PRNG sequence.
4+
*/
5+
import { uuid4 } from '@temporalio/workflow';
6+
7+
export async function prngIsolation(): Promise<string[]> {
8+
const ids: string[] = [];
9+
for (let i = 0; i < 10; i++) {
10+
ids.push(uuid4());
11+
}
12+
return ids;
13+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import type { IdGenerator } from '@opentelemetry/sdk-trace-base';
2+
import { alea, type RNG } from './workflow-imports';
3+
4+
// Seed the OTel ID generator's PRNG from the workflow's PRNG, then advance
5+
// the workflow PRNG once so the two sequences diverge immediately.
6+
let otelRandom: RNG | undefined;
7+
8+
function getOtelRandom(): RNG {
9+
if (otelRandom === undefined) {
10+
// Use Math.random() (which is the workflow's deterministic PRNG) to
11+
// produce a seed for a SEPARATE alea instance. After this, the OTel
12+
// PRNG and the workflow PRNG are independent sequences.
13+
const seed = [Math.random(), Math.random(), Math.random(), Math.random()];
14+
otelRandom = alea(seed.map((v) => (v * 0x100000000) >>> 0));
15+
}
16+
return otelRandom;
17+
}
18+
19+
function randomHex(length: number): string {
20+
const rng = getOtelRandom();
21+
const chars: string[] = [];
22+
for (let i = 0; i < length; i++) {
23+
const nibble = (rng() * 16) >>> 0;
24+
chars.push(nibble.toString(16));
25+
}
26+
return chars.join('');
27+
}
28+
29+
/**
30+
* Reset the OTel PRNG so the next workflow gets a freshly seeded generator.
31+
* Must be called during workflow dispose since modules are shared across
32+
* workflow instances in the reusable VM.
33+
*/
34+
export function resetIdGenerator(): void {
35+
otelRandom = undefined;
36+
}
37+
38+
/**
39+
* Generates span and trace IDs using a PRNG that is separate from the
40+
* workflow's Math.random(), preventing OTel ID generation from affecting
41+
* the deterministic sequence used by uuid4() for child workflow IDs.
42+
*/
43+
export class DeterministicIdGenerator implements IdGenerator {
44+
generateTraceId(): string {
45+
const id = randomHex(32);
46+
// Ensure non-zero per W3C Trace Context spec
47+
return id === '00000000000000000000000000000000' ? '00000000000000000000000000000001' : id;
48+
}
49+
50+
generateSpanId(): string {
51+
const id = randomHex(16);
52+
return id === '0000000000000000' ? '0000000000000001' : id;
53+
}
54+
}

packages/interceptors-opentelemetry-v2/src/workflow/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import {
3838
import { ContextManager } from './context-manager';
3939
import { SpanName, SPAN_DELIMITER } from './definitions';
4040
import { SpanExporter } from './span-exporter';
41+
import { DeterministicIdGenerator, resetIdGenerator } from './id-generator';
4142
import { workflowInfo, ContinueAsNew, getActivator, SdkFlags } from './workflow-imports';
4243

4344
export * from './definitions';
@@ -51,6 +52,7 @@ function getTracer(): otel.Tracer {
5152
}
5253
if (tracer === undefined) {
5354
const provider = new tracing.BasicTracerProvider({
55+
idGenerator: new DeterministicIdGenerator(),
5456
spanProcessors: [new tracing.SimpleSpanProcessor(new SpanExporter())],
5557
});
5658
otel.propagation.setGlobalPropagator(new W3CTraceContextPropagator());
@@ -326,6 +328,7 @@ export class OpenTelemetryInternalsInterceptor implements WorkflowInternalsInter
326328
if (contextManager !== undefined) {
327329
contextManager.disable();
328330
}
331+
resetIdGenerator();
329332
next(input);
330333
}
331334
}

packages/interceptors-opentelemetry-v2/src/workflow/workflow-imports-impl.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@
77
export { inWorkflowContext, proxySinks, workflowInfo, AsyncLocalStorage, ContinueAsNew } from '@temporalio/workflow';
88
export { SdkFlags } from '@temporalio/workflow/lib/flags';
99
export { getActivator } from '@temporalio/workflow/lib/global-attributes';
10+
export { alea, type RNG } from '@temporalio/workflow/lib/alea';

0 commit comments

Comments
 (0)