|
| 1 | +import json |
| 2 | +from pathlib import Path |
| 3 | + |
1 | 4 | import dagster_airlift.core as dg_airlift_core
|
2 | 5 | import pytest
|
| 6 | +import yaml |
| 7 | +from click.testing import CliRunner |
3 | 8 | from dagster._core.test_utils import ensure_dagster_tests_import
|
| 9 | +from dagster.components.cli import cli |
4 | 10 | from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent
|
5 | 11 | from dagster_airlift.test import make_instance
|
6 | 12 | from dagster_airlift.test.test_utils import asset_spec
|
7 | 13 |
|
8 | 14 | ensure_dagster_tests_import()
|
9 |
| -from dagster_tests.components_tests.utils import build_component_defs_for_test |
| 15 | +from dagster_tests.components_tests.utils import ( |
| 16 | + build_component_defs_for_test, |
| 17 | + temp_code_location_bar, |
| 18 | +) |
10 | 19 |
|
11 | 20 |
|
12 | 21 | @pytest.fixture
|
@@ -68,3 +77,56 @@ def test_load_dags_basic(component_for_test: type[AirflowInstanceComponent]) ->
|
68 | 77 | assert keyed_spec.metadata["foo"] == "bar"
|
69 | 78 |
|
70 | 79 | assert len(defs.jobs) == 3 # monitoring job + 2 dag jobs.
|
| 80 | + |
| 81 | + |
| 82 | +def _scaffold_airlift(scaffold_format: str): |
| 83 | + runner = CliRunner() |
| 84 | + result = runner.invoke( |
| 85 | + cli, |
| 86 | + [ |
| 87 | + "scaffold", |
| 88 | + "object", |
| 89 | + "dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent", |
| 90 | + "bar/components/qux", |
| 91 | + "--json-params", |
| 92 | + json.dumps({"name": "qux", "auth_type": "basic_auth"}), |
| 93 | + "--scaffold-format", |
| 94 | + scaffold_format, |
| 95 | + ], |
| 96 | + ) |
| 97 | + assert result.exit_code == 0 |
| 98 | + |
| 99 | + |
| 100 | +def test_scaffold_airlift_yaml(): |
| 101 | + with temp_code_location_bar(): |
| 102 | + _scaffold_airlift("yaml") |
| 103 | + assert Path("bar/components/qux/component.yaml").exists() |
| 104 | + with open("bar/components/qux/component.yaml") as f: |
| 105 | + assert yaml.safe_load(f) == { |
| 106 | + "type": "dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent", |
| 107 | + "attributes": { |
| 108 | + "name": "qux", |
| 109 | + "auth": { |
| 110 | + "type": "basic_auth", |
| 111 | + "webserver_url": '{{ env("AIRFLOW_WEBSERVER_URL") }}', |
| 112 | + "username": '{{ env("AIRFLOW_USERNAME") }}', |
| 113 | + "password": '{{ env("AIRFLOW_PASSWORD") }}', |
| 114 | + }, |
| 115 | + }, |
| 116 | + } |
| 117 | + |
| 118 | + |
| 119 | +def test_scaffold_airlift_python(): |
| 120 | + with temp_code_location_bar(): |
| 121 | + _scaffold_airlift("python") |
| 122 | + assert Path("bar/components/qux/component.py").exists() |
| 123 | + with open("bar/components/qux/component.py") as f: |
| 124 | + file_contents = f.read() |
| 125 | + assert file_contents == ( |
| 126 | + """from dagster.components import component, ComponentLoadContext |
| 127 | +from dagster_airlift.core.components.airflow_instance.component import AirflowInstanceComponent |
| 128 | +
|
| 129 | +@component |
| 130 | +def load(context: ComponentLoadContext) -> AirflowInstanceComponent: ... |
| 131 | +""" |
| 132 | + ) |
0 commit comments