Skip to content

Commit 1b28188

Browse files
Feature: Added HasWebsocketEvents field to the Orchestrator Host API, providing clients with a reliable source of truth for real-time event capabilities.
Improvement: Optimized database performance by implementing lightweight status updates for WebSocket events, avoiding full record rewrites. Fix: Enhanced connection reliability by tying the "Connected" status strictly to verified pong responses rather than just TCP connection establishment. Fix: Resolved delay in reconnection events by triggering an immediate ping upon connection, ensuring instant status updates. Improvement: Refactored connection monitoring to automatically detect and handle stale WebSocket connections without redundant network traffic.
1 parent ac7e3b2 commit 1b28188

File tree

8 files changed

+129
-41
lines changed

8 files changed

+129
-41
lines changed

src/data/models/orchestrator_host.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ type OrchestratorHost struct {
3939
UpdatedAt string `json:"updated_at,omitempty"`
4040
RequiredClaims []string `json:"required_claims,omitempty"`
4141
RequiredRoles []string `json:"required_roles,omitempty"`
42+
HasWebsocketEvents bool `json:"has_websocket_events,omitempty"`
4243
}
4344

4445
func (o OrchestratorHost) GetHost() string {
@@ -223,6 +224,10 @@ func (o *OrchestratorHost) Diff(source OrchestratorHost) bool {
223224
return true
224225
}
225226

227+
if o.HasWebsocketEvents != source.HasWebsocketEvents {
228+
return true
229+
}
230+
226231
if o.ReverseProxy != nil && source.ReverseProxy == nil {
227232
return true
228233
}

src/data/orchestrator.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -238,6 +238,7 @@ func (j *JsonDatabase) UpdateOrchestratorHost(ctx basecontext.ApiContext, host *
238238
j.data.OrchestratorHosts[i].ParallelsDesktopLicensed = host.ParallelsDesktopLicensed
239239
// Reverse Proxy Hosts
240240
j.data.OrchestratorHosts[i].IsReverseProxyEnabled = host.IsReverseProxyEnabled
241+
j.data.OrchestratorHosts[i].HasWebsocketEvents = host.HasWebsocketEvents
241242
j.data.OrchestratorHosts[i].ReverseProxy = host.ReverseProxy
242243
j.data.OrchestratorHosts[i].ReverseProxyHosts = host.ReverseProxyHosts
243244

@@ -353,6 +354,33 @@ func (j *JsonDatabase) UpdateOrchestratorHostTimestamp(ctx basecontext.ApiContex
353354
return ErrOrchestratorHostNotFound
354355
}
355356

357+
// UpdateOrchestratorHostWebsocketStatus updates only the HasWebsocketEvents field
358+
func (j *JsonDatabase) UpdateOrchestratorHostWebsocketStatus(ctx basecontext.ApiContext, hostID string, hasWebsocketEvents bool) (bool, error) {
359+
if !j.IsConnected() {
360+
return false, ErrDatabaseNotConnected
361+
}
362+
363+
if hostID == "" {
364+
return false, ErrOrchestratorHostEmptyIdOrHost
365+
}
366+
367+
j.dataMutex.Lock()
368+
defer j.dataMutex.Unlock()
369+
370+
for i, dbHost := range j.data.OrchestratorHosts {
371+
if strings.EqualFold(dbHost.ID, hostID) {
372+
if j.data.OrchestratorHosts[i].HasWebsocketEvents == hasWebsocketEvents {
373+
return false, nil
374+
}
375+
j.data.OrchestratorHosts[i].HasWebsocketEvents = hasWebsocketEvents
376+
ctx.LogDebugf("[Database] Host %s websocket status updated to %v", dbHost.Host, hasWebsocketEvents)
377+
return true, nil
378+
}
379+
}
380+
381+
return false, ErrOrchestratorHostNotFound
382+
}
383+
356384
func (j *JsonDatabase) GetOrchestratorAvailableResources(ctx basecontext.ApiContext) map[string]models.HostResourceItem {
357385
j.dataMutex.RLock()
358386
defer j.dataMutex.RUnlock()

src/mappers/orchestrator.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func DtoOrchestratorHostToApiResponse(dto data_models.OrchestratorHost) models.O
2626
RequiredClaims: dto.RequiredClaims,
2727
RequiredRoles: dto.RequiredRoles,
2828
State: dto.State,
29+
HasWebsocketEvents: dto.HasWebsocketEvents,
2930
}
3031

3132
if dto.Resources != nil {

src/models/orchestrator_host.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ type OrchestratorHostResponse struct {
121121
DetailedResources *HostResources `json:"detailed_resources,omitempty"`
122122
RequiredClaims []string `json:"required_claims,omitempty"`
123123
RequiredRoles []string `json:"required_roles,omitempty"`
124+
HasWebsocketEvents bool `json:"has_websocket_events,omitempty"`
124125
}
125126

126127
type OrchestratorHostUpdateRequest struct {

src/orchestrator/handlers/host_health_handler.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,29 @@ func (h *HostHealthHandler) handlePong(ctx basecontext.ApiContext, hostID string
6767
return
6868
}
6969

70+
// If the host was not marked as having WebSocket events, update it now that we received a pong
71+
if !host.HasWebsocketEvents {
72+
updated, err := dbService.UpdateOrchestratorHostWebsocketStatus(ctx, hostID, true)
73+
if err != nil {
74+
ctx.LogErrorf("[HostHealthHandler] Error updating host %s websocket status: %v", hostID, err)
75+
} else if updated {
76+
// Broadcast WebSocket connection event now that we have confirmed capability via pong
77+
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
78+
msg := models.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_CONNECTED", models.HostHealthUpdate{
79+
HostID: host.ID,
80+
State: "websocket_connected",
81+
})
82+
go func() {
83+
if err := emitter.Broadcast(msg); err != nil {
84+
ctx.LogErrorf("[HostHealthHandler] Failed to broadcast event %s: %v", "HOST_WEBSOCKET_CONNECTED", err)
85+
} else {
86+
ctx.LogInfof("[HostHealthHandler] Broadcasted HOST_WEBSOCKET_CONNECTED event for host %s", hostID)
87+
}
88+
}()
89+
}
90+
}
91+
}
92+
7093
host.UpdatedAt = helpers.GetUtcCurrentDateTime()
7194
stateChanged := false
7295

src/orchestrator/host_websocket_manager.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/Parallels/prl-devops-service/basecontext"
99
"github.com/Parallels/prl-devops-service/constants"
1010
"github.com/Parallels/prl-devops-service/data/models"
11+
event "github.com/Parallels/prl-devops-service/models"
1112
"github.com/Parallels/prl-devops-service/orchestrator/interfaces"
1213
"github.com/Parallels/prl-devops-service/serviceprovider"
1314
)
@@ -98,6 +99,28 @@ func (m *HostWebSocketManager) DisconnectHost(hostID string) {
9899
client.Close()
99100
delete(m.clients, hostID)
100101
m.ctx.LogInfof("[HostWebSocketManager] Disconnected host %s", hostID)
102+
103+
// Update DB status
104+
dbService, err := serviceprovider.GetDatabaseService(m.ctx)
105+
if err == nil {
106+
updated, _ := dbService.UpdateOrchestratorHostWebsocketStatus(m.ctx, hostID, false)
107+
if updated {
108+
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
109+
msg := event.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_DISCONNECTED",
110+
event.HostHealthUpdate{
111+
HostID: hostID,
112+
State: "websocket_disconnected",
113+
})
114+
go func() {
115+
if err := emitter.Broadcast(msg); err != nil {
116+
m.ctx.LogErrorf("[HostWebSocketManager] Failed to broadcast HOST_WEBSOCKET_DISCONNECTED event: %v", err)
117+
} else {
118+
m.ctx.LogInfof("[HostWebSocketManager] Broadcasted HOST_WEBSOCKET_DISCONNECTED event for host %s", hostID)
119+
}
120+
}()
121+
}
122+
}
123+
}
101124
}
102125
}
103126

