Skip to content

Add poller autoscaler#1956

Merged
Quinn-With-Two-Ns merged 17 commits intotemporalio:masterfrom
Quinn-With-Two-Ns:SDK-2478
Jul 21, 2025
Merged

Add poller autoscaler#1956
Quinn-With-Two-Ns merged 17 commits intotemporalio:masterfrom
Quinn-With-Two-Ns:SDK-2478

Conversation

@Quinn-With-Two-Ns
Copy link
Copy Markdown
Contributor

@Quinn-With-Two-Ns Quinn-With-Two-Ns commented May 14, 2025

Add poller autoscaler based on temporalio/sdk-rust@93471ac

@Quinn-With-Two-Ns Quinn-With-Two-Ns marked this pull request as ready for review June 23, 2025 22:55
@Quinn-With-Two-Ns Quinn-With-Two-Ns requested a review from a team as a code owner June 23, 2025 22:55
Comment thread worker/worker.go
// NewPollerBehaviorSimpleMaximum creates a PollerBehavior that allows the worker to start up to a maximum number of pollers.
//
// NOTE: Experimental
func NewPollerBehaviorSimpleMaximum(
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Open question if these should take options instead

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't think so. Worst case scenario if we ever need to add anything it can be a new behavior.

Comment thread internal/worker.go
Comment thread internal/internal_task_pollers.go Outdated
taskPollers: []scalableTaskPoller{
newScalableTaskPoller(localActivityTaskPoller, params.Logger, NewPollerBehaviorSimpleMaximum(1)),
},
taskProcessor: localActivityTaskPoller,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're setting a "poller" as taskProcessor? That feels confusing

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The localActivityTaskPoller implements the poller interface and the task processing interface. I am not too inclined to split them , but do you have a better naming suggestion?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, not really 😅 it's probably less important for the Local Activity case

numStickyPollerMetric *numPollerMetric
}

// workflowTaskProcessor implements processing of a workflow task and can create
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the differentiation between Processor/Poller could be clearer. This comment says processor can create pollers, but newWorkflowTaskProcessor says it creates a new poller

Comment thread internal/internal_worker_test.go Outdated
Comment thread internal/internal_worker.go Outdated
pollerCount: params.MaxConcurrentWorkflowTaskQueuePollers,
taskProcessor := newWorkflowTaskProcessor(taskHandler, contextManager, service, params)

var taskWorkers []scalableTaskPoller
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seem to be 3 terms that are all related but slightly different: task pollers, task workers, and task processor.

Looking at the code here, taskWorkers == taskPollers, and looking down in the file, taskProcessor: poller,. Do we need all 3 terms separately? Or can we combine taskWorkers and taskPollers, and differentiate them more clearly from taskProcessor?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah let me get rid of taskWorkers, it was a term I was using, but don't think it fits well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we have two interfaces task pollers and task processor that do two different things (poll tasks and process those tasks). For some task types the same struct implements both interfaces.

Comment thread internal/internal_worker_base_test.go Outdated
Copy link
Copy Markdown
Member

@Sushisource Sushisource left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth reviewing James' fix here https://github.com/temporalio/sdk-core/pull/941/files#diff-a65538899b187dda6f4128ecad6a38b908f9491bb63a4828f488e0e1d97f548fR121

To ensure that's not a problem here with the balancer either.

Comment thread internal/internal_worker.go Outdated
Comment thread internal/internal_worker_base.go Outdated
Comment thread internal/internal_worker_base.go Outdated
Comment on lines +428 to +432
// TODO(quinn): is there a way we can refactor this to avoid the function?
if func() bool {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could set a var which is this fn's return value and continue if false, but it makes the defer stuff harder too... I think this is totally fine.

Comment thread internal/internal_worker_base.go Outdated
Comment thread worker/worker.go
Comment thread internal/worker.go
) PollerBehavior {
initialNumberOfPollers := options.InitialNumberOfPollers
if initialNumberOfPollers <= 0 {
initialNumberOfPollers = defaultAutoscalingInitialNumberOfPollers // Default initial number of pollers.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be helpful to log something here so users are aware their configuration is invalid?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more to catch a non-set options then an invalid configuration. Setting to below zero is invalid and we could error here, but we don't for the old poller setting so I kept the same logic here.


// balance checks if the poller type is balanced with other poller types. The goal is to ensure that
// at least one poller of each type is running before allowing any poller of the given type to increase.
func (pb *pollerBalancer) balance(ctx context.Context, pollerType string) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could potentially be susceptible to some of the stuff fixed in temporalio/sdk-rust#941 -- but I'm not sure. Adding a test is probably worth it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the core issue, This code actually won't apply in the SimpleMaximum case. We didn't change any behaviour in the SimpleMaximum case for Go

@Quinn-With-Two-Ns Quinn-With-Two-Ns merged commit ce09baa into temporalio:master Jul 21, 2025
22 of 24 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants