Skip to content

Commit

Permalink
Add nats
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaslorentz committed Apr 24, 2023
1 parent 40bf025 commit 3058527
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 17 deletions.
7 changes: 7 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ services:
timeout: 5s
retries: 5

nats:
image: nats
ports:
- 4222:4222
- 6222:6222
- 8222:8222

volumes:
mysql:
sqlserver:
Expand Down
2 changes: 1 addition & 1 deletion src/LLL.DurableTask.EFCore/EFCoreOrchestrationOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ public class EFCoreOrchestrationOptions
public TimeSpan OrchestrationLockTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan ActivtyLockTimeout { get; set; } = TimeSpan.FromMinutes(1);
public TimeSpan FetchNewMessagesPollingTimeout { get; set; } = TimeSpan.FromSeconds(10);
public int DelayInSecondsAfterFailure { get; set; } = 5;
public int DelayInSecondsAfterFailure { get; set; } = 0;
}
}
47 changes: 42 additions & 5 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NATS.Client;

namespace LLL.DurableTask.EFCore
{
Expand All @@ -30,6 +31,8 @@ public partial class EFCoreOrchestrationService :
private readonly ILogger<EFCoreOrchestrationService> _logger;

private CancellationTokenSource _stopCts = new CancellationTokenSource();
private IConnection _connection;
private IAsyncSubscription _subscription;

public EFCoreOrchestrationService(
IOptions<EFCoreOrchestrationOptions> options,
Expand All @@ -49,6 +52,12 @@ public EFCoreOrchestrationService(
_instanceMapper = instanceMapper;
_executionMapper = executionMapper;
_logger = logger;


var cf = new ConnectionFactory();
_connection = cf.CreateConnection();
_subscription = _connection.SubscribeAsync(">");
_subscription.Start();
}

public int TaskOrchestrationDispatcherCount => _options.TaskOrchestrationDispatcherCount;
Expand Down Expand Up @@ -152,7 +161,8 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
instance,
execution,
runtimeState,
_stopCts.Token);
_stopCts.Token,
_subscription);

var messages = await session.FetchNewMessagesAsync(dbContext);

Expand All @@ -179,7 +189,10 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
r => r != null,
receiveTimeout,
_options.PollingInterval,
stoppableCancellationToken);
stoppableCancellationToken,
BackoffPollingHelper.CreateNatsWaitUntilSignal(
_subscription,
orchestrations.Select(nv => $"orchestration.{QueueMapper.ToQueue(nv)}.*").ToHashSet()));
}

public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan receiveTimeout, CancellationToken cancellationToken)
Expand Down Expand Up @@ -215,7 +228,10 @@ public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(
x => x != null,
receiveTimeout,
_options.PollingInterval,
stoppableCancellationToken);
stoppableCancellationToken,
BackoffPollingHelper.CreateNatsWaitUntilSignal(
_subscription,
activities.Select(nv => $"activitiy.{QueueMapper.ToQueue(nv)}").ToHashSet()));
}

public async Task ReleaseTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkItem workItem)
Expand Down Expand Up @@ -398,16 +414,32 @@ await _dbContextExtensions.WithinTransaction(dbContext, async () =>
session.RuntimeState = newOrchestrationRuntimeState;
session.ClearMessages();
}

// Notify
foreach (var executionStartedEvent in orchestratorMessages.Select(m => m.Event).OfType<ExecutionStartedEvent>())
{
_connection.Publish($"orchestration.{QueueMapper.ToQueue(executionStartedEvent.Name, executionStartedEvent.Version)}.{executionStartedEvent.OrchestrationInstance.InstanceId}",
Array.Empty<byte>());
}

foreach (var taskSheduledEvent in outboundMessages.Select(m => m.Event).OfType<TaskScheduledEvent>())
{
_connection.Publish($"activitiy.{QueueMapper.ToQueue(taskSheduledEvent.Name, taskSheduledEvent.Version)}", Array.Empty<byte>());
}

_connection.Publish($"history.{workItem.InstanceId}", Array.Empty<byte>());

