Skip to content

Commit f94e74f

Browse files
committed
[dagster-airlift][components 2/n] scaffolder
1 parent ee04ae2 commit f94e74f

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py

+30
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@
44

55
from dagster._core.definitions.definitions_class import Definitions
66
from dagster.components import Component, ComponentLoadContext, Resolvable
7+
from dagster.components.component_scaffolding import scaffold_component
78
from dagster.components.resolved.context import ResolutionContext
89
from dagster.components.resolved.core_models import AssetPostProcessor
910
from dagster.components.resolved.model import Resolver
11+
from dagster.components.scaffold.scaffold import Scaffolder, ScaffoldRequest, scaffold_with
12+
from pydantic import BaseModel
1013
from typing_extensions import TypeAlias
1114

1215
import dagster_airlift.core as dg_airlift_core
@@ -28,6 +31,32 @@ class AirflowMwaaAuthBackendModel(Resolvable):
2831
type: Literal["mwaa"]
2932

3033

34+
class AirflowInstanceScaffolderParams(BaseModel):
35+
name: str
36+
auth_type: Literal["basic_auth", "mwaa"]
37+
38+
39+
class AirflowInstanceScaffolder(Scaffolder):
40+
@classmethod
41+
def get_scaffold_params(cls) -> Optional[type[BaseModel]]:
42+
return AirflowInstanceScaffolderParams
43+
44+
def scaffold(self, request: ScaffoldRequest, params: AirflowInstanceScaffolderParams) -> None:
45+
full_params = {
46+
"name": params.name,
47+
}
48+
if params.auth_type == "basic_auth":
49+
full_params["auth"] = {
50+
"type": "basic_auth",
51+
"webserver_url": '{{ env("AIRFLOW_WEBSERVER_URL") }}',
52+
"username": '{{ env("AIRFLOW_USERNAME") }}',
53+
"password": '{{ env("AIRFLOW_PASSWORD") }}',
54+
}
55+
else:
56+
raise ValueError(f"Unsupported auth type: {params.auth_type}")
57+
scaffold_component(request, full_params)
58+
59+
3160
def resolve_auth(context: ResolutionContext, model) -> AirflowAuthBackend:
3261
if model.auth.type == "basic_auth":
3362
return AirflowBasicAuthBackend(
@@ -48,6 +77,7 @@ def resolve_auth(context: ResolutionContext, model) -> AirflowAuthBackend:
4877
]
4978

5079

80+
@scaffold_with(AirflowInstanceScaffolder)
5181
@dataclass
5282
class AirflowInstanceComponent(Component, Resolvable):
5383
auth: ResolvedAirflowAuthBackend

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_component_defs.py

+63-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
1+
import json
2+
from pathlib import Path
3+
14
import dagster_airlift.core as dg_airlift_core
25
import pytest
6+
import yaml
7+
from click.testing import CliRunner
8+
from dagster.components.cli import cli
39
from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
410
from dagster_airlift.test import make_instance
511
from dagster_airlift.test.test_utils import asset_spec
6-
from dagster_tests.components_tests.utils import build_component_defs_for_test
12+
from dagster_tests.components_tests.utils import (
13+
build_component_defs_for_test,
14+
temp_code_location_bar,
15+
)
716

817

918
@pytest.fixture
@@ -65,3 +74,56 @@ def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) ->
6574
assert keyed_spec.metadata["foo"] == "bar"
6675

6776
assert len(defs.jobs) == 3 # monitoring job + 2 dag jobs.
77+
78+
79+
def _scaffold_airlift(scaffold_format: str):
80+
runner = CliRunner()
81+
result = runner.invoke(
82+
cli,
83+
[
84+
"scaffold",
85+
"object",
86+
"dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent",
87+
"bar/components/qux",
88+
"--json-params",
89+
json.dumps({"name": "qux", "auth_type": "basic_auth"}),
90+
"--scaffold-format",
91+
scaffold_format,
92+
],
93+
)
94+
assert result.exit_code == 0
95+
96+
97+
def test_scaffold_airlift_yaml():
98+
with temp_code_location_bar():
99+
_scaffold_airlift("yaml")
100+
assert Path("bar/components/qux/component.yaml").exists()
101+
with open("bar/components/qux/component.yaml") as f:
102+
assert yaml.safe_load(f) == {
103+
"type": "dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent",
104+
"attributes": {
105+
"name": "qux",
106+
"auth": {
107+
"type": "basic_auth",
108+
"webserver_url": '{{ env("AIRFLOW_WEBSERVER_URL") }}',
109+
"username": '{{ env("AIRFLOW_USERNAME") }}',
110+
"password": '{{ env("AIRFLOW_PASSWORD") }}',
111+
},
112+
},
113+
}
114+
115+
116+
def test_scaffold_airlift_python():
117+
with temp_code_location_bar():
118+
_scaffold_airlift("python")
119+
assert Path("bar/components/qux/component.py").exists()
120+
with open("bar/components/qux/component.py") as f:
121+
file_contents = f.read()
122+
assert file_contents == (
123+
"""from dagster.components import component, ComponentLoadContext
124+
from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
125+
126+
@component
127+
def load(context: ComponentLoadContext) -> AirflowInstanceComponent: ...
128+
"""
129+
)

0 commit comments

Comments
 (0)