Skip to content

Commit 21aaf61

Browse files
committed
Add subject_delete_marker_ttl and limit_marker_ttl support with review fixes
Implements PR nats-io#814 feature (closes nats-io#725) with improvements from code review: - Add subject_delete_marker_ttl field to StreamConfig with nanosecond conversion in from_response/as_dict - Add limit_marker_ttl field to KeyValueConfig, passed through to StreamConfig in create_key_value - Add headers field to KeyValue.Entry for accessing message metadata - Detect TTL-expired markers via Nats-Marker-Reason header in watcher Review improvements applied: - Extract "Nats-Marker-Reason" and "MaxAge" magic strings to module constants (NATS_MARKER_REASON, NATS_MARKER_MAX_AGE) in kv.py - Add limit_marker_ttl nanosecond conversion in KeyValueConfig.as_dict() for consistency with ttl field - Use Dict[str, str] type annotation for Entry.headers instead of bare dict - Add server version comments on KeyValueConfig.limit_marker_ttl https://claude.ai/code/session_01MteA8J2yH3ky5Y6coBpEAf
1 parent 531bcf4 commit 21aaf61

File tree

4 files changed

+89
-3
lines changed

4 files changed

+89
-3
lines changed

nats/src/nats/js/api.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -346,10 +346,15 @@ class StreamConfig(Base):
346346
# Metadata are user defined string key/value pairs.
347347
metadata: Optional[Dict[str, str]] = None
348348

349+
# TTL of subject delete markers.
350+
# Introduced in nats-server 2.11.0.
351+
subject_delete_marker_ttl: Optional[float] = None # in seconds
352+
349353
@classmethod
350354
def from_response(cls, resp: Dict[str, Any]):
351355
cls._convert_nanoseconds(resp, "max_age")
352356
cls._convert_nanoseconds(resp, "duplicate_window")
357+
cls._convert_nanoseconds(resp, "subject_delete_marker_ttl")
353358
cls._convert(resp, "placement", Placement)
354359
cls._convert(resp, "mirror", StreamSource)
355360
cls._convert(resp, "sources", StreamSource)
@@ -361,6 +366,7 @@ def as_dict(self) -> Dict[str, object]:
361366
result = super().as_dict()
362367
result["duplicate_window"] = self._to_nanoseconds(self.duplicate_window)
363368
result["max_age"] = self._to_nanoseconds(self.max_age)
369+
result["subject_delete_marker_ttl"] = self._to_nanoseconds(self.subject_delete_marker_ttl)
364370
if self.sources:
365371
result["sources"] = [src.as_dict() for src in self.sources]
366372
if self.compression and (self.compression != StoreCompression.NONE and self.compression != StoreCompression.S2):
@@ -748,9 +754,14 @@ class KeyValueConfig(Base):
748754
republish: Optional[RePublish] = None
749755
direct: Optional[bool] = None
750756

757+
# TTL of limit (delete) markers for KV.
758+
# Introduced in nats-server 2.11.0.
759+
limit_marker_ttl: Optional[float] = None # in seconds
760+
751761
def as_dict(self) -> Dict[str, object]:
752762
result = super().as_dict()
753763
result["ttl"] = self._to_nanoseconds(self.ttl)
764+
result["limit_marker_ttl"] = self._to_nanoseconds(self.limit_marker_ttl)
754765
return result
755766

756767

nats/src/nats/js/client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1416,6 +1416,7 @@ async def create_key_value(
14161416
num_replicas=config.replicas,
14171417
storage=config.storage,
14181418
republish=config.republish,
1419+
subject_delete_marker_ttl=config.limit_marker_ttl,
14191420
)
14201421
si = await self.add_stream(stream)
14211422
assert stream.name is not None

nats/src/nats/js/kv.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import logging
2020
import re
2121
from dataclasses import dataclass
22-
from typing import TYPE_CHECKING, List, Optional
22+
from typing import TYPE_CHECKING, Dict, List, Optional
2323

2424
import nats.errors
2525
import nats.js.errors
@@ -32,6 +32,8 @@
3232
KV_DEL = "DEL"
3333
KV_PURGE = "PURGE"
3434
MSG_ROLLUP_SUBJECT = "sub"
35+
NATS_MARKER_REASON = "Nats-Marker-Reason"
36+
NATS_MARKER_MAX_AGE = "MaxAge"
3537

