Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 62 additions & 32 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType.{CLUSTER, GROUP, TOPIC, USER}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource
import org.apache.kafka.controller.ControllerRequestContext.requestTimeoutMsToDeadlineNs
import org.apache.kafka.controller.{Controller, ControllerRequestContext}
import org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
Expand Down Expand Up @@ -720,47 +721,76 @@ class ControllerApis(
val configChanges = new util.HashMap[ConfigResource,
util.Map[String, Entry[AlterConfigOp.OpType, String]]]()
val brokerLoggerResponses = new util.ArrayList[AlterConfigsResourceResponse](1)
val nullConfigsErrorResults = new util.IdentityHashMap[AlterConfigsResource, ApiError]()
alterConfigsRequest.data.resources.forEach { resource =>
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
try {
val nullUpdates = new util.ArrayList[String]()
resource.configs().forEach { config =>
if (config.configOperation() != AlterConfigOp.OpType.DELETE.id() &&
config.value() == null) {
nullUpdates.add(config.name())
}
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
if (!nullUpdates.isEmpty) {
throw new InvalidRequestException("Null value not supported for : " +
String.join(", ", nullUpdates))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
val configResource = new ConfigResource(
ConfigResource.Type.forId(resource.resourceType), resource.resourceName())
if (configResource.`type`().equals(ConfigResource.Type.BROKER_LOGGER)) {
val apiError = try {
runtimeLoggerManager.applyChangesForResource(
authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME),
alterConfigsRequest.data().validateOnly(),
resource)
ApiError.NONE
} catch {
case t: Throwable => ApiError.fromThrowable(t)
}
brokerLoggerResponses.add(new AlterConfigsResourceResponse().
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()).
setErrorCode(apiError.error().code()).
setErrorMessage(if (apiError.isFailure) apiError.messageWithFallback() else null))
} else if (configResource.`type`().equals(ConfigResource.Type.UNKNOWN)) {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setErrorCode(UNSUPPORTED_VERSION.code()).
setErrorMessage("Unknown resource type " + resource.resourceType() + ".").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
} else if (!duplicateResources.contains(configResource)) {
val altersByName = new util.HashMap[String, Entry[AlterConfigOp.OpType, String]]()
resource.configs.forEach { config =>
altersByName.put(config.name, new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
AlterConfigOp.OpType.forId(config.configOperation), config.value))
}
if (configChanges.put(configResource, altersByName) != null) {
duplicateResources.add(configResource)
configChanges.remove(configResource)
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Duplicate resource.").
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
}
}
} catch {
case t: Throwable =>
val err = ApiError.fromThrowable(t)
error(s"Error on processing incrementalAlterConfigs request on ${resource.resourceName()}", t)
nullConfigsErrorResults.put(resource, err)
}
}
if (!nullConfigsErrorResults.isEmpty) {
nullConfigsErrorResults.forEach((resource, apiError) => {
response.responses().add(new AlterConfigsResourceResponse().
setErrorCode(apiError.error().code()).
setErrorMessage(apiError.messageWithFallback()).
setResourceName(resource.resourceName()).
setResourceType(resource.resourceType()))
configChanges.remove(resource)
})
}
val iterator = configChanges.keySet().iterator()
while (iterator.hasNext) {
val resource = iterator.next()
Expand Down
32 changes: 32 additions & 0 deletions core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,38 @@ class ControllerApisTest {
response.data().responses().asScala.toSet)
}

@Test
def testInvalidIncrementalAlterConfigsWithNullResources(): Unit = {
val requestData = new IncrementalAlterConfigsRequestData().setResources(
new AlterConfigsResourceCollection(util.Arrays.asList(
new AlterConfigsResource().
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id()).
setConfigs(new AlterableConfigCollection(util.Arrays.asList(new AlterableConfig().
setName("my.custom.config").
setValue(null).
setConfigOperation(AlterConfigOp.OpType.SET.id())).iterator()))
).iterator()))
val request = buildRequest(new IncrementalAlterConfigsRequest.Builder(requestData).build(0))
controllerApis = createControllerApis(None, new MockController.Builder().build())
controllerApis.handleIncrementalAlterConfigs(request)
val capturedResponse: ArgumentCaptor[AbstractResponse] =
ArgumentCaptor.forClass(classOf[AbstractResponse])
verify(requestChannel).sendResponse(
ArgumentMatchers.eq(request),
capturedResponse.capture(),
ArgumentMatchers.eq(None))
assertNotNull(capturedResponse.getValue)
val response = capturedResponse.getValue.asInstanceOf[IncrementalAlterConfigsResponse]
assertEquals(Set(
new AlterConfigsResourceResponse().
setErrorCode(INVALID_REQUEST.code()).
setErrorMessage("Null value not supported for : my.custom.config").
setResourceName("3").
setResourceType(ConfigResource.Type.BROKER.id())),
response.data().responses().asScala.toSet)
}

@Test
def testUnauthorizedHandleAlterPartitionReassignments(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,39 @@ public void testDescribeConfigs(ClusterInstance clusterInstance) throws Exceptio
assertEquals("2", configEntry.value());
}
}

@ClusterTest
public void testIncrementalAlterNullValueConfigsByControllers(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, true);
}

@ClusterTest
public void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance) throws Exception {
testIncrementalAlterNullValueConfigs(clusterInstance, false);
}

private void testIncrementalAlterNullValueConfigs(ClusterInstance clusterInstance, boolean usingBootstrapControllers) throws Exception {
try (Admin admin = Admin.create(adminConfig(clusterInstance, usingBootstrapControllers))) {
int nodeId = usingBootstrapControllers ?
clusterInstance.controllers().values().iterator().next().config().nodeId() :
clusterInstance.brokers().values().iterator().next().config().nodeId();
ConfigResource nodeResource = new ConfigResource(BROKER, "" + nodeId);
Map<ConfigResource, Collection<AlterConfigOp>> alterations = Map.of(
nodeResource,
List.of(
new AlterConfigOp(new ConfigEntry("my.custom.config", null), AlterConfigOp.OpType.SET),
new AlterConfigOp(new ConfigEntry("my.custom.config1", null), AlterConfigOp.OpType.SET)
)
);
ExecutionException exception = assertThrows(
ExecutionException.class,
() -> admin.incrementalAlterConfigs(alterations).all().get(1, TimeUnit.MINUTES)
);
assertEquals(
"org.apache.kafka.common.errors.InvalidRequestException: " +
"Null value not supported for : my.custom.config, my.custom.config1",
exception.getMessage()
);
}
}
}