Skip to content

Commit a44cbc0

Browse files
committed
Add per-stream retention metadata and create surface
• [MSN] 1 mission driving 1 epic forward • [EXC] 2 stories queued, none actively executing • [HLT] 1 warnings, no structural errors detected
1 parent 84d7769 commit a44cbc0

14 files changed

Lines changed: 301 additions & 23 deletions

File tree

.keel/epics/VHUAlZWZG/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ mission: VHUAhwLlS
2222
## Voyages
2323

2424
<!-- BEGIN GENERATED -->
25-
**Progress:** 0/1 voyages complete, 0/3 stories done
25+
**Progress:** 0/1 voyages complete, 1/3 stories done
2626
| Voyage | Status | Stories |
2727
|--------|--------|---------|
28-
| [Deliver Per-Stream Retention Configuration And Visibility](voyages/VHUApus0L/) | in-progress | 0/3 |
28+
| [Deliver Per-Stream Retention Configuration And Visibility](voyages/VHUApus0L/) | in-progress | 1/3 |
2929
<!-- END GENERATED -->

.keel/epics/VHUAlZWZG/voyages/VHUApus0L/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ started_at: 2026-04-21T20:15:22
2727
## Stories
2828

2929
<!-- BEGIN GENERATED -->
30-
**Progress:** 0/3 stories complete
30+
**Progress:** 1/3 stories complete
3131

3232
| Title | Type | Status |
3333
|-------|------|--------|
34-
| [Add Per-Stream Retention Metadata And Create-Time Surface](../../../../stories/VHUAuqNpn/README.md) | feat | in-progress |
34+
| [Add Per-Stream Retention Metadata And Create-Time Surface](../../../../stories/VHUAuqNpn/README.md) | feat | done |
3535
| [Enforce Retention And Surface Retained Frontier Status](../../../../stories/VHUAuquph/README.md) | feat | backlog |
3636
| [Publish Retention Proof Coverage And Operator Guidance](../../../../stories/VHUAurAqP/README.md) | feat | backlog |
3737
<!-- END GENERATED -->

.keel/stories/VHUAuqNpn/README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
---
22
# system-managed
33
id: VHUAuqNpn
4-
status: in-progress
4+
status: done
55
created_at: 2026-04-21T20:10:53
6-
updated_at: 2026-04-21T20:15:22
6+
updated_at: 2026-04-21T20:26:47
77
# authored
88
title: Add Per-Stream Retention Metadata And Create-Time Surface
99
type: feat
1010
operator-signal:
1111
scope: VHUAlZWZG/VHUApus0L
1212
index: 1
1313
started_at: 2026-04-21T20:15:22
14+
submitted_at: 2026-04-21T20:26:46
15+
completed_at: 2026-04-21T20:26:47
1416
---
1517

1618
# Add Per-Stream Retention Metadata And Create-Time Surface
@@ -21,7 +23,7 @@ Add the explicit per-stream retention policy model, thread it through stream cre
2123

2224
## Acceptance Criteria
2325

24-
- [ ] [SRS-01/AC-01] Stream metadata can represent `retention = none` plus optional `max_age_days` and `max_bytes` without changing existing unconfigured streams. <!-- [SRS-01/AC-01] verify: manual, SRS-01:start, SRS-01:end, proof: ac-1.log -->
25-
- [ ] [SRS-02/AC-01] Stream creation surfaces accept `--retention-max-age-days` and `--retention-max-bytes` so retention is configured explicitly per stream. <!-- [SRS-02/AC-01] verify: manual, SRS-02:start, SRS-02:end, proof: ac-2.log -->
26-
- [ ] [SRS-03/AC-01] `transit streams list` shows `retention_age` and `retention_bytes` in human and JSON output for each stream. <!-- [SRS-03/AC-01] verify: manual, SRS-03:start, SRS-03:end, proof: ac-3.log -->
27-
- [ ] [SRS-NFR-01/AC-01] Streams without an explicit retention policy continue to behave as `retention = none`; no implicit `30 day` default is introduced. <!-- [SRS-NFR-01/AC-01] verify: manual, SRS-NFR-01:start, SRS-NFR-01:end, proof: ac-4.log -->
26+
- [x] [SRS-01/AC-01] Stream metadata can represent `retention = none` plus optional `max_age_days` and `max_bytes` without changing existing unconfigured streams. <!-- [SRS-01/AC-01] verify: manual, SRS-01:start, SRS-01:end, proof: ac-1.log -->
27+
- [x] [SRS-02/AC-01] Stream creation surfaces accept `--retention-max-age-days` and `--retention-max-bytes` so retention is configured explicitly per stream. <!-- [SRS-02/AC-01] verify: manual, SRS-02:start, SRS-02:end, proof: ac-2.log -->
28+
- [x] [SRS-03/AC-01] `transit streams list` shows `retention_age` and `retention_bytes` in human and JSON output for each stream. <!-- [SRS-03/AC-01] verify: manual, SRS-03:start, SRS-03:end, proof: ac-3.log -->
29+
- [x] [SRS-NFR-01/AC-01] Streams without an explicit retention policy continue to behave as `retention = none`; no implicit `30 day` default is introduced. <!-- [SRS-NFR-01/AC-01] verify: manual, SRS-NFR-01:start, SRS-NFR-01:end, proof: ac-4.log -->

