diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 07e22f91e6301..e189d571e2f55 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -2535,12 +2535,12 @@ public void scheduleUnloadOperation( /** * A new metadata image is available. * - * @param newImage The new metadata image. - * @param delta The metadata delta. + * @param delta The metadata delta. + * @param newImage The new metadata image. */ - public void onNewMetadataImage( - CoordinatorMetadataImage newImage, - CoordinatorMetadataDelta delta + public void onMetadataUpdate( + CoordinatorMetadataDelta delta, + CoordinatorMetadataImage newImage ) { throwIfNotRunning(); log.debug("Scheduling applying of a new metadata image with version {}.", newImage.version()); diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index a359832756d3d..a0fc120640250 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -1968,7 +1968,7 @@ public void testClose() throws Exception { } @Test - public void testOnNewMetadataImage() { + public void testOnMetadataUpdate() { TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0); TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1); @@ -2029,7 +2029,7 @@ public void testOnNewMetadataImage() { // Publish a new image. CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY)); CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY; - runtime.onNewMetadataImage(newImage, delta); + runtime.onMetadataUpdate(delta, newImage); // Coordinator 0 should be notified about it. verify(coordinator0).onNewMetadataImage(newImage, delta); diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala index 7364eef518817..3f6abc5c71f68 100644 --- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala +++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala @@ -234,7 +234,7 @@ class BrokerMetadataPublisher( try { // Propagate the new image to the group coordinator. - groupCoordinator.onNewMetadataImage(new KRaftCoordinatorMetadataImage(newImage), new KRaftCoordinatorMetadataDelta(delta)) + groupCoordinator.onMetadataUpdate(delta, newImage) } catch { case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " + s"coordinator with local changes in $deltaName", t) diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala index 36a94edf9bd55..644bce7b80bf6 100644 --- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala +++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala @@ -35,7 +35,6 @@ import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord} import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes} import org.apache.kafka.common.utils.Exit -import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage} @@ -292,7 +291,7 @@ class BrokerMetadataPublisherTest { .numBytes(42) .build()) - verify(groupCoordinator).onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(delta)) + verify(groupCoordinator).onMetadataUpdate(delta, image) } @Test diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 83dc962268302..5133c2da77109 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -53,9 +53,9 @@ import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.BufferSupplier; -import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta; -import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import java.time.Duration; @@ -467,12 +467,12 @@ void onResignation( /** * A new metadata image is available. * - * @param newImage The new metadata image. * @param delta The metadata delta. + * @param newImage The new metadata image. */ - void onNewMetadataImage( - CoordinatorMetadataImage newImage, - CoordinatorMetadataDelta delta + void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage ); /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 2817eba179872..ce10aab6dbf72 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -84,18 +84,21 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor; import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader; -import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta; import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorResult; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics; import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier; +import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta; +import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor; import org.apache.kafka.coordinator.common.runtime.PartitionWriter; import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; +import org.apache.kafka.image.MetadataDelta; +import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.Authorizer; import org.apache.kafka.server.record.BrokerCompressionType; @@ -343,9 +346,9 @@ public GroupCoordinatorService build() { /** * The metadata image to extract topic id to names map. - * This is initialised when the {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)} is called + * This is initialised when the {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)} is called */ - private CoordinatorMetadataImage metadataImage = null; + private volatile CoordinatorMetadataImage metadataImage = null; /** * @@ -2354,16 +2357,18 @@ public void onResignation( } /** - * See {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)}. + * See {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)}. */ @Override - public void onNewMetadataImage( - CoordinatorMetadataImage newImage, - CoordinatorMetadataDelta delta + public void onMetadataUpdate( + MetadataDelta delta, + MetadataImage newImage ) { throwIfNotActive(); - metadataImage = newImage; - runtime.onNewMetadataImage(newImage, delta); + var wrappedImage = newImage == null ? null : new KRaftCoordinatorMetadataImage(newImage); + var wrappedDelta = delta == null ? null : new KRaftCoordinatorMetadataDelta(delta); + metadataImage = wrappedImage; + runtime.onMetadataUpdate(wrappedDelta, wrappedImage); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 75e0bc45c7629..cbbc0d96502eb 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -88,11 +88,8 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord; import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime; -import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta; -import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage; import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder; import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics; import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult; @@ -3163,7 +3160,7 @@ public void testOnPartitionsDeleted() { .addTopic(Uuid.randomUuid(), "foo", 1) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image))); + service.onMetadataUpdate(new MetadataDelta(image), image); when(runtime.scheduleWriteAllOperation( ArgumentMatchers.eq("on-partition-deleted"), @@ -3221,7 +3218,7 @@ public void testOnPartitionsDeletedCleanupShareGroupState() { .addTopic(Uuid.randomUuid(), "foo", 1) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image))); + service.onMetadataUpdate(new MetadataDelta(image), image); // No error in partition deleted callback when(runtime.scheduleWriteAllOperation( @@ -3268,10 +3265,10 @@ public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() { .build(); service.startup(() -> 3); - CoordinatorMetadataImage image = new MetadataImageBuilder() + MetadataImage image = new MetadataImageBuilder() .addTopic(Uuid.randomUuid(), "bar", 1) - .buildCoordinatorMetadataImage(); - service.onNewMetadataImage(image, image.emptyDelta()); + .build(); + service.onMetadataUpdate(new MetadataDelta(image), image); // No error in partition deleted callback when(runtime.scheduleWriteAllOperation( @@ -3318,8 +3315,8 @@ public void testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() { .build(); service.startup(() -> 3); - CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY; - service.onNewMetadataImage(image, image.emptyDelta()); + MetadataImage image = MetadataImage.EMPTY; + service.onMetadataUpdate(new MetadataDelta(image), image); // No error in partition deleted callback when(runtime.scheduleWriteAllOperation( @@ -4146,7 +4143,7 @@ public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws Interrupt .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(new MetadataDelta(image), image); int partition = 1; @@ -4229,7 +4226,7 @@ public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionExc .build(true); // Forcing a null Metadata Image - service.onNewMetadataImage(null, null); + service.onMetadataUpdate(null, null); int partition = 1; DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() @@ -4273,7 +4270,7 @@ public void testDescribeShareGroupAllOffsets() throws InterruptedException, Exec .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); int partition = 1; @@ -4344,7 +4341,7 @@ public void testDescribeShareGroupAllOffsetsThrowsError() { .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); int partition = 1; @@ -4380,7 +4377,7 @@ public void testDescribeShareGroupAllOffsetsNullResult() { .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); int partition = 1; @@ -4417,7 +4414,7 @@ public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError() throws I .addTopic(TOPIC_ID, TOPIC_NAME, 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); int partition = 1; @@ -4507,7 +4504,7 @@ public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws Execution .build(true); // Forcing a null Metadata Image - service.onNewMetadataImage(null, null); + service.onMetadataUpdate(null, null); DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup() .setGroupId("share-group-id") @@ -4704,7 +4701,7 @@ public void testDeleteShareGroupOffsetsMetadataImageNull() throws ExecutionExcep .build(true); // Forcing a null Metadata Image - service.onNewMetadataImage(null, null); + service.onMetadataUpdate(null, null); DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData() .setGroupId("share-group-id") @@ -5581,7 +5578,7 @@ public void testPersisterInitializeSuccess() { .addTopic(topicId, "topic-name", 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() @@ -5756,7 +5753,7 @@ public void testPersisterInitializeGroupInitializeFailure() { .addTopic(topicId, "topic-name", 3) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() @@ -5818,7 +5815,7 @@ public void testAlterShareGroupOffsetsMetadataImageNull() throws ExecutionExcept .build(true); // Forcing a null Metadata Image - service.onNewMetadataImage(null, null); + service.onMetadataUpdate(null, null); String groupId = "share-group"; AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData() @@ -5974,7 +5971,7 @@ public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() { .addTopic(topicId, "topic-name", 1) .build(); - service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null); + service.onMetadataUpdate(null, image); when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture( new InitializeShareGroupStateResult.Builder() @@ -6023,7 +6020,7 @@ private static class GroupCoordinatorServiceBuilder { private CoordinatorRuntime runtime; private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics(); private Persister persister = new NoOpStatePersister(); - private CoordinatorMetadataImage metadataImage = null; + private MetadataImage metadataImage = null; private PartitionMetadataClient partitionMetadataClient = null; GroupCoordinatorService build() { @@ -6032,7 +6029,9 @@ GroupCoordinatorService build() { GroupCoordinatorService build(boolean serviceStartup) { if (metadataImage == null) { - metadataImage = mock(CoordinatorMetadataImage.class); + metadataImage = new MetadataImageBuilder() + .addTopic(TOPIC_ID, TOPIC_NAME, 1) + .build(); } GroupCoordinatorService service = new GroupCoordinatorService( @@ -6048,14 +6047,8 @@ GroupCoordinatorService build(boolean serviceStartup) { if (serviceStartup) { service.startup(() -> 1); - service.onNewMetadataImage(metadataImage, null); + service.onMetadataUpdate(null, metadataImage); } - when(metadataImage.topicNames()).thenReturn(Set.of(TOPIC_NAME)); - var topicMetadata = mock(CoordinatorMetadataImage.TopicMetadata.class); - when(topicMetadata.name()).thenReturn(TOPIC_NAME); - when(topicMetadata.id()).thenReturn(TOPIC_ID); - when(metadataImage.topicMetadata(TOPIC_ID)).thenReturn(Optional.of(topicMetadata)); - when(metadataImage.topicMetadata(TOPIC_NAME)).thenReturn(Optional.of(topicMetadata)); return service; } diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java index d0386e32b9f40..bb6c811c82ed7 100644 --- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java +++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java @@ -1100,7 +1100,7 @@ public void onTopicsDeleted(Set deletedTopicIds, BufferSupplier bufferSupp @Override public void onNewMetadataImage(CoordinatorMetadataImage newImage, FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta) { throwIfNotActive(); - this.runtime.onNewMetadataImage(newImage, delta); + this.runtime.onMetadataUpdate(delta, newImage); boolean enabled = isShareGroupsEnabled(newFeaturesImage); // enabled shouldRunJob result (XOR) // 0 0 no op on flag, do not call jobs