Skip to content

Commit b6cf790

Browse files
committed
fix(race): resolve races in SimulatorSource and FVT TestUpsert
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 519a186 commit b6cf790

File tree

2 files changed

+9
-1
lines changed

2 files changed

+9
-1
lines changed

fvt/rule_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (s *RuleTestSuite) TestUpsert() {
172172
err := server.AddListener(tcp)
173173
s.Require().NoError(err)
174174
go func() {
175-
err = server.Serve()
175+
err := server.Serve()
176176
fmt.Println(err)
177177
}()
178178
fmt.Println(tcp.Address())
@@ -270,6 +270,8 @@ func (s *RuleTestSuite) TestUpsert() {
270270
})
271271
s.Run("compare result", func() {
272272
expected := map[string]string{"sim/new1": "{\"b\":2}", "sim/new2": "{\"a\":1}", "sim/old1": "{\"a\":1}", "sim/old2": "{\"b\":2}"}
273+
lock.Lock()
274+
defer lock.Unlock()
273275
s.Require().Equal(expected, result)
274276
})
275277
}

internal/io/simulator/simulator.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@ import (
2020
"github.com/lf-edge/ekuiper/contract/v2/api"
2121

2222
"github.com/lf-edge/ekuiper/v2/pkg/cast"
23+
"github.com/lf-edge/ekuiper/v2/pkg/syncx"
2324
)
2425

2526
type SimulatorSource struct {
2627
index int
2728
cfg *sConfig
2829
eof api.EOFIngest
30+
syncx.Mutex
2931
}
3032

3133
type sConfig struct {
@@ -44,6 +46,8 @@ func (s *SimulatorSource) Provision(ctx api.StreamContext, configs map[string]an
4446

4547
func (s *SimulatorSource) Close(ctx api.StreamContext) error {
4648
// Allow to reset in close rule trial run
49+
s.Lock()
50+
defer s.Unlock()
4751
s.index = 0
4852
return nil
4953
}
@@ -58,6 +62,8 @@ func (s *SimulatorSource) SetEofIngest(eof api.EOFIngest) {
5862
}
5963

6064
func (s *SimulatorSource) Pull(ctx api.StreamContext, trigger time.Time, ingest api.TupleIngest, _ api.ErrorIngest) {
65+
s.Lock()
66+
defer s.Unlock()
6167
if s.index >= len(s.cfg.Data) {
6268
if s.cfg.Loop {
6369
s.index = 0

0 commit comments

Comments
 (0)