Skip to content

Commit d3711d0

Browse files
committed
feat: add email trigger webhook to execution worker
- Add Composio webhook handler for GMAIL_NEW_GMAIL_MESSAGE triggers - Implement email importance evaluation (labels, keywords) - Dispatch notification tasks to execution agent for important emails
1 parent 14cd8f7 commit d3711d0

File tree

2 files changed

+269
-1
lines changed

2 files changed

+269
-1
lines changed

apps/execution-worker/src/index.ts

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import { Hono } from "hono";
88
import type { App } from "./context";
99
import { createDatabaseClient } from "./db/client";
1010
import { logger } from "./helpers/logger";
11+
import { handleEmailTrigger } from "./services/email-trigger-handler";
1112

1213
// Export Durable Object
1314
export { ExecutionAgent } from "./durable-objects/execution-agent";
@@ -25,7 +26,6 @@ app.get("/health", async (c) => {
2526
env: c.env,
2627
});
2728
const db = createDatabaseClient(c.env);
28-
// Simple query to verify DB connection
2929
await db.execute("SELECT 1");
3030

3131
return c.json({
@@ -49,6 +49,27 @@ app.get("/health", async (c) => {
4949
}
5050
});
5151

52+
app.post("/api/webhooks/composio", async (c) => {
53+
const webhookLogger = logger.withTags({ module: "composio-webhook" });
54+
55+
try {
56+
const body = await c.req.json();
57+
58+
webhookLogger.info("Received Composio webhook", {
59+
event: body.event,
60+
triggerName: body.data?.triggerName,
61+
});
62+
63+
const result = await handleEmailTrigger(body, c.env);
64+
return c.json(result);
65+
} catch (error) {
66+
webhookLogger.error("Error processing Composio webhook", {
67+
error: error instanceof Error ? error.message : String(error),
68+
});
69+
return c.json({ success: false, message: "Internal error" }, 500);
70+
}
71+
});
72+
5273
// Error handlers
5374
app.onError(withOnError<App>());
5475
app.notFound(withNotFound<App>());
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
import {
2+
agents,
3+
conversationParticipants,
4+
conversations,
5+
getDb,
6+
userGmailConnections,
7+
} from "@poppy/db";
8+
import { logger } from "@poppy/hono-helpers";
9+
import { desc, eq } from "drizzle-orm";
10+
import type { WorkerEnv } from "../context";
11+
12+
type Database = ReturnType<typeof getDb>;
13+
14+
export type EmailTriggerPayload = {
15+
event: string;
16+
data: {
17+
triggerName: string;
18+
payload: {
19+
messageId: string;
20+
threadId: string;
21+
labelIds: string[];
22+
snippet: string;
23+
historyId: string;
24+
internalDate: string;
25+
payload: {
26+
headers: Array<{ name: string; value: string }>;
27+
body?: { data?: string };
28+
parts?: Array<{
29+
mimeType: string;
30+
body?: { data?: string };
31+
}>;
32+
};
33+
};
34+
entityId: string;
35+
};
36+
};
37+
38+
const extractEmailHeader = (
39+
headers: Array<{ name: string; value: string }>,
40+
name: string,
41+
): string | undefined => {
42+
return headers.find((h) => h.name.toLowerCase() === name.toLowerCase())
43+
?.value;
44+
};
45+
46+
export const handleEmailTrigger = async (
47+
payload: EmailTriggerPayload,
48+
env: WorkerEnv,
49+
): Promise<{ success: boolean; message: string }> => {
50+
const emailLogger = logger.withTags({ module: "email-trigger" });
51+
52+
emailLogger.info("Processing email trigger", {
53+
event: payload.event,
54+
triggerName: payload.data?.triggerName,
55+
entityId: payload.data?.entityId,
56+
});
57+
58+
if (
59+
payload.event !== "trigger" ||
60+
payload.data?.triggerName !== "GMAIL_NEW_GMAIL_MESSAGE"
61+
) {
62+
emailLogger.info("Ignoring non-email trigger event", {
63+
event: payload.event,
64+
triggerName: payload.data?.triggerName,
65+
});
66+
return { success: true, message: "Event ignored" };
67+
}
68+
69+
const db = getDb(env.HYPERDRIVE.connectionString);
70+
const composioUserId = payload.data.entityId;
71+
72+
const gmailConnection = await db.query.userGmailConnections.findFirst({
73+
where: eq(userGmailConnections.composioUserId, composioUserId),
74+
});
75+
76+
if (!gmailConnection) {
77+
emailLogger.warn("No Gmail connection found for Composio user", {
78+
composioUserId,
79+
});
80+
return { success: false, message: "Gmail connection not found" };
81+
}
82+
83+
const emailPayload = payload.data.payload;
84+
const headers = emailPayload.payload.headers;
85+
86+
const emailInfo = {
87+
messageId: emailPayload.messageId,
88+
threadId: emailPayload.threadId,
89+
from: extractEmailHeader(headers, "From"),
90+
to: extractEmailHeader(headers, "To"),
91+
subject: extractEmailHeader(headers, "Subject"),
92+
date: extractEmailHeader(headers, "Date"),
93+
snippet: emailPayload.snippet,
94+
labels: emailPayload.labelIds,
95+
};
96+
97+
emailLogger.info("Received new email", {
98+
userId: gmailConnection.userId,
99+
from: emailInfo.from,
100+
subject: emailInfo.subject,
101+
});
102+
103+
const isImportant = await evaluateEmailImportance(emailInfo, env);
104+
105+
if (!isImportant) {
106+
emailLogger.info("Email not important enough to notify", {
107+
from: emailInfo.from,
108+
subject: emailInfo.subject,
109+
});
110+
return { success: true, message: "Email not important" };
111+
}
112+
113+
const result = await notifyUserAboutEmail(
114+
gmailConnection.userId,
115+
emailInfo,
116+
db,
117+
env,
118+
);
119+
120+
return result;
121+
};
122+
123+
type EmailInfo = {
124+
messageId: string;
125+
threadId: string;
126+
from: string | undefined;
127+
to: string | undefined;
128+
subject: string | undefined;
129+
date: string | undefined;
130+
snippet: string;
131+
labels: string[];
132+
};
133+
134+
const evaluateEmailImportance = async (
135+
emailInfo: EmailInfo,
136+
_env: WorkerEnv,
137+
): Promise<boolean> => {
138+
const hasImportantLabel = emailInfo.labels.includes("IMPORTANT");
139+
const hasStarredLabel = emailInfo.labels.includes("STARRED");
140+
const isCategoryPrimary = emailInfo.labels.includes("CATEGORY_PERSONAL");
141+
142+
const urgentKeywords = [
143+
"urgent",
144+
"asap",
145+
"immediately",
146+
"time sensitive",
147+
"action required",
148+
"deadline",
149+
"emergency",
150+
];
151+
152+
const subjectLower = (emailInfo.subject || "").toLowerCase();
153+
const snippetLower = emailInfo.snippet.toLowerCase();
154+
155+
const hasUrgentKeyword = urgentKeywords.some(
156+
(keyword) =>
157+
subjectLower.includes(keyword) || snippetLower.includes(keyword),
158+
);
159+
160+
const isImportant =
161+
hasImportantLabel ||
162+
hasStarredLabel ||
163+
(isCategoryPrimary && hasUrgentKeyword);
164+
165+
logger.info("Email importance evaluation", {
166+
from: emailInfo.from,
167+
subject: emailInfo.subject,
168+
hasImportantLabel,
169+
hasStarredLabel,
170+
isCategoryPrimary,
171+
hasUrgentKeyword,
172+
isImportant,
173+
});
174+
175+
return isImportant;
176+
};
177+
178+
const notifyUserAboutEmail = async (
179+
userId: string,
180+
emailInfo: EmailInfo,
181+
db: Database,
182+
env: WorkerEnv,
183+
): Promise<{ success: boolean; message: string }> => {
184+
const notifyLogger = logger.withTags({ module: "email-notify" });
185+
186+
const userConversation = await db
187+
.select({ conversationId: conversationParticipants.conversationId })
188+
.from(conversationParticipants)
189+
.innerJoin(
190+
conversations,
191+
eq(conversations.id, conversationParticipants.conversationId),
192+
)
193+
.where(eq(conversationParticipants.userId, userId))
194+
.orderBy(desc(conversations.updatedAt))
195+
.limit(1);
196+
197+
if (userConversation.length === 0) {
198+
notifyLogger.warn("No conversation found for user", { userId });
199+
return { success: false, message: "No conversation found" };
200+
}
201+
202+
const conversationId = userConversation[0].conversationId;
203+
204+
const interactionAgent = await db.query.agents.findFirst({
205+
where: eq(agents.conversationId, conversationId),
206+
});
207+
208+
if (!interactionAgent) {
209+
notifyLogger.warn("No interaction agent found for conversation", {
210+
conversationId,
211+
});
212+
return { success: false, message: "No interaction agent found" };
213+
}
214+
215+
const taskDescription = `[EMAIL NOTIFICATION] New important email received:
216+
From: ${emailInfo.from || "Unknown"}
217+
Subject: ${emailInfo.subject || "(no subject)"}
218+
Preview: ${emailInfo.snippet}
219+
220+
Notify the user about this email. Keep it brief but informative.`;
221+
222+
const executionAgentId = env.EXECUTION_AGENT.idFromName(interactionAgent.id);
223+
const executionAgent = env.EXECUTION_AGENT.get(executionAgentId);
224+
225+
try {
226+
await executionAgent.executeTask({
227+
agentId: interactionAgent.id,
228+
conversationId,
229+
taskDescription,
230+
userId,
231+
});
232+
233+
notifyLogger.info("Email notification task dispatched", {
234+
userId,
235+
from: emailInfo.from,
236+
subject: emailInfo.subject,
237+
});
238+
239+
return { success: true, message: "User notified" };
240+
} catch (error) {
241+
notifyLogger.error("Failed to dispatch email notification task", {
242+
error: error instanceof Error ? error.message : String(error),
243+
userId,
244+
});
245+
return { success: false, message: "Failed to notify user" };
246+
}
247+
};

0 commit comments

Comments
 (0)