
Commit 87ae0d1

Handling missing node_info in dagster-dbt stream_raw_events (#29011)
## Summary & Motivation

We have occasionally encountered errors due to unparseable logs that cause dbt jobs to be killed mid-stream. For example, we will see an info log emitted like:

`[LogTestResult]: Unable to parse logging event dictionary. Failed to parse num_failures field: Value out of range: 8146686661.. Dictionary: ...`

and then immediately afterward see an error due to:

```
The above exception was caused by the following exception:
KeyError: 'node_info'

File "/app/.venv/lib/python3.11/site-packages/dagster/_core/execution/plan/utils.py", line 56, in op_execution_error_boundary
    yield
File "/app/.venv/lib/python3.11/site-packages/dagster/_utils/__init__.py", line 401, in iterate_with_context
    next_output = next(iterator)
                  ^^^^^^^^^^^^^^
File "/app/services/dagster/domain/ops.py", line 76, in custom_dbt_build_op
    for dbt_event in dbt_task.stream_raw_events():
File "/app/.venv/lib/python3.11/site-packages/dagster_dbt/core/dbt_cli_invocation.py", line 336, in stream_raw_events
    is_result_event = DbtCliEventMessage.is_result_event(raw_event)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.11/site-packages/dagster_dbt/core/dbt_cli_event.py", line 243, in is_result_event
    ) and not raw_event["data"]["node_info"]["unique_id"].startswith("unit_test")
              ~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^
```

Within `DbtCliInvocation.stream_raw_events()`, the code anticipates that `raw_event["data"]` may be missing the key `node_info` ([here](https://github.com/dagster-io/dagster/blob/4e5506696e3c1584f7ea204746a5503f25d27b30/python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_cli_invocation.py#L336)), but within `DbtCliEventMessage.is_result_event()` (called immediately afterward within `stream_raw_events` [here](https://github.com/dagster-io/dagster/blob/4e5506696e3c1584f7ea204746a5503f25d27b30/python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_cli_invocation.py#L337)), `node_info` is expected, so a `KeyError` is raised when it is missing.

`make ruff` also added a couple of unrelated formatting changes to the file; [these](https://github.com/dagster-io/dagster/pull/29011/files#diff-6670355d6443aed5576ec4c378936e6be622b18544a024ed045069ba88646ac3R243-R245) are the only intentional changes.

## How I Tested These Changes

I don't see any existing tests directly around this that I could easily hook into; I would appreciate any thoughts on where and how best to test this. Given that I'm just mirroring the logic used in the line above where this is called, the change is likely not _terribly_ risky, but I'm happy to do more here.
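The defensive-access pattern the fix relies on can be sketched in isolation. This is a minimal stand-in, not the library code: the sample event dicts below are hypothetical, and only their key layout mirrors dbt's structured JSON log format.

```python
# Result-event names checked by the real is_result_event (from the diff below).
RESULT_EVENT_NAMES = {
    "LogSeedResult",
    "LogModelResult",
    "LogSnapshotResult",
    "LogTestResult",
}


def is_result_event(raw_event: dict) -> bool:
    # Chained .get() calls fall back to empty defaults, so an event whose
    # "data" dict lacks "node_info" no longer raises KeyError mid-stream;
    # "".startswith("unit_test") is simply False.
    return raw_event["info"]["name"] in RESULT_EVENT_NAMES and not raw_event[
        "data"
    ].get("node_info", {}).get("unique_id", "").startswith("unit_test")


# Well-formed event: classified normally (hypothetical unique_id).
ok_event = {
    "info": {"name": "LogTestResult"},
    "data": {"node_info": {"unique_id": "test.my_project.not_null_orders_id"}},
}

# Unparseable event where dbt omitted node_info: previously a KeyError.
broken_event = {"info": {"name": "LogTestResult"}, "data": {}}

print(is_result_event(ok_event))      # True
print(is_result_event(broken_event))  # True (missing node_info treated as a non-unit-test result)
```

With direct indexing (`raw_event["data"]["node_info"]["unique_id"]`), the second call would raise `KeyError: 'node_info'` and kill the event stream; with the `.get()` chain it degrades gracefully.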
1 parent ff836db commit 87ae0d1

File tree

1 file changed: +13 −5 lines

`python_modules/libraries/dagster-dbt/dagster_dbt/core/dbt_cli_event.py` (+13 −5)
```diff
@@ -240,7 +240,9 @@ def has_column_lineage_metadata(self) -> bool:
     def is_result_event(raw_event: dict[str, Any]) -> bool:
         return raw_event["info"]["name"] in set(
             ["LogSeedResult", "LogModelResult", "LogSnapshotResult", "LogTestResult"]
-        ) and not raw_event["data"]["node_info"]["unique_id"].startswith("unit_test")
+        ) and not raw_event["data"].get("node_info", {}).get("unique_id", "").startswith(
+            "unit_test"
+        )

     def _yield_observation_events_for_test(
         self,
@@ -271,7 +273,11 @@ def to_default_asset_events(
         target_path: Optional[Path] = None,
     ) -> Iterator[
         Union[
-            Output, AssetMaterialization, AssetObservation, AssetCheckResult, AssetCheckEvaluation
+            Output,
+            AssetMaterialization,
+            AssetObservation,
+            AssetCheckResult,
+            AssetCheckEvaluation,
         ]
     ]:
         """Convert a dbt CLI event to a set of corresponding Dagster events.
@@ -338,9 +344,11 @@ def to_default_asset_events(
                 "invocation_id": invocation_id,
             }

-            if event_node_info.get("node_started_at") in ["", "None", None] and event_node_info.get(
-                "node_finished_at"
-            ) in ["", "None", None]:
+            if event_node_info.get("node_started_at") in [
+                "",
+                "None",
+                None,
+            ] and event_node_info.get("node_finished_at") in ["", "None", None]:
                 # if model materialization is incremental microbatch, node_started_at and node_finished_at are empty strings
                 # and require fallback to data.execution_time
                 default_metadata["Execution Duration"] = self.raw_event["data"]["execution_time"]
```
