77
88import cbor2
99import logging
10+ import re
11+ from suit_generator .exceptions import GeneratorError
1012
1113log = logging .getLogger (__name__ )
1214
1315PAYLOAD_EXTRACT_CMD = "payload_extract"
1416
17+ PAYLOAD_EXTRACT_SINGLE_CMD = "single"
18+ PAYLOAD_EXTRACT_RECURSIVE_CMD = "recursive"
19+
1520
1621def add_arguments (parser ):
1722 """Add additional arguments to the passed parser."""
1823 cmd_payload_extract_arg_parser = parser .add_parser (PAYLOAD_EXTRACT_CMD , help = "Create raw cache structure." )
1924
20- cmd_payload_extract_arg_parser .add_argument ("--input-envelope" , required = True , help = "Input envelope file path." )
21- cmd_payload_extract_arg_parser .add_argument ("--output-envelope" , required = True , help = "Output envelope file path." )
22- cmd_payload_extract_arg_parser .add_argument (
25+ cmd_payload_extract_subparsers = cmd_payload_extract_arg_parser .add_subparsers (
26+ dest = "payload_extract_subcommand" , required = True , help = "Choose sign subcommand"
27+ )
28+
29+ cmd_payload_extract_single = cmd_payload_extract_subparsers .add_parser (
30+ PAYLOAD_EXTRACT_SINGLE_CMD ,
31+ help = "Extact a single payload from a SUIT envelope." ,
32+ )
33+
34+ cmd_payload_extract_single .add_argument ("--input-envelope" , required = True , help = "Input envelope file path." )
35+ cmd_payload_extract_single .add_argument ("--output-envelope" , required = True , help = "Output envelope file path." )
36+ cmd_payload_extract_single .add_argument (
2337 "--payload-name" , required = True , help = "Name of the integrated payload to extract."
2438 )
25- cmd_payload_extract_arg_parser .add_argument (
39+ cmd_payload_extract_single .add_argument (
2640 "--output-payload-file" ,
2741 required = False ,
2842 help = "Output payload file path to store the extracted payload."
2943 + "If not provided, the payload will not be stored to a file." ,
3044 )
3145
32- cmd_payload_extract_arg_parser .add_argument (
46+ cmd_payload_extract_single .add_argument (
3347 "--payload-replace-path" ,
34- help = "Path to the integrated payload to replace the extracted payload with."
48+ help = "Path to the integrated payload to replace the extracted payload with. "
3549 + "If not provided, the payload will be removed from the envelope." ,
3650 )
3751
52+ cmd_payload_extract_recursive = cmd_payload_extract_subparsers .add_parser (
53+ PAYLOAD_EXTRACT_RECURSIVE_CMD ,
54+ help = "Recursively extract payloads from a SUIT envelope based on regular expressions. "
55+ + "The resulting file names will be the based on the payload names." ,
56+ )
57+ cmd_payload_extract_recursive .add_argument ("--input-envelope" , required = True , help = "Input envelope file path." )
58+
59+ cmd_payload_extract_recursive .add_argument (
60+ "--output-envelope" , required = True , help = "Output envelope file path (envelope with removed extracted payloads)."
61+ )
62+
63+ cmd_payload_extract_recursive .add_argument (
64+ "--omit-payload-regex" ,
65+ help = "Integrated payloads matching the regular expression will not be extracted into files." ,
66+ )
67+
68+ cmd_payload_extract_recursive .add_argument (
69+ "--dependency-regex" ,
70+ help = "Integrated payloads matching the regular expression will be treated as dependency"
71+ + "envelopes and parsed hierarchically. "
72+ + "The payloads extracted from the dependency envelopes will be extracted into files." ,
73+ )
74+
75+ cmd_payload_extract_recursive .add_argument (
76+ "--payload-file-prefix-to-remove" ,
77+ help = "Prefix to remove from the extracted payload file names. "
78+ + "For example, if the payload is named file://my_file.bin, the prefix file:// will be removed if "
79+ + "this argument is set to 'file://'." ,
80+ )
3881
39- def main (
40- input_envelope : str , output_envelope : str , payload_name : str , output_payload_file : str , payload_replace_path : str
41- ) -> None :
42- """Extract an integrated payload from a SUIT envelope.
82+
83+ def payload_extract_single (
84+ envelope : cbor2 .CBORTag , payload_name : str , output_payload_file : str , payload_replace_path : str
85+ ) -> cbor2 .CBORTag :
86+ """Extract a single integrated payload from a SUIT envelope.
4387
4488 :param input_envelope: input envelope file path
4589 :param output_envelope: output envelope file path
@@ -48,9 +92,8 @@ def main(
4892 None if the payload should not be stored to a file
4993 :param payload_replace_path: Path to the integrated payload to replace the extracted payload with.
5094 None if the payload should be removed from the envelope.
95+ :return envelope with removed extracted payload
5196 """
52- with open (input_envelope , "rb" ) as fh :
53- envelope = cbor2 .load (fh )
5497 extracted_payload = envelope .value .pop (payload_name , None )
5598
5699 if extracted_payload is None :
@@ -60,8 +103,92 @@ def main(
60103 with open (payload_replace_path , "rb" ) as fh :
61104 envelope .value [payload_name ] = fh .read ()
62105
63- with open (output_envelope , "wb" ) as fh :
64- cbor2 .dump (envelope , fh )
65106 if output_payload_file is not None :
66107 with open (output_payload_file , "wb" ) as fh :
67108 fh .write (extracted_payload )
109+
110+ return envelope
111+
112+
113+ def payload_extract_recursive (
114+ envelope : cbor2 .CBORTag , omit_payload_regex : str , dependency_regex : str , payload_file_prefix_to_remove : str = None
115+ ) -> cbor2 .CBORTag :
116+ """Recursively extract payloads from a SUIT envelope based on regular expressions.
117+
118+ :param input_envelope: input envelope file path
119+ :param output_envelope: output envelope file path
120+ :param omit_payload_regex: integrated payloads matching the regular expression will not be extracted into files
121+ :param dependency_regex: integrated payloads matching the regular expression will be treated as dependency
122+ envelopes and parsed hierarchically. The payloads extracted from the dependency envelopes will be extracted
123+ into files.
124+ :return: envelope with removed extracted payloads
125+ """
126+ integrated = [k for k in envelope .value .keys () if isinstance (k , str )]
127+
128+ if dependency_regex is not None :
129+ integrated_dependencies = [k for k in integrated if not re .fullmatch (dependency_regex , k ) is None ]
130+ for dep in integrated_dependencies :
131+ integrated .remove (dep )
132+ else :
133+ integrated_dependencies = []
134+
135+ if omit_payload_regex is None :
136+ payloads_to_extract = integrated
137+ else :
138+ payloads_to_extract = [k for k in integrated if re .fullmatch (omit_payload_regex , k ) is None ]
139+
140+ for payload in payloads_to_extract :
141+ payload_file_name = payload
142+ # Remove prefix from the payload file name
143+ if payload_file_prefix_to_remove is not None :
144+ if payload .startswith (payload_file_prefix_to_remove ):
145+ payload_file_name = payload [len (payload_file_prefix_to_remove ) :]
146+ with open (payload_file_name , "wb" ) as fh :
147+ fh .write (envelope .value .pop (payload ))
148+
149+ for dependency in integrated_dependencies :
150+ try :
151+ try :
152+ dependency_envelope = cbor2 .loads (envelope .value [dependency ])
153+ except Exception :
154+ raise GeneratorError ("The dependency is not a valid envelope!" )
155+
156+ if not (isinstance (dependency_envelope , cbor2 .CBORTag ) and isinstance (dependency_envelope .value , dict )):
157+ raise GeneratorError ("The dependency is not a valid envelope!" )
158+
159+ new_dependency_envelope = payload_extract_recursive (
160+ dependency_envelope , omit_payload_regex , dependency_regex , payload_file_prefix_to_remove
161+ )
162+ except GeneratorError as e :
163+ log .log (logging .ERROR , "Failed to extract payloads from dependency %s: %s" , dependency , repr (e ))
164+ raise GeneratorError ("Failed to extract payloads from envelope!" )
165+
166+ envelope .value [dependency ] = cbor2 .dumps (new_dependency_envelope )
167+
168+ return envelope
169+
170+
171+ def main (** kwargs ) -> None :
172+ """Extract a integrated payloads from a SUIT envelope."""
173+ with open (kwargs ["input_envelope" ], "rb" ) as fh :
174+ envelope = cbor2 .load (fh )
175+
176+ if kwargs ["payload_extract_subcommand" ] == PAYLOAD_EXTRACT_SINGLE_CMD :
177+ output_envelope = payload_extract_single (
178+ envelope ,
179+ kwargs ["payload_name" ],
180+ kwargs ["output_payload_file" ],
181+ kwargs ["payload_replace_path" ],
182+ )
183+ elif kwargs ["payload_extract_subcommand" ] == PAYLOAD_EXTRACT_RECURSIVE_CMD :
184+ output_envelope = payload_extract_recursive (
185+ envelope ,
186+ kwargs ["omit_payload_regex" ],
187+ kwargs ["dependency_regex" ],
188+ kwargs ["payload_file_prefix_to_remove" ],
189+ )
190+ else :
191+ raise GeneratorError (f"Invalid 'payload_extract' subcommand: { kwargs ['payload_extract_subcommand' ]} " )
192+
193+ with open (kwargs ["output_envelope" ], "wb" ) as fh :
194+ cbor2 .dump (output_envelope , fh )
0 commit comments