.keel/stories/VHUAuqNpn/ac-1.log

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Added `StreamRetentionPolicy` to the shared kernel and threaded optional retention metadata through
2+
`StreamDescriptor`, `LocalStreamStatus`, and `RemoteStreamStatus` with `None` as the default.
3+
4+
Focused proof:
5+
- `cargo test -p transit-core root_descriptor_preserves_retention_policy`
6+
- `cargo test -p transit-core list_streams_returns_sorted_descriptors`
7+
8+
Observed result:
9+
- configured streams preserve `max_age_days` and `max_bytes`
10+
- unconfigured streams continue to report no retention policy

.keel/stories/VHUAuqNpn/ac-2.log

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Extended the explicit stream creation surfaces to accept retention policy inputs:
2+
- CLI flags: `--retention-max-age-days`, `--retention-max-bytes`
3+
- Hosted client/server request surface: `create_root_with_retention(...)`
4+
5+
Focused proof:
6+
- `cargo test -p transit-core remote_stream_listing_reports_descriptors_and_status`
7+
- `cargo test -p transit-cli --bin transit streams_produce_and_consume_cover_the_kcat_style_remote_flow`
8+
9+
Observed result:
10+
- retention policy is accepted at stream creation time and propagated through hosted creation flows

.keel/stories/VHUAuqNpn/ac-3.log

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
Updated `transit streams list` to surface retention metadata in both table and JSON output via
2+
`retention_age` and `retention_bytes`.
3+
4+
Focused proof:
5+
- `cargo test -p transit-core remote_stream_listing_reports_descriptors_and_status`
6+
- `cargo test -p transit-cli --bin transit streams_produce_and_consume_cover_the_kcat_style_remote_flow`
7+
8+
Observed result:
9+
- listed streams include retention age/bytes when configured
10+
- unconfigured streams render `none` in human output and `null` in JSON output

.keel/stories/VHUAuqNpn/ac-4.log

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
Preserved the default retention semantics as `none`.
2+
3+
Implementation guardrails:
4+
- `StreamDescriptor::root/branch/merge` default retention policy to `None`
5+
- retention is only set when an explicit policy is parsed or passed
6+
- `parse_retention_policy(...)` returns `None` when both flags are omitted
7+
8+
Focused proof:
9+
- `cargo test -p transit-core root_descriptor_preserves_retention_policy`
10+
- `cargo test -p transit-cli --bin transit streams_produce_and_consume_cover_the_kcat_style_remote_flow`
11+
12+
Observed result:
13+
- no implicit `30 day` retention default is introduced anywhere in the creation or listing path
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
id: VHUAuqNpn
2+
git_sha: 84d7769fc1b5907aeb223f73f7bea7f6e658a787
3+
evidence: {}

crates/transit-cli/src/main.rs

