Skip to content

Commit 9e32ab4

Browse files
committed
Handle premature termination of the stream
1 parent 4bebd4c commit 9e32ab4

File tree

2 files changed

+144
-115
lines changed

2 files changed

+144
-115
lines changed

lib/ex_cmd/stream.ex

Lines changed: 106 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ defmodule ExCmd.Stream do
2626
defmodule Sink do
2727
@moduledoc false
2828

29-
@type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
29+
@type t :: %__MODULE__{process: Process.t()}
3030

31-
defstruct [:process, :ignore_epipe]
31+
defstruct [:process]
3232

3333
defimpl Collectable do
3434
def into(%{process: process}) do
@@ -93,61 +93,62 @@ defmodule ExCmd.Stream do
9393
end
9494

9595
defimpl Enumerable do
96-
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
96+
defmodule State do
97+
@moduledoc false
98+
@enforce_keys [:process, :writer_task]
99+
defstruct [
100+
:process,
101+
:writer_task,
102+
:max_chunk_size,
103+
:exit_timeout,
104+
:ignore_epipe,
105+
:stream_exit_status,
106+
process_status: :running
107+
]
108+
end
109+
97110
def reduce(arg, acc, fun) do
98-
start_fun = fn ->
99-
state = start_process(arg)
100-
{state, :running}
101-
end
111+
start_fun = fn -> start_process(arg) end
112+
next_fun = &read_next/1
113+
after_fun = &cleanup/1
114+
115+
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
116+
end
117+
118+
defp read_next(%State{process_status: :running} = state) do
119+
case Process.read(state.process, state.max_chunk_size) do
120+
{:ok, data} ->
121+
{[IO.iodata_to_binary(data)], state}
122+
123+
:eof ->
124+
state = stop_process(state)
102125

103-
next_fun = fn
104-
{state, :exited} ->
105-
{:halt, {state, :exited}}
106-
107-
{state, exit_state} ->
108-
%{
109-
process: process,
110-
stream_opts: %{
111-
stream_exit_status: stream_exit_status,
112-
max_chunk_size: max_chunk_size
113-
}
114-
} = state
115-
116-
case Process.read(process, max_chunk_size) do
117-
:eof when stream_exit_status == false ->
118-
{:halt, {state, :eof}}
119-
120-
:eof when stream_exit_status == true ->
121-
elem = [await_exit(state, :eof)]
122-
{elem, {state, :exited}}
123-
124-
{:ok, x} ->
125-
elem = [IO.iodata_to_binary(x)]
126-
{elem, {state, exit_state}}
127-
128-
{:error, errno} ->
129-
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
126+
if state.stream_exit_status do
127+
{[state.process_status], state}
128+
else
129+
{:halt, state}
130130
end
131+
132+
{:error, errno} ->
133+
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
131134
end
135+
end
132136

133-
after_fun = fn
134-
{_state, :exited} ->
135-
:ok
137+
defp read_next(%State{} = state), do: {:halt, state}
136138

137-
{state, exit_state} ->
138-
case await_exit(state, exit_state) do
139-
{:exit, {:status, 0}} ->
140-
:ok
139+
defp cleanup(state) do
140+
state = stop_process(state)
141+
raise_on_abnormal_exit(state)
142+
end
141143

142-
{:exit, {:status, exit_status}} ->
143-
raise AbnormalExit, exit_status
144+
defp raise_on_abnormal_exit(%State{stream_exit_status: true}), do: :ok
144145

145-
{:exit, :epipe} ->
146-
raise AbnormalExit, :epipe
147-
end
146+
defp raise_on_abnormal_exit(%State{process_status: status}) do
147+
case status do
148+
{:exit, {:status, 0}} -> :ok
149+
{:exit, {:status, code}} -> raise AbnormalExit, code
150+
{:error, reason} -> raise AbnormalExit, reason
148151
end
149-
150-
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
151152
end
152153

153154
def count(_stream) do
@@ -162,95 +163,85 @@ defmodule ExCmd.Stream do
162163
{:error, __MODULE__}
163164
end
164165

165-
defp start_process(%ExCmd.Stream{
166-
process_opts: process_opts,
167-
stream_opts: stream_opts,
168-
cmd_with_args: cmd_with_args
169-
}) do
170-
process_opts = Keyword.put(process_opts, :stderr, stream_opts[:stderr])
171-
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
172-
sink = %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]}
173-
writer_task = start_input_streamer(sink, stream_opts.input)
174-
175-
%{process: process, stream_opts: stream_opts, writer_task: writer_task}
166+
defp start_process(%ExCmd.Stream{} = stream) do
167+
opts = stream.stream_opts
168+
169+
process_opts = Keyword.put(stream.process_opts, :stderr, opts.stderr)
170+
{:ok, process} = Process.start_link(stream.cmd_with_args, process_opts)
171+
172+
sink = %Sink{process: process}
173+
writer_task = start_input_streamer(sink, opts.input)
174+
175+
%State{
176+
process: process,
177+
writer_task: writer_task,
178+
max_chunk_size: opts.max_chunk_size,
179+
exit_timeout: opts.exit_timeout,
180+
ignore_epipe: opts.ignore_epipe,
181+
stream_exit_status: opts.stream_exit_status
182+
}
176183
end
177184

178-
@doc false
179-
@spec start_input_streamer(term, term) :: Task.t()
180185
defp start_input_streamer(%Sink{process: process} = sink, input) do
181186
case input do
182187
:no_input ->
183-
# use `Task.completed(:ok)` when bumping min Elixir requirement
184188
Task.async(fn -> :ok end)
185189

