Skip to content

Commit c57cc5f

Browse files
ldamasioclaude
andauthored
fix(funding): resolve USDT conversion route per asset (handles fiat/BRL) (#87)
The saga assumed every non-USDT spot asset X has a direct {X}USDT SELL pair, so a BRL spot balance produced BRLUSDT (-1121 Invalid symbol) and the quote 500'd. Resolve the route symbol-agnostically (ADR-0023): - {X}USDT trading → SELL X (base qty) [existing] - else USDT{X} trading → BUY USDT (quote qty) [new: fiat/quote-asset, e.g. BRL via USDTBRL] - else skip the asset. Adds ExchangePort::spot_symbol_is_trading (Binance: /api/v3/exchangeInfo, -1121 → false), StubExchange BUY balance handling + set_trading_symbols, and test inverse_pair_brl_to_usdt (R$1000 @ 5.0 BRL/USDT → ~199.8 USDT → REFRESHED). All money uses Decimal. 6 funding tests pass vs Postgres 16. Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 37abee2 commit c57cc5f

6 files changed

Lines changed: 187 additions & 10 deletions

File tree

robson-connectors/src/binance_rest.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,21 @@ impl BinanceRestClient {
626626
.map_err(|e| BinanceRestError::ParseError(format!("Invalid spot price: {}", e)))
627627
}
628628

629+
pub async fn spot_symbol_is_trading(&self, symbol: &str) -> Result<bool, BinanceRestError> {
630+
let body = match self
631+
.get_public("/api/v3/exchangeInfo", vec![("symbol", symbol.to_string())])
632+
.await
633+
{
634+
Ok(body) => body,
635+
Err(BinanceRestError::ApiError { code: -1121, .. }) => return Ok(false),
636+
Err(error) => return Err(error),
637+
};
638+
let response: BinanceSpotExchangeInfoResponse =
639+
serde_json::from_str(&body).map_err(|e| BinanceRestError::ParseError(e.to_string()))?;
640+
641+
Ok(response.symbols.into_iter().any(|info| info.status == "TRADING"))
642+
}
643+
629644
pub async fn place_spot_market_order(
630645
&self,
631646
symbol: &str,
@@ -882,6 +897,16 @@ pub struct BinanceSpotBalance {
882897
pub locked: Decimal,
883898
}
884899

900+
#[derive(Debug, Clone, Deserialize)]
901+
struct BinanceSpotExchangeInfoResponse {
902+
symbols: Vec<BinanceSpotSymbolInfo>,
903+
}
904+
905+
#[derive(Debug, Clone, Deserialize)]
906+
struct BinanceSpotSymbolInfo {
907+
status: String,
908+
}
909+
885910
#[derive(Debug, Clone, Deserialize)]
886911
#[serde(rename_all = "camelCase")]
887912
pub struct BinanceSpotOrderResponse {

robson-exec/src/ports.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ pub trait ExchangePort: Send + Sync {
177177

178178
async fn get_spot_price(&self, symbol: &str) -> Result<Price, ExecError>;
179179

180+
async fn spot_symbol_is_trading(&self, symbol: &str) -> Result<bool, ExecError> {
181+
let _ = symbol;
182+
Err(ExecError::Exchange("spot symbol trading lookup is not implemented".to_string()))
183+
}
184+
180185
async fn place_spot_market_order(
181186
&self,
182187
request: SpotOrderRequest,

robson-exec/src/stub.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
//! These implementations simulate exchange and market data behavior
44
//! without making real API calls.
55
6-
use std::{collections::HashMap, sync::RwLock};
6+
use std::{
7+
collections::{HashMap, HashSet},
8+
sync::RwLock,
9+
};
710

811
use async_trait::async_trait;
912
use chrono::{DateTime, Utc};
@@ -52,6 +55,7 @@ pub struct StubExchange {
5255
spot_balances: RwLock<HashMap<String, SpotBalance>>,
5356
spot_orders: RwLock<HashMap<String, SpotOrder>>,
5457
transfers: RwLock<Vec<Transfer>>,
58+
trading_symbols: RwLock<Option<HashSet<String>>>,
5559
spot_order_calls: RwLock<u64>,
5660
transfer_calls: RwLock<u64>,
5761
}
@@ -76,14 +80,15 @@ impl StubExchange {
7680
spot_balances: RwLock::new(HashMap::new()),
7781
spot_orders: RwLock::new(HashMap::new()),
7882
transfers: RwLock::new(Vec::new()),
83+
trading_symbols: RwLock::new(None),
7984
spot_order_calls: RwLock::new(0),
8085
transfer_calls: RwLock::new(0),
8186
}
8287
}
8388

8489
/// Create a stub exchange with a specific futures balance.
8590
pub fn with_balance(default_price: Decimal, balance: Decimal) -> Self {
86-
let mut exchange = Self::new(default_price);
91+
let exchange = Self::new(default_price);
8792
*exchange.futures_balance.write().unwrap() = balance;
8893
exchange
8994
}
@@ -105,6 +110,11 @@ impl StubExchange {
105110
prices.insert(symbol.to_string(), price);
106111
}
107112

113+
pub fn set_trading_symbols(&self, symbols: &[&str]) {
114+
*self.trading_symbols.write().unwrap() =
115+
Some(symbols.iter().map(|symbol| (*symbol).to_string()).collect());
116+
}
117+
108118
/// Get price for a symbol (or default).
109119
pub fn get_price_decimal(&self, symbol: &str) -> Decimal {
110120
let prices = self.prices.read().unwrap();
@@ -322,6 +332,16 @@ impl ExchangePort for StubExchange {
322332
Ok(Price::new(self.get_price_decimal(symbol)).unwrap())
323333
}
324334

335+
async fn spot_symbol_is_trading(&self, symbol: &str) -> Result<bool, ExecError> {
336+
if self.should_fail() {
337+
return Err(ExecError::Exchange("Simulated symbol lookup failure".to_string()));
338+
}
339+
if let Some(symbols) = self.trading_symbols.read().unwrap().as_ref() {
340+
return Ok(symbols.contains(symbol));
341+
}
342+
Ok(self.prices.read().unwrap().contains_key(symbol))
343+
}
344+
325345
async fn place_spot_market_order(
326346
&self,
327347
request: SpotOrderRequest,
@@ -340,7 +360,10 @@ impl ExchangePort for StubExchange {
340360
SpotOrderSide::Sell => base_qty * price,
341361
SpotOrderSide::Buy => request.quantity,
342362
};
343-
let fee = quote_qty * self.fee_rate;
363+
let fee = match request.side {
364+
SpotOrderSide::Sell => quote_qty * self.fee_rate,
365+
SpotOrderSide::Buy => base_qty * self.fee_rate,
366+
};
344367
let asset = request.symbol.trim_end_matches("USDT");
345368

346369
if request.side == SpotOrderSide::Sell {
@@ -357,6 +380,28 @@ impl ExchangePort for StubExchange {
357380
locked: Decimal::ZERO,
358381
});
359382
usdt.free += quote_qty - fee;
383+
} else if request.side == SpotOrderSide::Buy
384+
&& request.quantity_kind == SpotOrderQuantity::Quote
385+
{
386+
let quote_asset = request.symbol.strip_prefix("USDT").ok_or_else(|| {
387+
ExecError::Exchange(format!(
388+
"Stub spot BUY quote order requires an inverse USDT pair, got {}",
389+
request.symbol
390+
))
391+
})?;
392+
let mut balances = self.spot_balances.write().unwrap();
393+
let quote = balances.entry(quote_asset.to_string()).or_insert_with(|| SpotBalance {
394+
asset: quote_asset.to_string(),
395+
free: Decimal::ZERO,
396+
locked: Decimal::ZERO,
397+
});
398+
quote.free = (quote.free - request.quantity).max(Decimal::ZERO);
399+
let usdt = balances.entry("USDT".to_string()).or_insert_with(|| SpotBalance {
400+
asset: "USDT".to_string(),
401+
free: Decimal::ZERO,
402+
locked: Decimal::ZERO,
403+
});
404+
usdt.free += base_qty - fee;
360405
}
361406

362407
let order = SpotOrder {

robsond/src/binance_exchange.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,10 @@ impl ExchangePort for BinanceExchangeAdapter {
337337
self.client.get_spot_price(symbol).await.map_err(Self::map_error)
338338
}
339339

340+
async fn spot_symbol_is_trading(&self, symbol: &str) -> Result<bool, ExecError> {
341+
self.client.spot_symbol_is_trading(symbol).await.map_err(Self::map_error)
342+
}
343+
340344
async fn place_spot_market_order(
341345
&self,
342346
request: SpotOrderRequest,

robsond/src/funding/saga.rs

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,15 @@ impl<E: ExchangePort + 'static, S: Store + 'static> FundingService<E, S> {
8080
if balance.asset == "USDT" || balance.free <= Decimal::ZERO {
8181
continue;
8282
}
83-
let symbol = format!("{}USDT", balance.asset);
84-
let price = self.exchange.get_spot_price(&symbol).await?;
85-
let gross = balance.free * price.as_decimal();
83+
let Some(route) = self.resolve_usdt_route(&balance.asset).await? else {
84+
tracing::debug!(asset = %balance.asset, "Skipping spot asset without USDT route");
85+
continue;
86+
};
87+
let price = self.exchange.get_spot_price(&route.symbol).await?;
88+
let gross = match route.side {
89+
SpotOrderSide::Sell => balance.free * price.as_decimal(),
90+
SpotOrderSide::Buy => balance.free / price.as_decimal(),
91+
};
8692
if gross < self.config.dust_usdt {
8793
continue;
8894
}
@@ -97,7 +103,7 @@ impl<E: ExchangePort + 'static, S: Store + 'static> FundingService<E, S> {
97103
asset: balance.asset,
98104
qty: balance.free,
99105
est_usdt,
100-
symbol,
106+
symbol: route.symbol,
101107
});
102108
}
103109

@@ -152,15 +158,16 @@ impl<E: ExchangePort + 'static, S: Store + 'static> FundingService<E, S> {
152158
if view.state == FundingState::Converting.as_str() {
153159
for item in &quote.items {
154160
let client_order_id = spot_client_order_id(quote_id, &item.asset);
161+
let route = route_from_quote_item(&item.asset, &item.symbol)?;
155162
let order =
156163
match self.exchange.get_spot_order(&item.symbol, &client_order_id).await? {
157164
Some(order) if order.status == "FILLED" => order,
158165
_ => {
159166
self.exchange
160167
.place_spot_market_order(SpotOrderRequest {
161168
symbol: item.symbol.clone(),
162-
side: SpotOrderSide::Sell,
163-
quantity_kind: SpotOrderQuantity::Base,
169+
side: route.side,
170+
quantity_kind: route.quantity_kind,
164171
quantity: item.qty,
165172
client_order_id: client_order_id.clone(),
166173
})
@@ -174,14 +181,21 @@ impl<E: ExchangePort + 'static, S: Store + 'static> FundingService<E, S> {
174181
item.asset
175182
)));
176183
}
184+
let usdt_out = match route.side {
185+
SpotOrderSide::Sell => order.cummulative_quote_qty,
186+
SpotOrderSide::Buy if order.fee_asset == "USDT" => {
187+
(order.executed_qty - order.fee).max(Decimal::ZERO)
188+
},
189+
SpotOrderSide::Buy => order.executed_qty,
190+
};
177191
self.append_and_project(
178192
quote_id,
179193
FundingState::Converting,
180194
"ConversionExecuted",
181195
json!({
182196
"asset": item.asset,
183197
"qty": order.executed_qty,
184-
"usdt_out": order.cummulative_quote_qty,
198+
"usdt_out": usdt_out,
185199
"client_order_id": client_order_id,
186200
}),
187201
)
@@ -338,6 +352,28 @@ impl<E: ExchangePort + 'static, S: Store + 'static> FundingService<E, S> {
338352
.unwrap_or(Decimal::ZERO))
339353
}
340354

355+
async fn resolve_usdt_route(&self, asset: &str) -> DaemonResult<Option<SpotConversionRoute>> {
356+
let direct = format!("{asset}USDT");
357+
if self.exchange.spot_symbol_is_trading(&direct).await? {
358+
return Ok(Some(SpotConversionRoute {
359+
symbol: direct,
360+
side: SpotOrderSide::Sell,
361+
quantity_kind: SpotOrderQuantity::Base,
362+
}));
363+
}
364+
365+
let inverse = format!("USDT{asset}");
366+
if self.exchange.spot_symbol_is_trading(&inverse).await? {
367+
return Ok(Some(SpotConversionRoute {
368+
symbol: inverse,
369+
side: SpotOrderSide::Buy,
370+
quantity_kind: SpotOrderQuantity::Quote,
371+
}));
372+
}
373+
374+
Ok(None)
375+
}
376+
341377
async fn fail(&self, saga_id: Uuid, reason: &str) -> DaemonResult<()> {
342378
self.append_and_project(
343379
saga_id,
@@ -480,3 +516,31 @@ fn spot_client_order_id(saga_id: Uuid, asset: &str) -> String {
480516
fn transfer_client_key(saga_id: Uuid) -> String {
481517
format!("rbx-fund-transfer-{}", saga_id.simple())
482518
}
519+
520+
#[cfg(feature = "postgres")]
521+
struct SpotConversionRoute {
522+
symbol: String,
523+
side: SpotOrderSide,
524+
quantity_kind: SpotOrderQuantity,
525+
}
526+
527+
#[cfg(feature = "postgres")]
528+
fn route_from_quote_item(asset: &str, symbol: &str) -> DaemonResult<SpotConversionRoute> {
529+
if symbol == format!("{asset}USDT") {
530+
return Ok(SpotConversionRoute {
531+
symbol: symbol.to_string(),
532+
side: SpotOrderSide::Sell,
533+
quantity_kind: SpotOrderQuantity::Base,
534+
});
535+
}
536+
537+
if symbol == format!("USDT{asset}") {
538+
return Ok(SpotConversionRoute {
539+
symbol: symbol.to_string(),
540+
side: SpotOrderSide::Buy,
541+
quantity_kind: SpotOrderQuantity::Quote,
542+
});
543+
}
544+
545+
Err(DaemonError::Config(format!("invalid_funding_route:{asset}:{symbol}")))
546+
}

robsond/src/funding/tests.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ impl ExchangePort for PostConvertedCrashExchange {
152152
self.inner.get_spot_price(symbol).await
153153
}
154154

155+
async fn spot_symbol_is_trading(&self, symbol: &str) -> Result<bool, ExecError> {
156+
self.inner.spot_symbol_is_trading(symbol).await
157+
}
158+
155159
async fn place_spot_market_order(
156160
&self,
157161
request: SpotOrderRequest,
@@ -239,6 +243,36 @@ async fn happy_path(pool: PgPool) -> anyhow::Result<()> {
239243
Ok(())
240244
}
241245

246+
#[sqlx::test(migrations = "../migrations")]
247+
#[ignore = "Requires DATABASE_URL"]
248+
async fn inverse_pair_brl_to_usdt(pool: PgPool) -> anyhow::Result<()> {
249+
let exchange = Arc::new(StubExchange::new(dec!(50000)));
250+
exchange.set_futures_balance(dec!(10000));
251+
exchange.set_spot_balance("BRL", dec!(1000), Decimal::ZERO);
252+
exchange.set_price("USDTBRL", dec!(5.0));
253+
exchange.set_trading_symbols(&["USDTBRL"]);
254+
let harness = test_service(pool, exchange, FundingConfig::default());
255+
256+
let quote = harness.service.quote().await?;
257+
258+
assert_eq!(quote.items.len(), 1);
259+
assert_eq!(quote.items[0].asset, "BRL");
260+
assert_eq!(quote.items[0].symbol, "USDTBRL");
261+
assert_eq!(quote.items[0].qty, dec!(1000));
262+
assert_eq!(quote.items[0].est_usdt, dec!(199.4000));
263+
264+
let response = harness.service.execute(quote.quote_id, "inverse-brl").await?;
265+
266+
assert_eq!(response.state, FundingState::Refreshed.as_str());
267+
assert_eq!(harness.exchange.spot_order_call_count(), 1);
268+
assert_eq!(harness.exchange.transfer_call_count(), 1);
269+
270+
let capital = harness.service.refresh_capital().await?;
271+
assert_eq!(capital, dec!(10199.800));
272+
273+
Ok(())
274+
}
275+
242276
#[sqlx::test(migrations = "../migrations")]
243277
#[ignore = "Requires DATABASE_URL"]
244278
async fn idempotent_convert_no_double_execute(pool: PgPool) -> anyhow::Result<()> {

0 commit comments

Comments
 (0)