Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions src/clob/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,36 @@ impl ClientInner<Unauthenticated> {
signer: &S,
nonce: Option<u32>,
) -> Result<Credentials> {
match self.create_api_key(signer, nonce).await {
// DERIVE-FIRST. Polymarket's `POST /auth/api-key` is NOT
// idempotent: each successful call (different timestamp =>
// different EIP-712 signature) registers a NEW api-key for
// the wallet, but only ONE — the canonical one — is
// recognised by the authenticated user-WSS channel
// (`wss://ws-subscriptions-clob.polymarket.com/ws/user`).
// Subscribing with a non-canonical key results in a silent
// server-side reset (`Connection reset without closing
// handshake`) within ~30 ms.
//
// Empirical verification 2026-05-07 (same wallet, nonce=1):
// - Rust create_then_derive: returned `29184383-…`,
// valid for REST balance/orders, rejected on WSS.
// - `GET /auth/derive-api-key`: returned `57f6d806-…`
// for the same wallet+nonce, accepted on WSS.
//
// `derive_api_key` returns the canonical key that POSTed in
// the past. Only when no key has ever been created for this
// wallet+nonce do we POST to create one — then re-derive to
// pick up the canonical (which on a fresh wallet+nonce is
// the one we just created, but on a wallet that has been
// POST-spammed in the past is whichever Polymarket nominated
// as canonical).
match self.derive_api_key(signer, nonce).await {
Ok(creds) => Ok(creds),
Err(err) if err.kind() == ErrorKind::Status => {
// Only fall back to derive_api_key for HTTP status errors (server responded
// with an error, e.g., key already exists). Propagate network/internal errors.
// No canonical key yet (404). Create one, then
// re-derive to make sure we return whatever
// Polymarket marks canonical on subsequent calls.
self.create_api_key(signer, nonce).await?;
Comment thread
cursor[bot] marked this conversation as resolved.
self.derive_api_key(signer, nonce).await
}
Err(err) => Err(err),
Expand Down
12 changes: 12 additions & 0 deletions src/clob/ws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,18 @@ impl ChannelResources {
}
}

impl Drop for ChannelResources {
fn drop(&mut self) {
// Break the Arc cycle that the resubscribe task creates by
// holding `Arc<SubscriptionManager>`. After abort, that task
// drops its reference, the manager's strong count falls to
// zero, its `ConnectionManager` clone is released, and the
// connection-loop guard inside `ConnectionManager` aborts the
// connection task in turn.
self.subscriptions.abort_reconnection_handler();
}
}

fn normalize_base_endpoint(endpoint: &str) -> String {
let trimmed = endpoint.trim_end_matches('/');
if let Some(stripped) = trimmed.strip_suffix("/ws/market") {
Expand Down
217 changes: 187 additions & 30 deletions src/clob/ws/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, PoisonError, RwLock};
use std::sync::{Arc, OnceLock, PoisonError, RwLock};
use std::time::Instant;

use async_stream::try_stream;
use dashmap::{DashMap, Entry};
use futures::Stream;
use tokio::sync::broadcast::error::RecvError;
use tokio::task::JoinHandle;

use super::interest::{InterestTracker, MessageInterest};
use super::types::request::SubscriptionRequest;
Expand Down Expand Up @@ -84,6 +85,13 @@ pub struct SubscriptionManager {
/// Track if custom features were enabled for any market subscription
/// (enables `best_bid_ask`, `new_market`, `market_resolved` messages)
custom_features_enabled: AtomicBool,
/// `JoinHandle` for the resubscribe task spawned by
/// [`Self::start_reconnection_handler`]. Stored here so callers
/// holding the owning `Arc<SubscriptionManager>` can cancel it via
/// [`Self::abort_reconnection_handler`] — the resubscribe task
/// captures `Arc<Self>`, which would otherwise create a cycle that
/// keeps the manager alive forever.
resub_handle: OnceLock<JoinHandle<()>>,
}

impl SubscriptionManager {
Expand All @@ -101,48 +109,72 @@ impl SubscriptionManager {
subscribed_markets: DashMap::new(),
last_auth: Arc::new(RwLock::new(None)),
custom_features_enabled: AtomicBool::new(false),
resub_handle: OnceLock::new(),
}
}

/// Start the reconnection handler that re-subscribes on connection recovery.
///
/// Idempotent and leak-safe: the spawn happens inside
/// [`OnceLock::get_or_init`] so a second call neither double-spawns
/// the task nor drops a `JoinHandle` whose task still owns
/// `Arc<Self>` (Tokio drop merely detaches; a detached resubscribe
/// task would re-create the very Arc cycle this module exists to
/// break). The cloned `Arc<Self>` lives inside the outer
/// closure and is only consumed if the closure runs.
pub fn start_reconnection_handler(self: &Arc<Self>) {
let this = Arc::clone(self);

tokio::spawn(async move {
let mut state_rx = this.connection.state_receiver();
let mut was_connected = state_rx.borrow().is_connected();

loop {
// Wait for next state change
if state_rx.changed().await.is_err() {
// Channel closed, connection manager is gone
break;
}

let state = *state_rx.borrow_and_update();

match state {
ConnectionState::Connected { .. } => {
if was_connected {
// Reconnect to subscriptions
#[cfg(feature = "tracing")]
tracing::debug!("WebSocket reconnected, re-establishing subscriptions");
this.resubscribe_all();
}
was_connected = true;
}
ConnectionState::Disconnected => {
// Connection permanently closed
self.resub_handle.get_or_init(move || {
tokio::spawn(async move {
let mut state_rx = this.connection.state_receiver();
let mut was_connected = state_rx.borrow().is_connected();

loop {
// Wait for next state change
if state_rx.changed().await.is_err() {
// Channel closed, connection manager is gone
break;
}
_ => {
// Other states are no-op

let state = *state_rx.borrow_and_update();

match state {
ConnectionState::Connected { .. } => {
if was_connected {
// Reconnect to subscriptions
#[cfg(feature = "tracing")]
tracing::debug!(
"WebSocket reconnected, re-establishing subscriptions"
);
this.resubscribe_all();
}
was_connected = true;
}
ConnectionState::Disconnected => {
// Connection permanently closed
break;
}
_ => {
// Other states are no-op
}
}
}
}
})
});
}

/// Abort the resubscribe task spawned by
/// [`Self::start_reconnection_handler`]. Must be called by whoever
/// owns `Arc<SubscriptionManager>` before they drop their last
/// reference, otherwise the spawned task keeps a strong `Arc<Self>`
/// and the manager — together with its underlying connection —
/// leaks for the rest of the process.
pub fn abort_reconnection_handler(&self) {
if let Some(handle) = self.resub_handle.get() {
handle.abort();
}
}

/// Re-send subscription requests for all tracked assets and markets.
fn resubscribe_all(&self) {
// Collect all subscribed assets
Expand Down Expand Up @@ -563,3 +595,128 @@ impl SubscriptionManager {
Ok(())
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;
use crate::ws::config::Config;

/// Without [`SubscriptionManager::abort_reconnection_handler`], the
/// task spawned by [`SubscriptionManager::start_reconnection_handler`]
/// holds an `Arc<SubscriptionManager>` clone forever, and the
/// manager's `ConnectionManager` clone keeps the connection-loop
/// `state_tx` / `sender_tx` alive. Both spawned tasks then leak for
/// the rest of the process. This regression test exercises the
/// public abort method and proves the strong count drops to zero.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn abort_reconnection_handler_breaks_arc_cycle() {
let interest = Arc::new(InterestTracker::new());
// Loopback at port 1 will never accept; the connection task will
// spin with backoff. We don't care about traffic — only that the
// task can be cancelled via the abort path and releases its Arc.
let connection = ConnectionManager::new(
"ws://127.0.0.1:1/never-connects".to_owned(),
Config::default(),
Arc::clone(&interest),
)
.expect("ConnectionManager::new should not fail before connect");
let subs = Arc::new(SubscriptionManager::new(connection, interest));
subs.start_reconnection_handler();

let weak = Arc::downgrade(&subs);
subs.abort_reconnection_handler();
drop(subs);

// Yield until the aborted resubscribe task drops its Arc clone.
// 50 yields × 10 ms is plenty for `JoinHandle::abort` to take
// effect; if the cycle were intact, this would never converge.
for _ in 0..50 {
if weak.upgrade().is_none() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!(
"SubscriptionManager not dropped within 500ms after abort — \
Arc cycle still present"
);
}

/// Mirror test for the connection-loop guard: after the last
/// `ConnectionManager` clone drops, the spawned connection task is
/// aborted via the internal `_task_guard: Arc<AbortOnDropHandle>`.
/// We can't observe the task directly, but we can prove the Arc
/// chain unwinds by checking the manager itself drops.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn connection_manager_clones_release_when_subscription_manager_aborted() {
let interest = Arc::new(InterestTracker::new());
let connection = ConnectionManager::new(
"ws://127.0.0.1:1/never-connects".to_owned(),
Config::default(),
Arc::clone(&interest),
)
.expect("ConnectionManager::new should not fail before connect");
let subs = Arc::new(SubscriptionManager::new(connection.clone(), interest));
subs.start_reconnection_handler();

// Drop the local clone so `subs.connection` is the only userland
// ConnectionManager clone left (plus any held internally by the
// spawned tasks).
drop(connection);

let weak = Arc::downgrade(&subs);
subs.abort_reconnection_handler();
drop(subs);

for _ in 0..50 {
if weak.upgrade().is_none() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!("ConnectionManager not released within 500ms after subscription abort");
}

/// Calling `start_reconnection_handler` twice must not spawn a
/// second resubscribe task. The first implementation of the leak
/// fix dropped the second `JoinHandle` after `OnceLock::set`
/// returned `Err`, but Tokio drop on a `JoinHandle` only detaches
/// — the orphan task kept its `Arc<Self>` clone forever and the
/// Arc cycle reappeared on the second call. `get_or_init` makes
/// the spawn run at most once and the captured `Arc<Self>` is
/// only consumed when the closure runs.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn start_reconnection_handler_called_twice_does_not_leak() {
let interest = Arc::new(InterestTracker::new());
let connection = ConnectionManager::new(
"ws://127.0.0.1:1/never-connects".to_owned(),
Config::default(),
Arc::clone(&interest),
)
.expect("ConnectionManager::new should not fail before connect");
let subs = Arc::new(SubscriptionManager::new(connection, interest));

subs.start_reconnection_handler();
// Second call mirrors what userland would do if it forgot it
// had already started the handler. Must be a no-op, not a
// task-leak.
subs.start_reconnection_handler();

let weak = Arc::downgrade(&subs);
subs.abort_reconnection_handler();
drop(subs);

for _ in 0..50 {
if weak.upgrade().is_none() {
return;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
panic!(
"SubscriptionManager not dropped — second start_reconnection_handler \
call leaked an unaborted task holding Arc<Self>"
);
}
}
22 changes: 22 additions & 0 deletions src/rtds/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,23 @@ struct ClientInner<S: State> {
connection: ConnectionManager<RtdsMessage, SimpleParser>,
/// Subscription manager for handling subscriptions
subscriptions: Arc<SubscriptionManager>,
/// Aborts the resubscribe task when the last `ClientInner` is dropped,
/// breaking the Arc cycle that task creates by holding
/// `Arc<SubscriptionManager>`. Lives in its own field rather than as
/// a `Drop` impl on `ClientInner` so that `authenticate()` can still
/// destructure the inner.
resub_abort_guard: ResubAbortGuard,
}

/// RAII handle that aborts the spawned resubscribe task on drop.
/// Carries an extra `Arc<SubscriptionManager>` clone solely to call
/// [`SubscriptionManager::abort_reconnection_handler`] from `Drop`.
struct ResubAbortGuard(Arc<SubscriptionManager>);

impl Drop for ResubAbortGuard {
fn drop(&mut self) {
self.0.abort_reconnection_handler();
}
}

impl Client<Unauthenticated> {
Expand All @@ -76,13 +93,16 @@ impl Client<Unauthenticated> {
// Start reconnection handler to re-subscribe on connection recovery
subscriptions.start_reconnection_handler();

let resub_abort_guard = ResubAbortGuard(Arc::clone(&subscriptions));

Ok(Self {
inner: Arc::new(ClientInner {
state: Unauthenticated,
config,
endpoint: endpoint.to_owned(),
connection,
subscriptions,
resub_abort_guard,
}),
})
}
Expand Down Expand Up @@ -110,6 +130,7 @@ impl Client<Unauthenticated> {
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
resub_abort_guard: inner.resub_abort_guard,
}),
})
}
Expand Down Expand Up @@ -325,6 +346,7 @@ impl Client<Authenticated<Normal>> {
endpoint: inner.endpoint,
connection: inner.connection,
subscriptions: inner.subscriptions,
resub_abort_guard: inner.resub_abort_guard,
}),
})
}
Expand Down
Loading