This example shows how to integrate FluxQueue with a FastAPI application so that your HTTP handlers can enqueue background work.

Prerequisites:

- FastAPI installed
- A running Redis instance
- FluxQueue installed with the worker
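
Before going further, it can help to confirm that Redis is actually reachable. The following is a minimal sketch using only the Python standard library; it is not part of FluxQueue. It assumes Redis listens on the default 127.0.0.1:6379 without authentication, and the helper name `redis_is_reachable` is hypothetical. It sends the inline `PING` command and checks for the `+PONG` reply.

```python
import socket


def redis_is_reachable(host: str = "127.0.0.1", port: int = 6379, timeout: float = 1.0) -> bool:
    """Return True if a server at host:port answers an inline PING with +PONG.

    Assumption: Redis runs without authentication; a password-protected
    server replies with an error instead, and this function returns False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.sendall(b"PING\r\n")  # Redis accepts inline commands
            reply = conn.recv(64)
    except OSError:
        # Covers connection refused, timeouts, and unreachable hosts
        return False
    return reply.startswith(b"+PONG")
```

Calling `redis_is_reachable()` returns `False` when nothing answers on that address, which usually means Redis is not running or listens elsewhere.
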
A minimal FastAPI project can use the following layout:

    my_app/
        __init__.py
        main.py
        tasks/
            __init__.py
            tasks.py

Create my_app/tasks/tasks.py and define your FluxQueue instance and tasks:

    from fluxqueue import FluxQueue

    fluxqueue = FluxQueue()


    @fluxqueue.task()
    async def send_welcome_email(email: str):
        # Replace this with your real email-sending logic
        print(f"Sending welcome email to {email}")

To expose the task, edit my_app/tasks/__init__.py as follows:

    from .tasks import send_welcome_email

    __all__ = ["send_welcome_email"]

In main.py, wire FluxQueue into your FastAPI routes:

    from fastapi import FastAPI

    from .tasks import send_welcome_email

    app = FastAPI()


    @app.post("/signup")
    async def signup(email: str):
        # Calling the task enqueues it for the worker instead of running it inline
        await send_welcome_email(email=email)
        return {"status": "ok"}

Start the FluxQueue worker in a separate process so it can execute tasks:

    fluxqueue start --tasks-module-path my_app.tasks --queue default

Run the FastAPI server with Uvicorn:

    uvicorn my_app.main:app --reload

Now, when you POST to /signup, the send_welcome_email task is enqueued and processed by the worker.
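
To try the endpoint, you can send a request from a small client script. The sketch below uses only the Python standard library and assumes Uvicorn is serving on its default address, http://127.0.0.1:8000. Because the handler declares `email` as a plain `str` parameter, FastAPI reads it from the query string rather than from the request body. The helper names `build_signup_request` and `post_signup` are hypothetical and not part of FluxQueue or FastAPI.

```python
import json
import urllib.parse
import urllib.request

# Assumption: Uvicorn's default host and port
DEFAULT_BASE_URL = "http://127.0.0.1:8000"


def build_signup_request(email: str, base_url: str = DEFAULT_BASE_URL) -> urllib.request.Request:
    """Build a POST request for /signup with the email as a query parameter."""
    query = urllib.parse.urlencode({"email": email})
    return urllib.request.Request(f"{base_url}/signup?{query}", method="POST")


def post_signup(email: str, base_url: str = DEFAULT_BASE_URL) -> dict:
    """Send the request and return the decoded JSON response body."""
    request = build_signup_request(email, base_url)
    with urllib.request.urlopen(request, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With both the worker and the server running, `post_signup("user@example.com")` should return the handler's `{"status": "ok"}` response, and the worker process should print the welcome-email message from the task.
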