Skip to content

Commit 52cfe33

Browse files
committed
integration of comms and command_interface
Signed-off-by: Maximilian Deubel <[email protected]>
1 parent 4052729 commit 52cfe33

File tree

6 files changed

+187
-403
lines changed

6 files changed

+187
-403
lines changed

src/nrfcredstore/at_client.py

Lines changed: 0 additions & 98 deletions
This file was deleted.

src/nrfcredstore/cli.py

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import argparse
22
import sys
33
import serial
4+
import logging
45

56
from nrfcredstore.exceptions import ATCommandError, NoATClientException
6-
from nrfcredstore.at_client import ATClient
7+
from nrfcredstore.command_interface import ATCommandInterface
78
from nrfcredstore.credstore import CredStore, CredType
9+
from nrfcredstore.comms import Comms
810

911
FUN_MODE_OFFLINE = 4
1012
KEY_TYPES_OR_ANY = list(map(lambda type: type.name, CredType))
@@ -19,7 +21,7 @@
1921

2022
def parse_args(in_args):
2123
parser = argparse.ArgumentParser(description='Manage certificates stored in a cellular modem.')
22-
parser.add_argument('dev', help='Serial device used to communicate with the modem.')
24+
parser.add_argument('dev', help='Device used to communicate with the modem.')
2325
parser.add_argument('--baudrate', type=int, default=115200, help='Serial baudrate')
2426
parser.add_argument('--timeout', type=int, default=3,
2527
help='Serial communication timeout in seconds')
@@ -55,6 +57,10 @@ def parse_args(in_args):
5557

5658
deleteall_parser = subparsers.add_parser('deleteall', help='Delete all keys in a secure tag')
5759

