# ``result_type`` value that marks a synthesized deep-research report entry
# (the only source kind handled here that has no URL to dedupe on).
_REPORT_RESULT_TYPE = 5


def _report_identity(title: object, report_markdown: object) -> tuple[str, str] | None:
    """Return the dedupe key for a deep-research report, or ``None``.

    This is the single definition of the report key format, so the key built
    for a pending source and the key built for an existing notebook source
    cannot drift apart.

    Args:
        title: Report title; any falsy value is treated as an empty title.
        report_markdown: Report body; must be a non-empty string to yield a key.

    Returns:
        ``("report", "<title>\\n<body>")`` or ``None`` when there is no body.
    """
    if not (isinstance(report_markdown, str) and report_markdown):
        return None
    return ("report", f"{title or ''}\n{report_markdown}")


def _source_identity(source: dict) -> tuple[str, str] | None:
    """Return a stable dedupe key for research import sources.

    URL-backed sources dedupe by URL. Deep-research report entries do not have a
    URL, so we fall back to the report body and title to avoid resubmitting the
    same synthesized report after a timeout.

    Args:
        source: One research source entry (a mapping with optional ``url``,
            ``result_type``, ``report_markdown`` and ``title`` keys).

    Returns:
        ``("url", url)``, ``("report", key)``, or ``None`` when the entry has
        no usable identity (such entries can never be matched as duplicates).
    """
    # A malformed (non-mapping) entry must not raise: one bad item would
    # otherwise abort identity matching for every other source in the batch.
    lookup = getattr(source, "get", None)
    if not callable(lookup):
        return None

    url = lookup("url")
    if isinstance(url, str) and url:
        return ("url", url)

    if lookup("result_type") == _REPORT_RESULT_TYPE:
        return _report_identity(lookup("title"), lookup("report_markdown"))

    return None


def _existing_source_identities(existing_sources: Iterable[object]) -> set[tuple[str, str]]:
    """Build dedupe keys from notebook sources returned by the API.

    Args:
        existing_sources: Source objects exposing optional ``url``,
            ``result_type``, ``report_markdown`` and ``title`` attributes.
            ``None`` is treated as "no sources".

    Returns:
        The set of ``("url", ...)`` and ``("report", ...)`` keys found. A
        single source may contribute both kinds of key.
    """
    identities: set[tuple[str, str]] = set()
    if existing_sources is None:
        return identities

    for existing in existing_sources:
        url = getattr(existing, "url", None)
        if isinstance(url, str) and url:
            identities.add(("url", url))

        # Unlike the pending-source side, the report key is added in addition
        # to (not instead of) the URL key.
        if getattr(existing, "result_type", None) == _REPORT_RESULT_TYPE:
            report_key = _report_identity(
                getattr(existing, "title", None),
                getattr(existing, "report_markdown", None),
            )
            if report_key is not None:
                identities.add(report_key)

    return identities
client.research.import_sources(notebook_id, task_id, sources) + return await client.research.import_sources(notebook_id, task_id, pending_sources) except RPCTimeoutError: elapsed = time.monotonic() - started_at remaining = max_elapsed - elapsed if remaining <= 0: raise + try: + existing_sources = await client.sources.list(notebook_id) + existing_identities = _existing_source_identities(existing_sources) + pending_sources = [ + source + for source in pending_sources + if (identity := _source_identity(source)) is None + or identity not in existing_identities + ] + if not pending_sources: + logger.info( + "IMPORT_RESEARCH timeout for notebook %s but all sources are already present; stopping retries", + notebook_id, + ) + # Preserve the existing CLI contract: if every source already + # landed during a timed-out attempt, return an empty list + # rather than fabricating imported records we do not have. + return [] + except Exception as e: # pragma: no cover - defensive: retry original pending batch + logger.debug( + "Failed to list existing sources before retrying research import for %s: %s", + notebook_id, + e, + ) + sleep_for = min(delay, max_delay, remaining) logger.warning( "IMPORT_RESEARCH timed out for notebook %s; retrying in %.1fs (attempt %d, %.1fs elapsed)", diff --git a/tests/unit/cli/test_helpers.py b/tests/unit/cli/test_helpers.py index 73c5c0b2..02aca42e 100644 --- a/tests/unit/cli/test_helpers.py +++ b/tests/unit/cli/test_helpers.py @@ -651,6 +651,7 @@ async def test_retries_rpc_timeout_then_succeeds(self): [{"id": "src_1", "title": "Source 1"}], ] ) + client.sources.list = AsyncMock(return_value=[]) with ( patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep, @@ -667,6 +668,7 @@ async def test_retries_rpc_timeout_then_succeeds(self): assert imported == [{"id": "src_1", "title": "Source 1"}] assert client.research.import_sources.await_count == 2 + client.sources.list.assert_awaited_once_with("nb_123") 
@pytest.mark.asyncio
async def test_retries_only_pending_sources_after_timeout(self):
    """After a timeout, only sources missing from the notebook are resubmitted."""
    landed = {"url": "https://example.com/already-imported", "title": "Source 1"}
    pending = {"url": "https://example.com/still-pending", "title": "Source 2"}
    sources = [landed, pending]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            [{"id": "src_2", "title": "Source 2"}],
        ]
    )
    client.sources.list = AsyncMock(
        return_value=[MagicMock(url="https://example.com/already-imported")]
    )

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        imported = await import_with_retry(client, "nb_123", "task_123", sources)

    assert imported == [{"id": "src_2", "title": "Source 2"}]

    attempts = client.research.import_sources.await_args_list
    # Exactly one retry: the full batch first, then only the missing source.
    assert len(attempts) == 2
    assert attempts[0].args[2] == sources
    assert attempts[1].args[2] == [pending]
    # One reconciliation listing per timeout, against the same notebook.
    client.sources.list.assert_awaited_once_with("nb_123")
    # The caller's list must not be mutated by the filtering.
    assert sources == [landed, pending]


