Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions awswrangler/dynamodb/_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def _convert_condition_base_to_expression(
@_utils.validate_distributed_kwargs(
unsupported_kwargs=["boto3_session", "dtype_backend"],
)
def read_items( # noqa: PLR0912
def read_items( # noqa: PLR0912, PLR0915
table_name: str,
index_name: str | None = None,
partition_values: Sequence[Any] | None = None,
Expand All @@ -475,6 +475,7 @@ def read_items( # noqa: PLR0912
use_threads: bool | int = True,
boto3_session: boto3.Session | None = None,
pyarrow_additional_kwargs: dict[str, Any] | None = None,
key_schema: list[dict[str, str]] | None = None,
) -> pd.DataFrame | Iterator[pd.DataFrame] | _ItemsListType | Iterator[_ItemsListType]:
"""Read items from given DynamoDB table.

Expand Down Expand Up @@ -551,6 +552,10 @@ def read_items( # noqa: PLR0912
Forwarded to `to_pandas` method converting from PyArrow tables to Pandas DataFrame.
Valid values include "split_blocks", "self_destruct", "ignore_metadata".
e.g. pyarrow_additional_kwargs={'split_blocks': True}.
key_schema
Key schema of the table (e.g. `[{"AttributeName": "key", "KeyType": "HASH"}]`).
If provided, the library will bypass the `DescribeTable` API call, which can
reduce network latency and prevent API throttling. Defaults to None.

Raises
------
Expand Down Expand Up @@ -657,7 +662,10 @@ def read_items( # noqa: PLR0912
# Extract key schema
dynamodb_client = _utils.client(service_name="dynamodb", session=boto3_session)
serializer = TypeSerializer()
table_key_schema = dynamodb_client.describe_table(TableName=table_name)["Table"]["KeySchema"]
if key_schema:
table_key_schema = key_schema
else:
table_key_schema = dynamodb_client.describe_table(TableName=table_name)["Table"]["KeySchema"]

# Detect sort key, if any
if len(table_key_schema) == 1:
Expand Down
9 changes: 9 additions & 0 deletions awswrangler/dynamodb/_read.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> pd.DataFrame: ...
@overload
def read_items(
Expand All @@ -90,6 +91,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> Iterator[pd.DataFrame]: ...
@overload
def read_items(
Expand All @@ -112,6 +114,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> _ItemsListType: ...
@overload
def read_items(
Expand All @@ -134,6 +137,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> Iterator[_ItemsListType]: ...
@overload
def read_items(
Expand All @@ -156,6 +160,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> pd.DataFrame | _ItemsListType: ...
@overload
def read_items(
Expand All @@ -178,6 +183,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> Iterator[pd.DataFrame] | Iterator[_ItemsListType]: ...
@overload
def read_items(
Expand All @@ -200,6 +206,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> pd.DataFrame | Iterator[pd.DataFrame]: ...
@overload
def read_items(
Expand All @@ -222,6 +229,7 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> _ItemsListType | Iterator[_ItemsListType]: ...
@overload
def read_items(
Expand All @@ -244,4 +252,5 @@ def read_items(
use_threads: bool | int = ...,
boto3_session: boto3.Session | None = ...,
pyarrow_additional_kwargs: dict[str, Any] | None = ...,
key_schema: list[dict[str, str]] | None = ...,
) -> pd.DataFrame | Iterator[pd.DataFrame] | _ItemsListType | Iterator[_ItemsListType]: ...
42 changes: 42 additions & 0 deletions tests/unit/test_moto.py
Original file line number Diff line number Diff line change
Expand Up @@ -760,3 +760,45 @@ def test_extract_ctas_manifest_paths_cross_bucket_raises(moto_s3_client: "S3Clie

with pytest.raises(InvalidArgumentValue, match="unexpected bucket"):
_extract_ctas_manifest_paths(path=f"s3://bucket/{manifest_key}")


def test_dynamodb_read_items_with_key_schema(moto_dynamodb_client, moto_dynamodb_table) -> None:
    """Check that ``read_items`` accepts a caller-supplied ``key_schema``.

    The test first confirms that a full scan with an explicit key schema returns
    the stored rows, then wraps ``BaseClient._make_api_call`` to confirm that no
    ``DescribeTable`` request is made when ``key_schema`` is given and that one
    is made when it is omitted.
    """
    # Seed the table with two rows.
    wr.dynamodb.put_items(
        items=[{"key": 1, "value": "A"}, {"key": 2, "value": "B"}],
        table_name=moto_dynamodb_table,
    )

    # Part 1: reading with an explicit key schema returns both rows.
    hash_only_schema = [{"AttributeName": "key", "KeyType": "HASH"}]
    frame = wr.dynamodb.read_items(
        table_name=moto_dynamodb_table,
        key_schema=hash_only_schema,
        allow_full_scan=True,
    )
    assert len(frame) == 2
    assert set(frame["value"]) == {"A", "B"}

    # Part 2: record every DescribeTable request that goes through botocore.
    original_make_api_call = botocore.client.BaseClient._make_api_call
    describe_table_hits = []

    def recording_make_api_call(self, operation_name, kwarg):
        # Note the operation of interest, then delegate to the real implementation.
        if operation_name == "DescribeTable":
            describe_table_hits.append(operation_name)
        return original_make_api_call(self, operation_name, kwarg)

    with mock.patch("botocore.client.BaseClient._make_api_call", new=recording_make_api_call):
        # With key_schema supplied, DescribeTable must not be requested.
        wr.dynamodb.read_items(
            table_name=moto_dynamodb_table,
            key_schema=hash_only_schema,
            allow_full_scan=True,
        )
        assert len(describe_table_hits) == 0

        # Without key_schema, the table is described exactly once.
        wr.dynamodb.read_items(
            table_name=moto_dynamodb_table,
            allow_full_scan=True,
        )
        assert len(describe_table_hits) == 1
Loading