33# SPDX-License-Identifier: MIT
44
55defmodule Ash.Resource.Transformers.ResolvePipelines do
6- @ moduledoc "Resolves `pipe_through` on actions by injecting pipeline changes/preparations ."
6+ @ moduledoc "Resolves `pipe_through` on actions by replacing PipeThrough entities with pipeline contents in-place ."
77 use Spark.Dsl.Transformer
88
9+ alias Ash.Resource.Actions.PipeThrough
910 alias Spark.Dsl.Transformer
1011 alias Spark.Error.DslError
1112
@@ -23,90 +24,80 @@ defmodule Ash.Resource.Transformers.ResolvePipelines do
2324
2425 dsl_state
2526 |> Transformer . get_entities ( [ :actions ] )
26- |> Enum . filter ( & ( & 1 . pipe_through != [ ] ) )
2727 |> Enum . reduce_while ( { :ok , dsl_state } , fn action , { :ok , dsl_state } ->
28- case resolve_pipe_through ( action , pipelines , dsl_state ) do
29- { :ok , updated_action } ->
30- new_state =
31- Transformer . replace_entity (
32- dsl_state ,
33- [ :actions ] ,
34- updated_action ,
35- & ( & 1 . name == action . name && & 1 . type == action . type )
36- )
37-
38- { :cont , { :ok , new_state } }
39-
40- { :error , error } ->
41- { :halt , { :error , error } }
28+ field = entity_field ( action . type )
29+ entities = Map . get ( action , field , [ ] )
30+
31+ if Enum . any? ( entities , & match? ( % PipeThrough { } , & 1 ) ) do
32+ updated_action = resolve_action ( action , field , entities , pipelines , dsl_state )
33+
34+ new_state =
35+ Transformer . replace_entity (
36+ dsl_state ,
37+ [ :actions ] ,
38+ updated_action ,
39+ & ( & 1 . name == action . name && & 1 . type == action . type )
40+ )
41+
42+ { :cont , { :ok , new_state } }
43+ else
44+ { :cont , { :ok , dsl_state } }
4245 end
4346 end )
4447 end
4548
46- defp resolve_pipe_through ( action , pipelines , dsl_state ) do
47- # Flatten all pipe_through declarations into {name, where_conditions} pairs
48- pairs =
49- Enum . flat_map ( action . pipe_through , fn % { names: names , where: where } ->
50- Enum . map ( names , & { & 1 , where } )
49+ defp resolve_action ( action , field , entities , pipelines , dsl_state ) do
50+ { expanded , arguments , accept } =
51+ Enum . reduce ( entities , { [ ] , [ ] , [ ] } , fn
52+ % PipeThrough { names: names , where: where } , { expanded , arguments , accept } ->
53+ { new_entities , new_arguments , new_accept } =
54+ expand_pipe_through ( names , where , action , pipelines , dsl_state )
55+
56+ { expanded ++ new_entities , arguments ++ new_arguments , accept ++ new_accept }
57+
58+ entity , { expanded , arguments , accept } ->
59+ { expanded ++ [ entity ] , arguments , accept }
5160 end )
5261
53- pairs
54- |> Enum . reduce_while ( { :ok , { [ ] , [ ] , [ ] } } , fn { name , where } ,
55- { :ok , { entities , arguments , accept } } ->
62+ action
63+ |> Map . put ( field , expanded )
64+ |> merge_arguments ( arguments , dsl_state )
65+ |> merge_accept ( merge_pipeline_accepts ( accept ) )
66+ end
67+
68+ defp expand_pipe_through ( names , where , action , pipelines , dsl_state ) do
69+ Enum . reduce ( names , { [ ] , [ ] , [ ] } , fn name , { entities , arguments , accept } ->
5670 case Map . fetch ( pipelines , name ) do
5771 { :ok , pipeline } ->
58- new_entities = collect_entities ( action . type , pipeline , where , dsl_state )
72+ pipeline_entities = collect_entities ( action . type , pipeline , where , dsl_state )
5973
60- { :cont ,
61- { :ok ,
62- { [ new_entities | entities ] , [ pipeline . arguments | arguments ] ,
63- [ pipeline . accept | accept ] } } }
74+ { entities ++ pipeline_entities , arguments ++ pipeline . arguments ,
75+ accept ++ List . wrap ( if pipeline . accept == :* , do: [ :* ] , else: pipeline . accept ) }
6476
6577 :error ->
66- { :halt ,
67- { :error ,
68- DslError . exception (
69- module: Transformer . get_persisted ( dsl_state , :module ) ,
70- path: [ :actions , action . type ] ,
71- message:
72- "Action `#{ action . name } ` references pipeline `#{ name } ` via `pipe_through`, but no pipeline named `#{ name } ` exists."
73- ) } }
78+ raise DslError ,
79+ module: Transformer . get_persisted ( dsl_state , :module ) ,
80+ path: [ :actions , action . type ] ,
81+ message:
82+ "Action `#{ action . name } ` references pipeline `#{ name } ` via `pipe_through`, but no pipeline named `#{ name } ` exists."
7483 end
7584 end )
76- |> case do
77- { :ok , { entities , arguments , accept } } ->
78- merged_accept = merge_pipeline_accepts ( accept )
79-
80- { :ok ,
81- apply_to_action (
82- action ,
83- entities |> Enum . reverse ( ) |> Enum . concat ( ) ,
84- arguments |> Enum . reverse ( ) |> Enum . concat ( ) ,
85- merged_accept ,
86- dsl_state
87- ) }
88-
89- { :error , error } ->
90- { :error , error }
91- end
9285 end
9386
94- defp apply_to_action ( action , entities , arguments , accept , dsl_state ) do
95- action
96- |> prepend_entities ( entities )
97- |> merge_arguments ( arguments , dsl_state )
98- |> merge_accept ( accept )
99- end
87+ defp collect_entities ( type , pipeline , where , dsl_state ) do
88+ subject = subject_for_type ( type )
89+ entities = source_entities ( pipeline , type )
90+
91+ Enum . each ( entities , & validate_supports! ( & 1 , subject , pipeline , dsl_state ) )
10092
101- defp prepend_entities ( action , entities ) do
102- field = entity_field ( action . type )
103- Map . update! ( action , field , & ( entities ++ & 1 ) )
93+ Enum . map ( entities , fn entity ->
94+ Map . update! ( entity , :where , & ( where ++ & 1 ) )
95+ end )
10496 end
10597
10698 defp merge_arguments ( action , [ ] , _dsl_state ) , do: action
10799
108100 defp merge_arguments ( action , pipeline_arguments , dsl_state ) do
109- # Dedup pipeline arguments (multiple pipelines may define the same arg)
110101 { deduped_pipeline_args , pipeline_conflicts } =
111102 Enum . reduce ( pipeline_arguments , { % { } , [ ] } , fn arg , { seen , conflicts } ->
112103 case Map . fetch ( seen , arg . name ) do
@@ -115,7 +106,6 @@ defmodule Ash.Resource.Transformers.ResolvePipelines do
115106
116107 { :ok , existing } ->
117108 if existing . type == arg . type do
118- # same type from different pipelines — keep first
119109 { seen , conflicts }
120110 else
121111 { seen , [ { arg . name , existing . type , arg . type } | conflicts ] }
@@ -135,7 +125,6 @@ defmodule Ash.Resource.Transformers.ResolvePipelines do
135125 message: "Pipelines define argument(s) with conflicting types: #{ conflict_details } "
136126 end
137127
138- # Check pipeline args against action args
139128 action_args_by_name = Map . new ( action . arguments , & { & 1 . name , & 1 } )
140129
141130 { new_args , action_conflicts } =
@@ -172,10 +161,10 @@ defmodule Ash.Resource.Transformers.ResolvePipelines do
172161 end
173162
174163 defp merge_pipeline_accepts ( accept_lists ) do
175- if Enum . any? ( accept_lists , & ( & 1 == :* ) ) do
164+ if :* in accept_lists do
176165 :*
177166 else
178- accept_lists |> Enum . reverse ( ) |> Enum . concat ( ) |> Enum . uniq ( )
167+ Enum . uniq ( accept_lists )
179168 end
180169 end
181170
@@ -195,17 +184,6 @@ defmodule Ash.Resource.Transformers.ResolvePipelines do
195184 end
196185 end
197186
198- defp collect_entities ( type , pipeline , where , dsl_state ) do
199- subject = subject_for_type ( type )
200- entities = source_entities ( pipeline , type )
201-
202- Enum . each ( entities , & validate_supports! ( & 1 , subject , pipeline , dsl_state ) )
203-
204- Enum . map ( entities , fn entity ->
205- Map . update! ( entity , :where , & ( where ++ & 1 ) )
206- end )
207- end
208-
209187 defp validate_supports! (
210188 % Ash.Resource.Validation { module: module , opts: opts } ,
211189 subject ,
0 commit comments