You are tasked with building a robust ETL pipeline to process tick-level market data. The data arrives in a semi-structured format (JSON Lines), with a schema that may evolve over time (e.g., new fields being added). The pipeline must be resilient to malformed or incomplete records, support data validation, and provide data quality insights.
The exchange provides daily JSONL files (available in the data folder) containing two types of events:
- Trades: Actual transactions with price and size (and sometimes additional metadata like exchange or condition).
- Quotes: Bid and ask prices available at a given time, possibly from different exchanges.
Each line in the file represents a single event and may contain nested structures. Due to upstream inconsistencies, some lines may have schema errors, invalid timestamps, or nonsensical values (e.g., negative prices).
Your job is to build a Python-based ETL pipeline that processes these files in order and loads the cleaned data into a SQLite database.
{"event_type": "trade", "symbol": "AAPL", "timestamp": "2025-03-25T09:30:01.123456", "trade": {"price": 179.55, "size": 100}}
{"event_type": "quote", "symbol": "AAPL", "timestamp": "2025-03-25T09:30:01.123789", "quote": {"bid": 179.5, "ask": 179.6}}
{"event_type": "trade", "symbol": "AAPL", "timestamp": "2025-03-25T09:30:02.001234", "trade": {"price": 179.60, "size": 50, "exchange": "NASDAQ", "condition": "REGULAR"}}
{"event_type": "quote", "symbol": "AAPL", "timestamp": "2025-03-25T09:30:02.003245", "quote": {"bid": 179.55, "ask": 179.65, "exchange": "NASDAQ"}}
{"event_type": "trade", "symbol": "MSFT", "timestamp": "BAD_TIMESTAMP", "trade": {"price": 299.80, "size": 50}}
{"event_type": "quote", "symbol": "MSFT", "timestamp": "2025-03-25T09:30:03.000001", "quote": {"bid": -299.70, "ask": 299.90}}Build a Python ETL pipeline with the following functionality:
- Extract (see sketch below)
  - Parse the JSONL input file line by line.
  - Detect and separate trade and quote events.
  - Process files sequentially, one per day.
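A minimal sketch of the extract step under the assumptions above (JSONL files in a `data` folder, processed in filename order); the generator shape and the name `extract_events` are illustrative, not required:

```python
import json
from pathlib import Path
from typing import Iterator


def extract_events(path: Path) -> Iterator[tuple[int, dict]]:
    """Yield (line_number, event) pairs from one daily JSONL file.

    Lines that are not valid JSON are reported here; in the full
    pipeline they would be routed to the bad-record handler.
    """
    with path.open("r", encoding="utf-8") as fh:
        for line_no, line in enumerate(fh, start=1):
            line = line.strip()
            if not line:
                continue  # skip blank lines
            try:
                yield line_no, json.loads(line)
            except json.JSONDecodeError as exc:
                print(f"{path.name}:{line_no}: unparseable line: {exc}")


# Process daily files sequentially, one per day, in name order.
for path in sorted(Path("data").glob("*.jsonl")):
    for line_no, event in extract_events(path):
        if event.get("event_type") == "trade":
            ...  # route to the trade transform
        elif event.get("event_type") == "quote":
            ...  # route to the quote transform
```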
- Transform (see sketch below)
  - Normalize nested fields into flat structures.
  - Include basic data validation.
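One way to flatten and validate a trade event, using the field names from the sample above; `flatten_trade` and the exact checks are assumptions, and quote events would get an analogous function (e.g. rejecting a negative bid):

```python
from datetime import datetime


def flatten_trade(event: dict) -> dict:
    """Flatten a nested trade event into a flat row, raising ValueError
    (or KeyError for missing required fields) when validation fails."""
    trade = event.get("trade")
    if not isinstance(trade, dict):
        raise ValueError("missing or malformed 'trade' object")

    # Invalid timestamps such as "BAD_TIMESTAMP" raise ValueError here.
    ts = datetime.fromisoformat(event["timestamp"])

    price, size = trade.get("price"), trade.get("size")
    if not isinstance(price, (int, float)) or price <= 0:
        raise ValueError(f"non-positive or missing price: {price!r}")
    if not isinstance(size, (int, float)) or size <= 0:
        raise ValueError(f"non-positive or missing size: {size!r}")

    return {
        "symbol": event["symbol"],
        "timestamp": ts.isoformat(),
        "price": float(price),
        "size": int(size),
        # Optional metadata normalizes to None when absent, so the
        # row shape stays stable as the upstream schema evolves.
        "exchange": trade.get("exchange"),
        "condition": trade.get("condition"),
    }
```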
- Handle Bad Records (see sketch below)
  - Capture any row that fails parsing or validation.
  - Log or print bad records with helpful context (source file, line number, and failure reason).
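A simple bad-record sink built on the standard `logging` module; the `BadRecordLog` name and its `reject` method are hypothetical, shown only to illustrate capturing the file, line number, raw text, and reason for each rejection:

```python
import logging
from dataclasses import dataclass, field

logging.basicConfig(level=logging.WARNING, format="%(levelname)s %(message)s")
log = logging.getLogger("etl")


@dataclass
class BadRecordLog:
    """Collects rejected rows so they can be reviewed or replayed later."""
    records: list[dict] = field(default_factory=list)

    def reject(self, source: str, line_no: int, raw: str, reason: str) -> None:
        self.records.append(
            {"source": source, "line": line_no, "raw": raw, "reason": reason}
        )
        log.warning("rejected %s:%d (%s): %s", source, line_no, reason, raw[:120])


# Usage inside the pipeline loop, assuming flatten_trade from above:
#     try:
#         row = flatten_trade(event)
#     except (ValueError, KeyError) as exc:
#         bad_log.reject(path.name, line_no, line, str(exc))
```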
- Load (see sketch below)
  - Insert valid rows across all files into normalized tables:
    - trades
    - quotes
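A possible normalized schema and loader built on the standard `sqlite3` module; the column names mirror the sample fields, but the schema itself is an assumption:

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS trades (
    id        INTEGER PRIMARY KEY,
    symbol    TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    price     REAL NOT NULL,
    size      INTEGER NOT NULL,
    exchange  TEXT,
    condition TEXT
);
CREATE TABLE IF NOT EXISTS quotes (
    id        INTEGER PRIMARY KEY,
    symbol    TEXT NOT NULL,
    timestamp TEXT NOT NULL,
    bid       REAL NOT NULL,
    ask       REAL NOT NULL,
    exchange  TEXT
);
"""


def load_rows(db_path: str, trades: list[dict], quotes: list[dict]) -> None:
    """Insert validated, flattened rows in a single transaction."""
    with sqlite3.connect(db_path) as conn:
        conn.executescript(SCHEMA)
        conn.executemany(
            "INSERT INTO trades (symbol, timestamp, price, size, exchange, condition)"
            " VALUES (:symbol, :timestamp, :price, :size, :exchange, :condition)",
            trades,
        )
        conn.executemany(
            "INSERT INTO quotes (symbol, timestamp, bid, ask, exchange)"
            " VALUES (:symbol, :timestamp, :bid, :ask, :exchange)",
            quotes,
        )
```

Storing timestamps as ISO-8601 text keeps them sortable with plain string comparison, which the reporting query below relies on.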
- Reporting (see sketch below)
  - For each trade, use the most recent bid and ask to compute a prevailing mid quote.
  - Estimate transaction cost as:
    - (traded price / mid price - 1) for longs
    - (1 - traded price / mid price) for shorts
  - Produce a report summarizing transaction cost by instrument and by exchange.
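One way to express the report in SQL against the schema above; note the input events carry no trade side, so this sketch applies the long formula to every trade, a simplification you would need to resolve (e.g. by inferring side from the prevailing quote, or reporting both conventions):

```python
import sqlite3

REPORT_SQL = """
WITH trade_mid AS (
    SELECT t.symbol AS symbol,
           COALESCE(t.exchange, 'UNKNOWN') AS exchange,
           t.price AS price,
           -- Prevailing quote: most recent quote at or before the trade.
           (SELECT (q.bid + q.ask) / 2.0
              FROM quotes q
             WHERE q.symbol = t.symbol
               AND q.timestamp <= t.timestamp
             ORDER BY q.timestamp DESC
             LIMIT 1) AS mid
    FROM trades t
)
SELECT symbol,
       exchange,
       COUNT(*) AS n_trades,
       AVG(price / mid - 1.0) AS avg_cost  -- long-side cost; shorts flip the sign
FROM trade_mid
WHERE mid IS NOT NULL  -- trades with no prior quote are excluded
GROUP BY symbol, exchange
ORDER BY symbol, exchange
"""


def print_report(db_path: str) -> None:
    """Summarize average transaction cost by instrument and by exchange."""
    with sqlite3.connect(db_path) as conn:
        for symbol, exchange, n, cost in conn.execute(REPORT_SQL):
            print(f"{symbol:<6} {exchange:<10} trades={n:<5} avg_cost={cost:.6f}")
```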
Your solution will be evaluated on:

- Code readability and modularity.
- Use of standard Python libraries and best practices.
- Robust error handling and logging.
- Data modeling and normalization.
- Clarity and completeness of the generated reports.