feat: batch window suppression #829

Open · wants to merge 1 commit into base: main

20 changes: 16 additions & 4 deletions kopf/_cogs/configs/configuration.py
@@ -204,10 +204,22 @@ class BatchingSettings:
How soon an idle worker is exited and garbage-collected if no events arrive.
"""

batch_window: float = 0.1
"""
How fast/slow does a worker deplete the queue when an event is received.
All events arriving within this window will be ignored except the last one.
batch_window: Optional[float] = None
"""
A lossy but, for some use-cases, acceptable optimization to take some load off event processing.

Defines the debouncing window within which a worker depletes the incoming event queue:
all events arriving within this window are ignored except the last one. This can have
consequences depending on the type and use-case of the operator implemented with Kopf.
Example: given an event sequence ``CREATED > MODIFIED > DELETED`` for resource A, if the
whole sequence arrives in the incoming event queue within ``batch_window``, any handlers
for the ``CREATED`` or ``MODIFIED`` events of resource A are never called. Instead, the
first handler called for A is the ``DELETED`` handler, if one exists. Depending on what
your operator does, this might be problematic.

If your use-case allows debouncing of event sequences per resource, set this to a value ``> 0``.
A notable side effect is an implicit delay for all handlers, since events are dispatched
to handlers only after this interval has elapsed.
"""

exit_timeout: float = 2.0
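A minimal usage sketch of the proposed option, assuming Kopf's usual startup hook for tuning operator settings (the handler name ``configure`` is arbitrary and not part of this diff):

```python
import kopf

@kopf.on.startup()
def configure(settings: kopf.OperatorSettings, **_):
    # Debounce bursts: within each 0.1-second window, only the last event per object survives.
    settings.batching.batch_window = 0.1
    # Or disable debouncing (this PR's new behaviour) so that every event reaches the processor:
    # settings.batching.batch_window = None
```
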
21 changes: 11 additions & 10 deletions kopf/_core/reactor/queueing.py
@@ -304,16 +304,17 @@ async def worker(
else:
continue
else:
try:
while True:
prev_event = raw_event
next_event = await asyncio.wait_for(
backlog.get(),
timeout=settings.batching.batch_window)
shouldstop = shouldstop or isinstance(next_event, EOS)
raw_event = prev_event if isinstance(next_event, EOS) else next_event
except asyncio.TimeoutError:
pass
if settings.batching.batch_window is not None:
try:
while True:
prev_event = raw_event
next_event = await asyncio.wait_for(
backlog.get(),
timeout=settings.batching.batch_window)
shouldstop = shouldstop or isinstance(next_event, EOS)
raw_event = prev_event if isinstance(next_event, EOS) else next_event
except asyncio.TimeoutError:
pass

# Exit gracefully and immediately on the end-of-stream marker sent by the watcher.
if isinstance(raw_event, EOS):
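A self-contained sketch of the debounce-or-pass-through behaviour above; this is not kopf's actual worker (EOS handling, per-object keys, and depletion marks are left out), and ``drain_latest`` / ``backlog`` are illustrative names:

```python
import asyncio
from typing import Optional

async def drain_latest(backlog: asyncio.Queue, first_event: str,
                       batch_window: Optional[float]) -> str:
    # With a window: keep swallowing newer events until the queue stays quiet
    # for `batch_window` seconds, and return only the last event seen.
    # With None: skip debouncing and return the first event untouched.
    event = first_event
    if batch_window is not None:
        try:
            while True:
                event = await asyncio.wait_for(backlog.get(), timeout=batch_window)
        except asyncio.TimeoutError:
            pass
    return event

async def main() -> None:
    backlog: asyncio.Queue = asyncio.Queue()
    for item in ('MODIFIED a', 'MODIFIED b', 'DELETED'):
        backlog.put_nowait(item)
    print(await drain_latest(backlog, 'CREATED', batch_window=0.1))   # -> DELETED
    print(await drain_latest(backlog, 'CREATED', batch_window=None))  # -> CREATED

asyncio.run(main())
```

With a window set, only the last event of a burst is returned; with ``None``, the incoming event is passed through untouched, which is what the new test below asserts at the processor level.
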
55 changes: 55 additions & 0 deletions tests/reactor/test_queueing.py
@@ -164,6 +164,61 @@ async def test_watchevent_batching(settings, resource, processor, timer,
assert actual_uid_val_pairs == expected_uid_val_pairs


@pytest.mark.parametrize('uids, vals, events', [

pytest.param(['uid1', 'uid1'], ['a', 'b'], [
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'b'}},
], id='the same'),

pytest.param(['uid1', 'uid2'], ['a', 'b'], [
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
], id='distinct'),

pytest.param(['uid1', 'uid2', 'uid1', 'uid2', 'uid1', 'uid3'], ['a', 'b', 'c', 'd', 'e', 'f'], [
{'type': 'ADDED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'a'}},
{'type': 'ADDED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'b'}},
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'c'}},
{'type': 'MODIFIED', 'object': {'metadata': {'uid': 'uid2'}, 'spec': 'd'}},
{'type': 'DELETED', 'object': {'metadata': {'uid': 'uid1'}, 'spec': 'e'}},
{'type': 'DELETED', 'object': {'metadata': {'uid': 'uid3'}, 'spec': 'f'}},
], id='mixed'),

])
@pytest.mark.usefixtures('watcher_limited')
async def test_watchevent_none_batching(settings, resource, processor, timer,
stream, events, uids, vals, event_loop):
""" Verify that all stream events are handled if batching is disabled. """

# Override the default timeouts to make the tests faster.
settings.batching.idle_timeout = 100 # should not be involved, fail if it is
settings.batching.exit_timeout = 100 # should exit instantly, fail if it didn't
settings.batching.batch_window = None # disable batching entirely

# Inject the events of unique objects - to produce few streams/workers.
stream.feed(events)
stream.close()

# Run the watcher (near-instantly and test-blocking).
with timer:
await watcher(
namespace=None,
resource=resource,
settings=settings,
processor=processor,
)

# Was the processor called exactly once for each stream event?
assert processor.await_count == len(events)

expected_uid_val_pairs = set(zip(uids, vals))
actual_uid_val_pairs = set((
kwargs['raw_event']['object']['metadata']['uid'],
kwargs['raw_event']['object']['spec'])
for args, kwargs in processor.call_args_list)
assert actual_uid_val_pairs == expected_uid_val_pairs

@pytest.mark.parametrize('unique, events', [

pytest.param(1, [