A high-throughput, real-time financial article processing system that leverages LLMs to parse unstructured text into structured data at scale. Built for quantitative investment funds seeking a competitive edge in market intelligence.
Market Firehose processes 100+ articles per minute from multiple sources, using AI to extract:
- Publisher, author, and publication date
- Article title and body content
- Related market sectors and industries
- Stock tickers and company mentions
- Sentiment analysis
The system supports real-time pub/sub subscriptions, allowing downstream consumers to receive structured articles instantly.
```
┌──────────────────────────────────────────────────────────────────────────┐
│                          MARKET FIREHOSE SYSTEM                          │
├──────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│  ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   ┌─────────────┐   │
│  │  RSS Feeds  │   │  API Feeds  │   │  Webhooks   │   │  Scrapers   │   │
│  └──────┬──────┘   └──────┬──────┘   └──────┬──────┘   └──────┬──────┘   │
│         └─────────────────┴────────┬────────┴─────────────────┘          │
│                                    │                                     │
│                        ┌───────────▼───────────┐                         │
│                        │    INGESTION LAYER    │                         │
│                        │    (Feed Adapters)    │                         │
│                        └───────────┬───────────┘                         │
│                                    │                                     │
│                        ┌───────────▼───────────┐                         │
│                        │     MESSAGE QUEUE     │                         │
│                        │        (Redis)        │                         │
│                        └───────────┬───────────┘                         │
│                                    │                                     │
│           ┌────────────────────────┼────────────────────────┐            │
│           │                        │                        │            │
│  ┌────────▼────────┐      ┌────────▼────────┐      ┌────────▼────────┐   │
│  │   LLM WORKER    │      │   LLM WORKER    │      │   LLM WORKER    │   │
│  │  (GPT-4o-mini)  │      │  (GPT-4o-mini)  │      │  (GPT-4o-mini)  │   │
│  └────────┬────────┘      └────────┬────────┘      └────────┬────────┘   │
│           └────────────────────────┼────────────────────────┘            │
│                                    │                                     │
│                        ┌───────────▼───────────┐                         │
│                        │      POSTGRESQL       │                         │
│                        │   (Structured Data)   │                         │
│                        └───────────┬───────────┘                         │
│                                    │                                     │
│                        ┌───────────▼───────────┐                         │
│                        │    PUB/SUB ENGINE     │                         │
│                        │    (Redis Streams)    │                         │
│                        └───────────┬───────────┘                         │
│                                    │                                     │
│           ┌────────────────────────┼────────────────────────┐            │
│           │                        │                        │            │
│  ┌────────▼────────┐      ┌────────▼────────┐      ┌────────▼────────┐   │
│  │    REST API     │      │    WebSocket    │      │    Webhooks     │   │
│  │    (FastAPI)    │      │   Subscribers   │      │    Callbacks    │   │
│  └─────────────────┘      └─────────────────┘      └─────────────────┘   │
│                                                                          │
└──────────────────────────────────────────────────────────────────────────┘
```
- Multi-source support: RSS feeds, REST APIs, webhooks, web scrapers
- Extensible adapters: Easy to add new data sources
- Deduplication: Content fingerprinting to prevent duplicates
- Rate limiting: Configurable polling intervals per source
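The deduplication step can be sketched with a simple content fingerprint. This is a minimal in-memory sketch; the hash choice, the normalization, and the function names are illustrative assumptions, not the project's actual implementation, which would persist fingerprints in PostgreSQL or Redis:

```python
import hashlib

def content_fingerprint(title: str, body: str) -> str:
    """Derive a stable fingerprint from normalized article text.

    Lowercasing and collapsing whitespace makes the fingerprint robust
    to trivial formatting differences between feeds.
    """
    normalized = " ".join((title + " " + body).lower().split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()

# In-memory store for illustration only.
seen: set[str] = set()

def is_duplicate(title: str, body: str) -> bool:
    """Record the article's fingerprint and report whether it was seen before."""
    fp = content_fingerprint(title, body)
    if fp in seen:
        return True
    seen.add(fp)
    return False
```

Two articles that differ only in case or whitespace produce the same fingerprint, so the second one is flagged as a duplicate.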
- LLM-powered parsing: GPT-4o-mini for intelligent text extraction
- Parallel processing: 10+ concurrent workers
- Batch processing: Efficient handling of article batches
- Retry logic: Automatic retry with exponential backoff
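The retry behavior described above can be sketched as a generic exponential-backoff helper with jitter; the names and parameters are illustrative, not the project's actual API:

```python
import random
import time

def retry_with_backoff(fn, max_attempts=5, base_delay=1.0, max_delay=30.0):
    """Call fn(), retrying on any exception with exponential backoff.

    Delay doubles on each failed attempt (capped at max_delay), with a
    small random jitter to avoid thundering-herd retries across workers.
    """
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            delay = min(base_delay * 2 ** attempt, max_delay)
            time.sleep(delay + random.uniform(0, delay * 0.1))
```

A worker would wrap its LLM call in `retry_with_backoff` so transient API errors are absorbed rather than failing the batch.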
| Field | Description |
|---|---|
| `publisher` | News source/publication name |
| `author` | Article author(s) |
| `published_date` | Publication timestamp |
| `title` | Article headline |
| `body` | Full article content |
| `summary` | AI-generated summary |
| `sectors` | Related market sectors (GICS-based) |
| `mentioned_tickers` | Stock symbols mentioned |
| `sentiment` | Positive/negative/neutral |
| `sentiment_score` | Score from -1.0 to 1.0 |
- WebSocket: Real-time streaming for low-latency clients
- Webhooks: Push notifications to external systems
- Polling: REST API with cursor-based pagination
- Filtering: Subscribe by sector, source, sentiment, or ticker
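Filtered delivery amounts to matching each processed article against every subscriber's filter set. A minimal sketch follows; the dictionary keys and matching semantics (empty filter matches everything, list filters match on any overlap) are assumptions based on the filter dimensions listed above:

```python
def matches_subscription(article: dict, sub: dict) -> bool:
    """Return True if an article satisfies a subscriber's filters.

    Missing or empty filters are treated as wildcards.
    """
    if sub.get("sectors") and not set(article.get("sectors", [])) & set(sub["sectors"]):
        return False
    if sub.get("tickers") and not set(article.get("mentioned_tickers", [])) & set(sub["tickers"]):
        return False
    if sub.get("sentiment") and article.get("sentiment") != sub["sentiment"]:
        return False
    if sub.get("source") and article.get("source") != sub["source"]:
        return False
    return True
```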
- Python 3.11+
- PostgreSQL 16+
- Redis 7+
- OpenAI API key
- Clone the repository

  ```bash
  git clone https://github.com/your-org/Market-Firehose-System.git
  cd Market-Firehose-System
  ```

- Set up environment

  ```bash
  # Copy environment template
  cp env.example .env

  # Edit .env with your configuration
  # IMPORTANT: Set your OPENAI_API_KEY
  ```

- Install dependencies

  ```bash
  python -m venv venv
  source venv/bin/activate  # On Windows: venv\Scripts\activate
  pip install -r requirements.txt
  ```

- Start services with Docker

  ```bash
  docker-compose up -d postgres redis
  ```

- Initialize the database

  ```bash
  # The schema will be auto-applied when PostgreSQL starts
  # Or run manually:
  docker exec -i firehose_postgres psql -U postgres -d market_firehose < src/db/schema.sql
  docker exec -i firehose_postgres psql -U postgres -d market_firehose < scripts/seed_sectors.sql
  ```

- Run the API server

  ```bash
  uvicorn src.main:app --reload --host 0.0.0.0 --port 8000
  ```

- Start the worker (in a separate terminal)

  ```bash
  python -m src.queue.worker
  ```

Alternatively, run the full stack with Docker Compose:

```bash
docker-compose up -d
```

This starts:
- PostgreSQL database (port 5432)
- Redis (port 6379)
- API server (port 8000)
- Background worker
POST /api/v1/articles

```json
{
  "articles": [
    {
      "raw_content": "Apple Inc. reported record quarterly earnings...",
      "external_url": "https://example.com/article/123",
      "source_id": "optional-uuid"
    }
  ],
  "priority": 0
}
```

Response:

```json
{
  "ingested": 1,
  "duplicates": 0,
  "article_ids": ["550e8400-e29b-41d4-a716-446655440000"]
}
```

GET /api/v1/articles

Query Parameters:

- `sector_id`: Filter by sector
- `source_id`: Filter by source
- `sentiment`: Filter by sentiment (positive/negative/neutral)
- `ticker`: Filter by mentioned ticker
- `from_date`: Start date (ISO format)
- `to_date`: End date (ISO format)
- `limit`: Page size (default: 50)
- `offset`: Pagination offset
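Offset pagination can be consumed with a small client loop. This sketch assumes the endpoint returns a plain JSON array of articles; the helper names and that response shape are assumptions, not part of the documented API:

```python
import json
import urllib.parse
import urllib.request

def build_page_url(base_url: str, offset: int, limit: int = 50, **filters) -> str:
    """Build a GET /api/v1/articles URL, skipping filters that are None."""
    params = {"limit": limit, "offset": offset,
              **{k: v for k, v in filters.items() if v is not None}}
    return f"{base_url}/api/v1/articles?" + urllib.parse.urlencode(params)

def iter_articles(base_url: str, limit: int = 50, **filters):
    """Yield articles page by page until a short (or empty) page is returned."""
    offset = 0
    while True:
        url = build_page_url(base_url, offset, limit, **filters)
        with urllib.request.urlopen(url) as resp:
            page = json.loads(resp.read())
        yield from page
        if len(page) < limit:
            return  # last page reached
        offset += limit
```

Usage would look like `for article in iter_articles("http://localhost:8000", sentiment="positive"): ...`.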
WS /api/v1/ws/subscribe

```javascript
const ws = new WebSocket("ws://localhost:8000/api/v1/ws/subscribe");

ws.send(
  JSON.stringify({
    sectors: ["technology", "healthcare"],
    min_confidence: 0.7,
  })
);

ws.onmessage = (event) => {
  const article = JSON.parse(event.data);
  console.log("New article:", article.title);
};
```

GET /health

```json
{
  "status": "healthy",
  "database": "connected",
  "redis": "connected",
  "queue_depth": 42,
  "articles_per_minute": 87.5
}
```

| Table | Description |
|---|---|
| `articles` | Processed articles with extracted fields |
| `sources` | Article feed configurations |
| `sectors` | Market sector taxonomy (GICS-based) |
| `article_sectors` | Article-sector relationships |
| `subscribers` | Pub/sub subscriber configurations |
| `delivery_log` | Article delivery audit trail |
- `idx_articles_status`: Fast pending-article lookups
- `idx_articles_published`: Chronological queries
- `idx_articles_tickers`: Ticker-based filtering (GIN)
- `idx_articles_title_search`: Full-text search
| Variable | Description | Default |
|---|---|---|
| `DATABASE_URL` | PostgreSQL connection string | `postgresql+asyncpg://...` |
| `REDIS_URL` | Redis connection string | `redis://localhost:6379/0` |
| `OPENAI_API_KEY` | OpenAI API key | required |
| `OPENAI_MODEL` | LLM model to use | `gpt-4o-mini` |
| `WORKER_CONCURRENCY` | Parallel workers | `10` |
| `BATCH_SIZE` | Articles per batch | `20` |
| `LOG_LEVEL` | Logging level | `INFO` |
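Loading this configuration might look like the following sketch. It is illustrative only; since the `DATABASE_URL` default is elided in the table above, the sketch treats it as required alongside `OPENAI_API_KEY`:

```python
import os

# Defaults mirror the configuration table above.
DEFAULTS = {
    "REDIS_URL": "redis://localhost:6379/0",
    "OPENAI_MODEL": "gpt-4o-mini",
    "WORKER_CONCURRENCY": "10",
    "BATCH_SIZE": "20",
    "LOG_LEVEL": "INFO",
}
REQUIRED = ("DATABASE_URL", "OPENAI_API_KEY")

def load_settings(env=None) -> dict:
    """Merge environment variables over defaults, failing fast on missing keys."""
    env = os.environ if env is None else env
    missing = [key for key in REQUIRED if key not in env]
    if missing:
        raise RuntimeError(f"missing required settings: {missing}")
    settings = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    settings.update({key: env[key] for key in REQUIRED})
    # Numeric knobs arrive as strings from the environment.
    settings["WORKER_CONCURRENCY"] = int(settings["WORKER_CONCURRENCY"])
    settings["BATCH_SIZE"] = int(settings["BATCH_SIZE"])
    return settings
```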
- Ingestion: 200+ articles/minute
- Processing: 100+ articles/minute (with 10 workers)
- Latency: <5s average per article
- API Response: <50ms for queries
- Horizontal scaling via additional workers
- Connection pooling for database efficiency
- Redis clustering for high availability
```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run specific test file
pytest tests/test_llm_parser.py -v
```

```
Market-Firehose-System/
├── src/
│   ├── api/              # FastAPI routes and WebSocket handlers
│   ├── db/               # Database layer and repositories
│   ├── models/           # Pydantic models
│   ├── services/         # Business logic (LLM, processing)
│   ├── ingestion/        # Feed adapters
│   ├── queue/            # Redis queue and workers
│   └── utils/            # Logging, metrics
├── tests/                # Test suite
├── scripts/              # Database scripts
├── docker-compose.yml    # Container orchestration
└── requirements.txt      # Python dependencies
```
- Phase 1: Foundation (Database, Models)
- Phase 2: LLM Processing Engine
- Phase 3: Feed Ingestion Adapters
- Phase 4: Message Queue & Workers
- Phase 5: REST API
- Phase 6: Pub/Sub Subscriptions
- Phase 7: Monitoring & Production
- Quantitative Analysis in Finance - Investopedia
- SEC Markets Data - Official SEC data
- FastAPI Documentation - API framework
- LLM Text Classification - Medium
MIT License - see LICENSE for details.
Built with ❤️ for Company X Quant Team