Skip to content

Commit b9a5db5

Browse files
Implement per-page workflow system for sync-threads to overcome API limits (#1941)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Adam <[email protected]> Co-authored-by: Aj Wazzan <[email protected]>
1 parent 17639d4 commit b9a5db5

File tree

7 files changed

+343
-23
lines changed

7 files changed

+343
-23
lines changed

apps/server/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export type ZeroEnv = {
1212
WORKFLOW_RUNNER: DurableObjectNamespace<WorkflowRunner & QueryableHandler>;
1313
THREAD_SYNC_WORKER: DurableObjectNamespace<ThreadSyncWorker>;
1414
SYNC_THREADS_WORKFLOW: Workflow;
15+
SYNC_THREADS_COORDINATOR_WORKFLOW: Workflow;
1516
HYPERDRIVE: { connectionString: string };
1617
pending_emails_status: KVNamespace;
1718
pending_emails_payload: KVNamespace;

apps/server/src/lib/driver/microsoft.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,11 +1039,11 @@ export class OutlookMailManager implements MailManager {
10391039
},
10401040
})) || [];
10411041

1042-
let references: string | undefined;
1043-
let inReplyTo: string | undefined;
1044-
let listUnsubscribe: string | undefined;
1045-
let listUnsubscribePost: string | undefined;
1046-
let replyTo: string | undefined;
1042+
const references: string | undefined = undefined;
1043+
const inReplyTo: string | undefined = undefined;
1044+
const listUnsubscribe: string | undefined = undefined;
1045+
const listUnsubscribePost: string | undefined = undefined;
1046+
const replyTo: string | undefined = undefined;
10471047

10481048
// TODO: use headers if available
10491049
// if (headers) {

apps/server/src/main.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { WorkerEntrypoint, DurableObject, RpcTarget } from 'cloudflare:workers';
2424
import { getZeroAgent, getZeroDB, verifyToken } from './lib/server-utils';
2525
import { ThreadSyncWorker } from './routes/agent/sync-worker';
2626
import { SyncThreadsWorkflow } from './workflows/sync-threads-workflow';
27+
import { SyncThreadsCoordinatorWorkflow } from './workflows/sync-threads-coordinator-workflow';
2728
import { EProviders, type IEmailSendBatch } from './types';
2829
import { oAuthDiscoveryMetadata } from 'better-auth/plugins';
2930
import { eq, and, desc, asc, inArray } from 'drizzle-orm';
@@ -1059,4 +1060,4 @@ export default class Entry extends WorkerEntrypoint<ZeroEnv> {
10591060
}
10601061
}
10611062

1062-
export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP, WorkflowRunner, ThreadSyncWorker, SyncThreadsWorkflow };
1063+
export { ZeroAgent, ZeroMCP, ZeroDB, ZeroDriver, ThinkingMCP, WorkflowRunner, ThreadSyncWorker, SyncThreadsWorkflow, SyncThreadsCoordinatorWorkflow };

apps/server/src/routes/agent/index.ts

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,23 @@ export class ZeroDriver extends DurableObject<ZeroEnv> {
327327
}
328328

329329
async isSyncing(): Promise<boolean> {
330-
try {
331-
const workflowInstance = await this.env.SYNC_THREADS_WORKFLOW.get(`${this.name}-inbox`);
332-
const status = (await workflowInstance.status()).status;
333-
return ['running', 'queued', 'waiting'].includes(status);
334-
} catch {
335-
return false;
336-
}
330+
return false;
331+
// try {
332+
// const coordinatorInstance = await this.env.SYNC_THREADS_COORDINATOR_WORKFLOW.get(`${this.name}-inbox-coordinator`);
333+
// const coordinatorStatus = (await coordinatorInstance.status()).status;
334+
// if (['running', 'queued', 'waiting'].includes(coordinatorStatus)) {
335+
// return true;
336+
// }
337+
// } catch {
338+
// }
339+
340+
// try {
341+
// const workflowInstance = await this.env.SYNC_THREADS_WORKFLOW.get(`${this.name}-inbox`);
342+
// const status = (await workflowInstance.status()).status;
343+
// return ['running', 'queued', 'waiting'].includes(status);
344+
// } catch {
345+
// return false;
346+
// }
337347
}
338348

