Skip to content

Commit 2893e92

Browse files
committed
feat: add OrderbookAggregator for live orderbook state
Add a stateful aggregator that maintains orderbook state from WebSocket delta updates. Provides both pull-based (polling) and push-based (streaming) access patterns for querying best bid/ask, spread, depth, etc. - Add orderbook module with state.rs and aggregator.rs - Export OrderbookAggregator, OrderbookSummary, OrderbookUpdate, etc. - Export PriceLevel type from ws module - Add orderbook_aggregator example demonstrating usage
1 parent 1f0cdaa commit 2893e92

File tree

6 files changed

+1364
-2
lines changed

6 files changed

+1364
-2
lines changed

examples/orderbook_aggregator.rs

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
//! Example: Orderbook aggregator for live orderbook state.
2+
//!
3+
//! This example demonstrates how to use the OrderbookAggregator to maintain
4+
//! live orderbook state from WebSocket delta updates.
5+
//!
6+
//! # Usage
7+
//!
8+
//! Set the following environment variables:
9+
//! - `KALSHI_API_KEY_ID`: Your Kalshi API key ID
10+
//! - `KALSHI_PRIVATE_KEY_PATH`: Path to your RSA private key PEM file
11+
//! - `KALSHI_ENV`: "demo" or "prod" (defaults to "demo")
12+
//!
13+
//! Then run:
14+
//! ```bash
15+
//! cargo run --example orderbook_aggregator
16+
//! ```
17+
18+
use std::time::Duration;
19+
use tokio::time::timeout;
20+
21+
use kalshi_trade_rs::{
22+
GetMarketsParams, KalshiClient, MarketFilterStatus, OrderbookAggregator,
23+
auth::KalshiConfig,
24+
ws::{Channel, KalshiStreamClient},
25+
};
26+
27+
#[tokio::main]
28+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
29+
// Load .env file
30+
dotenvy::dotenv().ok();
31+
32+
// Initialize tracing for logs
33+
tracing_subscriber::fmt()
34+
.with_env_filter(
35+
tracing_subscriber::EnvFilter::from_default_env()
36+
.add_directive("kalshi_trade_rs=debug".parse()?),
37+
)
38+
.init();
39+
40+
// Load configuration from environment
41+
let config = KalshiConfig::from_env()?;
42+
43+
println!(
44+
"Connecting to Kalshi {:?} environment...",
45+
config.environment
46+
);
47+
48+
// First, fetch open markets via REST API and select the most active ones
49+
println!("Fetching open markets...");
50+
51+
let rest_client = KalshiClient::new(config.clone())?;
52+
53+
let params = GetMarketsParams::new()
54+
.status(MarketFilterStatus::Open)
55+
.limit(200);
56+
57+
let markets_response = rest_client.get_markets_with_params(params).await?;
58+
59+
// Sort by volume (descending) and take the top 3 most active markets
60+
let mut markets = markets_response.markets;
61+
62+
if markets.is_empty() {
63+
println!("No open markets found!");
64+
return Ok(());
65+
}
66+
67+
markets.sort_by(|a, b| b.volume.unwrap_or(0).cmp(&a.volume.unwrap_or(0)));
68+
69+
let selected_markets: Vec<_> = markets.into_iter().take(3).collect();
70+
71+
println!(
72+
"Selected {} markets to subscribe to:",
73+
selected_markets.len()
74+
);
75+
76+
for market in &selected_markets {
77+
println!(
78+
" - {} (vol: {})",
79+
market.ticker,
80+
market.volume.unwrap_or(0)
81+
);
82+
}
83+
84+
let market_tickers: Vec<String> = selected_markets.iter().map(|m| m.ticker.clone()).collect();
85+
86+
// Connect to WebSocket
87+
let client = KalshiStreamClient::connect(&config).await?;
88+
let handle = client.handle();
89+
90+
println!("\nConnected! Subscribing to orderbook updates...");
91+
92+
// Subscribe to orderbook delta channel
93+
let ticker_refs: Vec<&str> = market_tickers.iter().map(|s| s.as_str()).collect();
94+
95+
let mut sub_handle = handle.clone();
96+
sub_handle
97+
.subscribe(Channel::OrderbookDelta, &ticker_refs)
98+
.await?;
99+
100+
println!(
101+
"Subscribed to {} markets",
102+
sub_handle.markets(Channel::OrderbookDelta).len()
103+
);
104+
105+
// Create orderbook aggregator
106+
let aggregator = OrderbookAggregator::new();
107+
108+
// Get update receiver for push-based updates (before spawning processor)
109+
let mut update_receiver = aggregator.update_receiver();
110+
let mut gap_receiver = aggregator.gap_receiver();
111+
112+
// Spawn aggregator processor with cloned handle
113+
let agg_clone = aggregator.clone();
114+
let process_handle = handle.clone();
115+
tokio::spawn(async move {
116+
agg_clone.process_updates(process_handle).await;
117+
});
118+
119+
// Spawn gap monitor
120+
tokio::spawn(async move {
121+
while let Ok(gap) = gap_receiver.recv().await {
122+
println!(
123+
"[GAP] {:?}: expected seq {}, got {}",
124+
gap.ticker.as_deref().unwrap_or("global"),
125+
gap.expected,
126+
gap.received
127+
);
128+
}
129+
});
130+
131+
println!("Waiting for updates (Ctrl+C to exit)...\n");
132+
133+
// Demonstrate both pull and push patterns
134+
let deadline = Duration::from_secs(60);
135+
let start = std::time::Instant::now();
136+
let mut last_summary_time = std::time::Instant::now();
137+
138+
loop {
139+
if start.elapsed() > deadline {
140+
println!("\nReached time limit, shutting down...");
141+
break;
142+
}
143+
144+
// Push-based: React to orderbook changes
145+
match timeout(Duration::from_millis(500), update_receiver.recv()).await {
146+
Ok(Ok(update)) => {
147+
if let Some(delta) = &update.delta {
148+
println!(
149+
"[DELTA] {} | {:?} @ {}¢: {:+} -> {} | spread: {:?}¢ | mid: {:.1}¢",
150+
update.ticker,
151+
delta.side,
152+
delta.price,
153+
delta.quantity_change,
154+
delta.new_quantity,
155+
update.summary.spread,
156+
update.summary.midpoint.unwrap_or(0.0)
157+
);
158+
} else {
159+
// Snapshot received
160+
println!(
161+
"[SNAPSHOT] {} | bid: {:?} | ask: {:?} | spread: {:?}¢",
162+
update.ticker,
163+
update.summary.best_bid,
164+
update.summary.best_ask,
165+
update.summary.spread,
166+
);
167+
}
168+
}
169+
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
170+
println!("[WARN] Dropped {} orderbook updates (slow consumer)", n);
171+
}
172+
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
173+
println!("[ERROR] Update channel closed");
174+
break;
175+
}
176+
Err(_) => {
177+
// Timeout - no updates in 500ms
178+
}
179+
}
180+
181+
// Pull-based: Print summary every 5 seconds
182+
if last_summary_time.elapsed() > Duration::from_secs(5) {
183+
last_summary_time = std::time::Instant::now();
184+
println!("\n--- Orderbook Summary ---");
185+
for ticker in &market_tickers {
186+
if let Some(summary) = aggregator.summary(ticker) {
187+
if summary.initialized {
188+
println!(
189+
"{}: bid={:?} ask={:?} spread={:?}¢ mid={:.1}¢ yes_liq={} no_liq={}",
190+
ticker,
191+
summary.best_bid,
192+
summary.best_ask,
193+
summary.spread,
194+
summary.midpoint.unwrap_or(0.0),
195+
summary.total_yes_liquidity,
196+
summary.total_no_liquidity
197+
);
198+
} else {
199+
println!("{}: waiting for snapshot...", ticker);
200+
}
201+
} else {
202+
println!("{}: not tracked", ticker);
203+
}
204+
}
205+
println!("-------------------------\n");
206+
}
207+
}
208+
209+
// Unsubscribe and shut down
210+
println!("Unsubscribing...");
211+
sub_handle.unsubscribe_all(Channel::OrderbookDelta).await?;
212+
213+
println!("Shutting down...");
214+
client.shutdown().await?;
215+
216+
println!("Done!");
217+
Ok(())
218+
}

src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub mod batch;
4444
pub mod client;
4545
pub mod error;
4646
pub mod models;
47+
pub mod orderbook;
4748
pub mod ws;
4849

4950
// Re-export commonly used types at the crate root
@@ -95,3 +96,8 @@ pub use batch::{
9596
AggregatedCancelResponse, AggregatedCreateResponse, BatchManager, BatchManagerBuilder,
9697
BatchOperationResult, RateLimitTier, RetryConfig,
9798
};
99+
100+
// Re-export orderbook aggregation types
101+
pub use orderbook::{
102+
OrderbookAggregator, OrderbookDelta, OrderbookSummary, OrderbookUpdate, SequenceGap,
103+
};

0 commit comments

Comments
 (0)