Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ await this.taskHub.AddTaskOrchestrations(typeof (InstanceStoreTestOrchestration)
[TestMethod]
public async Task SegmentedQueryUnequalCountsTest()
{
await this.queryClient.InitializeStoreAsync(true);

await this.taskHub.AddTaskOrchestrations(typeof (InstanceStoreTestOrchestration),
typeof (InstanceStoreTestOrchestration2))
.AddTaskActivities(new Activity1())
Expand Down Expand Up @@ -158,11 +160,16 @@ await this.client.CreateOrchestrationInstanceAsync(
[TestMethod]
public async Task PurgeOrchestrationHistoryTest()
{
await this.queryClient.InitializeStoreAsync(true);



await this.taskHub.AddTaskOrchestrations(typeof (InstanceStoreTestOrchestration),
typeof (InstanceStoreTestOrchestration2))
.AddTaskActivities(new Activity1())
.StartAsync();


for (var i = 0; i < 25; i++)
{
string instanceId = "apiservice" + i;
Expand Down Expand Up @@ -222,6 +229,8 @@ await this.client.PurgeOrchestrationInstanceHistoryAsync
[TestMethod]
public async Task PurgeManyOrchestrationHistoryTest()
{
await this.queryClient.InitializeStoreAsync(true);

await this.taskHub.AddTaskOrchestrations(typeof (InstanceStoreTestOrchestration),
typeof (InstanceStoreTestOrchestration2))
.AddTaskActivities(new Activity1())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,10 @@ IEnumerable<AzureTableOrchestrationStateEntity> CreateStateEntities(AzureTableCl
Input = "INPUT_" + instanceId + "_" + genId,
Output = null
};
tableClient.WriteEntitiesAsync(new AzureTableOrchestrationStateEntity(runtimeState)).Wait();


entities.Add(new AzureTableOrchestrationStateEntity(runtimeState));
azureTableClient.WriteEntitiesAsync(entities).Wait();

return entities;
}

Expand Down
2 changes: 1 addition & 1 deletion Test/DurableTask.ServiceBusAMS.Tests/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static ServiceBusOrchestrationServiceSettings CreateTestClientSettings()
}

// ReSharper disable once UnusedParameter.Local
static IOrchestrationService CreateOrchestrationServiceWorker(
public static IOrchestrationService CreateOrchestrationServiceWorker(
ServiceBusOrchestrationServiceSettings settings,
TimeSpan jumpStartAttemptInterval)
{
Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public static IEnumerable<Type> KnownTypes()
/// </summary>
internal HistoryEvent()
{
Timestamp = DateTime.UtcNow;
Timestamp = DateTime.SpecifyKind(DateTime.UtcNow, DateTimeKind.Utc);
}

/// <summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
<PropertyGroup>
<MajorVersion>2</MajorVersion>
<MinorVersion>7</MinorVersion>
<PatchVersion>1</PatchVersion>
<PatchVersion>4</PatchVersion>

<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<FileVersion>$(VersionPrefix).1</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<FileVersion Condition="'$(FileVersionRevision)' != ''">$(VersionPrefix).$(FileVersionRevision)</FileVersion>
<!-- The assembly version is only the major/minor pair, making it easier to do in-place upgrades -->
Expand Down Expand Up @@ -45,6 +45,7 @@
<PackageReference Include="Azure.Storage.Common" Version="12.15.0" />
<PackageReference Include="System.Linq.Async" Version="4.0.0" />
<PackageReference Include="Microsoft.CSharp" Version="4.7.0" />
<Reference Include="System.Transactions" />
</ItemGroup>

<ItemGroup>
Expand Down
116 changes: 87 additions & 29 deletions src/DurableTask.ServiceBusAMS/ServiceBusOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace DurableTask.ServiceBus
using Azure.Messaging.ServiceBus.Administration;
using Azure;
using DurableTask.ServiceBus.Common.Abstraction;
using System.Transactions;

/// <summary>
/// Orchestration Service and Client implementation using Azure Service Bus
Expand Down Expand Up @@ -607,7 +608,7 @@ public async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAs
return new TaskOrchestrationWorkItem
{
InstanceId = receiver.SessionId,
LockedUntilUtc = receiver.SessionLockedUntil.DateTime,
LockedUntilUtc = receiver.SessionLockedUntil.UtcDateTime,
NewMessages = newTaskMessages.ToList(),
OrchestrationRuntimeState = runtimeState
};
Expand All @@ -625,7 +626,7 @@ Task UpdateInstanceStoreAsync(ExecutionStartedEvent executionStartedEvent, long
OrchestrationStatus = OrchestrationStatus.Pending,
Input = executionStartedEvent.Input,
Tags = executionStartedEvent.Tags,
CreatedTime = executionStartedEvent.Timestamp,
CreatedTime = DateTime.SpecifyKind(executionStartedEvent.Timestamp, DateTimeKind.Utc),
LastUpdatedTime = DateTime.UtcNow,
CompletedTime = DateTimeUtils.MinDateTime,
ParentInstance = executionStartedEvent.ParentInstance,
Expand Down Expand Up @@ -701,7 +702,7 @@ public async Task RenewTaskOrchestrationWorkItemLockAsync(TaskOrchestrationWorkI
}
}
this.ServiceStats.OrchestrationDispatcherStats.SessionsRenewed.Increment();
workItem.LockedUntilUtc = sessionState.SessionReceiver.SessionLockedUntil.DateTime;
workItem.LockedUntilUtc = sessionState.SessionReceiver.SessionLockedUntil.UtcDateTime;
}

/// <summary>
Expand Down Expand Up @@ -855,23 +856,42 @@ public async Task CompleteTaskOrchestrationWorkItemAsync(
}
}

TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItemMessages",
runtimeState.OrchestrationInstance,
() =>
{
string allIds = string.Join(" ", sessionState.Messages.Select(m => $"[SEQ: {m.SequenceNumber} LT: {m.LockToken}]"));
return $"Completing orchestration messages sequence and lock tokens: {allIds}";
});

await RenewTaskOrchestrationWorkItemLockAsync(workItem);

foreach (var value in sessionState.Messages)
using (var ts = new System.Transactions.TransactionScope(System.Transactions.TransactionScopeAsyncFlowOption.Enabled))
{
await session.CompleteMessageAsync(value);
Transaction.Current.TransactionCompleted += (o, e) =>
TraceHelper.TraceInstance(
e.Transaction.TransactionInformation.Status == TransactionStatus.Committed ? TraceEventType.Information : TraceEventType.Error,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-TransactionComplete",
runtimeState.OrchestrationInstance,
() => $@"Orchestration Transaction Completed {e.Transaction.TransactionInformation.LocalIdentifier} status: {e.Transaction.TransactionInformation.Status}");

TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItem-CreateTransaction",
runtimeState.OrchestrationInstance,
() => $@"Created new Orchestration Transaction - txnid: {Transaction.Current.TransactionInformation.LocalIdentifier}");

TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskOrchestrationWorkItemMessages",
runtimeState.OrchestrationInstance,
() =>
{
string allIds = string.Join(" ", sessionState.Messages.Select(m => $"[SEQ: {m.SequenceNumber} LT: {m.LockToken}]"));
return $"Completing orchestration messages sequence and lock tokens: {allIds}";
});

foreach (var value in sessionState.Messages)
{
await session.CompleteMessageAsync(value);
}
ts.Complete();
}

await session.CloseAsync();

this.ServiceStats.OrchestrationDispatcherStats.SessionBatchesCompleted.Increment();
}

