Skip to content

Commit a4c5685

Browse files
authored
fix(planner): mocksource emit name for unnest (#4008)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent 3b7a5f9 commit a4c5685

File tree

2 files changed

+138
-3
lines changed

2 files changed

+138
-3
lines changed

fvt/ruletest_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright 2026 EMQ Technologies Co., Ltd.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package fvt
16+
17+
import (
18+
"bufio"
19+
"context"
20+
"encoding/json"
21+
"fmt"
22+
"net/http"
23+
"strings"
24+
"testing"
25+
"time"
26+
27+
"github.com/stretchr/testify/suite"
28+
)
29+
30+
type RuletestTestSuite struct {
31+
suite.Suite
32+
}
33+
34+
func TestRuletestSuite(t *testing.T) {
35+
suite.Run(t, new(RuletestTestSuite))
36+
}
37+
38+
func (s *RuletestTestSuite) TestRuletestMockSourceUnnestKeepProjectedFields() {
39+
streamName := "demoRuletest5501"
40+
ruleID := "rule_ruletest_5501"
41+
42+
_, _ = client.DeleteStream(streamName)
43+
_, _ = client.Delete(fmt.Sprintf("ruletest/%s", ruleID))
44+
45+
s.T().Cleanup(func() {
46+
_, _ = client.Delete(fmt.Sprintf("ruletest/%s", ruleID))
47+
_, _ = client.DeleteStream(streamName)
48+
})
49+
50+
streamSQL := fmt.Sprintf(`{"sql":"CREATE STREAM %s (id STRING, time STRING, type STRING, data ARRAY(STRUCT(k BIGINT))) WITH (DATASOURCE=\"%s\", FORMAT=\"json\", TYPE=\"mqtt\")"}`, streamName, streamName)
51+
resp, err := client.CreateStream(streamSQL)
52+
s.Require().NoError(err)
53+
s.Require().Equal(http.StatusCreated, resp.StatusCode)
54+
55+
ruleDef := fmt.Sprintf(`{
56+
"id": "%s",
57+
"sql": "SELECT id, time, type, unnest(data) FROM %s",
58+
"mockSource": {
59+
"%s": {
60+
"loop": false,
61+
"data": [
62+
{
63+
"id": "id1",
64+
"time": "2023-05-30T15:23:23.123+08:00",
65+
"type": "1",
66+
"data": [
67+
{"k": 1},
68+
{"k": 2}
69+
]
70+
}
71+
]
72+
}
73+
},
74+
"sinkProps": {
75+
"sendSingle": true
76+
}
77+
}`, ruleID, streamName, streamName)
78+
79+
resp, err = client.Post("ruletest", ruleDef)
80+
s.Require().NoError(err)
81+
s.Require().Equal(http.StatusOK, resp.StatusCode)
82+
83+
result, err := GetResponseResultMap(resp)
84+
s.Require().NoError(err)
85+
s.Require().Equal(ruleID, result["id"])
86+
port, ok := result["port"].(float64)
87+
s.Require().True(ok)
88+
89+
sseURL := fmt.Sprintf("http://127.0.0.1:%d/test/%s", int(port), ruleID)
90+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
91+
defer cancel()
92+
93+
req, err := http.NewRequestWithContext(ctx, http.MethodGet, sseURL, nil)
94+
s.Require().NoError(err)
95+
req.Header.Set("Accept", "text/event-stream")
96+
sseResp, err := http.DefaultClient.Do(req)
97+
s.Require().NoError(err)
98+
s.Require().Equal(http.StatusOK, sseResp.StatusCode)
99+
defer sseResp.Body.Close()
100+
101+
// Start rule after SSE connected to avoid missing data.
102+
resp, err = client.Post(fmt.Sprintf("ruletest/%s/start", ruleID), "any")
103+
s.Require().NoError(err)
104+
s.Require().Equal(http.StatusOK, resp.StatusCode)
105+
106+
scanner := bufio.NewScanner(sseResp.Body)
107+
var got map[string]any
108+
for scanner.Scan() {
109+
line := scanner.Text()
110+
if !strings.HasPrefix(line, "data: ") {
111+
continue
112+
}
113+
payload := strings.TrimPrefix(line, "data: ")
114+
s.T().Log(payload)
115+
s.Require().NoError(json.Unmarshal([]byte(payload), &got))
116+
break
117+
}
118+
s.Require().NoError(scanner.Err())
119+
s.Require().NotEmpty(got)
120+
121+
// Regression guard: in mockSource+unnest ruletest, projected fields should not be dropped.
122+
s.Require().Equal("id1", got["id"])
123+
s.Require().Equal("2023-05-30T15:23:23.123+08:00", got["time"])
124+
s.Require().Equal("1", got["type"])
125+
_, hasK := got["k"]
126+
s.Require().True(hasK)
127+
}

internal/topo/planner/planner_source.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ func transformSourceNode(ctx api.StreamContext, t *DataSourcePlan, mockSourcesPr
3838
if t.streamFields == nil && options.Experiment != nil && options.Experiment.UseSliceTuple {
3939
return nil, nil, 0, errors.New("slice tuple mode does not support wildcard/schemaless")
4040
}
41+
emitterName := t.name
4142
mockProps, isMock := mockSourcesProp[string(t.name)]
4243
if isMock {
4344
t.streamStmt.Options.TYPE = "simulator"
@@ -68,10 +69,10 @@ func transformSourceNode(ctx api.StreamContext, t *DataSourcePlan, mockSourcesPr
6869
return nil, nil, 0, err
6970
}
7071
}
71-
return splitSource(ctx, t, si, options, mockProps, index, ruleId, pp)
72+
return splitSource(ctx, t, si, options, mockProps, index, ruleId, pp, emitterName)
7273
}
7374

74-
func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, options *def.RuleOption, mockProps map[string]any, index int, ruleId string, pp node.UnOperation) (node.DataSourceNode, []node.OperatorNode, int, error) {
75+
func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, options *def.RuleOption, mockProps map[string]any, index int, ruleId string, pp node.UnOperation, emitterName ast.StreamName) (node.DataSourceNode, []node.OperatorNode, int, error) {
7576
// Get all props
7677
props := nodeConf.GetSourceConf(t.streamStmt.Options.TYPE, t.streamStmt.Options)
7778
sp := &SourcePropsForSplit{}
@@ -96,6 +97,7 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
9697
}
9798