src/orchestrator/main.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -171,16 +171,35 @@ func (s *OrchestratorService) processHost(host models.OrchestratorHost) {
171171
stalenessThreshold := s.refreshInterval * 3
172172

173173
if err == nil && time.Since(lastUpdated) < stalenessThreshold {
174-
s.ctx.LogDebugf("[Orchestrator] Host %s is connected and fresh (last updated: %s), sending ping", host.Host, host.UpdatedAt)
175-
if err := manager.SendPing(host.ID); err != nil {
176-
s.ctx.LogWarnf("[Orchestrator] Failed to send ping to host %s: %v, falling back to HTTP health check", host.Host, err)
177-
websocketPingFailed = true
178-
} else {
179-
// Ping successful, skip HTTP health check
180-
return
174+
s.ctx.LogDebugf("[Orchestrator] Host %s is connected and fresh (last updated: %s). Skipping HTTP health check.", host.Host, host.UpdatedAt)
175+
// Ping successful (implied by freshness), skip HTTP health check
176+
if !host.HasWebsocketEvents {
177+
host.HasWebsocketEvents = true
178+
_, _ = s.db.UpdateOrchestratorHostWebsocketStatus(s.ctx, host.ID, true)
181179
}
180+
return
182181
} else {
183182
s.ctx.LogWarnf("[Orchestrator] Host %s is connected but stale (last updated: %s). Falling back to HTTP health check.", host.Host, host.UpdatedAt)
183+
websocketPingFailed = true
184+
if host.HasWebsocketEvents {
185+
host.HasWebsocketEvents = false
186+
updated, _ := s.db.UpdateOrchestratorHostWebsocketStatus(s.ctx, host.ID, false)
187+
if updated {
188+
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
189+
msg := apimodels.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_DISCONNECTED", apimodels.HostHealthUpdate{
190+
HostID: host.ID,
191+
State: "websocket_disconnected",
192+
})
193+
go func() {
194+
if err := emitter.Broadcast(msg); err != nil {
195+
s.ctx.LogErrorf("[Orchestrator] Failed to broadcast event %s: %v", "HOST_WEBSOCKET_DISCONNECTED", err)
196+
} else {
197+
s.ctx.LogInfof("[Orchestrator] Broadcasted HOST_WEBSOCKET_DISCONNECTED event for host %s (detected staleness in processHost)", host.Host)
198+
}
199+
}()
200+
}
201+
}
202+
}
184203
}
185204
}
186205

