diff --git a/drivers/mongodb/internal/mon_test.go b/drivers/mongodb/internal/mon_test.go
index 84498fbcb..6612686e6 100644
--- a/drivers/mongodb/internal/mon_test.go
+++ b/drivers/mongodb/internal/mon_test.go
@@ -17,6 +17,8 @@ func TestMongodbIntegration(t *testing.T) {
         DataTypeSchema: MongoToIcebergSchema,
         ExecuteQuery:   ExecuteQuery,
         IcebergDB:      "mongodb_olake_mongodb_test",
+        CursorField:    "_id",
+        PartitionRegex: "/{_id,identity}",
     }
     testConfig.TestIntegration(t)
 }
diff --git a/drivers/mysql/internal/mysql_test.go b/drivers/mysql/internal/mysql_test.go
index ba421d8b9..62a195bf5 100644
--- a/drivers/mysql/internal/mysql_test.go
+++ b/drivers/mysql/internal/mysql_test.go
@@ -17,6 +17,8 @@ func TestMySQLIntegration(t *testing.T) {
         DataTypeSchema: MySQLToIcebergSchema,
         ExecuteQuery:   ExecuteQuery,
         IcebergDB:      "mysql_olake_mysql_test",
+        CursorField:    "id",
+        PartitionRegex: "/{id,identity}",
     }
     testConfig.TestIntegration(t)
 }
diff --git a/drivers/postgres/internal/postgres_test.go b/drivers/postgres/internal/postgres_test.go
index 1773c8ffb..d43a95b5f 100644
--- a/drivers/postgres/internal/postgres_test.go
+++ b/drivers/postgres/internal/postgres_test.go
@@ -18,6 +18,8 @@ func TestPostgresIntegration(t *testing.T) {
         DataTypeSchema: PostgresToIcebergSchema,
         ExecuteQuery:   ExecuteQuery,
         IcebergDB:      "postgres_postgres_public",
+        CursorField:    "col_bigserial",
+        PartitionRegex: "/{col_bigserial,identity}",
     }
     testConfig.TestIntegration(t)
 }
diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go
index 2fa839545..1c644f6e2 100644
--- a/utils/testutils/test_utils.go
+++ b/utils/testutils/test_utils.go
@@ -10,6 +10,7 @@ import (
     "time"
 
     "github.com/apache/spark-connect-go/v35/spark/sql"
+    "github.com/apache/spark-connect-go/v35/spark/sql/types"
     "github.com/datazip-inc/olake/constants"
     "github.com/datazip-inc/olake/utils"
     "github.com/datazip-inc/olake/utils/typeutils"
@@ -37,6 +38,8 @@ type IntegrationTest struct {
     Namespace      string
     ExecuteQuery   func(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool)
     IcebergDB      string
+    CursorField    string
+    PartitionRegex string
 }
 
 type PerformanceTest struct {
@@ -114,8 +117,8 @@ func discoverCommand(config TestConfig) string {
     return fmt.Sprintf("/test-olake/build.sh driver-%s discover --config %s", config.Driver, config.SourcePath)
 }
 
-// TODO: check if we can remove namespace from being passed as a parameter and use a common namespace for all drivers
-func updateStreamsCommand(config TestConfig, namespace string, stream []string, isBackfill bool) string {
+// update normalization=true and partition_regex for streams selected by name under selected_streams
+func updateSelectedStreamsCommand(config TestConfig, namespace, partitionRegex string, stream []string, isBackfill bool) string {
     if len(stream) == 0 {
         return ""
     }
@@ -126,10 +129,11 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string,
     condition := strings.Join(streamConditions, " or ")
     tmpCatalog := fmt.Sprintf("/tmp/%s_%s_streams.json", config.Driver, utils.Ternary(isBackfill, "backfill", "cdc").(string))
     jqExpr := fmt.Sprintf(
-        `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true)) }' %s > %s && mv %s %s`,
+        `jq '.selected_streams = { "%s": (.selected_streams["%s"] | map(select(%s) | .normalization = true | .partition_regex = "%s")) }' %s > %s && mv %s %s`,
         namespace,
         namespace,
         condition,
+        partitionRegex,
         config.CatalogPath,
         tmpCatalog,
         tmpCatalog,
@@ -138,6 +142,23 @@ func updateStreamsCommand(config TestConfig, namespace string, stream []string,
     return jqExpr
 }
 
+// set sync_mode and cursor_field for a specific stream object in streams[] by namespace+name
+func updateStreamConfigCommand(config TestConfig, namespace, streamName, syncMode, cursorField string) string {
+    tmpCatalog := fmt.Sprintf("/tmp/%s_set_mode_streams.json", config.Driver)
+    // map/select pattern updates nested array members
+    return fmt.Sprintf(
+        `jq --arg ns "%s" --arg name "%s" --arg mode "%s" --arg cursor "%s" '.streams = (.streams | map(if .stream.namespace == $ns and .stream.name == $name then (.stream.sync_mode = $mode | .stream.cursor_field = $cursor) else . end))' %s > %s && mv %s %s`,
+        namespace, streamName, syncMode, cursorField,
+        config.CatalogPath, tmpCatalog, tmpCatalog, config.CatalogPath,
+    )
+}
+
+// reset state file so incremental can perform initial load (equivalent to full load on first run)
+func resetStateFileCommand(config TestConfig) string {
+    // Ensure the state is clean irrespective of previous CDC run
+    return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath)
+}
+
 // to get backfill streams from cdc streams e.g. "demo_cdc" -> "demo"
"demo_cdc" -> "demo" func GetBackfillStreamsFromCDC(cdcStreams []string) []string { backfillStreams := []string{} @@ -261,15 +282,30 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, // ) - streamUpdateCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, []string{currentTestTable}, true) + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization in streams.json (%d): %s\n%s", code, err, out, ) } - t.Logf("Enabled normalization in %s", cfg.TestConfig.CatalogPath) + t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) + + // Helper to run sync and verify + runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { + cmd := syncCommand(*cfg.TestConfig, useState) + if useState && operation != "" { + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) + } + if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { + return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) + } + t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) + VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver) + return nil + } + // 3. Phase A: Full load + CDC testCases := []struct { syncMode string operation string @@ -306,22 +342,6 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { dummySchema: nil, }, } - - runSync := func(c testcontainers.Container, useState bool, operation, opSymbol string, schema map[string]interface{}) error { - cmd := syncCommand(*cfg.TestConfig, useState) - if useState && operation != "" { - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, operation, false) - } - - if code, out, err := utils.ExecCommand(ctx, c, cmd); err != nil || code != 0 { - return fmt.Errorf("sync failed (%d): %s\n%s", code, err, out) - } - t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) - VerifyIcebergSync(t, currentTestTable, cfg.IcebergDB, cfg.DataTypeSchema, schema, opSymbol, cfg.TestConfig.Driver) - return nil - } - - // 3. Run Sync command and verify records in Iceberg for _, test := range testCases { t.Logf("Running test for: %s", test.syncMode) if err := runSync(c, test.useState, test.operation, test.opSymbol, test.dummySchema); err != nil { @@ -329,7 +349,47 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } } - // 4. Clean up + // 4. 
+        // Reset table to isolate incremental scenario
+        cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
+        cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false)
+        cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false)
+        cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false)
+
+        // Ensure normalization remains on for selected stream
+        streamUpdateCmd = updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, []string{currentTestTable}, true)
+        if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 {
+            return fmt.Errorf("failed to enable normalization in streams.json for incremental (%d): %s\n%s",
+                code, err, out,
+            )
+        }
+
+        // Patch: sync_mode = incremental, cursor_field = cfg.CursorField
+        incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, currentTestTable, "incremental", cfg.CursorField)
+        if code, out, err := utils.ExecCommand(ctx, c, incPatch); err != nil || code != 0 {
+            return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out)
+        }
+
+        // Reset state so initial incremental behaves like a first full incremental load
+        resetState := resetStateFileCommand(*cfg.TestConfig)
+        if code, out, err := utils.ExecCommand(ctx, c, resetState); err != nil || code != 0 {
+            return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out)
+        }
+
+        // Initial incremental run (equivalent to full load on first run)
+        t.Log("Running Incremental - full load")
+        if err := runSync(c, true, "", "r", cfg.ExpectedData); err != nil {
+            return err
+        }
+
+        // Delta incremental: add new rows and sync again
+        t.Log("Running Incremental - inserts")
+        cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "insert", false)
+        if err := runSync(c, true, "", "u", cfg.ExpectedData); err != nil {
+            return err
+        }
+
+        // 5. Clean up
         cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false)
         t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver)
         return nil
