WIP: Feature: Add S3 Storage Support for Stream Queues #1176
viktorerlingsson wants to merge 7 commits into main from
Conversation
lukas8219
left a comment
just small comments since I haven't been able to run it locally yet
Found this: https://github.com/ysbaddaden/xml.cr
spuun
left a comment
Dunno if it would be better or not, but maybe it's good to have global download_segments and monitor_downloads fibers. Not for a first version of the S3 queue though.
S3SegmentCache:
- Replace 3 polling fibers with fixed worker pool + coordinator
- Use Channel-based work distribution instead of spawning ad-hoc fibers
- Add bounded retries (MAX_ATTEMPTS_PER_SEGMENT = 3)
- Protect shared state with mutex
- Add graceful shutdown via close method

S3StorageClient:
- Remove infinite recursion in download methods (callers handle retries)
- Fix upload_file_to_s3 to use proper loop instead of recursion
- Add exponential backoff to s3_segments_from_bucket retries
- Better error handling and HTTP status code checks

S3MessageStore:
- Fix HTTP client leaks (close clients after each use)
- Add mutex synchronization for concurrent segment array access
- Add close method to shut down segment cache
- Fix missing return in find_offset_in_segments recursive call

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
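The worker-pool pattern described in this commit (a fixed set of workers pulling from a channel, with bounded retries and exponential backoff) can be sketched as follows. LavinMQ itself is written in Crystal with fibers and `Channel`; this Ruby sketch uses threads and `Queue` as stand-ins, and all names except `MAX_ATTEMPTS_PER_SEGMENT` are illustrative, not LavinMQ's.

```ruby
MAX_ATTEMPTS_PER_SEGMENT = 3  # bounded retries, as in the commit

work_queue = Queue.new   # channel-like work distribution to a fixed pool
results    = Queue.new

workers = 2.times.map do
  Thread.new do
    loop do
      segment = work_queue.pop
      break if segment == :shutdown  # graceful shutdown sentinel
      attempts = 0
      begin
        attempts += 1
        # Simulate one transient S3 failure for the :flaky segment.
        raise "transient S3 error" if segment == :flaky && attempts < 2
        results << [segment, :ok, attempts]
      rescue
        if attempts < MAX_ATTEMPTS_PER_SEGMENT
          sleep(0.001 * 2**attempts)  # exponential backoff between retries
          retry
        else
          results << [segment, :failed, attempts]  # give up after the cap
        end
      end
    end
  end
end

[:seg1, :flaky, :seg2].each { |s| work_queue << s }
2.times { work_queue << :shutdown }
workers.each(&:join)
```

Because the pool size is fixed and retries are capped, a segment that keeps failing can never spawn unbounded work, unlike the ad-hoc fibers this commit replaces.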
- S3SegmentCache: Fix DownloadAttempt.copy_with not mutating array (record is immutable, need to replace array with mapped values)
- S3StorageClient: Close HTTP client in delete_from_s3 method
- S3MessageStore: Add 30s timeout to next_segment to prevent infinite wait if segment cannot be downloaded from S3

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
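The `copy_with` bug above is the classic immutable-record pitfall: `copy_with` on a Crystal `record` returns a new value, so calling it while iterating an array changes nothing unless the array is rebuilt from the returned values. A Ruby sketch of the same pitfall and fix, with a hypothetical frozen struct standing in for Crystal's `record`:

```ruby
# Hypothetical Ruby stand-in for an immutable Crystal record.
DownloadAttempt = Struct.new(:segment, :attempts, keyword_init: true) do
  def copy_with(attempts: self.attempts)
    # Returns a NEW frozen value; never mutates the receiver.
    DownloadAttempt.new(segment: segment, attempts: attempts).freeze
  end
end

downloads = [
  DownloadAttempt.new(segment: 1, attempts: 0).freeze,
  DownloadAttempt.new(segment: 2, attempts: 1).freeze,
]

# Bug pattern: the returned copy is discarded, the array is unchanged.
downloads.each { |d| d.copy_with(attempts: d.attempts + 1) if d.segment == 2 }

# Fix: replace the array with the mapped values.
downloads = downloads.map do |d|
  d.segment == 2 ? d.copy_with(attempts: d.attempts + 1) : d
end
```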
- Fix S3 list objects API path (add leading slash)
- Delete segment_msg_count when removing empty segments
- Check segment-specific message count in produce_metadata
- Call reload_logger after parsing all config sources
- Add S3 storage config example to lavinmq.ini

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Refactor S3MessageStore to reuse StreamMessageStore base init instead of duplicating segment loading, metadata parsing, and file registration
- Extract segment download into overridable `download_segment` method on StreamMessageStore, letting the S3 subclass transparently fetch from cache or direct download
- Rewrite S3SegmentCache fiber lifecycle: idle shutdown with automatic restart on new consumers, channel-based coordination replacing busy-wait polling
- Fix segment cache eviction order to remove already-consumed segments before speculative prefetch segments
- Fix UInt32 subtraction overflow in prefetch distance calculation
- Reconnect HTTP client on error during S3 bucket listing
- Add specs for pagination, concurrent consumers, segment cache prefetching, download failure recovery, and max-length enforcement
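The UInt32 overflow fix in the list above comes down to clamping: in Crystal, subtracting a larger `UInt32` from a smaller one raises `OverflowError` (or wraps, with `&-`), so a prefetch-distance calculation must clamp at zero rather than subtract unconditionally. A minimal sketch of the clamped form, with illustrative names that are not LavinMQ's actual identifiers:

```ruby
# How many segments ahead of the consumer a prefetch target sits.
# If the target is already behind the consumer, the distance is zero,
# never a negative (underflowing) value.
def prefetch_distance(target_segment, current_segment)
  return 0 if target_segment < current_segment  # clamp instead of underflow
  target_segment - current_segment
end
```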
Work in progress!
This PR introduces S3 storage support for LavinMQ stream queues, enabling long-term storage and reduced local disk usage for stream data. It lets LavinMQ handle very large streams by offloading historical data to S3 while maintaining high performance for active consumers through intelligent local caching.
Throughput is lower than with regular streams, but in best-case scenarios it should be pretty close.
WHAT is this pull request doing?
Summary
- Stream queues with S3 backend storage
- Segments are downloaded locally when needed by consumers
- Local caching and prefetching to optimize performance
Key Features
Dependencies
How to test
Add relevant config to your config file (or cmd args)
Example config:
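The actual example config ships in the PR's lavinmq.ini changes; since it isn't reproduced here, the following is only a guessed sketch of what such a section could look like, and every section and key name should be treated as hypothetical:

```ini
# Hypothetical option names -- check the lavinmq.ini example in this PR
# for the real sections and keys before using.
[experimental]
stream_queue_s3_storage = true

[s3]
region = us-east-1
bucket = lavinmq-stream-segments
access_key_id = <your access key id>
secret_access_key = <your secret access key>
```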
Then just create a stream and start publishing/consuming.
TO-DOs / Known issues
Implements #1070
HOW can this pull request be tested?
There are some specs, but not for every scenario. Manual testing is still needed.