Skip to content

KAFKA-17645: KIP-1052: Enable warmup in producer performance test #17340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 23 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
4cfb897
Refactored for trunk
matt-welch Sep 16, 2024
125a246
Refactor for readability
matt-welch Sep 18, 2024
6dcaed8
:bugfix: Remove redundant declaration of overallStats
matt-welch Sep 20, 2024
46fcad9
Remove overallStats
matt-welch Sep 20, 2024
df6a119
Merge branch 'trunk' into KIP-1052-test-PR
matt-welch Sep 30, 2024
6dfbaa7
:bugfix: Properly create callback during switch
matt-welch Oct 3, 2024
74f2f1c
Merge branch 'trunk' into KIP-1052-PR
matt-welch Oct 3, 2024
baef288
Fix redundant numRecords declaration
matt-welch Oct 3, 2024
50a7565
Fix erroneous extra bracket
matt-welch Oct 3, 2024
4dd891f
Fix ternary operator in printTotal
matt-welch Oct 4, 2024
ce9ce6b
Fix selection in ternary operator for proper label
matt-welch Oct 4, 2024
9be5385
Fix concatenating stats constructor
matt-welch Oct 17, 2024
fb8a4ca
Fix checkstyle error and print
matt-welch Oct 21, 2024
440329b
Merge branch 'trunk' into KIP-1052-PR
matt-welch Oct 21, 2024
a57554b
Refactor for redundant stats objects
matt-welch Dec 11, 2024
818d1c6
Merge branch 'trunk' of github.com:apache/kafka into trunk
Jan 9, 2025
b0a6c20
Fix double-printing of windows during steady state
Jan 16, 2025
b127087
Remove unused constructor, fix types
matt-welch Jan 16, 2025
a55a429
Merge branch 'trunk' into KIP-1052-PR
matt-welch Mar 3, 2025
4ff4daa
Refactor inner callback conditional
matt-welch Mar 24, 2025
7def5f4
Move DEFAULT_REPORTING_INTERVAL_MS into Stats
matt-welch Apr 18, 2025
dd4c5ed
Merge branch 'trunk' of github.com:apache/kafka into KIP-1052-PR
matt-welch May 29, 2025
0115368
Refactor printing and iteration vars
matt-welch May 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 63 additions & 12 deletions tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,12 @@ void start(String[] args) throws IOException {
// not thread-safe, do not share with other threads
SplittableRandom random = new SplittableRandom(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(config.numRecords, 5000);

if (config.warmupRecords > 0) {
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
}
boolean isSteadyState = false;
stats = new Stats(config.numRecords, isSteadyState);
long startMs = System.currentTimeMillis();

ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
Expand All @@ -95,7 +100,11 @@ void start(String[] args) throws IOException {
record = new ProducerRecord<>(config.topicName, payload);

long sendStartMs = System.currentTimeMillis();
cb = new PerfCallback(sendStartMs, payload.length, stats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to change this to:

if (config.warmupRecords > 0 && i == config.warmupRecords) {
    steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, true);
    stats.steadyStateActive = true;
}

cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);

True, steadyStateStats will be null up until the number of warmup records has elapsed. As I read in Stats, passing in a null Stats object doesn't hurt.

CMIIW.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I think the refactored conditional is much easier to understand. Hopefully nobody minds the passing of null steadyStateStats. I've refactored this in my latest commit.

if ((isSteadyState = config.warmupRecords > 0) && i == config.warmupRecords) {
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, isSteadyState);
stats.suppressPrinting();
}
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
producer.send(record, cb);

currentTransactionSize++;
Expand All @@ -117,6 +126,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}
} else {
// Make sure all messages are sent before printing out the stats and the metrics
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
Expand All @@ -125,6 +138,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}

/* print out metrics */
ToolsUtils.printMetrics(producer.metrics());
Expand All @@ -147,8 +164,8 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
}

Callback cb;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is unrelated to these changes, but why are cb and stats declared at the instance level instead of just created inside start()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding cb and the stats objects, their scopes don't extend beyond the start() method and the callback which gets them passed in. I tested moving them and found no errors. Do you think that I should move their declaration in this patch?


Stats stats;
Stats steadyStateStats;

static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
Expand All @@ -164,7 +181,7 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
}
return payload;
}

static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
Properties props = new Properties();
if (producerConfig != null) {
Expand Down Expand Up @@ -331,6 +348,16 @@ static ArgumentParser argParser() {
"--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " +
"the default value will be 3000.");

parser.addArgument("--warmup-records")
.action(store())
.required(false)
.type(Long.class)
.metavar("WARMUP-RECORDS")
.dest("warmupRecords")
.setDefault(0L)
.help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " +
"An additional summary line will be printed describing the steady-state statistics. (default: 0).");

