5151/**
5252 * A client that consumes records from a Kafka cluster using a share group.
5353 * <p>
54- * <em>This is an early access feature under development which is introduced by KIP-932.
55- * It is not suitable for production use until it is fully implemented and released.</em>
54+ * <em>This is a preview feature introduced by KIP-932. It is not yet recommended for production use.</em>
5655 *
5756 * <h3>Cross-Version Compatibility</h3>
5857 * This client can communicate with brokers that are a version that supports share groups. You will receive an
10099 * of the topic-partitions that match its subscriptions. Records are acquired for delivery to this consumer with a
101100 * time-limited acquisition lock. While a record is acquired, it is not available for another consumer. By default,
102101 * the lock duration is 30 seconds, but it can also be controlled using the group {@code group.share.record.lock.duration.ms}
103- * configuration parameter . The idea is that the lock is automatically released once the lock duration has elapsed, and
102+ * configuration property . The idea is that the lock is automatically released once the lock duration has elapsed, and
104103 * then the record is available to be given to another consumer. The consumer which holds the lock can deal with it in
105104 * the following ways:
106105 * <ul>
116115 * {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
117116 * releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
118117 * <p>
119- * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
120- * consumer {@code share.acknowledgement.mode} property.
118+ * The consumer can choose to use implicit or explicit acknowledgement of the records it processes by using the
119+ * consumer {@code share.acknowledgement.mode} configuration property.
121120 * <p>
122121 * If the application sets the property to "implicit" or does not set it at all, then the consumer is using
123122 * <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
129128 * the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
130129 * <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
131130 * </ul>
132- * If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment </em>.
131+ * If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgement </em>.
133132 * The application must acknowledge all records returned from {@link #poll(Duration)} using
134133 * {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
135134 * If the application calls {@link #poll(Duration)} without having acknowledged all records, an
162161 * props.setProperty("group.id", "test");
163162 * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
164163 * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
164+ *
165165 * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
166166 * consumer.subscribe(Arrays.asList("foo"));
167167 * while (true) {
181181 * props.setProperty("group.id", "test");
182182 * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
183183 * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
184+ *
184185 * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
185186 * consumer.subscribe(Arrays.asList("foo"));
186187 * while (true) {
203204 * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
204205 * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
205206 * props.setProperty("share.acknowledgement.mode", "explicit");
207+ *
206208 * KafkaShareConsumer<String, String> consumer = new KafkaShareConsumer<>(props);
207209 * consumer.subscribe(Arrays.asList("foo"));
208210 * while (true) {
@@ -443,7 +445,7 @@ public void unsubscribe() {
443445 }
444446
445447 /**
446- * Fetch data for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
448+ * Deliver records for the topics specified using {@link #subscribe(Collection)}. It is an error to not have
447449 * subscribed to any topics before polling for data.
448450 *
449451 * <p>
@@ -452,13 +454,14 @@ public void unsubscribe() {
452454 *
453455 * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
454456 *
455- * @return map of topic to records since the last fetch for the subscribed list of topics
457+ * @return map of topic to records
456458 *
457459 * @throws AuthenticationException if authentication fails. See the exception for more details
458460 * @throws AuthorizationException if caller lacks Read access to any of the subscribed
459461 * topics or to the share group. See the exception for more details
460462 * @throws IllegalArgumentException if the timeout value is negative
461- * @throws IllegalStateException if the consumer is not subscribed to any topics
463+ * @throws IllegalStateException if the consumer is not subscribed to any topics, or it is using
464+ * explicit acknowledgement and has not acknowledged all records previously delivered
462465 * @throws ArithmeticException if the timeout is greater than {@link Long#MAX_VALUE} milliseconds.
463466 * @throws InvalidTopicException if the current subscription contains any invalid
464467 * topic (per {@link org.apache.kafka.common.internals.Topic#validate(String)})
@@ -475,11 +478,12 @@ public ConsumerRecords<K, V> poll(Duration timeout) {
475478 * Acknowledge successful delivery of a record returned on the last {@link #poll(Duration)} call.
476479 * The acknowledgement is committed on the next {@link #commitSync()}, {@link #commitAsync()} or
477480 * {@link #poll(Duration)} call.
481+ * <p>This method can only be used if the consumer is using <b>explicit acknowledgement</b>.
478482 *
479483 * @param record The record to acknowledge
480484 *
481- * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
482- * used implicit acknowledgement
485+ * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
486+ * explicit acknowledgement
483487 */
484488 @ Override
485489 public void acknowledge (ConsumerRecord <K , V > record ) {
@@ -489,14 +493,14 @@ public void acknowledge(ConsumerRecord<K, V> record) {
489493 /**
490494 * Acknowledge delivery of a record returned on the last {@link #poll(Duration)} call indicating whether
491495 * it was processed successfully. The acknowledgement is committed on the next {@link #commitSync()},
492- * {@link #commitAsync()} or {@link #poll(Duration)} call. By using this method, the consumer is using
493- * <b>explicit acknowledgement</b>.
496+ * {@link #commitAsync()} or {@link #poll(Duration)} call.
497+ * <p>This method can only be used if the consumer is using < b>explicit acknowledgement</b>.
494498 *
495499 * @param record The record to acknowledge
496500 * @param type The acknowledgement type which indicates whether it was processed successfully
497501 *
498- * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer has already
499- * used implicit acknowledgement
502+ * @throws IllegalStateException if the record is not waiting to be acknowledged, or the consumer is not using
503+ * explicit acknowledgement
500504 */
501505 @ Override
502506 public void acknowledge (ConsumerRecord <K , V > record , AcknowledgeType type ) {
@@ -585,7 +589,7 @@ public void setAcknowledgementCommitCallback(AcknowledgementCommitCallback callb
585589 * client to complete the request.
586590 * <p>
587591 * Client telemetry is controlled by the {@link ConsumerConfig#ENABLE_METRICS_PUSH_CONFIG}
588- * configuration option .
592+ * configuration property .
589593 *
590594 * @param timeout The maximum time to wait for consumer client to determine its client instance ID.
591595 * The value must be non-negative. Specifying a timeout of zero means do not
0 commit comments