Skip to content

Commit f096b4b

Browse files
authored
Merge pull request #841 from datazip-inc/staging
chore: staging -> master v0.4.0
2 parents 276f843 + f606679 commit f096b4b

16 files changed

Lines changed: 480 additions & 201 deletions

File tree

destination/iceberg/iceberg.go

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -102,7 +102,7 @@ func (i *Iceberg) Setup(ctx context.Context, stream types.StreamInterface, globa
102102
logger.Infof("Creating destination table [%s] in Iceberg database [%s] for stream [%s]", i.stream.GetDestinationTable(), i.stream.GetDestinationDatabase(&i.config.IcebergDatabase), i.stream.Name())
103103

104104
var requestPayload proto.IcebergPayload
105-
iceSchema := stream.Schema().ToIceberg(!stream.NormalizationEnabled())
105+
iceSchema := stream.Schema().ToIceberg(!stream.NormalizationEnabled(), i.stream)
106106
requestPayload = proto.IcebergPayload{
107107
Type: proto.IcebergPayload_GET_OR_CREATE_TABLE,
108108
Metadata: &proto.IcebergPayload_Metadata{

destination/parquet/parquet.go

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -104,9 +104,9 @@ func (p *Parquet) createNewPartitionFile(basePath string) error {
104104

105105
writer := func() any {
106106
if p.stream.NormalizationEnabled() {
107-
return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(false), pqgo.Compression(&pqgo.Snappy))
107+
return pqgo.NewGenericWriter[any](pqFile, p.schema.ToTypeSchema().ToParquet(false, p.stream), pqgo.Compression(&pqgo.Snappy))
108108
}
109-
return pqgo.NewGenericWriter[any](pqFile, p.stream.Schema().ToParquet(true), pqgo.Compression(&pqgo.Snappy))
109+
return pqgo.NewGenericWriter[any](pqFile, p.stream.Schema().ToParquet(true, p.stream), pqgo.Compression(&pqgo.Snappy))
110110
}()
111111

112112
p.partitionedFiles[basePath] = append(p.partitionedFiles[basePath], &FileMetadata{

drivers/abstract/backfill.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,8 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
4040

4141
logger.Infof("Starting backfill for stream[%s] with %d chunks", stream.GetStream().Name, len(chunks))
4242

43+
filterDataBySelectedColumnsFn := stream.RetainSelectedColumns()
44+
4345
chunkProcessor := func(gCtx context.Context, _ int, chunk types.Chunk) (err error) {
4446
// create backfill context, so that main context not affected if backfill retries
4547
backfillCtx, backfillCtxCancel := context.WithCancel(gCtx)
@@ -66,6 +68,7 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
6668
logger.Infof("finished chunk min[%v] and max[%v] of stream %s", chunk.Min, chunk.Max, stream.ID())
6769
return nil
6870
})()
71+
6972
return a.driver.ChunkIterator(backfillCtx, stream, chunk, func(ctx context.Context, data map[string]any) error {
7073
olakeID := utils.GetKeysHash(data, stream.GetStream().SourceDefinedPrimaryKey.Array()...)
7174
olakeColumns := map[string]any{
@@ -78,7 +81,10 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
7881
if stream.GetSyncMode() == types.CDC {
7982
olakeColumns[constants.CdcTimestamp] = time.Unix(0, 0)
8083
}
81-
return inserter.Push(ctx, types.CreateRawRecord(data, olakeColumns))
84+
85+
filteredData := filterDataBySelectedColumnsFn(data)
86+
87+
return inserter.Push(ctx, types.CreateRawRecord(filteredData, olakeColumns))
8288
})
8389
}
8490
utils.ConcurrentInGroupWithRetry(a.GlobalConnGroup, chunks, a.driver.MaxRetries(), chunkProcessor)

drivers/abstract/cdc.go

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -99,6 +99,7 @@ func (a *AbstractDriver) RunChangeStream(mainCtx context.Context, pool *destinat
9999
// - For Postgres: ignored (uses global replication slot)
100100
func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destination.WriterPool, streamIndex int) (err error) {
101101
writers := make(map[string]*destination.WriterThread)
102+
filterDataBySelectedColumnsFns := make(map[string]func(map[string]interface{}) map[string]interface{})
102103

103104
// create cdc context, so that main context not affected if cdc retries
104105
cdcCtx, cdcCtxCancel := context.WithCancel(mainCtx)
@@ -131,7 +132,15 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio
131132
constants.OlakeTimestamp: time.Now().UTC(),
132133
}
133134
maps.Copy(olakeColumns, change.ExtraColumns)
134-
return writer.Push(ctx, types.CreateRawRecord(change.Data, olakeColumns))
135+
136+
filterDataBySelectedColumnsFn, exists := filterDataBySelectedColumnsFns[change.Stream.ID()]
137+
if !exists {
138+
filterDataBySelectedColumnsFn = change.Stream.RetainSelectedColumns()
139+
filterDataBySelectedColumnsFns[change.Stream.ID()] = filterDataBySelectedColumnsFn
140+
}
141+
filteredData := filterDataBySelectedColumnsFn(change.Data)
142+
143+
return writer.Push(ctx, types.CreateRawRecord(filteredData, olakeColumns))
135144
})
136145
}
137146

drivers/abstract/incremental.go

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -92,6 +92,8 @@ func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.
9292
return nil
9393
})()
9494

