Skip to content

dekaf: Fix creation of upstream Kafka topics for partitioned journals #2047

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 137 additions & 35 deletions crates/dekaf/src/api_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,13 @@ impl KafkaApiClient {
#[instrument(skip_all)]
pub async fn ensure_topics(
&mut self,
topic_names: Vec<messages::TopicName>,
topics: Vec<(messages::TopicName, usize)>,
) -> anyhow::Result<()> {
let req = messages::MetadataRequest::default()
.with_topics(Some(
topic_names
topics
.iter()
.map(|name| {
.map(|(name, _)| {
messages::metadata_request::MetadataRequestTopic::default()
.with_name(Some(name.clone()))
})
Expand All @@ -472,43 +472,145 @@ impl KafkaApiClient {
let resp = coord.send_request(req, None).await?;
tracing::debug!(metadata=?resp, "Got metadata response");

if resp.topics.iter().all(|topic| {
topic
.name
.as_ref()
.map(|topic_name| topic_names.contains(topic_name) && topic.error_code == 0)
.unwrap_or(false)
}) {
return Ok(());
} else {
let mut topics_map = vec![];
for topic_name in topic_names.into_iter() {
topics_map.push(
messages::create_topics_request::CreatableTopic::default()
.with_name(topic_name)
.with_replication_factor(2)
.with_num_partitions(-1),
);
}
let create_req = messages::CreateTopicsRequest::default().with_topics(topics_map);
let create_resp = coord.send_request(create_req, None).await?;
tracing::debug!(create_response=?create_resp, "Got create response");

for topic in create_resp.topics {
if topic.error_code > 0 {
let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
tracing::warn!(
topic = topic.name.to_string(),
error = ?err,
message = topic.error_message.map(|m|m.to_string()),
"Failed to create topic"
let mut topics_to_update = Vec::new();
let mut topics_to_create = Vec::new();

for (topic_name, desired_partitions) in topics.iter() {
if let Some(topic) = resp
.topics
.iter()
.find(|t| t.name.as_ref() == Some(topic_name))
{
let current_partitions = topic.partitions.len();
if *desired_partitions > current_partitions {
tracing::info!(
topic = ?topic_name,
current_partitions = current_partitions,
desired_partitions = *desired_partitions,
"Increasing partition count for topic",
);
topics_to_update.push((topic_name.clone(), *desired_partitions));
} else if *desired_partitions < current_partitions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not return an error? What happens to the read from the client's perspective if this condition is met? I would think that we'd be able to just use whatever subset of partitions we need, but then I'm not sure why this is a warning 🤔

Copy link
Contributor Author

@jshearer jshearer Apr 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this really should be an error, since I can't think of any scenario where it can happen during normal operations. The idea was that if this weird circumstance happens, we should log it but then let the original request continue through to Kafka, where it will either succeed or fail based on whatever rules and state Kafka has. But thinking about it more, I agree: we should fail fast here.

anyhow::bail!("Topic {} has more partitions ({}) than requested ({}), cannot decrease partition count",
topic_name.as_str(),
current_partitions,
desired_partitions
);
bail!("Failed to create topic");
}
} else {
// Topic doesn't exist, add to creation list
tracing::info!(
topic = ?topic_name,
desired_partitions = *desired_partitions,
"Creating new topic as it does not exist",
);
topics_to_create.push((topic_name.clone(), *desired_partitions));
}
}

if !topics_to_update.is_empty() {
self.increase_partition_counts(topics_to_update).await?;
}

if !topics_to_create.is_empty() {
self.create_new_topics(topics_to_create).await?;
}

Ok(())
}

/// Raises the partition count of existing upstream Kafka topics.
///
/// Every `(topic, count)` pair in `topics` becomes one entry of a single
/// `CreatePartitions` request sent to the controller. `count` is passed
/// through as the request's `count` field.
///
/// # Errors
///
/// Returns an error if connecting to the controller or sending the request
/// fails, if a requested count does not fit in an `i32`, or on the first
/// per-topic result that carries a non-zero error code.
#[instrument(skip_all)]
async fn increase_partition_counts(
    &mut self,
    topics: Vec<(messages::TopicName, usize)>,
) -> anyhow::Result<()> {
    let coord = self.connect_to_controller().await?;

    let topic_partitions = topics
        .into_iter()
        .map(|(topic_name, partition_count)| {
            // The wire format carries the count as an i32; refuse values that
            // would otherwise be silently truncated by an `as` cast.
            let count = i32::try_from(partition_count).map_err(|_| {
                anyhow::anyhow!(
                    "Requested partition count {} for topic {} does not fit in an i32",
                    partition_count,
                    topic_name.as_str()
                )
            })?;

            Ok(
                messages::create_partitions_request::CreatePartitionsTopic::default()
                    .with_name(topic_name)
                    .with_count(count)
                    // Let Kafka auto-assign new partitions to brokers
                    .with_assignments(None),
            )
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    let create_partitions_req = messages::CreatePartitionsRequest::default()
        .with_topics(topic_partitions)
        .with_timeout_ms(30000) // This request will cause a rebalance, so it can take some time
        .with_validate_only(false); // Actually perform the changes

    let resp = coord.send_request(create_partitions_req, None).await?;
    tracing::debug!(response = ?resp, "Got create partitions response");

    for result in resp.results {
        // Kafka reports success as exactly 0. Error codes may be negative
        // (-1 is UNKNOWN_SERVER_ERROR), so compare against zero rather than
        // testing for a positive value.
        if result.error_code != 0 {
            let err = kafka_protocol::ResponseError::try_from_code(result.error_code);
            tracing::warn!(
                topic = result.name.to_string(),
                error = ?err,
                message = result.error_message.map(|m| m.to_string()),
                "Failed to increase partition count"
            );
            return Err(anyhow::anyhow!(
                "Failed to increase partition count for topic {}: {:?}",
                result.name.as_str(),
                err
            ));
        }

        tracing::info!(
            topic = result.name.to_string(),
            "Successfully increased partition count",
        );
    }

    Ok(())
}

/// Creates upstream Kafka topics with the requested partition counts.
///
/// Every `(topic, partitions)` pair in `topics` becomes one entry of a single
/// `CreateTopics` request sent to the controller, with a replication factor
/// of 2.
///
/// # Errors
///
/// Returns an error if connecting to the controller or sending the request
/// fails, if a requested partition count does not fit in an `i32`, or on the
/// first per-topic result that carries a non-zero error code.
#[instrument(skip_all)]
async fn create_new_topics(
    &mut self,
    topics: Vec<(messages::TopicName, usize)>,
) -> anyhow::Result<()> {
    let coord = self.connect_to_controller().await?;

    let creatable_topics = topics
        .into_iter()
        .map(|(topic_name, desired_partitions)| {
            // The wire format carries the count as an i32; refuse values that
            // would otherwise be silently truncated by an `as` cast.
            let num_partitions = i32::try_from(desired_partitions).map_err(|_| {
                anyhow::anyhow!(
                    "Requested partition count {} for topic {} does not fit in an i32",
                    desired_partitions,
                    topic_name.as_str()
                )
            })?;

            Ok(messages::create_topics_request::CreatableTopic::default()
                .with_name(topic_name)
                .with_replication_factor(2)
                .with_num_partitions(num_partitions))
        })
        .collect::<anyhow::Result<Vec<_>>>()?;

    let create_req = messages::CreateTopicsRequest::default().with_topics(creatable_topics);
    let create_resp = coord.send_request(create_req, None).await?;
    tracing::debug!(create_response = ?create_resp, "Got create topics response");

    for topic in create_resp.topics {
        // Kafka reports success as exactly 0. Error codes may be negative
        // (-1 is UNKNOWN_SERVER_ERROR), so compare against zero rather than
        // testing for a positive value.
        if topic.error_code != 0 {
            let err = kafka_protocol::ResponseError::try_from_code(topic.error_code);
            tracing::warn!(
                topic = topic.name.to_string(),
                error = ?err,
                message = topic.error_message.map(|m| m.to_string()),
                "Failed to create topic"
            );
            // Name the topic and the cause, matching the error returned when
            // increasing partition counts fails.
            return Err(anyhow::anyhow!(
                "Failed to create topic {}: {:?}",
                topic.name.as_str(),
                err
            ));
        }

        tracing::info!(
            topic = topic.name.to_string(),
            "Successfully created topic with {} partitions",
            topic.num_partitions
        );
    }

    Ok(())
}
}

Expand Down
Loading
Loading