Skip to content

KAFKA-17573: Move KRaftMetadataCache to metadata module #19232

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import org.apache.kafka.coordinator.share.metrics.{ShareCoordinatorMetrics, Shar
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRecordSerde, ShareCoordinatorService}
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, KRaftMetadataCache}
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.logger.RuntimeLoggerManager
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
Expand All @@ -53,7 +52,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply}
import org.apache.kafka.metadata.{BrokerHeartbeatReply, BrokerRegistrationReply, KRaftMetadataCache}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager, ProcessRole}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers

import scala.collection.immutable
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.server.metadata.{ClientQuotaMetadataManager, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCachePublisher, ScramPublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.network.ListenerName
Expand All @@ -34,7 +34,7 @@ import org.apache.kafka.common.{ClusterResource, Endpoint, Uuid}
import org.apache.kafka.controller.metrics.{ControllerMetadataMetricsPublisher, QuorumControllerMetrics}
import org.apache.kafka.controller.{Controller, QuorumController, QuorumFeatures}
import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, MetadataPublisher}
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.{KafkaConfigSchema, KRaftMetadataCache, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.server.common.RequestLocal
import org.apache.kafka.server.fault.FaultHandler
Expand Down
477 changes: 0 additions & 477 deletions core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package kafka.server.metadata

import org.apache.kafka.image.{MetadataDelta, MetadataImage}

import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.metadata.KRaftMetadataCache

class KRaftMetadataCachePublisher(
val metadataCache: KRaftMetadataCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import kafka.network.RequestChannel;
import kafka.server.AuthHelper;
import kafka.server.KafkaConfig;
import kafka.server.metadata.KRaftMetadataCache;
import kafka.utils.TestUtils;

import org.apache.kafka.common.Uuid;
Expand Down Expand Up @@ -56,6 +55,7 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
import org.apache.kafka.metadata.KRaftMetadataCache;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.network.SocketServerConfigs;
import org.apache.kafka.network.metrics.RequestChannelMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package kafka.server

import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.{TopicPartition, Uuid}
Expand All @@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.server.util.{MockScheduler, MockTime}
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.metadata.MetadataCache
import org.apache.kafka.server.config.ReplicationConfigs
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
import org.mockito.ArgumentMatchers
Expand All @@ -43,7 +43,6 @@ import java.lang.{Long => JLong}
import java.nio.ByteBuffer
import java.util.Optional
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Semaphore}
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.DelayedShareFetch
import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common.compress.Compression
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/cluster/ReplicaTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package kafka.cluster

import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{LogOffsetMetadata, UnifiedLog}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertThrows, assertTrue}
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/network/ProcessorTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@

package kafka.network

import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.errors.{InvalidRequestException, UnsupportedVersionException}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.RequestHeaderData
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{RequestHeader, RequestTestUtils}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager, SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, KRaftVersion, MetadataVersion}
import org.junit.jupiter.api.Assertions.{assertThrows, assertTrue}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package kafka.server
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.common.Uuid.ZERO_UUID
import org.apache.kafka.common.acl.AclOperation
Expand Down Expand Up @@ -52,6 +51,7 @@ import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT
import org.apache.kafka.controller.{Controller, ControllerRequestContext, ResultOrError}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.network.metrics.RequestChannelMetrics
import org.apache.kafka.raft.QuorumConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
*/
package kafka.server

import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.clients.NodeApiVersions
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metadata.FeatureLevelRecord
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.{BrokerFeatures, DefaultApiVersionManager}
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion}
import org.junit.jupiter.api.Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ import org.junit.jupiter.api._
import org.junit.jupiter.api.Assertions._
import kafka.utils.TestUtils
import kafka.cluster.Partition
import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.SimpleRecord
import org.apache.kafka.metadata.MockConfigRepository
import org.apache.kafka.metadata.{KRaftMetadataCache, MockConfigRepository}
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogDirFailureChannel}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import java.util.Properties
import kafka.cluster.Partition
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils.MockAlterPartitionManager
import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.config.ReplicationConfigs
Expand Down
3 changes: 1 addition & 2 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import kafka.cluster.Partition
import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.SharePartitionManager
import kafka.utils.{CoreUtils, Logging, LoggingController, TestUtils}
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
Expand Down Expand Up @@ -82,7 +81,7 @@ import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorTestConfig}
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache, MockConfigRepository}
import org.apache.kafka.metadata.{ConfigRepository, KRaftMetadataCache, MetadataCache, MockConfigRepository}
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.authorizer.AclEntry
Expand Down
8 changes: 4 additions & 4 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package kafka.server

