Skip to content

Commit 66657cd

Browse files
Add deployment step lifecycle logs and error resolution hints (#21020)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Alex Streed <alex.s@prefect.io> Co-authored-by: alex.s <ajstreed1@gmail.com> Co-authored-by: Alex Streed <desertaxle@users.noreply.github.com>
1 parent fe48ce4 commit 66657cd

File tree

6 files changed

+337
-11
lines changed

6 files changed

+337
-11
lines changed

src/prefect/deployments/steps/core.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,10 @@ async def run_steps(
157157
continue
158158
fqn, inputs = _get_step_fully_qualified_name_and_inputs(step)
159159
step_name = fqn.split(".")[-1]
160-
print_function(f" > Running {step_name} step...")
160+
if logger:
161+
logger.info("Executing deployment step: %s", step_name)
162+
else:
163+
print_function(f" > Running {step_name} step...")
161164

162165
# SECURITY: Serialize inputs BEFORE running the step (and thus before templating).
163166
# This ensures that the event payload contains template strings like
@@ -211,6 +214,9 @@ async def run_steps(
211214
upstream_outputs[inputs.get("id")] = step_output
212215
upstream_outputs.update(step_output)
213216

217+
if logger:
218+
logger.info("Deployment step '%s' completed successfully", step_name)
219+
214220
# Emit success event for this step
215221
await _emit_pull_step_event(
216222
serialized_step,
@@ -230,6 +236,8 @@ async def run_steps(
230236
)
231237
raise StepExecutionError(f"Encountered error while running {fqn}") from exc
232238

239+
if logger:
240+
logger.info("All deployment steps completed successfully")
233241
return upstream_outputs
234242

235243

src/prefect/deployments/steps/pull.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,11 @@ async def pull_with_block(
281281
try:
282282
block = await Block.aload(full_slug)
283283
except Exception:
284-
deployment_logger.exception("Unable to load block '%s'", full_slug)
284+
deployment_logger.exception(
285+
"Failed to load storage block with slug %s."
286+
" Verify the block exists and you have access to it.",
287+
full_slug,
288+
)
285289
raise
286290

287291
try:

src/prefect/flows.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3081,9 +3081,7 @@ async def load_flow_from_flow_run(
30813081
await storage_block.get_directory(from_path=from_path, local_path=".")
30823082

30833083
if deployment.pull_steps:
3084-
run_logger.debug(
3085-
f"Running {len(deployment.pull_steps)} deployment pull step(s)"
3086-
)
3084+
run_logger.info(f"Running {len(deployment.pull_steps)} deployment pull step(s)")
30873085

30883086
from prefect.deployments.steps.core import StepExecutionError, run_steps
30893087

src/prefect/runner/storage.py

Lines changed: 104 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -459,10 +459,15 @@ async def _clone_repo(self):
459459
if self._credentials or parsed_url.password or parsed_url.username
460460
else exc
461461
)
462-
raise RuntimeError(
463-
f"Failed to clone repository {strip_auth_from_url(self._url)!r} with exit code"
462+
safe_url = strip_auth_from_url(self._url)
463+
error_message = (
464+
f"Failed to clone repository {safe_url!r} with exit code"
464465
f" {exc.returncode}."
465-
) from exc_chain
466+
)
467+
hint = _get_git_clone_error_hint(exc)
468+
if hint:
469+
error_message += f" {hint}"
470+
raise RuntimeError(error_message) from exc_chain
466471

467472
if self._commit_sha:
468473
# Fetch the commit
@@ -699,10 +704,14 @@ async def pull_code(self) -> None:
699704
)
700705
)
701706
except Exception as exc:
702-
raise RuntimeError(
707+
error_message = (
703708
f"Failed to pull contents from remote storage {self._url!r} to"
704709
f" {self.destination!r}"
705-
) from exc
710+
)
711+
hint = _get_remote_storage_error_hint(exc)
712+
if hint:
713+
error_message += f". {hint}"
714+
raise RuntimeError(error_message) from exc
706715

707716
def to_pull_step(self) -> dict[str, Any]:
708717
"""
@@ -898,6 +907,96 @@ def create_storage_from_source(
898907
return LocalStorage(path=source, pull_interval=pull_interval)
899908

900909

910+
# Pattern-based error hints for git clone failures.
#
# Each entry is ``(pattern, hint)``. Patterns are matched case-insensitively as
# substrings of git's stderr and the FIRST matching entry wins, so specific
# patterns must stay ahead of the generic "not found" catch-all at the end.
_GIT_CLONE_ERROR_HINTS: list[tuple[str, str]] = [
    (
        "Authentication failed",
        "Hint: Check that your credentials or access token are correct and not expired.",
    ),
    (
        "repository not found",
        "Hint: Verify the repository URL and that you have access to it.",
    ),
    (
        "Could not resolve host",
        "Hint: Check your network connectivity and DNS settings.",
    ),
    (
        "Connection refused",
        "Hint: Check your network connectivity and DNS settings.",
    ),
    (
        "Permission denied",
        "Hint: Check your SSH key or token permissions.",
    ),
    (
        "destination path",
        "Hint: A stale working directory may exist. Consider removing it and retrying.",
    ),
    (
        # git reports a missing branch as "Remote branch <name> not found in
        # upstream origin". Without this entry the catch-all below would tell
        # the user to check the repository URL, which is misleading.
        "Remote branch",
        "Hint: Verify that the branch exists in the remote repository.",
    ),
    (
        "not found",
        "Hint: Verify the repository URL and that you have access to it.",
    ),
]


def _get_git_clone_error_hint(exc: subprocess.CalledProcessError) -> str | None:
    """Extract a resolution hint from a git clone CalledProcessError's stderr.

    Args:
        exc: The error raised by the failed clone. Its ``stderr`` may be
            ``bytes`` (decoded as UTF-8 with undecodable bytes replaced),
            ``str``, or empty/``None``.

    Returns:
        The hint of the first entry in ``_GIT_CLONE_ERROR_HINTS`` whose pattern
        occurs (case-insensitively) in stderr, or ``None`` when stderr is empty
        or no pattern matches.
    """
    raw_stderr = exc.stderr
    if not raw_stderr:
        # Nothing was captured, so there is nothing to match against.
        return None

    if isinstance(raw_stderr, bytes):
        stderr = raw_stderr.decode("utf-8", errors="replace")
    else:
        stderr = str(raw_stderr)

    # Lowercase once instead of once per pattern.
    haystack = stderr.lower()
    for pattern, hint in _GIT_CLONE_ERROR_HINTS:
        if pattern.lower() in haystack:
            return hint
    return None
956+
957+
958+
# Pattern-based error hints for remote storage failures
959+
_REMOTE_STORAGE_ERROR_HINTS: list[tuple[str, str]] = [
960+
(
961+
"NoSuchBucket",
962+
"Hint: Verify the bucket name and region.",
963+
),
964+
(
965+
"AccessDenied",
966+
"Hint: Check your storage permissions and credentials.",
967+
),
968+
(
969+
"403",
970+
"Hint: Check your storage permissions and credentials.",
971+
),
972+
(
973+
"NoSuchKey",
974+
"Hint: Verify the storage path exists.",
975+
),
976+
(
977+
"ConnectionError",
978+
"Hint: Check network connectivity and the storage endpoint URL.",
979+
),
980+
(
981+
"EndpointConnectionError",
982+
"Hint: Check network connectivity and the storage endpoint URL.",
983+
),
984+
(
985+
"ConnectionRefusedError",
986+
"Hint: Check network connectivity and the storage endpoint URL.",
987+
),
988+
]
989+
990+
991+
def _get_remote_storage_error_hint(exc: Exception) -> str | None:
992+
"""Extract a resolution hint from a remote storage exception."""
993+
error_str = str(exc).lower()
994+
for pattern, hint in _REMOTE_STORAGE_ERROR_HINTS:
995+
if pattern.lower() in error_str:
996+
return hint
997+
return None
998+
999+
9011000
def _format_token_from_credentials(
9021001
netloc: str,
9031002
credentials: dict[str, Any] | GitCredentials,

tests/deployment/test_steps.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,90 @@ def func(*args, **kwargs):
720720
await run_steps(steps, {}, print_function=mock_print)
721721
mock_print.assert_any_call("this is a warning")
722722

723+
async def test_run_steps_uses_print_function_without_logger(self):
    """Without a logger, run_steps announces each step via print_function."""
    printer = MagicMock()
    shell_step = {
        "prefect.deployments.steps.run_shell_script": {
            "script": "echo 'hello'",
        }
    }

    await run_steps([shell_step], {}, print_function=printer)

    # The announcement carries only the last segment of the step's dotted name.
    expected_announcement = " > Running run_shell_script step..."
    printer.assert_any_call(expected_announcement)
740+
741+
async def test_run_steps_logs_lifecycle_with_provided_logger(self):
    """Test that run_steps uses a provided logger for lifecycle messages.

    Checks the start, per-step completion, and all-steps-complete messages
    with their exact arguments, so a wrong step name is caught as well as a
    missing message.
    """
    mock_logger = MagicMock()

    steps = [
        {
            "prefect.deployments.steps.run_shell_script": {
                "script": "echo 'test'",
            }
        },
    ]

    await run_steps(steps, {}, logger=mock_logger)

    # Lifecycle messages use lazy %-style arguments, so the step name arrives
    # as a separate positional argument rather than inside the format string.
    mock_logger.info.assert_any_call(
        "Executing deployment step: %s", "run_shell_script"
    )
    mock_logger.info.assert_any_call(
        "Deployment step '%s' completed successfully", "run_shell_script"
    )
    mock_logger.info.assert_any_call("All deployment steps completed successfully")
778+
779+
async def test_run_steps_no_complete_log_on_failure(self, monkeypatch):
    """A failing step must suppress the final all-steps-complete log line."""

    def always_raises(**kwargs):
        raise RuntimeError("boom")

    # Replace the real shell-script step with one that fails immediately.
    monkeypatch.setattr(
        "prefect.deployments.steps.run_shell_script",
        always_raises,
    )

    mock_logger = MagicMock()
    failing_step = {
        "prefect.deployments.steps.run_shell_script": {
            "script": "echo 'test'",
        }
    }

    with pytest.raises(StepExecutionError):
        await run_steps([failing_step], {}, logger=mock_logger)

    # None of the recorded info() calls may mention overall success.
    success_message = "All deployment steps completed successfully"
    for recorded_call in mock_logger.info.call_args_list:
        assert success_message not in str(recorded_call)
806+
723807

724808
class MockCredentials:
725809
def __init__(
@@ -1535,7 +1619,8 @@ async def test_block_not_found(self, caplog):
15351619
}
15361620
)
15371621

1538-
assert "Unable to load block 'in-the/wind'" in caplog.text
1622+
assert "Failed to load storage block with slug in-the/wind" in caplog.text
1623+
assert "Verify the block exists and you have access to it" in caplog.text
15391624

15401625
async def test_incorrect_type_of_block(self, caplog):
15411626
"""

0 commit comments

Comments
 (0)