1
1
import inspect
2
- from abc import abstractmethod
3
- from collections .abc import Mapping , Sequence
2
+ from collections .abc import Iterator , Mapping , Sequence
4
3
from dataclasses import dataclass
5
4
from pathlib import Path
6
5
from typing import Any , Optional , TypeVar
@@ -45,34 +44,33 @@ class ComponentFileModel(BaseModel):
45
44
requirements : Optional [ComponentRequirementsModel ] = None
46
45
47
46
48
- #########
49
- # MODULES
50
- #########
51
-
52
-
53
- @dataclass
54
- class DefsModuleComponent (Component ):
55
- path : Path
56
-
57
- @classmethod
58
- def from_context (cls , context : ComponentLoadContext ) -> Optional ["DefsModuleComponent" ]:
59
- """Attempts to load a component from the given context. Iterates through potential component
60
- type matches, prioritizing more specific types: YAML, Python, plain Dagster defs, and component
61
- folder.
62
- """
63
- # this defines the priority of the module types
64
- module_types = (
65
- WrappedYamlComponent ,
66
- WrappedPythonicComponent ,
67
- DagsterDefsComponent ,
68
- DefsFolderComponent ,
47
+ def get_component (context : ComponentLoadContext ) -> Optional [Component ]:
48
+ """Attempts to load a component from the given context. Iterates through potential component
49
+ type matches, prioritizing more specific types: YAML, Python, plain Dagster defs, and component
50
+ folder.
51
+ """
52
+ # in priority order
53
+ # yaml component
54
+ if (context .path / "component.yaml" ).exists ():
55
+ return load_yaml_component (context )
56
+ # pythonic component
57
+ elif (context .path / "component.py" ).exists ():
58
+ return load_pythonic_component (context )
59
+ # defs
60
+ elif (context .path / "definitions.py" ).exists ():
61
+ return DagsterDefsComponent (path = context .path / "definitions.py" )
62
+ elif context .path .suffix == ".py" :
63
+ return DagsterDefsComponent (path = context .path )
64
+ # folder
65
+ elif context .path .is_dir ():
66
+ children = _crawl (context )
67
+ return DefsFolderComponent (
68
+ path = context .path ,
69
+ children = children ,
70
+ asset_post_processors = None ,
69
71
)
70
- module_filter = filter (None , (cls .from_context (context ) for cls in module_types ))
71
- return next (module_filter , None )
72
72
73
- @classmethod
74
- def load (cls , attributes : Any , context : ComponentLoadContext ) -> "DefsModuleComponent" :
75
- return check .not_none (cls .from_context (context ))
73
+ return None
76
74
77
75
78
76
@dataclass
@@ -81,15 +79,16 @@ class DefsFolderComponentYamlSchema(Resolvable):
81
79
82
80
83
81
@dataclass
84
- class DefsFolderComponent (DefsModuleComponent ):
82
+ class DefsFolderComponent (Component ):
85
83
"""A folder which may contain multiple submodules, each
86
84
which define components.
87
85
88
86
Optionally enables postprocessing to modify the Dagster definitions
89
87
produced by submodules.
90
88
"""
91
89
92
- submodules : Sequence [DefsModuleComponent ]
90
+ path : Path
91
+ children : Mapping [Path , Component ]
93
92
asset_post_processors : Optional [Sequence [AssetPostProcessor ]]
94
93
95
94
@classmethod
@@ -105,59 +104,54 @@ def load(cls, attributes: Any, context: ComponentLoadContext) -> "DefsFolderComp
105
104
context .resolution_context .at_path ("attributes" ),
106
105
attributes ,
107
106
)
108
- submodules = check . not_none ( cls . submodules_from_context ( context ))
107
+
109
108
return DefsFolderComponent (
110
109
path = context .path ,
111
- submodules = submodules ,
110
+ children = _crawl ( context ) ,
112
111
asset_post_processors = resolved_attributes .asset_post_processors ,
113
112
)
114
113
115
- @classmethod
116
- def submodules_from_context (
117
- cls , context : ComponentLoadContext
118
- ) -> Optional [Sequence [DefsModuleComponent ]]:
119
- if not context .path .is_dir ():
120
- return None
121
- submodules = (
122
- DefsModuleComponent .from_context (context .for_path (subpath ))
123
- for subpath in context .path .iterdir ()
124
- )
125
- return list (filter (None , submodules ))
126
-
127
- @classmethod
128
- def from_context (cls , context : ComponentLoadContext ) -> Optional ["DefsFolderComponent" ]:
129
- submodules = cls .submodules_from_context (context )
130
- if submodules is None :
131
- return None
132
- return DefsFolderComponent (
133
- path = context .path , submodules = submodules , asset_post_processors = None
134
- )
135
-
136
114
def build_defs (self , context : ComponentLoadContext ) -> Definitions :
137
115
defs = Definitions .merge (
138
- * (
139
- submodule .build_defs (context .for_path (submodule .path ))
140
- for submodule in self .submodules
141
- )
116
+ * (child .build_defs (context .for_path (path )) for path , child in self .children .items ())
142
117
)
143
118
for post_processor in self .asset_post_processors or []:
144
119
defs = post_processor (defs )
145
120
return defs
146
121
122
+ @classmethod
123
+ def get (cls , context : ComponentLoadContext ) -> "DefsFolderComponent" :
124
+ component = get_component (context )
125
+ return check .inst (
126
+ component ,
127
+ DefsFolderComponent ,
128
+ f"Expected DefsFolderComponent at { context .path } , got { component } ." ,
129
+ )
130
+
131
+ def iterate_components (self ) -> Iterator [Component ]:
132
+ for component in self .children .values ():
133
+ if isinstance (component , DefsFolderComponent ):
134
+ yield from component .iterate_components ()
135
+
136
+ yield component
137
+
138
+
139
+ def _crawl (context : ComponentLoadContext ) -> Mapping [Path , Component ]:
140
+ found = {}
141
+ for subpath in context .path .iterdir ():
142
+ component = get_component (context .for_path (subpath ))
143
+ if component :
144
+ found [subpath ] = component
145
+ return found
146
+
147
147
148
148
@dataclass
149
- class DagsterDefsComponent (DefsModuleComponent ):
149
+ class DagsterDefsComponent (Component ):
150
150
"""A Python module containing Dagster definitions. Used for implicit loading of
151
151
Dagster definitions from Python files in the defs folder.
152
152
"""
153
153
154
- @classmethod
155
- def from_context (cls , context : ComponentLoadContext ) -> Optional ["DagsterDefsComponent" ]:
156
- if (context .path / "definitions.py" ).exists ():
157
- return DagsterDefsComponent (path = context .path / "definitions.py" )
158
- elif context .path .suffix == ".py" :
159
- return DagsterDefsComponent (path = context .path )
160
- return None
154
+ path : Path
161
155
162
156
def build_defs (self , context : ComponentLoadContext ) -> Definitions :
163
157
module = context .load_defs_relative_python_module (self .path )
@@ -173,103 +167,57 @@ def build_defs(self, context: ComponentLoadContext) -> Definitions:
173
167
)
174
168
175
169
176
- @dataclass
177
- class WrappedDefsModuleComponent (DefsModuleComponent ):
178
- """A module containing a component definition. ABC implemented by
179
- WrappedPythonicComponent and WrappedYamlComponent.
180
- """
181
-
182
- wrapped : Component
183
-
184
- @staticmethod
185
- @abstractmethod
186
- def get_component_def_path (path : Path ) -> Path : ...
187
-
188
- @classmethod
189
- @abstractmethod
190
- def get_component (cls , context : ComponentLoadContext ) -> Component : ...
191
-
192
- @classmethod
193
- def from_context (cls , context : ComponentLoadContext ) -> Optional ["WrappedDefsModuleComponent" ]:
194
- if cls .get_component_def_path (context .path ).exists ():
195
- return cls (path = context .path , wrapped = cls .get_component (context ))
196
- return None
197
-
198
- def build_defs (self , context : ComponentLoadContext ) -> Definitions :
199
- return self .wrapped .build_defs (context )
200
-
201
-
202
- @dataclass
203
- class WrappedPythonicComponent (WrappedDefsModuleComponent ):
204
- """A module which contains a component definition specified in Python
205
- with an @component decorator.
206
- """
207
-
208
- @staticmethod
209
- def get_component_def_path (path : Path ) -> Path :
210
- return path / "component.py"
211
-
212
- @classmethod
213
- def get_component (cls , context ) -> Component :
214
- module = context .load_defs_relative_python_module (cls .get_component_def_path (context .path ))
215
- component_loaders = list (inspect .getmembers (module , is_component_loader ))
216
- if len (component_loaders ) == 0 :
217
- raise DagsterInvalidDefinitionError ("No component loaders found in module" )
218
- elif len (component_loaders ) == 1 :
219
- _ , component_loader = component_loaders [0 ]
220
- return WrappedPythonicComponent (path = context .path , wrapped = component_loader (context ))
221
- else :
222
- raise DagsterInvalidDefinitionError (
223
- f"Multiple component loaders found in module: { component_loaders } "
224
- )
225
-
226
-
227
- @dataclass
228
- class WrappedYamlComponent (WrappedDefsModuleComponent ):
229
- """A module containing a component definition specified in a component.yaml file."""
170
+ def load_pythonic_component (context : ComponentLoadContext ) -> Component :
171
+ module = context .load_defs_relative_python_module (context .path / "component.py" )
172
+ component_loaders = list (inspect .getmembers (module , is_component_loader ))
173
+ if len (component_loaders ) == 0 :
174
+ raise DagsterInvalidDefinitionError ("No component loaders found in module" )
175
+ elif len (component_loaders ) == 1 :
176
+ _ , component_loader = component_loaders [0 ]
177
+ return component_loader (context )
178
+ else :
179
+ raise DagsterInvalidDefinitionError (
180
+ f"Multiple component loaders found in module: { component_loaders } "
181
+ )
230
182
231
- @staticmethod
232
- def get_component_def_path (path : Path ) -> Path :
233
- return path / "component.yaml"
234
183
235
- @classmethod
236
- def get_component (cls , context : ComponentLoadContext ) -> Component :
237
- # parse the yaml file
238
- component_def_path = cls .get_component_def_path (context .path )
239
- source_tree = parse_yaml_with_source_positions (
240
- component_def_path .read_text (), str (component_def_path )
241
- )
242
- component_file_model = _parse_and_populate_model_with_annotated_errors (
243
- cls = ComponentFileModel , obj_parse_root = source_tree , obj_key_path_prefix = []
184
+ def load_yaml_component (context : ComponentLoadContext ) -> Component :
185
+ # parse the yaml file
186
+ component_def_path = context .path / "component.yaml"
187
+ source_tree = parse_yaml_with_source_positions (
188
+ component_def_path .read_text (), str (component_def_path )
189
+ )
190
+ component_file_model = _parse_and_populate_model_with_annotated_errors (
191
+ cls = ComponentFileModel , obj_parse_root = source_tree , obj_key_path_prefix = []
192
+ )
193
+
194
+ # find the component type
195
+ type_str = context .normalize_component_type_str (component_file_model .type )
196
+ key = PluginObjectKey .from_typename (type_str )
197
+ obj = load_package_object (key )
198
+ if not isinstance (obj , type ) or not issubclass (obj , Component ):
199
+ raise DagsterInvalidDefinitionError (
200
+ f"Component type { type_str } is of type { type (obj )} , but must be a subclass of dagster.Component"
244
201
)
245
202
246
- # find the component type
247
- type_str = context .normalize_component_type_str (component_file_model .type )
248
- key = PluginObjectKey .from_typename (type_str )
249
- obj = load_package_object (key )
250
- if not isinstance (obj , type ) or not issubclass (obj , Component ):
251
- raise DagsterInvalidDefinitionError (
252
- f"Component type { type_str } is of type { type (obj )} , but must be a subclass of dagster.Component"
253
- )
254
-
255
- model_cls = obj .get_model_cls ()
256
- context = context .with_rendering_scope (
257
- obj .get_additional_scope ()
258
- ).with_source_position_tree (source_tree .source_position_tree )
259
-
260
- # grab the attributes from the yaml file
261
- with pushd (str (context .path )):
262
- if model_cls is None :
263
- attributes = None
264
- elif source_tree :
265
- attributes_position_tree = source_tree .source_position_tree .children ["attributes" ]
266
- with enrich_validation_errors_with_source_position (
267
- attributes_position_tree , ["attributes" ]
268
- ):
269
- attributes = TypeAdapter (model_cls ).validate_python (
270
- component_file_model .attributes
271
- )
272
- else :
203
+ model_cls = obj .get_model_cls ()
204
+ context = context .with_rendering_scope (
205
+ obj .get_additional_scope (),
206
+ ).with_source_position_tree (
207
+ source_tree .source_position_tree ,
208
+ )
209
+
210
+ # grab the attributes from the yaml file
211
+ with pushd (str (context .path )):
212
+ if model_cls is None :
213
+ attributes = None
214
+ elif source_tree :
215
+ attributes_position_tree = source_tree .source_position_tree .children ["attributes" ]
216
+ with enrich_validation_errors_with_source_position (
217
+ attributes_position_tree , ["attributes" ]
218
+ ):
273
219
attributes = TypeAdapter (model_cls ).validate_python (component_file_model .attributes )
220
+ else :
221
+ attributes = TypeAdapter (model_cls ).validate_python (component_file_model .attributes )
274
222
275
- return obj .load (attributes , context )
223
+ return obj .load (attributes , context )
0 commit comments