Skip to content

Commit 20d28e0

Browse files
committed
Use capabilities to ensure test uses graceful shutdown & use newest CLI
1 parent 038ed5b commit 20d28e0

8 files changed

Lines changed: 118 additions & 74 deletions

File tree

.cargo/config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[env]
22
# This temporarily overrides the version of the CLI used for integration tests, locally and in CI
3-
#CLI_VERSION_OVERRIDE = "v1.4.1-cloud-v1-29-0-139-2.0"
3+
CLI_VERSION_OVERRIDE = "v1.6.3-serverless"
44

55
[alias]
66
# Not sure why --all-features doesn't work

crates/sdk-core/src/core_tests/workers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1299,7 +1299,7 @@ async fn graceful_shutdown_sends_shutdown_worker_rpc_during_initiate() {
12991299
let mw = MockWorkerInputs::new(stream.boxed());
13001300
let worker = mock_worker(MocksHolder::from_mock_worker(mock_client, mw));
13011301

1302-
// validate() reads describe_namespace and sets graceful_poll_shutdown = true
1302+
// validate() reads describe_namespace and sets capabilities.graceful_poll_shutdown = true
13031303
worker.validate().await.unwrap();
13041304

13051305
let poll_fut = worker.poll_workflow_activation();

crates/sdk-core/src/lib.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ pub use temporalio_common::protos::TaskToken;
4242
pub use url::Url;
4343
pub use worker::{
4444
ActivitySlotKind, CompleteActivityError, CompleteNexusError, CompleteWfError,
45-
FixedSizeSlotSupplier, LocalActivitySlotKind, NexusSlotKind, PollError, PollerBehavior,
46-
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceBasedTuner,
47-
ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType, SlotMarkUsedContext,
48-
SlotReleaseContext, SlotReservationContext, SlotSupplier, SlotSupplierOptions,
49-
SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions, TunerHolderOptionsBuilder,
50-
Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner, WorkerValidationError,
51-
WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
45+
FixedSizeSlotSupplier, LocalActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollError,
46+
PollerBehavior, ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder,
47+
ResourceBasedTuner, ResourceSlotOptions, SlotInfo, SlotInfoTrait, SlotKind, SlotKindType,
48+
SlotMarkUsedContext, SlotReleaseContext, SlotReservationContext, SlotSupplier,
49+
SlotSupplierOptions, SlotSupplierPermit, TunerBuilder, TunerHolder, TunerHolderOptions,
50+
TunerHolderOptionsBuilder, Worker, WorkerConfig, WorkerConfigBuilder, WorkerTuner,
51+
WorkerValidationError, WorkerVersioningStrategy, WorkflowErrorType, WorkflowSlotKind,
5252
};
5353

5454
use crate::{

crates/sdk-core/src/pollers/poll_buffer.rs

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use crate::{
22
abstractions::{ActiveCounter, MeteredPermitDealer, OwnedMeteredSemPermit, dbg_panic},
33
pollers::{self, Poller},
44
worker::{
5-
ActivitySlotKind, NexusSlotKind, PollerBehavior, SlotKind, WFTPollerShared,
6-
WorkflowSlotKind,
5+
ActivitySlotKind, NamespaceCapabilities, NexusSlotKind, PollerBehavior, SlotKind,
6+
WFTPollerShared, WorkflowSlotKind,
77
client::{PollActivityOptions, PollOptions, PollWorkflowOptions, WorkerClient},
88
},
99
};
@@ -77,16 +77,15 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
7777
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
7878
options: WorkflowTaskOptions,
7979
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
80-
graceful_poll_shutdown: Arc<AtomicBool>,
81-
server_supports_autoscaling: Arc<AtomicBool>,
80+
capabilities: Arc<NamespaceCapabilities>,
8281
) -> Self {
8382
let is_sticky = sticky_queue.is_some();
8483
let poll_scaler = PollScaler::new(
8584
poller_behavior,
8685
num_pollers_handler,
8786
shutdown.clone(),
8887
last_successful_poll_time,
89-
server_supports_autoscaling,
88+
capabilities.clone(),
9089
);
9190
if let Some(wftps) = options.wft_poller_shared.as_ref() {
9291
if is_sticky {
@@ -142,7 +141,7 @@ impl LongPollBuffer<PollWorkflowTaskQueueResponse, WorkflowSlotKind> {
142141
poll_scaler,
143142
pre_permit_delay,
144143
post_poll_fn,
145-
graceful_poll_shutdown,
144+
capabilities,
146145
)
147146
}
148147
}
@@ -158,8 +157,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
158157
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
159158
options: ActivityTaskOptions,
160159
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
161-
graceful_poll_shutdown: Arc<AtomicBool>,
162-
server_supports_autoscaling: Arc<AtomicBool>,
160+
capabilities: Arc<NamespaceCapabilities>,
163161
) -> Self {
164162
let pre_permit_delay = options
165163
.max_worker_acts_per_second
@@ -204,7 +202,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
204202
num_pollers_handler,
205203
shutdown.clone(),
206204
last_successful_poll_time,
207-
server_supports_autoscaling,
205+
capabilities.clone(),
208206
);
209207
Self::new(
210208
poll_fn,
@@ -213,7 +211,7 @@ impl LongPollBuffer<PollActivityTaskQueueResponse, ActivitySlotKind> {
213211
poll_scaler,
214212
pre_permit_delay,
215213
None::<fn(&PollActivityTaskQueueResponse)>,
216-
graceful_poll_shutdown,
214+
capabilities,
217215
)
218216
}
219217
}
@@ -229,8 +227,7 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
229227
num_pollers_handler: Option<impl Fn(usize) + Send + Sync + 'static>,
230228
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
231229
send_heartbeat: bool,
232-
graceful_poll_shutdown: Arc<AtomicBool>,
233-
server_supports_autoscaling: Arc<AtomicBool>,
230+
capabilities: Arc<NamespaceCapabilities>,
234231
) -> Self {
235232
let no_retry = if matches!(poller_behavior, PollerBehavior::Autoscaling { .. }) {
236233
Some(NoRetryOnMatching {
@@ -264,11 +261,11 @@ impl LongPollBuffer<PollNexusTaskQueueResponse, NexusSlotKind> {
264261
num_pollers_handler,
265262
shutdown,
266263
last_successful_poll_time,
267-
server_supports_autoscaling,
264+
capabilities.clone(),
268265
),
269266
None::<fn() -> BoxFuture<'static, ()>>,
270267
None::<fn(&PollNexusTaskQueueResponse)>,
271-
graceful_poll_shutdown,
268+
capabilities,
272269
)
273270
}
274271
}
@@ -294,7 +291,7 @@ where
294291
mut poll_scaler: PollScaler<F>,
295292
pre_permit_delay: Option<impl Fn() -> DelayFut + Send + Sync + 'static>,
296293
post_poll_fn: Option<impl Fn(&T) + Send + Sync + 'static>,
297-
graceful_shutdown: Arc<AtomicBool>,
294+
capabilities: Arc<NamespaceCapabilities>,
298295
) -> Self
299296
where
300297
FT: Future<Output = pollers::Result<T>> + Send,
@@ -365,9 +362,9 @@ where
365362
} else {
366363
None
367364
};
368-
let graceful_shutdown = graceful_shutdown.clone();
365+
let capabilities = capabilities.clone();
369366
let poll_task = tokio::spawn(async move {
370-
let r = if graceful_shutdown.load(Ordering::Relaxed) {
367+
let r = if capabilities.graceful_poll_shutdown() {
371368
pf(timeout_override).await
372369
} else {
373370
let poll_interruptor = shutdown.cancelled().then(|_| async move {
@@ -488,7 +485,7 @@ where
488485
num_pollers_handler: Option<F>,
489486
shutdown: CancellationToken,
490487
last_successful_poll_time: Arc<AtomicCell<Option<SystemTime>>>,
491-
server_supports_autoscaling: Arc<AtomicBool>,
488+
capabilities: Arc<NamespaceCapabilities>,
492489
) -> Self {
493490
let (active_tx, active_rx) = watch::channel(0);
494491
let num_pollers_handler = num_pollers_handler.map(Arc::new);
@@ -505,7 +502,7 @@ where
505502
min,
506503
target: AtomicUsize::new(target),
507504
ever_saw_scaling_decision: AtomicBool::default(),
508-
server_supports_autoscaling,
505+
capabilities,
509506
behavior,
510507
ingested_this_period: Default::default(),
511508
ingested_last_period: Default::default(),
@@ -590,7 +587,7 @@ struct PollScalerReportHandle {
590587
min: usize,
591588
target: AtomicUsize,
592589
ever_saw_scaling_decision: AtomicBool,
593-
server_supports_autoscaling: Arc<AtomicBool>,
590+
capabilities: Arc<NamespaceCapabilities>,
594591
behavior: PollerBehavior,
595592

596593
ingested_this_period: AtomicUsize,
@@ -699,7 +696,7 @@ impl PollScalerReportHandle {
699696
/// autoscaling, it's safe to scale down without having seen a decision.
700697
fn can_scale_down(&self) -> bool {
701698
self.ever_saw_scaling_decision.load(Ordering::Relaxed)
702-
|| self.server_supports_autoscaling.load(Ordering::Relaxed)
699+
|| self.capabilities.poller_autoscaling()
703700
}
704701
}
705702

@@ -865,8 +862,10 @@ mod tests {
865862
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
866863
},
867864
Arc::new(AtomicCell::new(None)),
868-
Arc::new(AtomicBool::new(false)),
869-
Arc::new(AtomicBool::new(false)),
865+
Arc::new(NamespaceCapabilities {
866+
graceful_poll_shutdown: AtomicBool::new(false),
867+
poller_autoscaling: AtomicBool::new(false),
868+
}),
870869
);
871870

872871
// Poll a bunch of times, "interrupting" it each time, we should only actually have polled
@@ -923,8 +922,10 @@ mod tests {
923922
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(1)))),
924923
},
925924
Arc::new(AtomicCell::new(None)),
926-
Arc::new(AtomicBool::new(false)),
927-
Arc::new(AtomicBool::new(false)),
925+
Arc::new(NamespaceCapabilities {
926+
graceful_poll_shutdown: AtomicBool::new(false),
927+
poller_autoscaling: AtomicBool::new(false),
928+
}),
928929
);
929930

930931
// Should not see error, unwraps should get empty response
@@ -1001,8 +1002,10 @@ mod tests {
10011002
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
10021003
},
10031004
Arc::new(AtomicCell::new(None)),
1004-
Arc::new(AtomicBool::new(false)),
1005-
Arc::new(AtomicBool::new(false)),
1005+
Arc::new(NamespaceCapabilities {
1006+
graceful_poll_shutdown: AtomicBool::new(false),
1007+
poller_autoscaling: AtomicBool::new(false),
1008+
}),
10061009
);
10071010

10081011
let first_task = pb.poll().await.expect("Should get first task");
@@ -1108,8 +1111,10 @@ mod tests {
11081111
wft_poller_shared: Some(Arc::new(WFTPollerShared::new(Some(10)))),
11091112
},
11101113
Arc::new(AtomicCell::new(None)),
1111-
Arc::new(AtomicBool::new(false)),
1112-
Arc::new(AtomicBool::new(false)),
1114+
Arc::new(NamespaceCapabilities {
1115+
graceful_poll_shutdown: AtomicBool::new(false),
1116+
poller_autoscaling: AtomicBool::new(false),
1117+
}),
11131118
));
11141119

