diff --git a/.github/workflows/python-app.yml b/.github/workflows/python-app.yml index a3a2d61a5..713b1b810 100644 --- a/.github/workflows/python-app.yml +++ b/.github/workflows/python-app.yml @@ -67,18 +67,19 @@ jobs: run: pyright - name: Test with tox run: tox -e py - - name: Coveralls Parallel - uses: coverallsapp/github-action@v2 - with: - flag-name: run-${{ join(matrix.*, '-') }} - parallel: true - finish: - needs: build - if: ${{ always() }} - runs-on: ubuntu-latest - timeout-minutes: 5 - steps: - - name: Coveralls Finished - uses: coverallsapp/github-action@v2 - with: - parallel-finished: true + # comment due to coveralls maintenance of April 6 2025: https://status.coveralls.io/ + # - name: Coveralls Parallel + # uses: coverallsapp/github-action@v2 + # with: + # flag-name: run-${{ join(matrix.*, '-') }} + # parallel: true + # finish: + # needs: build + # if: ${{ always() }} + # runs-on: ubuntu-latest + # timeout-minutes: 5 + # steps: + # - name: Coveralls Finished + # uses: coverallsapp/github-action@v2 + # with: + # parallel-finished: true diff --git a/binance/async_client.py b/binance/async_client.py index 113483c08..a8f899685 100644 --- a/binance/async_client.py +++ b/binance/async_client.py @@ -64,7 +64,10 @@ async def create( testnet: bool = False, loop=None, session_params: Optional[Dict[str, Any]] = None, + private_key: Optional[Union[str, Path]] = None, + private_key_pass: Optional[str] = None, https_proxy: Optional[str] = None, + time_unit: Optional[str] = None, ): self = cls( api_key, @@ -75,6 +78,10 @@ async def create( testnet, loop, session_params, + private_key, + private_key_pass, + https_proxy, + time_unit ) self.https_proxy = https_proxy # move this to the constructor diff --git a/binance/ws/reconnecting_websocket.py b/binance/ws/reconnecting_websocket.py index 80c6fd7d6..3445a42fd 100644 --- a/binance/ws/reconnecting_websocket.py +++ b/binance/ws/reconnecting_websocket.py @@ -47,7 +47,6 @@ class ReconnectingWebsocket: MIN_RECONNECT_WAIT = 0.1 TIMEOUT = 10 NO_MESSAGE_RECONNECT_TIMEOUT = 60 - MAX_QUEUE_SIZE = 100 def __init__( self, @@ -57,6 +56,7 @@ def __init__( is_binary: bool = False, exit_coro=None, https_proxy: Optional[str] = None, + max_queue_size: int = 100, **kwargs, ): self._loop = get_loop() @@ -75,6 +75,7 @@ def __init__( self._handle_read_loop = None self._https_proxy = https_proxy self._ws_kwargs = kwargs + self.max_queue_size = max_queue_size def json_dumps(self, msg) -> str: if orjson: @@ -201,13 +202,14 @@ async def _read_loop(self): self.ws.recv(), timeout=self.TIMEOUT ) res = self._handle_message(res) + print(self._queue.qsize()) self._log.debug(f"Received message: {res}") if res: - if self._queue.qsize() < self.MAX_QUEUE_SIZE: + if self._queue.qsize() < self.max_queue_size: await self._queue.put(res) else: raise BinanceWebsocketQueueOverflow( - f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}" + f"Message queue size {self._queue.qsize()} exceeded maximum {self.max_queue_size}" ) except asyncio.TimeoutError: self._log.debug(f"no message in {self.TIMEOUT} seconds") diff --git a/binance/ws/streams.py b/binance/ws/streams.py index e2a550f19..aed867bcb 100755 --- a/binance/ws/streams.py +++ b/binance/ws/streams.py @@ -36,12 +36,19 @@ class BinanceSocketManager: WEBSOCKET_DEPTH_10 = "10" WEBSOCKET_DEPTH_20 = "20" - def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT): + def __init__( + self, + client: AsyncClient, + user_timeout=KEEPALIVE_TIMEOUT, + max_queue_size: int = 100, + ): """Initialise the BinanceSocketManager :param client: Binance API client :type client: binance.AsyncClient - + :param user_timeout: Timeout for user socket in seconds + :param max_queue_size: Max size of the websocket queue, defaults to 100 + :type max_queue_size: int """ self.STREAM_URL = self.STREAM_URL.format(client.tld) self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld) @@ -52,8 +59,8 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT): self._loop = get_loop() self._client = client self._user_timeout = user_timeout - self.testnet = self._client.testnet + self._max_queue_size = max_queue_size self.ws_kwargs = {} def _get_stream_url(self, stream_url: Optional[str] = None): @@ -84,6 +91,7 @@ def _get_socket( exit_coro=lambda p: self._exit_socket(f"{socket_type}_{p}"), is_binary=is_binary, https_proxy=self._client.https_proxy, + max_queue_size=self._max_queue_size, **self.ws_kwargs, ) @@ -1202,6 +1210,7 @@ def __init__( session_params: Optional[Dict[str, Any]] = None, https_proxy: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None, + max_queue_size: int = 100, ): super().__init__( api_key, @@ -1214,10 +1223,14 @@ def __init__( loop, ) self._bsm: Optional[BinanceSocketManager] = None + self._max_queue_size = max_queue_size async def _before_socket_listener_start(self): assert self._client - self._bsm = BinanceSocketManager(client=self._client) + self._bsm = BinanceSocketManager( + client=self._client, + max_queue_size=self._max_queue_size + ) def _start_async_socket( self, @@ -1226,7 +1239,10 @@ def _start_async_socket( params: Dict[str, Any], path: Optional[str] = None, ) -> str: + start_time = time.time() while not self._bsm: + if time.time() - start_time > 5: + raise RuntimeError("Binance Socket Manager failed to initialize after 5 seconds") time.sleep(0.1) socket = getattr(self._bsm, socket_name)(**params) socket_path: str = path or socket._path # noqa diff --git a/binance/ws/threaded_stream.py b/binance/ws/threaded_stream.py index 2d6b0ecd4..5f43fe969 100755 --- a/binance/ws/threaded_stream.py +++ b/binance/ws/threaded_stream.py @@ -1,4 +1,5 @@ import asyncio +import logging import threading from typing import Optional, Dict, Any @@ -24,6 +25,7 @@ def __init__( self._client: Optional[AsyncClient] = None self._running: bool = True self._socket_running: Dict[str, bool] = {} + self._log = logging.getLogger(__name__) self._client_params = { "api_key": api_key, "api_secret": api_secret, @@ -37,12 +39,17 @@ def __init__( async def _before_socket_listener_start(self): ... async def socket_listener(self): - self._client = await AsyncClient.create(loop=self._loop, **self._client_params) - await self._before_socket_listener_start() + try: + self._client = await AsyncClient.create(loop=self._loop, **self._client_params) + await self._before_socket_listener_start() + except Exception as e: + self._log.error(f"Failed to create client: {e}") + self.stop() while self._running: await asyncio.sleep(0.2) while self._socket_running: await asyncio.sleep(0.2) + self._log.info("Socket listener stopped") async def start_listener(self, socket, path: str, callback): async with socket as s: @@ -56,7 +63,7 @@ async def start_listener(self, socket, path: str, callback): if not msg: continue # Handle both async and sync callbacks if asyncio.iscoroutinefunction(callback): - await callback(msg) + asyncio.create_task(callback(msg)) else: callback(msg) del self._socket_running[path] @@ -74,6 +81,7 @@ async def stop_client(self): await self._client.close_connection() def stop(self): + self._log.debug("Stopping ThreadedApiManager") if not self._running: return self._running = False @@ -85,6 +93,6 @@ def stop(self): future.result(timeout=5) # Add timeout to prevent hanging except Exception as e: # Log the error but don't raise it - print(f"Error stopping client: {e}") + self._log.error(f"Error stopping client: {e}") for socket_name in self._socket_running.keys(): self._socket_running[socket_name] = False diff --git a/pyproject.toml b/pyproject.toml index 740a8bbc6..c9f1e92f6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,4 +5,3 @@ lint.ignore = ["F722","F841","F821","E402","E501","E902","E713","E741","E714", " [tool.pytest.ini_options] timeout = 10 timeout_method = "thread" -addopts = "-n 10" diff --git a/test-requirements.txt b/test-requirements.txt index ca61525fc..371fe3e17 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -4,6 +4,7 @@ pytest-asyncio pytest-cov pytest-xdist pytest-rerunfailures +pytest-timeout requests-mock tox setuptools diff --git a/tests/test_reconnecting_websocket.py b/tests/test_reconnecting_websocket.py index fe0abb663..b294c2212 100644 --- a/tests/test_reconnecting_websocket.py +++ b/tests/test_reconnecting_websocket.py @@ -105,7 +105,7 @@ def test_get_reconnect_wait(): async def test_connect_max_reconnects_exceeded(): """Test ws.connect exceeds maximum reconnect attempts.""" ws = ReconnectingWebsocket(url="wss://test.url") - ws.MAX_RECONNECTS = 2 # Set max reconnects to a low number for testing + ws.MAX_RECONNECTS = 2 # type: ignore # Set max reconnects to a low number for testing ws._before_connect = AsyncMock() ws._after_connect = AsyncMock() ws._conn = AsyncMock() diff --git a/tests/test_streams_options.py b/tests/test_streams_options.py index 77f1bd317..8d77144ca 100644 --- a/tests/test_streams_options.py +++ b/tests/test_streams_options.py @@ -1,5 +1,6 @@ import sys import pytest +import logging from binance import BinanceSocketManager pytestmark = [ @@ -7,94 +8,134 @@ pytest.mark.asyncio ] +# Configure logger for this module +logger = logging.getLogger(__name__) + # Test constants -OPTION_SYMBOL = "BTC-250328-40000-P" +OPTION_SYMBOL = "BTC-250926-40000-P" UNDERLYING_SYMBOL = "BTC" -EXPIRATION_DATE = "250328" +EXPIRATION_DATE = "250926" INTERVAL = "1m" DEPTH = "20" async def test_options_ticker(clientAsync): """Test options ticker socket""" + logger.info(f"Starting options ticker test for symbol: {OPTION_SYMBOL}") bm = BinanceSocketManager(clientAsync) socket = bm.options_ticker_socket(OPTION_SYMBOL) async with socket as ts: + logger.debug("Waiting for ticker message...") msg = await ts.recv() + logger.info(f"Received ticker message: {msg}") assert msg['e'] == '24hrTicker' + logger.info("Options ticker test completed successfully") await clientAsync.close_connection() async def test_options_ticker_by_expiration(clientAsync): """Test options ticker by expiration socket""" + logger.info(f"Starting options ticker by expiration test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}") bm = BinanceSocketManager(clientAsync) socket = bm.options_ticker_by_expiration_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE) async with socket as ts: + logger.debug("Waiting for ticker by expiration message...") msg = await ts.recv() + logger.info(f"Received {len(msg)} ticker messages") assert len(msg) > 0 + logger.info("Options ticker by expiration test completed successfully") await clientAsync.close_connection() async def test_options_recent_trades(clientAsync): """Test options recent trades socket""" + logger.info(f"Starting options recent trades test for {UNDERLYING_SYMBOL}") bm = BinanceSocketManager(clientAsync) socket = bm.options_recent_trades_socket(UNDERLYING_SYMBOL) async with socket as ts: + logger.debug("Waiting for trade message...") msg = await ts.recv() + logger.info(f"Received trade message: {msg}") assert msg['e'] == 'trade' + logger.info("Options recent trades test completed successfully") await clientAsync.close_connection() async def test_options_kline(clientAsync): """Test options kline socket""" + logger.info(f"Starting options kline test for {OPTION_SYMBOL}, interval: {INTERVAL}") bm = BinanceSocketManager(clientAsync) socket = bm.options_kline_socket(OPTION_SYMBOL, INTERVAL) async with socket as ts: + logger.debug("Waiting for kline message...") msg = await ts.recv() + logger.info(f"Received kline message: {msg}") assert msg['e'] == 'kline' + logger.info("Options kline test completed successfully") await clientAsync.close_connection() async def test_options_depth(clientAsync): """Test options depth socket""" + logger.info(f"Starting options depth test for {OPTION_SYMBOL}, depth: {DEPTH}") bm = BinanceSocketManager(clientAsync) socket = bm.options_depth_socket(OPTION_SYMBOL, DEPTH) async with socket as ts: + logger.debug("Waiting for depth message...") msg = await ts.recv() + logger.info(f"Received depth message: {msg}") assert msg['e'] == 'depth' + logger.info("Options depth test completed successfully") await clientAsync.close_connection() async def test_options_multiplex(clientAsync): """Test options multiplex socket""" - bm = BinanceSocketManager(clientAsync) streams = [ f"{OPTION_SYMBOL}@ticker", f"{OPTION_SYMBOL}@trade", ] + logger.info(f"Starting options multiplex test with streams: {streams}") + bm = BinanceSocketManager(clientAsync) socket = bm.options_multiplex_socket(streams) async with socket as ts: + logger.debug("Waiting for multiplex message...") msg = await ts.recv() + logger.info(f"Received multiplex message: {msg}") assert 'stream' in msg + logger.info("Options multiplex test completed successfully") await clientAsync.close_connection() async def test_options_open_interest(clientAsync): """Test options open interest socket""" + logger.info(f"Starting options open interest test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}") bm = BinanceSocketManager(clientAsync) socket = bm.options_open_interest_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE) async with socket as ts: + logger.debug("Waiting for open interest message...") msg = await ts.recv() + logger.info(f"Received open interest message with {len(msg)} items") assert len(msg) > 0 + logger.info("Options open interest test completed successfully") await clientAsync.close_connection() async def test_options_mark_price(clientAsync): """Test options mark price socket""" + logger.info(f"Starting options mark price test for {UNDERLYING_SYMBOL}") bm = BinanceSocketManager(clientAsync) socket = bm.options_mark_price_socket(UNDERLYING_SYMBOL) async with socket as ts: + logger.debug("Waiting for mark price message...") msg = await ts.recv() + logger.info(f"Received mark price message with {len(msg)} items") assert len(msg) > 0 + logger.info("Options mark price test completed successfully") await clientAsync.close_connection() async def test_options_index_price(clientAsync): """Test options index price socket""" + symbol = 'ETHUSDT' + logger.info(f"Starting options index price test for {symbol}") bm = BinanceSocketManager(clientAsync) - socket = bm.options_index_price_socket('ETHUSDT') + socket = bm.options_index_price_socket(symbol) async with socket as ts: + logger.debug("Waiting for index price message...") msg = await ts.recv() + logger.info(f"Received index price message: {msg}") assert msg['e'] == 'index' + logger.info("Options index price test completed successfully") await clientAsync.close_connection() diff --git a/tests/test_threaded_socket_manager.py b/tests/test_threaded_socket_manager.py index 9adabfd5a..d76f30cec 100644 --- a/tests/test_threaded_socket_manager.py +++ b/tests/test_threaded_socket_manager.py @@ -1,34 +1,191 @@ from binance import ThreadedWebsocketManager +from binance.client import Client +import asyncio +import time +from .conftest import proxies, api_key, api_secret, proxy +import pytest +import sys +import logging +pytestmark = pytest.mark.skipif( + sys.version_info <= (3, 8), + reason="These tests require Python 3.8+ for proper websocket proxy support" +) received_ohlcv = False received_depth = False twm: ThreadedWebsocketManager +# Add logger definition before using it +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) -def handle_socket_message(msg): - global received_ohlcv, received_depth - print(msg) - if "e" in msg: - if msg["e"] == "depthUpdate": - received_depth = True - if msg["e"] == "kline": - received_ohlcv = True - if received_depth and received_ohlcv: - twm.stop() - +# Get real symbols from Binance API +client = Client(api_key, api_secret, {"proxies": proxies}) +exchange_info = client.get_exchange_info() +symbols = [info['symbol'].lower() for info in exchange_info['symbols']] +streams = [f"{symbol}@bookTicker" for symbol in symbols][0:100] # Take first 800 symbols def test_threaded_socket_manager(): + logger.debug("Starting test_threaded_socket_manager") global twm - twm = ThreadedWebsocketManager(api_key="", api_secret="", testnet=True) + twm = ThreadedWebsocketManager(api_key, api_secret, https_proxy=proxy, testnet=True) symbol = "BTCUSDT" - twm.start() + def handle_socket_message(msg): + global received_ohlcv, received_depth + if "e" in msg: + if msg["e"] == "depthUpdate": + logger.debug("Received depth update message") + received_depth = True + if msg["e"] == "kline": + logger.debug("Received kline message") + received_ohlcv = True + if received_depth and received_ohlcv: + logger.debug("Received both depth and OHLCV messages, stopping") + twm.stop() + + try: + logger.debug("Starting ThreadedWebsocketManager") + twm.start() + logger.debug("Starting kline socket for %s", symbol) + twm.start_kline_socket(callback=handle_socket_message, symbol=symbol) + logger.debug("Starting depth socket for %s", symbol) + twm.start_depth_socket(callback=handle_socket_message, symbol=symbol) + twm.join() + finally: + logger.debug("Cleaning up test_threaded_socket_manager") + twm.stop() + time.sleep(2) + + +def test_many_symbols_small_queue(): + logger.debug("Starting test_many_symbols_small_queue with queue size 1") + twm = ThreadedWebsocketManager(api_key, api_secret, https_proxy=proxy, testnet=True, max_queue_size=1) + + error_received = False + msg_received = False + + def handle_message(msg): + nonlocal error_received, msg_received + if msg.get("e") == "error": + error_received = True + logger.debug("Received WebSocket error: %s", msg.get('m', 'Unknown error')) + return + msg_received = True + logger.debug("Received valid message") + + try: + logger.debug("Starting ThreadedWebsocketManager") + twm.start() + logger.debug("Starting multiplex socket with %d streams", len(streams)) + twm.start_multiplex_socket(callback=handle_message, streams=streams) + logger.debug("Waiting 10 seconds for messages") + time.sleep(10) + + assert msg_received, "Should have received messages" + finally: + logger.debug("Cleaning up test_many_symbols_small_queue") + twm.stop() + time.sleep(2) + + +def test_many_symbols_adequate_queue(): + logger.debug("Starting test_many_symbols_adequate_queue with queue size 200") + twm = ThreadedWebsocketManager(api_key, api_secret, https_proxy=proxy, testnet=True, max_queue_size=200) + + messages_received = 0 + error_received = False + + def handle_message(msg): + nonlocal messages_received, error_received + if msg.get("e") == "error": + error_received = True + logger.debug("Received WebSocket error: %s", msg.get('m', 'Unknown error')) + return + + messages_received += 1 + if messages_received % 10 == 0: # Log every 10th message + logger.debug("Processed %d messages", messages_received) + + try: + logger.debug("Starting ThreadedWebsocketManager") + twm.start() + logger.debug("Starting futures multiplex socket with %d streams", len(streams)) + twm.start_futures_multiplex_socket(callback=handle_message, streams=streams) + logger.debug("Waiting 10 seconds for messages") + time.sleep(10) + + logger.debug("Test completed. Messages received: %d, Errors: %s", messages_received, error_received) + assert messages_received > 0, "Should have received some messages" + assert not error_received, "Should not have received any errors" + finally: + logger.debug("Cleaning up test_many_symbols_adequate_queue") + twm.stop() + time.sleep(2) + - twm.start_kline_socket(callback=handle_socket_message, symbol=symbol) +def test_slow_async_callback_no_error(): + logger.debug("Starting test_slow_async_callback_no_error with queue size 400") + twm = ThreadedWebsocketManager(api_key, api_secret, https_proxy=proxy, testnet=True, max_queue_size=400) + + messages_processed = 0 + error_received = False + + async def slow_async_callback(msg): + nonlocal messages_processed, error_received + if msg.get("e") == "error": + error_received = True + logger.debug("Received WebSocket error: %s", msg.get('m', 'Unknown error')) + return + + logger.debug("Processing message with 2 second delay") + await asyncio.sleep(2) + messages_processed += 1 + logger.debug("Message processed. Total processed: %d", messages_processed) + + try: + logger.debug("Starting ThreadedWebsocketManager") + twm.start() + logger.debug("Starting futures multiplex socket with %d streams", len(streams)) + twm.start_futures_multiplex_socket(callback=slow_async_callback, streams=streams) + logger.debug("Waiting 10 seconds for messages") + time.sleep(10) + + logger.debug("Test completed. Messages processed: %d, Errors: %s", messages_processed, error_received) + assert messages_processed > 0, "Should have processed some messages" + assert not error_received, "Should not have received any errors" + finally: + logger.debug("Cleaning up test_slow_async_callback_no_error") + twm.stop() + time.sleep(2) - twm.start_depth_socket(callback=handle_socket_message, symbol=symbol) - twm.join() +def test_no_internet_connection(): + """Test that socket manager times out when there's no internet connection""" + logger.debug("Starting test_no_internet_connection") + invalid_proxy = "http://invalid.proxy:1234" + logger.debug("Using invalid proxy: %s", invalid_proxy) + + with pytest.raises(RuntimeError, match="Binance Socket Manager failed to initialize after 5 seconds"): + twm = ThreadedWebsocketManager( + api_key, + api_secret, + https_proxy=invalid_proxy, + testnet=True + ) + + try: + logger.debug("Attempting to start ThreadedWebsocketManager with invalid proxy") + twm.start() + logger.debug("Attempting to start kline socket (should fail)") + twm.start_kline_socket( + callback=lambda x: print(x), + symbol="BTCUSDT" + ) + finally: + logger.debug("Cleaning up test_no_internet_connection") + twm.stop() + time.sleep(2) diff --git a/tests/test_threaded_stream.py b/tests/test_threaded_stream.py index dfc8496c2..0752b7b5e 100644 --- a/tests/test_threaded_stream.py +++ b/tests/test_threaded_stream.py @@ -1,5 +1,7 @@ import pytest import asyncio + +import websockets from binance.ws.threaded_stream import ThreadedApiManager from unittest.mock import Mock diff --git a/tests/test_ws_api.py b/tests/test_ws_api.py index dfdbad1cc..e7f116522 100644 --- a/tests/test_ws_api.py +++ b/tests/test_ws_api.py @@ -177,8 +177,8 @@ async def test_cleanup_on_exit(clientAsync): async def test_ws_queue_overflow(clientAsync): """WebSocket API should not overflow queue""" # - original_size = clientAsync.ws_api.MAX_QUEUE_SIZE - clientAsync.ws_api.MAX_QUEUE_SIZE = 1 + original_size = clientAsync.ws_api.max_queue_size + clientAsync.ws_api.max_queue_size = 1 try: # Request multiple order books concurrently diff --git a/tox.ini b/tox.ini index ab2427681..e221c0047 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ passenv = TEST_API_SECRET TEST_FUTURES_API_KEY TEST_FUTURES_API_SECRET -commands = pytest -n 1 -v tests/ --doctest-modules --cov binance --cov-report term-missing --reruns 3 --reruns-delay 120 +commands = pytest -n 1 -vvs tests/ --timeout=60 --doctest-modules --cov binance --cov-report term-missing --reruns 3 --reruns-delay 120 [pep8] ignore = E501