60+
imei_parser = subparsers.add_parser('imei', help='Get IMEI of the modem')
61+
62+
attoken_parser = subparsers.add_parser('attoken', help='Get attestation token of the modem')
63+
5864
# add generate command and args
5965
generate_parser = subparsers.add_parser('generate', help='Generate private key')
6066
generate_parser.add_argument('tag', type=int,
@@ -68,7 +74,8 @@ def parse_args(in_args):
6874

6975
def exec_cmd(args, credstore):
7076
if args.subcommand:
71-
credstore.func_mode(FUN_MODE_OFFLINE)
77+
if not credstore.func_mode(FUN_MODE_OFFLINE):
78+
raise RuntimeError("Failed to set modem to offline mode.")
7279

7380
if args.subcommand == 'list':
7481
ct = CredType[args.type]
@@ -104,30 +111,43 @@ def exec_cmd(args, credstore):
104111
credstore.keygen(args.tag, args.file, args.attributes)
105112
print(f'New private key generated in secure tag {args.tag}')
106113
print(f'Wrote CSR in DER format to {args.file.name}')
114+
elif args.subcommand=='imei':
115+
imei = credstore.command_interface.get_imei()
116+
if imei is None:
117+
raise RuntimeError("Failed to get IMEI.")
118+
print(f'IMEI: {imei}')
119+
elif args.subcommand=='attoken':
120+
attoken = credstore.command_interface.get_attestation_token()
121+
if attoken is None:
122+
raise RuntimeError("Failed to get attestation token.")
123+
print(f'Attestation token: {attoken}')
107124

108125
def exit_with_msg(exitcode, msg):
109126
print(msg)
110127
exit(exitcode)
111128

112-
def main(in_args, credstore):
113-
at_client = credstore.at_client
114-
try:
115-
args = parse_args(in_args)
116-
if args.dev:
117-
at_client.connect(args.dev, args.baudrate, args.timeout)
118-
at_client.verify()
119-
at_client.enable_error_codes()
120-
exec_cmd(args, credstore)
121-
except NoATClientException:
122-
exit_with_msg(ERR_NO_AT_CLIENT, 'The device does not respond to AT commands. Please flash at_client sample.')
123-
except ATCommandError as err:
124-
exit_with_msg(ERR_AT_COMMAND, err)
125-
except TimeoutError as err:
126-
exit_with_msg(ERR_TIMEOUT, 'The device did not respond in time. Please try again.')
127-
except serial.SerialException as err:
128-
exit_with_msg(ERR_SERIAL, f'Serial error: {err}')
129-
except Exception as err:
130-
exit_with_msg(ERR_UNKNOWN, f'Unhandled Error: {err}')
131-
132-
def run():
133-
main(sys.argv[1:], CredStore(ATClient(serial.Serial())))
129+
def main(args, credstore):
130+
credstore.command_interface.detect_shell_mode()
131+
credstore.command_interface.enable_error_codes()
132+
exec_cmd(args, credstore)
133+
134+
def run(argv=sys.argv):
135+
logging.basicConfig(level='DEBUG')
136+
args = parse_args(argv[1:])
137+
comms = None
138+
139+
# use inquirer to find the device
140+
if args.dev == 'auto':
141+
comms = Comms(list_all=True, baudrate=args.baudrate, timeout=args.timeout)
142+
elif args.dev == 'rtt':
143+
comms = Comms(rtt=True, baudrate=args.baudrate, timeout=args.timeout)
144+
# if dev is just numbers, assume it's an rtt device
145+
elif args.dev.isdigit():
146+
comms = Comms(rtt = True, serial_number = int(args.dev), timeout=args.timeout)
147+
# otherwise, assume it's a serial device
148+
else:
149+
comms = Comms(port=args.dev, baudrate=args.baudrate, timeout=args.timeout)
150+
151+
cred_if = ATCommandInterface(comms)
152+
153+
main(args, CredStore(cred_if))

src/nrfcredstore/credstore.py

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,16 @@ def __init__(self, tag: int, type: int, sha: str):
2727
self.sha = sha
2828

2929
class CredStore:
30-
def __init__(self, at_client):
31-
self.at_client = at_client
30+
def __init__(self, command_interface):
31+
self.command_interface = command_interface
3232

3333
def func_mode(self, mode):
3434
"""Set modem functioning mode
3535
3636
See AT Command Reference Guide for valid modes.
3737
"""
3838

39-
return self.at_client.at_command(f'AT+CFUN={mode}') == []
39+
return self.command_interface.at_command(f'AT+CFUN={mode}', wait_for_result=True)
4040

4141
def list(self, tag = None, type: CredType = CredType.ANY) -> List[Credential]:
4242
"""List stored credentials
@@ -57,8 +57,12 @@ def list(self, tag = None, type: CredType = CredType.ANY) -> List[Credential]:
5757
if type != CredType.ANY:
5858
cmd = f'{cmd},{CredType(type).value}'
5959

60-
response_lines = self.at_client.at_command(cmd)
61-
response_lines = [line for line in response_lines if line.strip()]
60+
self.command_interface.at_command(cmd, wait_for_result=False)
61+
result, response = self.command_interface.comms.expect_response("OK", "ERROR", "%CMNG: ")
62+
if not result:
63+
raise RuntimeError("Failed to list credentials")
64+
response_lines = response.splitlines()
65+
response_lines = [line.strip() for line in response_lines if line.strip()]
6266

6367
columns = map(lambda line: line.replace('%CMNG: ', '').replace('"', '').split(','), response_lines)
6468
cred_map = map(lambda columns:
@@ -77,7 +81,8 @@ def write(self, tag: int, type: CredType, file: io.TextIOBase):
7781
if type == CredType.ANY:
7882
raise ValueError
7983
cert = file.read().rstrip()
80-
return self.at_client.at_command(f'AT%CMNG=0,{tag},{type.value},"{cert}"')
84+
if not self.command_interface.at_command(f'AT%CMNG=0,{tag},{type.value},"{cert}"', wait_for_result=True):
85+
raise RuntimeError("Failed to write credential")
8186

8287
def delete(self, tag: int, type: CredType):
8388
"""Delete a credential from the modem
@@ -87,22 +92,19 @@ def delete(self, tag: int, type: CredType):
8792

8893
if type == CredType.ANY:
8994
raise ValueError
90-
return self.at_client.at_command(f'AT%CMNG=3,{tag},{type.value}')
95+
if not self.command_interface.at_command(f'AT%CMNG=3,{tag},{type.value}', wait_for_result=True):
96+
raise RuntimeError("Failed to delete credential")
9197

9298
def keygen(self, tag: int, file: io.BufferedIOBase, attributes: str = ''):
9399
"""Generate a new private key and return a certificate signing request in DER format"""
94-
cmd = f'AT%KEYGEN={tag},2,0'
95100

96-
if attributes != '':
97-
cmd = f'{cmd},"{attributes}"'
101+
keygen_output = self.command_interface.get_csr(sectag=tag, attributes=attributes)
98102

99-
response_lines = self.at_client.at_command(cmd)
100-
for l in response_lines:
101-
if not l.startswith('%KEYGEN'):
102-
continue
103-
keygen_output = l.replace('%KEYGEN: "', '')
104-
csr_der_b64 = keygen_output.split('.')[0]
105-
csr_der_bytes = base64.urlsafe_b64decode(csr_der_b64 + '===')
106-
file.write(csr_der_bytes)
103+
if not keygen_output:
104+
raise RuntimeError("Failed to generate key")
107105

106+
csr_der_b64 = keygen_output.split('.')[0]
107+
csr_der_bytes = base64.urlsafe_b64decode(csr_der_b64 + '===')
108+
109+
file.write(csr_der_bytes)
108110
file.close()

0 commit comments

Comments
 (0)