Skip to content

Commit 826c559

Browse files
authored
Merge pull request #31 from jaredmcqueen/implement-ws-user-orders
Implement ws user orders
2 parents 12ad5b6 + 92903c8 commit 826c559

File tree

3 files changed

+163
-1
lines changed

3 files changed

+163
-1
lines changed

examples/stream_user_orders.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
//! Example: Stream user order updates via the `user_orders` channel.
2+
//!
3+
//! This example demonstrates subscribing to the authenticated `user_orders`
4+
//! channel, which delivers real-time notifications whenever your orders are
5+
//! created, filled, canceled, or otherwise updated.
6+
//!
7+
//! # Usage
8+
//!
9+
//! ```bash
10+
//! KALSHI_API_KEY_ID=your_key KALSHI_PRIVATE_KEY_PATH=path/to/key.pem \
11+
//! cargo run --example stream_user_orders
12+
//! ```
13+
//!
14+
//! To filter updates to specific markets, pass tickers:
15+
//!
16+
//! ```bash
17+
//! KALSHI_API_KEY_ID=your_key KALSHI_PRIVATE_KEY_PATH=path/to/key.pem \
18+
//! MARKET_TICKERS="INXD-25JAN17-B5955,KXBTC-25DEC31-100000" \
19+
//! cargo run --example stream_user_orders
20+
//! ```
21+
//!
22+
//! # Notes
23+
//!
24+
//! - Requires valid API credentials
25+
//! - Omit `MARKET_TICKERS` to receive updates for all your orders
26+
//! - For testing, run the `trading.rs` example in another terminal
27+
28+
use std::time::Duration;
29+
use tokio::time::timeout;
30+
31+
use kalshi_trade_rs::{
32+
auth::KalshiConfig,
33+
ws::{Channel, KalshiStreamClient, StreamMessage},
34+
};
35+
36+
#[tokio::main]
37+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
38+
dotenvy::dotenv().ok();
39+
40+
tracing_subscriber::fmt()
41+
.with_env_filter(
42+
tracing_subscriber::EnvFilter::from_default_env()
43+
.add_directive("kalshi_trade_rs=debug".parse()?),
44+
)
45+
.init();
46+
47+
let config = KalshiConfig::from_env()?;
48+
println!(
49+
"Connecting to Kalshi {:?} environment...",
50+
config.environment
51+
);
52+
53+
let client = KalshiStreamClient::connect(&config).await?;
54+
let mut handle = client.handle();
55+
println!("Connected!\n");
56+
57+
// Optionally filter to specific markets via MARKET_TICKERS env var.
58+
// Leave unset to receive updates for all your orders.
59+
let tickers_env = std::env::var("MARKET_TICKERS").unwrap_or_default();
60+
let markets: Vec<&str> = if tickers_env.is_empty() {
61+
vec![]
62+
} else {
63+
tickers_env.split(',').map(str::trim).collect()
64+
};
65+
66+
if markets.is_empty() {
67+
println!("Subscribing to user_orders (all markets)...");
68+
} else {
69+
println!("Subscribing to user_orders for: {:?}", markets);
70+
}
71+
72+
handle.subscribe(Channel::UserOrders, &markets).await?;
73+
println!("Subscribed!\n");
74+
75+
println!("Waiting for order updates (60 seconds)...");
76+
println!("Tip: Place or cancel orders in another terminal to see updates\n");
77+
78+
let deadline = Duration::from_secs(300);
79+
let start = std::time::Instant::now();
80+
81+
loop {
82+
if start.elapsed() > deadline {
83+
println!("\nReached time limit, shutting down...");
84+
break;
85+
}
86+
87+
match timeout(Duration::from_secs(10), handle.update_receiver.recv()).await {
88+
Ok(Ok(update)) => match &update.msg {
89+
StreamMessage::Closed { reason } => {
90+
println!("[CLOSED] {}", reason);
91+
break;
92+
}
93+
StreamMessage::ConnectionLost { reason, .. } => {
94+
println!("[CONNECTION LOST] {}", reason);
95+
break;
96+
}
97+
StreamMessage::UserOrder(order) => {
98+
println!(
99+
"[ORDER] {} | id={} | status={} | side={} | price={} | initial={} remaining={} filled={}",
100+
order.ticker.as_deref().unwrap_or("?"),
101+
order.order_id,
102+
order
103+
.status
104+
.as_ref()
105+
.map(|s| format!("{:?}", s))
106+
.unwrap_or_else(|| "?".into()),
107+
order
108+
.side
109+
.as_ref()
110+
.map(|s| format!("{:?}", s))
111+
.unwrap_or_else(|| "?".into()),
112+
order.yes_price_dollars.as_deref().unwrap_or("?"),
113+
order.initial_count_fp.as_deref().unwrap_or("?"),
114+
order.remaining_count_fp.as_deref().unwrap_or("?"),
115+
order.fill_count_fp.as_deref().unwrap_or("?"),
116+
);
117+
if let Some(client_id) = &order.client_order_id {
118+
println!(" client_order_id={}", client_id);
119+
}
120+
}
121+
StreamMessage::Unsubscribed => {
122+
println!("[UNSUBSCRIBED] sid={}", update.sid);
123+
}
124+
_ => {
125+
println!("[OTHER] channel={} sid={}", update.channel, update.sid);
126+
}
127+
},
128+
Ok(Err(tokio::sync::broadcast::error::RecvError::Lagged(n))) => {
129+
println!("[WARN] Dropped {} messages (slow consumer)", n);
130+
}
131+
Ok(Err(tokio::sync::broadcast::error::RecvError::Closed)) => {
132+
println!("[ERROR] Channel closed");
133+
break;
134+
}
135+
Err(_) => {
136+
println!("[INFO] No updates in last 10 seconds (waiting for activity)...");
137+
}
138+
}
139+
}
140+
141+
println!("Unsubscribing...");
142+
handle.unsubscribe_all(Channel::UserOrders).await?;
143+
144+
println!("Shutting down...");
145+
client.shutdown().await?;
146+
147+
println!("Done!");
148+
Ok(())
149+
}

