Skip to content

Commit ce5ef1d

Browse files
authored
add streamer reconnection callbacks (#186)
* add streamer reconnection callbacks * add reconnect args to create() * change create to __await__
1 parent b7e88e2 commit ce5ef1d

File tree

6 files changed

+180
-56
lines changed

6 files changed

+180
-56
lines changed

docs/account-streamer.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
Account Streamer
22
================
33

4+
Basic usage
5+
-----------
6+
47
The account streamer is used to track account-level updates, such as order fills, watchlist updates and quote alerts.
58
Typically, you'll want a separate task running for the account streamer, which can then notify your application about important events.
69

@@ -35,3 +38,24 @@ Probably the most important information the account streamer handles is order fi
3538
3639
async for order in streamer.listen(PlacedOrder):
3740
print(order)
41+
42+
Retry callback
43+
--------------
44+
45+
The account streamer has a special "callback" function which can be used to execute arbitrary code whenever the websocket reconnects. This is useful for re-subscribing to whatever alerts you wanted to subscribe to initially (in fact, you can probably use the same function/code you use when initializing the connection).
46+
The callback function should look something like this:
47+
48+
.. code-block:: python
49+
50+
async def callback(streamer: AlertStreamer, arg1, arg2):
51+
await streamer.subscribe_quote_alerts()
52+
53+
The requirements are that the first parameter be the `AlertStreamer` instance, and the function should be asynchronous. Other than that, you have the flexibility to decide what arguments you want to use.
54+
This callback can then be used when creating the streamer:
55+
56+
.. code-block:: python
57+
58+
async with AlertStreamer(session, reconnect_fn=callback, reconnect_args=(arg1, arg2)) as streamer:
59+
# ...
60+
61+
The reconnection uses `websockets`' exponential backoff algorithm, which can be configured through environment variables `here <https://websockets.readthedocs.io/en/14.1/reference/variables.html>`_.

docs/data-streamer.rst

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ You can create a streamer using an active production session:
1010
.. code-block:: python
1111
1212
from tastytrade import DXLinkStreamer
13-
streamer = await DXLinkStreamer.create(session)
13+
streamer = await DXLinkStreamer(session)
1414
1515
Or, you can create a streamer using an asynchronous context manager:
1616

@@ -110,7 +110,7 @@ For example, we can use the streamer to create an option chain that will continu
110110
# the `streamer_symbol` property is the symbol used by the streamer
111111
streamer_symbols = [o.streamer_symbol for o in options]
112112
113-
streamer = await DXLinkStreamer.create(session)
113+
streamer = await DXLinkStreamer(session)
114114
# subscribe to quotes and greeks for all options on that date
115115
await streamer.subscribe(Quote, [symbol] + streamer_symbols)
116116
await streamer.subscribe(Greeks, streamer_symbols)
@@ -146,3 +146,24 @@ Now, we can access the quotes and greeks at any time, and they'll be up-to-date
146146
print(live_prices.quotes[symbol], live_prices.greeks[symbol])
147147
148148
>>> Quote(eventSymbol='.SPY230721C387', eventTime=0, sequence=0, timeNanoPart=0, bidTime=1689365699000, bidExchangeCode='X', bidPrice=62.01, bidSize=50.0, askTime=1689365699000, askExchangeCode='X', askPrice=62.83, askSize=50.0) Greeks(eventSymbol='.SPY230721C387', eventTime=0, eventFlags=0, index=7255910303911641088, time=1689398266363, sequence=0, price=62.6049270064687, volatility=0.536152815048564, delta=0.971506591907638, gamma=0.001814464566110275, theta=-0.1440768557397271, rho=0.0831882577866199, vega=0.0436861878838861)
149+
150+
Retry callback
151+
--------------
152+
153+
The data streamer has a special "callback" function which can be used to execute arbitrary code whenever the websocket reconnects. This is useful for re-subscribing to whatever events you wanted to subscribe to initially (in fact, you can probably use the same function/code you use when initializing the connection).
154+
The callback function should look something like this:
155+
156+
.. code-block:: python
157+
158+
async def callback(streamer: DXLinkStreamer, arg1, arg2):
159+
await streamer.subscribe(Quote, ['SPY'])
160+
161+
The requirements are that the first parameter be the `DXLinkStreamer` instance, and the function should be asynchronous. Other than that, you have the flexibility to decide what arguments you want to use.
162+
This callback can then be used when creating the streamer:
163+
164+
.. code-block:: python
165+
166+
async with DXLinkStreamer(session, reconnect_fn=callback, reconnect_args=(arg1, arg2)) as streamer:
167+
# ...
168+
169+
The reconnection uses `websockets`' exponential backoff algorithm, which can be configured through environment variables `here <https://websockets.readthedocs.io/en/14.1/reference/variables.html>`_.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ dependencies = [
1616
"httpx>=0.27.2",
1717
"pandas-market-calendars>=4.4.1",
1818
"pydantic>=2.9.2",
19-
"websockets>=14.1",
19+
"websockets>=14.1,<15",
2020
]
2121

2222
[project.urls]
@@ -29,7 +29,7 @@ dev-dependencies = [
2929
"pytest-aio>=1.5.0",
3030
"pytest-cov>=5.0.0",
3131
"ruff>=0.6.9",
32-
"pyright>=1.1.384",
32+
"pyright>=1.1.390",
3333
]
3434

3535
[tool.setuptools.package-data]

0 commit comments

Comments
 (0)