Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 40 additions & 20 deletions examples/orderbook_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,44 @@ use std::time::Duration;
use tokio::time::timeout;

use kalshi_trade_rs::{
GetMarketsParams, KalshiClient, MarketFilterStatus, OrderbookAggregator,
GetMarketsParams, KalshiClient, MarketFilterStatus, OrderbookAggregator, OrderbookLadder,
auth::KalshiConfig,
ws::{Channel, KalshiStreamClient},
};

fn print_ladder(ladder: &OrderbookLadder) {
println!("\n=== {} ===", ladder.ticker);
println!("{:<6} {:>8} {:<8}", "PRICE", "YES QTY", "NO QTY");
println!("{}", "-".repeat(26));

// Collect all price points from both sides
let mut prices: Vec<i64> = ladder
.yes_levels
.keys()
.chain(ladder.no_levels.keys())
.copied()
.collect();
prices.sort_unstable();
prices.dedup();

for price in prices.iter().rev() {
let yes_qty = ladder.yes_levels.get(price).copied().unwrap_or(0);
let no_qty = ladder.no_levels.get(price).copied().unwrap_or(0);
let yes_str = if yes_qty > 0 {
yes_qty.to_string()
} else {
"-".to_string()
};
let no_str = if no_qty > 0 {
no_qty.to_string()
} else {
"-".to_string()
};
println!("{:<6} {:>8} {:<8}", price, yes_str, no_str);
}
println!();
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load .env file
Expand Down Expand Up @@ -178,31 +211,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}

// Pull-based: Print summary every 5 seconds
// Pull-based: Print full orderbook ladder every 5 seconds
if last_summary_time.elapsed() > Duration::from_secs(5) {
last_summary_time = std::time::Instant::now();
println!("\n--- Orderbook Summary ---");
for ticker in &market_tickers {
if let Some(summary) = aggregator.summary(ticker) {
if summary.initialized {
println!(
"{}: bid={:?} ask={:?} spread={:?}¢ mid={:.1}¢ yes_liq={} no_liq={}",
ticker,
summary.best_bid,
summary.best_ask,
summary.spread,
summary.midpoint.unwrap_or(0.0),
summary.total_yes_liquidity,
summary.total_no_liquidity
);
} else {
println!("{}: waiting for snapshot...", ticker);
match aggregator.full_book(ticker) {
Some(ladder) if aggregator.is_initialized(ticker) => {
print_ladder(&ladder);
}
} else {
println!("{}: not tracked", ticker);
Some(_) => println!("{}: waiting for snapshot...", ticker),
None => println!("{}: not tracked", ticker),
}
}
println!("-------------------------\n");
}
}

Expand Down
158 changes: 158 additions & 0 deletions examples/stream_firehose.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//! Example: Stream ticker data from Kalshi WebSocket API.
//!
//! This example demonstrates how to connect to the Kalshi WebSocket API
//! and subscribe to all Ticker, Trade, MarketLifecycle, and Multivariate markets
//!
//! # Usage
//!
//! Set the following environment variables:
//! - `KALSHI_API_KEY_ID`: Your Kalshi API key ID
//! - `KALSHI_PRIVATE_KEY_PATH`: Path to your RSA private key PEM file
//! - `KALSHI_ENV`: "demo" or "prod" (defaults to "demo")
//!
//! Then run:
//! ```bash
//! cargo run --example stream_firehose
//! ```

use std::time::Duration;
use kalshi_trade_rs::{Channel, KalshiConfig, KalshiStreamClient, StreamMessage};
use tokio::time::timeout;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Load .env file
dotenvy::dotenv().ok();

// Initialize tracing for logs
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("kalshi_trade_rs=debug".parse()?),
)
.init();

// Load configuration from environment
let config = KalshiConfig::from_env()?;

println!(
"Connecting to Kalshi {:?} environment...",
config.environment
);

// Connect to WebSocket
let client = KalshiStreamClient::connect(&config).await?;
let mut handle = client.handle();

println!("\nConnected! Subscribing to ticker updates...");

handle.subscribe(Channel::Ticker, &[]).await?;
handle.subscribe(Channel::Trade, &[]).await?;
handle.subscribe(Channel::MarketLifecycle, &[]).await?;
handle.subscribe(Channel::Multivariate, &[]).await?;

println!("Subscribed to all Ticker, Trade, MarketLifecycle, and Multivariate markets",);

println!("Waiting for updates (Ctrl+C to exit)...\n");

// Process updates for 60 seconds or until interrupted
let deadline = Duration::from_secs(60);
let start = std::time::Instant::now();

