
Commit 9ae1585

chenjian2664 authored and ebyhr committed
Fix failure reading checkpoint created by CREATE OR REPLACE with different schema
1 parent 60cc199 commit 9ae1585
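The commit addresses a read failure on checkpoints produced around CREATE OR REPLACE TABLE when the replacement uses a different schema: instead of routing through writeCheckpointIfNeeded, the connector now writes a fresh checkpoint whenever the replacement drops, renames, or retypes a column. A minimal sketch of the affected sequence, in the idiom of the new TestDeltaLakeBasic tests below (the table name and values are illustrative, not from the commit; assertUpdate, query, and assertThat are the test helpers visible in the diff):

    // Hypothetical repro inside TestDeltaLakeBasic.
    // checkpoint_interval = 2 forces a checkpoint at version 2, before the replace.
    assertUpdate("CREATE TABLE repro (x int) WITH (checkpoint_interval = 2)");
    assertUpdate("INSERT INTO repro VALUES 1", 1);
    assertUpdate("INSERT INTO repro VALUES 2", 1); // checkpoint now reflects schema (x int)
    // Replace with an incompatible schema; without a fresh checkpoint written here,
    // a reader could fail resolving the stale checkpoint against the new columns.
    assertUpdate("CREATE OR REPLACE TABLE repro (y varchar)");
    assertUpdate("INSERT INTO repro VALUES 'version1'", 1);
    assertThat(query("TABLE repro")).matches("VALUES varchar 'version1'");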


33 files changed: +341 -13 lines changed


plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java

Lines changed: 49 additions & 12 deletions
@@ -1361,23 +1361,28 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
                 protocolEntry = protocolEntryForTable(DEFAULT_READER_VERSION, DEFAULT_WRITER_VERSION, containsTimestampType, tableMetadata.getProperties());
             }
 
+            MetadataEntry metadataEntry = MetadataEntry.builder()
+                    .setDescription(tableMetadata.getComment())
+                    .setSchemaString(serializeSchemaAsJson(deltaTable.build()))
+                    .setPartitionColumns(getPartitionedBy(tableMetadata.getProperties()))
+                    .setConfiguration(configurationForNewTable(checkpointInterval, changeDataFeedEnabled, deletionVectorsEnabled, columnMappingMode, maxFieldId))
+                    .build();
             appendTableEntries(
                     commitVersion,
                     transactionLogWriter,
                     saveMode == SaveMode.REPLACE ? CREATE_OR_REPLACE_TABLE_OPERATION : CREATE_TABLE_OPERATION,
                     session,
                     protocolEntry,
-                    MetadataEntry.builder()
-                            .setDescription(tableMetadata.getComment())
-                            .setSchemaString(serializeSchemaAsJson(deltaTable.build()))
-                            .setPartitionColumns(getPartitionedBy(tableMetadata.getProperties()))
-                            .setConfiguration(configurationForNewTable(checkpointInterval, changeDataFeedEnabled, deletionVectorsEnabled, columnMappingMode, maxFieldId))
-                            .build());
+                    metadataEntry);
 
             transactionLogWriter.flush();
 
             if (replaceExistingTable) {
-                writeCheckpointIfNeeded(session, schemaTableName, location, tableHandle.toCredentialsHandle(), tableHandle.getReadVersion(), checkpointInterval, commitVersion);
+                List<DeltaLakeColumnHandle> existingColumns = getColumns(tableHandle.getMetadataEntry(), tableHandle.getProtocolEntry());
+                List<DeltaLakeColumnHandle> newColumns = getColumns(metadataEntry, protocolEntry);
+                if (isNewCheckpointFileRequired(existingColumns, newColumns)) {
+                    writeCheckpoint(session, schemaTableName, location, tableHandle.toCredentialsHandle(), commitVersion);
+                }
             }
         }
     }
@@ -1536,9 +1541,11 @@ public DeltaLakeOutputTableHandle beginCreateTable(
         OptionalLong readVersion = OptionalLong.empty();
         ProtocolEntry protocolEntry;
 
+        boolean isNewCheckpointFileRequired = false;
         if (replaceExistingTable) {
             protocolEntry = protocolEntryForTable(handle.getProtocolEntry().minReaderVersion(), handle.getProtocolEntry().minWriterVersion(), containsTimestampType, tableMetadata.getProperties());
             readVersion = OptionalLong.of(handle.getReadVersion());
+            isNewCheckpointFileRequired = isNewCheckpointFileRequired(getColumns(handle.getMetadataEntry(), handle.getProtocolEntry()), columnHandles.build());
         }
         else {
             TrinoFileSystem fileSystem = fileSystemFactory.create(session, location);
@@ -1561,6 +1568,7 @@ public DeltaLakeOutputTableHandle beginCreateTable(
                 columnMappingMode,
                 maxFieldId,
                 replace,
+                isNewCheckpointFileRequired,
                 readVersion,
                 protocolEntry);
     }
@@ -1749,8 +1757,8 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
             transactionLogWriter.flush();
             writeCommitted = true;
 
-            if (handle.replace() && handle.readVersion().isPresent()) {
-                writeCheckpointIfNeeded(session, schemaTableName, handle.location(), handle.toCredentialsHandle(), handle.readVersion().getAsLong(), handle.checkpointInterval(), commitVersion);
+            if (handle.replace() && handle.readVersion().isPresent() && handle.isSchemaChanged()) {
+                writeCheckpoint(session, schemaTableName, location, handle.toCredentialsHandle(), commitVersion);
             }
 
             if (isCollectExtendedStatisticsColumnStatisticsOnWrite(session) && !computedStatistics.isEmpty()) {
@@ -3189,9 +3197,7 @@ private void writeCheckpointIfNeeded(
             // This does not pose correctness issue but may be confusing if someone looks into transaction log.
             // To fix that we should allow for getting snapshot for given version.
 
-            TransactionLogReader transactionLogReader = new FileSystemTransactionLogReader(tableLocation, credentialsHandle, fileSystemFactory);
-            TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, transactionLogReader, table, tableLocation, Optional.of(newVersion), credentialsHandle);
-            checkpointWriterManager.writeCheckpoint(session, snapshot, credentialsHandle);
+            writeCheckpoint(session, table, tableLocation, credentialsHandle, newVersion);
         }
         catch (Exception e) {
             // We can't fail here as transaction was already committed, in case of INSERT this could result
@@ -3200,6 +3206,37 @@ private void writeCheckpointIfNeeded(
         }
     }
 
+    private void writeCheckpoint(ConnectorSession session, SchemaTableName table, String tableLocation, VendedCredentialsHandle credentialsHandle, long newVersion)
+            throws IOException
+    {
+        TransactionLogReader transactionLogReader = new FileSystemTransactionLogReader(tableLocation, credentialsHandle, fileSystemFactory);
+        TableSnapshot snapshot = transactionLogAccess.loadSnapshot(session, transactionLogReader, table, tableLocation, Optional.of(newVersion), credentialsHandle);
+        checkpointWriterManager.writeCheckpoint(session, snapshot, credentialsHandle);
+    }
+
+    private static boolean isNewCheckpointFileRequired(List<DeltaLakeColumnHandle> existingColumns, List<DeltaLakeColumnHandle> newColumns)
+    {
+        Map<String, Type> newColumnHandles = newColumns.stream()
+                .filter(column -> !isMetadataColumnHandle(column))
+                .collect(toImmutableMap(DeltaLakeColumnHandle::columnName, DeltaLakeColumnHandle::type));
+
+        for (DeltaLakeColumnHandle existingColumn : existingColumns) {
+            if (isMetadataColumnHandle(existingColumn)) {
+                continue;
+            }
+
+            if (!newColumnHandles.containsKey(existingColumn.columnName())) {
+                // remove/rename column requires creating a new checkpoint file
+                return true;
+            }
+            Type newType = newColumnHandles.get(existingColumn.columnName());
+            if (!existingColumn.type().equals(newType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     private void cleanupFailedWrite(ConnectorSession session, VendedCredentialsHandle credentialsHandle, List<DataFileInfo> dataFiles)
     {
         Location location = Location.of(credentialsHandle.tableLocation());
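The heart of the fix is the new isNewCheckpointFileRequired helper: a replacement schema forces a checkpoint only when an existing column disappears (dropped or renamed) or changes type; added columns alone do not. A self-contained sketch of the same rule, using plain java.util types in place of Trino's DeltaLakeColumnHandle and omitting the metadata-column filter (all names here are illustrative, not from the commit):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Illustrative stand-in for DeltaLakeColumnHandle: a column name plus its type.
    record Column(String name, String type) {}

    class SchemaChangeCheck
    {
        // A new checkpoint is required when any existing column was removed,
        // renamed (its name no longer resolves), or retyped; columns that were
        // only added never trigger one. A rename shows up as the old name going
        // missing, so the first branch covers it.
        static boolean isNewCheckpointFileRequired(List<Column> existing, List<Column> updated)
        {
            Map<String, String> updatedByName = updated.stream()
                    .collect(Collectors.toMap(Column::name, Column::type));
            for (Column column : existing) {
                String newType = updatedByName.get(column.name());
                if (newType == null || !newType.equals(column.type())) {
                    return true;
                }
            }
            return false;
        }

        public static void main(String[] args)
        {
            List<Column> before = List.of(new Column("x", "int"));
            // false: adding a column is not a schema change for this purpose
            System.out.println(isNewCheckpointFileRequired(before, List.of(new Column("x", "int"), new Column("y", "varchar"))));
            // true: "x" is gone, so it was dropped or renamed
            System.out.println(isNewCheckpointFileRequired(before, List.of(new Column("y", "varchar"))));
            // true: "x" changed type
            System.out.println(isNewCheckpointFileRequired(before, List.of(new Column("x", "bigint"))));
        }
    }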

plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeOutputTableHandle.java

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ public record DeltaLakeOutputTableHandle(
         ColumnMappingMode columnMappingMode,
         OptionalInt maxColumnId,
         boolean replace,
+        boolean isSchemaChanged,
         OptionalLong readVersion,
         ProtocolEntry protocolEntry)
         implements ConnectorOutputTableHandle
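The new isSchemaChanged component exists because CTAS splits the work across two calls: beginCreateTable computes isNewCheckpointFileRequired while the old table's metadata is still at hand, and finishCreateTable, which runs after the data files are written, sees only this handle, so the decision has to travel inside it.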

plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java

Lines changed: 148 additions & 0 deletions
@@ -337,6 +337,154 @@ private void testAddNestedColumnWithColumnMappingMode(String columnMappingMode)
                 .containsPattern("(delta\\.columnMapping\\.physicalName.*?){11}");
     }
 
+    /**
+     * @see databricks164.test_write_checkpoint_on_schema_change
+     */
+    @Test
+    void testDatabricksWriteCheckpointOnSchemaChange()
+            throws Exception
+    {
+        String tableName = "test_dbx_write_checkpoint_on_schema_change" + randomNameSuffix();
+        Path tableLocation = catalogDir.resolve(tableName);
+        copyDirectoryContents(new File(Resources.getResource("databricks164/test_write_checkpoint_on_schema_change").toURI()).toPath(), tableLocation);
+        assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));
+
+        Path tableLocationPath = Path.of(getTableLocation(tableName).replace("file://", ""));
+
+        // verify no checkpoint exists at version 3:
+        // adding a new column is not treated as a schema change, so no new checkpoint is created
+        assertThat(Files.exists(tableLocationPath.resolve("_delta_log/00000000000000000003.checkpoint.parquet"))).isFalse();
+        assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 4"))
+                .matches("VALUES (4, varchar 'version4')");
+
+        // verify a checkpoint exists at version 5:
+        // dropping a column is treated as a schema change, so a new checkpoint is created
+        assertThat(Files.exists(tableLocationPath.resolve("_delta_log/00000000000000000005.checkpoint.parquet"))).isTrue();
+        assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 6"))
+                .matches("VALUES varchar 'version6'");
+
+        // verify a checkpoint exists at version 7:
+        // renaming a column is treated as a schema change, so a new checkpoint is created
+        assertThat(Files.exists(tableLocationPath.resolve("_delta_log/00000000000000000007.checkpoint.parquet"))).isTrue();
+        assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 8"))
+                .matches("VALUES varchar 'version8'");
+
+        // verify a checkpoint exists at version 9:
+        // changing a column type is treated as a schema change, so a new checkpoint is created
+        assertThat(Files.exists(tableLocationPath.resolve("_delta_log/00000000000000000009.checkpoint.parquet"))).isTrue();
+        assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 10"))
+                .matches("VALUES 10");
+
+        // verify a checkpoint exists at version 11:
+        // changing the column type back is also treated as a schema change, so a new checkpoint is created
+        assertThat(Files.exists(tableLocationPath.resolve("_delta_log/00000000000000000011.checkpoint.parquet"))).isTrue();
+        assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 12"))
+                .matches("VALUES varchar 'version12'");
+    }
+
+    @Test
+    void testWriteCheckpointOnSchemaChange()
+    {
+        try (TestTable table = newTrinoTable("test_write_checkpoint_on_schema_change", "(x int) WITH (checkpoint_interval = 2)")) {
+            Path tableLocation = Path.of(getTableLocation(table.getName()).replace("file://", ""));
+
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1);
+            // generate checkpoint at version 2
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 2", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES 1, 2");
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000002.checkpoint.parquet"))).isTrue();
+
+            // replace the table to add a new column at version 3
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (x int, y varchar) WITH (checkpoint_interval = 20)");
+            // adding a new column is not treated as a schema change, so no new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000003.checkpoint.parquet"))).isFalse();
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES (4, 'version4')", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES (4, varchar 'version4')");
+
+            // replace the table to drop a column at version 5
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (y varchar) WITH (checkpoint_interval = 20)");
+            // dropping a column is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000005.checkpoint.parquet"))).isTrue();
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 'version6'", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version6'");
+
+            // replace the table to rename a column at version 7
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (z varchar) WITH (checkpoint_interval = 20)");
+            // renaming a column is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000007.checkpoint.parquet"))).isTrue();
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 'version8'", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version8'");
+
+            // replace the table to change a column type at version 9
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (z int) WITH (checkpoint_interval = 20)");
+            // changing a column type is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000009.checkpoint.parquet"))).isTrue();
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 10", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES 10");
+
+            // replace the table to change the column type back at version 11, testing a type change again
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " (z varchar) WITH (checkpoint_interval = 20)");
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000011.checkpoint.parquet"))).isTrue();
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 'version12'", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version12'");
+        }
+    }
+
+    @Test
+    void testWriteCheckpointOnSchemaChangeCTAS()
+    {
+        try (TestTable table = newTrinoTable("test_write_checkpoint_on_schema_change", "(x int) WITH (checkpoint_interval = 2)")) {
+            Path tableLocation = Path.of(getTableLocation(table.getName()).replace("file://", ""));
+
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 1", 1);
+            // generate checkpoint at version 2
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 2", 1);
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES 1, 2");
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000002.checkpoint.parquet"))).isTrue();
+
+            // replace the table to add a new column at version 3
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (checkpoint_interval = 20) AS SELECT 3 x, CAST('version3' AS varchar) y", 1);
+            // adding a new column is not treated as a schema change, so no new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000003.checkpoint.parquet"))).isFalse();
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES (3, varchar 'version3')");
+
+            // replace the table to drop a column at version 4
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (checkpoint_interval = 20) AS SELECT CAST('version4' AS varchar) y", 1);
+            // dropping a column is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000004.checkpoint.parquet"))).isTrue();
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version4'");
+
+            // replace the table to rename a column at version 5
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (checkpoint_interval = 20) AS SELECT CAST('version5' AS varchar) z", 1);
+            // renaming a column is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000005.checkpoint.parquet"))).isTrue();
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version5'");
+
+            // replace the table to change a column type at version 6
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (checkpoint_interval = 20) AS SELECT 6 z", 1);
+            // changing a column type is treated as a schema change, so a new checkpoint is created
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000006.checkpoint.parquet"))).isTrue();
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES 6");
+
+            // replace the table to change the column type back at version 7, testing a type change again
+            assertUpdate("CREATE OR REPLACE TABLE " + table.getName() + " WITH (checkpoint_interval = 20) AS SELECT CAST('version7' AS varchar) z", 1);
+            assertThat(Files.exists(tableLocation.resolve("_delta_log/00000000000000000007.checkpoint.parquet"))).isTrue();
+            assertThat(query("TABLE " + table.getName()))
+                    .matches("VALUES varchar 'version7'");
+        }
+    }
+
     @Test // regression test for https://github.com/trinodb/trino/issues/24121
     void testPartitionValuesParsedCheckpoint()
             throws Exception