-
Notifications
You must be signed in to change notification settings - Fork 97
Expand file tree
/
Copy pathauth.py
More file actions
448 lines (379 loc) · 16.5 KB
/
auth.py
File metadata and controls
448 lines (379 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
"""Centralized authentication resolution for APM CLI.
Every APM operation that touches a remote host MUST use AuthResolver.
Resolution is per-(host, org, repo-path) tuple when repo context is known,
thread-safe, and cached per-process.
All token-bearing requests use HTTPS — that is the transport security
boundary. Global env vars are tried for every host; if the token is
wrong for the target host, ``try_with_fallback`` retries with git
credential helpers automatically.
Usage::
resolver = AuthResolver()
ctx = resolver.resolve("github.com", org="microsoft")
# ctx.token, ctx.source, ctx.token_type, ctx.host_info, ctx.git_env
For dependencies::
ctx = resolver.resolve_for_dep(dep_ref)
For operations with automatic auth/unauth fallback::
result = resolver.try_with_fallback(
"github.com", lambda token, env: download(token, env),
org="microsoft",
)
"""
from __future__ import annotations
import os
import sys
import threading
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
from apm_cli.core.token_manager import GitHubTokenManager
from apm_cli.utils.github_host import (
default_host,
is_azure_devops_hostname,
is_github_hostname,
is_valid_fqdn,
)
if TYPE_CHECKING:
from apm_cli.models.dependency.reference import DependencyReference
T = TypeVar("T")
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class HostInfo:
    """Immutable description of a remote Git host.

    Built by ``AuthResolver.classify_host``; the ``frozen=True`` flag makes
    attribute assignment on an instance raise.
    """

    # Hostname as passed to ``classify_host`` (original casing is kept).
    host: str
    # Host category used to pick the auth strategy.
    kind: str  # "github" | "ghe_cloud" | "ghes" | "ado" | "generic"
    # Consulted by ``try_with_fallback`` to decide whether an
    # unauthenticated retry is attempted after an authenticated failure.
    has_public_repos: bool
    # Base URL for the host's HTTP API, as filled in by ``classify_host``.
    api_base: str
@dataclass
class AuthContext:
    """Resolved authentication for a single (host, org, repo-path) lookup.

    Treat as immutable after construction — fields are never mutated.
    Deliberately left non-frozen (original rationale: ``git_env`` is an
    unhashable dict).

    Both secret-bearing fields (``token`` and ``git_env``) are excluded from
    the generated ``__repr__`` so that printing or logging a context never
    writes the credential to a terminal or log file.  Read ``ctx.token``
    explicitly when the value is needed.
    """

    # The credential itself, or None when nothing could be resolved.
    # repr=False keeps the secret out of repr()/log output; the field still
    # takes part in ``__init__`` (first positional argument) and ``__eq__``.
    token: Optional[str] = field(repr=False)
    # Where the token came from.
    source: str  # e.g. "GITHUB_APM_PAT_ORGNAME", "GITHUB_TOKEN", "none"
    # Classification of the token by prefix.
    token_type: str  # "fine-grained", "classic", "oauth", "github-app", "unknown"
    # Classification of the target host.
    host_info: HostInfo
    # Environment for git subprocesses; excluded from equality and from repr
    # (it is a full copy of the process environment and may hold the token).
    git_env: dict = field(compare=False, repr=False)
# ---------------------------------------------------------------------------
# AuthResolver
# ---------------------------------------------------------------------------
class AuthResolver:
"""Single source of truth for auth resolution.
Every APM operation that touches a remote host MUST use this class.
Resolution is per-(host, org, repo-path) tuple, thread-safe, cached per-process.
"""
def __init__(self, token_manager: Optional[GitHubTokenManager] = None):
    """Create a resolver with an empty cache and its guarding lock.

    Parameters
    ----------
    token_manager:
        Backend used for env-var and credential-helper lookups.  When the
        argument is omitted (or falsy) a new ``GitHubTokenManager`` is
        instantiated.
    """
    # One lock guards the whole cache; ``resolve`` holds it while it works.
    self._lock = threading.Lock()
    self._cache: dict[tuple, AuthContext] = {}
    if token_manager:
        self._token_manager = token_manager
    else:
        self._token_manager = GitHubTokenManager()
# -- host classification ------------------------------------------------
@staticmethod
def classify_host(host: str) -> HostInfo:
    """Return a ``HostInfo`` describing *host*.

    Checks run in this order and the first match wins:

    1. ``github.com``                      -> kind ``"github"``
    2. any ``*.ghe.com`` name              -> kind ``"ghe_cloud"``
    3. ``is_azure_devops_hostname(host)``  -> kind ``"ado"``
    4. host equals the ``GITHUB_HOST`` env var, which is a valid FQDN
       and is neither github.com nor ``*.ghe.com`` -> kind ``"ghes"``
    5. anything else                       -> kind ``"generic"``

    Matching is case-insensitive, but the returned ``HostInfo.host`` keeps
    the caller's original spelling.
    """
    lowered = host.lower()
    # Shared by every kind that is not github.com or Azure DevOps.
    # NOTE(review): GitHub's docs list https://api.SUBDOMAIN.ghe.com as the
    # REST base for data-residency hosts — confirm "/api/v3" is intended
    # for the "ghe_cloud" kind.
    v3_api = f"https://{host}/api/v3"

    def _describe(kind: str, public: bool, api_base: str) -> HostInfo:
        return HostInfo(
            host=host,
            kind=kind,
            has_public_repos=public,
            api_base=api_base,
        )

    if lowered == "github.com":
        return _describe("github", True, "https://api.github.com")

    if lowered.endswith(".ghe.com"):
        return _describe("ghe_cloud", False, v3_api)

    if is_azure_devops_hostname(host):
        return _describe("ado", True, "https://dev.azure.com")

    # GHES: GITHUB_HOST is set to a non-github.com, non-ghe.com FQDN
    configured = os.environ.get("GITHUB_HOST", "").lower()
    names_this_host = bool(configured) and configured == lowered
    is_github_cloud = configured == "github.com" or configured.endswith(".ghe.com")
    if names_this_host and not is_github_cloud and is_valid_fqdn(configured):
        return _describe("ghes", True, v3_api)

    # Generic FQDN (GitLab, Bitbucket, self-hosted, etc.)
    return _describe("generic", True, v3_api)
# -- token type detection -----------------------------------------------
@staticmethod
def detect_token_type(token: str) -> str:
    """Classify a token string by its prefix.

    Returns one of ``"fine-grained"``, ``"classic"``, ``"oauth"``,
    ``"github-app"`` or, when no known prefix matches, ``"unknown"``.

    Note: EMU (Enterprise Managed Users) tokens use standard PAT
    prefixes (``ghp_`` or ``github_pat_``). There is no prefix that
    identifies a token as EMU-scoped — that's a property of the
    account, not the token format.

    Prefix reference (docs.github.com):

    - ``github_pat_`` -> fine-grained PAT
    - ``ghp_`` -> classic PAT
    - ``ghu_`` -> OAuth user-to-server (e.g. ``gh auth login``)
    - ``gho_`` -> OAuth app token
    - ``ghs_`` -> GitHub App installation (server-to-server)
    - ``ghr_`` -> GitHub App refresh token
    """
    prefix_table = (
        ("github_pat_", "fine-grained"),
        ("ghp_", "classic"),
        ("ghu_", "oauth"),
        ("gho_", "oauth"),
        ("ghs_", "github-app"),
        ("ghr_", "github-app"),
    )
    for prefix, label in prefix_table:
        if token.startswith(prefix):
            return label
    return "unknown"
# -- core resolution ----------------------------------------------------
def resolve(
    self,
    host: str,
    org: Optional[str] = None,
    repo_path: Optional[str] = None,
) -> AuthContext:
    """Resolve auth for *(host, org, repo_path)*. Cached & thread-safe.

    The cache key lower-cases *host* and *org* (when they are non-empty)
    and uses *repo_path* verbatim.  A cache hit returns the stored
    ``AuthContext`` object itself; a miss classifies the host, walks the
    token chain, stores the new context and returns it.
    """
    cache_key = (
        host.lower() if host else host,
        org.lower() if org else org,
        repo_path,
    )
    # The lock is held for the whole resolution, not just the cache access:
    # this prevents duplicate credential-helper popups when parallel
    # downloads resolve the same key concurrently.  The first caller fills
    # the cache; later callers for that key become O(1) cache hits.
    # Bounded by APM_GIT_CREDENTIAL_TIMEOUT (default 60s).  No deadlock
    # risk: single lock, never nested.
    with self._lock:
        hit = self._cache.get(cache_key)
        if hit is not None:
            return hit

        info = self.classify_host(host)
        token, source = self._resolve_token(info, org, repo_path=repo_path)
        if token:
            kind = self.detect_token_type(token)
        else:
            kind = "unknown"
        resolved = AuthContext(
            token=token,
            source=source,
            token_type=kind,
            host_info=info,
            git_env=self._build_git_env(token),
        )
        self._cache[cache_key] = resolved
    return resolved
def resolve_for_dep(self, dep_ref: "DependencyReference") -> AuthContext:
    """Resolve auth from a ``DependencyReference``.

    The host is ``dep_ref.host`` or, when that is empty, ``default_host()``.
    When ``dep_ref.repo_url`` is set, the text before its first ``/`` is
    used as the org and ``"<repo_url>.git"`` as the repo path; otherwise
    both are ``None``.
    """
    target_host = dep_ref.host or default_host()
    repo_url = dep_ref.repo_url
    if not repo_url:
        return self.resolve(target_host, None, repo_path=None)

    owner, _, _ = repo_url.partition("/")
    return self.resolve(target_host, owner, repo_path=f"{repo_url}.git")
# -- fallback strategy --------------------------------------------------
def try_with_fallback(
    self,
    host: str,
    operation: Callable[..., T],
    *,
    org: Optional[str] = None,
    repo_path: Optional[str] = None,
    unauth_first: bool = False,
    verbose_callback: Optional[Callable[[str], None]] = None,
) -> T:
    """Execute *operation* with automatic auth/unauth fallback.

    Parameters
    ----------
    host:
        Target git host.
    operation:
        ``operation(token, git_env) -> T`` — the work to do.  It may be
        invoked several times with different tokens (or ``None``).
    org:
        Optional organisation for per-org token lookup.
    repo_path:
        Optional repository path, forwarded to ``resolve`` and to the git
        credential helper lookup.
    unauth_first:
        If *True*, try unauthenticated first (saves rate limits, EMU-safe).
    verbose_callback:
        Called with a human-readable step description at each attempt.

    Returns
    -------
    Whatever the first successful ``operation`` call returns.

    Raises
    ------
    Exception
        The exception raised by ``operation`` once every applicable
        attempt has failed.

    When the resolved token comes from a global env var and fails
    (e.g. a github.com PAT tried on ``*.ghe.com``), the method retries
    with the gh CLI token and then with ``git credential fill`` before
    giving up.  A failing gh CLI token does not stop the chain, and a
    gh CLI token identical to the one that just failed is not replayed.
    """
    auth_ctx = self.resolve(host, org, repo_path=repo_path)
    host_info = auth_ctx.host_info
    git_env = auth_ctx.git_env

    def _log(msg: str) -> None:
        if verbose_callback:
            verbose_callback(msg)

    def _try_credential_fallback(exc: Exception) -> T:
        """Retry with gh CLI / git-credential-fill when the resolved token fails."""
        # Nothing further to try when the token already came from the
        # credential helper or there was no token at all.
        if auth_ctx.source in ("git-credential-fill", "none"):
            raise exc
        # ADO never falls back to GitHub-oriented credential sources.
        if host_info.kind == "ado":
            raise exc
        _log(f"Token from {auth_ctx.source} failed, trying fallback credentials for {host}")

        gh_token = self._token_manager.resolve_credential_from_gh_cli(host)
        # Skip the gh CLI token when it is the very token that just failed.
        if gh_token and gh_token != auth_ctx.token:
            try:
                return operation(gh_token, self._build_git_env(gh_token))
            except Exception:
                # A bad gh CLI login must not prevent the git credential
                # helper from being consulted.
                _log(f"gh CLI credential failed, trying git credential helper for {host}")

        cred = self._token_manager.resolve_credential_from_git(host, path=repo_path)
        if cred:
            return operation(cred, self._build_git_env(cred))
        raise exc

    # Hosts that never have public repos → auth-only
    if host_info.kind in ("ghe_cloud", "ado"):
        _log(f"Auth-only attempt for {host_info.kind} host {host}")
        try:
            return operation(auth_ctx.token, git_env)
        except Exception as exc:
            return _try_credential_fallback(exc)

    if unauth_first:
        # Validation path: save rate limits, EMU-safe
        try:
            _log(f"Trying unauthenticated access to {host}")
            return operation(None, git_env)
        except Exception:
            if auth_ctx.token:
                _log(f"Unauthenticated failed, retrying with token (source: {auth_ctx.source})")
                try:
                    return operation(auth_ctx.token, git_env)
                except Exception as exc:
                    return _try_credential_fallback(exc)
            # No token to retry with: surface the unauthenticated failure.
            raise
    else:
        # Download path: auth-first for higher rate limits
        if auth_ctx.token:
            try:
                _log(f"Trying authenticated access to {host} (source: {auth_ctx.source})")
                return operation(auth_ctx.token, git_env)
            except Exception as exc:
                if host_info.has_public_repos:
                    _log("Authenticated failed, retrying without token")
                    try:
                        return operation(None, git_env)
                    except Exception:
                        # Report the authenticated failure, not the
                        # unauthenticated one, if the fallback gives up.
                        return _try_credential_fallback(exc)
                return _try_credential_fallback(exc)
        else:
            _log(f"No token available, trying unauthenticated access to {host}")
            return operation(None, git_env)
# -- error context ------------------------------------------------------
def build_error_context(
    self, host: str, operation: str, org: Optional[str] = None
) -> str:
    """Build an actionable error message for auth failures.

    Parameters
    ----------
    host:
        Host the failed operation targeted.
    operation:
        Short human-readable name of the operation, embedded in the
        first line of the message.
    org:
        Optional organisation; when given for a non-ADO host a hint about
        the per-org ``GITHUB_APM_PAT_<ORG>`` variable is appended.

    Returns
    -------
    A newline-joined, multi-line message.

    Azure DevOps hosts get ADO-specific guidance: token resolution never
    consults the per-org ``GITHUB_APM_PAT_<ORG>`` variables for them, so
    pointing users at GitHub settings would be misleading.  Output for
    every other host kind is unchanged.
    """
    auth_ctx = self.resolve(host, org)
    host_info = self.classify_host(host)
    is_ado = host_info.kind == "ado"
    lines: list[str] = [f"Authentication failed for {operation} on {host}."]

    if auth_ctx.token:
        lines.append(f"Token was provided (source: {auth_ctx.source}, type: {auth_ctx.token_type}).")
        if host_info.kind == "ghe_cloud":
            lines.append(
                "GHE Cloud Data Residency hosts (*.ghe.com) require "
                "enterprise-scoped tokens. Ensure your PAT is authorized "
                "for this enterprise."
            )
        elif is_ado:
            lines.append(
                "Ensure your Azure DevOps PAT is still valid and has "
                "access to this organization and repository."
            )
        elif host.lower() == "github.com":
            lines.append(
                "If your organization uses SAML SSO or is an EMU org, "
                "ensure your PAT is authorized at "
                "https://github.com/settings/tokens"
            )
        else:
            lines.append(
                "If your organization uses SAML SSO, you may need to "
                "authorize your token at https://github.com/settings/tokens"
            )
    else:
        lines.append("No token available.")
        if is_ado:
            lines.append(
                "Set ADO_APM_PAT to an Azure DevOps personal access token."
            )
        else:
            lines.append(
                "Set GITHUB_APM_PAT or GITHUB_TOKEN, or run 'gh auth login'."
            )

    # Per-org variables are only read for non-ADO hosts, so only
    # advertise them there.
    if org and not is_ado:
        lines.append(
            "If packages span multiple organizations, set per-org tokens: "
            f"GITHUB_APM_PAT_{_org_to_env_suffix(org)}"
        )

    lines.append("Run with --verbose for detailed auth diagnostics.")
    return "\n".join(lines)
# -- internals ----------------------------------------------------------
def _resolve_token(
    self, host_info: HostInfo, org: Optional[str], repo_path: Optional[str] = None
) -> tuple[Optional[str], str]:
    """Walk the token resolution chain. Returns (token, source).

    Resolution order (first non-empty result wins):

    1. Per-org env var ``GITHUB_APM_PAT_{ORG}`` — skipped for ADO hosts
       and when *org* is empty.  Source is the env var's name.
    2. Global env vars for the host's purpose, via
       ``get_token_for_purpose``.  Source is the name reported by
       ``_identify_env_source``.  (If the token is wrong for the target
       host, ``try_with_fallback`` retries with other credentials.)
    3. gh CLI credential.  Source is ``"gh-auth-token"``.
    4. Git credential helper — skipped for ADO hosts.  Source is
       ``"git-credential-fill"``.

    When every step comes up empty the result is ``(None, "none")``.

    All token-bearing requests use HTTPS, which is the transport
    security boundary. Host-gating global env vars is unnecessary
    and creates DX friction for multi-host setups.
    """
    manager = self._token_manager
    is_ado = host_info.kind == "ado"

    # 1. Per-org env var (not for ADO — it uses ADO_APM_PAT)
    if org and not is_ado:
        per_org_var = f"GITHUB_APM_PAT_{_org_to_env_suffix(org)}"
        per_org_token = os.environ.get(per_org_var)
        if per_org_token:
            return per_org_token, per_org_var

    # 2. Global env var chain (any host)
    purpose = self._purpose_for_host(host_info)
    env_token = manager.get_token_for_purpose(purpose)
    if env_token:
        return env_token, self._identify_env_source(purpose)

    # 3. gh CLI active account.  The call is made for every host kind;
    #    presumably the token manager ignores non-GitHub hosts — TODO confirm.
    cli_token = manager.resolve_credential_from_gh_cli(host_info.host)
    if cli_token:
        return cli_token, "gh-auth-token"

    # 4. Git credential helper (not for ADO — uses its own PAT)
    if is_ado:
        return None, "none"
    helper_token = manager.resolve_credential_from_git(
        host_info.host,
        path=repo_path,
    )
    if helper_token:
        return helper_token, "git-credential-fill"
    return None, "none"
@staticmethod
def _purpose_for_host(host_info: HostInfo) -> str:
    """Return the token-manager purpose key for *host_info*.

    ``"ado_modules"`` for hosts of kind ``"ado"``, ``"modules"`` for
    every other kind.
    """
    return "ado_modules" if host_info.kind == "ado" else "modules"
def _identify_env_source(self, purpose: str) -> str:
    """Return the name of the first env var that matched for *purpose*.

    Candidates come from the token manager's ``TOKEN_PRECEDENCE`` mapping
    and are checked in the order listed there; a variable counts as a
    match when it is set to a non-empty value.  Falls back to the literal
    ``"env"`` when *purpose* is unknown or none of its variables is set.
    """
    candidates = self._token_manager.TOKEN_PRECEDENCE.get(purpose, [])
    matches = (name for name in candidates if os.environ.get(name))
    return next(matches, "env")
@staticmethod
def _build_git_env(token: Optional[str] = None) -> dict:
    """Pre-built env dict for subprocess git calls.

    Starts from a copy of the current process environment (``os.environ``
    itself is not modified) and then sets:

    - ``GIT_TERMINAL_PROMPT`` to ``"0"``
    - ``GIT_ASKPASS`` to ``""``, or ``"echo"`` on Windows
    - ``GIT_TOKEN`` to *token*, only when *token* is non-empty
    """
    # On Windows, GIT_ASKPASS='' can cause issues; use 'echo' instead
    askpass = "echo" if sys.platform == "win32" else ""

    git_env = dict(os.environ)
    git_env.update(GIT_TERMINAL_PROMPT="0", GIT_ASKPASS=askpass)
    if token:
        git_env["GIT_TOKEN"] = token
    return git_env
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _org_to_env_suffix(org: str) -> str:
    """Turn an org name into an env-var suffix.

    Hyphens become underscores and the result is upper-cased, e.g.
    ``"my-org"`` -> ``"MY_ORG"``.  No other characters are altered.
    """
    underscored = org.replace("-", "_")
    return underscored.upper()