Commit 832534f

dekaf: Fix creation of upstream topics for partitioned journals
We were correctly ensuring that the topics existed upstream, but were failing to create them with the correct number of partitions if the Flow collection had >1 partition. This adds logic to fetch and specify the number of partitions when creating topics, and it also adds support for increasing a topic's partition count if a journal is split while it is already being read by Dekaf.
1 parent 4bfa4b5 commit 832534f
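
In short: ensure_topics now takes the desired partition count alongside each topic name and decides, per topic, whether to create it, grow it, or leave it alone. A minimal standalone sketch of that decision, under illustrative names (TopicState, Action, and plan are not part of this change):

/// Simplified stand-in for one topic in the upstream metadata response.
struct TopicState {
    name: String,
    partitions: usize,
}

/// What ensure_topics decides per topic, given the partition count
/// of the backing Flow collection.
#[derive(Debug, PartialEq)]
enum Action {
    Create(usize), // topic missing upstream: create it with N partitions
    Expand(usize), // journal was split: grow the topic to N partitions
    Keep,          // already at (or above) the desired count: warn only
}

fn plan(existing: &[TopicState], name: &str, desired: usize) -> Action {
    match existing.iter().find(|t| t.name == name) {
        None => Action::Create(desired),
        Some(t) if desired > t.partitions => Action::Expand(desired),
        Some(_) => Action::Keep, // Kafka cannot shrink a topic
    }
}

fn main() {
    let upstream = vec![TopicState { name: "orders".into(), partitions: 1 }];
    // Collection split into 4 partitions while its topic has only 1:
    assert_eq!(plan(&upstream, "orders", 4), Action::Expand(4));
    // Topic that does not exist upstream yet:
    assert_eq!(plan(&upstream, "users", 2), Action::Create(2));
}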

2 files changed: +217 −66

crates/dekaf/src/api_client.rs (+131 −33)

@@ -454,13 +454,13 @@ impl KafkaApiClient {
     #[instrument(skip_all)]
     pub async fn ensure_topics(
         &mut self,
-        topic_names: Vec<messages::TopicName>,
+        topic_names: Vec<(messages::TopicName, usize)>,
     ) -> anyhow::Result<()> {
         let req = messages::MetadataRequest::default()
             .with_topics(Some(
                 topic_names
                     .iter()
-                    .map(|name| {
+                    .map(|(name, _)| {
                         messages::metadata_request::MetadataRequestTopic::default()
                             .with_name(Some(name.clone()))
                     })
@@ -472,43 +472,141 @@ impl KafkaApiClient {
         let resp = coord.send_request(req, None).await?;
         tracing::debug!(metadata=?resp, "Got metadata response");

-        if resp.topics.iter().all(|topic| {
-            topic
-                .name
-                .as_ref()
-                .map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0)
-                .unwrap_or(false)
-        }) {
-            return Ok(());
-        } else {
-            let mut topics_map = vec![];
-            for topic_name in topic_names.into_iter() {
-                topics_map.push(
-                    messages::create_topics_request::CreatableTopic::default()
-                        .with_name(topic_name)
-                        .with_replication_factor(2)
-                        .with_num_partitions(-1),
-                );
-            }
-            let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
-            let create_resp = coord.send_request(create_req, None).await?;
-            tracing::debug!(create_response=?create_resp, "Got create response");
-
-            for topic in create_resp.topics {
-                if topic.error_code > 0 {
-                    let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
+        let mut topics_to_update = Vec::new();
+        let mut topics_to_create = Vec::new();
+
+        for (topic_name, desired_partitions) in topic_names.iter() {
+            if let Some(topic) = resp
+                .topics
+                .iter()
+                .find(|t| t.name.as_ref() == Some(topic_name))
+            {
+                let current_partitions = topic.partitions.len();
+                if *desired_partitions > current_partitions {
+                    tracing::info!(
+                        topic = ?topic_name,
+                        current_partitions = current_partitions,
+                        desired_partitions = *desired_partitions,
+                        "Increasing partition count for topic",
+                    );
+                    topics_to_update.push((topic_name.clone(), *desired_partitions));
+                } else if *desired_partitions < current_partitions {
                     tracing::warn!(
-                        topic = topic.name.to_string(),
-                        error = ?err,
-                        message = topic.error_message.map(|m|m.to_string()),
-                        "Failed to create topic"
+                        topic = ?topic_name,
+                        current_partitions = topic.partitions.len(),
+                        desired_partitions = *desired_partitions,
+                        "Topic has more partitions than requested, cannot decrease partition count"
                     );
-                    bail!("Failed to create topic");
                 }
+            } else {
+                // Topic doesn't exist, add to creation list
+                topics_to_create.push((topic_name.clone(), *desired_partitions));
+            }
+        }
+
+        if !topics_to_update.is_empty() {
+            self.increase_partition_counts(topics_to_update).await?;
+        }
+
+        if !topics_to_create.is_empty() {
+            self.create_new_topics(topics_to_create).await?;
+        }
+
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn increase_partition_counts(
+        &mut self,
+        topics: Vec<(messages::TopicName, usize)>,
+    ) -> anyhow::Result<()> {
+        let coord = self.connect_to_controller().await?;
+
+        let mut topic_partitions = Vec::new();
+        for (topic_name, partition_count) in topics {
+            topic_partitions.push(
+                messages::create_partitions_request::CreatePartitionsTopic::default()
+                    .with_name(topic_name)
+                    .with_count(partition_count as i32)
+                    // Let Kafka auto-assign new partitions to brokers
+                    .with_assignments(None),
+            );
+        }
+
+        let create_partitions_req = messages::CreatePartitionsRequest::default()
+            .with_topics(topic_partitions)
+            .with_timeout_ms(30000) // This request will cause a rebalance, so it can take some time
+            .with_validate_only(false); // Actually perform the changes
+
+        let resp = coord.send_request(create_partitions_req, None).await?;
+        tracing::debug!(response = ?resp, "Got create partitions response");
+
+        for result in resp.results {
+            if result.error_code > 0 {
+                let err = kafka_protocol::ResponseError::try_from_code(result.error_code);
+                tracing::warn!(
+                    topic = result.name.to_string(),
+                    error = ?err,
+                    message = result.error_message.map(|m| m.to_string()),
+                    "Failed to increase partition count"
+                );
+                return Err(anyhow::anyhow!(
+                    "Failed to increase partition count for topic {}: {:?}",
+                    result.name.as_str(),
+                    err
+                ));
+            } else {
+                tracing::info!(
+                    topic = result.name.to_string(),
+                    "Successfully increased partition count",
+                );
             }
+        }

-            Ok(())
+        Ok(())
+    }
+
+    #[instrument(skip_all)]
+    async fn create_new_topics(
+        &mut self,
+        topics: Vec<(messages::TopicName, usize)>,
+    ) -> anyhow::Result<()> {
+        let coord = self.connect_to_controller().await?;
+
+        let mut topics_map = vec![];
+        for (topic_name, desired_partitions) in topics {
+            topics_map.push(
+                messages::create_topics_request::CreatableTopic::default()
+                    .with_name(topic_name)
+                    .with_replication_factor(2)
+                    .with_num_partitions(desired_partitions as i32),
+            );
+        }
+
+        let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
+        let create_resp = coord.send_request(create_req, None).await?;
+        tracing::debug!(create_response = ?create_resp, "Got create topics response");
+
+        for topic in create_resp.topics {
+            if topic.error_code > 0 {
+                let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
+                tracing::warn!(
+                    topic = topic.name.to_string(),
+                    error = ?err,
+                    message = topic.error_message.map(|m| m.to_string()),
+                    "Failed to create topic"
+                );
+                return Err(anyhow::anyhow!("Failed to create topic"));
+            } else {
+                tracing::info!(
+                    topic = topic.name.to_string(),
+                    "Successfully created topic with {} partitions",
+                    topic.num_partitions
+                );
+            }
         }
+
+        Ok(())
     }
 }

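A note on the CreatePartitions request built in increase_partition_counts above: in the Kafka protocol the count field carries the desired total number of partitions, not a delta, and brokers reject requests that do not increase the count. A condensed sketch of the request construction, reusing only the kafka-protocol builders that appear in the diff (expand_topic_request is an illustrative helper, not part of the change):

use kafka_protocol::messages::{self, create_partitions_request::CreatePartitionsTopic};

/// Build a CreatePartitions request growing `topic_name` to `total` partitions.
/// `count` is the desired TOTAL partition count, not a delta.
fn expand_topic_request(
    topic_name: messages::TopicName,
    total: i32,
) -> messages::CreatePartitionsRequest {
    let topic = CreatePartitionsTopic::default()
        .with_name(topic_name)
        .with_count(total)
        // Let the brokers choose placement for the new partitions
        .with_assignments(None);

    messages::CreatePartitionsRequest::default()
        .with_topics(vec![topic])
        .with_timeout_ms(30_000) // partition reassignment can be slow
        .with_validate_only(false)
}
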
crates/dekaf/src/session.rs (+86 −33)

@@ -6,6 +6,7 @@ use crate::{
 };
 use anyhow::{bail, Context};
 use bytes::{BufMut, Bytes, BytesMut};
+use futures::TryFutureExt;
 use kafka_protocol::{
     error::{ParseResponseErrorCode, ResponseError},
     messages::{
@@ -1103,13 +1104,21 @@ impl Session {
     #[instrument(skip_all, fields(group=?req.group_id))]
     pub async fn offset_commit(
         &mut self,
-        req: messages::OffsetCommitRequest,
+        mut req: messages::OffsetCommitRequest,
         header: RequestHeader,
     ) -> anyhow::Result<messages::OffsetCommitResponse> {
-        let mut mutated_req = req.clone();
-        for topic in &mut mutated_req.topics {
+        let collection_partitions = self
+            .fetch_collection_partitions(req.topics.iter().map(|topic| &topic.name))
+            .await?
+            .into_iter()
+            .map(|(topic_name, partitions)| {
+                self.encrypt_topic_name(topic_name)
+                    .map(|encrypted_name| (encrypted_name, partitions))
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        for topic in &mut req.topics {
             let encrypted = self.encrypt_topic_name(topic.name.clone())?;
-            tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, "Committing offset");
             topic.name = encrypted;
         }

@@ -1119,17 +1128,9 @@ impl Session {
             .connect_to_group_coordinator(req.group_id.as_str())
             .await?;

-        client
-            .ensure_topics(
-                mutated_req
-                    .topics
-                    .iter()
-                    .map(|t| t.name.to_owned())
-                    .collect(),
-            )
-            .await?;
+        client.ensure_topics(collection_partitions).await?;

-        let mut resp = client.send_request(mutated_req, Some(header)).await?;
+        let mut resp = client.send_request(req.clone(), Some(header)).await?;

         let auth = self
             .auth
@@ -1142,48 +1143,65 @@ impl Session {
         let auth = self.auth.as_ref().unwrap();

         for topic in resp.topics.iter_mut() {
-            topic.name = self.decrypt_topic_name(topic.name.to_owned())?;
+            let encrypted_name = topic.name.clone();
+            let decrypted_name = self.decrypt_topic_name(topic.name.to_owned())?;

             let collection_partitions = Collection::new(
                 &self.app,
                 auth,
                 &flow_client.pg_client(),
-                topic.name.as_str(),
+                decrypted_name.as_str(),
             )
             .await?
             .context(format!("unable to look up partitions for {:?}", topic.name))?
             .partitions;

             for partition in &topic.partitions {
                 if let Some(error) = partition.error_code.err() {
-                    tracing::warn!(topic=?topic.name,partition=partition.partition_index,?error,"Got error from upstream Kafka when trying to commit offsets");
+                    tracing::warn!(
+                        topic = decrypted_name.as_str(),
+                        partition = partition.partition_index,
+                        ?error,
+                        "Got error from upstream Kafka when trying to commit offsets"
+                    );
                 } else {
                     let journal_name = collection_partitions
                         .get(partition.partition_index as usize)
                         .context(format!(
-                            "unable to find partition {} in collection {:?}",
-                            partition.partition_index, topic.name
+                            "unable to find collection partition idx {} in collection {:?}",
+                            partition.partition_index,
+                            decrypted_name.as_str()
                         ))?
                         .spec
                         .name
                         .to_owned();

-                    let committed_offset = req
+                    let partitions = &req
                         .topics
                         .iter()
-                        .find(|req_topic| req_topic.name == topic.name)
-                        .context(format!("unable to find topic in request {:?}", topic.name))?
-                        .partitions
+                        .find(|req_topic| req_topic.name == encrypted_name)
+                        .context(format!(
+                            "unable to find topic in request {:?}",
+                            decrypted_name.as_str()
+                        ))?
+                        .partitions;
+
+                    let committed_offset = partitions
                         .get(partition.partition_index as usize)
                         .context(format!(
-                            "unable to find partition {}",
-                            partition.partition_index
+                            "unable to find topic partition idx {} in topic {:?}. It has: {:?}. Flow has: {:?}",
+                            partition.partition_index,
+                            decrypted_name.as_str(),
+                            partitions,
+                            collection_partitions
                         ))?
                         .committed_offset;

                     metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(),"journal_name"=>journal_name).set(committed_offset as f64);
                 }
             }
+
+            tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, "Committed offset");
         }

         Ok(resp)
@@ -1192,11 +1210,23 @@ impl Session {
     #[instrument(skip_all, fields(group=?req.group_id))]
     pub async fn offset_fetch(
         &mut self,
-        req: messages::OffsetFetchRequest,
+        mut req: messages::OffsetFetchRequest,
         header: RequestHeader,
     ) -> anyhow::Result<messages::OffsetFetchResponse> {
-        let mut mutated_req = req.clone();
-        if let Some(ref mut topics) = mutated_req.topics {
+        let collection_partitions = if let Some(topics) = &req.topics {
+            self.fetch_collection_partitions(topics.iter().map(|topic| &topic.name))
+                .await?
+                .into_iter()
+                .map(|(topic_name, partitions)| {
+                    self.encrypt_topic_name(topic_name)
+                        .map(|encrypted_name| (encrypted_name, partitions))
+                })
+                .collect::<Result<Vec<_>, _>>()?
+        } else {
+            vec![]
+        };
+
+        if let Some(ref mut topics) = req.topics {
             for topic in topics {
                 topic.name = self.encrypt_topic_name(topic.name.clone())?;
             }
@@ -1208,12 +1238,11 @@ impl Session {
             .connect_to_group_coordinator(req.group_id.as_str())
             .await?;

-        if let Some(ref topics) = mutated_req.topics {
-            client
-                .ensure_topics(topics.iter().map(|t| t.name.to_owned()).collect())
-                .await?;
+        if !collection_partitions.is_empty() {
+            client.ensure_topics(collection_partitions).await?;
         }
-        let mut resp = client.send_request(mutated_req, Some(header)).await?;
+
+        let mut resp = client.send_request(req, Some(header)).await?;

         for topic in resp.topics.iter_mut() {
             topic.name = self.decrypt_topic_name(topic.name.to_owned())?;
@@ -1318,6 +1347,30 @@ impl Session {
         }
     }

+    async fn fetch_collection_partitions(
+        &mut self,
+        topics: impl IntoIterator<Item = &TopicName>,
+    ) -> anyhow::Result<Vec<(TopicName, usize)>> {
+        let auth = self
+            .auth
+            .as_mut()
+            .ok_or(anyhow::anyhow!("Session not authenticated"))?;
+
+        let app = &self.app;
+        let flow_client = &auth.flow_client(app).await?.pg_client();
+
+        // Re-declare here to drop mutable reference
+        let auth = self.auth.as_ref().unwrap();
+
+        futures::future::try_join_all(topics.into_iter().map(|topic| async move {
+            let collection = Collection::new(app, auth, flow_client, topic.as_ref())
+                .await?
+                .context(format!("unable to look up partitions for {:?}", topic))?;
+            Ok::<(TopicName, usize), anyhow::Error>((topic.clone(), collection.partitions.len()))
+        }))
+        .await
+    }
+
     /// If the fetched offset is within a fixed number of offsets from the end of the journal,
     /// return Some with a PartitionOffset containing the beginning and end of the latest fragment.
     #[tracing::instrument(skip(self))]
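
A design note on fetch_collection_partitions above: the per-topic lookups run concurrently and fail fast on the first error via futures::future::try_join_all. A simplified sketch of that fan-out pattern (lookup_partition_count is a hypothetical stand-in for the Collection::new lookup against the Flow control plane):

use anyhow::Context;

/// One concurrent lookup per topic, failing fast on the first error.
async fn partition_counts(topics: Vec<String>) -> anyhow::Result<Vec<(String, usize)>> {
    futures::future::try_join_all(topics.into_iter().map(|topic| async move {
        let count = lookup_partition_count(&topic)
            .await
            .context(format!("unable to look up partitions for {topic:?}"))?;
        Ok::<(String, usize), anyhow::Error>((topic, count))
    }))
    .await
}

async fn lookup_partition_count(_topic: &str) -> anyhow::Result<usize> {
    Ok(1) // placeholder: real code queries the control plane
}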
