Skip to content

Commit b7c766a

Browse files
authored
Merge branch 'apache:trunk' into trunk
2 parents dde3ada + 71efb89 commit b7c766a

File tree

86 files changed

+3923
-894
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+3923
-894
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3350,6 +3350,7 @@ project(':jmh-benchmarks') {
33503350
implementation project(':raft')
33513351
implementation project(':clients')
33523352
implementation project(':coordinator-common')
3353+
implementation project(':coordinator-common').sourceSets.test.output
33533354
implementation project(':group-coordinator')
33543355
implementation project(':group-coordinator:group-coordinator-api')
33553356
implementation project(':metadata')

checkstyle/import-control-jmh-benchmarks.xml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@
5252
<allow pkg="org.apache.kafka.server"/>
5353
<allow pkg="org.apache.kafka.storage"/>
5454
<allow pkg="org.apache.kafka.clients"/>
55-
<allow class="org.apache.kafka.coordinator.common.runtime.CoordinatorMetadataImage"/>
56-
<allow class="org.apache.kafka.coordinator.common.runtime.KRaftCoordinatorMetadataImage"/>
57-
<allow class="org.apache.kafka.coordinator.common.runtime.HdrHistogram"/>
55+
<allow pkg="org.apache.kafka.coordinator.common.runtime"/>
5856
<allow pkg="org.apache.kafka.coordinator.group"/>
5957
<allow pkg="org.apache.kafka.image"/>
6058
<allow pkg="org.apache.kafka.metadata"/>

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerCallbackTest.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@
2727
import java.util.List;
2828
import java.util.Locale;
2929
import java.util.Map;
30+
import java.util.Set;
3031
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicReference;
3133
import java.util.function.BiConsumer;
3234

3335
import static org.apache.kafka.clients.ClientsTestUtils.consumeAndVerifyRecords;
@@ -166,6 +168,80 @@ public void testClassicConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAs
166168
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CLASSIC);
167169
}
168170

171+
@ClusterTest
172+
public void testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicCooperative() throws InterruptedException {
173+
try (var consumer = createClassicConsumerCooperativeProtocol()) {
174+
testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, true);
175+
}
176+
}
177+
178+
@ClusterTest
179+
public void testOnPartitionsAssignedCalledWithNewPartitionsOnlyForAsyncConsumer() throws InterruptedException {
180+
try (var consumer = createConsumer(CONSUMER)) {
181+
testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, true);
182+
}
183+
}
184+
185+
@ClusterTest
186+
public void testOnPartitionsAssignedCalledWithNewPartitionsOnlyForClassicEager() throws InterruptedException {
187+
try (var consumer = createConsumer(CLASSIC)) {
188+
testOnPartitionsAssignedCalledWithExpectedPartitions(consumer, false);
189+
}
190+
}
191+
192+
private void testOnPartitionsAssignedCalledWithExpectedPartitions(
193+
Consumer<byte[], byte[]> consumer,
194+
boolean expectNewPartitionsOnlyInCallback) throws InterruptedException {
195+
subscribeAndExpectOnPartitionsAssigned(consumer, List.of(topic), List.of(tp));
196+
assertEquals(Set.of(tp), consumer.assignment());
197+
198+
// Add a new partition assignment while keeping the previous one
199+
String newTopic = "newTopic";
200+
TopicPartition addedPartition = new TopicPartition(newTopic, 0);
201+
List<TopicPartition> expectedPartitionsInCallback;
202+
if (expectNewPartitionsOnlyInCallback) {
203+
expectedPartitionsInCallback = List.of(addedPartition);
204+
} else {
205+
expectedPartitionsInCallback = List.of(tp, addedPartition);
206+
}
207+
208+
// Change subscription to keep the previous one and add a new topic. Assignment should be updated
209+
// to contain partitions from both topics, but the onPartitionsAssigned parameters may contain
210+
// the full new assignment or just the newly added partitions depending on the case.
211+
subscribeAndExpectOnPartitionsAssigned(
212+
consumer,
213+
List.of(topic, newTopic),
214+
expectedPartitionsInCallback);
215+
assertEquals(Set.of(tp, addedPartition), consumer.assignment());
216+
}
217+
218+
private void subscribeAndExpectOnPartitionsAssigned(Consumer<byte[], byte[]> consumer, List<String> topics, Collection<TopicPartition> expectedPartitionsInCallback) throws InterruptedException {
219+
var partitionsAssigned = new AtomicBoolean(false);
220+
AtomicReference<Collection<TopicPartition>> partitionsFromCallback = new AtomicReference<>();
221+
consumer.subscribe(topics, new ConsumerRebalanceListener() {
222+
@Override
223+
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
224+
if (partitions.containsAll(expectedPartitionsInCallback)) {
225+
partitionsFromCallback.set(partitions);
226+
partitionsAssigned.set(true);
227+
}
228+
}
229+
230+
@Override
231+
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
232+
// noop
233+
}
234+
});
235+
ClientsTestUtils.pollUntilTrue(
236+
consumer,
237+
partitionsAssigned::get,
238+
"Timed out before expected rebalance completed"
239+
);
240+
// These are different types, so comparing values instead
241+
assertTrue(expectedPartitionsInCallback.containsAll(partitionsFromCallback.get()) && partitionsFromCallback.get().containsAll(expectedPartitionsInCallback),
242+
"Expected partitions " + expectedPartitionsInCallback + " as parameter for onPartitionsAssigned, but got " + partitionsFromCallback.get());
243+
}
244+
169245
@ClusterTest
170246
public void testAsyncConsumerGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback() throws InterruptedException {
171247
testGetPositionOfNewlyAssignedPartitionOnPartitionsAssignedCallback(CONSUMER);
@@ -284,4 +360,12 @@ private Consumer<byte[], byte[]> createConsumer(GroupProtocol protocol) {
284360
ENABLE_AUTO_COMMIT_CONFIG, "false"
285361
));
286362
}
363+
364+
private Consumer<byte[], byte[]> createClassicConsumerCooperativeProtocol() {
365+
return cluster.consumer(Map.of(
366+
GROUP_PROTOCOL_CONFIG, CLASSIC.name.toLowerCase(Locale.ROOT),
367+
ENABLE_AUTO_COMMIT_CONFIG, "false",
368+
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
369+
));
370+
}
287371
}

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,10 @@ private void testHeaders(Map<String, Object> consumerConfig) throws Exception {
203203
) {
204204
var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getBytes(), "value".getBytes());
205205
record.headers().add("headerKey", "headerValue".getBytes());
206+
record.headers().add("headerKey2", "headerValue2".getBytes());
207+
record.headers().add("headerKey3", "headerValue3".getBytes());
206208
producer.send(record);
209+
producer.flush();
207210