_connection.Flush();
}

public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workItem, TaskMessage responseMessage)
{
var (id, lockId, orchestrationQueue) = ParseTaskActivityWorkItemId(workItem.Id);

using (var dbContext = _dbContextFactory.CreateDbContext())
{
await _dbContextExtensions.WithinTransaction(dbContext, async () =>
{
var (id, lockId, orchestrationQueue) = ParseTaskActivityWorkItemId(workItem.Id);

var activityMessage = await dbContext.ActivityMessages
.FirstAsync(w => w.Id == id && w.LockId == lockId);

Expand All @@ -423,6 +455,11 @@ await _dbContextExtensions.WithinTransaction(dbContext, async () =>
await dbContext.SaveChangesAsync();
});
}

// Notify
_connection.Publish($"orchestration.{orchestrationQueue}.{workItem.TaskMessage.OrchestrationInstance.InstanceId}", Array.Empty<byte>());

_connection.Flush();
}

private async Task SendTaskOrchestrationMessagesAsync(
Expand Down
13 changes: 10 additions & 3 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationServiceClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ public Task CreateTaskOrchestrationAsync(TaskMessage creationMessage)

public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, OrchestrationStatus[] dedupeStatuses)
{
var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;

using (var dbContext = _dbContextFactory.CreateDbContext())
{
var executionStartedEvent = creationMessage.Event as ExecutionStartedEvent;

var instanceId = creationMessage.OrchestrationInstance.InstanceId;
var executionId = creationMessage.OrchestrationInstance.ExecutionId;

Expand Down Expand Up @@ -75,6 +75,8 @@ public async Task CreateTaskOrchestrationAsync(TaskMessage creationMessage, Orch

await dbContext.SaveChangesAsync();
}

_connection.Publish($"orchestration.{QueueMapper.ToQueue(executionStartedEvent.Name, executionStartedEvent.Version)}.{executionStartedEvent.OrchestrationInstance.InstanceId}", Array.Empty<byte>());
}

public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
Expand All @@ -95,6 +97,7 @@ public async Task<string> GetOrchestrationHistoryAsync(string instanceId, string
var events = await dbContext.Events
.Where(e => e.ExecutionId == executionId)
.OrderBy(e => e.SequenceNumber)
.AsNoTracking()
.ToArrayAsync();

return $"[{string.Join(",", events.Select(e => e.Content))}]";
Expand Down Expand Up @@ -190,7 +193,10 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
s => IsFinalExecutionStatus(s.OrchestrationStatus),
timeout,
_options.PollingInterval,
stoppableCancellationToken);
stoppableCancellationToken,
BackoffPollingHelper.CreateNatsWaitUntilSignal(
_subscription,
new HashSet<string> { $"history.{instanceId}" }));

if (!IsFinalExecutionStatus(state.OrchestrationStatus))
return null;
Expand Down Expand Up @@ -220,6 +226,7 @@ public async Task<OrchestrationQueryResult> GetOrchestrationWithQueryAsync(Orche
.OrderByDescending(x => x.CreatedTime)
.ThenByDescending(x => x.InstanceId)
.Take(query.PageSize + 1)
.AsNoTracking()
.ToArrayAsync();

var pageInstances = instances
Expand Down
11 changes: 9 additions & 2 deletions src/LLL.DurableTask.EFCore/EFCoreOrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using LLL.DurableTask.EFCore.Entities;
using LLL.DurableTask.EFCore.Polling;
using Microsoft.EntityFrameworkCore;
using NATS.Client;

namespace LLL.DurableTask.EFCore
{
Expand All @@ -16,21 +17,24 @@ public class EFCoreOrchestrationSession : IOrchestrationSession

private readonly IDbContextFactory<OrchestrationDbContext> _dbContextFactory;
private readonly CancellationToken _stopCancellationToken;
private readonly IAsyncSubscription _subscription;

public EFCoreOrchestrationSession(
EFCoreOrchestrationOptions options,
IDbContextFactory<OrchestrationDbContext> dbContextFactory,
Instance instance,
Execution execution,
OrchestrationRuntimeState runtimeState,
CancellationToken stopCancellationToken)
CancellationToken stopCancellationToken,
IAsyncSubscription subscription)
{
_options = options;
_dbContextFactory = dbContextFactory;
Instance = instance;
Execution = execution;
RuntimeState = runtimeState;
_stopCancellationToken = stopCancellationToken;
_subscription = subscription;
}

public Instance Instance { get; }
Expand All @@ -55,7 +59,10 @@ public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
x => x == null || x.Count > 0,
_options.FetchNewMessagesPollingTimeout,
_options.PollingInterval,
_stopCancellationToken);
_stopCancellationToken,
BackoffPollingHelper.CreateNatsWaitUntilSignal(
_subscription,
new HashSet<string> { $"orchestration.{Instance.LastQueue}.{Instance.InstanceId}" }));
}

