7
7
8
8
logger = logging .getLogger ("pymdoccbor" )
9
9
10
- from pycose .headers import Algorithm
11
- from pycose .keys import CoseKey
12
-
13
- from datetime import timezone
14
-
15
10
from pycose .headers import Algorithm #, KID
16
11
from pycose .keys import CoseKey , EC2Key
17
-
18
12
from pycose .messages import Sign1Message
19
13
20
14
from typing import Union
21
15
22
-
23
16
from pymdoccbor .exceptions import MsoPrivateKeyRequired
24
17
from pymdoccbor import settings
25
18
from pymdoccbor .x509 import MsoX509Fabric
26
19
from pymdoccbor .tools import shuffle_dict
27
20
from cryptography import x509
28
21
from cryptography .hazmat .primitives import serialization
22
+ from cryptography .x509 import Certificate
29
23
30
24
31
25
from cbor_diag import *
@@ -40,7 +34,6 @@ def __init__(
40
34
self ,
41
35
data : dict ,
42
36
validity : dict ,
43
- revocation : str = None ,
44
37
cert_path : str = None ,
45
38
key_label : str = None ,
46
39
user_pin : str = None ,
@@ -51,13 +44,13 @@ def __init__(
51
44
hsm : bool = False ,
52
45
private_key : Union [dict , CoseKey ] = None ,
53
46
digest_alg : str = settings .PYMDOC_HASHALG ,
47
+ revocation : dict = None
54
48
) -> None :
55
49
"""
56
50
Initialize a new MsoIssuer
57
51
58
52
:param data: dict: the data to sign
59
53
:param validity: validity: the validity info of the mso
60
- :param revocation: str: the revocation status
61
54
:param cert_path: str: the path to the certificate
62
55
:param key_label: str: key label
63
56
:param user_pin: str: user pin
@@ -68,6 +61,7 @@ def __init__(
68
61
:param hsm: bool: hardware security module
69
62
:param private_key: Union[dict, CoseKey]: the signing key
70
63
:param digest_alg: str: the digest algorithm
64
+ :param revocation: dict: revocation status dict to include in the mso, it may include status_list and identifier_list keys
71
65
"""
72
66
73
67
if not hsm :
@@ -82,10 +76,10 @@ def __init__(
82
76
raise ValueError ("private_key must be a dict or CoseKey object" )
83
77
else :
84
78
raise MsoPrivateKeyRequired ("MSO Writer requires a valid private key" )
85
-
79
+
86
80
if not validity :
87
81
raise ValueError ("validity must be present" )
88
-
82
+
89
83
if not alg :
90
84
raise ValueError ("alg must be present" )
91
85
@@ -208,19 +202,32 @@ def sign(
208
202
"deviceKeyInfo" : {
209
203
"deviceKey" : device_key ,
210
204
},
211
- "digestAlgorithm" : alg_map .get (self .alg ),
205
+ "digestAlgorithm" : alg_map .get (self .alg )
212
206
}
213
-
214
207
if self .revocation is not None :
215
208
payload .update ({"status" : self .revocation })
216
209
217
210
if self .cert_path :
218
- # Load the DER certificate file
211
+ # Try to load the certificate file
219
212
with open (self .cert_path , "rb" ) as file :
220
213
certificate = file .read ()
221
-
222
- cert = x509 .load_der_x509_certificate (certificate )
223
-
214
+ _parsed_cert : Union [Certificate , None ] = None
215
+ try :
216
+ _parsed_cert = x509 .load_pem_x509_certificate (certificate )
217
+ except Exception as e :
218
+ logger .error (f"Certificate at { self .cert_path } could not be loaded as PEM, trying DER" )
219
+
220
+ if not _parsed_cert :
221
+ try :
222
+ _parsed_cert = x509 .load_der_x509_certificate (certificate )
223
+ except Exception as e :
224
+ _err_msg = f"Certificate at { self .cert_path } could not be loaded as DER"
225
+ logger .error (_err_msg )
226
+
227
+ if _parsed_cert :
228
+ cert = _parsed_cert
229
+ else :
230
+ raise Exception (f"Certificate at { self .cert_path } failed parse" )
224
231
_cert = cert .public_bytes (getattr (serialization .Encoding , "DER" ))
225
232
else :
226
233
_cert = self .selfsigned_x509cert ()
0 commit comments