Esta plantilla de airflow tiene la estructura PostgreSQL + Airflow + FastAPI
Contiene un ejemplo con una API de weather muy simple, pero es un buen cimiento para apps más complejos.
El proyecto está organizado en los siguientes componentes principales:
- Ubicación:
/airflowy/dags - Función: Orquestador de flujos de trabajo que gestiona los pipelines de datos
- Componentes:
- WebServer: Interfaz web para gestionar y monitorizar los DAGs
- Scheduler: Programa la ejecución de los DAGs
- Worker: Ejecuta las tareas asignadas
- Triggerer: Maneja los desencadenadores de DAGs
- Ubicación:
/fastapi - Función: API REST para consumir los datos procesados
- Componentes:
- Endpoints: Puntos de acceso a la API
- Models: Esquemas de datos
- CRUD: Operaciones básicas con la base de datos
- Ubicación:
/db - Función: Base de datos relacional para almacenar datos
- Componentes:
- Scripts de inicialización en
/db/init
- Scripts de inicialización en
El proyecto incluye un ejemplo de pipeline completo para datos meteorológicos:
- DAGs para procesamiento diario y semanal
- Modelos de datos en FastAPI
- Endpoints para consultar información meteorológica
Copiar .env.example a .env y modificar las variables de entorno necesarias.
docker-compose up -d --buildEsto levantará el contenedor de postgres y el de airflow, y creará la base de datos y el usuario necesarios para que funcione.
python -m venv venv
source venv/bin/activate # Linux
venv\Scripts\activate # Windowspip install -r requirements-dev.txtpre-commit installCon esto se instalan los hooks de pre-commit para que se ejecuten al hacer commit.
Ejecutar pre-commit manualmente:
pre-commit run --all-files # Todos los archivos
pre-commit run # Archivos modificados
pre-commit run <hook_name> # Hook específicopython tests.pyLos tests funcionan con la base de datos del compose, por lo que habrá que tenerla levantada. Tener cuidado pues automáticamente se borran los datos de la base de datos al hacer un test.
En tests.py hay que añadir la función que actuaría a modo de dag, llamando a las diferentes funciones que se quieran testear.
- Las librerías en los scripts se incluirán dentro de las tareas, ya que airflow cuando actualiza los archivos, ejecuta lo que está fuera de las funciones, y eso puede dar problemas de rendimiento. Lo que está dentro de la función no se ejecuta hasta que se llama a la función.
- Para ver los cambios hechos en los scripts del airflow o fastapi realizar solo un docker-compose up -d volume_init, para que se copien los archivos en los volúmenes. Tanto airflow como fastapi usarán los archivos en los volúmenes.
- Una vez haya un servidor de SMTP disponible, activarlo en las variables de entorno.