Skip to content

Commit 5dbd9f7

Browse files
committed
sink,storage-consumer: fix iceberg storage replay
Signed-off-by: tenfyzhong <tenfy@tenfy.cn>
1 parent b013be1 commit 5dbd9f7

File tree

10 files changed

+471
-12
lines changed

10 files changed

+471
-12
lines changed

cmd/storage-consumer/consumer.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"net/url"
2121
"sort"
2222
"strings"
23+
"sync"
2324
"time"
2425

2526
"github.com/pingcap/errors"
@@ -421,6 +422,60 @@ func (c *consumer) flushDMLEvents(ctx context.Context, tableID int64) error {
421422
}
422423
}
423424

425+
func (c *consumer) flushIcebergDMLEvents(ctx context.Context, tableID int64) error {
426+
group := c.eventsGroup[tableID]
427+
if group == nil {
428+
return nil
429+
}
430+
events := group.GetAllEvents()
431+
total := len(events)
432+
if total == 0 {
433+
return nil
434+
}
435+
var (
436+
schema string
437+
table string
438+
)
439+
if events[0].TableInfo != nil {
440+
schema = events[0].TableInfo.GetSchemaName()
441+
table = events[0].TableInfo.GetTableName()
442+
}
443+
444+
start := time.Now()
445+
for i, e := range events {
446+
done := make(chan struct{})
447+
var once sync.Once
448+
e.AddPostFlushFunc(func() {
449+
once.Do(func() {
450+
close(done)
451+
})
452+
})
453+
c.sink.AddDMLEvent(e)
454+
455+
ticker := time.NewTicker(defaultLogInterval)
456+
for {
457+
select {
458+
case <-ctx.Done():
459+
ticker.Stop()
460+
return context.Cause(ctx)
461+
case <-done:
462+
ticker.Stop()
463+
goto NEXT
464+
case <-ticker.C:
465+
log.Warn("iceberg DML event cannot be flushed in time",
466+
zap.Int("total", total), zap.Int("flushed", i),
467+
zap.Uint64("commitTs", e.CommitTs), zap.String("schema", schema), zap.String("table", table))
468+
}
469+
}
470+
NEXT:
471+
}
472+
473+
log.Info("flush iceberg DML events done",
474+
zap.String("schema", schema), zap.String("table", table), zap.Int64("tableID", tableID),
475+
zap.Int("total", total), zap.Duration("duration", time.Since(start)))
476+
return nil
477+
}
478+
424479
func (c *consumer) parseDMLFilePath(ctx context.Context, path string) error {
425480
var dmlkey cloudstorage.DmlPathKey
426481
dispatcherID, err := dmlkey.ParseIndexFilePath(

cmd/storage-consumer/iceberg.go

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func buildIcebergDDLEvents(
6464
fmt.Sprintf(
6565
"CREATE TABLE IF NOT EXISTS %s (%s)",
6666
commonType.QuoteSchema(curr.SchemaName, curr.TableName),
67-
strings.Join(icebergColumnSQLDefs(curr.Columns), ","),
67+
strings.Join(icebergCreateTableSQLDefs(curr.Columns), ","),
6868
),
6969
tableInfo,
7070
model.ActionCreateTable,
@@ -243,6 +243,32 @@ func icebergColumnSQLDefs(columns []sinkiceberg.Column) []string {
243243
return defs
244244
}
245245

246+
func icebergCreateTableSQLDefs(columns []sinkiceberg.Column) []string {
247+
defs := icebergColumnSQLDefs(columns)
248+
if primaryKey := icebergPrimaryKeySQLDef(columns); primaryKey != "" {
249+
defs = append(defs, primaryKey)
250+
}
251+
return defs
252+
}
253+
254+
func icebergPrimaryKeySQLDef(columns []sinkiceberg.Column) string {
255+
primaryKeyColumns := make([]string, 0, len(columns))
256+
for _, col := range columns {
257+
if !icebergColumnIsPrimaryKey(col) {
258+
continue
259+
}
260+
primaryKeyColumns = append(primaryKeyColumns, commonType.QuoteName(col.Name))
261+
}
262+
if len(primaryKeyColumns) == 0 {
263+
return ""
264+
}
265+
return fmt.Sprintf("PRIMARY KEY (%s)", strings.Join(primaryKeyColumns, ","))
266+
}
267+
268+
func icebergColumnIsPrimaryKey(col sinkiceberg.Column) bool {
269+
return col.OriginalTableCol != nil && strings.EqualFold(strings.TrimSpace(col.OriginalTableCol.IsPK), "true")
270+
}
271+
246272
func icebergColumnSQLDef(col sinkiceberg.Column) string {
247273
return fmt.Sprintf("%s %s %s",
248274
commonType.QuoteName(col.Name),
@@ -259,6 +285,10 @@ func icebergColumnNullability(required bool) string {
259285
}
260286

261287
func icebergColumnSQLType(col sinkiceberg.Column) string {
288+
if sqlType, ok := originalIcebergColumnSQLType(col); ok {
289+
return sqlType
290+
}
291+
262292
switch strings.TrimSpace(col.Type) {
263293
case "int":
264294
return "INT"
@@ -283,6 +313,10 @@ func icebergColumnSQLType(col sinkiceberg.Column) string {
283313
}
284314

285315
func icebergColumnToTableCol(col sinkiceberg.Column) (cloudstorage.TableCol, error) {
316+
if original := cloneOriginalIcebergTableCol(col); original != nil {
317+
return *original, nil
318+
}
319+
286320
result := cloudstorage.TableCol{
287321
Name: col.Name,
288322
}
@@ -318,6 +352,65 @@ func icebergColumnToTableCol(col sinkiceberg.Column) (cloudstorage.TableCol, err
318352
return result, nil
319353
}
320354

355+
func originalIcebergColumnSQLType(col sinkiceberg.Column) (string, bool) {
356+
if col.OriginalTableCol == nil {
357+
return "", false
358+
}
359+
360+
switch strings.ToUpper(strings.TrimSpace(col.OriginalTableCol.Tp)) {
361+
case "CHAR", "VARCHAR", "VAR_STRING",
362+
"BINARY", "VARBINARY",
363+
"TINYTEXT", "TEXT", "MEDIUMTEXT", "LONGTEXT",
364+
"TINYBLOB", "BLOB", "MEDIUMBLOB", "LONGBLOB",
365+
"ENUM", "SET":
366+
return formatTableColSQLType(*cloneOriginalIcebergTableCol(col)), true
367+
default:
368+
return "", false
369+
}
370+
}
371+
372+
func cloneOriginalIcebergTableCol(col sinkiceberg.Column) *cloudstorage.TableCol {
373+
if col.OriginalTableCol == nil {
374+
return nil
375+
}
376+
377+
cloned := *col.OriginalTableCol
378+
cloned.Name = col.Name
379+
if col.Required {
380+
cloned.Nullable = "false"
381+
} else {
382+
cloned.Nullable = ""
383+
}
384+
return &cloned
385+
}
386+
387+
func formatTableColSQLType(col cloudstorage.TableCol) string {
388+
tp := strings.ToUpper(strings.TrimSpace(col.Tp))
389+
baseType := strings.TrimSuffix(tp, " UNSIGNED")
390+
391+
switch baseType {
392+
case "CHAR", "VARCHAR", "VAR_STRING", "BINARY", "VARBINARY", "BIT":
393+
if col.Precision != "" {
394+
return fmt.Sprintf("%s(%s)", tp, col.Precision)
395+
}
396+
case "ENUM", "SET":
397+
if len(col.Elems) > 0 {
398+
return fmt.Sprintf("%s(%s)", tp, quoteSQLStringList(col.Elems))
399+
}
400+
}
401+
return tp
402+
}
403+
404+
// quoteSQLStringList renders values as a comma-separated list of single-quoted
// SQL string literals. Inside each value every backslash is doubled and every
// single quote is doubled. An empty or nil input yields the empty string.
func quoteSQLStringList(values []string) string {
	// One pass is equivalent to escaping backslashes first and quotes second,
	// because neither replacement produces characters the other one matches.
	escaper := strings.NewReplacer(`\`, `\\`, "'", "''")

	var sb strings.Builder
	for i, value := range values {
		if i > 0 {
			sb.WriteByte(',')
		}
		fmt.Fprintf(&sb, "'%s'", escaper.Replace(value))
	}
	return sb.String()
}
413+
321414
func parseIcebergDecimalType(raw string) (int, int, bool) {
322415
s := strings.TrimSpace(raw)
323416
if !strings.HasPrefix(s, "decimal(") || !strings.HasSuffix(s, ")") {
@@ -514,7 +607,7 @@ func (c *consumer) handleIceberg(ctx context.Context, round uint64) error {
514607
}
515608
}
516609

517-
if err := c.flushDMLEvents(ctx, tableID); err != nil {
610+
if err := c.flushIcebergDMLEvents(ctx, tableID); err != nil {
518611
return err
519612
}
520613
state.lastMetadataVersion = metadataVersion
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright 2026 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"testing"
19+
20+
"github.com/pingcap/ticdc/pkg/common"
21+
sinkiceberg "github.com/pingcap/ticdc/pkg/sink/iceberg"
22+
"github.com/pingcap/ticdc/pkg/util"
23+
"github.com/pingcap/tidb/pkg/meta/model"
24+
"github.com/pingcap/tidb/pkg/parser/ast"
25+
"github.com/pingcap/tidb/pkg/parser/charset"
26+
"github.com/pingcap/tidb/pkg/parser/mysql"
27+
"github.com/pingcap/tidb/pkg/types"
28+
"github.com/stretchr/testify/require"
29+
)
30+
31+
func TestBuildIcebergDDLEventsPreservesOriginalStringTypes(t *testing.T) {
32+
ctx := context.Background()
33+
34+
tmpDir := t.TempDir()
35+
extStorage, warehouseURL, err := util.GetTestExtStorage(ctx, tmpDir)
36+
require.NoError(t, err)
37+
defer extStorage.Close()
38+
39+
cfg := sinkiceberg.NewConfig()
40+
cfg.WarehouseURI = warehouseURL.String()
41+
cfg.WarehouseLocation = warehouseURL.String()
42+
cfg.Namespace = "ns"
43+
44+
tableWriter := sinkiceberg.NewTableWriter(cfg, extStorage)
45+
changefeedID := common.NewChangefeedID4Test("default", "cf")
46+
47+
textType := types.NewFieldType(mysql.TypeBlob)
48+
textType.SetCharset(charset.CharsetUTF8MB4)
49+
50+
varcharType := types.NewFieldType(mysql.TypeVarchar)
51+
varcharType.SetCharset(charset.CharsetUTF8MB4)
52+
varcharType.SetFlen(32)
53+
54+
idType := types.NewFieldType(mysql.TypeLonglong)
55+
idType.AddFlag(mysql.PriKeyFlag | mysql.NotNullFlag)
56+
57+
tableInfoV1 := common.WrapTableInfo("test", &model.TableInfo{
58+
ID: 20,
59+
Name: ast.NewCIStr("t_iceberg"),
60+
PKIsHandle: true,
61+
Columns: []*model.ColumnInfo{
62+
{ID: 1, Name: ast.NewCIStr("id"), FieldType: *idType},
63+
{ID: 2, Name: ast.NewCIStr("v"), FieldType: *textType},
64+
{ID: 3, Name: ast.NewCIStr("score"), FieldType: *types.NewFieldType(mysql.TypeLong)},
65+
},
66+
})
67+
require.NoError(t, tableWriter.EnsureTable(ctx, changefeedID, tableInfoV1))
68+
69+
tableInfoV2 := common.WrapTableInfo("test", &model.TableInfo{
70+
ID: 20,
71+
Name: ast.NewCIStr("t_iceberg"),
72+
PKIsHandle: true,
73+
Columns: []*model.ColumnInfo{
74+
{ID: 1, Name: ast.NewCIStr("id"), FieldType: *idType},
75+
{ID: 2, Name: ast.NewCIStr("v"), FieldType: *textType},
76+
{ID: 3, Name: ast.NewCIStr("score"), FieldType: *types.NewFieldType(mysql.TypeLong)},
77+
{ID: 4, Name: ast.NewCIStr("extra"), FieldType: *varcharType},
78+
},
79+
})
80+
require.NoError(t, tableWriter.EnsureTable(ctx, changefeedID, tableInfoV2))
81+
82+
version1, err := sinkiceberg.LoadTableVersion(ctx, cfg, extStorage, "test", "t_iceberg", 1)
83+
require.NoError(t, err)
84+
version2, err := sinkiceberg.LoadTableVersion(ctx, cfg, extStorage, "test", "t_iceberg", 2)
85+
require.NoError(t, err)
86+
87+
createEvents, err := buildIcebergDDLEvents(nil, version1)
88+
require.NoError(t, err)
89+
require.Len(t, createEvents, 2)
90+
require.Contains(t, createEvents[1].Query, "`v` TEXT NULL")
91+
require.NotContains(t, createEvents[1].Query, "`v` LONGBLOB NULL")
92+
require.Contains(t, createEvents[1].Query, "PRIMARY KEY (`id`)")
93+
94+
alterEvents, err := buildIcebergDDLEvents(version1, version2)
95+
require.NoError(t, err)
96+
require.Len(t, alterEvents, 1)
97+
require.Equal(t,
98+
"ALTER TABLE `test`.`t_iceberg` ADD COLUMN `extra` VARCHAR(32) NULL",
99+
alterEvents[0].Query,
100+
)
101+
}

0 commit comments

Comments
 (0)