Skip to content

Commit a33e868

Browse files
committed
perf: Early engine callback for confirmed cache hits
During the precompute phase, check local cache existence for each task (a cheap stat call). For confirmed cache hits, send the engine callback immediately in the dispatch loop instead of waiting for the full cache restore cycle. This unblocks dependent tasks sooner, reducing the DAG walk idle wait. Exposes sync exists_local() through the cache stack so rayon threads can check cache existence without async overhead.
1 parent cdfea7e commit a33e868

File tree

6 files changed

+91
-31
lines changed

6 files changed

+91
-31
lines changed

crates/turborepo-cache/src/async_cache.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ impl AsyncCache {
165165
self.real_cache.exists(key).await
166166
}
167167

168+
/// Synchronous local-only cache existence check.
169+
pub fn exists_local(&self, key: &str) -> Result<Option<CacheHitMetadata>, CacheError> {
170+
self.real_cache.exists_local(key)
171+
}
172+
168173
#[tracing::instrument(skip_all)]
169174
pub async fn fetch(
170175
&self,

crates/turborepo-cache/src/fs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl FSCache {
124124
}
125125

126126
#[tracing::instrument(skip_all)]
127-
pub(crate) fn exists(&self, hash: &str) -> Result<Option<CacheHitMetadata>, CacheError> {
127+
pub fn exists(&self, hash: &str) -> Result<Option<CacheHitMetadata>, CacheError> {
128128
let cache_dir = self.cache_directory.as_str();
129129
let mut buf = String::with_capacity(cache_dir.len() + 1 + hash.len() + "-meta.json".len());
130130
buf.push_str(cache_dir);

crates/turborepo-cache/src/multiplexer.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -180,30 +180,39 @@ impl CacheMultiplexer {
180180

181181
#[tracing::instrument(skip_all)]
182182
pub async fn exists(&self, key: &str) -> Result<Option<CacheHitMetadata>, CacheError> {
183-
if self.cache_config.local.read
184-
&& let Some(fs) = &self.fs
183+
if let Some(hit) = self.exists_local(key)? {
184+
return Ok(Some(hit));
185+
}
186+
187+
if self.cache_config.remote.read
188+
&& let Some(http) = self.get_http_cache()
185189
{
186-
match fs.exists(key) {
190+
match http.exists(key).await {
187191
cache_hit @ Ok(Some(_)) => {
188192
return cache_hit;
189193
}
190194
Ok(None) => {}
191-
Err(err) => debug!("failed to check fs cache: {:?}", err),
195+
Err(err) => debug!("failed to check http cache: {:?}", err),
192196
}
193197
}
194198

195-
if self.cache_config.remote.read
196-
&& let Some(http) = self.get_http_cache()
199+
Ok(None)
200+
}
201+
202+
/// Synchronous local-only cache existence check. Safe to call from
203+
/// rayon threads or any non-async context.
204+
pub fn exists_local(&self, key: &str) -> Result<Option<CacheHitMetadata>, CacheError> {
205+
if self.cache_config.local.read
206+
&& let Some(fs) = &self.fs
197207
{
198-
match http.exists(key).await {
208+
match fs.exists(key) {
199209
cache_hit @ Ok(Some(_)) => {
200210
return cache_hit;
201211
}
202212
Ok(None) => {}
203-
Err(err) => debug!("failed to check http cache: {:?}", err),
213+
Err(err) => debug!("failed to check fs cache: {:?}", err),
204214
}
205215
}
206-
207216
Ok(None)
208217
}
209218
}

crates/turborepo-lib/src/task_graph/visitor/mod.rs

Lines changed: 42 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,15 @@ use crate::{
4646
},
4747
};
4848

49+
/// Pre-computed data for a single task: hash, execution environment,
50+
/// and whether the local cache already contains a hit for this hash.
51+
#[derive(Debug)]
52+
struct PrecomputedTask {
53+
hash: String,
54+
execution_env: EnvironmentVariableMap,
55+
cache_hit: bool,
56+
}
57+
4958
// This holds the whole world
5059
pub struct Visitor<'a> {
5160
color_cache: ColorSelector,
@@ -186,13 +195,13 @@ impl<'a> Visitor<'a> {
186195

187196
/// Pre-compute task hashes and execution environments for all tasks in
188197
/// parallel. Tasks are processed in topological waves so dependency
189-
/// hashes are always available when needed. Returns a map from TaskId
190-
/// to (hash, execution_env).
198+
/// hashes are always available when needed. Also checks cache existence
199+
/// for each task so the dispatch loop can fast-path cache hits.
191200
fn precompute_task_hashes(
192201
&self,
193202
engine: &Engine,
194203
telemetry: &GenericEventBuilder,
195-
) -> Result<HashMap<TaskId<'static>, (String, EnvironmentVariableMap)>, Error> {
204+
) -> Result<HashMap<TaskId<'static>, PrecomputedTask>, Error> {
196205
use petgraph::algo::toposort;
197206
use rayon::prelude::*;
198207
use turborepo_engine::TaskNode;
@@ -230,14 +239,13 @@ impl<'a> Visitor<'a> {
230239
waves[d].push(node_idx);
231240
}
232241

233-
let results: Arc<Mutex<HashMap<TaskId<'static>, (String, EnvironmentVariableMap)>>> =
242+
let results: Arc<Mutex<HashMap<TaskId<'static>, PrecomputedTask>>> =
234243
Arc::new(Mutex::new(HashMap::with_capacity(sorted.len())));
235244

236245
// Process each wave in parallel. Within a wave, all dependencies
237246
// have already been hashed in earlier waves.
238247
for wave in &waves {
239-
type HashResult =
240-
Result<Option<(TaskId<'static>, String, EnvironmentVariableMap)>, Error>;
248+
type HashResult = Result<Option<(TaskId<'static>, PrecomputedTask)>, Error>;
241249
let wave_results: Vec<HashResult> = wave
242250
.par_iter()
243251
.map(|&node_idx| {
@@ -284,14 +292,24 @@ impl<'a> Visitor<'a> {
284292
self.task_hasher
285293
.env(task_id, task_env_mode, task_definition)?;
286294

287-
Ok(Some((task_id.clone(), task_hash, execution_env)))
295+
let cache_hit =
296+
task_definition.cache && self.run_cache.exists_local(&task_hash);
297+
298+
Ok(Some((
299+
task_id.clone(),
300+
PrecomputedTask {
301+
hash: task_hash,
302+
execution_env,
303+
cache_hit,
304+
},
305+
)))
288306
})
289307
.collect();
290308

291309
let mut map = results.lock().expect("precompute lock poisoned");
292310
for result in wave_results {
293-
if let Some((task_id, hash, env)) = result? {
294-
map.insert(task_id, (hash, env));
311+
if let Some((task_id, precomputed)) = result? {
312+
map.insert(task_id, precomputed);
295313
}
296314
}
297315
}
@@ -381,11 +399,11 @@ impl<'a> Visitor<'a> {
381399
.task_definition(&info)
382400
.ok_or(Error::MissingDefinition)?;
383401

384-
// Look up pre-computed hash and env instead of computing them here.
385-
let (task_hash, execution_env) = precomputed
386-
.get(&info)
387-
.ok_or(Error::MissingDefinition)?
388-
.clone();
402+
// Look up pre-computed hash, env, and cache status.
403+
let precomputed_task = precomputed.get(&info).ok_or(Error::MissingDefinition)?;
404+
let task_hash = precomputed_task.hash.clone();
405+
let execution_env = precomputed_task.execution_env.clone();
406+
let confirmed_cache_hit = precomputed_task.cache_hit;
389407

390408
debug!("task {} hash is {}", info, task_hash);
391409

@@ -395,6 +413,16 @@ impl<'a> Visitor<'a> {
395413
.task_cache(task_definition, workspace_info, info.clone(), &task_hash)
396414
};
397415

416+
// For confirmed cache hits (checked in parallel during
417+
// precompute), signal the engine immediately so dependents
418+
// can start without waiting for the full cache restore.
419+
let callback = if confirmed_cache_hit {
420+
callback.send(Ok(())).ok();
421+
None
422+
} else {
423+
Some(callback)
424+
};
425+
398426
// Drop to avoid holding the span across an await
399427

400428
drop(_enter);

crates/turborepo-run-cache/src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,15 @@ impl RunCache {
102102
}
103103
}
104104

105+
/// Synchronous local-only cache existence check. Can be called from
106+
/// rayon threads during hash precomputation.
107+
pub fn exists_local(&self, hash: &str) -> bool {
108+
if self.reads_disabled {
109+
return false;
110+
}
111+
matches!(self.cache.exists_local(hash), Ok(Some(_)))
112+
}
113+
105114
pub fn task_cache(
106115
self: &Arc<Self>,
107116
// TODO: Group these in a struct

crates/turborepo-task-executor/src/exec.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ where
265265
parent_span_id: Option<tracing::Id>,
266266
tracker: TaskTracker<()>,
267267
output_client: TaskOutput<O>,
268-
callback: oneshot::Sender<Result<(), StopExecution>>,
268+
callback: Option<oneshot::Sender<Result<(), StopExecution>>>,
269269
telemetry: &PackageTaskEventBuilder,
270270
) -> Result<(), InternalError> {
271271
let tracker: TaskTracker<chrono::DateTime<chrono::Local>> = tracker.start().await;
@@ -293,17 +293,20 @@ where
293293
SuccessOutcome::CacheHit => tracker.cached().await,
294294
SuccessOutcome::Run => tracker.build_succeeded(0).await,
295295
};
296-
callback.send(Ok(())).ok();
296+
if let Some(cb) = callback {
297+
cb.send(Ok(())).ok();
298+
}
297299
}
298300
Ok(ExecOutcome::Task { exit_code, message }) => {
299301
tracker.build_failed(exit_code, message).await;
300-
callback
301-
.send(match self.continue_on_error {
302+
if let Some(cb) = callback {
303+
cb.send(match self.continue_on_error {
302304
ContinueMode::Always => Ok(()),
303305
ContinueMode::DependenciesSuccessful => Err(StopExecution::DependentTasks),
304306
ContinueMode::Never => Err(StopExecution::AllTasks),
305307
})
306308
.ok();
309+
}
307310

308311
match self.continue_on_error {
309312
ContinueMode::Always | ContinueMode::DependenciesSuccessful => (),
@@ -312,16 +315,22 @@ where
312315
}
313316
Ok(ExecOutcome::Shutdown) => {
314317
tracker.cancel();
315-
callback.send(Err(StopExecution::AllTasks)).ok();
318+
if let Some(cb) = callback {
319+
cb.send(Err(StopExecution::AllTasks)).ok();
320+
}
316321
self.manager.stop().await;
317322
}
318323
Ok(ExecOutcome::Restarted) => {
319324
tracker.cancel();
320-
callback.send(Err(StopExecution::DependentTasks)).ok();
325+
if let Some(cb) = callback {
326+
cb.send(Err(StopExecution::DependentTasks)).ok();
327+
}
321328
}
322329
Err(e) => {
323330
tracker.cancel();
324-
callback.send(Err(StopExecution::AllTasks)).ok();
331+
if let Some(cb) = callback {
332+
cb.send(Err(StopExecution::AllTasks)).ok();
333+
}
325334
self.manager.stop().await;
326335
return Err(e);
327336
}

0 commit comments

Comments
 (0)