Skip to content

Commit ba1aa2a

Browse files
committed
Implement aggregation in pipeline service
Signed-off-by: Miroslav Kovar <miroslavkovar@protonmail.com>
1 parent 3ad965f commit ba1aa2a

File tree

5 files changed

+285
-0
lines changed

5 files changed

+285
-0
lines changed

ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/SnapWorldStateDownloadProcess.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,7 @@ public SnapWorldStateDownloadProcess build() {
433433
"batchDownloadBlockAccessListsData",
434434
tasks -> requestDataStep.requestBlockAccessLists(tasks),
435435
maxOutstandingRequests)
436+
.inSingleBatch()
436437
.thenProcess(
437438
"batchPersistBlockAccessListsData", tasks -> persistDataStep.persist(tasks))
438439
.andFinishWith(
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Copyright ConsenSys AG.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package org.hyperledger.besu.services.pipeline;
16+
17+
import org.hyperledger.besu.plugin.services.metrics.Counter;
18+
19+
import java.util.ArrayDeque;
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Queue;
23+
import java.util.List;
24+
25+
/**
26+
* A read pipe that aggregates all incoming batches into a single batch.
27+
*
28+
* @param <T> the type of item included in batches
29+
*/
30+
public class AggregatingReadPipe<T> implements ReadPipe<List<T>> {
31+
32+
private final ReadPipe<List<T>> input;
33+
private final Counter aggregateCounter;
34+
private final Queue<T> pendingItems = new ArrayDeque<>();
35+
36+
/**
37+
* Instantiates a new Aggregating read pipe.
38+
*
39+
* @param input the input batches
40+
* @param aggregateCounter the aggregate counter
41+
*/
42+
public AggregatingReadPipe(final ReadPipe<List<T>> input, final Counter aggregateCounter) {
43+
this.input = input;
44+
this.aggregateCounter = aggregateCounter;
45+
}
46+
47+
@Override
48+
public boolean hasMore() {
49+
return input.hasMore() || !pendingItems.isEmpty();
50+
}
51+
52+
@Override
53+
public boolean isAborted() {
54+
return input.isAborted();
55+
}
56+
57+
@Override
58+
public List<T> get() {
59+
drainInputToPendingByGet();
60+
return emitPendingUpTo(Integer.MAX_VALUE);
61+
}
62+
63+
@Override
64+
public List<T> poll() {
65+
drainInputToPendingByPoll();
66+
67+
if (input.hasMore()) {
68+
return null;
69+
}
70+
71+
return emitPendingUpTo(Integer.MAX_VALUE);
72+
}
73+
74+
@Override
75+
public int drainTo(final Collection<List<T>> output, final int maxElements) {
76+
if (maxElements <= 0) {
77+
return 0;
78+
}
79+
80+
drainInputToPendingByPoll();
81+
82+
final List<T> aggregate = emitPendingUpTo(maxElements);
83+
if (aggregate != null) {
84+
output.add(aggregate);
85+
return aggregate.size();
86+
}
87+
return 0;
88+
}
89+
90+
private List<T> emitPendingUpTo(final int maximumSize) {
91+
if (pendingItems.isEmpty()) {
92+
return null;
93+
}
94+
final int outputSize = Math.min(maximumSize, pendingItems.size());
95+
final List<T> output = new ArrayList<>(outputSize);
96+
for (int i = 0; i < outputSize; i++) {
97+
output.add(pendingItems.remove());
98+
}
99+
aggregateCounter.inc();
100+
return output;
101+
}
102+
103+
private void drainInputToPendingByPoll() {
104+
List<T> batch;
105+
while ((batch = input.poll()) != null) {
106+
pendingItems.addAll(batch);
107+
}
108+
}
109+
110+
private void drainInputToPendingByGet() {
111+
while (input.hasMore()) {
112+
final List<T> batch = input.get();
113+
if (batch != null) {
114+
pendingItems.addAll(batch);
115+
}
116+
}
117+
}
118+
}

services/pipeline/src/main/java/org/hyperledger/besu/services/pipeline/PipelineBuilder.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,31 @@ public PipelineBuilder<I, List<T>> inBatches(
296296
pipelineName);
297297
}
298298

