Skip to content

Commit bbf4540

Browse files
committed
refactor: replace Tuple2 with Extraction and increase coverage
1 parent fcb52bb commit bbf4540

7 files changed

Lines changed: 107 additions & 42 deletions

File tree

implementation/src/main/java/io/smallrye/mutiny/groups/Gatherer.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
import io.smallrye.common.annotation.CheckReturnValue;
1111
import io.smallrye.mutiny.Multi;
12-
import io.smallrye.mutiny.tuples.Tuple2;
1312

1413
/**
1514
* A Gatherer operator transforms a stream of items by accumulating them into an accumulator and extracting
@@ -42,10 +41,11 @@ public interface Gatherer<I, ACC, O> {
4241
*
4342
* @param accumulator the current accumulator
4443
* @param upstreamCompleted whether the upstream has completed
45-
* @return an Optional containing a Tuple2 with the updated accumulator and the extracted item, or an empty Optional if no
44+
* @return an Optional containing a Extraction with the updated accumulator and the extracted item, or an empty Optional if
45+
* no
4646
* item can be extracted
4747
*/
48-
Optional<Tuple2<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted);
48+
Optional<Extraction<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted);
4949

5050
/**
5151
* Finalizes the accumulator and extracts the final item, if any.
@@ -56,6 +56,30 @@ public interface Gatherer<I, ACC, O> {
5656
*/
5757
Optional<O> finalize(ACC accumulator);
5858

