|
1 | 1 | import functools |
2 | | -import json |
3 | | -import os |
4 | | -import subprocess |
5 | | -import time |
6 | | -from base64 import b64decode |
7 | | -from collections.abc import Callable |
8 | | -from datetime import datetime, timedelta |
9 | | -from pathlib import Path |
10 | | -from tempfile import TemporaryDirectory |
11 | | -from typing import Any |
12 | 2 |
|
13 | 3 | import pytest |
14 | 4 | from sigstore import oidc |
15 | | - |
16 | | - |
17 | | -def pytest_addoption(parser: pytest.Parser) -> None: |
18 | | - parser.addoption( |
19 | | - "--min-id-token-validity", |
20 | | - action="store", |
21 | | - help="Minimum validity of the identity token in seconds", |
22 | | - type=lambda x: timedelta(seconds=int(x)), |
23 | | - default=timedelta(seconds=20), |
24 | | - ) |
25 | | - |
26 | | - |
27 | | -def _jwt_cache() -> Callable[[Callable[..., Any]], Callable[..., Any]]: |
28 | | - def _decorator( |
29 | | - fn: Callable[[pytest.Config], oidc.IdentityToken], |
30 | | - ) -> Callable[[pytest.Config], oidc.IdentityToken]: |
31 | | - @functools.wraps(fn) |
32 | | - def _wrapped(pytestconfig: pytest.Config) -> oidc.IdentityToken: |
33 | | - # Cache the token for the duration of the test run, |
34 | | - # as long as the returned token is not yet expired |
35 | | - if hasattr(_wrapped, "token"): |
36 | | - assert isinstance(_wrapped.token, oidc.IdentityToken) |
37 | | - min_validity = pytestconfig.getoption("--min-id-token-validity") |
38 | | - if _is_valid_at(_wrapped.token, datetime.now() + min_validity): |
39 | | - return _wrapped.token |
40 | | - |
41 | | - token = fn(pytestconfig) |
42 | | - setattr(_wrapped, "token", token) |
43 | | - return token |
44 | | - |
45 | | - return _wrapped |
46 | | - |
47 | | - return _decorator |
48 | | - |
49 | | - |
50 | | -def _is_valid_at(token: oidc.IdentityToken, reference_time: datetime) -> bool: |
51 | | - # split token, b64 decode (with padding), parse as json, validate expiry |
52 | | - payload = str(token).split(".")[1] |
53 | | - payload += "=" * (4 - len(payload) % 4) |
54 | | - payload_json = json.loads(b64decode(payload)) |
55 | | - |
56 | | - expiry = datetime.fromtimestamp(payload_json["exp"]) |
57 | | - return reference_time < expiry |
| 5 | +from urllib3 import request |
58 | 6 |
|
59 | 7 |
|
60 | 8 | @pytest.fixture |
61 | | -@_jwt_cache() |
62 | | -def id_token(pytestconfig: pytest.Config) -> oidc.IdentityToken: |
63 | | - # following code is modified from extremely-dangerous-public-oidc-beacon download-token.py. |
64 | | - # Caching can be made smarter (to return the cached token only if it is valid) if token |
65 | | - # starts going invalid during runs |
66 | | - MIN_VALIDITY = timedelta(seconds=20) |
67 | | - MAX_RETRY_TIME = timedelta(minutes=5 if os.getenv("CI") else 1) |
68 | | - RETRY_SLEEP_SECS = 30 if os.getenv("CI") else 5 |
69 | | - GIT_URL = "https://github.com/sigstore-conformance/extremely-dangerous-public-oidc-beacon.git" |
70 | | - |
71 | | - def git_clone(url: str, dir: str) -> None: |
72 | | - base_cmd = ["git", "clone", "--quiet", "--branch", "current-token", "--depth", "1"] |
73 | | - subprocess.run(base_cmd + [url, dir], check=True) |
74 | | - |
75 | | - start_time = datetime.now() |
76 | | - while datetime.now() <= start_time + MAX_RETRY_TIME: |
77 | | - with TemporaryDirectory() as tempdir: |
78 | | - git_clone(GIT_URL, tempdir) |
79 | | - |
80 | | - with Path(tempdir, "oidc-token.txt").open() as f: |
81 | | - token = oidc.IdentityToken(f.read().rstrip()) |
82 | | - |
83 | | - if _is_valid_at(token, datetime.now() + MIN_VALIDITY): |
84 | | - return token |
85 | | - |
86 | | - print(f"Current token expires too early, retrying in {RETRY_SLEEP_SECS} seconds.") |
87 | | - time.sleep(RETRY_SLEEP_SECS) |
88 | | - |
89 | | - raise TimeoutError(f"Failed to find a valid token in {MAX_RETRY_TIME}") |
| 9 | +@functools.cache |
| 10 | +def id_token() -> oidc.IdentityToken: |
| 11 | + resp = request( |
| 12 | + "GET", |
| 13 | + "https://storage.googleapis.com/sigstore-conformance-testing-token/untrusted-testing-token.txt", |
| 14 | + timeout=30.0, |
| 15 | + ) |
| 16 | + return oidc.IdentityToken(resp.data.decode().rstrip()) |
0 commit comments