Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2535,12 +2535,12 @@ public void scheduleUnloadOperation(
/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The metadata delta.
* @param delta The metadata delta.
* @param newImage The new metadata image.
*/
public void onNewMetadataImage(
CoordinatorMetadataImage newImage,
CoordinatorMetadataDelta delta
public void onMetadataUpdate(
CoordinatorMetadataDelta delta,
CoordinatorMetadataImage newImage
) {
throwIfNotRunning();
log.debug("Scheduling applying of a new metadata image with version {}.", newImage.version());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,7 @@ public void testClose() throws Exception {
}

@Test
public void testOnNewMetadataImage() {
public void testOnMetadataUpdate() {
TopicPartition tp0 = new TopicPartition("__consumer_offsets", 0);
TopicPartition tp1 = new TopicPartition("__consumer_offsets", 1);

Expand Down Expand Up @@ -2029,7 +2029,7 @@ public void testOnNewMetadataImage() {
// Publish a new image.
CoordinatorMetadataDelta delta = new KRaftCoordinatorMetadataDelta(new MetadataDelta(MetadataImage.EMPTY));
CoordinatorMetadataImage newImage = CoordinatorMetadataImage.EMPTY;
runtime.onNewMetadataImage(newImage, delta);
runtime.onMetadataUpdate(delta, newImage);

// Coordinator 0 should be notified about it.
verify(coordinator0).onNewMetadataImage(newImage, delta);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class BrokerMetadataPublisher(

try {
// Propagate the new image to the group coordinator.
groupCoordinator.onNewMetadataImage(new KRaftCoordinatorMetadataImage(newImage), new KRaftCoordinatorMetadataDelta(delta))
groupCoordinator.onMetadataUpdate(delta, newImage)
} catch {
case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
s"coordinator with local changes in $deltaName", t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metadata.{FeatureLevelRecord, PartitionRecord, RemoveTopicRecord, TopicRecord}
import org.apache.kafka.common.test.{KafkaClusterTestKit, TestKitNodes}
import org.apache.kafka.common.utils.Exit
import org.apache.kafka.coordinator.common.runtime.{KRaftCoordinatorMetadataDelta, KRaftCoordinatorMetadataImage}
import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta, MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage, ScramImage, TopicsImage}
Expand Down Expand Up @@ -292,7 +291,7 @@ class BrokerMetadataPublisherTest {
.numBytes(42)
.build())

verify(groupCoordinator).onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(delta))
verify(groupCoordinator).onMetadataUpdate(delta, image)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

import java.time.Duration;
Expand Down Expand Up @@ -467,12 +467,12 @@ void onResignation(
/**
* A new metadata image is available.
*
* @param newImage The new metadata image.
* @param delta The metadata delta.
* @param newImage The new metadata image.
*/
void onNewMetadataImage(
CoordinatorMetadataImage newImage,
CoordinatorMetadataDelta delta
void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,18 +84,21 @@
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.record.BrokerCompressionType;
Expand Down Expand Up @@ -343,9 +346,9 @@ public GroupCoordinatorService build() {

/**
* The metadata image to extract topic id to names map.
* This is initialised when the {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)} is called
* This is initialised when the {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)} is called
*/
private CoordinatorMetadataImage metadataImage = null;
private volatile CoordinatorMetadataImage metadataImage = null;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bug. The attribute must be volatile because it is accessed from multiple threads.


/**
*
Expand Down Expand Up @@ -2354,16 +2357,18 @@ public void onResignation(
}

/**
* See {@link GroupCoordinator#onNewMetadataImage(CoordinatorMetadataImage, CoordinatorMetadataDelta)}.
* See {@link GroupCoordinator#onMetadataUpdate(MetadataDelta, MetadataImage)}.
*/
@Override
public void onNewMetadataImage(
CoordinatorMetadataImage newImage,
CoordinatorMetadataDelta delta
public void onMetadataUpdate(
MetadataDelta delta,
MetadataImage newImage
) {
throwIfNotActive();
metadataImage = newImage;
runtime.onNewMetadataImage(newImage, delta);
var wrappedImage = newImage == null ? null : new KRaftCoordinatorMetadataImage(newImage);
var wrappedDelta = delta == null ? null : new KRaftCoordinatorMetadataDelta(delta);
Comment on lines +2368 to +2369
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty ugly and it should not be here because the image and the delta can never be null. Unfortunately, we have a couple of tests passing nulls here. I will fix those separately and remove this code in a follow-up. I want the core change to be well scoped.

metadataImage = wrappedImage;
runtime.onMetadataUpdate(wrappedDelta, wrappedImage);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,8 @@
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataDelta;
import org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage;
import org.apache.kafka.coordinator.common.runtime.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult;
Expand Down Expand Up @@ -3163,7 +3160,7 @@ public void testOnPartitionsDeleted() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image)));
service.onMetadataUpdate(new MetadataDelta(image), image);

when(runtime.scheduleWriteAllOperation(
ArgumentMatchers.eq("on-partition-deleted"),
Expand Down Expand Up @@ -3221,7 +3218,7 @@ public void testOnPartitionsDeletedCleanupShareGroupState() {
.addTopic(Uuid.randomUuid(), "foo", 1)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), new KRaftCoordinatorMetadataDelta(new MetadataDelta(image)));
service.onMetadataUpdate(new MetadataDelta(image), image);

// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
Expand Down Expand Up @@ -3268,10 +3265,10 @@ public void testOnPartitionsDeletedCleanupShareGroupStateEmptyMetadata() {
.build();
service.startup(() -> 3);

CoordinatorMetadataImage image = new MetadataImageBuilder()
MetadataImage image = new MetadataImageBuilder()
.addTopic(Uuid.randomUuid(), "bar", 1)
.buildCoordinatorMetadataImage();
service.onNewMetadataImage(image, image.emptyDelta());
.build();
service.onMetadataUpdate(new MetadataDelta(image), image);

// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
Expand Down Expand Up @@ -3318,8 +3315,8 @@ public void testOnPartitionsDeletedCleanupShareGroupStateTopicsNotInMetadata() {
.build();
service.startup(() -> 3);

CoordinatorMetadataImage image = CoordinatorMetadataImage.EMPTY;
service.onNewMetadataImage(image, image.emptyDelta());
MetadataImage image = MetadataImage.EMPTY;
service.onMetadataUpdate(new MetadataDelta(image), image);

// No error in partition deleted callback
when(runtime.scheduleWriteAllOperation(
Expand Down Expand Up @@ -4146,7 +4143,7 @@ public void testDescribeShareGroupAllOffsetsLatestOffsetError() throws Interrupt
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(new MetadataDelta(image), image);

int partition = 1;

Expand Down Expand Up @@ -4229,7 +4226,7 @@ public void testDescribeShareGroupOffsetsMetadataImageNull() throws ExecutionExc
.build(true);

// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
service.onMetadataUpdate(null, null);

int partition = 1;
DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
Expand Down Expand Up @@ -4273,7 +4270,7 @@ public void testDescribeShareGroupAllOffsets() throws InterruptedException, Exec
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

int partition = 1;

Expand Down Expand Up @@ -4344,7 +4341,7 @@ public void testDescribeShareGroupAllOffsetsThrowsError() {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

int partition = 1;

Expand Down Expand Up @@ -4380,7 +4377,7 @@ public void testDescribeShareGroupAllOffsetsNullResult() {
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

int partition = 1;

Expand Down Expand Up @@ -4417,7 +4414,7 @@ public void testDescribeShareGroupAllOffsetsReadSummaryPartitionError() throws I
.addTopic(TOPIC_ID, TOPIC_NAME, 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

int partition = 1;

Expand Down Expand Up @@ -4507,7 +4504,7 @@ public void testDescribeShareGroupAllOffsetsMetadataImageNull() throws Execution
.build(true);

// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
service.onMetadataUpdate(null, null);

DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup requestData = new DescribeShareGroupOffsetsRequestData.DescribeShareGroupOffsetsRequestGroup()
.setGroupId("share-group-id")
Expand Down Expand Up @@ -4704,7 +4701,7 @@ public void testDeleteShareGroupOffsetsMetadataImageNull() throws ExecutionExcep
.build(true);

// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
service.onMetadataUpdate(null, null);

DeleteShareGroupOffsetsRequestData requestData = new DeleteShareGroupOffsetsRequestData()
.setGroupId("share-group-id")
Expand Down Expand Up @@ -5581,7 +5578,7 @@ public void testPersisterInitializeSuccess() {
.addTopic(topicId, "topic-name", 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
Expand Down Expand Up @@ -5756,7 +5753,7 @@ public void testPersisterInitializeGroupInitializeFailure() {
.addTopic(topicId, "topic-name", 3)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
Expand Down Expand Up @@ -5818,7 +5815,7 @@ public void testAlterShareGroupOffsetsMetadataImageNull() throws ExecutionExcept
.build(true);

// Forcing a null Metadata Image
service.onNewMetadataImage(null, null);
service.onMetadataUpdate(null, null);

String groupId = "share-group";
AlterShareGroupOffsetsRequestData request = new AlterShareGroupOffsetsRequestData()
Expand Down Expand Up @@ -5974,7 +5971,7 @@ public void testPersisterInitializeForAlterShareGroupOffsetsResponseSuccess() {
.addTopic(topicId, "topic-name", 1)
.build();

service.onNewMetadataImage(new KRaftCoordinatorMetadataImage(image), null);
service.onMetadataUpdate(null, image);

when(mockPersister.initializeState(ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(
new InitializeShareGroupStateResult.Builder()
Expand Down Expand Up @@ -6023,7 +6020,7 @@ private static class GroupCoordinatorServiceBuilder {
private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime;
private GroupCoordinatorMetrics metrics = new GroupCoordinatorMetrics();
private Persister persister = new NoOpStatePersister();
private CoordinatorMetadataImage metadataImage = null;
private MetadataImage metadataImage = null;
private PartitionMetadataClient partitionMetadataClient = null;

GroupCoordinatorService build() {
Expand All @@ -6032,7 +6029,9 @@ GroupCoordinatorService build() {

GroupCoordinatorService build(boolean serviceStartup) {
if (metadataImage == null) {
metadataImage = mock(CoordinatorMetadataImage.class);
metadataImage = new MetadataImageBuilder()
.addTopic(TOPIC_ID, TOPIC_NAME, 1)
.build();
Comment on lines +6032 to +6034
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is semi-related. It is just easier to build the image rather than mocking it.

}

GroupCoordinatorService service = new GroupCoordinatorService(
Expand All @@ -6048,14 +6047,8 @@ GroupCoordinatorService build(boolean serviceStartup) {

if (serviceStartup) {
service.startup(() -> 1);
service.onNewMetadataImage(metadataImage, null);
service.onMetadataUpdate(null, metadataImage);
}
when(metadataImage.topicNames()).thenReturn(Set.of(TOPIC_NAME));
var topicMetadata = mock(CoordinatorMetadataImage.TopicMetadata.class);
when(topicMetadata.name()).thenReturn(TOPIC_NAME);
when(topicMetadata.id()).thenReturn(TOPIC_ID);
when(metadataImage.topicMetadata(TOPIC_ID)).thenReturn(Optional.of(topicMetadata));
when(metadataImage.topicMetadata(TOPIC_NAME)).thenReturn(Optional.of(topicMetadata));

return service;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1100,7 +1100,7 @@ public void onTopicsDeleted(Set<Uuid> deletedTopicIds, BufferSupplier bufferSupp
@Override
public void onNewMetadataImage(CoordinatorMetadataImage newImage, FeaturesImage newFeaturesImage, CoordinatorMetadataDelta delta) {
throwIfNotActive();
this.runtime.onNewMetadataImage(newImage, delta);
this.runtime.onMetadataUpdate(delta, newImage);
boolean enabled = isShareGroupsEnabled(newFeaturesImage);
// enabled shouldRunJob result (XOR)
// 0 0 no op on flag, do not call jobs
Expand Down