Skip to content

Commit 30757ef

Browse files
committed
[dagster-airlift] log retrieval API
1 parent 99a0cfe commit 30757ef

File tree

3 files changed

+51
-0
lines changed

3 files changed

+51
-0
lines changed

python_modules/libraries/dagster-airlift/dagster_airlift/core/airflow_instance.py

+35
Original file line numberDiff line numberDiff line change
@@ -542,3 +542,38 @@ def get_all_datasets(
542542
offset += batch_size
543543

544544
return datasets
545+
546+
def get_task_instance_logs(
    self, dag_id: str, task_id: str, run_id: str, try_number: int
) -> str:
    """Fetch the full log text for one try of a task instance.

    Requests the task-instance logs endpoint repeatedly, passing along the
    continuation token from the previous response, and concatenates the
    parsed text of every page.

    Args:
        dag_id: ID of the DAG that owns the task.
        task_id: ID of the task within the DAG.
        run_id: ID of the DAG run the task instance belongs to.
        try_number: Which attempt of the task instance to read logs for.

    Returns:
        The parsed log text of all fetched pages joined together.

    Raises:
        DagsterError: If any page request returns a non-200 status code.
    """
    pages = []
    next_token = None
    while True:
        # Only send the query parameter once a token has been received.
        query = {"token": next_token} if next_token else None
        session = self.auth_backend.get_session()
        response = session.get(
            f"{self.get_api_url()}/dags/{dag_id}/dagRuns/{run_id}/taskInstances/{task_id}/logs/{try_number}",
            headers={"Accept": "application/json"},
            params=query,
            timeout=5,
        )
        if response.status_code != 200:
            raise DagsterError(
                f"Failed to fetch task instance logs for {dag_id}/{task_id}/{run_id}/{try_number}. Status code: {response.status_code}, Message: {response.text}"
            )
        payload = response.json()
        # The token is sent as the `token` query parameter but comes back
        # under the `continuation_token` key of the response body.
        next_token = payload.get("continuation_token")
        page_text = parse_af_log_response(payload["content"])
        pages.append(page_text)
        # Stop when no token is returned, or when a page parses to no text.
        finished = not next_token or page_text == ""
        if finished:
            break
    return "".join(pages)
570+
571+
572+
def parse_af_log_response(logs: str) -> str:
    """Extract and concatenate the log text from a logs ``content`` payload.

    The payload is parsed as the text of a Python literal: a sequence whose
    items each carry their log text at index 1 (presumably
    ``(host, message)`` pairs as emitted by Airflow -- confirm against the
    Airflow REST API). Only the element at index 1 of each item is kept.

    Args:
        logs: The raw ``content`` string from the logs response.

    Returns:
        The index-1 elements of all items joined together, or ``""`` when the
        payload is empty or contains only whitespace.

    Raises:
        ValueError, SyntaxError: If a non-empty payload is not a valid Python
            literal.
    """
    import ast

    # An empty payload is not a valid literal; treat it as "no log text" so
    # callers see an empty result instead of a SyntaxError.
    if not logs.strip():
        return ""

    # literal_eval only evaluates literals, so it does not execute code
    # embedded in the payload.
    parsed_items: list = ast.literal_eval(logs)
    return "".join(item[1] for item in parsed_items)

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink/airflow_dags/dataset_dags.py

+4
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,11 @@ def print_fn() -> None:
1616
import os
1717

1818
os.environ["NO_PROXY"] = "*"
19+
import json
20+
1921
print("Hello") # noqa: T201
22+
data = json.dumps({"foo": "bar"})
23+
print(f"DAGSTER_START{data}DAGSTER_END") # noqa: T201
2024

2125

2226
# Inter-dag structure as follows:

python_modules/libraries/dagster-airlift/kitchen-sink/kitchen_sink_tests/integration_tests/test_api.py

+12
Original file line numberDiff line numberDiff line change
@@ -205,3 +205,15 @@ def test_datasets(airflow_instance: None) -> None:
205205
)
206206
assert not asset_spec("example1", defs)
207207
assert not asset_spec("example2", defs)
208+
209+
210+
def test_log_retrieval(airflow_instance: None) -> None:
    """Run `dataset_producer` and check its task logs can be retrieved.

    Triggers the DAG, waits for the run to complete, fetches the first-try
    logs of `print_task`, and checks that both marker strings are present.
    """
    dag_id = "dataset_producer"
    instance = local_airflow_instance()
    run_id = instance.trigger_dag(dag_id)
    instance.wait_for_run_completion(dag_id=dag_id, run_id=run_id)

    logs = instance.get_task_instance_logs(
        dag_id=dag_id,
        task_id="print_task",
        run_id=run_id,
        try_number=1,
    )

    assert logs
    # Both markers must appear in the retrieved log text.
    for marker in ("DAGSTER_START", "DAGSTER_END"):
        assert marker in logs

0 commit comments

Comments
 (0)