Skip to content

Commit 6f2c075

Browse files
handle changes to meta file naming
1 parent a7dbfa6 commit 6f2c075

File tree

4 files changed

+49
-31
lines changed

4 files changed

+49
-31
lines changed

spec/stream_s3_spec.cr

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@ module S3SpecHelper
3333
"?delimiter=%2F&encoding-type=url&list-type=2&prefix=" => RESPONSE_FILE_LIST,
3434
"/#{DATA_DIR}/msgs.0000000001" => S3SpecHelper.segment_bytes(0_i64),
3535
"/#{DATA_DIR}/msgs.0000000002" => S3SpecHelper.segment_bytes(100_i64),
36-
"/#{DATA_DIR}/msgs.0000000001.meta" => S3SpecHelper.meta_bytes(0_i64),
37-
"/#{DATA_DIR}/msgs.0000000002.meta" => S3SpecHelper.meta_bytes(100_i64),
36+
"/#{DATA_DIR}/meta.0000000001" => S3SpecHelper.meta_bytes(0_i64),
37+
"/#{DATA_DIR}/meta.0000000002" => S3SpecHelper.meta_bytes(100_i64),
3838
"/tmp/lavinmq-spec#{DATA_DIR}" => RESPONSE_UPLOAD,
3939
}
4040
end
@@ -54,12 +54,12 @@ RESPONSE_FILE_LIST = <<-XML
5454
<Size>5647</Size>
5555
</Contents>
5656
<Contents>
57-
<Key>#{DATA_DIR}/msgs.0000000001.meta</Key>
57+
<Key>#{DATA_DIR}/meta.0000000001</Key>
5858
<ETag>"meta123"</ETag>
5959
<Size>20</Size>
6060
</Contents>
6161
<Contents>
62-
<Key>#{DATA_DIR}/msgs.0000000002.meta</Key>
62+
<Key>#{DATA_DIR}/meta.0000000002</Key>
6363
<ETag>"meta456"</ETag>
6464
<Size>20</Size>
6565
</Contents>
@@ -330,27 +330,27 @@ describe LavinMQ::AMQP::Stream::S3MessageStore do
330330
<Size>5647</Size>
331331
</Contents>
332332
<Contents>
333-
<Key>#{DATA_DIR}/msgs.0000000001.meta</Key>
333+
<Key>#{DATA_DIR}/meta.0000000001</Key>
334334
<ETag>"meta123"</ETag>
335335
<Size>20</Size>
336336
</Contents>
337337
<Contents>
338-
<Key>#{DATA_DIR}/msgs.0000000002.meta</Key>
338+
<Key>#{DATA_DIR}/meta.0000000002</Key>
339339
<ETag>"meta123"</ETag>
340340
<Size>20</Size>
341341
</Contents>
342342
<Contents>
343-
<Key>#{DATA_DIR}/msgs.0000000003.meta</Key>
343+
<Key>#{DATA_DIR}/meta.0000000003</Key>
344344
<ETag>"meta123"</ETag>
345345
<Size>20</Size>
346346
</Contents>
347347
<Contents>
348-
<Key>#{DATA_DIR}/msgs.0000000004.meta</Key>
348+
<Key>#{DATA_DIR}/meta.0000000004</Key>
349349
<ETag>"meta123"</ETag>
350350
<Size>20</Size>
351351
</Contents>
352352
<Contents>
353-
<Key>#{DATA_DIR}/msgs.0000000005.meta</Key>
353+
<Key>#{DATA_DIR}/meta.0000000005</Key>
354354
<ETag>"meta123"</ETag>
355355
<Size>20</Size>
356356
</Contents>
@@ -363,11 +363,11 @@ describe LavinMQ::AMQP::Stream::S3MessageStore do
363363
"/#{DATA_DIR}/msgs.0000000003" => S3SpecHelper.segment_bytes(200_i64),
364364
"/#{DATA_DIR}/msgs.0000000004" => S3SpecHelper.segment_bytes(300_i64),
365365
"/#{DATA_DIR}/msgs.0000000005" => S3SpecHelper.segment_bytes(400_i64),
366-
"/#{DATA_DIR}/msgs.0000000001.meta" => S3SpecHelper.meta_bytes(0_i64),
367-
"/#{DATA_DIR}/msgs.0000000002.meta" => S3SpecHelper.meta_bytes(100_i64),
368-
"/#{DATA_DIR}/msgs.0000000003.meta" => S3SpecHelper.meta_bytes(200_i64),
369-
"/#{DATA_DIR}/msgs.0000000004.meta" => S3SpecHelper.meta_bytes(300_i64),
370-
"/#{DATA_DIR}/msgs.0000000005.meta" => S3SpecHelper.meta_bytes(400_i64),
366+
"/#{DATA_DIR}/meta.0000000001" => S3SpecHelper.meta_bytes(0_i64),
367+
"/#{DATA_DIR}/meta.0000000002" => S3SpecHelper.meta_bytes(100_i64),
368+
"/#{DATA_DIR}/meta.0000000003" => S3SpecHelper.meta_bytes(200_i64),
369+
"/#{DATA_DIR}/meta.0000000004" => S3SpecHelper.meta_bytes(300_i64),
370+
"/#{DATA_DIR}/meta.0000000005" => S3SpecHelper.meta_bytes(400_i64),
371371
"/tmp/lavinmq-spec/#{DATA_DIR}" => RESPONSE_UPLOAD,
372372
}
373373
S3SpecHelper.responses = responses

