Skip to content

Commit 2d8a249

Browse files
RUBY-3463 Fix cursor behaviour on load balanced (#2893)
1 parent 56100d6 commit 2d8a249

File tree

17 files changed

+399
-314
lines changed

17 files changed

+399
-314
lines changed

gemfiles/standard.rb

+1
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ def standard_dependencies
6565
gem 'tilt'
6666
# solargraph depends on rbs, which won't build on jruby for some reason
6767
gem 'solargraph', platforms: :mri
68+
gem 'ruby-lsp', platforms: :mri
6869
end
6970

7071
gem 'libmongocrypt-helper', '~> 1.8.0' if ENV['FLE'] == 'helper'

lib/mongo/cluster/reapers/cursor_reaper.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,12 @@ def kill_cursors
194194
server_api: server.options[:server_api],
195195
connection_global_id: kill_spec.connection_global_id,
196196
}
197-
op.execute(server, context: Operation::Context.new(options: options))
197+
if connection = kill_spec.connection
198+
op.execute_with_connection(connection, context: Operation::Context.new(options: options))
199+
connection.connection_pool.check_in(connection)
200+
else
201+
op.execute(server, context: Operation::Context.new(options: options))
202+
end
198203

199204
if session = kill_spec.session
200205
if session.implicit?

lib/mongo/collection/view/aggregation.rb

+13-1
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,26 @@ def effective_read_preference(connection)
118118
end
119119

120120
def send_initial_query(server, context)
121-
server.with_connection do |connection|
121+
if server.load_balancer?
122+
# Connection will be checked in when cursor is drained.
123+
connection = server.pool.check_out(context: context)
122124
initial_query_op(
123125
context.session,
124126
effective_read_preference(connection)
125127
).execute_with_connection(
126128
connection,
127129
context: context
128130
)
131+
else
132+
server.with_connection do |connection|
133+
initial_query_op(
134+
context.session,
135+
effective_read_preference(connection)
136+
).execute_with_connection(
137+
connection,
138+
context: context
139+
)
140+
end
129141
end
130142
end
131143
end

lib/mongo/collection/view/iterable.rb

+8-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,14 @@ def initial_query_op(session)
168168
end
169169

170170
def send_initial_query(server, context)
171-
initial_query_op(context.session).execute(server, context: context)
171+
operation = initial_query_op(context.session)
172+
if server.load_balancer?
173+
# Connection will be checked in when cursor is drained.
174+
connection = server.pool.check_out(context: context)
175+
operation.execute_with_connection(connection, context: context)
176+
else
177+
operation.execute(server, context: context)
178+
end
172179
end
173180

174181
def use_query_cache?

lib/mongo/collection/view/map_reduce.rb

+20-4
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,15 @@ def each
7373
session = client.get_session(@options)
7474
server = cluster.next_primary(nil, session)
7575
context = Operation::Context.new(client: client, session: session, operation_timeouts: view.operation_timeouts)
76-
result = send_initial_query(server, context)
77-
result = send_fetch_query(server, session) unless inline?
76+
if server.load_balancer?
77+
# Connection will be checked in when cursor is drained.
78+
connection = server.pool.check_out(context: context)
79+
result = send_initial_query_with_connection(connection, context.session, context: context)
80+
result = send_fetch_query_with_connection(connection, session) unless inline?
81+
else
82+
result = send_initial_query(server, context)
83+
result = send_fetch_query(server, session) unless inline?
84+
end
7885
@cursor = Cursor.new(view, result, server, session: session)
7986
if block_given?
8087
@cursor.each do |doc|
@@ -306,7 +313,7 @@ def find_command_spec(session)
306313
Builder::MapReduce.new(map_function, reduce_function, view, options.merge(session: session)).command_specification
307314
end
308315

