Skip to content

[Storage] Download/Upload Blob APIs Fixes for Transport Compatibility #40490

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ async def retry_hook(settings, **kwargs):

async def is_checksum_retry(response):
# retry if invalid content md5
if response.context.get('validate_content', False) and response.http_response.headers.get('content-md5'):
if hasattr(response.http_response, "load_body"):
try:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adapted sync code to async upload is_checksum_retry

await response.http_response.load_body() # Load the body in memory and close the socket
await response.http_response.load_body()
except (StreamClosedError, StreamConsumedError):
pass
if response.context.get('validate_content', False) and response.http_response.headers.get('content-md5'):
computed_md5 = response.http_request.headers.get('content-md5', None) or \
encode_base64(StorageContentValidation.get_content_md5(response.http_response.body()))
if response.http_response.headers['content-md5'] != computed_md5:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@
async def process_content(data: Any, start_offset: int, end_offset: int, encryption: Dict[str, Any]) -> bytes:
if data is None:
raise ValueError("Response cannot be None.")
await data.response.load_body()
content = cast(bytes, data.response.body())
if hasattr(data.response, "load_body"):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is try-except necessary here? File share already has try-except built-in the process_content.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like transports support either read or load_body, so this should cover all the cases regardless.

await data.response.load_body()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

decompression could get tricky here

