Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion plugin/plugins/study_companion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@ def __init__(self, ctx):
self._last_awareness_push_at = 0.0
self._awareness_idle_ticks = 0
self._review_due_task: asyncio.Task[None] | None = None
self._review_due_payload_future: asyncio.Future[dict[str, Any]] | None = None
self._command_queue: asyncio.Queue[tuple[str, dict[str, Any]]] = asyncio.Queue()
self._command_worker_task: asyncio.Task[None] | None = None
self._interruptible_task: asyncio.Task[None] | None = None
Expand Down Expand Up @@ -332,7 +333,9 @@ async def _cleanup_after_failed_startup(self) -> None:
await self._unsubscribe_neko_commands()
await self._cancel_command_worker()
await self._cancel_review_due_task()
event_bus = self._event_bus
agent = self._agent
ocr_pipeline = self._ocr_pipeline
self._agent = None
self._ocr_pipeline = None
self._knowledge_tracker = None
Expand All @@ -343,6 +346,11 @@ async def _cleanup_after_failed_startup(self) -> None:
self._supervision = None
self._memory_habit_bridge = None
self._event_bus = None
if event_bus is not None:
try:
await event_bus.stop_worker()
except Exception as exc:
self.logger.warning("study startup cleanup event bus failed: {}", exc)
try:
self.clear_list_actions()
except Exception as exc:
Expand All @@ -362,6 +370,13 @@ async def _cleanup_after_failed_startup(self) -> None:
self.logger.warning(
"study startup cleanup agent shutdown failed: {}", exc
)
if ocr_pipeline is not None:
try:
close_ocr = getattr(ocr_pipeline, "close", None)
if callable(close_ocr):
close_ocr()
except Exception as exc:
self.logger.warning("study startup cleanup OCR pipeline failed: {}", exc)
try:
await asyncio.to_thread(self._store.close)
except Exception as exc:
Expand All @@ -374,12 +389,30 @@ async def shutdown(self, **_):
await self._unsubscribe_neko_commands()
await self._cancel_command_worker()
await self._cancel_review_due_task()
event_bus = self._event_bus
self._event_bus = None
if event_bus is not None:
try:
await event_bus.stop_worker()
except Exception as exc:
self.logger.warning("study shutdown event bus cleanup failed: {}", exc)
try:
self.unregister_dynamic_entry("study_export_notes")
except Exception as exc:
self.logger.warning("study shutdown dynamic entry cleanup failed: {}", exc)
if self._agent is not None:
await self._agent.shutdown()
ocr_pipeline = self._ocr_pipeline
self._ocr_pipeline = None
if ocr_pipeline is not None:
try:
close_ocr = getattr(ocr_pipeline, "close", None)
if callable(close_ocr):
close_ocr()
except Exception as exc:
self.logger.warning(
"study shutdown OCR pipeline cleanup failed: {}", exc
)
async with self._lock:
self._state.status = STATUS_STOPPED
await asyncio.to_thread(self._store.save_state, self._state)
Expand Down Expand Up @@ -591,9 +624,24 @@ async def _cancel_review_due_task(self) -> None:
try:
await task
except asyncio.CancelledError:
return
pass
except Exception as exc:
self.logger.warning("study review due task cleanup failed: {}", exc)
await self._await_review_due_payload_future()

async def _await_review_due_payload_future(self) -> None:
future = self._review_due_payload_future
if future is None:
return
try:
await asyncio.shield(future)
except asyncio.CancelledError:
raise
except Exception as exc:
self.logger.warning("study review due payload cleanup failed: {}", exc)
finally:
if self._review_due_payload_future is future:
self._review_due_payload_future = None

def _on_review_due_task_done(self, task: asyncio.Task[None]) -> None:
if self._review_due_task is task:
Expand Down
126 changes: 105 additions & 21 deletions plugin/plugins/study_companion/_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ class StudyEventBus:
_THROTTLE_TTL = 3600.0
_RESPOND_COOLDOWN = 30.0
_MAX_IN_FLIGHT_EMITS = 8
_MAX_SCHEDULED_EMITS = 64
_MAX_QUEUE_SIZE = 64
_MAX_WORKER_FAILURES = 3
_WORKER_FAILURE_BACKOFF_BASE_SECONDS = 0.05
_WORKER_FAILURE_BACKOFF_MAX_SECONDS = 1.0

