Skip to content

Commit 9732114

Browse files
authored
[components][rfc] Rename "ResolvableModel" to "ComponentSchema" (#27678)
## Summary & Motivation If we're going to enforce that this is always used with Components, we should probably name it something more specific. ## How I Tested These Changes ## Changelog NOCHANGELOG
1 parent b2b37a5 commit 9732114

File tree

29 files changed

+218
-216
lines changed

29 files changed

+218
-216
lines changed

docs/docs/guides/preview/components/creating-a-component.md

+4-2
Original file line numberDiff line numberDiff line change
@@ -62,13 +62,15 @@ In this case, we'll want to define a few things:
6262

6363
To simplify common use cases, `dagster-components` provides schemas for common bits of configuration:
6464

65-
- `AssetAttributesModel`: This contains attributes that are common to all assets, such as the key, description, tags, and dependencies.
66-
- `OpSpecModel`: This contains attributes specific to an underlying operation, such as the name and tags.
65+
- `AssetSpecSchema`: This contains attributes that are common to all assets, such as the key, description, tags, and dependencies.
66+
- `OpSpecSchema`: This contains attributes specific to an underlying operation, such as the name and tags.
6767

6868
We can the schema for our component and add it to our class as follows:
6969

7070
<CodeExample path="docs_beta_snippets/docs_beta_snippets/guides/components/shell-script-component/with-config-schema.py" language="python" />
7171

72+
Because the argument names in the schema match the names of the arguments in the `ShellCommandComponent` class, the `load` method will automatically populate the class with the values from the schema, and will automatically resolve the `AssetSpecSchema`s into `AssetSpec` objects.
73+
7274
## Building definitions
7375

7476
Now that we've defined how the component is parameterized, we need to define how to turn those parameters into a `Definitions` object.
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
from typing import Annotated, Optional
22

33
from dagster_components import ResolvableFieldInfo
4-
from dagster_components.core.schema.objects import AssetAttributesModel, OpSpecModel
4+
from dagster_components.core.schema.objects import AssetAttributesSchema, OpSpecSchema
55
from pydantic import BaseModel
66

77

88
class ShellScriptSchema(BaseModel):
99
script_path: str
10-
asset_attributes: AssetAttributesModel
10+
asset_attributes: AssetAttributesSchema
1111
# highlight-start
1212
script_runner: Annotated[
1313
str, ResolvableFieldInfo(required_scope={"get_script_runner"})
1414
]
1515
# highlight-end
16-
op: Optional[OpSpecModel] = None
16+
op: Optional[OpSpecSchema] = None
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from collections.abc import Sequence
2+
from typing import Annotated, Optional
3+
4+
from dagster_components import ComponentSchema, ResolutionContext, Resolver, resolver
5+
from dagster_components.core.schema.objects import AssetAttributesSchema, OpSpecSchema
6+
from pydantic import BaseModel
7+
8+
9+
class ShellScriptSchema(ComponentSchema):
10+
script_path: str
11+
asset_attributes: Sequence[AssetAttributesSchema]
12+
op: Optional[OpSpecSchema] = None
13+
14+
15+
@resolver(fromtype=ShellScriptSchema)
16+
class ShellScriptResolver(Resolver[ShellScriptSchema]):
17+
def resolve_my_object(self, context: ResolutionContext): ...
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,17 @@
1+
from collections.abc import Sequence
2+
13
from dagster_components import (
4+
AssetSpecSchema,
25
Component,
3-
ComponentLoadContext,
6+
ComponentSchema,
47
registered_component_type,
58
)
69

7-
import dagster as dg
8-
9-
10-
class ScriptRunner: ...
1110

12-
13-
def _get_script_runner(val: str) -> ScriptRunner:
14-
return ScriptRunner()
11+
class ShellCommandParams(ComponentSchema):
12+
path: str
13+
asset_specs: Sequence[AssetSpecSchema]
1514

1615

1716
@registered_component_type(name="shell_command")
18-
class ShellCommand(Component):
19-
def __init__(self, params):
20-
self.params = params
21-
22-
...
23-
24-
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
25-
# highlight-start
26-
# resolve the script runner with its required additional scope
27-
script_runner = self.params.resolve_properties(
28-
load_context.resolution_context.with_scope(
29-
get_script_runner=_get_script_runner
30-
)
31-
)["script_runner"]
32-
# highlight-end
33-
...
34-
return dg.Definitions(...)
17+
class ShellCommand(Component): ...
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,47 @@
11
import subprocess
2+
from collections.abc import Sequence
23
from typing import Optional
34

45
from dagster_components import (
6+
AssetSpecSchema,
57
Component,
68
ComponentLoadContext,
9+
ComponentSchema,
10+
OpSpecSchema,
711
registered_component_type,
812
)
9-
from dagster_components.core.schema.objects import AssetAttributesModel, OpSpecModel
10-
from pydantic import BaseModel
1113

1214
import dagster as dg
1315

1416

15-
# highlight-start
16-
class ShellScriptSchema(BaseModel):
17+
class ShellScriptSchema(ComponentSchema):
1718
script_path: str
18-
asset_attributes: AssetAttributesModel
19-
op: Optional[OpSpecModel] = None
20-
# highlight-end
19+
asset_specs: Sequence[AssetSpecSchema]
20+
op: Optional[OpSpecSchema] = None
2121

2222

2323
@registered_component_type(name="shell_command")
2424
class ShellCommand(Component):
25-
def __init__(self, params: ShellScriptSchema):
26-
self.params = params
25+
def __init__(
26+
self,
27+
script_path: str,
28+
asset_specs: Sequence[dg.AssetSpec],
29+
op: Optional[OpSpecSchema] = None,
30+
):
31+
self.script_path = script_path
32+
self.specs = asset_specs
33+
self.op = op or OpSpecSchema()
2734

2835
@classmethod
2936
def get_schema(cls) -> type[ShellScriptSchema]:
30-
# higlight-start
3137
return ShellScriptSchema
32-
# highlight-end
33-
34-
@classmethod
35-
def load(
36-
cls, params: ShellScriptSchema, load_context: ComponentLoadContext
37-
) -> "ShellCommand":
38-
return cls(params=params)
3938

4039
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions:
41-
resolved_asset_attributes = (
42-
self.params.asset_attributes.get_resolved_properties(
43-
load_context.resolution_context
44-
)
45-
)
46-
resolved_op_properties = (
47-
self.params.op.get_resolved_properties(load_context.resolution_context)
48-
if self.params.op
49-
else {}
50-
)
51-
52-
@dg.asset(**resolved_asset_attributes, **resolved_op_properties)
40+
@dg.multi_asset(specs=self.specs, op_tags=self.op.tags, name=self.op.name)
5341
def _asset(context: dg.AssetExecutionContext):
5442
self.execute(context)
5543

5644
return dg.Definitions(assets=[_asset])
5745

5846
def execute(self, context: dg.AssetExecutionContext):
59-
subprocess.run(["sh", self.params.script_path], check=False)
47+
subprocess.run(["sh", self.script_path], check=True)
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,39 @@
1+
from collections.abc import Sequence
12
from typing import Optional
23

34
from dagster_components import (
5+
AssetSpecSchema,
46
Component,
57
ComponentLoadContext,
8+
ComponentSchema,
9+
OpSpecSchema,
610
registered_component_type,
711
)
8-
from dagster_components.core.schema.objects import AssetAttributesModel, OpSpecModel
912
from pydantic import BaseModel
1013

11-
from dagster import Definitions
14+
import dagster as dg
1215

1316

14-
# highlight-start
15-
class ShellScriptSchema(BaseModel):
17+
class ShellScriptSchema(ComponentSchema):
1618
script_path: str
17-
asset_attributes: AssetAttributesModel
18-
op: Optional[OpSpecModel] = None
19-
# highlight-end
19+
asset_specs: Sequence[AssetSpecSchema]
20+
op: Optional[OpSpecSchema] = None
2021

2122

2223
@registered_component_type(name="shell_command")
2324
class ShellCommand(Component):
25+
def __init__(
26+
self,
27+
script_path: str,
28+
asset_specs: Sequence[dg.AssetSpec],
29+
op: Optional[OpSpecSchema] = None,
30+
):
31+
self.script_path = script_path
32+
self.specs = asset_specs
33+
self.op = op or OpSpecSchema()
34+
2435
@classmethod
2536
def get_schema(cls) -> type[ShellScriptSchema]:
26-
# higlight-start
2737
return ShellScriptSchema
28-
# highlight-end
2938

30-
def build_defs(self, load_context: ComponentLoadContext) -> Definitions: ...
39+
def build_defs(self, load_context: ComponentLoadContext) -> dg.Definitions: ...

examples/experimental/dagster-blueprints/dagster_blueprints/blueprint_assets_definition.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from dagster_blueprints.blueprint import Blueprint
1414

1515

16-
class AssetSpecModel(DagsterModel):
16+
class AssetSpecSchema(DagsterModel):
1717
key: str
1818
deps: Sequence[str] = []
1919
description: Optional[str] = None
@@ -36,7 +36,7 @@ def to_asset_spec(self) -> AssetSpec:
3636
class BlueprintAssetsDefinition(Blueprint):
3737
"""A blueprint that produces an AssetsDefinition."""
3838

39-
assets: Sequence[AssetSpecModel]
39+
assets: Sequence[AssetSpecSchema]
4040

4141
def build_defs(self) -> Definitions:
4242
specs = [spec_model.to_asset_spec() for spec_model in self.assets]

examples/experimental/dagster-blueprints/dagster_blueprints_tests/test_databricks_blueprint.py

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
materialize,
1111
)
1212
from dagster._core.pipes.client import PipesClientCompletedInvocation
13-
from dagster_blueprints.blueprint_assets_definition import AssetSpecModel
13+
from dagster_blueprints.blueprint_assets_definition import AssetSpecSchema
1414
from dagster_blueprints.databricks_blueprint import DatabricksTaskBlueprint
1515
from databricks.sdk.service import jobs
1616

