Skip to content

Commit 68efe24

Browse files
committed
dev-server: avoid top-level eventually-consistent reads
Under release-with-assertions (debug-assertions on), the dev server tripped debug_assert_not_in_top_level_task by reading task outputs eventually consistently from top-level run_once tasks, panicking worker threads and hanging the turbopack-cli benchmark. - Make IssueReporter::report_issues a plain async method (not a turbo-task) that takes pre-collected PlainIssues; collect them in a new collect_issues operation read strongly consistently in handle_issues. Mark ConsoleUi::new root so the reporter can be resolved strongly consistently. - Apply content-source side effects via a new apply_side_effects_operation read strongly consistently instead of awaiting each apply() at the top level. - Compute the initial HMR version in initial_version_operation and read the update-stream content strongly consistently in UpdateStream::new.
1 parent 2c94800 commit 68efe24

6 files changed

Lines changed: 125 additions & 50 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turbopack/crates/turbopack-cli-utils/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ workspace = true
1616

1717
[dependencies]
1818
anyhow = { workspace = true }
19+
async-trait = { workspace = true }
1920
crossterm = "0.26.0"
2021
owo-colors = { workspace = true }
2122
rustc-hash = { workspace = true }

turbopack/crates/turbopack-cli-utils/src/issue.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ use std::{
88
};
99

1010
use anyhow::Result;
11+
use async_trait::async_trait;
1112
use crossterm::style::{StyledContent, Stylize};
1213
use owo_colors::{OwoColorize as _, Style};
1314
use rustc_hash::{FxHashMap, FxHashSet};
1415
use turbo_rcstr::RcStr;
15-
use turbo_tasks::{RawVc, TransientInstance, TransientValue, Vc};
16+
use turbo_tasks::{RawVc, ReadRef, TransientInstance, Vc};
1617
use turbo_tasks_fs::{FileLinesContent, source_context::get_source_context};
1718
use turbopack_core::issue::{
18-
CollectibleIssuesExt, IssueFilter, IssueReporter, IssueSeverity, PlainIssue, PlainIssueSource,
19-
PlainTraceItem, StyledString,
19+
IssueReporter, IssueSeverity, PlainIssue, PlainIssueSource, PlainIssues, PlainTraceItem,
20+
StyledString,
2021
};
2122

