# Bi-directional Producer/Consumer Streaming with gRPC

## Purpose

A primary use case of the `hedera-block-node` is to stream live BlockItems (see Terms section) from a producer
(e.g. Consensus Node) to N consumers (e.g. Mirror Node) with the lowest possible latency while correctly preserving the
order of the BlockItems. This document outlines several possible strategies to implement this use case and the design
of the recommended approach. All strategies rely on the Helidon 4.x.x server implementations of HTTP/2 and gRPC
services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. The
`hedera-block-node` does this by defining bidirectional gRPC streaming services based on protobuf definitions.

Helidon provides well-defined APIs and extension points to implement business logic for these services. The main entry
point for custom logic is an implementation of `GrpcService`.
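A minimal sketch of such an implementation is shown below, assuming the Helidon 4 `GrpcService` contract of a `proto()` descriptor accessor plus an `update(Routing)` binding method; the `BlockStreamServiceGrpcProto` descriptor class and the observer types it wires up are placeholders for the generated code and the custom classes described later in this document.

```java
import com.google.protobuf.Descriptors;
import io.grpc.stub.StreamObserver;
import io.helidon.webserver.grpc.GrpcService;

// Sketch only: binds the StreamSink/StreamSource methods defined below to the
// custom observers described later in this document.
public class BlockItemStreamService implements GrpcService {

    private final StreamMediator streamMediator = new StreamMediator();

    @Override
    public Descriptors.FileDescriptor proto() {
        // Placeholder for the file descriptor generated from the protobuf definition below.
        return BlockStreamServiceGrpcProto.getDescriptor();
    }

    @Override
    public void update(Routing routing) {
        routing.bidi("StreamSink", this::streamSink)
               .bidi("StreamSource", this::streamSource);
    }

    private StreamObserver<BlockItem> streamSink(StreamObserver<BlockItemResponse> responseStreamObserver) {
        // One observer per connected producer; other collaborators (e.g. the
        // BlockPersistenceHandler) are omitted here for brevity.
        return new ProducerBlockItemObserver(streamMediator, responseStreamObserver);
    }

    private StreamObserver<BlockItemResponse> streamSource(StreamObserver<BlockItem> responseStreamObserver) {
        // One observer per connected consumer; subscribed via a hypothetical hook on the StreamMediator.
        ConsumerBlockItemObserver observer = new ConsumerBlockItemObserver(streamMediator, responseStreamObserver);
        streamMediator.subscribe(observer);
        return observer;
    }
}
```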
---

## Goals

1) Consumers must be able to dynamically subscribe to and unsubscribe from the live stream of BlockItems emitted by the
producer.
2) Correct, in-order streaming delivery of BlockItems from a producer to all registered consumers.
3) Minimize latency between the producer and consumers.
4) Minimize CPU resources consumed by the producer and consumers.

---
### Terms

**BlockItem** - The BlockItem is the primary data structure passed between the producer, the `hedera-block-node`
and consumers. A defined sequence of BlockItems represents a Block when stored on the `hedera-block-node`.

**Bidirectional Streaming** - Bidirectional streaming is an [HTTP/2 feature](https://datatracker.ietf.org/doc/html/rfc9113#name-streams-and-multiplexing) allowing both a client and a server to emit
a continuous stream of frames without waiting for responses. In this way, gRPC services can be used to efficiently
transmit a continuous flow of BlockItem messages while the HTTP/2 connection is open.

**Producer StreamObserver** - The Producer StreamObserver is a custom implementation of the [gRPC StreamObserver
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Producer
StreamObserver at runtime when the producer sends a new BlockItem to the `StreamSink` gRPC service.

**Consumer StreamObserver** - The Consumer StreamObserver is a custom implementation of the [gRPC StreamObserver
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes the Consumer
StreamObserver at runtime when the downstream consumer of the `StreamSource` gRPC service sends HTTP/2 responses
acknowledging the BlockItems it has received.

**subscribe** - Consumers calling the `StreamSource` gRPC service must be affiliated with, or subscribed to, a producer
to receive a live stream of BlockItems from the `hedera-block-node`.

**unsubscribe** - Consumers terminating their connection with the `StreamSource` gRPC service must be unaffiliated
with, or unsubscribed from, a producer so that internal objects can be cleaned up and resources released.

---
### Block Node gRPC Streaming Services API

The following protobuf definition outlines the gRPC services and messages used to stream BlockItems between a producer,
consumers and the `hedera-block-node`. The `BlockStreamGrpc` service definition provides 2 bidirectional streaming
methods: `StreamSink` and `StreamSource`. Aside from the gRPC service and method names, all the types will be replaced
by those outlined in [hedera-protobufs](https://github.com/hashgraph/hedera-protobufs/pull/342/files).

```protobuf

/**
 * The BlockStreamGrpc service definition provides 2 bidirectional streaming methods for
 * exchanging BlockItems with the Block Node server.
 *
 * A producer (e.g. Consensus Node) can use the StreamSink method to stream BlockItems to the
 * Block Node server. The Block Node server will respond with a BlockItemResponse message for
 * each BlockItem received.
 *
 * A consumer (e.g. Mirror Node) can use the StreamSource method to request a stream of
 * BlockItems from the server. The consumer is expected to respond with a BlockItemResponse
 * message with the id of each BlockItem received.
 */
service BlockStreamGrpc {

    /**
     * StreamSink is a bidirectional streaming method that allows a producer to stream BlockItems
     * to the Block Node server. The server will respond with a BlockItemResponse message for each
     * BlockItem received.
     */
    rpc StreamSink (stream BlockItem) returns (stream BlockItemResponse) {}

    /**
     * StreamSource is a bidirectional streaming method that allows a consumer to request a
     * stream of BlockItems from the server. The consumer is expected to respond with a
     * BlockItemResponse message with the id of each BlockItem received.
     */
    rpc StreamSource (stream BlockItemResponse) returns (stream BlockItem) {}
}

/**
 * A BlockItem is a simple message that contains an id and a value.
 * This specification is a simple example meant to expedite development.
 * It will be replaced with a PBJ implementation in the future.
 */
message BlockItem {

    /**
     * The id of the BlockItem. Each BlockItem id should be unique.
     */
    int64 id = 1;

    /**
     * The value of the BlockItem. The value can be any string.
     */
    string value = 2;
}

/**
 * A BlockItemResponse is a simple message that contains an id.
 * The BlockItemResponse is meant to confirm the receipt of a BlockItem.
 * A future use case may expand on this type to communicate a failure
 * condition where the BlockItem needs to be resent, etc.
 */
message BlockItemResponse {

    /**
     * The id of the BlockItem which was received.
     */
    int64 id = 1;
}

```
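For illustration, a producer could exercise the `StreamSink` method with a standard grpc-java async stub as sketched below; the `BlockStreamGrpcGrpc` class name and the localhost endpoint are assumptions based on default protoc/grpc-java code generation, not part of this design.

```java
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.TimeUnit;

// Hypothetical producer-side client for the StreamSink method.
public class ProducerClient {

    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8080)
                .usePlaintext()
                .build();

        // Assumes the async stub generated by protoc/grpc-java for the service above.
        BlockStreamGrpcGrpc.BlockStreamGrpcStub stub = BlockStreamGrpcGrpc.newStub(channel);

        // Open the bidirectional stream; the passed-in observer receives acknowledgements.
        StreamObserver<BlockItem> requestObserver = stub.streamSink(new StreamObserver<>() {
            @Override
            public void onNext(BlockItemResponse response) {
                System.out.println("Block Node acknowledged BlockItem id=" + response.getId());
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Stream closed by the Block Node");
            }
        });

        // Stream a few BlockItems and close the client side of the stream.
        for (long i = 1; i <= 3; i++) {
            requestObserver.onNext(BlockItem.newBuilder().setId(i).setValue("item-" + i).build());
        }
        requestObserver.onCompleted();

        channel.shutdown();
        channel.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```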
---

## Approaches

All the following approaches require integrating with Helidon 4.x.x gRPC services to implement the bidirectional
streaming API methods defined above. The following objects are used in all approaches:

`BlockItemStreamService` is a custom implementation of the Helidon gRPC `GrpcService`. It is responsible for binding
the Helidon routing mechanism to the gRPC streaming methods called by producers and consumers.

`ProducerBlockItemObserver` is a custom implementation of the Helidon gRPC `StreamObserver` interface.
`BlockItemStreamService` instantiates a new `ProducerBlockItemObserver` instance when the `StreamSink` gRPC method is
called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem
from the producer and return BlockItemResponses via a bidirectional stream.

`ConsumerBlockItemObserver` is also a custom implementation of the Helidon gRPC `StreamObserver` interface.
`BlockItemStreamService` instantiates a new `ConsumerBlockItemObserver` instance when the `StreamSource` gRPC method
is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon
when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest
BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver the
BlockItemResponses sent by the consumer upon receipt of BlockItems.
### Approach 1: Directly passing BlockItems from `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s.

This approach passes BlockItems directly from the `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s without
storing BlockItems in an intermediate data structure. It was the basis for one of the first implementations of gRPC
Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems:

Drawbacks:
1) Each `ProducerBlockItemObserver` must iterate over the list of subscribed consumers to pass the BlockItem to each
`ConsumerBlockItemObserver` before saving the BlockItem to disk and issuing a BlockItemResponse back to the producer.
The linear scaling of consumers aggregates latency, so the last consumer in the list is penalized
with the sum of the latencies of all consumers before it.
2) Dynamically subscribing/unsubscribing `ConsumerBlockItemObserver`s while deterministically broadcasting BlockItems
to each consumer in the correct order complicates and slows down the process. It requires thread-safe data
structures and synchronization on all reads and writes to ensure new/removed subscribers do not disrupt the
iteration order of the `ConsumerBlockItemObserver`s.
### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems.

Alternatively, if `ProducerBlockItemObserver`s store BlockItems in a shared data structure before immediately returning
a response to the producer, the BlockItem is then immediately available for all `ConsumerBlockItemObserver`s to read
asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the
following consequences:

Advantages:
1) The `ProducerBlockItemObserver` can immediately return a BlockItemResponse to the producer without waiting for the
`ConsumerBlockItemObserver`s to process the BlockItem or waiting for the BlockItem to be written to disk.
2) No additional third-party libraries are required to implement this approach.

Drawbacks:
1) Busy-waiting consumers will increase CPU demand while polling the shared data structure for new BlockItems.
2) It is difficult to anticipate and tune an optimal polling interval for consumers as the number of consumers scales
up or down.
3) While prototyping this approach, it appeared that `ConsumerBlockItemObserver`s using a busy-wait to watch for new
BlockItems impaired the ability of the Helidon Virtual Thread instance to process the inbound responses from the
downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases
requiring downstream consumer response processing.
### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems.

With this approach, the `ProducerBlockItemObserver` will store BlockItems in a shared data structure before immediately
returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems,
`ConsumerBlockItemObserver`s will send new BlockItems only upon receipt of BlockItemResponses from previously sent
BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an
internal counter) will determine and send all the newest BlockItems available in the shared data structure to the
downstream consumer. In this way, the downstream consumer responses drive the process of sending new BlockItems.
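The response-driven loop for this approach could look roughly like the sketch below, where `SharedBlockItemStore` and its index-based accessor are hypothetical names used only for illustration.

```java
import io.grpc.stub.StreamObserver;
import java.util.List;

// Sketch only (Approach 3 variant): new BlockItems are sent downstream only when
// the consumer acknowledges earlier ones with a BlockItemResponse.
public class ConsumerBlockItemObserver implements StreamObserver<BlockItemResponse> {

    private final SharedBlockItemStore store;                    // hypothetical shared data structure
    private final StreamObserver<BlockItem> downstreamObserver;  // provided by Helidon
    private long lastSentIndex;                                   // internal counter

    public ConsumerBlockItemObserver(SharedBlockItemStore store, StreamObserver<BlockItem> downstreamObserver) {
        this.store = store;
        this.downstreamObserver = downstreamObserver;
    }

    @Override
    public void onNext(BlockItemResponse response) {
        // An acknowledgement arrived: push every BlockItem stored since the last send.
        List<BlockItem> newest = store.getBlockItemsFrom(lastSentIndex + 1);
        newest.forEach(downstreamObserver::onNext);
        lastSentIndex += newest.size();
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onCompleted() {
        downstreamObserver.onCompleted();
    }
}
```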
Advantages:
1) It will not consume CPU resources polling.
2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the
consumer to trigger sending the newest BlockItems downstream.
3) The shared data structure will need to be concurrent but, after the initial write operation, all subsequent reads
should not require synchronization.
4) The shared data structure will decouple the `ProducerBlockItemObserver` from the `ConsumerBlockItemObserver`s
allowing them to operate independently and not accrue the same latency issues as Approach #1.
5) No additional third-party libraries are required to implement this approach.

Drawbacks:
1) With this approach, BlockItems sent to the consumer are driven exclusively by the downstream consumer
BlockItemResponses. Given the latency of a network request/response round-trip, this approach will likely be far
too slow to be considered effective even when sending a batch of all the latest BlockItems.
### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-thread pub/sub message-passing between producer and consumers via RingBuffer.

The LMAX Disruptor library is a high-performance inter-thread pub/sub message passing library that could be used to
efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor
library is designed to minimize latency as well as CPU cycles by avoiding blocking while maintaining concurrency
guarantees.

Advantages:
1) The Disruptor library is designed to minimize the latency of passing BlockItem messages between a
`ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s.
2) The Disruptor library is designed to minimize the CPU resources used by the `ProducerBlockItemObserver` and
`ConsumerBlockItemObserver`s.
3) The Disruptor library does not pull in any additional transitive dependencies.
4) Fixes to the Disruptor library are actively maintained and updated by the LMAX team.

Drawbacks:
1) The Disruptor library is a third-party library requiring ramp-up time and integration effort to use it correctly and
effectively.
2) Leveraging the Disruptor library requires downstream consumers to be subscribed to, and unsubscribed from, the
Disruptor RingBuffer in order to receive the latest BlockItems from the producer. The process of managing these
subscriptions to the RingBuffer can be complex.

---
## Design

Given the goals and the proposed approaches, Approach #4 has significant advantages and fewer significant drawbacks.
Using the LMAX Disruptor offers low latency and low CPU consumption via a well-maintained and tested API. The RingBuffer
intermediate data structure should serve to decouple the producer bidirectional stream from the consumer bidirectional
streams. Please see the following Entities section and Diagrams for a visual representation of the design.

### Producer Registration Flow

At boot time, the `BlockItemStreamService` will initialize the `StreamMediator` with the LMAX Disruptor RingBuffer.
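A minimal sketch of that initialization, assuming a simple `BlockItemEvent` holder type, an arbitrary ring size of 1024 and the Disruptor DSL, could look like the following.

```java
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

// Sketch only: a mutable holder pre-allocated in every RingBuffer slot.
class BlockItemEvent {
    private BlockItem blockItem;
    public void set(BlockItem blockItem) { this.blockItem = blockItem; }
    public BlockItem get() { return blockItem; }
}

// Sketch only: the StreamMediator wraps the Disruptor and its RingBuffer.
public class StreamMediator {

    private final Disruptor<BlockItemEvent> disruptor;
    private final RingBuffer<BlockItemEvent> ringBuffer;

    public StreamMediator() {
        // The ring size must be a power of two; 1024 is an arbitrary example value.
        this.disruptor = new Disruptor<>(BlockItemEvent::new, 1024, DaemonThreadFactory.INSTANCE);
        this.ringBuffer = disruptor.start();
    }

    // Called by the ProducerBlockItemObserver for every inbound BlockItem.
    public void publishEvent(BlockItem blockItem) {
        ringBuffer.publishEvent((event, sequence) -> event.set(blockItem));
    }
}
```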
When a producer calls the `StreamSink` gRPC method, the `BlockItemStreamService` will create a new
`ProducerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the producer.
See the Producer Registration Flow diagram for more details.

### Consumer Registration Flow

When a consumer calls the `StreamSource` gRPC method, the `BlockItemStreamService` will create a new
`ConsumerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItems to the downstream consumer. The
`BlockItemStreamService` will also subscribe the `ConsumerBlockItemObserver` to the `StreamMediator` to receive the
streaming BlockItems from the producer.
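One plausible way to implement that dynamic subscription with the Disruptor 3.x-style API is sketched below as a hypothetical `SubscriptionManager` fragment of the `StreamMediator`; the Disruptor itself does not prescribe this exact shape.

```java
import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: dynamic subscribe/unsubscribe of consumers after the Disruptor has started,
// using the RingBuffer's gating sequences.
public final class SubscriptionManager {

    private final RingBuffer<BlockItemEvent> ringBuffer;
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    public SubscriptionManager(RingBuffer<BlockItemEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    public BatchEventProcessor<BlockItemEvent> subscribe(EventHandler<BlockItemEvent> handler) {
        BatchEventProcessor<BlockItemEvent> processor =
                new BatchEventProcessor<>(ringBuffer, ringBuffer.newBarrier(), handler);
        // The gating sequence stops the producer from overwriting slots this consumer has not read yet.
        ringBuffer.addGatingSequences(processor.getSequence());
        executor.execute(processor);
        return processor;
    }

    public void unsubscribe(BatchEventProcessor<BlockItemEvent> processor) {
        // Remove the consumer's sequence so it no longer gates the producer, then stop it.
        ringBuffer.removeGatingSequence(processor.getSequence());
        processor.halt();
    }
}
```

Registering handlers up front via `Disruptor.handleEventsWith(...)` is simpler, but it does not allow consumers to join and leave after the Disruptor has been started, which is why the gating-sequence route is sketched here.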
### Runtime Streaming

At runtime, the `ProducerBlockItemObserver` will receive the latest BlockItem from the producer via Helidon and will
invoke `publishEvent(BlockItem)` on the `StreamMediator` to write the BlockItem to the RingBuffer. The
`ProducerBlockItemObserver` will then persist the BlockItem and return a BlockItemResponse to the producer via
its reference to the `ResponseStreamObserver`.
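A sketch of that observer is shown below; the `BlockPersistenceHandler#persist` call and the protoc-generated builder methods are illustrative assumptions rather than settled API.

```java
import io.grpc.stub.StreamObserver;

// Sketch only: receives BlockItems from the producer, publishes them to the
// StreamMediator's RingBuffer, persists them, and acknowledges the producer.
public class ProducerBlockItemObserver implements StreamObserver<BlockItem> {

    private final StreamMediator streamMediator;
    private final StreamObserver<BlockItemResponse> responseStreamObserver; // managed by Helidon
    private final BlockPersistenceHandler persistenceHandler;

    public ProducerBlockItemObserver(StreamMediator streamMediator,
                                     StreamObserver<BlockItemResponse> responseStreamObserver,
                                     BlockPersistenceHandler persistenceHandler) {
        this.streamMediator = streamMediator;
        this.responseStreamObserver = responseStreamObserver;
        this.persistenceHandler = persistenceHandler;
    }

    @Override
    public void onNext(BlockItem blockItem) {
        // 1. Publish to the RingBuffer so consumers can proceed asynchronously.
        streamMediator.publishEvent(blockItem);
        // 2. Persist the BlockItem.
        persistenceHandler.persist(blockItem);
        // 3. Acknowledge the producer.
        responseStreamObserver.onNext(BlockItemResponse.newBuilder().setId(blockItem.getId()).build());
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onCompleted() {
        responseStreamObserver.onCompleted();
    }
}
```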
Asynchronously, the RingBuffer will invoke the `onEvent(BlockItem)` method of all the subscribed
`ConsumerBlockItemObserver`s passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit
the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will
respond with a BlockItemResponse. Helidon will call the `onNext()` method of the `ConsumerBlockItemObserver` with the
BlockItemResponse.

BlockItems sent to the `ConsumerBlockItemObserver` via the RingBuffer and BlockItemResponses passed by Helidon from
the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a
configurable timeout threshold is exceeded, the `ConsumerBlockItemObserver` will unsubscribe itself from the
`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 `End Stream` DATA
frames to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to the
`ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not
permanently subscribed to the `StreamMediator`.
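A simplified sketch of such a consumer observer is shown below; it combines the Disruptor `EventHandler` callback with the gRPC `StreamObserver` callbacks, and the timeout threshold, the `BlockItemEvent` holder, the single shared timestamp and the `unsubscribe` hook are illustrative assumptions.

```java
import com.lmax.disruptor.EventHandler;
import io.grpc.stub.StreamObserver;

// Sketch only: relays BlockItems from the RingBuffer downstream and uses inbound
// BlockItemResponses plus a timeout to detect consumers that silently disconnected.
public class ConsumerBlockItemObserver
        implements EventHandler<BlockItemEvent>, StreamObserver<BlockItemResponse> {

    private static final long TIMEOUT_MILLIS = 30_000; // hypothetical configurable threshold

    private final StreamMediator streamMediator;
    private final StreamObserver<BlockItem> responseStreamObserver; // managed by Helidon
    private volatile long lastInteractionMillis = System.currentTimeMillis();

    public ConsumerBlockItemObserver(StreamMediator streamMediator,
                                     StreamObserver<BlockItem> responseStreamObserver) {
        this.streamMediator = streamMediator;
        this.responseStreamObserver = responseStreamObserver;
    }

    // Invoked by the Disruptor when a new BlockItem is available in the RingBuffer.
    @Override
    public void onEvent(BlockItemEvent event, long sequence, boolean endOfBatch) {
        if (System.currentTimeMillis() - lastInteractionMillis > TIMEOUT_MILLIS) {
            // Hypothetical hook that removes this observer's gating sequence from the RingBuffer.
            streamMediator.unsubscribe(this);
            return;
        }
        responseStreamObserver.onNext(event.get());
    }

    // Invoked by Helidon when the downstream consumer acknowledges a BlockItem.
    @Override
    public void onNext(BlockItemResponse response) {
        lastInteractionMillis = System.currentTimeMillis();
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onCompleted() { }
}
```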
### Entities

**BlockItemStreamService** - The BlockItemStreamService is a custom implementation of the Helidon gRPC GrpcService.
It is responsible for initializing the StreamMediator and instantiating ProducerBlockItemObserver and
ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is
the primary binding between the Helidon routing mechanisms and the `hedera-block-node` custom business logic.

**StreamObserver** - StreamObserver is the main interface through which Helidon 4.x.x invokes custom business logic
to receive and transmit bidirectional BlockItem streams at runtime.

**ProducerBlockItemObserver** - A custom implementation of StreamObserver invoked by Helidon at runtime which is
responsible for:
1) Receiving the latest BlockItem from the producer (e.g. Consensus Node).
2) Returning a response to the producer.

**StreamMediator** - StreamMediator is an implementation of the [Mediator Pattern](https://en.wikipedia.org/wiki/Mediator_pattern)
encapsulating the communication and interaction between the producer (ProducerBlockItemObserver) and N consumers
(ConsumerBlockItemObserver) using the RingBuffer of the Disruptor library. It manages the 1-to-N relationship between
the producer and consumers.

**RingBuffer** - A shared data structure between the producer and consumers that temporarily stores inbound BlockItems.
The RingBuffer is a fixed-size ring of pre-allocated event slots holding BlockItems that is managed by the Disruptor
library.

**EventHandler** - The EventHandler is an integration interface provided by the Disruptor library as a mechanism to
invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing
the latest BlockItem to the ConsumerBlockItemObserver when it is available in the RingBuffer.

**ConsumerBlockItemObserver** - A custom implementation of StreamObserver called by Helidon which is responsible for:
1) Receiving the latest response from the downstream consumer.
2) Receiving the latest BlockItem from the RingBuffer.
3) Sending the latest BlockItem to the downstream consumer.

**BlockPersistenceHandler** - The BlockPersistenceHandler is responsible for writing the latest BlockItem to disk.

---
## Diagrams

### Producer Registration Flow

![Producer Registration](assets/00036-producer-registration.png)

### Consumer Registration Flow

![Consumer Registration](assets/00036-consumer-registration.png)

### Class Diagram of all Entities and their Relationships

![Class Diagram](assets/00036-demo-disruptor-class-diagram.png)

### Runtime Stream of BlockItems from Producer to Consumers

![Sequence Diagram](assets/00036-refactor-demo-disruptor.png)

---
