Skip to content

Commit 6f86b73

Browse files
authored
backpressure fix during recovery Issue #74 (#75)
1 parent 0a4d059 commit 6f86b73

File tree

3 files changed

+5
-1
lines changed

3 files changed

+5
-1
lines changed

faust/tables/recovery.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ async def _restart_recovery(self) -> None:
530530

531531
def _set_recovery_started(self) -> None:
532532
self.in_recovery = True
533+
self.app.in_recovery = True
533534
self._recovery_ended = None
534535
self._recovery_started_at = monotonic()
535536
self._active_events_received_at.clear()
@@ -539,6 +540,7 @@ def _set_recovery_started(self) -> None:
539540

540541
def _set_recovery_ended(self) -> None:
541542
self.in_recovery = False
543+
self.app.in_recovery = False
542544
self._recovery_ended_at = monotonic()
543545
self._active_events_received_at.clear()
544546
self._standby_events_received_at.clear()

faust/transport/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,7 @@ def _set_active_tps(self, tps: Set[TP]) -> Set[TP]:
519519

520520
def on_buffer_full(self, tp: TP) -> None:
521521
# do not remove the partition when in recovery
522-
if not self.app.rebalancing:
522+
if not self.app.in_recovery:
523523
active_partitions = self._get_active_partitions()
524524
active_partitions.discard(tp)
525525
self._buffered_partitions.add(tp)

faust/types/app.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,8 @@ class AppT(ServiceT):
179179
#: Set to true if the worker is currently rebalancing.
180180
rebalancing: bool = False
181181
rebalancing_count: int = 0
182+
#: Set to true when the worker is in recovery
183+
in_recovery: bool = False
182184

183185
#: Set to true if the assignment is empty
184186
# This flag is set by App._on_partitions_assigned

0 commit comments

Comments
 (0)