🚨 Automated Incident Management for Your Data Infrastructure 🚨
This integration bridges DataHub Cloud with PagerDuty to provide real-time alerting and incident management for critical data events. When something goes wrong with your data (schema changes, ownership issues, or asset deprecations), your team is notified immediately through PagerDuty's proven alerting system.
Modern data teams manage hundreds of datasets across multiple platforms. When critical data issues occur:
- Silent failures can go unnoticed for hours or days
- Downstream impacts affect business decisions and customer experiences
- Manual monitoring doesn't scale as data ecosystems grow
- Communication gaps persist between data teams and incident responders
This integration automatically monitors your DataHub Cloud instance and creates PagerDuty incidents for critical data events, ensuring:
- ⚡ Immediate alerting when data issues occur
- 🎯 Targeted notifications to the right on-call teams
- 🔄 Automated resolution when issues are fixed
- 📊 Incident tracking for post-mortem analysis and SLA monitoring
The integration monitors these critical data events in DataHub Cloud:
- Data Quality Assertion Failures: When data quality assertion runs report a FAILURE result
- Schema Changes: When table schemas are modified unexpectedly
- Asset Deprecations: When critical datasets are marked as deprecated
- Ownership Changes: When dataset ownership is transferred
- Tag Modifications: When critical tags (like PII) are added/removed
- Domain Changes: When datasets are moved between domains
- Documentation Updates: When important documentation changes
- Glossary Term Changes: When business terms are applied/removed
The following DataHub Cloud features are NOT accessible via the REST API and therefore cannot be monitored by this integration:
- Ingestion Pipeline Failures
- Missing Data Alerts
These features are part of DataHub Cloud's Observe module and require separate integration methods. For monitoring these features, consider:
- Using DataHub Cloud's built-in notification system
- Integrating with DataHub Cloud's webhook capabilities (if available)
- Using DataHub Cloud's native alerting features
This integration supports EntityChangeEvent_v1 events, which include:
- Tag additions/removals
- Ownership changes
- Domain assignments
- Deprecation status changes
- Documentation updates
- Glossary term assignments
- Data Quality Assertion Run Events (SUCCESS/FAILURE results)
- Freshness Assertions (data freshness monitoring)
- Volume Assertions (row count validation)
- Column Assertions (data quality checks)
- Custom SQL Assertions (custom validation logic)
- Schema Assertions (schema validation)
Note: Data quality assertions ARE supported via Assertion Run Events, which emit when assertions are executed and include the result (SUCCESS or FAILURE), run ID, and assertee URN. This includes freshness assertions that monitor data freshness and update frequency.
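As a rough illustration of how such a run event might be interpreted, here is a minimal sketch. The payload field names (`status`, `result`, `asserteeUrn`) and the helper name are assumptions for demonstration, not the integration's actual API:

```python
# Hypothetical sketch: decide whether an assertion run event should page.
# Field names below are assumptions, not the integration's real payload schema.
def should_page_for_assertion(run_event: dict) -> bool:
    """Trigger only on completed runs that report a FAILURE result."""
    if run_event.get("status") != "COMPLETE":
        return False  # ignore in-progress runs
    return run_event.get("result") == "FAILURE"

failed = {"status": "COMPLETE", "result": "FAILURE", "asserteeUrn": "urn:li:dataset:..."}
passed = {"status": "COMPLETE", "result": "SUCCESS", "asserteeUrn": "urn:li:dataset:..."}
```

Gating on a completed status first avoids paging on runs that are still executing.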
```
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│  DataHub Cloud   │     │ Actions Framework│     │    PagerDuty     │
│ (<namespace>     │────▶│ (Your Computer)  │────▶│    Incidents     │
│   .acryl.io)     │     │                  │     │                  │
└──────────────────┘     └──────────────────┘     └──────────────────┘
         │                        │                        │
  Metadata Events          Event Processing       Incident Management
  • Schema changes         • Event filtering      • Alert routing
  • Ownership changes      • Severity mapping     • Escalation
  • Tag modifications      • Deduplication        • Auto-resolution
```
- Event Monitoring: The Actions Framework polls DataHub Cloud via REST API using your Personal Access Token
- Event Processing: Events are filtered, categorized, and mapped to appropriate severity levels
- Incident Creation: Critical events trigger PagerDuty incidents with rich context and links
- Auto-Resolution: When issues are resolved in DataHub, corresponding PagerDuty incidents are automatically resolved
- Deduplication: Multiple events for the same issue are grouped into a single incident
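The steps above can be sketched as a single processing pass. Everything here (the event shape, the injected `send_incident` callable, the critical-category set) is illustrative, not the integration's real code:

```python
# Illustrative sketch of the filter -> dedupe -> notify steps.
# Event shape and names are assumptions for demonstration only.
def process_events(events, send_incident, seen_keys):
    """Filter critical events, dedupe by entity+category, and notify."""
    critical = {"TAG", "OWNER", "DEPRECATION"}
    sent = 0
    for ev in events:
        if ev.get("category") not in critical:
            continue  # event filtering: skip non-critical categories
        dedup_key = f"datahub-{ev['entityUrn']}-{ev['category']}"
        if dedup_key in seen_keys:
            continue  # deduplication: one incident per entity/category
        seen_keys.add(dedup_key)
        send_incident({
            "dedup_key": dedup_key,
            "summary": f"{ev['category']} change on {ev['entityUrn']}",
        })
        sent += 1
    return sent
```

Injecting the sender makes the filtering and deduplication logic easy to unit-test without touching the PagerDuty API.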
- DataHub Cloud account access
- Personal Access Token from DataHub Cloud
- PagerDuty account with Events API v2 service
- Python 3.8+
```bash
git clone https://github.com/martyacryl/datahubdemos.git
cd datahubdemos/datahub-pagerduty-integration

# Create virtual environment
python3 -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt
```
- Log into DataHub Cloud at https://<namespace>.acryl.io
- Navigate to Settings → Access Tokens
- Click Generate new token
- Name: "PagerDuty Integration"
- Expiration: 1 year (recommended)
- Copy the token; you'll need it for the .env file
- Log into your PagerDuty account
- Navigate to Services → Service Directory
- Click + New Service
- Service Details:
- Name: "DataHub Metadata Alerts"
- Description: "Critical metadata events from DataHub Cloud"
- Integration Settings:
- Select "Use our API directly"
- Choose "Events API v2"
- Escalation Policy: Assign to your data team's escalation policy
- Click Create Service
- Copy the Integration Key from the service details
```bash
# Copy the environment template
cp .env.example .env

# Edit with your actual tokens (use any text editor)
nano .env
```
Update .env with your tokens:
```
# Required: Your DataHub Cloud Personal Access Token
DATAHUB_GMS_TOKEN=eyJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9...

# Required: PagerDuty Integration Key (Events API v2)
PAGERDUTY_ROUTING_KEY=R012ABC34DEF5GHI67890JKL

# Optional: Environment identifier
ENVIRONMENT=production
```

```bash
# Test with debug logging to see what's happening
python -m datahub_pagerduty_integration --debug

# Or use the DataHub Actions CLI directly
datahub actions -c config/pagerduty_action.yaml --debug
```
- Check Logs: Look for successful connection messages
- Make a Test Change: Add a tag or change ownership in DataHub Cloud
- Check PagerDuty: Verify incidents are created for critical events
- Test Resolution: Fix the issue in DataHub and verify auto-resolution
The integration can be configured to monitor specific types of events. Edit config/pagerduty_action.yaml:
```yaml
# Monitor only critical events
filter:
  event_type: "EntityChangeEvent_v1"
  event:
    category: ["TAG", "OWNER", "DEPRECATION"]
```
Customize how DataHub events map to PagerDuty severity levels:
```yaml
action:
  config:
    severity_mapping:
      schema_change: "warning"   # Schema changes are warnings
      owner_change: "info"       # Ownership changes are info
      deprecation: "warning"     # Deprecations are warnings
      tag_change: "info"         # Tag changes are info
```
Configure which events should automatically resolve incidents:
```yaml
action:
  config:
    enable_auto_resolve: true
    auto_resolve_operations: ["REMOVE"]
```
Add organization-specific context to incidents:
```yaml
action:
  config:
    custom_fields:
      environment: "production"
      team: "data-engineering"
      service: "datahub"
      runbook_url: "https://wiki.company.com/data-incident-response"
```

```bash
# Run the test suite
python -m pytest tests/ -v

# Run with coverage
python -m pytest tests/ --cov=src/datahub_pagerduty_integration --cov-report=html
```

```bash
# Test DataHub Cloud connection
python -c "
import os
import requests
from dotenv import load_dotenv
load_dotenv()
token = os.getenv('DATAHUB_GMS_TOKEN')
headers = {'Authorization': f'Bearer {token}'}
response = requests.get('https://<namespace>.acryl.io/gms/health', headers=headers)
print(f'DataHub Health: {response.status_code}')
"
```
```bash
# Test PagerDuty connection
python -c "
import os
import requests
import json
from dotenv import load_dotenv
load_dotenv()
routing_key = os.getenv('PAGERDUTY_ROUTING_KEY')
payload = {
    'routing_key': routing_key,
    'event_action': 'trigger',
    'payload': {
        'summary': 'Test incident from DataHub integration',
        'source': 'DataHub Test',
        'severity': 'info'
    }
}
response = requests.post('https://events.pagerduty.com/v2/enqueue',
                         data=json.dumps(payload),
                         headers={'Content-Type': 'application/json'})
print(f'PagerDuty Test: {response.status_code} - {response.json()}')
"
```

```bash
# Run with debug logging and watch for events
python -m datahub_pagerduty_integration --debug

# In another terminal, make changes in DataHub Cloud:
# 1. Add a tag to a dataset
# 2. Change ownership
# 3. Add a deprecation notice
# 4. Update documentation
# Watch the logs for event processing and PagerDuty API calls
```
- Create Test Incident: Make a change in DataHub that should trigger an incident
- Check PagerDuty: Verify the incident appears with correct details
- Verify Context: Check that the incident includes:
- Clear summary of the issue
- Link to the DataHub entity
- Custom fields with environment/team info
- Appropriate severity level
- Test Auto-Resolution: Fix the issue in DataHub and verify the incident resolves
```bash
# Monitor memory and CPU usage
python -m datahub_pagerduty_integration --debug &
PID=$!

# Monitor resource usage
top -p $PID

# Or use more detailed monitoring
py-spy top --pid $PID
```
The main integration logic that:
- Extends DataHub's Action base class to handle metadata events
- Filters events based on criticality and type
- Maps DataHub events to PagerDuty incidents with appropriate severity
- Handles deduplication using entity URNs to prevent spam
- Manages auto-resolution when issues are fixed
- Formats rich incident details with links and context
Key methods:
- act(): Processes each DataHub event
- _should_trigger_incident(): Determines if an event warrants an incident
- _send_trigger_event(): Creates PagerDuty incidents
- _send_resolve_event(): Resolves incidents automatically
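The severity lookup behind a method like _get_severity() can be pictured as a small mapping with a safe default. The mapping values mirror the sample severity_mapping config in this README; the function itself is a hypothetical sketch, not the module's real code:

```python
# Hypothetical sketch of a severity lookup with a safe default.
# Keys mirror the sample severity_mapping config; names are illustrative.
SEVERITY_MAPPING = {
    "schema_change": "warning",
    "owner_change": "info",
    "deprecation": "warning",
    "tag_change": "info",
}

def get_severity(event_kind: str, default: str = "info") -> str:
    """Map an internal event kind to a PagerDuty severity, defaulting to info."""
    return SEVERITY_MAPPING.get(event_kind, default)
```

Falling back to a default means an unrecognized event kind still produces a valid PagerDuty severity instead of a rejected payload.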
DataHub Actions configuration that:
- Defines event sources (DataHub Cloud REST API)
- Configures polling intervals for checking new events
- Sets up event filtering to focus on critical events only
- Maps action parameters like routing keys and severity levels
```python
# Simplified event processing logic
def act(self, event: EventEnvelope) -> None:
    """Process a DataHub event and potentially send to PagerDuty"""
    # 1. Extract event metadata
    event_type = event.event_type
    entity_urn = event.event.get("entityUrn")
    category = event.event.get("category")

    # 2. Determine criticality
    if self._should_trigger_incident(event.event):
        # 3. Create rich incident context
        incident_data = {
            "summary": self._generate_summary(event),
            "source": "DataHub Cloud",
            "severity": self._get_severity(event),
            "component": self._extract_component(entity_urn),
            "custom_details": {
                "entity_url": f"https://<namespace>.acryl.io/dataset/{entity_urn}",
                "category": category,
                "timestamp": datetime.utcnow().isoformat()
            }
        }
        # 4. Send to PagerDuty
        self._send_to_pagerduty(incident_data)
```
The integration uses intelligent deduplication to prevent alert spam:
```python
# Generate consistent deduplication key
dedup_key = f"datahub-{entity_urn}-{category}"
```
This ensures:
- Multiple events for the same entity/category are grouped
- Incident updates rather than new incidents for ongoing issues
- Clean resolution when the underlying issue is fixed
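To see the grouping effect concretely, note that repeated events for the same entity and category collapse to a single key (a toy demonstration, not the integration's code):

```python
# Toy demonstration: repeated events for one entity/category share a dedup key.
def dedup_key(entity_urn: str, category: str) -> str:
    return f"datahub-{entity_urn}-{category}"

keys = {
    dedup_key("urn:li:dataset:orders", "TAG"),
    dedup_key("urn:li:dataset:orders", "TAG"),    # duplicate -> same key
    dedup_key("urn:li:dataset:orders", "OWNER"),  # new category -> new key
}
```

Because PagerDuty groups events by dedup_key, the two TAG events above would land in one incident while the OWNER event opens a second.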
The integration includes robust error handling:
- Retry logic for transient API failures
- Circuit breaker patterns for PagerDuty API rate limiting
- Dead letter queue for events that can't be processed
- Graceful degradation when services are unavailable
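Retry logic of this kind usually looks something like the following sketch; the delays, attempt count, exception type, and function names are illustrative assumptions rather than the integration's implementation:

```python
import time

# Illustrative retry-with-backoff helper; parameters are assumptions.
def send_with_retries(send, payload, max_attempts=3, base_delay=0.0):
    """Retry a transiently failing send with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return send(payload)
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of retries: surface the error
            time.sleep(base_delay * (2 ** attempt))  # exponential backoff
```

Doubling the delay each attempt gives a struggling API room to recover instead of hammering it at a fixed interval.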
For local development and testing:
```bash
# Run interactively with debug logging
python -m datahub_pagerduty_integration --debug

# Run in background
nohup python -m datahub_pagerduty_integration > logs/integration.log 2>&1 &
```
For 24/7 monitoring, deploy on a server:
```bash
# Copy to server
scp -r datahub-pagerduty-integration user@server:/opt/

# Install as systemd service
sudo cp scripts/datahub-pagerduty.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable datahub-pagerduty
sudo systemctl start datahub-pagerduty

# Check status
sudo systemctl status datahub-pagerduty
sudo journalctl -u datahub-pagerduty -f
```

```bash
# Build image
docker build -t datahub-pagerduty .

# Run container
docker run -d \
  --name datahub-pagerduty \
  --env-file .env \
  --restart unless-stopped \
  -v $(pwd)/logs:/app/logs \
  datahub-pagerduty

# Check logs
docker logs -f datahub-pagerduty
```
The integration can be deployed on any cloud platform:
- AWS: EC2 instance, ECS container, or Lambda function
- GCP: Compute Engine, Cloud Run, or Cloud Functions
- Azure: Virtual Machine, Container Instances, or Functions
For serverless deployments, consider using cron-triggered functions that run the integration periodically.
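A cron-triggered function typically keeps a checkpoint so each run only processes new events. The sketch below assumes an injected fetch function, a millisecond timestamp field, and an externally stored checkpoint; all of these names are hypothetical:

```python
# Hypothetical cron-run handler: process only events newer than the checkpoint.
# fetch_since, handle_event, and the timestampMs field are assumptions.
def run_once(fetch_since, handle_event, checkpoint_ms: int) -> int:
    """Fetch events after checkpoint_ms, handle them in order,
    and return the new checkpoint to persist for the next run."""
    events = fetch_since(checkpoint_ms)
    for ev in sorted(events, key=lambda e: e["timestampMs"]):
        handle_event(ev)
    if events:
        checkpoint_ms = max(e["timestampMs"] for e in events)
    return checkpoint_ms
```

Persisting the returned checkpoint (in a parameter store, table, or object bucket) is what makes periodic serverless runs safe against double-processing.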
```bash
# View real-time logs
tail -f logs/datahub_pagerduty.log

# Search for errors
grep -i error logs/datahub_pagerduty.log

# Monitor API calls
grep -i "pagerduty\|datahub" logs/datahub_pagerduty.log
```

```bash
# Check if the integration is running
ps aux | grep datahub_pagerduty

# Check DataHub Cloud connectivity
curl -H "Authorization: Bearer $DATAHUB_GMS_TOKEN" \
  https://<namespace>.acryl.io/gms/health

# Check PagerDuty Events API
curl -X POST https://events.pagerduty.com/v2/enqueue \
  -H "Content-Type: application/json" \
  -d '{"routing_key":"test","event_action":"trigger","payload":{"summary":"health check","source":"health-check","severity":"info"}}'
```
Consider implementing these metrics:
- Events processed per hour
- PagerDuty API success rate
- DataHub API response times
- Integration uptime
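A minimal in-process counter is often enough to start tracking these; the class below is an illustrative sketch, not part of the integration:

```python
from collections import Counter

# Illustrative in-process metrics: success/failure counts per API.
class IntegrationMetrics:
    def __init__(self):
        self.counts = Counter()

    def record(self, name: str, ok: bool) -> None:
        """Record one API call outcome, e.g. record('pagerduty', True)."""
        self.counts[f"{name}_{'success' if ok else 'failure'}"] += 1

    def success_rate(self, name: str) -> float:
        """Fraction of successful calls; 1.0 when nothing recorded yet."""
        ok = self.counts[f"{name}_success"]
        bad = self.counts[f"{name}_failure"]
        total = ok + bad
        return ok / total if total else 1.0
```

From here you could periodically log the counters or export them to a metrics backend such as Prometheus.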
```
ERROR: Failed to authenticate with DataHub Cloud
```
Solutions:
- Verify your Personal Access Token is correct
- Check token expiration date in DataHub Cloud
- Ensure token has required permissions
- Regenerate token if necessary
```
ERROR: Failed to send event to PagerDuty: 400 Client Error
```
Solutions:
- Verify PagerDuty routing key is correct
- Check PagerDuty service configuration
- Ensure Events API v2 is enabled
- Test API manually with curl
```
INFO: No events received in the last 5 minutes
```
Solutions:
- Check DataHub Cloud connectivity
- Verify event filtering rules aren't too restrictive
- Enable debug logging to see event flow
- Make test changes in DataHub Cloud
```
WARNING: High memory usage detected
```
Solutions:
- Restart the integration service
- Check for memory leaks in logs
- Adjust polling intervals
- Monitor system resources
Enable detailed logging to troubleshoot issues:
```bash
# Maximum verbosity
python -m datahub_pagerduty_integration --debug

# Log API requests and responses
export DATAHUB_DEBUG=true
export REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
```
- Check Logs: Always start with the application logs
- Test Connectivity: Verify both DataHub and PagerDuty APIs work
- Review Configuration: Double-check tokens and settings
- GitHub Issues: Report bugs or request features
- DataHub Community: Ask questions in DataHub Slack
- PagerDuty Support: For PagerDuty-specific issues
We welcome contributions! Here's how to get started:
```bash
# Fork and clone the repository
git clone https://github.com/yourusername/datahubdemos.git
cd datahubdemos/datahub-pagerduty-integration

# Create development environment
python -m venv dev-env
source dev-env/bin/activate
pip install -r requirements.txt
pip install -r requirements-dev.txt

# Install pre-commit hooks
pre-commit install
```

```bash
# Run all tests
python -m pytest tests/ -v

# Run with coverage
python -m pytest tests/ --cov=src --cov-report=html

# Run specific test categories
python -m pytest tests/test_pagerduty_action.py -v
python -m pytest tests/integration/ -v
```

```bash
# Format code
black src/ tests/

# Lint code
flake8 src/ tests/

# Type checking
mypy src/
```
- Create a feature branch: `git checkout -b feature/new-feature`
- Make changes and add tests
- Run the test suite and ensure all tests pass
- Submit a pull request with a clear description
- DataHub Actions Framework
- DataHub Cloud Events API
- PagerDuty Events API v2
- DataHub Cloud Documentation
This project is licensed under the MIT License - see the LICENSE file for details.
- DataHub Team for building the Actions Framework
- PagerDuty for their robust Events API
- Open Source Community for inspiration and best practices
Made with ❤️ by the Data Engineering Team
For questions or support, please open an issue or contact the maintainers.