-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathapp.py
More file actions
353 lines (298 loc) · 13.4 KB
/
app.py
File metadata and controls
353 lines (298 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
"""
Agent Patterns Demo Pack — Unified Web Launcher.
Serves a single-page app where users select a demo from cards,
then view the live visualization dashboard for the running demo.
"""
import asyncio
import importlib
import json
import os
import sys
import threading
import traceback
from pathlib import Path
import html as html_mod
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Request
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
import uvicorn
from shared.events import EventBus
from shared.runtime.model_config import get_model_config
# -- Startup sanity check --
try:
import agent_framework.orchestrations # noqa: F401
except ModuleNotFoundError:
_venv = Path(__file__).parent / ".venv"
print(
"\n ERROR: 'agent-framework-orchestrations' package not found.\n"
" You are likely running with the system Python instead of the venv.\n\n"
f" Activate the virtual-env first:\n"
f" .venv\\Scripts\\activate (Windows)\n"
f" source .venv/bin/activate (macOS/Linux)\n\n"
f" Or run directly:\n"
f" .venv\\Scripts\\python app.py\n"
)
sys.exit(1)
DEMOS_DIR = Path(__file__).parent / "demos"
STATIC_DIR = Path(__file__).parent / "shared" / "ui" / "static"
# Registry of available demos
DEMO_REGISTRY = [
{
"id": "maker_checker",
"title": "Maker-Checker PR Review",
"pattern": "sequential",
"description": "Worker drafts a PR review, Reviewer approves or requests changes. Iterates up to 3 rounds.",
"agents": ["Worker", "Reviewer"],
"module": "demos.maker_checker.run",
"suggested_prompt": "Review this PR diff:\n```python\ndef calculate_total(items):\n total = 0\n for item in items:\n total += item['price'] * item['qty']\n return total\n```\nThe function calculates order total. Review for correctness, edge cases, and improvements.",
},
{
"id": "hierarchical_research",
"title": "Hierarchical Research Brief",
"pattern": "concurrent + sequential",
"description": "Manager decomposes a topic, specialists research in parallel, synthesizer merges findings.",
"agents": ["Manager", "Specialist_A", "Specialist_B", "Synthesizer"],
"module": "demos.hierarchical_research.run",
"suggested_prompt": "The potential of on-device AI models (like Foundry Local) for enterprise applications",
},
{
"id": "handoff_support",
"title": "Hand-off Customer Support",
"pattern": "handoff",
"description": "Triage agent classifies a customer query, then hands off to Billing or TechSupport.",
"agents": ["Triage", "Billing", "TechSupport"],
"module": "demos.handoff_support.run",
"suggested_prompt": "I was charged twice for my subscription last month and I can't access the admin dashboard. Can you help?",
},
{
"id": "network_brainstorm",
"title": "Network Brainstorm",
"pattern": "group chat",
"description": "Four peers collaborate in a shared conversation: Innovator, Pragmatist, Devil's Advocate, Synthesizer.",
"agents": ["Innovator", "Pragmatist", "DevilsAdvocate", "Synthesizer"],
"module": "demos.network_brainstorm.run",
"suggested_prompt": "How should a mid-size SaaS company adopt on-device AI (like Foundry Local) to improve their product? Consider privacy, latency, cost, and user experience.",
},
{
"id": "supervisor_router",
"title": "Supervisor Router",
"pattern": "handoff",
"description": "Supervisor classifies the task and transfers to the matching specialist (Code, Data, or Docs) using HandoffBuilder autonomous mode.",
"agents": ["Supervisor", "CodeExpert", "DataExpert", "DocExpert"],
"module": "demos.supervisor_router.run",
"suggested_prompt": "Write a Python function that reads a CSV file, groups rows by category, and returns the top 3 categories by total revenue. Include docstrings.",
},
{
"id": "swarm_auditor",
"title": "Swarm + Auditor",
"pattern": "concurrent + sequential",
"description": "Three generators brainstorm in parallel (ConcurrentBuilder), Auditor scores proposals, Selector picks the winner.",
"agents": ["Generator_A", "Generator_B", "Generator_C", "Auditor", "Selector"],
"module": "demos.swarm_auditor.run",
"suggested_prompt": "Our mid-size SaaS company wants to reduce cloud infrastructure costs by 40% while maintaining 99.9% uptime and improving developer velocity. What should we do?",
},
{
"id": "magentic_one",
"title": "Magentic One Assessment",
"pattern": "magentic one",
"description": "MagenticManager intelligently sequences Researcher, Strategist, and Critic to produce a composite feasibility assessment.",
"agents": ["MagenticManager", "Researcher", "Strategist", "Critic"],
"module": "demos.magentic_one.run",
"suggested_prompt": "Assess the feasibility of adopting on-device AI models (like Foundry Local) for a mid-size enterprise. Cover: current state of the technology, a recommended adoption strategy, and the top risks to watch out for.",
},
]
# Global state for the currently running demo
_current_demo_id: str | None = None
_current_event_bus: EventBus | None = None
_current_topology: dict | None = None
_demo_thread: threading.Thread | None = None
def _load_topology(demo_id: str) -> dict:
topo_path = DEMOS_DIR / demo_id / "topology.json"
if topo_path.exists():
return json.loads(topo_path.read_text(encoding="utf-8"))
return {}
def _run_demo_in_thread(demo_id: str, event_bus: EventBus, prompt: str | None = None):
"""Import and run the demo's run_demo coroutine."""
demo_info = next((d for d in DEMO_REGISTRY if d["id"] == demo_id), None)
if not demo_info:
return
try:
mod = importlib.import_module(demo_info["module"])
asyncio.run(mod.run_demo(event_bus, input_text=prompt))
except Exception:
tb = traceback.format_exc()
event_bus.emit("error", {
"agent": "System",
"error": tb,
"message": f"Demo crashed: {tb.splitlines()[-1]}",
})
app = FastAPI(title="Agent Patterns Demo Pack")
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
@app.get("/", response_class=HTMLResponse)
async def launcher_page():
return (Path(__file__).parent / "shared" / "ui" / "static" / "launcher.html").read_text(encoding="utf-8")
@app.get("/demo/{demo_id}", response_class=HTMLResponse)
async def demo_page(demo_id: str):
# Validate demo_id against known demos to prevent path traversal
demo_info = next((d for d in DEMO_REGISTRY if d["id"] == demo_id), None)
if not demo_info:
return HTMLResponse("<h1>Demo not found</h1>", status_code=404)
template = (STATIC_DIR / "dashboard.html").read_text(encoding="utf-8")
# Inject suggested prompt so the textarea is pre-filled on load
suggested = html_mod.escape(demo_info.get("suggested_prompt", ""))
template = template.replace(
'id="prompt-input" placeholder=',
f'id="prompt-input" placeholder=',
)
template = template.replace(
'</textarea>\n <button id="btn-send"',
f'{suggested}</textarea>\n <button id="btn-send"',
)
return HTMLResponse(template)
@app.get("/api/demos")
async def list_demos():
return JSONResponse(DEMO_REGISTRY)
@app.get("/api/topology")
async def get_topology():
return JSONResponse(_current_topology or {})
@app.get("/api/events")
async def get_events():
if _current_event_bus:
return JSONResponse(_current_event_bus.get_events())
return JSONResponse([])
@app.get("/api/status")
async def get_status():
running = _demo_thread is not None and _demo_thread.is_alive()
return JSONResponse({
"demo_id": _current_demo_id,
"running": running,
})
@app.post("/api/run/{demo_id}")
async def run_demo_endpoint(demo_id: str, body: dict | None = None):
global _current_demo_id, _current_event_bus, _current_topology, _demo_thread
# Validate demo exists
demo_info = next((d for d in DEMO_REGISTRY if d["id"] == demo_id), None)
if not demo_info:
return JSONResponse({"error": "Unknown demo"}, status_code=404)
# Extract optional prompt from request body
prompt = None
if body and isinstance(body.get("prompt"), str):
prompt = body["prompt"].strip() or None
# If same demo is already running, return current status
if _current_demo_id == demo_id and _demo_thread and _demo_thread.is_alive():
return JSONResponse({"status": "already_running", "demo_id": demo_id})
# Stop previous demo (event bus clear)
if _current_event_bus:
_current_event_bus.clear()
# Set up new demo
log_dir = str(DEMOS_DIR / demo_id / "runs")
_current_event_bus = EventBus(log_dir=log_dir)
_current_topology = _load_topology(demo_id)
_current_demo_id = demo_id
# Re-register existing WebSocket clients
for ws in list(_ws_clients):
_current_event_bus.register_ws(ws)
# Start demo in background thread
_demo_thread = threading.Thread(
target=_run_demo_in_thread,
args=(demo_id, _current_event_bus, prompt),
daemon=True,
)
_demo_thread.start()
return JSONResponse({"status": "started", "demo_id": demo_id})
@app.post("/api/replay")
async def load_replay(body: dict):
path = body.get("path", "")
if not path:
return JSONResponse({"error": "Path required"}, status_code=400)
# Restrict replay files to the demos/*/runs/ directories
replay_path = Path(path).resolve()
allowed_root = DEMOS_DIR.resolve()
if not replay_path.is_relative_to(allowed_root):
return JSONResponse({"error": "Access denied"}, status_code=403)
if not replay_path.exists() or not replay_path.suffix == ".jsonl":
return JSONResponse({"error": "File not found"}, status_code=404)
events = EventBus.load_replay(str(replay_path))
return JSONResponse(events)
@app.post("/api/stop")
async def stop_demo():
global _current_demo_id, _current_event_bus, _current_topology, _demo_thread
_current_demo_id = None
if _current_event_bus:
_current_event_bus.clear()
_current_event_bus = None
_current_topology = None
_demo_thread = None
return JSONResponse({"status": "stopped"})
@app.get("/api/model-config")
async def get_model_config_endpoint():
return JSONResponse(get_model_config().to_dict())
@app.get("/api/models/local")
async def list_local_models_endpoint():
"""Return Foundry Local catalog models with live status (loaded/cached/catalog)."""
from shared.runtime.model_config import _list_local_models_detailed
return JSONResponse({
"models": _list_local_models_detailed(),
"selected": get_model_config().local_model,
})
@app.get("/api/models/azure")
async def list_azure_models_endpoint():
"""List model deployments from the configured Microsoft Foundry endpoint."""
import httpx
from shared.runtime.foundry_client import _normalize_azure_base_url
cfg = get_model_config()
if not cfg.azure_endpoint or not cfg.azure_api_key:
return JSONResponse({"models": [], "selected": cfg.azure_model,
"error": "Azure Foundry endpoint/key not configured"})
try:
base_url = _normalize_azure_base_url(cfg.azure_endpoint)
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
f"{base_url}/models",
headers={"api-key": cfg.azure_api_key, "Authorization": f"Bearer {cfg.azure_api_key}"},
)
if resp.status_code == 200:
data = resp.json()
models = [
{"id": m.get("id", ""), "owned_by": m.get("owned_by", "")}
for m in data.get("data", [])
]
return JSONResponse({"models": models,
"selected": cfg.azure_deployment or cfg.azure_model})
return JSONResponse({"models": [], "selected": cfg.azure_model,
"error": f"HTTP {resp.status_code}"})
except Exception as e:
return JSONResponse({"models": [], "selected": cfg.azure_model, "error": str(e)})
@app.post("/api/model-config")
async def update_model_config_endpoint(request: Request):
try:
data = await request.json()
except Exception:
return JSONResponse({"error": "Invalid JSON"}, status_code=400)
try:
get_model_config().update(data)
except ValueError as e:
return JSONResponse({"error": str(e)}, status_code=400)
return JSONResponse(get_model_config().to_dict())
# WebSocket management
_ws_clients: set = set()
@app.websocket("/ws")
async def websocket_endpoint(ws: WebSocket):
await ws.accept()
_ws_clients.add(ws)
if _current_event_bus:
_current_event_bus.register_ws(ws)
try:
while True:
await ws.receive_text()
except WebSocketDisconnect:
pass
finally:
_ws_clients.discard(ws)
if _current_event_bus:
_current_event_bus.unregister_ws(ws)
if __name__ == "__main__":
port = int(os.getenv("UI_PORT", "8765"))
host = os.getenv("HOST", "127.0.0.1")
print(f"\n Agent Patterns Demo Pack: http://localhost:{port}\n")
uvicorn.run(app, host=host, port=port, log_level="warning")