Skip to content

Commit a7a7d3c

Browse files
committed
cdc: add iceberg dual-write sink path and IT coverage
1 parent 86eefe0 commit a7a7d3c

File tree

11 files changed

+1087
-5
lines changed

11 files changed

+1087
-5
lines changed

.github/workflows/integration_test_iceberg.yaml

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,31 @@ jobs:
2929
# Only run CI when PR is not draft (workflow_dispatch doesn't have pull_request payload).
3030
if: github.event_name == 'workflow_dispatch' || github.event.pull_request.draft == false
3131
runs-on: ubuntu-latest
32+
services:
33+
kafka:
34+
image: bitnami/kafka:3.7
35+
ports:
36+
- 9092:9092
37+
env:
38+
KAFKA_ENABLE_KRAFT: "yes"
39+
KAFKA_CFG_NODE_ID: "0"
40+
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
41+
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
42+
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093"
43+
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://127.0.0.1:9092"
44+
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT"
45+
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
46+
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@127.0.0.1:9093"
47+
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
48+
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: "1"
49+
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: "1"
50+
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
51+
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
52+
ALLOW_PLAINTEXT_LISTENER: "yes"
3253
strategy:
3354
fail-fast: false
3455
matrix:
35-
group: [G00, G01]
56+
group: [G00, G01, G02]
3657
name: Iceberg Light IT ${{ matrix.group }}
3758
steps:
3859
- name: Check out code
@@ -55,6 +76,7 @@ jobs:
5576
sudo apt-get update
5677
sudo apt-get install -y --no-install-recommends \
5778
curl \
79+
kcat \
5880
lsof \
5981
mariadb-client \
6082
psmisc \
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
	"context"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
	"github.com/pingcap/log"
	"github.com/pingcap/ticdc/pkg/errors"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)
26+
27+
type consumer struct {
28+
client *kafka.Consumer
29+
writer *writer
30+
}
31+
32+
func newConsumer(ctx context.Context, o *option) *consumer {
33+
configMap := &kafka.ConfigMap{
34+
"bootstrap.servers": strings.Join(o.address, ","),
35+
"group.id": o.groupID,
36+
"auto.offset.reset": "earliest",
37+
"enable.auto.offset.store": false,
38+
"enable.auto.commit": false,
39+
}
40+
if len(o.ca) != 0 {
41+
_ = configMap.SetKey("security.protocol", "SSL")
42+
_ = configMap.SetKey("ssl.ca.location", o.ca)
43+
_ = configMap.SetKey("ssl.key.location", o.key)
44+
_ = configMap.SetKey("ssl.certificate.location", o.cert)
45+
}
46+
client, err := kafka.NewConsumer(configMap)
47+
if err != nil {
48+
log.Panic("create kafka consumer failed", zap.Error(err))
49+
}
50+
51+
topics := strings.Split(o.topic, ",")
52+
err = client.SubscribeTopics(topics, nil)
53+
if err != nil {
54+
log.Panic("subscribe topics failed", zap.Strings("topics", topics), zap.Error(err))
55+
}
56+
return &consumer{
57+
writer: newWriter(ctx, o),
58+
client: client,
59+
}
60+
}
61+
62+
func (c *consumer) readMessage(ctx context.Context) error {
63+
defer func() {
64+
if err := c.client.Close(); err != nil {
65+
log.Warn("close kafka consumer failed", zap.Error(err))
66+
}
67+
}()
68+
for {
69+
select {
70+
case <-ctx.Done():
71+
log.Info("consumer exist: context cancelled")
72+
return errors.Trace(ctx.Err())
73+
default:
74+
}
75+
msg, err := c.client.ReadMessage(-1)
76+
if err != nil {
77+
log.Error("read message failed, just continue to retry", zap.Error(err))
78+
continue
79+
}
80+
needCommit := c.writer.WriteMessage(ctx, msg)
81+
if !needCommit {
82+
continue
83+
}
84+
topicPartition, err := c.client.CommitMessage(msg)
85+
if err != nil {
86+
log.Error("commit message failed, just continue",
87+
zap.String("topic", *msg.TopicPartition.Topic), zap.Int32("partition", msg.TopicPartition.Partition),
88+
zap.Any("offset", msg.TopicPartition.Offset), zap.Error(err))
89+
continue
90+
}
91+
log.Debug("commit message success",
92+
zap.String("topic", topicPartition[0].String()), zap.Int32("partition", topicPartition[0].Partition),
93+
zap.Any("offset", topicPartition[0].Offset))
94+
}
95+
}
96+
97+
func (c *consumer) Run(ctx context.Context) error {
98+
g, ctx := errgroup.WithContext(ctx)
99+
g.Go(func() error {
100+
return c.writer.run(ctx)
101+
})
102+
g.Go(func() error {
103+
return c.readMessage(ctx)
104+
})
105+
return g.Wait()
106+
}

