-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy path_user.py
More file actions
153 lines (126 loc) · 5.42 KB
/
_user.py
File metadata and controls
153 lines (126 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import logging
import os
import platform
import subprocess
import sys
from configparser import ConfigParser
import click
from securesystemslib.signer import Key, Signer
logger = logging.getLogger(__name__)
# some known locations where we might find libykcs11.
# These should all be _system_ locations (not user writable)
LIBYKCS11_LOCATIONS = {
"Linux": [
"/usr/lib/x86_64-linux-gnu/libykcs11.so",
"/usr/lib64/libykcs11.so",
"/usr/local/lib/libykcs11.so",
],
"Darwin": ["/opt/homebrew/lib/libykcs11.dylib", "/usr/local/lib/libykcs11.dylib"],
}
def bold(text: str) -> str:
return click.style(text, bold=True)
def _get_base_branch_internal(remote: str = "origin") -> str:
"""Get base branch by auto-detecting from git remote
Args:
remote: The git remote name (defaults to "origin")
Returns:
The name of the base branch
"""
try:
# Try to auto-detect default branch from git remote
cmd = [
"git",
"-c",
"user.name=TUF-on-CI",
"-c",
"user.email=41898282+github-actions[bot]@users.noreply.github.com",
"symbolic-ref",
f"refs/remotes/{remote}/HEAD",
]
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
# Output is like 'refs/remotes/origin/main'
ref = result.stdout.strip()
branch = ref.split("/")[-1]
logger.debug("Auto-detected base branch: %s", branch)
return branch
except (subprocess.CalledProcessError, IndexError):
logger.debug("Failed to auto-detect base branch, defaulting to 'main'")
return "main"
class User:
"""Class that manages user configuration and manages the users signer cache"""
def __init__(self, path: str):
self._config_path = path
self._config = ConfigParser(interpolation=None)
self._config.read(path)
# TODO: create config if missing, ask/confirm values from user
if not self._config:
raise click.ClickException(f"Settings file {path} not found")
try:
self.name = self._config["settings"]["user-name"].lower()
if not self.name.startswith("@"):
self.name = f"@{self.name}"
self.push_remote = self._config["settings"]["push-remote"]
self.pull_remote = self._config["settings"]["pull-remote"]
except KeyError as e:
raise click.ClickException(
f"Failed to find required setting {e} in {path}"
) from e
# signing key config is not required
if "signing-keys" in self._config:
self._signing_key_uris = dict(self._config.items("signing-keys"))
else:
self._signing_key_uris = {}
# probe for pykcs11lib if it's not set
try:
self.pykcs11lib = self._config["settings"]["pykcs11lib"]
except KeyError:
for loc in LIBYKCS11_LOCATIONS.get(platform.system(), []):
if os.path.exists(loc):
self.pykcs11lib = loc
logger.debug("Using probed YKCS11 location %s", self.pykcs11lib)
break
else:
raise click.ClickException("Failed to find libykcs11")
# signer cache gets populated as they are used the first time
self._signers: dict[str, Signer] = {}
# detect or use configured base branch
self.base_branch = self._get_base_branch()
def _get_base_branch(self) -> str:
"""Get base branch from config or auto-detect"""
try:
# First try to get from config
return self._config["settings"]["base-branch"]
except KeyError:
pass
# Auto-detect using shared utility
return _get_base_branch_internal(remote=self.pull_remote)
def get_signer(self, key: Key) -> Signer:
"""Returns a Signer for the given public key
The signer sources are (in order):
* signers cached via set_signer()
* any configured signer from 'signing-keys' config section
* for sigstore type keys, a Signer is automatically created
* for any remaining keys, HSM is assumed and a signer is created
"""
def get_secret(secret: str) -> str:
msg = f"Enter {secret} to sign (provide touch/bio authentication if needed)"
# special case for tests -- prompt() will lockup trying to hide STDIN:
if not sys.stdin.isatty():
return sys.stdin.readline().rstrip()
return click.prompt(bold(msg), hide_input=True)
if key.keyid in self._signers:
return self._signers[key.keyid]
if key.keyid in self._signing_key_uris:
# signer is not cached yet, but config exists
uri = self._signing_key_uris[key.keyid]
return Signer.from_priv_key_uri(uri, key, get_secret)
if key.keytype == "sigstore-oidc":
# signer is not cached, no configuration was found, type is sigstore
return Signer.from_priv_key_uri("sigstore:?ambient=false", key, get_secret)
# signer is not cached, no configuration was found: assume Yubikey
return Signer.from_priv_key_uri("hsm:", key, get_secret)
def set_signer(self, key: Key, signer: Signer) -> None:
"""Cache a signer for a keyid
This should be called after a successful signing operation
"""
self._signers[key.keyid] = signer