Unblock session calls when broker workers shut down#318
Conversation
There was a problem hiding this comment.
Pull request overview
This PR prevents module session calls from blocking indefinitely when the broker’s worker goroutines have already exited (e.g., due to early cancellation), by introducing a shutdown signal and making session operations cancellation-aware.
Changes:
- Added a
done <-chan struct{}tosession, wired to the broker worker context’sDone(). - Updated
Print,Printf,Printlnto avoid blocking sends by selecting ondone. - Updated
RequestFile/SendFileto select ondonefor blocking channel operations and return errors on shutdown.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| internal/dutagent/session.go | Adds done and uses it to unblock or fail fast in session print and file-transfer APIs. |
| internal/dutagent/broker.go | Wires session.done to the broker worker context so session calls can observe shutdown. |
Comments suppressed due to low confidence (1)
internal/dutagent/session.go:21
- This PR unblocks
Print*and file operations when broker workers shut down, butConsole()still returnschanio.ChanWriterinstances that write to unbuffered channels (internal/chanio/chanio.go:110-115). IftoClientWorkerhas exited, module writes tostdout/stderrcan still block indefinitely, which seems to contradict the PR intent of avoiding deadlocks when workers stop. Consider wrapping the console writers with a session-aware writer that selects ons.done, or otherwise ensuring stdout/stderr writes cannot block oncedoneis closed.
done <-chan struct{} // closed when broker workers shut down; unblocks pending session calls
printCh chan string
stdinCh chan []byte
stdoutCh chan []byte
stderrCh chan []byte
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
All session methods (Print, Printf, Println, RequestFile, SendFile)
blocked indefinitely on unbuffered channels if the companion
toClientWorker had already exited — for example when fromClientWorker
returned an error first and cancelled the shared worker context.
Add a done <-chan struct{} field to session, set to workerCtx.Done()
in Broker.Start. Every blocking channel send/receive in the session
now selects on done and returns immediately (or with an error for
file operations) when the broker is shutting down.
Signed-off-by: Fabian Wienand <fabian.wienand@blindspot.software>
c3eedac to
2118801
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| select { | ||
| case s.fileCh <- file: | ||
| case <-s.done: | ||
| s.currentFile = "" | ||
|
|
There was a problem hiding this comment.
SendFile now returns an error when done is closed, but the method still log.Fatals if s.currentFile != "" before it ever checks done. With the new shutdown-unblocking behavior, currentFile can remain set during cancellation (e.g., an in-flight RequestFile), which can turn a shutdown into a process exit. Consider checking done (and returning an error) before the currentFile != "" fatal, or ensuring currentFile is always cleared on shutdown.
| s.fileCh <- file | ||
|
|
||
| select { | ||
| case <-ctx.Done(): |
There was a problem hiding this comment.
If ctx.Done() fires here, fromClientWorker returns without clearing s.currentFile. In cancellation scenarios where toClientWorker has already set currentFile for an outstanding request, this can leave the session in an inconsistent state and may cause later calls (notably SendFile, which checks currentFile) to log.Fatal. Consider resetting s.currentFile before returning on the cancellation path.
| case <-ctx.Done(): | |
| case <-ctx.Done(): | |
| s.currentFile = "" |
All session methods (Print, Printf, Println, RequestFile, SendFile) blocked indefinitely on unbuffered channels if the companion toClientWorker had already exited — for example when fromClientWorker returned an error first and cancelled the shared worker context.
Add a done <-chan struct{} field to session, set to workerCtx.Done() in Broker.Start. Every blocking channel send/receive in the session now selects on done and returns immediately (or with an error for file operations) when the broker is shutting down.