Skip to content

Commit 3dd1ba5

Browse files
committed
Use capabilities to ensure test uses graceful shutdown & use newest CLI
1 parent dab43fa commit 3dd1ba5

8 files changed

Lines changed: 155 additions & 122 deletions

File tree

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[env]
22
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3-
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
3+
CLI_VERSION_OVERRIDE = "v1.6.3-serverless"
44

55
[alias]
66
# Not sure why --all-features doesn't work

crates/sdk-core/src/core_tests/workers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,7 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
12991299
let mw = MockWorkerInputs::new(stream.boxed());
13001300
let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
13011301

1302-
// validate() reads describe_namespace and sets graceful_poll_shutdown = true
1302+
// validate() reads describe_namespace and sets capabilities.graceful_poll_shutdown = true
13031303
worker.validate().await.unwrap();
13041304

13051305
let poll_fut = worker.poll_workflow_activation();

crates/sdk-core/src/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ pub use temporalio_common::protos::TaskToken;
4242
pub use url::Url;
4343
pub use worker::{
4444
ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
45-
FixedSizeSlotSupplier, LocalActivitySlotKind, NexusSlotKind, PollError, PollerBehavior,
46-
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner,
47-
ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
48-
SlotReleaseContext, SlotReservationContext, SlotSupplier, SlotSupplierOptions,
49-
SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder,
50-
Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner, WorkerValidationError,
51-
WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
45+
FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
46+
PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
47+
ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
48+
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
49+
SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
50+
TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
51+
WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
5252
};
5353

