diff --git a/benchmarks/basic/java-driver-3.x-async/Dockerfile b/benchmarks/basic/java-driver-3.x-async/Dockerfile new file mode 100644 index 0000000..be93b18 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/Dockerfile @@ -0,0 +1,15 @@ +FROM ubuntu:18.04 +RUN apt update + +# Install java 8 +RUN apt install -y openjdk-8-jdk +RUN apt install -y maven +#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java +RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version + +# Copy benchmark code into the container +COPY source /source +WORKDIR /source + +# Compile the code +RUN mvn clean package diff --git a/benchmarks/basic/java-driver-3.x-async/build.sh b/benchmarks/basic/java-driver-3.x-async/build.sh new file mode 100755 index 0000000..d54523b --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker build . -t rust-driver-benchmarks-basic-java-driver-3.x-async diff --git a/benchmarks/basic/java-driver-3.x-async/run.sh b/benchmarks/basic/java-driver-3.x-async/run.sh new file mode 100755 index 0000000..26fd829 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash +docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x-async \ +java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@" diff --git a/benchmarks/basic/java-driver-3.x-async/source/pom.xml b/benchmarks/basic/java-driver-3.x-async/source/pom.xml new file mode 100644 index 0000000..dcdf49d --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/source/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + org.example + source + 1.0-SNAPSHOT + + + 8 + 8 + + + + + commons-cli + commons-cli + 1.5.0 + + + + com.scylladb + scylla-driver-core + 3.11.2.0 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + + + + \ No newline at end of file diff --git a/benchmarks/basic/java-driver-3.x-async/source/src/main/java/Config.java b/benchmarks/basic/java-driver-3.x-async/source/src/main/java/Config.java new file mode 100644 index 0000000..a0153a8 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/source/src/main/java/Config.java @@ -0,0 +1,77 @@ +import java.util.Arrays; + +import org.apache.commons.cli.*; + +class Config { + + enum Workload { + Inserts, Selects, Mixed, + } + + String[] node_addresses; + Workload workload; + long tasks; + long concurrency; + boolean dont_prepare; + + Config(String[] args) { + + this.node_addresses = new String[]{"127.0.0.1"}; + this.workload = Workload.Inserts; + this.tasks = 1000 * 1000; + this.concurrency = 1024; + this.dont_prepare = false; + + Options options = new Options(); + + options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark"); + options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma"); + options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)"); + options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects"); + options.addOption("c", "concurrency", true, "Maximum number of requests performed at once"); + + try { + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + if (cmd.hasOption("dont-prepare")) { + this.dont_prepare = true; + } + + if (cmd.hasOption("nodes")) { + String value = cmd.getOptionValue("nodes"); + node_addresses = value.split(","); + } + + if (cmd.hasOption("workload")) { + String workloadValue = cmd.getOptionValue("workload"); + this.workload = Workload.valueOf(workloadValue); + } + + if (cmd.hasOption("tasks")) { + this.tasks = Integer.parseInt(cmd.getOptionValue("tasks")); + } + + if (cmd.hasOption("concurrency")) { + this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency")); + } + + } catch (ParseException e) { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("./run.sh [OPTION]...", options); + System.out.println(); + System.out.println("Unexpected exception: " + e.getMessage()); + } + } + + @Override + public String toString() { + return "Config{" + + "node_addresses=" + Arrays.toString(node_addresses) + + ", workload=" + workload + + ", tasks=" + tasks + + ", concurrency=" + concurrency + + ", dont_prepare=" + dont_prepare + + '}'; + } +} \ No newline at end of file diff --git a/benchmarks/basic/java-driver-3.x-async/source/src/main/java/MainClass.java b/benchmarks/basic/java-driver-3.x-async/source/src/main/java/MainClass.java new file mode 100644 index 0000000..99c6abe --- /dev/null +++ b/benchmarks/basic/java-driver-3.x-async/source/src/main/java/MainClass.java @@ -0,0 +1,151 @@ +import com.datastax.driver.core.*; +import com.google.common.util.concurrent.*; + +import java.util.ArrayList; +import java.util.concurrent.*; + +public class MainClass { + + private static Cluster cluster; + private static Config config; + private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)"; + private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?"; + + private static PreparedStatement INSERT_PS; + private static PreparedStatement SELECT_PS; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + + config = new Config(args); + System.out.println("Parsed config: "); + System.out.println(config.toString()); + + cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build(); + cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency)); + Session session = cluster.connect(); + + prepareKeyspaceAndTable(session); + + if (!config.dont_prepare) { + prepareKeyspaceAndTable(session); + + if (config.workload.equals(Config.Workload.Selects)) { + prepareSelectsBenchmark(session); + } + } + + + ArrayList> arr = new ArrayList<>(); + + System.out.println("Starting the benchmark"); + + long benchmarkStart = System.nanoTime(); + + INSERT_PS = session.prepare(INSERT_STRING); + SELECT_PS = session.prepare(SELECT_STRING); + + for (int i = 0; i < config.concurrency; i++) { + if (i + 1 == config.concurrency) { + arr.add(execute(session, i * (config.tasks / config.concurrency), config.tasks)); + } else { + arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency))); + } + } + + for (Future f : arr) { + f.get(); // make sure nothing has thrown and everything finished + } + + long benchmarkEnd = System.nanoTime(); + System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000)); + + session.close(); + if (cluster != null) cluster.close(); + } + + static void prepareKeyspaceAndTable(Session session) { + session.execute("DROP KEYSPACE IF EXISTS benchks"); + session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}"); + session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)"); + if (!cluster.getMetadata().checkSchemaAgreement()) { + throw new RuntimeException("Schema not in agreement after preparing keyspace and table."); + } + } + + private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException { + System.out.println("Preparing a selects benchmark (inserting values)..."); + + ArrayList> arr = new ArrayList<>(); + INSERT_PS = session.prepare(INSERT_STRING); + + Config.Workload originalWorkload = config.workload; + config.workload = Config.Workload.Inserts; // Switch for setup purposes + + for (int i = 0; i < config.concurrency; i++) { + if (i + 1 == config.concurrency) { + arr.add(execute(session, i * (config.tasks / config.concurrency), config.tasks)); + } else { + arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency))); + } + } + for (Future f : arr) { + f.get(); // make sure nothing has thrown and everything finished + } + + config.workload = originalWorkload; + } + + public static CompletableFuture execute(Session s, long currentIter, long maxIter) { + if (currentIter >= maxIter) { + // No more iterations + return CompletableFuture.completedFuture(null); + } + + ListenableFuture fut = null; + if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) { + fut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter)); + } + + if (config.workload.equals(Config.Workload.Selects)) { + fut = s.executeAsync(SELECT_PS.bind(currentIter)); + + } else if (config.workload.equals(Config.Workload.Mixed)) { + fut = Futures.transform(fut, new AsyncFunction() { + public ListenableFuture apply(ResultSet rs) throws Exception { + return (s.executeAsync(SELECT_PS.bind(currentIter))); + } + }); + } + + if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) { + fut = Futures.transform(fut, new AsyncFunction() { + public ListenableFuture apply(ResultSet rs) throws Exception { + Row r = rs.one(); + if ((r.getLong("v1") != 2L * currentIter) || (r.getLong("v2") != 3L * currentIter)) { + throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", currentIter, 2L * currentIter, 3L * currentIter, r.getLong("pk"), r.getLong("v1"), r.getLong("v2"))); + } + return Futures.immediateFuture(rs); + } + }); + } + + // Convert ResultSetFuture to CompletableFuture + CompletableFuture futCompletable = new CompletableFuture<>(); + Futures.addCallback(fut, new FutureCallback() { + @Override + public void onSuccess(ResultSet result) { + futCompletable.complete(result); + } + + @Override + public void onFailure(Throwable t) { + futCompletable.completeExceptionally(t); + } + }); + + // Execute next iteration after that + return futCompletable.thenCompose(rs -> execute(s, currentIter + 1, maxIter)); + } +} + + diff --git a/benchmarks/basic/java-driver-3.x/Dockerfile b/benchmarks/basic/java-driver-3.x/Dockerfile new file mode 100644 index 0000000..be93b18 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/Dockerfile @@ -0,0 +1,15 @@ +FROM ubuntu:18.04 +RUN apt update + +# Install java 8 +RUN apt install -y openjdk-8-jdk +RUN apt install -y maven +#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java +RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version + +# Copy benchmark code into the container +COPY source /source +WORKDIR /source + +# Compile the code +RUN mvn clean package diff --git a/benchmarks/basic/java-driver-3.x/build.sh b/benchmarks/basic/java-driver-3.x/build.sh new file mode 100755 index 0000000..a8393dd --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker build . -t rust-driver-benchmarks-basic-java-driver-3.x diff --git a/benchmarks/basic/java-driver-3.x/run.sh b/benchmarks/basic/java-driver-3.x/run.sh new file mode 100755 index 0000000..e4020f7 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash +docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-3.x \ +java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@" diff --git a/benchmarks/basic/java-driver-3.x/source/pom.xml b/benchmarks/basic/java-driver-3.x/source/pom.xml new file mode 100644 index 0000000..dcdf49d --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/source/pom.xml @@ -0,0 +1,50 @@ + + + 4.0.0 + + org.example + source + 1.0-SNAPSHOT + + + 8 + 8 + + + + + commons-cli + commons-cli + 1.5.0 + + + + com.scylladb + scylla-driver-core + 3.11.2.0 + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + + + + \ No newline at end of file diff --git a/benchmarks/basic/java-driver-3.x/source/src/main/java/Config.java b/benchmarks/basic/java-driver-3.x/source/src/main/java/Config.java new file mode 100644 index 0000000..57ccbb9 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/source/src/main/java/Config.java @@ -0,0 +1,88 @@ +import java.util.Arrays; + +import org.apache.commons.cli.*; + +import static java.lang.Math.max; + +class Config{ + + enum Workload { + Inserts, + Selects, + Mixed, + }; + + String[] node_addresses; + Workload workload; + long tasks; + long concurrency; + long batch_size; + boolean dont_prepare; + + Config(String[] args) { + + this.node_addresses = new String[]{"127.0.0.1"}; + this.workload = Workload.Inserts; + this.tasks = 1000 * 1000; + this.concurrency = 1024; + this.dont_prepare = false; + + Options options = new Options(); + + options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark"); + options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma"); + options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)"); + options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects"); + options.addOption("c", "concurrency", true, "Maximum number of requests performed at once"); + + try { + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + if(cmd.hasOption("dont-prepare")){ + this.dont_prepare = true; + } + + if(cmd.hasOption("nodes")){ + String value = cmd.getOptionValue("nodes"); + node_addresses = value.split(","); + } + + if(cmd.hasOption("workload")){ + String workloadValue = cmd.getOptionValue("workload"); + this.workload = Workload.valueOf(workloadValue); + } + + if(cmd.hasOption("tasks")){ + this.tasks = Integer.parseInt(cmd.getOptionValue("tasks")); + } + + if(cmd.hasOption("concurrency")){ + this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency")); + } + } catch (ParseException e){ + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("./run.sh [OPTION]...", options); + System.out.println(); + System.out.println("Unexpected exception: " + e.getMessage()); + } + + batch_size = 256; + + if (this.tasks/this.batch_size < this.concurrency) { + this.batch_size = max(1, this.tasks / this.concurrency); + } + } + + @Override + public String toString() { + return "Config{" + + "node_addresses=" + Arrays.toString(node_addresses) + + ", workload=" + workload + + ", tasks=" + tasks + + ", concurrency=" + concurrency + + ", batch_size=" + batch_size + + ", dont_prepare=" + dont_prepare + + '}'; + } +} \ No newline at end of file diff --git a/benchmarks/basic/java-driver-3.x/source/src/main/java/MainClass.java b/benchmarks/basic/java-driver-3.x/source/src/main/java/MainClass.java new file mode 100644 index 0000000..71ce188 --- /dev/null +++ b/benchmarks/basic/java-driver-3.x/source/src/main/java/MainClass.java @@ -0,0 +1,134 @@ +import com.datastax.driver.core.*; + +import java.util.ArrayList; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class MainClass { + + private static Cluster cluster; + private static Config config; + + private static ExecutorService executor; + private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)"; + private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?"; + + private static long benchmarkStart; + + private static long benchmarkEnd; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + + config = new Config(args); + System.out.println("Parsed config: "); + System.out.println(config.toString()); + + cluster = Cluster.builder().addContactPoints(config.node_addresses).withProtocolVersion(ProtocolVersion.V4).build(); + cluster.getConfiguration().getPoolingOptions().setMaxQueueSize((int) Math.max(2048, 2 * config.concurrency)); + Session session = cluster.connect(); + + prepareKeyspaceAndTable(session); + + if (!config.dont_prepare) { + prepareKeyspaceAndTable(session); + + if (config.workload.equals(Config.Workload.Selects)) { + prepareSelectsBenchmark(session); + } + } + + AtomicLong nextBatchStart = new AtomicLong(0); + + executor = Executors.newFixedThreadPool((int) config.concurrency); + + System.out.println("Starting the benchmark"); + + benchmarkStart = System.nanoTime(); + ArrayList> arr = new ArrayList<>(); + for (int i = 0; i < config.concurrency; i++) { + arr.add( + executor.submit(() -> { + PreparedStatement insertQ = session.prepare(INSERT_STRING); + PreparedStatement selectQ = session.prepare(SELECT_STRING); + while (true) { + + long curBatchStart = nextBatchStart.addAndGet(config.batch_size); + if (curBatchStart >= config.tasks) { + break; + } + + long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks); + + for (long pk = curBatchStart; pk < curBatchEnd; pk++) { + if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) { + session.execute(insertQ.bind(pk, 2L * pk, 3L * pk)); + } + + if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) { + ResultSet rs = session.execute(selectQ.bind(pk)); + Row r = rs.one(); + if ((r.getLong("v1") != 2 * pk) || (r.getLong("v2") != 3 * pk)) { + throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", pk, 2 * pk, 3 * pk, r.getInt("pk"), r.getInt("v1"), r.getInt("v2"))); + } + } + } + } + })); + } + + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + benchmarkEnd = System.nanoTime(); + for (Future f : arr) { + f.get(); // make sure nothing has thrown + } + System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000)); + + session.close(); + if (cluster != null) cluster.close(); + } + + static void prepareKeyspaceAndTable(Session session) { + session.execute("DROP KEYSPACE IF EXISTS benchks"); + session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}"); + session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)"); + if (!cluster.getMetadata().checkSchemaAgreement()) { + throw new RuntimeException("Schema not in agreement after preparing keyspace and table."); + } + } + + private static void prepareSelectsBenchmark(Session session) throws InterruptedException, ExecutionException { + System.out.println("Preparing a selects benchmark (inserting values)..."); + + AtomicLong nextBatchStart = new AtomicLong(0); + executor = Executors.newFixedThreadPool((int) config.concurrency); + + ArrayList> arr = new ArrayList<>(); + try { + for (int i = 0; i < config.concurrency; i++) { + arr.add(executor.submit(() -> { + PreparedStatement insertQ = session.prepare(INSERT_STRING); + while (true) { + long curBatchStart = nextBatchStart.addAndGet(config.batch_size); + if (curBatchStart >= config.tasks) { + break; + } + long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks); + for (long pk = curBatchStart; pk < curBatchEnd; pk++) { + session.execute(insertQ.bind(pk, 2L * pk, 3L * pk)); + } + } + })); + } + } finally { + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + for (Future f : arr) { + f.get(); // make sure nothing has thrown + } + } + } + +} + + diff --git a/benchmarks/basic/java-driver-4.x/Dockerfile b/benchmarks/basic/java-driver-4.x/Dockerfile new file mode 100644 index 0000000..be93b18 --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/Dockerfile @@ -0,0 +1,15 @@ +FROM ubuntu:18.04 +RUN apt update + +# Install java 8 +RUN apt install -y openjdk-8-jdk +RUN apt install -y maven +#RUN update-alternatives --set java /usr/lib/jvm/jdk1.8.0_version/bin/java +RUN export JAVA_HOME=/usr/lib/jvm/jdk1.8.0_version + +# Copy benchmark code into the container +COPY source /source +WORKDIR /source + +# Compile the code +RUN mvn clean package diff --git a/benchmarks/basic/java-driver-4.x/build.sh b/benchmarks/basic/java-driver-4.x/build.sh new file mode 100755 index 0000000..a6cf5a4 --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/build.sh @@ -0,0 +1,2 @@ +#!/bin/bash +docker build . -t rust-driver-benchmarks-basic-java-driver-4.x diff --git a/benchmarks/basic/java-driver-4.x/run.sh b/benchmarks/basic/java-driver-4.x/run.sh new file mode 100755 index 0000000..5d70ac8 --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/run.sh @@ -0,0 +1,3 @@ +#!/bin/bash +docker run --rm -it --network host rust-driver-benchmarks-basic-java-driver-4.x \ +java -cp /source/target/source-1.0-SNAPSHOT.jar MainClass "$@" diff --git a/benchmarks/basic/java-driver-4.x/source/pom.xml b/benchmarks/basic/java-driver-4.x/source/pom.xml new file mode 100644 index 0000000..74f6ab1 --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/source/pom.xml @@ -0,0 +1,64 @@ + + + 4.0.0 + + org.example + source + 1.0-SNAPSHOT + + + 8 + 8 + 4.14.1.0 + + + + + commons-cli + commons-cli + 1.5.0 + + + + com.scylladb + java-driver-core + ${driver.version} + + + + com.scylladb + java-driver-query-builder + ${driver.version} + + + + com.scylladb + java-driver-mapper-runtime + ${driver.version} + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + package + + shade + + + + + + + + + + \ No newline at end of file diff --git a/benchmarks/basic/java-driver-4.x/source/src/main/java/Config.java b/benchmarks/basic/java-driver-4.x/source/src/main/java/Config.java new file mode 100644 index 0000000..cf1d94e --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/source/src/main/java/Config.java @@ -0,0 +1,88 @@ +import java.util.Arrays; + +import org.apache.commons.cli.*; + +import static java.lang.Math.max; + +class Config{ + + enum Workload { + Inserts, + Selects, + Mixed, + }; + + String[] node_addresses; + Workload workload; + long tasks; + long concurrency; + long batch_size; + boolean dont_prepare; + + Config(String[] args) { + + this.node_addresses = new String[]{"127.0.0.1"}; + this.workload = Workload.Inserts; + this.tasks = 1000 * 1000; + this.concurrency = 1024; + this.dont_prepare = false; + + Options options = new Options(); + + options.addOption("d", "dont-prepare", false, "Don't create tables and insert into them before the benchmark"); + options.addOption("n", "nodes", true, "Addresses of database nodes to connect to separated by a comma"); + options.addOption("w", "workload", true, "Type of work to perform (Inserts, Selects, Mixed)"); + options.addOption("t", "tasks", true, "Total number of tasks (requests) to perform the during benchmark. In case of mixed workload there will be tasks inserts and tasks selects"); + options.addOption("c", "concurrency", true, "Maximum number of requests performed at once"); + + try { + CommandLineParser parser = new DefaultParser(); + CommandLine cmd = parser.parse(options, args); + + if(cmd.hasOption("dont-prepare")){ + this.dont_prepare = true; + } + + if(cmd.hasOption("nodes")){ + String value = cmd.getOptionValue("nodes"); + node_addresses = value.split(","); + } + + if(cmd.hasOption("workload")){ + String workloadValue = cmd.getOptionValue("workload"); + this.workload = Workload.valueOf(workloadValue); + } + + if(cmd.hasOption("tasks")){ + this.tasks = Integer.parseInt(cmd.getOptionValue("tasks")); + } + + if(cmd.hasOption("concurrency")){ + this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency")); + } + } catch (ParseException e){ + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp("./run.sh [OPTION]...", options); + System.out.println(); + System.out.println("Unexpected exception: " + e.getMessage()); + } + + batch_size = 256; + + if (this.tasks/this.batch_size < this.concurrency) { + this.batch_size = max(1, this.tasks / this.concurrency); + } + } + + @Override + public String toString() { + return "Config{" + + "node_addresses=" + Arrays.toString(node_addresses) + + ", workload=" + workload + + ", tasks=" + tasks + + ", concurrency=" + concurrency + + ", batch_size=" + batch_size + + ", dont_prepare=" + dont_prepare + + '}'; + } +} \ No newline at end of file diff --git a/benchmarks/basic/java-driver-4.x/source/src/main/java/MainClass.java b/benchmarks/basic/java-driver-4.x/source/src/main/java/MainClass.java new file mode 100644 index 0000000..534a618 --- /dev/null +++ b/benchmarks/basic/java-driver-4.x/source/src/main/java/MainClass.java @@ -0,0 +1,163 @@ +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.api.core.config.DriverConfigLoader; +import com.datastax.oss.driver.api.core.cql.PreparedStatement; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; + +public class MainClass { + private static Config config; + + private static ExecutorService executor; + private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)"; + private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?"; + + private static long benchmarkStart; + + private static long benchmarkEnd; + + public static void main(String[] args) throws InterruptedException, ExecutionException { + + config = new Config(args); + System.out.println("Parsed config: "); + System.out.println(config.toString()); + + DriverConfigLoader loader = + DriverConfigLoader.programmaticBuilder() + .withString(DefaultDriverOption.PROTOCOL_VERSION, "V4") + .withLong(DefaultDriverOption.REQUEST_THROTTLER_MAX_QUEUE_SIZE, Math.max(2048, 2L * config.concurrency)) + .build(); + + CqlSession session = CqlSession + .builder() + .withConfigLoader(loader) + .addContactPoints(parseAddresses(config.node_addresses)) + .withLocalDatacenter("datacenter1") + .build(); + + prepareKeyspaceAndTable(session); + + if (!config.dont_prepare) { + prepareKeyspaceAndTable(session); + + if (config.workload.equals(Config.Workload.Selects)) { + prepareSelectsBenchmark(session); + } + } + + AtomicLong nextBatchStart = new AtomicLong(0); + + executor = Executors.newFixedThreadPool((int) config.concurrency); + + System.out.println("Starting the benchmark"); + + benchmarkStart = System.nanoTime(); + ArrayList> arr = new ArrayList<>(); + for (int i = 0; i < config.concurrency; i++) { + arr.add( + executor.submit(() -> { + PreparedStatement insertQ = session.prepare(INSERT_STRING); + PreparedStatement selectQ = session.prepare(SELECT_STRING); + while (true) { + + long curBatchStart = nextBatchStart.addAndGet(config.batch_size); + if (curBatchStart >= config.tasks) { + break; + } + + long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks); + + for (long pk = curBatchStart; pk < curBatchEnd; pk++) { + if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) { + session.execute(insertQ.bind(pk, 2L * pk, 3L * pk)); + } + + if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) { + ResultSet rs = session.execute(selectQ.bind(pk)); + Row r = rs.one(); + assert r != null; + if ((r.getLong("v1") != 2 * pk) || (r.getLong("v2") != 3 * pk)) { + throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", pk, 2 * pk, 3 * pk, r.getInt("pk"), r.getInt("v1"), r.getInt("v2"))); + } + } + } + } + })); + } + + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + benchmarkEnd = System.nanoTime(); + for (Future f : arr) { + f.get(); // make sure nothing has thrown + } + System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000)); + + session.close(); + } + + static void prepareKeyspaceAndTable(CqlSession session) { + session.execute("DROP KEYSPACE IF EXISTS benchks"); + session.execute("CREATE KEYSPACE IF NOT EXISTS benchks WITH REPLICATION = {'class' " + ": 'SimpleStrategy', 'replication_factor' : 1}"); + session.execute("CREATE TABLE IF NOT EXISTS benchks.benchtab (pk " + "bigint PRIMARY KEY, v1 bigint, v2 bigint)"); + if (!session.checkSchemaAgreement()) { + throw new RuntimeException("Schema not in agreement after preparing keyspace and table."); + } + } + + private static void prepareSelectsBenchmark(CqlSession session) throws InterruptedException, ExecutionException { + System.out.println("Preparing a selects benchmark (inserting values)..."); + + AtomicLong nextBatchStart = new AtomicLong(0); + executor = Executors.newFixedThreadPool((int) config.concurrency); + + ArrayList> arr = new ArrayList<>(); + try { + for (int i = 0; i < config.concurrency; i++) { + arr.add(executor.submit(() -> { + PreparedStatement insertQ = session.prepare(INSERT_STRING); + while (true) { + long curBatchStart = nextBatchStart.addAndGet(config.batch_size); + if (curBatchStart >= config.tasks) { + break; + } + long curBatchEnd = Math.min(curBatchStart + config.batch_size, config.tasks); + for (long pk = curBatchStart; pk < curBatchEnd; pk++) { + session.execute(insertQ.bind(pk, 2L * pk, 3L * pk)); + } + } + })); + } + } finally { + executor.shutdown(); + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + for (Future f : arr) { + f.get(); // make sure nothing has thrown + } + } + } + + private static List parseAddresses(String[] arr){ + ArrayList result = new ArrayList<>(); + for(String s : arr){ + InetSocketAddress addr; + if(s.contains(":")){ + String[] tmp = s.split(":"); + addr = new InetSocketAddress(tmp[0], Integer.parseInt(tmp[1])); + } + else { + addr = new InetSocketAddress(s, 9042); + } + result.add(addr); + } + return result; + } +} + +