Commit 44d8445

andreasronge and claude committed
fix(mcp_server): unblock streaming MCP clients (Claude Code, Inspector)
The stdio loop hung every streaming MCP client because `IO.binread(io, 4096)` ran inline in the GenServer and only returned at 4096 bytes or EOF. Real clients (Claude Code, Claude Desktop, Cursor, Cline, Inspector) write one ~250-byte `initialize` line and wait for a reply, so the read never returned and worker `:async_reply` messages couldn't be drained from the mailbox. The temp-file integration suite missed it because file-backed pipes always EOF.

Switch to line-mode reads done in a dedicated `spawn_monitor`'d reader process that forwards each frame as a `{:stdin_line, ...}` message; the GenServer is never IO-blocked and async replies interleave naturally.

Treat EOF-during-`exit`-drain as a no-op so the grace timer can finish delivering in-flight replies (the old temp-file `tools/call` test passed by timing luck once reads were serial). Cancel the grace timer on clean drain to avoid an orphan `:exit_grace_elapsed` message. Add a `terminate/2` that kills the orphaned reader on test-mode stop.

New regression test (`streaming_stdio_test.exs`) drives the release as an Erlang `Port` with a persistent stdin and exercises both `initialize` and `tools/call (+ 1 2 3)`. Without the fix, this hangs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
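The reader-process pattern described in the commit message can be sketched in isolation. This is a minimal sketch under stated assumptions, not the code in `stdio.ex`: the module name `ReaderSketch` and the helper `collect/1` are hypothetical, and a `StringIO` device stands in for real stdin.

```elixir
defmodule ReaderSketch do
  # Hypothetical module for illustration only; not part of the commit.

  # Spawn a monitored (not linked) reader that owns the blocking read,
  # so the caller's mailbox stays free for other messages.
  def start(io) do
    parent = self()
    spawn_monitor(fn -> loop(io, parent) end)
  end

  defp loop(io, parent) do
    case IO.binread(io, :line) do
      :eof ->
        send(parent, {:stdin_line, :eof})

      {:error, reason} ->
        send(parent, {:stdin_line, {:error, reason}})

      data when is_binary(data) ->
        send(parent, {:stdin_line, {:data, data}})
        loop(io, parent)
    end
  end

  # Hypothetical helper: gather forwarded lines until EOF or an error.
  def collect(acc \\ []) do
    receive do
      {:stdin_line, {:data, line}} -> collect([line | acc])
      {:stdin_line, :eof} -> {:eof, Enum.reverse(acc)}
      {:stdin_line, {:error, reason}} -> {:error, reason, Enum.reverse(acc)}
    after
      1_000 -> {:timeout, Enum.reverse(acc)}
    end
  end
end

# StringIO stands in for stdin; each line keeps its trailing "\n".
{:ok, io} = StringIO.open("first frame\nsecond frame\n")
{_pid, _ref} = ReaderSketch.start(io)
IO.inspect(ReaderSketch.collect())
```

Because the reader is monitored rather than linked, the caller also receives a `:DOWN` message when the reader exits, which it can use to detect a crash in the middle of a read.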
1 parent d38c985 commit 44d8445

4 files changed

Lines changed: 349 additions & 36 deletions

Plans/ptc-runner-mcp-server.md

Lines changed: 3 additions & 1 deletion
@@ -367,8 +367,10 @@ program completes; ordering is not preserved.

 | Event | Server behavior |
 |---|---|
-| stdin EOF | Cancel all in-flight sandboxes; emit no further responses; exit 0. |
+| stdin EOF (no `exit` drain in progress) | Cancel all in-flight sandboxes; emit no further responses; exit 0. |
+| stdin EOF (during `exit` drain) | Defer to row 2 — the grace-period drain finishes first. File-backed clients (and `Port`-driven test runners) hit EOF the moment after writing the `exit` frame; tearing down here would race the in-flight reply. |
 | `shutdown` request (if sent) | Reply `null`; transition to drain; on subsequent `exit` notification, exit 0. |
+| `exit` notification (workers in flight) | Set `exit_pending`; schedule a 2 s grace timer; once `in_flight` drains, reply-then-stop and cancel the timer. If grace elapses, force-kill workers and stop. |
 | `notifications/cancelled` for in-flight ID | Kill the sandbox process; emit no response for that ID. |
 | `notifications/cancelled` for unknown/already-completed ID | Ignore silently. |
 | Unhandled BEAM crash in request handler | Log to stderr; emit `-32603 Internal error` referencing the request ID; continue serving. |
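
The two stdin-EOF rows in the table above amount to a single branch on `exit_pending`. The following is a rough sketch of that decision only, assuming a reduced state map with just `exit_pending` and `in_flight`; the module name `EofPolicySketch` is hypothetical, and the real handlers also log, notify an observer, and kill workers.

```elixir
defmodule EofPolicySketch do
  # Hypothetical, reduced model of the two EOF rows; plain maps stand in
  # for the real State struct.

  # EOF during an `exit` drain: keep in-flight work and let the grace
  # timer (or a clean drain) stop the server.
  def on_eof(%{exit_pending: true} = state), do: {:noreply, state}

  # EOF otherwise: drop all in-flight work and stop.
  def on_eof(%{exit_pending: false} = state) do
    {:stop, :normal, %{state | in_flight: %{}}}
  end
end

draining = %{exit_pending: true, in_flight: %{1 => :worker}}
idle = %{exit_pending: false, in_flight: %{2 => :worker}}

IO.inspect(EofPolicySketch.on_eof(draining))
IO.inspect(EofPolicySketch.on_eof(idle))
```

The first clause makes progress only if something else eventually stops the server, which is why the table pairs it with the grace timer scheduled by the `exit` notification row.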

mcp_server/lib/ptc_runner_mcp/stdio.ex

Lines changed: 165 additions & 31 deletions
@@ -52,7 +52,9 @@ defmodule PtcRunnerMcp.Stdio do
             observer: pid() | nil,
             in_flight: %{optional(term()) => %{pid: pid(), ref: reference()}},
             workers: %{optional(pid()) => term()},
-            exit_pending: boolean()
+            exit_pending: boolean(),
+            reader: {pid(), reference()} | nil,
+            exit_grace_ref: reference() | nil
           }

     defstruct io: :stdio,
@@ -83,7 +85,17 @@ defmodule PtcRunnerMcp.Stdio do
               workers: %{},
               # Set after `exit` notification arrives; once `in_flight`
               # is empty (or grace period elapses) we stop.
-              exit_pending: false
+              exit_pending: false,
+              # `{pid, monitor_ref}` of the dedicated stdin reader spawned
+              # by `init/1` when `auto_read: true`. `nil` when reading is
+              # driven externally (tests using `feed/2`).
+              reader: nil,
+              # Reference of the in-flight `Process.send_after/3` grace
+              # timer scheduled in `apply_lifecycle(:exit)`. We cancel it
+              # in `maybe_finalize_exit/1` once `in_flight` empties so
+              # `:exit_grace_elapsed` doesn't leak into the mailbox after
+              # a clean drain (Codex review of streaming-stdio fix).
+              exit_grace_ref: nil
   end

   @doc """
@@ -122,41 +134,100 @@ defmodule PtcRunnerMcp.Stdio do
       auto_read: Keyword.get(opts, :auto_read, true)
     }

-    if state.auto_read, do: send(self(), :read)
+    # Run the blocking `IO.binread/2` in a dedicated reader process
+    # rather than this GenServer. If we read inline, `IO.binread`
+    # blocks the GenServer's receive loop, so worker `:async_reply`
+    # messages sit unread in the mailbox until the next stdin line
+    # arrives — that hangs every `tools/call` reply for streaming
+    # clients (which only write one frame and wait). The reader
+    # forwards each line as `{:stdin_line, {:data, line}}` and exits
+    # `:normal` after sending `{:stdin_line, :eof}`. `spawn_monitor`
+    # (no link) matches the worker pattern: a reader crash cannot
+    # take stdio down — the `:DOWN` is converted to a synthetic
+    # `{:stdin_line, {:error, reason}}` so the existing read-error
+    # path runs.
+    state =
+      if state.auto_read do
+        io = state.io
+        parent = self()
+        {pid, ref} = spawn_monitor(fn -> reader_loop(io, parent) end)
+        %{state | reader: {pid, ref}}
+      else
+        state
+      end
+
     {:ok, state}
   end

-  @impl GenServer
-  def handle_info(:read, %State{io: io} = state) do
-    case IO.binread(io, 4096) do
+  # `IO.binread(io, :line)` returns each line **with** its trailing
+  # `\n`, which `process_chunk/2` then walks byte-by-byte to flush
+  # `state.buffer`. If the underlying device is ever switched to a
+  # mode that strips the LF, the line walker will silently buffer
+  # without dispatching — keep this contract in sync.
+  defp reader_loop(io, parent) do
+    case IO.binread(io, :line) do
       :eof ->
-        Log.log(:info, "stdin_eof")
-        # § 6.4 row 1: cancel all in-flight workers, no further replies.
-        state = cancel_all_workers(state, :stdin_eof)
-        notify_observer(state, {:exited, :eof})
+        send(parent, {:stdin_line, :eof})

-        if state.observer == nil do
-          System.stop(0)
-        end
+      {:error, reason} ->
+        send(parent, {:stdin_line, {:error, reason}})

-        {:stop, :normal, state}
+      data when is_binary(data) ->
+        send(parent, {:stdin_line, {:data, data}})
+        reader_loop(io, parent)
+    end
+  end

-      {:error, reason} ->
-        Log.log(:error, "stdin_read_error", %{reason: inspect(reason)})
-        state = cancel_all_workers(state, {:read_error, reason})
-        notify_observer(state, {:exited, {:error, reason}})
+  @impl GenServer
+  def handle_info({:stdin_line, :eof}, %State{exit_pending: true} = state) do
+    # An `exit` notification is already draining in-flight workers
+    # under a grace period (§ 6.4 row 2). Don't tear them down on EOF
+    # too — the file-backed temp-file integration runner always hits
+    # EOF the moment after writing the `exit` frame. The drain logic
+    # will stop us cleanly via `maybe_finalize_exit/1` (or
+    # `:exit_grace_elapsed` as a safety net).
+    #
+    # Invariant: when `exit_pending: true`, a live grace timer MUST
+    # exist — that's the only forward-progress guarantee once we
+    # ignore EOF. Any future change that cancels the timer without
+    # also clearing `exit_pending` would hang the server here.
+    true = is_reference(state.exit_grace_ref)
+    Log.log(:debug, "stdin_eof_during_exit_drain")
+    {:noreply, state}
+  end

-        if state.observer == nil do
-          System.stop(0)
-        end
+  # Anything buffered in `state.buffer` at this point is an
+  # unterminated partial line: silently drop it. § 6.4 row 1's
+  # invariant ("no further responses on EOF") means we wouldn't reply
+  # to it anyway, parse-error or otherwise.

-        {:stop, :normal, state}
+  def handle_info({:stdin_line, :eof}, state) do
+    Log.log(:info, "stdin_eof")
+    # § 6.4 row 1: cancel all in-flight workers, no further replies.
+    state = cancel_all_workers(state, :stdin_eof)
+    notify_observer(state, {:exited, :eof})

-      data when is_binary(data) ->
-        new_state = process_chunk(state, data)
-        send(self(), :read)
-        {:noreply, new_state}
+    if state.observer == nil do
+      System.stop(0)
     end
+
+    {:stop, :normal, state}
+  end
+
+  def handle_info({:stdin_line, {:error, reason}}, state) do
+    Log.log(:error, "stdin_read_error", %{reason: inspect(reason)})
+    state = cancel_all_workers(state, {:read_error, reason})
+    notify_observer(state, {:exited, {:error, reason}})
+
+    if state.observer == nil do
+      System.stop(0)
+    end
+
+    {:stop, :normal, state}
+  end
+
+  def handle_info({:stdin_line, {:data, data}}, state) do
+    {:noreply, process_chunk(state, data)}
   end

   # Worker reply: write the success_reply, demonitor, release permit,
@@ -185,14 +256,38 @@ defmodule PtcRunnerMcp.Stdio do
     end
   end

+  # `:DOWN` from either the dedicated stdin reader or a per-call
+  # worker. Reader DOWN gets dispatched first (it's the rarer case
+  # and a one-shot identity check), then we fall through to the
+  # worker dispatch table.
+  @impl GenServer
+  def handle_info({:DOWN, ref, :process, _pid, reason}, %State{reader: {_, ref}} = state)
+      when is_reference(ref) do
+    state = %{state | reader: nil}
+
+    case reason do
+      :normal ->
+        # Reader exited cleanly after sending `{:stdin_line, :eof}` /
+        # `{:stdin_line, {:error, _}}`. The corresponding handler
+        # already ran (or is queued). Nothing else to do.
+        {:noreply, state}
+
+      _crash ->
+        # Reader crashed mid-`IO.binread`. Synthesize a read error so
+        # the existing shutdown path runs — same observable behavior
+        # as the OTP IO server returning `{:error, reason}`.
+        send(self(), {:stdin_line, {:error, reason}})
+        {:noreply, state}
+    end
+  end
+
   # Worker DOWN: the worker process exited. Either:
   # * we already removed it via :async_reply (impossible — we
   # demonitor with :flush on that path) — covered by `:error` arm.
   # * the worker crashed (release permit, optionally reply with
   # internal error if it never sent {:async_reply}).
   # * the worker was killed via `notifications/cancelled` (release
   # permit, NO reply per § 6.4 row 3).
-  @impl GenServer
   def handle_info({:DOWN, _ref, :process, pid, reason}, state) do
     case Map.fetch(state.workers, pid) do
       {:ok, request_id} ->
@@ -247,6 +342,21 @@ defmodule PtcRunnerMcp.Stdio do

   def handle_info({:exit_grace_elapsed, _ref}, state), do: {:noreply, state}

+  # On clean stop, the reader (if any) is still parked in
+  # `IO.binread/2`. In production the BEAM is `System.stop`'d so the
+  # whole VM exits and the reader dies with it. In tests we stop the
+  # GenServer normally — without a `Process.exit/2` here the reader
+  # would leak until its underlying `:io` device closes (often never,
+  # for `StringIO`).
+  @impl GenServer
+  def terminate(_reason, %State{reader: {pid, ref}}) do
+    Process.demonitor(ref, [:flush])
+    Process.exit(pid, :kill)
+    :ok
+  end
+
+  def terminate(_reason, _state), do: :ok
+
   # ----------------------------------------------------------------
   # Public test entry: feed a chunk of bytes and run dispatch inline.
   # ----------------------------------------------------------------
@@ -418,9 +528,8 @@ defmodule PtcRunnerMcp.Stdio do
       # stdin chunk (codex review of 0fe4c78). Phase 1's invariant
       # "no work after exit" must hold even when in-flight workers
       # are still completing.
-      ref = make_ref()
-      Process.send_after(self(), {:exit_grace_elapsed, ref}, @exit_grace_ms)
-      %{state | exit_pending: true, exited: true}
+      ref = Process.send_after(self(), {:exit_grace_elapsed, make_ref()}, @exit_grace_ms)
+      %{state | exit_pending: true, exited: true, exit_grace_ref: ref}
     end
   end

@@ -592,11 +701,14 @@ defmodule PtcRunnerMcp.Stdio do

   # If we're in `exit_pending` (saw `exit` with workers in flight)
   # and `in_flight` is now empty, finalize exit: notify observer,
-  # System.stop in production.
+  # System.stop in production. Cancel the grace-period timer so a
+  # late `:exit_grace_elapsed` doesn't land in the mailbox after a
+  # clean drain.
   defp maybe_finalize_exit(%State{exit_pending: true, in_flight: m} = state)
        when map_size(m) == 0 do
     Log.log(:info, "exit_drained", %{})
     notify_observer(state, {:exited, :exit_method})
+    state = cancel_exit_grace_timer(state)

     if state.observer == nil do
       System.stop(0)
@@ -607,6 +719,28 @@ defmodule PtcRunnerMcp.Stdio do

   defp maybe_finalize_exit(state), do: state

+  defp cancel_exit_grace_timer(%State{exit_grace_ref: nil} = state), do: state
+
+  defp cancel_exit_grace_timer(%State{exit_grace_ref: ref} = state) when is_reference(ref) do
+    # `Process.cancel_timer/1` returns `false` if the message has
+    # already been delivered. Flush it from the mailbox so the
+    # catch-all clause at `handle_info({:exit_grace_elapsed, _}, …)`
+    # never sees a stale reference after we've finalized.
+    case Process.cancel_timer(ref) do
+      false ->
+        receive do
+          {:exit_grace_elapsed, _ref} -> :ok
+        after
+          0 -> :ok
+        end
+
+      _ms ->
+        :ok
+    end
+
+    %{state | exit_grace_ref: nil}
+  end
+
   # ----------------------------------------------------------------
   # JSON-RPC reply construction (kept here so JsonRpc stays pure).
   # ----------------------------------------------------------------
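
The cancel-then-flush idiom used for the grace timer can be tried on its own. This is a small sketch, assuming only the `{:exit_grace_elapsed, tag}` message shape from the diff; the module name `GraceTimerSketch` and its return atoms are hypothetical and exist only to make the three outcomes visible.

```elixir
defmodule GraceTimerSketch do
  # Hypothetical helper for illustration; not part of the commit.

  # Returns :cancelled if the timer was still pending, :flushed if the
  # message had already been delivered and was removed from the mailbox,
  # and :absent if there was neither a pending timer nor a queued message.
  def cancel_and_flush(timer_ref) when is_reference(timer_ref) do
    case Process.cancel_timer(timer_ref) do
      false ->
        receive do
          {:exit_grace_elapsed, _tag} -> :flushed
        after
          0 -> :absent
        end

      remaining_ms when is_integer(remaining_ms) ->
        :cancelled
    end
  end
end

# A timer that has not fired yet is cancelled outright.
pending = Process.send_after(self(), {:exit_grace_elapsed, make_ref()}, 60_000)
IO.inspect(GraceTimerSketch.cancel_and_flush(pending))

# A timer that has already fired leaves a message behind, which is flushed.
fired = Process.send_after(self(), {:exit_grace_elapsed, make_ref()}, 0)
Process.sleep(50)
IO.inspect(GraceTimerSketch.cancel_and_flush(fired))
```

The `after 0` clause keeps the flush non-blocking, so the caller never waits for a message that may already have been handled.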
