Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 47 additions & 40 deletions src/rust/graph/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
// Licensed under the Apache License, Version 2.0 (see LICENSE).
use std::cmp;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::atomic::{self, AtomicUsize};
use std::sync::atomic::{self, AtomicBool, AtomicUsize};
use std::sync::{Arc, mpsc};
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -194,8 +193,32 @@ async fn invalidate_with_changed_dependencies() {
);
}

#[ignore] // flaky:
// https://github.com/pantsbuild/pants/issues/10839
// Deterministic coverage of partial-cache preservation; paired with `invalidate_randomly`.
#[tokio::test]
async fn invalidate_preserves_clean_prefix() {
let graph = empty_graph();
let range = 100;
let pivot = 50;

let ctx0 = graph.context(TContext::new().with_salt(0));
let warmed = graph.create(TNode::new(range), &ctx0).await.unwrap();
assert_eq!(TNode::validate(&warmed).unwrap(), 1);

// Dirty one interior node: only its dependents (pivot..=range) become dirty.
graph.invalidate_from_roots(true, |n: &TNode| n.id == pivot);

let ctx1 = graph.context(TContext::new().with_salt(1));
let recomputed = graph.create(TNode::new(range), &ctx1).await.unwrap();
let expected: Vec<T> = (0..pivot)
.map(|i| T(i, 0))
.chain((pivot..=range).map(|i| T(i, 1)))
.collect();
assert_eq!(recomputed, expected);
assert_eq!(TNode::validate(&recomputed).unwrap(), 2);
}

// Stress test: concurrent invalidation races against repeated requests. Asserts only structural
// correctness (via `TNode::validate`) and liveness. See #10839 for history.
#[tokio::test]
async fn invalidate_randomly() {
let graph = empty_graph();
Expand All @@ -204,56 +227,40 @@ async fn invalidate_randomly() {
let sleep_per_invalidation = Duration::from_millis(100);
let range = 100;

// Spawn a background thread to randomly invalidate in the relevant range. Hold its handle so
// it doesn't detach.
let graph2 = graph.clone();
let (send, recv) = mpsc::channel();
let _join = thread::spawn(move || {
let halt = Arc::new(AtomicBool::new(false));
let halt2 = halt.clone();
let bg = thread::spawn(move || {
let mut rng = rand::rng();
let mut invalidations = invalidations;
while invalidations > 0 {
invalidations -= 1;

// Invalidate a random node in the graph.
for _ in 0..invalidations {
let candidate = rng.random_range(0..range);
graph2.invalidate_from_roots(true, |n: &TNode| n.id == candidate);

thread::sleep(sleep_per_invalidation);
}
send.send(()).unwrap();
halt2.store(true, atomic::Ordering::SeqCst);
});

// Continuously re-request the root with increasing context values, and assert that Node and
// context values are ascending.
let mut iterations = 0;
let mut max_distinct_context_values = 0;
loop {
let context = graph.context(TContext::new().with_salt(iterations));

// Compute the root, and validate its output.
let node_output = match graph.create(TNode::new(range), &context).await {
Ok(output) => output,
Err(TError::Invalidated) => {
// Some amount of concurrent invalidation is expected: retry.
continue;
// Poll `halt` at the top so we exit even if every compute hits Invalidated.
let mut attempts: usize = 0;
let mut successful: usize = 0;
while !halt.load(atomic::Ordering::SeqCst) {
attempts += 1;
let context = graph.context(TContext::new().with_salt(attempts));
match graph.create(TNode::new(range), &context).await {
Ok(output) => {
TNode::validate(&output).unwrap();
successful += 1;
}
Err(TError::Invalidated) => {}
Err(e) => panic!("Did not expect any errors other than Invalidation. Got: {e:?}"),
};
max_distinct_context_values = cmp::max(
max_distinct_context_values,
TNode::validate(&node_output).unwrap(),
);

// Poll the channel to see whether the background thread has exited.
if recv.try_recv().is_ok() {
break;
}
iterations += 1;
}
bg.join().unwrap();

// Liveness only: correctness is checked by `TNode::validate` above.
assert!(
max_distinct_context_values > 1,
"In {iterations} iterations, observed a maximum of {max_distinct_context_values} distinct context values."
successful > 0,
"No successful computes in {attempts} attempts."
);
}

Expand Down
Loading