The SURIMI DataLab ingestion pipeline supports multiple ingestion modes to handle different data update scenarios.
The pipeline moves CSV and README files through folders, while Parquet files are stored separately:
CSV/README Flow: bucket/raw/ → bucket/staging/ → bucket/processed/
Parquet Storage: bucket/hive/{schema}/{table}/
Flow:
- Upload CSV + README to bucket/raw/folder
- Pipeline validates and processes files
- Moves CSV + README to bucket/staging/folder (temporary)
- Converts CSV to Parquet (stored in bucket/hive/schema/table/)
- Moves original CSV + README to bucket/processed/folder (archival)
Result:
- bucket/raw/: Empty (files consumed)
- bucket/staging/: Temporary CSV + README (deleted after processing)
- bucket/hive/: Parquet files organized by schema/table (for queries)
- bucket/processed/: Original CSV + README (historical archive)
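The folder flow above can be sketched as a mapping from one uploaded object key to the keys it would occupy at each stage. This is a minimal illustration only: the function name `plan_object_keys` and its return shape are hypothetical, and the prefixes simply mirror the defaults listed in this document.

```python
# Prefixes mirror the documented defaults; everything else here is a
# hypothetical illustration, not the pipeline's actual code.
RAW_PREFIX = "raw/"
STAGING_PREFIX = "staging/"
PROCESSED_PREFIX = "processed/"
HIVE_PREFIX = "hive/"


def plan_object_keys(raw_key: str, schema: str, table: str) -> dict:
    """Return the object keys a file uploaded under raw/ would pass through."""
    if not raw_key.startswith(RAW_PREFIX):
        raise ValueError(f"expected a key under {RAW_PREFIX!r}: {raw_key!r}")
    relative = raw_key[len(RAW_PREFIX):]
    return {
        "raw": raw_key,                            # consumed by the pipeline
        "staging": STAGING_PREFIX + relative,      # temporary copy
        "processed": PROCESSED_PREFIX + relative,  # archival copy
        "hive_dir": f"{HIVE_PREFIX}{schema}/{table}/",  # Parquet output folder
    }


if __name__ == "__main__":
    for stage, key in plan_object_keys(
        "raw/eu-catches/test.csv", "tables", "eu_catches"
    ).items():
        print(f"{stage:10s} {key}")
```

The table name passed in the usage example is made up for illustration; how the pipeline derives table names is not covered here.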
Set the ingestion mode and bucket configuration via environment variables in docker-compose.yml:
environment:
# MinIO bucket configuration
MINIO_BUCKET: data # Single bucket name (configurable for production)
RAW_PREFIX: raw/
STAGING_PREFIX: staging/
PROCESSED_PREFIX: processed/
HIVE_PREFIX: hive/ # Root folder for all Hive Parquet tables
# Hive schema configuration
HIVE_DEFAULT_SCHEMA: tables # Default schema for single-level folders (default: 'tables')
# Ingestion mode
INGESTION_MODE: merge # Options: merge, replace, append
Or set it at runtime:
export MINIO_BUCKET=my-production-bucket
export HIVE_DEFAULT_SCHEMA=surimi
export INGESTION_MODE=merge
docker-compose up -d airflow-scheduler
The system automatically determines Hive schema names from folder structure:
- Two-level folders: raw/datasets/eu-catches/ → Schema: datasets
- Single-level folders: raw/eu-catches/ → Schema: tables (or value of HIVE_DEFAULT_SCHEMA)
- No folders: Uses HIVE_DEFAULT_SCHEMA value
This avoids the generic "default" schema and organizes tables logically.
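The schema rules above can be expressed as a short function. This is a sketch under assumptions: the name `resolve_schema` is hypothetical, and treating paths deeper than two folder levels the same as two-level paths is a guess, since the document only describes the zero-, one-, and two-level cases.

```python
def resolve_schema(raw_key: str, default_schema: str = "tables",
                   raw_prefix: str = "raw/") -> str:
    """Pick a Hive schema name from the folder structure of an object key."""
    # Strip the raw/ prefix if present, then drop the file name (last part).
    relative = raw_key[len(raw_prefix):] if raw_key.startswith(raw_prefix) else raw_key
    folders = [part for part in relative.split("/")[:-1] if part]
    if len(folders) >= 2:
        # Two-level layout: the first folder names the schema.
        return folders[0]
    # Single-level layout or no folder: fall back to HIVE_DEFAULT_SCHEMA.
    return default_schema


if __name__ == "__main__":
    print(resolve_schema("raw/datasets/eu-catches/test.csv"))  # datasets
    print(resolve_schema("raw/eu-catches/test.csv"))           # tables
    print(resolve_schema("raw/test.csv", default_schema="surimi"))  # surimi
```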
Behavior (MERGE, the default mode): Merges new data with existing data, deduplicating on primary keys.
- With Primary Keys: Deduplicates based on key columns (new rows overwrite old rows with the same key)
- Without Primary Keys: Full-row deduplication (exact duplicate rows removed)
When to Use:
- Incremental data loads
- Update existing records
- Fix/correct historical data
- Typical ETL scenarios
Example:
Existing Data:
country | year | catch
--------+------+-------
Greece | 2020 | 100
France | 2020 | 200
New Data:
country | year | catch
--------+------+-------
Greece | 2020 | 150 ← Updated value
Germany | 2020 | 300 ← New record
Result (Primary Keys: country, year):
country | year | catch
--------+------+-------
France | 2020 | 200 ← Unchanged
Greece | 2020 | 150 ← Updated
Germany | 2020 | 300 ← New
Technical Details:
- Reads all existing Parquet files for the table
- Concatenates existing + new data
- Drops duplicates based on primary keys (keeps 'last' = new data)
- Deletes old Parquet files
- Writes single consolidated Parquet file
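The concatenate-then-deduplicate step can be illustrated with plain Python dictionaries. This is a standard-library sketch, not the pipeline's code: `merge_rows` is a hypothetical name, and rows are modeled as dicts rather than Parquet data. It keeps the last occurrence of each key, which matches the "keeps 'last'" behavior described above.

```python
def merge_rows(existing, new, primary_keys=None):
    """Concatenate existing + new rows and keep the last row per key.

    With primary_keys, the key is the tuple of those column values.
    Without them, the whole row is the key (exact duplicates are removed).
    """
    merged = {}
    for row in list(existing) + list(new):
        if primary_keys:
            key = tuple(row[col] for col in primary_keys)
        else:
            key = tuple(sorted(row.items()))
        # Remove any earlier row with this key so the newer row both wins
        # and takes the later position, like a keep-last deduplication.
        merged.pop(key, None)
        merged[key] = row
    return list(merged.values())


if __name__ == "__main__":
    existing = [
        {"country": "Greece", "year": 2020, "catch": 100},
        {"country": "France", "year": 2020, "catch": 200},
    ]
    new = [
        {"country": "Greece", "year": 2020, "catch": 150},
        {"country": "Germany", "year": 2020, "catch": 300},
    ]
    for row in merge_rows(existing, new, primary_keys=["country", "year"]):
        print(row)
```

Run on the example tables above, this yields France (200), Greece (150), Germany (300), the same rows and order as the documented result.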
Behavior (REPLACE mode): Completely replaces all existing data with new data.
- Deletes all existing Parquet files
- Writes only new data
When to Use:
- Full table refresh
- Snapshot data (replace yesterday's snapshot with today's)
- Data quality issues requiring clean slate
Example:
Existing Data:
country | year | catch
--------+------+-------
Greece | 2020 | 100
France | 2020 | 200
Spain | 2020 | 300
New Data:
country | year | catch
--------+------+-------
Greece | 2021 | 150
France | 2021 | 250
Result:
country | year | catch
--------+------+-------
Greece | 2021 | 150 ← Only new data
France | 2021 | 250 ← Old data gone
Technical Details:
- Lists all existing Parquet files for the table
- Deletes all existing files
- Writes new Parquet file with new data only
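The list, delete, and write sequence can be sketched against a local directory. This is an illustration under assumptions: a local folder stands in for the table's folder in MinIO, the payload is opaque bytes rather than real Parquet, and `replace_table_files` is a hypothetical name.

```python
import tempfile
from pathlib import Path


def replace_table_files(table_dir: Path, new_file_name: str, payload: bytes) -> Path:
    """Delete every existing .parquet file in table_dir, then write one new file."""
    table_dir.mkdir(parents=True, exist_ok=True)
    # Materialize the listing first so deletion does not disturb iteration.
    for old_file in list(table_dir.glob("*.parquet")):
        old_file.unlink()
    target = table_dir / new_file_name
    target.write_bytes(payload)
    return target


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        table_dir = Path(tmp) / "hive" / "tables" / "demo_table"
        table_dir.mkdir(parents=True)
        (table_dir / "data_old.parquet").write_bytes(b"old")
        replace_table_files(table_dir, "data_new.parquet", b"new")
        print(sorted(p.name for p in table_dir.iterdir()))
```

Because the old files are removed before the new one is written, a failure between the two steps would leave the table empty; whether the real pipeline guards against that is not stated here.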
Behavior (APPEND mode): Adds new data without checking for duplicates (fastest, but can create duplicates).
- Creates new Parquet file alongside existing ones
- No deduplication
- No modification of existing files
When to Use:
- Event logs (time-series data where duplicates are impossible)
- Sensor data with guaranteed unique timestamps
- Performance-critical scenarios where you control upstream deduplication
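Warning: APPEND performs no deduplication, so re-ingesting rows with an existing key produces duplicate keys in query results.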
Example:
Existing Data (in data_20251213_120000.parquet):
country | year | catch
--------+------+-------
Greece | 2020 | 100
New Data (creates data_20251213_140000.parquet):
country | year | catch
--------+------+-------
Greece | 2020 | 150
Result (query returns ALL rows from ALL files):
country | year | catch
--------+------+-------
Greece | 2020 | 100 ← Old file
Greece | 2020 | 150 ← New file (DUPLICATE KEY!)
Technical Details:
- Writes new Parquet file to table directory
- No interaction with existing files
- Hive external table reads all Parquet files in directory
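Why APPEND produces duplicates can be shown with an in-memory model of the table directory. This is a sketch, not the pipeline's code: the dict of file name to rows stands in for the folder of Parquet files, the function names are hypothetical, and the timestamped file name pattern is inferred from the example file names in this document.

```python
from datetime import datetime


def append_file(table_files: dict, rows: list, now: datetime) -> str:
    """Add a new timestamped file next to the existing ones, touching nothing else."""
    name = f"data_{now:%Y%m%d_%H%M%S}.parquet"
    if name in table_files:
        raise FileExistsError(name)
    table_files[name] = list(rows)
    return name


def read_table(table_files: dict) -> list:
    """Model a query over the table: every row from every file, in file-name order."""
    return [row for name in sorted(table_files) for row in table_files[name]]


if __name__ == "__main__":
    files = {
        "data_20251213_120000.parquet": [
            {"country": "Greece", "year": 2020, "catch": 100},
        ],
    }
    append_file(files, [{"country": "Greece", "year": 2020, "catch": 150}],
                datetime(2025, 12, 13, 14, 0, 0))
    for row in read_table(files):
        print(row)  # both Greece/2020 rows appear
```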
The system automatically detects primary keys using multiple strategies:
Explicit "Primary Key" section in the README:
Primary Key
-----------
- country
- year
- quarter
Columns marked with ★ in the README "Schema" section:
Schema
------
- id (INTEGER) ★: Unique identifier (primary key)
- country (VARCHAR): Country name
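A README "Primary Key" section in the underlined-title layout shown above could be read with a small parser like the following. This is a hedged sketch: `parse_primary_keys` is a hypothetical name, and the rules it applies (a title line followed by a dashed underline starts a section, bullet lines inside it are key columns) are assumptions based on the README samples in this document, not the pipeline's actual parser.

```python
import re

# A section underline: three or more dashes and nothing else.
_UNDERLINE = re.compile(r"^-{3,}\s*$")


def parse_primary_keys(readme_text: str) -> list:
    """Return the bullet items listed under a 'Primary Key' section."""
    lines = readme_text.splitlines()
    keys = []
    in_section = False
    for i, line in enumerate(lines):
        next_is_underline = (
            i + 1 < len(lines) and _UNDERLINE.match(lines[i + 1]) is not None
        )
        if next_is_underline:
            # A title followed by an underline opens a new section.
            in_section = line.strip().lower() == "primary key"
            continue
        if _UNDERLINE.match(line):
            continue
        if in_section and line.lstrip().startswith("- "):
            keys.append(line.lstrip()[2:].strip())
    return keys


if __name__ == "__main__":
    sample = "\n".join([
        "Primary Key", "-----------", "- country", "- year", "- quarter",
        "Schema", "------", "- country (VARCHAR): Country name",
    ])
    print(parse_primary_keys(sample))
```

Names are returned exactly as written in the README, so any case normalization against the CSV header would have to happen separately.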
Automatically recognizes common patterns:
- Columns named: id, uuid, code, key
- Columns ending in: _id, _code
For datasets without explicit IDs, infers composite keys from the first 5 columns containing: country, year, quarter, date, region, area, species
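The name-pattern and composite-key heuristics can be sketched as follows. Several details are assumptions, because the document does not specify them: the precedence (explicit ID patterns first, then composite inference), returning only the first ID-like column, lowercasing column names, and reading "containing" as a substring match. The function name is hypothetical.

```python
EXACT_NAMES = {"id", "uuid", "code", "key"}
SUFFIXES = ("_id", "_code")
COMPOSITE_HINTS = ("country", "year", "quarter", "date", "region", "area", "species")


def detect_primary_keys(columns: list) -> list:
    """Guess primary key columns from column names alone."""
    lowered = [col.lower() for col in columns]
    # Strategy 1: a column that looks like an explicit identifier.
    for name in lowered:
        if name in EXACT_NAMES or name.endswith(SUFFIXES):
            return [name]
    # Strategy 2: composite key from the first 5 columns that contain
    # one of the hint words.
    return [
        name for name in lowered[:5]
        if any(hint in name for hint in COMPOSITE_HINTS)
    ]


if __name__ == "__main__":
    # Hypothetical column lists, for illustration only.
    print(detect_primary_keys(["vessel_id", "country", "year"]))
    print(detect_primary_keys(
        ["country", "year", "quarter", "vessel_length", "gear_type", "species"]
    ))
```

In the second call, `species` is the sixth column, so it falls outside the first-5 window and is not part of the inferred key.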
Example: Your EU DCF Catches dataset automatically uses:
primary_keys = ['country', 'year', 'quarter']
Files are tracked in the PostgreSQL ingestion_audit table to prevent re-processing:
SELECT file_path, file_hash, status, processed_at
FROM ingestion_audit
WHERE table_name = 'eu_dcf_catches_2013_2023'
ORDER BY processed_at DESC;
Key Points:
- Files identified by MD5 hash
- Only files with status='success' are skipped
- Failed files will be retried on next run
- Clearing audit table will re-process all files (useful for testing)
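The skip rule in the key points can be sketched with `hashlib` and an in-memory stand-in for the audit table. This is an illustration only: the function names are hypothetical, the audit rows are plain dicts using the `file_hash` and `status` column names from the query above, and the status value `'failed'` is an assumption (only `'success'` is named in this document).

```python
import hashlib


def md5_of_bytes(data: bytes) -> str:
    """Hex MD5 digest used to identify a file by content."""
    return hashlib.md5(data).hexdigest()


def should_process(file_hash: str, audit_rows: list) -> bool:
    """Skip a file only if the same hash was already ingested successfully."""
    return not any(
        row["file_hash"] == file_hash and row["status"] == "success"
        for row in audit_rows
    )


if __name__ == "__main__":
    content = b"country,year,catch\nGreece,2020,100\n"
    digest = md5_of_bytes(content)
    print(should_process(digest, []))                                            # new file
    print(should_process(digest, [{"file_hash": digest, "status": "failed"}]))   # retried
    print(should_process(digest, [{"file_hash": digest, "status": "success"}]))  # skipped
```

Because the identity is the content hash, changing a single value in the CSV gives the file a new hash and makes it eligible for processing again.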
Check ingestion logs to see merge statistics:
docker-compose logs airflow-scheduler | grep -A 10 "Merging new data"
Example log output:
[INFO] Converting /tmp/airflow_csv/test.csv to Parquet (mode: merge)...
[INFO] Loaded CSV: 9 rows, 30 columns
[INFO] Found 1 existing Parquet file(s) for eu_dcf_catches_2013_2023
[INFO] Loaded 9 existing rows from 1 file(s)
[INFO] Merging new data (9 rows) with existing data (9 rows)
[INFO] Using primary keys for merge: ['country', 'year', 'quarter']
[INFO] After merge: 12 rows (6 duplicates removed)
[INFO] Final dataset: 12 rows, 30 columns
[INFO] Deleting 1 old Parquet file(s)...
[INFO] [MERGE] Converted and uploaded: s3a://data/hive/default/eu_dcf_catches_2013_2023/data_20251213_193524.parquet
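The merge statistics in this output can be cross-checked mechanically: new rows plus existing rows minus removed duplicates should equal the final row count (9 + 9 - 6 = 12 here). The sketch below assumes the log lines keep exactly the wording of the sample above; `merge_stats` is a hypothetical helper, not part of the pipeline.

```python
import re

MERGING = re.compile(
    r"Merging new data \((\d+) rows\) with existing data \((\d+) rows\)"
)
AFTER = re.compile(r"After merge: (\d+) rows \((\d+) duplicates removed\)")


def merge_stats(log_text: str):
    """Extract merge counts from log text, or None if the lines are missing."""
    merging = MERGING.search(log_text)
    after = AFTER.search(log_text)
    if merging is None or after is None:
        return None
    new_rows, existing_rows = (int(g) for g in merging.groups())
    final_rows, removed = (int(g) for g in after.groups())
    return {
        "new": new_rows,
        "existing": existing_rows,
        "final": final_rows,
        "removed": removed,
        # True when the counts add up as expected.
        "consistent": new_rows + existing_rows - removed == final_rows,
    }


if __name__ == "__main__":
    sample = (
        "[INFO] Merging new data (9 rows) with existing data (9 rows)\n"
        "[INFO] After merge: 12 rows (6 duplicates removed)\n"
    )
    print(merge_stats(sample))
```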
To test merge behavior with your EU DCF dataset:
# 1. Note current row count
docker exec trino trino --execute "SELECT COUNT(*) FROM hive.tables.eu_dcf_catches_2013_2023"
# 2. Modify test.csv (change some values)
# 3. Clear audit table to allow re-processing
docker exec postgres psql -U postgres -d datahub -c \
"DELETE FROM ingestion_audit WHERE table_name='eu_dcf_catches_2013_2023';"
# 4. Re-upload and trigger DAG
# (upload modified test.csv to MinIO)
docker exec airflow-scheduler airflow dags trigger comprehensive_csv_ingestion
# 5. Check logs for merge stats
docker-compose logs airflow-scheduler | grep -A 15 "Merging new data"
# 6. Verify row count (should match or be slightly different based on changes)
docker exec trino trino --execute "SELECT COUNT(*) FROM hive.tables.eu_dcf_catches_2013_2023"
To test adding new records:
# Create new CSV with additional rows (different year/quarter)
# Upload to MinIO
# Trigger DAG
# Row count should increase
To test REPLACE mode:
# Set environment variable
export INGESTION_MODE=replace
docker-compose up -d airflow-scheduler
# Trigger DAG - old data will be completely replaced
Performance by mode:
MERGE:
- Speed: Slower (reads existing data)
- Storage: Efficient (single consolidated file)
- Memory: Higher (loads all data into memory)
REPLACE:
- Speed: Fast (no reads)
- Storage: Efficient (single file)
- Memory: Low (only new data)
APPEND:
- Speed: Fastest (no reads, no deletes)
- Storage: Can grow large (multiple files)
- Memory: Lowest (only new data)
Recommendation: Use MERGE for most scenarios. Switch to APPEND only for high-frequency time-series data.
Cause: Primary keys not detected or incorrect
Solution:
- Check logs for "Using primary keys for merge: [...]"
- Add explicit "Primary Key" section to README
- Verify column names match between README and CSV
Cause: Dataset too large to fit in memory
Solution:
- Increase Airflow worker memory in docker-compose.yml
- Use REPLACE mode instead
- Implement partitioning (future enhancement)
Cause: Permission errors or network issues
Solution: Check logs for "Failed to delete" warnings
Planned improvements to ingestion modes:
- Partition-based MERGE: Merge only specific partitions (e.g., by year)
- Delta Lake integration: ACID transactions for concurrent writes
- Incremental processing: Process only changed rows
- Custom merge strategies: User-defined merge logic
- Soft deletes: Mark records as deleted instead of removing
services:
airflow-scheduler:
environment:
# MinIO bucket configuration
MINIO_BUCKET: data
RAW_PREFIX: raw/
STAGING_PREFIX: staging/
PROCESSED_PREFIX: processed/
HIVE_PREFIX: hive/
# Hive schema configuration
HIVE_DEFAULT_SCHEMA: tables # Change to 'surimi' or your project name
# MERGE mode (default)
INGESTION_MODE: merge
# REPLACE mode
# INGESTION_MODE: replace
# APPEND mode
# INGESTION_MODE: append
Schema
------
1. COUNTRY: Country code
2. YEAR: Four-digit year
3. QUARTER: 1, 2, 3, or 4
4. SPECIES: FAO species code
5. CATCH: Catch in tonnes
Primary Key
-----------
Unique records identified by:
- COUNTRY
- YEAR
- QUARTER
- SPECIES