-
Notifications
You must be signed in to change notification settings - Fork 138
feat: integration test for incremental sync and partitioning #521
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
vikaxsh
wants to merge
20
commits into
staging
Choose a base branch
from
feat/integration-test
base: staging
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
2018fd5
feat: integration test for incremental sync
vikaxsh 49f7854
Merge branch 'staging' into feat/integration-test
vikaxsh 9d6304f
fix: cursor field for each driver
vikaxsh 8ec5e2c
feat: add incremental test with filter
vikaxsh 6802830
Merge branch 'staging' into feat/integration-test
vikaxsh cff601b
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh 47abe61
Merge branch 'feat/integration-test' of https://github.com/datazip-in…
vikaxsh 35a35ff
fix: partition verification
vikaxsh 7893352
Merge branch 'staging' into feat/integration-test
vikaxsh bac32fd
fix: partition column verify
vikaxsh b670258
fix: lint issue
vikaxsh a00207d
fix: partition column verify
vikaxsh eb2e112
fix: partition column verify
vikaxsh 6141f3b
fix: partition column verify
vikaxsh ac7877d
fix: partition column verify
vikaxsh b437a78
fix: lint issue
vikaxsh 3ba920f
Merge branch 'staging' of https://github.com/datazip-inc/olake into f…
vikaxsh 42e459e
chore: updated partioning verify query
vikaxsh 06b3e9d
chore: added a todo
vikaxsh 2cd7c5a
Merge branch 'staging' into feat/integration-test
vaibhav-datazip File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ import ( | |
| "time" | ||
|
|
||
| "github.com/apache/spark-connect-go/v35/spark/sql" | ||
| "github.com/apache/spark-connect-go/v35/spark/sql/types" | ||
| "github.com/datazip-inc/olake/constants" | ||
| "github.com/datazip-inc/olake/utils" | ||
| "github.com/datazip-inc/olake/utils/typeutils" | ||
|
|
@@ -37,6 +38,8 @@ type IntegrationTest struct { | |
| Namespace string | ||
| ExecuteQuery func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) | ||
| IcebergDB string | ||
| CursorField string | ||
| PartitionRegex string | ||
| } | ||
|
|
||
| type PerformanceTest struct { | ||
|
|
@@ -114,8 +117,8 @@ func discoverCommand(config TestConfig) string { | |
| return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath) | ||
| } | ||
|
|
||
| // TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers | ||
| func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string { | ||
| // update normalization=true for selected streams under selected_streams.<namespace> by name | ||
| func updateSelectedStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string { | ||
| if len(stream) == 0 { | ||
| return "" | ||
| } | ||
|
|
@@ -126,10 +129,11 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string, | |
| condition := strings.Join(streamConditions, " or ") | ||
| tmpCatalog := fmt.Sprintf("/tmp/%s_%s_streams.json", config.Driver, utils.Ternary(isBackfill, "backfill", "cdc").(string)) | ||
| jqExpr := fmt.Sprintf( | ||
| `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true)) }' %s > %s && mv %s %s`, | ||
| `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`, | ||
| namespace, | ||
| namespace, | ||
| condition, | ||
| partitionRegex, | ||
| config.CatalogPath, | ||
| tmpCatalog, | ||
| tmpCatalog, | ||
|
|
@@ -138,6 +142,23 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string, | |
| return jqExpr | ||
| } | ||
|
|
||
| // set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name | ||
| func updateStreamConfigCommand(config TestConfig, namespace, streamName, syncMode, cursorField string) string { | ||
| tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver) | ||
| // map/select pattern updates nested array members | ||
| return fmt.Sprintf( | ||
| `jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . end))' %s > %s && mv %s %s`, | ||
| namespace, streamName, syncMode, cursorField, | ||
| config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath, | ||
| ) | ||
| } | ||
|
|
||
| // reset state file so incremental can perform initial load (equivalent to full load on first run) | ||
| func resetStateFileCommand(config TestConfig) string { | ||
| // Ensure the state is clean irrespective of previous CDC run | ||
| return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath) | ||
| } | ||
|
|
||
| // to get backfill streams from cdc streams e.g. "demo_cdc" -> "demo" | ||
| func GetBackfillStreamsFromCDC(cdcStreams []string) []string { | ||
| backfillStreams := []string{} | ||
|
|
@@ -261,15 +282,30 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { | |
| // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, | ||
| // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, | ||
| // ) | ||
| streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true) | ||
| streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) | ||
| if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s", | ||
| code, err, out, | ||
| ) | ||
| } | ||
|
|
||
| t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath) | ||
| t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) | ||
|
|
||
| // Helper to run sync and verify | ||
| runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { | ||
| cmd := syncCommand(*cfg.TestConfig, useState) | ||
| if useState && operation != "" { | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) | ||
| } | ||
| if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { | ||
| return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) | ||
| } | ||
| t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) | ||
| VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver) | ||
| return nil | ||
| } | ||
|
|
||
| // 3. Phase A: Full load + CDC | ||
| testCases := []struct { | ||
| syncMode string | ||
| operation string | ||
|
|
@@ -306,30 +342,54 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { | |
| dummySchema: nil, | ||
| }, | ||
| } | ||
|
|
||
| runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { | ||
| cmd := syncCommand(*cfg.TestConfig, useState) | ||
| if useState && operation != "" { | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) | ||
| } | ||
|
|
||
| if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { | ||
| return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) | ||
| } | ||
| t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) | ||
| VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver) | ||
| return nil | ||
| } | ||
|
|
||
| // 3. Run Sync command and verify records in Iceberg | ||
| for _, test := range testCases { | ||
| t.Logf("Running test for: %s", test.syncMode) | ||
| if err := runSync(c, test.useState, test.operation, test.opSymbol, test.dummySchema); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // 4. Clean up | ||
| // 4. Phase B: Full load + Incremental (switch streams.json cdc -> incremental, set cursor_field = "id") | ||
| // Reset table to isolate incremental scenario | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) | ||
|
|
||
| // Ensure normalization remains on for selected stream | ||
| streamUpdateCmd = updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) | ||
| if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s", | ||
| code, err, out, | ||
| ) | ||
| } | ||
|
Comment on lines
+360
to
+365
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is getting repeated here. |
||
|
|
||
| // Patch: sync_mode = incremental, cursor_field = "id" | ||
| incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField) | ||
| if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out) | ||
| } | ||
|
|
||
| // Reset state so initial incremental behaves like a first full incremental load | ||
| resetState := resetStateFileCommand(*cfg.TestConfig) | ||
| if code, out, err := utils.ExecCommand(ctx, c, resetState); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out) | ||
| } | ||
|
|
||
| // Initial incremental run (equivalent to full on first run) | ||
| t.Log("Running Incremental - full load") | ||
| if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Delta incremental: add new rows and sync again | ||
| t.Log("Running Incremental - inserts") | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false) | ||
| if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil { | ||
| return err | ||
| } | ||
|
Comment on lines
+379
to
+390
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here as well a test cases array can be created and call that same for loop |
||
|
|
||
| // 5. Clean up | ||
| cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) | ||
| t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver) | ||
| return nil | ||
|
|
@@ -353,8 +413,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { | |
| }) | ||
| } | ||
|
|
||
| // TODO: Refactor parsing logic into a reusable utility functions | ||
| // verifyIcebergSync verifies that data was correctly synchronized to Iceberg | ||
| func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, driver string) { | ||
| func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver string) { | ||
| t.Helper() | ||
| ctx := context.Background() | ||
| spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx) | ||
|
|
@@ -393,7 +454,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema | |
| for key, expected := range schema { | ||
| icebergValue, ok := icebergMap[key] | ||
| require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key) | ||
| require.Equal(t, icebergValue, expected, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected) | ||
| require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected) | ||
| } | ||
| } | ||
| t.Logf("Verified Iceberg synced data with respect to data synced from source[%s] found equal", driver) | ||
|
|
@@ -426,6 +487,24 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema | |
| "Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType) | ||
| } | ||
| t.Logf("Verified datatypes in Iceberg after sync") | ||
|
|
||
| // Partition verification using only metadata tables | ||
| if partitionRegex == "" { | ||
| t.Log("No partitionRegex provided, skipping partition verification") | ||
| return | ||
| } | ||
| // Extract partition columns from describe rows | ||
| partitionCols := extractFirstPartitionColFromRows(describeRows) | ||
| require.NotEmpty(t, partitionCols, "Partition columns not found in Iceberg metadata") | ||
|
|
||
| // Parse expected partition columns from pattern like "/{col,identity}" | ||
| // Supports multiple entries like "/{col1,identity}" by taking the first token as the source column | ||
| clean := strings.TrimPrefix(partitionRegex, "/{") | ||
| clean = strings.TrimSuffix(clean, "}") | ||
| toks := strings.Split(clean, ",") | ||
| expectedCol := strings.TrimSpace(toks[0]) | ||
| require.Equal(t, expectedCol, partitionCols, "Partition column does not match expected '%s'", expectedCol) | ||
| t.Logf("Verified partition column: %s", expectedCol) | ||
| } | ||
|
|
||
| func (cfg *PerformanceTest) TestPerformance(t *testing.T) { | ||
|
|
@@ -501,7 +580,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { | |
| } | ||
| t.Log("(backfill) discover completed") | ||
|
|
||
| updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.BackfillStreams, true) | ||
| updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.BackfillStreams, true) | ||
| if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to update streams: %s", err) | ||
| } | ||
|
|
@@ -535,7 +614,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { | |
| } | ||
| t.Log("(cdc) discover completed") | ||
|
|
||
| updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.CDCStreams, false) | ||
| updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.CDCStreams, false) | ||
| if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 { | ||
| return fmt.Errorf("failed to update streams: %s", err) | ||
| } | ||
|
|
@@ -585,3 +664,47 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) { | |
| }() | ||
| }) | ||
| } | ||
|
|
||
| // extractFirstPartitionColFromRows extracts the first partition column from DESCRIBE EXTENDED rows | ||
| func extractFirstPartitionColFromRows(rows []types.Row) string { | ||
| inPartitionSection := false | ||
|
|
||
| for _, row := range rows { | ||
| // Convert []any -> []string | ||
| vals := row.Values() | ||
| parts := make([]string, len(vals)) | ||
| for i, v := range vals { | ||
| if v == nil { | ||
| parts[i] = "" | ||
| } else { | ||
| parts[i] = fmt.Sprint(v) // safe string conversion | ||
| } | ||
| } | ||
| line := strings.TrimSpace(strings.Join(parts, " ")) | ||
| if line == "" { | ||
| continue | ||
| } | ||
|
|
||
| if strings.HasPrefix(line, "# Partition Information") { | ||
| inPartitionSection = true | ||
| continue | ||
| } | ||
|
|
||
| if inPartitionSection { | ||
| if strings.HasPrefix(line, "# col_name") { | ||
| continue | ||
| } | ||
|
|
||
| if strings.HasPrefix(line, "#") { | ||
| break | ||
| } | ||
|
|
||
| fields := strings.Fields(line) | ||
| if len(fields) > 0 { | ||
| return fields[0] // return the first partition col | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return "" | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update this error message as well