Commit 66dc4af

feat: disable event batching by default

Signed-off-by: paxbit <[email protected]>

1 parent fa12e3e

File tree

3 files changed: +82 -14 lines


kopf/_cogs/configs/configuration.py

+16 -4

@@ -204,10 +204,22 @@ class BatchingSettings:
     How soon an idle worker is exited and garbage-collected if no events arrive.
     """
 
-    batch_window: float = 0.1
-    """
-    How fast/slow does a worker deplete the queue when an event is received.
-    All events arriving within this window will be ignored except the last one.
+    batch_window: float = None
+    """
+    A lossy but, in some cases, viable optimization that takes some
+    load off of event processing.
+    Defines the debouncing interval with which a worker depletes the incoming event queue.
+    All events arriving within this window are ignored except the last one. This can have
+    consequences depending on the type and use-case of the operator implemented with kopf.
+    Example: given an event sequence ``CREATED > MODIFIED > DELETED`` for resource A, if this
+    sequence arrives in the incoming event queue within ``batch_window``, any handlers for
+    the ``CREATED`` or ``MODIFIED`` events of resource A are never called. Instead, the first
+    handler called for A is the ``DELETED`` handler, if one exists. Depending on what your
+    operator does, this might be problematic.
+
+    If your use-case tolerates debouncing of event sequences for a resource, set this to ``> 0``.
+    A notable side effect is an implicit delay for all handlers, since all events are only
+    dispatched to the handlers after this interval has elapsed.
     """
 
     exit_timeout: float = 2.0
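
With the new default of ``None``, batching is off unless an operator opts back in. A minimal sketch of how that could look, using kopf's standard startup configuration hook; the handler name is illustrative:

    import kopf

    @kopf.on.startup()
    def configure_batching(settings: kopf.OperatorSettings, **_):
        # Re-enable debouncing with the previous default window of 0.1s,
        # accepting that intermediate events within that window are dropped.
        settings.batching.batch_window = 0.1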

kopf/_core/reactor/queueing.py

+11 -10

@@ -304,16 +304,17 @@ async def worker(
                     else:
                         continue
             else:
-                try:
-                    while True:
-                        prev_event = raw_event
-                        next_event = await asyncio.wait_for(
-                            backlog.get(),
-                            timeout=settings.batching.batch_window)
-                        shouldstop = shouldstop or isinstance(next_event, EOS)
-                        raw_event = prev_event if isinstance(next_event, EOS) else next_event
-                except asyncio.TimeoutError:
-                    pass
+                if settings.batching.batch_window is not None:
+                    try:
+                        while True:
+                            prev_event = raw_event
+                            next_event = await asyncio.wait_for(
+                                backlog.get(),
+                                timeout=settings.batching.batch_window)
+                            shouldstop = shouldstop or isinstance(next_event, EOS)
+                            raw_event = prev_event if isinstance(next_event, EOS) else next_event
+                    except asyncio.TimeoutError:
+                        pass
 
             # Exit gracefully and immediately on the end-of-stream marker sent by the watcher.
             if isinstance(raw_event, EOS):
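
For illustration, the debouncing pattern the worker relies on, reduced to a self-contained toy coroutine with hypothetical names (EOS handling omitted): with ``window=None`` the first item is returned as-is, otherwise the queue keeps being drained until ``window`` seconds pass with no newer item.

    import asyncio
    from typing import Any, Optional

    async def debounce_last(queue: "asyncio.Queue[Any]", window: Optional[float]) -> Any:
        # Wait for the first item unconditionally.
        item = await queue.get()
        if window is not None:  # None disables batching: the item is returned as-is.
            try:
                while True:
                    # Keep replacing the item while new ones arrive within the window.
                    item = await asyncio.wait_for(queue.get(), timeout=window)
            except asyncio.TimeoutError:
                pass  # The window expired with no newer item: keep the last one seen.
        return item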

tests/reactor/test_queueing.py

+55

@@ -164,6 +164,61 @@ async def test_watchevent_batching(settings, resource, processor, timer,
     assert actual_uid_val_pairs == expected_uid_val_pairs
 
 
+@pytest.mark.parametrize('uids, vals, events', [
+
+    pytest.param(['uid1', 'uid1'], ['a', 'b'], [
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'b'}},
+    ], id='the same'),
+
+    pytest.param(['uid1', 'uid2'], ['a', 'b'], [
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
+    ], id='distinct'),
+
+    pytest.param(['uid1', 'uid2', 'uid1', 'uid2', 'uid1', 'uid3'], ['a', 'b', 'c', 'd', 'e', 'f'], [
+        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
+        {'type': 'ADDED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'c'}},
+        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'd'}},
+        {'type': 'DELETED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'e'}},
+        {'type': 'DELETED', 'object': {'metadata': {'uid': 'uid3'}, 'spec': 'f'}},
+    ], id='mixed'),
+
+])
+@pytest.mark.usefixtures('watcher_limited')
+async def test_watchevent_none_batching(settings, resource, processor, timer,
+                                        stream, events, uids, vals, event_loop):
+    """ Verify that all stream events are handled if batching is disabled. """
+
+    # Override the default timeouts to make the tests faster.
+    settings.batching.idle_timeout = 100  # should not be involved, fail if it is
+    settings.batching.exit_timeout = 100  # should exit instantly, fail if it didn't
+    settings.batching.batch_window = None  # disable batching entirely
+
+    # Inject the events of unique objects - to produce few streams/workers.
+    stream.feed(events)
+    stream.close()
+
+    # Run the watcher (near-instantly and test-blocking).
+    with timer:
+        await watcher(
+            namespace=None,
+            resource=resource,
+            settings=settings,
+            processor=processor,
+        )
+
+    # Was the processor called exactly once for each stream event?
+    assert processor.await_count == len(events)
+
+    expected_uid_val_pairs = set(zip(uids, vals))
+    actual_uid_val_pairs = set((
+        kwargs['raw_event']['object']['metadata']['uid'],
+        kwargs['raw_event']['object']['spec'])
+        for args, kwargs in processor.call_args_list)
+    assert actual_uid_val_pairs == expected_uid_val_pairs
+
 @pytest.mark.parametrize('unique, events', [
 
     pytest.param(1, [
