Skip to content

Commit e93e5a0

Browse files
authored
Merge pull request #51 from akash-akya/dev
Handle premature termination of the stream
2 parents 4bebd4c + a8f8632 commit e93e5a0

File tree

2 files changed

+175
-114
lines changed

2 files changed

+175
-114
lines changed

lib/ex_cmd/stream.ex

Lines changed: 137 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ defmodule ExCmd.Stream do
2626
defmodule Sink do
2727
@moduledoc false
2828

29-
@type t :: %__MODULE__{process: Process.t(), ignore_epipe: boolean}
29+
@type t :: %__MODULE__{process: Process.t()}
3030

31-
defstruct [:process, :ignore_epipe]
31+
defstruct [:process]
3232

3333
defimpl Collectable do
3434
def into(%{process: process}) do
@@ -93,61 +93,64 @@ defmodule ExCmd.Stream do
9393
end
9494

9595
defimpl Enumerable do
96-
# credo:disable-for-next-line Credo.Check.Refactor.CyclomaticComplexity
96+
defmodule State do
97+
@moduledoc false
98+
@enforce_keys [:process, :writer_task]
99+
defstruct [
100+
:process,
101+
:writer_task,
102+
:max_chunk_size,
103+
:exit_timeout,
104+
:ignore_epipe,
105+
:stream_exit_status,
106+
process_status: :running,
107+
stream_reader_status: :started
108+
]
109+
end
110+
97111
def reduce(arg, acc, fun) do
98-
start_fun = fn ->
99-
state = start_process(arg)
100-
{state, :running}
101-
end
112+
start_fun = fn -> start_process(arg) end
113+
next_fun = &read_next/1
114+
after_fun = &cleanup/1
102115

103-
next_fun = fn
104-
{state, :exited} ->
105-
{:halt, {state, :exited}}
106-
107-
{state, exit_state} ->
108-
%{
109-
process: process,
110-
stream_opts: %{
111-
stream_exit_status: stream_exit_status,
112-
max_chunk_size: max_chunk_size
113-
}
114-
} = state
115-
116-
case Process.read(process, max_chunk_size) do
117-
:eof when stream_exit_status == false ->
118-
{:halt, {state, :eof}}
119-
120-
:eof when stream_exit_status == true ->
121-
elem = [await_exit(state, :eof)]
122-
{elem, {state, :exited}}
123-
124-
{:ok, x} ->
125-
elem = [IO.iodata_to_binary(x)]
126-
{elem, {state, exit_state}}
127-
128-
{:error, errno} ->
129-
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
116+
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
117+
end
118+
119+
defp read_next(%State{process_status: :running} = state) do
120+
case Process.read(state.process, state.max_chunk_size) do
121+
{:ok, data} ->
122+
{[IO.iodata_to_binary(data)], state}
123+
124+
:eof ->
125+
state = %State{state | stream_reader_status: :eof}
126+
state = stop_process(state)
127+
128+
if state.stream_exit_status do
129+
{[state.process_status], state}
130+
else
131+
{:halt, state}
130132
end
133+
134+
{:error, errno} ->
135+
raise Error, "failed to read from the external process. errno: #{inspect(errno)}"
131136
end
137+
end
132138

133-
after_fun = fn
134-
{_state, :exited} ->
135-
:ok
139+
defp read_next(%State{} = state), do: {:halt, state}
136140

137-
{state, exit_state} ->
138-
case await_exit(state, exit_state) do
139-
{:exit, {:status, 0}} ->
140-
:ok
141+
defp cleanup(state) do
142+
state = stop_process(state)
143+
raise_on_abnormal_exit(state)
144+
end
141145

142-
{:exit, {:status, exit_status}} ->
143-
raise AbnormalExit, exit_status
146+
defp raise_on_abnormal_exit(%State{stream_exit_status: true}), do: :ok
144147

145-
{:exit, :epipe} ->
146-
raise AbnormalExit, :epipe
147-
end
148+
defp raise_on_abnormal_exit(%State{process_status: status}) do
149+
case status do
150+
{:exit, {:status, 0}} -> :ok
151+
{:exit, {:status, code}} -> raise AbnormalExit, code
152+
{:error, reason} -> raise AbnormalExit, reason
148153
end
149-
150-
Stream.resource(start_fun, next_fun, after_fun).(acc, fun)
151154
end
152155

