Skip to content

Commit 40ab22b

Browse files
committed
moss: Execute triggers in parallel batches
Use the batched_topo() method to return a vec of vecs and execute each batch of triggers in parallel with rayon. Refactor so we only enter the isolation container once for each scope of triggers executed (if necessary). Add progress callbacks so we can continue to present a smooth progress bar to the user despite the batched execution. On a VM with 4 vCPUs on an NVMe drive formatted with ext4, total wall time for installing a simple package is reduced from ~21secs to ~13secs. moss: Postblit code cleanup. Add common entry to execute triggers and split off common functionality into its own function. moss: Fix percentage calculation, improved callbacks.
1 parent ee35d21 commit 40ab22b

File tree

3 files changed

+153
-72
lines changed

3 files changed

+153
-72
lines changed

crates/triggers/src/lib.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ impl<'a> Collection<'a> {
7878
}
7979
}
8080

81-
/// Bake the trigger collection into a sane dependency order
82-
pub fn bake(&mut self) -> Result<Vec<format::CompiledHandler>, Error> {
81+
/// Bake the trigger collection into a sane dependency order, grouped by parallelizable stages.
82+
pub fn bake_in_stages(&mut self) -> Result<Vec<Vec<format::CompiledHandler>>, Error> {
8383
let mut graph = dag::Dag::new();
8484

8585
// ensure all keys are in place
@@ -117,11 +117,19 @@ impl<'a> Collection<'a> {
117117
}
118118
}
119119

120-
// Recollect in dependency order
121-
let results = graph
122-
.topo()
123-
.filter_map(|i| self.hits.remove(i))
124-
.flatten()
120+
// Recollect in dependency order batches
121+
let stages = graph.batched_topo();
122+
123+
let results = stages
124+
.into_iter()
125+
.map(|stage| {
126+
stage
127+
.iter()
128+
.filter_map(|id| self.hits.get(id))
129+
.flatten()
130+
.cloned()
131+
.collect::<Vec<_>>()
132+
})
125133
.collect::<Vec<_>>();
126134
Ok(results)
127135
}

moss/src/client/mod.rs

