Skip to content
15 changes: 8 additions & 7 deletions drivers/mongodb/internal/mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
func TestMongodbIntegration(t *testing.T) {
t.Parallel()
testConfig := &testutils.IntegrationTest{
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
Namespace: "olake_mongodb_test",
ExpectedData: ExpectedMongoData,
ExpectedUpdateData: ExpectedUpdatedMongoData,
DataTypeSchema: MongoToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mongodb_olake_mongodb_test",
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
Namespace: "olake_mongodb_test",
ExpectedData: ExpectedMongoData,
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
DestinationDataTypeSchema: MongoToDestinationSchema, // Use same schema for parquet
ExecuteQuery: ExecuteQuery,
DestinationDB: "mongodb_olake_mongodb_test",
}
testConfig.TestIntegration(t)
}
Expand Down
67 changes: 56 additions & 11 deletions drivers/mongodb/internal/mon_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var (
"nested_string": "nested_value",
"nested_int": 42,
}

pqNestedDoc = bson.M{
"nested_string": "pq_nested_value",
"nested_int": 789,
}
)

func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) {
Expand Down Expand Up @@ -104,8 +109,8 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err := collection.InsertOne(ctx, doc)
require.NoError(t, err, "Failed to insert document")

case "update":
filter := bson.M{"id_int": int32(100)}
case "update-iceberg":
filter := bson.M{"id": int32(1)}
update := bson.M{
"$set": bson.M{
"id_bigint": int64(987654321098765),
Expand All @@ -125,8 +130,34 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err := collection.UpdateOne(ctx, filter, update)
require.NoError(t, err, "Failed to update document")

case "delete":
filter := bson.M{"id_int": 200}
case "update-parquet":
filter := bson.M{"id": int32(2)}
update := bson.M{
"$set": bson.M{
"id_bigint": int64(5243623234247324),
"id_int": int32(500),
"id_timestamp": time.Date(2023, 12, 19, 15, 30, 0, 0, time.UTC),
"id_double": float64(302.295),
"id_bool": false,
"created_timestamp": primitive.Timestamp{T: uint32(1754907699), I: 1},
"id_nil": nil,
"id_regex": primitive.Regex{Pattern: "updated.*", Options: "i"},
"id_nested": pqNestedDoc,
"id_minkey": primitive.MinKey{},
"id_maxkey": primitive.MaxKey{},
"name_varchar": "new updated varchar",
},
}
_, err := collection.UpdateOne(ctx, filter, update)
require.NoError(t, err, "Failed to update document")

case "delete-iceberg":
filter := bson.M{"id": 1}
_, err := collection.DeleteOne(ctx, filter)
require.NoError(t, err, "Failed to delete document")

case "delete-parquet":
filter := bson.M{"id": 2}
_, err := collection.DeleteOne(ctx, filter)
require.NoError(t, err, "Failed to delete document")

Expand Down Expand Up @@ -180,8 +211,10 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
}

func insertTestData(t *testing.T, ctx context.Context, collection *mongo.Collection) {
testData := []bson.M{
{
t.Helper()
for i := 1; i <= 5; i++ {
doc := bson.M{
"id": i,
"id_bigint": int64(123456789012345),
"id_int": int32(100),
"id_timestamp": time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC),
Expand All @@ -194,10 +227,8 @@ func insertTestData(t *testing.T, ctx context.Context, collection *mongo.Collect
"id_minkey": primitive.MinKey{},
"id_maxkey": primitive.MaxKey{},
"name_varchar": "varchar_val",
},
}
}

for i, doc := range testData {
_, err := collection.InsertOne(ctx, doc)
require.NoError(t, err, "Failed to insert test data row %d", i)
}
Expand All @@ -217,7 +248,7 @@ var ExpectedMongoData = map[string]interface{}{
"name_varchar": "varchar_val",
}

var ExpectedUpdatedMongoData = map[string]interface{}{
var ExpectedIcebergUpdatedData = map[string]interface{}{
"id_bigint": int64(987654321098765),
"id_int": int32(200),
"id_timestamp": arrow.Timestamp(time.Date(2024, 7, 1, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
Expand All @@ -231,7 +262,21 @@ var ExpectedUpdatedMongoData = map[string]interface{}{
"name_varchar": "updated varchar",
}

var MongoToIcebergSchema = map[string]string{
var ExpectedParquetUpdatedData = map[string]interface{}{
"id_bigint": int64(5243623234247324),
"id_int": int32(500),
"id_timestamp": arrow.Timestamp(time.Date(2023, 12, 19, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
"id_double": float64(302.295),
"id_bool": false,
"created_timestamp": int32(1754907699),
"id_regex": `{"pattern": "updated.*", "options": "i"}`,
"id_nested": `{"nested_int":789,"nested_string":"pq_nested_value"}`,
"id_minkey": `{}`,
"id_maxkey": `{}`,
"name_varchar": "new updated varchar",
}

var MongoToDestinationSchema = map[string]string{
"id_bigint": "bigint",
"id_int": "int",
"id_timestamp": "timestamp",
Expand Down
10 changes: 10 additions & 0 deletions drivers/mongodb/internal/testdata/parquet_destination.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "PARQUET",
"writer": {
"s3_bucket": "warehouse",
"s3_region": "us-east-1",
"s3_access_key": "admin",
"s3_secret_key": "password",
"s3_endpoint": "http://host.docker.internal:9000"
}
}
2 changes: 1 addition & 1 deletion drivers/mongodb/internal/testdata/test_streams.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false},{"partition_regex":"","stream_name":"test_collection","normalization":false}]},"streams":[{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_id":{"type":["string"],"destination_column_name":"_id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_maxkey","id_minkey","_id","id_nested","id_nil","id_timestamp","id_bigint","created_timestamp","id_int","id_double","id_regex","name_varchar","id_bool"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake"}},{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection"}}]}
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false},{"partition_regex":"","stream_name":"test_collection","normalization":false}]},"streams":[{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_id":{"type":["string"],"destination_column_name":"_id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["cdc","strict_cdc","full_refresh","incremental"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_bigint","created_timestamp","id_maxkey","id_double","id_nested","id","id_nil","id_timestamp","id_int","id_bool","name_varchar","id_minkey","id_regex","_id"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake"}},{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["null","timestamp_micro"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"}}},"supported_sync_modes":["full_refresh","incremental","cdc","strict_cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection"}}]}
15 changes: 8 additions & 7 deletions drivers/mysql/internal/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
func TestMySQLIntegration(t *testing.T) {
t.Parallel()
testConfig := &testutils.IntegrationTest{
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
Namespace: "olake_mysql_test",
ExpectedData: ExpectedMySQLData,
ExpectedUpdateData: ExpectedUpdatedMySQLData,
DataTypeSchema: MySQLToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mysql_olake_mysql_test",
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
Namespace: "olake_mysql_test",
ExpectedData: ExpectedMySQLData,
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
DestinationDataTypeSchema: MySQLToDestinationSchema,
ExecuteQuery: ExecuteQuery,
DestinationDB: "mysql_olake_mysql_test",
}
testConfig.TestIntegration(t)
}
Expand Down
66 changes: 60 additions & 6 deletions drivers/mysql/internal/mysql_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
'long_varchar_val', 1
)`, integrationTestTable)

case "update":
case "update-iceberg":
query = fmt.Sprintf(`
UPDATE %s SET
id_bigint = 987654321098765,
Expand All @@ -130,10 +130,33 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
created_date = '2024-07-01 15:30:00',
created_timestamp = '2024-07-01 15:30:00', is_active = 0,
long_varchar = 'updated long...', name_bool = 0
WHERE id = 1`, integrationTestTable)
WHERE id = 6`, integrationTestTable)

case "delete":
query = fmt.Sprintf("DELETE FROM %s WHERE id = 1", integrationTestTable)
case "update-parquet":
query = fmt.Sprintf(`
UPDATE %s SET
id_bigint = 435225634345223,
id_int = 456, id_int_unsigned = 123,
id_integer = 789, id_integer_unsigned = 321,
id_mediumint = 1001, id_mediumint_unsigned = 1002,
id_smallint = 201, id_smallint_unsigned = 753,
id_tinyint = 32, id_tinyint_unsigned = 61,
price_decimal = 453.21, price_double = 123.456,
price_double_precision = 123.456, price_float = 543.21,
price_numeric = 543.21, price_real = 123.456,
name_char = 'X', name_varchar = 'updated varchar',
name_text = 'updated text', name_tinytext = 'upd tiny',
name_mediumtext = 'upd medium', name_longtext = 'upd long',
created_date = '2026-07-01 15:30:00',
created_timestamp = '2028-07-01 15:30:00', is_active = 0,
long_varchar = 'updated long...', name_bool = 0
WHERE id = 2`, integrationTestTable)

case "delete-iceberg":
query = fmt.Sprintf("DELETE FROM %s WHERE id = 6", integrationTestTable)

case "delete-parquet":
query = fmt.Sprintf("DELETE FROM %s WHERE id = 2", integrationTestTable)

case "setup_cdc":
backfillStreams := testutils.GetBackfillStreamsFromCDC(streams)
Expand Down Expand Up @@ -231,7 +254,7 @@ var ExpectedMySQLData = map[string]interface{}{
"name_bool": int32(1),
}

var ExpectedUpdatedMySQLData = map[string]interface{}{
var ExpectedIcebergUpdatedData = map[string]interface{}{
"id_bigint": int64(987654321098765),
"id_int": int32(200),
"id_int_unsigned": int32(201),
Expand Down Expand Up @@ -262,7 +285,38 @@ var ExpectedUpdatedMySQLData = map[string]interface{}{
"name_bool": int32(0),
}

var MySQLToIcebergSchema = map[string]string{
var ExpectedParquetUpdatedData = map[string]interface{}{
"id_bigint": int64(435225634345223),
"id_int": int32(456),
"id_int_unsigned": int32(123),
"id_integer": int32(789),
"id_integer_unsigned": int32(321),
"id_mediumint": int32(1001),
"id_mediumint_unsigned": int32(1002),
"id_smallint": int32(201),
"id_smallint_unsigned": int32(753),
"id_tinyint": int32(32),
"id_tinyint_unsigned": int32(61),
"price_decimal": float32(453.21),
"price_double": float64(123.456),
"price_double_precision": float64(123.456),
"price_float": float32(543.21),
"price_numeric": float32(543.21),
"price_real": float64(123.456),
"name_char": "X",
"name_varchar": "updated varchar",
"name_text": "updated text",
"name_tinytext": "upd tiny",
"name_mediumtext": "upd medium",
"name_longtext": "upd long",
"created_date": arrow.Timestamp(time.Date(2026, 7, 1, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
"created_timestamp": arrow.Timestamp(time.Date(2028, 7, 1, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
"is_active": int32(0),
"long_varchar": "updated long...",
"name_bool": int32(0),
}

var MySQLToDestinationSchema = map[string]string{
"id": "unsigned int",
"id_bigint": "bigint",
"id_int": "int",
Expand Down
10 changes: 10 additions & 0 deletions drivers/mysql/internal/testdata/parquet_destination.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "PARQUET",
"writer": {
"s3_bucket": "warehouse",
"s3_region": "us-east-1",
"s3_access_key": "admin",
"s3_secret_key": "password",
"s3_endpoint": "http://host.docker.internal:9000"
}
}
15 changes: 8 additions & 7 deletions drivers/postgres/internal/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ import (
func TestPostgresIntegration(t *testing.T) {
t.Parallel()
testConfig := &testutils.IntegrationTest{
TestConfig: testutils.GetTestConfig(string(constants.Postgres)),
Namespace: "public",
ExpectedData: ExpectedPostgresData,
ExpectedUpdateData: ExpectedUpdatedPostgresData,
DataTypeSchema: PostgresToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "postgres_postgres_public",
TestConfig: testutils.GetTestConfig(string(constants.Postgres)),
Namespace: "public",
ExpectedData: ExpectedPostgresData,
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
ExpectedParquetUpdateData: ExpectedParquetUpdateData,
DestinationDataTypeSchema: PostgresToDestinationSchema,
ExecuteQuery: ExecuteQuery,
DestinationDB: "postgres_postgres_public",
}
testConfig.TestIntegration(t)
}
Expand Down
Loading
Loading