@@ -423,7 +423,7 @@ def clear_sign(self: MsgSigner, operation: ClearSignOperation) -> SigningResults
423423 return signing_results
424424
425425 @staticmethod
426- def create_manifest_claim_message (signature_key : str , digest : str , reference : str ) -> str :
426+ def create_manifest_claim_message (digest : str , reference : str ) -> str :
427427 """Create manifest claim for container signing.
428428
429429 See below for the specification for the manifest claim that is created here
@@ -447,6 +447,26 @@ def create_manifest_claim_message(signature_key: str, digest: str, reference: st
447447 }
448448 return base64 .b64encode (json .dumps (manifest_claim ).encode ("latin1" )).decode ("latin1" )
449449
450+ def _prepare_messages (self , operation : ContainerSignOperation ) -> List [MsgMessage ]:
451+ fargs = []
452+ for digest , reference in zip (operation .digests , operation .references ):
453+ repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
454+ fargs .append (
455+ FData (
456+ args = [
457+ self .create_manifest_claim_message (digest = digest , reference = reference ),
458+ repo ,
459+ operation ,
460+ SignRequestType .CONTAINER ,
461+ ],
462+ kwargs = {
463+ "extra_attrs" : {"pub_task_id" : operation .task_id , "manifest_digest" : digest }
464+ },
465+ )
466+ )
467+ ret = run_in_parallel (self ._create_msg_message , fargs )
468+ return list (ret .values ())
469+
450470 def container_sign (self : MsgSigner , operation : ContainerSignOperation ) -> SigningResults :
451471 """Run container signing operation.
452472
@@ -469,26 +489,9 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin
469489
470490 LOG .info (f"Container sign operation for { len (operation .digests )} " )
471491
472- fargs = []
473- for digest , reference in zip (operation .digests , operation .references ):
474- repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
475- fargs .append (
476- FData (
477- args = [
478- self .create_manifest_claim_message (
479- signing_key , digest = digest , reference = reference
480- ),
481- repo ,
482- operation ,
483- SignRequestType .CONTAINER ,
484- ],
485- kwargs = {
486- "extra_attrs" : {"pub_task_id" : operation .task_id , "manifest_digest" : digest }
487- },
488- )
489- )
490- ret = run_in_parallel (self ._create_msg_message , fargs )
491- for n , message in ret .items ():
492+ ret = self ._prepare_messages (operation )
493+
494+ for message in ret :
492495 message_to_data [message .body ["request_id" ]] = message
493496 messages .append (message )
494497
@@ -605,16 +608,179 @@ def container_sign(self: MsgSigner, operation: ContainerSignOperation) -> Signin
605608 signer_results .error_message += f"{ error .name } : { error .description } \n "
606609 return signing_results
607610
611+ print ("ALL MESSAGES" , len (all_messages ))
612+ print ("ALL MESSAGES" , all_messages )
608613 operation_result = ContainerSignResult (
609614 signing_key = operation .signing_key , results = ["" ] * len (all_messages ), failed = False
610615 )
611616 for recv_id , _received in recvc .recv .items ():
617+ print ("RECV_ID" , recv_id )
618+ print ("RECEIVED" , _received )
612619 operation_result .failed = True if _received [0 ]["msg" ]["errors" ] else False
613620 operation_result .results [all_messages .index (message_to_data [recv_id ])] = _received
614621 signing_results .operation_result = operation_result
615622 return signing_results
616623
617624
625+ class MsgBatchSigner (MsgSigner ):
626+ """Messaging batch signer class."""
627+
628+ _signer_config_key : str = "msg_batch_signer"
629+
630+ chunk_size : int = field (
631+ init = False ,
632+ metadata = {
633+ "description" : "Identify how many signing claims should be send in one message" ,
634+ "sample" : 10 ,
635+ },
636+ )
637+
638+ SUPPORTED_OPERATIONS : ClassVar [List [Type [SignOperation ]]] = [
639+ ContainerSignOperation ,
640+ ]
641+
642+ def _construct_signing_batch_message (
643+ self : Self ,
644+ claims : List [str ],
645+ signing_keys : List [str ],
646+ repo : str ,
647+ signing_key_names : List [str ] = [],
648+ extra_attrs : Optional [Dict [str , Any ]] = None ,
649+ sig_type : str = SignRequestType .CONTAINER ,
650+ ) -> dict [str , Any ]:
651+ data_attr = "claims" if sig_type == SignRequestType .CONTAINER else "data"
652+ _extra_attrs = extra_attrs or {}
653+ processed_claims = [
654+ {
655+ "claim_file" : claim ,
656+ "sig_keyname" : signing_key_names ,
657+ "sig_key_id" : signing_keys ,
658+ "manifest_digest" : digest ,
659+ }
660+ for claim , digest in zip (claims , _extra_attrs .get ("manifest_digest" , "" ))
661+ ]
662+ message = {
663+ data_attr : processed_claims ,
664+ "request_id" : str (uuid .uuid4 ()),
665+ "created" : isodate_now (),
666+ "requested_by" : self .creator ,
667+ "repo" : repo ,
668+ }
669+ _extra_attrs .pop ("manifest_digest" , None )
670+ message .update (_extra_attrs )
671+ return message
672+
673+ def _create_msg_batch_message (
674+ self : Self ,
675+ data : List [str ],
676+ repo : str ,
677+ operation : SignOperation ,
678+ sig_type : SignRequestType ,
679+ extra_attrs : Optional [Dict [str , Any ]] = None ,
680+ ) -> MsgMessage :
681+ if operation .signing_key in self .key_aliases :
682+ signing_key = self .key_aliases [operation .signing_key ]
683+ LOG .info (f"Using signing key alias { signing_key } for { operation .signing_key } " )
684+ else :
685+ signing_key = operation .signing_key
686+
687+ extra_attrs = extra_attrs or {}
688+ headers = self ._construct_headers (sig_type , extra_attrs = extra_attrs )
689+ if isinstance (operation , ContainerSignOperation ):
690+ extra_attrs ["manifest_digest" ] = operation .digests
691+ ret = MsgMessage (
692+ headers = headers ,
693+ body = self ._construct_signing_batch_message (
694+ data ,
695+ [signing_key ],
696+ repo ,
697+ signing_key_names = [operation .signing_key_name ],
698+ extra_attrs = extra_attrs ,
699+ sig_type = sig_type .value ,
700+ ),
701+ address = self .topic_send_to .format (
702+ ** dict (list (asdict (self ).items ()) + list (asdict (operation ).items ()))
703+ ),
704+ )
705+ LOG .debug (f"Construted message with request_id { ret .body ['request_id' ]} " )
706+ return ret
707+
708+ def _prepare_messages (self : Self , operation : ContainerSignOperation ) -> List [MsgMessage ]:
709+ messages : List [MsgMessage ] = []
710+ repo_groups : Dict [str , Dict [str , List [str ]]] = {}
711+ for digest , reference in zip (operation .digests , operation .references ):
712+ repo = reference .split ("/" , 1 )[1 ].split (":" )[0 ]
713+ if repo not in repo_groups :
714+ repo_groups [repo ] = cast (dict [str , list [str ]], {"digests" : [], "references" : []})
715+ repo_groups [repo ]["digests" ].append (digest )
716+ repo_groups [repo ]["references" ].append (reference )
717+
718+ batch_data : List [FData ] = []
719+ for repo , group in repo_groups .items ():
720+ claims = []
721+ digests = []
722+
723+ for digest , reference in zip (group ["digests" ], group ["references" ]):
724+ claims .append (
725+ self .create_manifest_claim_message (digest = digest , reference = reference )
726+ )
727+ digests .append (digest )
728+ if len (claims ) >= self .chunk_size :
729+ fdata = FData (
730+ args = [claims , repo , operation , SignRequestType .CONTAINER ],
731+ kwargs = {
732+ "extra_attrs" : {
733+ "pub_task_id" : operation .task_id ,
734+ "manifest_digest" : digests ,
735+ }
736+ },
737+ )
738+ batch_data .append (fdata )
739+ claims = []
740+ digests = []
741+ if claims :
742+ fdata = FData (
743+ args = [claims , repo , operation , SignRequestType .CONTAINER ],
744+ kwargs = {
745+ "extra_attrs" : {
746+ "pub_task_id" : operation .task_id ,
747+ "manifest_digest" : digests ,
748+ }
749+ },
750+ )
751+ batch_data .append (fdata )
752+
753+ ret = run_in_parallel (self ._create_msg_batch_message , batch_data )
754+ messages .extend (ret .values ())
755+ return messages
756+
757+ def load_config (self : Self , config_data : Dict [str , Any ]) -> None :
758+ """Load configuration of messaging signer.
759+
760+ Arguments:
761+ config_data (dict): configuration data to load
762+ """
763+ self .messaging_brokers = config_data ["msg_batch_signer" ]["messaging_brokers" ]
764+ self .messaging_cert_key = os .path .expanduser (
765+ config_data ["msg_batch_signer" ]["messaging_cert_key" ]
766+ )
767+ self .messaging_ca_cert = os .path .expanduser (
768+ config_data ["msg_batch_signer" ]["messaging_ca_cert" ]
769+ )
770+ self .topic_send_to = config_data ["msg_batch_signer" ]["topic_send_to" ]
771+ self .topic_listen_to = config_data ["msg_batch_signer" ]["topic_listen_to" ]
772+ self .environment = config_data ["msg_batch_signer" ]["environment" ]
773+ self .service = config_data ["msg_batch_signer" ]["service" ]
774+ self .message_id_key = config_data ["msg_batch_signer" ]["message_id_key" ]
775+ self .retries = config_data ["msg_batch_signer" ]["retries" ]
776+ self .send_retries = config_data ["msg_batch_signer" ]["send_retries" ]
777+ self .log_level = config_data ["msg_batch_signer" ]["log_level" ]
778+ self .timeout = config_data ["msg_batch_signer" ]["timeout" ]
779+ self .creator = self ._get_cert_subject_cn ()
780+ self .key_aliases = config_data ["msg_batch_signer" ].get ("key_aliases" , {})
781+ self .chunk_size = config_data ["msg_batch_signer" ]["chunk_size" ]
782+
783+
618784def msg_clear_sign (
619785 inputs : List [str ],
620786 signing_key : str = "" ,
@@ -669,9 +835,14 @@ def msg_container_sign(
669835 digest : list [str ] = [],
670836 reference : list [str ] = [],
671837 requester : str = "" ,
838+ signer_type : str = "single" ,
672839) -> Dict [str , Any ]:
673840 """Run containersign operation with cli arguments."""
674- msg_signer = MsgSigner ()
841+ if signer_type == "single" :
842+ msg_signer = MsgSigner ()
843+ elif signer_type == "batch" :
844+ msg_signer = MsgBatchSigner ()
845+
675846 config = _get_config_file (config_file )
676847 msg_signer .load_config (load_config (os .path .expanduser (config )))
677848 if requester :
@@ -814,6 +985,9 @@ def msg_clear_sign_main(
814985 default = "INFO" ,
815986 help = "Set log level" ,
816987)
988+ @click .option (
989+ "--signer-type" , type = click .Choice (["single" , "batch" ]), default = "single" , help = "Signer type"
990+ )
817991def msg_container_sign_main (
818992 signing_key : str = "" ,
819993 signing_key_name : str = "" ,
@@ -824,6 +998,7 @@ def msg_container_sign_main(
824998 requester : str = "" ,
825999 raw : bool = False ,
8261000 log_level : str = "INFO" ,
1001+ signer_type : str = "single" ,
8271002) -> None :
8281003 """Entry point method for containersign operation.
8291004
@@ -851,6 +1026,7 @@ def msg_container_sign_main(
8511026 digest = digest ,
8521027 reference = reference ,
8531028 requester = requester ,
1029+ signer_type = signer_type ,
8541030 )
8551031 if not raw :
8561032 click .echo (json .dumps (ret ))
0 commit comments