Skip to content

Commit fc1ed24

Browse files
committed
Add java-driver-3.x async benchmark
1 parent eee94a9 commit fc1ed24

File tree

6 files changed

+298
-0
lines changed

6 files changed

+298
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM ubuntu:18.04
2+
RUN apt update
3+
4+
# Install java 8
5+
RUN apt install -y openjdk-8-jdk
6+
RUN apt install -y maven
7+
#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java
8+
RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version
9+
10+
# Copy benchmark code into the container
11+
COPY source /source
12+
WORKDIR /source
13+
14+
# Compile the code
15+
RUN mvn clean package
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
docker build . -t rust-driver-benchmarks-basic-java-driver-3.x-async
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x-async \
3+
java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@"
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>source</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>8</maven.compiler.source>
13+
<maven.compiler.target>8</maven.compiler.target>
14+
</properties>
15+
<dependencies>
16+
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
17+
<dependency>
18+
<groupId>commons-cli</groupId>
19+
<artifactId>commons-cli</artifactId>
20+
<version>1.5.0</version>
21+
</dependency>
22+
23+
<dependency>
24+
<groupId>com.scylladb</groupId>
25+
<artifactId>scylla-driver-core</artifactId>
26+
<version>3.11.2.0</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>3.2.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
</execution>
43+
</executions>
44+
</plugin>
45+
</plugins>
46+
</build>
47+
48+
49+
50+
</project>
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import java.util.Arrays;
2+
3+
import org.apache.commons.cli.*;
4+
5+
class Config {
6+
7+
enum Workload {
8+
Inserts, Selects, Mixed,
9+
}
10+
11+
String[] node_addresses;
12+
Workload workload;
13+
long tasks;
14+
long concurrency;
15+
boolean dont_prepare;
16+
17+
Config(String[] args) {
18+
19+
this.node_addresses = new String[]{"127.0.0.1"};
20+
this.workload = Workload.Inserts;
21+
this.tasks = 1000 * 1000;
22+
this.concurrency = 1024;
23+
this.dont_prepare = false;
24+
25+
Options options = new Options();
26+
27+
options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark");
28+
options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma");
29+
options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)");
30+
options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects");
31+
options.addOption("c", "concurrency", true, "Maximum number of requests performed at once");
32+
33+
try {
34+
CommandLineParser parser = new DefaultParser();
35+
CommandLine cmd = parser.parse(options, args);
36+
37+
if (cmd.hasOption("dont-prepare")) {
38+
this.dont_prepare = true;
39+
}
40+
41+
if (cmd.hasOption("nodes")) {
42+
String value = cmd.getOptionValue("nodes");
43+
node_addresses = value.split(",");
44+
}
45+
46+
if (cmd.hasOption("workload")) {
47+
String workloadValue = cmd.getOptionValue("workload");
48+
this.workload = Workload.valueOf(workloadValue);
49+
}
50+
51+
if (cmd.hasOption("tasks")) {
52+
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
53+
}
54+
55+
if (cmd.hasOption("concurrency")) {
56+
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
57+
}
58+
59+
} catch (ParseException e) {
60+
HelpFormatter helpFormatter = new HelpFormatter();
61+
helpFormatter.printHelp("./run.sh [OPTION]...", options);
62+
System.out.println();
63+
System.out.println("Unexpected exception: " + e.getMessage());
64+
}
65+
}
66+
67+
@Override
68+
public String toString() {
69+
return "Config{" +
70+
"node_addresses=" + Arrays.toString(node_addresses) +
71+
", workload=" + workload +
72+
", tasks=" + tasks +
73+
", concurrency=" + concurrency +
74+
", dont_prepare=" + dont_prepare +
75+
'}';
76+
}
77+
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import com.datastax.driver.core.*;
2+
import com.google.common.util.concurrent.*;
3+
4+
import java.util.ArrayList;
5+
import java.util.concurrent.*;
6+
7+
public class MainClass {
8+
9+
private static Cluster cluster;
10+
private static Config config;
11+
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
12+
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";
13+
14+
private static PreparedStatement INSERT_PS;
15+
private static PreparedStatement SELECT_PS;
16+
17+
public static void main(String[] args) throws InterruptedException, ExecutionException {
18+
19+
config = new Config(args);
20+
System.out.println("Parsed config: ");
21+
System.out.println(config.toString());
22+
23+
cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build();
24+
cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency));
25+
Session session = cluster.connect();
26+
27+
prepareKeyspaceAndTable(session);
28+
29+
if (!config.dont_prepare) {
30+
prepareKeyspaceAndTable(session);
31+
32+
if (config.workload.equals(Config.Workload.Selects)) {
33+
prepareSelectsBenchmark(session);
34+
}
35+
}
36+
37+
38+
ArrayList<CompletableFuture<ResultSet>> arr = new ArrayList<>();
39+
40+
System.out.println("Starting the benchmark");
41+
42+
long benchmarkStart = System.nanoTime();
43+
44+
INSERT_PS = session.prepare(INSERT_STRING);
45+
SELECT_PS = session.prepare(SELECT_STRING);
46+
47+
for (int i = 0; i < config.concurrency; i++) {
48+
if (i + 1 == config.concurrency) {
49+
arr.add(execute(session, i * (config.tasks / config.concurrency), config.tasks));
50+
} else {
51+
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
52+
}
53+
}
54+
55+
for (Future<?> f : arr) {
56+
f.get(); // make sure nothing has thrown and everything finished
57+
}
58+
59+
long benchmarkEnd = System.nanoTime();
60+
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));
61+
62+
session.close();
63+
if (cluster != null) cluster.close();
64+
}
65+
66+
static void prepareKeyspaceAndTable(Session session) {
67+
session.execute("DROP KEYSPACE IF EXISTS benchks");
68+
session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}");
69+
session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)");
70+
if (!cluster.getMetadata().checkSchemaAgreement()) {
71+
throw new RuntimeException("Schema not in agreement after preparing keyspace and table.");
72+
}
73+
}
74+
75+
private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException {
76+
System.out.println("Preparing a selects benchmark (inserting values)...");
77+
78+
ArrayList<CompletableFuture<ResultSet>> arr = new ArrayList<>();
79+
INSERT_PS = session.prepare(INSERT_STRING);
80+
81+
Config.Workload originalWorkload = config.workload;
82+
config.workload = Config.Workload.Inserts; // Switch for setup purposes
83+
84+
for (int i = 0; i < config.concurrency; i++) {
85+
if (i + 1 == config.concurrency) {
86+
arr.add(execute(session, i * (config.tasks / config.concurrency), config.tasks));
87+
} else {
88+
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
89+
}
90+
}
91+
for (Future<?> f : arr) {
92+
f.get(); // make sure nothing has thrown and everything finished
93+
}
94+
95+
config.workload = originalWorkload;
96+
}
97+
98+
public static CompletableFuture<ResultSet> execute(Session s, long currentIter, long maxIter) {
99+
if (currentIter >= maxIter) {
100+
// No more iterations
101+
return CompletableFuture.completedFuture(null);
102+
}
103+
104+
ListenableFuture<ResultSet> fut = null;
105+
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
106+
fut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter));
107+
}
108+
109+
if (config.workload.equals(Config.Workload.Selects)) {
110+
fut = s.executeAsync(SELECT_PS.bind(currentIter));
111+
112+
} else if (config.workload.equals(Config.Workload.Mixed)) {
113+
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
114+
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
115+
return (s.executeAsync(SELECT_PS.bind(currentIter)));
116+
}
117+
});
118+
}
119+
120+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
121+
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
122+
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
123+
Row r = rs.one();
124+
if ((r.getLong("v1") != 2L * currentIter) || (r.getLong("v2") != 3L * currentIter)) {
125+
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", currentIter, 2L * currentIter, 3L * currentIter, r.getLong("pk"), r.getLong("v1"), r.getLong("v2")));
126+
}
127+
return Futures.immediateFuture(rs);
128+
}
129+
});
130+
}
131+
132+
// Convert ResultSetFuture to CompletableFuture
133+
CompletableFuture<ResultSet> futCompletable = new CompletableFuture<>();
134+
Futures.addCallback(fut, new FutureCallback<ResultSet>() {
135+
@Override
136+
public void onSuccess(ResultSet result) {
137+
futCompletable.complete(result);
138+
}
139+
140+
@Override
141+
public void onFailure(Throwable t) {
142+
futCompletable.completeExceptionally(t);
143+
}
144+
});
145+
146+
// Execute next iteration after that
147+
return futCompletable.thenCompose(rs -> execute(s, currentIter + 1, maxIter));
148+
}
149+
}
150+
151+

0 commit comments

Comments
 (0)