Lines changed: 86 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use transit_core::engine::{
1616
};
1717
use transit_core::kernel::{
1818
LineageMetadata, MergePolicy, MergePolicyKind, MergeSpec, Offset, StreamDescriptor, StreamId,
19-
StreamLineage, StreamPosition,
19+
StreamLineage, StreamPosition, StreamRetentionPolicy,
2020
};
2121
use transit_core::membership::NodeId;
2222
use transit_core::object_store_support::{
@@ -166,6 +166,12 @@ struct StreamsCreateArgs {
166166
/// Optional lineage labels in key=value form.
167167
#[arg(long = "label")]
168168
labels: Vec<String>,
169+
/// Optional retention maximum age in days.
170+
#[arg(long = "retention-max-age-days")]
171+
retention_max_age_days: Option<u64>,
172+
/// Optional retention maximum retained bytes.
173+
#[arg(long = "retention-max-bytes")]
174+
retention_max_bytes: Option<u64>,
169175
/// Render create output as JSON.
170176
#[arg(long)]
171177
json: bool,
@@ -421,6 +427,10 @@ struct ServerCreateRootArgs {
421427
reason: Option<String>,
422428
#[arg(long = "label")]
423429
labels: Vec<String>,
430+
#[arg(long = "retention-max-age-days")]
431+
retention_max_age_days: Option<u64>,
432+
#[arg(long = "retention-max-bytes")]
433+
retention_max_bytes: Option<u64>,
424434
#[arg(long)]
425435
json: bool,
426436
}
@@ -1044,12 +1054,15 @@ fn summarize_remote_stream_summary(
10441054
) -> RemoteStreamSummaryResult {
10451055
let (lineage_kind, parents, merge_base) = summarize_descriptor_lineage(summary.descriptor());
10461056
let next_offset = summary.status().next_offset().value();
1057+
let retention_policy = summary.descriptor().retention_policy();
10471058

10481059
RemoteStreamSummaryResult {
10491060
stream_id: summary.descriptor().stream_id.as_str().to_owned(),
10501061
lineage_kind,
10511062
parents,
10521063
merge_base,
1064+
retention_age: retention_policy.and_then(StreamRetentionPolicy::max_age_days),
1065+
retention_bytes: retention_policy.and_then(StreamRetentionPolicy::max_bytes),
10531066
record_count: next_offset,
10541067
head_offset: next_offset.checked_sub(1),
10551068
active_record_count: summary.status().active_record_count(),
@@ -1745,6 +1758,8 @@ struct RemoteStreamSummaryResult {
17451758
lineage_kind: String,
17461759
parents: Vec<String>,
17471760
merge_base: Option<String>,
1761+
retention_age: Option<u64>,
1762+
retention_bytes: Option<u64>,
17481763
record_count: u64,
17491764
head_offset: Option<u64>,
17501765
active_record_count: u64,
@@ -1789,6 +1804,8 @@ struct RemoteStreamStatusResult {
17891804
durability: String,
17901805
topology: String,
17911806
stream_id: String,
1807+
retention_age: Option<u64>,
1808+
retention_bytes: Option<u64>,
17921809
next_offset: u64,
17931810
active_record_count: u64,
17941811
active_segment_start_offset: u64,
@@ -3315,10 +3332,13 @@ fn run_streams_create(
33153332
) -> Result<RemoteStreamStatusResult> {
33163333
let client = build_remote_client(server_addr, args.connection_io_timeout_ms);
33173334
let stream_id = parse_stream_id_arg(&args.stream_id)?;
3335+
let retention_policy =
3336+
parse_retention_policy(args.retention_max_age_days, args.retention_max_bytes)?;
33183337
let created = client
3319-
.create_root(
3338+
.create_root_with_retention(
33203339
&stream_id,
33213340
parse_lineage_metadata(args.actor, args.reason, args.labels)?,
3341+
retention_policy,
33223342
)
33233343
.with_context(|| format!("create remote root {}", stream_id.as_str()))?;
33243344

@@ -3411,10 +3431,13 @@ fn run_consume(server_addr: SocketAddr, args: ConsumeArgs) -> Result<ConsumeResu
34113431
fn run_remote_create_root(args: ServerCreateRootArgs) -> Result<RemoteStreamStatusResult> {
34123432
let client = RemoteClient::new(args.server_addr);
34133433
let stream_id = parse_stream_id_arg(&args.stream_id)?;
3434+
let retention_policy =
3435+
parse_retention_policy(args.retention_max_age_days, args.retention_max_bytes)?;
34143436
let created = client
3415-
.create_root(
3437+
.create_root_with_retention(
34163438
&stream_id,
34173439
parse_lineage_metadata(args.actor, args.reason, args.labels)?,
3440+
retention_policy,
34183441
)
34193442
.with_context(|| format!("create remote root {}", stream_id.as_str()))?;
34203443

@@ -3954,6 +3977,8 @@ fn summarize_remote_stream_status(
39543977
durability: response.ack().durability().to_owned(),
39553978
topology: render_topology(response.ack().topology()),
39563979
stream_id: response.body().stream_id().as_str().to_owned(),
3980+
retention_age: response.body().retention_max_age_days(),
3981+
retention_bytes: response.body().retention_max_bytes(),
39573982
next_offset: response.body().next_offset().value(),
39583983
active_record_count: response.body().active_record_count(),
39593984
active_segment_start_offset: response.body().active_segment_start_offset().value(),
@@ -4304,6 +4329,8 @@ fn run_integrity_server_parity(root: &Path) -> Result<IntegrityProofServerParity
43044329
actor: Some("mission".into()),
43054330
reason: Some("integrity-server-proof".into()),
43064331
labels: vec!["kind=integrity-root".into()],
4332+
retention_max_age_days: None,
4333+
retention_max_bytes: None,
43074334
json: false,
43084335
})?;
43094336
run_remote_append(ServerAppendArgs {
@@ -5807,6 +5834,20 @@ fn render_remote_stream_status(result: RemoteStreamStatusResult, json: bool) ->
58075834
println!("durability: {}", result.durability);
58085835
println!("topology: {}", result.topology);
58095836
println!("stream: {}", result.stream_id);
5837+
println!(
5838+
"retention age: {}",
5839+
result
5840+
.retention_age
5841+
.map(|retention_age| format!("{retention_age}d"))
5842+
.unwrap_or_else(|| "none".to_owned())
5843+
);
5844+
println!(
5845+
"retention bytes: {}",
5846+
result
5847+
.retention_bytes
5848+
.map(|retention_bytes| retention_bytes.to_string())
5849+
.unwrap_or_else(|| "none".to_owned())
5850+
);
58105851
println!("next offset: {}", result.next_offset);
58115852
println!("active records: {}", result.active_record_count);
58125853
println!(
@@ -5829,6 +5870,8 @@ fn render_streams_list(result: RemoteStreamListResult, json: bool) -> Result<()>
58295870
"lineage".to_owned(),
58305871
"parents".to_owned(),
58315872
"merge_base".to_owned(),
5873+
"retention_age".to_owned(),
5874+
"retention_bytes".to_owned(),
58325875
"records".to_owned(),
58335876
"head_offset".to_owned(),
58345877
"active_records".to_owned(),
@@ -5849,6 +5892,14 @@ fn render_streams_list(result: RemoteStreamListResult, json: bool) -> Result<()>
58495892
stream.parents.join(", ")
58505893
},
58515894
stream.merge_base.unwrap_or_else(|| "-".to_owned()),
5895+
stream
5896+
.retention_age
5897+
.map(|retention_age| format!("{retention_age}d"))
5898+
.unwrap_or_else(|| "none".to_owned()),
5899+
stream
5900+
.retention_bytes
5901+
.map(|retention_bytes| retention_bytes.to_string())
5902+
.unwrap_or_else(|| "none".to_owned()),
58525903
stream.record_count.to_string(),
58535904
stream
58545905
.head_offset
@@ -5881,6 +5932,19 @@ fn render_streams_list(result: RemoteStreamListResult, json: bool) -> Result<()>
58815932
Ok(())
58825933
}
58835934

5935+
fn parse_retention_policy(
5936+
retention_max_age_days: Option<u64>,
5937+
retention_max_bytes: Option<u64>,
5938+
) -> Result<Option<StreamRetentionPolicy>> {
5939+
match (retention_max_age_days, retention_max_bytes) {
5940+
(None, None) => Ok(None),
5941+
(retention_max_age_days, retention_max_bytes) => Ok(Some(StreamRetentionPolicy::new(
5942+
retention_max_age_days,
5943+
retention_max_bytes,
5944+
)?)),
5945+
}
5946+
}
5947+
58845948
fn render_ascii_table_row(cells: &[String], widths: &[usize]) -> String {
58855949
let padded = cells
58865950
.iter()
@@ -6368,16 +6432,25 @@ mod tests {
63686432
actor: Some("cli".into()),
63696433
reason: Some("create".into()),
63706434
labels: vec![],
6435+
retention_max_age_days: Some(30),
6436+
retention_max_bytes: Some(10_737_418_240),
63716437
json: true,
63726438
},
63736439
)
63746440
.expect("create stream");
63756441
assert_eq!(created.stream_id, "task.root");
6442+
assert_eq!(created.retention_age, Some(30));
6443+
assert_eq!(created.retention_bytes, Some(10_737_418_240));
63766444

63776445
let listed_after = run_streams_list(server_addr, None).expect("list streams");
63786446
assert_eq!(listed_after.stream_count, 1);
63796447
assert_eq!(listed_after.streams[0].stream_id, "task.root");
63806448
assert_eq!(listed_after.streams[0].lineage_kind, "root");
6449+
assert_eq!(listed_after.streams[0].retention_age, Some(30));
6450+
assert_eq!(
6451+
listed_after.streams[0].retention_bytes,
6452+
Some(10_737_418_240)
6453+
);
63816454

63826455
let produced = run_produce(
63836456
server_addr,
@@ -6437,6 +6510,8 @@ mod tests {
64376510
actor: Some("test".into()),
64386511
reason: Some("cli".into()),
64396512
labels: vec!["kind=root".into()],
6513+
retention_max_age_days: None,
6514+
retention_max_bytes: None,
64406515
json: true,
64416516
})
64426517
.expect("create root stream");
@@ -6544,6 +6619,8 @@ mod tests {
65446619
actor: Some("classifier".into()),
65456620
reason: Some("bootstrap".into()),
65466621
labels: vec!["kind=root".into()],
6622+
retention_max_age_days: None,
6623+
retention_max_bytes: None,
65476624
json: true,
65486625
})
65496626
.expect("create root stream");
@@ -6596,6 +6673,8 @@ mod tests {
65966673
actor: Some("test".into()),
65976674
reason: Some("batch".into()),
65986675
labels: vec!["kind=root".into()],
6676+
retention_max_age_days: None,
6677+
retention_max_bytes: None,
65996678
json: true,
66006679
})
66016680
.expect("create root stream");
@@ -6639,6 +6718,8 @@ mod tests {
66396718
actor: Some("proof".into()),
66406719
reason: Some("batch".into()),
66416720
labels: vec![],
6721+
retention_max_age_days: None,
6722+
retention_max_bytes: None,
66426723
json: true,
66436724
})
66446725
.expect("create root stream");
@@ -6689,6 +6770,8 @@ mod tests {
66896770
actor: Some("proof".into()),
66906771
reason: Some("bootstrap".into()),
66916772
labels: vec![],
6773+
retention_max_age_days: None,
6774+
retention_max_bytes: None,
66926775
json: true,
66936776
})
66946777
.expect("create root stream");

crates/transit-client/src/client.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ use std::time::Duration;
55
use crate::projection::{
66
ProjectionReadConsumer, ProjectionReadOutcome, ProjectionReadRequest, projection_revision_for,
77
};
8-
use transit_core::kernel::{LineageMetadata, MergeSpec, Offset, StreamId, StreamPosition};
8+
use transit_core::kernel::{
9+
LineageMetadata, MergeSpec, Offset, StreamId, StreamPosition, StreamRetentionPolicy,
10+
};
911
use transit_core::server::{
1012
RemoteAcknowledged, RemoteAppendOutcome, RemoteBatchAppendOutcome, RemoteClient,
1113
RemoteClientError, RemoteDeletedStreamOutcome, RemoteLineageOutcome, RemoteReadOutcome,
@@ -45,6 +47,16 @@ impl TransitClient {
4547
self.inner.create_root(stream_id, metadata)
4648
}
4749

50+
pub fn create_root_with_retention(
51+
&self,
52+
stream_id: &StreamId,
53+
metadata: LineageMetadata,
54+
retention_policy: Option<StreamRetentionPolicy>,
55+
) -> ClientResult<RemoteAcknowledged<RemoteStreamStatus>> {
56+
self.inner
57+
.create_root_with_retention(stream_id, metadata, retention_policy)
58+
}
59+
4860
pub fn list_streams(&self) -> ClientResult<RemoteAcknowledged<RemoteStreamListOutcome>> {
4961
self.inner.list_streams()
5062
}

0 commit comments

Comments
 (0)