5454
use crate::{

crates/sdk-core/src/pollers/poll_buffer.rs

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::{
22
abstractions::{ActiveCounter, MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
33
pollers::{self, Poller},
44
worker::{
5-
ActivitySlotKind, NexusSlotKind, PollerBehavior, SlotKind, WFTPollerShared,
6-
WorkflowSlotKind,
5+
ActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollerBehavior, SlotKind,
6+
WFTPollerShared, WorkflowSlotKind,
77
client::{PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient},
88
},
99
};
@@ -102,16 +102,15 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
102102
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
103103
options: WorkflowTaskOptions,
104104
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
105-
graceful_poll_shutdown: Arc<AtomicBool>,
106-
server_supports_autoscaling: Arc<AtomicBool>,
105+
capabilities: Arc<NamespaceCapabilities>,
107106
) -> Self {
108107
let is_sticky = sticky_queue.is_some();
109108
let poll_scaler = PollScaler::new(
110109
poller_behavior,
111110
num_pollers_handler,
112111
shutdown.clone(),
113112
last_successful_poll_time,
114-
server_supports_autoscaling,
113+
capabilities.clone(),
115114
);
116115
if let Some(wftps) = options.wft_poller_shared.as_ref() {
117116
if is_sticky {
@@ -167,7 +166,7 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
167166
poll_scaler,
168167
pre_permit_delay,
169168
post_poll_fn,
170-
graceful_poll_shutdown,
169+
capabilities,
171170
)
172171
}
173172
}
@@ -183,8 +182,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
183182
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
184183
options: ActivityTaskOptions,
185184
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
186-
graceful_poll_shutdown: Arc<AtomicBool>,
187-
server_supports_autoscaling: Arc<AtomicBool>,
185+
capabilities: Arc<NamespaceCapabilities>,
188186
) -> Self {
189187
let pre_permit_delay = options
190188
.max_worker_acts_per_second
@@ -226,7 +224,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
226224
num_pollers_handler,
227225
shutdown.clone(),
228226
last_successful_poll_time,
229-
server_supports_autoscaling,
227+
capabilities.clone(),
230228
);
231229
Self::new(
232230
poll_fn,
@@ -235,7 +233,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
235233
poll_scaler,
236234
pre_permit_delay,
237235
None::<fn(&PollActivityTaskQueueResponse)>,
238-
graceful_poll_shutdown,
236+
capabilities,
239237
)
240238
}
241239
}
@@ -251,8 +249,7 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
251249
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
252250
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
253251
send_heartbeat: bool,
254-
graceful_poll_shutdown: Arc<AtomicBool>,
255-
server_supports_autoscaling: Arc<AtomicBool>,
252+
capabilities: Arc<NamespaceCapabilities>,
256253
) -> Self {
257254
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
258255
Some(NoRetryOnMatching {
@@ -286,11 +283,11 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
286283
num_pollers_handler,
287284
shutdown,
288285
last_successful_poll_time,
289-
server_supports_autoscaling,
286+
capabilities.clone(),
290287
),
291288
None::<fn() -> BoxFuture<'static, ()>>,
292289
None::<fn(&PollNexusTaskQueueResponse)>,
293-
graceful_poll_shutdown,
290+
capabilities,
294291
)
295292
}
296293
}
@@ -316,7 +313,7 @@ where
316313
mut poll_scaler: PollScaler<F>,
317314
pre_permit_delay: Option<impl Fn() -> DelayFut + Send + Sync + 'static>,
318315
post_poll_fn: Option<impl Fn(&T) + Send + Sync + 'static>,
319-
graceful_shutdown: Arc<AtomicBool>,
316+
capabilities: Arc<NamespaceCapabilities>,
320317
) -> Self
321318
where
322319
FT: Future<Output = pollers::Result<T>> + Send,
@@ -387,9 +384,9 @@ where
387384
} else {
388385
None
389386
};
390-
let graceful_shutdown = graceful_shutdown.clone();
387+
let capabilities = capabilities.clone();
391388
let poll_task = tokio::spawn(async move {
392-
let r = if graceful_shutdown.load(Ordering::Relaxed) {
389+
let r = if capabilities.graceful_poll_shutdown() {
393390
pf(timeout_override).await
394391
} else {
395392
let poll_interruptor = shutdown.cancelled().then(|_| async move {
@@ -510,7 +507,7 @@ where
510507
num_pollers_handler: Option<F>,
511508
shutdown: CancellationToken,
512509
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
513-
server_supports_autoscaling: Arc<AtomicBool>,
510+
capabilities: Arc<NamespaceCapabilities>,
514511
) -> Self {
515512
let (active_tx, active_rx) = watch::channel(0);
516513
let num_pollers_handler = num_pollers_handler.map(Arc::new);
@@ -527,7 +524,7 @@ where
527524
min,
528525
target: AtomicUsize::new(target),
529526
ever_saw_scaling_decision: AtomicBool::default(),
530-
server_supports_autoscaling,
527+
capabilities,
531528
behavior,
532529
ingested_this_period: Default::default(),
533530
ingested_last_period: Default::default(),
@@ -612,7 +609,7 @@ struct PollScalerReportHandle {
612609
min: usize,
613610
target: AtomicUsize,
614611
ever_saw_scaling_decision: AtomicBool,
615-
server_supports_autoscaling: Arc<AtomicBool>,
612+
capabilities: Arc<NamespaceCapabilities>,
616613
behavior: PollerBehavior,
617614

618615
ingested_this_period: AtomicUsize,
@@ -721,7 +718,7 @@ impl PollScalerReportHandle {
721718
/// autoscaling, it's safe to scale down without having seen a decision.
722719
fn can_scale_down(&self) -> bool {
723720
self.ever_saw_scaling_decision.load(Ordering::Relaxed)
724-
|| self.server_supports_autoscaling.load(Ordering::Relaxed)
721+
|| self.capabilities.poller_autoscaling()
725722
}
726723
}
727724

@@ -887,8 +884,10 @@ mod tests {
887884
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
888885
},
889886
Arc::new(AtomicCell::new(None)),
890-
Arc::new(AtomicBool::new(false)),
891-
Arc::new(AtomicBool::new(false)),
887+
Arc::new(NamespaceCapabilities {
888+
graceful_poll_shutdown: AtomicBool::new(false),
889+
poller_autoscaling: AtomicBool::new(false),
890+
}),
892891
);
893892

894893
// Poll a bunch of times, "interrupting" it each time, we should only actually have polled
@@ -945,8 +944,10 @@ mod tests {
945944
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))),
946945
},
947946
Arc::new(AtomicCell::new(None)),
948-
Arc::new(AtomicBool::new(false)),
949-
Arc::new(AtomicBool::new(false)),
947+
Arc::new(NamespaceCapabilities {
948+
graceful_poll_shutdown: AtomicBool::new(false),
949+
poller_autoscaling: AtomicBool::new(false),
950+
}),
950951
);
951952

952953
// Should not see error, unwraps should get empty response
@@ -1023,8 +1024,10 @@ mod tests {
10231024
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
10241025
},
10251026
Arc::new(AtomicCell::new(None)),
1026-
Arc::new(AtomicBool::new(false)),
1027-
Arc::new(AtomicBool::new(false)),
1027+
Arc::new(NamespaceCapabilities {
1028+
graceful_poll_shutdown: AtomicBool::new(false),
1029+
poller_autoscaling: AtomicBool::new(false),
1030+
}),
10281031
);
10291032

10301033
let first_task = pb.poll().await.expect("Should get first task");
@@ -1130,8 +1133,10 @@ mod tests {
11301133
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
11311134
},
11321135
Arc::new(AtomicCell::new(None)),
1133-
Arc::new(AtomicBool::new(false)),
1134-
Arc::new(AtomicBool::new(false)),
1136+
Arc::new(NamespaceCapabilities {
1137+
graceful_poll_shutdown: AtomicBool::new(false),
1138+
poller_autoscaling: AtomicBool::new(false),
1139+
}),
11351140
));
11361141

