Skip to content

Commit 994128b

Browse files
committed
Recover from error raised by the offset committer callback
[ML-10538](https://iguazio.atlassian.net/browse/ML-10538)
1 parent 1e7d099 commit 994128b

File tree

2 files changed

+123
-9
lines changed

2 files changed

+123
-9
lines changed

storey/sources.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,9 @@ async def _run_loop(self):
289289
or num_offsets_not_committed > 1
290290
and time.monotonic() >= last_commit_time + self._max_time_before_commit
291291
):
292-
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
292+
num_offsets_not_committed = await _commit_handled_events(
293+
self._outstanding_offsets, committer, self.logger
294+
)
293295
events_handled_since_commit = 0
294296
last_commit_time = time.monotonic()
295297
# Due to the last event not being garbage collected, we tolerate a single unhandled event
@@ -300,7 +302,9 @@ async def _run_loop(self):
300302
break
301303
except queue.Empty:
302304
pass
303-
num_offsets_not_committed = await _commit_handled_events(self._outstanding_offsets, committer)
305+
num_offsets_not_committed = await _commit_handled_events(
306+
self._outstanding_offsets, committer, self.logger
307+
)
304308
events_handled_since_commit = 0
305309
last_commit_time = time.monotonic()
306310
if event is None:
@@ -318,7 +322,7 @@ async def _run_loop(self):
318322
if event is _termination_obj:
319323
# We can commit all at this point because termination of
320324
# all downstream steps completed successfully.
321-
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
325+
await _commit_handled_events(self._outstanding_offsets, committer, self.logger, commit_all=True)
322326
self._termination_future.set_result(termination_result)
323327
except BaseException as ex:
324328
if self.logger:
@@ -510,7 +514,7 @@ async def await_termination(self):
510514
return await self._loop_task
511515

512516

513-
async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, commit_all=False):
517+
async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committer, logger, commit_all=False):
514518
num_offsets_not_handled = 0
515519
if not commit_all:
516520
gc.collect()
@@ -530,7 +534,12 @@ async def _commit_handled_events(outstanding_offsets_by_qualified_shard, committ
530534
num_to_clear += 1
531535
if last_handled_offset is not None:
532536
path, shard_id = qualified_shard
533-
await committer(QualifiedOffset(path, shard_id, last_handled_offset))
537+
try:
538+
await committer(QualifiedOffset(path, shard_id, last_handled_offset))
539+
except BaseException:
540+
if logger:
541+
logger.error(f"Failed to commit offsets due to error: {traceback.format_exc()}")
542+
return num_offsets_not_handled + num_to_clear
534543
outstanding_offsets_by_qualified_shard[qualified_shard] = offsets[num_to_clear:]
535544
return num_offsets_not_handled
536545

@@ -601,7 +610,9 @@ async def _run_loop(self):
601610
or num_offsets_not_handled > 0
602611
and time.monotonic() >= last_commit_time + self._max_time_before_commit
603612
):
604-
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
613+
num_offsets_not_handled = await _commit_handled_events(
614+
self._outstanding_offsets, committer, self.logger
615+
)
605616
events_handled_since_commit = 0
606617
last_commit_time = time.monotonic()
607618
# In case we can't block because there are outstanding events
@@ -611,7 +622,9 @@ async def _run_loop(self):
611622
break
612623
except TimeoutError:
613624
pass
614-
num_offsets_not_handled = await _commit_handled_events(self._outstanding_offsets, committer)
625+
num_offsets_not_handled = await _commit_handled_events(
626+
self._outstanding_offsets, committer, self.logger
627+
)
615628
events_handled_since_commit = 0
616629
last_commit_time = time.monotonic()
617630
if not event:
@@ -629,7 +642,7 @@ async def _run_loop(self):
629642
if event is _termination_obj:
630643
# We can commit all at this point because termination of
631644
# all downstream steps completed successfully.
632-
await _commit_handled_events(self._outstanding_offsets, committer, commit_all=True)
645+
await _commit_handled_events(self._outstanding_offsets, committer, self.logger, commit_all=True)
633646
return termination_result
634647
except BaseException as ex:
635648
if self.logger:

