Skip to content

Commit 0608e65

Browse files
Issues 16, 17, 18 and 21
- added license headers to java files - added bidirectional gRPC live streaming blocks implementation with 1 producer and N consumers Signed-off-by: Matt Peterson <[email protected]>
1 parent e1eeea3 commit 0608e65

33 files changed

+1789
-78
lines changed

.gitignore

+4-1
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,7 @@ gradle-app.setting
4545
.project
4646

4747
# JDT-specific (Eclipse Java Development Tools)
48-
.classpath
48+
.classpath
49+
50+
data.txt
51+

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,13 @@ to [[email protected]](mailto:[email protected]).
2828

2929
Please do not file a public ticket mentioning the vulnerability. Refer to the security policy defined in the [SECURITY.md](https://github.com/hashgraph/hedera-sourcify/blob/main/SECURITY.md).
3030

31+
---
32+
33+
# Running Locally
34+
35+
1) Create a local temp directory. For example, use `mktemp -d -t block-stream-temp-dir` to create a directory
36+
2) export BLOCKNODE_STORAGE_ROOT_PATH=<path to the temp directory> # You can add this to your .zshrc, etc
37+
3) ./gradlew run # ./gradlew run --debug-jvm to run in debug mode
38+
39+
# Running Tests
40+
1) ./gradlew build

gradle/modules.properties

+1
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ io.helidon.webserver=io.helidon.webserver:helidon-webserver
33
io.helidon.webserver.grpc=io.helidon.webserver:helidon-webserver-grpc
44
io.helidon.webserver.testing.junit5=io.helidon.webserver.testing.junit5:helidon-webserver-testing-junit5
55
io.grpc=io.grpc:grpc-stub
6+
grpc.protobuf=io.grpc:grpc-protobuf:1.20.0
+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
syntax = "proto3";
2+
3+
option java_package = "com.hedera.block.protos";
4+
option java_outer_classname = "BlockStreamServiceGrpcProto";
5+
6+
service BlockStreamGrpc {
7+
rpc StreamSink(stream Block) returns (stream BlockResponse) {}
8+
rpc StreamSource(stream BlockResponse) returns (stream Block) {}
9+
}
10+
11+
message Block {
12+
int64 id = 1;
13+
string value = 2;
14+
}
15+
16+
message BlockResponse {
17+
int64 id = 1;
18+
}
19+
20+
message Empty {
21+
}

protos/src/main/protobuf/echo.proto

-15
This file was deleted.

server/build.gradle.kts

+2-5
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,5 @@ application {
2525
}
2626

