Commit db9e728

Authored by costajohnt and claude
Add WebSocket streaming + bar data batching to reduce API calls (#104)
* feat: add WebSocket streaming and bar data batching (#68, #74)

  Issue #68: Wire up Alpaca WebSocket streaming via new get_stream_client()
  factory in client.py and PriceStream wrapper in strategies/streaming.py.
  PriceStream provides a thread-safe price cache with background WebSocket
  connection, enabling real-time price feeds for position_manager (Phase 1).

  Issue #74: Add bar data batching utilities in strategies/bar_utils.py with
  fetch_bars_batch() for multi-symbol requests. Updated fetch_atr_for_symbol()
  and check_weekly_trend() to accept pre-fetched bars via optional bars_df
  parameter, eliminating redundant per-symbol API calls. Scripts that already
  batch (screener.py) can now pass batch results downstream.

  Adds 48 new tests covering streaming lifecycle, thread safety, bar
  extraction, batch fetching with fallback, and pre-fetched bars integration.

* fix: address thread safety, memory bounds, and correctness issues in
  WebSocket/batching PR

  - PriceStream: replace bare bool _running with threading.Event for
    thread-safe signaling
  - PriceStream: add bounded LRU cache (OrderedDict, max 2000) to prevent
    unbounded memory growth
  - PriceStream: add WebSocket reconnection with configurable max attempts
    and backoff delay
  - PriceStream: snapshot callback list before iterating to avoid concurrent
    modification
  - bar_utils: remove misleading @retry_api decorator (internal try/except
    prevents it from triggering)
  - bar_utils: deduplicate input symbols to prevent duplicate index entries
    in pd.concat
  - weekly_trend: add .copy() when extracting from pre-fetched bars_df to
    prevent mutation of caller's DataFrame
  - Tests: add coverage for bounded cache eviction, reconnection logic, and
    symbol deduplication

* fix: address 13 issues from PR review (security regressions, bugs,
  resource safety)

  Restore security infrastructure removed in this PR:
  - Restore strategies/validation.py and all validate_symbol() calls
  - Restore security-audit.yml workflow (pip-audit + bandit)
  - Restore pre-commit hooks (detect-private-key, trailing-whitespace, etc.)
  - Restore CI dependency-audit job and bandit dev dependencies
  - Restore lock files to data/ directory (not /tmp, avoids symlink attacks)
  - Restore daily-trading.yml market hours guard and data backup artifact
  - Restore EDGAR User-Agent to proper format (admin@alpaca-trader.dev)
  - Restore error handling pattern (log details at debug, expose type only)

  Fix streaming module resource safety:
  - Add __enter__/__exit__ context manager to PriceStream
  - Add __del__ for best-effort cleanup on garbage collection
  - Make on_bar callback list thread-safe (snapshot under lock)

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Parent: ec3acc1

8 files changed: 1533 additions & 25 deletions
client.py

Lines changed: 29 additions & 1 deletion

@@ -1,9 +1,10 @@
-"""Alpaca client setup -- loads credentials from .env and exposes trading and data clients."""
+"""Alpaca client setup -- loads credentials from .env and exposes trading, data, and streaming clients."""
 
 import os
 from pathlib import Path
 
 from alpaca.data.historical import StockHistoricalDataClient
+from alpaca.data.live import StockDataStream
 from alpaca.trading.client import TradingClient
 from dotenv import load_dotenv
 
@@ -34,3 +35,30 @@ def get_data_client() -> StockHistoricalDataClient:
         api_key=os.environ["ALPACA_API_KEY"],
         secret_key=os.environ["ALPACA_SECRET_KEY"],
     )
+
+
+def get_stream_client() -> StockDataStream:
+    """Create an Alpaca WebSocket streaming client for real-time price data.
+
+    Returns a StockDataStream configured for the appropriate environment
+    (paper or live). The caller must subscribe to symbols and call .run()
+    to start receiving data.
+
+    Example usage:
+        stream = get_stream_client()
+
+        @stream.on("bar")
+        async def on_bar(bar):
+            print(f"{bar.symbol}: ${bar.close}")
+
+        stream.subscribe_bars(on_bar, "AAPL", "MSFT")
+        stream.run()
+    """
+    _load_env()
+    is_paper = os.environ.get("ALPACA_PAPER", "true").lower() == "true"
+    feed = "iex" if is_paper else "sip"
+    return StockDataStream(
+        api_key=os.environ["ALPACA_API_KEY"],
+        secret_key=os.environ["ALPACA_SECRET_KEY"],
+        feed=feed,
+    )
strategies/bar_utils.py (new file)

Lines changed: 249 additions & 0 deletions
"""
Bar data batching utilities — reduce API calls by fetching multiple symbols at once.

Instead of calling Alpaca's bar data API one symbol at a time (3+ calls per symbol
in the pipeline), this module batches multi-symbol requests to dramatically reduce
API call volume.

For a screener scanning 50 candidates:
    Before: 150+ individual API calls (3 per symbol: daily bars, weekly bars, ATR bars)
    After:  ~3 batched requests (one per timeframe)

Usage:
    from strategies.bar_utils import fetch_bars_batch, extract_symbol_bars

    # Fetch daily bars for many symbols at once
    all_bars_df = fetch_bars_batch(data_client, symbols, timeframe=TimeFrame.Day, days=50)

    # Extract a single symbol's bars from the batch result
    aapl_df = extract_symbol_bars(all_bars_df, "AAPL")
"""

from datetime import datetime, timedelta

import pandas as pd
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame

from strategies.log import get_logger

logger = get_logger(__name__)

# Alpaca API limit for multi-symbol bar requests
DEFAULT_BATCH_SIZE = 500


def extract_symbol_bars(df: pd.DataFrame, symbol: str) -> pd.DataFrame | None:
    """Extract and sort bars for a single symbol from a multi-symbol DataFrame.

    Args:
        df: Multi-index DataFrame from Alpaca's get_stock_bars() with
            (symbol, timestamp) index levels.
        symbol: The ticker symbol to extract.

    Returns:
        A sorted DataFrame of bars for the symbol, or None if the symbol
        is not found in the data.
    """
    if df is None or df.empty:
        return None

    try:
        if symbol in df.index.get_level_values(0):
            sdf = df.loc[symbol].sort_index()
            if len(sdf) > 0:
                return sdf
    except (KeyError, TypeError):
        pass

    return None

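extract_symbol_bars relies on the (symbol, timestamp) MultiIndex shape that Alpaca's get_stock_bars().df returns. A small self-contained sketch with made-up data shows the .loc / sort_index behavior it wraps:

```python
import pandas as pd

# Made-up frame shaped like Alpaca's get_stock_bars().df:
# a (symbol, timestamp) MultiIndex over bar columns.
idx = pd.MultiIndex.from_tuples(
    [("AAPL", "2024-01-02"), ("AAPL", "2024-01-01"), ("MSFT", "2024-01-01")],
    names=["symbol", "timestamp"],
)
df = pd.DataFrame({"close": [186.0, 185.0, 370.0]}, index=idx)

# What extract_symbol_bars does for a present symbol: select, then sort by time
aapl = df.loc["AAPL"].sort_index()
assert list(aapl["close"]) == [185.0, 186.0]  # oldest bar first

# The membership test that guards the .loc call for a missing symbol
assert "TSLA" not in df.index.get_level_values(0)
```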
def fetch_bars_batch(
    data_client,
    symbols: list[str],
    timeframe: TimeFrame = TimeFrame.Day,
    days: int = 50,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> pd.DataFrame:
    """Fetch bar data for multiple symbols in batched API calls.

    Alpaca supports multi-symbol bar requests, so instead of one API call per
    symbol, we batch them into groups of batch_size (default 500) and fetch
    all bars in a handful of requests.

    Note: retry_api is NOT applied here because per-batch failures are handled
    internally with individual symbol fallbacks. The function never raises on
    API errors -- it degrades gracefully to an empty DataFrame.

    Args:
        data_client: Alpaca StockHistoricalDataClient instance.
        symbols: List of ticker symbols to fetch bars for.
        timeframe: Bar timeframe (TimeFrame.Day, TimeFrame.Week, etc.).
        days: Number of calendar days of history to fetch.
        batch_size: Maximum symbols per API request (Alpaca limit).

    Returns:
        A multi-index DataFrame with (symbol, timestamp) index containing
        all bar data. May be empty if all requests fail.
    """
    if not symbols:
        return pd.DataFrame()

    # Deduplicate while preserving order to avoid duplicate index entries
    seen: set[str] = set()
    unique_symbols: list[str] = []
    for s in symbols:
        if s not in seen:
            seen.add(s)
            unique_symbols.append(s)
    symbols = unique_symbols

    start = datetime.now() - timedelta(days=days)
    all_frames: list[pd.DataFrame] = []

    for i in range(0, len(symbols), batch_size):
        batch = symbols[i:i + batch_size]
        try:
            bars_request = StockBarsRequest(
                symbol_or_symbols=batch,
                timeframe=timeframe,
                start=start,
            )
            bars = data_client.get_stock_bars(bars_request)
            if bars.df is not None and not bars.df.empty:
                all_frames.append(bars.df)
        except Exception as e:
            logger.warning(
                "Batch bar fetch failed for %d symbols (batch %d-%d): %s",
                len(batch), i, i + len(batch), e,
            )
            # Fall back to individual fetches for this batch
            for symbol in batch:
                try:
                    single_request = StockBarsRequest(
                        symbol_or_symbols=[symbol],
                        timeframe=timeframe,
                        start=start,
                    )
                    single_bars = data_client.get_stock_bars(single_request)
                    if single_bars.df is not None and not single_bars.df.empty:
                        all_frames.append(single_bars.df)
                except Exception as sym_err:
                    logger.debug("Individual bar fetch failed for %s: %s", symbol, sym_err)

    if not all_frames:
        return pd.DataFrame()

    return pd.concat(all_frames)

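The order-preserving dedup loop in fetch_bars_batch has a compact stdlib equivalent (dicts preserve insertion order since Python 3.7), and the batch slicing is what turns N symbols into ceil(N / batch_size) API calls. A sketch of both pieces with a toy batch_size:

```python
symbols = ["AAPL", "MSFT", "AAPL", "NVDA", "MSFT"]

# Order-preserving dedup, equivalent to the seen-set loop above
unique = list(dict.fromkeys(symbols))
assert unique == ["AAPL", "MSFT", "NVDA"]

# Batch slicing: batch_size symbols per request, so len(unique) symbols
# cost ceil(len(unique) / batch_size) API calls instead of len(unique)
batch_size = 2
batches = [unique[i:i + batch_size] for i in range(0, len(unique), batch_size)]
assert batches == [["AAPL", "MSFT"], ["NVDA"]]
```

At the real batch_size of 500, a 50-symbol screener run fits in a single request per timeframe, which is where the "150+ calls before, ~3 after" figure in the module docstring comes from.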

def fetch_daily_bars(
    data_client,
    symbols: list[str],
    days: int = 50,
    batch_size: int = DEFAULT_BATCH_SIZE,
) -> pd.DataFrame:
    """Convenience wrapper: fetch daily bars for multiple symbols.

    This is the most common use case -- daily bars are used for:
    - SMA deviation (mean reversion signals)
    - ATR calculation (volatility normalization)
    - Weekly SMA (computed from daily bars with a wider window)
    - Momentum calculation (10d/50d returns)

    By fetching once with a wide enough window, all of these can share the
    same data without redundant API calls.

    Args:
        data_client: Alpaca StockHistoricalDataClient instance.
        symbols: List of ticker symbols.
        days: Number of calendar days of history (default 50 covers 20-day SMA
            + 14-day ATR + buffer for weekends/holidays).
        batch_size: Maximum symbols per API request.

    Returns:
        Multi-index DataFrame with (symbol, timestamp) index.
    """
    return fetch_bars_batch(
        data_client,
        symbols,
        timeframe=TimeFrame.Day,
        days=days,
        batch_size=batch_size,
    )

def compute_atr_from_bars(df: pd.DataFrame, period: int = 14) -> tuple[float, float]:
    """Compute ATR and ATR% from a pre-fetched DataFrame.

    This avoids a separate API call for ATR calculation — reuses bars that
    were already fetched for other purposes (SMA deviation, etc.).

    Args:
        df: Single-symbol DataFrame with 'high', 'low', 'close' columns.
            Must be sorted by time.
        period: ATR lookback period (default 14).

    Returns:
        (atr_dollar, atr_pct) tuple. Returns (0.0, 0.0) if insufficient data.
    """
    from strategies.volatility import compute_atr, compute_atr_pct

    atr = compute_atr(df, period)
    atr_pct = compute_atr_pct(df, period)
    return (atr, atr_pct)

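compute_atr itself lives in strategies/volatility.py and is not shown in this diff. For reference, one common ATR formulation (true range averaged with a plain rolling mean; the repo's version may use Wilder smoothing instead) can be sketched as:

```python
import pandas as pd


def atr_sketch(df: pd.DataFrame, period: int = 14) -> float:
    """ATR via true range: max(high-low, |high-prev_close|, |low-prev_close|),
    averaged with a simple rolling mean over `period` bars."""
    prev_close = df["close"].shift(1)
    true_range = pd.concat(
        [
            df["high"] - df["low"],
            (df["high"] - prev_close).abs(),
            (df["low"] - prev_close).abs(),
        ],
        axis=1,
    ).max(axis=1)  # row-wise max; the NaNs on the first shifted bar are skipped
    return float(true_range.rolling(period).mean().iloc[-1])


# Synthetic tape: close rises by 1 each bar, high/low a dollar either side,
# so every true range is exactly 2.0 and so is the ATR
close = pd.Series([100.0 + i for i in range(20)])
bars = pd.DataFrame({"high": close + 1, "low": close - 1, "close": close})
assert atr_sketch(bars, period=14) == 2.0
```

ATR% is then just atr / last_close * 100, which is what makes volatility comparable across differently priced symbols.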
def check_weekly_trend_from_bars(
    symbol: str,
    df: pd.DataFrame,
    weekly_sma_period: int = 10,
) -> tuple[bool, str]:
    """Check weekly trend from pre-fetched daily bars.

    This avoids a separate API call for weekly trend checking — reuses
    daily bars that were already fetched for other purposes.

    Requires daily bars with at least (weekly_sma_period * 5) data points.

    Args:
        symbol: Ticker symbol (for logging).
        df: Single-symbol DataFrame with 'close' column, sorted by time.
        weekly_sma_period: Weekly SMA period (default 10 = ~50 trading days).

    Returns:
        (confirmed, reason) tuple. If confirmed=False, the dip may be structural.
    """
    from strategies.constants import (
        WEEKLY_TREND_LOOKBACK_BARS,
        WEEKLY_TREND_SEVERE_BELOW_SMA_PCT,
        WEEKLY_TREND_SMA_DECLINE_FRAC,
    )

    try:
        weekly_sma_days = weekly_sma_period * 5
        if len(df) < weekly_sma_days:
            return True, ""  # not enough data, don't block

        df = df.copy()
        df["weekly_sma"] = df["close"].rolling(window=weekly_sma_days).mean()
        latest_close = df["close"].iloc[-1]
        latest_weekly_sma = df["weekly_sma"].iloc[-1]

        if pd.isna(latest_weekly_sma) or abs(latest_weekly_sma) < 1e-9:
            return True, ""

        weekly_deviation = ((latest_close - latest_weekly_sma) / latest_weekly_sma) * 100
        if weekly_deviation < -WEEKLY_TREND_SEVERE_BELOW_SMA_PCT:
            return False, f"weekly trend down {weekly_deviation:.0f}% (structural)"

        if len(df) >= weekly_sma_days + WEEKLY_TREND_LOOKBACK_BARS - 1:
            prev_weekly_sma_3w = df["weekly_sma"].iloc[-WEEKLY_TREND_LOOKBACK_BARS]
            if not pd.isna(prev_weekly_sma_3w) and latest_weekly_sma < prev_weekly_sma_3w * (
                1 - WEEKLY_TREND_SMA_DECLINE_FRAC
            ):
                return False, "weekly SMA declining >2% over 3 weeks"

        return True, ""
    except Exception as e:
        logger.warning("Weekly trend check from bars failed for %s: %s", symbol, e)
        return True, ""

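The core check in check_weekly_trend_from_bars is simple arithmetic: the percent distance of the latest close from its ~50-day SMA. A sketch with synthetic closes and an illustrative threshold (the real WEEKLY_TREND_SEVERE_BELOW_SMA_PCT lives in strategies/constants.py and may differ):

```python
import pandas as pd

# Illustrative threshold, not the value from strategies/constants.py
SEVERE_BELOW_SMA_PCT = 20.0


def weekly_deviation_pct(closes: list[float], sma_days: int = 50) -> float:
    """Percent distance of the latest close from its rolling SMA."""
    s = pd.Series(closes)
    sma = s.rolling(window=sma_days).mean().iloc[-1]
    return (s.iloc[-1] - sma) / sma * 100


# 49 flat days at 100, then a gap down to 75: about 24.6% below the
# 50-day SMA, beyond the (illustrative) severe threshold
closes = [100.0] * 49 + [75.0]
assert weekly_deviation_pct(closes) < -SEVERE_BELOW_SMA_PCT
```

A dip that deep relative to the long SMA is treated as structural rather than a mean-reversion candidate, so the function returns confirmed=False with a reason string.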