Skip to content

Commit ff80a15

Browse files
committed
Implement BucketIT
1 parent 55ef4dc commit ff80a15

File tree

1 file changed

+384
-8
lines changed

1 file changed

+384
-8
lines changed

tests/test_buckets.py

Lines changed: 384 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,389 @@
1+
import pytest
2+
from botocore.exceptions import ClientError
3+
from datetime import datetime, timedelta, timezone
4+
import time
15

2-
def test_create_list_delete_bucket(s3_client, bucket_name: str):
3-
buckets = s3_client.list_buckets()
4-
assert len(buckets['Buckets']) == 0
6+
from s3mock_test import given_bucket
57

8+
# reimplementation of https://github.com/adobe/S3Mock/blob/main/integration-tests/src/test/kotlin/com/adobe/testing/s3mock/its/BucketIT.kt
9+
10+
def test_creating_and_deleting_a_bucket_is_successful(s3_client, bucket_name: str):
    """Create a bucket, verify it exists, delete it, and verify it is gone."""
    # Create the bucket
    s3_client.create_bucket(Bucket=bucket_name)

    # Wait until the bucket exists
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)

    # head_bucket does not raise if the bucket exists; it returns a response dict
    head_resp = s3_client.head_bucket(Bucket=bucket_name)
    assert head_resp is not None

    # Delete the bucket
    s3_client.delete_bucket(Bucket=bucket_name)

    # Wait until the bucket no longer exists
    s3_client.get_waiter("bucket_not_exists").wait(Bucket=bucket_name)

    # Verify it's gone: head should raise ClientError with NoSuchBucket/404.
    # pytest.raises is the idiomatic form of the manual try/pytest.fail/except pattern.
    with pytest.raises(ClientError) as excinfo:
        s3_client.head_bucket(Bucket=bucket_name)
    code = excinfo.value.response.get("Error", {}).get("Code")
    assert code in ("NoSuchBucket", "404")
34+
35+
def test_creating_a_bucket_with_configuration_is_successful(s3_client, bucket_name: str):
    """Create a bucket with a region location constraint and verify the response."""
    region = "ap-southeast-5"
    create_resp = s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )

    # The service answers 200 and echoes the bucket path as Location
    assert create_resp["ResponseMetadata"]["HTTPStatusCode"] == 200
    assert create_resp.get("Location") == f"/{bucket_name}"

    # Block until the bucket is visible, then confirm head_bucket succeeds
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    assert s3_client.head_bucket(Bucket=bucket_name) is not None
54+
55+
def test_deleting_a_non_empty_bucket_fails(s3_client, bucket_name: str):
    """Deleting a bucket that still holds an object must fail with BucketNotEmpty."""
    s3_client.create_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    s3_client.put_object(Bucket=bucket_name, Key="test-object", Body=b"data")

    # Deleting a non-empty bucket must be rejected
    with pytest.raises(ClientError) as excinfo:
        s3_client.delete_bucket(Bucket=bucket_name)

    response = excinfo.value.response
    # 409 Conflict plus the dedicated AWS error code
    assert response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 409
    assert response.get("Error", {}).get("Code") == "BucketNotEmpty"
73+
74+
def test_creating_and_listing_multiple_buckets_is_successful(s3_client, bucket_name: str):
    """Three freshly created buckets appear after the two defaults in list_buckets."""
    created_names = [f"{bucket_name}-{i}" for i in (1, 2, 3)]
    for name in created_names:
        given_bucket(s3_client, name)

    # Allow for stripped milliseconds and up to 1 minute of clock skew
    creation_threshold = datetime.now(timezone.utc) - timedelta(minutes=1)

    resp = s3_client.list_buckets()
    buckets = resp.get("Buckets")
    assert buckets

    # 2 default buckets + the 3 created here, in exactly this order
    assert [b["Name"] for b in buckets] == ["bucket-a", "bucket-b", *created_names]
    assert len(buckets) == 5

    # The created buckets carry recent creation timestamps
    for bucket in buckets[2:]:
        assert bucket["CreationDate"] >= creation_threshold

    # list_buckets without paging parameters carries no pagination fields
    assert resp.get("Prefix") is None
    assert resp.get("ContinuationToken") is None

    # Owner metadata matches S3Mock's fixed identity
    owner = resp.get("Owner") or {}
    assert owner.get("DisplayName") == "s3-mock-file-store"
    assert owner.get("ID") == "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