Expand Down Expand Up @@ -909,18 +929,25 @@ public async Task AbandonTaskOrchestrationWorkItemAsync(TaskOrchestrationWorkIte
}

TraceHelper.TraceSession(TraceEventType.Error, "ServiceBusOrchestrationService-AbandonTaskOrchestrationWorkItem", workItem.InstanceId, "Abandoning {0} messages due to work item abort", sessionState.Messages.Count());
foreach (var message in sessionState.Messages)
{
await sessionState.SessionReceiver.AbandonMessageAsync(message);
}

try
{
await sessionState.SessionReceiver.CloseAsync();
foreach (var message in sessionState.Messages)
{
await sessionState.SessionReceiver.AbandonMessageAsync(message);
}
}
catch (Exception ex) when (!Utils.IsFatal(ex))
finally
{
TraceHelper.TraceExceptionSession(TraceEventType.Warning, "ServiceBusOrchestrationService-AbandonTaskOrchestrationWorkItemError", workItem.InstanceId, ex, "Error while aborting session");
// call close async to release the session
try
{
await sessionState.SessionReceiver.CloseAsync();
}
catch (Exception ex)
{
TraceHelper.TraceExceptionSession(TraceEventType.Error, "ServiceBusOrchestrationService-AbandonTaskOrchestrationWorkItemError", workItem.InstanceId, ex, "Could not close session receiver");
}
}
}