src/lavinmq/amqp/stream/s3_message_store.cr

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ module LavinMQ::AMQP
7070
private def load_stats_from_local_files(segments, is_long_queue)
7171
counter = segments.size
7272
@segments.each do |seg, mfile|
73-
if @segment_msg_count[seg].zero? # only load segment if stats not already read from .meta file
73+
if @segment_msg_count[seg].zero? # only load segment if stats not already read from meta file
7474
@log.debug { "Loading stats for local segment: #{seg}, not uploaded to S3 yet" }
7575
begin
7676
read_metadata_file(seg, mfile)
@@ -125,8 +125,9 @@ module LavinMQ::AMQP
125125
produce_metadata(seg, mfile)
126126
write_metadata_file(seg, mfile)
127127
slice = Bytes.new(20)
128-
File.open("#{path}.meta", &.read_fully(slice))
129-
@storage_client.upload_file_to_s3("/#{path[Config.instance.data_dir.bytesize + 1..]}.meta", slice)
128+
File.open(meta_file_name(path), &.read_fully(slice))
129+
meta_s3_path = meta_file_name(path[Config.instance.data_dir.bytesize + 1..])
130+
@storage_client.upload_file_to_s3("/#{meta_s3_path}", slice)
130131
else
131132
@log.error { "Failed to load segment #{path}" }
132133
end
@@ -147,22 +148,24 @@ module LavinMQ::AMQP
147148
end
148149

149150
private def load_stats_from_meta_file(path, seg, s3file)
150-
if File.exists?("#{path}.meta")
151-
read_metadata_file(seg, path, s3file[:size])
151+
meta_path = meta_file_name(path)
152+
if File.exists?(meta_path)
153+
read_metadata_file(seg, meta_path, s3file[:size])
152154
unless s3file[:meta] # upload to s3 unless it exists there
153155
Log.info { "Uploading metadata file for segment #{seg} to S3" }
154156
slice = Bytes.new(20)
155-
File.open("#{path}.meta", &.read_fully(slice))
156-
@storage_client.upload_file_to_s3("/#{path[Config.instance.data_dir.bytesize + 1..]}.meta", slice)
157+
File.open(meta_path, &.read_fully(slice))
158+
meta_s3_path = meta_file_name(path[Config.instance.data_dir.bytesize + 1..])
159+
@storage_client.upload_file_to_s3("/#{meta_s3_path}", slice)
157160
end
158161
elsif meta_file = @storage_client.download_meta_file(seg, @s3_segments, @storage_client.http_client)
159162
read_metadata_file_from_s3(seg, meta_file)
160163
end
161164
end
162165

