Skip to content

Commit ee26a92

Browse files
committed
Test with more values
1 parent 4fa8d2e commit ee26a92

2 files changed

Lines changed: 24 additions & 22 deletions

File tree

core/src/worker/workflow/wft_poller.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,6 @@ impl WFTPollerShared {
104104
if is_sticky {
105105
self.wait_for_first_nonsticky_poll.notified().await;
106106
}
107-
info!(
108-
"Waiting if needed. Sticky: {} / backlog {}",
109-
is_sticky,
110-
*self.last_seen_sticky_backlog.0.borrow()
111-
);
112107
// If there's a sticky backlog, prioritize it.
113108
if !is_sticky {
114109
let backlog = *self.last_seen_sticky_backlog.0.borrow();
@@ -127,12 +122,6 @@ impl WFTPollerShared {
127122
if let Some((sticky_active, non_sticky_active)) =
128123
self.sticky_active.get().zip(self.non_sticky_active.get())
129124
{
130-
info!(
131-
"Balance (sticky {}), non-sticky {} sticky {}",
132-
is_sticky,
133-
*non_sticky_active.borrow(),
134-
*sticky_active.borrow()
135-
);
136125
if is_sticky {
137126
let _ = sticky_active
138127
.clone()
@@ -147,7 +136,6 @@ impl WFTPollerShared {
147136
})
148137
.await;
149138
}
150-
info!("Done balance (sticky {})", is_sticky);
151139
}
152140
}
153141
}

tests/integ_tests/polling_tests.rs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,15 @@ use temporal_sdk_core::{
77
ClientOptionsBuilder, ephemeral_server::TemporalDevServerConfigBuilder, init_worker,
88
};
99
use temporal_sdk_core_api::{Worker, worker::PollerBehavior};
10-
use temporal_sdk_core_protos::coresdk::{
11-
AsJsonPayloadExt, IntoCompletion,
12-
activity_task::activity_task as act_task,
13-
workflow_activation::{FireTimer, WorkflowActivationJob, workflow_activation_job},
14-
workflow_commands::{ActivityCancellationType, RequestCancelActivity, StartTimer},
15-
workflow_completion::WorkflowActivationCompletion,
10+
use temporal_sdk_core_protos::{
11+
coresdk::{
12+
AsJsonPayloadExt, IntoCompletion,
13+
activity_task::activity_task as act_task,
14+
workflow_activation::{FireTimer, WorkflowActivationJob, workflow_activation_job},
15+
workflow_commands::{ActivityCancellationType, RequestCancelActivity, StartTimer},
16+
workflow_completion::WorkflowActivationCompletion,
17+
},
18+
temporal::api::enums::v1::EventType,
1619
};
1720
use temporal_sdk_core_test_utils::{
1821
CoreWfStarter, WorkerTestHelpers, default_cached_download, drain_pollers_and_shutdown,
@@ -209,16 +212,20 @@ async fn switching_worker_client_changes_poll() {
209212
server2.shutdown().await.unwrap();
210213
}
211214

215+
#[rstest::rstest]
212216
#[tokio::test]
213-
async fn only_one_workflow_slot_and_two_pollers() {
217+
async fn small_workflow_slots_and_pollers(
218+
#[values(1, 2)] wft_slots: usize,
219+
#[values(1, 2)] wft_pollers: usize,
220+
) {
214221
let wf_name = "only_one_workflow_slot_and_two_pollers";
215222
let mut starter = CoreWfStarter::new(wf_name);
216223
starter
217224
.worker_config
218-
.max_outstanding_workflow_tasks(2_usize)
225+
.max_outstanding_workflow_tasks(wft_slots)
219226
.max_outstanding_local_activities(1_usize)
220227
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(1))
221-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(2))
228+
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(wft_pollers))
222229
.max_outstanding_activities(1_usize);
223230
let mut worker = starter.worker().await;
224231
worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
@@ -236,7 +243,7 @@ async fn only_one_workflow_slot_and_two_pollers() {
236243
worker.register_activity("echo_activity", echo);
237244
worker
238245
.submit_wf(
239-
wf_name.to_string(),
246+
starter.get_task_queue(),
240247
wf_name.to_owned(),
241248
vec![],
242249
WorkflowOptions::default(),
@@ -245,4 +252,11 @@ async fn only_one_workflow_slot_and_two_pollers() {
245252
.unwrap();
246253
// If we don't fail the workflow on nondeterminism, we'll get stuck here retrying the WFT
247254
worker.run_until_done().await.unwrap();
255+
// Verify no task timeouts happened
256+
let history = starter.get_history().await;
257+
let any_task_timeouts = history
258+
.events
259+
.iter()
260+
.any(|e| e.event_type() == EventType::WorkflowTaskTimedOut);
261+
assert!(!any_task_timeouts);
248262
}

0 commit comments

Comments
 (0)