Skip to content

Commit 9dac1d4

Browse files
authored
feat: Expose option for user to edit max_queue_size in websockets, have async callbacks called in a task and update docs (#1570)
* add timeout to jobs * feat: expose variable max_queue_size and have async callbacks called in a task * fix tests * fix proxy in test * pass params to socket manager * skip test in 3.7 * add pytest timeout * improve logging and error throw on failed connection and add test * update tests * skip for 3.7 * pyright tests * fix test * comment coveralls * add debug logging to test * add more logigng test only file * fix test * run all tests * reduce logging * update symbol for test * uncomment coveralls to test * Revert "uncomment coveralls to test" This reverts commit e759466.
1 parent 06c721e commit 9dac1d4

13 files changed

+285
-51
lines changed

.github/workflows/python-app.yml

+16-15
Original file line numberDiff line numberDiff line change
@@ -67,18 +67,19 @@ jobs:
6767
run: pyright
6868
- name: Test with tox
6969
run: tox -e py
70-
- name: Coveralls Parallel
71-
uses: coverallsapp/github-action@v2
72-
with:
73-
flag-name: run-${{ join(matrix.*, '-') }}
74-
parallel: true
75-
finish:
76-
needs: build
77-
if: ${{ always() }}
78-
runs-on: ubuntu-latest
79-
timeout-minutes: 5
80-
steps:
81-
- name: Coveralls Finished
82-
uses: coverallsapp/github-action@v2
83-
with:
84-
parallel-finished: true
70+
# comment due to coveralls maintenance of April 6 2025: https://status.coveralls.io/
71+
# - name: Coveralls Parallel
72+
# uses: coverallsapp/github-action@v2
73+
# with:
74+
# flag-name: run-${{ join(matrix.*, '-') }}
75+
# parallel: true
76+
# finish:
77+
# needs: build
78+
# if: ${{ always() }}
79+
# runs-on: ubuntu-latest
80+
# timeout-minutes: 5
81+
# steps:
82+
# - name: Coveralls Finished
83+
# uses: coverallsapp/github-action@v2
84+
# with:
85+
# parallel-finished: true

binance/async_client.py

+7
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ async def create(
6464
testnet: bool = False,
6565
loop=None,
6666
session_params: Optional[Dict[str, Any]] = None,
67+
private_key: Optional[Union[str, Path]] = None,
68+
private_key_pass: Optional[str] = None,
6769
https_proxy: Optional[str] = None,
70+
time_unit: Optional[str] = None,
6871
):
6972
self = cls(
7073
api_key,
@@ -75,6 +78,10 @@ async def create(
7578
testnet,
7679
loop,
7780
session_params,
81+
private_key,
82+
private_key_pass,
83+
https_proxy,
84+
time_unit
7885
)
7986
self.https_proxy = https_proxy # move this to the constructor
8087

binance/ws/reconnecting_websocket.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ class ReconnectingWebsocket:
4747
MIN_RECONNECT_WAIT = 0.1
4848
TIMEOUT = 10
4949
NO_MESSAGE_RECONNECT_TIMEOUT = 60
50-
MAX_QUEUE_SIZE = 100
5150

5251
def __init__(
5352
self,
@@ -57,6 +56,7 @@ def __init__(
5756
is_binary: bool = False,
5857
exit_coro=None,
5958
https_proxy: Optional[str] = None,
59+
max_queue_size: int = 100,
6060
**kwargs,
6161
):
6262
self._loop = get_loop()
@@ -75,6 +75,7 @@ def __init__(
7575
self._handle_read_loop = None
7676
self._https_proxy = https_proxy
7777
self._ws_kwargs = kwargs
78+
self.max_queue_size = max_queue_size
7879

7980
def json_dumps(self, msg) -> str:
8081
if orjson:
@@ -201,13 +202,14 @@ async def _read_loop(self):
201202
self.ws.recv(), timeout=self.TIMEOUT
202203
)
203204
res = self._handle_message(res)
205+
print(self._queue.qsize())
204206
self._log.debug(f"Received message: {res}")
205207
if res:
206-
if self._queue.qsize() < self.MAX_QUEUE_SIZE:
208+
if self._queue.qsize() < self.max_queue_size:
207209
await self._queue.put(res)
208210
else:
209211
raise BinanceWebsocketQueueOverflow(
210-
f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}"
212+
f"Message queue size {self._queue.qsize()} exceeded maximum {self.max_queue_size}"
211213
)
212214
except asyncio.TimeoutError:
213215
self._log.debug(f"no message in {self.TIMEOUT} seconds")

binance/ws/streams.py

+20-4
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,19 @@ class BinanceSocketManager:
3636
WEBSOCKET_DEPTH_10 = "10"
3737
WEBSOCKET_DEPTH_20 = "20"
3838

39-
def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
39+
def __init__(
40+
self,
41+
client: AsyncClient,
42+
user_timeout=KEEPALIVE_TIMEOUT,
43+
max_queue_size: int = 100,
44+
):
4045
"""Initialise the BinanceSocketManager
4146
4247
:param client: Binance API client
4348
:type client: binance.AsyncClient
44-
49+
:param user_timeout: Timeout for user socket in seconds
50+
:param max_queue_size: Max size of the websocket queue, defaults to 100
51+
:type max_queue_size: int
4552
"""
4653
self.STREAM_URL = self.STREAM_URL.format(client.tld)
4754
self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld)
@@ -52,8 +59,8 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
5259
self._loop = get_loop()
5360
self._client = client
5461
self._user_timeout = user_timeout
55-
5662
self.testnet = self._client.testnet
63+
self._max_queue_size = max_queue_size
5764
self.ws_kwargs = {}
5865

5966
def _get_stream_url(self, stream_url: Optional[str] = None):
@@ -84,6 +91,7 @@ def _get_socket(
8491
exit_coro=lambda p: self._exit_socket(f"{socket_type}_{p}"),
8592
is_binary=is_binary,
8693
https_proxy=self._client.https_proxy,
94+
max_queue_size=self._max_queue_size,
8795
**self.ws_kwargs,
8896
)
8997

@@ -1202,6 +1210,7 @@ def __init__(
12021210
session_params: Optional[Dict[str, Any]] = None,
12031211
https_proxy: Optional[str] = None,
12041212
loop: Optional[asyncio.AbstractEventLoop] = None,
1213+
max_queue_size: int = 100,
12051214
):
12061215
super().__init__(
12071216
api_key,
@@ -1214,10 +1223,14 @@ def __init__(
12141223
loop,
12151224
)
12161225
self._bsm: Optional[BinanceSocketManager] = None
1226+
self._max_queue_size = max_queue_size
12171227

12181228
async def _before_socket_listener_start(self):
12191229
assert self._client
1220-
self._bsm = BinanceSocketManager(client=self._client)
1230+
self._bsm = BinanceSocketManager(
1231+
client=self._client,
1232+
max_queue_size=self._max_queue_size
1233+
)
12211234

12221235
def _start_async_socket(
12231236
self,
@@ -1226,7 +1239,10 @@ def _start_async_socket(
12261239
params: Dict[str, Any],
12271240
path: Optional[str] = None,
12281241
) -> str:
1242+
start_time = time.time()
12291243
while not self._bsm:
1244+
if time.time() - start_time > 5:
1245+
raise RuntimeError("Binance Socket Manager failed to initialize after 5 seconds")
12301246
time.sleep(0.1)
12311247
socket = getattr(self._bsm, socket_name)(**params)
12321248
socket_path: str = path or socket._path # noqa

binance/ws/threaded_stream.py

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23
import threading
34
from typing import Optional, Dict, Any
45

@@ -24,6 +25,7 @@ def __init__(
2425
self._client: Optional[AsyncClient] = None
2526
self._running: bool = True
2627
self._socket_running: Dict[str, bool] = {}
28+
self._log = logging.getLogger(__name__)
2729
self._client_params = {
2830
"api_key": api_key,
2931
"api_secret": api_secret,
@@ -37,12 +39,17 @@ def __init__(
3739
async def _before_socket_listener_start(self): ...
3840

3941
async def socket_listener(self):
40-
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
41-
await self._before_socket_listener_start()
42+
try:
43+
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
44+
await self._before_socket_listener_start()
45+
except Exception as e:
46+
self._log.error(f"Failed to create client: {e}")
47+
self.stop()
4248
while self._running:
4349
await asyncio.sleep(0.2)
4450
while self._socket_running:
4551
await asyncio.sleep(0.2)
52+
self._log.info("Socket listener stopped")
4653

4754
async def start_listener(self, socket, path: str, callback):
4855
async with socket as s:
@@ -56,7 +63,7 @@ async def start_listener(self, socket, path: str, callback):
5663
if not msg:
5764
continue # Handle both async and sync callbacks
5865
if asyncio.iscoroutinefunction(callback):
59-
await callback(msg)
66+
asyncio.create_task(callback(msg))
6067
else:
6168
callback(msg)
6269
del self._socket_running[path]
@@ -74,6 +81,7 @@ async def stop_client(self):
7481
await self._client.close_connection()
7582

7683
def stop(self):
84+
self._log.debug("Stopping ThreadedApiManager")
7785
if not self._running:
7886
return
7987
self._running = False
@@ -85,6 +93,6 @@ def stop(self):
8593
future.result(timeout=5) # Add timeout to prevent hanging
8694
except Exception as e:
8795
# Log the error but don't raise it
88-
print(f"Error stopping client: {e}")
96+
self._log.error(f"Error stopping client: {e}")
8997
for socket_name in self._socket_running.keys():
9098
self._socket_running[socket_name] = False

pyproject.toml

-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,3 @@ lint.ignore = ["F722","F841","F821","E402","E501","E902","E713","E741","E714", "
55
[tool.pytest.ini_options]
66
timeout = 10
77
timeout_method = "thread"
8-
addopts = "-n 10"

test-requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pytest-asyncio
44
pytest-cov
55
pytest-xdist
66
pytest-rerunfailures
7+
pytest-timeout
78
requests-mock
89
tox
910
setuptools

tests/test_reconnecting_websocket.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def test_get_reconnect_wait():
105105
async def test_connect_max_reconnects_exceeded():
106106
"""Test ws.connect exceeds maximum reconnect attempts."""
107107
ws = ReconnectingWebsocket(url="wss://test.url")
108-
ws.MAX_RECONNECTS = 2 # Set max reconnects to a low number for testing
108+
ws.MAX_RECONNECTS = 2 # type: ignore # Set max reconnects to a low number for testing
109109
ws._before_connect = AsyncMock()
110110
ws._after_connect = AsyncMock()
111111
ws._conn = AsyncMock()

tests/test_streams_options.py

+45-4
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,141 @@
11
import sys
22
import pytest
3+
import logging
34
from binance import BinanceSocketManager
45

56
pytestmark = [
67
pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+"),
78
pytest.mark.asyncio
89
]
910

11+
# Configure logger for this module
12+
logger = logging.getLogger(__name__)
13+
1014
# Test constants
11-
OPTION_SYMBOL = "BTC-250328-40000-P"
15+
OPTION_SYMBOL = "BTC-250926-40000-P"
1216
UNDERLYING_SYMBOL = "BTC"
13-
EXPIRATION_DATE = "250328"
17+
EXPIRATION_DATE = "250926"
1418
INTERVAL = "1m"
1519
DEPTH = "20"
1620

1721
async def test_options_ticker(clientAsync):
1822
"""Test options ticker socket"""
23+
logger.info(f"Starting options ticker test for symbol: {OPTION_SYMBOL}")
1924
bm = BinanceSocketManager(clientAsync)
2025
socket = bm.options_ticker_socket(OPTION_SYMBOL)
2126
async with socket as ts:
27+
logger.debug("Waiting for ticker message...")
2228
msg = await ts.recv()
29+
logger.info(f"Received ticker message: {msg}")
2330
assert msg['e'] == '24hrTicker'
31+
logger.info("Options ticker test completed successfully")
2432
await clientAsync.close_connection()
2533

2634
async def test_options_ticker_by_expiration(clientAsync):
2735
"""Test options ticker by expiration socket"""
36+
logger.info(f"Starting options ticker by expiration test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}")
2837
bm = BinanceSocketManager(clientAsync)
2938
socket = bm.options_ticker_by_expiration_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
3039
async with socket as ts:
40+
logger.debug("Waiting for ticker by expiration message...")
3141
msg = await ts.recv()
42+
logger.info(f"Received {len(msg)} ticker messages")
3243
assert len(msg) > 0
44+
logger.info("Options ticker by expiration test completed successfully")
3345
await clientAsync.close_connection()
3446

3547
async def test_options_recent_trades(clientAsync):
3648
"""Test options recent trades socket"""
49+
logger.info(f"Starting options recent trades test for {UNDERLYING_SYMBOL}")
3750
bm = BinanceSocketManager(clientAsync)
3851
socket = bm.options_recent_trades_socket(UNDERLYING_SYMBOL)
3952
async with socket as ts:
53+
logger.debug("Waiting for trade message...")
4054
msg = await ts.recv()
55+
logger.info(f"Received trade message: {msg}")
4156
assert msg['e'] == 'trade'
57+
logger.info("Options recent trades test completed successfully")
4258
await clientAsync.close_connection()
4359

4460
async def test_options_kline(clientAsync):
4561
"""Test options kline socket"""
62+
logger.info(f"Starting options kline test for {OPTION_SYMBOL}, interval: {INTERVAL}")
4663
bm = BinanceSocketManager(clientAsync)
4764
socket = bm.options_kline_socket(OPTION_SYMBOL, INTERVAL)
4865
async with socket as ts:
66+
logger.debug("Waiting for kline message...")
4967
msg = await ts.recv()
68+
logger.info(f"Received kline message: {msg}")
5069
assert msg['e'] == 'kline'
70+
logger.info("Options kline test completed successfully")
5171
await clientAsync.close_connection()
5272

5373
async def test_options_depth(clientAsync):
5474
"""Test options depth socket"""
75+
logger.info(f"Starting options depth test for {OPTION_SYMBOL}, depth: {DEPTH}")
5576
bm = BinanceSocketManager(clientAsync)
5677
socket = bm.options_depth_socket(OPTION_SYMBOL, DEPTH)
5778
async with socket as ts:
79+
logger.debug("Waiting for depth message...")
5880
msg = await ts.recv()
81+
logger.info(f"Received depth message: {msg}")
5982
assert msg['e'] == 'depth'
83+
logger.info("Options depth test completed successfully")
6084
await clientAsync.close_connection()
6185

6286
async def test_options_multiplex(clientAsync):
6387
"""Test options multiplex socket"""
64-
bm = BinanceSocketManager(clientAsync)
6588
streams = [
6689
f"{OPTION_SYMBOL}@ticker",
6790
f"{OPTION_SYMBOL}@trade",
6891
]
92+
logger.info(f"Starting options multiplex test with streams: {streams}")
93+
bm = BinanceSocketManager(clientAsync)
6994
socket = bm.options_multiplex_socket(streams)
7095
async with socket as ts:
96+
logger.debug("Waiting for multiplex message...")
7197
msg = await ts.recv()
98+
logger.info(f"Received multiplex message: {msg}")
7299
assert 'stream' in msg
100+
logger.info("Options multiplex test completed successfully")
73101
await clientAsync.close_connection()
74102

75103
async def test_options_open_interest(clientAsync):
76104
"""Test options open interest socket"""
105+
logger.info(f"Starting options open interest test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}")
77106
bm = BinanceSocketManager(clientAsync)
78107
socket = bm.options_open_interest_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
79108
async with socket as ts:
109+
logger.debug("Waiting for open interest message...")
80110
msg = await ts.recv()
111+
logger.info(f"Received open interest message with {len(msg)} items")
81112
assert len(msg) > 0
113+
logger.info("Options open interest test completed successfully")
82114
await clientAsync.close_connection()
83115

84116
async def test_options_mark_price(clientAsync):
85117
"""Test options mark price socket"""
118+
logger.info(f"Starting options mark price test for {UNDERLYING_SYMBOL}")
86119
bm = BinanceSocketManager(clientAsync)
87120
socket = bm.options_mark_price_socket(UNDERLYING_SYMBOL)
88121
async with socket as ts:
122+
logger.debug("Waiting for mark price message...")
89123
msg = await ts.recv()
124+
logger.info(f"Received mark price message with {len(msg)} items")
90125
assert len(msg) > 0
126+
logger.info("Options mark price test completed successfully")
91127
await clientAsync.close_connection()
92128

93129
async def test_options_index_price(clientAsync):
94130
"""Test options index price socket"""
131+
symbol = 'ETHUSDT'
132+
logger.info(f"Starting options index price test for {symbol}")
95133
bm = BinanceSocketManager(clientAsync)
96-
socket = bm.options_index_price_socket('ETHUSDT')
134+
socket = bm.options_index_price_socket(symbol)
97135
async with socket as ts:
136+
logger.debug("Waiting for index price message...")
98137
msg = await ts.recv()
138+
logger.info(f"Received index price message: {msg}")
99139
assert msg['e'] == 'index'
140+
logger.info("Options index price test completed successfully")
100141
await clientAsync.close_connection()

0 commit comments

Comments
 (0)