Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion src/notebooklm/cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging
import os
import time
from collections.abc import Iterable
from functools import wraps
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -76,6 +77,42 @@ def run_async(coro):
return asyncio.run(coro)


def _source_identity(source: dict) -> tuple[str, str] | None:
"""Return a stable dedupe key for research import sources.

URL-backed sources dedupe by URL. Deep-research report entries do not have a
URL, so we fall back to the report body and title to avoid resubmitting the
same synthesized report after a timeout.
"""
url = source.get("url")
if isinstance(url, str) and url:
return ("url", url)

if source.get("result_type") == 5:
report_markdown = source.get("report_markdown")
title = source.get("title")
if isinstance(report_markdown, str) and report_markdown:
return ("report", f"{title or ''}\n{report_markdown}")

return None


def _existing_source_identities(existing_sources: Iterable[object]) -> set[tuple[str, str]]:
"""Build dedupe keys from notebook sources returned by the API."""
identities: set[tuple[str, str]] = set()
for source in existing_sources:
url = getattr(source, "url", None)
if isinstance(url, str) and url:
identities.add(("url", url))

result_type = getattr(source, "result_type", None)
report_markdown = getattr(source, "report_markdown", None)
if result_type == 5 and isinstance(report_markdown, str) and report_markdown:
identities.add(("report", f"{getattr(source, 'title', '') or ''}\n{report_markdown}"))

return identities
Comment on lines +100 to +113
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check if Source class has result_type or report_markdown attributes

echo "=== Source class definition ==="
ast-grep --pattern $'class Source:
  $$$
'

echo ""
echo "=== Check for result_type in types.py ==="
rg -n "result_type" src/notebooklm/types.py || echo "Not found"

echo ""
echo "=== Check for report_markdown in types.py ==="
rg -n "report_markdown" src/notebooklm/types.py || echo "Not found"

Repository: teng-lin/notebooklm-py

Length of output: 9685


Report deduplication logic will never execute due to missing attributes.

The Source dataclass in src/notebooklm/types.py:484-507 defines attributes id, title, url, _type_code, created_at, and status, but not result_type or report_markdown.

Lines 108–111 attempt to access these non-existent attributes:

  • getattr(source, "result_type", None) always returns None
  • The condition result_type == 5 never evaluates to True
  • Report-based identities are never added to the deduplication set

