Skip to content

Commit 33b87c6

Browse files
authored
Merge pull request #23 from jaredmcqueen/main
removed Ticker, Trade, and MarketLifeCycle ticker constraints for str…
2 parents 2893e92 + e2d39af commit 33b87c6

File tree

7 files changed

+230
-29
lines changed

7 files changed

+230
-29
lines changed

examples/orderbook_aggregator.rs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,44 @@ use std::time::Duration;
1919
use tokio::time::timeout;
2020

2121
use kalshi_trade_rs::{
22-
GetMarketsParams, KalshiClient, MarketFilterStatus, OrderbookAggregator,
22+
GetMarketsParams, KalshiClient, MarketFilterStatus, OrderbookAggregator, OrderbookLadder,
2323
auth::KalshiConfig,
2424
ws::{Channel, KalshiStreamClient},
2525
};
2626

27+
fn print_ladder(ladder: &OrderbookLadder) {
28+
println!("\n=== {} ===", ladder.ticker);
29+
println!("{:<6} {:>8} {:<8}", "PRICE", "YES QTY", "NO QTY");
30+
println!("{}", "-".repeat(26));
31+
32+
// Collect all price points from both sides
33+
let mut prices: Vec<i64> = ladder
34+
.yes_levels
35+
.keys()
36+
.chain(ladder.no_levels.keys())
37+
.copied()
38+
.collect();
39+
prices.sort_unstable();
40+
prices.dedup();
41+
42+
for price in prices.iter().rev() {
43+
let yes_qty = ladder.yes_levels.get(price).copied().unwrap_or(0);
44+
let no_qty = ladder.no_levels.get(price).copied().unwrap_or(0);
45+
let yes_str = if yes_qty > 0 {
46+
yes_qty.to_string()
47+
} else {
48+
"-".to_string()
49+
};
50+
let no_str = if no_qty > 0 {
51+
no_qty.to_string()
52+
} else {
53+
"-".to_string()
54+
};
55+
println!("{:<6} {:>8} {:<8}", price, yes_str, no_str);
56+
}
57+
println!();
58+
}
59+
2760
#[tokio::main]
2861
async fn main() -> Result<(), Box<dyn std::error::Error>> {
2962
// Load .env file
@@ -178,31 +211,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
178211
}
179212
}
180213

