-
Notifications
You must be signed in to change notification settings - Fork 108
Expand file tree
/
Copy pathclient.py
More file actions
303 lines (243 loc) · 9.47 KB
/
client.py
File metadata and controls
303 lines (243 loc) · 9.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""Fetch, parse, and cache marketplace.json from GitHub repositories.
Uses ``AuthResolver.try_with_fallback(unauth_first=True)`` for public-first
access with automatic credential fallback for private marketplace repos.
Cache lives at ``~/.apm/cache/marketplace/`` with a 1-hour TTL.
"""
import json
import logging
import os
import time
from typing import Dict, List, Optional
import requests
from .errors import MarketplaceFetchError
from .models import MarketplaceManifest, MarketplacePlugin, MarketplaceSource, parse_marketplace_json
from .registry import get_registered_marketplaces
# Module-level logger, named after this module per stdlib logging convention.
logger = logging.getLogger(__name__)

_CACHE_TTL_SECONDS = 3600  # 1 hour
# Cache subdirectory relative to the apm config dir (~/.apm/cache/marketplace).
_CACHE_DIR_NAME = os.path.join("cache", "marketplace")

# Candidate locations for marketplace.json in a repository (priority order)
_MARKETPLACE_PATHS = [
    "marketplace.json",
    ".github/plugin/marketplace.json",
    ".claude-plugin/marketplace.json",
]
def _cache_dir() -> str:
    """Return the marketplace cache directory, creating it on first use."""
    # Imported lazily to avoid a circular import at module load time.
    from ..config import CONFIG_DIR

    cache_path = os.path.join(CONFIG_DIR, _CACHE_DIR_NAME)
    os.makedirs(cache_path, exist_ok=True)
    return cache_path
def _sanitize_cache_name(name: str) -> str:
    """Sanitize marketplace name for safe use in file paths.

    Args:
        name: Raw marketplace name (may contain separators or other
            filesystem-unsafe characters).

    Returns:
        str: A filesystem-safe stem containing only ``[a-zA-Z0-9._-]``,
        never empty and never a path-traversal component.
    """
    import re

    from ..utils.path_security import PathTraversalError, validate_path_segments

    safe = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
    # Prevent path traversal even after sanitization. Strip dots AND
    # underscores in a single pass: the previous two-pass
    # strip(".").strip("_") left interleaved names such as "_._" reduced
    # to just ".", a path component we never want as a cache filename stem.
    safe = safe.strip("._") or "unnamed"
    # Defense-in-depth: validate with centralized path security
    try:
        validate_path_segments(safe, context="cache name")
    except PathTraversalError:
        safe = "unnamed"
    return safe
def _cache_data_path(name: str) -> str:
    """Path of the cached marketplace payload file for *name*."""
    stem = _sanitize_cache_name(name)
    return os.path.join(_cache_dir(), stem + ".json")
def _cache_meta_path(name: str) -> str:
    """Path of the cache metadata (fetched-at/TTL) file for *name*."""
    stem = _sanitize_cache_name(name)
    return os.path.join(_cache_dir(), stem + ".meta.json")
def _read_cache(name: str) -> Optional[Dict]:
    """Return cached marketplace data for *name*, or ``None`` if missing or expired."""
    data_path = _cache_data_path(name)
    meta_path = _cache_meta_path(name)
    if not (os.path.exists(data_path) and os.path.exists(meta_path)):
        return None
    try:
        with open(meta_path, "r") as fh:
            meta = json.load(fh)
        age = time.time() - meta.get("fetched_at", 0)
        if age > meta.get("ttl_seconds", _CACHE_TTL_SECONDS):
            return None  # Expired
        with open(data_path, "r") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError, KeyError) as exc:
        # Corrupt or unreadable cache is treated as a miss, not an error.
        logger.debug("Cache read failed for '%s': %s", name, exc)
        return None
def _read_stale_cache(name: str) -> Optional[Dict]:
    """Return cached data for *name* ignoring expiry (stale-while-revalidate)."""
    path = _cache_data_path(name)
    if not os.path.exists(path):
        return None
    try:
        with open(path, "r") as fh:
            return json.load(fh)
    except (OSError, json.JSONDecodeError):
        # Unreadable stale cache is simply unavailable.
        return None
def _write_cache(name: str, data: Dict) -> None:
    """Persist marketplace *data* plus a fetched-at/TTL metadata record.

    Write failures are logged at debug level and swallowed -- caching is
    best-effort and must never break a successful fetch.
    """
    meta = {"fetched_at": time.time(), "ttl_seconds": _CACHE_TTL_SECONDS}
    try:
        with open(_cache_data_path(name), "w") as fh:
            json.dump(data, fh, indent=2)
        with open(_cache_meta_path(name), "w") as fh:
            json.dump(meta, fh)
    except OSError as exc:
        logger.debug("Cache write failed for '%s': %s", name, exc)
def _clear_cache(name: str) -> None:
    """Best-effort removal of both cache files (data and metadata) for *name*."""
    for target in (_cache_data_path(name), _cache_meta_path(name)):
        try:
            os.remove(target)
        except OSError:
            # Missing file (or permission issue) is fine -- clearing is best-effort.
            pass
# ---------------------------------------------------------------------------
# Network fetch
# ---------------------------------------------------------------------------
def _github_contents_url(source: MarketplaceSource, file_path: str) -> str:
    """Build the GitHub Contents API URL for *file_path* in *source*'s repo."""
    # Imported lazily to avoid a circular import at module load time.
    from ..core.auth import AuthResolver

    api_base = AuthResolver.classify_host(source.host).api_base
    return (
        f"{api_base}/repos/{source.owner}/{source.repo}"
        f"/contents/{file_path}?ref={source.branch}"
    )
