33"""
44
55import base64
6+ import binascii
67import datetime
78import hashlib
89import json
1314import dateutil .parser
1415
1516from mobster .error import SBOMError
16- from mobster .image import Image
17+ from mobster .image import parse_image_reference
1718
1819logger = logging .getLogger (__name__ )
1920
2021
21- class Provenance02 :
class SLSAParsingError(Exception):
    """
    Signals that SLSA provenance data could not be parsed.
    """
26+
2427
25- Attributes:
26- predicate (dict): The attestation predicate.
28+ class SLSAProvenance :
2729 """
30+ Class for parsing and accessing SLSA provenance data.
2831
29- predicate_type = "https://slsa.dev/provenance/v0.2"
32+ Parses SLSA provenance payloads and provides access to build metadata
33+ and SBOM digest mappings for container images.
34+ """
3035
31- def __init__ (self , predicate : dict [str , Any ]) -> None :
32- self .predicate = predicate
36+ def __init__ (
37+ self , build_finished_on : datetime .datetime | None , sbom_digests : dict [str , str ]
38+ ) -> None :
39+ self ._build_finished_on = build_finished_on
40+ self ._sbom_digests : dict [str , str ] = sbom_digests
3341
@staticmethod
def parse(attestation: bytes) -> "SLSAProvenance":
    """
    Parse a raw in-toto attestation into an SLSAProvenance.
    https://github.com/in-toto/attestation/blob/main/spec/README.md#in-toto-attestation-framework-spec

    The input is a JSON envelope whose "payload" field holds the
    base64-encoded in-toto statement. The statement's "predicateType"
    selects which SLSA provenance parser handles its "predicate".

    Args:
        attestation: Bytes containing data of an in-toto attestation.
            E.g. a line of output from "cosign verify-attestation".

    Returns:
        An SLSAProvenance populated from the statement's predicate.

    Raises:
        SLSAParsingError: If the SLSA version is not supported, the
            statement is missing a predicateType field, the statement or
            its predicate is not a JSON object, or some required
            provenance content cannot be decoded.

    Errors raised while decoding the outer envelope itself (invalid JSON,
    missing "payload" key, invalid base64) are not converted here and
    propagate as raised by json / base64.
    """
    encoded = json.loads(attestation)
    statement = json.loads(base64.b64decode(encoded["payload"]))

    # Anything other than a JSON object would fail on .get() below with an
    # unhelpful AttributeError, so reject it explicitly.
    if not isinstance(statement, dict):
        raise SLSAParsingError(
            f"Expected statement to be a JSON object, got {type(statement).__name__}"
        )

    predicate_type = statement.get("predicateType")
    if predicate_type is None:
        raise SLSAParsingError(
            'Statement is missing required "predicateType" field'
        )

    # The predicate may be omitted from a statement; treat an unset
    # predicate as an empty one so the version parsers can rely on .get().
    predicate = statement.get("predicate")
    if predicate is None:
        predicate = {}
    elif not isinstance(predicate, dict):
        raise SLSAParsingError(
            f"Expected predicate to be a JSON object, got {type(predicate).__name__}"
        )

    if predicate_type == "https://slsa.dev/provenance/v0.2":
        return SLSAProvenance._parse_v02(predicate)
    if predicate_type == "https://slsa.dev/provenance/v1":
        return SLSAProvenance._parse_v1(predicate)

    raise SLSAParsingError(
        f"Cannot parse SLSA provenance with predicateType {predicate_type}."
    )
6776
68- def get_sbom_digest (self , image : Image ) -> str :
77+ @staticmethod
78+ def _parse_v02 (predicate : Any ) -> "SLSAProvenance" :
6979 """
70- Find the SBOM_BLOB_URL value in the provenance for the supplied image.
80+ Parse an SLSA provenance v0.2 from an in-toto attestation's predicate
81+ field.
82+ https://github.com/in-toto/attestation/blob/main/spec/README.md#in-toto-attestation-framework-spec
83+
84+ Spec of the provenance can be found in https://slsa.dev/provenance/v0.2
7185
7286 Args:
73- image: The image to find the SBOM digest for
87+ predicate: Contents of the "predicate" field of the in-toto
88+ attestation's statement parsed into a dictionary object.
89+
90+ Returns:
91+ An SLSAProvenance object populated with data from the predicate.
7492 """
93+ # parse build_finished_on
94+
95+ finished_on : str | None = predicate .get ("metadata" , {}).get ("buildFinishedOn" )
96+ if finished_on :
97+ build_finished_on = dateutil .parser .isoparse (finished_on )
98+ else :
99+ build_finished_on = None
100+
101+ # map image digests to sbom blob digests
75102 sbom_blob_urls : dict [str , str ] = {}
76- tasks = self . predicate .get ("buildConfig" , {}).get ("tasks" , [])
103+ tasks = predicate .get ("buildConfig" , {}).get ("tasks" , [])
77104 for task in tasks :
78105 curr_digest , sbom_url = "" , ""
79106 for result in task .get ("results" , []):
@@ -83,13 +110,102 @@ def get_sbom_digest(self, image: Image) -> str:
83110 curr_digest = result .get ("value" )
84111 if not all ([curr_digest , sbom_url ]):
85112 continue
86- sbom_blob_urls [curr_digest ] = sbom_url
87113
88- blob_url = sbom_blob_urls .get (image .digest )
89- if blob_url is None :
90- raise SBOMError (f"No SBOM_BLOB_URL found in attestation for image { image } ." )
114+ sbom_blob_urls [curr_digest ] = sbom_url .split ("@" , 1 )[1 ]
115+
116+ return SLSAProvenance (build_finished_on , sbom_blob_urls )
117+
@staticmethod
def _parse_v1(predicate: Any) -> "SLSAProvenance":
    """
    Parse an SLSA provenance v1 from an in-toto attestation's predicate
    field.
    https://github.com/in-toto/attestation/blob/main/spec/README.md#in-toto-attestation-framework-spec

    Spec of the provenance can be found in https://slsa.dev/provenance/v1

    The build timestamp is read from runDetails.metadata.finishedOn. The
    image-to-SBOM mapping is built from the runDetails.byproducts entries
    named "taskRunResults/IMAGE_REF" and "taskRunResults/SBOM_BLOB_URL",
    which are paired up by the repository part of their references.

    Args:
        predicate: Contents of the "predicate" field of the in-toto
            attestation's statement parsed into a dictionary object.

    Returns:
        An SLSAProvenance object populated with data from the predicate.

    Raises:
        SLSAParsingError: If a relevant byproduct has no "content" field,
            its content cannot be base64/JSON decoded, or the decoded
            content is not a string.
    """
    run_details = predicate.get("runDetails", {})

    finished_on: str | None = run_details.get("metadata", {}).get("finishedOn")
    if finished_on:
        build_finished_on = dateutil.parser.isoparse(finished_on)
    else:
        build_finished_on = None

    # Both maps are keyed by repository; they are joined below.
    image_digests: dict[str, str] = {}
    sbom_digests: dict[str, str] = {}

    for byproduct in run_details.get("byproducts", []):
        name = byproduct.get("name", "")
        if name not in (
            "taskRunResults/IMAGE_REF",
            "taskRunResults/SBOM_BLOB_URL",
        ):
            continue

        content = byproduct.get("content")
        if content is None:
            raise SLSAParsingError(
                f'Byproduct with name {name} is missing "content" field'
            )

        # Content is base64 wrapping a JSON document. ValueError covers
        # binascii.Error, json.JSONDecodeError and UnicodeDecodeError
        # (bytes that are not valid text); TypeError covers content that
        # is not a string/bytes value at all.
        try:
            decoded = json.loads(base64.b64decode(content))
        except (TypeError, ValueError) as err:
            raise SLSAParsingError(
                f"Failed to decode {name} content: {err}"
            ) from err

        if not isinstance(decoded, str):
            raise SLSAParsingError(
                f"Expected string content for {name}, got {type(decoded).__name__}"
            )

        repository, digest = parse_image_reference(decoded)
        if name == "taskRunResults/IMAGE_REF":
            # The first image reference seen for a repository is kept.
            image_digests.setdefault(repository, digest)
        elif name == "taskRunResults/SBOM_BLOB_URL":
            # The last SBOM reference seen for a repository is kept.
            sbom_digests[repository] = digest

    # Pair image digest with SBOM digest via the shared repository; SBOM
    # entries without a matching image reference are dropped.
    sbom_blob_urls = {
        image_digests[repo]: sbom_digest
        for repo, sbom_digest in sbom_digests.items()
        if repo in image_digests
    }

    return SLSAProvenance(build_finished_on, sbom_blob_urls)
185+
186+ @property
187+ def build_finished_on (self ) -> datetime .datetime | None :
188+ """
189+ Get the timestamp when the build finished.
190+
191+ Returns:
192+ The build completion timestamp, or None if the timestamp was not
193+ available in the provenance data.
194+ """
195+ return self ._build_finished_on
196+
197+ def sbom_digest (self , image_digest : str ) -> str | None :
198+ """
199+ Get the SBOM digest for a given image digest.
200+
201+ Args:
202+ image_digest: SHA256 digest of the container image
203+
204+ Returns:
205+ The corresponding SBOM digest, or None if not found in the
206+ provenance data.
207+ """
208+ return self ._sbom_digests .get (image_digest )
93209
94210
95211class SBOMFormat (Enum ):
0 commit comments