Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 24 additions & 11 deletions LSP/src/communication.jl
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,23 @@ mutable struct Endpoint

local endpoint_ref = Ref{Endpoint}()

read_task = Threads.@spawn :interactive while true
msg = @something try
readlsp(in)
catch err
err_handler(#=isread=#true, err, catch_backtrace())
continue
end break # terminate this task loop when the stream is closed
(!isassigned(endpoint_ref) || isopen(endpoint_ref[])) || break
put!(in_msg_queue, msg)
GC.safepoint()
read_task = Threads.@spawn :interactive begin
while true
msg = @something try
readlsp(in)
catch err
err_handler(#=isread=#true, err, catch_backtrace())
continue
end break # terminate this task loop when the stream is closed
(!isassigned(endpoint_ref) || isopen(endpoint_ref[])) || break
put!(in_msg_queue, msg)
GC.safepoint()
end
# Send a sentinel to unblock `take!` in `iterate` — without this,
# the server loop hangs forever when the input stream closes.
# Guard with `isopen` since `close(endpoint)` may have already
# closed the channel during normal shutdown.
isopen(in_msg_queue) && put!(in_msg_queue, nothing)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When read_task exits during normal shutdown, close(endpoint) may have already closed in_msg_queue. Without this guard, put! might throw InvalidStateException on the closed channel. The exception is silently swallowed (since read_task is never waited on), but the guard makes the intent explicit and avoids the unnecessary exception.

end

write_task = Threads.@spawn :interactive for msg in out_msg_queue
Expand Down Expand Up @@ -156,13 +163,19 @@ check_dead_endpoint!(endpoint::Endpoint) = isopen(endpoint) || error("Endpoint i
function Base.flush(endpoint::Endpoint)
check_dead_endpoint!(endpoint)
while isready(endpoint.out_msg_queue)
istaskdone(endpoint.write_task) && break
yield()
end
end

function Base.iterate(endpoint::Endpoint, _=nothing)
isopen(endpoint) || return nothing
return take!(endpoint.in_msg_queue), nothing
msg = take!(endpoint.in_msg_queue)
# `nothing` is a sentinel from `read_task` signaling that the input
# stream has closed (e.g. client process died). End iteration so the
# server loop can proceed to its `finally` cleanup as usual.
msg === nothing && return nothing
return msg, nothing
end

"""
Expand Down
Loading