@@ -57,7 +57,7 @@ def run(self, *, context, extras=None, **kwargs) -> PipesClientCompletedInvocati
5757
def test_single_databricks_task_blueprint() -> None:
5858
databricks_task_dict = make_submit_task_dict("/my/script/path.py", "/my/whl/path.whl", True)
5959
single_asset_blueprint = DatabricksTaskBlueprint(
60-
assets=[AssetSpecModel(key="asset1")], task=databricks_task_dict
60+
assets=[AssetSpecSchema(key="asset1")], task=databricks_task_dict
6161
)
6262
defs = single_asset_blueprint.build_defs()
6363
asset1 = cast(AssetsDefinition, next(iter(defs.assets or [])))
@@ -76,7 +76,7 @@ def test_single_databricks_task_blueprint() -> None:
7676
def test_single_databricks_task_blueprint_with_result() -> None:
7777
databricks_task_dict = make_submit_task_dict("/my/script/path.py", "/my/whl/path.whl", True)
7878
single_asset_blueprint = DatabricksTaskBlueprint(
79-
assets=[AssetSpecModel(key="asset1")], task=databricks_task_dict
79+
assets=[AssetSpecSchema(key="asset1")], task=databricks_task_dict
8080
)
8181
defs = single_asset_blueprint.build_defs()
8282
asset1 = cast(AssetsDefinition, next(iter(defs.assets or [])))
@@ -98,7 +98,7 @@ def test_single_databricks_task_blueprint_with_result() -> None:
9898
def test_multi_asset_databricks_task_blueprint() -> None:
9999
databricks_task_dict = make_submit_task_dict("/my/script/path.py", "/my/whl/path.whl", True)
100100
multi_asset_blueprint = DatabricksTaskBlueprint(
101-
assets=[AssetSpecModel(key="asset1"), AssetSpecModel(key="asset2")],
101+
assets=[AssetSpecSchema(key="asset1"), AssetSpecSchema(key="asset2")],
102102
task=databricks_task_dict,
103103
)
104104
defs = multi_asset_blueprint.build_defs()
@@ -118,7 +118,7 @@ def test_multi_asset_databricks_task_blueprint() -> None:
118118
def test_multi_asset_databricks_task_blueprint_with_results() -> None:
119119
databricks_task_dict = make_submit_task_dict("/my/script/path.py", "/my/whl/path.whl", True)
120120
multi_asset_blueprint = DatabricksTaskBlueprint(
121-
assets=[AssetSpecModel(key="asset1"), AssetSpecModel(key="asset2")],
121+
assets=[AssetSpecSchema(key="asset1"), AssetSpecSchema(key="asset2")],
122122
task=databricks_task_dict,
123123
)
124124
defs = multi_asset_blueprint.build_defs()
@@ -149,10 +149,10 @@ def test_multi_asset_databricks_task_blueprint_with_results() -> None:
149149

