Skip to content

Commit de3a312

Browse files
fix: S3 streams review fixes
- Refactor S3MessageStore to reuse StreamMessageStore base init instead of duplicating segment loading, metadata parsing, and file registration - Extract segment download into overridable `download_segment` method on StreamMessageStore, letting S3 subclass transparently fetch from cache or direct download - Rewrite S3SegmentCache fiber lifecycle: idle shutdown with automatic restart on new consumers, channel-based coordination replacing busy-wait polling - Fix segment cache eviction order to remove already-consumed segments before speculative prefetch segments - Fix UInt32 subtraction overflow in prefetch distance calculation - Reconnect HTTP client on error during S3 bucket listing - Add specs for pagination, concurrent consumers, segment cache prefetching, download failure recovery, and max-length enforcement
1 parent 673e8d3 commit de3a312

File tree

10 files changed

+757
-538
lines changed

10 files changed

+757
-538
lines changed

spec/config_spec.cr

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ describe LavinMQ::Config do
169169
region = us-east-1
170170
access_key_id = AKIAIOSFODNN7EXAMPLE
171171
secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
172-
session_token = AQoDYXdzEJr
173172
endpoint = https://s3.example.com
174173
local_segments_per_stream = 100
175174
CONFIG
@@ -260,7 +259,6 @@ describe LavinMQ::Config do
260259
config.streams_s3_storage_region.should eq "us-east-1"
261260
config.streams_s3_storage_access_key_id.should eq "AKIAIOSFODNN7EXAMPLE"
262261
config.streams_s3_storage_secret_access_key.should eq "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
263-
config.streams_s3_storage_session_token.should eq "AQoDYXdzEJr"
264262
config.streams_s3_storage_endpoint.should eq "https://s3.example.com"
265263
config.streams_s3_storage_local_segments_per_stream.should eq 100
266264
ensure
@@ -306,7 +304,6 @@ describe LavinMQ::Config do
306304
"--s3-storage-region=eu-west-1",
307305
"--s3-storage-access-key-id=AKIACLIEXAMPLE",
308306
"--s3-storage-secret-access-key=cliSecretKey123",
309-
"--s3-storage-session-token=cliSessionToken",
310307
"--s3-storage-endpoint=https://s3.cli.example.com",
311308
"--s3-storage-local-segments=75",
312309
]
@@ -346,7 +343,6 @@ describe LavinMQ::Config do
346343
config.streams_s3_storage_region.should eq "eu-west-1"
347344
config.streams_s3_storage_access_key_id.should eq "AKIACLIEXAMPLE"
348345
config.streams_s3_storage_secret_access_key.should eq "cliSecretKey123"
349-
config.streams_s3_storage_session_token.should eq "cliSessionToken"
350346
config.streams_s3_storage_endpoint.should eq "https://s3.cli.example.com"
351347
config.streams_s3_storage_local_segments_per_stream.should eq 75
352348
end

spec/stream_s3_spec.cr

Lines changed: 241 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ module S3SpecHelper
1212
io.write_bytes 100_u32 # count
1313
io.write_bytes offset # first offset
1414
io.write_bytes offset # first timestamp
15+
io.write_bytes offset # last timestamp
1516
io.rewind
1617
io.getb_to_end
1718
end
@@ -192,6 +193,106 @@ describe LavinMQ::AMQP::Stream::S3MessageStore do
192193
end
193194
end
194195

