Skip to content

Commit eee94a9

Browse files
committed
Add java-driver-4.x benchmark
1 parent 7e59e20 commit eee94a9

File tree

6 files changed

+335
-0
lines changed

6 files changed

+335
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM ubuntu:18.04
2+
RUN apt update
3+
4+
# Install java 8
5+
RUN apt install -y openjdk-8-jdk
6+
RUN apt install -y maven
7+
#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java
8+
RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version
9+
10+
# Copy benchmark code into the container
11+
COPY source /source
12+
WORKDIR /source
13+
14+
# Compile the code
15+
RUN mvn clean package
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
docker build . -t rust-driver-benchmarks-basic-java-driver-4.x
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-4.x \
3+
java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@"
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>source</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>8</maven.compiler.source>
13+
<maven.compiler.target>8</maven.compiler.target>
14+
<driver.version>4.14.1.0</driver.version>
15+
</properties>
16+
<dependencies>
17+
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
18+
<dependency>
19+
<groupId>commons-cli</groupId>
20+
<artifactId>commons-cli</artifactId>
21+
<version>1.5.0</version>
22+
</dependency>
23+
24+
<dependency>
25+
<groupId>com.scylladb</groupId>
26+
<artifactId>java-driver-core</artifactId>
27+
<version>${driver.version}</version>
28+
</dependency>
29+
30+
<dependency>
31+
<groupId>com.scylladb</groupId>
32+
<artifactId>java-driver-query-builder</artifactId>
33+
<version>${driver.version}</version>
34+
</dependency>
35+
36+
<dependency>
37+
<groupId>com.scylladb</groupId>
38+
<artifactId>java-driver-mapper-runtime</artifactId>
39+
<version>${driver.version}</version>
40+
</dependency>
41+
42+
</dependencies>
43+
44+
<build>
45+
<plugins>
46+
<plugin>
47+
<groupId>org.apache.maven.plugins</groupId>
48+
<artifactId>maven-shade-plugin</artifactId>
49+
<version>3.2.4</version>
50+
<executions>
51+
<execution>
52+
<phase>package</phase>
53+
<goals>
54+
<goal>shade</goal>
55+
</goals>
56+
</execution>
57+
</executions>
58+
</plugin>
59+
</plugins>
60+
</build>
61+
62+
63+
64+
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import java.util.Arrays;
2+
3+
import org.apache.commons.cli.*;
4+
5+
import static java.lang.Math.max;
6+
7+
class Config{
8+
9+
enum Workload {
10+
Inserts,
11+
Selects,
12+
Mixed,
13+
};
14+
15+
String[] node_addresses;
16+
Workload workload;
17+
long tasks;
18+
long concurrency;
19+
long batch_size;
20+
boolean dont_prepare;
21+
22+
Config(String[] args) {
23+
24+
this.node_addresses = new String[]{"127.0.0.1"};
25+
this.workload = Workload.Inserts;
26+
this.tasks = 1000 * 1000;
27+
this.concurrency = 1024;
28+
this.dont_prepare = false;
29+
30+
Options options = new Options();
31+
32+
options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark");
33+
options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma");
34+
options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)");
35+
options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects");
36+
options.addOption("c", "concurrency", true, "Maximum number of requests performed at once");
37+
38+
try {
39+
CommandLineParser parser = new DefaultParser();
40+
CommandLine cmd = parser.parse(options, args);
41+
42+
if(cmd.hasOption("dont-prepare")){
43+
this.dont_prepare = true;
44+
}
45+
46+
if(cmd.hasOption("nodes")){
47+
String value = cmd.getOptionValue("nodes");
48+
node_addresses = value.split(",");
49+
}
50+
51+
if(cmd.hasOption("workload")){
52+
String workloadValue = cmd.getOptionValue("workload");
53+
this.workload = Workload.valueOf(workloadValue);
54+
}
55+
56+
if(cmd.hasOption("tasks")){
57+
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
58+
}
59+
60+
if(cmd.hasOption("concurrency")){
61+
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
62+
}
63+
} catch (ParseException e){
64+
HelpFormatter helpFormatter = new HelpFormatter();
65+
helpFormatter.printHelp("./run.sh [OPTION]...", options);
66+
System.out.println();
67+
System.out.println("Unexpected exception: " + e.getMessage());
68+
}
69+
70+
batch_size = 256;
71+
72+
if (this.tasks/this.batch_size < this.concurrency) {
73+
this.batch_size = max(1, this.tasks / this.concurrency);
74+
}
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return "Config{" +
80+
"node_addresses=" + Arrays.toString(node_addresses) +
81+
", workload=" + workload +
82+
", tasks=" + tasks +
83+
", concurrency=" + concurrency +
84+
", batch_size=" + batch_size +
85+
", dont_prepare=" + dont_prepare +
86+
'}';
87+
}
88+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import com.datastax.oss.driver.api.core.CqlSession;
2+
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
3+
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
4+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
5+
import com.datastax.oss.driver.api.core.cql.ResultSet;
6+
import com.datastax.oss.driver.api.core.cql.Row;
7+
8+
import java.net.InetSocketAddress;
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
import java.util.concurrent.*;
12+
import java.util.concurrent.atomic.AtomicLong;
13+
14+
public class MainClass {
15+
private static Config config;
16+
17+
private static ExecutorService executor;
18+
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
19+
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";
20+
21+
private static long benchmarkStart;
22+
23+
private static long benchmarkEnd;
24+
25+
public static void main(String[] args) throws InterruptedException, ExecutionException {
26+
27+
config = new Config(args);
28+
System.out.println("Parsed config: ");
29+
System.out.println(config.toString());
30+
31+
DriverConfigLoader loader =
32+
DriverConfigLoader.programmaticBuilder()
33+
.withString(DefaultDriverOption.PROTOCOL_VERSION, "V4")
34+
.withLong(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, Math.max(2048, 2L * config.concurrency))
35+
.build();
36+
37+
CqlSession session = CqlSession
38+
.builder()
39+
.withConfigLoader(loader)
40+
.addContactPoints(parseAddresses(config.node_addresses))
41+
.withLocalDatacenter("datacenter1")
42+
.build();
43+
44+
prepareKeyspaceAndTable(session);
45+
46+
if (!config.dont_prepare) {
47+
prepareKeyspaceAndTable(session);
48+
49+
if (config.workload.equals(Config.Workload.Selects)) {
50+
prepareSelectsBenchmark(session);
51+
}
52+
}
53+
54+
AtomicLong nextBatchStart = new AtomicLong(0);
55+
56+
executor = Executors.newFixedThreadPool((int) config.concurrency);
57+
58+
System.out.println("Starting the benchmark");
59+
60+
benchmarkStart = System.nanoTime();
61+
ArrayList<Future<?>> arr = new ArrayList<>();
62+
for (int i = 0; i < config.concurrency; i++) {
63+
arr.add(
64+
executor.submit(() -> {
65+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
66+
PreparedStatement selectQ = session.prepare(SELECT_STRING);
67+
while (true) {
68+
69+
long curBatchStart = nextBatchStart.addAndGet(config.batch_size);
70+
if (curBatchStart >= config.tasks) {
71+
break;
72+
}
73+
74+
long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks);
75+
76+
for (long pk = curBatchStart; pk < curBatchEnd; pk++) {
77+
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
78+
session.execute(insertQ.bind(pk, 2L * pk, 3L * pk));
79+
}
80+
81+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
82+
ResultSet rs = session.execute(selectQ.bind(pk));
83+
Row r = rs.one();
84+
assert r != null;
85+
if ((r.getLong("v1") != 2 * pk) || (r.getLong("v2") != 3 * pk)) {
86+
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", pk, 2 * pk, 3 * pk, r.getInt("pk"), r.getInt("v1"), r.getInt("v2")));
87+
}
88+
}
89+
}
90+
}
91+
}));
92+
}
93+
94+
executor.shutdown();
95+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
96+
benchmarkEnd = System.nanoTime();
97+
for (Future<?> f : arr) {
98+
f.get(); // make sure nothing has thrown
99+
}
100+
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));
101+
102+
session.close();
103+
}
104+
105+
static void prepareKeyspaceAndTable(CqlSession session) {
106+
session.execute("DROP KEYSPACE IF EXISTS benchks");
107+
session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}");
108+
session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)");
109+
if (!session.checkSchemaAgreement()) {
110+
throw new RuntimeException("Schema not in agreement after preparing keyspace and table.");
111+
}
112+
}
113+
114+
private static void prepareSelectsBenchmark(CqlSession session) throws InterruptedException, ExecutionException {
115+
System.out.println("Preparing a selects benchmark (inserting values)...");
116+
117+
AtomicLong nextBatchStart = new AtomicLong(0);
118+
executor = Executors.newFixedThreadPool((int) config.concurrency);
119+
120+
ArrayList<Future<?>> arr = new ArrayList<>();
121+
try {
122+
for (int i = 0; i < config.concurrency; i++) {
123+
arr.add(executor.submit(() -> {
124+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
125+
while (true) {
126+
long curBatchStart = nextBatchStart.addAndGet(config.batch_size);
127+
if (curBatchStart >= config.tasks) {
128+
break;
129+
}
130+
long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks);
131+
for (long pk = curBatchStart; pk < curBatchEnd; pk++) {
132+
session.execute(insertQ.bind(pk, 2L * pk, 3L * pk));
133+
}
134+
}
135+
}));
136+
}
137+
} finally {
138+
executor.shutdown();
139+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
140+
for (Future<?> f : arr) {
141+
f.get(); // make sure nothing has thrown
142+
}
143+
}
144+
}
145+
146+
private static List<InetSocketAddress> parseAddresses(String[] arr){
147+
ArrayList<InetSocketAddress> result = new ArrayList<>();
148+
for(String s : arr){
149+
InetSocketAddress addr;
150+
if(s.contains(":")){
151+
String[] tmp = s.split(":");
152+
addr = new InetSocketAddress(tmp[0], Integer.parseInt(tmp[1]));
153+
}
154+
else {
155+
addr = new InetSocketAddress(s, 9042);
156+
}
157+
result.add(addr);
158+
}
159+
return result;
160+
}
161+
}
162+
163+

0 commit comments

Comments
 (0)