Skip to content

Commit 94f28c9

Browse files
authored
Improve error handling for depth manager (#1579)
* add timeout to jobs * feat: error handling depth cache * imrpove example * lint
1 parent 287f95d commit 94f28c9

6 files changed

+149
-15
lines changed

binance/ws/depthcache.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,17 @@ async def __aenter__(self):
185185
return self
186186

187187
async def __aexit__(self, *args, **kwargs):
188+
self._log.debug(f"Exiting depth cache manager for {self._symbol}")
188189
await self._socket.__aexit__(*args, **kwargs)
189190

190191
async def recv(self):
191192
dc = None
192193
while not dc:
193194
try:
194195
res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT)
196+
self._log.debug(f"Received message: {res}")
195197
except Exception as e:
196-
self._log.warning(e)
198+
self._log.warning(f"Exception recieving message: {e.__class__.__name__} (e) ")
197199
else:
198200
dc = await self._depth_event(res)
199201
return dc
@@ -203,7 +205,7 @@ async def _init_cache(self):
203205
204206
:return:
205207
"""
206-
208+
self._log.debug(f"Initialising depth cache for {self._symbol}")
207209
# initialise or clear depth cache
208210
self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type)
209211

@@ -228,16 +230,15 @@ async def _depth_event(self, msg):
228230
:return:
229231
230232
"""
233+
self._log.debug(f"Received depth event: {msg}")
231234

232235
if not msg:
233236
return None
234237

235238
if "e" in msg and msg["e"] == "error":
236-
# close the socket
237-
await self.close()
238-
239-
# notify the user by returning a None value
240-
return None
239+
# notify user by return msg with error
240+
self._log.error(f"Error in depth event restarting cache: {msg}")
241+
return msg
241242

242243
return await self._process_depth_message(msg)
243244

