Skip to content

Commit aed8682

Browse files
author
zhixiangli
committed
test: add system tests for concurrent AsyncMultiRangeDownloader
1 parent cb4e640 commit aed8682

File tree

1 file changed

+193
-1
lines changed

1 file changed

+193
-1
lines changed

packages/google-cloud-storage/tests/system/test_zonal.py

Lines changed: 193 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@
22
import asyncio
33
import gc
44
import os
5+
import random
56
import uuid
67
from io import BytesIO
78

89
# python additional imports
910
import google_crc32c
1011
import pytest
11-
from google.api_core.exceptions import FailedPrecondition, NotFound
12+
from google.api_core.exceptions import FailedPrecondition, NotFound, OutOfRange
1213

1314
from google.cloud.storage.asyncio.async_appendable_object_writer import (
1415
_DEFAULT_FLUSH_INTERVAL_BYTES,
@@ -594,3 +595,194 @@ async def _run():
594595
gc.collect()
595596

596597
event_loop.run_until_complete(_run())
598+
599+
600+
def test_mrd_concurrent_download(
601+
storage_client, blobs_to_delete, event_loop, grpc_client
602+
):
603+
"""
604+
Test that mrd can handle concurrent `download_ranges` calls correctly.
605+
Tests overlapping ranges, high concurrency (len > 100 multiplexing batch limits),
606+
mixed random chunk sizes (small/medium/large), and full object fetching alongside specific chunks.
607+
"""
608+
object_size = 15 * 1024 * 1024 # 15MB
609+
object_name = f"test_mrd_concurrent-{uuid.uuid4()}"
610+
611+
async def _run():
612+
object_data = os.urandom(object_size)
613+
614+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
615+
await writer.open()
616+
await writer.append(object_data)
617+
await writer.close(finalize_on_close=True)
618+
619+
async with AsyncMultiRangeDownloader(
620+
grpc_client, _ZONAL_BUCKET, object_name
621+
) as mrd:
622+
tasks = []
623+
ranges_to_fetch = []
624+
625+
# Overlapping ranges & Mixed random chunk sizes
626+
# Small chunks
627+
for _ in range(60):
628+
start = random.randint(0, object_size - 100)
629+
length = random.randint(1, 100)
630+
ranges_to_fetch.append((start, length))
631+
# Medium chunks
632+
for _ in range(60):
633+
start = random.randint(0, object_size - 100000)
634+
length = random.randint(100, 100000)
635+
ranges_to_fetch.append((start, length))
636+
# Large chunks
637+
for _ in range(5):
638+
start = random.randint(0, object_size - 2000000)
639+
length = random.randint(1000000, 2000000)
640+
ranges_to_fetch.append((start, length))
641+
642+
# Full object fetching concurrently
643+
ranges_to_fetch.append((0, 0))
644+
645+
# High concurrency batching (Total > 100 ranges)
646+
assert len(ranges_to_fetch) > 100
647+
random.shuffle(ranges_to_fetch)
648+
649+
buffers = [BytesIO() for _ in range(len(ranges_to_fetch))]
650+
651+
for idx, (start, length) in enumerate(ranges_to_fetch):
652+
tasks.append(
653+
asyncio.create_task(
654+
mrd.download_ranges([(start, length, buffers[idx])])
655+
)
656+
)
657+
658+
await asyncio.gather(*tasks)
659+
660+
# Validation
661+
for idx, (start, length) in enumerate(ranges_to_fetch):
662+
if length == 0:
663+
expected_data = object_data[start:]
664+
else:
665+
expected_data = object_data[start : start + length]
666+
assert buffers[idx].getvalue() == expected_data
667+
668+
del writer
669+
gc.collect()
670+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
671+
672+
event_loop.run_until_complete(_run())
673+
674+
675+
def test_mrd_concurrent_download_cancellation(
676+
storage_client, blobs_to_delete, event_loop, grpc_client
677+
):
678+
"""
679+
Test task cancellation / abort mid-stream.
680+
Tests that downloading gracefully manages memory and internal references
681+
when tasks are canceled during active multiplexing, without breaking remaining downloads.
682+
"""
683+
object_size = 5 * 1024 * 1024 # 5MB
684+
object_name = f"test_mrd_cancel-{uuid.uuid4()}"
685+
686+
async def _run():
687+
object_data = os.urandom(object_size)
688+
689+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
690+
await writer.open()
691+
await writer.append(object_data)
692+
await writer.close(finalize_on_close=True)
693+
694+
async with AsyncMultiRangeDownloader(
695+
grpc_client, _ZONAL_BUCKET, object_name
696+
) as mrd:
697+
tasks = []
698+
num_chunks = 100
699+
chunk_size = object_size // num_chunks
700+
buffers = [BytesIO() for _ in range(num_chunks)]
701+
702+
for i in range(num_chunks):
703+
start = i * chunk_size
704+
tasks.append(
705+
asyncio.create_task(
706+
mrd.download_ranges([(start, chunk_size, buffers[i])])
707+
)
708+
)
709+
710+
# Let the loop start sending Bidi requests
711+
await asyncio.sleep(0.01)
712+
713+
# Cancel a subset of evenly distributed tasks
714+
for i in range(0, num_chunks, 2):
715+
tasks[i].cancel()
716+
717+
results = await asyncio.gather(*tasks, return_exceptions=True)
718+
719+
for i in range(num_chunks):
720+
if i % 2 == 0:
721+
assert isinstance(results[i], asyncio.CancelledError)
722+
else:
723+
start = i * chunk_size
724+
expected_data = object_data[start : start + chunk_size]
725+
assert buffers[i].getvalue() == expected_data
726+
727+
del writer
728+
gc.collect()
729+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
730+
731+
event_loop.run_until_complete(_run())
732+
733+
734+
def test_mrd_concurrent_download_out_of_bounds(
735+
storage_client, blobs_to_delete, event_loop, grpc_client
736+
):
737+
"""
738+
Test out-of-bounds & edge ranges concurrent with valid requests.
739+
Verifies isolation: invalid bounds generate correct exceptions and don't stall the stream
740+
for concurrently valid requests.
741+
"""
742+
object_size = 2 * 1024 * 1024 # 2MB
743+
object_name = f"test_mrd_oob-{uuid.uuid4()}"
744+
745+
async def _run():
746+
object_data = os.urandom(object_size)
747+
748+
writer = AsyncAppendableObjectWriter(grpc_client, _ZONAL_BUCKET, object_name)
749+
await writer.open()
750+
await writer.append(object_data)
751+
await writer.close(finalize_on_close=True)
752+
753+
async with AsyncMultiRangeDownloader(
754+
grpc_client, _ZONAL_BUCKET, object_name
755+
) as mrd:
756+
b_valid = BytesIO()
757+
t_valid = asyncio.create_task(mrd.download_ranges([(0, 100, b_valid)]))
758+
759+
b_oob1 = BytesIO()
760+
t_oob1 = asyncio.create_task(
761+
mrd.download_ranges([(object_size + 1000, 100, b_oob1)])
762+
)
763+
764+
# EOF ask for 100 bytes
765+
b_oob2 = BytesIO()
766+
t_oob2 = asyncio.create_task(
767+
mrd.download_ranges([(object_size, 100, b_oob2)])
768+
)
769+
770+
results = await asyncio.gather(
771+
t_valid, t_oob1, t_oob2, return_exceptions=True
772+
)
773+
774+
# Verify valid one processed correctly
775+
assert b_valid.getvalue() == object_data[:100]
776+
777+
# Verify fully OOB request returned Exception
778+
assert isinstance(results[1], OutOfRange)
779+
780+
# Verify request exactly at EOF successfully completed with 0 bytes
781+
assert not isinstance(results[2], Exception)
782+
assert b_oob2.getvalue() == b""
783+
784+
del writer
785+
gc.collect()
786+
blobs_to_delete.append(storage_client.bucket(_ZONAL_BUCKET).blob(object_name))
787+
788+
event_loop.run_until_complete(_run())

0 commit comments

Comments
 (0)