public async Task<IList<TaskMessage>> FetchNewMessagesAsync(
Expand Down
1 change: 1 addition & 0 deletions src/LLL.DurableTask.EFCore/LLL.DurableTask.EFCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
<PackageReference Include="Microsoft.EntityFrameworkCore" Version="7.0.2" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Relational" Version="7.0.2" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1" PrivateAssets="all" />
<PackageReference Include="NATS.Client" Version="1.0.1" />
</ItemGroup>

<ItemGroup>
Expand Down
66 changes: 60 additions & 6 deletions src/LLL.DurableTask.EFCore/Polling/BackoffPollingHelper.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using NATS.Client;

namespace LLL.DurableTask.EFCore.Polling
{
Expand All @@ -12,7 +16,11 @@ public static async Task<T> PollAsync<T>(
Func<T, bool> shouldAcceptValue,
TimeSpan timeout,
PollingIntervalOptions interval,
CancellationToken cancellationToken)
CancellationToken cancellationToken,
Func<CancellationToken, Task> waitFunction = null,
[CallerMemberName] string memberName = "",
[CallerFilePath] string fileName = "",
[CallerLineNumber] int lineNumber = 0)
{
T value;

Expand All @@ -22,13 +30,29 @@ public static async Task<T> PollAsync<T>(
{
cancellationToken.ThrowIfCancellationRequested();

value = await valueProvider();
using var waitCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
var waitTask = (waitFunction ?? WaitUntilCancellation)(waitCts.Token);

if (shouldAcceptValue(value)
|| stopwatch.Elapsed >= timeout)
break;
try
{
value = await valueProvider();

await Task.Delay(CalculateDelay(interval, count++));
if (shouldAcceptValue(value)
|| stopwatch.Elapsed >= timeout)
break;

waitCts.CancelAfter(CalculateDelay(interval, count++));

try
{
await waitTask;
}
catch (OperationCanceledException) { }
}
finally
{
waitCts.Cancel();
}
} while (stopwatch.Elapsed < timeout);

return value;
Expand All @@ -38,5 +62,35 @@ private static int CalculateDelay(PollingIntervalOptions interval, int count)
{
return (int)Math.Min(interval.Initial * Math.Pow(interval.Factor, count), interval.Max);
}

public static async Task WaitUntilCancellation(CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource();
using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
await tcs.Task;
}

public static Func<CancellationToken, Task> CreateNatsWaitUntilSignal(IAsyncSubscription subscription, HashSet<string> subjects)
{
return async (cancellationToken) =>
{
var tcs = new TaskCompletionSource();
void OnMessage(object o, MsgHandlerEventArgs e)
{
if (subjects.Contains(e.Message.Subject) || subjects.Any(s => e.Message.Subject.StartsWith(s)))
tcs.TrySetResult();
};
subscription.MessageHandler += OnMessage;
using var registration = cancellationToken.Register(() => tcs.TrySetCanceled());
try
{
await tcs.Task;
}
finally
{
subscription.MessageHandler -= OnMessage;
}
};
}
}
}

0 comments on commit 3058527

Please sign in to comment.