Commit 166e287

Support ClickHouse having extra columns compared to source for initial load (#2794)
Currently, to construct the column selectors for the initial load's INSERT INTO ... SELECT into ClickHouse, we read the columns from ClickHouse. As a result, initial load fails when the target has a column the source lacks. We used the ClickHouse schema rather than the source schema here so that column exclusion was taken care of automatically, but that left us in an inconsistent position: we supported the source having extra columns for initial load, yet ClickHouse having extra columns only for CDC normalize. This PR re-wires things so that initial load uses the source schema and passes it the column-exclusion info.
1 parent fc3c4b4 · commit 166e287

7 files changed: +81 −32 lines

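The net effect: column selection for the initial load's INSERT INTO ... SELECT is now driven by the source schema plus the mirror's column-exclusion list, so extra destination columns are simply never referenced. A minimal standalone sketch of that idea (hypothetical helper names; the commit's actual loop in qrep_avro_sync.go below iterates the exclusion list inline and also maps names through the Avro schema):

```go
package main

import (
	"fmt"
	"slices"
)

// buildColumnSelectors keeps only source columns that are not excluded.
// Destination-only columns never appear in the input, so they are never
// selected. Hypothetical helper for illustration, not the commit's code.
func buildColumnSelectors(sourceColumns, exclude []string) []string {
	selected := make([]string, 0, len(sourceColumns))
	for _, col := range sourceColumns {
		if slices.Contains(exclude, col) {
			continue // skip columns the mirror is configured to exclude
		}
		selected = append(selected, col)
	}
	return selected
}

func main() {
	src := []string{"id", "key", "secret_col"}
	fmt.Println(buildColumnSelectors(src, []string{"secret_col"})) // [id key]
}
```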

flow/connectors/clickhouse/qrep.go

Lines changed: 2 additions & 18 deletions

```diff
@@ -6,7 +6,6 @@ import (
 	"log/slog"
 	"strings"
 
-	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 
@@ -33,26 +32,11 @@ func (c *ClickHouseConnector) SyncQRepRecords(
 		slog.String("destinationTable", destTable),
 	)
 
-	tblSchema, err := c.getTableSchema(ctx, destTable)
-	if err != nil {
-		return 0, fmt.Errorf("failed to get schema of table %s: %w", destTable, err)
-	}
-	c.logger.Info("Called QRep sync function and obtained table schema", flowLog)
+	c.logger.Info("Called QRep sync function", flowLog)
 
 	avroSync := NewClickHouseAvroSyncMethod(config, c)
 
-	return avroSync.SyncQRepRecords(ctx, config, partition, tblSchema, stream)
-}
-
-func (c *ClickHouseConnector) getTableSchema(ctx context.Context, tableName string) ([]driver.ColumnType, error) {
-	queryString := fmt.Sprintf("SELECT * FROM `%s` LIMIT 0", tableName)
-	rows, err := c.query(ctx, queryString)
-	if err != nil {
-		return nil, fmt.Errorf("failed to execute query: %w", err)
-	}
-	defer rows.Close()
-
-	return rows.ColumnTypes(), nil
+	return avroSync.SyncQRepRecords(ctx, config, partition, stream)
 }
 
 func (c *ClickHouseConnector) ConsolidateQRepPartitions(_ context.Context, config *protos.QRepConfig) error {
```
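For context, the deleted getTableSchema probed the destination with a zero-row query and read the column types off the empty result. A standalone sketch of that probing pattern using database/sql with clickhouse-go (placeholder DSN and table name; the connector uses its own query helper rather than database/sql):

```go
package main

import (
	"context"
	"database/sql"
	"fmt"

	_ "github.com/ClickHouse/clickhouse-go/v2" // registers the "clickhouse" driver
)

func main() {
	// Placeholder DSN; any reachable ClickHouse instance works.
	db, err := sql.Open("clickhouse", "clickhouse://localhost:9000/default")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// LIMIT 0 returns no rows but still carries full column metadata.
	rows, err := db.QueryContext(context.Background(), "SELECT * FROM `events` LIMIT 0")
	if err != nil {
		panic(err)
	}
	defer rows.Close()

	cols, err := rows.ColumnTypes()
	if err != nil {
		panic(err)
	}
	for _, col := range cols {
		fmt.Println(col.Name(), col.DatabaseTypeName())
	}
}
```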

flow/connectors/clickhouse/qrep_avro_sync.go

Lines changed: 8 additions & 14 deletions

```diff
@@ -7,8 +7,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
-
 	"github.com/PeerDB-io/peerdb/flow/connectors/utils"
 	avro "github.com/PeerDB-io/peerdb/flow/connectors/utils/avro"
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -105,7 +103,6 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 	ctx context.Context,
 	config *protos.QRepConfig,
 	partition *protos.QRepPartition,
-	dstTableSchema []driver.ColumnType,
 	stream *model.QRecordStream,
 ) (int, error) {
 	dstTableName := config.DestinationTableIdentifier
@@ -146,17 +143,14 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 	endpoint := s.credsProvider.Provider.GetEndpointURL()
 	region := s.credsProvider.Provider.GetRegion()
 	avroFileUrl := utils.FileURLForS3Service(endpoint, region, s3o.Bucket, avroFile.FilePath)
-	selectedColumnNames := make([]string, 0, len(dstTableSchema))
-	insertedColumnNames := make([]string, 0, len(dstTableSchema))
-	for _, col := range dstTableSchema {
-		colName := col.Name()
-		if strings.EqualFold(colName, signColName) ||
-			strings.EqualFold(colName, config.SyncedAtColName) ||
-			strings.EqualFold(colName, versionColName) ||
-			(sourceSchemaAsDestinationColumn && strings.EqualFold(colName, sourceSchemaColName)) {
-			continue
+	selectedColumnNames := make([]string, 0, len(schema.Fields))
+	insertedColumnNames := make([]string, 0, len(schema.Fields))
+	for _, colName := range schema.GetColumnNames() {
+		for _, excludedColumn := range config.Exclude {
+			if colName == excludedColumn {
+				continue
+			}
 		}
-
 		avroColName, ok := columnNameAvroFieldMap[colName]
 		if !ok {
 			s.logger.Error("destination column not found in avro schema",
@@ -184,7 +178,7 @@ func (s *ClickHouseAvroSyncMethod) SyncQRepRecords(
 		sessionTokenPart = fmt.Sprintf(", '%s'", creds.AWS.SessionToken)
 	}
 
-	hashColName := columnNameAvroFieldMap[dstTableSchema[0].Name()]
+	hashColName := columnNameAvroFieldMap[schema.Fields[0].Name]
 	numParts, err := internal.PeerDBClickHouseInitialLoadPartsPerPartition(ctx, s.config.Env)
 	if err != nil {
 		s.logger.Warn("failed to get chunking parts, proceeding without chunking", slog.Any("error", err))
```
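The selected and inserted column lists built above feed an INSERT INTO ... SELECT that reads the staged Avro file through ClickHouse's s3 table function. A simplified sketch of the statement's shape (hypothetical values; the real code additionally handles credentials, the session token shown above, quoting, and optional chunking into numParts):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-ins for what the surrounding code computes.
	dstTable := "public_test"
	inserted := []string{"`id`", "`key`"}
	selected := []string{"`id`", "`key`"}
	avroFileURL := "https://bucket.s3.us-east-1.amazonaws.com/avro/part-0.avro.zst"

	// Shape of the query only; credentials and settings omitted.
	query := fmt.Sprintf("INSERT INTO `%s` (%s) SELECT %s FROM s3('%s', 'Avro')",
		dstTable, strings.Join(inserted, ","), strings.Join(selected, ","), avroFileURL)
	fmt.Println(query)
}
```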

flow/e2e/clickhouse/clickhouse.go

Lines changed: 27 additions & 0 deletions

```diff
@@ -4,6 +4,7 @@ import (
 	"context"
 	"fmt"
 	"reflect"
+	"strings"
 	"testing"
 	"time"
 
@@ -99,6 +100,32 @@ func (s ClickHouseSuite) Teardown(ctx context.Context) {
 	require.NoError(s.t, s.connector.Close())
 }
 
+type TestClickHouseColumn struct {
+	Name string
+	Type string
+}
+
+// CreateRMTTable creates a ReplacingMergeTree table with the given name and columns.
+func (s ClickHouseSuite) CreateRMTTable(tableName string, columns []TestClickHouseColumn, orderingKey string) error {
+	ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
+	if err != nil {
+		return err
+	}
+	defer ch.Close()
+
+	columnStrings := make([]string, len(columns))
+	for i, col := range columns {
+		columnStrings[i] = fmt.Sprintf("`%s` %s", col.Name, col.Type)
+	}
+
+	// Join the column definitions into a single string
+	columnStr := strings.Join(columnStrings, ", ")
+
+	// Create the table with ReplacingMergeTree engine
+	createTableQuery := fmt.Sprintf("CREATE TABLE `%s` (%s) ENGINE = ReplacingMergeTree() ORDER BY `%s`", tableName, columnStr, orderingKey)
+	return ch.Exec(s.t.Context(), createTableQuery)
+}
+
 func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch, error) {
 	ch, err := connclickhouse.Connect(s.t.Context(), nil, s.Peer().GetClickhouseConfig())
 	if err != nil {
```
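For the six-column destination table that the new test below defines, CreateRMTTable's string building produces DDL of this shape; a standalone sketch mirroring it:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Columns copied from the Test_Extra_CH_Columns table definition below.
	cols := [][2]string{
		{"id", "Int32"},
		{"key", "String"},
		{"updatedAt", "String"},
		{"_peerdb_is_deleted", "Int8"},
		{"_peerdb_synced_at", "DateTime"},
		{"_peerdb_version", "Int64"},
	}
	defs := make([]string, len(cols))
	for i, c := range cols {
		defs[i] = fmt.Sprintf("`%s` %s", c[0], c[1])
	}
	fmt.Printf("CREATE TABLE `%s` (%s) ENGINE = ReplacingMergeTree() ORDER BY `%s`\n",
		"test_extra_ch_cols", strings.Join(defs, ", "), "id")
}
```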

flow/e2e/clickhouse/peer_flow_ch_test.go

Lines changed: 41 additions & 0 deletions

```diff
@@ -1250,6 +1250,47 @@ func (s ClickHouseSuite) Test_SchemaAsColumn() {
 	e2e.RequireEnvCanceled(s.t, env)
 }
 
+func (s ClickHouseSuite) Test_Extra_CH_Columns() {
+	srcTableName := "test_extra_ch_cols"
+	srcFullName := s.attachSchemaSuffix("test_extra_ch_cols")
+	dstTableName := "test_extra_ch_cols"
+
+	require.NoError(s.t, s.source.Exec(s.t.Context(),
+		fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s (id INT PRIMARY KEY, "key" TEXT NOT NULL)`, srcFullName)))
+
+	require.NoError(s.t, s.CreateRMTTable(dstTableName, []TestClickHouseColumn{
+		{Name: "id", Type: "Int32"},
+		{Name: "key", Type: "String"},
+		{Name: "updatedAt", Type: "String"},
+		{Name: "_peerdb_is_deleted", Type: "Int8"},
+		{Name: "_peerdb_synced_at", Type: "DateTime"},
+		{Name: "_peerdb_version", Type: "Int64"},
+	}, "id"),
+	)
+
+	require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s (id,"key") VALUES (1,'init')`, srcFullName)))
+
+	connectionGen := e2e.FlowConnectionGenerationConfig{
+		FlowJobName:      s.attachSuffix("test_extra_ch_cols"),
+		TableNameMapping: map[string]string{srcFullName: dstTableName},
+		Destination:      s.Peer().Name,
+	}
+	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s)
+	flowConnConfig.DoInitialSnapshot = true
+
+	tc := e2e.NewTemporalClient(s.t)
+	env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
+	e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
+
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on initial", srcTableName, dstTableName, "id,\"key\"")
+
+	require.NoError(s.t, s.source.Exec(s.t.Context(), fmt.Sprintf(`INSERT INTO %s (id,"key") VALUES (2,'cdc')`, srcFullName)))
+	e2e.EnvWaitForEqualTablesWithNames(env, s, "waiting on cdc", srcTableName, dstTableName, "id,\"key\"")
+
+	env.Cancel(s.t.Context())
+	e2e.RequireEnvCanceled(s.t, env)
+}
+
 func (s ClickHouseSuite) Test_NullEngine() {
 	srcTableName := "test_nullengine"
 	srcFullName := s.attachSchemaSuffix(srcTableName)
```

flow/workflows/snapshot_flow.go

Lines changed: 1 addition & 0 deletions

```diff
@@ -216,6 +216,7 @@ func (s *SnapshotFlowExecution) cloneTable(
 		Script:           s.config.Script,
 		Env:              s.config.Env,
 		ParentMirrorName: flowName,
+		Exclude:          mapping.Exclude,
 	}
 
 	boundSelector.SpawnChild(childCtx, QRepFlowWorkflow, nil, config, nil)
```

protos/flow.proto

Lines changed: 1 addition & 0 deletions

```diff
@@ -317,6 +317,7 @@ message QRepConfig {
   map<string, string> env = 24;
 
   string parent_mirror_name = 25;
+  repeated string exclude = 26;
 }
 
 message QRepPartition {
```

ui/app/mirrors/create/helpers/common.ts

Lines changed: 1 addition & 0 deletions

```diff
@@ -69,4 +69,5 @@ export const blankQRepSetting: QRepConfig = {
   system: TypeSystem.Q,
   env: {},
   parentMirrorName: '',
+  exclude: [],
 };
```
