Skip to content

Commit 4b62d60

Browse files
authored
Nexus context propagation sample (temporalio#108)
Fixes temporalio#105
1 parent 0c5586f commit 4b62d60

19 files changed

Lines changed: 704 additions & 31 deletions

Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
</PropertyGroup>
2020

2121
<ItemGroup>
22-
<PackageReference Include="Temporalio" Version="1.7.0" />
23-
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.6.0" />
24-
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.6.0" />
25-
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.6.0" />
22+
<PackageReference Include="Temporalio" Version="1.9.0" />
23+
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.9.0" />
24+
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.9.0" />
25+
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.9.0" />
2626
<!--
2727
Can also reference the SDK downloaded to a local directory:
2828
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Prerequisites:
2323
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
2424
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
2525
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
26+
* [NexusContextPropagation](src/NexusContextPropagation) - Context propagation through Nexus services.
2627
* [OpenTelemetry](src/OpenTelemetry) - Demonstrates how to set up OpenTelemetry tracing and metrics for both the client and worker, using both the .NET metrics API and internal forwarding from the Core SDK.
2728
* [Patching](src/Patching) - Alter workflows safely with Patch and DeprecatePatch.
2829
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.

TemporalioSamples.sln

Lines changed: 294 additions & 4 deletions
Large diffs are not rendered by default.

src/ContextPropagation/MyContext.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,11 @@ namespace TemporalioSamples.ContextPropagation;
22

33
public static class MyContext
44
{
5-
public static readonly AsyncLocal<string> UserId = new();
5+
public static readonly AsyncLocal<string?> UserIdLocal = new();
6+
7+
public static string UserId
8+
{
9+
get => UserIdLocal.Value ?? "<unknown>";
10+
set => UserIdLocal.Value = value;
11+
}
612
}

src/ContextPropagation/Program.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
{
1616
LoggerFactory = loggerFactory,
1717
// This is where we set the interceptor to propagate context
18-
Interceptors = new[]
19-
{
20-
new ContextPropagationInterceptor<string>(
21-
MyContext.UserId,
18+
Interceptors =
19+
[
20+
new ContextPropagationInterceptor<string?>(
21+
MyContext.UserIdLocal,
2222
DataConverter.Default.PayloadConverter),
23-
},
23+
],
2424
});
2525

2626
async Task RunWorkerAsync()
@@ -53,7 +53,7 @@ async Task RunWorkerAsync()
5353
async Task ExecuteWorkflowAsync()
5454
{
5555
// Set our user ID that can be accessed in the workflow and activity
56-
MyContext.UserId.Value = "some-user";
56+
MyContext.UserId = "some-user";
5757

5858
// Start workflow, send signal, wait for completion, issue query
5959
logger.LogInformation("Executing workflow");

src/ContextPropagation/SayHelloActivities.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ public class SayHelloActivities
99
public string SayHello(string name)
1010
{
1111
ActivityExecutionContext.Current.Logger.LogInformation(
12-
"Activity called by user {UserId}",
13-
MyContext.UserId.Value);
12+
"Activity called by user {UserId}", MyContext.UserId);
1413
return $"Hello, {name}!";
1514
}
1615
}

