Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions core/src/main/java/kafka/server/builders/LogManagerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,13 +148,13 @@ public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) {
}

public LogManager build() {
if (logDirs == null) throw new RuntimeException("you must set logDirs");
if (configRepository == null) throw new RuntimeException("you must set configRepository");
if (initialDefaultConfig == null) throw new RuntimeException("you must set initialDefaultConfig");
if (cleanerConfig == null) throw new RuntimeException("you must set cleanerConfig");
if (scheduler == null) throw new RuntimeException("you must set scheduler");
if (brokerTopicStats == null) throw new RuntimeException("you must set brokerTopicStats");
if (logDirFailureChannel == null) throw new RuntimeException("you must set logDirFailureChannel");
if (logDirs == null) throw new IllegalStateException("you must set logDirs");
if (configRepository == null) throw new IllegalStateException("you must set configRepository");
if (initialDefaultConfig == null) throw new IllegalStateException("you must set initialDefaultConfig");
if (cleanerConfig == null) throw new IllegalStateException("you must set cleanerConfig");
if (scheduler == null) throw new IllegalStateException("you must set scheduler");
if (brokerTopicStats == null) throw new IllegalStateException("you must set brokerTopicStats");
if (logDirFailureChannel == null) throw new IllegalStateException("you must set logDirFailureChannel");
return new LogManager(CollectionConverters.asScala(logDirs).toSeq(),
CollectionConverters.asScala(initialOfflineDirs).toSeq(),
configRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ public ReplicaManagerBuilder setBrokerTopicStats(BrokerTopicStats brokerTopicSta

public ReplicaManager build() {
if (config == null) config = new KafkaConfig(Map.of());
if (logManager == null) throw new RuntimeException("You must set logManager");
if (metadataCache == null) throw new RuntimeException("You must set metadataCache");
if (logDirFailureChannel == null) throw new RuntimeException("You must set logDirFailureChannel");
if (alterPartitionManager == null) throw new RuntimeException("You must set alterIsrManager");
if (logManager == null) throw new IllegalStateException("You must set logManager");
if (metadataCache == null) throw new IllegalStateException("You must set metadataCache");
if (logDirFailureChannel == null) throw new IllegalStateException("You must set logDirFailureChannel");
if (alterPartitionManager == null) throw new IllegalStateException("You must set alterIsrManager");
if (brokerTopicStats == null) brokerTopicStats = new BrokerTopicStats(config.remoteLogManagerConfig().isRemoteStorageSystemEnabled());
// Initialize metrics in the end just before passing it to ReplicaManager to ensure ReplicaManager closes the
// metrics correctly. There might be a resource leak if it is initialized and an exception occurs between
Expand Down