forked from microsoft/apm
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclient.py
More file actions
382 lines (308 loc) · 11.9 KB
/
client.py
File metadata and controls
382 lines (308 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
"""Fetch, parse, and cache marketplace.json from GitHub repositories.
Uses ``AuthResolver.try_with_fallback(unauth_first=True)`` for public-first
access with automatic credential fallback for private marketplace repos.
When ``PROXY_REGISTRY_URL`` is set, fetches are routed through the registry
proxy (Artifactory Archive Entry Download) before falling back to the
GitHub Contents API. When ``PROXY_REGISTRY_ONLY=1``, the GitHub fallback
is blocked entirely.
Cache lives at ``~/.apm/cache/marketplace/`` with a 1-hour TTL.
"""
import json
import logging
import os
import time
from typing import Dict, List, Optional
import requests
from .errors import MarketplaceFetchError
from .models import MarketplaceManifest, MarketplacePlugin, MarketplaceSource, parse_marketplace_json
from .registry import get_registered_marketplaces
logger = logging.getLogger(__name__)

# Cached manifests are considered fresh for this long before a re-fetch.
_CACHE_TTL_SECONDS = 3600 # 1 hour

# Cache subdirectory, joined under the apm config dir by _cache_dir().
_CACHE_DIR_NAME = os.path.join("cache", "marketplace")

# Candidate locations for marketplace.json in a repository (priority order)
_MARKETPLACE_PATHS = [
    "marketplace.json",
    ".github/plugin/marketplace.json",
    ".claude-plugin/marketplace.json",
]
def _cache_dir() -> str:
    """Return the marketplace cache directory, creating it on first use."""
    from ..config import CONFIG_DIR

    cache_path = os.path.join(CONFIG_DIR, _CACHE_DIR_NAME)
    os.makedirs(cache_path, exist_ok=True)
    return cache_path
def _sanitize_cache_name(name: str) -> str:
    """Sanitize a marketplace name for safe use as a file-name component.

    Replaces every character outside ``[a-zA-Z0-9._-]`` with ``_``, then
    strips leading/trailing dots and underscores so the result can never
    be a hidden file or a ``..`` traversal segment.

    Args:
        name: Raw marketplace name (untrusted).

    Returns:
        str: Safe file-name component; ``"unnamed"`` when nothing safe
        remains or path validation rejects the result.
    """
    import re
    from ..utils.path_security import PathTraversalError, validate_path_segments

    safe = re.sub(r"[^a-zA-Z0-9._-]", "_", name)
    # Strip any MIX of leading/trailing dots and underscores in one pass.
    # The previous ``.strip(".").strip("_")`` let sequences like "_.foo"
    # keep a leading dot (hidden file) because each strip only sees one
    # character class at a time.
    safe = safe.strip("._") or "unnamed"
    # Defense-in-depth: validate with centralized path security
    try:
        validate_path_segments(safe, context="cache name")
    except PathTraversalError:
        safe = "unnamed"
    return safe
def _cache_key(source: MarketplaceSource) -> str:
    """Derive the cache key; non-GitHub hosts get a host prefix to avoid clashes."""
    host = source.host.lower()
    if host != "github.com":
        return f"{_sanitize_cache_name(host)}__{source.name}"
    return source.name
def _cache_data_path(name: str) -> str:
    """Path of the cached marketplace payload file for *name*."""
    filename = f"{_sanitize_cache_name(name)}.json"
    return os.path.join(_cache_dir(), filename)
def _cache_meta_path(name: str) -> str:
    """Path of the cache freshness-metadata file for *name*."""
    filename = f"{_sanitize_cache_name(name)}.meta.json"
    return os.path.join(_cache_dir(), filename)
def _read_cache(name: str) -> Optional[Dict]:
    """Return cached marketplace data for *name*, or ``None`` if absent/expired."""
    data_path = _cache_data_path(name)
    meta_path = _cache_meta_path(name)
    if not (os.path.exists(data_path) and os.path.exists(meta_path)):
        return None
    try:
        with open(meta_path, "r") as fh:
            meta = json.load(fh)
        age = time.time() - meta.get("fetched_at", 0)
        if age > meta.get("ttl_seconds", _CACHE_TTL_SECONDS):
            # Past its TTL; callers may still use _read_stale_cache as fallback.
            return None
        with open(data_path, "r") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError, KeyError) as exc:
        logger.debug("Cache read failed for '%s': %s", name, exc)
        return None
def _read_stale_cache(name: str) -> Optional[Dict]:
    """Return cached data for *name* ignoring TTL (stale-while-revalidate)."""
    path = _cache_data_path(name)
    if not os.path.exists(path):
        return None
    try:
        with open(path, "r") as fh:
            return json.load(fh)
    except (json.JSONDecodeError, OSError):
        return None
