@@ -36,22 +36,38 @@ impl ExchangeStream for TradesConnection {
3636 type Item = Trade ;
3737
3838 async fn next ( & mut self ) -> Result < Self :: Item , WsError > {
39- let content_event = self . connection . next ( ) . await ?;
40- let trade_event = match self . instrument {
41- Instrument :: Perp => {
42- let interpreted_response = serde_json:: from_value :: < TradeEventPerp > ( content_event. data ) . expect ( "Exchange responded with invalid trade event" ) ;
43- Trade :: from ( interpreted_response)
39+ loop {
40+ let content_event = self . connection . next ( ) . await ?;
41+ let trade = match self . instrument {
42+ Instrument :: Perp => {
43+ let parsed = serde_json:: from_value :: < TradeEventPerp > ( content_event. data . clone ( ) ) . expect ( "Exchange responded with invalid trade event" ) ;
44+ Trade :: from ( parsed)
45+ }
46+ Instrument :: Spot | Instrument :: Margin => {
47+ let parsed = serde_json:: from_value :: < TradeEventSpot > ( content_event. data . clone ( ) ) . expect ( "Exchange responded with invalid trade event" ) ;
48+ Trade :: from ( parsed)
49+ }
50+ _ => unimplemented ! ( ) ,
51+ } ;
52+ if trade. price == 0.0 || trade. qty_asset == 0.0 {
53+ warn_zeroed_trade ( & content_event) ;
54+ continue ;
4455 }
45- Instrument :: Spot | Instrument :: Margin => {
46- let initial = serde_json:: from_value :: < TradeEventSpot > ( content_event. data ) . expect ( "Exchange responded with invalid trade event" ) ;
47- Trade :: from ( initial)
48- }
49- _ => unimplemented ! ( ) ,
50- } ;
51- Ok ( trade_event)
56+ return Ok ( trade) ;
57+ }
5258 }
5359}
5460
61+ fn warn_zeroed_trade ( event : & adapters:: generics:: ws:: ContentEvent ) {
62+ tracing:: warn!(
63+ raw_json = %event. data,
64+ topic = %event. topic,
65+ event_type = %event. event_type,
66+ event_time = %event. time,
67+ "Binance sent a zero-valued trade, skipping"
68+ ) ;
69+ }
70+
5571#[ serde_as]
5672#[ derive( Clone , Debug , Default , serde:: Deserialize , serde:: Serialize ) ]
5773pub struct TradeEventPerp {
0 commit comments