163-
private def read_metadata_file(seg, path, bytesize)
164-
return unless File.exists?("#{path}.meta")
165-
File.open("#{path}.meta") do |file|
166+
private def read_metadata_file(seg, meta_path, bytesize)
167+
return unless File.exists?(meta_path)
168+
File.open(meta_path) do |file|
166169
read_metadata(file, seg)
167170
@bytesize += bytesize
168171
end
@@ -268,9 +271,9 @@ module LavinMQ::AMQP
268271
@log.debug { "Uploading file to s3: /#{segment.path[Config.instance.data_dir.bytesize + 1..]}" }
269272
path = "/#{segment.path[Config.instance.data_dir.bytesize + 1..]}"
270273
etag = @storage_client.upload_file_to_s3(path, segment.to_slice)
271-
meta_path = "#{segment.path}.meta"
274+
meta_path = meta_file_name(segment.path)
272275
if File.exists?(meta_path)
273-
@storage_client.upload_file_to_s3("#{path}.meta", File.open(meta_path, &.getb_to_end))
276+
@storage_client.upload_file_to_s3(meta_file_name(path), File.open(meta_path, &.getb_to_end))
274277
end
275278

276279
@s3_segments[seg_id] = {

src/lavinmq/amqp/stream/s3_storage_client.cr

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ module LavinMQ::AMQP
6363

6464
if key_node = content.xpath_node(".//*[local-name()='Key']")
6565
path = key_node.content
66-
if match = path.match(/\/msgs\.(\d{10})\.meta$/)
66+
if match = path.match(/\/meta\.(\d{10})$/)
6767
id = match[1].to_u32
6868
update_s3_segment_list(s3_segments, id, "", "", 0_i64, true)
6969
return
@@ -133,7 +133,7 @@ module LavinMQ::AMQP
133133
@log.debug { "Downloading meta for segment: #{segment_id}" }
134134
return unless s3_segments[segment_id]?
135135

136-
s3_meta_path = "#{s3_segments[segment_id][:path]}.meta"
136+
s3_meta_path = meta_file_name(s3_segments[segment_id][:path])
137137
path = File.join(Config.instance.data_dir, s3_meta_path)
138138

139139
h.get("/#{s3_meta_path}") do |response|
@@ -166,7 +166,7 @@ module LavinMQ::AMQP
166166
def delete_from_s3(s3_seg)
167167
h = http_client
168168
delete_from_s3(h, s3_seg[:path])
169-
delete_from_s3(h, "#{s3_seg[:path]}.meta")
169+
delete_from_s3(h, meta_file_name(s3_seg[:path]))
170170
end
171171

172172
def delete_from_s3(h : ::HTTP::Client, path : String)
@@ -216,5 +216,19 @@ module LavinMQ::AMQP
216216
end
217217
temp_path
218218
end
219+
220+
private def meta_file_name(path : String) : String
221+
# We assume the path ends with "msgs.<10 chars>"
222+
raw = path.to_slice
223+
224+
# This is basically the same as using sub("msgs.", "meta.") but with #sub
225+
# the first occurrence of "msgs." would be replaced, not the last one.
226+
# This also requires only one allocation.
227+
String.build(path.size) do |io|
228+
io.write raw[0, raw.size - 15]
229+
io.write "meta.".to_slice
230+
io.write raw[-10..]
231+
end
232+
end
219233
end
220234
end

src/lavinmq/amqp/stream/stream_message_store.cr

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,8 @@ module LavinMQ::AMQP
376376
end
377377

378378
private def read_metadata_file(seg, mfile)
379-
File.open("#{mfile.path}.meta") do |file|
379+
meta_path = meta_file_name(mfile.path)
380+
File.open(meta_path) do |file|
380381
count = file.read_bytes(UInt32)
381382
@offset_index[seg] = file.read_bytes(Int64)
382383
@timestamp_index[seg] = file.read_bytes(Int64)
@@ -386,7 +387,7 @@ module LavinMQ::AMQP
386387
mfile.dontneed
387388
@bytesize += bytesize
388389
@size += count
389-
@log.debug { "Reading count from #{mfile.path}.meta: #{count}" }
390+
@log.debug { "Reading count from #{meta_path}: #{count}" }
390391
end
391392
end
392393
end

0 commit comments

Comments
 (0)