Real-time SPA: Gleam+Lustre frontend + Go WebSocket server + NATS (Core, JetStream, KV).
Architecture:
- WebSocket: Real-time data sync only (sub, kv_sub, js_sub)
- HTTP REST API: Request-response operations (CRUD, commands)
- Data Flow: HTTP API → Backend → WebSocket → Frontend state update
- WebSocket: Real-time subscriptions with pattern-based access control (`*`, `>` wildcards)
- HTTP REST API: CRUD operations, commands, JWT auth (no frontend state updates)
{
  "op": "sub" | "unsub" | "kv_sub" | "js_sub" | "cap_update" | "error",
  "target": "subject-or-bucket-or-stream",
  "inbox": "optional-correlation-id",
  "data": { }
}

Operations: sub, unsub, kv_sub, js_sub
Messages: sub_msg, kv_msg, js_msg, cap_update, error
Note: No cmd/reply - commands use HTTP REST API
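
The envelope and the operation list above can be sketched in Go as follows. This is a minimal sketch, not the project's actual code: the `Envelope` type, the `clientOps` set, and the `parseClientEnvelope` helper are hypothetical names, and requiring a non-empty `target` is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope mirrors the JSON wire format; field names follow the JSON keys.
type Envelope struct {
	Op     string          `json:"op"`
	Target string          `json:"target"`
	Inbox  string          `json:"inbox,omitempty"`
	Data   json.RawMessage `json:"data,omitempty"`
}

// clientOps is the set of operations a client may send over the WebSocket.
var clientOps = map[string]bool{"sub": true, "unsub": true, "kv_sub": true, "js_sub": true}

// parseClientEnvelope decodes one inbound frame and rejects anything that is
// not a known client operation (including "cmd", which belongs to HTTP).
func parseClientEnvelope(raw []byte) (Envelope, error) {
	var env Envelope
	if err := json.Unmarshal(raw, &env); err != nil {
		return Envelope{}, fmt.Errorf("invalid envelope: %w", err)
	}
	if !clientOps[env.Op] {
		return Envelope{}, fmt.Errorf("unsupported op %q", env.Op)
	}
	if env.Target == "" { // assumption: every client op needs a target
		return Envelope{}, fmt.Errorf("op %q requires a target", env.Op)
	}
	return env, nil
}

func main() {
	env, err := parseClientEnvelope([]byte(`{"op":"kv_sub","target":"todos","data":{"pattern":"user.123.*"}}`))
	fmt.Println(env.Op, env.Target, err)

	_, err = parseClientEnvelope([]byte(`{"op":"cmd","target":"articles.create"}`))
	fmt.Println(err)
}
```

Keeping `data` as `json.RawMessage` defers decoding until the operation is known, so each handler can define its own payload shape.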
- Articles: `GET/POST/PUT/DELETE /api/articles`
- Auth: `POST /api/auth/login`, `POST /api/auth/logout`
- Users: `GET/POST/PUT/DELETE /api/users`
- Commands: `POST /api/commands/{command}`
Note: WebSocket for data sync only. HTTP for operations. Frontend state updated ONLY from WebSocket.
Message Examples:
- Subject: `{ "op": "sub_msg", "target": "chat.room.123", "data": { "payload": {} } }`
- KV: `{ "op": "kv_msg", "target": "todos", "data": { "key": "user.123.task.1", "op": "put", "value": {} } }`
- JetStream: `{ "op": "js_msg", "target": "chat.stream", "data": { "seq": 12345, "payload": {} } }`
// Capabilities describe allowed operations per user/session
// Wildcards: * and > (subject-style patterns)
type Capabilities struct {
    Subjects []string            `json:"subjects"`
    Buckets  map[string][]string `json:"buckets"`  // bucket pattern -> allowed key patterns
    Commands []string            `json:"commands"`
    Streams  map[string][]string `json:"streams"`  // stream pattern -> allowed filter subject patterns
}

- Pattern matching: use `nats.Match` semantics for `*` and `>`.
- Checks:
  - `isAllowedSubject(subject string)`
  - `isAllowedKV(bucket, keyPattern string)`
  - `isAllowedStream(stream, filter string)`
  - `isAllowedCommand(target string)`
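
The checks above could be implemented roughly as below. This is a sketch under stated assumptions: `matchPattern`, `anyMatch`, and `allowedIn` are hypothetical helpers written for this example rather than calls into the NATS client library. The sketch also assumes that a requested key pattern or filter must be fully contained in an allowed pattern, and that an empty filter means `>`.

```go
package main

import (
	"fmt"
	"strings"
)

type Capabilities struct {
	Subjects []string            `json:"subjects"`
	Buckets  map[string][]string `json:"buckets"`
	Commands []string            `json:"commands"`
	Streams  map[string][]string `json:"streams"`
}

// matchPattern applies subject-style wildcards: "*" matches exactly one token,
// ">" matches one or more trailing tokens. A wildcard in the requested value is
// only accepted when the allowed pattern is at least as broad.
func matchPattern(pattern, value string) bool {
	if pattern == "" || value == "" {
		return false
	}
	p, v := strings.Split(pattern, "."), strings.Split(value, ".")
	for i, tok := range p {
		if tok == ">" {
			return i == len(p)-1 && i < len(v)
		}
		if i >= len(v) || v[i] == ">" {
			return false
		}
		if tok != "*" && tok != v[i] {
			return false
		}
	}
	return len(p) == len(v)
}

func anyMatch(patterns []string, value string) bool {
	for _, p := range patterns {
		if matchPattern(p, value) {
			return true
		}
	}
	return false
}

// allowedIn checks a name (bucket or stream) plus a filter against a rule map.
func allowedIn(rules map[string][]string, name, filter string) bool {
	if filter == "" {
		filter = ">" // assumption: no filter means "everything"
	}
	for namePattern, filters := range rules {
		if matchPattern(namePattern, name) && anyMatch(filters, filter) {
			return true
		}
	}
	return false
}

func (c Capabilities) isAllowedSubject(subject string) bool { return anyMatch(c.Subjects, subject) }
func (c Capabilities) isAllowedKV(bucket, keyPattern string) bool {
	return allowedIn(c.Buckets, bucket, keyPattern)
}
func (c Capabilities) isAllowedStream(stream, filter string) bool {
	return allowedIn(c.Streams, stream, filter)
}
func (c Capabilities) isAllowedCommand(target string) bool { return anyMatch(c.Commands, target) }

func main() {
	caps := Capabilities{
		Subjects: []string{"chat.room.*", "articles.>"},
		Buckets:  map[string][]string{"todos": {"user.123.*"}},
		Streams:  map[string][]string{"chat.stream": {"chat.room.123"}},
	}
	fmt.Println(caps.isAllowedSubject("chat.room.123")) // allowed by chat.room.*
	fmt.Println(caps.isAllowedSubject("chat.admin"))    // no matching pattern
	fmt.Println(caps.isAllowedKV("todos", "user.123.*"))
	fmt.Println(caps.isAllowedKV("todos", ">")) // broader than the grant
}
```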
Implementation notes:
- Normalize and cache capability patterns at session start for faster checks.
- Keep a per-connection registry of active subs: subject, kv watches, js consumers; include the capability pattern that authorized it for quick revocation.
- Watch `auth.users` KV for the user's key.
- On update:
  - Parse new capabilities.
  - Diff against current; unsubscribe and close consumers not allowed anymore.
  - Send `cap_update` with the new effective capabilities.
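
The revocation step can be sketched against a per-connection registry. Everything here is illustrative: the `activeSub` shape, the `revoke` helper, and the `allowed` callback (a stand-in for the real capability checks) are assumptions, not existing code.

```go
package main

import (
	"fmt"
	"sort"
)

// activeSub is one registry entry (hypothetical shape).
type activeSub struct {
	Kind      string // "sub", "kv_sub" or "js_sub"
	Target    string // subject, bucket or stream
	Filter    string // key pattern or filter subject; empty for plain subjects
	GrantedBy string // capability pattern that authorized the subscription
	Cancel    func() // stops the underlying subscription, watcher or consumer
}

// revoke cancels and removes every entry the new capabilities no longer
// permit, and returns the sorted IDs that were dropped.
func revoke(registry map[string]activeSub, allowed func(activeSub) bool) []string {
	var dropped []string
	for id, sub := range registry {
		if allowed(sub) {
			continue
		}
		if sub.Cancel != nil {
			sub.Cancel()
		}
		delete(registry, id) // deleting during range is safe in Go
		dropped = append(dropped, id)
	}
	sort.Strings(dropped)
	return dropped
}

func main() {
	registry := map[string]activeSub{
		"s1": {Kind: "sub", Target: "chat.room.123", GrantedBy: "chat.room.*",
			Cancel: func() { fmt.Println("closed s1") }},
		"k1": {Kind: "kv_sub", Target: "todos", Filter: "user.123.*", GrantedBy: "user.123.*",
			Cancel: func() { fmt.Println("closed k1") }},
	}
	// Stand-in for the real check: the update removed all KV access.
	allowed := func(s activeSub) bool { return s.Kind != "kv_sub" }

	dropped := revoke(registry, allowed)
	fmt.Println(dropped, len(registry))
}
```

After `revoke` returns, the server would send `cap_update` with the new effective capabilities.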
- Enforce `isAllowedSubject(target)` on `sub`.
- Create a subscription; each incoming message is forwarded as `sub_msg` with `target` set to the subject.
- Keep a `subID` map if needed to support `unsub` by identifier.
- Enforce `isAllowedKV(bucket, pattern)` on `kv_sub`.
- If `pattern` is provided, use `kv.Watch(pattern)`; otherwise use `kv.WatchAll()`.
- Forward watch events as `kv_msg` with `target` set to the bucket, and include `{ key, revision, op, value }`.
- Enforce `isAllowedStream(stream, filter)` on `js_sub`.
- Build a durable or ephemeral consumer:
  - Optional `filter` via `nats.FilterSubject(filter)`.
  - Start position via `start_seq` if provided; default to latest.
  - `batch` controls server-side pull size (flow control).
- Forward each message as `js_msg` with `target` set to the stream, and include `{ seq, subject, payload }`.
- Consider at-least-once delivery with explicit acks where necessary.
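
The `js_sub` options could be decoded and defaulted as in the sketch below. The field names `start_seq`, `filter`, and `batch` come from the protocol; the `jsSubRequest` type, the `parseJSSub` helper, and the `defaultBatch` and `maxBatch` values are assumptions made for illustration. Building the consumer itself is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	defaultBatch = 100  // assumed default pull size
	maxBatch     = 1000 // assumed upper bound to protect the server
)

// jsSubRequest is the "data" payload of a js_sub operation.
type jsSubRequest struct {
	StartSeq uint64 `json:"start_seq"`
	Filter   string `json:"filter"`
	Batch    int    `json:"batch"`
}

// parseJSSub decodes the payload and clamps batch into [1, maxBatch].
func parseJSSub(data []byte) (jsSubRequest, error) {
	req := jsSubRequest{}
	if len(data) > 0 {
		if err := json.Unmarshal(data, &req); err != nil {
			return jsSubRequest{}, fmt.Errorf("invalid js_sub data: %w", err)
		}
	}
	if req.Batch <= 0 {
		req.Batch = defaultBatch
	} else if req.Batch > maxBatch {
		req.Batch = maxBatch
	}
	return req, nil
}

// startsAtLatest reports whether the consumer should deliver new messages
// only. Stream sequences start at 1, so 0 means "not provided".
func (r jsSubRequest) startsAtLatest() bool { return r.StartSeq == 0 }

func main() {
	req, err := parseJSSub([]byte(`{"start_seq":12345,"filter":"chat.room.123","batch":100}`))
	fmt.Println(req.StartSeq, req.Filter, req.Batch, req.startsAtLatest(), err)

	req, err = parseJSSub(nil)
	fmt.Println(req.Batch, req.startsAtLatest(), err)
}
```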
Architecture Decision: Commands will use HTTP REST API, not WebSocket
- WebSocket handles real-time subscriptions only
- HTTP API handles all CRUD operations and commands
- Frontend state updated through WebSocket subscriptions
- Outbound send queue `sendCh` with bounded buffer.
- Writer goroutine selects on `sendCh` with a 250ms timeout.
  - If blocked: emit an `error` message with a backpressure code and close the connection.
- Optionally expose `server_overloaded` metric and increment a counter.
- All NATS subs, KV watchers, and goroutines use a per-client `context.Context`.
- On disconnect, cancel the context and wait for goroutines to exit.
- Ensure consumers and watchers are drained and closed.
pub type KV(key, value) {
  KV(
    id: String,
    state: KVState,
    bucket: String,
    filter: Option(String),
    revision: Int,
    data: Dict(key, value),
    encoder_key: fn(key) -> Json,
    encoder_value: fn(value) -> Json,
    decoder_key: Decoder(key),
    decoder_value: Decoder(value)
  )
}

Current Status: KV-focused WebSocket subscription and data management

Features: Automatic state management, revision tracking, error handling

Missing: Subject subscriptions, JetStream subscriptions, capabilities

- ✅ `new_kv()` - Create new KV subscription
- ✅ `ws_text_message()` - Handle WebSocket messages
- ✅ `ws_open()` - Handle WebSocket connection
- ❌ Subject subscriptions - Not implemented
- ❌ JetStream subscriptions - Not implemented
- Frontend Messages: `ArticleCreate`, `ArticleUpdate`, `ArticleDelete`, etc.
- Implementation: These will use HTTP REST API, not WebSocket
- Real-time Updates: WebSocket subscriptions for data sync only
- Subject subscriptions (NATS Core)
- JetStream subscriptions with resume support
- Capability handling
- Command operations via HTTP REST API
{
  "subjects": ["chat.room.*", "articles.>"],
  "buckets": {
    "todos": ["user.123.*"],
    "articles": [">"]
  },
  "commands": [],
  "streams": {
    "chat.stream": ["chat.room.123", "chat.room.456"],
    "audit.stream": [">"]
  }
}

Note: the `commands` field exists but is unused. All operations use HTTP REST API.
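
Loading a capabilities document like the one above, and validating its patterns once at session start, might look like this. The `parseCapabilities` and `validPattern` helpers are hypothetical, and the validation rules (no empty tokens, `>` only as the last token) are assumptions based on subject-style wildcard conventions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type Capabilities struct {
	Subjects []string            `json:"subjects"`
	Buckets  map[string][]string `json:"buckets"`
	Commands []string            `json:"commands"`
	Streams  map[string][]string `json:"streams"`
}

// validPattern rejects empty tokens and a ">" that is not the final token.
func validPattern(p string) bool {
	toks := strings.Split(p, ".")
	for i, t := range toks {
		if t == "" || (t == ">" && i != len(toks)-1) {
			return false
		}
	}
	return true
}

// parseCapabilities decodes the JSON and validates every pattern in it.
func parseCapabilities(raw []byte) (Capabilities, error) {
	var c Capabilities
	if err := json.Unmarshal(raw, &c); err != nil {
		return Capabilities{}, fmt.Errorf("invalid capabilities JSON: %w", err)
	}
	patterns := append([]string{}, c.Subjects...)
	patterns = append(patterns, c.Commands...)
	for _, rules := range []map[string][]string{c.Buckets, c.Streams} {
		for name, filters := range rules {
			patterns = append(patterns, name)
			patterns = append(patterns, filters...)
		}
	}
	for _, p := range patterns {
		if !validPattern(p) {
			return Capabilities{}, fmt.Errorf("invalid pattern %q", p)
		}
	}
	return c, nil
}

func main() {
	raw := []byte(`{
		"subjects": ["chat.room.*", "articles.>"],
		"buckets": {"todos": ["user.123.*"], "articles": [">"]},
		"commands": [],
		"streams": {"chat.stream": ["chat.room.123", "chat.room.456"], "audit.stream": [">"]}
	}`)
	caps, err := parseCapabilities(raw)
	fmt.Println(len(caps.Subjects), len(caps.Buckets), len(caps.Streams), err)
}
```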
sequenceDiagram
autonumber
participant C as Client (Gleam SPA)
participant W as WS Server (Go)
participant N as NATS Core
participant J as NATS JetStream
participant K as NATS KV
Note over C,W: Connect and authenticate
C->>W: WS open + auth token
W->>K: KV get auth.users/<uid>
K-->>W: Capabilities JSON
W-->>C: cap_update{ capabilities }
Note over C,W: Subject subscription
C->>W: { op: "sub", target: "chat.room.123" }
W->>N: SUB chat.room.123
N-->>W: msg(payload)
W-->>C: { op: "sub_msg", target: "chat.room.123", data: { payload } }
Note over C,W: KV subscription
C->>W: { op: "kv_sub", target: "todos", data: { pattern: "user.123.*" } }
W->>K: Watch(todos, pattern)
K-->>W: KV event (put/delete)
W-->>C: { op: "kv_msg", target: "todos", data: { op: "put", key: "user.123.task1", value: "..." } }
Note over C,W: JetStream subscription with resume
C->>W: { op: "js_sub", target: "chat.stream", data: { start_seq: 12345, filter: "chat.room.123", batch: 100 } }
W->>J: Create/Bind consumer (filter, start_seq)
J-->>W: Messages (seq >= 12345)
W-->>C: { op: "js_msg", target: "chat.stream", data: { payload } }
Note over C,W: Command/Reply (HTTP REST API)
C->>W: { op: "cmd", target: "articles.create", inbox: "abc123", data: { ... } }
W-->>C: { op: "error", data: { "reason": "use HTTP POST /api/articles" } }
Note over C,W: Backpressure guard
W-->>C: error{ code: "backpressure" }
W-xC: Close connection
- WebSocket Only: All data model updates come through WebSocket subscriptions
- HTTP Acknowledgment: HTTP responses confirm operation success but don't modify frontend state
- Real-time Sync: Changes made via HTTP API automatically appear through WebSocket subscriptions
- Consistent State: Single source of truth from WebSocket ensures data consistency
- Primary: WebSocket for real-time updates
- Fallback: Long polling if WebSocket connection issues arise
- Graceful Degradation: App remains functional with slightly delayed updates during fallback
- Connect with valid capabilities.
- Subscribe to allowed subject → success.
- Subscribe to disallowed subject → denied.
- KV subscribe with allowed pattern → success.
- KV subscribe with disallowed pattern → denied.
- JS subscribe with allowed filter → success.
- JS subscribe with disallowed filter → denied.
- Auth KV update removes access → unsubscribed + `cap_update` sent.
- Backpressure test: block `sendCh` → connection closed after 250ms.
- CRUD operations for articles
- Authentication endpoints
- User management endpoints
- Command endpoints
- JWT authorization and permissions
- Implement time-based resume for JetStream when `last_seq == 0` (e.g., start from `time_delta` seconds ago).
- Add durable consumer support for long-lived JetStream subscriptions (naming per user/session).
- Add per-message ack support for at-least-once delivery in critical streams.
- Consider heartbeat `ping`/`pong` at the protocol level for faster dead-connection detection.
- Add protocol versioning in the envelope (e.g., `v: 1`) to allow non-breaking evolution.
- Implement full CRUD operations for articles
- Add authentication and user management endpoints
- Implement command endpoints for business operations
- Add comprehensive JWT authorization and permission system
- Consider implementing WebSocket command protocol in the future for real-time commands