cmd/kafka-iceberg-consumer/main.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright 2025 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package main
15+
16+
import (
17+
"context"
18+
"flag"
19+
"fmt"
20+
"net/http"
21+
_ "net/http/pprof"
22+
"os"
23+
"os/signal"
24+
"runtime/debug"
25+
"syscall"
26+
27+
"github.com/google/uuid"
28+
"github.com/pingcap/log"
29+
"github.com/pingcap/ticdc/pkg/logger"
30+
"github.com/pingcap/ticdc/pkg/version"
31+
"go.uber.org/zap"
32+
"golang.org/x/sync/errgroup"
33+
)
34+
35+
// Logging flags shared across main; populated by flag.Parse in main().
var (
	logPath  string // destination file for consumer logs
	logLevel string // minimum log level, e.g. "info" or "debug"
)
39+
40+
func main() {
41+
debug.SetMemoryLimit(14 * 1024 * 1024 * 1024)
42+
var (
43+
upstreamURIStr string
44+
configFile string
45+
enableProfiling bool
46+
)
47+
groupID := fmt.Sprintf("ticdc_kafka_iceberg_consumer_%s", uuid.New().String())
48+
consumerOption := newOption()
49+
50+
flag.StringVar(&configFile, "config", "", "config file for changefeed")
51+
flag.StringVar(&upstreamURIStr, "upstream-uri", "", "Kafka uri")
52+
flag.StringVar(&logPath, "log-file", "cdc_kafka_iceberg_consumer.log", "log file path")
53+
flag.StringVar(&logLevel, "log-level", "info", "log level")
54+
flag.BoolVar(&enableProfiling, "enable-profiling", false, "enable pprof profiling")
55+
56+
flag.StringVar(&consumerOption.icebergWarehouse, "iceberg-warehouse", "", "Iceberg warehouse URI (s3://bucket/warehouse)")
57+
flag.StringVar(&consumerOption.icebergCatalog, "iceberg-catalog", "glue", "Iceberg catalog type (hadoop, glue)")
58+
flag.StringVar(&consumerOption.icebergNamespace, "iceberg-namespace", "", "Iceberg namespace")
59+
flag.StringVar(&consumerOption.icebergDatabase, "iceberg-database", "", "Iceberg database name")
60+
flag.StringVar(&consumerOption.icebergRegion, "iceberg-region", "us-east-1", "AWS region for Glue catalog")
61+
flag.StringVar(&consumerOption.icebergMode, "iceberg-mode", "append", "Iceberg mode (append, upsert)")
62+
flag.StringVar(&consumerOption.groupID, "consumer-group-id", groupID, "consumer group id")
63+
flag.StringVar(&consumerOption.timezone, "tz", "System", "Specify time zone of Kafka consumer")
64+
flag.StringVar(&consumerOption.ca, "ca", "", "CA certificate path for Kafka SSL connection")
65+
flag.StringVar(&consumerOption.cert, "cert", "", "Certificate path for Kafka SSL connection")
66+
flag.StringVar(&consumerOption.key, "key", "", "Private key path for Kafka SSL connection")
67+
flag.Parse()
68+
69+
err := logger.InitLogger(&logger.Config{
70+
Level: logLevel,
71+
File: logPath,
72+
})
73+
if err != nil {
74+
log.Panic("init logger failed", zap.Error(err))
75+
}
76+
version.LogVersionInfo("kafka iceberg consumer")
77+
78+
consumerOption.Adjust(upstreamURIStr, configFile)
79+
80+
ctx, cancel := context.WithCancel(context.Background())
81+
g, ctx := errgroup.WithContext(ctx)
82+
83+
if enableProfiling {
84+
g.Go(func() error {
85+
return http.ListenAndServe(":6060", nil)
86+
})
87+
}
88+
89+
cons := newConsumer(ctx, consumerOption)
90+
g.Go(func() error {
91+
return cons.Run(ctx)
92+
})
93+
94+
g.Go(func() error {
95+
sigterm := make(chan os.Signal, 1)
96+
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
97+
select {
98+
case <-ctx.Done():
99+
log.Info("terminating: context cancelled")
100+
case <-sigterm:
101+
log.Info("terminating: via signal")
102+
}
103+
cancel()
104+
return nil
105+
})
106+
err = g.Wait()
107+
if err != nil {
108+
log.Error("kafka iceberg consumer exited with error", zap.Error(err))
109+
} else {
110+
log.Info("kafka iceberg consumer exited")
111+
}
112+
}

0 commit comments

Comments
 (0)