Skip to content

Commit a92cb36

Browse files
authored
Merge pull request #495 from hebo6/fix/logs-ws-deadlock
Fix deadlock that wedges the logs WebSocket for an entire process
2 parents 5e62578 + 0be8ab4 commit a92cb36

4 files changed

Lines changed: 68 additions & 22 deletions

File tree

src/api/pc_api.go

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -3,7 +3,6 @@ package api
33
import (
44
"net/http"
55
"strconv"
6-
"sync"
76

87
"github.com/f1bonacc1/process-compose/src/types"
98

@@ -28,7 +27,6 @@ import (
2827

2928
type PcApi struct {
3029
project app.IProject
31-
wsMtx sync.Mutex
3230
}
3331

3432
func NewPcApi(project app.IProject) *PcApi {

src/api/state_ws_api.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"sync"
99
"sync/atomic"
10+
"time"
1011

1112
"github.com/f1bonacc1/process-compose/src/app"
1213
"github.com/f1bonacc1/process-compose/src/pclog"
@@ -136,9 +137,8 @@ func (api *PcApi) handleStateStream(ws *websocket.Conn, observer *stateWsObserve
136137
var writeMu sync.Mutex
137138
writeJSON := func(ev types.ProcessStateEvent) error {
138139
writeMu.Lock()
139-
api.wsMtx.Lock()
140+
_ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
140141
err := ws.WriteJSON(&ev)
141-
api.wsMtx.Unlock()
142142
writeMu.Unlock()
143143
return err
144144
}

src/api/ws_api.go

Lines changed: 58 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -7,6 +7,8 @@ import (
77
"strconv"
88
"strings"
99
"sync"
10+
"sync/atomic"
11+
"time"
1012

1113
"github.com/f1bonacc1/process-compose/src/app"
1214
"github.com/f1bonacc1/process-compose/src/pclog"
@@ -45,6 +47,7 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) {
4547
}
4648

4749
done := make(chan struct{})
50+
wsWriteMtx := &sync.Mutex{}
4851
if follow {
4952
go handleIncoming(ws, done)
5053
}
@@ -55,37 +58,57 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) {
5558
logChan := make(chan LogMessage, 256)
5659
chanCloseMtx := &sync.Mutex{}
5760
isChannelClosed := false
61+
closeLogChan := func() {
62+
chanCloseMtx.Lock()
63+
defer chanCloseMtx.Unlock()
64+
if !isChannelClosed {
65+
close(logChan)
66+
isChannelClosed = true
67+
}
68+
}
69+
dropped := &atomic.Uint64{}
70+
warnedOnce := &atomic.Bool{}
71+
enqueue := func(msg LogMessage) bool {
72+
chanCloseMtx.Lock()
73+
defer chanCloseMtx.Unlock()
74+
if isChannelClosed {
75+
return false
76+
}
77+
select {
78+
case logChan <- msg:
79+
default:
80+
dropped.Add(1)
81+
if warnedOnce.CompareAndSwap(false, true) {
82+
log.Warn().Str("process", processName).Msg("ws subscriber backpressured; dropping log lines")
83+
}
84+
}
85+
return true
86+
}
5887
connector := pclog.NewConnector(
5988
func(messages []string) {
6089
for _, message := range messages {
6190
msg := LogMessage{
6291
Message: message,
6392
ProcessName: processName,
6493
}
65-
logChan <- msg
94+
enqueue(msg)
6695
}
6796
if !follow {
68-
chanCloseMtx.Lock()
69-
defer chanCloseMtx.Unlock()
70-
close(logChan)
71-
isChannelClosed = true
97+
closeLogChan()
7298
}
7399
},
74100
func(message string) (n int, err error) {
75101
msg := LogMessage{
76102
Message: message,
77103
ProcessName: processName,
78104
}
79-
chanCloseMtx.Lock()
80-
defer chanCloseMtx.Unlock()
81-
if isChannelClosed {
105+
if !enqueue(msg) {
82106
return 0, nil
83107
}
84-
logChan <- msg
85108
return len(message), nil
86109
},
87110
endOffset)
88-
go api.handleLog(ws, processName, connector, logChan, done)
111+
go api.handleLog(ws, processName, connector, logChan, done, wsWriteMtx, dropped, closeLogChan)
89112

90113
err = api.project.GetLogsAndSubscribe(processName, connector)
91114
if err != nil {
@@ -96,7 +119,22 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) {
96119

97120
}
98121

99-
func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclog.Connector, logChan chan LogMessage, done chan struct{}) {
122+
func (api *PcApi) handleLog(
123+
ws *websocket.Conn,
124+
procName string,
125+
connector *pclog.Connector,
126+
logChan chan LogMessage,
127+
done chan struct{},
128+
wsWriteMtx *sync.Mutex,
129+
dropped *atomic.Uint64,
130+
closeLogChan func(),
131+
) {
132+
defer func() {
133+
if count := dropped.Load(); count > 0 {
134+
log.Warn().Str("process", procName).Uint64("dropped", count).
135+
Msg("ws subscriber disconnected after dropped lines")
136+
}
137+
}()
100138
defer func(project app.IProject, name string, observer pclog.LogObserver) {
101139
err := project.UnSubscribeLogger(name, observer)
102140
if err != nil {
@@ -107,22 +145,25 @@ func (api *PcApi) handleLog(ws *websocket.Conn, procName string, connector *pclo
107145
for {
108146
select {
109147
case msg, open := <-logChan:
110-
api.wsMtx.Lock()
148+
if !open {
149+
return
150+
}
151+
// Serialize writes per ws.Conn. Multiple process streams share the
152+
// same connection when the request contains comma-separated names.
153+
wsWriteMtx.Lock()
154+
_ = ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
111155
err := ws.WriteJSON(&msg)
112-
api.wsMtx.Unlock()
156+
wsWriteMtx.Unlock()
113157
if err != nil {
114158
if errors.Is(err, net.ErrClosed) {
115159
return
116160
}
117161
log.Err(err).Msg("Failed to write to socket")
118162
return
119163
}
120-
if !open {
121-
return
122-
}
123164
case <-done:
124165
log.Warn().Msg("Socket closed remotely")
125-
close(logChan)
166+
closeLogChan()
126167
return
127168
}
128169

src/pclog/process_log_buffer.go

Lines changed: 8 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -50,9 +50,16 @@ func (b *ProcessLogBuffer) Write(message string) {
5050
}
5151
b.mxBuf.Unlock()
5252

53+
// Snapshot observers under the lock, then fan out without holding it.
54+
// Holding mxObs across observer.WriteString lets a slow/blocked observer
55+
// stall every other Write and any new GetLogsAndSubscribe on this buffer.
5356
b.mxObs.Lock()
54-
defer b.mxObs.Unlock()
57+
snapshot := make([]LogObserver, 0, len(b.observers))
5558
for _, observer := range b.observers {
59+
snapshot = append(snapshot, observer)
60+
}
61+
b.mxObs.Unlock()
62+
for _, observer := range snapshot {
5663
_, _ = observer.WriteString(message)
5764
}
5865
}

0 commit comments

Comments
 (0)