3638
logger = logging.getLogger(__name__)
3739

@@ -86,6 +88,7 @@ class Entry:
8688
delta: Optional[int]
8789
created: Optional[int]
8890
operation: Optional[str]
91+
headers: Optional[Dict[str, str]] = None
8992

9093
@dataclass(frozen=True)
9194
class BucketStatus:
@@ -472,8 +475,11 @@ async def watch_updates(msg):
472475

473476
meta = msg.metadata
474477
op = None
475-
if msg.header and KV_OP in msg.header:
476-
op = msg.header.get(KV_OP)
478+
if msg.header:
479+
if KV_OP in msg.header:
480+
op = msg.header.get(KV_OP)
481+
elif msg.header.get(NATS_MARKER_REASON) == NATS_MARKER_MAX_AGE:
482+
op = KV_DEL
477483

478484
# keys() uses this
479485
if ignore_deletes:
@@ -491,6 +497,7 @@ async def watch_updates(msg):
491497
delta=meta.num_pending,
492498
created=meta.timestamp,
493499
operation=op,
500+
headers=msg.header,
494501
)
495502
await watcher._updates.put(entry)
496503

nats/tests/test_js.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5160,3 +5160,70 @@ async def test_add_stream_invalid_names(self):
51605160
),
51615161
):
51625162
await js.add_stream(name=name)
5163+
5164+
5165+
class V211FeaturesTest(SingleJetStreamServerTestCase):
5166+
5167+
@async_test
5168+
async def test_subject_delete_marker_ttl(self):
5169+
nc = NATS()
5170+
await nc.connect()
5171+
5172+
js = nc.jetstream()
5173+
stream = await js.add_stream(
5174+
nats.js.api.StreamConfig(subject_delete_marker_ttl=1),
5175+
name="test-subject-delete-marker-ttl",
5176+
subjects=["delete.marker"],
5177+
)
5178+
assert isinstance(stream, nats.js.api.StreamInfo)
5179+
assert isinstance(stream.config, nats.js.api.StreamConfig)
5180+
assert stream.config.subject_delete_marker_ttl == 1
5181+
5182+
messages: list[Msg] = []
5183+
5184+
async def save_msg(msg):
5185+
messages.append(msg)
5186+
5187+
sub = await js.subscribe("delete.marker", cb=save_msg)
5188+
await js.publish("delete.marker", b"test-message", msg_ttl=1)
5189+
5190+
await asyncio.sleep(1.1)
5191+
assert len(messages) == 2
5192+
assert messages[0].data == b"test-message"
5193+
assert messages[0].headers and messages[0].headers["Nats-TTL"] == "1"
5194+
5195+
assert messages[1].data == b""
5196+
m1_headers = messages[1].headers
5197+
assert m1_headers
5198+
assert m1_headers["Nats-Marker-Reason"] == "MaxAge"
5199+
assert m1_headers["Nats-TTL"] == "1s"
5200+
5201+
@async_test
5202+
async def test_create_key_value_ttl(self):
5203+
nc = NATS()
5204+
await nc.connect()
5205+
5206+
js = nc.jetstream()
5207+
5208+
kv = await js.create_key_value(
5209+
nats.js.api.KeyValueConfig("delete_marker_kv", ttl=1, limit_marker_ttl=1)
5210+
)
5211+
5212+
watcher = await kv.watchall()
5213+
await watcher.updates()
5214+
await kv.create("key", b"value", msg_ttl=1)
5215+
5216+
update = await watcher.updates()
5217+
assert update is not None
5218+
assert update.key == "key"
5219+
assert update.revision == 1
5220+
assert update.headers and update.headers["Nats-TTL"] == "1"
5221+
5222+
await asyncio.sleep(1.1)
5223+
5224+
update = await watcher.updates()
5225+
assert update is not None
5226+
assert update.key == "key"
5227+
assert not update.value
5228+
assert update.headers and update.headers["Nats-Marker-Reason"] == "MaxAge"
5229+
assert update.operation == "DEL"

0 commit comments

Comments
 (0)