Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
OutputOldValue: c.Sink.CSVConfig.OutputOldValue,
OutputHandleKey: c.Sink.CSVConfig.OutputHandleKey,
OutputFieldHeader: c.Sink.CSVConfig.OutputFieldHeader,
}
}
var pulsarConfig *config.PulsarConfig
Expand Down Expand Up @@ -599,6 +600,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
OutputOldValue: cloned.Sink.CSVConfig.OutputOldValue,
OutputHandleKey: cloned.Sink.CSVConfig.OutputHandleKey,
OutputFieldHeader: cloned.Sink.CSVConfig.OutputFieldHeader,
}
}
var kafkaConfig *KafkaConfig
Expand Down Expand Up @@ -983,6 +985,7 @@ type CSVConfig struct {
BinaryEncodingMethod string `json:"binary_encoding_method"`
OutputOldValue bool `json:"output_old_value"`
OutputHandleKey bool `json:"output_handle_key"`
OutputFieldHeader bool `json:"output_field_header"`
}

// LargeMessageHandleConfig denotes the large message handling config
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
bytesCnt := int64(0)
// There is always only one message here in task.msgs

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment // There is always only one message here in task.msgs is an important assumption. While it might be true for the current implementation, it could become a source of bugs if the logic for batching messages into tasks changes in the future. If a task could contain multiple messages, this logic would only write the header for the very first message in the task, potentially missing headers for subsequent messages if they were intended to start new files. Consider adding a more detailed explanation or a link to where this assumption is guaranteed.

for _, msg := range task.msgs {
if msg.Key != nil && rowsCnt == 0 {
buf.Write(msg.Key)
bytesCnt += int64(len(msg.Key))
}
bytesCnt += int64(len(msg.Value))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ const (
"include-commit-ts": true,
"binary-encoding-method":"base64",
"output-old-value": false,
"output-handle-key": false
"output-handle-key": false,
"output-field-header": false
},
"date-separator": "month",
"enable-partition-separator": true,
Expand Down Expand Up @@ -423,7 +424,8 @@ const (
"include-commit-ts": true,
"binary-encoding-method":"base64",
"output-old-value": false,
"output-handle-key": false
"output-handle-key": false,
"output-field-header": false
},
"terminator": "\r\n",
"transaction-atomicity": "",
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ type CSVConfig struct {
OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
// output handle key
OutputHandleKey bool `toml:"output-handle-key" json:"output-handle-key"`
// output field header
OutputFieldHeader bool `toml:"output-field-header" json:"output-field-header"`
}

func (c *CSVConfig) validateAndAdjust() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Config struct {
DebeziumDisableSchema bool
// Debezium only. Whether before value should be included in the output.
DebeziumOutputOldValue bool
// CSV only. Whether header should be included in the output.
CSVOutputFieldHeader bool
}

// EncodingFormatType is the type of encoding format
Expand Down Expand Up @@ -129,6 +131,7 @@ func NewConfig(protocol config.Protocol) *Config {
DebeziumOutputOldValue: true,
OpenOutputOldValue: true,
DebeziumDisableSchema: false,
CSVOutputFieldHeader: false,
}
}

Expand Down Expand Up @@ -233,6 +236,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
c.OutputOldValue = replicaConfig.Sink.CSVConfig.OutputOldValue
c.OutputHandleKey = replicaConfig.Sink.CSVConfig.OutputHandleKey
c.CSVOutputFieldHeader = replicaConfig.Sink.CSVConfig.OutputFieldHeader
}
if replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil {
c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle
Expand Down
11 changes: 11 additions & 0 deletions pkg/sink/codec/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"math"
"strings"
"unsafe"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/log"
Expand Down Expand Up @@ -378,3 +379,13 @@ func SanitizeTopicName(name string) string {
}
return sanitizedName
}

// UnsafeBytesToString create string from byte slice without copying
func UnsafeBytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}

// UnsafeStringToBytes create byte slice from string without copying
func UnsafeStringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(&s))
}
29 changes: 24 additions & 5 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"io"

"github.com/pingcap/errors"
"github.com/pingcap/log"
lconfig "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"go.uber.org/zap"
)

