Skip to content

Commit be21434

Browse files
fix:added negative tests for a producer
Signed-off-by: Matt Peterson <[email protected]>
1 parent 1c96bfe commit be21434

File tree

8 files changed

+218
-65
lines changed

8 files changed

+218
-65
lines changed

protos/src/main/protobuf/blockstream.proto

+60-5
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,64 @@ message PublishStreamRequest {
3131
}
3232

3333
message PublishStreamResponse {
34-
ItemAcknowledgement acknowledgement = 1;
35-
}
34+
oneof response {
35+
/**
36+
* A response sent for each item and for each block.
37+
*/
38+
ItemAcknowledgement acknowledgement = 1;
39+
40+
/**
41+
* A response sent when a stream ends.
42+
*/
43+
EndOfStream status = 2;
44+
}
45+
46+
message ItemAcknowledgement {
47+
bytes item_ack = 1;
48+
}
49+
50+
message EndOfStream {
51+
PublishStreamResponseCode status = 1;
52+
}
3653

37-
message ItemAcknowledgement {
38-
bytes item_ack = 1;
54+
/**
55+
* An enumeration indicating the status of this request.
56+
*
57+
* This enumeration describes the reason a block stream
58+
* (sent via `writeBlockStream`) ended.
59+
*/
60+
enum PublishStreamResponseCode {
61+
/**
62+
* An "unset value" flag, this value SHALL NOT be used.<br/>
63+
* This status indicates the server software failed to set a
64+
* status, and SHALL be considered a software defect.
65+
*/
66+
STREAM_ITEMS_UNKNOWN = 0;
67+
68+
/**
69+
* The request succeeded.<br/>
70+
* No errors occurred and the source node orderly ended the stream.
71+
*/
72+
STREAM_ITEMS_SUCCESS = 1;
73+
74+
/**
75+
* The delay between items was too long.<br/>
76+
* The source MUST start a new stream before the failed block.
77+
*/
78+
STREAM_ITEMS_TIMEOUT = 2;
79+
80+
/**
81+
* An item was received out-of-order.<br/>
82+
* The source MUST start a new stream before the failed block.
83+
*/
84+
STREAM_ITEMS_OUT_OF_ORDER = 3;
85+
86+
/**
87+
* A block state proof item could not be validated.<br/>
88+
* The source MUST start a new stream before the failed block.
89+
*/
90+
STREAM_ITEMS_BAD_STATE_PROOF = 4;
91+
}
3992
}
4093

