Skip to content

Commit 94f6393

Browse files
authored
sm-executor: stop dropping completed job results (#163)
* sm-executor: stop dropping completed job results * style: fix nightly rustfmt import grouping * sm-executor: drop STF completion errors * test: fix rebased sm-executor helper call * chore: ignore sharded functional test artifacts
1 parent 8d0cdfe commit 94f6393

File tree

8 files changed

+871
-26
lines changed

8 files changed

+871
-26
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ target/
6464
# Tests artifacts
6565
mutants.out*
6666
lcov.info
67+
functional-tests/_dd_parallel_shard*/
6768

6869
# Unit test / coverage reports
6970
htmlcov/
@@ -97,4 +98,4 @@ fuzz-*.log
9798

9899
# Docker volume data
99100
docker/vol/fdb/data/
100-
docker/vol/mosaic-*/tables/
101+
docker/vol/mosaic-*/tables/

crates/job/api/src/submission.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl JobActions {
102102
///
103103
/// The peer ID on [`JobCompletion`] plus the variant here (garbler vs
104104
/// evaluator) identifies which SM to deliver to.
105-
#[derive(Debug)]
105+
#[derive(Debug, Clone)]
106106
pub enum ActionCompletion {
107107
/// Garbler tracked action completed.
108108
Garbler {
@@ -180,7 +180,7 @@ impl ActionCompletion {
180180
///
181181
/// Jobs always retry internally until they succeed — every submitted action
182182
/// eventually produces exactly one completion. There is no failure variant.
183-
#[derive(Debug)]
183+
#[derive(Debug, Clone)]
184184
pub struct JobCompletion {
185185
/// The peer whose SM this result should be routed to.
186186
pub peer_id: PeerId,

crates/job/scheduler/src/garbling/mod.rs

Lines changed: 100 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ use mosaic_job_api::{
5858
use mosaic_net_svc_api::PeerId;
5959
use tracing::Instrument;
6060

61+
use crate::SchedulerFault;
62+
6163
/// Size of each v5c gate record in bytes (3 × u32).
6264
const GATE_SIZE: usize = 12;
6365

@@ -259,10 +261,11 @@ impl GarblingCoordinator {
259261
/// Jobs submitted via [`submit`](Self::submit) are collected into batches,
260262
/// sessions are created via the `factory`, and workers process them
261263
/// concurrently.
262-
pub fn new(
264+
pub(crate) fn new(
263265
config: GarblingConfig,
264266
factory: Arc<dyn SessionFactory>,
265267
completion_tx: kanal::AsyncSender<JobCompletion>,
268+
fault_tx: kanal::AsyncSender<SchedulerFault>,
266269
) -> Self {
267270
let (submit_tx, submit_rx) = kanal::bounded_async(config.max_concurrent * 2);
268271

@@ -273,7 +276,13 @@ impl GarblingCoordinator {
273276
.enable_timer()
274277
.build()
275278
.expect("failed to build monoio runtime for garbling coordinator")
276-
.block_on(coordinator_loop(config, factory, submit_rx, completion_tx));
279+
.block_on(coordinator_loop(
280+
config,
281+
factory,
282+
submit_rx,
283+
completion_tx,
284+
fault_tx,
285+
));
277286
})
278287
.expect("failed to spawn garbling coordinator thread");
279288

@@ -319,6 +328,7 @@ async fn coordinator_loop(
319328
factory: Arc<dyn SessionFactory>,
320329
submit_rx: kanal::AsyncReceiver<PendingCircuitJob>,
321330
completion_tx: kanal::AsyncSender<JobCompletion>,
331+
fault_tx: kanal::AsyncSender<SchedulerFault>,
322332
) {
323333
let span = tracing::info_span!(
324334
"job_scheduler.garbling_coordinator",
@@ -450,6 +460,7 @@ async fn coordinator_loop(
450460
sessions,
451461
&mut workers,
452462
&completion_tx,
463+
&fault_tx,
453464
&mut pending_retry,
454465
)
455466
.instrument(tracing::info_span!(
@@ -491,6 +502,7 @@ async fn run_pass(
491502
sessions: Vec<ActiveSession>,
492503
workers: &mut [WorkerHandle],
493504
completion_tx: &kanal::AsyncSender<JobCompletion>,
505+
fault_tx: &kanal::AsyncSender<SchedulerFault>,
494506
pending_retry: &mut Vec<PendingCircuitJob>,
495507
) {
496508
let n_workers = workers.len();
@@ -545,6 +557,7 @@ async fn run_pass(
545557
&mut active_jobs_by_worker,
546558
workers,
547559
completion_tx,
560+
fault_tx,
548561
pending_retry,
549562
)
550563
.await;
@@ -615,6 +628,7 @@ async fn run_pass(
615628
&mut active_jobs_by_worker,
616629
workers,
617630
completion_tx,
631+
fault_tx,
618632
Duration::from_secs(60),
619633
pending_retry,
620634
)
@@ -680,13 +694,15 @@ async fn collect_finish_reports(
680694
active_jobs_by_worker: &mut HashMap<usize, Vec<PendingCircuitJob>>,
681695
workers: &mut [WorkerHandle],
682696
completion_tx: &kanal::AsyncSender<JobCompletion>,
697+
fault_tx: &kanal::AsyncSender<SchedulerFault>,
683698
pending_retry: &mut Vec<PendingCircuitJob>,
684699
) {
685700
collect_finish_reports_with_timeout(
686701
active_worker_ids,
687702
active_jobs_by_worker,
688703
workers,
689704
completion_tx,
705+
fault_tx,
690706
Duration::from_secs(60),
691707
pending_retry,
692708
)
@@ -700,6 +716,7 @@ async fn collect_finish_reports_with_timeout(
700716
active_jobs_by_worker: &mut HashMap<usize, Vec<PendingCircuitJob>>,
701717
workers: &mut [WorkerHandle],
702718
completion_tx: &kanal::AsyncSender<JobCompletion>,
719+
fault_tx: &kanal::AsyncSender<SchedulerFault>,
703720
finish_timeout: Duration,
704721
pending_retry: &mut Vec<PendingCircuitJob>,
705722
) {
@@ -719,11 +736,18 @@ async fn collect_finish_reports_with_timeout(
719736
Some(WorkerReport::FinishDone(report)) => {
720737
active_jobs_by_worker.remove(&wid);
721738
for completion in report.completions {
739+
let peer_id = completion.peer_id;
722740
if completion_tx.send(completion).await.is_err() {
723741
tracing::error!(
724742
worker = wid,
725743
"completion channel closed while forwarding finish report"
726744
);
745+
let _ = fault_tx
746+
.send(SchedulerFault::CompletionChannelClosed {
747+
source: "garbling_coordinator",
748+
peer_id,
749+
})
750+
.await;
727751
return;
728752
}
729753
}
@@ -756,7 +780,7 @@ async fn collect_finish_reports_with_timeout(
756780
///
757781
/// Receives sessions and chunk commands from the coordinator's main thread.
758782
/// Processes sessions sequentially per chunk (parallel across workers).
759-
/// Sends completions directly to the SM via `completion_tx`.
783+
/// Reports completed results back to the coordinator.
760784
async fn worker_loop(
761785
id: usize,
762786
chunk_timeout: Duration,
@@ -959,7 +983,7 @@ fn convert_block(block: &Block, num_gates: usize) -> OwnedBlock {
959983

960984
#[cfg(test)]
961985
mod tests {
962-
use std::{future::Future, pin::Pin, sync::Arc};
986+
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration};
963987

964988
use mosaic_cac_types::{
965989
GarblingSeed, Seed,
@@ -1045,6 +1069,7 @@ mod tests {
10451069
}];
10461070

10471071
let (completion_tx, _completion_rx) = kanal::bounded_async(1);
1072+
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
10481073
let mut pending_retry = Vec::new();
10491074
run_pass(
10501075
&GarblingConfig {
@@ -1057,6 +1082,7 @@ mod tests {
10571082
sessions,
10581083
&mut workers,
10591084
&completion_tx,
1085+
&fault_tx,
10601086
&mut pending_retry,
10611087
)
10621088
.await;
@@ -1123,13 +1149,15 @@ mod tests {
11231149

11241150
let mut active_jobs_by_worker = HashMap::from([(0usize, vec![sample_job(5)])]);
11251151
let (completion_tx, completion_rx) = kanal::bounded_async(2);
1152+
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
11261153
let mut pending_retry = Vec::new();
11271154

11281155
collect_finish_reports(
11291156
&[0],
11301157
&mut active_jobs_by_worker,
11311158
&mut workers,
11321159
&completion_tx,
1160+
&fault_tx,
11331161
&mut pending_retry,
11341162
)
11351163
.await;
@@ -1162,13 +1190,15 @@ mod tests {
11621190
let job = sample_job(6);
11631191
let mut active_jobs_by_worker = HashMap::from([(0usize, vec![job.clone()])]);
11641192
let (completion_tx, _completion_rx) = kanal::bounded_async(1);
1193+
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
11651194
let mut pending_retry = Vec::new();
11661195

11671196
collect_finish_reports_with_timeout(
11681197
&[0],
11691198
&mut active_jobs_by_worker,
11701199
&mut workers,
11711200
&completion_tx,
1201+
&fault_tx,
11721202
Duration::from_millis(1),
11731203
&mut pending_retry,
11741204
)
@@ -1179,4 +1209,70 @@ mod tests {
11791209
assert_eq!(pending_retry[0].peer_id, job.peer_id);
11801210
});
11811211
}
1212+
1213+
#[test]
1214+
fn closed_completion_channel_reports_scheduler_fault() {
1215+
run_monoio(async {
1216+
let peer_id = PeerId::from([11; 32]);
1217+
let (command_tx, _command_rx) = kanal::bounded_async(2);
1218+
let (report_tx, report_rx) = kanal::bounded_async(2);
1219+
let (completion_tx, completion_rx) = kanal::bounded_async(1);
1220+
let (fault_tx, fault_rx) = kanal::bounded_async(1);
1221+
drop(completion_rx);
1222+
1223+
report_tx
1224+
.send(WorkerReport::FinishDone(FinishReport {
1225+
completions: vec![JobCompletion {
1226+
peer_id,
1227+
completion: ActionCompletion::Garbler {
1228+
id: ActionId::SendCommitMsgChunk(0),
1229+
result: ActionResult::CommitMsgChunkAcked,
1230+
},
1231+
}],
1232+
retry_jobs: vec![],
1233+
}))
1234+
.await
1235+
.expect("send finish report");
1236+
1237+
let mut workers = vec![WorkerHandle {
1238+
id: 0,
1239+
command_tx,
1240+
report_rx,
1241+
thread: None,
1242+
}];
1243+
let mut active_jobs_by_worker = HashMap::from([(
1244+
0usize,
1245+
vec![PendingCircuitJob {
1246+
peer_id,
1247+
action: CircuitAction::GarblerTransfer {
1248+
seed: GarblingSeed::from([5; 32]),
1249+
},
1250+
}],
1251+
)]);
1252+
let mut pending_retry = Vec::new();
1253+
1254+
collect_finish_reports_with_timeout(
1255+
&[0],
1256+
&mut active_jobs_by_worker,
1257+
&mut workers,
1258+
&completion_tx,
1259+
&fault_tx,
1260+
Duration::from_millis(1),
1261+
&mut pending_retry,
1262+
)
1263+
.await;
1264+
1265+
let fault = monoio::time::timeout(Duration::from_secs(2), fault_rx.recv())
1266+
.await
1267+
.expect("timed out waiting for scheduler fault")
1268+
.expect("fault channel should stay open");
1269+
assert!(matches!(
1270+
fault,
1271+
SchedulerFault::CompletionChannelClosed {
1272+
source: "garbling_coordinator",
1273+
peer_id: fault_peer,
1274+
} if fault_peer == peer_id
1275+
));
1276+
});
1277+
}
11821278
}

crates/job/scheduler/src/lib.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,16 @@ pub mod scheduler;
2020

2121
pub(crate) mod priority;
2222

23+
use mosaic_net_svc_api::PeerId;
24+
25+
#[derive(Debug, Clone)]
26+
pub(crate) enum SchedulerFault {
27+
CompletionChannelClosed {
28+
source: &'static str,
29+
peer_id: PeerId,
30+
},
31+
}
32+
2333
// Re-export the API crate for convenience.
2434
pub use garbling::GarblingConfig;
2535
pub use mosaic_job_api;

crates/job/scheduler/src/pool/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use self::{
1717
queue::JobQueue,
1818
worker::{Worker, WorkerJob},
1919
};
20-
use crate::priority::Priority;
20+
use crate::{SchedulerFault, priority::Priority};
2121

2222
/// A job waiting in the pool's shared queue.
2323
///
@@ -91,6 +91,7 @@ impl<D: ExecuteGarblerJob + ExecuteEvaluatorJob> JobThreadPool<D> {
9191
config: PoolConfig,
9292
dispatcher: Arc<D>,
9393
completion_tx: kanal::AsyncSender<JobCompletion>,
94+
fault_tx: kanal::AsyncSender<SchedulerFault>,
9495
) -> Self {
9596
let queue = Arc::new(JobQueue::new(config.priority_queue));
9697

@@ -101,6 +102,7 @@ impl<D: ExecuteGarblerJob + ExecuteEvaluatorJob> JobThreadPool<D> {
101102
Arc::clone(&dispatcher),
102103
Arc::clone(&queue),
103104
completion_tx.clone(),
105+
fault_tx.clone(),
104106
config.concurrency_per_worker,
105107
)
106108
})

0 commit comments

Comments
 (0)