Skip to content

Commit 84240b1

Browse files
added additional BlockItem type support
Signed-off-by: Matt Peterson <[email protected]>
1 parent 8951fa0 commit 84240b1

File tree

12 files changed

+238
-168
lines changed

12 files changed

+238
-168
lines changed

protos/src/main/protobuf/blockstream.proto

+9-2
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,18 @@ message Block {
5757
*/
5858
message BlockItem {
5959

60-
BlockHeader block_header = 1;
60+
oneof items {
61+
BlockHeader block_header = 1;
62+
BlockProof state_proof = 2;
63+
}
6164

62-
string value = 2;
65+
string value = 3;
6366
}
6467

6568
message BlockHeader {
6669
uint64 block_number = 1;
70+
}
71+
72+
message BlockProof {
73+
uint64 block = 1;
6774
}

server/src/main/java/com/hedera/block/server/Server.java

+4-3
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package com.hedera.block.server;
1818

19-
import static com.hedera.block.protos.BlockStreamService.*;
20-
import static com.hedera.block.server.Constants.*;
21-
2219
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2320
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
2421
import com.hedera.block.server.persistence.storage.BlockStorage;
@@ -28,8 +25,12 @@
2825
import io.helidon.config.Config;
2926
import io.helidon.webserver.WebServer;
3027
import io.helidon.webserver.grpc.GrpcRouting;
28+
3129
import java.io.IOException;
3230

31+
import static com.hedera.block.protos.BlockStreamService.*;
32+
import static com.hedera.block.server.Constants.*;
33+
3334
/** Main class for the block node server */
3435
public class Server {
3536

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+3-10
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ public class ConsumerBlockItemObserver
5050

5151
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
5252

53-
private boolean isReachedFirstBlock;
5453

5554
/**
5655
* Constructor for the LiveStreamObserverImpl class.
@@ -99,16 +98,10 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
9998
producerLivenessMillis = producerLivenessClock.millis();
10099

101100
final BlockItem blockItem = event.get();
102-
if (!isReachedFirstBlock && blockItem.getBlockHeader() > 0) {
103-
isReachedFirstBlock = true;
104-
}
105-
106-
if (isReachedFirstBlock) {
107-
final SubscribeStreamResponse subscribeStreamResponse =
108-
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
101+
final SubscribeStreamResponse subscribeStreamResponse =
102+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
109103

110-
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
111-
}
104+
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
112105
}
113106

114107
/**

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
21-
2219
import com.hedera.block.server.consumer.BlockItemEventHandler;
2320
import com.hedera.block.server.data.ObjectEvent;
2421
import com.hedera.block.server.persistence.BlockPersistenceHandler;
@@ -27,11 +24,14 @@
2724
import com.lmax.disruptor.RingBuffer;
2825
import com.lmax.disruptor.dsl.Disruptor;
2926
import com.lmax.disruptor.util.DaemonThreadFactory;
27+
3028
import java.util.HashMap;
3129
import java.util.Map;
3230
import java.util.concurrent.ExecutorService;
3331
import java.util.concurrent.Executors;
3432

33+
import static com.hedera.block.protos.BlockStreamService.*;
34+
3535
/**
3636
* LiveStreamMediatorImpl is the implementation of the StreamMediator interface. It is responsible
3737
* for managing the subscribe and unsubscribe operations of downstream consumers. It also proxies
@@ -51,15 +51,15 @@ public class LiveStreamMediatorImpl
5151
BatchEventProcessor<ObjectEvent<BlockItem>>>
5252
subscribers = new HashMap<>();
5353

54-
private final BlockPersistenceHandler<BlockItem> blockPersistenceHandler;
54+
private final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler;
5555

5656
/**
5757
* Constructor for the LiveStreamMediatorImpl class.
5858
*
5959
* @param blockPersistenceHandler the block persistence handler
6060
*/
6161
public LiveStreamMediatorImpl(
62-
final BlockPersistenceHandler<BlockItem> blockPersistenceHandler) {
62+
final BlockPersistenceHandler<Block, BlockItem> blockPersistenceHandler) {
6363
this.blockPersistenceHandler = blockPersistenceHandler;
6464

6565
// Initialize and start the disruptor

server/src/main/java/com/hedera/block/server/persistence/BlockPersistenceHandler.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@ public interface BlockPersistenceHandler<U, V> {
3030
/**
3131
* Persists a block.
3232
*
33-
* @param blockNumber the block to persist
34-
* @return the id of the block
3533
*/
36-
Long persist(final V blockItem, final long blockNumber);
34+
void persist(final V blockItem);
3735

3836
/**
3937
* Reads a block.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.persistence;
18+
19+
import static com.hedera.block.protos.BlockStreamService.Block;
20+
import static com.hedera.block.protos.BlockStreamService.BlockHeader;
21+
22+
public final class Util {
23+
private Util() {}
24+
25+
public static long getBlockNumber(final Block block) {
26+
return getBlockHeader(block).getBlockNumber();
27+
}
28+
29+
public static BlockHeader getBlockHeader(final Block block) {
30+
return block.getBlockItems(0).getBlockHeader();
31+
}
32+
}

server/src/main/java/com/hedera/block/server/persistence/WriteThroughCacheHandler.java

+3-8
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818

1919
import com.hedera.block.server.persistence.storage.BlockStorage;
2020

21-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2221
import static com.hedera.block.protos.BlockStreamService.Block;
22+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2323

2424
import java.util.LinkedList;
2525
import java.util.Optional;
@@ -45,15 +45,10 @@ public WriteThroughCacheHandler(final BlockStorage<Block, BlockItem> blockStorag
4545
/**
4646
* Persists the block to the block storage and cache the block.
4747
*
48-
* @param blockItem the block to persist
49-
* @return the block id
5048
*/
5149
@Override
52-
public Long persist(final BlockItem blockItem, final long blockNumber) {
53-
54-
// Write-Through cache
55-
blockStorage.write(blockItem, blockNumber);
56-
return blockItem.getBlockHeader().getBlockNumber();
50+
public void persist(final BlockItem blockItem) {
51+
blockStorage.write(blockItem);
5752
}
5853

5954
/**

server/src/main/java/com/hedera/block/server/persistence/storage/BlockStorage.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,17 @@
2323
*
2424
* @param <V> the type of block to store
2525
*/
26-
public interface BlockStorage<V> {
26+
public interface BlockStorage<U, V> {
2727

2828
/**
2929
* Writes a block to storage.
30-
*
31-
* @return the id of the block
3230
*/
33-
Optional<Long> write(final V block, final long blockNumber);
31+
void write(final V blockItem);
3432

3533
/**
3634
* Reads a block from storage.
3735
*
3836
* @return the block
3937
*/
40-
Optional<V> read(final long blockNumber);
38+
Optional<U> read(final long blockNumber);
4139
}

server/src/main/java/com/hedera/block/server/persistence/storage/FileSystemBlockStorage.java

+84-26
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.io.IOException;
2525
import java.nio.file.Files;
2626
import java.nio.file.Path;
27-
import java.util.Collections;
2827
import java.util.Optional;
2928

3029
import static com.hedera.block.protos.BlockStreamService.Block;
30+
import static com.hedera.block.protos.BlockStreamService.Block.Builder;
3131
import static com.hedera.block.protos.BlockStreamService.BlockItem;
3232
import static com.hedera.block.server.Constants.BLOCKNODE_STORAGE_ROOT_PATH_KEY;
3333

@@ -37,10 +37,14 @@
3737
*/
3838
public class FileSystemBlockStorage implements BlockStorage<Block, BlockItem> {
3939

40+
private final System.Logger LOGGER = System.getLogger(getClass().getName());
41+
4042
public static final String BLOCK_FILE_EXTENSION = ".blk";
4143

4244
private final Path blockNodeRootPath;
43-
private final System.Logger LOGGER = System.getLogger(getClass().getName());
45+
46+
private long currentIndex = 1;
47+
private Path currentBlockDir;
4448

4549
/**
4650
* Constructs a FileSystemBlockStorage object.
@@ -50,12 +54,11 @@ public class FileSystemBlockStorage implements BlockStorage<Block, BlockItem> {
5054
* @throws IOException if an I/O error occurs while initializing the block node root directory
5155
*/
5256
public FileSystemBlockStorage(final String key, final Config config) throws IOException {
53-
5457
LOGGER.log(System.Logger.Level.INFO, "Initializing FileSystemBlockStorage");
55-
LOGGER.log(System.Logger.Level.INFO, config.toString());
5658

5759
blockNodeRootPath = Path.of(config.get(key).asString().get());
5860

61+
LOGGER.log(System.Logger.Level.INFO, config.toString());
5962
LOGGER.log(System.Logger.Level.INFO, "Block Node Root Path: " + blockNodeRootPath);
6063

6164
if (!blockNodeRootPath.isAbsolute()) {
@@ -79,20 +82,20 @@ public FileSystemBlockStorage(final String key, final Config config) throws IOEx
7982
/**
8083
* Writes a block to the filesystem.
8184
*
82-
* @param blockItem the block to write
83-
* @return the id of the block
8485
*/
8586
@Override
86-
public Optional<Long> write(final BlockItem blockItem, final long blockNumber) {
87-
final String fullPath = resolvePath(blockNumber);
88-
try (FileOutputStream fos = new FileOutputStream(fullPath)) {
89-
blockItem.writeTo(fos);
90-
LOGGER.log(System.Logger.Level.DEBUG, "Successfully wrote the block file: " + fullPath);
91-
92-
return Optional.of(blockNumber);
93-
} catch (IOException e) {
94-
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
95-
return Optional.empty();
87+
public void write(final BlockItem blockItem) {
88+
89+
try {
90+
final String blockItemFilePath = getAbsoluteFilePath(blockItem);
91+
try (FileOutputStream fos = new FileOutputStream(blockItemFilePath)) {
92+
blockItem.writeTo(fos);
93+
LOGGER.log(System.Logger.Level.INFO, "Successfully wrote the block item file: {0}", blockItemFilePath);
94+
} catch (IOException e) {
95+
LOGGER.log(System.Logger.Level.ERROR, "Error writing the protobuf to a file", e);
96+
}
97+
} catch (IOException io) {
98+
LOGGER.log(System.Logger.Level.ERROR, "Error calculating the block item path", io);
9699
}
97100
}
98101

@@ -104,27 +107,82 @@ public Optional<Long> write(final BlockItem blockItem, final long blockNumber) {
104107
*/
105108
@Override
106109
public Optional<Block> read(final long id) {
107-
return read(resolvePath(id));
110+
111+
final Builder builder = Block.newBuilder();
112+
final Path blockPath = blockNodeRootPath.resolve(String.valueOf(id));
113+
return read(blockPath, builder);
108114
}
109115

110-
private Optional<Block> read(final String filePath) {
116+
private Optional<Block> read(final Path blockPath, final Builder builder) {
117+
118+
// Directly count and add BlockItems into the Block
119+
// to keep the retrieval process O(BlockItems)
120+
boolean isEnd = false;
121+
for (int i = 1;!isEnd;i++) {
122+
final Path blockItemPath = blockPath.resolve(i + BLOCK_FILE_EXTENSION);
123+
final Optional<BlockItem> blockItemOpt = readBlockItem(blockItemPath.toString());
124+
if (blockItemOpt.isPresent()) {
125+
builder.addBlockItems(blockItemOpt.get());
126+
continue;
127+
}
128+
129+
isEnd = true;
130+
}
131+
132+
return Optional.of(builder.build());
133+
}
111134

112-
try (FileInputStream fis = new FileInputStream(filePath)) {
135+
private Optional<BlockItem> readBlockItem(final String blockItemPath) {
136+
try (FileInputStream fis = new FileInputStream(blockItemPath)) {
113137
return Optional.of(BlockItem.parseFrom(fis));
114138
} catch (FileNotFoundException io) {
115-
LOGGER.log(System.Logger.Level.ERROR, "Error reading file: " + filePath, io);
116139
return Optional.empty();
117140
} catch (IOException io) {
118-
throw new RuntimeException("Error reading file: " + filePath, io);
141+
throw new RuntimeException("Error reading file: " + blockItemPath, io);
119142
}
120143
}
121144

122-
private String resolvePath(final long blockNumber) {
145+
private String getAbsoluteFilePath(final BlockItem blockItem) throws IOException {
123146

124-
String fileName = id + BLOCK_FILE_EXTENSION;
125-
Path fullPath = blockNodeRootPath.resolve(fileName);
126-
LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath);
147+
if (blockItem.hasBlockHeader()) {
127148

128-
return fullPath.toString();
149+
// A "block" is a directory of blockItems. Create the "block"
150+
// based on the block_number
151+
currentBlockDir = Path.of(String.valueOf(blockItem.getBlockHeader().getBlockNumber()));
152+
153+
final Path blockPath = blockNodeRootPath.resolve(currentBlockDir);
154+
createPath(blockPath);
155+
156+
// Build the path to the BlockHeader.blk file
157+
currentIndex = 1;
158+
return blockPath.resolve(currentIndex + BLOCK_FILE_EXTENSION).toString();
159+
}
160+
161+
// Build the path to a .blk file
162+
final Path blockPath = blockNodeRootPath.resolve(currentBlockDir);
163+
return blockPath.resolve(++currentIndex + BLOCK_FILE_EXTENSION).toString();
164+
}
165+
166+
private void createPath(Path blockNodePath) throws IOException {
167+
// Initialize the block node root directory if it does not exist
168+
if (Files.notExists(blockNodePath)) {
169+
Files.createDirectory(blockNodePath);
170+
LOGGER.log(
171+
System.Logger.Level.INFO,
172+
"Created block node root directory: " + blockNodePath);
173+
} else {
174+
LOGGER.log(
175+
System.Logger.Level.INFO,
176+
"Using existing block node root directory: " + blockNodePath);
177+
}
129178
}
179+
180+
// private String resolvePath(final long blockNumber) {
181+
//
182+
// String fileName = blockNumber + BLOCK_FILE_EXTENSION;
183+
// Path fullPath = blockNodeRootPath.resolve(fileName);
184+
// LOGGER.log(System.Logger.Level.DEBUG, "Resolved fullPath: " + fullPath);
185+
//
186+
// return fullPath.toString();
187+
// }
130188
}

0 commit comments

Comments
 (0)