DataHub is a metadata management platform that catalogs your data assets. In the SURIMI pipeline, DataHub provides:
- 📊 Data Discovery - Search and browse all ingested datasets
- 🔄 Data Lineage - Track data flow from CSV → Parquet → Hive tables
- 📖 Schema Documentation - Column names, types, descriptions from README files
- 👥 Data Governance - Ownership, tags, classifications
- 🔍 Search - Full-text search across all metadata
CSV Upload → Process → Parquet → Hive Table → DataHub
↓
Searchable in DataHub UI
Shows lineage, schema, docs
From your ingestion pipeline, DataHub receives:
- Dataset Information
  - Dataset name (e.g., default.test)
  - Platform (Hive)
  - Environment (PROD)
- Schema Metadata
  - Column names
  - Column types (VARCHAR, BIGINT, etc.)
  - Column descriptions (from README files)
- Custom Properties
  - Source CSV file path
  - Row count
  - README file path
  - Processing timestamp
- Lineage Information
  - Source: CSV in MinIO
  - Transformation: Parquet conversion
  - Destination: Hive table
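As a rough illustration of how the dataset identity and custom properties listed above could be assembled, here is a minimal sketch using only the standard library. The helper names `make_hive_dataset_urn` and `to_custom_properties` and the sample file path are hypothetical and not part of the DAG; the URN layout follows the format the DAG already logs.

```python
from typing import Any, Dict


def make_hive_dataset_urn(schema_name: str, table_name: str, env: str = "PROD") -> str:
    """Build a dataset URN in the same layout the DAG uses for Hive tables."""
    return f"urn:li:dataset:(urn:li:dataPlatform:hive,{schema_name}.{table_name},{env})"


def to_custom_properties(metadata: Dict[str, Any]) -> Dict[str, str]:
    """Stringify values and drop missing ones, since custom properties map strings to strings."""
    return {key: str(value) for key, value in metadata.items() if value is not None}


# Hypothetical sample values for illustration only
urn = make_hive_dataset_urn("default", "test")
props = to_custom_properties(
    {"source_file": "test/data.csv", "row_count": 2, "readme_path": None}
)
print(urn)
print(props)
```

Dropping `None` values instead of sending empty strings keeps the properties panel free of blank entries; whether that is preferable to an explicit empty value is a design choice for the pipeline.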
Currently in comprehensive_csv_ingestion_dag.py, the ingest_metadata_to_datahub task:
def ingest_metadata_to_datahub(**context):
    # Prepares DataHub metadata structure
    dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:hive,{schema_name}.{table_name},PROD)"
    dataset_properties = {
        "customProperties": {
            "source_file": metadata['object_name'],
            "row_count": str(metadata.get('row_count', 0)),
            "readme_path": metadata.get('readme_path', '')
        },
        "name": table_name,
        "description": description,
        "uri": f"s3a://data/hive/{metadata.get('parquet_path', '')}"
    }
    # Currently just logs - doesn't actually send to DataHub
    logger.info(f"Would ingest to DataHub: {dataset_urn}")

Status: the task builds the metadata and logs it, but does not yet send anything to DataHub.

Method 1 (DataHub SDK):
Pros:
- ✅ Official DataHub client
- ✅ Full feature support
- ✅ Type-safe API
- ✅ Handles authentication
Cons:
- ❌ Requires installing the acryl-datahub package
- ❌ More complex code
Implementation:
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Create emitter
emitter = DatahubRestEmitter(
    gms_server='http://datahub-gms:8080',
    token=None  # Add if auth enabled
)

# Create dataset URN
dataset_urn = make_dataset_urn(
    platform='hive',
    name=f'{schema_name}.{table_name}',
    env='PROD'
)

# Describe the dataset (custom property values must be strings)
dataset_properties = DatasetPropertiesClass(
    name=table_name,
    description=description,
    customProperties={
        'source_file': object_name,
        'row_count': str(row_count)
    }
)

# Wrap the aspect in a metadata change proposal and emit it to DataHub
emitter.emit_mcp(
    MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=dataset_properties
    )
)
emitter.close()

Method 2 (REST API):
Pros:
- ✅ No special dependencies
- ✅ Works with standard HTTP
- ✅ Easy to debug
Cons:
- ❌ Need to construct JSON payloads manually
- ❌ More verbose
Implementation:
import requests
import json

datahub_gms_url = 'http://datahub-gms:8080'

# Prepare metadata change proposal (MCP)
mcp = {
    "entityType": "dataset",
    "entityUrn": f"urn:li:dataset:(urn:li:dataPlatform:hive,{schema_name}.{table_name},PROD)",
    "changeType": "UPSERT",
    "aspectName": "datasetProperties",
    "aspect": {
        # The aspect body is sent as a JSON-encoded string
        "contentType": "application/json",
        "value": json.dumps({
            "name": table_name,
            "description": description,
            "customProperties": {
                "source_file": object_name,
                "row_count": str(row_count),
                "readme_path": readme_path
            }
        })
    }
}

# Send to DataHub via the proposal ingestion endpoint
response = requests.post(
    f'{datahub_gms_url}/aspects?action=ingestProposal',
    json={"proposal": mcp},
    headers={
        'Content-Type': 'application/json',
        'X-RestLi-Protocol-Version': '2.0.0'
    },
    timeout=30
)
if response.ok:
    logger.info(f"Successfully ingested {table_name} to DataHub")
else:
    logger.error(f"Failed to ingest ({response.status_code}): {response.text}")

Method 3 (DataHub CLI):
Pros:
- ✅ No code changes needed
- ✅ Good for one-time bulk imports
Cons:
- ❌ Not integrated with Airflow DAG
- ❌ Manual process
Usage:
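# Install the DataHub CLI with the Hive source plugin
pip install 'acryl-datahub[hive]'
# Ingest via recipe file
datahub ingest -c recipe.yml

Example recipe.yml:
source:
  # The hive source expects a HiveServer2 endpoint; if this host is Trino,
  # the trino source type is likely the better fit.
  type: hive
  config:
    host_port: trino:8080
    database: default
sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080

Add to airflow/requirements.txt: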
acryl-datahub==0.12.0
Rebuild Airflow:
docker-compose build airflow-init airflow-scheduler airflow-webserver
docker-compose up -d airflow-init airflow-scheduler airflow-webserver

Replace the logging code in ingest_metadata_to_datahub() with actual DataHub SDK calls.

Upload a small CSV and verify that it appears in the DataHub UI at http://localhost:9002.
After successful ingestion, in the DataHub UI you'll see:
- Dataset name: default.test
- Platform: Hive
- Description from README
- Custom properties (source file, row count, etc.)
- All columns with their types
- Column descriptions from README
- Data types mapped correctly
CSV File (MinIO)
↓
Parquet File (MinIO hive)
↓
Hive Table (Trino)
- Source file path
- Row count
- Processing timestamp
- README location
- Search by table name, column name, or description
- Filter by platform, domain, or tags
┌──────────────────────────────────────────────┐
│  Airflow DAG (comprehensive_csv)             │
│  Task: ingest_metadata_to_datahub            │
└──────────────┬───────────────────────────────┘
               │
               ↓ (HTTP/SDK)
┌──────────────────────────────────────────────┐
│  DataHub GMS (Generalized Metadata           │
│  Service) - Port 8080                        │
│                                              │
│  - REST API for metadata ingestion           │
│  - GraphQL API for queries                   │
│  - Stores metadata in backends               │
└──────────────┬───────────────────────────────┘
               │
               ↓ (Stores in)
┌──────────────────────────────────────────────┐
│  Backend Stores                              │
│                                              │
│  - PostgreSQL (main metadata)                │
│  - Elasticsearch (search index)              │
│  - Neo4j (lineage graph)                     │
│  - Kafka (change events)                     │
└──────────────┬───────────────────────────────┘
               │
               ↑ (Read from)
┌──────────────────────────────────────────────┐
│  DataHub Frontend - Port 9002                │
│  http://localhost:9002                       │
│                                              │
│  - Web UI for browsing metadata              │
│  - Search interface                          │
│  - Lineage visualization                     │
└──────────────────────────────────────────────┘
Default Credentials:
- Username: datahub
- Password: datahub
What to Check:
- Navigate to "Datasets"
- Filter by Platform: "Hive"
- Look for your tables (e.g., default.test)
- Click on a table to see schema, properties, lineage
Source: CSV with README.txt
README – Fish Landings Data
============================
Summary
-------
Commercial fish landings by country and species.
Schema
------
- country (VARCHAR): Country name
- year (INTEGER): Year of landing
- species (VARCHAR): Fish species code
- tonnage (DOUBLE): Landed weight in tonnes
In DataHub:
- Dataset: default.fish_landings
- Description: "Commercial fish landings by country and species"
- Columns:
  - country (VARCHAR) - "Country name"
  - year (INTEGER) - "Year of landing"
  - species (VARCHAR) - "Fish species code"
  - tonnage (DOUBLE) - "Landed weight in tonnes"
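To make the README-to-column mapping above concrete, the following is a minimal sketch of how the Schema section of such a README could be parsed into column names, types, and descriptions. The function `parse_readme_schema` and its regular expression are assumptions for illustration; the parser actually used by the pipeline may differ.

```python
import re
from typing import Dict, List

# Matches lines such as "- tonnage (DOUBLE): Landed weight in tonnes"
SCHEMA_LINE = re.compile(
    r"^-\s*(?P<name>\w+)\s*\((?P<type>[^)]+)\)\s*:\s*(?P<description>.+)$"
)


def parse_readme_schema(readme_text: str) -> List[Dict[str, str]]:
    """Collect every bullet of the form '- column (TYPE): description'."""
    columns = []
    for line in readme_text.splitlines():
        match = SCHEMA_LINE.match(line.strip())
        if match:
            columns.append({key: value.strip() for key, value in match.groupdict().items()})
    return columns


readme = """Summary
-------
Commercial fish landings by country and species.
Schema
------
- country (VARCHAR): Country name
- year (INTEGER): Year of landing
- species (VARCHAR): Fish species code
- tonnage (DOUBLE): Landed weight in tonnes
"""

columns = parse_readme_schema(readme)
for column in columns:
    print(column["name"], column["type"], column["description"])
```

Lines that do not follow the bullet pattern, such as the underline rows, are skipped, so a README without a Schema section simply yields an empty list.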
Source: CSV without README
id,name,value
1,test,100
2,sample,200

In DataHub:
- Dataset: default.my_data
- Description: (empty)
- Columns:
  - id (BIGINT) - (no description)
  - name (VARCHAR) - (no description)
  - value (BIGINT) - (no description)
- Custom Properties:
  - source_file: my_data/data.csv
  - row_count: 2
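When no README is present, column types have to come from the data itself. The sketch below shows one simple way this could work, assuming a rule of "all integers, then all floats, otherwise text"; the functions `infer_type` and `infer_schema` are hypothetical and the pipeline's real inference logic is not shown in this document.

```python
import csv
import io
from typing import Dict, List


def infer_type(values: List[str]) -> str:
    """Return BIGINT if all values parse as int, DOUBLE if all parse as float, else VARCHAR."""

    def all_parse(cast) -> bool:
        try:
            for value in values:
                cast(value)
            return True
        except (ValueError, TypeError):
            return False

    if not values:
        return "VARCHAR"
    if all_parse(int):
        return "BIGINT"
    if all_parse(float):
        return "DOUBLE"
    return "VARCHAR"


def infer_schema(csv_text: str) -> Dict[str, str]:
    """Map each CSV header to an inferred column type."""
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    if not rows:
        return {}
    return {name: infer_type([row[name] for row in rows]) for name in rows[0]}


schema = infer_schema("id,name,value\n1,test,100\n2,sample,200\n")
print(schema)
```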
Check 1: Is DataHub GMS running?
docker-compose ps | grep datahub-gms
curl http://localhost:8080/health

Check 2: Check Airflow task logs
docker exec airflow-scheduler cat /opt/airflow/logs/.../ingest_metadata_to_datahub/attempt=1.log

Check 3: Check DataHub GMS logs
docker-compose logs datahub-gms | tail -50

Possible causes:
- Metadata not actually sent (currently just logging)
- DataHub indexing lag (wait 1-2 minutes)
- Wrong platform/environment in URN
- Elasticsearch not indexed yet
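One of the causes above, a wrong platform or environment in the URN, can be checked before anything is sent. The following is a minimal sketch of a URN parser for that purpose; `parse_dataset_urn` and `DatasetUrn` are illustrative names, not part of the DataHub SDK.

```python
import re
from typing import NamedTuple

# Layout: urn:li:dataset:(urn:li:dataPlatform:<platform>,<name>,<env>)
DATASET_URN = re.compile(
    r"^urn:li:dataset:\(urn:li:dataPlatform:(?P<platform>[^,]+),(?P<name>.+),(?P<env>[^,)]+)\)$"
)


class DatasetUrn(NamedTuple):
    platform: str
    name: str
    env: str


def parse_dataset_urn(urn: str) -> DatasetUrn:
    """Split a dataset URN into platform, name, and environment, or raise ValueError."""
    match = DATASET_URN.match(urn)
    if match is None:
        raise ValueError(f"Not a dataset URN: {urn!r}")
    return DatasetUrn(**match.groupdict())


parsed = parse_dataset_urn("urn:li:dataset:(urn:li:dataPlatform:hive,default.test,PROD)")
print(parsed)
```

A task could compare `parsed.platform` and `parsed.env` against the values used in the UI filter (Hive, PROD) and log a warning on mismatch.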
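Solution: Force a reindex (the operation name and payload can differ between DataHub versions, so check the documentation for your release):
docker exec datahub-gms curl -X POST 'http://localhost:8080/operations?action=restoreIndices' -H 'Content-Type: application/json' -H 'X-RestLi-Protocol-Version: 2.0.0' -d '{"urnLike": "urn:li:dataset:%"}'

If DataHub has auth enabled:
- Create an access token in DataHub UI (Settings → Access Tokens)
- Pass token in SDK:
  emitter = DatahubRestEmitter(
      gms_server='http://datahub-gms:8080',
      token='your-token-here'
  )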
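If the plain REST approach is used instead of the SDK, the token would normally travel as a bearer token in the request headers. Here is a small sketch of that, assuming the token is supplied through an environment variable named `DATAHUB_GMS_TOKEN`; the helper `build_gms_headers` is hypothetical.

```python
import os
from typing import Dict, Optional


def build_gms_headers(token: Optional[str] = None) -> Dict[str, str]:
    """Return request headers for GMS, adding a bearer token only when one is given."""
    headers = {
        "Content-Type": "application/json",
        "X-RestLi-Protocol-Version": "2.0.0",
    }
    if token:
        headers["Authorization"] = f"Bearer {token}"
    return headers


# Assumption: the token is injected via the environment rather than hard-coded
headers = build_gms_headers(os.environ.get("DATAHUB_GMS_TOKEN"))
print(sorted(headers))
```

Reading the token from the environment (or an Airflow connection) avoids committing it to the DAG source.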
- Decision: Choose Method 1 (DataHub SDK) for production use
- Install: Add acryl-datahub to airflow/requirements.txt
- Implement: Replace logging code with actual SDK calls
- Test: Upload a CSV and verify in DataHub UI
- Enhance: Add lineage, ownership, tags, glossary terms
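For the lineage enhancement, the CSV to Hive relationship could be expressed as an `upstreamLineage` aspect on the Hive dataset. The sketch below only builds the proposal payload as a plain dictionary and does not send it; the function name, the `s3` platform for the MinIO file, the sample path, and the `urn:li:corpuser:airflow` actor are all assumptions for illustration.

```python
import json
import time
from typing import Any, Dict, List


def build_upstream_lineage_proposal(downstream_urn: str, upstream_urns: List[str]) -> Dict[str, Any]:
    """Build a metadata change proposal that marks each upstream as a source of the downstream."""
    now_ms = int(time.time() * 1000)
    aspect = {
        "upstreams": [
            {
                # Hypothetical actor; use whichever user or service owns the pipeline
                "auditStamp": {"time": now_ms, "actor": "urn:li:corpuser:airflow"},
                "dataset": upstream,
                "type": "TRANSFORMED",
            }
            for upstream in upstream_urns
        ]
    }
    return {
        "entityType": "dataset",
        "entityUrn": downstream_urn,
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {"contentType": "application/json", "value": json.dumps(aspect)},
    }


# Hypothetical URNs: the CSV in MinIO is modeled as an s3 dataset
csv_urn = "urn:li:dataset:(urn:li:dataPlatform:s3,data/my_data/data.csv,PROD)"
hive_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,default.my_data,PROD)"
proposal = build_upstream_lineage_proposal(hive_urn, [csv_urn])
print(json.dumps(proposal, indent=2))
```

With the SDK, the same relationship would more likely be built from the typed lineage classes; the dictionary form is shown here only because it has no dependencies.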
Last Updated: 2025-12-12
DataHub Version: HEAD (latest)
DataHub UI: http://localhost:9002