Skip to content

Commit 9306881

Browse files
committed
wip
1 parent 7aedefd commit 9306881

File tree

5 files changed

+327
-26
lines changed

5 files changed

+327
-26
lines changed

src/nrfcredstore/at_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __at_command_ok(self, cmd):
3030
def __response_line(self):
3131
"""Read a single line from device and return a decoded and trimmed string."""
3232
line = self.comms.read_line()
33-
if line == b'':
33+
if line == None or line == b'':
3434
raise TimeoutError
3535
return line.strip()
3636

src/nrfcredstore/cli.py

Lines changed: 14 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import argparse
22
import sys
33
import serial
4+
import logging
45

56
from nrfcredstore.exceptions import ATCommandError, NoATClientException
6-
from nrfcredstore.at_client import ATClient
7+
from nrfcredstore.command_interface import ATCommandInterface
78
from nrfcredstore.credstore import CredStore, CredType
89
from nrfcredstore.comms import Comms
910

@@ -111,33 +112,29 @@ def exit_with_msg(exitcode, msg):
111112
exit(exitcode)
112113

113114
def main(args, credstore):
114-
try:
115-
credstore.at_client.verify()
116-
credstore.at_client.enable_error_codes()
117-
exec_cmd(args, credstore)
118-
except NoATClientException:
119-
exit_with_msg(ERR_NO_AT_CLIENT, 'The device does not respond to AT commands. Please flash at_client sample.')
120-
except ATCommandError as err:
121-
exit_with_msg(ERR_AT_COMMAND, err)
122-
except TimeoutError as err:
123-
exit_with_msg(ERR_TIMEOUT, 'The device did not respond in time. Please try again.')
124-
except serial.SerialException as err:
125-
exit_with_msg(ERR_SERIAL, f'Serial error: {err}')
126-
except Exception as err:
127-
exit_with_msg(ERR_UNKNOWN, f'Unhandled Error: {err}')
115+
credstore.at_client.verify()
116+
credstore.at_client.enable_error_codes()
117+
exec_cmd(args, credstore)
128118

129119
def run():
120+
logging.basicConfig(level='DEBUG')
130121
args = parse_args(sys.argv[1:])
131122
comms = None
132123

133124
# use inquirer to find the device
134125
if args.dev == 'auto':
135126
comms = Comms(list_all=True, baudrate=args.baudrate, timeout=args.timeout)
127+
elif args.dev == 'rtt':
128+
comms = Comms(rtt=True, baudrate=args.baudrate, timeout=args.timeout)
136129
# if dev is just numbers, assume it's an rtt device
137130
elif args.dev.isdigit():
138131
comms = Comms(rtt = True, serial_number = int(args.dev), timeout=args.timeout)
139132
# otherwise, assume it's a serial device
140133
else:
141-
comms = Comms(serial_port=args.dev, baudrate=args.baudrate, timeout=args.timeout)
134+
comms = Comms(port=args.dev, baudrate=args.baudrate, timeout=args.timeout)
142135

