Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -537,19 +537,27 @@ public long handleFailedPersistence() {
* the caller whether the handler should shut down and/or reset its current
* block action and current streaming block number.
*/
private record BatchHandleResult(boolean shouldShutdown, boolean shouldReset) {}
private record BatchHandleResult(boolean shouldShutdown, boolean shouldReset, boolean shouldCloseStream) {}

/**
* This method handles the result of a block action handle.
*
* @param handleResult the result to handle
*/
private void handleBlockActionResult(final BatchHandleResult handleResult) {
if (handleResult.shouldReset()) {
resetState();
}
// close connection and clean up or handle partial clean-ups
if (handleResult.shouldShutdown()) {
shutdown();
} else {
// close stream leaving connection open for subsequent publishers calls
if (handleResult.shouldCloseStream()) {
replies.onComplete();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that is the only thing we need to do to close the stream. I remember there were some changes recently that allow us to actually close the stream completely.

}

// clean up handler state
if (handleResult.shouldReset()) {
resetState();
}
Comment on lines +551 to +560
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if these should be in an else. All these booleans should probably run if they are all true.

}
}

Expand All @@ -564,7 +572,7 @@ private BatchHandleResult handleAccept(
blockItemsQueue.put(itemSetUnparsed);
final int itemsReceived = blockItems.size();
metrics.liveBlockItemsReceived.add(itemsReceived); // @todo(1415) add label
return new BatchHandleResult(false, false);
return new BatchHandleResult(false, false, false);
}

/**
Expand All @@ -580,9 +588,9 @@ private BatchHandleResult handleSkip(final long blockNumber) {
PublishStreamResponse.newBuilder().skipBlock(skipBlock).build();
if (sendResponse(response)) {
metrics.blockSkipsSent.increment(); // @todo(1415) add label
return new BatchHandleResult(false, true);
return new BatchHandleResult(false, true, false);
} else {
return new BatchHandleResult(true, true);
return new BatchHandleResult(true, true, true);
}
}

Expand All @@ -601,9 +609,9 @@ private BatchHandleResult handleResend() {
PublishStreamResponse.newBuilder().resendBlock(resendBlock).build();
if (sendResponse(response)) {
metrics.blockResendsSent.increment(); // @todo(1415) add label
return new BatchHandleResult(false, true);
return new BatchHandleResult(false, true, false);
} else {
return new BatchHandleResult(true, true);
return new BatchHandleResult(true, true, true);
}
}

Expand All @@ -615,7 +623,10 @@ private BatchHandleResult handleEndBehind() {
// If the action is END_BEHIND, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.BEHIND);
return new BatchHandleResult(true, true);

// Close stream after sending BEHIND but leave connection open. CN will close it in the event of a
// TOO_FAR_BEHIND
return new BatchHandleResult(false, true, true);
}

/**
Expand All @@ -630,7 +641,7 @@ private BatchHandleResult handleEndDuplicate() {
// If the action is END_DUPLICATE, we need to send an end of stream
// response to the publisher and not propagate the items.
sendEndOfStream(Code.DUPLICATE_BLOCK);
return new BatchHandleResult(true, true);
return new BatchHandleResult(true, true, true);
}

/**
Expand All @@ -642,7 +653,7 @@ private BatchHandleResult handleEndError() {
// response to the publisher and not propagate the items.
sendEndOfStream(Code.ERROR);
metrics.streamErrors.increment(); // @todo(1415) add label
return new BatchHandleResult(true, true);
return new BatchHandleResult(true, true, true);
}

// ==== EndStream Handling Methods =========================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ void testBatchesIncrementOnlyAfterForwarderCompletes() throws InterruptedExcepti

// After forwarder completion, batches should have increased and facility should contain messages.
assertThat(managerMetrics.blocksClosedComplete().get()).isEqualTo(beforeBatches + 2);
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 1);
assertThat(managerMetrics.currentPublisherCount().get()).isEqualTo(beforeBatches + 2);
// The in-memory messaging facility should now have reset the block number to -1.
assertThat(toTest.getLatestBlockNumber()).isEqualTo(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.hiero.block.api.BlockAccessServiceInterface;
import org.hiero.block.api.BlockEnd;
Expand All @@ -40,6 +41,8 @@
import org.hiero.block.api.BlockStreamSubscribeServiceInterface;
import org.hiero.block.api.PublishStreamRequest;
import org.hiero.block.api.PublishStreamResponse;
import org.hiero.block.api.PublishStreamResponse.EndOfStream.Code;
import org.hiero.block.api.PublishStreamResponse.ResponseOneOfType;
import org.hiero.block.api.ServerStatusRequest;
import org.hiero.block.api.ServerStatusResponse;
import org.hiero.block.api.SubscribeStreamRequest;
Expand Down Expand Up @@ -555,4 +558,66 @@ void e2eSocketClosureTest() throws InterruptedException {
assertTrue(ex.getCause() instanceof SocketException);
assertTrue(ex.getCause().getMessage().toLowerCase().contains("socket closed"));
}

/** Test publisher that is too far behind the current block
* Publisher attempts to publish a block that is significantly ahead of BN's current block
* BN replies with a BEHIND status and closes the stream
* CN does not have the ability to republish the block at this time and sends a TOO_FAR_BEHIND endStream code
*
*/
@Test
void publisherTooFarBehind() throws InterruptedException {
BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient blockStreamPublishServiceClient =
new BlockStreamPublishServiceInterface.BlockStreamPublishServiceClient(
publishBlockStreamPbjGrpcClient, OPTIONS);

ResponsePipelineUtils<PublishStreamResponse> responseObserver = new ResponsePipelineUtils<>();
final Pipeline<? super PublishStreamRequest> requestStream =
blockStreamPublishServiceClient.publishBlockStream(responseObserver);

final long blockNumber = 100;
BlockItem[] blockItems = BlockItemBuilderUtils.createSimpleBlockWithNumber(blockNumber);
PublishStreamRequest request = PublishStreamRequest.newBuilder()
.blockItems(BlockItemSet.newBuilder().blockItems(blockItems).build())
.build();

CountDownLatch completeCountDownLatch = responseObserver.setAndGetOnCompleteLatch(1);
requestStream.onNext(request);

completeCountDownLatch.await(30, TimeUnit.SECONDS); // wait for behind response
assertThat(responseObserver.getOnNextCalls())
.hasSize(1)
.first()
.returns(ResponseOneOfType.END_STREAM, responseKindExtractor)
.returns(Code.BEHIND, endStreamResponseCodeExtractor)
.returns(-1L, endStreamBlockNumberExtractor);

// Assert no other responses sent
assertThat(responseObserver.getOnErrorCalls()).isEmpty();
assertThat(responseObserver.getOnSubscriptionCalls()).isEmpty();
assertThat(responseObserver.getOnCompleteCalls().get()).isEqualTo(1);

// publisher sends TOO_FAR_BEHIND endStream after receiving BEHIND from BN
ResponsePipelineUtils<PublishStreamResponse> tooFarBehindResponseObserver = new ResponsePipelineUtils<>();
final Pipeline<? super PublishStreamRequest> tooFarBehindRequestStream =
blockStreamPublishServiceClient.publishBlockStream(tooFarBehindResponseObserver);

PublishStreamRequest endStreamRequest = PublishStreamRequest.newBuilder()
.endStream(PublishStreamRequest.EndStream.newBuilder()
.earliestBlockNumber(75L)
.endCode(PublishStreamRequest.EndStream.Code.TOO_FAR_BEHIND)
.latestBlockNumber(125L)
.build())
.build();

completeCountDownLatch = tooFarBehindResponseObserver.setAndGetOnCompleteLatch(1);
tooFarBehindRequestStream.onNext(endStreamRequest);
completeCountDownLatch.await(30, TimeUnit.SECONDS); // wait for connection shutdown

// Assert no other responses sent using same connections
assertThat(tooFarBehindResponseObserver.getOnErrorCalls()).isEmpty();
assertThat(tooFarBehindResponseObserver.getOnSubscriptionCalls()).isEmpty();
assertThat(tooFarBehindResponseObserver.getOnCompleteCalls().get()).isEqualTo(1);
assertThat(tooFarBehindResponseObserver.getOnErrorCalls().size()).isEqualTo(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI, assertj has some conveniences for zero values, some other very frequently used. So we can do "assertThat(number).isZero();". Non-blocking nit. On another note, instead of calling Collection#size and checking if it is zero, we could simply call assertThat(collection).isEmpty(); as seen in L:618. Again, totally non-blocking nit.

}
}
Loading