Skip to content

Commit d678c04

Browse files
committed
Add Nexus support to ContextPropagation sample
1 parent ba4711b commit d678c04

13 files changed

Lines changed: 154 additions & 13 deletions

.editorconfig

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ dotnet_diagnostic.SA1513.severity = none
9898
dotnet_diagnostic.SA1515.severity = none
9999

100100
# Do not require XML doc in samples
101+
dotnet_diagnostic.CS1591.severity = none
101102
dotnet_diagnostic.SA1600.severity = none
102103
dotnet_diagnostic.SA1602.severity = none
103104

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ obj/
77
/.vs
88
/.vscode
99
/.idea
10+
.claude/

Directory.Build.props

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,8 @@
55
<Authors>Temporal</Authors>
66
<ContinuousIntegrationBuild Condition="'$(GITHUB_ACTIONS)' == 'true'">true</ContinuousIntegrationBuild>
77
<EnableNETAnalyzers>true</EnableNETAnalyzers>
8-
<!--
9-
TODO(cretz): Reenable when https://github.com/dotnet/format/issues/1800 fixed
108
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
11-
-->
9+
<GenerateDocumentationFile>true</GenerateDocumentationFile>
1210
<ImplicitUsings>enable</ImplicitUsings>
1311
<Nullable>enable</Nullable>
1412
<PackageLicenseExpression>MIT</PackageLicenseExpression>

src/ContextPropagation/ContextPropagationInterceptor.cs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
namespace TemporalioSamples.ContextPropagation;
22

