How should I inject dependencies correctly? #2194
-
|
How should I initialize the asynchronous AsyncFileClient class and use the fileClient instance correctly in the handle_msg function? This class is responsible for asynchronous uploading and downloading of files. I executed the following command, but an error occurred. 0. version: faststream[rabbit]==0.5.28 1. command:
2. error:
3. code:
import os
import time
import json
from faststream import FastStream, Depends
from faststream.rabbit import RabbitBroker
from pydantic import BaseModel
from contextlib import asynccontextmanager
from download_file_async2 import AsyncFileClient
# Settings read from the environment; os.getenv returns None for any variable
# that is not set.
COS_DATA_DIR = os.getenv("TARGET_COS_DATASET_DIR")
FILE_ENDPOINT = os.getenv("FILE_ENDPOINT")
TOKEN_ENDPOINT = os.getenv("TOKEN_ENDPOINT")
@asynccontextmanager
async def lifespan(app: FastStream):
    """Create the file client before the app runs and close it afterwards.

    The client is attached to ``app.state.fileClient`` so the message handler
    can look it up.

    NOTE(review): this is the asker's original attempt, kept as posted. The
    reply further down stores the client in FastStream's context
    (``ContextRepo.set_global``) rather than on ``app.state``.
    """
    # Initialize connection pool
    fileClient = AsyncFileClient(
        TOKEN_ENDPOINT, FILE_ENDPOINT,
        Inter_Arrival_Time=600,
        session_pool_size=4
    )
    app.state.fileClient = fileClient
    yield
    # Close connection pool
    await fileClient.close()
# NOTE(review): "[email protected]" looks like e-mail-obfuscation residue from the
# page extraction; the real "password@host" part is not recoverable from here.
rabbit_url = "amqp://guest:[email protected]/"
# NOTE(review): log_level=3 is passed to RabbitBroker as-is — confirm the
# intended logging level.
broker = RabbitBroker(rabbit_url, log_level=3)
# The application wraps the broker and runs `lifespan` around its lifetime.
app = FastStream(broker, lifespan=lifespan)
@broker.subscriber(
    "mid_queue",
)
async def handle_msg(
    message,
    # NOTE(review): this is the injection attempt the question is about; the
    # reply further down replaces it with ``Annotated[..., Context()]``.
    fileClient: AsyncFileClient = Depends(lambda: app.state.fileClient)
):
    """Upload the file named by ``message['file_name']`` via ``fileClient``.

    Subscribed to the ``mid_queue`` queue. Any exception raised while building
    the path or uploading is caught and logged, not propagated.
    """
    try:
        file_name = message['file_name']
        cos_file_path = f"{COS_DATA_DIR}/{file_name}.aac"
        upload_uuid = await fileClient.upload_file(cos_file_path)
    except Exception as e:
        # NOTE(review): `log` is not defined or imported in the posted
        # snippet — presumably trimmed from the original file.
        log.logger.error(f"[processMessage]: {e}")
Beta Was this translation helpful? Give feedback.
Replies: 1 comment
-
|
Hi, I've slightly corrected your code for better clarity:
import logging
import os
from contextlib import asynccontextmanager
from typing import Annotated
from faststream import FastStream, Context, ContextRepo
from faststream.rabbit import RabbitBroker
# Settings read from the environment. COS_DATA_DIR falls back to "/dataset"
# when the variable is unset or empty; the two endpoints are None when unset.
COS_DATA_DIR = os.getenv("TARGET_COS_DATASET_DIR") or "/dataset"
FILE_ENDPOINT = os.getenv("FILE_ENDPOINT")
TOKEN_ENDPOINT = os.getenv("TOKEN_ENDPOINT")
class AsyncFileClient:
    """Stub file client that keeps this example self-contained.

    The question imports the real class from ``download_file_async2``; this
    placeholder only mirrors the three members the example touches.
    """

    def __init__(self, *args, **kwargs) -> None:
        """Accept any positional/keyword arguments and ignore them."""
        ...

    async def upload_file(self, file: str) -> str:
        """Return ``file`` unchanged (placeholder for a real upload)."""
        return file

    async def close(self) -> None:
        """Do nothing (placeholder for releasing client resources)."""
        ...
@asynccontextmanager
async def lifespan(context: ContextRepo):
    """Provide one shared ``AsyncFileClient`` for the application's lifetime.

    The client is created before the app runs and registered in the global
    context under the key ``"file_client"``. It is closed when the lifespan
    exits, including when an exception is raised at the ``yield``.

    Args:
        context: Context repository in which the client is registered.
    """
    # Initialize connection pool
    file_client = AsyncFileClient(
        TOKEN_ENDPOINT,
        FILE_ENDPOINT,
        Inter_Arrival_Time=600,
        session_pool_size=4,
    )
    context.set_global("file_client", file_client)
    try:
        yield
    finally:
        # Close connection pool; `finally` keeps the client from leaking when
        # the application exits with an error.
        await file_client.close()
# NOTE(review): "[email protected]" looks like e-mail-obfuscation residue from the
# page extraction; the real "password@host" part is not recoverable from here.
rabbit_url = "amqp://guest:[email protected]/"
# NOTE(review): log_level=3 is passed to RabbitBroker as-is — confirm the
# intended logging level.
broker = RabbitBroker(rabbit_url, log_level=3)
# The application wraps the broker and runs `lifespan` around its lifetime.
app = FastStream(broker, lifespan=lifespan)
# Module-level logger used by the message handler.
logger = logging.getLogger(__name__)
@broker.subscriber("mid_queue")
async def handle_msg(message, file_client: Annotated[AsyncFileClient, Context()]):
    """Upload the file named by ``message['file_name']`` via the injected client.

    Subscribed to the ``mid_queue`` queue. ``file_client`` is supplied through
    ``Context()``; ``lifespan`` registers the client under the same name.

    Args:
        message: Incoming payload; must support ``message['file_name']``.
        file_client: Shared client used to perform the upload.

    Any exception raised while building the path or uploading is caught and
    logged, not propagated.
    """
    try:
        file_name = message['file_name']
        cos_file_path = f"{COS_DATA_DIR}/{file_name}.aac"
        upload_uuid = await file_client.upload_file(cos_file_path)
        print("di works: ", upload_uuid)
    except Exception as e:
        # logger.exception emits the same message at ERROR level and also
        # records the traceback, which a plain error() call drops.
        logger.exception("[processMessage]: %s", e)
@app.after_startup
async def startup():
    """Publish one sample message to ``mid_queue`` to exercise the handler."""
    await broker.publish({"file_name": "hello"}, "mid_queue")
faststream run consumer:app --workers 1
|
Beta Was this translation helpful? Give feedback.
Hi,
faststream has a dependency injection mechanism via Context; you can read about it here. Also, here is the documentation on how to set up Context in lifespan. I've slightly corrected your code for better clarity: