Skip to content

Commit 0a4d059

Browse files
Fix for outdated sensors after rebalancing (#72)
* Rename test classes to start with `Test` Previously these classes started with `test_`, such classes are not collected by pytest so these tests were not run. * Failing test for clearing sensors on rebalance * Clear sensors related to topics after rebalance * Refactor prometheus metrics * Failing test for prometheus metrics with topic labels after rebalance * Clear prometheus topic related metrics on rebalance * Failing test for IndexError in `on_stream_event_out` * Fix IndexError in `on_stream_event_out`
1 parent fcb6b18 commit 0a4d059

File tree

7 files changed

+774
-347
lines changed

7 files changed

+774
-347
lines changed

faust/sensors/monitor.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ def on_assignment_completed(
571571
def on_rebalance_start(self, app: AppT) -> Dict:
572572
"""Cluster rebalance in progress."""
573573
self.rebalances = app.rebalancing_count
574+
self._clear_topic_related_sensors()
574575
return {"time_start": self.time()}
575576

576577
def on_rebalance_return(self, app: AppT, state: Dict) -> None:
@@ -596,6 +597,7 @@ def on_rebalance_end(self, app: AppT, state: Dict) -> None:
596597
latency_end=latency_end,
597598
)
598599
deque_pushpopmax(self.rebalance_end_latency, latency_end, self.max_avg_history)
600+
self._clear_topic_related_sensors()
599601

600602
def on_web_request_start(
601603
self, app: AppT, request: web.Request, *, view: web.View = None
@@ -633,3 +635,10 @@ def _normalize(
633635
substitution: str = RE_NORMALIZE_SUBSTITUTION
634636
) -> str:
635637
return pattern.sub(substitution, name)
638+
639+
def _clear_topic_related_sensors(self) -> None:
640+
self.tp_committed_offsets.clear()
641+
self.tp_read_offsets.clear()
642+
self.tp_end_offsets.clear()
643+
self.topic_buffer_full.clear()
644+
self.stream_inbound_time.clear()

0 commit comments

Comments
 (0)