Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update terminate logic to handle scheduled orchestrations #276

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## v1.4.1 (Unreleased)

### Updates

* Fix issue where scheduled orchestrations aren't immediately terminated ([#178](https://github.com/microsoft/durabletask-mssql/issues/178))

## v1.4.0

### New
Expand Down
1 change: 1 addition & 0 deletions src/DurableTask.SqlServer/DTUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ public static bool HasPayload(HistoryEvent e)
return historyEvent.EventType switch
{
EventType.ExecutionStarted => ((ExecutionStartedEvent)historyEvent).Version,
EventType.SubOrchestrationInstanceCreated => ((SubOrchestrationInstanceCreatedEvent)historyEvent).Version,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unrelated fix that I happened to notice independently. I'm just adding it to this PR as a convenience since it's a small change.

EventType.TaskScheduled => ((TaskScheduledEvent)historyEvent).Version,
_ => null,
};
Expand Down
85 changes: 55 additions & 30 deletions src/DurableTask.SqlServer/Scripts/logic.sql
Original file line number Diff line number Diff line change
Expand Up @@ -469,45 +469,70 @@ BEGIN

DECLARE @TaskHub varchar(50) = __SchemaNamePlaceholder__.CurrentTaskHub()

DECLARE @existingStatus varchar(30) = (
SELECT TOP 1 existing.[RuntimeStatus]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
)
DECLARE @existingStatus varchar(30)
DECLARE @existingLockExpiration datetime2(7)

-- Get the status of an existing orchestration
SELECT TOP 1
@existingStatus = existing.[RuntimeStatus],
@existingLockExpiration = existing.[LockExpiration]
FROM Instances existing WITH (HOLDLOCK)
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

IF @existingStatus IS NULL
BEGIN
ROLLBACK TRANSACTION;
THROW 50000, 'The instance does not exist.', 1;
END
-- If the instance is already completed, no need to terminate it.
IF @existingStatus IN ('Pending', 'Running')

DECLARE @now datetime2(7) = SYSUTCDATETIME()

IF @existingStatus IN ('Running', 'Pending')
BEGIN
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
-- Create a payload to store the reason, if any
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
BEGIN
-- Payloads are stored separately from the events
DECLARE @PayloadID uniqueidentifier = NULL
IF @Reason IS NOT NULL
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment in line#468 states that the table order for this sproc should be Instances --> (NewEvents --> Payloads --> NewEvents) but this PR changes the table order to Instances --> (Payloads --> Instances --> NewEvents).
Does this present a concern for deadlocks, and if not, should we update the comment to reflect the new ordering?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm...good observation. Let me take a closer look at this.

VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
END

-- Check the status of the orchestration to determine which termination path to take
IF @existingStatus = 'Pending' AND (@existingLockExpiration IS NULL OR @existingLockExpiration <= @now)
BEGIN
-- The orchestration hasn't started yet - transition it directly to the Terminated state and delete
-- any pending messages
UPDATE Instances SET
[RuntimeStatus] = 'Terminated',
[LastUpdatedTime] = @now,
[CompletedTime] = @now,
[OutputPayloadID] = @PayloadID,
[LockExpiration] = NULL -- release the lock, if any
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID

DELETE FROM NewEvents WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID
andystaples marked this conversation as resolved.
Show resolved Hide resolved
END
ELSE
BEGIN
-- The orchestration has actually started running in this case
IF NOT EXISTS (
SELECT TOP (1) 1 FROM NewEvents
WHERE [TaskHub] = @TaskHub AND [InstanceID] = @InstanceID AND [EventType] = 'ExecutionTerminated'
)
BEGIN
-- Note that we don't use the Reason column for the Reason with terminate events
SET @PayloadID = NEWID()
INSERT INTO Payloads ([TaskHub], [InstanceID], [PayloadID], [Text])
VALUES (@TaskHub, @InstanceID, @PayloadID, @Reason)
INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END

INSERT INTO NewEvents (
[TaskHub],
[InstanceID],
[EventType],
[PayloadID]
) VALUES (
@TaskHub,
@InstanceID,
'ExecutionTerminated',
@PayloadID)
END
END

Expand Down Expand Up @@ -1444,7 +1469,7 @@ BEGIN
-- Instance IDs can be overwritten only if the orchestration is in a terminal state
IF @existingStatus NOT IN ('Failed')
BEGIN
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewing instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
DECLARE @msg nvarchar(4000) = FORMATMESSAGE('Cannot rewind instance with ID ''%s'' because it is not in a ''Failed'' state, but in ''%s'' state.', @InstanceID, @existingStatus);
THROW 50001, @msg, 1;
END

