
TradePulse


A production-grade two-stream real-time data pipeline ingesting live equity trades and news headlines, processing 15,000+ events/second with sub-100ms end-to-end latency, exactly-once semantics, and ML-powered anomaly detection.


🌐 Demo

  • Website: tradepulse.nikhilgiridharan.com
  • Medium: https://medium.com/@nikhilgiridharan/building-tradepulse-a-production-grade-real-time-market-data-pipeline-2dbf5be6dc10
  • Video: https://www.youtube.com/watch?v=BupQapP_J0k

Architecture

```
╔═══════════════════════════════════════════════════════════════════╗
║  STREAM 1 — EQUITY DATA (10,000+ events/sec)                      ║
║                                                                   ║
║  Polygon.io ──▶ Kafka ──▶ Validator ──▶ Faust ──▶ DynamoDB        ║
║  WebSocket      market     Pydantic      Stream    Hot store      ║
║  Live trades    .trades     + DLQ        engine    48hr TTL       ║
║                               │            │          │           ║
║                               ▼            ▼          ▼           ║
║                           SQS DLQ    Anomaly ML   S3 Parquet      ║
║                                      Feat Store   Cold store      ║
║                                           │                       ║
║  STREAM 2 — NEWS DATA      ◄──────────────┘                       ║
║                             60s temporal join                     ║
║  Finnhub ──▶ Kafka ──▶ VADER ──▶ Correlator ──▶ DynamoDB          ║
║  REST poll   market    Sentiment  News+market   market_sentiment  ║
║  60s poll    .news     <1ms       signal join                     ║
║                                                                   ║
║  FastAPI ──▶ Custom Dashboard ──▶ tradepulse.nikhilgiridharan.com ║
╚═══════════════════════════════════════════════════════════════════╝
```

Key Engineering Decisions

1. Exactly-Once Semantics — Three Independent Layers

Financial data has zero tolerance for duplicates. A replayed trade processed twice corrupts every downstream aggregation. TradePulse enforces exactly-once at three layers simultaneously:

Layer 1 β€” Kafka producer:    enable.idempotence=True
                             Kafka deduplicates via sequence numbers

Layer 2 β€” Faust processing:  processing_guarantee='exactly_once'
                             Process + offset commit are atomic

Layer 3 β€” DynamoDB write:    ConditionExpression: attribute_not_exists
                             Rejects duplicate items on replay

If Faust crashes after writing to DynamoDB but before committing the Kafka offset, the event replays on restart — but the conditional write silently rejects the duplicate, preserving exactly-once delivery end to end.
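The three layers above can be sketched as plain config and request builders. This is a minimal illustration; the topic, table, and attribute names are placeholders, not necessarily the ones the repo uses:

```python
# Sketch of the three exactly-once layers as config/request builders.
# Table and key names here are illustrative, not the repo's actual ones.

def kafka_producer_config() -> dict:
    # Layer 1: idempotent producer. The broker dedupes retried sends
    # using producer IDs and per-partition sequence numbers.
    return {
        "bootstrap.servers": "localhost:9092",
        "enable.idempotence": True,  # implies acks=all
        "acks": "all",
    }

def faust_app_settings() -> dict:
    # Layer 2: Faust transactional processing. Event processing and
    # offset commits are atomic, so a crash never double-counts.
    return {"processing_guarantee": "exactly_once"}

def dynamo_put_request(table: str, trade_id: str, payload: dict) -> dict:
    # Layer 3: conditional write. A replayed event finds an existing
    # item and the write is rejected instead of overwriting it.
    item = {"trade_id": {"S": trade_id}, **payload}
    return {
        "TableName": table,
        "Item": item,
        "ConditionExpression": "attribute_not_exists(trade_id)",
    }
```

In the writer, a `ConditionalCheckFailedException` from the DynamoDB client is caught and treated as a successful no-op; that is what makes replays safe.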

2. DynamoDB Write Sharding

Using ticker as the bare partition key routes all AAPL writes to one DynamoDB partition. At market open (~800 writes/sec for AAPL alone), this hits DynamoDB's per-partition throughput limit and causes throttling — which is exactly how the February 3rd incident happened.

Naive key "AAPL":               Shard key "AAPL#3":
All writes → 1 partition        shard = hash(ticker+timestamp) % 8
800 writes/sec → THROTTLE       8 partitions × 100 writes/sec → HEALTHY
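The shard key derivation can be sketched as below. The function name is illustrative; `md5` (rather than Python's per-process salted `hash()`) keeps the shard choice stable across restarts:

```python
import hashlib

NUM_SHARDS = 8  # matches the 8-way split described above

def shard_key(ticker: str, timestamp_ms: int, shards: int = NUM_SHARDS) -> str:
    # Hash ticker + timestamp so bursts on one symbol spread evenly
    # across `shards` DynamoDB partitions.
    digest = hashlib.md5(f"{ticker}:{timestamp_ms}".encode()).hexdigest()
    return f"{ticker}#{int(digest, 16) % shards}"
```

The trade-off is on the read path: a query for a ticker now fans out across all 8 shard keys and merges the results, which DynamoDB handles comfortably at these volumes.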

3. Dead Letter Queue Pattern

Failed messages are never dropped silently. After 3 retries, messages route to SQS with full context (original payload, error reason, Kafka offset, retry count). A DLQ consumer retries every 15 minutes. After 5 DLQ retries, messages archive to s3://tradepulse-data/dead-letters/ for manual inspection.
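A sketch of the DLQ envelope and routing decision, assuming the retry limits stated above; field and function names are illustrative:

```python
import json
import time

MAX_KAFKA_RETRIES = 3  # after 3 in-place retries, route to SQS
MAX_DLQ_RETRIES = 5    # after 5 DLQ retries, archive to S3

def dlq_envelope(payload: dict, error: str, topic: str,
                 partition: int, offset: int, retry_count: int) -> str:
    # The failure carries full context so the DLQ consumer can replay it
    # (or a human can diagnose it) without the original consumer's state.
    return json.dumps({
        "original_payload": payload,
        "error_reason": error,
        "kafka_topic": topic,
        "kafka_partition": partition,
        "kafka_offset": offset,
        "retry_count": retry_count,
        "failed_at_epoch_s": int(time.time()),
    })

def next_destination(retry_count: int, dlq_retry_count: int) -> str:
    # Decide where a failed message goes next.
    if retry_count < MAX_KAFKA_RETRIES:
        return "retry-in-place"
    if dlq_retry_count < MAX_DLQ_RETRIES:
        return "sqs-dlq"
    return "s3-archive"
```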

4. Backpressure Mechanism

When DynamoDB write latency exceeds 100ms for 3 consecutive writes, Faust pauses consumption for 500ms. Without backpressure, slow downstream writes cause the in-memory buffer to grow without bound, eventually ending in an out-of-memory crash. Backpressure trades a controlled increase in Kafka consumer lag (safe — Kafka holds data durably) for stable memory usage.
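The trigger logic is simple enough to sketch. Thresholds match the numbers above; the class name is illustrative:

```python
from collections import deque

LATENCY_THRESHOLD_MS = 100  # "slow" write cutoff
CONSECUTIVE_SLOW = 3        # pause only after 3 slow writes in a row
PAUSE_MS = 500              # how long the consumer pauses

class BackpressureGuard:
    """Signals a pause after CONSECUTIVE_SLOW consecutive slow writes."""

    def __init__(self) -> None:
        # Bounded window: only the last N latencies matter.
        self._recent = deque(maxlen=CONSECUTIVE_SLOW)

    def record(self, write_latency_ms: float) -> bool:
        # Returns True when the consumer should pause for PAUSE_MS.
        self._recent.append(write_latency_ms)
        return (len(self._recent) == CONSECUTIVE_SLOW
                and all(l > LATENCY_THRESHOLD_MS for l in self._recent))
```

One fast write resets the streak, so a single latency blip never stalls the pipeline; only a sustained slowdown does.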

5. Two-Stream Temporal Join

A news article alone is a weak signal. A volume spike alone is ambiguous. When strong-sentiment news arrives within 60 seconds of a volume z-score spike (>2.0), TradePulse surfaces a correlated market event — the compound signal that quantitative traders call "news alpha."

09:31:42 — NVDA volume z-score: 4.7
09:31:55 — Finnhub: "NVIDIA Blackwell GPU shipments accelerate ahead of schedule"
09:31:55 — VADER sentiment: +0.891 (strongly positive)
09:31:55 — Correlation: STRONG (Δt = 13 seconds)
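The join condition can be sketched as below. The z-score and window thresholds come from the text; the sentiment cutoff of 0.5 is an assumed placeholder for "strong sentiment":

```python
from dataclasses import dataclass

JOIN_WINDOW_S = 60         # temporal join window from the text
ZSCORE_THRESHOLD = 2.0     # volume spike cutoff from the text
SENTIMENT_THRESHOLD = 0.5  # assumed "strong sentiment" cutoff

@dataclass
class VolumeSpike:
    ticker: str
    ts: float      # epoch seconds
    zscore: float

def correlate(spike: VolumeSpike, news_ts: float, sentiment: float):
    # Emit a correlated event only when both legs are strong AND the
    # news lands within the join window of the spike.
    dt = abs(news_ts - spike.ts)
    if (spike.zscore > ZSCORE_THRESHOLD
            and abs(sentiment) >= SENTIMENT_THRESHOLD
            and dt <= JOIN_WINDOW_S):
        return {"ticker": spike.ticker, "delta_t_s": dt, "strength": "STRONG"}
    return None
```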

6. Real-Time Anomaly Detection on the Stream

Isolation Forest runs inside the Faust agent at 0.3ms per event. The model trains on the last 1,000 events per ticker and retrains every 500 new events — market conditions change throughout the day, and a static model trained at open would be badly miscalibrated by close.
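The rolling-retrain cadence can be sketched independently of the model itself. The window and cadence are the numbers from the text; `fit` is a stand-in for `sklearn.ensemble.IsolationForest().fit`, and the class name is illustrative:

```python
from collections import deque

WINDOW = 1000        # train on the most recent 1,000 events per ticker
RETRAIN_EVERY = 500  # refresh the model every 500 new events

class RollingDetector:
    """Sliding-window model holder that retrains on a fixed cadence."""

    def __init__(self, fit):
        self._fit = fit              # e.g. lambda data: IsolationForest().fit(data)
        self._window = deque(maxlen=WINDOW)  # old events fall off automatically
        self._since_train = 0
        self.model = None

    def observe(self, features):
        # Buffer the event; retrain on the first event and then every
        # RETRAIN_EVERY events so the model tracks intraday conditions.
        self._window.append(features)
        self._since_train += 1
        if self.model is None or self._since_train >= RETRAIN_EVERY:
            self.model = self._fit(list(self._window))
            self._since_train = 0
        return self.model
```

One detector instance per ticker keeps the windows independent, mirroring the per-ticker training described above.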


Performance Benchmarks

Load tested using a custom script simulating market-open burst traffic (8-10x sustained volume) across 5 tickers, measured end-to-end from WebSocket receipt to confirmed DynamoDB write.

| Metric                             | Result            |
|------------------------------------|-------------------|
| Sustained throughput               | 14,800 events/sec |
| Peak throughput (30s burst)        | 31,200 events/sec |
| End-to-end latency p50             | 12ms              |
| End-to-end latency p95             | 34ms              |
| End-to-end latency p99             | 67ms              |
| DynamoDB write latency p99         | 18ms              |
| Kafka consumer lag (sustained)     | <50 messages      |
| Anomaly detection inference        | 0.3ms/event       |
| VADER sentiment inference          | <1ms/article      |
| Data loss under failure conditions | 0 events          |

API Endpoints

| Method | Endpoint               | Description                             | Cache |
|--------|------------------------|-----------------------------------------|-------|
| GET    | /quotes/{ticker}       | Latest trade price and volume           | 1s    |
| GET    | /aggregations/{ticker} | VWAP, z-score, momentum                 | 5s    |
| GET    | /anomalies/{ticker}    | Recent Isolation Forest detections      | None  |
| GET    | /features/{ticker}     | Real-time feature vector                | 1s    |
| GET    | /sentiment/{ticker}    | News sentiment with market correlation  | None  |
| GET    | /health                | Pipeline health status                  | None  |

Tech Stack

| Layer                | Technology                     | Why                                                |
|----------------------|--------------------------------|----------------------------------------------------|
| Data source (equity) | Polygon.io WebSocket           | Live tick data, free tier                          |
| Data source (news)   | Finnhub REST API               | Company-level news, free tier                      |
| Message broker       | Apache Kafka                   | Durable buffer, replay capability, exactly-once    |
| Stream processing    | Apache Faust                   | Python-native, fault-tolerant state, exactly-once  |
| Sentiment analysis   | VADER                          | <1ms inference vs 200ms+ for FinBERT               |
| Anomaly detection    | Scikit-learn Isolation Forest  | Multi-dimensional, adapts to market conditions     |
| Hot storage          | AWS DynamoDB                   | Single-digit ms reads, zero ops overhead           |
| Cold storage         | AWS S3 + Parquet               | Columnar queries via Athena, Snappy compression    |
| Dead letter queue    | AWS SQS                        | Durable failure capture with retry                 |
| API layer            | FastAPI                        | Async, auto OpenAPI, Pydantic validation           |
| Containerization     | Docker Compose                 | One-command local stack                            |
| Deployment           | Railway                        | FastAPI dashboard in demo mode                     |
| Observability        | AWS CloudWatch                 | Metrics, alarms, dashboards                        |

Scale Considerations

At 10x (150k events/sec): Move from local Kafka to Amazon MSK, switch DynamoDB to on-demand capacity mode, run 3 Faust worker instances in parallel, reduce S3 flush interval from 5min to 2min.

At 100x (1.5M events/sec): Replace Faust with Apache Flink, add Redis caching layer in front of DynamoDB, move to multi-region DynamoDB Global Tables, spin anomaly detection into a dedicated microservice, add Redshift for sub-second historical analytics.


Getting Started

Option A — View live demo

Visit tradepulse.nikhilgiridharan.com — no setup required. Runs in demo mode with simulated market data seeded from real current prices.

Option B — Run the full pipeline locally

Prerequisites:

  • Docker and Docker Compose
  • Polygon.io API key (free tier)
  • Finnhub API key (free tier)
  • AWS account with DynamoDB, S3, and SQS access
```shell
# Clone and configure
git clone https://github.com/nikhilgiridharan/TradePulse
cd TradePulse
cp .env.example .env

# Add your keys to .env:
# POLYGON_API_KEY, FINNHUB_API_KEY
# AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY

# Start everything
make up

# Dashboard available at http://localhost:8000
```

Services started by make up:

  • Kafka + Zookeeper
  • Polygon.io equity producer
  • Finnhub news producer
  • Faust stream processor
  • FastAPI dashboard
```shell
make logs    # Tail all service logs
make test    # Run unit + integration tests
make down    # Stop all services
```

Documentation

| Document      | Description                                            |
|---------------|--------------------------------------------------------|
| Architecture  | Full system design with component responsibilities     |
| Schema        | DynamoDB table design, partition keys, access patterns |
| Benchmarks    | Load test methodology and full results                 |
| Runbook       | Operational procedures and alert thresholds            |
| Feature Store | Feature definitions and update frequency               |
| Postmortem    | Hot partition incident — cause, fix, learnings         |
| Deployment    | Railway setup and local full-pipeline instructions     |

Project Structure

```
TradePulse/
├── src/
│   ├── producer/
│   │   ├── polygon_producer.py      # WebSocket → Kafka, idempotent
│   │   └── news_producer.py         # Finnhub polling → Kafka
│   ├── processing/
│   │   ├── faust_app.py             # Main stream processor, exactly-once
│   │   ├── aggregations.py          # VWAP, z-score, momentum
│   │   ├── anomaly_detection.py     # Isolation Forest, rolling retrain
│   │   ├── sentiment_analyzer.py    # VADER + market correlation
│   │   └── feature_store.py         # Precomputed features → DynamoDB
│   ├── validation/
│   │   └── schema_validator.py      # Pydantic validation + DLQ routing
│   ├── storage/
│   │   ├── dynamo_writer.py         # Shard keys, conditional writes
│   │   ├── s3_writer.py             # Parquet buffering, Hive partitioning
│   │   └── dlq_handler.py           # SQS retry + S3 archival
│   ├── api/
│   │   ├── main.py                  # FastAPI, 6 endpoints, CORS
│   │   └── static/
│   │       ├── index.html           # Live dashboard
│   │       └── about.html           # Documentation page
│   └── monitoring/
│       └── cloudwatch_metrics.py    # Batched CloudWatch emission
├── tests/
│   ├── unit/                        # Aggregation math, anomaly, validation
│   └── integration/                 # Kafka, DynamoDB, DLQ end-to-end
├── docs/                            # Architecture, schema, runbook, postmortem
├── docker-compose.yml               # Full local stack
├── Dockerfile                       # Railway production image
├── railway.toml                     # Railway deployment config
└── Makefile                         # make up · make test · make lint
```
