Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Temporalio" Version="1.7.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.6.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.6.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.6.0" />
<PackageReference Include="Temporalio" Version="1.9.0" />
<PackageReference Include="Temporalio.Extensions.DiagnosticSource" Version="1.9.0" />
<PackageReference Include="Temporalio.Extensions.Hosting" Version="1.9.0" />
<PackageReference Include="Temporalio.Extensions.OpenTelemetry" Version="1.9.0" />
<!--
Can also reference the SDK downloaded to a local directory:
<ProjectReference Include="$(MSBuildThisFileDirectory)..\temporal-sdk-dotnet\src\Temporalio\Temporalio.csproj" />
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Prerequisites:
* [DependencyInjection](src/DependencyInjection) - How to inject dependencies in activities and use generic hosts for workers
* [Encryption](src/Encryption) - End-to-end encryption with Temporal payload codecs.
* [Mutex](src/Mutex) - How to implement a mutex as a workflow. Demonstrates how to avoid race conditions or parallel mutually exclusive operations on the same resource.
* [NexusContextPropagation](src/NexusContextPropagation) - Context propagation through Nexus services.
* [OpenTelemetry](src/OpenTelemetry) - Demonstrates how to set up OpenTelemetry tracing and metrics for both the client and worker, using both the .NET metrics API and internal forwarding from the Core SDK.
* [Patching](src/Patching) - Alter workflows safely with Patch and DeprecatePatch.
* [Polling](src/Polling) - Recommended implementation of an activity that needs to periodically poll an external resource waiting its successful completion.
Expand Down
298 changes: 294 additions & 4 deletions TemporalioSamples.sln

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion src/ContextPropagation/MyContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,11 @@ namespace TemporalioSamples.ContextPropagation;

public static class MyContext
{
public static readonly AsyncLocal<string> UserId = new();
public static readonly AsyncLocal<string?> UserIdLocal = new();

public static string UserId
{
get => UserIdLocal.Value ?? "<unknown>";
set => UserIdLocal.Value = value;
}
}
12 changes: 6 additions & 6 deletions src/ContextPropagation/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
{
LoggerFactory = loggerFactory,
// This is where we set the interceptor to propagate context
Interceptors = new[]
{
new ContextPropagationInterceptor<string>(
MyContext.UserId,
Interceptors =
[
new ContextPropagationInterceptor<string?>(
MyContext.UserIdLocal,
DataConverter.Default.PayloadConverter),
},
],
});

