diff --git a/src/Client/Core/DurableTaskClient.cs b/src/Client/Core/DurableTaskClient.cs index 439670cf..f0963c41 100644 --- a/src/Client/Core/DurableTaskClient.cs +++ b/src/Client/Core/DurableTaskClient.cs @@ -438,7 +438,13 @@ public virtual Task RestartAsync( string instanceId, bool restartWithNewInstanceId = false, CancellationToken cancellation = default) - => throw new NotSupportedException($"{this.GetType()} does not support orchestration restart."); + => throw new NotSupportedException($"{this.GetType()} does not support orchestration restart."); + + public virtual Task RewindInstanceAsync( + string instanceId, + string reason, + CancellationToken cancellation = default) + => throw new NotSupportedException($"{this.GetType()} does not support orchestration rewind."); // TODO: Create task hub diff --git a/src/Client/Core/OrchestrationRuntimeStatus.cs b/src/Client/Core/OrchestrationRuntimeStatus.cs index 0f36e85b..0125f64e 100644 --- a/src/Client/Core/OrchestrationRuntimeStatus.cs +++ b/src/Client/Core/OrchestrationRuntimeStatus.cs @@ -48,5 +48,10 @@ public enum OrchestrationRuntimeStatus /// /// The orchestration has been suspended. /// - Suspended, + Suspended, + + /// + /// The orchestration is rewinding. + /// + Rewinding, } diff --git a/src/Client/Grpc/GrpcDurableTaskClient.cs b/src/Client/Grpc/GrpcDurableTaskClient.cs index c57b22f9..cb1e96b5 100644 --- a/src/Client/Grpc/GrpcDurableTaskClient.cs +++ b/src/Client/Grpc/GrpcDurableTaskClient.cs @@ -431,6 +431,41 @@ public override async Task RestartAsync( } } + public override async Task RewindInstanceAsync( + string instanceId, + string reason, + CancellationToken cancellation = default) + { + Check.NotNullOrEmpty(instanceId); + Check.NotEntity(this.options.EnableEntitySupport, instanceId); + + var request = new P.RewindInstanceRequest + { + InstanceId = instanceId, + Reason = reason, + }; + try + { + await this.sidecarClient.RewindInstanceAsync(request, cancellationToken: cancellation); + } + catch (RpcException e) when (e.StatusCode == StatusCode.NotFound) + { + throw new ArgumentException($"An orchestration with the instanceId {instanceId} was not found.", e); + } + catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition) + { + throw new InvalidOperationException( + $"The orchestration with the instanceId {instanceId} cannot be rewound. " + + $"Only orchestrations in the \"Failed\" state can be rewound.", + e); + } + catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled) + { + throw new OperationCanceledException( + $"The {nameof(this.RewindInstanceAsync)} operation was canceled.", e, cancellation); + } + } + static AsyncDisposable GetCallInvoker(GrpcDurableTaskClientOptions options, out CallInvoker callInvoker) { if (options.Channel is GrpcChannel c) diff --git a/src/Client/Grpc/ProtoUtils.cs b/src/Client/Grpc/ProtoUtils.cs index f5bc750d..46ce5a31 100644 --- a/src/Client/Grpc/ProtoUtils.cs +++ b/src/Client/Grpc/ProtoUtils.cs @@ -26,7 +26,8 @@ internal static P.OrchestrationStatus ToGrpcStatus(this OrchestrationRuntimeStat OrchestrationRuntimeStatus.Pending => P.OrchestrationStatus.Pending, OrchestrationRuntimeStatus.Running => P.OrchestrationStatus.Running, OrchestrationRuntimeStatus.Terminated => P.OrchestrationStatus.Terminated, - OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended, + OrchestrationRuntimeStatus.Suspended => P.OrchestrationStatus.Suspended, + OrchestrationRuntimeStatus.Rewinding => P.OrchestrationStatus.Rewinding, _ => throw new ArgumentOutOfRangeException(nameof(status), "Unexpected value"), }; #pragma warning restore 0618 // Referencing Obsolete member. diff --git a/src/Shared/Grpc/ProtoUtils.cs b/src/Shared/Grpc/ProtoUtils.cs index 13b64fe8..ac31aea4 100644 --- a/src/Shared/Grpc/ProtoUtils.cs +++ b/src/Shared/Grpc/ProtoUtils.cs @@ -219,6 +219,9 @@ internal static HistoryEvent ConvertHistoryEvent(P.HistoryEvent proto, EntityCon Tags = proto.HistoryState.OrchestrationState.Tags, }); break; + case P.HistoryEvent.EventTypeOneofCase.ExecutionRewound: + historyEvent = new ExecutionRewoundEvent(proto.EventId); + break; default: throw new NotSupportedException($"Deserialization of {proto.EventTypeCase} is not supported."); }