Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,5 +200,14 @@ func (w *worker) setResult(curr uint64, err error) {
// getState returns a snapshot of the worker's current state that is safe to
// read after the worker lock has been released.
//
// Copying workerState by value is only a shallow copy: the embedded result
// carries the `failed` map, which would stay shared by reference with the live
// worker. The worker keeps mutating that map under w.lock, while callers
// (coordinator stats) read the snapshot without holding the lock, so the map is
// duplicated here, entry by entry, while the lock is still held.
//
// The returned `failed` map is always non-nil and is owned by the caller.
func (w *worker) getState() workerState {
	w.lock.Lock()
	defer w.lock.Unlock()

	// Value copy of every field; map-typed fields still alias the worker's data.
	state := w.state

	// Replace the aliased map with an independent copy so later writes by the
	// worker cannot be observed (or raced with) through the snapshot.
	state.failed = make(map[uint64]int, len(w.state.failed))
	for height, count := range w.state.failed {
		state.failed[height] = count
	}
	return state
}
56 changes: 56 additions & 0 deletions das/worker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package das

import (
"fmt"
"sync"
"testing"
)

// TestWorker_getStateConcurrentRead checks that a workerState snapshot taken
// via getState can be iterated while the worker is still recording failures
// into its `failed` map.
//
// getState returns workerState by value, but a map field copied that way still
// refers to the worker's live map. When the coordinator's stats path
// (unsafeStats) ranged over that shared map while the worker wrote to it under
// its own lock, the runtime aborted with a fatal "concurrent map iteration and
// map write". The test replays that access pattern with one writer and one
// reader goroutine; it has no explicit assertions and passes by finishing
// without a runtime fault (run it under -race as well).
func TestWorker_getStateConcurrentRead(t *testing.T) {
	const rounds = 1000

	failures := make(map[uint64]int)
	w := &worker{
		state: workerState{
			result: result{
				job:    job{from: 1, to: rounds},
				failed: failures,
			},
		},
	}

	var done sync.WaitGroup

	// Writer side: report one failure per height through setResult, which
	// updates `failed` while holding w.lock, as happens during sampling.
	done.Add(1)
	go func() {
		defer done.Done()
		for height := uint64(1); height <= rounds; height++ {
			w.setResult(height, fmt.Errorf("height %d failed", height))
		}
	}()

	// Reader side: take a snapshot and walk its `failed` map without the
	// worker lock, mirroring what the coordinator's unsafeStats does.
	done.Add(1)
	go func() {
		defer done.Done()
		for i := 0; i < rounds; i++ {
			snapshot := w.getState()
			var sum int
			for _, n := range snapshot.failed {
				sum += n
			}
			_ = sum
		}
	}()

	done.Wait()
}
Loading