From 206de805d723d6aee7d7b24bd38652e751507b9c Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Wed, 25 Mar 2026 16:13:10 +0530 Subject: [PATCH 1/8] chore: RAN ALL Sync --- .github/workflows/integration-tests.yml | 14 +- constants/constants.go | 1 + .../iceberg/arrow-writer/transforms.go | 64 ++++- drivers/kafka/docker-compose.yml | 54 ++++ drivers/kafka/go.mod | 18 +- drivers/kafka/internal/kafka_test.go | 27 ++ drivers/kafka/internal/kafka_test_utils.go | 268 ++++++++++++++++++ .../testdata/iceberg_destination.json | 18 ++ .../testdata/parquet_destination.json | 10 + drivers/kafka/internal/testdata/source.json | 13 + .../kafka/internal/testdata/test_streams.json | 1 + utils/testutils/test_utils.go | 134 +++++++-- 12 files changed, 577 insertions(+), 45 deletions(-) create mode 100644 drivers/kafka/docker-compose.yml create mode 100644 drivers/kafka/internal/kafka_test.go create mode 100644 drivers/kafka/internal/kafka_test_utils.go create mode 100644 drivers/kafka/internal/testdata/iceberg_destination.json create mode 100644 drivers/kafka/internal/testdata/parquet_destination.json create mode 100644 drivers/kafka/internal/testdata/source.json create mode 100644 drivers/kafka/internal/testdata/test_streams.json diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 2c898ca3d..5deb318a6 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -53,6 +53,7 @@ jobs: docker compose -f ./drivers/oracle/docker-compose.yml up -d docker compose -f ./drivers/db2/docker-compose.yml up -d docker compose -f ./drivers/mssql/docker-compose.yml up -d + docker compose -f ./drivers/kafka/docker-compose.yml up -d docker compose -f ./destination/iceberg/local-test/docker-compose.yml up minio mc postgres spark-iceberg -d - name: Wait for MySQL @@ -100,6 +101,15 @@ jobs: command: | docker exec db2-test bash -c "su - db2inst1 -c 'db2 connect to TESTDB'" + - name: Wait for Kafka + uses: 
nick-fields/retry@v2 + with: + timeout_minutes: 5 + max_attempts: 30 + retry_wait_seconds: 5 + command: | + docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list + - name: Wait for MSSQL uses: nick-fields/retry@v2 with: @@ -143,7 +153,7 @@ jobs: - name: Run Integration Tests run: | - go test -v -p 6 ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... -timeout 0 -run 'Integration' + go test -v -p 6 ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... ./drivers/kafka/internal/... -timeout 0 -run 'Integration' - name: Cleanup if: always() @@ -155,4 +165,4 @@ jobs: docker compose -f ./drivers/mongodb/docker-compose.yml down docker compose -f ./drivers/db2/docker-compose.yml down docker compose -f ./drivers/mssql/docker-compose.yml down - + docker compose -f ./drivers/kafka/docker-compose.yml down diff --git a/constants/constants.go b/constants/constants.go index b6f741603..7aead91f8 100644 --- a/constants/constants.go +++ b/constants/constants.go @@ -56,6 +56,7 @@ var ParallelCDCDrivers = []DriverType{MongoDB, MSSQL} var ErrNonRetryable = fmt.Errorf("failed with non retryable error") var ErrGlobalContextGroup = fmt.Errorf("global context group error") var SkipCDCDrivers = []DriverType{Oracle, DB2} +var OnlyStrictCDCDriver = []DriverType{Kafka} // DriversRequiringIncrementalFormatter are drivers that require special formatting for incremental value var DriversRequiringIncrementalFormatter = []DriverType{Oracle, DB2, MSSQL} diff --git a/destination/iceberg/arrow-writer/transforms.go b/destination/iceberg/arrow-writer/transforms.go index 870346f24..25a1db54c 100644 --- a/destination/iceberg/arrow-writer/transforms.go +++ b/destination/iceberg/arrow-writer/transforms.go @@ -12,7 +12,7 @@ import ( "time" 
"github.com/datazip-inc/olake/destination/iceberg/proto" - + "github.com/datazip-inc/olake/utils/typeutils" "github.com/twmb/murmur3" ) @@ -64,25 +64,44 @@ func hashString(s string) uint32 { func identityTransform(val any, colType string) (pathStr string, typedVal any, err error) { switch colType { case "boolean": - b := val.(bool) + b, err := typeutils.ReformatBool(val) + if err != nil { + return "", nil, err + } return strconv.FormatBool(b), b, nil case "int": - v := val.(int32) + v, err := typeutils.ReformatInt32(val) + if err != nil { + return "", nil, err + } return fmt.Sprintf("%d", v), v, nil case "long": - v := val.(int64) + v, err := typeutils.ReformatInt64(val) + if err != nil { + return "", nil, err + } return fmt.Sprintf("%d", v), v, nil case "float": - v := val.(float32) + v, err := typeutils.ReformatFloat32(val) + if err != nil { + return "", nil, err + } return fmt.Sprintf("%g", v), v, nil case "double": - v := val.(float64) + v, err := typeutils.ReformatFloat64(val) + if err != nil { + return "", nil, err + } return fmt.Sprintf("%g", v), v, nil case "string": s := val.(string) return s, s, nil case "timestamptz": - t := val.(time.Time).UTC() + t, err := typeutils.ReformatDate(val, false) + if err != nil { + return "", nil, err + } + t = t.UTC() if t.IsZero() { return NULL, nil, nil } @@ -99,7 +118,10 @@ func timeTransform(val any, unit string, colType string) (pathStr string, typedV return "", nil, fmt.Errorf("unsupported time transform %q", unit) } - v, _ := val.(time.Time) + v, err := typeutils.ReformatDate(val, false) + if err != nil { + return "", nil, err + } v = v.UTC() if v.IsZero() { return NULL, nil, nil @@ -133,15 +155,21 @@ func bucketTransform(val any, num int, colType string) (pathStr string, typedVal var h uint32 switch colType { case "int": - v, _ := val.(int32) + v, err := typeutils.ReformatInt32(val) + if err != nil { + return "", nil, err + } h = hashInt(v) case "long": - v, _ := val.(int64) + v, err := typeutils.ReformatInt64(val) 
+ if err != nil { + return "", nil, err + } h = hashInt(v) case "timestamptz": - tm, ok := val.(time.Time) - if !ok { - return "", nil, fmt.Errorf("expected time.Time for colType %q, got %T", colType, val) + tm, err := typeutils.ReformatDate(val, false) + if err != nil { + return "", nil, err } if tm.IsZero() { return NULL, nil, nil @@ -169,7 +197,10 @@ func truncateTransform(val any, n int, colType string) (pathStr string, typedVal switch colType { case "int": - v, _ := val.(int32) + v, err := typeutils.ReformatInt32(val) + if err != nil { + return "", nil, err + } if n > math.MaxInt32 { return "", nil, fmt.Errorf("truncate width %d exceeds int32 range", n) } @@ -177,7 +208,10 @@ func truncateTransform(val any, n int, colType string) (pathStr string, typedVal trunc := v - (((v % n32) + n32) % n32) return fmt.Sprintf("%d", trunc), trunc, nil case "long": - v, _ := val.(int64) + v, err := typeutils.ReformatInt64(val) + if err != nil { + return "", nil, err + } n64 := int64(n) // Using Iceberg's formula for proper negative number handling trunc := v - (((v % n64) + n64) % n64) diff --git a/drivers/kafka/docker-compose.yml b/drivers/kafka/docker-compose.yml new file mode 100644 index 000000000..c0f859b98 --- /dev/null +++ b/drivers/kafka/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.8' + +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.5.0 + container_name: zookeeper + restart: unless-stopped + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + kafka: + image: confluentinc/cp-kafka:7.5.0 + container_name: kafka + restart: unless-stopped + depends_on: + - zookeeper + ports: + - "29092:29092" + - "39092:39092" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL_HOST://0.0.0.0:29092,EXTERNAL_CONT://0.0.0.0:39092 + # Advertise IPv4 endpoints to avoid `localhost` resolving to `::1` (IPv6) + # and breaking Kafka clients during local integration tests. 
+ KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_HOST://127.0.0.1:29092,EXTERNAL_CONT://host.docker.internal:39092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT,EXTERNAL_CONT:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + healthcheck: + test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 >/dev/null 2>&1 || exit 1"] + interval: 5s + timeout: 10s + retries: 30 + start_period: 20s + volumes: + - ./data/kafka-data:/var/lib/kafka/data + + schema-registry: + image: confluentinc/cp-schema-registry:latest + container_name: schema-registry + platform: linux/amd64 + depends_on: + kafka: + condition: service_healthy + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 \ No newline at end of file diff --git a/drivers/kafka/go.mod b/drivers/kafka/go.mod index 158077d91..e7be93c4a 100644 --- a/drivers/kafka/go.mod +++ b/drivers/kafka/go.mod @@ -7,17 +7,20 @@ replace github.com/datazip-inc/olake => ../../ replace google.golang.org/genproto => google.golang.org/genproto v0.0.0-20240123012728-ef4313101c80 require ( + github.com/apache/arrow-go/v18 v18.2.0 github.com/datazip-inc/olake v0.0.0-00010101000000-000000000000 github.com/linkedin/goavro/v2 v2.15.0 github.com/segmentio/kafka-go v0.4.49 + github.com/stretchr/testify v1.11.1 ) require ( + cloud.google.com/go/compute/metadata v0.9.0 // indirect dario.cat/mergo v1.0.1 // indirect github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/andybalholm/brotli v1.1.1 // indirect - github.com/apache/arrow-go/v18 v18.2.0 // indirect + github.com/apache/spark-connect-go/v35 
v35.0.0-20250317154112-ffd832059443 // indirect github.com/apache/thrift v0.21.0 // indirect github.com/aws/aws-sdk-go v1.55.6 // indirect github.com/aws/aws-sdk-go-v2 v1.39.2 // indirect @@ -46,10 +49,12 @@ require ( github.com/docker/docker v28.3.3+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.2 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect github.com/gabriel-vasile/mimetype v1.4.8 // indirect + github.com/go-errors/errors v1.5.1 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.2.6 // indirect @@ -66,10 +71,12 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.10 // indirect github.com/mattn/go-colorable v0.1.14 // indirect @@ -77,6 +84,9 @@ require ( github.com/mattn/go-runewidth v0.0.16 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect + github.com/minio/md5-simd v1.1.2 // indirect + github.com/minio/minio-go/v7 v7.0.34 // indirect + github.com/minio/sha256-simd v1.0.0 // indirect github.com/mitchellh/hashstructure v1.1.0 // indirect github.com/moby/docker-image-spec v1.3.1 // indirect github.com/moby/go-archive v0.1.0 // indirect @@ -85,6 +95,8 @@ 
require ( github.com/moby/sys/user v0.4.0 // indirect github.com/moby/sys/userns v0.1.0 // indirect github.com/moby/term v0.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/olekukonko/tablewriter v0.0.5 // indirect @@ -98,6 +110,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/rs/xid v1.6.0 // indirect github.com/rs/zerolog v1.34.0 // indirect github.com/sagikazarmark/locafero v0.8.0 // indirect github.com/shirou/gopsutil/v4 v4.25.1 // indirect @@ -108,7 +121,6 @@ require ( github.com/spf13/cobra v1.9.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/spf13/viper v1.20.1 // indirect - github.com/stretchr/testify v1.11.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/testcontainers/testcontainers-go v0.37.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect @@ -132,6 +144,7 @@ require ( golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect golang.org/x/mod v0.30.0 // indirect golang.org/x/net v0.48.0 // indirect + golang.org/x/oauth2 v0.34.0 // indirect golang.org/x/sync v0.19.0 // indirect golang.org/x/sys v0.39.0 // indirect golang.org/x/telemetry v0.0.0-20251111182119-bc8e575c7b54 // indirect @@ -141,6 +154,7 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20251202230838-ff82c1b0f217 // indirect google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.10 // indirect + gopkg.in/ini.v1 v1.66.6 // indirect gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/drivers/kafka/internal/kafka_test.go b/drivers/kafka/internal/kafka_test.go new file mode 100644 index 
000000000..e23fc9dfc --- /dev/null +++ b/drivers/kafka/internal/kafka_test.go @@ -0,0 +1,27 @@ +package driver + +import ( + "testing" + + "github.com/datazip-inc/olake/constants" + "github.com/datazip-inc/olake/utils/testutils" +) + +func TestKafkaIntegration(t *testing.T) { + t.Parallel() + testConfig := &testutils.IntegrationTest{ + TestConfig: testutils.GetTestConfig(string(constants.Kafka)), + Namespace: "topics", + ExpectedData: ExpectedKafkaData, + ExpectedUpdatedData: ExpectedKafkaUpdatedData, + DestinationDataTypeSchema: KafkaToDestinationSchema, + UpdatedDestinationDataTypeSchema: EvolvedKafkaToDestinationSchema, + DefaultCDCColumnsSchema: ExpectedKafkaDefaultCDCColumnsSchema, + ExecuteQuery: ExecuteQuery, + DestinationDB: "kafka_topics", + CursorField: "int_value:bigint", + PartitionRegex: "/{int_value,identity}", + FilterConfig: "", + } + testConfig.TestIntegration(t) +} diff --git a/drivers/kafka/internal/kafka_test_utils.go b/drivers/kafka/internal/kafka_test_utils.go new file mode 100644 index 000000000..b1e4feca5 --- /dev/null +++ b/drivers/kafka/internal/kafka_test_utils.go @@ -0,0 +1,268 @@ +package driver + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/datazip-inc/olake/utils" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/require" +) + +// ExecuteQuery executes Kafka queries for testing based on the operation type +func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) { + t.Helper() + + var brokers []string + if fileConfig { + var config Config + require.NoError(t, utils.UnmarshalFile("./testdata/source.json", &config, false), "failed to unmarshal kafka test source config") + brokers = strings.Split(config.BootstrapServers, ",") + } else { + brokers = []string{"127.0.0.1:29092"} + } + + for i, b := range brokers { + brokers[i] = strings.TrimSpace(strings.ReplaceAll(b, "host.docker.internal", "127.0.0.1")) + } + 
+ activeBroker := waitForAnyKafkaBroker(ctx, t, brokers) + + // Message key and value + key := []byte("test-key") + value := []byte(`{"int_value": 100,"float_value": 99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string"}`) + evolved_value := []byte(`{"int_value": 100,"float_value": 99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string", "id_int": 101}`) + switch operation { + case "create", "clean", "drop": + // 1. Dial a reachable broker for admin operations + conn, err := kafka.DialContext(ctx, "tcp", activeBroker) + require.NoError(t, err, "failed to dial kafka for topic creation") + defer conn.Close() + + // 2. Loop over provided streams + for i := 0; i < len(streams); i++ { + + // 3. If it's a "clean" or "drop" operation, delete first + if operation == "clean" || operation == "drop" { + _ = conn.DeleteTopics(streams[i]) + time.Sleep(5 * time.Second) + if operation == "drop" { + continue + } + } + partitionNumber := utils.Ternary(i == 0, 1, 5).(int) + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: streams[i], + NumPartitions: partitionNumber, + ReplicationFactor: 1, + }) + // 4. Ignore the error if the topic already exists + if err != nil && err != kafka.TopicAlreadyExists { + require.NoError(t, err, "failed to create topic '%s' explicitly", streams[i]) + } + t.Logf("Topic '%s' is ready for writes (%d partitions)", streams[i], partitionNumber) + } + return + case "add", "insert": + // NEW: Initialize the writer only when needed + writer := &kafka.Writer{ + Addr: kafka.TCP(brokers...), + Balancer: &kafka.LeastBytes{}, + AllowAutoTopicCreation: false, // Much safer!
+ MaxAttempts: 10, + } + defer writer.Close() + + for _, s := range streams { + if strings.HasSuffix(s, "_2") { + // Topic _2: Round-robin 3 messages across 3 partitions (0, 3, 4) + for _, p := range []int{0, 3, 4} { + addDataToPartition(ctx, t, writer, s, p, key, value) + } + t.Logf("Added 3 messages to topic '%s' across partitions 0, 3, 4", s) + } else if strings.HasSuffix(s, "_3") { + // Topic _3: Fill all 5 partitions (0-4) + for p := 0; p < 5; p++ { + addDataToPartition(ctx, t, writer, s, p, key, value) + } + t.Logf("Added 5 messages to topic '%s' (one per partition)", s) + } else { + // Topic _1 OR base topic: 5 messages in its single partition (0) + for i := 0; i < 5; i++ { + addDataToPartition(ctx, t, writer, s, 0, key, value) + } + t.Logf("Added 5 messages to topic '%s' in partition 0", s) + } + } + return + case "evolve-schema": + // NEW: Initialize the writer only when needed + writer := &kafka.Writer{ + Addr: kafka.TCP(brokers...), + Balancer: &kafka.LeastBytes{}, + AllowAutoTopicCreation: false, + MaxAttempts: 10, + } + defer writer.Close() + + for _, s := range streams { + if strings.HasSuffix(s, "_2") { + // Topic _2: Round-robin 3 messages across 3 partitions (0, 3, 4) + for _, p := range []int{0, 3, 4} { + addDataToPartition(ctx, t, writer, s, p, key, evolved_value) + } + t.Logf("Added 3 messages to topic '%s' across partitions 0, 3, 4", s) + } else if strings.HasSuffix(s, "_3") { + // Topic _3: Fill all 5 partitions (0-4) + for p := 0; p < 5; p++ { + addDataToPartition(ctx, t, writer, s, p, key, evolved_value) + } + t.Logf("Added 5 messages to topic '%s' (one per partition)", s) + } else { + // Topic _1 OR base topic: 5 messages in its single partition (0) + for i := 0; i < 5; i++ { + addDataToPartition(ctx, t, writer, s, 0, key, evolved_value) + } + t.Logf("Added 5 messages to topic '%s' in partition 0", s) + } + } + return + case "Avro-insert": + + default: + t.Fatalf("unsupported operation: %s", operation) + } +} + +func addDataToPartition(ctx 
context.Context, t *testing.T, writer *kafka.Writer, topic string, partition int, key, value []byte) { + t.Helper() + originalBalancer := writer.Balancer + writer.Balancer = nil + writer.Topic = topic + defer func() { writer.Balancer = originalBalancer }() + writeMessagesWithRetry(ctx, t, writer, kafka.Message{ + Key: key, + Value: value, + Partition: partition, + }) + t.Logf("Added message to topic '%s', partition %d", topic, partition) +} + +func waitForAnyKafkaBroker(ctx context.Context, t *testing.T, brokers []string) string { + t.Helper() + + var lastErr error + + for { + for _, b := range brokers { + conn, err := kafka.DialContext(ctx, "tcp", b) + if err != nil { + lastErr = err + continue + } + + // 🔥 Real readiness check: metadata must be available + _, err = conn.ReadPartitions() + if err != nil { + lastErr = err + t.Logf("Waiting for Kafka broker %s to initialize metadata... (%v)", b, err) + _ = conn.Close() // IMPORTANT: avoid connection leaks + continue + } + + _ = conn.Close() + + t.Logf("Kafka broker %s is fully ready (metadata available)", b) + + // Optional: small buffer for stability in slower environments + time.Sleep(1 * time.Second) + + return b + } + + // Respect test context timeout + if err := ctx.Err(); err != nil { + t.Fatalf("kafka brokers not ready (tried: %v): last error: %v", brokers, lastErr) + } + + // Slightly relaxed retry interval + time.Sleep(1 * time.Second) + } +} + +func writeMessagesWithRetry(ctx context.Context, t *testing.T, writer *kafka.Writer, msg kafka.Message) { + t.Helper() + + var lastErr error + var attempts int + nextLog := time.Now() + for { + lastErr = writer.WriteMessages(ctx, msg) + if lastErr == nil { + return + } + attempts++ + if err := ctx.Err(); err != nil { + require.NoError(t, lastErr, "failed to write seed kafka message after %d attempts (topic=%q partition=%d)", attempts, writer.Topic, msg.Partition) + return + } + // Without this, a bad broker/topic state spins silently until the test timeout (e.g. 
30m). + if time.Now().After(nextLog) { + t.Logf("kafka seed write retry: attempt=%d topic=%q partition=%d err=%v", attempts, writer.Topic, msg.Partition, lastErr) + nextLog = time.Now().Add(5 * time.Second) + } + time.Sleep(200 * time.Millisecond) + } +} + +var ExpectedKafkaData = map[string]interface{}{ + "int_value": int64(100), + "float_value": float64(99.99), + "boolean_true": true, + "boolean_false": false, + "timestamp_value": arrow.Timestamp(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), + "string_value": "test_string", +} + +var ExpectedKafkaUpdatedData = map[string]interface{}{ + "int_value": int64(100), + "float_value": float64(99.99), + "boolean_true": true, + "boolean_false": false, + "timestamp_value": arrow.Timestamp(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), + "string_value": "test_string", + "id_int": int64(101), +} + +var KafkaToDestinationSchema = map[string]string{ + "int_value": "bigint", + "float_value": "double", + "boolean_true": "boolean", + "boolean_false": "boolean", + "timestamp_value": "timestamp", + "string_value": "string", +} + +var EvolvedKafkaToDestinationSchema = map[string]string{ + "int_value": "bigint", + "float_value": "double", + "boolean_true": "boolean", + "boolean_false": "boolean", + "timestamp_value": "timestamp", + "string_value": "string", + "id_int": "bigint", +} + +var ExpectedKafkaDefaultCDCColumnsSchema = map[string]string{ + "_kafka_key": "string", + "_kafka_offset": "bigint", + "_kafka_partition": "int", + "_kafka_timestamp": "timestamp", + "_op_type": "string", + "_cdc_timestamp": "timestamp", + "_olake_id": "string", + "_olake_timestamp": "timestamp", +} diff --git a/drivers/kafka/internal/testdata/iceberg_destination.json b/drivers/kafka/internal/testdata/iceberg_destination.json new file mode 100644 index 000000000..8582710c2 --- /dev/null +++ b/drivers/kafka/internal/testdata/iceberg_destination.json @@ -0,0 +1,18 @@ +{ + 
"type": "ICEBERG", + "writer": { + "catalog_type": "jdbc", + "jdbc_url": "jdbc:postgresql://host.docker.internal:5432/iceberg", + "jdbc_username": "iceberg", + "jdbc_password": "password", + "iceberg_s3_path": "s3a://warehouse", + "s3_endpoint": "http://host.docker.internal:9000", + "s3_use_ssl": false, + "s3_path_style": true, + "aws_access_key": "admin", + "aws_secret_key": "password", + "iceberg_db": "olake_iceberg", + "aws_region": "us-east-1", + "arrow_writes": true + } +} diff --git a/drivers/kafka/internal/testdata/parquet_destination.json b/drivers/kafka/internal/testdata/parquet_destination.json new file mode 100644 index 000000000..b37200aeb --- /dev/null +++ b/drivers/kafka/internal/testdata/parquet_destination.json @@ -0,0 +1,10 @@ +{ + "type": "PARQUET", + "writer": { + "s3_bucket": "warehouse", + "s3_region": "us-east-1", + "s3_access_key": "admin", + "s3_secret_key": "password", + "s3_endpoint": "http://host.docker.internal:9000" + } +} \ No newline at end of file diff --git a/drivers/kafka/internal/testdata/source.json b/drivers/kafka/internal/testdata/source.json new file mode 100644 index 000000000..56128ecb9 --- /dev/null +++ b/drivers/kafka/internal/testdata/source.json @@ -0,0 +1,13 @@ +{ + "bootstrap_servers": "host.docker.internal:39092", + "protocol": { + "security_protocol": "PLAINTEXT" + }, + "consumer_group_id": "kafka-integration-test-group", + "max_threads": 1, + "threads_equal_total_partitions": false, + "backoff_retry_count": 3, + "schema_registry": { + "endpoint": "http://127.0.0.1:8081" + } +} \ No newline at end of file diff --git a/drivers/kafka/internal/testdata/test_streams.json b/drivers/kafka/internal/testdata/test_streams.json new file mode 100644 index 000000000..4618e062b --- /dev/null +++ b/drivers/kafka/internal/testdata/test_streams.json @@ -0,0 +1 @@ 
+{"selected_streams":{"topics":[{"partition_regex":"","stream_name":"kafka_test_table_olake_3","append_mode":true,"normalization":false,"selected_columns":{"columns":["_kafka_partition","boolean_true","_olake_id","int_value","_kafka_offset","_kafka_key","string_value","_op_type","boolean_false","_olake_timestamp","float_value","timestamp_value","_kafka_timestamp"],"sync_new_columns":true}},{"partition_regex":"","stream_name":"kafka_test_table_olake_2","append_mode":true,"normalization":false,"selected_columns":{"columns":["float_value","_kafka_partition","_kafka_timestamp","boolean_true","string_value","_olake_id","timestamp_value","_kafka_key","_kafka_offset","boolean_false","_op_type","_olake_timestamp","int_value"],"sync_new_columns":true}},{"partition_regex":"","stream_name":"kafka_test_table_olake_1","append_mode":true,"normalization":false,"selected_columns":{"columns":["string_value","float_value","int_value","_kafka_timestamp","_kafka_offset","timestamp_value","_kafka_partition","_op_type","_olake_timestamp","boolean_false","boolean_true","_kafka_key","_olake_id"],"sync_new_columns":true}}]},"streams":[{"stream":{"name":"kafka_test_table_olake_3","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_col
umn_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode":"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_3","default_stream_properties":{"normalization":false,"append_mode":true}}},{"stream":{"name":"kafka_test_table_olake_2","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_column_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode"
:"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_2","default_stream_properties":{"normalization":false,"append_mode":true}}},{"stream":{"name":"kafka_test_table_olake_1","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_column_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode":"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_1","default_stream_properties":{"normalization":false,"append_mode":true}}}]} \ No newline at end of file diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 6a8b61fe4..9ded4e3cf 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -394,9 +394,9 @@ func (cfg *IntegrationTest) runSyncAndVerify( t.Logf("Sync successful for 
%s driver", cfg.TestConfig.Driver) - // Use evolved schema only for CDC "update" operation (where schema evolution is expected) + // Use evolved schema only for CDC "update" operation or for kafka when schema evolution is expected // Incremental "insert" uses opSymbol "u" but doesn't have schema evolution - evolvedSchema := operation == "update" + evolvedSchema := operation == "update" || operation == "evolve-schema" switch destinationType { case "iceberg": @@ -481,6 +481,39 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( }, } + kafkaTestCases := []syncTestCase{ + { + name: "CDC - strict - insert", + operation: "", + useState: false, + opSymbol: "c", + expected: cfg.ExpectedData, + }, + { + name: "CDC - strict - evolve-schema", + operation: "evolve-schema", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedUpdatedData, + }, + { + name: "CDC - strict - Avro - insert", + operation: "Avro-insert", + useState: true, + opSymbol: "u", + expected: cfg.ExpectedUpdatedData, + }, + { + name: "CDC - strict - Avro - evolve-schema", + operation: "Avro-evolve-schema", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedUpdatedData, + }, + } + + testCases = utils.Ternary(slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(cfg.TestConfig.Driver)), kafkaTestCases, testCases).([]syncTestCase) + // Run each test case for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -561,6 +594,38 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( }, } + kafkaTestCases := []syncTestCase{ + { + name: "CDC - strict - insert", + operation: "", + useState: false, + opSymbol: "c", + expected: cfg.ExpectedData, + }, + { + name: "CDC - strict - evolve-schema", + operation: "evolve-schema", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedUpdatedData, + }, + { + name: "CDC - strict - Avro - insert", + operation: "Avro-insert", + useState: true, + opSymbol: "u", + expected: cfg.ExpectedUpdatedData, + }, + { + name: "CDC - strict - 
Avro - evolve-schema", + operation: "Avro-evolve-schema", + useState: true, + opSymbol: "c", + expected: cfg.ExpectedUpdatedData, + }, + } + + testCases = utils.Ternary(slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(cfg.TestConfig.Driver)), kafkaTestCases, testCases).([]syncTestCase) // Run each test case for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -780,6 +845,12 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath) currentTestTable := fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver) + syncTopics := []string{currentTestTable} + + if cfg.TestConfig.Driver == string(constants.Kafka) { + syncTopics = []string{currentTestTable + "_1", currentTestTable + "_2", currentTestTable + "_3"} + } + t.Run("Discover", func(t *testing.T) { req := testcontainers.ContainerRequest{ Image: "golang:1.25.8-bookworm", @@ -807,9 +878,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // 2. Query on test table - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "create", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "clean", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "add", false) // 3. Run discover command discoverCmd := discoverCommand(*cfg.TestConfig) @@ -832,7 +903,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { t.Logf("Generated streams validated with test streams") // 5. Clean up - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "drop", false) t.Logf("%s discover test-container clean up", cfg.TestConfig.Driver) return nil }, @@ -881,15 +952,15 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // 2. 
Query on test table - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) - cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "create", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "clean", false) + cfg.ExecuteQuery(ctx, t, syncTopics, "add", false) // streamUpdateCmd := fmt.Sprintf( // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, // ) - streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, []string{currentTestTable}, true) + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, syncTopics, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization and partition regex in streams.json (%d): %s\n%s", code, err, out, @@ -909,33 +980,39 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { if !slices.Contains(constants.SkipCDCDrivers, constants.DriverType(cfg.TestConfig.Driver)) { for _, wt := range writerTypes { t.Run(fmt.Sprintf("Iceberg (%s) Full load + CDC tests", wt.name), func(t *testing.T) { - if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { - t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) + for _, topic := range syncTopics { + if err := cfg.testIcebergWriter(ctx, t, c, topic, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { + t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) + } } }) } t.Run("Parquet Full load + CDC tests", func(t *testing.T) { - if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, currentTestTable); err != nil { - t.Fatalf("Parquet Full 
load + CDC tests failed: %v", err) + for _, topic := range syncTopics { + if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, topic); err != nil { + t.Fatalf("Parquet Full load + CDC tests failed: %v", err) + } } }) } - for _, wt := range writerTypes { - t.Run(fmt.Sprintf("Iceberg (%s) Full load + Incremental tests", wt.name), func(t *testing.T) { - if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndIncremental); err != nil { - t.Fatalf("Iceberg (%s) Full load + Incremental tests failed: %v", wt.name, err) + if !slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(cfg.TestConfig.Driver)) { + for _, wt := range writerTypes { + t.Run(fmt.Sprintf("Iceberg (%s) Full load + Incremental tests", wt.name), func(t *testing.T) { + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndIncremental); err != nil { + t.Fatalf("Iceberg (%s) Full load + Incremental tests failed: %v", wt.name, err) + } + }) + } + + t.Run("Parquet Full load + Incremental tests", func(t *testing.T) { + if err := cfg.testParquetFullLoadAndIncremental(ctx, t, c, currentTestTable); err != nil { + t.Fatalf("Parquet Full load + Incremental tests failed: %v", err) } }) } - t.Run("Parquet Full load + Incremental tests", func(t *testing.T) { - if err := cfg.testParquetFullLoadAndIncremental(ctx, t, c, currentTestTable); err != nil { - t.Fatalf("Parquet Full load + Incremental tests failed: %v", err) - } - }) - // 5. 
Clean up cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) t.Logf("%s sync test-container clean up", cfg.TestConfig.Driver) @@ -1005,6 +1082,11 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema "SELECT * FROM %s WHERE _op_type = '%s'", fullTableName, opSymbol, ) + if slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(driver)) { + if _, ok := schema["id_int"]; ok { + selectQuery += " AND id_int IS NOT NULL" + } + } t.Logf("Executing query: %s", selectQuery) var selectRows []types.Row @@ -1079,7 +1161,7 @@ func VerifyIcebergSync(t *testing.T, tableName, icebergDB string, datatypeSchema for key := range defaultCDCColumnsSchema { icebergValue, ok := icebergMap[key] require.Truef(t, ok, "Row %d: missing column %q in Iceberg result", rowIdx, key) - require.NotEmpty(t, icebergValue, "Row %d: expected column %q to be non-empty, got %#v", rowIdx, key, icebergValue) + require.NotNil(t, icebergValue, "Row %d: expected column %q to be non-empty, got %#v", rowIdx, key, icebergValue) if key == constants.CdcTimestamp { ts, ok := normalizeToTime(icebergValue) require.Truef(t, ok, "Row %d: expected %q to be a timestamp, got %T (%#v)", rowIdx, key, icebergValue, icebergValue) @@ -1254,7 +1336,7 @@ func VerifyParquetSync(t *testing.T, tableName, parquetDB string, datatypeSchema for key := range defaultCDCColumnsSchema { val, ok := parquetMap[key] require.Truef(t, ok, "Row %d: missing column %q in Parquet result", rowIdx, key) - require.NotEmpty(t, val, "Row %d: expected column %q to be non-empty, got %#v", rowIdx, key, val) + require.NotNil(t, val, "Row %d: expected column %q to be non-empty, got %#v", rowIdx, key, val) if key == constants.CdcTimestamp { ts, ok := normalizeToTime(val) require.Truef(t, ok, "Row %d: expected %q to be a timestamp, got %T (%#v)", rowIdx, key, val, val) From 4ff6ef78ad6ec66756eba498e643598265e96bd4 Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Thu, 26 Mar 2026 18:15:52 +0530 
Subject: [PATCH 2/8] chore: Solved Filter issue --- drivers/kafka/internal/kafka_test.go | 20 +- drivers/kafka/internal/kafka_test_utils.go | 278 +++++++++++++----- .../testdata/iceberg_destination.json | 2 +- drivers/kafka/internal/testdata/source.json | 6 +- .../kafka/internal/testdata/test_streams.json | 2 +- utils/testutils/test_utils.go | 61 ++-- utils/typeutils/compare.go | 10 + 7 files changed, 271 insertions(+), 108 deletions(-) diff --git a/drivers/kafka/internal/kafka_test.go b/drivers/kafka/internal/kafka_test.go index e23fc9dfc..31c13fc4d 100644 --- a/drivers/kafka/internal/kafka_test.go +++ b/drivers/kafka/internal/kafka_test.go @@ -21,7 +21,25 @@ func TestKafkaIntegration(t *testing.T) { DestinationDB: "kafka_topics", CursorField: "int_value:bigint", PartitionRegex: "/{int_value,identity}", - FilterConfig: "", + ExtraExpectedData: map[string]map[string]interface{}{ + "Avro-insert": ExpectedKafkaAvroData, + "Avro-evolve-schema": ExpectedKafkaAvroUpdatedData, + }, + FilterConfig: `{ + "logical_operator": "And", + "conditions": [ + { + "column": "int_value", + "operator": ">=", + "value": 100 + }, + { + "column": "float_value", + "operator": "<", + "value": 100.00 + } + ] + }`, } testConfig.TestIntegration(t) } diff --git a/drivers/kafka/internal/kafka_test_utils.go b/drivers/kafka/internal/kafka_test_utils.go index b1e4feca5..62b6d1718 100644 --- a/drivers/kafka/internal/kafka_test_utils.go +++ b/drivers/kafka/internal/kafka_test_utils.go @@ -1,13 +1,19 @@ package driver import ( + "bytes" "context" + "encoding/binary" + "encoding/json" + "fmt" + "net/http" "strings" "testing" "time" "github.com/apache/arrow-go/v18/arrow" "github.com/datazip-inc/olake/utils" + "github.com/linkedin/goavro/v2" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" ) @@ -35,37 +41,33 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation key := []byte("test-key") value := []byte(`{"int_value": 100,"float_value": 
99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string"}`) evolved_value := []byte(`{"int_value": 100,"float_value": 99.99,"boolean_true": true,"boolean_false": false,"timestamp_value": "2026-03-22T14:30:00Z","string_value": "test_string", "id_int": 101}`) + filtervalue1 := []byte(`{"int_value": 99,"float_value": 99.99}`) + filtervalue2 := []byte(`{"int_value": 100,"float_value": 100.00}`) switch operation { case "create", "clean", "drop": // 1. Dial a reachable broker for admin operations conn, err := kafka.DialContext(ctx, "tcp", activeBroker) require.NoError(t, err, "failed to dial kafka for topic creation") defer conn.Close() - - // 2. Loop over provided streams - for i := 0; i < len(streams); i++ { - - // 3. If it's a "clean" or "drop" operation, delete first - if operation == "clean" || operation == "drop" { - _ = conn.DeleteTopics(streams[i]) - time.Sleep(5 * time.Second) - if operation == "drop" { - continue - } + // 3. If it's a "clean" or "drop" operation, delete first + if operation == "clean" || operation == "drop" { + _ = conn.DeleteTopics(streams[0]) + time.Sleep(5 * time.Second) + if operation == "drop" { + return } - partitionNumber := utils.Ternary(i == 0, 1, 5).(int) - err = conn.CreateTopics(kafka.TopicConfig{ - Topic: streams[i], - NumPartitions: partitionNumber, - ReplicationFactor: 1, - }) - // 3. Ignore if already exists - if err != nil && err != kafka.TopicAlreadyExists { - require.NoError(t, err, "failed to create topic '%s' explicitly", streams[i]) - } - t.Logf("Topic '%s' is ready for writes (%d partitions)", streams[i], partitionNumber) } - return + partitionNumber := 5 + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: streams[0], + NumPartitions: partitionNumber, + ReplicationFactor: 1, + }) + // 3. 
Ignore if already exists + if err != nil && err != kafka.TopicAlreadyExists { + require.NoError(t, err, "failed to create topic '%s' explicitly", streams[0]) + } + t.Logf("Topic '%s' is ready for writes (%d partitions)", streams[0], partitionNumber) case "add", "insert": // NEW: Initialize the writer only when needed writer := &kafka.Writer{ @@ -75,29 +77,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation MaxAttempts: 10, } defer writer.Close() - - for _, s := range streams { - if strings.HasSuffix(s, "_2") { - // Topic _2: Round-robin 3 messages across 3 partitions (0, 3, 4) - for _, p := range []int{0, 3, 4} { - addDataToPartition(ctx, t, writer, s, p, key, value) - } - t.Logf("Added 3 messages to topic '%s' across partitions 0, 3, 4", s) - } else if strings.HasSuffix(s, "_3") { - // Topic _3: Fill all 5 partitions (0-4) - for p := 0; p < 5; p++ { - addDataToPartition(ctx, t, writer, s, p, key, value) - } - t.Logf("Added 5 messages to topic '%s' (one per partition)", s) - } else { - // Topic _1 OR base topic: 5 messages in its single partition (0) - for i := 0; i < 5; i++ { - addDataToPartition(ctx, t, writer, s, 0, key, value) - } - t.Logf("Added 5 messages to topic '%s' in partition 0", s) - } + for p := 0; p < 5; p++ { + addDataToPartition(ctx, t, writer, streams[0], p, key, value) } - return + addDataToPartition(ctx, t, writer, streams[0], 0, key, filtervalue1) + addDataToPartition(ctx, t, writer, streams[0], 1, key, filtervalue2) + t.Logf("Added 5 messages to topic '%s' (one per partition)", streams[0]) case "evolve-schema": // NEW: Initialize the writer only when needed writer := &kafka.Writer{ @@ -107,31 +92,94 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation MaxAttempts: 10, } defer writer.Close() + for p := 0; p < 5; p++ { + addDataToPartition(ctx, t, writer, streams[0], p, key, evolved_value) + } + t.Logf("Added 5 messages to topic '%s' (one per partition)", streams[0]) + case 
"Avro-insert", "Avro-evolve-schema": + var config Config + utils.UnmarshalFile("./testdata/source.json", &config, false) + + registryURL := config.SchemaRegistry.Endpoint + registryURL = strings.ReplaceAll(registryURL, "host.docker.internal", "127.0.0.1") + + writer := &kafka.Writer{ + Addr: kafka.TCP(brokers...), + Balancer: &kafka.LeastBytes{}, + MaxAttempts: 10, + RequiredAcks: kafka.RequireAll, + } + defer writer.Close() - for _, s := range streams { - if strings.HasSuffix(s, "_2") { - // Topic _2: Round-robin 3 messages across 3 partitions (0, 3, 4) - for _, p := range []int{0, 3, 4} { - addDataToPartition(ctx, t, writer, s, p, key, evolved_value) - } - t.Logf("Added 3 messages to topic '%s' across partitions 0, 3, 4", s) - } else if strings.HasSuffix(s, "_3") { - // Topic _3: Fill all 5 partitions (0-4) - for p := 0; p < 5; p++ { - addDataToPartition(ctx, t, writer, s, p, key, evolved_value) - } - t.Logf("Added 5 messages to topic '%s' (one per partition)", s) - } else { - // Topic _1 OR base topic: 5 messages in its single partition (0) - for i := 0; i < 5; i++ { - addDataToPartition(ctx, t, writer, s, 0, key, evolved_value) - } - t.Logf("Added 5 messages to topic '%s' in partition 0", s) - } + // Base schema + schemaV1 := `{ + "type":"record", + "name":"test", + "fields":[ + {"name":"int32_value","type":"int"}, + {"name":"int64_value","type":"long"}, + {"name":"float32_value","type":"float"}, + {"name":"float64_value","type":"double"}, + {"name":"boolean_true","type":"boolean"}, + {"name":"boolean_false","type":"boolean"}, + {"name":"timestamp_value","type":{"type":"long","logicalType":"timestamp-micros"}}, + {"name":"string_value","type":"string"} + ] + }` + + // Evolved schema + schemaV2 := `{ + "type":"record", + "name":"test", + "fields":[ + {"name":"int32_value","type":"long"}, + {"name":"int64_value","type":"long"}, + {"name":"float32_value","type":"float"}, + {"name":"float64_value","type":"double"}, + {"name":"boolean_true","type":"boolean"}, + 
{"name":"boolean_false","type":"boolean"}, + {"name":"timestamp_value","type":{"type":"long","logicalType":"timestamp-micros"}}, + {"name":"string_value","type":"string"}, + {"name":"id_int","type":"long","default":0} + ] + }` + + schema := schemaV1 + if operation == "Avro-evolve-schema" { + schema = schemaV2 } - return - case "Avro-insert": - + + codec, err := goavro.NewCodec(schema) + require.NoError(t, err) + + schemaID := registerSchemaWithRetry(t, registryURL, streams[0], schema) + + dataToProduce := map[string]interface{}{ + "int32_value": int32(32), + "int64_value": int64(6400000000), + "float32_value": float32(32.5), + "float64_value": float64(64.6464), + "boolean_true": true, + "boolean_false": false, + "timestamp_value": int64(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), + "string_value": "test_string", + } + + if operation == "Avro-evolve-schema" { + dataToProduce["id_int"] = int64(101) + } + + binaryData, err := codec.BinaryFromNative(nil, dataToProduce) + require.NoError(t, err) + + confluentMsg := encodeConfluentBinary(schemaID, binaryData) + + err = writer.WriteMessages(ctx, kafka.Message{ + Topic: streams[0], + Key: []byte("avro-key"), + Value: confluentMsg, + }) + require.NoError(t, err) default: t.Fatalf("unsupported operation: %s", operation) } @@ -218,6 +266,67 @@ func writeMessagesWithRetry(ctx context.Context, t *testing.T, writer *kafka.Wri } } +func retryWithBackoff(attempts int, baseDelay time.Duration, fn func() error) error { + delay := baseDelay + var err error + + for i := 0; i < attempts; i++ { + err = fn() + if err == nil { + return nil + } + time.Sleep(delay) + delay *= 2 + } + return fmt.Errorf("after %d attempts, last error: %w", attempts, err) +} + +func registerSchemaWithRetry(t *testing.T, url, topic, schema string) uint32 { + body, err := json.Marshal(map[string]string{"schema": schema}) + require.NoError(t, err) + + var schemaID uint32 + + client := &http.Client{Timeout: 10 * 
time.Second} + + err = retryWithBackoff(5, 2*time.Second, func() error { + resp, err := client.Post( + fmt.Sprintf("%s/subjects/%s-value/versions", url, topic), + "application/vnd.schemaregistry.v1+json", + bytes.NewReader(body), + ) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("registry not ready (status: %d)", resp.StatusCode) + } + + var res struct { + ID uint32 `json:"id"` + } + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return err + } + + schemaID = res.ID + return nil + }) + + require.NoError(t, err) + return schemaID +} + +func encodeConfluentBinary(id uint32, data []byte) []byte { + out := make([]byte, 5+len(data)) + out[0] = 0x00 + binary.BigEndian.PutUint32(out[1:5], id) + copy(out[5:], data) + return out +} + var ExpectedKafkaData = map[string]interface{}{ "int_value": int64(100), "float_value": float64(99.99), @@ -266,3 +375,36 @@ var ExpectedKafkaDefaultCDCColumnsSchema = map[string]string{ "_olake_id": "string", "_olake_timestamp": "timestamp", } + +var ExpectedKafkaAvroUpdatedData = map[string]interface{}{ + "int32_value": int64(32), // 🔥 promoted from int → long + "int64_value": int64(6400000000), + + "float32_value": float32(32.5), + "float64_value": float64(64.6464), + + "boolean_true": true, + "boolean_false": false, + + "timestamp_value": arrow.Timestamp(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), + + "string_value": "test_string", + + "id_int": int64(101), // new field +} + +var ExpectedKafkaAvroData = map[string]interface{}{ + "int32_value": int32(32), + "int64_value": int64(6400000000), + + // ⚠️ float32 might get promoted to float64 in destination + "float32_value": float32(32.5), + "float64_value": float64(64.6464), + + "boolean_true": true, + "boolean_false": false, + + "timestamp_value": arrow.Timestamp(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), + + 
"string_value": "test_string", +} diff --git a/drivers/kafka/internal/testdata/iceberg_destination.json b/drivers/kafka/internal/testdata/iceberg_destination.json index 8582710c2..4972d4bae 100644 --- a/drivers/kafka/internal/testdata/iceberg_destination.json +++ b/drivers/kafka/internal/testdata/iceberg_destination.json @@ -13,6 +13,6 @@ "aws_secret_key": "password", "iceberg_db": "olake_iceberg", "aws_region": "us-east-1", - "arrow_writes": true + "arrow_writes": false } } diff --git a/drivers/kafka/internal/testdata/source.json b/drivers/kafka/internal/testdata/source.json index 56128ecb9..93af2e656 100644 --- a/drivers/kafka/internal/testdata/source.json +++ b/drivers/kafka/internal/testdata/source.json @@ -4,10 +4,10 @@ "security_protocol": "PLAINTEXT" }, "consumer_group_id": "kafka-integration-test-group", - "max_threads": 1, - "threads_equal_total_partitions": false, + "max_threads": 10, + "threads_equal_total_partitions": true, "backoff_retry_count": 3, "schema_registry": { - "endpoint": "http://127.0.0.1:8081" + "endpoint": "http://host.docker.internal:8081" } } \ No newline at end of file diff --git a/drivers/kafka/internal/testdata/test_streams.json b/drivers/kafka/internal/testdata/test_streams.json index 4618e062b..b7ef98f60 100644 --- a/drivers/kafka/internal/testdata/test_streams.json +++ b/drivers/kafka/internal/testdata/test_streams.json @@ -1 +1 @@ 
-{"selected_streams":{"topics":[{"partition_regex":"","stream_name":"kafka_test_table_olake_3","append_mode":true,"normalization":false,"selected_columns":{"columns":["_kafka_partition","boolean_true","_olake_id","int_value","_kafka_offset","_kafka_key","string_value","_op_type","boolean_false","_olake_timestamp","float_value","timestamp_value","_kafka_timestamp"],"sync_new_columns":true}},{"partition_regex":"","stream_name":"kafka_test_table_olake_2","append_mode":true,"normalization":false,"selected_columns":{"columns":["float_value","_kafka_partition","_kafka_timestamp","boolean_true","string_value","_olake_id","timestamp_value","_kafka_key","_kafka_offset","boolean_false","_op_type","_olake_timestamp","int_value"],"sync_new_columns":true}},{"partition_regex":"","stream_name":"kafka_test_table_olake_1","append_mode":true,"normalization":false,"selected_columns":{"columns":["string_value","float_value","int_value","_kafka_timestamp","_kafka_offset","timestamp_value","_kafka_partition","_op_type","_olake_timestamp","boolean_false","boolean_true","_kafka_key","_olake_id"],"sync_new_columns":true}}]},"streams":[{"stream":{"name":"kafka_test_table_olake_3","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_col
umn_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode":"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_3","default_stream_properties":{"normalization":false,"append_mode":true}}},{"stream":{"name":"kafka_test_table_olake_2","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_column_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode"
:"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_2","default_stream_properties":{"normalization":false,"append_mode":true}}},{"stream":{"name":"kafka_test_table_olake_1","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_column_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode":"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake_1","default_stream_properties":{"normalization":false,"append_mode":true}}}]} \ No newline at end of file 
+{"selected_streams":{"topics":[{"partition_regex":"","stream_name":"kafka_test_table_olake","append_mode":true,"normalization":false,"selected_columns":{"columns":["timestamp_value","_olake_id","_kafka_partition","_kafka_offset","boolean_false","string_value","int_value","_kafka_timestamp","_olake_timestamp","_op_type","_kafka_key","float_value","boolean_true"],"sync_new_columns":true}}]},"streams":[{"stream":{"name":"kafka_test_table_olake","namespace":"topics","type_schema":{"properties":{"_kafka_key":{"type":["string"],"destination_column_name":"_kafka_key"},"_kafka_offset":{"type":["integer"],"destination_column_name":"_kafka_offset"},"_kafka_partition":{"type":["integer_small"],"destination_column_name":"_kafka_partition"},"_kafka_timestamp":{"type":["timestamp_milli"],"destination_column_name":"_kafka_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id","olake_column":true},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp","olake_column":true},"_op_type":{"type":["null","string"],"destination_column_name":"_op_type","olake_column":true},"boolean_false":{"type":["boolean"],"destination_column_name":"boolean_false"},"boolean_true":{"type":["boolean"],"destination_column_name":"boolean_true"},"float_value":{"type":["number"],"destination_column_name":"float_value"},"int_value":{"type":["integer"],"destination_column_name":"int_value"},"string_value":{"type":["string"],"destination_column_name":"string_value"},"timestamp_value":{"type":["timestamp"],"destination_column_name":"timestamp_value"}}},"supported_sync_modes":["strict_cdc"],"source_defined_primary_key":["_kafka_offset","_kafka_partition"],"available_cursor_fields":[],"sync_mode":"strict_cdc","destination_database":"kafka:topics","destination_table":"kafka_test_table_olake","default_stream_properties":{"normalization":false,"append_mode":true}}}]} \ No newline at end of file diff --git a/utils/testutils/test_utils.go 
b/utils/testutils/test_utils.go index 9ded4e3cf..3b4942342 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -49,6 +49,7 @@ type IntegrationTest struct { CursorField string PartitionRegex string FilterConfig string + ExtraExpectedData map[string]map[string]interface{} } type PerformanceTest struct { @@ -378,7 +379,7 @@ func (cfg *IntegrationTest) runSyncAndVerify( cmd := syncCommand(*cfg.TestConfig, useState, destinationType, "--destination-database-prefix", destDBPrefix) // Execute operation before sync if needed - if useState && operation != "" { + if (useState || slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(cfg.TestConfig.Driver))) && operation != "" { cfg.ExecuteQuery(ctx, t, []string{testTable}, operation, false) if cfg.TestConfig.Driver == "mssql" { t.Log("Waiting 20 seconds for MSSQL CDC to process transactions...") @@ -394,9 +395,9 @@ func (cfg *IntegrationTest) runSyncAndVerify( t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) - // Use evolved schema only for CDC "update" operation or for kafka when schema evolution is expected + // Use evolved schema only for CDC "update" operation or for kafka when schema evolution is expected // Incremental "insert" uses opSymbol "u" but doesn't have schema evolution - evolvedSchema := operation == "update" || operation == "evolve-schema" + evolvedSchema := operation == "update" || operation == "evolve-schema" || operation == "Avro-evolve-schema" switch destinationType { case "iceberg": @@ -499,16 +500,16 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( { name: "CDC - strict - Avro - insert", operation: "Avro-insert", - useState: true, - opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + useState: false, + opSymbol: "c", + expected: cfg.ExtraExpectedData["Avro-insert"], }, { name: "CDC - strict - Avro - evolve-schema", operation: "Avro-evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExpectedUpdatedData, + expected: 
cfg.ExtraExpectedData["Avro-evolve-schema"], }, } @@ -523,7 +524,9 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( cfg.ExecuteQuery(ctx, t, []string{testTable}, "evolve-schema", false) } } - + if slices.Contains(constants.OnlyStrictCDCDriver, constants.DriverType(cfg.TestConfig.Driver)) && tc.operation == "Avro-insert" { + dropIcebergTable(t, testTable, cfg.DestinationDB) + } if err := cfg.runSyncAndVerify( ctx, t, @@ -612,16 +615,16 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( { name: "CDC - strict - Avro - insert", operation: "Avro-insert", - useState: true, - opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + useState: false, + opSymbol: "c", + expected: cfg.ExtraExpectedData["Avro-insert"], }, { name: "CDC - strict - Avro - evolve-schema", operation: "Avro-evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExpectedUpdatedData, + expected: cfg.ExtraExpectedData["Avro-evolve-schema"], }, } @@ -845,12 +848,6 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { t.Logf("Test data directory: %s", cfg.TestConfig.HostTestDataPath) currentTestTable := fmt.Sprintf("%s_test_table_olake", cfg.TestConfig.Driver) - syncTopics := []string{currentTestTable} - - if cfg.TestConfig.Driver == string(constants.Kafka) { - syncTopics = []string{currentTestTable + "_1", currentTestTable + "_2", currentTestTable + "_3"} - } - t.Run("Discover", func(t *testing.T) { req := testcontainers.ContainerRequest{ Image: "golang:1.25.8-bookworm", @@ -878,9 +875,9 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // 2. Query on test table - cfg.ExecuteQuery(ctx, t, syncTopics, "create", false) - cfg.ExecuteQuery(ctx, t, syncTopics, "clean", false) - cfg.ExecuteQuery(ctx, t, syncTopics, "add", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) // 3. 
Run discover command discoverCmd := discoverCommand(*cfg.TestConfig) @@ -903,7 +900,7 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { t.Logf("Generated streams validated with test streams") // 5. Clean up - cfg.ExecuteQuery(ctx, t, syncTopics, "drop", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "drop", false) t.Logf("%s discover test-container clean up", cfg.TestConfig.Driver) return nil }, @@ -952,15 +949,15 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { } // 2. Query on test table - cfg.ExecuteQuery(ctx, t, syncTopics, "create", false) - cfg.ExecuteQuery(ctx, t, syncTopics, "clean", false) - cfg.ExecuteQuery(ctx, t, syncTopics, "add", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "create", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "clean", false) + cfg.ExecuteQuery(ctx, t, []string{currentTestTable}, "add", false) // streamUpdateCmd := fmt.Sprintf( // `jq '(.selected_streams[][] | .normalization) = true' %s > /tmp/streams.json && mv /tmp/streams.json %s`, // cfg.TestConfig.CatalogPath, cfg.TestConfig.CatalogPath, // ) - streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, syncTopics, true) + streamUpdateCmd := updateSelectedStreamsCommand(*cfg.TestConfig, cfg.Namespace, cfg.PartitionRegex, cfg.FilterConfig, []string{currentTestTable}, true) if code, out, err := utils.ExecCommand(ctx, c, streamUpdateCmd); err != nil || code != 0 { return fmt.Errorf("failed to enable normalization and partition regex in streams.json (%d): %s\n%s", code, err, out, @@ -980,19 +977,15 @@ func (cfg *IntegrationTest) TestIntegration(t *testing.T) { if !slices.Contains(constants.SkipCDCDrivers, constants.DriverType(cfg.TestConfig.Driver)) { for _, wt := range writerTypes { t.Run(fmt.Sprintf("Iceberg (%s) Full load + CDC tests", wt.name), func(t *testing.T) { - for _, topic := range syncTopics { - if err := cfg.testIcebergWriter(ctx, t, c, 
topic, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { - t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) - } + if err := cfg.testIcebergWriter(ctx, t, c, currentTestTable, wt.useArrow, cfg.testIcebergFullLoadAndCDC); err != nil { + t.Fatalf("Iceberg (%s) Full load + CDC tests failed: %v", wt.name, err) } }) } t.Run("Parquet Full load + CDC tests", func(t *testing.T) { - for _, topic := range syncTopics { - if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, topic); err != nil { - t.Fatalf("Parquet Full load + CDC tests failed: %v", err) - } + if err := cfg.testParquetFullLoadAndCDC(ctx, t, c, currentTestTable); err != nil { + t.Fatalf("Parquet Full load + CDC tests failed: %v", err) } }) } diff --git a/utils/typeutils/compare.go b/utils/typeutils/compare.go index 0db5b666d..8611cebc2 100644 --- a/utils/typeutils/compare.go +++ b/utils/typeutils/compare.go @@ -1,6 +1,7 @@ package typeutils import ( + "encoding/json" "fmt" "reflect" "strings" @@ -65,6 +66,15 @@ func Compare(a, b any) int { return 1 } return 0 + case json.Number: + // Try int64 first, then fall back to float64. 
+ if aInt, err := aVal.Int64(); err == nil { + return Compare(aInt, b) + } + if aFloat, err := aVal.Float64(); err == nil { + return Compare(aFloat, b) + } + return strings.Compare(aVal.String(), fmt.Sprintf("%v", b)) default: // check for custom timestamp aTime, aOk := a.(Time) From b517293dce1a161a0666744378372f5dfa1acc21 Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Thu, 26 Mar 2026 23:59:00 +0530 Subject: [PATCH 3/8] fix: update docker-compose for kafka integration test --- drivers/kafka/docker-compose.yml | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/drivers/kafka/docker-compose.yml b/drivers/kafka/docker-compose.yml index c0f859b98..bc3498146 100644 --- a/drivers/kafka/docker-compose.yml +++ b/drivers/kafka/docker-compose.yml @@ -1,8 +1,7 @@ -version: '3.8' services: zookeeper: - image: confluentinc/cp-zookeeper:7.5.0 + image: confluentinc/cp-zookeeper:7.6.0 container_name: zookeeper restart: unless-stopped environment: @@ -10,7 +9,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 kafka: - image: confluentinc/cp-kafka:7.5.0 + image: confluentinc/cp-kafka:7.6.0 container_name: kafka restart: unless-stopped depends_on: @@ -22,8 +21,6 @@ services: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,EXTERNAL_HOST://0.0.0.0:29092,EXTERNAL_CONT://0.0.0.0:39092 - # Advertise IPv4 endpoints to avoid `localhost` resolving to `::1` (IPv6) - # and breaking Kafka clients during local integration tests. 
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,EXTERNAL_HOST://127.0.0.1:29092,EXTERNAL_CONT://host.docker.internal:39092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL_HOST:PLAINTEXT,EXTERNAL_CONT:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL @@ -31,18 +28,17 @@ services: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 healthcheck: - test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 >/dev/null 2>&1 || exit 1"] - interval: 5s + test: ["CMD-SHELL", "kafka-broker-api-versions --bootstrap-server localhost:9092 || exit 1"] + interval: 10s timeout: 10s - retries: 30 - start_period: 20s + retries: 5 + start_period: 30s volumes: - - ./data/kafka-data:/var/lib/kafka/data + - kafka_storage:/var/lib/kafka/data schema-registry: - image: confluentinc/cp-schema-registry:latest + image: confluentinc/cp-schema-registry:7.6.0 container_name: schema-registry - platform: linux/amd64 depends_on: kafka: condition: service_healthy @@ -51,4 +47,7 @@ services: environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' - SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 \ No newline at end of file + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + +volumes: + kafka_storage: \ No newline at end of file From 1cc22508d8e365adc23d52d1460c194409b4ab5c Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Fri, 27 Mar 2026 00:48:48 +0530 Subject: [PATCH 4/8] chore: increased concurrency --- .github/workflows/integration-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 5deb318a6..7e1a46a69 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -153,7 +153,7 @@ jobs: - name: Run Integration Tests run: | - go test -v -p 6 ./drivers/mysql/internal/... ./drivers/postgres/internal/... 
./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... ./drivers/kafka/internal/... -timeout 0 -run 'Integration' + go test -v -p 7 ./drivers/kafka/internal/... ./drivers/mysql/internal/... ./drivers/postgres/internal/... ./drivers/mongodb/internal/... ./drivers/oracle/internal/... ./drivers/db2/internal/... ./drivers/mssql/internal/... -timeout 0 -run 'Integration' - name: Cleanup if: always() From bbbc29754dda9a73b335b1009520650508d7610c Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Fri, 27 Mar 2026 07:41:30 +0530 Subject: [PATCH 5/8] chore: All testcases passed --- drivers/kafka/internal/kafka_test.go | 4 + drivers/kafka/internal/kafka_test_utils.go | 44 ++++++++- utils/testutils/test_utils.go | 103 ++++++++++++--------- 3 files changed, 102 insertions(+), 49 deletions(-) diff --git a/drivers/kafka/internal/kafka_test.go b/drivers/kafka/internal/kafka_test.go index 31c13fc4d..5a846b81c 100644 --- a/drivers/kafka/internal/kafka_test.go +++ b/drivers/kafka/internal/kafka_test.go @@ -25,6 +25,10 @@ func TestKafkaIntegration(t *testing.T) { "Avro-insert": ExpectedKafkaAvroData, "Avro-evolve-schema": ExpectedKafkaAvroUpdatedData, }, + ExtraExpectedDataSchema: map[string]map[string]string{ + "Avro-insert": ExpectedKafkaAvroDataSchema, + "Avro-evolve-schema": ExpectedKafkaAvroUpdatedDataSchema, + }, FilterConfig: `{ "logical_operator": "And", "conditions": [ diff --git a/drivers/kafka/internal/kafka_test_utils.go b/drivers/kafka/internal/kafka_test_utils.go index 62b6d1718..38aa49c21 100644 --- a/drivers/kafka/internal/kafka_test_utils.go +++ b/drivers/kafka/internal/kafka_test_utils.go @@ -118,8 +118,10 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation "fields":[ {"name":"int32_value","type":"int"}, {"name":"int64_value","type":"long"}, + {"name":"int_value","type":"long"}, {"name":"float32_value","type":"float"}, {"name":"float64_value","type":"double"}, + 
{"name":"float_value","type":"double"}, {"name":"boolean_true","type":"boolean"}, {"name":"boolean_false","type":"boolean"}, {"name":"timestamp_value","type":{"type":"long","logicalType":"timestamp-micros"}}, @@ -134,8 +136,10 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation "fields":[ {"name":"int32_value","type":"long"}, {"name":"int64_value","type":"long"}, + {"name":"int_value","type":"long"}, {"name":"float32_value","type":"float"}, {"name":"float64_value","type":"double"}, + {"name":"float_value","type":"double"}, {"name":"boolean_true","type":"boolean"}, {"name":"boolean_false","type":"boolean"}, {"name":"timestamp_value","type":{"type":"long","logicalType":"timestamp-micros"}}, @@ -155,10 +159,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation schemaID := registerSchemaWithRetry(t, registryURL, streams[0], schema) dataToProduce := map[string]interface{}{ - "int32_value": int32(32), + "int32_value": int32(132), "int64_value": int64(6400000000), + "int_value": int64(101), "float32_value": float32(32.5), "float64_value": float64(64.6464), + "float_value": float64(66.6666), "boolean_true": true, "boolean_false": false, "timestamp_value": int64(time.Date(2026, 3, 22, 14, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)), @@ -376,12 +382,41 @@ var ExpectedKafkaDefaultCDCColumnsSchema = map[string]string{ "_olake_timestamp": "timestamp", } +var ExpectedKafkaAvroDataSchema = map[string]string{ + "int32_value": "int", + "int64_value": "bigint", + "int_value": "bigint", + "float32_value": "float", + "float64_value": "double", + "float_value": "double", + "boolean_true": "boolean", + "boolean_false": "boolean", + "timestamp_value": "timestamp", + "string_value": "string", +} + +var ExpectedKafkaAvroUpdatedDataSchema = map[string]string{ + "int32_value": "bigint", + "int64_value": "bigint", + "int_value": "bigint", + "float32_value": "float", + "float64_value": "double", + "float_value": 
"double", + "boolean_true": "boolean", + "boolean_false": "boolean", + "timestamp_value": "timestamp", + "string_value": "string", + "id_int": "bigint", +} + var ExpectedKafkaAvroUpdatedData = map[string]interface{}{ - "int32_value": int64(32), // 🔥 promoted from int → long + "int32_value": int64(132), // promoted from int → long "int64_value": int64(6400000000), + "int_value": int64(101), "float32_value": float32(32.5), "float64_value": float64(64.6464), + "float_value": float64(66.6666), "boolean_true": true, "boolean_false": false, @@ -394,12 +429,13 @@ var ExpectedKafkaAvroUpdatedData = map[string]interface{}{ } var ExpectedKafkaAvroData = map[string]interface{}{ - "int32_value": int32(32), + "int32_value": int32(132), "int64_value": int64(6400000000), + "int_value": int64(101), - // ⚠️ float32 might get promoted to float64 in destination "float32_value": float32(32.5), "float64_value": float64(64.6464), + "float_value": float64(66.6666), "boolean_true": true, "boolean_false": false, diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 3b4942342..3f8e226ba 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -50,6 +50,7 @@ type IntegrationTest struct { PartitionRegex string FilterConfig string ExtraExpectedData map[string]map[string]interface{} + ExtraExpectedDataSchema map[string]map[string]string } type PerformanceTest struct { @@ -359,7 +360,8 @@ type syncTestCase struct { operation string useState bool opSymbol string - expected map[string]interface{} + expectedData map[string]interface{} + expectedSchema map[string]string } // runSyncAndVerify executes a sync command and verifies the results in Iceberg @@ -374,6 +376,7 @@ func (cfg *IntegrationTest) runSyncAndVerify( opSymbol string, schema map[string]interface{}, isCDC bool, + expectedSchema map[string]string, ) error { destDBPrefix := fmt.Sprintf("integration_%s", cfg.TestConfig.Driver) cmd := syncCommand(*cfg.TestConfig, useState, destinationType, 
"--destination-database-prefix", destDBPrefix) @@ -395,27 +398,11 @@ func (cfg *IntegrationTest) runSyncAndVerify( t.Logf("Sync successful for %s driver", cfg.TestConfig.Driver) - // Use evolved schema only for CDC "update" operation or for kafka when schema evolution is expected - // Incremental "insert" uses opSymbol "u" but doesn't have schema evolution - evolvedSchema := operation == "update" || operation == "evolve-schema" || operation == "Avro-evolve-schema" - switch destinationType { case "iceberg": - { - if evolvedSchema { - VerifyIcebergSync(t, testTable, cfg.DestinationDB, cfg.UpdatedDestinationDataTypeSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver, isCDC) - } else { - VerifyIcebergSync(t, testTable, cfg.DestinationDB, cfg.DestinationDataTypeSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver, isCDC) - } - } + VerifyIcebergSync(t, testTable, cfg.DestinationDB, expectedSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.PartitionRegex, cfg.TestConfig.Driver, isCDC) case "parquet": - { - if evolvedSchema { - VerifyParquetSync(t, testTable, cfg.DestinationDB, cfg.UpdatedDestinationDataTypeSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.TestConfig.Driver, isCDC) - } else { - VerifyParquetSync(t, testTable, cfg.DestinationDB, cfg.DestinationDataTypeSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.TestConfig.Driver, isCDC) - } - } + VerifyParquetSync(t, testTable, cfg.DestinationDB, expectedSchema, cfg.DefaultCDCColumnsSchema, schema, opSymbol, cfg.TestConfig.Driver, isCDC) } return nil @@ -457,28 +444,32 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( operation: "", useState: false, opSymbol: "r", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "CDC - insert", operation: "insert", useState: true, opSymbol: "c", - expected: cfg.ExpectedData, + 
expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "CDC - update", operation: "update", useState: true, opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { name: "CDC - delete", operation: "delete", useState: true, opSymbol: "d", - expected: nil, + expectedData: nil, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } @@ -488,28 +479,32 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( operation: "", useState: false, opSymbol: "c", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "CDC - strict - evolve-schema", operation: "evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { name: "CDC - strict - Avro - insert", operation: "Avro-insert", useState: false, opSymbol: "c", - expected: cfg.ExtraExpectedData["Avro-insert"], + expectedData: cfg.ExtraExpectedData["Avro-insert"], + expectedSchema: cfg.ExtraExpectedDataSchema["Avro-insert"], }, { name: "CDC - strict - Avro - evolve-schema", operation: "Avro-evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExtraExpectedData["Avro-evolve-schema"], + expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], + expectedSchema: cfg.ExtraExpectedDataSchema["Avro-evolve-schema"], }, } @@ -536,8 +531,9 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( "iceberg", tc.operation, tc.opSymbol, - tc.expected, + tc.expectedData, tc.name != "Full-Refresh", + tc.expectedSchema, ); err != nil { t.Fatalf("%s test failed: %v", tc.name, err) } @@ -572,28 +568,32 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( operation: "", useState: false, opSymbol: "r", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: 
cfg.DestinationDataTypeSchema, }, { name: "CDC - insert", operation: "insert", useState: true, opSymbol: "c", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "CDC - update", operation: "update", useState: true, opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { name: "CDC - delete", operation: "delete", useState: true, opSymbol: "d", - expected: nil, + expectedData: nil, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } @@ -603,28 +603,32 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( operation: "", useState: false, opSymbol: "c", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "CDC - strict - evolve-schema", operation: "evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { name: "CDC - strict - Avro - insert", operation: "Avro-insert", useState: false, opSymbol: "c", - expected: cfg.ExtraExpectedData["Avro-insert"], + expectedData: cfg.ExtraExpectedData["Avro-insert"], + expectedSchema: cfg.ExtraExpectedDataSchema["Avro-insert"], }, { name: "CDC - strict - Avro - evolve-schema", operation: "Avro-evolve-schema", useState: true, opSymbol: "c", - expected: cfg.ExtraExpectedData["Avro-evolve-schema"], + expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], + expectedSchema: cfg.ExtraExpectedDataSchema["Avro-evolve-schema"], }, } @@ -653,8 +657,9 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( "parquet", tc.operation, tc.opSymbol, - tc.expected, + tc.expectedData, tc.name != "Full-Refresh", + tc.expectedSchema, ); err != nil { t.Fatalf("%s test failed: %v", tc.name, err) } @@ -700,21 +705,24 @@ func (cfg *IntegrationTest) 
testIcebergFullLoadAndIncremental( operation: "", useState: false, opSymbol: "r", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "Incremental - insert", operation: "insert", useState: true, opSymbol: "u", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "Incremental - update", operation: "update", useState: true, opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } @@ -741,8 +749,9 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndIncremental( "iceberg", tc.operation, tc.opSymbol, - tc.expected, + tc.expectedData, false, + tc.expectedSchema, ); err != nil { t.Fatalf("Incremental test %s failed: %v", tc.name, err) } @@ -787,21 +796,24 @@ func (cfg *IntegrationTest) testParquetFullLoadAndIncremental( operation: "", useState: false, opSymbol: "r", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "Incremental - insert", operation: "insert", useState: true, opSymbol: "u", - expected: cfg.ExpectedData, + expectedData: cfg.ExpectedData, + expectedSchema: cfg.DestinationDataTypeSchema, }, { name: "Incremental - update", operation: "update", useState: true, opSymbol: "u", - expected: cfg.ExpectedUpdatedData, + expectedData: cfg.ExpectedUpdatedData, + expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } @@ -829,8 +841,9 @@ func (cfg *IntegrationTest) testParquetFullLoadAndIncremental( "parquet", tc.operation, tc.opSymbol, - tc.expected, + tc.expectedData, false, + tc.expectedSchema, ); err != nil { t.Fatalf("Incremental test %s failed: %v", tc.name, err) } From 7343fbd03f3c35172749a32df2488ed988a2c0ca Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Fri, 27 Mar 2026 11:14:05 +0530 Subject: [PATCH 6/8] chore: Lint issue resolved --- 
drivers/kafka/internal/kafka_test_utils.go | 48 ++--- utils/testutils/test_utils.go | 220 ++++++++++----------- 2 files changed, 134 insertions(+), 134 deletions(-) diff --git a/drivers/kafka/internal/kafka_test_utils.go b/drivers/kafka/internal/kafka_test_utils.go index 38aa49c21..a9b839328 100644 --- a/drivers/kafka/internal/kafka_test_utils.go +++ b/drivers/kafka/internal/kafka_test_utils.go @@ -383,40 +383,40 @@ var ExpectedKafkaDefaultCDCColumnsSchema = map[string]string{ } var ExpectedKafkaAvroDataSchema = map[string]string{ - "int32_value": "int", - "int64_value": "bigint", - "int_value": "bigint", - "float32_value": "float", - "float64_value": "double", - "float_value": "double", - "boolean_true": "boolean", - "boolean_false": "boolean", + "int32_value": "int", + "int64_value": "bigint", + "int_value": "bigint", + "float32_value": "float", + "float64_value": "double", + "float_value": "double", + "boolean_true": "boolean", + "boolean_false": "boolean", "timestamp_value": "timestamp", - "string_value": "string", + "string_value": "string", } - + var ExpectedKafkaAvroUpdatedDataSchema = map[string]string{ - "int32_value": "bigint", - "int64_value": "bigint", - "int_value": "bigint", - "float32_value": "float", - "float64_value": "double", - "float_value": "double", - "boolean_true": "boolean", - "boolean_false": "boolean", + "int32_value": "bigint", + "int64_value": "bigint", + "int_value": "bigint", + "float32_value": "float", + "float64_value": "double", + "float_value": "double", + "boolean_true": "boolean", + "boolean_false": "boolean", "timestamp_value": "timestamp", - "string_value": "string", - "id_int": "bigint", + "string_value": "string", + "id_int": "bigint", } var ExpectedKafkaAvroUpdatedData = map[string]interface{}{ "int32_value": int64(132), // promoted from int → long "int64_value": int64(6400000000), - "int_value": int64(101), + "int_value": int64(101), "float32_value": float32(32.5), "float64_value": float64(64.6464), - "float_value": 
float64(66.6666), + "float_value": float64(66.6666), "boolean_true": true, "boolean_false": false, @@ -431,11 +431,11 @@ var ExpectedKafkaAvroUpdatedData = map[string]interface{}{ var ExpectedKafkaAvroData = map[string]interface{}{ "int32_value": int32(132), "int64_value": int64(6400000000), - "int_value": int64(101), + "int_value": int64(101), "float32_value": float32(32.5), "float64_value": float64(64.6464), - "float_value": float64(66.6666), + "float_value": float64(66.6666), "boolean_true": true, "boolean_false": false, diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index 3f8e226ba..f0a5ab194 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -440,70 +440,70 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndCDC( testCases := []syncTestCase{ { - name: "Full-Refresh", - operation: "", - useState: false, - opSymbol: "r", - expectedData: cfg.ExpectedData, + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - insert", - operation: "insert", - useState: true, - opSymbol: "c", - expectedData: cfg.ExpectedData, + name: "CDC - insert", + operation: "insert", + useState: true, + opSymbol: "c", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - update", - operation: "update", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedUpdatedData, + name: "CDC - update", + operation: "update", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { - name: "CDC - delete", - operation: "delete", - useState: true, - opSymbol: "d", - expectedData: nil, + name: "CDC - delete", + operation: "delete", + useState: true, + opSymbol: "d", + expectedData: nil, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } kafkaTestCases := []syncTestCase{ { - name: "CDC - 
strict - insert", - operation: "", - useState: false, - opSymbol: "c", - expectedData: cfg.ExpectedData, + name: "CDC - strict - insert", + operation: "", + useState: false, + opSymbol: "c", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - strict - evolve-schema", - operation: "evolve-schema", - useState: true, - opSymbol: "c", - expectedData: cfg.ExpectedUpdatedData, + name: "CDC - strict - evolve-schema", + operation: "evolve-schema", + useState: true, + opSymbol: "c", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { - name: "CDC - strict - Avro - insert", - operation: "Avro-insert", - useState: false, - opSymbol: "c", - expectedData: cfg.ExtraExpectedData["Avro-insert"], + name: "CDC - strict - Avro - insert", + operation: "Avro-insert", + useState: false, + opSymbol: "c", + expectedData: cfg.ExtraExpectedData["Avro-insert"], expectedSchema: cfg.ExtraExpectedDataSchema["Avro-insert"], }, { - name: "CDC - strict - Avro - evolve-schema", - operation: "Avro-evolve-schema", - useState: true, - opSymbol: "c", - expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], + name: "CDC - strict - Avro - evolve-schema", + operation: "Avro-evolve-schema", + useState: true, + opSymbol: "c", + expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], expectedSchema: cfg.ExtraExpectedDataSchema["Avro-evolve-schema"], }, } @@ -564,70 +564,70 @@ func (cfg *IntegrationTest) testParquetFullLoadAndCDC( testCases := []syncTestCase{ { - name: "Full-Refresh", - operation: "", - useState: false, - opSymbol: "r", - expectedData: cfg.ExpectedData, + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - insert", - operation: "insert", - useState: true, - opSymbol: "c", - expectedData: cfg.ExpectedData, + name: "CDC - insert", + operation: "insert", + useState: true, 
+ opSymbol: "c", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - update", - operation: "update", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedUpdatedData, + name: "CDC - update", + operation: "update", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { - name: "CDC - delete", - operation: "delete", - useState: true, - opSymbol: "d", - expectedData: nil, + name: "CDC - delete", + operation: "delete", + useState: true, + opSymbol: "d", + expectedData: nil, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } kafkaTestCases := []syncTestCase{ { - name: "CDC - strict - insert", - operation: "", - useState: false, - opSymbol: "c", - expectedData: cfg.ExpectedData, + name: "CDC - strict - insert", + operation: "", + useState: false, + opSymbol: "c", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "CDC - strict - evolve-schema", - operation: "evolve-schema", - useState: true, - opSymbol: "c", - expectedData: cfg.ExpectedUpdatedData, + name: "CDC - strict - evolve-schema", + operation: "evolve-schema", + useState: true, + opSymbol: "c", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, { - name: "CDC - strict - Avro - insert", - operation: "Avro-insert", - useState: false, - opSymbol: "c", - expectedData: cfg.ExtraExpectedData["Avro-insert"], + name: "CDC - strict - Avro - insert", + operation: "Avro-insert", + useState: false, + opSymbol: "c", + expectedData: cfg.ExtraExpectedData["Avro-insert"], expectedSchema: cfg.ExtraExpectedDataSchema["Avro-insert"], }, { - name: "CDC - strict - Avro - evolve-schema", - operation: "Avro-evolve-schema", - useState: true, - opSymbol: "c", - expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], + name: "CDC - strict - Avro - evolve-schema", + operation: "Avro-evolve-schema", + 
useState: true, + opSymbol: "c", + expectedData: cfg.ExtraExpectedData["Avro-evolve-schema"], expectedSchema: cfg.ExtraExpectedDataSchema["Avro-evolve-schema"], }, } @@ -701,27 +701,27 @@ func (cfg *IntegrationTest) testIcebergFullLoadAndIncremental( // Test cases for incremental sync incrementalTestCases := []syncTestCase{ { - name: "Full-Refresh", - operation: "", - useState: false, - opSymbol: "r", - expectedData: cfg.ExpectedData, + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "Incremental - insert", - operation: "insert", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedData, + name: "Incremental - insert", + operation: "insert", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "Incremental - update", - operation: "update", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedUpdatedData, + name: "Incremental - update", + operation: "update", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } @@ -792,27 +792,27 @@ func (cfg *IntegrationTest) testParquetFullLoadAndIncremental( // Test cases for incremental sync incrementalTestCases := []syncTestCase{ { - name: "Full-Refresh", - operation: "", - useState: false, - opSymbol: "r", - expectedData: cfg.ExpectedData, + name: "Full-Refresh", + operation: "", + useState: false, + opSymbol: "r", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "Incremental - insert", - operation: "insert", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedData, + name: "Incremental - insert", + operation: "insert", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedData, expectedSchema: cfg.DestinationDataTypeSchema, }, { - name: "Incremental - update", - 
operation: "update", - useState: true, - opSymbol: "u", - expectedData: cfg.ExpectedUpdatedData, + name: "Incremental - update", + operation: "update", + useState: true, + opSymbol: "u", + expectedData: cfg.ExpectedUpdatedData, expectedSchema: cfg.UpdatedDestinationDataTypeSchema, }, } From 9a9b7ae8c7329d663f43ecb3739cd643a8bf5f2c Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Fri, 27 Mar 2026 11:35:17 +0530 Subject: [PATCH 7/8] chore: Lint issue resolved-2 --- utils/testutils/test_utils.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/utils/testutils/test_utils.go b/utils/testutils/test_utils.go index f0a5ab194..7de14ec1d 100644 --- a/utils/testutils/test_utils.go +++ b/utils/testutils/test_utils.go @@ -356,11 +356,11 @@ func DeleteParquetFiles(t *testing.T, parquetDB, tableName string) error { // syncTestCase represents a test case for sync operations type syncTestCase struct { - name string - operation string - useState bool - opSymbol string - expectedData map[string]interface{} + name string + operation string + useState bool + opSymbol string + expectedData map[string]interface{} expectedSchema map[string]string } From efffd147e5c3f2ce102370790f9213856fef1033 Mon Sep 17 00:00:00 2001 From: saksham-datazip Date: Fri, 27 Mar 2026 15:20:34 +0530 Subject: [PATCH 8/8] chore: self-reviewed --- drivers/kafka/internal/kafka_test_utils.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/drivers/kafka/internal/kafka_test_utils.go b/drivers/kafka/internal/kafka_test_utils.go index a9b839328..60900eedc 100644 --- a/drivers/kafka/internal/kafka_test_utils.go +++ b/drivers/kafka/internal/kafka_test_utils.go @@ -218,12 +218,12 @@ func waitForAnyKafkaBroker(ctx context.Context, t *testing.T, brokers []string) continue } - // 🔥 Real readiness check: metadata must be available + // Real readiness check: metadata must be available _, err = conn.ReadPartitions() if err != nil { lastErr = err t.Logf("Waiting for 
Kafka broker %s to initialize metadata... (%v)", b, err) - _ = conn.Close() // IMPORTANT: avoid connection leaks + _ = conn.Close() continue } @@ -231,7 +231,7 @@ func waitForAnyKafkaBroker(ctx context.Context, t *testing.T, brokers []string) t.Logf("Kafka broker %s is fully ready (metadata available)", b) - // Optional: small buffer for stability in slower environments + // small buffer for stability in slower environments time.Sleep(1 * time.Second) return b @@ -250,6 +250,8 @@ func waitForAnyKafkaBroker(ctx context.Context, t *testing.T, brokers []string) func writeMessagesWithRetry(ctx context.Context, t *testing.T, writer *kafka.Writer, msg kafka.Message) { t.Helper() + ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() var lastErr error var attempts int nextLog := time.Now() @@ -263,10 +265,10 @@ func writeMessagesWithRetry(ctx context.Context, t *testing.T, writer *kafka.Wri require.NoError(t, lastErr, "failed to write seed kafka message after %d attempts (topic=%q partition=%d)", attempts, writer.Topic, msg.Partition) return } - // Without this, a bad broker/topic state spins silently until the test timeout (e.g. 30m). + // Without this, a bad broker/topic state spins silently until the test timeout (e.g. 3m). if time.Now().After(nextLog) { t.Logf("kafka seed write retry: attempt=%d topic=%q partition=%d err=%v", attempts, writer.Topic, msg.Partition, lastErr) - nextLog = time.Now().Add(5 * time.Second) + nextLog = time.Now().Add(20 * time.Second) } time.Sleep(200 * time.Millisecond) }