From 4cfb89775b1a5a0ec729cc7752ef737aac7bcc85 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Mon, 16 Sep 2024 16:17:49 -0700 Subject: [PATCH 01/16] Refactored for trunk This commit should enable a test PR against the trunk --- .../kafka/tools/ProducerPerformance.java | 96 ++++++++++++++++++- .../kafka/tools/ProducerPerformanceTest.java | 71 ++++++++++++++ 2 files changed, 163 insertions(+), 4 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index c9317d8a24c65..0404a876cd592 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -75,9 +75,15 @@ void start(String[] args) throws IOException { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; - stats = new Stats(config.numRecords, 5000); + if (config.warmupRecords > 0) { + this.warmupStats = new Stats(config.warmupRecords, 5000); + //stats = new Stats(config.numRecords, 5000); + } long startMs = System.currentTimeMillis(); + // TODO: Keep this message? Maybe unnecessary + if (config.warmupRecords > 0) + System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary."); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); int currentTransactionSize = 0; @@ -94,8 +100,17 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - cb = new PerfCallback(sendStartMs, payload.length, stats); - producer.send(record, cb); + if (i < config.warmupRecords) { + cb = new PerfCallback(sendStartMs, payload.length, warmupStats); + producer.send(record, cb); + } else if (i == config.warmupRecords) { + stats = new Stats(config.numRecords - config.warmupRecords, 5000, config.warmupRecords); + cb = new PerfCallback(sendStartMs, payload.length, stats); + producer.send(record, cb); + } else { + cb = new PerfCallback(sendStartMs, payload.length, stats); + producer.send(record, cb); + } currentTransactionSize++; if (config.transactionsEnabled && config.transactionDurationMs <= (sendStartMs - transactionStartTime)) { @@ -114,6 +129,10 @@ record = new ProducerRecord<>(config.topicName, payload); if (!config.shouldPrintMetrics) { producer.close(); + /* print warmup stats if relevant */ + if (config.warmupRecords > 0) { + stats.printTotal(warmupStats); + } /* print final results */ stats.printTotal(); } else { @@ -122,6 +141,10 @@ record = new ProducerRecord<>(config.topicName, payload); // expects this class to work with older versions of the client jar that don't support flush(). producer.flush(); + /* print warmup stats if relevant */ + if (config.warmupRecords > 0) { + stats.printTotal(warmupStats); + } /* print final results */ stats.printTotal(); @@ -148,6 +171,7 @@ KafkaProducer createKafkaProducer(Properties props) { Callback cb; Stats stats; + Stats warmupStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { @@ -326,6 +350,16 @@ static ArgumentParser argParser() { "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + "the default value will be 3000."); + parser.addArgument("--warmup-records") + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + return parser; } @@ -346,8 +380,13 @@ static class Stats { private long windowTotalLatency; private long windowBytes; private long windowStart; + private long warmupRecords; public Stats(long numRecords, int reportingInterval) { + this(numRecords, reportingInterval, 0); + } + + public Stats(long numRecords, int reportingInterval, long warmupRecords) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -361,6 +400,7 @@ public Stats(long numRecords, int reportingInterval) { this.windowBytes = 0; this.totalLatency = 0; this.reportingInterval = reportingInterval; + this.warmupRecords = warmupRecords; } public void record(int latency, int bytes, long time) { @@ -403,6 +443,18 @@ public int index() { return this.index; } + public int maxLatency() { + return this.maxLatency; + } + + public int[] getLatencies() { + return this.latencies; + } + + public long getTotalLatency() { + return this.totalLatency; + } + public void printWindow() { long elapsed = System.currentTimeMillis() - windowStart; double recsPerSec = 1000.0 * windowCount / (double) elapsed; @@ -428,8 +480,13 @@ public void printTotal() { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + String state = ""; + if (this.warmupRecords > 0) { + state = " steady state"; + } + System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, + state, recsPerSec, mbPerSec, totalLatency / (double) count, @@ -440,6 +497,32 @@ public void printTotal() { percs[3]); } + public void printTotal(Stats warmupStats) { + long overallElapsed = System.currentTimeMillis() - warmupStats.start; + long overallCount = count + warmupStats.totalCount(); + long overallBytes = bytes + warmupStats.bytes(); + double overallRecsPerSec = 1000.0 * overallCount / (double) overallElapsed; + double overallMbPerSec = 1000.0 * overallBytes / (double) overallElapsed / (1024.0 * 1024.0); + int overallMax = Math.max(maxLatency, warmupStats.maxLatency()); + long overallTotalLatency = totalLatency + warmupStats.getTotalLatency(); + + int totalElements = index + warmupStats.index(); + int[] overallLatencyArray = Arrays.copyOf(warmupStats.getLatencies(), totalElements); + System.arraycopy(this.getLatencies(), 0, overallLatencyArray, warmupStats.index(), this.index()); + + int[] percs = percentiles(overallLatencyArray, totalElements, 0.5, 0.95, 0.99, 0.999); + System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + overallCount, + overallRecsPerSec, + overallMbPerSec, + overallTotalLatency / (double) overallCount, + (double) overallMax, + percs[0], + percs[1], + percs[2], + percs[3]); + } + private static int[] percentiles(int[] latencies, int count, double... percentiles) { int size = Math.min(count, latencies.length); Arrays.sort(latencies, 0, size); @@ -480,6 +563,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { static final class ConfigPostProcessor { final String topicName; final long numRecords; + final long warmupRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; @@ -493,6 +577,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept Namespace namespace = parser.parseArgs(args); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); + this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); this.recordSize = namespace.getInt("recordSize"); this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); @@ -503,6 +588,9 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); + if (warmupRecords >= numRecords) { + throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); + } if (producerConfigs == null && producerConfigFile == null) { throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 1757fa8606951..b3912cf9fa0e3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -472,4 +472,75 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString() .startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX)); } + + @Test + public void testWarmupRecordsFractionalValue() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "1.5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + } + + @Test + public void testWarmupRecordsString() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "foo", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + } + + @Test + public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "2", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(2, producerPerformanceSpy.warmupStats.totalCount()); + assertEquals(8, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } + + @Test + public void testWarmupNegativeRecordsNormalTest() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "-1", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } } From 125a246f9280664650a50117cbb30736f50e5444 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Wed, 18 Sep 2024 16:46:25 -0700 Subject: [PATCH 02/16] Refactor for readability Now using an overallStats object with new constructor to contin the combined stats at end of test and a single printTotal method. --- .../kafka/tools/ProducerPerformance.java | 82 ++++++++++++------- .../kafka/tools/ProducerPerformanceTest.java | 2 +- 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 0404a876cd592..17c609bf131be 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -76,14 +76,14 @@ void start(String[] args) throws IOException { SplittableRandom random = new SplittableRandom(0); ProducerRecord record; if (config.warmupRecords > 0) { + // TODO: Keep this message? Maybe unnecessary + System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary."); this.warmupStats = new Stats(config.warmupRecords, 5000); - //stats = new Stats(config.numRecords, 5000); + } else { + stats = new Stats(config.numRecords, 5000); } long startMs = System.currentTimeMillis(); - // TODO: Keep this message? Maybe unnecessary - if (config.warmupRecords > 0) - System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary."); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); int currentTransactionSize = 0; @@ -100,12 +100,13 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - if (i < config.warmupRecords) { - cb = new PerfCallback(sendStartMs, payload.length, warmupStats); - producer.send(record, cb); - } else if (i == config.warmupRecords) { - stats = new Stats(config.numRecords - config.warmupRecords, 5000, config.warmupRecords); - cb = new PerfCallback(sendStartMs, payload.length, stats); + if (warmupStats != null) { + if (i < config.warmupRecords) { + cb = new PerfCallback(sendStartMs, payload.length, warmupStats); + } else if (i == config.warmupRecords) { + stats = new Stats(config.numRecords - config.warmupRecords, 5000, config.warmupRecords); + cb = new PerfCallback(sendStartMs, payload.length, stats); + } producer.send(record, cb); } else { cb = new PerfCallback(sendStartMs, payload.length, stats); @@ -130,8 +131,9 @@ record = new ProducerRecord<>(config.topicName, payload); producer.close(); /* print warmup stats if relevant */ - if (config.warmupRecords > 0) { - stats.printTotal(warmupStats); + if (warmupStats != null) { + overallStats = new Stats(warmupStats, stats); + overallStats.printTotal(); } /* print final results */ stats.printTotal(); @@ -142,8 +144,9 @@ record = new ProducerRecord<>(config.topicName, payload); producer.flush(); /* print warmup stats if relevant */ - if (config.warmupRecords > 0) { - stats.printTotal(warmupStats); + if (warmupStats != null) { + Stats overallStats = new Stats(warmupStats, stats); + overallStats.printTotal(); } /* print final results */ stats.printTotal(); @@ -171,6 +174,7 @@ KafkaProducer createKafkaProducer(Properties props) { Callback cb; Stats stats; + Stats overallStats; Stats warmupStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, @@ -403,6 +407,25 @@ public Stats(long numRecords, int reportingInterval, long warmupRecords) { this.warmupRecords = warmupRecords; } + public Stats(Stats first, Stats second) { + // create a Stats object that's the combination of two disjoint Stats objects + this.start = Math.min(first.start, second.start); + this.iteration = first.iteration + second.iteration; + this.sampling = first.sampling; + this.latencies = Arrays.copyOf(first.latencies, first.index + second.index); + System.arraycopy(second.latencies, 0, this.latencies, first.index(), second.index()); + this.maxLatency = Math.max(first.maxLatency, second.maxLatency); + this.windowCount = first.windowCount + second.windowCount; + this.totalLatency = first.totalLatency + second.totalLatency; + this.reportingInterval = first.reportingInterval; + this.warmupRecords = 0; + this.count = first.count + second.count; + // unused vars, populating to prevent compiler errors: + //this.windowMaxLatency = 0; + //this.windowTotalLatency = 0; + //this.windowBytes = 0; + } + public void record(int latency, int bytes, long time) { this.count++; this.bytes += bytes; @@ -443,18 +466,6 @@ public int index() { return this.index; } - public int maxLatency() { - return this.maxLatency; - } - - public int[] getLatencies() { - return this.latencies; - } - - public long getTotalLatency() { - return this.totalLatency; - } - public void printWindow() { long elapsed = System.currentTimeMillis() - windowStart; double recsPerSec = 1000.0 * windowCount / (double) elapsed; @@ -497,18 +508,28 @@ public void printTotal() { percs[3]); } + /* + public void combineStats(Stats stats) { + this.count += stats.totalCount(); + this.bytes += stats.bytes; + this.totalLatency += stats.totalLatency; + this.latencies = Arrays.copyOf(this.latencies, index + stats.index); + System.arraycopy(stats.latencies, 0, this.latencies, this.index(), stats.index()); + this.index += stats.index; + } + public void printTotal(Stats warmupStats) { long overallElapsed = System.currentTimeMillis() - warmupStats.start; long overallCount = count + warmupStats.totalCount(); long overallBytes = bytes + warmupStats.bytes(); double overallRecsPerSec = 1000.0 * overallCount / (double) overallElapsed; double overallMbPerSec = 1000.0 * overallBytes / (double) overallElapsed / (1024.0 * 1024.0); - int overallMax = Math.max(maxLatency, warmupStats.maxLatency()); - long overallTotalLatency = totalLatency + warmupStats.getTotalLatency(); + int overallMax = Math.max(maxLatency, warmupStats.maxLatency); + long overallTotalLatency = totalLatency + warmupStats.totalLatency; int totalElements = index + warmupStats.index(); - int[] overallLatencyArray = Arrays.copyOf(warmupStats.getLatencies(), totalElements); - System.arraycopy(this.getLatencies(), 0, overallLatencyArray, warmupStats.index(), this.index()); + int[] overallLatencyArray = Arrays.copyOf(warmupStats.latencies, totalElements); + System.arraycopy(this.latencies, 0, overallLatencyArray, warmupStats.index(), this.index()); int[] percs = percentiles(overallLatencyArray, totalElements, 0.5, 0.95, 0.99, 0.999); System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", @@ -522,6 +543,7 @@ public void printTotal(Stats warmupStats) { percs[2], percs[3]); } + */ private static int[] percentiles(int[] latencies, int count, double... percentiles) { int size = Math.min(count, latencies.length); diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index b3912cf9fa0e3..787b1492770e5 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -517,7 +517,7 @@ public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); - assertEquals(2, producerPerformanceSpy.warmupStats.totalCount()); + assertEquals(10, producerPerformanceSpy.overallStats.totalCount()); assertEquals(8, producerPerformanceSpy.stats.totalCount()); verify(producerMock, times(1)).close(); } From 6dcaed8d9cc9ea13d6a07951599b5cfecab44e66 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Fri, 20 Sep 2024 09:56:58 -0700 Subject: [PATCH 03/16] :bugfix: Remove redundant declaration of overallStats --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 17c609bf131be..d5df384aaac00 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -145,7 +145,7 @@ record = new ProducerRecord<>(config.topicName, payload); /* print warmup stats if relevant */ if (warmupStats != null) { - Stats overallStats = new Stats(warmupStats, stats); + overallStats = new Stats(warmupStats, stats); overallStats.printTotal(); } /* print final results */ From 46fcad99b18c9c735be48ce50a5c9576c30368a8 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Fri, 20 Sep 2024 11:24:48 -0700 Subject: [PATCH 04/16] Remove overallStats Refactor to remove declaration of overallStats and limit its scope to only the location where printTotal is called. Set Stats combine-constructor as package-private. Remove commented code. --- .../kafka/tools/ProducerPerformance.java | 50 ++----------------- .../kafka/tools/ProducerPerformanceTest.java | 2 +- 2 files changed, 4 insertions(+), 48 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index d5df384aaac00..532b7c554345f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -132,8 +132,7 @@ record = new ProducerRecord<>(config.topicName, payload); /* print warmup stats if relevant */ if (warmupStats != null) { - overallStats = new Stats(warmupStats, stats); - overallStats.printTotal(); + new Stats(warmupStats, stats).printTotal(); } /* print final results */ stats.printTotal(); @@ -145,8 +144,7 @@ record = new ProducerRecord<>(config.topicName, payload); /* print warmup stats if relevant */ if (warmupStats != null) { - overallStats = new Stats(warmupStats, stats); - overallStats.printTotal(); + new Stats(warmupStats, stats).printTotal(); } /* print final results */ stats.printTotal(); @@ -174,7 +172,6 @@ KafkaProducer createKafkaProducer(Properties props) { Callback cb; Stats stats; - Stats overallStats; Stats warmupStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, @@ -407,7 +404,7 @@ public Stats(long numRecords, int reportingInterval, long warmupRecords) { this.warmupRecords = warmupRecords; } - public Stats(Stats first, Stats second) { + Stats(Stats first, Stats second) { // create a Stats object that's the combination of two disjoint Stats objects this.start = Math.min(first.start, second.start); this.iteration = first.iteration + second.iteration; @@ -420,10 +417,6 @@ public Stats(Stats first, Stats second) { this.reportingInterval = first.reportingInterval; this.warmupRecords = 0; this.count = first.count + second.count; - // unused vars, populating to prevent compiler errors: - //this.windowMaxLatency = 0; - //this.windowTotalLatency = 0; - //this.windowBytes = 0; } public void record(int latency, int bytes, long time) { @@ -508,43 +501,6 @@ public void printTotal() { percs[3]); } - /* - public void combineStats(Stats stats) { - this.count += stats.totalCount(); - this.bytes += stats.bytes; - this.totalLatency += stats.totalLatency; - this.latencies = Arrays.copyOf(this.latencies, index + stats.index); - System.arraycopy(stats.latencies, 0, this.latencies, this.index(), stats.index()); - this.index += stats.index; - } - - public void printTotal(Stats warmupStats) { - long overallElapsed = System.currentTimeMillis() - warmupStats.start; - long overallCount = count + warmupStats.totalCount(); - long overallBytes = bytes + warmupStats.bytes(); - double overallRecsPerSec = 1000.0 * overallCount / (double) overallElapsed; - double overallMbPerSec = 1000.0 * overallBytes / (double) overallElapsed / (1024.0 * 1024.0); - int overallMax = Math.max(maxLatency, warmupStats.maxLatency); - long overallTotalLatency = totalLatency + warmupStats.totalLatency; - - int totalElements = index + warmupStats.index(); - int[] overallLatencyArray = Arrays.copyOf(warmupStats.latencies, totalElements); - System.arraycopy(this.latencies, 0, overallLatencyArray, warmupStats.index(), this.index()); - - int[] percs = percentiles(overallLatencyArray, totalElements, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", - overallCount, - overallRecsPerSec, - overallMbPerSec, - overallTotalLatency / (double) overallCount, - (double) overallMax, - percs[0], - percs[1], - percs[2], - percs[3]); - } - */ - private static int[] percentiles(int[] latencies, int count, double... percentiles) { int size = Math.min(count, latencies.length); Arrays.sort(latencies, 0, size); diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 787b1492770e5..b3912cf9fa0e3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -517,7 +517,7 @@ public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); - assertEquals(10, producerPerformanceSpy.overallStats.totalCount()); + assertEquals(2, producerPerformanceSpy.warmupStats.totalCount()); assertEquals(8, producerPerformanceSpy.stats.totalCount()); verify(producerMock, times(1)).close(); } From 6dfbaa7eb15d82ca076237a1046241eed4f23465 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 3 Oct 2024 16:36:51 -0700 Subject: [PATCH 05/16] :bugfix: Properly create callback during switch - Fix callbacks after switch to steady state which were not being properly created. - Add constant for DEFAULT_REPORTING_INTERVAL_MS - Remove redundant producer.send calls - Add ternerary operator in state assignment - Add check for non-positive numRecords --- .../kafka/tools/ProducerPerformance.java | 27 ++++++++++--------- 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 532b7c554345f..9f1f14ee9fd40 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -51,6 +51,7 @@ public class ProducerPerformance { public static final String DEFAULT_TRANSACTION_ID_PREFIX = "performance-producer-"; public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L; + public static final int DEFAULT_REPORTING_INTERVAL_MS = 5000; public static void main(String[] args) throws Exception { ProducerPerformance perf = new ProducerPerformance(); @@ -78,9 +79,9 @@ void start(String[] args) throws IOException { if (config.warmupRecords > 0) { // TODO: Keep this message? Maybe unnecessary System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary."); - this.warmupStats = new Stats(config.warmupRecords, 5000); + warmupStats = new Stats(config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS); } else { - stats = new Stats(config.numRecords, 5000); + stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS); } long startMs = System.currentTimeMillis(); @@ -103,15 +104,17 @@ record = new ProducerRecord<>(config.topicName, payload); if (warmupStats != null) { if (i < config.warmupRecords) { cb = new PerfCallback(sendStartMs, payload.length, warmupStats); - } else if (i == config.warmupRecords) { - stats = new Stats(config.numRecords - config.warmupRecords, 5000, config.warmupRecords); + } else { + if (i == config.warmupRecords) { + // Create the steady-state 'stats' object here so its start time is correct + stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords); + } cb = new PerfCallback(sendStartMs, payload.length, stats); } - producer.send(record, cb); } else { cb = new PerfCallback(sendStartMs, payload.length, stats); - producer.send(record, cb); } + producer.send(record, cb); currentTransactionSize++; if (config.transactionsEnabled && config.transactionDurationMs <= (sendStartMs - transactionStartTime)) { @@ -188,7 +191,7 @@ static byte[] generateRandomPayload(Integer recordSize, List payloadByte } return payload; } - + static Properties readProps(List producerProps, String producerConfig) throws IOException { Properties props = new Properties(); if (producerConfig != null) { @@ -484,10 +487,7 @@ public void printTotal() { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - String state = ""; - if (this.warmupRecords > 0) { - state = " steady state"; - } + String state = this.warmupRecords > 0 ? "" : " steady state"; System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, state, @@ -566,6 +566,9 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); + if (numRecords <= 0) { + throw new ArgumentParserException("The value for --num-records must be greater than zero.", parser); + } if (warmupRecords >= numRecords) { throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); } @@ -573,7 +576,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept throw new ArgumentParserException("Either --producer-props or --producer.config must be specified.", parser); } if (transactionDurationMsArg != null && transactionDurationMsArg <= 0) { - throw new ArgumentParserException("--transaction-duration-ms should > 0", parser); + throw new ArgumentParserException("--transaction-duration-ms should greater than zero", parser); } // since default value gets printed with the help text, we are escaping \n there and replacing it with correct value here. From baef2883aa4d4638278e39c1aa9ed62e337d036e Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 3 Oct 2024 16:51:10 -0700 Subject: [PATCH 06/16] Fix redundant numRecords declaration --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 9fd32a187d665..f9ac9ae364884 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -542,7 +542,6 @@ static final class ConfigPostProcessor { final String topicName; final Long numRecords; final Long warmupRecords; - final Long numRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; From 50a7565eeb62b2d5cdf7abb5c6dd4cb4bd71d12c Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 3 Oct 2024 16:53:08 -0700 Subject: [PATCH 07/16] Fix erroneous extra bracket --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index f9ac9ae364884..ecc20663350b2 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -566,7 +566,6 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); - } if (numRecords != null && numRecords <= 0) { throw new ArgumentParserException("--num-records should be greater than zero", parser); } From 4dd891fa1ac2244d6cabf53bbcaa0a64b8358627 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 3 Oct 2024 17:04:31 -0700 Subject: [PATCH 08/16] Fix ternary operator in printTotal --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index ecc20663350b2..9e22516e6bf0e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -487,10 +487,9 @@ public void printTotal() { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - String state = this.warmupRecords > 0 ? "" : " steady state"; System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, - state, + this.warmupRecords > 0 ? "" : " steady state", recsPerSec, mbPerSec, totalLatency / (double) count, From ce9ce6b101116e34b030b45d3771ac26f4f9da8e Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 3 Oct 2024 17:30:01 -0700 Subject: [PATCH 09/16] Fix selection in ternary operator for proper label --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 9e22516e6bf0e..884c1b6405ebf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -489,7 +489,7 @@ public void printTotal() { int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, - this.warmupRecords > 0 ? "" : " steady state", + this.warmupRecords > 0 ? " steady state" : "", recsPerSec, mbPerSec, totalLatency / (double) count, From 9be538517a27b1d3817c22845799d96f22aa869a Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 17 Oct 2024 15:38:35 -0700 Subject: [PATCH 10/16] Fix concatenating stats constructor - bytes are now properly accoutned for. - Refactor to use boolean isSteadyState over warmupRecords to track if a Stats object is of steady state --- .../kafka/tools/ProducerPerformance.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 884c1b6405ebf..4e94b27f1f5f8 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -78,7 +78,7 @@ void start(String[] args) throws IOException { ProducerRecord record; if (config.warmupRecords > 0) { // TODO: Keep this message? Maybe unnecessary - System.out.println("Warmup first " + config.warmupRecords + " records. Steady-state results will print after the complete-test summary."); + System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); warmupStats = new Stats(config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS); } else { stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS); @@ -107,7 +107,7 @@ record = new ProducerRecord<>(config.topicName, payload); } else { if (i == config.warmupRecords) { // Create the steady-state 'stats' object here so its start time is correct - stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords); + stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); } cb = new PerfCallback(sendStartMs, payload.length, stats); } @@ -384,13 +384,13 @@ static class Stats { private long windowTotalLatency; private long windowBytes; private long windowStart; - private long warmupRecords; + private final boolean isSteadyState; public Stats(long numRecords, int reportingInterval) { - this(numRecords, reportingInterval, 0); + this(numRecords, reportingInterval, false); } - public Stats(long numRecords, int reportingInterval, long warmupRecords) { + public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -404,7 +404,7 @@ public Stats(long numRecords, int reportingInterval, long warmupRecords) { this.windowBytes = 0; this.totalLatency = 0; this.reportingInterval = reportingInterval; - this.warmupRecords = warmupRecords; + this.isSteadyState = isSteadyState; } Stats(Stats first, Stats second) { @@ -412,14 +412,18 @@ public Stats(long numRecords, int reportingInterval, long warmupRecords) { this.start = Math.min(first.start, second.start); this.iteration = first.iteration + second.iteration; this.sampling = first.sampling; - this.latencies = Arrays.copyOf(first.latencies, first.index + second.index); + this.index = first.index() + second.index(); + this.latencies = Arrays.copyOf(first.latencies, this.index); System.arraycopy(second.latencies, 0, this.latencies, first.index(), second.index()); this.maxLatency = Math.max(first.maxLatency, second.maxLatency); this.windowCount = first.windowCount + second.windowCount; + this.windowMaxLatency = 0; + this.windowTotalLatency = 0; this.totalLatency = first.totalLatency + second.totalLatency; this.reportingInterval = first.reportingInterval; - this.warmupRecords = 0; + this.isSteadyState = false; // false except in the steady-state case this.count = first.count + second.count; + this.bytes = first.bytes + second.bytes; } public void record(int latency, int bytes, long time) { @@ -437,6 +441,9 @@ public void record(int latency, int bytes, long time) { } /* maybe report the recent perf */ if (time - windowStart >= reportingInterval) { + if (this.isSteadyState && count == windowCount ){ + System.out.println("Beginning steady state. + } printWindow(); newWindow(); } @@ -489,7 +496,7 @@ public void printTotal() { int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, - this.warmupRecords > 0 ? " steady state" : "", + this.isSteadyState ? " steady state" : "", recsPerSec, mbPerSec, totalLatency / (double) count, From fb8a4ca78fafa9e2216f93e2cc8829334b064412 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Mon, 21 Oct 2024 09:45:14 -0700 Subject: [PATCH 11/16] Fix checkstyle error and print --- .../main/java/org/apache/kafka/tools/ProducerPerformance.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 4e94b27f1f5f8..f4664e18804b5 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -441,8 +441,8 @@ public void record(int latency, int bytes, long time) { } /* maybe report the recent perf */ if (time - windowStart >= reportingInterval) { - if (this.isSteadyState && count == windowCount ){ - System.out.println("Beginning steady state. + if (this.isSteadyState && count == windowCount) { + System.out.println("Beginning steady state."); } printWindow(); newWindow(); From a57554b3710662deb5f595e7b261edd768c4ed86 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Wed, 11 Dec 2024 15:36:27 -0800 Subject: [PATCH 12/16] Refactor for redundant stats objects Stats objects now represent the whole test with 'stats' and the steady state portion of the test with 'steadyStateStats'. Remove merging of Stats latency arrays at end of test. --- .../kafka/tools/ProducerPerformance.java | 45 +++++++++++-------- .../kafka/tools/ProducerPerformanceTest.java | 4 +- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index f4664e18804b5..c7e81dd5bb0d2 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -77,12 +77,9 @@ void start(String[] args) throws IOException { SplittableRandom random = new SplittableRandom(0); ProducerRecord record; if (config.warmupRecords > 0) { - // TODO: Keep this message? Maybe unnecessary System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); - warmupStats = new Stats(config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS); - } else { - stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS); } + stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -101,15 +98,14 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - if (warmupStats != null) { + if (config.warmupRecords > 0) { if (i < config.warmupRecords) { - cb = new PerfCallback(sendStartMs, payload.length, warmupStats); + cb = new PerfCallback(sendStartMs, payload.length, stats); } else { if (i == config.warmupRecords) { - // Create the steady-state 'stats' object here so its start time is correct - stats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); } - cb = new PerfCallback(sendStartMs, payload.length, stats); + cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); } } else { cb = new PerfCallback(sendStartMs, payload.length, stats); @@ -133,24 +129,24 @@ record = new ProducerRecord<>(config.topicName, payload); if (!config.shouldPrintMetrics) { producer.close(); - /* print warmup stats if relevant */ - if (warmupStats != null) { - new Stats(warmupStats, stats).printTotal(); - } /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } } else { // Make sure all messages are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py // expects this class to work with older versions of the client jar that don't support flush(). producer.flush(); - /* print warmup stats if relevant */ - if (warmupStats != null) { - new Stats(warmupStats, stats).printTotal(); - } /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } /* print out metrics */ ToolsUtils.printMetrics(producer.metrics()); @@ -175,7 +171,7 @@ KafkaProducer createKafkaProducer(Properties props) { Callback cb; Stats stats; - Stats warmupStats; + Stats steadyStateStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { @@ -523,10 +519,19 @@ static final class PerfCallback implements Callback { private final long start; private final int bytes; private final Stats stats; + private final Stats steadyStateStats; public PerfCallback(long start, int bytes, Stats stats) { this.start = start; this.stats = stats; + this.steadyStateStats = null; + this.bytes = bytes; + } + + public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { + this.start = start; + this.stats = stats; + this.steadyStateStats = steadyStateStats; this.bytes = bytes; } @@ -538,6 +543,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { this.stats.record(latency, bytes, now); this.stats.iteration++; + if (steadyStateStats != null) { + this.steadyStateStats.record(latency, bytes, now); + this.steadyStateStats.iteration++; + } } if (exception != null) exception.printStackTrace(); diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 74134c50c3c6c..35209efdffca9 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -537,8 +537,8 @@ public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { producerPerformanceSpy.start(args); verify(producerMock, times(10)).send(any(), any()); - assertEquals(2, producerPerformanceSpy.warmupStats.totalCount()); - assertEquals(8, producerPerformanceSpy.stats.totalCount()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount()); verify(producerMock, times(1)).close(); } From b0a6c20ecb2afc0a93f797c3210042053552a375 Mon Sep 17 00:00:00 2001 From: sdp Date: Thu, 16 Jan 2025 14:28:40 -0800 Subject: [PATCH 13/16] Fix double-printing of windows during steady state - Add boolean to stats objects to control if they should print or not - Add DEBUG print to show value of config.warmupRecords --- .../org/apache/kafka/tools/ProducerPerformance.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index c7e81dd5bb0d2..ae40593003186 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -76,6 +76,9 @@ void start(String[] args) throws IOException { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; + + System.out.println("DEBUG: config.warmupRecords=" + config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords > 0)); + if (config.warmupRecords > 0) { System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); } @@ -104,6 +107,7 @@ record = new ProducerRecord<>(config.topicName, payload); } else { if (i == config.warmupRecords) { steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); + stats.steadyStateActive = true; } cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); } @@ -381,6 +385,7 @@ static class Stats { private long windowBytes; private long windowStart; private final boolean isSteadyState; + private boolean steadyStateActive; public Stats(long numRecords, int reportingInterval) { this(numRecords, reportingInterval, false); @@ -401,6 +406,7 @@ public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { this.totalLatency = 0; this.reportingInterval = reportingInterval; this.isSteadyState = isSteadyState; + this.steadyStateActive = isSteadyState; } Stats(Stats first, Stats second) { @@ -420,6 +426,7 @@ public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { this.isSteadyState = false; // false except in the steady-state case this.count = first.count + second.count; this.bytes = first.bytes + second.bytes; + this.steadyStateActive = false; } public void record(int latency, int bytes, long time) { @@ -440,7 +447,9 @@ public void record(int latency, int bytes, long time) { if (this.isSteadyState && count == windowCount) { System.out.println("Beginning steady state."); } - printWindow(); + if (this.isSteadyState || !this.steadyStateActive) { + printWindow(); + } newWindow(); } } From b12708760f99a47fabaf993e112339b3da19feed Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Thu, 16 Jan 2025 14:45:19 -0800 Subject: [PATCH 14/16] Remove unused constructor, fix types - Use "long" for numRecords and warmupRecords instead of "Long" - Remove "* != null" for above - Remove unused "combiner" constructor --- .../kafka/tools/ProducerPerformance.java | 28 +++---------------- 1 file changed, 4 insertions(+), 24 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index ae40593003186..fb203ef86cc6c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -409,26 +409,6 @@ public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { this.steadyStateActive = isSteadyState; } - Stats(Stats first, Stats second) { - // create a Stats object that's the combination of two disjoint Stats objects - this.start = Math.min(first.start, second.start); - this.iteration = first.iteration + second.iteration; - this.sampling = first.sampling; - this.index = first.index() + second.index(); - this.latencies = Arrays.copyOf(first.latencies, this.index); - System.arraycopy(second.latencies, 0, this.latencies, first.index(), second.index()); - this.maxLatency = Math.max(first.maxLatency, second.maxLatency); - this.windowCount = first.windowCount + second.windowCount; - this.windowMaxLatency = 0; - this.windowTotalLatency = 0; - this.totalLatency = first.totalLatency + second.totalLatency; - this.reportingInterval = first.reportingInterval; - this.isSteadyState = false; // false except in the steady-state case - this.count = first.count + second.count; - this.bytes = first.bytes + second.bytes; - this.steadyStateActive = false; - } - public void record(int latency, int bytes, long time) { this.count++; this.bytes += bytes; @@ -564,8 +544,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { static final class ConfigPostProcessor { final String topicName; - final Long numRecords; - final Long warmupRecords; + final long numRecords; + final long warmupRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; @@ -590,10 +570,10 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); - if (numRecords != null && numRecords <= 0) { + if (numRecords <= 0) { throw new ArgumentParserException("--num-records should be greater than zero", parser); } - if (warmupRecords != null && warmupRecords >= numRecords) { + if (warmupRecords >= numRecords) { throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); } if (recordSize != null && recordSize <= 0) { From 4ff4daafde67e28a26c227aab7514eb026304770 Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Mon, 24 Mar 2025 14:56:06 -0700 Subject: [PATCH 15/16] Refactor inner callback conditional --- .../kafka/tools/ProducerPerformance.java | 24 ++++--------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index ad6fa65bc8b42..e671369dba81a 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -101,19 +101,11 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - if (config.warmupRecords > 0) { - if (i < config.warmupRecords) { - cb = new PerfCallback(sendStartMs, payload.length, stats); - } else { - if (i == config.warmupRecords) { - steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); - stats.steadyStateActive = true; - } - cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); - } - } else { - cb = new PerfCallback(sendStartMs, payload.length, stats); + if ( config.warmupRecords > 0 && i == config.warmupRecords ) { + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); + stats.steadyStateActive = true; } + cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); producer.send(record, cb); currentTransactionSize++; @@ -173,7 +165,6 @@ KafkaProducer createKafkaProducer(Properties props) { } Callback cb; - Stats stats; Stats steadyStateStats; @@ -510,13 +501,6 @@ static final class PerfCallback implements Callback { private final Stats stats; private final Stats steadyStateStats; - public PerfCallback(long start, int bytes, Stats stats) { - this.start = start; - this.stats = stats; - this.steadyStateStats = null; - this.bytes = bytes; - } - public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { this.start = start; this.stats = stats; From 7def5f44a4b25bf29a093698b0dd651fa919e64f Mon Sep 17 00:00:00 2001 From: Matt Welch Date: Fri, 18 Apr 2025 14:15:43 -0700 Subject: [PATCH 16/16] Move DEFAULT_REPORTING_INTERVAL_MS into Stats - Remove DEFAULT_REPORTING_INTERVAL_MS as a constructor parameter and move into Stats class; refactor Stats constructors to not use reportingInterval as a parameter - Remove DEBUG print statement --- .../apache/kafka/tools/ProducerPerformance.java | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index e671369dba81a..ec24193d30d50 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -51,7 +51,6 @@ public class ProducerPerformance { public static final String DEFAULT_TRANSACTION_ID_PREFIX = "performance-producer-"; public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L; - public static final int DEFAULT_REPORTING_INTERVAL_MS = 5000; public static void main(String[] args) throws Exception { ProducerPerformance perf = new ProducerPerformance(); @@ -77,12 +76,10 @@ void start(String[] args) throws IOException { SplittableRandom random = new SplittableRandom(0); ProducerRecord record; - System.out.println("DEBUG: config.warmupRecords=" + config.warmupRecords + ", (config.warmupRecords > 0)=" + (config.warmupRecords > 0)); - if (config.warmupRecords > 0) { System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); } - stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS); + stats = new Stats(config.numRecords); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -102,7 +99,7 @@ record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); if ( config.warmupRecords > 0 && i == config.warmupRecords ) { - steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0); + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.warmupRecords > 0); stats.steadyStateActive = true; } cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); @@ -378,11 +375,11 @@ static class Stats { private final boolean isSteadyState; private boolean steadyStateActive; - public Stats(long numRecords, int reportingInterval) { - this(numRecords, reportingInterval, false); + public Stats(long numRecords) { + this(numRecords, false); } - public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { + public Stats(long numRecords, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -395,7 +392,7 @@ public Stats(long numRecords, int reportingInterval, boolean isSteadyState) { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = reportingInterval; + this.reportingInterval = 5000; this.isSteadyState = isSteadyState; this.steadyStateActive = isSteadyState; }