Expand Down Expand Up @@ -969,7 +996,7 @@ public async Task<TaskActivityWorkItem> LockNextTaskActivityWorkItem(TimeSpan re
return new TaskActivityWorkItem
{
Id = receivedMessage.MessageId,
LockedUntilUtc = receivedMessage.LockedUntil.DateTime,
LockedUntilUtc = receivedMessage.LockedUntil.UtcDateTime,
TaskMessage = taskMessage
};
}
Expand Down Expand Up @@ -1007,7 +1034,7 @@ public async Task<TaskActivityWorkItem> RenewTaskActivityWorkItemLockAsync(TaskA
if (message != null)
{
await this.workerReceiver.RenewMessageLockAsync(message);
workItem.LockedUntilUtc = message.LockedUntil.DateTime;
workItem.LockedUntilUtc = message.LockedUntil.UtcDateTime;
this.ServiceStats.ActivityDispatcherStats.SessionsRenewed.Increment();
}

Expand Down Expand Up @@ -1037,7 +1064,26 @@ public async Task CompleteTaskActivityWorkItemAsync(TaskActivityWorkItem workIte
throw new ArgumentNullException("originalMessage");
}

await this.workerReceiver.CompleteMessageAsync(originalMessage);
using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
Transaction.Current.TransactionCompleted += (o, e) =>
TraceHelper.TraceInstance(
e.Transaction.TransactionInformation.Status == TransactionStatus.Committed ? TraceEventType.Information : TraceEventType.Error,
"ServiceBusOrchestrationService-CompleteTaskActivityWorkItem-TransactionComplete",
workItem.TaskMessage.OrchestrationInstance,
() => $@"TaskActivity Transaction Completed {e.Transaction.TransactionInformation.LocalIdentifier} status: {e.Transaction.TransactionInformation.Status}");

TraceHelper.TraceInstance(
TraceEventType.Information,
"ServiceBusOrchestrationService-CompleteTaskActivityWorkItem-CreateTransaction",
workItem.TaskMessage.OrchestrationInstance,
() => $@"Created new TaskActivity Transaction - txnid: {Transaction.Current.TransactionInformation.LocalIdentifier} - message sequence and lock token: [SEQ: {originalMessage.SequenceNumber} LT: {originalMessage.LockToken}]");



await this.workerReceiver.CompleteMessageAsync(originalMessage);
ts.Complete();
}
await this.orchestratorSender.SendMessageAsync(brokeredResponseMessage);
this.ServiceStats.ActivityDispatcherStats.SessionBatchesCompleted.Increment();
this.ServiceStats.OrchestrationDispatcherStats.MessagesSent.Increment();
Expand Down Expand Up @@ -1511,12 +1557,24 @@ async Task ProcessTrackingWorkItemAsync(TrackingWorkItem workItem)
}

// Cleanup our session

foreach (var message in sessionState.Messages)
try
{
await sessionState.SessionReceiver.CompleteMessageAsync(message);
foreach (var message in sessionState.Messages)
{
await sessionState.SessionReceiver.CompleteMessageAsync(message);
}
}
finally
{
try
{
await sessionState.SessionReceiver.CloseAsync();
}
catch (Exception ex)
{
TraceHelper.TraceException( TraceEventType.Error, "ServiceBusOrchestrationService-ProcessTrackingWorkItemAsync", ex, $"could not close session receiver for tracking work item");
}
}
await sessionState.SessionReceiver.CloseAsync();
}

void TraceEntities<T>(
Expand Down
22 changes: 18 additions & 4 deletions src/DurableTask.ServiceBusAMS/Tracking/AzureTableClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace DurableTask.ServiceBus.Tracking
using Azure;
using Azure.Data.Tables;
using Azure.Core;
using static Microsoft.Azure.Amqp.Serialization.SerializableType;

internal class AzureTableClient
{
Expand Down Expand Up @@ -139,14 +140,27 @@ public async Task<IEnumerable<AzureTableOrchestrationStateEntity>> QueryOrchestr
return await ReadAllEntitiesAsync<AzureTableOrchestrationStateEntity>(query, this.historyTable, - JumpStartTableScanIntervalInDays);
}

public IAsyncEnumerable<Page<AzureTableOrchestrationStateEntity>> QueryOrchestrationStatesSegmented(
public async Task<List<AzureTableOrchestrationStateEntity>> QueryOrchestrationStatesSegmented(
OrchestrationStateQuery stateQuery, string? continuationToken, int count)
{
string query = CreateQueryInternal(stateQuery, false);
var pageable = this.historyTable.QueryAsync<AzureTableOrchestrationStateEntity>(query);
var pageable = this.historyTable.QueryAsync<TableEntity>(query);

//TODO: count
return pageable.AsPages(continuationToken);
var pages = pageable.AsPages(continuationToken);

var results = new List<AzureTableOrchestrationStateEntity>();

await foreach (Page<TableEntity> page in pages)
{
foreach (var entity in page.Values)
{
AzureTableCompositeTableEntity composite = (AzureTableCompositeTableEntity)Activator.CreateInstance(typeof(AzureTableOrchestrationStateEntity));
composite.ReadEntity(entity);
results.Add((AzureTableOrchestrationStateEntity)composite);
}
}

return results;
}

public async Task<IEnumerable<AzureTableOrchestrationStateEntity>> QueryJumpStartOrchestrationsAsync(OrchestrationStateQuery stateQuery)
Expand Down
Loading