Skip to content

KAFKA-17645: KIP-1052: Enable warmup in producer performance test #17340

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ProducerPerformance {

public static final String DEFAULT_TRANSACTION_ID_PREFIX = "performance-producer-";
public static final long DEFAULT_TRANSACTION_DURATION_MS = 3000L;
public static final int DEFAULT_REPORTING_INTERVAL_MS = 5000;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like this constant could be folded into the Stats class and removed as a constructor parameter since it's never changed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier in the PR, Chia-Ping asked to declare this variable here, probably because it seems analogous to the DEFAULT_TRANSACTION_DURATION_MS. I think the intent here is to have this as a parameter than can be modified for more frequent reporting, but making it a final member of the Stats class seems roughly equivalent to me. @kirktrue, @chia7712 where do you think these constants should be declared?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't plan to modify the reportingInterval for now, it is fine to remove the reportingInterval from Stats

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My latest commit moves DEFAULT_REPORTING_INTERVAL_MS into Stats.


public static void main(String[] args) throws Exception {
ProducerPerformance perf = new ProducerPerformance();
Expand All @@ -75,7 +76,10 @@ void start(String[] args) throws IOException {
// not thread-safe, do not share with other threads
SplittableRandom random = new SplittableRandom(0);
ProducerRecord<byte[], byte[]> record;
stats = new Stats(config.numRecords, 5000);
if (config.warmupRecords > 0) {
System.out.println("Warmup first " + config.warmupRecords + " records. Steady state results will print after the complete test summary.");
}
stats = new Stats(config.numRecords, DEFAULT_REPORTING_INTERVAL_MS);
long startMs = System.currentTimeMillis();

ThroughputThrottler throttler = new ThroughputThrottler(config.throughput, startMs);
Expand All @@ -94,7 +98,18 @@ void start(String[] args) throws IOException {
record = new ProducerRecord<>(config.topicName, payload);

long sendStartMs = System.currentTimeMillis();
cb = new PerfCallback(sendStartMs, payload.length, stats);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to change this to:

if (config.warmupRecords > 0 && i == config.warmupRecords) {
    steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, true);
    stats.steadyStateActive = true;
}

cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);

True, steadyStateStats will be null up until the number of warmup records has elapsed. As I read in Stats, passing in a null Stats object doesn't hurt.

CMIIW.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion! I think the refactored conditional is much easier to understand. Hopefully nobody minds the passing of null steadyStateStats. I've refactored this in my latest commit.

if (config.warmupRecords > 0) {
if (i < config.warmupRecords) {
cb = new PerfCallback(sendStartMs, payload.length, stats);
} else {
if (i == config.warmupRecords) {
steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config.warmupRecords > 0 can be replaced by true

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please clarify what you mean? That conditional is required when warmup is enabled.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the unclear comment. Please see the following code.

if (config.warmupRecords > 0) {
                    if (i < config.warmupRecords) {
                        cb = new PerfCallback(sendStartMs, payload.length, stats);
                    } else {
                        if (i == config.warmupRecords) {
                            steadyStateStats = new Stats(config.numRecords - config.warmupRecords, DEFAULT_REPORTING_INTERVAL_MS, config.warmupRecords > 0);
                        }
                        cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
                    }
                }

In this case, config.warmupRecords > 0 is always true.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies if I'm still misunderstanding your query, but the conditional "config.warmupRecords > 0" will only evaluate to true when the argument '--warmup-records N' is invoked on the command line and N is greater than zero. The value for config.warmupRecords defaults to '0' otherwise. The if-clause on that line is the path used when a warmup is invoked. The else-clause will be used when no warmup has been requested. This else clause represents the same code path used in previous versions of the test with no warmup.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies for the unclear description. I have attached a screenshot to illustrate the warning from the IDE.

Screenshot From 2024-12-18 19-08-16

we can make IDE happy by a little change :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @chia7712, to address your concern, I have temporarily added a DEBUG message that prints the value of config.warmupRecords and the value of the conditional (config.warmupRecords > 0) before the test. Some sample output of running the test with various values of warmupRecords follows. Note that, when negative values of warmupRecords are supplied on the CLI, we set config.warmupRecords to 0.

# warmupRecords = -1
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup--1-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records -1
DEBUG: config.warmupRecords=0, (config.warmupRecords > 0)=false
49987 records sent, 9997.4 records/sec (9.76 MB/sec), 18.8 ms avg latency, 290.0 ms max latency.
50005 records sent, 10001.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 9.0 ms max latency.
50019 records sent, 10001.8 records/sec (9.77 MB/sec), 0.7 ms avg latency, 9.0 ms max latency.
50005 records sent, 10001.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50006 records sent, 9999.2 records/sec (9.76 MB/sec), 0.7 ms avg latency, 3.0 ms max latency.
49930 records sent, 9960.1 records/sec (9.73 MB/sec), 0.7 ms avg latency, 22.0 ms max latency.
50229 records sent, 10045.8 records/sec (9.81 MB/sec), 0.7 ms avg latency, 23.0 ms max latency.
50015 records sent, 10003.0 records/sec (9.77 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
49995 records sent, 9997.0 records/sec (9.76 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
500000 records sent, 9996.801024 records/sec (9.76 MB/sec), 2.49 ms avg latency, 290.00 ms max latency, 1 ms 50th, 1 ms 95th, 82 ms 99th, 219 ms 99.9th.

# warmupRecords = 0
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup-0-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records 0
DEBUG: config.warmupRecords=0, (config.warmupRecords > 0)=false
49982 records sent, 9996.4 records/sec (9.76 MB/sec), 2.3 ms avg latency, 177.0 ms max latency.
50010 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 4.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 12.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 3.0 ms max latency.
50020 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50008 records sent, 10001.6 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
500000 records sent, 9997.200784 records/sec (9.76 MB/sec), 0.83 ms avg latency, 177.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 44 ms 99.9th.

# warmupRecords = 1
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup-1-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records 1
DEBUG: config.warmupRecords=1, (config.warmupRecords > 0)=true
Warmup first 1 records. Steady state results will print after the complete test summary.
Beginning steady state.
51651 records sent, 10328.1 records/sec (10.09 MB/sec), 2.7 ms avg latency, 63.0 ms max latency.
50015 records sent, 10003.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 3.0 ms max latency.
49995 records sent, 9999.0 records/sec (9.76 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50013 records sent, 10002.6 records/sec (9.77 MB/sec), 0.7 ms avg latency, 3.0 ms max latency.
50016 records sent, 10001.2 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50011 records sent, 10000.2 records/sec (9.77 MB/sec), 0.7 ms avg latency, 19.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
500000 records sent, 9997.800484 records/sec (9.76 MB/sec), 0.87 ms avg latency, 177.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 54 ms 99.9th.
499999 steady state records sent, 10030.070211 records/sec (9.79 MB/sec), 0.87 ms avg latency, 63.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 54 ms 99.9th.

# warmupRecords = 10000
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup-10000-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records 10000
DEBUG: config.warmupRecords=10000, (config.warmupRecords > 0)=true
Warmup first 10000 records. Steady state results will print after the complete test summary.
Beginning steady state.
49982 records sent, 9996.4 records/sec (9.76 MB/sec), 0.7 ms avg latency, 4.0 ms max latency.
50010 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 3.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50020 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 7.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 4.0 ms max latency.
50009 records sent, 10001.8 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50001 records sent, 9998.2 records/sec (9.76 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
500000 records sent, 9996.201443 records/sec (9.76 MB/sec), 0.83 ms avg latency, 178.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 46 ms 99.9th.
490000 steady state records sent, 9995.308325 records/sec (9.76 MB/sec), 0.66 ms avg latency, 7.00 ms max latency, 1 ms 50th, 1 ms 95th, 1 ms 99th, 2 ms 99.9th.

# warmupRecords = 50000
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup-50000-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records 50000
DEBUG: config.warmupRecords=50000, (config.warmupRecords > 0)=true
Warmup first 50000 records. Steady state results will print after the complete test summary.
Beginning steady state.
49991 records sent, 9998.2 records/sec (9.76 MB/sec), 0.6 ms avg latency, 3.0 ms max latency.
50011 records sent, 10002.2 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 25.0 ms max latency.
50010 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 12.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 4.0 ms max latency.
50020 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
50020 records sent, 10002.0 records/sec (9.77 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
500000 records sent, 9997.600576 records/sec (9.76 MB/sec), 0.81 ms avg latency, 172.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 40 ms 99.9th.
450000 steady state records sent, 9996.001599 records/sec (9.76 MB/sec), 0.66 ms avg latency, 25.00 ms max latency, 1 ms 50th, 1 ms 95th, 1 ms 99th, 4 ms 99.9th.

# warmupRecords = 100000
./kafka-producer-perf-test.sh --producer.config ../config/producer.properties --topic warmup-100000-records-test --num-records 500000 --record-size 1024 --producer-props bootstrap.servers=127.0.0.1:9092 acks=all compression.type=none batch.size=16384 linger.ms=0 --throughput 10000 --warmup-records 100000
DEBUG: config.warmupRecords=100000, (config.warmupRecords > 0)=true
Warmup first 100000 records. Steady state results will print after the complete test summary.
49994 records sent, 9996.8 records/sec (9.76 MB/sec), 2.2 ms avg latency, 184.0 ms max latency.
Beginning steady state.
49991 records sent, 9994.2 records/sec (9.76 MB/sec), 0.6 ms avg latency, 2.0 ms max latency.
50006 records sent, 10001.2 records/sec (9.77 MB/sec), 0.9 ms avg latency, 31.0 ms max latency.
49995 records sent, 9999.0 records/sec (9.76 MB/sec), 0.6 ms avg latency, 8.0 ms max latency.
50020 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 4.0 ms max latency.
50000 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10002.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 2.0 ms max latency.
50010 records sent, 10000.0 records/sec (9.77 MB/sec), 0.7 ms avg latency, 7.0 ms max latency.
500000 records sent, 9997.200784 records/sec (9.76 MB/sec), 0.84 ms avg latency, 184.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 42 ms 99.9th.
400000 steady state records sent, 9995.002499 records/sec (9.76 MB/sec), 0.69 ms avg latency, 31.00 ms max latency, 1 ms 50th, 1 ms 95th, 2 ms 99th, 17 ms 99.9th.

}
cb = new PerfCallback(sendStartMs, payload.length, stats, steadyStateStats);
}
} else {
cb = new PerfCallback(sendStartMs, payload.length, stats);
}
producer.send(record, cb);

currentTransactionSize++;
Expand All @@ -116,6 +131,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}
} else {
// Make sure all messages are sent before printing out the stats and the metrics
// We need to do this in a different branch for now since tests/kafkatest/sanity_checks/test_performance_services.py
Expand All @@ -124,6 +143,10 @@ record = new ProducerRecord<>(config.topicName, payload);

/* print final results */
stats.printTotal();
/* print steady-state stats if relevant */
if (steadyStateStats != null) {
steadyStateStats.printTotal();
}

/* print out metrics */
ToolsUtils.printMetrics(producer.metrics());
Expand All @@ -148,6 +171,7 @@ KafkaProducer<byte[], byte[]> createKafkaProducer(Properties props) {
Callback cb;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is unrelated to these changes, but why are cb and stats declared at the instance level instead of just created inside start()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding cb and the stats objects, their scopes don't extend beyond the start() method and the callback which gets them passed in. I tested moving them and found no errors. Do you think that I should move their declaration in this patch?


Stats stats;
Stats steadyStateStats;

static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByteList, byte[] payload,
SplittableRandom random, boolean payloadMonotonic, long recordValue) {
Expand All @@ -163,7 +187,7 @@ static byte[] generateRandomPayload(Integer recordSize, List<byte[]> payloadByte
}
return payload;
}

static Properties readProps(List<String> producerProps, String producerConfig) throws IOException {
Properties props = new Properties();
if (producerConfig != null) {
Expand Down Expand Up @@ -326,6 +350,16 @@ static ArgumentParser argParser() {
"--producer.config, or --transactional-id but --transaction-duration-ms is not specified, " +
"the default value will be 3000.");

parser.addArgument("--warmup-records")
.action(store())
.required(false)
.type(Long.class)
.metavar("WARMUP-RECORDS")
.dest("warmupRecords")
.setDefault(0L)
.help("The number of records to treat as warmup; these initial records will not be included in steady-state statistics. " +
"An additional summary line will be printed describing the steady-state statistics. (default: 0).");

return parser;
}

Expand All @@ -346,8 +380,13 @@ static class Stats {
private long windowTotalLatency;
private long windowBytes;
private long windowStart;
private final boolean isSteadyState;

public Stats(long numRecords, int reportingInterval) {
this(numRecords, reportingInterval, false);
}

public Stats(long numRecords, int reportingInterval, boolean isSteadyState) {
this.start = System.currentTimeMillis();
this.windowStart = System.currentTimeMillis();
this.iteration = 0;
Expand All @@ -361,6 +400,26 @@ public Stats(long numRecords, int reportingInterval) {
this.windowBytes = 0;
this.totalLatency = 0;
this.reportingInterval = reportingInterval;
this.isSteadyState = isSteadyState;
}

Stats(Stats first, Stats second) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please remove this unused constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// create a Stats object that's the combination of two disjoint Stats objects
this.start = Math.min(first.start, second.start);
this.iteration = first.iteration + second.iteration;
this.sampling = first.sampling;
this.index = first.index() + second.index();
this.latencies = Arrays.copyOf(first.latencies, this.index);
System.arraycopy(second.latencies, 0, this.latencies, first.index(), second.index());
this.maxLatency = Math.max(first.maxLatency, second.maxLatency);
this.windowCount = first.windowCount + second.windowCount;
this.windowMaxLatency = 0;
this.windowTotalLatency = 0;
this.totalLatency = first.totalLatency + second.totalLatency;
this.reportingInterval = first.reportingInterval;
this.isSteadyState = false; // false except in the steady-state case
this.count = first.count + second.count;
this.bytes = first.bytes + second.bytes;
}

public void record(int latency, int bytes, long time) {
Expand All @@ -378,6 +437,9 @@ public void record(int latency, int bytes, long time) {
}
/* maybe report the recent perf */
if (time - windowStart >= reportingInterval) {
if (this.isSteadyState && count == windowCount) {
System.out.println("Beginning steady state.");
}
printWindow();
newWindow();
}
Expand Down Expand Up @@ -428,8 +490,9 @@ public void printTotal() {
double recsPerSec = 1000.0 * count / (double) elapsed;
double mbPerSec = 1000.0 * this.bytes / (double) elapsed / (1024.0 * 1024.0);
int[] percs = percentiles(this.latencies, index, 0.5, 0.95, 0.99, 0.999);
System.out.printf("%d records sent, %.1f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
System.out.printf("%d%s records sent, %f records/sec (%.2f MB/sec), %.2f ms avg latency, %.2f ms max latency, %d ms 50th, %d ms 95th, %d ms 99th, %d ms 99.9th.%n",
count,
this.isSteadyState ? " steady state" : "",
recsPerSec,
mbPerSec,
totalLatency / (double) count,
Expand All @@ -456,10 +519,19 @@ static final class PerfCallback implements Callback {
private final long start;
private final int bytes;
private final Stats stats;
private final Stats steadyStateStats;

public PerfCallback(long start, int bytes, Stats stats) {
this.start = start;
this.stats = stats;
this.steadyStateStats = null;
this.bytes = bytes;
}

public PerfCallback(long start, int bytes, Stats stats, Stats steadyStateStats) {
this.start = start;
this.stats = stats;
this.steadyStateStats = steadyStateStats;
this.bytes = bytes;
}

Expand All @@ -471,6 +543,10 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
this.stats.record(latency, bytes, now);
this.stats.iteration++;
if (steadyStateStats != null) {
this.steadyStateStats.record(latency, bytes, now);
this.steadyStateStats.iteration++;
}
}
if (exception != null)
exception.printStackTrace();
Expand All @@ -480,6 +556,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
static final class ConfigPostProcessor {
final String topicName;
final Long numRecords;
final Long warmupRecords;
final Integer recordSize;
final double throughput;
final boolean payloadMonotonic;
Expand All @@ -493,6 +570,7 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
Namespace namespace = parser.parseArgs(args);
this.topicName = namespace.getString("topic");
this.numRecords = namespace.getLong("numRecords");
this.warmupRecords = Math.max(namespace.getLong("warmupRecords"), 0);
this.recordSize = namespace.getInt("recordSize");
this.throughput = namespace.getDouble("throughput");
this.payloadMonotonic = namespace.getBoolean("payloadMonotonic");
Expand All @@ -506,6 +584,9 @@ public ConfigPostProcessor(ArgumentParser parser, String[] args) throws IOExcept
if (numRecords != null && numRecords <= 0) {
throw new ArgumentParserException("--num-records should be greater than zero", parser);
}
if (warmupRecords != null && warmupRecords >= numRecords) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warmupRecords != null is unnecessary. Also, could you please change the type of numRecords and warmupRecords from Long to long?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

throw new ArgumentParserException("The value for --warmup-records must be strictly fewer than the number of records in the test, --num-records.", parser);
}
if (recordSize != null && recordSize <= 0) {
throw new ArgumentParserException("--record-size should be greater than zero", parser);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,75 @@ public void testEnableTransactionByTransactionDurationMs() throws IOException, A
assertTrue(configs.producerProps.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG).toString()
.startsWith(ProducerPerformance.DEFAULT_TRANSACTION_ID_PREFIX));
}

@Test
public void testWarmupRecordsFractionalValue() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "1.5",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
}

@Test
public void testWarmupRecordsString() throws Exception {
String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "foo",
"--throughput", "100",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
ArgumentParser parser = ProducerPerformance.argParser();
ArgumentParserException thrown = assertThrows(ArgumentParserException.class, () -> parser.parseArgs(args));
}

@Test
public void testWarmupNumberOfSuccessfulSendAndClose() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "2",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
assertEquals(10 - 2, producerPerformanceSpy.steadyStateStats.totalCount());
verify(producerMock, times(1)).close();
}

@Test
public void testWarmupNegativeRecordsNormalTest() throws IOException {
doReturn(producerMock).when(producerPerformanceSpy).createKafkaProducer(any(Properties.class));
doAnswer(invocation -> {
producerPerformanceSpy.cb.onCompletion(null, null);
return null;
}).when(producerMock).send(any(), any());

String[] args = new String[] {
"--topic", "Hello-Kafka",
"--num-records", "10",
"--warmup-records", "-1",
"--throughput", "1",
"--record-size", "100",
"--producer-props", "bootstrap.servers=localhost:9000"};
producerPerformanceSpy.start(args);

verify(producerMock, times(10)).send(any(), any());
assertEquals(10, producerPerformanceSpy.stats.totalCount());
verify(producerMock, times(1)).close();
}
}
Loading