Skip to content

Commit df832a0

Browse files
committed
[dagster-airlift] job monitoring
1 parent 4534d04 commit df832a0

File tree

4 files changed

+546
-13
lines changed

4 files changed

+546
-13
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_defs_data.py

+1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def assets_produced_by_dags(self) -> Mapping[str, AbstractSet[AssetKey]]:
100100
result[dag_id].add(spec.key)
101101
return result
102102

103+
103104
@public
104105
@property
105106
def instance_name(self) -> str:

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py

+57-11
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,44 @@ def list_variables(self) -> list[dict[str, Any]]:
128128
"Failed to fetch variables. Status code: {response.status_code}, Message: {response.text}"
129129
)
130130

131+
def get_task_instance_batch_time_range(
    self,
    dag_ids: Sequence[str],
    states: Sequence[str],
    end_date_gte: datetime.datetime,
    end_date_lte: datetime.datetime,
) -> list["TaskInstance"]:
    """Get all task instances across all dag_ids for a given time range.

    Args:
        dag_ids: Dag ids to restrict the query to.
        states: Task instance states to include.
        end_date_gte: Lower bound, sent as the ``end_date_gte`` filter.
        end_date_lte: Upper bound, sent as the ``end_date_lte`` filter.

    Returns:
        One TaskInstance per entry in the response's ``task_instances`` list.

    Raises:
        DagsterError: If the request does not return a 200 status code.
    """
    # NOTE(review): Airflow's batch endpoint is paginated (page_offset / page_limit);
    # only the first page is read here -- confirm that is sufficient for callers.
    response = self.auth_backend.get_session().post(
        f"{self.get_api_url()}/dags/~/dagRuns/~/taskInstances/list",
        json={
            "dag_ids": dag_ids,
            "end_date_gte": end_date_gte.isoformat(),
            "end_date_lte": end_date_lte.isoformat(),
            # Airflow names this field in the singular even though it takes a list.
            "state": states,
        },
    )

    if response.status_code != 200:
        raise DagsterError(
            f"Failed to fetch task instances for {dag_ids}. Status code: {response.status_code}, Message: {response.text}"
        )

    # "task_instances" is a JSON array, not a mapping keyed by dag id, so the dag id
    # has to come from each entry rather than from unpacking `.items()`.
    webserver_url = self.auth_backend.get_webserver_url()
    return [
        TaskInstance(
            webserver_url=webserver_url,
            dag_id=task_instance_json["dag_id"],
            task_id=task_instance_json["task_id"],
            run_id=task_instance_json["dag_run_id"],
            metadata=task_instance_json,
        )
        for task_instance_json in response.json()["task_instances"]
    ]
159+
160+
161+
162+
131163
def get_task_instance_batch(
132164
self, dag_id: str, task_ids: Sequence[str], run_id: str, states: Sequence[str]
133165
) -> list["TaskInstance"]:
134166
"""Get all task instances for a given dag_id, task_ids, and run_id."""
167+
# It's not possible to offset the task instance API on versions of Airflow < 2.7.0, so we need to
168+
# chunk the task ids directly.
135169
task_instances = []
136170
task_id_chunks = [
137171
task_ids[i : i + self.batch_task_instance_limit]
@@ -259,25 +293,37 @@ def get_dag_runs(
259293
def get_dag_runs_batch(
260294
self,
261295
dag_ids: Sequence[str],
262-
end_date_gte: datetime.datetime,
263-
end_date_lte: datetime.datetime,
296+
end_date_gte: Optional[datetime.datetime]=None,
297+
end_date_lte: Optional[datetime.datetime]=None,
298+
start_date_gte: Optional[datetime.datetime]=None,
299+
start_date_lte: Optional[datetime.datetime]=None,
264300
offset: int = 0,
301+
states: Optional[Sequence[str]] = None,
265302
) -> tuple[list["DagRun"], int]:
266303
"""For the given list of dag_ids, return a tuple containing:
267304
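- A list of dag runs in the given states (defaulting to "success") that match whichever of the end_date_gte/end_date_lte/start_date_gte/start_date_lte bounds are provided. Returns a maximum of batch_dag_runs_limit (which is configurable on the instance).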
268305
- The number of total rows returned.
269306
"""
307+
states = states or ["success"]
308+
params = {
309+
"dag_ids": dag_ids,
310+
"order_by": "end_date",
311+
"states": states,
312+
"page_offset": offset,
313+
"page_limit": self.batch_dag_runs_limit,
314+
}
315+
if end_date_gte:
316+
params["end_date_gte"] = end_date_gte.isoformat()
317+
if end_date_lte:
318+
params["end_date_lte"] = end_date_lte.isoformat()
319+
if start_date_gte:
320+
params["start_date_gte"] = start_date_gte.isoformat()
321+
if start_date_lte:
322+
params["start_date_lte"] = start_date_lte.isoformat()
323+
270324
response = self.auth_backend.get_session().post(
271325
f"{self.get_api_url()}/dags/~/dagRuns/list",
272-
json={
273-
"dag_ids": dag_ids,
274-
"end_date_gte": end_date_gte.isoformat(),
275-
"end_date_lte": end_date_lte.isoformat(),
276-
"order_by": "end_date",
277-
"states": ["success"],
278-
"page_offset": offset,
279-
"page_limit": self.batch_dag_runs_limit,
280-
},
326+
json=params,
281327
)
282328
if response.status_code == 200:
283329
webserver_url = self.auth_backend.get_webserver_url()

0 commit comments

Comments
 (0)