Skip to content

Commit e1c8b21

Browse files
wip: fixing tests
Signed-off-by: Matt Peterson <[email protected]>
1 parent 69423e4 commit e1c8b21

File tree

8 files changed

+230
-199
lines changed

8 files changed

+230
-199
lines changed

server/src/main/java/com/hedera/block/server/BlockStreamService.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,7 @@
1616

1717
package com.hedera.block.server;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.PublishStreamRequest;
21-
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse;
22-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
23-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
19+
import static com.hedera.block.protos.BlockStreamService.*;
2420
import static com.hedera.block.server.Constants.*;
2521

2622
import com.google.protobuf.Descriptors;
@@ -47,7 +43,7 @@ public class BlockStreamService implements GrpcService {
4743
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4844

4945
private final long timeoutThresholdMillis;
50-
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
46+
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
5147

5248
/**
5349
* Constructor for the BlockStreamService class.
@@ -56,7 +52,7 @@ public class BlockStreamService implements GrpcService {
5652
*/
5753
public BlockStreamService(
5854
final long timeoutThresholdMillis,
59-
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator) {
55+
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator) {
6056
this.timeoutThresholdMillis = timeoutThresholdMillis;
6157
this.streamMediator = streamMediator;
6258
}

server/src/main/java/com/hedera/block/server/consumer/BlockItemEventHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,6 @@
1818

1919
import com.lmax.disruptor.EventHandler;
2020

21-
public interface BlockItemEventHandler<U> extends EventHandler<U> {
21+
public interface BlockItemEventHandler<T> extends EventHandler<T> {
2222
void awaitShutdown() throws InterruptedException;
2323
}

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+23-22
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@
1616

1717
package com.hedera.block.server.consumer;
1818

19-
import static com.hedera.block.protos.BlockStreamService.*;
19+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
2021

2122
import com.hedera.block.server.data.ObjectEvent;
2223
import com.hedera.block.server.mediator.StreamMediator;
@@ -43,7 +44,7 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4344

4445
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
4546

46-
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
47+
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4748

4849
/**
4950
* Constructor for the LiveStreamObserverImpl class.
@@ -53,11 +54,12 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
5354
public ConsumerBlockItemObserver(
5455
final long timeoutThresholdMillis,
5556
final InstantSource producerLivenessClock,
56-
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
57+
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
5758
final StreamObserver<SubscribeStreamResponse> subscribeStreamResponseObserver) {
5859

5960
this.timeoutThresholdMillis = timeoutThresholdMillis;
6061
this.producerLivenessClock = producerLivenessClock;
62+
this.streamMediator = streamMediator;
6163

6264
// The ServerCallStreamObserver can be configured with a Runnable to
6365
// be executed if a downstream consumer cancels the stream without
@@ -79,16 +81,15 @@ public ConsumerBlockItemObserver(
7981
}
8082

8183
this.subscribeStreamResponseObserver = subscribeStreamResponseObserver;
82-
this.producerLivenessMillis = producerLivenessClock.millis();
83-
84-
this.streamMediator = streamMediator;
8584
}
8685

8786
/** Pass the block to the observer provided by Helidon */
8887
@Override
8988
public void onEvent(final ObjectEvent<BlockItem> event, final long l, final boolean b) {
9089

91-
if (isThresholdExceeded(producerLivenessMillis)) {
90+
if (producerLivenessClock.millis() - producerLivenessMillis > timeoutThresholdMillis) {
91+
92+
// if (isThresholdExceeded(producerLivenessMillis)) {
9293
streamMediator.unsubscribe(this);
9394
} else {
9495

@@ -103,21 +104,21 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
103104
}
104105
}
105106

106-
private boolean isThresholdExceeded(long livenessMillis) {
107-
final long currentTimeMillis = Clock.systemDefaultZone().millis();
108-
final long elapsedMillis = currentTimeMillis - livenessMillis;
109-
if (elapsedMillis > timeoutThresholdMillis) {
110-
LOGGER.log(
111-
System.Logger.Level.INFO,
112-
"Elapsed milliseconds: "
113-
+ elapsedMillis
114-
+ ", timeout threshold: "
115-
+ timeoutThresholdMillis);
116-
return true;
117-
}
118-
119-
return false;
120-
}
107+
// private boolean isThresholdExceeded(long livenessMillis) {
108+
// final long currentTimeMillis = Clock.systemDefaultZone().millis();
109+
// final long elapsedMillis = currentTimeMillis - livenessMillis;
110+
// if (elapsedMillis > timeoutThresholdMillis) {
111+
// LOGGER.log(
112+
// System.Logger.Level.INFO,
113+
// "Elapsed milliseconds: "
114+
// + elapsedMillis
115+
// + ", timeout threshold: "
116+
// + timeoutThresholdMillis);
117+
// return true;
118+
// }
119+
//
120+
// return false;
121+
// }
121122

122123
@Override
123124
public void awaitShutdown() throws InterruptedException {

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
* live blocks to the subscribers as they arrive and persists the blocks to the block persistence
3838
* store.
3939
*/
40-
public class LiveStreamMediatorImpl
41-
implements StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> {
40+
public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockItem>, BlockItem> {
4241

4342
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4443

@@ -110,4 +109,9 @@ public void unsubscribe(final BlockItemEventHandler<ObjectEvent<BlockItem>> hand
110109
// Remove the gating sequence from the ring buffer
111110
ringBuffer.removeGatingSequence(batchEventProcessor.getSequence());
112111
}
112+
113+
@Override
114+
public boolean isSubscribed(BlockItemEventHandler<ObjectEvent<BlockItem>> handler) {
115+
return subscribers.containsKey(handler);
116+
}
113117
}

server/src/main/java/com/hedera/block/server/mediator/StreamMediator.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616

1717
package com.hedera.block.server.mediator;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
2119
import com.hedera.block.server.consumer.BlockItemEventHandler;
2220

2321
/**
@@ -29,14 +27,16 @@
2927
* type streamed TO the client. The type definition for the onNext() method provides the flexibility
3028
* for the StreamObserver and the Block types to vary independently.
3129
*
32-
* @param <U> The type of the block
33-
* @param <V> The type of the StreamObserver
30+
* @param <U> The type required by the RingBuffer implementation
31+
* @param <V> The type of the BlockItem
3432
*/
3533
public interface StreamMediator<U, V> {
3634

37-
void publishEvent(final BlockItem blockItem);
35+
void publishEvent(final V blockItem);
3836

3937
void subscribe(final BlockItemEventHandler<U> handler);
4038

4139
void unsubscribe(final BlockItemEventHandler<U> handler);
40+
41+
boolean isSubscribed(final BlockItemEventHandler<U> handler);
4242
}

server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,15 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
3838
private final System.Logger LOGGER = System.getLogger(getClass().getName());
3939

4040
private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
41-
private final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator;
41+
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4242

4343
/**
4444
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
4545
* mediator with blocks as they arrive from the upstream producer. It also sends responses back
4646
* to the upstream producer via the responseStreamObserver.
4747
*/
4848
public ProducerBlockItemObserver(
49-
final StreamMediator<ObjectEvent<BlockItem>, SubscribeStreamRequest> streamMediator,
49+
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
5050
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
5151
this.streamMediator = streamMediator;
5252
this.publishStreamResponseObserver = publishStreamResponseObserver;

server/src/test/java/com/hedera/block/server/consumer/LiveStreamObserverImplTest.java

+64-62
Original file line numberDiff line numberDiff line change
@@ -16,64 +16,70 @@
1616

1717
package com.hedera.block.server.consumer;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamRequest;
21-
19+
import com.hedera.block.server.data.ObjectEvent;
2220
import com.hedera.block.server.mediator.StreamMediator;
2321
import io.grpc.stub.StreamObserver;
24-
import java.time.Instant;
25-
import java.time.InstantSource;
22+
import org.junit.jupiter.api.Test;
2623
import org.junit.jupiter.api.extension.ExtendWith;
2724
import org.mockito.Mock;
2825
import org.mockito.junit.jupiter.MockitoExtension;
2926

27+
import java.time.Instant;
28+
import java.time.InstantSource;
29+
30+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
31+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
32+
import static org.mockito.Mockito.*;
33+
3034
@ExtendWith(MockitoExtension.class)
3135
public class LiveStreamObserverImplTest {
3236

3337
private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
3438
private final long TEST_TIME = 1_719_427_664_950L;
3539

36-
@Mock private StreamMediator<BlockItem, SubscribeStreamRequest> streamMediator;
37-
38-
@Mock private StreamObserver<BlockItem> responseStreamObserver;
40+
@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
41+
42+
@Mock private StreamObserver<SubscribeStreamResponse> responseStreamObserver;
43+
44+
@Mock private ObjectEvent<BlockItem> objectEvent;
45+
46+
// @Test
47+
// public void testConsumerTimeoutWithinWindow() {
48+
// final var consumerBlockItemObserver =
49+
// new ConsumerBlockItemObserver(
50+
// TIMEOUT_THRESHOLD_MILLIS,
51+
// buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
52+
// streamMediator,
53+
// responseStreamObserver);
54+
//
55+
// final BlockItem blockItem = BlockItem.newBuilder().build();
56+
// when(objectEvent.get()).thenReturn(blockItem);
57+
//
58+
// final SubscribeStreamResponse subscribeStreamResponse =
59+
// SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
60+
//
61+
// consumerBlockItemObserver.onEvent(objectEvent, 0, true);
62+
//
63+
// // verify the observer is called with the next
64+
// // block and the stream mediator is not unsubscribed
65+
// verify(responseStreamObserver).onNext(subscribeStreamResponse);
66+
// verify(streamMediator, never()).unsubscribe(consumerBlockItemObserver);
67+
// }
68+
69+
@Test
70+
public void testConsumerTimeoutOutsideWindow() {
71+
72+
final var consumerBlockItemObserver =
73+
new ConsumerBlockItemObserver(
74+
TIMEOUT_THRESHOLD_MILLIS,
75+
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
76+
streamMediator,
77+
responseStreamObserver);
78+
79+
consumerBlockItemObserver.onEvent(objectEvent, 1, true);
80+
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
81+
}
3982

40-
// @Test
41-
// public void testConsumerTimeoutWithinWindow() {
42-
// final LiveStreamObserver<BlockStreamServiceGrpcProto.Block,
43-
// BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
44-
// TIMEOUT_THRESHOLD_MILLIS,
45-
// buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
46-
// buildClockInsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
47-
// streamMediator,
48-
// responseStreamObserver);
49-
// BlockStreamServiceGrpcProto.Block newBlock =
50-
// BlockStreamServiceGrpcProto.Block.newBuilder().build();
51-
// liveStreamObserver.notify(newBlock);
52-
//
53-
// // verify the observer is called with the next
54-
// // block and the stream mediator is not unsubscribed
55-
// verify(responseStreamObserver).onNext(newBlock);
56-
// verify(streamMediator, never()).unsubscribe(liveStreamObserver);
57-
// }
58-
//
59-
// @Test
60-
// public void testConsumerTimeoutOutsideWindow() throws InterruptedException {
61-
//
62-
// final LiveStreamObserver<BlockStreamServiceGrpcProto.Block,
63-
// BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
64-
// TIMEOUT_THRESHOLD_MILLIS,
65-
// buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
66-
// buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
67-
// streamMediator,
68-
// responseStreamObserver);
69-
//
70-
// final BlockStreamServiceGrpcProto.Block newBlock =
71-
// BlockStreamServiceGrpcProto.Block.newBuilder().build();
72-
// when(streamMediator.isSubscribed(liveStreamObserver)).thenReturn(true);
73-
// liveStreamObserver.notify(newBlock);
74-
// verify(streamMediator).unsubscribe(liveStreamObserver);
75-
// }
76-
//
7783
// @Test
7884
// public void testProducerTimeoutWithinWindow() {
7985
// final LiveStreamObserver<BlockStreamServiceGrpcProto.Block,
@@ -92,23 +98,19 @@ public class LiveStreamObserverImplTest {
9298
// verify(streamMediator, never()).unsubscribe(liveStreamObserver);
9399
// }
94100
//
95-
// @Test
96-
// public void testProducerTimeoutOutsideWindow() throws InterruptedException {
97-
// final LiveStreamObserver<BlockStreamServiceGrpcProto.Block,
98-
// BlockStreamServiceGrpcProto.BlockResponse> liveStreamObserver = new LiveStreamObserverImpl(
99-
// TIMEOUT_THRESHOLD_MILLIS,
100-
// buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
101-
// buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
102-
// streamMediator,
103-
// responseStreamObserver);
104-
//
105-
// Thread.sleep(51);
106-
// BlockStreamServiceGrpcProto.BlockResponse blockResponse =
107-
// BlockStreamServiceGrpcProto.BlockResponse.newBuilder().build();
108-
// liveStreamObserver.onNext(blockResponse);
109-
//
110-
// verify(streamMediator).unsubscribe(liveStreamObserver);
111-
// }
101+
@Test
102+
public void testProducerTimeoutOutsideWindow() throws InterruptedException {
103+
final var consumerBlockItemObserver = new ConsumerBlockItemObserver(
104+
TIMEOUT_THRESHOLD_MILLIS,
105+
buildClockOutsideWindow(TEST_TIME, TIMEOUT_THRESHOLD_MILLIS),
106+
streamMediator,
107+
responseStreamObserver);
108+
109+
Thread.sleep(51);
110+
111+
consumerBlockItemObserver.onEvent(objectEvent, 0, true);
112+
verify(streamMediator).unsubscribe(consumerBlockItemObserver);
113+
}
112114

113115
private static InstantSource buildClockInsideWindow(
114116
long testTime, long timeoutThresholdMillis) {

0 commit comments

Comments
 (0)