109+
110+
def test_creating_and_listing_multiple_buckets_limiting_by_prefix_is_successful(s3_client, bucket_name: str):
    """List buckets and filter them client-side by the shared name prefix."""
    # Create three buckets with a shared base name (prefix)
    created_names = [f"{bucket_name}-1", f"{bucket_name}-2", f"{bucket_name}-3"]
    for n in created_names:
        given_bucket(s3_client, n)

    # the returned creation date might strip off the millisecond-part, resulting in rounding down
    # and account for a clock-skew in the Docker container of up to a minute.
    creation_threshold = datetime.now(timezone.utc) - timedelta(minutes=1)

    # List all buckets, then apply client-side filtering by prefix
    # (the AWS SDK for Python does not support a Prefix parameter for list_buckets)
    resp = s3_client.list_buckets()
    assert resp.get("Buckets")
    buckets = resp["Buckets"]

    # Filter by our test prefix and keep order
    filtered = [b for b in buckets if b["Name"].startswith(bucket_name)]
    assert len(filtered) == 3
    assert [b["Name"] for b in filtered] == created_names

    # Creation dates should be after or equal to the threshold.
    # NOTE: a tautological `assert prefix == bucket_name` self-comparison was removed
    # here — it could never fail and therefore verified nothing.
    for b in filtered:
        assert b["CreationDate"] >= creation_threshold

    # No continuation token in basic list_buckets response
    assert resp.get("ContinuationToken") is None

    # Owner metadata
    owner = resp.get("Owner") or {}
    assert owner.get("DisplayName") == "s3-mock-file-store"
    assert owner.get("ID") == "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
149+
150+
151+
def test_creating_and_listing_multiple_buckets_limiting_by_max_buckets_is_successful(
    s3_client, bucket_name: str, endpoint_url_http: str
):
    """Paginate list_buckets via MaxBuckets and ContinuationToken across two pages."""
    created_names = [f"{bucket_name}-1", f"{bucket_name}-2", f"{bucket_name}-3"]
    for name in created_names:
        given_bucket(s3_client, name)

    # Allow for stripped milliseconds and up to 1 minute of clock skew
    creation_threshold = datetime.now(timezone.utc) - timedelta(minutes=1)

    # First page limited to 4 entries: 2 defaults + 2 of the created buckets
    page1 = s3_client.list_buckets(MaxBuckets=4)
    buckets1 = page1["Buckets"]
    assert buckets1
    assert len(buckets1) == 4
    expected_first_page = ["bucket-a", "bucket-b", created_names[0], created_names[1]]
    assert [b["Name"] for b in buckets1] == expected_first_page

    for bucket in buckets1[2:]:
        assert bucket["CreationDate"] >= creation_threshold

    assert page1.get("Prefix") in (None, "")  # S3Mock may omit or send empty
    token = page1["ContinuationToken"]
    assert token is not None
    assert page1["Owner"]["DisplayName"] == "s3-mock-file-store"
    assert page1["Owner"]["ID"] == "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"

    # Second page via the continuation token holds the remaining created bucket
    page2 = s3_client.list_buckets(ContinuationToken=token)
    buckets2 = page2["Buckets"]
    assert buckets2
    assert len(buckets2) == 1
    assert buckets2[0]["Name"] == created_names[2]
    assert buckets2[0]["CreationDate"] >= creation_threshold

    assert page2.get("Prefix") in (None, "")
    assert page2.get("ContinuationToken") in (None, "")
    assert page2["Owner"]["DisplayName"] == "s3-mock-file-store"
    assert page2["Owner"]["ID"] == "79a59df900b949e55d96a1e698fbacedfd6e09d98eacf8f8d5218e7cd47ef2be"
