# Bi-directional Producer/Consumer Streaming with gRPC

## Purpose

A primary use case of the `hedera-block-node` is to stream live BlockItems (see Terms section) from a producer
(e.g. Consensus Node) to N consumers (e.g. Mirror Node) with the lowest possible latency while correctly preserving the
order of the BlockItems. This document outlines several possible strategies to implement this use case and the design
of the recommended approach. All strategies rely on the Helidon 4.x.x server implementations of HTTP/2 and gRPC
services to ingest BlockItem data from a producer and then to stream the same BlockItems to downstream consumers. It
does this by defining bidirectional gRPC streaming services based on protobuf definitions.

Helidon provides well-defined APIs and extension points to implement business logic for these services. The main entry
point for custom logic is an implementation of `GrpcService`.

---

## Goals

1) Consumers must be able to dynamically subscribe to and unsubscribe from the live stream of BlockItems emitted by the
   producer. When a consumer subscribes to the stream, it will begin receiving BlockItems at the start of the next Block.
   BlockItems in transit before the start of the next Block will not be sent to that downstream consumer.
2) Correct, in-order streaming delivery of BlockItems from a producer to all registered consumers.
3) Minimize latency between the producer and consumers.
4) Minimize CPU resources consumed by the producer and consumers.

---

### Terms

**BlockItem** - The BlockItem is the primary data structure passed between the producer, the `hedera-block-node`
and consumers. A defined sequence of BlockItems represents a Block when stored on the `hedera-block-node`.

**Bidirectional Streaming** - Bidirectional streaming is an [HTTP/2 feature](https://datatracker.ietf.org/doc/html/rfc9113#name-streams-and-multiplexing)
allowing both a client and a server to emit a continuous stream of frames without waiting for responses. In this way, gRPC
services can be used to efficiently transmit a continuous flow of BlockItem messages while the HTTP/2 connection is open.

**Producer StreamObserver** - The Producer StreamObserver is a custom implementation of the [gRPC StreamObserver
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes
the Producer StreamObserver at runtime when the producer sends a new BlockItem to the `publishBlockStream` gRPC service.

**Consumer StreamObserver** - The Consumer StreamObserver is a custom implementation of the [gRPC StreamObserver
interface](https://github.com/grpc/grpc-java/blob/0ff3f8e4ac4c265e91b4a0379a32cf25a0a2b2f7/stub/src/main/java/io/grpc/stub/StreamObserver.java#L45) used by Helidon. It is initialized by the BlockItemStreamService (see Entities section). Helidon invokes
the Consumer StreamObserver at runtime when the downstream consumer of the `subscribeBlockStream` gRPC service sends HTTP/2
responses acknowledging previously sent BlockItems.

**subscribe** - Consumers calling the `subscribeBlockStream` gRPC service must be affiliated, or subscribed, with a producer to
receive a live stream of BlockItems from the `hedera-block-node`.

**unsubscribe** - Consumers terminating their connection with the `subscribeBlockStream` gRPC service must be unaffiliated, or
unsubscribed, from a producer so that internal objects can be cleaned up and resources released.

---

### Block Node gRPC Streaming Services API

The Block Node gRPC Streaming Services API is now aligned with the names and simplified types defined in the
[`hedera-protobufs` repository on the `continue-block-node` branch](https://github.com/hashgraph/hedera-protobufs/blob/25783427575ded59d06d6bf1ed253fd24ef3c437/block/block_service.proto#L701-L742).

---


## Approaches

All the following approaches require integrating with Helidon 4.x.x gRPC services to implement the bidirectional
streaming API methods defined above. The following objects are used in all approaches:

`BlockItemStreamService` is a custom implementation of the Helidon gRPC `GrpcService`. It is responsible for binding
the Helidon routing mechanism to the gRPC streaming methods called by producers and consumers.

`ProducerBlockItemObserver` is a custom implementation of the gRPC `StreamObserver` interface used by Helidon.
`BlockItemStreamService` instantiates a new `ProducerBlockItemObserver` instance when the `publishBlockStream` gRPC method is
called by a producer. Thereafter, Helidon invokes `ProducerBlockItemObserver` methods to receive the latest BlockItem
from the producer and return BlockItemResponses via a bidirectional stream.

`ConsumerBlockItemObserver` is also a custom implementation of the gRPC `StreamObserver` interface used by Helidon.
`BlockItemStreamService` instantiates a new `ConsumerBlockItemObserver` instance when the `subscribeBlockStream` gRPC method
is called by each consumer. The `ConsumerBlockItemObserver` wraps an instance of `StreamObserver` provided by Helidon
when the connection is established. The `ConsumerBlockItemObserver` uses the `StreamObserver` to send the latest
BlockItem to the downstream consumer. Helidon invokes `ConsumerBlockItemObserver` methods to deliver the BlockItemResponses
with which the consumer acknowledges receipt of BlockItems.
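
The collaboration between these objects can be sketched with simplified stand-in types. The `StreamObserver` interface below is a minimal stand-in for the grpc-java interface, and `BlockItem`/`BlockItemResponse` are hypothetical placeholders rather than the real protobuf-generated messages:

```java
import java.util.ArrayList;
import java.util.List;

public class ObserverSketch {

    // Minimal stand-in for io.grpc.stub.StreamObserver.
    interface StreamObserver<V> {
        void onNext(V value);
        void onCompleted();
    }

    record BlockItem(long sequence) {}
    record BlockItemResponse(long acknowledgedSequence) {}

    // Wraps the Helidon-provided response stream; receives responses from the
    // downstream consumer and forwards BlockItems to it.
    static class ConsumerBlockItemObserver implements StreamObserver<BlockItemResponse> {
        private final StreamObserver<BlockItem> downstream;
        final List<Long> acknowledged = new ArrayList<>();

        ConsumerBlockItemObserver(StreamObserver<BlockItem> downstream) {
            this.downstream = downstream;
        }

        void send(BlockItem item) {
            downstream.onNext(item); // push the latest BlockItem to the consumer
        }

        @Override
        public void onNext(BlockItemResponse response) {
            acknowledged.add(response.acknowledgedSequence());
        }

        @Override
        public void onCompleted() {
            downstream.onCompleted();
        }
    }

    // Demo: sends two items downstream and records one acknowledgement.
    static List<Long> demo() {
        List<Long> received = new ArrayList<>();
        ConsumerBlockItemObserver observer = new ConsumerBlockItemObserver(new StreamObserver<>() {
            @Override public void onNext(BlockItem item) { received.add(item.sequence()); }
            @Override public void onCompleted() { }
        });
        observer.send(new BlockItem(1));
        observer.send(new BlockItem(2));
        observer.onNext(new BlockItemResponse(1));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```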

### Approach 1: Directly passing BlockItems from `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s.

Directly passing BlockItems from the `ProducerBlockItemObserver` to N `ConsumerBlockItemObserver`s without storing
BlockItems in an intermediate data structure. This approach was the basis for one of the first implementations of gRPC
Live Streaming (see [BlockNode Issue 21](https://github.com/hashgraph/hedera-block-node/issues/21)). Unfortunately, this approach has the following problems:

Drawbacks:
1) Each `ProducerBlockItemObserver` must iterate over the list of subscribed consumers to pass the BlockItem to each
   `ConsumerBlockItemObserver` before saving the BlockItem to disk and issuing a BlockItemResponse back to the producer.
   Latency aggregates linearly with the number of consumers, so the last consumer in the list is penalized
   with the sum of the latencies of all consumers before it.
2) Dynamically subscribing/unsubscribing `ConsumerBlockItemObserver`s while deterministically broadcasting BlockItems
   to each consumer in the correct order complicates and slows down the process. It requires thread-safe data
   structures and synchronization on all reads and writes to ensure new/removed subscribers do not disrupt the
   iteration order of the `ConsumerBlockItemObserver`s.
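
The latency drawback in (1) can be illustrated with a small simulation. Everything here is hypothetical: latency is modeled as an accumulated counter rather than real network delay, and the consumer names are invented:

```java
import java.util.ArrayList;
import java.util.List;

public class SequentialFanOut {

    // Each consumer contributes a (simulated) fixed delivery latency in milliseconds.
    record Consumer(String name, long latencyMillis) {}

    // The producer delivers to consumers one at a time, so the i-th consumer
    // only receives the item after the latencies of all consumers before it
    // have elapsed.
    static List<Long> arrivalTimes(List<Consumer> consumers) {
        List<Long> arrivals = new ArrayList<>();
        long clock = 0;
        for (Consumer c : consumers) {
            clock += c.latencyMillis(); // producer blocks on this consumer's send
            arrivals.add(clock);
        }
        return arrivals;
    }

    public static void main(String[] args) {
        List<Consumer> consumers = List.of(
                new Consumer("mirror-1", 5),
                new Consumer("mirror-2", 7),
                new Consumer("mirror-3", 11));
        // The last consumer sees the item only after 5 + 7 + 11 = 23 ms.
        System.out.println(arrivalTimes(consumers));
    }
}
```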

### Approach 2: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Consumers busy-wait for new BlockItems.

Alternatively, if `ProducerBlockItemObserver`s store BlockItems in a shared data structure before immediately returning
a response to the producer, the BlockItem is then immediately available for all `ConsumerBlockItemObserver`s to read
asynchronously. Consumers can repeatedly poll the shared data structure for new BlockItems. This approach has the
following consequences:

Advantages:
1) The `ProducerBlockItemObserver` can immediately return a BlockItemResponse to the producer without waiting for the
   `ConsumerBlockItemObserver`s to process the BlockItem or waiting for the BlockItem to be written to disk.
2) No additional third-party libraries are required to implement this approach.

Drawbacks:
1) Busy-waiting consumers will increase CPU demand while polling the shared data structure for new BlockItems.
2) It is difficult to anticipate and tune an optimal polling interval for consumers as the number of consumers scales
   up or down.
3) While prototyping this approach, it appeared that `ConsumerBlockItemObserver`s using a busy-wait to watch for new
   BlockItems impaired the ability of the Helidon Virtual Thread instance to process the inbound responses from the
   downstream consumer in a timely way. The aggressive behavior of the busy-wait could complicate future use cases
   requiring downstream consumer response processing.
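
A minimal sketch of the busy-wait pattern, assuming a single producer and a deliberately simplified shared structure (a fixed slot array plus an atomic published count, not the actual prototype code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class BusyWaitConsumer {

    // Shared structure: a fixed slot array plus a published-count the producer
    // bumps after each write (a deliberately minimal stand-in).
    static final String[] slots = new String[16];
    static final AtomicLong published = new AtomicLong(0);

    static void produce(String item) {
        long seq = published.get();
        slots[(int) seq] = item;     // write the item first...
        published.set(seq + 1);      // ...then make it visible to pollers
    }

    // The consumer busy-waits (polls) for each new item, burning CPU on the
    // spin loop -- the central drawback of this approach.
    static List<String> consume(int target) {
        List<String> seen = new ArrayList<>();
        long next = 0;
        while (next < target) {
            while (published.get() <= next) {
                Thread.onSpinWait();  // hint to the CPU; still consumes cycles
            }
            seen.add(slots[(int) next]);
            next++;
        }
        return seen;
    }

    public static void main(String[] args) throws Exception {
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) produce("block-item-" + i);
        });
        producer.start();
        List<String> seen = consume(3);
        producer.join();
        System.out.println(seen); // [block-item-1, block-item-2, block-item-3]
    }
}
```

The volatile write to `published` is what makes the slot write visible to the polling thread; without it the consumer could spin forever or read a stale slot.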


### Approach 3: Use a shared data structure between `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. Use downstream consumer BlockItemResponses to drive the process of sending new BlockItems.

With this approach, the `ProducerBlockItemObserver` will store BlockItems in a shared data structure before immediately
returning a BlockItemResponse to the producer. However, rather than using a busy-wait to poll for new BlockItems,
`ConsumerBlockItemObserver`s will send new BlockItems only upon receipt of BlockItemResponses for previously sent
BlockItems. When Helidon invokes `onNext()` with a BlockItemResponse, the `ConsumerBlockItemObserver` (using an
internal counter) will determine and send all BlockItems that have become available in the shared data structure to the
downstream consumer. In this way, the downstream consumer responses will drive the process of sending new BlockItems.
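
A sketch of the response-driven mechanism, simplified to a single thread with placeholder types (`sharedItems` stands in for the shared data structure):

```java
import java.util.ArrayList;
import java.util.List;

public class ResponseDrivenConsumer {

    // Shared append-only structure the producer has already written into.
    static final List<String> sharedItems = new ArrayList<>();

    // Consumer-side observer: an internal counter tracks how far this consumer
    // has been served; each inbound response triggers a catch-up send of every
    // BlockItem published since the last send.
    static class ConsumerBlockItemObserver {
        private int nextToSend = 0;
        final List<String> sent = new ArrayList<>();

        // Invoked (conceptually by Helidon's onNext) with each BlockItemResponse.
        void onResponse() {
            while (nextToSend < sharedItems.size()) {
                sent.add(sharedItems.get(nextToSend)); // send downstream
                nextToSend++;
            }
        }
    }

    public static void main(String[] args) {
        ConsumerBlockItemObserver observer = new ConsumerBlockItemObserver();
        sharedItems.add("item-1");
        sharedItems.add("item-2");
        observer.onResponse();             // response drives a 2-item batch
        sharedItems.add("item-3");
        observer.onResponse();             // next response picks up item-3
        System.out.println(observer.sent); // [item-1, item-2, item-3]
    }
}
```

Nothing is sent between responses, which is why a full network round-trip gates every batch of BlockItems.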

Advantages:
1) It will not consume CPU resources polling.
2) It will not hijack the thread from responding to the downstream consumer. Rather, it uses the interaction with the
   consumer to trigger sending the newest BlockItems downstream.
3) The shared data structure will need to be concurrent but, after the initial write operation, all subsequent reads
   should not require synchronization.
4) The shared data structure will decouple the `ProducerBlockItemObserver` from the `ConsumerBlockItemObserver`s,
   allowing them to operate independently and not accrue the same latency issues as Approach #1.
5) No additional third-party libraries are required to implement this approach.

Drawbacks:
1) With this approach, BlockItems sent to the consumer are driven exclusively by the downstream consumer
   BlockItemResponses. Given the latency of a network request/response round-trip, this approach will likely be far
   too slow to be considered effective even when sending a batch of all the latest BlockItems.

### Approach 4: Shared data structure between producer and consumer services. Leveraging the LMAX Disruptor library to manage inter-thread pub/sub message-passing between producer and consumers via RingBuffer.

The LMAX Disruptor library is a high-performance inter-thread pub/sub message passing library that could be used to
efficiently pass BlockItems between a `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s. The Disruptor
library is designed to minimize latency as well as CPU cycles by avoiding blocking while maintaining concurrency
guarantees.
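
The central RingBuffer idea can be illustrated without the library itself. The sketch below is a toy single-producer ring, not LMAX Disruptor code: a fixed-size array indexed by an ever-increasing sequence, with each consumer tracking its own read sequence. The real Disruptor adds memory barriers and wait strategies for cross-thread safety, which are omitted here:

```java
import java.util.ArrayList;
import java.util.List;

public class RingBufferSketch {

    static final int SIZE = 8;                // must be a power of two
    private final String[] buffer = new String[SIZE];
    private long cursor = 0;                  // next sequence to publish

    void publish(String item) {
        buffer[(int) (cursor & (SIZE - 1))] = item; // wrap with a bit mask
        cursor++;
    }

    // Each consumer keeps an independent read sequence, so consumers never
    // contend with each other and always read in publication order.
    class Consumer {
        private long sequence = 0;
        final List<String> received = new ArrayList<>();

        void drain() {
            while (sequence < cursor) {
                received.add(buffer[(int) (sequence & (SIZE - 1))]);
                sequence++;
            }
        }
    }

    public static void main(String[] args) {
        RingBufferSketch ring = new RingBufferSketch();
        Consumer a = ring.new Consumer();
        Consumer b = ring.new Consumer();
        ring.publish("item-1");
        ring.publish("item-2");
        a.drain();
        ring.publish("item-3");
        a.drain();
        b.drain();
        System.out.println(a.received); // [item-1, item-2, item-3]
        System.out.println(b.received); // [item-1, item-2, item-3]
    }
}
```

Because slots are pre-allocated and reused, publication is a single array write plus a sequence bump, which is the source of the library's latency and CPU advantages.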

Advantages:
1) The Disruptor library is designed to minimize the latency of passing BlockItem messages between a
   `ProducerBlockItemObserver` and `ConsumerBlockItemObserver`s.
2) The Disruptor library is designed to minimize the CPU resources used by the `ProducerBlockItemObserver` and
   `ConsumerBlockItemObserver`s.
3) The Disruptor library does not pull in any additional transitive dependencies.
4) The Disruptor library is actively maintained and updated by the LMAX team.

Drawbacks:
1) The Disruptor library is a third-party library requiring ramp-up time and integration effort to use it correctly and
   effectively.
2) Leveraging the Disruptor library requires the communication between the `ProducerBlockItemObserver` and
   `ConsumerBlockItemObserver`s to be affiliated by subscribing/unsubscribing the downstream consumers to receive the
   latest BlockItems from the producer via the Disruptor RingBuffer. The process of managing these subscriptions to
   the RingBuffer can be complex.

---

## Design

Given the goals and the proposed approaches, Approach #4 has significant advantages and fewer significant drawbacks.
Using the LMAX Disruptor offers low latency and CPU consumption via a well-maintained and tested API. The RingBuffer
intermediate data structure should serve to decouple the producer bidirectional stream from the consumer bidirectional
streams. Please see the following Entities section and Diagrams for a visual representation of the design.

### Producer Registration Flow

At boot time, the `BlockItemStreamService` will initialize the `StreamMediator` with the LMAX Disruptor RingBuffer.

When a producer calls the `publishBlockStream` gRPC method, the `BlockItemStreamService` will create a new
`ProducerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
upstream producer. The `ProducerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the producer.
See the Producer Registration Flow diagram for more details.

### Consumer Registration Flow

When a consumer calls the `subscribeBlockStream` gRPC method, the `BlockItemStreamService` will create a new
`ConsumerBlockItemObserver` instance for Helidon to invoke during the lifecycle of the bidirectional connection to the
downstream consumer. The `ConsumerBlockItemObserver` is constructed with a reference to the `StreamMediator` and to
the `ResponseStreamObserver` managed by Helidon for transmitting BlockItemResponses to the downstream consumer. The
`BlockItemStreamService` will also subscribe the `ConsumerBlockItemObserver` to the `StreamMediator` to receive the
streaming BlockItems from the producer.
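
The subscribe/unsubscribe bookkeeping might look like the following sketch, where a plain subscriber list stands in for the Disruptor wiring and all type names are placeholders:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MediatorSketch {

    interface BlockItemSubscriber {
        void onEvent(String blockItem);
    }

    // Mediates the 1-to-N relationship: the producer publishes once and the
    // mediator fans the item out to every currently subscribed consumer.
    static class StreamMediator {
        private final List<BlockItemSubscriber> subscribers = new CopyOnWriteArrayList<>();

        void subscribe(BlockItemSubscriber s)   { subscribers.add(s); }
        void unsubscribe(BlockItemSubscriber s) { subscribers.remove(s); }

        void publishEvent(String blockItem) {
            for (BlockItemSubscriber s : subscribers) {
                s.onEvent(blockItem);
            }
        }
    }

    public static void main(String[] args) {
        StreamMediator mediator = new StreamMediator();
        List<String> received = new ArrayList<>();
        BlockItemSubscriber consumer = received::add;

        mediator.subscribe(consumer);
        mediator.publishEvent("item-1");   // delivered
        mediator.unsubscribe(consumer);
        mediator.publishEvent("item-2");   // not delivered
        System.out.println(received);      // [item-1]
    }
}
```

The copy-on-write list lets consumers subscribe and unsubscribe while a publish is iterating, without disturbing the iteration order seen by that publish.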


### Runtime Streaming

At runtime, the `ProducerBlockItemObserver` will receive the latest BlockItem from the producer via Helidon and will
invoke `publishEvent(BlockItem)` on the `StreamMediator` to write the BlockItem to the RingBuffer. The
`ProducerBlockItemObserver` will then persist the BlockItem and return a BlockItemResponse to the producer via
its reference to the `ResponseStreamObserver`.

Asynchronously, the RingBuffer will invoke the `onEvent(BlockItem)` method of all the subscribed
`ConsumerBlockItemObserver`s, passing them the latest BlockItem. The `ConsumerBlockItemObserver` will then transmit
the BlockItem downstream to the consumer via its reference to the `ResponseStreamObserver`. Downstream consumers will
respond with a BlockItemResponse. Helidon will call the `onNext()` method of the `ConsumerBlockItemObserver` with the
BlockItemResponse.

BlockItems sent to the `ConsumerBlockItemObserver` via the RingBuffer and BlockItemResponses passed by Helidon from
the downstream consumer are used to refresh internal timeouts maintained by the `ConsumerBlockItemObserver`. If a
configurable timeout threshold is exceeded, the `ConsumerBlockItemObserver` will unsubscribe itself from the
`StreamMediator`. This mechanism is necessary because producers and consumers may not send HTTP/2 DATA frames with the
`END_STREAM` flag set to terminate their bidirectional connection. Moreover, Helidon does not throw an exception back up to
the `ConsumerBlockItemObserver` when the downstream consumer disconnects. Internal timeouts ensure objects are not
permanently subscribed to the `StreamMediator`.
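
The timeout bookkeeping can be sketched with an injected clock for determinism. The class and threshold names here are hypothetical, not actual `hedera-block-node` configuration:

```java
public class TimeoutSketch {

    // Tracks the last time either a BlockItem was delivered or a response
    // arrived; if the gap exceeds the threshold, the observer should
    // unsubscribe itself from the StreamMediator.
    static class ConsumerLiveness {
        private final long timeoutThresholdMillis;
        private long lastActivityMillis;

        ConsumerLiveness(long timeoutThresholdMillis, long nowMillis) {
            this.timeoutThresholdMillis = timeoutThresholdMillis;
            this.lastActivityMillis = nowMillis;
        }

        // Called on each onEvent(BlockItem) and each onNext(BlockItemResponse).
        void refresh(long nowMillis) {
            lastActivityMillis = nowMillis;
        }

        // Checked periodically; true means the consumer appears to have
        // silently disconnected and the observer should be cleaned up.
        boolean isExpired(long nowMillis) {
            return nowMillis - lastActivityMillis > timeoutThresholdMillis;
        }
    }

    public static void main(String[] args) {
        ConsumerLiveness liveness = new ConsumerLiveness(1500, 0);
        liveness.refresh(1000);                       // activity at t=1000ms
        System.out.println(liveness.isExpired(2000)); // false: only 1000ms idle
        System.out.println(liveness.isExpired(3000)); // true: 2000ms idle
    }
}
```

Passing the clock in as a parameter keeps the expiry logic testable; production code would supply `System.currentTimeMillis()` or an injected `java.time.Clock`.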

### Entities

**BlockItemStreamService** - The BlockItemStreamService is a custom implementation of the Helidon gRPC GrpcService.
It is responsible for initializing the StreamMediator and instantiating ProducerBlockItemObserver and
ConsumerBlockItemObserver instances on-demand when the gRPC API is called by producers and consumers. It is
the primary binding between the Helidon routing mechanisms and the `hedera-block-node` custom business logic.

**StreamObserver** - StreamObserver is the main interface through which Helidon 4.x.x invokes custom business logic
to receive and transmit bidirectional BlockItem streams at runtime.

**ProducerBlockItemObserver** - A custom implementation of StreamObserver invoked by Helidon at runtime which is
responsible for:
1) Receiving the latest BlockItem from the producer (e.g. Consensus Node).
2) Returning a response to the producer.

**StreamMediator** - StreamMediator is an implementation of the [Mediator Pattern](https://en.wikipedia.org/wiki/Mediator_pattern)
encapsulating the communication and interaction between the producer (ProducerBlockItemObserver) and N consumers
(ConsumerBlockItemObserver) using the RingBuffer of the Disruptor library. It manages the 1-to-N relationship between
the producer and consumers.

**RingBuffer** - A shared data structure between the producer and consumers that temporarily stores inbound BlockItems.
The RingBuffer is a fixed-size, pre-allocated circular array of BlockItem event slots that is managed by the Disruptor library.

**EventHandler** - The EventHandler is an integration interface provided by the Disruptor library as a mechanism to
invoke callback logic when a new BlockItem is written to the RingBuffer. The EventHandler is responsible for passing
the latest BlockItem to the ConsumerBlockItemObserver when it is available in the RingBuffer.

**ConsumerBlockItemObserver** - A custom implementation of StreamObserver called by Helidon which is responsible for:
1) Receiving the latest response from the downstream consumer.
2) Receiving the latest BlockItem from the RingBuffer.
3) Sending the latest BlockItem to the downstream consumer.

**BlockPersistenceHandler** - The BlockPersistenceHandler is responsible for writing the latest BlockItem to disk.

---
## Diagrams


### Producer Registration Flow




### Consumer Registration Flow




### Class Diagram of all Entities and their Relationships



### Runtime Stream of BlockItems from Producer to Consumers




---