diff --git a/sdk/cosmos/azure-cosmos/README.md b/sdk/cosmos/azure-cosmos/README.md index 90a6b8bf9119..f9807e3c79be 100644 --- a/sdk/cosmos/azure-cosmos/README.md +++ b/sdk/cosmos/azure-cosmos/README.md @@ -874,6 +874,20 @@ may have additional latencies associated with searching in the service. You can find our sync samples [here][cosmos_index_sample] and our async samples [here][cosmos_index_sample_async] as well for additional guidance. +### Public Preview - Throughput Buckets +When multiple workloads share the same Azure Cosmos DB container, resource contention can lead to throttling, increased latency, and potential business impact. +To address this, Cosmos DB allows you to allocate throughput buckets, which help manage resource consumption for workloads sharing a Cosmos DB container by limiting the maximum throughput a bucket can consume. +However, throughput isn't reserved for any bucket, it remains shared across all workloads. + +Up to five (5) throughput buckets can be configured per container, with an ID ranging from 1-5. Each bucket has a maximum throughput percentage, capping the fraction of the container’s total throughput that it can consume. +Requests assigned to a bucket can consume throughput only up to this limit. If the bucket exceeds its configured limit, subsequent requests are throttled. +This ensures that no single workload consumes excessive throughput and impacts others. + +Throughput bucket configurations can be changed once every 10 minutes, otherwise the request is throttled with an HTTP 429 status code and substatus code 3213. +Also, requests with an invalid bucket ID (less than 1 or greater than 5) results in an error, as only bucket IDs 1 to 5 are valid. + +You can find our sync samples [here][cosmos_throughput_bucket_sample] and our async samples [here][cosmos_throughput_bucket_sample_async] as well for additional guidance. + ## Troubleshooting ### General @@ -1060,6 +1074,8 @@ For more extensive documentation on the Cosmos DB service, see the [Azure Cosmos [BM25]: https://learn.microsoft.com/azure/search/index-similarity-and-scoring [cosmos_fts]: https://aka.ms/cosmosfulltextsearch [cosmos_index_policy_change]: https://learn.microsoft.com/azure/cosmos-db/index-policy#modifying-the-indexing-policy +[cosmos_throughput_bucket_sample]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py +[cosmos_throughput_bucket_sample_async]: https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py ## Contributing diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index addde69e515e..b9fe0285b574 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -63,6 +63,7 @@ 'priority': 'priorityLevel', 'no_response': 'responsePayloadOnWriteDisabled', 'max_item_count': 'maxItemCount', + 'throughput_bucket': 'throughputBucket' } # Cosmos resource ID validation regex breakdown: @@ -317,6 +318,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches if options.get("correlatedActivityId"): headers[http_constants.HttpHeaders.CorrelatedActivityId] = options["correlatedActivityId"] + if options.get("throughputBucket"): + headers[http_constants.HttpHeaders.ThroughputBucket] = options["throughputBucket"] + if resource_type == "docs" and verb != "get": if "responsePayloadOnWriteDisabled" in options: responsePayloadOnWriteDisabled = options["responsePayloadOnWriteDisabled"] diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py index e43cd4c9c287..939c8299885e 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py @@ -160,6 +160,10 @@ def __init__( # pylint: disable=too-many-statements http_constants.HttpHeaders.IsContinuationExpected: False, } + throughput_bucket = kwargs.pop('throughput_bucket', None) + if throughput_bucket: + self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + # Keeps the latest response headers from the server. self.last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict() diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index 79a5b766eb3e..b7993d414dcd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -156,6 +156,7 @@ async def read( populate_quota_info: Optional[bool] = None, priority: Optional[Literal["High", "Low"]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the container properties. @@ -169,6 +170,7 @@ async def read( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be retrieved. This includes if the container does not exist. :returns: Dict representing the retrieved container. @@ -185,6 +187,8 @@ async def read( kwargs['priority'] = priority if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) if populate_partition_key_range_statistics is not None: request_options["populatePartitionKeyRangeStatistics"] = populate_partition_key_range_statistics @@ -208,6 +212,7 @@ async def create_item( initial_headers: Optional[Dict[str, str]] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Create an item in the container. @@ -232,6 +237,7 @@ async def create_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Item with the given ID already exists. :returns: A CosmosDict representing the new item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -261,6 +267,8 @@ async def create_item( kwargs['priority'] = priority if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = not enable_automatic_id_generation if indexing_directive is not None: @@ -284,6 +292,7 @@ async def read_item( initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Get the item identified by `item`. @@ -303,6 +312,7 @@ async def read_item( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. :returns: A CosmosDict representing the retrieved item. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -326,6 +336,8 @@ async def read_item( kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -346,6 +358,7 @@ def read_all_items( initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List all the items in the container. @@ -361,6 +374,7 @@ def read_all_items( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] """ @@ -370,6 +384,8 @@ def read_all_items( kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -404,6 +420,7 @@ def query_items( priority: Optional[Literal["High", "Low"]] = None, continuation_token_limit: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Return all results matching the given `query`. @@ -441,6 +458,7 @@ def query_items( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An AsyncItemPaged of items (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -468,6 +486,8 @@ def query_items( kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -730,6 +750,7 @@ async def upsert_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Insert or update the specified item. @@ -754,6 +775,7 @@ async def upsert_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item could not be upserted. :returns: A CosmosDict representing the upserted item. The dict will be empty if `no_response` is specified. @@ -775,6 +797,8 @@ async def upsert_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True if self.container_link in self.__get_client_container_caches(): @@ -802,6 +826,7 @@ async def replace_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """Replaces the specified item if it exists in the container. @@ -827,6 +852,7 @@ async def replace_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The replace operation failed or the item with given id does not exist. :returns: A CosmosDict representing the item after replace went through. The dict will be empty if `no_response` @@ -850,6 +876,8 @@ async def replace_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True if self.container_link in self.__get_client_container_caches(): @@ -875,6 +903,7 @@ async def patch_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosDict: """ Patches the specified item with the provided operations if it @@ -903,6 +932,7 @@ async def patch_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from client-level options. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The patch operations failed or the item with given id does not exist. :returns: A CosmosDict representing the item after the patch operations went through. The dict will be empty if @@ -923,6 +953,8 @@ async def patch_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["disableAutomaticIdGeneration"] = True request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -949,6 +981,7 @@ async def delete_item( etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the specified item from the container. @@ -972,6 +1005,7 @@ async def delete_item( before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], None], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The item wasn't deleted successfully. :raises ~azure.cosmos.exceptions.CosmosResourceNotFoundError: The item does not exist in the container. :rtype: None @@ -990,6 +1024,8 @@ async def delete_item( kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) if self.container_link in self.__get_client_container_caches(): @@ -1207,6 +1243,7 @@ async def delete_all_items_by_partition_key( pre_trigger_include: Optional[str] = None, post_trigger_include: Optional[str] = None, session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """The delete by partition key feature is an asynchronous, background operation that allows you to delete all @@ -1221,6 +1258,7 @@ async def delete_all_items_by_partition_key( :keyword str post_trigger_include: trigger id to be used as post operation trigger. :keyword str session_token: Token for use with Session consistency. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :rtype: None """ etag = kwargs.get('etag') @@ -1242,6 +1280,8 @@ async def delete_all_items_by_partition_key( kwargs['post_trigger_include'] = post_trigger_include if session_token is not None: kwargs['session_token'] = session_token + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) # regardless if partition key is valid we set it as invalid partition keys are set to a default empty value request_options["partitionKey"] = await self._set_partition_key(partition_key) @@ -1261,6 +1301,7 @@ async def execute_item_batch( post_trigger_include: Optional[str] = None, session_token: Optional[str] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> CosmosList: """ Executes the transactional batch for the specified partition key. @@ -1276,6 +1317,7 @@ async def execute_item_batch( request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A CosmosList representing the items after the batch operations went through. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The batch failed to execute. :raises ~azure.cosmos.exceptions.CosmosBatchOperationError: A transactional batch operation failed in the batch. @@ -1302,6 +1344,8 @@ async def execute_item_batch( kwargs['session_token'] = session_token if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) request_options["partitionKey"] = await self._set_partition_key(partition_key) request_options["disableAutomaticIdGeneration"] = True diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py index 683f16288cd3..64f50020d373 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client.py @@ -167,6 +167,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword level (to log all requests) or at a single request level. Requests will be logged at INFO level. :keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending response payloads for write operations on items by default unless specified differently per operation. + :keyword int throughput_bucket: The desired throughput bucket for the client .. admonition:: Example: @@ -251,6 +252,7 @@ async def create_database( *, offer_throughput: Optional[Union[int, ThroughputProperties]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -261,6 +263,7 @@ async def create_database( :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties] :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosResourceExistsError: Database with the given ID already exists. :returns: A DatabaseProxy instance representing the new database. @@ -296,6 +299,8 @@ async def create_database( DeprecationWarning) if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) _set_throughput_options(offer=offer_throughput, request_options=request_options) @@ -309,6 +314,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin *, offer_throughput: Optional[Union[int, ThroughputProperties]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -325,6 +331,7 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin :paramtype offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties] :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Dict[str, str], Dict[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed. :returns: A DatabaseProxy instance representing the database. @@ -348,6 +355,8 @@ async def create_database_if_not_exists( # pylint: disable=redefined-builtin "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers try: @@ -385,6 +394,7 @@ def list_databases( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List the databases in a Cosmos DB SQL database account. @@ -393,6 +403,7 @@ def list_databases( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :returns: An AsyncItemPaged of database properties (dicts). :rtype: AsyncItemPaged[Dict[str, str]] @@ -405,6 +416,8 @@ def list_databases( DeprecationWarning) if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -423,6 +436,7 @@ def query_databases( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """Query the databases in a Cosmos DB SQL database account. @@ -435,6 +449,7 @@ def query_databases( :keyword str session_token: Token for use with Session consistency. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :returns: An AsyncItemPaged of database properties (dicts). :rtype: AsyncItemPaged[Dict[str, str]] @@ -447,6 +462,8 @@ def query_databases( DeprecationWarning) if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -466,6 +483,7 @@ async def delete_database( *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the database with the given ID (name). @@ -475,6 +493,7 @@ async def delete_database( :type database: Union[str, ~azure.cosmos.DatabaseProxy, Dict[str, Any]] :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any]], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted. :rtype: None @@ -497,7 +516,8 @@ async def delete_database( "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) - + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers request_options = _build_options(kwargs) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py index cd2e3ca9c9f0..56052384c0c9 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py @@ -163,6 +163,10 @@ def __init__( # pylint: disable=too-many-statements http_constants.HttpHeaders.IsContinuationExpected: False, } + throughput_bucket = kwargs.pop('throughput_bucket', None) + if throughput_bucket: + self.default_headers[http_constants.HttpHeaders.ThroughputBucket] = throughput_bucket + if consistency_level is not None: self.default_headers[http_constants.HttpHeaders.ConsistencyLevel] = consistency_level diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py index 32a0b1b0ee13..9cc576a1c8eb 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_database.py @@ -131,6 +131,7 @@ async def read( self, *, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the database properties. @@ -138,6 +139,7 @@ async def read( :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved. :returns: A dict representing the database properties :rtype: Dict[str, Any] @@ -152,6 +154,8 @@ async def read( database_link = _get_database_link(self) if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket request_options = _build_options(kwargs) self._properties = await self.client_connection.ReadDatabase( @@ -177,6 +181,7 @@ async def create_container( vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a new container with the given ID (name). @@ -210,6 +215,7 @@ async def create_container( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :returns: A `ContainerProxy` instance representing the new container. :rtype: ~azure.cosmos.aio.ContainerProxy @@ -277,9 +283,10 @@ async def create_container( definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy - if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) _set_throughput_options(offer=offer_throughput, request_options=request_options) @@ -305,6 +312,7 @@ async def create_container_if_not_exists( vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a container if it does not exist already. @@ -340,6 +348,7 @@ async def create_container_if_not_exists( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :returns: A `ContainerProxy` instance representing the new container. :rtype: ~azure.cosmos.aio.ContainerProxy @@ -384,6 +393,7 @@ async def create_container_if_not_exists( vector_embedding_policy=vector_embedding_policy, change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, + throughput_bucket=throughput_bucket, **kwargs ) @@ -421,6 +431,7 @@ def list_containers( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs ) -> AsyncItemPaged[Dict[str, Any]]: """List the containers in the database. @@ -428,6 +439,7 @@ def list_containers( :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None] :returns: An AsyncItemPaged of container properties (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -450,6 +462,8 @@ def list_containers( DeprecationWarning) if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -470,6 +484,7 @@ def query_containers( max_item_count: Optional[int] = None, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> AsyncItemPaged[Dict[str, Any]]: """List the properties for containers in the current database. @@ -481,6 +496,7 @@ def query_containers( :keyword int max_item_count: Max number of items to be returned in the enumeration operation. :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any], AsyncItemPaged[Dict[str, Any]]], None] :returns: An AsyncItemPaged of container properties (dicts). :rtype: AsyncItemPaged[Dict[str, Any]] @@ -493,6 +509,8 @@ def query_containers( DeprecationWarning) if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket feed_options = _build_options(kwargs) if max_item_count is not None: feed_options["maxItemCount"] = max_item_count @@ -520,6 +538,7 @@ async def replace_container( analytical_storage_ttl: Optional[int] = None, computed_properties: Optional[List[Dict[str, str]]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Reset the properties of the container. @@ -548,6 +567,7 @@ async def replace_container( :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container after replace completed. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced. This includes if the container with given id does not exist. @@ -583,6 +603,8 @@ async def replace_container( DeprecationWarning) if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) container_id = self._get_container_id(container) @@ -615,6 +637,7 @@ async def delete_container( container: Union[str, ContainerProxy, Mapping[str, Any]], *, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete a container. @@ -626,6 +649,7 @@ async def delete_container( :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], None], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted. :rtype: None """ @@ -649,6 +673,8 @@ async def delete_container( DeprecationWarning) if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = _build_options(kwargs) collection_link = self._get_container_link(container) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index ef676cf3e1d2..10b01a9b5254 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -154,6 +154,7 @@ def read( # pylint:disable=docstring-missing-param *, priority: Optional[Literal["High", "Low"]] = None, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> Dict[str, Any]: @@ -169,6 +170,7 @@ def read( # pylint:disable=docstring-missing-param :keyword dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], Dict[str, Any]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be retrieved. This includes if the container does not exist. :returns: Dict representing the retrieved container. @@ -184,6 +186,8 @@ def read( # pylint:disable=docstring-missing-param kwargs['priority'] = priority if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -213,6 +217,7 @@ def read_item( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> CosmosDict: @@ -233,6 +238,7 @@ def read_item( # pylint:disable=docstring-missing-param :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A CosmosDict representing the item to be retrieved. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item couldn't be retrieved. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -253,6 +259,8 @@ def read_item( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -282,6 +290,7 @@ def read_all_items( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: @@ -298,6 +307,7 @@ def read_all_items( # pylint:disable=docstring-missing-param :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of items (dicts). :rtype: Iterable[Dict[str, Any]] """ @@ -307,6 +317,8 @@ def read_all_items( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook feed_options = build_options(kwargs) @@ -561,6 +573,7 @@ def query_items( # pylint:disable=docstring-missing-param max_integrated_cache_staleness_in_ms: Optional[int] = None, priority: Optional[Literal["High", "Low"]] = None, continuation_token_limit: Optional[int] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: @@ -601,6 +614,7 @@ def query_items( # pylint:disable=docstring-missing-param :keyword bool populate_index_metrics: Used to obtain the index metrics to understand how the query engine used existing indexes and how it could use potential new indexes. Please note that this options will incur overhead, so it should be enabled only when debugging slow queries. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of items (dicts). :rtype: ItemPaged[Dict[str, Any]] @@ -626,6 +640,8 @@ def query_items( # pylint:disable=docstring-missing-param kwargs['initial_headers'] = initial_headers if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughputBucket"] = throughput_bucket feed_options = build_options(kwargs) if enable_cross_partition_query is not None: feed_options["enableCrossPartitionQuery"] = enable_cross_partition_query @@ -690,6 +706,7 @@ def replace_item( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> CosmosDict: @@ -716,6 +733,7 @@ def replace_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The replace operation failed or the item with given id does not exist. :returns: A CosmosDict representing the item after replace went through. The dict will be empty if `no_response` @@ -739,6 +757,8 @@ def replace_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -770,6 +790,7 @@ def upsert_item( # pylint:disable=docstring-missing-param match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> CosmosDict: @@ -795,6 +816,7 @@ def upsert_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The given item could not be upserted. :returns: A CosmosDict representing the upserted item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -815,6 +837,8 @@ def upsert_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -850,6 +874,7 @@ def create_item( # pylint:disable=docstring-missing-param initial_headers: Optional[Dict[str, str]] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> CosmosDict: @@ -876,6 +901,7 @@ def create_item( # pylint:disable=docstring-missing-param :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Item with the given ID already exists. :returns: A CosmosDict representing the new item. The dict will be empty if `no_response` is specified. :rtype: ~azure.cosmos.CosmosDict[str, Any] @@ -905,6 +931,8 @@ def create_item( # pylint:disable=docstring-missing-param kwargs['priority'] = priority if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -938,6 +966,7 @@ def patch_item( match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, no_response: Optional[bool] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], Dict[str, Any]], None]] = None, **kwargs: Any ) -> CosmosDict: @@ -967,6 +996,7 @@ def patch_item( :keyword bool no_response: Indicates whether service should be instructed to skip sending response payloads. When not specified explicitly here, the default value will be determined from kwargs or when also not specified there from client-level kwargs. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The patch operations failed or the item with given id does not exist. :returns: A CosmosDict representing the item after the patch operations went through. The dict will be empty @@ -987,6 +1017,8 @@ def patch_item( kwargs['match_condition'] = match_condition if no_response is not None: kwargs['no_response'] = no_response + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -1012,6 +1044,7 @@ def execute_item_batch( post_trigger_include: Optional[str] = None, session_token: Optional[str] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], List[Dict[str, Any]]], None]] = None, **kwargs: Any ) -> CosmosList: @@ -1027,6 +1060,7 @@ def execute_item_batch( :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: [Callable[[Mapping[str, str], List[Dict[str, Any]]], None] :returns: A CosmosList representing the items after the batch operations went through. @@ -1055,6 +1089,8 @@ def execute_item_batch( kwargs['session_token'] = session_token if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -1078,6 +1114,7 @@ def delete_item( # pylint:disable=docstring-missing-param etag: Optional[str] = None, match_condition: Optional[MatchConditions] = None, priority: Optional[Literal["High", "Low"]] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], None], None]] = None, **kwargs: Any ) -> None: @@ -1099,6 +1136,7 @@ def delete_item( # pylint:disable=docstring-missing-param :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each request. Once the user has reached their provisioned throughput, low priority requests are throttled before high priority requests start getting throttled. Feature must first be enabled at the account level. + :keyword int throughput_bucket: The desired throughput bucket for the client :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], None], None] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The item wasn't deleted successfully. @@ -1115,6 +1153,8 @@ def delete_item( # pylint:disable=docstring-missing-param kwargs['match_condition'] = match_condition if priority is not None: kwargs['priority'] = priority + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) @@ -1360,6 +1400,7 @@ def delete_all_items_by_partition_key( pre_trigger_include: Optional[str] = None, post_trigger_include: Optional[str] = None, session_token: Optional[str] = None, + throughput_bucket: Optional[int] = None, response_hook: Optional[Callable[[Mapping[str, str], None], None]] = None, **kwargs: Any ) -> None: @@ -1374,6 +1415,7 @@ def delete_all_items_by_partition_key( :keyword str pre_trigger_include: trigger id to be used as pre operation trigger. :keyword str post_trigger_include: trigger id to be used as post operation trigger. :keyword str session_token: Token for use with Session consistency. + :keyword int throughput_bucket: The desired throughput bucket for the client :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str], None], None] = None, :rtype: None @@ -1397,6 +1439,8 @@ def delete_all_items_by_partition_key( kwargs['post_trigger_include'] = post_trigger_include if session_token is not None: kwargs['session_token'] = session_token + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if response_hook is not None: kwargs['response_hook'] = response_hook request_options = build_options(kwargs) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py index 10543f97c47b..10277941bfa0 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py @@ -187,6 +187,7 @@ class CosmosClient: # pylint: disable=client-accepts-api-version-keyword level (to log all requests) or at a single request level. Requests will be logged at INFO level. :keyword bool no_response_on_write: Indicates whether service should be instructed to skip sending response payloads on rite operations for items. + :keyword int throughput_bucket: The desired throughput bucket for the client .. admonition:: Example: @@ -261,6 +262,7 @@ def create_database( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -271,6 +273,7 @@ def create_database( # pylint:disable=docstring-missing-param :type offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties] :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, str]], None] :returns: A DatabaseProxy instance representing the new database. :rtype: ~azure.cosmos.DatabaseProxy @@ -303,6 +306,8 @@ def create_database( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", UserWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "The 'populate_query_metrics' flag does not apply to this method" @@ -327,6 +332,7 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param offer_throughput: Optional[Union[int, ThroughputProperties]] = None, *, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> DatabaseProxy: """ @@ -343,6 +349,7 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param :type offer_throughput: Union[int, ~azure.cosmos.ThroughputProperties] :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A DatabaseProxy instance representing the database. :rtype: ~azure.cosmos.DatabaseProxy :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The database read or creation failed. @@ -365,7 +372,8 @@ def create_database_if_not_exists( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", UserWarning) - + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers try: @@ -408,6 +416,7 @@ def list_databases( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the databases in a Cosmos DB SQL database account. @@ -416,6 +425,7 @@ def list_databases( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of database properties (dicts). :rtype: Iterable[Dict[str, str]] """ @@ -430,7 +440,8 @@ def list_databases( # pylint:disable=docstring-missing-param "the populate_query_metrics flag does not apply to this method and will be removed in the future", UserWarning, ) - + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers feed_options = build_options(kwargs) @@ -452,6 +463,7 @@ def query_databases( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """Query the databases in a Cosmos DB SQL database account. @@ -465,6 +477,7 @@ def query_databases( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of database properties (dicts). :rtype: Iterable[Dict[str, str]] """ @@ -482,6 +495,8 @@ def query_databases( # pylint:disable=docstring-missing-param if initial_headers is not None: kwargs["initial_headers"] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket feed_options = build_options(kwargs) if enable_cross_partition_query is not None: feed_options["enableCrossPartitionQuery"] = enable_cross_partition_query @@ -508,6 +523,7 @@ def delete_database( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete the database with the given ID (name). @@ -518,6 +534,7 @@ def delete_database( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, str]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the database couldn't be deleted. :rtype: None """ @@ -544,7 +561,8 @@ def delete_database( # pylint:disable=docstring-missing-param "the populate_query_metrics flag does not apply to this method and will be removed in the future", UserWarning, ) - + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket if initial_headers is not None: kwargs["initial_headers"] = initial_headers request_options = build_options(kwargs) diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py index f367436b2c47..c8642cfa82e2 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/database.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/database.py @@ -126,12 +126,14 @@ def read( # pylint:disable=docstring-missing-param populate_query_metrics: Optional[bool] = None, *, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> Dict[str, Any]: """Read the database properties. :keyword dict[str,str] initial_headers: Initial headers to be sent as part of the request. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A dict representing the database properties. :rtype: Dict[Str, Any] :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the given database couldn't be retrieved. @@ -142,6 +144,8 @@ def read( # pylint:disable=docstring-missing-param "The 'session_token' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "the populate_query_metrics flag does not apply to this method and will be removed in the future", @@ -175,6 +179,7 @@ def create_container( # pylint:disable=docstring-missing-param vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a new container with the given ID (name). @@ -205,6 +210,7 @@ def create_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the new container. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container creation failed. :rtype: ~azure.cosmos.ContainerProxy @@ -276,9 +282,10 @@ def create_container( # pylint:disable=docstring-missing-param definition["changeFeedPolicy"] = change_feed_policy if full_text_policy is not None: definition["fullTextPolicy"] = full_text_policy - if initial_headers is not None: kwargs['initial_headers'] = initial_headers + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket request_options = build_options(kwargs) _set_throughput_options(offer=offer_throughput, request_options=request_options) result = self.client_connection.CreateContainer( @@ -305,6 +312,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param vector_embedding_policy: Optional[Dict[str, Any]] = None, change_feed_policy: Optional[Dict[str, Any]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Create a container if it does not exist already. @@ -337,6 +345,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: The container read or creation failed. :rtype: ~azure.cosmos.ContainerProxy @@ -364,6 +373,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param container_proxy = self.get_container_client(id) container_proxy.read( populate_query_metrics=populate_query_metrics, + throughput_bucket=throughput_bucket, initial_headers=initial_headers, **kwargs ) @@ -384,6 +394,7 @@ def create_container_if_not_exists( # pylint:disable=docstring-missing-param vector_embedding_policy=vector_embedding_policy, change_feed_policy=change_feed_policy, full_text_policy=full_text_policy, + throughput_bucket=throughput_bucket, **kwargs ) @@ -394,6 +405,7 @@ def delete_container( # pylint:disable=docstring-missing-param populate_query_metrics: Optional[bool] = None, *, initial_headers: Optional[Dict[str, str]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> None: """Delete a container. @@ -408,6 +420,7 @@ def delete_container( # pylint:disable=docstring-missing-param has changed, and act according to the condition specified by the `match_condition` parameter. :keyword ~azure.core.MatchConditions match_condition: The match condition to use upon the etag. :keyword Callable response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: If the container couldn't be deleted. :rtype: None """ @@ -429,6 +442,8 @@ def delete_container( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "the populate_query_metrics flag does not apply to this method and will be removed in the future", @@ -475,6 +490,7 @@ def list_containers( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the containers in the database. @@ -484,6 +500,7 @@ def list_containers( # pylint:disable=docstring-missing-param :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. :paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: An Iterable of container properties (dicts). :rtype: Iterable[Dict[str, Any]] @@ -502,6 +519,8 @@ def list_containers( # pylint:disable=docstring-missing-param "The 'session_token' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "the populate_query_metrics flag does not apply to this method and will be removed in the future", @@ -530,6 +549,7 @@ def query_containers( # pylint:disable=docstring-missing-param *, initial_headers: Optional[Dict[str, str]] = None, response_hook: Optional[Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ItemPaged[Dict[str, Any]]: """List the properties for containers in the current database. @@ -540,6 +560,7 @@ def query_containers( # pylint:disable=docstring-missing-param :param int max_item_count: Max number of items to be returned in the enumeration operation. :keyword Dict[str, str] initial_headers: Initial headers to be sent as part of the request. :keyword response_hook: A callable invoked with the response metadata. + :keyword int throughput_bucket: The desired throughput bucket for the client :paramtype response_hook: Callable[[Mapping[str, Any], ItemPaged[Dict[str, Any]]], None] :returns: An Iterable of container properties (dicts). :rtype: Iterable[Dict[str, Any]] @@ -550,6 +571,8 @@ def query_containers( # pylint:disable=docstring-missing-param "The 'session_token' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs["throughput_bucket"] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "the populate_query_metrics flag does not apply to this method and will be removed in the future", @@ -585,6 +608,7 @@ def replace_container( # pylint:disable=docstring-missing-param analytical_storage_ttl: Optional[int] = None, computed_properties: Optional[List[Dict[str, str]]] = None, full_text_policy: Optional[Dict[str, Any]] = None, + throughput_bucket: Optional[int] = None, **kwargs: Any ) -> ContainerProxy: """Reset the properties of the container. @@ -611,6 +635,7 @@ def replace_container( # pylint:disable=docstring-missing-param :keyword Dict[str, Any] full_text_policy: **provisional** The full text policy for the container. Used to denote the default language to be used for all full text indexes, or to individually assign a language to each full text index path. + :keyword int throughput_bucket: The desired throughput bucket for the client :returns: A `ContainerProxy` instance representing the container after replace completed. :raises ~azure.cosmos.exceptions.CosmosHttpResponseError: Raised if the container couldn't be replaced. This includes if the container with given id does not exist. @@ -643,6 +668,8 @@ def replace_container( # pylint:disable=docstring-missing-param "The 'match_condition' flag does not apply to this method and is always ignored even if passed." " It will now be removed in the future.", DeprecationWarning) + if throughput_bucket is not None: + kwargs['throughput_bucket'] = throughput_bucket if populate_query_metrics is not None: warnings.warn( "the populate_query_metrics flag does not apply to this method and will be removed in the future", diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py index 31a95d2600d6..08e104d69de7 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py @@ -189,6 +189,7 @@ class HttpHeaders: DisableRUPerMinuteUsage = "x-ms-documentdb-disable-ru-per-minute-usage" IsRUPerMinuteUsed = "x-ms-documentdb-is-ru-per-minute-used" OfferIsRUPerMinuteThroughputEnabled = "x-ms-offer-is-ru-per-minute-throughput-enabled" + ThroughputBucket = "x-ms-cosmos-throughput-bucket" # Partitioned collection headers PartitionKey = "x-ms-documentdb-partitionkey" diff --git a/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py b/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py new file mode 100644 index 000000000000..cc381cff7d38 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/throughput_bucket.py @@ -0,0 +1,142 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +import azure.cosmos.cosmos_client as cosmos_client +import azure.cosmos.exceptions as exceptions +from azure.cosmos.partition_key import PartitionKey + +import uuid +import config + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites - +# +# 1. An Azure Cosmos account - +# https://azure.microsoft.com/documentation/articles/documentdb-create-account/ +# +# 2. Microsoft Azure Cosmos PyPi package - +# https://pypi.python.org/pypi/azure-cosmos/ +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates the basic throughput bucket operations at the client, database, container and item levels. +# +# 1. Setting throughput buckets at the Client Level +# +# 2. Setting Throughput Buckets at the Database Level +# 2.1 - Create and Delete Database +# +# 3. Setting Throughput Buckets at the Container Level +# 3.1 - Create container +# 3.2 - Query Container +# +# 4. Setting Throughput Buckets at the Item Level +# 4.1 - Read Item +# 4.2 - Create Item +# ---------------------------------------------------------------------------------------------------------- +# Note - +# +# Running this sample will create (and delete) multiple Databases and Containers on your account. +# Each time a Container is created the account will be billed for 1 hour of usage based on +# the provisioned throughput (RU/s) of that account. +# ---------------------------------------------------------------------------------------------------------- + +HOST = config.settings['host'] +MASTER_KEY = config.settings['master_key'] +DATABASE_ID = config.settings['database_id'] +CONTAINER_ID = config.settings['container_id'] + +# Applies throughput bucket 1 to all requests from a client application +def create_client_with_throughput_bucket(host=HOST, master_key=MASTER_KEY): + cosmos_client.CosmosClient(host, master_key, + throughput_bucket=1) + +# Applies throughput bucket 2 to create and delete a database +def create_and_delete_database_with_throughput_bucket(client): + created_db = client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=2) + + client.delete_database( + created_db.id, + throughput_bucket=2) + +# Applies throughput bucket 3 to create a container +def create_container_with_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + created_container = database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=3) + + database.delete_container(created_container.id) + +# Applies throughput bucket 3 for requests to query containers +def query_containers_with_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + query = 'SELECT * from c' + database.query_containers( + query=query, + throughput_bucket=3) + +# Applies throughput bucket 4 for read item requests +def container_read_item_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + created_container = database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + created_document = created_container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + created_container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=4) + + database.delete_container(created_container.id) + +# Applies throughput bucket 5 for create item requests +def container_create_item_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + created_container = database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + + created_container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=5) + + database.delete_container(created_container.id) + +def run_sample(): + client = cosmos_client.CosmosClient(HOST, {'masterKey': MASTER_KEY} ) + client.create_database_if_not_exists(id=DATABASE_ID) + try: + # creates client + create_client_with_throughput_bucket() + + # creates and deletes a database + create_and_delete_database_with_throughput_bucket(client) + + # create a container + create_container_with_throughput_bucket(client) + + # queries containers in a database + query_containers_with_throughput_bucket(client) + + # reads an item from a container + container_read_item_throughput_bucket(client) + + # writes an item to a container + container_create_item_throughput_bucket(client) + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + +if __name__ == '__main__': + run_sample() diff --git a/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py b/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py new file mode 100644 index 000000000000..ad40f52c2722 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/samples/throughput_bucket_async.py @@ -0,0 +1,143 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See LICENSE.txt in the project root for +# license information. +# ------------------------------------------------------------------------- +from azure.cosmos.aio import CosmosClient +import azure.cosmos.exceptions as exceptions +from azure.cosmos.partition_key import PartitionKey + +import uuid +import asyncio +import config + +# ---------------------------------------------------------------------------------------------------------- +# Prerequisites - +# +# 1. An Azure Cosmos account - +# https://azure.microsoft.com/documentation/articles/documentdb-create-account/ +# +# 2. Microsoft Azure Cosmos PyPi package - +# https://pypi.python.org/pypi/azure-cosmos/ +# ---------------------------------------------------------------------------------------------------------- +# Sample - demonstrates the basic throughput bucket operations at the client, database, container and item levels. +# +# 1. Setting throughput buckets at the Client Level +# +# 2. Setting Throughput Buckets at the Database Level +# 2.1 - Create and Delete Database +# +# 3. Setting Throughput Buckets at the Container Level +# 3.1 - Create container +# 3.2 - Query Container +# +# 4. Setting Throughput Buckets at the Item Level +# 4.1 - Read Item +# 4.2 - Create Item +# ---------------------------------------------------------------------------------------------------------- +# Note - +# +# Running this sample will create (and delete) multiple Databases and Containers on your account. +# Each time a Container is created the account will be billed for 1 hour of usage based on +# the provisioned throughput (RU/s) of that account. +# ---------------------------------------------------------------------------------------------------------- + +HOST = config.settings['host'] +MASTER_KEY = config.settings['master_key'] +DATABASE_ID = config.settings['database_id'] +CONTAINER_ID = config.settings['container_id'] + +# Applies throughput bucket 1 to all requests from a client application +async def create_client_with_throughput_bucket(host=HOST, master_key=MASTER_KEY): + async with CosmosClient(host, master_key, throughput_bucket=1) as client: + pass + +# Applies throughput bucket 2 to create and delete a database +async def create_and_delete_database_with_throughput_bucket(client): + created_db = await client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=2) + + await client.delete_database( + created_db.id, + throughput_bucket=2) + +# Applies throughput bucket 3 to create a container +async def create_container_with_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + created_container = await database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=3) + + await database.delete_container(created_container.id) + +# Applies throughput bucket 3 for requests to query containers +async def query_containers_with_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + query = 'SELECT * from c' + database.query_containers( + query=query, + throughput_bucket=3) + +# Applies throughput bucket 4 for read item requests +async def container_read_item_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + created_container = await database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + created_document = await created_container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + await created_container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=4) + + await database.delete_container(created_container.id) + +# Applies throughput bucket 5 for create item requests +async def container_create_item_throughput_bucket(client): + database = client.get_database_client(DATABASE_ID) + + created_container = await database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + + await created_container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=5) + + await database.delete_container(created_container.id) + +async def run_sample(): + async with CosmosClient(HOST, {'masterKey': MASTER_KEY} ) as client: + await client.create_database_if_not_exists(id=DATABASE_ID) + try: + # creates client + await create_client_with_throughput_bucket() + + # creates and deletes a database + await create_and_delete_database_with_throughput_bucket(client) + + # create a container + await create_container_with_throughput_bucket(client) + + # queries containers in a database + await query_containers_with_throughput_bucket(client) + + # reads an item from a container + await container_read_item_throughput_bucket(client) + + # writes an item to a container + await container_create_item_throughput_bucket(client) + + except exceptions.CosmosHttpResponseError as e: + print('\nrun_sample has caught an error. {0}'.format(e.message)) + + finally: + print("\nrun_sample done") + +if __name__ == '__main__': + asyncio.run(run_sample()) diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers.py b/sdk/cosmos/azure-cosmos/tests/test_headers.py index 4c40fa86f83a..fa03441277a4 100644 --- a/sdk/cosmos/azure-cosmos/tests/test_headers.py +++ b/sdk/cosmos/azure-cosmos/tests/test_headers.py @@ -5,11 +5,23 @@ from unittest.mock import MagicMock import pytest +import uuid import azure.cosmos.cosmos_client as cosmos_client import test_config from azure.cosmos import DatabaseProxy +from azure.cosmos import http_constants, DatabaseProxy, _endpoint_discovery_retry_policy +from azure.cosmos.partition_key import PartitionKey +client_throughput_bucket_number = 2 +request_throughput_bucket_number = 3 +def client_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(client_throughput_bucket_number)) + +def request_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(request_throughput_bucket_number)) @pytest.mark.cosmosEmulator class TestHeaders(unittest.TestCase): @@ -48,15 +60,19 @@ def test_correlated_activity_id(self): query = 'SELECT * from c ORDER BY c._ts' cosmos_client_connection = self.container.client_connection + original_connection_post = cosmos_client_connection._CosmosClientConnection__Post cosmos_client_connection._CosmosClientConnection__Post = MagicMock( side_effect=self.side_effect_correlated_activity_id) try: list(self.container.query_items(query=query, partition_key="pk-1")) except StopIteration: pass + finally: + cosmos_client_connection._CosmosClientConnection__Post = original_connection_post def test_max_integrated_cache_staleness(self): cosmos_client_connection = self.container.client_connection + original_connection_get = cosmos_client_connection._CosmosClientConnection__Get cosmos_client_connection._CosmosClientConnection__Get = MagicMock( side_effect=self.side_effect_dedicated_gateway_max_age_thousand) try: @@ -72,6 +88,8 @@ def test_max_integrated_cache_staleness(self): max_integrated_cache_staleness_in_ms=self.dedicated_gateway_max_age_million) except StopIteration: pass + finally: + cosmos_client_connection._CosmosClientConnection__Get = original_connection_get def test_negative_max_integrated_cache_staleness(self): try: @@ -80,6 +98,221 @@ def test_negative_max_integrated_cache_staleness(self): except Exception as exception: assert isinstance(exception, ValueError) + def test_client_level_throughput_bucket(self): + cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number, + raw_response_hook=client_raw_response_hook) + + def test_request_precedence_throughput_bucket(self): + client = cosmos_client.CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = client.create_database( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + client.delete_database(created_db.id) + + def test_create_db_if_not_exists_and_delete_db_throughput_bucket(self): + created_db = self.client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.client.delete_database( + created_db.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_db_throughput_bucket(self): + self.client.list_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_db_throughput_bucket(self): + self.client.query_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_db_read_throughput_bucket(self): + self.database.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_create_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_create_container_if_not_exists_throughput_bucket(self): + created_collection = self.database.create_container_if_not_exists( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(created_collection.id) + + def test_delete_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + self.database.delete_container( + created_collection.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_list_containers_throughput_bucket(self): + self.database.list_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_query_containers_throughput_bucket(self): + self.database.query_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_replace_container_throughput_bucket(self): + created_collection = self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + replaced_collection = self.database.replace_container( + created_collection.id, + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + self.database.delete_container(replaced_collection.id) + + def test_container_read_throughput_bucket(self): + self.container.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_read_all_items_throughput_bucket(self): + for i in range(10): + self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.read_all_items( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_query_items_throughput_bucket(self): + doc_id = 'MyId' + str(uuid.uuid4()) + document_definition = {'pk': 'pk', 'id': doc_id} + self.container.create_item(body=document_definition) + + query = 'SELECT * from c' + self.container.query_items( + query=query, + partition_key='pk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_replace_item_throughput_bucket(self): + created_document = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + self.container.replace_item( + item=created_document['id'], + body={'id': '2' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_upsert_item_throughput_bucket(self): + self.container.upsert_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_create_item_throughput_bucket(self): + self.container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_patch_item_throughput_bucket(self): + pkValue = "patch_item_pk" + str(uuid.uuid4()) + # Create item to patch + item = { + "id": "patch_item", + "pk": pkValue, + "prop": "prop1", + "address": { + "city": "Redmond" + }, + "company": "Microsoft", + "number": 3} + self.container.create_item(item) + # Define and run patch operations + operations = [ + {"op": "add", "path": "/color", "value": "yellow"}, + {"op": "remove", "path": "/prop"}, + {"op": "replace", "path": "/company", "value": "CosmosDB"}, + {"op": "set", "path": "/address/new_city", "value": "Atlanta"}, + {"op": "incr", "path": "/number", "value": 7}, + {"op": "move", "from": "/color", "path": "/favorite_color"} + ] + self.container.patch_item( + item="patch_item", + partition_key=pkValue, + patch_operations=operations, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_execute_item_batch_throughput_bucket(self): + created_collection = self.database.create_container( + id='test_execute_item ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/company')) + batch = [] + for i in range(100): + batch.append(("create", ({"id": "item" + str(i), "company": "Microsoft"},))) + + created_collection.execute_item_batch( + batch_operations=batch, + partition_key="Microsoft", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + self.database.delete_container(created_collection) + + def test_container_delete_item_throughput_bucket(self): + created_item = self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + self.container.delete_item( + created_item['id'], + partition_key='mypk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + def test_container_delete_all_items_by_partition_key_throughput_bucket(self): + created_collection = self.database.create_container( + id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/pk', kind='Hash')) + + # Create two partition keys + partition_key1 = "{}-{}".format("Partition Key 1", str(uuid.uuid4())) + partition_key2 = "{}-{}".format("Partition Key 2", str(uuid.uuid4())) + + # add items for partition key 1 + for i in range(1, 3): + created_collection.upsert_item( + dict(id="item{}".format(i), pk=partition_key1)) + + # add items for partition key 2 + pk2_item = created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) + + # delete all items for partition key 1 + created_collection.delete_all_items_by_partition_key( + partition_key1, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + if __name__ == "__main__": unittest.main() diff --git a/sdk/cosmos/azure-cosmos/tests/test_headers_async.py b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py new file mode 100644 index 000000000000..9fc941d69807 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/tests/test_headers_async.py @@ -0,0 +1,267 @@ +# The MIT License (MIT) +# Copyright (c) Microsoft Corporation. All rights reserved. + +import unittest + +import pytest +import uuid + + +import test_config +from azure.cosmos import http_constants +from azure.cosmos.aio import CosmosClient, _retry_utility_async, DatabaseProxy +from azure.cosmos.partition_key import PartitionKey + +client_throughput_bucket_number = 2 +request_throughput_bucket_number = 3 +async def client_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(client_throughput_bucket_number)) + +async def request_raw_response_hook(response): + assert (response.http_request.headers[http_constants.HttpHeaders.ThroughputBucket] + == str(request_throughput_bucket_number)) + +@pytest.mark.cosmosEmulator +class TestHeadersAsync(unittest.IsolatedAsyncioTestCase): + client: CosmosClient = None + configs = test_config.TestConfig + host = configs.host + masterKey = configs.masterKey + database: DatabaseProxy = None + + @classmethod + def setUpClass(cls): + if (cls.masterKey == '[YOUR_KEY_HERE]' or + cls.host == '[YOUR_ENDPOINT_HERE]'): + raise Exception( + "You must specify your Azure Cosmos account values for " + "'masterKey' and 'host' at the top of this class to run the " + "tests.") + + async def asyncSetUp(self): + self.client = CosmosClient(self.host, self.masterKey) + self.database = self.client.get_database_client(self.configs.TEST_DATABASE_ID) + self.container = self.database.get_container_client(self.configs.TEST_MULTI_PARTITION_CONTAINER_ID) + + async def test_client_level_throughput_bucket_async(self): + CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number, + raw_response_hook=client_raw_response_hook) + + async def test_request_precedence_throughput_bucket_async(self): + client = CosmosClient(self.host, self.masterKey, + throughput_bucket=client_throughput_bucket_number) + created_db = await client.create_database( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + await client.delete_database(created_db.id) + + async def test_create_db_if_not_exists_and_delete_db_throughput_bucket_async(self): + created_db = await self.client.create_database_if_not_exists( + "test_db" + str(uuid.uuid4()), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + await self.client.delete_database( + created_db.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_list_db_throughput_bucket_async(self): + self.client.list_databases( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_query_db_throughput_bucket_async(self): + query = 'SELECT * from c' + self.client.query_databases( + query=query, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_db_read_throughput_bucket_async(self): + await self.database.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_create_container_throughput_bucket_async(self): + created_collection = await self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + await self.database.delete_container(created_collection.id) + + async def test_create_container_if_not_exists_throughput_bucket_async(self): + created_collection = await self.database.create_container_if_not_exists( + str(uuid.uuid4()), + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + await self.database.delete_container(created_collection.id) + + async def test_delete_container_throughput_bucket_async(self): + created_collection = await self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + await self.database.delete_container( + created_collection.id, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_list_containers_throughput_bucket_async(self): + self.database.list_containers( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_query_containers_throughput_bucket_async(self): + query = 'SELECT * from c' + self.database.query_containers( + query=query, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_replace_container_throughput_bucket_async(self): + created_collection = await self.database.create_container( + str(uuid.uuid4()), + PartitionKey(path="/pk")) + replaced_collection = await self.database.replace_container( + created_collection.id, + PartitionKey(path="/pk"), + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + await self.database.delete_container(replaced_collection.id) + + async def test_container_read_throughput_bucket_async(self): + await self.container.read( + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_read_item_throughput_bucket_async(self): + created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + await self.container.read_item( + item=created_document['id'], + partition_key="mypk", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_read_all_items_throughput_bucket_async(self): + for i in range(10): + await self.container.create_item(body={'id': ''.format(i) + str(uuid.uuid4()), 'pk': 'mypk'}) + + async for item in self.container.read_all_items(throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook): + pass + + async def test_container_query_items_throughput_bucket_async(self): + doc_id = 'MyId' + str(uuid.uuid4()) + document_definition = {'pk': 'pk', 'id': doc_id} + await self.container.create_item(body=document_definition) + + query = 'SELECT * from c' + query_results = [item async for item in self.container.query_items( + query=query, + partition_key='pk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook)] + + async def test_container_replace_item_throughput_bucket_async(self): + created_document = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + await self.container.replace_item( + item=created_document['id'], + body={'id': '2' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_upsert_item_throughput_bucket_async(self): + await self.container.upsert_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_create_item_throughput_bucket_async(self): + await self.container.create_item( + body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_patch_item_throughput_bucket_async(self): + pkValue = "patch_item_pk" + str(uuid.uuid4()) + # Create item to patch + item = { + "id": "patch_item", + "pk": pkValue, + "prop": "prop1", + "address": { + "city": "Redmond" + }, + "company": "Microsoft", + "number": 3} + await self.container.create_item(item) + # Define and run patch operations + operations = [ + {"op": "add", "path": "/color", "value": "yellow"}, + {"op": "remove", "path": "/prop"}, + {"op": "replace", "path": "/company", "value": "CosmosDB"}, + {"op": "set", "path": "/address/new_city", "value": "Atlanta"}, + {"op": "incr", "path": "/number", "value": 7}, + {"op": "move", "from": "/color", "path": "/favorite_color"} + ] + await self.container.patch_item( + item="patch_item", + partition_key=pkValue, + patch_operations=operations, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_execute_item_batch_throughput_bucket_async(self): + created_collection = await self.database.create_container( + id='test_execute_item ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/company')) + batch = [] + for i in range(100): + batch.append(("create", ({"id": "item" + str(i), "company": "Microsoft"},))) + + await created_collection.execute_item_batch( + batch_operations=batch, + partition_key="Microsoft", + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + await self.database.delete_container(created_collection) + + async def test_container_delete_item_throughput_bucket_async(self): + created_item = await self.container.create_item(body={'id': '1' + str(uuid.uuid4()), 'pk': 'mypk'}) + + await self.container.delete_item( + created_item['id'], + partition_key='mypk', + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + + async def test_container_delete_all_items_by_partition_key_throughput_bucket_async(self): + created_collection = await self.database.create_container( + id='test_delete_all_items_by_partition_key ' + str(uuid.uuid4()), + partition_key=PartitionKey(path='/pk', kind='Hash')) + + # Create two partition keys + partition_key1 = "{}-{}".format("Partition Key 1", str(uuid.uuid4())) + partition_key2 = "{}-{}".format("Partition Key 2", str(uuid.uuid4())) + + # add items for partition key 1 + for i in range(1, 3): + await created_collection.upsert_item( + dict(id="item{}".format(i), pk=partition_key1)) + + # add items for partition key 2 + pk2_item = await created_collection.upsert_item(dict(id="item{}".format(3), pk=partition_key2)) + + # delete all items for partition key 1 + await created_collection.delete_all_items_by_partition_key( + partition_key1, + throughput_bucket=request_throughput_bucket_number, + raw_response_hook=request_raw_response_hook) + +if __name__ == "__main__": + unittest.main()