diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index d665006a63..35c6d44077 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -17,14 +17,15 @@ pub struct Read { /// Most-recent journal write head observed by this Read. pub(crate) last_write_head: i64, - key_ptr: Vec, // Pointers to the document key. - key_schema: avro::Schema, // Avro schema when encoding keys. - key_schema_id: u32, // Registry ID of the key's schema. - meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`). - not_before: uuid::Clock, // Not before this clock. - stream: ReadJsonLines, // Underlying document stream. - uuid_ptr: doc::Pointer, // Location of document UUID. - value_schema_id: u32, // Registry ID of the value's schema. + key_ptr: Vec, // Pointers to the document key. + key_schema: avro::Schema, // Avro schema when encoding keys. + key_schema_id: u32, // Registry ID of the key's schema. + meta_op_ptr: doc::Pointer, // Location of document op (currently always `/_meta/op`). + not_before: Option, // Not before this clock. + not_after: Option, // Not after this clock. + stream: ReadJsonLines, // Underlying document stream. + uuid_ptr: doc::Pointer, // Location of document UUID. + value_schema_id: u32, // Registry ID of the value's schema. extractors: Vec<(avro::Schema, utils::CustomizableExtractor)>, // Projections to apply // Keep these details around so we can create a new ReadRequest if we need to skip forward @@ -69,7 +70,10 @@ impl Read { rewrite_offsets_from: Option, auth: &SessionAuthentication, ) -> anyhow::Result { - let (not_before_sec, _) = collection.not_before.to_unix(); + let (not_before_sec, _) = collection + .not_before + .map(|not_before| not_before.to_unix()) + .unwrap_or((0, 0)); let stream = client.clone().read_json_lines( broker::ReadRequest { @@ -94,6 +98,7 @@ impl Read { key_schema_id, meta_op_ptr: doc::Pointer::from_str("/_meta/op"), not_before: collection.not_before, + not_after: collection.not_after, stream, uuid_ptr: collection.uuid_ptr.clone(), value_schema_id, @@ -217,13 +222,23 @@ impl Read { }; let (producer, clock, flags) = gazette::uuid::parse_str(uuid.as_str())?; - if clock < self.not_before { + // Is this a non-content control document, such as a transaction ACK? + let is_control = flags.is_ack(); + + let should_skip = match (self.not_before, self.not_after) { + (Some(not_before), Some(not_after)) => clock < not_before || clock > not_after, + (Some(not_before), None) => clock < not_before, + (None, Some(not_after)) => clock > not_after, + (None, None) => false, + }; + + // Only filter non-ack documents to allow the consumer to make and + // record progress scanning through the offset range. + if !is_control && should_skip { continue; } last_source_published_at = Some(clock); - // Is this a non-content control document, such as a transaction ACK? - let is_control = flags.is_ack(); // Is this a deletion? let is_deletion = matches!( self.meta_op_ptr.query(root.get()), diff --git a/crates/dekaf/src/session.rs b/crates/dekaf/src/session.rs index 3eb5df41a8..ee679d30c9 100644 --- a/crates/dekaf/src/session.rs +++ b/crates/dekaf/src/session.rs @@ -1193,8 +1193,8 @@ impl Session { ))? .committed_offset; - metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64); - tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, committed_offset, "Committed offset"); + metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name.clone()).set(committed_offset as f64); + tracing::info!(topic_name = decrypted_name.as_str(), journal_name, partitions = ?topic.partitions, committed_offset, "Committed offset"); } } } diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs index bad4b8423f..297b9267a9 100644 --- a/crates/dekaf/src/topology.rs +++ b/crates/dekaf/src/topology.rs @@ -88,7 +88,8 @@ pub struct Collection { pub journal_client: journal::Client, pub key_ptr: Vec, pub key_schema: avro::Schema, - pub not_before: uuid::Clock, + pub not_before: Option, + pub not_after: Option, pub partitions: Vec, pub spec: flow::CollectionSpec, pub uuid_ptr: doc::Pointer, @@ -132,8 +133,6 @@ impl Collection { pg_client: &postgrest::Postgrest, topic_name: &str, ) -> anyhow::Result> { - let not_before = uuid::Clock::default(); - let binding = if let SessionAuthentication::Task(task_auth) = auth { if let Some((binding, _)) = task_auth.get_binding_for_topic(topic_name) { Some(binding) @@ -174,7 +173,14 @@ impl Collection { let journal_client = Self::build_journal_client(app, &auth, collection_name, &partition_template_name) .await?; - let partitions = Self::fetch_partitions(&journal_client, collection_name).await?; + + let selector = if let Some(binding) = binding { + binding.partition_selector.clone() + } else { + None + }; + + let partitions = Self::fetch_partitions(&journal_client, collection_name, selector).await?; tracing::debug!(?partitions, "Got partitions"); @@ -214,6 +220,25 @@ impl Collection { let key_schema = avro::key_to_avro(&key_ptr, collection_schema_shape); + let (not_before, not_after) = if let Some(binding) = binding { + ( + binding.not_before.map(|b| { + uuid::Clock::from_unix( + b.seconds.try_into().unwrap(), + b.nanos.try_into().unwrap(), + ) + }), + binding.not_after.map(|b| { + uuid::Clock::from_unix( + b.seconds.try_into().unwrap(), + b.nanos.try_into().unwrap(), + ) + }), + ) + } else { + (None, None) + }; + tracing::debug!( collection_name, partitions = partitions.len(), @@ -226,6 +251,7 @@ impl Collection { key_ptr, key_schema, not_before, + not_after, partitions, spec: collection_spec, uuid_ptr, @@ -280,12 +306,13 @@ impl Collection { async fn fetch_partitions( journal_client: &journal::Client, collection: &str, + partition_selector: Option, ) -> anyhow::Result> { let request = broker::ListRequest { - selector: Some(broker::LabelSelector { + selector: Some(partition_selector.unwrap_or(broker::LabelSelector { include: Some(labels::build_set([(labels::COLLECTION, collection)])), exclude: None, - }), + })), ..Default::default() }; @@ -330,7 +357,10 @@ impl Collection { })); } _ => { - let (not_before_sec, _) = self.not_before.to_unix(); + let (not_before_sec, _) = self + .not_before + .map(|not_before| not_before.to_unix()) + .unwrap_or((0, 0)); let begin_mod_time = if timestamp_millis == -1 { i64::MAX // Sentinel for "largest available offset",