Skip to content

Commit 83e7b0a

Browse files
authored
Support parsing of fusion logs for warnings in get_job_run_error (#737)
## Summary <!-- Provide a brief description of the changes in this PR --> With fusion sf GA right around the corner, Benoit said tis time to add support for fusion log parsing of warnings. This should be stable now! This is specific to the Admin API tool, `get_job_run_error` which optionally returns warnings as well. ## What Changed <!-- Describe the changes made in this PR --> - Add `WarningFetcher._extract_fusion_log_warnings` and `WarningFetcher.is_fusion_logs` check - Parses fusion-specific warn format in logs ## Why <!-- Explain the motivation for these changes --> ## Related Issues <!-- Link any related issues using #issue_number --> Closes #447 ## Checklist - [x] I have performed a self-review of my code - [x] I have made corresponding changes to the documentation (in https://github.com/dbt-labs/docs.getdbt.com) if required -- Mention it here - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes ## Validation 1. Tested in MCP inspector (trust me bro) 2. Tested with Claude Cpde <img width="863" height="478" alt="Screenshot 2026-04-21 at 10 39 37 PM" src="https://github.com/user-attachments/assets/aef6c6cf-3309-46dd-8a3b-57619e657c17" /> <img width="862" height="451" alt="Screenshot 2026-04-21 at 10 40 04 PM" src="https://github.com/user-attachments/assets/b7f795ba-fc6b-46e7-a938-df98480ba190" />
1 parent 1d8f75f commit 83e7b0a

3 files changed

Lines changed: 138 additions & 8 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
kind: Enhancement or New Feature
2+
body: Support parsing of fusion logs for warnings
3+
time: 2026-04-21T22:10:40.238804-07:00

src/dbt_mcp/dbt_admin/run_artifacts/parser.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,16 @@
2626

2727
logger = logging.getLogger(__name__)
2828

29+
# Matches ANSI color/style escape sequences (ie \x1b[33;1m) so they can be
30+
# stripped from log output before pattern matching on plain text
31+
_ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")
32+
# dbt<####> is the Fusion event code (ie dbt1088) to catch warnings. Warnings without this code
33+
# will not be matched (can upate if Fusion changes its log format)
34+
_FUSION_WARN = re.compile(
35+
r"^\d{2}:\d{2}:\d{2}\s+WARN\s+Warn\s+dbt\d+:\s+.+$",
36+
re.MULTILINE,
37+
)
38+
2939

3040
class JobRunFetcher:
3141
"""Base class for fetching and parsing dbt Cloud job run artifacts.
@@ -537,20 +547,42 @@ async def _check_source_freshness_warnings(
537547
result_status="warn",
538548
)
539549

550+
def _is_fusion_logs(self, clean_logs: str) -> bool:
551+
"""Detect whether logs are from dbt Fusion (vs dbt Core/Platform)."""
552+
return "Fusion version:" in clean_logs
553+
554+
def _extract_fusion_log_warnings(self, clean_logs: str) -> list[OutputResultSchema]:
555+
"""Extract warnings from dbt Fusion logs.
556+
557+
Fusion warnings are self-contained single lines:
558+
HH:MM:SS WARN Warn dbtXXXX: <message>
559+
"""
560+
return [
561+
OutputResultSchema(
562+
unique_id=None,
563+
relation_name=None,
564+
message=match.group(0).strip(),
565+
status="warn",
566+
)
567+
for match in _FUSION_WARN.finditer(clean_logs)
568+
]
569+
540570
def _extract_log_warnings(self, step: RunStepSchema) -> list[OutputResultSchema]:
541-
"""Extract warnings from step logs, matching on [WARNING] log entries."""
571+
"""Extract warnings from step logs (supports dbt Core and dbt Fusion formats)."""
542572
if not step.logs:
543573
return []
544574

545-
if "[WARNING]" not in step.logs:
546-
return []
575+
# Remove ANSI color codes to focus on text
576+
clean_logs = _ANSI_ESCAPE.sub("", step.logs)
547577

548-
# TODO: Fusion logs differ from core (currently core only)
549-
# Need to explore and implement a solution for this
578+
# dbt Fusion: WARN with single line
579+
if self._is_fusion_logs(clean_logs):
580+
return self._extract_fusion_log_warnings(clean_logs)
581+
582+
# dbt Core: [WARNING] inline marker with possible continuation lines
583+
if "[WARNING]" not in clean_logs:
584+
return []
550585

551-
# Remove ANSI color codes to focus on text
552-
ansi_escape = re.compile(r"\x1b\[[0-9;]*m")
553-
clean_logs = ansi_escape.sub("", step.logs)
554586
lines = clean_logs.split("\n")
555587
warnings = []
556588
warning_pattern = r"\[WARNING\]" # Standard warning log pattern in dbt Platform

tests/unit/dbt_admin/run_artifacts/test_warning_fetcher.py

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,37 @@
129129
"log_warnings": 1,
130130
},
131131
),
132+
# Fusion log warnings extracted from WARN-prefixed log entries
133+
(
134+
{
135+
"id": 500,
136+
"status": 10,
137+
"is_cancelled": False,
138+
"finished_at": "2024-01-01T12:00:00Z",
139+
"run_steps": [
140+
{
141+
"index": 1,
142+
"name": "Invoke dbt with `dbt run`",
143+
"status": 10,
144+
"finished_at": "2024-01-01T12:00:00Z",
145+
"logs": (
146+
"04:00:14 INFO \x1b[1m Running\x1b[0m Fusion version: 2.0.0-preview.173\n"
147+
"04:00:14 WARN \x1b[33;1m Warn\x1b[0m \x1b[1mdbt1088\x1b[0m: Updated version available for zendesk@1.4.1: 1.5.1\n"
148+
"04:00:15 WARN \x1b[33;1m Warn\x1b[0m \x1b[1mdbt1088\x1b[0m: Updated version available for tiktok_ads@1.1.0: 1.2.0\n"
149+
"04:00:16 INFO \x1b[1;32m Installed\x1b[0m 2 packages"
150+
),
151+
}
152+
],
153+
},
154+
[None], # No run_results.json available
155+
True,
156+
{
157+
"total_warnings": 2,
158+
"test_warnings": 0,
159+
"freshness_warnings": 0,
160+
"log_warnings": 2,
161+
},
162+
),
132163
],
133164
)
134165
async def test_warning_scenarios(
@@ -184,3 +215,67 @@ async def mock_get_artifact(account_id, run_id, artifact_path, step=None): # no
184215

185216
assert result["has_warnings"] == expected_has_warnings
186217
assert result["summary"] == expected_counts
218+
219+
220+
@pytest.mark.parametrize(
221+
"clean_logs,expected",
222+
[
223+
# Fusion banner present → detected as Fusion
224+
("04:00:14 INFO Running Fusion version: 2.0.0-preview.173\n", True),
225+
# Core logs — no Fusion banner
226+
("10:00:00 [WARNING] Deprecated function usage detected\n", False),
227+
# Contains "Fusion" in a model name but not the banner — must not false-positive
228+
("10:00:00 INFO model fusion_events completed successfully\n", False),
229+
],
230+
)
231+
def test_is_fusion_logs(mock_client, admin_config, clean_logs, expected):
232+
fetcher = WarningFetcher(
233+
run_id=1,
234+
run_details={
235+
"id": 1,
236+
"status": 10,
237+
"is_cancelled": False,
238+
"finished_at": "2024-01-01T00:00:00Z",
239+
"run_steps": [],
240+
},
241+
client=mock_client,
242+
admin_api_config=admin_config,
243+
)
244+
assert fetcher._is_fusion_logs(clean_logs) == expected
245+
246+
247+
@pytest.mark.parametrize(
248+
"clean_logs,expected_messages",
249+
[
250+
# No WARN lines → empty result
251+
(
252+
"04:00:14 INFO Running Fusion version: 2.0.0-preview.173\n"
253+
"04:00:16 INFO Installed 2 packages\n",
254+
[],
255+
),
256+
# Single WARN line → one result with the full line as the message
257+
(
258+
"04:00:14 WARN Warn dbt1088: Updated version available for zendesk@1.4.1: 1.5.1\n",
259+
[
260+
"04:00:14 WARN Warn dbt1088: Updated version available for zendesk@1.4.1: 1.5.1"
261+
],
262+
),
263+
],
264+
)
265+
def test_extract_fusion_log_warnings(
266+
mock_client, admin_config, clean_logs, expected_messages
267+
):
268+
fetcher = WarningFetcher(
269+
run_id=1,
270+
run_details={
271+
"id": 1,
272+
"status": 10,
273+
"is_cancelled": False,
274+
"finished_at": "2024-01-01T00:00:00Z",
275+
"run_steps": [],
276+
},
277+
client=mock_client,
278+
admin_api_config=admin_config,
279+
)
280+
results = fetcher._extract_fusion_log_warnings(clean_logs)
281+
assert [r.message for r in results] == expected_messages

0 commit comments

Comments
 (0)