Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .tool-versions
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
erlang 27.3.4
erlang 28.3.1
elixir 1.18.4
pipx 1.8.0
19 changes: 12 additions & 7 deletions lib/reactor/step/map.ex
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,21 @@ defmodule Reactor.Step.Map do
emit_batch(source, options, map_step, result)
end

defp collect_all_step_names(steps, into \\ MapSet.new())
defp collect_all_step_names([], into), do: into
defp collect_all_step_names(steps) do
steps
|> do_collect_all_step_names([])
|> MapSet.new()
end

defp do_collect_all_step_names([], acc), do: acc

defp collect_all_step_names([%{steps: [_ | _] = child_steps} = step | steps], into) do
into = collect_all_step_names(child_steps, MapSet.put(into, step.name))
collect_all_step_names(steps, into)
defp do_collect_all_step_names([%{steps: [_ | _] = child_steps} = step | steps], acc) do
acc = do_collect_all_step_names(child_steps, [step.name | acc])
do_collect_all_step_names(steps, acc)
end

defp collect_all_step_names([step | steps], into),
do: collect_all_step_names(steps, MapSet.put(into, step.name))
defp do_collect_all_step_names([step | steps], acc),
do: do_collect_all_step_names(steps, [step.name | acc])

defp emit_batch(source, options, map_step, result) do
with {:done, batch} <- Iter.take_chunk(source, options[:batch_size]),
Expand Down