Skip to content

Commit cefd21c

Browse files
committed
Imprve replay logs
1 parent 6dcc96f commit cefd21c

2 files changed

Lines changed: 34 additions & 13 deletions

File tree

src/karapace/core/schema_reader.py

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ def __init__(
191191
self.processed_deprecated_karapace_keys_total = 0
192192
self.last_check = time.monotonic()
193193
self.start_time = time.monotonic()
194-
self.startup_previous_processed_offset = 0
194+
self._replay_start_logged = False
195195

196196
self.consecutive_unexpected_errors: int = 0
197197
self.consecutive_unexpected_errors_start: float = 0
@@ -380,34 +380,27 @@ def _is_ready(self) -> bool:
380380
# Reduce by one for actual highest offset.
381381
self._highest_offset = end_offset - 1
382382

383-
# Log when replay starts (first time we know how many messages to process)
384-
if self.startup_previous_processed_offset == 0 and self._highest_offset > 0:
385-
LOG.info(
386-
"Starting schema replay: %s messages to process (offset 0 to %s)",
387-
self._highest_offset + 1,
388-
self._highest_offset,
389-
)
383+
if not self._replay_start_logged and self._highest_offset > 0:
384+
LOG.info("Starting schema replay: reading up to offset %s", self._highest_offset)
385+
self._replay_start_logged = True
390386

391387
cur_time = time.monotonic()
392388
time_from_last_check = cur_time - self.last_check
393389
progress_pct = 0 if not self._highest_offset else round((self.offset / self._highest_offset) * 100, 2)
394-
startup_processed_message_per_second = (self.offset - self.startup_previous_processed_offset) / time_from_last_check
395390
LOG.debug(
396-
"Replay progress (%s): %s/%s (%s %%) (recs/s %s)",
391+
"Replay progress (%s): %s/%s (%s %%)",
397392
round(time_from_last_check, 2),
398393
self.offset,
399394
self._highest_offset,
400395
progress_pct,
401-
startup_processed_message_per_second,
402396
)
403397
self.last_check = cur_time
404-
self.startup_previous_processed_offset = self.offset
405398
ready = self.offset >= self._highest_offset
406399
if ready:
407400
self.max_messages_to_process = MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
408401
# Always log when replay completes - this is important for operators
409402
LOG.info(
410-
"Schema replay completed in %.2f seconds (processed %s messages)",
403+
"Schema replay completed in %.2f seconds (read up to offset %s)",
411404
time.monotonic() - self.start_time,
412405
self.offset,
413406
)
@@ -427,8 +420,12 @@ def ready(self) -> bool:
427420
return self._ready
428421

429422
def set_not_ready(self) -> None:
423+
now = time.monotonic()
430424
with self._ready_lock:
431425
self._ready = False
426+
self.start_time = now
427+
self.last_check = now
428+
self._replay_start_logged = False
432429

433430
@staticmethod
434431
def _parse_message_value(raw_value: str | bytes) -> JsonObject | None:

tests/unit/test_schema_reader.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,30 @@ def test_num_max_messages_to_consume_moved_to_one_after_ready(karapace_container
208208
assert schema_reader.max_messages_to_process == MAX_MESSAGES_TO_CONSUME_AFTER_STARTUP
209209

210210

211+
def test_set_not_ready_resets_replay_timing_state(karapace_container: KarapaceContainer) -> None:
212+
"""A master rebalance triggers set_not_ready(); subsequent replay logs must report
213+
durations and state relative to the new replay, not the original process start."""
214+
schema_reader = KafkaSchemaReader(
215+
config=karapace_container.config(),
216+
offset_watcher=OffsetWatcher(),
217+
key_formatter=Mock(spec=KeyFormatter),
218+
master_coordinator=None,
219+
database=InMemoryDatabase(),
220+
stats=Mock(spec=StatsClient),
221+
)
222+
223+
schema_reader.start_time = 0.0
224+
schema_reader.last_check = 0.0
225+
schema_reader._replay_start_logged = True
226+
227+
schema_reader.set_not_ready()
228+
229+
assert schema_reader.ready() is False
230+
assert schema_reader.start_time > 0.0
231+
assert schema_reader.last_check > 0.0
232+
assert schema_reader._replay_start_logged is False
233+
234+
211235
def test_schema_reader_skips_empty_message_and_advances_offset(karapace_container: KarapaceContainer) -> None:
212236
key_formatter_mock = Mock(spec=KeyFormatter)
213237
stats_mock = Mock(spec=StatsClient)

0 commit comments

Comments
 (0)