2727
testModuleInfo {
28-
// requires("org.assertj.core")
29-
// requires("net.bytebuddy")
30-
// requires("org.junit.jupiter.api")
31-
// requires("org.junit.jupiter.params")
32-
}
28+
requires("org.junit.jupiter.api")
29+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Hedera Block Node
3+
*
4+
* Copyright (C) 2024 Hedera Hashgraph, LLC
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.hedera.block.server;
20+
21+
import com.google.protobuf.Descriptors;
22+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
23+
import com.hedera.block.server.producer.ProducerBlockStreamObserver;
24+
import com.hedera.block.server.mediator.StreamMediator;
25+
import com.hedera.block.server.consumer.LiveStreamObserverImpl;
26+
import com.hedera.block.server.consumer.LiveStreamObserver;
27+
import io.grpc.stub.StreamObserver;
28+
import io.helidon.webserver.grpc.GrpcService;
29+
30+
import java.util.logging.Logger;
31+
32+
import static com.hedera.block.server.Constants.*;
33+
34+
/**
35+
* This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
36+
* It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
37+
* It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
38+
*
39+
* <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
40+
* respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
41+
*
42+
*/
43+
public class BlockStreamService implements GrpcService {
44+
45+
private final Logger logger = Logger.getLogger(getClass().getName());
46+
47+
private final long timeoutThresholdMillis;
48+
private final StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator;
49+
50+
/**
51+
* Constructor for the BlockStreamService class.
52+
*
53+
* @param timeoutThresholdMillis the timeout threshold in milliseconds
54+
* @param streamMediator the stream mediator
55+
*/
56+
public BlockStreamService(long timeoutThresholdMillis,
57+
StreamMediator<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamMediator) {
58+
59+
this.timeoutThresholdMillis = timeoutThresholdMillis;
60+
this.streamMediator = streamMediator;
61+
}
62+
63+
/**
64+
* Returns the FileDescriptor for the BlockStreamServiceGrpcProto.
65+
*
66+
* @return the FileDescriptor for the BlockStreamServiceGrpcProto
67+
*/
68+
@Override
69+
public Descriptors.FileDescriptor proto() {
70+
return BlockStreamServiceGrpcProto.getDescriptor();
71+
}
72+
73+
/**
74+
* Returns the service name for the BlockStreamService. This service name corresponds to the service name in the proto file.
75+
*
76+
* @return the service name
77+
*/
78+
@Override
79+
public String serviceName() {
80+
return SERVICE_NAME;
81+
}
82+
83+
/**
84+
* Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
85+
*
86+
* @param routing the routing for the BlockStreamService
87+
*/
88+
@Override
89+
public void update(Routing routing) {
90+
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::streamSink);
91+
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::streamSource);
92+
}
93+
94+
/**
95+
* The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
96+
*
97+
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses back to the producer.
98+
*
99+
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumers
100+
* via the streamMediator as well as sending responses back to the producer.
101+
*/
102+
private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> responseStreamObserver) {
103+
logger.finer("Executing bidirectional streamSink method");
104+
105+
return new ProducerBlockStreamObserver(streamMediator, responseStreamObserver);
106+
}
107+
108+
/**
109+
* The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
110+
*
111+
* @param responseStreamObserver - Helidon provides a StreamObserver to handle responses from the consumer
112+
* back to the server.
113+
*
114+
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well as
115+
* handling responses from the consumer.
116+
*/
117+
private StreamObserver<BlockStreamServiceGrpcProto.BlockResponse> streamSource(StreamObserver<BlockStreamServiceGrpcProto.Block> responseStreamObserver) {
118+
logger.finer("Executing bidirectional streamSource method");
119+
120+
// Return a custom StreamObserver to handle streaming blocks from the producer.
121+
LiveStreamObserver<BlockStreamServiceGrpcProto.Block, BlockStreamServiceGrpcProto.BlockResponse> streamObserver = new LiveStreamObserverImpl(
122+
timeoutThresholdMillis,
123+
streamMediator,
124+
responseStreamObserver);
125+
streamMediator.subscribe(streamObserver);
126+
127+
return streamObserver;
128+
}
129+
}
130+
131+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Hedera Block Node
3+
*
4+
* Copyright (C) 2024 Hedera Hashgraph, LLC
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package com.hedera.block.server;
20+
21+
public final class Constants {
22+
private Constants() {}
23+
24+
// Config Constants
25+
public static String BLOCKNODE_STORAGE_ROOT_PATH_KEY = "blocknode.storage.root.path";
26+
27+
// Constants specified in the service definition of the .proto file
28+
public static String SERVICE_NAME = "BlockStreamGrpc";
29+
public static String CLIENT_STREAMING_METHOD_NAME = "StreamSink";
30+
public static String SERVER_STREAMING_METHOD_NAME = "StreamSource";
31+
}

server/src/main/java/com/hedera/block/server/EchoService.java

