Skip to content

Commit 4fa8d2e

Browse files
committed
Fixes two-slots-two-pollers
1 parent cffd778 commit 4fa8d2e

5 files changed

Lines changed: 110 additions & 39 deletions

File tree

core-api/src/worker.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,11 @@ impl SlotSupplierPermit {
389389
pub fn user_data_mut<T: Any + Send + Sync>(&mut self) -> Option<&mut T> {
390390
self.user_data.as_mut().and_then(|b| b.downcast_mut())
391391
}
392+
393+
/// Deconstruct this permit and return the inner data
394+
pub fn into_user_data(self) -> Option<Box<dyn Any + Send + Sync>> {
395+
self.user_data
396+
}
392397
}
393398

394399
#[derive(Debug, Copy, Clone, derive_more::Display, Eq, PartialEq)]

core/src/abstractions.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use tokio_util::sync::CancellationToken;
2424
/// as handling associated metrics tracking.
2525
#[derive(Clone)]
2626
pub(crate) struct MeteredPermitDealer<SK: SlotKind> {
27-
supplier: Arc<dyn SlotSupplier<SlotKind = SK> + Send + Sync>,
27+
pub(crate) supplier: Arc<dyn SlotSupplier<SlotKind = SK> + Send + Sync>,
2828
/// The number of permit owners who have acquired a permit, but are not yet meaningfully using
2929
/// that permit. This is useful for giving a more semantically accurate count of used task
3030
/// slots, since we typically wait for a permit first before polling, but that slot isn't used
@@ -393,6 +393,35 @@ macro_rules! dbg_panic {
393393
}
394394
pub(crate) use dbg_panic;
395395

396+
pub(crate) struct ActiveCounter<F: Fn(usize)>(watch::Sender<usize>, Option<Arc<F>>);
397+
impl<F> ActiveCounter<F>
398+
where
399+
F: Fn(usize),
400+
{
401+
pub(crate) fn new(a: watch::Sender<usize>, change_fn: Option<Arc<F>>) -> Self {
402+
a.send_modify(|v| {
403+
*v += 1;
404+
if let Some(cfn) = change_fn.as_ref() {
405+
cfn(*v);
406+
}
407+
});
408+
Self(a, change_fn)
409+
}
410+
}
411+
impl<F> Drop for ActiveCounter<F>
412+
where
413+
F: Fn(usize),
414+
{
415+
fn drop(&mut self) {
416+
self.0.send_modify(|v| {
417+
*v -= 1;
418+
if let Some(cfn) = self.1.as_ref() {
419+
cfn(*v)
420+
};
421+
});
422+
}
423+
}
424+
396425
#[cfg(test)]
397426
pub(crate) mod tests {
398427
use super::*;

core/src/pollers/poll_buffer.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::{
2-
abstractions::{MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
2+
abstractions::{ActiveCounter, MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
33
pollers::{self, Poller},
44
worker::{
55
WFTPollerShared,
@@ -391,35 +391,6 @@ async fn handle_task_panic(t: JoinHandle<()>) {
391391
}
392392
}
393393

394-
struct ActiveCounter<F: Fn(usize)>(watch::Sender<usize>, Option<Arc<F>>);
395-
impl<F> ActiveCounter<F>
396-
where
397-
F: Fn(usize),
398-
{
399-
fn new(a: watch::Sender<usize>, change_fn: Option<Arc<F>>) -> Self {
400-
a.send_modify(|v| {
401-
*v += 1;
402-
if let Some(cfn) = change_fn.as_ref() {
403-
cfn(*v);
404-
}
405-
});
406-
Self(a, change_fn)
407-
}
408-
}
409-
impl<F> Drop for ActiveCounter<F>
410-
where
411-
F: Fn(usize),
412-
{
413-
fn drop(&mut self) {
414-
self.0.send_modify(|v| {
415-
*v -= 1;
416-
if let Some(cfn) = self.1.as_ref() {
417-
cfn(*v)
418-
};
419-
});
420-
}
421-
}
422-
423394
/// The PollScaler is responsible for managing the number of pollers based on the current load.
424395
///
425396
/// It does so by receiving suggestions from the server about whether to scale up or down. It will

core/src/worker/workflow/wft_poller.rs

Lines changed: 31 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ use crate::{
77
worker::{client::WorkerClient, wft_poller_behavior},
88
};
99
use futures_util::{Stream, stream};
10-
use std::sync::{Arc, OnceLock};
10+
use std::sync::{
11+
Arc, OnceLock,
12+
atomic::{AtomicBool, Ordering},
13+
};
1114
use temporal_sdk_core_api::worker::{WorkerConfig, WorkflowSlotKind};
1215
use temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
13-
use tokio::sync::watch;
16+
use tokio::sync::{Notify, watch};
1417
use tokio_util::sync::CancellationToken;
1518

1619
pub(crate) fn make_wft_poller(
@@ -74,6 +77,8 @@ pub(crate) struct WFTPollerShared {
7477
last_seen_sticky_backlog: (watch::Receiver<usize>, watch::Sender<usize>),
7578
sticky_active: OnceLock<watch::Receiver<usize>>,
7679
non_sticky_active: OnceLock<watch::Receiver<usize>>,
80+
wait_for_first_nonsticky_poll: Notify,
81+
have_done_first_poll: AtomicBool,
7782
}
7883
impl WFTPollerShared {
7984
pub(crate) fn new() -> Self {
@@ -82,6 +87,8 @@ impl WFTPollerShared {
8287
last_seen_sticky_backlog: (rx, tx),
8388
sticky_active: OnceLock::new(),
8489
non_sticky_active: OnceLock::new(),
90+
wait_for_first_nonsticky_poll: Notify::new(),
91+
have_done_first_poll: AtomicBool::new(false),
8592
}
8693
}
8794
pub(crate) fn set_sticky_active(&self, rx: watch::Receiver<usize>) {
@@ -93,15 +100,24 @@ impl WFTPollerShared {
93100
/// Makes either the sticky or non-sticky poller wait pre-permit-acquisition so that we can
94101
/// balance which kind of queue we poll appropriately.
95102
pub(crate) async fn wait_if_needed(&self, is_sticky: bool) {
103+
// Sticky shouldn't start polling until after the first non-sticky poll has been allowed
104+
if is_sticky {
105+
self.wait_for_first_nonsticky_poll.notified().await;
106+
}
107+
info!(
108+
"Waiting if needed. Sticky: {} / backlog {}",
109+
is_sticky,
110+
*self.last_seen_sticky_backlog.0.borrow()
111+
);
96112
// If there's a sticky backlog, prioritize it.
97113
if !is_sticky {
98114
let backlog = *self.last_seen_sticky_backlog.0.borrow();
99-
if backlog > 1 {
115+
if backlog >= 1 {
100116
let _ = self
101117
.last_seen_sticky_backlog
102118
.0
103119
.clone()
104-
.wait_for(|v| *v <= 1)
120+
.wait_for(|v| *v < 1)
105121
.await;
106122
}
107123
}
@@ -111,6 +127,12 @@ impl WFTPollerShared {
111127
if let Some((sticky_active, non_sticky_active)) =
112128
self.sticky_active.get().zip(self.non_sticky_active.get())
113129
{
130+
info!(
131+
"Balance (sticky {}), non-sticky {} sticky {}",
132+
is_sticky,
133+
*non_sticky_active.borrow(),
134+
*sticky_active.borrow()
135+
);
114136
if is_sticky {
115137
let _ = sticky_active
116138
.clone()
@@ -119,9 +141,13 @@ impl WFTPollerShared {
119141
} else {
120142
let _ = non_sticky_active
121143
.clone()
122-
.wait_for(|v| *v <= *sticky_active.borrow())
144+
.wait_for(|v| {
145+
*v < *sticky_active.borrow()
146+
|| !self.have_done_first_poll.load(Ordering::Relaxed)
147+
})
123148
.await;
124149
}
150+
info!("Done balance (sticky {})", is_sticky);
125151
}
126152
}
127153
}

tests/integ_tests/polling_tests.rs

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
1+
use crate::integ_tests::activity_functions::echo;
12
use assert_matches::assert_matches;
23
use std::{sync::Arc, time::Duration};
34
use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowOptions};
5+
use temporal_sdk::{ActivityOptions, WfContext};
46
use temporal_sdk_core::{
57
ClientOptionsBuilder, ephemeral_server::TemporalDevServerConfigBuilder, init_worker,
68
};
7-
use temporal_sdk_core_api::Worker;
9+
use temporal_sdk_core_api::{Worker, worker::PollerBehavior};
810
use temporal_sdk_core_protos::coresdk::{
9-
IntoCompletion,
11+
AsJsonPayloadExt, IntoCompletion,
1012
activity_task::activity_task as act_task,
1113
workflow_activation::{FireTimer, WorkflowActivationJob, workflow_activation_job},
1214
workflow_commands::{ActivityCancellationType, RequestCancelActivity, StartTimer},
1315
workflow_completion::WorkflowActivationCompletion,
1416
};
1517
use temporal_sdk_core_test_utils::{
16-
WorkerTestHelpers, default_cached_download, drain_pollers_and_shutdown,
18+
CoreWfStarter, WorkerTestHelpers, default_cached_download, drain_pollers_and_shutdown,
1719
init_core_and_create_wf, init_integ_telem, integ_worker_config, schedule_activity_cmd,
1820
};
1921
use tokio::time::timeout;
@@ -206,3 +208,41 @@ async fn switching_worker_client_changes_poll() {
206208
server1.shutdown().await.unwrap();
207209
server2.shutdown().await.unwrap();
208210
}
211+
212+
#[tokio::test]
213+
async fn only_one_workflow_slot_and_two_pollers() {
214+
let wf_name = "only_one_workflow_slot_and_two_pollers";
215+
let mut starter = CoreWfStarter::new(wf_name);
216+
starter
217+
.worker_config
218+
.max_outstanding_workflow_tasks(2_usize)
219+
.max_outstanding_local_activities(1_usize)
220+
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1))
221+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2))
222+
.max_outstanding_activities(1_usize);
223+
let mut worker = starter.worker().await;
224+
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
225+
for _ in 0..3 {
226+
ctx.activity(ActivityOptions {
227+
activity_type: "echo_activity".to_string(),
228+
start_to_close_timeout: Some(Duration::from_secs(5)),
229+
input: "hi!".as_json_payload().expect("serializes fine"),
230+
..Default::default()
231+
})
232+
.await;
233+
}
234+
Ok(().into())
235+
});
236+
worker.register_activity("echo_activity", echo);
237+
worker
238+
.submit_wf(
239+
wf_name.to_string(),
240+
wf_name.to_owned(),
241+
vec![],
242+
WorkflowOptions::default(),
243+
)
244+
.await
245+
.unwrap();
246+
// If we don't fail the workflow on nondeterminism, we'll get stuck here retrying the WFT
247+
worker.run_until_done().await.unwrap();
248+
}

0 commit comments

Comments (0)