
Add reshuffle before triggering load jobs. #34657


Merged 4 commits on Apr 29, 2025
9 changes: 9 additions & 0 deletions CHANGES.md
@@ -77,6 +77,15 @@
 ## Breaking Changes
 
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
+* [Python] Reshuffle now preserves PaneInfo, which was previously lost
+  after a reshuffle. To opt out of this change, set
+  update_compatibility_version to a previous Beam version, e.g. "2.64.0".
+  ([#34348](https://github.com/apache/beam/pull/34348))
+* [Python] BigQueryFileLoads now adds a Reshuffle before triggering load jobs.
+  This fixes a bug where a streaming pipeline could lose data if a load job
+  was still pending during autoscaling. To opt out of this change, set
+  update_compatibility_version to a previous Beam version, e.g. "2.64.0".
+  ([#34657](https://github.com/apache/beam/pull/34657))
 
 ## Deprecations
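Both changelog entries point at the same opt-out knob. A minimal sketch of what opting out looks like from user code, assuming the standard `PipelineOptions` plumbing that the tests below also exercise; the table spec, schema, and input row are illustrative placeholders:

```python
# Sketch: pin update_compatibility_version to keep the pre-2.65.0 graph.
# 'project:dataset.table' and the input row are illustrative placeholders.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(update_compatibility_version='2.64.0')
with beam.Pipeline(options=options) as p:
  _ = (
      p
      | beam.Create([{'col': 1}])
      | beam.io.WriteToBigQuery(
          'project:dataset.table',
          schema='col:INTEGER',
          method=beam.io.WriteToBigQuery.Method.FILE_LOADS))
```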
16 changes: 14 additions & 2 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -44,8 +44,8 @@
 from apache_beam.options import value_provider as vp
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import trigger
+from apache_beam.transforms import util
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.transforms.util import GroupIntoBatches
 from apache_beam.transforms.window import GlobalWindows
 
 # Protect against environments where bigquery library is not available.
@@ -1062,7 +1062,7 @@ def _write_files_with_auto_sharding(
         destination_data_kv_pc
         |
         'ToHashableTableRef' >> beam.Map(bigquery_tools.to_hashable_table_ref)
-        | 'WithAutoSharding' >> GroupIntoBatches.WithShardedKey(
+        | 'WithAutoSharding' >> util.GroupIntoBatches.WithShardedKey(
             batch_size=_FILE_TRIGGERING_RECORD_COUNT,
             max_buffering_duration_secs=_FILE_TRIGGERING_BATCHING_DURATION_SECS,
             clock=clock)
@@ -1101,6 +1101,18 @@ def _load_data(
     of the load jobs would fail but not others. If any of them fails, then
     copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(
+        p.options, "2.65.0")
+    if self.reshuffle_before_load:
+      # Ensure that TriggerLoadJob retry inputs are deterministic by breaking
+      # fusion for inputs.
+      partitions_using_temp_tables = (
+          partitions_using_temp_tables
+          | "ReshuffleBeforeLoadWithTempTables" >> beam.Reshuffle())
+      partitions_direct_to_destination = (
+          partitions_direct_to_destination
+          | "ReshuffleBeforeLoadWithoutTempTables" >> beam.Reshuffle())
+
     # Load data using temp tables
     trigger_loads_outputs = (
         partitions_using_temp_tables
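The comment in the hunk above is the heart of the change: `beam.Reshuffle()` materializes its input, so a retry of the downstream load-job step replays the same checkpointed elements instead of re-running (and possibly re-deriving different) upstream results. A minimal, self-contained sketch of that stable-input pattern, with a hypothetical stand-in for the load-job trigger:

```python
# Sketch of the fusion-breaking pattern used above. FakeTriggerLoadJob is
# a hypothetical stand-in for Beam's real TriggerLoadJobs DoFn; only the
# Reshuffle placement mirrors the change in this PR.
import apache_beam as beam


class FakeTriggerLoadJob(beam.DoFn):
  def process(self, partition):
    # The real DoFn submits a BigQuery load job here. Because Reshuffle
    # checkpointed `partition`, a retry of this step sees the same element.
    yield 'job-for-%s' % partition


with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create(['partition-a', 'partition-b'])
      | 'ReshuffleBeforeLoad' >> beam.Reshuffle()  # break fusion; stable input
      | 'TriggerLoad' >> beam.ParDo(FakeTriggerLoadJob())
      | beam.Map(print))
```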
55 changes: 50 additions & 5 deletions sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -478,6 +478,44 @@ def test_records_traverse_transform_with_mocks(self):

     assert_that(jobs, equal_to([job_reference]), label='CheckJobs')
 
+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.64.0"),
+  ])
+  def test_reshuffle_before_load(self, compat_version):
+    destination = 'project1:dataset1.table1'
+
+    job_reference = bigquery_api.JobReference()
+    job_reference.projectId = 'project1'
+    job_reference.jobId = 'job_name1'
+    result_job = bigquery_api.Job()
+    result_job.jobReference = job_reference
+
+    mock_job = mock.Mock()
+    mock_job.status.state = 'DONE'
+    mock_job.status.errorResult = None
+    mock_job.jobReference = job_reference
+
+    bq_client = mock.Mock()
+    bq_client.jobs.Get.return_value = mock_job
+
+    bq_client.jobs.Insert.return_value = result_job
+
+    transform = bqfl.BigQueryBatchFileLoads(
+        destination,
+        custom_gcs_temp_location=self._new_tempdir(),
+        test_client=bq_client,
+        validate=False,
+        temp_file_format=bigquery_tools.FileFormat.JSON)
+
+    options = PipelineOptions(update_compatibility_version=compat_version)
+    # Need to test this with the DirectRunner to avoid serializing mocks
+    with TestPipeline('DirectRunner', options=options) as p:
+      _ = p | beam.Create(_ELEMENTS) | transform
+
+    reshuffle_before_load = compat_version is None
+    assert transform.reshuffle_before_load == reshuffle_before_load
+
   def test_load_job_id_used(self):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'loadJobProject'
@@ -774,11 +812,16 @@ def test_multiple_partition_files_write_dispositions(
     self.assertEqual(mock_call_process.call_count, 1)
 
   @parameterized.expand([
-      param(is_streaming=False, with_auto_sharding=False),
-      param(is_streaming=True, with_auto_sharding=False),
-      param(is_streaming=True, with_auto_sharding=True),
+      param(is_streaming=False, with_auto_sharding=False, compat_version=None),
+      param(is_streaming=True, with_auto_sharding=False, compat_version=None),
+      param(is_streaming=True, with_auto_sharding=True, compat_version=None),
+      param(
+          is_streaming=True, with_auto_sharding=False, compat_version="2.64.0"),
+      param(
+          is_streaming=True, with_auto_sharding=True, compat_version="2.64.0"),
   ])
-  def test_triggering_frequency(self, is_streaming, with_auto_sharding):
+  def test_triggering_frequency(
+      self, is_streaming, with_auto_sharding, compat_version):
     destination = 'project1:dataset1.table1'
 
     job_reference = bigquery_api.JobReference()
@@ -820,7 +863,9 @@ def __call__(self):
         with_auto_sharding=with_auto_sharding)
 
     # Need to test this with the DirectRunner to avoid serializing mocks
-    test_options = PipelineOptions(flags=['--allow_unsafe_triggers'])
+    test_options = PipelineOptions(
+        flags=['--allow_unsafe_triggers'],
+        update_compatibility_version=compat_version)
     test_options.view_as(StandardOptions).streaming = is_streaming
     with TestPipeline(runner='BundleBasedDirectRunner',
                       options=test_options) as p:
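For reference, the gate that `test_reshuffle_before_load` asserts on can be checked directly. A hedged sketch, assuming `is_compat_version_prior_to` behaves as the diff uses it, returning True when the pipeline's update_compatibility_version predates the given release; it relies only on names that appear in the diff:

```python
# Sketch: how the compatibility gate in _load_data resolves for the two
# parameterized cases in the test above.
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms import util

default_opts = PipelineOptions()  # compat_version=None case
opted_out = PipelineOptions(update_compatibility_version='2.64.0')

# Default: not prior to 2.65.0, so the Reshuffle is inserted.
print(not util.is_compat_version_prior_to(default_opts, '2.65.0'))  # True
# Opted out: 2.64.0 is prior to 2.65.0, so the old graph is kept.
print(not util.is_compat_version_prior_to(opted_out, '2.65.0'))     # False
```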