1818from cryptography .hazmat .primitives import hashes , serialization
1919from cryptography .hazmat .primitives .asymmetric import ec , ed25519 , padding , rsa
2020from cryptography .hazmat .primitives .asymmetric .types import PublicKeyTypes
21+ from cryptography .hazmat .primitives .asymmetric .utils import decode_dss_signature , encode_dss_signature
2122from cryptography .x509 import Certificate , NameOID
2223
2324from .logging_config import logger
3031ALG_PS256 = - 37
3132X5CHAIN_HEADER = 33
3233
33- # Map COSE algorithm IDs to EC curve / hash pairs for verification
34- _EC_ALG_MAP : dict [int , tuple [type [ec .EllipticCurve ], type [hashes .HashAlgorithm ]]] = {
35- ALG_ES256 : (ec .SECP256R1 , hashes .SHA256 ),
36- ALG_ES384 : (ec .SECP384R1 , hashes .SHA384 ),
37- ALG_ES512 : (ec .SECP521R1 , hashes .SHA512 ),
34+ # Map COSE algorithm IDs to ( EC curve, hash, R||S component size in bytes)
35+ _EC_ALG_MAP : dict [int , tuple [type [ec .EllipticCurve ], type [hashes .HashAlgorithm ], int ]] = {
36+ ALG_ES256 : (ec .SECP256R1 , hashes .SHA256 , 32 ),
37+ ALG_ES384 : (ec .SECP384R1 , hashes .SHA384 , 48 ),
38+ ALG_ES512 : (ec .SECP521R1 , hashes .SHA512 , 66 ),
3839}
3940
41+ # Reverse map: EC curve name to R||S component size
42+ _EC_COMPONENT_SIZE : dict [str , int ] = {
43+ "secp256r1" : 32 ,
44+ "secp384r1" : 48 ,
45+ "secp521r1" : 66 ,
46+ }
47+
48+
49+ def _raw_to_der_ecdsa (raw_sig : bytes , component_size : int ) -> bytes :
50+ """Convert a COSE raw R||S ECDSA signature to DER format."""
51+ r = int .from_bytes (raw_sig [:component_size ], "big" )
52+ s = int .from_bytes (raw_sig [component_size :], "big" )
53+ return encode_dss_signature (r , s )
54+
55+
56+ def _der_to_raw_ecdsa (der_sig : bytes , component_size : int ) -> bytes :
57+ """Convert a DER-encoded ECDSA signature to COSE raw R||S format."""
58+ r , s = decode_dss_signature (der_sig )
59+ return r .to_bytes (component_size , "big" ) + s .to_bytes (component_size , "big" )
60+
4061
4162class Signer (Protocol ):
4263 """Protocol for abstract signing implementations (e.g., AWS KMS, Azure Key Vault)."""
@@ -94,6 +115,9 @@ def _sign_with_key(key: SigningKey, data: bytes) -> bytes:
94115def _verify_with_public_key (pub : PublicKeyTypes , signature : bytes , data : bytes , alg : int ) -> None :
95116 """Verify a signature using the correct algorithm for the key/alg pair.
96117
118+ Handles both COSE raw R||S format (RFC 9053) and DER-encoded ECDSA
119+ signatures for backwards compatibility.
120+
97121 Raises InvalidSignature on failure.
98122 """
99123 if isinstance (pub , ed25519 .Ed25519PublicKey ):
@@ -103,8 +127,15 @@ def _verify_with_public_key(pub: PublicKeyTypes, signature: bytes, data: bytes,
103127 ec_entry = _EC_ALG_MAP .get (alg )
104128 if ec_entry is None :
105129 raise ValueError (f"COSE alg { alg } is not an EC algorithm" )
106- _ , hash_cls = ec_entry
107- pub .verify (signature , data , ec .ECDSA (hash_cls ()))
130+ _ , hash_cls , component_size = ec_entry
131+ # COSE uses raw R||S (RFC 9053). Convert to DER for cryptography lib.
132+ expected_raw_len = component_size * 2
133+ if len (signature ) == expected_raw_len :
134+ der_sig = _raw_to_der_ecdsa (signature , component_size )
135+ else :
136+ # Already DER-encoded (legacy Encypher-signed content)
137+ der_sig = signature
138+ pub .verify (der_sig , data , ec .ECDSA (hash_cls ()))
108139 return
109140 if isinstance (pub , rsa .RSAPublicKey ):
110141 pub .verify (
@@ -131,6 +162,9 @@ def _build_sign1(protected_bstr: bytes, unprotected: dict, payload: bytes, signa
131162
132163def _parse_sign1 (cose_bytes : bytes ) -> tuple [bytes , dict , Optional [bytes ], bytes ]:
133164 arr = cbor2 .loads (cose_bytes )
165+ # COSE_Sign1 may be wrapped in CBOR tag 18 per RFC 9052 §4.1.
166+ if isinstance (arr , cbor2 .CBORTag ) and arr .tag == 18 :
167+ arr = arr .value
134168 if not isinstance (arr , list ) or len (arr ) != 4 :
135169 raise ValueError ("Invalid COSE_Sign1 structure" )
136170 protected_bstr , unprotected , payload , signature = arr
@@ -268,9 +302,16 @@ def sign_c2pa_cose(
268302 protected_bstr = _encode_protected (protected_header )
269303 to_sign = _sig_structure (protected_bstr , payload_bytes )
270304
271- signature = _sign_with_key (private_key , to_sign )
305+ signature = cast (bytes , _sign_with_key (private_key , to_sign ))
306+
307+ # COSE requires raw R||S format for ECDSA (RFC 9053 Section 2.1).
308+ # Python's cryptography library returns DER; convert if EC key.
309+ if isinstance (private_key , ec .EllipticCurvePrivateKey ):
310+ component_size = _EC_COMPONENT_SIZE .get (private_key .curve .name )
311+ if component_size :
312+ signature = _der_to_raw_ecdsa (signature , component_size )
272313
273- encoded_cose = _build_sign1 (protected_bstr , unprotected_header , payload_bytes , cast ( bytes , signature ) )
314+ encoded_cose = _build_sign1 (protected_bstr , unprotected_header , payload_bytes , signature )
274315
275316 # If timestamp authority URL is provided, request a timestamp
276317 if timestamp_authority_url :
@@ -482,7 +523,10 @@ def verify_c2pa_cose(
482523 x5chain = unprotected .get (X5CHAIN_HEADER )
483524 if not x5chain :
484525 raise ValueError ("No public key provided and no x5chain in COSE message" )
485- cert = x509 .load_der_x509_certificate (x5chain [0 ])
526+ # Per RFC 9360, x5chain is a single CBOR bstr for one certificate
527+ # or a CBOR array of bstr for a chain. Extract the leaf (first) cert.
528+ leaf_der = x5chain if isinstance (x5chain , bytes ) else x5chain [0 ]
529+ cert = x509 .load_der_x509_certificate (leaf_der )
486530 verify_key = cert .public_key ()
487531
488532 try :
@@ -670,17 +714,22 @@ def extract_certificates_from_cose(cose_bytes: bytes) -> list[Certificate]:
670714 ValueError: If the message is not a valid COSE_Sign1 structure or contains no certificates.
671715 """
672716 protected_bstr , unprotected , payload , _signature = _parse_sign1 (cose_bytes )
673- if payload is None :
674- raise ValueError ("Message is not a COSE_Sign1 structure." )
675717
676- # Extract certificates from the unprotected header (x5chain)
718+ # Extract certificates from the unprotected header (x5chain).
719+ # The payload may be None for detached-payload COSE_Sign1 structures
720+ # (required by C2PA spec); certificates live in the unprotected header
721+ # regardless of whether the payload is inline or detached.
677722 certificates = []
678723 x5chain = unprotected .get (X5CHAIN_HEADER )
679724
680725 if not x5chain :
681726 raise ValueError ("No X.509 certificates found in COSE message." )
682727
683- # Parse each certificate in the chain
728+ # Per RFC 9360, x5chain is a single CBOR bstr for one certificate
729+ # or a CBOR array of bstr for a chain. Normalise to a list.
730+ if isinstance (x5chain , bytes ):
731+ x5chain = [x5chain ]
732+
684733 for cert_bytes in x5chain :
685734 try :
686735 cert = x509 .load_der_x509_certificate (cert_bytes )
0 commit comments