A world-class data pipeline for downloading and storing historical and real-time energy market data from major US ISOs (Independent System Operators).
- Multi-ISO Support: ERCOT, CAISO, ISO-NE, NYISO (with extensible architecture for more)
- Standardized Schema: Consistent data model across all ISOs using interval-based timestamps
- Real-Time Updates: Smart polling system with 5-minute interval updates
- Historical Backfill: Download data back to January 1, 2019
- TimescaleDB Ready: Optimized for time-series data with compression support
- High-Performance Rust Processor: Process millions of records in seconds
- Data Types Supported:
  - Locational Marginal Prices (LMP) - Day-ahead and real-time
  - Settlement Point Prices (ERCOT)
  - Ancillary Services pricing
  - Load data and forecasts
  - Generation by fuel type (planned)
  - Transmission constraints (planned)
  - Weather data (planned)
# Clone and install
git clone https://github.com/battalion-energy/power_market_pipeline.git
cd power_market_pipeline
pip install uv
uv sync
# Configure
cp .env.example .env
# Edit .env with your database URL and ISO credentials
# Initialize database
createdb power_market
uv run pmp init
# Download recent data
uv run pmp download --iso ERCOT --days 3
# Start real-time updates
uv run pmp realtime --iso ERCOT

Install dependencies with uv:
pip install uv
uv sync
Or install the package in editable mode:
pip install -e .
- Copy .env.example to .env:
cp .env.example .env
- Edit .env with your configuration:
# Database (required)
DATABASE_URL=postgresql://localhost:5432/power_market
# ERCOT (optional - needed for WebService API after Dec 2023)
ERCOT_USERNAME=your_username
ERCOT_PASSWORD=your_password
ERCOT_SUBSCRIPTION_KEY=your_key
# Other ISOs (optional)
CAISO_USERNAME=your_username
CAISO_PASSWORD=your_password
- Create PostgreSQL database:
createdb power_market
- Initialize schema:
uv run pmp init
This creates all tables and seeds initial data (ISOs, dataset categories).
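As a quick sanity check that DATABASE_URL is picked up correctly, a minimal SQLAlchemy snippet along these lines can be used (a sketch only; it assumes the default connection string shown above and that SQLAlchemy is installed):

# Minimal connectivity check -- assumes DATABASE_URL from .env.
import os
from sqlalchemy import create_engine, text

engine = create_engine(os.environ.get("DATABASE_URL", "postgresql://localhost:5432/power_market"))
with engine.connect() as conn:
    # Any trivial query works; this just confirms the database is reachable.
    print(conn.execute(text("SELECT 1")).scalar())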
The pmp (Power Market Pipeline) CLI provides all functionality:
# Show help
uv run pmp --help
# Initialize database
uv run pmp init
# Download data
uv run pmp download --iso ERCOT --days 7
uv run pmp download --iso CAISO --start 2024-01-01 --end 2024-01-31
# Start real-time updates
uv run pmp realtime --iso ERCOT
uv run pmp realtime --iso ERCOT --iso CAISO --data-types lmp,load
# Run historical backfill
uv run pmp backfill --iso ERCOT --start 2019-01-01
# View data catalog
uv run pmp catalog
uv run pmp catalog --iso ERCOT

For processing large ERCOT datasets, use the Rust processor:
cd ercot_data_processor
# Build the processor
cargo build --release
# Extract all CSV files from ERCOT ZIP archives
cargo run --release -- --extract-all-ercot /path/to/ERCOT_data
# Process extracted data into annual Parquet files
SKIP_CSV=1 cargo run --release -- --process-annual
# Other commands
cargo run --release -- --dam # Process DAM data
cargo run --release -- --ancillary # Process ancillary services
cargo run --release -- --lmp # Process LMP data
cargo run --release -- --bess # Analyze BESS resources

Features:
- Processes millions of records in seconds
- Automatic schema evolution handling (e.g., 2011 DSTFlag addition)
- Forces all price columns to Float64 to prevent type mismatches
- Outputs compressed Parquet files (95%+ compression ratio)
- Handles nested ZIP extraction efficiently
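To illustrate consuming the processor's output, here is a short sketch using Polars to load one of the annual Parquet files. The file name is illustrative only; actual names under annual_output/ depend on the dataset and year being processed:

# Read one annual Parquet file produced by the Rust processor.
# The file name below is hypothetical; substitute a real file from annual_output/.
import polars as pl

df = pl.read_parquet("ercot_data_processor/annual_output/dam_prices_2023.parquet")
print(df.shape)    # (rows, columns)
print(df.schema)   # price columns are Float64 per the processor's normalization
print(df.head())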
The real-time updater runs continuously, fetching new data at 5-minute intervals:
# Update single ISO
uv run pmp realtime --iso ERCOT
# Update multiple ISOs
uv run pmp realtime --iso ERCOT --iso CAISO
# Use specialized ERCOT updater (polls every 5 seconds)
uv run pmp realtime --ercot-only

Features:
- Triggers exactly at 5-minute marks (00:00, 00:05, 00:10, etc.)
- Polls aggressively for new data when triggered
- Handles connection failures gracefully
- Comprehensive logging for monitoring
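For illustration, the 5-minute boundary trigger can be computed along these lines (a sketch only, not the actual realtime_updater.py implementation):

# Sketch: sleep until the next 5-minute mark (00:00, 00:05, ...), then poll.
import asyncio
from datetime import datetime, timedelta, timezone

def seconds_until_next_mark(interval_minutes: int = 5) -> float:
    now = datetime.now(timezone.utc)
    # Round the current time down to the previous mark, then add one interval.
    floored = now.replace(second=0, microsecond=0)
    floored -= timedelta(minutes=floored.minute % interval_minutes)
    return (floored + timedelta(minutes=interval_minutes) - now).total_seconds()

async def run_forever(poll):
    while True:
        await asyncio.sleep(seconds_until_next_mark())
        await poll()  # poll aggressively for new data at each mark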
Download historical data in chunks:
# Last 30 days
uv run pmp download --iso ERCOT --days 30
# Specific date range
uv run pmp download --iso ERCOT --start 2024-01-01 --end 2024-03-31
# Full historical (since Jan 1, 2019)
uv run pmp backfill --iso ERCOT --start 2019-01-01

Note: Full historical backfill requires:
- Valid API credentials for each ISO
- Significant storage space (~1GB compressed for all ISOs)
- Several hours to complete
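Conceptually, the backfill splits the requested range into smaller windows and downloads each in turn. A simplified sketch of that chunking (the real download call and chunk size are internal details of the pipeline):

# Sketch: iterate over a long date range in fixed-size chunks (e.g., 30 days).
from datetime import date, timedelta

def date_chunks(start: date, end: date, days: int = 30):
    current = start
    while current < end:
        chunk_end = min(current + timedelta(days=days), end)
        yield current, chunk_end
        current = chunk_end

for chunk_start, chunk_end in date_chunks(date(2019, 1, 1), date(2024, 1, 1)):
    print(f"would download {chunk_start} .. {chunk_end}")  # replace with the actual download call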
LMP (Locational Marginal Prices)
CREATE TABLE lmp (
interval_start TIMESTAMPTZ NOT NULL,
interval_end TIMESTAMPTZ NOT NULL,
iso VARCHAR(10) NOT NULL,
location VARCHAR(100) NOT NULL,
location_type VARCHAR(50),
market VARCHAR(10) NOT NULL,
lmp DECIMAL(10, 2),
energy DECIMAL(10, 2),
congestion DECIMAL(10, 2),
loss DECIMAL(10, 2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Ancillary Services
CREATE TABLE ancillary_services (
interval_start TIMESTAMPTZ NOT NULL,
interval_end TIMESTAMPTZ NOT NULL,
iso VARCHAR(10) NOT NULL,
region VARCHAR(100) NOT NULL,
market VARCHAR(10) NOT NULL,
product VARCHAR(50) NOT NULL,
clearing_price DECIMAL(10, 2),
clearing_quantity DECIMAL(10, 2),
requirement DECIMAL(10, 2)
);
Market types:
- DAM: Day-Ahead Market (hourly)
- RT5M: Real-Time 5-minute
- RT15M: Real-Time 15-minute
- HASP: Hour-Ahead Scheduling Process (CAISO)

Location types:
- hub: Trading hubs
- zone: Load zones
- node: Individual nodes/buses
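A minimal example of querying the lmp table defined above (assuming psycopg2 is installed and DATABASE_URL is set as in .env):

# Sketch: fetch recent real-time LMPs for ERCOT hubs from the lmp table.
import os
import psycopg2

conn = psycopg2.connect(os.environ["DATABASE_URL"])
with conn, conn.cursor() as cur:
    cur.execute(
        """
        SELECT interval_start, location, lmp, congestion
        FROM lmp
        WHERE iso = %s AND market = %s AND location_type = %s
        ORDER BY interval_start DESC
        LIMIT 10
        """,
        ("ERCOT", "RT5M", "hub"),
    )
    for row in cur.fetchall():
        print(row)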
# Test basic functionality
uv run python test_mock_data.py
# Test historical periods (3 days, 1 month, historical)
uv run python test_with_mock_historical.py
# Test real-time updater
uv run python test_realtime_updater.py

The mock tests demonstrate:
- Database connectivity and schema
- Bulk insert performance (~5,000 records/second)
- Data quality validation
- Real-time update scheduling
- Historical data volume estimates
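The bulk-insert rate quoted above comes from batched inserts. A hedged sketch of the general approach using psycopg2's execute_values (the mock test scripts may implement this differently):

# Sketch: batched insert into the lmp table using psycopg2.extras.execute_values.
import os
import psycopg2
from psycopg2.extras import execute_values

rows = [
    # (interval_start, interval_end, iso, location, location_type, market, lmp)
    ("2024-01-01 00:00+00", "2024-01-01 00:05+00", "ERCOT", "HB_NORTH", "hub", "RT5M", 25.41),
]

conn = psycopg2.connect(os.environ["DATABASE_URL"])
with conn, conn.cursor() as cur:
    execute_values(
        cur,
        "INSERT INTO lmp (interval_start, interval_end, iso, location, location_type, market, lmp) VALUES %s",
        rows,
        page_size=1000,  # batching, not row-by-row inserts, gives the throughput
    )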
For detailed information about the database design, schema, and management practices, see Database Design Documentation.
For deployment instructions on AWS EC2, see Deployment Guide.
power_market_pipeline/
├── downloaders/ # ISO-specific downloaders
│ ├── base_v2.py # Base class defining interface
│ ├── ercot/
│ │ ├── downloader_v2.py # ERCOT implementation
│ │ ├── selenium_client.py
│ │ └── webservice_client.py
│ ├── caiso/
│ │ └── downloader_v2.py # CAISO implementation
│ ├── isone/
│ │ └── downloader_v2.py # ISO-NE implementation
│ └── nyiso/
│ └── downloader_v2.py # NYISO implementation
├── database/
│ ├── docs/ # Database documentation
│ │ └── DATABASE_DESIGN.md # Schema design philosophy
│ ├── migrations/ # Versioned schema changes
│ ├── seeds/ # Reference data
│ ├── scripts/ # Database management scripts
│ ├── utils/ # Migration and seed utilities
│ ├── models_v2.py # SQLAlchemy models
│ ├── connection.py # Database connection
│ └── schema_v2.sql # SQL schema definition
├── services/
│ ├── data_fetcher.py # Orchestrates downloads
│ ├── realtime_updater.py # Real-time scheduler
│ └── dataset_registry.py # Dataset metadata
├── processors/ # Data transformation (Python)
├── ercot_data_processor/ # High-performance Rust processor
│ ├── src/
│ │ ├── main.rs # CLI entry point
│ │ ├── annual_processor.rs # Annual data aggregation
│ │ ├── csv_extractor.rs # ZIP/CSV extraction
│ │ └── ... # Other processors
│ └── annual_output/ # Processed Parquet files
└── power_market_pipeline/
└── cli.py # Command-line interface
- Create downloader in downloaders/<iso>/downloader_v2.py
- Inherit from BaseDownloaderV2
- Implement required methods (see the sketch after this list):
  async def download_lmp(self, market, start_date, end_date, locations=None)
  async def download_ancillary_services(self, product, market, start_date, end_date)
  async def download_load(self, forecast_type, start_date, end_date)
  async def get_available_locations(self)
- Add ISO to seed data
- Update tests
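A hypothetical skeleton for the first three steps above (the ISO name and module path are illustrative; the actual interface is defined in downloaders/base_v2.py):

# Sketch: skeleton for downloaders/miso/downloader_v2.py (MISO is illustrative).
from downloaders.base_v2 import BaseDownloaderV2

class MISODownloader(BaseDownloaderV2):
    async def download_lmp(self, market, start_date, end_date, locations=None):
        # Fetch LMP data from the ISO's API and map it to the standardized schema.
        raise NotImplementedError

    async def download_ancillary_services(self, product, market, start_date, end_date):
        raise NotImplementedError

    async def download_load(self, forecast_type, start_date, end_date):
        raise NotImplementedError

    async def get_available_locations(self):
        # Return the hubs/zones/nodes this ISO exposes.
        raise NotImplementedError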
- Create table in database/models_v2.py (see the model sketch after this list)
- Add to SQL schema
- Create download method in base class
- Implement in each ISO downloader
- Update the DataFetcher service
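For the first step, a hedged sketch of what a new table might look like in database/models_v2.py (the class, column names, and Base are illustrative; follow the conventions of the existing models):

# Sketch: hypothetical SQLAlchemy model for a new data type (generation by fuel type).
from sqlalchemy import Column, DateTime, Numeric, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # in practice, reuse the project's existing Base

class FuelMix(Base):
    __tablename__ = "fuel_mix"

    interval_start = Column(DateTime(timezone=True), primary_key=True)
    interval_end = Column(DateTime(timezone=True), nullable=False)
    iso = Column(String(10), primary_key=True)
    fuel_type = Column(String(50), primary_key=True)
    generation_mw = Column(Numeric(12, 2))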
# Format code
uv run ruff format
# Lint
uv run ruff check
# Type check
uv run mypy .

Create /etc/systemd/system/power-market-realtime.service:
[Unit]
Description=Power Market Pipeline Real-time Updater
After=network.target postgresql.service
[Service]
Type=simple
User=pmp
WorkingDirectory=/opt/power_market_pipeline
Environment="PATH=/opt/power_market_pipeline/.venv/bin"
ExecStart=/opt/power_market_pipeline/.venv/bin/python -m power_market_pipeline.cli realtime
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target

Or run with Docker Compose:
docker-compose up -d

For AWS EC2 deployment:
- Launch a t3.large instance or larger
- Install PostgreSQL 14+ with TimescaleDB
- Clone repository
- Configure environment
- Set up systemd service
- Configure CloudWatch/Dash0 monitoring
Configure in .env:
OTEL_EXPORTER_OTLP_ENDPOINT=https://your-endpoint
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer your-token

Key metrics to monitor:
- Download success/failure rates
- Records processed per second
- Data quality metrics
- API response times
- Database performance
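For reference, once the OTLP exporter is configured via the environment variables above, a counter for download success/failure can be recorded roughly like this (a sketch; the pipeline's actual metric names may differ):

# Sketch: record a download outcome with the OpenTelemetry metrics API.
# Assumes an exporter/MeterProvider is configured (e.g., via the OTEL_* variables above).
from opentelemetry import metrics

meter = metrics.get_meter("power_market_pipeline")
download_counter = meter.create_counter(
    "pmp.downloads", description="Count of download attempts by ISO and outcome"
)

download_counter.add(1, {"iso": "ERCOT", "status": "success"})
download_counter.add(1, {"iso": "CAISO", "status": "failure"})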
- Import errors: Ensure you're using uv run or have activated the virtual environment
- Database connection: Check DATABASE_URL in .env
- No data downloading: Most ISOs require API credentials
- Selenium errors: Install Chrome/ChromeDriver for ERCOT historical data
# Enable SQL echo
export SQL_ECHO=true
# Run with debug logging
uv run pmp download --iso ERCOT --days 1 --debug

- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make changes with tests
- Run linting (uv run ruff check)
- Commit (git commit -m 'Add amazing feature')
- Push (git push origin feature/amazing-feature)
- Open a Pull Request
Proprietary - Battalion Energy
- GitHub Issues: https://github.com/battalion-energy/power_market_pipeline/issues
- Documentation: See the /docs folder
- Email: [contact email]