Changes from all commits (26 commits, all by dpeng817):
- 736b731 [dagster-airlift] fix prop on AirflowDefinitionsData (Apr 9, 2025)
- e7ac036 Move around classes (Apr 9, 2025)
- 9b79aca [dagster-airlift] datasets storage layer, AirflowInstance retrieval m… (Apr 9, 2025)
- 0cbd753 [dagster-airlift] auto-construct assets from datasets (Apr 9, 2025)
- c1b22dd [dagster-airlift] dataset filtering (Apr 10, 2025)
- 80e0d3d [dagster-airlift] move assets def construction to common code path; o… (Apr 10, 2025)
- c291a5c Dag job def (Apr 10, 2025)
- c5f8ffd address feedback (Apr 18, 2025)
- b2cd82e [dagster-airlift] make AirflowDefinitionsData aware of jobs (Apr 11, 2025)
- 167e407 [dagster-airlift][jobs 5/n] Definitions.execute_job_in_process (Apr 11, 2025)
- ed73075 [dagster-airlift][jobs 5/n] Airflow monitoring job (Apr 15, 2025)
- 8dfb8f3 [dagster-airlift][jobs 6/n] jobs UI (Apr 15, 2025)
- 0fb510f Move pipes external metadata processing to its own layer. (Apr 15, 2025)
- 0d6fbe8 [dagster-airlift] log retrieval API (Apr 15, 2025)
- 294f6e1 [dagster-airlift] Ingest metadata from airflow logs (Apr 15, 2025)
- 03fdd28 [dagster-airlift] instance component (Apr 16, 2025)
- baf0d66 [dagster-airlift][components 2/n] scaffolder (Apr 16, 2025)
- 228618d [dagster-airlift] make monitor job range configurable (Apr 17, 2025)
- 602b1ed Add history import run method (Apr 17, 2025)
- 14e954c Make runs obey event log entry timestamp (Apr 17, 2025)
- 6e3c6f7 DagsterInstance.report_dagster_event timestamp override (Apr 18, 2025)
- cbc50ed [dagster-airlift] apply storage changes to airlift (Apr 17, 2025)
- 24dac4e Allow for external jobs (Apr 25, 2025)
- 9a3d7d8 Mapped assets components (Apr 25, 2025)
- f8d7da8 Handle migrated runs (Apr 26, 2025)
- 361656e Full components demo (Apr 28, 2025)
3 changes: 3 additions & 0 deletions .mcp.json
@@ -0,0 +1,3 @@
{
"mcpServers": {}
}
24 changes: 24 additions & 0 deletions examples/starlift-demo/Makefile
@@ -14,6 +14,7 @@ export DBT_PROFILES_DIR := $(MAKEFILE_DIR)/dbt_example/shared/dbt
 export DAGSTER_URL := http://localhost:3333
 export AIRLIFT_MODULE_DIR := $(MAKEFILE_DIR)/../../python_modules/libraries/dagster-airlift
 export DLIFT_MODULE_DIR := $(MAKEFILE_DIR)/../experimental/dagster-dlift
+export COMPONENTS_DIR := $(MAKEFILE_DIR)/dbt_example/components
 
 help:
 	@egrep -h '\s##\s' $(MAKEFILE_LIST) | sort | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-20s\033[0m %s\n", $$1, $$2}'
@@ -42,10 +43,12 @@ setup_local_env:
 not_proxied:
 	chmod +x $(AIRLIFT_MODULE_DIR)/scripts/find_and_replace_in_yaml_dir.sh
 	$(AIRLIFT_MODULE_DIR)/scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/migrating_airflow_dags/proxied_state True False
+	AIRFLOW_HOME=$(MIGRATING_AIRFLOW_HOME) airflow dags reserialize
 
 proxied:
 	chmod +x $(AIRLIFT_MODULE_DIR)/scripts/find_and_replace_in_yaml_dir.sh
 	$(AIRLIFT_MODULE_DIR)/scripts/find_and_replace_in_yaml_dir.sh $(MAKEFILE_DIR)/dbt_example/migrating_airflow_dags/proxied_state False True
+	AIRFLOW_HOME=$(MIGRATING_AIRFLOW_HOME) airflow dags reserialize
 
 run_migrating_airflow:
 	AIRFLOW_HOME=$(MIGRATING_AIRFLOW_HOME) airflow standalone
@@ -80,6 +83,27 @@ run_federated_airflow_defs_2:
 run_dbt_cloud_defs:
 	dagster dev -m dbt_example.dagster_defs.dbt_cloud_airflow -p 3333
 
+run_peer_component: not_proxied
+	rm -f $(COMPONENTS_DIR)/inner/lakehouse/component.yaml
+	rm -f $(COMPONENTS_DIR)/inner/dbt_project/component.yaml
+	cp $(COMPONENTS_DIR)/inner/peer-component.yaml $(COMPONENTS_DIR)/inner/component.yaml
+	dagster dev -m dbt_example.components.defs -p 3333 --verbose
+
+run_observe_component: not_proxied
+	rm -f $(COMPONENTS_DIR)/inner/lakehouse/component.yaml
+	rm -f $(COMPONENTS_DIR)/inner/dbt_project/component.yaml
+	cp -f $(COMPONENTS_DIR)/inner/dbt_project/_component.yaml $(COMPONENTS_DIR)/inner/dbt_project/component.yaml
+	cp -f $(COMPONENTS_DIR)/inner/observe-component.yaml $(COMPONENTS_DIR)/inner/component.yaml
+	dagster dev -m dbt_example.components.defs -p 3333 --verbose
+
+run_migrate_component: proxied
+	rm -f $(COMPONENTS_DIR)/inner/lakehouse/component.yaml
+	rm -f $(COMPONENTS_DIR)/inner/dbt_project/component.yaml
+	cp -f $(COMPONENTS_DIR)/inner/lakehouse/_component.yaml $(COMPONENTS_DIR)/inner/lakehouse/component.yaml
+	cp -f $(COMPONENTS_DIR)/inner/dbt_project/_component.yaml $(COMPONENTS_DIR)/inner/dbt_project/component.yaml
+	cp -f $(COMPONENTS_DIR)/inner/migrate-component.yaml $(COMPONENTS_DIR)/inner/component.yaml
+	dagster dev -m dbt_example.components.defs -p 3333 --verbose
+
 wipe: ## Wipe out all the files created by the Makefile
 	rm -rf $(MIGRATING_AIRFLOW_HOME) $(FEDERATED_AIRFLOW_HOME_1) $(FEDERATED_AIRFLOW_HOME_2) $(DAGSTER_HOME)
 
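The three new run_*_component targets stage different component.yaml variants to walk Airlift's peer, observe, and migrate stages. For reference, the peer stage corresponds roughly to this programmatic setup; a minimal sketch using the dagster_airlift.core peering API, with the instance name and credentials mirrored from the demo YAML (everything else illustrative):

from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_defs_from_airflow_instance,
)

# Peering: mirror the Airflow instance's dags in Dagster without moving any compute.
airflow_instance = AirflowInstance(
    name="my_airflow_instance",
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8080",
        username="admin",
        password="admin",
    ),
)

defs = build_defs_from_airflow_instance(airflow_instance=airflow_instance)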
Empty file.
5 changes: 5 additions & 0 deletions examples/starlift-demo/dbt_example/components/defs.py
@@ -0,0 +1,5 @@
from dagster.components import load_defs

from dbt_example.components import inner as inner

defs = load_defs(defs_root=inner)
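Note: load_defs builds a single Definitions object from everything under the dbt_example.components.inner package, so dagster dev -m dbt_example.components.defs serves whichever component.yaml files the active Makefile target has staged into inner/.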
Empty file.
@@ -0,0 +1,5 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: ...
  auth: ...
33 changes: 33 additions & 0 deletions examples/starlift-demo/dbt_example/components/inner/component.yaml
@@ -0,0 +1,33 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: my_airflow_instance
  auth:
    type: basic_auth
    webserver_url: http://localhost:8080
    username: admin
    password: admin
  mappings:
    - dag_id: rebuild_iris_models
      task_mappings:
        - task_id: load_iris
          assets:
            - by_key: lakehouse/iris
        - task_id: build_dbt_models
          assets:
            - by_key: raw_customers
            - by_key: raw_orders
            - by_key: raw_payments
            - by_key: stg_customers
            - by_key: stg_orders
            - by_key: stg_payments
            - by_key: customers
            - by_key: orders
            - by_key: iris_setosa
  checks:
    - type: metadata_bounds
      target: lakehouse/iris
      metadata_key: sepal_length_cm_min
      min_value: 1.0
      max_value: 10.0
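The mappings block above is the YAML analogue of Airlift's task-mapping API. A rough Python equivalent, sketched with the existing dagster_airlift.core helper (spec list abbreviated):

from dagster import AssetSpec
from dagster_airlift.core import assets_with_task_mappings

# Map each Airflow task to the asset specs it materializes; Airlift records
# materializations for these assets when the corresponding task completes.
mapped_specs = assets_with_task_mappings(
    dag_id="rebuild_iris_models",
    task_mappings={
        "load_iris": [AssetSpec(key=["lakehouse", "iris"])],
        "build_dbt_models": [
            AssetSpec("raw_customers"),
            AssetSpec("raw_orders"),
            AssetSpec("raw_payments"),
            # ...stg_ and downstream models elided
        ],
    },
)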

@@ -0,0 +1,4 @@
type: dagster_dbt.DbtProjectComponent

attributes:
  project: ../../../shared/dbt
@@ -0,0 +1,4 @@
type: dagster_dbt.DbtProjectComponent

attributes:
  project: ../../../shared/dbt
@@ -0,0 +1,13 @@
type: dbt_example.components.types.LakehouseComponent

attributes:
  csv_path: ../../../shared/iris.csv
  duckdb_path: ../../../shared/jaffle_shop.duckdb
  columns:
    - sepal_length_cm
    - sepal_width_cm
    - petal_length_cm
    - petal_width_cm
    - species
  columns_for_min:
    - sepal_length_cm
@@ -0,0 +1,13 @@
type: dbt_example.components.types.LakehouseComponent

attributes:
  csv_path: ../../../shared/iris.csv
  duckdb_path: ../../../shared/jaffle_shop.duckdb
  columns:
    - sepal_length_cm
    - sepal_width_cm
    - petal_length_cm
    - petal_width_cm
    - species
  columns_for_min:
    - sepal_length_cm
@@ -0,0 +1,33 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: my_airflow_instance
  auth:
    type: basic_auth
    webserver_url: http://localhost:8080
    username: admin
    password: admin
  mappings:
    - dag_id: rebuild_iris_models
      task_mappings:
        - task_id: load_iris
          assets:
            - by_key: lakehouse/iris
        - task_id: build_dbt_models
          assets:
            - by_key: raw_customers
            - by_key: raw_orders
            - by_key: raw_payments
            - by_key: stg_customers
            - by_key: stg_orders
            - by_key: stg_payments
            - by_key: customers
            - by_key: orders
            - by_key: iris_setosa
  checks:
    - type: metadata_bounds
      target: lakehouse/iris
      metadata_key: sepal_length_cm_min
      min_value: 1.0
      max_value: 10.0

@@ -0,0 +1,29 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: my_airflow_instance
  auth:
    type: basic_auth
    webserver_url: http://localhost:8080
    username: admin
    password: admin
  mappings:
    - dag_id: rebuild_iris_models
      task_mappings:
        - task_id: build_dbt_models
          assets:
            - by_key: raw_customers
            - by_key: raw_orders
            - by_key: raw_payments
            - by_key: stg_customers
            - by_key: stg_orders
            - by_key: stg_payments
            - by_key: customers
            - by_key: orders
            - by_key: iris_setosa
  checks:
    - type: metadata_bounds
      target: lakehouse/iris
      metadata_key: sepal_length_cm_min
      min_value: 1.0
      max_value: 10.0
@@ -0,0 +1,15 @@
type: dagster_airlift.core.components.airflow_instance.component.AirflowInstanceComponent

attributes:
  name: my_airflow_instance
  auth:
    type: basic_auth
    webserver_url: http://localhost:8080
    username: admin
    password: admin
  checks:
    - type: metadata_bounds
      target: lakehouse/iris
      metadata_key: sepal_length_cm_min
      min_value: 1.0
      max_value: 10.0
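These three variants are the files the run_peer_component, run_observe_component, and run_migrate_component Makefile targets copy into place as inner/component.yaml (peer declares only the instance plus checks; observe adds task-to-asset mappings; migrate also maps load_iris). Likewise, the duplicated DbtProjectComponent and LakehouseComponent blocks above are the staged _component.yaml copies that sit next to the active component.yaml files.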
54 changes: 54 additions & 0 deletions examples/starlift-demo/dbt_example/components/types.py
@@ -0,0 +1,54 @@
from dataclasses import dataclass
from pathlib import Path

from dagster import AssetKey, AssetSpec, multi_asset
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.events import AssetMaterialization
from dagster.components import Component, Resolvable
from dagster.components.core.context import ComponentLoadContext

from dbt_example.shared.lakehouse_utils import get_min_value, id_from_path, load_csv_to_duckdb


def lakehouse_asset_key(*, csv_path) -> AssetKey:
    return AssetKey(["lakehouse", id_from_path(csv_path)])


@dataclass
class LakehouseComponent(Component, Resolvable):
    csv_path: str
    duckdb_path: str
    columns: list[str]
    columns_for_min: list[str]

    def build_defs(self, context: ComponentLoadContext) -> Definitions:
        full_csv_path = context.path / self.csv_path
        assert full_csv_path.exists()
        full_duckdb_path = context.path / self.duckdb_path
        assert full_duckdb_path.exists()

        @multi_asset(
            specs=[
                AssetSpec(key=lakehouse_asset_key(csv_path=Path(self.csv_path)), kinds=["duckdb"])
            ],
        )
        def _multi_asset():
            load_csv_to_duckdb(
                csv_path=full_csv_path,
                db_path=full_duckdb_path,
                columns=self.columns,
            )
            res = {}
            for column in self.columns_for_min:
                min_value = get_min_value(
                    db_path=full_duckdb_path,
                    csv_path=full_csv_path,
                    column=column,
                )
                res[f"{column}_min"] = min_value
            yield AssetMaterialization(
                asset_key=lakehouse_asset_key(csv_path=Path(self.csv_path)),
                metadata=res,
            )

        return Definitions(assets=[_multi_asset])
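The metadata_bounds check declared in the component YAML presumably compiles down to an asset check over the materialization metadata this component emits. A hand-written sketch of the same behavior (hypothetical; the component's actual generated check may differ):

from dagster import AssetCheckResult, AssetKey, asset_check

@asset_check(asset=AssetKey(["lakehouse", "iris"]), name="sepal_length_cm_min_bounds")
def sepal_length_cm_min_bounds(context) -> AssetCheckResult:
    # Read the metadata attached by _multi_asset's AssetMaterialization above.
    event = context.instance.get_latest_materialization_event(AssetKey(["lakehouse", "iris"]))
    value = event.asset_materialization.metadata["sepal_length_cm_min"].value
    return AssetCheckResult(passed=1.0 <= value <= 10.0, metadata={"observed_min": value})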
29 changes: 27 additions & 2 deletions examples/starlift-demo/dbt_example/migrating_airflow_dags/dags.py
@@ -1,13 +1,15 @@
+import json
 import os
 from pathlib import Path
 
 from airflow import DAG
+from airflow.models.dataset import Dataset
 from airflow.models.operator import BaseOperator
 from airflow.operators.bash import BashOperator
 from dagster._time import get_current_datetime
 from dagster_airlift.in_airflow import proxying_to_dagster
 from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
-from dbt_example.shared.lakehouse_utils import load_csv_to_duckdb
+from dbt_example.shared.lakehouse_utils import get_min_value, load_csv_to_duckdb
 from dbt_example.shared.load_iris import CSV_PATH, DB_PATH, IRIS_COLUMNS
 
 default_args = {
@@ -19,10 +21,19 @@
 
 
 class LoadToLakehouseOperator(BaseOperator):
-    def __init__(self, csv_path: Path, db_path: Path, columns: list[str], *args, **kwargs):
+    def __init__(
+        self,
+        csv_path: Path,
+        db_path: Path,
+        columns: list[str],
+        columns_for_min: list[str],
+        *args,
+        **kwargs,
+    ):
         self._csv_path = csv_path
         self._db_path = db_path
         self._column_names = columns
+        self._columns_for_min = columns_for_min
         super().__init__(*args, **kwargs)
 
     def execute(self, context) -> None:
@@ -31,6 +42,18 @@ def execute(self, context) -> None:
             db_path=self._db_path,
             columns=self._column_names,
         )
+        for column in self._columns_for_min:
+            min_value = get_min_value(
+                db_path=self._db_path,
+                csv_path=self._csv_path,
+                column=column,
+            )
+            dagster_json_metadata = json.dumps(
+                {
+                    f"{column}_min": min_value,
+                }
+            )
+            print(f"DAGSTER_START{dagster_json_metadata}DAGSTER_END")  # noqa: T201


DBT_DIR = os.getenv("DBT_PROJECT_DIR")
@@ -49,6 +72,8 @@ def execute(self, context) -> None:
     csv_path=CSV_PATH,
     db_path=DB_PATH,
     columns=IRIS_COLUMNS,
+    columns_for_min=["sepal_length_cm"],
+    outlets=[Dataset(uri="local://lakehouse/iris.csv")],
 )
 run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command=f"dbt build {args}", dag=dag)
 load_iris >> run_dbt_model  # type: ignore
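The DAGSTER_START...DAGSTER_END sentinel printed in execute() above is what the PR's new log-ingestion path scans task logs for. A hypothetical parser illustrating the convention (the actual extraction logic lives in dagster-airlift and may differ):

import json
import re

# Find every DAGSTER_START{...}DAGSTER_END payload in a task's log text and
# decode it; each payload becomes materialization metadata on the mapped asset.
_PAYLOAD_RE = re.compile(r"DAGSTER_START(.*?)DAGSTER_END", re.DOTALL)

def extract_log_metadata(log_text: str) -> list[dict]:
    return [json.loads(payload) for payload in _PAYLOAD_RE.findall(log_text)]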
@@ -2,4 +2,4 @@ tasks:
   - id: load_iris
     proxied: False
   - id: build_dbt_models
-    proxied: False
+    proxied: True
@@ -1,5 +1,5 @@
 tasks:
   - id: load_iris
-    proxied: True
+    proxied: False
   - id: build_dbt_models
-    proxied: True
+    proxied: False
14 changes: 14 additions & 0 deletions examples/starlift-demo/dbt_example/shared/lakehouse_utils.py
@@ -32,3 +32,17 @@ def load_csv_to_duckdb(
         f"CREATE TABLE IF NOT EXISTS {db_name}.lakehouse.{table_name} AS SELECT * FROM df"
     ).fetchall()
     con.close()
+
+
+def get_min_value(
+    *,
+    csv_path: Path,
+    db_path: Path,
+    column: str,
+) -> float:
+    con = duckdb.connect(str(db_path))
+    min_value = con.execute(
+        f"SELECT MIN({column}) FROM {id_from_path(db_path)}.lakehouse.{id_from_path(csv_path)}"
+    ).fetchall()
+    con.close()
+    return min_value[0][0]  # 0th row, 0th column
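For reference, a call against the demo's shared files would look like this (paths assumed from the component YAML above):

from pathlib import Path

min_sepal_length = get_min_value(
    csv_path=Path("iris.csv"),           # table name is derived from this stem via id_from_path
    db_path=Path("jaffle_shop.duckdb"),  # database name is derived from this stem via id_from_path
    column="sepal_length_cm",
)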
1 change: 0 additions & 1 deletion examples/starlift-demo/setup.py
@@ -7,7 +7,6 @@
         "dagster",
         "dagster-webserver",
         "dagster-airlift[core,in-airflow]",
-        "dagster-dlift",
         "dbt-duckdb",
         "pandas",
     ],