dekaf: Support collection reset and materialization binding backfill via Kafka leader epochs #2497
Conversation
Alright @jgraettinger, I've done a couple rounds of self-review here and I think it's time to get your feedback. I've tested this with kcat and Tinybird in all the scenarios I can think of: "old" consumer upgrading to epochs then backfilling, "new" consumer backfilling, real collection reset, starting from non-zero backfill counters, etc. Before deploying I also plan to test with SingleStore and ClickHouse, just to see how they handle epoch changes.
I acknowledge that this feels like adding even more complexity to some already pretty complicated logic. As we've talked about, my plan is to do a refactor of this codebase to split it up into much more manageable components, right after I write an e2e test suite so I can have some confidence that I'm not breaking behavior in subtle ways.
jgraettinger
left a comment
LGTM
crates/dekaf/src/session.rs
Outdated
    let response = collections
        .into_iter()
-       .map(|(topic_name, offsets)| {
+       .map(|(topic_name, maybe_current_epoch, offsets)| {
When is maybe_current_epoch None (vs, say, Some(0))?
None = binding not found, Some(0) = binding exists and has a backfill counter of 0. Now that you mention it, I don't love the return type of fetch_topic_offsets; I'm thinking it should be an enum rather than some combination of Options.
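For illustration, a rough sketch of what that enum could look like (hypothetical names and fields; this is not what fetch_topic_offsets returns today):

pub enum TopicOffsetsResult {
    // The topic doesn't map to any binding of the materialization.
    BindingNotFound,
    // The binding exists; carry its backfill counter (the leader epoch we advertise)
    // along with the per-partition offsets.
    Found {
        backfill_counter: u32,
        offsets: Vec<(i32, i64)>, // (partition, offset) pairs, for illustration
    },
}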
    current_epoch = collection.binding_backfill_counter,
    "Consumer epoch is stale, skipping read start"
);
// Remove stale pending read if it exists. Error will be returned during poll phase
nit: this action-at-a-distance is confusing / hard to follow. Is it possible to fail faster?
Yes it is, but not without a refactor that I don't want to do in this PR. Fundamentally, reading/fetching is an operation over futures: a fetch request for a new topic+partition results in a new Read, which progresses asynchronously and can be polled to see whether it has any data to return or not. In some circumstances, a read will short-circuit into an error state before even starting, or it will resolve into an error state after starting. All of this maps nicely onto the concept of a future, and is in fact the pattern that the implementation is currently using, even though we don't call it that.
I would like to implement a ReadManager that breaks this down into logic around starting new reads (all of the pre-validation that's currently done in fetch()), and logic around polling existing reads and dealing with their statuses. It looks something like this:
pub enum ReadError {
Fenced { current_epoch: i32 },
UnknownEpoch { current_epoch: i32 },
EpochChangedDuringRead { old_epoch: i32, new_epoch: i32 },
CollectionNotFound,
PartitionNotFound,
TaskRedirected,
SessionIsPreviewOnly,
Suspended,
}
pub struct ReadData {
pub batch: Option<Bytes>,
pub high_watermark: i64,
pub last_stable_offset: i64,
pub leader_epoch: i32,
}
pub enum ReadResult {
Data(ReadData),
Error(ReadError),
Pending,
}
struct PendingRead {
started_at: Instant,
offset: i64,
leader_epoch: i32,
handle: AbortOnDropHandle<anyhow::Result<(Read, BatchResult)>>,
}
struct CompletedRead {
read: Read,
batch: BatchResult,
leader_epoch: i32,
}
pub struct ReadManager {
pending: HashMap<(TopicName, i32), PendingRead>,
completed: HashMap<(TopicName, i32), CompletedRead>,
}
impl ReadManager {
pub async fn start_reads(
&mut self,
requests: &[PartitionRequest]
) -> Result<()> {
...
}
pub async fn poll_reads(
&mut self,
requests: &[PartitionRequest],
timeout: Duration
) -> Vec<ReadResult> { // Index-aligned with `requests`
...
}
}
But given the current complexity, I don't want to do that refactor before I have some tests that prove that the existing behavior doesn't regress with the refactor.
crates/dekaf/src/session.rs
Outdated
// Re-fetch collection to check if epoch changed during the read
let auth = self.auth.as_ref().unwrap();
let Some(collection) = Collection::new(&auth, &key.0).await? else {
could we simplify by bubbling up a deleted journal as terminal, and then check epoch on the next session?
my sense is that once we've validated the current journals and reader are on the same epoch, we shouldn't be checking again unless something exceptional happens (e.g. a read journal is deleted out from under us, or we hit a maximum interval / JWT expiration we'll process for under the current configuration, etc).
…ion reset
When a collection is reset or a materialization binding is backfilled, consumers
need to detect that their committed offsets are invalid. This maps the binding's
backfill counter to Kafka's leader epoch mechanism:
* Emit `leader_epoch` in Metadata and ListOffsets responses
* Validate consumer epoch in Fetch and ListOffsets, returning `FENCED_LEADER_EPOCH`
for stale epochs and `UNKNOWN_LEADER_EPOCH` for future epochs
* Implement `OffsetForLeaderEpoch` API - returns offset 0 for old epochs (reset to
beginning) and current high watermark for current epoch
* Append `-e{counter}` suffix to upstream topic names for offset isolation by epoch
Also isolate committed offsets by task name and clean up legacy offsets.
Previously, all topics sharing the same token would commit offsets to the same
upstream Kafka topic name, creating potential conflicts across tasks/tenants.
* Swap encryption nonce from token to task_name when epoch suffix is present
* Clean up old offsets only after the new epoch-qualified commit succeeds
This shows up sometimes; I believe it's attributable to eventual consistency in the upstream Kafka brokers. It should be retried and not result in the session crashing.
There was a confusing behavior where the task manager would serve 0 journals for a deleted collection until the `SPEC_TTL` expired.
Problem
Dekaf does not currently handle materialization backfills correctly. This means that when a collection is reset via dataflow reset (which is now the default capture backfill mode), the corresponding topic gets into a broken state. Specifically, the consumer doesn't realize that its offsets have been invalidated, and keeps trying to read from the previously committed offset, which is likely beyond the new collection's write head. As such, no more data will show up in the Dekaf topic until the new collection sees at least as much data as was written to the previous collection.
In order to fix this, as well as allow for "regular" Dekaf bindings (topics) to be backfilled, we need to figure out a way to signal to consumers that their committed offsets are invalid, and they should start over from the beginning of the new collection.
Background: Kafka leader epochs
Kafka uses leader epochs to handle partition leadership changes and log truncation. When leadership changes, the epoch increments. Consumers include their expected epoch in fetch requests, which they discover as part of the topic metadata. If the provided epoch is old, the broker returns FENCED_LEADER_EPOCH. Consumers then call OffsetForLeaderEpoch with the previous epoch value to determine where the old epoch ended, compare it to their position, and reset if needed.
We already have a value that conveniently maps to this concept: a materialization binding's backfill counter. The control-plane already ensures that when a collection is reset, all bindings referring to it also get backfilled, so "all" we have to do is map the binding's backfill counter to Kafka's leader epoch!
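To make the consumer side concrete, here is a rough sketch of the reconciliation a generic Kafka client performs after being fenced (hypothetical types; this describes client behavior, not Dekaf code):

struct EpochEndOffset {
    leader_epoch: i32,
    end_offset: i64, // where the requested epoch ended, per OffsetForLeaderEpoch
}

// Decide where to resume after FENCED_LEADER_EPOCH, given the cached position
// and the broker's answer for the consumer's previous epoch.
fn resume_offset_after_fence(cached_offset: i64, epoch_end: EpochEndOffset) -> i64 {
    if cached_offset > epoch_end.end_offset {
        // The log was truncated past our position; rewind to the end of the old epoch.
        epoch_end.end_offset
    } else {
        // Our position is still valid under the new epoch; keep reading from it.
        cached_offset
    }
}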
Implementation
Metadata
In order for us to be able to compare the consumer's leader epoch against the current spec's binding backfill counter, it needs to be told about the epoch that corresponds with its discovered metadata. As such, we emit current_leader_epoch in Metadata and ListOffsets responses. Consumers cache the epoch alongside their offsets as (offset, leader_epoch). ListOffsets also validates current_leader_epoch from the request and returns FENCED_LEADER_EPOCH if stale.
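As a sketch (local, hypothetical types rather than the actual protocol structs), the mapping amounts to stamping the binding's backfill counter into the partition responses we emit:

struct PartitionResponse {
    partition_index: i32,
    leader_epoch: i32,
    // ...other Metadata / ListOffsets partition fields elided
}

fn partition_response(partition_index: i32, binding_backfill_counter: u32) -> PartitionResponse {
    PartitionResponse {
        partition_index,
        // Advertise the backfill counter as the current leader epoch, so consumers
        // cache it alongside their committed offsets.
        leader_epoch: binding_backfill_counter as i32,
    }
}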
Fetching
Consumers include current_leader_epoch in fetch requests. We validate this against the current spec's backfill counter (a sketch of this check follows the cases below):

If request.current_leader_epoch < collection.binding_backfill_counter
Return FENCED_LEADER_EPOCH along with the now-current leader epoch. The consumer then calls OffsetForLeaderEpoch with its old offset, gets a new high-watermark of 0 which looks like a truncation (since it is), and triggers its default auto.offset.reset behavior (either earliest or latest, depending on whether it's configured to start from the beginning or only tail recent events).
TODO: Actually test with a consumer set to latest and see what it does.

If request.current_leader_epoch == collection.binding_backfill_counter
Continue to serve the fetch.

If request.current_leader_epoch > collection.binding_backfill_counter
Error with UNKNOWN_LEADER_EPOCH. This causes the consumer to do a metadata refresh.
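A minimal sketch of that three-way check, with hypothetical names (the actual session code folds this into its fetch handling and is more involved):

enum EpochCheck {
    // Consumer's epoch is behind the binding's backfill counter.
    Fenced { current_epoch: i32 },
    // Consumer's epoch is ahead of anything we know about.
    Unknown { current_epoch: i32 },
    // Epochs match (or the consumer didn't send one); serve the fetch.
    Ok,
}

fn check_fetch_epoch(request_epoch: Option<i32>, binding_backfill_counter: u32) -> EpochCheck {
    let current_epoch = binding_backfill_counter as i32;
    match request_epoch {
        // -1 is Kafka's "no epoch" sentinel; older clients also omit the field entirely.
        None | Some(-1) => EpochCheck::Ok,
        Some(e) if e < current_epoch => EpochCheck::Fenced { current_epoch },
        Some(e) if e > current_epoch => EpochCheck::Unknown { current_epoch },
        Some(_) => EpochCheck::Ok,
    }
}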
OffsetForLeaderEpoch
After receiving FENCED_LEADER_EPOCH, consumers call OffsetForLeaderEpoch to discover where their previous epoch ended. They then start reading from the new epoch starting at this offset. In our case, we want the consumers to start over from the beginning, so if the requested epoch is less than the current binding backfill counter, we advertise 0 here.
TODO: Should we really fetch the journal's earliest available offset and advertise that instead of 0? I didn't test with journals that are missing early fragments.
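A corresponding sketch of that response logic (hypothetical names, not the actual handler):

struct EpochEnd {
    leader_epoch: i32,
    end_offset: i64,
}

fn offset_for_leader_epoch(
    requested_epoch: i32,
    binding_backfill_counter: u32,
    high_watermark: i64,
) -> EpochEnd {
    let current_epoch = binding_backfill_counter as i32;
    if requested_epoch < current_epoch {
        // The old epoch "ended" at offset 0: nothing the consumer read under it
        // survives, so it restarts from the beginning of the new collection.
        EpochEnd { leader_epoch: current_epoch, end_offset: 0 }
    } else {
        // Current epoch: it extends through the current high watermark.
        EpochEnd { leader_epoch: current_epoch, end_offset: high_watermark }
    }
}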
Committed offset segmentation
Normally, Kafka brokers store and return the correct committed offsets by leader epoch. But since we're co-opting this feature for our own use, we need to deal with storing and fetching the correct committed offset for a given epoch (binding backfill counter). We do this by appending the backfill counter to the group ID we use when proxying requests to upstream Kafka for JoinGroup, SyncGroup, OffsetCommit, OffsetFetch, etc.

to_upstream_topic_name(name, secret, token, Some(backfill_counter))

produces epoch-qualified keys:

<encrypted>-e0
<encrypted>-e1
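As an approximate sketch of how those keys are formed (encrypt_name is a stand-in for the encryption that to_upstream_topic_name actually performs, which isn't shown here):

fn epoch_qualified_name(name: &str, secret: &str, nonce: &str, backfill_counter: Option<u32>) -> String {
    let encrypted = encrypt_name(name, secret, nonce);
    match backfill_counter {
        // Epoch-qualified form used when proxying JoinGroup / SyncGroup / OffsetCommit / OffsetFetch.
        Some(counter) => format!("{encrypted}-e{counter}"),
        // Legacy, un-suffixed form.
        None => encrypted,
    }
}

// Placeholder for the real encryption step; purely illustrative.
fn encrypt_name(name: &str, secret: &str, nonce: &str) -> String {
    format!("<encrypted:{}:{}:{}>", name, secret, nonce)
}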
Upgrade Path
Committed offset isolation
Previously, all topics for tasks sharing the same token would commit offsets to the same upstream Kafka topic name. This created the possibility for conflicts when multiple bindings across different tasks and tenants used the same topic name and token.
Since we're already introducing a group ID upgrade path here, we might as well also kill this bird by swapping the nonce from the token to the task name. This closes #2083.
Migration Flow
First fetch after upgrade:
* OffsetFetch with task_name + epoch gets an empty response
* OffsetFetch falls back and tries the previous topic name, finds old offsets
* OffsetFetch then tries task_name + epoch, finds new offsets, and no longer falls back

Closes #2376