Skip to content

Conversation

@thisisArjit
Copy link
Contributor

@thisisArjit thisisArjit commented Sep 22, 2025

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • This PR aims to update iceberg table's schema during partition copy if there is a schema mismatch
  • It assumes that the schema is backward compatible & the same is enforced by the catalog.

Tests

  • Added new & updated existing unit tests

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@thisisArjit thisisArjit force-pushed the schema-evolution-iceberg branch 2 times, most recently from 7bb0252 to c2db4e8 Compare October 15, 2025 14:16
@thisisArjit thisisArjit marked this pull request as ready for review October 16, 2025 05:35
@thisisArjit thisisArjit force-pushed the schema-evolution-iceberg branch from c2db4e8 to c2c98d3 Compare October 16, 2025 05:38
@thisisArjit thisisArjit changed the title Evolve iceberg table schema for partition copy [GOBBLIN-2230] Evolve iceberg table schema for partition copy Oct 16, 2025
Comment on lines 198 to 199
this.getSrcIcebergTable().accessTableMetadata().schema(),
this.partitionColumnName,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fetching schema after generating files may lead to corruption in very high concurrency scenario, we should get schema when we get manifest files similar to what done in full table replication.

* @param partitionValue the partition column value on which data files will be replaced
*/
protected void overwritePartition(List<DataFile> dataFiles, String partitionColName, String partitionValue)
protected void updateSchemaAndPartition(List<DataFile> dataFiles, Schema updatedSchema, String partitionColName, String partitionValue)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update Java doc here as well
Also name of function maybe changed to updateSchemaAndOverwritePartition ?

Comment on lines 309 to 312

updateSchema(updatedSchema, false);
overwritePartition(dataFiles, partitionColName, partitionValue);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be part of a single table transaction commit ?
Is there any issue with using of that ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updating schema like this is not possible in a single transaction

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay then lets break this down so that retryer of Overwritepartitionstep doesn't retry schema update again, might need to refactor IcebergOverwritePartitionsStep to call both function sequentially one after other

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This partition commit and schema update step should be made similar to what done in full table i.e. even the destination table schema should not have been updated in between the job or this should be documented properly and tested as well

@thisisArjit thisisArjit force-pushed the schema-evolution-iceberg branch 3 times, most recently from 3056fbc to 23b6b15 Compare October 17, 2025 14:15
Comment on lines +169 to +174
}

private Retryer<Void> createSchemaUpdateRetryer() {
Config config = ConfigFactory.parseProperties(this.properties);
Config retryerOverridesConfig = config.hasPath(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
? config.getConfig(IcebergOverwritePartitionsStep.SCHEMA_UPDATE_RETRYER_CONFIG_PREFIX)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can this be combined with above function by passing prefix as function param ?

Comment on lines 335 to 503

TableOperations tableOps = catalog.newTableOps(tableId);
IcebergTable icebergTable = new IcebergTable(tableId,
catalog.newTableOps(tableId),
tableOps,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just curious, is this change required ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, not required.. it was required for previous version but didn't revert the change completely

@thisisArjit thisisArjit force-pushed the schema-evolution-iceberg branch from 23b6b15 to c188edd Compare October 24, 2025 11:35
@Blazer-007 Blazer-007 merged commit 66e95ce into apache:master Oct 24, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants