Skip to content

Commit fa12e3e

Browse files
authored
Merge pull request #821 from philipp-sontag-by/handle-list-objs-errors
Handle connection errors during list_obj calls gracefully
2 parents 1f8e385 + 035bd07 commit fa12e3e

File tree

2 files changed

+36
-8
lines changed

2 files changed

+36
-8
lines changed

kopf/_cogs/clients/watching.py

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -155,14 +155,18 @@ async def continuous_watch(
155155

156156
# First, list the resources regularly, and get the list's resource version.
157157
# Simulate the events with type "None" event - used in detection of causes.
158-
objs, resource_version = await fetching.list_objs(
159-
logger=logger,
160-
settings=settings,
161-
resource=resource,
162-
namespace=namespace,
163-
)
164-
for obj in objs:
165-
yield {'type': None, 'object': obj}
158+
try:
159+
objs, resource_version = await fetching.list_objs(
160+
logger=logger,
161+
settings=settings,
162+
resource=resource,
163+
namespace=namespace,
164+
)
165+
for obj in objs:
166+
yield {'type': None, 'object': obj}
167+
168+
except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError, asyncio.TimeoutError):
169+
return
166170

167171
# Notify the watcher that the initial listing is over, even if there was nothing yielded.
168172
yield Bookmark.LISTED

tests/k8s/test_watching_continuously.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import asyncio
1212
import logging
1313

14+
import aiohttp
1415
import pytest
1516

1617
from kopf._cogs.clients.watching import Bookmark, WatchingError, continuous_watch
@@ -180,3 +181,26 @@ async def test_long_line_parsing(
180181
assert len(events[1]['object']['spec']['field']) == 1
181182
assert len(events[2]['object']['spec']['field']) == 2 * 1024 * 1024
182183
assert len(events[3]['object']['spec']['field']) == 4 * 1024 * 1024
184+
185+
@pytest.mark.parametrize("connection_error",
186+
[
187+
aiohttp.ClientConnectionError,
188+
aiohttp.ClientPayloadError,
189+
asyncio.TimeoutError
190+
]
191+
)
192+
async def test_list_objs_connection_errors_are_caught(
193+
settings, resource, stream, namespace, enforced_session, mocker, connection_error):
194+
195+
enforced_session.request = mocker.Mock(side_effect=connection_error())
196+
stream.feed([], namespace=namespace)
197+
stream.close(namespace=namespace)
198+
199+
events = []
200+
async for event in continuous_watch(settings=settings,
201+
resource=resource,
202+
namespace=namespace,
203+
operator_pause_waiter=asyncio.Future()):
204+
events.append(event)
205+
206+
assert len(events) == 0

0 commit comments

Comments
 (0)