@pytest.mark.asyncio
async def test_stops_retrying_when_all_sources_already_imported(self):
    """If every source landed during the timed-out attempt, stop without sleeping."""
    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[RPCTimeoutError("Timed out", timeout_seconds=30.0)]
    )
    client.sources.list = AsyncMock(
        return_value=[MagicMock(url="https://example.com/already-imported")]
    )

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        imported = await import_with_retry(
            client,
            "nb_123",
            "task_123",
            [{"url": "https://example.com/already-imported", "title": "Source 1"}],
        )

    assert imported == []
    assert client.research.import_sources.await_count == 1
    client.sources.list.assert_awaited_once_with("nb_123")
    mock_sleep.assert_not_awaited()


@pytest.mark.asyncio
async def test_continues_retrying_when_sources_list_fails(self):
    """A failing reconciliation listing falls back to retrying the full batch."""
    sources = [{"url": "https://example.com", "title": "Source 1"}]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            [{"id": "src_1", "title": "Source 1"}],
        ]
    )
    client.sources.list = AsyncMock(side_effect=Exception("API error"))

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        imported = await import_with_retry(client, "nb_123", "task_123", sources)

    assert imported == [{"id": "src_1", "title": "Source 1"}]
    assert client.research.import_sources.await_count == 2
    assert client.research.import_sources.await_args_list[1].args[2] == sources
    client.sources.list.assert_awaited_once_with("nb_123")


@pytest.mark.asyncio
async def test_skips_report_sources_already_imported_after_timeout(self):
    """URL-less report entries are matched on title plus report body."""
    report_source = {
        "title": "Research report",
        "result_type": 5,
        "report_markdown": "# Deep report body",
    }

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[RPCTimeoutError("Timed out", timeout_seconds=30.0)]
    )
    client.sources.list = AsyncMock(
        return_value=[
            MagicMock(
                title="Research report",
                result_type=5,
                report_markdown="# Deep report body",
                url=None,
            )
        ]
    )

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        imported = await import_with_retry(client, "nb_123", "task_123", [report_source])

    assert imported == []
    assert client.research.import_sources.await_count == 1
    client.sources.list.assert_awaited_once_with("nb_123")
    mock_sleep.assert_not_awaited()


@pytest.mark.asyncio
async def test_timeout_reconciliation_only_returns_final_successful_batch(self):
    """Pin the return contract after reconciliation.

    The value returned is exactly what the final successful import call
    returned; records for sources that landed during the timed-out attempt
    are not fabricated and added to it.
    """
    sources = [
        {"url": "https://example.com/already-imported", "title": "Source 1"},
        {"url": "https://example.com/still-pending", "title": "Source 2"},
    ]
    final_batch = [{"id": "src_2", "title": "Source 2"}]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            final_batch,
        ]
    )
    client.sources.list = AsyncMock(
        return_value=[MagicMock(url="https://example.com/already-imported")]
    )

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        imported = await import_with_retry(client, "nb_123", "task_123", sources)

    assert imported == [{"id": "src_2", "title": "Source 2"}]
    assert len(imported) == 1
    assert client.research.import_sources.await_count == 2
    client.sources.list.assert_awaited_once_with("nb_123")