diff --git a/bin/automq-perf-test.sh b/bin/automq-perf-test.sh index dd065326fe..98c6b3d0d8 100755 --- a/bin/automq-perf-test.sh +++ b/bin/automq-perf-test.sh @@ -23,4 +23,5 @@ fi if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1024M" fi + exec "$(dirname "$0")/kafka-run-class.sh" -name kafkaClient -loggc org.apache.kafka.tools.automq.PerfCommand "$@" diff --git a/s3stream/build.gradle b/s3stream/build.gradle new file mode 100644 index 0000000000..f49f3c654a --- /dev/null +++ b/s3stream/build.gradle @@ -0,0 +1,3 @@ +tasks.withType(JavaCompile) { + options.compilerArgs += ["-Xlint:-this-escape"] +} \ No newline at end of file diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java index 4be64f5aa6..049516bc2f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java @@ -93,17 +93,28 @@ private PerfCommand(PerfConfig config) { private void run() { LOGGER.info("Starting perf test with config: {}", jsonStringify(config)); TimerUtil timer = new TimerUtil(); - - if (config.reset) { - LOGGER.info("Deleting all test topics..."); - int deleted = topicService.deleteTopics(); - LOGGER.info("Deleted all test topics ({} in total), took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); + + // Modified topic initialization logic + List topics; + if (config.catchupTopicPrefix != null && !config.catchupTopicPrefix.isEmpty()) { + LOGGER.info("Listing existing topics with prefix {}...", config.catchupTopicPrefix); + topics = topicService.listTopicsByPrefix(config.catchupTopicPrefix); + LOGGER.info("Found {} existing topics for catchup test", topics.size()); + + if (topics.isEmpty()) { + throw new RuntimeException("No topics found with prefix: " + config.catchupTopicPrefix); + } + } else { + if (config.reset) { + LOGGER.info("Deleting all test topics..."); + int deleted = topicService.deleteTopics(); + LOGGER.info("Deleted {} test topics, took {} ms", deleted, timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); + } + LOGGER.info("Creating {} new topics...", config.topics); + topics = topicService.createTopics(config.topicsConfig()); + LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); } - LOGGER.info("Creating topics..."); - List topics = topicService.createTopics(config.topicsConfig()); - LOGGER.info("Created {} topics, took {} ms", topics.size(), timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); - LOGGER.info("Creating consumers..."); int consumers = consumerService.createConsumers(topics, config.consumersConfig()); consumerService.start(this::messageReceived, config.maxConsumeRecordRate); @@ -117,8 +128,15 @@ private void run() { waitTopicsReady(consumerService.consumerCount() > 0); LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS)); + // Modified producer start logic Function> payloads = payloads(config, topics); - producerService.start(payloads, config.sendRate); + if (config.catchupTopicPrefix != null) { + LOGGER.info("Starting catchup test with existing topics"); + producerService.start(payloads, config.sendRate); + } else { + LOGGER.info("Starting normal test with new topics"); + producerService.start(payloads, config.sendRate); + } preparing = false; @@ -132,7 +150,7 @@ private void run() { } Result result; - if (config.backlogDurationSeconds > 0) { + if (config.catchupTopicPrefix == null && config.backlogDurationSeconds > 0) { LOGGER.info("Pausing consumers for {} seconds to build up backlog...", config.backlogDurationSeconds); consumerService.pause(); long backlogStart = System.currentTimeMillis(); diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java index 9f7db65d0e..a013098abf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java @@ -65,6 +65,7 @@ public class PerfConfig { public final int reportingIntervalSeconds; public final String valueSchema; public final String valuesFile; + public final String catchupTopicPrefix; public PerfConfig(String[] args) { ArgumentParser parser = parser(); @@ -105,7 +106,16 @@ public PerfConfig(String[] args) { reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds"); valueSchema = ns.getString("valueSchema"); valuesFile = ns.get("valuesFile"); + catchupTopicPrefix = ns.getString("catchupTopicPrefix"); + if (catchupTopicPrefix != null && !catchupTopicPrefix.isEmpty()) { + if (reset) { + throw new IllegalArgumentException( + "Cannot use --reset with --catchup-topic-prefix" + ); + } + } + if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) { throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)", backlogDurationSeconds, groupsPerTopic, groupStartDelaySeconds)); @@ -260,6 +270,12 @@ public static ArgumentParser parser() { .dest("valuesFile") .metavar("VALUES_FILE") .help("The avro value file. Example file content {\"f1\": \"value1\"}"); + parser.addArgument("--catchup-topic-prefix") + .type(String.class) + .dest("catchupTopicPrefix") + .metavar("CATCHUP_TOPIC_PREFIX") + .help("Prefix of existing topics to reuse for catchup testing"); + return parser; } diff --git a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java index f67626cfdb..46c6c8c169 100644 --- a/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java +++ b/tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java @@ -33,6 +33,8 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.kafka.clients.admin.TopicDescription; + public class TopicService implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(TopicService.class); @@ -78,7 +80,30 @@ public List createTopics(TopicsConfig config) { .map(name -> new Topic(name, config.partitionsPerTopic)) .collect(Collectors.toList()); } - + /** + * List all performance test topics. + */ + public List listTopicsByPrefix(String prefix) { + try { + // Automatically add the unified prefix for performance test topics + String fullPrefix = COMMON_TOPIC_PREFIX + prefix; + + List topicNames = admin.listTopics().names().get() + .stream() + // Fix filter condition: use full prefix matching + .filter(name -> name.startsWith(fullPrefix)) + .collect(Collectors.toList()); + + // Fix deprecated all() method invocation + Map descriptions = admin.describeTopics(topicNames).allTopicNames().get(); + + return descriptions.values().stream() + .map(desc -> new Topic(desc.name(), desc.partitions().size())) + .collect(Collectors.toList()); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Failed to list topics with prefix: " + prefix, e); + } + } /** * Delete all historical performance test topics. */