-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmarket_data_ingester.py
More file actions
140 lines (106 loc) · 3.76 KB
/
market_data_ingester.py
File metadata and controls
140 lines (106 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import os
import time
import requests
from dotenv import load_dotenv
from supabase import create_client
from experiment_context import with_experiment_context
# Called before the os.getenv reads below so that any values python-dotenv
# loads into the environment are visible to them.
load_dotenv()

# The only symbol fail_closed_check() accepts; also the default for SYMBOL.
PAPER_SYMBOL = "tTESTBTC:TESTUSD"
# Passed as the `timeout` argument of requests.get() in fetch_latest_candle().
REQUEST_TIMEOUT_SECONDS = 20

# Supabase credentials handed to create_client() in main(); both are required
# (fail_closed_check() raises when either is missing or empty).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_SERVICE_ROLE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
# Symbol and timeframe interpolated into the candles URL and stored on each row.
SYMBOL = os.getenv("SYMBOL", PAPER_SYMBOL)
TIMEFRAME = os.getenv("TIMEFRAME", "1m")
def parse_positive_int_env(name, default):
    """Read environment variable *name* as a strictly positive integer.

    Args:
        name: Name of the environment variable to read.
        default: Value returned unchanged when the variable is unset or empty.

    Returns:
        The parsed integer, or *default* when the variable is unset or empty.

    Raises:
        RuntimeError: If the value cannot be parsed by ``int()`` or is not
            greater than zero.
    """
    text = os.getenv(name)
    if text is None or text == "":
        return default

    try:
        parsed = int(text)
    except ValueError as exc:
        raise RuntimeError(f"{name} must be an integer.") from exc

    if parsed > 0:
        return parsed
    raise RuntimeError(f"{name} must be greater than zero.")
# Seconds main() sleeps between polls. Defaults to 30 when the env var is unset
# or empty; parse_positive_int_env raises RuntimeError at import time when it is
# set to a non-integer or non-positive value.
MARKET_DATA_POLL_SECONDS = parse_positive_int_env("MARKET_DATA_POLL_SECONDS", 30)
def fail_closed_check():
    """Refuse to start unless the module-level configuration is acceptable.

    The checks run in order and the first failing one raises.

    Raises:
        RuntimeError: If either Supabase credential is missing or empty, if
            SYMBOL differs from PAPER_SYMBOL, or if TIMEFRAME is empty.
    """
    checks = (
        (
            not (SUPABASE_URL and SUPABASE_SERVICE_ROLE_KEY),
            "Missing required Supabase env values.",
        ),
        (
            SYMBOL != PAPER_SYMBOL,
            f"SYMBOL must be {PAPER_SYMBOL} for paper trading.",
        ),
        (
            not TIMEFRAME,
            "TIMEFRAME is required.",
        ),
    )
    for failed, message in checks:
        if failed:
            raise RuntimeError(message)
def audit(supabase, event_type, severity="info", payload=None):
    """Insert one row into the ``agent_audit_log`` table.

    Args:
        supabase: Client object exposing ``table(name).insert(row).execute()``.
        event_type: Value stored in the row's ``event_type`` column.
        severity: Value stored in the row's ``severity`` column.
        payload: Passed through ``with_experiment_context`` (imported from
            ``experiment_context``) before being stored in ``payload``.
    """
    log_table = supabase.table("agent_audit_log")
    row = {
        "event_type": event_type,
        "severity": severity,
        "payload": with_experiment_context(payload),
    }
    log_table.insert(row).execute()
def fetch_latest_candle():
    """Fetch the latest candle for SYMBOL/TIMEFRAME from the Bitfinex public API.

    Issues a GET to ``candles/trade:{TIMEFRAME}:{SYMBOL}/last`` with a timeout
    of REQUEST_TIMEOUT_SECONDS and maps the positional response onto named
    fields.

    Returns:
        dict: A row with the keys ``symbol``, ``timeframe``, ``mts``, ``open``,
        ``close``, ``high``, ``low``, ``volume`` and ``source`` (always
        ``"bitfinex"``).

    Raises:
        requests.HTTPError: If the response status is an error
            (via ``raise_for_status``).
        ValueError: If the body is not valid JSON or is not a six-element
            array.
    """
    url = f"https://api-pub.bitfinex.com/v2/candles/trade:{TIMEFRAME}:{SYMBOL}/last"
    response = requests.get(url, timeout=REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()

    payload = response.json()
    # Check the shape explicitly: a body that is not a six-element array (for
    # example an empty list) would otherwise surface as a cryptic tuple
    # unpacking error, and a six-character string or six-key dict would be
    # unpacked silently into nonsense fields.
    if not isinstance(payload, list) or len(payload) != 6:
        raise ValueError(
            f"Unexpected candle response for {SYMBOL} {TIMEFRAME}: {payload!r}"
        )

    # Positional order as unpacked here: timestamp, open, close, high, low, volume.
    mts, open_, close, high, low, volume = payload
    return {
        "symbol": SYMBOL,
        "timeframe": TIMEFRAME,
        "mts": mts,
        "open": open_,
        "close": close,
        "high": high,
        "low": low,
        "volume": volume,
        "source": "bitfinex",
    }
def upsert_candle(supabase, candle):
    """Upsert *candle* into the ``market_candles`` table.

    Args:
        supabase: Client object exposing ``table(name).upsert(...).execute()``.
        candle: Row to write; conflicts are resolved on the
            ``symbol,timeframe,mts`` columns.
    """
    candles_table = supabase.table("market_candles")
    query = candles_table.upsert(candle, on_conflict="symbol,timeframe,mts")
    query.execute()
def _report_ingest_error(supabase, exc):
    """Print a polling error and try to record it in the audit log.

    A failure while writing the audit event is printed and otherwise ignored,
    so the polling loop keeps running.
    """
    print(f"ERROR: {exc}")
    try:
        audit(
            supabase,
            "market_data_ingester_error",
            severity="error",
            payload={"error": str(exc)},
        )
    except Exception as audit_exc:
        print(f"ERROR writing market_data_ingester_error audit event: {audit_exc}")


def main():
    """Validate configuration, then poll and persist the latest candle forever.

    Startup runs fail_closed_check(), creates the Supabase client, prints a
    banner and writes a ``market_data_ingester_started`` audit event. Each loop
    iteration fetches the latest candle, upserts it, prints whether its ``mts``
    is new or the same as the previous one, and then sleeps for
    MARKET_DATA_POLL_SECONDS. Any ``Exception`` raised inside an iteration is
    reported and the loop continues.
    """
    fail_closed_check()
    supabase = create_client(SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)
    previous_mts = None

    print("Market Data Ingester started.")
    print(
        f"Symbol: {SYMBOL} | timeframe: {TIMEFRAME} | "
        f"poll_seconds: {MARKET_DATA_POLL_SECONDS}"
    )

    startup_details = {
        "symbol": SYMBOL,
        "timeframe": TIMEFRAME,
        "market_data_poll_seconds": MARKET_DATA_POLL_SECONDS,
    }
    audit(supabase, "market_data_ingester_started", payload=startup_details)

    while True:
        try:
            candle = fetch_latest_candle()
            upsert_candle(supabase, candle)

            is_new_candle = candle["mts"] != previous_mts
            if is_new_candle:
                print(
                    f"Saved new candle: symbol={candle['symbol']} timeframe={candle['timeframe']} "
                    f"mts={candle['mts']} close={candle['close']}"
                )
                previous_mts = candle["mts"]
            else:
                # Same timestamp as the previous poll: the row was upserted again.
                print(
                    f"Updated live candle: symbol={candle['symbol']} timeframe={candle['timeframe']} "
                    f"mts={candle['mts']} close={candle['close']}"
                )
        except Exception as exc:
            _report_ingest_error(supabase, exc)

        time.sleep(MARKET_DATA_POLL_SECONDS)
# Start the ingester only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()