153156
def count(_stream) do
@@ -162,93 +165,113 @@ defmodule ExCmd.Stream do
162165
{:error, __MODULE__}
163166
end
164167

165-
defp start_process(%ExCmd.Stream{
166-
process_opts: process_opts,
167-
stream_opts: stream_opts,
168-
cmd_with_args: cmd_with_args
169-
}) do
170-
process_opts = Keyword.put(process_opts, :stderr, stream_opts[:stderr])
171-
{:ok, process} = Process.start_link(cmd_with_args, process_opts)
172-
sink = %Sink{process: process, ignore_epipe: stream_opts[:ignore_epipe]}
173-
writer_task = start_input_streamer(sink, stream_opts.input)
174-
175-
%{process: process, stream_opts: stream_opts, writer_task: writer_task}
168+
defp start_process(%ExCmd.Stream{} = stream) do
169+
opts = stream.stream_opts
170+
171+
process_opts = Keyword.put(stream.process_opts, :stderr, opts.stderr)
172+
{:ok, process} = Process.start_link(stream.cmd_with_args, process_opts)
173+
174+
sink = %Sink{process: process}
175+
writer_task = start_input_streamer(sink, opts.input)
176+
177+
%State{
178+
process: process,
179+
writer_task: writer_task,
180+
max_chunk_size: opts.max_chunk_size,
181+
exit_timeout: opts.exit_timeout,
182+
ignore_epipe: opts.ignore_epipe,
183+
stream_exit_status: opts.stream_exit_status
184+
}
176185
end
177186

178-
@doc false
179-
@spec start_input_streamer(term, term) :: Task.t()
180187
defp start_input_streamer(%Sink{process: process} = sink, input) do
181188
case input do
182189
:no_input ->
183-
# use `Task.completed(:ok)` when bumping min Elixir requirement
184-
Task.async(fn -> :ok end)
190+
Task.async(fn -> :no_input end)
185191

186192
{:enumerable, enum} ->
187-
Task.async(fn ->
188-
Process.change_pipe_owner(process, :stdin, self())
189-
190-
try do
191-
Enum.into(enum, sink)
192-
rescue
193-
Error ->
194-
{:error, :epipe}
195-
end
196-
end)
193+
stream_to_sink(process, fn -> Enum.into(enum, sink) end)
197194

198195
{:collectable, func} ->
199-
Task.async(fn ->
200-
Process.change_pipe_owner(process, :stdin, self())
201-
202-
try do
203-
func.(sink)
204-
rescue
205-
Error ->
206-
{:error, :epipe}
207-
end
208-
end)
196+
stream_to_sink(process, fn -> func.(sink) end)
209197
end
210198
end
211199

212-
defp await_exit(state, exit_state) do
213-
%{
214-
process: process,
215-
stream_opts: %{ignore_epipe: ignore_epipe, exit_timeout: exit_timeout},
216-
writer_task: writer_task
217-
} = state
200+
defp stream_to_sink(process, write_fn) do
201+
Task.async(fn ->
202+
Process.change_pipe_owner(process, :stdin, self())
218203

219-
result = Process.await_exit(process, exit_timeout)
220-
writer_task_status = Task.await(writer_task)
204+
try do
205+
write_fn.()
206+
rescue
207+
Error -> {:error, :epipe}
208+
end
209+
end)
210+
end
221211

222-
case {exit_state, result, writer_task_status} do
223-
# if reader exit early and there is a pending write
224-
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe ->
225-
{:exit, {:status, 0}}
212+
defp stop_process(state) do
213+
status =
214+
case await_exit(state) do
215+
{:exit, term} ->
216+
{:exit, term}
217+
218+
{:error, reason} ->
219+
if state.ignore_epipe do
220+
{:exit, {:status, 0}}
221+
else
222+
{:error, reason}
223+
end
224+
end
226225

227-
# :killed might be due to SIGPIPE / EPIPE
228-
{:running, {:error, :killed}, {:error, :epipe}} when ignore_epipe ->
229-
{:exit, {:status, 0}}
226+
%{state | process_status: status}
227+
end
230228

231-
# if reader exit early and there is no pending write or if
232-
# there is no writer
233-
{:running, {:ok, _status}, :ok} when ignore_epipe ->
234-
{:exit, {:status, 0}}
229+
@spec await_exit(map) :: {:exit, {:status, non_neg_integer}} | {:error, atom}
230+
defp await_exit(%{process_status: :running} = state) do
231+
process_result = Process.await_exit(state.process, state.exit_timeout)
232+
writer_task_status = Task.await(state.writer_task)
235233