content = cast(bytes, data.response.body())
else:
content = b"".join([d async for d in data])
if encryption.get('key') is not None or encryption.get('resolver') is not None:
try:
return decrypt_blob(
Expand Down
54 changes: 0 additions & 54 deletions sdk/storage/azure-storage-blob/tests/test_common_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
from devtools_testutils import FakeTokenCredential, recorded_by_proxy
from devtools_testutils.storage import StorageRecordedTestCase
from settings.testcase import BlobPreparer
from test_helpers import MockStorageTransport

# ------------------------------------------------------------------------------
TEST_CONTAINER_PREFIX = 'container'
Expand Down Expand Up @@ -3532,57 +3531,4 @@ def test_upload_blob_partial_stream_chunked(self, **kwargs):
result = blob.download_blob().readall()
assert result == data[:length]

@BlobPreparer()
def test_mock_transport_no_content_validation(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

transport = MockStorageTransport()
blob_client = BlobClient(
self.account_url(storage_account_name, "blob"),
container_name='test_cont',
blob_name='test_blob',
credential=storage_account_key,
transport=transport,
retry_total=0
)

content = blob_client.download_blob()
assert content is not None

props = blob_client.get_blob_properties()
assert props is not None

data = b"Hello World!"
resp = blob_client.upload_blob(data, overwrite=True)
assert resp is not None

blob_data = blob_client.download_blob().read()
assert blob_data == b"Hello World!" # data is fixed by mock transport

resp = blob_client.delete_blob()
assert resp is None

@BlobPreparer()
def test_mock_transport_with_content_validation(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

transport = MockStorageTransport()
blob_client = BlobClient(
self.account_url(storage_account_name, "blob"),
container_name='test_cont',
blob_name='test_blob',
credential=storage_account_key,
transport=transport,
retry_total=0
)

data = b"Hello World!"
resp = blob_client.upload_blob(data, overwrite=True, validate_content=True)
assert resp is not None

blob_data = blob_client.download_blob(validate_content=True).read()
assert blob_data == b"Hello World!" # data is fixed by mock transport

# ------------------------------------------------------------------------------
56 changes: 1 addition & 55 deletions sdk/storage/azure-storage-blob/tests/test_common_blob_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
from devtools_testutils.aio import recorded_by_proxy_async
from devtools_testutils.storage.aio import AsyncStorageRecordedTestCase
from settings.testcase import BlobPreparer
from test_helpers_async import AsyncStream, MockStorageTransport
from test_helpers_async import AsyncStream

# ------------------------------------------------------------------------------
TEST_CONTAINER_PREFIX = 'container'
Expand Down Expand Up @@ -3456,58 +3456,4 @@ async def test_upload_blob_partial_stream_chunked(self, **kwargs):
result = await (await blob.download_blob()).readall()
assert result == data[:length]

@BlobPreparer()
async def test_mock_transport_no_content_validation(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

transport = MockStorageTransport()
blob_client = BlobClient(
self.account_url(storage_account_name, "blob"),
container_name='test_cont',
blob_name='test_blob',
credential=storage_account_key,
transport=transport,
retry_total=0
)

content = await blob_client.download_blob()
assert content is not None

props = await blob_client.get_blob_properties()
assert props is not None

data = b"Hello Async World!"
stream = AsyncStream(data)
resp = await blob_client.upload_blob(stream, overwrite=True)
assert resp is not None

blob_data = await (await blob_client.download_blob()).read()
assert blob_data == b"Hello Async World!" # data is fixed by mock transport

resp = await blob_client.delete_blob()
assert resp is None

@BlobPreparer()
async def test_mock_transport_with_content_validation(self, **kwargs):
storage_account_name = kwargs.pop("storage_account_name")
storage_account_key = kwargs.pop("storage_account_key")

transport = MockStorageTransport()
blob_client = BlobClient(
self.account_url(storage_account_name, "blob"),
container_name='test_cont',
blob_name='test_blob',
credential=storage_account_key,
transport=transport,
retry_total=0
)

data = b"Hello Async World!"
stream = AsyncStream(data)
resp = await blob_client.upload_blob(stream, overwrite=True, validate_content=True)
assert resp is not None

blob_data = await (await blob_client.download_blob(validate_content=True)).read()
assert blob_data == b"Hello Async World!" # data is fixed by mock transport
# ------------------------------------------------------------------------------
105 changes: 96 additions & 9 deletions sdk/storage/azure-storage-blob/tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from typing import Any, Dict, Optional
from typing_extensions import Self

from azure.core.pipeline.transport import HttpTransport, RequestsTransportResponse
from azure.core.pipeline.transport import (
HttpTransport,
RequestsTransport,
RequestsTransportResponse
)
from azure.core.rest import HttpRequest
from requests import Response
from urllib3 import HTTPResponse
Expand Down Expand Up @@ -49,15 +53,15 @@ def tell(self):
return self.wrapped_stream.tell()


class MockHttpClientResponse(Response):
class MockClientResponse(Response):
def __init__(
self, url: str,
body_bytes: bytes,
headers: Dict[str, Any],
status: int = 200,
reason: str = "OK"
) -> None:
super(MockHttpClientResponse).__init__()
super(MockClientResponse).__init__()
self._url = url
self._body = body_bytes
self._content = body_bytes
Expand All @@ -70,7 +74,7 @@ def __init__(
self.raw = HTTPResponse()


class MockStorageTransport(HttpTransport):
class MockLegacyTransport(HttpTransport):
"""
This transport returns legacy http response objects from azure core and is
intended only to test our backwards compatibility support.
Expand All @@ -89,7 +93,7 @@ def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse

rest_response = RequestsTransportResponse(
request=request,
requests_response=MockHttpClientResponse(
requests_response=MockClientResponse(
request.url,
b"Hello World!",
headers,
Expand All @@ -99,7 +103,7 @@ def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse
# get_blob_properties
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockHttpClientResponse(
requests_response=MockClientResponse(
request.url,
b"",
{
Expand All @@ -112,7 +116,7 @@ def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse
# upload_blob
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockHttpClientResponse(
requests_response=MockClientResponse(
request.url,
b"",
{
Expand All @@ -126,7 +130,7 @@ def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse
# delete_blob
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockHttpClientResponse(
requests_response=MockClientResponse(
request.url,
b"",
{
Expand All @@ -137,7 +141,90 @@ def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse
)
)
else:
raise ValueError("The request is not accepted as part of MockStorageTransport.")
raise ValueError("The request is not accepted as part of MockLegacyTransport.")
return rest_response

def __enter__(self) -> Self:
return self

def __exit__(self, *args: Any) -> None:
pass

def open(self) -> None:
pass

def close(self) -> None:
pass


class MockCoreTransport(RequestsTransport):
"""
This transport returns http response objects from azure core pipelines and is
intended only to test our backwards compatibility support.
"""
def send(self, request: HttpRequest, **kwargs: Any) -> RequestsTransportResponse:
if request.method == 'GET':
# download_blob
headers = {
"Content-Type": "application/octet-stream",
"Content-Range": "bytes 0-17/18",
"Content-Length": "18",
}

if "x-ms-range-get-content-md5" in request.headers:
headers["Content-MD5"] = "7Qdih1MuhjZehB6Sv8UNjA==" # cspell:disable-line

rest_response = RequestsTransportResponse(
request=request,
requests_response=MockClientResponse(
request.url,
b"Hello World!",
headers,
)
)
elif request.method == 'HEAD':
# get_blob_properties
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockClientResponse(
request.url,
b"",
{
"Content-Type": "application/octet-stream",
"Content-Length": "1024",
},
)
)
elif request.method == 'PUT':
# upload_blob
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockClientResponse(
request.url,
b"",
{
"Content-Length": "0",
},
201,
"Created"
)
)
elif request.method == 'DELETE':
# delete_blob
rest_response = RequestsTransportResponse(
request=request,
requests_response=MockClientResponse(
request.url,
b"",
{
"Content-Length": "0",
},
202,
"Accepted"
)
)
else:
raise ValueError("The request is not accepted as part of MockCoreTransport.")
return rest_response

def __enter__(self) -> Self:
Expand Down
Loading