Lines changed: 16 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -414,19 +414,21 @@ impl Client {
414414
fn apply_triggers(scope: TriggerScope<'_>, fstree: &vfs::Tree<PendingFile>) -> Result<(), postblit::Error> {
415415
let triggers = postblit::triggers(scope, fstree)?;
416416

417-
let progress = ProgressBar::new(triggers.len() as u64).with_style(
417+
let total_items: u64 = triggers.iter().map(|batch| batch.len() as u64).sum();
418+
419+
let progress_bar = ProgressBar::new(total_items).with_style(
418420
ProgressStyle::with_template("\n|{bar:20.green/blue}| {pos}/{len} {msg}")
419421
.unwrap()
420422
.progress_chars("■≡=- "),
421423
);
422424

423425
let phase_name = match &scope {
424426
TriggerScope::Transaction(_, _) => {
425-
progress.set_message("Running transaction-scope triggers");
427+
progress_bar.set_message("Running transaction-scope triggers");
426428
"transaction-scope-triggers"
427429
}
428430
TriggerScope::System(_, _) => {
429-
progress.set_message("Running system-scope triggers");
431+
progress_bar.set_message("Running system-scope triggers");
430432
"system-scope-triggers"
431433
}
432434
};
@@ -435,37 +437,32 @@ impl Client {
435437

436438
info!(
437439
phase = phase_name,
438-
total_items = triggers.len(),
440+
total_items = total_items,
439441
progress = 0.0,
440442
event_type = "progress_start",
441443
);
442444

443-
for (i, trigger) in progress.wrap_iter(triggers.iter()).enumerate() {
444-
trigger.execute()?;
445-
446-
let trigger_command = match trigger.handler() {
447-
triggers::format::Handler::Run { run, .. } => run.clone(),
448-
triggers::format::Handler::Delete { .. } => "delete operation".to_owned(),
449-
};
445+
postblit::execute_triggers(scope, &triggers, |progress| {
446+
progress_bar.set_position(progress.completed);
450447
info!(
451-
progress = (i + 1) as f32 / triggers.len() as f32,
452-
current = i + 1,
453-
total = triggers.len(),
448+
progress = progress.completed as f32 / total_items as f32,
449+
current = progress.completed,
450+
total = total_items,
454451
event_type = "progress_update",
455-
"Executing {}",
456-
trigger_command
452+
"Executing {:?}",
453+
progress.item
457454
);
458-
}
455+
})?;
459456

460457
info!(
461458
phase = phase_name,
462459
duration_ms = timer.elapsed().as_millis(),
463-
items_processed = triggers.len(),
460+
items_processed = total_items,
464461
progress = 1.0,
465462
event_type = "progress_completed",
466463
);
467464

468-
progress.finish_and_clear();
465+
progress_bar.finish_and_clear();
469466

470467
Ok(())
471468
}

moss/src/client/postblit.rs

Lines changed: 122 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@
1111
use std::{
1212
path::{Path, PathBuf},
1313
process,
14+
sync::atomic::{AtomicUsize, Ordering},
1415
};
1516

1617
use crate::Installation;
1718
use container::Container;
1819
use itertools::Itertools;
20+
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
1921
use serde::Deserialize;
2022
use thiserror::Error;
2123
use tracing::{error, warn};
@@ -100,13 +102,19 @@ impl TriggerScope<'_> {
100102
}
101103

102104
/// Condensed type for loaded triggers with scope and executor
103-
#[derive(Debug)]
104-
pub(super) struct TriggerRunner<'a> {
105-
scope: TriggerScope<'a>,
105+
pub(super) struct TriggerRunner {
106106
trigger: CompiledHandler,
107107
}
108108

109-
/// Load all triggers matching the given scope and staging filesystem
109+
/// Payload handed to a progress callback each time a trigger has been executed.
///
/// The trigger executor builds one value per trigger and passes it to the caller-supplied
/// callback, which can use it to drive a progress bar or emit log events.
#[derive(Clone, Debug)]
pub struct Progress<'a> {
    /// Counter value reported by the executor for this update.
    pub completed: u64,
    /// Label of the trigger this update refers to: the command of a `Run` handler, or the
    /// fixed text `"delete operation"` for a `Delete` handler.
    pub item: &'a str,
}
115+
116+
/// Load all triggers matching the given scope and staging filesystem, return in batches
117+
/// suitable for concurrent/parallel processing.
110118
///
111119
/// # Arguments
112120
///
@@ -115,7 +123,7 @@ pub(super) struct TriggerRunner<'a> {
115123
pub(super) fn triggers<'a>(
116124
scope: TriggerScope<'a>,
117125
fstree: &vfs::tree::Tree<PendingFile>,
118-
) -> Result<Vec<TriggerRunner<'a>>, Error> {
126+
) -> Result<Vec<Vec<TriggerRunner>>, Error> {
119127
// Pre-calculate trigger root path once
120128
let trigger_root = {
121129
let mut path = PathBuf::with_capacity(50);
@@ -145,57 +153,125 @@ pub(super) fn triggers<'a>(
145153
// Load trigger collection, process all the paths, convert to scoped TriggerRunner vec
146154
let mut collection = triggers::Collection::new(triggers.iter())?;
147155
collection.process_paths(fstree.iter().map(|m| m.to_string()));
148-
let computed_commands = collection
149-
.bake()?
156+
let batches = collection
157+
.bake_in_stages()?
150158
.into_iter()
151-
.map(|trigger| TriggerRunner { scope, trigger })
159+
.map(|batch| batch.into_iter().map(|trigger| TriggerRunner { trigger }).collect_vec())
152160
.collect_vec();
153-
Ok(computed_commands)
161+
Ok(batches)
154162
}
155163

156-
impl TriggerRunner<'_> {
157-
pub fn handler(&self) -> &Handler {
158-
self.trigger.handler()
164+
/// Execute triggers based on TriggerScope
165+
///
166+
/// Execute either transaction or system scope triggers using container sandboxing as necessary
167+
pub fn execute_triggers(
168+
scope: TriggerScope<'_>,
169+
triggers: &[Vec<TriggerRunner>],
170+
on_progress: impl Fn(Progress<'_>) + Send + Sync,
171+
) -> Result<(), Error> {
172+
match scope {
173+
scope @ TriggerScope::Transaction(install, _) => {
174+
execute_transaction_triggers(install, scope, triggers, &on_progress)?;
175+
}
176+
scope @ TriggerScope::System(install, _) => {
177+
execute_system_triggers(install, scope, triggers, &on_progress)?;
178+
}
179+
};
180+
181+
Ok(())
182+
}
183+
184+
/// Execute transaction triggers
185+
///
186+
/// Transaction triggers are run via sandboxing ([`container::Container`]) to limit their
187+
/// system view, and limit write access. Each batch of triggers are executed in parallel
188+
/// to speed up execution time.
189+
fn execute_transaction_triggers<P>(
190+
install: &Installation,
191+
scope: TriggerScope<'_>,
192+
triggers: &[Vec<TriggerRunner>],
193+
on_progress: P,
194+
) -> Result<(), Error>
195+
where
196+
P: Fn(Progress<'_>) + Send + Sync,
197+
{
198+
// TODO: Add caching support via /var/
199+
let isolation = Container::new(install.isolation_dir())
200+
.networking(false)
201+
.bind_ro(scope.host_path("etc"), "/etc")
202+
.bind_rw(scope.guest_path("usr"), "/usr")
203+
.work_dir("/");
204+
205+
isolation.run(|| execute_triggers_directly(triggers, &on_progress))?;
206+
207+
Ok(())
208+
}
209+
210+
/// Execute system triggers
211+
///
212+
/// System triggers will execute without any sandboxing when moss is used directly against the
213+
/// live root filesystem, and will force sandboxing when using a non-`/` root (such as using the
214+
/// `-D argument with `moss install`). Each batch of triggers is executed in parallel to speed up
215+
/// execution time.
216+
fn execute_system_triggers<P>(
217+
install: &Installation,
218+
scope: TriggerScope<'_>,
219+
triggers: &[Vec<TriggerRunner>],
220+
on_progress: P,
221+
) -> Result<(), Error>
222+
where
223+
P: Fn(Progress<'_>) + Send + Sync,
224+
{
225+
// OK, if the root == `/` then we can run directly, otherwise we need to containerise with RW.
226+
if install.root.to_string_lossy() == "/" {
227+
execute_triggers_directly(triggers, on_progress)?;
228+
} else {
229+
let isolation = Container::new(install.isolation_dir())
230+
.networking(false)
231+
.bind_rw(scope.host_path("etc"), "/etc")
232+
.bind_rw(scope.guest_path("usr"), "/usr")
233+
.work_dir("/");
234+
235+
isolation.run(|| execute_triggers_directly(triggers, &on_progress))?;
159236
}
237+
Ok(())
238+
}
160239

161-
/// Execute a trigger, taking care to account for the transaction scope and client scope
162-
///
163-
/// All transaction triggers are run via sandboxing ([`container::Container`]) to limit their
164-
/// system view, and limit write access.
165-
/// System triggers will execute without any sandboxing when moss is used directly against the
166-
/// live root filesystem, and will force sandboxing when using a non-`/` root (such as using the
167-
/// `-D argument with `moss install`)
168-
pub fn execute(&self) -> Result<(), Error> {
169-
match self.scope {
170-
TriggerScope::Transaction(install, _) => {
171-
// TODO: Add caching support via /var/
172-
let isolation = Container::new(install.isolation_dir())
173-
.networking(false)
174-
.bind_ro(self.scope.host_path("etc"), "/etc")
175-
.bind_rw(self.scope.guest_path("usr"), "/usr")
176-
.work_dir("/");
177-
178-
Ok(isolation.run(|| execute_trigger_directly(&self.trigger))?)
179-
}
180-
TriggerScope::System(install, _) => {
181-
// OK, if the root == `/` then we can run directly, otherwise we need to containerise with RW.
182-
if install.root.to_string_lossy() == "/" {
183-
Ok(execute_trigger_directly(&self.trigger)?)
184-
} else {
185-
let isolation = Container::new(install.isolation_dir())
186-
.networking(false)
187-
.bind_rw(self.scope.host_path("etc"), "/etc")
188-
.bind_rw(self.scope.guest_path("usr"), "/usr")
189-
.work_dir("/");
190-
191-
Ok(isolation.run(|| execute_trigger_directly(&self.trigger))?)
192-
}
193-
}
194-
}
240+
impl TriggerRunner {
241+
pub fn handler(&self) -> &Handler {
242+
self.trigger.handler()
195243
}
196244
}
197245

198246
/// Internal executor for triggers.
247+
fn execute_triggers_directly<P>(triggers: &[Vec<TriggerRunner>], on_progress: P) -> Result<(), Error>
248+
where
249+
P: Fn(Progress<'_>) + Send + Sync,
250+
{
251+
let rayon_runtime = rayon::ThreadPoolBuilder::new().build().expect("rayon runtime");
252+
253+
let counter = AtomicUsize::new(0);
254+
255+
rayon_runtime.install(|| {
256+
triggers.iter().try_for_each(|batch| {
257+
batch.par_iter().try_for_each(|trigger| {
258+
let res = execute_trigger_directly(&trigger.trigger);
259+
let completed = counter.fetch_add(1, Ordering::Relaxed);
260+
(on_progress)(Progress {
261+
completed: completed as u64,
262+
item: match trigger.handler() {
263+
Handler::Run { run, .. } => run,
264+
Handler::Delete { .. } => "delete operation",
265+
},
266+
});
267+
res
268+
})
269+
})
270+
})?;
271+
Ok(())
272+
}
273+
274+
/// Internal executor for individual triggers.
199275
fn execute_trigger_directly(trigger: &CompiledHandler) -> Result<(), Error> {
200276
match trigger.handler() {
201277
Handler::Run { run, args } => {

0 commit comments

Comments
 (0)