support subject_delete_marker_ttl in StreamConfig and limit_marker_ttl in KeyValueConfig#814
support subject_delete_marker_ttl in StreamConfig and limit_marker_ttl in KeyValueConfig#814adambudziak wants to merge 1 commit intonats-io:mainfrom
Conversation
…l in KeyValueConfig
Review of PR adding subject_delete_marker_ttl in StreamConfig and limit_marker_ttl in KeyValueConfig for nats-server 2.11.0+ support. Key findings: - Magic strings should be extracted to constants - KeyValueConfig.as_dict() should convert limit_marker_ttl to nanoseconds - Core functionality is correct and follows existing patterns https://claude.ai/code/session_01MteA8J2yH3ky5Y6coBpEAf
…w fixes Implements PR nats-io#814 feature (closes nats-io#725) with improvements from code review: - Add subject_delete_marker_ttl field to StreamConfig with nanosecond conversion in from_response/as_dict - Add limit_marker_ttl field to KeyValueConfig, passed through to StreamConfig in create_key_value - Add headers field to KeyValue.Entry for accessing message metadata - Detect TTL-expired markers via Nats-Marker-Reason header in watcher Review improvements applied: - Extract "Nats-Marker-Reason" and "MaxAge" magic strings to module constants (NATS_MARKER_REASON, NATS_MARKER_MAX_AGE) in kv.py - Add limit_marker_ttl nanosecond conversion in KeyValueConfig.as_dict() for consistency with ttl field - Use Dict[str, str] type annotation for Entry.headers instead of bare dict - Add server version comments on KeyValueConfig.limit_marker_ttl https://claude.ai/code/session_01MteA8J2yH3ky5Y6coBpEAf
There was a problem hiding this comment.
Pull request overview
Adds client-side support for JetStream delete-marker TTLs needed for per-key TTL semantics in KeyValue (i.e., ensuring watchers can observe delete/purge-style events when keys expire).
Changes:
- Add
subject_delete_marker_ttltoStreamConfig(including ns↔s conversion in serialization/deserialization). - Add
limit_marker_ttltoKeyValueConfigand wire it into KV stream creation (subject_delete_marker_ttl). - Update KV watch handling to classify TTL-driven delete markers as
DELand surface message headers; add new tests covering both StreamConfig and KV behavior.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| nats/tests/test_js.py | Adds v2.11 feature tests for subject delete-marker TTL and KV per-key TTL delete marker behavior. |
| nats/src/nats/js/kv.py | Enhances watcher parsing for TTL delete markers and exposes message headers on KV entries. |
| nats/src/nats/js/client.py | Wires KeyValueConfig.limit_marker_ttl into KV stream creation via StreamConfig.subject_delete_marker_ttl. |
| nats/src/nats/js/api.py | Adds new config fields and duration conversions for StreamConfig/KeyValueConfig. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| nc = NATS() | ||
| await nc.connect() | ||
|
|
There was a problem hiding this comment.
Both new tests create/connect a NATS client but never close it. Please ensure await nc.close() runs even if assertions fail (e.g., via try/finally) to avoid leaking connections/tasks and causing later tests to flake.
|
|
||
| js = nc.jetstream() | ||
| stream = await js.add_stream( | ||
| nats.js.api.StreamConfig(subject_delete_marker_ttl=1), |
There was a problem hiding this comment.
This test publishes with msg_ttl=..., which requires the stream to have allow_msg_ttl=True (see existing test_publish_msg_ttl). Also, since this feature is 2.11+, add a server version guard/skip like other TTL-related tests to avoid failures on older servers.
| nats.js.api.StreamConfig(subject_delete_marker_ttl=1), | |
| nats.js.api.StreamConfig(subject_delete_marker_ttl=1, allow_msg_ttl=True), |
| await asyncio.sleep(1.1) | ||
| assert len(messages) == 2 |
There was a problem hiding this comment.
Relying on asyncio.sleep(1.1) and then asserting an exact message count is timing-sensitive and can flake under load/slow CI. Prefer waiting until 2 messages are observed (e.g., with an asyncio.Event/Queue and asyncio.wait_for) instead of a fixed sleep.
| async def test_create_key_value_ttl(self): | ||
| nc = NATS() | ||
| await nc.connect() | ||
|
|
||
| js = nc.jetstream() | ||
|
|
||
| kv = await js.create_key_value(nats.js.api.KeyValueConfig("delete_marker_kv", ttl=1, limit_marker_ttl=1)) | ||
|
|
||
| watcher = await kv.watchall() | ||
| await watcher.updates() |
There was a problem hiding this comment.
This test creates a watcher (and a NATS connection) but never stops the watcher or closes the connection. Please call await watcher.stop() (or otherwise unsubscribe) and await nc.close() in a finally block to prevent background tasks from leaking into subsequent tests.
| value: Optional[bytes] | ||
| revision: Optional[int] | ||
| delta: Optional[int] | ||
| created: Optional[int] | ||
| operation: Optional[str] | ||
| headers: Optional[dict] = None | ||
|
|
There was a problem hiding this comment.
KeyValue.Entry.created is annotated as Optional[int], but the code assigns meta.timestamp (a datetime.datetime) and later treats it like a datetime (e.g., subtraction in purge_deletes). Since this hunk modifies Entry, update the type annotation to match actual usage; similarly, headers can be typed as Optional[Dict[str, str]] for consistency with Msg.headers.
| if msg.header: | ||
| if KV_OP in msg.header: | ||
| op = msg.header.get(KV_OP) | ||
| elif msg.header.get("Nats-Marker-Reason") == "MaxAge": # deleted by TTL | ||
| op = KV_DEL | ||
|
|
There was a problem hiding this comment.
Watchers now translate TTL delete markers (Nats-Marker-Reason: MaxAge) into a KV DEL operation, but KeyValue.get()/_get() still only treats messages with KV-Operation headers as deleted/purged. To keep behavior consistent (and to avoid returning an empty value for an expired key), update the get-path deletion detection to also recognize TTL delete markers.
| async def save_msg(msg): | ||
| messages.append(msg) | ||
|
|
||
| sub = await js.subscribe("delete.marker", cb=save_msg) |
There was a problem hiding this comment.
Variable sub is not used.
| sub = await js.subscribe("delete.marker", cb=save_msg) | |
| await js.subscribe("delete.marker", cb=save_msg) |
closes
#725
I'm quite new in the repo so likely the change is not quite good enough to be merged.