Skip to content

Commit b56cb62

Browse files
committed
fix(schema): merger schemaId parse
Fix by refactor merger getter, together with converter getter. Extract schema id handler function. Signed-off-by: Jiyong Huang <huangjy@emqx.io>
1 parent 5bb9350 commit b56cb62

File tree

6 files changed

+59
-32
lines changed

6 files changed

+59
-32
lines changed

internal/converter/converter.go

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -61,37 +61,62 @@ func GetOrCreateConverter(ctx api.StreamContext, format string, schemaId string,
6161
t = message.FormatJson
6262
}
6363
if cp, ok := modules.Converters[t]; ok {
64-
schemaType, hasSchema := modules.ConverterSchemas[t]
65-
if hasSchema {
66-
schemaFileId := ""
67-
if schemaId != "" {
68-
r := strings.SplitN(schemaId, ".", 2)
69-
schemaFileId = r[0]
70-
if len(r) == 2 {
71-
props["$$messageName"] = r[1]
72-
}
73-
}
74-
ffs, err := schema.GetSchemaFile(schemaType, schemaFileId)
75-
if err != nil {
76-
return nil, err
77-
}
78-
return cp(ctx, ffs.SchemaFile, schemaFields, props)
79-
} else {
80-
return cp(ctx, schemaId, schemaFields, props)
64+
schemaPath, err := transSchemaId(t, schemaId, props)
65+
if err != nil {
66+
return nil, err
8167
}
68+
return cp(ctx, schemaPath, schemaFields, props)
8269
}
8370
return nil, fmt.Errorf("format type %s not supported", t)
8471
}
8572

8673
func GetConvertWriter(ctx api.StreamContext, format string, schemaId string, schema map[string]*ast.JsonStreamField, props map[string]any) (message.ConvertWriter, error) {
8774
t := strings.ToLower(format)
75+
schemaPath, err := transSchemaId(t, schemaId, map[string]any{})
76+
if err != nil {
77+
return nil, err
78+
}
8879
if cw, ok := modules.ConvertWriters[t]; ok {
89-
return cw(ctx, schemaId, schema, props)
80+
return cw(ctx, schemaPath, schema, props)
9081
}
91-
c, err := GetOrCreateConverter(ctx, t, schemaId, schema, props)
82+
c, err := GetOrCreateConverter(ctx, t, schemaPath, schema, props)
9283
if err != nil {
9384
return nil, err
9485
}
9586
ctx.GetLogger().Infof("writer %s not found, fall back to stack writer", t)
9687
return NewStackWriter(ctx, c)
9788
}
89+
90+
func GetMerger(ctx api.StreamContext, format string, schemaId string, schemaFields map[string]*ast.JsonStreamField) (modules.Merger, error) {
91+
t := strings.ToLower(format)
92+
if mp, ok := modules.Mergers[t]; ok {
93+
schemaPath, err := transSchemaId(t, schemaId, map[string]any{})
94+
if err != nil {
95+
return nil, err
96+
}
97+
return mp(ctx, schemaPath, schemaFields)
98+
} else {
99+
return nil, fmt.Errorf("merger %s not found", t)
100+
}
101+
}
102+
103+
func transSchemaId(t, schemaId string, props map[string]any) (string, error) {
104+
schemaType, requireSchema := modules.ConverterSchemas[t]
105+
if requireSchema {
106+
schemaFileId := ""
107+
if schemaId != "" {
108+
r := strings.SplitN(schemaId, ".", 2)
109+
schemaFileId = r[0]
110+
if len(r) == 2 {
111+
props["$$messageName"] = r[1]
112+
}
113+
}
114+
ffs, err := schema.GetSchemaFile(schemaType, schemaFileId)
115+
if err != nil {
116+
return "", err
117+
}
118+
return ffs.SchemaFile, nil
119+
} else { // If not require schema, just return the schemaId. And the register function need to deal with it by itself. Only the specific implementation defines the schemaId format.
120+
return schemaId, nil
121+
}
122+
}

internal/converter/ext_mock_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestGetConverter(t *testing.T) {
6060
require.NoError(t, err)
6161
ctx := mockContext.NewMockContext("test", "op1")
6262
_, err = GetOrCreateConverter(ctx, "mock", "a.b", nil, map[string]any{})
63-
require.NoError(t, err)
63+
require.EqualError(t, err, "schema type protobuf id a not found in registry")
6464
_, err = GetOrCreateConverter(ctx, "mock", "a.b.c.d", nil, map[string]any{})
65-
require.NoError(t, err)
65+
require.EqualError(t, err, "schema type protobuf id a not found in registry")
6666
}

