Skip to content

Commit 70dc8c5

Browse files
Serialize register/remove operations. (#14)
1 parent 8731842 commit 70dc8c5

File tree

4 files changed

+284
-13
lines changed

4 files changed

+284
-13
lines changed

lib/async/container/supervisor/memory_monitor.rb

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ def initialize(interval: 10, total_size_limit: nil, memory_sample: false, **opti
3030
@options = options
3131

3232
@processes = Hash.new{|hash, key| hash[key] = Set.new.compare_by_identity}
33+
34+
# Mutex to serialize cluster modifications and prevent race conditions:
35+
@guard = Mutex.new
3336
end
3437

3538
# @attribute [Memory::Leak::Cluster] The cluster of processes being monitored.
@@ -50,7 +53,10 @@ def register(connection)
5053

5154
if connections.empty?
5255
Console.debug(self, "Registering process.", child: {process_id: process_id})
53-
self.add(process_id)
56+
# Hold the guard mutex while modifying the cluster to avoid race conditions:
57+
@guard.synchronize do
58+
self.add(process_id)
59+
end
5460
end
5561

5662
connections.add(connection)
@@ -66,7 +72,9 @@ def remove(connection)
6672

6773
if connections.empty?
6874
Console.debug(self, "Removing process.", child: {process_id: process_id})
69-
@cluster.remove(process_id)
75+
@guard.synchronize do
76+
@cluster.remove(process_id)
77+
end
7078
end
7179
end
7280
end
@@ -119,14 +127,16 @@ def memory_leak_detected(process_id, monitor)
119127
def run
120128
Async do
121129
Loop.run(interval: @interval) do
122-
# This block must return true if the process was killed.
123-
@cluster.check! do |process_id, monitor|
124-
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
125-
126-
begin
127-
memory_leak_detected(process_id, monitor)
128-
rescue => error
129-
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
130+
@guard.synchronize do
131+
# This block must return true if the process was killed.
132+
@cluster.check! do |process_id, monitor|
133+
Console.error(self, "Memory leak detected!", child: {process_id: process_id}, monitor: monitor)
134+
135+
begin
136+
memory_leak_detected(process_id, monitor)
137+
rescue => error
138+
Console.error(self, "Failed to handle memory leak!", child: {process_id: process_id}, exception: error)
139+
end
130140
end
131141
end
132142
end

releases.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Releases
22

3+
## Unreleased
4+
5+
- Serialize `register`/`remove` and `check!` operations in `MemoryMonitor`.
6+
37
## v0.9.1
48

59
- Close `Call` queue if asynchronous call fails during dispatch - further messages will fail with `ClosedQueueError`.

test/async/container/client.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
with "#run" do
2424
it "can run the client" do
2525
connected = Async::Promise.new
26-
expect(client).to receive(:connected!) {|connection| connected.resolve(true)}
26+
expect(client).to receive(:connected!){|connection| connected.resolve(true)}
2727

2828
client_task = client.run
2929

test/async/container/memory_monitor.rb

Lines changed: 259 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,280 @@
1414
let(:monitor) {subject.new(interval: 1, memory_sample: {duration: 1, timeout: 5})}
1515
it_behaves_like Async::Container::Supervisor::AMonitor
1616

17+
# Mock connection object for testing
18+
let(:mock_connection) do
19+
Object.new.tap do |connection|
20+
def connection.state
21+
@state ||= {}
22+
end
23+
end
24+
end
25+
26+
with "#register" do
27+
it "adds process to cluster when registering first connection" do
28+
process_id = 12345
29+
mock_connection.state[:process_id] = process_id
30+
31+
expect(monitor.cluster).to receive(:add).with(process_id)
32+
33+
monitor.register(mock_connection)
34+
35+
# With mutex, operation happens immediately
36+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
37+
end
38+
39+
it "does not add process to cluster when connection already exists" do
40+
process_id = 12345
41+
mock_connection.state[:process_id] = process_id
42+
43+
# Register first connection
44+
monitor.register(mock_connection)
45+
46+
# Verify process was added
47+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
48+
49+
# Count how many processes are in cluster before second registration
50+
process_count_before = monitor.cluster.processes.size
51+
52+
# Register second connection to same process - should not add again
53+
mock_connection2 = Object.new.tap do |conn|
54+
def conn.state
55+
@state ||= {process_id: 12345}
56+
end
57+
end
58+
59+
monitor.register(mock_connection2)
60+
61+
# Process count should not increase (no new add operation)
62+
process_count_after = monitor.cluster.processes.size
63+
expect(process_count_after).to be == process_count_before
64+
end
65+
66+
it "handles connection without process_id gracefully" do
67+
mock_connection.state.clear
68+
69+
# Count processes before
70+
process_count_before = monitor.cluster.processes.size
71+
72+
# Should not raise an error or call add
73+
monitor.register(mock_connection)
74+
75+
# Process count should not change
76+
process_count_after = monitor.cluster.processes.size
77+
expect(process_count_after).to be == process_count_before
78+
end
79+
end
80+
81+
with "#remove" do
82+
it "removes process from cluster when removing last connection" do
83+
process_id = 12345
84+
mock_connection.state[:process_id] = process_id
85+
86+
# Register first
87+
monitor.register(mock_connection)
88+
89+
# Verify process was added
90+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
91+
92+
# Remove
93+
expect(monitor.cluster).to receive(:remove).with(process_id)
94+
95+
monitor.remove(mock_connection)
96+
97+
# With mutex, operation happens immediately
98+
expect(monitor.cluster.processes.keys).not.to be(:include?, process_id)
99+
end
100+
101+
it "does not remove process from cluster when other connections exist" do
102+
process_id = 12345
103+
mock_connection.state[:process_id] = process_id
104+
105+
# Register first connection
106+
monitor.register(mock_connection)
107+
108+
# Register second connection
109+
mock_connection2 = Object.new.tap do |conn|
110+
def conn.state
111+
@state ||= {process_id: 12345}
112+
end
113+
end
114+
monitor.register(mock_connection2)
115+
116+
# Verify process is still in cluster
117+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
118+
119+
# Count processes before removal
120+
process_count_before = monitor.cluster.processes.size
121+
122+
# Remove first connection - should not remove (other connection exists)
123+
monitor.remove(mock_connection)
124+
125+
# Process count should not decrease
126+
process_count_after = monitor.cluster.processes.size
127+
expect(process_count_after).to be == process_count_before
128+
129+
# Process should still be in cluster
130+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
131+
end
132+
133+
it "handles connection without process_id gracefully" do
134+
mock_connection.state.clear
135+
136+
# Count processes before
137+
process_count_before = monitor.cluster.processes.size
138+
139+
# Should not raise an error or call remove
140+
monitor.remove(mock_connection)
141+
142+
# Process count should not change
143+
process_count_after = monitor.cluster.processes.size
144+
expect(process_count_after).to be == process_count_before
145+
end
146+
end
147+
148+
with "mutex serialization" do
149+
include Sus::Fixtures::Async::SchedulerContext
150+
151+
it "serializes register operations with cluster check" do
152+
process_id = 12345
153+
mock_connection.state[:process_id] = process_id
154+
155+
# Register connection - with mutex, operation happens immediately
156+
monitor.register(mock_connection)
157+
158+
# Verify process was added immediately
159+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
160+
end
161+
162+
it "serializes multiple register/remove operations" do
163+
process_id1 = 11111
164+
process_id2 = 22222
165+
166+
conn1 = Object.new.tap do |c|
167+
def c.state
168+
@state ||= {process_id: 11111}
169+
end
170+
end
171+
conn2 = Object.new.tap do |c|
172+
def c.state
173+
@state ||= {process_id: 22222}
174+
end
175+
end
176+
177+
# Register both connections - operations happen immediately with mutex
178+
monitor.register(conn1)
179+
monitor.register(conn2)
180+
181+
# Verify both processes were added
182+
expect(monitor.cluster.processes.keys).to be(:include?, process_id1)
183+
expect(monitor.cluster.processes.keys).to be(:include?, process_id2)
184+
185+
# Remove first connection
186+
monitor.remove(conn1)
187+
188+
# Verify operations were executed: process_id2 should still be there, process_id1 should be removed
189+
expect(monitor.cluster.processes.keys).to be(:include?, process_id2)
190+
expect(monitor.cluster.processes.keys).not.to be(:include?, process_id1)
191+
end
192+
end
193+
194+
with "#memory_leak_detected" do
195+
let(:monitor_without_sample) {subject.new(interval: 1)}
196+
197+
it "kills process when memory leak is detected" do
198+
process_id = 12345
199+
mock_monitor = Object.new
200+
201+
expect(Process).to receive(:kill).with(:INT, process_id)
202+
203+
result = monitor_without_sample.memory_leak_detected(process_id, mock_monitor)
204+
205+
expect(result).to be == true
206+
end
207+
208+
it "handles already-dead process gracefully" do
209+
process_id = 99999 # Non-existent process
210+
mock_monitor = Object.new
211+
212+
expect(Process).to receive(:kill).with(:INT, process_id).and_raise(Errno::ESRCH)
213+
214+
# Should not raise an error
215+
result = monitor_without_sample.memory_leak_detected(process_id, mock_monitor)
216+
217+
expect(result).to be == true
218+
end
219+
220+
it "captures memory sample when enabled" do
221+
process_id = 12345
222+
mock_monitor = Object.new
223+
mock_connection.state[:process_id] = process_id
224+
225+
# Register connection
226+
monitor.register(mock_connection)
227+
228+
# Mock the connection call
229+
expect(mock_connection).to receive(:call).with(
230+
do: :memory_sample,
231+
duration: 1,
232+
timeout: 5
233+
).and_return({data: "sample data"})
234+
235+
expect(Process).to receive(:kill).with(:INT, process_id)
236+
237+
monitor.memory_leak_detected(process_id, mock_monitor)
238+
end
239+
end
240+
17241
with "#run" do
18242
include Sus::Fixtures::Async::SchedulerContext
19243

20244
it "can run the monitor" do
21245
task = monitor.run
22246
expect(task).to be(:running?)
247+
ensure
248+
task&.stop
23249
end
24250

25251
it "can handle failures" do
26-
expect(monitor.cluster).to receive(:check!).and_raise(Errno::ESRCH)
252+
checked = Async::Promise.new
253+
254+
# The monitor should continue running even if check! raises errors
255+
# Loop.run handles errors internally, so we just verify the task stays running
256+
expect(monitor.cluster).to receive(:check!){checked.resolve(true); raise Errno::ESRCH}
27257

28258
task = monitor.run
29259
expect(task).to be(:running?)
30260

31-
sleep 1
261+
# Wait for iterations - Loop.run catches errors and continues
262+
checked.wait
32263

264+
# Task should still be running despite errors
265+
# (Loop.run catches exceptions and logs them, then continues)
33266
expect(task).to be(:running?)
267+
ensure
268+
task&.stop
269+
end
270+
271+
it "serializes register operations with cluster check" do
272+
process_id = 12345
273+
mock_connection.state[:process_id] = process_id
274+
275+
# Register connection - with mutex, operation happens immediately
276+
monitor.register(mock_connection)
277+
278+
# Verify process was added immediately (mutex ensures this happens synchronously)
279+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
280+
281+
# Start the monitor - check! takes the same mutex as register/remove, so it cannot overlap a cluster modification
282+
task = monitor.run
283+
284+
# Give it a moment to run
285+
reactor.sleep(0.1)
286+
287+
# Process should still be in cluster (check! doesn't remove it unless there's a leak)
288+
expect(monitor.cluster.processes.keys).to be(:include?, process_id)
289+
ensure
290+
task&.stop
34291
end
35292
end
36293
end

0 commit comments

Comments
 (0)