|
| 1 | +# FX Streaming Data Pipeline — **RUNBOOK.md** |
| 2 | + |
| 3 | +> **Scope:** Tier‑1 UK Bank (BARX‑style) FX events → Pub/Sub → Dataflow (Beam) → BigQuery (raw→stage→marts), orchestrated by Cloud Composer. Security with VPC‑SC + CMEK. Documentation is fully **sanitized** (no client data). |
| 4 | + |
| 5 | +--- |
| 6 | + |
| 7 | +## What we are doing (context) |
| 8 | + |
| 9 | +Operating a production‑grade, real‑time FX streaming pipeline and keeping it stable, secure, and trustworthy. |
| 10 | + |
| 11 | +## Why we are doing it (logic) |
| 12 | + |
| 13 | +Front‑office and regulatory consumers depend on accurate, low‑latency data. This runbook gives deterministic steps to monitor, troubleshoot, and recover with minimal MTTR. |
| 14 | + |
| 15 | +--- |
| 16 | + |
| 17 | +## 0) Resource naming (as implemented) |
| 18 | + |
| 19 | +* **GCP Project:** `tier1-uk-bank-fx-sim` |
| 20 | +* **Pub/Sub topics:** `fx.trades`, `fx.quotes`, `fx.confirms`, and `fx.dlq` (dead‑letter) |
| 21 | +* **Dataflow job / template:** `fx_stream_pipeline` (Streaming Engine ON) |
| 22 | +* **BigQuery datasets:** `fx_raw`, `fx_stage`, `fx_mart`, `fx_ops` (QC/ops) |
| 23 | +* **BigQuery tables (examples):** `fx_raw.trades_*`, `fx_stage.validated_trades`, `fx_mart.aggregated_positions`, `fx_ops.qc_results` |
| 24 | +* **Composer DAGs:** `fx_ingestion_dag`, `fx_qc_dag`, `fx_replay_dag`, `fx_backfill_dag` |
| 25 | +* **KMS key (CMEK):** `projects/<kms-project>/locations/<loc>/keyRings/fx-ring/cryptoKeys/fx-cmek` |
| 26 | + |
| 27 | +> Replace the angle‑bracketed placeholders (`<kms-project>`, `<loc>`) with your values. If your actual resource names differ, substitute them; otherwise use these defaults consistently in README and diagrams. |
| 28 | + |
| 29 | +--- |
| 30 | + |
| 31 | +## 1) System Overview |
| 32 | + |
| 33 | +* **Producers:** |
| 34 | + |
| 35 | + * *trades* from OMS/EMS (BARX ecosystem) |
| 36 | + * *quotes* from pricing engine |
| 37 | + * *confirms* from post‑trade/custodian |
| 38 | +* **Transport:** Google **Pub/Sub** topics per event type |
| 39 | +* **Processing:** **Dataflow** (Apache Beam) — validate, dedup (windowed Distinct), enrich, route; DLQ on error |
| 40 | +* **Storage:** **BigQuery** datasets: `fx_raw`, `fx_stage`, `fx_mart` (partitioned & clustered) |
| 41 | +* **Orchestration:** **Cloud Composer** (Airflow) — ingestion checks, QC DAGs, replay/compaction/exports |
| 42 | +* **Security:** **VPC‑SC** perimeter; **CMEK** via Cloud KMS; least‑privilege SA‑IAM |
| 43 | +* **Observability:** Cloud Monitoring dashboards + Error Reporting + BQ audit views |
| 44 | +* **SLO:** **p95 end‑to‑end latency < 90s** (Pub/Sub publish → BQ mart row available) |
| 45 | + |
| 46 | +--- |
| 47 | + |
| 48 | +## 2) Environments |
| 49 | + |
| 50 | +* **dev** – rapid iteration; relaxed alerting |
| 51 | +* **stg** – near‑prod; full QC; can replay from prod snapshots where allowed |
| 52 | +* **prod** – restricted; VPC‑SC enforced; change via PR + approval |
| 53 | + |
| 54 | +> **Change control:** All DAG/Beam changes merge via PR + CODEOWNERS review; versioned releases with tags. |
| 55 | +
|
| 56 | +--- |
| 57 | + |
| 58 | +## 3) Data Contracts (high level) |
| 59 | + |
| 60 | +* **Envelope:** JSON/Avro with required keys per topic |
| 61 | + |
| 62 | + * `trades`: `trade_id` (UUID), `symbol`, `side`, `qty`, `price`, `event_time`, `trader_id`, `venue`, `notional_ccy` |
| 63 | + * `quotes`: `quote_id`, `symbol`, `bid`, `ask`, `event_time`, `source` |
| 64 | + * `confirms`: `confirm_id`, `trade_id`, `status`, `event_time` |
| 65 | +* **Semantics:** UTC timestamps; ISO 4217 currency codes; `event_time` non‑decreasing per key |
| 66 | +* **Contracts stored in:** `docs/data_contracts/*.json` (sanitized) |
| 67 | + |
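A minimal sketch of how the `trades` contract above could be checked before a record enters the pipeline. The function name `validate_trade`, the error strings, and the assumption that `event_time` arrives as an ISO‑8601 string with an explicit offset are illustrative and not taken from the contract files.

```python
import uuid
from datetime import datetime

# Required keys per the trades contract listed above.
TRADE_REQUIRED = ("trade_id", "symbol", "side", "qty", "price",
                  "event_time", "trader_id", "venue", "notional_ccy")


def validate_trade(msg: dict) -> list[str]:
    """Return a list of contract violations (an empty list means valid)."""
    errors = [f"missing:{k}" for k in TRADE_REQUIRED if msg.get(k) is None]
    if errors:
        return errors
    try:
        uuid.UUID(str(msg["trade_id"]))
    except ValueError:
        errors.append("trade_id:not_uuid")
    try:
        # Assumption: ISO-8601 string with an explicit UTC offset.
        ts = datetime.fromisoformat(msg["event_time"])
        offset = ts.utcoffset()
        if offset is None or offset.total_seconds() != 0:
            errors.append("event_time:not_utc")
    except (TypeError, ValueError):
        errors.append("event_time:unparseable")
    # Assumption: ISO 4217 codes are three uppercase letters.
    ccy = msg["notional_ccy"]
    if not (isinstance(ccy, str) and len(ccy) == 3 and ccy.isalpha() and ccy.isupper()):
        errors.append("notional_ccy:not_iso4217")
    return errors
```

Records with a non‑empty result would be candidates for `fx.dlq`; the same pattern extends to `quotes` and `confirms` with their own key lists.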
| 68 | +--- |
| 69 | + |
| 70 | +## 4) Daily Operations Checklist (Prod) |
| 71 | + |
| 72 | +1. **Composer** → DAGs status: |
| 73 | + |
| 74 | + * `fx_ingestion_dag` **success** in last 24h |
| 75 | + * `fx_qc_dag` **success**; QC tables updated |
| 76 | +2. **Dataflow** → Job `fx_stream_pipeline`: |
| 77 | + |
| 78 | + * Worker health **green**, backlog **near zero** |
| 79 | +3. **BigQuery**: |
| 80 | + |
| 81 | + * New partitions created for `fx_raw.*` |
| 82 | + * Stage→Mart transformations completed |
| 83 | +4. **Monitoring**: |
| 84 | + |
| 85 | + * p95 latency < **90s** (daily trend OK) |
| 86 | + * DLQ count < **1000** |
| 87 | +5. **Incidents:** Review Error Reporting; zero unresolved P1 |
| 88 | + |
| 89 | +--- |
| 90 | + |
| 91 | +## 5) Monitoring & Alerting |
| 92 | + |
| 93 | +**Dashboards** (Cloud Monitoring): |
| 94 | + |
| 95 | +* *FX‑Latency* — E2E latency p50/p95; Dataflow backlog |
| 96 | +* *FX‑DLQ* — DLQ depth per topic; replay throughput |
| 97 | +* *FX‑BQ Health* — load errors, query slots utilization |
| 98 | + |
| 99 | +**Key alerts** |
| 100 | + |
| 101 | +* `fx-latency-p95-breach` — p95 > 90s for 10m |
| 102 | +* `fx-dlq-depth-high` — DLQ > 1000 messages for 15m |
| 103 | +* `fx-bq-load-failed` — load job errors > 0 |
| 104 | +* `fx-dataflow-job-down` — job not running / workers unhealthy |
| 105 | + |
| 106 | +--- |
| 107 | + |
| 108 | +## 6) QC (Quality Control) Jobs (Composer → BigQuery) |
| 109 | + |
| 110 | +* **Row‑count reconciliation:** Pub/Sub msg count vs `fx_raw` inserts (±0.1%) |
| 111 | +* **Schema validation:** required fields non‑null; datatypes per contract |
| 112 | +* **Business rules:** `trade_date <= event_time`, `qty > 0`, `notional > 0` |
| 113 | +* **Latency SLO check:** Pub/Sub publish → mart row available < 90s (p95), the same span as the SLO in System Overview |
| 114 | +* **Anomaly scans:** outlier prices vs rolling window; symbol whitelist |
| 115 | + |
| 116 | +**QC outputs:** `fx_ops.qc_results` (table), daily summary email/Slack (sanitized alias) |
| 117 | + |
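The reconciliation tolerance and the business rules above can be sketched as plain checks. This is an illustration only: the function names are hypothetical, and the row is assumed to carry `trade_date` (a date), `event_time` (a timezone‑aware datetime), `qty`, and `notional` by the time it reaches stage.

```python
from datetime import date, datetime, timezone

TOLERANCE = 0.001  # the ±0.1% row-count reconciliation band


def reconcile(published: int, inserted: int, tolerance: float = TOLERANCE) -> dict:
    """Compare the Pub/Sub message count with fx_raw inserts."""
    rel_diff = abs(published - inserted) / max(published, 1)
    return {"check": "row_count", "rel_diff": rel_diff,
            "passed": rel_diff <= tolerance}


def business_rule_failures(row: dict) -> list[str]:
    """Return the names of the business rules this row violates."""
    failures = []
    # trade_date is compared against the calendar date of event_time.
    if row["trade_date"] > row["event_time"].date():
        failures.append("trade_date<=event_time")
    if not row["qty"] > 0:
        failures.append("qty>0")
    if not row["notional"] > 0:
        failures.append("notional>0")
    return failures
```

Each result dictionary maps naturally onto one row of `fx_ops.qc_results`, though the actual table schema is not specified here.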
| 118 | +--- |
| 119 | + |
| 120 | +## 7) Replay, Backfill & Compaction |
| 121 | + |
| 122 | +* **Replay DLQ:** `fx_replay_dag` → re‑validate → write to stage → mark processed |
| 123 | +* **Backfill:** run `fx_backfill_dag` with `start_ts`, `end_ts` → idempotent writes |
| 124 | +* **Compaction:** periodic merge of micro‑batches into partitioned tables to optimize cost/perf |
| 125 | + |
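One way to keep a backfill idempotent is to split the `start_ts`/`end_ts` range into non‑overlapping half‑open windows, so any single window can be re‑run without touching its neighbours. The sketch below assumes a one‑hour default step and a hypothetical helper name `backfill_windows`; how `fx_backfill_dag` actually chunks its range is not documented here.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator


def backfill_windows(start_ts: datetime, end_ts: datetime,
                     step: timedelta = timedelta(hours=1),
                     ) -> Iterator[tuple[datetime, datetime]]:
    """Yield half-open [lo, hi) windows that exactly cover [start_ts, end_ts)."""
    if step <= timedelta(0):
        raise ValueError("step must be positive")
    lo = start_ts
    while lo < end_ts:
        hi = min(lo + step, end_ts)  # last window may be shorter than step
        yield lo, hi
        lo = hi
```

Because the windows share boundaries but never overlap, a per‑window delete‑then‑insert or a `MERGE` keyed on `trade_id` processes each row exactly once per run.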
| 126 | +--- |
| 127 | + |
| 128 | +## 8) Failure Scenarios **& Mitigation Steps** |
| 129 | + |
| 130 | +> **Conventions:** |
| 131 | +> |
| 132 | +> * *Detect* = how we notice; *Diagnose* = what to check; *Mitigate* = what to do now; *Fix* = durable action; *Prevent* = long‑term guardrail. |
| 133 | + |
| 134 | +### A. Pub/Sub backlog spike (producer burst / subscriber slowdown) |
| 135 | + |
| 136 | +* **Detect:** Dataflow backlog ↑; p95 latency > 90s; Pub/Sub *Oldest unacked* ↑ |
| 137 | +* **Diagnose:** Dataflow worker CPU/mem; autoscaling events; hot keys/skew; publisher rate change |
| 138 | +* **Mitigate:** |
| 139 | + |
| 140 | + 1. Temporarily **raise max workers**; confirm **Streaming Engine** is still enabled on the running job |
| 141 | + 2. **Shard keys** if single hot symbol; widen Beam windows |
| 142 | + 3. Throttle upstream publisher (if coordinated) |
| 143 | +* **Fix:** Optimize transforms (combine, side inputs); add **partitioning/branching** per topic |
| 144 | +* **Prevent:** Sizing SLO tests; autoscaling caps tuned; producer rate contracts |
| 145 | + |
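The "shard keys" mitigation can be sketched as key salting: a hot symbol is split into several sub‑keys so its records spread across workers. The shard count and the function name `salted_key` are hypothetical, and the choice of `trade_id` as the salt source is an assumption.

```python
import hashlib

NUM_SHARDS = 8  # hypothetical; tune to the worker count


def salted_key(symbol: str, trade_id: str,
               num_shards: int = NUM_SHARDS) -> tuple[str, int]:
    """Spread one hot symbol across num_shards sub-keys, deterministically."""
    digest = hashlib.sha256(trade_id.encode("utf-8")).digest()
    shard = int.from_bytes(digest[:4], "big") % num_shards
    return symbol, shard
```

Any per‑symbol aggregate then needs a second step that recombines the per‑shard partial results by `symbol`. A cryptographic hash is used rather than Python's built‑in `hash()` because the latter is randomized per process for strings.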
| 146 | +### B. Dataflow job crash / workers unhealthy |
| 147 | + |
| 148 | +* **Detect:** Alert `fx-dataflow-job-down`; Error Reporting exceptions |
| 149 | +* **Diagnose:** Job logs; recent deploy; schema drift entering transforms |
| 150 | +* **Mitigate:** |
| 151 | + |
| 152 | + 1. **Drain** bad job; **restart** last stable template/version |
| 153 | + 2. If schema drift: **bypass to DLQ** and keep pipeline live |
| 154 | +* **Fix:** Add null‑safe parsing; feature‑flag for new fields |
| 155 | +* **Prevent:** Canary deploys; contract‑first validation |
| 156 | + |
| 157 | +### C. BigQuery load failures (invalid schema / quota) |
| 158 | + |
| 159 | +* **Detect:** Alert `fx-bq-load-failed`; BQ load error logs |
| 160 | +* **Diagnose:** Mismatch types; unexpected nulls; partition decorator overflow; slots saturated |
| 161 | +* **Mitigate:** |
| 162 | + |
| 163 | + 1. Route offending records → **DLQ** immediately |
| 164 | + 2. Retry with **write‑truncate** for stage temp tables when safe |
| 165 | +* **Fix:** Align schemas; relax `REQUIRED` columns to `NULLABLE` (schema update option `ALLOW_FIELD_RELAXATION`) only if contract permits |
| 166 | +* **Prevent:** Pre‑deploy schema checks; slot reservations for ETL windows |
| 167 | + |
| 168 | +### D. DLQ depth continuously growing |
| 169 | + |
| 170 | +* **Detect:** Alert `fx-dlq-depth-high` sustained |
| 171 | +* **Diagnose:** Top error codes by message; recent code push; upstream change |
| 172 | +* **Mitigate:** |
| 173 | + |
| 174 | + 1. Hot‑patch parser rules to accept benign changes |
| 175 | + 2. Run `fx_replay_dag` for a narrow time window after fix |
| 176 | +* **Fix:** Tighten contract + versioning; communicate with producers |
| 177 | +* **Prevent:** Schema evolution policy; consumer tests in CI |
| 178 | + |
| 179 | +### E. Late data spike (watermarks behind) |
| 180 | + |
| 181 | +* **Detect:** QC latency breach; watermark lag metrics |
| 182 | +* **Diagnose:** Venue outage; clock skew; batch dumps from upstream |
| 183 | +* **Mitigate:** Temporarily **increase allowed lateness**; widen windows |
| 184 | +* **Fix:** Move to event‑time + idempotent **upserts/merges** in stage |
| 185 | +* **Prevent:** Producer SLAs; clock sync monitoring |
| 186 | + |
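To show why raising allowed lateness helps during a late‑data spike, here is a simplified model of how an element is treated relative to the watermark. It is not the Beam API: the 60‑second fixed windows aligned to the epoch, the label strings, and the function names are assumptions for illustration.

```python
def window_end(event_ts: float, size_s: int = 60) -> float:
    """End of the fixed window (aligned to the epoch) containing event_ts."""
    return (event_ts // size_s + 1) * size_s


def classify(event_ts: float, watermark_ts: float,
             allowed_lateness_s: float, size_s: int = 60) -> str:
    """Classify an arriving element under event-time windowing."""
    end = window_end(event_ts, size_s)
    if watermark_ts < end:
        return "on_time"          # window is still open
    if watermark_ts < end + allowed_lateness_s:
        return "late_accepted"    # window closed, but within allowed lateness
    return "dropped"              # beyond allowed lateness
```

For an event at t=125s with the watermark at 200s, an allowed lateness of 10s drops the element while 60s accepts it. The durable fix of idempotent merges in stage exists so that such late arrivals can also be applied after the fact.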
| 187 | +### F. Duplicate events / over‑counting |
| 188 | + |
| 189 | +* **Detect:** QC reconciliation fails; metric jumps without volume change |
| 190 | +* **Diagnose:** At‑least‑once retries; replay overlap |
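| 191 | +* **Mitigate:** De‑duplicate per `trade_id` within the Beam window (key by `trade_id` and keep one record per key; a plain `Distinct` compares whole elements); use **BQ MERGE** on `trade_id` |
| 192 | +* **Fix:** Set a deterministic `insertId` on streaming writes (best‑effort de‑dup only); strengthen idempotency contract |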
| 193 | +* **Prevent:** Replay windows guarded; de‑dup dashboards |
| 194 | + |
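A deterministic `insertId` lets a retried write of the same event collide with the original instead of creating a second row. The sketch below derives it from `trade_id` and `event_time`; that field choice and the helper name `insert_id` are assumptions, not the pipeline's actual scheme.

```python
import hashlib


def insert_id(row: dict) -> str:
    """Derive a stable insertId so retries of the same trade event collide."""
    basis = f"{row['trade_id']}|{row['event_time']}"
    # 32 hex characters keeps the id short and well within BigQuery's limit.
    return hashlib.sha256(basis.encode("utf-8")).hexdigest()[:32]
```

BigQuery treats `insertId` de‑duplication on streaming inserts as best effort, so the `MERGE` on `trade_id` remains the authoritative guard against over‑counting.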
| 195 | +### G. Schema drift (new/renamed fields) |
| 196 | + |
| 197 | +* **Detect:** Parser exceptions; null inflation in stage |
| 198 | +* **Diagnose:** Compare incoming payload vs contract |
| 199 | +* **Mitigate:** Route unknown fields to **side‑channel**; keep core flow green |
| 200 | +* **Fix:** Versioned contracts; code path for vNext |
| 201 | +* **Prevent:** Producer change notice; contract registry checks in CI |
| 202 | + |
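Routing unknown fields to a side channel can be sketched as a split of the payload into contract fields and extras. The field set mirrors the `trades` contract in this runbook; the function name and the idea of returning the extras as a separate dictionary are illustrative assumptions.

```python
KNOWN_TRADE_FIELDS = frozenset({
    "trade_id", "symbol", "side", "qty", "price",
    "event_time", "trader_id", "venue", "notional_ccy",
})


def split_known_unknown(payload: dict,
                        known: frozenset = KNOWN_TRADE_FIELDS,
                        ) -> tuple[dict, dict]:
    """Return (core, extras): contract fields and everything else."""
    core = {k: v for k, v in payload.items() if k in known}
    extras = {k: v for k, v in payload.items() if k not in known}
    return core, extras
```

The `core` record continues through validation unchanged, while a non‑empty `extras` can be logged or written to the side channel together with the `trade_id` for later contract review. A renamed field shows up as an extra plus a missing required key, so it still fails validation rather than passing silently.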
| 203 | +### H. CMEK / KMS key issue (permission/rotation) |
| 204 | + |
| 205 | +* **Detect:** Permission denied on writes/reads |
| 206 | +* **Diagnose:** KMS key IAM; key state (disabled/rotated) |
| 207 | +* **Mitigate:** Use **emergency key version** (break‑glass) per policy |
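| 208 | +* **Fix:** Re‑enable or rotate the key as needed; re‑grant only the minimal role (`roles/cloudkms.cryptoKeyEncrypterDecrypter`) to the affected service agents |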
| 209 | +* **Prevent:** Key‑health alerts; rotation runbook calendarized |
| 210 | + |
| 211 | +### I. VPC‑SC perimeter violation (exfil attempt) |
| 212 | + |
| 213 | +* **Detect:** Access denied; VPC‑SC violation logs |
| 214 | +* **Diagnose:** Source network, SA identity, service endpoint |
| 215 | +* **Mitigate:** Run workload **inside** perimeter; use **Private Google Access** |
| 216 | +* **Fix:** Update the perimeter configuration (protected projects, restricted services, ingress/egress rules) so the legitimate path is covered |
| 217 | +* **Prevent:** Regular perimeter tests; least‑privilege SA maps |
| 218 | + |
| 219 | +### J. Composer DAG failures |
| 220 | + |
| 221 | +* **Detect:** Task red; DAG missed schedule |
| 222 | +* **Diagnose:** Airflow logs; connection secrets; dependency timing |
| 223 | +* **Mitigate:** Rerun failed task; increase pool slots; clear & backfill |
| 224 | +* **Fix:** Add retries/exponential backoff; task‑level SLAs |
| 225 | +* **Prevent:** Unit tests for DAGs; pinned provider versions |
| 226 | + |
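The retry and backoff fix maps onto standard Airflow operator arguments (`retries`, `retry_delay`, `retry_exponential_backoff`, `max_retry_delay`). The values below are hypothetical, and `approx_retry_delays` is only a rough doubling model for reasoning about worst‑case wait; Airflow applies its own jitter, so real delays differ.

```python
from datetime import timedelta

# Hypothetical default_args for the FX DAGs; the keys are Airflow
# BaseOperator arguments, the values are illustrative.
DEFAULT_ARGS = {
    "retries": 3,
    "retry_delay": timedelta(minutes=1),
    "retry_exponential_backoff": True,
    "max_retry_delay": timedelta(minutes=15),
}


def approx_retry_delays(args: dict = DEFAULT_ARGS) -> list[timedelta]:
    """Approximate doubling schedule, capped at max_retry_delay."""
    base, cap = args["retry_delay"], args["max_retry_delay"]
    return [min(base * 2 ** attempt, cap) for attempt in range(args["retries"])]
```

Summing the schedule gives a quick upper bound on how long a task can stay in retry before it goes red, which is useful when comparing against the 30‑minute RTO goal.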
| 227 | +### K. Cost anomaly (BQ/DF overrun) |
| 228 | + |
| 229 | +* **Detect:** Budget alert; cost dashboard spike |
| 230 | +* **Diagnose:** Top queries/jobs by slot & bytes; worker‑hours |
| 231 | +* **Mitigate:** Pause non‑critical DAGs; cap autoscaling; prune partitions |
| 232 | +* **Fix:** Optimize clustering/partitioning; caching; storage TTLs |
| 233 | +* **Prevent:** Budgets + alerts; monthly cost reviews |
| 234 | + |
| 235 | +--- |
| 236 | + |
| 237 | +## 9) Standard Operating Procedures (SOP) |
| 238 | + |
| 239 | +* **Deploy new Dataflow template** |
| 240 | + |
| 241 | + 1. Build template → stage in GCS (versioned path) |
| 242 | + 2. Update Composer variable `DF_TEMPLATE_URI` |
| 243 | + 3. Trigger canary in **stg** → verify QC → promote to **prod** |
| 244 | +* **Trigger manual replay** |
| 245 | + |
| 246 | + * Composer → `fx_replay_dag` → set `topic`, `start_ts`, `end_ts` → run; monitor replay metrics |
| 247 | +* **Open a P1 incident** |
| 248 | + |
| 249 | + * Criteria (any one): sustained p95 > 5m over SLO; DLQ > 10k; job down |
| 250 | + * Create incident ticket; page on‑call; start incident doc |
| 251 | + |
| 252 | +--- |
| 253 | + |
| 254 | +## 10) Access & Security |
| 255 | + |
| 256 | +* **Perimeter:** VPC‑SC around BQ, GCS, Pub/Sub, Dataflow, Composer |
| 257 | +* **Encryption:** CMEK for BQ datasets, GCS buckets, Pub/Sub topics |
| 258 | +* **Identities:** Dedicated SAs per component; no user keys; break‑glass account documented |
| 259 | +* **Auditing:** Cloud Audit Logs stored 400 days; access reviews monthly |
| 260 | + |
| 261 | +--- |
| 262 | + |
| 263 | +## 11) Escalation & Ownership |
| 264 | + |
| 265 | +| Area | Owner / Alias | |
| 266 | +| ------------------------ | ------------------------ | |
| 267 | +| Data Engineering On‑Call | de-oncall@barx-sim.local | |
| 268 | +| Platform/SRE | sre@barx-sim.local | |
| 269 | +| Security | secops@barx-sim.local | |
| 270 | + |
| 271 | +> **RTO goal:** Critical flow restored in **< 30 minutes**; stale data reconciled same day. |
| 272 | + |
| 273 | +--- |
| 274 | + |
| 275 | +## 12) Change Management |
| 276 | + |
| 277 | +* PRs with linked ADRs; semantic version tags (e.g., `v1.1.0`) |
| 278 | +* Release notes summarizing schema or SLO impacts |
| 279 | +* Post‑deploy validation: QC green, latency within SLO for 1h |
| 280 | + |
| 281 | +--- |
| 282 | + |
| 283 | +## 13) Appendix |
| 284 | + |
| 285 | +### A. Sample QC SQL (row‑count) |
| 286 | + |
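```sql
-- trades: previous 15 minutes; the fx_raw count stands in for the Pub/Sub count.
-- _PARTITIONTIME is truncated to the partition boundary (daily assumed), so it
-- only prunes partitions; the window itself is applied on event_time
-- (assumed to be a TIMESTAMP column, per the trades contract).
WITH src AS (
  SELECT COUNT(1) AS c FROM fx_raw.trades
  WHERE _PARTITIONTIME >= TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE), DAY)
    AND event_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE)
), tgt AS (
  SELECT COUNT(1) AS c FROM fx_stage.validated_trades
  WHERE _PARTITIONTIME >= TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE), DAY)
    AND event_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE)
)
SELECT src.c AS pubsub_estimate, tgt.c AS bq_rows,
       ABS(src.c - tgt.c) / GREATEST(src.c, 1) AS rel_diff
FROM src CROSS JOIN tgt;  -- both CTEs return one row, so this yields one row
```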
| 298 | + |
| 299 | +### B. Beam dedup sketch |
| 300 | + |
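```python
import apache_beam as beam
from apache_beam.transforms import window

# `trades` is assumed to be an existing PCollection of dict records.
# beam.Distinct() compares whole elements, so it would keep duplicates that
# differ in any field; instead, group by trade_id and keep one per window.
deduped = (
    trades
    | "KeyByTradeId" >> beam.Map(lambda r: (r["trade_id"], r))
    | "Window60s" >> beam.WindowInto(window.FixedWindows(60))
    | "GroupByTradeId" >> beam.GroupByKey()
    | "KeepOnePerKey" >> beam.Map(lambda kv: next(iter(kv[1])))
)
```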
| 309 | + |
| 310 | +### C. Paths & Consoles |
| 311 | + |
| 312 | +* Pub/Sub topics: `projects/<proj>/topics/<topic>`, where `<topic>` is `fx.trades`, `fx.quotes`, `fx.confirms`, or `fx.dlq` |
| 313 | +* Dataflow job: `fx_stream_pipeline` |
| 314 | +* BQ datasets: `fx_raw`, `fx_stage`, `fx_mart`, `fx_ops` |
| 315 | +* Composer DAGs: `fx_ingestion_dag`, `fx_qc_dag`, `fx_replay_dag`, `fx_backfill_dag` |
| 316 | + |
| 317 | +--- |
| 318 | + |
| 319 | +*Last updated: 2025-11-05 | Document is sanitized — no client data.*