Skip to content

Spark: when doing rewrite_data_files, check for partitioning schema compatibility #12651

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

adrians
Copy link

@adrians adrians commented Mar 26, 2025

Context - the implementation as seen in the current release allows for this kind of scenario:

  • We start with a table of 20TB of data-files, divided in 200 coarse partitions (200 partitions X 200 parquet files X 512MB).
  • We want to do a partitioning schema evolution, to split every partition in 10 smaller partitions.
  • Doing a first rewrite_data_files, we obtain 200 file-groups (assigned randomly from the parquet-files) x 100GB each. This is caused by a bit of logic that says "if those files do not match the latest partitioning schema, assume they're unpartitioned".
  • After the first rewrite_data_files, we're left with 2000 fine partitions, but since every file-group can (at least in theory) write to every partition, the expected result is something like 2000 partitions x 200 files x 50MB.
  • At this point, we need to compact those data-files, so we run a second rewrite_data_files.
  • After the second rewrite_data_files, we're finally left with 2000 partitions x 20 files x 512MB.

This Pull-request proposes an algorithm that simplifies the scenario:

When building the file-groups for the first rewrite_data_files, check if the old partitioning schema is a coarser variant of the current schema. If that's the case, try to build file-groups using that partitioning system. The scenario now becomes:

  • Doing a first rewrite_data_files, we obtain 200 file-groups x 100GB each (based on the old partitioning schema).
  • After the first rewrite_data_files, we're left with 2000 fine partitions, but since every fine-partition can be obtained from a single parent old-partition, the expected result is something like 2000 partitions x 20 files x 512MB.
  • The second pass is not necessary. (In practice, if the coarse-partitions are slightly larger than 100GB, they might be split into 2 file-groups, so there might be some small parquet-files to compact, but this task is orders of magnitude faster now).

This is a significant improvement in terms of time taken to apply the new partitioning schema.

The criteria to determine if the new partitioning is "finer or the same" than the old partitioning look something like this:

  • the new (finer) partitioning spec has more (or the same) number of fields than the old (coarse) one;
    AND
  • the first N source-columns for the new (finer) partitioning spec must be the same as the N source-columns of the old partitioning-spec (N = number of fields in the old partition-spec)
    AND
  • the first N fields of the new (finer) partitioning spec must have "finer" transformations than the N fields in the old spec (N = number of fields in the old partition-spec) - see table below
if old.field[i].transformation is then new.field[i].transformation is the same or more specific
identity identity
year year
month
day
hour
identity
month month
day
hour
identity
day day
hour
identity
hour hour
identity
truncate(x) truncate(y) AND y≥x
identity
bucket(x) bucket(y) AND y≥x AND y%x=0
identity

For the third bullet-point in the list of criteria, I have found that the boolean Transform.satisfiesOrderOf(Transform a) method that implements that predicate pretty well - except maybe for the bucket case, for which it'll fall back to the "unpartitioned" scenario.

@@ -265,6 +265,22 @@ public boolean equals(Object other) {
return Arrays.equals(fields, that.fields);
}

public boolean equalOrFinerThan(PartitionSpec that) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not convinced of the naming here, but we might want to think about the other direction so say that "one partition spec covers another"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, I think coarse/narrow are probably the wrong vocabulary for this since we specifically talking about a partitioning which can live within another. Covering sounds good, I believe Spark uses a "satisfies" vocabulary when describing a distribution falling within another.

For example Identity(x), identity(y) satisifies ( Identity x) but not vice versa

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback - I can rearrange the method and the wording.
The motivation for the change was to cut down the maintenance-times for the tables in my environment, so using a consistent terminology with the rest of the project didn't really make it as a priority.

}

for (int i = 0; i < that.fields.length; i++) {
if (this.fields[i].sourceId() != that.fields[i].sourceId()) {
Copy link
Contributor

@danielcweeks danielcweeks Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're making an incorrect assumption here about field ordering. I can have the following specs:

spec 1: identity(f1), identity(f2)
spec 2: identity(f2), identity(f1), identity(f3)

Which are both valid specs for the same schema and would fail in this scenario where spec 2 is finer than spec 1.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, that constraint is not necessary - will change.

@@ -265,6 +265,22 @@ public boolean equals(Object other) {
return Arrays.equals(fields, that.fields);
}

public boolean equalOrFinerThan(PartitionSpec that) {
if (this.fields.length < that.fields.length) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this going to be an issue with V1 Specs with void transforms? Ie
(identity(x), void(y), void(z)) and (identity(x), identity(y))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Haven't tested it with V1-table-format or void transforms. Will add a bunch of unit-tests covering it.

task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
table.spec().equalOrFinerThan(table.specs().get(task.file().specId()))
? task.file().partition()
: emptyStruct;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are in the process to refactoring out the compaction planning part to the core module.
Please make sure that any changes here land in the BinPackRewriteFilePlanner too:

for (FileScanTask task : tasks) {
// If a task uses an incompatible partition spec the data inside could contain values
// which belong to multiple partitions in the current spec. Treating all such files as
// un-partitioned and grouping them together helps to minimize new files made.
StructLike taskPartition =
task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
}

FWIW, i have an open PR to move the Spark compaction to the new API (#12692) which will remove the planning from here.

@github-actions github-actions bot added the core label Apr 4, 2025
Comment on lines +283 to +285
table.spec().equalOrFinerThan(table.specs().get(task.file().specId()))
? task.file().partition()
: emptyStruct;
Copy link
Contributor

@pvary pvary Apr 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the planning results, so it would be good to add testing to the core module too, which checks the generated plan is correct

Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label May 23, 2025
@adrians
Copy link
Author

adrians commented May 29, 2025

Bad bot.

@github-actions github-actions bot removed the stale label May 30, 2025
Copy link

This pull request has been marked as stale due to 30 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Jun 29, 2025
Copy link

github-actions bot commented Jul 7, 2025

This pull request has been closed due to lack of activity. This is not a judgement on the merit of the PR in any way. It is just a way of keeping the PR queue manageable. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@github-actions github-actions bot closed this Jul 7, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants