Skip to content

Commit bdfe7b3

Browse files
committed
KAFKA-19817 Move DynamicTopicClusterQuotaPublisher to metadata module
1 parent 05999c7 commit bdfe7b3

File tree

23 files changed

+136
-89
lines changed

23 files changed

+136
-89
lines changed

checkstyle/import-control-metadata.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@
167167
<allow pkg="org.apache.kafka.server.common" />
168168
<allow pkg="org.apache.kafka.server.fault" />
169169
<allow pkg="org.apache.kafka.server.config" />
170+
<allow pkg="org.apache.kafka.server.quota" />
170171
<allow pkg="org.apache.kafka.server.util"/>
171172
<allow pkg="org.apache.kafka.test" />
172173
<subpackage name="authorizer">

core/src/main/java/kafka/server/QuotaFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.kafka.common.metrics.Metrics;
2222
import org.apache.kafka.common.utils.Time;
2323
import org.apache.kafka.common.utils.Utils;
24+
import org.apache.kafka.metadata.publisher.QuotaManagersProvider;
2425
import org.apache.kafka.server.config.ClientQuotaManagerConfig;
2526
import org.apache.kafka.server.config.QuotaConfig;
2627
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig;
@@ -58,7 +59,7 @@ public record QuotaManagers(ClientQuotaManager fetch,
5859
ReplicationQuotaManager leader,
5960
ReplicationQuotaManager follower,
6061
ReplicationQuotaManager alterLogDirs,
61-
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) {
62+
Optional<Plugin<ClientQuotaCallback>> clientQuotaCallbackPlugin) implements QuotaManagersProvider {
6263

6364
public void shutdown() {
6465
fetch.shutdown();

core/src/main/scala/kafka/network/RequestChannel.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ import org.apache.kafka.common.network.Send
3131
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
3232
import org.apache.kafka.common.requests._
3333
import org.apache.kafka.common.utils.Time
34-
import org.apache.kafka.network.Session
3534
import org.apache.kafka.network.metrics.{RequestChannelMetrics, RequestMetrics}
3635
import org.apache.kafka.server.common.RequestLocal
3736
import org.apache.kafka.server.metrics.KafkaMetricsGroup
3837
import org.apache.kafka.network.RequestConvertToJson
38+
import org.apache.kafka.server.network.Session
3939

4040
import scala.jdk.CollectionConverters._
4141
import scala.jdk.OptionConverters.RichOption

core/src/main/scala/kafka/server/BrokerServer.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator, ShareCoordinatorRec
4242
import org.apache.kafka.coordinator.transaction.ProducerIdManager
4343
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
4444
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, MetadataVersionConfigValidator}
45-
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
45+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
4646
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
4747
import org.apache.kafka.server.authorizer.Authorizer
4848
import org.apache.kafka.server.common.{ApiMessageAndVersion, DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
@@ -494,7 +494,7 @@ class BrokerServer(
494494
),
495495
new DynamicTopicClusterQuotaPublisher(
496496
clusterId,
497-
config,
497+
config.nodeId,
498498
sharedServer.metadataPublishingFaultHandler,
499499
"broker",
500500
quotaManagers,

core/src/main/scala/kafka/server/ControllerServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
2222
import kafka.server.QuotaFactory.QuotaManagers
2323

2424
import scala.collection.immutable
25-
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, DynamicTopicClusterQuotaPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
25+
import kafka.server.metadata.{ClientQuotaMetadataManager, DynamicConfigPublisher, KRaftMetadataCache, KRaftMetadataCachePublisher}
2626
import kafka.utils.{CoreUtils, Logging}
2727
import org.apache.kafka.common.internals.Plugin
2828
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,7 +38,7 @@ import org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
3838
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
3939
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
4040
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
41-
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, FeaturesPublisher, ScramPublisher}
41+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, FeaturesPublisher, ScramPublisher}
4242
import org.apache.kafka.raft.QuorumConfig
4343
import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
4444
import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
@@ -346,7 +346,7 @@ class ControllerServer(
346346
// Set up the DynamicTopicClusterQuotaPublisher. This will enable quotas for the cluster and topics.
347347
metadataPublishers.add(new DynamicTopicClusterQuotaPublisher(
348348
clusterId,
349-
config,
349+
config.nodeId,
350350
sharedServer.metadataPublishingFaultHandler,
351351
"controller",
352352
quotaManagers,

core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfig
3333
import org.apache.kafka.image.loader.LoaderManifest
3434
import org.apache.kafka.image.publisher.MetadataPublisher
3535
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
36-
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, ScramPublisher}
36+
import org.apache.kafka.metadata.publisher.{AclPublisher, DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicTopicClusterQuotaPublisher, ScramPublisher}
3737
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
3838
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal, ShareVersion}
3939
import org.apache.kafka.server.fault.FaultHandler

core/src/main/scala/kafka/server/metadata/DynamicTopicClusterQuotaPublisher.scala

Lines changed: 0 additions & 72 deletions
This file was deleted.

core/src/test/scala/unit/kafka/server/BaseClientQuotaManagerTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.FetchRequest.PartitionData
2929
import org.apache.kafka.common.requests.{AbstractRequest, FetchRequest, RequestContext, RequestHeader}
3030
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
3131
import org.apache.kafka.common.utils.MockTime
32-
import org.apache.kafka.network.Session
3332
import org.apache.kafka.network.metrics.RequestChannelMetrics
33+
import org.apache.kafka.server.network.Session
3434
import org.apache.kafka.server.quota.{ClientQuotaManager, ThrottleCallback}
3535
import org.junit.jupiter.api.AfterEach
3636
import org.mockito.Mockito.mock

core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import org.apache.kafka.common.internals.Plugin
2222
import org.apache.kafka.common.metrics.Quota
2323
import org.apache.kafka.common.security.auth.KafkaPrincipal
2424
import org.apache.kafka.server.config.ClientQuotaManagerConfig
25-
import org.apache.kafka.network.Session
25+
import org.apache.kafka.server.network.Session
2626
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaManager, ClientQuotaType, QuotaType}
2727
import org.junit.jupiter.api.Assertions._
2828
import org.junit.jupiter.api.Test

core/src/test/scala/unit/kafka/server/ControllerApisTest.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,12 @@ import org.apache.kafka.controller.{Controller, ControllerRequestContext, Result
5454
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
5555
import org.apache.kafka.network.SocketServerConfigs
5656
import org.apache.kafka.network.metrics.RequestChannelMetrics
57-
import org.apache.kafka.network.Session
5857
import org.apache.kafka.raft.{QuorumConfig, RaftManager}
5958
import org.apache.kafka.server.SimpleApiVersionManager
6059
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
6160
import org.apache.kafka.server.common.{ApiMessageAndVersion, FinalizedFeatures, KRaftVersion, MetadataVersion, ProducerIdsBlock, RequestLocal}
6261
import org.apache.kafka.server.config.{KRaftConfigs, ServerConfigs}
62+
import org.apache.kafka.server.network.Session
6363
import org.apache.kafka.server.quota.{ClientQuotaManager, ControllerMutationQuota, ControllerMutationQuotaManager}
6464
import org.apache.kafka.server.util.FutureUtils
6565
import org.apache.kafka.storage.internals.log.CleanerConfig

0 commit comments

Comments
 (0)