Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ docker-publish-image:
# Runs the Docker image locally
docker-run-image: docker-build-image
docker run mate:$(cargo tag current)

# Runs clippy and fmt on the entire workspace
fmt:
cargo clippy --fix --workspace --allow-dirty --allow-staged && cargo fmt
5 changes: 1 addition & 4 deletions src/cli/src/process/hub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,10 @@ use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use tempfile::TempDir;
use tokio::process::{Child, Command};
use tokio::time::sleep;
use tracing::debug;

use mate_config::Config;
Expand Down Expand Up @@ -88,8 +86,7 @@ impl Hub {
child_processes.push(executor);
}

// TODO: Perform Polling via Transport perhaps?
sleep(Duration::from_secs(1)).await;
self.wait_for_components().await?;

Ok(child_processes)
}
Expand Down
6 changes: 2 additions & 4 deletions src/cli/src/transport.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use anyhow::Result;

use mate_config::{Config, transport::TransportConfig};
use mate_ipc::{
protocol::ProcessType,
transport::{Transport, unix_socket::UnixSocketTransport},
};
use mate_ipc::protocol::ProcessType;
use mate_ipc::transport::{Transport, unix_socket::UnixSocketTransport};

// TODO: Instead of accessing the Transport directly, we should only access the `IpcService`
pub async fn make_transport(
Expand Down
1 change: 1 addition & 0 deletions src/ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ async-trait = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt", "sync", "time"] }
uuid = { workspace = true, features = ["serde", "v4"] }

Expand Down
29 changes: 24 additions & 5 deletions src/ipc/src/transport/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
use tokio::sync::oneshot::{Sender, channel};
use tokio::task::JoinHandle;
use tokio::time::{sleep, timeout};
use tracing::debug;
use uuid::Uuid;

use crate::protocol::{Message, ProcessType};
Expand Down Expand Up @@ -165,13 +166,31 @@ impl UnixSocketTransport {
/// Finally frees the connection
async fn send_message_internal(&self, msg: &Message) -> Result<()> {
let target_socket = Self::socket_path_for_process(&self.base_path, &msg.to);
let mut tries = 0;

if !target_socket.exists() {
return Err(anyhow!(
"Target process {:?} socket does not exist at {:?}",
msg.to,
target_socket
));
loop {
if target_socket.exists() {
break;
}

if tries >= UNIX_SOCKET_CONNECTION_RETRIES {
return Err(anyhow!(
"Target process {:?} socket does not exist at {:?}",
msg.to,
target_socket
));
}

sleep(Duration::from_millis(10)).await;

tries += 1;

debug!(
"Waiting for target process {:?} socket to be available at {:?} (attempt {}/{})",
msg.to, target_socket, tries, UNIX_SOCKET_CONNECTION_RETRIES
);
}
}

let mut stream =
Expand Down
Loading