95+
filterDataBySelectedColumnsFn := stream.RetainSelectedColumns()
96+
9597
// No retry logic here - retry happens at Read level
9698
return a.driver.StreamIncrementalChanges(incrementalCtx, stream, func(ctx context.Context, record map[string]any) error {
9799
maxPrimaryCursorValue, maxSecondaryCursorValue = a.getMaxIncrementCursorFromData(primaryCursor, secondaryCursor, maxPrimaryCursorValue, maxSecondaryCursorValue, record)
@@ -100,7 +102,8 @@ func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.
100102
constants.OpType: "u",
101103
constants.OlakeTimestamp: time.Now().UTC(),
102104
}
103-
return inserter.Push(ctx, types.CreateRawRecord(record, olakeColumns))
105+
filteredData := filterDataBySelectedColumnsFn(record)
106+
return inserter.Push(ctx, types.CreateRawRecord(filteredData, olakeColumns))
104107
})
105108
})
106109
return nil
Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +1 @@
1-
{"selected_streams":{"DB2INST1":[{"partition_regex":"","stream_name":"DB2_TEST_TABLE_OLAKE","normalization":true}]},"streams":[{"stream":{"name":"DB2_TEST_TABLE_OLAKE","namespace":"DB2INST1","type_schema":{"properties":{"COL_BIGINT":{"type":["integer","null"],"destination_column_name":"col_bigint"},"COL_BLOB":{"type":["string","null"],"destination_column_name":"col_blob"},"COL_BOOL":{"type":["boolean","null"],"destination_column_name":"col_bool"},"COL_CHAR":{"type":["string","null"],"destination_column_name":"col_char"},"COL_CHARACTER":{"type":["string","null"],"destination_column_name":"col_character"},"COL_CLOB":{"type":["string","null"],"destination_column_name":"col_clob"},"COL_CURSOR":{"type":["integer","null"],"destination_column_name":"col_cursor"},"COL_DATE":{"type":["timestamp","null"],"destination_column_name":"col_date"},"COL_DECIMAL":{"type":["number","null"],"destination_column_name":"col_decimal"},"COL_DOUBLE":{"type":["number","null"],"destination_column_name":"col_double"},"COL_GRAPHIC":{"type":["string","null"],"destination_column_name":"col_graphic"},"COL_INT":{"type":["null","integer_small"],"destination_column_name":"col_int"},"COL_REAL":{"type":["number_small","null"],"destination_column_name":"col_real"},"COL_SMALLINT":{"type":["integer_small","null"],"destination_column_name":"col_smallint"},"COL_TIME":{"type":["string","null"],"destination_column_name":"col_time"},"COL_TIMESTAMP":{"type":["timestamp","null"],"destination_column_name":"col_timestamp"},"COL_VARCHAR":{"type":["string","null"],"destination_column_name":"col_varchar"},"COL_VARGRAPHIC":{"type":["string","null"],"destination_column_name":"col_vargraphic"},"ID":{"type":["integer"],"destination_column_name":"id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["null","string"],"destination_column_na
me":"_op_type","olake_column":true}}},"supported_sync_modes":["full_refresh","incremental"],"source_defined_primary_key":["ID"],"available_cursor_fields":["COL_CHARACTER","COL_VARCHAR","COL_TIME","COL_GRAPHIC","ID","COL_CHAR","COL_DECIMAL","COL_BLOB","COL_VARGRAPHIC","COL_CURSOR","COL_BIGINT","COL_DOUBLE","COL_SMALLINT","COL_BOOL","COL_TIMESTAMP","COL_DATE","COL_REAL","COL_INT","COL_CLOB"],"sync_mode":"incremental","destination_database":"db2_testdb:db2inst1","destination_table":"db2_test_table_olake","default_stream_properties":{"normalization":true,"append_mode":false}}}]}
1+
{"selected_streams":{"DB2INST1":[{"partition_regex":"","stream_name":"DB2_TEST_TABLE_OLAKE","normalization":true,"selected_columns":{"columns":["_op_type","COL_GRAPHIC","COL_BIGINT","COL_BLOB","COL_CURSOR","COL_TIMESTAMP","COL_VARGRAPHIC","COL_BOOL","COL_CHAR","COL_CLOB","COL_DOUBLE","ID","_olake_timestamp","COL_VARCHAR","COL_SMALLINT","COL_INT","COL_REAL","COL_DATE","_olake_id","COL_DECIMAL","COL_TIME","COL_CHARACTER"],"sync_new_columns":true}}]},"streams":[{"stream":{"name":"DB2_TEST_TABLE_OLAKE","namespace":"DB2INST1","type_schema":{"properties":{"COL_BIGINT":{"type":["integer","null"],"destination_column_name":"col_bigint"},"COL_BLOB":{"type":["string","null"],"destination_column_name":"col_blob"},"COL_BOOL":{"type":["boolean","null"],"destination_column_name":"col_bool"},"COL_CHAR":{"type":["string","null"],"destination_column_name":"col_char"},"COL_CHARACTER":{"type":["string","null"],"destination_column_name":"col_character"},"COL_CLOB":{"type":["string","null"],"destination_column_name":"col_clob"},"COL_CURSOR":{"type":["integer","null"],"destination_column_name":"col_cursor"},"COL_DATE":{"type":["null","timestamp"],"destination_column_name":"col_date"},"COL_DECIMAL":{"type":["number","null"],"destination_column_name":"col_decimal"},"COL_DOUBLE":{"type":["number","null"],"destination_column_name":"col_double"},"COL_GRAPHIC":{"type":["string","null"],"destination_column_name":"col_graphic"},"COL_INT":{"type":["integer_small","null"],"destination_column_name":"col_int"},"COL_REAL":{"type":["number_small","null"],"destination_column_name":"col_real"},"COL_SMALLINT":{"type":["integer_small","null"],"destination_column_name":"col_smallint"},"COL_TIME":{"type":["string","null"],"destination_column_name":"col_time"},"COL_TIMESTAMP":{"type":["timestamp","null"],"destination_column_name":"col_timestamp"},"COL_VARCHAR":{"type":["string","null"],"destination_column_name":"col_varchar"},"COL_VARGRAPHIC":{"type":["string","null"],"destination_column_name":"col_vargraphic
"},"ID":{"type":["integer"],"destination_column_name":"id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["null","timestamp_micro"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true}}},"supported_sync_modes":["full_refresh","incremental"],"source_defined_primary_key":["ID"],"available_cursor_fields":["ID","COL_DECIMAL","COL_SMALLINT","COL_BOOL","COL_BIGINT","COL_CHAR","COL_VARCHAR","COL_DATE","COL_REAL","COL_INT","COL_BLOB","COL_CHARACTER","COL_CLOB","COL_TIMESTAMP","COL_TIME","COL_GRAPHIC","COL_VARGRAPHIC","COL_CURSOR","COL_DOUBLE"],"sync_mode":"incremental","destination_database":"db2_testdb:db2inst1","destination_table":"db2_test_table_olake","default_stream_properties":{"normalization":true,"append_mode":false}}}]}
Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1 +1 @@
1-
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false},{"partition_regex":"","stream_name":"test_collection","normalization":false}]},"streams":[{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_resume_token":{"type":["string","null"],"destination_column_name":"_cdc_resume_token","olake_column":true},"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp","olake_column":true},"_id":{"type":["string"],"destination_column_name":"_id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_cursor":{"type":["integer_small"],"destination_column_name":"id_cursor"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["strict_cdc","full_refresh","incrementa
l","cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_nested","id_int","id_bigint","id_maxkey","created_timestamp","id_double","id_timestamp","id_nil","id_regex","name_varchar","id_bool","_id","id","id_cursor","id_minkey"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake","default_stream_properties":{"normalization":false,"append_mode":false}}},{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_resume_token":{"type":["string","null"],"destination_column_name":"_cdc_resume_token","olake_column":true},"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp","olake_column":true},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true}}},"supported_sync_modes":["incremental","cdc","strict_cdc","full_refresh"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection","default_stream_properties":{"normalization":false,"append_mode":false}}}]}
1+
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"test_collection","normalization":false,"selected_columns":{"columns":["_olake_id","_cdc_resume_token","_olake_timestamp","_cdc_timestamp","_op_type"],"sync_new_columns":true}},{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false,"selected_columns":{"columns":["id_timestamp","created_timestamp","id","id_int","id_nil","id_minkey","_olake_id","_olake_timestamp","id_maxkey","id_double","id_regex","name_varchar","_id","id_bigint","_op_type","id_nested","id_cursor","id_bool","_cdc_resume_token","_cdc_timestamp"],"sync_new_columns":true}}]},"streams":[{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_resume_token":{"type":["string","null"],"destination_column_name":"_cdc_resume_token","olake_column":true},"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp","olake_column":true},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true}}},"supported_sync_modes":["incremental","cdc","strict_cdc","full_refresh"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection","default_stream_properties":{"normalization":false,"append_mode":false}}},{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_resume_token":{"type":["string","null"],"destination_column_name":"_cdc_resume_token","olake_column":true},"_cdc_timestamp":{"type":["null","timestamp_micro"],"destination_column_name":"_cdc_timestamp","olake_column":true},"_id":{"type":["string"],"destination_c
olumn_name":"_id"},"_olake_id":{"type":["null","string"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["null","timestamp_micro"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_cursor":{"type":["integer_small"],"destination_column_name":"id_cursor"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["full_refresh","incremental","cdc","strict_cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_nil","id_minkey","_id","id_cursor","id_timestamp","created_timestamp","id_double","id_nested","id","id_int","id_regex","name_varchar","id_bigint","id_bool","id_maxkey"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake","default_stream_properties":{"normalization":false,"append_mode":false}}}]}

0 commit comments

Comments (0)