Skip to content

Commit 81bcf63

Browse files
committed
Callback
1 parent 198d143 commit 81bcf63

File tree

5 files changed

+359
-39
lines changed

5 files changed

+359
-39
lines changed

apps/execution-worker/src/context.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,23 @@ import type { ExecutionAgent } from "./durable-objects/execution-agent";
33

44
// Worker context and environment types
55

6+
// Type for InteractionService RPC interface
7+
export type InteractionServiceRpc = {
8+
handleAgentCompletion(input: {
9+
agentId: string;
10+
conversationId: string;
11+
success: boolean;
12+
result?: string;
13+
error?: string;
14+
}): Promise<void>;
15+
};
16+
617
// Re-export the Env from the global Cloudflare namespace with proper DO typing
718
export type WorkerEnv = SharedHonoEnv &
819
Cloudflare.Env & {
920
EXECUTION_AGENT: DurableObjectNamespace<ExecutionAgent>;
1021
HYPERDRIVE: Hyperdrive;
11-
INTERACTION_WORKER: Fetcher;
22+
INTERACTION_WORKER: Fetcher & InteractionServiceRpc;
1223
};
1324

1425
export type Variables = SharedHonoVariables;

apps/execution-worker/src/durable-objects/execution-agent.ts

Lines changed: 119 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,9 @@ export class ExecutionAgent extends Agent<WorkerEnv, ExecutionState> {
1515
};
1616

1717
/**
18-
* Execute a task using an agentic loop with tools
18+
* Execute task in background and ping interaction worker when done
1919
*/
20-
@callable()
21-
async executeTask(
22-
input: TaskInput,
23-
): Promise<{ success: boolean; result?: unknown; error?: string }> {
24-
// Check if already executing to prevent concurrent runs
25-
if (this.state.isExecuting) {
26-
logger
27-
.withTags({
28-
agentId: input.agentId,
29-
conversationId: input.conversationId,
30-
})
31-
.warn("ExecutionAgent: Attempted concurrent execution", {
32-
taskDescription: input.taskDescription.substring(0, 100),
33-
});
34-
35-
return {
36-
success: false,
37-
error: "Agent is already executing a task",
38-
};
39-
}
40-
20+
private async executeTaskInBackground(input: TaskInput): Promise<void> {
4121
this.setState({ isExecuting: true });
4222

4323
logger
@@ -204,7 +184,40 @@ ${input.taskDescription}`,
204184
});
205185

206186
this.setState({ isExecuting: false });
207-
return { success: true, result: finalResult };
187+
188+
// Ping interaction worker with completion via RPC
189+
try {
190+
logger
191+
.withTags({
192+
agentId: input.agentId,
193+
conversationId: input.conversationId,
194+
})
195+
.info("ExecutionAgent: Pinging interaction worker with completion");
196+
197+
await this.env.INTERACTION_WORKER.handleAgentCompletion({
198+
agentId: input.agentId,
199+
conversationId: input.conversationId,
200+
success: true,
201+
result: result.text,
202+
});
203+
204+
logger
205+
.withTags({
206+
agentId: input.agentId,
207+
conversationId: input.conversationId,
208+
})
209+
.info("ExecutionAgent: Successfully pinged interaction worker");
210+
} catch (rpcError) {
211+
logger
212+
.withTags({
213+
agentId: input.agentId,
214+
conversationId: input.conversationId,
215+
})
216+
.error("ExecutionAgent: Failed to ping interaction worker", {
217+
error:
218+
rpcError instanceof Error ? rpcError.message : String(rpcError),
219+
});
220+
}
208221
} catch (error) {
209222
const errorMessage =
210223
error instanceof Error ? error.message : String(error);
@@ -251,7 +264,89 @@ ${input.taskDescription}`,
251264
}
252265

253266
this.setState({ isExecuting: false });
254-
return { success: false, error: errorMessage };
267+
268+
// Ping interaction worker with error via RPC
269+
try {
270+
logger
271+
.withTags({
272+
agentId: input.agentId,
273+
conversationId: input.conversationId,
274+
})
275+
.info("ExecutionAgent: Pinging interaction worker with error");
276+
277+
await this.env.INTERACTION_WORKER.handleAgentCompletion({
278+
agentId: input.agentId,
279+
conversationId: input.conversationId,
280+
success: false,
281+
error: errorMessage,
282+
});
283+
284+
logger
285+
.withTags({
286+
agentId: input.agentId,
287+
conversationId: input.conversationId,
288+
})
289+
.info(
290+
"ExecutionAgent: Successfully pinged interaction worker with error",
291+
);
292+
} catch (rpcError) {
293+
logger
294+
.withTags({
295+
agentId: input.agentId,
296+
conversationId: input.conversationId,
297+
})
298+
.error(
299+
"ExecutionAgent: Failed to ping interaction worker with error",
300+
{
301+
error:
302+
rpcError instanceof Error ? rpcError.message : String(rpcError),
303+
},
304+
);
305+
}
255306
}
256307
}
308+
309+
/**
310+
* Execute a task using an agentic loop with tools
311+
* Returns immediately and runs task in background
312+
*/
313+
@callable()
314+
async executeTask(
315+
input: TaskInput,
316+
): Promise<{ success: boolean; message: string }> {
317+
// Check if already executing to prevent concurrent runs
318+
if (this.state.isExecuting) {
319+
logger
320+
.withTags({
321+
agentId: input.agentId,
322+
conversationId: input.conversationId,
323+
})
324+
.warn("ExecutionAgent: Attempted concurrent execution", {
325+
taskDescription: input.taskDescription.substring(0, 100),
326+
});
327+
328+
return {
329+
success: false,
330+
message: "Agent is already executing a task",
331+
};
332+
}
333+
334+
logger
335+
.withTags({
336+
agentId: input.agentId,
337+
conversationId: input.conversationId,
338+
})
339+
.info("ExecutionAgent: Starting task execution in background", {
340+
taskDescription: input.taskDescription.substring(0, 200),
341+
taskLength: input.taskDescription.length,
342+
});
343+
344+
// Start execution in background
345+
this.ctx.waitUntil(this.executeTaskInBackground(input));
346+
347+
return {
348+
success: true,
349+
message: "Task execution started in background",
350+
};
351+
}
257352
}

apps/interaction-worker/src/index.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { WorkerEntrypoint } from "cloudflare:workers";
12
import {
23
useWorkersLogger,
34
withDefaultCors,
@@ -6,14 +7,26 @@ import {
67
} from "@poppy/hono-helpers";
78
import { loopMessageWebhookPayloadSchema } from "@poppy/schemas";
89
import { Hono } from "hono";
9-
import type { App } from "./context";
10+
import type { App, WorkerEnv } from "./context";
1011
import { createDatabaseClient } from "./db/client";
1112
import { logger } from "./helpers/logger";
1213
import { handleMessageInbound } from "./services/loop/loop-message-inbound-handler";
14+
import {
15+
type AgentCompletionInput,
16+
processAgentCompletion,
17+
} from "./services/process-message/process-agent-completion";
1318

1419
// Export Durable Object
1520
export { MessageDebouncer } from "./durable-objects/message-debouncer";
1621

22+
// WorkerEntrypoint for RPC calls from other workers
23+
export class InteractionService extends WorkerEntrypoint<WorkerEnv> {
24+
async handleAgentCompletion(input: AgentCompletionInput): Promise<void> {
25+
const db = createDatabaseClient(this.env);
26+
await processAgentCompletion(input, db, this.env);
27+
}
28+
}
29+
1730
const app = new Hono<App>();
1831

1932
// Middleware

0 commit comments

Comments
 (0)