This guide covers Google ADK's workflow agent patterns and how they are used in InfraAlert.
In Google's ADK, a Workflow Agent doesn't perform tasks itself but orchestrates other agents, called sub-agents. This allows for modular design, breaking down complex problems into specialized components.
ADK provides three built-in workflow types:
| Workflow Type | Execution Pattern | Use Case |
|---|---|---|
| SequentialAgent | Step-by-step | Pipelines where each step depends on the previous |
| ParallelAgent | Concurrent execution | Independent tasks that can run simultaneously |
| LoopAgent | Repeated execution | Polling, retries, monitoring until condition met |
`SequentialAgent` executes its sub-agents in order, one after another. Each agent can read session state written by the agents that ran before it.

```python
from google.adk.agents import SequentialAgent
from agents.workflow_agents import create_sequential_workflow

# Using the factory function
workflow = create_sequential_workflow(
    name="issue_pipeline",
    agents=[
        IssueDetectionAgent(name="detection"),
        PriorityAnalysisAgent(name="priority"),
        ResourceCoordinationAgent(name="coordination"),
    ],
    description="Process infrastructure issues end-to-end",
)

# Or using ADK directly
workflow = SequentialAgent(
    name="issue_pipeline",
    sub_agents=[detection_agent, priority_agent, coordination_agent],
)
```

When to use:
- Multi-step pipelines
- Data transformation chains
- Workflows where order matters
- Each step depends on previous output
InfraAlert Example: The main issue processing workflow:
Input → Issue Detection → Priority Analysis → Resource Coordination → Output
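
The ordering and state-passing behaviour described above can be illustrated without ADK. The following is a minimal sketch in plain Python, not ADK's implementation: `run_sequential` and the three step functions are hypothetical stand-ins for `SequentialAgent` and the InfraAlert agents, and a plain dictionary stands in for session state.

```python
from typing import Any, Callable

State = dict[str, Any]
Step = Callable[[State], None]


def detect(state: State) -> None:
    # Hypothetical detection step: records the issue it found.
    state["issue_data"] = {"issue_id": "ISSUE-1", "type": "pothole"}


def prioritize(state: State) -> None:
    # Reads the previous step's output; raises KeyError if run out of order.
    issue = state["issue_data"]
    state["priority_score"] = 0.8 if issue["type"] == "pothole" else 0.2


def coordinate(state: State) -> None:
    # Depends on the score written by the previous step.
    state["team_assignment"] = "roads" if state["priority_score"] > 0.5 else "backlog"


def run_sequential(steps: list[Step], state: State) -> State:
    # Run each step in order against one shared state dictionary.
    for step in steps:
        step(state)
    return state


result = run_sequential([detect, prioritize, coordinate], {})
print(result["team_assignment"])
```

Reordering the list so that `prioritize` runs before `detect` raises a `KeyError`, which is the kind of dependency that makes a sequential pattern the right choice.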
`ParallelAgent` executes all of its sub-agents concurrently. Results are aggregated after every sub-agent completes.

```python
from google.adk.agents import ParallelAgent
from agents.workflow_agents import create_parallel_workflow

# Using the factory function
workflow = create_parallel_workflow(
    name="notifications",
    agents=[
        SmsNotificationAgent(name="sms"),
        EmailNotificationAgent(name="email"),
        DashboardUpdateAgent(name="dashboard"),
    ],
    description="Send all notifications in parallel",
)

# Or using ADK directly
workflow = ParallelAgent(
    name="notifications",
    sub_agents=[sms_agent, email_agent, dashboard_agent],
)
```

When to use:
- Independent tasks with no dependencies
- Notifications through multiple channels
- Batch processing
- Reducing overall latency
InfraAlert Example: Sending notifications:
Input → [SMS, Email, Dashboard, Government Portal] → Aggregated Results
(all execute simultaneously)
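
The fan-out and aggregation idea can be sketched with `asyncio.gather` from the standard library. This is an illustration of the concurrency pattern only, not how `ParallelAgent` is implemented; the `send` coroutine and its delays are hypothetical stand-ins for notification agents.

```python
import asyncio


async def send(channel: str, delay: float) -> str:
    # Stand-in for a notification agent; the sleep simulates I/O latency.
    await asyncio.sleep(delay)
    return f"{channel}: sent"


async def run_parallel() -> list[str]:
    # gather() runs the coroutines concurrently and returns results in the
    # order the coroutines were passed, not in completion order.
    return await asyncio.gather(
        send("sms", 0.03),
        send("email", 0.01),
        send("dashboard", 0.02),
    )


results = asyncio.run(run_parallel())
print(results)
```

Total latency is roughly that of the slowest task rather than the sum of all three, which is the main reason to use the parallel pattern for independent work.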
`LoopAgent` repeatedly executes its sub-agents until a condition is met or the maximum number of iterations is reached.

```python
from google.adk.agents import LoopAgent
from agents.workflow_agents import create_loop_workflow

# Using the factory function
workflow = create_loop_workflow(
    name="status_monitor",
    agents=[
        CheckStatusAgent(name="check_status"),
        UpdateDashboardAgent(name="update"),
    ],
    max_iterations=10,
    description="Monitor until issue resolved",
)

# Or using ADK directly
workflow = LoopAgent(
    name="status_monitor",
    sub_agents=[check_agent, update_agent],
    max_iterations=10,
)
```

When to use:
- Polling for status updates
- Retry logic with backoff
- Monitoring until resolution
- Iterative refinement
InfraAlert Example: Status monitoring:
```
┌────────────────────────────────────────┐
│ Start → Check Status → Update Dashboard│
│         ↑                          │   │
│         └──────── (loop) ──────────┘   │
│        Until resolved or max=10        │
└────────────────────────────────────────┘
```
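
The "until resolved or max=10" rule amounts to a bounded loop with an early exit. The sketch below shows that control flow in plain Python. `run_loop` is a hypothetical helper, and modelling the stop condition as a boolean return value is an assumption of this sketch rather than ADK's mechanism.

```python
from typing import Callable


def run_loop(check: Callable[[int], bool], max_iterations: int) -> tuple[bool, int]:
    # Call check() until it reports completion or the iteration cap is hit.
    for iteration in range(1, max_iterations + 1):
        if check(iteration):
            return True, iteration
    return False, max_iterations


# Hypothetical status check: the issue counts as resolved on the third poll.
resolved, iterations = run_loop(lambda i: i >= 3, max_iterations=10)
print(resolved, iterations)

# A check that never succeeds stops at the cap instead of looping forever.
gave_up = run_loop(lambda i: False, max_iterations=10)
print(gave_up)
```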
All workflow agents share session state, enabling data passing between sub-agents:
```python
from agents.session_config import SessionKeys

# In IssueDetectionAgent (first in sequence)
async def _run_async_impl(self, ctx):
    result = await self.detect_issue(ctx)
    ctx.session.state[SessionKeys.ISSUE_DATA] = result
    ctx.session.state[SessionKeys.ISSUE_ID] = result["issue_id"]

# In PriorityAnalysisAgent (second in sequence)
async def _run_async_impl(self, ctx):
    # Access data from the previous agent
    issue_data = ctx.session.state.get(SessionKeys.ISSUE_DATA)
    priority = await self.analyze_priority(issue_data)
    ctx.session.state[SessionKeys.PRIORITY_SCORE] = priority["score"]
```

| Key | Producer | Consumer |
|---|---|---|
| `ISSUE_DATA` | Issue Detection | Priority Analysis, Orchestrator |
| `ISSUE_ID` | Issue Detection | All downstream agents |
| `PRIORITY_SCORE` | Priority Analysis | Resource Coordination |
| `TEAM_ASSIGNMENT` | Resource Coordination | Platform Integration |
| `WORKFLOW_STATE` | Orchestrator | All agents |
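
The producer/consumer table is effectively a contract: a consumer should only run once its keys have been written. One way to make that explicit is a small guard, sketched here under assumptions. `ExampleKeys` is a hypothetical stand-in for `SessionKeys` (whose real definition in `agents/session_config.py` is not shown in this guide), and `require` is not part of ADK or InfraAlert.

```python
class ExampleKeys:
    # Hypothetical stand-in for SessionKeys; the real values may differ.
    ISSUE_DATA = "issue_data"
    ISSUE_ID = "issue_id"
    PRIORITY_SCORE = "priority_score"


def require(state: dict, *keys: str) -> None:
    # Fail fast with a clear message if a producer has not run yet.
    missing = [key for key in keys if key not in state]
    if missing:
        raise KeyError(f"missing session keys: {', '.join(missing)}")


state = {
    ExampleKeys.ISSUE_DATA: {"type": "pothole"},
    ExampleKeys.ISSUE_ID: "ISSUE-1",
}

# Passes: both keys were written by the (simulated) detection step.
require(state, ExampleKeys.ISSUE_DATA, ExampleKeys.ISSUE_ID)

try:
    # Fails: nothing has produced a priority score yet.
    require(state, ExampleKeys.PRIORITY_SCORE)
except KeyError as exc:
    error_message = exc.args[0]
    print(error_message)
```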
You can nest workflow agents for complex scenarios:
```python
# Nested workflow: Sequential with Parallel notifications
notification_workflow = ParallelAgent(
    name="parallel_notifications",
    sub_agents=[sms_agent, email_agent, dashboard_agent],
)

main_workflow = SequentialAgent(
    name="complete_pipeline",
    sub_agents=[
        detection_agent,
        priority_agent,
        coordination_agent,
        notification_workflow,  # Parallel step in sequential workflow
    ],
)
```

Workflow agents support before/after callbacks for cross-cutting concerns:
```python
import logging

logger = logging.getLogger(__name__)

# ctx is ADK's CallbackContext, which exposes the agent name as agent_name
async def log_before(ctx):
    """Log before the workflow agent runs."""
    logger.info(f"Starting agent: {ctx.agent_name}")
    return None  # Continue execution

async def log_after(ctx):
    """Log after the workflow agent completes."""
    logger.info(f"Completed agent: {ctx.agent_name}")
    return None

workflow = SequentialAgent(
    name="monitored_workflow",
    sub_agents=[agent1, agent2, agent3],
    before_agent_callback=log_before,
    after_agent_callback=log_after,
)
```

The `agents/workflow_agents.py` module provides InfraAlert-specific workflow classes:
`InfraAlertSequentialWorkflow` is a pre-configured sequential workflow for issue processing:

```python
from agents.workflow_agents import InfraAlertSequentialWorkflow

# Uses default agents if none provided
workflow = InfraAlertSequentialWorkflow()

# Or with custom agents
workflow = InfraAlertSequentialWorkflow(
    name="custom_pipeline",
    sub_agents=[custom_agent1, custom_agent2],
)
```

`InfraAlertParallelWorkflow` is a parallel workflow for concurrent operations:
```python
from agents.workflow_agents import InfraAlertParallelWorkflow

workflow = InfraAlertParallelWorkflow(
    name="batch_process",
    sub_agents=[agent1, agent2, agent3],
)
```

`InfraAlertLoopWorkflow` is a loop workflow with configurable iterations:
```python
from agents.workflow_agents import InfraAlertLoopWorkflow

workflow = InfraAlertLoopWorkflow(
    name="retry_workflow",
    sub_agents=[retry_agent],
    max_iterations=5,
)
```

Factory functions for quick workflow creation:
```python
from agents.workflow_agents import (
    create_issue_processing_workflow,
    create_notification_workflow,
    create_monitoring_workflow,
)

# Standard issue processing
issue_workflow = create_issue_processing_workflow()

# Parallel notifications
notify_workflow = create_notification_workflow(
    agents=[sms_agent, email_agent]
)

# Status monitoring loop
monitor_workflow = create_monitoring_workflow(
    agents=[status_check_agent],
    max_checks=10,
)
```

To run a workflow from the command line:

```bash
# Run the sequential workflow
adk run agents.orchestrator.workflow_orchestrator:root_agent

# Run with web interface
adk web agents.orchestrator.workflow_orchestrator:root_agent --port 8085
```

To run it programmatically:

```python
from google.adk.runners import Runner
from agents.orchestrator.workflow_orchestrator import root_agent

runner = Runner(agent=root_agent)
result = await runner.run(user_input="Report pothole on Main Street")
```

To run it with a session service:

```python
from google.adk.runners import Runner
from agents.session_config import get_session_service
from agents.orchestrator.workflow_orchestrator import root_agent

session_service = get_session_service()
runner = Runner(agent=root_agent, session_service=session_service)
result = await runner.run(user_input="Report pothole")
```

Best practices:

- Use Session State for Data Passing
  - Don't pass data through return values
  - Use `SessionKeys` for consistent key names
- Keep Sub-Agents Focused
  - Each agent should do one thing well
  - Let workflow agents handle coordination
- Handle Failures Gracefully
  - Use `after_agent_callback` for error handling
  - Store workflow state for recovery
- Choose the Right Pattern
  - Sequential: Dependencies between steps
  - Parallel: Independent operations
  - Loop: Retries or polling
- Limit Loop Iterations
  - Always set `max_iterations` to prevent infinite loops
  - Use reasonable timeouts
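
An iteration cap and a timeout complement each other: the cap bounds how many times a loop runs, while the timeout bounds how long it may take. The sketch below combines them using `asyncio.wait_for` from the standard library. `poll_until_resolved` and `monitor` are hypothetical helpers, not InfraAlert or ADK functions, and the intervals are chosen only to keep the example fast.

```python
import asyncio


async def poll_until_resolved(max_iterations: int, interval: float) -> bool:
    # Hypothetical poller that never sees the issue resolve.
    for _ in range(max_iterations):
        await asyncio.sleep(interval)
    return False


async def monitor(timeout: float) -> str:
    try:
        resolved = await asyncio.wait_for(
            poll_until_resolved(max_iterations=10, interval=0.02),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        # The wall-clock limit fired before the iteration cap was reached.
        return "timed_out"
    return "resolved" if resolved else "gave_up"


# Ten polls of 0.02s need about 0.2s, so a 0.05s timeout fires first.
outcome_short_timeout = asyncio.run(monitor(timeout=0.05))
# With a generous timeout, the iteration cap ends the loop instead.
outcome_long_timeout = asyncio.run(monitor(timeout=5.0))
print(outcome_short_timeout, outcome_long_timeout)
```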
| Aspect | HTTP Orchestration | In-Process Workflows |
|---|---|---|
| Deployment | Separate Cloud Run services | Single process |
| Scaling | Independent per-agent | Shared resources |
| Communication | HTTP/REST | Direct method calls |
| Session State | Must serialize | Native sharing |
| Latency | Network overhead | Minimal |
| Use Case | Production microservices | Dev/testing, single-process |
InfraAlert uses HTTP orchestration in production (via OrchestratorAgent) and in-process workflows for local development and testing.
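
The "Session State" row is the difference most likely to surface as a bug when moving between the two modes. In-process, the next agent sees the same Python object. Over HTTP, the state has to survive a round trip through a wire format. The sketch below uses JSON as an assumed wire format, with hypothetical state contents, to show what changes.

```python
import json

# Hypothetical state; the set is deliberately not JSON-serializable as-is.
state = {"issue_id": "ISSUE-1", "priority_score": 0.8, "tags": {"road", "urgent"}}

# In-process: the next agent receives the very same object.
in_process_view = state


def to_wire(data: dict) -> str:
    # Convert non-JSON types (here, sets) before serializing for HTTP.
    converted = {
        key: sorted(value) if isinstance(value, set) else value
        for key, value in data.items()
    }
    return json.dumps(converted, sort_keys=True)


# Over HTTP: the receiver gets a copy, and the set arrives as a list.
received = json.loads(to_wire(state))
print(in_process_view is state, received["tags"])
```

Keeping session values to JSON-friendly types (strings, numbers, lists, dictionaries) avoids this kind of mismatch when the same agents run in both modes.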
- AGENTS.md - Project conventions
- Session Config - Session management and SessionKeys
- Architecture - System architecture