Commit 6549553

[dagster-airlift] instance component
1 parent 23d4d84 commit 6549553

File tree

10 files changed: +253 -0 lines changed

.mcp.json

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
{
  "mcpServers": {}
}

python_modules/libraries/dagster-airlift/dagster_airlift/core/components/__init__.py

Whitespace-only changes.

python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/__init__.py

Whitespace-only changes.
python_modules/libraries/dagster-airlift/dagster_airlift/core/components/airflow_instance/component.py

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Annotated, Literal, Optional, Union

from dagster._core.definitions.definitions_class import Definitions
from dagster.components import Component, ComponentLoadContext, Resolvable
from dagster.components.resolved.context import ResolutionContext
from dagster.components.resolved.core_models import AssetPostProcessor
from dagster.components.resolved.model import Resolver
from typing_extensions import TypeAlias

import dagster_airlift.core as dg_airlift_core
from dagster_airlift.core.airflow_instance import AirflowAuthBackend
from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend
from dagster_airlift.core.load_defs import build_job_based_airflow_defs


@dataclass
class AirflowBasicAuthBackendModel(Resolvable):
    # Yaml-facing model for username/password auth against the Airflow webserver.
    type: Literal["basic_auth"]
    webserver_url: str
    username: str
    password: str


@dataclass
class AirflowMwaaAuthBackendModel(Resolvable):
    # Yaml-facing model for MWAA auth. Not yet handled by resolve_auth below.
    type: Literal["mwaa"]


def resolve_auth(context: ResolutionContext, model) -> AirflowAuthBackend:
    # Translate the resolved yaml model into a concrete auth backend.
    if model.auth.type == "basic_auth":
        return AirflowBasicAuthBackend(
            webserver_url=model.auth.webserver_url,
            username=model.auth.username,
            password=model.auth.password,
        )
    else:
        raise ValueError(f"Unsupported auth type: {model.auth.type}")


ResolvedAirflowAuthBackend: TypeAlias = Annotated[
    AirflowAuthBackend,
    Resolver.from_model(
        resolve_auth,
        model_field_type=Union[AirflowBasicAuthBackendModel, AirflowMwaaAuthBackendModel],
    ),
]


@dataclass
class AirflowInstanceComponent(Component, Resolvable):
    auth: ResolvedAirflowAuthBackend
    name: str
    asset_post_processors: Optional[Sequence[AssetPostProcessor]] = None

    def _get_instance(self) -> dg_airlift_core.AirflowInstance:
        # Overridable seam: tests swap in a fake AirflowInstance here.
        return dg_airlift_core.AirflowInstance(
            auth_backend=self.auth,
            name=self.name,
        )

    def build_defs(self, context: ComponentLoadContext) -> Definitions:
        defs = build_job_based_airflow_defs(
            airflow_instance=self._get_instance(),
        )
        # Apply any configured asset post-processors to the generated defs.
        for post_processor in self.asset_post_processors or []:
            defs = post_processor(defs)
        return defs
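
Note that AirflowMwaaAuthBackendModel is declared but resolve_auth only handles basic_auth, so a `type: mwaa` config currently raises ValueError. A minimal sketch of what the missing branch might look like, assuming dagster_airlift.mwaa.MwaaSessionAuthBackend with a boto3 MWAA client, plus a hypothetical env_name field on the MWAA model (neither is part of this commit):

# Hypothetical sketch only, not part of this commit. Assumes
# MwaaSessionAuthBackend accepts an MWAA boto3 client and an environment
# name, and that AirflowMwaaAuthBackendModel grows an env_name field.
import boto3
from dagster.components.resolved.context import ResolutionContext
from dagster_airlift.core.airflow_instance import AirflowAuthBackend
from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend
from dagster_airlift.mwaa import MwaaSessionAuthBackend


def resolve_auth_with_mwaa(context: ResolutionContext, model) -> AirflowAuthBackend:
    if model.auth.type == "basic_auth":
        return AirflowBasicAuthBackend(
            webserver_url=model.auth.webserver_url,
            username=model.auth.username,
            password=model.auth.password,
        )
    if model.auth.type == "mwaa":
        return MwaaSessionAuthBackend(
            mwaa_client=boto3.client("mwaa"),
            env_name=model.auth.env_name,  # hypothetical field
        )
    raise ValueError(f"Unsupported auth type: {model.auth.type}")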
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
import dagster_airlift.core as dg_airlift_core
import pytest
from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
from dagster_airlift.test import make_instance
from dagster_airlift.test.test_utils import asset_spec
from dagster_tests.components_tests.utils import build_component_defs_for_test


@pytest.fixture
def component_for_test():
    airflow_instance = make_instance(
        {"dag_1": ["dag_1_task_1", "dag_1_task_2"], "dag_2": ["dag_2_task_1", "dag_2_task_2"]},
        dataset_construction_info=[
            {
                "uri": "s3://dataset-bucket/example1.csv",
                "producing_tasks": [
                    {"dag_id": "dag_1", "task_id": "dag_1_task_1"},
                ],
                "consuming_dags": ["dag_2"],
            },
            {
                "uri": "s3://dataset-bucket/example2.csv",
                "producing_tasks": [
                    {"dag_id": "dag_2", "task_id": "dag_2_task_1"},
                ],
                "consuming_dags": [],
            },
        ],
    )

    class DebugAirflowInstanceComponent(AirflowInstanceComponent):
        def _get_instance(self) -> dg_airlift_core.AirflowInstance:
            return airflow_instance

    return DebugAirflowInstanceComponent


def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) -> None:
    defs = build_component_defs_for_test(
        component_for_test,
        {
            "auth": {
                "type": "basic_auth",
                "webserver_url": "http://localhost:8080",
                "username": "admin",
                "password": "admin",
            },
            "name": "test_instance",
            "asset_post_processors": [
                {
                    "target": "*",
                    "attributes": {
                        "metadata": {
                            "foo": "bar",
                        },
                    },
                }
            ],
        },
    )

    for asset_key in ["example1", "example2"]:
        keyed_spec = asset_spec(asset_key, defs)
        assert keyed_spec is not None
        assert keyed_spec.metadata["foo"] == "bar"

    assert len(defs.jobs) == 3  # monitoring job + 2 dag jobs.

python_modules/libraries/dagster-airlift/kitchen-sink/Makefile

Lines changed: 4 additions & 0 deletions
@@ -37,10 +37,14 @@ run_observation_defs:
 run_job_based_defs:
 	dagster dev -m kitchen_sink.dagster_defs.job_based_defs -p 3333

+run_component_defs:
+	dagster dev -m kitchen_sink.dagster_defs.component_defs -p 3333
+
 # Command to point at a workspace.yaml
 run_dagster_multi_code_locations:
 	dagster dev -w $(MAKEFILE_DIR)/kitchen_sink/dagster_multi_code_locations/workspace.yaml -p 3333

+
 wipe: ## Wipe out all the files created by the Makefile
 	rm -rf $(AIRFLOW_HOME) $(DAGSTER_HOME)

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_defs/component_defs.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
from dagster.components import load_defs

import kitchen_sink.dagster_defs.inner_component_defs as inner_component_defs

defs = load_defs(defs_root=inner_component_defs)
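
load_defs builds a Definitions object from the component files it discovers under the given module tree, here the component.yaml in inner_component_defs shown below. A small inspection sketch, assuming only the public defs.jobs attribute that the tests in this commit also assert against:

# Hypothetical sketch only, not part of this commit.
from kitchen_sink.dagster_defs.component_defs import defs

for job in defs.jobs:
    # Includes the Airflow monitoring job plus the jobs derived from dags.
    print(job.name)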

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/dagster_defs/inner_component_defs/__init__.py

Whitespace-only changes.
Lines changed: 9 additions & 0 deletions
@@ -0,0 +1,9 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: kitchen_sink_instance
  auth:
    type: basic_auth
    username: admin
    password: admin
    webserver_url: http://localhost:8080
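
For reference, a hedged sketch of the direct Python construction this yaml resolves to, using the classes added in this commit (the yaml path normally goes through dagster's component resolution machinery rather than a direct constructor call):

# Hypothetical sketch only, not part of this commit. Mirrors the
# component.yaml above: resolve_auth turns the basic_auth mapping into an
# AirflowBasicAuthBackend before AirflowInstanceComponent is constructed.
from dagster_airlift.core.basic_auth import AirflowBasicAuthBackend
from dagster_airlift.core.components.airflow_instance.component import (
    AirflowInstanceComponent,
)

component = AirflowInstanceComponent(
    name="kitchen_sink_instance",
    auth=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8080",
        username="admin",
        password="admin",
    ),
)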
Lines changed: 96 additions & 0 deletions
@@ -0,0 +1,96 @@
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.metadata.metadata_value import TimestampMetadataValue
from dagster._core.event_api import EventRecordsFilter
from dagster._core.events import DagsterEventType
from dagster._core.instance_for_test import instance_for_test
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._core.storage.tags import EXTERNAL_JOB_SOURCE_TAG_KEY
from dagster_airlift.constants import DAG_ID_TAG_KEY, DAG_RUN_ID_TAG_KEY
from dagster_airlift.core.utils import monitoring_job_name
from dagster_airlift.test.test_utils import asset_spec
from kitchen_sink.airflow_instance import local_airflow_instance

from kitchen_sink_tests.integration_tests.conftest import (
    poll_for_airflow_run_existence_and_completion,
)


def test_component_based_defs(
    airflow_instance: None,
) -> None:
    """Test that component-based defs load properly."""
    from kitchen_sink.dagster_defs.component_defs import defs

    assert len(defs.jobs) == 20
    assert len(defs.assets) == 1
    for key in ["example1", "example2"]:
        assert asset_spec(key, defs)

    # First, execute the dataset producer dag.
    af_instance = local_airflow_instance("kitchen_sink_instance")
    af_run_id = af_instance.trigger_dag("dataset_producer")
    poll_for_airflow_run_existence_and_completion(
        af_instance=af_instance, af_run_id=af_run_id, dag_id="dataset_producer", duration=30
    )

    # Then, execute the monitoring job.
    with instance_for_test() as instance:
        result = defs.execute_job_in_process(
            monitoring_job_name(af_instance.name), instance=instance
        )
        assert result.success
        # There should be a run for the dataset producer dag.
        runs = instance.get_runs()
        assert len(runs) == 2
        producer_run = next(run for run in runs if run.job_name == "dataset_producer")
        assert producer_run.status == DagsterRunStatus.SUCCESS
        assert producer_run.tags[DAG_RUN_ID_TAG_KEY] == af_run_id
        assert producer_run.tags[DAG_ID_TAG_KEY] == "dataset_producer"
        assert producer_run.tags[EXTERNAL_JOB_SOURCE_TAG_KEY] == "airflow"

        # Check that there are asset materialization planned events for the two assets.
        planned_records = instance.get_event_records(
            event_records_filter=EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION_PLANNED
            ),
        )
        assert len(planned_records) == 2
        assert {record.asset_key for record in planned_records} == {
            AssetKey("example1"),
            AssetKey("example2"),
        }
        assert {record.run_id for record in planned_records} == {producer_run.run_id}

        # Check that there are asset materialized events for the two assets.
        materialized_records = instance.get_event_records(
            event_records_filter=EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION
            ),
        )
        assert len(materialized_records) == 2
        assert {record.asset_key for record in materialized_records} == {
            AssetKey("example1"),
            AssetKey("example2"),
        }
        assert {record.run_id for record in materialized_records} == {producer_run.run_id}
        for key in ["example1", "example2"]:
            key_record = next(
                record for record in materialized_records if record.asset_key == AssetKey(key)
            )
            assert key_record.asset_materialization.metadata
            assert key_record.asset_materialization.metadata["my_timestamp"] == TimestampMetadataValue(
                value=111.0
            )
            assert key_record.asset_materialization.metadata["my_other_timestamp"] == TimestampMetadataValue(
                value=113.0
            )
            # It gets overridden by the second print.
            assert key_record.asset_materialization.metadata["foo"].value == "baz"
