-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathaction.py
More file actions
234 lines (175 loc) · 7.02 KB
/
action.py
File metadata and controls
234 lines (175 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import logging
import os
import shlex
import sys
from datetime import datetime
from glob import glob
from pathlib import Path
from typing import NoReturn
from prettytable import PrettyTable, TableStyle
from pypi_attestations import Attestation, Distribution
from sigstore import oidc
from sigstore.models import ClientTrustConfig
from sigstore.sign import SigningContext
logger = logging.getLogger(__name__)
def _fatal(title: str, *, detail: str, tip: str | None = None) -> NoReturn:
"""
Report a fatal error to GitHub Actions and exit.
"""
summary_message = f"### ❌ Fatal: {title}\n\n{detail}"
if tip:
summary_message = f"{summary_message}\n\n> [!TIP]\n> {tip}"
_summary(summary_message)
print("::error title={}::{}".format(title, detail))
sys.exit(1)
def _summary(msg: str) -> None:
"""
Write a message to the GitHub Actions job summary, if available.
"""
if not (step_summary := os.getenv("GITHUB_STEP_SUMMARY")):
return
with Path(step_summary).open("a") as io:
print(msg, file=io)
def _get_input(name: str) -> str | None:
"""
Get an action input from the environment, or `None` if not set.
"""
env = f"ATTEST_ACTION_INPUT_{name.upper().replace('-', '_')}"
return os.getenv(env)
def _get_path_patterns() -> set[str]:
"""
Retrieve and normalize the 'paths' input.
Paths are split on whitespace (with shell lexing rules), and any bare directory
paths are normalized to include all files within that directory (i.e. `foo/`
becomes `foo/*`).
"""
raw_paths = _get_input("paths")
if not raw_paths:
_fatal(
"No 'paths' input provided",
detail="The `paths` input is required but was not provided.",
tip="Specify one or more paths or glob patterns in the `paths` input.",
)
paths = shlex.split(raw_paths)
if not paths:
_fatal(
"No paths provided in 'paths' input",
detail="The `paths` input was provided but contained no valid paths.",
tip="Specify one or more paths or glob patterns in the `paths` input.",
)
# Normalize `foo/` to `foo/*`
paths = [str(Path(p) / "*") if p.endswith(("/", "\\")) else p for p in paths]
return set(paths)
def _unroll_files(patterns: set[str]) -> set[Path]:
"""
Given one or more path patterns (which may include glob patterns), unroll and
return all matching files.
"""
files = set()
for pattern in patterns:
for path in glob(pattern, recursive=True):
path = Path(path)
if path.is_file():
files.add(path)
return files
def _collect_dists(patterns: set[str]) -> list[tuple[Path, Distribution]]:
"""
Given one or more path patterns (which may include glob patterns), collect and
return all Python distributions found at those paths.
A bare directory path like `foo/` is treated as `foo/*`, i.e.
all distributions within that directory.
Distributions are returned as a list of tuples, where each tuple contains
the `Path` to the distribution file and the corresponding `Distribution`
object.
"""
files = _unroll_files(patterns)
dists = []
for file in files:
try:
dist = Distribution.from_file(file)
dists.append((file, dist))
except Exception as _:
logger.debug(f"skipping non-distribution file: {file}")
continue
return dists
def _get_id_token() -> oidc.IdentityToken:
"""
Obtain the ambient OIDC identity token.
"""
try:
id_token = oidc.detect_credential()
except Exception as exc:
_fatal(
"Failed to obtain OIDC token",
detail=f"Could not detect an ambient OIDC credential.\n\nCause: {exc}",
tip="Ensure that your job has the `id-token: write` permission set.",
)
if not id_token:
_fatal(
"Failed to obtain OIDC token",
detail="The environment does not appear to support ambient OIDC credentials.",
tip="This action must be run within GitHub Actions.",
)
return oidc.IdentityToken(raw_token=id_token)
def _attest(
dists: list[tuple[Path, Distribution]],
id_token: oidc.IdentityToken,
overwrite: bool = False,
) -> list[tuple[Distribution, Attestation]]:
"""
Generate and write PEP 740 publish attestations for the given distributions.
If `overwrite` is `False`, existing attestation files will not be overwritten
and an error will be raised instead.
"""
if not dists:
_fatal(
"No distributions to attest",
detail="No valid Python distributions were collected from the specified paths.",
tip="Ensure that the `paths` input points to valid distribution files.",
)
# Before setting up any signing state, precompute the paths we intend
# to write attestations to (and fail if any already exist and overwrite
# is disabled).
dists_with_dests: list[tuple[Path, Distribution, Path]] = []
for file, dist in dists:
parent = file.parent
filename = file.name
attestation_name = f"{filename}.publish.attestation"
attestation_path = parent / attestation_name
if attestation_path.exists() and not overwrite:
_fatal(
"Attestation file conflict",
detail=f"Attestation file already exists: `{attestation_path}`",
tip="Set `overwrite: true` to overwrite existing attestation files.",
)
dists_with_dests.append((file, dist, attestation_path))
trust = ClientTrustConfig.production()
context = SigningContext.from_trust_config(trust)
attestations = []
with context.signer(identity_token=id_token) as signer:
for _, dist, attestation_path in dists_with_dests:
attestation = Attestation.sign(signer, dist)
attestation_path.write_text(attestation.model_dump_json())
attestations.append(((dist, attestation)))
return attestations
def main() -> None:
path_patterns = _get_path_patterns()
dists = _collect_dists(path_patterns)
id_token = _get_id_token()
overwrite = _get_input("overwrite") == "true"
attestations = _attest(dists, id_token, overwrite=overwrite)
_summary("## attest-action")
table = PrettyTable()
table.set_style(TableStyle.MARKDOWN)
table.field_names = ["Distribution", "Transparency Log Entry", "Integration Time"]
for dist, attestation in attestations:
log_entry = attestation.verification_material.transparency_entries[0]
log_index = log_entry["logIndex"]
log_index_url = f"https://search.sigstore.dev/?logIndex={log_index}"
log_index_link = f"[{log_index}]({log_index_url})"
integrated_time = int(log_entry["integratedTime"])
integrated_time_str = datetime.fromtimestamp(integrated_time).isoformat()
table.add_row([f"`{dist.name}`", log_index_link, integrated_time_str])
_summary(str(table))
if __name__ == "__main__":
main()