binance/ws/reconnecting_websocket.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ async def _read_loop(self):
202202
self.ws.recv(), timeout=self.TIMEOUT
203203
)
204204
res = self._handle_message(res)
205-
print(self._queue.qsize())
206205
self._log.debug(f"Received message: {res}")
207206
if res:
208207
if self._queue.qsize() < self.max_queue_size:
@@ -216,6 +215,11 @@ async def _read_loop(self):
216215
# _no_message_received_reconnect
217216
except asyncio.CancelledError as e:
218217
self._log.debug(f"_read_loop cancelled error {e}")
218+
await self._queue.put({
219+
"e": "error",
220+
"type": f"{e.__class__.__name__}",
221+
"m": f"{e}",
222+
})
219223
break
220224
except (
221225
asyncio.IncompleteReadError,
@@ -236,7 +240,7 @@ async def _read_loop(self):
236240
Exception,
237241
) as e:
238242
# reports errors and break the loop
239-
self._log.error(f"Unknown exception ({e})")
243+
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
240244
await self._queue.put({
241245
"e": "error",
242246
"type": e.__class__.__name__,

binance/ws/threaded_stream.py

+12-6
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,19 @@ async def start_listener(self, socket, path: str, callback):
5959
except asyncio.TimeoutError:
6060
...
6161
continue
62+
except Exception as e:
63+
self._log.error(f"Error receiving message: {e}")
64+
msg = {
65+
"e": "error",
66+
"type": e.__class__.__name__,
67+
"m": f"{e}",
68+
}
69+
if not msg:
70+
continue # Handle both async and sync callbacks
71+
if asyncio.iscoroutinefunction(callback):
72+
asyncio.create_task(callback(msg))
6273
else:
63-
if not msg:
64-
continue # Handle both async and sync callbacks
65-
if asyncio.iscoroutinefunction(callback):
66-
asyncio.create_task(callback(msg))
67-
else:
68-
callback(msg)
74+
callback(msg)
6975
del self._socket_running[path]
7076

7177
def run(self):

docs/depth_cache.rst

+25
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,31 @@ Websocket Errors
153153
----------------
154154

155155
If the underlying websocket is disconnected and is unable to reconnect None is returned for the depth_cache parameter.
156+
If the underlying websocket is disconnected an error msg is passed to the callback and to recv() containing the error message.
157+
In the case the BinanceWebsocketClosed is returned, the websocket will attempt to reconnect 5 times before returning a BinanceUnableToConnect error.
158+
Example:
159+
160+
.. code:: python
161+
162+
depth_cache = await dcm.recv()
163+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
164+
logger.error(f"Received depth cache error in callback: {depth_cache}")
165+
if type == 'BinanceWebsocketClosed':
166+
# ignore as attempts to reconnect
167+
continue
168+
break
169+
170+
.. code:: python
171+
def handle_depth_cache(depth_cache):
172+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
173+
logger.error(f"Received depth cache error in callback: {depth_cache}")
174+
type = depth_cache.get('type')
175+
if type == 'BinanceWebsocketClosed':
176+
# Automatically attempts to reconnect
177+
return
178+
dcm.stop()
179+
return
180+
# handle non error cases here
156181
157182
Examples
158183
--------

examples/depth_cache_example.py

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
#!/usr/bin/env python3
2+
3+
import os
4+
import sys
5+
6+
root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
7+
sys.path.append(root)
8+
9+
import asyncio
10+
import logging
11+
from binance import AsyncClient
12+
from binance.ws.depthcache import DepthCacheManager
13+
14+
logging.basicConfig(level=logging.DEBUG)
15+
logger = logging.getLogger(__name__)
16+
17+
async def main():
18+
# Initialize the client
19+
client = await AsyncClient.create()
20+
21+
# Symbol to monitor
22+
symbol = 'BTCUSDT'
23+
24+
# Create a depth cache manager instance
25+
async with DepthCacheManager(
26+
client=client,
27+
symbol=symbol,
28+
) as dcm:
29+
logger.info(f"Started depth cache for {symbol}")
30+
31+
# Monitor depth cache updates for 1 minute
32+
for _ in range(100): # 6 iterations * 10 seconds = 1 minute
33+
depth_cache = await dcm.recv()
34+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
35+
logger.error(f"Received depth cache error in callback: {depth_cache}")
36+
if type == 'BinanceWebsocketClosed':
37+
# ignore as attempts to reconnect
38+
continue
39+
break
40+
41+
# Get current bids and asks
42+
bids = depth_cache.get_bids()[:5] # Top 5 bids
43+
asks = depth_cache.get_asks()[:5] # Top 5 asks
44+
45+
logger.info("Top 5 bids:")
46+
for bid in bids:
47+
logger.info(f"Price: {bid[0]}, Quantity: {bid[1]}")
48+
49+
logger.info("Top 5 asks:")
50+
for ask in asks:
51+
logger.info(f"Price: {ask[0]}, Quantity: {ask[1]}")
52+
53+
logger.info(f"Last update time: {depth_cache.update_time}")
54+
55+
# Close the client
56+
await client.close_connection()
57+
58+
if __name__ == '__main__':
59+
# Run the async example
60+
asyncio.run(main())
+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
#!/usr/bin/env python3
2+
3+
import os
4+
import sys
5+
6+
root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
7+
sys.path.append(root)
8+
9+
import logging
10+
from binance.ws.depthcache import ThreadedDepthCacheManager
11+
12+
logging.basicConfig(level=logging.DEBUG)
13+
logger = logging.getLogger(__name__)
14+
15+
def main():
16+
dcm = ThreadedDepthCacheManager()
17+
dcm.start()
18+
19+
def handle_depth_cache(depth_cache):
20+
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
21+
logger.error(f"Received depth cache error in callback: {depth_cache}")
22+
type = depth_cache.get('type')
23+
if type == 'BinanceWebsocketClosed':
24+
# Automatically attempts to reconnect
25+
return
26+
logger.error(f"Error received - Closing depth cache: {depth_cache}")
27+
dcm.stop()
28+
return
29+
30+
logger.info(f"symbol {depth_cache.symbol}")
31+
logger.info(depth_cache.get_bids()[:5])
32+
33+
dcm.start_depth_cache(handle_depth_cache, symbol='BNBBTC')
34+
dcm.join()
35+
36+
37+
if __name__ == "__main__":
38+
main()

0 commit comments

Comments
 (0)