Core: Support incremental compute for partition stats #12629
Conversation
Some engines want to synchronously write the partition stats (similar to how Trino synchronously writes the Puffin files during insert). Reading all the manifests in a table to compute partition stats can be avoided if we compute the stats incrementally and merge them with the previous stats. @aokolnychyi, @pvary, @deniskuzZ, @gaborkaszab: Let me know what you guys think.
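To make the intent concrete, here is a rough sketch of the pruning this enables. It is only an illustration: `snapshotIdsSincePreviousStats` is a hypothetical helper (e.g. built by walking snapshot ancestry), not an existing API.

```java
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

// Sketch only: instead of reading every manifest of the table, keep just the
// manifests written by snapshots newer than the one the last stats file covers.
static List<ManifestFile> manifestsToRead(
    Table table, Snapshot current, long previousStatsSnapshotId) {
  // hypothetical helper: snapshot ids on the ancestry path after the stats snapshot
  Set<Long> snapshotIdsRange =
      snapshotIdsSincePreviousStats(table, current, previousStatsSnapshotId);
  Predicate<ManifestFile> addedSincePreviousStats =
      manifest -> snapshotIdsRange.contains(manifest.snapshotId());
  return current.allManifests(table.io()).stream()
      .filter(addedSincePreviousStats)
      .collect(Collectors.toList());
}
```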
```java
PartitionMap<PartitionStats> statsMap = PartitionMap.create(table.specs());
// read previous stats
try (CloseableIterable<PartitionStats> oldStats =
    readPartitionStatsFile(statsFileSchema, Files.localInput(statisticsFile.path()))) {
```
Since the new unified tuple is used for reading the old stats file, schema evolution is handled automatically.
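For context, the merge that follows this read is conceptually just an upsert of the newly computed delta into the map of previous stats. A reduced, self-contained illustration (partition keys as strings and a single counter per partition; the real code merges whole `PartitionStats` rows keyed by partition tuple):

```java
import java.util.HashMap;
import java.util.Map;

// Reduced model of the merge step: previous stats plus the incremental delta.
static Map<String, Long> mergeStats(Map<String, Long> previous, Map<String, Long> delta) {
  Map<String, Long> merged = new HashMap<>(previous);
  // partitions present in both get summed; new partitions are simply inserted
  delta.forEach((partition, rowCount) -> merged.merge(partition, rowCount, Long::sum));
  return merged;
}
```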
I have added the 1.9.0 milestone for this PR, as it is a small change (excluding refactoring) and we still have some time for 1.9.0 due to the open issues in the milestone.
```java
manifestFilePredicate =
    manifestFile ->
        snapshotIdsRange.contains(manifestFile.snapshotId())
            && !manifestFile.hasExistingFiles();
```
Don't we want this as a default predicate?

```java
manifestFile -> !manifestFile.hasExistingFiles()
```
We could add it as a default filter:

```java
if (fromSnapshot != null) {
  manifestFilePredicate =
      manifestFile -> snapshotIdsRange.contains(manifestFile.snapshotId());
}

List<ManifestFile> manifests =
    currentSnapshot.allManifests(table.io()).stream()
        .filter(manifestFilePredicate)
        .filter(manifestFile -> !manifestFile.hasExistingFiles())
        .collect(Collectors.toList());
```
Good point.
While computing incrementally, I observed that it may produce duplicate counts, so I added this.
I still have some gaps; I need to fully understand when we mark a manifest entry as existing. Does any scenario exist where we need to consider "existing" entries, or are "added" entries enough?
There is another check down below that considers both added and existing (added long back):
```java
if (entry.isLive()) {
```
I will update the code to keep just the added entries, and also add a test case that rewrites data files to ensure the stats are the same after the rewrite.
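Something along these lines, presumably (a sketch only; `createTableWithData`, `rewriteDataFiles`, `computeAndWriteStatsFile`, and `statsByPartition` are placeholders for whatever fixtures the test class ends up using):

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;
import org.junit.jupiter.api.Test;

class TestPartitionStatsAfterRewrite {

  @Test
  void statsUnchangedAfterRewrite() throws Exception {
    Table table = createTableWithData();                   // hypothetical fixture
    PartitionStatisticsFile before = computeAndWriteStatsFile(table);

    rewriteDataFiles(table);                               // compaction: REPLACE snapshot

    PartitionStatisticsFile after = computeAndWriteStatsFile(table);
    // per-partition counts must be identical; only snapshot metadata may differ
    assertThat(statsByPartition(after)).isEqualTo(statsByPartition(before));
  }
}
```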
Also, it looks like a ManifestFile can have both added and existing entries together. So instead of filtering here, I will keep filtering only at the entry level down below in collectStatsForManifest.
What if we have compaction and expire snapshots? The new manifests would have the EXISTING entries, right?
What do we do with the stats of the removed files?
Let's say:
- S1 adds data
- Execute the stats collection
- S2 adds more data
- S3 compacts data from S1 and S2 - this removes the files created by S1 and S2 and creates new files
- Execute incremental/normal stats collection

What happens with the stats in this case?
Compaction doesn't remove the data. If we expire S1 and S2, we don't have the previous snapshots/stats and start fresh (i.e. full compute).
If we don't expire data, could we detect that S3 is only a compaction commit, and that the stats don't need to be changed?
What if S3 is instead a MoW commit? Can we detect the changes and calculate stats incrementally?
- Compaction will have the snapshot operation REPLACE, and we can reuse the old stats for that scenario. But we need to write the new stats file with the same data to handle clean GC of snapshot files. Compaction will be tested end to end while adding the Spark procedure.
- About the live (existing + added) entries:
For a full compute, old manifest files will be marked as deleted and their entries will be reused as existing entries in the new manifest files, possibly alongside additional added entries. So, a full compute needs to consider both existing and added entries.
For an incremental compute, the old stats file already covers some entries which are now existing, so we have to take the existing entries into account accordingly.
This all leads to the next question: what happens when a manifest is deleted? In that case we just update the snapshot entry (last modified) and do not decrement the stats. Hence, we should skip it for incremental compute as well.
All this logic is present in collectStatsForManifest (see the sketch below), and the existing test cases (full compute and incremental) cover it, as they use mergeAppend, which produces manifests with a mix of added and existing entries.
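A compact restatement of those rules (a sketch, not the actual collectStatsForManifest code; `EntryStatus` merely mirrors the manifest entry statuses):

```java
enum EntryStatus { ADDED, EXISTING, DELETED }

// Which entries contribute counts, per the reasoning above.
static boolean contributesToStats(EntryStatus status, boolean incremental) {
  switch (status) {
    case ADDED:
      return true;           // newly added files are always counted
    case EXISTING:
      return !incremental;   // incremental: already counted by the previous stats file
    case DELETED:
    default:
      return false;          // deletes only refresh last-modified metadata here
  }
}
```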
We didn't need to decrement stats for a full compute because we were discarding the deleted manifests and only considering live manifests.
Now, I am not really sure the current code will work for compaction. We may need to decrement stats just for the incremental compute (see the sketch below). I will test the compaction scenario tomorrow and handle this.
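If decrementing turns out to be needed, the incremental path could subtract a removed file's contribution when it sees a DELETED entry in the scanned range. A reduced sketch (one counter per partition key; entirely hypothetical):

```java
import java.util.Map;

// Subtract the removed file's rows from the merged stats for its partition.
static void applyDeletedEntry(Map<String, Long> stats, String partition, long removedRowCount) {
  stats.merge(partition, -removedRowCount, Long::sum);
}
```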
LGTM +1
```java
PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
if (statisticsFile == null) {
  LOG.info("Previous stats not found. Computing the stats for whole table.");
  return PartitionStatsUtil.computeStats(table, null, snapshot);
}
```
Could this throw an error instead?
Why? That is there to handle the case when no stats file existed before and we need to execute a full computation.
We enter here when computing stats for the first time.
If I understand correctly, the user requested an incremental stats compute, but with wrong parameters. In this case we could either "correct" the mistake or throw an error.
The question is how frequent the problem is, and how easy it is to detect from the user side.
What do you mean by wrong parameters? A non-existing snapshotId?
> What do you mean by wrong parameters? A non-existing snapshotId?

Exactly.
Throwing an error now, and added a test case.
I don't agree with that design; see #12629 (comment).
@deniskuzZ, @pvary: Thanks guys for the review. I have addressed all the comments. You can take a fresh look again tomorrow :D (after some break :D)
```java
    Table table, Snapshot snapshot, StructType partitionType) throws IOException {
  PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId());
  if (statisticsFile == null) {
    throw new RuntimeException(
```
I don't think it's user-friendly, and the recompute flag loses its purpose (you can call computeAndWriteStats directly).
Now every client needs to implement either the same prev-stats-file check or do the try-catch:

```java
try {
  computeAndWriteStatsFileIncremental();
} catch (RuntimeException e) {
  if (e.getMessage().equals("bla-bla")) {
    computeAndWriteStats();
  }
}
```

I would expect computeAndWriteStatsFileIncremental to do what's needed instead of throwing a "Previous stats not found" exception.
A non-existent snapshotId is a different situation. We should validate if snapshot == null and throw "Snapshot doesn't exist".
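i.e. something like this up front (message text illustrative; Iceberg's relocated Guava Preconditions is what the codebase uses for this kind of check):

```java
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

static Snapshot validateSnapshot(Snapshot snapshot) {
  Preconditions.checkArgument(snapshot != null, "Snapshot doesn't exist");
  return snapshot;
}
```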
> recompute flag loses its purpose

There is no recompute flag exposed to the user. The private method (incrementalComputeAndMerge) which throws this exception also always computes incrementally.

> I would expect computeAndWriteStatsFileIncremental to do what's needed instead of throwing a "Previous stats not found" exception.

computeAndWriteStatsFileIncremental says incremental compute. Forcefully recomputing when there is an error is not a good idea, as the method's responsibility is just to try the incremental compute. Maybe I can expose another method called computeAndWriteStatsWithFallback(), which internally calls it?

```java
public void computeAndWriteStatsIncrementalWithFallback() {
  try {
    computeAndWriteStatsFileIncremental();
  } catch (RuntimeException e) {
    if ("bla-bla".equals(e.getMessage())) {
      computeAndWriteStats(); // Fallback in case of a specific error
    } else {
      throw e; // Re-throw unexpected errors
    }
  }
}
```
I liked how you did it initially. Please disregard the recompute flag comment; it has nothing to do with the incremental workflow.
Think about what changes are needed on the client side. I was planning just to replace the existing call with the incremental one unless it's ANALYZE TABLE (force recompute).
What are the use cases where we would benefit from the "prev stats file missing" exception?
Let's see what @pvary thinks.
I'm very late to this conversation, sorry about that :) I think we should talk a bit about how a user would use these APIs to compute stats, and then we might be able to sort this disagreement out too.

I see the other PR introduces a compute_partition_stats stored proc for the "full-compute" path. I'd assume there would be another proc, incremental_compute_partition_stats or similar, that will execute the "incremental-compute" path. If my assumption is correct, I think the question is why a user would decide to call one instead of the other. The expectation here is that the "full-compute" path is more expensive than the "incremental-compute" path. So if the user's motivation is to run the cheaper operation, then falling back to the full compute could be misleading.

Or from a different angle: if the incremental path is expected to first try the cheap computation and then fall back to the more expensive one, then what would be the point of having the compute_partition_stats procedure execute the "full-compute"? Why would one call it? So in general I'm in favour of throwing an exception if the incremental computation is not feasible.

Well, if the plan is to have a single Spark procedure for both approaches, then everything I wrote above is irrelevant :)
My take on this is to always use incremental unless you explicitly need to recompute.
incremental should be smart enough to decide whether it needs to start from scratch (first stats compute) or reuse the previous stats, compute the diff, and merge.
What is the benefit of "Previous stats not found for incremental compute. Try full compute"? It just moves the need for exception handling to the client.
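In other words, something like this (a sketch; the helper names are hypothetical):

```java
// The "smart" entry point: fall back to a full compute only when there is
// nothing to be incremental against.
static PartitionStatisticsFile computeAndWriteStatsSmart(Table table, Snapshot snapshot) {
  PartitionStatisticsFile previous = latestStatsFile(table, snapshot.snapshotId());
  if (previous == null) {
    return fullComputeAndWrite(table, snapshot);                 // first run: start from scratch
  }
  return incrementalComputeAndMerge(table, snapshot, previous);  // reuse prev stats + diff + merge
}
```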
I think there are different approaches that could work well. I understand that the most convenient would be to offer a single computeStats function that can decide between incremental and full computation. On the other hand, that could hide some details from the users. I had the impression that Iceberg APIs are designed to have clear boundaries and not mix functionalities like incremental and full stat computation.

I believe that to come to a conclusion we might want to raise this question on dev@ to get wider visibility. People are probably busy with the upcoming Iceberg Summit, but we could still get good insights. @ajantha-bhat WDYT?
@gaborkaszab, please check the code closely: incremental doesn't do a full computation unless there were no prior stats.
Maybe I am wrong, but creating a dev mail thread to discuss every minor change seems counterproductive to me when we already have people interested in this change here.
Maybe it's worth dropping computeAndWrite, since the recompute flag introduces confusion.
> Maybe I am wrong, but creating a dev mail thread to discuss every minor change seems counterproductive to me, when we already have people interested in this change here.

Strongly agree.
@pvary, @gaborkaszab, @deniskuzZ, @nastra: PR is ready for review.
Discussed this with @ajantha-bhat offline:

What do you think @deniskuzZ, @gaborkaszab?
Not introducing a specific API for the "force full computation" path makes sense to me. Thanks for the heads-up on the "drop stats" topic; I think it's fine. We might want to introduce a parameterless variant, though.

Thanks for your work @ajantha-bhat!
Hive provides the QL to force a stats recompute. The use cases may be limited and mainly related to recovery, but still, I think it won't harm to have an API for that.
Do we know someone from Trino / Spark who can chime in with their requirements?
Iceberg doesn't create 2 snapshots for this; updating the table metadata with the partition stats is a metadata-only update.

@pvary: Thanks for discussing this in depth. I agree that as of now one interface is enough; in the future we can have more if needed.

I didn't see much participation directly from these communities. Maybe in the future we can discuss again adding a new interface to force a refresh if it is needed.
I have updated the PR with just one interface (as a new commit).
@ajantha-bhat, I meant two metadata files, though they might still be large.

Could you clarify the concern around keeping the API to trigger a full partition stats recompute? Clients have to rely on workarounds, even though Iceberg internally supports this through a private method. In Hive, we have a concrete need for this functionality. So what's the suggested approach - should we hack it on the client side or bring this capability into the engine repo directly?

PS: what's the Iceberg view on the fact that we are changing the behavior of the existing API (full recompute -> incremental)? From a client's perspective, it might be considered a breaking change. I just checked the Impala docs (https://impala.apache.org/docs/build/html/topics/impala_compute_stats.html) and they mention support for both options:
There is no strong concern; we felt it is redundant to have many APIs. Plus, the need for a full compute again is very rare (maybe only during corruption), and there are ways to achieve a full compute with the single API by clearing the stats.

The full stats are still available to the user; only the way they are computed internally has changed. There is no difference in the output for the user.

The Impala option is a little different: per partition or the whole table (not based on the snapshot).

@deniskuzZ: I have a question for Hive users: if the user calls incremental for the first time (on a table without previous stats), are you expecting it to throw an error or do a full compute?
It's rare, but it exists, so why not expose a clear API to resolve it instead of suggesting workarounds? In my opinion, we are overthinking this.

If the partition spec is not provided, stats are computed for all the partitions individually, as we do here.

Do a full compute.
@ebyhr, @findepi: Do I remember correctly that you work on Trino?
Thanks,
I believe @raunaqmorarka is the best person to answer the question.
@deniskuzZ: How does Hive INCREMENTAL stats work? I'm not able to find the doc 😢

About Impala, here is what I have found:

For me, this means that Impala uses incremental stats in a very different way than we do in Iceberg. Having a full recompute would not help them; they would need a "drop stats" instead.
@raunaqmorarka: Do you have any opinion on whether Trino needs a force-refresh API for stats?

@pvary and @deniskuzZ: I also checked Spark, and it doesn't have an incremental or force-refresh option, so one API should be enough. https://spark.apache.org/docs/latest/sql-ref-syntax-aux-analyze-table.html

I think there is no difference of opinion on the current code (one API); we are just debating whether another API for force refresh is needed. So I think we can go ahead with this PR and later add a force-refresh option if it is really needed. This PR has been hanging for a long time; merging it will enable the further work on the Spark action and other engine integrations.
Please don't take my comment as a blocker for merging the PR. However, why not be a bit more flexible and retain the existing force-refresh API for recovery, especially since it's already in use by some of the engines?
"Using full compute as previous statistics file is not present for incremental compute."); | ||
stats = computeStats(table, snapshot, file -> true, false /* incremental */).values(); | ||
} else { | ||
stats = incrementalComputeAndMerge(table, snapshot, partitionType, statisticsFile); |
Maybe computeAndMergeStatsIncremental?
Updated as computeAndMergeStatsIncremental.
```java
 *
 * @param snapshot the snapshot corresponding to the deleted manifest entry.
 */
public void deletedEntryForIncrementalCompute(ContentFile<?> file, Snapshot snapshot) {
```
Maybe this and friends could be package-private.
The public methods should be deprecated first.
Done.
Last minor changes.
Looks like we (mainly Peter) have decided to go with just one API for now. If there are strong requirements, we will add a force refresh in the future.
PR is ready to be merged. I will rebase the Spark actions and procedure PR once this is merged.
Merged to main. Here we went for the narrowest possible API. Feel free to revisit the need for the "invalidate"/"recompute" API from time to time, and if there are multiple engines requesting the feature we could add it.
If the previous stats file exists, there is no need to compute the stats from scratch:
identify the latest snapshot for which a partition stats file exists, read the previous stats, incrementally compute the stats for the new snapshots, merge the stats, and write them to a new file.
PartitionStatsHandler.computeAndWriteStats()
-- incrementally computes stats if previous stats are available; if not, performs a full compute.
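For illustration, this is roughly how an engine would wire it in after a commit (a sketch: the method name follows the summary above, and the exact signature and commit step should be checked against the merged code):

```java
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.PartitionStatsHandler;

static void refreshPartitionStats(Table table) throws java.io.IOException {
  // compute incrementally when previous stats exist, otherwise full compute
  PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStats(table);
  if (statsFile != null) {
    // register the new stats file in table metadata
    table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();
  }
}
```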