Skip to content

Commit 4e3a3d3

Browse files
authored
KAFKA-17570 Rewrite StressTestLog by Java (#17249)
Reviewers: Chia-Ping Tsai <[email protected]>
1 parent 65ae070 commit 4e3a3d3

File tree

2 files changed

+219
-157
lines changed

2 files changed

+219
-157
lines changed

Diff for: core/src/test/scala/other/kafka/StressTestLog.scala

-157
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.jmh.log;
18+
19+
import kafka.log.UnifiedLog;
20+
import kafka.utils.TestUtils;
21+
22+
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
23+
import org.apache.kafka.common.compress.Compression;
24+
import org.apache.kafka.common.config.TopicConfig;
25+
import org.apache.kafka.common.record.FileLogInputStream;
26+
import org.apache.kafka.common.record.FileRecords;
27+
import org.apache.kafka.common.record.MemoryRecords;
28+
import org.apache.kafka.common.record.RecordBatch;
29+
import org.apache.kafka.common.record.Records;
30+
import org.apache.kafka.common.utils.Exit;
31+
import org.apache.kafka.common.utils.Utils;
32+
import org.apache.kafka.server.common.MetadataVersion;
33+
import org.apache.kafka.server.common.RequestLocal;
34+
import org.apache.kafka.server.storage.log.FetchIsolation;
35+
import org.apache.kafka.server.util.MockTime;
36+
import org.apache.kafka.storage.internals.log.AppendOrigin;
37+
import org.apache.kafka.storage.internals.log.FetchDataInfo;
38+
import org.apache.kafka.storage.internals.log.LogAppendInfo;
39+
import org.apache.kafka.storage.internals.log.LogConfig;
40+
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
41+
import org.apache.kafka.storage.internals.log.LogOffsetsListener;
42+
import org.apache.kafka.storage.internals.log.ProducerStateManagerConfig;
43+
import org.apache.kafka.storage.internals.log.VerificationGuard;
44+
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
45+
46+
import java.io.File;
47+
import java.nio.charset.StandardCharsets;
48+
import java.time.Duration;
49+
import java.util.Properties;
50+
import java.util.concurrent.ConcurrentHashMap;
51+
import java.util.concurrent.atomic.AtomicBoolean;
52+
53+
import scala.Option;
54+
55+
56+
public class StressTestLog {
57+
private static final AtomicBoolean RUNNING = new AtomicBoolean(true);
58+
59+
public static void main(String[] args) throws Exception {
60+
File dir = TestUtils.randomPartitionLogDir(TestUtils.tempDir());
61+
MockTime time = new MockTime();
62+
Properties logProperties = new Properties();
63+
logProperties.put(TopicConfig.SEGMENT_BYTES_CONFIG, 64 * 1024 * 1024);
64+
logProperties.put(TopicConfig.MAX_MESSAGE_BYTES_CONFIG, Integer.MAX_VALUE);
65+
logProperties.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, 1024 * 1024);
66+
67+
int fiveMinutesInMillis = (int) Duration.ofMinutes(5).toMillis();
68+
UnifiedLog log = UnifiedLog.apply(
69+
dir,
70+
new LogConfig(logProperties),
71+
0L,
72+
0L,
73+
time.scheduler,
74+
new BrokerTopicStats(),
75+
time,
76+
fiveMinutesInMillis,
77+
new ProducerStateManagerConfig(600000, false),
78+
fiveMinutesInMillis,
79+
new LogDirFailureChannel(10),
80+
true,
81+
Option.empty(),
82+
true,
83+
new ConcurrentHashMap<>(),
84+
false,
85+
LogOffsetsListener.NO_OP_OFFSETS_LISTENER
86+
);
87+
88+
WriterThread writer = new WriterThread(log);
89+
writer.start();
90+
ReaderThread reader = new ReaderThread(log);
91+
reader.start();
92+
93+
Exit.addShutdownHook("strees-test-shudtodwn-hook", () -> {
94+
try {
95+
RUNNING.set(false);
96+
writer.join();
97+
reader.join();
98+
Utils.delete(dir);
99+
} catch (Exception e) {
100+
e.printStackTrace();
101+
}
102+
});
103+
104+
while (RUNNING.get()) {
105+
Thread.sleep(1000);
106+
System.out.printf("Reader offset = %d, writer offset = %d%n", reader.currentOffset, writer.currentOffset);
107+
writer.checkProgress();
108+
reader.checkProgress();
109+
}
110+
}
111+
112+
abstract static class WorkerThread extends Thread {
113+
protected long currentOffset = 0;
114+
private long lastOffsetCheckpointed = currentOffset;
115+
private long lastProgressCheckTime = System.currentTimeMillis();
116+
117+
@Override
118+
public void run() {
119+
try {
120+
while (StressTestLog.RUNNING.get()) {
121+
work();
122+
}
123+
} catch (Exception e) {
124+
e.printStackTrace();
125+
} finally {
126+
StressTestLog.RUNNING.set(false);
127+
}
128+
}
129+
130+
protected abstract void work() throws Exception;
131+
132+
public boolean isMakingProgress() {
133+
if (currentOffset > lastOffsetCheckpointed) {
134+
lastOffsetCheckpointed = currentOffset;
135+
return true;
136+
}
137+
return false;
138+
}
139+
140+
public void checkProgress() {
141+
long curTime = System.currentTimeMillis();
142+
if ((curTime - lastProgressCheckTime) > 500) {
143+
if (!isMakingProgress()) {
144+
throw new RuntimeException("Thread not making progress");
145+
}
146+
lastProgressCheckTime = curTime;
147+
}
148+
}
149+
}
150+
151+
static class WriterThread extends WorkerThread {
152+
private final UnifiedLog log;
153+
154+
public WriterThread(UnifiedLog log) {
155+
this.log = log;
156+
}
157+
158+
@Override
159+
protected void work() throws Exception {
160+
byte[] value = Long.toString(currentOffset).getBytes(StandardCharsets.UTF_8);
161+
MemoryRecords records = TestUtils.singletonRecords(value,
162+
null,
163+
Compression.NONE,
164+
RecordBatch.NO_TIMESTAMP,
165+
RecordBatch.CURRENT_MAGIC_VALUE);
166+
LogAppendInfo logAppendInfo = log.appendAsLeader(records,
167+
0,
168+
AppendOrigin.CLIENT,
169+
MetadataVersion.LATEST_PRODUCTION,
170+
RequestLocal.noCaching(),
171+
VerificationGuard.SENTINEL);
172+
173+
if ((logAppendInfo.firstOffset() != -1 && logAppendInfo.firstOffset() != currentOffset)
174+
|| logAppendInfo.lastOffset() != currentOffset) {
175+
throw new RuntimeException("Offsets do not match");
176+
}
177+
178+
currentOffset++;
179+
if (currentOffset % 1000 == 0) {
180+
Thread.sleep(50);
181+
}
182+
}
183+
}
184+
185+
static class ReaderThread extends WorkerThread {
186+
private final UnifiedLog log;
187+
188+
public ReaderThread(UnifiedLog log) {
189+
this.log = log;
190+
}
191+
192+
@Override
193+
protected void work() throws Exception {
194+
try {
195+
FetchDataInfo fetchDataInfo = log.read(
196+
currentOffset,
197+
1,
198+
FetchIsolation.LOG_END,
199+
true
200+
);
201+
202+
Records records = fetchDataInfo.records;
203+
204+
if (records instanceof FileRecords && records.sizeInBytes() > 0) {
205+
FileLogInputStream.FileChannelRecordBatch first = ((FileRecords) records).batches().iterator().next();
206+
if (first.lastOffset() != currentOffset) {
207+
throw new RuntimeException("Expected to read the message at offset " + currentOffset);
208+
}
209+
if (first.sizeInBytes() != records.sizeInBytes()) {
210+
throw new RuntimeException(String.format("Expected %d but got %d.", first.sizeInBytes(), records.sizeInBytes()));
211+
}
212+
currentOffset++;
213+
}
214+
} catch (OffsetOutOfRangeException e) {
215+
// this is ok
216+
}
217+
}
218+
}
219+
}

0 commit comments

Comments
 (0)