def _fetch_file(
    source: MarketplaceSource,
    file_path: str,
    auth_resolver: Optional[object] = None,
) -> Optional[Dict]:
    """Fetch a JSON file from a GitHub repo via the Contents API.

    Returns parsed JSON or ``None`` if the file does not exist (404).
    Raises ``MarketplaceFetchError`` on unexpected failures.
    """
    url = _github_contents_url(source, file_path)

    def _request(token, _git_env):
        # Ask for the raw file body so it can be json-decoded directly.
        headers = {
            "Accept": "application/vnd.github.v3.raw",
            "User-Agent": "apm-cli",
        }
        if token:
            headers["Authorization"] = f"token {token}"
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 404:
            return None
        response.raise_for_status()
        return response.json()

    if auth_resolver is None:
        from ..core.auth import AuthResolver

        auth_resolver = AuthResolver()
    try:
        # Public-first: try unauthenticated, then fall back to credentials.
        return auth_resolver.try_with_fallback(
            source.host,
            _request,
            org=source.owner,
            repo_path=f"{source.owner}/{source.repo}.git",
            unauth_first=True,
        )
    except Exception as exc:
        # Normalize every failure mode into the package's fetch error.
        raise MarketplaceFetchError(source.name, str(exc)) from exc
def _auto_detect_path(
    source: MarketplaceSource,
    auth_resolver: Optional[object] = None,
) -> Optional[str]:
    """Probe candidate locations and return the first that exists.

    Returns ``None`` if no location contains a marketplace.json.
    Raises ``MarketplaceFetchError`` on non-404 failures (auth errors, etc.).
    """
    # Candidates are probed in priority order; the first hit wins.
    return next(
        (
            path
            for path in _MARKETPLACE_PATHS
            if _fetch_file(source, path, auth_resolver=auth_resolver) is not None
        ),
        None,
    )
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def fetch_marketplace(
    source: MarketplaceSource,
    *,
    force_refresh: bool = False,
    auth_resolver: Optional[object] = None,
) -> MarketplaceManifest:
    """Fetch and parse a marketplace manifest.

    Uses cache when available (1h TTL). Falls back to stale cache on
    network errors.

    Args:
        source: Marketplace source to fetch.
        force_refresh: Skip cache and re-fetch from network.
        auth_resolver: Optional ``AuthResolver`` instance (created if None).

    Returns:
        MarketplaceManifest: Parsed manifest.

    Raises:
        MarketplaceFetchError: If fetch fails and no cache is available.
    """
    # Try fresh cache first
    if not force_refresh:
        cached = _read_cache(source.name)
        if cached is not None:
            logger.debug("Using cached marketplace data for '%s'", source.name)
            return parse_marketplace_json(cached, source.name)

    # Fetch from network
    try:
        data = _fetch_file(source, source.path, auth_resolver=auth_resolver)
        if data is None:
            raise MarketplaceFetchError(
                source.name,
                f"marketplace.json not found at '{source.path}' "
                f"in {source.owner}/{source.repo}",
            )
        _write_cache(source.name, data)
        return parse_marketplace_json(data, source.name)
    except MarketplaceFetchError:
        # Stale-while-revalidate: serve expired cache on network error
        stale = _read_stale_cache(source.name)
        if stale is None:
            raise
        logger.warning(
            "Network error fetching '%s'; using stale cache", source.name
        )
        return parse_marketplace_json(stale, source.name)
def fetch_or_cache(
    source: MarketplaceSource,
    *,
    auth_resolver: Optional[object] = None,
) -> MarketplaceManifest:
    """Convenience wrapper -- same as ``fetch_marketplace`` with defaults."""
    manifest = fetch_marketplace(source, auth_resolver=auth_resolver)
    return manifest
def search_marketplace(
    query: str,
    source: MarketplaceSource,
    *,
    auth_resolver: Optional[object] = None,
) -> List[MarketplacePlugin]:
    """Search a single marketplace for plugins matching *query*."""
    return fetch_marketplace(source, auth_resolver=auth_resolver).search(query)
def search_all_marketplaces(
    query: str,
    *,
    auth_resolver: Optional[object] = None,
) -> List[MarketplacePlugin]:
    """Search across all registered marketplaces.

    Returns plugins matching the query, annotated with their source marketplace.
    Marketplaces that fail to fetch are skipped with a warning rather than
    aborting the whole search.
    """
    matches: List[MarketplacePlugin] = []
    for src in get_registered_marketplaces():
        try:
            manifest = fetch_marketplace(src, auth_resolver=auth_resolver)
        except MarketplaceFetchError as exc:
            logger.warning("Skipping marketplace '%s': %s", src.name, exc)
            continue
        matches.extend(manifest.search(query))
    return matches
def clear_marketplace_cache(name: Optional[str] = None) -> int:
    """Clear cached data for one or all marketplaces.

    Returns the number of caches cleared.
    """
    if name:
        _clear_cache(name)
        return 1
    # No name given: clear every registered marketplace's cache.
    sources = list(get_registered_marketplaces())
    for src in sources:
        _clear_cache(src.name)
    return len(sources)