Commit 96dc4e3

migration: Send IsReady to all upstream full mat nodes
On restart, we have a race condition that causes a panic when "recovering" the `DfState`. This occurs on joins with fully materialized nodes, when replay begins. Before replay, all nodes are sent a `Ready` message, but base nodes (which must open files from disk) "ready" themselves asynchronously, meaning the Leader does not block for them to become ready.

The problem we are encountering is that when a replay containing a join node begins, the `DomainMigrationPlan` ensures only the left side is ready (by sending an `IsReady` message). The right side has been sent a `Ready` message, but we have not guaranteed that it is, indeed, "ready".

This CL checks each ReplayPath in the `DomainMigrationPlan`'s `Plan` phase, and if the path contains fully materialized nodes, it checks whether there is a join node. If there is a join, it walks up the DAG looking for both base nodes and fully materialized nodes. The indices of those nodes are added to a set, and a later step in `DomainMigrationPlan::commit()` sends an `IsReady` message to those additional nodes.

This will not cause any additional traffic at startup, as there is already a `HashSet` in `DomainMigrationPlan::commit()` that tracks whether an `IsReady` has already been sent to a target node.

Fixes: REA-5126
Change-Id: Icbc815ce0a4aa4ed1b03aa734c785122c0839cbb
Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8563
Tested-by: Buildkite CI
Reviewed-by: Michael Zink <[email protected]>
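To illustrate the race described above, here is a minimal, self-contained sketch of the two message kinds using plain threads and channels. The names and the single-threaded request loop are illustrative assumptions, not ReadySet's actual domain implementation: `Ready` is fire-and-forget, so the sender cannot tell when the slow disk work has finished, while an `IsReady` round trip blocks until the `Ready` work queued before it has completed.

    use std::sync::mpsc;
    use std::thread;
    use std::time::Duration;

    enum Request {
        Ready,                       // fire-and-forget: start opening base tables
        IsReady(mpsc::Sender<bool>), // request/response: reply when processed
    }

    fn main() {
        let (tx, rx) = mpsc::channel::<Request>();

        // The "domain": handles requests in order, so an `IsReady` sent after
        // `Ready` is only answered once the slow readying work has completed.
        let domain = thread::spawn(move || {
            let mut ready = false;
            for req in rx {
                match req {
                    Request::Ready => {
                        thread::sleep(Duration::from_millis(100)); // slow disk work
                        ready = true;
                    }
                    Request::IsReady(reply) => reply.send(ready).unwrap(),
                }
            }
        });

        // The "leader": `send` returns immediately, so starting a replay right
        // after `Ready` would race with the base tables being opened ...
        tx.send(Request::Ready).unwrap();

        // ... whereas waiting for an `IsReady` reply closes the race.
        let (reply_tx, reply_rx) = mpsc::channel();
        tx.send(Request::IsReady(reply_tx)).unwrap();
        assert!(reply_rx.recv().unwrap()); // now safe to begin the replay

        drop(tx);
        domain.join().unwrap();
    }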
1 parent 3d43048 commit 96dc4e3

File tree

2 files changed: +55 -0

  • readyset-server/src/controller/migrate/materialization/mod.rs
  • readyset-server/src/controller/migrate/materialization/plan.rs

readyset-server/src/controller/migrate/materialization/mod.rs

5 additions & 0 deletions

@@ -1325,6 +1325,11 @@ impl Materializations {
                 },
             )?;
         }
+        for (domain, node) in pending.additional_ancestors {
+            if non_ready_nodes.remove(&(domain, node)) {
+                dmp.add_message(domain, DomainRequest::IsReady { node })?;
+            }
+        }
 
         dmp.add_message(
             pending.source_domain,
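To make the dedup in the hunk above explicit, here is a minimal, self-contained sketch of the idiom; the `(u32, u32)` pairs and the counter are illustrative stand-ins for `(DomainIndex, LocalNodeIndex)` and `dmp.add_message(..)`. Because `HashSet::remove` returns `true` only the first time a key is removed, each node receives at most one `IsReady` even if several replay paths report the same ancestor, which is why the commit message can claim no additional startup traffic.

    use std::collections::HashSet;

    fn main() {
        // Stand-ins for (DomainIndex, LocalNodeIndex) pairs still awaiting IsReady.
        let mut non_ready_nodes: HashSet<(u32, u32)> = HashSet::from([(0, 1), (0, 2)]);

        // The same ancestor may be reported by more than one replay path.
        let additional_ancestors = [(0, 1), (0, 1), (0, 2)];

        let mut messages_sent = 0;
        for pair in additional_ancestors {
            // `remove` returns true only on the first removal, so the duplicate
            // (0, 1) is skipped and no second message is queued for it.
            if non_ready_nodes.remove(&pair) {
                messages_sent += 1; // stand-in for dmp.add_message(.., IsReady { .. })
            }
        }
        assert_eq!(messages_sent, 2);
    }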

readyset-server/src/controller/migrate/materialization/plan.rs

50 additions & 0 deletions
@@ -68,6 +68,12 @@ pub(super) struct PendingReplay {
     pub(super) source: LocalNodeIndex,
     pub(super) source_domain: DomainIndex,
     pub(super) target_domain: DomainIndex,
+
+    // If the segment contained a join, we need to ensure any fully materialized
+    // nodes among its ancestors are `Ready` before we start the replay. Otherwise,
+    // on restart, we race between the replay hitting the join node and doing a
+    // lookup on the other parent, and that parent actually having its base tables opened.
+    pub(super) additional_ancestors: HashSet<(DomainIndex, LocalNodeIndex)>,
 }
 
 impl<'a> Plan<'a> {

@@ -286,6 +292,45 @@ impl<'a> Plan<'a> {
             .collect()
     }
 
+    /// Find the ancestors of any joins on the given path. This is to ensure that
+    /// all parents are `Ready` (and will have been sent an `IsReady` message)
+    /// before we start a replay.
+    ///
+    /// We assume that since the node traversal is topologically sorted (see
+    /// `Materializations::commit()`), any parent node would have been sent a
+    /// `Ready` message.
+    ///
+    /// To do this, we look for all join nodes in the provided `path` and walk the
+    /// tree of each parent, collecting all fully materialized nodes among the
+    /// ancestors.
+    fn find_additional_ancestors(
+        &self,
+        path: &RawReplayPath,
+    ) -> HashSet<(DomainIndex, LocalNodeIndex)> {
+        let mut additional_ancestors = HashSet::new();
+
+        let mut seen = HashSet::new();
+        for segment in path.segments() {
+            let n = &self.graph[segment.node];
+            if n.is_join().is_ok_and(|b| b) {
+                // as the `path.segments()` iterator traverses downward through the
+                // graph, we can skip ancestors of the current segment we've
+                // already looked at.
+                let mut queue = n.ancestors().unwrap_or_default();
+                queue.retain(|a| !seen.contains(a));
+
+                while let Some(parent) = queue.pop() {
+                    let node = &self.graph[parent];
+                    if node.is_base() || node.requires_full_materialization() {
+                        additional_ancestors.insert((node.domain(), node.local_addr()));
+                    } else {
+                        queue.extend(node.ancestors().unwrap_or_default());
+                    }
+
+                    seen.insert(segment.node);
+                }
+            }
+        }
+
+        additional_ancestors
+    }
+
     /// Finds the appropriate replay paths for the given index, and inform all domains on those
     /// paths about them. It also notes if any data backfills will need to be run, which is
     /// eventually reported back by `finalize`.
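To see what the walk computes, here is a self-contained toy version of the same traversal; the `Node` struct and `usize` indices are illustrative stand-ins for ReadySet's graph types, not its real API. Starting from a join's parents, it walks upward, collects base and fully materialized nodes, does not continue above them, and keeps following the parents of everything else.

    use std::collections::HashSet;

    struct Node {
        parents: Vec<usize>,      // upstream (ancestor) node indices
        is_base: bool,            // base tables open files from disk on startup
        fully_materialized: bool, // fully materialized internal nodes
    }

    // Collect every base or fully materialized ancestor of `join`, visiting
    // each node at most once. This mirrors the queue-based walk in the diff.
    fn additional_ancestors(graph: &[Node], join: usize) -> HashSet<usize> {
        let mut found = HashSet::new();
        let mut seen = HashSet::new();
        let mut queue = graph[join].parents.clone();
        while let Some(parent) = queue.pop() {
            if !seen.insert(parent) {
                continue; // already visited via another path through the DAG
            }
            let node = &graph[parent];
            if node.is_base || node.fully_materialized {
                found.insert(parent); // stop here: this node must ack `IsReady`
            } else {
                queue.extend(node.parents.iter().copied()); // keep walking upward
            }
        }
        found
    }

    fn main() {
        // base(0) -> filter(1) -> join(3) <- base(2)
        let graph = vec![
            Node { parents: vec![], is_base: true, fully_materialized: false },
            Node { parents: vec![0], is_base: false, fully_materialized: false },
            Node { parents: vec![], is_base: true, fully_materialized: false },
            Node { parents: vec![1, 2], is_base: false, fully_materialized: false },
        ];
        // Both bases are found, not just the side the replay path enters on.
        assert_eq!(additional_ancestors(&graph, 3), HashSet::from([0, 2]));
    }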
@@ -718,6 +763,10 @@ impl<'a> Plan<'a> {
                     }
                 }
             } else {
+                // for full materializations, we need to identify the parents that are
+                // also fully materialized, and make sure to wait for them to be `Ready`.
+                let additional_ancestors = self.find_additional_ancestors(&path);
+
                 // for full materializations, the last domain should report when it's done
                 if i == segments.len() - 1 {
                     if let DomainRequest::SetupReplayPath {

@@ -742,6 +791,7 @@ impl<'a> Plan<'a> {
                         .last()
                         .ok_or_else(|| internal_err!())?
                         .0,
+                    additional_ancestors,
                 });
             }
         }
