Skip to content

Allow chunked uploads to work with IO streams other than files #110

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ source 'https://rubygems.org'

# Specify your gem's dependencies in boxr.gemspec
gemspec

gem "rake"
gem "dotenv", "~> 2.0"
gem "rspec", "~> 3.1"
gem "simplecov", "~> 0.9"
gem "awesome_print", "~> 1.8"
gem "lru_redux", "~> 0.8"
7 changes: 0 additions & 7 deletions boxr.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ Gem::Specification.new do |spec|

spec.required_ruby_version = '>= 2.0'

spec.add_development_dependency "bundler", "~> 1.6"
spec.add_development_dependency "rake", "~> 10.0"
spec.add_development_dependency "rspec", "~> 3.1"
spec.add_development_dependency "simplecov", "~> 0.9"
spec.add_development_dependency "dotenv", "~> 0.11"
spec.add_development_dependency "awesome_print", "~> 1.8"
spec.add_development_dependency "lru_redux", "~> 0.8"
spec.add_development_dependency "parallel", "~> 1.0"

spec.add_runtime_dependency "httpclient", "~> 2.8"
Expand Down
1 change: 1 addition & 0 deletions lib/boxr.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require 'addressable/template'
require 'jwt'
require 'securerandom'
require 'stringio'

require 'boxr/version'
require 'boxr/errors'
Expand Down
63 changes: 50 additions & 13 deletions lib/boxr/chunked_uploads.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ def chunked_upload_create_session_new_file(path_to_file, parent, name: nil)
end
end

def chunked_upload_create_session_new_file_from_io(io, parent, name)
def chunked_upload_create_session_new_file_from_io(io, parent, name, io_size: nil)
io_size ||= io.size
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing to keep in mind is that #size is not part of IO's public interface.


parent_id = ensure_id(parent)

uri = "#{UPLOAD_URI}/files/upload_sessions"
body = {folder_id: parent_id, file_size: io.size, file_name: name}
body = {folder_id: parent_id, file_size: io_size, file_name: name}
session_info, response = post(uri, body, content_type: "application/json")

session_info
Expand All @@ -27,10 +29,12 @@ def chunked_upload_create_session_new_version(path_to_file, file, name: nil)
end
end

def chunked_upload_create_session_new_version_from_io(io, file, name)
def chunked_upload_create_session_new_version_from_io(io, file, name, io_size: nil)
io_size ||= io.size

file_id = ensure_id(file)
uri = "#{UPLOAD_URI}/files/#{file_id}/upload_sessions"
body = {file_size: io.size, file_name: name}
body = {file_size: io_size, file_name: name}
session_info, response = post(uri, body, content_type: "application/json")

session_info
Expand All @@ -49,14 +53,17 @@ def chunked_upload_part(path_to_file, session_id, content_range)
end
end

def chunked_upload_part_from_io(io, session_id, content_range)
io.pos = content_range.min
def chunked_upload_part_from_io(io, session_id, content_range, io_size: nil, io_pos: nil)
io_size ||= io.size
io_pos ||= content_range.min

io.pos = io_pos
part_size = content_range.max - content_range.min + 1
data = io.read(part_size)
io.rewind

digest = "sha=#{Digest::SHA1.base64digest(data)}"
range = "bytes #{content_range.min}-#{content_range.max}/#{io.size}"
range = "bytes #{content_range.min}-#{content_range.max}/#{io_size}"

