Skip to content

Commit 64819d4

Browse files
committed
Adding a new plugin
* Instead of beating on the old implementation, implement a new subscriber plugin that uses a single client thread and feeds that alternately from messaging or history This avoids the countdown latch and state transition logic. Also avoids a state machine, just take the next block from one source or the other, always getting just the next block to send the client. Does require a secondary queue from the messaging, but it's small and simple. * Adjust the subscriber "method" class to be reused, so we track sessions properly * Added a latch so the pipeline thread holds until the session is registered with messaging before returning. This has minimal impact in production, and makes it possible to test reliably. * Added live block push to queue, live blocks stream correctly. * Added handling for request ahead of live condition * Session just skips over live blocks until live reaches the start block. * Added proper shutdown of all sessions if the plugin is stopped. * History and Live both work, and the test disables history to prove it * This is necessary because otherwise the test "history" plugin is too fast and we always serve history. * TODO: * Add more unit tests * Add testing that forces a client to fall behind and catch up * Add some end-to-end test suites to exercise this plugin better * Add more and better javadoc * Clean up the (excessive) trace logging * Add better metrics, the "transition" metrics aren't super helpful. * Suggested metrics * Live batches sent to client (would be good if we can tag this with a client ID) * History blocks sent to client (would be good if we can tag this with a client ID) * Count of times the client thread must wait for live blocks, and time spent waiting * Count of times the client cannot read from history, but subsequently reads from live * Client connection and time from connect to session established Signed-off-by: Joseph Sinclair <121976561+jsync-swirlds@users.noreply.github.com>
1 parent 90ba908 commit 64819d4

File tree

11 files changed

+1416
-24
lines changed

11 files changed