def _write_cache(name: str, data: Dict) -> None:
    """Persist marketplace *data* plus freshness metadata for *name*.

    Writes each file to a temporary sibling and atomically renames it into
    place, so a crash or disk-full mid-write can never leave a truncated
    JSON file that would later fail parsing in ``_read_cache`` /
    ``_read_stale_cache``. Failures are logged at debug level and otherwise
    ignored -- the cache is best-effort.

    Args:
        name: Marketplace cache key.
        data: Parsed marketplace JSON to cache.
    """
    data_path = _cache_data_path(name)
    meta_path = _cache_meta_path(name)
    meta = {"fetched_at": time.time(), "ttl_seconds": _CACHE_TTL_SECONDS}
    try:
        # Data first, then metadata: a fresh timestamp never refers to
        # payload that was not fully written.
        for path, payload in ((data_path, data), (meta_path, meta)):
            tmp_path = f"{path}.tmp"
            with open(tmp_path, "w") as f:
                json.dump(payload, f, indent=2)
            os.replace(tmp_path, path)  # atomic on POSIX and Windows
    except OSError as exc:
        logger.debug("Cache write failed for '%s': %s", name, exc)
def _clear_cache(name: str) -> None:
    """Best-effort removal of the cached data and metadata files for *name*."""
    for path_builder in (_cache_data_path, _cache_meta_path):
        try:
            os.remove(path_builder(name))
        except OSError:
            # Missing file or permission problem: cache is advisory, ignore.
            pass
# ---------------------------------------------------------------------------
# Network fetch
# ---------------------------------------------------------------------------
def _try_proxy_fetch(
    source: MarketplaceSource,
    file_path: str,
) -> Optional[Dict]:
    """Attempt to fetch marketplace JSON through the registry proxy.

    Returns the parsed JSON dict on success; returns ``None`` when no
    proxy is configured, the entry download fails, or the payload is
    not valid JSON.
    """
    from ..deps.registry_proxy import RegistryConfig

    cfg = RegistryConfig.from_env()
    if cfg is None:
        # No proxy configured -- caller falls back to the GitHub API.
        return None

    from ..deps.artifactory_entry import fetch_entry_from_archive

    entry_kwargs = dict(
        host=cfg.host,
        prefix=cfg.prefix,
        owner=source.owner,
        repo=source.repo,
        file_path=file_path,
        ref=source.branch,
        scheme=cfg.scheme,
        headers=cfg.get_headers(),
    )
    raw = fetch_entry_from_archive(**entry_kwargs)
    if raw is None:
        return None

    try:
        return json.loads(raw)
    except (json.JSONDecodeError, ValueError):
        logger.debug(
            "Proxy returned non-JSON for %s/%s %s",
            source.owner, source.repo, file_path,
        )
        return None
def _github_contents_url(source: MarketplaceSource, file_path: str) -> str:
    """Build the GitHub Contents API URL for *file_path* in *source*."""
    from ..core.auth import AuthResolver

    api_base = AuthResolver.classify_host(source.host).api_base
    return (
        f"{api_base}/repos/{source.owner}/{source.repo}"
        f"/contents/{file_path}?ref={source.branch}"
    )
def _fetch_file(
    source: MarketplaceSource,
    file_path: str,
    auth_resolver: Optional[object] = None,
) -> Optional[Dict]:
    """Fetch a JSON file from a GitHub repo.

    When ``PROXY_REGISTRY_URL`` is set, tries the registry proxy first via
    Artifactory Archive Entry Download. Falls back to the GitHub Contents
    API unless ``PROXY_REGISTRY_ONLY=1`` blocks direct access.

    Returns parsed JSON, or ``None`` when the file does not exist (404)
    or registry-only mode blocks the direct fetch.

    Raises:
        MarketplaceFetchError: On any unexpected fetch failure.
    """
    # Proxy-first: Artifactory Archive Entry Download.
    via_proxy = _try_proxy_fetch(source, file_path)
    if via_proxy is not None:
        return via_proxy

    # Registry-only mode forbids falling back to the GitHub API.
    from ..deps.registry_proxy import RegistryConfig

    cfg = RegistryConfig.from_env()
    if cfg is not None and cfg.enforce_only:
        logger.debug(
            "PROXY_REGISTRY_ONLY blocks direct GitHub fetch for %s/%s %s",
            source.owner, source.repo, file_path,
        )
        return None

    url = _github_contents_url(source, file_path)

    def _do_fetch(token, _git_env):
        # Request raw file content; add auth only when a token is supplied.
        headers = {
            "Accept": "application/vnd.github.v3.raw",
            "User-Agent": "apm-cli",
        }
        if token:
            headers["Authorization"] = f"token {token}"
        response = requests.get(url, headers=headers, timeout=30)
        if response.status_code == 404:
            return None  # Missing file is a normal outcome, not an error.
        response.raise_for_status()
        return response.json()

    if auth_resolver is None:
        from ..core.auth import AuthResolver

        auth_resolver = AuthResolver()

    try:
        return auth_resolver.try_with_fallback(
            source.host,
            _do_fetch,
            org=source.owner,
            unauth_first=True,
        )
    except Exception as exc:
        # Normalize transport/auth failures into the package error type.
        raise MarketplaceFetchError(source.name, str(exc)) from exc