uri = "#{UPLOAD_URI}/files/upload_sessions/#{session_id}"
body = data
Expand Down Expand Up @@ -155,25 +162,55 @@ def chunked_upload_new_version_of_file_from_io(io, file, name, n_threads: 1, con

PARALLEL_GEM_REQUIREMENT = Gem::Requirement.create('~> 1.0').freeze

def chunked_upload_to_session_from_io(io, session, n_threads: 1, content_created_at: nil, content_modified_at: nil)
def content_ranges_for(session, io_size)
content_ranges = []
offset = 0
loop do
limit = [offset + session.part_size, io.size].min - 1
limit = [offset + session.part_size, io_size].min - 1
content_ranges << (offset..limit)
break if limit == io.size - 1
break if limit == io_size - 1

offset = limit + 1
end

content_ranges
end

def read_io_concurrently(io, content_range, lock)
part_size = content_range.max - content_range.min + 1
buf = String.new(capacity: part_size)

pread_supported = io.respond_to?(:pread)
if pread_supported
begin
io.pread(part_size, content_range.min, buf)
rescue SystemCallError
pread_supported = false
end
end

unless pread_supported
lock.synchronize do
io.pos = content_range.min
io.read(part_size, buf)
end
end

buf
end

def chunked_upload_to_session_from_io(io, session, n_threads: 1, io_size: nil, content_created_at: nil, content_modified_at: nil)
io_size ||= io.size

content_ranges = content_ranges_for(session, io_size)
parts =
if n_threads > 1
raise BoxrError.new(boxr_message: "parallel chunked uploads requires gem parallel (#{PARALLEL_GEM_REQUIREMENT}) to be loaded") unless gem_parallel_available?

lock = Mutex.new
Parallel.map(content_ranges, in_threads: n_threads) do |content_range|
File.open(io.path) do |io_dup|
chunked_upload_part_from_io(io_dup, session.id, content_range)
end
data = read_io_concurrently(io, content_range, lock)
chunked_upload_part_from_io(StringIO.new(data), session.id, content_range, io_size: io_size, io_pos: 0)
end
else
content_ranges.map do |content_range|
Expand Down
40 changes: 32 additions & 8 deletions spec/boxr/chunked_uploads_spec.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#rake spec SPEC_OPTS="-e \"invokes chunked uploads operations"\"

require "parallel"

describe "chunked uploads operations" do
it "invokes chunked uploads operations" do
it "invokes chunked uploads session-related operations" do
puts "create chunked upload session"
new_session = BOX_CLIENT.chunked_upload_create_session_new_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder)
expect(new_session.id).not_to be_nil
Expand All @@ -16,14 +19,35 @@
puts "abort chunked upload session"
abort_info = BOX_CLIENT.chunked_upload_abort_session(new_session.id)
expect(abort_info).to eq({})
end

puts "upload a large file in chunks"
new_file = BOX_CLIENT.chunked_upload_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder)
expect(new_file.name).to eq(TEST_LARGE_FILE_NAME)
test_file = new_file
shared_examples "chunked uploads data-upload" do |n_threads:|
fit "uploads chunked uploads upload-related operations (threads: #{n_threads})" do
puts "upload a large file in chunks"
new_file = BOX_CLIENT.chunked_upload_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder, n_threads: n_threads)
expect(new_file.name).to eq(TEST_LARGE_FILE_NAME)
test_file = new_file

puts "upload new version of a large file in chunks"
new_version = BOX_CLIENT.chunked_upload_new_version_of_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", test_file)
expect(new_version.id).to eq(test_file.id)
puts "upload new version of a large file in chunks"
new_version = BOX_CLIENT.chunked_upload_new_version_of_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", test_file, n_threads: n_threads)
expect(new_version.id).to eq(test_file.id)

puts "upload a large file in chunks from IO stream"
filename = "yet another large file.txt"
io = StringIO.new
io << "1" * 21 * 1024**2
io.rewind
new_file = BOX_CLIENT.chunked_upload_file_from_io(io, @test_folder, filename, n_threads: n_threads)
expect(new_file.name).to eq(filename)
test_file = new_file

puts "upload new version of a large file in chunks from IO stream"
io.rewind
new_version = BOX_CLIENT.chunked_upload_new_version_of_file_from_io(io, test_file, filename, n_threads: n_threads)
expect(new_version.id).to eq(test_file.id)
end
end

it_behaves_like "chunked uploads data-upload", n_threads: 1
it_behaves_like "chunked uploads data-upload", n_threads: 2
end
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
require 'awesome_print'

RSpec.configure do |config|
config.before(:each) do
config.before(:each) do |test|
if test.metadata[:skip_reset]
puts "Skipping reset"
next
Expand Down