Skip to content

Commit 85d4f19

Browse files
authored
fix: enable race ci and fix problems (#3996)
Enables -race flag for unit tests and FVT tests in run_test_case.yaml to catch data races in CI. --------- Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent bdbe56a commit 85d4f19

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+593
-192
lines changed

.github/workflows/run_test_case.yaml

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,38 @@ jobs:
6363
cd extensions/functions/onnx
6464
sudo ./install.sh
6565
66+
- name: Install Merge Tool
67+
run: go install github.com/wadey/gocovmerge@latest
68+
6669
- name: Run test case
6770
run: |
6871
make failpoint-enable
69-
go test -trimpath -tags="edgex msgpack script parquet test rpc" --cover -covermode=atomic -coverpkg=./... -coverprofile=coverage.xml $(go list ./... | grep -v "github.com/lf-edge/ekuiper/v2/fvt")
72+
73+
# 1. Run MAIN tests (With Race)
74+
# We ADD -skip to avoid running the special tests here
75+
go test -trimpath -race -tags="edgex msgpack script parquet test rpc" \
76+
-skip "TestMsgpackService|TestExtensions|TestFuncState|TestFuncStateCheckpoint|TestStartCPUProfiling" \
77+
-cover -covermode=atomic -coverpkg=./... -coverprofile=cover_race.out \
78+
$(go list ./... | grep -v "github.com/lf-edge/ekuiper/v2/fvt")
79+
80+
# 2. Run SPECIAL tests (No Race)
81+
go test -trimpath -tags="edgex msgpack script parquet test rpc" \
82+
-run "TestMsgpackService|TestExtensions|TestFuncState|TestFuncStateCheckpoint|TestStartCPUProfiling" \
83+
-cover -covermode=atomic -coverpkg=./... -coverprofile=cover_norace.out \
84+
./internal/service ./internal/topo/exttest ./internal/server/proftest
85+
86+
# 3. Safe Merge using gocovmerge
87+
# This correctly sums up the counters for shared code
88+
$HOME/go/bin/gocovmerge cover_race.out cover_norace.out > coverage.xml
89+
90+
# 4. Calculate Total
7091
total_coverage=$(go tool cover -func=coverage.xml 2>/dev/null | grep total | awk '{print $3}')
71-
make failpoint-disable
7292
echo "Total coverage: $total_coverage"
93+
94+
# 5. Ensure failpoints disable even if tests fail
95+
- name: Cleanup Failpoints
96+
if: always() # Runs even if previous step failed
97+
run: make failpoint-disable
7398
- uses: actions/upload-artifact@v4
7499
if: failure()
75100
with:
@@ -86,7 +111,7 @@ jobs:
86111
- name: Run fvt
87112
run: |
88113
export KUIPER__BASIC__ALLOWEXTERNALFILEACCESS="true"
89-
go test -trimpath -tags="full deadlock" --cover -covermode=atomic -coverpkg=./... -coverprofile=fvt_coverage.xml ./fvt
114+
go test -trimpath -race -tags="full deadlock" --cover -covermode=atomic -coverpkg=./... -coverprofile=fvt_coverage.xml ./fvt
90115
total_coverage=$(go tool cover -func=coverage.xml 2>/dev/null | grep total | awk '{print $3}')
91116
echo "Total coverage: $total_coverage"
92117
- uses: actions/upload-artifact@v4

extensions/impl/video/source.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ func (s *Source) runCurrent(ctx api.StreamContext, fps string, ingest api.BytesI
167167
// We must read stderr even if DebugResp is false to prevent the ffmpeg pipe from
168168
// blocking and to capture the last error message for diagnostics.
169169
var lastStderr string
170+
stderrDone := make(chan struct{})
170171
go func() {
172+
defer close(stderrDone)
171173
scanner := bufio.NewScanner(stderr)
172174
for scanner.Scan() {
173175
line := scanner.Text()
@@ -203,6 +205,7 @@ func (s *Source) runCurrent(ctx api.StreamContext, fps string, ingest api.BytesI
203205
case <-ctx.Done():
204206
return nil
205207
case err := <-done:
208+
<-stderrDone
206209
if err != nil && lastStderr != "" {
207210
return fmt.Errorf("%v: %s", err, lastStderr)
208211
}

fvt/rule_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ func (s *RuleTestSuite) TestUpsert() {
172172
err := server.AddListener(tcp)
173173
s.Require().NoError(err)
174174
go func() {
175-
err = server.Serve()
175+
err := server.Serve()
176176
fmt.Println(err)
177177
}()
178178
fmt.Println(tcp.Address())
@@ -270,6 +270,8 @@ func (s *RuleTestSuite) TestUpsert() {
270270
})
271271
s.Run("compare result", func() {
272272
expected := map[string]string{"sim/new1": "{\"b\":2}", "sim/new2": "{\"a\":1}", "sim/old1": "{\"a\":1}", "sim/old2": "{\"b\":2}"}
273+
lock.Lock()
274+
defer lock.Unlock()
273275
s.Require().Equal(expected, result)
274276
})
275277
}

internal/io/http/httpserver/data_server.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,10 @@ type GlobalServerManager struct {
4545
websocketEndpoint map[string]*websocketEndpointContext
4646
}
4747

48-
var manager *GlobalServerManager
48+
var (
49+
manager *GlobalServerManager
50+
managerLock syncx.RWMutex
51+
)
4952

5053
func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
5154
r := mux.NewRouter()
@@ -66,6 +69,7 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
6669
return true
6770
},
6871
}
72+
managerLock.Lock()
6973
manager = &GlobalServerManager{
7074
websocketEndpoint: map[string]*websocketEndpointContext{},
7175
endpoint: map[string]string{},
@@ -81,20 +85,39 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
8185
s.ListenAndServeTLS(conf.Config.Source.HttpServerTls.Certfile, conf.Config.Source.HttpServerTls.Keyfile)
8286
}
8387
}(manager)
88+
managerLock.Unlock()
8489
time.Sleep(500 * time.Millisecond)
8590
}
8691

