-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathProgram.cs
More file actions
186 lines (157 loc) · 8.62 KB
/
Copy pathProgram.cs
File metadata and controls
186 lines (157 loc) · 8.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
// MultiAgentRouting — demonstrates workflow-based routing (durable routing decision via
// activity) and parallel agent fan-out via ExecuteAgentsInParallelAsync, with OTel tracing.
//
// Run: dotnet run --project samples/MAF/MultiAgentRouting/MultiAgentRouting.csproj
using System.ClientModel;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MultiAgentRouting;
using OpenAI;
using OpenTelemetry;
using OpenTelemetry.Trace;
using Temporalio.Client;
using Temporalio.Extensions.Agents;
using Temporalio.Extensions.Hosting;
using Temporalio.Extensions.OpenTelemetry;
// ── Step 1: Configure OpenTelemetry ──────────────────────────────────────────
// Register all four activity sources:
// • TracingInterceptor.ClientSource — client outbound spans
// • TracingInterceptor.WorkflowsSource — workflow inbound/outbound spans
// • TracingInterceptor.ActivitiesSource — activity inbound spans
// • TemporalAgentTelemetry.ActivitySourceName — agent turn + client send spans
using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource(TracingInterceptor.ClientSource.Name)
.AddSource(TracingInterceptor.WorkflowsSource.Name)
.AddSource(TracingInterceptor.ActivitiesSource.Name)
.AddSource(TemporalAgentTelemetry.ActivitySourceName)
.AddConsoleExporter()
.Build();
// ── Step 2: Build the application host ───────────────────────────────────────
var builder = Host.CreateApplicationBuilder(args);
builder.Logging.SetMinimumLevel(LogLevel.Warning);
// ── Step 3: Load configuration ───────────────────────────────────────────────
var apiKey = builder.Configuration.GetValue<string>("OPENAI_API_KEY");
var apiBaseUrl = builder.Configuration.GetValue<string>("OPENAI_API_BASE_URL");
if (string.IsNullOrEmpty(apiKey))
throw new InvalidOperationException(
"OPENAI_API_KEY is not configured. Set it with: " +
"dotnet user-secrets set \"OPENAI_API_KEY\" \"sk-...\" --project samples/MAF/MultiAgentRouting");
if (string.IsNullOrEmpty(apiBaseUrl))
throw new InvalidOperationException("OPENAI_API_BASE_URL is not configured in appsettings.json.");
const string model = "gpt-4o-mini";
var temporalAddress = builder.Configuration.GetValue<string>("TEMPORAL_ADDRESS") ?? "localhost:7233";
var openAiClient = new OpenAIClient(
new ApiKeyCredential(apiKey),
new OpenAIClientOptions { Endpoint = new Uri(apiBaseUrl) });
// ── Step 4: Register the IChatClient in DI ───────────────────────────────────
builder.Services.AddChatClient(openAiClient.GetChatClient(model).AsIChatClient());
// ── Step 5: Register ITemporalClient with TracingInterceptor ─────────────────
// The TracingInterceptor propagates OTel context across Temporal calls.
builder.Services.AddTemporalClient(opts =>
{
opts.TargetHost = temporalAddress;
opts.Namespace = "default";
opts.Interceptors = [new TracingInterceptor()];
});
// ── Step 6: Register the hosted worker with all three specialist agents ──────
builder.Services
.AddHostedTemporalWorker("agents")
.AddTemporalAgents(opts =>
{
opts.AddDurableAgent("WeatherAgent", agent =>
{
agent.Instructions =
"You are a weather specialist. Answer questions about weather conditions, forecasts, " +
"climate patterns, and meteorological phenomena. Keep responses concise and informative.";
agent.ChatClient = sp => sp.GetRequiredService<IChatClient>();
agent.TimeToLive = TimeSpan.FromHours(1); // shortened for demo; production default is 14 days
});
opts.AddDurableAgent("BillingAgent", agent =>
{
agent.Instructions =
"You are a billing and payments specialist. Answer questions about invoices, charges, " +
"payment methods, refunds, and account billing. Keep responses concise and informative.";
agent.ChatClient = sp => sp.GetRequiredService<IChatClient>();
agent.TimeToLive = TimeSpan.FromHours(1); // shortened for demo; production default is 14 days
});
opts.AddDurableAgent("TechSupportAgent", agent =>
{
agent.Instructions =
"You are a technical support specialist. Answer questions about software issues, " +
"hardware problems, troubleshooting steps, and technical configurations. " +
"Keep responses concise and informative.";
agent.ChatClient = sp => sp.GetRequiredService<IChatClient>();
agent.TimeToLive = TimeSpan.FromHours(1); // shortened for demo; production default is 14 days
});
})
.AddWorkflow<RoutingWorkflow>()
.AddWorkflow<ParallelAgentWorkflow>()
// Singleton: RoutingActivities is stateless (keyword scoring only), so one shared
// instance is safe and avoids unnecessary allocations on every activity execution.
.AddSingletonActivities<RoutingActivities>();
// ── Step 7: Start the host ───────────────────────────────────────────────────
var host = builder.Build();
await host.StartAsync();
Console.WriteLine("Worker started.\n");
// ── Step 8: Demonstrate workflow-based routing ───────────────────────────────
// Each question is dispatched to a RoutingWorkflow. The routing decision runs
// inside a RoutingActivities.ClassifyRequest activity — it is recorded in the
// workflow event history and is fully durable.
var client = host.Services.GetRequiredService<ITemporalClient>();
Console.WriteLine("── Demonstrating workflow-based routing ────────────────────");
var routingExamples = new[]
{
(Id: "session-weather-001", Question: "Will it rain in Seattle tomorrow?"),
(Id: "session-billing-001", Question: "Why was I charged twice on my last invoice?"),
(Id: "session-tech-001", Question: "My application keeps crashing with a null reference exception."),
};
foreach (var (sessionId, question) in routingExamples)
{
Console.WriteLine($"\nUser: {question}");
var workflowId = $"routing-{sessionId}-{Guid.NewGuid():N}";
var handle = await client.StartWorkflowAsync(
(RoutingWorkflow wf) => wf.RunAsync(question),
new WorkflowOptions { Id = workflowId, TaskQueue = "agents" });
try
{
var result = await handle.GetResultAsync();
Console.WriteLine($"Agent: {result}");
}
catch (Exception ex)
{
Console.WriteLine($"Workflow failed: {ex.Message}");
}
}
// ── Step 9: Demonstrate parallel execution ───────────────────────────────────
Console.WriteLine("\n── Demonstrating parallel agent execution ──────────────────");
var parallelQuery = "Briefly introduce yourself and what you can help with.";
Console.WriteLine($"\nFan-out query (sent to all 3 agents simultaneously): \"{parallelQuery}\"\n");
var parallelWorkflowId = $"multi-agent-parallel-{Guid.NewGuid():N}";
var parallelHandle = await client.StartWorkflowAsync(
(ParallelAgentWorkflow wf) => wf.RunAsync(parallelQuery),
new WorkflowOptions { Id = parallelWorkflowId, TaskQueue = "agents" });
Console.WriteLine($"Parallel workflow started: {parallelWorkflowId}");
string[] parallelResults;
try
{
parallelResults = await parallelHandle.GetResultAsync();
}
catch (Exception ex)
{
Console.WriteLine($"Parallel workflow failed: {ex.Message}\n");
try { await host.StopAsync(); } catch (OperationCanceledException) { }
return;
}
Console.WriteLine("\nParallel responses:");
var agentNames = new[] { "WeatherAgent", "BillingAgent", "TechSupportAgent" };
for (var i = 0; i < parallelResults.Length; i++)
{
Console.WriteLine($"\n[{agentNames[i]}]: {parallelResults[i]}");
}
// ── Step 10: Graceful shutdown ───────────────────────────────────────────────
try { await host.StopAsync(); } catch (OperationCanceledException) { }
tracerProvider?.ForceFlush();
Console.WriteLine("\nDone.");