Skip to content

Commit 6d77e1d

Browse files
committed
fix: resolve data races in WindowOperator and GlobalServerManager
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 53e50ad commit 6d77e1d

File tree

3 files changed

+23
-4
lines changed

3 files changed

+23
-4
lines changed

internal/io/http/httpserver/data_server.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ type GlobalServerManager struct {
4646
}
4747

4848
var manager *GlobalServerManager
49+
var managerLock sync.RWMutex
4950

5051
func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
5152
r := mux.NewRouter()
@@ -66,6 +67,7 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
6667
return true
6768
},
6869
}
70+
managerLock.Lock()
6971
manager = &GlobalServerManager{
7072
websocketEndpoint: map[string]*websocketEndpointContext{},
7173
endpoint: map[string]string{},
@@ -81,19 +83,36 @@ func InitGlobalServerManager(ip string, port int, tlsConf *model.TlsConf) {
8183
s.ListenAndServeTLS(conf.Config.Source.HttpServerTls.Certfile, conf.Config.Source.HttpServerTls.Keyfile)
8284
}
8385
}(manager)
86+
managerLock.Unlock()
8487
time.Sleep(500 * time.Millisecond)
8588
}
8689

8790
func ShutDown() {
88-
manager.Shutdown()
91+
managerLock.RLock()
92+
if manager != nil {
93+
manager.Shutdown()
94+
}
95+
managerLock.RUnlock()
96+
managerLock.Lock()
8997
manager = nil
98+
managerLock.Unlock()
9099
}
91100

92101
func RegisterEndpoint(endpoint string, method string) (string, error) {
102+
managerLock.RLock()
103+
defer managerLock.RUnlock()
104+
if manager == nil {
105+
return "", fmt.Errorf("http server is not running")
106+
}
93107
return manager.RegisterEndpoint(endpoint, method)
94108
}
95109

96110
func UnregisterEndpoint(endpoint, method string) {
111+
managerLock.RLock()
112+
defer managerLock.RUnlock()
113+
if manager == nil {
114+
return
115+
}
97116
manager.UnregisterEndpoint(endpoint, method)
98117
}
99118

internal/topo/node/node_test/window_inc_agg_op_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -525,7 +525,7 @@ func TestIncAggSlidingWindowDelay(t *testing.T) {
525525
}
526526

527527
func waitExecute() {
528-
time.Sleep(50 * time.Millisecond)
528+
time.Sleep(100 * time.Millisecond)
529529
}
530530

531531
func TestIncHoppingWindow(t *testing.T) {

internal/topo/node/window_op.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -640,7 +640,7 @@ func (o *WindowOperator) handleInputs(ctx api.StreamContext, inputs []xsql.Event
640640
// For hopping and sliding window, firstly check if the beginning tuples are expired and discard them
641641
if o.isOverlapWindow && !allDiscarded {
642642
if left.After(tuple.GetTimestamp()) {
643-
log.Debugf("tuple %x emitted at %d expired", tuple, tuple.GetTimestamp().UnixMilli())
643+
log.Debugf("tuple at %d expired", tuple.GetTimestamp().UnixMilli())
644644
// Expired tuple, remove it by not adding back to inputs
645645
continue
646646
}
@@ -727,7 +727,7 @@ func (o *WindowOperator) scan(inputs []xsql.EventRow, triggerTime time.Time, ctx
727727
results.WindowRange = xsql.NewWindowRange(windowStart, windowEnd.UnixMilli(), triggerTime.Add(-o.window.Delay).UnixMilli())
728728
}
729729
log.Debugf("window %s triggered for %d tuples", o.name, len(inputs))
730-
log.Debugf("Sent: %v", results)
730+
731731
o.Broadcast(results)
732732
o.onSend(ctx, results)
733733

0 commit comments

Comments
 (0)