Skip to content
5 changes: 4 additions & 1 deletion destination/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,11 @@ func (p *Parquet) Write(_ context.Context, records []types.RawRecord) error {

// Check validates local paths and S3 credentials if applicable.
func (p *Parquet) Check(_ context.Context) error {
uniqueSuffix := fmt.Sprintf("%d", time.Now().UnixNano())
threadID := fmt.Sprintf("test_parquet_destination_%s", uniqueSuffix)

p.options = &destination.Options{
ThreadID: "test_parquet_destination",
ThreadID: threadID,
}

// check for s3 writer configuration
Expand Down
16 changes: 9 additions & 7 deletions drivers/mongodb/internal/mon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
func TestMongodbIntegration(t *testing.T) {
t.Parallel()
testConfig := &testutils.IntegrationTest{
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
Namespace: "olake_mongodb_test",
ExpectedData: ExpectedMongoData,
ExpectedUpdateData: ExpectedUpdatedMongoData,
DataTypeSchema: MongoToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mongodb_olake_mongodb_test",
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
Namespace: "olake_mongodb_test",
ExpectedData: ExpectedMongoData,
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
DestinationDataTypeSchema: MongoToDestinationSchema,
UpdatedDestinationDataTypeSchema: UpdatedMongoToDestinationSchema,
ExecuteQuery: ExecuteQuery,
DestinationDB: "mongodb_olake_mongodb_test",
}
testConfig.TestIntegration(t)
}
Expand Down
93 changes: 79 additions & 14 deletions drivers/mongodb/internal/mon_test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ var (
"nested_string": "nested_value",
"nested_int": 42,
}

pqNestedDoc = bson.M{
"nested_string": "pq_nested_value",
"nested_int": 789,
}
)

func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation string, fileConfig bool) {
Expand Down Expand Up @@ -104,12 +109,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err := collection.InsertOne(ctx, doc)
require.NoError(t, err, "Failed to insert document")

case "update":
filter := bson.M{"id_int": int32(100)}
case "update-iceberg":
filter := bson.M{"id": int32(1)}
update := bson.M{
"$set": bson.M{
"id_bigint": int64(987654321098765),
"id_int": int32(200),
"id_int": int64(200),
"id_timestamp": time.Date(2024, 7, 1, 15, 30, 0, 0, time.UTC),
"id_double": float64(202.456),
"id_bool": false,
Expand All @@ -125,8 +130,34 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
_, err := collection.UpdateOne(ctx, filter, update)
require.NoError(t, err, "Failed to update document")

case "delete":
filter := bson.M{"id_int": 200}
case "update-parquet":
filter := bson.M{"id": int32(2)}
update := bson.M{
"$set": bson.M{
"id_bigint": int64(5243623234247324),
"id_int": int32(500),
"id_timestamp": time.Date(2023, 12, 19, 15, 30, 0, 0, time.UTC),
"id_double": float64(302.295),
"id_bool": false,
"created_timestamp": primitive.Timestamp{T: uint32(1754907699), I: 1},
"id_nil": nil,
"id_regex": primitive.Regex{Pattern: "updated.*", Options: "i"},
"id_nested": pqNestedDoc,
"id_minkey": primitive.MinKey{},
"id_maxkey": primitive.MaxKey{},
"name_varchar": "new updated varchar",
},
}
_, err := collection.UpdateOne(ctx, filter, update)
require.NoError(t, err, "Failed to update document")

case "delete-iceberg":
filter := bson.M{"id": 1}
_, err := collection.DeleteOne(ctx, filter)
require.NoError(t, err, "Failed to delete document")

case "delete-parquet":
filter := bson.M{"id": 2}
_, err := collection.DeleteOne(ctx, filter)
require.NoError(t, err, "Failed to delete document")

Expand All @@ -138,6 +169,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
}
return

case "devolve-schema":
_, err := collection.DeleteMany(ctx, bson.M{})
require.NoError(t, err, "Failed to clean collection")
// Re-insert test data after cleaning
insertTestData(t, ctx, collection)

case "bulk_cdc_data_insert":
backfillStreams := testutils.GetBackfillStreamsFromCDC(streams)
totalRows := 15000000
Expand Down Expand Up @@ -180,8 +217,10 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
}

func insertTestData(t *testing.T, ctx context.Context, collection *mongo.Collection) {
testData := []bson.M{
{
t.Helper()
for i := 1; i <= 5; i++ {
doc := bson.M{
"id": i,
"id_bigint": int64(123456789012345),
"id_int": int32(100),
"id_timestamp": time.Date(2023, 1, 1, 12, 0, 0, 0, time.UTC),
Expand All @@ -194,10 +233,8 @@ func insertTestData(t *testing.T, ctx context.Context, collection *mongo.Collect
"id_minkey": primitive.MinKey{},
"id_maxkey": primitive.MaxKey{},
"name_varchar": "varchar_val",
},
}
}

for i, doc := range testData {
_, err := collection.InsertOne(ctx, doc)
require.NoError(t, err, "Failed to insert test data row %d", i)
}
Expand All @@ -217,9 +254,9 @@ var ExpectedMongoData = map[string]interface{}{
"name_varchar": "varchar_val",
}

var ExpectedUpdatedMongoData = map[string]interface{}{
var ExpectedIcebergUpdatedData = map[string]interface{}{
"id_bigint": int64(987654321098765),
"id_int": int32(200),
"id_int": int64(200),
"id_timestamp": arrow.Timestamp(time.Date(2024, 7, 1, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
"id_double": float64(202.456),
"id_bool": false,
Expand All @@ -231,13 +268,41 @@ var ExpectedUpdatedMongoData = map[string]interface{}{
"name_varchar": "updated varchar",
}

var MongoToIcebergSchema = map[string]string{
var ExpectedParquetUpdatedData = map[string]interface{}{
"id_bigint": int64(5243623234247324),
"id_int": int32(500),
"id_timestamp": arrow.Timestamp(time.Date(2023, 12, 19, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
"id_double": float64(302.295),
"id_bool": false,
"created_timestamp": int32(1754907699),
"id_regex": `{"pattern": "updated.*", "options": "i"}`,
"id_nested": `{"nested_int":789,"nested_string":"pq_nested_value"}`,
"id_minkey": `{}`,
"id_maxkey": `{}`,
"name_varchar": "new updated varchar",
}

var MongoToDestinationSchema = map[string]string{
"id_bigint": "bigint",
"id_int": "int",
"id_timestamp": "timestamp",
"id_double": "double",
"id_bool": "boolean",
"created_timestamp": "string",
"created_timestamp": "int",
"id_regex": "string",
"id_nested": "string",
"id_minkey": "string",
"id_maxkey": "string",
"name_varchar": "string",
}

var UpdatedMongoToDestinationSchema = map[string]string{
"id_bigint": "bigint",
"id_int": "bigint",
"id_timestamp": "timestamp",
"id_double": "double",
"id_bool": "boolean",
"created_timestamp": "int",
"id_regex": "string",
"id_nested": "string",
"id_minkey": "string",
Expand Down
10 changes: 10 additions & 0 deletions drivers/mongodb/internal/testdata/parquet_destination.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"type": "PARQUET",
"writer": {
"s3_bucket": "warehouse",
"s3_region": "us-east-1",
"s3_access_key": "admin",
"s3_secret_key": "password",
"s3_endpoint": "http://host.docker.internal:9000"
}
}
2 changes: 1 addition & 1 deletion drivers/mongodb/internal/testdata/test_streams.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false},{"partition_regex":"","stream_name":"test_collection","normalization":false}]},"streams":[{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_id":{"type":["string"],"destination_column_name":"_id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_maxkey","id_minkey","_id","id_nested","id_nil","id_timestamp","id_bigint","created_timestamp","id_int","id_double","id_regex","name_varchar","id_bool"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake"}},{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"}}},"supported_sync_modes":["strict_cdc","full_refresh","incremental","cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection"}}]}
{"selected_streams":{"olake_mongodb_test":[{"partition_regex":"","stream_name":"mongodb_test_table_olake","normalization":false},{"partition_regex":"","stream_name":"test_collection","normalization":false}]},"streams":[{"stream":{"name":"mongodb_test_table_olake","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_id":{"type":["string"],"destination_column_name":"_id"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"},"created_timestamp":{"type":["integer_small"],"destination_column_name":"created_timestamp"},"id":{"type":["integer_small"],"destination_column_name":"id"},"id_bigint":{"type":["integer"],"destination_column_name":"id_bigint"},"id_bool":{"type":["boolean"],"destination_column_name":"id_bool"},"id_double":{"type":["number"],"destination_column_name":"id_double"},"id_int":{"type":["integer_small"],"destination_column_name":"id_int"},"id_maxkey":{"type":["unknown"],"destination_column_name":"id_maxkey"},"id_minkey":{"type":["unknown"],"destination_column_name":"id_minkey"},"id_nested":{"type":["object"],"destination_column_name":"id_nested"},"id_nil":{"type":["null"],"destination_column_name":"id_nil"},"id_regex":{"type":["unknown"],"destination_column_name":"id_regex"},"id_timestamp":{"type":["timestamp"],"destination_column_name":"id_timestamp"},"name_varchar":{"type":["string"],"destination_column_name":"name_varchar"}}},"supported_sync_modes":["cdc","strict_cdc","full_refresh","incremental"],"source_defined_primary_key":["_id"],"available_cursor_fields":["id_bigint","created_timestamp","id_maxkey","id_double","id_nested","id","id_nil","id_timestamp","id_int","id_bool","name_varchar","id_minkey","id_regex","_id"],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"mongodb_test_table_olake"}},{"stream":{"name":"test_collection","namespace":"olake_mongodb_test","type_schema":{"properties":{"_cdc_timestamp":{"type":["timestamp_micro","null"],"destination_column_name":"_cdc_timestamp"},"_olake_id":{"type":["string","null"],"destination_column_name":"_olake_id"},"_olake_timestamp":{"type":["null","timestamp_micro"],"destination_column_name":"_olake_timestamp"},"_op_type":{"type":["string","null"],"destination_column_name":"_op_type"}}},"supported_sync_modes":["full_refresh","incremental","cdc","strict_cdc"],"source_defined_primary_key":["_id"],"available_cursor_fields":[],"sync_mode":"cdc","destination_database":"mongodb:olake_mongodb_test","destination_table":"test_collection"}}]}
16 changes: 9 additions & 7 deletions drivers/mysql/internal/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import (
func TestMySQLIntegration(t *testing.T) {
t.Parallel()
testConfig := &testutils.IntegrationTest{
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
Namespace: "olake_mysql_test",
ExpectedData: ExpectedMySQLData,
ExpectedUpdateData: ExpectedUpdatedMySQLData,
DataTypeSchema: MySQLToIcebergSchema,
ExecuteQuery: ExecuteQuery,
IcebergDB: "mysql_olake_mysql_test",
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
Namespace: "olake_mysql_test",
ExpectedData: ExpectedMySQLData,
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
DestinationDataTypeSchema: MySQLToDestinationSchema,
UpdatedDestinationDataTypeSchema: EvolvedMySQLToDestinationSchema,
ExecuteQuery: ExecuteQuery,
DestinationDB: "mysql_olake_mysql_test",
}
testConfig.TestIntegration(t)
}
Expand Down
Loading
Loading