Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a644f64
fast_slow_store: only bound followers' wait, never the leader's populate
amankrx May 2, 2026
35cf8f1
fast_slow_store: never pass caller's writer into follower closures
amankrx May 2, 2026
1a236a0
execution_server: pre-validate CAS blobs and return PreconditionFailure
amankrx May 2, 2026
20b1de9
execution_server: detect missing Action proto and surface Preconditio…
amankrx May 2, 2026
55ac2f8
ft_aggregate: pass explicit TIMEOUT to absorb RediSearch slow scans
amankrx May 5, 2026
749e1b4
health_utils: run indicator checks in parallel, not serially
amankrx May 5, 2026
a424b38
health_utils: run indicator checks in parallel, not serially
amankrx May 5, 2026
8266b7d
action_messages: surface PreconditionFailure for any missing-CAS-blob…
amankrx May 6, 2026
4f8e090
fast_slow_store: fall through to slow on stale fast-tier map entries
amankrx May 6, 2026
c461440
dynamic_fake_redis: tolerate the new explicit FT.AGGREGATE TIMEOUT cl…
amankrx May 6, 2026
065daaa
store_awaited_action_db: retry try_subscribe once on miss to close dedup
amankrx May 6, 2026
ee11645
redis_store: lightweight check_health using PING instead of full I/O
amankrx May 6, 2026
e5a7c94
filesystem_store: make unref ENOENT idempotent and demote map/disk di…
amankrx May 6, 2026
1a6cc2a
Merge branch 'main' into temp-cas-bulk-changes
amankrx May 6, 2026
56b0f02
fast_slow_store: bypass leader/follower dedup for huge blobs
amankrx May 7, 2026
cc6dd6f
gcs_store, filesystem_store: lightweight check_health probes
amankrx May 7, 2026
7a89d6b
redis_store: raise check_health PING ceiling from 2s to 4s
amankrx May 7, 2026
ccdfee2
connection_manager: replace usize counter with Semaphore RAII permit
amankrx May 9, 2026
c6c69bd
redis_store: hold subscribed_keys write lock across receiver drop
amankrx May 9, 2026
b3d5473
Merge branch 'upstream/main' into temp-cas-bulk-changes
amankrx May 9, 2026
c742d9f
store_awaited_action_db: bound cid_* lifetime to stop orphan accumula…
amankrx May 11, 2026
e06e3e7
simple_scheduler_state_manager: count worker disconnects toward max_j…
amankrx May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,26 @@ pub struct FastSlowSpec {
/// and you wish to have an upstream read only store.
#[serde(default)]
pub slow_direction: StoreDirection,

/// Reads of blobs at or above this size bypass the populating-digests
/// dedup map and stream directly from the slow store, without
/// populating the fast tier.
///
/// Rationale: the leader/follower dedup is a win for blobs whose
/// transfer time is short relative to `LEADER_WAIT_TIMEOUT` — one
/// slow-store fetch fills the fast cache, subsequent readers serve
/// from fast. For multi-GB blobs (typically container layers) the
/// leader's transfer takes minutes; every concurrent follower hits
/// the timeout, falls through to the slow store anyway, and the
/// fast cache is then evicted aggressively to make room for the
/// huge blob — pushing out smaller, more-frequently-read entries.
/// Bypassing dedup for huge blobs avoids both pathologies.
///
/// Set to 0 (default) to use the built-in default of 256 MiB. Set
/// to a very large value (e.g. `u64::MAX`) to disable the bypass
/// and always use dedup regardless of size.
#[serde(default, deserialize_with = "convert_data_size_with_shellexpand")]
pub bypass_dedup_threshold_bytes: u64,
}

