-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasync_with_aiohttp_client_ring_buffer_timescaledb.py
More file actions
181 lines (140 loc) · 5.4 KB
/
async_with_aiohttp_client_ring_buffer_timescaledb.py
File metadata and controls
181 lines (140 loc) · 5.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import json
import time
from collections import deque
from datetime import datetime, timezone
import asyncio
import aiohttp
import asyncpg
import pyarrow as pa
# Credentials handed to asyncpg.create_pool() in main().
# NOTE(review): the password below is a hard-coded placeholder. Replace it
# before running, and prefer loading real credentials from the environment or
# a secrets store instead of committing them to source control.
db_user = "postgres"
db_password = "CHANGE_PASSWORD"
class AsyncRingBuffer:
    """Bounded FIFO buffer guarded by an ``asyncio.Lock``.

    Items live in a ``deque`` created with ``maxlen``, so once the buffer is
    full every new item silently evicts the oldest one.
    """

    def __init__(self, maxlen=10_000):
        self._lock = asyncio.Lock()
        self._buf = deque(maxlen=maxlen)

    async def push(self, item):
        """Append ``item`` at the newest end of the buffer."""
        async with self._lock:
            self._buf.append(item)

    async def pop_all(self):
        """Empty the buffer and return its former contents, oldest first."""
        async with self._lock:
            drained = [*self._buf]
            self._buf.clear()
        return drained
def trade_events_to_pa_table(events: list[dict]) -> pa.Table:
    """Convert raw trade event dicts into a columnar Arrow table.

    Every row is stamped with the local wall-clock time at which this
    function ran (not an exchange-provided timestamp), stored as a UTC
    millisecond timestamp.  ``amount`` and ``price`` are converted with
    ``float`` and ``tid`` with ``int``; ``makerSide`` is copied unchanged.

    Raises:
        KeyError: if an event lacks one of the keys read here.
        ValueError: if a numeric field cannot be converted.
    """
    # One receipt time for the whole batch, truncated to whole milliseconds.
    batch_ms = int(time.time() * 1000)

    stamps = pa.array(
        [batch_ms] * len(events),
        type=pa.timestamp("ms", tz="UTC"),
    )
    sides = [ev["makerSide"] for ev in events]
    amounts = [float(ev["amount"]) for ev in events]
    prices = [float(ev["price"]) for ev in events]
    trade_ids = [int(ev["tid"]) for ev in events]

    return pa.table({
        "time": stamps,
        "maker_side": sides,
        "amount": amounts,
        "price": prices,
        "tid": trade_ids,
    })
async def write_batch_to_timescaledb(conn: asyncpg.Connection, table: pa.Table):
    """Bulk-copy an Arrow table of trades into the ``trades`` table.

    Does nothing when ``table`` has no rows.  Otherwise the ``time`` column is
    normalised to timezone-aware datetimes, the five columns are zipped into
    row tuples, and the rows are sent with ``conn.copy_records_to_table``.

    Raises:
        TypeError: if the ``time`` column yields a value that is not a
            ``datetime``, ``int``, ``float`` or ``None``.
    """
    if table.num_rows == 0:
        return

    def as_timestamptz(value):
        """Normalise one ``time`` cell for the copy."""
        if isinstance(value, datetime):
            # Naive datetimes are labelled as UTC; aware ones pass through.
            if value.tzinfo is not None:
                return value
            return value.replace(tzinfo=timezone.utc)
        if isinstance(value, (int, float)):
            # Bare numbers are interpreted as Unix seconds.
            return datetime.fromtimestamp(value, tz=timezone.utc)
        if value is None:
            # Passed through so a NULL cell does not crash the conversion;
            # the database may still reject it if the column is NOT NULL.
            return None
        raise TypeError(f"Unexpected type for time column: {type(value)} -> {value!r}")

    column_names = ["time", "maker_side", "amount", "price", "tid"]

    stamps = [as_timestamptz(cell) for cell in table["time"].to_pylist()]
    other_columns = [table[name].to_pylist() for name in column_names[1:]]
    rows = list(zip(stamps, *other_columns))

    await conn.copy_records_to_table(
        "trades",
        records=rows,
        columns=column_names,
    )
