Adding Throughput Bucket Header #40340

Merged
Changes from 16 commits
16 changes: 16 additions & 0 deletions sdk/cosmos/azure-cosmos/README.md
@@ -874,6 +874,20 @@ may have additional latencies associated with searching in the service.

You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well for additional guidance.

### Public Preview - Throughput Buckets
When multiple workloads share the same Azure Cosmos DB container, resource contention can lead to throttling, increased latency, and potential business impact.
To address this, Cosmos DB lets you allocate throughput buckets, which help manage resource consumption for workloads sharing a container by limiting the maximum throughput each bucket can consume.
Note that throughput is not reserved for any bucket; it remains shared across all workloads.

Up to five throughput buckets can be configured per container, with IDs ranging from 1 to 5. Each bucket has a maximum throughput percentage, which caps the fraction of the container’s total throughput that it can consume.
Requests assigned to a bucket can consume throughput only up to this limit; once a bucket exceeds its configured limit, subsequent requests in that bucket are throttled.
This ensures that no single workload consumes excessive throughput and impacts others.

Throughput bucket configurations can be changed at most once every 10 minutes; more frequent changes are throttled with an HTTP 429 status code and substatus code 3213.
Requests with an invalid bucket ID (less than 1 or greater than 5) also result in an error, as only bucket IDs 1 to 5 are valid.

You can find our sync samples [here][cosmos_throughput_bucket_sample] and our async samples [here][cosmos_throughput_bucket_sample_async] as well for additional guidance.
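As a quick illustration of the bucket ID rule above, a small helper (illustrative only, not part of the SDK) can validate a bucket ID before it is sent with a request, avoiding a round trip that would fail server-side:

```python
# Illustrative client-side guard for throughput bucket IDs (not part of the
# azure-cosmos SDK): the service only accepts bucket IDs 1 through 5.
VALID_BUCKET_IDS = range(1, 6)  # buckets 1..5

def validate_throughput_bucket(bucket_id: int) -> int:
    """Return bucket_id unchanged if it is a valid throughput bucket ID (1-5)."""
    if bucket_id not in VALID_BUCKET_IDS:
        raise ValueError(
            f"Invalid throughput bucket {bucket_id}: only bucket IDs 1 to 5 are valid"
        )
    return bucket_id
```

Running a request with an ID outside 1-5 would otherwise surface as a service-side error, so failing fast locally is a cheap safeguard.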

## Troubleshooting

### General
@@ -1060,6 +1074,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
[BM25]: https://learn.microsoft.com/azure/search/index-similarity-and-scoring
[cosmos_fts]: https://aka.ms/cosmosfulltextsearch
[cosmos_index_policy_change]: https://learn.microsoft.com/azure/cosmos-db/index-policy#modifying-the-indexing-policy
[cosmos_throughput_bucket_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py
[cosmos_throughput_bucket_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py

## Contributing

4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -63,6 +63,7 @@
'priority': 'priorityLevel',
'no_response': 'responsePayloadOnWriteDisabled',
'max_item_count': 'maxItemCount',
'throughput_bucket': 'throughputBucket'
}

# Cosmos resource ID validation regex breakdown:
@@ -317,6 +318,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if options.get("correlatedActivityId"):
headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"]

if options.get("throughputBucket"):
headers[http_constants.HttpHeaders.ThroughputBucket] = options["throughputBucket"]
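The hunk above follows the existing GetHeaders pattern: when the per-request option is present, it is copied into the outgoing headers under the ThroughputBucket header constant. A minimal self-contained sketch of that pattern (the literal header string below is an assumption for illustration; the SDK takes it from http_constants.HttpHeaders.ThroughputBucket):

```python
# Sketch of the option-to-header step shown above. The header name is an
# assumed placeholder; the SDK uses http_constants.HttpHeaders.ThroughputBucket.
THROUGHPUT_BUCKET_HEADER = "x-ms-cosmos-throughput-bucket"  # assumed wire name

def get_headers(options: dict) -> dict:
    """Copy the per-request throughput bucket option into request headers."""
    headers = {}
    # Mirrors the diff's truthiness check: absent (or falsy) option adds nothing.
    if options.get("throughputBucket"):
        headers[THROUGHPUT_BUCKET_HEADER] = options["throughputBucket"]
    return headers
```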

if resource_type == "docs" and verb != "get":
if "responsePayloadOnWriteDisabled" in options:
responsePayloadOnWriteDisabled = options["responsePayloadOnWriteDisabled"]
@@ -160,6 +160,10 @@ def __init__( # pylint: disable=too-many-statements
http_constants.HttpHeaders.IsContinuationExpected: False,
}

throughput_bucket = kwargs.pop('throughput_bucket', None)
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket
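Here the bucket becomes a client-level default header applied to every request. If a request also names a bucket via the per-request option, the usual expectation (an assumption here, not confirmed by this hunk alone) is that the request-level value wins. A small sketch of that precedence, with an illustrative header name:

```python
def merge_headers(default_headers: dict, request_headers: dict) -> dict:
    """Combine client-level default headers with per-request headers.

    Per-request values override client-level defaults (assumed precedence).
    """
    merged = dict(default_headers)
    merged.update(request_headers)
    return merged

# Illustrative header name; the real constant lives in http_constants.HttpHeaders.
BUCKET_HEADER = "x-ms-cosmos-throughput-bucket"

client_defaults = {BUCKET_HEADER: 1}  # set once at client construction
per_request = {BUCKET_HEADER: 3}      # set for a single operation
effective = merge_headers(client_defaults, per_request)  # bucket 3 applies
```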

# Keeps the latest response headers from the server.
self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()

44 changes: 44 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Large diffs are not rendered by default.

22 changes: 21 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py
@@ -167,6 +167,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword
level (to log all requests) or at a single request level. Requests will be logged at INFO level.
:keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending
response payloads for write operations on items by default unless specified differently per operation.
:keyword int throughput_bucket: The desired throughput bucket for the client.

.. admonition:: Example:

@@ -251,6 +252,7 @@ async def create_database(
*,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> DatabaseProxy:
"""
@@ -261,6 +263,7 @@ async def create_database(
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int throughput_bucket: The desired throughput bucket for the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists.
:returns: A DatabaseProxy instance representing the new database.
@@ -296,6 +299,8 @@ async def create_database(
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
request_options = _build_options(kwargs)
_set_throughput_options(offer=offer_throughput, request_options=request_options)
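Each public method moves the throughput_bucket keyword into kwargs so that _build_options can translate it into a request option using the snake_case-to-camelCase table added in _base.py. A self-contained sketch of that translation step (build_options below is a simplified stand-in for the SDK's _build_options, and the table is trimmed to two entries from the diff):

```python
# Simplified stand-in for _build_options: translate public snake_case kwargs
# into the camelCase request options the transport layer understands.
KWARG_TO_OPTION = {
    "throughput_bucket": "throughputBucket",  # entry added in _base.py
    "max_item_count": "maxItemCount",
}

def build_options(kwargs: dict) -> dict:
    """Pop recognized kwargs and return them under their camelCase option names."""
    options = {}
    for kwarg_name, option_name in KWARG_TO_OPTION.items():
        if kwarg_name in kwargs:
            options[option_name] = kwargs.pop(kwarg_name)
    return options
```

Popping (rather than reading) keeps translated kwargs from leaking into the pipeline twice, which matches how the SDK drains recognized keywords before forwarding the remainder.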

@@ -309,6 +314,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
*,
offer_throughput: Optional[Union[int, ThroughputProperties]] = None,
initial_headers: Optional[Dict[str, str]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> DatabaseProxy:
"""
@@ -325,6 +331,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
:paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int throughput_bucket: The desired throughput bucket for the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
:returns: A DatabaseProxy instance representing the database.
@@ -348,6 +355,8 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
" It will now be removed in the future.",
DeprecationWarning)
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
try:
@@ -385,6 +394,7 @@ def list_databases(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""List the databases in a Cosmos DB SQL database account.
@@ -393,6 +403,7 @@
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int throughput_bucket: The desired throughput bucket for the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:returns: An AsyncItemPaged of database properties (dicts).
:rtype: AsyncItemPaged[Dict[str, str]]
@@ -405,6 +416,8 @@
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
@@ -423,6 +436,7 @@ def query_databases(
max_item_count: Optional[int] = None,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> AsyncItemPaged[Dict[str, Any]]:
"""Query the databases in a Cosmos DB SQL database account.
@@ -435,6 +449,7 @@
:keyword str session_token: Token for use with Session consistency.
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int throughput_bucket: The desired throughput bucket for the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:returns: An AsyncItemPaged of database properties (dicts).
:rtype: AsyncItemPaged[Dict[str, str]]
@@ -447,6 +462,8 @@
DeprecationWarning)
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
feed_options = _build_options(kwargs)
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
@@ -466,6 +483,7 @@ async def delete_database(
*,
initial_headers: Optional[Dict[str, str]] = None,
response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None,
throughput_bucket: Optional[int] = None,
**kwargs: Any
) -> None:
"""Delete the database with the given ID (name).
@@ -475,6 +493,7 @@
:type database: Union[str, ~azure.cosmos.DatabaseProxy, Dict[str, Any]]
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request.
:keyword int throughput_bucket: The desired throughput bucket for the request.
:keyword response_hook: A callable invoked with the response metadata.
:paramtype response_hook: Callable[[Mapping[str, Any]], None]
:raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted.
:rtype: None
@@ -497,7 +516,8 @@ async def delete_database(
"The 'match_condition' flag does not apply to this method and is always ignored even if passed."
" It will now be removed in the future.",
DeprecationWarning)

if throughput_bucket is not None:
kwargs["throughput_bucket"] = throughput_bucket
if initial_headers is not None:
kwargs["initial_headers"] = initial_headers
request_options = _build_options(kwargs)
@@ -163,6 +163,10 @@ def __init__( # pylint: disable=too-many-statements
http_constants.HttpHeaders.IsContinuationExpected: False,
}

throughput_bucket = kwargs.pop('throughput_bucket', None)
if throughput_bucket:
self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket

if consistency_level is not None:
self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level