150150
def test_op_name_collisions() -> None:
151151
single_asset_blueprint1 = DatabricksTaskBlueprint(
152-
assets=[AssetSpecModel(key="asset1")], task={"placeholder": "placeholder"}
152+
assets=[AssetSpecSchema(key="asset1")], task={"placeholder": "placeholder"}
153153
)
154154
single_asset_blueprint2 = DatabricksTaskBlueprint(
155-
assets=[AssetSpecModel(key="asset2")], task={"placeholder": "placeholder"}
155+
assets=[AssetSpecSchema(key="asset2")], task={"placeholder": "placeholder"}
156156
)
157157
resources = {"pipes_databricks_client": object()}
158158
blueprint_defs = Definitions.merge(

examples/experimental/dagster-blueprints/dagster_blueprints_tests/test_shell_command_blueprint.py

+9-9
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
DATA_VERSION_TAG,
1414
)
1515
from dagster._core.pipes.subprocess import PipesSubprocessClient
16-
from dagster_blueprints.blueprint_assets_definition import AssetSpecModel
16+
from dagster_blueprints.blueprint_assets_definition import AssetSpecSchema
1717
from dagster_blueprints.shell_command_blueprint import ShellCommandBlueprint
1818

1919

@@ -29,7 +29,7 @@ def temp_script(script_fn: Callable[[], Any]) -> Iterator[str]:
2929