def __init__(
self,
Expand All @@ -85,6 +88,11 @@ def __init__(
self._scheduled_emit_count = 0
self._dropped_emit_count = 0
self._emit_semaphore = asyncio.Semaphore(self._MAX_IN_FLIGHT_EMITS)
self._queue: asyncio.Queue[StudyEvent] = asyncio.Queue(
maxsize=self._MAX_QUEUE_SIZE
)
self._worker_task: asyncio.Task[None] | None = None
self._worker_failure_count = 0
self._last_respond_at = -self._RESPOND_COOLDOWN
self._last_screen_context_type = ""
self._screen_buf: list[tuple[str, float]] = []
Expand Down Expand Up @@ -113,25 +121,110 @@ def schedule_emit(self, event: StudyEvent) -> asyncio.Task[None] | None:
except RuntimeError:
_logger.warning("StudyEventBus.schedule_emit() called outside event loop")
return None
if self._scheduled_emit_count >= self._MAX_SCHEDULED_EMITS:
if self._worker_task is None or self._worker_task.done():
self._worker_failure_count = 0
self._worker_task = loop.create_task(self._consume_queue())
if self._queue.full():
try:
dropped = self._queue.get_nowait()
self._safe_task_done()
self._scheduled_emit_count = max(0, self._scheduled_emit_count - 1)
self._dropped_emit_count += 1
_logger.warning(
"StudyEventBus.schedule_emit() dropped oldest event due to backlog: %s",
dropped.name,
)
except asyncio.QueueEmpty:
pass
try:
self._queue.put_nowait(event)
self._scheduled_emit_count += 1
except asyncio.QueueFull:
self._dropped_emit_count += 1
_logger.warning(
"StudyEventBus.schedule_emit() dropped event due to backlog: %s",
event.name,
)
return None
self._scheduled_emit_count += 1
task = loop.create_task(self._emit_with_backpressure(event))
task.add_done_callback(self._on_scheduled_emit_done)
return task
return self._worker_task

async def _emit_with_backpressure(self, event: StudyEvent) -> None:
async with self._emit_semaphore:
await self.emit(event)
async def stop_worker(self) -> None:
task = self._worker_task
if task is None:
return
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
finally:
if self._worker_task is task:
self._worker_task = None

def _on_scheduled_emit_done(self, task: asyncio.Task[None]) -> None:
self._scheduled_emit_count = max(0, self._scheduled_emit_count - 1)
_on_emit_done(task)
async def _consume_queue(self) -> None:
task = asyncio.current_task()
try:
while True:
event = await self._queue.get()
should_stop = False
backoff_seconds = 0.0
try:
async with self._emit_semaphore:
await self.emit(event)
self._worker_failure_count = 0
except asyncio.CancelledError:
raise
except Exception:
self._worker_failure_count += 1
_logger.exception("StudyEventBus worker emit failed")
if self._worker_failure_count >= self._MAX_WORKER_FAILURES:
_logger.error(
"StudyEventBus worker stopped after %s consecutive failures",
self._worker_failure_count,
)
should_stop = True
else:
backoff_seconds = min(
self._WORKER_FAILURE_BACKOFF_MAX_SECONDS,
self._WORKER_FAILURE_BACKOFF_BASE_SECONDS
* (2 ** (self._worker_failure_count - 1)),
)
finally:
self._scheduled_emit_count = max(
0, self._scheduled_emit_count - 1
)
self._safe_task_done()
if should_stop:
self._drop_queued_events_after_worker_stop()
return
if backoff_seconds > 0:
await asyncio.sleep(backoff_seconds)
finally:
if self._worker_task is task:
self._worker_task = None
Comment thread
coderabbitai[bot] marked this conversation as resolved.

def _drop_queued_events_after_worker_stop(self) -> None:
dropped = 0
while True:
try:
self._queue.get_nowait()
except asyncio.QueueEmpty:
break
dropped += 1
self._scheduled_emit_count = max(0, self._scheduled_emit_count - 1)
self._dropped_emit_count += 1
self._safe_task_done()
if dropped:
_logger.error(
"StudyEventBus worker dropped %s queued event(s) after stopping",
dropped,
)

def _safe_task_done(self) -> None:
try:
self._queue.task_done()
except ValueError:
_logger.exception("StudyEventBus queue task_done underflow")

def should_schedule_screen_context(
self, screen_type: str, previous_type: str = ""
Expand Down Expand Up @@ -333,15 +426,6 @@ def _format(self, event: StudyEvent) -> str:
return str(event.payload)


def _on_emit_done(task: asyncio.Task[None]) -> None:
try:
task.result()
except asyncio.CancelledError:
return
except Exception:
_logger.exception("StudyEventBus.schedule_emit() task failed")


def _safe_float(value: Any, default: float = 0.0) -> float:
try:
number = float(value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ async def _emit_review_due_if_needed(self) -> None:
if bus is None:
return
try:
payload = await asyncio.to_thread(self._build_review_due_payload)
loop = asyncio.get_running_loop()
payload_future = loop.run_in_executor(None, self._build_review_due_payload)
self._review_due_payload_future = payload_future
try:
payload = await asyncio.shield(payload_future)
finally:
if payload_future.done() and self._review_due_payload_future is payload_future:
self._review_due_payload_future = None
if not payload:
return
bus.schedule_emit(StudyEvent(name="review_due", payload=payload))
Expand Down
Loading
Loading