Skip to content

Commit d1c5cca

Browse files
authored
Merge pull request #25 from pbeets/fix/orderbook-delta-before-snapshot
fix: drop orderbook deltas before snapshot instead of creating ghost entries
2 parents 33b87c6 + 93511a8 commit d1c5cca

File tree

6 files changed

+114
-27
lines changed

6 files changed

+114
-27
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,26 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
66
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
12+
- `OrderbookAggregator` now drops delta messages that arrive before a snapshot
13+
instead of creating ghost orderbook entries. Previously, early deltas would
14+
insert empty uninitialized books into the state map, causing `full_book()` and
15+
`tracked_markets()` to return stale or invalid data.
16+
17+
### Changed
18+
19+
- **Breaking:** Removed `initialized` field from `OrderbookSummary`. The
20+
aggregator now guarantees that summaries are only produced for fully
21+
initialized orderbooks, making the field redundant. Remove any
22+
`summary.initialized` checks from your code.
23+
- `Channel::requires_market_ticker()` now returns `true` only for
24+
`OrderbookDelta`. Other market data channels (`Ticker`, `Trade`,
25+
`MarketLifecycle`, `Multivariate`) support subscribing with an empty ticker
26+
list to receive updates for all markets.
27+
828
## [0.2.0] - 2026-01-18
929

1030
### Added

examples/orderbook_aggregator.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
216216
last_summary_time = std::time::Instant::now();
217217
for ticker in &market_tickers {
218218
match aggregator.full_book(ticker) {
219-
Some(ladder) if aggregator.is_initialized(ticker) => {
220-
print_ladder(&ladder);
221-
}
222-
Some(_) => println!("{}: waiting for snapshot...", ticker),
223-
None => println!("{}: not tracked", ticker),
219+
Some(ladder) => print_ladder(&ladder),
220+
None => println!("{}: waiting for snapshot...", ticker),
224221
}
225222
}
226223
}

examples/stream_firehose.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
//! cargo run --example stream_firehose
1616
//! ```
1717
18-
use std::time::Duration;
1918
use kalshi_trade_rs::{Channel, KalshiConfig, KalshiStreamClient, StreamMessage};
19+
use std::time::Duration;
2020
use tokio::time::timeout;
2121

