Skip to content

Commit 4251859

Browse files
feat: added schema evolution test cases
1 parent b335e57 commit 4251859

File tree

8 files changed

+138
-174
lines changed

8 files changed

+138
-174
lines changed

drivers/mongodb/internal/mon_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
func TestMongodbIntegration(t *testing.T) {
1111
t.Parallel()
1212
testConfig := &testutils.IntegrationTest{
13-
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
14-
Namespace: "olake_mongodb_test",
15-
ExpectedData: ExpectedMongoData,
16-
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
17-
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
18-
DestinationDataTypeSchema: MongoToDestinationSchema, // Use same schema for parquet
19-
ExecuteQuery: ExecuteQuery,
20-
DestinationDB: "mongodb_olake_mongodb_test",
13+
TestConfig: testutils.GetTestConfig(string(constants.MongoDB)),
14+
Namespace: "olake_mongodb_test",
15+
ExpectedData: ExpectedMongoData,
16+
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
17+
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
18+
DestinationDataTypeSchema: MongoToDestinationSchema,
19+
UpdatedDestinationDataTypeSchema: MongoToDestinationSchema,
20+
ExecuteQuery: ExecuteQuery,
21+
DestinationDB: "mongodb_olake_mongodb_test",
2122
}
2223
testConfig.TestIntegration(t)
2324
}

