[Scheduled Actions V2] Invoker logic #7152
Conversation
FrontendClient workflowservice.WorkflowServiceClient
}

executorTaskExecutor struct {
I'm not sure how everyone else feels, but this name makes my eyes bleed. At the same time, I didn't want to be inconsistent with other HSM naming. Maybe the path of least resistance is to just use a few synonyms for this? Like, `starterTaskExecutor` (and rename the state machine to `Starter`)... or something like that. Really open to suggestions. Ditto for renaming the tasks. In general I'm not satisfied with the naming.
It's not just a starter though, right? Will this also e.g. terminate workflows? You can call it an invoker maybe?
True, it also terminates/cancels as needed. `Invoker` works for me!
}

// TODO - set last completion result/continued failure
// TODO - set search attributes
I don't think there's anything blocking me from passing the search attributes through to `StartWorkflowExecution`; I just hadn't closed off the TODO. Will follow up.
You can keep a rate limiter in the executor. If the rate limiter is out of tokens, you'll have to back off either by failing the task, or by committing any pending progress and adding a persistent timer task; which one depends on how long you need to wait for the next token. Do not wait on the rate limiter in the shared executor.
components/scheduler/executor.go
Outdated
// We want Executor to immediately wake and attempt to buffer when new starts
// are added.
e.NextInvocationTime = nil
e.NextInvocationTime = timestamppb.New(event.Deadline)
Shouldn't this always be the time of the first buffered start (chronologically)? I'm a bit rusty on the details so please correct me if I'm wrong. I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.
> Shouldn't this always be the time of the first buffered start (chronologically)?
Yes, it should; that's what `Deadline` gets set to. I suppose we could get rid of `Deadline` and just compute it within the transition, though; I'll make that change.
> I'm just worried that this will invalidate a pending task if the event's buffered starts are further in the future.
Is only one pending timer task allowed? If so, yes, that could happen presently.
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
nit:
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
// Copyright (c) 2025 Temporal Technologies Inc. All rights reserved.
I'll follow up in a separate PR with this since it's from `make generate`.
rateLimitedDetails struct {
// The requested interval to delay processing by rescheduling.
Delay time.Duration
Any reason to export this field?
Nope, will fix.
It needed to be exported in the workflow impl because it was serialized as a local activity result. I don't know what's required here.
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Please stop grouping types, it's not a good practice. It generally messes with the generated docs.
Please stop recommending against a perfectly good style that's already pervasive in the codebase without a team agreement that we want to deprecate it.
isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)
Does this actually drain the entire buffer? I only see one action being returned here. (I don't have deep familiarity with the code so will just defer to you to confirm).
> Does this actually drain the entire buffer? I only see one action being returned here.
It depends on the overlap policy; if something like `OVERLAP_ALL` is specified, it'll drain the buffer. It drains as much of the buffer as possible; I guess I could rename the calling function from `drainBuffer` to `processBuffer` as well.
I'd agree with that rename.. "drainBuffer" sounds like it'll be empty when it returns
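A toy illustration of why the buffer may or may not be empty when processing returns; the policy names and logic are illustrative stand-ins, not scheduler1's actual implementation:

```go
package main

import "fmt"

// overlapPolicy controls how much of the buffer one processing pass consumes.
type overlapPolicy int

const (
	overlapBufferOne overlapPolicy = iota // at most one start while another runs
	overlapAllowAll                       // start everything buffered
)

// processBuffer returns the starts that are ready now plus whatever stays
// buffered. Only the allow-all policy is guaranteed to drain the buffer,
// which is why "processBuffer" is a more honest name than "drainBuffer".
func processBuffer(buffer []string, isRunning bool, policy overlapPolicy) (ready, remaining []string) {
	switch policy {
	case overlapAllowAll:
		return buffer, nil // drains the whole buffer
	default: // overlapBufferOne
		if isRunning || len(buffer) == 0 {
			return nil, buffer // nothing starts; buffer can stay non-empty
		}
		return buffer[:1], buffer[1:]
	}
}

func main() {
	ready, remaining := processBuffer([]string{"a", "b", "c"}, false, overlapBufferOne)
	fmt.Println(len(ready), len(remaining)) // 1 2
}
```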
}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
defer cancelFunc()
_, err := e.HistoryClient.TerminateWorkflowExecution(ctx, request)
Wondering why you start the workflow with the frontend client and terminate it with the history client. Why not be consistent here?
This is what the existing workflow scheduler does; I believe that `StartWorkflowExecution` goes through frontend to make sure metering is applied, which wouldn't matter for `terminateWorkflow`/`cancelWorkflow`. Though, I'm not sure of the reason to have the other two not go through frontend - @dnr?
Yes, it goes through frontend for metering, and also namespace rps limit for safety.
Terminate/cancel could also, but I had already written them to go to history and it didn't seem worth changing at the time. I don't know if terminate/cancel are metered.. if they are, and if there's not a special case to exclude calls from schedules, that would be a visible metering change, so please check with someone who knows that stuff if you want to do that.
}

return Executor{
ExecutorInternal: prevExecutor.ExecutorInternal,
Note, you're not cloning here; when you change the code to release the lock, you'll want to clone.
I probably wouldn't return the whole structs in these `load` functions; instead, take only what is relevant for the executor functionality.
errTypeRetryLimitExceeded = "RetryLimitExceeded"
errTypeRateLimited = "RateLimited"
errTypeAlreadyStarted = "serviceerror.WorkflowExecutionAlreadyStarted"
Ideally we'd use the standard errors package instead of performing string comparisons.
// translateError converts a dependent service error into an application error.
// Errors are classified between retryable and non-retryable.
func translateError(err error, msgPrefix string) error {
Not sure I understand why you need this translation here. Why can't you use the original error? What's the value in wrapping with ApplicationError? That's an SDK concept and isn't relevant here.
It was to classify errors as retryable/non-retryable in a single point. Existing scheduler uses application errors to signal that from the local activities. If it's not a common pattern, I'll get rid of it here.
👍 temporal.ApplicationError is an sdk thing, don't use it (or any error wrapping) here
}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {
// Drop expired starts.
We should add a counter in ScheduleInfo for this case
Will do.
Going to add a TODO and add a follow-up PR so I don't have to block this PR on waiting for API approvals.
I think that this should fit into `ScheduleInfo.MissedCatchupWindow`. I'll use that.
isRunning := len(scheduler.Info.RunningWorkflows) > 0

// Resolve overlap policies and prepare next workflows to start.
action := scheduler1.ProcessBuffer(executor.BufferedStarts, isRunning, scheduler.resolveOverlapPolicy)
This is good for now; when "scheduler1" is deprecated and removed, it might be worth taking a look to see if `scheduler1.ProcessBuffer` can be refactored to be a more natural fit here.
FirstExecutionRunId: target.RunId,
},
}
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
You should set caller info on this context (and all other client contexts) to get proper prioritization and such. Search for `headers.SetCallerInfo` for examples. The workflow impl gets this from pernsworkermanager.
Thanks, will fix.
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package scheduler_test
This looks great.
Have you looked at the old workflow_test.go that much? Most of the detailed behaviors of the old impl are actually tested there; the functional tests are much more limited. Obviously it would be a bunch of work to port over, and a bunch is not applicable, but I think it would be pretty valuable to preserve most of those cases.
I'm thinking ones like `TestUpdateBetweenNominalAndJitter`, `TestUpdateNotRetroactive`, `TestBackfillInclusiveStartEnd`, and maybe even `TestHugeBackfillBuffer`, though that would probably need more modification.
At least, maybe use it as inspiration for future tests :)
Yes, good idea! I'll port over the tests you called out and give workflow_test.go another pass to see what else might make sense.
I gave the existing tests in `workflow_test` a more thorough look. At a high level, I think most of these cases actually are covered in the new tests, just broken up closer to each component. Some of the Update specs deal with behavior we'll implement synchronously (like clearing the buffer when a schedule's action changes), so they aren't represented yet. Others, such as `TestLotsOfIterations`, the CAN tests, and the Signal tests, are obsolete given the state machine architecture. The `ExitScheduleWorkflow` tests I'd consider analogously covered by the "empty buffer"/"no work to do" tests.
I've brought a few more test cases over relevant to the Invoker:
- `TestUpdateBetweenNominalAndJitter`: this tests this branch in the existing scheduler, which maps to this branch in SpecProcessor. I've added the test case to SpecProcessor.
- `TestUpdateNotRetroactive`: I think this test case will come later, with the update handler, since we'll be processing updates synchronously where possible.
- `TestBackfillInclusiveStartEnd`: I'll make sure this is in the spec for the Backfiller.
- `TestHugeBackfillBuffer`: ditto
// Buffer should be empty, scheduler metadata should be updated.
schedulerSm, err := hsm.MachineData[scheduler.Scheduler](e.schedulerNode)
require.NoError(e.T(), err)
require.Equal(e.T(), 0, len(executor.BufferedStarts))
require.Equal(e.T(), 0, len(executor.BufferedStarts))
require.Empty(e.T(), executor.BufferedStarts)
reads nicer
components/scheduler/config.go
Outdated
CanceledTerminatedCountAsFailures bool // Whether cancelled+terminated count for pause-on-failure
RecentActionCount int // How many recent actions are recorded in SchedulerInfo.
nit: This comment is unclear to me, could you please clarify?
Reworded to, "Number of recent actions taken (workflow execution results) recorded in the ScheduleInfo metadata."
components/scheduler/config.go
Outdated
`ExecutionTimeout is the timeout for executing a single scheduler task.`,
ServiceCallTimeout = dynamicconfig.NewGlobalDurationSetting(
"component.scheduler.serviceCallTimeout",
5*time.Second,
It may cause problems at 10 seconds since the call takes up an executor in the history service. We may be able to put this on the outbound queue if it becomes an issue. In any case the fact that it's configurable is great. May want to ask @yycptt and @prathyushpv if they see an issue with this.
components/scheduler/helper_test.go
Outdated
@@ -64,16 +67,22 @@ func (root) IsWorkflowExecutionRunning() bool {
return true
}

func (s fakeEnv) Access(
func newFakeEnv() *fakeEnv {
I would say we move this to `hsmtest` (or maybe it already exists there), but it's really not critical since we will rewrite all of this soon.
components/scheduler/helper_test.go
Outdated
// Add Executor sub state machine node
executor := scheduler.NewExecutor()
_, err = schedulerNode.AddChild(scheduler.ExecutorMachineKey, *executor)
executor := scheduler.NewInvoker()
Fix the comment and the variable name here and anywhere else.
// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
//
// Copyright (c) 2020 Uber Technologies, Inc.
nit: 2025 and remove Uber.
result.OverlapSkipped = action.OverlapSkipped

// Add starting workflows to result, trim others.
for _, start := range readyStarts {
Note that you'll likely want to check if there's anything to discard just before completing the task as more actions may get invalidated by then.
I agree that more actions could become invalid (even if just by virtue of them falling past the `startWorkflowDeadline` after this loop completes), but I don't see why that's worth optimizing against; even if we checked discards right before we write the task's completion transition, we'd still be racing against the `startWorkflowDeadline` wall clock. Maybe I'm not following the condition you're calling out?
continue
}

if env.Now().After(e.startWorkflowDeadline(scheduler, start)) {
Note that once a request is put in the buffer, you should execute it to completion, since you're risking creating orphans with task retries.
I'm not 100% sure what the implication of these orphaned workflows would be. At minimum it may affect counts that we maintain.
> Note that once a request is put in the buffer you should execute it to completion since you're risking creating orphans with task retries.
Yep, that is how it's set up; we invalidate within the `ProcessBufferTask`, but once an action is in scope for `ExecuteTask`, we'll drive it to completion. `processBuffer` is the last chance we have to back out of a stale workflow start.
return nil, errRetryLimitExceeded
}

// Get rate limiter permission once per buffered start, on the first attempt only.
Why?
The rate that we're trying to limit is "schedules kicked off" more so than "RPS we're calling `StartWorkflowExecution` at"; it's intended to reflect the customer-facing "max actions taken per second" limit on a schedule.
this is an interesting topic. We will need to discuss this for existing APS and also the new OPS limiter. For OPS, we would like to limit cancel & terminate as well.
Another thing I'd like to raise: do we still want to rate limit the scheduler itself, versus relying on backpressure from APS limits on `StartWorkflowExecution`/cancel/terminate? This would be a good knob to simplify for our customers, not to mention operations.
Yeah I thought the consensus was to let the operations themselves get limited (either at frontend or history or both) and drop any additional limiting. (Also the transactions on the scheduler entity itself count against APS and OPS, right?)
func (e invokerTaskExecutor) newContext(namespace string) (context.Context, context.CancelFunc) {
ctx, cancelFunc := context.WithTimeout(context.Background(), e.Config.ServiceCallTimeout())
ctx = headers.SetCallerInfo(ctx, headers.NewBackgroundCallerInfo(namespace))
I wonder if this should already be set on the context passed to our task executors, worth checking. And see comment above about propagating that context.
Looked into this. The `outboundQueueActiveTaskExecutor` provides the context to immediate tasks. `Executable` generates the context that goes to task executor queues, which does indeed call `SetCallerInfo` with the proper namespace. So, yes, it does! 👍 Will update accordingly.
// Don't count "already started" for the error metric or retry, as it is most likely
// due to misconfiguration.
if !isAlreadyStartedError(err) {
Should this be counted as a completed start as it could be as a result of a retry?
It wouldn't be as a result of a retry from the scheduler, though (or we wouldn't get `already started` back). This is supposed to account more for the case that something external to the scheduler kicked off our target action.
Is it because each buffered start has a unique RequestID?
Yep, starts have a unique/deterministic request ID.
[]enumsspb.SchedulerInvokerState{
enumsspb.SCHEDULER_INVOKER_STATE_PROCESSING,
},
enumsspb.SCHEDULER_INVOKER_STATE_PROCESSING,
Glad we don't have to do this in CHASM world :)
tasks = append(tasks, ExecuteTask{})
}

return tasks, nil
will this cause both tasks to be generated in one transaction? or upper layer actually guarantees that won't happen?
HSM will append all `MachineTransition` `TransitionOutput`s to the node's `opLog`. When the MS transaction closes, `MutableStateImpl.closeTransactionPrepareTasks` calls `taskGenerator.GenerateDirtySubStateMachineTasks`, which then compacts the root `opLog` into a list of tasks and applies them on the current versioned transition as usual (w/ `AddTasks` or `TrackStateMachineTimer`). I believe that means they'd all land in one transaction.
i.InvokerInternal.BufferedStarts = util.FilterSlice(i.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool {
return !completed[start.RequestId] && !failed[start.RequestId]
})
i.InvokerInternal.CancelWorkflows = util.FilterSlice(i.GetCancelWorkflows(), func(we *commonpb.WorkflowExecution) bool {
return !canceled[we.RunId]
})
i.InvokerInternal.TerminateWorkflows = util.FilterSlice(i.GetTerminateWorkflows(), func(we *commonpb.WorkflowExecution) bool {
return !terminated[we.RunId]
})
nit: slices.DeleteFunc()
// BufferedStarts. The BufferedStarts field is appended to the Invoker's queue.
// For retrying execution, use EventRetry.
type EventEnqueue struct {
Node *hsm.Node
What's the purpose of those `Node` fields in the Events definitions? I don't see them being used anywhere.
I think I had it in there early in development before I grasped how transitions were applied; I can't find it used at all. Got rid of all the `Node` refs in Event (and also got rid of an unused event in Generator).
SchedulerInternal: prevScheduler.SchedulerInternal,
SchedulerInternal: common.CloneProto(prevScheduler.SchedulerInternal),
cacheConflictToken: prevScheduler.cacheConflictToken,
compiledSpec: prevScheduler.compiledSpec,
do we need a deep copy for this field? or it's read only?
It's read-only, and the spec itself shouldn't self-mutate, so we should be okay without a clone.
result.RetryableStarts = append(result.RetryableStarts, start)
} else {
// Drop the start from the buffer.
result.FailedStarts = append(result.FailedStarts, start)
do we need a metric for those dropped starts?
Sure, we can add one; we do have `ScheduleActionErrors`, but it'd be nice to see when retries are being exhausted. Have we found the need for a separate metric for this with the existing scheduler, or is it more of a nice-to-have?
```go
ScheduleTime: start.ActualTime,
ActualTime:   timestamppb.New(env.Now()),
```
🤦 Two different "ActualTime"...
Yeah; `start.ActualTime` is the scheduled time after jitter, whereas `ActualTime` is meant to be the time when it was actually kicked off. I suppose we could hide the detail by using `start.ActualTime` for both, but that might be a little confusing if a start was scheduled but failed to start for a period before success.
Sorry, the confusing names are my fault 😞

They're two separate contexts:

- When talking about scheduled times, we have "Nominal" vs. "Actual": "nominal" is what matches the schedule spec, "actual" is that plus jitter. So "actual" is the time when we want the thing to run.
- When talking about what actually happened, the "actual" from above is when we wanted it to run, and then there's when it actually did run. So "actual" turns into ScheduleTime, and then the "real" time turns into ActualTime. Of course I should have picked a third word. Or maybe we should call it ActualActualTime or ReallyRealTime 😆
```go
scheduler, err := loadScheduler(schedulerNode)
if err != nil {
	return err
}

invoker, err := e.loadInvoker(node)
```
it looks like this timer logic is executed with the Scheduler locked, so we don't really need the deep copy here.
Yeah, true. I'll update `loadInvoker` to take a `shouldClone` parameter, since we also use it in the immediate task.
```go
// processed time.
func (i Invoker) getEligibleBufferedStarts() []*schedulespb.BufferedStart {
	return util.FilterSlice(i.GetBufferedStarts(), func(start *schedulespb.BufferedStart) bool {
		return start.Attempt > 0 && start.BackoffTime.AsTime().Before(i.GetLastProcessedTime().AsTime())
```
Please help add some comments about the purpose of the LastProcessedTime field :)
I recalled the reason - it's subtle, I'll update the comments. It's because of how the Invoker drives its backoffs. The Invoker generates a timer task to wake on the earliest possible backoff, with a `ProcessBufferTask`. When the `ProcessBufferTask` runs, to handle pending backoffs, it simply moves the `LastProcessedTime` high-water mark forward. With the MachineTransition that updates `LastProcessedTime`, tasks are generated, and since `LastProcessedTime` > `BackoffTime` for some start, an immediate `ExecuteTask` is generated.

The reason that we account for it separately in `LastProcessedTime` is to support replication and being able to re-generate tasks based on incoming state. This way, when we replicate the machine at a specific point in time, it'll generate an identical set of tasks regardless of when tasks were regenerated (as opposed to if we used, say, `env.Now()`).
What changed?
The Invoker (start workflow) logic is added for the HSM scheduler.
A few differences between the workflow scheduler's and the HSM scheduler's execution logic:
- A `ProcessBuffer` task is 1:1 with a call to `ProcessBuffer`; draining is not attempted more than once on the same task. A `ProcessBuffer` task is scheduled to wake after the delay.
- `ProcessBuffer` is used to drive retries. Each `BufferedStart` has its own `Attempt` and `BackoffTime` to manage retries. The `Execute` task is therefore idempotent.
- `startWorkflow`, `terminateWorkflow`, and `cancelWorkflow` are no longer called within local activities. Instead, they are called with a short (5 sec) deadline within the `Execute` task.
- An `Execute` task is generated whenever there are BufferedStarts, CancelWorkflows, or TerminateWorkflows available for execution. An `Execute` task will only ever make a single attempt on each work item before rescheduling a `ProcessBuffer` task.
- `Execute` task completion will always schedule a `ProcessBuffer` task after completing work (unless the `Execute` task was a no-op). This is to avoid a race condition between reading the Invoker's current state and choosing a transition (because applying a transition necessitates mutating the state field, which I'd rather just leave as-is).
Still missing in HSM Invoker:

- `LastCompletionResult` also isn't wired up

How did you test it?
Potential risks