diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py index 10a358d094c7..48ee0d01d50d 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py @@ -1,7 +1,6 @@ import asyncio from typing import Dict, List, Optional -import aiohttp import kubernetes_asyncio import kubernetes_asyncio.watch from kubernetes_asyncio.client import ApiClient, V1Pod @@ -82,7 +81,6 @@ async def _replicate_pod_events(self): namespace=self._namespace, label_selector=f"job-name={self._job_name}", timeout_seconds=self._timeout_seconds, - _request_timeout=aiohttp.ClientTimeout(), ): phase = event["object"].status.phase diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py index 2f83394816cc..69157e39465d 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py @@ -118,7 +118,6 @@ Union, ) -import aiohttp import anyio.abc import kubernetes_asyncio from jsonpatch import JsonPatch @@ -866,7 +865,6 @@ async def _stream_job_logs( configuration: KubernetesWorkerJobConfiguration, client, ): - timeout = aiohttp.ClientTimeout(total=None) core_client = CoreV1Api(client) logs = await core_client.read_namespaced_pod_log( @@ -875,7 +873,6 @@ async def _stream_job_logs( follow=True, _preload_content=False, container="prefect-job", - _request_timeout=timeout, ) try: async for line in logs.content: @@ -916,7 +913,6 @@ async def _job_events( func=batch_client.list_namespaced_job, namespace=namespace, field_selector=f"metadata.name={job_name}", - _request_timeout=aiohttp.ClientTimeout(), **watch_kwargs, ): yield event @@ -925,7 +921,6 @@ async def _job_events( job_list = await batch_client.list_namespaced_job( namespace=namespace, 
field_selector=f"metadata.name={job_name}", - _request_timeout=aiohttp.ClientTimeout(), ) resource_version = job_list.metadata.resource_version @@ -1129,7 +1124,6 @@ async def _get_job_pod( namespace=configuration.namespace, label_selector=f"job-name={job_name}", timeout_seconds=configuration.pod_watch_timeout_seconds, - _request_timeout=aiohttp.ClientTimeout(), ): pod: V1Pod = event["object"] last_pod_name = pod.metadata.name diff --git a/src/integrations/prefect-kubernetes/pyproject.toml b/src/integrations/prefect-kubernetes/pyproject.toml index e6ac3b037af4..76e814940e87 100644 --- a/src/integrations/prefect-kubernetes/pyproject.toml +++ b/src/integrations/prefect-kubernetes/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta" [project] dependencies = [ "prefect>=3.1.1", - "kubernetes-asyncio>=29.0.0", + "kubernetes-asyncio>=32.0.0", "tenacity>=8.2.3", "exceptiongroup", "pyopenssl>=24.1.0", diff --git a/src/integrations/prefect-kubernetes/tests/test_worker.py b/src/integrations/prefect-kubernetes/tests/test_worker.py index 04deb4ce2c5a..0461b9e12aed 100644 --- a/src/integrations/prefect-kubernetes/tests/test_worker.py +++ b/src/integrations/prefect-kubernetes/tests/test_worker.py @@ -5,7 +5,7 @@ from contextlib import asynccontextmanager from time import monotonic, sleep from unittest import mock -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock, MagicMock, patch import anyio import anyio.abc @@ -2409,7 +2409,6 @@ async def mock_stream(*args, **kwargs): func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector=mock.ANY, - _request_timeout=mock.ANY, ) if job_timeout is not None: @@ -2431,7 +2430,6 @@ async def mock_stream(*args, **kwargs): namespace=mock.ANY, label_selector=mock.ANY, timeout_seconds=42, - _request_timeout=mock.ANY, ), mock.call(**expected_job_call_kwargs), ] @@ -2475,13 +2473,11 @@ async def mock_stream(*args, **kwargs): namespace=mock.ANY, 
label_selector=mock.ANY, timeout_seconds=mock.ANY, - _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector=mock.ANY, - _request_timeout=mock.ANY, # Note: timeout_seconds is excluded here ), ] @@ -2522,13 +2518,11 @@ async def mock_stream(*args, **kwargs): namespace="my-awesome-flows", label_selector=mock.ANY, timeout_seconds=60, - _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace="my-awesome-flows", field_selector=mock.ANY, - _request_timeout=mock.ANY, ), ] ) @@ -2665,7 +2659,6 @@ async def mock_log_stream(*args, **kwargs): namespace=mock.ANY, label_selector=mock.ANY, timeout_seconds=mock.ANY, - _request_timeout=mock.ANY, ), # Starts with the full timeout minus the amount we slept streaming logs mock.call( @@ -2673,7 +2666,6 @@ func=mock_batch_client.return_value.list_namespaced_job, field_selector=mock.ANY, namespace=mock.ANY, timeout_seconds=pytest.approx(50, 1), - _request_timeout=mock.ANY, ), ] ) @@ -2892,14 +2884,12 @@ async def mock_stream(*args, **kwargs): func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector="metadata.name=mock-job", - _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector="metadata.name=mock-job", resource_version="1", - _request_timeout=mock.ANY, ), ] ) @@ -2942,6 +2932,50 @@ async def mock_stream(*args, **kwargs): assert result.status_code == 0 + async def test_watch_no_timeout_after_five_minutes_without_data( + self, + flow_run, + default_configuration, + mock_core_client, + mock_watch, + mock_batch_client, + mock_pod, + caplog: pytest.LogCaptureFixture, + ): + """ + Regression test for https://github.com/PrefectHQ/prefect/issues/16210 + """ + # The job should not be completed to start + mock_batch_client.return_value.read_namespaced_job.return_value.status.completion_time = None + + async def 
mock_stream(*args, **kwargs): + if kwargs["func"] == mock_core_client.return_value.list_namespaced_pod: + yield {"object": mock_pod, "type": "MODIFIED"} + + if kwargs["func"] == mock_batch_client.return_value.list_namespaced_job: + job = MagicMock(spec=kubernetes_asyncio.client.V1Job) + job.status.completion_time = None + job.status.failed = 0 + job.spec.backoff_limit = 6 + + # First event + yield {"object": job, "type": "ADDED"} + + # Simulate 5 minutes passing + with patch("anyio.sleep", return_value=None): + await anyio.sleep(310) + + # Send another event after the delay + job.status.completion_time = pendulum.now("utc").timestamp() + yield {"object": job, "type": "MODIFIED"} + + mock_watch.return_value.stream = mock.Mock(side_effect=mock_stream) + + async with KubernetesWorker(work_pool_name="test") as k8s_worker: + result = await k8s_worker.run(flow_run, default_configuration) + assert "Error occurred while streaming logs" not in caplog.text + assert result.status_code == 0 + @pytest.fixture async def mock_events(self, mock_core_client): mock_core_client.return_value.list_namespaced_event.return_value = (