Skip to content

Conversation

@thisisArjit
Copy link
Contributor

@thisisArjit thisisArjit commented Nov 13, 2025

Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

JIRA

Description

  • Add pre-publish step for IcebergSource to delete respective partitions before copying files + unit tests included
  • Set required properties in work units generated (in IcebergSource) & CopyableFile (in IcebergFileStreamExtractor) to use CopyDataPublisher
  • Avoid using closer in downloadFile impl as it leads to stream closed exception during process work unit step
  • Remove grouping logic in IcebergSource (1 work unit = multiple files), this leads to FileAwareInputStreamDataWriter can only process one file and cannot be reused

Tests

  • Added & updated unit tests for pre-publish delete step
  • Manually verified that the flows are working as expected

Commits

  • My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

@thisisArjit thisisArjit force-pushed the oh-azure-workaround branch 4 times, most recently from a6e6ef9 to 1d2fa8b Compare November 20, 2025 08:30
@thisisArjit thisisArjit force-pushed the oh-azure-workaround branch 2 times, most recently from 73a7ba0 to 883ad29 Compare November 24, 2025 05:16
@thisisArjit thisisArjit changed the title Workaround for Iceberg file based copy issues Add delete step in IcebergSource and fix lineage publish issue Nov 24, 2025
@thisisArjit thisisArjit changed the title Add delete step in IcebergSource and fix lineage publish issue Add pre-publish step for IcebergSource and fix other issues in Iceberg file based copy Nov 24, 2025
@thisisArjit thisisArjit marked this pull request as ready for review November 24, 2025 05:41
/**
* Invoke the private addDeleteStepIfNeeded method using reflection
*/
private void invokeAddDeleteStepIfNeeded(List<IcebergTable.FilePathWithPartition> sourceFiles,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sourceFiles is not being used while invoking the method addDeleteStepIfNeeded

Comment on lines 355 to 359
List<IcebergTable.FilePathWithPartition> sourceFiles = Lists.newArrayList();
Map<String, String> partitionData = new HashMap<>();
partitionData.put("datepartition", "2025-10-11");
sourceFiles.add(new IcebergTable.FilePathWithPartition("/source/path/file1.parquet", partitionData, 1000L));
sourceFiles.add(new IcebergTable.FilePathWithPartition("/source/path/file4.parquet", partitionData, 1000L));
Copy link
Contributor

@khandelwal-prateek khandelwal-prateek Nov 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sourceFiles is not being used in invokeDeleteStepIfNeeded, this test is effectively verifying similar behaviour as testDeleteSinglePartitionDirectory

public void testDeleteTargetDirectoryNotExists() throws Exception {
// Configure: delete enabled but point to non-existent directory
state.setProp(IcebergSource.DELETE_FILES_NOT_IN_SOURCE, true);
state.setProp(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, "/non/existent/path");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can also add a check for testing when DATA_PUBLISHER_FINAL_DIR is not set i.e. before setting this property in this test itself, and rename to something like testDeleteTargetDirectoryNotConfiguredOrNotExists

.datasetOutputPath(targetFs.getUri().getPath())
.ancestorsOwnerAndPermission(ancestorOwnerAndPermissionList)
.build();
copyableFile.setFsDatasets(originFs, targetFs);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's also add a test that verifies that after extraction if we deserialize the CopyableFile then destinationData is not null, hence, also test the lineage fix

@thisisArjit thisisArjit force-pushed the oh-azure-workaround branch 2 times, most recently from c21d238 to d66598e Compare November 24, 2025 09:43
@khandelwal-prateek
Copy link
Contributor

LGTM!

@abhishekmjain abhishekmjain merged commit 7923a96 into apache:master Nov 24, 2025
6 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants