Skip to content

Commit adcbc81

Browse files
wbarnhadoncat99
andauthored
Pull in hopping window table fix (#412)
* robinhood issues 514 * add hopping example * fix: 'Nonetype' object is not iterable problem * lint * remove unused var * add mock ranges to table tests * Pull in changes by @thomas-chauvet * save the popped value as a backup for now * cleanup and add more tests * test for ranges when full and empty * add linting * remove MagicMock import --------- Co-authored-by: Don Wong <[email protected]>
1 parent ed85356 commit adcbc81

File tree

2 files changed

+187
-3
lines changed

2 files changed

+187
-3
lines changed

faust/tables/base.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,13 +382,40 @@ async def _del_old_keys(self) -> None:
382382
for partition, timestamps in self._partition_timestamps.items():
383383
while timestamps and window.stale(timestamps[0], time.time()):
384384
timestamp = heappop(timestamps)
385+
triggered_windows = [
386+
self._partition_timestamp_keys.get(
387+
(partition, window_range)
388+
) # noqa
389+
for window_range in self._window_ranges(timestamp)
390+
]
385391
keys_to_remove = self._partition_timestamp_keys.pop(
386392
(partition, timestamp), None
387393
)
394+
window_data = {}
388395
if keys_to_remove:
389-
for key in keys_to_remove:
390-
value = self.data.pop(key, None)
391-
await self.on_window_close(key, value)
396+
for windows in triggered_windows:
397+
if windows:
398+
for processed_window in windows:
399+
# we use set to avoid duplicate element in window's data
400+
# window[0] is the window's key
401+
# it is not related to window's timestamp
402+
# windows are in format:
403+
# (key, (window_start, window_end))
404+
window_data.setdefault(processed_window[0], []).extend(
405+
self.data.get(processed_window, [])
406+
)
407+
408+
for key_to_remove in keys_to_remove:
409+
value = self.data.pop(key_to_remove, None)
410+
if key_to_remove[1][0] > self.last_closed_window:
411+
await self.on_window_close(
412+
key_to_remove,
413+
(
414+
window_data[key_to_remove[0]]
415+
if key_to_remove[0] in window_data
416+
else value
417+
),
418+
)
392419
self.last_closed_window = max(
393420
self.last_closed_window,
394421
max(key[1][0] for key in keys_to_remove),

tests/unit/tables/test_base.py

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,45 @@ async def test_last_closed_window(self, *, table):
191191
assert table.last_closed_window == 0.0
192192

193193
table.window = Mock(name="window")
194+
self.mock_no_ranges(table)
195+
table._data = {
196+
("boo", (1.1, 1.4)): "BOO",
197+
("moo", (1.4, 1.6)): "MOO",
198+
("faa", (1.9, 2.0)): "FAA",
199+
("bar", (4.1, 4.2)): "BAR",
200+
}
201+
table._partition_timestamps = {
202+
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
203+
}
204+
table._partition_timestamp_keys = {
205+
(TP1, 2.0): [
206+
("boo", (1.1, 1.4)),
207+
("moo", (1.4, 1.6)),
208+
("faa", (1.9, 2.0)),
209+
],
210+
(TP1, 5.0): [
211+
("bar", (4.1, 4.2)),
212+
],
213+
}
214+
215+
def get_stale(limit):
216+
def is_stale(timestamp, latest_timestamp):
217+
return timestamp < limit
218+
219+
return is_stale
220+
221+
table.window.stale.side_effect = get_stale(4.0)
222+
223+
await table._del_old_keys()
224+
225+
assert table.last_closed_window == 1.9
226+
227+
@pytest.mark.asyncio
228+
async def test_last_closed_window__mock_ranges(self, *, table):
229+
assert table.last_closed_window == 0.0
230+
231+
table.window = Mock(name="window")
232+
self.mock_ranges(table)
194233
table._data = {
195234
("boo", (1.1, 1.4)): "BOO",
196235
("moo", (1.4, 1.6)): "MOO",
@@ -233,6 +272,64 @@ async def test_del_old_keys(self, *, table):
233272
on_window_close = table._on_window_close = AsyncMock(name="on_window_close")
234273

235274
table.window = Mock(name="window")
275+
self.mock_no_ranges(table)
276+
table._data = {
277+
("boo", (1.1, 1.4)): "BOO",
278+
("moo", (1.4, 1.6)): "MOO",
279+
("faa", (1.9, 2.0)): "FAA",
280+
("bar", (4.1, 4.2)): "BAR",
281+
}
282+
table._partition_timestamps = {
283+
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
284+
}
285+
table._partition_timestamp_keys = {
286+
(TP1, 2.0): [
287+
("boo", (1.1, 1.4)),
288+
("moo", (1.4, 1.6)),
289+
("faa", (1.9, 2.0)),
290+
],
291+
(TP1, 5.0): [
292+
("bar", (4.1, 4.2)),
293+
],
294+
}
295+
296+
def get_stale(limit):
297+
def is_stale(timestamp, latest_timestamp):
298+
return timestamp < limit
299+
300+
return is_stale
301+
302+
table.window.stale.side_effect = get_stale(4.0)
303+
304+
await table._del_old_keys()
305+
306+
assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0]
307+
assert table.data == {("bar", (4.1, 4.2)): "BAR"}
308+
309+
on_window_close.assert_has_calls(
310+
[
311+
call.__bool__(),
312+
call(("boo", (1.1, 1.4)), "BOO"),
313+
call.__bool__(),
314+
call(("moo", (1.4, 1.6)), "MOO"),
315+
call.__bool__(),
316+
call(("faa", (1.9, 2.0)), "FAA"),
317+
]
318+
)
319+
320+
table.last_closed_window = 8.0
321+
table.window.stale.side_effect = get_stale(6.0)
322+
323+
await table._del_old_keys()
324+
325+
assert not table.data
326+
327+
@pytest.mark.asyncio
328+
async def test_del_old_keys__mock_ranges(self, *, table):
329+
on_window_close = table._on_window_close = AsyncMock(name="on_window_close")
330+
331+
table.window = Mock(name="window")
332+
self.mock_ranges(table)
236333
table._data = {
237334
("boo", (1.1, 1.4)): "BOO",
238335
("moo", (1.4, 1.6)): "MOO",
@@ -289,6 +386,61 @@ async def test_del_old_keys_non_async_cb(self, *, table):
289386
on_window_close = table._on_window_close = Mock(name="on_window_close")
290387

291388
table.window = Mock(name="window")
389+
self.mock_no_ranges(table)
390+
table._data = {
391+
("boo", (1.1, 1.4)): "BOO",
392+
("moo", (1.4, 1.6)): "MOO",
393+
("faa", (1.9, 2.0)): "FAA",
394+
("bar", (4.1, 4.2)): "BAR",
395+
}
396+
table._partition_timestamps = {
397+
TP1: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0],
398+
}
399+
table._partition_timestamp_keys = {
400+
(TP1, 2.0): [
401+
("boo", (1.1, 1.4)),
402+
("moo", (1.4, 1.6)),
403+
("faa", (1.9, 2.0)),
404+
],
405+
(TP1, 5.0): [
406+
("bar", (4.1, 4.2)),
407+
],
408+
}
409+
410+
def get_stale(limit):
411+
def is_stale(timestamp, latest_timestamp):
412+
return timestamp < limit
413+
414+
return is_stale
415+
416+
table.window.stale.side_effect = get_stale(4.0)
417+
418+
await table._del_old_keys()
419+
420+
assert table._partition_timestamps[TP1] == [4.0, 5.0, 6.0, 7.0]
421+
assert table.data == {("bar", (4.1, 4.2)): "BAR"}
422+
423+
on_window_close.assert_has_calls(
424+
[
425+
call(("boo", (1.1, 1.4)), "BOO"),
426+
call(("moo", (1.4, 1.6)), "MOO"),
427+
call(("faa", (1.9, 2.0)), "FAA"),
428+
]
429+
)
430+
431+
table.last_closed_window = 8.0
432+
table.window.stale.side_effect = get_stale(6.0)
433+
434+
await table._del_old_keys()
435+
436+
assert not table.data
437+
438+
@pytest.mark.asyncio
439+
async def test_del_old_keys_non_async_cb__mock_ranges(self, *, table):
440+
on_window_close = table._on_window_close = Mock(name="on_window_close")
441+
442+
table.window = Mock(name="window")
443+
self.mock_ranges(table)
292444
table._data = {
293445
("boo", (1.1, 1.4)): "BOO",
294446
("moo", (1.4, 1.6)): "MOO",
@@ -527,6 +679,11 @@ def mock_ranges(self, table, ranges=[1.1, 1.2, 1.3]): # noqa
527679
table._window_ranges.return_value = ranges
528680
return ranges
529681

682+
def mock_no_ranges(self, table, ranges=[]): # noqa
683+
table._window_ranges = Mock(name="_window_ranges")
684+
table._window_ranges.return_value = ranges
685+
return ranges
686+
530687
def test_relative_now(self, *, table):
531688
event = Mock(name="event", autospec=Event)
532689
table._partition_latest_timestamp[event.message.partition] = 30.3

0 commit comments

Comments
 (0)