208211
assertEquals(0, consumer.assignment().size());
209212
consumer.assign(List.of(TP));
@@ -212,8 +215,15 @@ var record = new ProducerRecord<>(TP.topic(), TP.partition(), null, "key".getByt
212215
consumer.seek(TP, 0);
213216
var records = consumeRecords(consumer, numRecords);
214217
assertEquals(numRecords, records.size());
218+
215219
var header = records.get(0).headers().lastHeader("headerKey");
216220
assertEquals("headerValue", header == null ? null : new String(header.value()));
221+
222+
// Test the order of headers in a record is preserved when producing and consuming
223+
Header[] headers = records.get(0).headers().toArray();
224+
assertEquals("headerKey", headers[0].key());
225+
assertEquals("headerKey2", headers[1].key());
226+
assertEquals("headerKey3", headers[2].key());
217227
}
218228
}
219229

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,8 @@ public void testHeaders() {
467467
int numRecords = 1;
468468
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(tp.topic(), tp.partition(), null, "key".getBytes(), "value".getBytes());
469469
record.headers().add("headerKey", "headerValue".getBytes());
470+
record.headers().add("headerKey2", "headerValue2".getBytes());
471+
record.headers().add("headerKey3", "headerValue3".getBytes());
470472
producer.send(record);
471473
producer.flush();
472474

@@ -475,11 +477,15 @@ public void testHeaders() {
475477
List<ConsumerRecord<byte[], byte[]>> records = consumeRecords(shareConsumer, numRecords);
476478
assertEquals(numRecords, records.size());
477479

478-
for (ConsumerRecord<byte[], byte[]> consumerRecord : records) {
479-
Header header = consumerRecord.headers().lastHeader("headerKey");
480-
if (header != null)
481-
assertEquals("headerValue", new String(header.value()));
482-
}
480+
Header header = records.get(0).headers().lastHeader("headerKey");
481+
assertEquals("headerValue", new String(header.value()));
482+
483+
// Test the order of headers in a record is preserved when producing and consuming
484+
Header[] headers = records.get(0).headers().toArray();
485+
assertEquals("headerKey", headers[0].key());
486+
assertEquals("headerKey2", headers[1].key());
487+
assertEquals("headerKey3", headers[2].key());
488+
483489
verifyShareGroupStateTopicRecordsProduced();
484490
}
485491
}

