Skip to content
Closed
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
22531fe
naive poison message handler
davidmrdavid Apr 12, 2024
748b279
incorporate feedback
davidmrdavid Apr 13, 2024
40a00dd
add suffix, change to terminated
davidmrdavid Apr 15, 2024
b1b7fba
more changes to get poison message handling working E2E. It's hackier…
davidmrdavid Apr 16, 2024
b1808a1
simplify implementation
davidmrdavid Apr 16, 2024
45d523b
remove commented out code
davidmrdavid Apr 16, 2024
82e3531
remove csproj changes
davidmrdavid Apr 16, 2024
adf4579
undo change in message manager deps
davidmrdavid Apr 16, 2024
d20bb7e
undo csproj changeS
davidmrdavid Apr 16, 2024
40baca0
add activity pmh as well
davidmrdavid Apr 16, 2024
f896364
make configurable
davidmrdavid Apr 16, 2024
cef1410
move poison message handler to superclass
davidmrdavid Apr 16, 2024
eeea159
remove unecessary imports
davidmrdavid Apr 16, 2024
961d64b
remove unecessary import
davidmrdavid Apr 16, 2024
5dfe896
simplify code a bit
davidmrdavid Apr 16, 2024
4a25c5b
remove unused variable
davidmrdavid Apr 16, 2024
8afbfc2
simplify and unify guidance
davidmrdavid Apr 16, 2024
9057bfd
improve guidance
davidmrdavid Apr 16, 2024
6866828
call out backend-specificness
davidmrdavid Apr 16, 2024
b0d739c
clean up PR
davidmrdavid Apr 16, 2024
71e0b36
clean up csproj
davidmrdavid Apr 16, 2024
5934076
indent csproj comment
davidmrdavid Apr 16, 2024
a94cc4e
remove unused import
davidmrdavid Apr 16, 2024
37dbac4
have valid table-naming scheme
davidmrdavid Apr 18, 2024
865aa20
add log
davidmrdavid Apr 18, 2024
57bb966
add comments
davidmrdavid Apr 18, 2024
6c3bb79
create valid serializable activity failure
davidmrdavid Apr 18, 2024
b15dbb5
handle de-serialization errors as well
davidmrdavid Jun 14, 2024
cbb8274
add version suffix
davidmrdavid Jun 25, 2024
2acadbe
resolve conflicts
davidmrdavid Jun 25, 2024
16f38f1
rev patch
davidmrdavid Jun 25, 2024
74dc0f7
add dtfx.core
davidmrdavid Jun 25, 2024
584cf8d
merge mixed deserializtion hotfix
davidmrdavid Jun 27, 2024
51978a0
add imports
davidmrdavid Jun 27, 2024
65c29c4
pass nullable analysis
davidmrdavid Jun 27, 2024
de7e46b
make hotfix always occur
davidmrdavid Jun 27, 2024
a8b24e5
move nullable analysis
davidmrdavid Jun 27, 2024
a746b1e
make hotfix conditional on setting
davidmrdavid Jun 27, 2024
d219ffa
match diffs
davidmrdavid Jun 28, 2024
b2e1f0c
make hotfix always run
davidmrdavid Jun 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100;

