Skip to content

Commit e63ea70

Browse files
committed
dekaf: Fix creation of upstream topics for partitioned journals
We were correctly ensuring that the topics existed upstream, but were failing to create them with the correct number of partitions if the Flow collection had >1 partition. This both adds logic to fetch and specify the number of partitions when creating topics, it also adds support for increasing the number of partitions if a journal is split while already being read by Dekaf.
1 parent 6a53136 commit e63ea70

File tree

2 files changed

+227
-68
lines changed

2 files changed

+227
-68
lines changed

crates/dekaf/src/api_client.rs

+136-33
Original file line numberDiff line numberDiff line change
@@ -454,13 +454,13 @@ impl KafkaApiClient {
454454
#[instrument(skip_all)]
455455
pub async fn ensure_topics(
456456
&mut self,
457-
topic_names: Vec<messages::TopicName>,
457+
topic_names: Vec<(messages::TopicName, usize)>,
458458
) -> anyhow::Result<()> {
459459
let req = messages::MetadataRequest::default()
460460
.with_topics(Some(
461461
topic_names
462462
.iter()
463-
.map(|name| {
463+
.map(|(name, _)| {
464464
messages::metadata_request::MetadataRequestTopic::default()
465465
.with_name(Some(name.clone()))
466466
})
@@ -472,43 +472,146 @@ impl KafkaApiClient {
472472
let resp = coord.send_request(req, None).await?;
473473
tracing::debug!(metadata=?resp, "Got metadata response");
474474

475-
if resp.topics.iter().all(|topic| {
476-
topic
477-
.name
478-
.as_ref()
479-
.map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0)
480-
.unwrap_or(false)
481-
}) {
482-
return Ok(());
483-
} else {
484-
let mut topics_map = vec![];
485-
for topic_name in topic_names.into_iter() {
486-
topics_map.push(
487-
messages::create_topics_request::CreatableTopic::default()
488-
.with_name(topic_name)
489-
.with_replication_factor(2)
490-
.with_num_partitions(-1),
491-
);
492-
}
493-
let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
494-
let create_resp = coord.send_request(create_req, None).await?;
495-
tracing::debug!(create_response=?create_resp, "Got create response");
496-
497-
for topic in create_resp.topics {
498-
if topic.error_code > 0 {
499-
let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
475+
let mut topics_to_update = Vec::new();
476+
let mut topics_to_create = Vec::new();
477+
478+
for (topic_name, desired_partitions) in topic_names.iter() {
479+
if let Some(topic) = resp
480+
.topics
481+
.iter()
482+
.find(|t| t.name.as_ref() == Some(topic_name))
483+
{
484+
let current_partitions = topic.partitions.len();
485+
if *desired_partitions > current_partitions {
486+
tracing::info!(
487+
topic = ?topic_name,
488+
current_partitions = current_partitions,
489+
desired_partitions = *desired_partitions,
490+
"Increasing partition count for topic",
491+
);
492+
topics_to_update.push((topic_name.clone(), *desired_partitions));
493+
} else if *desired_partitions < current_partitions {
500494
tracing::warn!(
501-
topic = topic.name.to_string(),
502-
error = ?err,
503-
message = topic.error_message.map(|m|m.to_string()),
504-
"Failed to create topic"
495+
topic = ?topic_name,
496+
current_partitions = topic.partitions.len(),
497+
desired_partitions = *desired_partitions,
498+
"Topic has more partitions than requested, cannot decrease partition count"
505499
);
506-
bail!("Failed to create topic");
507500
}
501+
} else {
502+
// Topic doesn't exist, add to creation list
503+
tracing::info!(
504+
topic = ?topic_name,
505+
desired_partitions = *desired_partitions,
506+
"Creating new topic as it does not exist",
507+
);
508+
topics_to_create.push((topic_name.clone(), *desired_partitions));
508509
}
510+
}
511+
512+
if !topics_to_update.is_empty() {
513+
self.increase_partition_counts(topics_to_update).await?;
514+
}
515+
516+
if !topics_to_create.is_empty() {
517+
self.create_new_topics(topics_to_create).await?;
518+
}
509519

510-
Ok(())
520+
Ok(())
521+
}
522+
523+
#[instrument(skip_all)]
524+
async fn increase_partition_counts(
525+
&mut self,
526+
topics: Vec<(messages::TopicName, usize)>,
527+
) -> anyhow::Result<()> {
528+
let coord = self.connect_to_controller().await?;
529+
530+
let mut topic_partitions = Vec::new();
531+
for (topic_name, partition_count) in topics {
532+
topic_partitions.push(
533+
messages::create_partitions_request::CreatePartitionsTopic::default()
534+
.with_name(topic_name)
535+
.with_count(partition_count as i32)
536+
// Let Kafka auto-assign new partitions to brokers
537+
.with_assignments(None),
538+
);
511539
}
540+
541+
let create_partitions_req = messages::CreatePartitionsRequest::default()
542+
.with_topics(topic_partitions)
543+
.with_timeout_ms(30000) // This requst will cause a rebalance, so it can take some time
544+
.with_validate_only(false); // Actually perform the changes
545+
546+
let resp = coord.send_request(create_partitions_req, None).await?;
547+
tracing::debug!(response = ?resp, "Got create partitions response");
548+
549+
for result in resp.results {
550+
if result.error_code > 0 {
551+
let err = kafka_protocol::ResponseError::try_from_code(result.error_code);
552+
tracing::warn!(
553+
topic = result.name.to_string(),
554+
error = ?err,
555+
message = result.error_message.map(|m| m.to_string()),
556+
"Failed to increase partition count"
557+
);
558+
return Err(anyhow::anyhow!(
559+
"Failed to increase partition count for topic {}: {:?}",
560+
result.name.as_str(),
561+
err
562+
));
563+
} else {
564+
tracing::info!(
565+
topic = result.name.to_string(),
566+
"Successfully increased partition count",
567+
);
568+
}
569+
}
570+
571+
Ok(())
572+
}
573+
574+
#[instrument(skip_all)]
575+
async fn create_new_topics(
576+
&mut self,
577+
topics: Vec<(messages::TopicName, usize)>,
578+
) -> anyhow::Result<()> {
579+
let coord = self.connect_to_controller().await?;
580+
581+
let mut topics_map = vec![];
582+
for (topic_name, desired_partitions) in topics {
583+
topics_map.push(
584+
messages::create_topics_request::CreatableTopic::default()
585+
.with_name(topic_name)
586+
.with_replication_factor(2)
587+
.with_num_partitions(desired_partitions as i32),
588+
);
589+
}
590+
591+
let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
592+
let create_resp = coord.send_request(create_req, None).await?;
593+
tracing::debug!(create_response = ?create_resp, "Got create topics response");
594+
595+
for topic in create_resp.topics {
596+
if topic.error_code > 0 {
597+
let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
598+
tracing::warn!(
599+
topic = topic.name.to_string(),
600+
error = ?err,
601+
message = topic.error_message.map(|m| m.to_string()),
602+
"Failed to create topic"
603+
);
604+
return Err(anyhow::anyhow!("Failed to create topic"));
605+
} else {
606+
tracing::info!(
607+
topic = topic.name.to_string(),
608+
"Successfully created topic with {} partitions",
609+
topic.num_partitions
610+
);
611+
}
612+
}
613+
614+
Ok(())
512615
}
513616
}
514617

0 commit comments

Comments
 (0)