drivers/mongodb/internal/mon_test_util.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
114114
update := bson.M{
115115
"$set": bson.M{
116116
"id_bigint": int64(987654321098765),
117-
"id_int": int32(200),
117+
"id_int": int64(200),
118118
"id_timestamp": time.Date(2024, 7, 1, 15, 30, 0, 0, time.UTC),
119119
"id_double": float64(202.456),
120120
"id_bool": false,
@@ -250,7 +250,7 @@ var ExpectedMongoData = map[string]interface{}{
250250

251251
var ExpectedIcebergUpdatedData = map[string]interface{}{
252252
"id_bigint": int64(987654321098765),
253-
"id_int": int32(200),
253+
"id_int": int64(200),
254254
"id_timestamp": arrow.Timestamp(time.Date(2024, 7, 1, 15, 30, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
255255
"id_double": float64(202.456),
256256
"id_bool": false,

drivers/mysql/internal/mysql_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import (
1010
func TestMySQLIntegration(t *testing.T) {
1111
t.Parallel()
1212
testConfig := &testutils.IntegrationTest{
13-
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
14-
Namespace: "olake_mysql_test",
15-
ExpectedData: ExpectedMySQLData,
16-
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
17-
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
18-
DestinationDataTypeSchema: MySQLToDestinationSchema,
19-
ExecuteQuery: ExecuteQuery,
20-
DestinationDB: "mysql_olake_mysql_test",
13+
TestConfig: testutils.GetTestConfig(string(constants.MySQL)),
14+
Namespace: "olake_mysql_test",
15+
ExpectedData: ExpectedMySQLData,
16+
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
17+
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
18+
DestinationDataTypeSchema: MySQLToDestinationSchema,
19+
UpdatedDestinationDataTypeSchema: EvolvedMySQLToDestinationSchema,
20+
ExecuteQuery: ExecuteQuery,
21+
DestinationDB: "mysql_olake_mysql_test",
2122
}
2223
testConfig.TestIntegration(t)
2324
}

drivers/mysql/internal/mysql_test_util.go

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
183183
require.NoError(t, err, fmt.Sprintf("failed to execute %s operation", operation), err)
184184
return
185185

186+
case "evolve-schema":
187+
query = fmt.Sprintf("ALTER TABLE %s MODIFY COLUMN id_int BIGINT, MODIFY COLUMN price_float DOUBLE;", integrationTestTable)
188+
189+
case "devolve-schema":
190+
query = fmt.Sprintf("ALTER TABLE %s MODIFY COLUMN id_int INT, MODIFY COLUMN price_float FLOAT", integrationTestTable)
191+
186192
default:
187193
t.Fatalf("Unsupported operation: %s", operation)
188194
}
@@ -256,7 +262,7 @@ var ExpectedMySQLData = map[string]interface{}{
256262

257263
var ExpectedIcebergUpdatedData = map[string]interface{}{
258264
"id_bigint": int64(987654321098765),
259-
"id_int": int32(200),
265+
"id_int": int64(200),
260266
"id_int_unsigned": int32(201),
261267
"id_integer": int32(202),
262268
"id_integer_unsigned": int32(203),
@@ -269,7 +275,7 @@ var ExpectedIcebergUpdatedData = map[string]interface{}{
269275
"price_decimal": float32(543.21),
270276
"price_double": float64(654.321),
271277
"price_double_precision": float64(654.321),
272-
"price_float": float32(543.21),
278+
"price_float": float64(543.21),
273279
"price_numeric": float32(543.21),
274280
"price_real": float64(654.321),
275281
"name_char": "X",
@@ -347,3 +353,35 @@ var MySQLToDestinationSchema = map[string]string{
347353
"long_varchar": "mediumtext",
348354
"name_bool": "tinyint",
349355
}
356+
357+
var EvolvedMySQLToDestinationSchema = map[string]string{
358+
"id": "unsigned int",
359+
"id_bigint": "bigint",
360+
"id_int": "bigint",
361+
"id_int_unsigned": "unsigned int",
362+
"id_integer": "int",
363+
"id_integer_unsigned": "unsigned int",
364+
"id_mediumint": "mediumint",
365+
"id_mediumint_unsigned": "unsigned mediumint",
366+
"id_smallint": "smallint",
367+
"id_smallint_unsigned": "unsigned smallint",
368+
"id_tinyint": "tinyint",
369+
"id_tinyint_unsigned": "unsigned tinyint",
370+
"price_decimal": "decimal",
371+
"price_double": "double",
372+
"price_double_precision": "double",
373+
"price_float": "double",
374+
"price_numeric": "decimal",
375+
"price_real": "double",
376+
"name_char": "char",
377+
"name_varchar": "varchar",
378+
"name_text": "text",
379+
"name_tinytext": "tinytext",
380+
"name_mediumtext": "mediumtext",
381+
"name_longtext": "longtext",
382+
"created_date": "datetime",
383+
"created_timestamp": "timestamp",
384+
"is_active": "tinyint",
385+
"long_varchar": "mediumtext",
386+
"name_bool": "tinyint",
387+
}

drivers/postgres/internal/postgres_test.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,15 @@ import (
1111
func TestPostgresIntegration(t *testing.T) {
1212
t.Parallel()
1313
testConfig := &testutils.IntegrationTest{
14-
TestConfig: testutils.GetTestConfig(string(constants.Postgres)),
15-
Namespace: "public",
16-
ExpectedData: ExpectedPostgresData,
17-
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
18-
ExpectedParquetUpdateData: ExpectedParquetUpdateData,
19-
DestinationDataTypeSchema: PostgresToDestinationSchema,
20-
ExecuteQuery: ExecuteQuery,
21-
DestinationDB: "postgres_postgres_public",
14+
TestConfig: testutils.GetTestConfig(string(constants.Postgres)),
15+
Namespace: "public",
16+
ExpectedData: ExpectedPostgresData,
17+
ExpectedIcebergUpdateData: ExpectedIcebergUpdatedData,
18+
ExpectedParquetUpdateData: ExpectedParquetUpdatedData,
19+
DestinationDataTypeSchema: PostgresToDestinationSchema,
20+
UpdatedDestinationDataTypeSchema: UpdatedPostgresToDestinationSchema,
21+
ExecuteQuery: ExecuteQuery,
22+
DestinationDB: "postgres_postgres_public",
2223
}
2324
testConfig.TestIntegration(t)
2425
}

drivers/postgres/internal/postgres_test_util.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@ func ExecuteQuery(ctx context.Context, t *testing.T, streams []string, operation
197197
require.NoError(t, err, fmt.Sprintf("failed to execute %s operation", operation), err)
198198
return
199199

200+
case "evolve-schema":
201+
query = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN col_int TYPE BIGINT, ALTER COLUMN col_float4 TYPE FLOAT`, integrationTestTable)
202+
203+
case "devolve-schema":
204+
query = fmt.Sprintf(`ALTER TABLE %s ALTER COLUMN col_int TYPE INT, ALTER COLUMN col_float4 TYPE REAL`, integrationTestTable)
205+
200206
default:
201207
t.Fatalf("Unsupported operation: %s", operation)
202208
}
@@ -270,8 +276,8 @@ var ExpectedIcebergUpdatedData = map[string]interface{}{
270276
"col_date": arrow.Timestamp(time.Date(2024, 7, 1, 0, 0, 0, 0, time.UTC).UnixNano() / int64(time.Microsecond)),
271277
"col_decimal": float64(543.21),
272278
"col_double_precision": 987.654321,
273-
"col_float4": float32(543.21),
274-
"col_int": int32(321),
279+
"col_float4": float64(543.21),
280+
"col_int": int64(321),
275281
"col_int2": int32(321),
276282
"col_integer": int32(54321),
277283
"col_interval": "02:00:00",
@@ -288,7 +294,7 @@ var ExpectedIcebergUpdatedData = map[string]interface{}{
288294
"col_xml": "<updated>value</updated>",
289295
}
290296

291-
var ExpectedParquetUpdateData = map[string]interface{}{
297+
var ExpectedParquetUpdatedData = map[string]interface{}{
292298
"col_bigint": int64(987654321098765),
293299
"col_bool": true,
294300
"col_char": "p",
@@ -342,3 +348,31 @@ var PostgresToDestinationSchema = map[string]string{
342348
"col_varbit": "varbit",
343349
"col_xml": "xml",
344350
}
351+
352+
var UpdatedPostgresToDestinationSchema = map[string]string{
353+
"col_bigint": "bigint",
354+
"col_bigserial": "bigserial",
355+
"col_bool": "boolean",
356+
"col_char": "char",
357+
"col_character": "character",
358+
"col_character_varying": "varchar",
359+
"col_date": "date",
360+
"col_decimal": "double",
361+
"col_double_precision": "double precision",
362+
"col_float4": "double",
363+
"col_int": "bigint",
364+
"col_int2": "smallint",
365+
"col_integer": "integer",
366+
"col_interval": "interval",
367+
"col_json": "json",
368+
"col_jsonb": "jsonb",
369+
"col_name": "name",
370+
"col_numeric": "double",
371+
"col_real": "real",
372+
"col_text": "text",
373+
"col_timestamp": "timestamp",
374+
"col_timestamptz": "timestamptz",
375+
"col_uuid": "uuid",
376+
"col_varbit": "varbit",
377+
"col_xml": "xml",
378+
}

utils/testutils/test_schema.go

Lines changed: 0 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -132,137 +132,3 @@ var GlobalTypeMapping = map[string]string{
132132
"bit(n)": "string",
133133
"varying(n)": "string",
134134
}
135-
136-
// ParquetTypeMapping maps database types to Parquet types
137-
// Parquet has a simpler type system than Iceberg
138-
var ParquetTypeMapping = map[string]string{
139-
// Integer Types
140-
"tinyint": "int",
141-
"smallint": "int",
142-
"mediumint": "int",
143-
"int": "int",
144-
"integer": "int",
145-
"unsigned int": "int",
146-
"unsigned smallint": "int",
147-
"unsigned tinyint": "int",
148-
"unsigned mediumint": "int",
149-
"int2": "int",
150-
"int4": "int",
151-
"smallserial": "int",
152-
"serial": "int",
153-
"serial2": "int",
154-
"serial4": "int",
155-
156-
"bigint": "bigint",
157-
"int8": "bigint",
158-
"serial8": "bigint",
159-
"bigserial": "bigint",
160-
"year": "bigint",
161-
162-
// Floating Point Types
163-
"float": "float",
164-
"real": "float",
165-
"decimal": "float",
166-
"numeric": "float",
167-
"float4": "float",
168-
"money": "float",
169-
170-
"double": "double",
171-
"float8": "double",
172-
"double precision": "double",
173-
174-
// Boolean Types
175-
"bool": "boolean",
176-
"boolean": "boolean",
177-
178-
// String Types
179-
"char": "string",
180-
"varchar": "string",
181-
"tinytext": "string",
182-
"text": "string",
183-
"mediumtext": "string",
184-
"longtext": "string",
185-
"character": "string",
186-
"character varying": "string",
187-
"longvarchar": "string",
188-
"bpchar": "string",
189-
"name": "string",
190-
191-
// Binary Types
192-
"binary": "string",
193-
"varbinary": "string",
194-
"tinyblob": "string",
195-
"blob": "string",
196-
"mediumblob": "string",
197-
"longblob": "string",
198-
"bytea": "string",
199-
200-
// JSON and Document Types
201-
"json": "string",
202-
"jsonb": "string",
203-
"xml": "string",
204-
"hstore": "string",
205-
206-
// Network Types
207-
"cidr": "string",
208-
"inet": "string",
209-
"macaddr": "string",
210-
"macaddr8": "string",
211-
212-
// Spatial Types
213-
"geometry": "string",
214-
"point": "string",
215-
"linestring": "string",
216-
"polygon": "string",
217-
"multipoint": "string",
218-
"multilinestring": "string",
219-
"multipolygon": "string",
220-
"geometrycollection": "string",
221-
"circle": "string",
222-
"path": "string",
223-
"box": "string",
224-
"line": "string",
225-
"lseg": "string",
226-
227-
// Full Text Search Types
228-
"tsvector": "string",
229-
"tsquery": "string",
230-
231-
// UUID
232-
"uuid": "string",
233-
234-
// Range Types
235-
"tsrange": "string",
236-
"tstzrange": "string",
237-
"int4range": "string",
238-
"numrange": "string",
239-
"daterange": "string",
240-
241-
// Array
242-
"array": "string",
243-
"ARRAY": "string",
244-
"int2vector": "string",
245-
246-
// Enum and Set
247-
"enum": "string",
248-
"set": "string",
249-
250-
// Date/Time
251-
"date": "timestamp",
252-
"timestamp": "timestamp",
253-
"datetime": "timestamp",
254-
"timestamptz": "timestamp",
255-
"timestamp with time zone": "timestamp",
256-
"timestamp without time zone": "timestamp",
257-
258-
"time": "string",
259-
"timez": "string",
260-
"interval": "string",
261-
262-
// Misc
263-
"pg_lsn": "string",
264-
"bit varying": "string",
265-
"varbit": "string",
266-
"bit(n)": "string",
267-
"varying(n)": "string",
268-
}

0 commit comments

Comments
 (0)