/// <summary>
/// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message".
/// A message is treated as poison only once its dequeue count is strictly greater than this value.
/// The default value is 20.
/// </summary>
/// <remarks>
/// NOTE(review): "Deuque" in the property name looks like a misspelling of "Dequeue". The name is left
/// unchanged here because it is a public setting that other code reads by this exact name.
/// </remarks>
public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
/// <summary>
/// Gets or sets the maximum number of concurrent storage operations that can be executed in the context
/// of a single orchestration instance.
Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
{
// first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage.
string serializedSchedulerState;
if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl))
if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri? blobUrl))
{
serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl);
// we know blobUrl is not null because TryGetLargeMessageReference returned true
serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!);
Comment on lines +240 to +241
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is here because I added nullable analysis

}
else
{
Expand Down
10 changes: 6 additions & 4 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage
{
using System;
Expand Down Expand Up @@ -118,17 +118,18 @@ public async Task<string> SerializeMessageDataAsync(MessageData messageData)
/// <returns>Actual string representation of message.</returns>
public async Task<string> FetchLargeMessageIfNecessary(string message)
{
if (TryGetLargeMessageReference(message, out Uri blobUrl))
if (TryGetLargeMessageReference(message, out Uri? blobUrl))
{
return await this.DownloadAndDecompressAsBytesAsync(blobUrl);
// we know blobUrl is not null because TryGetLargeMessageReference returned true
return await this.DownloadAndDecompressAsBytesAsync(blobUrl!);
}
else
{
return message;
}
}

internal static bool TryGetLargeMessageReference(string messagePayload, out Uri blobUrl)
internal static bool TryGetLargeMessageReference(string messagePayload, out Uri? blobUrl)
{
if (Uri.IsWellFormedUriString(messagePayload, UriKind.Absolute))
{
Expand Down Expand Up @@ -314,6 +315,7 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId)
return storageOperationCount;
}
}
#nullable disable

#if NETSTANDARD2_0
class TypeNameSerializationBinder : ISerializationBinder
Expand Down
48 changes: 31 additions & 17 deletions src/DurableTask.AzureStorage/Messaging/ControlQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,27 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
MessageData messageData;
try
{
// try to de-serialize message
messageData = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

// if successful, check if it's a poison message. If so, we handle it
// and log metadata about it as the de-serialization succeeded.
await this.HandleIfPoisonMessageAsync(messageData);
}
catch (Exception e)
catch (Exception exception)
{
// We have limited information about the details of the message
// since we failed to deserialize it.
this.settings.Logger.MessageFailure(
this.storageAccountName,
this.settings.TaskHubName,
Comment on lines -114 to -118
Copy link
Collaborator Author

@davidmrdavid davidmrdavid Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this was moved to this.AbandonMessageAsync, to simplify this exception-handling block

queueMessage.Id /* MessageId */,
string.Empty /* InstanceId */,
string.Empty /* ExecutionId */,
this.storageQueue.Name,
string.Empty /* EventType */,
0 /* TaskEventId */,
e.ToString());

// Abandon the message so we can try it again later.
await this.AbandonMessageAsync(queueMessage);
// Deserialization errors can be persistent, so we check if this is a poison message.
bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
if (isPoisonMessage)
{
// we have already handled the poison message, so we move on.
return;
}

// This is not a poison message (at least not yet), so we abandon it to retry later.
await this.AbandonMessageAsync(queueMessage, exception);
return;
}

Expand Down Expand Up @@ -190,8 +190,22 @@ await batch.ParallelForEachAsync(async delegate (QueueMessage queueMessage)
}

// This overload is intended for cases where we aren't able to deserialize an instance of MessageData.
public Task AbandonMessageAsync(QueueMessage queueMessage)
public Task AbandonMessageAsync(QueueMessage queueMessage, Exception exception)
{

// We have limited information about the details of the message
// since we failed to deserialize it.
this.settings.Logger.MessageFailure(
this.storageAccountName,
this.settings.TaskHubName,
queueMessage.Id /* MessageId */,
string.Empty /* InstanceId */,
string.Empty /* ExecutionId */,
this.storageQueue.Name,
string.Empty /* EventType */,
0 /* TaskEventId */,
exception.ToString());

this.stats.PendingOrchestratorMessages.TryRemove(queueMessage.Id, out _);
return base.AbandonMessageAsync(
queueMessage,
Expand Down
93 changes: 93 additions & 0 deletions src/DurableTask.AzureStorage/Messaging/TaskHubQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage.Messaging
using DurableTask.AzureStorage.Storage;
using DurableTask.Core;
using DurableTask.Core.History;
using Microsoft.WindowsAzure.Storage.Table;

abstract class TaskHubQueue
{
Expand Down Expand Up @@ -57,6 +58,98 @@ public TaskHubQueue(
this.backoffHelper = new BackoffPollingHelper(minPollingDelay, maxPollingDelay);
}

/// <summary>
/// Checks whether a successfully deserialized message has been dequeued more times than the configured
/// poison-message threshold and, if so, quarantines it.
/// </summary>
/// <remarks>
/// A quarantined message is copied to the task hub's poison table, deleted from this queue, flagged on its
/// history event (<c>IsPoison</c> and <c>PoisonGuidance</c>), and reported through the logger.
/// Messages whose dequeue count is at or below the threshold are left untouched.
/// </remarks>
/// <param name="messageData">The deserialized message, including the original queue message it came from.</param>
/// <returns>A task that completes when the check (and any quarantine work) has finished.</returns>
public async Task HandleIfPoisonMessageAsync(MessageData messageData)
{
    var queueMessage = messageData.OriginalQueueMessage;
    var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;

    if (queueMessage.DequeueCount <= maxThreshold)
    {
        // Not a poison message (at least not yet), so there is nothing to handle.
        return;
    }

    string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
    Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
    await poisonMessagesTable.CreateIfNotExistsAsync();

    // provide guidance, which is backend-specific
    string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," +
        $" which is greater than the poison message threshold ({maxThreshold}). " +
        $"The message has been moved to the '{poisonMessageTableName}' table for manual review. " +
        $"This will fail the consuming orchestrator, activity, or entity.";

    // add to poison table, keyed by the queue message ID (partition) and the queue name (row)
    var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
    {
        Properties =
        {
            ["RawMessage"] = new EntityProperty(queueMessage.Message),
            ["Reason"] = new EntityProperty(guidance)
        }
    };

    await poisonMessagesTable.InsertAsync(poisonMessage);

    // delete from queue so it doesn't get processed again.
    await this.storageQueue.DeleteMessageAsync(queueMessage);

    // Only flag the deserialized event once the message has actually been quarantined, so that a
    // storage failure above never leaves the event half-marked (guidance set but IsPoison still false).
    messageData.TaskMessage.Event.PoisonGuidance = guidance;
    messageData.TaskMessage.Event.IsPoison = true;

    this.settings.Logger.PoisonMessageDetected(
        this.storageAccountName,
        this.settings.TaskHubName,
        messageData.TaskMessage.Event.EventType.ToString(),
        messageData.TaskMessage.Event.EventId,
        messageData.OriginalQueueMessage.Id,
        messageData.TaskMessage.OrchestrationInstance.InstanceId,
        messageData.TaskMessage.OrchestrationInstance.ExecutionId,
        this.Name,
        messageData.OriginalQueueMessage.DequeueCount);
}

/// <summary>
/// Handles a queue message that could not be deserialized: if it has been dequeued more times than the
/// configured poison-message threshold, it is quarantined instead of being retried.
/// </summary>
/// <remarks>
/// A quarantined message is copied (raw payload plus the deserialization error) to the task hub's poison
/// table, deleted from this queue, and reported through the logger. Because deserialization failed, only
/// the queue-level metadata (message ID, queue name, dequeue count) is available for logging.
/// </remarks>
/// <param name="queueMessage">The raw queue message that failed to deserialize.</param>
/// <param name="deserializationException">The exception raised while deserializing the message.</param>
/// <returns>
/// <c>true</c> if the message exceeded the threshold and was quarantined; <c>false</c> if it is not
/// (yet) a poison message, in which case nothing is changed and the caller decides how to proceed.
/// </returns>
public async Task<bool> TryHandlingDeserializationPoisonMessage(QueueMessage queueMessage, Exception deserializationException)
{
    var maxThreshold = this.settings.PoisonMessageDeuqueCountThreshold;
    if (queueMessage.DequeueCount <= maxThreshold)
    {
        // Not a poison message (at least not yet).
        return false;
    }

    string guidance = $"Queue message ID '{queueMessage.Id}' was dequeued {queueMessage.DequeueCount} times," +
        $" which is greater than the poison message threshold ({maxThreshold}). " +
        $"A de-serialization error occurred: \n {deserializationException}";
    var poisonMessage = new DynamicTableEntity(queueMessage.Id, this.Name)
    {
        Properties =
        {
            ["RawMessage"] = new EntityProperty(queueMessage.Message),
            ["Reason"] = new EntityProperty(guidance)
        }
    };

    // add to poison table
    string poisonMessageTableName = this.settings.TaskHubName.ToLowerInvariant() + "Poison";
    Table poisonMessagesTable = this.azureStorageClient.GetTableReference(poisonMessageTableName);
    await poisonMessagesTable.CreateIfNotExistsAsync();
    await poisonMessagesTable.InsertAsync(poisonMessage);

    // delete from queue so it doesn't get processed again.
    await this.storageQueue.DeleteMessageAsync(queueMessage);

    // The event/instance details are unknown because deserialization failed, but the queue message ID
    // is known, so it is logged to allow correlation with the row written to the poison table.
    this.settings.Logger.PoisonMessageDetected(
        this.storageAccountName,
        this.settings.TaskHubName,
        string.Empty /* EventType */,
        0 /* TaskEventId */,
        queueMessage.Id /* MessageId */,
        string.Empty /* InstanceId */,
        string.Empty /* ExecutionId */,
        this.Name,
        queueMessage.DequeueCount);

    return true;
}

public string Name => this.storageQueue.Name;

public Uri Uri => this.storageQueue.Uri;
Expand Down
34 changes: 26 additions & 8 deletions src/DurableTask.AzureStorage/Messaging/WorkItemQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

#nullable enable
namespace DurableTask.AzureStorage.Messaging
{
using System;
Expand All @@ -30,26 +30,44 @@ public WorkItemQueue(

protected override TimeSpan MessageVisibilityTimeout => this.settings.WorkItemQueueVisibilityTimeout;

public async Task<MessageData> GetMessageAsync(CancellationToken cancellationToken)
public async Task<MessageData?> GetMessageAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
try
{
QueueMessage queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);
QueueMessage? queueMessage = await this.storageQueue.GetMessageAsync(this.settings.WorkItemQueueVisibilityTimeout, cancellationToken);

if (queueMessage == null)
{
await this.backoffHelper.WaitAsync(cancellationToken);
continue;
}

MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);
try
{
MessageData data = await this.messageManager.DeserializeQueueMessageAsync(
queueMessage,
this.storageQueue.Name);

// if successful, check if it's a poison message. If so, we handle it
// and log metadata about it as the de-serialization succeeded.
await this.HandleIfPoisonMessageAsync(data);
this.backoffHelper.Reset();
return data;
}
catch (Exception exception)
{
// Deserialization errors can be persistent, so we check if this is a poison message.
bool isPoisonMessage = await this.TryHandlingDeserializationPoisonMessage(queueMessage, exception);
if (isPoisonMessage)
{
// we have already handled the poison message, so we move on.
continue;
}
}


this.backoffHelper.Reset();
return data;
}
catch (Exception e)
{
Expand Down
11 changes: 11 additions & 0 deletions src/DurableTask.Core/History/HistoryEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,16 @@ protected HistoryEvent(int eventId)
/// Implementation for <see cref="IExtensibleDataObject.ExtensionData"/>.
/// </summary>
public ExtensionDataObject? ExtensionData { get; set; }

/// <summary>
/// Gets or sets a value indicating whether this event was flagged as a poison message.
/// Defaults to <c>false</c>.
/// </summary>
public bool IsPoison { get; set; }

/// <summary>
/// Gets or sets the user-facing explanation of why this message was labeled as poison.
/// Each storage provider is expected to populate this; it defaults to an empty string.
/// </summary>
public string PoisonGuidance { get; set; } = string.Empty;
}
}
16 changes: 16 additions & 0 deletions src/DurableTask.Core/TaskActivityDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DurableTask.Core
using DurableTask.Core.History;
using DurableTask.Core.Logging;
using DurableTask.Core.Middleware;
using DurableTask.Core.Serializing;
using DurableTask.Core.Tracing;

/// <summary>
Expand Down Expand Up @@ -192,6 +193,21 @@ await this.dispatchPipeline.RunAsync(dispatchContext, async _ =>

try
{
if (scheduledEvent.IsPoison)
{
// if the activity is "poison", then we should not execute it again. Instead, we'll manually fail the activity
// by throwing an exception on behalf of the user-code. In the exception, we provide the storage-provider's guidance
// on how to deal with the poison message.

// We need to account for all possible deserialization modes, so we construct an exception valid in all modes.
// TODO: revise - this is clunky
var exception = new Exception(scheduledEvent.PoisonGuidance);
var failureDetails = new FailureDetails(exception);
var details = Utils.SerializeCause(exception, JsonDataConverter.Default);
var taskFailure = new TaskFailureException(details, exception, details).WithFailureDetails(failureDetails);
throw taskFailure;
}

string? output = await taskActivity.RunAsync(context, scheduledEvent.Input);
responseEvent = new TaskCompletedEvent(-1, scheduledEvent.EventId, output);
}
Expand Down
16 changes: 16 additions & 0 deletions src/DurableTask.Core/TaskOrchestrationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,22 @@ void ProcessEvents(IEnumerable<HistoryEvent> events)

void ProcessEvent(HistoryEvent historyEvent)
{
if (historyEvent.IsPoison)
{
// If the message is labeled as "poison", then we should avoid processing it again.
// Therefore, we replace the event "in place" with an "ExecutionTerminatedEvent", so the
// orchestrator stops immediately.

var terminationEvent = new ExecutionTerminatedEvent(-1, historyEvent.PoisonGuidance);
historyEvent = terminationEvent;

// since replay is not guaranteed, we need to populate `this.result`
// with a completed task
var taskCompletionSource = new TaskCompletionSource<string>();
taskCompletionSource.SetResult("");
this.result = taskCompletionSource.Task;
}

bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
if (this.context.IsSuspended && !overrideSuspension)
{
Expand Down