11151120
// Trigger the first poll to initialize and get the scaling decision
@@ -1190,8 +1195,10 @@ mod tests {
11901195
wft_poller_shared: None,
11911196
},
11921197
Arc::new(AtomicCell::new(None)),
1193-
Arc::new(AtomicBool::new(graceful)),
1194-
Arc::new(AtomicBool::new(false)),
1198+
Arc::new(NamespaceCapabilities {
1199+
graceful_poll_shutdown: AtomicBool::new(graceful),
1200+
poller_autoscaling: AtomicBool::new(false),
1201+
}),
11951202
);
11961203

11971204
let first = pb.poll().await.unwrap().unwrap();
@@ -1243,7 +1250,10 @@ mod tests {
12431250
min: minimum,
12441251
target: AtomicUsize::new(10),
12451252
ever_saw_scaling_decision: AtomicBool::new(false),
1246-
server_supports_autoscaling: Arc::new(AtomicBool::new(supports_autoscaling)),
1253+
capabilities: Arc::new(NamespaceCapabilities {
1254+
graceful_poll_shutdown: AtomicBool::new(false),
1255+
poller_autoscaling: AtomicBool::new(supports_autoscaling),
1256+
}),
12471257
behavior: PollerBehavior::Autoscaling {
12481258
minimum,
12491259
maximum: 10,

crates/sdk-core/src/worker/activities.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -733,7 +733,7 @@ mod tests {
733733
abstractions::tests::fixed_size_permit_dealer,
734734
pollers::{ActivityTaskOptions, LongPollBuffer},
735735
prost_dur,
736-
worker::{PollerBehavior, client::mocks::mock_worker_client},
736+
worker::{NamespaceCapabilities, PollerBehavior, client::mocks::mock_worker_client},
737737
};
738738
use crossbeam_utils::atomic::AtomicCell;
739739
use temporalio_common::protos::coresdk::activity_result::ActivityExecutionResult;
@@ -781,8 +781,10 @@ mod tests {
781781
max_tps: None,
782782
},
783783
Arc::new(AtomicCell::new(None)),
784-
Arc::new(AtomicBool::new(false)),
785-
Arc::new(AtomicBool::new(false)),
784+
Arc::new(NamespaceCapabilities {
785+
graceful_poll_shutdown: AtomicBool::new(false),
786+
poller_autoscaling: AtomicBool::new(false),
787+
}),
786788
);
787789
let atm = WorkerActivityTasks::new(
788790
sem.clone(),
@@ -875,8 +877,10 @@ mod tests {
875877
max_tps: None,
876878
},
877879
Arc::new(AtomicCell::new(None)),
878-
Arc::new(AtomicBool::new(false)),
879-
Arc::new(AtomicBool::new(false)),
880+
Arc::new(NamespaceCapabilities {
881+
graceful_poll_shutdown: AtomicBool::new(false),
882+
poller_autoscaling: AtomicBool::new(false),
883+
}),
880884
);
881885
let atm = WorkerActivityTasks::new(
882886
sem.clone(),
@@ -951,8 +955,10 @@ mod tests {
951955
max_tps: None,
952956
},
953957
Arc::new(AtomicCell::new(None)),
954-
Arc::new(AtomicBool::new(false)),
955-
Arc::new(AtomicBool::new(false)),
958+
Arc::new(NamespaceCapabilities {
959+
graceful_poll_shutdown: AtomicBool::new(false),
960+
poller_autoscaling: AtomicBool::new(false),
961+
}),
956962
);
957963
let atm = WorkerActivityTasks::new(
958964
sem.clone(),

0 commit comments

Comments
 (0)