152 changes: 151 additions & 1 deletion core/src/core_tests/workflow_tasks.rs
@@ -1,5 +1,5 @@
use crate::{
Worker, advance_fut,
PollWorkflowOptions, Worker, advance_fut,
internal_flags::CoreInternalFlags,
job_assert,
replay::TestHistoryBuilder,
@@ -3150,3 +3150,153 @@ async fn pass_timer_summary_to_metadata() {
.unwrap();
worker.run_until_done().await.unwrap();
}

#[tokio::test]
async fn both_normal_and_sticky_pollers_poll_concurrently() {
struct Counters {
// How many times PollWorkflowTaskQueue has been called
normal_poll_count: AtomicUsize,
sticky_poll_count: AtomicUsize,

// How many pollers are currently active (i.e. PollWorkflowTaskQueue
// has been called, but not the corresponding CompleteWorkflowTask)
normal_slots_active_count: AtomicUsize,
sticky_slots_active_count: AtomicUsize,

// Max number of pollers that were active at the same time
max_total_slots_active_count: AtomicUsize,
max_normal_slots_active_count: AtomicUsize,
max_sticky_slots_active_count: AtomicUsize,
}

let counters = Arc::new(Counters {
normal_poll_count: AtomicUsize::new(0),
sticky_poll_count: AtomicUsize::new(0),
normal_slots_active_count: AtomicUsize::new(0),
sticky_slots_active_count: AtomicUsize::new(0),
max_total_slots_active_count: AtomicUsize::new(0),
max_normal_slots_active_count: AtomicUsize::new(0),
max_sticky_slots_active_count: AtomicUsize::new(0),
});

// Create actual workflow task responses to return from polls
let mut task_responses = (1..100).map(|i| {
hist_to_poll_resp(
&canned_histories::single_timer(&format!("timer-{i}")),
format!("wf-{i}"),
1.into(),
)
.resp
});

let mut mock_client = mock_workflow_client();

// Track normal vs sticky poll requests and return actual workflow tasks
let cc = Arc::clone(&counters);
mock_client
.expect_poll_workflow_task()
.returning(move |_, opts: PollWorkflowOptions| {
let mut task_response = task_responses.next().unwrap_or_default();

// FIXME: Atomics initially made sense, but this has grown ugly, and there are probably
// cases where this may produce incorrect results due to races in operation ordering
// (really didn't put any thought into this). We also can't have
if opts.sticky_queue_name.is_none() {
// Normal queue poll
cc.normal_poll_count.fetch_add(1, Ordering::Relaxed);
cc.normal_slots_active_count.fetch_add(1, Ordering::Relaxed);
cc.max_normal_slots_active_count.fetch_max(
cc.normal_slots_active_count.load(Ordering::Relaxed),
Ordering::AcqRel,
);
cc.max_total_slots_active_count.fetch_max(
cc.normal_slots_active_count.load(Ordering::Relaxed)
+ cc.sticky_slots_active_count.load(Ordering::Relaxed),
Ordering::AcqRel,
);

task_response.task_token = [task_response.task_token, b"normal".to_vec()].concat();
} else {
// Sticky queue poll
cc.sticky_poll_count.fetch_add(1, Ordering::Relaxed);
cc.sticky_slots_active_count.fetch_add(1, Ordering::Relaxed);
cc.max_sticky_slots_active_count.fetch_max(
cc.sticky_slots_active_count.load(Ordering::Acquire),
Ordering::AcqRel,
);
cc.max_total_slots_active_count.fetch_max(
cc.normal_slots_active_count.load(Ordering::Relaxed)
+ cc.sticky_slots_active_count.load(Ordering::Relaxed),
Ordering::AcqRel,
);

task_response.task_token = [task_response.task_token, b"sticky".to_vec()].concat();
}

// Return actual workflow task responses
Ok(task_response)
});

let cc = Arc::clone(&counters);
mock_client
.expect_complete_workflow_task()
.returning(move |completion| {
if completion.task_token.0.ends_with(b"normal") {
cc.normal_slots_active_count.fetch_sub(1, Ordering::Relaxed);
} else {
cc.sticky_slots_active_count.fetch_sub(1, Ordering::Relaxed);
}
Ok(Default::default())
});

let worker = Worker::new(
test_worker_cfg()
.max_cached_workflows(500_usize) // We need cache, but don't want to deal with evictions
.max_outstanding_workflow_tasks(2_usize)
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2_usize))
.nonsticky_to_sticky_poll_ratio(0.2)
.no_remote_activities(true)
.build()
.unwrap(),
Some("stickytq".to_string()),
Arc::new(mock_client),
None,
);

for _ in 1..50 {
let activation = worker.poll_workflow_activation().await.unwrap();
let _ = worker
.complete_workflow_activation(WorkflowActivationCompletion::empty(activation.run_id))
.await;
}

assert!(
counters.normal_poll_count.load(Ordering::Relaxed) > 0,
"Normal poller should have been called at least once"
);
assert!(
counters.sticky_poll_count.load(Ordering::Relaxed) > 0,
"Sticky poller should have been called at least once"
);
assert!(
counters
.max_normal_slots_active_count
.load(Ordering::Relaxed)
>= 1,
"Normal poller should have been active at least once"
);
assert!(
counters
.max_sticky_slots_active_count
.load(Ordering::Relaxed)
>= 1,
"Sticky poller should have been active at least once"
);
assert_eq!(
counters
.max_total_slots_active_count
.load(Ordering::Relaxed),
2,
"At peak, there should be exactly 2 pollers active at the same time"
);
}
67 changes: 47 additions & 20 deletions core/src/worker/workflow/wft_poller.rs
@@ -102,16 +102,6 @@ impl WFTPollerShared {
/// Makes either the sticky or non-sticky poller wait pre-permit-acquisition so that we can
/// balance which kind of queue we poll appropriately.
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
// If there's a sticky backlog, prioritize it.
if !is_sticky {
let _ = self
.last_seen_sticky_backlog
.0
.clone()
.wait_for(|v| *v == 0)
.await;
}

// We need to make sure there's at least one poller of both kinds available. So, we check
// that we won't end up using every available permit with one kind of poller. In practice
// this is only ever likely to be an issue with very small numbers of slots.
@@ -121,26 +111,63 @@
{
let mut sticky_active = sticky_active.clone();
let mut non_sticky_active = non_sticky_active.clone();
let mut sticky_backlog = self.last_seen_sticky_backlog.0.clone();

loop {
let num_sticky_active = *sticky_active.borrow_and_update();
let num_non_sticky_active = *non_sticky_active.borrow_and_update();
let both_are_zero = num_sticky_active == 0 && num_non_sticky_active == 0;
if both_are_zero {
let num_sticky_backlog = *sticky_backlog.borrow_and_update();

let allow = || {
Review comment (Member):
This could avoid being a closure by using else-ifs... but I don't have a firm position on whether that's more readable or not.
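
For illustration, here is roughly what the same decision could look like as a plain if/else chain instead of a closure. This is only a sketch of the suggestion: the function name allow_poll is invented for the example, the parameters mirror the locals in the diff, and the intent is to keep the semantics identical while only changing the shape.

// Hypothetical standalone version of the decision as an if/else chain,
// per the suggestion above. Names mirror the diff; not the committed code.
fn allow_poll(
    is_sticky: bool,
    num_sticky_active: usize,
    num_non_sticky_active: usize,
    num_sticky_backlog: usize,
    max_slots: usize,
) -> bool {
    if !is_sticky {
        if num_non_sticky_active == 0 {
            // There should always be at least one non-sticky poller.
            true
        } else if num_sticky_active == 0 && num_non_sticky_active + 1 >= max_slots {
            // Don't let another non-sticky poller block the first sticky one.
            false
        } else if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
            // Meaningful sticky backlog: hold non-sticky polls back.
            false
        } else {
            // Otherwise just stay under the slot limit.
            num_sticky_active + num_non_sticky_active < max_slots
        }
    } else if num_sticky_active == 0 {
        // There should always be at least one sticky poller.
        true
    } else if num_non_sticky_active == 0 && num_sticky_active + 1 >= max_slots {
        // Don't let another sticky poller block the first non-sticky one.
        false
    } else if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
        // Meaningful sticky backlog: let sticky poll.
        true
    } else {
        num_sticky_active + num_non_sticky_active < max_slots
    }
}

Whether this reads better than the closure form is exactly the open question in the comment above.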

if !is_sticky {
break;
// There should always be at least one non-sticky poller.
if num_non_sticky_active == 0 {
return true;
}

// Do not allow an additional non-sticky poller to prevent starting a first sticky poller.
if num_sticky_active == 0 && num_non_sticky_active + 1 >= max_slots {
return false;
}

// If there's a meaningful sticky backlog, prioritize sticky.
if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
return false;
}
} else {
// There should always be at least one sticky poller.
if num_sticky_active == 0 {
return true;
}

// Do not allow an additional sticky poller to prevent starting a first non-sticky poller.
if num_non_sticky_active == 0 && num_sticky_active + 1 >= max_slots {
return false;
}

// If there's a meaningful sticky backlog, prioritize sticky.
if num_sticky_backlog > 1 && num_sticky_backlog > num_sticky_active {
return true;
}
}
} else {
let would_exceed_max_slots =
(num_sticky_active + num_non_sticky_active + 1) >= max_slots;
let must_wait = would_exceed_max_slots
&& (num_sticky_active == 0 || num_non_sticky_active == 0);
if !must_wait {
break;

// Just balance the two poller types.
// FIXME: Do we need anything more here, to ensure proper balancing?
Review comment (Member) on lines +154 to +155:
Go effectively just forces an even number of both kinds of pending requests, except that it always favors sticky during a backlog and when the counts are even: https://github.com/temporalio/sdk-go/blob/9682a692145f503a02b73f32435cb04bdc6f57c1/internal/internal_task_pollers.go#L782

We are already favoring sticky when there's a backlog. It's not obvious to me that just keeping the counts even is the best move either, but it is what Core used to do, and it clearly works, so we could go with that.
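
For reference, a minimal sketch of how the Go-style rule described here might look, under the assumption that "even amounts" means letting whichever poller type is behind go next, with sticky winning ties and backlogs. The function name go_style_allow and its exact tie-breaking are hypothetical; the authoritative logic is the linked Go source, not this snippet.

// Hypothetical sketch of a Go-style balancing rule: keep the two pending
// counts even, favoring sticky when there's any backlog or when counts tie.
// Not the code in this PR.
fn go_style_allow(
    is_sticky: bool,
    num_sticky_active: usize,
    num_non_sticky_active: usize,
    num_sticky_backlog: usize,
) -> bool {
    if num_sticky_backlog > 0 {
        // During a backlog, always favor sticky polls.
        return is_sticky;
    }
    if is_sticky {
        // Sticky may poll whenever it is not already ahead of non-sticky,
        // so ties go to sticky.
        num_sticky_active <= num_non_sticky_active
    } else {
        // Non-sticky may poll only when it is strictly behind sticky.
        num_non_sticky_active < num_sticky_active
    }
}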

if num_sticky_active + num_non_sticky_active < max_slots {
return true;
}

false
};

if allow() {
return;
}

tokio::select! {
_ = sticky_active.changed() => (),
_ = non_sticky_active.changed() => (),
_ = sticky_backlog.changed() => (),
}
}
}