Skip to content

Commit 0889c17

Browse files
fixed junit tests
Signed-off-by: Matt Peterson <[email protected]>
1 parent 0ed0c9a commit 0889c17

File tree

4 files changed

+105
-109
lines changed

4 files changed

+105
-109
lines changed

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+9-24
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public ConsumerBlockItemObserver(
5757
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
5858

5959
this.timeoutThresholdMillis = timeoutThresholdMillis;
60-
this.producerLivenessClock = producerLivenessClock;
6160
this.streamMediator = streamMediator;
6261

6362
// The ServerCallStreamObserver can be configured with a Runnable to
@@ -80,45 +79,31 @@ public ConsumerBlockItemObserver(
8079
}
8180

8281
this.subscribeStreamResponseObserver = subscribeStreamResponseObserver;
82+
this.producerLivenessClock = producerLivenessClock;
83+
this.producerLivenessMillis = producerLivenessClock.millis();
8384
}
8485

8586
/** Pass the block to the observer provided by Helidon */
8687
@Override
8788
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {
8889

89-
if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) {
90-
91-
// if (isThresholdExceeded(producerLivenessMillis)) {
90+
final long currentMillis = producerLivenessClock.millis();
91+
if (currentMillis - producerLivenessMillis > timeoutThresholdMillis) {
9292
streamMediator.unsubscribe(this);
93+
LOGGER.log(System.Logger.Level.DEBUG, "Unsubscribed handler");
9394
} else {
9495

95-
// Refresh the producer liveness and pass the block to the observer.
96-
producerLivenessMillis = producerLivenessClock.millis();
96+
// Refresh the producer liveness and pass the BlockItem to the downstream observer.
97+
producerLivenessMillis = currentMillis;
9798

98-
final BlockItem blockItem = event.get();
99+
// Construct a response
99100
final SubscribeStreamResponse subscribeStreamResponse =
100-
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
101+
SubscribeStreamResponse.newBuilder().setBlockItem(event.get()).build();
101102

102103
subscribeStreamResponseObserver.onNext(subscribeStreamResponse);
103104
}
104105
}
105106

