Skip to content

Commit d9be91c

Browse files
committed
feat: add fixed-point decimal fields and WebSocket sharding support
Add support for latest Kalshi API changes: - Add _fp (fixed-point decimal string) fields to models: - Market: volume_fp, volume_24h_fp, open_interest_fp - Trade: count_fp - Candlestick: volume_fp, open_interest_fp - Order: fill_count_fp, remaining_count_fp, initial_count_fp - Fill: count_fp - Position: position_fp, resting_orders_count_fp, total_cost_shares_fp - Series: volume, volume_fp - Settlement: yes_count_fp, no_count_fp - Add include_latest_before_start parameter to candlestick queries - Add WebSocket communications channel sharding support: - CommunicationsSharding struct for shard_factor/shard_key config - subscribe_communications_sharded() method on KalshiStreamHandle - Updated build_subscribe() to include sharding parameters
1 parent dbbc2ed commit d9be91c

File tree

12 files changed

+254
-8
lines changed

12 files changed

+254
-8
lines changed

src/batch.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,8 +772,11 @@ mod tests {
772772
yes_price_dollars: None,
773773
no_price_dollars: None,
774774
fill_count: 0,
775+
fill_count_fp: None,
775776
remaining_count: 0,
777+
remaining_count_fp: None,
776778
initial_count: 10,
779+
initial_count_fp: None,
777780
taker_fees: None,
778781
maker_fees: None,
779782
taker_fill_cost: None,

src/models/fill.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ pub struct Fill {
2020
pub side: Side,
2121
pub action: Action,
2222
pub count: i64,
23+
/// Count (fixed-point decimal string).
24+
#[serde(default)]
25+
pub count_fp: Option<String>,
2326
/// Price in cents.
2427
pub yes_price: i64,
2528
/// Price in cents.

src/models/market.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,21 @@ pub struct Market {
206206
/// Total contracts traded.
207207
#[serde(default)]
208208
pub volume: Option<i64>,
209+
/// Total contracts traded (fixed-point decimal string).
210+
#[serde(default)]
211+
pub volume_fp: Option<String>,
209212
/// 24-hour trading volume.
210213
#[serde(default)]
211214
pub volume_24h: Option<i64>,
215+
/// 24-hour trading volume (fixed-point decimal string).
216+
#[serde(default)]
217+
pub volume_24h_fp: Option<String>,
212218
/// Contracts outstanding.
213219
#[serde(default)]
214220
pub open_interest: Option<i64>,
221+
/// Contracts outstanding (fixed-point decimal string).
222+
#[serde(default)]
223+
pub open_interest_fp: Option<String>,
215224

216225
/// Notional value per contract in dollars.
217226
#[serde(default)]
@@ -592,6 +601,9 @@ pub struct Trade {
592601
pub price: Option<f64>,
593602
/// Contract quantity.
594603
pub count: i64,
604+
/// Contract quantity (fixed-point decimal string).
605+
#[serde(default)]
606+
pub count_fp: Option<String>,
595607
/// Yes side price in cents.
596608
pub yes_price: i64,
597609
/// No side price in cents.
@@ -798,9 +810,15 @@ pub struct Candlestick {
798810
/// Trading volume during the period.
799811
#[serde(default)]
800812
pub volume: Option<i64>,
813+
/// Trading volume during the period (fixed-point decimal string).
814+
#[serde(default)]
815+
pub volume_fp: Option<String>,
801816
/// Open interest at end of period.
802817
#[serde(default)]
803818
pub open_interest: Option<i64>,
819+
/// Open interest at end of period (fixed-point decimal string).
820+
#[serde(default)]
821+
pub open_interest_fp: Option<String>,
804822
}
805823

806824
/// Response from GET /series/{series_ticker}/markets/{ticker}/candlesticks.
@@ -837,6 +855,9 @@ pub struct GetCandlesticksParams {
837855
pub end_ts: i64,
838856
/// Candlestick period interval.
839857
pub period_interval: CandlestickPeriod,
858+
/// Include synthetic candlestick before start_ts for price continuity.
859+
#[serde(skip_serializing_if = "Option::is_none")]
860+
pub include_latest_before_start: Option<bool>,
840861
}
841862

842863
impl GetCandlesticksParams {
@@ -868,15 +889,27 @@ impl GetCandlesticksParams {
868889
start_ts,
869890
end_ts,
870891
period_interval,
892+
include_latest_before_start: None,
871893
})
872894
}
873895

896+
/// Include synthetic candlestick before start_ts for price continuity.
897+
#[must_use]
898+
pub fn include_latest_before_start(mut self, include: bool) -> Self {
899+
self.include_latest_before_start = Some(include);
900+
self
901+
}
902+
874903
#[must_use]
875904
pub fn to_query_string(&self) -> String {
876905
let mut qb = QueryBuilder::new();
877906
qb.push("start_ts", self.start_ts);
878907
qb.push("end_ts", self.end_ts);
879908
qb.push("period_interval", self.period_interval.as_minutes());
909+
qb.push_opt(
910+
"include_latest_before_start",
911+
self.include_latest_before_start,
912+
);
880913
qb.build()
881914
}
882915
}
@@ -1091,13 +1124,20 @@ mod tests {
10911124
"status": "active",
10921125
"title": "Will Bitcoin reach $50,000?",
10931126
"volume": 1000,
1094-
"open_interest": 500
1127+
"volume_fp": "1000.5",
1128+
"volume_24h": 500,
1129+
"volume_24h_fp": "500.25",
1130+
"open_interest": 250,
1131+
"open_interest_fp": "250.125"
10951132
}"#;
10961133
let market: Market = serde_json::from_str(json).unwrap();
10971134
assert_eq!(market.ticker, "KXBTC-25JAN10-B50000");
10981135
assert_eq!(market.market_type, MarketType::Binary);
10991136
assert_eq!(market.status, MarketStatus::Active);
11001137
assert_eq!(market.volume, Some(1000));
1138+
assert_eq!(market.volume_fp, Some("1000.5".to_string()));
1139+
assert_eq!(market.volume_24h_fp, Some("500.25".to_string()));
1140+
assert_eq!(market.open_interest_fp, Some("250.125".to_string()));
11011141
}
11021142

11031143
#[test]
@@ -1152,6 +1192,7 @@ mod tests {
11521192
"trade_id": "abc123",
11531193
"ticker": "KXBTC-25JAN10-B50000",
11541194
"count": 10,
1195+
"count_fp": "10.5",
11551196
"yes_price": 50,
11561197
"no_price": 50,
11571198
"taker_side": "yes",
@@ -1160,6 +1201,7 @@ mod tests {
11601201
let trade: Trade = serde_json::from_str(json).unwrap();
11611202
assert_eq!(trade.trade_id, "abc123");
11621203
assert_eq!(trade.count, 10);
1204+
assert_eq!(trade.count_fp, Some("10.5".to_string()));
11631205
assert_eq!(trade.taker_side, TakerSide::Yes);
11641206
}
11651207

src/models/order.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,17 @@ pub struct Order {
4242
#[serde(default)]
4343
pub no_price_dollars: Option<String>,
4444
pub fill_count: i64,
45+
/// Fill count (fixed-point decimal string).
46+
#[serde(default)]
47+
pub fill_count_fp: Option<String>,
4548
pub remaining_count: i64,
49+
/// Remaining count (fixed-point decimal string).
50+
#[serde(default)]
51+
pub remaining_count_fp: Option<String>,
4652
pub initial_count: i64,
53+
/// Initial count (fixed-point decimal string).
54+
#[serde(default)]
55+
pub initial_count_fp: Option<String>,
4756
/// Fees in cents.
4857
#[serde(default)]
4958
pub taker_fees: Option<i64>,

src/models/position.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ pub struct MarketPosition {
1414
pub total_traded_dollars: Option<String>,
1515
/// Negative = NO position, positive = YES position.
1616
pub position: i64,
17+
/// Position (fixed-point decimal string).
18+
#[serde(default)]
19+
pub position_fp: Option<String>,
1720
/// Position cost in cents.
1821
pub market_exposure: i64,
1922
#[serde(default)]
@@ -23,6 +26,9 @@ pub struct MarketPosition {
2326
#[serde(default)]
2427
pub realized_pnl_dollars: Option<String>,
2528
pub resting_orders_count: i64,
29+
/// Resting orders count (fixed-point decimal string).
30+
#[serde(default)]
31+
pub resting_orders_count_fp: Option<String>,
2632
/// In cents.
2733
pub fees_paid: i64,
2834
#[serde(default)]
@@ -40,6 +46,9 @@ pub struct EventPosition {
4046
#[serde(default)]
4147
pub total_cost_dollars: Option<String>,
4248
pub total_cost_shares: i64,
49+
/// Total cost shares (fixed-point decimal string).
50+
#[serde(default)]
51+
pub total_cost_shares_fp: Option<String>,
4352
/// In cents.
4453
pub event_exposure: i64,
4554
#[serde(default)]

src/models/series.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,14 @@ pub struct Series {
100100
pub frequency: String,
101101
/// Title of the series.
102102
pub title: String,
103+
/// Total contracts traded across all events in this series.
104+
/// Only present when `include_volume=true` in the request.
105+
#[serde(default)]
106+
pub volume: Option<i64>,
107+
/// Total contracts traded (fixed-point decimal string).
108+
/// Only present when `include_volume=true` in the request.
109+
#[serde(default)]
110+
pub volume_fp: Option<String>,
103111
}
104112

105113
/// Response from GET /series/{series_ticker}.

src/models/settlement.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,16 @@ pub struct Settlement {
1616
pub market_result: MarketResult,
1717
/// Number of YES contracts held at settlement.
1818
pub yes_count: i64,
19+
/// Number of YES contracts (fixed-point decimal string).
20+
#[serde(default)]
21+
pub yes_count_fp: Option<String>,
1922
/// Total cost of YES contracts in cents.
2023
pub yes_total_cost: i64,
2124
/// Number of NO contracts held at settlement.
2225
pub no_count: i64,
26+
/// Number of NO contracts (fixed-point decimal string).
27+
#[serde(default)]
28+
pub no_count_fp: Option<String>,
2329
/// Total cost of NO contracts in cents.
2430
pub no_total_cost: i64,
2531
/// Revenue from settlement in cents.

src/ws.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ use std::time::Duration;
170170

171171
pub use channel::Channel;
172172
pub use client::{KalshiStreamClient, KalshiStreamHandle};
173-
pub use command::{SubscribeResult, UnsubscribeResult};
173+
pub use command::{CommunicationsSharding, SubscribeResult, UnsubscribeResult};
174174
pub use message::{
175175
Action, FillData, MarketLifecycleData, MarketLifecycleEventType, MarketPositionData,
176176
MultivariateLookupData, MveLeg, OrderbookDeltaData, OrderbookSnapshotData, Side, StreamMessage,

src/ws/client.rs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,11 +613,77 @@ impl KalshiStreamHandle {
613613

614614
// --- Internal methods ---
615615

616+
/// Subscribe to the communications channel with sharding for high-throughput RFQ/quote traffic.
617+
///
618+
/// Sharding allows distributing communications traffic across multiple connections.
619+
/// Each connection specifies a shard_factor (total number of shards) and shard_key
620+
/// (which shard this connection handles).
621+
///
622+
/// # Arguments
623+
///
624+
/// * `sharding` - Sharding configuration specifying shard_factor and shard_key.
625+
///
626+
/// # Example
627+
///
628+
/// ```no_run
629+
/// # use kalshi_trade_rs::ws::{Channel, KalshiStreamHandle, CommunicationsSharding};
630+
/// # async fn example(handle: &mut KalshiStreamHandle) -> Result<(), Box<dyn std::error::Error>> {
631+
/// // Set up 4 shards, this connection handles shard 0
632+
/// let sharding = CommunicationsSharding::new(4, 0);
633+
/// handle.subscribe_communications_sharded(sharding).await?;
634+
/// # Ok(())
635+
/// # }
636+
/// ```
637+
pub async fn subscribe_communications_sharded(
638+
&mut self,
639+
sharding: super::command::CommunicationsSharding,
640+
) -> Result<()> {
641+
let result = self
642+
.subscribe_raw_with_sharding(&[Channel::Communications], &[], Some(sharding))
643+
.await?;
644+
645+
// Check for failures
646+
if !result.failed.is_empty() {
647+
let errors: Vec<String> = result
648+
.failed
649+
.iter()
650+
.map(|e| format!("{}: {}", e.code, e.message))
651+
.collect();
652+
return Err(Error::Api(errors.join("; ")));
653+
}
654+
655+
// Update state if successful
656+
if let Some(sub) = result.successful.first() {
657+
let mut subs = self
658+
.subscriptions
659+
.write()
660+
.expect("subscription lock poisoned");
661+
subs.entry(Channel::Communications)
662+
.or_insert(SubscriptionState {
663+
sid: sub.sid,
664+
markets: std::collections::HashSet::new(),
665+
});
666+
}
667+
668+
Ok(())
669+
}
670+
616671
/// Raw subscribe without local state management.
617672
async fn subscribe_raw(
618673
&self,
619674
channels: &[Channel],
620675
markets: &[&str],
676+
) -> Result<SubscribeResult> {
677+
self.subscribe_raw_with_sharding(channels, markets, None)
678+
.await
679+
}
680+
681+
/// Raw subscribe with optional sharding support.
682+
async fn subscribe_raw_with_sharding(
683+
&self,
684+
channels: &[Channel],
685+
markets: &[&str],
686+
sharding: Option<super::command::CommunicationsSharding>,
621687
) -> Result<SubscribeResult> {
622688
let (tx, rx) = oneshot::channel();
623689

@@ -629,6 +695,7 @@ impl KalshiStreamHandle {
629695
let cmd = StreamCommand::Subscribe {
630696
channels: channel_strings,
631697
market_tickers: market_strings,
698+
sharding,
632699
response: tx,
633700
};
634701

@@ -786,6 +853,7 @@ mod tests {
786853
channels,
787854
market_tickers,
788855
response,
856+
..
789857
}) => {
790858
// Verify correct command type and params
791859
assert_eq!(channels, vec!["ticker"]);

0 commit comments

Comments
 (0)