Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ export interface DBOSConfig {
systemDatabaseSchemaName?: string;

enableOTLP?: boolean;
tracingEnabled?: boolean;
logLevel?: string;
addContextMetadata?: boolean;
otlpTracesEndpoints?: string[];
Expand Down Expand Up @@ -249,7 +250,7 @@ export class DBOSExecutor {
}
this.logger = new GlobalLogger(this.telemetryCollector, this.config.telemetry.logs, this.appName);
this.ctxLogger = new DBOSContextualLogger(this.logger, () => getActiveSpan());
this.tracer = new Tracer(this.telemetryCollector, this.appName);
this.tracer = new Tracer(this.telemetryCollector);
this.serializer = config.serializer;

if (systemDatabase) {
Expand Down
1 change: 1 addition & 0 deletions src/dbos.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ export class DBOS {
}

globalParams.enableOTLP = DBOS.#dbosConfig?.enableOTLP ?? defaultEnableOTLP();
globalParams.tracingEnabled = DBOS.#dbosConfig?.tracingEnabled || globalParams.enableOTLP;

if (!isTraceContextWorking()) installTraceContextManager(internalConfig.name);

Expand Down
44 changes: 16 additions & 28 deletions src/telemetry/traces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,23 @@ class StubSpan implements DBOSSpan {
}

export function runWithTrace<R>(span: DBOSSpan, func: () => Promise<R>): Promise<R> {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return func();
}
const { context, trace } = require('@opentelemetry/api');
return context.with(trace.setSpan(context.active(), span as Span), func);
}

export function getActiveSpan() {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return undefined;
}
const { trace } = require('@opentelemetry/api');
return trace.getActiveSpan() as DBOSSpan | undefined;
}

export function isTraceContextWorking(): boolean {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return false;
}
const { context, trace } = require('@opentelemetry/api');
Expand All @@ -110,17 +110,18 @@ export function isTraceContextWorking(): boolean {
}

export function installTraceContextManager(appName: string = 'dbos'): void {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return;
}
const { AsyncLocalStorageContextManager } = require('@opentelemetry/context-async-hooks');
const { context, trace } = require('@opentelemetry/api');
const { BasicTracerProvider } = require('@opentelemetry/sdk-trace-base');

// setGlobalTracerProvider and setGlobalContextManager are "first one wins."
// If an external provider is already registered, these calls are safely ignored.
const contextManager = new AsyncLocalStorageContextManager();
contextManager.enable();
context.setGlobalContextManager(contextManager);

const provider: BasicTracerProviderType = new BasicTracerProvider({
resource: {
attributes: {
Expand All @@ -134,30 +135,13 @@ export function installTraceContextManager(appName: string = 'dbos'): void {
export class Tracer {
readonly applicationID: string;
readonly executorID: string;
constructor(
private readonly telemetryCollector: TelemetryCollector,
appName: string = 'dbos',
) {
constructor(private readonly telemetryCollector: TelemetryCollector) {
this.applicationID = globalParams.appID;
this.executorID = globalParams.executorID; // for consistency with src/context.ts
if (!globalParams.enableOTLP) {
return;
}
const { trace } = require('@opentelemetry/api');
const { BasicTracerProvider } = require('@opentelemetry/sdk-trace-base');

const tracer: BasicTracerProviderType = new BasicTracerProvider({
resource: {
attributes: {
'service.name': appName,
},
},
});
trace.setGlobalTracerProvider(tracer);
this.executorID = globalParams.executorID;
}

startSpanWithContext(spanContext: unknown, name: string, attributes?: Attributes): DBOSSpan {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return new StubSpan();
}
const opentelemetry = require('@opentelemetry/api');
Expand All @@ -167,7 +151,7 @@ export class Tracer {
}

startSpan(name: string, attributes?: Attributes, inputSpan?: DBOSSpan): DBOSSpan {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return new StubSpan();
}
const parentSpan = inputSpan as Span;
Expand All @@ -184,7 +168,7 @@ export class Tracer {
}

endSpan(inputSpan: DBOSSpan) {
if (!globalParams.enableOTLP) {
if (!globalParams.tracingEnabled) {
return;
}
const { hrTime } = require('@opentelemetry/core');
Expand All @@ -197,6 +181,10 @@ export class Tracer {
span.setAttribute('executorID', this.executorID);
}
span.end(hrTime(performance.now()));
this.telemetryCollector.push(span);
// Only push to DBOS's own collector when DBOS manages export.
// When an external TracerProvider is used, span.end() triggers its processors.
if (globalParams.enableOTLP) {
this.telemetryCollector.push(span);
}
}
}
1 change: 1 addition & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export const globalParams = {
executorID: process.env.DBOS__VMID || 'local', // The one true source of executorID
appID: process.env.DBOS__APPID || '', // The one true source of appID
enableOTLP: defaultEnableOTLP(), // Whether OTLP is enabled
tracingEnabled: false, // Whether span creation is active (enableOTLP or external TracerProvider)
dbosVersion: loadDbosVersion(), // The version of the DBOS library
};
export const sleepms = (ms: number) => new Promise((r) => setTimeout(r, ms));
Expand Down
112 changes: 91 additions & 21 deletions tests/telemetry.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
import { InMemorySpanExporter, ReadableSpan, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from './nodetraceprovider';
import { DBOS } from '../src';

const memoryExporter = new InMemorySpanExporter();
const provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
});
provider.register();

import Koa from 'koa';
import Router from '@koa/router';
import { context, trace, SpanStatusCode } from '@opentelemetry/api';
Expand All @@ -24,7 +17,7 @@ async function doSomethingTraced_internal() {
if (span) {
span.setAttribute('my-lib.didSomething', true);
}
if (globalParams.enableOTLP) {
if (globalParams.tracingEnabled) {
expect(DBOS.span).toBe(trace.getSpan(context.active()));
}
await DBOS.runStep(tracedStep, { name: 'tracedStep' });
Expand All @@ -33,11 +26,10 @@ async function doSomethingTraced_internal() {

const doSomethingTraced = DBOS.registerWorkflow(doSomethingTraced_internal);

export function createApp() {
function createApp() {
const app = new Koa();
const router = new Router();

// Tracing middleware (emulates instrumentation or full middleware, which is not working...)
app.use(async (ctx, next) => {
const current = trace.getSpan(context.active());
if (current) {
Expand All @@ -64,7 +56,6 @@ export function createApp() {
}
});

// Route
router.get('/test', async (ctx) => {
await doSomethingTraced();
ctx.body = 'OK';
Expand All @@ -78,29 +69,32 @@ export function createApp() {

function getParentSpanID(span: ReadableSpan) {
const ctx = span.parentSpanContext;
if (ctx) {
return ctx.spanId;
} else {
return undefined;
}
return ctx ? ctx.spanId : undefined;
}

describe('trace spans propagate ', () => {
describe('trace spans propagate', () => {
const memoryExporter = new InMemorySpanExporter();

beforeAll(async () => {
memoryExporter.reset();
const provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
});
provider.register();
DBOS.setConfig({ name: 'trace-span-propagate', enableOTLP: true });
await DBOS.launch();
});

afterAll(async () => {
await DBOS.shutdown();
trace.disable();
context.disable();
});

test('from-outside-into-DBOS-calls', async () => {
expect(isTraceContextWorking()).toBe(true);

const app = createApp();
const server = app.listen(0); // Koa uses native HTTP
const server = app.listen(0);

const { port } = server.address() as AddressInfo;

Expand Down Expand Up @@ -135,21 +129,28 @@ describe('trace spans propagate ', () => {
});

describe('disable-otlp', () => {
const memoryExporter = new InMemorySpanExporter();

beforeAll(async () => {
memoryExporter.reset();
const provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
});
provider.register();
DBOS.setConfig({ name: 'trace-span-propagate' });
await DBOS.launch();
});

afterAll(async () => {
await DBOS.shutdown();
trace.disable();
context.disable();
});

test('disable-otlp', async () => {
expect(isTraceContextWorking()).toBe(false);

const app = createApp();
const server = app.listen(0); // Koa uses native HTTP
const server = app.listen(0);

const { port } = server.address() as AddressInfo;

Expand All @@ -163,3 +164,72 @@ describe('disable-otlp', () => {
expect(spans.length).toBe(1);
});
});

describe('external-provider-span-propagation', () => {
const memoryExporter = new InMemorySpanExporter();

beforeAll(async () => {
const provider = new NodeTracerProvider({
spanProcessors: [new SimpleSpanProcessor(memoryExporter)],
});
provider.register();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the provider is first-registered win, then how does the test work?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wouldn't it work?

DBOS.setConfig({ name: 'external-provider-test', tracingEnabled: true });
await DBOS.launch();
});

afterAll(async () => {
await DBOS.shutdown();
trace.disable();
context.disable();
});

test('spans-flow-through-external-provider', async () => {
expect(isTraceContextWorking()).toBe(true);

const app = createApp();
const server = app.listen(0);

const { port } = server.address() as AddressInfo;

const res = await fetch(`http://localhost:${port}/test`);

expect(res.status).toBe(200);
server.close();

const spans = memoryExporter.getFinishedSpans();
const realSpans = spans.filter((s) => s.name !== 'probe');
expect(realSpans.length).toBe(3);

const stepSpan = realSpans[0];
const workflowSpan = realSpans[1];
const httpSpan = realSpans[2];

expect(getParentSpanID(stepSpan)).toBe(workflowSpan?.spanContext().spanId);
expect(stepSpan?.spanContext().traceId).toBe(workflowSpan?.spanContext().traceId);
expect(getParentSpanID(workflowSpan)).toBe(httpSpan?.spanContext().spanId);
expect(workflowSpan?.spanContext().traceId).toBe(httpSpan?.spanContext().traceId);
});
});

describe('dbos-standalone-tracing', () => {
beforeAll(async () => {
// No external provider — DBOS sets up its own BasicTracerProvider and context manager
DBOS.setConfig({ name: 'standalone-tracing-test', enableOTLP: true });
await DBOS.launch();
});

afterAll(async () => {
await DBOS.shutdown();
trace.disable();
context.disable();
});

test('context-propagation-works', () => {
expect(isTraceContextWorking()).toBe(true);
});

test('workflows-produce-real-spans', async () => {
const result = await doSomethingTraced();
expect(result).toBe('Done');
});
});