src/models/order.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,7 @@ pub struct QueuePosition {
910910
#[derive(Debug, Clone, Serialize, Deserialize)]
911911
pub struct QueuePositionsResponse {
912912
/// Queue positions for the requested orders.
913+
#[serde(default, deserialize_with = "null_as_empty_vec::deserialize")]
913914
pub queue_positions: Vec<QueuePosition>,
914915
}
915916

@@ -974,6 +975,18 @@ impl GetQueuePositionsParams {
974975
}
975976
}
976977

978+
mod null_as_empty_vec {
979+
use serde::{Deserialize, Deserializer};
980+
981+
pub fn deserialize<'de, D, T>(deserializer: D) -> Result<Vec<T>, D::Error>
982+
where
983+
D: Deserializer<'de>,
984+
T: Deserialize<'de>,
985+
{
986+
Ok(Option::<Vec<T>>::deserialize(deserializer)?.unwrap_or_default())
987+
}
988+
}
989+
977990
#[cfg(test)]
978991
mod tests {
979992
use super::*;

src/ws/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -732,7 +732,7 @@ impl StreamMessage {
732732
"order_group_updates" => serde_json::from_value::<OrderGroupUpdateData>(value)
733733
.map(StreamMessage::OrderGroupUpdate),
734734
// User order update notifications
735-
"user_orders" => serde_json::from_value::<UserOrderData>(value)
735+
"user_order" | "user_orders" => serde_json::from_value::<UserOrderData>(value)
736736
.map(|d| StreamMessage::UserOrder(Box::new(d))),
737737
// Multivariate lookup notifications
738738
"multivariate_lookup" => serde_json::from_value::<MultivariateLookupData>(value)

0 commit comments

Comments
 (0)