Skip to content

Commit 803a6ca

Browse files
authored
Implement workflow chain execution to share results between workflows (#1872)
# READ CAREFULLY THEN REMOVE Remove bullet points that are not relevant. PLEASE REFRAIN FROM USING AI TO WRITE YOUR CODE AND PR DESCRIPTION. IF YOU DO USE AI TO WRITE YOUR CODE PLEASE PROVIDE A DESCRIPTION AND REVIEW IT CAREFULLY. MAKE SURE YOU UNDERSTAND THE CODE YOU ARE SUBMITTING USING AI. - Pull requests that do not follow these guidelines will be closed without review or comment. - If you use AI to write your PR description your pr will be close without review or comment. - If you are unsure about anything, feel free to ask for clarification. ## Description Please provide a clear description of your changes. --- ## Type of Change Please delete options that are not relevant. - [ ] 🐛 Bug fix (non-breaking change which fixes an issue) - [ ] ✨ New feature (non-breaking change which adds functionality) - [ ] 💥 Breaking change (fix or feature with breaking changes) - [ ] 📝 Documentation update - [ ] 🎨 UI/UX improvement - [ ] 🔒 Security enhancement - [ ] ⚡ Performance improvement ## Areas Affected Please check all that apply: - [ ] Email Integration (Gmail, IMAP, etc.) - [ ] User Interface/Experience - [ ] Authentication/Authorization - [ ] Data Storage/Management - [ ] API Endpoints - [ ] Documentation - [ ] Testing Infrastructure - [ ] Development Workflow - [ ] Deployment/Infrastructure ## Testing Done Describe the tests you've done: - [ ] Unit tests added/updated - [ ] Integration tests added/updated - [ ] Manual testing performed - [ ] Cross-browser testing (if UI changes) - [ ] Mobile responsiveness verified (if UI changes) ## Security Considerations For changes involving data or authentication: - [ ] No sensitive data is exposed - [ ] Authentication checks are in place - [ ] Input validation is implemented - [ ] Rate limiting is considered (if applicable) ## Checklist - [ ] I have read the [CONTRIBUTING](https://github.com/Mail-0/Zero/blob/staging/.github/CONTRIBUTING.md) document - [ ] My code follows the project's style guidelines - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in complex areas - [ ] I have updated the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix/feature works - [ ] All tests pass locally - [ ] Any dependent changes are merged and published ## Additional Notes Add any other context about the pull request here. ## Screenshots/Recordings Add screenshots or recordings here if applicable. --- _By submitting this pull request, I confirm that my contribution is made under the terms of the project's license._ <!-- This is an auto-generated description by cubic. --> --- ## Summary by cubic Added workflow chain execution so results can be shared between workflows, enabling workflows to pass data to each other during execution. - **New Features** - Introduced `executeWorkflowChain` to run multiple workflows in sequence and share results. - Updated workflow runner to use the new chain execution method. <!-- End of auto-generated description by cubic. --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **Bug Fixes** * Improved accuracy in counting successfully synced threads during workflow execution. * Workflows now proceed regardless of certain environment variable settings. * **New Features** * Added support for executing multiple workflows in sequence with consolidated result and error reporting. * **Refactor** * Simplified and streamlined workflow execution logic for better maintainability. * **Style** * Enhanced code formatting for improved readability. * **Documentation** * Updated method signatures to clarify optional parameters. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 9707613 commit 803a6ca

File tree

4 files changed

+66
-41
lines changed

4 files changed

+66
-41
lines changed

apps/server/src/main.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,7 @@ export class DbRpcDO extends RpcTarget {
187187
return await this.mainDo.deleteEmailTemplate(this.userId, templateId);
188188
}
189189

190-
async updateEmailTemplate(
191-
templateId: string,
192-
data: Partial<typeof emailTemplate.$inferInsert>,
193-
) {
190+
async updateEmailTemplate(templateId: string, data: Partial<typeof emailTemplate.$inferInsert>) {
194191
return await this.mainDo.updateEmailTemplate(this.userId, templateId, data);
195192
}
196193
}
@@ -522,7 +519,10 @@ class ZeroDB extends DurableObject<Env> {
522519
});
523520
}
524521

