Skip to content

Commit 5bc6248

Browse files
committed
fix(review): serialize store.Load with concurrent Append writes
A journal Append on a session larger than a single page-write can be observed mid-write by a concurrent reader using a separate file handle, so the bufio scanner in ReadAll can decode a truncated line and fail. Switch sessLock to RWMutex; Load takes RLock, Append keeps the write lock. Append's internal replay uses a non-locking helper to avoid re-entering the lock.
1 parent 17bd63c commit 5bc6248

1 file changed

Lines changed: 22 additions & 8 deletions

File tree

internal/agentstore/store.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type Store struct {
2626

2727
mu sync.Mutex
2828
open map[agentcore.SessionID]*Journal
29-
sessLock map[agentcore.SessionID]*sync.Mutex
29+
sessLock map[agentcore.SessionID]*sync.RWMutex
3030
}
3131

3232
// SessionMeta is the lightweight summary used by List. It is NOT a
@@ -50,7 +50,7 @@ func NewStore(root string) (*Store, error) {
5050
return &Store{
5151
root: root,
5252
open: map[agentcore.SessionID]*Journal{},
53-
sessLock: map[agentcore.SessionID]*sync.Mutex{},
53+
sessLock: map[agentcore.SessionID]*sync.RWMutex{},
5454
}, nil
5555
}
5656

@@ -110,8 +110,21 @@ func (s *Store) Create(id agentcore.SessionID, projectID, title string, model ag
110110
}
111111

112112
// Load replays a session's journal into its current state. Returns
113-
// ErrSessionNotFound if the session directory does not exist.
113+
// ErrSessionNotFound if the session directory does not exist. Takes
114+
// the per-session read lock so a concurrent Append cannot expose a
115+
// partially-written event (single Write+Sync on the writer side is
116+
// not observed atomically by a separate file handle on the reader
117+
// side once an event grows past the kernel's page-write boundary).
114118
func (s *Store) Load(id agentcore.SessionID) (agentcore.Session, error) {
119+
lock := s.sessionLock(id)
120+
lock.RLock()
121+
defer lock.RUnlock()
122+
return s.loadLocked(id)
123+
}
124+
125+
// loadLocked replays the journal without taking the per-session lock.
126+
// Callers must already hold sessionLock(id) (read or write).
127+
func (s *Store) loadLocked(id agentcore.SessionID) (agentcore.Session, error) {
115128
path := filepath.Join(s.sessionDir(id), journalFile)
116129
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
117130
return agentcore.Session{}, ErrSessionNotFound
@@ -150,7 +163,7 @@ func (s *Store) Append(id agentcore.SessionID, ev agentproto.AgentEvent) error {
150163
// an empty events.jsonl under a freshly created directory, leaving an
151164
// invalid journal (no SessionCreated header) that future Load calls
152165
// reject and future Create calls refuse because the file exists.
153-
current, err := s.Load(id)
166+
current, err := s.loadLocked(id)
154167
if err != nil {
155168
return err
156169
}
@@ -223,16 +236,17 @@ func (s *Store) sessionDir(id agentcore.SessionID) string {
223236
return filepath.Join(s.root, string(id))
224237
}
225238

226-
// sessionLock returns the per-session mutex used to serialize the
239+
// sessionLock returns the per-session RWMutex used to serialize the
227240
// Load/Apply/Append sequence in Create and Append. Without it two
228241
// concurrent appends on the same session could validate against the
229-
// same pre-snapshot and persist mutually-inconsistent events.
230-
func (s *Store) sessionLock(id agentcore.SessionID) *sync.Mutex {
242+
// same pre-snapshot and persist mutually-inconsistent events. Load
243+
// holds the read side so it cannot observe a journal mid-write.
244+
func (s *Store) sessionLock(id agentcore.SessionID) *sync.RWMutex {
231245
s.mu.Lock()
232246
defer s.mu.Unlock()
233247
m, ok := s.sessLock[id]
234248
if !ok {
235-
m = &sync.Mutex{}
249+
m = &sync.RWMutex{}
236250
s.sessLock[id] = m
237251
}
238252
return m

0 commit comments

Comments
 (0)