Skip to content

Commit 857e6c6

Browse files
committed
Add multi-transport support and migrate chat example to GenServer v2
- Add Transport/Protocol trait abstractions for pluggable transports - Add TcpTransport for native Rust-to-Rust TCP connections - Add Erlang distribution module (feature-gated) for BEAM interop - Update DistributionManager to support both Native and Erlang nodes - Migrate Room and Registry GenServers in chat example to v2 API - Fix clippy warning for TransportType Default derive
1 parent c89aa89 commit 857e6c6

12 files changed

Lines changed: 1345 additions & 132 deletions

File tree

crates/ambitious/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,17 @@ hostname = "0.4"
3636
tokio-tungstenite = { version = "0.28", optional = true }
3737
serde_json = { version = "1", optional = true }
3838
futures = { version = "0.3", optional = true }
39+
40+
# Erlang Term Format and Distribution Protocol
3941
erltf = "0.14.0"
4042
erltf_serde = "0.14.0"
43+
edp_client = { version = "0.14.0", optional = true }
4144

4245
[features]
4346
default = []
4447
websocket = ["tokio-tungstenite", "serde_json", "futures"]
4548
peer = ["tokio/process"]
49+
erlang-dist = ["edp_client"]
4650

4751
[dev-dependencies]
4852
tokio = { version = "1", features = ["full", "test-util"] }
Lines changed: 372 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,372 @@
1+
//! Erlang Distribution Protocol support for BEAM interoperability.
2+
//!
3+
//! This module enables Ambitious nodes to connect to and communicate with
4+
//! Erlang/Elixir nodes using the native Erlang Distribution Protocol.
5+
//!
6+
//! # Architecture
7+
//!
8+
//! Unlike the QUIC/TCP transports which use our own postcard-based protocol,
9+
//! this module uses `edp_client` to implement the full Erlang distribution
10+
//! protocol including:
11+
//!
12+
//! - EPMD (Erlang Port Mapper Daemon) for node discovery
13+
//! - Erlang distribution handshake (challenge/response)
14+
//! - Erlang Term Format (ETF) for message encoding via `erltf`
15+
//! - Atom caching for efficient serialization
16+
//! - Process linking and monitoring with BEAM semantics
17+
//!
18+
//! # Example
19+
//!
20+
//! ```ignore
21+
//! use ambitious::distribution::erlang::{ErlangConnection, ErlangConfig};
22+
//!
23+
//! // Connect to an Erlang node
24+
//! let config = ErlangConfig::new("rust@localhost", "erlang@localhost", "secret_cookie");
25+
//! let conn = ErlangConnection::connect(config).await?;
26+
//!
27+
//! // Send a message to a registered process
28+
//! conn.send_to_name(from_pid, "my_server", my_message).await?;
29+
//! ```
30+
//!
31+
//! # Feature Flag
32+
//!
33+
//! This module is only available when the `erlang-dist` feature is enabled:
34+
//!
35+
//! ```toml
36+
//! [dependencies]
37+
//! ambitious = { version = "0.1", features = ["erlang-dist"] }
38+
//! ```
39+
40+
#[cfg(feature = "erlang-dist")]
41+
mod inner {
42+
use super::super::protocol::DistError;
43+
use crate::core::Pid;
44+
pub use edp_client::control::ControlMessage;
45+
pub use edp_client::{Connection, ConnectionConfig, DistributionFlags};
46+
use erltf::OwnedTerm;
47+
use erltf::types::{Atom, ExternalPid, ExternalReference};
48+
49+
/// Configuration for connecting to an Erlang node.
50+
#[derive(Debug, Clone)]
51+
pub struct ErlangConfig {
52+
/// Our node name (e.g., "rust@localhost").
53+
pub local_node: String,
54+
/// Remote node name (e.g., "erlang@localhost").
55+
pub remote_node: String,
56+
/// The shared cookie for authentication.
57+
pub cookie: String,
58+
/// Optional EPMD host (defaults to localhost).
59+
pub epmd_host: Option<String>,
60+
/// Connection timeout in seconds.
61+
pub timeout_secs: u64,
62+
}
63+
64+
impl ErlangConfig {
65+
/// Create a new Erlang connection config.
66+
pub fn new(local_node: &str, remote_node: &str, cookie: &str) -> Self {
67+
Self {
68+
local_node: local_node.to_string(),
69+
remote_node: remote_node.to_string(),
70+
cookie: cookie.to_string(),
71+
epmd_host: None,
72+
timeout_secs: 30,
73+
}
74+
}
75+
76+
/// Set the EPMD host.
77+
pub fn with_epmd_host(mut self, host: &str) -> Self {
78+
self.epmd_host = Some(host.to_string());
79+
self
80+
}
81+
82+
/// Set the connection timeout.
83+
pub fn with_timeout(mut self, secs: u64) -> Self {
84+
self.timeout_secs = secs;
85+
self
86+
}
87+
}
88+
89+
/// A connection to an Erlang/BEAM node.
90+
///
91+
/// This wraps `edp_client::Connection` and provides a more ergonomic API
92+
/// that integrates with Ambitious's process model.
93+
pub struct ErlangConnection {
94+
/// The underlying edp_client connection.
95+
inner: Connection,
96+
/// Our node name.
97+
local_node: String,
98+
/// Remote node name.
99+
remote_node: String,
100+
/// Counter for generating unique IDs.
101+
id_counter: std::sync::atomic::AtomicU64,
102+
}
103+
104+
impl ErlangConnection {
105+
/// Connect to an Erlang node using the provided configuration.
106+
pub async fn connect(config: ErlangConfig) -> Result<Self, DistError> {
107+
let conn_config =
108+
ConnectionConfig::new(&config.local_node, &config.remote_node, &config.cookie);
109+
110+
// Apply optional settings
111+
let conn_config = if let Some(ref host) = config.epmd_host {
112+
conn_config.with_epmd_host(host)
113+
} else {
114+
conn_config
115+
};
116+
117+
let mut connection = Connection::new(conn_config);
118+
119+
connection
120+
.connect()
121+
.await
122+
.map_err(|e| DistError::Connect(format!("Erlang connection failed: {}", e)))?;
123+
124+
tracing::info!(
125+
local = %config.local_node,
126+
remote = %config.remote_node,
127+
"Connected to Erlang node"
128+
);
129+
130+
Ok(Self {
131+
inner: connection,
132+
local_node: config.local_node,
133+
remote_node: config.remote_node,
134+
id_counter: std::sync::atomic::AtomicU64::new(1),
135+
})
136+
}
137+
138+
/// Send a message to a process by PID.
139+
///
140+
/// The message will be encoded to ETF before sending.
141+
pub async fn send_to_pid(
142+
&mut self,
143+
from: &ErlangPid,
144+
to: &ErlangPid,
145+
message: OwnedTerm,
146+
) -> Result<(), DistError> {
147+
self.inner
148+
.send_message(from.0.clone(), to.0.clone(), message)
149+
.await
150+
.map_err(|e| DistError::Io(format!("send failed: {}", e)))
151+
}
152+
153+
/// Send a message to a registered process by name.
154+
///
155+
/// The message will be encoded to ETF before sending.
156+
pub async fn send_to_name(
157+
&mut self,
158+
from: &ErlangPid,
159+
name: &str,
160+
message: OwnedTerm,
161+
) -> Result<(), DistError> {
162+
self.inner
163+
.send_to_name(from.0.clone(), Atom::new(name), message)
164+
.await
165+
.map_err(|e| DistError::Io(format!("send_to_name failed: {}", e)))
166+
}
167+
168+
/// Receive the next message from the connection.
169+
///
170+
/// Returns the control message and optional payload.
171+
pub async fn receive(&mut self) -> Result<ErlangMessage, DistError> {
172+
let (control, payload) = self
173+
.inner
174+
.receive_message()
175+
.await
176+
.map_err(|e| DistError::Io(format!("receive failed: {}", e)))?;
177+
178+
Ok(ErlangMessage { control, payload })
179+
}
180+
181+
/// Link two processes across the distribution boundary.
182+
pub async fn link(&mut self, from: &ErlangPid, to: &ErlangPid) -> Result<(), DistError> {
183+
self.inner
184+
.link(&from.0, &to.0)
185+
.await
186+
.map_err(|e| DistError::Io(format!("link failed: {}", e)))
187+
}
188+
189+
/// Unlink two processes.
190+
pub async fn unlink(
191+
&mut self,
192+
from: &ErlangPid,
193+
to: &ErlangPid,
194+
unlink_id: u64,
195+
) -> Result<(), DistError> {
196+
self.inner
197+
.unlink(&from.0, &to.0, unlink_id)
198+
.await
199+
.map_err(|e| DistError::Io(format!("unlink failed: {}", e)))
200+
}
201+
202+
/// Monitor a remote process.
203+
pub async fn monitor(
204+
&mut self,
205+
from: &ErlangPid,
206+
to: &ErlangPid,
207+
reference: &ErlangRef,
208+
) -> Result<(), DistError> {
209+
self.inner
210+
.monitor(&from.0, &to.0, &reference.0)
211+
.await
212+
.map_err(|e| DistError::Io(format!("monitor failed: {}", e)))
213+
}
214+
215+
/// Remove a monitor.
216+
pub async fn demonitor(
217+
&mut self,
218+
from: &ErlangPid,
219+
to: &ErlangPid,
220+
reference: &ErlangRef,
221+
) -> Result<(), DistError> {
222+
self.inner
223+
.demonitor(&from.0, &to.0, &reference.0)
224+
.await
225+
.map_err(|e| DistError::Io(format!("demonitor failed: {}", e)))
226+
}
227+
228+
/// Check if the connection is still active.
229+
pub fn is_connected(&self) -> bool {
230+
self.inner.is_connected()
231+
}
232+
233+
/// Close the connection.
234+
pub async fn close(&mut self) -> Result<(), DistError> {
235+
self.inner
236+
.close()
237+
.await
238+
.map_err(|e| DistError::Io(format!("close failed: {}", e)))
239+
}
240+
241+
/// Get our local node name.
242+
pub fn local_node(&self) -> &str {
243+
&self.local_node
244+
}
245+
246+
/// Get the remote node name.
247+
pub fn remote_node(&self) -> &str {
248+
&self.remote_node
249+
}
250+
251+
/// Allocate a new PID for a local process.
252+
///
253+
/// This creates an `ExternalPid` that can be used to send messages
254+
/// as a specific process to the Erlang node.
255+
pub fn allocate_pid(&self) -> ErlangPid {
256+
let id = self
257+
.id_counter
258+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
259+
ErlangPid(ExternalPid::new(
260+
Atom::new(&self.local_node),
261+
id as u32,
262+
0,
263+
1,
264+
))
265+
}
266+
267+
/// Allocate a new reference for monitors.
268+
pub fn allocate_ref(&self) -> ErlangRef {
269+
let id = self
270+
.id_counter
271+
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
272+
ErlangRef(ExternalReference::new(
273+
Atom::new(&self.local_node),
274+
1,
275+
vec![id as u32],
276+
))
277+
}
278+
}
279+
280+
/// An Erlang PID wrapper for type safety.
281+
#[derive(Debug, Clone)]
282+
pub struct ErlangPid(ExternalPid);
283+
284+
impl ErlangPid {
285+
/// Create from an erltf ExternalPid.
286+
pub fn new(pid: ExternalPid) -> Self {
287+
Self(pid)
288+
}
289+
290+
/// Get the inner ExternalPid.
291+
pub fn into_inner(self) -> ExternalPid {
292+
self.0
293+
}
294+
295+
/// Get a reference to the inner ExternalPid.
296+
pub fn as_inner(&self) -> &ExternalPid {
297+
&self.0
298+
}
299+
300+
/// Convert from an Ambitious Pid.
301+
///
302+
/// Note: This creates a representation suitable for sending to Erlang,
303+
/// but the Erlang node won't be able to send back to this PID directly
304+
/// without proper integration.
305+
pub fn from_ambitious(pid: Pid, node_name: &str, creation: u32) -> Self {
306+
Self(ExternalPid::new(
307+
Atom::new(node_name),
308+
pid.id() as u32,
309+
0,
310+
creation,
311+
))
312+
}
313+
}
314+
315+
/// An Erlang reference wrapper.
316+
#[derive(Debug, Clone)]
317+
pub struct ErlangRef(ExternalReference);
318+
319+
impl ErlangRef {
320+
/// Create from an erltf ExternalReference.
321+
pub fn new(reference: ExternalReference) -> Self {
322+
Self(reference)
323+
}
324+
325+
/// Get the inner reference.
326+
pub fn into_inner(self) -> ExternalReference {
327+
self.0
328+
}
329+
330+
/// Get a reference to the inner ExternalReference.
331+
pub fn as_inner(&self) -> &ExternalReference {
332+
&self.0
333+
}
334+
}
335+
336+
/// A message received from an Erlang node.
337+
#[derive(Debug)]
338+
pub struct ErlangMessage {
339+
/// The control message (describes the operation).
340+
pub control: ControlMessage,
341+
/// The optional payload (the actual message data).
342+
pub payload: Option<OwnedTerm>,
343+
}
344+
345+
impl ErlangMessage {
346+
/// Get a reference to the payload if present.
347+
pub fn payload(&self) -> Option<&OwnedTerm> {
348+
self.payload.as_ref()
349+
}
350+
351+
/// Take the payload, leaving None in its place.
352+
pub fn take_payload(&mut self) -> Option<OwnedTerm> {
353+
self.payload.take()
354+
}
355+
}
356+
}
357+
358+
// Re-export when feature is enabled
359+
#[cfg(feature = "erlang-dist")]
360+
pub use inner::*;
361+
362+
// Provide a helpful error when feature is not enabled
363+
#[cfg(not(feature = "erlang-dist"))]
364+
/// Placeholder when erlang-dist feature is not enabled.
365+
///
366+
/// Enable the `erlang-dist` feature to use Erlang distribution:
367+
/// ```toml
368+
/// ambitious = { features = ["erlang-dist"] }
369+
/// ```
370+
pub fn connect(_config: ()) -> Result<(), super::protocol::DistError> {
371+
Err(super::protocol::DistError::NotInitialized)
372+
}

0 commit comments

Comments
 (0)