Spark: SnapshotTableSparkAction add validation for non-overlapping source/dest table paths. #12779

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · slfan1989 wants to merge 6 commits into main

Conversation

slfan1989 (Contributor)

I am currently working on migrating some Hive tables to Iceberg, with a particular focus on the Snapshot and Migrate functionality. I noticed a TODO in the snapshot process that needs improvement. In this PR, I add a validation to ensure non-overlapping source and destination table paths, along with corresponding unit tests.

github-actions bot added the spark label · Apr 11, 2025
slfan1989 changed the title from "Spark: SnapShotAction add validation for non-overlapping source/dest table paths." to "Spark: SnapshotTableSparkAction add validation for non-overlapping source/dest table paths." · Apr 11, 2025
@slfan1989 (Contributor Author)

@manuzhang @ebyhr Recently, I've been using Snapshot and Migrate for large-scale table migrations and have noticed some details that can be optimized. The changes are not significant. Could you please help review this PR? Thank you very much!

public void testTableLocationOverlapThrowsException() throws IOException {
// Ensure the test runs only for non-Hadoop-based catalogs,
// because path-based tables cannot have a custom location set.
assumeTrue(
@nastra (Contributor) · Apr 16, 2025

Please use assumeThat(..) from AssertJ instead of JUnit. I've created #12817 to make sure we enforce this going forward.
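For reference, a minimal sketch of that switch, assuming the test is gated on the catalog type (the catalogName field and the "testhadoop" catalog name below are illustrative, not taken from this PR):

import static org.assertj.core.api.Assumptions.assumeThat;

// JUnit style (before):
//   assumeTrue(!catalogName.equals("testhadoop"));

// AssertJ style (after): the test is skipped when the assumption does not hold,
// because path-based tables cannot have a custom location set.
assumeThat(catalogName).isNotEqualTo("testhadoop");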

Contributor Author

Thanks for your suggestion! I have improved this part of the code.

boolean threw = true;
try {
Preconditions.checkArgument(
!sourceTableLocation().equals(icebergTable.location()),
"The destination table location overlaps with the source table location.");
Contributor

The trailing "." can be removed. Most existing error messages don't end with "." unless they have several sentences.
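For illustration, the check from the snippet above with the trailing period dropped from the message:

Preconditions.checkArgument(
    !sourceTableLocation().equals(icebergTable.location()),
    "The destination table location overlaps with the source table location");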

Contributor

If you decide to remove the full stop, don't forget to remove it from the test as well so that it will not fail.

Contributor Author

Thank you for helping review the code! I have made improvements to this part of the code.

Comment on lines 91 to 92
Map<String, String> tableProperties = Maps.newHashMap();
tableProperties.put("location", "file:" + location);
Contributor

We can make it a single line:

Map<String, String> tableProperties = Map.of("location", "file:" + location);

Contributor

I concur. This is much cleaner.


// Test that an exception is thrown
// when the destination table location overlaps with the source table location
// Assert that the exception message matches the expected error message
Contributor

Assert that the exception message matches the expected error message

I would remove this comment. This is obvious when reading the following code.

Contributor

I concur with this comment as well. Other places in the codebase that test catching an IllegalArgumentException don't have any explanatory comments either, so I don't think the comment is needed here. Removing it wouldn't be a divergence from the rest of the codebase.

Contributor Author

Thank you for the suggestion. I have removed this part of the comment.

@sririshindra (Contributor) left a comment

@slfan1989 Thanks for the PR.

I understand that the PR is trying to fix the existing TODO, but it is not clear to me why the TODO is needed in the first place.

What exactly is the concern regarding having the same location for the source and the snapshot table? Is it just a matter of table hygiene, or is it because the source Hive table's data might be duplicated and become corrupted?

I don't think the data is getting corrupted in any way. I modified your test code and ran a small test as follows.

// I actually removed your change in SnapshotTableSparkAction to make this work.
// I added the following code snippet before your validation in the test.
// I am pasting the results in comments.
Object result1 = sql("select * from %s", SOURCE_NAME);
System.out.println(result1); // [(1, 'a'), (2, 'b')]
Object result2 = sql("select * from %s", tableName);
System.out.println(result2); // [(1, 'a'), (2, 'b')]

sql("INSERT INTO TABLE %s VALUES (3, 'c')", SOURCE_NAME);
sql("INSERT INTO TABLE %s VALUES (4, 'd')", SOURCE_NAME);
Object result3 = sql("select * from %s", SOURCE_NAME);
System.out.println(result3); // [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
Object result4 = sql("select * from %s", tableName);
System.out.println(result4); // [(1, 'a'), (2, 'b')]

sql("INSERT INTO TABLE %s VALUES (5, 'e')", tableName);
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName);
Object result5 = sql("select * from %s", SOURCE_NAME);
System.out.println(result5); // [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
Object result6 = sql("select * from %s", tableName);
System.out.println(result6);  // [(1, 'a'), (2, 'b'), (5, 'e'), (6, 'f')]
    

I then checked all the results in the debugger, and I also checked the files in the data location. With the same location specified for both the existing Hive table and the snapshot table, any new writes to Hive are written directly to the parent (LOCATION) folder, while any new writes to the snapshot table are written to a data folder within the same location. The table location also has a metadata folder generated for the snapshot table.

I listed all the files that are generated.

.
├── _SUCCESS
├── data
│   ├── 00000-10-65f77ccb-5068-41dd-a8a9-17355c73ce08-0-00001.parquet
│   └── 00000-11-51fb3cd3-522e-4dce-92fb-75432d63d542-0-00001.parquet
├── metadata
│   ├── 00000-69bc7156-f0a9-460b-917d-64d82a211831.metadata.json
│   ├── 00001-eb060329-9131-4a53-a1a7-a5eb408baefc.metadata.json
│   ├── 00002-d9ac89e8-733b-4c5d-9cfa-86f393bbafc0.metadata.json
│   ├── 1ef0b8a9-05b6-44b5-a897-4ded8c699868-m0.avro
│   ├── 81db2b93-2891-46ff-87db-751ce1a7e1dc-m0.avro
│   ├── cf0457ad-6ecb-4b46-8940-edb8e4951b4c-m0.avro
│   ├── snap-7307383682881663630-1-1ef0b8a9-05b6-44b5-a897-4ded8c699868.avro
│   ├── snap-8229039877763876716-1-cf0457ad-6ecb-4b46-8940-edb8e4951b4c.avro
│   └── snap-8502937755551355664-1-81db2b93-2891-46ff-87db-751ce1a7e1dc.avro
├── part-00000-014efe7d-9dc8-4460-8b90-fc850991e929-c000.snappy.parquet
├── part-00000-954a0122-e470-40ef-9f95-e3da5be9948d-c000.snappy.parquet
├── part-00000-e51dffb2-da30-49d1-afb3-bced119cc7de-c000.snappy.parquet
└── part-00000-f469c332-c1ea-4b41-bf60-5ec5f5b4d8b7-c000.snappy.parquet

As you can see, any new data files written to the snapshot table are written to a separate data folder, while new data files written to the existing Hive table are written to the preexisting location folder.

The select statements also show the results as expected, i.e., the new data inserted into the existing Hive table only shows up when the Hive table is queried, and the new data inserted into the snapshot table only shows up when the snapshot table is queried.

So, apart from hygiene concerns, I don't see an issue with respect to correctness. Granted, hygiene is an important thing to consider, but maybe we should leave that to the user rather than enforce it in Iceberg directly. If I am missing something, please let me know.

@sririshindra (Contributor) left a comment

Actually, what might be a good idea to enforce is that the "write.data.path" and "write.metadata.path" properties should not be made equal to the existing source table's location.

Here is what I did. I modified the tableProperties like so.

Map<String, String> tableProperties = Map.of("location", "file:" + location,
            "write.data.path", "file:" + location);

I then ran the same queries as before after creating the snapshot.

// I actually removed your change in SnapshotTableSparkAction to make this work.
// I added the following code snippet before your validation in the test.
// I am pasting the results in comments.
Object result1 = sql("select * from %s", SOURCE_NAME);
System.out.println(result1); // [(1, 'a'), (2, 'b')]
Object result2 = sql("select * from %s", tableName);
System.out.println(result2); // [(1, 'a'), (2, 'b')]

sql("INSERT INTO TABLE %s VALUES (3, 'c')", SOURCE_NAME);
sql("INSERT INTO TABLE %s VALUES (4, 'd')", SOURCE_NAME);
Object result3 = sql("select * from %s", SOURCE_NAME);
System.out.println(result3); // [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
Object result4 = sql("select * from %s", tableName);
System.out.println(result4); // [(1, 'a'), (2, 'b')]

sql("INSERT INTO TABLE %s VALUES (5, 'e')", tableName);
sql("INSERT INTO TABLE %s VALUES (6, 'f')", tableName);
sql("REFRESH table %s", SOURCE_NAME);
Object result5 = sql("select * from %s", SOURCE_NAME);
System.out.println(result5); // [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') , (5, 'e'), (6, 'f')] // Notice that the hive table is now corrupted. 
Object result6 = sql("select * from %s", tableName);
System.out.println(result6);  // [(1, 'a'), (2, 'b'), (5, 'e'), (6, 'f')]
    

The preexisting Hive table is basically corrupted because "write.data.path" points to the same location ("file:" + location) as the Hive table. This becomes an even bigger problem if "write.metadata.path" is also set to the same location: in that case, reads of the Hive table will fail entirely, because Spark will be unable to read the metadata avro/json files, since their schema will not match the existing Hive table's schema.

❯ tree                                                                                                                                                                     
.
├── 00000-10-a240f497-6c00-46eb-8df6-2316e9123796-0-00001.parquet
├── 00000-11-c3c4d323-7302-4ee3-a130-33263b71a26c-0-00001.parquet
├── _SUCCESS
├── metadata
│   ├── 00000-bbe3af3d-65da-474e-9f37-f015556c66c1.metadata.json
│   ├── 00001-6187b41a-f6bb-4301-9f70-27767a126307.metadata.json
│   ├── 00002-515c29d2-2ab7-4083-a630-a20bd1365729.metadata.json
│   ├── 032a7c92-62c2-4cbe-888f-6e8b2ecfe054-m0.avro
│   ├── 9373d8e5-1b3a-45d0-bbfe-6cfb406220b6-m0.avro
│   ├── a849f925-1470-4f04-90a1-9b94e864de17-m0.avro
│   ├── snap-2648286813050968924-1-9373d8e5-1b3a-45d0-bbfe-6cfb406220b6.avro
│   ├── snap-3555057750814590581-1-032a7c92-62c2-4cbe-888f-6e8b2ecfe054.avro
│   └── snap-6111089045027256362-1-a849f925-1470-4f04-90a1-9b94e864de17.avro
├── part-00000-33b37f45-e4a1-448e-94b4-7bf363301d29-c000.snappy.parquet
├── part-00000-74e5a013-36db-4417-8d06-07bec9a38f27-c000.snappy.parquet
├── part-00000-78ec9a8d-4b7f-4a79-86e2-6fcba224a763-c000.snappy.parquet
└── part-00000-f01888c1-a553-46c6-afe7-02bfbd255bb9-c000.snappy.parquet

As you can see, any new data files written to the snapshot table are written to the same folder, and new data files written to the existing Hive table also land in the preexisting location folder. Since both the Hive table and the Iceberg table have data files in the same location, the Hive data is corrupted: both the Hive data and the Iceberg data are read by the Hive table.
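A minimal sketch of the additional enforcement suggested above, i.e. rejecting destination properties that redirect data or metadata back into the source table's location (the helper name and structure are illustrative and not part of this PR; Preconditions is the relocated Guava class used elsewhere in Iceberg):

import java.util.List;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Hypothetical helper: fail fast if the destination table's location, data path,
// or metadata path points at the source table's location.
private static void checkNoOverlapWithSource(String sourceLocation, Map<String, String> destProperties) {
  for (String key : List.of("location", "write.data.path", "write.metadata.path")) {
    String value = destProperties.get(key);
    Preconditions.checkArgument(
        value == null || !value.equals(sourceLocation),
        "The destination table property %s overlaps with the source table location: %s",
        key, sourceLocation);
  }
}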

Another issue I thought of is how running the removeOrphanFiles procedure on the snapshot table will affect the preexisting Hive table. Running the remove orphan files procedure might delete valid data files belonging to the Hive table. Maybe that is a good justification for enforcing that the snapshot table location be different from the existing Hive table location.
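To illustrate that concern, a hedged sketch of running the delete orphan files action against the snapshot table (spark is the active SparkSession and loadSnapshotTable() is a hypothetical helper; with overlapping locations, the Hive table's own data files are not referenced by any Iceberg snapshot and could be reported and removed as orphans):

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

Table snapshotTable = loadSnapshotTable();  // hypothetical helper returning the snapshot table

DeleteOrphanFiles.Result result =
    SparkActions.get(spark)
        .deleteOrphanFiles(snapshotTable)
        .olderThan(System.currentTimeMillis())  // consider all files, not just older ones
        .execute();

// Files under the shared location that no Iceberg snapshot references (i.e. the Hive
// table's data files) show up here and are deleted by the action.
result.orphanFileLocations().forEach(System.out::println);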

@slfan1989 (Contributor Author)

@sririshindra Thank you very much for your message! The information you provided is very detailed and insightful. From my understanding, the original purpose of the Snapshot action is to ensure that the source table remains unchanged, meaning that no data or information in the source table is modified.

Although writing metadata into the source table directory may not immediately cause noticeable issues, from a design perspective, this still constitutes a modification. As a user, I'd like to share some of my thoughts: I am currently working on converting Hive tables to Iceberg tables.

During this process, I do want the source table to remain unchanged in order to ensure data consistency and facilitate subsequent rollback operations. After all, the source table usually holds critical business data, and we rely on it as a stable foundation. Therefore, I chose to use the Snapshot process for the conversion to ensure that the data in the source table is preserved to the greatest extent during the migration to the Iceberg table.

Regarding the other details you raised, I will carefully review the code and provide further analysis and feedback.

Your detailed analysis has left a strong impression on me. Thank you again for your valuable insights!
