Skip to content

Commit b24a96f

Browse files
feat: integrate OpenTelemetry tracing with @microlabs/otel-cf-workers (#1971)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Adam <[email protected]> Co-authored-by: Aj Wazzan <[email protected]>
1 parent 559e4c6 commit b24a96f

File tree

8 files changed

+490
-72
lines changed

8 files changed

+490
-72
lines changed

apps/server/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
"@googleapis/gmail": "12.0.0",
3636
"@googleapis/people": "3.0.9",
3737
"@hono/trpc-server": "^0.3.4",
38+
"@microlabs/otel-cf-workers": "1.0.0-rc.52",
3839
"@microsoft/microsoft-graph-client": "^3.0.7",
3940
"@microsoft/microsoft-graph-types": "^2.40.0",
4041
"@modelcontextprotocol/sdk": "1.15.1",
42+
"@opentelemetry/api": "1.9.0",
4143
"@react-email/components": "^0.0.41",
4244
"@react-email/render": "1.1.0",
4345
"@sentry/cloudflare": "9.43.0",

apps/server/src/env.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ export type ZeroEnv = {
9494
MEET_AUTH_HEADER: string;
9595
MEET_API_URL: string;
9696
ENABLE_MEET: 'true' | 'false';
97+
OTEL_EXPORTER_OTLP_ENDPOINT?: string;
98+
OTEL_EXPORTER_OTLP_HEADERS?: string;
99+
OTEL_SERVICE_NAME?: string;
97100
};
98101

99102
const env = _env as ZeroEnv;

apps/server/src/lib/tracing.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { trace, type Tracer, type Attributes } from '@opentelemetry/api';
2+
3+
export const initTracing = (): Tracer => {
4+
return trace.getTracer('zero-email-server', '1.0.0');
5+
};
6+
7+
export const createSpan = (tracer: Tracer, name: string, attributes?: Attributes) => {
8+
return tracer.startSpan(name, { attributes });
9+
};

apps/server/src/main.ts

Lines changed: 125 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import {
2222
} from './lib/attachments';
2323
import { SyncThreadsCoordinatorWorkflow } from './workflows/sync-threads-coordinator-workflow';
2424
import { WorkerEntrypoint, DurableObject, RpcTarget } from 'cloudflare:workers';
25+
import { instrument, type ResolveConfigFn } from '@microlabs/otel-cf-workers';
2526
import { getZeroAgent, getZeroDB, verifyToken } from './lib/server-utils';
2627
import { SyncThreadsWorkflow } from './workflows/sync-threads-workflow';
2728
import { ShardRegistry, ZeroAgent, ZeroDriver } from './routes/agent';
@@ -40,6 +41,7 @@ import { ZeroMCP } from './routes/agent/mcp';
4041
import { publicRouter } from './routes/auth';
4142
import { WorkflowRunner } from './pipelines';
4243
import { autumnApi } from './routes/autumn';
44+
import { initTracing } from './lib/tracing';
4345
import { env, type ZeroEnv } from './env';
4446
import type { HonoContext } from './ctx';
4547
import { createDb, type DB } from './db';
@@ -747,49 +749,111 @@ const app = new Hono<HonoContext>()
747749
}
748750
})
749751
.post('/a8n/notify/:providerId', async (c) => {
750-
if (!c.req.header('Authorization')) return c.json({ error: 'Unauthorized' }, { status: 401 });
751-
if (env.DISABLE_WORKFLOWS === 'true') return c.json({ message: 'OK' }, { status: 200 });
752-
const providerId = c.req.param('providerId');
753-
if (providerId === EProviders.google) {
754-
const body = await c.req.json<{ historyId: string }>();
755-
const subHeader = c.req.header('x-goog-pubsub-subscription-name');
756-
if (!subHeader) {
757-
console.log('[GOOGLE] no subscription header', body);
758-
return c.json({}, { status: 200 });
752+
const tracer = initTracing();
753+
const span = tracer.startSpan('a8n_notify', {
754+
attributes: {
755+
'provider.id': c.req.param('providerId'),
756+
'notification.type': 'email_notification',
757+
'http.method': c.req.method,
758+
'http.url': c.req.url,
759+
},
760+
});
761+
762+
try {
763+
if (!c.req.header('Authorization')) {
764+
span.setAttributes({ 'auth.status': 'missing' });
765+
return c.json({ error: 'Unauthorized' }, { status: 401 });
759766
}
760-
const isValid = await verifyToken(c.req.header('Authorization')!.split(' ')[1]);
761-
if (!isValid) {
762-
console.log('[GOOGLE] invalid request', body);
763-
return c.json({}, { status: 200 });
767+
if (env.DISABLE_WORKFLOWS === 'true') {
768+
span.setAttributes({ 'workflows.disabled': true });
769+
return c.json({ message: 'OK' }, { status: 200 });
764770
}
765-
try {
766-
await env.thread_queue.send({
767-
providerId,
768-
historyId: body.historyId,
769-
subscriptionName: subHeader,
770-
});
771-
} catch (error) {
772-
console.error('Error sending to thread queue', error, {
773-
providerId,
774-
historyId: body.historyId,
775-
subscriptionName: subHeader,
771+
const providerId = c.req.param('providerId');
772+
if (providerId === EProviders.google) {
773+
const body = await c.req.json<{ historyId: string }>();
774+
const subHeader = c.req.header('x-goog-pubsub-subscription-name');
775+
776+
span.setAttributes({
777+
'history.id': body.historyId,
778+
'subscription.name': subHeader || 'missing',
776779
});
780+
781+
if (!subHeader) {
782+
console.log('[GOOGLE] no subscription header', body);
783+
span.setAttributes({ 'error.type': 'missing_subscription_header' });
784+
return c.json({}, { status: 200 });
785+
}
786+
const isValid = await verifyToken(c.req.header('Authorization')!.split(' ')[1]);
787+
if (!isValid) {
788+
console.log('[GOOGLE] invalid request', body);
789+
span.setAttributes({ 'auth.status': 'invalid' });
790+
return c.json({}, { status: 200 });
791+
}
792+
793+
span.setAttributes({ 'auth.status': 'valid' });
794+
795+
try {
796+
await env.thread_queue.send({
797+
providerId,
798+
historyId: body.historyId,
799+
subscriptionName: subHeader,
800+
});
801+
span.setAttributes({ 'queue.message_sent': true });
802+
} catch (error) {
803+
console.error('Error sending to thread queue', error, {
804+
providerId,
805+
historyId: body.historyId,
806+
subscriptionName: subHeader,
807+
});
808+
span.recordException(error as Error);
809+
span.setStatus({ code: 2, message: (error as Error).message });
810+
}
811+
return c.json({ message: 'OK' }, { status: 200 });
777812
}
778-
return c.json({ message: 'OK' }, { status: 200 });
813+
} catch (error) {
814+
span.recordException(error as Error);
815+
span.setStatus({ code: 2, message: (error as Error).message });
816+
throw error;
817+
} finally {
818+
span.end();
779819
}
780820
});
821+
const handler = {
822+
async fetch(request: Request, env: ZeroEnv, ctx: ExecutionContext): Promise<Response> {
823+
return app.fetch(request, env, ctx);
824+
},
825+
};
826+
827+
const config: ResolveConfigFn = (env: ZeroEnv) => {
828+
return {
829+
exporter: {
830+
url: env.OTEL_EXPORTER_OTLP_ENDPOINT || 'https://api.axiom.co/v1/traces',
831+
headers: env.OTEL_EXPORTER_OTLP_HEADERS
832+
? Object.fromEntries(
833+
env.OTEL_EXPORTER_OTLP_HEADERS.split(',').map((header: string) => {
834+
const [key, value] = header.split('=');
835+
return [key.trim(), value.trim()];
836+
}),
837+
)
838+
: {},
839+
},
840+
service: {
841+
name: env.OTEL_SERVICE_NAME || 'zero-email-server',
842+
version: '1.0.0',
843+
},
844+
};
845+
};
846+
781847
export default class Entry extends WorkerEntrypoint<ZeroEnv> {
782848
async fetch(request: Request): Promise<Response> {
783-
// const url = new URL(request.url);
784-
// if (url.pathname === '/__studio') {
785-
// return await studio(request, env.ZERO_DRIVER, {
786-
// basicAuth: { username: 'admin', password: 'password' },
787-
// });
788-
// }
789-
return app.fetch(request, this.env, this.ctx);
849+
const instrumentedHandler = instrument(handler, config);
850+
if (instrumentedHandler && instrumentedHandler.fetch) {
851+
return instrumentedHandler.fetch(request as any, this.env, this.ctx);
852+
}
853+
return handler.fetch(request, this.env, this.ctx);
790854
}
791855
async queue(
792-
batch: MessageBatch<any> | { queue: string; messages: Array<{ body: IEmailSendBatch }> },
856+
batch: MessageBatch<unknown> | { queue: string; messages: Array<{ body: IEmailSendBatch }> },
793857
) {
794858
switch (true) {
795859
case batch.queue.startsWith('subscribe-queue'): {
@@ -817,7 +881,7 @@ export default class Entry extends WorkerEntrypoint<ZeroEnv> {
817881
const { messageId, connectionId, mail } = msg.body;
818882

819883
const { pending_emails_status: statusKV, pending_emails_payload: payloadKV } = this
820-
.env as any;
884+
.env as { pending_emails_status: KVNamespace; pending_emails_payload: KVNamespace };
821885

822886
const status = await statusKV.get(messageId);
823887
if (status === 'cancelled') {
@@ -881,22 +945,41 @@ export default class Entry extends WorkerEntrypoint<ZeroEnv> {
881945
return;
882946
}
883947
case batch.queue.startsWith('thread-queue'): {
948+
const tracer = initTracing();
949+
884950
await Promise.all(
885951
batch.messages.map(async (msg: any) => {
886-
const providerId = msg.body.providerId;
887-
const historyId = msg.body.historyId;
888-
const subscriptionName = msg.body.subscriptionName;
952+
const span = tracer.startSpan('thread_queue_processing', {
953+
attributes: {
954+
'provider.id': msg.body.providerId,
955+
'history.id': msg.body.historyId,
956+
'subscription.name': msg.body.subscriptionName,
957+
'queue.name': batch.queue,
958+
},
959+
});
889960

890961
try {
962+
const providerId = msg.body.providerId;
963+
const historyId = msg.body.historyId;
964+
const subscriptionName = msg.body.subscriptionName;
965+
891966
const workflowRunner = env.WORKFLOW_RUNNER.get(env.WORKFLOW_RUNNER.newUniqueId());
892967
const result = await workflowRunner.runMainWorkflow({
893968
providerId,
894969
historyId,
895970
subscriptionName,
896971
});
897972
console.log('[THREAD_QUEUE] result', result);
973+
span.setAttributes({
974+
'workflow.result': typeof result === 'string' ? result : JSON.stringify(result),
975+
'workflow.success': true,
976+
});
898977
} catch (error) {
899978
console.error('Error running workflow', error);
979+
span.recordException(error as Error);
980+
span.setStatus({ code: 2, message: (error as Error).message });
981+
} finally {
982+
span.end();
900983
}
901984
}),
902985
);
@@ -914,7 +997,10 @@ export default class Entry extends WorkerEntrypoint<ZeroEnv> {
914997

915998
private async processScheduledEmails() {
916999
console.log('Checking for scheduled emails ready to be queued...');
917-
const { scheduled_emails: scheduledKV, send_email_queue } = this.env as any;
1000+
const { scheduled_emails: scheduledKV, send_email_queue } = this.env as {
1001+
scheduled_emails: KVNamespace;
1002+
send_email_queue: Queue<IEmailSendBatch>;
1003+
};
9181004

9191005
try {
9201006
const now = Date.now();
@@ -991,7 +1077,7 @@ export default class Entry extends WorkerEntrypoint<ZeroEnv> {
9911077

9921078
for (const key of listResp.keys) {
9931079
try {
994-
const wakeAtIso = (key as any).metadata?.wakeAt as string | undefined;
1080+
const wakeAtIso = key.metadata?.wakeAt as string | undefined;
9951081
if (!wakeAtIso) continue;
9961082
const wakeAt = new Date(wakeAtIso).getTime();
9971083
if (wakeAt > nowTs) continue;

apps/server/src/pipelines.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { Effect, Console, Logger } from 'effect';
2424
import { connection } from './db/schema';
2525
import { EProviders } from './types';
2626
import type { ZeroEnv } from './env';
27+
import { initTracing } from './lib/tracing';
2728
import { EPrompts } from './types';
2829
import { eq } from 'drizzle-orm';
2930
import { createDb } from './db';
@@ -140,6 +141,15 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
140141
* @returns
141142
*/
142143
public runMainWorkflow(params: MainWorkflowParams) {
144+
const tracer = initTracing();
145+
const span = tracer.startSpan('workflow_main', {
146+
attributes: {
147+
'provider.id': params.providerId,
148+
'history.id': params.historyId,
149+
'subscription.name': params.subscriptionName
150+
}
151+
});
152+
143153
return Effect.gen(this, function* () {
144154
yield* Console.log('[MAIN_WORKFLOW] Starting workflow with payload:', params);
145155

@@ -148,9 +158,11 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
148158
const serviceAccount = getServiceAccount();
149159

150160
const connectionId = yield* validateArguments(params, serviceAccount);
161+
span.setAttributes({ 'connection.id': connectionId });
151162

152163
if (!isValidUUID(connectionId)) {
153164
yield* Console.log('[MAIN_WORKFLOW] Invalid connection id format:', connectionId);
165+
span.setAttributes({ 'error.type': 'invalid_connection_id' });
154166
return yield* Effect.fail({
155167
_tag: 'InvalidConnectionId' as const,
156168
connectionId,
@@ -165,6 +177,8 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
165177
}),
166178
}).pipe(Effect.orElse(() => Effect.succeed(null)));
167179

180+
span.setAttributes({ 'history.previous_id': previousHistoryId || 'none' });
181+
168182
if (providerId === EProviders.google) {
169183
yield* Console.log('[MAIN_WORKFLOW] Processing Google provider workflow');
170184
yield* Console.log('[MAIN_WORKFLOW] Previous history ID:', previousHistoryId);
@@ -181,17 +195,26 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
181195
});
182196

183197
yield* Console.log('[MAIN_WORKFLOW] Zero workflow result:', result);
198+
span.setAttributes({ 'workflow.result': typeof result === 'string' ? result : JSON.stringify(result) });
184199
} else {
185200
yield* Console.log('[MAIN_WORKFLOW] Unsupported provider:', providerId);
201+
span.setAttributes({ 'error.type': 'unsupported_provider' });
186202
return yield* Effect.fail({
187203
_tag: 'UnsupportedProvider' as const,
188204
providerId,
189205
});
190206
}
191207

192208
yield* Console.log('[MAIN_WORKFLOW] Workflow completed successfully');
209+
span.setAttributes({ 'workflow.success': true });
193210
return 'Workflow completed successfully';
194211
}).pipe(
212+
Effect.tap(() => Effect.sync(() => span.end())),
213+
Effect.tapError((error) => Effect.sync(() => {
214+
span.recordException(error as unknown as Error);
215+
span.setStatus({ code: 2, message: String(error) });
216+
span.end();
217+
})),
195218
Effect.tapError((error) => Console.log('[MAIN_WORKFLOW] Error in workflow:', error)),
196219
Effect.provide(loggerLayer),
197220
Effect.runPromise,
@@ -600,7 +623,8 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
600623
threadId: threadId.toString(),
601624
thread,
602625
foundConnection,
603-
results: new Map<string, any>(),
626+
results: new Map<string, unknown>(),
627+
env: this.env,
604628
};
605629

606630
// Execute configured workflows using the workflow engine
@@ -758,12 +782,13 @@ export class WorkflowRunner extends DurableObject<ZeroEnv> {
758782
threadId: threadId.toString(),
759783
thread,
760784
foundConnection,
761-
results: new Map<string, any>(),
785+
results: new Map<string, unknown>(),
786+
env: this.env,
762787
};
763788

764789
let workflowResults;
765790
try {
766-
const allResults = new Map<string, any>();
791+
const allResults = new Map<string, unknown>();
767792
const allErrors = new Map<string, Error>();
768793

769794
const workflowNames = workflowEngine.getWorkflowNames();

0 commit comments

Comments
 (0)