Skip to content

Commit 883809d

Browse files
committed
Add java-driver-3.x async benchmark
1 parent eee94a9 commit 883809d

File tree

6 files changed

+329
-0
lines changed

6 files changed

+329
-0
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM ubuntu:18.04
2+
RUN apt update
3+
4+
# Install java 8
5+
RUN apt install -y openjdk-8-jdk
6+
RUN apt install -y maven
7+
#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java
8+
RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version
9+
10+
# Copy benchmark code into the container
11+
COPY source /source
12+
WORKDIR /source
13+
14+
# Compile the code
15+
RUN mvn clean package
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
#!/bin/bash
2+
docker build . -t rust-driver-benchmarks-basic-java-driver-3.x-async
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#!/bin/bash
2+
docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x-async \
3+
java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@"
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<modelVersion>4.0.0</modelVersion>
6+
7+
<groupId>org.example</groupId>
8+
<artifactId>source</artifactId>
9+
<version>1.0-SNAPSHOT</version>
10+
11+
<properties>
12+
<maven.compiler.source>8</maven.compiler.source>
13+
<maven.compiler.target>8</maven.compiler.target>
14+
</properties>
15+
<dependencies>
16+
<!-- https://mvnrepository.com/artifact/commons-cli/commons-cli -->
17+
<dependency>
18+
<groupId>commons-cli</groupId>
19+
<artifactId>commons-cli</artifactId>
20+
<version>1.5.0</version>
21+
</dependency>
22+
23+
<dependency>
24+
<groupId>com.scylladb</groupId>
25+
<artifactId>scylla-driver-core</artifactId>
26+
<version>3.11.2.0</version>
27+
</dependency>
28+
</dependencies>
29+
30+
<build>
31+
<plugins>
32+
<plugin>
33+
<groupId>org.apache.maven.plugins</groupId>
34+
<artifactId>maven-shade-plugin</artifactId>
35+
<version>3.2.4</version>
36+
<executions>
37+
<execution>
38+
<phase>package</phase>
39+
<goals>
40+
<goal>shade</goal>
41+
</goals>
42+
</execution>
43+
</executions>
44+
</plugin>
45+
</plugins>
46+
</build>
47+
48+
49+
50+
</project>
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
import java.util.Arrays;
2+
3+
import org.apache.commons.cli.*;
4+
class Config{
5+
6+
enum Workload {
7+
Inserts,
8+
Selects,
9+
Mixed,
10+
};
11+
12+
String[] node_addresses;
13+
Workload workload;
14+
long tasks;
15+
long concurrency;
16+
boolean dont_prepare;
17+
int threads;
18+
19+
Config(String[] args) {
20+
21+
this.node_addresses = new String[]{"127.0.0.1"};
22+
this.workload = Workload.Inserts;
23+
this.tasks = 1000 * 1000;
24+
this.concurrency = 1024;
25+
this.dont_prepare = false;
26+
this.threads = Runtime.getRuntime().availableProcessors();
27+
28+
Options options = new Options();
29+
30+
options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark");
31+
options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma");
32+
options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)");
33+
options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects");
34+
options.addOption("c", "concurrency", true, "Maximum number of requests performed at once (concurrency/threads has to fit in java int)");
35+
options.addOption("th", "threads", true, "Number of worker threads to launch. If omitted defaults to number of cores JVM can see");
36+
37+
try {
38+
CommandLineParser parser = new DefaultParser();
39+
CommandLine cmd = parser.parse(options, args);
40+
41+
if(cmd.hasOption("dont-prepare")){
42+
this.dont_prepare = true;
43+
}
44+
45+
if(cmd.hasOption("nodes")){
46+
String value = cmd.getOptionValue("nodes");
47+
node_addresses = value.split(",");
48+
}
49+
50+
if(cmd.hasOption("workload")){
51+
String workloadValue = cmd.getOptionValue("workload");
52+
this.workload = Workload.valueOf(workloadValue);
53+
}
54+
55+
if(cmd.hasOption("tasks")){
56+
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
57+
}
58+
59+
if(cmd.hasOption("concurrency")){
60+
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
61+
}
62+
63+
if(cmd.hasOption("threads")){
64+
this.threads = Integer.parseInt(cmd.getOptionValue("threads"));
65+
}
66+
} catch (ParseException e){
67+
HelpFormatter helpFormatter = new HelpFormatter();
68+
helpFormatter.printHelp("./run.sh [OPTION]...", options);
69+
System.out.println();
70+
System.out.println("Unexpected exception: " + e.getMessage());
71+
}
72+
}
73+
74+
@Override
75+
public String toString() {
76+
return "Config{" +
77+
"node_addresses=" + Arrays.toString(node_addresses) +
78+
", workload=" + workload +
79+
", tasks=" + tasks +
80+
", concurrency=" + concurrency +
81+
", dont_prepare=" + dont_prepare +
82+
", threads=" + threads +
83+
'}';
84+
}
85+
}
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
import com.datastax.driver.core.*;
2+
import com.google.common.base.Function;
3+
import com.google.common.util.concurrent.AsyncFunction;
4+
import com.google.common.util.concurrent.FutureCallback;
5+
import com.google.common.util.concurrent.Futures;
6+
import com.google.common.util.concurrent.ListenableFuture;
7+
8+
import java.util.ArrayList;
9+
import java.util.concurrent.*;
10+
import java.util.concurrent.atomic.AtomicLong;
11+
12+
public class MainClass {
13+
14+
private static Cluster cluster;
15+
private static Config config;
16+
17+
private static ExecutorService executor;
18+
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
19+
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";
20+
21+
private static long benchmarkStart;
22+
23+
private static long benchmarkEnd;
24+
25+
public static void main(String[] args) throws InterruptedException, ExecutionException {
26+
27+
config = new Config(args);
28+
System.out.println("Parsed config: ");
29+
System.out.println(config.toString());
30+
31+
cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build();
32+
cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency));
33+
Session session = cluster.connect();
34+
35+
prepareKeyspaceAndTable(session);
36+
37+
if (!config.dont_prepare) {
38+
prepareKeyspaceAndTable(session);
39+
40+
if (config.workload.equals(Config.Workload.Selects)) {
41+
prepareSelectsBenchmark(session);
42+
}
43+
}
44+
45+
AtomicLong requestsSent = new AtomicLong(0);
46+
47+
executor = Executors.newFixedThreadPool(config.threads);
48+
49+
System.out.println("Starting the benchmark");
50+
51+
benchmarkStart = System.nanoTime();
52+
ArrayList<Future<?>> arr = new ArrayList<>();
53+
for (int i = 0; i < config.threads; i++) {
54+
arr.add(
55+
executor.submit(() -> {
56+
int permits = (int) (config.concurrency / config.threads);
57+
Semaphore semaphore = new Semaphore(permits);
58+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
59+
PreparedStatement selectQ = session.prepare(SELECT_STRING);
60+
long pk;
61+
while ((pk = requestsSent.incrementAndGet()) < config.tasks) {
62+
try {
63+
semaphore.acquire();
64+
} catch (InterruptedException e) {
65+
throw new RuntimeException(e);
66+
}
67+
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
68+
ListenableFuture<ResultSet> resultSetA = session.executeAsync(insertQ.bind(pk, 2L * pk, 3L * pk));
69+
ListenableFuture<ResultSet> resultSetB;
70+
71+
long finalPk = pk;
72+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
73+
// Chain with select depending on workload
74+
resultSetB = Futures.transform(resultSetA,
75+
new AsyncFunction<ResultSet, ResultSet>() {
76+
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
77+
return session.executeAsync(selectQ.bind(finalPk));
78+
}
79+
});
80+
} else {
81+
resultSetB = resultSetA;
82+
}
83+
ListenableFuture<Boolean> result;
84+
// Verify returned data depending on workload and return the permit
85+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
86+
result = Futures.transform(resultSetB,
87+
new Function<ResultSet, Boolean>() {
88+
public Boolean apply(ResultSet rs) {
89+
semaphore.release();
90+
Row r = rs.one();
91+
if ((r.getLong("v1") != 2 * finalPk) || (r.getLong("v2") != 3 * finalPk)) {
92+
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", finalPk, 2 * finalPk, 3 * finalPk, r.getInt("pk"), r.getInt("v1"), r.getInt("v2")));
93+
}
94+
return true;
95+
}
96+
});
97+
98+
}
99+
else {
100+
result = Futures.transform(resultSetB,
101+
new Function<ResultSet, Boolean>() {
102+
public Boolean apply(ResultSet rs) {
103+
semaphore.release();
104+
return true;
105+
}
106+
});
107+
}
108+
}
109+
}
110+
try {
111+
//possible only if all futures ended successfully
112+
semaphore.acquire(permits);
113+
} catch (InterruptedException e) {
114+
throw new RuntimeException(e);
115+
}
116+
}));
117+
}
118+
119+
120+
executor.shutdown();
121+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
122+
benchmarkEnd = System.nanoTime();
123+
for (Future<?> f : arr) {
124+
f.get(); // make sure nothing has thrown
125+
}
126+
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));
127+
128+
session.close();
129+
if (cluster != null) cluster.close();
130+
}
131+
132+
static void prepareKeyspaceAndTable(Session session) {
133+
session.execute("DROP KEYSPACE IF EXISTS benchks");
134+
session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}");
135+
session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)");
136+
if (!cluster.getMetadata().checkSchemaAgreement()) {
137+
throw new RuntimeException("Schema not in agreement after preparing keyspace and table.");
138+
}
139+
}
140+
141+
private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException {
142+
System.out.println("Preparing a selects benchmark (inserting values)...");
143+
144+
AtomicLong nextBatchStart = new AtomicLong(0);
145+
executor = Executors.newFixedThreadPool((int) config.threads);
146+
147+
ArrayList<Future<?>> arr = new ArrayList<>();
148+
try {
149+
for (int i = 0; i < config.concurrency; i++) {
150+
arr.add(executor.submit(() -> {
151+
PreparedStatement insertQ = session.prepare(INSERT_STRING);
152+
while (true) {
153+
long curBatchStart = nextBatchStart.addAndGet(config.tasks / config.concurrency);
154+
if (curBatchStart >= config.tasks) {
155+
break;
156+
}
157+
long curBatchEnd = Math.min(curBatchStart + (config.tasks / config.concurrency), config.tasks);
158+
for (long pk = curBatchStart; pk < curBatchEnd; pk++) {
159+
session.execute(insertQ.bind(pk, 2L * pk, 3L * pk));
160+
}
161+
}
162+
}));
163+
}
164+
} finally {
165+
executor.shutdown();
166+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
167+
for (Future<?> f : arr) {
168+
f.get(); // make sure nothing has thrown
169+
}
170+
}
171+
}
172+
}
173+
174+

0 commit comments

Comments
 (0)