Skip to content

Commit 893ffb7

Browse files
vdk-dag: improve error handling and error messages (#3152)
To make it easier to debug VDK DAG jobs, some improvements in error handling are made. - Stop using the deprecated UserCodeError in favour of domain-oriented exception classes. - Improve the formatting of the error message in case of failure. - Rename DAG_LOCAL_RUN_JOB_PATH to DAGS_LOCAL_RUN_JOB_PATH to bring it in line with other configuration options [like here](https://github.com/vmware/versatile-data-kit/blob/main/projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_plugin_configuration.py#L6). Eventually we should expose it the same way, but not in this PR. Testing Done: existing tests for regression. Here is a comparison of the formatting before and after: https://gist.githubusercontent.com/antoniivanov/8f87cdef1c610fcd24361197bcba45dc/raw/c31b990de2c0b1f49ac5989419cfc6ab12aac934/gistfile1.txt --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent d90c97c commit 893ffb7

File tree

6 files changed

+132
-55
lines changed

6 files changed

+132
-55
lines changed

Diff for: projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/cached_data_job_executor.py

+5-14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from vdk.internal.core.errors import UserCodeError
1313
from vdk.plugin.dag.dags import IDataJobExecutor
1414
from vdk.plugin.dag.dags import TrackableJob
15+
from vdk.plugin.dag.exception import DagJobExecutionException
1516
from vdk.plugin.dag.remote_data_job import JobStatus
1617

1718
log = logging.getLogger(__name__)
@@ -91,22 +92,12 @@ def finalize_job(self, job_name):
9192
job.details = details
9293
log.info(
9394
f"Finished data job {job_name}:\n"
94-
f"start_time: {details['start_time']}\n"
95-
f"end_time: {details.get('end_time')}\n"
96-
f"status: {details['status']}\n"
97-
f"message: {details['message']}"
95+
f" start_time: {details['start_time']}\n"
96+
f" end_time: {details.get('end_time')}\n"
97+
f" status: {details['status']}\n"
9898
)
9999
if job.status != JobStatus.SUCCEEDED.value and job.fail_dag_on_error:
100-
raise UserCodeError(
101-
ErrorMessage(
102-
"",
103-
"DAG failed due to a Data Job failure.",
104-
f"Data Job {job_name} failed. See details: {details}",
105-
"The rest of the jobs in the DAG will not be started "
106-
"and the DAG will fail.",
107-
"Investigate the error in the job or re-try again.",
108-
)
109-
)
100+
raise DagJobExecutionException(job_name, details)
110101

111102
@staticmethod
112103
def __get_printable_details(details):

Diff for: projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/dag_validator.py

+35-37
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,7 @@
77
from typing import Dict
88
from typing import List
99

10-
from vdk.internal.core.errors import ErrorMessage
11-
from vdk.internal.core.errors import UserCodeError
10+
from vdk.plugin.dag.exception import DagValidationException
1211

1312
log = logging.getLogger(__name__)
1413
Error = namedtuple("Error", ["TYPE", "PERMISSION", "REQUIREMENT", "CONFLICT"])
@@ -45,23 +44,10 @@ def validate(self, jobs: List[Dict]):
4544
self._check_dag_cycles(jobs)
4645
log.debug("Successfully validated the DAG!")
4746

48-
def _raise_error(
49-
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = ""
50-
):
51-
raise UserCodeError(
52-
ErrorMessage(
53-
"",
54-
"DAG failed due to a Data Job validation failure.",
55-
f"There is a {error_type} error with job(s) {jobs}. " + reason,
56-
"The DAG will not be built and will fail.",
57-
countermeasures,
58-
)
59-
)
60-
6147
def _validate_no_duplicates(self, jobs: List[Dict]):
6248
duplicated_jobs = list({job["job_name"] for job in jobs if jobs.count(job) > 1})
6349
if duplicated_jobs:
64-
self._raise_error(
50+
raise DagValidationException(
6551
ERROR.CONFLICT,
6652
f"There are some duplicated jobs: {duplicated_jobs}.",
6753
f"Remove the duplicated jobs from the list - each job can appear in the jobs list at most once. "
@@ -85,100 +71,111 @@ def _validate_job(self, job: Dict):
8571

8672
def _validate_job_type(self, job: Dict):
    """
    Check that a job definition is a dict.

    :param job: the job definition to check.
    :raises DagValidationException: if ``job`` is not a dict.
    """
    if not isinstance(job, dict):
        # Use str(job) to describe the offending entry: it works for any
        # object. Joining list(job) would itself raise TypeError for
        # non-iterable values (e.g. an int) or for iterables holding
        # non-string items, hiding the validation error from the user.
        raise DagValidationException(
            ERROR.TYPE,
            "The job type is not dict.",
            f"Change the Data Job type. Current type is {type(job)}. Expected type is dict.",
            [str(job)],
        )
9481

9582
def _validate_allowed_and_required_keys(self, job: Dict):
9683
disallowed_keys = [key for key in job.keys() if key not in allowed_job_keys]
9784
if disallowed_keys:
98-
self._raise_error(
85+
raise DagValidationException(
9986
ERROR.PERMISSION,
10087
"One or more job dict keys are not allowed.",
10188
f"Remove the disallowed Data Job Dict keys. "
10289
f"Keys {disallowed_keys} are not allowed. Allowed keys: {allowed_job_keys}.",
90+
None,
10391
)
10492
missing_keys = [key for key in required_job_keys if key not in job]
10593
if missing_keys:
106-
self._raise_error(
94+
raise DagValidationException(
10795
ERROR.REQUIREMENT,
10896
"One or more job dict required keys are missing.",
10997
f"Add the missing required Data Job Dict keys. Keys {missing_keys} "
11098
f"are missing. Required keys: {required_job_keys}.",
99+
None,
111100
)
112101

113102
def _validate_job_name(self, job: Dict):
    """
    Check that the ``job_name`` value of a job definition is a string.

    :param job: the job definition; its ``job_name`` key is read.
    :raises DagValidationException: if ``job["job_name"]`` is not a string.
    """
    job_name = job["job_name"]
    if not isinstance(job_name, str):
        # Report the offending job_name value itself (stringified, because
        # it is not a str here). Concatenating the dict's keys produced a
        # meaningless identifier and raised TypeError for non-string keys.
        raise DagValidationException(
            ERROR.TYPE,
            "The type of the job dict key job_name is not string.",
            f"Change the Data Job Dict value of job_name. "
            f"Current type is {type(job_name)}. Expected type is string.",
            [str(job_name)],
        )
122112

123113
def _validate_dependencies(self, job_name: str, dependencies: List[str]):
124114
if not (isinstance(dependencies, List)):
125-
self._raise_error(
115+
jobs = [job_name]
116+
raise DagValidationException(
126117
ERROR.TYPE,
127118
"The type of the job dict depends_on key is not list.",
128119
f"Check the Data Job Dict type of the depends_on key. Current type "
129120
f"is {type(dependencies)}. Expected type is list.",
130-
[job_name],
121+
jobs,
131122
)
132123
non_string_dependencies = [
133124
pred for pred in dependencies if not isinstance(pred, str)
134125
]
135126
if non_string_dependencies:
136-
self._raise_error(
127+
jobs1 = [job_name]
128+
raise DagValidationException(
137129
ERROR.TYPE,
138130
"One or more items of the job dependencies list are not strings.",
139131
f"Check the Data Job Dict values of the depends_on list. "
140132
f"There are some non-string values: {non_string_dependencies}. Expected type is string.",
141-
[job_name],
133+
jobs1,
142134
)
143135

144136
def _validate_team_name(self, job_name: str, team_name: str):
    """
    Check that the ``team_name`` value of a job definition is a string.

    :param job_name: name of the job being validated; reported as the affected job.
    :param team_name: the value to check.
    :raises DagValidationException: if ``team_name`` is not a string.
    """
    if not isinstance(team_name, str):
        raise DagValidationException(
            ERROR.TYPE,
            # The reason must name the key that is actually checked
            # (team_name); it previously said job_name.
            "The type of the job dict key team_name is not string.",
            f"Change the Data Job Dict value of team_name. "
            f"Current type is {type(team_name)}. Expected type is string.",
            [job_name],
        )
153146

154147
def _validate_fail_dag_on_error(self, job_name: str, fail_dag_on_error: bool):
155148
if not isinstance(fail_dag_on_error, bool):
156-
self._raise_error(
149+
jobs = [job_name]
150+
raise DagValidationException(
157151
ERROR.TYPE,
158152
"The type of the job dict key fail_dag_on_error is not bool (True/False).",
159153
f"Change the Data Job Dict value of fail_dag_on_error. Current type"
160154
f" is {type(fail_dag_on_error)}. Expected type is bool.",
161-
[job_name],
155+
jobs,
162156
)
163157

164158
def _validate_arguments(self, job_name: str, job_args: dict):
165159
if not isinstance(job_args, dict):
166-
self._raise_error(
160+
jobs = [job_name]
161+
raise DagValidationException(
167162
ERROR.TYPE,
168163
"The type of the job dict key arguments is not dict.",
169164
f"Change the Data Job Dict value of arguments. "
170165
f"Current type is {type(job_args)}. Expected type is dict.",
171-
[job_name],
166+
jobs,
172167
)
173168
try:
174169
json.dumps(job_args)
175170
except TypeError as e:
176-
self._raise_error(
171+
reason = str(e)
172+
jobs1 = [job_name]
173+
raise DagValidationException(
177174
ERROR.TYPE,
178-
str(e),
175+
reason,
179176
f"Change the Data Job Dict value of arguments. "
180177
f"Current type is {type(job_args)} but not serializable as JSON.",
181-
[job_name],
178+
jobs1,
182179
)
183180

184181
def _check_dag_cycles(self, jobs: List[Dict]):
@@ -190,9 +187,10 @@ def _check_dag_cycles(self, jobs: List[Dict]):
190187
# Preparing the sorter raises CycleError if cycles exist
191188
topological_sorter.prepare()
192189
except graphlib.CycleError as e:
193-
self._raise_error(
190+
jobs1 = e.args[1][:-1]
191+
raise DagValidationException(
194192
ERROR.CONFLICT,
195193
"There is a cycle in the DAG.",
196194
f"Change the depends_on list of the jobs that participate in the detected cycle: {e.args[1]}.",
197-
e.args[1][:-1],
195+
jobs1,
198196
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
# Copyright 2023-2024 Broadcom
2+
# SPDX-License-Identifier: Apache-2.0
3+
from typing import List
4+
from typing import Optional
5+
6+
from vdk.internal.core.error_classifiers import ResolvableBy
7+
from vdk.internal.core.error_classifiers import ResolvableByActual
8+
from vdk.internal.core.errors import BaseVdkError
9+
10+
11+
class DagValidationException(BaseVdkError):
12+
"""
13+
Exception raised for errors during DAG data job validation.
14+
15+
:param error_type: The type of error encountered.
16+
:param reason: Explanation of why the error occurred.
17+
:param countermeasures: Suggested actions to resolve the error.
18+
:param jobs: List of jobs associated with the error, defaults to None.
19+
"""
20+
21+
def __init__(
22+
self, error_type: str, reason: str, countermeasures: str, jobs: List[str] = None
23+
):
24+
self.jobs = jobs if jobs is not None else []
25+
self.error_type = error_type
26+
self.reason = reason
27+
self.countermeasures = countermeasures
28+
29+
jobs_formatted = ", ".join(self.jobs) if self.jobs else "N/A"
30+
message = (
31+
f"DAG Validation Error:\n"
32+
f" - Error Type: {self.error_type}\n"
33+
f" - Affected Jobs: {jobs_formatted}\n"
34+
f" - Reason: {self.reason}\n"
35+
f" - Countermeasures: {self.countermeasures}"
36+
)
37+
super().__init__(ResolvableByActual.USER, ResolvableBy.USER_ERROR, message)
38+
39+
40+
class DagJobExecutionException(BaseVdkError):
    """
    Exception raised when an execution of a job within a DAG fails.

    The exception message names the failed job and appends its details
    formatted by :meth:`format_details`.

    :param str job_name: The name of the job that failed.
    :param dict details: Any details relevant to the failure, optional.
    """

    def __init__(self, job_name: str, details: Optional[dict] = None):
        self.job_name = job_name
        self.details = details if details is not None else {}

        details_formatted = self.format_details(self.details)

        message = (
            f"Failure in DAG execution - Job '{self.job_name}' failed.\n"
            f" - Failed Job details:\n{details_formatted}"
        )
        # regardless of the failed job resolvable type, the DAG job always fails with user error
        # since the DAG itself didn't fail due to platform error.
        # The failed job itself might be platform error in this case the platform would still be alerted.
        # While the user is responsible for looking at the DAG itself.
        super().__init__(ResolvableByActual.USER, ResolvableBy.USER_ERROR, message)

    @staticmethod
    def format_details(details: dict) -> str:
        """
        Render the failure details as indented ``key: value`` lines.

        Entries whose value is None or an empty string/container are
        omitted. Dictionaries nested directly under the top level are
        expanded one level deeper; anything nested further is printed with
        its default string representation.

        :param details: the details to format.
        :return: the formatted text, or the string "None" when ``details``
            is empty.
        """
        if not details:
            return "None"

        def is_blank(value) -> bool:
            # Only None and empty strings/containers carry no information.
            # Falsy scalars such as 0 or False are real values and must
            # remain visible in the error message.
            if value is None:
                return True
            empty_capable = (str, bytes, list, tuple, dict, set, frozenset)
            return isinstance(value, empty_capable) and not value

        def format_dict(d, indent=0, indent_prefix=" ", initial_prefix=" "):
            current_indent = initial_prefix + indent_prefix * indent
            parts = []
            for key, value in d.items():
                if is_blank(value):
                    continue
                if isinstance(value, dict) and indent < 1:
                    # Print nested dictionaries only up to the 2nd level
                    nested = format_dict(value, indent + 1, indent_prefix)
                    parts.append(f"{current_indent}{key}: \n{nested}")
                else:
                    parts.append(f"{current_indent}{key}: {value}\n")
            return "".join(parts)

        return format_dict(details)

Diff for: projects/vdk-plugins/vdk-dag/src/vdk/plugin/dag/local_executor.py

+4-2
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ def _determine_status_without_summary(self, result: int) -> str:
135135
return JobStatus.SUCCEEDED.value
136136

137137
def _update_message_with_summary(self, content: str):
138-
self._message = {"summary": json.loads(content), "logs": self._log_file}
138+
self._message = json.loads(content)
139+
self._message["logs"] = self._log_file
139140

140141

141142
class LocalDataJobRunException(Exception):
@@ -158,9 +159,10 @@ def __find_job_path(job_name: str):
158159
candidates = [
159160
os.getcwd(),
160161
]
162+
# TODO: expose this using the plugin configuration mechanisms (which infers also from env. vars among others)
161163
candidates += [
162164
part
163-
for part in os.environ.get("DAG_LOCAL_RUN_JOB_PATH", "").split(":")
165+
for part in os.environ.get("DAGS_LOCAL_RUN_JOB_PATH", "").split(":")
164166
if part
165167
]
166168

Diff for: projects/vdk-plugins/vdk-dag/tests/test_dag.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from vdk.internal.core.errors import UserCodeError
1616
from vdk.plugin.dag import dag_plugin
1717
from vdk.plugin.dag import dag_runner
18+
from vdk.plugin.dag.exception import DagValidationException
1819
from vdk.plugin.test_utils.util_funcs import cli_assert_equal
1920
from vdk.plugin.test_utils.util_funcs import CliEntryBasedTestRunner
2021
from vdk.plugin.test_utils.util_funcs import jobs_path_from_caller_directory
@@ -436,7 +437,7 @@ def _test_dag_validation(self, dag_name):
436437
self.runner = CliEntryBasedTestRunner(dag_plugin)
437438
result = self._run_dag(dag_name)
438439
cli_assert_equal(1, result)
439-
self._assert_dag_fails_with_error(result, UserCodeError)
440+
self._assert_dag_fails_with_error(result, DagValidationException)
440441
self.httpserver.stop()
441442

442443
def test_dag_circular_dependency(self):

Diff for: projects/vdk-plugins/vdk-dag/tests/test_local_executor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ def reduce_retries_in_test_http_requests():
2323
with mock.patch.dict(
2424
os.environ,
2525
{
26-
"DAG_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""),
26+
"DAGS_LOCAL_RUN_JOB_PATH": jobs_path_from_caller_directory(""),
2727
"DAGS_DAG_EXECUTION_CHECK_TIME_PERIOD_SECONDS": "0",
2828
"DAGS_TIME_BETWEEN_STATUS_CHECK_SECONDS": "0",
2929
},

0 commit comments

Comments
 (0)