Skip to content

Commit 6c3a327

Browse files
committed
[dagster-airlift][components 2/n] scaffolder
1 parent a3be2ff commit 6c3a327

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py

+30
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
from dagster._core.definitions.definitions_class import Definitions
66
from dagster.components import Component, ComponentLoadContext, Resolvable
7+
from dagster.components.component_scaffolding import scaffold_component
78
from dagster.components.resolved.context import ResolutionContext
89
from dagster.components.resolved.core_models import AssetPostProcessor
910
from dagster.components.resolved.model import Resolver
11+
from dagster.components.scaffold.scaffold import Scaffolder, ScaffoldRequest, scaffold_with
12+
from pydantic import BaseModel
1013
from typing_extensions import TypeAlias
1114

1215
import dagster_airlift.core as dg_airlift_core
@@ -28,6 +31,32 @@ class AirflowMwaaAuthBackendModel(Resolvable):
2831
type: Literal["mwaa"]
2932

3033

34+
class AirflowInstanceScaffolderParams(BaseModel):
35+
name: str
36+
auth_type: Literal["basic_auth", "mwaa"]
37+
38+
39+
class AirflowInstanceScaffolder(Scaffolder):
40+
@classmethod
41+
def get_scaffold_params(cls) -> Optional[type[BaseModel]]:
42+
return AirflowInstanceScaffolderParams
43+
44+
def scaffold(self, request: ScaffoldRequest, params: AirflowInstanceScaffolderParams) -> None:
45+
full_params = {
46+
"name": params.name,
47+
}
48+
if params.auth_type == "basic_auth":
49+
full_params["auth"] = {
50+
"type": "basic_auth",
51+
"webserver_url": '{{ env("AIRFLOW_WEBSERVER_URL") }}',
52+
"username": '{{ env("AIRFLOW_USERNAME") }}',
53+
"password": '{{ env("AIRFLOW_PASSWORD") }}',
54+
}
55+
else:
56+
raise ValueError(f"Unsupported auth type: {params.auth_type}")
57+
scaffold_component(request, full_params)
58+
59+
3160
def resolve_auth(context: ResolutionContext, model) -> AirflowAuthBackend:
3261
if model.auth.type == "basic_auth":
3362
return AirflowBasicAuthBackend(
@@ -48,6 +77,7 @@ def resolve_auth(context: ResolutionContext, model) -> AirflowAuthBackend:
4877
]
4978

5079

80+
@scaffold_with(AirflowInstanceScaffolder)
5181
@dataclass
5282
class AirflowInstanceComponent(Component, Resolvable):
5383
auth: ResolvedAirflowAuthBackend

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py

+63-1
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1+
import json
2+
from pathlib import Path
3+
14
import dagster_airlift.core as dg_airlift_core
25
import pytest
6+
import yaml
7+
from click.testing import CliRunner
38
from dagster._core.test_utils import ensure_dagster_tests_import
9+
from dagster.components.cli import cli
410
from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
511
from dagster_airlift.test import make_instance
612
from dagster_airlift.test.test_utils import asset_spec
7-
from dagster_tests.components_tests.utils import build_component_defs_for_test
13+
from dagster_tests.components_tests.utils import (
14+
build_component_defs_for_test,
15+
temp_code_location_bar,
16+
)
817

918
ensure_dagster_tests_import()
1019

@@ -68,3 +77,56 @@ def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) ->
6877
assert keyed_spec.metadata["foo"] == "bar"
6978

7079
assert len(defs.jobs) == 3 # monitoring job + 2 dag jobs.
80+
81+
82+
def _scaffold_airlift(scaffold_format: str):
83+
runner = CliRunner()
84+
result = runner.invoke(
85+
cli,
86+
[
87+
"scaffold",
88+
"object",
89+
"dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent",
90+
"bar/components/qux",
91+
"--json-params",
92+
json.dumps({"name": "qux", "auth_type": "basic_auth"}),
93+
"--scaffold-format",
94+
scaffold_format,
95+
],
96+
)
97+
assert result.exit_code == 0
98+
99+
100+
def test_scaffold_airlift_yaml():
101+
with temp_code_location_bar():
102+
_scaffold_airlift("yaml")
103+
assert Path("bar/components/qux/component.yaml").exists()
104+
with open("bar/components/qux/component.yaml") as f:
105+
assert yaml.safe_load(f) == {
106+
"type": "dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent",
107+
"attributes": {
108+
"name": "qux",
109+
"auth": {
110+
"type": "basic_auth",
111+
"webserver_url": '{{ env("AIRFLOW_WEBSERVER_URL") }}',
112+
"username": '{{ env("AIRFLOW_USERNAME") }}',
113+
"password": '{{ env("AIRFLOW_PASSWORD") }}',
114+
},
115+
},
116+
}
117+
118+
119+
def test_scaffold_airlift_python():
120+
with temp_code_location_bar():
121+
_scaffold_airlift("python")
122+
assert Path("bar/components/qux/component.py").exists()
123+
with open("bar/components/qux/component.py") as f:
124+
file_contents = f.read()
125+
assert file_contents == (
126+
"""from dagster.components import component, ComponentLoadContext
127+
from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
128+
129+
@component
130+
def load(context: ComponentLoadContext) -> AirflowInstanceComponent: ...
131+
"""
132+
)

0 commit comments

Comments
 (0)