#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
Expand Down
44 changes: 43 additions & 1 deletion nativelink-redis-tester/src/dynamic_fake_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ pub trait SubscriptionManagerNotify {
pub struct FakeRedisBackend<S: SubscriptionManagerNotify> {
/// Contains a list of all of the Redis keys -> fields.
pub table: Arc<Mutex<HashMap<String, HashMap<String, Value>>>>,
/// TTL (in seconds) attached to each key via `EXPIRE`. Lets tests
/// assert that a key was set with a bounded lifetime — needed for
/// regression coverage of the cid_* orphan fix, where the original
/// bug was the absence of any TTL.
pub expiries: Arc<Mutex<HashMap<String, i64>>>,
subscription_manager: Arc<Mutex<Option<Arc<S>>>>,
}

Expand All @@ -55,6 +60,7 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
pub fn new() -> Self {
Self {
table: Arc::new(Mutex::new(HashMap::new())),
expiries: Arc::new(Mutex::new(HashMap::new())),
subscription_manager: Arc::new(Mutex::new(None)),
}
}
Expand Down Expand Up @@ -137,9 +143,21 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
panic!("Aggregate query should be a string: {args:?}");
};
let query = str::from_utf8(raw_query).unwrap();
// The real ft_aggregate caller now passes an explicit
// `TIMEOUT <ms>` clause before `LOAD`. Tolerate both
// shapes here so this fake doesn't break older callers
// and the LOAD-args check still validates the bit we
// actually care about.
let load_offset = if matches!(args.get(2), Some(OwnedFrame::BulkString(b)) if b == b"TIMEOUT")
{
// Skip "TIMEOUT" and its millisecond argument.
4
} else {
2
};
// Lazy implementation making assumptions.
assert_eq!(
args[2..6],
args[load_offset..load_offset + 4],
vec![
OwnedFrame::BulkString(b"LOAD".to_vec()),
OwnedFrame::BulkString(b"2".to_vec()),
Expand Down Expand Up @@ -322,6 +340,30 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
Value::Okay
}

"EXPIRE" => {
// `EXPIRE key seconds` — record the TTL so tests
// can assert one was attached. Real Redis
// returns 1 if the key existed (TTL applied) or
// 0 if not.
let key_name =
str::from_utf8(args[0].as_bytes().expect("Key argument is not bytes"))
.expect("Unable to parse key name")
.to_string();
let seconds = str::from_utf8(
args[1].as_bytes().expect("EXPIRE seconds is not bytes"),
)
.expect("EXPIRE seconds is not utf8")
.parse::<i64>()
.expect("EXPIRE seconds is not an integer");
let exists = self.table.lock().unwrap().contains_key(&key_name);
if exists {
self.expiries.lock().unwrap().insert(key_name, seconds);
Value::Int(1)
} else {
Value::Int(0)
}
}

"HMGET" => {
let key_name =
str::from_utf8(args[0].as_bytes().expect("Key argument is not bytes"))
Expand Down
51 changes: 50 additions & 1 deletion nativelink-scheduler/src/simple_scheduler_state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,56 @@ where
ActionStage::Queued
}
}
UpdateOperationType::UpdateWithDisconnect => ActionStage::Queued,
UpdateOperationType::UpdateWithDisconnect => {
// A worker disconnecting mid-action is functionally
// equivalent to an error from the action's POV: the
// action did not complete on that worker. Pre-fix
// this branch unconditionally re-queued without
// touching `attempts`, so `max_job_retries` never
// tripped. The customer-visible failure mode: a
// worker pod whose resource limits are below the
// peak working set of its action set OOMKills, the
// scheduler re-dispatches the same action set to a
// freshly-spawned worker, that worker OOMs the
// same way, and the loop continues indefinitely.
// Bazel's client-side `--test_timeout` was the
// only thing eventually breaking the cycle, with
// the symptom surfacing to the user as TIMEOUT
// (looks like a slow test) or NO STATUS (looks
// like a stuck dependency) — neither pointing at
// the cluster.
//
// Counting disconnects against `max_job_retries`
// bounds the loop. A transient network blip on a
// single dispatch still has `max_job_retries - 1`
// attempts of headroom (customer-helm sets this
// to 5), so legitimate blips remain harmless;
// sustained crash-loops surface a clean
// backend-attributed failure that names the worker
// disconnect path explicitly.
awaited_action.attempts += 1;
if awaited_action.attempts > self.max_job_retries {
ActionStage::Completed(ActionResult {
execution_metadata: ExecutionMetadata {
worker: maybe_worker_id
.map_or_else(String::default, ToString::to_string),
..ExecutionMetadata::default()
},
error: Some(make_err!(
Code::Aborted,
"Worker disconnected {} times (>{} max_job_retries) before \
completing this action; suspect worker crash loop, e.g. OOM \
from action working set exceeding pod memory limit. \
operation_id={operation_id} last_worker={maybe_worker_id:?}",
awaited_action.attempts,
self.max_job_retries,
)),
..ActionResult::default()
})
} else {
ActionStage::Queued
}
}
// We shouldn't get here, but we just ignore it if we do.
UpdateOperationType::ExecutionComplete => {
warn!("inner_update_operation got an ExecutionComplete, that's unexpected.");
Expand Down
77 changes: 65 additions & 12 deletions nativelink-scheduler/src/store_awaited_action_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,16 +695,44 @@ where
ActionUniqueQualifier::Cacheable(_) => {}
ActionUniqueQualifier::Uncacheable(_) => return Ok(None),
}
let stream = self
.store
.search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
tokio::pin!(stream);
let maybe_awaited_action = stream
.try_next()
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
// Lookup, then on a miss retry once after a brief sleep. The
// backing index is `RediSearch`'s secondary index over the
// awaited-action hash-set; there is a sub-millisecond-scale
// window between a peer's HSET completing on the master and
// the index commit becoming visible to a concurrent reader.
// Two near-simultaneous `add_action` calls for the same
// `unique_qualifier` can both observe an empty result in that
// window and each create a separate operation, leading to
// duplicate scheduler operations for the same action digest
// (observed in production as "two same actions running on
// different PRs"). One short retry closes the window without
// a heavyweight atomic-claim mechanism.
//
// Uses real tokio time rather than the configurable `now_fn`
// sleep: `MockInstantWrapped::sleep` busy-yields until mock
// time advances, which scheduler tests don't do for this
// path, and the retry itself is a millisecond-scale physical
// wait that should never participate in test-controlled time.
const SUBSCRIBE_RACE_RETRY_DELAY: Duration = Duration::from_millis(20);
let mut maybe_awaited_action: Option<AwaitedAction> = None;
for attempt in 0..2_u32 {
if attempt > 0 {
tokio::time::sleep(SUBSCRIBE_RACE_RETRY_DELAY).await;
}
let stream = self
.store
.search_by_index_prefix(SearchUniqueQualifierToAwaitedAction(unique_qualifier))
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
tokio::pin!(stream);
maybe_awaited_action = stream
.try_next()
.await
.err_tip(|| "In RedisAwaitedActionDb::try_subscribe")?;
if maybe_awaited_action.is_some() {
break;
}
}
match maybe_awaited_action {
Some(awaited_action) => {
// TODO(palfrey) We don't support joining completed jobs because we
Expand Down Expand Up @@ -911,14 +939,39 @@ where
continue;
}

// Add the client_operation_id to operation_id mapping
// Add the client_operation_id → operation_id mapping with
// a bounded TTL.
//
// Pre-fix this was written with `None` (no TTL). When
// PR #2315 later attaches `retain_completed_for_s` to the
// matching `aa_*` on completion, the cid_* survives that
// expiry and becomes a permanent orphan: a subsequent
// `WaitExecution` resolves cid_* → operation_id whose
// `aa_*` is gone, the orphan path returns `NotFound`, and
// the client (Bazel) restarts `Execute` — which creates
// *another* unbounded cid_*. In production this produced
// ~3.8M cid_* keys with ~4.5% already orphaned and
// intermittent "lost-action" symptoms in long builds.
//
// We can't compute the exact correct TTL here because the
// action's full lifetime is unknown at insert time (it
// depends on queue wait + execute + retain_completed_for).
// A generous fixed upper bound keeps orphans from
// accumulating indefinitely while staying well above any
// realistic single-action runtime. 24h comfortably exceeds
// the longest production `max_action_executing_timeout_s +
// client_action_timeout_s + retain_completed_for_s`
// combinations observed in customer configs (~1500s) and
// bounds orphan accumulation to a single day's worth of
// builds at most.
const CLIENT_ID_MAPPING_TTL: Duration = Duration::from_secs(24 * 60 * 60);
self.store
.update_data(
UpdateClientIdToOperationId {
client_operation_id: client_operation_id.clone(),
operation_id: operation_id.clone(),
},
None,
Some(CLIENT_ID_MAPPING_TTL),
)
.await
.err_tip(|| "In RedisAwaitedActionDb::add_action while adding client mapping")?;
Expand Down
86 changes: 86 additions & 0 deletions nativelink-scheduler/tests/redis_store_awaited_action_db_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,89 @@ async fn test_orphaned_client_operation_id_returns_none() -> Result<(), Error> {

Ok(())
}

/// Regression test for the cid_* orphan accumulation bug.
///
/// Pre-fix, `add_action` wrote the `cid_<client_operation_id>` →
/// `operation_id` pointer with `expiry=None`. PR #2315 later started
/// attaching `retain_completed_for_s` TTL onto the matching `aa_*`
/// key on completion. The TTL mismatch meant: aa_* expired after
/// `retain_completed_for_s`, cid_* lingered forever. A subsequent
/// `WaitExecution` resolving the stale cid_* hit the orphan path,
/// returned `NotFound`, and the client (Bazel) restarted Execute,
/// creating *another* unbounded cid_*. In production this produced
/// ~3.8M cid_* keys with ~4.5% already orphaned.
///
/// This test asserts that `add_action` now writes the cid_* with a
/// bounded TTL. We don't pin the exact TTL value here (that's an
/// implementation detail of the fix), only that *some* TTL was
/// attached — exercising the path that previously passed `None`.
#[nativelink_test]
async fn add_action_attaches_ttl_to_cid_mapping() -> Result<(), Error> {
const CLIENT_OPERATION_ID: &str = "cid_ttl_test_client";
const WORKER_OPERATION_ID: &str = "cid_ttl_test_worker";
const SUB_CHANNEL: &str = "sub_channel";

let action_info = Arc::new(ActionInfo {
command_digest: DigestInfo::zero_digest(),
input_root_digest: DigestInfo::zero_digest(),
timeout: Duration::from_secs(1),
platform_properties: HashMap::new(),
priority: 0,
load_timestamp: SystemTime::UNIX_EPOCH,
insert_timestamp: SystemTime::UNIX_EPOCH,
unique_qualifier: ActionUniqueQualifier::Cacheable(ActionUniqueKey {
instance_name: INSTANCE_NAME.to_string(),
digest_function: DigestHasherFunc::Sha256,
digest: DigestInfo::zero_digest(),
}),
});

let fake_redis_backend: FakeRedisBackend<RedisSubscriptionManager> = FakeRedisBackend::new();
let fake_redis_port = fake_redis_backend.clone().run().await;
let spec = RedisSpec {
addresses: vec![format!("redis://127.0.0.1:{fake_redis_port}")],
experimental_pub_sub_channel: Some(SUB_CHANNEL.to_string()),
..Default::default()
};
let store = RedisStore::new_standard(spec).await.expect("Working spec");
fake_redis_backend.set_subscription_manager(store.subscription_manager().await.unwrap());

let notifier = Arc::new(Notify::new());
let awaited_action_db = StoreAwaitedActionDb::new(
store.clone(),
notifier.clone(),
MockInstantWrapped::default,
move || WORKER_OPERATION_ID.into(),
60,
)
.await
.unwrap();

let _subscription = awaited_action_db
.add_action(
CLIENT_OPERATION_ID.into(),
action_info.clone(),
Duration::from_mins(1),
)
.await
.unwrap();

// The cid_* key must have an EXPIRE attached. Pre-fix this map
// was empty for the cid_ key — the assertion below would fail.
let expiries = fake_redis_backend.expiries.lock().unwrap().clone();
let cid_key = format!("cid_{CLIENT_OPERATION_ID}");
let ttl = expiries.get(&cid_key).copied().unwrap_or_else(|| {
panic!(
"expected an EXPIRE on {cid_key} after add_action, but none was set. \
All recorded expiries: {expiries:?}"
)
});
assert!(
ttl > 0,
"cid_* TTL must be positive (got {ttl}); a 0 or negative TTL would \
immediately evict the mapping and break in-flight WaitExecution calls"
);

Ok(())
}
Loading
Loading