Skip to content

Commit bb6bfa7

Browse files
add streams with s3 storage
1 parent 97ebbb8 commit bb6bfa7

File tree

8 files changed

+1080
-9
lines changed

8 files changed

+1080
-9
lines changed

shard.lock

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@ shards:
1212
git: https://github.com/cloudamqp/amqp-client.cr.git
1313
version: 1.3.1
1414

15+
awscr-signer:
16+
git: https://github.com/taylorfinnell/awscr-signer.git
17+
version: 0.9.0
18+
1519
lz4:
1620
git: https://github.com/84codes/lz4.cr.git
1721
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

shard.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ dependencies:
3434
github: 84codes/lz4.cr
3535
mqtt-protocol:
3636
github: 84codes/mqtt-protocol.cr
37+
awscr-signer:
38+
github: taylorfinnell/awscr-signer
3739

3840
development_dependencies:
3941
ameba:

spec/stream_queue_s3_spec.cr

Lines changed: 343 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,343 @@
1+
require "./spec_helper"
2+
require "./../src/lavinmq/amqp/queue/stream_s3_message_store"
3+
4+
class S3SpecHelper
5+
def self.meta_bytes(offset)
6+
io = IO::Memory.new
7+
io.write_bytes 100_u32 # count
8+
io.write_bytes offset # first offset
9+
io.write_bytes offset # first timestamp
10+
io.rewind
11+
io.getb_to_end
12+
end
13+
14+
def self.segment_bytes(offset)
15+
io = IO::Memory.new
16+
io.write_bytes 4 # schema version
17+
100.times do |i|
18+
props = LavinMQ::AMQP::Properties.new(headers: LavinMQ::AMQP::Table.new({
19+
"x-stream-offset" => offset + i,
20+
}))
21+
msg = LavinMQ::Message.new("ex", "rk", "body", props)
22+
io.write_bytes msg
23+
end
24+
io.rewind
25+
io.getb_to_end
26+
end
27+
28+
def self.setup_responses : Hash(String, String | Bytes)
29+
{
30+
"?delimiter=%2F&encoding-type=url&list-type=2&prefix=" => RESPONSE_FILE_LIST,
31+
"/test_queue/msgs.0000000001" => S3SpecHelper.segment_bytes(0_i64),
32+
"/test_queue/msgs.0000000002" => S3SpecHelper.segment_bytes(100_i64),
33+
"/test_queue/msgs.0000000001.meta" => S3SpecHelper.meta_bytes(0_i64),
34+
"/test_queue/msgs.0000000002.meta" => S3SpecHelper.meta_bytes(100_i64),
35+
"/tmp/lavinmq-spec/test_queue" => RESPONSE_UPLOAD,
36+
}
37+
end
38+
end
39+
40+
RESPONSE_FILE_LIST = <<-XML
41+
<?xml version="1.0" encoding="UTF-8"?>
42+
<ListBucketResult>
43+
<Contents>
44+
<Key>test_queue/msgs.0000000001</Key>
45+
<ETag>"abc123"</ETag>
46+
<Size>5647</Size>
47+
</Contents>
48+
<Contents>
49+
<Key>test_queue/msgs.0000000002</Key>
50+
<ETag>"def456"</ETag>
51+
<Size>5647</Size>
52+
</Contents>
53+
<Contents>
54+
<Key>test_queue/msgs.0000000001.meta</Key>
55+
<ETag>"meta123"</ETag>
56+
<Size>20</Size>
57+
</Contents>
58+
<Contents>
59+
<Key>test_queue/msgs.0000000002.meta</Key>
60+
<ETag>"meta456"</ETag>
61+
<Size>20</Size>
62+
</Contents>
63+
</ListBucketResult>
64+
XML
65+
RESPONSE_UPLOAD = ""
66+
67+
class MockS3HTTPClient < HTTP::Client
68+
property responses = Hash(String, String | Bytes).new
69+
70+
def initialize(hostname)
71+
super(hostname, 443, tls: true)
72+
end
73+
74+
def get(path, headers = nil)
75+
match_response(path)
76+
end
77+
78+
def put(path, body = nil, headers = nil)
79+
match_response(path, "PUT")
80+
end
81+
82+
def delete(path, headers = nil)
83+
match_response(path, "DELETE")
84+
end
85+
86+
def get(path, headers = nil, &)
87+
yield match_response(path)
88+
end
89+
90+
def match_response(path, method = "GET") : HTTP::Client::Response
91+
resp = ""
92+
if path.starts_with?("?delimiter=%2F&encoding-type=url&list-type=2&prefix=")
93+
resp = @responses.first_value
94+
else
95+
@responses.each do |key, response|
96+
if path == key
97+
resp = response
98+
end
99+
end
100+
end
101+
if resp == "" && method == "GET"
102+
HTTP::Client::Response.new(404, "Not Found")
103+
elsif resp.is_a?(String)
104+
body_io = IO::Memory.new(resp)
105+
headers = HTTP::Headers{"Content-Length" => resp.bytesize.to_s, "ETag" => "abc123"}
106+
HTTP::Client::Response.new(200, resp, headers, body_io: body_io)
107+
else
108+
body_io = IO::Memory.new(resp)
109+
headers = HTTP::Headers{"Content-Length" => body_io.bytesize.to_s, "ETag" => "abc123"}
110+
HTTP::Client::Response.new(200, "", headers, body_io: body_io)
111+
end
112+
end
113+
end
114+
115+
# Mock HTTP client for S3 specs
116+
class MockStreamS3MessageStore < LavinMQ::AMQP::StreamQueue::StreamS3MessageStore
117+
@test_responses = Hash(String, String | Bytes).new
118+
119+
def initialize(@msg_dir : String, @test_responses : Hash(String, String | Bytes) = S3SpecHelper.setup_responses)
120+
super(@msg_dir, nil, true, ::Log::Metadata.empty)
121+
end
122+
123+
def http(uri : URI)
124+
h = MockS3HTTPClient.new("test.lavinmq.com")
125+
h.responses = @test_responses
126+
if signer = s3_signer
127+
signer.sign(::HTTP::Request.new("GET", uri.path))
128+
else
129+
raise "No S3 signer found"
130+
end
131+
h.as(::HTTP::Client)
132+
end
133+
end
134+
135+
describe LavinMQ::AMQP::StreamQueue::StreamS3MessageStore do
136+
before_each do
137+
LavinMQ::Config.instance.streams_s3_storage_region = "us-east-1"
138+
LavinMQ::Config.instance.streams_s3_storage_access_key_id = "test_access_key"
139+
LavinMQ::Config.instance.streams_s3_storage_secret_access_key = "test_secret_key"
140+
end
141+
142+
after_each do
143+
FileUtils.rm_rf("/tmp/lavinmq-spec/test_queue")
144+
end
145+
146+
it "should get file list from s3" do
147+
msg_dir = "/tmp/lavinmq-spec/test_queue"
148+
FileUtils.rm_rf(msg_dir)
149+
Dir.mkdir_p(msg_dir)
150+
msg_store = MockStreamS3MessageStore.new(msg_dir, S3SpecHelper.setup_responses)
151+
152+
msg_store.@s3_segments.size.should eq 2
153+
msg_store.@segments.size.should eq 2
154+
msg_store.@size.should eq 200
155+
end
156+
157+
it "should read & upload local file" do
158+
msg_dir = "/tmp/lavinmq-spec/test_queue"
159+
FileUtils.rm_rf(msg_dir)
160+
Dir.mkdir_p(msg_dir)
161+
File.write(File.join(msg_dir, "msgs.0000000003"), S3SpecHelper.segment_bytes(200_i64))
162+
msg_store = MockStreamS3MessageStore.new(msg_dir, S3SpecHelper.setup_responses)
163+
164+
msg_store.@s3_segments.size.should eq 3
165+
msg_store.@segments.size.should eq 3
166+
msg_store.@size.should eq 300
167+
end
168+
169+
# Starting with empty data dir, download files from s3 and consume
170+
it "can consume from s3" do
171+
msg_dir = "/tmp/lavinmq-spec/test_queue"
172+
FileUtils.rm_rf(msg_dir)
173+
Dir.mkdir_p(msg_dir)
174+
msg_store = MockStreamS3MessageStore.new(msg_dir, S3SpecHelper.setup_responses)
175+
176+
consumer = MockStreamConsumer.new(0_i64, 0_u32, 0)
177+
if env = msg_store.shift?(consumer)
178+
String.new(env.message.body).should eq "body"
179+
else
180+
fail "Expected to get a message from S3"
181+
end
182+
end
183+
184+
describe "no meta files" do
185+
it "should verify local segments" do
186+
msg_dir = "/tmp/lavinmq-spec/test_queue"
187+
FileUtils.rm_rf(msg_dir)
188+
Dir.mkdir_p(msg_dir)
189+
File.write(File.join(msg_dir, "msgs.0000000001"), S3SpecHelper.segment_bytes(0_i64))
190+
File.write(File.join(msg_dir, "msgs.0000000002"), S3SpecHelper.segment_bytes(100_i64))
191+
digest = Digest::MD5.new
192+
digest.update(File.open(File.join(msg_dir, "msgs.0000000001"), &.getb_to_end))
193+
etag1 = digest.hexfinal
194+
digest = Digest::MD5.new
195+
digest.update(File.open(File.join(msg_dir, "msgs.0000000002"), &.getb_to_end))
196+
etag2 = digest.hexfinal
197+
198+
file_list = <<-XML
199+
<?xml version="1.0" encoding="UTF-8"?>
200+
<ListBucketResult>
201+
<Contents>
202+
<Key>test_queue/msgs.0000000001</Key>
203+
<ETag>"#{etag1}"</ETag>
204+
<Size>5647</Size>
205+
</Contents>
206+
<Contents>
207+
<Key>test_queue/msgs.0000000002</Key>
208+
<ETag>"#{etag2}"</ETag>
209+
<Size>5647</Size>
210+
</Contents>
211+
</ListBucketResult>
212+
XML
213+
responses = {
214+
"?delimiter=%2F&encoding-type=url&list-type=2&prefix=" => file_list,
215+
"/test_queue/msgs.0000000001" => "foo".to_slice,
216+
"/test_queue/msgs.0000000002" => "bar".to_slice,
217+
"/tmp/lavinmq-spec/test_queue" => RESPONSE_UPLOAD,
218+
}
219+
msg_store = MockStreamS3MessageStore.new(msg_dir, responses)
220+
221+
msg_store.@s3_segments.size.should eq 2
222+
msg_store.@segments.size.should eq 3
223+
msg_store.@size.should eq 200
224+
digest = Digest::MD5.new
225+
digest.update(File.open(File.join(msg_dir, "msgs.0000000001"), &.getb_to_end))
226+
digest.hexfinal.should eq etag1
227+
digest = Digest::MD5.new
228+
digest.update(File.open(File.join(msg_dir, "msgs.0000000002"), &.getb_to_end))
229+
digest.hexfinal.should eq etag2
230+
end
231+
232+
it "should read local segments" do
233+
msg_dir = "/tmp/lavinmq-spec/test_queue"
234+
FileUtils.rm_rf(msg_dir)
235+
Dir.mkdir_p(msg_dir)
236+
File.write(File.join(msg_dir, "msgs.0000000001"), S3SpecHelper.segment_bytes(0_i64))
237+
File.write(File.join(msg_dir, "msgs.0000000002"), S3SpecHelper.segment_bytes(100_i64))
238+
file_list = <<-XML
239+
<?xml version="1.0" encoding="UTF-8"?>
240+
<ListBucketResult>
241+
</ListBucketResult>
242+
XML
243+
responses = {
244+
"?delimiter=%2F&encoding-type=url&list-type=2&prefix=" => file_list,
245+
"/test_queue/msgs.0000000001" => S3SpecHelper.segment_bytes(0_i64),
246+
"/test_queue/msgs.0000000002" => S3SpecHelper.segment_bytes(100_i64),
247+
"/tmp/lavinmq-spec/test_queue" => RESPONSE_UPLOAD,
248+
}
249+
msg_store = MockStreamS3MessageStore.new(msg_dir, responses)
250+
251+
msg_store.@s3_segments.size.should eq 2
252+
msg_store.@segments.size.should eq 3
253+
msg_store.@size.should eq 200
254+
end
255+
256+
it "should download segments from s3" do
257+
msg_dir = "/tmp/lavinmq-spec/test_queue"
258+
FileUtils.rm_rf(msg_dir)
259+
Dir.mkdir_p(msg_dir)
260+
file_list = <<-XML
261+
<?xml version="1.0" encoding="UTF-8"?>
262+
<ListBucketResult>
263+
<Contents>
264+
<Key>test_queue/msgs.0000000001</Key>
265+
<ETag>"abc"</ETag>
266+
<Size>5647</Size>
267+
</Contents>
268+
<Contents>
269+
<Key>test_queue/msgs.0000000002</Key>
270+
<ETag>"cde"</ETag>
271+
<Size>5647</Size>
272+
</Contents>
273+
</ListBucketResult>
274+
XML
275+
responses = {
276+
"?delimiter=%2F&encoding-type=url&list-type=2&prefix=" => file_list,
277+
"/test_queue/msgs.0000000001" => S3SpecHelper.segment_bytes(0_i64),
278+
"/test_queue/msgs.0000000002" => S3SpecHelper.segment_bytes(100_i64),
279+
"/tmp/lavinmq-spec/test_queue" => RESPONSE_UPLOAD,
280+
}
281+
msg_store = MockStreamS3MessageStore.new(msg_dir, responses)
282+
msg_store.@s3_segments.size.should eq 2
283+
msg_store.@segments.size.should eq 3
284+
msg_store.@size.should eq 200
285+
end
286+
end
287+
288+
it "should raise if not configed properly" do
289+
LavinMQ::Config.instance.streams_s3_storage_region = nil
290+
LavinMQ::Config.instance.streams_s3_storage_access_key_id = nil
291+
LavinMQ::Config.instance.streams_s3_storage_secret_access_key = nil
292+
293+
msg_dir = "/tmp/lavinmq-spec/test_queue"
294+
FileUtils.rm_rf(msg_dir)
295+
Dir.mkdir_p(msg_dir)
296+
297+
expect_raises(SpecExit) do
298+
MockStreamS3MessageStore.new(msg_dir, S3SpecHelper.setup_responses)
299+
end
300+
end
301+
end
302+
303+
class MockStreamConsumer < LavinMQ::AMQP::StreamConsumer
304+
@channel = MockChannel.new
305+
@tag = "test_tag"
306+
307+
def initialize(@offset : Int64, @segment : UInt32 = 0, @pos : UInt32 = 0)
308+
@users = LavinMQ::Auth::UserStore.new("/tmp/lavinmq-spec/test_queue", LavinMQ::Clustering::NoopServer.new)
309+
@vhost = LavinMQ::VHost.new("vhost", "/tmp/lavinmq-spec/test_queue", @users, LavinMQ::Clustering::NoopServer.new)
310+
@queue = LavinMQ::AMQP::Queue.new(@vhost, "test_queue", arguments: LavinMQ::AMQP::Table.new)
311+
end
312+
end
313+
314+
class MockChannel < LavinMQ::AMQP::Channel
315+
@client = MockClient.new
316+
@id = 1
317+
@vhost = "/"
318+
@metadata = ::Log::Metadata.empty
319+
320+
def initialize
321+
@log = LavinMQ::Logger.new(Log, @metadata)
322+
end
323+
end
324+
325+
class MockClient < LavinMQ::AMQP::Client
326+
@socket = IO::Memory.new
327+
@connection_info = LavinMQ::ConnectionInfo.new(Socket::IPAddress.new("127.0.0.1", 5672), Socket::IPAddress.new("127.0.0.1", 5672))
328+
@user = LavinMQ::Auth::User.new("test_user", "+pHuxkR9fCyrrwXjOD4BP4XbzO3l8LJr8YkThMgJ0yVHFRE+", "SHA256", Array(LavinMQ::Tag).new)
329+
@max_frame_size = 4096_u32
330+
@actual_channel_max = 1000_u16
331+
@channel_max = 1000_u16
332+
@heartbeat_timeout = 60_u32
333+
@heartbeat_interval = 60_u32
334+
@auth_mechanism = "PLAIN"
335+
@client_properties = LavinMQ::AMQP::Table.new
336+
@metadata = ::Log::Metadata.empty
337+
338+
def initialize
339+
users = LavinMQ::Auth::UserStore.new("/tmp/lavinmq-spec/test_queue", LavinMQ::Clustering::NoopServer.new)
340+
@vhost = LavinMQ::VHost.new("vhost", "/tmp/lavinmq-spec/test_queue", users, LavinMQ::Clustering::NoopServer.new)
341+
@log = LavinMQ::Logger.new(Log, @metadata)
342+
end
343+
end

0 commit comments

Comments
 (0)