8792
func ShutDown() {
88-
manager.Shutdown()
93+
managerLock.RLock()
94+
if manager != nil {
95+
manager.Shutdown()
96+
}
97+
managerLock.RUnlock()
98+
managerLock.Lock()
8999
manager = nil
100+
managerLock.Unlock()
90101
}
91102

92103
func RegisterEndpoint(endpoint string, method string) (string, error) {
93-
return manager.RegisterEndpoint(endpoint, method)
104+
managerLock.RLock()
105+
m := manager
106+
managerLock.RUnlock()
107+
if m == nil {
108+
return "", fmt.Errorf("http server is not running")
109+
}
110+
return m.RegisterEndpoint(endpoint, method)
94111
}
95112

96113
func UnregisterEndpoint(endpoint, method string) {
97-
manager.UnregisterEndpoint(endpoint, method)
114+
managerLock.RLock()
115+
m := manager
116+
managerLock.RUnlock()
117+
if m == nil {
118+
return
119+
}
120+
m.UnregisterEndpoint(endpoint, method)
98121
}
99122

100123
const (

internal/io/http/httpserver/websocket_server.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,23 @@ type websocketEndpointContext struct {
5353
}
5454

5555
func RegisterWebSocketEndpoint(ctx api.StreamContext, endpoint string) (string, string, error) {
56-
return manager.RegisterWebSocketEndpoint(ctx, endpoint)
56+
managerLock.RLock()
57+
m := manager
58+
managerLock.RUnlock()
59+
if m == nil {
60+
return "", "", fmt.Errorf("http server is not running")
61+
}
62+
return m.RegisterWebSocketEndpoint(ctx, endpoint)
5763
}
5864

5965
func UnRegisterWebSocketEndpoint(endpoint string) {
60-
wctx := manager.UnRegisterWebSocketEndpoint(endpoint)
66+
managerLock.RLock()
67+
m := manager
68+
managerLock.RUnlock()
69+
if m == nil {
70+
return
71+
}
72+
wctx := m.UnRegisterWebSocketEndpoint(endpoint)
6173
if wctx != nil {
6274
// wait all process exit
6375
wctx.wg.Wait()
@@ -144,7 +156,10 @@ func (m *GlobalServerManager) RegisterWebSocketEndpoint(ctx api.StreamContext, e
144156
conf.Log.Infof("websocket endpint %v create connection", endpoint)
145157
}
146158
m.router.HandleFunc(endpoint, func(w http.ResponseWriter, r *http.Request) {
147-
if h, ok := m.routes[endpoint]; ok {
159+
m.RLock()
160+
h, ok := m.routes[endpoint]
161+
m.RUnlock()
162+
if ok {
148163
h(w, r)
149164
} else {
150165
w.WriteHeader(http.StatusNotFound)

internal/io/memory/memory_test.go

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,8 @@ func TestSharedInmemoryNode(t *testing.T) {
7070
}
7171
mockclock.GetMockClock().Add(100)
7272
go func() {
73-
err = snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}})
74-
if err != nil {
75-
t.Error(err)
73+
if gerr := snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}}); gerr != nil {
74+
t.Error(gerr)
7675
}
7776
}()
7877
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
@@ -130,9 +129,8 @@ func TestUpdateListInmemoryNode(t *testing.T) {
130129
}
131130
mockclock.GetMockClock().Add(100)
132131
go func() {
133-
err = snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}})
134-
if err != nil {
135-
t.Error(err)
132+
if gerr := snk.CollectList(ctx, &xsql.TransformedTupleList{Content: []api.MessageTuple{rawTuple}}); gerr != nil {
133+
t.Error(gerr)
136134
}
137135
}()
138136
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {
@@ -194,9 +192,8 @@ func TestUpdateInmemoryNode(t *testing.T) {
194192
}
195193
mockclock.GetMockClock().Add(100)
196194
go func() {
197-
err = snk.Collect(ctx, rawTuple)
198-
if err != nil {
199-
t.Error(err)
195+
if gerr := snk.Collect(ctx, rawTuple); gerr != nil {
196+
t.Error(gerr)
200197
}
201198
}()
202199
err = src.Subscribe(ctx, func(ctx api.StreamContext, res any, meta map[string]any, ts time.Time) {

internal/io/memory/pubsub/manager.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ func ProduceError(ctx api.StreamContext, topic string, err error) {
128128
}
129129

130130
func doProduce(ctx api.StreamContext, topic string, data any) {
131+
mu.RLock()
132+
defer mu.RUnlock()
131133
c, exists := pubTopics[topic]
132134
if !exists {
133135
return
134136
}
135137
logger := ctx.GetLogger()
136-
mu.RLock()
137-
defer mu.RUnlock()
138138
// broadcast to all consumers
139139
for name, out := range c.consumers {
140140
select {

internal/io/memory/store/db.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ func (db *database) dropTable(topic string, key string) error {
9595
defer db.Unlock()
9696
if tc, ok := db.tables[tableId]; ok {
9797
if tc.Decrease() == 0 {
98-
if tc.t != nil && tc.t.cancel != nil {
99-
tc.t.cancel()
98+
if tc.t != nil {
99+
tc.t.callCancel()
100100
}
101101
delete(db.tables, tableId)
102102
}
@@ -136,6 +136,20 @@ func (t *Table) delete(key interface{}) {
136136
delete(t.datamap, key)
137137
}
138138

139+
func (t *Table) setCancel(cancel context.CancelFunc) {
140+
t.Lock()
141+
defer t.Unlock()
142+
t.cancel = cancel
143+
}
144+
145+
func (t *Table) callCancel() {
146+
t.Lock()
147+
defer t.Unlock()
148+
if t.cancel != nil {
149+
t.cancel()
150+
}
151+
}
152+
139153
func (t *Table) Read(keys []string, values []interface{}) ([]pubsub.MemTuple, error) {
140154
t.RLock()
141155
defer t.RUnlock()

internal/io/memory/store/store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func runTable(topic string, topicRegex *regexp.Regexp, t *Table) {
4141
conf.Log.Infof("runTable %s", topic)
4242
ch := pubsub.CreateSub(topic, topicRegex, fmt.Sprintf("store_%s", topic), 1024)
4343
ctx, cancel := context.WithCancel(context.Background())
44-
t.cancel = cancel
44+
t.setCancel(cancel)
4545
for {
4646
select {
4747
case v := <-ch:

internal/io/mqtt/conn.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,11 @@ import (
2828
"github.com/lf-edge/ekuiper/v2/pkg/errorx"
2929
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
3030
"github.com/lf-edge/ekuiper/v2/pkg/modules"
31+
"github.com/lf-edge/ekuiper/v2/pkg/syncx"
3132
)
3233

3334
type Connection struct {
35+
mu syncx.Mutex
3436
client.Client
3537
id string
3638
server string
@@ -110,15 +112,20 @@ func (conn *Connection) Status(_ api.StreamContext) modules.ConnectionStatus {
110112
func (conn *Connection) SetStatusChangeHandler(ctx api.StreamContext, sch api.StatusChangeHandler) {
111113
st := conn.status.Load().(modules.ConnectionStatus)
112114
sch(st.Status, st.ErrMsg)
115+
conn.mu.Lock()
113116
conn.scHandler = sch
117+
conn.mu.Unlock()
114118
ctx.GetLogger().Infof("trigger status change handler")
115119
}
116120

117121
func (conn *Connection) onConnect(ctx api.StreamContext) {
118122
conn.connected.Store(true)
119123
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionConnected})
120-
if conn.scHandler != nil {
121-
conn.scHandler(api.ConnectionConnected, "")
124+
conn.mu.Lock()
125+
handler := conn.scHandler
126+
conn.mu.Unlock()
127+
if handler != nil {
128+
handler(api.ConnectionConnected, "")
122129
} else {
123130
ctx.GetLogger().Warnf("sc handler has not set yet")
124131
}
@@ -137,16 +144,22 @@ func (conn *Connection) onConnect(ctx api.StreamContext) {
137144
func (conn *Connection) onConnectLost(ctx api.StreamContext, err error) {
138145
conn.connected.Store(false)
139146
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionDisconnected, ErrMsg: err.Error()})
140-
if conn.scHandler != nil {
141-
conn.scHandler(api.ConnectionDisconnected, err.Error())
147+
conn.mu.Lock()
148+
handler := conn.scHandler
149+
conn.mu.Unlock()
150+
if handler != nil {
151+
handler(api.ConnectionDisconnected, err.Error())
142152
}
143153
ctx.GetLogger().Infof("%v", err)
144154
}
145155

146156
func (conn *Connection) onReconnecting(ctx api.StreamContext) {
147157
conn.status.Store(modules.ConnectionStatus{Status: api.ConnectionConnecting})
148-
if conn.scHandler != nil {
149-
conn.scHandler(api.ConnectionConnecting, "")
158+
conn.mu.Lock()
159+
handler := conn.scHandler
160+
conn.mu.Unlock()
161+
if handler != nil {
162+
handler(api.ConnectionConnecting, "")
150163
}
151164
ctx.GetLogger().Debugf("Reconnecting to mqtt broker")
152165
}

0 commit comments

Comments
 (0)