Skip to content

Commit 97203bb

Browse files
authored
fix(io): fix sql sink build sql (#3913)
Signed-off-by: Song Gao <[email protected]>
1 parent 8cab48e commit 97203bb

File tree

2 files changed

+42
-44
lines changed

2 files changed

+42
-44
lines changed

extensions/impl/sql/sink.go

Lines changed: 35 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -56,47 +56,35 @@ type sqlSinkConfig struct {
5656
KeyField string `json:"keyField"`
5757
}
5858

59-
func (c *sqlSinkConfig) buildInsertSql(ctx api.StreamContext, mapData map[string]interface{}) ([]string, string, error) {
60-
keys, vals, err := c.getKeyValues(ctx, mapData)
59+
func (c *sqlSinkConfig) buildInsertSql(ctx api.StreamContext, mapData map[string]interface{}, keys []string) (string, error) {
60+
vals, err := c.getValuesByKeys(ctx, mapData, keys)
6161
if err != nil {
62-
return keys, "", err
62+
return "", err
6363
}
6464
sqlStr := "(" + strings.Join(vals, ",") + ")"
65-
return keys, sqlStr, nil
65+
return sqlStr, nil
6666
}
6767

68-
func (c *sqlSinkConfig) getKeyValues(ctx api.StreamContext, mapData map[string]interface{}) ([]string, []string, error) {
68+
func (c *sqlSinkConfig) getValuesByKeys(ctx api.StreamContext, mapData map[string]interface{}, keys []string) ([]string, error) {
6969
if 0 == len(mapData) {
70-
return nil, nil, fmt.Errorf("data is empty.")
70+
return nil, fmt.Errorf("data is empty.")
7171
}
72+
var vals []string
7273
logger := ctx.GetLogger()
73-
var keys, vals []string
74-
75-
if len(c.Fields) != 0 {
76-
for _, k := range c.Fields {
77-
keys = append(keys, k)
78-
if v, ok := mapData[k]; ok && v != nil {
79-
if reflect.String == reflect.TypeOf(v).Kind() {
80-
vals = append(vals, fmt.Sprintf("'%v'", v))
81-
} else {
82-
vals = append(vals, fmt.Sprintf(`%v`, v))
83-
}
84-
} else {
85-
logger.Warn("not found field:", k)
86-
vals = append(vals, fmt.Sprintf(`NULL`))
87-
}
88-
}
89-
} else {
90-
for k, v := range mapData {
91-
keys = append(keys, k)
74+
for _, k := range keys {
75+
v, ok := mapData[k]
76+
if ok && v != nil {
9277
if reflect.String == reflect.TypeOf(v).Kind() {
9378
vals = append(vals, fmt.Sprintf("'%v'", v))
9479
} else {
9580
vals = append(vals, fmt.Sprintf(`%v`, v))
9681
}
82+
} else {
83+
logger.Warn("not found field:", k)
84+
vals = append(vals, fmt.Sprintf(`NULL`))
9785
}
9886
}
99-
return keys, vals, nil
87+
return vals, nil
10088
}
10189

10290
func (s *SQLSinkConnector) Ping(ctx api.StreamContext, props map[string]any) error {
@@ -177,10 +165,10 @@ func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple)
177165

178166
func (s *SQLSinkConnector) collect(ctx api.StreamContext, item map[string]any) (err error) {
179167
if len(s.config.RowKindField) < 1 {
180-
var keys []string = nil
168+
keys := s.extractKeys(item)
181169
var values []string = nil
182170
var vars string
183-
keys, vars, err = s.config.buildInsertSql(ctx, item)
171+
vars, err = s.config.buildInsertSql(ctx, item, keys)
184172
if err != nil {
185173
return err
186174
}
@@ -205,12 +193,15 @@ func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageT
205193
}
206194

207195
func (s *SQLSinkConnector) collectList(ctx api.StreamContext, items []map[string]any) (err error) {
208-
var keys []string = nil
196+
if len(items) < 1 {
197+
return nil
198+
}
199+
keys := s.extractKeys(items[0])
209200
var values []string = nil
210201
var vars string
211202
if len(s.config.RowKindField) < 1 {
212203
for _, mapData := range items {
213-
keys, vars, err = s.config.buildInsertSql(ctx, mapData)
204+
vars, err = s.config.buildInsertSql(ctx, mapData, keys)
214205
if err != nil {
215206
return err
216207
}
@@ -244,10 +235,11 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st
244235
return fmt.Errorf("invalid rowkind %s", rowkind)
245236
}
246237
}
238+
keys := s.extractKeys(data)
247239
var sqlStr string
248240
switch rowkind {
249241
case ast.RowkindInsert:
250-
keys, vars, err := s.config.buildInsertSql(ctx, data)
242+
vars, err := s.config.buildInsertSql(ctx, data, keys)
251243
if err != nil {
252244
return err
253245
}
@@ -260,7 +252,7 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st
260252
if !ok {
261253
return fmt.Errorf("field %s does not exist in data %v", s.config.KeyField, data)
262254
}
263-
keys, vals, err := s.config.getKeyValues(ctx, data)
255+
vals, err := s.config.getValuesByKeys(ctx, data, keys)
264256
if err != nil {
265257
return err
266258
}
@@ -320,6 +312,17 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error
320312
return nil
321313
}
322314

315+
func (s *SQLSinkConnector) extractKeys(item map[string]any) []string {
316+
if len(s.config.Fields) > 0 {
317+
return s.config.Fields
318+
}
319+
keys := make([]string, 0, len(item))
320+
for k := range item {
321+
keys = append(keys, k)
322+
}
323+
return keys
324+
}
325+
323326
func buildInsertSQL(table string, keys []string, values []string) string {
324327
sql := fmt.Sprintf("INSERT INTO %s (%s) values ", table, strings.Join(keys, ",")) + strings.Join(values, ",") + ";"
325328
return sql

extensions/impl/sql/sink_test.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -197,34 +197,29 @@ func TestSQLSinkConfigKV(t *testing.T) {
197197
config := &sqlSinkConfig{
198198
Fields: []string{"a"},
199199
}
200-
_, _, err := config.getKeyValues(ctx, nil)
200+
_, err := config.getValuesByKeys(ctx, nil, config.Fields)
201201
require.Error(t, err)
202202

203-
keys, values, err := config.getKeyValues(ctx, map[string]interface{}{
203+
values, err := config.getValuesByKeys(ctx, map[string]interface{}{
204204
"a": "value",
205-
})
205+
}, config.Fields)
206206
require.NoError(t, err)
207-
require.Equal(t, []string{"a"}, keys)
208207
require.Equal(t, []string{"'value'"}, values)
209208

210209
config = &sqlSinkConfig{
211210
Fields: []string{"a"},
212211
}
213-
keys, values, err = config.getKeyValues(ctx, map[string]interface{}{
212+
values, err = config.getValuesByKeys(ctx, map[string]interface{}{
214213
"b": "value",
215-
})
214+
}, config.Fields)
216215
require.NoError(t, err)
217-
require.NoError(t, err)
218-
require.Equal(t, []string{"a"}, keys)
219216
require.Equal(t, []string{"NULL"}, values)
220217

221218
config = &sqlSinkConfig{}
222-
keys, values, err = config.getKeyValues(ctx, map[string]interface{}{
219+
values, err = config.getValuesByKeys(ctx, map[string]interface{}{
223220
"a": "value",
224-
})
225-
require.NoError(t, err)
221+
}, []string{"a"})
226222
require.NoError(t, err)
227-
require.Equal(t, []string{"a"}, keys)
228223
require.Equal(t, []string{"'value'"}, values)
229224
}
230225

0 commit comments

Comments
 (0)