59+
/**
60+
* An extraction result containing the next accumulator and the next item to emit.
61+
*
62+
* @param nextAccumulator the next accumulator
63+
* @param nextItem the next item to emit
64+
* @param <ACC> the type of the accumulator
65+
* @param <O> the type of the item to emit
66+
*/
67+
record Extraction<ACC, O>(ACC nextAccumulator, O nextItem) {
68+
69+
/**
70+
* Creates a new {@link Extraction} instance.
71+
*
72+
* @param nextAccumulator the next accumulator
73+
* @param nextItem the next item to emit
74+
* @return a new {@link Extraction} instance
75+
* @param <ACC> the type of the accumulator
76+
* @param <O> the type of the item to emit
77+
*/
78+
public static <ACC, O> Extraction<ACC, O> of(ACC nextAccumulator, O nextItem) {
79+
return new Extraction<>(nextAccumulator, nextItem);
80+
}
81+
}
82+
5983
/**
6084
* Builder for creating a {@link Gatherer}.
6185
*
@@ -130,15 +154,15 @@ private ExtractStep(Supplier<ACC> initialAccumulatorSupplier, BiFunction<ACC, I,
130154
* When the extractor function returns an empty {@link Optional}, no value is emitted.
131155
* When the extractor function returns a non-empty {@link Optional}, the value is emitted, and the accumulator is
132156
* updated.
133-
* This is done by returning a {@link Tuple2} containing the new accumulator and the value to emit.
157+
* This is done by returning a {@link Extraction} containing the new accumulator and the value to emit.
134158
*
135159
* @param extractor the extractor function, which takes the current accumulator and returns an {@link Optional}
136-
* containing a {@link Tuple2} with the new accumulator and the value to emit
160+
* containing a {@link Extraction} with the new accumulator and the value to emit
137161
* @param <O> the type of the value to emit
138162
* @return the next step in the builder
139163
*/
140164
@CheckReturnValue
141-
public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor) {
165+
public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
142166
nonNull(extractor, "extractor");
143167
return new FinalizerStep<>(initialAccumulatorSupplier, accumulator, extractor);
144168
}
@@ -154,11 +178,11 @@ public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Tu
154178
class FinalizerStep<I, ACC, O> {
155179
private final Supplier<ACC> initialAccumulatorSupplier;
156180
private final BiFunction<ACC, I, ACC> accumulator;
157-
private final BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor;
181+
private final BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor;
158182

159183
private FinalizerStep(Supplier<ACC> initialAccumulatorSupplier,
160184
BiFunction<ACC, I, ACC> accumulator,
161-
BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor) {
185+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
162186
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
163187
this.accumulator = accumulator;
164188
this.extractor = extractor;

implementation/src/main/java/io/smallrye/mutiny/groups/Gatherers.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@
88
import java.util.function.Supplier;
99
import java.util.stream.Collectors;
1010

11-
import io.smallrye.mutiny.tuples.Tuple2;
11+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1212

1313
/**
1414
* Factory interface for creating {@link Gatherer} instances.
15+
* <p>
1516
* This interface provides various static methods to create different types of gatherers.
1617
*/
1718
public interface Gatherers {
@@ -30,7 +31,7 @@ public interface Gatherers {
3031
*/
3132
static <I, ACC, O> Gatherer<I, ACC, O> of(Supplier<ACC> initialAccumulatorSupplier,
3233
BiFunction<ACC, I, ACC> accumulatorFunction,
33-
BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor,
34+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor,
3435
Function<ACC, Optional<O>> finalizer) {
3536
return new DefaultGatherer<>(initialAccumulatorSupplier, accumulatorFunction, extractor, finalizer);
3637
}
@@ -49,7 +50,7 @@ static <I, ACC, O> Gatherer<I, ACC, O> of(Supplier<ACC> initialAccumulatorSuppli
4950
*/
5051
static <I> Gatherer<I, I, I> scan(Supplier<I> initialAccumulatorSupplier, BiFunction<I, I, I> accumulatorFunction) {
5152
return of(initialAccumulatorSupplier, accumulatorFunction,
52-
(acc, done) -> done ? Optional.empty() : Optional.of(Tuple2.of(acc, acc)), Optional::of);
53+
(acc, done) -> done ? Optional.empty() : Optional.of(Extraction.of(acc, acc)), Optional::of);
5354
}
5455

5556
/**
@@ -84,7 +85,7 @@ static <I> Gatherer<I, List<I>, List<I>> window(int size) {
8485
return acc;
8586
}, (acc, completed) -> {
8687
if (acc.size() == size) {
87-
return Optional.of(Tuple2.of(new ArrayList<>(), new ArrayList<>(acc)));
88+
return Optional.of(Extraction.of(new ArrayList<>(), new ArrayList<>(acc)));
8889
}
8990
return Optional.empty();
9091
}, acc -> acc.isEmpty()
@@ -109,7 +110,7 @@ static <I> Gatherer<I, List<I>, List<I>> slidingWindow(int size) {
109110
return acc;
110111
}, (acc, completed) -> {
111112
if (acc.size() == size) {
112-
return Optional.of(Tuple2.of(acc.stream().skip(1).collect(Collectors.toList()), new ArrayList<>(acc)));
113+
return Optional.of(Extraction.of(acc.stream().skip(1).collect(Collectors.toList()), new ArrayList<>(acc)));
113114
}
114115
return Optional.empty();
115116
}, acc -> acc.isEmpty()
@@ -128,12 +129,12 @@ class DefaultGatherer<I, ACC, O> implements Gatherer<I, ACC, O> {
128129

129130
private final Supplier<ACC> initialAccumulatorSupplier;
130131
private final BiFunction<ACC, I, ACC> accumulatorFunction;
131-
private final BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor;
132+
private final BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor;
132133
private final Function<ACC, Optional<O>> finalizer;
133134

134135
public DefaultGatherer(Supplier<ACC> initialAccumulatorSupplier,
135136
BiFunction<ACC, I, ACC> accumulatorFunction,
136-
BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor,
137+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor,
137138
Function<ACC, Optional<O>> finalizer) {
138139
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
139140
this.accumulatorFunction = accumulatorFunction;
@@ -152,7 +153,7 @@ public ACC accumulate(ACC accumulator, I item) {
152153
}
153154

154155
@Override
155-
public Optional<Tuple2<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted) {
156+
public Optional<Extraction<ACC, O>> extract(ACC accumulator, boolean upstreamCompleted) {
156157
return extractor.apply(accumulator, upstreamCompleted);
157158
}
158159

@@ -171,5 +172,4 @@ public Optional<O> finalize(ACC accumulator) {
171172
static <I> Gatherer.Builder<I> builder() {
172173
return new Gatherer.Builder<>();
173174
}
174-
175175
}

implementation/src/main/java/io/smallrye/mutiny/groups/MultiOnItemGather.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
import io.smallrye.common.annotation.CheckReturnValue;
1111
import io.smallrye.common.annotation.Experimental;
1212
import io.smallrye.mutiny.Multi;
13+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1314
import io.smallrye.mutiny.operators.multi.MultiGather;
14-
import io.smallrye.mutiny.tuples.Tuple2;
1515

1616
/**
1717
* A builder to gather items emitted by a {@link Multi} into an accumulator.
@@ -97,15 +97,15 @@ private ExtractStep(Multi<I> upstream, Supplier<ACC> initialAccumulatorSupplier,
9797
* When the extractor function returns an empty {@link Optional}, no value is emitted.
9898
* When the extractor function returns a non-empty {@link Optional}, the value is emitted, and the accumulator is
9999
* updated.
100-
* This is done by returning a {@link Tuple2} containing the new accumulator and the value to emit.
100+
* This is done by returning a {@link Extraction} containing the new accumulator and the value to emit.
101101
*
102102
* @param extractor the extractor function, which takes the current accumulator and returns an {@link Optional}
103-
* containing a {@link Tuple2} with the new accumulator and the value to emit
103+
* containing a {@link Extraction} with the new accumulator and the value to emit
104104
* @param <O> the type of the value to emit
105105
* @return the next step in the builder
106106
*/
107107
@CheckReturnValue
108-
public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor) {
108+
public <O> FinalizerStep<I, ACC, O> extract(BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
109109
nonNull(extractor, "extractor");
110110
return new FinalizerStep<>(upstream, initialAccumulatorSupplier, accumulator, extractor);
111111
}
@@ -122,12 +122,12 @@ public static class FinalizerStep<I, ACC, O> {
122122
private final Multi<I> upstream;
123123
private final Supplier<ACC> initialAccumulatorSupplier;
124124
private final BiFunction<ACC, I, ACC> accumulator;
125-
private final BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor;
125+
private final BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor;
126126

127127
private FinalizerStep(Multi<I> upstream,
128128
Supplier<ACC> initialAccumulatorSupplier,
129129
BiFunction<ACC, I, ACC> accumulator,
130-
BiFunction<ACC, Boolean, Optional<Tuple2<ACC, O>>> extractor) {
130+
BiFunction<ACC, Boolean, Optional<Extraction<ACC, O>>> extractor) {
131131
this.upstream = upstream;
132132
this.initialAccumulatorSupplier = initialAccumulatorSupplier;
133133
this.accumulator = accumulator;

implementation/src/main/java/io/smallrye/mutiny/operators/multi/MultiGather.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77

88
import io.smallrye.mutiny.Multi;
99
import io.smallrye.mutiny.groups.Gatherer;
10+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1011
import io.smallrye.mutiny.helpers.Subscriptions;
1112
import io.smallrye.mutiny.subscription.MultiSubscriber;
12-
import io.smallrye.mutiny.tuples.Tuple2;
1313

1414
public class MultiGather<I, ACC, O> extends AbstractMultiOperator<I, O> {
1515

@@ -77,14 +77,14 @@ public void onItem(I item) {
7777
if (acc == null) {
7878
throw new NullPointerException("The accumulator returned a null value");
7979
}
80-
Optional<Tuple2<ACC, O>> mapping = gatherer.extract(acc, false);
80+
Optional<Extraction<ACC, O>> mapping = gatherer.extract(acc, false);
8181
if (mapping == null) {
8282
throw new NullPointerException("The extractor returned a null value");
8383
}
8484
if (mapping.isPresent()) {
85-
Tuple2<ACC, O> tuple = mapping.get();
86-
acc = tuple.getItem1();
87-
O value = tuple.getItem2();
85+
Extraction<ACC, O> result = mapping.get();
86+
acc = result.nextAccumulator();
87+
O value = result.nextItem();
8888
if (acc == null) {
8989
throw new NullPointerException("The extractor returned a null accumulator value");
9090
}
@@ -125,14 +125,14 @@ private void drainRemainingElements() {
125125
return;
126126
}
127127
try {
128-
Optional<Tuple2<ACC, O>> mapping = gatherer.extract(acc, true);
128+
Optional<Extraction<ACC, O>> mapping = gatherer.extract(acc, true);
129129
if (mapping == null) {
130130
throw new NullPointerException("The extractor returned a null value");
131131
}
132132
if (mapping.isPresent()) {
133-
Tuple2<ACC, O> tuple = mapping.get();
134-
acc = tuple.getItem1();
135-
O value = tuple.getItem2();
133+
Extraction<ACC, O> result = mapping.get();
134+
acc = result.nextAccumulator();
135+
O value = result.nextItem();
136136
if (acc == null) {
137137
throw new NullPointerException("The extractor returned a null accumulator value");
138138
}

implementation/src/test/java/io/smallrye/mutiny/operators/multi/MultiGatherTest.java

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@
1111
import org.junit.jupiter.api.Test;
1212

1313
import io.smallrye.mutiny.Multi;
14+
import io.smallrye.mutiny.groups.Gatherer;
15+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1416
import io.smallrye.mutiny.groups.Gatherers;
1517
import io.smallrye.mutiny.helpers.test.AssertSubscriber;
16-
import io.smallrye.mutiny.tuples.Tuple2;
1718

1819
class MultiGatherTest {
1920

@@ -99,7 +100,7 @@ void gatherLinesOfText() {
99100
String str = sb.toString();
100101
if (str.contains("\n")) {
101102
String[] lines = str.split("\n", 2);
102-
return Optional.of(Tuple2.of(new StringBuilder(lines[1]), lines[0]));
103+
return Optional.of(Extraction.of(new StringBuilder(lines[1]), lines[0]));
103104
}
104105
return Optional.empty();
105106
})
@@ -133,7 +134,7 @@ void checkCompletionCorrectness() {
133134
String str = sb.toString();
134135
if (str.contains(",")) {
135136
String[] lines = str.split(",", 2);
136-
return Optional.of(Tuple2.of(new StringBuilder(lines[1]), lines[0]));
137+
return Optional.of(Extraction.of(new StringBuilder(lines[1]), lines[0]));
137138
}
138139
return Optional.empty();
139140
})
@@ -164,7 +165,8 @@ void rejectNullParamsInApi() {
164165
.withMessageContaining("accumulator");
165166
assertThatIllegalArgumentException()
166167
.isThrownBy(() -> multi.onItem().gather().into(ArrayList<Integer>::new).accumulate((a, b) -> a)
167-
.extract((BiFunction<ArrayList<Integer>, Boolean, Optional<Tuple2<ArrayList<Integer>, Object>>>) null))
168+
.extract(
169+
(BiFunction<ArrayList<Integer>, Boolean, Optional<Extraction<ArrayList<Integer>, Object>>>) null))
168170
.withMessageContaining("extractor");
169171
assertThatIllegalArgumentException()
170172
.isThrownBy(() -> multi.onItem().gather().into(ArrayList<Integer>::new).accumulate((a, b) -> a)
@@ -220,7 +222,7 @@ void rejectNullInExtractorOptionalTupleLeft() {
220222
acc.add(next);
221223
return acc;
222224
})
223-
.extract((acc, completed) -> Optional.of(Tuple2.of(null, "ok")))
225+
.extract((acc, completed) -> Optional.of(Extraction.of(null, "ok")))
224226
.finalize(acc -> Optional.empty())
225227
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
226228
sub.assertFailedWith(NullPointerException.class, "The extractor returned a null accumulator value");
@@ -235,7 +237,7 @@ void rejectNullInExtractorOptionalTupleRight() {
235237
acc.add(next);
236238
return acc;
237239
})
238-
.extract((acc, completed) -> Optional.of(Tuple2.of(new ArrayList<>(), null)))
240+
.extract((acc, completed) -> Optional.of(Extraction.of(new ArrayList<>(), null)))
239241
.finalize(acc -> Optional.empty())
240242
.subscribe().withSubscriber(AssertSubscriber.create(Long.MAX_VALUE));
241243
sub.assertFailedWith(NullPointerException.class, "The extractor returned a null value to emit");
@@ -350,4 +352,43 @@ void rejectBadRequests() {
350352
sub.request(-10L).assertFailedWith(IllegalArgumentException.class,
351353
"The number of items requested must be strictly positive");
352354
}
355+
356+
@Test
357+
void builderApi() {
358+
Gatherer<String, StringBuilder, String> gatherer = Gatherers.<String> builder()
359+
.into(StringBuilder::new)
360+
.accumulate(StringBuilder::append)
361+
.extract((sb, completed) -> {
362+
String str = sb.toString();
363+
if (str.contains("\n")) {
364+
String[] lines = str.split("\n", 2);
365+
return Optional.of(Extraction.of(new StringBuilder(lines[1]), lines[0]));
366+
}
367+
return Optional.empty();
368+
})
369+
.finalize(sb -> Optional.of(sb.toString()));
370+
371+
List<String> chunks = List.of(
372+
"Hello", " ", "world!\n",
373+
"This is a test\n",
374+
"==\n==",
375+
"\n\nThis", " is", " ", "amazing\n\n");
376+
AssertSubscriber<String> sub = Multi.createFrom().iterable(chunks)
377+
.onItem().gather(gatherer)
378+
.subscribe().withSubscriber(AssertSubscriber.create());
379+
380+
sub.awaitNextItems(2);
381+
assertThat(sub.getItems()).containsExactly("Hello world!", "This is a test");
382+
383+
sub.request(Long.MAX_VALUE);
384+
assertThat(sub.getItems()).containsExactly(
385+
"Hello world!",
386+
"This is a test",
387+
"==",
388+
"==",
389+
"",
390+
"This is amazing",
391+
"",
392+
"");
393+
}
353394
}

reactive-streams-tck-tests/src/test/java/io/smallrye/mutiny/tcktests/MultiGatherTckTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.stream.LongStream;
77

88
import io.smallrye.mutiny.Multi;
9-
import io.smallrye.mutiny.tuples.Tuple2;
9+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1010

1111
public class MultiGatherTckTest extends AbstractPublisherTck<Long> {
1212

@@ -24,7 +24,7 @@ public Flow.Publisher<Long> createFlowPublisher(long elements) {
2424
if (list.isEmpty()) {
2525
return Optional.empty();
2626
} else {
27-
return Optional.of(Tuple2.of(new ArrayList<>(), list.get(0)));
27+
return Optional.of(Extraction.of(new ArrayList<>(), list.get(0)));
2828
}
2929
})
3030
.finalize(list -> Optional.empty());

workshop-examples/src/main/java/_03_composition_transformation/_22_Multi_Chunks_To_Sentence_Gather.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
import io.smallrye.mutiny.Multi;
1212
import io.smallrye.mutiny.Uni;
13-
import io.smallrye.mutiny.tuples.Tuple2;
13+
import io.smallrye.mutiny.groups.Gatherer.Extraction;
1414

1515
public class _22_Multi_Chunks_To_Sentence_Gather {
1616

@@ -34,7 +34,7 @@ public static void main(String[] args) throws InterruptedException {
3434
String str = sb.toString();
3535
if (str.contains("\n")) {
3636
String[] lines = str.split("\n", 2);
37-
return Optional.of(Tuple2.of(new StringBuilder(lines[1]), lines[0]));
37+
return Optional.of(Extraction.of(new StringBuilder(lines[1]), lines[0]));
3838
}
3939
return Optional.empty();
4040
})

0 commit comments

Comments
 (0)