clients/src/main/java/org/apache/kafka/clients/admin/RecordsToDelete.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,16 @@ private RecordsToDelete(long offset) {
3333
/**
3434
* Delete all the records before the given {@code offset}
3535
*
36-
* @param offset the offset before which all records will be deleted
36+
* @param offset The offset before which all records will be deleted.
37+
* Use {@code -1} to truncate to the high watermark.
3738
*/
3839
public static RecordsToDelete beforeOffset(long offset) {
3940
return new RecordsToDelete(offset);
4041
}
4142

4243
/**
43-
* The offset before which all records will be deleted
44+
* The offset before which all records will be deleted.
45+
* Use {@code -1} to truncate to the high watermark.
4446
*/
4547
public long beforeOffset() {
4648
return offset;

clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
* Under normal conditions, if a partition is reassigned from one consumer to another, then the old consumer will
5151
* always invoke {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} for that partition prior to the new consumer
5252
* invoking {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} for the same partition. So if offsets or other state is saved in the
53-
* {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, it will be always accessible by the time the
53+
* {@link #onPartitionsRevoked(Collection) onPartitionsRevoked} call by one consumer member, it will always be accessible by the time the
5454
* other consumer member takes over that partition and triggers its {@link #onPartitionsAssigned(Collection) onPartitionsAssigned} callback to load the state.
5555
* <p>
5656
* You can think of revocation as a graceful way to give up ownership of a partition. In some cases, the consumer may not have an opportunity to do so.
@@ -120,13 +120,31 @@ public interface ConsumerRebalanceListener {
120120
/**
121121
* A callback method the user can implement to provide handling of offset commits to a customized store.
122122
* This method will be called during a rebalance operation when the consumer has to give up some partitions.
123-
* It can also be called when consumer is being closed ({@link KafkaConsumer#close(CloseOptions option)})
124-
* or is unsubscribing ({@link KafkaConsumer#unsubscribe()}).
123+
* The consumer may need to give up some partitions (and thus execute this callback) under the following scenarios:
124+
* <ul>
125+
* <li>If the consumer assignment changes</li>
126+
* <li>If the consumer is being closed ({@link KafkaConsumer#close(CloseOptions option)})</li>
127+
* <li>If the consumer is unsubscribing ({@link KafkaConsumer#unsubscribe()})</li>
128+
* </ul>
125129
* It is recommended that offsets should be committed in this callback to either Kafka or a
126130
* custom offset store to prevent duplicate data.
127131
* <p>
128-
* In eager rebalancing, it will always be called at the start of a rebalance and after the consumer stops fetching data.
129-
* In cooperative rebalancing, it will be called at the end of a rebalance on the set of partitions being revoked iff the set is non-empty.
132+
* This callback is always called before re-assigning the partitions.
133+
* If the consumer is using the {@link GroupProtocol#CLASSIC} rebalance protocol:
134+
* <ul>
135+
* <li>
136+
* In eager rebalancing, onPartitionsRevoked will be called with the full set of assigned partitions as a parameter (all partitions are revoked).
137+
* It will be called even if there are no partitions to revoke.
138+
* </li>
139+
* <li>
140+
* In cooperative rebalancing, onPartitionsRevoked will be called with the set of partitions to revoke,
141+
* iff the set is non-empty.
142+
* </li>
143+
* </ul>
144+
* If the consumer is using the {@link GroupProtocol#CONSUMER} rebalance protocol, this callback will be called
145+
* with the set of partitions to revoke iff the set is non-empty
146+
* (same behavior as the {@link GroupProtocol#CLASSIC} rebalance protocol with Cooperative mode).
147+
* <p>
130148
* For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}.
131149
* <p>
132150
* It is common for the revocation callback to use the consumer instance in order to commit offsets. It is possible
@@ -135,21 +153,23 @@ public interface ConsumerRebalanceListener {
135153
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
136154
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
137155
*
138-
* @param partitions The list of partitions that were assigned to the consumer and now need to be revoked (may not
139-
* include all currently assigned partitions, i.e. there may still be some partitions left)
156+
* @param partitions The list of partitions that were assigned to the consumer and now need to be revoked. This will
157+
* include the full assignment under the Classic/Eager protocol, given that it revokes all partitions.
158+
* It will only include the subset to revoke under the Classic/Cooperative and Consumer protocols.
140159
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
141160
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
142161
*/
143162
void onPartitionsRevoked(Collection<TopicPartition> partitions);
144163

145164
/**
146165
* A callback method the user can implement to provide handling of customized offsets on completion of a successful
147-
* partition re-assignment. This method will be called after the partition re-assignment completes and before the
148-
* consumer starts fetching data, and only as the result of a {@link Consumer#poll(java.time.Duration) poll(long)} call.
166+
* partition re-assignment. This method will be called after the partition re-assignment completes (even if no new
167+
* partitions were assigned to the consumer), and before the consumer starts fetching data,
168+
* and only as the result of a {@link Consumer#poll(java.time.Duration) poll(long)} call.
149169
* <p>
150170
* It is guaranteed that under normal conditions all the processes in a consumer group will execute their
151-
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes its
152-
* {@link #onPartitionsAssigned(Collection)} callback. During exceptional scenarios, partitions may be migrated
171+
* {@link #onPartitionsRevoked(Collection)} callback before any instance executes this onPartitionsAssigned callback.
172+
* During exceptional scenarios, partitions may be migrated
153173
* without the old owner being notified (i.e. their {@link #onPartitionsRevoked(Collection)} callback not triggered),
154174
* and later when the old owner consumer realized this event, the {@link #onPartitionsLost(Collection)} callback
155175
* will be triggered by the consumer then.
@@ -160,9 +180,11 @@ public interface ConsumerRebalanceListener {
160180
* invocation of {@link KafkaConsumer#poll(java.time.Duration)} in which this callback is being executed. This means it is not
161181
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
162182
*
163-
* @param partitions The list of partitions that are now assigned to the consumer (previously owned partitions will
164-
* NOT be included, i.e. this list will only include newly added partitions)
165-
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
183+
* @param partitions Partitions that have been added to the assignment as a result of the rebalance.
184+
* Note that partitions that were already owned by this consumer and remain assigned are not
185+
* included in this list under the Classic/Cooperative or Consumer protocols. The full assignment
186+
* will be received under the Classic/Eager protocol.
187+
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
166188
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
167189
*/
168190
void onPartitionsAssigned(Collection<TopicPartition> partitions);
@@ -187,10 +209,9 @@ public interface ConsumerRebalanceListener {
187209
* necessary to catch these exceptions and re-attempt to wakeup or interrupt the consumer thread.
188210
*
189211
* @param partitions The list of partitions that were assigned to the consumer and now have been reassigned
190-
* to other consumers. With the current protocol this will always include all of the consumer's
191-
* previously assigned partitions, but this may change in future protocols (ie there would still
192-
* be some partitions left)
193-
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
212+
* to other consumers. With both the Classic and Consumer protocols, this will always include
213+
* all partitions that were previously assigned to the consumer.
214+
* @throws org.apache.kafka.common.errors.WakeupException If raised from a nested call to {@link KafkaConsumer}
194215
* @throws org.apache.kafka.common.errors.InterruptException If raised from a nested call to {@link KafkaConsumer}
195216
*/
196217
default void onPartitionsLost(Collection<TopicPartition> partitions) {

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -661,7 +661,7 @@ public Set<String> subscription() {
661661
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
662662
*
663663
* <p>
664-
* As part of group management, the consumer will keep track of the list of consumers that belong to a particular
664+
* As part of group management, the group coordinator will keep track of the list of consumers that belong to a particular
665665
* group and will trigger a rebalance operation if any one of the following events is triggered:
666666
* <ul>
667667
* <li>Number of partitions change for any of the subscribed topics
@@ -670,8 +670,11 @@ public Set<String> subscription() {
670670
* <li>A new member is added to the consumer group
671671
* </ul>
672672
* <p>
673-
* When any of these events are triggered, the provided listener will be invoked first to indicate that
674-
* the consumer's assignment has been revoked, and then again when the new assignment has been received.
673+
* When any of these events are triggered, the provided listener will be invoked in this way:
674+
* <ul>
675+
* <li>{@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)} will be invoked with the partitions to revoke, before re-assigning those partitions to another consumer.</li>
676+
* <li>{@link ConsumerRebalanceListener#onPartitionsAssigned(Collection)} will be invoked when the rebalance completes (even if no new partitions are assigned to the consumer).</li>
677+
* </ul>
675678
* Note that rebalances will only occur during an active call to {@link #poll(Duration)}, so callbacks will
676679
* also only be invoked during that time.
677680
*

0 commit comments

Comments
 (0)