|
3 | 3 | //! This module provides a streaming client using the actor pattern for |
4 | 4 | //! real-time market data and account updates. |
5 | 5 | //! |
6 | | -//! # Example |
| 6 | +//! # Quick Start |
7 | 7 | //! |
8 | 8 | //! ```no_run |
9 | 9 | //! use kalshi_trade_rs::auth::KalshiConfig; |
10 | | -//! use kalshi_trade_rs::ws::{Channel, ConnectStrategy, KalshiStreamClient}; |
| 10 | +//! use kalshi_trade_rs::ws::{Channel, ConnectStrategy, KalshiStreamClient, StreamMessage}; |
11 | 11 | //! |
12 | 12 | //! # async fn example() -> Result<(), Box<dyn std::error::Error>> { |
13 | 13 | //! let config = KalshiConfig::from_env()?; |
|
23 | 23 | //! // Subscribe to ticker updates for specific markets |
24 | 24 | //! handle.subscribe(Channel::Ticker, &["INXD-25JAN17-B5955"]).await?; |
25 | 25 | //! |
26 | | -//! // Add more markets (automatically uses add_markets under the hood) |
27 | | -//! handle.subscribe(Channel::Ticker, &["KXBTC-25DEC31-100000"]).await?; |
28 | | -//! |
29 | | -//! // Check what markets we're subscribed to |
30 | | -//! println!("Ticker markets: {:?}", handle.markets(Channel::Ticker)); |
31 | | -//! |
32 | | -//! // Process updates - handle disconnection events |
| 26 | +//! // Process updates |
33 | 27 | //! while let Ok(update) = handle.update_receiver.recv().await { |
34 | 28 | //! match &update.msg { |
35 | | -//! kalshi_trade_rs::ws::StreamMessage::Closed { reason } => { |
36 | | -//! println!("Connection closed: {}", reason); |
37 | | -//! break; |
| 29 | +//! StreamMessage::Ticker(data) => { |
| 30 | +//! println!("{}: {}¢", data.market_ticker, data.price); |
38 | 31 | //! } |
39 | | -//! kalshi_trade_rs::ws::StreamMessage::ConnectionLost { reason } => { |
| 32 | +//! StreamMessage::ConnectionLost { reason } => { |
40 | 33 | //! eprintln!("Connection lost: {}", reason); |
41 | | -//! break; // Reconnect with backoff here |
| 34 | +//! break; // Handle reconnection (see below) |
42 | 35 | //! } |
43 | | -//! _ => println!("Update: {:?}", update), |
| 36 | +//! _ => {} |
44 | 37 | //! } |
45 | 38 | //! } |
| 39 | +//! # Ok(()) |
| 40 | +//! # } |
| 41 | +//! ``` |
| 42 | +//! |
| 43 | +//! # Connection Strategies |
| 44 | +//! |
| 45 | +//! Two strategies control initial connection behavior: |
| 46 | +//! |
| 47 | +//! - [`ConnectStrategy::Simple`] - Single attempt, fast-fail on error. Good for testing. |
| 48 | +//! - [`ConnectStrategy::Retry`] - Exponential backoff until connected. Recommended for production. |
| 49 | +//! |
| 50 | +//! **Note**: These strategies only apply to the *initial* connection. Once connected, |
| 51 | +//! handling disconnections is your responsibility (see Reconnection Pattern below). |
46 | 52 | //! |
47 | | -//! // Unsubscribe from specific markets |
48 | | -//! handle.unsubscribe(Channel::Ticker, &["INXD-25JAN17-B5955"]).await?; |
| 53 | +//! # Reconnection Pattern |
49 | 54 | //! |
50 | | -//! // Or unsubscribe from entire channel |
| 55 | +//! The library does not automatically reconnect after a connection is lost. This is |
| 56 | +//! intentional - reconnection policies vary by application (e.g., max retries, backoff |
| 57 | +//! strategy, whether to resubscribe to the same markets). |
| 58 | +//! |
| 59 | +//! When the connection is lost, you'll receive [`StreamMessage::ConnectionLost`] on |
| 60 | +//! your update receiver. Implement reconnection like this: |
| 61 | +//! |
| 62 | +//! ```no_run |
| 63 | +//! use kalshi_trade_rs::auth::KalshiConfig; |
| 64 | +//! use kalshi_trade_rs::ws::{Channel, ConnectStrategy, KalshiStreamClient, StreamMessage}; |
| 65 | +//! use std::time::Duration; |
| 66 | +//! |
| 67 | +//! async fn run_with_reconnect(config: &KalshiConfig) -> Result<(), Box<dyn std::error::Error>> { |
| 68 | +//! let markets = vec!["INXD-25JAN17-B5955", "KXBTC-25DEC31-100000"]; |
| 69 | +//! let mut attempt = 0; |
| 70 | +//! |
| 71 | +//! loop { |
| 72 | +//! // Connect (Retry strategy handles initial connection retries) |
| 73 | +//! let client = match KalshiStreamClient::connect_with_strategy( |
| 74 | +//! config, |
| 75 | +//! ConnectStrategy::Retry, |
| 76 | +//! ).await { |
| 77 | +//! Ok(c) => { |
| 78 | +//! attempt = 0; // Reset on successful connect |
| 79 | +//! c |
| 80 | +//! } |
| 81 | +//! Err(e) => { |
| 82 | +//! eprintln!("Connection failed: {}", e); |
| 83 | +//! continue; |
| 84 | +//! } |
| 85 | +//! }; |
| 86 | +//! |
| 87 | +//! let mut handle = client.handle(); |
| 88 | +//! |
| 89 | +//! // Resubscribe to markets |
| 90 | +//! for market in &markets { |
| 91 | +//! if let Err(e) = handle.subscribe(Channel::Ticker, &[market]).await { |
| 92 | +//! eprintln!("Subscribe failed: {}", e); |
| 93 | +//! } |
| 94 | +//! } |
| 95 | +//! |
| 96 | +//! // Process updates until disconnection |
| 97 | +//! while let Ok(update) = handle.update_receiver.recv().await { |
| 98 | +//! match &update.msg { |
| 99 | +//! StreamMessage::Ticker(data) => { |
| 100 | +//! println!("{}: {}¢", data.market_ticker, data.price); |
| 101 | +//! } |
| 102 | +//! StreamMessage::ConnectionLost { reason } => { |
| 103 | +//! eprintln!("Connection lost: {}", reason); |
| 104 | +//! break; // Exit inner loop to reconnect |
| 105 | +//! } |
| 106 | +//! StreamMessage::Closed { .. } => { |
| 107 | +//! return Ok(()); // Graceful close, don't reconnect |
| 108 | +//! } |
| 109 | +//! _ => {} |
| 110 | +//! } |
| 111 | +//! } |
| 112 | +//! |
| 113 | +//! // Backoff before reconnecting |
| 114 | +//! attempt += 1; |
| 115 | +//! let backoff = Duration::from_secs(std::cmp::min(attempt * 2, 60)); |
| 116 | +//! eprintln!("Reconnecting in {:?}...", backoff); |
| 117 | +//! tokio::time::sleep(backoff).await; |
| 118 | +//! } |
| 119 | +//! } |
| 120 | +//! ``` |
| 121 | +//! |
| 122 | +//! See the `examples/stream_reconnect.rs` for a complete working example. |
| 123 | +//! |
| 124 | +//! # Available Channels |
| 125 | +//! |
| 126 | +//! Market data channels (require market tickers): |
| 127 | +//! - [`Channel::Ticker`] - Price and volume updates |
| 128 | +//! - [`Channel::Trade`] - Executed trades |
| 129 | +//! - [`Channel::OrderbookDelta`] - Orderbook changes |
| 130 | +//! - [`Channel::MarketLifecycle`] - Market status changes |
| 131 | +//! |
| 132 | +//! User channels (no market tickers needed): |
| 133 | +//! - [`Channel::Fill`] - Your executed fills |
| 134 | +//! - [`Channel::MarketPositions`] - Position changes |
| 135 | +//! - [`Channel::Communications`] - RFQ/quote updates |
| 136 | +//! - [`Channel::Multivariate`] - Multivariate event updates |
| 137 | +//! |
| 138 | +//! # Subscription Management |
| 139 | +//! |
| 140 | +//! ```no_run |
| 141 | +//! # use kalshi_trade_rs::ws::{Channel, KalshiStreamHandle}; |
| 142 | +//! # async fn example(handle: &mut KalshiStreamHandle) -> Result<(), Box<dyn std::error::Error>> { |
| 143 | +//! // Subscribe to markets |
| 144 | +//! handle.subscribe(Channel::Ticker, &["MARKET-A", "MARKET-B"]).await?; |
| 145 | +//! |
| 146 | +//! // Add more markets to existing subscription |
| 147 | +//! handle.subscribe(Channel::Ticker, &["MARKET-C"]).await?; |
| 148 | +//! |
| 149 | +//! // Remove specific markets |
| 150 | +//! handle.unsubscribe(Channel::Ticker, &["MARKET-A"]).await?; |
| 151 | +//! |
| 152 | +//! // Unsubscribe from entire channel |
51 | 153 | //! handle.unsubscribe_all(Channel::Ticker).await?; |
| 154 | +//! |
| 155 | +//! // Check current subscriptions |
| 156 | +//! println!("Markets: {:?}", handle.markets(Channel::Ticker)); |
| 157 | +//! println!("Subscribed: {}", handle.is_subscribed(Channel::Ticker)); |
52 | 158 | //! # Ok(()) |
53 | 159 | //! # } |
54 | 160 | //! ``` |
|
0 commit comments