Skip to content

RUBY-3658 Ensure partial writes are retried #2927

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 5 additions & 30 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,15 @@ jobs:
env:
CI: true
TESTOPTS: "-v"
runs-on: ubuntu-20.04
runs-on: ubuntu-22.04
continue-on-error: true
strategy:
fail-fast: false
matrix:
os: [ ubuntu-20.04 ]
ruby: ["2.7", "3.0", "3.1", "3.2", "3.3"]
mongodb: ["4.4", "5.0", "6.0", "7.0", "8.0"]
topology: [replica_set, sharded_cluster]
include:
- os: macos
ruby: "2.7"
mongodb: "7.0"
topology: server
- os: macos
ruby: "3.0"
mongodb: "7.0"
topology: server
- os: ubuntu-latest
ruby: "2.7"
mongodb: "7.0"
topology: server
- os: ubuntu-latest
ruby: "3.1"
mongodb: "7.0"
topology: server
- os: ubuntu-latest
ruby: "3.2"
mongodb: "7.0"
topology: server
- os: ubuntu-latest
ruby: "3.2"
mongodb: "8.0"
topology: replica_set
os: [ ubuntu-22.04 ]
ruby: [ "3.2" ]
mongodb: [ "7.0", "8.0" ]
topology: [ replica_set, sharded_cluster ]
steps:
- name: repo checkout
uses: actions/checkout@v2
Expand Down
51 changes: 29 additions & 22 deletions lib/mongo/socket.rb
Original file line number Diff line number Diff line change
Expand Up @@ -491,9 +491,8 @@ def write_without_timeout(*args)
buf = buf.to_s
i = 0
while i < buf.length
chunk = buf[i...i+WRITE_CHUNK_SIZE]
@socket.write(chunk)
i += WRITE_CHUNK_SIZE
chunk = buf[i, WRITE_CHUNK_SIZE]
i += @socket.write(chunk)
end
end
end
Expand Down Expand Up @@ -523,32 +522,40 @@ def write_with_timeout(*args, timeout:)

def write_chunk(chunk, timeout)
deadline = Utils.monotonic_time + timeout

written = 0
begin
written += @socket.write_nonblock(chunk[written..-1])
rescue IO::WaitWritable, Errno::EINTR
select_timeout = deadline - Utils.monotonic_time
rv = Kernel.select(nil, [@socket], nil, select_timeout)
if BSON::Environment.jruby?
# Ignore the return value of Kernel.select.
# On JRuby, select appears to return nil prior to timeout expiration
# (apparently due to a EAGAIN) which then causes us to fail the read
# even though we could have retried it.
# Check the deadline ourselves.
if deadline
select_timeout = deadline - Utils.monotonic_time
if select_timeout <= 0
raise_timeout_error!("Took more than #{timeout} seconds to receive data", true)
end
while written < chunk.length
begin
written += @socket.write_nonblock(chunk[written..-1])
rescue IO::WaitWritable, Errno::EINTR
if !wait_for_socket_to_be_writable(deadline)
raise_timeout_error!("Took more than #{timeout} seconds to receive data", true)
end
elsif rv.nil?
raise_timeout_error!("Took more than #{timeout} seconds to receive data (select call timed out)", true)

retry
end
retry
end

written
end

def wait_for_socket_to_be_writable(deadline)
select_timeout = deadline - Utils.monotonic_time
rv = Kernel.select(nil, [@socket], nil, select_timeout)

if BSON::Environment.jruby?
# Ignore the return value of Kernel.select.
# On JRuby, select appears to return nil prior to timeout expiration
# (apparently due to a EAGAIN) which then causes us to fail the read
# even though we could have retried it.
# Check the deadline ourselves.
select_timeout = deadline - Utils.monotonic_time
return select_timeout > 0
end

!rv.nil?
end

def unix_socket?(sock)
defined?(UNIXSocket) && sock.is_a?(UNIXSocket)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,7 @@
end

let(:error_regex) do
if BSON::Environment.jruby?
/SocketError/
else
/Connection refused/
end
/Connection refused|SocketError|SocketTimeoutError/
end

it_behaves_like 'raising a KMS error'
Expand Down
63 changes: 57 additions & 6 deletions spec/mongo/socket_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,6 @@

let(:raw_socket) { socket.instance_variable_get('@socket') }

let(:wait_readable_class) do
Class.new(Exception) do
include IO::WaitReadable
end
end

context 'timeout' do
clean_slate_for_all

Expand Down Expand Up @@ -116,4 +110,61 @@
end
end
end

describe '#write' do
let(:target_host) do
host = ClusterConfig.instance.primary_address_host
# Take ipv4 address
Socket.getaddrinfo(host, 0).detect { |ai| ai.first == 'AF_INET' }[3]
end

let(:socket) do
Mongo::Socket::TCP.new(target_host, ClusterConfig.instance.primary_address_port, 1, Socket::PF_INET)
end

let(:raw_socket) { socket.instance_variable_get('@socket') }

context 'with timeout' do
let(:timeout) { 5_000 }

context 'data is less than WRITE_CHUNK_SIZE' do
let(:data) { "a" * 1024 }

context 'when a partial write occurs' do
before do
expect(raw_socket)
.to receive(:write_nonblock)
.twice
.and_return(data.length / 2)
end

it 'eventually writes everything' do
expect(socket.write(data, timeout: timeout)).
to be === data.length
end
end
end

context 'data is greater than WRITE_CHUNK_SIZE' do
let(:data) { "a" * (2 * Mongo::Socket::WRITE_CHUNK_SIZE + 256) }

context 'when a partial write occurs' do
before do
expect(raw_socket)
.to receive(:write_nonblock)
.exactly(4).times
.and_return(Mongo::Socket::WRITE_CHUNK_SIZE,
128,
Mongo::Socket::WRITE_CHUNK_SIZE - 128,
256)
end

it 'eventually writes everything' do
expect(socket.write(data, timeout: timeout)).
to be === data.length
end
end
end
end
end
end
Loading