CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

Project Overview

SURIMI DataLab is a comprehensive data infrastructure platform for the SURIMI project (European Commission/EDITO platform). It implements a modern data lakehouse architecture for healthcare data interoperability, caregiver/patient management, and multilingual data handling.

Technology Stack

  • Storage: MinIO (S3-compatible object storage)
  • Query Engine: Trino (distributed SQL)
  • Catalog: Hive Metastore (table metadata)
  • Metadata Management: DataHub (data discovery, lineage, ownership)
  • Analytics: Superset (BI dashboards), Jupyter notebooks
  • Orchestration: Airflow (workflow automation)
  • Ingestion: Airflow DAG with PostgreSQL tracking
  • Data Formats: CSV ingestion → Parquet storage; Shapefile ingestion → Parquet (geometry as WKT)
  • Geospatial: geopandas / fiona / shapely / pyproj (installed in Airflow container)

Data Flow

CSV Files   → MinIO (data/raw/) → Airflow DAG → Schema Validation & Conversion →
Shapefiles  ↗                                  → Geometry → WKT string column
MinIO (data/hive/ as Parquet) → Hive Metastore → Trino → Superset/Jupyter/DataHub
                  ↓
            data/processed/ (archived originals)

Development Commands

Docker Operations

# Start all services
docker-compose up -d

# View all logs
docker-compose logs -f

# View ingestion logs
docker-compose logs -f airflow-scheduler

# Stop all services
docker-compose down

# Restart after DAG changes
docker-compose restart airflow-scheduler

# Check service status
docker-compose ps

MinIO Operations

# Create required folders (run once after first startup)
docker exec -it minio bash -c "mkdir -p /bucket_data/data/raw /bucket_data/data/staging /bucket_data/data/hive /bucket_data/data/processed"

# Access MinIO console: http://localhost:9001
# Credentials: minioadmin/minioadmin

Database Operations

# Access PostgreSQL (for metadata and ingestion tracking)
docker exec -it postgres psql -U postgres -d datahub

# Useful queries:
# SELECT * FROM ingestion_audit ORDER BY processed_at DESC LIMIT 10;
# SELECT table_name, COUNT(*) as files FROM ingestion_audit GROUP BY table_name;

Trino Queries

# Access Trino CLI
docker exec -it trino trino

# Or via HTTP: http://localhost:8080

# Useful Trino queries:
# SHOW CATALOGS;
# SHOW SCHEMAS FROM hive;
# SHOW TABLES FROM hive.default;
# DESCRIBE hive.default.table_name;
# SELECT * FROM hive.default.table_name LIMIT 10;

Airflow DAG Operations

# DAG location: airflow/dags/comprehensive_csv_ingestion_dag.py

# Trigger ingestion manually
docker exec airflow-scheduler airflow dags trigger comprehensive_csv_ingestion

# View DAG status
docker exec airflow-scheduler airflow dags list

# Test specific task
docker exec airflow-scheduler airflow tasks test comprehensive_csv_ingestion scan_raw_bucket 2025-01-01

# Access Airflow Web UI
# Open http://localhost:8081

Architecture & Code Organization

Airflow Ingestion DAG (airflow/dags/comprehensive_csv_ingestion_dag.py)

This DAG implements the full ingestion pipeline for CSV files and shapefiles: automatic schema detection, Parquet conversion, and DataHub integration.

Key Functions:

  • scan_raw_bucket() - Scans MinIO bucket/raw/ for CSV files and Shapefile bundles (.shp + companions)
  • process_csv_files() - Parses README or auto-detects schema from CSV; reads shapefile schema via geopandas
  • move_to_staging() - Moves files from raw/ to staging/ folder; moves all shapefile components together
  • convert_to_parquet() - Converts CSV to Parquet with MERGE/REPLACE/APPEND modes; converts shapefile geometry to WKT
  • create_hive_tables() - Creates external Hive tables pointing to Parquet files
  • emit_to_datahub() - Emits metadata, lineage, and statistics to DataHub; adds geospatial/shapefile tags and CRS info
  • move_to_processed() - Archives files to processed/ folder; moves all shapefile components together

Ingestion Pipeline Flow:

  1. DAG runs on schedule (every 5 minutes by default)
  2. Scans bucket/raw/ for new CSV files
  3. Parses README.txt or auto-detects schema
  4. Moves CSV to bucket/staging/
  5. Converts to Parquet with deduplication (MERGE mode)
  6. Stores Parquet in bucket/hive/schema/table/
  7. Creates Hive external table
  8. Emits metadata to DataHub
  9. Archives CSV to bucket/processed/

Environment Variables:

  • MINIO_BUCKET - Bucket name (default: data)
  • RAW_PREFIX - Raw folder prefix (default: raw/)
  • STAGING_PREFIX - Staging folder prefix (default: staging/)
  • HIVE_PREFIX - Hive storage prefix (default: hive/)
  • PROCESSED_PREFIX - Archive folder prefix (default: processed/)
  • HIVE_DEFAULT_SCHEMA - Default schema name (default: tables)
  • INGESTION_MODE - Ingestion mode: merge, replace, or append (default: merge)
  • DATAHUB_ENABLED - Enable DataHub emission (default: true)
  • DATAHUB_GMS_URL - DataHub GMS endpoint
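
As a minimal sketch, the variables above could be read in Python as follows. Only the variable names and defaults come from the list; the `IngestionConfig` dataclass and the `load_config` helper are hypothetical names used for illustration and are not taken from the DAG. No default is documented for DATAHUB_GMS_URL, so the sketch leaves it unset, and rejecting unknown modes is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class IngestionConfig:
    """Hypothetical container for the documented settings."""
    minio_bucket: str
    raw_prefix: str
    staging_prefix: str
    hive_prefix: str
    processed_prefix: str
    hive_default_schema: str
    ingestion_mode: str
    datahub_enabled: bool
    datahub_gms_url: Optional[str]


def load_config(env: Mapping[str, str] = os.environ) -> IngestionConfig:
    """Read the documented variables, falling back to the documented defaults."""
    mode = env.get("INGESTION_MODE", "merge").lower()
    # Assumption: anything other than the three documented modes is an error.
    if mode not in {"merge", "replace", "append"}:
        raise ValueError(f"Unsupported INGESTION_MODE: {mode!r}")
    return IngestionConfig(
        minio_bucket=env.get("MINIO_BUCKET", "data"),
        raw_prefix=env.get("RAW_PREFIX", "raw/"),
        staging_prefix=env.get("STAGING_PREFIX", "staging/"),
        hive_prefix=env.get("HIVE_PREFIX", "hive/"),
        processed_prefix=env.get("PROCESSED_PREFIX", "processed/"),
        hive_default_schema=env.get("HIVE_DEFAULT_SCHEMA", "tables"),
        ingestion_mode=mode,
        datahub_enabled=env.get("DATAHUB_ENABLED", "true").lower() == "true",
        datahub_gms_url=env.get("DATAHUB_GMS_URL"),  # no documented default
    )
```

Calling `load_config({})` yields the documented defaults, which makes it easy to compare a container's effective settings against this list.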

Docker Compose Services

Service Dependencies:

  • postgres → Foundation for all metadata
  • hive-metastore → Depends on postgres
  • trino → Depends on hive-metastore, minio
  • datahub-gms → Depends on elasticsearch, neo4j, kafka, postgres
  • datahub-frontend → Depends on datahub-gms
  • airflow → Depends on postgres, minio, trino

Port Mappings:

  • MinIO: 9000 (API), 9001 (Console)
  • PostgreSQL: 5432
  • Hive Metastore: 9083
  • Trino: 8080
  • DataHub Frontend: 3000
  • DataHub GMS: 8090
  • Superset: 8088
  • Airflow: 8081
  • Jupyter: 8888
  • Elasticsearch: 9200
  • Neo4j: 7474 (HTTP), 7687 (Bolt)
  • Kafka: 9092

Trino Configuration (trino/etc/)

  • config.properties - Trino server config
  • jvm.config - JVM memory settings
  • node.properties - Node identification
  • catalog/hive.properties - Hive connector config (connects to hive-metastore:9083)
  • core-site.xml - Hadoop configuration for S3/MinIO access

Important: Trino uses the Hive connector to query data stored in MinIO, resolving table metadata through the Hive Metastore.

Airflow DAGs (airflow/dags/)

  • comprehensive_csv_ingestion_dag.py - Main ingestion pipeline with README parsing, Parquet conversion, and DataHub integration
  • Logs stored in airflow/logs/
  • Custom operators/hooks go in airflow/plugins/

Data Formats & Storage Layout

Within the MinIO bucket (default: data):

  • raw/ - Original CSV files and shapefile bundles uploaded by users
  • staging/ - Temporary storage for source files during processing
  • hive/schema/table/ - Parquet files organized by Hive schema and table
  • processed/ - Archived source files after successful ingestion
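
The movement of a source file between these folders can be pictured as a prefix swap on the object key. The sketch below assumes the sub-path under raw/ is preserved in staging/ and processed/, which the layout suggests but does not state; `restage_key` is a hypothetical helper, not a function from the DAG.

```python
RAW_PREFIX = "raw/"
STAGING_PREFIX = "staging/"
PROCESSED_PREFIX = "processed/"


def restage_key(key: str, source_prefix: str, target_prefix: str) -> str:
    """Return the object key with source_prefix swapped for target_prefix."""
    if not key.startswith(source_prefix):
        raise ValueError(f"{key!r} is not under {source_prefix!r}")
    # Keep everything after the source prefix unchanged (assumed behaviour).
    return target_prefix + key[len(source_prefix):]


# Example: follow one file through the folders.
raw_key = "raw/fisheries/catches/2024.csv"
staging_key = restage_key(raw_key, RAW_PREFIX, STAGING_PREFIX)
processed_key = restage_key(staging_key, STAGING_PREFIX, PROCESSED_PREFIX)
```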

Key Implementation Patterns

README-Driven Schema

The DAG supports README.txt files in the same folder as CSV files to define schema:

Schema:
- column_name (TYPE): Description
- another_column (VARCHAR): Another description

Primary Key: column_name

The DAG's readme_parser.py module extracts metadata including:

  • Column definitions with types and descriptions
  • Primary keys for MERGE mode deduplication
  • Table name and description
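
To make the README format concrete, here is a minimal parsing sketch. It is not the contents of readme_parser.py: the function name `parse_readme`, the returned dictionary shape, and the comma-separated form for composite primary keys are assumptions. It handles simple type names only (a type containing parentheses, such as DECIMAL(10,2), would need a more careful pattern) and does not extract the table name or description.

```python
import re
from typing import Dict, List

# Matches lines such as "- column_name (TYPE): Description".
COLUMN_LINE = re.compile(r"^-\s*(\w+)\s*\(([^)]+)\)\s*:\s*(.*)$")


def parse_readme(text: str) -> Dict[str, list]:
    """Extract column definitions and primary keys from README text."""
    columns: List[Dict[str, str]] = []
    primary_keys: List[str] = []
    in_schema = False
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if line.lower() == "schema:":
            in_schema = True
            continue
        if line.lower().startswith("primary key:"):
            in_schema = False
            value = line.split(":", 1)[1]
            # Assumption: composite keys are written comma-separated.
            primary_keys = [k.strip() for k in value.split(",") if k.strip()]
            continue
        match = COLUMN_LINE.match(line) if in_schema else None
        if match:
            name, col_type, description = match.groups()
            columns.append({
                "name": name,
                "type": col_type.strip().upper(),
                "description": description.strip(),
            })
    return {"columns": columns, "primary_keys": primary_keys}
```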

Auto-Detection Fallback

When no README exists, the DAG automatically:

  • Infers table name from folder structure
  • Detects column types from CSV data using pandas
  • Determines schema name from folder path (two-level folders use first level as schema)
  • Uses configurable HIVE_DEFAULT_SCHEMA for single-level folders

Intelligent Schema Naming

The DAG determines Hive schema names from folder structure:

  • Two-level: raw/fisheries/catches/ → Schema: fisheries, Table: catches
  • Single-level: raw/eu-catches/ → Schema: tables (or HIVE_DEFAULT_SCHEMA)
  • Configurable via HIVE_DEFAULT_SCHEMA environment variable
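
The naming rules for CSV datasets can be sketched as a small function. `resolve_schema_and_table` is a hypothetical name, not a function from the DAG. The sketch assumes the table name of a single-level folder is the folder name itself, uses the first two folders when a path is nested more deeply (the source does not specify that case), and omits any identifier sanitization (for example of hyphens), which the DAG may or may not perform.

```python
from typing import Tuple


def resolve_schema_and_table(
    key: str,
    raw_prefix: str = "raw/",
    default_schema: str = "tables",
) -> Tuple[str, str]:
    """Derive (schema, table) from an object key under the raw prefix."""
    if not key.startswith(raw_prefix):
        raise ValueError(f"{key!r} is not under {raw_prefix!r}")
    # Drop the file name and keep only the folder segments.
    folders = [part for part in key[len(raw_prefix):].split("/")[:-1] if part]
    if not folders:
        raise ValueError("the file must sit in a folder under the raw prefix")
    if len(folders) == 1:
        # Single-level: fall back to the default schema.
        return default_schema, folders[0]
    # Two-level (or deeper, by assumption): first folder is the schema.
    return folders[0], folders[1]
```

With the examples above, `resolve_schema_and_table("raw/fisheries/catches/2024.csv")` gives the schema `fisheries` and the table `catches`, while a file under `raw/eu-catches/` lands in the default schema.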

MERGE/UPSERT Mode

The DAG supports three ingestion modes (configurable via INGESTION_MODE):

  1. merge (default) - Deduplicates based on primary keys from README
  2. replace - Full table refresh, replaces all existing data
  3. append - No deduplication, appends all rows
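
The difference between the three modes can be illustrated on plain lists of row dictionaries. This is a sketch of the semantics described above, not the DAG's Parquet code: `apply_ingestion_mode` is a hypothetical name, the rule that an incoming row overwrites an existing row with the same key is an assumption based on the "upsert" wording, and raising an error when merge has no primary key is also an assumption.

```python
from typing import Dict, List, Sequence

Row = Dict[str, object]


def apply_ingestion_mode(
    existing: List[Row],
    incoming: List[Row],
    mode: str = "merge",
    primary_keys: Sequence[str] = (),
) -> List[Row]:
    """Combine existing and incoming rows according to the ingestion mode."""
    if mode == "replace":
        # Full refresh: existing rows are discarded.
        return list(incoming)
    if mode == "append":
        # No deduplication: duplicates are kept.
        return existing + incoming
    if mode == "merge":
        if not primary_keys:
            raise ValueError("merge mode needs at least one primary key")
        merged: Dict[tuple, Row] = {}
        for row in existing + incoming:
            # Later rows overwrite earlier rows with the same key (assumed).
            merged[tuple(row[k] for k in primary_keys)] = row
        return list(merged.values())
    raise ValueError(f"Unsupported mode: {mode!r}")
```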

Multilingual Support

The DAG handles multilingual data (Greek, German, French, English) with:

  • UTF-8 encoding for all CSV files
  • Proper handling of special characters in Parquet conversion
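
As a small illustration of the UTF-8 requirement, the sketch below decodes CSV bytes strictly, so a file saved in another encoding fails loudly instead of producing corrupted characters. The helper name `read_csv_utf8` is hypothetical, and the DAG itself may read files differently (for example with pandas).

```python
import csv
import io
from typing import Dict, List


def read_csv_utf8(payload: bytes) -> List[Dict[str, str]]:
    """Decode CSV bytes strictly as UTF-8 and return one dict per row."""
    # Strict decoding raises UnicodeDecodeError instead of silently
    # replacing characters, so mis-encoded files fail early.
    text = payload.decode("utf-8")
    return list(csv.DictReader(io.StringIO(text, newline="")))
```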

Common Development Tasks

Adding a New Shapefile Dataset

  1. Upload all shapefile components to MinIO under data/raw/<folder>/ (all files must share the same base name):
    mc cp rivers.shp rivers.dbf rivers.shx rivers.prj minio/data/raw/DataLakeFile/
    # Or use MinIO Console: http://localhost:9001
  2. The DAG auto-detects the bundle and derives schema from the .dbf attribute table.
    • Table name: base name of the .shp file (e.g., rivers)
    • Schema name: first folder segment after raw/ (e.g., DataLakeFile)
    • Geometry column: geometry (WKT string, VARCHAR in Hive)
  3. Wait for the DAG (runs every 5 minutes by default) or trigger manually:
    docker exec airflow-scheduler airflow dags trigger comprehensive_csv_ingestion
  4. Query via Trino:
    SELECT geometry, * FROM hive.DataLakeFile.rivers LIMIT 10;
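
The requirement that all components share a base name can be sketched as a grouping step over object keys. This is an illustration, not the DAG's scanning code: `find_shapefile_bundles` is a hypothetical name, and treating .shp, .shx, and .dbf as the required set (the mandatory files of the shapefile format) with .prj and .cpg as optional companions is an assumption about what the DAG checks.

```python
import posixpath
from collections import defaultdict
from typing import Dict, Iterable, List

REQUIRED_EXTENSIONS = {".shp", ".shx", ".dbf"}
OPTIONAL_EXTENSIONS = {".prj", ".cpg"}
KNOWN_EXTENSIONS = REQUIRED_EXTENSIONS | OPTIONAL_EXTENSIONS


def find_shapefile_bundles(keys: Iterable[str]) -> Dict[str, List[str]]:
    """Group keys by base name and keep only complete shapefile bundles."""
    groups: Dict[str, List[str]] = defaultdict(list)
    for key in keys:
        stem, ext = posixpath.splitext(key)
        if ext.lower() in KNOWN_EXTENSIONS:
            groups[stem].append(key)
    bundles: Dict[str, List[str]] = {}
    for stem, members in groups.items():
        extensions = {posixpath.splitext(m)[1].lower() for m in members}
        # A bundle is complete only when every required part is present.
        if REQUIRED_EXTENSIONS <= extensions:
            bundles[stem] = sorted(members)
    return bundles
```

Under these assumptions, a lone .shp file without its .shx and .dbf companions is not reported as a bundle, so it would not be picked up until the remaining files are uploaded.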

Adding a New CSV Dataset

  1. Upload CSV file to MinIO at data/raw/your_dataset_name/
    # Using mc (MinIO client)
    mc cp your_file.csv minio/data/raw/your_dataset_name/
    
    # Or use MinIO Console: http://localhost:9001
  2. (Optional) Add README.txt with schema in same folder
  3. Wait for automatic ingestion (runs every 5 minutes) or trigger manually:
    docker exec airflow-scheduler airflow dags trigger comprehensive_csv_ingestion
  4. Check Airflow UI for progress: http://localhost:8081
  5. Query data: docker exec trino trino --execute "SELECT * FROM hive.tables.your_dataset_name LIMIT 10"

Modifying Ingestion Logic

  1. Edit DAG file: airflow/dags/comprehensive_csv_ingestion_dag.py
  2. Restart Airflow: docker-compose restart airflow-scheduler airflow-webserver
  3. Verify changes in Airflow UI: http://localhost:8081

Debugging Failed Ingestion

  1. Open Airflow Web UI: http://localhost:8081
  2. Navigate to the DAG and check task logs
  3. Query audit table for failures:
    docker exec postgres psql -U postgres -d datahub -c \
      "SELECT * FROM ingestion_audit WHERE status = 'failed' ORDER BY processed_at DESC LIMIT 10;"

Querying Ingestion Statistics

# Connect to PostgreSQL from the shell, then run the SQL below at the psql prompt
docker exec -it postgres psql -U postgres -d datahub

-- Total ingestion stats
SELECT
    COUNT(*) as total_files,
    SUM(rows_appended) as total_rows,
    COUNT(DISTINCT table_name) as unique_tables,
    COUNT(CASE WHEN status = 'success' THEN 1 END) as successful,
    COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed
FROM ingestion_audit;

-- Recent ingestions
SELECT
    file_path,
    table_name,
    rows_appended,
    status,
    processed_at
FROM ingestion_audit
ORDER BY processed_at DESC
LIMIT 20;

-- Files per table
SELECT
    table_name,
    COUNT(*) as file_count,
    SUM(rows_appended) as total_rows
FROM ingestion_audit
WHERE status = 'success'
GROUP BY table_name;

Testing Trino Connectivity

# From Trino container
docker exec -it trino trino --server localhost:8080

# From Python inside the Jupyter container (outside the Trino container)
docker exec -it jupyter bash
pip install trino
python3 << 'EOF'
from trino.dbapi import connect
conn = connect(host='trino', port=8080, user='admin', catalog='hive', schema='default')
cur = conn.cursor()
cur.execute('SHOW TABLES')
print(cur.fetchall())
EOF

Resetting Ingestion State

# Use the reset script
./reset_ingestion_only.sh

# Or manually:
# 1. Clear audit table
docker exec postgres psql -U postgres -d datahub -c "TRUNCATE ingestion_audit;"

# 2. Trigger reprocessing
docker exec airflow-scheduler airflow dags trigger comprehensive_csv_ingestion

Critical Files & Their Roles

  • docker-compose.yml - Orchestrates all services, defines volumes and networks
  • airflow/dags/comprehensive_csv_ingestion_dag.py - Main ingestion pipeline with all logic
  • airflow/dags/ingestion_scripts/readme_parser.py - README parsing module (used by DAG)
  • trino/etc/catalog/hive.properties - Connects Trino to Hive Metastore (critical for queries)
  • postgres/postgres-init.sql - PostgreSQL initialization script (creates databases)

Important Constraints & Notes

Schema Evolution

  • Tables are created with CREATE TABLE IF NOT EXISTS
  • Schema evolution (adding columns) is not yet implemented
  • Changing column types requires manual intervention (DROP/CREATE or ALTER)

Parquet Conversion

  • Fully implemented in the DAG
  • CSVs are converted to Parquet with three modes: MERGE (default), REPLACE, APPEND
  • Parquet files stored in bucket/hive/schema/table/ folder
  • External Hive tables point to Parquet files in MinIO

Health Checks

All services have health checks defined in docker-compose.yml:

  • Services wait for dependencies to be healthy before starting
  • If services repeatedly restart, check logs: docker-compose logs [service-name]

Volume Persistence

Named volumes persist data across container restarts:

  • minio_data - All object storage data (raw/, staging/, hive/, processed/)
  • postgres_data - PostgreSQL databases (includes ingestion_audit table)
  • hive_data - Hive Metastore warehouse
  • airflow_db_data - Airflow metadata
  • neo4j_data, elasticsearch_data - DataHub dependencies

To completely reset: docker-compose down -v (WARNING: deletes all data!)

Network

All services communicate via the surimi-network bridge network. Services reference each other by container name (e.g., postgres:5432, minio:9000).

Documentation Reference

  • README.md - Project overview and quick start (root)
  • QUICKSTART.md - Detailed step-by-step setup guide (root)
  • docs/DOCUMENTATION_INDEX.md - Master documentation roadmap
  • docs/RECENT_UPDATES.md - Changelog and migration guide
  • docs/SCHEMA_NAMING.md - Intelligent schema naming guide
  • docs/SINGLE_BUCKET_REFACTORING.md - Single bucket architecture details
  • docs/INGESTION_MODES.md - Complete ingestion modes reference
  • docs/ARCHITECTURE_GUIDE.md - Deep dive into architecture
  • docs/DEPLOYMENT_CHECKLIST.md - Production deployment considerations
  • docs/OPERATIONS.md - Day-to-day operations and commands
  • docs/DATAHUB_SETUP.md - DataHub setup guide
  • docs/SUPERSET_SETUP.md - Superset setup guide
  • docs/CONTRIBUTING.md - Development workflow and guidelines

SURIMI Project Context

SURIMI is a European Commission project focused on:

  • Healthcare data interoperability across EU member states
  • Caregiver and patient management systems
  • Multilingual data handling (Greek, German, French, English)
  • Privacy-preserving analytics
  • Built on the EDITO platform infrastructure

This codebase provides the data lakehouse foundation for SURIMI's analytical needs.