@@ -52,8 +52,8 @@ public class BlockStreamService implements GrpcService {
52
52
* @param timeoutThresholdMillis the timeout threshold in milliseconds
53
53
* @param streamMediator the stream mediator
54
54
*/
55
- public BlockStreamService (long timeoutThresholdMillis ,
56
- StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamMediator ) {
55
+ public BlockStreamService (final long timeoutThresholdMillis ,
56
+ final StreamMediator <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamMediator ) {
57
57
58
58
this .timeoutThresholdMillis = timeoutThresholdMillis ;
59
59
this .streamMediator = streamMediator ;
@@ -85,7 +85,7 @@ public String serviceName() {
85
85
* @param routing the routing for the BlockStreamService
86
86
*/
87
87
@ Override
88
- public void update (Routing routing ) {
88
+ public void update (final Routing routing ) {
89
89
routing .bidi (CLIENT_STREAMING_METHOD_NAME , this ::streamSink );
90
90
routing .bidi (SERVER_STREAMING_METHOD_NAME , this ::streamSource );
91
91
}
@@ -98,7 +98,8 @@ public void update(Routing routing) {
98
98
* @return a custom StreamObserver to handle streaming blocks from the producer to all subscribed consumers
99
99
* via the streamMediator as well as sending responses back to the producer.
100
100
*/
101
- private StreamObserver <BlockStreamServiceGrpcProto .Block > streamSink (StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > responseStreamObserver ) {
101
+ private StreamObserver <BlockStreamServiceGrpcProto .Block > streamSink (
102
+ final StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > responseStreamObserver ) {
102
103
LOGGER .finer ("Executing bidirectional streamSink method" );
103
104
104
105
return new ProducerBlockStreamObserver (streamMediator , responseStreamObserver );
@@ -113,14 +114,16 @@ private StreamObserver<BlockStreamServiceGrpcProto.Block> streamSink(StreamObser
113
114
* @return a custom StreamObserver to handle streaming blocks from the producer to the consumer as well as
114
115
* handling responses from the consumer.
115
116
*/
116
- private StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > streamSource (StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
117
+ private StreamObserver <BlockStreamServiceGrpcProto .BlockResponse > streamSource (final StreamObserver <BlockStreamServiceGrpcProto .Block > responseStreamObserver ) {
117
118
LOGGER .finer ("Executing bidirectional streamSource method" );
118
119
119
120
// Return a custom StreamObserver to handle streaming blocks from the producer.
120
- LiveStreamObserver <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamObserver = new LiveStreamObserverImpl (
121
+ final LiveStreamObserver <BlockStreamServiceGrpcProto .Block , BlockStreamServiceGrpcProto .BlockResponse > streamObserver = new LiveStreamObserverImpl (
121
122
timeoutThresholdMillis ,
122
123
streamMediator ,
123
124
responseStreamObserver );
125
+
126
+ // Subscribe the observer to the mediator
124
127
streamMediator .subscribe (streamObserver );
125
128
126
129
return streamObserver ;
0 commit comments