Expand Down
8 changes: 7 additions & 1 deletion src/DurableTask.SqlServer/SqlOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,14 @@ public override async Task CompleteTaskOrchestrationWorkItemAsync(
IList<TaskMessage> orchestratorMessages,
IList<TaskMessage> timerMessages,
TaskMessage continuedAsNewMessage,
OrchestrationState orchestrationState)
OrchestrationState? orchestrationState)
{
if (orchestrationState is null || !newRuntimeState.IsValid)
{
// The work item was invalid. We can't do anything with it so we ignore it.
return;
}

ExtendedOrchestrationWorkItem currentWorkItem = (ExtendedOrchestrationWorkItem)workItem;

this.traceHelper.CheckpointStarting(orchestrationState);
Expand Down
2 changes: 1 addition & 1 deletion src/common.props
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>4</MinorVersion>
<PatchVersion>0</PatchVersion>
<PatchVersion>1</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix></VersionSuffix>
<AssemblyVersion>$(MajorVersion).$(MinorVersion).0.0</AssemblyVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ async Task ValidateDatabaseSchemaAsync(TestDatabase database, string schemaName
schemaName);
Assert.Equal(1, currentSchemaVersion.Major);
Assert.Equal(4, currentSchemaVersion.Minor);
Assert.Equal(0, currentSchemaVersion.Patch);
Assert.Equal(1, currentSchemaVersion.Patch);
}

sealed class TestDatabase : IDisposable
Expand Down
24 changes: 24 additions & 0 deletions test/DurableTask.SqlServer.Tests/Integration/Orchestrations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -862,5 +862,29 @@ await Assert.ThrowsAnyAsync<OperationCanceledException>(
// Now the orchestration should complete immediately
await instance.WaitForCompletion(timeout: TimeSpan.FromSeconds(3), expectedOutput: EventCount);
}

[Fact]
public async Task TerminateScheduledOrchestration()
{
string orchestrationName = "ScheduledOrchestration";

// Does nothing except return the original input
TestInstance<object> instance = await this.testService.RunOrchestration(
input: (object)null,
orchestrationName,
version: null,
instanceId: null,
scheduledStartTime: DateTime.UtcNow.AddSeconds(30),
implementation: (ctx, input) => Task.FromResult("done"));

// Terminate the orchestration before it starts
await instance.TerminateAsync("Bye!");

await instance.WaitForCompletion(
expectedStatus: OrchestrationStatus.Terminated,
expectedOutput: "Bye!");

LogAssert.NoWarningsOrErrors(this.testService.LogProvider);
}
}
}
69 changes: 64 additions & 5 deletions test/DurableTask.SqlServer.Tests/Utils/TestService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,32 @@ public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
activities);
}

public Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestration(
input,
orchestrationName,
version,
instanceId,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
TInput input,
string orchestrationName,
string version,
string instanceId,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -147,19 +168,43 @@ public async Task<TestInstance<TInput>> RunOrchestration<TOutput, TInput>(
inputGenerator: i => input,
orchestrationName: orchestrationName,
version: version,
scheduledStartTime: scheduledStartTime,
implementation,
onEvent,
activities);

return instances[0];
}

public Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
{
return this.RunOrchestrations(
count,
instanceIdGenerator,
inputGenerator,
orchestrationName,
version,
scheduledStartTime: null,
implementation,
onEvent,
activities);
}

public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput, TInput>(
int count,
Func<int, string> instanceIdGenerator,
Func<int, TInput> inputGenerator,
string orchestrationName,
string version,
DateTime? scheduledStartTime,
Func<OrchestrationContext, TInput, Task<TOutput>> implementation,
Action<OrchestrationContext, string, string> onEvent = null,
params (string name, TaskActivity activity)[] activities)
Expand All @@ -178,11 +223,25 @@ public async Task<IReadOnlyList<TestInstance<TInput>>> RunOrchestrations<TOutput
TInput input = inputGenerator != null ? inputGenerator(i) : default;

DateTime utcNow = DateTime.UtcNow;
OrchestrationInstance instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);

OrchestrationInstance instance;
if (scheduledStartTime.HasValue)
{
instance = await this.client.CreateScheduledOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input,
startAt: scheduledStartTime.Value);
}
else
{
instance = await this.client.CreateOrchestrationInstanceAsync(
orchestrationName,
version,
instanceId,
input);
}

return new TestInstance<TInput>(
this.client,
Expand Down
Loading