Skip to content

Commit 7da2893

Browse files
committed
Implement proper ping/pong response tracking
Previously, ping() just sent a ping and checked whether the connection was still alive after a small delay. Now it properly: (1) registers a pending ping with its sequence number before sending, (2) awaits the pong response with the configured timeout, and (3) returns Pong only when an actual pong is received. Added a pending_pings DashMap to DistributionManager, along with register_ping() and complete_ping() methods.
1 parent 5174313 commit 7da2893

2 files changed

Lines changed: 36 additions & 19 deletions

File tree

crates/ambitious/src/distribution/manager.rs

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::path::Path;
1616
use std::sync::Arc;
1717
use std::sync::atomic::{AtomicU64, Ordering};
1818
use std::time::Duration;
19-
use tokio::sync::mpsc;
19+
use tokio::sync::{mpsc, oneshot};
2020

2121
/// The type of a connected node.
2222
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -135,6 +135,9 @@ pub struct DistributionManager {
135135
erlang_nodes: DashMap<Atom, ErlangNodeHandle>,
136136
/// Node type lookup (for all connected nodes).
137137
node_types: DashMap<Atom, NodeType>,
138+
/// Pending ping requests awaiting pong responses.
139+
/// Key is (node_atom, sequence_number), value is the oneshot sender.
140+
pending_pings: DashMap<(Atom, u64), oneshot::Sender<()>>,
138141
}
139142

140143
impl DistributionManager {
@@ -153,6 +156,7 @@ impl DistributionManager {
153156
#[cfg(feature = "erlang-dist")]
154157
erlang_nodes: DashMap::new(),
155158
node_types: DashMap::new(),
159+
pending_pings: DashMap::new(),
156160
}
157161
}
158162

@@ -457,6 +461,21 @@ impl DistributionManager {
457461
self.nodes.get(&node_atom).map(|n| n.tx.clone())
458462
}
459463

464+
/// Register a pending ping and return a receiver to await the pong.
///
/// Creates a oneshot channel, stores its sending half in `pending_pings`
/// under the key `(node, seq)`, and hands the receiving half back to the
/// caller. The receiver resolves with `()` once `complete_ping` is called
/// with the same `(node, seq)` pair.
///
/// NOTE(review): in the code visible here, entries are only removed by
/// `complete_ping`. If a pong never arrives (send failure or timeout), the
/// sender appears to stay in the map — confirm whether a cleanup path exists.
465+
pub(crate) fn register_ping(&self, node: Atom, seq: u64) -> oneshot::Receiver<()> {
466+
let (tx, rx) = oneshot::channel();
467+
self.pending_pings.insert((node, seq), tx);
468+
rx
469+
}
470+
471+
/// Complete a pending ping (called when pong is received).
///
/// Removes the entry stored under `(node, seq)` from `pending_pings` and, if
/// one was present, sends `()` through its oneshot sender to signal the
/// waiting caller. Does nothing when no ping is registered under that key.
472+
pub(crate) fn complete_ping(&self, node: Atom, seq: u64) {
473+
if let Some((_, tx)) = self.pending_pings.remove(&(node, seq)) {
474+
// Send () to signal pong received. Ignore error if receiver dropped.
475+
let _ = tx.send(());
476+
}
477+
}
478+
460479
/// Send a message to an Erlang/BEAM node.
461480
///
462481
/// The payload should be ETF-encoded bytes. This method decodes them to
@@ -915,11 +934,13 @@ async fn handle_incoming_message(from_node: Atom, msg: DistMessage) {
915934
}
916935
}
917936
DistMessage::Pong { seq } => {
918-
// Update last seen timestamp - this is the critical heartbeat response
919-
if let Some(manager) = DIST_MANAGER.get()
920-
&& let Some(node) = manager.nodes.get(&from_node)
921-
{
922-
node.touch();
937+
// Update last seen timestamp and complete any pending ping request
938+
if let Some(manager) = DIST_MANAGER.get() {
939+
if let Some(node) = manager.nodes.get(&from_node) {
940+
node.touch();
941+
}
942+
// Notify any waiting ping caller that pong was received
943+
manager.complete_ping(from_node, seq);
923944
}
924945
tracing::trace!(seq, from_node = %from_node, "Received pong");
925946
}

crates/ambitious/src/node.rs

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ pub async fn ping(node: Atom, timeout_ms: u64) -> PingResult {
221221
}
222222

223223
/// Implementation of ping with Duration timeout.
224-
async fn ping_impl(node: Atom, _timeout_duration: Duration) -> PingResult {
224+
async fn ping_impl(node: Atom, timeout_duration: Duration) -> PingResult {
225225
let manager = match distribution::manager() {
226226
Some(m) => m,
227227
None => return PingResult::Pang,
@@ -234,24 +234,20 @@ async fn ping_impl(node: Atom, _timeout_duration: Duration) -> PingResult {
234234

235235
let seq = PING_SEQ.fetch_add(1, Ordering::Relaxed);
236236

237+
// Register the ping before sending so we can await the pong
238+
let pong_rx = manager.register_ping(node, seq);
239+
237240
// Send ping
238241
let msg = DistMessage::Ping { seq };
239242
if tx.try_send(msg).is_err() {
240243
return PingResult::Pang;
241244
}
242245

243-
// For now, we just check if the send succeeded.
244-
// A proper implementation would wait for the Pong response.
245-
// TODO: Implement proper ping/pong with response tracking
246-
247-
// Small delay to let the ping go through
248-
tokio::time::sleep(Duration::from_millis(10)).await;
249-
250-
// Check if still connected (basic liveness check)
251-
if manager.get_node_tx(node).is_some() {
252-
PingResult::Pong
253-
} else {
254-
PingResult::Pang
246+
// Wait for pong response with timeout
247+
match tokio::time::timeout(timeout_duration, pong_rx).await {
248+
Ok(Ok(())) => PingResult::Pong,
249+
Ok(Err(_)) => PingResult::Pang, // Sender dropped (node disconnected)
250+
Err(_) => PingResult::Pang, // Timeout
255251
}
256252
}
257253

0 commit comments

Comments
 (0)