11import com .datastax .driver .core .*;
2- import com .google .common .base .Function ;
32import com .google .common .util .concurrent .AsyncFunction ;
43import com .google .common .util .concurrent .FutureCallback ;
54import com .google .common .util .concurrent .Futures ;
65import com .google .common .util .concurrent .ListenableFuture ;
76
87import java .util .ArrayList ;
98import java .util .concurrent .*;
10- import java .util .concurrent .atomic .AtomicLong ;
119
1210public class MainClass {
1311
@@ -18,6 +16,9 @@ public class MainClass {
1816 private static final String INSERT_STRING = "INSERT INTO benchks.benchtab (pk, v1, v2) VALUES(?, ?, ?)" ;
1917 private static final String SELECT_STRING = "SELECT v1, v2 FROM benchks.benchtab WHERE pk = ?" ;
2018
19+ private static PreparedStatement INSERT_PS ;
20+ private static PreparedStatement SELECT_PS ;
21+
2122 private static long benchmarkStart ;
2223
2324 private static long benchmarkEnd ;
@@ -42,87 +43,34 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
4243 }
4344 }
4445
45- AtomicLong requestsSent = new AtomicLong (0 );
4646
47- executor = Executors . newFixedThreadPool ( config . threads );
47+ ArrayList < CompletableFuture < ResultSet >> arr = new ArrayList <>( );
4848
4949 System .out .println ("Starting the benchmark" );
5050
5151 benchmarkStart = System .nanoTime ();
52- ArrayList <Future <?>> arr = new ArrayList <>();
53- for (int i = 0 ; i < config .threads ; i ++) {
54- arr .add (
55- executor .submit (() -> {
56- int permits = (int ) (config .concurrency / config .threads );
57- Semaphore semaphore = new Semaphore (permits );
58- PreparedStatement insertQ = session .prepare (INSERT_STRING );
59- PreparedStatement selectQ = session .prepare (SELECT_STRING );
60- long pk ;
61- while ((pk = requestsSent .incrementAndGet ()) < config .tasks ) {
62- try {
63- semaphore .acquire ();
64- } catch (InterruptedException e ) {
65- throw new RuntimeException (e );
66- }
67- if (config .workload .equals (Config .Workload .Inserts ) || config .workload .equals (Config .Workload .Mixed )) {
68- ListenableFuture <ResultSet > resultSetA = session .executeAsync (insertQ .bind (pk , 2L * pk , 3L * pk ));
69- ListenableFuture <ResultSet > resultSetB ;
70-
71- long finalPk = pk ;
72- if (config .workload .equals (Config .Workload .Selects ) || config .workload .equals (Config .Workload .Mixed )) {
73- // Chain with select depending on workload
74- resultSetB = Futures .transform (resultSetA ,
75- new AsyncFunction <ResultSet , ResultSet >() {
76- public ListenableFuture <ResultSet > apply (ResultSet rs ) throws Exception {
77- return session .executeAsync (selectQ .bind (finalPk ));
78- }
79- });
80- } else {
81- resultSetB = resultSetA ;
82- }
83- ListenableFuture <Boolean > result ;
84- // Verify returned data depending on workload and return the permit
85- if (config .workload .equals (Config .Workload .Selects ) || config .workload .equals (Config .Workload .Mixed )) {
86- result = Futures .transform (resultSetB ,
87- new Function <ResultSet , Boolean >() {
88- public Boolean apply (ResultSet rs ) {
89- semaphore .release ();
90- Row r = rs .one ();
91- if ((r .getLong ("v1" ) != 2 * finalPk ) || (r .getLong ("v2" ) != 3 * finalPk )) {
92- throw new RuntimeException (String .format ("Received incorrect data. " + "Expected: (%s, %s, %s). " + "Received: (%s, %s ,%s)." , finalPk , 2 * finalPk , 3 * finalPk , r .getInt ("pk" ), r .getInt ("v1" ), r .getInt ("v2" )));
93- }
94- return true ;
95- }
96- });
97-
98- }
99- else {
100- result = Futures .transform (resultSetB ,
101- new Function <ResultSet , Boolean >() {
102- public Boolean apply (ResultSet rs ) {
103- semaphore .release ();
104- return true ;
105- }
106- });
107- }
108- }
109- }
110- try {
111- //possible only if all futures ended successfully
112- semaphore .acquire (permits );
113- } catch (InterruptedException e ) {
114- throw new RuntimeException (e );
115- }
116- }));
117- }
11852
53+ INSERT_PS = session .prepare (INSERT_STRING );
54+ SELECT_PS = session .prepare (SELECT_STRING );
55+
56+ if (config .workload == Config .Workload .Inserts ) {
57+ for (int i = 0 ; i < config .concurrency ; i ++) {
58+ arr .add (executeInsert (session , i * (config .tasks / config .concurrency ), (i + 1 ) * (config .tasks / config .concurrency )));
59+ }
60+ } else if (config .workload == Config .Workload .Selects ) {
61+ for (int i = 0 ; i < config .concurrency ; i ++) {
62+ arr .add (executeSelect (session , i * (config .tasks / config .concurrency ), (i + 1 ) * (config .tasks / config .concurrency )));
63+ }
64+ } else if (config .workload == Config .Workload .Mixed ) {
65+ for (int i = 0 ; i < config .concurrency ; i ++) {
66+ arr .add (executeMixed (session , i * (config .tasks / config .concurrency ), (i + 1 ) * (config .tasks / config .concurrency )));
67+ }
68+ }
11969
120- executor .shutdown ();
121- executor .awaitTermination (Long .MAX_VALUE , TimeUnit .NANOSECONDS );
122- benchmarkEnd = System .nanoTime ();
12370 for (Future <?> f : arr ) {
124- f .get (); // make sure nothing has thrown
71+ f .get (); // make sure nothing has thrown and everything finished
12572 }
73+ benchmarkEnd = System .nanoTime ();
12674 System .out .println (String .format ("Finished\n Benchmark time: %d ms\n " , (benchmarkEnd - benchmarkStart ) / 1_000_000 ));
12775
12876 session .close ();
@@ -141,33 +89,99 @@ static void prepareKeyspaceAndTable(Session session) {
14189 private static void prepareSelectsBenchmark (Session session ) throws InterruptedException , ExecutionException {
14290 System .out .println ("Preparing a selects benchmark (inserting values)..." );
14391
144- AtomicLong nextBatchStart = new AtomicLong ( 0 );
145- executor = Executors . newFixedThreadPool (( int ) config . threads );
92+ ArrayList < CompletableFuture < ResultSet >> arr = new ArrayList <>( );
93+ INSERT_PS = session . prepare ( INSERT_STRING );
14694
147- ArrayList <Future <?>> arr = new ArrayList <>();
148- try {
149- for (int i = 0 ; i < config .concurrency ; i ++) {
150- arr .add (executor .submit (() -> {
151- PreparedStatement insertQ = session .prepare (INSERT_STRING );
152- while (true ) {
153- long curBatchStart = nextBatchStart .addAndGet (config .tasks / config .concurrency );
154- if (curBatchStart >= config .tasks ) {
155- break ;
156- }
157- long curBatchEnd = Math .min (curBatchStart + (config .tasks / config .concurrency ), config .tasks );
158- for (long pk = curBatchStart ; pk < curBatchEnd ; pk ++) {
159- session .execute (insertQ .bind (pk , 2L * pk , 3L * pk ));
160- }
161- }
162- }));
95+ for (int i = 0 ; i < config .concurrency ; i ++) {
96+ arr .add (executeInsert (session , i * (config .tasks / config .concurrency ), (i + 1 ) * (config .tasks / config .concurrency )));
97+ }
98+ for (Future <?> f : arr ) {
99+ f .get (); // make sure nothing has thrown and everything finished
100+ }
101+
102+ }
103+
104+ public static CompletableFuture <ResultSet > executeSelect (Session s , long currentIter , long maxIter ) {
105+ if (currentIter >= maxIter ) {
106+ // No more iterations
107+ return CompletableFuture .completedFuture (null );
108+ }
109+
110+ ResultSetFuture fut = s .executeAsync (SELECT_PS .bind (currentIter ));
111+
112+ // Convert ResultSetFuture to CompletableFuture
113+ CompletableFuture <ResultSet > futCompletable = new CompletableFuture <>();
114+ Futures .addCallback (fut , new FutureCallback <ResultSet >() {
115+ @ Override
116+ public void onSuccess (ResultSet result ) {
117+ futCompletable .complete (result );
118+ }
119+
120+ @ Override
121+ public void onFailure (Throwable t ) {
122+ futCompletable .completeExceptionally (t );
123+ }
124+ });
125+
126+ // Execute next iteration after that
127+ return futCompletable .thenCompose (rs -> executeSelect (s , currentIter + 1 , maxIter ));
128+ }
129+
130+ public static CompletableFuture <ResultSet > executeInsert (Session s , long currentIter , long maxIter ) {
131+ if (currentIter >= maxIter ) {
132+ // No more iterations
133+ return CompletableFuture .completedFuture (null );
134+ }
135+
136+ ResultSetFuture fut = s .executeAsync (INSERT_PS .bind (currentIter , 2L * currentIter , 3L * currentIter ));
137+
138+ // Convert ResultSetFuture to CompletableFuture
139+ CompletableFuture <ResultSet > futCompletable = new CompletableFuture <>();
140+ Futures .addCallback (fut , new FutureCallback <ResultSet >() {
141+ @ Override
142+ public void onSuccess (ResultSet result ) {
143+ futCompletable .complete (result );
163144 }
164- } finally {
165- executor .shutdown ();
166- executor .awaitTermination (Long .MAX_VALUE , TimeUnit .NANOSECONDS );
167- for (Future <?> f : arr ) {
168- f .get (); // make sure nothing has thrown
145+
146+ @ Override
147+ public void onFailure (Throwable t ) {
148+ futCompletable .completeExceptionally (t );
169149 }
150+ });
151+
152+ // Execute next iteration after that
153+ return futCompletable .thenCompose (rs -> executeInsert (s , currentIter + 1 , maxIter ));
154+ }
155+
156+ public static CompletableFuture <ResultSet > executeMixed (Session s , long currentIter , long maxIter ) {
157+ if (currentIter >= maxIter ) {
158+ // No more iterations
159+ return CompletableFuture .completedFuture (null );
170160 }
161+
162+ ListenableFuture <ResultSet > listenableFut = s .executeAsync (INSERT_PS .bind (currentIter , 2L * currentIter , 3L * currentIter ));
163+ listenableFut = Futures .transform (listenableFut , new AsyncFunction <ResultSet , ResultSet >() {
164+ public ListenableFuture <ResultSet > apply (ResultSet rs ) throws Exception {
165+ return (s .executeAsync (SELECT_PS .bind (currentIter )));
166+ }
167+ });
168+
169+ // Convert ResultSetFuture to CompletableFuture
170+ CompletableFuture <ResultSet > futCompletable = new CompletableFuture <>();
171+ Futures .addCallback (listenableFut , new FutureCallback <ResultSet >() {
172+ @ Override
173+ public void onSuccess (ResultSet result ) {
174+ futCompletable .complete (result );
175+ }
176+
177+ @ Override
178+ public void onFailure (Throwable t ) {
179+ futCompletable .completeExceptionally (t );
180+ }
181+ });
182+
183+ // Execute next iteration after that
184+ return futCompletable .thenCompose (rs -> executeMixed (s , currentIter + 1 , maxIter ));
171185 }
172186}
173187
0 commit comments