Skip to content

Commit 4052729

Browse files
committed
add comms and command_interface from nrfcloud-utils
Signed-off-by: Maximilian Deubel <[email protected]>
1 parent 6f9a4cf commit 4052729

File tree

4 files changed

+1329
-0
lines changed

4 files changed

+1329
-0
lines changed
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2025 Nordic Semiconductor ASA
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
7+
from enum import Enum
8+
from abc import ABC, abstractmethod
9+
import math
10+
import time
11+
from nrfcredstore.comms import Comms
12+
import base64
13+
import hashlib
14+
import coloredlogs, logging
15+
import re
16+
17+
logger = logging.getLogger(__name__)
18+
19+
IMEI_LEN = 15
20+
21+
class CredentialCommandInterface(ABC):
22+
def __init__(self, comms: Comms):
23+
"""Initialize a Credentials Command Interface
24+
25+
Args:
26+
comms: Comms object to use for serial communication.
27+
"""
28+
self.comms = comms
29+
30+
def write_raw(self, command: str):
31+
"""Write a raw line directly to the serial interface."""
32+
self.comms.write_line(command)
33+
34+
@abstractmethod
35+
def write_credential(self, sectag: int, cred_type: int, cred_text: str):
36+
"""Write a credential string to the command interface"""
37+
return
38+
39+
@abstractmethod
40+
def delete_credential(self, sectag: int, cred_type: int):
41+
"""Delete a credential using command interface"""
42+
return
43+
44+
@abstractmethod
45+
def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True):
46+
"""Verify that a credential is installed. If check_hash is true, retrieve the SHA hash."""
47+
return
48+
49+
@abstractmethod
50+
def calculate_expected_hash(self, cred_text: str):
51+
"""Returns the expected digest/hash for a given credential as a string"""
52+
return
53+
54+
@abstractmethod
55+
def get_csr(self, sectag: int, attributes: str):
56+
"""Generate a private/public keypair and a corresponding Certificate Signing Request.
57+
58+
Returns:
59+
CSR blob in modem specific body.cose format.
60+
"""
61+
return
62+
63+
@abstractmethod
64+
def go_offline(self):
65+
"""Tell the device to go offline so that credentials can be modified"""
66+
return
67+
68+
@abstractmethod
69+
def get_imei(self):
70+
"""Get device IMEI, if applicable"""
71+
return
72+
73+
@abstractmethod
74+
def get_mfw_version(self):
75+
"""Get modem firmware version, if applicable"""
76+
return
77+
78+
class ATCommandInterface(CredentialCommandInterface):
79+
shell = False
80+
81+
def _parse_sha(self, cmng_result_str: str):
82+
# Example AT%CMNG response:
83+
# %CMNG: 123,0,"2C43952EE9E000FF2ACC4E2ED0897C0A72AD5FA72C3D934E81741CBD54F05BD1"
84+
# The first item in " is the SHA.
85+
try:
86+
return cmng_result_str.split('"')[1]
87+
except (ValueError, IndexError):
88+
logger.error(f'Could not parse credential hash: {cmng_result_str}')
89+
return None
90+
91+
def set_shell_mode(self, shell: bool):
92+
self.shell = shell
93+
94+
def detect_shell_mode(self):
95+
"""Detect if the device is in shell mode or not."""
96+
for cmd, shell_mode in [("at AT+CGSN", True), ("AT+CGSN", False)]:
97+
for _ in range(3):
98+
self.write_raw(cmd)
99+
result, output = self.comms.expect_response("OK", "ERROR", "", suppress_errors=True, timeout=1)
100+
if result and len(re.findall("[0-9]{15}", output)) > 0:
101+
self.set_shell_mode(shell_mode)
102+
return
103+
raise TimeoutError("Failed to detect shell mode. Device does not respond to AT commands.")
104+
105+
def enable_error_codes(self):
106+
"""Enable error codes in the AT client"""
107+
if not self.at_command('AT+CMEE=1', wait_for_result=True):
108+
logger.error("Failed to enable error codes.")
109+
110+
def at_command(self, at_command: str, wait_for_result=False, suppress_errors=False):
111+
"""Write an AT command to the command interface. Optionally wait for OK"""
112+
113+
if self.shell:
114+
# transform line endings to match shell expectations
115+
at_command = at_command.replace("\r", "")
116+
at_command = at_command.replace("\n", "\\n")
117+
self.write_raw("at '" + at_command + "'")
118+
else:
119+
self.write_raw(at_command)
120+
121+
if wait_for_result:
122+
result, _ = self.comms.expect_response("OK", "ERROR", suppress_errors=suppress_errors)
123+
return result
124+
else:
125+
return True
126+
127+
def write_credential(self, sectag: int, cred_type: int, cred_text: str):
128+
return self.at_command(f'AT%CMNG=0,{sectag},{cred_type},"{cred_text}"', wait_for_result=True)
129+
130+
def delete_credential(self, sectag: int, cred_type: int):
131+
return self.at_command(f'AT%CMNG=3,{sectag},{cred_type}', wait_for_result=True)
132+
133+
def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True):
134+
self.at_command(f'AT%CMNG=1,{sectag},{cred_type}')
135+
retval, res = self.comms.expect_response("OK", "ERROR", "%CMNG")
136+
if retval and res:
137+
if not get_hash:
138+
return True, None
139+
else:
140+
return True, self._parse_sha(res)
141+
142+
return False, None
143+
144+
def calculate_expected_hash(self, cred_text: str):
145+
# AT Command host returns hex of SHA256 hash of credential plaintext
146+
return hashlib.sha256(cred_text.encode('utf-8')).hexdigest().upper()
147+
148+
def go_offline(self):
149+
return self.at_command('AT+CFUN=4', wait_for_result=True)
150+
151+
def get_imei(self):
152+
self.at_command('AT+CGSN')
153+
retval, output = self.comms.expect_response("OK", "ERROR", "")
154+
if not retval:
155+
return None
156+
return output[:IMEI_LEN]
157+
158+
def get_model_id(self):
159+
self.at_command('AT+CGMM')
160+
retval, output = self.comms.expect_response("OK", "ERROR", "")
161+
if not retval:
162+
return None
163+
return output
164+
165+
def get_mfw_version(self):
166+
self.at_command('AT+CGMR')
167+
retval, output = self.comms.expect_response("OK", "ERROR", "")
168+
if not retval:
169+
return None
170+
return output
171+
172+
def get_attestation_token(self):
173+
self.at_command('AT%ATTESTTOKEN')
174+
retval, output = self.comms.expect_response("OK", "ERROR", "%ATTESTTOKEN:")
175+
if not retval:
176+
return None
177+
attest_tok = output.split('"')[1]
178+
return attest_tok
179+
180+
def get_csr(self, sectag=0, attributes=""):
181+
if attributes:
182+
self.at_command(f'AT%KEYGEN={sectag},2,0,"{attributes}"')
183+
else:
184+
self.at_command(f'AT%KEYGEN={sectag},2,0')
185+
186+
retval, output = self.comms.expect_response("OK", "ERROR", "%KEYGEN:")
187+
188+
if not retval:
189+
return None
190+
191+
# convert the encoded blob to an actual cert
192+
csr_blob = str(output).split('"')[1]
193+
logger.debug('CSR blob: {}'.format(csr_blob))
194+
195+
# format is "body.cose"
196+
# body is base64 encoded DER
197+
# cose is base64 encoded COSE header (CBOR)
198+
199+
return csr_blob
200+
201+
TLS_CRED_TYPES = ["CA", "SERV", "PK"]
202+
# This chunk size can be any multiple of 4, as long as it is small enough to fit within the
203+
# Zephyr shell buffer.
204+
TLS_CRED_CHUNK_SIZE = 48
205+
206+
class TLSCredShellInterface(CredentialCommandInterface):
207+
def write_credential(self, sectag, cred_type, cred_text):
208+
# Because the Zephyr shell does not support multi-line commands,
209+
# we must base-64 encode our PEM strings and install them as if they were binary.
210+
# Yes, this does mean we are base-64 encoding a string which is already mostly base-64.
211+
# We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass
212+
# everything else directly as a binary payload (using BIN mode instead of BINT, since
213+
# MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a
214+
# PEM string). But this will fail for multi-CA installs, such as CoAP.
215+
216+
# text -> bytes -> base64 bytes -> base64 text
217+
encoded = base64.b64encode(cred_text.encode()).decode()
218+
219+
# Clear credential buffer -- If it is already clear, there may not be text feedback
220+
self.write_raw("cred buf clear")
221+
222+
# Write the encoded credential in chunks
223+
chunks = math.ceil(len(encoded)/TLS_CRED_CHUNK_SIZE)
224+
for c in range(chunks):
225+
chunk = encoded[c*TLS_CRED_CHUNK_SIZE:(c+1)*TLS_CRED_CHUNK_SIZE]
226+
self.write_raw(f"cred buf {chunk}")
227+
self.comms.expect_response("Stored")
228+
229+
# Store the buffered credential
230+
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint")
231+
result, _ = self.comms.expect_response("Added TLS credential")
232+
time.sleep(1)
233+
return result
234+
235+
def delete_credential(self, sectag: int, cred_type: int):
236+
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}')
237+
result, _ = self.comms.expect_response("Deleted TLS credential", "There is no TLS credential")
238+
time.sleep(2)
239+
return result
240+
241+
def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True):
242+
self.write_raw(f'cred list {sectag} {TLS_CRED_TYPES[cred_type]}')
243+
244+
# This will capture the list dump for the credential if it exists.
245+
result, output = self.comms.expect_response("1 credentials found.",
246+
"0 credentials found.",
247+
f"{sectag},{TLS_CRED_TYPES[cred_type]}")
248+
249+
if not output:
250+
return False, None
251+
252+
if not get_hash:
253+
return True, None
254+
255+
# Output is a comma separated list of positional items
256+
data = output.split(",")
257+
hash = data[2].strip()
258+
status_code = data[3].strip()
259+
260+
if (status_code != "0"):
261+
logger.error(f"Error retrieving credential hash: {output.strip()}.")
262+
logger.error("Device might not support credential digests.")
263+
return True, None
264+
265+
return True, hash
266+
267+
def calculate_expected_hash(self, cred_text: str):
268+
# TLS Credentials shell returns base-64 of SHA256 hash of full credential, including NULL
269+
# termination.
270+
hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00')
271+
return base64.b64encode(hash.digest()).decode()
272+
273+
def get_csr(self, sectag=0, attributes=""):
274+
raise RuntimeError("The TLS Credentials Shell does not support CSR generation")
275+
276+
def go_offline(self):
277+
# TLS credentials shell has no concept of online/offline. Just no-op.
278+
pass
279+
280+
def get_imei(self):
281+
raise RuntimeError("The TLS Credentials Shell does not support IMEI extraction")
282+
283+
def get_mfw_version(self):
284+
raise RuntimeError("The TLS Credentials Shell does not support MFW version extraction")

0 commit comments

Comments
 (0)