-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathmain.py
More file actions
91 lines (72 loc) · 2.82 KB
/
main.py
File metadata and controls
91 lines (72 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Copyright (c) Microsoft. All rights reserved.
# On-Call Copilot - Multi-Agent Hosted Agent
import os
import sys
from pathlib import Path
from agent_framework import Agent
from agent_framework_foundry import FoundryChatClient
from agent_framework_foundry_hosting import ResponsesHostServer
from agent_framework_orchestrations import ConcurrentBuilder
from azure.identity import DefaultAzureCredential
from dotenv import load_dotenv
from app.agents.comms import COMMS_INSTRUCTIONS
from app.agents.pir import PIR_INSTRUCTIONS
from app.agents.summary import SUMMARY_INSTRUCTIONS
from app.agents.triage import TRIAGE_INSTRUCTIONS
# Load settings from the ".env" file that sits in the same directory as this
# module, so the lookup does not depend on the current working directory.
_dotenv_file = Path(__file__).resolve().parent / ".env"
load_dotenv(_dotenv_file)
def _presence(name: str) -> str:
    """Report whether environment variable *name* holds a non-empty value.

    Only the words "set" / "unset" are returned, so callers can log the
    configuration state without printing the variable's actual value.
    A variable that exists but is empty counts as "unset".
    """
    if os.environ.get(name):
        return "set"
    return "unset"
# Startup diagnostics: the interpreter version plus, for each configuration
# variable, only whether it is set (never its value). Every line is flushed
# immediately so it shows up in the logs even if startup fails right after.
for _startup_line in (
    f"[oncall-copilot] Starting... Python {sys.version}",
    f"[oncall-copilot] AZURE_AI_PROJECT_ENDPOINT={_presence('AZURE_AI_PROJECT_ENDPOINT')}",
    f"[oncall-copilot] AZURE_MODEL_PROJECT_ENDPOINT={_presence('AZURE_MODEL_PROJECT_ENDPOINT')}",
    f"[oncall-copilot] AZURE_OPENAI_CHAT_DEPLOYMENT_NAME={_presence('AZURE_OPENAI_CHAT_DEPLOYMENT_NAME')}",
):
    print(_startup_line, flush=True)
del _startup_line  # keep the loop variable out of the module namespace

# Credential object created once at import time; create_workflow() hands this
# same instance to the chat client it builds.
_credential = DefaultAzureCredential()
def create_workflow():
    """Create 4 specialist agents and wire them into a concurrent workflow.

    Configuration is read from the environment:

    * ``AZURE_MODEL_PROJECT_ENDPOINT`` -- preferred project endpoint; when it
      is missing or empty, ``AZURE_AI_PROJECT_ENDPOINT`` is used instead.
    * ``AZURE_OPENAI_CHAT_DEPLOYMENT_NAME`` -- model deployment name; when it
      is missing or empty, ``"model-router"`` is used.

    All four agents share a single ``FoundryChatClient`` that authenticates
    with the module-level ``_credential``.

    Returns:
        The object produced by ``ConcurrentBuilder(...).build()`` with the
        triage, summary, comms and PIR agents as participants (in that order).

    Raises:
        KeyError: If ``AZURE_MODEL_PROJECT_ENDPOINT`` is missing or empty and
            ``AZURE_AI_PROJECT_ENDPOINT`` is not defined.
    """
    env = os.environ
    project_endpoint = env.get("AZURE_MODEL_PROJECT_ENDPOINT") or env["AZURE_AI_PROJECT_ENDPOINT"]
    # Use `or` instead of a .get() default: a variable that is defined but
    # empty (e.g. an unresolved deployment-template substitution) must fall
    # back to the default too, matching how the endpoint lookup above and
    # _presence() treat empty values.
    model = env.get("AZURE_OPENAI_CHAT_DEPLOYMENT_NAME") or "model-router"

    chat_client = FoundryChatClient(
        project_endpoint=project_endpoint,
        model=model,
        credential=_credential,
    )

    # (agent name, instruction prompt) for each specialist; the order here is
    # the participant order handed to the builder.
    specialist_specs = (
        ("triage-agent", TRIAGE_INSTRUCTIONS),
        ("summary-agent", SUMMARY_INSTRUCTIONS),
        ("comms-agent", COMMS_INSTRUCTIONS),
        ("pir-agent", PIR_INSTRUCTIONS),
    )
    participants = [
        Agent(client=chat_client, instructions=instructions, name=agent_name)
        for agent_name, instructions in specialist_specs
    ]

    # NOTE(review): intermediate_outputs=False presumably suppresses the
    # per-agent intermediate results -- confirm against ConcurrentBuilder docs.
    return ConcurrentBuilder(
        participants=participants,
        intermediate_outputs=False,
    ).build()
def main():
    """Build the multi-agent workflow and serve it on port 8088.

    The workflow returned by ``create_workflow()`` is exposed as a single
    agent named "oncall-copilot" and handed to ``ResponsesHostServer``.
    Progress messages are printed (and flushed) before each step.
    """
    print("[oncall-copilot] Building workflow...", flush=True)
    workflow = create_workflow()
    hosted_agent = workflow.as_agent(
        name="oncall-copilot",
        description="Runs triage, summary, comms, and PIR agents for incident response.",
    )

    print("[oncall-copilot] Starting server on port 8088...", flush=True)
    server = ResponsesHostServer(hosted_agent)
    server.run(port=8088)
# Script entry point: run the server and turn any startup/runtime failure
# into a logged message, a traceback, and a non-zero exit status.
if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        # Imported lazily: only needed on the failure path.
        import traceback

        print(f"[oncall-copilot] FATAL: {exc}", flush=True)
        traceback.print_exc()
        # Equivalent to sys.exit(1): terminate with exit status 1.
        raise SystemExit(1)