+1416
-24
lines changed

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/GrpcPluginTestBase.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ public void clientEndStreamReceived() {
6060
@Override
6161
public void onNext(Bytes item) throws RuntimeException {
6262
fromPluginBytes.add(item);
63-
LOGGER.log(TRACE, "onNext: %d".formatted(fromPluginBytes.size()));
6463
}
6564

6665
@Override

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/SimpleInMemoryHistoricalBlockFacility.java

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public class SimpleInMemoryHistoricalBlockFacility implements HistoricalBlockFac
3030
private final AtomicLong currentBlockNumber = new AtomicLong(UNKNOWN_BLOCK_NUMBER);
3131
private final List<BlockItems> partialBlock = new ArrayList<>();
3232
private final AtomicBoolean delayResponses = new AtomicBoolean(false);
33+
private final AtomicBoolean disablePlugin = new AtomicBoolean(false);
3334
private BlockNodeContext blockNodeContext;
3435

3536
@Override
@@ -42,28 +43,30 @@ public void init(BlockNodeContext context, ServiceBuilder serviceBuilder) {
4243
*/
4344
@Override
4445
public void handleBlockItemsReceived(BlockItems blockItems) {
45-
if (blockItems.isStartOfNewBlock()) {
46-
if (!partialBlock.isEmpty()) {
47-
throw new RuntimeException("Something went wrong, partitionedBlock is not empty. So we never got a end "
48-
+ "block for current block");
46+
if (!disablePlugin.get()) {
47+
if (blockItems.isStartOfNewBlock()) {
48+
if (!partialBlock.isEmpty()) {
49+
throw new RuntimeException(
50+
"Something went wrong, partitionedBlock is not empty. So we never got a end block for current block");
51+
}
52+
currentBlockNumber.set(blockItems.newBlockNumber());
4953
}
50-
currentBlockNumber.set(blockItems.newBlockNumber());
51-
}
52-
partialBlock.add(blockItems);
53-
if (blockItems.isEndOfBlock()) {
54-
final long blockNumber = currentBlockNumber.getAndSet(UNKNOWN_BLOCK_NUMBER);
55-
List<BlockItem> bi = new ArrayList<>();
56-
for (BlockItems items : partialBlock) {
57-
bi.addAll(toBlockItems(items.blockItems()));
54+
partialBlock.add(blockItems);
55+
if (blockItems.isEndOfBlock()) {
56+
final long blockNumber = currentBlockNumber.getAndSet(UNKNOWN_BLOCK_NUMBER);
57+
List<BlockItem> bi = new ArrayList<>();
58+
for (BlockItems items : partialBlock) {
59+
bi.addAll(toBlockItems(items.blockItems()));
60+
}
61+
Block block = new Block(bi);
62+
blockStorage.put(blockNumber, block);
63+
availableBlocks.add(blockNumber);
64+
partialBlock.clear();
65+
// send block persisted message
66+
blockNodeContext
67+
.blockMessaging()
68+
.sendBlockPersisted(new PersistedNotification(blockNumber, blockNumber, 2000));
5869
}
59-
Block block = new Block(bi);
60-
blockStorage.put(blockNumber, block);
61-
availableBlocks.add(blockNumber);
62-
partialBlock.clear();
63-
// send block persisted message
64-
blockNodeContext
65-
.blockMessaging()
66-
.sendBlockPersisted(new PersistedNotification(blockNumber, blockNumber, 2000));
6770
}
6871
}
6972

@@ -97,6 +100,14 @@ public void clearDelayResponses() {
97100
delayResponses.compareAndSet(true, false);
98101
}
99102

103+
public void setDisablePlugin() {
104+
disablePlugin.compareAndSet(false, true);
105+
}
106+
107+
public void clearDisablePlugin() {
108+
disablePlugin.compareAndSet(true, false);
109+
}
110+
100111
/**
101112
* {@inheritDoc}
102113
*/

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/TestBlockMessagingFacility.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ public void sendBlockItems(BlockItems blockItems) {
134134
final int handlerCount = blockItemHandlers.size() + nonBackpressureBlockItemHandlers.size();
135135
LOGGER.log(
136136
Level.TRACE,
137-
"Sending next %d block items to %d handlers."
138-
.formatted(blockItems.blockItems().size(), handlerCount));
137+
"Sending next %d block items for block %d to %d handlers."
138+
.formatted(blockItems.blockItems().size(), blockItems.newBlockNumber(), handlerCount));
139139
sentBlockBlockItems.add(blockItems);
140140
boolean handlerHasBackpressure = false;
141141
for (BlockItemHandler handler : blockItemHandlers) {

block-node/block-access/src/main/java/org/hiero/block/node/access/service/BlockAccessServicePlugin.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.hiero.block.api.BlockRequest;
1919
import org.hiero.block.api.BlockResponse;
2020
import org.hiero.block.api.BlockResponse.Code;
21+
// PBJ doesn't generate GRPC stubs for some reason, also the proto file is broken when PBJ compiles it...
2122
import org.hiero.block.api.protoc.BlockAccessServiceGrpc;
2223
import org.hiero.block.node.spi.BlockNodeContext;
2324
import org.hiero.block.node.spi.BlockNodePlugin;
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
plugins { id("org.hiero.gradle.module.library") }
3+
4+
description = "Hiero Block Node Subscriber Service"
5+
6+
// Remove the following line to enable all 'javac' lint checks that we have turned on by default
7+
// and then fix the reported issues.
8+
tasks.withType<JavaCompile>().configureEach { options.compilerArgs.add("-Xlint:-exports") }
9+
10+
mainModuleInfo {
11+
runtimeOnly("com.swirlds.config.impl")
12+
runtimeOnly("org.apache.logging.log4j.slf4j2.impl")
13+
runtimeOnly("io.helidon.logging.jul")
14+
runtimeOnly("com.hedera.pbj.grpc.helidon.config")
15+
}
16+
17+
testModuleInfo {
18+
requires("org.junit.jupiter.api")
19+
requires("org.hiero.block.node.app.test.fixtures")
20+
requires("org.assertj.core.api")
21+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
import org.hiero.block.node.stream.subscriber.SubscriberServicePlugin;
3+
4+
// SPDX-License-Identifier: Apache-2.0
5+
module org.hiero.block.node.stream.subscriber {
6+
uses com.swirlds.config.api.spi.ConfigurationBuilderFactory;
7+
8+
// export configuration classes to the config module and app
9+
exports org.hiero.block.node.stream.subscriber to
10+
com.swirlds.config.impl,
11+
com.swirlds.config.extensions,
12+
org.hiero.block.node.app;
13+
14+
requires transitive com.hedera.pbj.runtime;
15+
requires transitive com.swirlds.config.api;
16+
requires transitive org.hiero.block.node.spi;
17+
requires transitive org.hiero.block.protobuf;
18+
requires com.swirlds.metrics.api;
19+
requires org.hiero.block.node.base;
20+
requires com.github.spotbugs.annotations;
21+
22+
provides org.hiero.block.node.spi.BlockNodePlugin with
23+
SubscriberServicePlugin;
24+
}

0 commit comments

Comments
 (0)