WebSockets

Overview

Hoist-core provides a cluster-aware WebSocket system for real-time bidirectional communication between the server and connected browser clients. The primary use case is server-to-client push (sending data updates, notifications, and config change alerts without requiring the client to poll), but the system also supports client-to-server messaging via event subscriptions.

The system is built on Spring's native WebSocket support and managed by WebSocketService, a Grails service that wraps raw WebSocketSession objects in HoistWebSocketChannel instances. Each channel represents a single connected Hoist client tab and carries metadata about the authenticated user, app version, and connection health.

Why cluster-aware? In multi-instance deployments behind a load balancer, a client's WebSocket connection lands on one specific server instance, but the business logic that produces a push message may run on any instance (e.g. a primaryOnly timer). The push methods automatically route messages to the correct instance using Hazelcast distributed execution, so callers do not need to know which instance holds a given channel.

Key design decisions:

  • Channels, not topics. The server addresses messages to specific channel keys or broadcasts to all channels. Topic-based subscription management is left to application code; the framework provides only the transport.
  • Fire-and-forget delivery. Push messages to disconnected or unknown channels are silently dropped. This simplifies caller code and avoids error cascades from transient disconnections.
  • Bidirectional messaging. Clients can send messages to the server (e.g. heartbeats), which are dispatched as local Grails events (xhWebSocketMessageReceived) for application services to handle.
  • JSON wire format. All messages are serialized as {topic, data} JSON payloads using Hoist's JSONSerializer, ensuring consistent serialization behavior with the rest of the framework.
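
As a rough illustration of the {topic, data} envelope described in the last bullet, the sketch below builds and parses a message with Groovy's built-in groovy.json classes. This is a stand-in under stated assumptions: Hoist serializes with its own JSONSerializer, and the helper names toWireMessage and fromWireMessage are hypothetical, not part of the framework.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: wrap a topic and payload in the {topic, data} envelope.
// Hoist uses its own JSONSerializer; JsonOutput stands in here for illustration.
String toWireMessage(String topic, Object data) {
    JsonOutput.toJson([topic: topic, data: data])
}

// Hypothetical helper: parse an envelope back into a Map with 'topic' and 'data' keys.
Map fromWireMessage(String json) {
    new JsonSlurper().parseText(json) as Map
}

def json = toWireMessage('positionUpdate', [symbol: 'ABC', qty: 100])
def msg = fromWireMessage(json)
println "topic=${msg.topic}, qty=${msg.data.qty}"
```

Because every message shares this envelope, a receiver can dispatch on topic alone before it looks at data.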

Source Files

| File | Location | Role |
| --- | --- | --- |
| WebSocketService | grails-app/services/io/xh/hoist/websocket/ | Primary service: push API, channel registry, cluster routing, heartbeat handling |
| HoistWebSocketChannel | src/main/groovy/io/xh/hoist/websocket/ | Managed channel wrapper: thread safety, user lookup, connection metadata |
| HoistWebSocketHandler | src/main/groovy/io/xh/hoist/websocket/ | Spring TextWebSocketHandler: relays connection events to WebSocketService |
| HoistWebSocketConfigurer | src/main/groovy/io/xh/hoist/websocket/ | Spring @EnableWebSocket configurer: registers handler at /xhWebSocket |
| WebSocketAdminController | grails-app/controllers/io/xh/hoist/admin/cluster/ | Admin endpoint: list channels and push test messages (cluster-routed) |
| ClientAdminController | grails-app/controllers/io/xh/hoist/admin/ | Legacy admin endpoint: list all clients and push messages |
| ApplicationConfig | src/main/groovy/io/xh/hoist/configuration/ | Default Grails config: sets hoist.enableWebSockets = true |
| HoistCoreGrailsPlugin | src/main/groovy/io/xh/hoist/ | Plugin descriptor: conditionally registers HoistWebSocketConfigurer bean |

Key Classes

WebSocketService

grails-app/services/io/xh/hoist/websocket/WebSocketService.groovy

The central service for WebSocket communication. It maintains an in-memory registry of HoistWebSocketChannel instances for channels connected to the local instance, and provides cluster-aware methods to push messages to any channel in the cluster.

Push API

| Method | Description |
| --- | --- |
| pushToChannel(channelKey, topic, data) | Push to a single channel, anywhere in the cluster |
| pushToChannels(channelKeys, topic, data) | Push to multiple channels, routing each to the correct instance |
| pushToAllChannels(topic, data) | Broadcast to every connected channel across all instances |
| pushToLocalChannels(topic, data) | Broadcast to channels on this instance only |

All push methods serialize the message once as a TextMessage containing {topic: String, data: Object} JSON, then deliver it. Messages to unknown or disconnected channels are silently dropped; these methods do not throw.

Cluster routing in pushToChannels: The channel key format is {authUsername}|{instanceName}|{uuid}, so the service extracts the instance name from the key, groups channels by instance, and dispatches in parallel. Local channels are pushed directly; remote channels are pushed via ClusterUtils.runOnInstance(), which executes the push on the target instance through Hazelcast's distributed execution framework.

void pushToChannels(Collection<String> channelKeys, String topic, Object data) {
    if (!channelKeys) return
    def msg = serialize(topic, data),
        byInstance = channelKeys.groupBy { instanceFromKey(it) }
    asyncEach(byInstance.entrySet()) { Entry e ->
        def instance = e.key as String,
            keys = e.value as List<String>
        instance == instanceName ?
            pushInternal(keys, msg) :
            runOnInstance(this.&pushInternal, instance, [keys, msg])
    }
}
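
The grouping step above can be tried in isolation. The following is a minimal stand-alone simulation rather than the service's own code: instanceFromKey is a hypothetical implementation that assumes the instance name is the second-to-last |-delimited segment of the key, and the usernames and instance names are made up.

```groovy
// Hypothetical stand-in for the service's instanceFromKey helper. Assumes the
// instance name is the second-to-last '|'-delimited segment of the key.
String instanceFromKey(String key) {
    def parts = key.tokenize('|')
    parts.size() >= 3 ? parts[-2] : null
}

def localInstance = 'hoist-app-1'   // made-up name for "this" instance
def keys = [
    'jsmith|hoist-app-1|a3f8b2c1',
    'mjones|hoist-app-2|9d41c7e0',
    'jsmith|hoist-app-2|5b2e8f13'
]

// Group keys by owning instance, then decide per group whether delivery
// would be a direct local push or would need a remote call.
def byInstance = keys.groupBy { instanceFromKey(it) }
byInstance.each { instance, instanceKeys ->
    def route = instance == localInstance ? 'local push' : 'remote push'
    println "${instance}: ${route} for ${instanceKeys.size()} channel(s)"
}
```

Grouping first means each remote instance receives one call carrying all of its keys, rather than one call per channel.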

Channel query API

| Method | Description |
| --- | --- |
| getAllChannels() | Returns Collection<Map>: serialized metadata for all channels across the cluster |
| getLocalChannels() | Returns Collection<HoistWebSocketChannel>: live channel objects on this instance |
| hasChannel(channelKey) | Checks cluster-wide whether a channel is connected |
| hasLocalChannel(channelKey) | Checks this instance only |

Events

WebSocketService publishes three Grails events via EventPublisher.notify():

| Constant | Event Name | Payload | When |
| --- | --- | --- | --- |
| CHANNEL_OPENED_EVENT | xhWebSocketOpened | HoistWebSocketChannel | New channel registered |
| CHANNEL_CLOSED_EVENT | xhWebSocketClosed | HoistWebSocketChannel | Channel disconnected |
| MSG_RECEIVED_EVENT | xhWebSocketMessageReceived | Map [channel, topic, data] | Client sends a non-heartbeat message |

These are local Grails events (not Hazelcast topics), so they fire only on the instance where the channel is connected. Application services can subscribe to them using BaseService.subscribe():

class MyService extends BaseService {
    void init() {
        subscribe(WebSocketService.CHANNEL_OPENED_EVENT) { HoistWebSocketChannel channel ->
            logInfo("New connection from ${channel.apparentUsername}")
        }
    }
}

Heartbeat handling

When a client sends a message with topic xhHeartbeat, the service replies with {topic: 'xhHeartbeat', data: 'pong'}. This keeps the WebSocket connection alive through proxies and load balancers and lets the client verify connectivity. Non-heartbeat incoming messages fire MSG_RECEIVED_EVENT.
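
The inbound rule described above (reply to heartbeats, publish everything else as an event) can be sketched as a small stand-alone function. The reply and fireEvent closures are hypothetical stand-ins for the channel send and the Grails event publication, and the event payload is simplified to the message map.

```groovy
// Minimal simulation of the inbound dispatch rule. 'reply' and 'fireEvent'
// are hypothetical callbacks, not Hoist APIs.
void handleIncoming(Map msg, Closure reply, Closure fireEvent) {
    if (msg.topic == 'xhHeartbeat') {
        // Heartbeats are answered directly and never reach application code.
        reply([topic: 'xhHeartbeat', data: 'pong'])
    } else {
        // Everything else is surfaced as a message-received event.
        fireEvent('xhWebSocketMessageReceived', msg)
    }
}

def replies = []
def events = []
handleIncoming([topic: 'xhHeartbeat'], { replies << it }, { name, m -> events << name })
handleIncoming([topic: 'clientAction', data: [id: 1]], { replies << it }, { name, m -> events << name })
println "replies=${replies}, events=${events}"
```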

clearCaches

The clearCaches() override closes all local WebSocket sessions and clears the channel map, causing clients to reconnect.

HoistWebSocketChannel

src/main/groovy/io/xh/hoist/websocket/HoistWebSocketChannel.groovy

A managed wrapper around a raw WebSocketSession that adds:

  • Thread safety: Wraps the session in a ConcurrentWebSocketSessionDecorator with configurable send-time limit and buffer size limit (from xhWebSocketConfig).
  • User identity: Reads authUsername and apparentUsername strings from the session's HTTP session attributes (set during the handshake by HoistFilter/authentication). Accounts for admin impersonation by tracking both identities. Provides getAuthUser() and getApparentUser() accessors that look up the corresponding live HoistUser objects via userService.find(). The getUser() accessor is an alias for getApparentUser().
  • Client metadata: Extracts appVersion, appBuild, loadId, tabId, and clientAppCode from the WebSocket connection URI's query parameters.
  • Connection tracking: Records createdTime, sentMessageCount, receivedMessageCount, and their timestamps for display in the Admin Console.

Channel key format

Each channel is assigned a unique key on construction:

{authUsername}|{instanceName}|{8-char-uuid}

For example: jsmith|hoist-app-1|a3f8b2c1

This format is significant: WebSocketService parses the instance name from the key to route messages to the correct cluster member. The key is sent to the client upon successful registration (topic xhRegistrationSuccess, data {channelKey: ...}) and must be stored by the client for subsequent use.
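
To make the key layout concrete, here is a small sketch that builds a key in the documented shape and splits one back into its parts. Both helpers are hypothetical. In particular, how Hoist generates the 8-character suffix is not shown here, so truncating a random UUID is an assumption made purely for illustration.

```groovy
// Hypothetical helper: build a key in the documented {authUsername}|{instanceName}|{8-char-uuid} shape.
// Truncating a random UUID is an assumption, not necessarily what Hoist does.
String buildChannelKey(String authUsername, String instanceName) {
    def suffix = UUID.randomUUID().toString().take(8)
    "${authUsername}|${instanceName}|${suffix}"
}

// Hypothetical helper: split a key into its three parts, reading from the right
// so that the last two segments are always the instance name and the id.
Map parseChannelKey(String key) {
    def parts = key.tokenize('|')
    if (parts.size() < 3) return null
    [authUsername: parts[0..-3].join('|'), instanceName: parts[-2], id: parts[-1]]
}

println parseChannelKey('jsmith|hoist-app-1|a3f8b2c1')
```

Application code should normally treat the key as opaque and pass it back to the push methods unchanged; parsing is shown only to explain how routing can work.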

JSON serialization

Implements JSONFormat via formatForJSON(), returning a Map of channel metadata. This is what WebSocketService.getAllChannels() serializes when aggregating channels across instances.

Implementation Classes

HoistWebSocketHandler

A thin Spring TextWebSocketHandler that relays connection lifecycle events (afterConnectionEstablished, handleTextMessage, afterConnectionClosed) to WebSocketService. One handler instance is created per connection via PerConnectionWebSocketHandler.

HoistWebSocketConfigurer

A Spring WebSocketConfigurer that registers the WebSocket endpoint at /xhWebSocket. It copies HTTP session attributes (including authentication context) into the WebSocket session via HttpSessionHandshakeInterceptor. This bean is conditionally registered only when hoist.enableWebSockets is true.

Configuration

Grails application config

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| hoist.enableWebSockets | Boolean | true | Master switch. When false, the HoistWebSocketConfigurer bean is not registered and no WebSocket endpoint is exposed. Set in application.groovy. |

Soft configuration (AppConfig)

| Config key | Type | Default | Description |
| --- | --- | --- | --- |
| xhWebSocketConfig | json | {sendTimeLimitMs: 1000, bufferSizeLimitBytes: 1000000} | Parameters for the ConcurrentWebSocketSessionDecorator wrapping each session. sendTimeLimitMs controls the maximum time (in milliseconds) to wait for a send to complete before timing out. bufferSizeLimitBytes controls the maximum buffer size for pending outgoing messages. |

The xhWebSocketConfig is read by HoistWebSocketChannel at channel creation time. Changing this config affects only newly created channels; existing channels retain their original settings until they reconnect.

These values are tuned for typical Hoist usage. Increase bufferSizeLimitBytes if pushing large payloads (e.g. full grid datasets), and increase sendTimeLimitMs if clients are on slow connections. If a send exceeds either limit, the ConcurrentWebSocketSessionDecorator will close the session, and the client will need to reconnect.
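
When deciding whether the defaults are adequate, it can help to measure a representative payload against the buffer limit. The sketch below is a rough pre-flight check under stated assumptions: resolveConfig and fitsInBuffer are hypothetical helpers, Groovy's JsonOutput stands in for Hoist's JSONSerializer (so byte counts are approximate), and only a single message is measured, whereas queued messages also count toward the limit.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: overlay a JSON config string on the documented defaults.
Map resolveConfig(String json) {
    def defaults = [sendTimeLimitMs: 1000, bufferSizeLimitBytes: 1000000]
    def overrides = json ? (new JsonSlurper().parseText(json) as Map) : [:]
    defaults + overrides
}

// Hypothetical check: does one serialized message fit within the buffer limit?
// This is a lower bound on what is needed, since pending messages accumulate.
boolean fitsInBuffer(Object payload, Map config) {
    def bytes = JsonOutput.toJson(payload).getBytes('UTF-8').length
    bytes <= (config.bufferSizeLimitBytes as long)
}

def config = resolveConfig('{"bufferSizeLimitBytes": 5000000}')
def sample = [topic: 'positionUpdate', data: (1..1000).collect { [id: it, qty: it * 10] }]
println "limit=${config.bufferSizeLimitBytes}, fits=${fitsInBuffer(sample, config)}"
```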

Application Implementation

Enabling WebSocket support

WebSocket support is enabled by default. No application configuration is needed unless you want to disable it:

// In application.groovy (only needed to disable)
hoist {
    enableWebSockets = false
}

Registering channels for app-specific updates

The framework handles the WebSocket lifecycle automatically: connection, registration, and heartbeats require no app code. What applications must implement is the "subscription" layer: which clients want which updates.

A typical pattern:

  1. Client connects: The framework assigns a channelKey and sends it to the client via the xhRegistrationSuccess topic.
  2. Client calls an app endpoint: e.g. POST /api/subscribeToPositions with its channelKey and any filtering parameters.
  3. App service stores the subscription: Maps channel keys to their requested data streams.
  4. Server pushes updates: When data changes, the app service calls pushToChannel() or pushToChannels() with the stored keys.
  5. Client disconnects: The app service listens for CHANNEL_CLOSED_EVENT to clean up subscriptions, or uses hasChannel() to prune stale keys.

import java.util.concurrent.ConcurrentHashMap

class PositionPushService extends BaseService {

    def webSocketService

    // Map of channelKey -> subscription details
    private Map<String, Map> subscriptions = new ConcurrentHashMap<>()

    void init() {
        // Clean up when clients disconnect
        subscribe(WebSocketService.CHANNEL_CLOSED_EVENT) { HoistWebSocketChannel channel ->
            subscriptions.remove(channel.key)
        }

        // Timer to push position updates periodically
        createTimer(
            name: 'pushPositionUpdates',
            interval: 5000,
            runFn: this.&pushPositionUpdates
        )
    }

    /** Called by a client endpoint to register interest in position updates. */
    void registerSubscription(String channelKey, Map filters) {
        subscriptions[channelKey] = filters
    }

    private void pushPositionUpdates() {
        subscriptions.each { channelKey, filters ->
            if (!webSocketService.hasChannel(channelKey)) {
                // Prune stale subscription
                subscriptions.remove(channelKey)
                return
            }
            def data = loadPositions(filters)
            webSocketService.pushToChannel(channelKey, 'positionUpdate', data)
        }
    }

    private List loadPositions(Map filters) {
        // ... app-specific data loading
    }
}

Common Patterns

Broadcasting to all connected clients

Use pushToAllChannels() when a single instance triggers a broadcast (e.g. a primaryOnly timer, an admin action, or a webhook callback):

// ✅ Broadcast from a primaryOnly timer or single-instance trigger
void onConfigChanged() {
    webSocketService.pushToAllChannels('configRefresh', [timestamp: System.currentTimeMillis()])
}

Broadcasting from a cluster-wide listener

Use pushToLocalChannels() when the triggering event already fires on every instance (e.g. a replicated cache onChange listener or a cluster-wide topic subscription). This avoids sending duplicate messages:

// ✅ In a listener that fires on every instance (e.g. replicated cache onChange)
void onCacheChanged() {
    webSocketService.pushToLocalChannels('dataRefresh', refreshData)
}

Pushing to specific users by role

Query allChannels to find channels matching a criterion, then push to those keys:

void notifyAdmins(String message) {
    def adminKeys = webSocketService.allChannels
        .findAll {
            def user = userService.find(it.apparentUser.username as String)
            user?.hasRole('HOIST_ADMIN')
        }
        .collect { it.key as String }

    webSocketService.pushToChannels(adminKeys, 'adminAlert', [message: message])
}

Responding to incoming client messages

While uncommon, servers can receive messages from clients. Subscribe to MSG_RECEIVED_EVENT:

void init() {
    subscribe(WebSocketService.MSG_RECEIVED_EVENT) { Map event ->
        def channel = event.channel as HoistWebSocketChannel,
            topic = event.topic as String,
            data = event.data

        if (topic == 'clientHeartbeat') {
            logDebug("Client ${channel.apparentUsername} sent heartbeat")
        }
    }
}

Checking channel liveness before expensive operations

void pushExpensiveReport(String channelKey) {
    // Check first to avoid computing data for a disconnected client
    if (!webSocketService.hasChannel(channelKey)) {
        logDebug("Channel $channelKey no longer connected, skipping report generation")
        return
    }
    def report = generateExpensiveReport()
    webSocketService.pushToChannel(channelKey, 'report', report)
}

Client Integration

On the client side, hoist-react manages the WebSocket connection through its WebSocketService. The integration flow:

  1. Connection: The hoist-react client opens a WebSocket to ws[s]://{host}/xhWebSocket with query parameters including appVersion, appBuild, loadId, tabId, and clientAppCode.
  2. Registration: The server sends an xhRegistrationSuccess message containing the channelKey. The client stores this key.
  3. Heartbeat: The client periodically sends {topic: 'xhHeartbeat'} messages. The server replies with {topic: 'xhHeartbeat', data: 'pong'}.
  4. Push messages: Incoming messages are dispatched by topic to registered handlers in the client app. Application code subscribes to specific topics to receive updates.
  5. Reconnection: If the connection drops, hoist-react automatically reconnects and re-registers, obtaining a new channelKey. Application services must account for key changes by re-registering subscriptions after reconnection.

The EnvironmentService reports webSocketsEnabled in its environment payload, allowing the client to conditionally enable or disable its WebSocket support based on server configuration.

The Hoist Admin Console provides a "WebSockets" tab (backed by WebSocketAdminController and ClientAdminController) that displays all connected channels across the cluster with their metadata, and supports pushing test messages to individual channels.

Common Pitfalls

Using pushToAllChannels from a cluster-wide listener

If the triggering event fires on every instance (e.g. a replicated cache change listener, or a Hazelcast topic subscription), using pushToAllChannels() will cause each instance to broadcast to all channels in the cluster, so every client receives the message N times (where N is the number of instances).

// ❌ Wrong: replicated cache onChange fires on every instance
void onReplicatedCacheChange() {
    webSocketService.pushToAllChannels('refresh', data)   // N instances x all channels = duplicates!
}

// ✅ Correct: use pushToLocalChannels when the trigger fires on every instance
void onReplicatedCacheChange() {
    webSocketService.pushToLocalChannels('refresh', data)  // Each instance pushes to its own clients only
}
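
The N-times duplication can be checked with a toy model. The sketch below is a simulation only: the cluster is a plain Map of made-up instance names to made-up channel keys, and simulate is a hypothetical helper that counts how many copies each channel would receive when a listener fires once on every instance.

```groovy
// Toy model: each instance name maps to the channel keys connected to it.
// Returns the number of copies each channel receives when the listener
// fires once on every instance.
Map<String, Integer> simulate(Map<String, List<String>> cluster, boolean broadcastToAll) {
    def received = [:]
    cluster.each { instance, localKeys ->
        // Broadcasting to all targets every channel in the cluster; the local
        // variant targets only this instance's own channels.
        def targets = broadcastToAll ? cluster.values().flatten() : localKeys
        targets.each { received[it] = (received[it] ?: 0) + 1 }
    }
    received
}

def cluster = ['hoist-app-1': ['k1', 'k2'], 'hoist-app-2': ['k3']]
println "all-channels broadcast: ${simulate(cluster, true)}"
println "local-only broadcast:   ${simulate(cluster, false)}"
```

With two instances, the all-channels variant delivers two copies to every channel, while the local-only variant delivers exactly one.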

Storing channel keys without cleanup

Channel keys become invalid when clients disconnect or reconnect. If your service stores channel keys (for targeted push), you must clean them up. Failing to do so leads to a growing collection of stale keys, wasted push attempts, and potential memory leaks.

// ❌ Wrong: storing keys without cleanup
void registerSubscription(String channelKey) {
    subscriptions.add(channelKey)
    // Keys never removed, so the collection grows forever
}

// ✅ Correct: subscribe to close events and/or use hasChannel() to prune
void init() {
    subscribe(WebSocketService.CHANNEL_CLOSED_EVENT) { HoistWebSocketChannel channel ->
        subscriptions.remove(channel.key)
    }
}

Accessing HoistWebSocketChannel properties from getAllChannels()

getAllChannels() returns Collection<Map>, not Collection<HoistWebSocketChannel>. The maps contain serialized metadata (with authUser and apparentUser as Maps serialized from HoistUser.formatForJSON(), not live HoistUser objects). Attempting to call channel methods on these maps will fail.

// ❌ Wrong: treating Map results as HoistWebSocketChannel objects
webSocketService.allChannels.each { channel ->
    channel.sendMessage(msg)           // Error! This is a Map, not a channel
    channel.apparentUser.hasRole('MY_ROLE')  // Error! 'apparentUser' is a serialized Map, not a HoistUser
}

// ✅ Correct: use Map keys, look up users when needed
webSocketService.allChannels.each { channelMap ->
    def user = userService.find(channelMap.apparentUser.username as String)
    if (user?.hasRole('MY_ROLE')) {
        webSocketService.pushToChannel(channelMap.key as String, 'alert', data)
    }
}

Sending large payloads without adjusting buffer limits

The ConcurrentWebSocketSessionDecorator enforces a buffer size limit (default 1MB via xhWebSocketConfig.bufferSizeLimitBytes). If a message exceeds this limit, or if multiple messages queue up beyond it, the session is forcibly closed.

// ❌ Risky: pushing large datasets without considering buffer limits
webSocketService.pushToChannel(key, 'fullDataset', hugeListOfRecords)

// ✅ Better: push incremental updates or summaries, or increase buffer config
webSocketService.pushToChannel(key, 'dataUpdate', [
    updatedIds: changedRecordIds,
    timestamp: System.currentTimeMillis()
])

If large payloads are unavoidable, increase bufferSizeLimitBytes in the xhWebSocketConfig AppConfig. Also consider increasing sendTimeLimitMs for clients on slower connections.
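
One way to produce the incremental updates suggested above is to diff the previous and current snapshots and push only what changed. The sketch below is a minimal illustration, assuming records are Maps keyed by a string id; diffSnapshots and the shape of its result are hypothetical, not a Hoist API.

```groovy
// Hypothetical helper: compare two snapshots keyed by record id and return only
// the new or changed records plus the ids that disappeared.
Map diffSnapshots(Map<String, Map> previous, Map<String, Map> current) {
    [
        updated   : current.findAll { id, rec -> previous[id] != rec }.values().toList(),
        removedIds: (previous.keySet() - current.keySet()).toList()
    ]
}

def previous = [a: [id: 'a', qty: 1], b: [id: 'b', qty: 2], c: [id: 'c', qty: 3]]
def current  = [a: [id: 'a', qty: 1], b: [id: 'b', qty: 5], d: [id: 'd', qty: 9]]
println diffSnapshots(previous, current)
```

The result could then be sent with pushToChannel(key, 'dataUpdate', diff), leaving the client to merge it into its local copy. A client that has just connected still needs one full load to start from.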

Assuming channel keys survive reconnection

When a client reconnects (e.g. after a network interruption or server restart), it receives a new channel key. Any keys stored by the server from the previous session are now invalid. Design your subscription model so clients re-register after reconnection.

// ❌ Wrong: treating a channelKey as permanent
void onClientStartup(String userId, String channelKey) {
    // Stored once at startup and never refreshed, so it goes stale on reconnect
    permanentSubscriptions[userId] = channelKey
}

// ✅ Correct: design for re-registration
void onClientSubscribe(String channelKey, String userId) {
    // Called by the client after every (re)connection; replaces any previous key for this user
    subscriptionsByUser[userId] = channelKey
}
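
A re-registration-friendly registry also has to cope with a close event for the old channel arriving after the client has already registered its new key. The sketch below shows one way to handle that ordering. UserChannelRegistry is a hypothetical class, and it assumes one channel per user; an app that supports several tabs per user would need a set of keys instead.

```groovy
import java.util.concurrent.ConcurrentHashMap

// Hypothetical registry keyed by user, holding that user's most recent channel key.
class UserChannelRegistry {
    private final Map<String, String> keyByUser = new ConcurrentHashMap<>()

    // Called on every (re)registration; returns the key that was replaced, if any.
    String register(String userId, String channelKey) {
        keyByUser.put(userId, channelKey)
    }

    // Called when a channel closes. Removes the entry only if it still points at
    // the closed key, so a late close event cannot evict a newer registration.
    boolean onChannelClosed(String userId, String closedKey) {
        keyByUser.remove(userId, closedKey)
    }

    String currentKey(String userId) {
        keyByUser.get(userId)
    }
}

def registry = new UserChannelRegistry()
registry.register('jsmith', 'jsmith|hoist-app-1|a3f8b2c1')
registry.register('jsmith', 'jsmith|hoist-app-2|5b2e8f13')          // reconnect, new key
registry.onChannelClosed('jsmith', 'jsmith|hoist-app-1|a3f8b2c1')   // late close of the old channel
println registry.currentKey('jsmith')
```

The two-argument remove(key, value) compares the stored value before removing, which is what keeps the newer key in place.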