Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/src/main/scala/kafka/server/ConfigAdminManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
*/
package kafka.server

import kafka.server.logger.RuntimeLoggerManager

import java.util
import java.util.Properties
import kafka.utils._
Expand All @@ -37,6 +35,7 @@ import org.apache.kafka.common.requests.ApiError
import org.apache.kafka.common.resource.{Resource, ResourceType}
import org.apache.kafka.metadata.ConfigRepository
import org.apache.kafka.server.config.AbstractKafkaConfig
import org.apache.kafka.server.logger.RuntimeLoggerManager
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.{Map, Seq}
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture
import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.QuotaFactory.QuotaManagers
import kafka.server.logger.RuntimeLoggerManager
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{AlterConfigOp, EndpointType}
import org.apache.kafka.common.Uuid.ZERO_UUID
Expand Down Expand Up @@ -58,6 +57,7 @@ import org.apache.kafka.security.DelegationTokenManager
import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
import org.apache.kafka.server.logger.RuntimeLoggerManager
import org.apache.kafka.server.quota.ControllerMutationQuota

import scala.jdk.CollectionConverters._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package kafka.server.logger;
package org.apache.kafka.server.logger;

import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.config.LogLevelConfig;
Expand All @@ -25,7 +25,6 @@
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.server.logger.LoggingController;

import org.slf4j.Logger;

Expand All @@ -49,7 +48,7 @@ public class RuntimeLoggerManager {
private final int nodeId;
private final Logger log;

public RuntimeLoggerManager(int nodeId, Logger log) {
public RuntimeLoggerManager(int nodeId, Logger log) {
this.nodeId = nodeId;
this.log = log;
}
Expand All @@ -73,7 +72,12 @@ void alterLogLevelConfigs(Collection<AlterableConfig> ops) {
ops.forEach(op -> {
String loggerName = op.name();
String logLevel = op.value();
switch (OpType.forId(op.configOperation())) {
OpType opType = OpType.forId(op.configOperation());
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OpType.forId returns null for unknown ids, so report a clear error here instead of throwing an NPE.

if (opType == null) {
throw new IllegalArgumentException(
"Invalid log4j configOperation: " + op.configOperation());
}
switch (opType) {
case SET:
if (LoggingController.logLevel(loggerName, logLevel)) {
log.warn("Updated the log level of {} to {}", loggerName, logLevel);
Expand Down Expand Up @@ -118,7 +122,13 @@ void validateLoggerNameExists(String loggerName) {
void validateLogLevelConfigs(Collection<AlterableConfig> ops) {
ops.forEach(op -> {
String loggerName = op.name();
switch (OpType.forId(op.configOperation())) {
OpType opType = OpType.forId(op.configOperation());
if (opType == null) {
throw new InvalidRequestException("Unknown operation type " +
(int) op.configOperation() + " is not allowed for the " +
BROKER_LOGGER + " resource");
}
switch (opType) {
case SET:
validateLoggerNameExists(loggerName);
String logLevel = op.value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server.logger;
package org.apache.kafka.server.logger;

import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.AlterConfigOp.OpType;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterableConfig;
import org.apache.kafka.server.logger.LoggingController;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -67,6 +66,28 @@ public void testOperationNotAllowed(byte id) {
setValue("TRACE")))).getMessage());
}

@Test
public void testUnknownOperationNotAllowed() {
byte unknownOperation = 99;
assertEquals("Unknown operation type 99 is not allowed for the BROKER_LOGGER resource",
Assertions.assertThrows(InvalidRequestException.class,
() -> MANAGER.validateLogLevelConfigs(List.of(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(unknownOperation).
setValue("TRACE")))).getMessage());
}

@Test
public void testAlterUnknownOperationNotAllowed() {
byte unknownOperation = 99;
assertEquals("Invalid log4j configOperation: 99",
Assertions.assertThrows(IllegalArgumentException.class,
() -> MANAGER.alterLogLevelConfigs(List.of(new AlterableConfig().
setName(LOG.getName()).
setConfigOperation(unknownOperation).
setValue("TRACE")))).getMessage());
}

@Test
public void testValidateBogusLogLevelNameNotAllowed() {
assertEquals("Cannot set the log level of " + LOG.getName() + " to BOGUS as it is not " +
Expand Down
Loading