9899
var ops []node.OperatorNode
100+
emitterOpAdded := false
99101
// If having unique connection id AND unique sub id for each connection, need to share the sub node; Case 1 is neuron; Case 2 is edgeX
100102
needShareCon := hasConId || (hasSubId && conId != "")
101103
if !needShareCon {
@@ -133,9 +135,10 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
133135
}
134136
index++
135137
// another node to set emitter
136-
op := Transform(&operator.EmitterOp{Emitter: string(t.name)}, fmt.Sprintf("%d_emitter", index), options)
138+
op := Transform(&operator.EmitterOp{Emitter: string(emitterName)}, fmt.Sprintf("%d_emitter", index), options)
137139
index++
138140
ops = append(ops, op)
141+
emitterOpAdded = true
139142
}
140143
if len(t.colAliasMapping) > 0 {
141144
props["colAliasMapping"] = t.colAliasMapping
@@ -208,6 +211,11 @@ func splitSource(ctx api.StreamContext, t *DataSourcePlan, ss api.Source, option
208211
index++
209212
}
210213

214+
if t.name != emitterName && !emitterOpAdded {
215+
ops = append(ops, Transform(&operator.EmitterOp{Emitter: string(emitterName)}, fmt.Sprintf("%d_emitter", index), options))
216+
index++
217+
}
218+
211219
if t.streamStmt.Options.SHARED && !t.inRuleTest {
212220
isSliceRule := options.Experiment != nil && options.Experiment.UseSliceTuple
213221
// Create subtopo in the end to avoid errors in the middle

0 commit comments

Comments
 (0)