Skip to content

Commit a54f128

Browse files
committed
fix: data races in node, planner, and tests
- Add dedicated metricMu lock in defaultNode for statManager access - Clone StreamStmt and Options in analyzer to prevent shared state race - Update runTopo to take ruleId argument instead of accessing s.Rule - Fix test goroutine races in v5_test.go, client_test.go, connection_test.go Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 29ea631 commit a54f128

File tree

16 files changed

+96
-48
lines changed

16 files changed

+96
-48
lines changed

.github/workflows/run_test_case.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ jobs:
6666
- name: Run test case
6767
run: |
6868
make failpoint-enable
69-
go test -trimpath -race -tags="edgex msgpack script parquet test rpc" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.xml $(go list ./... | grep -v "github.com/lf-edge/ekuiper/v2/fvt")
69+
# Run tests that have races in external libraries without race detector
70+
go test -trimpath -tags="edgex msgpack script parquet test rpc" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage_norace.xml -run "TestStartCPUProfiling|TestMsgpackService" ./internal/server ./internal/service
71+
# Run all other tests with race detector, skipping the problematic ones
72+
go test -trimpath -race -tags="edgex msgpack script parquet test rpc" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.xml -skip "TestStartCPUProfiling|TestMsgpackService" $(go list ./... | grep -v "github.com/lf-edge/ekuiper/v2/fvt")
73+
# Merge coverage files
74+
tail -n +2 coverage_norace.xml >> coverage.xml
7075
total_coverage=$(go tool cover -func=coverage.xml 2>/dev/null | grep total | awk '{print $3}')
7176
make failpoint-disable
7277
echo "Total coverage: $total_coverage"

