Skip to content

KAFKA-19244: Add support for kafka-streams-groups.sh options (offset-related APIs) [1/N] #19646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class ListGroupsResult {
private final KafkaFutureImpl<Collection<GroupListing>> valid;
private final KafkaFutureImpl<Collection<Throwable>> errors;

ListGroupsResult(KafkaFuture<Collection<Object>> future) {
public ListGroupsResult(KafkaFuture<Collection<Object>> future) {
this.all = new KafkaFutureImpl<>();
this.valid = new KafkaFutureImpl<>();
this.errors = new KafkaFutureImpl<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,17 @@ public class ListStreamsGroupOffsetsSpec {
private Collection<TopicPartition> topicPartitions;

/**
* Set the topic partitions whose offsets are to be listed for a Streams group.
* Set the topic partitions whose offsets are to be listed for a streams group.
*/
ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
public ListStreamsGroupOffsetsSpec topicPartitions(Collection<TopicPartition> topicPartitions) {
this.topicPartitions = topicPartitions;
return this;
}

/**
* Returns the topic partitions whose offsets are to be listed for a Streams group.
* Returns the topic partitions whose offsets are to be listed for a streams group.
*/
Collection<TopicPartition> topicPartitions() {
public Collection<TopicPartition> topicPartitions() {
return topicPartitions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
public class CsvUtils {
private static final CsvMapper MAPPER = new CsvMapper();

static ObjectReader readerFor(Class<?> clazz) {
public static ObjectReader readerFor(Class<?> clazz) {
return MAPPER.readerFor(clazz).with(getSchema(clazz));
}

static ObjectWriter writerFor(Class<?> clazz) {
public static ObjectWriter writerFor(Class<?> clazz) {
return MAPPER.writerFor(clazz).with(getSchema(clazz));
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,25 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import joptsimple.OptionSpec;

import static org.apache.kafka.tools.ToolsUtils.minus;

public class StreamsGroupCommandOptions extends CommandDefaultOptions {
private static final String NL = System.lineSeparator();
public static final Logger LOGGER = LoggerFactory.getLogger(StreamsGroupCommandOptions.class);

public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to.";
public static final String GROUP_DOC = "The streams group we wish to act on.";
private static final String TOPIC_DOC = "The topic whose streams group information should be deleted or topic whose should be included in the reset offset process. " +
"In `reset-offsets` case, partitions can be specified using this format: `topic1:0,1,2`, where 0,1,2 are the partition to be included in the process. " +
"Reset-offsets also supports multiple topic inputs.";
private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a group in the `reset-offsets` process.";
public static final String LIST_DOC = "List all streams groups.";
public static final String DESCRIBE_DOC = "Describe streams group and list offset lag related to given group.";
public static final String TIMEOUT_MS_DOC = "The timeout that can be set for some use cases. For example, it can be used when describing the group " +
Expand All @@ -43,22 +52,55 @@ public class StreamsGroupCommandOptions extends CommandDefaultOptions {
public static final String MEMBERS_DOC = "Describe members of the group. This option may be used with the '--describe' option only.";
public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information." +
"This is the default sub-action and may be used with the '--describe' option only.";
private static final String RESET_OFFSETS_DOC = "Reset offsets of streams group. The instances should be inactive" + NL +
"Has 2 execution options: --dry-run (the default) to plan which offsets to reset, and --execute to update the offsets." + NL +
"You must choose one of the following reset specifications: --to-datetime, --by-duration, --to-earliest, " +
"--to-latest, --shift-by, --from-file, --to-current, --to-offset." + NL +
"To define the scope use --all-topics or --topic. One scope must be specified unless you use '--from-file'.";
private static final String DRY_RUN_DOC = "Only show results without executing changes on streams group. Supported operations: reset-offsets.";
private static final String EXECUTE_DOC = "Execute operation. Supported operations: reset-offsets.";
private static final String EXPORT_DOC = "Export operation execution to a CSV file. Supported operations: reset-offsets.";
private static final String RESET_TO_OFFSET_DOC = "Reset offsets to a specific offset.";
private static final String RESET_FROM_FILE_DOC = "Reset offsets to values defined in CSV file.";
private static final String RESET_TO_DATETIME_DOC = "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDThh:mm:ss.sss'";
private static final String RESET_BY_DURATION_DOC = "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'";
private static final String RESET_TO_EARLIEST_DOC = "Reset offsets to earliest offset.";
private static final String RESET_TO_LATEST_DOC = "Reset offsets to latest offset.";
private static final String RESET_TO_CURRENT_DOC = "Reset offsets to current offset.";
private static final String RESET_SHIFT_BY_DOC = "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative.";
public static final String VERBOSE_DOC = """
Use with --describe --state to show group epoch and target assignment epoch.
Use with --describe --members to show for each member the member epoch, target assignment epoch, current assignment, target assignment, and whether member is still using the classic rebalance protocol.
Use with --describe --offsets and --describe to show leader epochs for each partition.""";

public final OptionSpec<String> bootstrapServerOpt;
public final OptionSpec<String> groupOpt;
final OptionSpec<String> topicOpt;
final OptionSpec<Void> allTopicsOpt;
public final OptionSpec<Void> listOpt;
public final OptionSpec<Void> describeOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<String> stateOpt;
public final OptionSpec<Void> membersOpt;
public final OptionSpec<Void> offsetsOpt;
public final OptionSpec<Void> resetOffsetsOpt;
public final OptionSpec<Long> resetToOffsetOpt;
public final OptionSpec<String> resetFromFileOpt;
public final OptionSpec<String> resetToDatetimeOpt;
public final OptionSpec<String> resetByDurationOpt;
public final OptionSpec<Void> resetToEarliestOpt;
public final OptionSpec<Void> resetToLatestOpt;
public final OptionSpec<Void> resetToCurrentOpt;
public final OptionSpec<Long> resetShiftByOpt;
public final OptionSpec<Void> dryRunOpt;
public final OptionSpec<Void> executeOpt;
public final OptionSpec<Void> exportOpt;
public final OptionSpec<Void> verboseOpt;

final Set<OptionSpec<?>> allResetOffsetScenarioOpts;


public static StreamsGroupCommandOptions fromArgs(String[] args) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
opts.checkArgs();
Expand All @@ -76,6 +118,11 @@ public StreamsGroupCommandOptions(String[] args) {
.withRequiredArg()
.describedAs("streams group")
.ofType(String.class);
topicOpt = parser.accepts("topic", TOPIC_DOC)
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
allTopicsOpt = parser.accepts("all-topics", ALL_TOPICS_DOC);
listOpt = parser.accepts("list", LIST_DOC);
describeOpt = parser.accepts("describe", DESCRIBE_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
Expand All @@ -88,6 +135,7 @@ public StreamsGroupCommandOptions(String[] args) {
.withRequiredArg()
.describedAs("command config property file")
.ofType(String.class);

stateOpt = parser.accepts("state", STATE_DOC)
.availableIf(listOpt, describeOpt)
.withOptionalArg()
Expand All @@ -96,10 +144,40 @@ public StreamsGroupCommandOptions(String[] args) {
.availableIf(describeOpt);
offsetsOpt = parser.accepts("offsets", OFFSETS_DOC)
.availableIf(describeOpt);
resetOffsetsOpt = parser.accepts("reset-offsets", RESET_OFFSETS_DOC);
resetToOffsetOpt = parser.accepts("to-offset", RESET_TO_OFFSET_DOC)
.withRequiredArg()
.describedAs("offset")
.ofType(Long.class);
resetFromFileOpt = parser.accepts("from-file", RESET_FROM_FILE_DOC)
.withRequiredArg()
.describedAs("path to CSV file")
.ofType(String.class);
resetToDatetimeOpt = parser.accepts("to-datetime", RESET_TO_DATETIME_DOC)
.withRequiredArg()
.describedAs("datetime")
.ofType(String.class);
resetByDurationOpt = parser.accepts("by-duration", RESET_BY_DURATION_DOC)
.withRequiredArg()
.describedAs("duration")
.ofType(String.class);
resetToEarliestOpt = parser.accepts("to-earliest", RESET_TO_EARLIEST_DOC);
resetToLatestOpt = parser.accepts("to-latest", RESET_TO_LATEST_DOC);
resetToCurrentOpt = parser.accepts("to-current", RESET_TO_CURRENT_DOC);
resetShiftByOpt = parser.accepts("shift-by", RESET_SHIFT_BY_DOC)
.withRequiredArg()
.describedAs("number-of-offsets")
.ofType(Long.class);

verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
.availableIf(describeOpt);

dryRunOpt = parser.accepts("dry-run", DRY_RUN_DOC);
executeOpt = parser.accepts("execute", EXECUTE_DOC);
exportOpt = parser.accepts("export", EXPORT_DOC);
options = parser.parse(args);

allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToOffsetOpt, resetShiftByOpt,
resetToDatetimeOpt, resetByDurationOpt, resetToEarliestOpt, resetToLatestOpt, resetToCurrentOpt, resetFromFileOpt));
}

public void checkArgs() {
Expand All @@ -121,6 +199,27 @@ public void checkArgs() {
LOGGER.debug("Option " + timeoutMsOpt + " is applicable only when " + describeOpt + " is used.");
}

if (options.has(resetOffsetsOpt)) {
if (options.has(dryRunOpt) && options.has(executeOpt))
CommandLineUtils.printUsageAndExit(parser, "Option " + resetOffsetsOpt + " only accepts one of " + executeOpt + " and " + dryRunOpt);

if (!options.has(dryRunOpt) && !options.has(executeOpt)) {
System.err.println("WARN: No action will be performed as the --execute option is missing. " +
"In a future major release, the default behavior of this command will be to prompt the user before " +
"executing the reset rather than doing a dry run. You should add the --dry-run option explicitly " +
"if you are scripting this command and want to keep the current default behavior without prompting.");
}

CommandLineUtils.checkInvalidArgs(parser, options, resetToOffsetOpt, minus(allResetOffsetScenarioOpts, resetToOffsetOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToDatetimeOpt, minus(allResetOffsetScenarioOpts, resetToDatetimeOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetByDurationOpt, minus(allResetOffsetScenarioOpts, resetByDurationOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToEarliestOpt, minus(allResetOffsetScenarioOpts, resetToEarliestOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToLatestOpt, minus(allResetOffsetScenarioOpts, resetToLatestOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetToCurrentOpt, minus(allResetOffsetScenarioOpts, resetToCurrentOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetShiftByOpt, minus(allResetOffsetScenarioOpts, resetShiftByOpt));
CommandLineUtils.checkInvalidArgs(parser, options, resetFromFileOpt, minus(allResetOffsetScenarioOpts, resetFromFileOpt));
}

CommandLineUtils.checkInvalidArgs(parser, options, listOpt, membersOpt, offsetsOpt);
}
}
Loading