33
using System.Threading.Tasks;
4+
using NexusRpc.Handlers;
45
using Temporalio.Api.Common.V1;
56
using Temporalio.Client;
67
using Temporalio.Client.Interceptors;
@@ -39,6 +40,10 @@ public WorkflowInboundInterceptor InterceptWorkflow(WorkflowInboundInterceptor n
3940
public ActivityInboundInterceptor InterceptActivity(ActivityInboundInterceptor nextInterceptor) =>
4041
new ContextPropagationActivityInboundInterceptor(this, nextInterceptor);
4142

43+
public NexusOperationInboundInterceptor InterceptNexusOperation(
44+
NexusOperationInboundInterceptor nextInterceptor) =>
45+
new ContextPropagationNexusOperationInboundInterceptor(this, nextInterceptor);
46+
4247
private Dictionary<string, Payload> HeaderFromContext(IDictionary<string, Payload>? existing)
4348
{
4449
var ret = existing != null ?
@@ -67,6 +72,28 @@ private TResult WithHeadersApplied<TResult>(
6772
return func();
6873
}
6974

75+
private Dictionary<string, string> HeaderFromContextForNexus(IDictionary<string, string>? existing)
76+
{
77+
var ret = existing != null ?
78+
new Dictionary<string, string>(existing) : new Dictionary<string, string>(1);
79+
// Nexus headers are string-based, so serialize context value to JSON.
80+
// Alternative approach: could use payload converter and put entire payload as JSON on header.
81+
ret[headerKey] = System.Text.Json.JsonSerializer.Serialize(context.Value);
82+
return ret;
83+
}
84+
85+
private Task<TResult> WithHeadersAppliedForNexusAsync<TResult>(
86+
IReadOnlyDictionary<string, string>? headers, Func<Task<TResult>> func)
87+
{
88+
if (headers?.TryGetValue(headerKey, out var value) == true)
89+
{
90+
// Deserialize can return null for nullable types, which is expected
91+
context.Value = System.Text.Json.JsonSerializer.Deserialize<T>(value)!;
92+
}
93+
// These are async local, no need to unapply afterwards
94+
return func();
95+
}
96+
7097
private class ContextPropagationClientOutboundInterceptor : ClientOutboundInterceptor
7198
{
7299
private readonly ContextPropagationInterceptor<T> root;
@@ -153,6 +180,11 @@ public override Task<ChildWorkflowHandle<TWorkflow, TResult>> StartChildWorkflow
153180
StartChildWorkflowInput input) =>
154181
Next.StartChildWorkflowAsync<TWorkflow, TResult>(
155182
input with { Headers = root.HeaderFromContext(input.Headers) });
183+
184+
public override Task<NexusOperationHandle<TResult>> StartNexusOperationAsync<TResult>(
185+
StartNexusOperationInput input) =>
186+
Next.StartNexusOperationAsync<TResult>(
187+
input with { Headers = root.HeaderFromContextForNexus(input.Headers) });
156188
}
157189

158190
private class ContextPropagationActivityInboundInterceptor : ActivityInboundInterceptor
@@ -166,4 +198,19 @@ public ContextPropagationActivityInboundInterceptor(
166198
public override Task<object?> ExecuteActivityAsync(ExecuteActivityInput input) =>
167199
root.WithHeadersApplied(input.Headers, () => Next.ExecuteActivityAsync(input));
168200
}
201+
202+
private class ContextPropagationNexusOperationInboundInterceptor : NexusOperationInboundInterceptor
203+
{
204+
private readonly ContextPropagationInterceptor<T> root;
205+
206+
public ContextPropagationNexusOperationInboundInterceptor(
207+
ContextPropagationInterceptor<T> root, NexusOperationInboundInterceptor next)
208+
: base(next) => this.root = root;
209+
210+
public override Task<OperationStartResult<object?>> ExecuteNexusOperationStartAsync(
211+
ExecuteNexusOperationStartInput input) =>
212+
root.WithHeadersAppliedForNexusAsync(
213+
input.Context.Headers,
214+
() => base.ExecuteNexusOperationStartAsync(input));
215+
}
169216
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
namespace TemporalioSamples.ContextPropagation;
2+
3+
using NexusRpc;
4+
5+
[NexusService]
6+
public interface INexusGreetingService
7+
{
8+
static readonly string EndpointName = "context-propagation-greeting-service";
9+
10+
[NexusOperation]
11+
GreetingOutput SayGreeting(GreetingInput input);
12+
13+
public record GreetingInput(string Name);
14+
15+
public record GreetingOutput(string Message);
16+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace TemporalioSamples.ContextPropagation;
2+
3+
using System.Threading.Tasks;
4+
using Microsoft.Extensions.Logging;
5+
using Temporalio.Workflows;
6+
7+
[Workflow]
8+
public class NexusGreetingHandlerWorkflow
9+
{
10+
[WorkflowQuery]
11+
public string? CapturedUserId { get; private set; }
12+
13+
[WorkflowRun]
14+
public async Task<INexusGreetingService.GreetingOutput> RunAsync(
15+
INexusGreetingService.GreetingInput input)
16+
{
17+
// Capture context to prove propagation through Nexus to handler workflow
18+
CapturedUserId = MyContext.UserId;
19+
Workflow.Logger.LogInformation(
20+
"Handler workflow executing for {Name}, called by user {UserId}",
21+
input.Name,
22+
CapturedUserId);
23+
24+
var message = $"Greeting for {input.Name} (processed by user: {CapturedUserId})";
25+
return await Task.FromResult(new INexusGreetingService.GreetingOutput(message));
26+
}
27+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
namespace TemporalioSamples.ContextPropagation;
2+
3+
using Microsoft.Extensions.Logging;
4+
using NexusRpc.Handlers;
5+
using Temporalio.Nexus;
6+
7+
[NexusServiceHandler(typeof(INexusGreetingService))]
8+
public class NexusGreetingService
9+
{
10+
[NexusOperationHandler]
11+
public IOperationHandler<INexusGreetingService.GreetingInput, INexusGreetingService.GreetingOutput> SayGreeting() =>
12+
WorkflowRunOperationHandler.FromHandleFactory(
13+
(WorkflowRunOperationContext context, INexusGreetingService.GreetingInput input) =>
14+
{
15+
// Log context to show it was propagated to the handler
16+
NexusOperationExecutionContext.Current.Logger.LogInformation(
17+
"Nexus greeting service called by user {UserId}", MyContext.UserId);
18+
19+
return context.StartWorkflowAsync(
20+
(NexusGreetingHandlerWorkflow wf) => wf.RunAsync(input),
21+
new() { Id = context.HandlerContext.RequestId });
22+
});
23+
}

src/ContextPropagation/Program.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ async Task RunWorkerAsync()
4040
client,
4141
new TemporalWorkerOptions(taskQueue: "interceptors-sample").
4242
AddAllActivities<SayHelloActivities>(new()).
43-
AddWorkflow<SayHelloWorkflow>());
43+
AddWorkflow<SayHelloWorkflow>().
44+
AddNexusService(new NexusGreetingService()).
45+
AddWorkflow<NexusGreetingHandlerWorkflow>());
4446
try
4547
{
4648
await worker.ExecuteAsync(tokenSource.Token);

src/ContextPropagation/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# Interceptors
22

3-
This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout the
4-
workflows and activities. While this demonstrates context propagation specifically, it can also be used to show how to
5-
create interceptors for any other purpose.
3+
This sample demonstrates how to use interceptors to propagate contextual information from an `AsyncLocal` throughout
4+
workflows, activities, and Nexus operations. While this demonstrates context propagation specifically, it can also be
5+
used to show how to create interceptors for any other purpose.
66

77
To run, first see [README.md](../../README.md) for prerequisites. Then, run the following from this directory in a
88
separate terminal to start the worker:
@@ -14,4 +14,4 @@ Then in another terminal, run the workflow from this directory:
1414
dotnet run workflow
1515

1616
The workflow terminal will show the completed workflow result and the worker terminal will show the contextual user ID
17-
is present in the workflow and activity.
17+
is present in the workflow, Nexus operation handler, Nexus handler workflow, and activity.

src/ContextPropagation/SayHelloWorkflow.workflow.cs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,16 @@ public async Task<string> RunAsync(string name)
1313
{
1414
Workflow.Logger.LogInformation("Workflow called by user {UserId}", MyContext.UserId);
1515

16-
// Wait for signal then run activity
16+
// Wait for signal then call Nexus service and run activity
1717
await Workflow.WaitConditionAsync(() => complete);
18+
19+
// Call Nexus service to demonstrate context propagation through Nexus
20+
var nexusClient = Workflow.CreateNexusClient<INexusGreetingService>(
21+
INexusGreetingService.EndpointName);
22+
var nexusResult = await nexusClient.ExecuteNexusOperationAsync(
23+
svc => svc.SayGreeting(new(name)));
24+
Workflow.Logger.LogInformation("Nexus result: {Result}", nexusResult.Message);
25+
1826
return await Workflow.ExecuteActivityAsync(
1927
(SayHelloActivities act) => act.SayHello(name),
2028
new() { StartToCloseTimeout = TimeSpan.FromMinutes(5) });

0 commit comments

Comments
 (0)