Skip to content

Commit b43d708

Browse files
authored
KAFKA-18652: Add task.offset.interval.ms config (apache#21737)
KIP-1071 adds two new broker-side configs: `task.offset.interval.ms` and `min.task.offset.interval.ms`. This PR adds both configs to the broker and updates the StreamsHeartbeatResponse to provide `task.offset.interval.ms` to the Kafka Streams client. Reviewers: Andrew Schofield <aschofield@confluent.io>, Lucas Brutschy <lbrutschy@confluent.io>
1 parent aa97adc commit b43d708

14 files changed

Lines changed: 778 additions & 94 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,7 @@ private void onSuccessResponse(final StreamsGroupHeartbeatResponse response, fin
531531
heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
532532
heartbeatState.setEndpointInformationEpoch(data.endpointInformationEpoch());
533533
streamsRebalanceData.setHeartbeatIntervalMs(data.heartbeatIntervalMs());
534+
streamsRebalanceData.setTaskOffsetIntervalMs(data.taskOffsetIntervalMs());
534535

535536
if (data.partitionsByUserEndpoint() != null) {
536537
streamsRebalanceData.setPartitionsByHost(convertHostInfoMap(data));

clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceData.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ public String toString() {
345345

346346
private final AtomicInteger heartbeatIntervalMs = new AtomicInteger(-1);
347347

348+
private final AtomicInteger taskOffsetIntervalMs = new AtomicInteger(-1);
349+
348350
public StreamsRebalanceData(final UUID processId,
349351
final Optional<HostInfo> endpoint,
350352
final Optional<String> rackId,
@@ -427,4 +429,14 @@ public int heartbeatIntervalMs() {
427429
return heartbeatIntervalMs.get();
428430
}
429431

432+
/**
 * Stores the task offset interval, in milliseconds, in the backing {@code AtomicInteger}.
 * Updated whenever a successful heartbeat response is received from the broker.
 *
 * @param taskOffsetIntervalMs the interval in milliseconds carried by the heartbeat response
 */
433+
public void setTaskOffsetIntervalMs(final int taskOffsetIntervalMs) {
434+
this.taskOffsetIntervalMs.set(taskOffsetIntervalMs);
435+
}
436+
437+
/**
 * Returns the most recently stored task offset interval in milliseconds.
 *
 * @return the interval in milliseconds, or -1 (the field's initial value) if none has been stored yet
 */
438+
public int taskOffsetIntervalMs() {
439+
return taskOffsetIntervalMs.get();
440+
}
441+
430442
}

core/src/test/scala/unit/kafka/server/KafkaApisTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern,
7676
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
7777
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
7878
import org.apache.kafka.common.utils.{ImplicitLinkedHashCollection, ProducerIdAndEpoch, SecurityUtils, Utils}
79-
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG}
79+
import org.apache.kafka.coordinator.group.GroupConfig.{CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG, CONSUMER_SESSION_TIMEOUT_MS_CONFIG, SHARE_AUTO_OFFSET_RESET_CONFIG, SHARE_DELIVERY_COUNT_LIMIT_CONFIG, SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, SHARE_ISOLATION_LEVEL_CONFIG, SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG, SHARE_RECORD_LOCK_DURATION_MS_CONFIG, SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG, SHARE_SESSION_TIMEOUT_MS_CONFIG, STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, STREAMS_NUM_STANDBY_REPLICAS_CONFIG, STREAMS_SESSION_TIMEOUT_MS_CONFIG, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG}
8080
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
8181
import org.apache.kafka.coordinator.group.{GroupConfig, GroupConfigManager, GroupCoordinator, GroupCoordinatorConfig}
8282
import org.apache.kafka.coordinator.group.streams.StreamsGroupHeartbeatResult
@@ -368,6 +368,7 @@ class KafkaApisTest extends Logging {
368368
cgConfigs.put(STREAMS_SESSION_TIMEOUT_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_DEFAULT.toString)
369369
cgConfigs.put(STREAMS_NUM_STANDBY_REPLICAS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT.toString)
370370
cgConfigs.put(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT.toString)
371+
cgConfigs.put(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG, GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT.toString)
371372

372373
when(configRepository.groupConfig(consumerGroupId)).thenReturn(cgConfigs)
373374

core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,6 +1086,9 @@ class KafkaConfigTest {
10861086
case GroupCoordinatorConfig.STREAMS_GROUP_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
10871087
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
10881088
case GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
1089+
case GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
1090+
case GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
1091+
case GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -1)
10891092

10901093
/** Share coordinator configs */
10911094
case ShareCoordinatorConfig.APPEND_LINGER_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", -2, -0.5)

core/src/test/scala/unit/kafka/server/StreamsGroupHeartbeatRequestTest.scala

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import scala.jdk.CollectionConverters._
3535
serverProperties = Array(
3636
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
3737
new ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1"),
38-
new ClusterConfigProperty(key = "group.streams.initial.rebalance.delay.ms", value = "0")
38+
new ClusterConfigProperty(key = GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, value = "0")
3939
)
4040
)
4141
class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCoordinatorBaseRequestTest(cluster) {
@@ -686,6 +686,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
686686
assertEquals(0, streamsGroupHeartbeatResponse1.standbyTasks().size(), "Member 1 should have no standby tasks initially")
687687
assertEquals(0, streamsGroupHeartbeatResponse2.standbyTasks().size(), "Member 2 should have no standby tasks initially")
688688

689+
// Verify both members picked up `task.offset.interval.ms`
690+
assertEquals(60_000, streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup task.offset.interval.ms initially")
691+
assertEquals(60_000, streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup task.offset.interval.ms initially")
692+
689693
// Both members continue to send heartbeats with their assigned tasks
690694
TestUtils.waitUntilTrue(() => {
691695
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -736,6 +740,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
736740
}
737741
assertEquals(0, member2StandbyTasksSize, "Member 2 should have no standby tasks in this configuration")
738742

743+
// Verify both members picked up `task.offset.interval.ms`
744+
assertEquals(60_000, streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup task.offset.interval.ms initially")
745+
assertEquals(60_000, streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup task.offset.interval.ms initially")
746+
739747
// Change streams.num.standby.replicas = 1
740748
val groupConfigResource = new ConfigResource(ConfigResource.Type.GROUP, groupId)
741749
val alterConfigOp = new AlterConfigOp(
@@ -746,6 +754,16 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
746754
val options = new org.apache.kafka.clients.admin.AlterConfigsOptions()
747755
admin.incrementalAlterConfigs(configChanges, options).all().get()
748756

757+
// Change streams.task.offset.interval.ms = 45000
758+
val groupConfigResource2 = new ConfigResource(ConfigResource.Type.GROUP, groupId)
759+
val alterConfigOp2 = new AlterConfigOp(
760+
new ConfigEntry("streams.task.offset.interval.ms", "45000"),
761+
AlterConfigOp.OpType.SET
762+
)
763+
val configChanges2 = Map(groupConfigResource2 -> List(alterConfigOp2).asJavaCollection).asJava
764+
val options2 = new org.apache.kafka.clients.admin.AlterConfigsOptions()
765+
admin.incrementalAlterConfigs(configChanges2, options2).all().get()
766+
749767
// Send heartbeats to trigger rebalance after config change
750768
TestUtils.waitUntilTrue(() => {
751769
streamsGroupHeartbeatResponse1 = streamsGroupHeartbeat(
@@ -790,6 +808,10 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
790808
val totalStandbyTasks = member1StandbyTasksNum + member2StandbyTasksNum
791809
assertEquals(totalActiveTasks, totalStandbyTasks, "Each active task should have one standby task")
792810

811+
// Verify both members picked up change of `task.offset.interval.ms`
812+
assertEquals(45_000, streamsGroupHeartbeatResponse1.taskOffsetIntervalMs(), "Member 1 should pickup task.offset.interval.ms initially")
813+
assertEquals(45_000, streamsGroupHeartbeatResponse2.taskOffsetIntervalMs(), "Member 2 should pickup task.offset.interval.ms initially")
814+
793815
} finally {
794816
admin.close()
795817
}
@@ -1074,6 +1096,7 @@ class StreamsGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupCo
10741096
.setActiveTasks(expectedActiveTasks)
10751097
.setStandbyTasks(List.empty.asJava)
10761098
.setWarmupTasks(List.empty.asJava)
1099+
.setTaskOffsetIntervalMs(60_000)
10771100

10781101
assertEquals(expectedRejoinResponse, rejoinResponse)
10791102
} finally {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ public final class GroupConfig extends AbstractConfig {
9292

9393
public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms";
9494

95+
public static final String STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG = "streams.task.offset.interval.ms";
96+
9597
public final int consumerSessionTimeoutMs;
9698

9799
public final int consumerHeartbeatIntervalMs;
@@ -116,6 +118,8 @@ public final class GroupConfig extends AbstractConfig {
116118

117119
public final int streamsInitialRebalanceDelayMs;
118120

121+
public final int streamsTaskOffsetIntervalMs;
122+
119123
public final String shareIsolationLevel;
120124

121125
public final boolean shareRenewAcknowledgeEnable;
@@ -203,7 +207,13 @@ public final class GroupConfig extends AbstractConfig {
203207
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
204208
atLeast(0),
205209
MEDIUM,
206-
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);
210+
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC)
211+
.define(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
212+
INT,
213+
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DEFAULT,
214+
atLeast(1),
215+
MEDIUM,
216+
GroupCoordinatorConfig.STREAMS_GROUP_TASK_OFFSET_INTERVAL_MS_DOC);
207217

208218
public GroupConfig(Map<?, ?> props) {
209219
super(CONFIG, props, false);
@@ -219,6 +229,7 @@ public GroupConfig(Map<?, ?> props) {
219229
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
220230
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
221231
this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
232+
this.streamsTaskOffsetIntervalMs = getInt(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
222233
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
223234
this.shareRenewAcknowledgeEnable = getBoolean(SHARE_RENEW_ACKNOWLEDGE_ENABLE_CONFIG);
224235
}
@@ -262,6 +273,7 @@ private static void validateValues(Map<String, ?> valueMaps, GroupCoordinatorCon
262273
int streamsSessionTimeoutMs = (Integer) valueMaps.get(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
263274
int streamsHeartbeatIntervalMs = (Integer) valueMaps.get(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
264275
int streamsNumStandbyReplicas = (Integer) valueMaps.get(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
276+
int streamsTaskOffsetIntervalMs = (Integer) valueMaps.get(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG);
265277
if (consumerHeartbeatInterval < groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs()) {
266278
throw new InvalidConfigurationException(CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
267279
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -338,6 +350,10 @@ private static void validateValues(Map<String, ?> valueMaps, GroupCoordinatorCon
338350
throw new InvalidConfigurationException(STREAMS_NUM_STANDBY_REPLICAS_CONFIG + " must be less than or equal to " +
339351
GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
340352
}
353+
if (streamsTaskOffsetIntervalMs < groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()) {
354+
throw new InvalidConfigurationException(STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG + " must be greater than or equal to " +
355+
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG);
356+
}
341357
if (consumerSessionTimeout <= consumerHeartbeatInterval) {
342358
throw new InvalidConfigurationException(CONSUMER_SESSION_TIMEOUT_MS_CONFIG + " must be greater than " +
343359
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -429,6 +445,8 @@ private static void evaluateValues(
429445
groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs());
430446
clampToMax(props, groupId, STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
431447
groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas());
448+
clampToMin(props, groupId, STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
449+
groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs());
432450

433451
// Verify that clamping did not break the session > heartbeat invariant.
434452
checkSessionExceedsHeartbeat(props, groupId,
@@ -529,6 +547,33 @@ private static void clampToMax(
529547
}
530548
}
531549

550+
/**
551+
* Clamp a config value to at least min. A WARN log is emitted on adjustment.
552+
* No-op when the key is absent from props or the value is already at least min.
553+
* The stored value is parsed with Integer.parseInt, so a non-numeric value throws NumberFormatException.
554+
* @param props The properties to modify in place.
555+
* @param groupId The group id.
556+
* @param key The config key.
557+
* @param min The minimum allowed value (inclusive).
558+
*/
559+
private static void clampToMin(
560+
Properties props,
561+
String groupId,
562+
String key,
563+
int min
564+
) {
565+
Object rawValue = props.get(key);
566+
if (rawValue == null) return;
567+
568+
int value = Integer.parseInt(rawValue.toString());
569+
if (value < min) {
570+
// The value is being lifted to a floor, so the message says "raised" rather than "capped".
log.warn("The group config '{}' for group '{}' has value {} which is below the broker's " +
571+
"allowed minimum {}. The effective value will be raised to {}.",
572+
key, groupId, value, min, min);
573+
props.put(key, min);
574+
}
575+
}
576+
532577
/**
533578
* Create a group config instance using the given properties and defaults.
534579
*/
@@ -637,6 +682,13 @@ public int streamsInitialRebalanceDelayMs() {
637682
return streamsInitialRebalanceDelayMs;
638683
}
639684

685+
/**
686+
* The task offset reporting interval in milliseconds, as read from the {@code streams.task.offset.interval.ms} group config.
687+
*/
688+
public int streamsTaskOffsetIntervalMs() {
689+
return streamsTaskOffsetIntervalMs;
690+
}
691+
640692
/**
641693
* The share group isolation level.
642694
*/

0 commit comments

Comments
 (0)