diff --git a/.github/workflows/2pc-integration-tests.yml b/.github/workflows/2pc-integration-tests.yml new file mode 100644 index 000000000..6856ac6fb --- /dev/null +++ b/.github/workflows/2pc-integration-tests.yml @@ -0,0 +1,21 @@ +name: 2PC Integration Tests + +on: + push: + branches: + - "master" + paths: + - '**/*.go' + - '**/*.java' + pull_request: + branches: + - "*" + paths: + - '**/*.go' + - '**/*.java' + +jobs: + twopc-integration-tests: + uses: ./.github/workflows/integration-tests-runner.yml + with: + test_type: 2pc diff --git a/.github/workflows/integration-tests-runner.yml b/.github/workflows/integration-tests-runner.yml new file mode 100644 index 000000000..69546d556 --- /dev/null +++ b/.github/workflows/integration-tests-runner.yml @@ -0,0 +1,179 @@ +name: Integration Test Runner + +on: + workflow_call: + inputs: + test_type: + description: 'Type of tests to run: integration or 2pc' + required: true + type: string + +env: + MYSQL_ROOT_PASSWORD: root1234 + +jobs: + run-tests: + name: ${{ inputs.test_type == '2pc' && '2PC Integration Tests' || 'Integration Tests' }} + environment: integration_tests + runs-on: 32gb-runner + timeout-minutes: 45 + steps: + - name: Checkout code + uses: actions/checkout@v3 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version-file: "go.mod" + + - name: Set up Java for Maven + uses: actions/setup-java@v3 + with: + distribution: 'temurin' + java-version: '17' + + - name: Install DB2 CLI Driver + run: | + go run github.com/ibmdb/go_ibm_db/installer@v0.5.4 + cp -r "$(go env GOPATH)/pkg/mod/github.com/ibmdb/clidriver" ./clidriver + IBM_DB_HOME="$(pwd)/clidriver" + echo "IBM_DB_HOME=$IBM_DB_HOME" >> $GITHUB_ENV + echo "CGO_CFLAGS=-I$IBM_DB_HOME/include" >> $GITHUB_ENV + echo "CGO_LDFLAGS=-L$IBM_DB_HOME/lib -Wl,-rpath,$IBM_DB_HOME/lib" >> $GITHUB_ENV + echo "LD_LIBRARY_PATH=$IBM_DB_HOME/lib" >> $GITHUB_ENV + + - name: Start Test Infrastructure + run: | + docker compose -f ./drivers/mysql/docker-compose.yml up -d + docker compose -f ./drivers/postgres/docker-compose.yml up -d + docker compose -f ./drivers/mongodb/docker-compose.yml up -d + docker compose -f ./drivers/oracle/docker-compose.yml up -d + docker compose -f ./drivers/db2/docker-compose.yml up -d + docker compose -f ./drivers/mssql/docker-compose.yml up -d + docker compose -f ./destination/iceberg/local-test/docker-compose.yml up minio mc postgres spark-iceberg -d + + - name: Start Kafka + if: inputs.test_type == 'integration' + run: docker compose -f ./drivers/kafka/docker-compose.yml up -d + + - name: Wait for MySQL + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec olake_mysql-test mysql -h localhost -u root -p${{ env.MYSQL_ROOT_PASSWORD }} -e "SELECT 1" + + - name: Wait for PostgreSQL + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec olake_postgres-test psql -h localhost -U postgres -d postgres -c "SELECT 1" + + - name: Wait for MongoDB + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec primary_mongo mongosh --host localhost --port 27017 -u mongodb -p secure_password123 --authenticationDatabase admin --eval "db.adminCommand('ping')" + + - name: Wait for Oracle + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec oracle-23c bash -c "echo 'SELECT 1 FROM dual;' | sqlplus -s system/secret1234@//localhost:1521/ORCL" + + - name: Wait for DB2 + uses: nick-fields/retry@v2 + with: + timeout_minutes: 10 + max_attempts: 30 + retry_wait_seconds: 25 + command: | + docker exec db2-test bash -c "su - db2inst1 -c 'db2 connect to TESTDB'" + + - name: Wait for Kafka + if: inputs.test_type == 'integration' + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec kafkaJson kafka-topics --bootstrap-server localhost:9092 --list + docker exec kafkaAvro kafka-topics --bootstrap-server localhost:9092 --list + curl -f http://localhost:8081/subjects + + - name: Wait for MSSQL + uses: nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec olake-mssql /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P 'Password!123' -C -Q "SELECT 1" + + - name: Initialize MSSQL Database + run: | + docker exec olake-mssql /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P 'Password!123' -C -d master -i /docker-entrypoint-initdb.d/01-init.sql + + - name: Set up Data Directories + run: | + sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/postgres-data + sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/minio-data + sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/ivy-cache + sudo chown -R 999:999 /home/runner/work/olake/olake/destination/iceberg/local-test/data + sudo chmod -R 777 /home/runner/work/olake/olake/destination/iceberg/local-test/data + + - name: Install Go Dependencies + run: go mod download + + - name: Build Project + run: go build -v ./... + + - name: Cache Maven dependencies + uses: actions/cache@v4 + with: + path: | + ~/.m2/repository + key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} + restore-keys: | + ${{ runner.os }}-maven- + + - name: Build Iceberg Sink + working-directory: ./destination/iceberg/olake-iceberg-java-writer + run: mvn clean package -DskipTests + + - name: Run Integration Tests + if: inputs.test_type == 'integration' + run: | + go test -v -p 7 ./drivers/kafka/internal/... ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... -timeout 0 -run 'Integration' + + - name: Run 2PC Integration Tests + if: inputs.test_type == '2pc' + run: | + go test -v -p 6 ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/mssql/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... -timeout 0 -run '2PC' + + - name: Cleanup + if: always() + run: | + docker compose -f ./destination/iceberg/local-test/docker-compose.yml down + docker compose -f ./drivers/mysql/docker-compose.yml down + docker compose -f ./drivers/postgres/docker-compose.yml down + docker compose -f ./drivers/oracle/docker-compose.yml down + docker compose -f ./drivers/mongodb/docker-compose.yml down + docker compose -f ./drivers/db2/docker-compose.yml down + docker compose -f ./drivers/mssql/docker-compose.yml down + + - name: Cleanup Kafka + if: always() && inputs.test_type == 'integration' + run: docker compose -f ./drivers/kafka/docker-compose.yml down diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 3b57d4a59..d2b68fcf7 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -1,4 +1,5 @@ name: Integration Tests + on: push: branches: @@ -13,159 +14,8 @@ on: - '**/*.go' - '**/*.java' -env: - MYSQL_ROOT_PASSWORD: root1234 - jobs: integration-tests: - environment: integration_tests - runs-on: 32gb-runner - timeout-minutes: 45 - steps: - - name: Checkout code - uses: actions/checkout@v3 - - - name: Set up Go - uses: actions/setup-go@v4 - with: - go-version-file: "go.mod" - - - name: Set up Java for Maven - uses: actions/setup-java@v3 - with: - distribution: 'temurin' - java-version: '17' - - - name: Install DB2 CLI Driver - run: | - go run github.com/ibmdb/go_ibm_db/installer@v0.5.4 - cp -r "$(go env GOPATH)/pkg/mod/github.com/ibmdb/clidriver" ./clidriver - IBM_DB_HOME="$(pwd)/clidriver" - echo "IBM_DB_HOME=$IBM_DB_HOME" >> $GITHUB_ENV - echo "CGO_CFLAGS=-I$IBM_DB_HOME/include" >> $GITHUB_ENV - echo "CGO_LDFLAGS=-L$IBM_DB_HOME/lib -Wl,-rpath,$IBM_DB_HOME/lib" >> $GITHUB_ENV - echo "LD_LIBRARY_PATH=$IBM_DB_HOME/lib" >> $GITHUB_ENV - - - name: Start Test Infrastructure - run: | - docker compose -f ./drivers/mysql/docker-compose.yml up -d - docker compose -f ./drivers/postgres/docker-compose.yml up -d - docker compose -f ./drivers/mongodb/docker-compose.yml up -d - docker compose -f ./drivers/oracle/docker-compose.yml up -d - docker compose -f ./drivers/db2/docker-compose.yml up -d - docker compose -f ./drivers/mssql/docker-compose.yml up -d - docker compose -f ./drivers/kafka/docker-compose.yml up -d - docker compose -f ./destination/iceberg/local-test/docker-compose.yml up minio mc postgres spark-iceberg -d - - - name: Wait for MySQL - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec olake_mysql-test mysql -h localhost -u root -p${{ env.MYSQL_ROOT_PASSWORD }} -e "SELECT 1" - - - name: Wait for PostgreSQL - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec olake_postgres-test psql -h localhost -U postgres -d postgres -c "SELECT 1" - - - name: Wait for MongoDB - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec primary_mongo mongosh --host localhost --port 27017 -u mongodb -p secure_password123 --authenticationDatabase admin --eval "db.adminCommand('ping')" - - - name: Wait for Oracle - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec oracle-23c bash -c "echo 'SELECT 1 FROM dual;' | sqlplus -s system/secret1234@//localhost:1521/ORCL" - - - name: Wait for DB2 - uses: nick-fields/retry@v2 - with: - timeout_minutes: 10 - max_attempts: 30 - retry_wait_seconds: 25 - command: | - docker exec db2-test bash -c "su - db2inst1 -c 'db2 connect to TESTDB'" - - - name: Wait for Kafka - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec kafkaJson kafka-topics --bootstrap-server localhost:9092 --list - docker exec kafkaAvro kafka-topics --bootstrap-server localhost:9092 --list - curl -f http://localhost:8081/subjects - - - name: Wait for MSSQL - uses: nick-fields/retry@v2 - with: - timeout_minutes: 5 - max_attempts: 30 - retry_wait_seconds: 5 - command: | - docker exec olake-mssql /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P 'Password!123' -C -Q "SELECT 1" - - - name: Initialize MSSQL Database - run: | - docker exec olake-mssql /opt/mssql-tools18/bin/sqlcmd -S localhost -U sa -P 'Password!123' -C -d master -i /docker-entrypoint-initdb.d/01-init.sql - - - - name: Set up Data Directories - run: | - sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/postgres-data - sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/minio-data - sudo mkdir -p /home/runner/work/olake/olake/destination/iceberg/local-test/data/ivy-cache - sudo chown -R 999:999 /home/runner/work/olake/olake/destination/iceberg/local-test/data - sudo chmod -R 777 /home/runner/work/olake/olake/destination/iceberg/local-test/data - - - name: Install Go Dependencies - run: go mod download - - - name: Build Project - run: go build -v ./... - - - name: Cache Maven dependencies - uses: actions/cache@v4 - with: - path: | - ~/.m2/repository - key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }} - restore-keys: | - ${{ runner.os }}-maven- - - - name: Build Iceberg Sink - working-directory: ./destination/iceberg/olake-iceberg-java-writer - run: mvn clean package -DskipTests - - - name: Run Integration Tests - run: | - go test -v -p 7 ./drivers/kafka/internal/... ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... -timeout 0 -run 'Integration' - - - name: Cleanup - if: always() - run: | - docker compose -f ./destination/iceberg/local-test/docker-compose.yml down - docker compose -f ./drivers/mysql/docker-compose.yml down - docker compose -f ./drivers/postgres/docker-compose.yml down - docker compose -f ./drivers/oracle/docker-compose.yml down - docker compose -f ./drivers/mongodb/docker-compose.yml down - docker compose -f ./drivers/db2/docker-compose.yml down - docker compose -f ./drivers/mssql/docker-compose.yml down - docker compose -f ./drivers/kafka/docker-compose.yml down + uses: ./.github/workflows/integration-tests-runner.yml + with: + test_type: integration diff --git a/drivers/db2/go.mod b/drivers/db2/go.mod index 50bf204eb..22eae54af 100644 --- a/drivers/db2/go.mod +++ b/drivers/db2/go.mod @@ -8,7 +8,7 @@ require ( github.com/ibmdb/go_ibm_db v0.4.5 github.com/jmoiron/sqlx v1.4.0 github.com/stretchr/testify v1.11.1 - golang.org/x/crypto v0.50.0 + golang.org/x/crypto v0.52.0 ) require ( @@ -135,14 +135,14 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect diff --git a/drivers/db2/internal/db2_test.go b/drivers/db2/internal/db2_test.go index 10607a92f..92437d473 100644 --- a/drivers/db2/internal/db2_test.go +++ b/drivers/db2/internal/db2_test.go @@ -7,20 +7,19 @@ import ( "github.com/datazip-inc/olake/utils/testutils" ) -func TestDB2Integration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.DB2)), - Namespace: "DB2INST1", - ExpectedData: ExpectedDB2Data, - ExpectedUpdatedData: ExpectedUpdatedDB2Data, - DestinationDataTypeSchema: DB2ToDestinationSchema, - UpdatedDestinationDataTypeSchema: UpdatedDB2ToDestinationSchema, - ExecuteQuery: ExecuteQuery, - DestinationDB: "db2_testdb_db2inst1", - CursorField: "COL_CURSOR:COL_TIMESTAMP", - PartitionRegex: "/{id, identity}", - ColumnToExclude: "EXCLUDEDCOLUMN", +// db2BaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestDB2Integration and TestDB22PC. +func db2BaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.DB2)), + Namespace: "DB2INST1", + ExpectedData: ExpectedDB2Data, + DestinationDataTypeSchema: DB2ToDestinationSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "db2_testdb_db2inst1", + CursorField: "COL_CURSOR:COL_TIMESTAMP", + PartitionRegex: "/{id, identity}", + ColumnToExclude: "EXCLUDEDCOLUMN", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -37,5 +36,17 @@ func TestDB2Integration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestDB2Integration(t *testing.T) { + t.Parallel() + cfg := db2BaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedDB2Data + cfg.UpdatedDestinationDataTypeSchema = UpdatedDB2ToDestinationSchema + cfg.TestIntegration(t) +} + +func TestDB22PC(t *testing.T) { + t.Parallel() + db2BaseConfig().Test2PCIntegration(t) } diff --git a/drivers/db2/internal/db2_test_util.go b/drivers/db2/internal/db2_test_util.go index 5d4580b02..fe681b180 100644 --- a/drivers/db2/internal/db2_test_util.go +++ b/drivers/db2/internal/db2_test_util.go @@ -119,6 +119,26 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation require.NoError(t, err, "Failed to insert filtered test data row") return + case "insert_2pc": + query = fmt.Sprintf(` + INSERT INTO %s ( + col_cursor, col_bigint, col_char, col_character, + col_varchar, col_date, col_decimal, + col_double, col_real, col_int, col_smallint, + col_clob, col_blob, col_timestamp, col_time, + col_graphic, col_vargraphic, col_bool + ) VALUES ( + 7, 12345678901234, 'c', 'char_val', + 'varchar_val', DATE('2023-01-01'), 123.45, + 123.456789, 123.5, 123, 123, + CLOB('sample text'), BLOB(X'424C4F422044415441204F4E45'), + TIMESTAMP('2023-01-01-12.00.00.000000'), + TIME('12.00.00'), + GRAPHIC('graphic_val'), + VARGRAPHIC('vargraphic_val'), + TRUE + )`, integrationTestTable) + case "update": query = fmt.Sprintf(` UPDATE %s SET diff --git a/drivers/kafka/go.mod b/drivers/kafka/go.mod index 2af883c36..6038b96f4 100644 --- a/drivers/kafka/go.mod +++ b/drivers/kafka/go.mod @@ -138,16 +138,16 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.50.0 // indirect + golang.org/x/crypto v0.52.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect diff --git a/drivers/mongodb/go.mod b/drivers/mongodb/go.mod index eaab511b3..bcbd8ba7a 100644 --- a/drivers/mongodb/go.mod +++ b/drivers/mongodb/go.mod @@ -132,10 +132,10 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect + golang.org/x/mod v0.35.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect @@ -159,11 +159,11 @@ require ( github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect go.mongodb.org/mongo-driver v1.17.3 - golang.org/x/crypto v0.50.0 - golang.org/x/net v0.53.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/crypto v0.52.0 + golang.org/x/net v0.54.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect ) replace ( diff --git a/drivers/mongodb/internal/cdc.go b/drivers/mongodb/internal/cdc.go index e2ad515c8..312e18c18 100644 --- a/drivers/mongodb/internal/cdc.go +++ b/drivers/mongodb/internal/cdc.go @@ -29,11 +29,12 @@ const ( var ErrIdleTermination = errors.New("change stream terminated due to idle timeout") type CDCDocument struct { - OperationType string `json:"operationType"` - FullDocument map[string]any `json:"fullDocument"` - ClusterTime primitive.Timestamp `json:"clusterTime"` - WallTime primitive.DateTime `json:"wallTime"` - DocumentKey map[string]any `json:"documentKey"` + OperationType string `json:"operationType"` + FullDocument map[string]any `json:"fullDocument"` + FullDocumentBeforeChange map[string]any `json:"fullDocumentBeforeChange"` + ClusterTime primitive.Timestamp `json:"clusterTime"` + WallTime primitive.DateTime `json:"wallTime"` + DocumentKey map[string]any `json:"documentKey"` } func (m *Mongo) ChangeStreamConfig() (bool, bool, bool) { @@ -45,7 +46,7 @@ func (m *Mongo) PreCDC(cdcCtx context.Context, streams []types.StreamInterface) collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name()) pipeline := mongo.Pipeline{ {{Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete"}}}}, + {Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "replace", "delete"}}}}, }}}, } @@ -78,11 +79,12 @@ func (m *Mongo) StreamChanges(ctx context.Context, streamIndex int, metadataStat // state >= metadata → state is current or ahead; read forward normally. if mtState != nil { // TODO: addition of all the state updations in metadata file even for blank sync scenario - // metadata > state → crash-recovery path (metadata committed but state write failed). + // metadata > state → crash-recovery path (metadata committed but state write failed), no further sync for this stream just update the state to metadata resume token. // state >= metadata → read forward normally. if typeutils.Compare(prevResumeToken, mtState) < 0 { logger.Infof("Stream[%s] metadata ahead of state, using metadata resume token for recovery", stream.ID()) - prevResumeToken = mtState + m.cdcCursor.Store(stream.ID(), mtState) + return mtState, nil } } @@ -94,10 +96,10 @@ func (m *Mongo) StreamChanges(ctx context.Context, streamIndex int, metadataStat } pipeline := mongo.Pipeline{ {{Key: "$match", Value: bson.D{ - {Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete"}}}}, + {Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "replace", "delete"}}}}, }}}, } - changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(maxAwait) + changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable).SetMaxAwaitTime(maxAwait) collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name()) changeStreamOpts = changeStreamOpts.SetResumeAfter(map[string]any{cdcCursorField: prevResumeToken}) @@ -167,9 +169,19 @@ func (m *Mongo) handleChangeDoc(ctx context.Context, cursor *mongo.ChangeStream, return fmt.Errorf("error while decoding: %s", err) } - if record.OperationType == "delete" { - // replace full document(null) with documentKey - record.FullDocument = record.DocumentKey + record.OperationType = normalizeOperationType(record.OperationType) + + switch record.OperationType { + case "delete": + if record.FullDocumentBeforeChange != nil { + record.FullDocument = record.FullDocumentBeforeChange + } else { + record.FullDocument = record.DocumentKey + } + case "update": + if record.FullDocument == nil && record.FullDocumentBeforeChange != nil { + record.FullDocument = record.FullDocumentBeforeChange + } } filterMongoObject(record.FullDocument) @@ -297,3 +309,15 @@ func GetResumeToken(cursor *mongo.ChangeStream, streamID string) (string, error) return token, nil } + +// normalizeOperationType maps MongoDB-specific operation types to the standard +// set understood by the abstract CDC layer (insert, update, delete). +// "replace" swaps the full document but keeps _id unchanged, so it is treated as an update. +func normalizeOperationType(opType string) string { + switch opType { + case "replace": + return "update" + default: + return opType + } +} diff --git a/drivers/mongodb/internal/mon_test.go b/drivers/mongodb/internal/mon_test.go index 0eca85c44..8f4eca7c8 100644 --- a/drivers/mongodb/internal/mon_test.go +++ b/drivers/mongodb/internal/mon_test.go @@ -7,21 +7,20 @@ import ( "github.com/datazip-inc/olake/utils/testutils" ) -func TestMongodbIntegration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.MongoDB)), - Namespace: "olake_mongodb_test", - ExpectedData: ExpectedMongoData, - ExpectedUpdatedData: ExpectedUpdatedData, - DestinationDataTypeSchema: MongoToDestinationSchema, - UpdatedDestinationDataTypeSchema: UpdatedMongoToDestinationSchema, - DefaultCDCColumnsSchema: ExpectedMongoDbDefaultCDCColumnsSchema, - ExecuteQuery: ExecuteQuery, - DestinationDB: "mongodb_olake_mongodb_test", - CursorField: "id_cursor:id_int", - PartitionRegex: "/{_id,identity}", - ColumnToExclude: "excludedColumn", +// mongodbBaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestMongodbIntegration and TestMongodb2PC. +func mongodbBaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.MongoDB)), + Namespace: "olake_mongodb_test", + ExpectedData: ExpectedMongoData, + DestinationDataTypeSchema: MongoToDestinationSchema, + DefaultCDCColumnsSchema: ExpectedMongoDbDefaultCDCColumnsSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "mongodb_olake_mongodb_test", + CursorField: "id_cursor:id_int", + PartitionRegex: "/{_id,identity}", + ColumnToExclude: "excludedColumn", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -38,7 +37,19 @@ func TestMongodbIntegration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestMongodbIntegration(t *testing.T) { + t.Parallel() + cfg := mongodbBaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedData + cfg.UpdatedDestinationDataTypeSchema = UpdatedMongoToDestinationSchema + cfg.TestIntegration(t) +} + +func TestMongodb2PC(t *testing.T) { + t.Parallel() + mongodbBaseConfig().Test2PCIntegration(t) } func TestMongodbPerformance(t *testing.T) { diff --git a/drivers/mongodb/internal/mon_test_util.go b/drivers/mongodb/internal/mon_test_util.go index 2d3d9ddb7..f7891038c 100644 --- a/drivers/mongodb/internal/mon_test_util.go +++ b/drivers/mongodb/internal/mon_test_util.go @@ -118,6 +118,25 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation _, err = collection.InsertOne(ctx, filteredDoc) require.NoError(t, err, "Failed to insert filtered test data row") + case "insert_2pc": + doc2 := bson.M{ + "id_bigint": int64(123456789012345), + "id_int": int32(100), + "id_timestamp": time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC), + "id_double": float64(123.456), + "id_bool": true, + "id_cursor": int32(7), + "created_timestamp": primitive.Timestamp{T: uint32(1754905992), I: 1}, + "id_nil": nil, + "id_regex": primitive.Regex{Pattern: "test.*", Options: "i"}, + "id_nested": nestedDoc, + "id_minkey": primitive.MinKey{}, + "id_maxkey": primitive.MaxKey{}, + "name_varchar": "varchar_val", + } + _, err2 := collection.InsertOne(ctx, doc2) + require.NoError(t, err2, "Failed to insert document (insert_2pc)") + case "update": filter := bson.M{"id": int32(1)} update := bson.M{ diff --git a/drivers/mssql/go.mod b/drivers/mssql/go.mod index 76b1d5cb9..ffdd71f88 100644 --- a/drivers/mssql/go.mod +++ b/drivers/mssql/go.mod @@ -13,7 +13,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/microsoft/go-mssqldb v1.9.5 github.com/stretchr/testify v1.11.1 - golang.org/x/crypto v0.50.0 + golang.org/x/crypto v0.52.0 ) require ( @@ -143,14 +143,14 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect diff --git a/drivers/mssql/internal/mssql_test.go b/drivers/mssql/internal/mssql_test.go index 2bf8e8579..072e62cbf 100644 --- a/drivers/mssql/internal/mssql_test.go +++ b/drivers/mssql/internal/mssql_test.go @@ -7,21 +7,20 @@ import ( "github.com/datazip-inc/olake/utils/testutils" ) -func TestMSSQLIntegration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.MSSQL)), - Namespace: "dbo", - ExpectedData: ExpectedMSSQLData, - ExpectedUpdatedData: ExpectedUpdatedMSSQLData, - DestinationDataTypeSchema: MSSQLToDestinationSchema, - UpdatedDestinationDataTypeSchema: MSSQLToDestinationSchema, - DefaultCDCColumnsSchema: ExpectedMSSQLDefaultCDCColumnsSchema, - ExecuteQuery: ExecuteQuery, - ColumnToExclude: "excludedColumn", - DestinationDB: "mssql_olake_mssql_test_dbo", - CursorField: "id_cursor:col_int", - PartitionRegex: "/{id,identity}", +// mssqlBaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestMSSQLIntegration and TestMSSQL2PC. +func mssqlBaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.MSSQL)), + Namespace: "dbo", + ExpectedData: ExpectedMSSQLData, + DestinationDataTypeSchema: MSSQLToDestinationSchema, + DefaultCDCColumnsSchema: ExpectedMSSQLDefaultCDCColumnsSchema, + ExecuteQuery: ExecuteQuery, + ColumnToExclude: "excludedColumn", + DestinationDB: "mssql_olake_mssql_test_dbo", + CursorField: "id_cursor:col_int", + PartitionRegex: "/{id,identity}", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -38,5 +37,17 @@ func TestMSSQLIntegration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestMSSQLIntegration(t *testing.T) { + t.Parallel() + cfg := mssqlBaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedMSSQLData + cfg.UpdatedDestinationDataTypeSchema = MSSQLToDestinationSchema + cfg.TestIntegration(t) +} + +func TestMSSQL2PC(t *testing.T) { + t.Parallel() + mssqlBaseConfig().Test2PCIntegration(t) } diff --git a/drivers/mssql/internal/mssql_test_util.go b/drivers/mssql/internal/mssql_test_util.go index 4591f5ceb..7a02ba1ca 100644 --- a/drivers/mssql/internal/mssql_test_util.go +++ b/drivers/mssql/internal/mssql_test_util.go @@ -233,10 +233,43 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation _, err = db.ExecContext(ctx, filteredQuery) require.NoError(t, err, "failed to insert filtered CDC row") + case "insert_2pc": + insertTwo := fmt.Sprintf(` + INSERT INTO dbo.%s ( + id_cursor, + col_tinyint, col_smallint, col_int, col_bigint, + col_decimal, col_numeric, col_smallmoney, col_money, + col_float, col_real, col_bit, + col_char, col_varchar, col_text, col_nchar, col_nvarchar, col_ntext, + col_date, col_time, col_smalldatetime, col_datetime, col_datetime2, col_datetimeoffset, + col_uniqueidentifier, + col_xml, col_sysname, + col_image, col_hierarchyid, col_sql_variant, + col_int_nullable, col_varchar_nullable, col_datetime2_nullable, + created_at + ) VALUES ( + 7, + 3, 5, 10, 19, + 123.50, 10.12500, 1.2500, 2.5000, + 123.50, 12.50, 1, + 'char_val__', 'varchar_val', 'text_val', N'nchar_val_', N'nvarchar_val', N'ntext_val', + '2023-01-01', '12:00:00', '2023-01-01 12:00:00', '2023-01-01 12:00:00', + '2023-01-01 12:00:00', '2023-01-01 12:00:00 +00:00', + '123e4567-e89b-12d3-a456-426614174000', + 'test', 'sysname_val', + 0x43434343, + hierarchyid::Parse('/1/1/'), CAST('variant_base' AS sql_variant), + NULL, NULL, NULL, + '2023-01-01 12:00:00' + ); + `, integrationTestTable) + _, err2 := db.ExecContext(ctx, insertTwo) + require.NoError(t, err2, "failed to insert CDC row (insert_2pc)") + case "update": updateRow := fmt.Sprintf(` UPDATE dbo.%s SET - id_cursor = 7, + id_cursor = 100, col_bigint = 20, col_decimal = 543.25, col_money = 9.7500, @@ -253,7 +286,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation col_datetime2_nullable = '2024-07-01 15:30:00', created_at = '2024-07-01 15:30:00', excludedColumn = 102 - WHERE id = 6; + WHERE id = 1; `, integrationTestTable) _, err := db.ExecContext(ctx, updateRow) require.NoError(t, err, "failed to update CDC row") diff --git a/drivers/mysql/go.mod b/drivers/mysql/go.mod index 7544e96d7..263481c99 100644 --- a/drivers/mysql/go.mod +++ b/drivers/mysql/go.mod @@ -111,10 +111,10 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/trace v1.43.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect + golang.org/x/mod v0.35.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect @@ -174,11 +174,11 @@ require ( github.com/spf13/viper v1.20.1 // indirect github.com/xitongsys/parquet-go v1.6.2 // indirect github.com/xitongsys/parquet-go-source v0.0.0-20241021075129-b732d2ac9c9b // indirect - golang.org/x/crypto v0.50.0 - golang.org/x/net v0.53.0 // indirect + golang.org/x/crypto v0.52.0 + golang.org/x/net v0.54.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.36.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/text v0.37.0 // indirect ) replace ( diff --git a/drivers/mysql/internal/mysql_test.go b/drivers/mysql/internal/mysql_test.go index aaad7e47d..0d844f934 100644 --- a/drivers/mysql/internal/mysql_test.go +++ b/drivers/mysql/internal/mysql_test.go @@ -7,21 +7,20 @@ import ( "github.com/datazip-inc/olake/utils/testutils" ) -func TestMySQLIntegration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.MySQL)), - Namespace: "olake_mysql_test", - ExpectedData: ExpectedMySQLData, - ExpectedUpdatedData: ExpectedUpdatedData, - DestinationDataTypeSchema: MySQLToDestinationSchema, - UpdatedDestinationDataTypeSchema: EvolvedMySQLToDestinationSchema, - DefaultCDCColumnsSchema: ExpectedMySQLDefaultCDCColumnsSchema, - ExecuteQuery: ExecuteQuery, - DestinationDB: "mysql_olake_mysql_test", - CursorField: "id_cursor:id_smallint", - PartitionRegex: "/{id,identity}", - ColumnToExclude: "excludedColumn", +// mysqlBaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestMySQLIntegration and TestMySQL2PC. +func mysqlBaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.MySQL)), + Namespace: "olake_mysql_test", + ExpectedData: ExpectedMySQLData, + DestinationDataTypeSchema: MySQLToDestinationSchema, + DefaultCDCColumnsSchema: ExpectedMySQLDefaultCDCColumnsSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "mysql_olake_mysql_test", + CursorField: "id_cursor:id_smallint", + PartitionRegex: "/{id,identity}", + ColumnToExclude: "excludedColumn", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -38,7 +37,19 @@ func TestMySQLIntegration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestMySQLIntegration(t *testing.T) { + t.Parallel() + cfg := mysqlBaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedData + cfg.UpdatedDestinationDataTypeSchema = EvolvedMySQLToDestinationSchema + cfg.TestIntegration(t) +} + +func TestMySQL2PC(t *testing.T) { + t.Parallel() + mysqlBaseConfig().Test2PCIntegration(t) } func TestMySQLPerformance(t *testing.T) { diff --git a/drivers/mysql/internal/mysql_test_util.go b/drivers/mysql/internal/mysql_test_util.go index 47b7bb65d..7c790c5e5 100644 --- a/drivers/mysql/internal/mysql_test_util.go +++ b/drivers/mysql/internal/mysql_test_util.go @@ -164,6 +164,35 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation require.NoError(t, err, "Failed to insert filtered test data row") return + case "insert_2pc": + query = fmt.Sprintf(` + INSERT INTO %s ( + id_cursor, id, id_bigint, + id_int, id_int_unsigned, id_integer, id_integer_unsigned, + id_mediumint, id_mediumint_unsigned, id_smallint, id_smallint_unsigned, + id_tinyint, id_tinyint_unsigned, price_decimal, amount_decimal_9_2, price_double, + price_double_precision, price_float, price_numeric, price_real, + name_char, name_varchar, name_text, name_tinytext, + name_mediumtext, name_longtext, created_date, + created_timestamp, is_active, + long_varchar, name_bool, status, priority, + name_latin1, name_ucs2, name_utf16le, grade, + tags, permissions + ) VALUES ( + 7, 7, 123456789012345, + 100, 101, 102, 103, + 5001, 5002, 101, 102, + 50, 51, + 123.45, 5330197.27, 123.456, + 123.456, 123.45, 123.45, 123.456, + 'c', 'varchar_val', 'text_val', 'tinytext_val', + 'mediumtext_val', 'longtext_val', '2023-01-01 12:00:00', + '2023-01-01 12:00:00', 1, + 'long_varchar_val', 1, 'active', 'high', + 'latin1_val', 'ucs2_val', 'utf16le_val', 'naïve', + 'sports,reading', 'read,write' + )`, integrationTestTable) + case "update": query = fmt.Sprintf(` UPDATE %s SET @@ -189,7 +218,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation tags = 'gaming,reading', permissions = 'read,write,execute', excludedColumn = 102, includedColumn = 202 - WHERE id = 6`, integrationTestTable) + WHERE id = 1`, integrationTestTable) case "delete": query = fmt.Sprintf("DELETE FROM %s WHERE id = 1", integrationTestTable) diff --git a/drivers/oracle/go.mod b/drivers/oracle/go.mod index d001f530c..a689c4735 100644 --- a/drivers/oracle/go.mod +++ b/drivers/oracle/go.mod @@ -14,7 +14,7 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/sijms/go-ora/v2 v2.8.24 github.com/stretchr/testify v1.11.1 - golang.org/x/crypto v0.50.0 + golang.org/x/crypto v0.52.0 ) require ( @@ -148,14 +148,14 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect diff --git a/drivers/oracle/internal/oracle_test.go b/drivers/oracle/internal/oracle_test.go index 2fdcd4d74..00aa72de6 100644 --- a/drivers/oracle/internal/oracle_test.go +++ b/drivers/oracle/internal/oracle_test.go @@ -7,20 +7,19 @@ import ( "github.com/datazip-inc/olake/utils/testutils" ) -func TestOracleIntegration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.Oracle)), - Namespace: "MYUSER", - ExpectedData: ExpectedOracleData, - ExpectedUpdatedData: ExpectedUpdatedOracleData, - DestinationDataTypeSchema: OracleToDestinationSchema, - UpdatedDestinationDataTypeSchema: UpdatedOracleToDestinationSchema, - ExecuteQuery: ExecuteQuery, - DestinationDB: "oracle_myuser", - CursorField: "COL_CURSOR:COL_SMALLINT", - PartitionRegex: "/{id, identity}", - ColumnToExclude: "EXCLUDEDCOLUMN", +// oracleBaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestOracleIntegration and TestOracle2PC. +func oracleBaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.Oracle)), + Namespace: "MYUSER", + ExpectedData: ExpectedOracleData, + DestinationDataTypeSchema: OracleToDestinationSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "oracle_myuser", + CursorField: "COL_CURSOR:COL_SMALLINT", + PartitionRegex: "/{id, identity}", + ColumnToExclude: "EXCLUDEDCOLUMN", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -37,5 +36,17 @@ func TestOracleIntegration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestOracleIntegration(t *testing.T) { + t.Parallel() + cfg := oracleBaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedOracleData + cfg.UpdatedDestinationDataTypeSchema = UpdatedOracleToDestinationSchema + cfg.TestIntegration(t) +} + +func TestOracle2PC(t *testing.T) { + t.Parallel() + oracleBaseConfig().Test2PCIntegration(t) } diff --git a/drivers/oracle/internal/oracle_test_util.go b/drivers/oracle/internal/oracle_test_util.go index 1eb7e4263..140834bd0 100644 --- a/drivers/oracle/internal/oracle_test_util.go +++ b/drivers/oracle/internal/oracle_test_util.go @@ -112,6 +112,23 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation require.NoError(t, err, "Failed to insert filtered test data row") return + case "insert_2pc": + query = fmt.Sprintf(` + INSERT INTO %s ( + col_cursor, col_bigint, col_char, col_character, + col_varchar2, col_date, col_decimal, + col_double_precision, col_float, col_int, col_smallint, + col_integer, col_clob, col_nclob, col_timestamp, col_timestamptz, col_timestampltz + ) VALUES ( + 7, 123456789012345, 'c', 'char_val', + 'varchar_val', TO_DATE('2023-01-01', 'YYYY-MM-DD'), 123.45, + 123.456789, 123.5, 123, 123, 12345, + 'sample text', 'sample nclob', + TIMESTAMP '2023-01-01 12:00:00', + TIMESTAMP '2023-01-01 12:00:00+00:00', + TIMESTAMP '2023-01-01 12:00:00+05:30' + )`, integrationTestTable) + case "update": query = fmt.Sprintf(` UPDATE %s SET diff --git a/drivers/postgres/go.mod b/drivers/postgres/go.mod index c82087c2a..bbdd452c4 100644 --- a/drivers/postgres/go.mod +++ b/drivers/postgres/go.mod @@ -9,7 +9,7 @@ require ( github.com/jackc/pgx/v5 v5.9.2 github.com/lib/pq v1.10.9 github.com/stretchr/testify v1.11.1 - golang.org/x/crypto v0.50.0 + golang.org/x/crypto v0.52.0 ) require ( @@ -140,12 +140,12 @@ require ( go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect @@ -161,8 +161,8 @@ require ( github.com/jmoiron/sqlx v1.4.0 github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/tools v0.44.0 // indirect ) replace ( diff --git a/drivers/postgres/internal/cdc.go b/drivers/postgres/internal/cdc.go index 86708941b..a9d461d97 100644 --- a/drivers/postgres/internal/cdc.go +++ b/drivers/postgres/internal/cdc.go @@ -143,9 +143,15 @@ func (p *Postgres) StreamChanges(ctx context.Context, _ int, metadataStates map[ // persist replicator for post cdc p.replicator = replicator - // validate global state (might got invalid during full load) - if err := validateGlobalState(postgresGlobalState, slot.LSN); err != nil { - return nil, fmt.Errorf("%s: invalid global state: %s", constants.ErrNonRetryable, err) + // validateGlobalState ensures slot and state agree before WAL replay begins. + // Skip it when remainingStreams is empty: all streams are already committed in + // Iceberg, so no WAL will be replayed and the slot/state relationship is irrelevant. + if len(remainingStreams) > 0 { + if err := validateGlobalState(postgresGlobalState, slot.LSN); err != nil { + return nil, fmt.Errorf("%s: invalid global state: %s", constants.ErrNonRetryable, err) + } + } else { + logger.Infof("all streams already committed in destination, skipping state LSN validation") } // choose replicator via factory based on OutputPlugin config (default wal2json) diff --git a/drivers/postgres/internal/postgres_test.go b/drivers/postgres/internal/postgres_test.go index 13c874f8c..4c529f837 100644 --- a/drivers/postgres/internal/postgres_test.go +++ b/drivers/postgres/internal/postgres_test.go @@ -8,21 +8,20 @@ import ( _ "github.com/lib/pq" ) -func TestPostgresIntegration(t *testing.T) { - t.Parallel() - testConfig := &testutils.IntegrationTest{ - TestConfig: testutils.GetTestConfig(string(constants.Postgres)), - Namespace: "public", - ExpectedData: ExpectedPostgresData, - ExpectedUpdatedData: ExpectedUpdatedData, - DestinationDataTypeSchema: PostgresToDestinationSchema, - UpdatedDestinationDataTypeSchema: UpdatedPostgresToDestinationSchema, - DefaultCDCColumnsSchema: ExpectedPostgresDefaultCDCColumnsSchema, - ExecuteQuery: ExecuteQuery, - DestinationDB: "postgres_postgres_public", - CursorField: "col_cursor:col_int", - PartitionRegex: "/{col_bigserial,identity}", - ColumnToExclude: "excludedcolumn", +// postgresBaseConfig returns an IntegrationTest pre-populated with all fields shared +// between TestPostgresIntegration and TestPostgres2PC. +func postgresBaseConfig() *testutils.IntegrationTest { + return &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.Postgres)), + Namespace: "public", + ExpectedData: ExpectedPostgresData, + DestinationDataTypeSchema: PostgresToDestinationSchema, + DefaultCDCColumnsSchema: ExpectedPostgresDefaultCDCColumnsSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "postgres_postgres_public", + CursorField: "col_cursor:col_int", + PartitionRegex: "/{col_bigserial,identity}", + ColumnToExclude: "excludedcolumn", FilterConfig: `{ "logical_operator": "And", "conditions": [ @@ -39,7 +38,19 @@ func TestPostgresIntegration(t *testing.T) { ] }`, } - testConfig.TestIntegration(t) +} + +func TestPostgresIntegration(t *testing.T) { + t.Parallel() + cfg := postgresBaseConfig() + cfg.ExpectedUpdatedData = ExpectedUpdatedData + cfg.UpdatedDestinationDataTypeSchema = UpdatedPostgresToDestinationSchema + cfg.TestIntegration(t) +} + +func TestPostgres2PC(t *testing.T) { + t.Parallel() + postgresBaseConfig().Test2PCIntegration(t) } func TestPostgresPerformance(t *testing.T) { diff --git a/drivers/postgres/internal/postgres_test_util.go b/drivers/postgres/internal/postgres_test_util.go index 69c24aeb3..0cb30e541 100644 --- a/drivers/postgres/internal/postgres_test_util.go +++ b/drivers/postgres/internal/postgres_test_util.go @@ -148,6 +148,30 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation require.NoError(t, err, "Failed to insert filtered test data row") return + case "insert_2pc": + query = fmt.Sprintf(` + INSERT INTO %s ( + col_cursor, col_bigint, col_bool, col_char, col_character, + col_character_varying, col_date, col_decimal, + col_double_precision, col_float4, col_int, col_int2, + col_integer, col_interval, col_json, col_jsonb, + col_name, col_numeric, col_real, col_text, + col_timestamp, col_timestamptz, col_uuid, col_varbit, col_xml, + col_point, col_polygon, col_circle + ) VALUES ( + 7, 123456789012345, TRUE, 'c', 'charac_val', + 'varchar_val', '2023-01-01', 123.45, + 123.456789, 123.45, 123, 123, 12345, + '1 hour', '{"key": "value"}', '{"key": "value"}', + 'test_name', 123.45, 123.45, 'sample text', + '2023-01-01 12:00:00', '2023-01-01 12:00:00+00', + '123e4567-e89b-12d3-a456-426614174000', B'101010', + 'value', + '(10.5,20.5)'::point, + '((0,0),(10,0),(10,10),(0,10),(0,0))'::polygon, + '<(5,5),3.5>'::circle + )`, integrationTestTable) + case "update": query = fmt.Sprintf(` UPDATE %s SET diff --git a/drivers/s3/go.mod b/drivers/s3/go.mod index 48d7bb93d..40c1aabfc 100644 --- a/drivers/s3/go.mod +++ b/drivers/s3/go.mod @@ -125,15 +125,15 @@ require ( go.opentelemetry.io/otel/metric v1.43.0 // indirect go.opentelemetry.io/otel/trace v1.43.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.50.0 // indirect + golang.org/x/crypto v0.52.0 // indirect golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/sync v0.20.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect - golang.org/x/text v0.36.0 // indirect - golang.org/x/tools v0.43.0 // indirect + golang.org/x/sys v0.45.0 // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect + golang.org/x/text v0.37.0 // indirect + golang.org/x/tools v0.44.0 // indirect golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect diff --git a/go.mod b/go.mod index 7b38f69b7..2f38525fb 100644 --- a/go.mod +++ b/go.mod @@ -51,14 +51,14 @@ require ( github.com/rs/xid v1.6.0 // indirect go.opentelemetry.io/otel/sdk v1.43.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.43.0 // indirect - golang.org/x/telemetry v0.0.0-20260311193753-579e4da9a98c // indirect + golang.org/x/telemetry v0.0.0-20260409153401-be6f6cb8b1fa // indirect gopkg.in/ini.v1 v1.66.6 // indirect ) require ( dario.cat/mergo v1.0.2 // indirect github.com/paulmach/orb v0.12.0 - golang.org/x/tools v0.43.0 // indirect + golang.org/x/tools v0.44.0 // indirect ) require ( @@ -155,13 +155,13 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.50.0 + golang.org/x/crypto v0.52.0 golang.org/x/exp v0.0.0-20250506013437-ce4c2cf36ca6 // indirect - golang.org/x/mod v0.34.0 // indirect - golang.org/x/net v0.53.0 // indirect + golang.org/x/mod v0.35.0 // indirect + golang.org/x/net v0.54.0 // indirect golang.org/x/oauth2 v0.34.0 // indirect - golang.org/x/sys v0.43.0 // indirect - golang.org/x/text v0.36.0 + golang.org/x/sys v0.45.0 // indirect + golang.org/x/text v0.37.0 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect; from staging gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/protocol/root.go b/protocol/root.go index af7b8db91..d4f5fae09 100644 --- a/protocol/root.go +++ b/protocol/root.go @@ -1,9 +1,12 @@ package protocol import ( + "context" "fmt" "os" + "os/signal" "path/filepath" + "syscall" "github.com/datazip-inc/olake/constants" "github.com/datazip-inc/olake/drivers/abstract" @@ -80,13 +83,55 @@ var RootCmd = &cobra.Command{ }, } +// CreateRootCommand wires the cobra root for the given driver. It mutates +// package-level state (RootCmd, connector) and installs a process-wide signal +// handler, so it must be called at most once per process — the existing +// connector.RegisterDriver entry point already enforces this. func CreateRootCommand(_ bool, driver any) *cobra.Command { RootCmd.AddCommand(commands...) - connector = abstract.NewAbstractDriver(RootCmd.Context(), driver.(abstract.DriverInterface)) + + // Wire SIGINT/SIGTERM into the root context so CDC, backfill and + // destination-writer paths reach their existing ctx.Done() branches on + // pod eviction, docker stop, or Ctrl-C, instead of being killed mid-read. + ctx := signalAwareRootContext(RootCmd.Context()) + RootCmd.SetContext(ctx) + + connector = abstract.NewAbstractDriver(ctx, driver.(abstract.DriverInterface)) return RootCmd } +// signalAwareRootContext wraps parent so that the returned context cancels on +// SIGINT / SIGTERM as well as on any parent cancellation. Used to wire pod +// eviction, docker stop, and Ctrl-C through to the existing ctx.Done() +// branches in CDC, backfill, and destination-writer paths. +// +// Source / destination consistency on cancel is still owned by each +// driver.PostCDC and destination writer.Close implementation. This wrapper only +// makes process signals visible through ctx.Done(); it does not make source +// checkpoints and destination commits atomic. Any implementation that performs +// a final commit after work has been written must continue to check ctx.Done() +// before that commit and must treat a canceled context as a reason to avoid +// advancing only one side of the source/destination boundary. +// +// The Kafka driver has a separate `TODO: Add 2PC support for Kafka` for a +// future stricter contract. Other drivers may have similar source-specific +// checkpointing constraints, so this helper should not be used as a substitute +// for driver-level cancellation safety. +func signalAwareRootContext(parent context.Context) context.Context { + ctx, stop := signal.NotifyContext(parent, syscall.SIGINT, syscall.SIGTERM) + // signal.NotifyContext keeps the signal handler installed until stop() is + // called. Releasing it after the first cancellation lets a subsequent + // SIGINT/SIGTERM fall through to the Go runtime default (terminate), which + // is the behavior an operator hitting Ctrl-C twice expects. + go func() { + <-ctx.Done() + stop() + }() + + return ctx +} + func init() { // TODO: replace --catalog flag with --streams commands = append(commands, specCmd, checkCmd, discoverCmd, syncCmd, clearCmd) diff --git a/protocol/root_test.go b/protocol/root_test.go new file mode 100644 index 000000000..1f64f2c00 --- /dev/null +++ b/protocol/root_test.go @@ -0,0 +1,112 @@ +package protocol + +import ( + "context" + "os" + "os/exec" + "runtime" + "syscall" + "testing" + "time" +) + +// TestSignalAwareRootContextCancelsOnSignal verifies that signalAwareRootContext +// cancels the returned context when the process receives SIGINT or SIGTERM. +// +// The test re-execs itself in a child process per signal case so that we can +// safely deliver a real OS signal without affecting the parent test runner. +// The OLAKE_SIGNAL_CONTEXT_HELPER env var switches the binary into "helper" +// mode; OLAKE_TEST_SIGNAL selects which signal the helper sends to itself. +func TestSignalAwareRootContextCancelsOnSignal(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("process signal behavior differs on windows") + } + + tests := []struct { + name string + signal syscall.Signal + env string + }{ + { + name: "SIGTERM cancellation", + signal: syscall.SIGTERM, + env: "SIGTERM", + }, + { + name: "SIGINT cancellation", + signal: syscall.SIGINT, + env: "SIGINT", + }, + } + + // Helper-mode branch: runs inside the re-execed child. Installs the + // signal handler under test, sends the chosen signal to ourselves, and + // asserts the context cancels with context.Canceled within a bounded + // timeout. + if os.Getenv("OLAKE_SIGNAL_CONTEXT_HELPER") == "1" { + ctx := signalAwareRootContext(context.Background()) + + var signal syscall.Signal + + switch os.Getenv("OLAKE_TEST_SIGNAL") { + case "SIGTERM": + signal = syscall.SIGTERM + case "SIGINT": + signal = syscall.SIGINT + default: + t.Fatal("unknown test signal") + } + + currentProcess, err := os.FindProcess(os.Getpid()) + if err != nil { + t.Fatal(err) + } + + if err := currentProcess.Signal(signal); err != nil { + t.Fatal(err) + } + + select { + case <-ctx.Done(): + if ctx.Err() != context.Canceled { + t.Fatalf("expected canceled context, got %v", ctx.Err()) + } + case <-time.After(time.Second): + t.Fatalf("context was not canceled after %v", signal) + } + + return + } + + // Parent-mode branch: spawns one helper child per signal case and fails + // the subtest if the child exits non-zero. + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cmd := exec.Command(os.Args[0], "-test.run=TestSignalAwareRootContextCancelsOnSignal") + cmd.Env = append(os.Environ(), "OLAKE_SIGNAL_CONTEXT_HELPER=1", "OLAKE_TEST_SIGNAL="+tt.env) + + if output, err := cmd.CombinedOutput(); err != nil { + t.Fatalf("signal helper failed: %v\n%s", err, output) + } + }) + } +} + +// TestSignalAwareRootContextPreservesParentCancellation verifies that +// canceling the parent context still propagates through the signal-aware +// wrapper. Without this, callers that cancel via context.WithCancel / +// context.WithTimeout would be silently ignored after the wrap. +func TestSignalAwareRootContextPreservesParentCancellation(t *testing.T) { + parent, cancel := context.WithCancel(context.Background()) + ctx := signalAwareRootContext(parent) + cancel() + + select { + case <-ctx.Done(): + if ctx.Err() != context.Canceled { + t.Fatalf("expected canceled context, got %v", ctx.Err()) + } + case <-time.After(time.Second): + t.Fatal("context was not canceled after parent cancellation") + } +} diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 6810cb2a9..a075cdf92 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -71,6 +71,7 @@ type TestConfig struct { IcebergDestinationPath string ParquetDestinationPath string StatePath string + StateCheckpointPath string // backup of state.json used in 2PC recovery tests StatsPath string BenchmarksPath string HostTestDataPath string @@ -200,6 +201,7 @@ func GetTestConfig(driver string, extraParams ...string) *TestConfig { IcebergDestinationPath: fmt.Sprintf(containerTestDataPath, driver, "iceberg_destination.json"), ParquetDestinationPath: fmt.Sprintf(containerTestDataPath, driver, "parquet_destination.json"), StatePath: fmt.Sprintf(containerTestDataPath, driver, "state.json"), + StateCheckpointPath: fmt.Sprintf(containerTestDataPath, driver, "state_checkpoint.json"), StatsPath: fmt.Sprintf(containerTestDataPath, driver, "stats.json"), } } @@ -284,6 +286,16 @@ func resetStateFileCommand(config TestConfig) string { return fmt.Sprintf(`rm -f %s; echo '{}' > %s`, config.StatePath, config.StatePath) } +// saveStateFileCommand copies state.json to the checkpoint state file. +func saveStateFileCommand(config *TestConfig) string { + return fmt.Sprintf(`cp %s %s`, config.StatePath, config.StateCheckpointPath) +} + +// restoreStateFileCommand replaces state.json with the previously saved checkpoint backup. +func restoreStateFileCommand(config *TestConfig) string { + return fmt.Sprintf(`cp %s %s`, config.StateCheckpointPath, config.StatePath) +} + func toggleArrowIcebergWrites(config TestConfig, enabled bool) string { tmpDest := "/tmp/iceberg_destination.json" return fmt.Sprintf( @@ -361,11 +373,14 @@ func DeleteParquetFiles(t *testing.T, parquetDB, tableName string) error { // syncTestCase represents a test case for sync operations type syncTestCase struct { - name string - operation string - useState bool - opSymbol string - expected map[string]interface{} + name string + operation string + useState bool + opSymbol string + expected map[string]interface{} + preSetupCommands []string // shell commands to execute in the container before the sync + verifyNoDuplicates bool // if true, assert COUNT(*) == COUNT(DISTINCT _olake_id) after sync + expectedRowCountByOpType int64 // when > 0, assert COUNT(DISTINCT _olake_id) == this value (catches over-sync and under-sync) } // runSyncAndVerify executes a sync command and verifies the results in Iceberg @@ -818,194 +833,444 @@ func (cfg *IntegrationTest) testParquetFullLoadAndIncremental( return nil } -func (cfg *IntegrationTest) TestIntegration(t *testing.T) { - ctx := context.Background() +// testIceberg2PCCDCRecovery tests 2PC (Two-Phase Commit) failure recovery for CDC mode using +// the Iceberg destination. It simulates a state-save failure mid-sync: saves a pre-insert +// checkpoint, performs a CDC insert, then restores to the checkpoint and inserts a second +// record (insert_2pc) to verify the driver correctly recovers without duplicating rows. +func (cfg *IntegrationTest) testIceberg2PCCDCRecovery( + ctx context.Context, + t *testing.T, + c testcontainers.Container, + testTable string, +) error { + t.Log("Starting Iceberg 2PC CDC Recovery tests") - t.Logf("Root Project directory: %s", cfg.TestConfig.HostRootPath) - t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath) - currentTestTable := utils.Ternary(cfg.TestConfig.DataFormat == "", fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver), fmt.Sprintf("%s_%s_test_table_olake", cfg.TestConfig.Driver, cfg.TestConfig.DataFormat)).(string) + if err := cfg.resetTable(ctx, t, testTable); err != nil { + return fmt.Errorf("failed to reset table: %w", err) + } - t.Run("Discover", func(t *testing.T) { - req := testcontainers.ContainerRequest{ - Image: "golang:1.25.10-bookworm", - ImagePlatform: "linux/amd64", - HostConfigModifier: func(hc *container.HostConfig) { - hc.Binds = []string{ - fmt.Sprintf("%s:/test-olake:rw", cfg.TestConfig.HostRootPath), - fmt.Sprintf("%s:/test-olake/drivers/%s/internal/testdata:rw", cfg.TestConfig.HostTestDataPath, cfg.TestConfig.Driver), - } - hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + twoPCCDCTestCases := []syncTestCase{ + { + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 5, + }, + { + name: "CDC - insert", + operation: "insert", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedData, + preSetupCommands: []string{ + saveStateFileCommand(cfg.TestConfig), }, - ConfigModifier: func(config *container.Config) { - config.WorkingDir = "/test-olake" + }, + { + // Simulate 2PC failure: restore state to pre-insert checkpoint, insert a + // second record, run sync. The driver recovers: it advances state to the + // committed metadata LSN by making a bounded sync. + // expectedRowCountByOpType=1 because no new data lands in Iceberg here, + // as it just recovers the sync from state -> metadata LSN. + name: "CDC - Recovery Sync", + operation: "insert_2pc", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 1, + preSetupCommands: []string{ + restoreStateFileCommand(cfg.TestConfig), }, - Env: map[string]string{ - "TELEMETRY_DISABLED": "true", + }, + { + // After the recovery sync advanced state to the committed metadata LSN, + // a normal sync should see both the original insert and insert_2pc rows. + name: "CDC - Post Recovery Sync", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 2, // insert row + insert_2pc row, both unique by _olake_id + }, + } + + for _, tc := range twoPCCDCTestCases { + t.Run(tc.name, func(t *testing.T) { + for _, cmd := range tc.preSetupCommands { + if code, out, execErr := utils.ExecCommand(ctx, c, cmd); execErr != nil || code != 0 { + t.Fatalf("%s pre-sync command failed (%d): %v\n%s", tc.name, code, execErr, out) + } + } + + if err := cfg.runSyncAndVerify( + ctx, t, c, testTable, tc.useState, "iceberg", + tc.operation, tc.opSymbol, tc.expected, + tc.name != "Full-Refresh", + ); err != nil { + t.Fatalf("%s test failed: %v", tc.name, err) + } + + if tc.verifyNoDuplicates { + VerifyIcebergNoDuplicates(ctx, t, testTable, cfg.DestinationDB, tc.opSymbol, tc.expectedRowCountByOpType) + } + }) + } + + t.Log("Iceberg 2PC CDC Recovery tests completed successfully") + dropIcebergTable(t, testTable, cfg.DestinationDB) + t.Logf("Dropped Iceberg table after 2PC CDC tests: %s", testTable) + return nil +} + +// testIceberg2PCIncrementalRecovery tests 2PC (Two-Phase Commit) failure recovery for +// incremental mode using the Iceberg destination. It simulates a state-save failure after +// the cursor advances: saves a pre-insert checkpoint, performs an incremental insert, then +// restores to the checkpoint and inserts a second record (insert_2pc) to verify that the +// cursor re-reads the overlapping range, deduplicates the original insert via MERGE INTO, +// and correctly surfaces only the net-new insert_2pc row. +func (cfg *IntegrationTest) testIceberg2PCIncrementalRecovery( + ctx context.Context, + t *testing.T, + c testcontainers.Container, + testTable string, +) error { + t.Log("Starting Iceberg 2PC Incremental Recovery tests") + + if err := cfg.resetTable(ctx, t, testTable); err != nil { + return fmt.Errorf("failed to reset table: %w", err) + } + + // Patch streams.json: set sync_mode = incremental, cursor_field + incPatch := updateStreamConfigCommand(*cfg.TestConfig, cfg.Namespace, testTable, "incremental", cfg.CursorField) + code, out, err := utils.ExecCommand(ctx, c, incPatch) + if err != nil || code != 0 { + return fmt.Errorf("failed to patch streams.json for incremental (%d): %s\n%s", code, err, out) + } + + // Reset state so initial incremental behaves like a first full incremental load + resetState := resetStateFileCommand(*cfg.TestConfig) + code, out, err = utils.ExecCommand(ctx, c, resetState) + if err != nil || code != 0 { + return fmt.Errorf("failed to reset state for incremental (%d): %s\n%s", code, err, out) + } + + twoPCIncrementalTestCases := []syncTestCase{ + { + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 5, + }, + { + name: "Incremental - insert", + operation: "insert", + useState: true, + opSymbol: "u", + expected: cfg.ExpectedData, + preSetupCommands: []string{ + saveStateFileCommand(cfg.TestConfig), }, - LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ - { - PostReadies: []testcontainers.ContainerHook{ - func(ctx context.Context, c testcontainers.Container) error { - // 1. Install required tools - if code, out, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 { - return fmt.Errorf("install failed (%d): %s\n%s", code, err, out) - } + }, + { + // Simulate 2PC failure: restore cursor to pre-insert checkpoint, insert a + // second record, run sync. The cursor re-reads the range and deduplicates + // the original insert via MERGE INTO; insert_2pc is net-new. + // expectedRowCountByOpType=1: only insert_2pc is visible (original deduplicated). + name: "Incremental - State Save Failure Sync", + operation: "insert_2pc", + useState: true, + opSymbol: "u", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 1, + preSetupCommands: []string{ + restoreStateFileCommand(cfg.TestConfig), + }, + }, + { + // After recovery, state is now consistent. A normal sync should see both + // the original insert row and insert_2pc row — 2 distinct records total. + name: "Incremental - Post Recovery Sync", + useState: true, + opSymbol: "u", + expected: cfg.ExpectedData, + verifyNoDuplicates: true, + expectedRowCountByOpType: 2, // insert row + insert_2pc row, both unique by _olake_id + }, + } - // 2. Query on test table - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + for _, tc := range twoPCIncrementalTestCases { + t.Run(tc.name, func(t *testing.T) { + for _, cmd := range tc.preSetupCommands { + if code, out, execErr := utils.ExecCommand(ctx, c, cmd); execErr != nil || code != 0 { + t.Fatalf("%s pre-sync command failed (%d): %v\n%s", tc.name, code, execErr, out) + } + } - // 3. Run discover command - discoverCmd := discoverCommand(*cfg.TestConfig) - if code, out, err := utils.ExecCommand(ctx, c, discoverCmd); err != nil || code != 0 { - return fmt.Errorf("discover failed (%d): %s\n%s", code, err, string(out)) - } + if err := cfg.runSyncAndVerify( + ctx, t, c, testTable, tc.useState, "iceberg", + tc.operation, tc.opSymbol, tc.expected, + false, + ); err != nil { + t.Fatalf("Incremental 2PC test %s failed: %v", tc.name, err) + } - // 4. Verify streams.json file - streamsJSON, err := os.ReadFile(cfg.TestConfig.HostTestCatalogPath) - if err != nil { - return fmt.Errorf("failed to read expected streams JSON: %s", err) - } - testStreamsJSON, err := os.ReadFile(cfg.TestConfig.HostCatalogPath) - if err != nil { - return fmt.Errorf("failed to read actual streams JSON: %s", err) - } - if !utils.NormalizedEqual(string(streamsJSON), string(testStreamsJSON)) { - return fmt.Errorf("streams.json does not match expected test_streams.json\nExpected:\n%s\nGot:\n%s", string(streamsJSON), string(testStreamsJSON)) - } - t.Logf("Generated streams validated with test streams") + if tc.verifyNoDuplicates { + VerifyIcebergNoDuplicates(ctx, t, testTable, cfg.DestinationDB, tc.opSymbol, tc.expectedRowCountByOpType) + } + }) + } - // 5. Clean up - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) - t.Logf("%s discover test-container clean up", cfg.TestConfig.Driver) - return nil - }, + t.Log("Iceberg 2PC Incremental Recovery tests completed successfully") + dropIcebergTable(t, testTable, cfg.DestinationDB) + t.Logf("Dropped Iceberg table after 2PC Incremental tests: %s", testTable) + return nil +} + +// runInTestContainer starts a disposable golang:bookworm container, mounts the project root +// and driver test-data directory, runs testFn inside its PostReadies lifecycle hook, and +// terminates the container when done. +func (cfg *IntegrationTest) runInTestContainer( + ctx context.Context, + t *testing.T, + testFn func(ctx context.Context, c testcontainers.Container) error, +) { + t.Helper() + req := testcontainers.ContainerRequest{ + Image: "golang:1.25.10-bookworm", + ImagePlatform: "linux/amd64", + HostConfigModifier: func(hc *container.HostConfig) { + hc.Binds = []string{ + fmt.Sprintf("%s:/test-olake:rw", cfg.TestConfig.HostRootPath), + fmt.Sprintf("%s:/test-olake/drivers/%s/internal/testdata:rw", cfg.TestConfig.HostTestDataPath, cfg.TestConfig.Driver), + } + hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") + }, + ConfigModifier: func(config *container.Config) { + config.WorkingDir = "/test-olake" + }, + Env: map[string]string{ + "TELEMETRY_DISABLED": "true", + }, + LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ + { + PostReadies: []testcontainers.ContainerHook{ + func(ctx context.Context, c testcontainers.Container) error { + return testFn(ctx, c) }, }, }, - Cmd: []string{"tail", "-f", "/dev/null"}, - } + }, + Cmd: []string{"tail", "-f", "/dev/null"}, + } - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - }) - require.NoError(t, err, "Container startup failed") - defer func() { - if err := container.Terminate(ctx); err != nil { - t.Logf("warning: failed to terminate container: %v", err) - } - }() + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, }) + require.NoError(t, err, "Container startup failed") + defer func() { + if err := container.Terminate(ctx); err != nil { + t.Logf("warning: failed to terminate container: %v", err) + } + }() +} + +// Test2PCIntegration runs the full Two-Phase Commit (2PC) failure-recovery integration test +// suite in an isolated container. It exercises CDC and incremental state-recovery scenarios +// independently of the happy-path integration tests, allowing them to be scheduled and +// reported separately. +func (cfg *IntegrationTest) Test2PCIntegration(t *testing.T) { + ctx := context.Background() + + t.Logf("Root Project directory: %s", cfg.TestConfig.HostRootPath) + t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath) + currentTestTable := utils.Ternary(cfg.TestConfig.DataFormat == "", fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver), fmt.Sprintf("%s_%s_test_table_olake", cfg.TestConfig.Driver, cfg.TestConfig.DataFormat)).(string) + + // 2PC tests don't need schema discovery — the schema is already validated by the regular integration test. + testStreamsData, err := os.ReadFile(cfg.TestConfig.HostTestCatalogPath) + require.NoError(t, err, "failed to read test_streams.json") + require.NoError(t, os.WriteFile(cfg.TestConfig.HostCatalogPath, testStreamsData, 0600), "failed to write streams.json") t.Run("Sync", func(t *testing.T) { - req := testcontainers.ContainerRequest{ - Image: "golang:1.25.10-bookworm", - ImagePlatform: "linux/amd64", - HostConfigModifier: func(hc *container.HostConfig) { - hc.Binds = []string{ - fmt.Sprintf("%s:/test-olake:rw", cfg.TestConfig.HostRootPath), - fmt.Sprintf("%s:/test-olake/drivers/%s/internal/testdata:rw", cfg.TestConfig.HostTestDataPath, cfg.TestConfig.Driver), + cfg.runInTestContainer(ctx, t, func(ctx context.Context, c testcontainers.Container) error { + if code, out, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 { + return fmt.Errorf("install failed (%d): %s\n%s", code, err, out) + } + + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, []string{currentTestTable}, true, cfg.ColumnToExclude) + if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { + return fmt.Errorf("failed to enable normalization and partition regex in streams.json (%d): %s\n%s", + code, err, out, + ) + } + t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) + + writerTypes := []struct { + name string + useArrow bool + }{ + {"Legacy", false}, + {"Arrow", true}, + } + + if !slices.Contains(constants.SkipCDCDrivers, constants.DriverType(cfg.TestConfig.Driver)) { + for _, wt := range writerTypes { + t.Run(fmt.Sprintf("Iceberg (%s) 2PC CDC Recovery tests", wt.name), func(t *testing.T) { + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIceberg2PCCDCRecovery); err != nil { + t.Fatalf("Iceberg (%s) 2PC CDC Recovery tests failed: %v", wt.name, err) + } + }) } - hc.ExtraHosts = append(hc.ExtraHosts, "host.docker.internal:host-gateway") - }, - ConfigModifier: func(config *container.Config) { - config.WorkingDir = "/test-olake" - }, - Env: map[string]string{ - "TELEMETRY_DISABLED": "true", - }, - LifecycleHooks: []testcontainers.ContainerLifecycleHooks{ - { - PostReadies: []testcontainers.ContainerHook{ - func(ctx context.Context, c testcontainers.Container) error { - // 1. Install required tools - if code, out, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 { - return fmt.Errorf("install failed (%d): %s\n%s", code, err, out) - } + } - // 2. Query on test table - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) - - // streamUpdateCmd := fmt.Sprintf( - // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, - // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, - // ) - streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, []string{currentTestTable}, true, cfg.ColumnToExclude) - if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { - return fmt.Errorf("failed to enable normalization and partition regex in streams.json (%d): %s\n%s", - code, err, out, - ) - } + if cfg.TestConfig.Driver != string(constants.Kafka) { + for _, wt := range writerTypes { + t.Run(fmt.Sprintf("Iceberg (%s) 2PC Incremental Recovery tests", wt.name), func(t *testing.T) { + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIceberg2PCIncrementalRecovery); err != nil { + t.Fatalf("Iceberg (%s) 2PC Incremental Recovery tests failed: %v", wt.name, err) + } + }) + } + } - t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + t.Logf("%s 2PC sync test-container clean up", cfg.TestConfig.Driver) + return nil + }) + }) +} - writerTypes := []struct { - name string - useArrow bool - }{ - {"Legacy", false}, - {"Arrow", true}, - } +func (cfg *IntegrationTest) TestIntegration(t *testing.T) { + ctx := context.Background() - // Skip cdc tests for drivers not supporting cdc mode - if !slices.Contains(constants.SkipCDCDrivers, constants.DriverType(cfg.TestConfig.Driver)) { - for _, wt := range writerTypes { - t.Run(fmt.Sprintf("Iceberg (%s) Full load + CDC tests", wt.name), func(t *testing.T) { - if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { - t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) - } - }) - } + t.Logf("Root Project directory: %s", cfg.TestConfig.HostRootPath) + t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath) + currentTestTable := utils.Ternary(cfg.TestConfig.DataFormat == "", fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver), fmt.Sprintf("%s_%s_test_table_olake", cfg.TestConfig.Driver, cfg.TestConfig.DataFormat)).(string) - t.Run("Parquet Full load + CDC tests", func(t *testing.T) { - if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, currentTestTable); err != nil { - t.Fatalf("Parquet Full load + CDC tests failed: %v", err) - } - }) - } + t.Run("Discover", func(t *testing.T) { + cfg.runInTestContainer(ctx, t, func(ctx context.Context, c testcontainers.Container) error { + // 1. Install required tools + if code, out, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 { + return fmt.Errorf("install failed (%d): %s\n%s", code, err, out) + } - // Skip incremental tests for drivers not supporting incremental mode - if cfg.TestConfig.Driver != string(constants.Kafka) { - for _, wt := range writerTypes { - t.Run(fmt.Sprintf("Iceberg (%s) Full load + Incremental tests", wt.name), func(t *testing.T) { - if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndIncremental); err != nil { - t.Fatalf("Iceberg (%s) Full load + Incremental tests failed: %v", wt.name, err) - } - }) - } + // 2. Query on test table + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) - t.Run("Parquet Full load + Incremental tests", func(t *testing.T) { - if err := cfg.testParquetFullLoadAndIncremental(ctx, t, c, currentTestTable); err != nil { - t.Fatalf("Parquet Full load + Incremental tests failed: %v", err) - } - }) - } + // 3. Run discover command + discoverCmd := discoverCommand(*cfg.TestConfig) + if code, out, err := utils.ExecCommand(ctx, c, discoverCmd); err != nil || code != 0 { + return fmt.Errorf("discover failed (%d): %s\n%s", code, err, string(out)) + } - // 5. Clean up - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) - t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver) - return nil - }, - }, - }, - }, - Cmd: []string{"tail", "-f", "/dev/null"}, - } + // 4. Verify streams.json file + streamsJSON, err := os.ReadFile(cfg.TestConfig.HostTestCatalogPath) + if err != nil { + return fmt.Errorf("failed to read expected streams JSON: %s", err) + } + testStreamsJSON, err := os.ReadFile(cfg.TestConfig.HostCatalogPath) + if err != nil { + return fmt.Errorf("failed to read actual streams JSON: %s", err) + } + if !utils.NormalizedEqual(string(streamsJSON), string(testStreamsJSON)) { + return fmt.Errorf("streams.json does not match expected test_streams.json\nExpected:\n%s\nGot:\n%s", string(streamsJSON), string(testStreamsJSON)) + } + t.Logf("Generated streams validated with test streams") - container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, + // 5. Clean up + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + t.Logf("%s discover test-container clean up", cfg.TestConfig.Driver) + return nil }) - require.NoError(t, err, "Container startup failed") - defer func() { - if err := container.Terminate(ctx); err != nil { - t.Logf("warning: failed to terminate container: %v", err) + }) + + t.Run("Sync", func(t *testing.T) { + cfg.runInTestContainer(ctx, t, func(ctx context.Context, c testcontainers.Container) error { + // 1. Install required tools + if code, out, err := utils.ExecCommand(ctx, c, installCmd); err != nil || code != 0 { + return fmt.Errorf("install failed (%d): %s\n%s", code, err, out) } - }() + + // 2. Query on test table + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + + // streamUpdateCmd := fmt.Sprintf( + // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, + // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, + // ) + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, []string{currentTestTable}, true, cfg.ColumnToExclude) + if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { + return fmt.Errorf("failed to enable normalization and partition regex in streams.json (%d): %s\n%s", + code, err, out, + ) + } + + t.Logf("Enabled normalization and added partition regex in %s", cfg.TestConfig.CatalogPath) + + writerTypes := []struct { + name string + useArrow bool + }{ + {"Legacy", false}, + {"Arrow", true}, + } + + // Skip cdc tests for drivers not supporting cdc mode + if !slices.Contains(constants.SkipCDCDrivers, constants.DriverType(cfg.TestConfig.Driver)) { + for _, wt := range writerTypes { + t.Run(fmt.Sprintf("Iceberg (%s) Full load + CDC tests", wt.name), func(t *testing.T) { + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { + t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) + } + }) + } + + t.Run("Parquet Full load + CDC tests", func(t *testing.T) { + if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, currentTestTable); err != nil { + t.Fatalf("Parquet Full load + CDC tests failed: %v", err) + } + }) + } + + // Skip incremental tests for drivers not supporting incremental mode + if cfg.TestConfig.Driver != string(constants.Kafka) { + for _, wt := range writerTypes { + t.Run(fmt.Sprintf("Iceberg (%s) Full load + Incremental tests", wt.name), func(t *testing.T) { + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndIncremental); err != nil { + t.Fatalf("Iceberg (%s) Full load + Incremental tests failed: %v", wt.name, err) + } + }) + } + + t.Run("Parquet Full load + Incremental tests", func(t *testing.T) { + if err := cfg.testParquetFullLoadAndIncremental(ctx, t, c, currentTestTable); err != nil { + t.Fatalf("Parquet Full load + Incremental tests failed: %v", err) + } + }) + } + + // 5. Clean up + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver) + return nil + }) }) } @@ -1221,6 +1486,66 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema t.Logf("Verified partition column: %s", expectedCol) } +// VerifyIcebergNoDuplicates asserts that no duplicate _olake_id values exist for the given +// _op_type in the Iceberg table. +func VerifyIcebergNoDuplicates(ctx context.Context, t *testing.T, tableName, icebergDB, opSymbol string, expectedRowCountByOpType int64) { + t.Helper() + + spark, err := sql.NewSessionBuilder().Remote(sparkConnectAddress).Build(ctx) + require.NoError(t, err, "Failed to connect to Spark Connect server for duplicate check") + defer func() { + if stopErr := spark.Stop(); stopErr != nil { + t.Errorf("Failed to stop Spark session: %v", stopErr) + } + }() + + fullTableName := fmt.Sprintf("%s.%s.%s", icebergCatalog, icebergDB, tableName) + + // Refresh to get the latest committed Iceberg snapshot. + refreshQuery := fmt.Sprintf("REFRESH TABLE %s", fullTableName) + if _, refreshErr := spark.Sql(ctx, refreshQuery); refreshErr != nil { + t.Logf("REFRESH TABLE (non-fatal): %v", refreshErr) + } + + countQuery := fmt.Sprintf( + "SELECT COUNT(*) AS total, COUNT(DISTINCT _olake_id) AS distinct_count FROM %s WHERE _op_type = '%s'", + fullTableName, opSymbol, + ) + t.Logf("Executing duplicate-check query: %s", countQuery) + + df, err := spark.Sql(ctx, countQuery) + require.NoError(t, err, "Failed to run duplicate-check COUNT query") + + rows, err := df.Collect(ctx) + require.NoError(t, err, "Failed to collect duplicate-check COUNT results") + require.Len(t, rows, 1, "COUNT query must return exactly one row") + + total, ok := rows[0].Value("total").(int64) + require.True(t, ok, "COUNT(*) value is not int64: %T", rows[0].Value("total")) + + distinct, ok2 := rows[0].Value("distinct_count").(int64) + require.True(t, ok2, "COUNT(DISTINCT) value is not int64: %T", rows[0].Value("distinct_count")) + + // 1. No duplicates: every row must have a unique _olake_id. + require.Equal(t, total, distinct, + "Duplicate rows detected for _op_type='%s': total=%d, distinct=%d. "+ + "Iceberg MERGE INTO did not deduplicate re-synced records.", + opSymbol, total, distinct) + + // 2. Exact count: when caller specifies an expected row count, enforce it so that both + // over-sync (old rows re-processed and inserted again) and under-sync (new rows missed) + // are caught. + if expectedRowCountByOpType > 0 { + require.Equal(t, expectedRowCountByOpType, distinct, + "Row count mismatch for _op_type='%s': expected %d distinct rows, got %d. "+ + "Either old rows were re-synced (over-sync) or new rows were missed (under-sync).", + opSymbol, expectedRowCountByOpType, distinct) + } + + t.Logf("Duplicate check passed for _op_type='%s': %d rows, all unique by _olake_id (expected %d)", + opSymbol, distinct, expectedRowCountByOpType) +} + // VerifyParquetSync verifies that data was correctly synchronized to Parquet files in MinIO func VerifyParquetSync(t *testing.T, tableName, parquetDB string, datatypeSchema map[string]string, defaultCDCColumnsSchema map[string]string, schema map[string]interface{}, opSymbol, driver string, isCDC bool, excludedColumn string) { t.Helper()