fix: data races that can crash the process under load#325
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces thread-safe helpers (setKillChannel, getKillChannel, deleteKillChannel, signalKill) guarded by a mutex to replace direct, concurrent map access to killchannel, resolving potential data races. It also implements a copy-on-write mechanism in updateUserInfo to prevent concurrent map read/write panics, removes unused event subscription fields, and adds comprehensive tests. The reviewer's feedback focuses on further improving thread safety and avoiding redundant lookups by passing the kill channel directly as a parameter to startClient and modifying deleteKillChannel to accept the channel as an argument to prevent a slow-cleanup goroutine from deleting a newly established session's channel.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| func deleteKillChannel(userID string) { | ||
| killchannelMu.Lock() | ||
| delete(killchannel, userID) | ||
| killchannelMu.Unlock() | ||
| } |
There was a problem hiding this comment.
To prevent a race condition where a slow-cleanup goroutine from an old session deletes the kill channel of a newly established session for the same user, deleteKillChannel should accept the channel as an argument and only delete it if it matches the current channel in the map.
| func deleteKillChannel(userID string) { | |
| killchannelMu.Lock() | |
| delete(killchannel, userID) | |
| killchannelMu.Unlock() | |
| } | |
| func deleteKillChannel(userID string, ch chan bool) { | |
| killchannelMu.Lock() | |
| if current, ok := killchannel[userID]; ok && current == ch { | |
| delete(killchannel, userID) | |
| } | |
| killchannelMu.Unlock() | |
| } |
| } | ||
|
|
||
| func (s *server) startClient(userID string, textjid string, token string, subscriptions []string) { | ||
| func (s *server) startClient(userID string, textjid string, token string) { |
There was a problem hiding this comment.
Pass the kill channel directly as a parameter to startClient to avoid any potential race conditions or lookups from the global map during startup.
| func (s *server) startClient(userID string, textjid string, token string) { | |
| func (s *server) startClient(userID string, textjid string, token string, kill chan bool) { |
| setKillChannel(txtid, make(chan bool, 1)) | ||
| go s.startClient(txtid, jid, token) |
There was a problem hiding this comment.
Create the kill channel and pass it directly to startClient to avoid looking it up from the global map inside the goroutine.
| setKillChannel(txtid, make(chan bool, 1)) | |
| go s.startClient(txtid, jid, token) | |
| ch := make(chan bool, 1) | |
| setKillChannel(txtid, ch) | |
| go s.startClient(txtid, jid, token, ch) |
| setKillChannel(txtid, make(chan bool, 1)) | ||
| go s.startClient(txtid, jid, token) |
There was a problem hiding this comment.
Create the kill channel and pass it directly to startClient to avoid looking it up from the global map inside the goroutine.
| setKillChannel(txtid, make(chan bool, 1)) | |
| go s.startClient(txtid, jid, token) | |
| ch := make(chan bool, 1) | |
| setKillChannel(txtid, ch) | |
| go s.startClient(txtid, jid, token, ch) |
| // Keep the session goroutine alive until a kill signal arrives. Block on the | ||
| // channel (captured once via the mutex-guarded helper) instead of polling — | ||
| // this parks the goroutine with zero CPU and no per-second mutex access. | ||
| kill, ok := getKillChannel(userID) | ||
| if !ok { | ||
| log.Error().Str("userid", userID).Msg("no kill channel registered for session; goroutine exiting") | ||
| return | ||
| } | ||
| <-kill | ||
| log.Info().Str("userid", userID).Msg("Received kill signal") | ||
| client.Disconnect() | ||
| clientManager.DeleteWhatsmeowClient(userID) | ||
| clientManager.DeleteMyClient(userID) | ||
| clientManager.DeleteHTTPClient(userID) | ||
| if _, err := s.db.Exec(`UPDATE users SET qrcode='', connected=0 WHERE id=$1`, userID); err != nil { | ||
| log.Error().Err(err).Msg("failed to mark user disconnected on kill") | ||
| } | ||
| deleteKillChannel(userID) |
There was a problem hiding this comment.
Since the kill channel is now passed directly as a parameter, we can eliminate the map lookup and the error handling. Also, pass the kill channel to deleteKillChannel to ensure we only delete the correct channel from the map.
| // Keep the session goroutine alive until a kill signal arrives. Block on the | |
| // channel (captured once via the mutex-guarded helper) instead of polling — | |
| // this parks the goroutine with zero CPU and no per-second mutex access. | |
| kill, ok := getKillChannel(userID) | |
| if !ok { | |
| log.Error().Str("userid", userID).Msg("no kill channel registered for session; goroutine exiting") | |
| return | |
| } | |
| <-kill | |
| log.Info().Str("userid", userID).Msg("Received kill signal") | |
| client.Disconnect() | |
| clientManager.DeleteWhatsmeowClient(userID) | |
| clientManager.DeleteMyClient(userID) | |
| clientManager.DeleteHTTPClient(userID) | |
| if _, err := s.db.Exec(`UPDATE users SET qrcode='', connected=0 WHERE id=$1`, userID); err != nil { | |
| log.Error().Err(err).Msg("failed to mark user disconnected on kill") | |
| } | |
| deleteKillChannel(userID) | |
| // Keep the session goroutine alive until a kill signal arrives. Block on the | |
| // channel (passed directly as a parameter) instead of polling — | |
| // this parks the goroutine with zero CPU and no per-second mutex access. | |
| <-kill | |
| log.Info().Str("userid", userID).Msg("Received kill signal") | |
| client.Disconnect() | |
| clientManager.DeleteWhatsmeowClient(userID) | |
| clientManager.DeleteMyClient(userID) | |
| clientManager.DeleteHTTPClient(userID) | |
| if _, err := s.db.Exec("UPDATE users SET qrcode='', connected=0 WHERE id=$1", userID); err != nil { | |
| log.Error().Err(err).Msg("failed to mark user disconnected on kill") | |
| } | |
| deleteKillChannel(userID, kill) |
| } | ||
|
|
||
| // delete removes the entry; signalKill on a missing entry is a safe no-op. | ||
| deleteKillChannel(u) |
| setKillChannel(uid, make(chan bool, 1)) | ||
| signalKill(uid) | ||
| _, _ = getKillChannel(uid) | ||
| deleteKillChannel(uid) |
…elete A slow-cleanup goroutine from an old session could delete the kill channel of a newer session for the same user (reconnect), leaving the new session unkillable. deleteKillChannel now takes the channel the caller owns and removes the entry only if the map still holds that exact channel. Addresses Gemini review on asternic#325. New test TestDeleteKillChannelStaleSession fails (unkillable session) on the old unconditional delete, passes now.
Each startClient goroutine now receives the channel its caller registered instead of looking it up from the global map at startup. This removes a lookup-ownership race (a concurrent reconnect could replace the map entry between setKillChannel and the goroutine's getKillChannel, so two sessions could end up blocked on the same buffered channel and one would leak). The goroutine now deterministically owns its own channel, which also pairs with the compare-and-delete in deleteKillChannel. Addresses the remaining Gemini review items on asternic#325.
|
Thanks for the thorough review — addressed all of it:
|
Summary
Three related data-race fixes, each of which can crash the whole process with
fatal error: concurrent map ...(or cause torn reads) under concurrent load. They're the same class of issue, so I've grouped them into one PR.These supersede #317, #318 and #319 — I closed those to consolidate into a single, easier-to-review change.
1.
updateUserInfomutated a shared map in place —helpers.goupdateUserInfowrote straight into the map held byuserinfocache(values.(Values).m[field] = value). That same map is handed to request goroutines through the request context, so the in-place write races with concurrent readers (Values.Get), producingfatal error: concurrent map read and map write.Now it's copy-on-write: build a fresh map, copy the old entries, set the new value, and return a new
Values. Callers already persist the result viauserinfocache.Set, so no call sites change.2.
killchannelmap accessed without synchronization —main.go,wmiau.go,handlers.goThe global
killchannel(map[string]chan bool) was read, written and deleted from HTTP request goroutines (Connect / Disconnect / logout) and from the per-sessionstartClientgoroutine, with no lock — so two simultaneous connects could crash withfatal error: concurrent map writes.It's now guarded by a dedicated mutex behind small helpers (
setKillChannel/getKillChannel/deleteKillChannel/signalKill). The lock is held only around the map operation, never while sending/receiving on a channel. As a small bonus, the keep-alive loop now blocks on<-killinstead of polling the map every second.3. Removed the write-only
MyClient.subscriptionsfield —clients.go,wmiau.go,handlers.goThis field was written from two goroutines (the event handler and the request path) but never read — a data race on dead state. Subscriptions are already re-read from
users.events/userinfocacheon every event (updateAndGetUserSubscriptions), so the field was redundant. Removed the field, its assignments, and the now-unusedsubscribedEventsparameter ofstartClient.Testing
New unit tests covering each fix:
TestUpdateUserInfoCopyOnWrite— proves the originalValuesis untouched after an update.TestUpdateUserInfoConcurrent— hammersupdateUserInfo+Getfrom many goroutines.TestKillChannelHelpers— set/get/delete/signal round-trip.TestKillChannelConcurrent— concurrent set/get/delete across many goroutines.TestUpdateAndGetUserSubscriptionsFromCache— subscriptions still resolve from the cache after removing the field.The two
*Concurrenttests rely on Go's built-in concurrent-map detector: on the unfixed code they fatal-error withconcurrent map writes/concurrent map read and map write(that was the failing "before" state); they pass on the fixed code.No behaviour changes for API consumers — these are internal correctness/stability fixes.