Skip to content

Commit e3bc8f4

Browse files
committed
fix(schema): resolve deadlock between registry and shared layer
Signed-off-by: Jiyong Huang <[email protected]>
1 parent b1fb48f commit e3bc8f4

File tree

2 files changed

+95
-2
lines changed

2 files changed

+95
-2
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2026 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package schema
16+
17+
import (
18+
"sync"
19+
"testing"
20+
"time"
21+
22+
"github.com/lf-edge/ekuiper/v2/pkg/ast"
23+
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
24+
)
25+
26+
func TestSchemaRegistryDeadlock(t *testing.T) {
27+
// 1. Setup Global Store
28+
// GlobalSchemaStore is initialized by init(), but we should clean it to be safe
29+
GlobalSchemaStore.Lock()
30+
GlobalSchemaStore.streamMap = make(map[string]SchemaContainer)
31+
GlobalSchemaStore.schemaMap = make(map[string]map[string]map[string]*ast.JsonStreamField)
32+
GlobalSchemaStore.Unlock()
33+
34+
streamName := "deadlock_stream_" + t.Name()
35+
36+
// 2. Create SharedLayer
37+
// GetStream creates it if missing, and acquires Global Lock
38+
c := GetStream(streamName)
39+
s := c.(*SharedLayer)
40+
41+
// Create channels to signal start
42+
startChan := make(chan struct{})
43+
var wg sync.WaitGroup
44+
wg.Add(2)
45+
46+
// Goroutine 1: Simulate "Plan" phase calling GetStreamSchemaIndex
47+
// Route: GlobalSchemaStore.RLock -> SharedLayer.RLock
48+
go func() {
49+
defer wg.Done()
50+
<-startChan
51+
52+
for i := 0; i < 5000; i++ {
53+
// This used to deadlock: Global RLock -> Shared RLock
54+
GetStreamSchemaIndex(streamName)
55+
}
56+
}()
57+
58+
// Goroutine 2: Simulate "Open" calling Attach
59+
// Route: SharedLayer.Lock -> updateReg -> AddRuleSchema -> GlobalSchemaStore.Lock
60+
go func() {
61+
defer wg.Done()
62+
63+
// Pre-register rule schema so Attach has something to update
64+
ruleID := "rule_deadlock"
65+
s.RegSchema(ruleID, "datasource", nil, false)
66+
67+
// Use standard mock context
68+
ctx := mockContext.NewMockContext(ruleID, "op1")
69+
70+
<-startChan
71+
for i := 0; i < 5000; i++ {
72+
// This used to deadlock: Shared Lock -> Global Lock
73+
s.Attach(ctx)
74+
}
75+
}()
76+
77+
// Run test
78+
close(startChan)
79+
80+
// Wait with timeout
81+
done := make(chan struct{})
82+
go func() {
83+
wg.Wait()
84+
close(done)
85+
}()
86+
87+
select {
88+
case <-done:
89+
t.Log("Test finished successfully - No Deadlock")
90+
case <-time.After(10 * time.Second):
91+
t.Fatal("Test Timeout! Deadlock Detected!")
92+
}
93+
}

internal/topo/schema/reg.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,8 @@ func GetStream(name string) SchemaContainer {
107107

108108
func GetStreamSchema(name string) (map[string]*ast.JsonStreamField, error) {
109109
GlobalSchemaStore.RLock()
110-
defer GlobalSchemaStore.RUnlock()
111110
c, ok := GlobalSchemaStore.streamMap[name]
111+
GlobalSchemaStore.RUnlock()
112112
if !ok {
113113
return nil, nil
114114
}
@@ -117,8 +117,8 @@ func GetStreamSchema(name string) (map[string]*ast.JsonStreamField, error) {
117117

118118
func GetStreamSchemaIndex(streamName string) map[string]int {
119119
GlobalSchemaStore.RLock()
120-
defer GlobalSchemaStore.RUnlock()
121120
c, ok := GlobalSchemaStore.streamMap[streamName]
121+
GlobalSchemaStore.RUnlock()
122122
if !ok {
123123
return nil
124124
}

0 commit comments

Comments
 (0)