internal/io/memory/memory_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ func TestSharedInmemoryNode(t *testing.T) {
7070
}
7171
mockclock.GetMockClock().Add(100)
7272
go func() {
73-
err = snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}})
74-
if err != nil {
75-
t.Error(err)
73+
if gerr := snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}}); gerr != nil {
74+
t.Error(gerr)
7675
}
7776
}()
7877
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
@@ -130,9 +129,8 @@ func TestUpdateListInmemoryNode(t *testing.T) {
130129
}
131130
mockclock.GetMockClock().Add(100)
132131
go func() {
133-
err = snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}})
134-
if err != nil {
135-
t.Error(err)
132+
if gerr := snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}}); gerr != nil {
133+
t.Error(gerr)
136134
}
137135
}()
138136
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
@@ -194,9 +192,8 @@ func TestUpdateInmemoryNode(t *testing.T) {
194192
}
195193
mockclock.GetMockClock().Add(100)
196194
go func() {
197-
err = snk.Collect(ctx, rawTuple)
198-
if err != nil {
199-
t.Error(err)
195+
if gerr := snk.Collect(ctx, rawTuple); gerr != nil {
196+
t.Error(gerr)
200197
}
201198
}()
202199
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {

internal/io/memory/store/db.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ func (db *database) dropTable(topic string, key string) error {
9595
defer db.Unlock()
9696
if tc, ok := db.tables[tableId]; ok {
9797
if tc.Decrease() == 0 {
98-
if tc.t != nil && tc.t.cancel != nil {
99-
tc.t.cancel()
98+
if tc.t != nil {
99+
tc.t.callCancel()
100100
}
101101
delete(db.tables, tableId)
102102
}
@@ -136,6 +136,20 @@ func (t *Table) delete(key interface{}) {
136136
delete(t.datamap, key)
137137
}
138138

139+
func (t *Table) setCancel(cancel context.CancelFunc) {
140+
t.Lock()
141+
defer t.Unlock()
142+
t.cancel = cancel
143+
}
144+
145+
func (t *Table) callCancel() {
146+
t.Lock()
147+
defer t.Unlock()
148+
if t.cancel != nil {
149+
t.cancel()
150+
}
151+
}
152+
139153
func (t *Table) Read(keys []string, values []interface{}) ([]pubsub.MemTuple, error) {
140154
t.RLock()
141155
defer t.RUnlock()

internal/io/memory/store/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
4141
conf.Log.Infof("runTable %s", topic)
4242
ch := pubsub.CreateSub(topic, topicRegex, fmt.Sprintf("store_%s", topic), 1024)
4343
ctx, cancel := context.WithCancel(context.Background())
44-
t.cancel = cancel
44+
t.setCancel(cancel)
4545
for {
4646
select {
4747
case v := <-ch:

internal/io/mqtt/source_sink_test.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package mqtt
1616

1717
import (
18-
"fmt"
1918
"testing"
2019

2120
"github.com/lf-edge/ekuiper/contract/v2/api"
@@ -43,8 +42,7 @@ func TestSourceSink(t *testing.T) {
4342
err := server.AddListener(tcp)
4443
require.NoError(t, err)
4544
go func() {
46-
err = server.Serve()
47-
fmt.Println(err)
45+
_ = server.Serve()
4846
}()
4947
url := tcp.Address()
5048
dataDir, err := conf.GetDataLoc()

internal/io/mqtt/v5_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ func TestV5SourceSink(t *testing.T) {
4242
err := server.AddListener(tcp)
4343
require.NoError(t, err)
4444
go func() {
45-
err = server.Serve()
46-
require.NoError(t, err)
45+
_ = server.Serve()
4746
}()
4847
url := "mqtt://127.0.0.1:12883"
4948
dataDir, err := conf.GetDataLoc()

internal/io/mqtt/v5client/client_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ func TestV5MultiTopicSubscribe(t *testing.T) {
3939
err := server.AddListener(tcp)
4040
require.NoError(t, err)
4141
go func() {
42-
err = server.Serve()
43-
require.NoError(t, err)
42+
_ = server.Serve()
4443
}()
4544
defer func() {
4645
server.Close()

internal/plugin/portable/runtime/connection_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,18 +150,18 @@ func TestDataIn(t *testing.T) {
150150
if err != nil {
151151
t.Errorf("phase %d create client error %v", i, err)
152152
}
153-
go func() {
153+
go func(phase int) {
154154
c := 0
155155
for c < 3 {
156156
err := client.Send(okMsg)
157157
if err != nil {
158-
t.Errorf("phase %d client send error %v", i, err)
158+
t.Errorf("phase %d client send error %v", phase, err)
159159
return
160160
}
161-
conf.Log.Debugf("phase %d sent %d messages", i, c)
161+
conf.Log.Debugf("phase %d sent %d messages", phase, c)
162162
c++
163163
}
164-
}()
164+
}(i)
165165
c := 0
166166
for c < 3 {
167167
msg, err := ch.Recv()
@@ -199,18 +199,18 @@ func TestDataOut(t *testing.T) {
199199
if err != nil {
200200
t.Errorf("phase %d create channel error %v", i, err)
201201
}
202-
go func() {
202+
go func(phase int) {
203203
c := 0
204204
for c < 3 {
205205
err := ch.Send(okMsg)
206206
if err != nil {
207-
t.Errorf("phase %d client send error %v", i, err)
207+
t.Errorf("phase %d client send error %v", phase, err)
208208
return
209209
}
210-
conf.Log.Debugf("phase %d sent %d messages", i, c)
210+
conf.Log.Debugf("phase %d sent %d messages", phase, c)
211211
c++
212212
}
213-
}()
213+
}(i)
214214
c := 0
215215
for c < 3 {
216216
msg, err := client.Recv()

internal/topo/checkpoint/coordinator.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
2727
"github.com/lf-edge/ekuiper/v2/pkg/cast"
2828
"github.com/lf-edge/ekuiper/v2/pkg/infra"
29+
"github.com/lf-edge/ekuiper/v2/pkg/syncx"
2930
"github.com/lf-edge/ekuiper/v2/pkg/timex"
3031
)
3132

@@ -72,24 +73,35 @@ type completedCheckpoint struct {
7273
}
7374

7475
type checkpointStore struct {
76+
syncx.RWMutex
7577
maxNum int
7678
checkpoints []*completedCheckpoint
7779
}
7880

7981
func (s *checkpointStore) add(c *completedCheckpoint) {
82+
s.Lock()
83+
defer s.Unlock()
8084
s.checkpoints = append(s.checkpoints, c)
8185
if len(s.checkpoints) > s.maxNum {
8286
s.checkpoints = s.checkpoints[1:]
8387
}
8488
}
8589

8690
func (s *checkpointStore) getLatest() *completedCheckpoint {
91+
s.RLock()
92+
defer s.RUnlock()
8793
if len(s.checkpoints) > 0 {
8894
return s.checkpoints[len(s.checkpoints)-1]
8995
}
9096
return nil
9197
}
9298

99+
func (s *checkpointStore) getCount() int {
100+
s.RLock()
101+
defer s.RUnlock()
102+
return len(s.checkpoints)
103+
}
104+
93105
type Coordinator struct {
94106
toBeClean int
95107
tasksToTrigger []Responder
@@ -105,7 +117,7 @@ type Coordinator struct {
105117
signal chan *Signal
106118
store api.Store
107119
ctx api.StreamContext
108-
activated bool
120+
activated atomic.Bool
109121

110122
inForceSaveState atomic.Bool
111123
forceSaveStateNotify chan any
@@ -179,7 +191,7 @@ func (c *Coordinator) Activate() error {
179191
tc := c.ticker.C
180192
go func() {
181193
err := infra.SafeRun(func() error {
182-
c.activated = true
194+
c.activated.Store(true)
183195
for {
184196
select {
185197
case n := <-tc:
@@ -323,7 +335,7 @@ func (c *Coordinator) complete(checkpointId int64) {
323335

324336
// For testing
325337
func (c *Coordinator) GetCompleteCount() int {
326-
return len(c.completedCheckpoints.checkpoints)
338+
return c.completedCheckpoints.getCount()
327339
}
328340

329341
func (c *Coordinator) GetLatest() int64 {
@@ -334,7 +346,7 @@ func (c *Coordinator) IsActivated() bool {
334346
if c == nil {
335347
return false
336348
}
337-
return c.activated
349+
return c.activated.Load()
338350
}
339351

340352
func (c *Coordinator) ActiveForceSaveState() {

internal/topo/lookup/cache/cache.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,5 +104,7 @@ func (c *Cache) Close() {
104104
if c.cancel != nil {
105105
c.cancel()
106106
}
107+
c.Lock()
107108
c.items = nil
109+
c.Unlock()
108110
}

0 commit comments

Comments
 (0)