22#
33# SPDX-License-Identifier: Apache-2.0
44
5- from dataclasses import dataclass , field
6- from typing import Optional , Set , List , Tuple , Dict
5+ """Logic that implements a mock CA suitable for testing purposes."""
76
87import logging
98import sys
109from dataclasses import dataclass , field
1110from typing import Dict , List , Optional , Set , Tuple
1211
13- sys .path .append ('.' )
14-
1512from cryptography import x509
1613from flask import Flask , Response , request
14+ from pyasn1 .codec .der import decoder , encoder
15+ from pyasn1_alt_modules import rfc9480
16+
17+ sys .path .append ('.' )
1718from pq_logic .hybrid_issuing import build_chameleon_from_p10cr , build_sun_hybrid_cert_from_request
1819from pq_logic .hybrid_sig import sun_lamps_hybrid_scheme_00
1920from pq_logic .hybrid_sig .sun_lamps_hybrid_scheme_00 import get_sun_hybrid_alt_sig
2021from pq_logic .py_verify_logic import verify_hybrid_pkimessage_protection
21- from pyasn1 .codec .der import decoder , encoder
22- from pyasn1_alt_modules import rfc9480
2322from resources .ca_ra_utils import (
2423 build_cp_cmp_message ,
2524 build_cp_from_p10cr ,
@@ -134,6 +133,7 @@ def _build_error_from_exception(e: CMPTestSuiteError) -> rfc9480.PKIMessage:
134133
135134
136135class CAHandler :
136+ """Mock CA class"""
137137
138138 def __init__ (self , ca_cert : rfc9480 .CMPCertificate , ca_key : PrivateKey ,
139139 config : dict , ca_alt_key : Optional [PrivateKey ] = None ):
@@ -191,29 +191,30 @@ def process_normal_request(self, pki_message: rfc9480.PKIMessage) -> rfc9480.PKI
191191
192192 :return: The PKI message containing the response.
193193 """
194- logging .debug (f"Processing request with body: { pki_message ['body' ].getName ()} " )
194+ body_type = pki_message ["body" ].getName ()
195+ logging .debug ("Processing request with body: %s" , body_type )
195196 try :
196- if pki_message [ "body" ]. getName () == "rr" :
197+ if body_type == "rr" :
197198 response = self .process_rr (pki_message )
198- elif pki_message [ "body" ]. getName () == "certConf" :
199+ elif body_type == "certConf" :
199200 response = self .process_cert_conf (pki_message )
200- elif pki_message [ "body" ]. getName () == "kur" :
201+ elif body_type == "kur" :
201202 response = self .process_kur (pki_message )
202- elif pki_message [ "body" ]. getName () == "genm" :
203+ elif body_type == "genm" :
203204 response = self .process_genm (pki_message )
204- elif pki_message [ "body" ]. getName () == "cr" :
205+ elif body_type == "cr" :
205206 response = self .process_cr (pki_message )
206- elif pki_message [ "body" ]. getName () == "ir" :
207+ elif body_type == "ir" :
207208 response = self .process_ir (pki_message )
208- elif pki_message [ "body" ]. getName () == "p10cr" :
209+ elif body_type == "p10cr" :
209210 response = self .process_p10cr (pki_message )
210211 else :
211- raise NotImplementedError (f"Method not implemented, to handle the "
212- f"provided message: { pki_message ['body' ].getName ()} ." )
212+ raise NotImplementedError (f"Cannot handle: { body_type } " )
213213 except CMPTestSuiteError as e :
214214 return _build_error_from_exception (e )
215215 except Exception as e :
216- return _build_error_from_exception (CMPTestSuiteError (f"An error occurred: { str (e )} " , failinfo = "systemFailure" ))
216+ return _build_error_from_exception (CMPTestSuiteError (f"An error occurred: "
217+ f"{ str (e )} " , failinfo = "systemFailure" ))
217218
218219 return self .sign_response (response = response , request = pki_message )
219220
@@ -320,15 +321,15 @@ def process_ir(self, pki_message: rfc9480.PKIMessage) -> rfc9480.PKIMessage:
320321 :return: The PKI message containing the response.
321322 """
322323 logging .debug ("Processing IR message" )
323- logging .debug ("CA Key: {}" . format ( self .ca_key ) )
324+ logging .debug ("CA Key: %s" , self .ca_key )
324325
325326 pki_message , certs = build_ip_cmp_message (
326327 request = pki_message ,
327328 ca_cert = self .ca_cert ,
328329 ca_key = self .ca_key ,
329330 implicit_confirm = True ,
330331 )
331- logging .debug ("RESPONSE: {}" . format ( pki_message .prettyPrint () ))
332+ logging .debug ("RESPONSE: %s" , pki_message .prettyPrint ())
332333 self .state .store_transaction_certificate (
333334 transaction_id = pki_message ["header" ]["transactionID" ].asOctets (),
334335 sender = pki_message ["header" ]["sender" ],
@@ -353,9 +354,8 @@ def process_chameleon(self, pki_message: rfc9480.PKIMessage) -> rfc9480.PKIMessa
353354 certs = [paired_cert , delta_cert ],
354355 )
355356 return pki_message
356- else :
357- raise NotImplementedError ("Not implemented to handle a chameleon request with body: {}"
358- .format (pki_message ["body" ].getName ()))
357+
358+ raise NotImplementedError ("Only p10cr is supported for Chameleon" )
359359
360360
361361 def process_sun_hybrid (self , pki_message : rfc9480 .PKIMessage ) -> rfc9480 .PKIMessage :
@@ -410,12 +410,14 @@ def process_multi_auth(self, pki_message: rfc9480.PKIMessage) -> rfc9480.PKIMess
410410
411411@app .route ("/pubkey/<serial_number>" , methods = ["GET" ])
412412def get_pubkey (serial_number ):
413+ """Retrieve a public key knowing the certificate serial number."""
413414 serial_number = int (serial_number )
414415 sun_hybrid_cert = state .sun_hybrid_state .sun_hybrid_pub_keys [serial_number ]
415416 return encoder .encode (sun_hybrid_cert )
416417
417418@app .route ("/sig/<serial_number>" , methods = ["GET" ])
418419def get_signature (serial_number ):
420+ """Retrieve a certificate's signature knowing its serial number."""
419421 serial_number = int (serial_number )
420422 alt_sig = state .sun_hybrid_state .sun_hybrid_signatures [serial_number ]
421423 return alt_sig
@@ -430,14 +432,13 @@ def handle_issuing() -> bytes:
430432 try :
431433 # Access the raw data from the request body
432434 data = request .get_data ()
433- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
435+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
434436 pki_message = handler .process_normal_request (pki_message )
435- logging .warning (f"Response: { pki_message .prettyPrint ()} " )
437+ logging .warning (f"Response: %s" , pki_message .prettyPrint ())
436438 response_data = encoder .encode (pki_message )
437439 return Response (response_data , content_type = "application/octet-stream" )
438440 except Exception as e :
439- # Handle any errors gracefully
440- return Response (f"Error: { str (e )} " , status = 500 , content_type = "text/plain" )
441+ return Response (f"Error: { e } " , status = 500 , content_type = "text/plain" )
441442
442443@app .route ("/chameleon" , methods = ["POST" ])
443444def handle_chameleon ():
@@ -446,7 +447,7 @@ def handle_chameleon():
446447 :return: The DER-encoded response.
447448 """
448449 data = request .get_data ()
449- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
450+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
450451 pki_message = handler .process_normal_request (pki_message )
451452 return handler .process_chameleon (
452453 pki_message = pki_message ,
@@ -459,7 +460,7 @@ def handle_sun_hybrid():
459460 :return: The DER-encoded response.
460461 """
461462 data = request .get_data ()
462- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
463+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
463464 pki_message = handler .process_normal_request (pki_message )
464465 return handler .process_sun_hybrid (
465466 pki_message = pki_message ,
@@ -472,7 +473,7 @@ def handle_multi_auth():
472473 :return: The DER-encoded response.
473474 """
474475 data = request .get_data ()
475- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
476+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
476477 pki_message = handler .process_normal_request (pki_message )
477478 return handler .process_multi_auth (
478479 pki_message = pki_message ,
@@ -485,7 +486,7 @@ def handle_cert_discovery():
485486 :return: The DER-encoded response.
486487 """
487488 data = request .get_data ()
488- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
489+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
489490 pki_message = handler .process_cert_discovery (pki_message )
490491 return pki_message
491492
@@ -496,7 +497,7 @@ def handle_related_cert():
496497 :return: The DER-encoded response.
497498 """
498499 data = request .get_data ()
499- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
500+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
500501 pki_message = handler .process_related_cert (pki_message )
501502 return pki_message
502503
@@ -507,7 +508,7 @@ def handle_catalyst_sig():
507508 :return: The DER-encoded response.
508509 """
509510 data = request .get_data ()
510- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
511+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
511512 pki_message = handler .process_catalyst_sig (pki_message )
512513 return pki_message
513514
@@ -518,12 +519,11 @@ def handle_catalyst():
518519 :return: The DER-encoded response.
519520 """
520521 data = request .get_data ()
521- pki_message , rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
522+ pki_message , _rest = decoder .decode (data , asn1Spec = rfc9480 .PKIMessage ())
522523 pki_message = handler .process_catalyst (pki_message )
523524 return pki_message
524525
525526
526527
527528if __name__ == "__main__" :
528529 app .run (port = 5000 , debug = True )
529-
0 commit comments