def _auto_detect_path(
    source: MarketplaceSource,
    auth_resolver: Optional[object] = None,
) -> Optional[str]:
    """Probe candidate locations and return the first one that exists.

    Returns ``None`` when no candidate location holds a marketplace.json.
    Raises ``MarketplaceFetchError`` on non-404 failures (auth errors, etc.).
    """
    # Lazily probe in priority order; stop at the first hit.
    return next(
        (
            candidate
            for candidate in _MARKETPLACE_PATHS
            if _fetch_file(source, candidate, auth_resolver=auth_resolver)
            is not None
        ),
        None,
    )
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def fetch_marketplace(
    source: MarketplaceSource,
    *,
    force_refresh: bool = False,
    auth_resolver: Optional[object] = None,
) -> MarketplaceManifest:
    """Fetch and parse a marketplace manifest.

    Serves from cache when fresh (1h TTL) unless *force_refresh* is set.
    On any fetch failure -- including "not found" -- an expired cache entry
    is served as a last resort (stale-while-revalidate).

    Args:
        source: Marketplace source to fetch.
        force_refresh: Skip cache and re-fetch from network.
        auth_resolver: Optional ``AuthResolver`` instance (created if None).

    Returns:
        MarketplaceManifest: Parsed manifest.

    Raises:
        MarketplaceFetchError: If fetch fails and no cache is available.
    """
    cache_name = _cache_key(source)

    if not force_refresh:
        fresh = _read_cache(cache_name)
        if fresh is not None:
            logger.debug("Using cached marketplace data for '%s'", source.name)
            return parse_marketplace_json(fresh, source.name)

    try:
        data = _fetch_file(source, source.path, auth_resolver=auth_resolver)
        if data is None:
            missing_msg = (
                f"marketplace.json not found at '{source.path}' "
                f"in {source.owner}/{source.repo}"
            )
            raise MarketplaceFetchError(source.name, missing_msg)
        _write_cache(cache_name, data)
        return parse_marketplace_json(data, source.name)
    except MarketplaceFetchError:
        # Stale-while-revalidate: serve expired cache on network error.
        stale = _read_stale_cache(cache_name)
        if stale is None:
            raise
        logger.warning(
            "Network error fetching '%s'; using stale cache", source.name
        )
        return parse_marketplace_json(stale, source.name)
def fetch_or_cache(
    source: MarketplaceSource,
    *,
    auth_resolver: Optional[object] = None,
) -> MarketplaceManifest:
    """Convenience wrapper -- same as ``fetch_marketplace`` with defaults.

    Args:
        source: Marketplace source to fetch.
        auth_resolver: Optional ``AuthResolver`` instance (created if None).

    Returns:
        MarketplaceManifest: Parsed manifest (fresh or cached).
    """
    return fetch_marketplace(source, auth_resolver=auth_resolver)
def search_marketplace(
    query: str,
    source: MarketplaceSource,
    *,
    auth_resolver: Optional[object] = None,
) -> List[MarketplacePlugin]:
    """Return plugins in a single marketplace *source* matching *query*."""
    return fetch_marketplace(source, auth_resolver=auth_resolver).search(query)
def search_all_marketplaces(
    query: str,
    *,
    auth_resolver: Optional[object] = None,
) -> List[MarketplacePlugin]:
    """Search every registered marketplace for plugins matching *query*.

    Marketplaces that fail to fetch are skipped with a warning so one
    broken source cannot hide results from the others.
    """
    matches: List[MarketplacePlugin] = []
    for src in get_registered_marketplaces():
        try:
            manifest = fetch_marketplace(src, auth_resolver=auth_resolver)
        except MarketplaceFetchError as exc:
            logger.warning("Skipping marketplace '%s': %s", src.name, exc)
            continue
        matches.extend(manifest.search(query))
    return matches
def clear_marketplace_cache(
    name: Optional[str] = None,
    host: str = "github.com",
) -> int:
    """Clear cached data for one marketplace (by *name*) or for all.

    Returns:
        int: Number of marketplace caches cleared.
    """
    if name:
        # Minimal stand-in source: only name and host feed the cache key.
        stub = MarketplaceSource(name=name, owner="", repo="", host=host)
        _clear_cache(_cache_key(stub))
        return 1

    cleared = 0
    for src in get_registered_marketplaces():
        _clear_cache(_cache_key(src))
        cleared += 1
    return cleared