Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Fixed

- `OrderbookAggregator` now drops delta messages that arrive before a snapshot
instead of creating ghost orderbook entries. Previously, early deltas would
insert empty uninitialized books into the state map, causing `full_book()` and
`tracked_markets()` to return stale or invalid data.

### Changed

- **Breaking:** Removed `initialized` field from `OrderbookSummary`. The
aggregator now guarantees that summaries are only produced for fully
initialized orderbooks, making the field redundant. Remove any
`summary.initialized` checks from your code.
- `Channel::requires_market_ticker()` now returns `true` only for
`OrderbookDelta`. Other market data channels (`Ticker`, `Trade`,
`MarketLifecycle`, `Multivariate`) support subscribing with an empty ticker
list to receive updates for all markets.

## [0.2.0] - 2026-01-18

### Added
Expand Down
7 changes: 2 additions & 5 deletions examples/orderbook_aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
last_summary_time = std::time::Instant::now();
for ticker in &market_tickers {
match aggregator.full_book(ticker) {
Some(ladder) if aggregator.is_initialized(ticker) => {
print_ladder(&ladder);
}
Some(_) => println!("{}: waiting for snapshot...", ticker),
None => println!("{}: not tracked", ticker),
Some(ladder) => print_ladder(&ladder),
None => println!("{}: waiting for snapshot...", ticker),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions examples/stream_firehose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
//! cargo run --example stream_firehose
//! ```

use std::time::Duration;
use kalshi_trade_rs::{Channel, KalshiConfig, KalshiStreamClient, StreamMessage};
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
Expand Down Expand Up @@ -90,7 +90,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"[TRADE] {} | {} contracts @ {}¢ | taker: {:?}",
trade.market_ticker, trade.count, trade.yes_price, trade.taker_side
);
},
}
StreamMessage::MarketLifecycle(lifecycle) => {
println!(
"[LIFECYCLE] {} | event={:?} | open={:?} | close={:?} | result={:?}",
Expand All @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
leg.market_ticker, leg.side
);
}
},
}
StreamMessage::MarketPosition(pos) => {
println!(
"[POSITION] {} | position={:?} | cost={:?} | pnl={:?}",
Expand Down
92 changes: 81 additions & 11 deletions src/orderbook/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub struct OrderbookSummary {
pub total_yes_liquidity: i64,
/// Total NO side liquidity.
pub total_no_liquidity: i64,
/// Whether the orderbook has been initialized with a snapshot.
pub initialized: bool,
}

/// What changed in an orderbook update.
Expand Down Expand Up @@ -256,9 +254,14 @@ impl OrderbookAggregator {
fn handle_delta(&self, delta: &crate::ws::OrderbookDeltaData, seq: Option<i64>) {
let ticker = delta.market_ticker.clone();

let (new_qty, should_emit) = {
let new_qty = {
let mut state = self.state.write().expect("state lock poisoned");
let orderbook = state.entry(ticker.clone()).or_default();
let Some(orderbook) = state.get_mut(&ticker) else {
return;
};
if !orderbook.is_initialized() {
return;
}

// Check sequence gap for this specific market
if let (Some(last), Some(new)) = (orderbook.last_seq(), seq)
Expand All @@ -273,13 +276,10 @@ impl OrderbookAggregator {
}

orderbook.update_seq(seq);
let new_qty = orderbook.apply_delta(delta);

(new_qty, orderbook.is_initialized())
orderbook.apply_delta(delta)
};

// Only emit updates if we've seen a snapshot
if should_emit && let Some(summary) = self.summary(&ticker) {
if let Some(summary) = self.summary(&ticker) {
let _ = self.update_sender.send(OrderbookUpdate {
ticker,
summary,
Expand Down Expand Up @@ -328,7 +328,6 @@ impl OrderbookAggregator {
midpoint: orderbook.midpoint(),
total_yes_liquidity: orderbook.total_yes_liquidity(),
total_no_liquidity: orderbook.total_no_liquidity(),
initialized: orderbook.is_initialized(),
})
}

Expand Down Expand Up @@ -553,7 +552,6 @@ mod tests {
assert_eq!(summary.midpoint, Some(46.0));
assert_eq!(summary.total_yes_liquidity, 300);
assert_eq!(summary.total_no_liquidity, 150);
assert!(summary.initialized);
}

#[test]
Expand Down Expand Up @@ -588,6 +586,78 @@ mod tests {
assert_eq!(agg.depth_at_price("UNKNOWN", Side::Yes, 45), 0); // Unknown market
}

#[test]
fn test_full_book() {
let agg = OrderbookAggregator::new();

assert!(agg.full_book("UNKNOWN").is_none());

// Snapshot creates initial book
let snapshot = OrderbookSnapshotData {
market_ticker: "TEST".to_string(),
yes: Some(vec![[45, 100], [44, 200]]),
yes_dollars: None,
no: Some(vec![[55, 150]]),
no_dollars: None,
};
agg.handle_snapshot(&snapshot);

// Apply deltas: add a new level, remove an existing one
agg.handle_delta(
&OrderbookDeltaData {
market_ticker: "TEST".to_string(),
price: 46,
delta: 75,
side: Side::Yes,
price_dollars: None,
client_order_id: None,
},
Some(1),
);
agg.handle_delta(
&OrderbookDeltaData {
market_ticker: "TEST".to_string(),
price: 44,
delta: -200,
side: Side::Yes,
price_dollars: None,
client_order_id: None,
},
Some(2),
);

let ladder = agg.full_book("TEST").unwrap();
// Level 46 was added, level 44 was removed
assert_eq!(ladder.yes_levels.len(), 2);
assert_eq!(ladder.yes_levels[&46], 75);
assert_eq!(ladder.yes_levels[&45], 100);
assert!(!ladder.yes_levels.contains_key(&44));
// NO side unchanged
assert_eq!(ladder.no_levels[&55], 150);
}

#[test]
fn test_delta_before_snapshot_ignored() {
let agg = OrderbookAggregator::new();

// Deltas before snapshot: no entry exists, delta is dropped
agg.handle_delta(
&OrderbookDeltaData {
market_ticker: "TEST".to_string(),
price: 45,
delta: 100,
side: Side::Yes,
price_dollars: None,
client_order_id: None,
},
Some(1),
);

// No entry was created — full_book returns None
assert!(agg.full_book("TEST").is_none());
assert!(agg.tracked_markets().is_empty());
}

#[test]
fn test_clone_shares_state() {
let agg1 = OrderbookAggregator::new();
Expand Down
8 changes: 3 additions & 5 deletions src/orderbook/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@
//! // Query orderbook state
//! loop {
//! if let Some(summary) = aggregator.summary("TICKER-1") {
//! if summary.initialized {
//! println!("Spread: {:?} cents", summary.spread);
//! println!("Best bid: {:?}", summary.best_bid);
//! println!("Best ask: {:?}", summary.best_ask);
//! }
//! println!("Spread: {:?} cents", summary.spread);
//! println!("Best bid: {:?}", summary.best_bid);
//! println!("Best ask: {:?}", summary.best_ask);
//! }
//! tokio::time::sleep(Duration::from_millis(100)).await;
//! }
Expand Down
8 changes: 5 additions & 3 deletions src/ws/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ impl Channel {

/// Returns true if this channel requires at least one market ticker.
///
/// Channels like `Fill` and `Communications` are user-scoped and don't
/// require market tickers. Market data channels like `OrderbookDelta`,
/// `Ticker`, and `Trade` require at least one market ticker.
/// Only [`OrderbookDelta`](Self::OrderbookDelta) requires market tickers.
/// Other market data channels (`Ticker`, `Trade`, `MarketLifecycle`,
/// `Multivariate`) support subscribing with an empty ticker list to
/// receive updates for all markets. User-scoped channels (`Fill`,
/// `MarketPositions`, `Communications`) never require tickers.
pub fn requires_market_ticker(&self) -> bool {
matches!(self, Self::OrderbookDelta)
}
Expand Down