src/ContextPropagation/SayHelloWorkflow.workflow.cs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@ public class SayHelloWorkflow
1111
[WorkflowRun]
1212
public async Task<string> RunAsync(string name)
1313
{
14-
Workflow.Logger.LogInformation(
15-
"Workflow called by user {UserId}",
16-
MyContext.UserId.Value);
14+
Workflow.Logger.LogInformation("Workflow called by user {UserId}", MyContext.UserId);
1715

1816
// Wait for signal then run activity
1917
await Workflow.WaitConditionAsync(() => complete);
@@ -25,18 +23,14 @@ public async Task<string> RunAsync(string name)
2523
[WorkflowSignal]
2624
public async Task SignalCompleteAsync()
2725
{
28-
Workflow.Logger.LogInformation(
29-
"Signal called by user {UserId}",
30-
MyContext.UserId.Value);
26+
Workflow.Logger.LogInformation("Signal called by user {UserId}", MyContext.UserId);
3127
complete = true;
3228
}
3329

3430
[WorkflowQuery]
3531
public bool IsComplete()
3632
{
37-
Workflow.Logger.LogInformation(
38-
"Query called by user {UserId}",
39-
MyContext.UserId.Value);
33+
Workflow.Logger.LogInformation("Query called by user {UserId}", MyContext.UserId);
4034
return complete;
4135
}
4236
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
namespace TemporalioSamples.NexusContextPropagation.Caller;
2+
3+
using Microsoft.Extensions.Logging;
4+
using Temporalio.Workflows;
5+
using TemporalioSamples.ContextPropagation;
6+
7+
[Workflow]
8+
public class HelloCallerWorkflow
9+
{
10+
[WorkflowRun]
11+
public async Task<string> RunAsync(string name, IHelloService.HelloLanguage language)
12+
{
13+
Workflow.Logger.LogInformation("Caller workflow called by user {UserId}", MyContext.UserId);
14+
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
15+
ExecuteNexusOperationAsync(svc => svc.SayHello(new(name, language)));
16+
return output.Message;
17+
}
18+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace TemporalioSamples.NexusContextPropagation.Handler;
2+
3+
using Microsoft.Extensions.Logging;
4+
using Temporalio.Exceptions;
5+
using Temporalio.Workflows;
6+
using TemporalioSamples.ContextPropagation;
7+
8+
[Workflow]
9+
public class HelloHandlerWorkflow
10+
{
11+
[WorkflowRun]
12+
public async Task<IHelloService.HelloOutput> RunAsync(IHelloService.HelloInput input)
13+
{
14+
Workflow.Logger.LogInformation("Handler workflow called by user {UserId}", MyContext.UserId);
15+
var message = input.Language switch
16+
{
17+
IHelloService.HelloLanguage.En => $"Hello {input.Name} 👋",
18+
IHelloService.HelloLanguage.Fr => $"Bonjour {input.Name} 👋",
19+
IHelloService.HelloLanguage.De => $"Hallo {input.Name} 👋",
20+
IHelloService.HelloLanguage.Es => $"¡Hola! {input.Name} 👋",
21+
IHelloService.HelloLanguage.Tr => $"Merhaba {input.Name} 👋",
22+
_ => throw new ApplicationFailureException(
23+
$"Unsupported language: {input.Language}", errorType: "UNSUPPORTED_LANGUAGE"),
24+
};
25+
return new($"{message} (user id: {MyContext.UserId})");
26+
}
27+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
namespace TemporalioSamples.NexusContextPropagation.Handler;
2+
3+
using Microsoft.Extensions.Logging;
4+
using NexusRpc.Handlers;
5+
using Temporalio.Nexus;
6+
using TemporalioSamples.ContextPropagation;
7+
8+
[NexusServiceHandler(typeof(IHelloService))]
9+
public class HelloService
10+
{
11+
[NexusOperationHandler]
12+
public IOperationHandler<IHelloService.HelloInput, IHelloService.HelloOutput> SayHello() =>
13+
// This Nexus service operation is backed by a workflow run
14+
WorkflowRunOperationHandler.FromHandleFactory(
15+
(WorkflowRunOperationContext context, IHelloService.HelloInput input) =>
16+
{
17+
NexusOperationExecutionContext.Current.Logger.LogInformation(
18+
"Hello service called by user {UserId}", MyContext.UserId);
19+
return context.StartWorkflowAsync(
20+
(HelloHandlerWorkflow wf) => wf.RunAsync(input),
21+
// Workflow IDs should typically be business meaningful IDs and are used to
22+
// dedupe workflow starts. For this example, we're using the request ID
23+
// allocated by Temporal when the caller workflow schedules the operation,
24+
// this ID is guaranteed to be stable across retries of this operation.
25+
new() { Id = context.HandlerContext.RequestId });
26+
});
27+
}

0 commit comments

Comments
 (0)