Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 17 additions & 9 deletions nats/src/nats/js/kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async def create(self, key: str, value: bytes, validate_keys: bool = True, msg_t

pa = None
try:
pa = await self.update(key, value, last=0, validate_keys=validate_keys, msg_ttl=msg_ttl)
pa = await self._update(key, value, last=0, validate_keys=validate_keys, msg_ttl=msg_ttl)
except nats.js.errors.KeyWrongLastSequenceError as err:
# In case of attempting to recreate an already deleted key,
# the client would get a KeyWrongLastSequenceError. When this happens,
Expand All @@ -246,7 +246,7 @@ async def create(self, key: str, value: bytes, validate_keys: bool = True, msg_t
# to recreate using the last revision.
raise err
except nats.js.errors.KeyDeletedError as err:
pa = await self.update(
pa = await self._update(
key, value, last=err.entry.revision, validate_keys=validate_keys, msg_ttl=msg_ttl
)

Expand All @@ -258,14 +258,20 @@ async def update(
value: bytes,
last: Optional[int] = None,
validate_keys: bool = True,
msg_ttl: Optional[float] = None,
) -> int:
"""
update will update the value if the latest revision matches.

Note: TTL parameter is accepted for internal use by create(), but should not be
used directly on update operations per NATS KV semantics.
"""
return await self._update(key, value, last=last, validate_keys=validate_keys)

async def _update(
self,
key: str,
value: bytes,
last: Optional[int] = None,
validate_keys: bool = True,
msg_ttl: Optional[float] = None,
) -> int:
if validate_keys and not _is_key_valid(key):
raise nats.js.errors.InvalidKeyError(key)

Expand All @@ -286,15 +292,17 @@ async def update(
return pa.seq

async def delete(
self, key: str, last: Optional[int] = None, validate_keys: bool = True, msg_ttl: Optional[float] = None
self,
key: str,
last: Optional[int] = None,
validate_keys: bool = True,
) -> bool:
"""
delete will place a delete marker and remove all previous revisions.

:param key: The key to delete
:param last: Expected last revision number (for optimistic concurrency)
:param validate_keys: Whether to validate the key format
:param msg_ttl: Optional TTL (time-to-live) in seconds for the delete marker
"""
if validate_keys and not _is_key_valid(key):
raise nats.js.errors.InvalidKeyError(key)
Expand All @@ -305,7 +313,7 @@ async def delete(
if last and last > 0:
hdrs[api.Header.EXPECTED_LAST_SUBJECT_SEQUENCE] = str(last)

await self._js.publish(f"{self._pre}{key}", headers=hdrs, msg_ttl=msg_ttl)
await self._js.publish(f"{self._pre}{key}", headers=hdrs)
return True

async def purge(self, key: str, msg_ttl: Optional[float] = None) -> bool:
Expand Down
48 changes: 0 additions & 48 deletions nats/tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -3914,54 +3914,6 @@ async def error_handler(e):

await nc.close()

@async_test
async def test_kv_delete_with_ttl(self):
"""Test that delete() supports msg_ttl parameter for the delete marker"""
errors = []

async def error_handler(e):
print("Error:", e, type(e))
errors.append(e)

nc = await nats.connect(error_cb=error_handler)

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 11:
pytest.skip("per-message TTL requires nats-server v2.11.0 or later")

js = nc.jetstream()

# Create a KV bucket
kv = await js.create_key_value(bucket="TEST_TTL_DELETE", history=10)

# Put a key
seq = await kv.put("city", b"paris")
assert seq == 1

# Verify the key exists
entry = await kv.get("city")
assert entry.value == b"paris"

# Delete with TTL of 2 seconds on the delete marker
await kv.delete("city", msg_ttl=2.0)

# Key should be deleted immediately
with pytest.raises(KeyNotFoundError):
await kv.get("city")

# The delete marker should exist in the stream
status = await kv.status()
# After delete, there should be both the original message and delete marker
assert status.values >= 1

# Wait for the delete marker TTL to expire (2 seconds + buffer)
await asyncio.sleep(2.5)

# The marker itself should now be removed from the stream
# Note: This behavior depends on server version and configuration

await nc.close()

@async_test
async def test_kv_put_no_ttl(self):
"""Test that put() does NOT support TTL (should not have msg_ttl parameter)"""
Expand Down
Loading