Skip to content

Commit df50fdb

Browse files
committed
Implement remote spawn functionality
Add the ability to spawn processes on remote nodes via distribution: - Add Spawn/SpawnReply protocol messages for remote spawn RPC - Create spawn registry for registering spawnable functions by module/function name - Add pending_spawns tracking in DistributionManager for async request/reply - Implement node::spawn, spawn_link, and spawn_monitor functions - Add comprehensive tests for spawn registry and node spawn functions Remote spawning works by registering functions with distribution::spawn::register("module", "func", |args| async { ... }) then spawning them with node::spawn(remote_node, "module", "func", args).
1 parent 8f335a9 commit df50fdb

5 files changed

Lines changed: 595 additions & 32 deletions

File tree

crates/ambitious/src/distribution/manager.rs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ use std::sync::atomic::{AtomicU64, Ordering};
1818
use std::time::Duration;
1919
use tokio::sync::{mpsc, oneshot};
2020

21+
/// Type alias for spawn reply (pid, monitor_ref, error).
22+
type SpawnReply = (Option<Pid>, Option<u64>, Option<String>);
23+
2124
/// The type of a connected node.
2225
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
2326
pub enum NodeType {
@@ -138,6 +141,9 @@ pub struct DistributionManager {
138141
/// Pending ping requests awaiting pong responses.
139142
/// Key is (node_atom, sequence_number), value is the oneshot sender.
140143
pending_pings: DashMap<(Atom, u64), oneshot::Sender<()>>,
144+
/// Pending spawn requests awaiting replies.
145+
/// Key is reference, value is the oneshot sender for (pid, monitor_ref, error).
146+
pending_spawns: DashMap<u64, oneshot::Sender<SpawnReply>>,
141147
}
142148

143149
impl DistributionManager {
@@ -157,6 +163,7 @@ impl DistributionManager {
157163
erlang_nodes: DashMap::new(),
158164
node_types: DashMap::new(),
159165
pending_pings: DashMap::new(),
166+
pending_spawns: DashMap::new(),
160167
}
161168
}
162169

@@ -476,6 +483,29 @@ impl DistributionManager {
476483
}
477484
}
478485

486+
/// Register a pending spawn request and return a receiver to await the reply.
487+
pub(crate) fn register_spawn(
488+
&self,
489+
reference: u64,
490+
) -> oneshot::Receiver<(Option<Pid>, Option<u64>, Option<String>)> {
491+
let (tx, rx) = oneshot::channel();
492+
self.pending_spawns.insert(reference, tx);
493+
rx
494+
}
495+
496+
/// Complete a pending spawn request (called when SpawnReply is received).
497+
pub(crate) fn complete_spawn(
498+
&self,
499+
reference: u64,
500+
pid: Option<Pid>,
501+
monitor_ref: Option<u64>,
502+
error: Option<String>,
503+
) {
504+
if let Some((_, tx)) = self.pending_spawns.remove(&reference) {
505+
let _ = tx.send((pid, monitor_ref, error));
506+
}
507+
}
508+
479509
/// Send a message to an Erlang/BEAM node.
480510
///
481511
/// The payload should be ETF-encoded bytes. This method decodes them to
@@ -1061,6 +1091,56 @@ async fn handle_incoming_message(from_node: Atom, msg: DistMessage) {
10611091
}
10621092
}
10631093

1094+
DistMessage::Spawn {
1095+
reference,
1096+
from,
1097+
module,
1098+
function,
1099+
args,
1100+
link,
1101+
monitor,
1102+
} => {
1103+
// Handle spawn request from remote node
1104+
let link_to = if link { Some(from) } else { None };
1105+
let monitor_from = if monitor { Some(from) } else { None };
1106+
1107+
let result =
1108+
super::spawn::execute_spawn(&module, &function, args, link_to, monitor_from);
1109+
1110+
// Send reply back to the requesting node
1111+
if let Some(manager) = DIST_MANAGER.get()
1112+
&& let Some(tx) = manager.get_node_tx(from_node)
1113+
{
1114+
let reply = match result {
1115+
Ok((pid, monitor_ref)) => DistMessage::SpawnReply {
1116+
reference,
1117+
pid: Some(pid),
1118+
monitor_ref: monitor_ref.map(|r| r.as_raw()),
1119+
error: None,
1120+
},
1121+
Err(e) => DistMessage::SpawnReply {
1122+
reference,
1123+
pid: None,
1124+
monitor_ref: None,
1125+
error: Some(e),
1126+
},
1127+
};
1128+
let _ = tx.try_send(reply);
1129+
}
1130+
}
1131+
1132+
DistMessage::SpawnReply {
1133+
reference,
1134+
pid,
1135+
monitor_ref,
1136+
error,
1137+
} => {
1138+
// Handle spawn reply - complete the pending spawn request
1139+
if let Some(manager) = DIST_MANAGER.get() {
1140+
manager.complete_spawn(reference, pid, monitor_ref, error);
1141+
}
1142+
}
1143+
10641144
_ => {
10651145
tracing::warn!(?msg, "Unexpected message type");
10661146
}

crates/ambitious/src/distribution/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ mod node;
4343
pub mod pg;
4444
mod process_monitor;
4545
pub(crate) mod protocol;
46+
pub mod spawn;
4647
mod tcp_transport;
4748
mod traits;
4849
mod transport;

crates/ambitious/src/distribution/protocol.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,37 @@ pub enum DistMessage {
142142
/// Serialized PgMessage.
143143
payload: Vec<u8>,
144144
},
145+
146+
// === Remote Spawning ===
147+
/// Request to spawn a process on this node.
148+
Spawn {
149+
/// Unique reference for this spawn request (for matching reply).
150+
reference: u64,
151+
/// The process requesting the spawn (will receive the reply).
152+
from: Pid,
153+
/// Module name (used as lookup key).
154+
module: String,
155+
/// Function name within the module.
156+
function: String,
157+
/// Serialized arguments for the function.
158+
args: Vec<u8>,
159+
/// Whether to link the spawned process to the requester.
160+
link: bool,
161+
/// Whether to monitor the spawned process.
162+
monitor: bool,
163+
},
164+
165+
/// Reply to a spawn request.
166+
SpawnReply {
167+
/// Reference from the spawn request.
168+
reference: u64,
169+
/// The spawned process PID (if successful).
170+
pid: Option<Pid>,
171+
/// Monitor reference (if monitor was requested and spawn succeeded).
172+
monitor_ref: Option<u64>,
173+
/// Error message (if spawn failed).
174+
error: Option<String>,
175+
},
145176
}
146177

147178
impl DistMessage {

0 commit comments

Comments
 (0)