Skip to content

Commit cdda0af

Browse files
authored
Merge pull request #885 from datazip-inc/staging
chore: staging -> master v0.6.0
2 parents cd88547 + a3b1b67 commit cdda0af

45 files changed

Lines changed: 3252 additions & 479 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/integration-tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ jobs:
2020
integration-tests:
2121
environment: integration_tests
2222
runs-on: 32gb-runner
23-
timeout-minutes: 30
23+
timeout-minutes: 45
2424
steps:
2525
- name: Checkout code
2626
uses: actions/checkout@v3

constants/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ const (
4848
MSSQL DriverType = "mssql"
4949
)
5050

51+
// Drivers where filters are applied in memory after full refresh data is read.
52+
var FullRefreshPostReadFilterDrivers = []DriverType{S3, Kafka}
5153
var RelationalDrivers = []DriverType{Postgres, MySQL, Oracle, DB2, MSSQL}
5254

5355
var ParallelCDCDrivers = []DriverType{MongoDB, MSSQL}

destination/iceberg/iceberg.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,18 @@ func (i *Iceberg) FlattenAndCleanData(ctx context.Context, records []types.RawRe
365365
return false, nil, nil, fmt.Errorf("failed to extract schema from records: %s", err)
366366
}
367367

368-
return schemaDifference, records, recordsSchema, err
368+
if i.options.ApplyFilter {
369+
filter, isLegacy, filterErr := i.stream.GetFilter()
370+
if filterErr != nil {
371+
return false, nil, nil, fmt.Errorf("failed to parse stream filter: %s", filterErr)
372+
}
373+
records, err = typeutils.FilterRecords(ctx, records, filter, isLegacy, recordsSchema)
374+
if err != nil {
375+
return false, nil, nil, fmt.Errorf("failed to filter records: %s", err)
376+
}
377+
}
378+
379+
return schemaDifference, records, recordsSchema, nil
369380
}
370381

371382
// compares with the global schema and updates the schema in the destination accordingly

destination/parquet/parquet.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -403,9 +403,22 @@ func (p *Parquet) FlattenAndCleanData(ctx context.Context, records []types.RawRe
403403
}
404404
}
405405

406-
return schemaChange, records, p.schema, utils.Concurrent(ctx, records, runtime.GOMAXPROCS(0)*16, func(_ context.Context, record types.RawRecord, _ int) error {
406+
if err := utils.Concurrent(ctx, records, runtime.GOMAXPROCS(0)*16, func(_ context.Context, record types.RawRecord, _ int) error {
407407
return typeutils.ReformatRecord(p.schema, record.Data)
408-
})
408+
}); err != nil {
409+
return false, nil, nil, fmt.Errorf("failed to reformat records: %s", err)
410+
}
411+
if p.options.ApplyFilter {
412+
filter, isLegacy, filterErr := p.stream.GetFilter()
413+
if filterErr != nil {
414+
return false, nil, nil, fmt.Errorf("failed to parse stream filter: %s", filterErr)
415+
}
416+
records, err = typeutils.FilterRecords(ctx, records, filter, isLegacy, p.schema)
417+
if err != nil {
418+
return false, nil, nil, fmt.Errorf("failed to filter records: %s", err)
419+
}
420+
}
421+
return schemaChange, records, p.schema, nil
409422
}
410423

411424
// EvolveSchema updates the schema based on changes. Need to pass olakeTimestamp to get the correct partition path based on record ingestion time.

