Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions crates/dag/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// SPDX-License-Identifier: MPL-2.0

use petgraph::{
Direction,
prelude::DiGraph,
visit::{Dfs, Topo, Walker},
};
Expand Down Expand Up @@ -103,6 +104,34 @@ where
topo.iter(&self.0).map(|i| &self.0[i])
}

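/// Returns the nodes grouped into batches, in topological order: each batch holds the
/// nodes whose incoming edges all originate in earlier batches, so the nodes within one
/// batch can be executed in parallel. If a cycle is encountered, batching stops and the
/// remaining nodes are omitted from the result.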
pub fn batched_topo(&self) -> Vec<Vec<N>>
where
N: Ord,
{
let mut g = self.0.clone();
let mut batches = Vec::new();

while g.node_count() > 0 {
let mut sources: Vec<_> = g.externals(Direction::Incoming).collect();
if sources.is_empty() && g.node_count() > 0 {
// Cycle detected.
break;
}

let batch_nodes: Vec<_> = sources.iter().map(|&i| g[i].clone()).collect();
batches.push(batch_nodes);

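// Remove the highest indices first: `Graph::remove_node` moves the last node into the
// freed slot, so removing in any other order could invalidate the indices collected
// above (seen as duplicate nodes in batches).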
sources.sort_by_key(|&idx| std::cmp::Reverse(idx.index()));

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this further, not sure I understand how removing these nodes in any order causes graph invalidation. Nodes should be removable in any order?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

petgraph seems to move nodes around once you remove one, invalidating the just collected externals. Reversing means you remove by highest indices first keeping the externals valid (at least from what i can tell)

Cloning the graph into a StableGraph instead should also work without requiring the reverse trick

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense 👍

for ix in sources {
g.remove_node(ix);
}
}
batches
}

/// Transpose the graph, returning the clone
pub fn transpose(&self) -> Self {
let mut transposed = self.0.clone();
Expand All @@ -120,3 +149,94 @@ where
self.0.node_indices().find(|i| self.0[*i] == *node)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_batched_linear_dag() {
    let mut graph: Dag<i32> = Dag::new();

    // Linear chain: 1 -> 2 -> 3 -> 4
    let a = graph.add_node_or_get_index(&1);
    let b = graph.add_node_or_get_index(&2);
    let c = graph.add_node_or_get_index(&3);
    let d = graph.add_node_or_get_index(&4);

    graph.add_edge(a, b);
    graph.add_edge(b, c);
    graph.add_edge(c, d);

    let batches = graph.batched_topo();

    // A chain offers no parallelism: every node must land in its own batch, and the
    // batches must come out in dependency order. Comparing the full result checks the
    // batch count, the batch sizes and the ordering in one go.
    assert_eq!(batches, vec![vec![1], vec![2], vec![3], vec![4]]);
}

#[test]
fn test_topo_batched_simple_dag() {
    let mut graph: Dag<usize> = Dag::new();

    // Two chains that join at the last node:
    //   1 -> 3 -> 5
    //   2 -> 4 -> 5
    let a = graph.add_node_or_get_index(&1);
    let b = graph.add_node_or_get_index(&2);
    let c = graph.add_node_or_get_index(&3);
    let d = graph.add_node_or_get_index(&4);
    let e = graph.add_node_or_get_index(&5);

    graph.add_edge(a, c);
    graph.add_edge(b, d);
    graph.add_edge(c, e);
    graph.add_edge(d, e);

    // The order of nodes inside a batch is not part of the contract, so normalise each
    // batch by sorting it before comparing.
    let stages: Vec<Vec<usize>> = graph
        .batched_topo()
        .into_iter()
        .map(|mut stage| {
            stage.sort_unstable();
            stage
        })
        .collect();

    // Stage 0: the two roots, stage 1: their direct successors, stage 2: the join node.
    assert_eq!(stages, vec![vec![1, 2], vec![3, 4], vec![5]]);
}

#[test]
fn test_topo_batched_fully_parallel() {
    let mut graph: Dag<char> = Dag::new();

    // Four nodes without any edges between them.
    for label in ['A', 'B', 'C', 'D'] {
        let _ = graph.add_node_or_get_index(&label);
    }

    // With no dependencies at all, everything fits into one batch of four.
    let batch_sizes: Vec<usize> = graph.batched_topo().iter().map(Vec::len).collect();
    assert_eq!(batch_sizes, vec![4]);
}

#[test]
fn test_topo_batched_empty_graph() {
    // A graph without nodes yields no batches at all.
    let empty: Dag<i32> = Dag::new();
    assert!(empty.batched_topo().is_empty());
}
}
22 changes: 15 additions & 7 deletions crates/triggers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl<'a> Collection<'a> {
}
}

/// Bake the trigger collection into a sane dependency order
pub fn bake(&mut self) -> Result<Vec<format::CompiledHandler>, Error> {
/// Bake the trigger collection into a sane dependency order, grouped by parallelizable stages.
pub fn bake_in_stages(&mut self) -> Result<Vec<Vec<format::CompiledHandler>>, Error> {
let mut graph = dag::Dag::new();

// ensure all keys are in place
Expand Down Expand Up @@ -117,11 +117,19 @@ impl<'a> Collection<'a> {
}
}

// Recollect in dependency order
let results = graph
.topo()
.filter_map(|i| self.hits.remove(i))
.flatten()
// Recollect in dependency order batches
let stages = graph.batched_topo();

let results = stages
.into_iter()
.map(|stage| {
stage
.iter()
.filter_map(|id| self.hits.get(id))
.flatten()
.cloned()
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();
Ok(results)
}
Expand Down
35 changes: 16 additions & 19 deletions moss/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,19 +414,21 @@ impl Client {
fn apply_triggers(scope: TriggerScope<'_>, fstree: &vfs::Tree<PendingFile>) -> Result<(), postblit::Error> {
let triggers = postblit::triggers(scope, fstree)?;

let progress = ProgressBar::new(triggers.len() as u64).with_style(
let total_items: u64 = triggers.iter().map(|batch| batch.len() as u64).sum();

let progress_bar = ProgressBar::new(total_items).with_style(
ProgressStyle::with_template("\n|{bar:20.green/blue}| {pos}/{len} {msg}")
.unwrap()
.progress_chars("■≡=- "),
);

let phase_name = match &scope {
TriggerScope::Transaction(_, _) => {
progress.set_message("Running transaction-scope triggers");
progress_bar.set_message("Running transaction-scope triggers");
"transaction-scope-triggers"
}
TriggerScope::System(_, _) => {
progress.set_message("Running system-scope triggers");
progress_bar.set_message("Running system-scope triggers");
"system-scope-triggers"
}
};
Expand All @@ -435,37 +437,32 @@ impl Client {

info!(
phase = phase_name,
total_items = triggers.len(),
total_items = total_items,
progress = 0.0,
event_type = "progress_start",
);

for (i, trigger) in progress.wrap_iter(triggers.iter()).enumerate() {
trigger.execute()?;

let trigger_command = match trigger.handler() {
triggers::format::Handler::Run { run, .. } => run.clone(),
triggers::format::Handler::Delete { .. } => "delete operation".to_owned(),
};
postblit::execute_triggers(scope, &triggers, |progress| {
progress_bar.set_position(progress.completed);
info!(
progress = (i + 1) as f32 / triggers.len() as f32,
current = i + 1,
total = triggers.len(),
progress = progress.completed as f32 / total_items as f32,
current = progress.completed,
total = total_items,
event_type = "progress_update",
"Executing {}",
trigger_command
"Executing {:?}",
progress.item
);
}
})?;

info!(
phase = phase_name,
duration_ms = timer.elapsed().as_millis(),
items_processed = triggers.len(),
items_processed = total_items,
progress = 1.0,
event_type = "progress_completed",
);

progress.finish_and_clear();
progress_bar.finish_and_clear();

Ok(())
}
Expand Down
Loading