tests/test_flow.py

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,19 @@ async def explicit_ack(self, qualified_offset):
146146
self.offsets[qualified_shard] = offset
147147

148148

149+
class BadCommitter:
150+
def __init__(self):
151+
self.offsets = {}
152+
153+
async def explicit_ack(self, qualified_offset):
154+
raise RuntimeError("Something went wrong")
155+
156+
149157
class CommitterContext:
150-
def __init__(self, platform):
158+
def __init__(self, platform, logger=None, verbose=None):
151159
self.platform = platform
160+
self.logger = logger
161+
self.verbose = verbose
152162

153163

154164
class EventHoarder(storey.Flow):
@@ -441,6 +451,97 @@ def test_offset_not_committed_prematurely():
441451
assert offsets == {("/", i): num_records_per_shard for i in range(num_shards)}
442452

443453

454+
async def async_offset_commit_error():
455+
platform = BadCommitter()
456+
logger = MockLogger()
457+
context = CommitterContext(platform, logger=logger)
458+
459+
controller = build_flow(
460+
[
461+
AsyncEmitSource(context=context, explicit_ack=True, max_wait_before_commit=1),
462+
Map(lambda x: x + 1),
463+
Filter(lambda x: x < 3),
464+
FlatMap(lambda x: [x, x * 10]),
465+
Reduce(0, lambda acc, x: acc + x),
466+
]
467+
).run()
468+
469+
num_shards = 10
470+
num_records_per_shard = 10
471+
472+
for offset in range(1, num_records_per_shard + 1):
473+
for shard in range(num_shards):
474+
event = Event(shard)
475+
event.shard_id = shard
476+
event.offset = offset
477+
await controller.emit(event)
478+
del event
479+
480+
# Wait longer than max_wait_before_commit so a commit attempt should occur (and fail) before termination; no offsets are expected
481+
await asyncio.sleep(2)
482+
offsets = copy.copy(platform.offsets)
483+
484+
try:
485+
assert offsets == {}
486+
finally:
487+
termination_result = await controller.terminate(wait=True)
488+
489+
assert termination_result == 330
490+
491+
log_level, (log_message,), _ = logger.logs[0]
492+
assert log_level == "error"
493+
assert "Failed to commit offsets due to error" in log_message
494+
assert "RuntimeError: Something went wrong" in log_message
495+
496+
497+
# ML-10538
498+
def test_async_offset_commit_error():
499+
asyncio.run(async_offset_commit_error())
500+
501+
502+
def test_offset_commit_error():
503+
platform = BadCommitter()
504+
logger = MockLogger()
505+
context = CommitterContext(platform, logger=logger)
506+
507+
controller = build_flow(
508+
[
509+
SyncEmitSource(context=context, explicit_ack=True, max_wait_before_commit=1),
510+
Map(lambda x: x + 1),
511+
Filter(lambda x: x < 3),
512+
FlatMap(lambda x: [x, x * 10]),
513+
Reduce(0, lambda acc, x: acc + x),
514+
]
515+
).run()
516+
517+
num_shards = 10
518+
num_records_per_shard = 10
519+
520+
for offset in range(1, num_records_per_shard + 1):
521+
for shard in range(num_shards):
522+
event = Event(shard)
523+
event.shard_id = shard
524+
event.offset = offset
525+
controller.emit(event)
526+
del event
527+
528+
# Wait longer than max_wait_before_commit so a commit attempt should occur (and fail) before termination; no offsets are expected
529+
time.sleep(2)
530+
offsets = copy.copy(platform.offsets)
531+
532+
try:
533+
assert offsets == {}
534+
finally:
535+
termination_result = controller.terminate(wait=True)
536+
537+
assert termination_result == 330
538+
539+
log_level, (log_message,), _ = logger.logs[0]
540+
assert log_level == "error"
541+
assert "Failed to commit offsets due to error" in log_message
542+
assert "RuntimeError: Something went wrong" in log_message
543+
544+
444545
def test_multiple_upstreams():
445546
source = SyncEmitSource()
446547
map1 = Map(lambda x: x + 1)

0 commit comments

Comments
 (0)