Skip to content

Commit d027e76

Browse files
committed
dekaf: Add support for not_before and not_after now that we have MaterializationSpec::Binding from which to pull them
1 parent 5575340 commit d027e76

File tree

2 files changed

+54
-16
lines changed

2 files changed

+54
-16
lines changed

crates/dekaf/src/read.rs

+28-12
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,15 @@ pub struct Read {
1717
/// Most-recent journal write head observed by this Read.
1818
pub(crate) last_write_head: i64,
1919

20-
key_ptr: Vec<doc::Pointer>, // Pointers to the document key.
21-
key_schema: avro::Schema, // Avro schema when encoding keys.
22-
key_schema_id: u32, // Registry ID of the key's schema.
23-
meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`).
24-
not_before: uuid::Clock, // Not before this clock.
25-
stream: ReadJsonLines, // Underlying document stream.
26-
uuid_ptr: doc::Pointer, // Location of document UUID.
27-
value_schema_id: u32, // Registry ID of the value's schema.
20+
key_ptr: Vec<doc::Pointer>, // Pointers to the document key.
21+
key_schema: avro::Schema, // Avro schema when encoding keys.
22+
key_schema_id: u32, // Registry ID of the key's schema.
23+
meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`).
24+
not_before: Option<uuid::Clock>, // Not before this clock.
25+
not_after: Option<uuid::Clock>, // Not after this clock.
26+
stream: ReadJsonLines, // Underlying document stream.
27+
uuid_ptr: doc::Pointer, // Location of document UUID.
28+
value_schema_id: u32, // Registry ID of the value's schema.
2829
extractors: Vec<(avro::Schema, utils::CustomizableExtractor)>, // Projections to apply
2930

3031
// Keep these details around so we can create a new ReadRequest if we need to skip forward
@@ -69,7 +70,10 @@ impl Read {
6970
rewrite_offsets_from: Option<i64>,
7071
auth: &SessionAuthentication,
7172
) -> anyhow::Result<Self> {
72-
let (not_before_sec, _) = collection.not_before.to_unix();
73+
let (not_before_sec, _) = collection
74+
.not_before
75+
.map(|not_before| not_before.to_unix())
76+
.unwrap_or((0, 0));
7377

7478
let stream = client.clone().read_json_lines(
7579
broker::ReadRequest {
@@ -94,6 +98,7 @@ impl Read {
9498
key_schema_id,
9599
meta_op_ptr: doc::Pointer::from_str("/_meta/op"),
96100
not_before: collection.not_before,
101+
not_after: collection.not_after,
97102
stream,
98103
uuid_ptr: collection.uuid_ptr.clone(),
99104
value_schema_id,
@@ -217,13 +222,24 @@ impl Read {
217222
};
218223
let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?;
219224

220-
if clock < self.not_before {
225+
// Is this a non-content control document, such as a transaction ACK?
226+
let is_control = flags.is_ack();
227+
228+
let should_skip = if let Some(not_before) = self.not_before {
229+
clock < not_before
230+
} else if let Some(not_after) = self.not_after {
231+
clock > not_after
232+
} else {
233+
false
234+
};
235+
236+
// Only filter non-ack documents to allow the consumer to make and
237+
// record progress scanning through the offset range.
238+
if !is_control && should_skip {
221239
continue;
222240
}
223241
last_source_published_at = Some(clock);
224242

225-
// Is this a non-content control document, such as a transaction ACK?
226-
let is_control = flags.is_ack();
227243
// Is this a deletion?
228244
let is_deletion = matches!(
229245
self.meta_op_ptr.query(root.get()),

crates/dekaf/src/topology.rs

+26-4
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,8 @@ pub struct Collection {
8888
pub journal_client: journal::Client,
8989
pub key_ptr: Vec<doc::Pointer>,
9090
pub key_schema: avro::Schema,
91-
pub not_before: uuid::Clock,
91+
pub not_before: Option<uuid::Clock>,
92+
pub not_after: Option<uuid::Clock>,
9293
pub partitions: Vec<Partition>,
9394
pub spec: flow::CollectionSpec,
9495
pub uuid_ptr: doc::Pointer,
@@ -132,8 +133,6 @@ impl Collection {
132133
pg_client: &postgrest::Postgrest,
133134
topic_name: &str,
134135
) -> anyhow::Result<Option<Self>> {
135-
let not_before = uuid::Clock::default();
136-
137136
let binding = if let SessionAuthentication::Task(task_auth) = auth {
138137
if let Some((binding, _)) = task_auth.get_binding_for_topic(topic_name) {
139138
Some(binding)
@@ -214,6 +213,25 @@ impl Collection {
214213

215214
let key_schema = avro::key_to_avro(&key_ptr, collection_schema_shape);
216215

216+
let (not_before, not_after) = if let Some(binding) = binding {
217+
(
218+
binding.not_before.map(|b| {
219+
uuid::Clock::from_unix(
220+
b.seconds.try_into().unwrap(),
221+
b.nanos.try_into().unwrap(),
222+
)
223+
}),
224+
binding.not_after.map(|b| {
225+
uuid::Clock::from_unix(
226+
b.seconds.try_into().unwrap(),
227+
b.nanos.try_into().unwrap(),
228+
)
229+
}),
230+
)
231+
} else {
232+
(None, None)
233+
};
234+
217235
tracing::debug!(
218236
collection_name,
219237
partitions = partitions.len(),
@@ -226,6 +244,7 @@ impl Collection {
226244
key_ptr,
227245
key_schema,
228246
not_before,
247+
not_after,
229248
partitions,
230249
spec: collection_spec,
231250
uuid_ptr,
@@ -330,7 +349,10 @@ impl Collection {
330349
}));
331350
}
332351
_ => {
333-
let (not_before_sec, _) = self.not_before.to_unix();
352+
let (not_before_sec, _) = self
353+
.not_before
354+
.map(|not_before| not_before.to_unix())
355+
.unwrap_or((0, 0));
334356

335357
let begin_mod_time = if timestamp_millis == -1 {
336358
i64::MAX // Sentinel for "largest available offset",

0 commit comments

Comments
 (0)