Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/cli/src/process/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use mate::proto::job::{Job, JobResult};
use mate::proto::task::TaskIdentifier;
use mate_executor::Executor;
use mate_ipc::channel::IpcServer;
use mate_ipc::protocol::{Message, MessagePayload, ProcessType};
use mate_ipc::protocol::{
ExecutorMessage, Message, MessagePayload, ProcessType, SchedulerMessage, SystemMessage,
};
use mate_ipc::transport::Transport;
use mate_repository::TaskRepository;

Expand Down Expand Up @@ -94,7 +96,7 @@ impl ExecutorProcess {
.request(Message::new(
process_type,
ProcessType::Storage,
MessagePayload::JobCompleted(
ExecutorMessage::JobCompleted(
job_id,
JobResult::Failure(format!("Task load failed: {}", err)),
),
Expand Down Expand Up @@ -126,7 +128,7 @@ impl ExecutorProcess {
.request(Message::new(
process_type,
ProcessType::Storage,
MessagePayload::JobCompleted(job_id, job_result),
ExecutorMessage::JobCompleted(job_id, job_result),
))
.await
{
Expand Down Expand Up @@ -167,12 +169,20 @@ impl ExecutorProcess {

async fn handle_message(&self, msg: Message) -> Option<MessagePayload> {
match msg.payload {
MessagePayload::ExecuteJob(job) => match self.execute(job.clone()).await {
Ok(()) => Some(MessagePayload::JobAccepted(job.id)),
Err(err) => Some(MessagePayload::JobFailed(job.id, err.to_string())),
},
MessagePayload::Ping => Some(MessagePayload::Pong),
MessagePayload::Shutdown => Some(MessagePayload::ShutdownAck),
MessagePayload::Scheduler(SchedulerMessage::ExecuteJob(job)) => {
// Stash the job ID before moving the job out of the Box to avoid cloning the entire Job.
let job_id = job.id.clone();
let job_unboxed = *job;

match self.execute(job_unboxed).await {
Ok(()) => Some(ExecutorMessage::JobAccepted(job_id.clone()).into()),
Err(err) => Some(ExecutorMessage::JobFailed(job_id, err.to_string()).into()),
}
}
MessagePayload::System(SystemMessage::Ping) => Some(SystemMessage::Pong.into()),
MessagePayload::System(SystemMessage::Shutdown) => {
Some(SystemMessage::ShutdownAck.into())
}
_ => None,
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/cli/src/process/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tracing::debug;

use mate_config::Config;
use mate_ipc::channel::IpcServer;
use mate_ipc::protocol::{Message, MessagePayload, ProcessType};
use mate_ipc::protocol::{Message, ProcessType, SystemMessage};

use crate::transport::make_transport;

Expand Down Expand Up @@ -96,7 +96,7 @@ impl Hub {
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Storage,
MessagePayload::Ping,
SystemMessage::Ping,
))
.await?;

Expand All @@ -106,7 +106,7 @@ impl Hub {
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Scheduler,
MessagePayload::Ping,
SystemMessage::Ping,
))
.await?;

Expand All @@ -117,7 +117,7 @@ impl Hub {
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Executor(i),
MessagePayload::Ping,
SystemMessage::Ping,
))
.await?;

Expand Down
8 changes: 4 additions & 4 deletions src/cli/src/server/api/v0/health/retrieve.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use axum::{Extension, Json};
use serde::Serialize;

use mate_ipc::protocol::{Message, MessagePayload, ProcessType};
use mate_ipc::protocol::{Message, ProcessType, SystemMessage};

use crate::process::hub::IPC_SENDER_HUB;
use crate::server::api::v0::ApiError;
Expand All @@ -23,7 +23,7 @@ pub async fn handler(
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Storage,
MessagePayload::Ping,
SystemMessage::Ping,
))
.await
.is_ok();
Expand All @@ -32,7 +32,7 @@ pub async fn handler(
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Scheduler,
MessagePayload::Ping,
SystemMessage::Ping,
))
.await
.is_ok();
Expand All @@ -44,7 +44,7 @@ pub async fn handler(
.request(Message::new(
IPC_SENDER_HUB,
ProcessType::Executor(i),
MessagePayload::Ping,
SystemMessage::Ping,
))
.await
.is_ok()
Expand Down
16 changes: 9 additions & 7 deletions src/cli/src/server/api/v0/jobs/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use serde::Deserialize;
use serde_json::Value;

use mate::proto::job::Job;
use mate_ipc::protocol::{Message, MessagePayload, ProcessType};
use mate_ipc::protocol::{HubMessage, Message, MessagePayload, ProcessType, StorageMessage};

use crate::server::api::v0::ApiError;
use crate::server::state::SharedServices;
Expand Down Expand Up @@ -45,7 +45,7 @@ pub async fn handler(
let msg = Message::new(
ProcessType::Hub,
ProcessType::Storage,
MessagePayload::StoreJob(job.clone()),
HubMessage::StoreJob(Box::new(job)),
);

let message = services
Expand All @@ -60,11 +60,13 @@ pub async fn handler(
})?;

match message.payload {
MessagePayload::JobStored(Ok(job)) => Ok(Json(job)),
MessagePayload::JobStored(Err(message)) => Err(ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
message,
}),
MessagePayload::Storage(StorageMessage::JobStored(result)) => match *result {
Ok(job) => Ok(Json(job)),
Err(message) => Err(ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
message,
}),
},
_ => Err(ApiError {
status: StatusCode::INTERNAL_SERVER_ERROR,
message: String::from("Unexpected response from storage service"),
Expand Down
9 changes: 5 additions & 4 deletions src/cli/src/server/api/v0/jobs/retrieve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Deserialize;
use uuid::Uuid;

use mate::proto::job::{Job, JobQuery, JobStatus};
use mate_ipc::protocol::{Message, MessagePayload, ProcessType};
use mate_ipc::protocol::{HubMessage, Message, MessagePayload, ProcessType, StorageMessage};

use crate::server::api::v0::ApiError;
use crate::server::state::SharedServices;
Expand All @@ -24,10 +24,11 @@ pub async fn handler(
id: Uuid::new_v4(),
from: ProcessType::Hub,
to: ProcessType::Storage,
payload: MessagePayload::QueryJobs(JobQuery {
payload: HubMessage::QueryJobs(JobQuery {
status: query.status,
time_range: None,
}),
})
.into(),
reply_to: None,
};

Expand All @@ -42,7 +43,7 @@ pub async fn handler(
})?;

match response.payload {
MessagePayload::JobsResult(jobs) => {
MessagePayload::Storage(StorageMessage::JobsResult(jobs)) => {
if let Some(id) = query.id {
let jobs: Vec<Job> = jobs.into_iter().filter(|job| job.id == id).collect();
return Ok(Json(jobs));
Expand Down
74 changes: 57 additions & 17 deletions src/ipc/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,80 @@ use mate::proto::job::{Job, JobQuery, JobResult, JobStatus};
pub type ExecutorId = usize;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessagePayload {
// Hub -> Storage
StoreJob(Job),
pub enum HubMessage {
StoreJob(Box<Job>),
QueryJobs(JobQuery),
UpdateJobStatus(Uuid, JobStatus),
}
Comment on lines +11 to +15
Copy link

Copilot AI Mar 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HubMessage::UpdateJobStatus is defined here but (in this PR) there’s no corresponding receiver-side handling in Storage (nor backend support), and the only send sites are Scheduler→Storage. This makes the variant effectively a no-op at runtime; either implement end-to-end handling or remove/relocate it to the appropriate message scope.

Copilot uses AI. Check for mistakes.

// Storage -> Hub/Scheduler/Executor
JobStored(Result<Job, String>),
/// Payloads that carry a result back from the storage process.
///
/// The hub's job handlers match on these variants when reading the reply to a
/// store or query request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum StorageMessage {
/// Outcome of a store request: `Ok` carries a job, `Err` carries an error message.
JobStored(Box<Result<Job, String>>),
/// Jobs returned in response to a job query.
JobsResult(Vec<Job>),
/// NOTE(review): presumably the outcome of a job status update; no use site is visible here — confirm.
JobUpdated(Result<(), String>),
}

// Scheduler -> Storage
/// Payloads in the scheduler's message group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SchedulerMessage {
/// Carries a pair of `SystemTime` values.
/// NOTE(review): presumably the time window of jobs to claim from storage — confirm.
ClaimJobs((SystemTime, SystemTime)),
/// Asks an executor to run the boxed job. The executor's message handler yields
/// `ExecutorMessage::JobAccepted` on success or `ExecutorMessage::JobFailed` on error.
ExecuteJob(Box<Job>),
}

// Scheduler -> Executor
ExecuteJob(Job),

// Executor -> Storage
/// Payloads in the executor's message group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExecutorMessage {
/// NOTE(review): presumably announces that the job with this ID has started; no use site is visible here — confirm.
JobStarted(Uuid),
/// Job ID together with its `JobResult`. The executor sends this to storage, using
/// `JobResult::Failure` when the task cannot be loaded.
JobCompleted(Uuid, JobResult),
/// Job ID and error text, yielded by the executor's message handler when running the job fails.
JobFailed(Uuid, String),

// Executor -> Scheduler (acknowledgment)
/// Job ID, yielded by the executor's message handler when the job was run without error.
JobAccepted(Uuid),
}

// Health checks
/// Control payloads for health checks and shutdown.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SystemMessage {
/// Health-check request; the hub sends it to storage, the scheduler and each executor.
Ping,
/// Reply to `Ping`.
Pong,

// Shutdown
/// Shutdown request.
Shutdown,
/// Acknowledges `Shutdown`.
ShutdownAck,
}

/// Top-level IPC payload with one variant per message group.
///
/// Each wrapped enum converts into `MessagePayload` through a `From` impl, and
/// `Message::new` takes `impl Into<MessagePayload>`, so callers can pass the
/// inner message directly instead of naming the wrapping variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MessagePayload {
/// Wraps a `HubMessage`.
Hub(HubMessage),
/// Wraps a `StorageMessage`.
Storage(StorageMessage),
/// Wraps a `SchedulerMessage`.
Scheduler(SchedulerMessage),
/// Wraps an `ExecutorMessage`.
Executor(ExecutorMessage),
/// Wraps a `SystemMessage`.
System(SystemMessage),
}

impl From<HubMessage> for MessagePayload {
fn from(m: HubMessage) -> Self {
MessagePayload::Hub(m)
}
}

impl From<StorageMessage> for MessagePayload {
fn from(m: StorageMessage) -> Self {
MessagePayload::Storage(m)
}
}

impl From<SchedulerMessage> for MessagePayload {
fn from(m: SchedulerMessage) -> Self {
MessagePayload::Scheduler(m)
}
}

impl From<ExecutorMessage> for MessagePayload {
fn from(m: ExecutorMessage) -> Self {
MessagePayload::Executor(m)
}
}

impl From<SystemMessage> for MessagePayload {
fn from(m: SystemMessage) -> Self {
MessagePayload::System(m)
}
}

#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum ProcessType {
Hub,
Expand All @@ -60,12 +100,12 @@ pub struct Message {
}

impl Message {
pub fn new(from: ProcessType, to: ProcessType, payload: MessagePayload) -> Self {
pub fn new(from: ProcessType, to: ProcessType, payload: impl Into<MessagePayload>) -> Self {
Self {
id: Uuid::new_v4(),
from,
to,
payload,
payload: payload.into(),
reply_to: None,
}
}
Expand Down
15 changes: 9 additions & 6 deletions src/ipc/src/transport/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ mod tests {
use tempfile::TempDir;
use tokio::time::sleep;

use crate::protocol::MessagePayload;
use crate::protocol::{MessagePayload, SystemMessage};

use super::*;

Expand All @@ -318,7 +318,7 @@ mod tests {
id: Uuid::new_v4(),
from: ProcessType::Hub,
to: ProcessType::Storage,
payload: MessagePayload::Ping,
payload: MessagePayload::System(SystemMessage::Ping),
reply_to: None,
};

Expand Down Expand Up @@ -348,13 +348,13 @@ mod tests {
tokio::spawn(async move {
if let Ok(request) = transport2.recv().await {
if request.from == ProcessType::Hub
&& matches!(request.payload, MessagePayload::Ping)
&& matches!(request.payload, MessagePayload::System(SystemMessage::Ping))
{
let response = Message {
id: Uuid::new_v4(),
from: ProcessType::Storage,
to: request.from,
payload: MessagePayload::Pong,
payload: SystemMessage::Pong.into(),
reply_to: Some(request.id),
};
let _ = transport2.send(response).await;
Expand All @@ -367,13 +367,16 @@ mod tests {
id: Uuid::new_v4(),
from: ProcessType::Hub,
to: ProcessType::Storage,
payload: MessagePayload::Ping,
payload: SystemMessage::Ping.into(),
reply_to: None,
};

let response = transport1.request(request).await?;

assert!(matches!(response.payload, MessagePayload::Pong));
assert!(matches!(
response.payload,
MessagePayload::System(SystemMessage::Pong)
));

Ok(())
}
Expand Down
Loading
Loading