Skip to content

Commit 4558def

Browse files
authored
Merge pull request #2731 from headlamp-k8s/fix-websocket-race-condition
backend: prevent panic in WebSocket multiplexer
2 parents 447bdbd + 9c0d977 commit 4558def

File tree

5 files changed

+88
-14
lines changed

5 files changed

+88
-14
lines changed

backend/cmd/multiplexer.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,14 @@ func (m *Multiplexer) sendIfNewResourceVersion(
582582

583583
// sendCompleteMessage sends a COMPLETE message to the client.
584584
func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocket.Conn) error {
585+
conn.mu.RLock()
586+
if conn.closed {
587+
conn.mu.RUnlock()
588+
return nil // Connection is already closed, no need to send message
589+
}
590+
591+
conn.mu.RUnlock()
592+
585593
completeMsg := Message{
586594
ClusterID: conn.ClusterID,
587595
Path: conn.Path,
@@ -593,7 +601,14 @@ func (m *Multiplexer) sendCompleteMessage(conn *Connection, clientConn *websocke
593601
conn.writeMu.Lock()
594602
defer conn.writeMu.Unlock()
595603

596-
return clientConn.WriteJSON(completeMsg)
604+
err := clientConn.WriteJSON(completeMsg)
605+
if err != nil {
606+
logger.Log(logger.LevelInfo, nil, err, "connection closed while writing complete message")
607+
608+
return nil // Just return nil for any error - connection is dead anyway
609+
}
610+
611+
return nil
597612
}
598613

599614
// sendDataMessage sends the actual data message to the client.

backend/cmd/multiplexer_test.go

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1001,7 +1001,67 @@ func TestSendCompleteMessage_ClosedConnection(t *testing.T) {
10011001
// Test with closed connection
10021002
clientConn.Close()
10031003
err = m.sendCompleteMessage(conn, clientConn)
1004-
assert.Error(t, err)
1004+
assert.NoError(t, err)
1005+
}
1006+
1007+
func TestSendCompleteMessage_ErrorConditions(t *testing.T) {
1008+
tests := []struct {
1009+
name string
1010+
setupConn func(*Connection, *websocket.Conn)
1011+
expectedError bool
1012+
}{
1013+
{
1014+
name: "connection already marked as closed",
1015+
setupConn: func(conn *Connection, _ *websocket.Conn) {
1016+
conn.closed = true
1017+
},
1018+
expectedError: false,
1019+
},
1020+
{
1021+
name: "normal closure",
1022+
setupConn: func(_ *Connection, clientConn *websocket.Conn) {
1023+
//nolint:errcheck
1024+
clientConn.WriteMessage(websocket.CloseMessage,
1025+
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
1026+
clientConn.Close()
1027+
},
1028+
expectedError: false,
1029+
},
1030+
{
1031+
name: "unexpected close error",
1032+
setupConn: func(_ *Connection, clientConn *websocket.Conn) {
1033+
//nolint:errcheck
1034+
clientConn.WriteMessage(websocket.CloseMessage,
1035+
websocket.FormatCloseMessage(websocket.CloseProtocolError, ""))
1036+
clientConn.Close()
1037+
},
1038+
expectedError: false, // All errors return nil now
1039+
},
1040+
}
1041+
1042+
for _, tt := range tests {
1043+
t.Run(tt.name, func(t *testing.T) {
1044+
m := NewMultiplexer(kubeconfig.NewContextStore())
1045+
clientConn, clientServer := createTestWebSocketConnection()
1046+
defer clientServer.Close()
1047+
1048+
conn := &Connection{
1049+
ClusterID: "test-cluster",
1050+
Path: "/api/v1/pods",
1051+
UserID: "test-user",
1052+
Query: "watch=true",
1053+
}
1054+
1055+
tt.setupConn(conn, clientConn)
1056+
err := m.sendCompleteMessage(conn, clientConn)
1057+
1058+
if tt.expectedError {
1059+
assert.Error(t, err)
1060+
} else {
1061+
assert.NoError(t, err)
1062+
}
1063+
})
1064+
}
10051065
}
10061066

10071067
func createMockKubeAPIServer() *httptest.Server {

frontend/src/lib/k8s/api/v2/KubeList.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,15 @@ export const KubeList = {
3131
update: KubeListUpdateEvent<ObjectInterface>,
3232
itemClass: ObjectClass
3333
): KubeList<KubeObject<ObjectInterface>> {
34+
// Skip if the update's resource version is older than or equal to what we have
35+
if (
36+
list.metadata.resourceVersion &&
37+
update.object.metadata.resourceVersion &&
38+
parseInt(update.object.metadata.resourceVersion) <= parseInt(list.metadata.resourceVersion)
39+
) {
40+
return list;
41+
}
42+
3443
const newItems = [...list.items];
3544
const index = newItems.findIndex(item => item.metadata.uid === update.object.metadata.uid);
3645

frontend/src/lib/k8s/api/v2/useKubeObjectList.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -175,12 +175,8 @@ function useWatchKubeObjectListsMultiplexed<K extends KubeObject>({
175175
return lists.map(list => {
176176
const key = `${list.cluster}:${list.namespace || ''}`;
177177

178-
// Update resource version if newer one is available
179-
const currentVersion = latestResourceVersions.current[key];
180-
const newVersion = list.resourceVersion;
181-
if (!currentVersion || parseInt(newVersion) > parseInt(currentVersion)) {
182-
latestResourceVersions.current[key] = newVersion;
183-
}
178+
// Always use the latest resource version from the server
179+
latestResourceVersions.current[key] = list.resourceVersion;
184180

185181
// Construct WebSocket URL with current parameters
186182
return {

frontend/src/lib/k8s/api/v2/webSocket.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,6 @@ export const WebSocketManager = {
318318
// Handle COMPLETE messages
319319
if (data.type === 'COMPLETE') {
320320
this.completedPaths.add(key);
321-
return;
322-
}
323-
324-
// Skip if path is already completed
325-
if (this.completedPaths.has(key)) {
326-
return;
327321
}
328322

329323
// Parse and validate update data

0 commit comments

Comments
 (0)