Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public void clientEndStreamReceived() {
@Override
public void onNext(Bytes item) throws RuntimeException {
fromPluginBytes.add(item);
LOGGER.log(TRACE, "onNext: %d".formatted(fromPluginBytes.size()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
private final AtomicLong currentBlockNumber = new AtomicLong(UNKNOWN_BLOCK_NUMBER);
private final List<BlockItems> partialBlock = new ArrayList<>();
private final AtomicBoolean delayResponses = new AtomicBoolean(false);
private final AtomicBoolean disablePlugin = new AtomicBoolean(false);
private BlockNodeContext blockNodeContext;

@Override
Expand All @@ -42,28 +43,30 @@
*/
@Override
public void handleBlockItemsReceived(BlockItems blockItems) {
if (blockItems.isStartOfNewBlock()) {
if (!partialBlock.isEmpty()) {
throw new RuntimeException("Something went wrong, partitionedBlock is not empty. So we never got a end "
+ "block for current block");
if (!disablePlugin.get()) {
if (blockItems.isStartOfNewBlock()) {
if (!partialBlock.isEmpty()) {
throw new RuntimeException(

Check warning on line 49 in block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/SimpleInMemoryHistoricalBlockFacility.java

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

block-node/app/src/testFixtures/java/org/hiero/block/node/app/fixtures/plugintest/SimpleInMemoryHistoricalBlockFacility.java#L49

Avoid throwing raw exception types.
"Something went wrong, partitionedBlock is not empty. So we never got a end block for current block");
}
currentBlockNumber.set(blockItems.newBlockNumber());
}
currentBlockNumber.set(blockItems.newBlockNumber());
}
partialBlock.add(blockItems);
if (blockItems.isEndOfBlock()) {
final long blockNumber = currentBlockNumber.getAndSet(UNKNOWN_BLOCK_NUMBER);
List<BlockItem> bi = new ArrayList<>();
for (BlockItems items : partialBlock) {
bi.addAll(toBlockItems(items.blockItems()));
partialBlock.add(blockItems);
if (blockItems.isEndOfBlock()) {
final long blockNumber = currentBlockNumber.getAndSet(UNKNOWN_BLOCK_NUMBER);
List<BlockItem> bi = new ArrayList<>();
for (BlockItems items : partialBlock) {
bi.addAll(toBlockItems(items.blockItems()));
}
Block block = new Block(bi);
blockStorage.put(blockNumber, block);
availableBlocks.add(blockNumber);
partialBlock.clear();
// send block persisted message
blockNodeContext
.blockMessaging()
.sendBlockPersisted(new PersistedNotification(blockNumber, blockNumber, 2000));
}
Block block = new Block(bi);
blockStorage.put(blockNumber, block);
availableBlocks.add(blockNumber);
partialBlock.clear();
// send block persisted message
blockNodeContext
.blockMessaging()
.sendBlockPersisted(new PersistedNotification(blockNumber, blockNumber, 2000));
}
}

Expand Down Expand Up @@ -97,6 +100,14 @@
delayResponses.compareAndSet(true, false);
}

public void setDisablePlugin() {
disablePlugin.compareAndSet(false, true);
}

public void clearDisablePlugin() {
disablePlugin.compareAndSet(true, false);
}

/**
* {@inheritDoc}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ public void sendBlockItems(BlockItems blockItems) {
final int handlerCount = blockItemHandlers.size() + nonBackpressureBlockItemHandlers.size();
LOGGER.log(
Level.TRACE,
"Sending next %d block items to %d handlers."
.formatted(blockItems.blockItems().size(), handlerCount));
"Sending next %d block items for block %d to %d handlers."
.formatted(blockItems.blockItems().size(), blockItems.newBlockNumber(), handlerCount));
sentBlockBlockItems.add(blockItems);
boolean handlerHasBackpressure = false;
for (BlockItemHandler handler : blockItemHandlers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.hiero.block.api.BlockRequest;
import org.hiero.block.api.BlockResponse;
import org.hiero.block.api.BlockResponse.Code;
// PBJ doesn't generate GRPC stubs for some reason, also the proto file is broken when PBJ compiles it...
import org.hiero.block.api.protoc.BlockAccessServiceGrpc;
import org.hiero.block.node.spi.BlockNodeContext;
import org.hiero.block.node.spi.BlockNodePlugin;
Expand Down
21 changes: 21 additions & 0 deletions block-node/stream-subscriber/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// SPDX-License-Identifier: Apache-2.0
plugins { id("org.hiero.gradle.module.library") }

description = "Hiero Block Node Subscriber Service"

// Remove the following line to enable all 'javac' lint checks that we have turned on by default
// and then fix the reported issues.
tasks.withType<JavaCompile>().configureEach { options.compilerArgs.add("-Xlint:-exports") }

mainModuleInfo {
runtimeOnly("com.swirlds.config.impl")
runtimeOnly("org.apache.logging.log4j.slf4j2.impl")
runtimeOnly("io.helidon.logging.jul")
runtimeOnly("com.hedera.pbj.grpc.helidon.config")
}

testModuleInfo {
requires("org.junit.jupiter.api")
requires("org.hiero.block.node.app.test.fixtures")
requires("org.assertj.core.api")
}
24 changes: 24 additions & 0 deletions block-node/stream-subscriber/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// SPDX-License-Identifier: Apache-2.0
import org.hiero.block.node.stream.subscriber.SubscriberServicePlugin;

// SPDX-License-Identifier: Apache-2.0
module org.hiero.block.node.stream.subscriber {
uses com.swirlds.config.api.spi.ConfigurationBuilderFactory;

// export configuration classes to the config module and app
exports org.hiero.block.node.stream.subscriber to
com.swirlds.config.impl,
com.swirlds.config.extensions,
org.hiero.block.node.app;

requires transitive com.hedera.pbj.runtime;
requires transitive com.swirlds.config.api;
requires transitive org.hiero.block.node.spi;
requires transitive org.hiero.block.protobuf;
requires com.swirlds.metrics.api;
requires org.hiero.block.node.base;
requires com.github.spotbugs.annotations;

provides org.hiero.block.node.spi.BlockNodePlugin with
SubscriberServicePlugin;
}
Loading
Loading