const defaultIOConcurrency = 1
Expand Down Expand Up @@ -52,11 +54,13 @@ func NewBatchDecoder(ctx context.Context,
backslashEscape = true
}
cfg := &lconfig.CSVConfig{
Separator: codecConfig.Delimiter,
Delimiter: codecConfig.Quote,
Terminator: codecConfig.Terminator,
Null: []string{codecConfig.NullString},
BackslashEscape: backslashEscape,
Separator: codecConfig.Delimiter,
Delimiter: codecConfig.Quote,
Terminator: codecConfig.Terminator,
Null: []string{codecConfig.NullString},
BackslashEscape: backslashEscape,
HeaderSchemaMatch: true,
Header: codecConfig.CSVOutputFieldHeader,
}
csvParser, err := mydump.NewCSVParser(ctx, cfg,
mydump.NewStringReader(string(value)),
Expand All @@ -65,6 +69,21 @@ func NewBatchDecoder(ctx context.Context,
if err != nil {
return nil, err
}
if codecConfig.CSVOutputFieldHeader {
err := csvParser.ReadColumns()
if err != nil {
return nil, err
}
header := csvParser.Columns()
log.Info("parser CSV header", zap.Any("header", header), zap.Any("cap", cap(header)))
// check column name
idx := len(header) - len(tableInfo.Columns)
for i, col := range tableInfo.Columns {
if col.Name.L != header[idx+i] {
log.Panic("check column name order failed", zap.Any("col", col.Name.O), zap.Any("header", header[idx+i]))
}
}
}
return &batchDecoder{
codecConfig: codecConfig,
tableInfo: tableInfo,
Expand Down
25 changes: 24 additions & 1 deletion pkg/sink/codec/csv/csv_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// BatchEncoder encodes the events into the byte of a batch into.
type BatchEncoder struct {
header []byte
valueBuf *bytes.Buffer
callback func()
batchSize int
Expand All @@ -36,6 +37,9 @@ func (b *BatchEncoder) AppendTxnEvent(
callback func(),
) error {
for _, rowEvent := range e.Rows {
if b.config.CSVOutputFieldHeader && b.batchSize == 0 {
b.setHeader(rowEvent)
}
row, err := rowChangedEvent2CSVMsg(b.config, rowEvent)
if err != nil {
return err
Expand All @@ -53,7 +57,7 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
return nil
}

ret := common.NewMsg(config.ProtocolCsv, nil,
ret := common.NewMsg(config.ProtocolCsv, b.header,
b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil)
ret.SetRowsCount(b.batchSize)
ret.Callback = b.callback
Expand All @@ -64,10 +68,29 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
}
b.callback = nil
b.batchSize = 0
b.header = nil

return []*common.Message{ret}
}

func (b *BatchEncoder) setHeader(rowEvent *model.RowChangedEvent) {
buf := &bytes.Buffer{}
columns := rowEvent.Columns
if rowEvent.IsDelete() {
columns = rowEvent.PreColumns
}
colNames := make([]string, 0, len(columns))
for _, col := range columns {
if col == nil {
continue
}
info := rowEvent.TableInfo.ForceGetColumnInfo(col.ColumnID)
colNames = append(colNames, info.Name.O)
}
buf.Write(encodeHeader(b.config, colNames))
b.header = buf.Bytes()
}

// newBatchEncoder creates a new csv BatchEncoder.
func newBatchEncoder(config *common.Config) codec.TxnEventEncoder {
return &BatchEncoder{
Expand Down
53 changes: 53 additions & 0 deletions pkg/sink/codec/csv/csv_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package csv

import (
"strings"
"testing"

"github.com/pingcap/tiflow/cdc/entry"
Expand Down Expand Up @@ -102,3 +103,55 @@ func TestCSVAppendRowChangedEventWithCallback(t *testing.T) {
msgs[0].Callback()
require.Equal(t, 1, count, "expected all callbacks to be called")
}

func TestCSVBatchCodecWithHeader(t *testing.T) {
helper := entry.NewSchemaTestHelper(t)
defer helper.Close()

ddl := helper.DDL2Event("create table test.table1(col1 int primary key)")
event1 := helper.DML2Event("insert into test.table1 values (1)", "test", "table1")
event2 := helper.DML2Event("insert into test.table1 values (2)", "test", "table1")

cs := &model.SingleTableTxn{
Rows: []*model.RowChangedEvent{
event1,
event2,
},
}
cfg := &common.Config{
Delimiter: ",",
Quote: "\"",
Terminator: "\n",
NullString: "\\N",
IncludeCommitTs: true,
CSVOutputFieldHeader: true,
}
encoder := newBatchEncoder(cfg)
err := encoder.AppendTxnEvent(cs, nil)
require.Nil(t, err)
messages := encoder.Build()
require.Len(t, messages, 1)
header := strings.Split(string(messages[0].Key), cfg.Terminator)[0]
require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,ticdc-meta$commit-ts,col1", header)
require.Equal(t, len(cs.Rows), messages[0].GetRowsCount())

cfg.CSVOutputFieldHeader = false
encoder = newBatchEncoder(cfg)
err = encoder.AppendTxnEvent(cs, nil)
require.Nil(t, err)
messages1 := encoder.Build()
require.Len(t, messages1, 1)
require.Equal(t, messages1[0].Value, messages[0].Value)
require.Equal(t, len(cs.Rows), messages1[0].GetRowsCount())

cfg.CSVOutputFieldHeader = true
cs = &model.SingleTableTxn{
TableInfo: ddl.TableInfo,
Rows: nil,
}
encoder = newBatchEncoder(cfg)
err = encoder.AppendTxnEvent(cs, nil)
require.Nil(t, err)
messages = encoder.Build()
require.Len(t, messages, 0)
}
34 changes: 32 additions & 2 deletions pkg/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func newCSVMessage(config *common.Config) *csvMessage {
// Col2: Table name, the name of the source table.
// Col3: Schema name, the name of the source schema.
// Col4: Commit TS, the commit-ts of the source txn (optional).
// Col5-n: one or more columns that represent the data to be changed.
// Column 5: The is-update column only exists when the value of output-old-value is true.(optional)
// Col6-n: one or more columns that represent the data to be changed.
func (c *csvMessage) encode() []byte {
strBuilder := new(strings.Builder)
if c.opType == operationUpdate && c.config.OutputOldValue && len(c.preColumns) != 0 {
Expand All @@ -116,7 +117,7 @@ func (c *csvMessage) encode() []byte {
c.encodeMeta(c.opType.String(), strBuilder)
c.encodeColumns(c.columns, strBuilder)
}
return []byte(strBuilder.String())
return common.UnsafeStringToBytes(strBuilder.String())
}

func (c *csvMessage) encodeMeta(opType string, b *strings.Builder) {
Expand Down Expand Up @@ -486,3 +487,32 @@ func csvColumns2RowChangeColumns(csvConfig *common.Config, csvCols []any, ticols

return cols, nil
}

// The header should contain the name corresponding to the file record field,
// and should have the same number as the record field.
// | ticdc-meta$operation | ticdc-meta$table | ticdc-meta$schema | ticdc-meta$commit-ts | ticdc-meta$is-update | col1 | col2 | ... |
func encodeHeader(config *common.Config, colNames []string) []byte {
if !config.CSVOutputFieldHeader {
return nil
}
strBuilder := new(strings.Builder)
strBuilder.WriteString("ticdc-meta$operation")
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$table")
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$schema")
if config.IncludeCommitTs {
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$commit-ts")
}
if config.OutputOldValue {
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$is-update")
}
for _, name := range colNames {
Comment on lines +511 to +512

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The encodeHeader function does not account for the OutputHandleKey configuration. When OutputHandleKey is true, encodeMeta adds the handle key as a column, but there is no corresponding header column. This will cause a mismatch between the header and the data rows. A header for the handle key should be added, for instance ticdc-meta$handle-key.

    }
    if config.OutputHandleKey {
        strBuilder.WriteString(config.Delimiter)
        strBuilder.WriteString("ticdc-meta$handle-key")
    }
    for _, name := range colNames {

strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString(name)
}
strBuilder.WriteString(config.Terminator)
return common.UnsafeStringToBytes(strBuilder.String())
}
32 changes: 32 additions & 0 deletions pkg/sink/codec/csv/csv_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,35 @@ func TestCSVMessageDecode(t *testing.T) {
}
}
}

func TestEncodeHeader(t *testing.T) {
cfg := &common.Config{
OutputOldValue: true,
IncludeCommitTs: true,
Delimiter: " ",
CSVOutputFieldHeader: true,
}
colNames := []string{"col1", "col2"}
header := encodeHeader(cfg, colNames)
require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts ticdc-meta$is-update col1 col2", string(header))

cfg.OutputOldValue = false
header = encodeHeader(cfg, colNames)
require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts col1 col2", string(header))

cfg.IncludeCommitTs = false
header = encodeHeader(cfg, colNames)
require.Equal(t, "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema col1 col2", string(header))

cfg.Delimiter = ","
header = encodeHeader(cfg, colNames)
require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2", string(header))

cfg.Terminator = "\n"
header = encodeHeader(cfg, colNames)
require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2\n", string(header))

cfg.CSVOutputFieldHeader = false
header = encodeHeader(cfg, colNames)
require.Nil(t, header)
}
10 changes: 5 additions & 5 deletions tests/integration_tests/canal_json_storage_basic/data/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ CREATE TABLE multi_data_type
);

CREATE TABLE multi_charset (
id INT,
name varchar(128) CHARACTER SET gbk,
country char(32) CHARACTER SET gbk,
city varchar(64),
`id` INT,
Name varchar(128) CHARACTER SET gbk,
`Country` char(32) CHARACTER SET gbk,
CITY varchar(64),
description text CHARACTER SET gbk,
image tinyblob,
`IMAGE` tinyblob,
PRIMARY KEY (id)
) ENGINE = InnoDB CHARSET = utf8mb4;

Expand Down
Loading
Loading