Skip to content

Commit 7e59e20

Browse files
committed
Add java-driver-3.x benchmark
1 parent 2f74a7f commit 7e59e20

File tree

6 files changed

+292
-0
lines changed

6 files changed

+292
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM ubuntu:18.04
2+
RUN apt update
3+
4+
# Install java 8
5+
RUN apt install -y openjdk-8-jdk
6+
RUN apt install -y maven
7+
#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java
8+
RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version
9+
10+
# Copy benchmark code into the container
11+
COPY source /source
12+
WORKDIR /source
13+
14+
# Compile the code
15+
RUN mvn clean package
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
docker build . -t rust-driver-benchmarks-basic-java-driver-3.x
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x \
3+
java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@"
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>source</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>8</maven.compiler.source>
13+
<maven.compiler.target>8</maven.compiler.target>
14+
</properties>
15+
<dependencies>
16+
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
17+
<dependency>
18+
<groupId>commons-cli</groupId>
19+
<artifactId>commons-cli</artifactId>
20+
<version>1.5.0</version>
21+
</dependency>
22+
23+
<dependency>
24+
<groupId>com.scylladb</groupId>
25+
<artifactId>scylla-driver-core</artifactId>
26+
<version>3.11.2.0</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>3.2.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
</execution>
43+
</executions>
44+
</plugin>
45+
</plugins>
46+
</build>
47+
48+
49+
50+
</project>
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import java.util.Arrays;
2+
3+
import org.apache.commons.cli.*;
4+
5+
import static java.lang.Math.max;
6+
7+
class Config{
8+
9+
enum Workload {
10+
Inserts,
11+
Selects,
12+
Mixed,
13+
};
14+
15+
String[] node_addresses;
16+
Workload workload;
17+
long tasks;
18+
long concurrency;
19+
long batch_size;
20+
boolean dont_prepare;
21+
22+
Config(String[] args) {
23+
24+
this.node_addresses = new String[]{"127.0.0.1"};
25+
this.workload = Workload.Inserts;
26+
this.tasks = 1000 * 1000;
27+
this.concurrency = 1024;
28+
this.dont_prepare = false;
29+
30+
Options options = new Options();
31+
32+
options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark");
33+
options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma");
34+
options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)");
35+
options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects");
36+
options.addOption("c", "concurrency", true, "Maximum number of requests performed at once");
37+
38+
try {
39+
CommandLineParser parser = new DefaultParser();
40+
CommandLine cmd = parser.parse(options, args);
41+
42+
if(cmd.hasOption("dont-prepare")){
43+
this.dont_prepare = true;
44+
}
45+
46+
if(cmd.hasOption("nodes")){
47+
String value = cmd.getOptionValue("nodes");
48+
node_addresses = value.split(",");
49+
}
50+
51+
if(cmd.hasOption("workload")){
52+
String workloadValue = cmd.getOptionValue("workload");
53+
this.workload = Workload.valueOf(workloadValue);
54+
}
55+
56+
if(cmd.hasOption("tasks")){
57+
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
58+
}
59+
60+
if(cmd.hasOption("concurrency")){
61+
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
62+
}
63+
} catch (ParseException e){
64+
HelpFormatter helpFormatter = new HelpFormatter();
65+
helpFormatter.printHelp("./run.sh [OPTION]...", options);
66+
System.out.println();
67+
System.out.println("Unexpected exception: " + e.getMessage());
68+
}
69+
70+
batch_size = 256;
71+
72+
if (this.tasks/this.batch_size < this.concurrency) {
73+
this.batch_size = max(1, this.tasks / this.concurrency);
74+
}
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return "Config{" +
80+
"node_addresses=" + Arrays.toString(node_addresses) +
81+
", workload=" + workload +
82+
", tasks=" + tasks +
83+
", concurrency=" + concurrency +
84+
", batch_size=" + batch_size +
85+
", dont_prepare=" + dont_prepare +
86+
'}';
87+
}
88+
}
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
import com.datastax.driver.core.*;
2+
3+
import java.util.ArrayList;
4+
import java.util.concurrent.*;
5+
import java.util.concurrent.atomic.AtomicLong;
6+
7+
public class MainClass {
8+
9+
private static Cluster cluster;
10+
private static Config config;
11+
12+
private static ExecutorService executor;
13+
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
14+
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";
15+
16+
private static long benchmarkStart;
17+
18+
private static long benchmarkEnd;
19+
20+
public static void main(String[] args) throws InterruptedException, ExecutionException {
21+
22+
config = new Config(args);
23+
System.out.println("Parsed config: ");
24+
System.out.println(config.toString());
25+
26+
cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build();
27+
cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency));
28+
Session session = cluster.connect();
29+
30+
prepareKeyspaceAndTable(session);
31+
32+
if (!config.dont_prepare) {
33+
prepareKeyspaceAndTable(session);
34+
35+
if (config.workload.equals(Config.Workload.Selects)) {
36+
prepareSelectsBenchmark(session);
37+
}
38+
}
39+
40+
AtomicLong nextBatchStart = new AtomicLong(0);
41+
42+
executor = Executors.newFixedThreadPool((int) config.concurrency);
43+
44+
System.out.println("Starting the benchmark");
45+
46+
benchmarkStart = System.nanoTime();
47+
ArrayList<Future<?>> arr = new ArrayList<>();
48+
for (int i = 0; i < config.concurrency; i++) {
49+
arr.add(
50+
executor.submit(() -> {
51+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
52+
PreparedStatement selectQ = session.prepare(SELECT_STRING);
53+
while (true) {
54+
55+
long curBatchStart = nextBatchStart.addAndGet(config.batch_size);
56+
if (curBatchStart >= config.tasks) {
57+
break;
58+
}
59+
60+
long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks);
61+
62+
for (long pk = curBatchStart; pk < curBatchEnd; pk++) {
63+
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
64+
session.execute(insertQ.bind(pk, 2L * pk, 3L * pk));
65+
}
66+
67+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
68+
ResultSet rs = session.execute(selectQ.bind(pk));
69+
Row r = rs.one();
70+
if ((r.getLong("v1") != 2 * pk) || (r.getLong("v2") != 3 * pk)) {
71+
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", pk, 2 * pk, 3 * pk, r.getInt("pk"), r.getInt("v1"), r.getInt("v2")));
72+
}
73+
}
74+
}
75+
}
76+
}));
77+
}
78+
79+
executor.shutdown();
80+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
81+
benchmarkEnd = System.nanoTime();
82+
for (Future<?> f : arr) {
83+
f.get(); // make sure nothing has thrown
84+
}
85+
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));
86+
87+
session.close();
88+
if (cluster != null) cluster.close();
89+
}
90+
91+
static void prepareKeyspaceAndTable(Session session) {
92+
session.execute("DROP KEYSPACE IF EXISTS benchks");
93+
session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}");
94+
session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)");
95+
if (!cluster.getMetadata().checkSchemaAgreement()) {
96+
throw new RuntimeException("Schema not in agreement after preparing keyspace and table.");
97+
}
98+
}
99+
100+
private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException {
101+
System.out.println("Preparing a selects benchmark (inserting values)...");
102+
103+
AtomicLong nextBatchStart = new AtomicLong(0);
104+
executor = Executors.newFixedThreadPool((int) config.concurrency);
105+
106+
ArrayList<Future<?>> arr = new ArrayList<>();
107+
try {
108+
for (int i = 0; i < config.concurrency; i++) {
109+
arr.add(executor.submit(() -> {
110+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
111+
while (true) {
112+
long curBatchStart = nextBatchStart.addAndGet(config.batch_size);
113+
if (curBatchStart >= config.tasks) {
114+
break;
115+
}
116+
long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks);
117+
for (long pk = curBatchStart; pk < curBatchEnd; pk++) {
118+
session.execute(insertQ.bind(pk, 2L * pk, 3L * pk));
119+
}
120+
}
121+
}));
122+
}
123+
} finally {
124+
executor.shutdown();
125+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
126+
for (Future<?> f : arr) {
127+
f.get(); // make sure nothing has thrown
128+
}
129+
}
130+
}
131+
132+
}
133+
134+

0 commit comments

Comments
 (0)