339349
async getAllSubjects() {
@@ -1505,25 +1515,35 @@ export class ZeroDriver extends DurableObject<ZeroEnv> {
15051515

15061516
private async triggerSyncWorkflow(folder: string): Promise<void> {
15071517
try {
1508-
console.log(`[ZeroDriver] Triggering sync workflow for ${this.name}/${folder}`);
1518+
console.log(`[ZeroDriver] Triggering sync coordinator workflow for ${this.name}/${folder}`);
15091519

1510-
const instance = await this.env.SYNC_THREADS_WORKFLOW.create({
1511-
id: `${this.name}-${folder}`,
1520+
const instance = await this.env.SYNC_THREADS_COORDINATOR_WORKFLOW.create({
15121521
params: {
15131522
connectionId: this.name,
15141523
folder: folder,
15151524
},
15161525
});
15171526

15181527
console.log(
1519-
`[ZeroDriver] Sync workflow triggered for ${this.name}/${folder}, instance: ${instance.id}`,
1528+
`[ZeroDriver] Sync coordinator workflow triggered for ${this.name}/${folder}, instance: ${instance.id}`,
15201529
);
15211530
} catch (error) {
15221531
console.error(
1523-
`[ZeroDriver] Failed to trigger sync workflow for ${this.name}/${folder}:`,
1532+
`[ZeroDriver] Failed to trigger sync coordinator workflow for ${this.name}/${folder}:`,
15241533
error,
15251534
);
1526-
// await this.syncThreads(folder);
1535+
// try {
1536+
// const fallbackInstance = await this.env.SYNC_THREADS_WORKFLOW.create({
1537+
// id: `${this.name}-${folder}`,
1538+
// params: {
1539+
// connectionId: this.name,
1540+
// folder: folder,
1541+
// },
1542+
// });
1543+
// console.log(`[ZeroDriver] Fallback to original workflow: ${fallbackInstance.id}`);
1544+
// } catch (fallbackError) {
1545+
// console.error(`[ZeroDriver] Fallback workflow also failed:`, fallbackError);
1546+
// }
15271547
}
15281548
}
15291549
}
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
import { WorkflowEntrypoint, WorkflowStep } from 'cloudflare:workers';
2+
import { connectionToDriver } from '../lib/server-utils';
3+
import type { WorkflowEvent } from 'cloudflare:workers';
4+
import { connection } from '../db/schema';
5+
import type { ZeroEnv } from '../env';
6+
import { eq } from 'drizzle-orm';
7+
import { createDb } from '../db';
8+
9+
export interface SyncThreadsCoordinatorParams {
10+
connectionId: string;
11+
folder: string;
12+
}
13+
14+
export interface SyncThreadsCoordinatorResult {
15+
totalSynced: number;
16+
message: string;
17+
folder: string;
18+
totalPagesProcessed: number;
19+
totalThreads: number;
20+
totalSuccessfulSyncs: number;
21+
totalFailedSyncs: number;
22+
pageWorkflowResults: Array<{
23+
pageNumber: number;
24+
workflowId: string;
25+
status: 'completed' | 'failed';
26+
synced: number;
27+
error?: string;
28+
}>;
29+
}
30+
31+
export class SyncThreadsCoordinatorWorkflow extends WorkflowEntrypoint<
32+
ZeroEnv,
33+
SyncThreadsCoordinatorParams
34+
> {
35+
async run(
36+
event: WorkflowEvent<SyncThreadsCoordinatorParams>,
37+
step: WorkflowStep,
38+
): Promise<SyncThreadsCoordinatorResult> {
39+
const { connectionId, folder } = event.payload;
40+
41+
console.info(
42+
`[SyncThreadsCoordinatorWorkflow] Starting coordination for connection ${connectionId}, folder ${folder}`,
43+
);
44+
45+
const result: SyncThreadsCoordinatorResult = {
46+
totalSynced: 0,
47+
message: 'Coordination completed',
48+
folder,
49+
totalPagesProcessed: 0,
50+
totalThreads: 0,
51+
totalSuccessfulSyncs: 0,
52+
totalFailedSyncs: 0,
53+
pageWorkflowResults: [],
54+
};
55+
56+
const setupResult = await step.do(`setup-connection-${connectionId}-${folder}`, async () => {
57+
const { db, conn } = createDb(this.env.HYPERDRIVE.connectionString);
58+
59+
const foundConnection = await db.query.connection.findFirst({
60+
where: eq(connection.id, connectionId),
61+
});
62+
63+
await conn.end();
64+
65+
if (!foundConnection) {
66+
throw new Error(`Connection ${connectionId} not found`);
67+
}
68+
69+
const maxCount = parseInt(this.env.THREAD_SYNC_MAX_COUNT || '20');
70+
const shouldLoop = true;
71+
72+
return { maxCount, shouldLoop, foundConnection };
73+
});
74+
75+
const { maxCount, shouldLoop, foundConnection } = setupResult as {
76+
maxCount: number;
77+
shouldLoop: boolean;
78+
foundConnection: any;
79+
};
80+
const driver = connectionToDriver(foundConnection);
81+
82+
if (connectionId.includes('aggregate')) {
83+
console.info(
84+
`[SyncThreadsCoordinatorWorkflow] Skipping sync for aggregate instance - folder ${folder}`,
85+
);
86+
result.message = 'Skipped aggregate instance';
87+
return result;
88+
}
89+
90+
if (!driver) {
91+
console.warn(`[SyncThreadsCoordinatorWorkflow] No driver available for folder ${folder}`);
92+
result.message = 'No driver available';
93+
return result;
94+
}
95+
96+
// Process pages sequentially
97+
let currentPageToken: string | null = null;
98+
let pageNumber = 0;
99+
100+
do {
101+
pageNumber++;
102+
103+
// Process this page
104+
const pageResult = await step.do(
105+
`process-page-${pageNumber}-${folder}-${connectionId}`,
106+
async () => {
107+
console.info(
108+
`[SyncThreadsCoordinatorWorkflow] Processing page ${pageNumber} for ${folder}`,
109+
);
110+
111+
// Create workflow for this page
112+
const instance = await this.env.SYNC_THREADS_WORKFLOW.create({
113+
params: {
114+
connectionId,
115+
folder,
116+
pageNumber,
117+
pageToken: currentPageToken,
118+
maxCount,
119+
singlePageMode: true,
120+
},
121+
});
122+
123+
console.info(
124+
`[SyncThreadsCoordinatorWorkflow] Created workflow ${instance.id} for page ${pageNumber}`,
125+
);
126+
127+
// Simple polling to wait for completion
128+
let attempts = 0;
129+
const maxAttempts = 60; // 5 minutes
130+
131+
while (attempts < maxAttempts) {
132+
await new Promise((resolve) => setTimeout(resolve, 5000));
133+
134+
try {
135+
const status = await instance.status();
136+
if (status.status === 'complete') {
137+
return { result: status.output, workflowId: instance.id };
138+
} else if (status.status === 'errored') {
139+
throw new Error(`Workflow ${instance.id} failed`);
140+
}
141+
} catch (error) {
142+
if (attempts === maxAttempts - 1) {
143+
throw error;
144+
}
145+
}
146+
147+
attempts++;
148+
}
149+
150+
throw new Error(`Workflow ${instance.id} timed out`);
151+
},
152+
);
153+
154+
// Update result with this page's data
155+
if (pageResult?.result) {
156+
const workflowResult = pageResult.result as any;
157+
result.pageWorkflowResults.push({
158+
pageNumber,
159+
workflowId: pageResult.workflowId,
160+
status: 'completed',
161+
synced: workflowResult.synced || 0,
162+
});
163+
164+
result.totalSynced += workflowResult.synced || 0;
165+
result.totalPagesProcessed += 1;
166+
result.totalThreads += workflowResult.totalThreads || 0;
167+
result.totalSuccessfulSyncs += workflowResult.successfulSyncs || 0;
168+
result.totalFailedSyncs += workflowResult.failedSyncs || 0;
169+
170+
// Get next page token from workflow result if available
171+
currentPageToken = workflowResult.nextPageToken || null;
172+
} else {
173+
// If no result, we can't continue
174+
break;
175+
}
176+
177+
// If no more pages, stop
178+
if (!currentPageToken) {
179+
console.info(`[SyncThreadsCoordinatorWorkflow] No more pages for ${folder}`);
180+
break;
181+
}
182+
} while (currentPageToken && shouldLoop);
183+
184+
console.info(
185+
`[SyncThreadsCoordinatorWorkflow] Completed ${folder}: ${result.totalSynced} synced across ${result.totalPagesProcessed} pages`,
186+
);
187+
188+
return result;
189+
}
190+
}

0 commit comments

Comments
 (0)