loop {
if start.elapsed() > deadline {
println!("\nReached time limit, shutting down...");
break;
}

match timeout(Duration::from_secs(5), handle.update_receiver.recv()).await {
Ok(Ok(update)) => match &update.msg {
StreamMessage::Closed { reason } => {
println!("[CLOSED] {}", reason);
break;
}
StreamMessage::ConnectionLost { reason, .. } => {
println!("[CONNECTION LOST] {}", reason);
break;
}
StreamMessage::Ticker(ticker) => {
println!(
"[TICKER] {} | price: {}¢ | bid: {}¢ | ask: {}¢ | vol: {}",
ticker.market_ticker,
ticker.price,
ticker.yes_bid,
ticker.yes_ask,
ticker.volume
);
}
StreamMessage::Trade(trade) => {
println!(
"[TRADE] {} | {} contracts @ {}¢ | taker: {:?}",
trade.market_ticker, trade.count, trade.yes_price, trade.taker_side
);
},
StreamMessage::MarketLifecycle(lifecycle) => {
println!(
"[LIFECYCLE] {} | event={:?} | open={:?} | close={:?} | result={:?}",
lifecycle.market_ticker,
lifecycle.event_type,
lifecycle.open_ts,
lifecycle.close_ts,
lifecycle.result
);
if let Some(meta) = &lifecycle.additional_metadata {
println!(" name={:?} | title={:?}", meta.name, meta.title);
}
}
StreamMessage::MultivariateLookup(mv) => {
println!(
"[MULTIVARIATE] collection={} | event={} | market={}",
mv.collection_ticker, mv.event_ticker, mv.market_ticker
);
for leg in &mv.selected_markets {
println!(
" leg: market={:?} | side={:?}",
leg.market_ticker, leg.side
);
}
},
StreamMessage::MarketPosition(pos) => {
println!(
"[POSITION] {} | position={:?} | cost={:?} | pnl={:?}",
pos.market_ticker.as_deref().unwrap_or("?"),
pos.position,
pos.position_cost,
pos.realized_pnl
);
}
_ => {
println!("[OTHER] {:?}", update.msg);
}
},
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
println!("[WARN] Dropped {} messages (slow consumer)", n);
}
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
println!("[ERROR] Channel closed");
break;
}
Err(_) => {
// Timeout - no messages in 5 seconds
println!("[INFO] No updates in last 5 seconds...");
}
}
}

// Unsubscribe and shut down
println!("Unsubscribing...");
handle.unsubscribe_all(Channel::Ticker).await?;
handle.unsubscribe_all(Channel::Trade).await?;
handle.unsubscribe_all(Channel::MarketLifecycle).await?;
handle.unsubscribe_all(Channel::Multivariate).await?;

println!("Shutting down...");
client.shutdown().await?;

println!("Done!");
Ok(())
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,5 +99,6 @@ pub use batch::{

// Re-export orderbook aggregation types
pub use orderbook::{
OrderbookAggregator, OrderbookDelta, OrderbookSummary, OrderbookUpdate, SequenceGap,
OrderbookAggregator, OrderbookDelta, OrderbookLadder, OrderbookSummary, OrderbookUpdate,
SequenceGap,
};
28 changes: 27 additions & 1 deletion src/orderbook/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Orderbook aggregator for maintaining live orderbook state.

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::{Arc, RwLock};

use tokio::sync::broadcast;
Expand Down Expand Up @@ -70,6 +70,17 @@ pub struct SequenceGap {
pub received: i64,
}

/// The full orderbook ladder for a market.
#[derive(Debug, Clone)]
pub struct OrderbookLadder {
/// Market ticker.
pub ticker: String,
/// YES side price levels: price_cents -> quantity, sorted ascending.
pub yes_levels: BTreeMap<i64, i64>,
/// NO side price levels: price_cents -> quantity, sorted ascending.
pub no_levels: BTreeMap<i64, i64>,
}

/// Default channel capacity for update broadcasts.
const DEFAULT_UPDATE_CAPACITY: usize = 1024;

Expand Down Expand Up @@ -358,6 +369,21 @@ impl OrderbookAggregator {
.unwrap_or(0)
}

/// Get the full orderbook ladder for a market.
///
/// Returns all YES and NO price levels with their quantities.
/// Returns `None` if the market is not being tracked.
pub fn full_book(&self, ticker: &str) -> Option<OrderbookLadder> {
let state = self.state.read().expect("state lock poisoned");
let orderbook = state.get(ticker)?;

Some(OrderbookLadder {
ticker: ticker.to_string(),
yes_levels: orderbook.yes_levels().clone(),
no_levels: orderbook.no_levels().clone(),
})
}

/// Get the list of tracked markets.
pub fn tracked_markets(&self) -> Vec<String> {
let state = self.state.read().expect("state lock poisoned");
Expand Down
3 changes: 2 additions & 1 deletion src/orderbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ mod aggregator;
mod state;

pub use aggregator::{
OrderbookAggregator, OrderbookDelta, OrderbookSummary, OrderbookUpdate, SequenceGap,
OrderbookAggregator, OrderbookDelta, OrderbookLadder, OrderbookSummary, OrderbookUpdate,
SequenceGap,
};
2 changes: 0 additions & 2 deletions src/orderbook/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,11 @@ impl OrderbookState {
}

/// Get all YES levels (price -> quantity).
#[allow(dead_code)]
pub fn yes_levels(&self) -> &BTreeMap<i64, i64> {
&self.yes_levels
}

/// Get all NO levels (price -> quantity).
#[allow(dead_code)]
pub fn no_levels(&self) -> &BTreeMap<i64, i64> {
&self.no_levels
}
Expand Down
5 changes: 1 addition & 4 deletions src/ws/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ impl Channel {
/// require market tickers. Market data channels like `OrderbookDelta`,
/// `Ticker`, and `Trade` require at least one market ticker.
pub fn requires_market_ticker(&self) -> bool {
matches!(
self,
Self::OrderbookDelta | Self::Ticker | Self::Trade | Self::MarketLifecycle
)
matches!(self, Self::OrderbookDelta)
}

/// Returns the wire format name for this channel.
Expand Down
Loading