internal/schema/registry.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ func GetSchemaFile(schemaType string, name string) (*modules.Files, error) {
248248
return nil, fmt.Errorf("schema type %s not found in registry", schemaType)
249249
}
250250
if _, ok := registry.schemas[schemaType][name]; !ok {
251-
// If schema id is not defined, just return as is
252-
return &modules.Files{SchemaFile: name}, nil
251+
return nil, fmt.Errorf("schema type %s id %s not found in registry", schemaType, name)
253252
}
254253
schemaFile := registry.schemas[schemaType][name]
255254
return schemaFile, nil

internal/topo/node/rate_limit.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -94,15 +94,11 @@ func NewRateLimitOp(ctx api.StreamContext, name string, rOpt *def.RuleOption, sc
9494
}
9595
o.mergeStrategy = 1
9696
} else if c.Merger != "" {
97-
if mp, ok := modules.Mergers[c.Merger]; ok {
98-
cm, err := mp(ctx, c.PayloadSchemaId, schema)
99-
if err != nil {
100-
return nil, fmt.Errorf("fail to initiate merge %s: %v", c.Merger, err)
101-
}
102-
o.merger = cm
103-
} else {
104-
return nil, fmt.Errorf("merger %s not found", c.Merger)
97+
cm, err := converter.GetMerger(ctx, c.Merger, c.PayloadSchemaId, schema)
98+
if err != nil {
99+
return nil, fmt.Errorf("fail to initiate merge %s: %v", c.Merger, err)
105100
}
101+
o.merger = cm
106102
o.mergeStrategy = 2
107103
}
108104
return o, nil

internal/topo/node/rate_limit_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/stretchr/testify/require"
2727

2828
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
29+
"github.com/lf-edge/ekuiper/v2/internal/schema"
2930
"github.com/lf-edge/ekuiper/v2/internal/topo/topotest/mockclock"
3031
"github.com/lf-edge/ekuiper/v2/internal/xsql"
3132
"github.com/lf-edge/ekuiper/v2/pkg/ast"
@@ -54,7 +55,7 @@ func TestNewRateLimit(t *testing.T) {
5455
assert.EqualError(t, err, "format delimited does not support partial decode")
5556
_, err = NewRateLimitOp(ctx, "test", &def.RuleOption{BufferLength: 10, SendError: true}, nil, map[string]any{"interval": "1s", "merger": "none", "format": "delimited"})
5657
assert.Error(t, err)
57-
assert.EqualError(t, err, "merger none not found")
58+
assert.EqualError(t, err, "fail to initiate merge none: merger none not found")
5859
modules.RegisterMerger("none", func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField) (modules.Merger, error) {
5960
return nil, errors.New("mock error")
6061
})
@@ -237,6 +238,7 @@ func TestRateLimitMerge(t *testing.T) {
237238
}
238239

239240
func TestRateLimitCustomMerge(t *testing.T) {
241+
schema.InitRegistry()
240242
testcases := []struct {
241243
name string
242244
sendCount int
@@ -291,6 +293,7 @@ func TestRateLimitCustomMerge(t *testing.T) {
291293
modules.RegisterMerger("mock", func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField) (modules.Merger, error) {
292294
return &message.MockMerger{}, nil
293295
})
296+
delete(modules.ConverterSchemas, "mock")
294297
mc := mockclock.GetMockClock()
295298
for _, tc := range testcases {
296299
t.Run(tc.name, func(t *testing.T) {

internal/topo/planner/planner_source_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/lf-edge/ekuiper/v2/internal/meta"
3030
"github.com/lf-edge/ekuiper/v2/internal/pkg/def"
3131
"github.com/lf-edge/ekuiper/v2/internal/pkg/store"
32+
"github.com/lf-edge/ekuiper/v2/internal/schema"
3233
"github.com/lf-edge/ekuiper/v2/internal/xsql"
3334
"github.com/lf-edge/ekuiper/v2/pkg/ast"
3435
"github.com/lf-edge/ekuiper/v2/pkg/message"
@@ -42,12 +43,15 @@ func TestPlanTopo(t *testing.T) {
4243
t.Error(err)
4344
return
4445
}
46+
schema.InitRegistry()
4547
modules.RegisterConverter("mockp", func(ctx api.StreamContext, _ string, _ map[string]*ast.JsonStreamField, _ map[string]any) (message.Converter, error) {
4648
return &message.MockPartialConverter{}, nil
4749
})
4850
modules.RegisterMerger("mock", func(ctx api.StreamContext, schemaId string, logicalSchema map[string]*ast.JsonStreamField) (modules.Merger, error) {
4951
return &message.MockMerger{}, nil
5052
})
53+
delete(modules.ConverterSchemas, "mock")
54+
delete(modules.ConverterSchemas, "mockp")
5155
streamSqls := map[string]string{
5256
"src1": `CREATE STREAM src1 () WITH (DATASOURCE="src1", FORMAT="json", TYPE="mqtt");`,
5357
"src2": `CREATE STREAM src2 () WITH (DATASOURCE="src1", FORMAT="json", TYPE="mqtt", SHARED="true");`,

0 commit comments

Comments
 (0)