Skip to content

Commit dc3afe4

Browse files
committed
fixup! dekaf: Fix creation of upstream topics for partitioned journals
1 parent 7ac784d commit dc3afe4

File tree

2 files changed

+34
-43
lines changed

2 files changed

+34
-43
lines changed

Diff for: crates/dekaf/src/api_client.rs

+7-8
Original file line numberDiff line numberDiff line change
@@ -454,11 +454,11 @@ impl KafkaApiClient {
454454
#[instrument(skip_all)]
455455
pub async fn ensure_topics(
456456
&mut self,
457-
topic_names: Vec<(messages::TopicName, usize)>,
457+
topics: Vec<(messages::TopicName, usize)>,
458458
) -> anyhow::Result<()> {
459459
let req = messages::MetadataRequest::default()
460460
.with_topics(Some(
461-
topic_names
461+
topics
462462
.iter()
463463
.map(|(name, _)| {
464464
messages::metadata_request::MetadataRequestTopic::default()
@@ -475,7 +475,7 @@ impl KafkaApiClient {
475475
let mut topics_to_update = Vec::new();
476476
let mut topics_to_create = Vec::new();
477477

478-
for (topic_name, desired_partitions) in topic_names.iter() {
478+
for (topic_name, desired_partitions) in topics.iter() {
479479
if let Some(topic) = resp
480480
.topics
481481
.iter()
@@ -491,11 +491,10 @@ impl KafkaApiClient {
491491
);
492492
topics_to_update.push((topic_name.clone(), *desired_partitions));
493493
} else if *desired_partitions < current_partitions {
494-
tracing::warn!(
495-
topic = ?topic_name,
496-
current_partitions = topic.partitions.len(),
497-
desired_partitions = *desired_partitions,
498-
"Topic has more partitions than requested, cannot decrease partition count"
494+
anyhow::bail!("Topic {} has more partitions ({}) than requested ({}), cannot decrease partition count",
495+
topic_name.as_str(),
496+
current_partitions,
497+
desired_partitions
499498
);
500499
}
501500
} else {

Diff for: crates/dekaf/src/session.rs

+27-35
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use kafka_protocol::{
1919
},
2020
protocol::{buf::ByteBuf, Decodable, Encodable, Message, StrBytes},
2121
};
22+
use rustls::crypto::hash::Hash;
2223
use std::{cmp::max, sync::Arc};
2324
use std::{
2425
collections::{hash_map::Entry, HashMap},
@@ -1107,13 +1108,15 @@ impl Session {
11071108
mut req: messages::OffsetCommitRequest,
11081109
header: RequestHeader,
11091110
) -> anyhow::Result<messages::OffsetCommitResponse> {
1110-
let collection_partitions = self
1111-
.fetch_collection_partitions(req.topics.iter().map(|topic| &topic.name))
1112-
.await?
1113-
.into_iter()
1114-
.map(|(topic_name, partitions)| {
1115-
self.encrypt_topic_name(topic_name)
1116-
.map(|encrypted_name| (encrypted_name, partitions))
1111+
let collections = self
1112+
.fetch_collections(req.topics.iter().map(|topic| &topic.name))
1113+
.await?;
1114+
1115+
let desired_topic_partitions = collections
1116+
.iter()
1117+
.map(|(topic_name, collection)| {
1118+
self.encrypt_topic_name(topic_name.clone())
1119+
.map(|encrypted_name| (encrypted_name, collection.partitions.len()))
11171120
})
11181121
.collect::<Result<Vec<_>, _>>()?;
11191122

@@ -1128,33 +1131,23 @@ impl Session {
11281131
.connect_to_group_coordinator(req.group_id.as_str())
11291132
.await?;
11301133

1131-
client.ensure_topics(collection_partitions).await?;
1134+
client.ensure_topics(desired_topic_partitions).await?;
11321135

11331136
let mut resp = client.send_request(req.clone(), Some(header)).await?;
11341137

1135-
let auth = self
1136-
.auth
1137-
.as_mut()
1138-
.ok_or(anyhow::anyhow!("Session not authenticated"))?;
1139-
1140-
let flow_client = auth.flow_client(&self.app).await?.clone();
1141-
1142-
// Redeclare to drop mutability
1143-
let auth = self.auth.as_ref().unwrap();
1144-
11451138
for topic in resp.topics.iter_mut() {
11461139
let encrypted_name = topic.name.clone();
11471140
let decrypted_name = self.decrypt_topic_name(topic.name.to_owned())?;
11481141

1149-
let collection_partitions = Collection::new(
1150-
&self.app,
1151-
auth,
1152-
&flow_client.pg_client(),
1153-
decrypted_name.as_str(),
1154-
)
1155-
.await?
1156-
.context(format!("unable to look up partitions for {:?}", topic.name))?
1157-
.partitions;
1142+
let collection_partitions = &collections
1143+
.iter()
1144+
.find(|(topic_name, _)| topic_name == &decrypted_name)
1145+
.context(format!(
1146+
"unable to look up partitions for {:?}",
1147+
decrypted_name
1148+
))?
1149+
.1
1150+
.partitions;
11581151

11591152
for partition in &topic.partitions {
11601153
if let Some(error) = partition.error_code.err() {
@@ -1201,10 +1194,9 @@ impl Session {
12011194
.committed_offset;
12021195

12031196
metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64);
1197+
tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, committed_offset, "Committed offset");
12041198
}
12051199
}
1206-
1207-
tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, "Committed offset");
12081200
}
12091201

12101202
Ok(resp)
@@ -1217,12 +1209,12 @@ impl Session {
12171209
header: RequestHeader,
12181210
) -> anyhow::Result<messages::OffsetFetchResponse> {
12191211
let collection_partitions = if let Some(topics) = &req.topics {
1220-
self.fetch_collection_partitions(topics.iter().map(|topic| &topic.name))
1212+
self.fetch_collections(topics.iter().map(|topic| &topic.name))
12211213
.await?
12221214
.into_iter()
1223-
.map(|(topic_name, partitions)| {
1215+
.map(|(topic_name, collection)| {
12241216
self.encrypt_topic_name(topic_name)
1225-
.map(|encrypted_name| (encrypted_name, partitions))
1217+
.map(|encrypted_name| (encrypted_name, collection.partitions.len()))
12261218
})
12271219
.collect::<Result<Vec<_>, _>>()?
12281220
} else {
@@ -1350,10 +1342,10 @@ impl Session {
13501342
}
13511343
}
13521344

1353-
async fn fetch_collection_partitions(
1345+
async fn fetch_collections(
13541346
&mut self,
13551347
topics: impl IntoIterator<Item = &TopicName>,
1356-
) -> anyhow::Result<Vec<(TopicName, usize)>> {
1348+
) -> anyhow::Result<Vec<(TopicName, Collection)>> {
13571349
let auth = self
13581350
.auth
13591351
.as_mut()
@@ -1369,7 +1361,7 @@ impl Session {
13691361
let collection = Collection::new(app, auth, flow_client, topic.as_ref())
13701362
.await?
13711363
.context(format!("unable to look up partitions for {:?}", topic))?;
1372-
Ok::<(TopicName, usize), anyhow::Error>((topic.clone(), collection.partitions.len()))
1364+
Ok::<(TopicName, Collection), anyhow::Error>((topic.clone(), collection))
13731365
}))
13741366
.await
13751367
}

0 commit comments

Comments
 (0)