@@ -1,8 +1,10 @@
+import asyncio
 from collections.abc import Iterator
 from itertools import chain
 from typing import Optional

 from dagster import AssetMaterialization
+from dagster._core.definitions.metadata.metadata_value import MetadataValue
 from dagster._core.events import (
     AssetMaterializationPlannedData,
     DagsterEvent,
@@ -20,6 +22,7 @@
 from dagster_airlift.core.airflow_defs_data import AirflowDefinitionsData
 from dagster_airlift.core.airflow_instance import AirflowInstance
 from dagster_airlift.core.monitoring_job.utils import (
+    extract_metadata_from_logs,
     get_dagster_run_for_airflow_repr,
     structured_log,
 )
@@ -93,6 +96,7 @@ def persist_state(
 @record
 class TaskInstanceCompleted:
     task_instance: TaskInstance
+    metadata: dict[str, MetadataValue]

     @property
     def timestamp(self) -> float:
@@ -108,7 +112,9 @@ def persist_state(
         for asset in airflow_data.mapped_asset_keys_by_task_handle[self.task_instance.task_handle]:
             # IMPROVEME: Add metadata to the materialization event.
             _report_materialization(
-                context, corresponding_run, AssetMaterialization(asset_key=asset)
+                context,
+                corresponding_run,
+                AssetMaterialization(asset_key=asset, metadata=self.metadata),
             )

@@ -225,6 +231,37 @@ def _process_completed_runs(
             break


+async def _retrieve_logs_for_task_instance(
+    context: OpExecutionContext,
+    airflow_instance: AirflowInstance,
+    task_instance: TaskInstance,
+) -> TaskInstanceCompleted:
+    logs = airflow_instance.get_task_instance_logs(
+        task_instance.dag_id,
+        task_instance.task_id,
+        task_instance.run_id,
+        task_instance.try_number,
+    )
+    metadata = extract_metadata_from_logs(context, logs)
+
+    return TaskInstanceCompleted(task_instance=task_instance, metadata=metadata)
+
+
+async def _async_process_task_instances(
+    context: OpExecutionContext,
+    airflow_instance: AirflowInstance,
+    task_instances: list[TaskInstance],
+) -> list[TaskInstanceCompleted]:
+    results = await asyncio.gather(
+        *(
+            _retrieve_logs_for_task_instance(context, airflow_instance, task_instance)
+            for task_instance in task_instances
+        )
+    )
+
+    return results
+
+
 def _process_task_instances(
     context: OpExecutionContext,
     airflow_data: AirflowDefinitionsData,
@@ -246,9 +283,7 @@ def _process_task_instances(
         context,
         f"Found {len(task_instances)} completed task instances in the time range {datetime_from_timestamp(range_start)} to {datetime_from_timestamp(range_end)}",
     )
-    yield from (
-        TaskInstanceCompleted(task_instance=task_instance) for task_instance in task_instances
-    )
+    yield from asyncio.run(_async_process_task_instances(context, airflow_instance, task_instances))


 def persist_events(
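
The hunks import extract_metadata_from_logs from dagster_airlift.core.monitoring_job.utils, but its implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming metadata is serialized into the task logs as a JSON payload behind a marker string; the marker name and payload shape are illustrative assumptions, not the actual utils implementation:

import json

from dagster import MetadataValue, OpExecutionContext

# Hypothetical marker; the real helper may use a different convention.
METADATA_MARKER = "DAGSTER_METADATA:"


def extract_metadata_from_logs(
    context: OpExecutionContext, logs: str
) -> dict[str, MetadataValue]:
    # Scan raw Airflow task logs for JSON payloads on marker lines.
    metadata: dict[str, MetadataValue] = {}
    for line in logs.splitlines():
        _, sep, payload = line.partition(METADATA_MARKER)
        if not sep:
            continue
        try:
            for key, value in json.loads(payload).items():
                metadata[key] = MetadataValue.json(value)
        except json.JSONDecodeError:
            context.log.warning(f"Skipping malformed metadata line: {line!r}")
    return metadata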
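
One caveat on the asyncio.gather fan-out: it only overlaps the log fetches if get_task_instance_logs is itself non-blocking. If it issues a synchronous REST call, the gathered coroutines run one after another on the event loop. A hedged variant, assuming the call is synchronous (these hunks do not show it), that offloads each fetch to a worker thread via asyncio.to_thread:

async def _retrieve_logs_for_task_instance(
    context: OpExecutionContext,
    airflow_instance: AirflowInstance,
    task_instance: TaskInstance,
) -> TaskInstanceCompleted:
    # asyncio.to_thread runs the (assumed) blocking call in the default
    # thread pool, so gathered fetches actually execute concurrently.
    logs = await asyncio.to_thread(
        airflow_instance.get_task_instance_logs,
        task_instance.dag_id,
        task_instance.task_id,
        task_instance.run_id,
        task_instance.try_number,
    )
    metadata = extract_metadata_from_logs(context, logs)
    return TaskInstanceCompleted(task_instance=task_instance, metadata=metadata)

Relatedly, asyncio.run in _process_task_instances raises RuntimeError if an event loop is already running in the calling thread; that is fine for a synchronous op body, which is how it is used here.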