Skip to content

Commit 88af26b

Browse files
committed
dekaf: Add support for not_before and not_after now that we have MaterializationSpec::Binding from which to pull them
1 parent dc3afe4 commit 88af26b

File tree

3 files changed

+55
-18
lines changed

3 files changed

+55
-18
lines changed

crates/dekaf/src/read.rs

+27-12
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ pub struct Read {
1717
/// Most-recent journal write head observed by this Read.
1818
pub(crate) last_write_head: i64,
1919

20-
key_ptr: Vec<doc::Pointer>, // Pointers to the document key.
21-
key_schema: avro::Schema, // Avro schema when encoding keys.
22-
key_schema_id: u32, // Registry ID of the key's schema.
23-
meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`).
24-
not_before: uuid::Clock, // Not before this clock.
25-
stream: ReadJsonLines, // Underlying document stream.
26-
uuid_ptr: doc::Pointer, // Location of document UUID.
27-
value_schema_id: u32, // Registry ID of the value's schema.
20+
key_ptr: Vec<doc::Pointer>, // Pointers to the document key.
21+
key_schema: avro::Schema, // Avro schema when encoding keys.
22+
key_schema_id: u32, // Registry ID of the key's schema.
23+
meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`).
24+
not_before: Option<uuid::Clock>, // Not before this clock.
25+
not_after: Option<uuid::Clock>, // Not after this clock.
26+
stream: ReadJsonLines, // Underlying document stream.
27+
uuid_ptr: doc::Pointer, // Location of document UUID.
28+
value_schema_id: u32, // Registry ID of the value's schema.
2829
extractors: Vec<(avro::Schema, utils::CustomizableExtractor)>, // Projections to apply
2930

3031
// Keep these details around so we can create a new ReadRequest if we need to skip forward
@@ -69,7 +70,10 @@ impl Read {
6970
rewrite_offsets_from: Option<i64>,
7071
auth: &SessionAuthentication,
7172
) -> anyhow::Result<Self> {
72-
let (not_before_sec, _) = collection.not_before.to_unix();
73+
let (not_before_sec, _) = collection
74+
.not_before
75+
.map(|not_before| not_before.to_unix())
76+
.unwrap_or((0, 0));
7377

7478
let stream = client.clone().read_json_lines(
7579
broker::ReadRequest {
@@ -94,6 +98,7 @@ impl Read {
9498
key_schema_id,
9599
meta_op_ptr: doc::Pointer::from_str("/_meta/op"),
96100
not_before: collection.not_before,
101+
not_after: collection.not_after,
97102
stream,
98103
uuid_ptr: collection.uuid_ptr.clone(),
99104
value_schema_id,
@@ -217,13 +222,23 @@ impl Read {
217222
};
218223
let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?;
219224

220-
if clock < self.not_before {
225+
// Is this a non-content control document, such as a transaction ACK?
226+
let is_control = flags.is_ack();
227+
228+
let should_skip = match (self.not_before, self.not_after) {
229+
(Some(not_before), Some(not_after)) => clock < not_before || clock > not_after,
230+
(Some(not_before), None) => clock < not_before,
231+
(None, Some(not_after)) => clock > not_after,
232+
(None, None) => false,
233+
};
234+
235+
// Only filter non-ack documents to allow the consumer to make and
236+
// record progress scanning through the offset range.
237+
if !is_control && should_skip {
221238
continue;
222239
}
223240
last_source_published_at = Some(clock);
224241

225-
// Is this a non-content control document, such as a transaction ACK?
226-
let is_control = flags.is_ack();
227242
// Is this a deletion?
228243
let is_deletion = matches!(
229244
self.meta_op_ptr.query(root.get()),

crates/dekaf/src/session.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1193,8 +1193,8 @@ impl Session {
11931193
))?
11941194
.committed_offset;
11951195

1196-
metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64);
1197-
tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, committed_offset, "Committed offset");
1196+
metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name.clone()).set(committed_offset as f64);
1197+
tracing::info!(topic_name = decrypted_name.as_str(), journal_name, partitions = ?topic.partitions, committed_offset, "Committed offset");
11981198
}
11991199
}
12001200
}

crates/dekaf/src/topology.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ pub struct Collection {
8888
pub journal_client: journal::Client,
8989
pub key_ptr: Vec<doc::Pointer>,
9090
pub key_schema: avro::Schema,
91-
pub not_before: uuid::Clock,
91+
pub not_before: Option<uuid::Clock>,
92+
pub not_after: Option<uuid::Clock>,
9293
pub partitions: Vec<Partition>,
9394
pub spec: flow::CollectionSpec,
9495
pub uuid_ptr: doc::Pointer,
@@ -132,8 +133,6 @@ impl Collection {
132133
pg_client: &postgrest::Postgrest,
133134
topic_name: &str,
134135
) -> anyhow::Result<Option<Self>> {
135-
let not_before = uuid::Clock::default();
136-
137136
let binding = if let SessionAuthentication::Task(task_auth) = auth {
138137
if let Some((binding, _)) = task_auth.get_binding_for_topic(topic_name) {
139138
Some(binding)
@@ -214,6 +213,25 @@ impl Collection {
214213

215214
let key_schema = avro::key_to_avro(&key_ptr, collection_schema_shape);
216215

216+
let (not_before, not_after) = if let Some(binding) = binding {
217+
(
218+
binding.not_before.map(|b| {
219+
uuid::Clock::from_unix(
220+
b.seconds.try_into().unwrap(),
221+
b.nanos.try_into().unwrap(),
222+
)
223+
}),
224+
binding.not_after.map(|b| {
225+
uuid::Clock::from_unix(
226+
b.seconds.try_into().unwrap(),
227+
b.nanos.try_into().unwrap(),
228+
)
229+
}),
230+
)
231+
} else {
232+
(None, None)
233+
};
234+
217235
tracing::debug!(
218236
collection_name,
219237
partitions = partitions.len(),
@@ -226,6 +244,7 @@ impl Collection {
226244
key_ptr,
227245
key_schema,
228246
not_before,
247+
not_after,
229248
partitions,
230249
spec: collection_spec,
231250
uuid_ptr,
@@ -330,7 +349,10 @@ impl Collection {
330349
}));
331350
}
332351
_ => {
333-
let (not_before_sec, _) = self.not_before.to_unix();
352+
let (not_before_sec, _) = self
353+
.not_before
354+
.map(|not_before| not_before.to_unix())
355+
.unwrap_or((0, 0));
334356

335357
let begin_mod_time = if timestamp_millis == -1 {
336358
i64::MAX // Sentinel for "largest available offset",

0 commit comments

Comments
 (0)