Skip to content

Commit e1bba7d

Browse files
committed
wip
1 parent 0f88507 commit e1bba7d

File tree

2 files changed

+684
-0
lines changed

2 files changed

+684
-0
lines changed

command_interface.py

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Copyright (c) 2025 Nordic Semiconductor ASA
4+
#
5+
# SPDX-License-Identifier: BSD-3-Clause
6+
7+
from enum import Enum
8+
from abc import ABC, abstractmethod
9+
import math
10+
import time
11+
from nrfcloud_utils.comms import Comms
12+
import base64
13+
import hashlib
14+
import coloredlogs, logging
15+
from cryptography import x509
16+
import re
17+
18+
logger = logging.getLogger(__name__)
19+
20+
IMEI_LEN = 15
21+
22+
class CredentialCommandInterface(ABC):
23+
def __init__(self, comms: Comms):
24+
"""Initialize a Credentials Command Interface
25+
26+
Args:
27+
comms: Comms object to use for serial communication.
28+
"""
29+
self.comms = comms
30+
31+
def write_raw(self, command):
32+
"""Write a raw line directly to the serial interface."""
33+
self.comms.write_line(command)
34+
35+
@abstractmethod
36+
def write_credential(self, sectag, cred_type, cred_text):
37+
"""Write a credential string to the command interface"""
38+
return
39+
40+
@abstractmethod
41+
def delete_credential(self, sectag, cred_type):
42+
"""Delete a credential using command interface"""
43+
return
44+
45+
@abstractmethod
46+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
47+
"""Verify that a credential is installed. If check_hash is true, retrieve the SHA hash."""
48+
return
49+
50+
@abstractmethod
51+
def calculate_expected_hash(self, cred_text):
52+
"""Returns the expected digest/hash for a given credential as a string"""
53+
return
54+
55+
@abstractmethod
56+
def get_csr(self, sectag = 0, cn = ""):
57+
"""Generate a private/public keypair and a corresponding Certificate Signing Request.
58+
59+
Returns:
60+
CSR as X509Req object.
61+
"""
62+
return
63+
64+
@abstractmethod
65+
def go_offline(self):
66+
"""Tell the device to go offline so that credentials can be modified"""
67+
return
68+
69+
@abstractmethod
70+
def get_imei(self):
71+
"""Get device IMEI, if applicable"""
72+
return
73+
74+
@abstractmethod
75+
def get_mfw_version(self):
76+
"""Get modem firmware version, if applicable"""
77+
return
78+
79+
class ATKeygenException(Exception):
80+
def __init__(self, message, exit_code):
81+
super().__init__(message)
82+
self.exit_code = exit_code
83+
84+
class ATCommandInterface(CredentialCommandInterface):
85+
shell = False
86+
87+
def _parse_sha(self, cmng_result_str):
88+
# Example AT%CMNG response:
89+
# %CMNG: 123,0,"2C43952EE9E000FF2ACC4E2ED0897C0A72AD5FA72C3D934E81741CBD54F05BD1"
90+
# The first item in " is the SHA.
91+
try:
92+
return cmng_result_str.split('"')[1]
93+
except (ValueError, IndexError):
94+
logger.error(f'Could not parse credential hash: {cmng_result_str}')
95+
return None
96+
97+
def set_shell_mode(self, shell):
98+
self.shell = shell
99+
100+
def detect_shell_mode(self):
101+
"""Detect if the device is in shell mode or not."""
102+
for _ in range(3):
103+
self.write_raw("at AT+CGSN")
104+
result, output = self.comms.expect_response("OK", "ERROR", "")
105+
if result and len(re.findall("[0-9]{15}", output)) > 0:
106+
self.set_shell_mode(True)
107+
return
108+
self.set_shell_mode(False)
109+
110+
def at_command(self, at_command, wait_for_result=False):
111+
"""Write an AT command to the command interface. Optionally wait for OK"""
112+
113+
# AT commands are written directly as-is with the ATCommandInterface:
114+
at_cmd_prefix = 'at ' if self.shell else ''
115+
self.write_raw(f'{at_cmd_prefix}{at_command}')
116+
117+
if wait_for_result:
118+
result, _ = self.comms.expect_response("OK", "ERROR")
119+
return result
120+
else:
121+
return True
122+
123+
def write_credential(self, sectag, cred_type, cred_text):
124+
result = self.at_command(f'AT%CMNG=0,{sectag},{cred_type},"{cred_text}"',
125+
wait_for_result=True)
126+
time.sleep(1)
127+
return result
128+
129+
def delete_credential(self, sectag, cred_type):
130+
# No output is expected beyond OK/ERROR in this case
131+
return self.at_command(f'AT%CMNG=3,{sectag},{cred_type}', wait_for_result=True)
132+
133+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
134+
self.at_command(f'AT%CMNG=1,{sectag},{cred_type}')
135+
retval, res = self.comms.expect_response("OK", "ERROR", "%CMNG")
136+
if retval and res:
137+
if not get_hash:
138+
return True, None
139+
else:
140+
return True, self._parse_sha(res)
141+
142+
return False, None
143+
144+
def calculate_expected_hash(self, cred_text):
145+
# AT Command host returns hex of SHA256 hash of credential plaintext
146+
return hashlib.sha256(cred_text.encode('utf-8')).hexdigest().upper()
147+
148+
def go_offline(self):
149+
return self.at_command('AT+CFUN=4', wait_for_result=True)
150+
151+
def get_imei(self):
152+
self.at_command('AT+CGSN')
153+
retval, output = self.comms.expect_response("OK", "ERROR", "")
154+
if not retval:
155+
return None
156+
return output[:IMEI_LEN]
157+
158+
def get_model_id(self):
159+
self.at_command('AT+CGMM')
160+
retval, output = self.comms.expect_response("OK", "ERROR", "")
161+
if not retval:
162+
return None
163+
return output
164+
165+
def get_mfw_version(self):
166+
self.at_command('AT+CGMR')
167+
retval, output = self.comms.expect_response("OK", "ERROR", "")
168+
if not retval:
169+
return None
170+
return output
171+
172+
def get_attestation_token(self):
173+
self.at_command('AT%ATTESTTOKEN')
174+
retval, output = self.comms.expect_response("OK", "ERROR", "%ATTESTTOKEN:")
175+
if not retval:
176+
return None
177+
attest_tok = output.split('"')[1]
178+
return attest_tok
179+
180+
TLS_CRED_TYPES = ["CA", "SERV", "PK"]
181+
# This chunk size can be any multiple of 4, as long as it is small enough to fit within the
182+
# Zephyr shell buffer.
183+
TLS_CRED_CHUNK_SIZE = 48
184+
185+
class TLSCredShellInterface(CredentialCommandInterface):
186+
def write_credential(self, sectag, cred_type, cred_text):
187+
# Because the Zephyr shell does not support multi-line commands,
188+
# we must base-64 encode our PEM strings and install them as if they were binary.
189+
# Yes, this does mean we are base-64 encoding a string which is already mostly base-64.
190+
# We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass
191+
# everything else directly as a binary payload (using BIN mode instead of BINT, since
192+
# MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a
193+
# PEM string). But this will fail for multi-CA installs, such as CoAP.
194+
195+
# text -> bytes -> base64 bytes -> base64 text
196+
encoded = base64.b64encode(cred_text.encode()).decode()
197+
198+
# Clear credential buffer -- If it is already clear, there may not be text feedback
199+
self.write_raw("cred buf clear")
200+
201+
# Write the encoded credential in chunks
202+
chunks = math.ceil(len(encoded)/TLS_CRED_CHUNK_SIZE)
203+
for c in range(chunks):
204+
chunk = encoded[c*TLS_CRED_CHUNK_SIZE:(c+1)*TLS_CRED_CHUNK_SIZE]
205+
self.write_raw(f"cred buf {chunk}")
206+
self.comms.expect_response("Stored")
207+
208+
# Store the buffered credential
209+
self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint")
210+
result, _ = self.comms.expect_response("Added TLS credential")
211+
time.sleep(1)
212+
return result
213+
214+
def delete_credential(self, sectag, cred_type):
215+
self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}')
216+
result, _ = self.comms.expect_response("Deleted TLS credential", "There is no TLS credential")
217+
time.sleep(2)
218+
return result
219+
220+
def check_credential_exists(self, sectag, cred_type, get_hash=True):
221+
self.write_raw(f'cred list {sectag} {TLS_CRED_TYPES[cred_type]}')
222+
223+
# This will capture the list dump for the credential if it exists.
224+
result, output = self.comms.expect_response("1 credentials found.",
225+
"0 credentials found.",
226+
f"{sectag},{TLS_CRED_TYPES[cred_type]}")
227+
228+
if not output:
229+
return False, None
230+
231+
if not get_hash:
232+
return True, None
233+
234+
# Output is a comma separated list of positional items
235+
data = output.split(",")
236+
hash = data[2].strip()
237+
status_code = data[3].strip()
238+
239+
if (status_code != "0"):
240+
logger.error(f"Error retrieving credential hash: {output.strip()}.")
241+
logger.error("Device might not support credential digests.")
242+
return True, None
243+
244+
return True, hash
245+
246+
def calculate_expected_hash(self, cred_text):
247+
# TLS Credentials shell returns base-64 of SHA256 hash of full credential, including NULL
248+
# termination.
249+
hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00')
250+
return base64.b64encode(hash.digest()).decode()
251+
252+
def get_csr(self, sectag = 0, cn = ""):
253+
raise RuntimeError("The TLS Credentials Shell does not support CSR generation")
254+
255+
def go_offline(self):
256+
# TLS credentials shell has no concept of online/offline. Just no-op.
257+
pass
258+
259+
def get_imei(self):
260+
raise RuntimeError("The TLS Credentials Shell does not support IMEI extraction")
261+
262+
def get_mfw_version(self):
263+
raise RuntimeError("The TLS Credentials Shell does not support MFW version extraction")

0 commit comments

Comments
 (0)