16
16
17
17
package com .hedera .block .server ;
18
18
19
+ import static com .hedera .block .server .Constants .*;
20
+ import static io .helidon .webserver .grpc .ResponseHelper .complete ;
21
+
19
22
import com .google .protobuf .Descriptors ;
20
23
import com .hedera .block .protos .BlockStreamServiceGrpcProto ;
21
24
import com .hedera .block .server .consumer .LiveStreamObserver ;
26
29
import io .grpc .Status ;
27
30
import io .grpc .stub .StreamObserver ;
28
31
import io .helidon .webserver .grpc .GrpcService ;
29
-
30
32
import java .time .Clock ;
31
33
import java .util .Optional ;
32
34
33
- import static io .helidon .webserver .grpc .ResponseHelper .complete ;
34
-
35
- import static com .hedera .block .server .Constants .*;
36
-
37
35
/**
38
- * This class implements the GrpcService interface and provides the functionality for the BlockStreamService.
39
- * It sets up the bidirectional streaming methods for the service and handles the routing for these methods.
40
- * It also initializes the StreamMediator, BlockStorage, and BlockCache upon creation.
36
+ * This class implements the GrpcService interface and provides the functionality for the
37
+ * BlockStreamService. It sets up the bidirectional streaming methods for the service and handles
38
+ * the routing for these methods. It also initializes the StreamMediator, BlockStorage, and
39
+ * BlockCache upon creation.
41
40
*
42
- * <p>The class provides two main methods, streamSink and streamSource, which handle the client and server streaming
43
- * respectively. These methods return custom StreamObservers which are used to observe and respond to the streams.
41
+ * <p>The class provides two main methods, streamSink and streamSource, which handle the client and
42
+ * server streaming respectively. These methods return custom StreamObservers which are used to
43
+ * observe and respond to the streams.
44
44
*/
45
45
public class BlockStreamService implements GrpcService {
46
46
47
47
private final System .Logger LOGGER = System .getLogger (getClass ().getName ());
48
48
49
49
private final long timeoutThresholdMillis ;
50
- private final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamMediator ;
51
- private final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block > blockPersistenceHandler ;
50
+ private final StreamMediator <
51
+ BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse >
52
+ streamMediator ;
53
+ private final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block >
54
+ blockPersistenceHandler ;
52
55
53
56
/**
54
57
* Constructor for the BlockStreamService class.
55
58
*
56
59
* @param timeoutThresholdMillis the timeout threshold in milliseconds
57
60
* @param streamMediator the stream mediator
58
61
*/
59
- public BlockStreamService (final long timeoutThresholdMillis ,
60
- final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamMediator ,
61
- final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block > blockPersistenceHandler ) {
62
+ public BlockStreamService (
63
+ final long timeoutThresholdMillis ,
64
+ final StreamMediator <
65
+ BlockStreamServiceGrpcProto .Block ,
66
+ BlockStreamServiceGrpcProto .BlockResponse >
67
+ streamMediator ,
68
+ final BlockPersistenceHandler <BlockStreamServiceGrpcProto .Block >
69
+ blockPersistenceHandler ) {
62
70
63
71
this .timeoutThresholdMillis = timeoutThresholdMillis ;
64
72
this .streamMediator = streamMediator ;
@@ -76,8 +84,8 @@ public Descriptors.FileDescriptor proto() {
76
84
}
77
85
78
86
/**
79
- * Returns the service name for the BlockStreamService. This service name corresponds to the service name in
80
- * the proto file.
87
+ * Returns the service name for the BlockStreamService. This service name corresponds to the
88
+ * service name in the proto file.
81
89
*
82
90
* @return the service name corresponding to the service name in the proto file
83
91
*/
@@ -87,7 +95,8 @@ public String serviceName() {
87
95
}
88
96
89
97
/**
90
- * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming methods for the service.
98
+ * Updates the routing for the BlockStreamService. It sets up the bidirectional streaming
99
+ * methods for the service.
91
100
*
92
101
* @param routing the routing for the BlockStreamService
93
102
*/
@@ -99,64 +108,75 @@ public void update(final Routing routing) {
99
108
}
100
109
101
110
/**
102
- * The streamSink method is called by Helidon each time a producer initiates a bidirectional stream.
103
- *
104
- * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to the producer.
111
+ * The streamSink method is called by Helidon each time a producer initiates a bidirectional
112
+ * stream.
105
113
*
106
- * @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumer
107
- * via the streamMediator as well as sending responses back to the producer.
114
+ * @param responseStreamObserver Helidon provides a StreamObserver to handle responses back to
115
+ * the producer.
116
+ * @return a custom StreamObserver to handle streaming blocks from the producer to all
117
+ * subscribed consumer via the streamMediator as well as sending responses back to the
118
+ * producer.
108
119
*/
109
120
private StreamObserver <BlockStreamServiceGrpcProto .Block > streamSink (
110
- final StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > responseStreamObserver ) {
121
+ final StreamObserver <BlockStreamServiceGrpcProto .BlockResponse >
122
+ responseStreamObserver ) {
111
123
LOGGER .log (System .Logger .Level .DEBUG , "Executing bidirectional streamSink method" );
112
124
113
125
return new ProducerBlockStreamObserver (streamMediator , responseStreamObserver );
114
126
}
115
127
116
128
/**
117
- * The streamSource method is called by Helidon each time a consumer initiates a bidirectional stream.
129
+ * The streamSource method is called by Helidon each time a consumer initiates a bidirectional
130
+ * stream.
118
131
*
119
- * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the consumer
120
- * back to the server.
121
- *
122
- * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well
123
- * as handling responses from the consumer.
132
+ * @param responseStreamObserver Helidon provides a StreamObserver to handle responses from the
133
+ * consumer back to the server.
134
+ * @return a custom StreamObserver to handle streaming blocks from the producer to the consumer
135
+ * as well as handling responses from the consumer.
124
136
*/
125
- private StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > streamSource (final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
137
+ private StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > streamSource (
138
+ final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
126
139
LOGGER .log (System .Logger .Level .DEBUG , "Executing bidirectional streamSource method" );
127
140
128
141
// Return a custom StreamObserver to handle streaming blocks from the producer.
129
- final LiveStreamObserver <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamObserver = new LiveStreamObserverImpl (
130
- timeoutThresholdMillis ,
131
- Clock .systemDefaultZone (),
132
- Clock .systemDefaultZone (),
133
- streamMediator ,
134
- responseStreamObserver );
142
+ final LiveStreamObserver <
143
+ BlockStreamServiceGrpcProto .Block ,
144
+ BlockStreamServiceGrpcProto .BlockResponse >
145
+ streamObserver =
146
+ new LiveStreamObserverImpl (
147
+ timeoutThresholdMillis ,
148
+ Clock .systemDefaultZone (),
149
+ Clock .systemDefaultZone (),
150
+ streamMediator ,
151
+ responseStreamObserver );
135
152
136
153
// Subscribe the observer to the mediator
137
154
streamMediator .subscribe (streamObserver );
138
155
139
156
return streamObserver ;
140
157
}
141
158
142
- private void getBlock (BlockStreamServiceGrpcProto .Block block , StreamObserver <BlockStreamServiceGrpcProto .Block > responseObserver ) {
159
+ private void getBlock (
160
+ BlockStreamServiceGrpcProto .Block block ,
161
+ StreamObserver <BlockStreamServiceGrpcProto .Block > responseObserver ) {
143
162
String message = "GET BLOCK RESPONSE! " ;
144
163
LOGGER .log (System .Logger .Level .INFO , "GetBlock request received" );
145
- Optional <BlockStreamServiceGrpcProto .Block > responseBlock = blockPersistenceHandler .read (block .getId ());
146
- if (responseBlock .isPresent ()) {
164
+ Optional <BlockStreamServiceGrpcProto .Block > responseBlock =
165
+ blockPersistenceHandler .read (block .getId ());
166
+ if (responseBlock .isPresent ()) {
147
167
LOGGER .log (System .Logger .Level .INFO , "SENDING BLOCK # " + block .getId ());
148
- complete (responseObserver , responseBlock .get ()); // TODO: Should return int and not quoted string
168
+ complete (
169
+ responseObserver ,
170
+ responseBlock .get ()); // TODO: Should return int and not quoted string
149
171
} else {
150
172
LOGGER .log (System .Logger .Level .INFO , "DID NOT FIND YOUR BLOCK" );
151
173
// TODO: Fix below. It could return gRPC equivalent of 404 NOT FOUND
152
- responseObserver .onError (Status . NOT_FOUND
153
- . withDescription ( "DID NOT FIND YOUR BLOCK" )
154
- . asRuntimeException ( )
155
- );
174
+ responseObserver .onError (
175
+ Status . NOT_FOUND
176
+ . withDescription ( "DID NOT FIND YOUR BLOCK" )
177
+ . asRuntimeException () );
156
178
// Keeping below comments for the fix needed above.
157
179
// complete(responseObserver, BlockStreamServiceGrpcProto.Block.getDefaultInstance());
158
180
}
159
181
}
160
182
}
161
-
162
-
0 commit comments