2222
#[tokio::main]
@@ -90,7 +90,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
9090
"[TRADE] {} | {} contracts @ {}¢ | taker: {:?}",
9191
trade.market_ticker, trade.count, trade.yes_price, trade.taker_side
9292
);
93-
},
93+
}
9494
StreamMessage::MarketLifecycle(lifecycle) => {
9595
println!(
9696
"[LIFECYCLE] {} | event={:?} | open={:?} | close={:?} | result={:?}",
@@ -115,7 +115,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
115115
leg.market_ticker, leg.side
116116
);
117117
}
118-
},
118+
}
119119
StreamMessage::MarketPosition(pos) => {
120120
println!(
121121
"[POSITION] {} | position={:?} | cost={:?} | pnl={:?}",

src/orderbook/aggregator.rs

Lines changed: 81 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@ pub struct OrderbookSummary {
2727
pub total_yes_liquidity: i64,
2828
/// Total NO side liquidity.
2929
pub total_no_liquidity: i64,
30-
/// Whether the orderbook has been initialized with a snapshot.
31-
pub initialized: bool,
3230
}
3331

3432
/// What changed in an orderbook update.
@@ -256,9 +254,14 @@ impl OrderbookAggregator {
256254
fn handle_delta(&self, delta: &crate::ws::OrderbookDeltaData, seq: Option<i64>) {
257255
let ticker = delta.market_ticker.clone();
258256

259-
let (new_qty, should_emit) = {
257+
let new_qty = {
260258
let mut state = self.state.write().expect("state lock poisoned");
261-
let orderbook = state.entry(ticker.clone()).or_default();
259+
let Some(orderbook) = state.get_mut(&ticker) else {
260+
return;
261+
};
262+
if !orderbook.is_initialized() {
263+
return;
264+
}
262265

263266
// Check sequence gap for this specific market
264267
if let (Some(last), Some(new)) = (orderbook.last_seq(), seq)
@@ -273,13 +276,10 @@ impl OrderbookAggregator {
273276
}
274277

275278
orderbook.update_seq(seq);
276-
let new_qty = orderbook.apply_delta(delta);
277-
278-
(new_qty, orderbook.is_initialized())
279+
orderbook.apply_delta(delta)
279280
};
280281

281-
// Only emit updates if we've seen a snapshot
282-
if should_emit && let Some(summary) = self.summary(&ticker) {
282+
if let Some(summary) = self.summary(&ticker) {
283283
let _ = self.update_sender.send(OrderbookUpdate {
284284
ticker,
285285
summary,
@@ -328,7 +328,6 @@ impl OrderbookAggregator {
328328
midpoint: orderbook.midpoint(),
329329
total_yes_liquidity: orderbook.total_yes_liquidity(),
330330
total_no_liquidity: orderbook.total_no_liquidity(),
331-
initialized: orderbook.is_initialized(),
332331
})
333332
}
334333

@@ -553,7 +552,6 @@ mod tests {
553552
assert_eq!(summary.midpoint, Some(46.0));
554553
assert_eq!(summary.total_yes_liquidity, 300);
555554
assert_eq!(summary.total_no_liquidity, 150);
556-
assert!(summary.initialized);
557555
}
558556

559557
#[test]
@@ -588,6 +586,78 @@ mod tests {
588586
assert_eq!(agg.depth_at_price("UNKNOWN", Side::Yes, 45), 0); // Unknown market
589587
}
590588

589+
#[test]
590+
fn test_full_book() {
591+
let agg = OrderbookAggregator::new();
592+
593+
assert!(agg.full_book("UNKNOWN").is_none());
594+
595+
// Snapshot creates initial book
596+
let snapshot = OrderbookSnapshotData {
597+
market_ticker: "TEST".to_string(),
598+
yes: Some(vec![[45, 100], [44, 200]]),
599+
yes_dollars: None,
600+
no: Some(vec![[55, 150]]),
601+
no_dollars: None,
602+
};
603+
agg.handle_snapshot(&snapshot);
604+
605+
// Apply deltas: add a new level, remove an existing one
606+
agg.handle_delta(
607+
&OrderbookDeltaData {
608+
market_ticker: "TEST".to_string(),
609+
price: 46,
610+
delta: 75,
611+
side: Side::Yes,
612+
price_dollars: None,
613+
client_order_id: None,
614+
},
615+
Some(1),
616+
);
617+
agg.handle_delta(
618+
&OrderbookDeltaData {
619+
market_ticker: "TEST".to_string(),
620+
price: 44,
621+
delta: -200,
622+
side: Side::Yes,
623+
price_dollars: None,
624+
client_order_id: None,
625+
},
626+
Some(2),
627+
);
628+
629+
let ladder = agg.full_book("TEST").unwrap();
630+
// Level 46 was added, level 44 was removed
631+
assert_eq!(ladder.yes_levels.len(), 2);
632+
assert_eq!(ladder.yes_levels[&46], 75);
633+
assert_eq!(ladder.yes_levels[&45], 100);
634+
assert!(!ladder.yes_levels.contains_key(&44));
635+
// NO side unchanged
636+
assert_eq!(ladder.no_levels[&55], 150);
637+
}
638+
639+
#[test]
640+
fn test_delta_before_snapshot_ignored() {
641+
let agg = OrderbookAggregator::new();
642+
643+
// Deltas before snapshot: no entry exists, delta is dropped
644+
agg.handle_delta(
645+
&OrderbookDeltaData {
646+
market_ticker: "TEST".to_string(),
647+
price: 45,
648+
delta: 100,
649+
side: Side::Yes,
650+
price_dollars: None,
651+
client_order_id: None,
652+
},
653+
Some(1),
654+
);
655+
656+
// No entry was created — full_book returns None
657+
assert!(agg.full_book("TEST").is_none());
658+
assert!(agg.tracked_markets().is_empty());
659+
}
660+
591661
#[test]
592662
fn test_clone_shares_state() {
593663
let agg1 = OrderbookAggregator::new();

src/orderbook/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,9 @@
3030
//! // Query orderbook state
3131
//! loop {
3232
//! if let Some(summary) = aggregator.summary("TICKER-1") {
33-
//! if summary.initialized {
34-
//! println!("Spread: {:?} cents", summary.spread);
35-
//! println!("Best bid: {:?}", summary.best_bid);
36-
//! println!("Best ask: {:?}", summary.best_ask);
37-
//! }
33+
//! println!("Spread: {:?} cents", summary.spread);
34+
//! println!("Best bid: {:?}", summary.best_bid);
35+
//! println!("Best ask: {:?}", summary.best_ask);
3836
//! }
3937
//! tokio::time::sleep(Duration::from_millis(100)).await;
4038
//! }

src/ws/channel.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,11 @@ impl Channel {
3636

3737
/// Returns true if this channel requires at least one market ticker.
3838
///
39-
/// Channels like `Fill` and `Communications` are user-scoped and don't
40-
/// require market tickers. Market data channels like `OrderbookDelta`,
41-
/// `Ticker`, and `Trade` require at least one market ticker.
39+
/// Only [`OrderbookDelta`](Self::OrderbookDelta) requires market tickers.
40+
/// Other market data channels (`Ticker`, `Trade`, `MarketLifecycle`,
41+
/// `Multivariate`) support subscribing with an empty ticker list to
42+
/// receive updates for all markets. User-scoped channels (`Fill`,
43+
/// `MarketPositions`, `Communications`) never require tickers.
4244
pub fn requires_market_ticker(&self) -> bool {
4345
matches!(self, Self::OrderbookDelta)
4446
}

0 commit comments

Comments
 (0)