A streaming post-trade reconciliation system inspired by real-world infrastructure.
System & Monitoring Metrics
- ⚡ Per-record Latency: ~45ms
In trading, multiple systems (internal and external) log the same trade at different times, with slight variations. These include:
- Internal trading engines
- Broker confirmations
- Risk/PnL systems
- Clearinghouses or custodians
Discrepancies in price, quantity, or timestamp can indicate serious issues: execution errors, data corruption, or compliance violations.
Goals
- Reconciling trade records across systems
- Detecting and flagging mismatches
- Generating end-of-day compliance reports
- Ensuring timely alerts and robust downstream reliability
To build a real-time, Kafka-driven trade reconciliation engine that compares:
- Execution data from the internal engine
- Confirmation data from broker systems
- Optional risk snapshots from the PnL system
…and flags any mismatches in real time.
In a fast-paced trading environment, maintaining data integrity across numerous disparate systems is paramount. Even minor variations in trade details can lead to significant financial, operational, or regulatory risks.
TradeRecon directly addresses this by providing an automated, real-time mechanism to:
- Ensure Data Consistency: Guaranteeing that all internal and external records of a trade align.
- Mitigate Risk: Rapidly identifying potential execution errors, data corruption, or unauthorized activities.
- Streamline Compliance: Automating audit-ready reconciliation reports for regulatory obligations.
- Enhance Operational Efficiency: Reducing manual reconciliation effort and allowing focus on higher-value engineering tasks.
```mermaid
graph TD
    A[Execution Engine] --> B(Kafka Topic: executions)
    C[Confirmation System] --> D(Kafka Topic: confirmations)
    E[PnL System] --> F(Kafka Topic: pnl_snapshot)
    B -- Trade Data --> G[TradeRecon Engine]
    D -- Trade Data --> G
    F -- PnL Data --> G
    G -- Reconciled Data --> H[Mismatch Checker]
    H -- Results --> I[SQLite Database]
    H -- Metrics --> P[Prometheus Metrics Endpoint]
    H -- Alerts --> J[CLI Logs / Simulated Alerts]
    H -- Reports --> K[HTML Report / CSV Export]
    P -- Scrapes Metrics --> Q(Prometheus)
    Q -- Data Source --> R(Grafana)
    R -- Visualizes --> S[Monitoring Dashboards]
```
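The fan-in at the heart of the diagram can be sketched with `kafka-python`: a single consumer subscribed to all three topics, buffering records per `trade_id` until every source has reported. This is an illustrative sketch (topic names follow the diagram; the repo's `app/consumer.py` may be organized differently):

```python
import json
from collections import defaultdict

from kafka import KafkaConsumer  # kafka-python

TOPICS = ["executions", "confirmations", "pnl_snapshot"]

consumer = KafkaConsumer(
    *TOPICS,
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

# Buffer partial records per trade_id until all three sources have arrived.
pending = defaultdict(dict)

for message in consumer:
    record = message.value
    sources = pending[record["trade_id"]]
    sources[message.topic] = record
    if len(sources) == len(TOPICS):
        # All sources present: hand the trio off to the mismatch checker.
        print(f"ready to reconcile {record['trade_id']}: {sorted(sources)}")
        del pending[record["trade_id"]]
```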
Simulated as Kafka topics (and/or fallback CSVs) for flexible testing:

- `executions`: Primary trade record from the internal trading system.
- `broker_confirmations`: External confirmation of a trade from brokers.
- `pnl_snapshot`: Snapshot of PnL impact and commission from the accounting system.
Example entries:
```csv
# executions.csv
trade_id,ticker,quantity,price,timestamp
T001,AAPL,100,190.50,2025-07-26T10:01:23

# broker_confirmations.csv
trade_id,ticker,quantity,price,timestamp
T001,AAPL,100,190.50,2025-07-26T10:01:22.900

# pnl_snapshot.csv
trade_id,pnl_impact,commission
T001,95.00,0.5
```
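A minimal replay-producer sketch, assuming `kafka-python`: each CSV row is published to its topic as a JSON object. This approximates the role of `kafka/producer.py` but is not a verbatim copy:

```python
import csv
import json

from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda obj: json.dumps(obj).encode("utf-8"),
)

def replay(csv_path: str, topic: str) -> None:
    """Publish each CSV row to the given Kafka topic as a JSON dict."""
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            producer.send(topic, row)
    producer.flush()

replay("data/executions.csv", "executions")
replay("data/broker_confirmations.csv", "broker_confirmations")
replay("data/pnl_snapshot.csv", "pnl_snapshot")
```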
For every matched trade ID across the incoming streams, TradeRecon applies the following checks (sketched in code below):
- ✅ Quantity Match: Exact match between execution and confirmation.
- ✅ Price Match: Must be within a tolerance (e.g., ≤ 0.005).
- ✅ Timestamp Match: Must be within a 100ms drift tolerance.
- ✅ PnL Consistency: `abs(price × quantity - commission - pnl_impact) < 1.0`
On mismatch:
- Detailed CLI logging
- HTML summary report update
- Persistence to SQLite (audit trail)
- Prometheus metric updates
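These rules reduce to a few comparisons per trade. A minimal sketch, assuming flat dicts keyed like the CSV columns above (field names, parsing, and return shape are assumptions; the repo's `app/reconcile.py` may differ):

```python
from datetime import datetime

PRICE_TOL = 0.005   # price match tolerance
TS_TOL_MS = 100     # timestamp drift tolerance in milliseconds
PNL_TOL = 1.0       # PnL consistency tolerance

def check_trade(execution: dict, confirmation: dict, pnl: dict) -> list[str]:
    """Return a list of mismatch descriptions (empty list == fully matched)."""
    mismatches = []

    if int(execution["quantity"]) != int(confirmation["quantity"]):
        mismatches.append("quantity mismatch")

    if abs(float(execution["price"]) - float(confirmation["price"])) > PRICE_TOL:
        mismatches.append("price outside tolerance")

    exec_ts = datetime.fromisoformat(execution["timestamp"])
    conf_ts = datetime.fromisoformat(confirmation["timestamp"])
    drift_ms = abs((exec_ts - conf_ts).total_seconds()) * 1000
    if drift_ms > TS_TOL_MS:
        mismatches.append(f"timestamp drift {drift_ms:.0f}ms")

    notional = float(execution["price"]) * float(execution["quantity"])
    if abs(notional - float(pnl["commission"]) - float(pnl["pnl_impact"])) >= PNL_TOL:
        mismatches.append("PnL inconsistency")

    return mismatches
```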
| Layer | Tools | Role in Project |
|---|---|---|
| Stream Transport | Kafka (`kafka-python`) | Real-time ingestion |
| Data Persistence | SQLite + SQLAlchemy | Audit trail storage |
| Reconciliation Engine | Custom Python + threading | Core logic for trade comparison |
| Reporting & UI | Flask + Jinja2 | Dynamic reports and UI |
| Metrics Collection | Prometheus + `prometheus_client` | Export metrics |
| Visualization | Grafana | Monitoring dashboards |
| Alerting | CLI Logs / (Slack, Email – simulated) | Immediate visibility |
| Containerization | Docker, docker-compose | Easy deployment |
| Testing | Pytest | Unit + integration tests |
| Monitoring | Python `logging` | Runtime observability |
```
TradeRecon/
├── app/
│   ├── __init__.py
│   ├── consumer.py
│   ├── reconcile.py
│   ├── report_generator.py
│   ├── utils.py
│   └── main.py
├── kafka/
│   └── producer.py
├── data/
│   ├── executions.csv
│   ├── broker_confirmations.csv
│   └── pnl_snapshot.csv
├── reports/
│   └── templates/
│       └── report.html
├── tests/
│   └── test_reconciliation.py
├── prometheus/
│   └── prometheus.yml
├── grafana/
│   ├── provisioning/
│   │   ├── datasources/
│   │   │   └── datasource.yml
│   │   └── dashboards/
│   │       └── dashboard.yml
│   └── dashboards/
│       └── traderecon_dashboard.json
├── Dockerfile
├── docker-compose.yml
├── requirements.txt
└── README.md
```
Prerequisites: Ensure you have Docker + Docker Compose installed.

- Replace your `docker-compose.yml` with the provided one.
- Create the `prometheus/` directory and add `prometheus.yml`.
- Add the `grafana/provisioning/` structure with:
  - `datasources/datasource.yml`
  - `dashboards/dashboard.yml`
- Add `grafana/dashboards/traderecon_dashboard.json`.
- Ensure `app/main.py` and `app/reconcile.py` are up to date.
```bash
git clone https://github.com/OnePunchMonk/TradeRecon
cd TradeRecon
docker-compose up --build -d
```

Wait ~1–2 minutes for services to fully boot.

```bash
docker exec -it traderecon_app python kafka/producer.py
```

Watch the `docker-compose` logs to see processing in real time.
Visit http://localhost:5000/
- View dynamic reconciliation results
- Optionally download CSV summary
Visit http://localhost:3000/
- Login:
  - Username: `admin`
  - Password: `admin`
- View the pre-provisioned TradeRecon Overview dashboard.
📊 Monitoring Metrics with Grafana
Latency, bandwidth, and per-record reconciliation delays are logged and exported via Prometheus. Queue lengths and throughput metrics are visualized using Grafana dashboards for real-time observability and debugging.
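A minimal export sketch using `prometheus_client` (metric names and the simulated workload are illustrative, not the repo's actual instrumentation):

```python
import random
import time

from prometheus_client import Counter, Gauge, Histogram, start_http_server

RECON_LATENCY = Histogram(
    "traderecon_record_latency_seconds", "Per-record reconciliation latency"
)
MISMATCHES = Counter("traderecon_mismatches_total", "Mismatched trades detected")
QUEUE_DEPTH = Gauge("traderecon_queue_depth", "Records awaiting reconciliation")

start_http_server(8000)  # exposes /metrics for Prometheus to scrape

while True:
    with RECON_LATENCY.time():   # records elapsed time into the histogram
        time.sleep(0.045)        # stand-in for one reconciliation pass (~45ms)
    if random.random() < 0.1:    # stand-in for an occasional mismatch
        MISMATCHES.inc()
    QUEUE_DEPTH.set(random.randint(0, 50))
```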
🧾 Trade Overview Reporting with Jinja2
Reconciliation summaries are dynamically rendered using Jinja2-powered HTML templates. These reports include matched and mismatched trades, timestamp skews, and PnL validation outcomes.
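A minimal rendering sketch with Jinja2 (the `matched`/`mismatched` context variables are assumptions; the actual fields in `reports/templates/report.html` may differ):

```python
from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("reports/templates"))
template = env.get_template("report.html")

# Hypothetical context: lists of reconciled and flagged trades.
html = template.render(
    matched=[{"trade_id": "T001", "ticker": "AAPL"}],
    mismatched=[{"trade_id": "T002", "reason": "price outside tolerance"}],
)

with open("reports/latest_report.html", "w") as f:
    f.write(html)
```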
- The reconciliation engine currently runs per incoming trade event via Kafka.
- Input trade data is simulated using CSV files, acting as Kafka producers.
- Reconciliation happens in real-time, not batch-based.
To improve observability and align with end-of-day compliance workflows, the pipeline can also be triggered on a schedule using Apache Airflow as an alternative batch processor, replacing Kafka for time-triggered execution.
We propose a hybrid horizontal architecture:
- Support both Kafka (real-time) and Airflow (batch) backends.
- Introduce a boolean `reconciled` field in the data schema so that duplicate trades or already-matched entries are skipped in the batch pipeline (see the sketch below).
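A sketch of what that flag could look like on the SQLAlchemy audit model, with the batch pipeline selecting only unprocessed rows (the model, table, and column names are assumptions):

```python
from sqlalchemy import Boolean, Column, Float, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class TradeRecord(Base):
    __tablename__ = "trade_records"
    trade_id = Column(String, primary_key=True)
    ticker = Column(String)
    price = Column(Float)
    reconciled = Column(Boolean, default=False)  # skip flag for batch runs

engine = create_engine("sqlite:///traderecon.db")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # Batch pipeline: only pick up trades not yet reconciled.
    todo = session.scalars(
        select(TradeRecord).where(TradeRecord.reconciled.is_(False))
    ).all()
```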
- Current ingestion relies on flat CSV files, which limits scalability and concurrency.
- As a key future goal, migrate the ingestion and persistence layer to MongoDB or another scalable store.
- This also supports transitioning from Kafka-based real-time streaming to Airflow-based batch reconciliation as needed, enabling more flexible and resilient pipelines (see the sketch below).
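For illustration, an idempotent upsert keyed on `trade_id` is one way such a store could absorb both streaming and batch writes (a `pymongo` sketch; database, collection, and field names are hypothetical):

```python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
trades = client["traderecon"]["trades"]

def upsert_trade(record: dict) -> None:
    """Insert or update by trade_id so replays and batch re-runs stay idempotent."""
    trades.update_one(
        {"trade_id": record["trade_id"]},
        {"$set": record},
        upsert=True,
    )

upsert_trade({"trade_id": "T001", "ticker": "AAPL", "quantity": 100, "price": 190.50})
```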
```mermaid
graph TD
    A[Reconcile Trades] --> B[Generate Report]
    B --> C[Send Email]
```
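A minimal DAG sketch matching the graph above (task bodies are placeholders; assumes Airflow 2.4+, where `schedule` replaced `schedule_interval`):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def reconcile_trades():
    """Placeholder: run the batch reconciliation pass."""

def generate_report():
    """Placeholder: render the end-of-day HTML/CSV report."""

def send_email():
    """Placeholder: dispatch the report to compliance."""

with DAG(
    dag_id="traderecon_eod",
    start_date=datetime(2025, 1, 1),
    schedule="0 18 * * 1-5",  # weekday end-of-day run
    catchup=False,
) as dag:
    reconcile = PythonOperator(task_id="reconcile_trades", python_callable=reconcile_trades)
    report = PythonOperator(task_id="generate_report", python_callable=generate_report)
    email = PythonOperator(task_id="send_email", python_callable=send_email)

    reconcile >> report >> email
```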
| Category | Extension Idea | Description |
|---|---|---|
| 🧪 Testing | Hypothesis-based Fuzzing | Generate boundary cases for corrupted/malformed trades. |
| ⏱ Scheduling | Airflow DAG Integration | Schedule end-of-day reports and batch validations. |
| 🔐 Security | OAuth2 / AuthZ Middleware | Role-based access control to reports and APIs. |
| 📦 Database | Switch to PostgreSQL | For better scale and query performance with audit trails. |
| 📬 Alerting | Slack/Email Integrations | Integrate with actual messaging services for ops alerts. |
| 🧠 ML Integration | Anomaly Detection | Use ML to score suspicious trade patterns before reconciliation. |