Skip to content

Commit bbd4b06

Browse files
committed
3.x-async code cleanup
1 parent 5399b83 commit bbd4b06

File tree

2 files changed

+47
-93
lines changed

2 files changed

+47
-93
lines changed

benchmarks/basic/java-driver-3.x-async/source/src/main/java/Config.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
import java.util.Arrays;
22

33
import org.apache.commons.cli.*;
4-
class Config{
4+
5+
class Config {
56

67
enum Workload {
7-
Inserts,
8-
Selects,
9-
Mixed,
10-
};
8+
Inserts, Selects, Mixed,
9+
}
1110

1211
String[] node_addresses;
1312
Workload workload;
1413
long tasks;
1514
long concurrency;
1615
boolean dont_prepare;
1716

18-
Config(String[] args) {
17+
Config(String[] args) {
1918

2019
this.node_addresses = new String[]{"127.0.0.1"};
2120
this.workload = Workload.Inserts;
@@ -35,29 +34,29 @@ enum Workload {
3534
CommandLineParser parser = new DefaultParser();
3635
CommandLine cmd = parser.parse(options, args);
3736

38-
if(cmd.hasOption("dont-prepare")){
37+
if (cmd.hasOption("dont-prepare")) {
3938
this.dont_prepare = true;
4039
}
4140

42-
if(cmd.hasOption("nodes")){
41+
if (cmd.hasOption("nodes")) {
4342
String value = cmd.getOptionValue("nodes");
4443
node_addresses = value.split(",");
4544
}
4645

47-
if(cmd.hasOption("workload")){
46+
if (cmd.hasOption("workload")) {
4847
String workloadValue = cmd.getOptionValue("workload");
4948
this.workload = Workload.valueOf(workloadValue);
5049
}
5150

52-
if(cmd.hasOption("tasks")){
51+
if (cmd.hasOption("tasks")) {
5352
this.tasks = Integer.parseInt(cmd.getOptionValue("tasks"));
5453
}
5554

56-
if(cmd.hasOption("concurrency")){
55+
if (cmd.hasOption("concurrency")) {
5756
this.concurrency = Integer.parseInt(cmd.getOptionValue("concurrency"));
5857
}
5958

60-
} catch (ParseException e){
59+
} catch (ParseException e) {
6160
HelpFormatter helpFormatter = new HelpFormatter();
6261
helpFormatter.printHelp("./run.sh [OPTION]...", options);
6362
System.out.println();

benchmarks/basic/java-driver-3.x-async/source/src/main/java/MainClass.java

Lines changed: 36 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import com.datastax.driver.core.*;
2-
import com.google.common.util.concurrent.AsyncFunction;
3-
import com.google.common.util.concurrent.FutureCallback;
4-
import com.google.common.util.concurrent.Futures;
5-
import com.google.common.util.concurrent.ListenableFuture;
2+
import com.google.common.util.concurrent.*;
63

74
import java.util.ArrayList;
85
import java.util.concurrent.*;
@@ -11,18 +8,12 @@ public class MainClass {
118

129
private static Cluster cluster;
1310
private static Config config;
14-
15-
private static ExecutorService executor;
1611
private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)";
1712
private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?";
1813

1914
private static PreparedStatement INSERT_PS;
2015
private static PreparedStatement SELECT_PS;
2116

22-
private static long benchmarkStart;
23-
24-
private static long benchmarkEnd;
25-
2617
public static void main(String[] args) throws InterruptedException, ExecutionException {
2718

2819
config = new Config(args);
@@ -48,29 +39,20 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
4839

4940
System.out.println("Starting the benchmark");
5041

51-
benchmarkStart = System.nanoTime();
42+
long benchmarkStart = System.nanoTime();
5243

5344
INSERT_PS = session.prepare(INSERT_STRING);
5445
SELECT_PS = session.prepare(SELECT_STRING);
5546

56-
if (config.workload == Config.Workload.Inserts) {
57-
for (int i = 0; i < config.concurrency; i++) {
58-
arr.add(executeInsert(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
59-
}
60-
} else if (config.workload == Config.Workload.Selects) {
61-
for (int i = 0; i < config.concurrency; i++) {
62-
arr.add(executeSelect(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
63-
}
64-
} else if (config.workload == Config.Workload.Mixed) {
65-
for (int i = 0; i < config.concurrency; i++) {
66-
arr.add(executeMixed(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
67-
}
47+
for (int i = 0; i < config.concurrency; i++) {
48+
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
6849
}
6950

7051
for (Future<?> f : arr) {
7152
f.get(); // make sure nothing has thrown and everything finished
7253
}
73-
benchmarkEnd = System.nanoTime();
54+
55+
long benchmarkEnd = System.nanoTime();
7456
System.out.println(String.format("Finished\nBenchmark time: %d ms\n", (benchmarkEnd - benchmarkStart) / 1_000_000));
7557

7658
session.close();
@@ -92,83 +74,56 @@ private static void prepareSelectsBenchmark(Session session) throws InterruptedE
9274
ArrayList<CompletableFuture<ResultSet>> arr = new ArrayList<>();
9375
INSERT_PS = session.prepare(INSERT_STRING);
9476

77+
Config.Workload originalWorkload = config.workload;
78+
config.workload = Config.Workload.Inserts; // Switch for setup purposes
79+
9580
for (int i = 0; i < config.concurrency; i++) {
96-
arr.add(executeInsert(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
81+
arr.add(execute(session, i * (config.tasks / config.concurrency), (i + 1) * (config.tasks / config.concurrency)));
9782
}
9883
for (Future<?> f : arr) {
9984
f.get(); // make sure nothing has thrown and everything finished
10085
}
10186

87+
config.workload = originalWorkload;
10288
}
10389

104-
public static CompletableFuture<ResultSet> executeSelect(Session s, long currentIter, long maxIter) {
90+
public static CompletableFuture<ResultSet> execute(Session s, long currentIter, long maxIter) {
10591
if (currentIter >= maxIter) {
10692
// No more iterations
10793
return CompletableFuture.completedFuture(null);
10894
}
10995

110-
ResultSetFuture fut = s.executeAsync(SELECT_PS.bind(currentIter));
111-
112-
// Convert ResultSetFuture to CompletableFuture
113-
CompletableFuture<ResultSet> futCompletable = new CompletableFuture<>();
114-
Futures.addCallback(fut, new FutureCallback<ResultSet>() {
115-
@Override
116-
public void onSuccess(ResultSet result) {
117-
futCompletable.complete(result);
118-
}
119-
120-
@Override
121-
public void onFailure(Throwable t) {
122-
futCompletable.completeExceptionally(t);
123-
}
124-
});
125-
126-
// Execute next iteration after that
127-
return futCompletable.thenCompose(rs -> executeSelect(s, currentIter + 1, maxIter));
128-
}
129-
130-
public static CompletableFuture<ResultSet> executeInsert(Session s, long currentIter, long maxIter) {
131-
if (currentIter >= maxIter) {
132-
// No more iterations
133-
return CompletableFuture.completedFuture(null);
96+
ListenableFuture<ResultSet> fut = null;
97+
if (config.workload.equals(Config.Workload.Inserts) || config.workload.equals(Config.Workload.Mixed)) {
98+
fut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter));
13499
}
135100

136-
ResultSetFuture fut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter));
137-
138-
// Convert ResultSetFuture to CompletableFuture
139-
CompletableFuture<ResultSet> futCompletable = new CompletableFuture<>();
140-
Futures.addCallback(fut, new FutureCallback<ResultSet>() {
141-
@Override
142-
public void onSuccess(ResultSet result) {
143-
futCompletable.complete(result);
144-
}
101+
if (config.workload.equals(Config.Workload.Selects)) {
102+
fut = s.executeAsync(SELECT_PS.bind(currentIter));
145103

146-
@Override
147-
public void onFailure(Throwable t) {
148-
futCompletable.completeExceptionally(t);
149-
}
150-
});
151-
152-
// Execute next iteration after that
153-
return futCompletable.thenCompose(rs -> executeInsert(s, currentIter + 1, maxIter));
154-
}
155-
156-
public static CompletableFuture<ResultSet> executeMixed(Session s, long currentIter, long maxIter) {
157-
if (currentIter >= maxIter) {
158-
// No more iterations
159-
return CompletableFuture.completedFuture(null);
104+
} else if (config.workload.equals(Config.Workload.Mixed)) {
105+
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
106+
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
107+
return (s.executeAsync(SELECT_PS.bind(currentIter)));
108+
}
109+
});
160110
}
161111

162-
ListenableFuture<ResultSet> listenableFut = s.executeAsync(INSERT_PS.bind(currentIter, 2L * currentIter, 3L * currentIter));
163-
listenableFut = Futures.transform(listenableFut, new AsyncFunction<ResultSet, ResultSet>() {
164-
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
165-
return (s.executeAsync(SELECT_PS.bind(currentIter)));
166-
}
167-
});
112+
if (config.workload.equals(Config.Workload.Selects) || config.workload.equals(Config.Workload.Mixed)) {
113+
fut = Futures.transform(fut, new AsyncFunction<ResultSet, ResultSet>() {
114+
public ListenableFuture<ResultSet> apply(ResultSet rs) throws Exception {
115+
Row r = rs.one();
116+
if ((r.getLong("v1") != 2L * currentIter) || (r.getLong("v2") != 3L * currentIter)) {
117+
throw new RuntimeException(String.format("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s).", currentIter, 2L * currentIter, 3L * currentIter, r.getLong("pk"), r.getLong("v1"), r.getLong("v2")));
118+
}
119+
return Futures.immediateFuture(rs);
120+
}
121+
});
122+
}
168123

169124
// Convert ResultSetFuture to CompletableFuture
170125
CompletableFuture<ResultSet> futCompletable = new CompletableFuture<>();
171-
Futures.addCallback(listenableFut, new FutureCallback<ResultSet>() {
126+
Futures.addCallback(fut, new FutureCallback<ResultSet>() {
172127
@Override
173128
public void onSuccess(ResultSet result) {
174129
futCompletable.complete(result);
@@ -181,7 +136,7 @@ public void onFailure(Throwable t) {
181136
});
182137

183138
// Execute next iteration after that
184-
return futCompletable.thenCompose(rs -> executeMixed(s, currentIter + 1, maxIter));
139+
return futCompletable.thenCompose(rs -> execute(s, currentIter + 1, maxIter));
185140
}
186141
}
187142

0 commit comments

Comments
 (0)