Skip to content

Adding Throughput Bucket Header #40340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
'priority': 'priorityLevel',
'no_response': 'responsePayloadOnWriteDisabled',
'max_item_count': 'maxItemCount',
'throughput_bucket': 'throughputBucket'
}

# Cosmos resource ID validation regex breakdown:
Expand Down Expand Up @@ -317,6 +318,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("correlatedActivityId"):
headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"]

if options.get("throughputBucket"):
headers[http_constants.HttpHeaders.ThroughputBucket] = options["throughputBucket"]

if resource_type == "docs" and verb != "get":
if "responsePayloadOnWriteDisabled" in options:
responsePayloadOnWriteDisabled = options["responsePayloadOnWriteDisabled"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def __init__( # pylint: disable=too-many-statements
http_constants.HttpHeaders.IsContinuationExpected: False,
}

throughput_bucket = kwargs.pop('throughput_bucket', None)
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

# Keeps the latest response headers from the server.
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

Expand Down
44 changes: 44 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Large diffs are not rendered by default.

22 changes: 21 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
level (to log all requests) or at a single request level. Requests will be logged at INFO level.
:keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending
response payloads for write operations on items by default unless specified differently per operation.
:keyword int throughput_bucket: The desired throughput bucket for the client

.. admonition:: Example:

Expand Down Expand Up @@ -251,6 +252,7 @@ async def create_database(
*,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> DatabaseProxy:
"""
Expand All @@ -261,6 +263,7 @@ async def create_database(
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
:returns: A DatabaseProxy instance representing the new database.
Expand Down Expand Up @@ -296,6 +299,8 @@ async def create_database(
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
request_options = _build_options(kwargs)
_set_throughput_options(offer=offer_throughput, request_options=request_options)

Expand All @@ -309,6 +314,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
*,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> DatabaseProxy:
"""
Expand All @@ -325,6 +331,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
:returns: A DatabaseProxy instance representing the database.
Expand All @@ -348,6 +355,8 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
" It will now be removed in the future.",
DeprecationWarning)
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
try:
Expand Down Expand Up @@ -385,6 +394,7 @@ def list_databases(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the databases in a Cosmos DB SQL database account.
Expand All @@ -393,6 +403,7 @@ def list_databases(
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:returns: An AsyncItemPaged of database properties (dicts).
:rtype: AsyncItemPaged[Dict[str, str]]
Expand All @@ -405,6 +416,8 @@ def list_databases(
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
Expand All @@ -423,6 +436,7 @@ def query_databases(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Query the databases in a Cosmos DB SQL database account.
Expand All @@ -435,6 +449,7 @@ def query_databases(
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:returns: An AsyncItemPaged of database properties (dicts).
:rtype: AsyncItemPaged[Dict[str, str]]
Expand All @@ -447,6 +462,8 @@ def query_databases(
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs['throughput_bucket'] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
Expand All @@ -466,6 +483,7 @@ async def delete_database(
*,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> None:
"""Delete the database with the given ID (name).
Expand All @@ -475,6 +493,7 @@ async def delete_database(
:type database: Union[str, ~azure.cosmos.DatabaseProxy, Dict[str, Any]]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted.
:rtype: None
Expand All @@ -497,7 +516,8 @@ async def delete_database(
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
" It will now be removed in the future.",
DeprecationWarning)

if throughput_bucket is not None:
kwargs['throughput_bucket'] = throughput_bucket
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
request_options = _build_options(kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ def __init__( # pylint: disable=too-many-statements
http_constants.HttpHeaders.IsContinuationExpected: False,
}

throughput_bucket = kwargs.pop('throughput_bucket', None)
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level

Expand Down
28 changes: 27 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,15 @@ async def read(
self,
*,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> Dict[str, Any]:
"""Read the database properties.

:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None]
:keyword int throughput_bucket: The desired throughput bucket for the client
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved.
:returns: A dict representing the database properties
:rtype: Dict[str, Any]
Expand All @@ -152,6 +154,8 @@ async def read(
database_link = _get_database_link(self)
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
request_options = _build_options(kwargs)

self._properties = await self.client_connection.ReadDatabase(
Expand All @@ -177,6 +181,7 @@ async def create_container(
vector_embedding_policy: Optional[Dict[str, Any]] = None,
change_feed_policy: Optional[Dict[str, Any]] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> ContainerProxy:
"""Create a new container with the given ID (name).
Expand Down Expand Up @@ -210,6 +215,7 @@ async def create_container(
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:keyword int throughput_bucket: The desired throughput bucket for the client
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed.
:returns: A `ContainerProxy` instance representing the new container.
:rtype: ~azure.cosmos.aio.ContainerProxy
Expand Down Expand Up @@ -277,9 +283,10 @@ async def create_container(
definition["changeFeedPolicy"] = change_feed_policy
if full_text_policy is not None:
definition["fullTextPolicy"] = full_text_policy

if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs['throughput_bucket'] = throughput_bucket
request_options = _build_options(kwargs)
_set_throughput_options(offer=offer_throughput, request_options=request_options)

Expand All @@ -305,6 +312,7 @@ async def create_container_if_not_exists(
vector_embedding_policy: Optional[Dict[str, Any]] = None,
change_feed_policy: Optional[Dict[str, Any]] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> ContainerProxy:
"""Create a container if it does not exist already.
Expand Down Expand Up @@ -340,6 +348,7 @@ async def create_container_if_not_exists(
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:keyword int throughput_bucket: The desired throughput bucket for the client
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed.
:returns: A `ContainerProxy` instance representing the new container.
:rtype: ~azure.cosmos.aio.ContainerProxy
Expand Down Expand Up @@ -384,6 +393,7 @@ async def create_container_if_not_exists(
vector_embedding_policy=vector_embedding_policy,
change_feed_policy=change_feed_policy,
full_text_policy=full_text_policy,
throughput_bucket=throughput_bucket,
**kwargs
)

Expand Down Expand Up @@ -421,13 +431,15 @@ def list_containers(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the containers in the database.

:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of container properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
Expand All @@ -450,6 +462,8 @@ def list_containers(
DeprecationWarning)
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
Expand All @@ -470,6 +484,7 @@ def query_containers(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the properties for containers in the current database.
Expand All @@ -481,6 +496,7 @@ def query_containers(
:keyword int max_item_count: Max number of items to be returned in the enumeration operation.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:keyword int throughput_bucket: The desired throughput bucket for the client
:paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]
:returns: An AsyncItemPaged of container properties (dicts).
:rtype: AsyncItemPaged[Dict[str, Any]]
Expand All @@ -493,6 +509,8 @@ def query_containers(
DeprecationWarning)
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
Expand Down Expand Up @@ -520,6 +538,7 @@ async def replace_container(
analytical_storage_ttl: Optional[int] = None,
computed_properties: Optional[List[Dict[str, str]]] = None,
full_text_policy: Optional[Dict[str, Any]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> ContainerProxy:
"""Reset the properties of the container.
Expand Down Expand Up @@ -548,6 +567,7 @@ async def replace_container(
:keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container.
Used to denote the default language to be used for all full text indexes, or to individually
assign a language to each full text index path.
:keyword int throughput_bucket: The desired throughput bucket for the client
:returns: A `ContainerProxy` instance representing the container after replace completed.
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced.
This includes if the container with given id does not exist.
Expand Down Expand Up @@ -583,6 +603,8 @@ async def replace_container(
DeprecationWarning)
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs['throughput_bucket'] = throughput_bucket
request_options = _build_options(kwargs)

container_id = self._get_container_id(container)
Expand Down Expand Up @@ -615,6 +637,7 @@ async def delete_container(
container: Union[str, ContainerProxy, Mapping[str, Any]],
*,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> None:
"""Delete a container.
Expand All @@ -626,6 +649,7 @@ async def delete_container(
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Mapping[str, str], None], None]
:keyword int throughput_bucket: The desired throughput bucket for the client
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted.
:rtype: None
"""
Expand All @@ -649,6 +673,8 @@ async def delete_container(
DeprecationWarning)
if initial_headers is not None:
kwargs['initial_headers'] = initial_headers
if throughput_bucket is not None:
kwargs['throughput_bucket'] = throughput_bucket
request_options = _build_options(kwargs)

collection_link = self._get_container_link(container)
Expand Down
Loading
Loading