@@ -353,8 +413,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) {
     })
 }
 
+// TODO: Refactor parsing logic into a reusable utility function
 // verifyIcebergSync verifies that data was correctly synchronized to Iceberg
-func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, driver string) {
+func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema map[string]string, schema map[string]interface{}, opSymbol, partitionRegex, driver string) {
     t.Helper()
     ctx := context.Background()
     spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx)
@@ -393,7 +454,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
         for key, expected := range schema {
             icebergValue, ok := icebergMap[key]
             require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key)
-            require.Equal(t, icebergValue, expected, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
+            require.Equal(t, expected, icebergValue, "Row %d: mismatch on %q: Iceberg has %#v, expected %#v", rowIdx, key, icebergValue, expected)
         }
     }
     t.Logf("Verified Iceberg synced data with respect to data synced from source[%s] found equal", driver)
@@ -426,6 +487,24 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema
             "Data type mismatch for column %s: expected %s, got %s", col, expectedIceType, iceType)
     }
     t.Logf("Verified datatypes in Iceberg after sync")
+
+    // Partition verification using only metadata tables
+    if partitionRegex == "" {
+        t.Log("No partitionRegex provided, skipping partition verification")
+        return
+    }
+    // Extract the first partition column from the DESCRIBE rows
+    partitionCol := extractFirstPartitionColFromRows(describeRows)
+    require.NotEmpty(t, partitionCol, "Partition column not found in Iceberg metadata")
+
+    // Parse the expected partition column from a pattern like "/{col,identity}",
+    // taking the first token as the source column
+    clean := strings.TrimPrefix(partitionRegex, "/{")
+    clean = strings.TrimSuffix(clean, "}")
+    toks := strings.Split(clean, ",")
+    expectedCol := strings.TrimSpace(toks[0])
+    require.Equal(t, expectedCol, partitionCol, "Partition column does not match expected '%s'", expectedCol)
+    t.Logf("Verified partition column: %s", expectedCol)
 }
 
 func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
@@ -501,7 +580,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
     }
     t.Log("(backfill) discover completed")
 
-    updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.BackfillStreams, true)
+    updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.BackfillStreams, true)
     if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
         return fmt.Errorf("failed to update streams: %s", err)
     }
@@ -535,7 +614,7 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
     }
     t.Log("(cdc) discover completed")
 
-    updateStreamsCmd := updateStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.CDCStreams, false)
+    updateStreamsCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, "", cfg.CDCStreams, false)
     if code, _, err := utils.ExecCommand(ctx, c, updateStreamsCmd); err != nil || code != 0 {
         return fmt.Errorf("failed to update streams: %s", err)
     }
@@ -585,3 +664,47 @@ func (cfg *PerformanceTest) TestPerformance(t *testing.T) {
         }()
     })
 }
+
+// extractFirstPartitionColFromRows extracts the first partition column from DESCRIBE EXTENDED rows
+func extractFirstPartitionColFromRows(rows []types.Row) string {
+    inPartitionSection := false
+
+    for _, row := range rows {
+        // Convert []any -> []string
+        vals := row.Values()
+        parts := make([]string, len(vals))
+        for i, v := range vals {
+            if v == nil {
+                parts[i] = ""
+            } else {
+                parts[i] = fmt.Sprint(v) // safe string conversion
+            }
+        }
+        line := strings.TrimSpace(strings.Join(parts, " "))
+        if line == "" {
+            continue
+        }
+
+        if strings.HasPrefix(line, "# Partition Information") {
+            inPartitionSection = true
+            continue
+        }
+
+        if inPartitionSection {
+            if strings.HasPrefix(line, "# col_name") {
+                continue
+            }
+
+            if strings.HasPrefix(line, "#") {
+                break
+            }
+
+            fields := strings.Fields(line)
+            if len(fields) > 0 {
+                return fields[0] // return the first partition col
+            }
+        }
+    }
+
+    return ""
+}