Skip to content

Commit 68800a0

Browse files
authored
KAFKA-19301 Move partition state classes from storage to server module (apache#20831)
Follow up to the discussion in apache#20083. Move partition state management classes from `org.apache.kafka.storage.internals.log` to `org.apache.kafka.server.partition` package. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent 907e56d commit 68800a0

20 files changed

+27
-21
lines changed

core/src/main/java/kafka/server/share/SharePartitionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.kafka.common.utils.Time;
3434
import org.apache.kafka.coordinator.group.GroupConfigManager;
3535
import org.apache.kafka.server.common.ShareVersion;
36+
import org.apache.kafka.server.partition.PartitionListener;
3637
import org.apache.kafka.server.share.CachedSharePartition;
3738
import org.apache.kafka.server.share.ShareGroupListener;
3839
import org.apache.kafka.server.share.SharePartitionKey;
@@ -57,7 +58,6 @@
5758
import org.apache.kafka.server.util.timer.SystemTimerReaper;
5859
import org.apache.kafka.server.util.timer.Timer;
5960
import org.apache.kafka.server.util.timer.TimerTask;
60-
import org.apache.kafka.storage.internals.log.PartitionListener;
6161
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
6262

6363
import org.slf4j.Logger;

core/src/main/scala/kafka/cluster/Partition.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCac
4141
import org.apache.kafka.server.common.RequestLocal
4242
import org.apache.kafka.server.log.remote.TopicPartitionLog
4343
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
44-
import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, AssignmentState, AsyncOffsetReader, CommittedPartitionState, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState, UnifiedLog, VerificationGuard}
44+
import org.apache.kafka.storage.internals.log.{AppendOrigin, AsyncOffsetReader, FetchDataInfo, LeaderHwChange, LogAppendInfo, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogReadInfo, LogStartOffsetIncrementReason, OffsetResultHolder, UnifiedLog, VerificationGuard}
4545
import org.apache.kafka.server.metrics.KafkaMetricsGroup
46+
import org.apache.kafka.server.partition.{AlterPartitionListener, AssignmentState, CommittedPartitionState, OngoingReassignmentState, PartitionListener, PartitionState, PendingExpandIsr, PendingPartitionChange, PendingShrinkIsr, SimpleAssignmentState}
4647
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
4748
import org.apache.kafka.server.replica.Replica
4849
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey

core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
2323
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
2424
import org.apache.kafka.server.ActionQueue
2525
import org.apache.kafka.server.common.RequestLocal
26+
import org.apache.kafka.server.partition.PartitionListener
2627
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
27-
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, PartitionListener, VerificationGuard}
28+
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig, VerificationGuard}
2829

2930
import java.util.concurrent.CompletableFuture
3031
import scala.collection.Map

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package kafka.server
1818

1919
import com.yammer.metrics.core.Meter
20-
import kafka.cluster.{Partition}
20+
import kafka.cluster.Partition
2121
import kafka.log.LogManager
2222
import kafka.server.HostedPartition.Online
2323
import kafka.server.QuotaFactory.QuotaManagers
@@ -56,6 +56,7 @@ import org.apache.kafka.server.config.ReplicationConfigs
5656
import org.apache.kafka.server.log.remote.storage.RemoteLogManager
5757
import org.apache.kafka.server.metrics.KafkaMetricsGroup
5858
import org.apache.kafka.server.network.BrokerEndPoint
59+
import org.apache.kafka.server.partition.PartitionListener
5960
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, DelayedRemoteFetch, DelayedRemoteListOffsets, DeleteRecordsPartitionStatus, ListOffsetsPartitionStatus, TopicPartitionOperationKey}
6061
import org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFetchPartitionKey}
6162
import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
@@ -65,7 +66,7 @@ import org.apache.kafka.server.util.timer.{SystemTimer, TimerTask}
6566
import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
6667
import org.apache.kafka.server.{ActionQueue, DelayedActionQueue, common}
6768
import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
68-
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard,PartitionListener}
69+
import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchPartitionStatus, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, LogReadResult, OffsetResultHolder, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog, VerificationGuard}
6970
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
7071

7172
import java.io.File

core/src/test/scala/unit/kafka/cluster/AbstractPartitionTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import org.apache.kafka.common.utils.Utils
2525
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration}
2626
import org.apache.kafka.server.common.MetadataVersion
2727
import org.apache.kafka.server.config.ReplicationConfigs
28+
import org.apache.kafka.server.partition.AlterPartitionListener
2829
import org.apache.kafka.server.util.MockTime
2930
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
30-
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,AlterPartitionListener}
31+
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig}
3132
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
3233
import org.junit.jupiter.api.{AfterEach, BeforeEach}
3334
import org.mockito.ArgumentMatchers

