Skip to content

Commit 6988dde

Browse files
fix: injected map to make it more testable
Signed-off-by: Matt Peterson <[email protected]>
1 parent 7916f42 commit 6988dde

File tree

4 files changed

+310
-28
lines changed

4 files changed

+310
-28
lines changed

server/src/main/java/com/hedera/block/server/consumer/ConsumerBlockItemObserver.java

+4-7
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616

1717
package com.hedera.block.server.consumer;
1818

19-
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20-
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
21-
2219
import com.hedera.block.server.data.ObjectEvent;
2320
import com.hedera.block.server.mediator.StreamMediator;
2421
import io.grpc.stub.ServerCallStreamObserver;
2522
import io.grpc.stub.StreamObserver;
23+
2624
import java.time.InstantSource;
27-
import java.util.concurrent.CountDownLatch;
25+
26+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
27+
import static com.hedera.block.protos.BlockStreamService.SubscribeStreamResponse;
2828

2929
/**
3030
* The LiveStreamObserverImpl class implements the LiveStreamObserver interface to pass blocks to
@@ -41,8 +41,6 @@ public class ConsumerBlockItemObserver implements BlockItemEventHandler<ObjectEv
4141
private final InstantSource producerLivenessClock;
4242
private long producerLivenessMillis;
4343

44-
private final CountDownLatch shutdownLatch = new CountDownLatch(1);
45-
4644
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4745

4846
private boolean streamStarted;
@@ -118,6 +116,5 @@ public void onEvent(final ObjectEvent<BlockItem> event, final long l, final bool
118116

119117
@Override
120118
public void awaitShutdown() throws InterruptedException {
121-
shutdownLatch.await();
122119
}
123120
}

server/src/main/java/com/hedera/block/server/mediator/LiveStreamMediatorImpl.java

+13-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
5050
private final Map<
5151
BlockItemEventHandler<ObjectEvent<BlockItem>>,
5252
BatchEventProcessor<ObjectEvent<BlockItem>>>
53-
subscribers = new HashMap<>();
53+
subscribers;
5454

5555
private final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler;
5656
private final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback;
@@ -61,8 +61,13 @@ public class LiveStreamMediatorImpl implements StreamMediator<ObjectEvent<BlockI
6161
* @param blockPersistenceHandler the block persistence handler
6262
*/
6363
public LiveStreamMediatorImpl(
64+
final Map<
65+
BlockItemEventHandler<ObjectEvent<BlockItem>>,
66+
BatchEventProcessor<ObjectEvent<BlockItem>>>
67+
subscribers,
6468
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
6569
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
70+
this.subscribers = subscribers;
6671
this.blockPersistenceHandler = blockPersistenceHandler;
6772

6873
// Initialize and start the disruptor
@@ -73,6 +78,12 @@ public LiveStreamMediatorImpl(
7378
this.shutdownCallback = shutdownCallback;
7479
}
7580

81+
public LiveStreamMediatorImpl(
82+
final BlockPersistenceHandler<BlockItem, Block> blockPersistenceHandler,
83+
final Consumer<StreamMediator<ObjectEvent<BlockItem>, BlockItem>> shutdownCallback) {
84+
this(new HashMap<>(), blockPersistenceHandler, shutdownCallback);
85+
}
86+
7687
@Override
7788
public void publishEvent(BlockItem blockItem) {
7889

@@ -101,6 +112,7 @@ public void subscribe(final BlockItemEventHandler<ObjectEvent<BlockItem>> handle
101112
ringBuffer.addGatingSequences(batchEventProcessor.getSequence());
102113
executor.execute(batchEventProcessor);
103114

115+
// Keep track of the subscriber
104116
subscribers.put(handler, batchEventProcessor);
105117
}
106118

0 commit comments

Comments
 (0)