
feat:(events) added events feedback #85

Open
ms3c wants to merge 1 commit into ms3c/events-feedback-staging from feat/events_feedback

@ms3c ms3c commented Nov 8, 2025

Event Feedback System for Relay Server

Overview

This PR introduces a comprehensive event feedback system for the Vane relay server, providing real-time observability and programmatic notifications for all relay and swarm events. The system enables applications to subscribe to and react to relay operations such as circuit establishment, reservations, connections, and more.

Motivation

The relay server handles numerous P2P events (circuits, reservations, connections, listeners, external addresses) but previously lacked a mechanism for external components to track and react to these events programmatically. This feedback system provides:

  • Real-time event notifications for all 19 relay/swarm event types
  • Flexible subscription model supporting both broadcast and one-time subscriptions
  • Active event tracking with metadata and timing information
  • Type-safe event results via strongly-typed enums
  • Debugging and monitoring capabilities for production environments

Changes Summary

Files Added

  • node/relay/src/event_feedback.rs
    • Complete event feedback system implementation

Files Modified

  • node/relay/src/lib.rs

    • Integrated EventFeedbackManager initialization
    • Injected manager into RelayP2pWorker
  • node/relay/src/p2p.rs

    • Added feedback notifications for all 19 event types
    • Exposed 8 public API methods for event subscription

Key Features

1. Comprehensive Event Coverage

The system tracks 19 distinct event types:

Circuit Events

  • CircuitAccepted - Circuit relay request accepted
  • CircuitClosed - Circuit closed (gracefully or with error)
  • CircuitDenied - Circuit relay request denied

Reservation Events

  • ReservationAccepted - Reservation request accepted
  • ReservationClosed - Reservation closed gracefully
  • ReservationDenied - Reservation request denied
  • ReservationTimedOut - Reservation expired

Connection Events

  • ConnectionEstablished - New peer connection established
  • ConnectionClosed - Connection closed (with reason)
  • ConnectionError - Connection attempt failed

Listener Events

  • ListenerStarted - New listener started on address
  • ListenerClosed - Listener stopped
  • ListenerExpired - Listener address expired

External Address Events

  • ExternalAddrCandidate - New external address candidate discovered
  • ExternalAddrConfirmed - External address confirmed
  • ExternalAddrExpired - External address expired
  • ExternalAddrOfPeer - Discovered external address of peer

Other Events

  • Dialing - Outbound dial initiated
  • UnknownEvent - Catch-all for unhandled events

2. Dual Subscription Model

Broadcast Subscriptions

Subscribe to continuous event streams via mpsc::UnboundedReceiver:

// Subscribe to all circuit events
let mut circuit_rx = relay_worker.subscribe_all_circuits().await;
tokio::spawn(async move {
    while let Some(result) = circuit_rx.recv().await {
        match result {
            EventResult::CircuitAccepted { src_peer, dst_peer, .. } => {
                println!("Circuit accepted: {} -> {}", src_peer, dst_peer);
            }
            EventResult::CircuitClosed { src_peer, dst_peer, error, .. } => {
                println!("Circuit closed: {} -> {} (error: {:?})", 
                    src_peer, dst_peer, error);
            }
            _ => {}
        }
    }
});

One-Time Subscriptions

Subscribe to specific events via oneshot::Receiver:

use std::str::FromStr; // brings PeerId::from_str into scope

// Wait for a specific circuit to close
let src = PeerId::from_str("12D3KooW...")?;
let dst = PeerId::from_str("12D3KooX...")?;
let circuit_feedback = relay_worker.subscribe_circuit(src, dst).await;

tokio::spawn(async move {
    match circuit_feedback.await {
        Ok(EventResult::CircuitClosed { error, duration_ms, .. }) => {
            println!("Circuit closed after {}ms: {:?}", duration_ms, error);
        }
        Ok(EventResult::CircuitDenied { status, .. }) => {
            println!("Circuit denied: {}", status);
        }
        _ => {}
    }
});

3. Active Event Tracking

The EventFeedbackManager maintains state for active events:

pub struct EventTrackingInfo {
    pub started_at: Instant,
    pub metadata: HashMap<String, String>,
    pub feedback_txs: Vec<tokio::sync::oneshot::Sender<EventResult>>,
}

Track metrics and query active events:

// Get counts of active events
let (circuits, reservations, connections) = relay_worker.get_feedback_stats().await;
println!("Active circuits: {}, reservations: {}, connections: {}", 
    circuits, reservations, connections);

// Get detailed circuit information (called on the EventFeedbackManager itself, not on RelayP2pWorker)
let circuit_details = feedback_manager.get_active_circuit_details().await;
for ((src, dst), info) in circuit_details {
    println!("Circuit {} -> {} active for {:?}", 
        src, dst, info.started_at.elapsed());
}
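
The tracking described above can be sketched without tokio or libp2p. This is a minimal illustration, not the PR's implementation: `ActiveTracker`, `TrackingInfo`, `PeerKey`, `circuit_started` and `circuit_finished` are hypothetical names, and a `String` stands in for `PeerId`. It shows how a start timestamp kept per active circuit yields both the `duration_ms` reported on close and the counts returned by `get_feedback_stats`.

```rust
use std::collections::HashMap;
use std::time::Instant;

/// Stand-in for libp2p's `PeerId` so the sketch needs only the standard library.
type PeerKey = String;

/// Reduced form of the PR's `EventTrackingInfo` (the one-shot senders are omitted).
pub struct TrackingInfo {
    pub started_at: Instant,
    pub metadata: HashMap<String, String>,
}

/// Hypothetical tracker; the real `EventFeedbackManager` internals are not shown in this PR text.
#[derive(Default)]
pub struct ActiveTracker {
    circuits: HashMap<(PeerKey, PeerKey), TrackingInfo>,
    reservations: HashMap<PeerKey, TrackingInfo>,
    connections: HashMap<PeerKey, TrackingInfo>,
}

impl ActiveTracker {
    /// Record a circuit as active, starting its timer now.
    pub fn circuit_started(&mut self, src: &str, dst: &str) {
        let info = TrackingInfo {
            started_at: Instant::now(),
            metadata: HashMap::new(),
        };
        self.circuits.insert((src.to_owned(), dst.to_owned()), info);
    }

    /// Stop tracking a circuit and return how long it was active, in milliseconds.
    /// Returns `None` if the circuit was not being tracked.
    pub fn circuit_finished(&mut self, src: &str, dst: &str) -> Option<u64> {
        let info = self.circuits.remove(&(src.to_owned(), dst.to_owned()))?;
        Some(info.started_at.elapsed().as_millis() as u64)
    }

    /// Counts in the same order as `get_feedback_stats`: circuits, reservations, connections.
    pub fn stats(&self) -> (usize, usize, usize) {
        (
            self.circuits.len(),
            self.reservations.len(),
            self.connections.len(),
        )
    }
}
```

Removing the entry on completion is what keeps the counts meaningful: a circuit contributes to `stats()` only between its start and finish calls.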

API Reference

Public Methods on RelayP2pWorker

Broadcast Subscriptions

/// Subscribe to all circuit events (accepted, closed, denied)
pub async fn subscribe_all_circuits(&self) 
    -> mpsc::UnboundedReceiver<EventResult>

/// Subscribe to all reservation events (accepted, closed, denied, timed out)
pub async fn subscribe_all_reservations(&self) 
    -> mpsc::UnboundedReceiver<EventResult>

/// Subscribe to all connection events (established, closed, error)
pub async fn subscribe_all_connections(&self) 
    -> mpsc::UnboundedReceiver<EventResult>

/// Subscribe to ALL events from the relay server
pub async fn subscribe_all_events(&self) 
    -> mpsc::UnboundedReceiver<EventResult>

One-Time Subscriptions

/// Subscribe to a specific circuit (get notified when it closes or fails)
pub async fn subscribe_circuit(&self, src_peer: PeerId, dst_peer: PeerId) 
    -> oneshot::Receiver<EventResult>

/// Subscribe to a specific reservation (get notified when it closes or fails)
pub async fn subscribe_reservation(&self, peer: PeerId) 
    -> oneshot::Receiver<EventResult>

/// Subscribe to a specific connection (get notified when it closes or fails)
pub async fn subscribe_connection(&self, peer: PeerId) 
    -> oneshot::Receiver<EventResult>

Statistics

/// Get counts of active circuits, reservations, and connections
pub async fn get_feedback_stats(&self) -> (usize, usize, usize)

EventResult Enum

All events are returned as variants of the EventResult enum:

#[derive(Debug, Clone, Serialize)]
pub enum EventResult {
    // Circuits
    CircuitAccepted { src_peer: PeerId, dst_peer: PeerId, started_at: u64 },
    CircuitClosed { src_peer: PeerId, dst_peer: PeerId, error: Option<String>, duration_ms: u64 },
    CircuitDenied { src_peer: PeerId, dst_peer: PeerId, status: String },
    
    // Reservations
    ReservationAccepted { peer: PeerId, renewed: bool, started_at: u64 },
    ReservationClosed { peer: PeerId, reason: String, duration_ms: u64 },
    ReservationDenied { peer: PeerId, status: String },
    ReservationTimedOut { peer: PeerId, duration_ms: u64 },
    
    // Connections
    ConnectionEstablished { peer: PeerId, latency_ms: u64, endpoint: String },
    ConnectionClosed { peer: PeerId, reason: String, duration_ms: u64 },
    ConnectionError { peer: Option<PeerId>, error: String },
    
    // Listeners
    ListenerStarted { address: String },
    ListenerClosed { address: String, reason: String },
    ListenerExpired { address: String },
    
    // External Addresses
    ExternalAddrCandidate { address: String },
    ExternalAddrConfirmed { address: String },
    ExternalAddrExpired { address: String },
    ExternalAddrOfPeer { peer: PeerId, address: String },
    
    // Other
    Dialing { peer: PeerId },
    UnknownEvent { event_type: String, details: String },
}

Helper methods:

impl EventResult {
    /// Check if event represents success
    pub fn is_success(&self) -> bool
    
    /// Extract peer ID if event involves one
    pub fn get_peer_id(&self) -> Option<PeerId>
}
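
The helper bodies are not shown above, so the following is only a sketch of how they might look on a reduced subset of the enum. Two things here are assumptions rather than the PR's behavior: which variants count as success, and the choice to return the source peer for circuit events. `PeerKey` is a `String` stand-in for `PeerId`.

```rust
/// Stand-in for libp2p's `PeerId` so the sketch needs only the standard library.
type PeerKey = String;

/// Reduced subset of the PR's `EventResult`, enough to show both helpers.
#[derive(Debug, Clone, PartialEq)]
pub enum EventResult {
    CircuitAccepted { src_peer: PeerKey, dst_peer: PeerKey, started_at: u64 },
    CircuitClosed { src_peer: PeerKey, dst_peer: PeerKey, error: Option<String>, duration_ms: u64 },
    CircuitDenied { src_peer: PeerKey, dst_peer: PeerKey, status: String },
    ConnectionError { peer: Option<PeerKey>, error: String },
    ListenerStarted { address: String },
}

impl EventResult {
    /// Assumed classification: denials and errors are failures, and a close
    /// counts as success only when it carries no error.
    pub fn is_success(&self) -> bool {
        match self {
            EventResult::CircuitAccepted { .. } | EventResult::ListenerStarted { .. } => true,
            EventResult::CircuitClosed { error, .. } => error.is_none(),
            EventResult::CircuitDenied { .. } | EventResult::ConnectionError { .. } => false,
        }
    }

    /// Assumed convention: circuit events report their source peer;
    /// events without any peer return `None`.
    pub fn get_peer_id(&self) -> Option<PeerKey> {
        match self {
            EventResult::CircuitAccepted { src_peer, .. }
            | EventResult::CircuitClosed { src_peer, .. }
            | EventResult::CircuitDenied { src_peer, .. } => Some(src_peer.clone()),
            EventResult::ConnectionError { peer, .. } => peer.clone(),
            EventResult::ListenerStarted { .. } => None,
        }
    }
}
```

Matching exhaustively (no `_` arm) means the compiler flags both helpers whenever a new variant is added, which helps keep the classification in step with the enum.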

Usage Examples

Example 1: Monitor All Circuit Activity

use tokio::sync::mpsc;

// Subscribe to all circuit events
let mut circuit_rx = relay_worker.subscribe_all_circuits().await;

tokio::spawn(async move {
    while let Some(event) = circuit_rx.recv().await {
        match event {
            EventResult::CircuitAccepted { src_peer, dst_peer, .. } => {
                log::info!("✅ Circuit established: {} -> {}", src_peer, dst_peer);
            }
            EventResult::CircuitClosed { src_peer, dst_peer, error, duration_ms } => {
                if let Some(err) = error {
                    log::warn!("❌ Circuit failed after {}ms: {} -> {} ({})", 
                        duration_ms, src_peer, dst_peer, err);
                } else {
                    log::info!("✓ Circuit closed after {}ms: {} -> {}", 
                        duration_ms, src_peer, dst_peer);
                }
            }
            EventResult::CircuitDenied { src_peer, dst_peer, status } => {
                log::warn!("🚫 Circuit denied: {} -> {} ({})", 
                    src_peer, dst_peer, status);
            }
            _ => {}
        }
    }
});

Example 2: Track Specific Circuit Lifetime

use std::str::FromStr;
use std::time::Duration;

// Start tracking a specific circuit
let src = PeerId::from_str("12D3KooWABC...")?;
let dst = PeerId::from_str("12D3KooWXYZ...")?;

let circuit_feedback = relay_worker.subscribe_circuit(src, dst).await;

// Wait for circuit to close (with timeout)
tokio::select! {
    result = circuit_feedback => {
        match result {
            Ok(EventResult::CircuitClosed { duration_ms, error, .. }) => {
                println!("Circuit closed after {}ms: {:?}", duration_ms, error);
            }
            Ok(other) => println!("Unexpected result: {:?}", other),
            Err(_) => println!("Circuit feedback channel closed"),
        }
    }
    _ = tokio::time::sleep(Duration::from_secs(300)) => {
        println!("Circuit still active after 5 minutes");
    }
}

Example 3: Monitor Connection Health

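use std::collections::HashMap;

// Subscribe to all connection events
let mut conn_rx = relay_worker.subscribe_all_connections().await;

// Latest observed latency (ms) per connected peer; moved into the task below
let mut connection_stats: HashMap<PeerId, u64> = HashMap::new();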

tokio::spawn(async move {
    while let Some(event) = conn_rx.recv().await {
        match event {
            EventResult::ConnectionEstablished { peer, latency_ms, .. } => {
                connection_stats.insert(peer, latency_ms);
                println!("Peer {} connected (latency: {}ms)", peer, latency_ms);
            }
            EventResult::ConnectionClosed { peer, duration_ms, reason } => {
                connection_stats.remove(&peer);
                println!("Peer {} disconnected after {}ms: {}", 
                    peer, duration_ms, reason);
            }
            EventResult::ConnectionError { peer, error } => {
                println!("Connection error: {:?} - {}", peer, error);
            }
            _ => {}
        }
    }
});

Example 4: Dashboard/Monitoring Service

// Create a monitoring service that tracks all event types
let mut all_events = relay_worker.subscribe_all_events().await;

tokio::spawn(async move {
    // EventMetrics and export_to_prometheus are placeholders for application code
    let mut metrics = EventMetrics::default();
    
    while let Some(event) = all_events.recv().await {
        // Update metrics
        match event {
            EventResult::CircuitAccepted { .. } => metrics.circuits_accepted += 1,
            EventResult::CircuitClosed { error, .. } => {
                if error.is_some() {
                    metrics.circuits_failed += 1;
                } else {
                    metrics.circuits_closed += 1;
                }
            }
            EventResult::ReservationAccepted { renewed, .. } => {
                if renewed {
                    metrics.reservations_renewed += 1;
                } else {
                    metrics.reservations_new += 1;
                }
            }
            // ... handle other events
            _ => {}
        }
        
        // Periodically export metrics
        if metrics.should_export() {
            export_to_prometheus(&metrics).await;
        }
    }
});
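
`EventMetrics` and `should_export` are not defined in this PR, so Example 4 leaves their shape open. One possible shape is sketched below; everything in it is hypothetical, including the `with_interval` constructor and the 15-second default. The counters match the fields used in the example, and `should_export` is a simple time-based throttle.

```rust
use std::time::{Duration, Instant};

/// Hypothetical metrics holder for Example 4; not part of the PR.
pub struct EventMetrics {
    pub circuits_accepted: u64,
    pub circuits_closed: u64,
    pub circuits_failed: u64,
    pub reservations_new: u64,
    pub reservations_renewed: u64,
    last_export: Instant,
    export_interval: Duration,
}

impl Default for EventMetrics {
    /// Assumed default: export at most once every 15 seconds.
    fn default() -> Self {
        Self::with_interval(Duration::from_secs(15))
    }
}

impl EventMetrics {
    /// Start with all counters at zero and the export timer running from now.
    pub fn with_interval(export_interval: Duration) -> Self {
        Self {
            circuits_accepted: 0,
            circuits_closed: 0,
            circuits_failed: 0,
            reservations_new: 0,
            reservations_renewed: 0,
            last_export: Instant::now(),
            export_interval,
        }
    }

    /// Returns `true` at most once per interval and restarts the timer when it does.
    pub fn should_export(&mut self) -> bool {
        if self.last_export.elapsed() >= self.export_interval {
            self.last_export = Instant::now();
            true
        } else {
            false
        }
    }
}
```

Because the check runs only when an event arrives, a quiet relay would not export on schedule with this approach; a separate timer task would be needed for that.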

Example 5: Testing & Validation

#[tokio::test]
async fn test_circuit_lifecycle() {
    let relay = setup_test_relay().await;
    let mut circuit_rx = relay.subscribe_all_circuits().await;
    
    // Trigger circuit creation
    let (src, dst) = create_test_peers().await;
    
    // Wait for acceptance
    let event = circuit_rx.recv().await.unwrap();
    assert!(matches!(event, EventResult::CircuitAccepted { .. }));
    
    // Trigger circuit close
    close_circuit(src, dst).await;
    
    // Verify closure event
    let event = circuit_rx.recv().await.unwrap();
    assert!(matches!(event, EventResult::CircuitClosed { .. }));
}

Implementation Details

Event Flow

  1. Event occurs in libp2p swarm (e.g., circuit accepted)
  2. Event handler in p2p.rs calls corresponding notify_* method
  3. EventFeedbackManager processes the event:
    • Creates EventResult with timing/metadata
    • Broadcasts to all matching subscriptions
    • Sends one-time notification to specific subscribers
    • Updates active event tracking
  4. Subscribers receive the event asynchronously
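
The flow above can be sketched with the standard library alone. This is an illustration under stated assumptions, not the code in event_feedback.rs: `FeedbackManager`, `Tracking` and `tracking_mut` are hypothetical names, `String` stands in for `PeerId`, and `std::sync::mpsc` channels replace tokio's `mpsc` and `oneshot` (a one-time subscriber is modelled as a sender that is dropped after its single delivery).

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::time::Instant;

type PeerKey = String; // stand-in for libp2p's PeerId
type CircuitKey = (PeerKey, PeerKey);

#[derive(Debug, Clone, PartialEq)]
pub enum EventResult {
    CircuitAccepted { src_peer: PeerKey, dst_peer: PeerKey },
    CircuitClosed { src_peer: PeerKey, dst_peer: PeerKey, error: Option<String>, duration_ms: u64 },
}

struct Tracking {
    started_at: Instant,
    waiters: Vec<Sender<EventResult>>, // one-time subscribers for this circuit
}

#[derive(Default)]
pub struct FeedbackManager {
    broadcast: Vec<Sender<EventResult>>,
    active: HashMap<CircuitKey, Tracking>,
}

impl FeedbackManager {
    /// Continuous stream of every circuit event.
    pub fn subscribe_all_circuits(&mut self) -> Receiver<EventResult> {
        let (tx, rx) = channel();
        self.broadcast.push(tx);
        rx
    }

    /// One-time notification for the close of a single circuit.
    pub fn subscribe_circuit(&mut self, src: &str, dst: &str) -> Receiver<EventResult> {
        let (tx, rx) = channel();
        self.tracking_mut(src, dst).waiters.push(tx);
        rx
    }

    fn tracking_mut(&mut self, src: &str, dst: &str) -> &mut Tracking {
        self.active
            .entry((src.to_owned(), dst.to_owned()))
            .or_insert_with(|| Tracking { started_at: Instant::now(), waiters: Vec::new() })
    }

    pub fn notify_circuit_accepted(&mut self, src: &str, dst: &str) {
        // Start (or restart) the timer, then broadcast the event.
        self.tracking_mut(src, dst).started_at = Instant::now();
        self.publish(&EventResult::CircuitAccepted { src_peer: src.to_owned(), dst_peer: dst.to_owned() });
    }

    pub fn notify_circuit_closed(&mut self, src: &str, dst: &str, error: Option<String>) {
        let key = (src.to_owned(), dst.to_owned());
        // Build the result with timing taken from the tracking entry (0 if untracked).
        let duration_ms = self
            .active
            .get(&key)
            .map_or(0, |t| t.started_at.elapsed().as_millis() as u64);
        let event = EventResult::CircuitClosed {
            src_peer: key.0.clone(),
            dst_peer: key.1.clone(),
            error,
            duration_ms,
        };
        // Broadcast to every stream subscriber.
        self.publish(&event);
        // Remove the tracking entry and notify its one-time subscribers;
        // their senders are dropped here, so nothing lingers afterwards.
        if let Some(tracking) = self.active.remove(&key) {
            for tx in tracking.waiters {
                let _ = tx.send(event.clone());
            }
        }
    }

    fn publish(&self, event: &EventResult) {
        for tx in &self.broadcast {
            let _ = tx.send(event.clone()); // fire-and-forget: send errors are ignored
        }
    }
}
```

In this sketch a one-time subscriber sees only the terminal event for its circuit, while stream subscribers see both the acceptance and the close.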

Thread Safety

  • EventFeedbackManager is wrapped in Arc for shared ownership
  • Internal state uses Mutex for thread-safe access
  • All public methods are async and use .await rather than blocking calls
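
The Arc plus Mutex arrangement can be illustrated with the standard library. This is a sketch only: `SharedFeedback`, `FeedbackState` and `simulate` are hypothetical names, and OS threads with `std::sync::Mutex` stand in for async tasks and whichever mutex type the PR actually uses.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical internal state guarded by the mutex.
#[derive(Default)]
pub struct FeedbackState {
    pub active_circuits: usize,
}

/// Cheap-to-clone handle; every clone points at the same state.
#[derive(Clone, Default)]
pub struct SharedFeedback {
    inner: Arc<Mutex<FeedbackState>>,
}

impl SharedFeedback {
    /// Record one newly opened circuit.
    pub fn circuit_opened(&self) {
        // The guard is dropped at the end of this function, so the lock is held only briefly.
        let mut state = self.inner.lock().expect("feedback mutex poisoned");
        state.active_circuits += 1;
    }

    /// Read the current number of active circuits.
    pub fn active_circuits(&self) -> usize {
        self.inner.lock().expect("feedback mutex poisoned").active_circuits
    }
}

/// Spawn `workers` threads that each report `per_worker` circuits, then return the total.
pub fn simulate(workers: usize, per_worker: usize) -> usize {
    let shared = SharedFeedback::default();
    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let handle = shared.clone(); // clones the Arc, not the state
            thread::spawn(move || {
                for _ in 0..per_worker {
                    handle.circuit_opened();
                }
            })
        })
        .collect();
    for h in handles {
        h.join().expect("worker thread panicked");
    }
    shared.active_circuits()
}
```

Calling `simulate(4, 250)` returns 1000: every increment happens under the lock, so no updates are lost.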

Memory Management

  • One-time subscriptions are automatically cleaned up after delivery
  • Broadcast subscriptions remain active until receiver is dropped
  • Active event tracking is cleaned up when events complete
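
One common way to get the broadcast cleanup described above is to prune a sender when a send to it fails. The sketch below assumes that approach; the PR text does not say how event_feedback.rs does it. `Broadcaster` is a hypothetical name, and `std::sync::mpsc` stands in for tokio's unbounded channel, whose `send` likewise returns an error once the receiver has been dropped.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical broadcaster that forgets subscribers whose receiver was dropped.
#[derive(Default)]
pub struct Broadcaster {
    subscribers: Vec<Sender<String>>,
}

impl Broadcaster {
    /// Register a new subscriber and hand back its receiving end.
    pub fn subscribe(&mut self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.push(tx);
        rx
    }

    /// Send `event` to every subscriber and drop senders whose receiver is gone.
    /// Returns the number of subscribers that remain registered.
    pub fn publish(&mut self, event: &str) -> usize {
        // `send` fails only when the receiving half has been dropped,
        // so a failed send is the signal to remove that subscriber.
        self.subscribers
            .retain(|tx| tx.send(event.to_owned()).is_ok());
        self.subscribers.len()
    }
}
```

With this approach a dropped receiver is only noticed on the next publish, so a stale sender can linger while no events are flowing.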

Testing Recommendations

  1. Unit tests for EventFeedbackManager methods
  2. Integration tests using test relay setup
  3. Load tests to verify performance with many subscribers
  4. Edge case tests for subscription cleanup and error handling

Breaking Changes

None. This is a purely additive change that doesn't modify existing APIs.

Performance Impact

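  • Minimal overhead - event notifications are fire-and-forget
  • No blocking - all operations are async
  • Efficient broadcasting - uses unbounded mpsc channels (mpsc::unbounded_channel); these apply no backpressure, so a slow subscriber's queue grows instead of stalling the relay
  • Lazy allocation - only active subscribers incur cost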

Checklist

  • Code compiles successfully
  • All 19 event types integrated
  • Public API exposed on RelayP2pWorker
  • Type-safe EventResult enum
  • Documentation added
  • Unit tests added (TODO)
  • Integration tests added (TODO)
  • Performance benchmarks run (TODO)

@ms3c ms3c requested a review from MrishoLukamba November 8, 2025 15:28
Comment thread node/relay/src/p2p.rs
}
}

// ========== PUBLIC FEEDBACK API ==========

These public feedback APIs belong in the RPC layer, not here. In lib.rs, pass the feedback manager handler to the RPC as well, then expose these APIs as RPC methods.