181-
// Pull-based: Print summary every 5 seconds
214+
// Pull-based: Print full orderbook ladder every 5 seconds
182215
if last_summary_time.elapsed() > Duration::from_secs(5) {
183216
last_summary_time = std::time::Instant::now();
184-
println!("\n--- Orderbook Summary ---");
185217
for ticker in &market_tickers {
186-
if let Some(summary) = aggregator.summary(ticker) {
187-
if summary.initialized {
188-
println!(
189-
"{}: bid={:?} ask={:?} spread={:?}¢ mid={:.1}¢ yes_liq={} no_liq={}",
190-
ticker,
191-
summary.best_bid,
192-
summary.best_ask,
193-
summary.spread,
194-
summary.midpoint.unwrap_or(0.0),
195-
summary.total_yes_liquidity,
196-
summary.total_no_liquidity
197-
);
198-
} else {
199-
println!("{}: waiting for snapshot...", ticker);
218+
match aggregator.full_book(ticker) {
219+
Some(ladder) if aggregator.is_initialized(ticker) => {
220+
print_ladder(&ladder);
200221
}
201-
} else {
202-
println!("{}: not tracked", ticker);
222+
Some(_) => println!("{}: waiting for snapshot...", ticker),
223+
None => println!("{}: not tracked", ticker),
203224
}
204225
}
205-
println!("-------------------------\n");
206226
}
207227
}
208228

examples/stream_firehose.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
//! Example: Stream ticker data from Kalshi WebSocket API.
2+
//!
3+
//! This example demonstrates how to connect to the Kalshi WebSocket API
4+
//! and subscribe to all Ticker, Trade, MarketLifecycle, and Multivariate markets
5+
//!
6+
//! # Usage
7+
//!
8+
//! Set the following environment variables:
9+
//! - `KALSHI_API_KEY_ID`: Your Kalshi API key ID
10+
//! - `KALSHI_PRIVATE_KEY_PATH`: Path to your RSA private key PEM file
11+
//! - `KALSHI_ENV`: "demo" or "prod" (defaults to "demo")
12+
//!
13+
//! Then run:
14+
//! ```bash
15+
//! cargo run --example stream_firehose
16+
//! ```
17+
18+
use std::time::Duration;
19+
use kalshi_trade_rs::{Channel, KalshiConfig, KalshiStreamClient, StreamMessage};
20+
use tokio::time::timeout;
21+
22+
#[tokio::main]
23+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
24+
// Load .env file
25+
dotenvy::dotenv().ok();
26+
27+
// Initialize tracing for logs
28+
tracing_subscriber::fmt()
29+
.with_env_filter(
30+
tracing_subscriber::EnvFilter::from_default_env()
31+
.add_directive("kalshi_trade_rs=debug".parse()?),
32+
)
33+
.init();
34+
35+
// Load configuration from environment
36+
let config = KalshiConfig::from_env()?;
37+
38+
println!(
39+
"Connecting to Kalshi {:?} environment...",
40+
config.environment
41+
);
42+
43+
// Connect to WebSocket
44+
let client = KalshiStreamClient::connect(&config).await?;
45+
let mut handle = client.handle();
46+
47+
println!("\nConnected! Subscribing to ticker updates...");
48+
49+
handle.subscribe(Channel::Ticker, &[]).await?;
50+
handle.subscribe(Channel::Trade, &[]).await?;
51+
handle.subscribe(Channel::MarketLifecycle, &[]).await?;
52+
handle.subscribe(Channel::Multivariate, &[]).await?;
53+
54+
println!("Subscribed to all Ticker, Trade, MarketLifecycle, and Multivariate markets",);
55+
56+
println!("Waiting for updates (Ctrl+C to exit)...\n");
57+
58+
// Process updates for 60 seconds or until interrupted
59+
let deadline = Duration::from_secs(60);
60+
let start = std::time::Instant::now();
61+
62+
loop {
63+
if start.elapsed() > deadline {
64+
println!("\nReached time limit, shutting down...");
65+
break;
66+
}
67+
68+
match timeout(Duration::from_secs(5), handle.update_receiver.recv()).await {
69+
Ok(Ok(update)) => match &update.msg {
70+
StreamMessage::Closed { reason } => {
71+
println!("[CLOSED] {}", reason);
72+
break;
73+
}
74+
StreamMessage::ConnectionLost { reason, .. } => {
75+
println!("[CONNECTION LOST] {}", reason);
76+
break;
77+
}
78+
StreamMessage::Ticker(ticker) => {
79+
println!(
80+
"[TICKER] {} | price: {}¢ | bid: {}¢ | ask: {}¢ | vol: {}",
81+
ticker.market_ticker,
82+
ticker.price,
83+
ticker.yes_bid,
84+
ticker.yes_ask,
85+
ticker.volume
86+
);
87+
}
88+
StreamMessage::Trade(trade) => {
89+
println!(
90+
"[TRADE] {} | {} contracts @ {}¢ | taker: {:?}",
91+
trade.market_ticker, trade.count, trade.yes_price, trade.taker_side
92+
);
93+
},
94+
StreamMessage::MarketLifecycle(lifecycle) => {
95+
println!(
96+
"[LIFECYCLE] {} | event={:?} | open={:?} | close={:?} | result={:?}",
97+
lifecycle.market_ticker,
98+
lifecycle.event_type,
99+
lifecycle.open_ts,
100+
lifecycle.close_ts,
101+
lifecycle.result
102+
);
103+
if let Some(meta) = &lifecycle.additional_metadata {
104+
println!(" name={:?} | title={:?}", meta.name, meta.title);
105+
}
106+
}
107+
StreamMessage::MultivariateLookup(mv) => {
108+
println!(
109+
"[MULTIVARIATE] collection={} | event={} | market={}",
110+
mv.collection_ticker, mv.event_ticker, mv.market_ticker
111+
);
112+
for leg in &mv.selected_markets {
113+
println!(
114+
" leg: market={:?} | side={:?}",
115+
leg.market_ticker, leg.side
116+
);
117+
}
118+
},
119+
StreamMessage::MarketPosition(pos) => {
120+
println!(
121+
"[POSITION] {} | position={:?} | cost={:?} | pnl={:?}",
122+
pos.market_ticker.as_deref().unwrap_or("?"),
123+
pos.position,
124+
pos.position_cost,
125+
pos.realized_pnl
126+
);
127+
}
128+
_ => {
129+
println!("[OTHER] {:?}", update.msg);
130+
}
131+
},
132+
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
133+
println!("[WARN] Dropped {} messages (slow consumer)", n);
134+
}
135+
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
136+
println!("[ERROR] Channel closed");
137+
break;
138+
}
139+
Err(_) => {
140+
// Timeout - no messages in 5 seconds
141+
println!("[INFO] No updates in last 5 seconds...");
142+
}
143+
}
144+
}
145+
146+
// Unsubscribe and shut down
147+
println!("Unsubscribing...");
148+
handle.unsubscribe_all(Channel::Ticker).await?;
149+
handle.unsubscribe_all(Channel::Trade).await?;
150+
handle.unsubscribe_all(Channel::MarketLifecycle).await?;
151+
handle.unsubscribe_all(Channel::Multivariate).await?;
152+
153+
println!("Shutting down...");
154+
client.shutdown().await?;
155+
156+
println!("Done!");
157+
Ok(())
158+
}

