16
16
17
17
package com .hedera .block .server .consumer ;
18
18
19
+ import static com .hedera .block .protos .BlockStreamService .BlockItem ;
20
+ import static com .hedera .block .protos .BlockStreamService .SubscribeStreamRequest ;
21
+
19
22
import com .hedera .block .server .data .ObjectEvent ;
20
23
import com .hedera .block .server .mediator .StreamMediator ;
21
24
import io .grpc .stub .ServerCallStreamObserver ;
22
25
import io .grpc .stub .StreamObserver ;
23
-
24
26
import java .time .Clock ;
25
27
import java .time .InstantSource ;
26
28
import java .util .concurrent .CountDownLatch ;
27
29
28
- import static com .hedera .block .protos .BlockStreamService .BlockItem ;
29
- import static com .hedera .block .protos .BlockStreamService .SubscribeStreamRequest ;
30
-
31
30
/**
32
- * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to the downstream consumer
33
- * via the notify method and manage the bidirectional stream to the consumer via the onNext, onError, and onCompleted methods.
31
+ * The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
32
+ * the downstream consumer via the notify method and manage the bidirectional stream to the consumer
33
+ * via the onNext, onError, and onCompleted methods.
34
34
*/
35
- public class ConsumerBlockItemObserver implements BlockItemEventHandler <ObjectEvent <BlockItem >, SubscribeStreamRequest > {
35
+ public class ConsumerBlockItemObserver
36
+ implements BlockItemEventHandler <ObjectEvent <BlockItem >, SubscribeStreamRequest > {
36
37
37
38
private final System .Logger LOGGER = System .getLogger (getClass ().getName ());
38
39
@@ -68,13 +69,17 @@ public ConsumerBlockItemObserver(
68
69
// this observer to avoid orphaning subscribed resources.
69
70
if (subscribeStreamResponseObserver instanceof ServerCallStreamObserver ) {
70
71
71
- // Unfortunately we have to cast the responseStreamObserver to a ServerCallStreamObserver
72
+ // Unfortunately we have to cast the responseStreamObserver to a
73
+ // ServerCallStreamObserver
72
74
// to register the onCancelHandler.
73
- ((ServerCallStreamObserver <BlockItem >)subscribeStreamResponseObserver )
74
- .setOnCancelHandler (() -> {
75
- LOGGER .log (System .Logger .Level .DEBUG , "Consumer cancelled stream. Unsubscribing observer." );
76
- streamMediator .unsubscribe (this );
77
- });
75
+ ((ServerCallStreamObserver <BlockItem >) subscribeStreamResponseObserver )
76
+ .setOnCancelHandler (
77
+ () -> {
78
+ LOGGER .log (
79
+ System .Logger .Level .DEBUG ,
80
+ "Consumer cancelled stream. Unsubscribing observer." );
81
+ streamMediator .unsubscribe (this );
82
+ });
78
83
}
79
84
80
85
this .subscribeStreamResponseObserver = subscribeStreamResponseObserver ;
@@ -83,12 +88,10 @@ public ConsumerBlockItemObserver(
83
88
this .streamMediator = streamMediator ;
84
89
}
85
90
86
- /**
87
- * Pass the block to the observer provided by Helidon
88
- *
89
- */
91
+ /** Pass the block to the observer provided by Helidon */
90
92
@ Override
91
- public void onEvent (final ObjectEvent <BlockItem > event , final long l , final boolean b ) throws Exception {
93
+ public void onEvent (final ObjectEvent <BlockItem > event , final long l , final boolean b )
94
+ throws Exception {
92
95
93
96
// Refresh the producer liveness and pass the block to the observer.
94
97
producerLivenessMillis = producerLivenessClock .millis ();
@@ -104,33 +107,39 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
104
107
}
105
108
106
109
/**
107
- * The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the bidirectional stream.
108
- *
110
+ * The onNext() method is triggered by Helidon when a consumer sends a blockResponse via the
111
+ * bidirectional stream.
109
112
*/
110
113
@ Override
111
114
public void onNext (final SubscribeStreamRequest subscribeStreamRequest ) {
112
115
113
116
// Check if the producer has timed out. If so, unsubscribe the observer from the mediator.
114
117
if (isThresholdExceeded (producerLivenessMillis )) {
115
- LOGGER .log (System .Logger .Level .DEBUG , "Producer timeout threshold exceeded. Unsubscribing observer." );
118
+ LOGGER .log (
119
+ System .Logger .Level .DEBUG ,
120
+ "Producer timeout threshold exceeded. Unsubscribing observer." );
116
121
streamMediator .unsubscribe (this );
117
122
}
118
123
}
119
124
120
125
/**
121
- * The onError() method is triggered by Helidon when an error occurs on the bidirectional stream to the downstream consumer.
122
- * Unsubscribe the observer from the mediator.
126
+ * The onError() method is triggered by Helidon when an error occurs on the bidirectional stream
127
+ * to the downstream consumer. Unsubscribe the observer from the mediator.
123
128
*
124
129
* @param t the error occurred on the stream
125
130
*/
126
131
@ Override
127
132
public void onError (final Throwable t ) {
128
- LOGGER .log (System .Logger .Level .ERROR , "Unexpected consumer stream communication failure: %s" .formatted (t ), t );
133
+ LOGGER .log (
134
+ System .Logger .Level .ERROR ,
135
+ "Unexpected consumer stream communication failure: %s" .formatted (t ),
136
+ t );
129
137
}
130
138
131
139
/**
132
- * The onCompleted() method is triggered by Helidon when the bidirectional stream to the downstream consumer is completed.
133
- * This implementation will then unsubscribe the observer from the mediator.
140
+ * The onCompleted() method is triggered by Helidon when the bidirectional stream to the
141
+ * downstream consumer is completed. This implementation will then unsubscribe the observer from
142
+ * the mediator.
134
143
*/
135
144
@ Override
136
145
public void onCompleted () {
@@ -142,7 +151,12 @@ private boolean isThresholdExceeded(long livenessMillis) {
142
151
final long currentTimeMillis = Clock .systemDefaultZone ().millis ();
143
152
final long elapsedMillis = currentTimeMillis - livenessMillis ;
144
153
if (elapsedMillis > timeoutThresholdMillis ) {
145
- LOGGER .log (System .Logger .Level .INFO , "Elapsed milliseconds: " + elapsedMillis + ", timeout threshold: " + timeoutThresholdMillis );
154
+ LOGGER .log (
155
+ System .Logger .Level .INFO ,
156
+ "Elapsed milliseconds: "
157
+ + elapsedMillis
158
+ + ", timeout threshold: "
159
+ + timeoutThresholdMillis );
146
160
return true ;
147
161
}
148
162
0 commit comments