diff --git a/extras/lavinmq.ini b/extras/lavinmq.ini
index 3384a578a6..52dd8a6674 100644
--- a/extras/lavinmq.ini
+++ b/extras/lavinmq.ini
@@ -28,3 +28,11 @@ bind = ::
;port = 5679
;advertised_uri = tcp://hostname.local:5679
;etcd_endpoints = localhost:2379
+
+[blob-storage]
+;region = eu-north-1
+; Use virtual-hosted-style URL with bucket in hostname:
+;endpoint = https://my-bucket.s3.eu-north-1.amazonaws.com
+;access_key_id = AKIAIOSFODNN7EXAMPLE
+;secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
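+; Max number of segments kept on local disk per stream; evicted local
+; copies are re-downloaded from blob storage on demand: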
+;local_segments_per_stream = 50
diff --git a/shard.lock b/shard.lock
index 86a3518309..90e23d1c94 100644
--- a/shard.lock
+++ b/shard.lock
@@ -12,6 +12,10 @@ shards:
git: https://github.com/cloudamqp/amqp-client.cr.git
version: 1.3.2
+ awscr-signer:
+ git: https://github.com/taylorfinnell/awscr-signer.git
+ version: 0.9.0
+
lz4:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d
diff --git a/shard.yml b/shard.yml
index a5fee08e90..d0a178a692 100644
--- a/shard.yml
+++ b/shard.yml
@@ -34,6 +34,8 @@ dependencies:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr
+ awscr-signer:
+ github: taylorfinnell/awscr-signer
development_dependencies:
ameba:
diff --git a/spec/blob_stream_spec.cr b/spec/blob_stream_spec.cr
new file mode 100644
index 0000000000..7aa08207e2
--- /dev/null
+++ b/spec/blob_stream_spec.cr
@@ -0,0 +1,715 @@
+require "./spec_helper"
+require "./support/blob_server"
+require "./../src/lavinmq/amqp/stream/blob_message_store"
+
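+# "<vhost hash>/<queue hash>", mirroring LavinMQ's SHA-1 based on-disk layout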
+DATA_DIR = "42099b4af021e53fd8fd4e056c2568d7c2e3ffa8/77d9712623c4368721b466d1c24d447e9c53c8d3"
+
+module BlobSpecHelper
+ class_property blob_server : MinimalBlobServer?
+
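+  # Meta file layout: UInt32 msg count, then Int64 first offset / first ts / last ts
+  # (matches BlobMessageStore#read_remote_meta_file)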
+ def self.meta_bytes(offset)
+ io = IO::Memory.new
+ io.write_bytes 100_u32 # count
+ io.write_bytes offset # first offset
+ io.write_bytes offset # first timestamp
+ io.write_bytes offset # last timestamp
+ io.rewind
+ io.getb_to_end
+ end
+
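+  # One catalog record: UInt32 seg_id + UInt32 msg_count + 3 x Int64 = 32 bytes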
+ CATALOG_RECORD_SIZE = 32
+
+ def self.catalog_bytes(entries : Array(Tuple(UInt32, UInt32, Int64, Int64, Int64)))
+ io = IO::Memory.new
+ entries.each do |seg_id, msg_count, first_offset, first_ts, last_ts|
+ io.write_bytes seg_id
+ io.write_bytes msg_count
+ io.write_bytes first_offset
+ io.write_bytes first_ts
+ io.write_bytes last_ts
+ end
+ io.rewind
+ io.getb_to_end
+ end
+
+ def self.segment_bytes
+ io = IO::Memory.new
+ io.write_bytes 4 # schema version
+ 100.times do
+ io.write_bytes LavinMQ::Message.new("ex", "rk", "body")
+ end
+ io.rewind
+ io.getb_to_end
+ end
+
+ def self.setup_remote_with_files
+ if server = blob_server
+ server.clear
+
+ # Add segment and meta files to remote storage
+ server.put("#{DATA_DIR}/msgs.0000000001", segment_bytes())
+ server.put("#{DATA_DIR}/msgs.0000000002", segment_bytes())
+ server.put("#{DATA_DIR}/meta.0000000001", meta_bytes(0_i64))
+ server.put("#{DATA_DIR}/meta.0000000002", meta_bytes(100_i64))
+ else
+ fail("No blob server")
+ end
+ end
+end
+
+describe LavinMQ::AMQP::Stream::BlobMessageStore do
+ Spec.before_suite do
+ # Start blob server once for all tests
+ blob_server = MinimalBlobServer.new
+ blob_server.start
+ BlobSpecHelper.blob_server = blob_server
+ end
+
+ Spec.after_suite do
+ # Stop blob server after all tests
+ BlobSpecHelper.blob_server.try(&.stop)
+ end
+
+ before_each do
+ # Clear blob storage before each test
+ BlobSpecHelper.blob_server.try(&.clear)
+
+ # Configure LavinMQ to use our test blob server
+ if server = BlobSpecHelper.blob_server
+ LavinMQ::Config.instance.blob_storage_endpoint = "http://#{server.endpoint}"
+ LavinMQ::Config.instance.blob_storage_region = "us-east-1"
+ LavinMQ::Config.instance.blob_storage_access_key_id = "test_access_key"
+ LavinMQ::Config.instance.blob_storage_secret_access_key = "test_secret_key"
+ else
+ fail("No blob server")
+ end
+ end
+
+ after_each do
+ FileUtils.rm_rf("/tmp/lavinmq-spec/#{DATA_DIR}")
+ LavinMQ::Config.instance.blob_storage_endpoint = nil
+ end
+
+ it "should get file list from remote storage" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ BlobSpecHelper.setup_remote_with_files
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@segments.size.should eq 2
+ msg_store.@size.should eq 200
+ end
+
+ it "should read & upload local file" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ File.write(File.join(msg_dir, "msgs.0000000003"), BlobSpecHelper.segment_bytes)
+ BlobSpecHelper.setup_remote_with_files
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+
+ # Local-only segment is uploaded asynchronously by upload workers
+ wait_for { msg_store.@remote_segments.size == 3 }
+ msg_store.@segments.size.should eq 3
+ msg_store.@size.should eq 300
+ end
+
+ # Publish a message and verify it can be consumed with blob storage
+ it "can publish and consume with blob storage" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "test_queue"
+ ch.prefetch 1
+        q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+
+ # Publish a message
+ ch.basic_publish("test body", "", q_name)
+
+ # Consume the message
+ channel = Channel(String).new
+ q.subscribe(no_ack: false) do |msg|
+ channel.send msg.body_io.to_s
+ ch.basic_ack(msg.delivery_tag)
+ end
+ channel.receive.should eq "test body"
+ end
+ end
+ end
+
+ describe "no meta files" do
+ it "should verify local segments" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ File.write(File.join(msg_dir, "msgs.0000000001"), BlobSpecHelper.segment_bytes)
+ File.write(File.join(msg_dir, "msgs.0000000002"), BlobSpecHelper.segment_bytes)
+ digest = Digest::MD5.new
+ digest.update(File.open(File.join(msg_dir, "msgs.0000000001"), &.getb_to_end))
+ etag1 = digest.hexfinal
+ digest = Digest::MD5.new
+ digest.update(File.open(File.join(msg_dir, "msgs.0000000002"), &.getb_to_end))
+ etag2 = digest.hexfinal
+
+ # Add files to remote storage (no meta files, just segments)
+ if server = BlobSpecHelper.blob_server
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+ else
+ fail("No blob server")
+ end
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@segments.size.should eq 3
+ msg_store.@size.should eq 200
+ digest = Digest::MD5.new
+ digest.update(File.open(File.join(msg_dir, "msgs.0000000001"), &.getb_to_end))
+ digest.hexfinal.should eq etag1
+ digest = Digest::MD5.new
+ digest.update(File.open(File.join(msg_dir, "msgs.0000000002"), &.getb_to_end))
+ digest.hexfinal.should eq etag2
+ end
+
+ it "should read local segments" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ File.write(File.join(msg_dir, "msgs.0000000001"), BlobSpecHelper.segment_bytes)
+ File.write(File.join(msg_dir, "msgs.0000000002"), BlobSpecHelper.segment_bytes)
+
+ # Remote storage has no files (empty)
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+
+ # Local-only segments are uploaded asynchronously by upload workers
+ wait_for { msg_store.@remote_segments.size == 2 }
+ msg_store.@segments.size.should eq 3
+ msg_store.@size.should eq 200
+ end
+
+ it "should download segments from remote storage" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ # Add segments to remote storage (no meta files)
+ if server = BlobSpecHelper.blob_server
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+ else
+ fail("No blob server")
+ end
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@segments.size.should eq 3
+ msg_store.@size.should eq 200
+ end
+ end
+
+ it "uploads sealed segments to remote storage on rotation" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-upload-test"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+
+ # Publish a segment-sized message to trigger rotation
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ q.publish_confirm data
+ # Second publish triggers open_new_segment which uploads the first
+ q.publish_confirm data
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # Sealed segment should be uploaded to remote storage
+ wait_for { server.keys.count(&.matches?(/msgs\.\d{10}$/)) >= 1 }
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+
+ it "deletes remote segments when queue is deleted" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-delete-test"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ 2.times { q.publish_confirm data }
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # Verify segments exist in remote storage before delete
+ wait_for { !server.keys.select(&.includes?("msgs.")).empty? }
+
+ ch.queue_delete(q_name)
+
+ # All segments for this queue should be gone from remote storage
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+ remaining = server.keys.select(&.includes?(queue_hash))
+ remaining.should be_empty
+ end
+ end
+ end
+
+ it "drops oldest remote segments when max-length exceeded" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-maxlen-test"
+ args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream", "x-max-length": 1})
+ q = ch.queue(q_name, durable: true, args: args)
+
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ 4.times { q.publish_confirm data }
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+
+        # By the time publish_confirm returns, max-length enforcement has run.
+        # We published 4 segment-sized messages (4 segments sealed + 1 active);
+        # with x-max-length: 1, most segments should already be dropped
+ q.message_count.should be <= 2
+ segment_keys = server.keys.select { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) }
+ segment_keys.size.should be <= 2
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+
+ it "consumes messages across segment boundaries" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-cross-segment"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+ ch.prefetch 1
+
+ # Publish enough to span 2 segments
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ q.publish_confirm data
+ q.publish_confirm "last message"
+
+ # Consume from the beginning
+ msgs = Channel(String).new(2)
+ q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
+ msgs.send msg.body_io.to_s
+ ch.basic_ack(msg.delivery_tag)
+ end
+
+ first = msgs.receive
+ first.bytesize.should eq LavinMQ::Config.instance.segment_size
+ msgs.receive.should eq "last message"
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+
+ it "should raise if not configured properly" do
+ LavinMQ::Config.instance.blob_storage_region = nil
+ LavinMQ::Config.instance.blob_storage_access_key_id = nil
+ LavinMQ::Config.instance.blob_storage_secret_access_key = nil
+
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ expect_raises(SpecExit) do
+ LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ end
+ end
+
+ describe "download failure handling" do
+ it "recovers from remote storage GET failure during segment cache download" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-fail-test"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+ ch.prefetch 1
+
+ # Publish enough to create 2 segments
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ q.publish_confirm data
+ q.publish_confirm "second segment msg"
+
+ # Make the first segment fail on next GET (simulates transient remote storage error)
+ server = BlobSpecHelper.blob_server.not_nil!
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+ first_segment_key = server.keys.find { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) }
+ server.fail_keys.add(first_segment_key.not_nil!) if first_segment_key
+
+ # Consumer should still be able to read (direct download retries or fallback)
+ msgs = Channel(String).new(2)
+ q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
+ msgs.send msg.body_io.to_s
+ ch.basic_ack(msg.delivery_tag)
+ end
+
+ # Should eventually receive messages (fail_keys only fails once)
+ msg = msgs.receive
+ msg.bytesize.should eq LavinMQ::Config.instance.segment_size
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+ end
+
+ describe "pagination" do
+ it "lists all segments when remote storage response is paginated" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # Set max keys to 3 to force pagination with just a few segments
+ server.max_list_keys = 3
+
+ # Add 4 segments with meta files (8 keys total, will need 3 pages)
+ 4.times do |i|
+ seg_id = (i + 1).to_s.rjust(10, '0')
+ server.put("#{DATA_DIR}/msgs.#{seg_id}", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/meta.#{seg_id}", BlobSpecHelper.meta_bytes((i * 100).to_i64))
+ end
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ # All 4 segments should be discovered despite pagination
+ msg_store.@remote_segments.size.should eq 4
+ msg_store.@size.should eq 400
+ ensure
+ server.try &.max_list_keys = 1000
+ end
+ end
+
+ describe "concurrent consumers" do
+ it "supports two consumers reading different offsets" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-concurrent-consumers"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+ ch.prefetch 1
+
+ # Publish messages across 2 segments
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ q.publish_confirm data
+ q.publish_confirm "msg2"
+
+ # Consumer 1: reads from first offset
+ msgs1 = Channel(String).new(2)
+ q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
+ msgs1.send msg.body_io.to_s
+ ch.basic_ack(msg.delivery_tag)
+ end
+
+ # Consumer 2 on a separate channel: reads from last offset
+ with_channel(s) do |ch2|
+ ch2.prefetch 1
+ q2 = ch2.queue(q_name, durable: true, args: q_args)
+ msgs2 = Channel(String).new(2)
+ q2.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "last"})) do |msg|
+ msgs2.send msg.body_io.to_s
+ ch2.basic_ack(msg.delivery_tag)
+ end
+
+ # Both consumers should receive messages
+ first_msg = msgs1.receive
+ first_msg.bytesize.should eq LavinMQ::Config.instance.segment_size
+ msgs2.receive.should eq "msg2"
+ end
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+ end
+
+ describe "last_offset after restart" do
+ it "computes correct last_offset when write segment is empty" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ BlobSpecHelper.setup_remote_with_files
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ # 2 remote segments with 100 msgs each: last_offset should be 199
+ # (segment 1: offsets 0..99, segment 2: offsets 100..199)
+ msg_store.last_offset.should eq 199
+ end
+
+ it "has correct segment_first_offset ordering after remote load" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ BlobSpecHelper.setup_remote_with_files
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ offsets = msg_store.@segment_first_offset
+ # Keys should be in ascending order
+ offsets.keys.should eq offsets.keys.sort!
+ end
+
+ it "consumes all messages from offset 0 with only remote segments" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ BlobSpecHelper.setup_remote_with_files
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+
+ offset, segment, pos = msg_store.find_offset(0)
+ offset.should eq 0
+ segment.should eq 1_u32
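+    # pos 4: the first message starts right after the 4-byte schema version header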
+ pos.should eq 4_u32
+ end
+ end
+
+ describe "segment cache" do
+ it "prefetches segments for consumers and cleans up after removal" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-cache-test"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+ ch.prefetch 1
+
+ # Publish enough data to create multiple segments
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ 3.times { q.publish_confirm data }
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+ # Verify segments were uploaded to remote storage
+ wait_for { server.keys.count { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) } >= 2 }
+
+ # Start a consumer to trigger prefetching
+ msgs = Channel(String).new(4)
+ q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
+ msgs.send msg.body_io.to_s
+ ch.basic_ack(msg.delivery_tag)
+ end
+
+ # Consume at least one message to confirm cache is working
+ msgs.receive
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+ end
+
+ describe "segment catalog" do
+ it "loads stats from catalog instead of individual meta files" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # Add segments to remote storage (no meta files)
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+
+ # Add catalog with metadata for both segments
+ catalog = BlobSpecHelper.catalog_bytes([
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64},
+ {2_u32, 100_u32, 100_i64, 100_i64, 100_i64},
+ ])
+ server.put("#{DATA_DIR}/segments.catalog", catalog)
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@size.should eq 200
+ msg_store.last_offset.should eq 199
+
+ # Catalog should have been downloaded locally
+ File.exists?(File.join(msg_dir, "segments.catalog")).should be_true
+ end
+
+ it "falls back to meta files when catalog is missing from remote storage" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+ BlobSpecHelper.setup_remote_with_files # segments + meta, no catalog
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@size.should eq 200
+ msg_store.last_offset.should eq 199
+ end
+
+ it "handles stale catalog with extra entries for deleted segments" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # Only 1 segment in remote storage
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+
+ # Catalog has entries for 2 segments (segment 1 was deleted)
+ catalog = BlobSpecHelper.catalog_bytes([
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64},
+ {2_u32, 100_u32, 100_i64, 100_i64, 100_i64},
+ ])
+ server.put("#{DATA_DIR}/segments.catalog", catalog)
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ # Only segment 2 should be counted (segment 1 not in remote storage LIST)
+ msg_store.@remote_segments.size.should eq 1
+ msg_store.@size.should eq 100
+ end
+
+ it "falls back to meta download for segments missing from catalog" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ # 2 segments in remote storage, with meta files
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/meta.0000000001", BlobSpecHelper.meta_bytes(0_i64))
+ server.put("#{DATA_DIR}/meta.0000000002", BlobSpecHelper.meta_bytes(100_i64))
+
+ # Catalog only has segment 1 (segment 2 was uploaded after last catalog sync)
+ catalog = BlobSpecHelper.catalog_bytes([
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64},
+ ])
+ server.put("#{DATA_DIR}/segments.catalog", catalog)
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@remote_segments.size.should eq 2
+ msg_store.@size.should eq 200
+ msg_store.last_offset.should eq 199
+ end
+
+ it "handles duplicate seg_ids in catalog (last entry wins)" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+
+ # Catalog has two entries for segment 1 — second should win
+ catalog = BlobSpecHelper.catalog_bytes([
+ {1_u32, 50_u32, 0_i64, 0_i64, 0_i64}, # old entry
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64}, # corrected entry
+ ])
+ server.put("#{DATA_DIR}/segments.catalog", catalog)
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@size.should eq 100
+ end
+
+ it "truncates catalog with trailing partial record" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+
+ # Valid catalog + 5 junk bytes
+ valid = BlobSpecHelper.catalog_bytes([
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64},
+ ])
+ corrupted = Bytes.new(valid.size + 5)
+ corrupted.copy_from(valid)
+ server.put("#{DATA_DIR}/segments.catalog", corrupted)
+
+ msg_store = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ msg_store.@size.should eq 100
+
+ # Local catalog should have been truncated to valid size
+ File.size(File.join(msg_dir, "segments.catalog")).should eq BlobSpecHelper::CATALOG_RECORD_SIZE
+ end
+
+ it "appends to catalog on segment upload" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-catalog-append"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+
+ # Publish enough to trigger segment rotation + upload
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ q.publish_confirm data
+ q.publish_confirm data
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+
+ # Wait for segment upload
+ wait_for { server.keys.count(&.matches?(/msgs\.\d{10}$/)) >= 1 }
+
+ # Find local catalog via glob and wait for it to have content
+ catalog_files = Dir.glob("/tmp/lavinmq-spec/**/#{queue_hash}/segments.catalog")
+ catalog_files.size.should eq 1
+ catalog_path = catalog_files.first
+ wait_for { File.size(catalog_path) >= BlobSpecHelper::CATALOG_RECORD_SIZE }
+ (File.size(catalog_path) % BlobSpecHelper::CATALOG_RECORD_SIZE).should eq 0
+
+ ch.queue_delete(q_name)
+ end
+ end
+ end
+
+ it "deletes catalog from remote storage when queue is deleted" do
+ with_amqp_server do |s|
+ with_channel(s) do |ch|
+ q_name = "blob-catalog-delete"
+ q_args = AMQP::Client::Arguments.new({"x-queue-type": "blob-stream"})
+ q = ch.queue(q_name, durable: true, args: q_args)
+
+ data = Bytes.new(LavinMQ::Config.instance.segment_size)
+ 2.times { q.publish_confirm data }
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ queue_hash = Digest::SHA1.hexdigest(q_name)
+ wait_for { !server.keys.select(&.includes?("msgs.")).empty? }
+
+ ch.queue_delete(q_name)
+
+ # All objects for this queue should be gone, including catalog
+ remaining = server.keys.select(&.includes?(queue_hash))
+ remaining.should be_empty
+ end
+ end
+ end
+
+ it "uses local catalog on restart without re-downloading" do
+ msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
+ FileUtils.rm_rf(msg_dir)
+ Dir.mkdir_p(msg_dir)
+
+ server = BlobSpecHelper.blob_server.not_nil!
+ server.put("#{DATA_DIR}/msgs.0000000001", BlobSpecHelper.segment_bytes)
+ server.put("#{DATA_DIR}/msgs.0000000002", BlobSpecHelper.segment_bytes)
+
+ catalog = BlobSpecHelper.catalog_bytes([
+ {1_u32, 100_u32, 0_i64, 0_i64, 0_i64},
+ {2_u32, 100_u32, 100_i64, 100_i64, 100_i64},
+ ])
+ server.put("#{DATA_DIR}/segments.catalog", catalog)
+
+ # First init downloads catalog
+ store1 = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ store1.close
+
+ # Remove catalog from remote storage — local copy should suffice
+ server.delete("#{DATA_DIR}/segments.catalog")
+
+ # Keep local files but re-init
+ store2 = LavinMQ::AMQP::Stream::BlobMessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
+ store2.@remote_segments.size.should eq 2
+ store2.@size.should eq 200
+ end
+ end
+end
diff --git a/spec/config_spec.cr b/spec/config_spec.cr
index 6a44248df8..7dbf611b77 100644
--- a/spec/config_spec.cr
+++ b/spec/config_spec.cr
@@ -163,6 +163,13 @@ describe LavinMQ::Config do
advertised_uri = lavinmq://localhost:5680
on_leader_elected = echo "Leader elected"
on_leader_lost = echo "Leader lost"
+
+ [blob-storage]
+ region = us-east-1
+ access_key_id = AKIAIOSFODNN7EXAMPLE
+ secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
+ endpoint = https://s3.example.com
+ local_segments_per_stream = 100
CONFIG
end
config = LavinMQ::Config.new
@@ -245,6 +252,13 @@ describe LavinMQ::Config do
config.clustering_advertised_uri.should eq "lavinmq://localhost:5680"
config.clustering_on_leader_elected.should eq "echo \"Leader elected\""
config.clustering_on_leader_lost.should eq "echo \"Leader lost\""
+
+      # Blob storage section
+ config.blob_storage_region.should eq "us-east-1"
+ config.blob_storage_access_key_id.should eq "AKIAIOSFODNN7EXAMPLE"
+ config.blob_storage_secret_access_key.should eq "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+ config.blob_storage_endpoint.should eq "https://s3.example.com"
+ config.blob_storage_local_segments_per_stream.should eq 100
ensure
# Reset log level to default for other specs
Log.setup(:fatal)
@@ -289,6 +303,11 @@ describe LavinMQ::Config do
"--clustering-etcd-prefix=cli-prefix",
"--clustering-max-unsynced-actions=4096",
"--clustering-port=5680",
+ "--blob-storage-region=eu-west-1",
+ "--blob-storage-access-key-id=AKIACLIEXAMPLE",
+ "--blob-storage-secret-access-key=cliSecretKey123",
+ "--blob-storage-endpoint=https://s3.cli.example.com",
+ "--blob-storage-local-segments=75",
]
config.parse(argv)
@@ -327,6 +346,11 @@ describe LavinMQ::Config do
config.clustering_etcd_prefix.should eq "cli-prefix"
config.clustering_max_unsynced_actions.should eq 4096
config.clustering_port.should eq 5680
+ config.blob_storage_region.should eq "eu-west-1"
+ config.blob_storage_access_key_id.should eq "AKIACLIEXAMPLE"
+ config.blob_storage_secret_access_key.should eq "cliSecretKey123"
+ config.blob_storage_endpoint.should eq "https://s3.cli.example.com"
+ config.blob_storage_local_segments_per_stream.should eq 75
end
it "can parse -d/--debug flag for verbose logging" do
diff --git a/spec/support/blob_server.cr b/spec/support/blob_server.cr
new file mode 100644
index 0000000000..8629821f15
--- /dev/null
+++ b/spec/support/blob_server.cr
@@ -0,0 +1,219 @@
+require "http/server"
+require "digest/md5"
+require "xml"
+
+# MinimalBlobServer provides an in-memory S3-compatible server for testing.
+# Supports ListObjectsV2 (with pagination), GetObject, PutObject, DeleteObject,
+# and multi-object delete (POST ?delete).
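+#
+# Usage sketch:
+#   server = MinimalBlobServer.new
+#   server.start
+#   server.put("some/prefix/msgs.0000000001", segment_data)
+#   LavinMQ::Config.instance.blob_storage_endpoint = "http://#{server.endpoint}"
+#   # ... exercise code under test ...
+#   server.stop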
+class MinimalBlobServer
+ @storage = Hash(String, Bytes).new
+ @server : HTTP::Server?
+ @port : Int32
+ # Keys that should return 500 on GET (for testing download failures)
+ property fail_keys = Set(String).new
+ # Max keys per ListObjectsV2 response (default: 1000, lower for pagination tests)
+ property max_list_keys : Int32 = 1000
+
+ def initialize(@port = 0)
+ end
+
+ def start
+ server = HTTP::Server.new do |context|
+ begin
+ handle_request(context)
+ rescue ex
+ context.response.status = HTTP::Status::INTERNAL_SERVER_ERROR
+ context.response.print "Internal server error: #{ex.message}"
+ end
+ end
+
+ # Bind to localhost
+ address = server.bind_tcp "127.0.0.1", @port
+ @port = address.port
+ @server = server
+
+ # Start server in background
+ spawn do
+ server.listen
+ end
+
+ # Give server time to start
+ sleep 100.milliseconds
+ end
+
+ def stop
+ @server.try(&.close)
+ end
+
+ def port : Int32
+ @port
+ end
+
+ def endpoint : String
+ "127.0.0.1:#{@port}"
+ end
+
+ def clear
+ @storage.clear
+ @fail_keys.clear
+ end
+
+ def put(key : String, data : Bytes)
+ @storage[key] = data
+ end
+
+ def delete(key : String)
+ @storage.delete(key)
+ end
+
+ def keys : Array(String)
+ @storage.keys
+ end
+
+ private def handle_request(context : HTTP::Server::Context)
+ request = context.request
+ response = context.response
+
+ case request.method
+ when "GET"
+ handle_get(request, response)
+ when "PUT"
+ handle_put(request, response)
+ when "POST"
+ handle_post(request, response)
+ when "DELETE"
+ handle_delete(request, response)
+ else
+ response.status = HTTP::Status::METHOD_NOT_ALLOWED
+ response.print "Method not allowed"
+ end
+ end
+
+ private def handle_get(request : HTTP::Request, response : HTTP::Server::Response)
+ query = request.query_params
+
+ # ListObjectsV2
+ if query["list-type"]? == "2"
+ handle_list_objects(request, response, query)
+ else
+ # GetObject
+ handle_get_object(request, response)
+ end
+ end
+
+ private def handle_list_objects(request : HTTP::Request, response : HTTP::Server::Response, query : URI::Params)
+ prefix = query["prefix"]? || ""
+ continuation_token = query["continuation-token"]?
+
+ # Filter and sort keys by prefix
+ matching_keys = @storage.keys.select(&.starts_with?(prefix)).sort!
+
+ # Handle continuation token (skip keys up to and including the token)
+ if continuation_token
+ start_idx = matching_keys.index { |k| k > continuation_token } || matching_keys.size
+ matching_keys = matching_keys[start_idx..]
+ end
+
+ # Paginate
+ is_truncated = matching_keys.size > @max_list_keys
+ page_keys = matching_keys[0, @max_list_keys]
+
+ # Build XML response
+ xml_resp = XML.build(indent: " ") do |xml|
+ xml.element("ListBucketResult", xmlns: "http://s3.amazonaws.com/doc/2006-03-01/") do
+ xml.element("IsTruncated") { xml.text is_truncated.to_s }
+ if is_truncated
+ xml.element("NextContinuationToken") { xml.text page_keys.last }
+ end
+ page_keys.each do |key|
+ data = @storage[key]
+ etag = calculate_etag(data)
+
+ xml.element("Contents") do
+ xml.element("Key") { xml.text key }
+ xml.element("ETag") { xml.text %("#{etag}") }
+ xml.element("Size") { xml.text data.size.to_s }
+ end
+ end
+ end
+ end
+
+ response.status = HTTP::Status::OK
+ response.headers["Content-Type"] = "application/xml"
+ response.headers["Content-Length"] = xml_resp.bytesize.to_s
+ response.print xml_resp
+ end
+
+ private def handle_get_object(request : HTTP::Request, response : HTTP::Server::Response)
+ key = request.path.lstrip('/')
+
+ if @fail_keys.includes?(key)
+ @fail_keys.delete(key) # Fail once then recover
+ response.status = HTTP::Status::INTERNAL_SERVER_ERROR
+ response.print "Simulated failure"
+ return
+ end
+
+ if data = @storage[key]?
+ etag = calculate_etag(data)
+ response.status = HTTP::Status::OK
+ response.headers["Content-Length"] = data.size.to_s
+ response.headers["ETag"] = %("#{etag}")
+ response.write(data)
+ else
+ response.status = HTTP::Status::NOT_FOUND
+ response.print "Not Found"
+ end
+ end
+
+ private def handle_put(request : HTTP::Request, response : HTTP::Server::Response)
+ key = request.path.lstrip('/')
+
+ # Read body
+ body = Bytes.new(0)
+ if content_length = request.headers["Content-Length"]?.try(&.to_i?)
+ body = Bytes.new(content_length)
+ request.body.try(&.read_fully(body))
+ elsif request_body = request.body
+ io = IO::Memory.new
+ IO.copy(request_body, io)
+ body = io.to_slice
+ end
+
+ @storage[key] = body
+ etag = calculate_etag(body)
+
+ response.status = HTTP::Status::OK
+ response.headers["ETag"] = %("#{etag}")
+ response.print ""
+ end
+
+ private def handle_post(request : HTTP::Request, response : HTTP::Server::Response)
+ query = request.query_params
+ if query["delete"]? != nil
+ # Multi-object delete
+ body = request.body.try(&.gets_to_end) || ""
+ doc = XML.parse(body)
+ doc.xpath_nodes("//*[local-name()='Key']").each do |node|
+ @storage.delete(node.content)
+ end
+ response.status = HTTP::Status::OK
+ response.headers["Content-Type"] = "application/xml"
+ response.print ""
+ else
+ response.status = HTTP::Status::BAD_REQUEST
+ response.print "Unsupported POST"
+ end
+ end
+
+ private def handle_delete(request : HTTP::Request, response : HTTP::Server::Response)
+ key = request.path.lstrip('/')
+ @storage.delete(key)
+
+ response.status = HTTP::Status::NO_CONTENT
+ response.print ""
+ end
+
+ private def calculate_etag(data : Bytes) : String
+ Digest::MD5.hexdigest(data)
+ end
+end
diff --git a/src/lavinmq/amqp/stream/blob_message_store.cr b/src/lavinmq/amqp/stream/blob_message_store.cr
new file mode 100644
index 0000000000..8732d87e96
--- /dev/null
+++ b/src/lavinmq/amqp/stream/blob_message_store.cr
@@ -0,0 +1,471 @@
+require "./stream"
+require "./stream_consumer"
+require "../../mfile"
+require "../../schema"
+require "./blob_storage_client"
+require "./blob_segment_cache"
+
+module LavinMQ::AMQP
+ class Stream < DurableQueue
+ class BlobMessageStore < StreamMessageStore
+ @remote_segments = Hash(UInt32, NamedTuple(path: String, etag: String, size: Int64, meta: Bool)).new
+ @storage_client : BlobStorageClient
+ @segment_cache : BlobSegmentCache?
+ @failed_uploads = Deque(UInt32).new
+ @upload_queue = ::Channel(UInt32).new
+ @catalog_io : File?
+ @catalog_data : Hash(UInt32, NamedTuple(msg_count: UInt32, first_offset: Int64, first_ts: Int64, last_ts: Int64))?
+ @catalog_dirty = false
+ @last_catalog_upload = Time.instant
+ NUM_UPLOAD_WORKERS = 4
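+      # One catalog record: UInt32 seg_id + UInt32 msg_count + 3 x Int64 stats = 32 bytes
+      # (see #append_to_catalog)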
+ CATALOG_RECORD_SIZE = 32
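+      # Catalog is uploaded at most this often during appends, and always on close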
+ CATALOG_UPLOAD_INTERVAL = 10.seconds
+
+ def initialize(@msg_dir : String, @replicator : Clustering::Replicator?,
+ durable : Bool = true, metadata : ::Log::Metadata = ::Log::Metadata.empty)
+ @storage_client = BlobStorageClient.new(@msg_dir, metadata)
+ @remote_segments = @storage_client.remote_segments_from_bucket
+
+ # Load segment catalog (1 GET instead of N meta downloads)
+ load_segment_catalog
+
+ # Ensure last remote segment and remaining meta files are local before base init
+ prepare_local_files_from_remote
+
+ super # MessageStore + StreamMessageStore init (loads local segments/stats)
+
+ # Load stats for remote-only segments (those without local MFiles)
+ load_remote_segment_stats
+ @catalog_data = nil # Free catalog data after loading stats
+
+ # Re-sort segment hashes so first_key/last_key work correctly.
+ # Base init inserts local segments first; remote-only segments are appended
+ # after, breaking the insertion-order == segment-order assumption.
+ sort_segment_hashes!
+
+ # Fix up the write segment's first_offset — base init set it using
+ # @last_offset before remote segment stats were loaded, so it's wrong.
+ if @segment_msg_count[@wfile_id].zero?
+ prev_seg = @segment_first_offset.each_key.select { |k| k < @wfile_id }.max?
+ if prev_seg
+ @segment_first_offset[@wfile_id] = @segment_first_offset[prev_seg] + @segment_msg_count[prev_seg]
+ end
+ end
+
+ # Recalculate after loading remote segment stats
+ @last_offset = get_last_offset
+ @log.info { "Loaded #{@segment_msg_count.size} segments, #{@size} messages (#{@remote_segments.size} remote)" }
+
+ start_upload_workers
+
+ # Open a new segment if the current one has messages.
+ # Downloaded remote segments are complete and shouldn't be appended to.
+ # This triggers open_new_segment which uploads the previous segment.
+ open_new_segment unless @segment_msg_count[@wfile_id].zero?
+
+ # Upload any remaining local segments not yet uploaded
+ upload_missing_segments
+ @segment_cache = BlobSegmentCache.new(@storage_client, @remote_segments, @segments, @msg_dir, metadata)
+ end
+
+ private def open_new_segment(next_msg_size = 0) : MFile
+ prev_seg_id = @wfile_id unless @wfile_id.zero? || @segment_msg_count[@wfile_id]?.try(&.zero?)
+ super.tap do
+ # Re-enqueue any previously failed uploads first
+ while failed_id = @failed_uploads.shift?
+ @upload_queue.send(failed_id)
+ end
+ if prev_seg_id
+ @upload_queue.send(prev_seg_id)
+ end
+ end
+ end
+
+ private def start_upload_workers
+ NUM_UPLOAD_WORKERS.times do |i|
+ spawn(name: "BlobMessageStore#upload-worker-#{i}") do
+ upload_worker(i)
+ end
+ end
+ end
+
+ private def upload_worker(id : Int32)
+ @storage_client.with_http_client(with_timeouts: false) do |h|
+ loop do
+ seg_id = @upload_queue.receive? || break
+ upload_segment_with_retry(seg_id, h)
+ if cache = @segment_cache
+ cache.ensure_fibers_running
+ cache.notify_upload_complete
+ end
+ end
+ end
+ rescue ::Channel::ClosedError
+ end
+
+ private def upload_segment_with_retry(seg_id : UInt32, h : ::HTTP::Client)
+ 3.times do |attempt|
+ upload_segment_to_remote(seg_id, h)
+ return
+ rescue ex
+ if attempt < 2
+ @log.warn { "Failed to upload segment #{seg_id} to remote storage (attempt #{attempt + 1}/3): #{ex.message}" }
+ else
+ @log.error { "Failed to upload segment #{seg_id} to remote storage after 3 attempts, will retry on next rotation: #{ex.message}" }
+ @failed_uploads << seg_id unless @failed_uploads.includes?(seg_id)
+ end
+ end
+ end
+
+ private def download_segment(seg_id : UInt32) : MFile?
+ return nil unless @remote_segments[seg_id]?
+ return @segments[seg_id] if @segments[seg_id]? # raced with prefetch
+
+ # Download directly. If a cache worker is concurrently downloading the
+ # same segment, the filesystem rename in BlobStorageClient#download_segment
+ # arbitrates — the loser's bytes are discarded. Wasted GET, not a bug.
+ 3.times do |attempt|
+ @storage_client.with_http_client(with_timeouts: false) do |h|
+ if mfile = @storage_client.download_segment(seg_id, @remote_segments, h)
+ @segments[seg_id] = mfile
+ return mfile
+ end
+ end
+ break if @segments[seg_id]? # another fiber downloaded it
+ @log.warn { "Failed to download segment #{seg_id} (attempt #{attempt + 1}/3)" }
+ sleep (attempt + 1).seconds
+ rescue ex : IO::Error | IO::TimeoutError
+ @log.warn { "Error downloading segment #{seg_id} (attempt #{attempt + 1}/3): #{ex.message}" }
+ sleep (attempt + 1).seconds
+ end
+ @segments[seg_id]?
+ end
+
+ # -- Overrides --
+
+ def ensure_available(consumer : AMQP::StreamConsumer) : Nil
+ seg = @segments[consumer.segment]?
+ if seg.nil?
+ download_segment(consumer.segment)
+ seg = @segments[consumer.segment]?
+ end
+ # At EOF of current segment — prefetch the next one
+ if seg && consumer.pos >= seg.size
+ if next_seg = next_segment_id(consumer.segment)
+ download_segment(next_seg) unless @segments[next_seg]?
+ end
+ end
+ end
+
+ def next_segment_id(segment) : UInt32?
+ local = super
+ remote = @remote_segments.each_key.find { |sid| sid > segment }
+ case {local, remote}
+ when {UInt32, UInt32} then Math.min(local, remote)
+ when {UInt32, nil} then local
+ when {nil, UInt32} then remote
+ else nil
+ end
+ end
+
+ def close : Nil
+ @upload_queue.close
+ compact_catalog
+ @catalog_dirty = true # force upload on close
+ upload_catalog
+ @catalog_io.try &.close
+ @segment_cache.try &.close
+ super
+ end
+
+ def delete
+ super
+ keys = Array(String).new(@remote_segments.size * 2 + 1)
+ @remote_segments.each_value do |remote_seg|
+ keys << remote_seg[:path]
+ keys << @storage_client.meta_file_name(remote_seg[:path])
+ end
+ keys << @storage_client.catalog_file_name
+ @storage_client.delete_objects(keys)
+ @remote_segments.clear
+ @catalog_io.try &.close
+ @catalog_io = nil
+ end
+
+ private def next_segment(consumer) : MFile?
+ result = super
+ if result && (cache = @segment_cache)
+ cache.current_read_segments[consumer.tag] = consumer.segment
+ end
+ result
+ end
+
+ def add_consumer(tag, segment)
+ if cache = @segment_cache
+ cache.notify_consumer_added
+ cache.current_read_segments[tag] = segment
+ cache.ensure_fibers_running
+ end
+ end
+
+ def remove_consumer(tag)
+ if cache = @segment_cache
+ cache.current_read_segments.delete(tag)
+ cache.notify_consumer_removed
+ end
+ end
+
+ private def drop_segments_while(& : UInt32 -> Bool)
+ # Iterate segment_msg_count (the authoritative index of all segments,
+ # including those not yet uploaded) instead of just @remote_segments
+ @segment_msg_count.reject! do |seg_id, msg_count|
+ should_drop = yield seg_id
+ break unless should_drop
+ next if seg_id == @wfile_id # never delete the active segment
+
+ @size -= msg_count
+ @segment_last_ts.delete(seg_id)
+ @segment_first_offset.delete(seg_id)
+ @segment_first_ts.delete(seg_id)
+
+ if remote_seg = @remote_segments.delete(seg_id)
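+            # the 4-byte schema version header is not counted in @bytesize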
+ @bytesize -= remote_seg[:size] - 4
+ @storage_client.delete_remote(remote_seg)
+ elsif mfile = @segments[seg_id]?
+ @bytesize -= mfile.size - 4
+ end
+
+ if mfile = @segments.delete(seg_id)
+ delete_file(mfile, including_meta: true)
+ end
+ true
+ end
+ end
+
+ # -- Remote storage private methods --
+
+ private def sort_segment_hashes!
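+        # Rebuild each stats hash in ascending seg_id order; Crystal hashes
+        # preserve insertion order, so re-inserting sorted pairs restores it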
+ {% for ivar in ["@segment_msg_count", "@segment_first_offset", "@segment_first_ts", "@segment_last_ts"] %}
+ begin
+ sorted = {{ivar.id}}.to_a.sort_by!(&.[0])
+ {{ivar.id}}.clear
+ sorted.each { |k, v| {{ivar.id}}[k] = v }
+ end
+ {% end %}
+ end
+
+ # Download meta files and last segment from remote storage so base init can process them
+ private def prepare_local_files_from_remote
+ return if @remote_segments.empty?
+
+ # Download meta files only for segments not covered by the catalog
+ catalog = @catalog_data
+ @remote_segments.each do |seg_id, remote_seg|
+ next unless remote_seg[:meta] # only download if meta exists in remote storage
+ next if catalog && catalog.has_key?(seg_id)
+ meta_path = File.join(@msg_dir, "meta.#{seg_id.to_s.rjust(10, '0')}")
+ next if File.exists?(meta_path)
+ @storage_client.with_http_client(with_timeouts: true) do |h|
+ @storage_client.download_meta_file(seg_id, @remote_segments, h)
+ end
+ end
+
+ # Ensure last remote segment is local (we need it for writing)
+ last_seg = @remote_segments.each_key.max
+ local_path = File.join(@msg_dir, "msgs.#{last_seg.to_s.rjust(10, '0')}")
+ unless File.exists?(local_path)
+ @storage_client.with_http_client(with_timeouts: true) do |h|
+ @storage_client.download_segment(last_seg, @remote_segments, h)
+ end
+ end
+ end
+
+ # Load stats for remote segments that don't have local MFiles
+ # (Base init only loaded stats for segments in @segments)
+ private def load_remote_segment_stats
+ catalog = @catalog_data
+ @remote_segments.each do |seg_id, remote_seg|
+ already_counted = @segment_msg_count[seg_id]? && !@segment_msg_count[seg_id].zero?
+
+ if catalog && (entry = catalog[seg_id]?)
+ # Apply catalog data — always set offsets/timestamps (base init may
+ # have set wrong first_offset for the last segment which was loaded
+ # locally without a meta file). Only adjust counters for new segments.
+ @segment_msg_count[seg_id] = entry[:msg_count]
+ @segment_first_offset[seg_id] = entry[:first_offset]
+ @segment_first_ts[seg_id] = entry[:first_ts]
+ @segment_last_ts[seg_id] = entry[:last_ts]
+ unless already_counted
+ @size += entry[:msg_count]
+ @bytesize += remote_seg[:size] - 4
+ end
+ elsif already_counted
+ next
+ elsif File.exists?(meta_path = File.join(@msg_dir, "meta.#{seg_id.to_s.rjust(10, '0')}"))
+ read_remote_meta_file(seg_id, meta_path, remote_seg[:size])
+ else
+ # No meta file available, download segment and produce metadata
+ @storage_client.with_http_client(with_timeouts: true) do |h|
+ if mfile = @storage_client.download_segment(seg_id, @remote_segments, h)
+ @segments[seg_id] = mfile
+ mfile.pos = 4
+ produce_metadata(seg_id, mfile)
+ write_metadata_file(seg_id, mfile)
+ upload_meta_to_remote(seg_id, mfile, h)
+ end
+ end
+ end
+ end
+ end
+
+ private def read_remote_meta_file(seg_id, meta_path, remote_size)
+ File.open(meta_path) do |file|
+ count = file.read_bytes(UInt32)
+ @segment_msg_count[seg_id] = count
+ @segment_first_offset[seg_id] = file.read_bytes(Int64)
+ @segment_first_ts[seg_id] = file.read_bytes(Int64)
+ begin
+ @segment_last_ts[seg_id] = file.read_bytes(Int64)
+ rescue IO::EOFError
+ # Older meta format without last_ts
+ end
+ @size += count
+ @bytesize += remote_size - 4
+ end
+ end
+
+ private def upload_segment_to_remote(seg_id : UInt32, h : ::HTTP::Client)
+ return if @remote_segments[seg_id]?
+ return unless @segment_msg_count[seg_id]? && @segment_msg_count[seg_id] > 0
+
+ mfile = @segments[seg_id]? || return
+ remote_path = "/#{@storage_client.remote_path(mfile.path)}"
+ @log.debug { "Uploading segment: #{remote_path}" }
+ etag = @storage_client.upload_file(h, remote_path, mfile.to_slice)
+
+ upload_meta_to_remote(seg_id, mfile, h)
+
+ @remote_segments[seg_id] = {
+ path: remote_path[1..],
+ etag: etag,
+ size: mfile.size,
+ meta: true,
+ }
+ append_to_catalog(seg_id)
+ end
+
+ private def upload_meta_to_remote(seg_id : UInt32, mfile : MFile, h : ::HTTP::Client)
+ meta_path = meta_file_name(mfile)
+ if File.exists?(meta_path)
+ remote_meta_path = "/#{@storage_client.remote_path(meta_path)}"
+ @storage_client.upload_file(h, remote_meta_path, File.open(meta_path, &.getb_to_end))
+ end
+ end
+
+ private def upload_missing_segments
+ @segments.each_key do |seg_id|
+ next if seg_id == @wfile_id
+ next if @remote_segments[seg_id]?
+ next unless @segment_msg_count[seg_id]? && @segment_msg_count[seg_id] > 0
+ @upload_queue.send(seg_id)
+ end
+ end
+
+ # -- Segment catalog methods --
+
+ private def load_segment_catalog
+ catalog_path = File.join(@msg_dir, "segments.catalog")
+
+ # Download from remote storage if not local
+ unless File.exists?(catalog_path)
+ @storage_client.with_http_client(with_timeouts: true) do |h|
+ remote_path = @storage_client.catalog_file_name
+ h.get("/#{remote_path}") do |response|
+ if response.status_code == 200
+ File.open(catalog_path, "w") do |f|
+ IO.copy response.body_io, f
+ end
+ end
+ end
+ rescue ex : IO::TimeoutError | IO::Error
+ Log.warn { "Failed to download segment catalog: #{ex.message}" }
+ end
+ end
+
+ # Parse catalog file
+ catalog = Hash(UInt32, NamedTuple(msg_count: UInt32, first_offset: Int64, first_ts: Int64, last_ts: Int64)).new
+ if File.exists?(catalog_path)
+ file_size = File.size(catalog_path)
+ remainder = file_size % CATALOG_RECORD_SIZE
+ if remainder != 0
+ Log.warn { "Segment catalog has #{remainder} trailing bytes, truncating" }
+ File.open(catalog_path, "r+") do |f|
+ f.truncate(file_size - remainder)
+ end
+ end
+ File.open(catalog_path) do |f|
+ while f.pos < f.size
+ seg_id = f.read_bytes(UInt32)
+ msg_count = f.read_bytes(UInt32)
+ first_offset = f.read_bytes(Int64)
+ first_ts = f.read_bytes(Int64)
+ last_ts = f.read_bytes(Int64)
+ catalog[seg_id] = {msg_count: msg_count, first_offset: first_offset, first_ts: first_ts, last_ts: last_ts}
+ end
+ rescue IO::EOFError
+ end
+ Log.info { "Loaded #{catalog.size} entries from segment catalog" }
+ end
+
+ @catalog_data = catalog
+ @catalog_io = File.open(catalog_path, "a")
+ end
+
+ private def append_to_catalog(seg_id : UInt32)
+ io = @catalog_io || return
+ io.write_bytes(seg_id)
+ io.write_bytes(@segment_msg_count[seg_id])
+ io.write_bytes(@segment_first_offset[seg_id])
+ io.write_bytes(@segment_first_ts[seg_id])
+ io.write_bytes(@segment_last_ts[seg_id])
+ io.flush
+ @catalog_dirty = true
+ if Time.instant - @last_catalog_upload >= CATALOG_UPLOAD_INTERVAL
+ upload_catalog
+ end
+ end
+
+ private def upload_catalog
+ return unless @catalog_dirty
+ catalog_path = File.join(@msg_dir, "segments.catalog")
+ return unless File.exists?(catalog_path)
+ remote_path = "/#{@storage_client.catalog_file_name}"
+ @storage_client.upload_file(remote_path, File.read(catalog_path).to_slice)
+ @catalog_dirty = false
+ @last_catalog_upload = Time.instant
+ end
+
+ private def compact_catalog
+ catalog_path = File.join(@msg_dir, "segments.catalog")
+ return unless File.exists?(catalog_path)
+ record_count = File.size(catalog_path) // CATALOG_RECORD_SIZE
+ return unless record_count > @remote_segments.size * 2
+
+ @catalog_io.try &.close
+ tmp_path = "#{catalog_path}.tmp"
+ File.open(tmp_path, "w") do |f|
+ @remote_segments.each_key do |seg_id|
+ next unless @segment_msg_count[seg_id]?
+ f.write_bytes(seg_id)
+ f.write_bytes(@segment_msg_count[seg_id])
+ f.write_bytes(@segment_first_offset[seg_id])
+ f.write_bytes(@segment_first_ts[seg_id])
+ f.write_bytes(@segment_last_ts[seg_id])
+ end
+ end
+ File.rename(tmp_path, catalog_path)
+ @catalog_io = File.open(catalog_path, "a")
+ end
+ end
+ end
+end
diff --git a/src/lavinmq/amqp/stream/blob_segment_cache.cr b/src/lavinmq/amqp/stream/blob_segment_cache.cr
new file mode 100644
index 0000000000..7ad156c6f6
--- /dev/null
+++ b/src/lavinmq/amqp/stream/blob_segment_cache.cr
@@ -0,0 +1,280 @@
+require "./blob_storage_client"
+require "../../mfile"
+require "../../rough_time"
+require "../../config"
+
+module LavinMQ::AMQP
+ class BlobSegmentCache
+ @log : Logger
+ @idle_since : Time::Instant? = nil
+ property current_read_segments = Hash(String, UInt32).new
+
+ NUM_DOWNLOAD_WORKERS = 8
+ COORDINATOR_INTERVAL = 100.milliseconds
+ CLEANUP_INTERVAL = 5.seconds
+ IDLE_SHUTDOWN_TIMEOUT = 30.seconds
+
+ def initialize(
+ @storage_client : BlobStorageClient,
+ @remote_segments : Hash(UInt32, NamedTuple(path: String, etag: String, size: Int64, meta: Bool)),
+ @segments : Hash(UInt32, MFile),
+ @msg_dir : String,
+ metadata : ::Log::Metadata = ::Log::Metadata.empty,
+ )
+ @log = Logger.new(Log, metadata)
+ @segments_mutex = Mutex.new
+ @pending_mutex = Mutex.new
+ @pending = Set(UInt32).new
+ @download_queue = ::Channel(UInt32).new(256)
+ @fibers_running = Atomic(Bool).new(false)
+ @closed = false
+ @running_loops = Atomic(Int32).new(0)
+ @cleanup_signal = ::Channel(Nil).new(1)
+ delete_temp_files(@msg_dir)
+ end
+
+ # Start background fibers for prefetching and cleanup.
+ # Called when a consumer is added. No-op if already running.
+ def ensure_fibers_running
+ return if @closed
+ return unless @fibers_running.compare_and_set(false, true)[1]
+ # Recreate channels if they were closed by a previous shutdown
+ if @download_queue.closed?
+ @download_queue = ::Channel(UInt32).new(256)
+ @cleanup_signal = ::Channel(Nil).new(1)
+ end
+ @running_loops.set(2) # coordinator + cleanup
+ NUM_DOWNLOAD_WORKERS.times do |i|
+ spawn download_worker(i), name: "BlobSegmentCache#worker-#{i}"
+ end
+ spawn coordinator_loop, name: "BlobSegmentCache#coordinator"
+ spawn cleanup_loop, name: "BlobSegmentCache#cleanup"
+ end
+
+ # Signal that a consumer was removed. Triggers an immediate cleanup pass.
+ def notify_consumer_removed
+ @idle_since = Time.instant if @current_read_segments.empty?
+ # Signal cleanup loop to run immediately
+ select
+ when @cleanup_signal.send(nil)
+ else
+ end
+ end
+
+ # Signal that a segment was uploaded. Triggers an immediate cleanup pass.
+ def notify_upload_complete
+ select
+ when @cleanup_signal.send(nil)
+ else
+ end
+ end
+
+ # Called when a consumer connects. Clears idle timer.
+ def notify_consumer_added
+ @idle_since = nil
+ end
+
+ def close
+ @closed = true
+ @download_queue.close
+ @cleanup_signal.close
+ end
+
+ private def download_worker(worker_id : Int32)
+ client = @storage_client.http_client(with_timeouts: true)
+ loop do
+ seg_id = @download_queue.receive? || break
+ next if @segments[seg_id]?
+
+ @log.debug { "Worker #{worker_id}: downloading segment #{seg_id}" }
+ begin
+ if mfile = @storage_client.download_segment(seg_id, @remote_segments, client)
+ @segments_mutex.synchronize do
+ if @segments[seg_id]?
+ # Another worker finished first, discard our copy
+ @log.debug { "Worker #{worker_id}: segment #{seg_id} already downloaded" }
+ mfile.delete
+ mfile.close
+ else
+ @segments[seg_id] = mfile
+ @log.debug { "Worker #{worker_id}: segment #{seg_id} downloaded" }
+ end
+ end
+ @pending_mutex.synchronize { @pending.delete(seg_id) }
+ else
+ @pending_mutex.synchronize { @pending.delete(seg_id) }
+ end
+ rescue ex : IO::Error | IO::TimeoutError
+ @log.debug { "Worker #{worker_id}: segment #{seg_id} download failed: #{ex.message}" }
+ @pending_mutex.synchronize { @pending.delete(seg_id) }
+ # Reopen connection on error
+ client.close rescue nil
+ client = @storage_client.http_client(with_timeouts: true)
+ end
+ end
+ rescue ::Channel::ClosedError
+ ensure
+ client.try &.close
+ end
+
+ private def coordinator_loop
+ until @closed
+ sleep COORDINATOR_INTERVAL
+ break if @closed
+
+ if @current_read_segments.empty?
+ if idle_timeout_reached?
+ @log.debug { "Coordinator: idle timeout, shutting down" }
+ break
+ end
+ next
+ end
+
+ schedule_downloads
+ end
+ mark_fibers_stopped
+ end
+
+ private def schedule_downloads
+ wanted = segments_to_prefetch
+
+ wanted.each do |seg_id|
+ next if @segments[seg_id]? # Already local
+ next unless @remote_segments[seg_id]? # Must exist in remote storage
+
+ should_queue = @pending_mutex.synchronize do
+ next false if @pending.includes?(seg_id)
+ @pending.add(seg_id)
+ true
+ end
+ next unless should_queue
+
+ select
+ when @download_queue.send(seg_id)
+ @log.debug { "Queued prefetch for segment #{seg_id}" }
+ else
+ @pending_mutex.synchronize { @pending.delete(seg_id) }
+ end
+ end
+ end
+
+ # Segments to prefetch based on consumer positions.
+ # Returns segments sorted by proximity to the nearest consumer.
+ private def segments_to_prefetch : Array(UInt32)
+ readers = @current_read_segments
+ return [] of UInt32 if readers.empty?
+
+ # Ensure at least 3 segments per consumer so prefetching is useful even
+ # when many consumers share a small local_segments_per_stream budget
+ budget = Math.max(3, Config.instance.blob_storage_local_segments_per_stream // readers.size)
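+      # e.g. a limit of 50 local segments shared by 10 consumers gives each
+      # consumer max(3, 50 // 10) = 5 prefetched segments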
+
+ readers.flat_map do |_consumer, segment|
+ (0...budget).map { |i| segment + i }
+ end.to_set.to_a.sort_by do |segment|
+ readers.min_of { |_cid, consumer_seg| (segment.to_i64 - consumer_seg.to_i64).abs }
+ end
+ end
+
+ # Evict local segment copies that are no longer needed.
+ # Keeps the total under the configured max, prioritizing segments
+ # that consumers are likely to need.
+ private def cleanup_loop
+ until @closed
+ # Wait for either the cleanup interval or an explicit signal
+ # (e.g. from remove_consumer triggering immediate cleanup)
+ select
+ when @cleanup_signal.receive
+ when timeout(CLEANUP_INTERVAL)
+ end
+ break if @closed
+
+ run_cleanup
+
+ if @current_read_segments.empty? && idle_timeout_reached?
+ run_cleanup # Final pass before shutting down
+ @log.debug { "Cleanup: idle timeout, shutting down" }
+ break
+ end
+ end
+ mark_fibers_stopped
+ rescue ::Channel::ClosedError
+ mark_fibers_stopped
+ end
+
+ private def run_cleanup
+ max_local = Config.instance.blob_storage_local_segments_per_stream
+ return if @segments.size <= max_local
+
+ wanted = segments_to_prefetch.to_set
+ currently_reading = @current_read_segments.values.to_set
+
+ # Segments safe to remove: not wanted by prefetch, not actively being read,
+ # and re-downloadable from remote storage (never remove the write segment or local-only segments)
+ removable = @segments.keys.reject do |seg_id|
+ wanted.includes?(seg_id) || currently_reading.includes?(seg_id) || !@remote_segments[seg_id]?
+ end
+
+ if @current_read_segments.empty?
+ # No consumers — just remove until under the limit
+ removable.each do |seg_id|
+ break if @segments.size <= max_local
+ remove_local_segment(seg_id)
+ end
+ else
+ # The lowest segment ID any consumer is currently reading
+ min_reader_seg = @current_read_segments.values.min
+
+ # Sort so we remove the least useful segments first:
+ # - Segments behind all readers (already consumed) are removed first
+ # - Among remaining, furthest from any reader are removed first
+ sorted = removable.sort_by do |seg_id|
+ dist = @current_read_segments.values.min_of { |rs| (seg_id.to_i64 - rs.to_i64).abs }
+ seg_id < min_reader_seg ? -dist : dist
+ end
+
+ sorted.each do |seg_id|
+ break if @segments.size <= max_local
+ remove_local_segment(seg_id)
+ end
+ end
+ end
+
+ private def idle_timeout_reached? : Bool
+ if idle_since = @idle_since
+ Time.instant >= idle_since + IDLE_SHUTDOWN_TIMEOUT
+ else
+ false
+ end
+ end
+
+ # Called when coordinator or cleanup loop exits.
+ # When both have exited, close the download queue so workers exit too.
+ private def mark_fibers_stopped
+ if @running_loops.sub(1) == 1 # was 1, now 0 — we're the last loop
+ @download_queue.close
+ @cleanup_signal.close
+ @fibers_running.set(false)
+ @log.debug { "All background fibers stopped" }
+ end
+ end
+
+ private def remove_local_segment(seg_id : UInt32)
+ @log.debug { "Removing local segment: #{seg_id}" }
+ seg = @segments_mutex.synchronize { @segments.delete(seg_id) }
+ if seg
+ seg.delete if File.exists?(seg.path)
+ seg.close
+ end
+ rescue File::NotFoundError
+ @log.debug { "File not found while removing segment #{seg_id}" }
+ end
+
+ private def delete_temp_files(msg_dir : String)
+ Dir.each_child(msg_dir) do |f|
+ if f.includes?(".tmp")
+ File.delete(File.join(msg_dir, f)) rescue nil
+ end
+ end
+ end
+ end
+end
diff --git a/src/lavinmq/amqp/stream/blob_storage_client.cr b/src/lavinmq/amqp/stream/blob_storage_client.cr
new file mode 100644
index 0000000000..c07980051b
--- /dev/null
+++ b/src/lavinmq/amqp/stream/blob_storage_client.cr
@@ -0,0 +1,344 @@
+require "awscr-signer"
+require "base64"
+require "digest/md5"
+require "http/client"
+require "xml"
+require "../../config"
+require "../../rough_time"
+require "../../mfile"
+require "../../message_store"
+
+module LavinMQ::AMQP
+ class BlobStorageClient
+ @s3_signer : Awscr::Signer::Signers::V4
+ @log : Logger
+ @relative_prefix : String # e.g. "vhost_hash/queue_hash"
+
+ HTTP_CONNECT_TIMEOUT = 200.milliseconds
+ HTTP_READ_TIMEOUT = 500.milliseconds
+
+ def initialize(@msg_dir : String, metadata : ::Log::Metadata = ::Log::Metadata.empty)
+ @log = Logger.new(Log, metadata)
+ @s3_signer = s3_signer
+ @relative_prefix = @msg_dir[Config.instance.data_dir.bytesize + 1..]
+ @file_mutex = Mutex.new
+ end
+
+ # Compute the remote path for a local file
+ def remote_path(local_path : String) : String
+ local_path[Config.instance.data_dir.bytesize + 1..]
+ end
+
+ # List all segments in the remote bucket for this stream
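+ # Pagination follows the S3 ListObjectsV2 protocol: while the response's
+ # IsTruncated element is "true", its NextContinuationToken is fed back via
+ # the continuation-token query parameter. A first-page request path looks
+ # like (hypothetical prefix):
+ # /?delimiter=%2F&encoding-type=url&list-type=2&prefix=vh_hash/q_hash/&max-keys=1000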
+ def remote_segments_from_bucket(max_retries = 5) : Hash(UInt32, NamedTuple(path: String, etag: String, size: Int64, meta: Bool))
+ remote_segments = Hash(UInt32, NamedTuple(path: String, etag: String, size: Int64, meta: Bool)).new
+ prefix = @relative_prefix + "/"
+ continuation_token : String? = nil
+ retries = 0
+
+ h = http_client(with_timeouts: true)
+ begin
+ loop do
+ begin
+ path = "/?delimiter=%2F&encoding-type=url&list-type=2&prefix=#{prefix}&max-keys=1000"
+ if token = continuation_token
+ path += "&continuation-token=#{URI.encode_path(token)}"
+ end
+ response = h.get(path)
+ continuation_token = list_of_files_from_xml(XML.parse(response.body), remote_segments)
+ retries = 0 # Reset on success
+ break unless continuation_token
+ rescue ex : IO::Error # includes IO::TimeoutError
+ retries += 1
+ if retries > max_retries
+ raise MessageStore::Error.new("Failed to list remote bucket after #{max_retries} retries: #{ex.message}")
+ end
+ @log.warn { "Error listing remote bucket (attempt #{retries}/#{max_retries}): #{ex.message}" }
+ sleep (retries * 100).milliseconds
+ # Reconnect on error — the previous connection may be broken
+ h.close rescue nil
+ h = http_client(with_timeouts: true)
+ end
+ end
+ ensure
+ h.close
+ end
+
+ @log.info { "Found #{remote_segments.size} segments in remote storage" }
+ remote_segments
+ end
+
+ def list_of_files_from_xml(document, remote_segments) : String?
+ list_bucket_results = document.first_element_child
+ return unless list_bucket_results
+
+ contents_elements = list_bucket_results.xpath_nodes("//*[local-name()='Contents']")
+ contents_elements.each { |content| parse_xml_element(content, remote_segments) }
+
+ is_truncated_node = list_bucket_results.xpath_node(".//*[local-name()='IsTruncated']")
+ if is_truncated_node && is_truncated_node.content == "true"
+ continuation_token_node = list_bucket_results.xpath_node(".//*[local-name()='NextContinuationToken']")
+ return continuation_token_node.try &.content
+ end
+ nil
+ end
+
+ private def parse_xml_element(content, remote_segments)
+ # Without a Key we cannot attribute the entry to a segment, so skip it
+ key_node = content.xpath_node(".//*[local-name()='Key']") || return
+ path = key_node.content
+ if match = path.match(/\/meta\.(\d{10})$/)
+ update_remote_segment_list(remote_segments, match[1].to_u32, "", "", 0_i64, true)
+ return
+ end
+ match = path.match(/\/msgs\.(\d{10})$/) || return
+ id = match[1].to_u32
+
+ etag = ""
+ if etag_node = content.xpath_node(".//*[local-name()='ETag']")
+ etag = etag_node.content.strip('"') # S3 wraps ETags in double quotes
+ end
+
+ size = 0_i64
+ if size_node = content.xpath_node(".//*[local-name()='Size']")
+ size = size_node.content.to_i64
+ end
+
+ update_remote_segment_list(remote_segments, id, path, etag, size, false)
+ end
+
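+ # Merges listing entries for the same segment id: a "meta.*" key only sets
+ # the meta flag, while the matching "msgs.*" key fills in path/etag/size.
+ # E.g. for segment 7, meta.0000000007 records {meta: true} first and
+ # msgs.0000000007 later supplies the path without clearing that flag.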
+ private def update_remote_segment_list(remote_segments, seg_id : UInt32, path : String = "", etag : String = "", size : Int64 = 0_i64, meta : Bool = false)
+ remote_seg = remote_segments[seg_id]? || {path: path, etag: etag, size: size, meta: meta}
+ path = remote_seg[:path] if path == ""
+ etag = remote_seg[:etag] if etag == ""
+ size = remote_seg[:size] if size == 0_i64
+ # Sticky flag: once a meta file is seen for this segment, keep it true
+ meta = remote_seg[:meta] if meta == false
+
+ remote_segments[seg_id] = {path: path, etag: etag, size: size, meta: meta}
+ end
+
+ # Download a segment from remote storage
+ # Returns MFile on success, nil on failure
+ def download_segment(segment_id : UInt32, remote_segments, h : ::HTTP::Client) : MFile?
+ @log.debug { "Downloading segment: #{segment_id}" }
+ return unless remote_segments[segment_id]?
+
+ remote_file_path = remote_segments[segment_id][:path]
+ path = File.join(Config.instance.data_dir, remote_file_path)
+
+ h.get("/#{remote_file_path}") do |response|
+ if response.status_code != 200
+ @log.warn { "Failed to download segment #{segment_id}: HTTP #{response.status_code}" }
+ return nil
+ end
+
+ bytesize = response.headers["Content-Length"].to_i32
+ rfile = @file_mutex.synchronize { MFile.new(make_temp_path(path), bytesize) }
+
+ begin
+ IO.copy response.body_io, rfile
+
+ @file_mutex.synchronize do
+ if File.exists?(path)
+ @log.debug { "Segment #{segment_id} already exists, discarding download" }
+ rfile.delete
+ rfile.close
+ return nil
+ end
+ rfile.rename(path)
+ end
+ @log.debug { "Downloaded segment: #{segment_id}" }
+ return rfile
+ rescue ex
+ rfile.delete rescue nil
+ rfile.close rescue nil
+ raise ex
+ end
+ end
+ end
+
+ # Download metadata file from remote storage
+ def download_meta_file(segment_id : UInt32, remote_segments, h : ::HTTP::Client, max_retries = 3) : MFile?
+ @log.debug { "Downloading meta for segment: #{segment_id}" }
+ return unless remote_segments[segment_id]?
+
+ remote_meta_path = meta_file_name(remote_segments[segment_id][:path])
+ path = File.join(Config.instance.data_dir, remote_meta_path)
+ retries = 0
+
+ loop do
+ begin
+ h.get("/#{remote_meta_path}") do |response|
+ if response.status_code == 404
+ @log.debug { "Meta file not found for segment #{segment_id}" }
+ return nil
+ end
+ if response.status_code != 200
+ raise "HTTP #{response.status_code}"
+ end
+ bytesize = response.headers["Content-Length"].to_i32
+ rfile = MFile.new(path, bytesize)
+ IO.copy response.body_io, rfile
+ return rfile
+ end
+ rescue ex
+ retries += 1
+ if retries > max_retries
+ @log.error { "Failed to download meta for segment #{segment_id} after #{max_retries} retries: #{ex.message}" }
+ return nil
+ end
+ @log.warn { "Failed to download meta for segment #{segment_id} (attempt #{retries}/#{max_retries}): #{ex.message}" }
+ sleep (retries * 100).milliseconds
+ end
+ end
+ end
+
+ # Upload a file to remote storage with retries
+ def upload_file(path : String, slice : Bytes, max_retries = 3) : String
+ with_http_client(with_timeouts: false) do |h|
+ upload_file(h, path, slice, max_retries)
+ end
+ end
+
+ # Upload using an existing HTTP client (for worker pools)
+ def upload_file(h : ::HTTP::Client, path : String, slice : Bytes, max_retries = 3) : String
+ retries = 0
+ loop do
+ begin
+ response = h.put(path, body: slice)
+ if response.status_code == 200
+ return response.headers["ETag"]
+ end
+ raise "HTTP #{response.status_code}"
+ rescue ex
+ retries += 1
+ if retries > max_retries
+ raise Exception.new("Failed to upload #{path} to remote storage after #{max_retries} retries: #{ex.message}")
+ end
+ @log.warn { "Failed to upload #{path} to remote storage (attempt #{retries}/#{max_retries}): #{ex.message}" }
+ sleep (retries * 100).milliseconds
+ end
+ end
+ end
+
+ # Delete a segment and its metadata from remote storage
+ def delete_remote(remote_seg)
+ with_http_client(with_timeouts: true) do |h|
+ delete_remote(h, remote_seg[:path])
+ delete_remote(h, meta_file_name(remote_seg[:path]))
+ end
+ end
+
+ def delete_remote(h : ::HTTP::Client, path : String)
+ response = h.delete("/#{path}")
+ if response.status_code != 204
+ @log.error { "Failed to delete #{path} from remote storage: HTTP #{response.status_code}" }
+ else
+ @log.debug { "Deleted #{path} from remote storage" }
+ end
+ end
+
+ # Batch delete up to 1000 objects per request using multi-object delete
+ def delete_objects(keys : Array(String))
+ return if keys.empty?
+ with_http_client(with_timeouts: false) do |h|
+ keys.each_slice(1000) do |batch|
+ delete_objects_batch(h, batch)
+ end
+ end
+ end
+
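+ # The body follows the S3 multi-object delete XML format, e.g.:
+ #
+ # <Delete>
+ # <Quiet>true</Quiet>
+ # <Object><Key>vh_hash/q_hash/msgs.0000000001</Key></Object>
+ # </Delete>
+ #
+ # S3 requires a Content-MD5 header over this body, hence the digest below.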
+ private def delete_objects_batch(h : ::HTTP::Client, keys : Array(String))
+ body = XML.build do |xml|
+ xml.element("Delete") do
+ xml.element("Quiet") { xml.text "true" }
+ keys.each do |key|
+ xml.element("Object") do
+ xml.element("Key") { xml.text key }
+ end
+ end
+ end
+ end
+ md5 = Base64.strict_encode(Digest::MD5.digest(body))
+ headers = ::HTTP::Headers{"Content-MD5" => md5, "Content-Type" => "application/xml"}
+ response = h.post("/?delete", headers: headers, body: body)
+ if response.status_code != 200
+ @log.error { "Multi-object delete failed: HTTP #{response.status_code}" }
+ else
+ @log.debug { "Deleted #{keys.size} objects from remote storage" }
+ end
+ end
+
+ # Block-based HTTP client that ensures cleanup
+ def with_http_client(with_timeouts = false, &)
+ h = http_client(with_timeouts)
+ begin
+ yield h
+ ensure
+ h.close
+ end
+ end
+
+ # Create an HTTP client for remote storage operations
+ def http_client(with_timeouts = false) : ::HTTP::Client
+ endpoint = Config.instance.blob_storage_endpoint
+ raise "Blob storage endpoint not configured" unless endpoint
+
+ h = ::HTTP::Client.new(URI.parse(endpoint))
+ h.before_request do |request|
+ @s3_signer.sign(request)
+ end
+ if with_timeouts
+ h.connect_timeout = HTTP_CONNECT_TIMEOUT
+ h.read_timeout = HTTP_READ_TIMEOUT
+ end
+ h
+ end
+
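+ # Usage sketch: long-lived callers (e.g. worker pools) hold a client from
+ # http_client directly; one-shot operations wrap it instead:
+ #
+ # with_http_client(with_timeouts: true) do |h|
+ # delete_remote(h, path)
+ # end
+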
+ def catalog_file_name : String
+ "#{@relative_prefix}/segments.catalog"
+ end
+
+ # Replaces "msgs.DDDDDDDDDD" suffix with "meta.DDDDDDDDDD"
+ # Hard-coded byte offsets (15 = "msgs." + 10-digit ID) used intentionally
+ # to avoid string allocation from sub/gsub.
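+ # E.g. "a1b2/c3d4/msgs.0000000042" -> "a1b2/c3d4/meta.0000000042".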
+ def meta_file_name(path : String) : String
+ raw = path.to_slice
+ String.build(path.size) do |io|
+ io.write raw[0, raw.size - 15]
+ io.write "meta.".to_slice
+ io.write raw[-10..]
+ end
+ end
+
+ private def s3_signer : Awscr::Signer::Signers::V4
+ if (region = Config.instance.blob_storage_region) &&
+ (access_key = Config.instance.blob_storage_access_key_id) &&
+ (secret_key = Config.instance.blob_storage_secret_access_key)
+ Awscr::Signer::Signers::V4.new("s3", region, access_key, secret_key)
+ else
+ Log.fatal { "Blob storage region or access key is not set" }
+ abort "Blob storage region or access key is not set"
+ end
+ end
+
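+ # Probes "#{path}.tmp", then "#{path}.tmp.0", "#{path}.tmp.1", ... until an
+ # unused name is found, so concurrent downloads never clobber each other's
+ # partial files.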
+ private def make_temp_path(path) : String
+ temp_path = "#{path}.tmp"
+ i = 0
+ while File.exists?(temp_path)
+ temp_path = "#{path}.tmp.#{i}"
+ i += 1
+ end
+ temp_path
+ end
+ end
+end
diff --git a/src/lavinmq/amqp/stream/blob_stream.cr b/src/lavinmq/amqp/stream/blob_stream.cr
new file mode 100644
index 0000000000..d0ed11aff4
--- /dev/null
+++ b/src/lavinmq/amqp/stream/blob_stream.cr
@@ -0,0 +1,25 @@
+require "./stream"
+require "./blob_message_store"
+
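+ # A BlobStream behaves like a regular Stream but swaps in BlobMessageStore,
+ # which offloads segments to S3-compatible storage and downloads them back
+ # on demand, tracking each consumer's segment to steer prefetching.
+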
+module LavinMQ::AMQP
+ class BlobStream < Stream
+ private def init_msg_store(data_dir)
+ replicator = @vhost.@replicator
+ @msg_store = BlobMessageStore.new(data_dir, replicator, true, metadata: @metadata)
+ end
+
+ def add_consumer(consumer : Client::Channel::Consumer)
+ super
+ blob_msg_store.add_consumer(consumer.tag, consumer.as(AMQP::StreamConsumer).segment)
+ end
+
+ def rm_consumer(consumer : Client::Channel::Consumer)
+ super
+ blob_msg_store.remove_consumer(consumer.tag)
+ end
+
+ private def blob_msg_store : BlobMessageStore
+ @msg_store.as(BlobMessageStore)
+ end
+ end
+end
diff --git a/src/lavinmq/amqp/stream/stream.cr b/src/lavinmq/amqp/stream/stream.cr
index 3f79e0d400..2651c69706 100644
--- a/src/lavinmq/amqp/stream/stream.cr
+++ b/src/lavinmq/amqp/stream/stream.cr
@@ -156,6 +156,7 @@ module LavinMQ::AMQP
# if we encounter an unrecoverable ReadError, close queue
private def get(consumer : AMQP::StreamConsumer, & : Envelope -> Nil) : Bool
raise ClosedError.new if @closed
+ stream_msg_store.ensure_available(consumer)
env = @msg_store_lock.synchronize { @msg_store.shift?(consumer) } || return false
yield env # deliver the message
true
diff --git a/src/lavinmq/amqp/stream/stream_message_store.cr b/src/lavinmq/amqp/stream/stream_message_store.cr
index 4c80903886..dbb2a4cb9f 100644
--- a/src/lavinmq/amqp/stream/stream_message_store.cr
+++ b/src/lavinmq/amqp/stream/stream_message_store.cr
@@ -35,8 +35,9 @@ module LavinMQ::AMQP
private def get_last_offset : Int64
return 0i64 if @size.zero?
+ last_count = @segment_msg_count.last_value
offset = @segment_first_offset.last_value
- offset += @segment_msg_count.last_value - 1
+ offset += last_count - 1 unless last_count.zero?
offset
end
@@ -52,8 +53,8 @@ module LavinMQ::AMQP
end
case offset
- when "first" then offset_at(@segments.first_key, 4u32)
- when "last" then offset_at(@segments.last_key, 4u32)
+ when "first" then find_offset_in_segments(0)
+ when "last" then find_offset_in_segments(@last_offset)
when "next" then last_offset_seg_pos
when Time then find_offset_in_segments(offset)
when nil
@@ -96,15 +97,19 @@ module LavinMQ::AMQP
{@last_offset + 1, @segments.last_key, @segments.last_value.size.to_u32}
end
+ # ameba:disable Metrics/CyclomaticComplexity
private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = offset_index_lookup(offset)
+ download_segment(segment) unless @segments[segment]?
pos = 4u32
msg_offset = @segment_first_offset[segment] || 0i64
loop do
rfile = @segments[segment]?
if rfile.nil? || pos == rfile.size
- if segment = @segments.each_key.find { |sid| sid > segment }
- rfile = @segments[segment]
+ if seg_id = next_segment_id(segment)
+ download_segment(seg_id) unless @segments[seg_id]?
+ rfile = @segments[seg_id]? || return last_offset_seg_pos
+ segment = seg_id
pos = 4u32
msg_offset = @segment_first_offset[segment]
else
@@ -126,7 +131,7 @@ module LavinMQ::AMQP
end
private def offset_index_lookup(offset) : UInt32
- seg = @segments.first_key
+ seg = @segment_first_offset.first_key
case offset
when Int
@segment_first_offset.each do |seg_id, first_seg_offset|
@@ -184,7 +189,7 @@ module LavinMQ::AMQP
return if @consumer_offsets.size.zero?
offsets_to_save = Hash(String, Int64).new
- lowest_offset_in_stream, _seg, _pos = offset_at(@segments.first_key, 4u32)
+ lowest_offset_in_stream = @segment_first_offset.first_value
capacity = 0
@consumer_offset_positions.each do |ctag, _pos|
if offset = last_offset_by_consumer_tag(ctag)
@@ -215,7 +220,7 @@ module LavinMQ::AMQP
def read(segment : UInt32, position : UInt32) : Envelope?
return if @closed
- rfile = @segments[segment]
+ rfile = @segments[segment]? || download_segment(segment) || return
return if position == rfile.size
begin
msg = BytesMessage.from_bytes(rfile.to_slice + position)
@@ -227,6 +232,11 @@ module LavinMQ::AMQP
end
end
+ # Ensure the segment the consumer needs is available locally.
+ # No-op for local storage; BlobMessageStore overrides to download ahead of the lock.
+ def ensure_available(consumer : AMQP::StreamConsumer) : Nil
+ end
+
def shift?(consumer : AMQP::StreamConsumer) : Envelope?
raise ClosedError.new if @closed
@@ -273,11 +283,17 @@ module LavinMQ::AMQP
end
private def next_segment(consumer) : MFile?
- if seg_id = next_segment_id(consumer.segment)
- consumer.segment = seg_id
- consumer.pos = 4u32
- @segments[seg_id]
- end
+ seg_id = next_segment_id(consumer.segment) || return
+ consumer.segment = seg_id
+ consumer.pos = 4u32
+ @segments[seg_id]? || download_segment(seg_id)
+ end
+
+ # Called when a consumer or offset lookup needs a segment that isn't
+ # available locally. Returns nil by default. BlobMessageStore overrides this
+ # to download the segment from remote storage and add it to @segments.
+ private def download_segment(seg_id : UInt32) : MFile?
+ nil
end
def push(msg) : SegmentPosition
@@ -292,9 +308,10 @@ module LavinMQ::AMQP
private def open_new_segment(next_msg_size = 0) : MFile
super.tap do
- drop_overflow
@segment_first_offset[@segments.last_key] = @last_offset.zero? ? 1i64 : @last_offset
@segment_first_ts[@segments.last_key] = RoughTime.unix_ms
+ @segment_last_ts[@segments.last_key] = RoughTime.unix_ms
+ drop_overflow
end
end
@@ -367,7 +384,7 @@ module LavinMQ::AMQP
private def produce_metadata(seg, mfile)
super
- if empty?
+ if @segment_msg_count[seg].zero?
@segment_first_offset[seg] = @last_offset + 1
@segment_first_ts[seg] = RoughTime.unix_ms
@segment_last_ts[seg] = RoughTime.unix_ms
diff --git a/src/lavinmq/config.cr b/src/lavinmq/config.cr
index d2e13521cc..38681cf479 100644
--- a/src/lavinmq/config.cr
+++ b/src/lavinmq/config.cr
@@ -62,10 +62,11 @@ module LavinMQ
parser.banner = "Usage: #{PROGRAM_NAME} [arguments]"
{% begin %}
sections = {
- options: {description: "Options", options: Array(Option).new},
- bindings: {description: "Bindings", options: Array(Option).new},
- tls: {description: "TLS", options: Array(Option).new},
- clustering: {description: "Clustering", options: Array(Option).new},
+ options: {description: "Options", options: Array(Option).new},
+ bindings: {description: "Bindings", options: Array(Option).new},
+ tls: {description: "TLS", options: Array(Option).new},
+ clustering: {description: "Clustering", options: Array(Option).new},
+ "blob-storage": {description: "Blob Storage", options: Array(Option).new},
}
# Build sections structure and populate with CLI options from annotated instance variables
{% for ivar in @type.instance_vars.select(&.annotation(CliOpt)) %}
diff --git a/src/lavinmq/config/options.cr b/src/lavinmq/config/options.cr
index 57c71d45c4..9d2c734f3f 100644
--- a/src/lavinmq/config/options.cr
+++ b/src/lavinmq/config/options.cr
@@ -3,7 +3,7 @@ module LavinMQ
annotation CliOpt; end
annotation IniOpt; end
annotation EnvOpt; end
- INI_SECTIONS = {"main", "amqp", "mqtt", "mgmt", "experimental", "clustering", "oauth"}
+ INI_SECTIONS = {"main", "amqp", "mqtt", "mgmt", "experimental", "clustering", "oauth", "blob-storage"}
# Separate module for config option definitions. This keeps the option declarations
# organized in one place, while config.cr contains the parsing and validation logic.
@@ -303,6 +303,26 @@ module LavinMQ
@[IniOpt(section: "main", transform: ->ConsistentHashAlgorithm.parse(String))]
property default_consistent_hash_algorithm : ConsistentHashAlgorithm = ConsistentHashAlgorithm::Ring
+ @[CliOpt("", "--blob-storage-region=REGION", "Blob storage region", section: "blob-storage")]
+ @[IniOpt(ini_name: region, section: "blob-storage")]
+ property blob_storage_region : String? = nil
+
+ @[CliOpt("", "--blob-storage-access-key-id=KEY", "Blob storage access key ID", section: "blob-storage")]
+ @[IniOpt(ini_name: access_key_id, section: "blob-storage")]
+ property blob_storage_access_key_id : String? = nil
+
+ @[CliOpt("", "--blob-storage-secret-access-key=KEY", "Blob storage secret access key", section: "blob-storage")]
+ @[IniOpt(ini_name: secret_access_key, section: "blob-storage")]
+ property blob_storage_secret_access_key : String? = nil
+
+ @[CliOpt("", "--blob-storage-endpoint=ENDPOINT", "Blob storage endpoint URL (use virtual-hosted-style: https://bucket.s3.region.amazonaws.com)", section: "blob-storage")]
+ @[IniOpt(ini_name: endpoint, section: "blob-storage")]
+ property blob_storage_endpoint : String? = nil
+
+ @[CliOpt("", "--blob-storage-local-segments=NUMBER", "Number of local segments to keep per blob-stream (default: 50)", section: "blob-storage")]
+ @[IniOpt(ini_name: local_segments_per_stream, section: "blob-storage")]
+ property blob_storage_local_segments_per_stream = 50
+
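+ # Example CLI invocation with hypothetical values (the INI equivalents live
+ # in the [blob-storage] section):
+ #
+ # lavinmq --blob-storage-region=us-east-1 \
+ # --blob-storage-endpoint=https://bucket.s3.us-east-1.amazonaws.com \
+ # --blob-storage-local-segments=100
+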
# Deprecated options - these forward to the primary option in [main]
@[IniOpt(ini_name: tls_cert, section: "amqp", deprecated: "tls_cert in [main]")]
diff --git a/src/lavinmq/message_store.cr b/src/lavinmq/message_store.cr
index 2e823e1993..e173059570 100644
--- a/src/lavinmq/message_store.cr
+++ b/src/lavinmq/message_store.cr
@@ -615,6 +615,10 @@ module LavinMQ
def initialize(mfile : MFile, cause = nil)
super("path=#{mfile.path} pos=#{mfile.pos} size=#{mfile.size}", cause: cause)
end
+
+ def initialize(message : String)
+ super(message)
+ end
end
end
end
diff --git a/src/lavinmq/queue_factory.cr b/src/lavinmq/queue_factory.cr
index 74b27f85cb..6f019bd1f3 100644
--- a/src/lavinmq/queue_factory.cr
+++ b/src/lavinmq/queue_factory.cr
@@ -2,6 +2,7 @@ require "./amqp/queue"
require "./amqp/queue/priority_queue"
require "./amqp/queue/durable_queue"
require "./amqp/stream/stream"
+require "./amqp/stream/blob_stream"
require "./mqtt/session"
module LavinMQ
@@ -19,8 +20,11 @@ module LavinMQ
end
private def self.make_durable(vhost, frame)
- if stream_queue? frame
+ case frame.arguments["x-queue-type"]?
+ when "stream"
AMQP::Stream.create(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments)
+ when "blob-stream"
+ AMQP::BlobStream.create(vhost, frame.queue_name, frame.exclusive, frame.auto_delete, frame.arguments)
else
warn_if_unsupported_queue_type frame
if prio_queue? frame
@@ -32,7 +36,7 @@ module LavinMQ
end
private def self.make_queue(vhost, frame)
- if stream_queue? frame
+ if stream_queue?(frame) || blob_stream_queue?(frame)
raise Error::PreconditionFailed.new("A stream cannot be non-durable")
end
warn_if_unsupported_queue_type frame
@@ -51,6 +55,10 @@ module LavinMQ
frame.arguments["x-queue-type"]? == "stream"
end
+ private def self.blob_stream_queue?(frame) : Bool
+ frame.arguments["x-queue-type"]? == "blob-stream"
+ end
+
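+ # Declaring one from a client is just a queue argument, e.g. with
+ # amqp-client.cr (sketch; queue name and channel are hypothetical):
+ #
+ # ch.queue("events", durable: true,
+ # args: AMQP::Client::Arguments.new({"x-queue-type" => "blob-stream"}))
+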
private def self.mqtt_session?(frame) : Bool
frame.arguments["x-queue-type"]? == "mqtt"
end
diff --git a/static/js/queue.js b/static/js/queue.js
index 46d2b63f22..a1ce7fbc28 100644
--- a/static/js/queue.js
+++ b/static/js/queue.js
@@ -93,7 +93,7 @@ function updateQueue (all) {
HTTP.request('GET', queueUrl + '?consumer_list_length=' + consumerListLength)
.then(item => {
const qType = item.arguments['x-queue-type']
- if (qType === 'stream') {
+ if (qType === 'stream' || qType === 'blob-stream') {
window.location.href = `/stream#vhost=${encodeURIComponent(vhost)}&name=${encodeURIComponent(queue)}`
}
Chart.update(chart, item.message_stats)
diff --git a/static/js/queues.js b/static/js/queues.js
index a226bc35d0..6ae01976af 100644
--- a/static/js/queues.js
+++ b/static/js/queues.js
@@ -116,7 +116,7 @@ const queuesTable = Table.renderTable('table', tableOptions, function (tr, item,
}
const queueLink = document.createElement('a')
const qType = item.arguments['x-queue-type']
- if (qType === 'stream') {
+ if (qType === 'stream' || qType === 'blob-stream') {
queueLink.href = HTTP.url`stream#vhost=${item.vhost}&name=${item.name}`
} else {
queueLink.href = HTTP.url`queue#vhost=${item.vhost}&name=${item.name}`
diff --git a/views/queues.ecr b/views/queues.ecr
index 83cd250d2a..fdc16fbce5 100644
--- a/views/queues.ecr
+++ b/views/queues.ecr
@@ -114,6 +114,11 @@
Stream Queue
Make the queue a stream queue. It has to be durable and can't be auto-delete
|
+ Blob Stream Queue
+ Make the queue a blob stream queue: a stream queue that offloads
+ segments to remote blob storage (S3-compatible). Requires blob storage
+ to be configured; must be durable and can't be auto-delete.
+ |
Max age
Stream queue segments will be deleted when all messages in the segment are older than this.
Valid units are Y(ear), M(onth), D(ays), h(ours), m(inutes), s(econds).