Skip to content

Commit 45ca564

Browse files
ParallelBatchFluxBuilder, adding builder visitor.
1 parent 3a7b17e commit 45ca564

2 files changed

Lines changed: 106 additions & 13 deletions

File tree

src/main/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtils.java

Lines changed: 77 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -128,40 +128,108 @@ public ParallelBatchFluxBuilder<T, B> withBatchSupplier ( Supplier<? extends Col
128128
return this;
129129
}
130130

131-
public ParallelFlux<B> build ()
131+
132+
public int getParallelism ()
133+
{
134+
return parallelism;
135+
}
136+
137+
public void setParallelism ( int parallelism )
138+
{
139+
this.parallelism = parallelism;
140+
}
141+
142+
public int getParallelismPreFetch ()
143+
{
144+
return parallelismPreFetch;
145+
}
146+
147+
public Scheduler getScheduler ()
148+
{
149+
return scheduler;
150+
}
151+
152+
public int getBatchSize ()
153+
{
154+
return batchSize;
155+
}
156+
157+
public Supplier<B> getBatchSupplier ()
158+
{
159+
return batchSupplier;
160+
}
161+
162+
/**
163+
* @param visitor if non-null, I'll call it with myself before creating the result. This can be
164+
* used to inspect a builder during the build process, to set info from defaults (eg, from
165+
* {@link #getParallelism()}).
166+
*
167+
*/
168+
public ParallelFlux<B> build ( Consumer<ParallelBatchFluxBuilder<? super T, ? super B>> visitor )
132169
{
133170
@SuppressWarnings ( "unchecked" )
134171
Flux<B> result = this.batchSupplier == null
135172
? (Flux<B>) flux.buffer ( batchSize ) : flux.buffer ( batchSize, batchSupplier );
173+
174+
if ( visitor != null ) visitor.accept ( this );
136175

137176
return result
138177
.parallel ( parallelism, parallelismPreFetch )
139178
.runOn ( scheduler );
140-
}
179+
}
180+
181+
public ParallelFlux<B> build ()
182+
{
183+
return build ( null );
184+
}
185+
141186
} // class ParallelBatchFluxBuilder
142187

143188

144189
/**
145190
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
146191
*/
147-
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Flux<? extends T> flux ) {
148-
return new ParallelBatchFluxBuilder<T, List<T>> ( flux ).build ();
192+
public static <T> ParallelFlux<List<T>> parallelBatchFlux (
193+
Flux<? extends T> flux, Consumer<ParallelBatchFluxBuilder<? super T, ? super List<T>>> visitor
194+
)
195+
{
196+
return new ParallelBatchFluxBuilder<T, List<T>> ( flux )
197+
.build ( visitor );
149198
}
150199

200+
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Flux<? extends T> flux ) {
201+
return parallelBatchFlux ( flux, null );
202+
}
203+
204+
151205
/**
152206
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
153207
*/
208+
public static <T> ParallelFlux<List<T>> parallelBatchFlux (
209+
Stream<? extends T> stream, Consumer<ParallelBatchFluxBuilder<? super T, ? super List<T>>> visitor
210+
)
211+
{
212+
return new ParallelBatchFluxBuilder<T, List<T>> ( stream ).build ( visitor );
213+
}
214+
154215
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Stream<? extends T> stream ) {
155-
return new ParallelBatchFluxBuilder<T, List<T>> ( stream ).build ();
216+
return parallelBatchFlux ( stream, null );
156217
}
157218

219+
158220
/**
159221
* Just uses {@link ParallelBatchFluxBuilder} with its defaults.
160222
*/
161-
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Collection<? extends T> collection ) {
162-
return new ParallelBatchFluxBuilder<T, List<T>> ( collection ).build ();
223+
public static <T> ParallelFlux<List<T>> parallelBatchFlux (
224+
Collection<? extends T> collection, Consumer<ParallelBatchFluxBuilder<? super T, ? super List<T>>> visitor
225+
)
226+
{
227+
return new ParallelBatchFluxBuilder<T, List<T>> ( collection ).build ( visitor );
163228
}
164229

230+
public static <T> ParallelFlux<List<T>> parallelBatchFlux ( Collection<? extends T> collection ) {
231+
return parallelBatchFlux ( collection );
232+
}
165233

166234
/**
167235
* Uses {@link ParallelBatchFluxBuilder} to process a source of batches.
@@ -191,19 +259,15 @@ public static <T> void batchProcessing (
191259
/**
192260
* Variant of {@link #batchProcessing(Flux, Consumer)}
193261
*/
194-
public static <T> void batchProcessing (
195-
Stream<T> stream, Consumer<List<T>> task
196-
)
262+
public static <T> void batchProcessing ( Stream<T> stream, Consumer<List<T>> task )
197263
{
198264
batchProcessing ( parallelBatchFlux ( stream ), task );
199265
}
200266

201267
/**
202268
* Variant of {@link #batchProcessing(Flux, Consumer)}
203269
*/
204-
public static <T> void batchProcessing (
205-
Collection<T> collection, Consumer<List<T>> task
206-
)
270+
public static <T> void batchProcessing ( Collection<T> collection, Consumer<List<T>> task )
207271
{
208272
batchProcessing ( parallelBatchFlux ( collection ), task );
209273
}

src/test/java/uk/ac/ebi/utils/opt/runcontrol/ReactorUtilsTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package uk.ac.ebi.utils.opt.runcontrol;
22

33
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
45

56
import java.util.Comparator;
67
import java.util.HashSet;
@@ -10,6 +11,8 @@
1011
import java.util.stream.IntStream;
1112
import java.util.stream.Stream;
1213

14+
import org.apache.commons.lang3.mutable.Mutable;
15+
import org.apache.commons.lang3.mutable.MutableInt;
1316
import org.junit.Test;
1417

1518
import reactor.core.publisher.ParallelFlux;
@@ -66,4 +69,30 @@ public void testBatchProcessing ()
6669
// Usual Gauss formula for Sum (1..n)
6770
assertEquals ( "Result isn't as expected!", max * (max - 1) / 2, sum.get () );
6871
}
72+
73+
74+
@Test
75+
public void testBatchProcessingWithVisitor ()
76+
{
77+
int max = 1000;
78+
79+
Stream<Integer> strm = IntStream.range ( 0, max )
80+
.mapToObj ( Integer::valueOf );
81+
82+
83+
AtomicInteger sum = new AtomicInteger ();
84+
85+
MutableInt parallelism = new MutableInt ( 0 );
86+
87+
ReactorUtils.parallelBatchFlux (
88+
strm,
89+
builder -> parallelism.setValue ( builder.getParallelism () )
90+
)
91+
.doOnNext ( b -> sum.addAndGet ( b.stream ().mapToInt ( Integer::intValue ).sum () ) )
92+
.sequential ()
93+
.blockLast ();
94+
95+
assertEquals ( "Result isn't as expected!", max * (max - 1) / 2, sum.get () );
96+
assertTrue ( "parallelism wasn't retrieved from the builder!", parallelism.getValue () > 0 );
97+
}
6998
}

0 commit comments

Comments
 (0)