Skip to content

Commit d8f3bbb

Browse files
committed
Implement BEAM cross-runtime monitoring
Add handlers for Erlang Distribution Protocol MONITOR_P, DEMONITOR_P, and MONITOR_P_EXIT control messages. This enables process monitoring between Ambitious and BEAM/Erlang/Elixir processes. - Add extract_reference helper for converting ETF references to Ref - Implement MonitorP handler to register incoming monitors - Implement DemonitorP handler to cancel monitors - Implement MonitorPExit handler to deliver DOWN messages
1 parent 1c6fa05 commit d8f3bbb

1 file changed

Lines changed: 135 additions & 11 deletions

File tree

crates/ambitious/src/distribution/manager.rs

Lines changed: 135 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1260,18 +1260,79 @@ async fn handle_erlang_incoming(from_node: Atom, msg: super::erlang::ErlangMessa
12601260
);
12611261
}
12621262
}
1263-
ControlMessage::MonitorP { .. } => {
1264-
tracing::debug!(
1265-
node = %from_node,
1266-
"Received MONITOR_P from BEAM (not yet implemented)"
1267-
);
1268-
// TODO: Implement cross-runtime monitoring
1263+
ControlMessage::MonitorP {
1264+
from_pid,
1265+
to_proc,
1266+
reference,
1267+
} => {
1268+
// A BEAM process wants to monitor a local Ambitious process.
1269+
// from_pid is the BEAM monitoring process, to_proc is the local process.
1270+
if let (Some(remote_pid), Some(local_pid), Some(monitor_ref)) = (
1271+
extract_remote_pid(&from_pid),
1272+
extract_local_pid(&to_proc),
1273+
extract_reference(&reference),
1274+
) {
1275+
tracing::debug!(
1276+
node = %from_node,
1277+
%remote_pid,
1278+
%local_pid,
1279+
?monitor_ref,
1280+
"Received MONITOR_P from BEAM"
1281+
);
1282+
1283+
if let Some(manager) = DIST_MANAGER.get() {
1284+
manager.process_monitors.add_incoming_monitor(
1285+
local_pid,
1286+
remote_pid,
1287+
monitor_ref,
1288+
from_node,
1289+
);
1290+
}
1291+
} else {
1292+
tracing::warn!(
1293+
node = %from_node,
1294+
?from_pid,
1295+
?to_proc,
1296+
?reference,
1297+
"Could not extract PIDs/reference from BEAM MONITOR_P message"
1298+
);
1299+
}
12691300
}
1270-
ControlMessage::DemonitorP { .. } => {
1271-
tracing::debug!(
1272-
node = %from_node,
1273-
"Received DEMONITOR_P from BEAM (not yet implemented)"
1274-
);
1301+
ControlMessage::DemonitorP {
1302+
from_pid,
1303+
to_proc,
1304+
reference,
1305+
} => {
1306+
// A BEAM process wants to cancel its monitor on a local Ambitious process.
1307+
if let (Some(remote_pid), Some(local_pid), Some(monitor_ref)) = (
1308+
extract_remote_pid(&from_pid),
1309+
extract_local_pid(&to_proc),
1310+
extract_reference(&reference),
1311+
) {
1312+
tracing::debug!(
1313+
node = %from_node,
1314+
%remote_pid,
1315+
%local_pid,
1316+
?monitor_ref,
1317+
"Received DEMONITOR_P from BEAM"
1318+
);
1319+
1320+
if let Some(manager) = DIST_MANAGER.get() {
1321+
manager.process_monitors.remove_incoming_monitor(
1322+
local_pid,
1323+
remote_pid,
1324+
monitor_ref,
1325+
);
1326+
}
1327+
} else {
1328+
tracing::warn!(
1329+
node = %from_node,
1330+
?from_pid,
1331+
?to_proc,
1332+
?reference,
1333+
"Could not extract PIDs/reference from BEAM DEMONITOR_P message"
1334+
);
1335+
}
12751336
}
12761337
ControlMessage::Unlink { from_pid, to_pid } => {
12771338
// A BEAM process wants to unlink from a local Ambitious process (deprecated protocol).
@@ -1324,6 +1385,48 @@ async fn handle_erlang_incoming(from_node: Atom, msg: super::erlang::ErlangMessa
13241385
"Received UNLINK_ID_ACK from BEAM"
13251386
);
13261387
}
1388+
ControlMessage::MonitorPExit {
1389+
from_proc,
1390+
to_pid,
1391+
reference,
1392+
reason,
1393+
} => {
1394+
// A monitored BEAM process has exited - deliver DOWN to local monitoring process.
1395+
// from_proc is the exiting BEAM process, to_pid is the local monitoring process.
1396+
if let (Some(remote_pid), Some(local_pid), Some(monitor_ref)) = (
1397+
extract_remote_pid(&from_proc),
1398+
extract_local_pid(&to_pid),
1399+
extract_reference(&reference),
1400+
) {
1401+
let reason_str = extract_exit_reason(&reason);
1402+
1403+
tracing::debug!(
1404+
node = %from_node,
1405+
%remote_pid,
1406+
%local_pid,
1407+
?monitor_ref,
1408+
reason = %reason_str,
1409+
"Received MONITOR_P_EXIT from BEAM"
1410+
);
1411+
1412+
if let Some(manager) = DIST_MANAGER.get() {
1413+
manager.process_monitors.handle_process_down(
1414+
monitor_ref,
1415+
remote_pid,
1416+
&reason_str,
1417+
);
1418+
}
1419+
} else {
1420+
tracing::warn!(
1421+
node = %from_node,
1422+
?from_proc,
1423+
?to_pid,
1424+
?reference,
1425+
?reason,
1426+
"Could not extract PIDs/reference from BEAM MONITOR_P_EXIT message"
1427+
);
1428+
}
1429+
}
13271430
other => {
13281431
tracing::debug!(
13291432
node = %from_node,
@@ -1378,6 +1481,27 @@ fn extract_atom_name(term: &erltf::OwnedTerm) -> Option<String> {
13781481
}
13791482
}
13801483

1484+
/// Extract a monitor reference from an OwnedTerm.
1485+
#[cfg(feature = "erlang-dist")]
1486+
fn extract_reference(term: &erltf::OwnedTerm) -> Option<Ref> {
1487+
use erltf::OwnedTerm;
1488+
1489+
match term {
1490+
OwnedTerm::Reference(r) => {
1491+
// The ids field is a Vec<u32> containing the reference identifiers.
1492+
// Combine the first two IDs into a u64 for our Ref.
1493+
// Erlang references typically have up to 3 ID words.
1494+
let id = match r.ids.as_slice() {
1495+
[id0] => *id0 as u64,
1496+
[id0, id1, ..] => ((*id1 as u64) << 32) | (*id0 as u64),
1497+
[] => return None,
1498+
};
1499+
Some(Ref::from_raw(id))
1500+
}
1501+
_ => None,
1502+
}
1503+
}
1504+
13811505
/// Extract an exit reason from an OwnedTerm.
13821506
///
13831507
/// Common Erlang exit reasons:

0 commit comments

Comments
 (0)