-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Expand file tree
/
Copy pathcontext.py
More file actions
204 lines (169 loc) · 7.39 KB
/
context.py
File metadata and controls
204 lines (169 loc) · 7.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""CLI context persistence helpers."""
from __future__ import annotations
import json
import logging
from collections.abc import Callable
from pathlib import Path
from typing import Any, Literal
from filelock import FileLock, Timeout
from ..io import atomic_update_json, atomic_write_json
from ..paths import get_context_path
# Module-level logger used for all context-file warnings in this module.
logger = logging.getLogger(__name__)
# Callable that resolves the context file path; this module invokes it with a
# ``storage_path`` keyword argument (see ``_resolve_context_path``).
ContextPathFn = Callable[..., Path]
# Outcome of a clear attempt as reported by ``_clear_context_file``:
# "cleared" (file deleted or rewritten), "unchanged" (nothing to clear),
# "contended" (lock acquisition timed out), "unavailable" (OSError).
ContextClearStatus = Literal["cleared", "unchanged", "contended", "unavailable"]
def _describe_json_shape(value: Any) -> str:
    """Summarize an unexpected JSON payload as ``<type name> <repr>`` for log output."""
    type_name = type(value).__name__
    return f"{type_name} {value!r}"
def _current_storage_override() -> Path | None:
    """Return the active ``--storage`` override taken from the current Click context.

    This is a backward-compatibility shim that simply forwards to
    :func:`notebooklm.cli.services.auth_source.current_storage_override`.
    New callers should go through the :class:`AuthSource` resolver directly so
    they get the full precedence chain (env-var fast path etc.).
    """
    # Imported at call time rather than at module import time.
    from .services.auth_source import current_storage_override as _lookup_override

    return _lookup_override()
def _resolve_context_path(context_path_fn: ContextPathFn | None = None) -> Path:
    """Resolve the context file path, honoring the active storage override.

    Uses ``context_path_fn`` when one is supplied and falls back to
    ``get_context_path`` otherwise; the resolver is called with the current
    storage override as its ``storage_path`` keyword argument.
    """
    resolver = context_path_fn if context_path_fn else get_context_path
    storage_override = _current_storage_override()
    return resolver(storage_path=storage_override)
def _get_context_value(key: str, *, context_path_fn: ContextPathFn | None = None) -> str | None:
    """Read a single value from context.json.

    Args:
        key: Top-level key to look up in the context JSON object.
        context_path_fn: Optional resolver for the context file path; when
            omitted, ``get_context_path`` is used.

    Returns:
        The value stored under ``key``, or ``None`` when the file is missing,
        unreadable, corrupted, not a JSON object, or does not contain ``key``.
        Read and parse failures are logged as warnings rather than raised.
    """
    context_file = _resolve_context_path(context_path_fn)
    if not context_file.exists():
        return None

    try:
        data = json.loads(context_file.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Bytes that are not valid UTF-8 raise UnicodeDecodeError, which is a
        # ValueError (not an OSError); treat it as corruption just like
        # malformed JSON instead of letting it escape to the caller.
        logger.warning(
            "Context file %s is corrupted; cannot read '%s'. Run 'notebooklm clear' to reset.",
            context_file,
            key,
        )
        return None
    except OSError as e:
        logger.warning("Cannot read context file %s: %s", context_file, e)
        return None

    if not isinstance(data, dict):
        logger.warning(
            "Context file %s has invalid shape; expected JSON object, got %s.",
            context_file,
            _describe_json_shape(data),
        )
        return None
    return data.get(key)
def _set_context_value(
    key: str, value: str | None, *, context_path_fn: ContextPathFn | None = None
) -> None:
    """Store ``value`` under ``key`` in context.json, or drop ``key`` when ``value`` is ``None``.

    Does nothing when the context file does not exist yet. A corrupted file or
    an OS-level failure is logged as a warning instead of being raised.
    """
    target = _resolve_context_path(context_path_fn)
    if not target.exists():
        # Conversation updates are context-only: callers must select a notebook
        # first, which creates the file and account metadata to preserve.
        return

    def _apply(current: Any) -> dict[str, Any]:
        # Start from a copy of the stored object; anything that is not a JSON
        # object is replaced by an empty dict.
        updated = dict(current) if isinstance(current, dict) else {}
        if value is None:
            updated.pop(key, None)
        else:
            updated[key] = value
        return updated

    try:
        atomic_update_json(target, _apply)
    except json.JSONDecodeError:
        logger.warning(
            "Context file %s is corrupted; cannot update '%s'. Run 'notebooklm clear' to reset.",
            target,
            key,
        )
    except OSError as exc:
        logger.warning("Failed to write context file %s for key '%s': %s", target, key, exc)
def get_current_notebook(*, context_path_fn: ContextPathFn | None = None) -> str | None:
    """Return the ``notebook_id`` stored in the context file, or ``None`` if unavailable."""
    notebook_id = _get_context_value("notebook_id", context_path_fn=context_path_fn)
    return notebook_id
def set_current_notebook(
    notebook_id: str,
    title: str | None = None,
    is_owner: bool | None = None,
    created_at: str | None = None,
    *,
    context_path_fn: ContextPathFn | None = None,
) -> None:
    """Make ``notebook_id`` the active notebook in context.json.

    The stored context is rebuilt from scratch: only a dict-valued ``account``
    entry survives from the previous contents, so every other previously
    stored key is dropped. ``title`` and ``created_at`` are written only when
    truthy; ``is_owner`` is written whenever it is not ``None``.
    """
    target = _resolve_context_path(context_path_fn)

    def _rebuild(previous: Any) -> dict[str, Any]:
        prior = previous if isinstance(previous, dict) else {}
        rebuilt: dict[str, Any] = {}

        prior_account = prior.get("account")
        if isinstance(prior_account, dict):
            rebuilt["account"] = prior_account

        rebuilt["notebook_id"] = notebook_id

        # (key, value, should_store) in the order the keys are written.
        optional_fields = (
            ("title", title, bool(title)),
            ("is_owner", is_owner, is_owner is not None),
            ("created_at", created_at, bool(created_at)),
        )
        for field_name, field_value, keep in optional_fields:
            if keep:
                rebuilt[field_name] = field_value
        return rebuilt

    atomic_update_json(target, _rebuild, recover_from_corrupt=True)
def clear_context(
    *, clear_account: bool = False, context_path_fn: ContextPathFn | None = None
) -> bool:
    """Clear the current context and report whether anything was cleared.

    Returns ``True`` only when the underlying clear reports ``"cleared"``;
    every other outcome ("unchanged", "contended", "unavailable") yields
    ``False``.
    """
    target = _resolve_context_path(context_path_fn)
    status = _clear_context_file(target, clear_account=clear_account)
    return status == "cleared"
def _clear_context_file(context_file: Path, *, clear_account: bool) -> ContextClearStatus:
    """Clear the context file under a file lock and report the precise outcome.

    Returns ``"unchanged"`` when the file does not exist, ``"contended"`` when
    the lock cannot be acquired within the 10 second timeout, ``"unavailable"``
    when an ``OSError`` occurs, and otherwise whatever
    ``_clear_context_file_locked`` reports.
    """
    if not context_file.exists():
        return "unchanged"

    # The lock lives next to the context file: "<name><suffix>.lock".
    lock_path = context_file.with_suffix(context_file.suffix + ".lock")
    context_file.parent.mkdir(parents=True, exist_ok=True)

    try:
        with FileLock(str(lock_path), timeout=10.0):
            outcome = _clear_context_file_locked(context_file, clear_account=clear_account)
        return outcome
    except Timeout as exc:
        logger.warning("Context file %s lock is contended; clear skipped: %s", context_file, exc)
        return "contended"
    except OSError as exc:
        logger.warning("Context file %s is unavailable; clear skipped: %s", context_file, exc)
        return "unavailable"
def _clear_context_file_locked(context_file: Path, *, clear_account: bool) -> ContextClearStatus:
    """Clear context file data; intended to run while the file lock is held.

    Args:
        context_file: Path to the context JSON file.
        clear_account: When true, delete the whole file; otherwise keep only
            the top-level ``account`` entry.

    Returns:
        ``"cleared"`` when the file was deleted or rewritten, ``"unchanged"``
        when it was missing or already held nothing but ``account``, and
        ``"unavailable"`` when an ``OSError`` prevented the operation.
    """
    try:
        if not context_file.exists():
            return "unchanged"
        if clear_account:
            context_file.unlink(missing_ok=True)
            return "cleared"

        try:
            data = json.loads(context_file.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Undecodable bytes raise UnicodeDecodeError (a ValueError, not an
            # OSError). ``clear`` is the advertised way to reset a corrupted
            # file, so handle that exactly like malformed JSON.
            data = None

        if not isinstance(data, dict):
            # Corrupt or wrongly shaped content has nothing worth preserving.
            context_file.unlink(missing_ok=True)
            return "cleared"

        # ``clear`` intentionally removes every non-account field so future
        # notebook/conversation context keys do not need explicit pop entries.
        kept = {"account": data["account"]} if "account" in data else {}
        if not kept:
            context_file.unlink(missing_ok=True)
            return "cleared"
        if kept != data:
            atomic_write_json(context_file, kept)
            return "cleared"
        return "unchanged"
    except OSError as e:
        logger.warning("Context file %s is unavailable; clear skipped: %s", context_file, e)
        return "unavailable"
def get_current_conversation(*, context_path_fn: ContextPathFn | None = None) -> str | None:
    """Return the ``conversation_id`` stored in the context file, or ``None`` if unavailable."""
    conversation_id = _get_context_value("conversation_id", context_path_fn=context_path_fn)
    return conversation_id
def set_current_conversation(
    conversation_id: str | None, *, context_path_fn: ContextPathFn | None = None
) -> None:
    """Persist ``conversation_id`` in the context file; ``None`` removes the stored ID."""
    _set_context_value(
        "conversation_id",
        conversation_id,
        context_path_fn=context_path_fn,
    )