async def pyarrow_consumer(ring: AsyncRingBuffer, db_pool: asyncpg.Pool):
    """Drain the ring buffer once a second and persist each non-empty batch.

    Loops forever: sleep one second, pop everything from ``ring``, and if
    anything was popped convert it to an Arrow table and write it through a
    connection acquired from ``db_pool``.  After each write a set of
    diagnostic queries is run on the same connection and printed.
    """
    # Debug probes, run in this order after every write: which database,
    # user, host and port this connection sees, and the current row count
    # of public.trades as seen from this connection.
    probes = (
        "SELECT current_database();",
        "SELECT current_user;",
        "SELECT inet_server_addr();",
        "SELECT inet_server_port();",
        "SELECT count(*) FROM public.trades;",
    )

    while True:
        await asyncio.sleep(1.0)
        pending = await ring.pop_all()
        if pending:
            batch = trade_events_to_pa_table(pending)
            async with db_pool.acquire() as conn:
                await write_batch_to_timescaledb(conn, batch)

                dbname, dbuser, host, port, count = [
                    await conn.fetchval(sql) for sql in probes
                ]
                print(
                    f"[DB] Wrote batch of {batch.num_rows} rows; "
                    f"count(trades)={count}; "
                    f"db={dbname} user={dbuser} host={host} port={port}"
                )
async def stream_gemini(symbol: str, ring: AsyncRingBuffer):
    """Stream market data for ``symbol`` and push trade events into ``ring``.

    Opens a WebSocket to the Gemini v1 market-data endpoint and reads it
    until the message iterator ends.  Only text frames whose JSON ``type`` is
    ``"update"`` are inspected; each contained event whose ``type`` is
    ``"trade"`` is pushed to the ring buffer as-is.
    """
    url = f"wss://api.gemini.com/v1/marketdata/{symbol}"

    async with aiohttp.ClientSession() as session, session.ws_connect(url) as ws:
        print(f"Connected to Gemini WebSocket for {symbol}")

        async for msg in ws:
            # Non-text frames carry nothing to parse here.
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue

            payload = json.loads(msg.data)
            if payload.get("type") != "update":
                continue

            for event in payload["events"]:
                if event["type"] == "trade":
                    await ring.push(event)
async def main():
    """Connect to TimescaleDB, then run the producer and consumer tasks.

    Steps:
      1. Create the asyncpg pool; on failure print the error and return.
      2. Run ``SELECT now();`` as a connectivity check; on failure print the
         error and return.
      3. Start ``pyarrow_consumer`` and ``stream_gemini("btcusd", ...)`` as
         tasks sharing one ring buffer, and wait on both.

    Once the pool has been created it is closed on every exit path, and both
    tasks are cancelled and awaited before the pool is closed so that no
    task is left running against a closing pool.
    """
    print("Initializing TimescaleDB connection pool...")

    # Try to create the asyncpg pool and report success or failure
    try:
        db_pool = await asyncpg.create_pool(
            user=db_user,
            password=db_password,
            database="marketdata",
            host="127.0.0.1",
            port=5432,
            min_size=1,
            max_size=4,
        )
    except Exception as e:
        print("❌ Failed to connect to TimescaleDB!")
        print("Error:", e)
        return

    print("✅ Successfully connected to TimescaleDB.")

    # The pool now owns live connections: everything below runs under a
    # finally that closes it, including the early return on a failed probe.
    try:
        # Optionally: test the connection with a simple query
        try:
            async with db_pool.acquire() as conn:
                result = await conn.fetchval("SELECT now();")
                print(f"📡 TimescaleDB responded. Server time: {result}")
        except Exception as e:
            print("❌ Failed to execute test query against TimescaleDB.")
            print("Error:", e)
            return

        # If the DB is working, start the tasks
        print("🚀 Starting consumer and producer tasks...")

        ring = AsyncRingBuffer(maxlen=50_000)

        consumer = asyncio.create_task(pyarrow_consumer(ring, db_pool))
        producer = asyncio.create_task(stream_gemini("btcusd", ring))

        try:
            await asyncio.gather(consumer, producer)
        finally:
            # gather() does not stop the sibling when one task raises, so
            # cancel both (a no-op for a finished task) and wait for them to
            # unwind; this releases any pooled connection they still hold.
            for task in (consumer, producer):
                task.cancel()
            await asyncio.gather(consumer, producer, return_exceptions=True)
    finally:
        await db_pool.close()


if __name__ == "__main__":
    asyncio.run(main())