197+
198+
def test_default_buckets_were_created(s3_client):
    """The mock starts out with exactly the two preconfigured default buckets."""
    listing = s3_client.list_buckets()
    names = [bucket["Name"] for bucket in listing.get("Buckets", [])]
    assert len(names) == 2
    assert names == ["bucket-a", "bucket-b"]
204+
205+
def test_get_bucket_location_returns_a_result(s3_client, bucket_name: str):
    """get_bucket_location reports the region the bucket was created in."""
    region = "eu-west-1"
    s3_client.create_bucket(
        Bucket=bucket_name,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)

    # The location constraint round-trips through get_bucket_location
    location = s3_client.get_bucket_location(Bucket=bucket_name)
    assert location.get("LocationConstraint") == region
216+
217+
def test_by_default_bucket_versioning_is_turned_off(s3_client, bucket_name: str):
    """A new bucket reports neither a versioning status nor an MFA-delete setting."""
    given_bucket(s3_client, bucket_name)

    versioning = s3_client.get_bucket_versioning(Bucket=bucket_name)

    # An unconfigured bucket omits both fields entirely
    assert versioning.get("Status") is None
    assert versioning.get("MFADelete") is None
227+
228+
def test_put_bucket_versioning_works_get_bucket_versioning_returns_enabled(s3_client, bucket_name: str):
    """Enabling versioning is reflected by a subsequent get_bucket_versioning call."""
    given_bucket(s3_client, bucket_name)

    # Turn versioning on for the bucket
    versioning_config = {"Status": "Enabled"}
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        VersioningConfiguration=versioning_config,
    )

    # The service now reports the bucket as versioned
    assert s3_client.get_bucket_versioning(Bucket=bucket_name).get("Status") == "Enabled"
241+
242+
def test_put_bucket_versioning_with_mfa_works_get_bucket_versioning_is_returned_correctly(
    s3_client, bucket_name: str
):
    """Versioning with MFA delete enabled is reported back by get_bucket_versioning."""
    given_bucket(s3_client, bucket_name)

    # Enable versioning together with MFA delete (MFA header + configuration)
    versioning_config = {"Status": "Enabled", "MFADelete": "Enabled"}
    s3_client.put_bucket_versioning(
        Bucket=bucket_name,
        MFA="fakeMfaValue",
        VersioningConfiguration=versioning_config,
    )

    # Both flags round-trip through get_bucket_versioning
    resp = s3_client.get_bucket_versioning(Bucket=bucket_name)
    assert resp.get("Status") == "Enabled"
    assert resp.get("MFADelete") == "Enabled"
262+
263+
def test_duplicate_bucket_creation_returns_the_correct_error(s3_client, bucket_name: str):
    """Re-creating an existing bucket fails with 409 BucketAlreadyOwnedByYou."""
    s3_client.create_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    assert s3_client.head_bucket(Bucket=bucket_name) is not None

    # A second create of the same name must be rejected
    with pytest.raises(ClientError) as excinfo:
        s3_client.create_bucket(Bucket=bucket_name)

    response = excinfo.value.response
    assert response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 409
    assert response.get("Error", {}).get("Code") == "BucketAlreadyOwnedByYou"

    # Clean up: delete the bucket and wait until it's gone
    s3_client.delete_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_not_exists").wait(Bucket=bucket_name)

    # Confirm the bucket is gone
    with pytest.raises(ClientError) as exc2:
        s3_client.head_bucket(Bucket=bucket_name)
    assert exc2.value.response.get("Error", {}).get("Code") in ("NoSuchBucket", "404")
