Skip to content

Commit db34597

Browse files
aviateskclaude
andauthored
cancellation: fix memory leak in request cancellation handling (#260)
Implement `FixedSizeFIFOQueue` to keep track of recently handled request IDs and prevent dead IDs from accumulating in `currently_handled` when clients send `CancelRequestNotification` for already processed requests. The `handled_history` queue maintains the last `128` processed IDs with O(1) membership testing, allowing the server to efficiently ignore duplicate cancellation requests while preventing unbounded memory growth. Co-authored-by: Claude <noreply@anthropic.com>
1 parent 4a681bf commit db34597

5 files changed

Lines changed: 234 additions & 5 deletions

File tree

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
mutable struct FixedSizeFIFOQueue{T}
2+
const data::Vector{T}
3+
head::Int
4+
tail::Int
5+
size::Int
6+
const capacity::Int
7+
const items::Set{T}
8+
9+
function FixedSizeFIFOQueue{T}(capacity::Int) where T
10+
capacity > 0 || throw(ArgumentError("Capacity must be positive"))
11+
data = Vector{T}(undef, capacity)
12+
new{T}(data, 1, 1, 0, capacity, Set{T}())
13+
end
14+
FixedSizeFIFOQueue(capacity::Int) = FixedSizeFIFOQueue{Any}(capacity)
15+
end
16+
17+
function Base.push!(queue::FixedSizeFIFOQueue{T}, item::T) where T
18+
if queue.size == queue.capacity
19+
# Queue is full, overwrite oldest element
20+
old_item = queue.data[queue.head]
21+
delete!(queue.items, old_item)
22+
queue.data[queue.tail] = item
23+
push!(queue.items, item)
24+
queue.head = mod1(queue.head + 1, queue.capacity)
25+
queue.tail = mod1(queue.tail + 1, queue.capacity)
26+
else
27+
# Queue has space
28+
queue.data[queue.tail] = item
29+
push!(queue.items, item)
30+
queue.tail = mod1(queue.tail + 1, queue.capacity)
31+
queue.size += 1
32+
end
33+
return queue
34+
end
35+
36+
Base.in(item::T, queue::FixedSizeFIFOQueue{T}) where T = item in queue.items
37+
38+
Base.isempty(queue::FixedSizeFIFOQueue) = queue.size == 0
39+
40+
isfull(queue::FixedSizeFIFOQueue) = queue.size == queue.capacity
41+
42+
Base.length(queue::FixedSizeFIFOQueue) = queue.size
43+
44+
capacity(queue::FixedSizeFIFOQueue) = queue.capacity
45+
46+
function Base.collect(queue::FixedSizeFIFOQueue{T}) where T
47+
result = Vector{T}(undef, queue.size)
48+
idx = queue.head
49+
for i in 1:queue.size
50+
result[i] = queue.data[idx]
51+
idx = mod1(idx + 1, queue.capacity)
52+
end
53+
return result
54+
end
55+
56+
function Base.show(io::IO, queue::FixedSizeFIFOQueue{T}) where T
57+
items = collect(queue)
58+
print(io, "FixedSizeFIFOQueue{$T}(capacity=$(queue.capacity), items=$items)")
59+
end

src/JETLS.jl

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Analyzer.LSAnalyzer(args...; kwargs...) = LSAnalyzer(ScriptAnalysisEntry(filepat
4343
include("analysis/resolver.jl")
4444

4545
include("AtomicContainers/AtomicContainers.jl")
46+
include("FixedSizeFIFOQueue/FixedSizeFIFOQueue.jl")
4647
using .AtomicContainers
4748
const SWStats = JETLS_DEV_MODE ? AtomicContainers.SWStats : Nothing
4849
const LWStats = JETLS_DEV_MODE ? AtomicContainers.LWStats : Nothing
@@ -212,11 +213,6 @@ function handle_sequential_message(server::Server, @nospecialize msg)
212213
end
213214
end
214215

215-
# TODO This cancellation handling still has the following cleanup issues not yet implemented:
216-
# - When a client mistakenly sends a `CancelRequestNotification` for an already processed request,
217-
# it will register a dead ID in `currently_handled`. A fixed size FIFO queue to record
218-
# already handled message IDs may solve this problem in many cases.
219-
220216
struct ConcurrentMessageHandler
221217
queue::Channel{Any}
222218
end
@@ -226,12 +222,20 @@ end
226222
function (dispatcher::ConcurrentMessageHandler)(server::Server, @nospecialize msg)
227223
# Handle `currently_handled` processing serially within the concurrent message worker thread
228224
if msg isa CancelRequestNotification
225+
if msg.params.id in server.state.handled_history
226+
return # Request was already handled, ignore cancellation
227+
end
229228
cancel!(get!(()->CancelFlag(true), server.state.currently_handled, msg.params.id))
230229
elseif msg isa WorkDoneProgressCancelNotification
230+
if msg.params.token in server.state.handled_history
231+
return # Token was already handled, ignore cancellation
232+
end
231233
cancel!(get!(()->CancelFlag(true), server.state.currently_handled, msg.params.token))
232234
elseif msg isa HandledToken
233235
delete!(server.state.currently_handled, msg.id)
236+
push!(server.state.handled_history, msg.id) # Add to handled history to prevent dead IDs from accumulating
234237
# @info "Remaining requests" length(server.state.currently_handled) Base.summarysize(server.state.currently_handled)
238+
# @info "Handled history" length(server.state.handled_history) Base.summarysize(server.state.handled_history)
235239
# Handle regular messages concurrently
236240
elseif msg isa Dict{Symbol,Any} # ResponseMessage or untyped message
237241
request_caller = let id = get(msg, :id, nothing)

src/types.jl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,13 +371,16 @@ const CompletionResolverInfo = CASContainer{Union{Nothing,Tuple{Module,LSPostPro
371371
# Type aliases for concurrent updates using LWContainer (non-retriable operations)
372372
const ConfigManager = LWContainer{ConfigManagerData, LWStats}
373373

374+
const HandledHistory = FixedSizeFIFOQueue{Union{Int,String}}
375+
374376
mutable struct ServerState
375377
const file_cache::FileCache # syntactic analysis cache (synced with `textDocument/didChange`)
376378
const saved_file_cache::SavedFileCache # syntactic analysis cache (synced with `textDocument/didSave`)
377379
const testsetinfos_cache::TestsetInfosCache
378380
const analysis_manager::AnalysisManager
379381
const extra_diagnostics::ExtraDiagnostics
380382
const currently_handled::CurrentlyHandled
383+
const handled_history::HandledHistory
381384
const currently_requested::CurrentlyRequested
382385
const currently_registered::CurrentlyRegistered
383386
const config_manager::ConfigManager
@@ -397,6 +400,7 @@ mutable struct ServerState
397400
#=analysis_manager=# AnalysisManager(#=n_workers=# 1), # TODO multiple workers
398401
#=extra_diagnostics=# ExtraDiagnostics(ExtraDiagnosticsData()),
399402
#=currently_handled=# CurrentlyHandled(),
403+
#=handled_history=# HandledHistory(128),
400404
#=currently_requested=# CurrentlyRequested(Base.PersistentDict{String,RequestCaller}()),
401405
#=currently_registered=# CurrentlyRegistered(Set{Registered}()),
402406
#=config_manager=# ConfigManager(ConfigManagerData()),
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
module test_FixedSizeFIFOQueue
2+
3+
using Test
4+
using JETLS: FixedSizeFIFOQueue, capacity, isfull
5+
6+
struct TestType
7+
x::Int
8+
end
9+
10+
mutable struct MutableTestType
11+
x::Int
12+
data::Vector{Int}
13+
end
14+
15+
@testset "FixedSizeFIFOQueue basic operations" begin
16+
let q = FixedSizeFIFOQueue{Int}(3)
17+
@test isempty(q)
18+
@test !isfull(q)
19+
@test length(q) == 0
20+
@test capacity(q) == 3
21+
22+
push!(q, 1)
23+
@test !isempty(q)
24+
@test length(q) == 1
25+
@test 1 in q
26+
@test !(2 in q)
27+
28+
push!(q, 2)
29+
push!(q, 3)
30+
@test isfull(q)
31+
@test length(q) == 3
32+
@test 1 in q && 2 in q && 3 in q
33+
end
34+
35+
let q = FixedSizeFIFOQueue{Int}(3)
36+
push!(q, 1)
37+
push!(q, 2)
38+
push!(q, 3)
39+
push!(q, 4) # Should overwrite 1
40+
@test isfull(q)
41+
@test length(q) == 3
42+
@test !(1 in q)
43+
@test 2 in q && 3 in q && 4 in q
44+
45+
push!(q, 5) # Should overwrite 2
46+
@test !(2 in q)
47+
@test 3 in q && 4 in q && 5 in q
48+
end
49+
end
50+
51+
@testset "FixedSizeFIFOQueue type flexibility" begin
52+
let q = FixedSizeFIFOQueue(2)
53+
push!(q, "hello")
54+
push!(q, 42)
55+
@test "hello" in q
56+
@test 42 in q
57+
58+
push!(q, :symbol)
59+
@test !("hello" in q)
60+
@test 42 in q
61+
@test :symbol in q
62+
end
63+
64+
let q = FixedSizeFIFOQueue{TestType}(2)
65+
t1 = TestType(1)
66+
t2 = TestType(2)
67+
t3 = TestType(3)
68+
69+
push!(q, t1)
70+
push!(q, t2)
71+
@test t1 in q
72+
@test t2 in q
73+
74+
push!(q, t3)
75+
@test !(t1 in q)
76+
@test t2 in q
77+
@test t3 in q
78+
end
79+
end
80+
81+
@testset "FixedSizeFIFOQueue edge cases" begin
82+
let q = FixedSizeFIFOQueue{String}(1)
83+
push!(q, "a")
84+
@test "a" in q
85+
push!(q, "b")
86+
@test !("a" in q)
87+
@test "b" in q
88+
end
89+
90+
@test_throws ArgumentError FixedSizeFIFOQueue{Int}(0)
91+
@test_throws ArgumentError FixedSizeFIFOQueue{Int}(-1)
92+
end
93+
94+
@testset "FixedSizeFIFOQueue collect and display" begin
95+
let q = FixedSizeFIFOQueue{Int}(3)
96+
push!(q, 1)
97+
push!(q, 2)
98+
push!(q, 3)
99+
100+
items = collect(q)
101+
@test items == [1, 2, 3]
102+
103+
push!(q, 4) # Overwrites 1
104+
items = collect(q)
105+
@test items == [2, 3, 4]
106+
107+
str = string(q)
108+
@test occursin("FixedSizeFIFOQueue{Int", str)
109+
@test occursin("capacity=3", str)
110+
@test occursin("[2, 3, 4]", str)
111+
end
112+
end
113+
114+
@testset "FixedSizeFIFOQueue large capacity" begin
115+
let q = FixedSizeFIFOQueue{Union{Int,String}}(1000)
116+
for i in 1:500
117+
push!(q, i)
118+
end
119+
for i in 501:1000
120+
push!(q, "id-$i")
121+
end
122+
123+
@test isfull(q)
124+
@test 1 in q
125+
@test 500 in q
126+
@test "id-501" in q
127+
@test "id-1000" in q
128+
129+
push!(q, "extra-1")
130+
@test !(1 in q) # First element should be evicted
131+
@test 2 in q # Second element should still be there
132+
@test "extra-1" in q
133+
134+
for i in 1:100
135+
push!(q, "new-$i")
136+
end
137+
138+
@test !(2 in q)
139+
@test !(100 in q)
140+
@test "new-100" in q
141+
end
142+
end
143+
144+
@testset "FixedSizeFIFOQueue garbage collection" begin
145+
let q = FixedSizeFIFOQueue{MutableTestType}(2)
146+
obj1 = MutableTestType(1, [1,2,3])
147+
obj2 = MutableTestType(2, [4,5,6])
148+
obj3 = MutableTestType(3, [7,8,9])
149+
150+
push!(q, obj1)
151+
push!(q, obj2)
152+
push!(q, obj3)
153+
154+
@test obj2 in q
155+
@test obj3 in q
156+
@test !(obj1 in q.data)
157+
@test !(obj1 in q.items)
158+
end
159+
end
160+
161+
end # module test_FixedSizeFIFOQueue

test/runtests.jl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ include("setup.jl")
77
end
88

99
@testset "AtomicContainers" include("AtomicContainers/test_AtomicContainers.jl")
10+
@testset "FixedSizeFIFOQueue" include("FixedSizeFIFOQueue/test_FixedSizeFIFOQueue.jl")
1011

1112
@testset "JETLS" begin
1213
@testset "utils" begin

0 commit comments

Comments
 (0)