import kafka.server.metadata.KRaftMetadataCache
import org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, BrokerEndpointCollection}
import org.apache.kafka.common.metadata._
Expand All @@ -26,7 +25,7 @@ import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{DirectoryId, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataProvenance}
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache}
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState, MetadataCache}
import org.apache.kafka.server.common.KRaftVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
Expand Down Expand Up @@ -278,7 +277,7 @@ class MetadataCacheTest {
assertEquals(0, partitionMetadata.partitionIndex)
assertEquals(expectedError.code, partitionMetadata.errorCode)
assertFalse(partitionMetadata.isrNodes.isEmpty)
assertEquals(List(0), partitionMetadata.replicaNodes.asScala)
assertEquals(util.List.of(0), partitionMetadata.replicaNodes)
}

@ParameterizedTest
Expand Down Expand Up @@ -815,6 +814,7 @@ class MetadataCacheTest {
checkTopicMetadata(topic0, Set(1, 2), resultTopic.partitions().asScala)

// With start index and quota reached
System.out.println("here")
response = metadataCache.describeTopicResponse(util.List.of(topic0, topic1).iterator, listenerName, t => if (t.equals(topic0)) 2 else 0, 1, false)
result = response.topics().asScala.toList
assertEquals(1, result.size)
Expand Down Expand Up @@ -909,7 +909,7 @@ class MetadataCacheTest {
setLeader(partition.replicas.get(0)).setIsr(partition.replicas)))
val cache = new KRaftMetadataCache(1, () => KRaftVersion.KRAFT_VERSION_0)
cache.setImage(delta.apply(MetadataProvenance.EMPTY))
val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)).asScala.head
val topicMetadata = cache.getTopicMetadata(util.Set.of("foo"), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), false, false).asScala.head
topicMetadata.partitions().asScala.map(p => (p.partitionIndex(), p.offlineReplicas())).toMap
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.ReplicaAlterLogDirsThread.ReassignmentState
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition
Expand All @@ -30,6 +29,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import kafka.log.LogManager
import kafka.server.AbstractFetcherThread.ResultWithPartitions
import kafka.server.QuotaFactory.UNBOUNDED_QUOTA
import kafka.server.epoch.util.MockBlockingSender
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.compress.Compression
Expand All @@ -34,6 +33,7 @@ import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBat
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion, OffsetAndEpoch}
import org.apache.kafka.server.network.BrokerEndPoint
import org.apache.kafka.storage.internals.log.{LogAppendInfo, UnifiedLog}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util
import java.util.concurrent.{CompletableFuture, Executors, LinkedBlockingQueue, TimeUnit}
import java.util.{Optional, Properties}
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{CoreUtils, Logging, TestUtils}
import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
Expand All @@ -34,7 +33,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.{DirectoryId, IsolationLevel, TopicPartition, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, MockConfigRepository}
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderAndIsr, LeaderRecoveryState, MetadataCache, MockConfigRepository}
import org.apache.kafka.metadata.PartitionRegistration
import org.apache.kafka.metadata.storage.Formatter
import org.apache.kafka.raft.QuorumConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,14 @@ import java.util.{Collections, Optional, Properties}
import kafka.cluster.{Partition, PartitionTest}
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils._
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.record.{MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.metadata.{KRaftMetadataCache, LeaderRecoveryState}
import org.apache.kafka.server.common.KRaftVersion
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.kafka.server.log.remote.quota.RLMQuotaManagerConfig.INACTIVE_S
import org.apache.kafka.server.log.remote.quota.RLMQuotaMetrics
import kafka.server.QuotaFactory.{QuotaManagers, UNBOUNDED_QUOTA}
import kafka.server.epoch.util.MockBlockingSender
import kafka.server.metadata.KRaftMetadataCache
import kafka.server.share.{DelayedShareFetch, SharePartition}
import kafka.utils.TestUtils.waitUntilTrue
import kafka.utils.{Pool, TestUtils}
Expand Down Expand Up @@ -55,6 +54,7 @@ import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{LogContext, Time, Utils}
import org.apache.kafka.coordinator.transaction.{AddPartitionsToTxnConfig, TransactionLogConfig}
import org.apache.kafka.image._
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
import org.apache.kafka.metadata.{LeaderAndIsr, MetadataCache}
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import java.io.File
import kafka.log.LogManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server._
import kafka.server.metadata.KRaftMetadataCache
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.{OffsetForLeaderPartition, OffsetForLeaderTopic}
Expand All @@ -29,6 +28,7 @@ import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.RecordBatch
import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}
import org.apache.kafka.metadata.KRaftMetadataCache
import org.apache.kafka.server.common.{KRaftVersion, OffsetAndEpoch}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{LogDirFailureChannel, UnifiedLog}
Expand Down
Loading