Skip to content

Commit 01066e1

Browse files
committed
add auth profiles and deprecate build
1 parent 0912a21 commit 01066e1

File tree

15 files changed

+1393
-132
lines changed

15 files changed

+1393
-132
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ name = "seqspec"
1212
crate-type = ["rlib", "cdylib"]
1313

1414
[dependencies]
15-
clap = { version = "4.5.46", features = ["derive"] }
15+
anyhow = "1"
16+
clap = { version = "4.5.46", features = ["derive", "env"] }
1617
jsonschema = "0.33.0"
1718
pyo3 = { version = "0.25", optional = true, features = ["extension-module", "abi3-py312"] }
1819
# pythonize = "0.25.0"
@@ -22,6 +23,7 @@ serde_yaml = "0.9"
2223
thiserror = "1"
2324
reqwest = { version = "0.12", features = ["blocking", "rustls-tls"] }
2425
flate2 = "1"
26+
toml = "0.8"
2527

2628

2729
[features]

seqspec/auth.py

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
import json
2+
import os
3+
from pathlib import Path
4+
from typing import Dict, List, Optional, Tuple
5+
from urllib.parse import urlparse
6+
7+
import tomllib
8+
9+
AUTH_CONFIG_ENV = "SEQSPEC_AUTH_CONFIG"
10+
11+
12+
class AuthProfile:
13+
def __init__(
14+
self, hosts: List[str], kind: str, username_env: str, password_env: str
15+
) -> None:
16+
self.hosts = hosts
17+
self.kind = kind
18+
self.username_env = username_env
19+
self.password_env = password_env
20+
21+
@classmethod
22+
def from_dict(cls, data: Dict) -> "AuthProfile":
23+
return cls(
24+
hosts=list(data.get("hosts", [])),
25+
kind=str(data.get("kind", "basic")),
26+
username_env=str(data["username_env"]),
27+
password_env=str(data["password_env"]),
28+
)
29+
30+
def to_dict(self) -> Dict:
31+
return {
32+
"hosts": self.hosts,
33+
"kind": self.kind,
34+
"username_env": self.username_env,
35+
"password_env": self.password_env,
36+
}
37+
38+
def matches_host(self, host: str) -> bool:
39+
return any(candidate.lower() == host.lower() for candidate in self.hosts)
40+
41+
42+
class AuthRegistry:
43+
def __init__(self, location: Dict, profiles: Dict[str, AuthProfile]) -> None:
44+
self.location = location
45+
self.profiles = profiles
46+
47+
@classmethod
48+
def load(cls) -> "AuthRegistry":
49+
location = config_location()
50+
profiles = load_profiles(location)
51+
return cls(location, profiles)
52+
53+
def profile_summaries(self) -> List[Dict]:
54+
summaries = []
55+
for name, profile in self.profiles.items():
56+
summaries.append(
57+
{
58+
"name": name,
59+
"kind": profile.kind,
60+
"hosts": profile.hosts,
61+
"username_env": profile.username_env,
62+
"username_present": os.environ.get(profile.username_env) is not None,
63+
"password_env": profile.password_env,
64+
"password_present": os.environ.get(profile.password_env) is not None,
65+
}
66+
)
67+
return summaries
68+
69+
def resolve_summary(
70+
self, url: str, selected_profile: Optional[str] = None
71+
) -> Dict[str, object]:
72+
host = host_from_url(url)
73+
resolved = self.resolve_profile(host, selected_profile)
74+
profile = None
75+
if resolved is not None:
76+
name, match = resolved
77+
profile = {
78+
"name": name,
79+
"kind": match.kind,
80+
"hosts": match.hosts,
81+
"username_env": match.username_env,
82+
"username_present": os.environ.get(match.username_env) is not None,
83+
"password_env": match.password_env,
84+
"password_present": os.environ.get(match.password_env) is not None,
85+
}
86+
return {"url": url, "host": host, "profile": profile}
87+
88+
def resolve_profile(
89+
self, host: str, selected_profile: Optional[str] = None
90+
) -> Optional[Tuple[str, AuthProfile]]:
91+
if selected_profile is not None:
92+
if selected_profile not in self.profiles:
93+
raise ValueError(
94+
f"auth profile '{selected_profile}' is not defined in {display_config_path(self.location)}"
95+
)
96+
profile = self.profiles[selected_profile]
97+
if not profile.matches_host(host):
98+
raise ValueError(
99+
f"auth profile '{selected_profile}' does not match host '{host}'"
100+
)
101+
return (selected_profile, profile)
102+
103+
matches = [
104+
(name, profile)
105+
for name, profile in self.profiles.items()
106+
if profile.matches_host(host)
107+
]
108+
if len(matches) > 1:
109+
names = ", ".join(name for name, _ in matches)
110+
raise ValueError(f"multiple auth profiles match host '{host}': {names}")
111+
if len(matches) == 1:
112+
return matches[0]
113+
return None
114+
115+
def resolve_requests_auth(
116+
self, url: str, selected_profile: Optional[str] = None
117+
) -> Optional[Tuple[str, str]]:
118+
host = host_from_url(url)
119+
resolved = self.resolve_profile(host, selected_profile)
120+
if resolved is None:
121+
return None
122+
profile_name, profile = resolved
123+
username = os.environ.get(profile.username_env)
124+
if username is None:
125+
raise ValueError(
126+
f"auth profile '{profile_name}' requires env var '{profile.username_env}' for host '{host}'"
127+
)
128+
password = os.environ.get(profile.password_env)
129+
if password is None:
130+
raise ValueError(
131+
f"auth profile '{profile_name}' requires env var '{profile.password_env}' for host '{host}'"
132+
)
133+
return (username, password)
134+
135+
136+
def config_location() -> Dict[str, object]:
137+
env_path = os.environ.get(AUTH_CONFIG_ENV)
138+
if env_path:
139+
path = Path(env_path)
140+
return {
141+
"path": path,
142+
"source": f"env:{AUTH_CONFIG_ENV}",
143+
"exists": path.exists(),
144+
}
145+
146+
xdg = os.environ.get("XDG_CONFIG_HOME")
147+
if xdg:
148+
path = Path(xdg) / "seqspec" / "auth.toml"
149+
return {"path": path, "source": "xdg_config_home", "exists": path.exists()}
150+
151+
home = os.environ.get("HOME")
152+
if home:
153+
path = Path(home) / ".config" / "seqspec" / "auth.toml"
154+
return {"path": path, "source": "home_default", "exists": path.exists()}
155+
156+
return {"path": None, "source": "unavailable", "exists": False}
157+
158+
159+
def load_profiles(location: Dict[str, object]) -> Dict[str, AuthProfile]:
160+
path = location.get("path")
161+
if path is None:
162+
return {}
163+
164+
path = Path(path)
165+
if not path.exists():
166+
if str(location["source"]).startswith("env:"):
167+
raise ValueError(f"auth config does not exist: {path}")
168+
return {}
169+
170+
with open(path, "rb") as stream:
171+
config = tomllib.load(stream)
172+
173+
profiles = config.get("profiles", {})
174+
return {name: AuthProfile.from_dict(profile) for name, profile in profiles.items()}
175+
176+
177+
def init_profile(profile_name: str, profile: AuthProfile) -> Dict[str, object]:
178+
location = config_location()
179+
path = location.get("path")
180+
if path is None:
181+
raise ValueError("no auth config path is available on this system")
182+
183+
path = Path(path)
184+
created_config = not path.exists()
185+
profiles = {}
186+
if path.exists():
187+
profiles = load_profiles(location)
188+
updated_profile = profile_name in profiles
189+
profiles[profile_name] = profile
190+
191+
path.parent.mkdir(parents=True, exist_ok=True)
192+
path.write_text(render_config(profiles))
193+
194+
return {
195+
"profile": profile_name,
196+
"path": str(path),
197+
"created_config": created_config,
198+
"updated_profile": updated_profile,
199+
"hosts": profile.hosts,
200+
"kind": profile.kind,
201+
"username_env": profile.username_env,
202+
"password_env": profile.password_env,
203+
}
204+
205+
206+
def render_config(profiles: Dict[str, AuthProfile]) -> str:
207+
lines: List[str] = []
208+
for name, profile in profiles.items():
209+
lines.append(f"[profiles.{name}]")
210+
hosts = ", ".join(json.dumps(host) for host in profile.hosts)
211+
lines.append(f"hosts = [{hosts}]")
212+
lines.append(f'kind = "{profile.kind}"')
213+
lines.append(f'username_env = "{profile.username_env}"')
214+
lines.append(f'password_env = "{profile.password_env}"')
215+
lines.append("")
216+
return "\n".join(lines).rstrip() + "\n"
217+
218+
219+
def host_from_url(url: str) -> str:
220+
parsed = urlparse(url)
221+
if not parsed.scheme:
222+
raise ValueError(f"URL '{url}' does not contain a scheme")
223+
if not parsed.hostname:
224+
raise ValueError(f"URL '{url}' has an empty host")
225+
return parsed.hostname.lower()
226+
227+
228+
def display_config_path(location: Dict[str, object]) -> str:
229+
path = location.get("path")
230+
if path is None:
231+
return "<unavailable>"
232+
return str(path)

0 commit comments

Comments
 (0)