236-
{:running, {:error, :killed}, :ok} when ignore_epipe ->
237-
{:exit, {:status, 0}}
234+
if state.stream_reader_status != :eof do
235+
handle_early_stream_exit(process_result, writer_task_status)
236+
else
237+
handle_normal_exit(process_result, writer_task_status)
238+
end
239+
end
240+
241+
defp await_exit(%{process_status: status}), do: status
242+
243+
defp handle_early_stream_exit(process_result, writer_task_status) do
244+
case {process_result, writer_task_status} do
245+
# if we don't have input and stream reader exits early then we don't care about
246+
# the exit status, since we are not reading the complete output of the co mmand,
247+
# we can't depend on the exit status.
248+
{{:ok, _status}, :no_input} ->
249+
{:error, :epipe}
250+
251+
# Same as above, the command might be killed due to early stream exit
252+
{{:error, :killed}, :no_input} ->
253+
{:error, :epipe}
254+
255+
_rest ->
256+
handle_normal_exit(process_result, writer_task_status)
257+
end
258+
end
238259

239-
# if we get epipe from writer then raise that error, and ignore exit status
240-
{:running, {:ok, _status}, {:error, :epipe}} when ignore_epipe == false ->
241-
{:exit, :epipe}
260+
defp handle_normal_exit(process_result, writer_task_status) do
261+
case {process_result, writer_task_status} do
262+
# if we writer fails with EPIPE then exist status does not matter
263+
{{:ok, _status}, {:error, :epipe}} ->
264+
{:error, :epipe}
242265

243-
{:running, {:error, :killed}, {:error, :epipe}} when ignore_epipe == false ->
244-
{:exit, :epipe}
266+
# we might be getting `:killed` exit status due to EPIPE
267+
{{:error, :killed}, {:error, :epipe}} ->
268+
{:error, :epipe}
245269

246-
# Normal exit success case
247-
{_, {:ok, 0}, _} ->
248-
{:exit, {:status, 0}}
270+
{{:ok, status}, _writer_status} ->
271+
{:exit, {:status, status}}
249272

250-
{:eof, {:ok, exit_status}, _} ->
251-
{:exit, {:status, exit_status}}
273+
{{:error, reason}, _writer_status} ->
274+
{:error, reason}
252275
end
253276
end
254277
end

test/ex_cmd_test.exs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,21 @@ defmodule ExCmdTest do
2828
end
2929
end
3030

31+
test "stream/2 early termination with non-zero exit does not raise" do
32+
result =
33+
ExCmd.stream(["sh", "-c", "echo foo; exit 120"])
34+
|> Enum.take(1)
35+
36+
assert result == ["foo\n"]
37+
end
38+
39+
test "stream!/2 early termination with non-zero exit raises" do
40+
assert_raise ExCmd.Stream.AbnormalExit, "program exited due to :epipe error", fn ->
41+
ExCmd.stream!(["sh", "-c", "echo foo; exit 120"])
42+
|> Enum.take(1)
43+
end
44+
end
45+
3146
test "abnormal exit status" do
3247
proc_stream = ExCmd.stream!(["sh", "-c", "exit 5"])
3348

@@ -44,4 +59,27 @@ defmodule ExCmdTest do
4459

4560
assert exit_status == 5
4661
end
62+
63+
test "stream/2 returns exit status as last element" do
64+
result =
65+
ExCmd.stream(["sh", "-c", "echo foo; exit 42"])
66+
|> Enum.to_list()
67+
68+
assert result == ["foo\n", {:exit, {:status, 42}}]
69+
end
70+
71+
test "stream!/2 early termination with ignore_epipe does not raise" do
72+
result =
73+
ExCmd.stream!(["cat"], input: Stream.cycle(["data"]), ignore_epipe: true)
74+
|> Enum.take(1)
75+
76+
assert is_binary(hd(result))
77+
end
78+
79+
test "stream!/2 early termination without ignore_epipe raises on epipe" do
80+
assert_raise ExCmd.Stream.AbnormalExit, ~r/epipe/, fn ->
81+
ExCmd.stream!(["cat"], input: Stream.cycle(["data"]))
82+
|> Enum.take(1)
83+
end
84+
end
4785
end

0 commit comments

Comments
 (0)