-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner.py
More file actions
73 lines (59 loc) · 2.25 KB
/
runner.py
File metadata and controls
73 lines (59 loc) · 2.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import asyncio
from datetime import datetime, timedelta
from app import TBot
from core.ledger import Ledger
from core.local_api import LocalDataSource
from core.models import Symbol
from core.strategy import Strategy
from middleware.llm_middleware import ModelConfig
from utilities.settings import DATA_DIR
ledger_db = Ledger()
async def run_single_cycle(run_time: datetime, sem: asyncio.Semaphore):
"""
Sets up and runs a single TBot execution, constrained by a semaphore.
"""
async with sem:
print(f"Starting cycle for: {run_time.isoformat()}")
# Instantiate components for this specific timestamp
# Note: If loading the CSV in LocalDataSource is very heavy,
# consider loading it once globally and passing the data frame instead.
ds = LocalDataSource(
filepath=DATA_DIR / "BTC_USDT_1h_feat.csv",
latest_bar=run_time
)
symbol = Symbol(name="BTCUSDT", alias="cmt_btcusdt")
model_conf = ModelConfig(
name="gemini-2.5-flash-preview-09-2025",
model="vertex_ai/gemini-2.5-flash-preview-09-2025"
)
# Assuming 'db' is a shared async-safe ledger object defined globally
strategy = Strategy(
name="levels_strategy",
symbol=symbol,
model_conf=model_conf,
ledger=ledger_db
)
bot = TBot(symbol, strategy, ds)
# Run the bot
await bot.run(time=run_time)
print(f"Completed cycle for: {run_time.isoformat()}")
async def main():
# 1. Configuration
start_time = datetime.fromisoformat("2025-01-08T00:00:00")
total_executions = 168 # 7 days * 24 hours
batch_size = 5 # Concurrent limit
sem = asyncio.Semaphore(batch_size)
tasks = []
for i in range(total_executions):
current_step_time = start_time + timedelta(hours=i)
tasks.append(run_single_cycle(current_step_time, sem))
# 4. Run all tasks
# gather runs them concurrently, but the semaphore inside run_single_cycle
# limits active execution to 10 at a time.
await asyncio.gather(*tasks)
if __name__ == "__main__":
try:
# Get the event loop
asyncio.run(main())
except KeyboardInterrupt:
print("\nExecution stopped by user.")