| 1 | +# Vehicle Position Subscriptions |
| 2 | + |
| 3 | +Design document for real-time vehicle position streaming via GraphQL subscriptions. |
| 4 | + |
| 5 | +Branch: `vp-2` |
| 6 | + |
| 7 | +## Goal |
| 8 | + |
| 9 | +Allow clients to subscribe to live GTFS-RT vehicle position updates over WebSocket, with filtering by geography and feed. Positions are pushed as full snapshots on each RT update cycle. |
| 10 | + |
| 11 | +## Architecture |
| 12 | + |
| 13 | +``` |
| 14 | +RT Feed Sources |
| 15 | + | |
| 16 | + v |
| 17 | +Cache (Local or Redis) |
| 18 | + | pub/sub notification on update |
| 19 | + v |
| 20 | +Subscription Resolver |
| 21 | + | collect + filter positions from all cached feeds |
| 22 | + v |
| 23 | +WebSocket (graphql-ws protocol) |
| 24 | + | |
| 25 | + v |
| 26 | +Client |
| 27 | +``` |
| 28 | + |
| 29 | +Key design choice: the subscription resolver reads only from the RT cache, not the database. This keeps the hot path fast and avoids DB load per push, but limits filtering to data available in the GTFS-RT messages themselves. |
| 30 | + |
| 31 | +## GraphQL Schema |
| 32 | + |
| 33 | +```graphql |
| 34 | +type Subscription { |
| 35 | + vehicle_positions(where: VehiclePositionFilter): [VehiclePosition!]! |
| 36 | +} |
| 37 | + |
| 38 | +input VehiclePositionFilter { |
| 39 | + bbox: BoundingBox |
| 40 | + feed_onestop_ids: [String!] |
| 41 | + limit: Int |
| 42 | +} |
| 43 | +``` |
| 44 | + |
| 45 | +Extended `VehiclePosition` type fields: `trip`, `feed_onestop_id`, `bearing`, `speed`, `stop_id` (as String, no DB lookup). |
| 46 | + |
| 47 | +## Work Items |
| 48 | + |
| 49 | +### Infrastructure |
| 50 | + |
| 51 | +- [x] WebSocket transport in gqlgen server (gorilla/websocket, keepalive pings) |
| 52 | +- [x] `wsAwareTimeout` middleware bypasses `http.TimeoutHandler` for WebSocket upgrades (single router, no middleware duplication) |
| 53 | +- [x] `Hijack()` support on `responseWriterWrapper` for meters middleware |
| 54 | +- [x] GraphiQL v3 upgrade with native subscription support |
| 55 | + |
| 56 | +### Cache Pub/Sub |
| 57 | + |
| 58 | +- [x] `Subscribe()` and `GetSourceKeys()` on Cache interface |
| 59 | +- [x] LocalCache in-memory subscriber notification |
| 60 | +- [x] RedisCache PSUBSCRIBE-based notification |
| 61 | +- [x] Clean up unused `subscribers` map and `notifySubscribers` on RedisCache (dead code removed) |
| 62 | +- [ ] Consider debounce/coalesce: multiple RT feed updates within a short window should produce one push, not N |
| 63 | + |
| 64 | +### Vehicle Position Parsing |
| 65 | + |
| 66 | +- [x] Parse `ent.Vehicle` in `Source.processMessage` (was a TODO) |
| 67 | +- [x] `GetVehiclePositions()` on Source, Finder, and RTFinder interface |
| 68 | +- [x] `GetCachedFeedIDs()` to discover feeds with VP data from cache keys |
| 69 | +- [ ] **Thread safety (near-term):** Add `sync.RWMutex` to `Source` struct. `processMessage` is called from background goroutines (LocalCache.AddData, Redis listener) while `GetVehiclePositions`, `GetTrip`, and `GetTimestamp` are called concurrently from request/subscription goroutines. The fields (`entityByTrip`, `alerts`, `vehiclePositions`) are replaced non-atomically — readers can see inconsistent state across fields. This is a pre-existing race for trip updates and alerts; adding vehicle positions extends the surface. Fix: write lock in `processMessage`, read lock in all getters. |
| 70 | + |
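The locking fix described in the thread-safety item can be sketched roughly as follows. This is a minimal illustration, not the project's actual `Source`: the field types, the `processMessage` signature, and the `VehiclePosition` struct are assumptions made so the example compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// VehiclePosition is a hypothetical stand-in for the parsed GTFS-RT entity.
type VehiclePosition struct {
	VehicleID string
	Lat, Lon  float64
}

// Source mirrors the field names mentioned above; the types are assumptions.
type Source struct {
	mu               sync.RWMutex
	entityByTrip     map[string]string
	alerts           []string
	vehiclePositions []VehiclePosition
}

// processMessage swaps all fields under a single write lock, so readers
// never observe trips from one message combined with positions from another.
// Callers pass fully built values; nothing is mutated in place afterwards.
func (s *Source) processMessage(trips map[string]string, alerts []string, vps []VehiclePosition) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.entityByTrip = trips
	s.alerts = alerts
	s.vehiclePositions = vps
}

// GetVehiclePositions takes a read lock and returns a copy of the slice,
// so callers can keep using the result after the lock is released.
func (s *Source) GetVehiclePositions() []VehiclePosition {
	s.mu.RLock()
	defer s.mu.RUnlock()
	out := make([]VehiclePosition, len(s.vehiclePositions))
	copy(out, s.vehiclePositions)
	return out
}

// GetTrip takes a read lock for a single map lookup.
func (s *Source) GetTrip(id string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.entityByTrip[id]
	return v, ok
}

func main() {
	src := &Source{}
	var wg sync.WaitGroup
	// Concurrent writers and readers; run with `go run -race` to check.
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			src.processMessage(
				map[string]string{"t1": "trip"},
				nil,
				[]VehiclePosition{{VehicleID: fmt.Sprint(n)}},
			)
			_ = src.GetVehiclePositions()
		}(i)
	}
	wg.Wait()
	fmt.Println(len(src.GetVehiclePositions()))
}
```

Returning a copy from the getter is one option; if `processMessage` always replaces the slice wholesale and never mutates it, returning the slice directly under the read lock would also be safe and cheaper.
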
| 71 | +### Subscription Resolver |
| 72 | + |
| 73 | +- [x] `subscriptionResolver.VehiclePositions` — subscribe, initial snapshot, ongoing pushes |
| 74 | +- [x] `convertVehiclePosition` — full protobuf-to-model mapping (position, bearing, speed, vehicle, trip, stop info, timestamp) |
| 75 | +- [x] `matchesFilter` — bbox filtering |
| 76 | +- [x] `feed_onestop_ids` filtering (at feed level before collecting) |
| 77 | + |
| 78 | +### Filters — Remaining |
| 79 | + |
| 80 | +- [x] `agency_ids`: Removed from schema — requires DB lookup which conflicts with cache-only design. May revisit. |
| 81 | +- [x] `route_ids`: Removed from schema — depends on optional RT field, keeping surface simple. May revisit. |
| 82 | +- [x] `stop_id`: Changed from `Stop` (object, DB lookup) to `String` (raw GTFS-RT value) to keep subscription path DB-free. |
| 83 | +- [x] `limit` on VehiclePositionFilter — defaults to 1000 (RESOLVER_MAXLIMIT), capped at 1000 |
| 84 | + |
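To make the remaining filter surface concrete, here is a rough sketch of how feed filtering, bbox matching, and the limit could combine. The struct shapes, the `collect` and `effectiveLimit` helpers, and the bbox field names are all assumptions for illustration. Treating a negative limit as unset is also an assumption; the notes above only specify the default and the cap.

```go
package main

import "fmt"

// maxLimit stands in for RESOLVER_MAXLIMIT (1000 per the notes above).
const maxLimit = 1000

// BoundingBox, VehiclePosition, and VehiclePositionFilter are simplified,
// hypothetical shapes, not the generated gqlgen models.
type BoundingBox struct {
	MinLon, MinLat, MaxLon, MaxLat float64
}

type VehiclePosition struct {
	FeedOnestopID string
	Lon, Lat      float64
}

type VehiclePositionFilter struct {
	Bbox           *BoundingBox
	FeedOnestopIDs []string
	Limit          *int
}

// effectiveLimit applies the default when no limit is given and caps
// anything larger than maxLimit. Negative values are treated as unset.
func effectiveLimit(limit *int) int {
	if limit == nil || *limit < 0 || *limit > maxLimit {
		return maxLimit
	}
	return *limit
}

// matchesBbox reports whether a position lies inside the box.
// A nil box matches everything. Antimeridian-crossing boxes are not handled.
func matchesBbox(vp VehiclePosition, b *BoundingBox) bool {
	if b == nil {
		return true
	}
	return vp.Lon >= b.MinLon && vp.Lon <= b.MaxLon &&
		vp.Lat >= b.MinLat && vp.Lat <= b.MaxLat
}

// collect skips whole feeds first (the cheap check), then filters
// individual positions by bbox, stopping once the limit is reached.
func collect(byFeed map[string][]VehiclePosition, f VehiclePositionFilter) []VehiclePosition {
	limit := effectiveLimit(f.Limit)
	allowed := make(map[string]bool, len(f.FeedOnestopIDs))
	for _, id := range f.FeedOnestopIDs {
		allowed[id] = true
	}
	var out []VehiclePosition
	for feedID, vps := range byFeed {
		if len(allowed) > 0 && !allowed[feedID] {
			continue
		}
		for _, vp := range vps {
			if !matchesBbox(vp, f.Bbox) {
				continue
			}
			if len(out) >= limit {
				return out
			}
			out = append(out, vp)
		}
	}
	return out
}

func main() {
	byFeed := map[string][]VehiclePosition{
		"f-a": {
			{FeedOnestopID: "f-a", Lon: -122.4, Lat: 37.8},
			{FeedOnestopID: "f-a", Lon: -73.9, Lat: 40.7},
		},
		"f-b": {{FeedOnestopID: "f-b", Lon: -122.3, Lat: 37.7}},
	}
	f := VehiclePositionFilter{
		Bbox:           &BoundingBox{MinLon: -123, MinLat: 37, MaxLon: -122, MaxLat: 38},
		FeedOnestopIDs: []string{"f-a"},
	}
	fmt.Println(len(collect(byFeed, f)))
}
```

Because Go map iteration order is unspecified, which positions survive a limit smaller than the match count is not stable between pushes in this sketch; a real implementation would probably want a deterministic ordering.
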
| 85 | +### Performance |
| 86 | + |
| 87 | +- [ ] Scope notifications: currently any cache update (trip updates, alerts, etc.) triggers a full vehicle position collection for all subscribers. Filter to only `realtime_vehicle_positions` topics before collecting. |
| 88 | +- [ ] Debounce: coalesce rapid updates within a window (e.g., 500ms-1s) before pushing |
| 89 | +- [ ] RedisCache `GetSourceKeys` does a SCAN on every notification per subscriber. Cache the key list in memory with a short TTL, or maintain it incrementally. |
| 90 | + |
| 91 | +### Observability / Security |
| 92 | + |
| 93 | +- [x] WebSocket requests now go through the same middleware stack as HTTP (single router) |
| 94 | +- [x] WebSocket upgrader `CheckOrigin` allows all origins — acceptable because all connections require explicit API key/auth header (no cookie/session auth) |
| 95 | +- [ ] Add subscription connection count metrics / logging for monitoring |
| 96 | + |
| 97 | +### Testing |
| 98 | + |
| 99 | +- [x] Integration tests via gqlgen WebSocket test client (`subscription_resolver_test.go`): |
| 100 | + - Full field mapping (feed_onestop_id, bearing, speed, position, vehicle, trip) |
| 101 | + - Feed filter (two feeds, filter to one) |
| 102 | + - Bbox filter (matching and non-matching) |
| 103 | + - Live update (empty initial snapshot, push data, verify update arrives) |
| 104 | +- [ ] Unit tests for `convertVehiclePosition` (pure function, easy to test) |
| 105 | +- [ ] Unit tests for `GetCachedFeedIDs` key parsing |
| 106 | +- [ ] Verify existing RT resolver tests still pass with Cache interface changes |
| 107 | + |
| 108 | +### Dev Tooling |
| 109 | + |
| 110 | +- [x] `cmd/rt-test-server` — synthetic VP generator + optional real RT feed fetcher |
| 111 | +- [x] `testdata/dmfr/rt-test.dmfr.json` fixture |
| 112 | + |
| 113 | +## Design Decisions |
| 114 | + |
| 115 | +- **Cache-only, no DB in hot path.** The subscription resolver reads exclusively from the RT cache. This avoids DB load per push and keeps latency low, but it constrains which filters and fields are possible: `agency_ids` (needs static GTFS data) and `route_ids` (depends on an optional RT field) were removed from the filter, and `stop_id`, which resolves to a `Stop` object via the DB elsewhere in the API, was simplified to a raw String. |
| 116 | +- **`stop_id` is a raw String, not a resolved Stop object.** The rest of the GraphQL API resolves stop_id to a Stop entity via DB lookup. The subscription intentionally returns the raw GTFS-RT string to avoid DB dependencies. A resolved `stop` field could be added later as an opt-in. |
| 117 | +- **`agency_ids` and `route_ids` removed from filter.** `agency_ids` requires joining against static GTFS data, and `route_ids` depends on an optional GTFS-RT field. The filter surface is kept to what the cache can reliably deliver (bbox, feed_onestop_ids, limit). |
| 118 | +- **Single router with `wsAwareTimeout`.** Standard Go pattern — `http.TimeoutHandler` is incompatible with WebSocket (no Hijack support, kills long-lived connections). A single middleware skips the timeout for Upgrade requests. All other middleware (auth, CORS, meters, logging, permissions) is shared. |
| 119 | +- **`CheckOrigin` allows all origins.** Safe because auth is API-key-based (explicit header), not cookie/session-based. No ambient browser credentials to exploit via CSRF. |
| 120 | +- **Full snapshot per push, not deltas.** Simpler for consumers (replace the whole list, no client-side state management). Trade-off is bandwidth for large fleets. |
| 121 | +- **Default limit of 1000 per push.** Uses existing `RESOLVER_MAXLIMIT`. Prevents unbounded responses from unfiltered subscriptions. |
| 122 | +- **gorilla/websocket.** Archived/unmaintained, but it's gqlgen's transitive dependency. No alternative without switching gqlgen's transport. |
| 123 | + |
| 124 | +## Open Questions |
| 125 | + |
| 126 | +1. **Snapshot vs. delta**: Currently sends full snapshot of all matching positions on every update. Should this eventually support deltas (added/changed/removed)? |
| 127 | +2. **Rate limiting**: Should there be a maximum subscription count, either per client or global? |
| 128 | +3. **gorilla/websocket**: Archived/unmaintained, but it's a transitive dependency via gqlgen's `transport.Websocket`. No action needed unless gqlgen migrates (likely to `coder/websocket`). |