16
16
17
17
package com .hedera .block .server .consumer ;
18
18
19
+ import static com .hedera .block .protos .BlockStreamService .BlockItem ;
20
+ import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
21
+ import static org .mockito .Mockito .*;
22
+
19
23
import com .hedera .block .server .data .ObjectEvent ;
20
24
import com .hedera .block .server .mediator .StreamMediator ;
21
25
import io .grpc .stub .StreamObserver ;
26
+ import java .time .Instant ;
27
+ import java .time .InstantSource ;
28
+ import org .junit .jupiter .api .Disabled ;
22
29
import org .junit .jupiter .api .Test ;
23
30
import org .junit .jupiter .api .extension .ExtendWith ;
24
31
import org .mockito .Mock ;
25
32
import org .mockito .junit .jupiter .MockitoExtension ;
26
33
27
- import java .time .Instant ;
28
- import java .time .InstantSource ;
29
-
30
- import static com .hedera .block .protos .BlockStreamService .BlockItem ;
31
- import static com .hedera .block .protos .BlockStreamService .SubscribeStreamResponse ;
32
- import static org .mockito .Mockito .*;
33
-
34
34
@ ExtendWith (MockitoExtension .class )
35
35
public class LiveStreamObserverImplTest {
36
36
@@ -43,74 +43,74 @@ public class LiveStreamObserverImplTest {
43
43
44
44
@ Mock private ObjectEvent <BlockItem > objectEvent ;
45
45
46
- // @Test
47
- // public void testConsumerTimeoutWithinWindow() {
48
- // final var consumerBlockItemObserver =
49
- // new ConsumerBlockItemObserver(
50
- // TIMEOUT_THRESHOLD_MILLIS,
51
- // buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
52
- // streamMediator,
53
- // responseStreamObserver);
54
- //
55
- // final BlockItem blockItem = BlockItem.newBuilder().build();
56
- // when(objectEvent.get()).thenReturn(blockItem);
57
- //
58
- // final SubscribeStreamResponse subscribeStreamResponse =
59
- // SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
60
- //
61
- // consumerBlockItemObserver.onEvent(objectEvent, 0, true);
62
- //
63
- // // verify the observer is called with the next
64
- // // block and the stream mediator is not unsubscribed
65
- // verify(responseStreamObserver).onNext(subscribeStreamResponse);
66
- // verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
67
- // }
46
+ @ Test
47
+ @ Disabled
48
+ public void testConsumerTimeoutWithinWindow () {
49
+ final var consumerBlockItemObserver =
50
+ new ConsumerBlockItemObserver (
51
+ TIMEOUT_THRESHOLD_MILLIS ,
52
+ buildClockInsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
53
+ streamMediator ,
54
+ responseStreamObserver );
55
+
56
+ final BlockItem blockItem = BlockItem .newBuilder ().build ();
57
+ when (objectEvent .get ()).thenReturn (blockItem );
58
+
59
+ final SubscribeStreamResponse subscribeStreamResponse =
60
+ SubscribeStreamResponse .newBuilder ().setBlockItem (blockItem ).build ();
61
+
62
+ consumerBlockItemObserver .onEvent (objectEvent , 0 , true );
63
+
64
+ // verify the observer is called with the next
65
+ // block and the stream mediator is not unsubscribed
66
+ verify (responseStreamObserver ).onNext (subscribeStreamResponse );
67
+ verify (streamMediator , never ()).unsubscribe (consumerBlockItemObserver );
68
+ }
68
69
69
70
@ Test
70
71
public void testConsumerTimeoutOutsideWindow () {
71
72
72
73
final var consumerBlockItemObserver =
73
- new ConsumerBlockItemObserver (
74
- TIMEOUT_THRESHOLD_MILLIS ,
75
- buildClockOutsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
76
- streamMediator ,
77
- responseStreamObserver );
74
+ new ConsumerBlockItemObserver (
75
+ TIMEOUT_THRESHOLD_MILLIS ,
76
+ buildClockOutsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
77
+ streamMediator ,
78
+ responseStreamObserver );
78
79
79
80
consumerBlockItemObserver .onEvent (objectEvent , 1 , true );
80
81
verify (streamMediator ).unsubscribe (consumerBlockItemObserver );
81
82
}
82
83
83
- // @Test
84
- // public void testProducerTimeoutWithinWindow() {
85
- // final LiveStreamObserver<BlockStreamServiceGrpcProto.Block,
86
- // BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
87
- // TIMEOUT_THRESHOLD_MILLIS,
88
- // buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
89
- // buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
90
- // streamMediator,
91
- // responseStreamObserver);
92
- //
93
- // BlockStreamServiceGrpcProto.BlockResponse blockResponse =
94
- // BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
95
- // liveStreamObserver.onNext(blockResponse);
96
- //
97
- // // verify the mediator is NOT called to unsubscribe the observer
98
- // verify(streamMediator, never()).unsubscribe(liveStreamObserver);
99
- // }
100
- //
101
- @ Test
102
- public void testProducerTimeoutOutsideWindow () throws InterruptedException {
103
- final var consumerBlockItemObserver = new ConsumerBlockItemObserver (
104
- TIMEOUT_THRESHOLD_MILLIS ,
105
- buildClockOutsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
106
- streamMediator ,
107
- responseStreamObserver );
108
-
109
- Thread .sleep (51 );
110
-
111
- consumerBlockItemObserver .onEvent (objectEvent , 0 , true );
112
- verify (streamMediator ).unsubscribe (consumerBlockItemObserver );
113
- }
84
+ @ Test
85
+ @ Disabled
86
+ public void testProducerTimeoutWithinWindow () {
87
+ final var consumerBlockItemObserver =
88
+ new ConsumerBlockItemObserver (
89
+ TIMEOUT_THRESHOLD_MILLIS ,
90
+ buildClockOutsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
91
+ streamMediator ,
92
+ responseStreamObserver );
93
+
94
+ consumerBlockItemObserver .onEvent (objectEvent , 0 , true );
95
+
96
+ // verify the mediator is NOT called to unsubscribe the observer
97
+ verify (streamMediator , never ()).unsubscribe (consumerBlockItemObserver );
98
+ }
99
+
100
+ @ Test
101
+ public void testProducerTimeoutOutsideWindow () throws InterruptedException {
102
+ final var consumerBlockItemObserver =
103
+ new ConsumerBlockItemObserver (
104
+ TIMEOUT_THRESHOLD_MILLIS ,
105
+ buildClockOutsideWindow (TEST_TIME , TIMEOUT_THRESHOLD_MILLIS ),
106
+ streamMediator ,
107
+ responseStreamObserver );
108
+
109
+ Thread .sleep (51 );
110
+
111
+ consumerBlockItemObserver .onEvent (objectEvent , 0 , true );
112
+ verify (streamMediator ).unsubscribe (consumerBlockItemObserver );
113
+ }
114
114
115
115
private static InstantSource buildClockInsideWindow (
116
116
long testTime , long timeoutThresholdMillis ) {
0 commit comments