Skip to content

Commit 135a804

Browse files
committed
fix(compose): pass through when field mapping input already matches target type
buildFieldMappingConverter and buildStreamFieldMappingConverter panic when the incoming value is not a map[string]any, but in some workflow topologies the upstream node can emit a value that is already the converter's target type I (or a stream of I). Treat that case as a pass-through instead of a panic, mirroring the fix suggested in #957 and applying it to the stream variant as well. Fixes #957
1 parent d25ced3 commit 135a804

2 files changed

Lines changed: 151 additions & 0 deletions

File tree

compose/field_mapping.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ func buildFieldMappingConverter[I any]() func(input any) (any, error) {
213213
return func(input any) (any, error) {
214214
in, ok := input.(map[string]any)
215215
if !ok {
216+
if reflect.TypeOf(input) == generic.TypeOf[I]() {
217+
return input, nil
218+
}
216219
panic(newUnexpectedInputTypeErr(reflect.TypeOf(map[string]any{}), reflect.TypeOf(input)))
217220
}
218221

@@ -224,6 +227,9 @@ func buildStreamFieldMappingConverter[I any]() func(input streamReader) streamRe
224227
return func(input streamReader) streamReader {
225228
s, ok := unpackStreamReader[map[string]any](input)
226229
if !ok {
230+
if sr, ok := unpackStreamReader[I](input); ok {
231+
return packStreamReader(sr)
232+
}
227233
panic("mappingStreamAssign incoming streamReader chunk type not map[string]any")
228234
}
229235

compose/field_mapping_test.go

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Copyright 2025 CloudWeGo Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package compose
18+
19+
import (
20+
"io"
21+
"testing"
22+
23+
"github.com/cloudwego/eino/schema"
24+
)
25+
26+
type fieldMappingTarget struct {
27+
Name string `json:"name"`
28+
Age int `json:"age"`
29+
}
30+
31+
func TestBuildFieldMappingConverter_PassThroughWhenInputIsTargetType(t *testing.T) {
32+
conv := buildFieldMappingConverter[fieldMappingTarget]()
33+
34+
in := fieldMappingTarget{Name: "jenny", Age: 30}
35+
out, err := conv(in)
36+
if err != nil {
37+
t.Fatalf("unexpected err: %v", err)
38+
}
39+
got, ok := out.(fieldMappingTarget)
40+
if !ok {
41+
t.Fatalf("expected fieldMappingTarget, got %T", out)
42+
}
43+
if got != in {
44+
t.Fatalf("expected %+v, got %+v", in, got)
45+
}
46+
}
47+
48+
func TestBuildFieldMappingConverter_MapInputStillWorks(t *testing.T) {
49+
conv := buildFieldMappingConverter[fieldMappingTarget]()
50+
51+
out, err := conv(map[string]any{"Name": "jenny", "Age": 30})
52+
if err != nil {
53+
t.Fatalf("unexpected err: %v", err)
54+
}
55+
got, ok := out.(fieldMappingTarget)
56+
if !ok {
57+
t.Fatalf("expected fieldMappingTarget, got %T", out)
58+
}
59+
if got.Name != "jenny" || got.Age != 30 {
60+
t.Fatalf("unexpected result %+v", got)
61+
}
62+
}
63+
64+
func TestBuildFieldMappingConverter_UnrelatedTypePanics(t *testing.T) {
65+
conv := buildFieldMappingConverter[fieldMappingTarget]()
66+
67+
defer func() {
68+
if r := recover(); r == nil {
69+
t.Fatal("expected panic on unrelated input type")
70+
}
71+
}()
72+
_, _ = conv(42)
73+
}
74+
75+
func TestBuildStreamFieldMappingConverter_PassThroughWhenInputIsTargetType(t *testing.T) {
76+
conv := buildStreamFieldMappingConverter[fieldMappingTarget]()
77+
78+
items := []fieldMappingTarget{{Name: "a", Age: 1}, {Name: "b", Age: 2}}
79+
src := packStreamReader(schema.StreamReaderFromArray(items))
80+
81+
out := conv(src)
82+
sr, ok := unpackStreamReader[fieldMappingTarget](out)
83+
if !ok {
84+
t.Fatalf("expected stream reader of target type")
85+
}
86+
87+
var got []fieldMappingTarget
88+
for {
89+
v, err := sr.Recv()
90+
if err == io.EOF {
91+
break
92+
}
93+
if err != nil {
94+
t.Fatalf("recv err: %v", err)
95+
}
96+
got = append(got, v)
97+
}
98+
if len(got) != len(items) || got[0] != items[0] || got[1] != items[1] {
99+
t.Fatalf("expected %+v, got %+v", items, got)
100+
}
101+
}
102+
103+
func TestBuildStreamFieldMappingConverter_MapInputStillWorks(t *testing.T) {
104+
conv := buildStreamFieldMappingConverter[fieldMappingTarget]()
105+
106+
items := []map[string]any{
107+
{"Name": "a", "Age": 1},
108+
{"Name": "b", "Age": 2},
109+
}
110+
src := packStreamReader(schema.StreamReaderFromArray(items))
111+
112+
out := conv(src)
113+
sr, ok := unpackStreamReader[fieldMappingTarget](out)
114+
if !ok {
115+
t.Fatalf("expected stream reader of target type")
116+
}
117+
118+
var got []fieldMappingTarget
119+
for {
120+
v, err := sr.Recv()
121+
if err == io.EOF {
122+
break
123+
}
124+
if err != nil {
125+
t.Fatalf("recv err: %v", err)
126+
}
127+
got = append(got, v)
128+
}
129+
if len(got) != 2 || got[0].Name != "a" || got[1].Name != "b" {
130+
t.Fatalf("unexpected got %+v", got)
131+
}
132+
}
133+
134+
func TestBuildStreamFieldMappingConverter_UnrelatedTypePanics(t *testing.T) {
135+
conv := buildStreamFieldMappingConverter[fieldMappingTarget]()
136+
137+
src := packStreamReader(schema.StreamReaderFromArray([]int{1, 2, 3}))
138+
139+
defer func() {
140+
if r := recover(); r == nil {
141+
t.Fatal("expected panic on unrelated stream chunk type")
142+
}
143+
}()
144+
_ = conv(src)
145+
}

0 commit comments

Comments
 (0)