Skip to content

Commit cf15057

Browse files
fix:refactoring, boosted test coverage
Signed-off-by: Matt Peterson <[email protected]>
1 parent bbc4066 commit cf15057

13 files changed

+294
-98
lines changed

server/src/main/java/com/hedera/block/server/Server.java

+8-3
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,20 @@ public static void main(final String[] args) {
6767

6868
// Initialize the reader and writer for the block storage
6969
final BlockWriter<BlockItem> blockWriter =
70-
new FileSystemBlockWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
70+
new BlockAsDirWriter(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
7171
final BlockReader<Block> blockReader =
72-
new FileSystemBlockReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
72+
new BlockAsDirReader(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
7373

7474
final BlockStreamService blockStreamService =
7575
new BlockStreamService(
7676
consumerTimeoutThreshold,
7777
new LiveStreamMediatorImpl(
78-
new WriteThroughCacheHandler(blockReader, blockWriter)));
78+
new WriteThroughCacheHandler(blockReader, blockWriter),
79+
(streamMediator) -> {
80+
LOGGER.log(
81+
System.Logger.Level.ERROR,
82+
"Shutting down the server due to an error.");
83+
}));
7984

8085
// Start the web server
8186
WebServer.builder()

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+2
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
109109
final SubscribeStreamResponse subscribeStreamResponse =
110110
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
111111

112+
LOGGER.log(System.Logger.Level.DEBUG, "Send BlockItem downstream: {0} ", blockItem);
113+
112114
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
113115
}
114116
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

+11-4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import static com.hedera.block.protos.BlockStreamService.*;
19+
import static com.hedera.block.protos.BlockStreamService.Block;
20+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2021

2122
import com.hedera.block.server.consumer.BlockItemEventHandler;
2223
import com.hedera.block.server.data.ObjectEvent;
@@ -26,10 +27,12 @@
2627
import com.lmax.disruptor.RingBuffer;
2728
import com.lmax.disruptor.dsl.Disruptor;
2829
import com.lmax.disruptor.util.DaemonThreadFactory;
30+
import java.io.IOException;
2931
import java.util.HashMap;
3032
import java.util.Map;
3133
import java.util.concurrent.ExecutorService;
3234
import java.util.concurrent.Executors;
35+
import java.util.function.Consumer;
3336

3437
/**
3538
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
@@ -49,22 +52,25 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
4952
BatchEventProcessor<ObjectEvent<BlockItem>>>
5053
subscribers = new HashMap<>();
5154

52-
private final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler;
55+
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
56+
private final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback;
5357

5458
/**
5559
* Constructor for the LiveStreamMediatorImpl class.
5660
*
5761
* @param blockPersistenceHandler the block persistence handler
5862
*/
5963
public LiveStreamMediatorImpl(
60-
final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler) {
64+
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
65+
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
6166
this.blockPersistenceHandler = blockPersistenceHandler;
6267

6368
// Initialize and start the disruptor
6469
final Disruptor<ObjectEvent<BlockItem>> disruptor =
6570
new Disruptor<>(ObjectEvent::new, 1024, DaemonThreadFactory.INSTANCE);
6671
this.ringBuffer = disruptor.start();
6772
this.executor = Executors.newCachedThreadPool(DaemonThreadFactory.INSTANCE);
73+
this.shutdownCallback = shutdownCallback;
6874
}
6975

7076
@Override
@@ -77,9 +83,10 @@ public void publishEvent(BlockItem blockItem) {
7783
// Block persistence
7884
try {
7985
blockPersistenceHandler.persist(blockItem);
80-
} catch (Exception e) {
86+
} catch (IOException e) {
8187
// TODO: Push back on the producer?
8288
LOGGER.log(System.Logger.Level.ERROR, "Error occurred while persisting the block", e);
89+
shutdownCallback.accept(this);
8390
}
8491
}
8592