-38
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,96 @@
1+
/*
2+
* Hedera Block Node
3+
*
4+
* Copyright (C) 2024 Hedera Hashgraph, LLC
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
119
package com.hedera.block.server;
220

3-
import com.hedera.block.protos.EchoServiceGrpcProto;
21+
import com.hedera.block.protos.BlockStreamServiceGrpcProto;
22+
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
23+
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
24+
import com.hedera.block.server.persistence.cache.BlockCache;
25+
import com.hedera.block.server.persistence.cache.LRUCache;
26+
import com.hedera.block.server.persistence.storage.BlockStorage;
27+
import com.hedera.block.server.persistence.storage.FileSystemBlockStorage;
28+
import io.grpc.stub.ServerCalls;
429
import io.grpc.stub.StreamObserver;
30+
import io.helidon.config.Config;
531
import io.helidon.webserver.WebServer;
632
import io.helidon.webserver.grpc.GrpcRouting;
733
import io.helidon.webserver.http.HttpRouting;
834

35+
import java.io.IOException;
36+
import java.util.logging.Logger;
37+
import java.util.stream.Stream;
38+
39+
import static com.hedera.block.server.Constants.*;
40+
941
/**
1042
* Main class for the block node server
1143
*/
1244
public class Server {
13-
private Server() {
14-
// Not meant to be instantiated
15-
}
45+
46+
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the service class.
47+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.Block>, StreamObserver<BlockStreamServiceGrpcProto.Block>> clientBidiStreamingMethod;
48+
private static ServerCalls.BidiStreamingMethod<Stream<BlockStreamServiceGrpcProto.BlockResponse>, StreamObserver<BlockStreamServiceGrpcProto.Block>> serverBidiStreamingMethod;
49+
50+
private static final Logger logger = Logger.getLogger(Server.class.getName());
51+
52+
private Server() {}
1653

1754
/**
1855
* Main entrypoint for the block node server
1956
*
2057
* @param args Command line arguments. Not used at present,
2158
*/
2259
public static void main(String[] args) {
23-
WebServer.builder()
24-
.port(8080)
25-
.addRouting(HttpRouting.builder()
26-
.get("/greet", (req, res) -> res.send("Hello World!")))
27-
.addRouting(GrpcRouting.builder()
28-
.service(new EchoService())
29-
.unary(EchoServiceGrpcProto.getDescriptor(),
30-
"EchoService",
31-
"Echo",
32-
Server::grpcEcho))
33-
.build()
34-
.start();
35-
}
3660

37-
static void grpcEcho(EchoServiceGrpcProto.EchoRequest request, StreamObserver<EchoServiceGrpcProto.EchoResponse> responseObserver) {}
61+
try {
62+
63+
// Set the global configuration
64+
Config config = Config.create();
65+
Config.global(config);
66+
67+
// Initialize the block storage, cache, and service
68+
BlockStorage<BlockStreamServiceGrpcProto.Block> blockStorage = new FileSystemBlockStorage(BLOCKNODE_STORAGE_ROOT_PATH_KEY, config);
69+
BlockCache<BlockStreamServiceGrpcProto.Block> blockCache = new LRUCache(1000);
70+
BlockStreamService blockStreamService = new BlockStreamService(1500,
71+
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage, blockCache)));
72+
73+
// Start the web server
74+
WebServer.builder()
75+
.port(8080)
76+
.addRouting(HttpRouting.builder()
77+
.get("/greet", (req, res) -> res.send("Hello World!")))
78+
.addRouting(GrpcRouting.builder()
79+
.service(blockStreamService)
80+
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
81+
SERVICE_NAME,
82+
CLIENT_STREAMING_METHOD_NAME,
83+
clientBidiStreamingMethod)
84+
.bidi(BlockStreamServiceGrpcProto.getDescriptor(),
85+
SERVICE_NAME,
86+
SERVER_STREAMING_METHOD_NAME,
87+
serverBidiStreamingMethod))
88+
.build()
89+
.start();
90+
91+
} catch (IOException e) {
92+
logger.severe("There was an exception starting the server: " + e.getMessage());
93+
throw new RuntimeException(e);
94+
}
95+
}
3896
}

0 commit comments

Comments
 (0)