Skip to content

Commit 6f2f960

Browse files
authored
use batch delete for GCS IO (#34835)
* use batch delete for GCS IO * fixed the gcsio test * fix format * fixed format * fixed the lint
1 parent ff9a22a commit 6f2f960

File tree

2 files changed

+49
-6
lines changed

2 files changed

+49
-6
lines changed

sdks/python/apache_beam/io/gcp/gcsio.py

+17-4
Original file line numberDiff line numberDiff line change
@@ -265,10 +265,23 @@ def delete(self, path, recursive=False):
265265
bucket_name, blob_name = parse_gcs_path(path)
266266
bucket = self.client.bucket(bucket_name)
267267
if recursive:
268-
# List and delete all blobs under the prefix.
269-
blobs = bucket.list_blobs(prefix=blob_name)
270-
for blob in blobs:
271-
self._delete_blob(bucket, blob.name)
268+
# List all blobs under the prefix.
269+
blobs_to_delete = bucket.list_blobs(
270+
prefix=blob_name, retry=self._storage_client_retry)
271+
# Collect full paths for batch deletion.
272+
paths_to_delete = [
273+
f'gs://{bucket_name}/{blob.name}' for blob in blobs_to_delete
274+
]
275+
if paths_to_delete:
276+
# Delete them in batches.
277+
results = self.delete_batch(paths_to_delete)
278+
# Log any errors encountered during batch deletion.
279+
errors = [f'{path}: {err}' for path, err in results if err is not None]
280+
if errors:
281+
_LOGGER.warning(
282+
'Failed to delete some objects during recursive delete of %s: %s',
283+
path,
284+
', '.join(errors))
272285
else:
273286
# Delete only the specific blob.
274287
self._delete_blob(bucket, blob_name)

sdks/python/apache_beam/io/gcp/gcsio_test.py

+32-2
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,38 @@ def lookup_bucket(self, name):
7575
else:
7676
return self.create_bucket(name)
7777

78-
def batch(self):
79-
pass
78+
def batch(self, raise_exception=True):
79+
# Return a mock object configured to act as a context manager
80+
# and provide the necessary _responses attribute after __exit__.
81+
# test_delete performs 3 deletions.
82+
num_expected_responses = 3
83+
mock_batch = mock.Mock()
84+
85+
# Configure the mock responses (assuming success for test_delete)
86+
# These need to be available *after* the 'with' block finishes.
87+
# We'll store them temporarily and assign in __exit__.
88+
successful_responses = [
89+
mock.Mock(status_code=204) for _ in range(num_expected_responses)
90+
]
91+
92+
# Define the exit logic
93+
def mock_exit_logic(exc_type, exc_val, exc_tb):
94+
# Assign responses to the mock instance itself
95+
# so they are available after the 'with' block.
96+
mock_batch._responses = successful_responses
97+
98+
# Configure the mock to behave like a context manager
99+
mock_batch.configure_mock(
100+
__enter__=mock.Mock(return_value=mock_batch),
101+
__exit__=mock.Mock(side_effect=mock_exit_logic))
102+
103+
# The loop inside _batch_with_retry calls fn(request) for each item.
104+
# The real batch object might have methods like add() or similar,
105+
# but the core logic in gcsio.py calls the passed function `fn` directly
106+
# within the `with` block. So, no specific action methods seem needed
107+
# on the mock_batch itself for this test case.
108+
109+
return mock_batch
80110

81111
def add_file(self, bucket, blob, contents):
82112
folder = self.lookup_bucket(bucket)

0 commit comments

Comments
 (0)