return parser;
}

Expand All @@ -351,8 +378,10 @@ static class Stats {
private long windowTotalLatency;
private long windowBytes;
private long windowStart;
private final boolean isSteadyState;
private boolean supressPrint;

public Stats(long numRecords, int reportingInterval) {
public Stats(long numRecords, boolean isSteadyState) {
this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
this.iteration = 0;
Expand All @@ -365,7 +394,9 @@ public Stats(long numRecords, int reportingInterval) {
this.windowTotalLatency = 0;
this.windowBytes = 0;
this.totalLatency = 0;
this.reportingInterval = reportingInterval;
this.reportingInterval = 5000;
this.isSteadyState = isSteadyState;
this.supressPrint = false;
}

public void record(int latency, int bytes, long time) {
Expand All @@ -383,9 +414,15 @@ public void record(int latency, int bytes, long time) {
}
/* maybe report the recent perf */
if (time - windowStart >= reportingInterval) {
printWindow();
if (this.isSteadyState && count == windowCount) {
System.out.println("In steady state.");
}
if (!this.supressPrint) {
printWindow();
}
newWindow();
}
this.iteration++;
}

public long totalCount() {
Expand Down Expand Up @@ -433,8 +470,9 @@ public void printTotal() {
double recsPerSec = 1000.0 * count / (double) elapsed;
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
count,
this.isSteadyState ? " steady state" : "",
recsPerSec,
mbPerSec,
totalLatency / (double) count,
Expand All @@ -455,16 +493,22 @@ private static int[] percentiles(int[] latencies, int count, double... percentil
}
return values;
}

public void suppressPrinting() {
this.supressPrint = true;
}
}

static final class PerfCallback implements Callback {
private final long start;
private final int bytes;
private final Stats stats;
private final Stats steadyStateStats;

public PerfCallback(long start, int bytes, Stats stats) {
public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
this.start = start;
this.stats = stats;
this.steadyStateStats = steadyStateStats;
this.bytes = bytes;
}

Expand All @@ -475,7 +519,9 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
// magically printed when the sending fails.
if (exception == null) {
this.stats.record(latency, bytes, now);
this.stats.iteration++;
if (steadyStateStats != null) {
this.steadyStateStats.record(latency, bytes, now);
}
}
if (exception != null)
exception.printStackTrace();
Expand All @@ -484,7 +530,8 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {

static final class ConfigPostProcessor {
final String topicName;
final Long numRecords;
final long numRecords;
final long warmupRecords;
final Integer recordSize;
final double throughput;
final boolean payloadMonotonic;
Expand All @@ -498,6 +545,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
Namespace namespace = parser.parseArgs(args);
this.topicName = namespace.getString("topic");
this.numRecords = namespace.getLong("numRecords");
this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0);
this.recordSize = namespace.getInt("recordSize");
this.throughput = namespace.getDouble("throughput");
this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
Expand All @@ -508,9 +556,12 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
String payloadFilePath = namespace.getString("payloadFile");
Long transactionDurationMsArg = namespace.getLong("transactionDurationMs");
String transactionIdArg = namespace.getString("transactionalId");
if (numRecords != null && numRecords <= 0) {
if (numRecords <= 0) {
throw new ArgumentParserException("--num-records should be greater than zero", parser);
}
if (warmupRecords >= numRecords) {
throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
if (recordSize != null && recordSize <= 0) {
throw new ArgumentParserException("--record-size should be greater than zero", parser);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,16 +381,16 @@ public void testDefaultClientId() throws Exception {
@Test
public void testStatsInitializationWithLargeNumRecords() {
long numRecords = Long.MAX_VALUE;
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, 5000));
assertDoesNotThrow(() -> new ProducerPerformance.Stats(numRecords, false));
}

@Test
public void testStatsCorrectness() throws Exception {
ExecutorService singleThreaded = Executors.newSingleThreadExecutor();
final long numRecords = 1000000;
ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, 5000);
ProducerPerformance.Stats stats = new ProducerPerformance.Stats(numRecords, false);
for (long i = 0; i < numRecords; i++) {
final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats);
final Callback callback = new ProducerPerformance.PerfCallback(0, 100, stats, null);
CompletableFuture.runAsync(() -> {
callback.onCompletion(null, null);
}, singleThreaded);
Expand Down Expand Up @@ -567,4 +567,77 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A
assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString()
.startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX));
}

@Test
public void testWarmupRecordsFractionalValue() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "1.5",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
thrown.printStackTrace();
}

@Test
public void testWarmupRecordsString() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "foo",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
thrown.printStackTrace();
}

@Test
public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "2",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount());
verify(producerMock, times(1)).close();
}

@Test
public void testWarmupNegativeRecordsNormalTest() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "-1",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
}