Skip to content

feat: For the performance test shell script, add a prefix that can reuse the created topic #2401

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/automq-perf-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ fi
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1024M"
fi

exec "$(dirname "$0")/kafka-run-class.sh" -name kafkaClient -loggc org.apache.kafka.tools.automq.PerfCommand "$@"
3 changes: 3 additions & 0 deletions s3stream/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
tasks.withType(JavaCompile) {
options.compilerArgs += ["-Xlint:-this-escape"]
}
Comment on lines +1 to +3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disabling -Xlint:this-escape globally is not recommended, as it suppresses critical warnings about unsafe object initialization (e.g., this escaping constructors before full initialization). This could hide concurrency risks.

Instead, consider using @SuppressWarnings("this-escape") locally if unavoidable, but prioritize fixing the root cause for safer code.

Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,28 @@ private PerfCommand(PerfConfig config) {
private void run() {
LOGGER.info("Starting perf test with config: {}", jsonStringify(config));
TimerUtil timer = new TimerUtil();

if (config.reset) {
LOGGER.info("Deleting all test topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted all test topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

// Modified topic initialization logic
List<Topic> topics;
if (config.catchupTopicPrefix != null && !config.catchupTopicPrefix.isEmpty()) {
LOGGER.info("Listing existing topics with prefix {}...", config.catchupTopicPrefix);
topics = topicService.listTopicsByPrefix(config.catchupTopicPrefix);
LOGGER.info("Found {} existing topics for catchup test", topics.size());

if (topics.isEmpty()) {
throw new RuntimeException("No topics found with prefix: " + config.catchupTopicPrefix);
}
} else {
if (config.reset) {
LOGGER.info("Deleting all test topics...");
int deleted = topicService.deleteTopics();
LOGGER.info("Deleted {} test topics, took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}
LOGGER.info("Creating {} new topics...", config.topics);
topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
}

LOGGER.info("Creating topics...");
List<Topic> topics = topicService.createTopics(config.topicsConfig());
LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

LOGGER.info("Creating consumers...");
int consumers = consumerService.createConsumers(topics, config.consumersConfig());
consumerService.start(this::messageReceived, config.maxConsumeRecordRate);
Expand All @@ -117,8 +128,15 @@ private void run() {
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

// Modified producer start logic
Function<String, List<byte[]>> payloads = payloads(config, topics);
producerService.start(payloads, config.sendRate);
if (config.catchupTopicPrefix != null) {
Copy link
Preview

Copilot AI Apr 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] Consider checking if config.catchupTopicPrefix is not only non-null but also non-empty, to maintain consistency with the validation logic in PerfConfig.java.

Suggested change
if (config.catchupTopicPrefix != null) {
if (!Strings.isNullOrEmpty(config.catchupTopicPrefix)) {

Copilot uses AI. Check for mistakes.

LOGGER.info("Starting catchup test with existing topics");
producerService.start(payloads, config.sendRate);
} else {
LOGGER.info("Starting normal test with new topics");
producerService.start(payloads, config.sendRate);
}
Comment on lines +131 to +139
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this code modification?


preparing = false;

Expand All @@ -132,7 +150,7 @@ private void run() {
}

Result result;
if (config.backlogDurationSeconds > 0) {
if (config.catchupTopicPrefix == null && config.backlogDurationSeconds > 0) {
LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds);
consumerService.pause();
long backlogStart = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class PerfConfig {
public final int reportingIntervalSeconds;
public final String valueSchema;
public final String valuesFile;
public final String catchupTopicPrefix;

public PerfConfig(String[] args) {
ArgumentParser parser = parser();
Expand Down Expand Up @@ -105,7 +106,16 @@ public PerfConfig(String[] args) {
reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds");
valueSchema = ns.getString("valueSchema");
valuesFile = ns.get("valuesFile");
catchupTopicPrefix = ns.getString("catchupTopicPrefix");

if (catchupTopicPrefix != null && !catchupTopicPrefix.isEmpty()) {
if (reset) {
throw new IllegalArgumentException(
"Cannot use --reset with --catchup-topic-prefix"
);
}
}

if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
backlogDurationSeconds, groupsPerTopic, groupStartDelaySeconds));
Expand Down Expand Up @@ -260,6 +270,12 @@ public static ArgumentParser parser() {
.dest("valuesFile")
.metavar("VALUES_FILE")
.help("The avro value file. Example file content {\"f1\": \"value1\"}");
parser.addArgument("--catchup-topic-prefix")
.type(String.class)
.dest("catchupTopicPrefix")
.metavar("CATCHUP_TOPIC_PREFIX")
.help("Prefix of existing topics to reuse for catchup testing");

return parser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.kafka.clients.admin.TopicDescription;

public class TopicService implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class);
Expand Down Expand Up @@ -78,7 +80,30 @@ public List<Topic> createTopics(TopicsConfig config) {
.map(name -> new Topic(name, config.partitionsPerTopic))
.collect(Collectors.toList());
}

/**
* List all performance test topics.
*/
public List<Topic> listTopicsByPrefix(String prefix) {
try {
// Automatically add the unified prefix for performance test topics
String fullPrefix = COMMON_TOPIC_PREFIX + prefix;

List<String> topicNames = admin.listTopics().names().get()
.stream()
// Fix filter condition: use full prefix matching
.filter(name -> name.startsWith(fullPrefix))
.collect(Collectors.toList());

// Fix deprecated all() method invocation
Map<String, TopicDescription> descriptions = admin.describeTopics(topicNames).allTopicNames().get();

return descriptions.values().stream()
.map(desc -> new Topic(desc.name(), desc.partitions().size()))
.collect(Collectors.toList());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to list topics with prefix: " + prefix, e);
}
}
/**
* Delete all historical performance test topics.
*/
Expand Down
Loading