-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_loader.py
More file actions
76 lines (64 loc) · 2.03 KB
/
Copy pathdata_loader.py
File metadata and controls
76 lines (64 loc) · 2.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
"""SQLite helpers for OHLCV bars and raw depth snapshots."""
from __future__ import annotations
import sqlite3
from pathlib import Path
import pandas as pd
def load_price_series(
    db_path: Path | str,
    symbol: str,
    lookback_days: int,
    freq_minutes: int,
) -> pd.Series:
    """Return *close* prices resampled to `freq_minutes` for the desired span.

    The window is anchored on the newest ``open_time`` stored for *symbol*
    (not on the wall clock) and reaches back ``lookback_days`` from there.
    ``open_time`` values are interpreted as epoch milliseconds.

    Args:
        db_path: SQLite database file containing an ``ohlcv`` table.
        symbol: Value matched against the ``symbol`` column.
        lookback_days: Window length in days, counted back from the newest bar.
        freq_minutes: Width of the resampling bins, in minutes.

    Returns:
        A float series holding the last close of every bin, indexed by
        timezone-aware (UTC) timestamps. Bins without data are dropped.

    Raises:
        RuntimeError: If the table has no rows for *symbol*, or the windowed
            query returns nothing.
    """
    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute("SELECT MAX(open_time) FROM ohlcv WHERE symbol = ?", (symbol,))
        latest_ms = cur.fetchone()[0]
        # MAX() over zero rows yields a single NULL, i.e. None here.
        if latest_ms is None:
            raise RuntimeError(f"No OHLCV data for {symbol!r}")

        latest_ts = pd.to_datetime(latest_ms, unit="ms", utc=True)
        start_ts = latest_ts - pd.Timedelta(days=lookback_days)
        start_ms = int(start_ts.timestamp() * 1_000)

        cur.execute(
            """
            SELECT open_time, close
            FROM ohlcv
            WHERE symbol = ? AND open_time >= ?
            ORDER BY open_time ASC
            """,
            (symbol, start_ms),
        )
        rows = cur.fetchall()
    finally:
        # Close on every path, including the "no data" error above, so the
        # database handle is never leaked.
        conn.close()

    if not rows:
        raise RuntimeError("Price query yielded no rows")

    df = pd.DataFrame(rows, columns=["open_time", "close"])
    df["ts"] = pd.to_datetime(df["open_time"], unit="ms", utc=True)
    closes = (
        df.set_index("ts")
        .sort_index()
        # "min" is the supported minute alias; the old "T" alias is deprecated.
        .resample(f"{freq_minutes}min")
        .last()
        .dropna()["close"]
        .astype(float)
    )
    return closes
def load_depth_raw(
    db_path: Path | str,
    symbol: str,
    start_ms: int,
    end_ms: int,
) -> pd.DataFrame:
    """Depth curve snapshots from `book_depth` (*percentage*, *depth*).

    Args:
        db_path: SQLite database file containing a ``book_depth`` table.
        symbol: Value matched against the ``symbol`` column.
        start_ms: Lower bound for ``timestamp`` (inclusive); converted later
            as epoch milliseconds.
        end_ms: Upper bound for ``timestamp`` (inclusive, via SQL ``BETWEEN``).

    Returns:
        A frame with the ``timestamp``, ``percentage`` and ``depth`` columns
        as stored, plus a ``ts`` column holding ``timestamp`` converted to
        timezone-aware (UTC) datetimes. Row order is whatever SQLite returns;
        the query has no ``ORDER BY``.

    Raises:
        RuntimeError: If no rows match the symbol and time range.
    """
    sql = """
    SELECT timestamp, percentage, depth
    FROM book_depth
    WHERE symbol = ? AND timestamp BETWEEN ? AND ?
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(sql, conn, params=[symbol, start_ms, end_ms])
    finally:
        # Release the handle even when the query itself fails.
        conn.close()

    if df.empty:
        raise RuntimeError("Depth query yielded no rows")

    df["ts"] = pd.to_datetime(df["timestamp"], unit="ms", utc=True)
    return df