196+
it "uploads sealed segments to S3 on rotation" do
197+
with_amqp_server do |s|
198+
with_channel(s) do |ch|
199+
q_name = "s3-upload-test"
200+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
201+
q = ch.queue(q_name, durable: true, args: q_args)
202+
203+
# Publish a segment-sized message to trigger rotation
204+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
205+
q.publish_confirm data
206+
# Second publish triggers open_new_segment which uploads the first
207+
q.publish_confirm data
208+
209+
server = S3SpecHelper.s3_server.not_nil!
210+
# Sealed segment should be uploaded to S3
211+
wait_for { server.keys.count(&.matches?(/msgs\.\d{10}$/)) >= 1 }
212+
213+
ch.queue_delete(q_name)
214+
end
215+
end
216+
end
217+
218+
it "deletes S3 segments when queue is deleted" do
219+
with_amqp_server do |s|
220+
with_channel(s) do |ch|
221+
q_name = "s3-delete-test"
222+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
223+
q = ch.queue(q_name, durable: true, args: q_args)
224+
225+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
226+
2.times { q.publish_confirm data }
227+
228+
server = S3SpecHelper.s3_server.not_nil!
229+
# Verify segments exist in S3 before delete
230+
wait_for { !server.keys.select(&.includes?("msgs.")).empty? }
231+
232+
ch.queue_delete(q_name)
233+
234+
# All segments for this queue should be gone from S3
235+
queue_hash = Digest::SHA1.hexdigest(q_name)
236+
remaining = server.keys.select(&.includes?(queue_hash))
237+
remaining.should be_empty
238+
end
239+
end
240+
end
241+
242+
it "drops oldest S3 segments when max-length exceeded" do
243+
with_amqp_server do |s|
244+
with_channel(s) do |ch|
245+
q_name = "s3-maxlen-test"
246+
args = AMQP::Client::Arguments.new({"x-queue-type": "stream", "x-max-length": 1})
247+
q = ch.queue(q_name, durable: true, args: args)
248+
249+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
250+
4.times { q.publish_confirm data }
251+
252+
server = S3SpecHelper.s3_server.not_nil!
253+
queue_hash = Digest::SHA1.hexdigest(q_name)
254+
255+
# Wait for uploads, then verify segments were dropped
256+
# We published 4 segment-sized messages (4 segments sealed + 1 active)
257+
# With max-length: 1, most segments should be dropped
258+
q.message_count.should be <= 2
259+
segment_keys = server.keys.select { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) }
260+
segment_keys.size.should be <= 2
261+
262+
ch.queue_delete(q_name)
263+
end
264+
end
265+
end
266+
267+
it "consumes messages across segment boundaries" do
268+
with_amqp_server do |s|
269+
with_channel(s) do |ch|
270+
q_name = "s3-cross-segment"
271+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
272+
q = ch.queue(q_name, durable: true, args: q_args)
273+
ch.prefetch 1
274+
275+
# Publish enough to span 2 segments
276+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
277+
q.publish_confirm data
278+
q.publish_confirm "last message"
279+
280+
# Consume from the beginning
281+
msgs = Channel(String).new(2)
282+
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
283+
msgs.send msg.body_io.to_s
284+
ch.basic_ack(msg.delivery_tag)
285+
end
286+
287+
first = msgs.receive
288+
first.bytesize.should eq LavinMQ::Config.instance.segment_size
289+
msgs.receive.should eq "last message"
290+
291+
ch.queue_delete(q_name)
292+
end
293+
end
294+
end
295+
195296
it "should raise if not configured properly" do
196297
LavinMQ::Config.instance.streams_s3_storage_region = nil
197298
LavinMQ::Config.instance.streams_s3_storage_access_key_id = nil
@@ -205,4 +306,144 @@ describe LavinMQ::AMQP::Stream::S3MessageStore do
205306
LavinMQ::AMQP::Stream::S3MessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
206307
end
207308
end
309+
310+
describe "download failure handling" do
311+
it "recovers from S3 GET failure during segment cache download" do
312+
with_amqp_server do |s|
313+
with_channel(s) do |ch|
314+
q_name = "s3-fail-test"
315+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
316+
q = ch.queue(q_name, durable: true, args: q_args)
317+
ch.prefetch 1
318+
319+
# Publish enough to create 2 segments
320+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
321+
q.publish_confirm data
322+
q.publish_confirm "second segment msg"
323+
324+
# Make the first segment fail on next GET (simulates transient S3 error)
325+
server = S3SpecHelper.s3_server.not_nil!
326+
queue_hash = Digest::SHA1.hexdigest(q_name)
327+
first_segment_key = server.keys.find { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) }
328+
server.fail_keys.add(first_segment_key.not_nil!) if first_segment_key
329+
330+
# Consumer should still be able to read (direct download retries or fallback)
331+
msgs = Channel(String).new(2)
332+
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
333+
msgs.send msg.body_io.to_s
334+
ch.basic_ack(msg.delivery_tag)
335+
end
336+
337+
# Should eventually receive messages (fail_keys only fails once)
338+
msg = msgs.receive
339+
msg.bytesize.should eq LavinMQ::Config.instance.segment_size
340+
341+
ch.queue_delete(q_name)
342+
end
343+
end
344+
end
345+
end
346+
347+
describe "S3 pagination" do
348+
it "lists all segments when S3 response is paginated" do
349+
msg_dir = "/tmp/lavinmq-spec/#{DATA_DIR}"
350+
FileUtils.rm_rf(msg_dir)
351+
Dir.mkdir_p(msg_dir)
352+
353+
server = S3SpecHelper.s3_server.not_nil!
354+
# Set max keys to 3 to force pagination with just a few segments
355+
server.max_list_keys = 3
356+
357+
# Add 4 segments with meta files (8 keys total, will need 3 pages)
358+
4.times do |i|
359+
seg_id = (i + 1).to_s.rjust(10, '0')
360+
server.put("#{DATA_DIR}/msgs.#{seg_id}", S3SpecHelper.segment_bytes)
361+
server.put("#{DATA_DIR}/meta.#{seg_id}", S3SpecHelper.meta_bytes((i * 100).to_i64))
362+
end
363+
364+
msg_store = LavinMQ::AMQP::Stream::S3MessageStore.new(msg_dir, nil, true, ::Log::Metadata.empty)
365+
# All 4 segments should be discovered despite pagination
366+
msg_store.@s3_segments.size.should eq 4
367+
msg_store.@size.should eq 400
368+
ensure
369+
server.try &.max_list_keys = 1000
370+
end
371+
end
372+
373+
describe "concurrent consumers" do
374+
it "supports two consumers reading different offsets" do
375+
with_amqp_server do |s|
376+
with_channel(s) do |ch|
377+
q_name = "s3-concurrent-consumers"
378+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
379+
q = ch.queue(q_name, durable: true, args: q_args)
380+
ch.prefetch 1
381+
382+
# Publish messages across 2 segments
383+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
384+
q.publish_confirm data
385+
q.publish_confirm "msg2"
386+
387+
# Consumer 1: reads from first offset
388+
msgs1 = Channel(String).new(2)
389+
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
390+
msgs1.send msg.body_io.to_s
391+
ch.basic_ack(msg.delivery_tag)
392+
end
393+
394+
# Consumer 2 on a separate channel: reads from last offset
395+
with_channel(s) do |ch2|
396+
ch2.prefetch 1
397+
q2 = ch2.queue(q_name, durable: true, args: q_args)
398+
msgs2 = Channel(String).new(2)
399+
q2.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "last"})) do |msg|
400+
msgs2.send msg.body_io.to_s
401+
ch2.basic_ack(msg.delivery_tag)
402+
end
403+
404+
# Both consumers should receive messages
405+
first_msg = msgs1.receive
406+
first_msg.bytesize.should eq LavinMQ::Config.instance.segment_size
407+
msgs2.receive.should eq "msg2"
408+
end
409+
410+
ch.queue_delete(q_name)
411+
end
412+
end
413+
end
414+
end
415+
416+
describe "segment cache" do
417+
it "prefetches segments for consumers and cleans up after removal" do
418+
with_amqp_server do |s|
419+
with_channel(s) do |ch|
420+
q_name = "s3-cache-test"
421+
q_args = AMQP::Client::Arguments.new({"x-queue-type": "stream"})
422+
q = ch.queue(q_name, durable: true, args: q_args)
423+
ch.prefetch 1
424+
425+
# Publish enough data to create multiple segments
426+
data = Bytes.new(LavinMQ::Config.instance.segment_size)
427+
3.times { q.publish_confirm data }
428+
429+
server = S3SpecHelper.s3_server.not_nil!
430+
queue_hash = Digest::SHA1.hexdigest(q_name)
431+
# Verify segments were uploaded to S3
432+
wait_for { server.keys.count { |k| k.includes?(queue_hash) && k.matches?(/msgs\.\d{10}$/) } >= 2 }
433+
434+
# Start a consumer to trigger prefetching
435+
msgs = Channel(String).new(4)
436+
q.subscribe(no_ack: false, args: AMQP::Client::Arguments.new({"x-stream-offset": "first"})) do |msg|
437+
msgs.send msg.body_io.to_s
438+
ch.basic_ack(msg.delivery_tag)
439+
end
440+
441+
# Consume at least one message to confirm cache is working
442+
msgs.receive
443+
444+
ch.queue_delete(q_name)
445+
end
446+
end
447+
end
448+
end
208449
end

