diff --git a/Gemfile b/Gemfile index b2c23f0..027ce0e 100644 --- a/Gemfile +++ b/Gemfile @@ -2,3 +2,11 @@ source 'https://rubygems.org' # Specify your gem's dependencies in boxr.gemspec gemspec + +gem "rake" +gem "dotenv", "~> 2.0" +gem "rspec", "~> 3.1" +gem "simplecov", "~> 0.9" +gem "awesome_print", "~> 1.8" +gem "lru_redux", "~> 0.8" +gem "parallel", "~> 1.0" diff --git a/README.md b/README.md index 32ca578..398731a 100644 --- a/README.md +++ b/README.md @@ -258,17 +258,17 @@ restore_trashed_file(file, name: nil, parent: nil) ```ruby chunked_upload_create_session_new_file(path_to_file, parent, name: nil) -chunked_upload_create_session_new_file_from_io(io, parent, name) +chunked_upload_create_session_new_file_from_io(io, parent, name, io_size: nil) chunked_upload_create_session_new_version(path_to_file, file, name: nil) -chunked_upload_create_session_new_version_from_io(io, file, name) +chunked_upload_create_session_new_version_from_io(io, file, name, io_size: nil) chunked_upload_get_session(session_id) chunked_upload_part(path_to_file, session_id, content_range) -chunked_upload_part_from_io(io, session_id, content_range) +chunked_upload_part_from_io(io, session_id, content_range, io_size: nil, io_pos: nil) chunked_upload_list_parts(session_id, limit: nil, offset: nil) diff --git a/boxr.gemspec b/boxr.gemspec index 52cdbee..021e84f 100644 --- a/boxr.gemspec +++ b/boxr.gemspec @@ -20,15 +20,6 @@ Gem::Specification.new do |spec| spec.required_ruby_version = '>= 2.0' - spec.add_development_dependency "bundler", "~> 1.6" - spec.add_development_dependency "rake", "~> 10.0" - spec.add_development_dependency "rspec", "~> 3.1" - spec.add_development_dependency "simplecov", "~> 0.9" - spec.add_development_dependency "dotenv", "~> 0.11" - spec.add_development_dependency "awesome_print", "~> 1.8" - spec.add_development_dependency "lru_redux", "~> 0.8" - spec.add_development_dependency "parallel", "~> 1.0" - spec.add_runtime_dependency "httpclient", "~> 2.8" spec.add_runtime_dependency "hashie", "~> 3.5" spec.add_runtime_dependency "addressable", "~> 2.3" diff --git a/lib/boxr.rb b/lib/boxr.rb index 605d14e..3e0382e 100644 --- a/lib/boxr.rb +++ b/lib/boxr.rb @@ -4,6 +4,7 @@ require 'addressable/template' require 'jwt' require 'securerandom' +require 'stringio' require 'boxr/version' require 'boxr/errors' diff --git a/lib/boxr/chunked_uploads.rb b/lib/boxr/chunked_uploads.rb index f2e91ba..2ad549e 100644 --- a/lib/boxr/chunked_uploads.rb +++ b/lib/boxr/chunked_uploads.rb @@ -9,11 +9,13 @@ def chunked_upload_create_session_new_file(path_to_file, parent, name: nil) end end - def chunked_upload_create_session_new_file_from_io(io, parent, name) + def chunked_upload_create_session_new_file_from_io(io, parent, name, io_size: nil) + io_size ||= io.size + parent_id = ensure_id(parent) uri = "#{UPLOAD_URI}/files/upload_sessions" - body = {folder_id: parent_id, file_size: io.size, file_name: name} + body = {folder_id: parent_id, file_size: io_size, file_name: name} session_info, response = post(uri, body, content_type: "application/json") session_info @@ -27,10 +29,12 @@ def chunked_upload_create_session_new_version(path_to_file, file, name: nil) end end - def chunked_upload_create_session_new_version_from_io(io, file, name) + def chunked_upload_create_session_new_version_from_io(io, file, name, io_size: nil) + io_size ||= io.size + file_id = ensure_id(file) uri = "#{UPLOAD_URI}/files/#{file_id}/upload_sessions" - body = {file_size: io.size, file_name: name} + body = {file_size: io_size, file_name: name} session_info, response = post(uri, body, content_type: "application/json") session_info @@ -49,14 +53,17 @@ def chunked_upload_part(path_to_file, session_id, content_range) end end - def chunked_upload_part_from_io(io, session_id, content_range) - io.pos = content_range.min + def chunked_upload_part_from_io(io, session_id, content_range, io_size: nil, io_pos: nil) + io_size ||= io.size + io_pos ||= content_range.min + + io.pos = io_pos part_size = content_range.max - content_range.min + 1 data = io.read(part_size) io.rewind digest = "sha=#{Digest::SHA1.base64digest(data)}" - range = "bytes #{content_range.min}-#{content_range.max}/#{io.size}" + range = "bytes #{content_range.min}-#{content_range.max}/#{io_size}" uri = "#{UPLOAD_URI}/files/upload_sessions/#{session_id}" body = data @@ -155,25 +162,55 @@ def chunked_upload_new_version_of_file_from_io(io, file, name, n_threads: 1, con PARALLEL_GEM_REQUIREMENT = Gem::Requirement.create('~> 1.0').freeze - def chunked_upload_to_session_from_io(io, session, n_threads: 1, content_created_at: nil, content_modified_at: nil) + def content_ranges_for(session, io_size) content_ranges = [] offset = 0 loop do - limit = [offset + session.part_size, io.size].min - 1 + limit = [offset + session.part_size, io_size].min - 1 content_ranges << (offset..limit) - break if limit == io.size - 1 + break if limit == io_size - 1 offset = limit + 1 end + content_ranges + end + + def read_io_concurrently(io, content_range, lock) + part_size = content_range.max - content_range.min + 1 + buf = String.new(capacity: part_size) + + pread_supported = io.respond_to?(:pread) + if pread_supported + begin + io.pread(part_size, content_range.min, buf) + rescue SystemCallError + pread_supported = false + end + end + + unless pread_supported + lock.synchronize do + io.pos = content_range.min + io.read(part_size, buf) + end + end + + buf + end + + def chunked_upload_to_session_from_io(io, session, n_threads: 1, io_size: nil, content_created_at: nil, content_modified_at: nil) + io_size ||= io.size + + content_ranges = content_ranges_for(session, io_size) parts = if n_threads > 1 raise BoxrError.new(boxr_message: "parallel chunked uploads requires gem parallel (#{PARALLEL_GEM_REQUIREMENT}) to be loaded") unless gem_parallel_available? + lock = Mutex.new Parallel.map(content_ranges, in_threads: n_threads) do |content_range| - File.open(io.path) do |io_dup| - chunked_upload_part_from_io(io_dup, session.id, content_range) - end + data = read_io_concurrently(io, content_range, lock) + chunked_upload_part_from_io(StringIO.new(data), session.id, content_range, io_size: io_size, io_pos: 0) end else content_ranges.map do |content_range| diff --git a/spec/boxr/chunked_uploads_spec.rb b/spec/boxr/chunked_uploads_spec.rb index 95b25ed..30715b4 100644 --- a/spec/boxr/chunked_uploads_spec.rb +++ b/spec/boxr/chunked_uploads_spec.rb @@ -1,6 +1,9 @@ #rake spec SPEC_OPTS="-e \"invokes chunked uploads operations"\" + +require "parallel" + describe "chunked uploads operations" do - it "invokes chunked uploads operations" do + it "invokes chunked uploads session-related operations" do puts "create chunked upload session" new_session = BOX_CLIENT.chunked_upload_create_session_new_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder) expect(new_session.id).not_to be_nil @@ -16,14 +19,35 @@ puts "abort chunked upload session" abort_info = BOX_CLIENT.chunked_upload_abort_session(new_session.id) expect(abort_info).to eq({}) + end - puts "upload a large file in chunks" - new_file = BOX_CLIENT.chunked_upload_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder) - expect(new_file.name).to eq(TEST_LARGE_FILE_NAME) - test_file = new_file + shared_examples "chunked uploads data-upload" do |n_threads:| + it "uploads chunked uploads upload-related operations (threads: #{n_threads})" do + puts "upload a large file in chunks" + new_file = BOX_CLIENT.chunked_upload_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", @test_folder, n_threads: n_threads) + expect(new_file.name).to eq(TEST_LARGE_FILE_NAME) + test_file = new_file - puts "upload new version of a large file in chunks" - new_version = BOX_CLIENT.chunked_upload_new_version_of_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", test_file) - expect(new_version.id).to eq(test_file.id) + puts "upload new version of a large file in chunks" + new_version = BOX_CLIENT.chunked_upload_new_version_of_file("./spec/test_files/#{TEST_LARGE_FILE_NAME}", test_file, n_threads: n_threads) + expect(new_version.id).to eq(test_file.id) + + puts "upload a large file in chunks from IO stream" + filename = "yet another large file.txt" + io = StringIO.new + io << "1" * 21 * 1024**2 + io.rewind + new_file = BOX_CLIENT.chunked_upload_file_from_io(io, @test_folder, filename, n_threads: n_threads) + expect(new_file.name).to eq(filename) + test_file = new_file + + puts "upload new version of a large file in chunks from IO stream" + io.rewind + new_version = BOX_CLIENT.chunked_upload_new_version_of_file_from_io(io, test_file, filename, n_threads: n_threads) + expect(new_version.id).to eq(test_file.id) + end end + + it_behaves_like "chunked uploads data-upload", n_threads: 1 + it_behaves_like "chunked uploads data-upload", n_threads: 2 end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 97ab9d5..10bab73 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,7 +4,7 @@ require 'awesome_print' RSpec.configure do |config| - config.before(:each) do + config.before(:each) do |test| if test.metadata[:skip_reset] puts "Skipping reset" next