2223
use crate::source_context::format_source_context_lines;
@@ -366,7 +367,7 @@ impl PartialEq for ConsoleUi {
366367

367368
#[turbo_tasks::value_impl]
368369
impl ConsoleUi {
369-
#[turbo_tasks::function]
370+
#[turbo_tasks::function(root)]
370371
pub fn new(options: TransientInstance<LogOptions>) -> Vc<Self> {
371372
ConsoleUi {
372373
options: (*options).clone(),
@@ -376,15 +377,15 @@ impl ConsoleUi {
376377
}
377378
}
378379

380+
#[async_trait]
379381
#[turbo_tasks::value_impl]
380382
impl IssueReporter for ConsoleUi {
381-
#[turbo_tasks::function]
382383
async fn report_issues(
383384
&self,
384-
source: TransientValue<RawVc>,
385+
issues: ReadRef<PlainIssues>,
386+
source: RawVc,
385387
min_failing_severity: IssueSeverity,
386-
) -> Result<Vc<bool>> {
387-
let issues = source.peek_issues();
388+
) -> Result<bool> {
388389
let LogOptions {
389390
ref current_dir,
390391
ref project_dir,
@@ -395,7 +396,7 @@ impl IssueReporter for ConsoleUi {
395396
} = self.options;
396397
let mut grouped_issues: GroupedIssues = FxHashMap::default();
397398

398-
let plain_issues = issues.get_plain_issues(&IssueFilter::everything()).await?;
399+
let plain_issues = &issues.0;
399400
let issues = plain_issues
400401
.iter()
401402
.map(|plain_issue| {
@@ -405,11 +406,7 @@ impl IssueReporter for ConsoleUi {
405406
.collect::<Vec<_>>();
406407

407408
let issue_ids = issues.iter().map(|(_, id)| *id).collect::<FxHashSet<_>>();
408-
let mut new_ids = self
409-
.seen
410-
.lock()
411-
.unwrap()
412-
.new_ids(source.into_value(), issue_ids);
409+
let mut new_ids = self.seen.lock().unwrap().new_ids(source, issue_ids);
413410

414411
let mut has_fatal = false;
415412
for (plain_issue, id) in issues {
@@ -539,7 +536,7 @@ impl IssueReporter for ConsoleUi {
539536
}
540537
}
541538

542-
Ok(Vc::cell(has_fatal))
539+
Ok(has_fatal)
543540
}
544541
}
545542

turbopack/crates/turbopack-core/src/issue/mod.rs

Lines changed: 66 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ use serde::{Deserialize, Serialize};
1616
use turbo_esregex::EsRegex;
1717
use turbo_rcstr::{RcStr, rcstr};
1818
use turbo_tasks::{
19-
CollectiblesSource, NonLocalValue, OperationVc, RawVc, ReadRef, ResolvedVc, TransientValue,
20-
TryFlatJoinIterExt, TryJoinIterExt, Upcast, ValueDefault, ValueToString, ValueToStringRef, Vc,
21-
emit, trace::TraceRawVcs,
19+
CollectiblesSource, NonLocalValue, OperationVc, RawVc, ReadRef, ResolvedVc, TryFlatJoinIterExt,
20+
TryJoinIterExt, Upcast, ValueDefault, ValueToString, ValueToStringRef, Vc, emit,
21+
trace::TraceRawVcs,
2222
};
2323
use turbo_tasks_fs::{
2424
FileContent, FileLine, FileLinesContent, FileSystem, FileSystemPath, glob::Glob,
@@ -904,6 +904,14 @@ pub struct PlainIssue {
904904
pub import_traces: Vec<PlainTrace>,
905905
}
906906

907+
/// A collection of [`PlainIssue`]s collected from a single source.
908+
///
909+
/// Returned by [`collect_issues`] so that the (plain) issues can be read strongly
910+
/// consistently from a top-level task and handed to a non-turbo-task [`IssueReporter`].
911+
#[turbo_tasks::value(serialization = "skip")]
912+
#[derive(Debug)]
913+
pub struct PlainIssues(pub Vec<ReadRef<PlainIssue>>);
914+
907915
#[turbo_tasks::value(serialization = "skip")]
908916
#[derive(Clone, Debug, PartialOrd, Ord)]
909917
pub struct PlainAdditionalIssueSource {
@@ -1062,24 +1070,30 @@ impl PlainSource {
10621070
}
10631071
}
10641072

1073+
#[async_trait]
10651074
#[turbo_tasks::value_trait]
10661075
pub trait IssueReporter {
1067-
/// Reports issues to the user (e.g. to stdio). Returns whether fatal
1076+
/// Reports already-collected issues to the user (e.g. to stdio). Returns whether fatal
10681077
/// (program-ending) issues were present.
10691078
///
1079+
/// This is intentionally *not* a `#[turbo_tasks::function]`: it performs no turbo-tasks
1080+
/// reads of its own (the issues are collected ahead of time by [`collect_issues`]), so it
1081+
/// is safe to call from a top-level task.
1082+
///
10701083
/// # Arguments:
10711084
///
1072-
/// * `source` - The root [Vc] from which issues are traced. Can be used by implementers to
1073-
/// determine which issues are new. This must be derived from the OperationVc so issues can
1074-
/// be collected.
1075-
/// * `min_failing_severity` - The minimum Vc<[IssueSeverity]>
1076-
/// The minimum issue severity level considered to fatally end the program.
1077-
#[turbo_tasks::function]
1078-
fn report_issues(
1079-
self: Vc<Self>,
1080-
source: TransientValue<RawVc>,
1085+
/// * `issues` - The plain issues already collected from the source.
1086+
/// * `source` - The root [`RawVc`] from which the issues were traced. Can be used by
1087+
/// implementers as a dedup key to determine which issues are new. This must be derived from
1088+
/// the `OperationVc` the issues were collected from.
1089+
/// * `min_failing_severity` - The minimum issue severity level considered to fatally end the
1090+
/// program.
1091+
async fn report_issues(
1092+
&self,
1093+
issues: ReadRef<PlainIssues>,
1094+
source: RawVc,
10811095
min_failing_severity: IssueSeverity,
1082-
) -> Vc<bool>;
1096+
) -> Result<bool>;
10831097
}
10841098

10851099
pub trait CollectibleIssuesExt
@@ -1117,6 +1131,21 @@ where
11171131
}
11181132
}
11191133

1134+
/// Collects all issues emitted by `source` as resolved [`PlainIssue`]s.
1135+
///
1136+
/// This is an `operation` function so its (plain) result can be read *strongly consistently* (via
1137+
/// [`OperationVc::read_strongly_consistent`]) from a top-level task without tripping the
1138+
/// eventually-consistent-read assertion. The per-issue `PlainIssue::from_issue` reads happen
1139+
/// *inside* this task, where eventually-consistent reads are legal.
1140+
#[turbo_tasks::function(operation, root)]
1141+
async fn collect_issues(source: OperationVc<()>) -> Result<Vc<PlainIssues>> {
1142+
let plain = source
1143+
.peek_issues()
1144+
.get_plain_issues(&IssueFilter::everything())
1145+
.await?;
1146+
Ok(PlainIssues(plain).cell())
1147+
}
1148+
11201149
/// A helper function to print out issues to the console.
11211150
///
11221151
/// Must be called in a turbo-task as this constructs a `cell`
@@ -1129,13 +1158,31 @@ pub async fn handle_issues<T: Send>(
11291158
) -> Result<()> {
11301159
let source_vc = source_op.connect();
11311160
let _ = source_op.resolve().strongly_consistent().await?;
1161+
let source_raw = Vc::into_raw(source_vc);
1162+
1163+
// Collect the issues in a dedicated `operation` task and read its *plain* result strongly
1164+
// consistently. This is safe at the top level (unlike an eventually-consistent read), while
1165+
// the per-issue reads happen inside `collect_issues`. The source is type-erased to
1166+
// `OperationVc<()>` so a single non-generic task can collect issues for any source.
1167+
let erased_source = OperationVc::<()>::try_from(source_raw)?;
1168+
let issues = collect_issues(erased_source)
1169+
.read_strongly_consistent()
1170+
.await?;
11321171

1133-
let has_fatal = issue_reporter.report_issues(
1134-
TransientValue::new(Vc::into_raw(source_vc)),
1135-
min_failing_severity,
1136-
);
1172+
// `report_issues` is a plain async method; reach it via a `TraitRef`. Resolve the reporter
1173+
// strongly consistently first so that `into_trait_ref` is a plain cell read (rather than an
1174+
// eventually-consistent task-output read) at the top level.
1175+
let reporter = issue_reporter
1176+
.to_resolved()
1177+
.strongly_consistent()
1178+
.await?
1179+
.into_trait_ref()
1180+
.await?;
1181+
let has_fatal = reporter
1182+
.report_issues(issues, source_raw, min_failing_severity)
1183+
.await?;
11371184

1138-
if *has_fatal.await? {
1185+
if has_fatal {
11391186
let mut message = "Fatal issue(s) occurred".to_owned();
11401187
if let Some(path) = path.as_ref() {
11411188
message += &format!(" in {path}");

turbopack/crates/turbopack-dev-server/src/lib.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ use socket2::{Domain, Protocol, Socket, Type};
3030
use tokio::task::JoinHandle;
3131
use tracing::{Instrument, Level, Span, event, info_span};
3232
use turbo_tasks::{
33-
Effects, NonLocalValue, OperationVc, PrettyPrintError, TurboTasksApi, Vc,
34-
read_strongly_consistent_and_apply_effects, run_once_with_reason, take_effects,
33+
Completion, Effects, NonLocalValue, OperationVc, PrettyPrintError, ResolvedVc, TurboTasksApi,
34+
Vc, read_strongly_consistent_and_apply_effects, run_once_with_reason, take_effects,
3535
trace::TraceRawVcs, util::FormatDuration,
3636
};
3737
use turbopack_core::issue::{IssueReporter, IssueSeverity, handle_issues};
@@ -71,6 +71,19 @@ async fn get_source_with_issues_operation(
7171
Ok(ContentSourceWithIssues { source_op, effects }.cell())
7272
}
7373

74+
/// Applies all collected [`ContentSourceSideEffect`]s. The individual `apply()` reads happen
75+
/// *inside* this task (where eventually-consistent reads are legal); the caller reads the result
76+
/// strongly consistently so the work is finished before the response is observed.
77+
#[turbo_tasks::function(operation, root)]
78+
async fn apply_side_effects_operation(
79+
side_effects: Vec<ResolvedVc<Box<dyn ContentSourceSideEffect>>>,
80+
) -> Result<Vc<Completion>> {
81+
for side_effect in &side_effects {
82+
side_effect.apply().await?;
83+
}
84+
Ok(Completion::new())
85+
}
86+
7487
#[derive(TraceRawVcs, Debug, NonLocalValue)]
7588
pub struct DevServerBuilder {
7689
#[turbo_tasks(trace_ignore)]
@@ -266,13 +279,17 @@ impl DevServerBuilder {
266279
);
267280
}
268281
if !side_effects.is_empty() {
282+
let side_effects: Vec<_> = side_effects.into_iter().collect();
269283
let join_handle = tokio::spawn(run_once_with_reason(
270284
tt.clone(),
271285
side_effects_reason,
272286
async move {
273-
for side_effect in side_effects {
274-
side_effect.apply().await?;
275-
}
287+
// Apply the side effects inside a dedicated `operation`
288+
// task and read its result strongly consistently, so this
289+
// top-level task performs no eventually-consistent read.
290+
apply_side_effects_operation(side_effects)
291+
.read_strongly_consistent()
292+
.await?;
276293
Ok(())
277294
},
278295
));

turbopack/crates/turbopack-dev-server/src/update/stream.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,22 @@ fn versioned_content_update_operation(
111111
content.update(*from)
112112
}
113113

114+
/// Computes the initial [`Version`] for an update stream from a resolved source request. Runs as
115+
/// an `operation` so [`UpdateStream::new`] can read it strongly consistently from its top-level
116+
/// task without performing an eventually-consistent read.
117+
#[turbo_tasks::function(operation, root)]
118+
async fn initial_version_operation(
119+
content: OperationVc<ResolveSourceRequestResult>,
120+
) -> Result<Vc<Box<dyn Version>>> {
121+
Ok(match *content.read_strongly_consistent().await? {
122+
ResolveSourceRequestResult::Static(static_content, _) => {
123+
static_content.await?.content.version()
124+
}
125+
ResolveSourceRequestResult::HttpProxy(proxy_result) => Vc::upcast(proxy_result.connect()),
126+
_ => Vc::upcast(NotFoundVersion::new()),
127+
})
128+
}
129+
114130
#[turbo_tasks::function(operation, root)]
115131
async fn get_update_stream_item_operation(
116132
resource: RcStr,
@@ -277,17 +293,13 @@ impl UpdateStream {
277293

278294
let content = get_content.call();
279295
// We can ignore issues reported in content here since [compute_update_stream]
280-
// will handle them
281-
let version = match *content.connect().await? {
282-
ResolveSourceRequestResult::Static(static_content, _) => {
283-
static_content.await?.content.version()
284-
}
285-
ResolveSourceRequestResult::HttpProxy(proxy_result) => {
286-
Vc::upcast(proxy_result.connect())
287-
}
288-
_ => Vc::upcast(NotFoundVersion::new()),
289-
};
290-
let version_state = VersionState::new(version.into_trait_ref().await?).await?;
296+
// will handle them. This runs in a top-level task (`UpdateServer::run`'s
297+
// `start_once_process`), so the initial version is computed in a dedicated `operation`
298+
// task (where the per-content reads are legal) and read strongly consistently.
299+
let version = initial_version_operation(content)
300+
.read_trait_strongly_consistent()
301+
.await?;
302+
let version_state = VersionState::new(version).await?;
291303

292304
let _ = compute_update_stream(
293305
resource,

0 commit comments

Comments
 (0)