Skip to content

Commit 9aae54d

Browse files
committed
Extend payload_extract command with possibility of recursive extraction
Ref: NCSDK-31668 Signed-off-by: Artur Hadasz <[email protected]>
1 parent 0a8712a commit 9aae54d

File tree

1 file changed

+141
-14
lines changed

1 file changed

+141
-14
lines changed

suit_generator/cmd_payload_extract.py

Lines changed: 141 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,39 +7,83 @@
77

88
import cbor2
99
import logging
10+
import re
11+
from suit_generator.exceptions import GeneratorError
1012

1113
log = logging.getLogger(__name__)
1214

1315
PAYLOAD_EXTRACT_CMD = "payload_extract"
1416

17+
PAYLOAD_EXTRACT_SINGLE_CMD = "single"
18+
PAYLOAD_EXTRACT_RECURSIVE_CMD = "recursive"
19+
1520

1621
def add_arguments(parser):
1722
"""Add additional arguments to the passed parser."""
1823
cmd_payload_extract_arg_parser = parser.add_parser(PAYLOAD_EXTRACT_CMD, help="Create raw cache structure.")
1924

20-
cmd_payload_extract_arg_parser.add_argument("--input-envelope", required=True, help="Input envelope file path.")
21-
cmd_payload_extract_arg_parser.add_argument("--output-envelope", required=True, help="Output envelope file path.")
22-
cmd_payload_extract_arg_parser.add_argument(
25+
cmd_payload_extract_subparsers = cmd_payload_extract_arg_parser.add_subparsers(
26+
dest="payload_extract_subcommand", required=True, help="Choose sign subcommand"
27+
)
28+
29+
cmd_payload_extract_single = cmd_payload_extract_subparsers.add_parser(
30+
PAYLOAD_EXTRACT_SINGLE_CMD,
31+
help="Extact a single payload from a SUIT envelope.",
32+
)
33+
34+
cmd_payload_extract_single.add_argument("--input-envelope", required=True, help="Input envelope file path.")
35+
cmd_payload_extract_single.add_argument("--output-envelope", required=True, help="Output envelope file path.")
36+
cmd_payload_extract_single.add_argument(
2337
"--payload-name", required=True, help="Name of the integrated payload to extract."
2438
)
25-
cmd_payload_extract_arg_parser.add_argument(
39+
cmd_payload_extract_single.add_argument(
2640
"--output-payload-file",
2741
required=False,
2842
help="Output payload file path to store the extracted payload."
2943
+ "If not provided, the payload will not be stored to a file.",
3044
)
3145

32-
cmd_payload_extract_arg_parser.add_argument(
46+
cmd_payload_extract_single.add_argument(
3347
"--payload-replace-path",
34-
help="Path to the integrated payload to replace the extracted payload with."
48+
help="Path to the integrated payload to replace the extracted payload with. "
3549
+ "If not provided, the payload will be removed from the envelope.",
3650
)
3751

52+
cmd_payload_extract_recursive = cmd_payload_extract_subparsers.add_parser(
53+
PAYLOAD_EXTRACT_RECURSIVE_CMD,
54+
help="Recursively extract payloads from a SUIT envelope based on regular expressions. "
55+
+ "The resulting file names will be the based on the payload names.",
56+
)
57+
cmd_payload_extract_recursive.add_argument("--input-envelope", required=True, help="Input envelope file path.")
58+
59+
cmd_payload_extract_recursive.add_argument(
60+
"--output-envelope", required=True, help="Output envelope file path (envelope with removed extracted payloads)."
61+
)
62+
63+
cmd_payload_extract_recursive.add_argument(
64+
"--omit-payload-regex",
65+
help="Integrated payloads matching the regular expression will not be extracted into files.",
66+
)
67+
68+
cmd_payload_extract_recursive.add_argument(
69+
"--dependency-regex",
70+
help="Integrated payloads matching the regular expression will be treated as dependency"
71+
+ "envelopes and parsed hierarchically. "
72+
+ "The payloads extracted from the dependency envelopes will be extracted into files.",
73+
)
74+
75+
cmd_payload_extract_recursive.add_argument(
76+
"--payload-file-prefix-to-remove",
77+
help="Prefix to remove from the extracted payload file names. "
78+
+ "For example, if the payload is named file://my_file.bin, the prefix file:// will be removed if "
79+
+ "this argument is set to 'file://'.",
80+
)
3881

39-
def main(
40-
input_envelope: str, output_envelope: str, payload_name: str, output_payload_file: str, payload_replace_path: str
41-
) -> None:
42-
"""Extract an integrated payload from a SUIT envelope.
82+
83+
def payload_extract_single(
84+
envelope: cbor2.CBORTag, payload_name: str, output_payload_file: str, payload_replace_path: str
85+
) -> cbor2.CBORTag:
86+
"""Extract a single integrated payload from a SUIT envelope.
4387
4488
:param input_envelope: input envelope file path
4589
:param output_envelope: output envelope file path
@@ -48,9 +92,8 @@ def main(
4892
None if the payload should not be stored to a file
4993
:param payload_replace_path: Path to the integrated payload to replace the extracted payload with.
5094
None if the payload should be removed from the envelope.
95+
:return envelope with removed extracted payload
5196
"""
52-
with open(input_envelope, "rb") as fh:
53-
envelope = cbor2.load(fh)
5497
extracted_payload = envelope.value.pop(payload_name, None)
5598

5699
if extracted_payload is None:
@@ -60,8 +103,92 @@ def main(
60103
with open(payload_replace_path, "rb") as fh:
61104
envelope.value[payload_name] = fh.read()
62105

63-
with open(output_envelope, "wb") as fh:
64-
cbor2.dump(envelope, fh)
65106
if output_payload_file is not None:
66107
with open(output_payload_file, "wb") as fh:
67108
fh.write(extracted_payload)
109+
110+
return envelope
111+
112+
113+
def payload_extract_recursive(
114+
envelope: cbor2.CBORTag, omit_payload_regex: str, dependency_regex: str, payload_file_prefix_to_remove: str = None
115+
) -> cbor2.CBORTag:
116+
"""Recursively extract payloads from a SUIT envelope based on regular expressions.
117+
118+
:param input_envelope: input envelope file path
119+
:param output_envelope: output envelope file path
120+
:param omit_payload_regex: integrated payloads matching the regular expression will not be extracted into files
121+
:param dependency_regex: integrated payloads matching the regular expression will be treated as dependency
122+
envelopes and parsed hierarchically. The payloads extracted from the dependency envelopes will be extracted
123+
into files.
124+
:return: envelope with removed extracted payloads
125+
"""
126+
integrated = [k for k in envelope.value.keys() if isinstance(k, str)]
127+
128+
if dependency_regex is not None:
129+
integrated_dependencies = [k for k in integrated if not re.fullmatch(dependency_regex, k) is None]
130+
for dep in integrated_dependencies:
131+
integrated.remove(dep)
132+
else:
133+
integrated_dependencies = []
134+
135+
if omit_payload_regex is None:
136+
payloads_to_extract = integrated
137+
else:
138+
payloads_to_extract = [k for k in integrated if re.fullmatch(omit_payload_regex, k) is None]
139+
140+
for payload in payloads_to_extract:
141+
payload_file_name = payload
142+
# Remove prefix from the payload file name
143+
if payload_file_prefix_to_remove is not None:
144+
if payload.startswith(payload_file_prefix_to_remove):
145+
payload_file_name = payload[len(payload_file_prefix_to_remove) :]
146+
with open(payload_file_name, "wb") as fh:
147+
fh.write(envelope.value.pop(payload))
148+
149+
for dependency in integrated_dependencies:
150+
try:
151+
try:
152+
dependency_envelope = cbor2.loads(envelope.value[dependency])
153+
except Exception:
154+
raise GeneratorError("The dependency is not a valid envelope!")
155+
156+
if not (isinstance(dependency_envelope, cbor2.CBORTag) and isinstance(dependency_envelope.value, dict)):
157+
raise GeneratorError("The dependency is not a valid envelope!")
158+
159+
new_dependency_envelope = payload_extract_recursive(
160+
dependency_envelope, omit_payload_regex, dependency_regex, payload_file_prefix_to_remove
161+
)
162+
except GeneratorError as e:
163+
log.log(logging.ERROR, "Failed to extract payloads from dependency %s: %s", dependency, repr(e))
164+
raise GeneratorError("Failed to extract payloads from envelope!")
165+
166+
envelope.value[dependency] = cbor2.dumps(new_dependency_envelope)
167+
168+
return envelope
169+
170+
171+
def main(**kwargs) -> None:
172+
"""Extract a integrated payloads from a SUIT envelope."""
173+
with open(kwargs["input_envelope"], "rb") as fh:
174+
envelope = cbor2.load(fh)
175+
176+
if kwargs["payload_extract_subcommand"] == PAYLOAD_EXTRACT_SINGLE_CMD:
177+
output_envelope = payload_extract_single(
178+
envelope,
179+
kwargs["payload_name"],
180+
kwargs["output_payload_file"],
181+
kwargs["payload_replace_path"],
182+
)
183+
elif kwargs["payload_extract_subcommand"] == PAYLOAD_EXTRACT_RECURSIVE_CMD:
184+
output_envelope = payload_extract_recursive(
185+
envelope,
186+
kwargs["omit_payload_regex"],
187+
kwargs["dependency_regex"],
188+
kwargs["payload_file_prefix_to_remove"],
189+
)
190+
else:
191+
raise GeneratorError(f"Invalid 'payload_extract' subcommand: {kwargs['payload_extract_subcommand']}")
192+
193+
with open(kwargs["output_envelope"], "wb") as fh:
194+
cbor2.dump(output_envelope, fh)

0 commit comments

Comments
 (0)