Skip to content

MINOR: Change Streams group to streams group #19813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
Original file line number Diff line number Diff line change
Expand Up @@ -951,24 +951,24 @@ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(Map<String, List
}

/**
* List the Streams group offsets available in the cluster for the specified Streams groups.
* List the streams group offsets available in the cluster for the specified streams groups.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#listConsumerGroupOffsets} does.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @param groupSpecs Map of streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
*
* @param options The options to use when listing the Streams group offsets.
* @param options The options to use when listing the streams group offsets.
* @return The ListStreamsGroupOffsetsResult
*/
ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs, ListStreamsGroupOffsetsOptions options);

/**
* List the Streams group offsets available in the cluster for the specified groups with the default options.
* List the streams group offsets available in the cluster for the specified groups with the default options.
* <p>
* This is a convenience method for
* {@link #listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)} with default options.
*
* @param groupSpecs Map of Streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @param groupSpecs Map of streams group ids to a spec that specifies the topic partitions of the group to list offsets for.
* @return The ListStreamsGroupOffsetsResult.
*/
default ListStreamsGroupOffsetsResult listStreamsGroupOffsets(Map<String, ListStreamsGroupOffsetsSpec> groupSpecs) {
Expand All @@ -993,17 +993,17 @@ default DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> group
}

/**
* Delete Streams groups from the cluster.
* Delete streams groups from the cluster.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroups} does.
*
* @param options The options to use when deleting a Streams group.
* @param options The options to use when deleting a streams group.
* @return The DeleteStreamsGroupsResult.
*/
DeleteStreamsGroupsResult deleteStreamsGroups(Collection<String> groupIds, DeleteStreamsGroupsOptions options);

/**
* Delete Streams groups from the cluster with the default options.
* Delete streams groups from the cluster with the default options.
*
* @return The DeleteStreamsGroupResult.
*/
Expand Down Expand Up @@ -1035,21 +1035,21 @@ default DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsets(String group
}

/**
* Delete committed offsets for a set of partitions in a Streams group. This will
* Delete committed offsets for a set of partitions in a streams group. This will
* succeed at the partition level only if the group is not actively subscribed
* to the corresponding topic.
*
* <em>Note</em>: this method effectively does the same as the corresponding consumer group method {@link Admin#deleteConsumerGroupOffsets} does.
*
* @param options The options to use when deleting offsets in a Streams group.
* @param options The options to use when deleting offsets in a streams group.
* @return The DeleteStreamsGroupOffsetsResult.
*/
DeleteStreamsGroupOffsetsResult deleteStreamsGroupOffsets(String groupId,
Set<TopicPartition> partitions,
DeleteStreamsGroupOffsetsOptions options);

/**
* Delete committed offsets for a set of partitions in a Streams group with the default
* Delete committed offsets for a set of partitions in a streams group with the default
* options. This will succeed at the partition level only if the group is not actively
* subscribed to the corresponding topic.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Collection;

/**
* Specification of Streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
* Specification of streams group offsets to list using {@link Admin#listStreamsGroupOffsets(Map, ListStreamsGroupOffsetsOptions)}.
* <p>
* The API of this class is evolving, see {@link Admin} for details.
*/
Expand All @@ -33,15 +33,15 @@ public class ListStreamsGroupOffsetsSpec {
private Collection<TopicPartition> topicPartitions;

/**
* Set the topic partitions whose offsets are to be listed for a Streams group.
* Set the topic partitions whose offsets are to be listed for a streams group.
*/
ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}

/**
* Returns the topic partitions whose offsets are to be listed for a Streams group.
* Returns the topic partitions whose offsets are to be listed for a streams group.
*/
Collection<TopicPartition> topicPartitions() {
return topicPartitions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
import static org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult.EMPTY;

/**
* <p>Manages the request creation and response handling for the Streams group heartbeat. The class creates a
* <p>Manages the request creation and response handling for the streams group heartbeat. The class creates a
* heartbeat request using the state stored in the membership manager. The requests can be retrieved
* by calling {@link StreamsGroupHeartbeatRequestManager#poll(long)}. Once the response is received, it updates the
* state in the membership manager and handles any errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public int hashCode() {
private MemberState state;

/**
* Group ID of the Streams group the member will be part of, provided when creating the current
* Group ID of the streams group the member will be part of, provided when creating the current
* membership manager.
*/
private final String groupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Set;

/**
* Listener for handling Streams group rebalance events in Kafka Streams.
* Listener for handling streams group rebalance events in Kafka Streams.
*/
public interface StreamsRebalanceListener {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ private void process(final LeaveGroupOnCloseEvent event) {
CompletableFuture<Void> future = requestManagers.consumerMembershipManager.get().leaveGroupOnClose(event.membershipOperation());
future.whenComplete(complete(event.future()));
} else if (requestManagers.streamsMembershipManager.isPresent()) {
log.debug("Signal the StreamsMembershipManager to leave the Streams group since the member is closing");
log.debug("Signal the StreamsMembershipManager to leave the streams group since the member is closing");
CompletableFuture<Void> future = requestManagers.streamsMembershipManager.get().leaveGroupOnClose();
future.whenComplete(complete(event.future()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1643,7 +1643,7 @@ private void throwIfStreamsGroupIsFull(
/**
* Validates the member epoch provided in the heartbeat request.
*
* @param member The Streams group member.
* @param member The streams group member.
* @param receivedMemberEpoch The member epoch.
* @param ownedActiveTasks The owned active tasks.
* @param ownedStandbyTasks The owned standby tasks.
Expand Down Expand Up @@ -1798,7 +1798,7 @@ private static ConsumerProtocolSubscription deserializeSubscription(
}

/**
* Handles a regular heartbeat from a Streams group member.
* Handles a regular heartbeat from a streams group member.
* It mainly consists of five parts:
* 1) Create or update the member.
* The group epoch is bumped if the member has been created or updated.
Expand Down Expand Up @@ -5248,8 +5248,8 @@ public void replay(

/**
* Replays StreamsGroupMetadataKey/Value to update the hard state of
* the Streams group. It updates the group epoch of the Streams
* group or deletes the Streams group.
* the streams group. It updates the group epoch of the Streams
* group or deletes the streams group.
*
* @param key A StreamsGroupMetadataKey key.
* @param value A StreamsGroupMetadataValue record.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import java.util.stream.Collectors;

/**
* Contains all information related to a member within a Streams group.
* Contains all information related to a member within a streams group.
* <p>
* This class is immutable and is fully backed by records stored in the __consumer_offsets topic.
*
Expand Down Expand Up @@ -322,7 +322,7 @@ public boolean isReconciledTo(int targetAssignmentEpoch) {
}

/**
* Creates a member description for the Streams group describe response from this member.
* Creates a member description for the streams group describe response from this member.
*
* @param targetAssignment The target assignment of this member in the corresponding group.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.util.Set;

/**
* The assignment specification for a Streams group member.
* The assignment specification for a streams group member.
*
* @param instanceId The instance ID if provided.
* @param rackId The rack ID if provided.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.Objects;

/**
* The assignment specification for a Streams group.
* The assignment specification for a streams group.
*
* @param members The member metadata keyed by member ID.
* @param assignmentConfigs Any configurations passed to the assignor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Set;

/**
* The task assignment for a Streams group member.
* The task assignment for a streams group member.
*
* @param activeTasks The active tasks assigned to this member keyed by subtopologyId.
* @param standbyTasks The standby tasks assigned to this member keyed by subtopologyId.
Expand Down