Skip to content

Commit 7cc9abd

Browse files
fix: changed the subscribeBlockStream gRPC method from bidirectional to server streaming.
Signed-off-by: Matt Peterson <[email protected]>
1 parent 31be928 commit 7cc9abd

File tree

5 files changed

+22
-57
lines changed

5 files changed

+22
-57
lines changed

protos/src/main/protobuf/blockstream.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ service BlockStreamGrpcService {
2323

2424
rpc publishBlockStream (stream PublishStreamRequest) returns (stream PublishStreamResponse) {}
2525

26-
rpc subscribeBlockStream (stream SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
26+
rpc subscribeBlockStream (SubscribeStreamRequest) returns (stream SubscribeStreamResponse) {}
2727
}
2828

2929
message PublishStreamRequest {

server/src/main/java/com/hedera/block/server/BlockStreamService.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -91,30 +91,33 @@ public String serviceName() {
9191
@Override
9292
public void update(final Routing routing) {
9393
routing.bidi(CLIENT_STREAMING_METHOD_NAME, this::publishBlockStream);
94-
routing.bidi(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
94+
routing.serverStream(SERVER_STREAMING_METHOD_NAME, this::subscribeBlockStream);
9595
}
9696

9797
private StreamObserver<PublishStreamRequest> publishBlockStream(
9898
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
99-
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSink method");
99+
LOGGER.log(
100+
System.Logger.Level.DEBUG,
101+
"Executing bidirectional publishBlockStream gRPC method");
100102

101103
return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
102104
}
103105

104-
private StreamObserver<SubscribeStreamRequest> subscribeBlockStream(
105-
final StreamObserver<SubscribeStreamResponse> subscribeStreamRequestObserver) {
106-
LOGGER.log(System.Logger.Level.DEBUG, "Executing bidirectional streamSource method");
106+
private void subscribeBlockStream(
107+
final SubscribeStreamRequest subscribeStreamRequest,
108+
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
109+
LOGGER.log(
110+
System.Logger.Level.DEBUG,
111+
"Executing Server Streaming subscribeBlockStream gRPC method");
107112

108113
// Return a custom StreamObserver to handle streaming blocks from the producer.
109114
final var streamObserver =
110115
new ConsumerBlockItemObserver(
111116
timeoutThresholdMillis,
112117
Clock.systemDefaultZone(),
113118
streamMediator,
114-
subscribeStreamRequestObserver);
119+
subscribeStreamResponseObserver);
115120

116121
streamMediator.subscribe(streamObserver);
117-
118-
return streamObserver;
119122
}
120123
}

server/src/main/java/com/hedera/block/server/Server.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,15 @@
3333
/** Main class for the block node server */
3434
public class Server {
3535

36-
// Function stubs to satisfy the bidi routing param signatures. The implementations are in the
36+
// Function stubs to satisfy the routing param signatures. The implementations are in the
3737
// service class.
3838
private static ServerCalls.BidiStreamingMethod<
3939
StreamObserver<PublishStreamRequest>, StreamObserver<PublishStreamResponse>>
4040
clientBidiStreamingMethod;
41-
private static ServerCalls.BidiStreamingMethod<
42-
StreamObserver<SubscribeStreamRequest>, StreamObserver<SubscribeStreamResponse>>
43-
serverBidiStreamingMethod;
41+
42+
public static ServerCalls.ServerStreamingMethod<
43+
SubscribeStreamRequest, StreamObserver<SubscribeStreamResponse>>
44+
serverStreamingMethod;
4445

4546
private static final System.Logger LOGGER = System.getLogger(Server.class.getName());
4647

@@ -85,12 +86,12 @@ public static void main(final String[] args) {
8586
SERVICE_NAME,
8687
CLIENT_STREAMING_METHOD_NAME,
8788
clientBidiStreamingMethod)
88-
.bidi(
89+
.serverStream(
8990
com.hedera.block.protos.BlockStreamService
9091
.getDescriptor(),
9192
SERVICE_NAME,
9293
SERVER_STREAMING_METHOD_NAME,
93-
serverBidiStreamingMethod))
94+
serverStreamingMethod))
9495
.build()
9596
.start();
9697

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,8 @@
1616

1717
package com.hedera.block.server.consumer;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
21-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
19+
import static com.hedera.block.protos.BlockStreamService.*;
2220

23-
import com.hedera.block.protos.BlockStreamService;
2421
import com.hedera.block.server.data.ObjectEvent;
2522
import com.hedera.block.server.mediator.StreamMediator;
2623
import io.grpc.stub.ServerCallStreamObserver;
@@ -39,8 +36,7 @@ public class ConsumerBlockItemObserver
3936

4037
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4138

42-
private final StreamObserver<BlockStreamService.SubscribeStreamResponse>
43-
subscribeStreamResponseObserver;
39+
private final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver;
4440

4541
private final long timeoutThresholdMillis;
4642
private final InstantSource producerLivenessClock;

server/src/test/resources/consumer.sh

+1-36
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,8 @@ if [ "$#" -lt 1 ] || ! [[ "$1" =~ ^[0-9]+$ ]]; then
1010
usage_error
1111
fi
1212

13-
# Check if the second argument is provided and if it's a positive integer
14-
if [ "$#" -eq 2 ] && ! [[ "$2" =~ ^[1-9][0-9]*$ ]]; then
15-
usage_error
16-
fi
17-
1813
# If the script reaches here, the parameters are valid
1914
echo "The provided integer is: $1"
20-
if [ "$#" -eq 2 ]; then
21-
echo "The optional positive integer is: $2"
22-
fi
2315

2416
# Use environment variables or default values
2517
GRPC_SERVER=${GRPC_SERVER:-"localhost:8080"}
@@ -29,32 +21,5 @@ PROTO_IMPORT_PATH=${PROTO_IMPORT_PATH:-"../../../../protos/src/main/protobuf"}
2921

3022
echo "Starting consumer..."
3123

32-
# Signal handler to handle SIGINT (Ctrl+C)
33-
function cleanup {
34-
echo "Received SIGINT, stopping..."
35-
kill $GRPC_PID
36-
exit 0
37-
}
38-
39-
# Trap SIGINT
40-
trap cleanup SIGINT
41-
42-
# Generate and push messages to the gRPC server as a consumer.
4324
# Response block messages from the gRPC server are printed to stdout.
44-
(
45-
iter=$1
46-
while true; do
47-
echo "{\"start_block_number\": $iter}"
48-
49-
if [ $iter -eq $2 ]; then
50-
exit 0
51-
fi
52-
53-
((iter++))
54-
55-
# Configure the message speed
56-
sleep 1
57-
58-
done
59-
) | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD
60-
25+
echo "{\"start_block_number\": $1}" | grpcurl -plaintext -import-path $PROTO_IMPORT_PATH -proto $PATH_TO_PROTO -d @ $GRPC_SERVER $GRPC_METHOD

0 commit comments

Comments
 (0)