From 5fba97d01506550c2d18044ce31d4cde781cd35e Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Wed, 2 Apr 2025 12:37:07 -0400 Subject: [PATCH 01/15] initial commit for throughput bucket --- sdk/cosmos/azure-cosmos/azure/cosmos/_base.py | 3 +++ sdk/cosmos/azure-cosmos/azure/cosmos/container.py | 4 ++++ .../azure-cosmos/azure/cosmos/cosmos_client.py | 9 +++++++++ sdk/cosmos/azure-cosmos/azure/cosmos/database.py | 13 +++++++++++++ .../azure-cosmos/azure/cosmos/http_constants.py | 1 + 5 files changed, 30 insertions(+) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index 654b23c5d71f..ee13418c142b 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -316,6 +316,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches if options.get("correlatedActivityId"): headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"] + if options.get("throughputBucket"): + headers[http_constants.HttpHeaders.ThroughputBucket] = options["throughputBucket"] + if resource_type == "docs" and verb != "get": if "responsePayloadOnWriteDisabled" in options: responsePayloadOnWriteDisabled = options["responsePayloadOnWriteDisabled"] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 69be9491f27b..e28641005d50 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -849,6 +849,7 @@ def create_item( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Create an item in the container. @@ -876,6 +877,7 @@ def create_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Item with the given ID already exists. :returns: A CosmosDict representing the new item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -896,6 +898,8 @@ def create_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["disableAutomaticIdGeneration"] = not enable_automatic_id_generation if populate_query_metrics: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index cc4bd43d13a2..0f0dcd738365 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -188,6 +188,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword level (to log all requests) or at a single request level. Requests will be logged at INFO level.
:keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending response payloads on write operations for items. + :keyword int throughput_bucket: The desired throughput bucket for the client .. admonition:: Example: @@ -265,6 +266,7 @@ def create_database( # pylint:disable=docstring-missing-param etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -279,6 +281,7 @@ def create_database( # pylint:disable=docstring-missing-param has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, str]], None] :returns: A DatabaseProxy instance representing the new database. :rtype: ~azure.cosmos.DatabaseProxy @@ -308,6 +311,8 @@ def create_database( # pylint:disable=docstring-missing-param UserWarning, ) request_options["populateQueryMetrics"] = populate_query_metrics + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket _set_throughput_options(offer=offer_throughput, request_options=request_options) result = self.client_connection.CreateDatabase(database={"id": id}, options=request_options, **kwargs) @@ -326,6 +331,7 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -346,6 +352,7 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A DatabaseProxy instance representing the database. :rtype: ~azure.cosmos.DatabaseProxy :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed.
@@ -358,6 +365,8 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket try: database_proxy = self.get_database_client(id) database_proxy.read( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index af11c807b02b..6191d4042641 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -25,6 +25,8 @@ from typing import Any, Dict, List, Union, Optional, Mapping, Callable import warnings + +from adodbapi.examples.db_print import kw_args from azure.core import MatchConditions from azure.core.tracing.decorator import distributed_trace from azure.core.paging import ItemPaged @@ -178,6 +180,7 @@ def create_container( # pylint:disable=docstring-missing-param vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a new container with the given ID (name). @@ -212,6 +215,7 @@ def create_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the new container. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :rtype: ~azure.cosmos.ContainerProxy @@ -267,6 +271,8 @@ def create_container( # pylint:disable=docstring-missing-param kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( @@ -302,6 +308,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a container if it does not exist already. @@ -338,6 +345,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container read or creation failed. 
:rtype: ~azure.cosmos.ContainerProxy @@ -370,6 +378,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param vector_embedding_policy=vector_embedding_policy, change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, + throughput_bucket=throughput_bucket, **kwargs ) @@ -563,6 +572,7 @@ def replace_container( # pylint:disable=docstring-missing-param analytical_storage_ttl: Optional[int] = None, computed_properties: Optional[List[Dict[str, str]]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Reset the properties of the container. @@ -593,6 +603,7 @@ def replace_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container after replace completed. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced. This includes if the container with given id does not exist. @@ -615,6 +626,8 @@ def replace_container( # pylint:disable=docstring-missing-param kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index 31a95d2600d6..6bf8b5de5ecb 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -189,6 +189,7 @@ class HttpHeaders: DisableRUPerMinuteUsage = "x-ms-documentdb-disable-ru-per-minute-usage" IsRUPerMinuteUsed = "x-ms-documentdb-is-ru-per-minute-used" OfferIsRUPerMinuteThroughputEnabled = "x-ms-offer-is-ru-per-minute-throughput-enabled" + ThroughputBucket = "x-ms-cosmos-throughput-bucket" #CHECK # Partitioned collection headers PartitionKey = "x-ms-documentdb-partitionkey" From 198e8cef9298d06bb6f73a568233e2e5ff233dc3 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 3 Apr 2025 10:37:49 -0400 Subject: [PATCH 02/15] added test for throughput bucket --- .../azure-cosmos/azure/cosmos/container.py | 4 +++ .../azure-cosmos/azure/cosmos/database.py | 1 - sdk/cosmos/azure-cosmos/tests/test_query.py | 33 +++++++++++++++++++ 3 files changed, 37 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index e28641005d50..9570ae51d7ce 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -563,6 +563,7 @@ def query_items( # pylint:disable=docstring-missing-param response_hook: Optional[Callable[[Mapping[str, Any], Union[Dict[str, Any], ItemPaged[Dict[str, Any]]]], None]] = None, continuation_token_limit: Optional[int] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Return all results matching the given `query`. 
@@ -604,6 +605,7 @@ def query_items( # pylint:disable=docstring-missing-param :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used existing indexes and how it could use potential new indexes. Please note that this options will incur overhead, so it should be enabled only when debugging slow queries. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of items (dicts). :rtype: ItemPaged[Dict[str, Any]] @@ -656,6 +658,8 @@ def query_items( # pylint:disable=docstring-missing-param feed_options["correlatedActivityId"] = correlated_activity_id if continuation_token_limit is not None: feed_options["responseContinuationTokenLimitInKb"] = continuation_token_limit + if throughput_bucket is not None: + feed_options["throughputBucket"] = throughput_bucket if response_hook and hasattr(response_hook, "clear"): response_hook.clear() if self.container_link in self.__get_client_container_caches(): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 6191d4042641..03fb1ff8961d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -26,7 +26,6 @@ import warnings -from adodbapi.examples.db_print import kw_args from azure.core import MatchConditions from azure.core.tracing.decorator import distributed_trace from azure.core.paging import ItemPaged diff --git a/sdk/cosmos/azure-cosmos/tests/test_query.py b/sdk/cosmos/azure-cosmos/tests/test_query.py index 28262aa0f7e3..d62c708e9053 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_query.py +++ b/sdk/cosmos/azure-cosmos/tests/test_query.py @@ -109,6 +109,39 @@ def test_populate_index_metrics(self): self.assertDictEqual(expected_index_metrics, index_metrics) self.created_db.delete_container(created_collection.id) + def test_throughput_bucket(self): + throughputBucketNumber = 2 + created_collection = self.created_db.create_container("query_index_test", + PartitionKey(path="/pk"), + throughput_bucket=throughputBucketNumber) + + + doc_id = 'MyId' + str(uuid.uuid4()) + document_definition = {'pk': 'pk', 'id': doc_id} + created_collection.create_item(body=document_definition) + + query = 'SELECT * from c' + query_iterable = created_collection.query_items( + query=query, + partition_key='pk', + populate_index_metrics=True + ) + + iter_list = list(query_iterable) + self.assertEqual(iter_list[0]['id'], doc_id) + + INDEX_HEADER_NAME = http_constants.HttpHeaders.ThroughputBucket + self.assertTrue(INDEX_HEADER_NAME in created_collection.client_connection.last_response_headers) + # index_metrics = created_collection.client_connection.last_response_headers[INDEX_HEADER_NAME] + # self.assertIsNotNone(index_metrics) + # expected_index_metrics = {'UtilizedSingleIndexes': [{'FilterExpression': '', 'IndexSpec': '/pk/?', + # 'FilterPreciseSet': True, 'IndexPreciseSet': True, + # 'IndexImpactScore': 'High'}], + # 'PotentialSingleIndexes': [], 'UtilizedCompositeIndexes': [], + # 'PotentialCompositeIndexes': []} + # self.assertDictEqual(expected_index_metrics, index_metrics) + self.created_db.delete_container(created_collection.id) + # TODO: Need to validate the query request count logic @pytest.mark.skip def test_max_item_count_honored_in_order_by_query(self): From f810c42e5de1aed84ae6ae94b20d926e348c3fdd Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 3 Apr 2025 10:39:01 -0400 Subject: [PATCH 03/15] got rid of extra comment --- 
sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index 6bf8b5de5ecb..08e104d69de7 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -189,7 +189,7 @@ class HttpHeaders: DisableRUPerMinuteUsage = "x-ms-documentdb-disable-ru-per-minute-usage" IsRUPerMinuteUsed = "x-ms-documentdb-is-ru-per-minute-used" OfferIsRUPerMinuteThroughputEnabled = "x-ms-offer-is-ru-per-minute-throughput-enabled" - ThroughputBucket = "x-ms-cosmos-throughput-bucket" #CHECK + ThroughputBucket = "x-ms-cosmos-throughput-bucket" # Partitioned collection headers PartitionKey = "x-ms-documentdb-partitionkey" From 4afa6268d521c99c5405664e1142075b323d48fc Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 3 Apr 2025 22:45:08 -0400 Subject: [PATCH 04/15] sync part of throughput headers --- sdk/cosmos/azure-cosmos/azure/cosmos/_base.py | 1 + .../azure/cosmos/_cosmos_client_connection.py | 4 + .../azure-cosmos/azure/cosmos/container.py | 40 ++- .../azure/cosmos/cosmos_client.py | 16 +- .../azure-cosmos/azure/cosmos/database.py | 19 ++ sdk/cosmos/azure-cosmos/tests/test_headers.py | 229 ++++++++++++++++++ sdk/cosmos/azure-cosmos/tests/test_query.py | 33 --- 7 files changed, 305 insertions(+), 37 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index ee13418c142b..634e7a2b0788 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -63,6 +63,7 @@ 'priority': 'priorityLevel', 'no_response': 'responsePayloadOnWriteDisabled', 'max_item_count': 'maxItemCount', + 'throughput_bucket': 'throughputBucket' } # Cosmos resource ID validation regex breakdown: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index 3934c23bcf99..f76024cb7846 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -160,6 +160,10 @@ def __init__( # pylint: disable=too-many-statements http_constants.HttpHeaders.IsContinuationExpected: False, } + throughput_bucket = kwargs.pop('throughput_bucket', None) + if throughput_bucket is not None: + self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + # Keeps the latest response headers from the server. self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 9570ae51d7ce..009ec6727a07 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -155,6 +155,7 @@ def read( # pylint:disable=docstring-missing-param session_token: Optional[str] = None, priority: Optional[Literal["High", "Low"]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the container properties. @@ -169,6 +170,7 @@ def read( # pylint:disable=docstring-missing-param before high priority requests start getting throttled. Feature must first be enabled at the account level. 
:keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be retrieved. This includes if the container does not exist. :returns: Dict representing the retrieved container. @@ -180,6 +182,8 @@ def read( # pylint:disable=docstring-missing-param kwargs['priority'] = priority if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics: warnings.warn( @@ -207,6 +211,7 @@ def read_item( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Get the item identified by `item`. @@ -225,6 +230,7 @@ def read_item( # pylint:disable=docstring-missing-param :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A CosmosDict representing the item to be retrieved. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -245,6 +251,8 @@ def read_item( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["partitionKey"] = self._set_partition_key(partition_key) @@ -275,6 +283,7 @@ def read_all_items( # pylint:disable=docstring-missing-param priority: Optional[Literal["High", "Low"]] = None, response_hook: Optional[Callable[[Mapping[str, Any], Union[Dict[str, Any], ItemPaged[Dict[str, Any]]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List all the items in the container. @@ -292,6 +301,7 @@ def read_all_items( # pylint:disable=docstring-missing-param :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of items (dicts). 
:rtype: Iterable[Dict[str, Any]] """ @@ -301,6 +311,8 @@ def read_all_items( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -631,6 +643,8 @@ def query_items( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughputBucket"] = throughput_bucket feed_options = build_options(kwargs) if enable_cross_partition_query is not None: feed_options["enableCrossPartitionQuery"] = enable_cross_partition_query @@ -658,8 +672,6 @@ def query_items( # pylint:disable=docstring-missing-param feed_options["correlatedActivityId"] = correlated_activity_id if continuation_token_limit is not None: feed_options["responseContinuationTokenLimitInKb"] = continuation_token_limit - if throughput_bucket is not None: - feed_options["throughputBucket"] = throughput_bucket if response_hook and hasattr(response_hook, "clear"): response_hook.clear() if self.container_link in self.__get_client_container_caches(): @@ -699,6 +711,7 @@ def replace_item( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Replaces the specified item if it exists in the container. @@ -723,6 +736,7 @@ def replace_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The replace operation failed or the item with given id does not exist. :returns: A CosmosDict representing the item after replace went through. The dict will be empty if `no_response` @@ -746,6 +760,8 @@ def replace_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True if populate_query_metrics is not None: @@ -775,6 +791,7 @@ def upsert_item( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Insert or update the specified item. @@ -798,6 +815,7 @@ def upsert_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. 
+ :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item could not be upserted. :returns: A CosmosDict representing the upserted item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -818,6 +836,8 @@ def upsert_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True if populate_query_metrics is not None: @@ -935,6 +955,7 @@ def patch_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """ Patches the specified item with the provided operations if it @@ -962,6 +983,7 @@ def patch_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The patch operations failed or the item with given id does not exist. :returns: A CosmosDict representing the item after the patch operations went through. The dict will be empty @@ -982,6 +1004,8 @@ def patch_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True request_options["partitionKey"] = self._set_partition_key(partition_key) @@ -1007,6 +1031,7 @@ def execute_item_batch( etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosList: """ Executes the transactional batch for the specified partition key. @@ -1025,6 +1050,7 @@ def execute_item_batch( request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A CosmosList representing the items after the batch operations went through. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The batch failed to execute. :raises ~azure.cosmos.exceptions.CosmosBatchOperationError: A transactional batch operation failed in the batch. 
@@ -1042,6 +1068,8 @@ def execute_item_batch( kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) request_options["partitionKey"] = self._set_partition_key(partition_key) request_options["disableAutomaticIdGeneration"] = True @@ -1063,6 +1091,7 @@ def delete_item( # pylint:disable=docstring-missing-param etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the specified item from the container. @@ -1084,6 +1113,7 @@ def delete_item( # pylint:disable=docstring-missing-param request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The item wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The item does not exist in the container. :rtype: None @@ -1098,6 +1128,8 @@ def delete_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) if partition_key is not None: request_options["partitionKey"] = self._set_partition_key(partition_key) @@ -1343,6 +1375,7 @@ def delete_all_items_by_partition_key( session_token: Optional[str] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """The delete by partition key feature is an asynchronous, background operation that allows you to delete all @@ -1360,6 +1393,7 @@ def delete_all_items_by_partition_key( has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. 
+ :keyword int throughput_bucket: The desired throughput bucket for the client :rtype: None """ if pre_trigger_include is not None: @@ -1372,6 +1406,8 @@ def delete_all_items_by_partition_key( kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) # regardless if partition key is valid we set it as invalid partition keys are set to a default empty value request_options["partitionKey"] = self._set_partition_key(partition_key) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 0f0dcd738365..68702232200d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -304,6 +304,8 @@ def create_database( # pylint:disable=docstring-missing-param kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( @@ -311,8 +313,6 @@ def create_database( # pylint:disable=docstring-missing-param UserWarning, ) request_options["populateQueryMetrics"] = populate_query_metrics - if throughput_bucket is not None: - kwargs["throughput_bucket"] = throughput_bucket _set_throughput_options(offer=offer_throughput, request_options=request_options) result = self.client_connection.CreateDatabase(database={"id": id}, options=request_options, **kwargs) @@ -408,6 +408,7 @@ def list_databases( # pylint:disable=docstring-missing-param session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the databases in a Cosmos DB SQL database account. @@ -417,6 +418,7 @@ def list_databases( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of database properties (dicts). :rtype: Iterable[Dict[str, str]] """ @@ -424,6 +426,8 @@ def list_databases( # pylint:disable=docstring-missing-param kwargs["session_token"] = session_token if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -451,6 +455,7 @@ def query_databases( # pylint:disable=docstring-missing-param session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Query the databases in a Cosmos DB SQL database account. @@ -465,6 +470,7 @@ def query_databases( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. 
:keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of database properties (dicts). :rtype: Iterable[Dict[str, str]] """ @@ -472,6 +478,8 @@ def query_databases( # pylint:disable=docstring-missing-param kwargs["session_token"] = session_token if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket feed_options = build_options(kwargs) if enable_cross_partition_query is not None: feed_options["enableCrossPartitionQuery"] = enable_cross_partition_query @@ -512,6 +520,7 @@ def delete_database( # pylint:disable=docstring-missing-param etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the database with the given ID (name). @@ -526,6 +535,7 @@ def delete_database( # pylint:disable=docstring-missing-param :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted. :rtype: None """ @@ -537,6 +547,8 @@ def delete_database( # pylint:disable=docstring-missing-param kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 03fb1ff8961d..9230527369a5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -129,6 +129,7 @@ def read( # pylint:disable=docstring-missing-param *, session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the database properties. @@ -136,6 +137,7 @@ def read( # pylint:disable=docstring-missing-param :keyword str session_token: Token for use with Session consistency. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A dict representing the database properties. :rtype: Dict[Str, Any] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved. 
@@ -145,6 +147,8 @@ def read( # pylint:disable=docstring-missing-param kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( @@ -391,6 +395,7 @@ def delete_container( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete a container. @@ -405,6 +410,7 @@ def delete_container( # pylint:disable=docstring-missing-param has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted. :rtype: None """ @@ -416,6 +422,8 @@ def delete_container( # pylint:disable=docstring-missing-param kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = build_options(kwargs) if populate_query_metrics is not None: warnings.warn( @@ -462,6 +470,7 @@ def list_containers( # pylint:disable=docstring-missing-param session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the containers in the database. @@ -471,6 +480,7 @@ def list_containers( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of container properties (dicts). :rtype: Iterable[Dict[str, Any]] @@ -487,6 +497,8 @@ def list_containers( # pylint:disable=docstring-missing-param kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -515,6 +527,7 @@ def query_containers( # pylint:disable=docstring-missing-param session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the properties for containers in the current database. @@ -527,6 +540,10 @@ def query_containers( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. 
:paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] + + :keyword int throughput_bucket: The desired throughput bucket for the client + + + :returns: An Iterable of container properties (dicts). :rtype: Iterable[Dict[str, Any]] """ if session_token is not None: kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 4c40fa86f83a..15a31fde2e25 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -5,11 +5,23 @@ from unittest.mock import MagicMock import pytest +import uuid import azure.cosmos.cosmos_client as cosmos_client import test_config from azure.cosmos import DatabaseProxy +from azure.cosmos import http_constants +from azure.cosmos.partition_key import PartitionKey +client_throughput_bucket_number = 2 +request_throughput_bucket_number = 3 +def client_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(client_throughput_bucket_number)) + +def request_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(request_throughput_bucket_number)) @pytest.mark.cosmosEmulator class TestHeaders(unittest.TestCase): database: DatabaseProxy = None client: cosmos_client.CosmosClient = None configs = test_config.TestConfig host = configs.host masterKey = configs.masterKey @@ -48,12 +60,14 @@ def test_correlated_activity_id(self): query = 'SELECT * from c ORDER BY c._ts' cosmos_client_connection = self.container.client_connection + original_connection_post = cosmos_client_connection._CosmosClientConnection__Post cosmos_client_connection._CosmosClientConnection__Post = MagicMock( side_effect=self.side_effect_correlated_activity_id) try: list(self.container.query_items(query=query, partition_key="pk-1")) except StopIteration: pass + cosmos_client_connection._CosmosClientConnection__Post = original_connection_post def test_max_integrated_cache_staleness(self): cosmos_client_connection = self.container.client_connection @@ -80,6 +94,220 @@ def test_negative_max_integrated_cache_staleness(self): except Exception as exception: assert isinstance(exception, ValueError) + def test_client_level_throughput_bucket(self): + cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number, + raw_response_hook=client_raw_response_hook) + + def test_request_precedence_throughput_bucket(self): + client = cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = client.create_database( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + client.delete_database(created_db.id) + + def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): + created_db = self.client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.client.delete_database( + created_db.id,
throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_db_throughput_bucket(self): + self.client.list_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_db_throughput_bucket(self): + self.client.query_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_db_read_throughput_bucket(self): + self.database.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_create_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_create_container_if_not_exists_throughput_bucket(self): + created_collection = self.database.create_container_if_not_exists( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_delete_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + self.database.delete_container( + created_collection.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_containers_throughput_bucket(self): + self.database.list_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_containers_throughput_bucket(self): + self.database.query_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_replace_container_throughput_bucket(self): + replaced_collection = self.database.replace_container( + self.container, + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(replaced_collection.id) + + def test_container_read_throughput_bucket(self): + self.container.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_all_items_throughput_bucket(self): + for i in range(10): + self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.read_all_items( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_query_items_throughput_bucket(self): + doc_id = 'MyId' + str(uuid.uuid4()) + document_definition = {'pk': 'pk', 'id': doc_id} + self.container.create_item(body=document_definition) + + query = 'SELECT * from c' + self.container.query_items( + query=query, + partition_key='pk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def 
test_container_replace_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.replace_item( + item=created_document['id'], + body={'id': '2' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_upsert_item_throughput_bucket(self): + self.container.upsert_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_create_item_throughput_bucket(self): + self.container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_patch_item_throughput_bucket(self): + pkValue = "patch_item_pk" + str(uuid.uuid4()) + # Create item to patch + item = { + "id": "patch_item", + "pk": pkValue, + "prop": "prop1", + "address": { + "city": "Redmond" + }, + "company": "Microsoft", + "number": 3} + self.container.create_item(item) + # Define and run patch operations + operations = [ + {"op": "add", "path": "/color", "value": "yellow"}, + {"op": "remove", "path": "/prop"}, + {"op": "replace", "path": "/company", "value": "CosmosDB"}, + {"op": "set", "path": "/address/new_city", "value": "Atlanta"}, + {"op": "incr", "path": "/number", "value": 7}, + {"op": "move", "from": "/color", "path": "/favorite_color"} + ] + self.container.patch_item( + item="patch_item", + partition_key=pkValue, + patch_operations=operations, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_execute_item_batch_throughput_bucket(self): + created_collection = self.database.create_container( + id='test_execute_item ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/company')) + batch = [] + for i in range(100): + batch.append(("create", ({"id": "item" + str(i), "company": "Microsoft"},))) + + created_collection.execute_item_batch( + batch_operations=batch, + partition_key="Microsoft", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + self.database.delete_container(created_collection) + + def test_container_delete_item_throughput_bucket(self): + created_item = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.delete_item( + created_item['id'], + partition_key='mypk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_delete_all_items_by_partition_key_throughput_bucket(self): + client = cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = client.create_database("test_db" + str(uuid.uuid4())) + + created_collection = created_db.create_container( + id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/pk', kind='Hash')) + + # Create two partition keys + partition_key1 = "{}-{}".format("Partition Key 1", str(uuid.uuid4())) + partition_key2 = "{}-{}".format("Partition Key 2", str(uuid.uuid4())) + + # add items for partition key 1 + for i in range(1, 3): + created_collection.upsert_item( + dict(id="item{}".format(i), pk=partition_key1)) + + # add items for partition key 2 + pk2_item = 
created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) + + # delete all items for partition key 1 + created_collection.delete_all_items_by_partition_key(partition_key1) + + items = list(created_collection.read_all_items()) if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_query.py b/sdk/cosmos/azure-cosmos/tests/test_query.py index d62c708e9053..28262aa0f7e3 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_query.py +++ b/sdk/cosmos/azure-cosmos/tests/test_query.py @@ -109,39 +109,6 @@ def test_populate_index_metrics(self): self.assertDictEqual(expected_index_metrics, index_metrics) self.created_db.delete_container(created_collection.id) - def test_throughput_bucket(self): - throughputBucketNumber = 2 - created_collection = self.created_db.create_container("query_index_test", - PartitionKey(path="/pk"), - throughput_bucket=throughputBucketNumber) - - - doc_id = 'MyId' + str(uuid.uuid4()) - document_definition = {'pk': 'pk', 'id': doc_id} - created_collection.create_item(body=document_definition) - - query = 'SELECT * from c' - query_iterable = created_collection.query_items( - query=query, - partition_key='pk', - populate_index_metrics=True - ) - - iter_list = list(query_iterable) - self.assertEqual(iter_list[0]['id'], doc_id) - - INDEX_HEADER_NAME = http_constants.HttpHeaders.ThroughputBucket - self.assertTrue(INDEX_HEADER_NAME in created_collection.client_connection.last_response_headers) - # index_metrics = created_collection.client_connection.last_response_headers[INDEX_HEADER_NAME] - # self.assertIsNotNone(index_metrics) - # expected_index_metrics = {'UtilizedSingleIndexes': [{'FilterExpression': '', 'IndexSpec': '/pk/?', - # 'FilterPreciseSet': True, 'IndexPreciseSet': True, - # 'IndexImpactScore': 'High'}], - # 'PotentialSingleIndexes': [], 'UtilizedCompositeIndexes': [], - # 'PotentialCompositeIndexes': []} - # self.assertDictEqual(expected_index_metrics, index_metrics) - self.created_db.delete_container(created_collection.id) - # TODO: Need to validate the query request count logic @pytest.mark.skip def test_max_item_count_honored_in_order_by_query(self): From 736221693e6808d44094cd95e3cf78d6eea54fd1 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 4 Apr 2025 10:08:36 -0400 Subject: [PATCH 05/15] added async part with new test file --- .../azure-cosmos/tests/test_headers_async.py | 314 ++++++++++++++++++ 1 file changed, 314 insertions(+) create mode 100644 sdk/cosmos/azure-cosmos/tests/test_headers_async.py diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py new file mode 100644 index 000000000000..15a31fde2e25 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -0,0 +1,314 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. 
+ +import unittest +from unittest.mock import MagicMock + +import pytest +import uuid + +import azure.cosmos.cosmos_client as cosmos_client +import test_config +from azure.cosmos import DatabaseProxy +from azure.cosmos import http_constants, DatabaseProxy, _endpoint_discovery_retry_policy +from azure.cosmos.partition_key import PartitionKey + +client_throughput_bucket_number = 2 +request_throughput_bucket_number = 3 +def client_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(client_throughput_bucket_number)) + +def request_raw_response_hook(response): + # if http_constants.HttpHeaders.ThroughputBucket in response.http_request.headers: + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(request_throughput_bucket_number)) + +@pytest.mark.cosmosEmulator +class TestHeaders(unittest.TestCase): + database: DatabaseProxy = None + client: cosmos_client.CosmosClient = None + configs = test_config.TestConfig + host = configs.host + masterKey = configs.masterKey + + dedicated_gateway_max_age_thousand = 1000 + dedicated_gateway_max_age_million = 1000000 + dedicated_gateway_max_age_negative = -1 + + @classmethod + def setUpClass(cls): + cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) + cls.database = cls.client.get_database_client(cls.configs.TEST_DATABASE_ID) + cls.container = cls.database.get_container_client(cls.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + + def side_effect_dedicated_gateway_max_age_thousand(self, *args, **kwargs): + # Extract request headers from args + assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_thousand + raise StopIteration + + def side_effect_dedicated_gateway_max_age_million(self, *args, **kwargs): + # Extract request headers from args + assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_million + raise StopIteration + + def side_effect_correlated_activity_id(self, *args, **kwargs): + # Extract request headers from args + assert args[3]["x-ms-cosmos-correlated-activityid"] # cspell:disable-line + raise StopIteration + + def test_correlated_activity_id(self): + query = 'SELECT * from c ORDER BY c._ts' + + cosmos_client_connection = self.container.client_connection + original_connection_post = cosmos_client_connection._CosmosClientConnection__Post + cosmos_client_connection._CosmosClientConnection__Post = MagicMock( + side_effect=self.side_effect_correlated_activity_id) + try: + list(self.container.query_items(query=query, partition_key="pk-1")) + except StopIteration: + pass + cosmos_client_connection._CosmosClientConnection__Post = original_connection_post + + def test_max_integrated_cache_staleness(self): + cosmos_client_connection = self.container.client_connection + cosmos_client_connection._CosmosClientConnection__Get = MagicMock( + side_effect=self.side_effect_dedicated_gateway_max_age_thousand) + try: + self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_thousand) + except StopIteration: + pass + + cosmos_client_connection._CosmosClientConnection__Get = MagicMock( + side_effect=self.side_effect_dedicated_gateway_max_age_million) + try: + self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_million) + except StopIteration: + pass + + def test_negative_max_integrated_cache_staleness(self): + try: + 
self.container.read_item(item="id-1", partition_key="pk-1", + max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_negative) + except Exception as exception: + assert isinstance(exception, ValueError) + + def test_client_level_throughput_bucket(self): + cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number, + raw_response_hook=client_raw_response_hook) + + def test_request_precedence_throughput_bucket(self): + client = cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = client.create_database( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + client.delete_database(created_db.id) + + def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): + created_db = self.client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.client.delete_database( + created_db.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_db_throughput_bucket(self): + self.client.list_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_db_throughput_bucket(self): + self.client.query_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_db_read_throughput_bucket(self): + self.database.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_create_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_create_container_if_not_exists_throughput_bucket(self): + created_collection = self.database.create_container_if_not_exists( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_delete_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + self.database.delete_container( + created_collection.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_containers_throughput_bucket(self): + self.database.list_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_containers_throughput_bucket(self): + self.database.query_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_replace_container_throughput_bucket(self): + replaced_collection = self.database.replace_container( + self.container, + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(replaced_collection.id) + + def test_container_read_throughput_bucket(self): + self.container.read( + 
throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_all_items_throughput_bucket(self): + for i in range(10): + self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.read_all_items( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_query_items_throughput_bucket(self): + doc_id = 'MyId' + str(uuid.uuid4()) + document_definition = {'pk': 'pk', 'id': doc_id} + self.container.create_item(body=document_definition) + + query = 'SELECT * from c' + self.container.query_items( + query=query, + partition_key='pk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_replace_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.replace_item( + item=created_document['id'], + body={'id': '2' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_upsert_item_throughput_bucket(self): + self.container.upsert_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_create_item_throughput_bucket(self): + self.container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_patch_item_throughput_bucket(self): + pkValue = "patch_item_pk" + str(uuid.uuid4()) + # Create item to patch + item = { + "id": "patch_item", + "pk": pkValue, + "prop": "prop1", + "address": { + "city": "Redmond" + }, + "company": "Microsoft", + "number": 3} + self.container.create_item(item) + # Define and run patch operations + operations = [ + {"op": "add", "path": "/color", "value": "yellow"}, + {"op": "remove", "path": "/prop"}, + {"op": "replace", "path": "/company", "value": "CosmosDB"}, + {"op": "set", "path": "/address/new_city", "value": "Atlanta"}, + {"op": "incr", "path": "/number", "value": 7}, + {"op": "move", "from": "/color", "path": "/favorite_color"} + ] + self.container.patch_item( + item="patch_item", + partition_key=pkValue, + patch_operations=operations, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_execute_item_batch_throughput_bucket(self): + created_collection = self.database.create_container( + id='test_execute_item ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/company')) + batch = [] + for i in range(100): + batch.append(("create", ({"id": "item" + str(i), "company": "Microsoft"},))) + + created_collection.execute_item_batch( + batch_operations=batch, + partition_key="Microsoft", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + self.database.delete_container(created_collection) + + def 
test_container_delete_item_throughput_bucket(self): + created_item = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.delete_item( + created_item['id'], + partition_key='mypk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_delete_all_items_by_partition_key_throughput_bucket(self): + client = cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = client.create_database("test_db" + str(uuid.uuid4())) + + created_collection = created_db.create_container( + id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/pk', kind='Hash')) + + # Create two partition keys + partition_key1 = "{}-{}".format("Partition Key 1", str(uuid.uuid4())) + partition_key2 = "{}-{}".format("Partition Key 2", str(uuid.uuid4())) + + # add items for partition key 1 + for i in range(1, 3): + created_collection.upsert_item( + dict(id="item{}".format(i), pk=partition_key1)) + + # add items for partition key 2 + pk2_item = created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) + + # delete all items for partition key 1 + created_collection.delete_all_items_by_partition_key(partition_key1) + + items = list(created_collection.read_all_items()) + +if __name__ == "__main__": + unittest.main() From 4f8aa50234f6b98a64ee89424af9dae5f5bcc040 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 4 Apr 2025 14:30:57 -0400 Subject: [PATCH 06/15] added async part with tests --- .../azure/cosmos/aio/_container.py | 44 ++++ .../azure/cosmos/aio/_cosmos_client.py | 21 ++ .../aio/_cosmos_client_connection_async.py | 4 + .../azure/cosmos/aio/_database.py | 27 +++ .../azure-cosmos/azure/cosmos/database.py | 8 +- sdk/cosmos/azure-cosmos/tests/test_headers.py | 21 +- .../azure-cosmos/tests/test_headers_async.py | 216 +++++++----------- 7 files changed, 192 insertions(+), 149 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 59bf8ee71ba3..072014e40bb5 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -157,6 +157,7 @@ async def read( session_token: Optional[str] = None, priority: Optional[Literal["High", "Low"]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the container properties. @@ -171,6 +172,7 @@ async def read( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be retrieved. This includes if the container does not exist. :returns: Dict representing the retrieved container. 
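
A minimal usage sketch for the request-level knob added here (the bucket value is illustrative, and `container` is assumed to be an azure.cosmos.aio ContainerProxy inside a coroutine):

    # read the container properties, pinning this single request to bucket 2
    props = await container.read(throughput_bucket=2)
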
@@ -182,6 +184,8 @@ async def read( kwargs['priority'] = priority if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) if populate_partition_key_range_statistics is not None: request_options["populatePartitionKeyRangeStatistics"] = populate_partition_key_range_statistics @@ -207,6 +211,7 @@ async def create_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Create an item in the container. @@ -235,6 +240,7 @@ async def create_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Item with the given ID already exists. :returns: A CosmosDict representing the new item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -255,6 +261,8 @@ async def create_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = not enable_automatic_id_generation if indexing_directive is not None: @@ -278,6 +286,7 @@ async def read_item( initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Get the item identified by `item`. @@ -297,6 +306,7 @@ async def read_item( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. :returns: A CosmosDict representing the retrieved item. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -320,6 +330,8 @@ async def read_item( kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -342,6 +354,7 @@ def read_all_items( Union[Dict[str, Any], AsyncItemPaged[Dict[str, Any]]]], None]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List all the items in the container. @@ -359,6 +372,7 @@ def read_all_items( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. 
Once the user has reached their provisioned throughput, low priority requests are throttled
            before high priority requests start getting throttled. Feature must first be enabled at the account level.
+        :keyword int throughput_bucket: The desired throughput bucket for the client
         :returns: An AsyncItemPaged of items (dicts).
         :rtype: AsyncItemPaged[Dict[str, Any]]
         """
@@ -368,6 +382,8 @@ def read_all_items(
             kwargs['initial_headers'] = initial_headers
         if priority is not None:
             kwargs['priority'] = priority
+        if throughput_bucket is not None:
+            kwargs["throughput_bucket"] = throughput_bucket
         feed_options = _build_options(kwargs)
         if max_item_count is not None:
             feed_options["maxItemCount"] = max_item_count
@@ -404,6 +420,7 @@ def query_items(
         continuation_token_limit: Optional[int] = None,
         response_hook: Optional[Callable[[Mapping[str, Any],
                                           Union[Dict[str, Any], AsyncItemPaged[Dict[str, Any]]]], None]] = None,
+        throughput_bucket: Optional[int] = None,
         **kwargs: Any
     ) -> AsyncItemPaged[Dict[str, Any]]:
         """Return all results matching the given `query`.
         :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request.
            Once the user has reached their provisioned throughput, low priority requests are throttled
            before high priority requests start getting throttled. Feature must first be enabled at the account level.
+        :keyword int throughput_bucket: The desired throughput bucket for the client
         :returns: An AsyncItemPaged of items (dicts).
         :rtype: AsyncItemPaged[Dict[str, Any]]
@@ -470,6 +488,8 @@ def query_items(
             kwargs['initial_headers'] = initial_headers
         if priority is not None:
             kwargs['priority'] = priority
+        if throughput_bucket is not None:
+            kwargs["throughput_bucket"] = throughput_bucket
         feed_options = _build_options(kwargs)
         if max_item_count is not None:
             feed_options["maxItemCount"] = max_item_count
@@ -736,6 +756,7 @@ async def upsert_item(
         match_condition: Optional[MatchConditions] = None,
         priority: Optional[Literal["High", "Low"]] = None,
         no_response: Optional[bool] = None,
+        throughput_bucket: Optional[int] = None,
         **kwargs: Any
     ) -> CosmosDict:
         """Insert or update the specified item.
         :keyword bool no_response: Indicates whether service should be instructed to skip sending response
            payloads. When not specified explicitly here, the default value will be determined from client-level options.
+        :keyword int throughput_bucket: The desired throughput bucket for the client
         :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item could not be upserted.
         :returns: A CosmosDict representing the upserted item. The dict will be empty if `no_response`
            is specified.
@@ -781,6 +803,8 @@ async def upsert_item(
             kwargs['match_condition'] = match_condition
         if no_response is not None:
             kwargs['no_response'] = no_response
+        if throughput_bucket is not None:
+            kwargs["throughput_bucket"] = throughput_bucket
         request_options = _build_options(kwargs)
         request_options["disableAutomaticIdGeneration"] = True
         if self.container_link in self.__get_client_container_caches():
@@ -808,6 +832,7 @@ async def replace_item(
         match_condition: Optional[MatchConditions] = None,
         priority: Optional[Literal["High", "Low"]] = None,
         no_response: Optional[bool] = None,
+        throughput_bucket: Optional[int] = None,
         **kwargs: Any
     ) -> CosmosDict:
         """Replaces the specified item if it exists in the container.
@@ -833,6 +858,7 @@ async def replace_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The replace operation failed or the item with given id does not exist. :returns: A CosmosDict representing the item after replace went through. The dict will be empty if `no_response` @@ -856,6 +882,8 @@ async def replace_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True if self.container_link in self.__get_client_container_caches(): @@ -881,6 +909,7 @@ async def patch_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """ Patches the specified item with the provided operations if it @@ -908,6 +937,7 @@ async def patch_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The patch operations failed or the item with given id does not exist. :returns: A CosmosDict representing the item after the patch operations went through. The dict will be empty if @@ -928,6 +958,8 @@ async def patch_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -954,6 +986,7 @@ async def delete_item( etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the specified item from the container. @@ -977,6 +1010,7 @@ async def delete_item( before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Dict[str, str], None], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The item wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The item does not exist in the container. 
:rtype: None @@ -995,6 +1029,8 @@ async def delete_item( kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) if self.container_link in self.__get_client_container_caches(): @@ -1214,6 +1250,7 @@ async def delete_all_items_by_partition_key( session_token: Optional[str] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """The delete by partition key feature is an asynchronous, background operation that allows you to delete all @@ -1231,6 +1268,7 @@ async def delete_all_items_by_partition_key( has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :rtype: None """ if pre_trigger_include is not None: @@ -1243,6 +1281,8 @@ async def delete_all_items_by_partition_key( kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) # regardless if partition key is valid we set it as invalid partition keys are set to a default empty value request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -1264,6 +1304,7 @@ async def execute_item_batch( etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosList: """ Executes the transactional batch for the specified partition key. @@ -1282,6 +1323,7 @@ async def execute_item_batch( request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A CosmosList representing the items after the batch operations went through. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The batch failed to execute. :raises ~azure.cosmos.exceptions.CosmosBatchOperationError: A transactional batch operation failed in the batch. 
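
A hedged sketch of pinning a transactional batch to a bucket, mirroring the tests later in this series (the partition key and bucket value are illustrative, and `container` is assumed to be an azure.cosmos.aio ContainerProxy inside a coroutine):

    # all operations in the batch share one partition key and one request,
    # so the throughput bucket applies to the batch as a whole
    batch = [("create", ({"id": str(i), "company": "Microsoft"},)) for i in range(3)]
    result = await container.execute_item_batch(
        batch_operations=batch,
        partition_key="Microsoft",
        throughput_bucket=3)
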
@@ -1299,6 +1341,8 @@ async def execute_item_batch( kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) request_options["disableAutomaticIdGeneration"] = True diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index 988ba067eb48..3145121f78c0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -167,6 +167,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword level (to log all requests) or at a single request level. Requests will be logged at INFO level. :keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending response payloads for write operations on items by default unless specified differently per operation. + :keyword int throughput_bucket: The desired throughput bucket for the client .. admonition:: Example: @@ -254,6 +255,7 @@ async def create_database( initial_headers: Optional[Dict[str, str]] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -269,6 +271,7 @@ async def create_database( :keyword match_condition: The match condition to use upon the etag. :paramtype match_condition: ~azure.core.MatchConditions :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists. :returns: A DatabaseProxy instance representing the new database. @@ -292,6 +295,8 @@ async def create_database( kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) _set_throughput_options(offer=offer_throughput, request_options=request_options) @@ -308,6 +313,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin initial_headers: Optional[Dict[str, str]] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -329,6 +335,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin :keyword match_condition: The match condition to use upon the etag. :paramtype match_condition: ~azure.core.MatchConditions :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed. :returns: A DatabaseProxy instance representing the database. 
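
Client-level default versus per-request override, as a minimal sketch (the endpoint, key, and database name are illustrative placeholders, not part of the patch):

    import asyncio
    from azure.cosmos.aio import CosmosClient

    URL = "https://localhost:8081"   # emulator endpoint (illustrative)
    KEY = "<account key>"            # illustrative placeholder

    async def main():
        # every request defaults to bucket 1...
        async with CosmosClient(URL, credential=KEY, throughput_bucket=1) as client:
            # ...except this one, which asks for bucket 2 for this call only
            await client.create_database_if_not_exists(
                "orders_db", throughput_bucket=2)

    asyncio.run(main())
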
@@ -342,6 +349,8 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket try: database_proxy = self.get_database_client(id) await database_proxy.read(**kwargs) @@ -378,6 +387,7 @@ def list_databases( session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List the databases in a Cosmos DB SQL database account. @@ -386,6 +396,7 @@ def list_databases( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :returns: An AsyncItemPaged of database properties (dicts). :rtype: AsyncItemPaged[Dict[str, str]] @@ -394,6 +405,8 @@ def list_databases( kwargs["session_token"] = session_token if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -413,6 +426,7 @@ def query_databases( session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Query the databases in a Cosmos DB SQL database account. @@ -425,6 +439,7 @@ def query_databases( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :returns: An AsyncItemPaged of database properties (dicts). :rtype: AsyncItemPaged[Dict[str, str]] @@ -433,6 +448,8 @@ def query_databases( kwargs["session_token"] = session_token if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -455,6 +472,7 @@ async def delete_database( etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the database with the given ID (name). @@ -469,6 +487,7 @@ async def delete_database( :keyword match_condition: The match condition to use upon the etag. :paramtype match_condition: ~azure.core.MatchConditions :keyword response_hook: A callable invoked with the response metadata. 
+ :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted. :rtype: None @@ -481,6 +500,8 @@ async def delete_database( kwargs["etag"] = etag if match_condition is not None: kwargs["match_condition"] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) database_link = _get_database_link(database) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 49219533a7e6..9033e14213f1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -163,6 +163,10 @@ def __init__( # pylint: disable=too-many-statements http_constants.HttpHeaders.IsContinuationExpected: False, } + throughput_bucket = kwargs.pop('throughput_bucket', None) + if throughput_bucket is not None: + self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + if consistency_level is not None: self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index d5dc685fd7a1..53b6d93b9d4f 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -133,6 +133,7 @@ async def read( *, session_token: Optional[str] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the database properties. @@ -140,6 +141,7 @@ async def read( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved. :returns: A dict representing the database properties @@ -150,6 +152,8 @@ async def read( kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) self._properties = await self.client_connection.ReadDatabase( @@ -178,6 +182,7 @@ async def create_container( vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a new container with the given ID (name). @@ -216,6 +221,7 @@ async def create_container( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. 
+ :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :returns: A `ContainerProxy` instance representing the new container. :rtype: ~azure.cosmos.aio.ContainerProxy @@ -273,6 +279,8 @@ async def create_container( kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) _set_throughput_options(offer=offer_throughput, request_options=request_options) @@ -301,6 +309,7 @@ async def create_container_if_not_exists( vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a container if it does not exist already. @@ -341,6 +350,7 @@ async def create_container_if_not_exists( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :returns: A `ContainerProxy` instance representing the new container. :rtype: ~azure.cosmos.aio.ContainerProxy @@ -371,6 +381,7 @@ async def create_container_if_not_exists( vector_embedding_policy=vector_embedding_policy, change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, + throughput_bucket=throughput_bucket, **kwargs ) @@ -409,6 +420,7 @@ def list_containers( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs ) -> AsyncItemPaged[Dict[str, Any]]: """List the containers in the database. @@ -417,6 +429,7 @@ def list_containers( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None] :returns: An AsyncItemPaged of container properties (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -435,6 +448,8 @@ def list_containers( kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -456,6 +471,7 @@ def query_containers( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List the properties for containers in the current database. 
@@ -468,6 +484,7 @@ def query_containers( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None] :returns: An AsyncItemPaged of container properties (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -476,6 +493,8 @@ def query_containers( kwargs['session_token'] = session_token if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -506,6 +525,7 @@ async def replace_container( analytical_storage_ttl: Optional[int] = None, computed_properties: Optional[List[Dict[str, str]]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Reset the properties of the container. @@ -539,6 +559,7 @@ async def replace_container( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container after replace completed. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced. This includes if the container with given id does not exist. @@ -562,6 +583,8 @@ async def replace_container( kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) container_id = self._get_container_id(container) @@ -597,6 +620,7 @@ async def delete_container( initial_headers: Optional[Dict[str, str]] = None, etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete a container. @@ -612,6 +636,7 @@ async def delete_container( :keyword match_condition: The match condition to use upon the etag. :paramtype match_condition: ~azure.core.MatchConditions :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], None], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted. 
:rtype: None @@ -624,6 +649,8 @@ async def delete_container( kwargs['etag'] = etag if match_condition is not None: kwargs['match_condition'] = match_condition + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) collection_link = self._get_container_link(container) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index 9230527369a5..c10fc7445fdc 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -291,7 +291,7 @@ def create_container( # pylint:disable=docstring-missing-param return ContainerProxy(self.client_connection, self.database_link, result["id"], properties=result) @distributed_trace - def create_container_if_not_exists( # pylint:disable=docstring-missing-param + def create_container_if_not_exists( # pylint:disable=docstring-missing-param self, id: str, partition_key: PartitionKey, @@ -358,6 +358,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param container_proxy.read( populate_query_metrics=populate_query_metrics, session_token=session_token, + throughput_bucket=throughput_bucket, initial_headers=initial_headers, **kwargs ) @@ -539,11 +540,8 @@ def query_containers( # pylint:disable=docstring-missing-param :keyword str session_token: Token for use with Session consistency. :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. - :paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] - :keyword int throughput_bucket: The desired throughput bucket for the client - - + :paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] :returns: An Iterable of container properties (dicts). 
:rtype: Iterable[Dict[str, Any]] """ diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 15a31fde2e25..8682a50ab982 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -21,8 +21,8 @@ def client_raw_response_hook(response): def request_raw_response_hook(response): # if http_constants.HttpHeaders.ThroughputBucket in response.http_request.headers: - assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] - == str(request_throughput_bucket_number)) + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(request_throughput_bucket_number)) @pytest.mark.cosmosEmulator class TestHeaders(unittest.TestCase): @@ -132,7 +132,7 @@ def test_query_db_throughput_bucket(self): def test_db_read_throughput_bucket(self): self.database.read( throughput_bucket=request_throughput_bucket_number, - raw_response_hook=request_raw_response_hook) + raw_response_hook=request_raw_response_hook) def test_create_container_throughput_bucket(self): created_collection = self.database.create_container( @@ -156,8 +156,8 @@ def test_delete_container_throughput_bucket(self): PartitionKey(path="/pk")) self.database.delete_container( created_collection.id, - throughput_bucket=request_throughput_bucket_number, - raw_response_hook=request_raw_response_hook) + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) def test_list_containers_throughput_bucket(self): self.database.list_containers( @@ -285,11 +285,8 @@ def test_container_delete_item_throughput_bucket(self): raw_response_hook=request_raw_response_hook) def test_container_delete_all_items_by_partition_key_throughput_bucket(self): - client = cosmos_client.CosmosClient(self.host, self.masterKey, - throughput_bucket=client_throughput_bucket_number) - created_db = client.create_database("test_db" + str(uuid.uuid4())) - created_collection = created_db.create_container( + created_collection = self.database.create_container( id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/pk', kind='Hash')) @@ -306,9 +303,11 @@ def test_container_delete_all_items_by_partition_key_throughput_bucket(self): pk2_item = created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) # delete all items for partition key 1 - created_collection.delete_all_items_by_partition_key(partition_key1) + created_collection.delete_all_items_by_partition_key( + partition_key1, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) - items = list(created_collection.read_all_items()) if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py index 15a31fde2e25..35481e4d3b03 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -2,206 +2,159 @@ # Copyright (c) Microsoft Corporation. All rights reserved. 
import unittest -from unittest.mock import MagicMock import pytest import uuid -import azure.cosmos.cosmos_client as cosmos_client + import test_config -from azure.cosmos import DatabaseProxy -from azure.cosmos import http_constants, DatabaseProxy, _endpoint_discovery_retry_policy +from azure.cosmos import http_constants +from azure.cosmos.aio import CosmosClient, _retry_utility_async, DatabaseProxy from azure.cosmos.partition_key import PartitionKey client_throughput_bucket_number = 2 request_throughput_bucket_number = 3 -def client_raw_response_hook(response): +async def client_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(client_throughput_bucket_number)) -def request_raw_response_hook(response): - # if http_constants.HttpHeaders.ThroughputBucket in response.http_request.headers: +async def request_raw_response_hook(response): assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(request_throughput_bucket_number)) @pytest.mark.cosmosEmulator -class TestHeaders(unittest.TestCase): - database: DatabaseProxy = None - client: cosmos_client.CosmosClient = None +class TestHeadersAsync(unittest.IsolatedAsyncioTestCase): + client: CosmosClient = None configs = test_config.TestConfig host = configs.host masterKey = configs.masterKey - - dedicated_gateway_max_age_thousand = 1000 - dedicated_gateway_max_age_million = 1000000 - dedicated_gateway_max_age_negative = -1 + database: DatabaseProxy = None @classmethod def setUpClass(cls): - cls.client = cosmos_client.CosmosClient(cls.host, cls.masterKey) - cls.database = cls.client.get_database_client(cls.configs.TEST_DATABASE_ID) - cls.container = cls.database.get_container_client(cls.configs.TEST_MULTI_PARTITION_CONTAINER_ID) - - def side_effect_dedicated_gateway_max_age_thousand(self, *args, **kwargs): - # Extract request headers from args - assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_thousand - raise StopIteration - - def side_effect_dedicated_gateway_max_age_million(self, *args, **kwargs): - # Extract request headers from args - assert args[2]["x-ms-dedicatedgateway-max-age"] == self.dedicated_gateway_max_age_million - raise StopIteration - - def side_effect_correlated_activity_id(self, *args, **kwargs): - # Extract request headers from args - assert args[3]["x-ms-cosmos-correlated-activityid"] # cspell:disable-line - raise StopIteration - - def test_correlated_activity_id(self): - query = 'SELECT * from c ORDER BY c._ts' - - cosmos_client_connection = self.container.client_connection - original_connection_post = cosmos_client_connection._CosmosClientConnection__Post - cosmos_client_connection._CosmosClientConnection__Post = MagicMock( - side_effect=self.side_effect_correlated_activity_id) - try: - list(self.container.query_items(query=query, partition_key="pk-1")) - except StopIteration: - pass - cosmos_client_connection._CosmosClientConnection__Post = original_connection_post - - def test_max_integrated_cache_staleness(self): - cosmos_client_connection = self.container.client_connection - cosmos_client_connection._CosmosClientConnection__Get = MagicMock( - side_effect=self.side_effect_dedicated_gateway_max_age_thousand) - try: - self.container.read_item(item="id-1", partition_key="pk-1", - max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_thousand) - except StopIteration: - pass - - cosmos_client_connection._CosmosClientConnection__Get = MagicMock( - 
side_effect=self.side_effect_dedicated_gateway_max_age_million) - try: - self.container.read_item(item="id-1", partition_key="pk-1", - max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_million) - except StopIteration: - pass - - def test_negative_max_integrated_cache_staleness(self): - try: - self.container.read_item(item="id-1", partition_key="pk-1", - max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_negative) - except Exception as exception: - assert isinstance(exception, ValueError) - - def test_client_level_throughput_bucket(self): - cosmos_client.CosmosClient(self.host, self.masterKey, + if (cls.masterKey == '[YOUR_KEY_HERE]' or + cls.host == '[YOUR_ENDPOINT_HERE]'): + raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + + async def asyncSetUp(self): + self.client = CosmosClient(self.host, self.masterKey) + self.database = self.client.get_database_client(self.configs.TEST_DATABASE_ID) + self.container = self.database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + + async def test_client_level_throughput_bucket(self): + CosmosClient(self.host, self.masterKey, throughput_bucket=client_throughput_bucket_number, raw_response_hook=client_raw_response_hook) - def test_request_precedence_throughput_bucket(self): - client = cosmos_client.CosmosClient(self.host, self.masterKey, + async def test_request_precedence_throughput_bucket(self): + client = CosmosClient(self.host, self.masterKey, throughput_bucket=client_throughput_bucket_number) - created_db = client.create_database( + created_db = await client.create_database( "test_db" + str(uuid.uuid4()), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) client.delete_database(created_db.id) - def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): - created_db = self.client.create_database_if_not_exists( + async def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): + created_db = await self.client.create_database_if_not_exists( "test_db" + str(uuid.uuid4()), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - self.client.delete_database( + await self.client.delete_database( created_db.id, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_list_db_throughput_bucket(self): + async def test_list_db_throughput_bucket(self): self.client.list_databases( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_query_db_throughput_bucket(self): + async def test_query_db_throughput_bucket(self): + query = 'SELECT * from c' self.client.query_databases( + query=query, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_db_read_throughput_bucket(self): - self.database.read( + async def test_db_read_throughput_bucket(self): + await self.database.read( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_create_container_throughput_bucket(self): - created_collection = self.database.create_container( + async def test_create_container_throughput_bucket(self): + created_collection = await self.database.create_container( str(uuid.uuid4()), PartitionKey(path="/pk"), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - 
self.database.delete_container(created_collection.id)
+        await self.database.delete_container(created_collection.id)

-    def test_create_container_if_not_exists_throughput_bucket(self):
-        created_collection = self.database.create_container_if_not_exists(
+    async def test_create_container_if_not_exists_throughput_bucket(self):
+        created_collection = await self.database.create_container_if_not_exists(
             str(uuid.uuid4()),
             PartitionKey(path="/pk"),
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)
-        self.database.delete_container(created_collection.id)
+        await self.database.delete_container(created_collection.id)

-    def test_delete_container_throughput_bucket(self):
-        created_collection = self.database.create_container(
+    async def test_delete_container_throughput_bucket(self):
+        created_collection = await self.database.create_container(
             str(uuid.uuid4()),
             PartitionKey(path="/pk"))
-        self.database.delete_container(
+        await self.database.delete_container(
             created_collection.id,
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_list_containers_throughput_bucket(self):
+    async def test_list_containers_throughput_bucket(self):
         self.database.list_containers(
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_query_containers_throughput_bucket(self):
+    async def test_query_containers_throughput_bucket(self):
+        query = 'SELECT * from c'
         self.database.query_containers(
+            query=query,
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_replace_container_throughput_bucket(self):
-        replaced_collection = self.database.replace_container(
+    async def test_replace_container_throughput_bucket(self):
+        replaced_collection = await self.database.replace_container(
             self.container,
             PartitionKey(path="/pk"),
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)
         self.database.delete_container(replaced_collection.id)

-    def test_container_read_throughput_bucket(self):
-        self.container.read(
+    async def test_container_read_throughput_bucket(self):
+        await self.container.read(
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_container_read_item_throughput_bucket(self):
-        created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})
-        self.container.read_item(
+    async def test_container_read_item_throughput_bucket(self):
+        created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})
+        await self.container.read_item(
             item=created_document['id'],
             partition_key="mypk",
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_container_read_all_items_throughput_bucket(self):
+    async def test_container_read_all_items_throughput_bucket(self):
         for i in range(10):
-            self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'})
+            await self.container.create_item(body={'id': '{}'.format(i) + str(uuid.uuid4()), 'pk': 'mypk'})

         self.container.read_all_items(
             throughput_bucket=request_throughput_bucket_number,
             raw_response_hook=request_raw_response_hook)

-    def test_container_query_items_throughput_bucket(self):
+    async def test_container_query_items_throughput_bucket(self):
         doc_id = 'MyId' + str(uuid.uuid4())
         document_definition = {'pk': 'pk', 'id': doc_id}
-        
self.container.create_item(body=document_definition) + await self.container.create_item(body=document_definition) query = 'SELECT * from c' self.container.query_items( @@ -210,27 +163,27 @@ def test_container_query_items_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_replace_item_throughput_bucket(self): - created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) - self.container.replace_item( + async def test_container_replace_item_throughput_bucket(self): + created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + await self.container.replace_item( item=created_document['id'], body={'id': '2' + str(uuid.uuid4()), 'pk': 'mypk'}, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_upsert_item_throughput_bucket(self): - self.container.upsert_item( + async def test_container_upsert_item_throughput_bucket(self): + await self.container.upsert_item( body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_create_item_throughput_bucket(self): - self.container.create_item( + async def test_container_create_item_throughput_bucket(self): + await self.container.create_item( body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_patch_item_throughput_bucket(self): + async def test_container_patch_item_throughput_bucket(self): pkValue = "patch_item_pk" + str(uuid.uuid4()) # Create item to patch item = { @@ -242,7 +195,7 @@ def test_container_patch_item_throughput_bucket(self): }, "company": "Microsoft", "number": 3} - self.container.create_item(item) + await self.container.create_item(item) # Define and run patch operations operations = [ {"op": "add", "path": "/color", "value": "yellow"}, @@ -252,44 +205,40 @@ def test_container_patch_item_throughput_bucket(self): {"op": "incr", "path": "/number", "value": 7}, {"op": "move", "from": "/color", "path": "/favorite_color"} ] - self.container.patch_item( + await self.container.patch_item( item="patch_item", partition_key=pkValue, patch_operations=operations, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_execute_item_batch_throughput_bucket(self): - created_collection = self.database.create_container( + async def test_container_execute_item_batch_throughput_bucket(self): + created_collection = await self.database.create_container( id='test_execute_item ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/company')) batch = [] for i in range(100): batch.append(("create", ({"id": "item" + str(i), "company": "Microsoft"},))) - created_collection.execute_item_batch( + await created_collection.execute_item_batch( batch_operations=batch, partition_key="Microsoft", throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - self.database.delete_container(created_collection) + await self.database.delete_container(created_collection) - def test_container_delete_item_throughput_bucket(self): - created_item = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + async def test_container_delete_item_throughput_bucket(self): + created_item = await 
self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) - self.container.delete_item( + await self.container.delete_item( created_item['id'], partition_key='mypk', throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - def test_container_delete_all_items_by_partition_key_throughput_bucket(self): - client = cosmos_client.CosmosClient(self.host, self.masterKey, - throughput_bucket=client_throughput_bucket_number) - created_db = client.create_database("test_db" + str(uuid.uuid4())) - - created_collection = created_db.create_container( + async def test_container_delete_all_items_by_partition_key_throughput_bucket(self): + created_collection = await self.database.create_container( id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/pk', kind='Hash')) @@ -299,16 +248,17 @@ def test_container_delete_all_items_by_partition_key_throughput_bucket(self): # add items for partition key 1 for i in range(1, 3): - created_collection.upsert_item( + await created_collection.upsert_item( dict(id="item{}".format(i), pk=partition_key1)) # add items for partition key 2 - pk2_item = created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) + pk2_item = await created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) # delete all items for partition key 1 - created_collection.delete_all_items_by_partition_key(partition_key1) - - items = list(created_collection.read_all_items()) + await created_collection.delete_all_items_by_partition_key( + partition_key1, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) if __name__ == "__main__": unittest.main() From ea44ada8f69e8071b3f4b978e626e5ef41f82a0d Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 4 Apr 2025 14:36:31 -0400 Subject: [PATCH 07/15] got rid of extra space --- sdk/cosmos/azure-cosmos/azure/cosmos/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index c10fc7445fdc..133dd3194893 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -291,7 +291,7 @@ def create_container( # pylint:disable=docstring-missing-param return ContainerProxy(self.client_connection, self.database_link, result["id"], properties=result) @distributed_trace - def create_container_if_not_exists( # pylint:disable=docstring-missing-param + def create_container_if_not_exists( # pylint:disable=docstring-missing-param self, id: str, partition_key: PartitionKey, From 88cbfd46ba7c52026a970cf9dded51de8152a34a Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 4 Apr 2025 14:42:55 -0400 Subject: [PATCH 08/15] added _async label to test_headers_async --- .../azure-cosmos/tests/test_headers_async.py | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py index 35481e4d3b03..5e42dec3db30 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -44,12 +44,12 @@ async def asyncSetUp(self): self.database = self.client.get_database_client(self.configs.TEST_DATABASE_ID) self.container = self.database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) - async def 
test_client_level_throughput_bucket(self): + async def test_client_level_throughput_bucket_async(self): CosmosClient(self.host, self.masterKey, throughput_bucket=client_throughput_bucket_number, raw_response_hook=client_raw_response_hook) - async def test_request_precedence_throughput_bucket(self): + async def test_request_precedence_throughput_bucket_async(self): client = CosmosClient(self.host, self.masterKey, throughput_bucket=client_throughput_bucket_number) created_db = await client.create_database( @@ -58,7 +58,7 @@ async def test_request_precedence_throughput_bucket(self): raw_response_hook=request_raw_response_hook) client.delete_database(created_db.id) - async def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): + async def test_create_db_if_not_exists_and_delete_db_throughput_bucket_async(self): created_db = await self.client.create_database_if_not_exists( "test_db" + str(uuid.uuid4()), throughput_bucket=request_throughput_bucket_number, @@ -68,24 +68,24 @@ async def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_list_db_throughput_bucket(self): + async def test_list_db_throughput_bucket_async(self): self.client.list_databases( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_query_db_throughput_bucket(self): + async def test_query_db_throughput_bucket_async(self): query = 'SELECT * from c' self.client.query_databases( query=query, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_db_read_throughput_bucket(self): + async def test_db_read_throughput_bucket_async(self): await self.database.read( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_create_container_throughput_bucket(self): + async def test_create_container_throughput_bucket_async(self): created_collection = await self.database.create_container( str(uuid.uuid4()), PartitionKey(path="/pk"), @@ -93,7 +93,7 @@ async def test_create_container_throughput_bucket(self): raw_response_hook=request_raw_response_hook) await self.database.delete_container(created_collection.id) - async def test_create_container_if_not_exists_throughput_bucket(self): + async def test_create_container_if_not_exists_throughput_bucket_async(self): created_collection = await self.database.create_container_if_not_exists( str(uuid.uuid4()), PartitionKey(path="/pk"), @@ -101,7 +101,7 @@ async def test_create_container_if_not_exists_throughput_bucket(self): raw_response_hook=request_raw_response_hook) await self.database.delete_container(created_collection.id) - async def test_delete_container_throughput_bucket(self): + async def test_delete_container_throughput_bucket_async(self): created_collection = await self.database.create_container( str(uuid.uuid4()), PartitionKey(path="/pk")) @@ -110,19 +110,19 @@ async def test_delete_container_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_list_containers_throughput_bucket(self): + async def test_list_containers_throughput_bucket_async(self): self.database.list_containers( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_query_containers_throughput_bucket(self): + async def 
test_query_containers_throughput_bucket_async(self): query = 'SELECT * from c' self.database.query_containers( query=query, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_replace_container_throughput_bucket(self): + async def test_replace_container_throughput_bucket_async(self): replaced_collection = await self.database.replace_container( self.container, PartitionKey(path="/pk"), @@ -130,12 +130,12 @@ async def test_replace_container_throughput_bucket(self): raw_response_hook=request_raw_response_hook) self.database.delete_container(replaced_collection.id) - async def test_container_read_throughput_bucket(self): + async def test_container_read_throughput_bucket_async(self): await self.container.read( throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_read_item_throughput_bucket(self): + async def test_container_read_item_throughput_bucket_async(self): created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) await self.container.read_item( item=created_document['id'], @@ -143,7 +143,7 @@ async def test_container_read_item_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_read_all_items_throughput_bucket(self): + async def test_container_read_all_items_throughput_bucket_async(self): for i in range(10): await self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) @@ -151,7 +151,7 @@ async def test_container_read_all_items_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_query_items_throughput_bucket(self): + async def test_container_query_items_throughput_bucket_async(self): doc_id = 'MyId' + str(uuid.uuid4()) document_definition = {'pk': 'pk', 'id': doc_id} await self.container.create_item(body=document_definition) @@ -163,7 +163,7 @@ async def test_container_query_items_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_replace_item_throughput_bucket(self): + async def test_container_replace_item_throughput_bucket_async(self): created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) await self.container.replace_item( item=created_document['id'], @@ -171,19 +171,19 @@ async def test_container_replace_item_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_upsert_item_throughput_bucket(self): + async def test_container_upsert_item_throughput_bucket_async(self): await self.container.upsert_item( body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_create_item_throughput_bucket(self): + async def test_container_create_item_throughput_bucket_async(self): await self.container.create_item( body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_patch_item_throughput_bucket(self): + async def test_container_patch_item_throughput_bucket_async(self): pkValue = "patch_item_pk" + str(uuid.uuid4()) # Create item to 
patch item = { @@ -212,7 +212,7 @@ async def test_container_patch_item_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_execute_item_batch_throughput_bucket(self): + async def test_container_execute_item_batch_throughput_bucket_async(self): created_collection = await self.database.create_container( id='test_execute_item ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/company')) @@ -228,7 +228,7 @@ async def test_container_execute_item_batch_throughput_bucket(self): await self.database.delete_container(created_collection) - async def test_container_delete_item_throughput_bucket(self): + async def test_container_delete_item_throughput_bucket_async(self): created_item = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) await self.container.delete_item( @@ -237,7 +237,7 @@ async def test_container_delete_item_throughput_bucket(self): throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - async def test_container_delete_all_items_by_partition_key_throughput_bucket(self): + async def test_container_delete_all_items_by_partition_key_throughput_bucket_async(self): created_collection = await self.database.create_container( id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/pk', kind='Hash')) From 73b773ff8b2bfe7d7729cb7687106447c5f407e3 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 4 Apr 2025 14:51:31 -0400 Subject: [PATCH 09/15] removed extra line --- sdk/cosmos/azure-cosmos/tests/test_headers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 8682a50ab982..2303170cc625 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -285,7 +285,6 @@ def test_container_delete_item_throughput_bucket(self): raw_response_hook=request_raw_response_hook) def test_container_delete_all_items_by_partition_key_throughput_bucket(self): - created_collection = self.database.create_container( id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), partition_key=PartitionKey(path='/pk', kind='Hash')) From 419e374bda7d25b460e4e73652c4f79d1d3730a6 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Mon, 7 Apr 2025 21:05:27 -0400 Subject: [PATCH 10/15] removed unnecessary syntax --- .../azure-cosmos/azure/cosmos/_cosmos_client_connection.py | 2 +- .../azure/cosmos/aio/_cosmos_client_connection_async.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index f76024cb7846..2e0cb7de0011 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -161,7 +161,7 @@ def __init__( # pylint: disable=too-many-statements } throughput_bucket = kwargs.pop('throughput_bucket', None) - if throughput_bucket is not None: + if throughput_bucket: self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket # Keeps the latest response headers from the server. 
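For reference, a minimal sketch of the defaulting behavior this hunk touches, assuming a plain dict of default headers; `build_request_headers` and the literal header name are illustrative stand-ins, not the SDK's internal API:

    # A client-level throughput_bucket becomes a default header at construction
    # time; a per-request value overrides it when the request headers are built.
    THROUGHPUT_BUCKET = "x-ms-cosmos-throughput-bucket"  # assumed header name

    def build_request_headers(default_headers, request_options):
        headers = dict(default_headers)  # start from the client-level defaults
        if request_options.get("throughputBucket"):
            headers[THROUGHPUT_BUCKET] = request_options["throughputBucket"]
        return headers

    defaults = {THROUGHPUT_BUCKET: 1}  # client constructed with throughput_bucket=1
    assert build_request_headers(defaults, {})[THROUGHPUT_BUCKET] == 1
    assert build_request_headers(defaults, {"throughputBucket": 2})[THROUGHPUT_BUCKET] == 2

Under the truthiness check above, a bucket value of 0 is treated the same as "not set", which is consistent with valid bucket IDs starting at 1.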
diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index 9033e14213f1..6f439d076fc0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -164,7 +164,7 @@ def __init__( # pylint: disable=too-many-statements } throughput_bucket = kwargs.pop('throughput_bucket', None) - if throughput_bucket is not None: + if throughput_bucket: self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket if consistency_level is not None: From d661dff0096c806a11d401931d9e7e49540d4cb5 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Fri, 11 Apr 2025 01:03:23 -0400 Subject: [PATCH 11/15] made requested changes on pr, mostly for async tests --- .../azure-cosmos/azure/cosmos/aio/_container.py | 2 +- .../azure/cosmos/aio/_cosmos_client.py | 2 +- .../azure-cosmos/azure/cosmos/aio/_database.py | 2 +- .../azure-cosmos/azure/cosmos/cosmos_client.py | 2 +- sdk/cosmos/azure-cosmos/azure/cosmos/database.py | 4 ++-- sdk/cosmos/azure-cosmos/tests/test_headers.py | 3 ++- .../azure-cosmos/tests/test_headers_async.py | 14 +++++++------- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 5f3b851e6f50..b7993d414dcd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -487,7 +487,7 @@ def query_items( if priority is not None: kwargs['priority'] = priority if throughput_bucket is not None: - kwargs["throughputBucket"] = throughput_bucket + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index d87fb57e0cf8..64f50020d373 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -355,7 +355,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin "The 'match_condition' flag does not apply to this method and is always ignored even if passed." 
" It will now be removed in the future.", DeprecationWarning) - if throughput_bucket is not None: + if throughput_bucket is not None: kwargs["throughput_bucket"] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index 62cafec1d6ef..9cc576a1c8eb 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -673,7 +673,7 @@ async def delete_container( DeprecationWarning) if initial_headers is not None: kwargs['initial_headers'] = initial_headers - if throughput_bucket is not None: + if throughput_bucket is not None: kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 6f771ab18ebc..10277941bfa0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -372,7 +372,7 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", UserWarning) - if throughput_bucket is not None: + if throughput_bucket is not None: kwargs["throughput_bucket"] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index a162b3162255..d23d86159991 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -144,7 +144,7 @@ def read( # pylint:disable=docstring-missing-param "The 'session_token' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) - if throughput_bucket is not None: + if throughput_bucket is not None: kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( @@ -668,7 +668,7 @@ def replace_container( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." 
" It will now be removed in the future.", DeprecationWarning) - if throughput_bucket is not None: + if throughput_bucket is not None: kwargs['throughput_bucket'] = throughput_bucket if populate_query_metrics is not None: warnings.warn( diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 2303170cc625..43667b96ee64 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -68,7 +68,8 @@ def test_correlated_activity_id(self): list(self.container.query_items(query=query, partition_key="pk-1")) except StopIteration: pass - cosmos_client_connection._CosmosClientConnection__Post = original_connection_post + finally: + cosmos_client_connection._CosmosClientConnection__Post = original_connection_post def test_max_integrated_cache_staleness(self): cosmos_client_connection = self.container.client_connection diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py index 5e42dec3db30..7f2e21563977 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -56,7 +56,7 @@ async def test_request_precedence_throughput_bucket_async(self): "test_db" + str(uuid.uuid4()), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - client.delete_database(created_db.id) + await client.delete_database(created_db.id) async def test_create_db_if_not_exists_and_delete_db_throughput_bucket_async(self): created_db = await self.client.create_database_if_not_exists( @@ -128,7 +128,7 @@ async def test_replace_container_throughput_bucket_async(self): PartitionKey(path="/pk"), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) - self.database.delete_container(replaced_collection.id) + await self.database.delete_container(replaced_collection.id) async def test_container_read_throughput_bucket_async(self): await self.container.read( @@ -147,9 +147,9 @@ async def test_container_read_all_items_throughput_bucket_async(self): for i in range(10): await self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) - self.container.read_all_items( - throughput_bucket=request_throughput_bucket_number, - raw_response_hook=request_raw_response_hook) + async for item in self.container.read_all_items(throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook): + pass async def test_container_query_items_throughput_bucket_async(self): doc_id = 'MyId' + str(uuid.uuid4()) @@ -157,11 +157,11 @@ async def test_container_query_items_throughput_bucket_async(self): await self.container.create_item(body=document_definition) query = 'SELECT * from c' - self.container.query_items( + query_results = [item async for item in self.container.query_items( query=query, partition_key='pk', throughput_bucket=request_throughput_bucket_number, - raw_response_hook=request_raw_response_hook) + raw_response_hook=request_raw_response_hook)] async def test_container_replace_item_throughput_bucket_async(self): created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) From f8e3b3ae1db98a04c3f85e9b435063f3ea2a01c8 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Sun, 13 Apr 2025 21:14:10 -0400 Subject: [PATCH 12/15] added another finally block to test_headers --- sdk/cosmos/azure-cosmos/tests/test_headers.py | 4 +++- 1 file 
changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 43667b96ee64..079dd61d1f86 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -20,7 +20,6 @@ def client_raw_response_hook(response): == str(client_throughput_bucket_number)) def request_raw_response_hook(response): - # if http_constants.HttpHeaders.ThroughputBucket in response.http_request.headers: assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] == str(request_throughput_bucket_number)) @@ -73,6 +72,7 @@ def test_correlated_activity_id(self): def test_max_integrated_cache_staleness(self): cosmos_client_connection = self.container.client_connection + original_connection_get = cosmos_client_connection._CosmosClientConnection__Get cosmos_client_connection._CosmosClientConnection__Get = MagicMock( side_effect=self.side_effect_dedicated_gateway_max_age_thousand) try: @@ -88,6 +88,8 @@ def test_max_integrated_cache_staleness(self): max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_million) except StopIteration: pass + finally: + cosmos_client_connection._CosmosClientConnection__Get = original_connection_get def test_negative_max_integrated_cache_staleness(self): try: From 2864b4b61d361f636426bcacc1b803ffd8967df2 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Mon, 14 Apr 2025 18:59:16 -0400 Subject: [PATCH 13/15] edited replace container test --- sdk/cosmos/azure-cosmos/tests/test_headers.py | 5 ++++- sdk/cosmos/azure-cosmos/tests/test_headers_async.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 079dd61d1f86..fa03441277a4 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -173,8 +173,11 @@ def test_query_containers_throughput_bucket(self): raw_response_hook=request_raw_response_hook) def test_replace_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) replaced_collection = self.database.replace_container( - self.container, + created_collection.id, PartitionKey(path="/pk"), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py index 7f2e21563977..9fc941d69807 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -123,8 +123,11 @@ async def test_query_containers_throughput_bucket_async(self): raw_response_hook=request_raw_response_hook) async def test_replace_container_throughput_bucket_async(self): + created_collection = await self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) replaced_collection = await self.database.replace_container( - self.container, + created_collection.id, PartitionKey(path="/pk"), throughput_bucket=request_throughput_bucket_number, raw_response_hook=request_raw_response_hook) From e05cb51e315a27f4967b69459b50589a6f85df92 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Tue, 15 Apr 2025 23:50:08 -0400 Subject: [PATCH 14/15] added sample files and section to the readme --- sdk/cosmos/azure-cosmos/README.md | 16 ++ .../azure-cosmos/samples/throughput_bucket.py | 142 
+++++++++++
 .../samples/throughput_bucket_async.py        | 143 ++++++++++++++++++
 3 files changed, 301 insertions(+)
 create mode 100644 sdk/cosmos/azure-cosmos/samples/throughput_bucket.py
 create mode 100644 sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py

diff --git a/sdk/cosmos/azure-cosmos/README.md b/sdk/cosmos/azure-cosmos/README.md
index 90a6b8bf9119..f9807e3c79be 100644
--- a/sdk/cosmos/azure-cosmos/README.md
+++ b/sdk/cosmos/azure-cosmos/README.md
@@ -874,6 +874,20 @@ may have additional latencies associated with searching in the service.
 You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async]
 as well for additional guidance.
 
+### Public Preview - Throughput Buckets
+When multiple workloads share the same Azure Cosmos DB container, resource contention can lead to throttling, increased latency, and potential business impact.
+To address this, Cosmos DB allows you to allocate throughput buckets, which help manage resource consumption for workloads sharing a container by limiting the maximum throughput each bucket can consume.
+However, throughput isn't reserved for any bucket; it remains shared across all workloads.
+
+Up to five throughput buckets can be configured per container, with IDs ranging from 1 to 5. Each bucket has a maximum throughput percentage, capping the fraction of the container’s total throughput that it can consume.
+Requests assigned to a bucket can consume throughput only up to this limit. If the bucket exceeds its configured limit, subsequent requests are throttled.
+This ensures that no single workload consumes excessive throughput and impacts others.
+
+Throughput bucket configurations can be changed at most once every 10 minutes; more frequent changes are throttled with an HTTP 429 status code and substatus code 3213.
+Requests with an invalid bucket ID (less than 1 or greater than 5) result in an error, as only bucket IDs 1 to 5 are valid.
+
+You can find our sync samples [here][cosmos_throughput_bucket_sample] and our async samples [here][cosmos_throughput_bucket_sample_async] for additional guidance.
+
 ## Troubleshooting
 
 ### General
@@ -1060,6 +1074,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos
 [BM25]: https://learn.microsoft.com/azure/search/index-similarity-and-scoring
 [cosmos_fts]: https://aka.ms/cosmosfulltextsearch
 [cosmos_index_policy_change]: https://learn.microsoft.com/azure/cosmos-db/index-policy#modifying-the-indexing-policy
+[cosmos_throughput_bucket_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py
+[cosmos_throughput_bucket_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py
 
 ## Contributing
 
diff --git a/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py b/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py
new file mode 100644
index 000000000000..cc381cff7d38
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py
@@ -0,0 +1,142 @@
+# -------------------------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# Licensed under the MIT License. See LICENSE.txt in the project root for
+# license information.
+# -------------------------------------------------------------------------
+import azure.cosmos.cosmos_client as cosmos_client
+import azure.cosmos.exceptions as exceptions
+from azure.cosmos.partition_key import PartitionKey
+
+import uuid
+import config
+
+# ----------------------------------------------------------------------------------------------------------
+# Prerequisites -
+#
+# 1. An Azure Cosmos account -
+#    https://azure.microsoft.com/documentation/articles/documentdb-create-account/
+#
+# 2. Microsoft Azure Cosmos PyPi package -
+#    https://pypi.python.org/pypi/azure-cosmos/
+# ----------------------------------------------------------------------------------------------------------
+# Sample - demonstrates the basic throughput bucket operations at the client, database, container and item levels.
+#
+# 1. Setting throughput buckets at the Client Level
+#
+# 2. Setting Throughput Buckets at the Database Level
+#    2.1 - Create and Delete Database
+#
+# 3. Setting Throughput Buckets at the Container Level
+#    3.1 - Create container
+#    3.2 - Query Container
+#
+# 4. Setting Throughput Buckets at the Item Level
+#    4.1 - Read Item
+#    4.2 - Create Item
+# ----------------------------------------------------------------------------------------------------------
+# Note -
+#
+# Running this sample will create (and delete) multiple Databases and Containers on your account.
+# Each time a Container is created the account will be billed for 1 hour of usage based on
+# the provisioned throughput (RU/s) of that account.
+# ----------------------------------------------------------------------------------------------------------
+
+HOST = config.settings['host']
+MASTER_KEY = config.settings['master_key']
+DATABASE_ID = config.settings['database_id']
+CONTAINER_ID = config.settings['container_id']
+
+# Applies throughput bucket 1 to all requests from a client application
+def create_client_with_throughput_bucket(host=HOST, master_key=MASTER_KEY):
+    cosmos_client.CosmosClient(host, master_key,
+        throughput_bucket=1)
+
+# Applies throughput bucket 2 to create and delete a database
+def create_and_delete_database_with_throughput_bucket(client):
+    created_db = client.create_database_if_not_exists(
+        "test_db" + str(uuid.uuid4()),
+        throughput_bucket=2)
+
+    client.delete_database(
+        created_db.id,
+        throughput_bucket=2)
+
+# Applies throughput bucket 3 to create a container
+def create_container_with_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+
+    created_container = database.create_container(
+        str(uuid.uuid4()),
+        PartitionKey(path="/pk"),
+        throughput_bucket=3)
+
+    database.delete_container(created_container.id)
+
+# Applies throughput bucket 3 for requests to query containers
+def query_containers_with_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+
+    query = 'SELECT * from c'
+    # The query is lazy; iterate the results so the request (and its
+    # throughput bucket header) is actually sent to the service.
+    list(database.query_containers(
+        query=query,
+        throughput_bucket=3))
+
+# Applies throughput bucket 4 for read item requests
+def container_read_item_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+    created_container = database.create_container(
+        str(uuid.uuid4()),
+        PartitionKey(path="/pk"))
+    created_document = created_container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})
+
+    created_container.read_item(
+        item=created_document['id'],
+        partition_key="mypk",
+        throughput_bucket=4)
+
+    database.delete_container(created_container.id)
+
+# Applies throughput bucket 5 for create item requests
+def
container_create_item_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + created_container = database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + + created_container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=5) + + database.delete_container(created_container.id) + +def run_sample(): + client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY} ) + client.create_database_if_not_exists(id=DATABASE_ID) + try: + # creates client + create_client_with_throughput_bucket() + + # creates and deletes a database + create_and_delete_database_with_throughput_bucket(client) + + # create a container + create_container_with_throughput_bucket(client) + + # queries containers in a database + query_containers_with_throughput_bucket(client) + + # reads an item from a container + container_read_item_throughput_bucket(client) + + # writes an item to a container + container_create_item_throughput_bucket(client) + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + +if __name__ == '__main__': + run_sample() diff --git a/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py b/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py new file mode 100644 index 000000000000..ad40f52c2722 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py @@ -0,0 +1,143 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +from azure.cosmos.aio import CosmosClient +import azure.cosmos.exceptions as exceptions +from azure.cosmos.partition_key import PartitionKey + +import uuid +import asyncio +import config + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites - +# +# 1. An Azure Cosmos account - +# https://azure.microsoft.com/documentation/articles/documentdb-create-account/ +# +# 2. Microsoft Azure Cosmos PyPi package - +# https://pypi.python.org/pypi/azure-cosmos/ +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates the basic throughput bucket operations at the client, database, container and item levels. +# +# 1. Setting throughput buckets at the Client Level +# +# 2. Setting Throughput Buckets at the Database Level +# 2.1 - Create and Delete Database +# +# 3. Setting Throughput Buckets at the Container Level +# 3.1 - Create container +# 3.2 - Query Container +# +# 4. Setting Throughput Buckets at the Item Level +# 4.1 - Read Item +# 4.2 - Create Item +# ---------------------------------------------------------------------------------------------------------- +# Note - +# +# Running this sample will create (and delete) multiple Databases and Containers on your account. +# Each time a Container is created the account will be billed for 1 hour of usage based on +# the provisioned throughput (RU/s) of that account. 
+# ----------------------------------------------------------------------------------------------------------
+
+HOST = config.settings['host']
+MASTER_KEY = config.settings['master_key']
+DATABASE_ID = config.settings['database_id']
+CONTAINER_ID = config.settings['container_id']
+
+# Applies throughput bucket 1 to all requests from a client application
+async def create_client_with_throughput_bucket(host=HOST, master_key=MASTER_KEY):
+    async with CosmosClient(host, master_key, throughput_bucket=1) as client:
+        pass
+
+# Applies throughput bucket 2 to create and delete a database
+async def create_and_delete_database_with_throughput_bucket(client):
+    created_db = await client.create_database_if_not_exists(
+        "test_db" + str(uuid.uuid4()),
+        throughput_bucket=2)
+
+    await client.delete_database(
+        created_db.id,
+        throughput_bucket=2)
+
+# Applies throughput bucket 3 to create a container
+async def create_container_with_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+
+    created_container = await database.create_container(
+        str(uuid.uuid4()),
+        PartitionKey(path="/pk"),
+        throughput_bucket=3)
+
+    await database.delete_container(created_container.id)
+
+# Applies throughput bucket 3 for requests to query containers
+async def query_containers_with_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+
+    query = 'SELECT * from c'
+    # The async pager is lazy; iterate it so the request (and its
+    # throughput bucket header) is actually sent to the service.
+    async for _ in database.query_containers(
+            query=query,
+            throughput_bucket=3):
+        pass
+
+# Applies throughput bucket 4 for read item requests
+async def container_read_item_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+    created_container = await database.create_container(
+        str(uuid.uuid4()),
+        PartitionKey(path="/pk"))
+    created_document = await created_container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'})
+
+    await created_container.read_item(
+        item=created_document['id'],
+        partition_key="mypk",
+        throughput_bucket=4)
+
+    await database.delete_container(created_container.id)
+
+# Applies throughput bucket 5 for create item requests
+async def container_create_item_throughput_bucket(client):
+    database = client.get_database_client(DATABASE_ID)
+
+    created_container = await database.create_container(
+        str(uuid.uuid4()),
+        PartitionKey(path="/pk"))
+
+    await created_container.create_item(
+        body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'},
+        throughput_bucket=5)
+
+    await database.delete_container(created_container.id)
+
+async def run_sample():
+    async with CosmosClient(HOST, {'masterKey': MASTER_KEY}) as client:
+        await client.create_database_if_not_exists(id=DATABASE_ID)
+        try:
+            # creates client
+            await create_client_with_throughput_bucket()
+
+            # creates and deletes a database
+            await create_and_delete_database_with_throughput_bucket(client)
+
+            # create a container
+            await create_container_with_throughput_bucket(client)
+
+            # queries containers in a database
+            await query_containers_with_throughput_bucket(client)
+
+            # reads an item from a container
+            await container_read_item_throughput_bucket(client)
+
+            # writes an item to a container
+            await container_create_item_throughput_bucket(client)
+
+        except exceptions.CosmosHttpResponseError as e:
+            print('\nrun_sample has caught an error.
{0}'.format(e.message)) + + finally: + print("\nrun_sample done") + +if __name__ == '__main__': + asyncio.run(run_sample()) From 2dfd408217be4ebc53d8de9c3c71f70c6d433731 Mon Sep 17 00:00:00 2001 From: Andrew Mathew Date: Thu, 17 Apr 2025 15:27:44 -0400 Subject: [PATCH 15/15] got rid of trailing whitespace --- sdk/cosmos/azure-cosmos/azure/cosmos/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index d23d86159991..c8642cfa82e2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -405,7 +405,7 @@ def delete_container( # pylint:disable=docstring-missing-param populate_query_metrics: Optional[bool] = None, *, initial_headers: Optional[Dict[str, str]] = None, - throughput_bucket: Optional[int] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete a container.
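Taken together, a short usage sketch of the public surface these patches add: a client-level default bucket with a per-request override. The endpoint and key below are placeholders, and the database and container names are invented for illustration:

    import uuid
    from azure.cosmos import CosmosClient, PartitionKey

    # Every request from this client is attributed to throughput bucket 1 by default.
    client = CosmosClient("<account-endpoint>", "<account-key>", throughput_bucket=1)

    database = client.create_database_if_not_exists("sample_db")
    container = database.create_container_if_not_exists(
        "sample_container", PartitionKey(path="/pk"))

    # A request-level value takes precedence over the client-level default,
    # so this single write is attributed to bucket 2 instead.
    container.create_item(
        body={"id": str(uuid.uuid4()), "pk": "mypk"},
        throughput_bucket=2)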