URL-based deduplication (lines 104–106) works correctly, but report-type sources will not be deduplicated against existing sources, risking duplicates on retry.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/notebooklm/cli/helpers.py` around lines 100 - 113, The dedupe branch
never runs because it checks non-existent attributes `result_type` and
`report_markdown`; update the function `_existing_source_identities` to use the
actual Source fields: read the source type via `getattr(source, "_type_code",
None)` (replace `result_type`) and, if you need report content, use an existing
field such as `getattr(source, "title", "")` (or another real field from the
Source dataclass) instead of `report_markdown`; ensure the conditional uses
`_type_code == 5` and only builds the ("report", ...) identity when the chosen
real field is a non-empty string so report-based deduplication can actually run.



async def import_with_retry(
client,
notebook_id: str,
Expand All @@ -96,16 +133,42 @@ async def import_with_retry(
started_at = time.monotonic()
delay = initial_delay
attempt = 1
pending_sources = list(sources)

while True:
try:
return await client.research.import_sources(notebook_id, task_id, sources)
return await client.research.import_sources(notebook_id, task_id, pending_sources)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When a retry occurs with a subset of the original sources, the return value of import_with_retry will only contain the IDs and titles of the sources imported in that specific successful attempt. If some sources landed during a previous attempt that timed out, their information is lost. While reconciling the full list would be ideal, avoid making breaking API changes. Instead, document this existing behavior in tests and flag it for future consideration, as per repository rules.

References
  1. Avoid making breaking API changes. Instead, document the existing behavior in tests and flag it for future consideration.

except RPCTimeoutError:
elapsed = time.monotonic() - started_at
remaining = max_elapsed - elapsed
if remaining <= 0:
raise

try:
existing_sources = await client.sources.list(notebook_id)
existing_identities = _existing_source_identities(existing_sources)
pending_sources = [
source
for source in pending_sources
if (identity := _source_identity(source)) is None
or identity not in existing_identities
]
if not pending_sources:
logger.info(
"IMPORT_RESEARCH timeout for notebook %s but all sources are already present; stopping retries",
notebook_id,
)
# Preserve the existing CLI contract: if every source already
# landed during a timed-out attempt, return an empty list
# rather than fabricating imported records we do not have.
return []
except Exception as e: # pragma: no cover - defensive: retry original pending batch
logger.debug(
"Failed to list existing sources before retrying research import for %s: %s",
notebook_id,
e,
)

sleep_for = min(delay, max_delay, remaining)
logger.warning(
"IMPORT_RESEARCH timed out for notebook %s; retrying in %.1fs (attempt %d, %.1fs elapsed)",
Expand Down
119 changes: 119 additions & 0 deletions tests/unit/cli/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,6 +651,7 @@ async def test_retries_rpc_timeout_then_succeeds(self):
[{"id": "src_1", "title": "Source 1"}],
]
)
client.sources.list = AsyncMock(return_value=[])

with (
patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
Expand All @@ -667,6 +668,7 @@ async def test_retries_rpc_timeout_then_succeeds(self):

assert imported == [{"id": "src_1", "title": "Source 1"}]
assert client.research.import_sources.await_count == 2
client.sources.list.assert_awaited_once_with("nb_123")
mock_sleep.assert_awaited_once_with(5)
mock_console.print.assert_called_once()

Expand All @@ -679,6 +681,7 @@ async def test_retries_silently_for_json_output(self):
[],
]
)
client.sources.list = AsyncMock(return_value=[])

with (
patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock),
Expand All @@ -699,6 +702,7 @@ async def test_raises_after_elapsed_budget(self):
client = MagicMock()
error = RPCTimeoutError("Timed out", timeout_seconds=30.0)
client.research.import_sources = AsyncMock(side_effect=error)
client.sources.list = AsyncMock(return_value=[])

with (
patch("notebooklm.cli.helpers.time.monotonic", side_effect=[0.0, 1801.0]),
Expand All @@ -715,6 +719,121 @@ async def test_raises_after_elapsed_budget(self):

mock_sleep.assert_not_awaited()

@pytest.mark.asyncio
async def test_retries_only_pending_sources_after_timeout(self):
    """After a timeout, only sources absent from the notebook are resubmitted."""
    imported_url = "https://example.com/already-imported"
    pending_url = "https://example.com/still-pending"
    requested = [
        {"url": imported_url, "title": "Source 1"},
        {"url": pending_url, "title": "Source 2"},
    ]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            [{"id": "src_2", "title": "Source 2"}],
        ]
    )
    client.sources.list = AsyncMock(return_value=[MagicMock(url=imported_url)])

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        result = await import_with_retry(client, "nb_123", "task_123", requested)

    assert result == [{"id": "src_2", "title": "Source 2"}]
    attempts = client.research.import_sources.await_args_list
    # First attempt sends everything; the retry drops the already-landed URL.
    assert attempts[0].args[2] == requested
    assert attempts[1].args[2] == [requested[1]]

@pytest.mark.asyncio
async def test_stops_retrying_when_all_sources_already_imported(self):
    """Retries stop immediately when every requested source already landed."""
    landed_url = "https://example.com/already-imported"

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[RPCTimeoutError("Timed out", timeout_seconds=30.0)]
    )
    client.sources.list = AsyncMock(return_value=[MagicMock(url=landed_url)])

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        result = await import_with_retry(
            client,
            "nb_123",
            "task_123",
            [{"url": landed_url, "title": "Source 1"}],
        )

    # Contract: nothing left to import -> empty result, single attempt, no backoff sleep.
    assert result == []
    assert client.research.import_sources.await_count == 1
    mock_sleep.assert_not_awaited()

@pytest.mark.asyncio
async def test_continues_retrying_when_sources_list_fails(self):
    """A failed reconciliation listing falls back to retrying the full batch."""
    batch = [{"url": "https://example.com", "title": "Source 1"}]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            [{"id": "src_1", "title": "Source 1"}],
        ]
    )
    # Listing existing sources blows up; the retry must not be derailed.
    client.sources.list = AsyncMock(side_effect=Exception("API error"))

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        result = await import_with_retry(client, "nb_123", "task_123", batch)

    assert result == [{"id": "src_1", "title": "Source 1"}]
    # The second attempt re-sends the original, unfiltered batch.
    assert client.research.import_sources.await_args_list[1].args[2] == batch

@pytest.mark.asyncio
async def test_skips_report_sources_already_imported_after_timeout(self):
    """URL-less deep-research reports dedupe by title + markdown body."""
    report_title = "Research report"
    report_body = "# Deep report body"
    pending_report = {
        "title": report_title,
        "result_type": 5,
        "report_markdown": report_body,
    }

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[RPCTimeoutError("Timed out", timeout_seconds=30.0)]
    )
    already_present = MagicMock(
        title=report_title,
        result_type=5,
        report_markdown=report_body,
        url=None,
    )
    client.sources.list = AsyncMock(return_value=[already_present])

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock) as mock_sleep:
        result = await import_with_retry(client, "nb_123", "task_123", [pending_report])

    # The report already exists, so the helper gives up without sleeping or retrying.
    assert result == []
    assert client.research.import_sources.await_count == 1
    mock_sleep.assert_not_awaited()

@pytest.mark.asyncio
async def test_timeout_reconciliation_only_returns_final_successful_batch(self):
    """Documents existing behavior: only the final successful attempt's records
    are returned; sources that landed during a timed-out attempt are omitted.
    Flagged for future consideration rather than changed (breaking API).
    """
    requested = [
        {"url": "https://example.com/already-imported", "title": "Source 1"},
        {"url": "https://example.com/still-pending", "title": "Source 2"},
    ]

    client = MagicMock()
    client.research.import_sources = AsyncMock(
        side_effect=[
            RPCTimeoutError("Timed out", timeout_seconds=30.0),
            [{"id": "src_2", "title": "Source 2"}],
        ]
    )
    client.sources.list = AsyncMock(
        return_value=[MagicMock(url="https://example.com/already-imported")]
    )

    with patch("notebooklm.cli.helpers.asyncio.sleep", new_callable=AsyncMock):
        result = await import_with_retry(client, "nb_123", "task_123", requested)

    # Source 1 landed during the timed-out attempt but is absent from the result.
    assert result == [{"id": "src_2", "title": "Source 2"}]

@pytest.mark.asyncio
async def test_does_not_retry_non_timeout_error(self):
client = MagicMock()
Expand Down
Loading