Skip to content

Commit b1c8d4a

Browse files
Implement StorageTables::Service::MirrorService (#71)
* Implement StorageTables::Service::MirrorService for synchronized file uploads across primary and mirror services * Implement asynchronous mirroring for uploaded blobs and refactor MirrorService * Refactor StorageTables module and improve MirrorJob implementation * Implement StorageTables::Downloader for file downloading and integrity verification * Add downloader requirement to StorageTables service * Refactor mirror service tests to use checksum instead of key for downloads and uploads * Update comments in StorageTables service for clarity and accuracy * Refactor comments and method signatures in StorageTables service for clarity and consistency * Remove unnecessary EOF assertion in mirror service tests for cleaner code * Update comments in mirror service for accuracy and consistency * Remove commented-out test for URL generation in mirror service tests
1 parent cdebd9c commit b1c8d4a

File tree

7 files changed

+202
-6
lines changed

7 files changed

+202
-6
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
require "active_support/core_ext/object/try"
4+
5+
module StorageTables
6+
# Provides asynchronous mirroring of directly-uploaded blobs.
7+
class MirrorJob < ActiveJob::Base # rubocop:disable Rails/ApplicationJob
8+
queue_as { StorageTables.queues[:mirror] }
9+
10+
discard_on StorageTables::FileNotFoundError
11+
retry_on StorageTables::IntegrityError, attempts: 10, wait: :polynomially_longer
12+
13+
def perform(checksum)
14+
StorageTables::Blob.service.try(:mirror, checksum)
15+
end
16+
end
17+
end

lib/storage_tables.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ module StorageTables
2626
mattr_accessor :routes_prefix, default: "/rails/storage_tables"
2727
mattr_accessor :draw_routes, default: true
2828

29+
mattr_accessor :queues, default: {}
30+
2931
def self.deprecator
3032
@deprecator ||= ActiveSupport::Deprecation.new("0.2.0", "StorageTables")
3133
end

lib/storage_tables/service.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ class Service
1010
attr_accessor :name
1111

1212
class << self
13-
# Configure an Active Storage service by name from a set of configurations,
14-
# typically loaded from a YAML file. The Active Storage engine uses this
15-
# to set the global Active Storage service when the app boots.
13+
# Configure an Storage Tables service by name from a set of configurations,
14+
# typically loaded from a YAML file. The Storage Tables engine uses this
15+
# to set the global Storage Tables service when the app boots.
1616
def configure(service_name, configurations)
1717
Configurator.build(service_name, configurations)
1818
end
@@ -23,7 +23,7 @@ def configure(service_name, configurations)
2323
# Passes the configurator and all of the service's config as keyword args.
2424
#
2525
# See MirrorService for an example.
26-
def build(name:, **service_config) # :nodoc:
26+
def build(configurator:, name:, **service_config) # rubocop:disable Lint/UnusedMethodArgument
2727
new(**service_config).tap do |service_instance|
2828
service_instance.name = name
2929
end

lib/storage_tables/service/configurator.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def initialize(configurations)
2020
def build(service_name)
2121
config = config_for(service_name.to_sym)
2222
resolve(config.fetch(:service)).build(
23-
**config.except(:service), name: service_name
23+
**config.except(:service), name: service_name, configurator: self
2424
)
2525
end
2626

lib/storage_tables/service/disk_service.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class Service
1111
class DiskService < Service
1212
attr_accessor :root
1313

14-
def initialize(root:, public: false)
14+
def initialize(root:, public: false, **)
1515
@root = root
1616
@public = public
1717

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# frozen_string_literal: true
2+
3+
require "active_support/core_ext/module/delegation"
4+
5+
module StorageTables
6+
class Service
7+
# = Storage Tables Mirror \Service
8+
#
9+
# Wraps a set of mirror services and provides a single StorageTables::Service object that will all
10+
# have the files uploaded to them. A +primary+ service is designated to answer calls to:
11+
# * +download+
12+
# * +exists?+
13+
# * +url+
14+
# * +url_for_direct_upload+
15+
# * +headers_for_direct_upload+
16+
class MirrorService < Service
17+
attr_reader :primary, :mirrors
18+
19+
delegate :download, :download_chunk, :exist?, :url,
20+
:url_for_direct_upload, :headers_for_direct_upload, :path_for, :compose, to: :primary
21+
22+
# Stitch together from named services.
23+
def self.build(primary:, mirrors:, name:, configurator:, **) # :nodoc:
24+
new(
25+
primary: configurator.build(primary),
26+
mirrors: mirrors.collect { |mirror_name| configurator.build mirror_name }
27+
).tap do |service_instance|
28+
service_instance.name = name
29+
end
30+
end
31+
32+
def initialize(primary:, mirrors:) # rubocop:disable Lint/MissingSuper
33+
@primary = primary
34+
@mirrors = mirrors
35+
@executor = Concurrent::ThreadPoolExecutor.new(
36+
min_threads: 1,
37+
max_threads: mirrors.size,
38+
max_queue: 0,
39+
fallback_policy: :caller_runs,
40+
idle_time: 60
41+
)
42+
end
43+
44+
# Upload the +io+ to the +checksum+ specified to all services. The upload to the primary service is done
45+
# synchronously whereas the upload to the mirrors is done asynchronously. If a +checksum+ is provided, all
46+
# services will ensure a match when the upload has completed or raise an StorageTables::IntegrityError.
47+
def upload(checksum, io, **)
48+
io.rewind
49+
primary.upload(checksum, io, **)
50+
mirror_later checksum
51+
end
52+
53+
# Delete the file at the +checksum+ on all services.
54+
def delete(checksum)
55+
perform_across_services :delete, checksum
56+
end
57+
58+
def mirror_later(checksum)
59+
StorageTables::MirrorJob.perform_later checksum
60+
end
61+
62+
# Copy the file at the +checksum+ from the primary service to each of the mirrors where it doesn't already exist.
63+
def mirror(checksum)
64+
instrument(:mirror, checksum) do
65+
if (mirrors_in_need_of_mirroring = mirrors.reject { |service| service.exist?(checksum) }).any?
66+
primary.open(checksum) do |io|
67+
mirrors_in_need_of_mirroring.each do |service|
68+
io.rewind
69+
service.upload checksum, io
70+
end
71+
end
72+
end
73+
end
74+
end
75+
76+
private
77+
78+
def each_service(&)
79+
[primary, *mirrors].each(&)
80+
end
81+
82+
def perform_across_services(method, *args)
83+
tasks = each_service.collect do |service|
84+
Concurrent::Promise.execute(executor: @executor) do
85+
service.public_send method, *args
86+
end
87+
end
88+
tasks.each(&:value!)
89+
end
90+
end
91+
end
92+
end
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# frozen_string_literal: true
2+
3+
require "service/shared_service_tests"
4+
5+
module StorageTables
6+
class Service
7+
class MirrorServiceTest < ActiveSupport::TestCase
8+
mirror_config = (1..3).to_h do |i|
9+
["mirror_#{i}",
10+
{ service: "Disk",
11+
root: Dir.mktmpdir("active_storage_tests_mirror_#{i}") }]
12+
end
13+
14+
config = mirror_config.merge \
15+
mirror: { service: "Mirror", primary: "primary", mirrors: mirror_config.keys },
16+
primary: { service: "Disk", root: Dir.mktmpdir("active_storage_tests_primary") }
17+
18+
SERVICE = StorageTables::Service.configure :mirror, config
19+
20+
include StorageTables::Service::SharedServiceTests
21+
include ActiveJob::TestHelper
22+
23+
test "name" do
24+
assert_equal :mirror, @service.name
25+
end
26+
27+
test "uploading to all services" do
28+
old_service = StorageTables::Blob.service
29+
StorageTables::Blob.service = @service
30+
31+
data = "Something else entirely!"
32+
io = StringIO.new(data)
33+
checksum = generate_checksum(data)
34+
35+
assert_performed_jobs 1, only: StorageTables::MirrorJob do
36+
@service.upload checksum, io.tap(&:read)
37+
end
38+
39+
assert_equal data, @service.primary.download(checksum)
40+
@service.mirrors.each do |mirror|
41+
assert_equal data, mirror.download(checksum)
42+
end
43+
ensure
44+
@service.delete checksum
45+
StorageTables::Blob.service = old_service
46+
end
47+
48+
test "downloading from primary service" do
49+
data = "Something else entirely!"
50+
checksum = generate_checksum(data)
51+
52+
@service.primary.upload(checksum, StringIO.new(data))
53+
54+
assert_equal data, @service.download(checksum)
55+
end
56+
57+
test "deleting from all services" do
58+
@service.delete @checksum
59+
60+
assert_not SERVICE.primary.exist?(@checksum)
61+
SERVICE.mirrors.each do |mirror|
62+
assert_not mirror.exist?(@checksum)
63+
end
64+
end
65+
66+
test "mirroring a file from the primary service to secondary services where it doesn't exist" do
67+
data = "Something else entirely!"
68+
checksum = generate_checksum(data)
69+
70+
@service.primary.upload(checksum, StringIO.new(data))
71+
@service.mirrors.third.upload checksum, StringIO.new(data)
72+
73+
@service.mirror(checksum)
74+
75+
assert_equal data, @service.mirrors.first.download(checksum)
76+
assert_equal data, @service.mirrors.second.download(checksum)
77+
assert_equal data, @service.mirrors.third.download(checksum)
78+
end
79+
80+
test "path for file in primary service" do
81+
assert_equal @service.primary.path_for(@checksum), @service.path_for(@checksum)
82+
end
83+
end
84+
end
85+
end

0 commit comments

Comments
 (0)