From 6bdcf52b5d100b7355b439cdbd8aec6dd0994554 Mon Sep 17 00:00:00 2001 From: Slavomir Kocka Date: Mon, 4 May 2026 21:18:01 +0200 Subject: [PATCH 1/6] fix(ws): cancel spawned tasks on Client drop to prevent reconnect-loop leak MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dropping `clob::ws::Client` (or `rtds::Client`) did not cancel the background tasks spawned by `ConnectionManager::new` and `SubscriptionManager::start_reconnection_handler`. They kept running for the lifetime of the process, emitting `tracing::error!("Error handling connection: …")` on every reconnect attempt against an endpoint that no longer had any subscribers. Root cause was an Arc cycle: the resubscribe task captured `Arc<SubscriptionManager>`, which owned a `ConnectionManager` clone, which kept `state_tx`/`sender_tx` alive — so neither the `state_rx` nor the `sender_rx` await ever errored, and neither task could exit. Fix: - `ws::ConnectionManager` stores its connection-loop `JoinHandle` in an internal `Arc<AbortOnDropHandle>`. When the last `ConnectionManager` clone drops, the abort guard fires and the spawned task is cancelled. - `clob::ws::SubscriptionManager` and `rtds::SubscriptionManager` store their resubscribe `JoinHandle` in a `OnceLock` and expose `pub fn abort_reconnection_handler(&self)`. - `ChannelResources` (clob) gains a `Drop` impl that calls `abort_reconnection_handler` so dropping the `Client` cleans up. - `rtds::ClientInner` gains a `ResubAbortGuard` field that owns an extra `Arc<SubscriptionManager>` clone and aborts on drop. (A `Drop` impl on `ClientInner` itself isn't viable because `authenticate()` and `deauthenticate()` destructure it.) The cleanup chain on `Client` drop is now: ChannelResources/ClientInner drops → resubscribe task aborted → its `Arc<SubscriptionManager>` released → `SubscriptionManager` drops → its `ConnectionManager` clone drops → `Arc<AbortOnDropHandle>` strong count hits zero → connection task aborted. 
Adds two `tokio::test` regression cases in `clob/ws/subscription.rs` that prove the cycle break by `Arc::downgrade` then watching the `Weak::upgrade` resolve to `None` after `abort_reconnection_handler`. Without the fix, the weak handle would upgrade indefinitely. Closes #39 --- src/clob/ws/client.rs | 12 ++++ src/clob/ws/subscription.rs | 116 +++++++++++++++++++++++++++++++++++- src/rtds/client.rs | 22 +++++++ src/rtds/subscription.rs | 32 +++++++++- src/ws/connection.rs | 21 ++++++- 5 files changed, 198 insertions(+), 5 deletions(-) diff --git a/src/clob/ws/client.rs b/src/clob/ws/client.rs index 091ce69..2f0c5a8 100644 --- a/src/clob/ws/client.rs +++ b/src/clob/ws/client.rs @@ -643,6 +643,18 @@ impl ChannelResources { } } +impl Drop for ChannelResources { + fn drop(&mut self) { + // Break the Arc cycle that the resubscribe task creates by + // holding `Arc`. After abort, that task + // drops its reference, the manager's strong count falls to + // zero, its `ConnectionManager` clone is released, and the + // connection-loop guard inside `ConnectionManager` aborts the + // connection task in turn. 
+ self.subscriptions.abort_reconnection_handler(); + } +} + fn normalize_base_endpoint(endpoint: &str) -> String { let trimmed = endpoint.trim_end_matches('/'); if let Some(stripped) = trimmed.strip_suffix("/ws/market") { diff --git a/src/clob/ws/subscription.rs b/src/clob/ws/subscription.rs index d01aa2d..6b32b75 100644 --- a/src/clob/ws/subscription.rs +++ b/src/clob/ws/subscription.rs @@ -5,13 +5,14 @@ use std::collections::{HashMap, HashSet}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, PoisonError, RwLock}; +use std::sync::{Arc, OnceLock, PoisonError, RwLock}; use std::time::Instant; use async_stream::try_stream; use dashmap::{DashMap, Entry}; use futures::Stream; use tokio::sync::broadcast::error::RecvError; +use tokio::task::JoinHandle; use super::interest::{InterestTracker, MessageInterest}; use super::types::request::SubscriptionRequest; @@ -84,6 +85,13 @@ pub struct SubscriptionManager { /// Track if custom features were enabled for any market subscription /// (enables `best_bid_ask`, `new_market`, `market_resolved` messages) custom_features_enabled: AtomicBool, + /// `JoinHandle` for the resubscribe task spawned by + /// [`Self::start_reconnection_handler`]. Stored here so callers + /// holding the owning `Arc` can cancel it via + /// [`Self::abort_reconnection_handler`] — the resubscribe task + /// captures `Arc`, which would otherwise create a cycle that + /// keeps the manager alive forever. + resub_handle: OnceLock>, } impl SubscriptionManager { @@ -101,14 +109,18 @@ impl SubscriptionManager { subscribed_markets: DashMap::new(), last_auth: Arc::new(RwLock::new(None)), custom_features_enabled: AtomicBool::new(false), + resub_handle: OnceLock::new(), } } /// Start the reconnection handler that re-subscribes on connection recovery. + /// + /// Idempotent: subsequent calls after the first are no-ops because the + /// stored `JoinHandle` is held in a `OnceLock`. 
pub fn start_reconnection_handler(self: &Arc) { let this = Arc::clone(self); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut state_rx = this.connection.state_receiver(); let mut was_connected = state_rx.borrow().is_connected(); @@ -141,6 +153,22 @@ impl SubscriptionManager { } } }); + + // OnceLock::set returns Err if already set; ignore — caller just + // re-invoked start_reconnection_handler. + let _: std::result::Result<_, _> = self.resub_handle.set(handle); + } + + /// Abort the resubscribe task spawned by + /// [`Self::start_reconnection_handler`]. Must be called by whoever + /// owns `Arc` before they drop their last + /// reference, otherwise the spawned task keeps a strong `Arc` + /// and the manager — together with its underlying connection — + /// leaks for the rest of the process. + pub fn abort_reconnection_handler(&self) { + if let Some(handle) = self.resub_handle.get() { + handle.abort(); + } } /// Re-send subscription requests for all tracked assets and markets. @@ -563,3 +591,87 @@ impl SubscriptionManager { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use crate::ws::config::Config; + + /// Without [`SubscriptionManager::abort_reconnection_handler`], the + /// task spawned by [`SubscriptionManager::start_reconnection_handler`] + /// holds an `Arc` clone forever, and the + /// manager's `ConnectionManager` clone keeps the connection-loop + /// `state_tx` / `sender_tx` alive. Both spawned tasks then leak for + /// the rest of the process. This regression test exercises the + /// public abort method and proves the strong count drops to zero. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn abort_reconnection_handler_breaks_arc_cycle() { + let interest = Arc::new(InterestTracker::new()); + // Loopback at port 1 will never accept; the connection task will + // spin with backoff. 
We don't care about traffic — only that the + // task can be cancelled via the abort path and releases its Arc. + let connection = ConnectionManager::new( + "ws://127.0.0.1:1/never-connects".to_owned(), + Config::default(), + Arc::clone(&interest), + ) + .expect("ConnectionManager::new should not fail before connect"); + let subs = Arc::new(SubscriptionManager::new(connection, interest)); + subs.start_reconnection_handler(); + + let weak = Arc::downgrade(&subs); + subs.abort_reconnection_handler(); + drop(subs); + + // Yield until the aborted resubscribe task drops its Arc clone. + // 50 yields × 10 ms is plenty for `JoinHandle::abort` to take + // effect; if the cycle were intact, this would never converge. + for _ in 0..50 { + if weak.upgrade().is_none() { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!( + "SubscriptionManager not dropped within 500ms after abort — \ + Arc cycle still present" + ); + } + + /// Mirror test for the connection-loop guard: after the last + /// `ConnectionManager` clone drops, the spawned connection task is + /// aborted via the internal `_task_guard: Arc`. + /// We can't observe the task directly, but we can prove the Arc + /// chain unwinds by checking the manager itself drops. + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn connection_manager_clones_release_when_subscription_manager_aborted() { + let interest = Arc::new(InterestTracker::new()); + let connection = ConnectionManager::new( + "ws://127.0.0.1:1/never-connects".to_owned(), + Config::default(), + Arc::clone(&interest), + ) + .expect("ConnectionManager::new should not fail before connect"); + let subs = Arc::new(SubscriptionManager::new(connection.clone(), interest)); + subs.start_reconnection_handler(); + + // Drop the local clone so `subs.connection` is the only userland + // ConnectionManager clone left (plus any held internally by the + // spawned tasks). 
+ drop(connection); + + let weak = Arc::downgrade(&subs); + subs.abort_reconnection_handler(); + drop(subs); + + for _ in 0..50 { + if weak.upgrade().is_none() { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!("ConnectionManager not released within 500ms after subscription abort"); + } +} diff --git a/src/rtds/client.rs b/src/rtds/client.rs index a5940d1..d2646c4 100644 --- a/src/rtds/client.rs +++ b/src/rtds/client.rs @@ -65,6 +65,23 @@ struct ClientInner { connection: ConnectionManager, /// Subscription manager for handling subscriptions subscriptions: Arc, + /// Aborts the resubscribe task when the last `ClientInner` is dropped, + /// breaking the Arc cycle that task creates by holding + /// `Arc`. Lives in its own field rather than as + /// a `Drop` impl on `ClientInner` so that `authenticate()` can still + /// destructure the inner. + resub_abort_guard: ResubAbortGuard, +} + +/// RAII handle that aborts the spawned resubscribe task on drop. +/// Carries an extra `Arc` clone solely to call +/// [`SubscriptionManager::abort_reconnection_handler`] from `Drop`. 
+struct ResubAbortGuard(Arc); + +impl Drop for ResubAbortGuard { + fn drop(&mut self) { + self.0.abort_reconnection_handler(); + } } impl Client { @@ -76,6 +93,8 @@ impl Client { // Start reconnection handler to re-subscribe on connection recovery subscriptions.start_reconnection_handler(); + let resub_abort_guard = ResubAbortGuard(Arc::clone(&subscriptions)); + Ok(Self { inner: Arc::new(ClientInner { state: Unauthenticated, @@ -83,6 +102,7 @@ impl Client { endpoint: endpoint.to_owned(), connection, subscriptions, + resub_abort_guard, }), }) } @@ -110,6 +130,7 @@ impl Client { endpoint: inner.endpoint, connection: inner.connection, subscriptions: inner.subscriptions, + resub_abort_guard: inner.resub_abort_guard, }), }) } @@ -325,6 +346,7 @@ impl Client> { endpoint: inner.endpoint, connection: inner.connection, subscriptions: inner.subscriptions, + resub_abort_guard: inner.resub_abort_guard, }), }) } diff --git a/src/rtds/subscription.rs b/src/rtds/subscription.rs index 233d4b4..820987f 100644 --- a/src/rtds/subscription.rs +++ b/src/rtds/subscription.rs @@ -3,13 +3,14 @@ reason = "Subscription types deliberately include the module name for clarity" )] -use std::sync::{Arc, PoisonError, RwLock}; +use std::sync::{Arc, OnceLock, PoisonError, RwLock}; use std::time::Instant; use async_stream::try_stream; use dashmap::{DashMap, Entry}; use futures::Stream; use tokio::sync::broadcast::error::RecvError; +use tokio::task::JoinHandle; use super::error::RtdsError; use super::types::request::{Subscription, SubscriptionRequest}; @@ -68,6 +69,13 @@ pub struct SubscriptionManager { /// Subscribed topics with reference counts (for multiplexing) subscribed_topics: DashMap, last_auth: RwLock>, + /// `JoinHandle` for the resubscribe task spawned by + /// [`Self::start_reconnection_handler`]. 
Stored here so callers + /// holding the owning `Arc` can cancel it via + /// [`Self::abort_reconnection_handler`] — the resubscribe task + /// captures `Arc`, which would otherwise create a cycle that + /// keeps the manager alive forever. + resub_handle: OnceLock>, } impl SubscriptionManager { @@ -79,14 +87,30 @@ impl SubscriptionManager { active_subs: DashMap::new(), subscribed_topics: DashMap::new(), last_auth: RwLock::new(None), + resub_handle: OnceLock::new(), + } + } + + /// Abort the resubscribe task spawned by + /// [`Self::start_reconnection_handler`]. Must be called by whoever + /// owns `Arc` before they drop their last + /// reference, otherwise the spawned task keeps a strong `Arc` + /// and the manager — together with its underlying connection — + /// leaks for the rest of the process. + pub fn abort_reconnection_handler(&self) { + if let Some(handle) = self.resub_handle.get() { + handle.abort(); } } /// Start the reconnection handler that re-subscribes on connection recovery. + /// + /// Idempotent: subsequent calls after the first are no-ops because the + /// stored `JoinHandle` is held in a `OnceLock`. pub fn start_reconnection_handler(self: &Arc) { let this = Arc::clone(self); - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut state_rx = this.connection.state_receiver(); let mut was_connected = state_rx.borrow().is_connected(); @@ -119,6 +143,10 @@ impl SubscriptionManager { } } }); + + // OnceLock::set returns Err if already set; ignore — caller just + // re-invoked start_reconnection_handler. + let _: std::result::Result<_, _> = self.resub_handle.set(handle); } /// Re-send subscription requests for all tracked topics. 
diff --git a/src/ws/connection.rs b/src/ws/connection.rs index 56b8c69..737a79c 100644 --- a/src/ws/connection.rs +++ b/src/ws/connection.rs @@ -5,6 +5,7 @@ use std::fmt::Debug; use std::marker::PhantomData; +use std::sync::Arc; use std::time::Instant; use backoff::backoff::Backoff as _; @@ -13,6 +14,7 @@ use serde::Serialize; use serde::de::DeserializeOwned; use tokio::net::TcpStream; use tokio::sync::{broadcast, mpsc, watch}; +use tokio::task::JoinHandle; use tokio::time::{interval, sleep, timeout}; use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, connect_async, tungstenite::Message}; @@ -100,10 +102,26 @@ where sender_tx: mpsc::UnboundedSender, /// Broadcast sender for incoming messages broadcast_tx: broadcast::Sender, + /// Aborts the spawned connection-loop task when the last `ConnectionManager` + /// clone is dropped. Without this, the task holds `state_tx` and + /// `sender_tx` indefinitely and reconnects forever even after every + /// public handle to the manager has been dropped. + _task_guard: Arc, /// Phantom data for unused type parameters _phantom: PhantomData

, } +/// Aborts the wrapped `JoinHandle` when dropped. Used as the cancellation +/// trigger for spawned WebSocket background tasks. +#[derive(Debug)] +struct AbortOnDropHandle(JoinHandle<()>); + +impl Drop for AbortOnDropHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + impl ConnectionManager where M: DeserializeOwned + Debug + Clone + Send + 'static, @@ -125,7 +143,7 @@ where let broadcast_tx_clone = broadcast_tx.clone(); let state_tx_clone = state_tx.clone(); - tokio::spawn(async move { + let handle = tokio::spawn(async move { Self::connection_loop( connection_endpoint, connection_config, @@ -142,6 +160,7 @@ where state_rx, sender_tx, broadcast_tx, + _task_guard: Arc::new(AbortOnDropHandle(handle)), _phantom: PhantomData, }) } From d23a7f23c9e4cead60e1f7a89dc12523904cc793 Mon Sep 17 00:00:00 2001 From: Slavomir Kocka Date: Mon, 4 May 2026 22:56:07 +0200 Subject: [PATCH 2/6] fix(ws): use OnceLock::get_or_init to make start_reconnection_handler leak-safe MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Per Cursor Bugbot review on PR #40: the first version of the fix unconditionally spawned the resubscribe task before attempting `OnceLock::set`. On a repeat call the second `JoinHandle` was dropped, but Tokio drop only detaches a task — the orphan kept its `Arc` clone alive forever, recreating the very Arc cycle the PR exists to break. Switch to `OnceLock::get_or_init`: the closure runs at most once across all callers, and its captured `Arc` is only consumed on the first call. Mirror the change in both `clob::ws::SubscriptionManager` and `rtds::SubscriptionManager`. Add a regression test `start_reconnection_handler_called_twice_does_not_leak` that calls the handler twice and asserts the manager drops within 500ms after abort. Without this fix, the test would never converge — the second-call orphan task holds the strong ref forever. 
--- src/clob/ws/subscription.rs | 115 +++++++++++++++++++++++++----------- src/rtds/subscription.rs | 72 +++++++++++----------- 2 files changed, 117 insertions(+), 70 deletions(-) diff --git a/src/clob/ws/subscription.rs b/src/clob/ws/subscription.rs index 6b32b75..135f232 100644 --- a/src/clob/ws/subscription.rs +++ b/src/clob/ws/subscription.rs @@ -115,48 +115,52 @@ impl SubscriptionManager { /// Start the reconnection handler that re-subscribes on connection recovery. /// - /// Idempotent: subsequent calls after the first are no-ops because the - /// stored `JoinHandle` is held in a `OnceLock`. + /// Idempotent and leak-safe: the spawn happens inside + /// [`OnceLock::get_or_init`] so a second call neither double-spawns + /// the task nor drops a `JoinHandle` whose task still owns + /// `Arc` (Tokio drop merely detaches; a detached resubscribe + /// task would re-create the very Arc cycle this module exists to + /// break). The cloned `Arc` lives inside the outer + /// closure and is only consumed if the closure runs. pub fn start_reconnection_handler(self: &Arc) { let this = Arc::clone(self); - - let handle = tokio::spawn(async move { - let mut state_rx = this.connection.state_receiver(); - let mut was_connected = state_rx.borrow().is_connected(); - - loop { - // Wait for next state change - if state_rx.changed().await.is_err() { - // Channel closed, connection manager is gone - break; - } - - let state = *state_rx.borrow_and_update(); - - match state { - ConnectionState::Connected { .. 
} => { - if was_connected { - // Reconnect to subscriptions - #[cfg(feature = "tracing")] - tracing::debug!("WebSocket reconnected, re-establishing subscriptions"); - this.resubscribe_all(); - } - was_connected = true; - } - ConnectionState::Disconnected => { - // Connection permanently closed + self.resub_handle.get_or_init(move || { + tokio::spawn(async move { + let mut state_rx = this.connection.state_receiver(); + let mut was_connected = state_rx.borrow().is_connected(); + + loop { + // Wait for next state change + if state_rx.changed().await.is_err() { + // Channel closed, connection manager is gone break; } - _ => { - // Other states are no-op + + let state = *state_rx.borrow_and_update(); + + match state { + ConnectionState::Connected { .. } => { + if was_connected { + // Reconnect to subscriptions + #[cfg(feature = "tracing")] + tracing::debug!( + "WebSocket reconnected, re-establishing subscriptions" + ); + this.resubscribe_all(); + } + was_connected = true; + } + ConnectionState::Disconnected => { + // Connection permanently closed + break; + } + _ => { + // Other states are no-op + } } } - } + }) }); - - // OnceLock::set returns Err if already set; ignore — caller just - // re-invoked start_reconnection_handler. - let _: std::result::Result<_, _> = self.resub_handle.set(handle); } /// Abort the resubscribe task spawned by @@ -674,4 +678,45 @@ mod tests { } panic!("ConnectionManager not released within 500ms after subscription abort"); } + + /// Calling `start_reconnection_handler` twice must not spawn a + /// second resubscribe task. The first implementation of the leak + /// fix dropped the second `JoinHandle` after `OnceLock::set` + /// returned `Err`, but Tokio drop on a `JoinHandle` only detaches + /// — the orphan task kept its `Arc` clone forever and the + /// Arc cycle reappeared on the second call. `get_or_init` makes + /// the spawn run at most once and the captured `Arc` is + /// only consumed when the closure runs. 
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn start_reconnection_handler_called_twice_does_not_leak() { + let interest = Arc::new(InterestTracker::new()); + let connection = ConnectionManager::new( + "ws://127.0.0.1:1/never-connects".to_owned(), + Config::default(), + Arc::clone(&interest), + ) + .expect("ConnectionManager::new should not fail before connect"); + let subs = Arc::new(SubscriptionManager::new(connection, interest)); + + subs.start_reconnection_handler(); + // Second call mirrors what userland would do if it forgot it + // had already started the handler. Must be a no-op, not a + // task-leak. + subs.start_reconnection_handler(); + + let weak = Arc::downgrade(&subs); + subs.abort_reconnection_handler(); + drop(subs); + + for _ in 0..50 { + if weak.upgrade().is_none() { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + panic!( + "SubscriptionManager not dropped — second start_reconnection_handler \ + call leaked an unaborted task holding Arc" + ); + } } diff --git a/src/rtds/subscription.rs b/src/rtds/subscription.rs index 820987f..a7283e6 100644 --- a/src/rtds/subscription.rs +++ b/src/rtds/subscription.rs @@ -105,48 +105,50 @@ impl SubscriptionManager { /// Start the reconnection handler that re-subscribes on connection recovery. /// - /// Idempotent: subsequent calls after the first are no-ops because the - /// stored `JoinHandle` is held in a `OnceLock`. + /// Idempotent and leak-safe: the spawn happens inside + /// [`OnceLock::get_or_init`] so a second call neither double-spawns + /// the task nor drops a `JoinHandle` whose task still owns + /// `Arc` (Tokio drop merely detaches; a detached resubscribe + /// task would re-create the very Arc cycle this module exists to + /// break). The cloned `Arc` lives inside the outer + /// closure and is only consumed if the closure runs. 
pub fn start_reconnection_handler(self: &Arc) { let this = Arc::clone(self); - - let handle = tokio::spawn(async move { - let mut state_rx = this.connection.state_receiver(); - let mut was_connected = state_rx.borrow().is_connected(); - - loop { - // Wait for next state change - if state_rx.changed().await.is_err() { - // Channel closed, connection manager is gone - break; - } - - let state = *state_rx.borrow_and_update(); - - match state { - ConnectionState::Connected { .. } => { - if was_connected { - // Reconnect to subscriptions - #[cfg(feature = "tracing")] - tracing::debug!("RTDS reconnected, re-establishing subscriptions"); - this.resubscribe_all(); - } - was_connected = true; - } - ConnectionState::Disconnected => { - // Connection permanently closed + self.resub_handle.get_or_init(move || { + tokio::spawn(async move { + let mut state_rx = this.connection.state_receiver(); + let mut was_connected = state_rx.borrow().is_connected(); + + loop { + // Wait for next state change + if state_rx.changed().await.is_err() { + // Channel closed, connection manager is gone break; } - _ => { - // Other states are no-op + + let state = *state_rx.borrow_and_update(); + + match state { + ConnectionState::Connected { .. } => { + if was_connected { + // Reconnect to subscriptions + #[cfg(feature = "tracing")] + tracing::debug!("RTDS reconnected, re-establishing subscriptions"); + this.resubscribe_all(); + } + was_connected = true; + } + ConnectionState::Disconnected => { + // Connection permanently closed + break; + } + _ => { + // Other states are no-op + } } } - } + }) }); - - // OnceLock::set returns Err if already set; ignore — caller just - // re-invoked start_reconnection_handler. - let _: std::result::Result<_, _> = self.resub_handle.set(handle); } /// Re-send subscription requests for all tracked topics. 
From 31b03713c11d1958e015e7765aa9205038a254a7 Mon Sep 17 00:00:00 2001 From: Slavomir Kocka Date: Thu, 7 May 2026 23:07:15 +0200 Subject: [PATCH 3/6] fix(auth): create_or_derive_api_key derive-first to return canonical key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Polymarket's `POST /auth/api-key` is NOT idempotent: each successful call (different timestamp ⇒ different EIP-712 signature) registers a NEW api-key for the wallet, but only ONE — the canonical one — is recognised by the authenticated user-WSS channel (`wss://ws-subscriptions-clob.polymarket.com/ws/user`). Subscribing with a non-canonical key is silently rejected: server resets the connection within ~30 ms with `Connection reset without closing handshake`. The non-canonical key still works for REST balance and order calls, so the bug is invisible until the operator opens `subscribe_user_events`. Empirical reproduction 2026-05-07 with this fork on the same wallet + nonce=1: - Old `create_then_derive`: returned `29184383-…`, valid for REST orders, rejected on user-WSS. - `GET /auth/derive-api-key` only: returned `57f6d806-…` for the same wallet+nonce, accepted on user-WSS. The Python SDK (py-clob-client-v2) reproduces the issue when running against the same wallet — its `create_or_derive_api_key` also POSTs first. The bug is endpoint-side; clients that only call `/auth/derive-api-key` get the canonical key. Fix --- `create_or_derive_api_key` now derives first. Only when derive 404s (no key has ever been created for this wallet+nonce) do we POST to create one — then re-derive to pick up the now-canonical key. The HTTP round-trip count goes from 2 (POST + GET) to 1 (GET) on the hot path; cold-start adds the POST + a second GET. Tests ----- - `create_or_derive_api_key_returns_canonical_via_derive_without_creating` asserts POST is NEVER hit when derive succeeds (regression-pin for the WSS rejection). 
- `create_or_derive_api_key_creates_then_re_derives_when_no_canonical_exists` pins the cold-start path — derive 404 → POST create → derive again. - Existing `create_authenticated` test helper and `sign_order_should_succeed` updated: `/time` is now called once on the derive-only path (was twice when both endpoints were hit). 451/451 SDK tests pass. --- src/clob/client.rs | 31 ++++++++++++++++++++--- tests/auth.rs | 60 +++++++++++++++++++++++++++++++++++++++------ tests/clob.rs | 4 ++- tests/common/mod.rs | 4 ++- 4 files changed, 86 insertions(+), 13 deletions(-) diff --git a/src/clob/client.rs b/src/clob/client.rs index a335b61..0208b92 100644 --- a/src/clob/client.rs +++ b/src/clob/client.rs @@ -522,11 +522,36 @@ impl ClientInner { signer: &S, nonce: Option, ) -> Result { - match self.create_api_key(signer, nonce).await { + // DERIVE-FIRST. Polymarket's `POST /auth/api-key` is NOT + // idempotent: each successful call (different timestamp => + // different EIP-712 signature) registers a NEW api-key for + // the wallet, but only ONE — the canonical one — is + // recognised by the authenticated user-WSS channel + // (`wss://ws-subscriptions-clob.polymarket.com/ws/user`). + // Subscribing with a non-canonical key results in a silent + // server-side reset (`Connection reset without closing + // handshake`) within ~30 ms. + // + // Empirical verification 2026-05-07 (same wallet, nonce=1): + // - Rust create_then_derive: returned `29184383-…`, + // valid for REST balance/orders, rejected on WSS. + // - `GET /auth/derive-api-key`: returned `57f6d806-…` + // for the same wallet+nonce, accepted on WSS. + // + // `derive_api_key` returns the canonical key that POSTed in + // the past. 
Only when no key has ever been created for this + // wallet+nonce do we POST to create one — then re-derive to + // pick up the canonical (which on a fresh wallet+nonce is + // the one we just created, but on a wallet that has been + // POST-spammed in the past is whichever Polymarket nominated + // as canonical). + match self.derive_api_key(signer, nonce).await { Ok(creds) => Ok(creds), Err(err) if err.kind() == ErrorKind::Status => { - // Only fall back to derive_api_key for HTTP status errors (server responded - // with an error, e.g., key already exists). Propagate network/internal errors. + // No canonical key yet (404). Create one, then + // re-derive to make sure we return whatever + // Polymarket marks canonical on subsequent calls. + self.create_api_key(signer, nonce).await?; self.derive_api_key(signer, nonce).await } Err(err) => Err(err), diff --git a/tests/auth.rs b/tests/auth.rs index 7eb0f75..a2ebe8d 100644 --- a/tests/auth.rs +++ b/tests/auth.rs @@ -192,16 +192,16 @@ async fn derive_api_key_should_succeed() -> anyhow::Result<()> { } #[tokio::test] -async fn create_or_derive_api_key_should_succeed() -> anyhow::Result<()> { +async fn create_or_derive_api_key_returns_canonical_via_derive_without_creating() -> anyhow::Result<()> { + // Hot path: a key for this wallet+nonce already exists (Polymarket + // returned it on derive). The fix is to NEVER POST in this case — + // POST creates a new non-canonical key that Polymarket rejects on + // the user-WSS channel. Verified empirically 2026-05-07. 
let server = MockServer::start(); let signer = LocalSigner::from_str(PRIVATE_KEY)?.with_chain_id(Some(POLYGON)); let client = Client::new(&server.base_url(), Config::default())?; - let mock = server.mock(|when, then| { - when.method(httpmock::Method::POST).path("/auth/api-key"); - then.status(StatusCode::NOT_FOUND); - }); - let mock2 = server.mock(|when, then| { + let derive_mock = server.mock(|when, then| { when.method(httpmock::Method::GET) .path("/auth/derive-api-key") .header(POLY_ADDRESS, signer.address().to_string().to_lowercase()); @@ -211,13 +211,57 @@ async fn create_or_derive_api_key_should_succeed() -> anyhow::Result<()> { "secret": SECRET })); }); + // POST must NOT be hit when derive succeeds. + let create_mock = server.mock(|when, then| { + when.method(httpmock::Method::POST).path("/auth/api-key"); + then.status(StatusCode::OK).json_body(json!({"apiKey":"BAD","passphrase":"x","secret":"y"})); + }); let credentials = client.create_or_derive_api_key(&signer, None).await?; assert_eq!(credentials.key(), API_KEY); - mock.assert(); - mock2.assert(); + derive_mock.assert(); + create_mock.assert_hits(0); + Ok(()) +} + +#[tokio::test] +async fn create_or_derive_api_key_creates_then_re_derives_when_no_canonical_exists() -> anyhow::Result<()> { + // Cold path: derive 404s (no key has ever been created for this + // wallet+nonce). Then we POST to create one, then re-derive to + // pick up whatever Polymarket nominates as canonical (which on a + // fresh wallet+nonce is the one we just created, but on a + // previously-spammed wallet may differ). 
+ let server = MockServer::start(); + let signer = LocalSigner::from_str(PRIVATE_KEY)?.with_chain_id(Some(POLYGON)); + let client = Client::new(&server.base_url(), Config::default())?; + + let derive_404 = server.mock(|when, then| { + when.method(httpmock::Method::GET) + .path("/auth/derive-api-key") + .header(POLY_ADDRESS, signer.address().to_string().to_lowercase()); + then.status(StatusCode::NOT_FOUND); + }); + let create_ok = server.mock(|when, then| { + when.method(httpmock::Method::POST) + .path("/auth/api-key") + .header(POLY_ADDRESS, signer.address().to_string().to_lowercase()); + then.status(StatusCode::OK).json_body(json!({ + "apiKey": API_KEY.to_string(), + "passphrase": PASSPHRASE, + "secret": SECRET + })); + }); + let err = client.create_or_derive_api_key(&signer, None).await; + // After create succeeds, we re-derive — derive_404 still 404s in + // this mock setup, so the call propagates the error. The + // important invariant: POST was reached, proving derive-first + // didn't short-circuit. A real Polymarket would return the key + // on the second derive. + assert!(err.is_err()); + derive_404.assert_hits(2); // before-create + after-create + create_ok.assert(); Ok(()) } diff --git a/tests/clob.rs b/tests/clob.rs index 9bc094a..f699678 100644 --- a/tests/clob.rs +++ b/tests/clob.rs @@ -1590,7 +1590,9 @@ mod authenticated { assert_eq!(signed_order.owner.to_string(), API_KEY.to_string()); assert_eq!(signed_order.order_type, OrderType::GTC); mock.assert(); - mock2.assert_calls(2); + // Was 2 (POST create + GET derive each fetched server time); + // derive-first only hits /time once. 
+ mock2.assert_calls(1); Ok(()) } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 65df334..29288f4 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -88,7 +88,9 @@ pub async fn create_authenticated(server: &MockServer) -> anyhow::Result Date: Thu, 7 May 2026 23:26:05 +0200 Subject: [PATCH 4/6] tmp: probe instrumentation --- src/clob/client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/clob/client.rs b/src/clob/client.rs index 0208b92..265f9bb 100644 --- a/src/clob/client.rs +++ b/src/clob/client.rs @@ -522,6 +522,7 @@ impl ClientInner { signer: &S, nonce: Option, ) -> Result { + eprintln!("[SDK FIX] create_or_derive_api_key entered (derive-first path); nonce={nonce:?}"); // DERIVE-FIRST. Polymarket's `POST /auth/api-key` is NOT // idempotent: each successful call (different timestamp => // different EIP-712 signature) registers a NEW api-key for From ce7357e309372af7c194194cef4c64a0bc92d4b7 Mon Sep 17 00:00:00 2001 From: Slavomir Kocka Date: Thu, 7 May 2026 23:30:37 +0200 Subject: [PATCH 5/6] remove temporary instrumentation --- src/clob/client.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/clob/client.rs b/src/clob/client.rs index 265f9bb..0208b92 100644 --- a/src/clob/client.rs +++ b/src/clob/client.rs @@ -522,7 +522,6 @@ impl ClientInner { signer: &S, nonce: Option, ) -> Result { - eprintln!("[SDK FIX] create_or_derive_api_key entered (derive-first path); nonce={nonce:?}"); // DERIVE-FIRST. 
Polymarket's `POST /auth/api-key` is NOT // idempotent: each successful call (different timestamp => // different EIP-712 signature) registers a NEW api-key for From 8563bd60cfdf0f772d1684b26ca1d882216096af Mon Sep 17 00:00:00 2001 From: Slavomir Kocka Date: Thu, 7 May 2026 23:55:13 +0200 Subject: [PATCH 6/6] fix(auth): scope derive-first fallback to HTTP 404 only MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Cursor Bugbot caught: the previous fallback guard `err.kind() == ErrorKind::Status` matched ANY non-2xx response (500/503/429/etc.). Because the new fallback now performs a mutation (`create_api_key` registers a new non-canonical key on every successful POST), a transient blip on the derive endpoint would silently register a new key — exactly the bug derive-first is meant to prevent. Tighten the guard to HTTP 404 only via a small helper `is_not_found(&Error)` that downcasts the dyn error to `crate::error::Status` and checks `status_code == NOT_FOUND`. Other status errors propagate unchanged. Test: `create_or_derive_api_key_propagates_non_404_status_without_creating` loops over 500, 503, 429 and asserts derive returns the error verbatim, with create_api_key hit zero times in each case. Full SDK suite: 452/452 passing. --- src/clob/client.rs | 24 ++++++++++++++++++++++-- tests/auth.rs | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 2 deletions(-) diff --git a/src/clob/client.rs b/src/clob/client.rs index 0208b92..fdfa318 100644 --- a/src/clob/client.rs +++ b/src/clob/client.rs @@ -547,17 +547,37 @@ impl ClientInner { // as canonical). match self.derive_api_key(signer, nonce).await { Ok(creds) => Ok(creds), - Err(err) if err.kind() == ErrorKind::Status => { - // No canonical key yet (404). Create one, then + Err(err) if is_not_found(&err) => { + // No canonical key yet (HTTP 404). 
Create one, then // re-derive to make sure we return whatever // Polymarket marks canonical on subsequent calls. + // + // Critical: scoped to 404 specifically. Falling back + // on ANY status error (500/503/429/etc.) would issue + // a `create_api_key` POST on every transient + // derive-endpoint blip, registering a NEW non-canonical + // key each time — exactly the bug this PR exists to + // fix. Other Status errors propagate to the caller. self.create_api_key(signer, nonce).await?; self.derive_api_key(signer, nonce).await } Err(err) => Err(err), } } +} +/// Returns true iff `err` is an HTTP error wrapping a 404 status. +/// Anything else (other 4xx/5xx, network failure, parse error) is +/// treated as transient and propagated by `create_or_derive_api_key`. +fn is_not_found(err: &crate::error::Error) -> bool { + if err.kind() != ErrorKind::Status { + return false; + } + err.downcast_ref::<crate::error::Status>() + .is_some_and(|s| s.status_code == crate::error::StatusCode::NOT_FOUND) +} + +impl ClientInner { async fn create_headers(&self, signer: &S, nonce: Option) -> Result { let chain_id = signer.chain_id().ok_or(Error::validation( "Chain id not set, be sure to provide one on the signer", diff --git a/tests/auth.rs b/tests/auth.rs index a2ebe8d..a974db3 100644 --- a/tests/auth.rs +++ b/tests/auth.rs @@ -282,6 +282,45 @@ async fn create_or_derive_api_key_should_propagate_network_errors() -> anyhow::R Ok(()) } +#[tokio::test] +async fn create_or_derive_api_key_propagates_non_404_status_without_creating() -> anyhow::Result<()> { + // The fallback to `create_api_key` is scoped to HTTP 404 only. + // A transient 500/503/429 from the derive endpoint MUST NOT + // trigger create — that would register a new non-canonical key + // on every blip, defeating the purpose of derive-first. Pinned + // because Cursor Bugbot caught this on the original patch + // (`err.kind() == ErrorKind::Status` matched ANY status). 
+ for code in [ + StatusCode::INTERNAL_SERVER_ERROR, + StatusCode::TOO_MANY_REQUESTS, + StatusCode::SERVICE_UNAVAILABLE, + ] { + let server = MockServer::start(); + let signer = LocalSigner::from_str(PRIVATE_KEY)?.with_chain_id(Some(POLYGON)); + let client = Client::new(&server.base_url(), Config::default())?; + + let derive_err = server.mock(|when, then| { + when.method(httpmock::Method::GET) + .path("/auth/derive-api-key"); + then.status(code); + }); + let create_mock = server.mock(|when, then| { + when.method(httpmock::Method::POST).path("/auth/api-key"); + then.status(StatusCode::OK) + .json_body(json!({"apiKey": API_KEY, "passphrase": PASSPHRASE, "secret": SECRET})); + }); + + let err = client + .create_or_derive_api_key(&signer, None) + .await + .expect_err(&format!("status {code} must propagate, not trigger create")); + assert_eq!(err.kind(), Kind::Status, "got {err:?} for code {code}"); + derive_err.assert(); + create_mock.assert_hits(0); + } + Ok(()) +} + #[test] fn credentials_secret_accessor_should_return_secret() { let credentials = Credentials::new(API_KEY, SECRET.to_owned(), PASSPHRASE.to_owned());