Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 126 additions & 21 deletions hub/agents/python/email/gaia_agent_email/api_routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@

from __future__ import annotations

import asyncio
import hashlib
import hmac
import re
import secrets
import threading
from typing import List, Optional
from typing import Any, List, Optional

from fastapi import APIRouter, HTTPException
from fastapi.responses import HTMLResponse
Expand All @@ -65,6 +66,8 @@
SingleEmailInput,
ThreadInput,
)
from gaia_agent_email.tools.llm_triage import LLMTriageError
from gaia_agent_email.tools.summarize_tools import EmailSummarizeError
from gaia_agent_email.tools.triage_heuristics import classify_category_heuristic
from gaia.logger import get_logger

Expand Down Expand Up @@ -121,32 +124,67 @@ def _split_sentences(text: str) -> List[str]:

class EmailTriageService:
"""Convert contract inputs (or raw Gmail-API messages) into a contract
:class:`EmailTriageResult` deterministically.
:class:`EmailTriageResult`.

No LLM is invoked: category comes from the agent's heuristic
categorizer, and the summary / action items / draft proposal are derived
from the message text with explicit rules. This keeps the REST surface
fast, offline-testable, and aligned with the agent's pre-LLM fast path.
Triage always uses the local Lemonade LLM. High-confidence heuristic
signals (spam, promotions) skip the LLM classify call as an internal
optimisation, but the LLM is always used for summaries and for any
message the heuristic cannot confidently classify.
"""

# -- Public: contract path ---------------------------------------------

def triage_request(self, request: EmailTriageRequest) -> EmailTriageResponse:
"""Triage a contract request envelope into a contract response."""
def triage_request(
self,
request: EmailTriageRequest,
chat: Optional[Any] = None,
) -> EmailTriageResponse:
"""Triage a contract request envelope into a contract response.

Args:
request: The frozen #1262 contract request.
chat: Pre-built chat client. When None a local Lemonade client
is constructed via :meth:`_build_llm_chat`.
"""
payload = request.payload
resolved_chat = chat or self._build_llm_chat()
if isinstance(payload, SingleEmailInput):
result = self._triage_single(payload)
kind = "single"
result = self._triage_single_llm(payload, resolved_chat)
elif isinstance(payload, ThreadInput):
result = self._triage_thread(payload)
kind = "thread"
result = self._triage_thread_llm(payload, resolved_chat)
else: # pragma: no cover - discriminated union guarantees one of the two
raise HTTPException(
status_code=422,
detail=f"Unsupported payload kind: {getattr(payload, 'kind', '?')!r}",
)
return EmailTriageResponse(request_kind=kind, result=result)

def _build_llm_chat(self, base_url: Optional[str] = None) -> Any:
"""Build a local Lemonade chat client for LLM triage/summarise.

Validates the AC3 local-only contract before constructing the client.
Raises ``ConfigurationError`` loudly when ``base_url`` points at a
cloud LLM — no silent fallback to heuristic.
"""
from gaia_agent_email.config import EmailAgentConfig
from gaia.chat.sdk import AgentConfig, AgentSDK

cfg = EmailAgentConfig(base_url=base_url)
cfg.validate()

sdk_cfg = AgentConfig(
base_url=base_url,
use_local_llm=True,
use_claude=False,
use_chatgpt=False,
# Output cap (not input); context window governs what fits.
# 4096 gives Gemma-4-E4B room for its reasoning chain + JSON.
max_tokens=4096,
)
return AgentSDK(sdk_cfg)

def triage_gmail_message(
self, msg: dict, *, principal_email: str
) -> EmailTriageResult:
Expand Down Expand Up @@ -176,37 +214,97 @@ def triage_gmail_message(
reply_to=_parse_address(sender),
)

# -- Internal -----------------------------------------------------------

