diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index ae6015ec18a5d..ec24193d30d50 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -75,7 +75,11 @@ void start(String[] args) throws IOException { // not thread-safe, do not share with other threads SplittableRandom random = new SplittableRandom(0); ProducerRecord record; - stats = new Stats(config.numRecords, 5000); + + if (config.warmupRecords > 0) { + System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary."); + } + stats = new Stats(config.numRecords); long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs); @@ -94,7 +98,11 @@ void start(String[] args) throws IOException { record = new ProducerRecord<>(config.topicName, payload); long sendStartMs = System.currentTimeMillis(); - cb = new PerfCallback(sendStartMs, payload.length, stats); + if ( config.warmupRecords > 0 && i == config.warmupRecords ) { + steadyStateStats = new Stats(config.numRecords - config.warmupRecords, config.warmupRecords > 0); + stats.steadyStateActive = true; + } + cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats); producer.send(record, cb); currentTransactionSize++; @@ -116,6 +124,10 @@ record = new ProducerRecord<>(config.topicName, payload); /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } } else { // Make sure all messages are sent before printing out the stats and the metrics // We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py @@ -124,6 +136,10 @@ record = new ProducerRecord<>(config.topicName, payload); /* print final results */ stats.printTotal(); + /* print steady-state stats if relevant */ + if (steadyStateStats != null) { + steadyStateStats.printTotal(); + } /* print out metrics */ ToolsUtils.printMetrics(producer.metrics()); @@ -146,8 +162,8 @@ KafkaProducer createKafkaProducer(Properties props) { } Callback cb; - Stats stats; + Stats steadyStateStats; static byte[] generateRandomPayload(Integer recordSize, List payloadByteList, byte[] payload, SplittableRandom random, boolean payloadMonotonic, long recordValue) { @@ -163,7 +179,7 @@ static byte[] generateRandomPayload(Integer recordSize, List payloadByte } return payload; } - + static Properties readProps(List producerProps, String producerConfig) throws IOException { Properties props = new Properties(); if (producerConfig != null) { @@ -326,6 +342,16 @@ static ArgumentParser argParser() { "--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " + "the default value will be 3000."); + parser.addArgument("--warmup-records") + .action(store()) + .required(false) + .type(Long.class) + .metavar("WARMUP-RECORDS") + .dest("warmupRecords") + .setDefault(0L) + .help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " + + "An additional summary line will be printed describing the steady-state statistics. (default: 0)."); + return parser; } @@ -346,8 +372,14 @@ static class Stats { private long windowTotalLatency; private long windowBytes; private long windowStart; + private final boolean isSteadyState; + private boolean steadyStateActive; + + public Stats(long numRecords) { + this(numRecords, false); + } - public Stats(long numRecords, int reportingInterval) { + public Stats(long numRecords, boolean isSteadyState) { this.start = System.currentTimeMillis(); this.windowStart = System.currentTimeMillis(); this.iteration = 0; @@ -360,7 +392,9 @@ public Stats(long numRecords, int reportingInterval) { this.windowTotalLatency = 0; this.windowBytes = 0; this.totalLatency = 0; - this.reportingInterval = reportingInterval; + this.reportingInterval = 5000; + this.isSteadyState = isSteadyState; + this.steadyStateActive = isSteadyState; } public void record(int latency, int bytes, long time) { @@ -378,7 +412,12 @@ public void record(int latency, int bytes, long time) { } /* maybe report the recent perf */ if (time - windowStart >= reportingInterval) { - printWindow(); + if (this.isSteadyState && count == windowCount) { + System.out.println("Beginning steady state."); + } + if (this.isSteadyState || !this.steadyStateActive) { + printWindow(); + } newWindow(); } } @@ -428,8 +467,9 @@ public void printTotal() { double recsPerSec = 1000.0 * count / (double) elapsed; double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0); int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999); - System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", + System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n", count, + this.isSteadyState ? " steady state" : "", recsPerSec, mbPerSec, totalLatency / (double) count, @@ -456,10 +496,12 @@ static final class PerfCallback implements Callback { private final long start; private final int bytes; private final Stats stats; + private final Stats steadyStateStats; - public PerfCallback(long start, int bytes, Stats stats) { + public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) { this.start = start; this.stats = stats; + this.steadyStateStats = steadyStateStats; this.bytes = bytes; } @@ -471,6 +513,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { this.stats.record(latency, bytes, now); this.stats.iteration++; + if (steadyStateStats != null) { + this.steadyStateStats.record(latency, bytes, now); + this.steadyStateStats.iteration++; + } } if (exception != null) exception.printStackTrace(); @@ -479,7 +525,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) { static final class ConfigPostProcessor { final String topicName; - final Long numRecords; + final long numRecords; + final long warmupRecords; final Integer recordSize; final double throughput; final boolean payloadMonotonic; @@ -493,6 +540,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept Namespace namespace = parser.parseArgs(args); this.topicName = namespace.getString("topic"); this.numRecords = namespace.getLong("numRecords"); + this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0); this.recordSize = namespace.getInt("recordSize"); this.throughput = namespace.getDouble("throughput"); this.payloadMonotonic = namespace.getBoolean("payloadMonotonic"); @@ -503,9 +551,12 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept String payloadFilePath = namespace.getString("payloadFile"); Long transactionDurationMsArg = namespace.getLong("transactionDurationMs"); String transactionIdArg = namespace.getString("transactionalId"); - if (numRecords != null && numRecords <= 0) { + if (numRecords <= 0) { throw new ArgumentParserException("--num-records should be greater than zero", parser); } + if (warmupRecords >= numRecords) { + throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser); + } if (recordSize != null && recordSize <= 0) { throw new ArgumentParserException("--record-size should be greater than zero", parser); } diff --git a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java index 10b790bf91422..35209efdffca9 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/ProducerPerformanceTest.java @@ -492,4 +492,75 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString() .startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX)); } + + @Test + public void testWarmupRecordsFractionalValue() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "1.5", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + } + + @Test + public void testWarmupRecordsString() throws Exception { + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "foo", + "--throughput", "100", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + ArgumentParser parser = ProducerPerformance.argParser(); + ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args)); + } + + @Test + public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "2", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount()); + verify(producerMock, times(1)).close(); + } + + @Test + public void testWarmupNegativeRecordsNormalTest() throws IOException { + doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class)); + doAnswer(invocation -> { + producerPerformanceSpy.cb.onCompletion(null, null); + return null; + }).when(producerMock).send(any(), any()); + + String[] args = new String[] { + "--topic", "Hello-Kafka", + "--num-records", "10", + "--warmup-records", "-1", + "--throughput", "1", + "--record-size", "100", + "--producer-props", "bootstrap.servers=localhost:9000"}; + producerPerformanceSpy.start(args); + + verify(producerMock, times(10)).send(any(), any()); + assertEquals(10, producerPerformanceSpy.stats.totalCount()); + verify(producerMock, times(1)).close(); + } }