-
-
Notifications
You must be signed in to change notification settings - Fork 749
Open
Labels
Description
hi
i am trying to fetch historical exchange data,
and to use backend callback to store the data.
My code do the job but i am looking a cleaner way to do that.
import json
from datetime import datetime
import asyncio
from datetime import timedelta, UTC
from cryptofeed import FeedHandler
from cryptofeed.backends.quest import CandlesQuest
from cryptofeed.defines import CANDLES
from cryptofeed.exchanges import BinanceFutures
from backtest.config import settings
bf = BinanceFutures()
cq = CandlesQuest(host=settings.quest_host, port=settings.quest_port)
# async def candle_callback(c, receipt_timestamp):
# print(f"Candle received at {receipt_timestamp}: {c}")
async def get_history(loop, start, end):
cq.start(loop)
cq.running
async for data in bf.candles(
"BTC-USDT-PERP",
start,
end,
):
print("=================")
for row in data:
d = row.to_dict()
d["receipt_timestamp"] = d["timestamp"] + 1
await cq(row, d["receipt_timestamp"])
await cq.stop()
return
def main():
config = {"log": {"filename": "demo.log", "level": "DEBUG", "disabled": False}}
today = datetime.now(UTC)
yesterday = datetime.now(UTC) - timedelta(minutes=24)
start = yesterday.strftime("%Y-%m-%d %H:%M:%S")
end = today.strftime("%Y-%m-%d %H:%M:%S")
f = FeedHandler(config=config)
f.run(start_loop=False)
f.add_feed(
BinanceFutures(
symbols=["BTC-USDT-PERP"],
channels=[CANDLES],
callbacks={
CANDLES: CandlesQuest(
host=settings.quest_host, port=settings.quest_port
)
},
)
)
loop = asyncio.get_event_loop()
# loop.create_task(get_history(loop, start, end))
loop.run_until_complete(get_history(loop, start, end))
loop.run_forever()
if __name__ == "__main__":
main()
It work until but it raise an exception when i stop the script "CTRL-C"
2024-06-27 17:06:56,495 : INFO : BINANCE_FUTURES: starting backend task CandlesQuest with multiprocessing=False
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: create HTTP session
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: requesting data from https://fapi.binance.com/fapi/v1/klines?symbol=BTCUSDT&interval=1m&limit=1000&startTime=1719520976000&endTime=1719522416000
2024-06-27 17:06:56,515 : DEBUG : BINANCE_FUTURES.ws.2: connecting to wss://fstream.binance.com/stream?streams=btcusdt@kline_1m
=================
^CTask was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<QuestCallback.writer() running at /home/me/project/backtest/.venv/lib/python3.12/site-packages/cryptofeed/backends/quest.py:27> wait_for=<Future pending cb=[Task.task_wakeup()]>>
backtest-py3.12```