async Task RunWorkerAsync()
Expand Down Expand Up @@ -53,7 +53,7 @@ async Task RunWorkerAsync()
async Task ExecuteWorkflowAsync()
{
// Set our user ID that can be accessed in the workflow and activity
MyContext.UserId.Value = "some-user";
MyContext.UserId = "some-user";

// Start workflow, send signal, wait for completion, issue query
logger.LogInformation("Executing workflow");
Expand Down
3 changes: 1 addition & 2 deletions src/ContextPropagation/SayHelloActivities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ public class SayHelloActivities
public string SayHello(string name)
{
ActivityExecutionContext.Current.Logger.LogInformation(
"Activity called by user {UserId}",
MyContext.UserId.Value);
"Activity called by user {UserId}", MyContext.UserId);
return $"Hello, {name}!";
}
}
12 changes: 3 additions & 9 deletions src/ContextPropagation/SayHelloWorkflow.workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ public class SayHelloWorkflow
[WorkflowRun]
public async Task<string> RunAsync(string name)
{
Workflow.Logger.LogInformation(
"Workflow called by user {UserId}",
MyContext.UserId.Value);
Workflow.Logger.LogInformation("Workflow called by user {UserId}", MyContext.UserId);

// Wait for signal then run activity
await Workflow.WaitConditionAsync(() => complete);
Expand All @@ -25,18 +23,14 @@ public async Task<string> RunAsync(string name)
[WorkflowSignal]
public async Task SignalCompleteAsync()
{
Workflow.Logger.LogInformation(
"Signal called by user {UserId}",
MyContext.UserId.Value);
Workflow.Logger.LogInformation("Signal called by user {UserId}", MyContext.UserId);
complete = true;
}

[WorkflowQuery]
public bool IsComplete()
{
Workflow.Logger.LogInformation(
"Query called by user {UserId}",
MyContext.UserId.Value);
Workflow.Logger.LogInformation("Query called by user {UserId}", MyContext.UserId);
return complete;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
namespace TemporalioSamples.NexusContextPropagation.Caller;

using Microsoft.Extensions.Logging;
using Temporalio.Workflows;
using TemporalioSamples.ContextPropagation;

[Workflow]
public class HelloCallerWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync(string name, IHelloService.HelloLanguage language)
{
Workflow.Logger.LogInformation("Caller workflow called by user {UserId}", MyContext.UserId);
var output = await Workflow.CreateNexusClient<IHelloService>(IHelloService.EndpointName).
ExecuteNexusOperationAsync(svc => svc.SayHello(new(name, language)));
return output.Message;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace TemporalioSamples.NexusContextPropagation.Handler;

using Microsoft.Extensions.Logging;
using Temporalio.Exceptions;
using Temporalio.Workflows;
using TemporalioSamples.ContextPropagation;

[Workflow]
public class HelloHandlerWorkflow
{
[WorkflowRun]
public async Task<IHelloService.HelloOutput> RunAsync(IHelloService.HelloInput input)
{
Workflow.Logger.LogInformation("Handler workflow called by user {UserId}", MyContext.UserId);
var message = input.Language switch
{
IHelloService.HelloLanguage.En => $"Hello {input.Name} 👋",
IHelloService.HelloLanguage.Fr => $"Bonjour {input.Name} 👋",
IHelloService.HelloLanguage.De => $"Hallo {input.Name} 👋",
IHelloService.HelloLanguage.Es => $"¡Hola! {input.Name} 👋",
IHelloService.HelloLanguage.Tr => $"Merhaba {input.Name} 👋",
_ => throw new ApplicationFailureException(
$"Unsupported language: {input.Language}", errorType: "UNSUPPORTED_LANGUAGE"),
};
return new($"{message} (user id: {MyContext.UserId})");
}
}
27 changes: 27 additions & 0 deletions src/NexusContextPropagation/Handler/HelloService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
namespace TemporalioSamples.NexusContextPropagation.Handler;

using Microsoft.Extensions.Logging;
using NexusRpc.Handlers;
using Temporalio.Nexus;
using TemporalioSamples.ContextPropagation;

[NexusServiceHandler(typeof(IHelloService))]
public class HelloService
{
[NexusOperationHandler]
public IOperationHandler<IHelloService.HelloInput, IHelloService.HelloOutput> SayHello() =>
// This Nexus service operation is backed by a workflow run
WorkflowRunOperationHandler.FromHandleFactory(
(WorkflowRunOperationContext context, IHelloService.HelloInput input) =>
{
NexusOperationExecutionContext.Current.Logger.LogInformation(
"Hello service called by user {UserId}", MyContext.UserId);
return context.StartWorkflowAsync(
(HelloHandlerWorkflow wf) => wf.RunAsync(input),
// Workflow IDs should typically be business meaningful IDs and are used to
// dedupe workflow starts. For this example, we're using the request ID
// allocated by Temporal when the caller workflow schedules the operation,
// this ID is guaranteed to be stable across retries of this operation.
new() { Id = context.HandlerContext.RequestId });
});
}
25 changes: 25 additions & 0 deletions src/NexusContextPropagation/IHelloService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
namespace TemporalioSamples.NexusContextPropagation;

using NexusRpc;

[NexusService]
public interface IHelloService
{
static readonly string EndpointName = "nexus-context-propagation-endpoint";

[NexusOperation]
HelloOutput SayHello(HelloInput input);

public record HelloInput(string Name, HelloLanguage Language);

public record HelloOutput(string Message);

public enum HelloLanguage
{
En,
Fr,
De,
Es,
Tr,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
namespace TemporalioSamples.NexusContextPropagation;

using System.Threading.Tasks;
using NexusRpc.Handlers;
using Temporalio.Client.Interceptors;
using Temporalio.Worker.Interceptors;
using Temporalio.Workflows;

public class NexusContextPropagationInterceptor(
AsyncLocal<string?> context,
string headerKey = "__my_context_key") : IClientInterceptor, IWorkerInterceptor
{
public NexusOperationInboundInterceptor InterceptNexusOperation(
NexusOperationInboundInterceptor nextInterceptor) =>
new NexusOperationInbound(context, headerKey, nextInterceptor);

public WorkflowInboundInterceptor InterceptWorkflow(
WorkflowInboundInterceptor nextInterceptor) =>
new WorkflowInbound(context, headerKey, nextInterceptor);

private class NexusOperationInbound(
AsyncLocal<string?> context,
string headerKey,
NexusOperationInboundInterceptor next) : NexusOperationInboundInterceptor(next)
{
public override Task<OperationStartResult<object?>> ExecuteNexusOperationStartAsync(
ExecuteNexusOperationStartInput input)
{
if (input.Context.Headers?.TryGetValue(headerKey, out var value) == true)
{
context.Value = value;
}
return base.ExecuteNexusOperationStartAsync(input);
}
}

private class WorkflowInbound(
AsyncLocal<string?> context,
string headerKey,
WorkflowInboundInterceptor next) : WorkflowInboundInterceptor(next)
{
public override void Init(WorkflowOutboundInterceptor outbound) =>
base.Init(new WorkflowOutbound(context, headerKey, outbound));
}

private class WorkflowOutbound(
AsyncLocal<string?> context,
string headerKey,
WorkflowOutboundInterceptor next) : WorkflowOutboundInterceptor(next)
{
public override Task<NexusOperationHandle<TResult>> StartNexusOperationAsync<TResult>(
StartNexusOperationInput input)
{
if (context.Value is { } value)
{
Dictionary<string, string> headers =
input.Headers != null ? new(input.Headers!) : new(1);
headers.Add(headerKey, value);
input = input with { Headers = headers };
}
return base.StartNexusOperationAsync<TResult>(input);
}
}
}
106 changes: 106 additions & 0 deletions src/NexusContextPropagation/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Converters;
using Temporalio.Worker;
using TemporalioSamples.ContextPropagation;
using TemporalioSamples.NexusContextPropagation;
using TemporalioSamples.NexusContextPropagation.Caller;
using TemporalioSamples.NexusContextPropagation.Handler;

using var loggerFactory = LoggerFactory.Create(builder =>
builder.
AddSimpleConsole(options => options.TimestampFormat = "[HH:mm:ss] ").
SetMinimumLevel(LogLevel.Information));
var logger = loggerFactory.CreateLogger<Program>();

// Cancellation token cancelled on ctrl+c
using var tokenSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, eventArgs) =>
{
tokenSource.Cancel();
eventArgs.Cancel = true;
};

Task<TemporalClient> ConnectClientAsync(string temporalNamespace) =>
TemporalClient.ConnectAsync(new("localhost:7233")
{
Namespace = temporalNamespace,
LoggerFactory = loggerFactory,
// This is where we set the interceptor to propagate context
Interceptors =
[
new ContextPropagationInterceptor<string?>(
MyContext.UserIdLocal,
DataConverter.Default.PayloadConverter),
// Separate interceptor just for moving in and out of Nexus operation headers. This could
// have been implemented in the ContextPropagationInterceptor, but for sample logic
// separation, it was added as a separate interceptor in this project instead.
new NexusContextPropagationInterceptor(MyContext.UserIdLocal),
],
});

async Task RunHandlerWorkerAsync()
{
// Run worker until cancelled
logger.LogInformation("Running handler worker");
using var worker = new TemporalWorker(
await ConnectClientAsync("nexus-context-propagation-handler-namespace"),
new TemporalWorkerOptions(taskQueue: "nexus-context-propagation-handler-sample").
AddNexusService(new HelloService()).
AddWorkflow<HelloHandlerWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
logger.LogInformation("Handler worker cancelled");
}
}

async Task RunCallerWorkerAsync()
{
// Run worker until cancelled
logger.LogInformation("Running caller worker");
using var worker = new TemporalWorker(
await ConnectClientAsync("nexus-context-propagation-caller-namespace"),
new TemporalWorkerOptions(taskQueue: "nexus-context-propagation-caller-sample").
AddWorkflow<HelloCallerWorkflow>());
try
{
await worker.ExecuteAsync(tokenSource.Token);
}
catch (OperationCanceledException)
{
logger.LogInformation("Caller worker cancelled");
}
}

async Task ExecuteCallerWorkflowAsync()
{
// Set our user ID that can be accessed in the workflows and Nexus service
MyContext.UserId = "some-user";

logger.LogInformation("Executing caller workflow");
var client = await ConnectClientAsync("nexus-context-propagation-caller-namespace");
var result = await client.ExecuteWorkflowAsync(
(HelloCallerWorkflow wf) => wf.RunAsync("Temporal", IHelloService.HelloLanguage.Es),
new(id: "nexus-context-propagation-id", taskQueue: "nexus-context-propagation-caller-sample"));
logger.LogInformation("Workflow result: {Result}", result);
}

switch (args.ElementAtOrDefault(0))
{
case "handler-worker":
await RunHandlerWorkerAsync();
break;
case "caller-worker":
await RunCallerWorkerAsync();
break;
case "caller-workflow":
await ExecuteCallerWorkflowAsync();
break;
default:
throw new ArgumentException(
"Must pass 'handler-worker', 'caller-worker', or 'caller-workflow' as the single argument");
}
Loading
Loading