spec/support/s3_server.cr

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ require "digest/md5"
33
require "xml"
44

55
# MinimalS3Server provides an in-memory S3-compatible server for testing
6-
# Supports: ListObjectsV2, GetObject, PutObject, DeleteObject
6+
# Supports: ListObjectsV2 (with pagination), GetObject, PutObject, DeleteObject
77
class MinimalS3Server
88
@storage = Hash(String, Bytes).new
99
@server : HTTP::Server?
1010
@port : Int32
11+
# Keys that should return 500 on GET (for testing download failures)
12+
property fail_keys = Set(String).new
13+
# Max keys per ListObjectsV2 response (default: 1000, lower for pagination tests)
14+
property max_list_keys : Int32 = 1000
1115

1216
def initialize(@port = 0)
1317
end
@@ -50,6 +54,7 @@ class MinimalS3Server
5054

5155
def clear
5256
@storage.clear
57+
@fail_keys.clear
5358
end
5459

5560
def put(key : String, data : Bytes)
@@ -101,14 +106,29 @@ class MinimalS3Server
101106

102107
private def handle_list_objects(request : HTTP::Request, response : HTTP::Server::Response, query : URI::Params)
103108
prefix = query["prefix"]? || ""
109+
continuation_token = query["continuation-token"]?
104110