309-
def fetch_query_op(server, session)
316+
def fetch_query_op(session)
310317
spec = {
311318
coll_name: out_collection_name,
312319
db_name: out_database_name,
@@ -320,7 +327,16 @@ def fetch_query_op(server, session)
320327
end
321328

322329
def send_fetch_query(server, session)
323-
fetch_query_op(server, session).execute(server, context: Operation::Context.new(client: client, session: session))
330+
fetch_query_op(session).execute(server, context: Operation::Context.new(client: client, session: session))
331+
end
332+
333+
def send_fetch_query_with_connection(connection, session)
334+
fetch_query_op(
335+
session
336+
).execute_with_connection(
337+
connection,
338+
context: Operation::Context.new(client: client, session: session)
339+
)
324340
end
325341
end
326342
end

lib/mongo/collection/view/readable.rb

+7-1
Original file line numberDiff line numberDiff line change
@@ -736,7 +736,13 @@ def parallel_scan(cursor_count, options = {})
736736
session: session,
737737
connection_global_id: result.connection_global_id,
738738
)
739-
result = op.execute(server, context: context)
739+
result = if server.load_balancer?
740+
# Connection will be checked in when cursor is drained.
741+
connection = server.pool.check_out(context: context)
742+
op.execute_with_connection(connection, context: context)
743+
else
744+
op.execute(server, context: context)
745+
end
740746
Cursor.new(self, result, server, session: session)
741747
end
742748
end

lib/mongo/cursor.rb

+43-5
Original file line numberDiff line numberDiff line change
@@ -91,10 +91,19 @@ def initialize(view, result, server, options = {})
9191
@context = @options[:context]&.with(connection_global_id: connection_global_id_for_context) || fresh_context
9292
@explicitly_closed = false
9393
@lock = Mutex.new
94-
unless closed?
94+
if server.load_balancer?
95+
# We need the connection in the cursor only in load balanced topology;
96+
# we do not need an additional reference to it otherwise.
97+
@connection = @initial_result.connection
98+
end
99+
if closed?
100+
check_in_connection
101+
else
95102
register
96-
ObjectSpace.define_finalizer(self, self.class.finalize(kill_spec(@connection_global_id),
97-
cluster))
103+
ObjectSpace.define_finalizer(
104+
self,
105+
self.class.finalize(kill_spec(@connection_global_id), cluster)
106+
)
98107
end
99108
end
100109

@@ -104,6 +113,9 @@ def initialize(view, result, server, options = {})
104113
# @api private
105114
attr_reader :initial_result
106115

116+
# @api private
117+
attr_reader :connection
118+
107119
# Finalize the cursor for garbage collection. Schedules this cursor to be included
108120
# in a killCursors operation executed by the Cluster's CursorReaper.
109121
#
@@ -315,6 +327,7 @@ def close(opts = {})
315327
@lock.synchronize do
316328
@explicitly_closed = true
317329
end
330+
check_in_connection
318331
end
319332

320333
# Get the parsed collection name.
@@ -395,6 +408,7 @@ def kill_spec(connection_global_id)
395408
connection_global_id: connection_global_id,
396409
server_address: server.address,
397410
session: @session,
411+
connection: @connection
398412
)
399413
end
400414

@@ -464,7 +478,10 @@ def process(result)
464478
# the @cursor_id may be zero (all results fit in the first batch).
465479
# Thus we need to check both @cursor_id and the cursor_id of the result
466480
# prior to calling unregister here.
467-
unregister if !closed? && result.cursor_id == 0
481+
if !closed? && result.cursor_id == 0
482+
unregister
483+
check_in_connection
484+
end
468485
@cursor_id = set_cursor_id(result)
469486

470487
if result.respond_to?(:post_batch_resume_token)
@@ -496,7 +513,12 @@ def unregister
496513
end
497514

498515
def execute_operation(op, context: nil)
499-
op.execute(@server, context: context || possibly_refreshed_context)
516+
op_context = context || possibly_refreshed_context
517+
if @connection.nil?
518+
op.execute(@server, context: op_context)
519+
else
520+
op.execute_with_connection(@connection, context: op_context)
521+
end
500522
end
501523

