From 68c707d6e88746efffcec4c96ef188d4c2c4c4cd Mon Sep 17 00:00:00 2001 From: majialong Date: Thu, 28 May 2026 23:13:31 +0800 Subject: [PATCH] KAFKA-19298: Move RuntimeLoggerManager to server module --- .../kafka/server/ConfigAdminManager.scala | 3 +-- .../scala/kafka/server/ControllerApis.scala | 2 +- .../server/logger/RuntimeLoggerManager.java | 20 +++++++++++---- .../logger/RuntimeLoggerManagerTest.java | 25 +++++++++++++++++-- 4 files changed, 40 insertions(+), 10 deletions(-) rename {core/src/main/java => server/src/main/java/org/apache}/kafka/server/logger/RuntimeLoggerManager.java (89%) rename {core/src/test/java => server/src/test/java/org/apache}/kafka/server/logger/RuntimeLoggerManagerTest.java (80%) diff --git a/core/src/main/scala/kafka/server/ConfigAdminManager.scala b/core/src/main/scala/kafka/server/ConfigAdminManager.scala index 1b0215316019e..68210276e564f 100644 --- a/core/src/main/scala/kafka/server/ConfigAdminManager.scala +++ b/core/src/main/scala/kafka/server/ConfigAdminManager.scala @@ -16,8 +16,6 @@ */ package kafka.server -import kafka.server.logger.RuntimeLoggerManager - import java.util import java.util.Properties import kafka.utils._ @@ -37,6 +35,7 @@ import org.apache.kafka.common.requests.ApiError import org.apache.kafka.common.resource.{Resource, ResourceType} import org.apache.kafka.metadata.ConfigRepository import org.apache.kafka.server.config.AbstractKafkaConfig +import org.apache.kafka.server.logger.RuntimeLoggerManager import org.slf4j.{Logger, LoggerFactory} import scala.collection.{Map, Seq} diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index ef82b0cfb843c..2d2d5970b9308 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture import java.util.function.Consumer import kafka.network.RequestChannel import kafka.server.QuotaFactory.QuotaManagers -import kafka.server.logger.RuntimeLoggerManager import kafka.utils.Logging import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType} import org.apache.kafka.common.Uuid.ZERO_UUID @@ -58,6 +57,7 @@ import org.apache.kafka.security.DelegationTokenManager import org.apache.kafka.server.{ApiVersionManager, ProcessRole} import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal} +import org.apache.kafka.server.logger.RuntimeLoggerManager import org.apache.kafka.server.quota.ControllerMutationQuota import scala.jdk.CollectionConverters._ diff --git a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java b/server/src/main/java/org/apache/kafka/server/logger/RuntimeLoggerManager.java similarity index 89% rename from core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java rename to server/src/main/java/org/apache/kafka/server/logger/RuntimeLoggerManager.java index 3cb226e06867c..ceb611285278f 100644 --- a/core/src/main/java/kafka/server/logger/RuntimeLoggerManager.java +++ b/server/src/main/java/org/apache/kafka/server/logger/RuntimeLoggerManager.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package kafka.server.logger; +package org.apache.kafka.server.logger; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.config.LogLevelConfig; @@ -25,7 +25,6 @@ import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.server.logger.LoggingController; import org.slf4j.Logger; @@ -49,7 +48,7 @@ public class RuntimeLoggerManager { private final int nodeId; private final Logger log; - public RuntimeLoggerManager(int nodeId, Logger log) { + public RuntimeLoggerManager(int nodeId, Logger log) { this.nodeId = nodeId; this.log = log; } @@ -73,7 +72,12 @@ void alterLogLevelConfigs(Collection ops) { ops.forEach(op -> { String loggerName = op.name(); String logLevel = op.value(); - switch (OpType.forId(op.configOperation())) { + OpType opType = OpType.forId(op.configOperation()); + if (opType == null) { + throw new IllegalArgumentException( + "Invalid log4j configOperation: " + op.configOperation()); + } + switch (opType) { case SET: if (LoggingController.logLevel(loggerName, logLevel)) { log.warn("Updated the log level of {} to {}", loggerName, logLevel); @@ -118,7 +122,13 @@ void validateLoggerNameExists(String loggerName) { void validateLogLevelConfigs(Collection ops) { ops.forEach(op -> { String loggerName = op.name(); - switch (OpType.forId(op.configOperation())) { + OpType opType = OpType.forId(op.configOperation()); + if (opType == null) { + throw new InvalidRequestException("Unknown operation type " + + (int) op.configOperation() + " is not allowed for the " + + BROKER_LOGGER + " resource"); + } + switch (opType) { case SET: validateLoggerNameExists(loggerName); String logLevel = op.value(); diff --git a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java b/server/src/test/java/org/apache/kafka/server/logger/RuntimeLoggerManagerTest.java similarity index 80% rename from core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java rename to server/src/test/java/org/apache/kafka/server/logger/RuntimeLoggerManagerTest.java index b5c8740639c2f..f5d687feb4fc9 100644 --- a/core/src/test/java/kafka/server/logger/RuntimeLoggerManagerTest.java +++ b/server/src/test/java/org/apache/kafka/server/logger/RuntimeLoggerManagerTest.java @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package kafka.server.logger; +package org.apache.kafka.server.logger; import org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; import org.apache.kafka.common.errors.InvalidConfigurationException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig; -import org.apache.kafka.server.logger.LoggingController; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -67,6 +66,28 @@ public void testOperationNotAllowed(byte id) { setValue("TRACE")))).getMessage()); } + @Test + public void testUnknownOperationNotAllowed() { + byte unknownOperation = 99; + assertEquals("Unknown operation type 99 is not allowed for the BROKER_LOGGER resource", + Assertions.assertThrows(InvalidRequestException.class, + () -> MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(unknownOperation). + setValue("TRACE")))).getMessage()); + } + + @Test + public void testAlterUnknownOperationNotAllowed() { + byte unknownOperation = 99; + assertEquals("Invalid log4j configOperation: 99", + Assertions.assertThrows(IllegalArgumentException.class, + () -> MANAGER.alterLogLevelConfigs(List.of(new AlterableConfig(). + setName(LOG.getName()). + setConfigOperation(unknownOperation). + setValue("TRACE")))).getMessage()); + } + @Test public void testValidateBogusLogLevelNameNotAllowed() { assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " +