Skip to content

Commit 3d469ac

Browse files
Serialize register/remove operations.
1 parent 8731842 commit 3d469ac

File tree

2 files changed

+275
-2
lines changed

2 files changed

+275
-2
lines changed

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ def initialize(interval: 10, total_size_limit: nil, memory_sample: false, **opti
3030
@options = options
3131

3232
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
33+
34+
# Queue to serialize cluster modifications to prevent race conditions:
35+
@queue = Thread::Queue.new
3336
end
3437

3538
# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored.
@@ -42,6 +45,15 @@ def add(process_id)
4245
@cluster.add(process_id, **@options)
4346
end
4447

48+
# Drain and execute every operation currently waiting in the queue.
#
# Each queued item is a callable (for example the proc pushed by {#register}). Items are invoked one at a time, in the order they were pushed. The zero timeout makes `pop` return `nil` as soon as the queue is empty, so this method never waits for new work to arrive.
#
# Funnelling cluster modifications through here keeps them from interleaving with a `@cluster.check!` iteration.
# NOTE(review): that only holds if the caller invokes this method outside of `check!` - confirm against the run loop.
private def process_queue!
	# Parentheses mark the assignment-in-condition as intentional:
	while (operation = @queue.pop(timeout: 0))
		operation.call
	end
end
56+
4557
# Register the connection (worker) with the memory monitor.
4658
def register(connection)
4759
Console.debug(self, "Registering connection.", connection: connection, state: connection.state)
@@ -50,7 +62,8 @@ def register(connection)
5062

5163
if connections.empty?
5264
Console.debug(self, "Registering process.", child: {process_id: process_id})
53-
self.add(process_id)
65+
# Queue the cluster modification to avoid race conditions:
66+
@queue.push(proc { @cluster.add(process_id, **@options) })
5467
end
5568

5669
connections.add(connection)

test/async/container/memory_monitor.rb

Lines changed: 261 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,283 @@
1414
let(:monitor) {subject.new(interval: 1, memory_sample: {duration: 1, timeout: 5})}
1515
it_behaves_like Async::Container::Supervisor::AMonitor
1616

17+
# Minimal stand-in for a connection: it only exposes a lazily-created, mutable `state` hash.
let(:mock_connection) do
	connection = Object.new
	
	connection.define_singleton_method(:state) do
		@state ||= {}
	end
	
	connection
end
25+
26+
with "#register" do
	it "adds process to cluster when registering first connection" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		expect(monitor.cluster).to receive(:add).with(pid)
		
		monitor.register(mock_connection)
		
		# Registration only queues the add; drain the queue so it actually runs:
		monitor.send(:process_queue!)
	end
	
	it "does not add process to cluster when connection already exists" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		# First connection for this process: queue the add, then apply it.
		monitor.register(mock_connection)
		monitor.send(:process_queue!)
		
		# The process should now be tracked by the cluster:
		expect(monitor.cluster.processes.keys).to be(:include?, pid)
		
		# A second, distinct connection object reporting the same process id:
		second_connection = Object.new
		second_connection.define_singleton_method(:state) do
			@state ||= {process_id: 12345}
		end
		
		queue = monitor.instance_variable_get(:@queue)
		size_before = queue.size
		
		monitor.register(second_connection)
		
		# Registering it must not queue another add operation:
		expect(queue.size).to be == size_before
	end
	
	it "handles connection without process_id gracefully" do
		mock_connection.state.clear
		
		# Neither call is expected to raise:
		monitor.register(mock_connection)
		monitor.send(:process_queue!)
		
		# Nothing should be left waiting in the queue:
		pending = monitor.instance_variable_get(:@queue).size
		expect(pending).to be == 0
	end
end
78+
79+
with "#remove" do
	it "removes process from cluster when removing last connection" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		# Set up: register the connection and apply the queued add.
		monitor.register(mock_connection)
		monitor.send(:process_queue!)
		
		# Removing the only connection should remove the process from the cluster:
		expect(monitor.cluster).to receive(:remove).with(pid)
		
		monitor.remove(mock_connection)
		monitor.send(:process_queue!)
	end
	
	it "does not remove process from cluster when other connections exist" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		# First connection for the process:
		monitor.register(mock_connection)
		monitor.send(:process_queue!)
		
		# Second connection reporting the same process id:
		second_connection = Object.new
		second_connection.define_singleton_method(:state) do
			@state ||= {process_id: 12345}
		end
		monitor.register(second_connection)
		monitor.send(:process_queue!)
		
		# The process should still be tracked by the cluster:
		expect(monitor.cluster.processes.keys).to be(:include?, pid)
		
		queue = monitor.instance_variable_get(:@queue)
		size_before = queue.size
		
		# Dropping the first connection leaves another one, so no remove should be queued:
		monitor.remove(mock_connection)
		
		expect(queue.size).to be == size_before
	end
	
	it "handles connection without process_id gracefully" do
		mock_connection.state.clear
		
		# Neither call is expected to raise:
		monitor.remove(mock_connection)
		monitor.send(:process_queue!)
		
		# Nothing should be left waiting in the queue:
		pending = monitor.instance_variable_get(:@queue).size
		expect(pending).to be == 0
	end
end
137+
138+
with "queue serialization" do
	include Sus::Fixtures::Async::SchedulerContext
	
	it "processes queued operations before cluster check" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		# Registering only queues the add operation...
		monitor.register(mock_connection)
		
		# ...so the cluster must not know about the process yet:
		expect(monitor.cluster.processes.keys.include?(pid)).to be == false
		
		# Drain the queue by hand, standing in for what the run loop does:
		monitor.send(:process_queue!)
		
		# The queued add has now been applied:
		expect(monitor.cluster.processes.keys).to be(:include?, pid)
	end
	
	it "serializes multiple register/remove operations" do
		first_pid = 11111
		second_pid = 22222
		
		first_connection = Object.new
		first_connection.define_singleton_method(:state) do
			@state ||= {process_id: 11111}
		end
		
		second_connection = Object.new
		second_connection.define_singleton_method(:state) do
			@state ||= {process_id: 22222}
		end
		
		# Issue several operations back to back without draining the queue:
		monitor.register(first_connection)
		monitor.register(second_connection)
		monitor.remove(first_connection)
		
		# Nothing has been applied to the cluster so far:
		expect(monitor.cluster.processes.keys.include?(first_pid)).to be == false
		expect(monitor.cluster.processes.keys.include?(second_pid)).to be == false
		
		# Apply everything that was queued:
		monitor.send(:process_queue!)
		
		# The second process remains tracked; the first was registered and then removed:
		expect(monitor.cluster.processes.keys).to be(:include?, second_pid)
		expect(monitor.cluster.processes.keys.include?(first_pid)).to be == false
	end
end
190+
191+
with "#memory_leak_detected" do
	# A monitor constructed without the `memory_sample` option.
	let(:monitor_without_sample) {subject.new(interval: 1)}
	
	it "kills process when memory leak is detected" do
		pid = 12345
		leak_monitor = Object.new
		
		expect(Process).to receive(:kill).with(:INT, pid)
		
		outcome = monitor_without_sample.memory_leak_detected(pid, leak_monitor)
		
		expect(outcome).to be == true
	end
	
	it "handles already-dead process gracefully" do
		# A process id that is treated as no longer existing:
		pid = 99999
		leak_monitor = Object.new
		
		expect(Process).to receive(:kill).with(:INT, pid).and_raise(Errno::ESRCH)
		
		# The ESRCH raised by the stubbed kill must not propagate:
		outcome = monitor_without_sample.memory_leak_detected(pid, leak_monitor)
		
		expect(outcome).to be == true
	end
	
	it "captures memory sample when enabled" do
		pid = 12345
		leak_monitor = Object.new
		mock_connection.state[:process_id] = pid
		
		# Register the connection and apply the queued add:
		monitor.register(mock_connection)
		monitor.send(:process_queue!)
		
		# The connection should be asked for a memory sample using the configured options:
		expect(mock_connection).to receive(:call).with(
			do: :memory_sample,
			duration: 1,
			timeout: 5
		).and_return({data: "sample data"})
		
		expect(Process).to receive(:kill).with(:INT, pid)
		
		monitor.memory_leak_detected(pid, leak_monitor)
	end
end
238+
17239
with "#run" do
	include Sus::Fixtures::Async::SchedulerContext
	
	it "can run the monitor" do
		task = monitor.run
		expect(task).to be(:running?)
	ensure
		# Always stop the task so it does not outlive the example:
		task&.stop
	end
	
	it "can handle failures" do
		# Make check! raise; the monitor is expected to keep running regardless.
		expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH)
		
		task = monitor.run
		expect(task).to be(:running?)
		
		reactor.sleep(0.1)
		
		# The task should have survived the failing check:
		expect(task).to be(:running?)
	ensure
		task&.stop
	end
	
	it "processes queue before each check iteration" do
		pid = 12345
		mock_connection.state[:process_id] = pid
		
		# Registering only queues the add operation...
		monitor.register(mock_connection)
		
		# ...so the cluster must not know about the process yet:
		expect(monitor.cluster.processes.keys.include?(pid)).to be == false
		
		# Wrap check! to count invocations and to record whether the queued add had already been applied when the first one ran.
		checks = 0
		added_before_first_check = false
		real_check = monitor.cluster.method(:check!)
		expect(monitor.cluster).to receive(:check!) do |&block|
			checks += 1
			added_before_first_check = monitor.cluster.processes.keys.include?(pid) if checks == 1
			real_check.call(&block)
		end
		
		task = monitor.run
		reactor.sleep(0.1)
		
		expect(checks).to be > 0
		expect(added_before_first_check).to be_truthy
	ensure
		task&.stop
	end
end
36296
end

0 commit comments

Comments
 (0)