WebSocket socket/file descriptor leak — ~120 leaked sockets per minute #280

@nobdefender

Description

Summary

ConnectionManager and its associated background tasks (connection_loop, heartbeat_loop, reconnection_handler) are never properly cleaned up when ChannelResources is removed from the DashMap. This causes a steady accumulation of orphaned WebSocket connections and tokio tasks, leading to "Too many open files" (EMFILE) errors in production.

Environment

  • polymarket-client-sdk version: 0.4.3
  • OS: Linux (Docker, ulimit -n 1024)
  • Tokio runtime, long-running process
  • 8 trading strategies sharing one clob::ws::Client (market channel) + 8 individual clob::ws::Client<Authenticated> (user channel) + 10 rtds::Client instances

Observed behavior

File descriptors grow linearly at ~120/minute from process start:

2026-03-09 08:33:51 | FDs: 158 | Sockets: 152 | TCP: 150
2026-03-09 08:34:51 | FDs: 278 | Sockets: 272 | TCP: 270
2026-03-09 08:35:51 | FDs: 392 | Sockets: 386 | TCP: 390
2026-03-09 08:36:52 | FDs: 507 | Sockets: 501 | TCP: 512
2026-03-09 08:37:52 | FDs: 627 | Sockets: 621 | TCP: 631
2026-03-09 08:38:52 | FDs: 745 | Sockets: 739 | TCP: 747
2026-03-09 08:39:52 | FDs: 864 | Sockets: 858 | TCP: 858

Process crashes after ~8 minutes. The first symptom is TLS failing to read root CA certificates because the OS can't open any more files:

WARN tokio_tungstenite::tls::encryption::rustls: native root CA certificate loading errors:
  [Error { context: "failed to read PEM from file", kind: Io {
    inner: Os { code: 24, kind: Uncategorized, message: "Too many open files" },
    path: "/etc/ssl/certs/Amazon_Root_CA_1.pem" } },
   Error { context: "failed to read PEM from file", kind: Io {
    inner: Os { code: 24, kind: Uncategorized, message: "Too many open files" },
    path: "/etc/ssl/certs/HARICA_TLS_ECC_Root_CA_2021.pem" } },
   ... (dozens more CA certs fail to load) ...]

This cascades into all network operations failing: new WebSocket connections cannot complete the TLS handshake, REST API calls fail, and every strategy goes down with them.

Root cause analysis

Bug 1: Circular reference prevents ConnectionManager cleanup

When ChannelResources is removed from the channels DashMap (via unsubscribe_and_cleanup at client.rs:613), the ConnectionManager and its background tasks are never stopped due to a reference cycle:

The cycle:

reconnection_handler task (subscription.rs:111)
  → holds Arc<SubscriptionManager> (subscription.rs:109)
    → holds ConnectionManager clone (subscription.rs:76)
      → holds sender_tx clone (keeps connection_loop alive)
      → holds state_tx clone (keeps reconnection_handler alive)

Step by step:

  1. ChannelResources::new() creates a ConnectionManager and clones it into SubscriptionManager (client.rs:630-631):

    let connection = ConnectionManager::new(endpoint, config, Arc::clone(&interest))?;
    let subscriptions = Arc::new(SubscriptionManager::new(connection.clone(), interest));
  2. start_reconnection_handler() spawns a task that holds Arc::clone(&self) of the SubscriptionManager (subscription.rs:108-109):

    pub fn start_reconnection_handler(self: &Arc<Self>) {
        let this = Arc::clone(self);  // ← prevents SubscriptionManager from being dropped
        tokio::spawn(async move {
            let mut state_rx = this.connection.state_receiver();
            loop {
                if state_rx.changed().await.is_err() { break; }
                // ...
            }
        });
    }
  3. When entry.remove() drops ChannelResources (client.rs:613):

    • ChannelResources.connection (first ConnectionManager clone) is dropped — one sender_tx clone gone
    • ChannelResources.subscriptions (Arc<SubscriptionManager>) — Arc refcount decrements
    • BUT the reconnection_handler task still holds another Arc<SubscriptionManager>
    • So SubscriptionManager is NOT dropped
    • Its ConnectionManager (second clone, held by SubscriptionManager) is NOT dropped
  • Its sender_tx still alive → sender_rx.is_closed() returns false (connection.rs:163; see the sketch after this list)
    • connection_loop continues running → WebSocket stays open
    • state_tx clone still alive → state_rx.changed() doesn't error → reconnection_handler doesn't exit
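
For reference, the exit check in connection_loop (connection.rs:163) presumably has roughly this shape; the sketch is simplified, but it shows why the loop can only stop once every sender_tx clone is gone:

loop {
    if sender_rx.is_closed() {
        break; // only true once ALL sender_tx clones have been dropped
    }
    // ... drive the WebSocket: read frames, forward outgoing messages ...
}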

Result: Every ChannelResources removal leaks:

  • 1 connection_loop background task (holding a live WebSocket)
  • 1 heartbeat_loop background task
  • 1 reconnection_handler background task
  • 1 TCP socket (the WebSocket connection)
  • Associated broadcast channels and mpsc channels

Bug 2: No graceful WebSocket close on disconnect/reconnect

In handle_connection (connection.rs:228-320), when the function returns (either on error or normal exit), the split write/read halves are simply dropped:

let (mut write, mut read) = ws_stream.split();
// ... select loop ...
// Cleanup
heartbeat_handle.abort();
Ok(())  // ← write and read dropped implicitly, no Close frame sent

Per the WebSocket protocol (RFC 6455 §7.1.1), a proper close requires sending a Close frame and completing the closing handshake. Dropping the split halves skips that handshake entirely: the peer is cut off abruptly and may be left holding a half-open connection, and locally the socket goes through abrupt teardown instead of a clean close. Combined with Bug 1, where the halves are never dropped at all because connection_loop keeps running, each abandoned connection continues to hold its file descriptor.
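
A compliant close could look like the following sketch (a hypothetical helper; the names, generic bounds, and the 5-second timeout are illustrative, not the SDK's actual code). It sends the Close frame and then drains the read half until the peer echoes Close or a timeout expires, letting the closing handshake and TCP teardown complete:

use futures_util::{Sink, SinkExt, StreamExt};
use tokio::time::{timeout, Duration};
use tokio_tungstenite::tungstenite::{Error as WsError, Message};

/// Hypothetical helper: RFC 6455-compliant close over split WebSocket halves.
async fn graceful_close<W, R>(mut write: W, mut read: R)
where
    W: Sink<Message> + Unpin,
    R: StreamExt<Item = Result<Message, WsError>> + Unpin,
{
    // Best effort: tell the peer we are closing (errors ignored).
    let _ = write.send(Message::Close(None)).await;

    // Drain until the peer echoes Close, or give up after 5 seconds, so the
    // closing handshake (and the TCP FIN exchange) can complete.
    let _ = timeout(Duration::from_secs(5), async {
        while let Some(Ok(msg)) = read.next().await {
            if matches!(msg, Message::Close(_)) {
                break;
            }
        }
    })
    .await;
}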

Bug 3: No Drop implementation for ConnectionManager

ConnectionManager has no impl Drop. When all clones are dropped, there is no mechanism to:

  • Signal the connection_loop to stop (it only checks sender_rx.is_closed() which depends on ALL clones being dropped — see Bug 1)
  • Abort the spawned connection_loop task
  • Send a WebSocket Close frame
  • Close the underlying TCP stream

How our usage pattern triggers the leak

Our application runs strategies in 15-minute intervals. Each interval:

  1. Strategies call subscribe_orderbook(asset_ids) and subscribe_prices(asset_ids) — creates or reuses ChannelResources
  2. At interval end, strategies call unsubscribe_orderbook(asset_ids) and unsubscribe_prices(asset_ids)
  3. unsubscribe_and_cleanup() checks has_subscriptions() — if empty, calls entry.remove()
  4. New interval starts → get_or_create_channel() finds vacant entry → creates new ChannelResources
  5. Old ChannelResources's ConnectionManager and tasks are never cleaned up (Bug 1)

For the user channel (per-strategy, not shared), this happens for each of 8 strategies every 15 minutes: 8 strategies × 4 intervals per hour = 32 leaked ConnectionManager instances per hour.

For RtdsClient, each RtdsClient::default() creates a ConnectionManager with a reconnection_handler. If the client is dropped and recreated, the same circular reference prevents cleanup.

Additionally, within each ConnectionManager, the reconnect loop (connection.rs:161-224) creates new TCP connections on each reconnect attempt without sending a WebSocket Close frame on the old connection (Bug 2), accumulating half-open sockets.

Reproduction

use polymarket_client_sdk::clob::ws::Client;
use polymarket_client_sdk::types::U256;
use std::str::FromStr;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let client = Client::default();
    let asset_id = U256::from_str("1234...").unwrap();

    // Each iteration leaks one ConnectionManager + WebSocket + 3 background tasks
    for i in 0..100 {
        let _stream = client.subscribe_orderbook(vec![asset_id]).unwrap();
        // Stream is consumed for a while...
        client.unsubscribe_orderbook(&[asset_id]).unwrap();
        // ChannelResources removed from DashMap, but tasks keep running

        sleep(Duration::from_secs(1)).await;

        // Check FD count
        if let Ok(count) = std::fs::read_dir("/proc/self/fd").map(|d| d.count()) {
            println!("Iteration {i}: FDs = {count}");
        }
    }
}

Suggested fix

1. Break the circular reference

Store the JoinHandle from start_reconnection_handler and abort it when ChannelResources is dropped:

struct ChannelResources {
    connection: ConnectionManager<WsMessage, Arc<InterestTracker>>,
    subscriptions: Arc<SubscriptionManager>,
    reconnection_handle: tokio::task::JoinHandle<()>,  // NEW
}

impl Drop for ChannelResources {
    fn drop(&mut self) {
        self.reconnection_handle.abort();  // Break the Arc cycle
    }
}
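
This implies changing start_reconnection_handler to return the handle instead of discarding it; a minimal sketch of that change (loop body elided as in the excerpt above):

pub fn start_reconnection_handler(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
    let this = Arc::clone(self);
    tokio::spawn(async move {
        let mut state_rx = this.connection.state_receiver();
        loop {
            if state_rx.changed().await.is_err() { break; }
            // ... existing reconnection logic ...
        }
    })
}

Aborting this handle drops the task's Arc<SubscriptionManager>, which releases the second ConnectionManager clone; once the first clone in ChannelResources is dropped as well, sender_rx.is_closed() finally returns true and connection_loop can exit.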

2. Add graceful shutdown to ConnectionManager

impl<M, P> ConnectionManager<M, P> {
    /// Signal the connection loop to stop and close the WebSocket gracefully.
    pub fn shutdown(&self) {
        // Dropping all sender_tx clones makes sender_rx.is_closed() return true,
        // but a dedicated shutdown signal is needed because other clones of the
        // manager may still be alive (see Bug 1).
    }
}

impl<M, P> Drop for ConnectionManager<M, P> {
    fn drop(&mut self) {
        // Could use a CancellationToken or a oneshot channel to signal shutdown.
    }
}

Alternatively, use a CancellationToken shared between ConnectionManager and connection_loop:

pub struct ConnectionManager<M, P> {
    // ... existing fields ...
    cancel: CancellationToken,  // NEW
}

// In connection_loop, check cancel.is_cancelled() alongside sender_rx.is_closed()
// In Drop, call self.cancel.cancel()
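
A minimal sketch of the token wiring, assuming tokio_util is available (all names here are illustrative stand-ins, not the SDK's actual fields). One subtlety: ConnectionManager is Clone, so a plain impl Drop would fire when the first clone is dropped; holding the token's DropGuard behind an Arc defers cancellation to the last clone:

use std::sync::Arc;
use std::time::Duration;
use tokio_util::sync::{CancellationToken, DropGuard};

// Illustrative stand-in for the real ConnectionManager.
#[derive(Clone)]
pub struct ConnectionManager {
    cancel: CancellationToken,
    // The guard cancels the token when dropped; the Arc ensures that only
    // happens when the LAST ConnectionManager clone goes away.
    _guard: Arc<DropGuard>,
}

impl ConnectionManager {
    pub fn new() -> Self {
        let cancel = CancellationToken::new();
        let guard = cancel.clone().drop_guard();
        let task_cancel = cancel.clone();

        // Stand-in for connection_loop: now exits on cancellation as well as
        // on channel closure.
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(Duration::from_secs(30));
            loop {
                tokio::select! {
                    _ = task_cancel.cancelled() => break, // shutdown or last clone dropped
                    _ = tick.tick() => {
                        // ... existing arms: read frames, sender_rx, reconnects ...
                    }
                }
            }
            // Graceful WebSocket close / socket teardown goes here (see fix 3).
        });

        Self { cancel, _guard: Arc::new(guard) }
    }

    /// Explicit shutdown, independent of how many clones exist.
    pub fn shutdown(&self) {
        self.cancel.cancel();
    }
}

With this pattern, the explicit shutdown() call and the implicit last-clone drop converge on the same cancellation signal, so connection_loop no longer depends on every sender_tx clone being dropped.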

3. Send WebSocket Close frame before reconnect

In handle_connection, before returning:

// Best-effort Close frame before cleanup; errors are ignored since the
// connection is being torn down anyway.
let _ = write.send(Message::Close(None)).await;
heartbeat_handle.abort();
Ok(())
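
As in the sketch under Bug 2, the read half should ideally also be drained until the peer echoes the Close frame, so the closing handshake and TCP teardown complete before the halves are dropped.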
