Conversation

@mimaison
Member

@mimaison mimaison commented May 19, 2025

  • Rename KafkaMetadataLog to KafkaRaftLog
  • Make raft depend on storage
  • Remove the storage dependency on metadata, as it would otherwise create a
    cyclic dependency

@github-actions github-actions bot added the core, kraft, storage and build labels May 19, 2025
@dengziming
Member

There are some concerns regarding this change. Although we haven't made kraft a public API, we have been trying to keep it dependency-free so it can be used as a raft SDK anywhere. For example, if you are implementing a system whose metadata changes frequently, you can directly copy the kraft code into your project.
This assumption also has some benefits for the overall Kafka architecture. This change reduces dependency complexity but breaks that assumption, so we should evaluate which goal is more important.

@mimaison
Member Author

we are trying to keep it dependency-free to be used as a raft-sdk anywhere

Where has this been discussed?
The way Kafka implements Raft is via a topic, so our implementation of ReplicatedLog needs to depend on storage. Since metadata depends on raft, the other option is to put KafkaMetadataLog into the metadata module and have that depend on storage instead of raft.

@dengziming
Member

dengziming commented May 22, 2025

Where has this been discussed?

In KIP-595 we wrote:

We are also trying to pave the way for normal partition replication through Raft as well as eventually supporting metadata sharding with multiple partitions.

We were only trying to pave the way for normal Kafka partition replication, not to make it exportable as an SDK. I may have read too much into it, but the idea behind it still deserves consideration: for example, etcd-raft (https://github.com/etcd-io/raft) follows a similar principle of not including a concrete log implementation.
Moving it to the metadata module is better than putting it in the raft module, and we should not rename it to KafkaRaftLog since we may use raft to replicate normal partitions.

@mimaison
Member Author

I'm not sure I interpret that statement from KIP-595 as an intent to keep the raft module dependency-free. As you said, the raft module is not a public API, and I've not seen any discussions in that direction.

The metadata module mostly consists of the data structures written to the metadata log. To me it seems the logic of maintaining the metadata log as a topic fits better in the raft module, as described in https://issues.apache.org/jira/browse/KAFKA-15599.

@dengziming
Member

@mimaison That makes sense to me 👍. If we have to make the tradeoff, modularity outweighs the pluggability of the raft module. @jsancio @ijuma, where do you think is the better place to put KafkaMetadataLog?

@ijuma
Member

ijuma commented May 23, 2025

The description I wrote in the ticket was the conclusion after a discussion with a few people including @jsancio and @cmccabe. @jsancio does that approach still make sense to you?

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class KafkaRaftLog implements ReplicatedLog {
Member

The JIRA ticket also talked about renaming ReplicatedLog to RaftLog. Do we want to do that separately, or do we think it's not a good idea? cc @jsancio

Member Author

I think RaftLog is a nicer name. I can do it in this PR.

Member Author

Done

Member

@dengziming dengziming left a comment

Thank you for this PR, I left some comments, PTAL.

deleted = deletedSegments != 0 || !forgottenSnapshots.isEmpty();
}
}
removeSnapshots(forgottenSnapshots, reason);
Member

Why did we move this inside the synchronized block?

Member Author

You're right, this can be moved outside the synchronized block.

)
);
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
Member

Maybe it's a newcomer's question, but is UncheckedIOException better than IOException?

Member

@ijuma ijuma May 29, 2025

UncheckedIOException is unchecked while IOException is checked. However, it's risky to make this change: you have to check that every caller handles the new exception, and the compiler will not help you. The way we did it for other classes was to not change exception types during the conversion from Scala to Java, and to file a separate JIRA for investigating this.

Member Author

The reason I used UncheckedIOException is that none of the method definitions in RaftLog are marked as throwing exceptions, and none of the calling logic, mostly in KafkaRaftClient, handles checked exceptions.

So any exception thrown is passed back up the call stack to KafkaRaftClientDriver.doWork(), which catches Throwable.
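
For illustration, a minimal sketch of that pattern (hypothetical class and method names, not the actual RaftLog code):

import java.io.IOException;
import java.io.UncheckedIOException;

public class UncheckedIoSketch {
    // RaftLog methods declare no checked exceptions, so IO failures are wrapped
    // and propagate unchecked up the stack to the driver's catch (Throwable).
    public long readEndOffset() {
        try {
            return readEndOffsetFromDisk();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    // Hypothetical stand-in for real file access.
    private long readEndOffsetFromDisk() throws IOException {
        return 0L;
    }
}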

Contributor

Perhaps it's better to wrap this in KafkaException to distinguish it from other UncheckedIOExceptions? Ditto in a few other places.

@Override
public Optional<OffsetAndEpoch> earliestSnapshotId() {
synchronized (snapshots) {
try {
Member

ditto

Member Author

Updated to match latestSnapshotId()

try {
OffsetAndEpoch epoch = snapshots.lastKey();
return epoch == null ? Optional.empty() : Optional.of(epoch);
} catch (NoSuchElementException e) {
Member

How about changing this NoSuchElementException-catching code to an if (snapshots.isEmpty()) check?

Member Author

Good idea
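
For illustration, the shape of that change (a sketch with a simplified key type, not the exact PR code):

import java.util.Optional;
import java.util.TreeMap;

public class LatestSnapshotIdSketch {
    private final TreeMap<Long, String> snapshots = new TreeMap<>();

    // TreeMap.lastKey() throws NoSuchElementException on an empty map (it never
    // returns null), so an explicit isEmpty() check is clearer than catching it.
    public Optional<Long> latestSnapshotId() {
        return snapshots.isEmpty() ? Optional.empty() : Optional.of(snapshots.lastKey());
    }
}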

for (OffsetAndEpoch key : snapshots.keySet()) {
Optional<RawSnapshotReader> snapshotReader = readSnapshot(key);
snapshotReader.ifPresent(fileRawSnapshotReader -> {
snapshots.put(key, Optional.of((FileRawSnapshotReader) snapshotReader.get()));
Member

Why should we put it here?

Member Author

Good catch, it shouldn't be there. The call to readSnapshot() will have opened the reader and updated snapshots already.

int nodeId) throws IOException {
Properties props = new Properties();
props.setProperty(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, String.valueOf(config.maxBatchSizeInBytes()));
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.logSegmentBytes()));
Member

@brandboat brandboat May 29, 2025

Just a reminder — we recently introduced some changes in KafkaMetadataLog (see bcda92b#diff-b332f85b04775c821226b6f704e91d51f9647f29ba73dace65b99cf36f6b9cea)

Member Author

Thanks for the heads-up!

Member

@dengziming dengziming left a comment

All lgtm.

@mimaison mimaison requested a review from junrao June 25, 2025 08:45
Contributor

@junrao junrao left a comment

@mimaison : Thanks for the PR. Left a few comments.

this.snapshots = snapshots;
this.topicPartition = topicPartition;
this.config = config;
this.logIdent = "[MetadataLog partition=" + topicPartition + ", nodeId=" + nodeId + "] ";
Contributor

Should we change MetadataLog to RaftLog?

} else {
props.setProperty(TopicConfig.SEGMENT_BYTES_CONFIG, String.valueOf(config.logSegmentBytes()));
}
props.setProperty(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, String.valueOf(ServerLogConfigs.LOG_DELETE_DELAY_MS_DEFAULT));
Contributor

It seems that we are missing the following.

props.setProperty(TopicConfig.SEGMENT_MS_CONFIG, config.logSegmentMillis.toString)

nodeId
);

// When recovering, truncate fully if the latest snapshot is after the log end offset. This can happen to a follower
Contributor

Why was the following validation removed?

    if (defaultLogConfig.segmentSize() < config.logSegmentBytes()) {
      metadataLog.error(s"Overriding ${MetadataLogConfig.INTERNAL_METADATA_LOG_SEGMENT_BYTES_CONFIG} is only supported for testing. Setting " +
        s"this value too low may lead to an inability to write batches of metadata records.")
    }


for (SnapshotPath snapshotPath : snapshotsToDelete) {
Files.deleteIfExists(snapshotPath.path());
LOG.info("Deleted unneeded snapshot file with path {}", snapshotPath);
Contributor

Should we add the partition-level logIdent?

String reason(OffsetAndEpoch snapshotId);
}

static class RetentionMsBreach implements SnapshotDeletionReason {
Contributor

Should we change it to a record? Ditto for RetentionSizeBreach.

Member Author

I changed all the SnapshotDeletionReason implementations to records.
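
For illustration, a record-based implementation might look like this (field names and message text are assumptions, not the actual PR code):

public class SnapshotDeletionSketch {
    interface SnapshotDeletionReason {
        String reason(long snapshotOffset);
    }

    // A record removes the boilerplate constructor, fields, and accessors.
    record RetentionMsBreach(long nowMs, long snapshotTimestampMs, long retentionMillis)
            implements SnapshotDeletionReason {
        @Override
        public String reason(long snapshotOffset) {
            return "Snapshot at offset " + snapshotOffset + " is older than the retention of "
                    + retentionMillis + " ms";
        }
    }
}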

@Test
public void testConfig() throws IOException {
Properties props = new Properties();
props.put("process.roles", "broker");
Contributor

Should we move KRaftConfigs to the raft module so that we could use the defined config names?

mockTime.sleep(config.internalDeleteDelayMillis());
// Assert that the log dir doesn't contain any older snapshots
Files
.walk(logDir, 1)
Contributor

Could this be merged with the previous line?

OffsetAndEpoch snapshotId1 = new OffsetAndEpoch(1000, 1);
RawSnapshotWriter snapshot = log.createNewSnapshotUnchecked(snapshotId1).get();
append(snapshot, 500);
snapshot.freeze();
Contributor

Should we close snapshot as before? Ditto below.
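
One way to guarantee that, assuming the writer type is AutoCloseable (illustrative names, not the exact test code):

public class SnapshotCloseSketch {
    interface SnapshotWriter extends AutoCloseable {
        void append(int records);
        void freeze();
        @Override
        void close();
    }

    // try-with-resources closes the writer even if append() or freeze() throws.
    static void writeAndFreeze(SnapshotWriter snapshot) {
        try (snapshot) {
            snapshot.append(500);
            snapshot.freeze();
        }
    }
}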

}

@Test
public void testSegmentsLessThanLatestSnapshot() throws IOException {
Contributor

Why did we skip testSegmentMsConfigIsSetInMetadataLog() in the old test?

Member Author

This test was added after I opened the PR. I must have missed it while rebasing.

* @param log The log
* @return true if the topic partition should not exist on the broker, false otherwise.
*/
public static boolean isStrayReplica(TopicsImage newTopicsImage, int brokerId, Optional<Uuid> topicId, int partitionId, String log) {
Contributor

Should this be put in metadata.util?

Member Author

The reasons I put it here are that:

  • it's a single method related to TopicsImage
  • the TopicsImageTest class has utility methods that helped simplify the test logic

Member

Why make this static if the first argument is TopicsImage? This looks a lot like an object method where the this reference is the TopicsImage.

Member Author

It was static mostly for historical reasons, as I just copied it from LogManager. But you're right: since the first argument is a TopicsImage, the new method I added is not static.

Contributor

@junrao junrao left a comment

@mimaison : Thanks for the updated PR. A few more comments. Also, there are compilation errors.

FileRawSnapshotReader fileReader = FileRawSnapshotReader.open(log.dir().toPath(), snapshotId);
snapshots.put(snapshotId, Optional.of(fileReader));
return Optional.of(fileReader);
} catch (UncheckedIOException e) {
Contributor

The following path is only for NoSuchFileException. For other IOExceptions, we should propagate to the caller.

Member Author

Which method do you see throwing NoSuchFileException?

Contributor

According to https://stackoverflow.com/questions/63638250/nio-channels-filechannel-open-threw-nosuchfileexception, FileChannel.open() could throw NoSuchFileException.

Member Author

Ah right, going down the whole call stack, I see that UnixFileSystemProvider uses UnixException.translateToIOException(), which maps ENOENT to NoSuchFileException.

I wonder if the existing code is broken, as FileRawSnapshotReader.open() wraps the IOException in an UncheckedIOException, so the current catch block should never be entered.

I made the change and added a test case checking NoSuchFileException. However, I can't find a way to reliably force another type of IOException to check that those are propagated.
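
A sketch of the resulting distinction (illustrative; the Supplier stands in for FileRawSnapshotReader.open()): only a missing file means "no snapshot", any other wrapped IO error propagates.

import java.io.UncheckedIOException;
import java.nio.file.NoSuchFileException;
import java.util.Optional;
import java.util.function.Supplier;

public class ReadSnapshotSketch {
    static <T> Optional<T> readSnapshotIfExists(Supplier<T> open) {
        try {
            return Optional.of(open.get());
        } catch (UncheckedIOException e) {
            if (e.getCause() instanceof NoSuchFileException) {
                return Optional.empty(); // the snapshot file does not exist
            }
            throw e; // propagate all other IO failures to the caller
        }
    }
}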

)
);
} catch (IOException ioe) {
throw new KafkaException(ioe);
Contributor

I noticed that existing code like Snapshots and FileSnapshotReader convert IOException to UncheckedIOException. It's probably better to follow that convention for now.

Member Author

Yes the whole raft module pretty much only uses UncheckedIOException.

public void testIsStrayReplicaWithEmptyImage() {
TopicsImage image = topicsImage(List.of());
List<TopicIdPartition> onDisk = List.of(FOO_0, FOO_1, BAR_0, BAR_1, BAZ_0);
assertTrue(onDisk.stream().allMatch(log ->
Contributor

It seems that this hasn't been addressed?

mockTime.sleep(config.internalDeleteDelayMillis());
// Assert that the log dir doesn't contain any older snapshots
Files.walk(logDir, 1)
.map(Snapshots::parse)
Contributor

This indentation is different from the one in line 628.

// Simulate log cleanup that advances the LSO
log.log().maybeIncrementLogStartOffset(snapshotId.offset() - 1, LogStartOffsetIncrementReason.SegmentDeletion);

assertEquals(Optional.empty(), log.createNewSnapshot(new OffsetAndEpoch(snapshotId.offset() - 2, snapshotId.epoch())));
Contributor

Should we close the created snapshot?

Member Author

We expect createNewSnapshot() to return Optional.empty(), so there should be nothing to close.

log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords));
createNewSnapshot(log, snapshotId);

assertEquals(Optional.empty(), log.createNewSnapshot(snapshotId),
Contributor

Should we close the created snapshot?

Member Author

We expect createNewSnapshot() to return Optional.empty(), so there should be nothing to close.

Contributor

@junrao junrao left a comment

@mimaison : Thanks for the updated PR. Just a comment based on Jose's feedback. Also, could you resolve the conflicts?

Member

@jsancio jsancio left a comment

Thanks for the changes. I still need to review the new KafkaRaftLog files, but here are some early comments.

Member

I am thinking that we should make this internal by moving the implementation to o.a.k.r.internals.KafkaRaftLog.java. The same for the accompanying test suite file.

Member Author

Moved

<allow class="org.apache.kafka.common.compress.Compression" exact-match="true" />
<allow pkg="org.apache.kafka.common.config" />
<allow pkg="org.apache.kafka.common.feature" />
<allow pkg="org.apache.kafka.common.internals" />
Member

Note that to me this means that the common module is not organized correctly if the raft module needs types in the "internals" namespace. Same comment applies to the change below which also includes o.a.k.s.internals.log.

Member

internals simply means it should not be used outside the kafka repo. This is more explicit than the uncommon "check the javadoc" approach to determining whether a class is public or internal.


Comment on lines 142 to 149
if (topicId.isEmpty()) {
// Missing topic ID could result from storage failure or unclean shutdown after topic creation but before flushing
// data to the `partition.metadata` file. And before appending data to the log, the `partition.metadata` is always
// flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
// a stray log.
LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
return true;
}
Member

How about letting the log manager make this decision? In Kafka 4.0 all topics must have a topic id.

Comment on lines 151 to 106
PartitionRegistration partition = newTopicsImage.getPartition(topicId.get(), partitionId);
if (partition == null) {
LOG.info("Found stray log dir {}: the topicId {} does not exist in the metadata image.", log, topicId);
return true;
} else {
List<Integer> replicas = Arrays.stream(partition.replicas).boxed().toList();
if (!replicas.contains(brokerId)) {
LOG.info("Found stray log dir {}: the current replica assignment {} does not contain the local brokerId {}.",
log, replicas.stream().map(String::valueOf).collect(Collectors.joining(", ", "[", "]")), brokerId);
return true;
} else {
return false;
}
}
}
Member

To me this should be functionality of the log manager. Maybe the TopicsImage method should just return all of the topic ids that don't exist in the given image for the given broker.

This would allow you to remove that added static logger.

logManager.startup(
metadataCache.getAllTopics().asScala,
isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
isStray = log => TopicsImage.isStrayReplica(newImage.topics(), brokerId, log.topicId(), log.topicPartition().partition(), log.toString)
Member

Minor, but to me stray partitions live in the log manager, not in the topics image. Meaning: the log manager has partition entries that are not in the latest topics image.

In some sense the log manager understands the topics image and makes sure that they match. The topics image doesn't know anything about "stray partitions" or the log manager.

If you still want to move the functionality to TopicsImage, maybe make it a method (not static) with Stream<TopicIdPartition> deletedPartitionsForReplica(int brokerId, Stream<TopicIdPartition>).

Member Author

I introduced a more generic method in TopicsImage called partitionReplicas(), and moved the logic specific to stray partitions back to LogManager.
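
A hypothetical sketch of that split (the real signatures are in the PR diff; types here are simplified stand-ins):

import java.util.List;
import java.util.Optional;

public class StrayReplicaSketch {
    // TopicsImage side: a generic accessor with no notion of "stray" partitions.
    interface TopicsImageView {
        Optional<List<Integer>> partitionReplicas(String topicId, int partitionId);
    }

    // LogManager side: the stray-partition decision built on top of the accessor.
    static boolean isStray(TopicsImageView image, int brokerId, String topicId, int partitionId) {
        return image.partitionReplicas(topicId, partitionId)
                .map(replicas -> !replicas.contains(brokerId)) // assignment exists but excludes this broker
                .orElse(true); // topic or partition missing from the image
    }
}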

@mimaison
Member Author

mimaison commented Aug 6, 2025

@jsancio Can you take another look? Thanks

Contributor

@junrao junrao left a comment

@mimaison : Thanks for the updated PR. A few more minor comments.

if (reader == null) {
return Optional.empty();
} else if (reader.isPresent()) {
return Optional.of(reader.get());
Contributor

We can just return reader here?

Member Author

I'm rewrapping it here as the generic types don't match: reader is an Optional<FileRawSnapshotReader> while we need to return an Optional<RawSnapshotReader>.
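
The underlying generics point, in minimal form (type names reused for illustration only):

import java.util.Optional;

public class OptionalVarianceSketch {
    interface RawSnapshotReader { }
    static class FileRawSnapshotReader implements RawSnapshotReader { }

    // Java generics are invariant: an Optional<FileRawSnapshotReader> is not an
    // Optional<RawSnapshotReader>, so the value has to be rewrapped (or mapped).
    static Optional<RawSnapshotReader> widen(Optional<FileRawSnapshotReader> reader) {
        return reader.map(r -> (RawSnapshotReader) r);
    }
}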

metadataCache.getAllTopics().asScala,
isStray = log => JLogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
isStray = log => {
if (log.topicId().isEmpty) {
Contributor

Could we keep this part in JLogManager.isStrayReplica()?

Member Author

The reason I kept this here is that I wanted to keep the else block here, since it uses newImage, which we're trying not to use in JLogManager.

// flushed to disk. So if the topic ID is missing, it mostly means no data was appended, and we can treat this as
// a stray log.
LOG.info("The topicId does not exist in {}, treat it as a stray log.", log);
public static boolean isStrayReplica(List<Integer> replicas, int brokerId, Uuid topicId, UnifiedLog log) {
Contributor

There is no need to pass in topicId since it comes from log.

Member Author

Good call. Fixed

@mimaison
Member Author

mimaison commented Sep 2, 2025

Rebased on trunk, @jsancio @junrao can you take another look? Thanks

@mimaison
Member Author

mimaison commented Sep 9, 2025

Rebased on trunk to resolve conflicts

@mimaison
Member Author

Rebased once more. @jsancio @junrao can you take another look? Thanks

@mimaison
Member Author

@jsancio @junrao I keep rebasing this each week. Quick reminder, can you take another look? Thanks

@mimaison
Member Author

mimaison commented Dec 3, 2025

@jsancio @junrao Do you want to take another look? Or should I go ahead and merge it?

@mimaison mimaison merged commit bf91c07 into apache:trunk Dec 16, 2025
20 checks passed
@mimaison mimaison deleted the kafka-15599-4 branch December 16, 2025 12:51