Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ target/
# Tests artifacts
mutants.out*
lcov.info
functional-tests/_dd_parallel_shard*/

# Unit test / coverage reports
htmlcov/
Expand Down Expand Up @@ -97,4 +98,4 @@ fuzz-*.log

# Docker volume data
docker/vol/fdb/data/
docker/vol/mosaic-*/tables/
docker/vol/mosaic-*/tables/
4 changes: 2 additions & 2 deletions crates/job/api/src/submission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl JobActions {
///
/// The peer ID on [`JobCompletion`] plus the variant here (garbler vs
/// evaluator) identifies which SM to deliver to.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum ActionCompletion {
/// Garbler tracked action completed.
Garbler {
Expand Down Expand Up @@ -180,7 +180,7 @@ impl ActionCompletion {
///
/// Jobs always retry internally until they succeed — every submitted action
/// eventually produces exactly one completion. There is no failure variant.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct JobCompletion {
/// The peer whose SM this result should be routed to.
pub peer_id: PeerId,
Expand Down
104 changes: 100 additions & 4 deletions crates/job/scheduler/src/garbling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ use mosaic_job_api::{
use mosaic_net_svc_api::PeerId;
use tracing::Instrument;

use crate::SchedulerFault;

/// Size of each v5c gate record in bytes (3 × u32).
const GATE_SIZE: usize = 12;

Expand Down Expand Up @@ -259,10 +261,11 @@ impl GarblingCoordinator {
/// Jobs submitted via [`submit`](Self::submit) are collected into batches,
/// sessions are created via the `factory`, and workers process them
/// concurrently.
pub fn new(
pub(crate) fn new(
config: GarblingConfig,
factory: Arc<dyn SessionFactory>,
completion_tx: kanal::AsyncSender<JobCompletion>,
fault_tx: kanal::AsyncSender<SchedulerFault>,
) -> Self {
let (submit_tx, submit_rx) = kanal::bounded_async(config.max_concurrent * 2);

Expand All @@ -273,7 +276,13 @@ impl GarblingCoordinator {
.enable_timer()
.build()
.expect("failed to build monoio runtime for garbling coordinator")
.block_on(coordinator_loop(config, factory, submit_rx, completion_tx));
.block_on(coordinator_loop(
config,
factory,
submit_rx,
completion_tx,
fault_tx,
));
})
.expect("failed to spawn garbling coordinator thread");

Expand Down Expand Up @@ -319,6 +328,7 @@ async fn coordinator_loop(
factory: Arc<dyn SessionFactory>,
submit_rx: kanal::AsyncReceiver<PendingCircuitJob>,
completion_tx: kanal::AsyncSender<JobCompletion>,
fault_tx: kanal::AsyncSender<SchedulerFault>,
) {
let span = tracing::info_span!(
"job_scheduler.garbling_coordinator",
Expand Down Expand Up @@ -450,6 +460,7 @@ async fn coordinator_loop(
sessions,
&mut workers,
&completion_tx,
&fault_tx,
&mut pending_retry,
)
.instrument(tracing::info_span!(
Expand Down Expand Up @@ -491,6 +502,7 @@ async fn run_pass(
sessions: Vec<ActiveSession>,
workers: &mut [WorkerHandle],
completion_tx: &kanal::AsyncSender<JobCompletion>,
fault_tx: &kanal::AsyncSender<SchedulerFault>,
pending_retry: &mut Vec<PendingCircuitJob>,
) {
let n_workers = workers.len();
Expand Down Expand Up @@ -545,6 +557,7 @@ async fn run_pass(
&mut active_jobs_by_worker,
workers,
completion_tx,
fault_tx,
pending_retry,
)
.await;
Expand Down Expand Up @@ -615,6 +628,7 @@ async fn run_pass(
&mut active_jobs_by_worker,
workers,
completion_tx,
fault_tx,
Duration::from_secs(60),
pending_retry,
)
Expand Down Expand Up @@ -680,13 +694,15 @@ async fn collect_finish_reports(
active_jobs_by_worker: &mut HashMap<usize, Vec<PendingCircuitJob>>,
workers: &mut [WorkerHandle],
completion_tx: &kanal::AsyncSender<JobCompletion>,
fault_tx: &kanal::AsyncSender<SchedulerFault>,
pending_retry: &mut Vec<PendingCircuitJob>,
) {
collect_finish_reports_with_timeout(
active_worker_ids,
active_jobs_by_worker,
workers,
completion_tx,
fault_tx,
Duration::from_secs(60),
pending_retry,
)
Expand All @@ -700,6 +716,7 @@ async fn collect_finish_reports_with_timeout(
active_jobs_by_worker: &mut HashMap<usize, Vec<PendingCircuitJob>>,
workers: &mut [WorkerHandle],
completion_tx: &kanal::AsyncSender<JobCompletion>,
fault_tx: &kanal::AsyncSender<SchedulerFault>,
finish_timeout: Duration,
pending_retry: &mut Vec<PendingCircuitJob>,
) {
Expand All @@ -719,11 +736,18 @@ async fn collect_finish_reports_with_timeout(
Some(WorkerReport::FinishDone(report)) => {
active_jobs_by_worker.remove(&wid);
for completion in report.completions {
let peer_id = completion.peer_id;
if completion_tx.send(completion).await.is_err() {
tracing::error!(
worker = wid,
"completion channel closed while forwarding finish report"
);
let _ = fault_tx
.send(SchedulerFault::CompletionChannelClosed {
source: "garbling_coordinator",
peer_id,
})
.await;
return;
}
}
Expand Down Expand Up @@ -756,7 +780,7 @@ async fn collect_finish_reports_with_timeout(
///
/// Receives sessions and chunk commands from the coordinator's main thread.
/// Processes sessions sequentially per chunk (parallel across workers).
/// Sends completions directly to the SM via `completion_tx`.
/// Reports completed results back to the coordinator.
async fn worker_loop(
id: usize,
chunk_timeout: Duration,
Expand Down Expand Up @@ -959,7 +983,7 @@ fn convert_block(block: &Block, num_gates: usize) -> OwnedBlock {

#[cfg(test)]
mod tests {
use std::{future::Future, pin::Pin, sync::Arc};
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, time::Duration};

use mosaic_cac_types::{
GarblingSeed, Seed,
Expand Down Expand Up @@ -1045,6 +1069,7 @@ mod tests {
}];

let (completion_tx, _completion_rx) = kanal::bounded_async(1);
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
let mut pending_retry = Vec::new();
run_pass(
&GarblingConfig {
Expand All @@ -1057,6 +1082,7 @@ mod tests {
sessions,
&mut workers,
&completion_tx,
&fault_tx,
&mut pending_retry,
)
.await;
Expand Down Expand Up @@ -1123,13 +1149,15 @@ mod tests {

let mut active_jobs_by_worker = HashMap::from([(0usize, vec![sample_job(5)])]);
let (completion_tx, completion_rx) = kanal::bounded_async(2);
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
let mut pending_retry = Vec::new();

collect_finish_reports(
&[0],
&mut active_jobs_by_worker,
&mut workers,
&completion_tx,
&fault_tx,
&mut pending_retry,
)
.await;
Expand Down Expand Up @@ -1162,13 +1190,15 @@ mod tests {
let job = sample_job(6);
let mut active_jobs_by_worker = HashMap::from([(0usize, vec![job.clone()])]);
let (completion_tx, _completion_rx) = kanal::bounded_async(1);
let (fault_tx, _fault_rx) = kanal::bounded_async(1);
let mut pending_retry = Vec::new();

collect_finish_reports_with_timeout(
&[0],
&mut active_jobs_by_worker,
&mut workers,
&completion_tx,
&fault_tx,
Duration::from_millis(1),
&mut pending_retry,
)
Expand All @@ -1179,4 +1209,70 @@ mod tests {
assert_eq!(pending_retry[0].peer_id, job.peer_id);
});
}

#[test]
fn closed_completion_channel_reports_scheduler_fault() {
    run_monoio(async {
        let target_peer = PeerId::from([11; 32]);

        // Set up the worker channels, then drop the completion receiver up
        // front so forwarding the finish report must fail and a
        // SchedulerFault is emitted instead.
        let (command_tx, _command_rx) = kanal::bounded_async(2);
        let (report_tx, report_rx) = kanal::bounded_async(2);
        let (completion_tx, completion_rx) = kanal::bounded_async(1);
        let (fault_tx, fault_rx) = kanal::bounded_async(1);
        drop(completion_rx);

        // Queue a single finished worker report carrying one completion
        // addressed to `target_peer`.
        let completion = JobCompletion {
            peer_id: target_peer,
            completion: ActionCompletion::Garbler {
                id: ActionId::SendCommitMsgChunk(0),
                result: ActionResult::CommitMsgChunkAcked,
            },
        };
        report_tx
            .send(WorkerReport::FinishDone(FinishReport {
                completions: vec![completion],
                retry_jobs: vec![],
            }))
            .await
            .expect("send finish report");

        let mut workers = vec![WorkerHandle {
            id: 0,
            command_tx,
            report_rx,
            thread: None,
        }];
        let active_job = PendingCircuitJob {
            peer_id: target_peer,
            action: CircuitAction::GarblerTransfer {
                seed: GarblingSeed::from([5; 32]),
            },
        };
        let mut active_jobs_by_worker = HashMap::from([(0usize, vec![active_job])]);
        let mut pending_retry = Vec::new();

        collect_finish_reports_with_timeout(
            &[0],
            &mut active_jobs_by_worker,
            &mut workers,
            &completion_tx,
            &fault_tx,
            Duration::from_millis(1),
            &mut pending_retry,
        )
        .await;

        // The fault must name the reporting component and the peer whose
        // completion could not be delivered.
        let fault = monoio::time::timeout(Duration::from_secs(2), fault_rx.recv())
            .await
            .expect("timed out waiting for scheduler fault")
            .expect("fault channel should stay open");
        assert!(matches!(
            fault,
            SchedulerFault::CompletionChannelClosed {
                source: "garbling_coordinator",
                peer_id: reported_peer,
            } if reported_peer == target_peer
        ));
    });
}
}
10 changes: 10 additions & 0 deletions crates/job/scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ pub mod scheduler;

pub(crate) mod priority;

use mosaic_net_svc_api::PeerId;

/// Internal fault raised by a scheduler component when it detects a
/// condition that prevents it from delivering further results.
#[derive(Debug, Clone)]
pub(crate) enum SchedulerFault {
    /// A component tried to forward a [`JobCompletion`] but the completion
    /// channel's receiver had already been dropped, so the result (and any
    /// that would follow) cannot be delivered.
    CompletionChannelClosed {
        /// Static name of the component that observed the closed channel
        /// (e.g. `"garbling_coordinator"`).
        source: &'static str,
        /// Peer whose completion was being forwarded when the closure was
        /// detected.
        peer_id: PeerId,
    },
}

// Re-export the API crate for convenience.
pub use garbling::GarblingConfig;
pub use mosaic_job_api;
Expand Down
4 changes: 3 additions & 1 deletion crates/job/scheduler/src/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use self::{
queue::JobQueue,
worker::{Worker, WorkerJob},
};
use crate::priority::Priority;
use crate::{SchedulerFault, priority::Priority};

/// A job waiting in the pool's shared queue.
///
Expand Down Expand Up @@ -91,6 +91,7 @@ impl<D: ExecuteGarblerJob + ExecuteEvaluatorJob> JobThreadPool<D> {
config: PoolConfig,
dispatcher: Arc<D>,
completion_tx: kanal::AsyncSender<JobCompletion>,
fault_tx: kanal::AsyncSender<SchedulerFault>,
) -> Self {
let queue = Arc::new(JobQueue::new(config.priority_queue));

Expand All @@ -101,6 +102,7 @@ impl<D: ExecuteGarblerJob + ExecuteEvaluatorJob> JobThreadPool<D> {
Arc::clone(&dispatcher),
Arc::clone(&queue),
completion_tx.clone(),
fault_tx.clone(),
config.concurrency_per_worker,
)
})
Expand Down
Loading
Loading