Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
BinaryEncodingMethod: c.Sink.CSVConfig.BinaryEncodingMethod,
OutputOldValue: c.Sink.CSVConfig.OutputOldValue,
OutputHandleKey: c.Sink.CSVConfig.OutputHandleKey,
OutputFieldHeader: c.Sink.CSVConfig.OutputFieldHeader,
}
}
var pulsarConfig *config.PulsarConfig
Expand Down Expand Up @@ -599,6 +600,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
BinaryEncodingMethod: cloned.Sink.CSVConfig.BinaryEncodingMethod,
OutputOldValue: cloned.Sink.CSVConfig.OutputOldValue,
OutputHandleKey: cloned.Sink.CSVConfig.OutputHandleKey,
OutputFieldHeader: cloned.Sink.CSVConfig.OutputFieldHeader,
}
}
var kafkaConfig *KafkaConfig
Expand Down Expand Up @@ -983,6 +985,7 @@ type CSVConfig struct {
BinaryEncodingMethod string `json:"binary_encoding_method"`
OutputOldValue bool `json:"output_old_value"`
OutputHandleKey bool `json:"output_handle_key"`
OutputFieldHeader bool `json:"output_field_header"`
}

// LargeMessageHandleConfig denotes the large message handling config
Expand Down
5 changes: 5 additions & 0 deletions cdc/sink/dmlsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,12 @@ func (d *dmlWorker) writeDataFile(ctx context.Context, path string, task *single
buf := bytes.NewBuffer(make([]byte, 0, task.size))
rowsCnt := 0
bytesCnt := int64(0)
// There is always only one message here in task.msgs

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The comment // There is always only one message here in task.msgs is an important assumption. While it might be true for the current implementation, it could become a source of bugs if the logic for batching messages into tasks changes in the future. If a task could contain multiple messages, this logic would only write the header for the very first message in the task, potentially missing headers for subsequent messages if they were intended to start new files. Consider adding a more detailed explanation or a link to where this assumption is guaranteed.

for _, msg := range task.msgs {
if msg.Key != nil && rowsCnt == 0 {
buf.Write(msg.Key)
bytesCnt += int64(len(msg.Key))
}
bytesCnt += int64(len(msg.Value))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ const (
"include-commit-ts": true,
"binary-encoding-method":"base64",
"output-old-value": false,
"output-handle-key": false
"output-handle-key": false,
"output-field-header": false
},
"date-separator": "month",
"enable-partition-separator": true,
Expand Down Expand Up @@ -423,7 +424,8 @@ const (
"include-commit-ts": true,
"binary-encoding-method":"base64",
"output-old-value": false,
"output-handle-key": false
"output-handle-key": false,
"output-field-header": false
},
"terminator": "\r\n",
"transaction-atomicity": "",
Expand Down
2 changes: 2 additions & 0 deletions pkg/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,8 @@ type CSVConfig struct {
OutputOldValue bool `toml:"output-old-value" json:"output-old-value"`
// output handle key
OutputHandleKey bool `toml:"output-handle-key" json:"output-handle-key"`
// output field header
OutputFieldHeader bool `toml:"output-field-header" json:"output-field-header"`
}

func (c *CSVConfig) validateAndAdjust() error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/sink/codec/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ type Config struct {
DebeziumDisableSchema bool
// Debezium only. Whether before value should be included in the output.
DebeziumOutputOldValue bool
// CSV only. Whether header should be included in the output.
CSVOutputFieldHeader bool
}

// EncodingFormatType is the type of encoding format
Expand Down Expand Up @@ -129,6 +131,7 @@ func NewConfig(protocol config.Protocol) *Config {
DebeziumOutputOldValue: true,
OpenOutputOldValue: true,
DebeziumDisableSchema: false,
CSVOutputFieldHeader: false,
}
}

Expand Down Expand Up @@ -233,6 +236,7 @@ func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) er
c.BinaryEncodingMethod = replicaConfig.Sink.CSVConfig.BinaryEncodingMethod
c.OutputOldValue = replicaConfig.Sink.CSVConfig.OutputOldValue
c.OutputHandleKey = replicaConfig.Sink.CSVConfig.OutputHandleKey
c.CSVOutputFieldHeader = replicaConfig.Sink.CSVConfig.OutputFieldHeader
}
if replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.LargeMessageHandle != nil {
c.LargeMessageHandle = replicaConfig.Sink.KafkaConfig.LargeMessageHandle
Expand Down
29 changes: 24 additions & 5 deletions pkg/sink/codec/csv/csv_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ import (
"io"

"github.com/pingcap/errors"
"github.com/pingcap/log"
lconfig "github.com/pingcap/tidb/pkg/lightning/config"
"github.com/pingcap/tidb/pkg/lightning/mydump"
"github.com/pingcap/tidb/pkg/lightning/worker"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"go.uber.org/zap"
)

const defaultIOConcurrency = 1
Expand Down Expand Up @@ -52,11 +54,13 @@ func NewBatchDecoder(ctx context.Context,
backslashEscape = true
}
cfg := &lconfig.CSVConfig{
Separator: codecConfig.Delimiter,
Delimiter: codecConfig.Quote,
Terminator: codecConfig.Terminator,
Null: []string{codecConfig.NullString},
BackslashEscape: backslashEscape,
Separator: codecConfig.Delimiter,
Delimiter: codecConfig.Quote,
Terminator: codecConfig.Terminator,
Null: []string{codecConfig.NullString},
BackslashEscape: backslashEscape,
HeaderSchemaMatch: true,
Header: codecConfig.CSVOutputFieldHeader,
}
csvParser, err := mydump.NewCSVParser(ctx, cfg,
mydump.NewStringReader(string(value)),
Expand All @@ -65,6 +69,21 @@ func NewBatchDecoder(ctx context.Context,
if err != nil {
return nil, err
}
if codecConfig.CSVOutputFieldHeader {
err := csvParser.ReadColumns()
if err != nil {
return nil, err
}
header := csvParser.Columns()
log.Info("parser CSV header", zap.Any("header", header), zap.Any("cap", cap(header)))
// check column name
idx := len(header) - len(tableInfo.Columns)
for i, col := range tableInfo.Columns {
if col.Name.O != header[idx+i] {
log.Panic("check column name order failed", zap.Any("col", col.Name.O), zap.Any("header", header[idx+i]))
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using log.Panic for a data validation error like a CSV header mismatch is too aggressive, as it will crash the entire capture process. It's better to return an error to allow for graceful error handling, such as putting the changefeed into an error state.

            if col.Name.O != header[idx+i] {
                return nil, cerror.WrapError(cerror.ErrCSVDecodeFailed,
                    errors.Errorf("csv header column name mismatch, expected: %s, got: %s", col.Name.O, header[idx+i]))
            }

}
}
return &batchDecoder{
codecConfig: codecConfig,
tableInfo: tableInfo,
Expand Down
25 changes: 24 additions & 1 deletion pkg/sink/codec/csv/csv_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

// BatchEncoder encodes the events of a batch into bytes.
type BatchEncoder struct {
header []byte
valueBuf *bytes.Buffer
callback func()
batchSize int
Expand All @@ -36,6 +37,9 @@ func (b *BatchEncoder) AppendTxnEvent(
callback func(),
) error {
for _, rowEvent := range e.Rows {
if b.config.CSVOutputFieldHeader && b.batchSize == 0 {
b.setHeader(rowEvent)
}
row, err := rowChangedEvent2CSVMsg(b.config, rowEvent)
if err != nil {
return err
Expand All @@ -53,7 +57,7 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
return nil
}

ret := common.NewMsg(config.ProtocolCsv, nil,
ret := common.NewMsg(config.ProtocolCsv, b.header,
b.valueBuf.Bytes(), 0, model.MessageTypeRow, nil, nil)
ret.SetRowsCount(b.batchSize)
ret.Callback = b.callback
Expand All @@ -64,10 +68,29 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
}
b.callback = nil
b.batchSize = 0
b.header = nil

return []*common.Message{ret}
}

// setHeader builds the CSV header line from the columns of rowEvent and stores
// it in b.header. Delete events take their column list from PreColumns, all
// other events from Columns; nil column entries are skipped.
func (b *BatchEncoder) setHeader(rowEvent *model.RowChangedEvent) {
	cols := rowEvent.Columns
	if rowEvent.IsDelete() {
		cols = rowEvent.PreColumns
	}

	names := make([]string, 0, len(cols))
	for _, c := range cols {
		if c != nil {
			// Resolve the column's original name through the table info.
			names = append(names, rowEvent.TableInfo.ForceGetColumnInfo(c.ColumnID).Name.O)
		}
	}

	// Copy the encoded header into a buffer owned by the encoder.
	var out bytes.Buffer
	out.Write(encodeHeader(b.config, names))
	b.header = out.Bytes()
}

// newBatchEncoder creates a new csv BatchEncoder.
func newBatchEncoder(config *common.Config) codec.TxnEventEncoder {
return &BatchEncoder{
Expand Down
53 changes: 53 additions & 0 deletions pkg/sink/codec/csv/csv_encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package csv

import (
"strings"
"testing"

"github.com/pingcap/tiflow/cdc/entry"
Expand Down Expand Up @@ -102,3 +103,55 @@ func TestCSVAppendRowChangedEventWithCallback(t *testing.T) {
msgs[0].Callback()
require.Equal(t, 1, count, "expected all callbacks to be called")
}

// TestCSVBatchCodecWithHeader checks the batch encoder with the CSV field
// header switched on and off: the header is carried in the message key, the
// message value is the same either way, and a txn without rows yields no
// message.
func TestCSVBatchCodecWithHeader(t *testing.T) {
	helper := entry.NewSchemaTestHelper(t)
	defer helper.Close()

	createTable := helper.DDL2Event("create table test.table1(col1 int primary key)")
	rows := []*model.RowChangedEvent{
		helper.DML2Event("insert into test.table1 values (1)", "test", "table1"),
		helper.DML2Event("insert into test.table1 values (2)", "test", "table1"),
	}
	txn := &model.SingleTableTxn{Rows: rows}

	cfg := &common.Config{
		Delimiter:            ",",
		Quote:                "\"",
		Terminator:           "\n",
		NullString:           "\\N",
		IncludeCommitTs:      true,
		CSVOutputFieldHeader: true,
	}

	// encode runs one txn through a fresh encoder built from the current cfg.
	encode := func(in *model.SingleTableTxn) []*common.Message {
		enc := newBatchEncoder(cfg)
		require.Nil(t, enc.AppendTxnEvent(in, nil))
		return enc.Build()
	}

	// Header enabled: the first line of the key is the header.
	withHeader := encode(txn)
	require.Len(t, withHeader, 1)
	keyLines := strings.Split(string(withHeader[0].Key), cfg.Terminator)
	require.Equal(t, "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,ticdc-meta$commit-ts,col1", keyLines[0])
	require.Equal(t, len(txn.Rows), withHeader[0].GetRowsCount())

	// Header disabled: the value must match the header-enabled value.
	cfg.CSVOutputFieldHeader = false
	withoutHeader := encode(txn)
	require.Len(t, withoutHeader, 1)
	require.Equal(t, withoutHeader[0].Value, withHeader[0].Value)
	require.Equal(t, len(txn.Rows), withoutHeader[0].GetRowsCount())

	// Header enabled again, but the txn has no rows: nothing is built.
	cfg.CSVOutputFieldHeader = true
	txn = &model.SingleTableTxn{
		TableInfo: createTable.TableInfo,
		Rows:      nil,
	}
	require.Len(t, encode(txn), 0)
}
34 changes: 32 additions & 2 deletions pkg/sink/codec/csv/csv_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ func newCSVMessage(config *common.Config) *csvMessage {
// Col2: Table name, the name of the source table.
// Col3: Schema name, the name of the source schema.
// Col4: Commit TS, the commit-ts of the source txn (optional).
// Col5-n: one or more columns that represent the data to be changed.
// Col5: The is-update column, which only exists when output-old-value is true (optional).
// Col6-n: one or more columns that represent the data to be changed.
func (c *csvMessage) encode() []byte {
strBuilder := new(strings.Builder)
if c.opType == operationUpdate && c.config.OutputOldValue && len(c.preColumns) != 0 {
Expand All @@ -116,7 +117,7 @@ func (c *csvMessage) encode() []byte {
c.encodeMeta(c.opType.String(), strBuilder)
c.encodeColumns(c.columns, strBuilder)
}
return []byte(strBuilder.String())
return common.UnsafeStringToBytes(strBuilder.String())
}

func (c *csvMessage) encodeMeta(opType string, b *strings.Builder) {
Expand Down Expand Up @@ -486,3 +487,32 @@ func csvColumns2RowChangeColumns(csvConfig *common.Config, csvCols []any, ticols

return cols, nil
}

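// encodeHeader builds the CSV header line for a data file. It returns nil when
// config.CSVOutputFieldHeader is false. Otherwise the header names each record
// field in order, joined by config.Delimiter and ended with config.Terminator;
// it should contain as many names as a record has fields.
// The commit-ts and is-update names are written only when config.IncludeCommitTs
// and config.OutputOldValue are set, respectively.
// | ticdc-meta$operation | ticdc-meta$table | ticdc-meta$schema | ticdc-meta$commit-ts | ticdc-meta$is-update | col1 | col2 | ... |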
func encodeHeader(config *common.Config, colNames []string) []byte {
if !config.CSVOutputFieldHeader {
return nil
}
strBuilder := new(strings.Builder)
strBuilder.WriteString("ticdc-meta$operation")
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$table")
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$schema")
if config.IncludeCommitTs {
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$commit-ts")
}
if config.OutputOldValue {
strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString("ticdc-meta$is-update")
}
for _, name := range colNames {
Comment on lines +511 to +512

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The encodeHeader function does not account for the OutputHandleKey configuration. When OutputHandleKey is true, encodeMeta adds the handle key as a column, but there is no corresponding header column. This will cause a mismatch between the header and the data rows. A header for the handle key should be added, for instance ticdc-meta$handle-key.

    }
    if config.OutputHandleKey {
        strBuilder.WriteString(config.Delimiter)
        strBuilder.WriteString("ticdc-meta$handle-key")
    }
    for _, name := range colNames {

strBuilder.WriteString(config.Delimiter)
strBuilder.WriteString(name)
}
strBuilder.WriteString(config.Terminator)
return common.UnsafeStringToBytes(strBuilder.String())
}
32 changes: 32 additions & 0 deletions pkg/sink/codec/csv/csv_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,3 +1101,35 @@ func TestCSVMessageDecode(t *testing.T) {
}
}
}

// TestEncodeHeader walks encodeHeader through a sequence of config changes and
// checks the header produced after each one, ending with the header disabled.
func TestEncodeHeader(t *testing.T) {
	cfg := &common.Config{
		OutputOldValue:       true,
		IncludeCommitTs:      true,
		Delimiter:            " ",
		CSVOutputFieldHeader: true,
	}
	colNames := []string{"col1", "col2"}

	// Each step mutates cfg on top of all previous steps, so order matters.
	steps := []struct {
		adjust func()
		want   string
	}{
		{
			adjust: func() {},
			want:   "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts ticdc-meta$is-update col1 col2",
		},
		{
			adjust: func() { cfg.OutputOldValue = false },
			want:   "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema ticdc-meta$commit-ts col1 col2",
		},
		{
			adjust: func() { cfg.IncludeCommitTs = false },
			want:   "ticdc-meta$operation ticdc-meta$table ticdc-meta$schema col1 col2",
		},
		{
			adjust: func() { cfg.Delimiter = "," },
			want:   "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2",
		},
		{
			adjust: func() { cfg.Terminator = "\n" },
			want:   "ticdc-meta$operation,ticdc-meta$table,ticdc-meta$schema,col1,col2\n",
		},
	}
	for _, step := range steps {
		step.adjust()
		require.Equal(t, step.want, string(encodeHeader(cfg, colNames)))
	}

	// With the header switched off, no header bytes are produced at all.
	cfg.CSVOutputFieldHeader = false
	require.Nil(t, encodeHeader(cfg, colNames))
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ quote = '"'
# Representation of null values in CSV files, the default value is '\N'
null = '\N'
# Include commit-ts in the row data. The default value is false.
include-commit-ts = true
include-commit-ts = true
# output header in CSV files. The default value is false.
output-field-header = true
3 changes: 3 additions & 0 deletions tests/integration_tests/csv_storage_basic/data/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,6 @@ VALUES (
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF',
x'000102030405060708090A0B0C0D0E0F101112131415161718191A1B1C1D1E1F202122232425262728292A2B2C2D2E2F303132333435363738393A3B3C3D3E3F404142434445464748494A4B4C4D4E4F505152535455565758595A5B5C5D5E5F606162636465666768696A6B6C6D6E6F707172737475767778797A7B7C7D7E7F808182838485868788898A8B8C8D8E8F909192939495969798999A9B9C9D9E9FA0A1A2A3A4A5A6A7A8A9AAABACADAEAFB0B1B2B3B4B5B6B7B8B9BABBBCBDBEBFC0C1C2C3C4C5C6C7C8C9CACBCCCDCECFD0D1D2D3D4D5D6D7D8D9DADBDCDDDEDFE0E1E2E3E4E5E6E7E8E9EAEBECEDEEEFF0F1F2F3F4F5F6F7F8F9FAFBFCFDFEFF'
);

ALTER TABLE binary_columns ADD c_new INT;
INSERT INTO binary_columns (c_new) VALUES (2);
7 changes: 7 additions & 0 deletions tests/integration_tests/csv_storage_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ function run() {
run_storage_consumer $WORK_DIR $SINK_URI $CUR/conf/changefeed.toml ""
sleep 8
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml 100
# check csv header
find "$WORK_DIR/storage_test/" -type f -name "*.csv" | while read -r file; do
first_line=$(head -n 1 $file)
if [[ "$first_line" != ticdc-meta* ]]; then
echo "check CSV header failed. header: $first_line"
fi
Comment on lines +32 to +34

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The check for the CSV header is not strict enough. It only echoes a message on failure and does not cause the script to exit with an error. This could lead to test failures being missed. The check should be made stricter to exit on failure. Also, it's good practice to quote file paths (e.g., "$file" in head) to handle potential spaces or special characters.

Suggested change
if [[ "$first_line" != ticdc-meta* ]]; then
echo "check CSV header failed. header: $first_line"
fi
if [[ "$first_line" != ticdc-meta* ]]; then
echo "check CSV header failed. header: $first_line"
exit 1
fi

done
}

trap stop_tidb_cluster EXIT
Expand Down
Loading