186190
{:enumerable, enum} ->
187-
Task.async(fn ->
188-
Process.change_pipe_owner(process, :stdin, self())
189-
190-
try do
191-
Enum.into(enum, sink)
192-
rescue
193-
Error ->
194-
{:error, :epipe}
195-
end
196-
end)
191+
stream_to_sink(process, fn -> Enum.into(enum, sink) end)
197192

198193
{:collectable, func} ->
199-
Task.async(fn ->
200-
Process.change_pipe_owner(process, :stdin, self())
201-
202-
try do
203-
func.(sink)
204-
rescue
205-
Error ->
206-
{:error, :epipe}
207-
end
208-
end)
194+
stream_to_sink(process, fn -> func.(sink) end)
209195
end
210196
end
211197

212-
defp await_exit(state, exit_state) do
213-
%{
214-
process: process,
215-
stream_opts: %{ignore_epipe: ignore_epipe, exit_timeout: exit_timeout},
216-
writer_task: writer_task
217-
} = state
218-
219-
result = Process.await_exit(process, exit_timeout)
220-
writer_task_status = Task.await(writer_task)
198+
defp stream_to_sink(process, write_fn) do
199+
Task.async(fn ->
200+
Process.change_pipe_owner(process, :stdin, self())
221201

222-
case {exit_state, result, writer_task_status} do
223-
# if reader exit early and there is a pending write
224-
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
225-
{:exit, {:status, 0}}
226-
227-
# :killed might be due to SIGPIPE / EPIPE
228-
{:running, {:error, :killed}, {:error, :epipe}} when ignore_epipe ->
229-
{:exit, {:status, 0}}
202+
try do
203+
write_fn.()
204+
rescue
205+
Error -> {:error, :epipe}
206+
end
207+
end)
208+
end
230209

231-
# if reader exit early and there is no pending write or if
232-
# there is no writer
233-
{:running, {:ok, _status}, :ok} when ignore_epipe ->
234-
{:exit, {:status, 0}}
210+
defp stop_process(state) do
211+
status =
212+
case await_exit(state) do
213+
{:exit, term} ->
214+
{:exit, term}
215+
216+
{:error, reason} ->
217+
if state.ignore_epipe do
218+
{:exit, {:status, 0}}
219+
else
220+
{:error, reason}
221+
end
222+
end
235223

236-
{:running, {:error, :killed}, :ok} when ignore_epipe ->
237-
{:exit, {:status, 0}}
224+
%{state | process_status: status}
225+
end
238226

239-
# if we get epipe from writer then raise that error, and ignore exit status
240-
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
241-
{:exit, :epipe}
227+
@spec await_exit(map) :: {:exit, {:status, non_neg_integer}} | {:error, atom}
228+
defp await_exit(%{process_status: :running} = state) do
229+
process_result = Process.await_exit(state.process, state.exit_timeout)
230+
writer_task_status = Task.await(state.writer_task)
242231

243-
{:running, {:error, :killed}, {:error, :epipe}} when ignore_epipe == false ->
244-
{:exit, :epipe}
232+
case {process_result, writer_task_status} do
233+
{_process_exit_status, {:error, :epipe}} ->
234+
{:error, :epipe}
245235

246-
# Normal exit success case
247-
{_, {:ok, 0}, _} ->
248-
{:exit, {:status, 0}}
236+
{{:ok, status}, :ok} ->
237+
{:exit, {:status, status}}
249238

250-
{:eof, {:ok, exit_status}, _} ->
251-
{:exit, {:status, exit_status}}
239+
{{:error, reason}, _writer_status} ->
240+
{:error, reason}
252241
end
253242
end
243+
244+
defp await_exit(%{process_status: status}), do: status
254245
end
255246

256247
@spec normalize_input(term) ::

test/ex_cmd_test.exs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ defmodule ExCmdTest do
2828
end
2929
end
3030

31+
test "stream/2 early termination with non-zero exit does not raise" do
32+
result =
33+
ExCmd.stream(["sh", "-c", "echo foo; exit 120"])
34+
|> Enum.take(1)
35+
36+
assert result == ["foo\n"]
37+
end
38+
39+
test "stream!/2 early termination with non-zero exit raises" do
40+
assert_raise ExCmd.Stream.AbnormalExit, "program exited with exit status: 120", fn ->
41+
ExCmd.stream!(["sh", "-c", "echo foo; exit 120"])
42+
|> Enum.take(1)
43+
end
44+
end
45+
3146
test "abnormal exit status" do
3247
proc_stream = ExCmd.stream!(["sh", "-c", "exit 5"])
3348

@@ -44,4 +59,27 @@ defmodule ExCmdTest do
4459

4560
assert exit_status == 5
4661
end
62+
63+
test "stream/2 returns exit status as last element" do
64+
result =
65+
ExCmd.stream(["sh", "-c", "echo foo; exit 42"])
66+
|> Enum.to_list()
67+
68+
assert result == ["foo\n", {:exit, {:status, 42}}]
69+
end
70+
71+
test "stream!/2 early termination with ignore_epipe does not raise" do
72+
result =
73+
ExCmd.stream!(["cat"], input: Stream.cycle(["data"]), ignore_epipe: true)
74+
|> Enum.take(1)
75+
76+
assert is_binary(hd(result))
77+
end
78+
79+
test "stream!/2 early termination without ignore_epipe raises on epipe" do
80+
assert_raise ExCmd.Stream.AbnormalExit, ~r/epipe/, fn ->
81+
ExCmd.stream!(["cat"], input: Stream.cycle(["data"]))
82+
|> Enum.take(1)
83+
end
84+
end
4785
end

0 commit comments

Comments
 (0)