11371142
// Trigger the first poll to initialize and get the scaling decision
@@ -1212,8 +1217,10 @@ mod tests {
12121217
wft_poller_shared: None,
12131218
},
12141219
Arc::new(AtomicCell::new(None)),
1215-
Arc::new(AtomicBool::new(graceful)),
1216-
Arc::new(AtomicBool::new(false)),
1220+
Arc::new(NamespaceCapabilities {
1221+
graceful_poll_shutdown: AtomicBool::new(graceful),
1222+
poller_autoscaling: AtomicBool::new(false),
1223+
}),
12171224
);
12181225

12191226
let first = pb.poll().await.unwrap().unwrap();
@@ -1265,7 +1272,10 @@ mod tests {
12651272
min: minimum,
12661273
target: AtomicUsize::new(10),
12671274
ever_saw_scaling_decision: AtomicBool::new(false),
1268-
server_supports_autoscaling: Arc::new(AtomicBool::new(supports_autoscaling)),
1275+
capabilities: Arc::new(NamespaceCapabilities {
1276+
graceful_poll_shutdown: AtomicBool::new(false),
1277+
poller_autoscaling: AtomicBool::new(supports_autoscaling),
1278+
}),
12691279
behavior: PollerBehavior::Autoscaling {
12701280
minimum,
12711281
maximum: 10,

crates/sdk-core/src/worker/activities.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ mod tests {
756756
abstractions::tests::fixed_size_permit_dealer,
757757
pollers::{ActivityTaskOptions, LongPollBuffer},
758758
prost_dur,
759-
worker::{PollerBehavior, client::mocks::mock_worker_client},
759+
worker::{NamespaceCapabilities, PollerBehavior, client::mocks::mock_worker_client},
760760
};
761761
use crossbeam_utils::atomic::AtomicCell;
762762
use temporalio_common::protos::coresdk::activity_result::ActivityExecutionResult;
@@ -804,8 +804,10 @@ mod tests {
804804
max_tps: None,
805805
},
806806
Arc::new(AtomicCell::new(None)),
807-
Arc::new(AtomicBool::new(false)),
808-
Arc::new(AtomicBool::new(false)),
807+
Arc::new(NamespaceCapabilities {
808+
graceful_poll_shutdown: AtomicBool::new(false),
809+
poller_autoscaling: AtomicBool::new(false),
810+
}),
809811
);
810812
let atm = WorkerActivityTasks::new(
811813
sem.clone(),
@@ -898,8 +900,10 @@ mod tests {
898900
max_tps: None,
899901
},
900902
Arc::new(AtomicCell::new(None)),
901-
Arc::new(AtomicBool::new(false)),
902-
Arc::new(AtomicBool::new(false)),
903+
Arc::new(NamespaceCapabilities {
904+
graceful_poll_shutdown: AtomicBool::new(false),
905+
poller_autoscaling: AtomicBool::new(false),
906+
}),
903907
);
904908
let atm = WorkerActivityTasks::new(
905909
sem.clone(),
@@ -974,8 +978,10 @@ mod tests {
974978
max_tps: None,
975979
},
976980
Arc::new(AtomicCell::new(None)),
977-
Arc::new(AtomicBool::new(false)),
978-
Arc::new(AtomicBool::new(false)),
981+
Arc::new(NamespaceCapabilities {
982+
graceful_poll_shutdown: AtomicBool::new(false),
983+
poller_autoscaling: AtomicBool::new(false),
984+
}),
979985
);
980986
let atm = WorkerActivityTasks::new(
981987
sem.clone(),

0 commit comments

Comments
 (0)