3030
def test_single_asset_shell_command_blueprint() -> None:
3131
single_asset_blueprint = ShellCommandBlueprint(
32-
assets=[AssetSpecModel(key="asset1")], command=["echo", '"hello"']
32+
assets=[AssetSpecSchema(key="asset1")], command=["echo", '"hello"']
3333
)
3434
defs = single_asset_blueprint.build_defs()
3535
asset1 = cast(AssetsDefinition, next(iter(defs.assets or [])))
@@ -41,7 +41,7 @@ def test_single_asset_shell_command_blueprint() -> None:
4141

4242
def test_single_asset_shell_command_blueprint_key_prefix() -> None:
4343
single_asset_blueprint = ShellCommandBlueprint(
44-
assets=[AssetSpecModel(key="prefix/asset1")], command=["echo", '"hello"']
44+
assets=[AssetSpecSchema(key="prefix/asset1")], command=["echo", '"hello"']
4545
)
4646
defs = single_asset_blueprint.build_defs()
4747
asset1 = cast(AssetsDefinition, next(iter(defs.assets or [])))
@@ -50,7 +50,7 @@ def test_single_asset_shell_command_blueprint_key_prefix() -> None:
5050

5151
def test_single_asset_shell_command_blueprint_str_command() -> None:
5252
single_asset_blueprint = ShellCommandBlueprint(
53-
assets=[AssetSpecModel(key="asset1")], command='echo "hello world"'
53+
assets=[AssetSpecSchema(key="asset1")], command='echo "hello world"'
5454
)
5555
defs = single_asset_blueprint.build_defs()
5656
asset1 = cast(AssetsDefinition, next(iter(defs.assets or [])))
@@ -74,7 +74,7 @@ def script_fn():
7474
extras = {"bar": "baz"}
7575
with temp_script(script_fn) as script_path:
7676
single_asset_blueprint = ShellCommandBlueprint(
77-
assets=[AssetSpecModel(key="asset1")],
77+
assets=[AssetSpecSchema(key="asset1")],
7878
command=[cast(str, shutil.which("python")), script_path],
7979
extras=extras,
8080
)
@@ -99,7 +99,7 @@ def script_fn():
9999

