Skip to content

Commit cd3b394

Browse files
committed
fix: data races in node, planner, and tests
- Add dedicated metricMu lock in defaultNode for statManager access - Clone StreamStmt and Options in analyzer to prevent shared state race - Update runTopo to take ruleId argument instead of accessing s.Rule - Fix test goroutine races in v5_test.go, client_test.go, connection_test.go Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 29ea631 commit cd3b394

File tree

6 files changed

+28
-16
lines changed

6 files changed

+28
-16
lines changed

internal/io/mqtt/v5_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ func TestV5SourceSink(t *testing.T) {
4242
err := server.AddListener(tcp)
4343
require.NoError(t, err)
4444
go func() {
45-
err = server.Serve()
46-
require.NoError(t, err)
45+
_ = server.Serve()
4746
}()
4847
url := "mqtt://127.0.0.1:12883"
4948
dataDir, err := conf.GetDataLoc()

internal/io/mqtt/v5client/client_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ func TestV5MultiTopicSubscribe(t *testing.T) {
3939
err := server.AddListener(tcp)
4040
require.NoError(t, err)
4141
go func() {
42-
err = server.Serve()
43-
require.NoError(t, err)
42+
_ = server.Serve()
4443
}()
4544
defer func() {
4645
server.Close()

internal/plugin/portable/runtime/connection_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,18 +150,18 @@ func TestDataIn(t *testing.T) {
150150
if err != nil {
151151
t.Errorf("phase %d create client error %v", i, err)
152152
}
153-
go func() {
153+
go func(phase int) {
154154
c := 0
155155
for c < 3 {
156156
err := client.Send(okMsg)
157157
if err != nil {
158-
t.Errorf("phase %d client send error %v", i, err)
158+
t.Errorf("phase %d client send error %v", phase, err)
159159
return
160160
}
161-
conf.Log.Debugf("phase %d sent %d messages", i, c)
161+
conf.Log.Debugf("phase %d sent %d messages", phase, c)
162162
c++
163163
}
164-
}()
164+
}(i)
165165
c := 0
166166
for c < 3 {
167167
msg, err := ch.Recv()
@@ -199,18 +199,18 @@ func TestDataOut(t *testing.T) {
199199
if err != nil {
200200
t.Errorf("phase %d create channel error %v", i, err)
201201
}
202-
go func() {
202+
go func(phase int) {
203203
c := 0
204204
for c < 3 {
205205
err := ch.Send(okMsg)
206206
if err != nil {
207-
t.Errorf("phase %d client send error %v", i, err)
207+
t.Errorf("phase %d client send error %v", phase, err)
208208
return
209209
}
210-
conf.Log.Debugf("phase %d sent %d messages", i, c)
210+
conf.Log.Debugf("phase %d sent %d messages", phase, c)
211211
c++
212212
}
213-
}()
213+
}(i)
214214
c := 0
215215
for c < 3 {
216216
msg, err := client.Recv()

internal/topo/node/node.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type defaultNode struct {
4040
name string
4141
concurrency int
4242
sendError bool
43+
metricMu syncx.RWMutex
4344
statManager metric.StatManager
4445
ctx api.StreamContext
4546
ctrlCh chan<- error
@@ -99,13 +100,17 @@ func (o *defaultNode) SetQos(qos def.Qos) {
99100
}
100101

101102
func (o *defaultNode) GetMetrics() []any {
103+
o.metricMu.RLock()
104+
defer o.metricMu.RUnlock()
102105
if o.statManager != nil {
103106
return o.statManager.GetMetrics()
104107
}
105108
return nil
106109
}
107110

108111
func (o *defaultNode) RemoveMetrics(ruleId string) {
112+
o.metricMu.RLock()
113+
defer o.metricMu.RUnlock()
109114
if o.statManager != nil {
110115
o.statManager.Clean(ruleId)
111116
}
@@ -231,7 +236,9 @@ func (o *defaultSinkNode) SetBarrierHandler(bh checkpoint.BarrierHandler) {
231236

232237
func (o *defaultNode) prepareExec(ctx api.StreamContext, errCh chan<- error, opType string) {
233238
ctx.GetLogger().Infof("%s started", o.name)
239+
o.metricMu.Lock()
234240
o.statManager = metric.NewStatManager(ctx, opType)
241+
o.metricMu.Unlock()
235242
o.ctx = ctx
236243
wg := ctx.Value(context.RuleWaitGroupKey)
237244
if wg != nil {

internal/topo/planner/analyzer.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,8 +521,15 @@ func convertStreamInfo(streamStmt *ast.StreamStmt) (*streamInfo, error) {
521521
return nil, err
522522
}
523523
}
524+
// Clone the statement to avoid data race when multiple rules sharing the same stream definition
525+
// The planner may modify the options, e.g. set the type to default value
526+
newStmt := *streamStmt
527+
if streamStmt.Options != nil {
528+
newOpt := *streamStmt.Options
529+
newStmt.Options = &newOpt
530+
}
524531
return &streamInfo{
525-
stmt: streamStmt,
532+
stmt: &newStmt,
526533
schema: ss,
527534
}, nil
528535
}

internal/topo/rule/action.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ func (s *State) doStart() error {
8888
s.topoGraph = s.topology.GetTopo()
8989
}
9090
}
91-
go s.runTopo(s.topology)
91+
go s.runTopo(s.topology, s.Rule.Id)
9292
return nil
9393
})
9494
if err != nil {
@@ -116,7 +116,7 @@ func (s *State) doStop(stateType machine.RunState, msg string) {
116116
}
117117

118118
// This is called async
119-
func (s *State) runTopo(tp *topo.Topo) {
119+
func (s *State) runTopo(tp *topo.Topo, ruleId string) {
120120
s.logger.Infof("topo %d opens", tp.GetRunId())
121121
e := <-tp.Open()
122122
s.logger.Infof("topo %d stops", tp.GetRunId())
@@ -135,7 +135,7 @@ func (s *State) runTopo(tp *topo.Topo) {
135135
if len(msg) > 0 {
136136
lastWill = fmt.Sprintf("%s: %s", lastWill, msg)
137137
}
138-
s.updateTrigger(s.Rule.Id, false)
138+
s.updateTrigger(ruleId, false)
139139
}
140140
}
141141
// The run exit may be caused by user action or rule itself

0 commit comments

Comments
 (0)