143-
main(args, CredStore(ATClient(comms)))
136+
cred_if = ATCommandInterface(comms)
137+
cred_if.detect_shell_mode()
138+
cred_if.enable_error_codes()
139+
140+
main(args, CredStore(cred_if))
Lines changed: 290 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,290 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2025 Nordic Semiconductor ASA
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
7+
from enum import Enum
8+
from abc import ABC, abstractmethod
9+
import math
10+
import time
11+
from nrfcloud_utils.comms import Comms
12+
import base64
13+
import hashlib
14+
import coloredlogs, logging
15+
from cryptography import x509
16+
import re
17+
18+
logger = logging.getLogger(__name__)
19+
20+
IMEI_LEN = 15
21+
22+
class CredentialCommandInterface(ABC):
23+
def __init__(self, comms: Comms):
24+
"""Initialize a Credentials Command Interface
25+
26+
Args:
27+
comms: Comms object to use for serial communication.
28+
"""
29+
self.comms = comms
30+
31+
def write_raw(self, command):
32+
"""Write a raw line directly to the serial interface."""
33+
self.comms.write_line(command)
34+
35+
@abstractmethod
36+
def write_credential(self, sectag, cred_type, cred_text):
37+
"""Write a credential string to the command interface"""
38+
return
39+
40+
@abstractmethod
41+
def delete_credential(self, sectag, cred_type):
42+
"""Delete a credential using command interface"""
43+
return
44+
45+
@abstractmethod
46+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
47+
"""Verify that a credential is installed. If check_hash is true, retrieve the SHA hash."""
48+
return
49+
50+
@abstractmethod
51+
def calculate_expected_hash(self, cred_text):
52+
"""Returns the expected digest/hash for a given credential as a string"""
53+
return
54+
55+
@abstractmethod
56+
def get_csr(self, sectag = 0, cn = ""):
57+
"""Generate a private/public keypair and a corresponding Certificate Signing Request.
58+
59+
Returns:
60+
CSR blob in modem specific body.cose format.
61+
"""
62+
return
63+
64+
@abstractmethod
65+
def go_offline(self):
66+
"""Tell the device to go offline so that credentials can be modified"""
67+
return
68+
69+
@abstractmethod
70+
def get_imei(self):
71+
"""Get device IMEI, if applicable"""
72+
return
73+
74+
@abstractmethod
75+
def get_mfw_version(self):
76+
"""Get modem firmware version, if applicable"""
77+
return
78+
79+
class ATKeygenException(Exception):
80+
def __init__(self, message, exit_code):
81+
super().__init__(message)
82+
self.exit_code = exit_code
83+
84+
class ATCommandInterface(CredentialCommandInterface):
85+
shell = False
86+
87+
def _parse_sha(self, cmng_result_str):
88+
# Example AT%CMNG response:
89+
# %CMNG: 123,0,"2C43952EE9E000FF2ACC4E2ED0897C0A72AD5FA72C3D934E81741CBD54F05BD1"
90+
# The first item in " is the SHA.
91+
try:
92+
return cmng_result_str.split('"')[1]
93+
except (ValueError, IndexError):
94+
logger.error(f'Could not parse credential hash: {cmng_result_str}')
95+
return None
96+
97+
def set_shell_mode(self, shell):
98+
self.shell = shell
99+
100+
def detect_shell_mode(self):
101+
"""Detect if the device is in shell mode or not."""
102+
for _ in range(3):
103+
self.write_raw("at AT+CGSN")
104+
result, output = self.comms.expect_response("OK", "ERROR", "")
105+
if result and len(re.findall("[0-9]{15}", output)) > 0:
106+
self.set_shell_mode(True)
107+
return
108+
self.set_shell_mode(False)
109+
110+
def enable_error_codes(self):
111+
"""Enable error codes in the AT client"""
112+
self.at_command('AT+CMEE=1', wait_for_result=True)
113+
114+
def at_command(self, at_command, wait_for_result=False):
115+
"""Write an AT command to the command interface. Optionally wait for OK"""
116+
117+
# AT commands are written directly as-is with the ATCommandInterface:
118+
at_cmd_prefix = 'at ' if self.shell else ''
119+
self.write_raw(f'{at_cmd_prefix}{at_command}')
120+
121+
if wait_for_result:
122+
result, _ = self.comms.expect_response("OK", "ERROR")
123+
return result
124+
else:
125+
return True
126+
127+
def write_credential(self, sectag, cred_type, cred_text):
128+
result = self.at_command(f'AT%CMNG=0,{sectag},{cred_type},"{cred_text}"',
129+
wait_for_result=True)
130+
time.sleep(1)
131+
return result
132+
133+
def delete_credential(self, sectag, cred_type):
134+
# No output is expected beyond OK/ERROR in this case
135+
return self.at_command(f'AT%CMNG=3,{sectag},{cred_type}', wait_for_result=True)
136+
137+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
138+
self.at_command(f'AT%CMNG=1,{sectag},{cred_type}')
139+
retval, res = self.comms.expect_response("OK", "ERROR", "%CMNG")
140+
if retval and res:
141+
if not get_hash:
142+
return True, None
143+
else:
144+
return True, self._parse_sha(res)
145+
146+
return False, None
147+
148+
def calculate_expected_hash(self, cred_text):
149+
# AT Command host returns hex of SHA256 hash of credential plaintext
150+
return hashlib.sha256(cred_text.encode('utf-8')).hexdigest().upper()
151+
152+
def go_offline(self):
153+
return self.at_command('AT+CFUN=4', wait_for_result=True)
154+
155+
def get_imei(self):
156+
self.at_command('AT+CGSN')
157+
retval, output = self.comms.expect_response("OK", "ERROR", "")
158+
if not retval:
159+
return None
160+
return output[:IMEI_LEN]
161+
162+
def get_model_id(self):
163+
self.at_command('AT+CGMM')
164+
retval, output = self.comms.expect_response("OK", "ERROR", "")
165+
if not retval:
166+
return None
167+
return output
168+
169+
def get_mfw_version(self):
170+
self.at_command('AT+CGMR')
171+
retval, output = self.comms.expect_response("OK", "ERROR", "")
172+
if not retval:
173+
return None
174+
return output
175+
176+
def get_attestation_token(self):
177+
self.at_command('AT%ATTESTTOKEN')
178+
retval, output = self.comms.expect_response("OK", "ERROR", "%ATTESTTOKEN:")
179+
if not retval:
180+
return None
181+
attest_tok = output.split('"')[1]
182+
return attest_tok
183+
184+
def get_csr(self, sectag=0, cn=""):
185+
186+
# provide attributes parameter if a custom CN is specified
187+
attr = f',"CN={cn}"' if len(cn) else ''
188+
189+
self.at_command(f'AT%KEYGEN={sectag},2,0{attr}')
190+
191+
# include the CR in OK because 'OK' could be found in the CSR string
192+
retval, output = self.comms.expect_response("OK", "ERROR", "%KEYGEN:")
193+
194+
if not retval:
195+
return None
196+
197+
# convert the encoded blob to an actual cert
198+
csr_blob = str(output).split('"')[1]
199+
logger.debug('CSR blob: {}'.format(csr_blob))
200+
201+
# format is "body.cose"
202+
# body is base64 encoded DER
203+
# cose is base64 encoded COSE header (CBOR)
204+
205+
return csr_blob
206+
207+
TLS_CRED_TYPES = ["CA", "SERV", "PK"]
208+
# This chunk size can be any multiple of 4, as long as it is small enough to fit within the
209+
# Zephyr shell buffer.
210+
TLS_CRED_CHUNK_SIZE = 48
211+
212+
class TLSCredShellInterface(CredentialCommandInterface):
213+
def write_credential(self, sectag, cred_type, cred_text):
214+
# Because the Zephyr shell does not support multi-line commands,
215+
# we must base-64 encode our PEM strings and install them as if they were binary.
216+
# Yes, this does mean we are base-64 encoding a string which is already mostly base-64.
217+
# We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass
218+
# everything else directly as a binary payload (using BIN mode instead of BINT, since
219+
# MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a
220+
# PEM string). But this will fail for multi-CA installs, such as CoAP.
221+
222+
# text -> bytes -> base64 bytes -> base64 text
223+
encoded = base64.b64encode(cred_text.encode()).decode()
224+
225+
# Clear credential buffer -- If it is already clear, there may not be text feedback
226+
self.write_raw("cred buf clear")
227+
228+
# Write the encoded credential in chunks
229+
chunks = math.ceil(len(encoded)/TLS_CRED_CHUNK_SIZE)
230+
for c in range(chunks):
231+
chunk = encoded[c*TLS_CRED_CHUNK_SIZE:(c+1)*TLS_CRED_CHUNK_SIZE]
232+
self.write_raw(f"cred buf {chunk}")
233+
self.comms.expect_response("Stored")
234+
235+
# Store the buffered credential
236+
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint")
237+
result, _ = self.comms.expect_response("Added TLS credential")
238+
time.sleep(1)
239+
return result
240+
241+
def delete_credential(self, sectag, cred_type):
242+
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}')
243+
result, _ = self.comms.expect_response("Deleted TLS credential", "There is no TLS credential")
244+
time.sleep(2)
245+
return result
246+
247+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
248+
self.write_raw(f'cred list {sectag} {TLS_CRED_TYPES[cred_type]}')
249+
250+
# This will capture the list dump for the credential if it exists.
251+
result, output = self.comms.expect_response("1 credentials found.",
252+
"0 credentials found.",
253+
f"{sectag},{TLS_CRED_TYPES[cred_type]}")
254+
255+
if not output:
256+
return False, None
257+
258+
if not get_hash:
259+
return True, None
260+
261+
# Output is a comma separated list of positional items
262+
data = output.split(",")
263+
hash = data[2].strip()
264+
status_code = data[3].strip()
265+
266+
if (status_code != "0"):
267+
logger.error(f"Error retrieving credential hash: {output.strip()}.")
268+
logger.error("Device might not support credential digests.")
269+
return True, None
270+
271+
return True, hash
272+
273+
def calculate_expected_hash(self, cred_text):
274+
# TLS Credentials shell returns base-64 of SHA256 hash of full credential, including NULL
275+
# termination.
276+
hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00')
277+
return base64.b64encode(hash.digest()).decode()
278+
279+
def get_csr(self, sectag = 0, cn = ""):
280+
raise RuntimeError("The TLS Credentials Shell does not support CSR generation")
281+
282+
def go_offline(self):
283+
# TLS credentials shell has no concept of online/offline. Just no-op.
284+
pass
285+
286+
def get_imei(self):
287+
raise RuntimeError("The TLS Credentials Shell does not support IMEI extraction")
288+
289+
def get_mfw_version(self):
290+
raise RuntimeError("The TLS Credentials Shell does not support MFW version extraction")