src/orchestrator/websocket_client.go

Lines changed: 22 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,6 @@ func (c *HostWebSocketClient) establishConnection(events []constants.EventType)
142142
c.setConnected(true)
143143
c.ctx.LogInfof("[HostWebSocketClient] Connected to host %s", c.hostName)
144144

145-
c.broadcastConnectionEvent()
146-
147145
return nil
148146
}
149147

@@ -215,6 +213,11 @@ func (c *HostWebSocketClient) startPingRoutine() {
215213
c.mu.Unlock()
216214

217215
go func() {
216+
// Send initial ping immediately to confirm connection status
217+
if err := c.SendPing(); err != nil {
218+
c.ctx.LogDebugf("[HostWebSocketClient] Failed to send initial ping to host %s: %v", c.hostName, err)
219+
}
220+
218221
for {
219222
select {
220223
case <-c.stopChan:
@@ -268,44 +271,29 @@ func (c *HostWebSocketClient) notifyDisconnection() {
268271

269272
c.setConnected(false)
270273

271-
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
272-
msg := api_models.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_DISCONNECTED", api_models.HostHealthUpdate{
273-
HostID: c.hostID,
274-
State: "websocket_disconnected",
275-
})
276-
go func() {
277-
if err := emitter.Broadcast(msg); err != nil {
278-
c.ctx.LogErrorf("[HostWebSocketClient] Failed to broadcast HOST_WEBSOCKET_DISCONNECTED event: %v", err)
279-
} else {
280-
c.ctx.LogInfof("[HostWebSocketClient] Broadcasted HOST_WEBSOCKET_DISCONNECTED event for host %s", c.hostName)
274+
// Update DB status
275+
if dbService, err := serviceprovider.GetDatabaseService(c.ctx); err == nil {
276+
updated, _ := dbService.UpdateOrchestratorHostWebsocketStatus(c.ctx, c.hostID, false)
277+
if updated {
278+
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
279+
msg := api_models.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_DISCONNECTED", api_models.HostHealthUpdate{
280+
HostID: c.hostID,
281+
State: "websocket_disconnected",
282+
})
283+
go func() {
284+
if err := emitter.Broadcast(msg); err != nil {
285+
c.ctx.LogErrorf("[HostWebSocketClient] Failed to broadcast HOST_WEBSOCKET_DISCONNECTED event: %v", err)
286+
} else {
287+
c.ctx.LogInfof("[HostWebSocketClient] Broadcasted HOST_WEBSOCKET_DISCONNECTED event for host %s", c.hostName)
288+
}
289+
}()
281290
}
282-
}()
291+
}
283292
} else {
284293
c.ctx.LogWarnf("[HostWebSocketClient] EventEmitter not available to broadcast disconnection for host %s", c.hostName)
285294
}
286295
}
287296

288-
func (c *HostWebSocketClient) broadcastConnectionEvent() {
289-
c.ctx.LogInfof("[HostWebSocketClient] Host %s WebSocket connection established", c.hostName)
290-
291-
// Broadcast WebSocket connection event
292-
if emitter := serviceprovider.GetEventEmitter(); emitter != nil && emitter.IsRunning() {
293-
msg := api_models.NewEventMessage(constants.EventTypeOrchestrator, "HOST_WEBSOCKET_CONNECTED", api_models.HostHealthUpdate{
294-
HostID: c.hostID,
295-
State: "websocket_connected",
296-
})
297-
go func() {
298-
if err := emitter.Broadcast(msg); err != nil {
299-
c.ctx.LogErrorf("[HostWebSocketClient] Failed to broadcast HOST_WEBSOCKET_CONNECTED event: %v", err)
300-
} else {
301-
c.ctx.LogInfof("[HostWebSocketClient] Broadcasted HOST_WEBSOCKET_CONNECTED event for host %s", c.hostName)
302-
}
303-
}()
304-
} else {
305-
c.ctx.LogWarnf("[HostWebSocketClient] EventEmitter not available to broadcast connection for host %s", c.hostName)
306-
}
307-
}
308-
309297
func (c *HostWebSocketClient) Probe() bool {
310298
// Use a short timeout for probing
311299
c.ctx.LogInfof("[HostWebSocketClient] Probing host %s for WebSocket support...", c.hostName)

0 commit comments

Comments
 (0)