Skip to content

Commit c44c605

Browse files
committed
Add msgbus OptionGreeks binary codecs
- Add SBE and Cap'n Proto schemas for `OptionGreeks` - Route `OptionGreeks` egress and ingress through binary codecs - Reorder `OptionGreeks` wire and Python enum positions - Cover binary egress, republish, and optional-field round trips - Call out wire and Python enum breaks in release notes
1 parent 6ce3ce7 commit c44c605

28 files changed

Lines changed: 1706 additions & 268 deletions

File tree

RELEASES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ This release includes many breaking changes across the user-facing Rust v2 APIs.
2020
- Added `RedisCacheConfig`, `PostgresCacheConfig`, and `RedisMessageBusConfig` for Rust factories
2121
- Added SEC1 EC private key support to socket TLS configuration (Rust)
2222
- Added SBE and Cap'n Proto encodings for Rust-native message bus publishers
23+
- Added SBE and Cap'n Proto external msgbus payload support for `OptionGreeks`
2324
- Added `order_position_index` Postgres table for the order-position index; run `make init-db` to migrate
2425
- Added negative price support for `Commodity` instruments in risk checks (#2330), thanks for reporting @fabz1
2526
- Added `add_native_exec_algorithm` and `ExecutionAlgorithmConfig` bindings to the Python v2 backtest engine
@@ -59,6 +60,7 @@ This release includes many breaking changes across the user-facing Rust v2 APIs.
5960
- Changed Parquet catalog write APIs to take borrowed slices instead of owned `Vec` (Rust) (#4296), thanks @sunlei
6061
- Changed `PoolProfiler.price_sqrt_ratio_x96` to return `int` instead of `str`
6162
- Changed PyO3 `DataActor`/`Strategy` historical request `start`/`end` to require UTC datetimes
63+
- Changed Python `NautilusDataType` enum order to put `OptionGreeks` before `InstrumentStatus`
6264
- Changed Redis cache account/order/position storage to event logs; clear old typed state (Rust)
6365
- Changed cache database and message bus backing construction to use factory-owned config structs (Rust)
6466
- Changed Rust actor `self.clock()` to return `ClockApi`; call methods directly instead of borrowing
@@ -68,6 +70,8 @@ This release includes many breaking changes across the user-facing Rust v2 APIs.
6870
- Changed `OrderAny::from_events` to return `OrderReplayError` instead of `anyhow::Error` (Rust)
6971
- Changed `OrderList::validate` to return `OrderListValidationError` instead of `anyhow::Error` (Rust)
7072
- Changed Rust message bus subscriber-count and presence queries to return invalid-topic errors instead of panicking
73+
- Changed Cap'n Proto `DataAny` ordinals to put `OptionGreeks` before instrument schemas
74+
- Changed SBE `DataAny` variants and template IDs to put `OptionGreeks` before instrument schemas
7175
- Changed `SyntheticInstrument` fallible methods to return `SyntheticInstrumentError` instead of `anyhow::Error` (Rust)
7276
- Changed tick scheme constructors and parsing to return `TickSchemeError` instead of `anyhow::Error` (Rust)
7377
- Changed WebSocket and socket `reconnect_timeout_ms` to bound only connection establishment (Rust)

crates/backtest/src/python/engine.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1191,14 +1191,14 @@ fn pyobject_to_data(_py: Python, obj: &Bound<'_, PyAny>) -> PyResult<Data> {
11911191
return Ok(Data::FundingRateUpdate(funding_rate));
11921192
}
11931193

1194-
if let Ok(status) = obj.extract::<InstrumentStatus>() {
1195-
return Ok(Data::InstrumentStatus(status));
1196-
}
1197-
11981194
if let Ok(greeks) = obj.extract::<OptionGreeks>() {
11991195
return Ok(Data::OptionGreeks(greeks));
12001196
}
12011197

1198+
if let Ok(status) = obj.extract::<InstrumentStatus>() {
1199+
return Ok(Data::InstrumentStatus(status));
1200+
}
1201+
12021202
if let Ok(close) = obj.extract::<InstrumentClose>() {
12031203
return Ok(Data::InstrumentClose(close));
12041204
}
@@ -1237,14 +1237,14 @@ fn pyobject_to_data(_py: Python, obj: &Bound<'_, PyAny>) -> PyResult<Data> {
12371237
return Ok(Data::FundingRateUpdate(funding_rate));
12381238
}
12391239

1240-
if let Ok(status) = InstrumentStatus::from_pyobject(obj) {
1241-
return Ok(Data::InstrumentStatus(status));
1242-
}
1243-
12441240
if let Ok(greeks) = OptionGreeks::from_pyobject(obj) {
12451241
return Ok(Data::OptionGreeks(greeks));
12461242
}
12471243

1244+
if let Ok(status) = InstrumentStatus::from_pyobject(obj) {
1245+
return Ok(Data::InstrumentStatus(status));
1246+
}
1247+
12481248
if let Ok(close) = InstrumentClose::from_pyobject(obj) {
12491249
return Ok(Data::InstrumentClose(close));
12501250
}

crates/common/src/msgbus/api.rs

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,6 +1616,8 @@ mod tests {
16161616
use nautilus_model::defi::{
16171617
AmmType, Chain, Dex, DexType, PoolIdentifier, PoolLiquidityUpdateType, Token,
16181618
};
1619+
#[cfg(any(feature = "sbe", feature = "capnp"))]
1620+
use nautilus_model::{data::OptionGreekValues, enums::GreeksConvention};
16191621
use nautilus_model::{
16201622
data::{
16211623
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks,
@@ -1949,6 +1951,28 @@ mod tests {
19491951
)
19501952
}
19511953

1954+
#[cfg(any(feature = "sbe", feature = "capnp"))]
1955+
fn option_greeks() -> OptionGreeks {
1956+
OptionGreeks {
1957+
instrument_id: InstrumentId::from("BTC-30JUN23-40000-C.DERIBIT"),
1958+
convention: GreeksConvention::PriceAdjusted,
1959+
greeks: OptionGreekValues {
1960+
delta: 0.525,
1961+
gamma: 0.00032,
1962+
vega: 12.25,
1963+
theta: -0.72,
1964+
rho: 0.18,
1965+
},
1966+
mark_iv: Some(0.0),
1967+
bid_iv: None,
1968+
ask_iv: Some(0.54),
1969+
underlying_price: Some(41_500.25),
1970+
open_interest: Some(0.0),
1971+
ts_event: UnixNanos::from(20),
1972+
ts_init: UnixNanos::from(21),
1973+
}
1974+
}
1975+
19521976
fn portfolio_snapshot() -> PortfolioSnapshot {
19531977
PortfolioSnapshot::new(
19541978
AccountId::from("SIM-001"),
@@ -2529,6 +2553,15 @@ mod tests {
25292553
subscribe_funding_rates,
25302554
assert_eq_ref,
25312555
);
2556+
assert_typed_external_round_trips(
2557+
encoding,
2558+
BusPayloadType::OptionGreeks,
2559+
"data.option_greeks.BTC-30JUN23-40000-C.DERIBIT",
2560+
option_greeks(),
2561+
publish_option_greeks,
2562+
subscribe_option_greeks,
2563+
assert_eq_ref,
2564+
);
25322565
}
25332566

25342567
#[cfg(feature = "sbe")]
@@ -2545,7 +2578,6 @@ mod tests {
25452578

25462579
#[rstest]
25472580
#[case(BusPayloadType::AccountState)]
2548-
#[case(BusPayloadType::OptionGreeks)]
25492581
fn republish_external_message_skips_unsupported_binary_payload(
25502582
#[case] payload_type: BusPayloadType,
25512583
) {
@@ -2560,16 +2592,6 @@ mod tests {
25602592
}),
25612593
None,
25622594
);
2563-
let greeks_received = received.clone();
2564-
subscribe_option_greeks(
2565-
"events.unsupported.*".into(),
2566-
TypedHandler::from(move |greeks: &OptionGreeks| {
2567-
greeks_received
2568-
.borrow_mut()
2569-
.push(serde_json::to_value(greeks).unwrap());
2570-
}),
2571-
None,
2572-
);
25732595

25742596
for encoding in [SerializationEncoding::Sbe, SerializationEncoding::Capnp] {
25752597
let message = BusMessage::with_str_topic(
@@ -2631,6 +2653,26 @@ mod tests {
26312653
reset_message_bus();
26322654
}
26332655

2656+
#[cfg(feature = "sbe")]
2657+
#[rstest]
2658+
fn publish_option_greeks_sbe_forwards_decodable_payload_to_external_egress() {
2659+
let publications = install_capturing_external_egress(SerializationEncoding::Sbe);
2660+
let greeks = option_greeks();
2661+
2662+
publish_option_greeks("data.option_greeks.TEST".into(), &greeks);
2663+
2664+
let publications = publications.borrow();
2665+
assert_eq!(publications.len(), 1);
2666+
assert_eq!(publications[0].topic, "data.option_greeks.TEST");
2667+
assert_eq!(
2668+
OptionGreeks::from_sbe(&publications[0].payload)
2669+
.expect("SBE payload must decode as OptionGreeks"),
2670+
greeks
2671+
);
2672+
drop(publications);
2673+
reset_message_bus();
2674+
}
2675+
26342676
#[cfg(not(feature = "sbe"))]
26352677
#[rstest]
26362678
fn publish_quote_sbe_without_feature_drops_payload() {
@@ -2669,6 +2711,32 @@ mod tests {
26692711
reset_message_bus();
26702712
}
26712713

2714+
#[cfg(feature = "capnp")]
2715+
#[rstest]
2716+
fn publish_option_greeks_capnp_forwards_decodable_payload_to_external_egress() {
2717+
let publications = install_capturing_external_egress(SerializationEncoding::Capnp);
2718+
let greeks = option_greeks();
2719+
2720+
publish_option_greeks("data.option_greeks.TEST".into(), &greeks);
2721+
2722+
let publications = publications.borrow();
2723+
assert_eq!(publications.len(), 1);
2724+
assert_eq!(publications[0].topic, "data.option_greeks.TEST");
2725+
let reader = capnp::serialize::read_message(
2726+
&mut &publications[0].payload[..],
2727+
capnp::message::ReaderOptions::new(),
2728+
)
2729+
.expect("Cap'n Proto payload must be readable");
2730+
let root = reader
2731+
.get_root::<market_capnp::option_greeks::Reader>()
2732+
.expect("Cap'n Proto payload must have an OptionGreeks root");
2733+
let decoded = OptionGreeks::from_capnp(root)
2734+
.expect("Cap'n Proto payload must decode as OptionGreeks");
2735+
assert_eq!(decoded, greeks);
2736+
drop(publications);
2737+
reset_message_bus();
2738+
}
2739+
26722740
#[cfg(not(feature = "capnp"))]
26732741
#[rstest]
26742742
fn publish_quote_capnp_without_feature_drops_payload() {

crates/common/src/msgbus/external/codec/capnp.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use std::any::Any;
1818
use anyhow::Context;
1919
use bytes::Bytes;
2020
use nautilus_model::data::{
21-
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10,
22-
QuoteTick, TradeTick,
21+
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks, OrderBookDeltas,
22+
OrderBookDepth10, QuoteTick, TradeTick,
2323
};
2424
use nautilus_serialization::{
2525
capnp::{FromCapnp, ToCapnp},
@@ -95,6 +95,12 @@ define_deserializer!(
9595
"FundingRateUpdate",
9696
market_capnp::funding_rate_update::Reader
9797
);
98+
define_deserializer!(
99+
deserialize_option_greeks,
100+
OptionGreeks,
101+
"OptionGreeks",
102+
market_capnp::option_greeks::Reader
103+
);
98104

99105
macro_rules! serialize_payload_as {
100106
($message:expr, $type_name:expr, $ty:ty, $root:ty) => {{
@@ -171,6 +177,12 @@ pub(super) fn serialize_payload(
171177
FundingRateUpdate,
172178
market_capnp::funding_rate_update::Builder
173179
),
180+
BusPayloadType::OptionGreeks => serialize_payload_as!(
181+
message,
182+
type_name,
183+
OptionGreeks,
184+
market_capnp::option_greeks::Builder
185+
),
174186
_ => Err(PayloadCodecError::Dropped(format!(
175187
"Cap'n Proto serialization is not supported for {type_name}"
176188
))),

crates/common/src/msgbus/external/codec/capnp_unavailable.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use std::any::Any;
1717

1818
use bytes::Bytes;
1919
use nautilus_model::data::{
20-
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10,
21-
QuoteTick, TradeTick,
20+
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks, OrderBookDeltas,
21+
OrderBookDepth10, QuoteTick, TradeTick,
2222
};
2323

2424
use super::PayloadCodecError;
@@ -40,6 +40,7 @@ define_deserializer!(deserialize_bar, Bar);
4040
define_deserializer!(deserialize_mark_price, MarkPriceUpdate);
4141
define_deserializer!(deserialize_index_price, IndexPriceUpdate);
4242
define_deserializer!(deserialize_funding_rate, FundingRateUpdate);
43+
define_deserializer!(deserialize_option_greeks, OptionGreeks);
4344

4445
pub(super) fn serialize_payload(
4546
payload_type: BusPayloadType,

crates/common/src/msgbus/external/codec/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ use std::any::Any;
1919

2020
use bytes::Bytes;
2121
use nautilus_model::data::{
22-
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10,
23-
QuoteTick, TradeTick,
22+
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks, OrderBookDeltas,
23+
OrderBookDepth10, QuoteTick, TradeTick,
2424
};
2525
use serde::de::DeserializeOwned;
2626

@@ -127,6 +127,7 @@ define_market_data_deserializer!(
127127
FundingRateUpdate,
128128
FundingRateUpdate
129129
);
130+
define_market_data_deserializer!(deserialize_option_greeks, OptionGreeks, OptionGreeks);
130131

131132
fn deserialize_market_data_payload<T>(
132133
payload_type: BusPayloadType,

crates/common/src/msgbus/external/codec/sbe.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use std::any::Any;
1717

1818
use bytes::Bytes;
1919
use nautilus_model::data::{
20-
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10,
21-
QuoteTick, TradeTick,
20+
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks, OrderBookDeltas,
21+
OrderBookDepth10, QuoteTick, TradeTick,
2222
};
2323
use nautilus_serialization::sbe::{FromSbe, ToSbe};
2424

@@ -64,6 +64,7 @@ define_deserializer!(
6464
FundingRateUpdate,
6565
"FundingRateUpdate"
6666
);
67+
define_deserializer!(deserialize_option_greeks, OptionGreeks, "OptionGreeks");
6768

6869
pub(super) fn serialize_payload(
6970
payload_type: BusPayloadType,
@@ -89,6 +90,7 @@ pub(super) fn serialize_payload(
8990
BusPayloadType::FundingRateUpdate => {
9091
serialize_payload_as::<FundingRateUpdate>(type_name, message)
9192
}
93+
BusPayloadType::OptionGreeks => serialize_payload_as::<OptionGreeks>(type_name, message),
9294
_ => Err(PayloadCodecError::Dropped(format!(
9395
"SBE serialization is not supported for {type_name}"
9496
))),

crates/common/src/msgbus/external/codec/sbe_unavailable.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use std::any::Any;
1717

1818
use bytes::Bytes;
1919
use nautilus_model::data::{
20-
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10,
21-
QuoteTick, TradeTick,
20+
Bar, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate, OptionGreeks, OrderBookDeltas,
21+
OrderBookDepth10, QuoteTick, TradeTick,
2222
};
2323

2424
use super::PayloadCodecError;
@@ -40,6 +40,7 @@ define_deserializer!(deserialize_bar, Bar);
4040
define_deserializer!(deserialize_mark_price, MarkPriceUpdate);
4141
define_deserializer!(deserialize_index_price, IndexPriceUpdate);
4242
define_deserializer!(deserialize_funding_rate, FundingRateUpdate);
43+
define_deserializer!(deserialize_option_greeks, OptionGreeks);
4344

4445
pub(super) fn serialize_payload(
4546
payload_type: BusPayloadType,

crates/common/src/msgbus/external/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,11 +190,11 @@ pub fn republish_external_message(message: &BusMessage) -> anyhow::Result<()> {
190190
publish_funding_rate,
191191
)?,
192192
BusPayloadType::OptionGreeks => {
193-
handle_json_msgpack(
193+
handle_market_data(
194194
topic,
195-
message.payload_type,
196195
message.encoding,
197196
&message.payload,
197+
codec::deserialize_option_greeks,
198198
publish_option_greeks,
199199
)?;
200200
}

crates/common/src/msgbus/message.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ impl BusPayloadType {
196196
| Self::MarkPriceUpdate
197197
| Self::IndexPriceUpdate
198198
| Self::FundingRateUpdate
199+
| Self::OptionGreeks
199200
)
200201
}
201202
}
@@ -357,7 +358,7 @@ mod tests {
357358
#[case(BusPayloadType::AccountState, BusPayloadCategory::BuiltIn)]
358359
#[case(BusPayloadType::OrderEvent, BusPayloadCategory::BuiltIn)]
359360
#[case(BusPayloadType::Instrument, BusPayloadCategory::Other)]
360-
#[case(BusPayloadType::OptionGreeks, BusPayloadCategory::Other)]
361+
#[case(BusPayloadType::OptionGreeks, BusPayloadCategory::MarketData)]
361362
#[case(
362363
BusPayloadType::Custom(Ustr::from("CustomPayload")),
363364
BusPayloadCategory::Other
@@ -391,7 +392,22 @@ mod tests {
391392
#[case(BusPayloadType::AccountState, SerializationEncoding::Json, true)]
392393
#[case(BusPayloadType::AccountState, SerializationEncoding::Capnp, false)]
393394
#[case(BusPayloadType::Instrument, SerializationEncoding::Sbe, false)]
394-
#[case(BusPayloadType::OptionGreeks, SerializationEncoding::Capnp, false)]
395+
#[cfg_attr(
396+
feature = "sbe",
397+
case(BusPayloadType::OptionGreeks, SerializationEncoding::Sbe, true)
398+
)]
399+
#[cfg_attr(
400+
not(feature = "sbe"),
401+
case(BusPayloadType::OptionGreeks, SerializationEncoding::Sbe, false)
402+
)]
403+
#[cfg_attr(
404+
feature = "capnp",
405+
case(BusPayloadType::OptionGreeks, SerializationEncoding::Capnp, true)
406+
)]
407+
#[cfg_attr(
408+
not(feature = "capnp"),
409+
case(BusPayloadType::OptionGreeks, SerializationEncoding::Capnp, false)
410+
)]
395411
#[case(
396412
BusPayloadType::Custom(Ustr::from("CustomPayload")),
397413
SerializationEncoding::MsgPack,

0 commit comments

Comments
 (0)