dekaf: Fix creation of upstream Kafka topics for partitioned journals #2047
Conversation
After some back and forth, reading from all partitions of a partitioned journal is tested and working. Also confirmed that after splitting a partition, readers start committing offsets for both partitions.
LGTM
LGTM % avoiding the extra API calls, if possible.
    "Increasing partition count for topic",
);
topics_to_update.push((topic_name.clone(), *desired_partitions));
} else if *desired_partitions < current_partitions {
Why not return an error? What happens to the read from the client's perspective if this condition is met? I would think that we'd be able to just use whatever subset of partitions we need, but then I'm not sure why this is a warning 🤔
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this really should be an error, since I can't think of any scenario where it can happen during normal operations. The idea was that if this weird circumstance happens, we should log about it but then let the original request continue through to Kafka where it will either succeed or fail based on whatever rules and state Kafka has. But thinking about it more I agree, we should fail fast here.
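The fail-fast behavior agreed on above could look roughly like this (a minimal sketch with hypothetical names, not the actual Dekaf code):

```rust
/// Hypothetical helper: decide how to reconcile an upstream topic's partition
/// count with the partition count of the backing journal. Growing is expected
/// (a journal was split); shrinking should never happen during normal
/// operations, so fail fast with an error instead of warning and letting the
/// request pass through to Kafka.
fn reconcile_partitions(desired: usize, current: usize) -> Result<Option<usize>, String> {
    if desired > current {
        // Journal was split while being read: grow the upstream topic.
        Ok(Some(desired))
    } else if desired < current {
        // Nothing in normal operation removes journal partitions.
        Err(format!(
            "upstream topic has {current} partitions but the journal has only {desired}"
        ))
    } else {
        // Already in sync: nothing to update.
        Ok(None)
    }
}
```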
crates/dekaf/src/session.rs
))?
.committed_offset;

metrics::gauge!("dekaf_committed_offset", "group_id"=>req.group_id.to_string(), "journal_name"=>journal_name).set(committed_offset as f64);
}
}

tracing::info!(topic_name = ?topic.name, partitions = ?topic.partitions, "Committed offset");
Would it be helpful to include the actual offsets in this log, maybe just as an array?
I moved the log so it logs the offset per partition -- I don't think this will make logs any noisier than gathering up the array, and it's arguably easier to read than parsing an array out of a single log line. Most journals have a single partition anyway.
@psFried thanks for the feedback! I believe I addressed it, thoughts? I'll squash the fixup commit before merging, but I left it for easier review.
Moving the log stuff to a separate PR
LGTM
Description:
We were correctly ensuring that the topics existed upstream, but were failing to create them with the correct number of partitions if the Flow collection had >1 partition.
This adds logic to fetch and specify the number of partitions when creating topics, and also adds support for increasing the number of partitions if a journal is split while already being read by Dekaf.
Note:
This PR also contains a fix for another subtle issue I discovered: log messages emitted immediately before Session closure (such as the log containing the error that caused a Session to crash) didn't seem to be getting delivered. I also hypothesize that some messages could have been dropped if multiple were emitted very quickly, but I didn't find evidence of that. It turns out there was a bug where we were passing the call to `append_logs_to_writer` as one of the futures in a `tokio::select!` call, causing it to get cancelled if another future resolved before it did, such as the message stream closing. This call didn't need to be in the `select!` in the first place, so moving it out solved the problem.

Moving to a separate PR.