Airflow ETL Pipeline for processing drone data in the NICSI-backed Pragyan project. Automates extraction, transformation, and loading for AI analysis.

🔄 Airflow ETL Pipeline

A comprehensive ETL (Extract, Transform, Load) pipeline built with Apache Airflow for automated data processing, encryption/decryption, validation, and database operations. This project demonstrates modern data engineering practices with containerized deployment and real-time data processing capabilities.


📋 Overview

This Airflow-based ETL system:

  • Processes encrypted data from multiple API sources
  • Performs decryption using Java, ASP.NET, and PHP algorithms
  • Validates data integrity
  • Stores results in PostgreSQL

Key features:

  • Automated scheduling
  • Comprehensive logging
  • Robust error handling

πŸ—οΈ Architecture

πŸ”§ Apache Airflow (Python + Docker)

  • Automated DAG scheduling and task orchestration
  • Bash operators for modular task execution
  • Logging and monitoring
  • Containerized with Docker Compose

🌐 Flask API Server (Python + Flask)

  • RESTful endpoints for data reception
  • CORS-enabled
  • Real-time data processing
  • Token-based authentication

πŸ—„οΈ Database Layer (PostgreSQL + PgAdmin)

  • Dynamic table creation (date-based naming)
  • Structured logs with timestamps
  • Connection pooling
  • Data persistence and retrieval

πŸ” Security & Encryption

  • Decryption support for Java, ASP.NET, PHP
  • Token-based encryption key management
  • Integrity and validation checks
  • Secure encrypted data pipeline

🚀 Features

✅ Core ETL Operations

  • Data extraction from multiple APIs
  • Multi-language decryption
  • Data validation and integrity checks
  • Dynamic PostgreSQL table creation
  • Logging and error tracking

✅ Data Processing

  • Token-based encryption key management
  • External API validation (level codes)
  • CSV export & temporary file handling
  • Timezone-aware timestamp processing
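
As a hedged illustration of the timezone-aware timestamp handling listed above (the actual zone and format used in `etl_m12.py` may differ; IST is an assumption here):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # stdlib in Python 3.9+

def to_ist_string(ts_utc):
    """Render a UTC timestamp in IST for log records (the zone is an assumption)."""
    return ts_utc.astimezone(ZoneInfo("Asia/Kolkata")).strftime("%Y-%m-%d %H:%M:%S %Z")

print(to_ist_string(datetime.now(timezone.utc)))
```

Keeping all internal timestamps in UTC and converting only at the formatting boundary avoids ambiguity when records cross midnight.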

✅ Infrastructure

  • Docker-based multi-service setup
  • Airflow UI + Scheduler
  • PostgreSQL + pgAdmin
  • Flask API server
  • Health checks and monitoring

πŸ› οΈ Quick Setup

βœ… Prerequisites

  • Python 3.8+
  • Docker + Docker Compose
  • PostgreSQL

✅ Setup Steps

1. Clone the Repository

   ```bash
   git clone <repository-url>
   cd airflow_tutorial
   ```

2. Start Services

   ```bash
   docker-compose up -d
   ```

3. Access Interfaces

   Open the Airflow UI and pgAdmin in your browser; the exposed ports are defined in `docker-compose.yaml`.

4. Initialize DAGs

DAGs are automatically loaded from the dags/ folder.
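
For orientation, a minimal DAG along these lines would be picked up automatically from `dags/` (the IDs, schedule, and commands below are illustrative, not the project's actual pipeline):

```python
# dags/example_etl.py -- illustrative sketch, not the project's final_etl.py
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",  # run once per day
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    load = BashOperator(task_id="load", bash_command="echo loading")
    extract >> load  # extract must finish before load starts
```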


πŸ“ Project Structure

```
airflow_tutorial/
├── dags/
│   ├── final_etl.py             # Main ETL DAG
│   ├── data_migrations.py       # Migration DAG
│   └── tomtom/                  # Additional DAGs
├── data/
│   ├── config.py                # Config & endpoints
│   ├── database.py              # DB setup & models
│   ├── etl_m12.py               # ETL core logic
│   ├── api_utils.py             # API fetching utils
│   ├── decryption_utils.py      # Decryption functions
│   ├── validation_utils.py      # Validation logic
│   └── logging_config.py        # Logging setup
├── docker-compose.yaml          # Multi-service orchestration
├── Dockerfile                   # Airflow Docker image
├── requirements.txt             # Python packages
└── flask_app.py                 # Flask API server
```

🔧 DAG Workflows

📌 final_etl.py - Main ETL Pipeline

  1. fetch_the_api – Fetch external API data
  2. validate_the_api – Validate API data
  3. log_the_data – Configure logging
  4. decrypt_the_data – Decrypt based on algorithm
  5. validate_the_data – Final data validation
  6. final_etl_processing – Store processed data
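
Stripped of Airflow operators, the task sequence above amounts to a chain like the following (the function bodies are placeholders, not the project's actual implementations):

```python
def fetch_the_api():
    # placeholder for the external Data API call
    return {"payload": "ZW5jcnlwdGVk", "portLanguageId": "java"}

def validate_the_api(record):
    assert record.get("payload"), "empty API response"
    return record

def log_the_data(record):
    print(f"processing payload of {len(record['payload'])} chars")
    return record

def decrypt_the_data(record):
    # the real code dispatches on portLanguageId; reversing stands in for decryption
    record["decrypted"] = record["payload"][::-1]
    return record

def validate_the_data(record):
    assert "decrypted" in record, "decryption step did not run"
    return record

def final_etl_processing(record):
    # the real code inserts into PostgreSQL
    return f"stored {len(record['decrypted'])} bytes"

record = fetch_the_api()
for step in (validate_the_api, log_the_data, decrypt_the_data, validate_the_data):
    record = step(record)
result = final_etl_processing(record)
print(result)
```

In the real DAG each step is a separate Airflow task, so a failure in any one of them is retried and logged independently.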

📌 data_migrations.py - Migration Workflow

  1. table_created – Create PostgreSQL tables dynamically
  2. fetch_the_api – Perform local DB ETL operations

πŸ” Decryption Support

Supports automatic decryption based on portLanguageId:

  • Java β†’ decrypt_java_api()
  • ASP.NET β†’ decrypt_asp_dot_net_api()
  • PHP β†’ decrypt_php_data()
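
A sketch of how the dispatch on `portLanguageId` might look (the decryption bodies here are stubs; the real functions live in `decryption_utils.py`):

```python
def decrypt_java_api(data):
    return f"java-decrypted:{data}"    # stub for the real Java-compatible algorithm

def decrypt_asp_dot_net_api(data):
    return f"aspnet-decrypted:{data}"  # stub for the real ASP.NET-compatible algorithm

def decrypt_php_data(data):
    return f"php-decrypted:{data}"     # stub for the real PHP-compatible algorithm

DECRYPTORS = {
    "java": decrypt_java_api,
    "asp.net": decrypt_asp_dot_net_api,
    "php": decrypt_php_data,
}

def decrypt(port_language_id, data):
    """Route encrypted data to the decryptor matching its source platform."""
    try:
        return DECRYPTORS[port_language_id.lower()](data)
    except KeyError:
        raise ValueError(f"unsupported portLanguageId: {port_language_id}")

print(decrypt("java", "abc"))
```

A table-driven dispatch like this keeps adding a new source platform to a one-line change.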

📊 Data Validation

Includes:

  • Level code verification from external APIs
  • Field integrity checks (ministryCode, projectCode, etc.)
  • Null/empty value detection
  • Token validation
  • Granularity & hierarchy validation
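
The field-integrity and null checks could be sketched as follows (the required-field list is an illustrative subset; the real logic is in `validation_utils.py`):

```python
REQUIRED_FIELDS = ("ministryCode", "projectCode", "token")  # illustrative subset

def validate_record(record):
    """Return a list of validation errors; an empty list means the record passes."""
    errors = []
    for field in REQUIRED_FIELDS:
        value = record.get(field)
        if value is None or str(value).strip() == "":
            errors.append(f"missing or empty field: {field}")
    return errors

print(validate_record({"ministryCode": "M01", "projectCode": "", "token": "abc"}))
```

Returning a list of errors, rather than failing on the first one, lets the log table record every problem with a rejected payload at once.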

πŸ—„οΈ Dynamic Database Schema

Example:

```sql
CREATE TABLE logs_YYYY_MM_DD (
    id SERIAL PRIMARY KEY,
    error VARCHAR,
    token VARCHAR,
    error_details TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);
```
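
A sketch of how the date-based table name could be generated (the actual DDL and column set live in `database.py`):

```python
from datetime import date

def logs_table_ddl(day=None):
    """Build the CREATE TABLE statement for the given day's log table."""
    day = day or date.today()
    table_name = f"logs_{day.strftime('%Y_%m_%d')}"
    return (
        f"CREATE TABLE IF NOT EXISTS {table_name} ("
        "id SERIAL PRIMARY KEY, error VARCHAR, token VARCHAR, "
        "error_details TEXT, created_at TIMESTAMP DEFAULT NOW());"
    )

print(logs_table_ddl(date(2024, 1, 31)))
```

Using `IF NOT EXISTS` makes the creation task idempotent, so rerunning the DAG on the same day is harmless.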

🌐 API Endpoints

🔌 Flask Server (Port 5001)

  • POST /recvApiData – Receive API data
  • GET /getData – Fetch stored/encrypted data

🔗 External APIs

  • Data API: http://localhost:3000/getData
  • Encryption Key API: http://localhost:3000/getEncryptionKey?token=
  • Validation API: http://10.23.124.59:2222/validateDataLevelCode

βš™οΈ Configuration

πŸ”‘ Environment Variables

DATABASE_URL="postgresql://postgres:[email protected]:5432/demo"
AIRFLOW_UID=50000
AIRFLOW_WWW_USER_USERNAME=mehul
AIRFLOW_WWW_USER_PASSWORD=mehul
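
These variables could be read in `config.py` along these lines (the defaults below are illustrative, not the project's actual values):

```python
import os

def load_config():
    """Read pipeline settings from the environment, with illustrative fallbacks."""
    return {
        "database_url": os.environ.get(
            "DATABASE_URL", "postgresql://postgres:postgres@localhost:5432/demo"
        ),
        "airflow_uid": int(os.environ.get("AIRFLOW_UID", "50000")),
    }

cfg = load_config()
print(cfg["airflow_uid"])
```

Centralizing environment access in one function keeps the DAG and utility modules free of scattered `os.environ` lookups.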

πŸ“ About ETL, Cron Jobs, and Airflow

πŸ”„ What is ETL?

  • Extract: Gather data from sources
  • Transform: Clean, validate, reshape data
  • Load: Store into target DB or warehouse

⏰ Why Not Cron Jobs?

  • Lacks error tracking and conditional logic
  • Hard to monitor/scale/debug

🌀 Why Apache Airflow?

  • Visual DAGs and workflow management
  • Built-in logging, retries, alerts
  • Parallel execution and scheduling
  • Web UI for monitoring
  • Modular, scalable, and production-grade

🤝 Contributing

Contributions are welcome! Open an issue or submit a pull request.


👨‍💻 Author Notes

Created by Mehul Gupta to demonstrate a modern, production-ready ETL pipeline with Airflow, Docker, Flask, and PostgreSQL. Includes secure multi-language decryption, robust validation, and real-time processing.


📄 License

This project is licensed under the MIT License. See the LICENSE file for more details.


Happy data processing! 🔄