502524
# Considers the timeout mode and will either return the cursor's
@@ -545,6 +567,22 @@ def connection_global_id_for_context
545567
@connection_global_id
546568
end
547569
end
570+
571+
# Returns the connection that was used to create the cursor back to the
572+
# corresponding connection pool.
573+
#
574+
# In a load balanced topology cursors must use the same connection for the
575+
# initial and all subsequent operations. Therefore, the connection is not
576+
# checked into the pool after the initial operation is completed, but
577+
# only when the cursor is drained.
578+
def check_in_connection
579+
# Connection nil means the connection has been already checked in.
580+
return if @connection.nil?
581+
return unless @connection.server.load_balancer?
582+
583+
@connection.connection_pool.check_in(@connection)
584+
@connection = nil
585+
end
548586
end
549587
end
550588

lib/mongo/cursor/kill_spec.rb

+5-2
Original file line numberDiff line numberDiff line change
@@ -31,22 +31,25 @@ def initialize(
3131
db_name:,
3232
connection_global_id:,
3333
server_address:,
34-
session:
34+
session:,
35+
connection: nil
3536
)
3637
@cursor_id = cursor_id
3738
@coll_name = coll_name
3839
@db_name = db_name
3940
@connection_global_id = connection_global_id
4041
@server_address = server_address
4142
@session = session
43+
@connection = connection
4244
end
4345

4446
attr_reader :cursor_id,
4547
:coll_name,
4648
:db_name,
4749
:connection_global_id,
4850
:server_address,
49-
:session
51+
:session,
52+
:connection
5053

5154
def ==(other)
5255
cursor_id == other.cursor_id &&

lib/mongo/database/view.rb

+14-5
Original file line numberDiff line numberDiff line change
@@ -288,11 +288,20 @@ def send_initial_query(server, session, context, options = {})
288288
if opts.key?(:deserialize_as_bson)
289289
execution_opts[:deserialize_as_bson] = opts.delete(:deserialize_as_bson)
290290
end
291-
initial_query_op(session, opts).execute(
292-
server,
293-
context: context,
294-
options: execution_opts
295-
)
291+
if server.load_balancer?
292+
connection = server.pool.check_out(context: context)
293+
initial_query_op(session, opts).execute_with_connection(
294+
connection,
295+
context: context,
296+
options: execution_opts
297+
)
298+
else
299+
initial_query_op(session, opts).execute(
300+
server,
301+
context: context,
302+
options: execution_opts
303+
)
304+
end
296305
end
297306
end
298307
end

lib/mongo/index/view.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,12 @@ def normalize_models(models, server)
404404
end
405405

406406
def send_initial_query(server, session, context)
407-
initial_query_op(session).execute(server, context: context)
407+
if server.load_balancer?
408+
connection = server.pool.check_out(context: context)
409+
initial_query_op(session).execute_with_connection(connection, context: context)
410+
else
411+
initial_query_op(session).execute(server, context: context)
412+
end
408413
end
409414
end
410415
end

lib/mongo/operation/result.rb

+4-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ class Result
104104
# when this result was produced.
105105
#
106106
# @api private
107-
def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil)
107+
def initialize(replies, connection_description = nil, connection_global_id = nil, context: nil, connection: nil)
108108
@context = context
109109

110110
if replies
@@ -122,6 +122,7 @@ def initialize(replies, connection_description = nil, connection_global_id = nil
122122
@replies = [ reply ]
123123
@connection_description = connection_description
124124
@connection_global_id = connection_global_id
125+
@connection = connection
125126
end
126127
end
127128

@@ -148,6 +149,8 @@ def initialize(replies, connection_description = nil, connection_global_id = nil
148149
# @api private
149150
attr_reader :context
150151

152+
attr_reader :connection
153+
151154
# @api private
152155
def_delegators :parser,
153156
:not_master?, :node_recovering?, :node_shutting_down?

lib/mongo/operation/shared/executable.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ def result_class
104104
end
105105

106106
def get_result(connection, context, options = {})
107-
result_class.new(*dispatch_message(connection, context, options), context: context)
107+
result_class.new(*dispatch_message(connection, context, options), context: context, connection: connection)
108108
end
109109

110110
# Returns a Protocol::Message or nil as reply.

0 commit comments

Comments
 (0)