server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,15 @@
2929
public interface BlockPersistenceHandler<U, V> {
3030

3131
/** Persists a block. */
32-
void persist(final V blockItem) throws IOException;
32+
void persist(final U blockItem) throws IOException;
3333

3434
/**
3535
* Reads a block.
3636
*
3737
* @param blockNumber the number of the block to read
3838
* @return an Optional of the block
3939
*/
40-
Optional<U> read(final long blockNumber);
40+
Optional<V> read(final long blockNumber);
4141

4242
/**
4343
* Reads a range of blocks.
@@ -46,5 +46,5 @@ public interface BlockPersistenceHandler<U, V> {
4646
* @param endBlockNumber the id of the last block to read
4747
* @return a queue of blocks
4848
*/
49-
Queue<U> readRange(final long startBlockNumber, final long endBlockNumber);
49+
Queue<V> readRange(final long startBlockNumber, final long endBlockNumber);
5050
}

server/src/main/java/com/hedera/block/server/persistence/Util.java

-32
This file was deleted.

server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
* Write-Through cache handler coordinates between the block storage and the block cache to ensure
3131
* the block is persisted to the storage before being cached.
3232
*/
33-
public class WriteThroughCacheHandler implements BlockPersistenceHandler<Block, BlockItem> {
33+
public class WriteThroughCacheHandler implements BlockPersistenceHandler<BlockItem, Block> {
3434

3535
private final BlockReader<Block> blockReader;
3636
private final BlockWriter<BlockItem> blockWriter;

server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockReader.java renamed to server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirReader.java

+21-10
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,20 @@
2222
import static com.hedera.block.server.Constants.BLOCK_FILE_EXTENSION;
2323

2424
import io.helidon.config.Config;
25+
import java.io.File;
2526
import java.io.FileInputStream;
2627
import java.io.FileNotFoundException;
2728
import java.io.IOException;
2829
import java.nio.file.Path;
2930
import java.util.Optional;
3031

31-
public class FileSystemBlockReader implements BlockReader<Block> {
32+
public class BlockAsDirReader implements BlockReader<Block> {
3233

3334
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3435

3536
final Path blockNodeRootPath;
3637

37-
public FileSystemBlockReader(final String key, final Config config) {
38+
public BlockAsDirReader(final String key, final Config config) {
3839

3940
LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockReader");
4041

@@ -57,8 +58,8 @@ public Optional<Block> read(final long blockNumber) {
5758
}
5859

5960
// There may be thousands of BlockItem files in a Block directory.
60-
// The BlockItems must be written into the Block object in order. A
61-
// DirectoryStream will iterate in any guaranteed order. To avoid sorting,
61+
// The BlockItems must be added to the Block object in order. A
62+
// DirectoryStream will iterate without any guaranteed order. To avoid sorting,
6263
// and to keep the retrieval process linear with the number of BlockItems,
6364
// Run a loop to fetch in the order we need. The loop will break when
6465
// it looks for a BlockItem file that does not exist.
@@ -72,6 +73,8 @@ public Optional<Block> read(final long blockNumber) {
7273
continue;
7374
}
7475
} catch (IOException io) {
76+
// Return an empty Optional signaling an error. It's all or nothing
77+
// when retrieving a Block
7578
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + blockItemPath, io);
7679
return Optional.empty();
7780
}
@@ -84,15 +87,23 @@ public Optional<Block> read(final long blockNumber) {
8487
}
8588

8689
private Optional<BlockItem> readBlockItem(final String blockItemPath) throws IOException {
90+
8791
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
8892
return Optional.of(BlockItem.parseFrom(fis));
8993
} catch (FileNotFoundException io) {
90-
// The outer loop caller will continue to query
91-
// for the next BlockItem file based on the index
92-
// until the FileNotFoundException is thrown.
93-
// It's expected that this exception will be caught
94-
// at the end of every query.
95-
return Optional.empty();
94+
File f = new File(blockItemPath);
95+
if (!f.exists()) {
96+
// The outer loop caller will continue to query
97+
// for the next BlockItem file based on the index
98+
// until the FileNotFoundException is thrown.
99+
// It's expected that this exception will be caught
100+
// at the end of every query.
101+
return Optional.empty();
102+
}
103+
104+
// FileNotFound is thrown also when a file cannot be read.
105+
// So re-throw here to make a different decision upstream.
106+
throw io;
96107
}
97108
}
98109

server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockWriter.java renamed to server/src/main/java/com/hedera/block/server/persistence/storage/BlockAsDirWriter.java

+9-21
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,15 @@
2626
import java.nio.file.Files;
2727
import java.nio.file.Path;
2828

29-
public class FileSystemBlockWriter implements BlockWriter<BlockItem> {
29+
public class BlockAsDirWriter implements BlockWriter<BlockItem> {
3030

3131
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3232

3333
private final Path blockNodeRootPath;
3434
private long blockNodeFileNameIndex = 0;
3535
private Path currentBlockDir;
3636

37-
public FileSystemBlockWriter(final String key, final Config config) throws IOException {
37+
public BlockAsDirWriter(final String key, final Config config) throws IOException {
3838

3939
LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage");
4040

@@ -49,16 +49,7 @@ public FileSystemBlockWriter(final String key, final Config config) throws IOExc
4949
}
5050

5151
// Initialize the block node root directory if it does not exist
52-
if (Files.notExists(blockNodeRootPath)) {
53-
Files.createDirectory(blockNodeRootPath);
54-
LOGGER.log(
55-
System.Logger.Level.INFO,
56-
"Created block node root directory: " + blockNodeRootPath);
57-
} else {
58-
LOGGER.log(
59-
System.Logger.Level.INFO,
60-
"Using existing block node root directory: " + blockNodeRootPath);
61-
}
52+
createPath(blockNodeRootPath, System.Logger.Level.INFO);
6253

6354
this.blockNodeRootPath = blockNodeRootPath;
6455
}
@@ -77,7 +68,8 @@ public void write(final BlockItem blockItem) throws IOException {
7768
"Successfully wrote the block item file: {0}",
7869
blockItemFilePath);
7970
} catch (IOException e) {
80-
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
71+
LOGGER.log(
72+
System.Logger.Level.ERROR, "Error writing the BlockItem protobuf to a file", e);
8173
}
8274
}
8375

@@ -87,7 +79,7 @@ private void resetState(final BlockItem blockItem) throws IOException {
8779
currentBlockDir = Path.of(String.valueOf(blockItem.getHeader().getBlockNumber()));
8880

8981
// Construct the path to the block directory
90-
createPath(blockNodeRootPath.resolve(currentBlockDir));
82+
createPath(blockNodeRootPath.resolve(currentBlockDir), System.Logger.Level.DEBUG);
9183

9284
// Reset
9385
blockNodeFileNameIndex = 0;
@@ -100,17 +92,13 @@ private String calculateBlockItemPath() {
10092
return blockPath.resolve(blockNodeFileNameIndex + BLOCK_FILE_EXTENSION).toString();
10193
}
10294

103-
private void createPath(Path blockNodePath) throws IOException {
95+
private void createPath(Path blockNodePath, System.Logger.Level logLevel) throws IOException {
10496
// Initialize the Block directory if it does not exist
10597
if (Files.notExists(blockNodePath)) {
10698
Files.createDirectory(blockNodePath);
107-
LOGGER.log(
108-
System.Logger.Level.INFO,
109-
"Created block node root directory: " + blockNodePath);
99+
LOGGER.log(logLevel, "Created block node root directory: " + blockNodePath);
110100
} else {
111-
LOGGER.log(
112-
System.Logger.Level.INFO,
113-
"Using existing block node root directory: " + blockNodePath);
101+
LOGGER.log(logLevel, "Using existing block node root directory: " + blockNodePath);
114102
}
115103
}
116104
}

server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,15 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {
8484
}
8585

8686
@Test
87-
public void testStuff() throws InterruptedException {
87+
public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException {
8888
final var consumerBlockItemObserver =
8989
new ConsumerBlockItemObserver(
9090
TIMEOUT_THRESHOLD_MILLIS,
9191
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
9292
streamMediator,
9393
responseStreamObserver);
9494

95+
// Send non-header BlockItems to validate that the observer does not send them
9596
for (int i = 1; i <= 10; i++) {
9697

9798
if (i % 2 == 0) {

0 commit comments

Comments
 (0)