Skip to content

Commit 4b5410b

Browse files
authored
perf: Send engine callback before tracker bookkeeping and add tracing spans (#11970)
## Summary Moves the engine callback send in task `execute()` to fire immediately after output is flushed (`output_client.finish()`), before the tracker bookkeeping (`tracker.cached()`, `tracker.build_succeeded()`, etc.). This unblocks the engine's DAG walker sooner — dependent tasks can be dispatched while the completed task's run summary tracking happens in the background. | Repo | Tasks | Baseline | After | Delta | |------|-------|----------|-------|-------| | Small (~5 packages) | 5 | 164ms | 163ms | ~flat | | Medium (~125 packages) | 70 | 848ms | 864ms | ~flat | | Large (~1000 packages) | 1700 | 845ms | 804ms | **-41ms (-4.9%)** | Profiled with `--profile` across three monorepos of varying size (full cache hit scenario, median of 5 warm runs). ## Changes Also adds tracing spans throughout the dispatch loop and builder startup for profiling visibility: - `visit_recv_wait`, `task_cache_new`, `exec_context_new` in the dispatch loop - `micro_frontends_from_disk`, `task_access_setup`, `turbo_json_loader_setup`, `root_turbo_json_load`, `pkg_dep_graph_validate` in builder startup Output ordering is preserved because the callback fires after all log replay and output flushing is complete. - **`crates/turborepo-task-executor/src/exec.rs`** — Restructured `execute()` to send callback immediately after output flush, before tracker bookkeeping. - **`crates/turborepo-lib/src/task_graph/visitor/mod.rs`** — Added `visit_recv_wait`, `task_cache_new`, `exec_context_new` tracing spans. - **`crates/turborepo-lib/src/run/builder.rs`** — Added tracing spans for uninstrumented startup phases.
1 parent 75406f6 commit 4b5410b

File tree

3 files changed

+106
-68
lines changed

3 files changed

+106
-68
lines changed

crates/turborepo-lib/src/run/builder.rs

Lines changed: 55 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -260,13 +260,15 @@ impl RunBuilder {
260260
repo_telemetry.track_package_manager(pkg_dep_graph.package_manager().name().to_string());
261261
repo_telemetry.track_size(pkg_dep_graph.len());
262262
run_telemetry.track_run_type(self.opts.run_opts.dry_run.is_some());
263-
let micro_frontend_configs =
263+
let micro_frontend_configs = {
264+
let _span = tracing::info_span!("micro_frontends_from_disk").entered();
264265
match MicrofrontendsConfigs::from_disk(&self.repo_root, &pkg_dep_graph) {
265266
Ok(configs) => configs,
266267
Err(err) => {
267268
return Err(Error::MicroFrontends(err));
268269
}
269-
};
270+
}
271+
};
270272

271273
let (scm, repo_index) = scm_task
272274
.instrument(tracing::info_span!("scm_task_await"))
@@ -323,53 +325,66 @@ impl RunBuilder {
323325
)?
324326
};
325327

326-
let task_access = TaskAccess::new(self.repo_root.clone(), async_cache.clone(), &scm);
327-
task_access.restore_config().await;
328+
let task_access = {
329+
let _span = tracing::info_span!("task_access_setup").entered();
330+
let ta = TaskAccess::new(self.repo_root.clone(), async_cache.clone(), &scm);
331+
ta.restore_config().await;
332+
ta
333+
};
328334

329335
let root_turbo_json_path = self.opts.repo_opts.root_turbo_json_path.clone();
330336
let future_flags = self.opts.future_flags;
331337

332338
let reader = TurboJsonReader::new(self.repo_root.clone()).with_future_flags(future_flags);
333339