src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,5 +99,6 @@ pub use batch::{
9999

100100
// Re-export orderbook aggregation types
101101
pub use orderbook::{
102-
OrderbookAggregator, OrderbookDelta, OrderbookSummary, OrderbookUpdate, SequenceGap,
102+
OrderbookAggregator, OrderbookDelta, OrderbookLadder, OrderbookSummary, OrderbookUpdate,
103+
SequenceGap,
103104
};

src/orderbook/aggregator.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Orderbook aggregator for maintaining live orderbook state.
22
3-
use std::collections::HashMap;
3+
use std::collections::{BTreeMap, HashMap};
44
use std::sync::{Arc, RwLock};
55

66
use tokio::sync::broadcast;
@@ -70,6 +70,17 @@ pub struct SequenceGap {
7070
pub received: i64,
7171
}
7272

73+
/// The full orderbook ladder for a market.
74+
#[derive(Debug, Clone)]
75+
pub struct OrderbookLadder {
76+
/// Market ticker.
77+
pub ticker: String,
78+
/// YES side price levels: price_cents -> quantity, sorted ascending.
79+
pub yes_levels: BTreeMap<i64, i64>,
80+
/// NO side price levels: price_cents -> quantity, sorted ascending.
81+
pub no_levels: BTreeMap<i64, i64>,
82+
}
83+
7384
/// Default channel capacity for update broadcasts.
7485
const DEFAULT_UPDATE_CAPACITY: usize = 1024;
7586

@@ -358,6 +369,21 @@ impl OrderbookAggregator {
358369
.unwrap_or(0)
359370
}
360371

372+
/// Get the full orderbook ladder for a market.
373+
///
374+
/// Returns all YES and NO price levels with their quantities.
375+
/// Returns `None` if the market is not being tracked.
376+
pub fn full_book(&self, ticker: &str) -> Option<OrderbookLadder> {
377+
let state = self.state.read().expect("state lock poisoned");
378+
let orderbook = state.get(ticker)?;
379+
380+
Some(OrderbookLadder {
381+
ticker: ticker.to_string(),
382+
yes_levels: orderbook.yes_levels().clone(),
383+
no_levels: orderbook.no_levels().clone(),
384+
})
385+
}
386+
361387
/// Get the list of tracked markets.
362388
pub fn tracked_markets(&self) -> Vec<String> {
363389
let state = self.state.read().expect("state lock poisoned");

src/orderbook/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,6 @@ mod aggregator;
7676
mod state;
7777

7878
pub use aggregator::{
79-
OrderbookAggregator, OrderbookDelta, OrderbookSummary, OrderbookUpdate, SequenceGap,
79+
OrderbookAggregator, OrderbookDelta, OrderbookLadder, OrderbookSummary, OrderbookUpdate,
80+
SequenceGap,
8081
};

src/orderbook/state.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,11 @@ impl OrderbookState {
175175
}
176176

177177
/// Get all YES levels (price -> quantity).
178-
#[allow(dead_code)]
179178
pub fn yes_levels(&self) -> &BTreeMap<i64, i64> {
180179
&self.yes_levels
181180
}
182181

183182
/// Get all NO levels (price -> quantity).
184-
#[allow(dead_code)]
185183
pub fn no_levels(&self) -> &BTreeMap<i64, i64> {
186184
&self.no_levels
187185
}

src/ws/channel.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,7 @@ impl Channel {
4040
/// require market tickers. Market data channels like `OrderbookDelta`,
4141
/// `Ticker`, and `Trade` require at least one market ticker.
4242
pub fn requires_market_ticker(&self) -> bool {
43-
matches!(
44-
self,
45-
Self::OrderbookDelta | Self::Ticker | Self::Trade | Self::MarketLifecycle
46-
)
43+
matches!(self, Self::OrderbookDelta)
4744
}
4845

4946
/// Returns the wire format name for this channel.

0 commit comments

Comments
 (0)