Skip to content

Commit 200080d

Browse files
authored
feat: wire WebSocket event stream end-to-end (#1288)
* feat: wire WebSocket event stream end-to-end Backend: Wire event stream components (OutboxEventSource + LocalFanOut) in the unified binary, reusing the existing FA GORM connection for outbox polling. The event router starts in the background and the WebSocket handler is registered on GET /ws/events via the gateway server option. Frontend: Replace the stub useEffect in useEventStream with an actual WebSocket connection that connects to /ws/events, subscribes to all channels, and delivers events through the existing handleEvent callback for React Query cache invalidation. Includes exponential backoff reconnection (max 5 attempts) and clean teardown on unmount. Vite: Add /ws/events WebSocket proxy for the dev server so the frontend can reach the backend during local development. * fix: guard against negative EVENT_STREAM_BUFFER_SIZE Clamp negative values to the default (256) to avoid a panic in NewLocalFanOut when the env var is misconfigured. * fix: default EVENT_STREAM_ENABLED to false and add reconnect jitter Change EVENT_STREAM_ENABLED default from true to false so operators must explicitly opt-in. Add random jitter to WebSocket reconnect backoff to prevent thundering herd when backend restarts. --------- Co-authored-by: Ben Coombs <bjcoombs@users.noreply.github.com>
1 parent 8e4c9bc commit 200080d

3 files changed

Lines changed: 184 additions & 35 deletions

File tree

cmd/meridian/main.go

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ import (
8787

8888
// Gateway
8989
"github.com/meridianhub/meridian/services/gateway"
90+
"github.com/meridianhub/meridian/services/gateway/eventstream"
91+
"github.com/meridianhub/meridian/services/gateway/eventstream/adapters"
9092

9193
// Shared platform
9294
masterbootstrap "github.com/meridianhub/meridian/internal/bootstrap"
@@ -283,12 +285,20 @@ func run(logger *slog.Logger, grpcPort, httpPort int) error {
283285
// ─── Start Gateway HTTP Server ───────────────────────────────────────
284286

285287
platformDSN := replaceDSNDatabase(baseDSN, "meridian_platform")
286-
gwServer, err := wireGateway(grpcPort, httpPort, platformDSN, conns.gormDB("tenant"), logger)
288+
289+
eventRouter, extraGWOpts := wireEventStream(conns.gormDB("financial-accounting"), logger)
290+
291+
gwServer, err := wireGateway(grpcPort, httpPort, platformDSN, conns.gormDB("tenant"), logger, extraGWOpts...)
287292
if err != nil {
288293
return fmt.Errorf("gateway init: %w", err)
289294
}
290295

291296
gatewayErrors := make(chan error, 1)
297+
298+
// Start event router in background (consumes from EventSource and publishes to FanOut).
299+
routerCancel := startEventRouter(ctx, eventRouter, logger)
300+
defer routerCancel()
301+
292302
go func() {
293303
if err := gwServer.Start(context.Background()); err != nil {
294304
gatewayErrors <- err
@@ -870,7 +880,7 @@ var serviceNames = []string{
870880
// wireGateway creates the gateway HTTP server with the Vanguard transcoder
871881
// routing REST/JSON, Connect, and gRPC-Web requests to the shared gRPC server
872882
// running on grpcPort.
873-
func wireGateway(grpcPort, httpPort int, databaseURL string, tenantDB *gorm.DB, logger *slog.Logger) (*gateway.Server, error) {
883+
func wireGateway(grpcPort, httpPort int, databaseURL string, tenantDB *gorm.DB, logger *slog.Logger, extraOpts ...gateway.ServerOption) (*gateway.Server, error) {
874884
grpcTarget := fmt.Sprintf("localhost:%d", grpcPort)
875885

876886
authConfig := gateway.LoadAuthConfig()
@@ -931,9 +941,66 @@ func wireGateway(grpcPort, httpPort int, databaseURL string, tenantDB *gorm.DB,
931941
return nil, fmt.Errorf("failed to create tenant resolver: %w", err)
932942
}
933943

944+
// Append caller-provided options (e.g., event stream handler).
945+
opts = append(opts, extraOpts...)
946+
934947
return gateway.NewServer(config, logger, tenantResolver, opts...), nil
935948
}
936949

950+
// ─── Event Stream Wiring ─────────────────────────────────────────────────────
951+
952+
// startEventRouter launches the event router in a background goroutine and
953+
// returns a cancel function. If router is nil the returned cancel is a no-op.
954+
func startEventRouter(ctx context.Context, router *eventstream.Router, logger *slog.Logger) context.CancelFunc {
955+
if router == nil {
956+
return func() {}
957+
}
958+
routerCtx, cancel := context.WithCancel(ctx)
959+
go func() {
960+
if err := router.Start(routerCtx); err != nil {
961+
logger.Error("event router error", "error", err)
962+
}
963+
}()
964+
logger.Info("event router started")
965+
return cancel
966+
}
967+
968+
// wireEventStream conditionally builds event stream components and returns the
969+
// Router (for lifecycle management) and any gateway ServerOptions that should be
970+
// applied. When EVENT_STREAM_ENABLED is false or unset, both return values are
971+
// nil/empty. Set EVENT_STREAM_ENABLED=true to enable.
972+
func wireEventStream(faDB *gorm.DB, logger *slog.Logger) (*eventstream.Router, []gateway.ServerOption) {
973+
if !env.GetEnvAsBool("EVENT_STREAM_ENABLED", false) {
974+
return nil, nil
975+
}
976+
router, wsHandler := buildUnifiedEventStreamComponents(faDB, logger)
977+
logger.Info("event stream components initialized (outbox source, local fan-out)")
978+
return router, []gateway.ServerOption{gateway.WithEventStreamHandler(wsHandler)}
979+
}
980+
981+
// buildUnifiedEventStreamComponents creates event stream components for the unified binary.
982+
// Uses OutboxEventSource (polls shared outbox table) and LocalFanOut (in-process channels).
983+
// This matches dev/CI mode from the standalone gateway.
984+
//
985+
// The db parameter should be the financial-accounting GORM connection where
986+
// outbox events are written via OutboxPublisher. The connection is shared with
987+
// the service — no separate cleanup is needed.
988+
func buildUnifiedEventStreamComponents(db *gorm.DB, logger *slog.Logger) (*eventstream.Router, *eventstream.Handler) {
989+
pollInterval := env.GetEnvAsDuration("OUTBOX_POLL_INTERVAL", 500*time.Millisecond)
990+
source := adapters.NewOutboxEventSource(db, pollInterval, logger)
991+
992+
bufferSize := env.GetEnvAsInt("EVENT_STREAM_BUFFER_SIZE", 256)
993+
if bufferSize < 0 {
994+
bufferSize = 256
995+
}
996+
fanOut := adapters.NewLocalFanOut(bufferSize)
997+
998+
router := eventstream.NewRouter(source, fanOut)
999+
handler := eventstream.NewHandler(router, logger)
1000+
1001+
return router, handler
1002+
}
1003+
9371004
// ─── Per-Service Database Connections ────────────────────────────────────────
9381005

9391006
// replaceDSNDatabase replaces the database name in a PostgreSQL/CockroachDB DSN URL.

frontend/src/hooks/use-event-stream.ts

Lines changed: 113 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import { useEffect, useState, useCallback } from 'react'
1+
import { useEffect, useState, useCallback, useRef } from 'react'
22
import { useQueryClient } from '@tanstack/react-query'
33
import { useTenantSlug } from './use-tenant-context'
44

55
/**
66
* Represents a domain event from the backend event stream.
7-
* This stub supports the Phase 4 WebSocket integration.
87
*
98
* @example
109
* {
@@ -56,10 +55,35 @@ export const EVENT_QUERY_MAP: Record<string, (event: DomainEvent) => unknown[][]
5655
],
5756
}
5857

58+
/** Wire format of a message received from the gateway WebSocket endpoint. */
59+
interface ServerMessage {
60+
type: 'event' | 'subscribed' | 'error' | 'system'
61+
subscription_id?: string
62+
channel?: string
63+
event?: {
64+
event_id: string
65+
event_type: string
66+
aggregate_id?: string
67+
aggregate_type?: string
68+
tenant_id: string
69+
correlation_id?: string
70+
causation_id?: string
71+
timestamp: string
72+
payload: Record<string, unknown>
73+
}
74+
error_code?: string
75+
error_message?: string
76+
system_message?: string
77+
}
78+
79+
const MAX_RECONNECT_ATTEMPTS = 5
80+
const BASE_RECONNECT_DELAY_MS = 1000
81+
5982
/**
60-
* React hook for consuming real-time domain events.
61-
* This is a stub implementation that provides the complete interface for Phase 3 component development.
62-
* The actual WebSocket connection will be implemented in Phase 4 (PRD-025).
83+
* React hook for consuming real-time domain events via WebSocket.
84+
*
85+
* Connects to the gateway's /ws/events endpoint and delivers events to
86+
* the caller through the onEvent callback and React Query cache invalidation.
6387
*
6488
* Enforces tenant isolation - events from other tenants are silently ignored.
6589
*
@@ -72,44 +96,20 @@ export const EVENT_QUERY_MAP: Record<string, (event: DomainEvent) => unknown[][]
7296
* onEvent: (event) => console.log('Event received:', event),
7397
* autoInvalidate: true
7498
* })
75-
*
76-
* @phase 3 (Stub for Phase 4 WebSocket integration)
7799
*/
78100
export function useEventStream(options: UseEventStreamOptions = {}) {
79101
const { autoInvalidate = true, onEvent, eventTypes } = options
80102
const queryClient = useQueryClient()
81103
const tenantSlug = useTenantSlug()
82104

83-
// Phase 4: setConnected will be called from WebSocket onopen/onclose handlers
84-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
85105
const [connected, setConnected] = useState(false)
86106
const [lastEvent, setLastEvent] = useState<DomainEvent | null>(null)
87107

88-
// Phase 4: WebSocket connection setup
89-
// This useEffect will be implemented in Phase 4 (PRD-025) to connect to:
90-
// const ws = new WebSocket(`wss://${tenantSlug}.api.meridian.io/events`)
91-
// The connection will:
92-
// - Set connected = true on open
93-
// - Call handleEvent on message with decoded event data
94-
// - Set error and connected = false on close/error
95-
// - Clean up WebSocket on unmount
96-
useEffect(() => {
97-
// STUB: Phase 4 implementation
98-
// For now, this hook provides the interface without connection logic.
99-
// Phase 4 will establish a WebSocket connection here, call _handleEvent
100-
// on messages, and set connected = true on open.
101-
102-
return () => {
103-
// Phase 4: Cleanup WebSocket connection
104-
}
105-
}, [tenantSlug])
106-
107108
/**
108109
* Internal handler for processing received events.
109110
* Validates tenant isolation, applies event type filtering, fires callbacks, and triggers cache invalidation.
110-
* Prefixed with _ as it's unused until Phase 4 WebSocket integration.
111111
*/
112-
const _handleEvent = useCallback(
112+
const handleEvent = useCallback(
113113
(event: DomainEvent) => {
114114
// Enforce tenant isolation - ignore events from other tenants
115115
if (event.tenantSlug !== tenantSlug) {
@@ -138,11 +138,91 @@ export function useEventStream(options: UseEventStreamOptions = {}) {
138138
[eventTypes, onEvent, autoInvalidate, queryClient, tenantSlug]
139139
)
140140

141-
// Ensure _handleEvent is referenced for Phase 4
142-
void _handleEvent
141+
// Stable ref for handleEvent so the WebSocket effect does not re-run when
142+
// the callback identity changes.
143+
const handleEventRef = useRef(handleEvent)
144+
handleEventRef.current = handleEvent
145+
146+
useEffect(() => {
147+
if (!tenantSlug) return
148+
149+
let ws: WebSocket | null = null
150+
let reconnectTimeout: ReturnType<typeof setTimeout> | null = null
151+
let reconnectAttempts = 0
152+
let intentionalClose = false
153+
154+
const connect = () => {
155+
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'
156+
const wsUrl = `${wsProtocol}//${window.location.host}/ws/events`
157+
158+
ws = new WebSocket(wsUrl)
159+
160+
ws.onopen = () => {
161+
setConnected(true)
162+
reconnectAttempts = 0
163+
164+
// Subscribe to all channels; server-side auth controls access.
165+
// Client-side eventTypes filtering is applied in handleEvent.
166+
const subscribeMsg = {
167+
type: 'subscribe',
168+
id: crypto.randomUUID(),
169+
channels: ['*'],
170+
}
171+
ws?.send(JSON.stringify(subscribeMsg))
172+
}
173+
174+
ws.onmessage = (event) => {
175+
try {
176+
const msg: ServerMessage = JSON.parse(event.data)
177+
if (msg.type === 'event' && msg.event) {
178+
const domainEvent: DomainEvent = {
179+
eventType: msg.event.event_type,
180+
tenantSlug: msg.event.tenant_id,
181+
payload: msg.event.payload ?? {},
182+
timestamp: new Date(msg.event.timestamp),
183+
}
184+
handleEventRef.current(domainEvent)
185+
}
186+
} catch {
187+
// Malformed message - silently ignore
188+
}
189+
}
190+
191+
ws.onclose = () => {
192+
setConnected(false)
193+
194+
// Reconnect with exponential backoff + jitter unless intentionally closed
195+
if (!intentionalClose && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {
196+
const baseDelay = BASE_RECONNECT_DELAY_MS * Math.pow(2, reconnectAttempts)
197+
const delay = baseDelay + Math.random() * baseDelay
198+
reconnectTimeout = setTimeout(() => {
199+
reconnectAttempts++
200+
connect()
201+
}, delay)
202+
}
203+
}
204+
205+
ws.onerror = () => {
206+
// onclose will fire after onerror; reconnect logic lives there.
207+
ws?.close()
208+
}
209+
}
210+
211+
connect()
212+
213+
return () => {
214+
intentionalClose = true
215+
if (reconnectTimeout) clearTimeout(reconnectTimeout)
216+
if (ws) {
217+
ws.onclose = null // Prevent reconnect on intentional close
218+
ws.close()
219+
}
220+
setConnected(false)
221+
}
222+
}, [tenantSlug])
143223

144224
return {
145-
/** Whether the WebSocket is currently connected (Phase 4) */
225+
/** Whether the WebSocket is currently connected */
146226
connected,
147227
/** The last event received, or null if no events yet */
148228
lastEvent,

frontend/vite.config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export default defineConfig({
1616
},
1717
server: {
1818
proxy: {
19+
'/ws/events': { target: 'http://localhost:8090', ws: true },
1920
'/meridian.': { target: 'http://localhost:8090', changeOrigin: true },
2021
'/v1': { target: 'http://localhost:8090', changeOrigin: true },
2122
},
@@ -24,6 +25,7 @@ export default defineConfig({
2425
port: 5173,
2526
strictPort: true,
2627
proxy: {
28+
'/ws/events': { target: 'http://localhost:8090', ws: true },
2729
'/meridian.': { target: 'http://localhost:8090', changeOrigin: true },
2830
'/v1': { target: 'http://localhost:8090', changeOrigin: true },
2931
},

0 commit comments

Comments
 (0)