
Commit bc3c990

[dagster-airlift] Ingest metadata from airflow logs

1 parent 43f3973
9 files changed: +160 −8 lines changed

python_modules/dagster/dagster/_core/definitions/metadata/external_metadata.py (+3 −1)

@@ -1,5 +1,5 @@
 from collections.abc import Mapping, Sequence
-from typing import TYPE_CHECKING, Any, Literal, TypedDict, Union
+from typing import TYPE_CHECKING, Any, Literal, TypedDict, Union, get_args
 
 import dagster._check as check
 from dagster._core.definitions.asset_key import AssetKey
@@ -48,6 +48,8 @@ class ExternalMetadataValue(TypedDict):
     "table_column_lineage",
     "timestamp",
 ]
+EXTERNAL_METADATA_VALUE_KEYS = frozenset(ExternalMetadataValue.__annotations__.keys())
+EXTERNAL_METADATA_TYPES = frozenset(get_args(ExternalMetadataType))
 
 
 def metadata_map_from_external(
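
These two constants capture the allowed shape of an explicitly-typed external metadata entry, and they back the validation in `extract_metadata_from_logs` below. A minimal sketch of that use (values here are illustrative only):

    from dagster._core.definitions.metadata.external_metadata import (
        EXTERNAL_METADATA_TYPES,
        EXTERNAL_METADATA_VALUE_KEYS,
    )

    # An explicitly-typed entry must have exactly the TypedDict's keys
    # ("raw_value" and "type") and a recognized type literal.
    candidate = {"raw_value": 111, "type": "timestamp"}
    assert set(candidate.keys()) == EXTERNAL_METADATA_VALUE_KEYS
    assert candidate["type"] in EXTERNAL_METADATA_TYPES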

python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/event_stream.py (+39 −4)

@@ -1,9 +1,11 @@
+import asyncio
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
 from itertools import chain
 from typing import Optional
 
 from dagster import AssetMaterialization
+from dagster._core.definitions.metadata.metadata_value import MetadataValue
 from dagster._core.events import (
     AssetMaterializationPlannedData,
     DagsterEvent,
@@ -21,6 +23,7 @@
 from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData
 from dagster_airlift.core.airflow_instance import AirflowInstance
 from dagster_airlift.core.monitoring_job.utils import (
+    extract_metadata_from_logs,
     get_dagster_run_for_airflow_repr,
     structured_log,
 )
@@ -122,6 +125,7 @@ def persist_state(
 @record
 class TaskInstanceCompleted(AirflowEvent):
     task_instance: TaskInstance
+    metadata: dict[str, MetadataValue]
 
     @property
     def timestamp(self) -> float:
@@ -137,7 +141,9 @@ def persist_state(
         for asset in airflow_data.mapped_asset_keys_by_task_handle[self.task_instance.task_handle]:
             # IMPROVEME: Add metadata to the materialization event.
             _report_materialization(
-                context, corresponding_run, AssetMaterialization(asset_key=asset)
+                context,
+                corresponding_run,
+                AssetMaterialization(asset_key=asset, metadata=self.metadata),
             )
 
 
@@ -254,6 +260,37 @@ def _process_completed_runs(
             break
 
 
+async def _retrieve_logs_for_task_instance(
+    context: OpExecutionContext,
+    airflow_instance: AirflowInstance,
+    task_instance: TaskInstance,
+) -> TaskInstanceCompleted:
+    logs = airflow_instance.get_task_instance_logs(
+        task_instance.dag_id,
+        task_instance.task_id,
+        task_instance.run_id,
+        task_instance.try_number,
+    )
+    metadata = extract_metadata_from_logs(context, logs)
+
+    return TaskInstanceCompleted(task_instance=task_instance, metadata=metadata)
+
+
+async def _async_process_task_instances(
+    context: OpExecutionContext,
+    airflow_instance: AirflowInstance,
+    task_instances: list[TaskInstance],
+) -> Iterator[TaskInstanceCompleted]:
+    results = await asyncio.gather(
+        *(
+            _retrieve_logs_for_task_instance(context, airflow_instance, task_instance)
+            for task_instance in task_instances
+        )
+    )
+
+    return results
+
+
 def _process_task_instances(
     context: OpExecutionContext,
     airflow_data: AirflowDefinitionsData,
@@ -275,9 +312,7 @@ def _process_task_instances(
         context,
         f"Found {len(task_instances)} completed task instances in the time range {datetime_from_timestamp(range_start)} to {datetime_from_timestamp(range_end)}",
     )
-    yield from (
-        TaskInstanceCompleted(task_instance=task_instance) for task_instance in task_instances
-    )
+    yield from asyncio.run(_async_process_task_instances(context, airflow_instance, task_instances))
 
 
 def persist_events(
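
Log retrieval is fanned out with one coroutine per completed task instance and awaited together via `asyncio.gather`. A standalone sketch of the same pattern (the names and the sleep stand-in are illustrative, not part of the commit):

    import asyncio

    async def fetch_logs(task_id: str) -> str:
        # Stand-in for a per-task-instance log fetch.
        await asyncio.sleep(0)
        return f"logs for {task_id}"

    async def fetch_all(task_ids: list[str]) -> list[str]:
        # gather schedules all coroutines together and returns their
        # results in the same order as the inputs.
        return await asyncio.gather(*(fetch_logs(t) for t in task_ids))

    print(asyncio.run(fetch_all(["task_a", "task_b"])))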

python_modules/libraries/dagster-airlift/dagster_airlift/core/monitoring_job/utils.py (+60 −1)

@@ -1,5 +1,16 @@
-from typing import Optional, Union
+import json
+from collections.abc import Iterable
+from typing import Optional, TypeVar, Union, cast
 
+import dagster._check as check
+from dagster._core.definitions.metadata.external_metadata import (
+    EXTERNAL_METADATA_TYPE_INFER,
+    EXTERNAL_METADATA_TYPES,
+    EXTERNAL_METADATA_VALUE_KEYS,
+    ExternalMetadataValue,
+    metadata_map_from_external,
+)
+from dagster._core.definitions.metadata.metadata_value import MetadataValue
 from dagster._core.execution.context.op_execution_context import OpExecutionContext
 from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus, RunsFilter
 from dagster_airlift.constants import DAG_ID_TAG_KEY, DAG_RUN_ID_TAG_KEY
@@ -61,3 +72,51 @@ def get_dagster_run_for_airflow_repr(
         ),
         None,
     )
+
+
+_T = TypeVar("_T")
+
+
+def _assert_param_value(value: _T, expected_values: Iterable[_T]) -> _T:
+    if value not in expected_values:
+        raise Exception(
+            f"Invalid value when translating metadata from logs. Expected one of"
+            f" `{expected_values}`, got `{value}`."
+        )
+    return value
+
+
+def extract_metadata_from_logs(context: OpExecutionContext, logs: str) -> dict[str, MetadataValue]:
+    metadata = {}
+    import re
+
+    matches = re.findall(r"DAGSTER_START(.*?)DAGSTER_END", logs, re.DOTALL)
+    for match in matches:
+        raw_external_metadata_map = json.loads(match)
+        check.mapping_param(raw_external_metadata_map, "raw_external_metadata_map")
+        new_external_metadata_map = {}
+        for key, value in raw_external_metadata_map.items():
+            if not isinstance(key, str):
+                raise Exception(
+                    f"Invalid type when translating metadata from logs. Expected a dict with string"
+                    f" keys, got a key `{key}` of type `{type(key)}`."
+                )
+            elif isinstance(value, dict):
+                if not {*value.keys()} == EXTERNAL_METADATA_VALUE_KEYS:
+                    raise Exception(
+                        f"Invalid type when translating metadata from logs. Expected a dict with"
                        " string keys and values that are either raw metadata values or dictionaries"
+                        f" with schema `{{raw_value: ..., type: ...}}`. Got a value `{value}`."
+                    )
+                _assert_param_value(value["type"], EXTERNAL_METADATA_TYPES)
+                new_external_metadata_map[key] = cast("ExternalMetadataValue", value)
+            else:
+                new_external_metadata_map[key] = {
+                    "raw_value": value,
+                    "type": EXTERNAL_METADATA_TYPE_INFER,
+                }
+
+        metadata_map = metadata_map_from_external(new_external_metadata_map)
+        metadata.update(metadata_map)
+
+    return metadata
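
The parser is driven entirely by the DAGSTER_START/DAGSTER_END sentinels. A quick inline sketch of the log format it expects and the raw payloads it recovers (values mirror the kitchen-sink DAG below; the Dagster context plumbing is omitted):

    import json
    import re

    logs = (
        "[2021-01-01] INFO - ordinary airflow log noise\n"
        'DAGSTER_START{"foo": "bar", '
        '"my_timestamp": {"raw_value": 111, "type": "timestamp"}}DAGSTER_END\n'
    )
    for payload in re.findall(r"DAGSTER_START(.*?)DAGSTER_END", logs, re.DOTALL):
        print(json.loads(payload))
    # {'foo': 'bar', 'my_timestamp': {'raw_value': 111, 'type': 'timestamp'}}

Bare values like "foo" are assigned EXTERNAL_METADATA_TYPE_INFER, while dict values must carry exactly the `{"raw_value": ..., "type": ...}` schema with a recognized type literal.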

python_modules/libraries/dagster-airlift/dagster_airlift/core/runtime_representations.py (+4)

@@ -131,3 +131,7 @@ def dag_handle(self) -> "DagHandle":
         from dagster_airlift.core.serialization.serialized_data import DagHandle
 
         return DagHandle(dag_id=self.dag_id)
+
+    @property
+    def try_number(self) -> int:
+        return self.metadata["try_number"]

python_modules/libraries/dagster-airlift/dagster_airlift/test/airflow_test_instance.py (+14)

@@ -41,6 +41,7 @@ def __init__(
         variables: list[dict[str, Any]] = [],
         instance_name: Optional[str] = None,
         max_runs_per_batch: Optional[int] = None,
+        logs: Optional[Mapping[str, Mapping[str, str]]] = None,
     ) -> None:
         self._dag_infos_by_dag_id = {dag_info.dag_id: dag_info for dag_info in dag_infos}
         self._task_infos_by_dag_and_task_id = {
@@ -49,6 +50,11 @@ def __init__(
         self._task_instances_by_dag_and_task_id: dict[tuple[str, str], list[TaskInstance]] = (
             defaultdict(list)
         )
+        self._logs_by_run_id_and_task_id: dict[tuple[str, str], str] = defaultdict(lambda: "")
+        for run_id, task_log_map in (logs or {}).items():
+            for task_id, log in task_log_map.items():
+                self._logs_by_run_id_and_task_id[(run_id, task_id)] = log
+
         for task_instance in task_instances:
             self._task_instances_by_dag_and_task_id[
                 (task_instance.dag_id, task_instance.task_id)
@@ -241,6 +247,11 @@ def get_all_datasets(
             return_datasets.append(dataset)
         return return_datasets
 
+    def get_task_instance_logs(
+        self, dag_id: str, task_id: str, run_id: str, try_number: int
+    ) -> str:
+        return self._logs_by_run_id_and_task_id[(run_id, task_id)]
+
 
 def make_dag_info(
     instance_name: str, dag_id: str, file_token: Optional[str], dag_props: Mapping[str, Any]
@@ -281,6 +292,7 @@ def make_task_instance(
             "start_date": start_date.isoformat(),
             "end_date": end_date.isoformat(),
             "logical_date": logical_date.isoformat() if logical_date else start_date.isoformat(),
+            "try_number": 1,
         },
     )
 
@@ -352,6 +364,7 @@ def make_instance(
     max_runs_per_batch: Optional[int] = None,
    dag_props: dict[str, Any] = {},
     task_instances: Optional[list[TaskInstance]] = None,
+    logs: Optional[Mapping[str, Mapping[str, str]]] = None,
 ) -> AirflowInstanceFake:
     """Constructs DagInfo, TaskInfo, and TaskInstance objects from provided data.
@@ -415,4 +428,5 @@ def make_instance(
         instance_name=instance_name,
         max_runs_per_batch=max_runs_per_batch,
         datasets=datasets,
+        logs=logs,
     )
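
The new `logs` parameter is keyed by run ID, then by task ID. A hypothetical seeding call (only the `logs` keyword is introduced by this commit; the other arguments follow the fake's existing signature):

    af_instance = make_instance(
        dag_and_task_structure={"dag": ["task"]},
        logs={
            # run_id -> task_id -> raw log text served by get_task_instance_logs
            "run-dag": {"task": 'DAGSTER_START{"foo": "bar"}DAGSTER_END'},
        },
    )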

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/conftest.py (+3 −1)

@@ -1,5 +1,5 @@
 from collections import defaultdict
-from collections.abc import Generator, Sequence
+from collections.abc import Generator, Mapping, Sequence
 from datetime import datetime, timedelta
 from typing import Any, Optional, Union
 
@@ -73,6 +73,7 @@ def create_defs_and_instance(
     dag_level_asset_overrides: Optional[dict[str, list[str]]] = None,
     seeded_runs: Optional[list[DagRun]] = None,
     seeded_task_instances: Optional[list[TaskInstance]] = None,
+    seeded_logs: Optional[Mapping[str, Mapping[str, str]]] = None,
 ) -> tuple[Definitions, AirflowInstance]:
     assets = []
     dag_and_task_structure = defaultdict(list)
@@ -132,6 +133,7 @@ def _asset():
         dag_runs=runs,
         dataset_construction_info=dataset_construction_info or [],
         task_instances=seeded_task_instances,
+        logs=seeded_logs,
     )
     defs = Definitions.merge(
         additional_defs,

python_modules/libraries/dagster-airlift/dagster_airlift_tests/unit_tests/core_tests/test_monitoring_job.py (+8)

@@ -1,3 +1,4 @@
+import json
 from datetime import datetime, timedelta, timezone
 
 from dagster import AssetKey, DagsterInstance
@@ -32,6 +33,10 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
     """Test that monitoring job correctly represents state in Dagster."""
     freeze_datetime = datetime(2021, 1, 1, tzinfo=timezone.utc)
 
+    raw_metadata = {
+        "foo": "bar",
+        "my_timestamp": {"raw_value": 111, "type": "timestamp"},
+    }
     with freeze_time(freeze_datetime):
         defs, af_instance = create_defs_and_instance(
             assets_per_task={
@@ -77,6 +82,9 @@ def test_monitoring_job_execution(init_load_context: None, instance: DagsterInst
                 end_date=freeze_datetime,
             ),
         ],
+        seeded_logs={
+            "run-dag": {"task": f"DAGSTER_START{json.dumps(raw_metadata)}DAGSTER_END"}
+        },
     )
     defs = build_job_based_airflow_defs(
         airflow_instance=af_instance,

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/dataset_dags.py (+13 −1)

@@ -19,8 +19,20 @@ def print_fn() -> None:
     import json
 
     print("Hello")  # noqa: T201
-    data = json.dumps({"foo": "bar"})
+    data = json.dumps(
+        {
+            "foo": "bar",
+            "my_timestamp": {"raw_value": 111, "type": "timestamp"},
+        }
+    )
     print(f"DAGSTER_START{data}DAGSTER_END")  # noqa: T201
+    another_data = json.dumps(
+        {
+            "foo": "baz",
+            "my_other_timestamp": {"raw_value": 113, "type": "timestamp"},
+        }
+    )
+    print(f"DAGSTER_START{another_data}DAGSTER_END")  # noqa: T201
 
 
 # Inter-dag structure as follows:
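
For reference, this task's stdout (which lands in the Airflow task log) will contain two sentinel-wrapped, single-line JSON payloads:

    DAGSTER_START{"foo": "bar", "my_timestamp": {"raw_value": 111, "type": "timestamp"}}DAGSTER_END
    DAGSTER_START{"foo": "baz", "my_other_timestamp": {"raw_value": 113, "type": "timestamp"}}DAGSTER_END

Because `extract_metadata_from_logs` applies payloads in order via `metadata.update`, the second `"foo"` wins, which the e2e test below asserts.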

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_e2e_jobs.py (+16)

@@ -1,4 +1,5 @@
 from dagster._core.definitions.asset_key import AssetKey
+from dagster._core.definitions.metadata.metadata_value import TimestampMetadataValue
 from dagster._core.event_api import EventRecordsFilter
 from dagster._core.events import DagsterEventType
 from dagster._core.instance_for_test import instance_for_test
@@ -79,3 +80,18 @@ def test_job_based_defs(
     assert {materialized_records.run_id for materialized_records in materialized_records} == {
         producer_run.run_id
     }
+    for key in ["example1", "example2"]:
+        key_record = next(
+            materialized_records
+            for materialized_records in materialized_records
+            if materialized_records.asset_key == AssetKey(key)
+        )
+        assert key_record.asset_materialization.metadata
+        assert key_record.asset_materialization.metadata[
+            "my_timestamp"
+        ] == TimestampMetadataValue(value=111.0)
+        assert key_record.asset_materialization.metadata[
+            "my_other_timestamp"
+        ] == TimestampMetadataValue(value=113.0)
+        # It gets overridden by the second print
+        assert key_record.asset_materialization.metadata["foo"].value == "baz"