4194
message SubscribeStreamRequest {
@@ -50,6 +103,7 @@ message SubscribeStreamResponse {
50103
message Block {
51104
repeated BlockItem block_items = 1;
52105
}
106+
53107
/**
54108
* A BlockItem is a simple message that contains an id and a value.
55109
* This specification is a simple example meant to expedite development.
@@ -76,4 +130,5 @@ message EventMetadata {
76130

77131
message BlockProof {
78132
uint64 block = 1;
79-
}
133+
}
134+

server/src/main/java/com/hedera/block/server/BlockStreamService.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.hedera.block.server.consumer.ConsumerBlockItemObserver;
2424
import com.hedera.block.server.data.ObjectEvent;
2525
import com.hedera.block.server.mediator.StreamMediator;
26+
import com.hedera.block.server.producer.ItemAckBuilder;
2627
import com.hedera.block.server.producer.ProducerBlockItemObserver;
2728
import io.grpc.stub.StreamObserver;
2829
import io.helidon.webserver.grpc.GrpcService;
@@ -43,6 +44,7 @@ public class BlockStreamService implements GrpcService {
4344
private final System.Logger LOGGER = System.getLogger(getClass().getName());
4445

4546
private final long timeoutThresholdMillis;
47+
private final ItemAckBuilder itemAckBuilder;
4648
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4749

4850
/**
@@ -52,8 +54,10 @@ public class BlockStreamService implements GrpcService {
5254
*/
5355
public BlockStreamService(
5456
final long timeoutThresholdMillis,
57+
final ItemAckBuilder itemAckBuilder,
5558
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator) {
5659
this.timeoutThresholdMillis = timeoutThresholdMillis;
60+
this.itemAckBuilder = itemAckBuilder;
5761
this.streamMediator = streamMediator;
5862
}
5963

@@ -96,7 +100,8 @@ private StreamObserver<PublishStreamRequest> publishBlockStream(
96100
System.Logger.Level.DEBUG,
97101
"Executing bidirectional publishBlockStream gRPC method");
98102

99-
return new ProducerBlockItemObserver(streamMediator, publishStreamResponseObserver);
103+
return new ProducerBlockItemObserver(
104+
streamMediator, publishStreamResponseObserver, itemAckBuilder);
100105
}
101106

102107
private void subscribeBlockStream(

server/src/main/java/com/hedera/block/server/Server.java

+2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.hedera.block.server.mediator.LiveStreamMediatorImpl;
2323
import com.hedera.block.server.persistence.WriteThroughCacheHandler;
2424
import com.hedera.block.server.persistence.storage.*;
25+
import com.hedera.block.server.producer.ItemAckBuilder;
2526
import io.grpc.stub.ServerCalls;
2627
import io.grpc.stub.StreamObserver;
2728
import io.helidon.config.Config;
@@ -74,6 +75,7 @@ public static void main(final String[] args) {
7475
final BlockStreamService blockStreamService =
7576
new BlockStreamService(
7677
consumerTimeoutThreshold,
78+
new ItemAckBuilder(),
7779
new LiveStreamMediatorImpl(
7880
new WriteThroughCacheHandler(blockReader, blockWriter),
7981
(streamMediator) -> {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.producer;
18+
19+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.ItemAcknowledgement;
21+
import static com.hedera.block.server.producer.Util.getFakeHash;
22+
23+
import com.google.protobuf.ByteString;
24+
import java.io.IOException;
25+
import java.security.NoSuchAlgorithmException;
26+
27+
public class ItemAckBuilder {
28+
public ItemAcknowledgement buildAck(final BlockItem blockItem)
29+
throws IOException, NoSuchAlgorithmException {
30+
// TODO: Use real hash
31+
return ItemAcknowledgement.newBuilder()
32+
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
33+
.build();
34+
}
35+
}

server/src/main/java/com/hedera/block/server/producer/ProducerBlockItemObserver.java

+16-27
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,12 @@
1717
package com.hedera.block.server.producer;
1818

1919
import static com.hedera.block.protos.BlockStreamService.*;
20+
import static com.hedera.block.protos.BlockStreamService.PublishStreamResponse.*;
2021

21-
import com.google.protobuf.ByteString;
2222
import com.hedera.block.server.data.ObjectEvent;
2323
import com.hedera.block.server.mediator.StreamMediator;
2424
import io.grpc.stub.StreamObserver;
25-
import java.io.ByteArrayOutputStream;
2625
import java.io.IOException;
27-
import java.io.ObjectOutputStream;
28-
import java.security.MessageDigest;
2926
import java.security.NoSuchAlgorithmException;
3027

3128
/**
@@ -39,6 +36,7 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
3936

4037
private final StreamObserver<PublishStreamResponse> publishStreamResponseObserver;
4138
private final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
39+
private final ItemAckBuilder itemAckBuilder;
4240

4341
/**
4442
* Constructor for the ProducerBlockStreamObserver class. It is responsible for calling the
@@ -47,9 +45,12 @@ public class ProducerBlockItemObserver implements StreamObserver<PublishStreamRe
4745
*/
4846
public ProducerBlockItemObserver(
4947
final StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator,
50-
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver) {
48+
final StreamObserver<PublishStreamResponse> publishStreamResponseObserver,
49+
final ItemAckBuilder itemAckBuilder) {
50+
5151
this.streamMediator = streamMediator;
5252
this.publishStreamResponseObserver = publishStreamResponseObserver;
53+
this.itemAckBuilder = itemAckBuilder;
5354
}
5455

5556
/**
@@ -64,16 +65,20 @@ public void onNext(final PublishStreamRequest publishStreamRequest) {
6465
streamMediator.publishEvent(blockItem);
6566

6667
try {
67-
// Send a response back to the upstream producer
68-
// TODO: Use real hash
69-
final ItemAcknowledgement itemAck =
70-
ItemAcknowledgement.newBuilder()
71-
.setItemAck(ByteString.copyFrom(getFakeHash(blockItem)))
72-
.build();
68+
final ItemAcknowledgement itemAck = itemAckBuilder.buildAck(blockItem);
7369
final PublishStreamResponse publishStreamResponse =
7470
PublishStreamResponse.newBuilder().setAcknowledgement(itemAck).build();
7571
publishStreamResponseObserver.onNext(publishStreamResponse);
72+
7673
} catch (IOException | NoSuchAlgorithmException e) {
74+
75+
final EndOfStream endOfStream =
76+
EndOfStream.newBuilder()
77+
.setStatus(PublishStreamResponseCode.STREAM_ITEMS_UNKNOWN)
78+
.build();
79+
final PublishStreamResponse errorResponse =
80+
PublishStreamResponse.newBuilder().setStatus(endOfStream).build();
81+
publishStreamResponseObserver.onNext(errorResponse);
7782
LOGGER.log(System.Logger.Level.ERROR, "Error calculating hash", e);
7883
}
7984
}
@@ -99,20 +104,4 @@ public void onCompleted() {
99104
LOGGER.log(System.Logger.Level.DEBUG, "ProducerBlockStreamObserver completed");
100105
publishStreamResponseObserver.onCompleted();
101106
}
102-
103-
private static byte[] getFakeHash(BlockItem blockItem)
104-
throws IOException, NoSuchAlgorithmException {
105-
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
106-
try (ObjectOutputStream objectOutputStream =
107-
new ObjectOutputStream(byteArrayOutputStream)) {
108-
objectOutputStream.writeObject(blockItem);
109-
}
110-
111-
// Get the serialized bytes
112-
byte[] serializedObject = byteArrayOutputStream.toByteArray();
113-
114-
// Calculate the SHA-256 hash
115-
MessageDigest digest = MessageDigest.getInstance("SHA-384");
116-
return digest.digest(serializedObject);
117-
}
118107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright (C) 2024 Hedera Hashgraph, LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.hedera.block.server.producer;
18+
19+
import static com.hedera.block.protos.BlockStreamService.BlockItem;
20+
21+
import java.io.ByteArrayOutputStream;
22+
import java.io.IOException;
23+
import java.io.ObjectOutputStream;
24+
import java.security.MessageDigest;
25+
import java.security.NoSuchAlgorithmException;
26+
27+
public final class Util {
28+
private Util() {}
29+
30+
public static byte[] getFakeHash(BlockItem blockItem)
31+
throws IOException, NoSuchAlgorithmException {
32+
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
33+
try (ObjectOutputStream objectOutputStream =
34+
new ObjectOutputStream(byteArrayOutputStream)) {
35+
objectOutputStream.writeObject(blockItem);
36+
}
37+
38+
// Get the serialized bytes
39+
byte[] serializedObject = byteArrayOutputStream.toByteArray();
40+
41+
// Calculate the SHA-256 hash
42+
MessageDigest digest = MessageDigest.getInstance("SHA-384");
43+
return digest.digest(serializedObject);
44+
}
45+
}

server/src/test/java/com/hedera/block/server/consumer/ConsumerBlockItemObserverTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@ public class ConsumerBlockItemObserverTest {
3636
private final long TIMEOUT_THRESHOLD_MILLIS = 50L;
3737
private final long TEST_TIME = 1_719_427_664_950L;
3838

39-
@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
39+
private Object lock = new Object();
4040

41+
@Mock private StreamMediator<ObjectEvent<BlockItem>, BlockItem> streamMediator;
4142
@Mock private StreamObserver<SubscribeStreamResponse> responseStreamObserver;
42-
4343
@Mock private ObjectEvent<BlockItem> objectEvent;
4444

4545
@Test
@@ -115,8 +115,8 @@ public void testConsumerNotToSendBeforeBlockHeader() throws InterruptedException
115115
final SubscribeStreamResponse subscribeStreamResponse =
116116
SubscribeStreamResponse.newBuilder().setBlockItem(blockItem).build();
117117

118-
synchronized (responseStreamObserver) {
119-
responseStreamObserver.wait(2000);
118+
synchronized (lock) {
119+
lock.wait(50);
120120
}
121121

122122
// Confirm that the observer was called with the next BlockItem

0 commit comments

Comments
 (0)