destination/writers.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ type (
1818
WriterOption func(Writer) error
1919

2020
Options struct {
21-
Identifier string
22-
Number int64
23-
Backfill bool
24-
ThreadID string
21+
Identifier string
22+
Number int64
23+
Backfill bool
24+
ThreadID string
25+
ApplyFilter bool
2526
}
2627

2728
ThreadOptions func(opt *Options)
@@ -34,6 +35,7 @@ type (
3435
Stats struct {
3536
TotalRecordsToSync atomic.Int64 // total records that are required to sync
3637
ReadCount atomic.Int64 // records that got read
38+
RecordsFiltered atomic.Int64 // records that got filtered
3739
ThreadCount atomic.Int64 // total number of writer threads
3840
}
3941

@@ -83,6 +85,11 @@ func WithThreadID(threadID string) ThreadOptions {
8385
opt.ThreadID = threadID
8486
}
8587
}
88+
func WithApplyFilter(applyFilter bool) ThreadOptions {
89+
return func(opt *Options) {
90+
opt.ApplyFilter = applyFilter
91+
}
92+
}
8693

8794
func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams []string, batchSize int64) (*WriterPool, error) {
8895
newfunc, found := RegisteredWriters[config.Type]
@@ -105,6 +112,7 @@ func NewWriterPool(ctx context.Context, config *types.WriterConfig, syncStreams
105112
TotalRecordsToSync: atomic.Int64{},
106113
ThreadCount: atomic.Int64{},
107114
ReadCount: atomic.Int64{},
115+
RecordsFiltered: atomic.Int64{},
108116
},
109117
config: config.WriterConfig,
110118
init: newfunc,
@@ -230,12 +238,12 @@ func (wt *WriterThread) flush(ctx context.Context, buf []types.RawRecord) (err e
230238
// create flush context
231239
flushCtx, cancel := context.WithCancel(ctx)
232240
defer cancel()
233-
241+
recordsCountBeforeFiltering := len(buf)
234242
evolution, buf, threadSchema, err := wt.writer.FlattenAndCleanData(flushCtx, buf)
235243
if err != nil {
236244
return fmt.Errorf("failed to flatten and clean data: %s", err)
237245
}
238-
246+
wt.stats.RecordsFiltered.Add(int64(recordsCountBeforeFiltering - len(buf)))
239247
// TODO: after flattening, the raw_record record type no longer makes sense
240248
if evolution {
241249
wt.streamArtifact.mu.Lock()

drivers/abstract/backfill.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (a *AbstractDriver) Backfill(mainCtx context.Context, backfilledStreams cha
4949
defer backfillCtxCancel()
5050

5151
threadID := generateThreadID(stream.ID(), fmt.Sprintf("min[%v]-max[%v]", chunk.Min, chunk.Max))
52-
inserter, prevMetadataState, err := pool.NewWriter(backfillCtx, stream, destination.WithBackfill(true), destination.WithThreadID(threadID))
52+
inserter, prevMetadataState, err := pool.NewWriter(backfillCtx, stream, destination.WithBackfill(true), destination.WithThreadID(threadID), destination.WithApplyFilter(slices.Contains(constants.FullRefreshPostReadFilterDrivers, constants.DriverType(a.driver.Type()))))
5353
if err != nil {
5454
return fmt.Errorf("failed to create new writer thread: %s", err)
5555
}

drivers/abstract/cdc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (a *AbstractDriver) streamChanges(mainCtx context.Context, pool *destinatio
116116

117117
for _, stream := range streams {
118118
threadID := generateThreadID(stream.ID(), "")
119-
w, writerMeta, createErr := pool.NewWriter(cdcCtx, stream, destination.WithThreadID(threadID))
119+
w, writerMeta, createErr := pool.NewWriter(cdcCtx, stream, destination.WithThreadID(threadID), destination.WithApplyFilter(true))
120120
if createErr != nil {
121121
return fmt.Errorf("failed to create CDC writer for stream %s: %s", stream.ID(), createErr)
122122
}

drivers/abstract/incremental.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (a *AbstractDriver) Incremental(mainCtx context.Context, pool *destination.
7575
defer incrementalCtxCancel()
7676

7777
threadID := generateThreadID(stream.ID(), fmt.Sprintf("%v_%v", maxPrimaryCursorValue, maxSecondaryCursorValue))
78-
inserter, prevMetadataState, err := pool.NewWriter(incrementalCtx, stream, destination.WithThreadID(threadID))
78+
inserter, prevMetadataState, err := pool.NewWriter(incrementalCtx, stream, destination.WithThreadID(threadID), destination.WithApplyFilter(true))
7979
if err != nil {
8080
return fmt.Errorf("failed to create new writer thread: %s", err)
8181
}

drivers/db2/internal/db2_test.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,21 @@ func TestDB2Integration(t *testing.T) {
2020
DestinationDB: "db2_testdb_db2inst1",
2121
CursorField: "COL_CURSOR:COL_TIMESTAMP",
2222
PartitionRegex: "/{id, identity}",
23+
FilterConfig: `{
24+
"logical_operator": "And",
25+
"conditions": [
26+
{
27+
"column": "COL_DOUBLE",
28+
"operator": "<",
29+
"value": 239834.89
30+
},
31+
{
32+
"column": "COL_TIMESTAMP",
33+
"operator": ">=",
34+
"value": "2022-07-01T15:30:00.000+00:00"
35+
}
36+
]
37+
}`,
2338
}
2439
testConfig.TestIntegration(t)
2540
}

drivers/db2/internal/db2_test_util.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,30 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
9191
VARGRAPHIC('vargraphic_val'),
9292
TRUE
9393
)`, integrationTestTable)
94+
_, err = db.ExecContext(ctx, query)
95+
require.NoError(t, err, "Failed to execute %s operation", operation)
96+
// insert a filtered row — timestamp is before the filter threshold, so it won't be synced
97+
filteredQuery := fmt.Sprintf(`
98+
INSERT INTO %s (
99+
col_cursor, col_bigint, col_char, col_character,
100+
col_varchar, col_date, col_decimal,
101+
col_double, col_real, col_int, col_smallint,
102+
col_clob, col_blob, col_timestamp, col_time,
103+
col_graphic, col_vargraphic, col_bool
104+
) VALUES (
105+
-1, 111111111111111, 'x', 'filtered',
106+
'filtered_val', DATE('2022-06-15'), 50.123,
107+
50.123, 50.0, 0, 0,
108+
CLOB('filtered text'), BLOB(X'00'),
109+
TIMESTAMP('2022-06-15-10.00.00.000000'),
110+
TIME('10.00.00'),
111+
GRAPHIC('filtered'),
112+
VARGRAPHIC('filtered'),
113+
FALSE
114+
)`, integrationTestTable)
115+
_, err = db.ExecContext(ctx, filteredQuery)
116+
require.NoError(t, err, "Failed to insert filtered test data row")
117+
return
94118

95119
case "update":
96120
query = fmt.Sprintf(`
@@ -149,6 +173,27 @@ func insertTestData(t *testing.T, ctx context.Context, db *sqlx.DB, tableName st
149173
_, err := db.ExecContext(ctx, query)
150174
require.NoError(t, err, "Failed to insert test data")
151175
}
176+
// insert a filtered row — timestamp is before the filter threshold, so it won't be synced
177+
filteredQuery := fmt.Sprintf(`
178+
INSERT INTO %s (
179+
col_cursor, col_bigint, col_char, col_character,
180+
col_varchar, col_date, col_decimal,
181+
col_double, col_real, col_int, col_smallint,
182+
col_clob, col_blob, col_timestamp, col_time,
183+
col_graphic, col_vargraphic, col_bool
184+
) VALUES (
185+
-1, 111111111111111, 'x', 'filtered',
186+
'filtered_val', DATE('2021-06-15'), 500234.123,
187+
500234.123, 500234.0, 0, 0,
188+
CLOB('filtered text'), BLOB(X'00'),
189+
TIMESTAMP('2021-06-15-10.00.00.000000'),
190+
TIME('10.00.00'),
191+
GRAPHIC('filtered'),
192+
VARGRAPHIC('filtered'),
193+
FALSE
194+
)`, tableName)
195+
_, err := db.ExecContext(ctx, filteredQuery)
196+
require.NoError(t, err, "Failed to insert filtered test data row")
152197
}
153198

154199
var ExpectedDB2Data = map[string]interface{}{

0 commit comments

Comments
 (0)