106-
// private boolean isThresholdExceeded(long livenessMillis) {
107-
// final long currentTimeMillis = Clock.systemDefaultZone().millis();
108-
// final long elapsedMillis = currentTimeMillis - livenessMillis;
109-
// if (elapsedMillis > timeoutThresholdMillis) {
110-
// LOGGER.log(
111-
// System.Logger.Level.INFO,
112-
// "Elapsed milliseconds: "
113-
// + elapsedMillis
114-
// + ", timeout threshold: "
115-
// + timeoutThresholdMillis);
116-
// return true;
117-
// }
118-
//
119-
// return false;
120-
// }
121-
122107
@Override
123108
public void awaitShutdown() throws InterruptedException {
124109
shutdownLatch.await();

server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java renamed to server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java

+5-71
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,20 @@
1818

1919
import static com.hedera.block.protos.BlockStreamService.BlockItem;
2020
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
21+
import static com.hedera.block.server.util.TestClock.buildClockInsideWindow;
22+
import static com.hedera.block.server.util.TestClock.buildClockOutsideWindow;
2123
import static org.mockito.Mockito.*;
2224

2325
import com.hedera.block.server.data.ObjectEvent;
2426
import com.hedera.block.server.mediator.StreamMediator;
2527
import io.grpc.stub.StreamObserver;
26-
import java.time.Instant;
27-
import java.time.InstantSource;
28-
import org.junit.jupiter.api.Disabled;
2928
import org.junit.jupiter.api.Test;
3029
import org.junit.jupiter.api.extension.ExtendWith;
3130
import org.mockito.Mock;
3231
import org.mockito.junit.jupiter.MockitoExtension;
3332

3433
@ExtendWith(MockitoExtension.class)
35-
public class LiveStreamObserverImplTest {
34+
public class ConsumerBlockItemObserverTest {
3635

3736
private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
3837
private final long TEST_TIME = 1_719_427_664_950L;
@@ -44,8 +43,7 @@ public class LiveStreamObserverImplTest {
4443
@Mock private ObjectEvent<BlockItem> objectEvent;
4544

4645
@Test
47-
@Disabled
48-
public void testConsumerTimeoutWithinWindow() {
46+
public void testProducerTimeoutWithinWindow() {
4947
final var consumerBlockItemObserver =
5048
new ConsumerBlockItemObserver(
5149
TIMEOUT_THRESHOLD_MILLIS,
@@ -61,37 +59,8 @@ public void testConsumerTimeoutWithinWindow() {
6159

6260
consumerBlockItemObserver.onEvent(objectEvent, 0, true);
6361

64-
// verify the observer is called with the next
65-
// block and the stream mediator is not unsubscribed
62+
// verify the observer is called with the next BlockItem
6663
verify(responseStreamObserver).onNext(subscribeStreamResponse);
67-
verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
68-
}
69-
70-
@Test
71-
public void testConsumerTimeoutOutsideWindow() {
72-
73-
final var consumerBlockItemObserver =
74-
new ConsumerBlockItemObserver(
75-
TIMEOUT_THRESHOLD_MILLIS,
76-
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
77-
streamMediator,
78-
responseStreamObserver);
79-
80-
consumerBlockItemObserver.onEvent(objectEvent, 1, true);
81-
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
82-
}
83-
84-
@Test
85-
@Disabled
86-
public void testProducerTimeoutWithinWindow() {
87-
final var consumerBlockItemObserver =
88-
new ConsumerBlockItemObserver(
89-
TIMEOUT_THRESHOLD_MILLIS,
90-
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
91-
streamMediator,
92-
responseStreamObserver);
93-
94-
consumerBlockItemObserver.onEvent(objectEvent, 0, true);
9564

9665
// verify the mediator is NOT called to unsubscribe the observer
9766
verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
@@ -111,39 +80,4 @@ public void testProducerTimeoutOutsideWindow() throws InterruptedException {
11180
consumerBlockItemObserver.onEvent(objectEvent, 0, true);
11281
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
11382
}
114-
115-
private static InstantSource buildClockInsideWindow(
116-
long testTime, long timeoutThresholdMillis) {
117-
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
118-
}
119-
120-
private static InstantSource buildClockOutsideWindow(
121-
long testTime, long timeoutThresholdMillis) {
122-
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
123-
}
124-
125-
static class TestClock implements InstantSource {
126-
127-
private int index;
128-
private final Long[] millis;
129-
130-
TestClock(Long... millis) {
131-
this.millis = millis;
132-
}
133-
134-
@Override
135-
public long millis() {
136-
long value = millis[index];
137-
138-
// cycle through the provided millis
139-
// and wrap around if necessary
140-
index = index > millis.length - 1 ? 0 : index + 1;
141-
return value;
142-
}
143-
144-
@Override
145-
public Instant instant() {
146-
return null;
147-
}
148-
}
14983
}

server/src/test/java/com/hedera/block/server/mediator/LiveStreamMediatorImplTest.java

+30-14
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import static com.hedera.block.protos.BlockStreamService.Block;
20-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
21-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
19+
import static com.hedera.block.protos.BlockStreamService.*;
20+
import static com.hedera.block.server.util.TestClock.buildClockInsideWindow;
2221
import static org.junit.jupiter.api.Assertions.assertFalse;
2322
import static org.junit.jupiter.api.Assertions.assertTrue;
2423
import static org.mockito.Mockito.times;
@@ -30,8 +29,6 @@
3029
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
3130
import com.hedera.block.server.persistence.storage.BlockStorage;
3231
import io.grpc.stub.StreamObserver;
33-
import java.time.Clock;
34-
import org.junit.jupiter.api.Disabled;
3532
import org.junit.jupiter.api.Test;
3633
import org.junit.jupiter.api.extension.ExtendWith;
3734
import org.mockito.Mock;
@@ -48,7 +45,9 @@ public class LiveStreamMediatorImplTest {
4845

4946
@Mock private BlockStorage<Block, BlockItem> blockStorage;
5047

51-
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver;
48+
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver1;
49+
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver2;
50+
@Mock private StreamObserver<SubscribeStreamResponse> streamObserver3;
5251

5352
@Test
5453
public void testUnsubscribeEach() {
@@ -104,24 +103,33 @@ public void testMediatorPersistenceWithoutSubscribers() {
104103
}
105104

106105
@Test
107-
@Disabled
108-
public void testMediatorPublishEventToSubscribers() {
106+
public void testMediatorPublishEventToSubscribers() throws InterruptedException {
109107

110-
final long TEST_TIMEOUT = 10000;
108+
final long TIMEOUT_THRESHOLD_MILLIS = 100L;
109+
final long TEST_TIME = 1_719_427_664_950L;
111110
final var streamMediator =
112111
new LiveStreamMediatorImpl(new WriteThroughCacheHandler(blockStorage));
113112

114113
final var concreteObserver1 =
115114
new ConsumerBlockItemObserver(
116-
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
115+
TIMEOUT_THRESHOLD_MILLIS,
116+
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
117+
streamMediator,
118+
streamObserver1);
117119

118120
final var concreteObserver2 =
119121
new ConsumerBlockItemObserver(
120-
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
122+
TIMEOUT_THRESHOLD_MILLIS,
123+
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
124+
streamMediator,
125+
streamObserver2);
121126

122127
final var concreteObserver3 =
123128
new ConsumerBlockItemObserver(
124-
TEST_TIMEOUT, Clock.systemDefaultZone(), streamMediator, streamObserver);
129+
TIMEOUT_THRESHOLD_MILLIS,
130+
buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
131+
streamMediator,
132+
streamObserver3);
125133

126134
// Set up the subscribers
127135
streamMediator.subscribe(concreteObserver1);
@@ -139,13 +147,21 @@ public void testMediatorPublishEventToSubscribers() {
139147
"Expected the mediator to have observer3 subscribed");
140148

141149
final BlockItem blockItem = BlockItem.newBuilder().build();
150+
final SubscribeStreamResponse subscribeStreamResponse =
151+
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
142152

143153
// Acting as a producer, notify the mediator of a new block
144154
streamMediator.publishEvent(blockItem);
145155

156+
// TODO: Is there a better way?
157+
synchronized (streamObserver1) {
158+
streamObserver1.wait(2000);
159+
}
160+
146161
// Confirm each subscriber was notified of the new block
147-
verify(streamObserver, times(3))
148-
.onNext(SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build());
162+
verify(streamObserver1, times(1)).onNext(subscribeStreamResponse);
163+
verify(streamObserver2, times(1)).onNext(subscribeStreamResponse);
164+
verify(streamObserver3, times(1)).onNext(subscribeStreamResponse);
149165

150166
// Confirm the BlockStorage write method was
151167
// called despite the absence of subscribers
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.util;
18+
19+
import java.time.Instant;
20+
import java.time.InstantSource;
21+
22+
public class TestClock implements InstantSource {
23+
24+
private int index;
25+
private final Long[] millis;
26+
private final System.Logger LOGGER = System.getLogger(getClass().getName());
27+
28+
public TestClock(Long... millis) {
29+
if (millis.length == 0) {
30+
throw new IllegalArgumentException("At least 1 argument is required");
31+
}
32+
33+
this.millis = millis;
34+
}
35+
36+
@Override
37+
public long millis() {
38+
LOGGER.log(System.Logger.Level.INFO, "millis() called");
39+
long value = millis[index];
40+
41+
// cycle through the provided millis
42+
// and wrap around if necessary
43+
int temp = index + 1;
44+
index = (temp % millis.length == 0) ? 0 : temp;
45+
return value;
46+
}
47+
48+
@Override
49+
public Instant instant() {
50+
return null;
51+
}
52+
53+
public static InstantSource buildClockInsideWindow(long testTime, long timeoutThresholdMillis) {
54+
return new TestClock(testTime, testTime + timeoutThresholdMillis - 1);
55+
}
56+
57+
public static InstantSource buildClockOutsideWindow(
58+
long testTime, long timeoutThresholdMillis) {
59+
return new TestClock(testTime, testTime + timeoutThresholdMillis + 1);
60+
}
61+
}

0 commit comments

Comments
 (0)