def _triage_single(self, payload: SingleEmailInput) -> EmailTriageResult:
def _triage_single_llm(
self, payload: SingleEmailInput, chat: Any
) -> EmailTriageResult:
msg = payload.message
return self._build_result(
return self._build_result_llm(
subject=msg.subject,
sender_raw=_format_address(msg.from_),
body=msg.body,
label_ids=[],
principal=payload.principal,
reply_to=msg.from_,
chat=chat,
message_id=msg.message_id,
)

def _triage_thread(self, payload: ThreadInput) -> EmailTriageResult:
# Summarize the whole thread; categorize on the LAST inbound message
# (the one awaiting the principal's attention), reply to its sender.
def _triage_thread_llm(self, payload: ThreadInput, chat: Any) -> EmailTriageResult:
messages: List[EmailMessage] = payload.messages
last = messages[-1]
# Join newest-first so the model sees the most recent context first.
combined_body = "\n\n".join(
f"{_format_address(m.from_)}: {m.body}" for m in messages
f"{_format_address(m.from_)}: {m.body}" for m in reversed(messages)
)
result = self._build_result(
return self._build_result_llm(
subject=last.subject,
sender_raw=_format_address(last.from_),
body=combined_body,
label_ids=[],
principal=payload.principal,
reply_to=last.from_,
summary_prefix=f"Thread of {len(messages)} messages. ",
chat=chat,
message_id=payload.thread_id,
)

def _build_result_llm(
self,
*,
subject: str,
sender_raw: str,
body: str,
label_ids: List[str],
principal: EmailAddress,
reply_to: Optional[EmailAddress],
summary_prefix: str = "",
chat: Any,
message_id: Optional[str] = None,
) -> EmailTriageResult:
"""Build a result using LLM escalation when heuristic confidence is low."""
from gaia_agent_email.tools.llm_triage import classify_email_llm
from gaia_agent_email.tools.summarize_tools import summarize_email_llm

heuristic = classify_category_heuristic(
subject=subject, sender=sender_raw, label_ids=label_ids
)

if heuristic.confident:
category = EmailCategory(heuristic.category)
else:
llm_result = classify_email_llm(
chat,
subject=subject,
sender=sender_raw,
body=body,
)
category = EmailCategory(llm_result["category"])

llm_summary = summarize_email_llm(
chat,
subject=subject,
sender=sender_raw,
body=body,
)
summary = summary_prefix + llm_summary

action_items = self._extract_action_items(body)
draft = self._build_draft(
subject=subject,
reply_to=reply_to,
principal=principal,
is_spam=heuristic.is_spam,
is_phishing=heuristic.is_phishing,
)
return EmailTriageResult(
category=category,
is_spam=heuristic.is_spam,
is_phishing=heuristic.is_phishing,
summary=summary,
action_items=action_items,
draft=draft,
message_id=message_id,
)
return result

def _build_result(
self,
Expand All @@ -218,6 +316,7 @@ def _build_result(
principal: EmailAddress,
reply_to: Optional[EmailAddress],
summary_prefix: str = "",
message_id: Optional[str] = None,
) -> EmailTriageResult:
heuristic = classify_category_heuristic(
subject=subject, sender=sender_raw, label_ids=label_ids
Expand All @@ -239,6 +338,7 @@ def _build_result(
summary=summary,
action_items=action_items,
draft=draft,
message_id=message_id,
)

def _summarize(self, subject: str, body: str) -> str:
Expand Down Expand Up @@ -513,7 +613,12 @@ async def triage_email(request: EmailTriageRequest) -> EmailTriageResponse:
summary, extracted action items, and an optional draft-reply proposal.
No mail is read or sent; this analyzes only the payload in the request.
"""
return _service.triage_request(request)
try:
return await asyncio.to_thread(_service.triage_request, request)
except (LLMTriageError, EmailSummarizeError) as e:
raise HTTPException(
status_code=502, detail=f"local LLM triage failed: {e}"
) from e


@router.post("/draft", response_model=EmailDraftResponse)
Expand Down
8 changes: 8 additions & 0 deletions hub/agents/python/email/gaia_agent_email/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,14 @@ class EmailTriageResult(_Strict):
draft: Optional[DraftReply] = Field(
default=None, description="Proposed reply, or null when none is suggested."
)
message_id: Optional[str] = Field(
default=None,
description=(
"Echoes the provider message-id from the request (SingleEmailInput.message "
"or ThreadInput.thread_id). Null when the result was produced from a "
"raw Gmail-API message (no contract message_id available)."
),
)


class EmailTriageResponse(_Strict):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,6 @@
)

_CATEGORY_BY_LOWER = {c.lower(): c for c in ALL_CATEGORIES}
# Cap body characters sent to the classifier — enough signal for a category
# decision without unbounded prompt growth on long threads.
_BODY_CHAR_LIMIT = 4000


class LLMTriageError(RuntimeError):
Expand All @@ -116,12 +113,11 @@ def _build_user_prompt(subject: str, sender: str, body: str) -> str:
# delimiters the system prompt is trained to treat as data.
from gaia_agent_email.tools.read_tools import wrap_untrusted_body

clipped = (body or "").strip()[:_BODY_CHAR_LIMIT]
return (
f"Classify this email.\n\n"
f"Subject: {subject}\n"
f"From: {sender}\n"
f"Body:\n{wrap_untrusted_body(clipped)}\n"
f"Body:\n{wrap_untrusted_body((body or '').strip())}\n"
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,7 @@

from gaia.agents.base.tools import tool
from gaia_agent_email.gmail_backend import decode_message_body
from gaia_agent_email.tools.read_tools import (
DEFAULT_BODY_LIMIT_CHARS,
wrap_untrusted_body,
)
from gaia_agent_email.tools.read_tools import wrap_untrusted_body
from gaia_agent_email.verbose import log_tool_call
from gaia.connectors.errors import ConnectorsError
from gaia.logger import get_logger
Expand Down Expand Up @@ -96,12 +93,11 @@ def _envelope_err(message: str) -> str:


def _build_user_prompt(subject: str, sender: str, body: str) -> str:
clipped = (body or "").strip()[:DEFAULT_BODY_LIMIT_CHARS]
return (
"Summarize this email.\n\n"
f"Subject: {subject}\n"
f"From: {sender}\n"
f"Body:\n{wrap_untrusted_body(clipped)}\n"
f"Body:\n{wrap_untrusted_body((body or '').strip())}\n"
)


Expand Down
Loading
Loading