299+
/**
300+
* Aggregates all incoming batches into a single batch.
301+
*
302+
* <p>This stage is intended to be used after {@link #inBatches(int)} (or any stage producing
303+
* {@link List} items), when downstream processing must operate on one final aggregate batch.
304+
*
305+
* @return a {@link PipelineBuilder} ready to extend the pipeline with additional stages.
306+
*/
307+
@SuppressWarnings("unchecked")
308+
public PipelineBuilder<I, T> inSingleBatch() {
309+
return new PipelineBuilder<>(
310+
inputPipe,
311+
stages,
312+
pipes,
313+
lastStageName,
314+
(ReadPipe<T>)
315+
new AggregatingReadPipe<>(
316+
(ReadPipe<List<T>>) pipeEnd,
317+
outputCounter.labels(lastStageName + "_outputPipe", "aggregated_batches")),
318+
1,
319+
outputCounter,
320+
tracingEnabled,
321+
pipelineName);
322+
}
323+
299324
/**
300325
* Adds a 1-to-many processing stage to the pipeline. For each item in the stream, <i>mapper</i>
301326
* is called and each item of the {@link Stream} it returns is output as an individual item. The
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright ConsenSys AG.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*
13+
* SPDX-License-Identifier: Apache-2.0
14+
*/
15+
package org.hyperledger.besu.services.pipeline;
16+
17+
import static java.util.Arrays.asList;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.hyperledger.besu.metrics.noop.NoOpMetricsSystem.NO_OP_COUNTER;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.verifyNoInteractions;
24+
25+
import org.hyperledger.besu.plugin.services.metrics.Counter;
26+
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
30+
import org.junit.jupiter.api.Test;
31+
32+
public class AggregatingReadPipeTest {
33+
34+
private final Pipe<List<String>> source =
35+
new Pipe<>(10, NO_OP_COUNTER, NO_OP_COUNTER, NO_OP_COUNTER, "source_pipe");
36+
private final Counter aggregateCounter = mock(Counter.class);
37+
private final AggregatingReadPipe<String> aggregatingPipe =
38+
new AggregatingReadPipe<>(source, aggregateCounter);
39+
40+
@Test
41+
public void shouldGetSingleBatchContainingAllInputBatches() {
42+
source.put(asList("a", "b"));
43+
source.put(asList("c"));
44+
source.put(asList("d", "e"));
45+
source.close();
46+
47+
assertThat(aggregatingPipe.get()).containsExactly("a", "b", "c", "d", "e");
48+
assertThat(aggregatingPipe.get()).isNull();
49+
50+
verify(aggregateCounter, times(1)).inc();
51+
}
52+
53+
@Test
54+
public void shouldPollSingleBatchOnlyWhenInputIsComplete() {
55+
source.put(asList("a", "b"));
56+
57+
assertThat(aggregatingPipe.poll()).isNull();
58+
verifyNoInteractions(aggregateCounter);
59+
60+
source.put(asList("c"));
61+
source.close();
62+
63+
assertThat(aggregatingPipe.poll()).containsExactly("a", "b", "c");
64+
assertThat(aggregatingPipe.poll()).isNull();
65+
66+
verify(aggregateCounter, times(1)).inc();
67+
}
68+
69+
@Test
70+
public void shouldDrainToRespectMaxElements() {
71+
source.put(asList("a"));
72+
source.put(asList("b", "c"));
73+
source.close();
74+
75+
final List<List<String>> output = new ArrayList<>();
76+
77+
assertThat(aggregatingPipe.drainTo(output, 0)).isZero();
78+
assertThat(output).isEmpty();
79+
80+
assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
81+
assertThat(output).containsExactly(asList("a"));
82+
83+
assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
84+
assertThat(output).containsExactly(asList("a"), asList("b"));
85+
86+
assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
87+
assertThat(output).containsExactly(asList("a"), asList("b"), asList("c"));
88+
89+
assertThat(aggregatingPipe.drainTo(output, 1)).isZero();
90+
91+
verify(aggregateCounter, times(3)).inc();
92+
}
93+
94+
@Test
95+
public void shouldDrainAvailableDataAndLeaveRemainingForPoll() {
96+
source.put(asList("a", "b"));
97+
98+
final List<List<String>> output = new ArrayList<>();
99+
100+
assertThat(aggregatingPipe.drainTo(output, 1)).isEqualTo(1);
101+
assertThat(output).containsExactly(asList("a"));
102+
103+
source.close();
104+
105+
assertThat(aggregatingPipe.poll()).containsExactly("b");
106+
}
107+
108+
@Test
109+
public void shouldTrackHasMoreAcrossAggregationLifecycle() {
110+
assertThat(aggregatingPipe.hasMore()).isTrue();
111+
112+
source.put(asList("a"));
113+
assertThat(aggregatingPipe.poll()).isNull();
114+
assertThat(aggregatingPipe.hasMore()).isTrue();
115+
116+
source.close();
117+
assertThat(aggregatingPipe.poll()).containsExactly("a");
118+
assertThat(aggregatingPipe.hasMore()).isFalse();
119+
}
120+
}

services/pipeline/src/test/java/org/hyperledger/besu/services/pipeline/PipelineBuilderTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,27 @@ public void shouldCombineIntoBatches() throws Exception {
134134
assertThat(output).isEmpty();
135135
}
136136

137+
@Test
138+
public void shouldAggregateAllBatchesIntoSingleBatch() throws Exception {
139+
final BlockingQueue<List<Integer>> output = new ArrayBlockingQueue<>(1);
140+
final Pipeline<Integer> pipeline =
141+
PipelineBuilder.<Integer>createPipeline(
142+
"source", 20, NO_OP_LABELLED_2_COUNTER, false, "test")
143+
.inBatches(4)
144+
.inSingleBatch()
145+
.andFinishWith("end", output::offer);
146+
147+
final Pipe<Integer> input = pipeline.getInputPipe();
148+
tasks.forEachRemaining(input::put);
149+
input.close();
150+
151+
pipeline.start(executorService).get(10, SECONDS);
152+
153+
assertThat(output.poll(10, SECONDS))
154+
.containsExactly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15);
155+
assertThat(output).isEmpty();
156+
}
157+
137158
@Test
138159
public void shouldProcessAsync() throws Exception {
139160
final List<String> output = new ArrayList<>();

0 commit comments

Comments
 (0)