feat: Create new stream subscriber plugin#1058
Merged
Conversation
28d6262 to
c56d814
Compare
georgi-l95
reviewed
Apr 25, 2025
Member
georgi-l95
left a comment
There was a problem hiding this comment.
Also we'd need to replace in block-node/app/build.gradle.kts:
runtimeOnly("org.hiero.block.node.subscriber")
with:
runtimeOnly("org.hiero.block.node.stream.subscriber")
Looking good, done some manual testing, which will be transformed into E2E tests. Those included:
- Live streaming - Send request with -1 start block to -1 end block. ✅
- Historic streaming - Send request with 0 start block to 10 end block. ✅
- Historic -> Live -> Historic -> Live streaming - Send request with 0 start block to -1 end block. We started receiving blocks, then (with some modifications on simulator side) waited couple of seconds to get behind, then the plugin switched us to historic and when we caught up, we switched to live. ✅
Things left to try:
- Live streaming on two or more consumers
- Historic streaming on two or more consumers
- Historic -> Live -> Historic -> Live streaming on two or more consumers
...subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java
Outdated
Show resolved
Hide resolved
...subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberServicePlugin.java
Outdated
Show resolved
Hide resolved
a4668ac to
64819d4
Compare
* Instead of beating on the old implementation, implement a new subscriber plugin
that uses a single client thread and feeds that alternately from messaging or history
This avoids the countdown latch and state transition logic.
Also avoids a state machine, just take the next block from one source or the other,
always getting just the next block to send the client.
Does require a secondary queue from the messaging, but it's small and simple.
* Adjust the subscriber "method" class to be reused, so we track sessions properly
* Added a latch so the pipeline thread holds until the session is registered with
messaging before returning. This has minimal impact in production, and makes
it possible to test reliably.
* Added live block push to queue, live blocks stream correctly.
* Added handling for request ahead of live condition
* Session just skips over live blocks until live reaches the start block.
* Added proper shutdown of all sessions if the plugin is stopped.
* History and Live both work, and the test disables history to prove it
* This is necessary because otherwise the test "history" plugin is too fast and we always serve history.
* TODO:
* Add more unit tests
* Add testing that forces a client to fall behind and catch up
* Add some end-to-end test suites to exercise this plugin better
* Add more and better javadoc
* Clean up the (excessive) trace logging
* Add better metrics, the "transition" metrics aren't super helpful.
* Suggested metrics
* Live batches sent to client (would be good if we can tag this with a client ID)
* History blocks sent to client (would be good if we can tag this with a client ID)
* Count of times the client thread must wait for live blocks, and time spent waiting
* Count of times the client cannot read from history, but subsequently reads from live
* Client connection and time from connect to session established
Signed-off-by: Joseph Sinclair <121976561+jsync-swirlds@users.noreply.github.com>
64819d4 to
393aa2f
Compare
georgi-l95
approved these changes
Apr 25, 2025
ata-nas
approved these changes
Apr 25, 2025
Contributor
ata-nas
left a comment
There was a problem hiding this comment.
@jsync-swirlds looks good! Looking forward to see the improvements done based on the todo. 🔥
...stream-subscriber/src/main/java/org/hiero/block/node/stream/subscriber/SubscriberConfig.java
Show resolved
Hide resolved
Nana-EC
reviewed
Apr 25, 2025
Contributor
Nana-EC
left a comment
There was a problem hiding this comment.
Nice work.
Made it a 1/3 of the way through and will circle back
...riber/src/main/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSession.java
Show resolved
Hide resolved
...riber/src/main/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSession.java
Show resolved
Hide resolved
...riber/src/main/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSession.java
Show resolved
Hide resolved
AlfredoG87
reviewed
Apr 25, 2025
...riber/src/main/java/org/hiero/block/node/stream/subscriber/BlockStreamSubscriberSession.java
Show resolved
Hide resolved
Contributor
|
I want to go through in detail but don't want to hold up as I will not get a chance till after devcon. So can merge and I will review later. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Reviewer Notes
Adding a new plugin
that uses a single client thread and feeds that alternately from messaging or history
This avoids the countdown latch and state transition logic.
Also avoids a state machine, just take the next block from one source or the other,
always getting just the next block to send the client.
Does require a secondary queue from the messaging, but it's small and simple.
messaging before returning. This has minimal impact in production, and makes
it possible to test reliably.
and we always serve history.
TODO
Related Issue(s)
Resolves: #1051