Skip to content

feat: Expose option for user to edit max_queue_size in websockets, have async callbacks called in a task and update docs #1570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Apr 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
6b9447f
add timeout to jobs
pcriadoperez Feb 18, 2025
cf7debb
Merge branch 'master' of https://github.com/sammchardy/python-binance
pcriadoperez Feb 21, 2025
74f285e
Merge branch 'master' of https://github.com/sammchardy/python-binance
pcriadoperez Feb 24, 2025
4d3e7ee
Merge branch 'master' of https://github.com/sammchardy/python-binance
pcriadoperez Mar 28, 2025
82d266a
feat: expose variable max_queue_size and have async callbacks called …
pcriadoperez Mar 30, 2025
958d499
fix tests
pcriadoperez Mar 30, 2025
847831c
fix proxy in test
pcriadoperez Mar 31, 2025
77054df
pass params to socket manager
pcriadoperez Apr 1, 2025
5c86fc6
skip test in 3.7
pcriadoperez Apr 1, 2025
3c4696e
add pytest timeout
pcriadoperez Apr 1, 2025
db4cee7
improve logging and error throw on failed connection and add test
pcriadoperez Apr 5, 2025
d20b43e
update tests
pcriadoperez Apr 5, 2025
11f8d12
skip for 3.7
pcriadoperez Apr 6, 2025
c7431a5
pyright tests
pcriadoperez Apr 6, 2025
80d1e53
fix test
pcriadoperez Apr 6, 2025
a7e7726
comment coveralls
pcriadoperez Apr 6, 2025
3114b53
add debug logging to test
pcriadoperez Apr 7, 2025
969b80d
add more logigng test only file
pcriadoperez Apr 7, 2025
4cb3031
fix test
pcriadoperez Apr 7, 2025
99080be
run all tests
pcriadoperez Apr 7, 2025
74060b1
reduce logging
pcriadoperez Apr 7, 2025
4eb1d88
update symbol for test
pcriadoperez Apr 7, 2025
e759466
uncomment coveralls to test
pcriadoperez Apr 7, 2025
3b5c10e
Revert "uncomment coveralls to test"
pcriadoperez Apr 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions .github/workflows/python-app.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,19 @@ jobs:
run: pyright
- name: Test with tox
run: tox -e py
- name: Coveralls Parallel
uses: coverallsapp/github-action@v2
with:
flag-name: run-${{ join(matrix.*, '-') }}
parallel: true
finish:
needs: build
if: ${{ always() }}
runs-on: ubuntu-latest
timeout-minutes: 5
steps:
- name: Coveralls Finished
uses: coverallsapp/github-action@v2
with:
parallel-finished: true
# comment due to coveralls maintenance of April 6 2025: https://status.coveralls.io/
# - name: Coveralls Parallel
# uses: coverallsapp/github-action@v2
# with:
# flag-name: run-${{ join(matrix.*, '-') }}
# parallel: true
# finish:
# needs: build
# if: ${{ always() }}
# runs-on: ubuntu-latest
# timeout-minutes: 5
# steps:
# - name: Coveralls Finished
# uses: coverallsapp/github-action@v2
# with:
# parallel-finished: true
7 changes: 7 additions & 0 deletions binance/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ async def create(
testnet: bool = False,
loop=None,
session_params: Optional[Dict[str, Any]] = None,
private_key: Optional[Union[str, Path]] = None,
private_key_pass: Optional[str] = None,
https_proxy: Optional[str] = None,
time_unit: Optional[str] = None,
):
self = cls(
api_key,
Expand All @@ -75,6 +78,10 @@ async def create(
testnet,
loop,
session_params,
private_key,
private_key_pass,
https_proxy,
time_unit
)
self.https_proxy = https_proxy # move this to the constructor

Expand Down
8 changes: 5 additions & 3 deletions binance/ws/reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class ReconnectingWebsocket:
MIN_RECONNECT_WAIT = 0.1
TIMEOUT = 10
NO_MESSAGE_RECONNECT_TIMEOUT = 60
MAX_QUEUE_SIZE = 100

def __init__(
self,
Expand All @@ -57,6 +56,7 @@ def __init__(
is_binary: bool = False,
exit_coro=None,
https_proxy: Optional[str] = None,
max_queue_size: int = 100,
**kwargs,
):
self._loop = get_loop()
Expand All @@ -75,6 +75,7 @@ def __init__(
self._handle_read_loop = None
self._https_proxy = https_proxy
self._ws_kwargs = kwargs
self.max_queue_size = max_queue_size

def json_dumps(self, msg) -> str:
if orjson:
Expand Down Expand Up @@ -201,13 +202,14 @@ async def _read_loop(self):
self.ws.recv(), timeout=self.TIMEOUT
)
res = self._handle_message(res)
print(self._queue.qsize())
self._log.debug(f"Received message: {res}")
if res:
if self._queue.qsize() < self.MAX_QUEUE_SIZE:
if self._queue.qsize() < self.max_queue_size:
await self._queue.put(res)
else:
raise BinanceWebsocketQueueOverflow(
f"Message queue size {self._queue.qsize()} exceeded maximum {self.MAX_QUEUE_SIZE}"
f"Message queue size {self._queue.qsize()} exceeded maximum {self.max_queue_size}"
)
except asyncio.TimeoutError:
self._log.debug(f"no message in {self.TIMEOUT} seconds")
Expand Down
24 changes: 20 additions & 4 deletions binance/ws/streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,19 @@ class BinanceSocketManager:
WEBSOCKET_DEPTH_10 = "10"
WEBSOCKET_DEPTH_20 = "20"

def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
def __init__(
self,
client: AsyncClient,
user_timeout=KEEPALIVE_TIMEOUT,
max_queue_size: int = 100,
):
"""Initialise the BinanceSocketManager

:param client: Binance API client
:type client: binance.AsyncClient

:param user_timeout: Timeout for user socket in seconds
:param max_queue_size: Max size of the websocket queue, defaults to 100
:type max_queue_size: int
"""
self.STREAM_URL = self.STREAM_URL.format(client.tld)
self.FSTREAM_URL = self.FSTREAM_URL.format(client.tld)
Expand All @@ -52,8 +59,8 @@ def __init__(self, client: AsyncClient, user_timeout=KEEPALIVE_TIMEOUT):
self._loop = get_loop()
self._client = client
self._user_timeout = user_timeout

self.testnet = self._client.testnet
self._max_queue_size = max_queue_size
self.ws_kwargs = {}

def _get_stream_url(self, stream_url: Optional[str] = None):
Expand Down Expand Up @@ -84,6 +91,7 @@ def _get_socket(
exit_coro=lambda p: self._exit_socket(f"{socket_type}_{p}"),
is_binary=is_binary,
https_proxy=self._client.https_proxy,
max_queue_size=self._max_queue_size,
**self.ws_kwargs,
)

Expand Down Expand Up @@ -1202,6 +1210,7 @@ def __init__(
session_params: Optional[Dict[str, Any]] = None,
https_proxy: Optional[str] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
max_queue_size: int = 100,
):
super().__init__(
api_key,
Expand All @@ -1214,10 +1223,14 @@ def __init__(
loop,
)
self._bsm: Optional[BinanceSocketManager] = None
self._max_queue_size = max_queue_size

async def _before_socket_listener_start(self):
assert self._client
self._bsm = BinanceSocketManager(client=self._client)
self._bsm = BinanceSocketManager(
client=self._client,
max_queue_size=self._max_queue_size
)

def _start_async_socket(
self,
Expand All @@ -1226,7 +1239,10 @@ def _start_async_socket(
params: Dict[str, Any],
path: Optional[str] = None,
) -> str:
start_time = time.time()
while not self._bsm:
if time.time() - start_time > 5:
raise RuntimeError("Binance Socket Manager failed to initialize after 5 seconds")
time.sleep(0.1)
socket = getattr(self._bsm, socket_name)(**params)
socket_path: str = path or socket._path # noqa
Expand Down
16 changes: 12 additions & 4 deletions binance/ws/threaded_stream.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import logging
import threading
from typing import Optional, Dict, Any

Expand All @@ -24,6 +25,7 @@ def __init__(
self._client: Optional[AsyncClient] = None
self._running: bool = True
self._socket_running: Dict[str, bool] = {}
self._log = logging.getLogger(__name__)
self._client_params = {
"api_key": api_key,
"api_secret": api_secret,
Expand All @@ -37,12 +39,17 @@ def __init__(
async def _before_socket_listener_start(self): ...

async def socket_listener(self):
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
await self._before_socket_listener_start()
try:
self._client = await AsyncClient.create(loop=self._loop, **self._client_params)
await self._before_socket_listener_start()
except Exception as e:
self._log.error(f"Failed to create client: {e}")
self.stop()
while self._running:
await asyncio.sleep(0.2)
while self._socket_running:
await asyncio.sleep(0.2)
self._log.info("Socket listener stopped")

async def start_listener(self, socket, path: str, callback):
async with socket as s:
Expand All @@ -56,7 +63,7 @@ async def start_listener(self, socket, path: str, callback):
if not msg:
continue # Handle both async and sync callbacks
if asyncio.iscoroutinefunction(callback):
await callback(msg)
asyncio.create_task(callback(msg))
else:
callback(msg)
del self._socket_running[path]
Expand All @@ -74,6 +81,7 @@ async def stop_client(self):
await self._client.close_connection()

def stop(self):
self._log.debug("Stopping ThreadedApiManager")
if not self._running:
return
self._running = False
Expand All @@ -85,6 +93,6 @@ def stop(self):
future.result(timeout=5) # Add timeout to prevent hanging
except Exception as e:
# Log the error but don't raise it
print(f"Error stopping client: {e}")
self._log.error(f"Error stopping client: {e}")
for socket_name in self._socket_running.keys():
self._socket_running[socket_name] = False
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ lint.ignore = ["F722","F841","F821","E402","E501","E902","E713","E741","E714", "
[tool.pytest.ini_options]
timeout = 10
timeout_method = "thread"
addopts = "-n 10"
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pytest-asyncio
pytest-cov
pytest-xdist
pytest-rerunfailures
pytest-timeout
requests-mock
tox
setuptools
Expand Down
2 changes: 1 addition & 1 deletion tests/test_reconnecting_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def test_get_reconnect_wait():
async def test_connect_max_reconnects_exceeded():
"""Test ws.connect exceeds maximum reconnect attempts."""
ws = ReconnectingWebsocket(url="wss://test.url")
ws.MAX_RECONNECTS = 2 # Set max reconnects to a low number for testing
ws.MAX_RECONNECTS = 2 # type: ignore # Set max reconnects to a low number for testing
ws._before_connect = AsyncMock()
ws._after_connect = AsyncMock()
ws._conn = AsyncMock()
Expand Down
49 changes: 45 additions & 4 deletions tests/test_streams_options.py
Original file line number Diff line number Diff line change
@@ -1,100 +1,141 @@
import sys
import pytest
import logging
from binance import BinanceSocketManager

pytestmark = [
pytest.mark.skipif(sys.version_info < (3, 8), reason="websockets_proxy Python 3.8+"),
pytest.mark.asyncio
]

# Configure logger for this module
logger = logging.getLogger(__name__)

# Test constants
OPTION_SYMBOL = "BTC-250328-40000-P"
OPTION_SYMBOL = "BTC-250926-40000-P"
UNDERLYING_SYMBOL = "BTC"
EXPIRATION_DATE = "250328"
EXPIRATION_DATE = "250926"
INTERVAL = "1m"
DEPTH = "20"

async def test_options_ticker(clientAsync):
"""Test options ticker socket"""
logger.info(f"Starting options ticker test for symbol: {OPTION_SYMBOL}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_socket(OPTION_SYMBOL)
async with socket as ts:
logger.debug("Waiting for ticker message...")
msg = await ts.recv()
logger.info(f"Received ticker message: {msg}")
assert msg['e'] == '24hrTicker'
logger.info("Options ticker test completed successfully")
await clientAsync.close_connection()

async def test_options_ticker_by_expiration(clientAsync):
"""Test options ticker by expiration socket"""
logger.info(f"Starting options ticker by expiration test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_ticker_by_expiration_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
logger.debug("Waiting for ticker by expiration message...")
msg = await ts.recv()
logger.info(f"Received {len(msg)} ticker messages")
assert len(msg) > 0
logger.info("Options ticker by expiration test completed successfully")
await clientAsync.close_connection()

async def test_options_recent_trades(clientAsync):
"""Test options recent trades socket"""
logger.info(f"Starting options recent trades test for {UNDERLYING_SYMBOL}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_recent_trades_socket(UNDERLYING_SYMBOL)
async with socket as ts:
logger.debug("Waiting for trade message...")
msg = await ts.recv()
logger.info(f"Received trade message: {msg}")
assert msg['e'] == 'trade'
logger.info("Options recent trades test completed successfully")
await clientAsync.close_connection()

async def test_options_kline(clientAsync):
"""Test options kline socket"""
logger.info(f"Starting options kline test for {OPTION_SYMBOL}, interval: {INTERVAL}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_kline_socket(OPTION_SYMBOL, INTERVAL)
async with socket as ts:
logger.debug("Waiting for kline message...")
msg = await ts.recv()
logger.info(f"Received kline message: {msg}")
assert msg['e'] == 'kline'
logger.info("Options kline test completed successfully")
await clientAsync.close_connection()

async def test_options_depth(clientAsync):
"""Test options depth socket"""
logger.info(f"Starting options depth test for {OPTION_SYMBOL}, depth: {DEPTH}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_depth_socket(OPTION_SYMBOL, DEPTH)
async with socket as ts:
logger.debug("Waiting for depth message...")
msg = await ts.recv()
logger.info(f"Received depth message: {msg}")
assert msg['e'] == 'depth'
logger.info("Options depth test completed successfully")
await clientAsync.close_connection()

async def test_options_multiplex(clientAsync):
"""Test options multiplex socket"""
bm = BinanceSocketManager(clientAsync)
streams = [
f"{OPTION_SYMBOL}@ticker",
f"{OPTION_SYMBOL}@trade",
]
logger.info(f"Starting options multiplex test with streams: {streams}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_multiplex_socket(streams)
async with socket as ts:
logger.debug("Waiting for multiplex message...")
msg = await ts.recv()
logger.info(f"Received multiplex message: {msg}")
assert 'stream' in msg
logger.info("Options multiplex test completed successfully")
await clientAsync.close_connection()

async def test_options_open_interest(clientAsync):
"""Test options open interest socket"""
logger.info(f"Starting options open interest test for {UNDERLYING_SYMBOL}, expiration: {EXPIRATION_DATE}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_open_interest_socket(UNDERLYING_SYMBOL, EXPIRATION_DATE)
async with socket as ts:
logger.debug("Waiting for open interest message...")
msg = await ts.recv()
logger.info(f"Received open interest message with {len(msg)} items")
assert len(msg) > 0
logger.info("Options open interest test completed successfully")
await clientAsync.close_connection()

async def test_options_mark_price(clientAsync):
"""Test options mark price socket"""
logger.info(f"Starting options mark price test for {UNDERLYING_SYMBOL}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_mark_price_socket(UNDERLYING_SYMBOL)
async with socket as ts:
logger.debug("Waiting for mark price message...")
msg = await ts.recv()
logger.info(f"Received mark price message with {len(msg)} items")
assert len(msg) > 0
logger.info("Options mark price test completed successfully")
await clientAsync.close_connection()

async def test_options_index_price(clientAsync):
"""Test options index price socket"""
symbol = 'ETHUSDT'
logger.info(f"Starting options index price test for {symbol}")
bm = BinanceSocketManager(clientAsync)
socket = bm.options_index_price_socket('ETHUSDT')
socket = bm.options_index_price_socket(symbol)
async with socket as ts:
logger.debug("Waiting for index price message...")
msg = await ts.recv()
logger.info(f"Received index price message: {msg}")
assert msg['e'] == 'index'
logger.info("Options index price test completed successfully")
await clientAsync.close_connection()
Loading
Loading