Skip to content

Commit f09af7c

Browse files
authored
feat(sql): support multiple stream join table (#3866)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 601e3c5 commit f09af7c

File tree

9 files changed

+457
-16
lines changed

9 files changed

+457
-16
lines changed

fvt/rule_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,3 +567,86 @@ func (s *RuleTestSuite) assertSchemaEquality(expected, actual map[string]any) {
567567
// 5. Compare the sorted slices to ensure the nested values are identical.
568568
s.Equal(expectedValues, actualValues, "The set of nested schema values should be identical")
569569
}
570+
571+
func (s *RuleTestSuite) TestJoinWithLookup() {
572+
client.DeleteRule("ruleSim11")
573+
client.DeleteStream("sim11")
574+
client.DeleteStream("sim12")
575+
client.DeleteTables("sim13")
576+
topic := "test11"
577+
subCh := pubsub.CreateSub(topic, nil, topic, 1024)
578+
defer pubsub.CloseSourceConsumerChannel(topic, topic)
579+
data1 := []map[string]any{
580+
{
581+
"k": "v1",
582+
},
583+
}
584+
conf1 := map[string]any{
585+
"data": data1,
586+
"interval": "1ms",
587+
"loop": false,
588+
}
589+
resp1, err := client.CreateConf("sources/simulator/confKeys/sim11", conf1)
590+
s.Require().NoError(err)
591+
s.Require().Equal(http.StatusOK, resp1.StatusCode)
592+
streamSql1 := `{"sql": "create stream sim11() WITH (TYPE=\"simulator\", CONF_KEY=\"sim11\")"}`
593+
resp1, err = client.CreateStream(streamSql1)
594+
s.Require().NoError(err)
595+
s.T().Log(GetResponseText(resp1))
596+
s.Require().Equal(http.StatusCreated, resp1.StatusCode)
597+
data2 := []map[string]any{
598+
{
599+
"k": "v1",
600+
},
601+
}
602+
conf2 := map[string]any{
603+
"data": data2,
604+
"interval": "1ms",
605+
"loop": false,
606+
}
607+
resp2, err := client.CreateConf("sources/simulator/confKeys/sim12", conf2)
608+
s.Require().NoError(err)
609+
s.Require().Equal(http.StatusOK, resp2.StatusCode)
610+
streamSql2 := `{"sql": "create stream sim12() WITH (TYPE=\"simulator\", CONF_KEY=\"sim12\")"}`
611+
resp2, err = client.CreateStream(streamSql2)
612+
s.Require().NoError(err)
613+
s.T().Log(GetResponseText(resp2))
614+
s.Require().Equal(http.StatusCreated, resp2.StatusCode)
615+
616+
data3 := []map[string]any{
617+
{
618+
"k": "v1",
619+
},
620+
}
621+
conf3 := map[string]any{
622+
"data": data3,
623+
}
624+
resp3, err := client.CreateConf("sources/simulator/confKeys/sim13", conf3)
625+
s.Require().NoError(err)
626+
s.Require().Equal(http.StatusOK, resp3.StatusCode)
627+
tableSql := `{"sql": "create table sim13() WITH (TYPE=\"simulator\", CONF_KEY=\"sim13\", KIND=\"lookup\")"}`
628+
resp3, err = client.CreateStream(tableSql)
629+
s.Require().NoError(err)
630+
s.T().Log(GetResponseText(resp3))
631+
s.Require().Equal(http.StatusCreated, resp3.StatusCode)
632+
633+
ruleSql := `{
634+
"id": "ruleSim11",
635+
"sql": "SELECT sim11.k as k1, sim12.k as k2, sim13.k as k3 from sim11 inner join sim12 on sim11.k = sim12.k inner join sim13 on sim13.k = sim11.k group by countwindow(2)",
636+
"actions": [
637+
{
638+
"memory": {
639+
"topic": "test11"
640+
}
641+
}
642+
],
643+
"options": {
644+
645+
}
646+
}`
647+
resp, err := client.CreateRule(ruleSql)
648+
s.Require().NoError(err)
649+
s.T().Log(GetResponseText(resp))
650+
s.Require().Equal(http.StatusCreated, resp.StatusCode)
651+
s.assertRecvMemTuple(subCh, []map[string]any{{"k1": "v1", "k2": "v1", "k3": "v1"}})
652+
}

fvt/sdk.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ func (sdk *SDK) CreateStream(streamJson string) (resp *http.Response, err error)
9393
return http.Post(sdk.baseUrl.JoinPath("streams").String(), ContentTypeJson, bytes.NewBufferString(streamJson))
9494
}
9595