334-
let turbo_json_loader = if task_access.is_enabled() {
335-
UnifiedTurboJsonLoader::task_access(
336-
reader,
337-
root_turbo_json_path.clone(),
338-
root_package_json.clone(),
339-
)
340-
} else if is_single_package {
341-
UnifiedTurboJsonLoader::single_package(
342-
reader,
343-
root_turbo_json_path.clone(),
344-
root_package_json.clone(),
345-
)
346-
} else if !root_turbo_json_path.exists() &&
347-
// Infer a turbo.json if allowing no turbo.json is explicitly allowed or if MFE configs are discovered
348-
(self.opts.repo_opts.allow_no_turbo_json || micro_frontend_configs.is_some())
349-
{
350-
UnifiedTurboJsonLoader::workspace_no_turbo_json(
351-
reader,
352-
pkg_dep_graph.packages(),
353-
micro_frontend_configs.clone(),
354-
)
355-
} else if let Some(micro_frontends) = &micro_frontend_configs {
356-
UnifiedTurboJsonLoader::workspace_with_microfrontends(
357-
reader,
358-
root_turbo_json_path.clone(),
359-
pkg_dep_graph.packages(),
360-
micro_frontends.clone(),
361-
)
362-
} else {
363-
UnifiedTurboJsonLoader::workspace(
364-
reader,
365-
root_turbo_json_path.clone(),
366-
pkg_dep_graph.packages(),
367-
)
340+
let turbo_json_loader = {
341+
let _span = tracing::info_span!("turbo_json_loader_setup").entered();
342+
if task_access.is_enabled() {
343+
UnifiedTurboJsonLoader::task_access(
344+
reader,
345+
root_turbo_json_path.clone(),
346+
root_package_json.clone(),
347+
)
348+
} else if is_single_package {
349+
UnifiedTurboJsonLoader::single_package(
350+
reader,
351+
root_turbo_json_path.clone(),
352+
root_package_json.clone(),
353+
)
354+
} else if !root_turbo_json_path.exists() &&
355+
// Infer a turbo.json if allowing no turbo.json is explicitly allowed or if MFE configs are discovered
356+
(self.opts.repo_opts.allow_no_turbo_json || micro_frontend_configs.is_some())
357+
{
358+
UnifiedTurboJsonLoader::workspace_no_turbo_json(
359+
reader,
360+
pkg_dep_graph.packages(),
361+
micro_frontend_configs.clone(),
362+
)
363+
} else if let Some(micro_frontends) = &micro_frontend_configs {
364+
UnifiedTurboJsonLoader::workspace_with_microfrontends(
365+
reader,
366+
root_turbo_json_path.clone(),
367+
pkg_dep_graph.packages(),
368+
micro_frontends.clone(),
369+
)
370+
} else {
371+
UnifiedTurboJsonLoader::workspace(
372+
reader,
373+
root_turbo_json_path.clone(),
374+
pkg_dep_graph.packages(),
375+
)
376+
}
368377
};
369378

370-
let root_turbo_json = turbo_json_loader.load(&PackageName::Root)?.clone();
379+
let root_turbo_json = {
380+
let _span = tracing::info_span!("root_turbo_json_load").entered();
381+
turbo_json_loader.load(&PackageName::Root)?.clone()
382+
};
371383

372-
pkg_dep_graph.validate()?;
384+
{
385+
let _span = tracing::info_span!("pkg_dep_graph_validate").entered();
386+
pkg_dep_graph.validate()?;
387+
}
373388

374389
let filtered_pkgs = {
375390
let _span = tracing::info_span!("calculate_filtered_packages").entered();

crates/turborepo-lib/src/task_graph/visitor/mod.rs

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use futures::{stream::FuturesUnordered, StreamExt};
1515
use itertools::Itertools;
1616
use miette::{Diagnostic, NamedSource, SourceSpan};
1717
use tokio::sync::mpsc;
18-
use tracing::{debug, warn, Span};
18+
use tracing::{debug, warn, Instrument, Span};
1919
use turbopath::{AbsoluteSystemPath, AnchoredSystemPath};
2020
use turborepo_ci::{Vendor, VendorBehavior};
2121
use turborepo_engine::{TaskError, TaskWarning};
@@ -336,7 +336,14 @@ impl<'a> Visitor<'a> {
336336
let factory = ExecContextFactory::new(self, errors.clone(), self.manager.clone(), &engine)?;
337337
let cached_vendor_behavior = Vendor::infer().and_then(|vendor| vendor.behavior.as_ref());
338338

339-
while let Some(message) = node_stream.recv().await {
339+
loop {
340+
let message = node_stream
341+
.recv()
342+
.instrument(tracing::info_span!("visit_recv_wait"))
343+
.await;
344+
let Some(message) = message else {
345+
break;
346+
};
340347
let span = tracing::debug_span!(parent: &span, "queue_task", task = %message.info);
341348
let _enter = span.enter();
342349
let crate::engine::Message { info, callback } = message;
@@ -382,12 +389,11 @@ impl<'a> Visitor<'a> {
382389

383390
debug!("task {} hash is {}", info, task_hash);
384391

385-
let task_cache = self.run_cache.task_cache(
386-
task_definition,
387-
workspace_info,
388-
info.clone(),
389-
&task_hash,
390-
);
392+
let task_cache = {
393+
let _span = tracing::info_span!("task_cache_new").entered();
394+
self.run_cache
395+
.task_cache(task_definition, workspace_info, info.clone(), &task_hash)
396+
};
391397

392398
// Drop to avoid holding the span across an await
393399

@@ -405,15 +411,18 @@ impl<'a> Visitor<'a> {
405411
}
406412
false => {
407413
let takes_input = task_definition.interactive || task_definition.persistent;
408-
let Some(mut exec_context) = factory.exec_context(
409-
info.clone(),
410-
task_hash,
411-
task_cache,
412-
execution_env,
413-
takes_input,
414-
self.task_access.clone(),
415-
)?
416-
else {
414+
let exec_context = {
415+
let _span = tracing::info_span!("exec_context_new").entered();
416+
factory.exec_context(
417+
info.clone(),
418+
task_hash,
419+
task_cache,
420+
execution_env,
421+
takes_input,
422+
self.task_access.clone(),
423+
)?
424+
};
425+
let Some(mut exec_context) = exec_context else {
417426
continue;
418427
};
419428

crates/turborepo-task-executor/src/exec.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -287,41 +287,55 @@ where
287287
result = Err(InternalError::Io(e));
288288
}
289289

290-
match result {
291-
Ok(ExecOutcome::Success(outcome)) => {
292-
match outcome {
293-
SuccessOutcome::CacheHit => tracker.cached().await,
294-
SuccessOutcome::Run => tracker.build_succeeded(0).await,
295-
};
290+
// Send the callback as early as possible after output is flushed.
291+
// For cache hits, this unblocks dependent tasks before the tracker
292+
// bookkeeping runs, shaving latency off the critical path.
293+
match &result {
294+
Ok(ExecOutcome::Success(_)) => {
296295
callback.send(Ok(())).ok();
297296
}
298-
Ok(ExecOutcome::Task { exit_code, message }) => {
299-
tracker.build_failed(exit_code, message).await;
297+
Ok(ExecOutcome::Task { .. }) => {
300298
callback
301299
.send(match self.continue_on_error {
302300
ContinueMode::Always => Ok(()),
303301
ContinueMode::DependenciesSuccessful => Err(StopExecution::DependentTasks),
304302
ContinueMode::Never => Err(StopExecution::AllTasks),
305303
})
306304
.ok();
305+
}
306+
Ok(ExecOutcome::Shutdown) | Err(_) => {
307+
callback.send(Err(StopExecution::AllTasks)).ok();
308+
}
309+
Ok(ExecOutcome::Restarted) => {
310+
callback.send(Err(StopExecution::DependentTasks)).ok();
311+
}
312+
}
307313

314+
// Tracker bookkeeping happens after the callback so dependents
315+
// can start while we update summaries.
316+
match result {
317+
Ok(ExecOutcome::Success(outcome)) => {
318+
match outcome {
319+
SuccessOutcome::CacheHit => tracker.cached().await,
320+
SuccessOutcome::Run => tracker.build_succeeded(0).await,
321+
};
322+
}
323+
Ok(ExecOutcome::Task { exit_code, message }) => {
324+
tracker.build_failed(exit_code, message).await;
308325
match self.continue_on_error {
309326
ContinueMode::Always | ContinueMode::DependenciesSuccessful => (),
310327
ContinueMode::Never => self.manager.stop().await,
311328
}
312329
}
313330
Ok(ExecOutcome::Shutdown) => {
314331
tracker.cancel();
315-
callback.send(Err(StopExecution::AllTasks)).ok();
316332
self.manager.stop().await;
317333
}
318334
Ok(ExecOutcome::Restarted) => {
319335
tracker.cancel();
320-
callback.send(Err(StopExecution::DependentTasks)).ok();
321336
}
322337
Err(e) => {
323338
tracker.cancel();
324-
callback.send(Err(StopExecution::AllTasks)).ok();
325339
self.manager.stop().await;
326340
return Err(e);
327341
}

0 commit comments

Comments
 (0)