525-
async createEmailTemplate(userId: string, payload: Omit<typeof emailTemplate.$inferInsert, 'userId'>) {
522+
async createEmailTemplate(
523+
userId: string,
524+
payload: Omit<typeof emailTemplate.$inferInsert, 'userId'>,
525+
) {
526526
return await this.db
527527
.insert(emailTemplate)
528528
.values({

apps/server/src/pipelines.ts

Lines changed: 6 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ export class WorkflowRunner extends DurableObject<Env> {
393393
{ concurrency: 6 }, // Limit concurrency to avoid rate limits
394394
);
395395

396-
const syncedCount = syncResults.length;
396+
const syncedCount = syncResults.filter((result) => result.result.success).length;
397397
const failedCount = threadWorkflowParams.length - syncedCount;
398398

399399
if (failedCount > 0) {
@@ -627,37 +627,15 @@ export class WorkflowRunner extends DurableObject<Env> {
627627
// Execute configured workflows using the workflow engine
628628
const workflowResults = yield* Effect.tryPromise({
629629
try: async () => {
630-
const allResults = new Map<string, any>();
631-
const allErrors = new Map<string, Error>();
632-
633630
// Execute all workflows registered in the engine
634631
const workflowNames = workflowEngine.getWorkflowNames();
635632

636-
for (const workflowName of workflowNames) {
637-
console.log(`[THREAD_WORKFLOW] Executing workflow: ${workflowName}`);
638-
639-
try {
640-
const { results, errors } = await workflowEngine.executeWorkflow(
641-
workflowName,
642-
workflowContext,
643-
);
644-
645-
// Merge results and errors using efficient Map operations
646-
results.forEach((value, key) => allResults.set(key, value));
647-
errors.forEach((value, key) => allErrors.set(key, value));
648-
649-
console.log(`[THREAD_WORKFLOW] Completed workflow: ${workflowName}`);
650-
} catch (error) {
651-
console.error(
652-
`[THREAD_WORKFLOW] Failed to execute workflow ${workflowName}:`,
653-
error,
654-
);
655-
const errorObj = error instanceof Error ? error : new Error(String(error));
656-
allErrors.set(workflowName, errorObj);
657-
}
658-
}
633+
const { results, errors } = await workflowEngine.executeWorkflowChain(
634+
workflowNames,
635+
workflowContext,
636+
);
659637

660-
return { results: allResults, errors: allErrors };
638+
return { results, errors };
661639
},
662640
catch: (error) => ({ _tag: 'WorkflowCreationFailed' as const, error }),
663641
});

apps/server/src/routes/agent/index.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1779,7 +1779,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
17791779
private getDataStreamResponse(
17801780
onFinish: StreamTextOnFinishCallback<{}>,
17811781
options?: {
1782-
abortSignal: AbortSignal | undefined;
1782+
abortSignal?: AbortSignal;
17831783
connectionId?: string;
17841784
},
17851785
) {
@@ -1822,7 +1822,10 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
18221822
onError: (error) => {
18231823
console.error('Error in streamText', error);
18241824
},
1825-
system: await getPrompt(getPromptName(connectionId, EPrompts.Chat), AiChatPrompt(currentThreadId)),
1825+
system: await getPrompt(
1826+
getPromptName(connectionId, EPrompts.Chat),
1827+
AiChatPrompt(currentThreadId),
1828+
),
18261829
});
18271830

18281831
result.mergeIntoDataStream(dataStream);
@@ -1912,7 +1915,9 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
19121915
await this.persistMessages(finalMessages, [connection.id]);
19131916
this.removeAbortController(chatMessageId);
19141917
},
1915-
abortSignal ? { abortSignal, connectionId: connection.id } : { connectionId: connection.id },
1918+
abortSignal
1919+
? { abortSignal, connectionId: connection.id }
1920+
: { connectionId: connection.id },
19161921
);
19171922

19181923
if (response) {
@@ -1955,7 +1960,9 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
19551960
}
19561961
case IncomingMessageType.ThreadIdUpdate: {
19571962
this.connectionThreadIds.set(connection.id, data.threadId);
1958-
console.log(`[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`);
1963+
console.log(
1964+
`[ZeroAgent] Updated threadId for connection ${connection.id}: ${data.threadId}`,
1965+
);
19591966
break;
19601967
}
19611968
// case IncomingMessageType.Mail_List: {
@@ -2023,7 +2030,7 @@ export class ZeroAgent extends AIChatAgent<typeof env> {
20232030
async onChatMessage(
20242031
onFinish: StreamTextOnFinishCallback<{}>,
20252032
options?: {
2026-
abortSignal: AbortSignal | undefined;
2033+
abortSignal?: AbortSignal;
20272034
connectionId?: string;
20282035
},
20292036
) {

apps/server/src/thread-workflow-utils/workflow-engine.ts

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ export class WorkflowEngine {
4242
async executeWorkflow(
4343
workflowName: string,
4444
context: WorkflowContext,
45+
existingResults?: Map<string, any>,
4546
): Promise<{ results: Map<string, any>; errors: Map<string, Error> }> {
4647
const workflow = this.workflows.get(workflowName);
4748
if (!workflow) {
4849
throw new Error(`Workflow "${workflowName}" not found`);
4950
}
5051

51-
const results = new Map<string, any>();
52+
const results = new Map<string, any>(existingResults || []);
5253
const errors = new Map<string, Error>();
5354

5455
for (const step of workflow.steps) {
@@ -83,6 +84,45 @@ export class WorkflowEngine {
8384
return { results, errors };
8485
}
8586

87+
async executeWorkflowChain(
88+
workflowNames: string[],
89+
context: WorkflowContext,
90+
): Promise<{ results: Map<string, any>; errors: Map<string, Error> }> {
91+
let sharedResults = new Map<string, any>();
92+
let allErrors = new Map<string, Error>();
93+
94+
for (const workflowName of workflowNames) {
95+
console.log(`[WORKFLOW_ENGINE] Executing workflow in chain: ${workflowName}`);
96+
try {
97+
const { results, errors } = await this.executeWorkflow(
98+
workflowName,
99+
context,
100+
sharedResults,
101+
);
102+
103+
// Merge results
104+
for (const [key, value] of results) {
105+
sharedResults.set(key, value);
106+
}
107+
108+
// Merge errors
109+
for (const [key, error] of errors) {
110+
allErrors.set(key, error);
111+
}
112+
113+
console.log(
114+
`[WORKFLOW_ENGINE] Completed workflow: ${workflowName}, total results: ${sharedResults.size}`,
115+
);
116+
} catch (error) {
117+
const errorObj = error instanceof Error ? error : new Error(String(error));
118+
console.error(`[WORKFLOW_ENGINE] Failed to execute workflow ${workflowName}:`, errorObj);
119+
allErrors.set(workflowName, errorObj);
120+
}
121+
}
122+
123+
return { results: sharedResults, errors: allErrors };
124+
}
125+
86126
clearContext(context: WorkflowContext): void {
87127
if (context.results) {
88128
context.results.clear();
@@ -158,7 +198,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => {
158198
],
159199
};
160200

161-
const _vectorizationWorkflow: WorkflowDefinition = {
201+
const vectorizationWorkflow: WorkflowDefinition = {
162202
name: 'message-vectorization',
163203
description: 'Vectorizes thread messages for search and analysis',
164204
steps: [
@@ -272,7 +312,7 @@ export const createDefaultWorkflows = (): WorkflowEngine => {
272312
};
273313

274314
engine.registerWorkflow(autoDraftWorkflow);
275-
// engine.registerWorkflow(vectorizationWorkflow);
315+
engine.registerWorkflow(vectorizationWorkflow);
276316
engine.registerWorkflow(threadSummaryWorkflow);
277317
engine.registerWorkflow(labelGenerationWorkflow);
278318

0 commit comments

Comments
 (0)