@@ -28,15 +28,24 @@ pub enum NodeType {
2828 Erlang ,
2929}
3030
31+ /// An outgoing message destined for an Erlang/BEAM node.
32+ #[ cfg( feature = "erlang-dist" ) ]
33+ struct ErlangOutgoingMessage {
34+ /// Destination PID on the BEAM node.
35+ dest : Pid ,
36+ /// The message payload as an OwnedTerm.
37+ payload : erltf:: OwnedTerm ,
38+ }
39+
3140/// Handle for managing an Erlang node connection.
3241///
3342/// Wraps an `ErlangConnection` with message queuing and lifecycle management.
3443#[ cfg( feature = "erlang-dist" ) ]
3544pub struct ErlangNodeHandle {
3645 /// Node info.
3746 info : NodeInfo ,
38- /// Sender for outgoing ETF-encoded messages.
39- tx : mpsc:: Sender < erltf :: OwnedTerm > ,
47+ /// Sender for outgoing ETF-encoded messages with destination info .
48+ tx : mpsc:: Sender < ErlangOutgoingMessage > ,
4049 /// Last seen timestamp.
4150 last_seen_ms : AtomicU64 ,
4251}
@@ -448,6 +457,167 @@ impl DistributionManager {
448457 self . nodes . get ( & node_atom) . map ( |n| n. tx . clone ( ) )
449458 }
450459
460+ /// Send a message to an Erlang/BEAM node.
461+ ///
462+ /// The payload should be ETF-encoded bytes. This method decodes them to
463+ /// an OwnedTerm and sends via the Erlang Distribution Protocol.
464+ #[ cfg( feature = "erlang-dist" ) ]
465+ pub fn send_to_erlang ( & self , pid : Pid , etf_bytes : Vec < u8 > ) -> Result < ( ) , DistError > {
466+ let node_atom = pid. node ( ) ;
467+
468+ if let Some ( handle) = self . erlang_nodes . get ( & node_atom) {
469+ // Decode ETF bytes to OwnedTerm
470+ let payload = erltf:: decode ( & etf_bytes)
471+ . map_err ( |e| DistError :: Decode ( format ! ( "ETF decode error: {}" , e) ) ) ?;
472+
473+ // Create the outgoing message with destination info
474+ let msg = ErlangOutgoingMessage { dest : pid, payload } ;
475+
476+ // Send via the channel
477+ if handle. tx . try_send ( msg) . is_err ( ) {
478+ tracing:: warn!( ?pid, "Message queue full for Erlang node" ) ;
479+ }
480+ Ok ( ( ) )
481+ } else {
482+ Err ( DistError :: NotConnected ( node_atom) )
483+ }
484+ }
485+
486+ /// Connect to an Erlang/BEAM node.
487+ ///
488+ /// This establishes a connection using the Erlang Distribution Protocol,
489+ /// enabling transparent message exchange with Erlang, Elixir, and other
490+ /// BEAM-based systems.
491+ ///
492+ /// # Arguments
493+ ///
494+ /// * `remote_node` - The remote node name (e.g., "elixir@localhost")
495+ /// * `cookie` - The shared secret cookie for authentication
496+ ///
497+ /// # Example
498+ ///
499+ /// ```ignore
500+ /// use ambitious::dist;
501+ ///
502+ /// // Connect to an Elixir node
503+ /// dist::connect_erlang("my_app@localhost", "secret_cookie").await?;
504+ ///
505+ /// // Now you can send messages to BEAM processes transparently
506+ /// ambitious::send(beam_pid, MyMessage { data: 42 });
507+ /// ```
508+ #[ cfg( feature = "erlang-dist" ) ]
509+ pub async fn connect_erlang ( & self , remote_node : & str , cookie : & str ) -> Result < Atom , DistError > {
510+ use super :: erlang:: { ErlangConfig , ErlangConnection } ;
511+
512+ let node_atom = Atom :: new ( remote_node) ;
513+
514+ // Check if already connected
515+ if self . erlang_nodes . contains_key ( & node_atom) {
516+ return Err ( DistError :: AlreadyConnected ( node_atom) ) ;
517+ }
518+
519+ // Create connection config
520+ let config = ErlangConfig :: new ( & self . node_name , remote_node, cookie) ;
521+
522+ // Connect to the BEAM node
523+ let conn = ErlangConnection :: connect ( config) . await ?;
524+
525+ tracing:: info!(
526+ local = %self . node_name,
527+ remote = %remote_node,
528+ "Connected to Erlang/BEAM node"
529+ ) ;
530+
531+ // Create message channel
532+ let ( tx, mut rx) = mpsc:: channel :: < ErlangOutgoingMessage > ( 256 ) ;
533+
534+ // Create node info
535+ let info = NodeInfo {
536+ name : NodeName :: from ( remote_node) ,
537+ id : crate :: core:: NodeId :: local ( ) , // NodeId is just for display
538+ addr : None , // BEAM nodes use EPMD for discovery
539+ creation : 0 , // BEAM doesn't expose creation in the same way
540+ } ;
541+
542+ // Store the handle
543+ let handle = ErlangNodeHandle {
544+ info,
545+ tx,
546+ last_seen_ms : AtomicU64 :: new ( current_time_ms ( ) ) ,
547+ } ;
548+ self . erlang_nodes . insert ( node_atom, handle) ;
549+ self . node_types . insert ( node_atom, NodeType :: Erlang ) ;
550+
551+ // Spawn sender task
552+ let conn = std:: sync:: Arc :: new ( tokio:: sync:: Mutex :: new ( conn) ) ;
553+ let conn_sender = conn. clone ( ) ;
554+ let sender_node = node_atom;
555+
556+ tokio:: spawn ( async move {
557+ while let Some ( msg) = rx. recv ( ) . await {
558+ let mut guard = conn_sender. lock ( ) . await ;
559+
560+ // Create "from" PID for this message
561+ let from_pid = guard. allocate_pid ( ) ;
562+
563+ // Convert the destination PID to an ErlangPid
564+ let to_pid = super :: erlang:: ErlangPid :: from_ambitious (
565+ msg. dest ,
566+ & sender_node. as_str ( ) ,
567+ 0 , // creation
568+ ) ;
569+
570+ tracing:: debug!(
571+ from = ?from_pid,
572+ to = ?to_pid,
573+ "Sending message to BEAM node"
574+ ) ;
575+
576+ if let Err ( e) = guard. send_to_pid ( & from_pid, & to_pid, msg. payload ) . await {
577+ tracing:: warn!(
578+ node = %sender_node,
579+ error = %e,
580+ "Failed to send message to BEAM node"
581+ ) ;
582+ }
583+ }
584+ tracing:: debug!( node = %sender_node, "Erlang sender task ended" ) ;
585+ } ) ;
586+
587+ // Spawn receiver task to handle incoming messages
588+ let receiver_node = node_atom;
589+ let conn_receiver = conn;
590+
591+ tokio:: spawn ( async move {
592+ loop {
593+ let msg = {
594+ let mut guard = conn_receiver. lock ( ) . await ;
595+ match guard. receive ( ) . await {
596+ Ok ( msg) => msg,
597+ Err ( e) => {
598+ tracing:: warn!( node = %receiver_node, error = %e, "Erlang receive error" ) ;
599+ break ;
600+ }
601+ }
602+ } ;
603+
604+ tracing:: debug!( node = %receiver_node, ?msg, "Received message from BEAM" ) ;
605+
606+ // Handle the incoming message based on control message type
607+ handle_erlang_incoming ( receiver_node, msg) . await ;
608+ }
609+
610+ // Clean up on disconnect
611+ if let Some ( manager) = DIST_MANAGER . get ( ) {
612+ manager. erlang_nodes . remove ( & receiver_node) ;
613+ manager. node_types . remove ( & receiver_node) ;
614+ tracing:: info!( node = %receiver_node, "Disconnected from Erlang node" ) ;
615+ }
616+ } ) ;
617+
618+ Ok ( node_atom)
619+ }
620+
451621 /// Get our node name atom.
452622 pub fn node_name_atom ( & self ) -> Atom {
453623 self . node_name_atom
@@ -873,6 +1043,159 @@ async fn handle_incoming_message(from_node: Atom, msg: DistMessage) {
8731043 }
8741044}
8751045
1046+ /// Handle an incoming message from an Erlang/BEAM node.
1047+ ///
1048+ /// This function processes messages received via the Erlang Distribution Protocol
1049+ /// and delivers them to local Ambitious processes.
1050+ #[ cfg( feature = "erlang-dist" ) ]
1051+ async fn handle_erlang_incoming ( from_node : Atom , msg : super :: erlang:: ErlangMessage ) {
1052+ use super :: erlang:: ControlMessage ;
1053+
1054+ match msg. control {
1055+ ControlMessage :: Send { to_pid, .. } | ControlMessage :: SendSender { to_pid, .. } => {
1056+ // Extract the destination PID from the OwnedTerm
1057+ if let Some ( local_pid) = extract_local_pid ( & to_pid) {
1058+ if let Some ( payload) = msg. payload {
1059+ deliver_etf_to_local ( local_pid, payload) ;
1060+ }
1061+ } else {
1062+ tracing:: warn!(
1063+ node = %from_node,
1064+ ?to_pid,
1065+ "Could not extract local PID from BEAM message"
1066+ ) ;
1067+ }
1068+ }
1069+ ControlMessage :: RegSend { to_name, .. } => {
1070+ // Extract the registered name from the OwnedTerm
1071+ if let Some ( name) = extract_atom_name ( & to_name) {
1072+ // Look up the registered process
1073+ if let Some ( handle) = crate :: process:: global:: try_handle ( ) {
1074+ if let Some ( local_pid) = handle. registry ( ) . whereis ( & name) {
1075+ if let Some ( payload) = msg. payload {
1076+ deliver_etf_to_local ( local_pid, payload) ;
1077+ }
1078+ } else {
1079+ tracing:: debug!(
1080+ node = %from_node,
1081+ name = %name,
1082+ "No process registered with name"
1083+ ) ;
1084+ }
1085+ }
1086+ } else {
1087+ tracing:: warn!(
1088+ node = %from_node,
1089+ ?to_name,
1090+ "Could not extract registered name from BEAM message"
1091+ ) ;
1092+ }
1093+ }
1094+ ControlMessage :: Link { from_pid, to_pid } => {
1095+ tracing:: debug!(
1096+ node = %from_node,
1097+ ?from_pid,
1098+ ?to_pid,
1099+ "Received LINK from BEAM (not yet implemented)"
1100+ ) ;
1101+ // TODO: Implement cross-runtime linking
1102+ }
1103+ ControlMessage :: Exit {
1104+ from_pid,
1105+ to_pid,
1106+ reason,
1107+ } => {
1108+ tracing:: debug!(
1109+ node = %from_node,
1110+ ?from_pid,
1111+ ?to_pid,
1112+ ?reason,
1113+ "Received EXIT from BEAM (not yet implemented)"
1114+ ) ;
1115+ // TODO: Implement cross-runtime exit signal handling
1116+ }
1117+ ControlMessage :: MonitorP { .. } => {
1118+ tracing:: debug!(
1119+ node = %from_node,
1120+ "Received MONITOR_P from BEAM (not yet implemented)"
1121+ ) ;
1122+ // TODO: Implement cross-runtime monitoring
1123+ }
1124+ ControlMessage :: DemonitorP { .. } => {
1125+ tracing:: debug!(
1126+ node = %from_node,
1127+ "Received DEMONITOR_P from BEAM (not yet implemented)"
1128+ ) ;
1129+ }
1130+ other => {
1131+ tracing:: debug!(
1132+ node = %from_node,
1133+ ?other,
1134+ "Unhandled BEAM control message type"
1135+ ) ;
1136+ }
1137+ }
1138+ }
1139+
1140+ /// Extract a local PID from an OwnedTerm that should be an ExternalPid.
1141+ #[ cfg( feature = "erlang-dist" ) ]
1142+ fn extract_local_pid ( term : & erltf:: OwnedTerm ) -> Option < Pid > {
1143+ use erltf:: OwnedTerm ;
1144+
1145+ match term {
1146+ OwnedTerm :: Pid ( pid) => {
1147+ // The pid.id is the process ID in the Erlang term
1148+ // Reconstruct the local PID using our node's atom and the ID
1149+ let local_node = crate :: core:: node:: node_name_atom ( ) ;
1150+ let creation = crate :: core:: current_creation ( ) ;
1151+ Some ( Pid :: from_parts_atom ( local_node, pid. id as u64 , creation) )
1152+ }
1153+ _ => None ,
1154+ }
1155+ }
1156+
1157+ /// Extract an atom name from an OwnedTerm.
1158+ #[ cfg( feature = "erlang-dist" ) ]
1159+ fn extract_atom_name ( term : & erltf:: OwnedTerm ) -> Option < String > {
1160+ use erltf:: OwnedTerm ;
1161+
1162+ match term {
1163+ OwnedTerm :: Atom ( atom) => Some ( atom. as_str ( ) . to_string ( ) ) ,
1164+ _ => None ,
1165+ }
1166+ }
1167+
1168+ /// Deliver an ETF-encoded payload to a local process.
1169+ ///
1170+ /// The payload is encoded back to ETF bytes and delivered raw.
1171+ /// The receiving process is responsible for decoding using erltf_serde.
1172+ #[ cfg( feature = "erlang-dist" ) ]
1173+ fn deliver_etf_to_local ( pid : Pid , payload : erltf:: OwnedTerm ) {
1174+ // Encode the OwnedTerm back to ETF bytes
1175+ let etf_bytes = match erltf:: encode ( & payload) {
1176+ Ok ( bytes) => bytes,
1177+ Err ( e) => {
1178+ tracing:: warn!(
1179+ ?pid,
1180+ error = %e,
1181+ "Failed to encode ETF payload for local delivery"
1182+ ) ;
1183+ return ;
1184+ }
1185+ } ;
1186+
1187+ // Deliver to local process
1188+ if let Some ( handle) = crate :: process:: global:: try_handle ( )
1189+ && let Err ( e) = handle. registry ( ) . send_raw ( pid, etf_bytes)
1190+ {
1191+ tracing:: debug!(
1192+ ?pid,
1193+ error = ?e,
1194+ "Failed to deliver BEAM message to local process"
1195+ ) ;
1196+ }
1197+ }
1198+
8761199// === Public API Functions ===
8771200
8781201/// Connect to a remote node.
0 commit comments