Add reshuffle before triggering load jobs. #34657

Open: wants to merge 2 commits into master

Conversation

claudevdm (Collaborator)

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI, or the workflows README for a list of phrases that trigger workflows.

@claudevdm claudevdm marked this pull request as ready for review April 17, 2025 18:03
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.

@claudevdm (Collaborator Author)

Run Python_Coverage PreCommit

self.reshuffle_before_load = not util.is_compat_version_prior_to(
    p.options, "2.65.0")
if self.reshuffle_before_load:
  # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
Contributor

Where does the non-determinism currently come from? If I'm reading things correctly, the preceding transform (PartitionFiles) is the only thing between this and the last fusion break (the GBK), and I think that should be deterministic since it's operating per-element, but it is possible I'm missing something.

Collaborator Author

Good question. I am not sure exactly where the non-determinism currently comes from, but we have seen cases where the number of files uploaded differs between retries during autoscaling, and this is the only plausible explanation I could come up with.

> that should be deterministic since it's operating per-element, but it is possible I'm missing something

Can you elaborate on this?

Does GroupByKey guarantee determinism for the inputs to PartitionFiles? Without a Reshuffle, it looks like part of GroupFilesByTableDestinations (it is listed as being part of 3 stages?), PartitionFiles, and TriggerLoadJobs are fused into a single stage.

Adding a reshuffle puts TriggerLoadJobs* in their own stages, but it is less obvious what is happening with just the GBK.

Java has this precaution


https://cloud.google.com/dataflow/docs/concepts/exactly-once#output-delivery mentions that the best practice for IOs is to add a reshuffle before doing a write with side effects.
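
For illustration, a minimal sketch of that best-practice pattern (hypothetical pipeline and DoFn names, not this PR's actual code): a Reshuffle checkpoints the elements so the side-effecting ParDo sees stable inputs on retry.

```python
import apache_beam as beam


class HypotheticalSideEffectWrite(beam.DoFn):
  """Stand-in for a DoFn that performs a non-idempotent external write."""
  def process(self, element):
    # In real code this would trigger a load job / external API call.
    yield element


with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | 'CreateFiles' >> beam.Create(['file-1', 'file-2'])
      # Checkpoint here so retries of the write below replay the same,
      # already-materialized elements instead of re-running upstream steps.
      | 'StableInput' >> beam.Reshuffle()
      | 'Write' >> beam.ParDo(HypotheticalSideEffectWrite()))
```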

Collaborator Author

Thinking about it more, does Reshuffle force determinism by grouping by unique ids?

Without reshuffle, if more elements destined for a given destination (key for GroupFilesByTableDestinations) arrived between retries, is there a chance these new files could be materialized for the key, and therefore more files are read by the GroupFilesByTableDestinations.read?
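
For context, a simplified sketch of roughly what Reshuffle does in the Python SDK (conceptual only; the real transform in apache_beam.transforms.util also preserves timestamps and windowing): it pairs each element with a random key, groups by that key to force a materialization, then drops the key.

```python
import random

import apache_beam as beam


class ReshuffleSketch(beam.PTransform):
  """Simplified approximation of beam.Reshuffle (not the real implementation)."""

  def expand(self, pcoll):
    return (
        pcoll
        # Pair each element with a random key so elements spread across workers.
        | 'AddRandomKeys' >> beam.Map(lambda v: (random.getrandbits(32), v))
        # The GroupByKey forces a materialization / fusion break.
        | 'Group' >> beam.GroupByKey()
        # Drop the keys and re-emit the original elements.
        | 'RemoveKeys' >> beam.FlatMap(lambda kv: kv[1]))
```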

Contributor

Mentioned offline

The contents of GroupByKey should be deterministic unless there was a trigger that fired twice (in which case everything gets recomputed, there's no determinism at all)

The order may not be deterministic; I wonder if that's the issue here. That could potentially be solved by sorting the files in PartitionFiles (thus making GBK + PartitionFiles collectively deterministic).
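
A rough sketch of that sorting idea (hypothetical helper and parameter; the real PartitionFiles also partitions by size and uses different element types):

```python
def partition_files_deterministically(destination_and_files,
                                      max_files_per_partition=10000):
  """Hypothetical sketch: sort files per destination before partitioning.

  Sorting the grouped file list makes GBK + partitioning collectively
  deterministic even if the iteration order of the grouped values differs
  between retries.
  """
  destination, files = destination_and_files
  ordered = sorted(files)
  for start in range(0, len(ordered), max_files_per_partition):
    yield destination, ordered[start:start + max_files_per_partition]
```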

What are the symptoms you're seeing?

> Without reshuffle, if more elements destined for a given destination (key for GroupFilesByTableDestinations) arrived between retries, is there a chance these new files could be materialized for the key, and therefore more files are read by the GroupFilesByTableDestinations.read?

The GBK operation shouldn't happen until the first trigger has fired. If more elements arrived late, that indicates that another trigger fired (which would impact the Reshuffle case as well)

> https://cloud.google.com/dataflow/docs/concepts/exactly-once#output-delivery mentions that the best practice for IOs is to add a reshuffle before doing a write with side effects.

Functionally, we have a reshuffle as part of our GBK; adding another one will make this more expensive (obviously if we need it for correctness we need it, but we should understand why first)

Contributor

OK, after chatting a bit offline, my take is that I'm fine with this change if we can have confidence it will fix the issue (the performance cost is vanishingly small since it is just file names being shuffled, not records), but we should have that confidence either empirically or theoretically.

@kennknowles may have ideas on what is going on here, since it may be related to triggering semantics.

I will be AFK for the next 2 weeks, so don't block on me going forward :)

Collaborator Author

Sg, I did more testing and have been able to consistently reproduce it without this fix, and have not been able to repro it with this fix.

Contributor

Let's update CHANGES.md to mention this graph change.

Member

> Thinking about it more, does Reshuffle force determinism by grouping by unique ids?

Reshuffle checkpoints on Dataflow but not on other runners. Otherwise any randomly generated ids in a fused stage will be generated again on retry. Hence this behavior getting the name RequiresStableInput, which is the "right" way to express this (but unfortunately I don't think it is complete enough to use here).
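
To illustrate the checkpointing point, a minimal sketch (hypothetical element values; the checkpoint-on-Reshuffle behavior is Dataflow-specific, as described above):

```python
import uuid

import apache_beam as beam

with beam.Pipeline() as p:
  stable_pairs = (
      p
      | beam.Create(['gs://bucket/file-1', 'gs://bucket/file-2'])
      # Random ids are generated inside a fused stage here...
      | 'AssignIds' >> beam.Map(lambda f: (str(uuid.uuid4()), f))
      # ...and checkpointed here. On Dataflow the Reshuffle materializes the
      # (id, file) pairs, so retries of downstream steps replay the same ids.
      # On runners that do not checkpoint at Reshuffle, the ids could still be
      # regenerated on retry.
      | 'Checkpoint' >> beam.Reshuffle())
```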

@claudevdm claudevdm requested a review from kennknowles April 21, 2025 13:16
@liferoad liferoad added this to the 2.65.0 Release milestone Apr 21, 2025

@@ -1101,6 +1101,18 @@ def _load_data(
of the load jobs would fail but not other. If any of them fails, then
copy jobs are not triggered.
"""
self.reshuffle_before_load = not util.is_compat_version_prior_to(
Member

Structural comments:

  • This reshuffle should be added outside of this transform. Make it the responsibility of the caller to ensure stable inputs.
  • It would be nicer to fully fork expand whenever we move to a new update-incompatible version: basically freeze the old one and leave it behind. You still get bugfixes applied to helper transforms, but you keep expand itself as straight-line code except for the top-level decision of which version to expand.

Collaborator Author

> This reshuffle should be added outside of this transform. Make it the responsibility of the caller to ensure stable inputs.

Apologies, but I don't understand 100%.

class BigQueryBatchFileLoads(beam.PTransform) is the transform, and _load_data is a method that builds part of the BigQueryBatchFileLoads transform.

class TriggerLoadJobs(beam.DoFn) is the DoFn that requires stable inputs.

So I am adding the reshuffle right before the DoFn that requires stable inputs (in _load_data).

Are you saying to create a new PTransform that wraps TriggerLoadJobs, with an expand_2_64_0() that just returns ParDo(TriggerLoadJobs), and expand() that returns Reshuffle() | ParDo(TriggerLoadJobs)?
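
For concreteness, a sketch of what such a wrapper might look like (hypothetical class and parameter names, illustrating the question rather than this PR's actual code):

```python
import apache_beam as beam


class ShuffleThenTriggerLoadJobs(beam.PTransform):
  """Hypothetical wrapper around the load-triggering ParDo, with a forked
  expansion for the old (pre-2.65.0) update-compatible graph shape."""

  def __init__(self, trigger_load_jobs_dofn, use_pre_2_65_0_graph):
    super().__init__()
    self._dofn = trigger_load_jobs_dofn
    self._use_pre_2_65_0_graph = use_pre_2_65_0_graph

  def _expand_2_64_0(self, partitioned_files):
    # Frozen legacy shape: no reshuffle before the load-triggering ParDo.
    return partitioned_files | 'TriggerLoadJobs' >> beam.ParDo(self._dofn)

  def expand(self, partitioned_files):
    if self._use_pre_2_65_0_graph:
      return self._expand_2_64_0(partitioned_files)
    # Current shape: checkpoint first so the ParDo sees stable inputs on retry.
    return (
        partitioned_files
        | 'StableInputs' >> beam.Reshuffle()
        | 'TriggerLoadJobs' >> beam.ParDo(self._dofn))
```

Under this shape, the compat-version check from the diff above would live in the caller, which decides which expansion to use.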

param(compat_version=None),
param(compat_version="2.64.0"),
])
def test_reshuffle_before_load(self, compat_version):
Member

I don't think this really tests what you want to test. You want:

  • correctness of the transform with both settings of the flag
  • if possible, some way to reproduce the issue you saw before, i.e. a test that is red before this fix and green after

You also want to build an internal Dataflow test of update compatibility with the requested 2.64.0 compat version. I can show you a CL that does that if you haven't already seen one.