291+
292+
def test_duplicate_bucket_deletion_returns_the_correct_error(s3_client, bucket_name: str):
    """Deleting an already-deleted bucket fails with 404 NoSuchBucket."""
    s3_client.create_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    assert s3_client.head_bucket(Bucket=bucket_name) is not None

    # First deletion succeeds; wait until the bucket is gone
    s3_client.delete_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_not_exists").wait(Bucket=bucket_name)

    # head_bucket now fails, confirming the bucket no longer exists
    with pytest.raises(ClientError) as exc1:
        s3_client.head_bucket(Bucket=bucket_name)
    assert exc1.value.response.get("Error", {}).get("Code") in ("NoSuchBucket", "404")

    # A second delete must report 404 NoSuchBucket
    with pytest.raises(ClientError) as exc2:
        s3_client.delete_bucket(Bucket=bucket_name)
    response = exc2.value.response
    assert response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 404
    assert response.get("Error", {}).get("Code") in ("NoSuchBucket", "404")
318+
319+
def test_get_bucket_lifecycle_returns_error_if_not_set(s3_client, bucket_name: str):
    """Fetching a lifecycle config that was never set yields 404 NoSuchLifecycleConfiguration."""
    s3_client.create_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    assert s3_client.head_bucket(Bucket=bucket_name) is not None

    # No lifecycle was configured, so the lookup must fail
    with pytest.raises(ClientError) as excinfo:
        s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)

    response = excinfo.value.response
    assert response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 404
    assert response.get("Error", {}).get("Code") == "NoSuchLifecycleConfiguration"
335+
336+
def test_put_get_delete_bucket_lifecycle_is_successful(s3_client, bucket_name: str):
    """Round-trip a lifecycle configuration: put, get, delete, then confirm 404."""
    # Create the bucket and wait until it exists
    s3_client.create_bucket(Bucket=bucket_name)
    s3_client.get_waiter("bucket_exists").wait(Bucket=bucket_name)
    assert s3_client.head_bucket(Bucket=bucket_name) is not None

    # Lifecycle configuration equivalent to the Kotlin BucketIT test
    lifecycle_config = {
        "Rules": [
            {
                "ID": bucket_name,
                "Status": "Enabled",
                "Filter": {"Prefix": "myprefix/"},
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 2},
                "Expiration": {"Days": 2},
            }
        ]
    }

    # Put lifecycle configuration
    s3_client.put_bucket_lifecycle_configuration(
        Bucket=bucket_name, LifecycleConfiguration=lifecycle_config
    )

    # Get lifecycle configuration and verify the important fields round-tripped
    got = s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
    assert got.get("Rules")
    assert len(got["Rules"]) == 1
    rule = got["Rules"][0]
    assert rule.get("ID") == bucket_name
    assert rule.get("Status") == "Enabled"
    assert rule.get("Filter", {}).get("Prefix") == "myprefix/"
    assert rule.get("AbortIncompleteMultipartUpload", {}).get("DaysAfterInitiation") == 2
    assert rule.get("Expiration", {}).get("Days") == 2

    # Delete lifecycle configuration; expect 204 No Content (allow 200 for variance)
    del_resp = s3_client.delete_bucket_lifecycle(Bucket=bucket_name)
    assert del_resp.get("ResponseMetadata", {}).get("HTTPStatusCode") in (204, 200)

    # Poll until the deletion is visible instead of a fixed 3-second sleep:
    # finishes as soon as the backend catches up, with the same overall deadline,
    # which makes the test both faster and less flaky.
    deadline = time.monotonic() + 3.0
    while True:
        try:
            s3_client.get_bucket_lifecycle_configuration(Bucket=bucket_name)
        except ClientError as err:
            response = err.response
            assert response.get("ResponseMetadata", {}).get("HTTPStatusCode") == 404
            assert response.get("Error", {}).get("Code") == "NoSuchLifecycleConfiguration"
            break
        if time.monotonic() >= deadline:
            pytest.fail("Lifecycle configuration still present after deletion")
        time.sleep(0.1)

0 commit comments

Comments
 (0)