105-
# Filter keys by prefix
106-
matching_keys = @storage.keys.select(&.starts_with?(prefix))
111+
# Filter and sort keys by prefix
112+
matching_keys = @storage.keys.select(&.starts_with?(prefix)).sort!
113+
114+
# Handle continuation token (skip keys up to and including the token)
115+
if continuation_token
116+
start_idx = matching_keys.index { |k| k > continuation_token } || matching_keys.size
117+
matching_keys = matching_keys[start_idx..]
118+
end
119+
120+
# Paginate
121+
is_truncated = matching_keys.size > @max_list_keys
122+
page_keys = matching_keys[0, @max_list_keys]
107123

108124
# Build XML response
109125
xml_resp = XML.build(indent: " ") do |xml|
110126
xml.element("ListBucketResult", xmlns: "http://s3.amazonaws.com/doc/2006-03-01/") do
111-
matching_keys.each do |key|
127+
xml.element("IsTruncated") { xml.text is_truncated.to_s }
128+
if is_truncated
129+
xml.element("NextContinuationToken") { xml.text page_keys.last }
130+
end
131+
page_keys.each do |key|
112132
data = @storage[key]
113133
etag = calculate_etag(data)
114134

@@ -130,6 +150,13 @@ class MinimalS3Server
130150
private def handle_get_object(request : HTTP::Request, response : HTTP::Server::Response)
131151
key = request.path.lstrip('/')
132152

153+
if @fail_keys.includes?(key)
154+
@fail_keys.delete(key) # Fail once then recover
155+
response.status = HTTP::Status::INTERNAL_SERVER_ERROR
156+
response.print "Simulated failure"
157+
return
158+
end
159+
133160
if data = @storage[key]?
134161
etag = calculate_etag(data)
135162
response.status = HTTP::Status::OK

0 commit comments

Comments
 (0)