Skip to content

Commit 241582d

Browse files
committed
Improve execution netting reconciliation
- Scope NETTING reconciliation by account and flag split ownership - Reject reduce-only NETTING fills that would open phantom positions - Route claimed reconciliation orders and fills to strategies - Cover reduce-only NETTING paths in Rust and Python - Document live reconciliation claim and ownership behavior - Resolves to #4106
1 parent 9770dab commit 241582d

12 files changed

Lines changed: 1053 additions & 27 deletions

File tree

RELEASES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ Released on TBD (UTC).
2020
- Added `register_custom_data_from_manifest` to register plug-in custom data with `DataRegistry` at load time (Rust)
2121
- Added `config_json` argument to plug-in `create` thunks and `PluginActor::new`/`PluginStrategy::new` (Rust)
2222
- Added portfolio PyO3 bindings and `Strategy.portfolio` access (#4085), thanks @ms32035
23+
- Added beta-weighted vega greeks against volatility index instruments (#4097), thanks @faysou
2324
- Added Binance Futures liquidation custom data subscriptions (#4095), thanks @graceyangfan
2425
- Added Deribit `option_combo` and `future_combo` parsing as `OptionSpread`/`FuturesSpread` instruments
2526
- Added Deribit combo trade leg parsing (`legs[]`, `combo_id`, `combo_trade_id`) on public trade messages
@@ -48,7 +49,9 @@ None
4849
- Fixed dYdX rate limiter being skipped due to missing keys (#4091), thanks @filipmacek
4950
- Fixed Hyperliquid `Alo` limit order status reports being parsed as trigger orders
5051
- Fixed `LiveNode` signal handling during startup connection wait (#4102), thanks @filipmacek
52+
- Fixed NETTING reconciliation opening phantom reduce-only positions (#4106), thanks for reporting @M-at-ti-a
5153
- Fixed Python `ShutdownSystem` dict serialization to round-trip `correlation_id` (was previously dropped)
54+
- Fixed unbounded Cache VecDeque memory leak in Rust (#4107), thanks @filipmacek
5255
- Fixed `BacktestEngine` option positions remaining open when data stops before expiry
5356

5457
### Internal Improvements

crates/execution/src/engine/mod.rs

Lines changed: 297 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use config::ExecutionEngineConfig;
3636
use futures::future::join_all;
3737
use indexmap::{IndexMap, IndexSet};
3838
use nautilus_common::{
39-
cache::{Cache, CacheSnapshotRef},
39+
cache::{Cache, CacheSnapshotRef, PositionRef},
4040
clients::ExecutionClient,
4141
clock::Clock,
4242
generators::position_id::PositionIdGenerator,
@@ -573,6 +573,9 @@ impl ExecutionEngine {
573573

574574
/// Registers external order claims for a strategy.
575575
///
576+
/// Venue-sourced external orders, fills, and materialized reconciliation activity for matching
577+
/// instruments will be associated with the strategy.
578+
///
576579
/// This operation is atomic: either all instruments are registered or none are.
577580
///
578581
/// # Errors
@@ -1507,17 +1510,23 @@ impl ExecutionEngine {
15071510
) {
15081511
log::debug!("Reconciling NET position for {}", report.instrument_id);
15091512

1510-
let positions_open =
1511-
cache.positions_open(None, Some(&report.instrument_id), None, None, None);
1513+
let positions_open = Self::netting_positions_open_for_report(cache, report);
1514+
1515+
let position_refs = positions_open
1516+
.iter()
1517+
.map(|position| &**position)
1518+
.collect::<Vec<_>>();
1519+
1520+
if let Some(message) =
1521+
Self::netting_split_position_ownership_message(report, &position_refs)
1522+
{
1523+
log::warn!("{message}");
1524+
}
15121525

15131526
// Sum up cached position quantities using domain types to avoid f64 precision loss
15141527
let cached_signed_qty: Decimal = positions_open
15151528
.iter()
1516-
.map(|p| match p.side {
1517-
PositionSide::Long => p.quantity.as_decimal(),
1518-
PositionSide::Short => -p.quantity.as_decimal(),
1519-
_ => Decimal::ZERO,
1520-
})
1529+
.map(|position| Self::position_signed_decimal_qty(position))
15211530
.sum();
15221531

15231532
log::debug!(
@@ -1529,6 +1538,46 @@ impl ExecutionEngine {
15291538
let _ = check_position_reconciliation(report, cached_signed_qty, size_precision);
15301539
}
15311540

1541+
fn netting_positions_open_for_report<'a>(
1542+
cache: &'a Cache,
1543+
report: &PositionStatusReport,
1544+
) -> Vec<PositionRef<'a>> {
1545+
cache.positions_open(
1546+
None,
1547+
Some(&report.instrument_id),
1548+
None,
1549+
Some(&report.account_id),
1550+
None,
1551+
)
1552+
}
1553+
1554+
fn netting_split_position_ownership_message(
1555+
report: &PositionStatusReport,
1556+
positions_open: &[&Position],
1557+
) -> Option<String> {
1558+
let mut strategy_ids = positions_open
1559+
.iter()
1560+
.map(|position| position.strategy_id.to_string())
1561+
.collect::<Vec<_>>();
1562+
strategy_ids.sort();
1563+
strategy_ids.dedup();
1564+
1565+
if strategy_ids.len() <= 1 {
1566+
return None;
1567+
}
1568+
1569+
let position_details = Self::position_details(positions_open.iter().copied());
1570+
1571+
Some(format!(
1572+
"NETTING reconciliation found split ownership for account_id={}, instrument_id={}: \
1573+
strategies=[{}], positions=[{}]",
1574+
report.account_id,
1575+
report.instrument_id,
1576+
strategy_ids.join(", "),
1577+
position_details
1578+
))
1579+
}
1580+
15321581
/// Reconciles an execution mass status report.
15331582
///
15341583
/// Processes all order reports, fill reports, and position reports contained
@@ -2501,12 +2550,20 @@ impl ExecutionEngine {
25012550

25022551
match position_opt {
25032552
None => {
2553+
if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
2554+
return;
2555+
}
2556+
25042557
// Position is None - open new position
25052558
if self.open_position(instrument, None, fill, oms_type).is_ok() {
25062559
// Position opened successfully
25072560
}
25082561
}
25092562
Some(pos) if pos.is_closed() => {
2563+
if self.reject_reduce_only_netting_position_open(&fill, oms_type) {
2564+
return;
2565+
}
2566+
25102567
// Position is closed - open new position
25112568
if self
25122569
.open_position(instrument, Some(&pos), fill, oms_type)
@@ -2527,6 +2584,55 @@ impl ExecutionEngine {
25272584
}
25282585
}
25292586

2587+
fn reject_reduce_only_netting_position_open(
2588+
&self,
2589+
fill: &OrderFilled,
2590+
oms_type: OmsType,
2591+
) -> bool {
2592+
if oms_type != OmsType::Netting {
2593+
return false;
2594+
}
2595+
2596+
let cache = self.cache.borrow();
2597+
let Some(order) = cache.order_owned(&fill.client_order_id) else {
2598+
return false;
2599+
};
2600+
2601+
if !order.is_reduce_only() {
2602+
return false;
2603+
}
2604+
2605+
let positions_open = cache.positions_open(
2606+
None,
2607+
Some(&fill.instrument_id),
2608+
None,
2609+
Some(&fill.account_id),
2610+
None,
2611+
);
2612+
let position_id = fill
2613+
.position_id
2614+
.map_or_else(|| "None".to_string(), |position_id| position_id.to_string());
2615+
let matching_position_details = Self::position_details(
2616+
positions_open
2617+
.iter()
2618+
.filter(|position| position.is_opposite_side(fill.order_side))
2619+
.map(|position| &**position),
2620+
);
2621+
let open_position_details =
2622+
Self::position_details(positions_open.iter().map(|position| &**position));
2623+
2624+
log::error!(
2625+
"Cannot open NETTING position {position_id} from reduce-only fill {} for {}; \
2626+
matching_reduce_positions=[{}], open_positions=[{}]",
2627+
fill.trade_id,
2628+
fill.instrument_id,
2629+
matching_position_details,
2630+
open_position_details
2631+
);
2632+
2633+
true
2634+
}
2635+
25302636
fn open_position(
25312637
&self,
25322638
instrument: &InstrumentAny,
@@ -2644,6 +2750,29 @@ impl ExecutionEngine {
26442750
position.is_opposite_side(fill.order_side) && (fill.last_qty.raw > position.quantity.raw)
26452751
}
26462752

2753+
fn position_signed_decimal_qty(position: &Position) -> Decimal {
2754+
match position.side {
2755+
PositionSide::Long => position.quantity.as_decimal(),
2756+
PositionSide::Short => -position.quantity.as_decimal(),
2757+
_ => Decimal::ZERO,
2758+
}
2759+
}
2760+
2761+
fn position_details<'a>(positions: impl IntoIterator<Item = &'a Position>) -> String {
2762+
positions
2763+
.into_iter()
2764+
.map(|position| {
2765+
format!(
2766+
"{} strategy_id={} signed_qty={}",
2767+
position.id,
2768+
position.strategy_id,
2769+
Self::position_signed_decimal_qty(position)
2770+
)
2771+
})
2772+
.collect::<Vec<_>>()
2773+
.join(", ")
2774+
}
2775+
26472776
fn flip_position(
26482777
&mut self,
26492778
instrument: &InstrumentAny,
@@ -2825,3 +2954,163 @@ impl ExecutionEngine {
28252954
RefMut::map(cache, |c| c.own_order_book_mut(instrument_id).unwrap())
28262955
}
28272956
}
2957+
2958+
#[cfg(test)]
2959+
mod tests {
2960+
use std::str::FromStr;
2961+
2962+
use nautilus_model::{
2963+
enums::{LiquiditySide, OrderSide, OrderType, PositionSideSpecified},
2964+
identifiers::{AccountId, ClientOrderId, TradeId, TraderId, VenueOrderId},
2965+
instruments::{InstrumentAny, stubs::audusd_sim},
2966+
types::Price,
2967+
};
2968+
use rstest::*;
2969+
2970+
use super::*;
2971+
2972+
#[rstest]
2973+
fn netting_positions_open_for_report_scopes_positions_by_account() {
2974+
let instrument = InstrumentAny::CurrencyPair(audusd_sim());
2975+
let account1_id = AccountId::from("SIM-001");
2976+
let account2_id = AccountId::from("SIM-002");
2977+
let position1 = position_for_account(
2978+
&instrument,
2979+
account1_id,
2980+
StrategyId::from("S-001"),
2981+
PositionId::from("P-ACC-1"),
2982+
OrderSide::Buy,
2983+
Quantity::from(1_000),
2984+
);
2985+
let position2 = position_for_account(
2986+
&instrument,
2987+
account2_id,
2988+
StrategyId::from("S-002"),
2989+
PositionId::from("P-ACC-2"),
2990+
OrderSide::Buy,
2991+
Quantity::from(2_000),
2992+
);
2993+
let mut cache = Cache::default();
2994+
cache.add_position(&position1, OmsType::Netting).unwrap();
2995+
cache.add_position(&position2, OmsType::Netting).unwrap();
2996+
2997+
let report = PositionStatusReport::new(
2998+
account1_id,
2999+
instrument.id(),
3000+
PositionSideSpecified::Long,
3001+
Quantity::from(1_000),
3002+
UnixNanos::from(1_000_000),
3003+
UnixNanos::from(1_000_000),
3004+
None,
3005+
None,
3006+
None,
3007+
);
3008+
3009+
let positions_open = ExecutionEngine::netting_positions_open_for_report(&cache, &report);
3010+
let signed_qty: Decimal = positions_open
3011+
.iter()
3012+
.map(|position| ExecutionEngine::position_signed_decimal_qty(position))
3013+
.sum();
3014+
3015+
assert_eq!(positions_open.len(), 1);
3016+
assert_eq!(positions_open[0].id, position1.id);
3017+
assert_eq!(signed_qty, Decimal::from(1_000));
3018+
}
3019+
3020+
#[rstest]
3021+
fn netting_split_position_ownership_message_reports_only_split_ownership() {
3022+
let instrument = InstrumentAny::CurrencyPair(audusd_sim());
3023+
let account_id = AccountId::from("SIM-001");
3024+
let external_position = position_for_account(
3025+
&instrument,
3026+
account_id,
3027+
StrategyId::from("EXTERNAL"),
3028+
PositionId::from("P-EXTERNAL"),
3029+
OrderSide::Buy,
3030+
Quantity::from(1_000),
3031+
);
3032+
let strategy_position = position_for_account(
3033+
&instrument,
3034+
account_id,
3035+
StrategyId::from("S-001"),
3036+
PositionId::from("P-STRATEGY"),
3037+
OrderSide::Buy,
3038+
Quantity::from(500),
3039+
);
3040+
let same_strategy_position = position_for_account(
3041+
&instrument,
3042+
account_id,
3043+
StrategyId::from("EXTERNAL"),
3044+
PositionId::from("P-EXTERNAL-2"),
3045+
OrderSide::Buy,
3046+
Quantity::from(250),
3047+
);
3048+
let report = PositionStatusReport::new(
3049+
account_id,
3050+
instrument.id(),
3051+
PositionSideSpecified::Long,
3052+
Quantity::from(1_500),
3053+
UnixNanos::from(1_000_000),
3054+
UnixNanos::from(1_000_000),
3055+
None,
3056+
None,
3057+
None,
3058+
);
3059+
3060+
let message = ExecutionEngine::netting_split_position_ownership_message(
3061+
&report,
3062+
&[&external_position, &strategy_position],
3063+
)
3064+
.expect("split ownership should produce a warning message");
3065+
3066+
assert!(message.contains("account_id=SIM-001"));
3067+
assert!(message.contains(&format!("instrument_id={}", instrument.id())));
3068+
assert!(message.contains("EXTERNAL"));
3069+
assert!(message.contains("S-001"));
3070+
assert!(message.contains("P-EXTERNAL"));
3071+
assert!(message.contains("P-STRATEGY"));
3072+
assert!(message.contains("signed_qty=1000"));
3073+
assert!(message.contains("signed_qty=500"));
3074+
assert!(
3075+
ExecutionEngine::netting_split_position_ownership_message(
3076+
&report,
3077+
&[&external_position, &same_strategy_position],
3078+
)
3079+
.is_none()
3080+
);
3081+
}
3082+
3083+
fn position_for_account(
3084+
instrument: &InstrumentAny,
3085+
account_id: AccountId,
3086+
strategy_id: StrategyId,
3087+
position_id: PositionId,
3088+
order_side: OrderSide,
3089+
quantity: Quantity,
3090+
) -> Position {
3091+
let client_order_id = ClientOrderId::from(format!("O-{position_id}"));
3092+
let fill = OrderFilled::new(
3093+
TraderId::default(),
3094+
strategy_id,
3095+
instrument.id(),
3096+
client_order_id,
3097+
VenueOrderId::from(format!("V-{position_id}")),
3098+
account_id,
3099+
TradeId::new(format!("T-{position_id}")),
3100+
order_side,
3101+
OrderType::Market,
3102+
quantity,
3103+
Price::from_str("1.0").unwrap(),
3104+
instrument.quote_currency(),
3105+
LiquiditySide::Maker,
3106+
UUID4::new(),
3107+
UnixNanos::default(),
3108+
UnixNanos::default(),
3109+
false,
3110+
Some(position_id),
3111+
Some(Money::from("2 USD")),
3112+
);
3113+
3114+
Position::new(instrument, fill)
3115+
}
3116+
}

0 commit comments

Comments
 (0)