add consumed capacity to the table resource #4491

Open · wants to merge 1 commit into base: develop
116 changes: 110 additions & 6 deletions boto3/dynamodb/table.py
@@ -28,7 +28,9 @@ class TableResource:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def batch_writer(self, overwrite_by_pkeys=None):
def batch_writer(
self, overwrite_by_pkeys=None, return_consumed_capacity=None
):
"""Create a batch writer object.

This method creates a context manager for writing
@@ -53,18 +55,38 @@ def batch_writer(self, overwrite_by_pkeys=None):
:param overwrite_by_pkeys: De-duplicate request items in buffer
if match new request item on specified primary keys. i.e
``["partition_key1", "sort_key2", "sort_key3"]``
:type return_consumed_capacity: string
:param return_consumed_capacity: Determines the level of detail
about either provisioned or on-demand throughput consumption
that is returned in the response:
INDEXES - The response includes the aggregate
ConsumedCapacity for the operation, together with
ConsumedCapacity for each table and secondary index that
was accessed.
TOTAL - The response includes only the aggregate
ConsumedCapacity for the operation.
NONE - No ConsumedCapacity details are included in the
response.

"""
return BatchWriter(
self.name, self.meta.client, overwrite_by_pkeys=overwrite_by_pkeys
self.name,
self.meta.client,
overwrite_by_pkeys=overwrite_by_pkeys,
return_consumed_capacity=return_consumed_capacity,
)
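
As a usage sketch (not part of the diff): with this change applied, a caller can request capacity details and read the aggregate off the writer after the context manager exits. The table name and key attribute below are made up for illustration.

import boto3

# Hypothetical table; 'my-table' and 'MyHashKey' are illustrative only.
table = boto3.resource('dynamodb').Table('my-table')

with table.batch_writer(return_consumed_capacity='TOTAL') as batch:
    for i in range(50):
        batch.put_item(Item={'MyHashKey': f'item-{i}'})

# Aggregated across all flushes; stays None if the service returned
# no ConsumedCapacity (e.g. with 'NONE' or when no value is passed).
print(batch.consumed_capacity)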


class BatchWriter:
"""Automatically handle batch writes to DynamoDB for a single table."""

def __init__(
self, table_name, client, flush_amount=25, overwrite_by_pkeys=None
self,
table_name,
client,
flush_amount=25,
overwrite_by_pkeys=None,
return_consumed_capacity=None,
):
"""

@@ -92,12 +114,26 @@ def __init__(
if match new request item on specified primary keys. i.e
``["partition_key1", "sort_key2", "sort_key3"]``

:type return_consumed_capacity: string
:param return_consumed_capacity: Determines the level of detail
about either provisioned or on-demand throughput consumption
that is returned in the response:
INDEXES - The response includes the aggregate
ConsumedCapacity for the operation, together with
ConsumedCapacity for each table and secondary index that
was accessed.
TOTAL - The response includes only the aggregate
ConsumedCapacity for the operation.
NONE - No ConsumedCapacity details are included in the
response.
"""
self._table_name = table_name
self._client = client
self._items_buffer = []
self._flush_amount = flush_amount
self._overwrite_by_pkeys = overwrite_by_pkeys
self.return_consumed_capacity = return_consumed_capacity
self.consumed_capacity = None

def put_item(self, Item):
self._add_request_and_process({'PutRequest': {'Item': Item}})
@@ -141,9 +177,15 @@ def _flush_if_needed(self):
def _flush(self):
items_to_send = self._items_buffer[: self._flush_amount]
self._items_buffer = self._items_buffer[self._flush_amount :]
response = self._client.batch_write_item(
RequestItems={self._table_name: items_to_send}
)
params = {
'RequestItems': {self._table_name: items_to_send},
}
if self.return_consumed_capacity is not None:
params['ReturnConsumedCapacity'] = self.return_consumed_capacity
response = self._client.batch_write_item(**params)
consumed_capacity = response.get('ConsumedCapacity')
if consumed_capacity is not None:
self._update_consumed_capacity_array(consumed_capacity)
unprocessed_items = response['UnprocessedItems']
if not unprocessed_items:
unprocessed_items = {}
@@ -157,6 +199,68 @@ def _flush(self):
len(self._items_buffer),
)

def _update_consumed_capacity_array(self, new_consumed_capacity):
if self.consumed_capacity is None:
self.consumed_capacity = new_consumed_capacity
elif new_consumed_capacity:
self._agg_consumed_capacity_objects(
self.consumed_capacity[0], new_consumed_capacity[0]
)
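
For reference (not part of the diff): BatchWriteItem returns ConsumedCapacity as a list with one entry per table, so this single-table writer only ever aggregates element [0]. An illustrative entry, with made-up values (the Table and index breakdowns appear only with 'INDEXES'):

# Illustrative shape only; the table and index names are made up.
consumed_capacity = [
    {
        'TableName': 'my-table',
        'CapacityUnits': 25.0,
        'Table': {'CapacityUnits': 25.0},
        'GlobalSecondaryIndexes': {
            'my-gsi': {'CapacityUnits': 25.0},
        },
    }
]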

@staticmethod
def _agg_consumed_capacity_objects(
total_consumed_capacity, consumed_capacity
):
# Merge total capacities
BatchWriter._agg_capacity_objects(
total_consumed_capacity, consumed_capacity
)

# Merge table capacities
if 'Table' in consumed_capacity:
if 'Table' not in total_consumed_capacity:
total_consumed_capacity['Table'] = {}
BatchWriter._agg_capacity_objects(
total_consumed_capacity['Table'],
consumed_capacity['Table'],
)

# Merge secondary index capacities
index_types = ['LocalSecondaryIndexes', 'GlobalSecondaryIndexes']
for index_type in index_types:
if index_type in consumed_capacity:
if index_type not in total_consumed_capacity:
total_consumed_capacity[index_type] = consumed_capacity[
index_type
]
else:
for index_name in consumed_capacity[index_type]:
if (
index_name
not in total_consumed_capacity[index_type]
):
total_consumed_capacity[index_type][
index_name
] = {}
BatchWriter._agg_capacity_objects(
total_consumed_capacity[index_type][index_name],
consumed_capacity[index_type][index_name],
)

@staticmethod
def _agg_capacity_objects(total_consumed_capacity, consumed_capacity):
capacity_unit_keys = [
'CapacityUnits',
'ReadCapacityUnits',
'WriteCapacityUnits',
]
for key in capacity_unit_keys:
if key in consumed_capacity:
total_consumed_capacity[key] = (
total_consumed_capacity.get(key, 0)
+ consumed_capacity[key]
)
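
A worked sketch of the merge helpers above (hypothetical inputs; method name as renamed here):

# Hypothetical per-flush entries; values are made up.
total = {'TableName': 'my-table', 'CapacityUnits': 25.0,
         'Table': {'CapacityUnits': 25.0}}
latest = {'TableName': 'my-table', 'CapacityUnits': 25.0,
          'Table': {'CapacityUnits': 25.0},
          'GlobalSecondaryIndexes': {'my-gsi': {'CapacityUnits': 25.0}}}

BatchWriter._agg_consumed_capacity_objects(total, latest)
# total is now:
# {'TableName': 'my-table', 'CapacityUnits': 50.0,
#  'Table': {'CapacityUnits': 50.0},
#  'GlobalSecondaryIndexes': {'my-gsi': {'CapacityUnits': 25.0}}}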

def __enter__(self):
return self

63 changes: 63 additions & 0 deletions tests/integration/test_dynamodb.py
@@ -218,3 +218,66 @@ def test_batch_write_items(self):
# Verify all the items were added to dynamodb.
for obj in self.table.scan(ConsistentRead=True)['Items']:
self.assertIn(obj, items)

# Verify consumed capacity is None
self.assertIs(batch.consumed_capacity, None)

def test_batch_write_item_agg_capacity_none(self):
num_elements = 100
items = []
for i in range(num_elements):
items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'})

with self.table.batch_writer(
return_consumed_capacity='NONE'
) as batch:
for item in items:
batch.put_item(Item=item)

# Verify all the items were added to dynamodb.
for obj in self.table.scan(ConsistentRead=True)['Items']:
self.assertIn(obj, items)

# Verify consumed capacity
self.assertIs(batch.consumed_capacity, None)

def test_batch_write_item_agg_capacity_total(self):
num_elements = 100
items = []
for i in range(num_elements):
items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'})

with self.table.batch_writer(
return_consumed_capacity='TOTAL'
) as batch:
for item in items:
batch.put_item(Item=item)

# Verify all the items were added to dynamodb.
for obj in self.table.scan(ConsistentRead=True)['Items']:
self.assertIn(obj, items)

# Verify consumed capacity
total_cu = batch.consumed_capacity[0]['CapacityUnits']
self.assertEqual(total_cu, num_elements)
self.assertNotIn('Table', batch.consumed_capacity[0])

def test_batch_write_item_agg_capacity_indexes(self):
num_elements = 100
items = []
for i in range(num_elements):
items.append({'MyHashKey': f'foo{i}', 'OtherKey': f'bar{i}'})

with self.table.batch_writer(
return_consumed_capacity='INDEXES'
) as batch:
for item in items:
batch.put_item(Item=item)

# Verify all the items were added to dynamodb.
for obj in self.table.scan(ConsistentRead=True)['Items']:
self.assertIn(obj, items)

# Verify consumed capacity
total_cu = batch.consumed_capacity[0]['CapacityUnits']
table_cu = batch.consumed_capacity[0]['Table']['CapacityUnits']
self.assertEqual(total_cu, num_elements)
self.assertEqual(table_cu, num_elements)