Skip to content

Commit a814d71

Browse files
benpankow and alangenfeld
authored and committed
More aggressively mask user code errors when masking enabled (#27183)
## Summary

Follow-on to the functionality introduced in #18688. The original PR ensures that if the `DAGSTER_REDACT_USER_CODE_ERRORS` env var is set to `1`, instances of `DagsterUserCodeExecutionError` which are serialized are masked so that their potentially sensitive contents are not leaked.

However, there is a class of errors that can still expose elements of user code: system/framework errors or interrupts, for example, will include their stack trace and snippets of user code.

This PR attempts to handle this second class of errors, in a few ways:

- Any error which is raised in a user code context has its stacktrace logged and then cleared, so if the error makes it to serialization, it will only include the error message.
- User code exceptions have their `original_exc_info` purged.
- A global mapping stores the error, so that a unique error ID can be raised in the UI, which can be used to locate the original error in logs.

## Test Plan

New unit tests which specifically handle the interrupt/system error cases.
1 parent 265f6ba commit a814d71

File tree

4 files changed

+251
-34
lines changed

4 files changed

+251
-34
lines changed

python_modules/dagster/dagster/_core/errors.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,8 +280,9 @@ def user_code_error_boundary(
280280
"""
281281
check.callable_param(msg_fn, "msg_fn")
282282
check.class_param(error_cls, "error_cls", superclass=DagsterUserCodeExecutionError)
283+
from dagster._utils.error import redact_user_stacktrace_if_enabled
283284

284-
with raise_execution_interrupts():
285+
with redact_user_stacktrace_if_enabled(), raise_execution_interrupts():
285286
if log_manager:
286287
log_manager.begin_python_log_capture()
287288
try:

python_modules/dagster/dagster/_core/execution/plan/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,13 @@ def op_execution_error_boundary(
4242
as respecting the RetryPolicy if present.
4343
"""
4444
from dagster._core.execution.context.system import StepExecutionContext
45+
from dagster._utils.error import redact_user_stacktrace_if_enabled
4546

4647
check.callable_param(msg_fn, "msg_fn")
4748
check.class_param(error_cls, "error_cls", superclass=DagsterUserCodeExecutionError)
4849
check.inst_param(step_context, "step_context", StepExecutionContext)
4950

50-
with raise_execution_interrupts():
51+
with redact_user_stacktrace_if_enabled(), raise_execution_interrupts():
5152
step_context.log.begin_python_log_capture()
5253
retry_policy = step_context.op_retry_policy
5354

python_modules/dagster/dagster/_utils/error.py

Lines changed: 113 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import contextlib
12
import logging
23
import os
4+
import sys
35
import traceback
46
import uuid
57
from collections.abc import Sequence
@@ -9,6 +11,7 @@
911
from typing_extensions import TypeAlias
1012

1113
import dagster._check as check
14+
from dagster._core.errors import DagsterUserCodeExecutionError
1215
from dagster._serdes import whitelist_for_serdes
1316

1417

@@ -94,6 +97,104 @@ def _should_redact_user_code_error() -> bool:
9497
)
9598

9699

100+
USER_FACING_ERROR_ID_ATTR_NAME = "_redacted_error_uuid"
101+
102+
103+
class DagsterRedactedUserCodeError(DagsterUserCodeExecutionError):
104+
pass
105+
106+
107+
@contextlib.contextmanager
108+
def redact_user_stacktrace_if_enabled():
109+
"""Context manager which, if a user has enabled redacting user code errors, logs exceptions raised from within,
110+
and clears the stacktrace from the exception. It also marks the exception to be redacted if it was to be persisted
111+
or otherwise serialized to be sent to Dagster Plus. This is useful for preventing sensitive information from
112+
being leaked in error messages.
113+
"""
114+
if not _should_redact_user_code_error():
115+
yield
116+
else:
117+
try:
118+
yield
119+
except BaseException as e:
120+
exc_info = sys.exc_info()
121+
122+
# Generate a unique error ID for this error, or re-use an existing one
123+
# if this error has already been seen
124+
existing_error_id = getattr(e, USER_FACING_ERROR_ID_ATTR_NAME, None)
125+
126+
if not existing_error_id:
127+
error_id = str(uuid.uuid4())
128+
129+
# Track the error ID for this exception so we can redact it later
130+
setattr(e, USER_FACING_ERROR_ID_ATTR_NAME, error_id)
131+
masked_logger = logging.getLogger(_REDACTED_ERROR_LOGGER_NAME)
132+
133+
masked_logger.error(
134+
f"Error occurred during user code execution, error ID {error_id}",
135+
exc_info=exc_info,
136+
)
137+
else:
138+
error_id = existing_error_id
139+
140+
if isinstance(e, DagsterUserCodeExecutionError):
141+
# To be especially sure that user code error information doesn't leak
142+
# outside the context, we raise a new exception with a cleared original_exc_info
143+
# The only remnant that remains is user_exception, which we only use to allow the user
144+
# to retrieve exceptions in hooks
145+
try:
146+
raise Exception("Masked").with_traceback(None) from None
147+
except Exception as dummy_exception:
148+
redacted_exception = DagsterRedactedUserCodeError(
149+
f"Error occurred during user code execution, error ID {error_id}. "
150+
"The error has been masked to prevent leaking sensitive information. "
151+
"Search in logs for this error ID for more details.",
152+
user_exception=e.user_exception,
153+
original_exc_info=sys.exc_info(),
154+
).with_traceback(None)
155+
setattr(dummy_exception, USER_FACING_ERROR_ID_ATTR_NAME, error_id)
156+
setattr(redacted_exception, USER_FACING_ERROR_ID_ATTR_NAME, error_id)
157+
raise redacted_exception from None
158+
159+
# Redact the stacktrace to ensure it will not be passed to Dagster Plus
160+
raise e.with_traceback(None) from None
161+
162+
163+
def _generate_redacted_user_code_error_message(err_id: str) -> SerializableErrorInfo:
164+
return SerializableErrorInfo(
165+
message=(
166+
f"Error occurred during user code execution, error ID {err_id}. "
167+
"The error has been masked to prevent leaking sensitive information. "
168+
"Search in logs for this error ID for more details."
169+
),
170+
stack=[],
171+
cls_name="DagsterRedactedUserCodeError",
172+
cause=None,
173+
context=None,
174+
)
175+
176+
177+
def _generate_partly_redacted_framework_error_message(
178+
exc_info: ExceptionInfo, err_id: str
179+
) -> SerializableErrorInfo:
180+
exc_type, e, tb = exc_info
181+
tb_exc = traceback.TracebackException(check.not_none(exc_type), check.not_none(e), tb)
182+
error_info = _serializable_error_info_from_tb(tb_exc)
183+
184+
return SerializableErrorInfo(
185+
message=error_info.message
186+
+ (
187+
f"Error ID {err_id}. "
188+
"The error has been masked to prevent leaking sensitive information. "
189+
"Search in logs for this error ID for more details."
190+
),
191+
stack=[],
192+
cls_name=error_info.cls_name,
193+
cause=None,
194+
context=None,
195+
)
196+
197+
97198
def serializable_error_info_from_exc_info(
98199
exc_info: ExceptionInfo,
99200
# Whether to forward serialized errors thrown from subprocesses
@@ -116,27 +217,18 @@ def serializable_error_info_from_exc_info(
116217
e = check.not_none(e, additional_message=additional_message)
117218
tb = check.not_none(tb, additional_message=additional_message)
118219

119-
from dagster._core.errors import DagsterUserCodeExecutionError, DagsterUserCodeProcessError
120-
121-
if isinstance(e, DagsterUserCodeExecutionError) and _should_redact_user_code_error():
122-
error_id = str(uuid.uuid4())
123-
masked_logger = logging.getLogger(_REDACTED_ERROR_LOGGER_NAME)
124-
125-
masked_logger.error(
126-
f"Error occurred during user code execution, error ID {error_id}",
127-
exc_info=exc_info,
128-
)
129-
return SerializableErrorInfo(
130-
message=(
131-
f"Error occurred during user code execution, error ID {error_id}. "
132-
"The error has been masked to prevent leaking sensitive information. "
133-
"Search in logs for this error ID for more details."
134-
),
135-
stack=[],
136-
cls_name="DagsterRedactedUserCodeError",
137-
cause=None,
138-
context=None,
139-
)
220+
from dagster._core.errors import DagsterUserCodeProcessError
221+
222+
err_id = getattr(e, USER_FACING_ERROR_ID_ATTR_NAME, None)
223+
if err_id:
224+
if isinstance(e, DagsterUserCodeExecutionError):
225+
# For user code, we want to completely mask the error message, since
226+
# both the stacktrace and the message could contain sensitive information
227+
return _generate_redacted_user_code_error_message(err_id)
228+
else:
229+
# For all other errors (framework errors, interrupts),
230+
# we want to redact the stacktrace, but keep the error message
231+
return _generate_partly_redacted_framework_error_message(exc_info, err_id)
140232

141233
if (
142234
hoist_user_code_error

python_modules/dagster/dagster_tests/core_tests/test_mask_user_code_errors.py

Lines changed: 134 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,20 @@
1+
import logging
12
import re
23
import sys
34
import time
45
import traceback
5-
from typing import Any
6+
from typing import Any, Callable
67

78
import pytest
89
from dagster import Config, RunConfig, config_mapping, job, op
10+
from dagster._core.definitions.events import Failure
911
from dagster._core.definitions.timestamp import TimestampWithTimezone
10-
from dagster._core.errors import DagsterUserCodeProcessError
12+
from dagster._core.errors import (
13+
DagsterExecutionInterruptedError,
14+
DagsterUserCodeExecutionError,
15+
DagsterUserCodeProcessError,
16+
user_code_error_boundary,
17+
)
1118
from dagster._core.test_utils import environ, instance_for_test
1219
from dagster._utils.error import (
1320
_serializable_error_info_from_tb,
@@ -30,32 +37,148 @@ def __init__(self):
3037
)
3138

3239

40+
class hunter2:
41+
pass
42+
43+
3344
@pytest.fixture(scope="function")
3445
def enable_masking_user_code_errors() -> Any:
3546
with environ({"DAGSTER_REDACT_USER_CODE_ERRORS": "1"}):
3647
yield
3748

3849

39-
def test_masking_op_execution(enable_masking_user_code_errors) -> Any:
50+
def test_masking_basic(enable_masking_user_code_errors):
51+
try:
52+
with user_code_error_boundary(
53+
error_cls=DagsterUserCodeExecutionError,
54+
msg_fn=lambda: "hunter2",
55+
):
56+
57+
def hunter2():
58+
raise UserError()
59+
60+
hunter2()
61+
except Exception:
62+
exc_info = sys.exc_info()
63+
err_info = serializable_error_info_from_exc_info(exc_info)
64+
65+
assert "hunter2" not in str(err_info)
66+
67+
68+
def test_masking_nested_user_code_err_boundaries(enable_masking_user_code_errors):
69+
try:
70+
with user_code_error_boundary(
71+
error_cls=DagsterUserCodeExecutionError,
72+
msg_fn=lambda: "hunter2 as well",
73+
):
74+
with user_code_error_boundary(
75+
error_cls=DagsterUserCodeExecutionError,
76+
msg_fn=lambda: "hunter2",
77+
):
78+
79+
def hunter2():
80+
raise UserError()
81+
82+
hunter2()
83+
except Exception:
84+
exc_info = sys.exc_info()
85+
err_info = serializable_error_info_from_exc_info(exc_info)
86+
87+
assert "hunter2" not in str(err_info)
88+
89+
90+
def test_masking_nested_user_code_err_boundaries_reraise(enable_masking_user_code_errors):
91+
try:
92+
try:
93+
with user_code_error_boundary(
94+
error_cls=DagsterUserCodeExecutionError,
95+
msg_fn=lambda: "hunter2",
96+
):
97+
98+
def hunter2():
99+
raise UserError()
100+
101+
hunter2()
102+
except Exception as e:
103+
# Mimics behavior of resource teardown, which runs in a
104+
# user_code_error_boundary after the user code raises an error
105+
with user_code_error_boundary(
106+
error_cls=DagsterUserCodeExecutionError,
107+
msg_fn=lambda: "teardown after we raised hunter2 error",
108+
):
109+
# do teardown stuff
110+
raise e
111+
112+
except Exception:
113+
exc_info = sys.exc_info()
114+
err_info = serializable_error_info_from_exc_info(exc_info)
115+
116+
assert "hunter2" not in str(err_info)
117+
118+
119+
ERROR_ID_REGEX = r"[Ee]rror ID ([a-z0-9\-]+)"
120+
121+
122+
@pytest.mark.parametrize(
123+
"exc_name, expect_exc_name_in_error, build_exc",
124+
[
125+
("UserError", False, lambda: UserError()),
126+
("TypeError", False, lambda: TypeError("hunter2")),
127+
("KeyboardInterrupt", True, lambda: KeyboardInterrupt()),
128+
("DagsterExecutionInterruptedError", True, lambda: DagsterExecutionInterruptedError()),
129+
("Failure", True, lambda: Failure("asdf")),
130+
],
131+
)
132+
def test_masking_op_execution(
133+
enable_masking_user_code_errors,
134+
exc_name: str,
135+
expect_exc_name_in_error: bool,
136+
build_exc: Callable[[], BaseException],
137+
caplog,
138+
) -> Any:
40139
@op
41140
def throws_user_error(_):
42-
raise UserError()
141+
def hunter2():
142+
raise build_exc()
143+
144+
hunter2()
43145

44146
@job
45147
def job_def():
46148
throws_user_error()
47149

48-
result = job_def.execute_in_process(raise_on_error=False)
150+
with caplog.at_level(logging.ERROR):
151+
result = job_def.execute_in_process(raise_on_error=False)
49152
assert not result.success
50-
assert not any("hunter2" in str(event) for event in result.all_events)
153+
154+
# Ensure error message and contents of user code don't leak (e.g. hunter2 text or function name)
155+
assert not any("hunter2" in str(event).lower() for event in result.all_events), [
156+
str(event) for event in result.all_events if "hunter2" in str(event)
157+
]
158+
51159
step_error = next(event for event in result.all_events if event.is_step_failure)
52-
assert (
53-
step_error.step_failure_data.error
54-
and step_error.step_failure_data.error.cls_name == "DagsterRedactedUserCodeError"
55-
)
56160

161+
# Certain exceptions will not be fully redacted, just the stack trace
162+
# For example, system errors and interrupts may contain useful information
163+
# or information that the framework itself relies on
164+
if expect_exc_name_in_error:
165+
assert (
166+
step_error.step_failure_data.error
167+
and step_error.step_failure_data.error.cls_name == exc_name
168+
)
169+
else:
170+
assert (
171+
step_error.step_failure_data.error
172+
and step_error.step_failure_data.error.cls_name == "DagsterRedactedUserCodeError"
173+
)
174+
175+
# Ensures we can match the error ID in the Dagster+ UI surfaced message to the rich error message
176+
# in logs which includes the redacted error message
177+
assert "Search in logs for this error ID for more details" in str(step_error)
178+
error_id = re.search(ERROR_ID_REGEX, str(step_error)).group(1) # type: ignore
57179

58-
ERROR_ID_REGEX = r"Error occurred during user code execution, error ID ([a-z0-9\-]+)"
180+
assert f"Error occurred during user code execution, error ID {error_id}" in caplog.text
181+
assert "hunter2" in caplog.text
59182

60183

61184
def test_masking_sensor_execution(instance, enable_masking_user_code_errors, capsys) -> None:

0 commit comments

Comments
 (0)