Commit b3c0f46

perf: Parallelize task hash computation across topological waves (#11969)
## Summary

Moves task hash computation out of the serial dispatch loop and into a parallel pre-computation phase using rayon. Tasks are grouped into topological waves by dependency depth, and each wave is hashed concurrently; external dependency hash precomputation is parallelized as well. (A concrete sketch of the wave grouping follows the change list below.)

Previously, every task's hash and execution environment were computed one at a time inside the `while let` visitor dispatch loop. With ~1700 tasks at ~160μs each, that added up to ~270ms of serial work on the critical path. The same work now completes in ~53ms on rayon's thread pool.

Also cleans up `Engine::dependencies()`/`dependents()` to return `Vec` instead of `HashSet`, and `calculate_dependency_hashes` to take a `&[&TaskNode]` slice: all callers only iterate, so no set operations are needed.

## Benchmarks

Profiled with `--profile` across three monorepos of varying size (full cache hit scenario, median of 5 warm runs):

| Repo | Tasks | Baseline | After | Delta |
|------|-------|----------|-------|-------|
| Small (~5 packages) | 15 | 170ms | 169ms | ~flat |
| Medium (~125 packages) | 700 | 876ms | 876ms | ~flat |
| Large (~1000 packages) | 1700 | 867ms | 775ms | **-92ms (-10.6%)** |

`queue_task` self-time in the large repo dropped from **268ms to 78ms** (a 71% reduction). The improvement scales with task count.

## Changes

- **`crates/turborepo-lib/src/task_graph/visitor/mod.rs`**: Added `precompute_task_hashes()`, which groups tasks into topological waves and hashes each wave in parallel via rayon. The dispatch loop now looks up pre-computed hashes instead of computing them inline.
- **`crates/turborepo-engine/src/lib.rs`**: `dependencies()`, `dependents()`, and `neighbors()` return `Vec` instead of `HashSet`; the `EngineInfo` impl's associated type is updated accordingly.
- **`crates/turborepo-task-hash/src/lib.rs`**: `calculate_task_hash` takes `&[&TaskNode]` instead of `HashSet<&TaskNode>`, `calculate_dependency_hashes` takes a slice, and `precompute_external_deps_hashes` uses `par_iter` instead of a sequential loop.
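To make the wave grouping concrete, here is a minimal standalone sketch of the depth computation on a diamond-shaped graph. It is illustrative only (the graph and node names are hypothetical, and this is not turborepo code), but it uses the same `petgraph` primitives as the diff below:

```rust
use std::collections::HashMap;

use petgraph::{algo::toposort, graph::DiGraph, Direction};

fn main() {
    // Diamond: `a` depends on `b` and `c`, which both depend on `d`.
    // Edges point dependent -> dependency, as in the task graph.
    let mut g: DiGraph<&str, ()> = DiGraph::new();
    let (a, b) = (g.add_node("a"), g.add_node("b"));
    let (c, d) = (g.add_node("c"), g.add_node("d"));
    g.add_edge(a, b, ());
    g.add_edge(a, c, ());
    g.add_edge(b, d, ());
    g.add_edge(c, d, ());

    // Reverse the toposort so dependencies come before dependents.
    let mut sorted = toposort(&g, None).expect("task graphs are acyclic");
    sorted.reverse();

    // depth(n) = 1 + max depth of n's dependencies, or 0 if it has none.
    let mut depth = HashMap::new();
    for &n in &sorted {
        let level = g
            .neighbors_directed(n, Direction::Outgoing)
            .filter_map(|dep| depth.get(&dep).copied())
            .max()
            .map_or(0, |m| m + 1);
        depth.insert(n, level);
    }

    // `d` lands in wave 0; `b` and `c` share wave 1 and can be hashed
    // in parallel; `a` is wave 2.
    assert_eq!((depth[&d], depth[&b], depth[&c], depth[&a]), (0, 1, 1, 2));
}
```

An edge always increases depth by at least one, so no two tasks in the same wave can depend on each other; that invariant is what makes hashing an entire wave in parallel safe.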
1 parent 69a89b3 commit b3c0f46

File tree

3 files changed (+155, -41)

  • crates
    • turborepo-engine/src/lib.rs
    • turborepo-lib/src/task_graph/visitor/mod.rs
    • turborepo-task-hash/src/lib.rs

crates/turborepo-engine/src/lib.rs

Lines changed: 4 additions & 4 deletions
```diff
@@ -417,11 +417,11 @@ impl<T: TaskDefinitionInfo + Clone> Engine<Built, T> {
         }
     }
 
-    pub fn dependencies(&self, task_id: &TaskId) -> Option<HashSet<&TaskNode>> {
+    pub fn dependencies(&self, task_id: &TaskId) -> Option<Vec<&TaskNode>> {
         self.neighbors(task_id, petgraph::Direction::Outgoing)
     }
 
-    pub fn dependents(&self, task_id: &TaskId) -> Option<HashSet<&TaskNode>> {
+    pub fn dependents(&self, task_id: &TaskId) -> Option<Vec<&TaskNode>> {
         self.neighbors(task_id, petgraph::Direction::Incoming)
     }
 
@@ -468,7 +468,7 @@ impl<T: TaskDefinitionInfo + Clone> Engine<Built, T> {
         &self,
         task_id: &TaskId,
         direction: petgraph::Direction,
-    ) -> Option<HashSet<&TaskNode>> {
+    ) -> Option<Vec<&TaskNode>> {
        let index = self.task_lookup.get(task_id)?;
        Some(
            self.task_graph
@@ -521,7 +521,7 @@ impl<T: TaskDefinitionInfo + Clone> Engine<Built, T> {
 // definitions and dependency information needed for run summaries.
 impl EngineInfo for Engine<Built, TaskDefinition> {
     type TaskIter<'a> = std::iter::FilterMap<
-        std::collections::hash_set::IntoIter<&'a TaskNode>,
+        std::vec::IntoIter<&'a TaskNode>,
         fn(&'a TaskNode) -> Option<&'a TaskId<'static>>,
     >;
```
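The `HashSet` to `Vec` switch is behavior-preserving for these callers: petgraph's `neighbors_directed` yields each neighbor once in a graph without duplicate edges, and every caller only iterates over the result, so set semantics were never exercised. A tiny standalone illustration of the pattern (hypothetical two-node graph, not the actual `neighbors` body, which is truncated above):

```rust
use petgraph::{graph::DiGraph, Direction};

fn main() {
    let mut graph: DiGraph<&str, ()> = DiGraph::new();
    let app = graph.add_node("app#build");
    let lib = graph.add_node("lib#build");
    graph.add_edge(app, lib, ()); // app#build depends on lib#build

    // Collecting into a Vec: no per-element hashing, no set semantics,
    // identical iteration behavior for callers that only loop.
    let deps: Vec<&str> = graph
        .neighbors_directed(app, Direction::Outgoing)
        .map(|idx| graph[idx])
        .collect();
    assert_eq!(deps, ["lib#build"]);
}
```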

crates/turborepo-lib/src/task_graph/visitor/mod.rs

Lines changed: 140 additions & 29 deletions
```diff
@@ -3,7 +3,7 @@ mod exec;
 
 use std::{
     borrow::Cow,
-    collections::HashSet,
+    collections::{HashMap, HashSet},
     io::Write,
     sync::{Arc, Mutex},
 };
@@ -184,6 +184,124 @@ impl<'a> Visitor<'a> {
         }
     }
 
+    /// Pre-compute task hashes and execution environments for all tasks in
+    /// parallel. Tasks are processed in topological waves so dependency
+    /// hashes are always available when needed. Returns a map from TaskId
+    /// to (hash, execution_env).
+    fn precompute_task_hashes(
+        &self,
+        engine: &Engine,
+        telemetry: &GenericEventBuilder,
+    ) -> Result<HashMap<TaskId<'static>, (String, EnvironmentVariableMap)>, Error> {
+        use petgraph::algo::toposort;
+        use rayon::prelude::*;
+        use turborepo_engine::TaskNode;
+
+        let graph = engine.task_graph();
+        let mut sorted = toposort(graph, None).map_err(|_| Error::MissingDefinition)?;
+        // toposort returns dependents before dependencies (edges point
+        // dependent→dependency via Outgoing). Reverse so dependencies
+        // come first.
+        sorted.reverse();
+
+        // Compute depth (topological level) for each node so we can process
+        // independent tasks in parallel within each wave. Dependencies
+        // (Outgoing neighbors) must have lower depth.
+        let mut depth: HashMap<petgraph::graph::NodeIndex, usize> = HashMap::new();
+        for &node_idx in &sorted {
+            let max_dep_depth = graph
+                .neighbors_directed(node_idx, petgraph::Direction::Outgoing)
+                .filter_map(|dep| depth.get(&dep))
+                .max()
+                .copied();
+            let d = match max_dep_depth {
+                Some(dd) => dd + 1,
+                None => 0,
+            };
+            depth.insert(node_idx, d);
+        }
+
+        let max_depth = depth.values().max().copied().unwrap_or(0);
+
+        // Group task nodes by depth level.
+        let mut waves: Vec<Vec<petgraph::graph::NodeIndex>> = vec![Vec::new(); max_depth + 1];
+        for &node_idx in &sorted {
+            let d = depth[&node_idx];
+            waves[d].push(node_idx);
+        }
+
+        let results: Arc<Mutex<HashMap<TaskId<'static>, (String, EnvironmentVariableMap)>>> =
+            Arc::new(Mutex::new(HashMap::with_capacity(sorted.len())));
+
+        // Process each wave in parallel. Within a wave, all dependencies
+        // have already been hashed in earlier waves.
+        for wave in &waves {
+            type HashResult =
+                Result<Option<(TaskId<'static>, String, EnvironmentVariableMap)>, Error>;
+            let wave_results: Vec<HashResult> = wave
+                .par_iter()
+                .map(|&node_idx| {
+                    let node = &graph[node_idx];
+                    let TaskNode::Task(task_id) = node else {
+                        return Ok(None);
+                    };
+
+                    let package_name = PackageName::from(task_id.package());
+                    let workspace_info = self
+                        .package_graph
+                        .package_info(&package_name)
+                        .ok_or_else(|| Error::MissingPackage {
+                            package_name: package_name.clone(),
+                            task_id: task_id.clone(),
+                        })?;
+
+                    let task_definition = engine
+                        .task_definition(task_id)
+                        .ok_or(Error::MissingDefinition)?;
+
+                    let task_env_mode = task_definition.env_mode.unwrap_or(self.global_env_mode);
+
+                    let dependency_set = engine
+                        .dependencies(task_id)
+                        .ok_or(Error::MissingDefinition)?;
+
+                    let package_task_event =
+                        PackageTaskEventBuilder::new(task_id.package(), task_id.task())
+                            .with_parent(telemetry);
+                    package_task_event.track_env_mode(&task_env_mode.to_string());
+
+                    let task_hash_telemetry = package_task_event.child();
+                    let task_hash = self.task_hasher.calculate_task_hash(
+                        task_id,
+                        task_definition,
+                        task_env_mode,
+                        workspace_info,
+                        &dependency_set,
+                        task_hash_telemetry,
+                    )?;
+
+                    let execution_env =
+                        self.task_hasher
+                            .env(task_id, task_env_mode, task_definition)?;
+
+                    Ok(Some((task_id.clone(), task_hash, execution_env)))
+                })
+                .collect();
+
+            let mut map = results.lock().expect("precompute lock poisoned");
+            for result in wave_results {
+                if let Some((task_id, hash, env)) = result? {
+                    map.insert(task_id, (hash, env));
+                }
+            }
+        }
+
+        Ok(Arc::try_unwrap(results)
+            .expect("all wave references dropped")
+            .into_inner()
+            .expect("mutex not poisoned"))
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn visit(
         &self,
@@ -194,6 +312,16 @@ impl<'a> Visitor<'a> {
             self.color_cache.color_for_key(&task.to_string());
         }
 
+        // Pre-compute all task hashes and execution envs in parallel using
+        // rayon. Tasks are grouped into topological waves so that each
+        // task's dependency hashes are available before it is hashed.
+        // This replaces the per-task serial hashing that was inside the
+        // dispatch loop.
+        let precomputed = {
+            let _span = tracing::info_span!("precompute_task_hashes").entered();
+            self.precompute_task_hashes(&engine, telemetry)?
+        };
+
         let concurrency = self.run_opts.concurrency as usize;
         let (node_sender, mut node_stream) = mpsc::channel(concurrency);
 
@@ -222,12 +350,13 @@
                 task_id: info.clone(),
             })?;
 
-            let package_task_event =
-                PackageTaskEventBuilder::new(info.package(), info.task()).with_parent(telemetry);
             let command = workspace_info.package_json.scripts.get(info.task());
 
             match command {
                 Some(cmd) if info.package() == ROOT_PKG_NAME && turbo_regex().is_match(cmd) => {
+                    let package_task_event =
+                        PackageTaskEventBuilder::new(info.package(), info.task())
+                            .with_parent(telemetry);
                     package_task_event.track_error(TrackedErrors::RecursiveError);
                     let (span, text) = cmd.span_and_text("package.json");
 
@@ -245,20 +374,11 @@
                 .task_definition(&info)
                 .ok_or(Error::MissingDefinition)?;
 
-            let task_env_mode = task_definition.env_mode.unwrap_or(self.global_env_mode);
-            package_task_event.track_env_mode(&task_env_mode.to_string());
-
-            let dependency_set = engine.dependencies(&info).ok_or(Error::MissingDefinition)?;
-
-            let task_hash_telemetry = package_task_event.child();
-            let task_hash = self.task_hasher.calculate_task_hash(
-                &info,
-                task_definition,
-                task_env_mode,
-                workspace_info,
-                dependency_set,
-                task_hash_telemetry,
-            )?;
+            // Look up pre-computed hash and env instead of computing them here.
+            let (task_hash, execution_env) = precomputed
+                .get(&info)
+                .ok_or(Error::MissingDefinition)?
+                .clone();
 
             debug!("task {} hash is {}", info, task_hash);
 
@@ -284,13 +404,6 @@
                     }));
                 }
                 false => {
-                    // Compute execution env only when we actually need it (not
-                    // during dry runs). The task_hasher is !Send so this must
-                    // happen in the dispatch loop rather than inside the spawned task.
-                    let execution_env =
-                        self.task_hasher
-                            .env(&info, task_env_mode, task_definition)?;
-
                     let takes_input = task_definition.interactive || task_definition.persistent;
                     let Some(mut exec_context) = factory.exec_context(
                         info.clone(),
@@ -301,11 +414,6 @@
                         self.task_access.clone(),
                     )?
                     else {
-                        // TODO(gsoltis): if/when we fix https://github.com/vercel/turborepo/issues/937
-                        // the following block should never get hit. In the meantime, keep it after
-                        // hashing so that downstream tasks can count on the hash existing
-                        //
-                        // bail if the script doesn't exist or is empty
                         continue;
                     };
 
@@ -319,6 +427,9 @@
 
                     let tracker = self.run_tracker.track_task(info.clone().into_owned());
                     let parent_span = Span::current();
+                    let package_task_event =
+                        PackageTaskEventBuilder::new(info.package(), info.task())
+                            .with_parent(telemetry);
                     let execution_telemetry = package_task_event.child();
 
                     tasks.push(tokio::spawn(async move {
```
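For readers who want the wave-execution shape in isolation, here is a simplified sketch of the per-wave `par_iter` pattern with the turborepo-specific types stripped out. The `hash_task` stub stands in for `calculate_task_hash`, and the `waves` buckets are assumed to come from the depth pass shown in the diff; it also shows that merging results sequentially between waves needs no lock at all:

```rust
use std::collections::HashMap;

use petgraph::graph::{DiGraph, NodeIndex};
use rayon::prelude::*;

// Stand-in for calculate_task_hash: combines a task's name with the
// hashes of its dependencies, all of which live in earlier waves.
fn hash_task(name: &str, dep_hashes: &[u64]) -> u64 {
    name.len() as u64 + dep_hashes.iter().sum::<u64>()
}

fn hash_waves(
    graph: &DiGraph<&str, ()>,
    waves: &[Vec<NodeIndex>],
) -> HashMap<NodeIndex, u64> {
    let mut hashes: HashMap<NodeIndex, u64> = HashMap::new();
    for wave in waves {
        // Hash one wave in parallel. Reading `hashes` from inside the
        // closure is safe: only entries from earlier waves are looked up.
        let wave_hashes: Vec<(NodeIndex, u64)> = wave
            .par_iter()
            .map(|&idx| {
                let dep_hashes: Vec<u64> = graph
                    .neighbors_directed(idx, petgraph::Direction::Outgoing)
                    .map(|dep| hashes[&dep])
                    .collect();
                (idx, hash_task(graph[idx], &dep_hashes))
            })
            .collect();
        // Merge sequentially between waves; no Mutex required here.
        hashes.extend(wave_hashes);
    }
    hashes
}
```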

crates/turborepo-task-hash/src/lib.rs

Lines changed: 11 additions & 8 deletions
```diff
@@ -297,11 +297,14 @@ impl<'a, R: RunOptsHashInfo> TaskHasher<'a, R> {
         if self.run_opts.single_package() {
             return;
         }
-        for (name, info) in workspaces {
-            let hash = get_external_deps_hash(&info.transitive_dependencies);
-            self.external_deps_hash_cache
-                .insert(name.as_str().to_owned(), hash);
-        }
+        let ws: Vec<_> = workspaces.collect();
+        self.external_deps_hash_cache = ws
+            .par_iter()
+            .map(|(name, info)| {
+                let hash = get_external_deps_hash(&info.transitive_dependencies);
+                (name.as_str().to_owned(), hash)
+            })
+            .collect();
     }
 
     #[tracing::instrument(skip(self, task_definition, task_env_mode, workspace, dependency_set))]
@@ -311,7 +314,7 @@ impl<'a, R: RunOptsHashInfo> TaskHasher<'a, R> {
         task_definition: &T,
         task_env_mode: EnvMode,
         workspace: &PackageInfo,
-        dependency_set: HashSet<&TaskNode>,
+        dependency_set: &[&TaskNode],
         telemetry: PackageTaskEventBuilder,
     ) -> Result<String, Error> {
         let do_framework_inference = self.run_opts.framework_inference();
@@ -453,7 +456,7 @@ impl<'a, R: RunOptsHashInfo> TaskHasher<'a, R> {
     /// returns: Result<Vec<String, Global>, Error>
     fn calculate_dependency_hashes(
         &self,
-        dependency_set: HashSet<&TaskNode>,
+        dependency_set: &[&TaskNode],
     ) -> Result<Vec<Arc<str>>, Error> {
         let state = self
             .task_hash_tracker
@@ -462,7 +465,7 @@ impl<'a, R: RunOptsHashInfo> TaskHasher<'a, R> {
             .read()
             .expect("hash tracker rwlock poisoned");
 
         let mut dependency_hash_list: Vec<Arc<str>> = Vec::with_capacity(dependency_set.len());
-        for dependency_task in &dependency_set {
+        for dependency_task in dependency_set {
             let TaskNode::Task(dependency_task_id) = dependency_task else {
                 continue;
             };
```

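One detail worth calling out in the `precompute_external_deps_hashes` change above: rayon implements `FromParallelIterator` for `HashMap`, so the parallel map can collect `(name, hash)` pairs straight into the cache. A minimal standalone demo (the `fake_hash` stub is illustrative, standing in for `get_external_deps_hash`):

```rust
use std::collections::HashMap;

use rayon::prelude::*;

// Illustrative stand-in for get_external_deps_hash.
fn fake_hash(deps: &[&str]) -> u64 {
    deps.iter().map(|d| d.len() as u64).sum()
}

fn main() {
    // (workspace name, transitive external dependencies) pairs.
    let workspaces = vec![("app", vec!["react", "next"]), ("lib", vec!["lodash"])];

    // Hash every workspace's external deps in parallel and collect the
    // (name, hash) pairs directly into the cache map.
    let cache: HashMap<String, u64> = workspaces
        .par_iter()
        .map(|(name, deps)| (name.to_string(), fake_hash(deps)))
        .collect();

    assert_eq!(cache["app"], fake_hash(&["react", "next"]));
}
```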