Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ name = "tokio"
# - README.md
# - Update CHANGELOG.md.
# - Create "v1.x.y" git tag.
version = "1.49.0+anthropic.1"
version = "1.49.0+anthropic.2"
edition = "2021"
rust-version = "1.71"
authors = ["Tokio Contributors <team@tokio.rs>"]
Expand Down
22 changes: 22 additions & 0 deletions tokio/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ pub struct Builder {
/// Specify a random number generator seed to provide deterministic results
pub(super) seed_generator: RngSeedGenerator,

/// See [`deterministic_external_spawn`](Builder::deterministic_external_spawn).
pub(super) deterministic_external_spawn: bool,

/// When true, enables task poll count histogram instrumentation.
pub(super) metrics_poll_count_histogram_enable: bool,

Expand Down Expand Up @@ -330,6 +333,8 @@ impl Builder {

disable_lifo_slot: false,

deterministic_external_spawn: false,

timer_flavor: TimerFlavor::Traditional,

#[cfg(feature = "stall-detection")]
Expand Down Expand Up @@ -1262,6 +1267,21 @@ impl Builder {
self
}

/// (`current_thread` only) Drains the inject queue into the local queue
/// before each task pop, so spawns and wakes from non-runtime threads
/// are observed at a deterministic point relative to local processing.
///
/// Intended for deterministic-simulation testing where a coordinated
/// worker thread spawns onto this runtime while the executor is blocked.
/// In that setup the inject items are pushed in FIFO order with no
/// concurrent pop, so draining at a fixed point makes the resulting
/// schedule fully deterministic. Has no effect on the multi-threaded
/// scheduler.
pub fn deterministic_external_spawn(&mut self, val: bool) -> &mut Self {
self.deterministic_external_spawn = val;
self
}

/// Sets the number of scheduler ticks after which the scheduler will poll for
/// external events (timers, I/O, and so on).
///
Expand Down Expand Up @@ -1786,6 +1806,7 @@ impl Builder {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
deterministic_external_spawn: self.deterministic_external_spawn,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
Expand Down Expand Up @@ -1967,6 +1988,7 @@ cfg_rt_multi_thread! {
#[cfg(tokio_unstable)]
unhandled_panic: self.unhandled_panic.clone(),
disable_lifo_slot: self.disable_lifo_slot,
deterministic_external_spawn: self.deterministic_external_spawn,
seed_generator: seed_generator_1,
metrics_poll_count_histogram: self.metrics_poll_count_histogram_builder(),
},
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ pub(crate) struct Config {
/// deterministic way.
pub(crate) seed_generator: RngSeedGenerator,

/// When true, the `current_thread` scheduler drains the inject queue
/// into the local queue before each task pop, so external spawns/wakes
/// are observed at a deterministic point relative to local processing.
/// Intended for deterministic-simulation testing where a coordinated
/// worker thread spawns onto this runtime.
pub(crate) deterministic_external_spawn: bool,

/// How to build poll time histograms
pub(crate) metrics_poll_count_histogram: Option<crate::runtime::HistogramBuilder>,

Expand Down
10 changes: 10 additions & 0 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,16 @@ impl Core {
}

fn next_task(&mut self, handle: &Handle) -> Option<Notified> {
if handle.shared.config.deterministic_external_spawn {
// Absorb all inject items into the local FIFO before popping. With
// a coordinated external thread that only pushes while this
// executor is blocked, this makes the resulting schedule fully
// deterministic regardless of `global_queue_interval`.
while let Some(task) = handle.next_remote_task() {
self.tasks.push_back(task);
}
return self.next_local_task(handle);
}
if self.tick % self.global_queue_interval == 0 {
handle
.next_remote_task()
Expand Down