100100
def test_multi_asset_shell_command_blueprint() -> None:
101101
multi_asset_blueprint = ShellCommandBlueprint(
102-
assets=[AssetSpecModel(key="asset1"), AssetSpecModel(key="asset2")],
102+
assets=[AssetSpecSchema(key="asset1"), AssetSpecSchema(key="asset2")],
103103
command=["echo", '"hello"'],
104104
)
105105
defs = multi_asset_blueprint.build_defs()
@@ -121,7 +121,7 @@ def script_fn():
121121

122122
with temp_script(script_fn) as script_path:
123123
multi_asset_blueprint = ShellCommandBlueprint(
124-
assets=[AssetSpecModel(key="asset1"), AssetSpecModel(key="asset2")],
124+
assets=[AssetSpecSchema(key="asset1"), AssetSpecSchema(key="asset2")],
125125
command=[cast(str, shutil.which("python")), script_path],
126126
)
127127

@@ -146,10 +146,10 @@ def script_fn():
146146

147147
def test_op_name_collisions() -> None:
148148
single_asset_blueprint1 = ShellCommandBlueprint(
149-
assets=[AssetSpecModel(key="asset1")], command=["echo", '"hello"']
149+
assets=[AssetSpecSchema(key="asset1")], command=["echo", '"hello"']
150150
)
151151
single_asset_blueprint2 = ShellCommandBlueprint(
152-
assets=[AssetSpecModel(key="asset2")], command=["echo", '"hello"']
152+
assets=[AssetSpecSchema(key="asset2")], command=["echo", '"hello"']
153153
)
154154
resources = {"pipes_subprocess_client": PipesSubprocessClient()}
155155
blueprint_defs = Definitions.merge(

python_modules/libraries/dagster-components/dagster_components/__init__.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
ComponentScaffoldRequest as ComponentScaffoldRequest,
1414
DefaultComponentScaffolder as DefaultComponentScaffolder,
1515
)
16-
from dagster_components.core.schema.base import ResolvableModel as ResolvableModel
16+
from dagster_components.core.schema.base import ComponentSchema as ComponentSchema
1717
from dagster_components.core.schema.context import ResolutionContext as ResolutionContext
1818
from dagster_components.core.schema.metadata import ResolvableFieldInfo as ResolvableFieldInfo
1919
from dagster_components.core.schema.objects import (
20-
AssetAttributesModel as AssetAttributesModel,
21-
AssetSpecTransformModel as AssetSpecTransformModel,
22-
OpSpecModel as OpSpecModel,
20+
AssetAttributesSchema as AssetAttributesSchema,
21+
AssetSpecSchema as AssetSpecSchema,
22+
AssetSpecTransformSchema as AssetSpecTransformSchema,
23+
OpSpecSchema as OpSpecSchema,
24+
Resolver as Resolver,
25+
resolver as resolver,
2326
)
2427
from dagster_components.version import __version__ as __version__

0 commit comments

Comments
 (0)