Skip to content

Improve error handling for depth manager #1579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 21, 2025
15 changes: 8 additions & 7 deletions binance/ws/depthcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,15 +185,17 @@ async def __aenter__(self):
return self

async def __aexit__(self, *args, **kwargs):
self._log.debug(f"Exiting depth cache manager for {self._symbol}")
await self._socket.__aexit__(*args, **kwargs)

async def recv(self):
dc = None
while not dc:
try:
res = await asyncio.wait_for(self._socket.recv(), timeout=self.TIMEOUT)
self._log.debug(f"Received message: {res}")
except Exception as e:
self._log.warning(e)
self._log.warning(f"Exception recieving message: {e.__class__.__name__} (e) ")
else:
dc = await self._depth_event(res)
return dc
Expand All @@ -203,7 +205,7 @@ async def _init_cache(self):

:return:
"""

self._log.debug(f"Initialising depth cache for {self._symbol}")
# initialise or clear depth cache
self._depth_cache = DepthCache(self._symbol, conv_type=self._conv_type)

Expand All @@ -228,16 +230,15 @@ async def _depth_event(self, msg):
:return:

"""
self._log.debug(f"Received depth event: {msg}")

if not msg:
return None

if "e" in msg and msg["e"] == "error":
# close the socket
await self.close()

# notify the user by returning a None value
return None
# notify user by return msg with error
self._log.error(f"Error in depth event restarting cache: {msg}")
return msg

return await self._process_depth_message(msg)

Expand Down
8 changes: 6 additions & 2 deletions binance/ws/reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ async def _read_loop(self):
self.ws.recv(), timeout=self.TIMEOUT
)
res = self._handle_message(res)
print(self._queue.qsize())
self._log.debug(f"Received message: {res}")
if res:
if self._queue.qsize() < self.max_queue_size:
Expand All @@ -216,6 +215,11 @@ async def _read_loop(self):
# _no_message_received_reconnect
except asyncio.CancelledError as e:
self._log.debug(f"_read_loop cancelled error {e}")
await self._queue.put({
"e": "error",
"type": f"{e.__class__.__name__}",
"m": f"{e}",
})
break
except (
asyncio.IncompleteReadError,
Expand All @@ -236,7 +240,7 @@ async def _read_loop(self):
Exception,
) as e:
# reports errors and break the loop
self._log.error(f"Unknown exception ({e})")
self._log.error(f"Unknown exception: {e.__class__.__name__} ({e})")
await self._queue.put({
"e": "error",
"type": e.__class__.__name__,
Expand Down
18 changes: 12 additions & 6 deletions binance/ws/threaded_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,19 @@ async def start_listener(self, socket, path: str, callback):
except asyncio.TimeoutError:
...
continue
except Exception as e:
self._log.error(f"Error receiving message: {e}")
msg = {
"e": "error",
"type": e.__class__.__name__,
"m": f"{e}",
}
if not msg:
continue # Handle both async and sync callbacks
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(msg))
else:
if not msg:
continue # Handle both async and sync callbacks
if asyncio.iscoroutinefunction(callback):
asyncio.create_task(callback(msg))
else:
callback(msg)
callback(msg)
del self._socket_running[path]

def run(self):
Expand Down
25 changes: 25 additions & 0 deletions docs/depth_cache.rst
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,31 @@ Websocket Errors
----------------

If the underlying websocket is disconnected and is unable to reconnect None is returned for the depth_cache parameter.
If the underlying websocket is disconnected an error msg is passed to the callback and to recv() containing the error message.
In the case the BinanceWebsocketClosed is returned, the websocket will attempt to reconnect 5 times before returning a BinanceUnableToConnect error.
Example:

.. code:: python

depth_cache = await dcm.recv()
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
logger.error(f"Received depth cache error in callback: {depth_cache}")
if type == 'BinanceWebsocketClosed':
# ignore as attempts to reconnect
continue
break

.. code:: python
def handle_depth_cache(depth_cache):
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
logger.error(f"Received depth cache error in callback: {depth_cache}")
type = depth_cache.get('type')
if type == 'BinanceWebsocketClosed':
# Automatically attempts to reconnect
return
dcm.stop()
return
# handle non error cases here

Examples
--------
Expand Down
60 changes: 60 additions & 0 deletions examples/depth_cache_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env python3

import os
import sys

root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(root)

import asyncio
import logging
from binance import AsyncClient
from binance.ws.depthcache import DepthCacheManager

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

async def main():
# Initialize the client
client = await AsyncClient.create()

# Symbol to monitor
symbol = 'BTCUSDT'

# Create a depth cache manager instance
async with DepthCacheManager(
client=client,
symbol=symbol,
) as dcm:
logger.info(f"Started depth cache for {symbol}")

# Monitor depth cache updates for 1 minute
for _ in range(100): # 6 iterations * 10 seconds = 1 minute
depth_cache = await dcm.recv()
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
logger.error(f"Received depth cache error in callback: {depth_cache}")
if type == 'BinanceWebsocketClosed':
# ignore as attempts to reconnect
continue
break

# Get current bids and asks
bids = depth_cache.get_bids()[:5] # Top 5 bids
asks = depth_cache.get_asks()[:5] # Top 5 asks

logger.info("Top 5 bids:")
for bid in bids:
logger.info(f"Price: {bid[0]}, Quantity: {bid[1]}")

logger.info("Top 5 asks:")
for ask in asks:
logger.info(f"Price: {ask[0]}, Quantity: {ask[1]}")

logger.info(f"Last update time: {depth_cache.update_time}")

# Close the client
await client.close_connection()

if __name__ == '__main__':
# Run the async example
asyncio.run(main())
38 changes: 38 additions & 0 deletions examples/depth_cache_threaded_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/usr/bin/env python3

import os
import sys

root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(root)

import logging
from binance.ws.depthcache import ThreadedDepthCacheManager

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def main():
dcm = ThreadedDepthCacheManager()
dcm.start()

def handle_depth_cache(depth_cache):
if isinstance(depth_cache, dict) and depth_cache.get('e') == 'error':
logger.error(f"Received depth cache error in callback: {depth_cache}")
type = depth_cache.get('type')
if type == 'BinanceWebsocketClosed':
# Automatically attempts to reconnect
return
logger.error(f"Error received - Closing depth cache: {depth_cache}")
dcm.stop()
return

logger.info(f"symbol {depth_cache.symbol}")
logger.info(depth_cache.get_bids()[:5])

dcm.start_depth_cache(handle_depth_cache, symbol='BNBBTC')
dcm.join()


if __name__ == "__main__":
main()
Loading