core/src/test/scala/unit/kafka/cluster/AssignmentStateTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
*/
1717
package kafka.cluster
1818

19-
import org.apache.kafka.storage.internals.log.SimpleAssignmentState
2019
import org.apache.kafka.common.DirectoryId
2120
import org.apache.kafka.metadata.{LeaderRecoveryState, PartitionRegistration}
21+
import org.apache.kafka.server.partition.SimpleAssignmentState
2222
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
2323
import org.junit.jupiter.params.ParameterizedTest
2424
import org.junit.jupiter.params.provider.{Arguments, MethodSource}

core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,12 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3333
import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState, MetadataCache, MockConfigRepository, PartitionRegistration}
3434
import org.apache.kafka.server.common.{RequestLocal, TopicIdPartition}
3535
import org.apache.kafka.server.config.ReplicationConfigs
36+
import org.apache.kafka.server.partition.{AlterPartitionListener, CommittedPartitionState, PendingShrinkIsr}
3637
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams}
3738
import org.apache.kafka.server.util.MockTime
3839
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
3940
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
40-
import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, CommittedPartitionState, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
41+
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetsListener, LogSegments, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
4142
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
4243
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
4344
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}

core/src/test/scala/unit/kafka/cluster/PartitionTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
5555
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
5656
import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager, RequestLocal}
5757
import org.apache.kafka.server.metrics.KafkaYammerMetrics
58+
import org.apache.kafka.server.partition.{AlterPartitionListener, OngoingReassignmentState, PartitionListener, PendingShrinkIsr, SimpleAssignmentState}
5859
import org.apache.kafka.server.purgatory.{DelayedDeleteRecords, DelayedOperationPurgatory, TopicPartitionOperationKey}
5960
import org.apache.kafka.server.share.fetch.DelayedShareFetchPartitionKey
6061
import org.apache.kafka.server.storage.log.{FetchIsolation, FetchParams, UnexpectedAppendOffsetException}
6162
import org.apache.kafka.server.util.{KafkaScheduler, MockTime}
6263
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints
6364
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
64-
import org.apache.kafka.storage.internals.log.{AlterPartitionListener, AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, OngoingReassignmentState, PartitionListener, PendingShrinkIsr, ProducerStateManager, ProducerStateManagerConfig, SimpleAssignmentState, UnifiedLog, VerificationGuard}
65+
import org.apache.kafka.storage.internals.log.{AppendOrigin, CleanerConfig, EpochEntry, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogLoader, LogOffsetMetadata, LogOffsetsListener, LogReadInfo, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, UnifiedLog, VerificationGuard}
6566
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
6667
import org.junit.jupiter.params.ParameterizedTest
6768
import org.junit.jupiter.params.provider.ValueSource

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/PartitionMakeFollowerBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,9 @@
3535
import org.apache.kafka.metadata.MetadataCache;
3636
import org.apache.kafka.metadata.MockConfigRepository;
3737
import org.apache.kafka.metadata.PartitionRegistration;
38+
import org.apache.kafka.server.partition.AlterPartitionListener;
3839
import org.apache.kafka.server.util.KafkaScheduler;
3940
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
40-
import org.apache.kafka.storage.internals.log.AlterPartitionListener;
4141
import org.apache.kafka.storage.internals.log.CleanerConfig;
4242
import org.apache.kafka.storage.internals.log.LogConfig;
4343
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;

jmh-benchmarks/src/main/java/org/apache/kafka/jmh/partition/UpdateFollowerFetchStateBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@
3131
import org.apache.kafka.metadata.MetadataCache;
3232
import org.apache.kafka.metadata.MockConfigRepository;
3333
import org.apache.kafka.metadata.PartitionRegistration;
34+
import org.apache.kafka.server.partition.AlterPartitionListener;
3435
import org.apache.kafka.server.replica.Replica;
3536
import org.apache.kafka.server.util.KafkaScheduler;
3637
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpoints;
37-
import org.apache.kafka.storage.internals.log.AlterPartitionListener;
3838
import org.apache.kafka.storage.internals.log.CleanerConfig;
3939
import org.apache.kafka.storage.internals.log.LogConfig;
4040
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;

0 commit comments

Comments
 (0)