96+
func (sdk *SDK) CreateTable(tableJson string) (resp *http.Response, err error) {
97+
return http.Post(sdk.baseUrl.JoinPath("tables").String(), ContentTypeJson, bytes.NewBufferString(tableJson))
98+
}
99+
96100
func (sdk *SDK) DeleteStream(name string) (resp *http.Response, err error) {
97101
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("streams", name).String(), nil)
98102
if err != nil {
@@ -102,6 +106,15 @@ func (sdk *SDK) DeleteStream(name string) (resp *http.Response, err error) {
102106
return sdk.httpClient.Do(req)
103107
}
104108

109+
func (sdk *SDK) DeleteTables(name string) (resp *http.Response, err error) {
110+
req, err := http.NewRequest(http.MethodDelete, sdk.baseUrl.JoinPath("tables", name).String(), nil)
111+
if err != nil {
112+
fmt.Println(err)
113+
return
114+
}
115+
return sdk.httpClient.Do(req)
116+
}
117+
105118
func (sdk *SDK) GetStreamSchema(name string) (map[string]any, error) {
106119
resp, err := http.Get(sdk.baseUrl.JoinPath("streams", name, "schema").String())
107120
if err != nil {

internal/binder/io/builtin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ func init() {
5656

5757
modules.RegisterLookupSource("memory", memory.GetLookupSource)
5858
modules.RegisterLookupSource("httppull", http.GetLookUpSource)
59+
modules.RegisterLookupSource("simulator", func() api.Source { return &simulator.SimulatorLookupSource{} })
5960

6061
modules.RegisterConnection("mqtt", mqtt.CreateConnection)
6162
modules.RegisterConnection("nng", nng.CreateConnection)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2025 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package simulator
16+
17+
import (
18+
"github.com/lf-edge/ekuiper/contract/v2/api"
19+
20+
"github.com/lf-edge/ekuiper/v2/pkg/cast"
21+
)
22+
23+
type SimulatorLookupSource struct {
24+
cfg *sLookupConfig
25+
}
26+
27+
func (s *SimulatorLookupSource) Provision(ctx api.StreamContext, configs map[string]any) error {
28+
cfg := &sLookupConfig{
29+
Data: make([]map[string]any, 0),
30+
}
31+
if err := cast.MapToStruct(configs, cfg); err != nil {
32+
return err
33+
}
34+
s.cfg = cfg
35+
return nil
36+
}
37+
38+
func (s *SimulatorLookupSource) Close(ctx api.StreamContext) error {
39+
return nil
40+
}
41+
42+
func (s *SimulatorLookupSource) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) error {
43+
return nil
44+
}
45+
46+
func (s *SimulatorLookupSource) Lookup(ctx api.StreamContext, lookupFields []string, cmpKeys []string, cmpValues []any) ([]map[string]any, error) {
47+
res := make([]map[string]any, 0)
48+
for _, d := range s.cfg.Data {
49+
for index, key := range cmpKeys {
50+
value, ok := d[key]
51+
if ok && value == cmpValues[index] {
52+
selectedRow := make(map[string]any)
53+
for _, field := range lookupFields {
54+
selectedRow[field] = d[field]
55+
}
56+
res = append(res, selectedRow)
57+
}
58+
}
59+
}
60+
return res, nil
61+
}
62+
63+
type sLookupConfig struct {
64+
Data []map[string]any `json:"data"`
65+
}
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
// Copyright 2025 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package simulator
16+
17+
import (
18+
"testing"
19+
20+
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
22+
23+
mockContext "github.com/lf-edge/ekuiper/v2/pkg/mock/context"
24+
)
25+
26+
func TestSimulatorLookupSource_Lookup(t *testing.T) {
27+
tests := []struct {
28+
name string
29+
data []map[string]any
30+
lookupFields []string
31+
cmpKeys []string
32+
cmpValues []any
33+
expected []map[string]any
34+
}{
35+
{
36+
name: "single match",
37+
data: []map[string]any{
38+
{"id": 1, "name": "Alice", "age": 25},
39+
{"id": 2, "name": "Bob", "age": 30},
40+
{"id": 3, "name": "Charlie", "age": 35},
41+
},
42+
lookupFields: []string{"id", "name"},
43+
cmpKeys: []string{"id"},
44+
cmpValues: []any{2},
45+
expected: []map[string]any{
46+
{"id": 2, "name": "Bob"},
47+
},
48+
},
49+
{
50+
name: "multiple matches",
51+
data: []map[string]any{
52+
{"id": 1, "name": "Alice", "age": 25},
53+
{"id": 2, "name": "Bob", "age": 30},
54+
{"id": 1, "name": "Alice", "age": 28},
55+
{"id": 3, "name": "Charlie", "age": 35},
56+
},
57+
lookupFields: []string{"id", "name"},
58+
cmpKeys: []string{"id"},
59+
cmpValues: []any{1},
60+
expected: []map[string]any{
61+
{"id": 1, "name": "Alice"},
62+
{"id": 1, "name": "Alice"},
63+
},
64+
},
65+
{
66+
name: "no match",
67+
data: []map[string]any{
68+
{"id": 1, "name": "Alice", "age": 25},
69+
{"id": 2, "name": "Bob", "age": 30},
70+
{"id": 3, "name": "Charlie", "age": 35},
71+
},
72+
lookupFields: []string{"id", "name"},
73+
cmpKeys: []string{"id"},
74+
cmpValues: []any{99},
75+
expected: []map[string]any{},
76+
},
77+
{
78+
name: "string value match",
79+
data: []map[string]any{
80+
{"id": 1, "name": "Alice", "age": 25},
81+
{"id": 2, "name": "Bob", "age": 30},
82+
{"id": 3, "name": "Charlie", "age": 35},
83+
},
84+
lookupFields: []string{"id", "name"},
85+
cmpKeys: []string{"name"},
86+
cmpValues: []any{"Bob"},
87+
expected: []map[string]any{
88+
{"id": 2, "name": "Bob"},
89+
},
90+
},
91+
{
92+
name: "multiple lookup fields",
93+
data: []map[string]any{
94+
{"id": 1, "name": "Alice", "age": 25, "city": "New York"},
95+
{"id": 2, "name": "Bob", "age": 30, "city": "London"},
96+
{"id": 3, "name": "Charlie", "age": 35, "city": "Paris"},
97+
},
98+
lookupFields: []string{"id", "name", "age"},
99+
cmpKeys: []string{"name"},
100+
cmpValues: []any{"Bob"},
101+
expected: []map[string]any{
102+
{"id": 2, "name": "Bob", "age": 30},
103+
},
104+
},
105+
{
106+
name: "empty data",
107+
data: []map[string]any{},
108+
lookupFields: []string{"id", "name"},
109+
cmpKeys: []string{"id"},
110+
cmpValues: []any{1},
111+
expected: []map[string]any{},
112+
},
113+
}
114+
115+
for _, tt := range tests {
116+
t.Run(tt.name, func(t *testing.T) {
117+
// Create SimulatorLookupSource instance
118+
source := &SimulatorLookupSource{}
119+
120+
// Create mock context
121+
ctx := mockContext.NewMockContext("test", "Test")
122+
123+
// Provision the source with test data
124+
configs := map[string]any{
125+
"data": tt.data,
126+
}
127+
err := source.Provision(ctx, configs)
128+
require.NoError(t, err)
129+
130+
// Connect the source
131+
err = source.Connect(ctx, func(status string, message string) {
132+
// do nothing
133+
})
134+
require.NoError(t, err)
135+
136+
// Perform lookup
137+
result, err := source.Lookup(ctx, tt.lookupFields, tt.cmpKeys, tt.cmpValues)
138+
139+
// Verify results
140+
assert.NoError(t, err)
141+
assert.Equal(t, tt.expected, result)
142+
143+
// Close the source
144+
err = source.Close(ctx)
145+
require.NoError(t, err)
146+
})
147+
}
148+
}
149+
150+
func TestSimulatorLookupSource_Provision(t *testing.T) {
151+
tests := []struct {
152+
name string
153+
configs map[string]any
154+
wantErr bool
155+
errMsg string
156+
}{
157+
{
158+
name: "valid config",
159+
configs: map[string]any{
160+
"data": []map[string]any{
161+
{"id": 1, "name": "Alice"},
162+
{"id": 2, "name": "Bob"},
163+
},
164+
},
165+
wantErr: false,
166+
},
167+
{
168+
name: "empty data config",
169+
configs: map[string]any{
170+
"data": []map[string]any{},
171+
},
172+
wantErr: false,
173+
},
174+
{
175+
name: "nil data config",
176+
configs: map[string]any{
177+
"data": nil,
178+
},
179+
wantErr: false,
180+
},
181+
}
182+
183+
for _, tt := range tests {
184+
t.Run(tt.name, func(t *testing.T) {
185+
source := &SimulatorLookupSource{}
186+
ctx := mockContext.NewMockContext("test", "Test")
187+
188+
err := source.Provision(ctx, tt.configs)
189+
190+
if tt.wantErr {
191+
assert.Error(t, err)
192+
if tt.errMsg != "" {
193+
assert.Contains(t, err.Error(), tt.errMsg)
194+
}
195+
} else {
196+
assert.NoError(t, err)
197+
assert.NotNil(t, source.cfg)
198+
}
199+
})
200+
}
201+
}

0 commit comments

Comments
 (0)