Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions moto/dynamodb/models/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,9 @@ def __init__(
},
}

if stream_type in ("NEW_IMAGE", "NEW_AND_OLD_IMAGES"):
if stream_type in ("NEW_IMAGE", "NEW_AND_OLD_IMAGES") and new is not None:
self.record["dynamodb"]["NewImage"] = new_a
if stream_type in ("OLD_IMAGE", "NEW_AND_OLD_IMAGES"):
if stream_type in ("OLD_IMAGE", "NEW_AND_OLD_IMAGES") and old is not None:
self.record["dynamodb"]["OldImage"] = old_a

# This is a substantial overestimate but it's the easiest to do now
Expand Down
172 changes: 172 additions & 0 deletions tests/test_dynamodbstreams/test_dynamodbstreams.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,175 @@ def test_stream_with_range_key(self):
assert len(resp["Records"]) == 2
assert resp["Records"][0]["eventName"] == "INSERT"
assert resp["Records"][1]["eventName"] == "INSERT"


class TestStreamRecordImages:
"""Verify that stream records include the correct image keys.

AWS behavior:
- INSERT: NewImage present, OldImage absent
- MODIFY: both NewImage and OldImage present
- REMOVE: OldImage present, NewImage absent

Empty images (e.g. OldImage on INSERT) must be omitted entirely,
not included as empty dicts.
"""

@staticmethod
def _get_records(table_name, stream_arn):
"""Helper to read all records from a stream."""
streams = boto3.client("dynamodbstreams", region_name="us-east-1")
desc = streams.describe_stream(StreamArn=stream_arn)
shard_id = desc["StreamDescription"]["Shards"][0]["ShardId"]
resp = streams.get_shard_iterator(
StreamArn=stream_arn,
ShardId=shard_id,
ShardIteratorType="TRIM_HORIZON",
)
return streams.get_records(ShardIterator=resp["ShardIterator"])["Records"]

@mock_aws
def test_new_and_old_images_insert_has_no_old_image(self):
"""INSERT with NEW_AND_OLD_IMAGES should not have OldImage."""
client = boto3.client("dynamodb", region_name="us-east-1")
resp = client.create_table(
TableName="img-test",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
stream_arn = resp["TableDescription"]["LatestStreamArn"]
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "a"}},
)

records = self._get_records("img-test", stream_arn)
assert len(records) == 1
rec = records[0]
assert rec["eventName"] == "INSERT"
assert "NewImage" in rec["dynamodb"]
assert rec["dynamodb"]["NewImage"]["val"] == {"S": "a"}
# INSERT must NOT have OldImage
assert "OldImage" not in rec["dynamodb"]

@mock_aws
def test_new_and_old_images_modify_has_both(self):
"""MODIFY with NEW_AND_OLD_IMAGES should have both images."""
client = boto3.client("dynamodb", region_name="us-east-1")
resp = client.create_table(
TableName="img-test",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
stream_arn = resp["TableDescription"]["LatestStreamArn"]
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "original"}},
)
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "updated"}},
)

records = self._get_records("img-test", stream_arn)
assert len(records) == 2
modify = records[1]
assert modify["eventName"] == "MODIFY"
assert "OldImage" in modify["dynamodb"]
assert "NewImage" in modify["dynamodb"]
assert modify["dynamodb"]["OldImage"]["val"] == {"S": "original"}
assert modify["dynamodb"]["NewImage"]["val"] == {"S": "updated"}

@mock_aws
def test_new_and_old_images_remove_has_no_new_image(self):
"""REMOVE with NEW_AND_OLD_IMAGES should not have NewImage."""
client = boto3.client("dynamodb", region_name="us-east-1")
resp = client.create_table(
TableName="img-test",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_AND_OLD_IMAGES",
},
)
stream_arn = resp["TableDescription"]["LatestStreamArn"]
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "a"}},
)
client.delete_item(TableName="img-test", Key={"pk": {"S": "k1"}})

records = self._get_records("img-test", stream_arn)
remove = records[1]
assert remove["eventName"] == "REMOVE"
assert "OldImage" in remove["dynamodb"]
assert remove["dynamodb"]["OldImage"]["val"] == {"S": "a"}
# REMOVE must NOT have NewImage
assert "NewImage" not in remove["dynamodb"]

@mock_aws
def test_new_image_only_remove_has_no_new_image(self):
"""REMOVE with NEW_IMAGE should not have NewImage (nothing new)."""
client = boto3.client("dynamodb", region_name="us-east-1")
resp = client.create_table(
TableName="img-test",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "NEW_IMAGE",
},
)
stream_arn = resp["TableDescription"]["LatestStreamArn"]
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "a"}},
)
client.delete_item(TableName="img-test", Key={"pk": {"S": "k1"}})

records = self._get_records("img-test", stream_arn)
remove = records[1]
assert remove["eventName"] == "REMOVE"
# NEW_IMAGE on a REMOVE: there is no new image
assert "NewImage" not in remove["dynamodb"]
assert "OldImage" not in remove["dynamodb"]

@mock_aws
def test_old_image_only_insert_has_no_old_image(self):
"""INSERT with OLD_IMAGE should not have OldImage (nothing old)."""
client = boto3.client("dynamodb", region_name="us-east-1")
resp = client.create_table(
TableName="img-test",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={
"StreamEnabled": True,
"StreamViewType": "OLD_IMAGE",
},
)
stream_arn = resp["TableDescription"]["LatestStreamArn"]
client.put_item(
TableName="img-test",
Item={"pk": {"S": "k1"}, "val": {"S": "a"}},
)

records = self._get_records("img-test", stream_arn)
insert = records[0]
assert insert["eventName"] == "INSERT"
# OLD_IMAGE on an INSERT: there is no old image
assert "OldImage" not in insert["dynamodb"]
assert "NewImage" not in insert["dynamodb"]
Loading