src/nrfcredstore/comms.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ def parser_add_comms_args(parser):
8585
(r"NRFBLEGW", "nRF Cloud Gateway", 0),
8686
]
8787

88+
ERR_CODE_TO_MSG = {
89+
0: 'AT command not supported by firmware version. Upgrade modem firmware?',
90+
513: 'Not found',
91+
514: 'Not allowed',
92+
515: 'Memory full',
93+
518: 'Not allowed in active state',
94+
519: 'Already exists',
95+
523: 'Key generation failed',
96+
}
97+
8898
# HWIDs look different on different platforms:
8999
# Linux: 'USB VID:PID=1366:1059 SER=001051216197 LOCATION=3-12.1.3.2.1.4:1.0'
90100
# MacOS: 'USB VID:PID=1366:1059 SER=001051246141 LOCATION=0-1.4.2.3'
@@ -337,6 +347,9 @@ def expect_response(self, ok_str=None, error_str=None, store_str=None, timeout=1
337347
return (True, output)
338348
if error_str and error_str == ansi_escape.sub('', line.strip()):
339349
return (False, output)
350+
if line.strip().startswith('+CME ERROR'):
351+
code = int(line.strip().replace('+CME ERROR: ', ''))
352+
logging.error(f'AT command error: {ERR_CODE_TO_MSG.get(code, "Unknown error")}')
340353
if (store_str is not None) and store_str in line:
341354
output += line
342355
time.sleep(0.1)

0 commit comments

Comments
 (0)