Skip to content

Commit 0588067

Browse files
authored
feat: added block stream simulator out-of-order streaming (#966)
Signed-off-by: Alex Kehayov <[email protected]>
1 parent 6c4a193 commit 0588067

18 files changed

+600
-33
lines changed

simulator/src/main/java/org/hiero/block/simulator/config/ConfigInjectionModule.java

+13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.hiero.block.simulator.config.data.ConsumerConfig;
1212
import org.hiero.block.simulator.config.data.GrpcConfig;
1313
import org.hiero.block.simulator.config.data.SimulatorStartupDataConfig;
14+
import org.hiero.block.simulator.config.data.UnorderedStreamConfig;
1415
import org.hiero.block.simulator.config.logging.ConfigurationLogging;
1516
import org.hiero.block.simulator.config.logging.SimulatorConfigurationLogger;
1617

@@ -101,4 +102,16 @@ static ConfigurationLogging providesConfigurationLogging(final Configuration con
101102
static SimulatorStartupDataConfig providesSimulatorStartupDataConfig(final Configuration configuration) {
102103
return configuration.getConfigData(SimulatorStartupDataConfig.class);
103104
}
105+
106+
/**
107+
* Provides the unordered stream configuration.
108+
*
109+
* @param configuration the configuration to be used by the unordered stream
110+
* @return the unordered stream configuration
111+
*/
112+
@Singleton
113+
@Provides
114+
static UnorderedStreamConfig provideUnorderedStreamConfig(final Configuration configuration) {
115+
return configuration.getConfigData(UnorderedStreamConfig.class);
116+
}
104117
}

simulator/src/main/java/org/hiero/block/simulator/config/SimulatorConfigExtension.java

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.hiero.block.simulator.config.data.ConsumerConfig;
1313
import org.hiero.block.simulator.config.data.GrpcConfig;
1414
import org.hiero.block.simulator.config.data.SimulatorStartupDataConfig;
15+
import org.hiero.block.simulator.config.data.UnorderedStreamConfig;
1516

1617
/** Sets up configuration for services. */
1718
@AutoService(ConfigurationExtension.class)
@@ -27,6 +28,7 @@ public SimulatorConfigExtension() {
2728
public Set<Class<? extends Record>> getConfigDataTypes() {
2829
return Set.of(
2930
BlockStreamConfig.class,
31+
UnorderedStreamConfig.class,
3032
ConsumerConfig.class,
3133
GrpcConfig.class,
3234
BlockGeneratorConfig.class,

simulator/src/main/java/org/hiero/block/simulator/config/SimulatorMappedConfigSourceInitializer.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,13 @@ public final class SimulatorMappedConfigSourceInitializer {
5656
"SIMULATOR_STARTUP_DATA_LATEST_ACK_BLOCK_NUMBER_PATH"),
5757
new ConfigMapping(
5858
"simulator.startup.data.latestAckBlockHashPath",
59-
"SIMULATOR_STARTUP_DATA_LATEST_ACK_BLOCK_HASH_PATH"));
59+
"SIMULATOR_STARTUP_DATA_LATEST_ACK_BLOCK_HASH_PATH"),
60+
61+
// Block stream configuration
62+
new ConfigMapping("unorderedStream.enabled", "UNORDERED_STREAM_ENABLED"),
63+
new ConfigMapping("unorderedStream.availableBlocks", "UNORDERED_STREAM_AVAILABLE_BLOCKS"),
64+
new ConfigMapping("unorderedStream.sequenceScrambleLevel", "UNORDERED_STREAM_SEQUENCE_SCRAMBLE_LEVEL"),
65+
new ConfigMapping("unorderedStream.fixedStreamingSequence", "UNORDERED_STREAM_FIXED_STREAMING_SEQUENCE"));
6066

6167
private SimulatorMappedConfigSourceInitializer() {}
6268

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package org.hiero.block.simulator.config.data;
3+
4+
import com.swirlds.config.api.ConfigData;
5+
import com.swirlds.config.api.ConfigProperty;
6+
import java.util.regex.Pattern;
7+
import org.hiero.block.common.utils.Preconditions;
8+
import org.hiero.block.simulator.config.logging.Loggable;
9+
10+
/**
11+
* Defines the configuration data for the unordered block streaming in the Hedera Block Simulator.
12+
*
13+
* @param enabled indicates whether to enable the unordered streaming mode
14+
* @param availableBlocks the numbers of the blocks which will be considered available for sending
15+
* @param sequenceScrambleLevel the coefficient used to randomly scramble the stream
16+
* @param fixedStreamingSequence the numbers of the blocks which will form the steam (could be from availableBlocks or not)
17+
*/
18+
@ConfigData("unorderedStream")
19+
public record UnorderedStreamConfig(
20+
@Loggable @ConfigProperty(defaultValue = "false") boolean enabled,
21+
@Loggable @ConfigProperty(defaultValue = "[1-10]") String availableBlocks,
22+
@Loggable @ConfigProperty(defaultValue = "0") int sequenceScrambleLevel,
23+
@Loggable @ConfigProperty(defaultValue = "1,2,4,3") String fixedStreamingSequence) {
24+
25+
public UnorderedStreamConfig {
26+
availableBlocks = availableBlocks.trim();
27+
fixedStreamingSequence = fixedStreamingSequence.trim();
28+
final Pattern pattern = Pattern.compile("^(\\[(\\d+)-(\\d+)\\]|\\d+)(,\\s*(\\[(\\d+)-(\\d+)\\]|\\d+))*$");
29+
30+
Preconditions.requireInRange(sequenceScrambleLevel, 0, 10, "sequenceScrambleLevel must be between 0 and 10");
31+
Preconditions.requireNotBlank(
32+
fixedStreamingSequence, "fixedStreamingSequence must not be blank when sequenceScrambleLevel is 0");
33+
if (!pattern.matcher(fixedStreamingSequence).matches()) {
34+
throw new IllegalArgumentException(fixedStreamingSequence + " does not match the expected format");
35+
}
36+
Preconditions.requireNotBlank(
37+
availableBlocks, "availableBlocks must not be blank when sequenceScrambleLevel is larger than 0");
38+
if (!pattern.matcher(availableBlocks).matches()) {
39+
throw new IllegalArgumentException(availableBlocks + " does not match the expected format");
40+
}
41+
}
42+
}

simulator/src/main/java/org/hiero/block/simulator/generator/CraftBlockStreamManager.java

+164-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@
22
package org.hiero.block.simulator.generator;
33

44
import static java.lang.System.Logger.Level.DEBUG;
5+
import static java.lang.System.Logger.Level.ERROR;
56
import static java.lang.System.Logger.Level.INFO;
7+
import static java.lang.System.Logger.Level.WARNING;
8+
import static java.util.Objects.requireNonNull;
69

710
import com.hedera.hapi.block.stream.BlockProof;
811
import com.hedera.hapi.block.stream.protoc.Block;
@@ -12,14 +15,23 @@
1215
import java.io.IOException;
1316
import java.lang.System.Logger;
1417
import java.util.ArrayList;
18+
import java.util.Collections;
19+
import java.util.HashMap;
20+
import java.util.Iterator;
21+
import java.util.LinkedHashSet;
1522
import java.util.List;
23+
import java.util.Map;
1624
import java.util.Random;
25+
import java.util.Set;
26+
import java.util.regex.Matcher;
27+
import java.util.regex.Pattern;
1728
import org.hiero.block.common.hasher.Hashes;
1829
import org.hiero.block.common.hasher.HashingUtilities;
1930
import org.hiero.block.common.hasher.NaiveStreamingTreeHasher;
2031
import org.hiero.block.common.hasher.StreamingTreeHasher;
2132
import org.hiero.block.internal.BlockItemUnparsed;
2233
import org.hiero.block.simulator.config.data.BlockGeneratorConfig;
34+
import org.hiero.block.simulator.config.data.UnorderedStreamConfig;
2335
import org.hiero.block.simulator.config.types.GenerationMode;
2436
import org.hiero.block.simulator.exception.BlockSimulatorParsingException;
2537
import org.hiero.block.simulator.generator.itemhandler.BlockHeaderHandler;
@@ -56,17 +68,22 @@ public class CraftBlockStreamManager implements BlockStreamManager {
5668
private StreamingTreeHasher inputTreeHasher;
5769
private StreamingTreeHasher outputTreeHasher;
5870

71+
// Unordered streaming
72+
private final boolean unorderedStreamingEnabled;
73+
private Iterator<Block> unorderedStreamIterator;
74+
5975
/**
6076
* Constructs a new CraftBlockStreamManager with the specified configuration.
6177
*
6278
* @param blockGeneratorConfig Configuration parameters for block generation
63-
* including event and transaction counts
79+
* including event and transaction counts
6480
* @param simulatorStartupData simulator startup data used for initialization
6581
* @throws NullPointerException if blockGeneratorConfig is null
6682
*/
6783
public CraftBlockStreamManager(
6884
@NonNull final BlockGeneratorConfig blockGeneratorConfig,
69-
@NonNull final SimulatorStartupData simulatorStartupData) {
85+
@NonNull final SimulatorStartupData simulatorStartupData,
86+
@NonNull final UnorderedStreamConfig unorderedStreamConfig) {
7087
this.generationMode = blockGeneratorConfig.generationMode();
7188
this.minEventsPerBlock = blockGeneratorConfig.minEventsPerBlock();
7289
this.maxEventsPerBlock = blockGeneratorConfig.maxEventsPerBlock();
@@ -81,6 +98,39 @@ public CraftBlockStreamManager(
8198
this.currentBlockNumber = simulatorStartupData.getLatestAckBlockNumber() + 1L;
8299
this.previousBlockHash = simulatorStartupData.getLatestAckBlockHash();
83100
LOGGER.log(INFO, "Block Stream Simulator will use Craft mode for block management");
101+
102+
// Unordered streaming
103+
unorderedStreamingEnabled = unorderedStreamConfig.enabled();
104+
if (unorderedStreamingEnabled) {
105+
initUnorderedStreaming(simulatorStartupData, unorderedStreamConfig);
106+
}
107+
}
108+
109+
private void initUnorderedStreaming(
110+
SimulatorStartupData simulatorStartupData, UnorderedStreamConfig unorderedStreamConfig) {
111+
112+
if (simulatorStartupData.isEnabled()) {
113+
throw new IllegalStateException("Unordered streaming does not support start-up data enabled");
114+
}
115+
this.currentBlockNumber = 0;
116+
final List<Block> blockStreamList;
117+
final int scrambleLevel = unorderedStreamConfig.sequenceScrambleLevel();
118+
if (scrambleLevel == 0) {
119+
// use a predefined order of streaming from the properties file
120+
final LinkedHashSet<Long> fixedStreamingSequence =
121+
parseRangeSet(unorderedStreamConfig.fixedStreamingSequence());
122+
blockStreamList = getBlockStreamList(fixedStreamingSequence);
123+
} else {
124+
// put the predefined available blocks in a list and scramble by the provided coefficient
125+
final Set<Long> availableBlocks = parseRangeSet(unorderedStreamConfig.availableBlocks());
126+
blockStreamList = getBlockStreamList(availableBlocks);
127+
scrambleBlocks(blockStreamList, scrambleLevel);
128+
}
129+
if (blockStreamList.isEmpty()) {
130+
throw new IllegalStateException("No blocks are available for streaming with the current configuration");
131+
}
132+
unorderedStreamIterator = blockStreamList.iterator();
133+
LOGGER.log(INFO, "Unordered streaming is enabled");
84134
}
85135

86136
/**
@@ -109,11 +159,22 @@ public BlockItem getNextBlockItem() {
109159
* Each block includes a header, events with their transactions and results, and a proof.
110160
*
111161
* @return A newly generated Block
112-
* @throws IOException if there is an error processing block items
162+
* @throws IOException if there is an error processing block items
113163
* @throws BlockSimulatorParsingException if there is an error parsing block components
114164
*/
115165
@Override
116166
public Block getNextBlock() throws IOException, BlockSimulatorParsingException {
167+
if (unorderedStreamingEnabled) {
168+
if (unorderedStreamIterator.hasNext()) {
169+
return unorderedStreamIterator.next();
170+
}
171+
return null;
172+
} else {
173+
return createNextBlock();
174+
}
175+
}
176+
177+
private Block createNextBlock() throws BlockSimulatorParsingException {
117178
LOGGER.log(DEBUG, "Started creation of block number %s.".formatted(currentBlockNumber));
118179
// todo(683) Refactor common hasher to accept protoc types, in order to avoid the additional overhead of keeping
119180
// and unparsing.
@@ -188,4 +249,104 @@ private void resetState() {
188249
currentBlockNumber++;
189250
previousBlockHash = currentBlockHash;
190251
}
252+
253+
private List<Block> getBlockStreamList(Set<Long> blockSequence) {
254+
requireNonNull(blockSequence);
255+
final Map<Long, Block> craftedBlocksMap = new HashMap<>();
256+
final List<Block> blockStreamList = new ArrayList<>();
257+
if (!blockSequence.isEmpty()) {
258+
try {
259+
for (int i = 0; i <= Collections.max(blockSequence); i++) {
260+
final Block block = createNextBlock();
261+
final long blockNbr = block.getItems(block.getItemsCount() - 1)
262+
.getBlockProof()
263+
.getBlock();
264+
craftedBlocksMap.put(blockNbr, block);
265+
}
266+
} catch (Exception e) {
267+
LOGGER.log(ERROR, e.getMessage(), e);
268+
throw new RuntimeException(e);
269+
}
270+
for (Long blockNbr : blockSequence) {
271+
if (craftedBlocksMap.containsKey(blockNbr)) {
272+
blockStreamList.add(craftedBlocksMap.get(blockNbr));
273+
}
274+
}
275+
}
276+
return blockStreamList;
277+
}
278+
279+
private void scrambleBlocks(List<Block> list, int coefficient) {
280+
requireNonNull(list);
281+
final int size = list.size();
282+
if (size > 1) {
283+
final List<Block> originalList = new ArrayList<>(list);
284+
final Random random = new Random();
285+
final int maxAttempts = 10;
286+
int attempt = 0;
287+
288+
// The number of swaps is proportional to the coefficient
289+
final int swapCount = (int) ((coefficient / 10.0) * (size * 2));
290+
291+
while (list.equals(originalList) && attempt++ < maxAttempts) {
292+
for (int i = 0; i < swapCount; i++) {
293+
final int index1 = random.nextInt(size - 1);
294+
final int index2;
295+
if (coefficient <= 5) {
296+
// When coefficient is low, swap with a neighbor
297+
index2 = index1 + 1;
298+
} else {
299+
// Higher coefficient allows more distant swaps
300+
index2 = random.nextInt(size);
301+
}
302+
Collections.swap(list, index1, index2);
303+
}
304+
}
305+
306+
// this condition is not supposed to be met when valid configurations are provided
307+
// maxAttempts variable serves for endless loop prevention in case of inaccurate config inputs
308+
if (list.equals(originalList)) {
309+
LOGGER.log(WARNING, "Scramble unsuccessful after " + maxAttempts + " attempts");
310+
}
311+
}
312+
}
313+
314+
private LinkedHashSet<Long> parseRangeSet(String input) {
315+
requireNonNull(input);
316+
final LinkedHashSet<Long> result = new LinkedHashSet<>();
317+
if (!input.isBlank()) {
318+
try {
319+
final Matcher matcher =
320+
Pattern.compile("\\[(\\d+)-(\\d+)\\]|(\\d+)").matcher(input);
321+
while (matcher.find()) {
322+
if (matcher.group(1) != null && matcher.group(2) != null) {
323+
final long start = Long.parseLong(matcher.group(1));
324+
final long end = Long.parseLong(matcher.group(2));
325+
if (start <= 0 || end <= 0) {
326+
throw new IllegalArgumentException(input
327+
+ " does not match the expected format. Range values must be positive and non-zero: ["
328+
+ start + "-" + end + "]");
329+
}
330+
if (start >= end) {
331+
throw new IllegalArgumentException(
332+
input + " does not match the expected format. Invalid range: start >= end in ["
333+
+ start + "-" + end + "]");
334+
}
335+
for (long i = start; i <= end; i++) {
336+
result.add(i);
337+
}
338+
} else if (matcher.group(3) != null) {
339+
final long value = Long.parseLong(matcher.group(3));
340+
if (value <= 0) {
341+
throw new IllegalArgumentException("Values must be positive and non-zero: " + value);
342+
}
343+
result.add(value);
344+
}
345+
}
346+
} catch (Exception e) {
347+
throw new IllegalArgumentException("Exception in parsing input: " + input, e);
348+
}
349+
}
350+
return result;
351+
}
191352
}

simulator/src/main/java/org/hiero/block/simulator/generator/GeneratorInjectionModule.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import edu.umd.cs.findbugs.annotations.NonNull;
77
import javax.inject.Singleton;
88
import org.hiero.block.simulator.config.data.BlockGeneratorConfig;
9+
import org.hiero.block.simulator.config.data.UnorderedStreamConfig;
910
import org.hiero.block.simulator.config.types.GenerationMode;
1011
import org.hiero.block.simulator.startup.SimulatorStartupData;
1112

@@ -29,7 +30,8 @@ public interface GeneratorInjectionModule {
2930
@Provides
3031
static BlockStreamManager providesBlockStreamManager(
3132
@NonNull final BlockGeneratorConfig generatorConfig,
32-
@NonNull final SimulatorStartupData simulatorStartupData) {
33+
@NonNull final SimulatorStartupData simulatorStartupData,
34+
@NonNull final UnorderedStreamConfig unorderedStreamConfig) {
3335
final String managerImpl = generatorConfig.managerImplementation();
3436
final GenerationMode generationMode = generatorConfig.generationMode();
3537
return switch (generationMode) {
@@ -39,7 +41,7 @@ static BlockStreamManager providesBlockStreamManager(
3941
}
4042
yield new BlockAsFileBlockStreamManager(generatorConfig);
4143
}
42-
case CRAFT -> new CraftBlockStreamManager(generatorConfig, simulatorStartupData);
44+
case CRAFT -> new CraftBlockStreamManager(generatorConfig, simulatorStartupData, unorderedStreamConfig);
4345
};
4446
}
4547
}

0 commit comments

Comments
 (0)