diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..988cb95 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,92 @@ +# EditorConfig: https://editorconfig.org/ + +# top-most EditorConfig file +root = true + +# All (Defaults) +[*] +charset = utf-8 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true +max_line_length = 100 + +# Assembly +[*.S] +indent_style = tab +indent_size = 8 + +# C +[*.{c,h}] +indent_style = tab +indent_size = 8 + +# C++ +[*.{cpp,hpp}] +indent_style = tab +indent_size = 8 + +# Linker Script +[*.ld] +indent_style = tab +indent_size = 8 + +# Python +[*.py] +indent_style = space +indent_size = 4 + +# Perl +[*.pl] +indent_style = tab +indent_size = 8 + +# reStructuredText +[*.rst] +indent_style = space +indent_size = 3 + +# YAML +[*.{yml,yaml}] +indent_style = space +indent_size = 2 + +# Shell Script +[*.sh] +indent_style = space +indent_size = 4 + +# Windows Command Script +[*.cmd] +end_of_line = crlf +indent_style = tab +indent_size = 8 + +# Valgrind Suppression File +[*.supp] +indent_style = space +indent_size = 3 + +# CMake +[{CMakeLists.txt,*.cmake}] +indent_style = space +indent_size = 2 + +# Makefile +[Makefile] +indent_style = tab +indent_size = 8 + +# Device tree +[*.{dts,dtsi,overlay}] +indent_style = tab +indent_size = 8 + +# Git commit messages +[COMMIT_EDITMSG] +max_line_length = 75 + +# Kconfig +[Kconfig*] +indent_style = tab +indent_size = 8 diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index 231dc08..b911cd6 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - python-version: ['3.9', '3.10', '3.11', '3.12', '3.13'] + python-version: ['3.10', '3.11', '3.12', '3.13'] steps: - uses: actions/checkout@v4 diff --git a/README.md b/README.md index ecd655d..5b1b710 100644 --- a/README.md +++ b/README.md @@ -15,24 +15,32 @@ For the device to respond to AT commands, the firmware on the device must have a ## Command Line Interface ``` -usage: nrfcredstore [-h] [--baudrate BAUDRATE] [--timeout TIMEOUT] dev {list,write,delete,generate} ... +usage: nrfcredstore [-h] [--baudrate BAUDRATE] [--timeout TIMEOUT] [--debug] [--cmd-type {at,shell,auto}] + dev {list,write,delete,deleteall,imei,attoken,generate} ... Manage certificates stored in a cellular modem. positional arguments: - dev Serial device used to communicate with the modem. + dev Device used to communicate with the modem. For interactive selection of serial port, use "auto". For RTT, use "rtt". If given a SEGGER + serial number, it is assumed to be an RTT device. options: -h, --help show this help message and exit --baudrate BAUDRATE Serial baudrate --timeout TIMEOUT Serial communication timeout in seconds + --debug Enable debug logging + --cmd-type {at,shell,auto} + Command type to use. "at" for AT commands, "shell" for shell commands, "auto" to detect automatically. subcommands: - {list,write,delete,generate} + {list,write,delete,deleteall,imei,attoken,generate} Certificate related commands list List all keys stored in the modem write Write key/cert to a secure tag delete Delete value from a secure tag + deleteall Delete all keys in a secure tag + imei Get IMEI from the modem + attoken Get attestation token of the modem generate Generate private key ``` @@ -56,7 +64,7 @@ Secure tag Key type SHA ### write subcommand -Write key/cert to a secure tag. KEY_TYPE must be either ROOT_CA_CERT, CLIENT_CERT, CLIENT_KEY, or PSK. +Write key/cert to a security tag. KEY_TYPE must be either ROOT_CA_CERT, CLIENT_CERT, CLIENT_KEY, or PSK. ``` usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev write SECURE_TAG KEY_TYPE FILENAME @@ -68,7 +76,7 @@ usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev write SECURE_T ### delete subcommand -Delete value from a secure tag. +Delete value from a security tag. ``` usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev delete SECURE_TAG KEY_TYPE @@ -78,6 +86,31 @@ usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev delete SECURE_ $ nrfcredstore /dev/tty.usbmodem0009600000001 delete 123 ROOT_CA_CERT +### deletall subcommand + +Delete all writable security tags. + +``` +usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev deleteall +``` + +### imei subcommand + +Read IMEI from modem. + +``` +usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev imei +``` + +### attoken subcommand + +Read Attestation Token from modem. + +``` +usage: nrfcredstore [--baudrate BAUDRATE] [--timeout TIMEOUT] dev attoken +``` + + ### generate subcommand > [!IMPORTANT] @@ -114,4 +147,4 @@ Running the tests depends on a [development installation](#development-installat Check coverage - poetry run pytest --cov=. tests \ No newline at end of file + poetry run pytest --cov=src tests --cov-report=html diff --git a/poetry.lock b/poetry.lock index 1a990b9..8c5c1f5 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,24 +1,71 @@ -# This file is automatically @generated by Poetry 1.4.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. + +[[package]] +name = "ansicon" +version = "1.89.0" +description = "Python wrapper for loading Jason Hood's ANSICON" +optional = false +python-versions = "*" +groups = ["main"] +markers = "platform_system == \"Windows\"" +files = [ + {file = "ansicon-1.89.0-py2.py3-none-any.whl", hash = "sha256:f1def52d17f65c2c9682cf8370c03f541f410c1752d6a14029f97318e4b9dfec"}, + {file = "ansicon-1.89.0.tar.gz", hash = "sha256:e4d039def5768a47e4afec8e89e83ec3ae5a26bf00ad851f914d1240b444d2b1"}, +] + +[[package]] +name = "blessed" +version = "1.21.0" +description = "Easy, practical library for making terminal apps, by providing an elegant, well-documented interface to Colors, Keyboard input, and screen Positioning capabilities." +optional = false +python-versions = ">=2.7" +groups = ["main"] +files = [ + {file = "blessed-1.21.0-py2.py3-none-any.whl", hash = "sha256:f831e847396f5a2eac6c106f4dfadedf46c4f804733574b15fe86d2ed45a9588"}, + {file = "blessed-1.21.0.tar.gz", hash = "sha256:ece8bbc4758ab9176452f4e3a719d70088eb5739798cd5582c9e05f2a28337ec"}, +] + +[package.dependencies] +jinxed = {version = ">=1.1.0", markers = "platform_system == \"Windows\""} +wcwidth = ">=0.1.4" [[package]] name = "colorama" version = "0.4.6" description = "Cross-platform colored terminal text." -category = "dev" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7" +groups = ["dev"] files = [ {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"}, {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"}, ] +[[package]] +name = "coloredlogs" +version = "15.0.1" +description = "Colored terminal output for Python's logging module" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +groups = ["main"] +files = [ + {file = "coloredlogs-15.0.1-py2.py3-none-any.whl", hash = "sha256:612ee75c546f53e92e70049c9dbfcc18c935a2b9a53b66085ce9ef6a6e5c0934"}, + {file = "coloredlogs-15.0.1.tar.gz", hash = "sha256:7c991aa71a4577af2f82600d8f8f3a89f936baeaf9b50a9c197da014e5bf16b0"}, +] + +[package.dependencies] +humanfriendly = ">=9.1" + +[package.extras] +cron = ["capturer (>=2.4)"] + [[package]] name = "coverage" version = "7.2.7" description = "Code coverage measurement for Python" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "coverage-7.2.7-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d39b5b4f2a66ccae8b7263ac3c8170994b65266797fb96cbbfd3fb5b23921db8"}, {file = "coverage-7.2.7-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6d040ef7c9859bb11dfeb056ff5b3872436e3b5e401817d87a31e1750b9ae2fb"}, @@ -86,26 +133,43 @@ files = [ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.11.0a6\" and extra == \"toml\""} [package.extras] -toml = ["tomli"] +toml = ["tomli ; python_full_version <= \"3.11.0a6\""] [[package]] name = "docopt" version = "0.6.2" description = "Pythonic argument parser, that will make you smile" -category = "dev" optional = false python-versions = "*" +groups = ["dev"] files = [ {file = "docopt-0.6.2.tar.gz", hash = "sha256:49b3a825280bd66b3aa83585ef59c4a8c82f2c8a522dbe754a8bc8d08c85c491"}, ] +[[package]] +name = "editor" +version = "1.6.6" +description = "🖋 Open the default text editor 🖋" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "editor-1.6.6-py3-none-any.whl", hash = "sha256:e818e6913f26c2a81eadef503a2741d7cca7f235d20e217274a009ecd5a74abf"}, + {file = "editor-1.6.6.tar.gz", hash = "sha256:bb6989e872638cd119db9a4fce284cd8e13c553886a1c044c6b8d8a160c871f8"}, +] + +[package.dependencies] +runs = "*" +xmod = "*" + [[package]] name = "exceptiongroup" version = "1.1.3" description = "Backport of PEP 654 (exception groups)" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] +markers = "python_version < \"3.11\"" files = [ {file = "exceptiongroup-1.1.3-py3-none-any.whl", hash = "sha256:343280667a4585d195ca1cf9cef84a4e178c4b6cf2274caef9859782b567d5e3"}, {file = "exceptiongroup-1.1.3.tar.gz", hash = "sha256:097acd85d473d75af5bb98e41b61ff7fe35efe6675e4f9370ec6ec5126d160e9"}, @@ -115,45 +179,84 @@ files = [ test = ["pytest (>=6)"] [[package]] -name = "importlib-metadata" -version = "6.7.0" -description = "Read metadata from Python packages" -category = "dev" +name = "future" +version = "1.0.0" +description = "Clean single-source support for Python 3 and 2" optional = false -python-versions = ">=3.7" +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*" +groups = ["main"] files = [ - {file = "importlib_metadata-6.7.0-py3-none-any.whl", hash = "sha256:cb52082e659e97afc5dac71e79de97d8681de3aa07ff18578330904a9d18e5b5"}, - {file = "importlib_metadata-6.7.0.tar.gz", hash = "sha256:1aaf550d4f73e5d6783e7acb77aec43d49da8017410afae93822cc9cca98c4d4"}, + {file = "future-1.0.0-py3-none-any.whl", hash = "sha256:929292d34f5872e70396626ef385ec22355a1fae8ad29e1a734c3e43f9fbc216"}, + {file = "future-1.0.0.tar.gz", hash = "sha256:bd2968309307861edae1458a4f8a4f3598c03be43b97521076aebf5d94c07b05"}, ] -[package.dependencies] -typing-extensions = {version = ">=3.6.4", markers = "python_version < \"3.8\""} -zipp = ">=0.5" +[[package]] +name = "humanfriendly" +version = "10.0" +description = "Human friendly output for text interfaces using Python" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +groups = ["main"] +files = [ + {file = "humanfriendly-10.0-py2.py3-none-any.whl", hash = "sha256:1697e1a8a8f550fd43c2865cd84542fc175a61dcb779b6fee18cf6b6ccba1477"}, + {file = "humanfriendly-10.0.tar.gz", hash = "sha256:6b0b831ce8f15f7300721aa49829fc4e83921a9a301cc7f606be6686a2288ddc"}, +] -[package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -perf = ["ipython"] -testing = ["flufl.flake8", "importlib-resources (>=1.3)", "packaging", "pyfakefs", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-mypy (>=0.9.1)", "pytest-perf (>=0.9.2)", "pytest-ruff"] +[package.dependencies] +pyreadline3 = {version = "*", markers = "sys_platform == \"win32\" and python_version >= \"3.8\""} [[package]] name = "iniconfig" version = "2.0.0" description = "brain-dead simple config-ini parsing" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "iniconfig-2.0.0-py3-none-any.whl", hash = "sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"}, {file = "iniconfig-2.0.0.tar.gz", hash = "sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"}, ] +[[package]] +name = "inquirer" +version = "3.4.0" +description = "Collection of common interactive command line user interfaces, based on Inquirer.js" +optional = false +python-versions = ">=3.8.1" +groups = ["main"] +files = [ + {file = "inquirer-3.4.0-py3-none-any.whl", hash = "sha256:bb0ec93c833e4ce7b51b98b1644b0a4d2bb39755c39787f6a504e4fee7a11b60"}, + {file = "inquirer-3.4.0.tar.gz", hash = "sha256:8edc99c076386ee2d2204e5e3653c2488244e82cb197b2d498b3c1b5ffb25d0b"}, +] + +[package.dependencies] +blessed = ">=1.19.0" +editor = ">=1.6.0" +readchar = ">=4.2.0" + +[[package]] +name = "jinxed" +version = "1.3.0" +description = "Jinxed Terminal Library" +optional = false +python-versions = "*" +groups = ["main"] +markers = "platform_system == \"Windows\"" +files = [ + {file = "jinxed-1.3.0-py2.py3-none-any.whl", hash = "sha256:b993189f39dc2d7504d802152671535b06d380b26d78070559551cbf92df4fc5"}, + {file = "jinxed-1.3.0.tar.gz", hash = "sha256:1593124b18a41b7a3da3b078471442e51dbad3d77b4d4f2b0c26ab6f7d660dbf"}, +] + +[package.dependencies] +ansicon = {version = "*", markers = "platform_system == \"Windows\""} + [[package]] name = "packaging" version = "23.1" description = "Core utilities for Python packages" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "packaging-23.1-py3-none-any.whl", hash = "sha256:994793af429502c4ea2ebf6bf664629d07c1a9fe974af92966e4b8d2df7edc61"}, {file = "packaging-23.1.tar.gz", hash = "sha256:a392980d2b6cffa644431898be54b0045151319d1e7ec34f0cfed48767dd334f"}, @@ -163,28 +266,56 @@ files = [ name = "pluggy" version = "1.2.0" description = "plugin and hook calling mechanisms for python" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "pluggy-1.2.0-py3-none-any.whl", hash = "sha256:c2fd55a7d7a3863cba1a013e4e2414658b1d07b6bc57b3919e0c63c9abb99849"}, {file = "pluggy-1.2.0.tar.gz", hash = "sha256:d12f0c4b579b15f5e054301bb226ee85eeeba08ffec228092f8defbaa3a4c4b3"}, ] -[package.dependencies] -importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} - [package.extras] dev = ["pre-commit", "tox"] testing = ["pytest", "pytest-benchmark"] +[[package]] +name = "pynrfjprog" +version = "10.24.2" +description = "A simple Python interface for the nrfjprog functionality" +optional = false +python-versions = ">=3.7" +groups = ["main"] +files = [ + {file = "pynrfjprog-10.24.2.tar.gz", hash = "sha256:0ae6d69d1a1620208422b0eb750d565c6ed7d6ed9b484000c2af0b54c9dac60e"}, +] + +[package.dependencies] +future = "*" +tomli-w = "*" + +[[package]] +name = "pyreadline3" +version = "3.5.4" +description = "A python implementation of GNU readline." +optional = false +python-versions = ">=3.8" +groups = ["main"] +markers = "sys_platform == \"win32\"" +files = [ + {file = "pyreadline3-3.5.4-py3-none-any.whl", hash = "sha256:eaf8e6cc3c49bcccf145fc6067ba8643d1df34d604a1ec0eccbf7a18e6d3fae6"}, + {file = "pyreadline3-3.5.4.tar.gz", hash = "sha256:8d57d53039a1c75adba8e50dd3d992b28143480816187ea5efbd5c78e6c885b7"}, +] + +[package.extras] +dev = ["build", "flake8", "mypy", "pytest", "twine"] + [[package]] name = "pyserial" version = "3.5" description = "Python Serial Port Extension" -category = "main" optional = false python-versions = "*" +groups = ["main"] files = [ {file = "pyserial-3.5-py2.py3-none-any.whl", hash = "sha256:c4451db6ba391ca6ca299fb3ec7bae67a5c55dde170964c7a14ceefec02f2cf0"}, {file = "pyserial-3.5.tar.gz", hash = "sha256:3c77e014170dfffbd816e6ffc205e9842efb10be9f58ec16d3e8675b4925cddb"}, @@ -197,9 +328,9 @@ cp2110 = ["hidapi"] name = "pytest" version = "7.4.0" description = "pytest: simple powerful testing with Python" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "pytest-7.4.0-py3-none-any.whl", hash = "sha256:78bf16451a2eb8c7a2ea98e32dc119fd2aa758f1d5d66dbf0a59d69a3969df32"}, {file = "pytest-7.4.0.tar.gz", hash = "sha256:b4bf8c45bd59934ed84001ad51e11b4ee40d40a1229d2c79f9c592b0a3f6bd8a"}, @@ -208,7 +339,6 @@ files = [ [package.dependencies] colorama = {version = "*", markers = "sys_platform == \"win32\""} exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""} -importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} iniconfig = "*" packaging = "*" pluggy = ">=0.12,<2.0" @@ -221,9 +351,9 @@ testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "no name = "pytest-cov" version = "4.1.0" description = "Pytest plugin for measuring coverage." -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "pytest-cov-4.1.0.tar.gz", hash = "sha256:3904b13dfbfec47f003b8e77fd5b589cd11904a21ddf1ab38a64f204d6a10ef6"}, {file = "pytest_cov-4.1.0-py3-none-any.whl", hash = "sha256:6ba70b9e97e69fcc3fb45bfeab2d0a138fb65c4d0d6a41ef33983ad114be8c3a"}, @@ -240,9 +370,9 @@ testing = ["fields", "hunter", "process-tests", "pytest-xdist", "six", "virtuale name = "pytest-watch" version = "4.2.0" description = "Local continuous test runner with pytest and watchdog." -category = "dev" optional = false python-versions = "*" +groups = ["dev"] files = [ {file = "pytest-watch-4.2.0.tar.gz", hash = "sha256:06136f03d5b361718b8d0d234042f7b2f203910d8568f63df2f866b547b3d4b9"}, ] @@ -253,37 +383,65 @@ docopt = ">=0.4.0" pytest = ">=2.6.4" watchdog = ">=0.6.0" +[[package]] +name = "readchar" +version = "4.2.1" +description = "Library to easily read single chars and key strokes" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "readchar-4.2.1-py3-none-any.whl", hash = "sha256:a769305cd3994bb5fa2764aa4073452dc105a4ec39068ffe6efd3c20c60acc77"}, + {file = "readchar-4.2.1.tar.gz", hash = "sha256:91ce3faf07688de14d800592951e5575e9c7a3213738ed01d394dcc949b79adb"}, +] + +[[package]] +name = "runs" +version = "1.2.2" +description = "🏃 Run a block of text as a subprocess 🏃" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "runs-1.2.2-py3-none-any.whl", hash = "sha256:0980dcbc25aba1505f307ac4f0e9e92cbd0be2a15a1e983ee86c24c87b839dfd"}, + {file = "runs-1.2.2.tar.gz", hash = "sha256:9dc1815e2895cfb3a48317b173b9f1eac9ba5549b36a847b5cc60c3bf82ecef1"}, +] + +[package.dependencies] +xmod = "*" + [[package]] name = "tomli" version = "2.0.1" description = "A lil' TOML parser" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] +markers = "python_full_version <= \"3.11.0a6\"" files = [ {file = "tomli-2.0.1-py3-none-any.whl", hash = "sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"}, {file = "tomli-2.0.1.tar.gz", hash = "sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"}, ] [[package]] -name = "typing-extensions" -version = "4.7.1" -description = "Backported and Experimental Type Hints for Python 3.7+" -category = "dev" +name = "tomli-w" +version = "1.2.0" +description = "A lil' TOML writer" optional = false -python-versions = ">=3.7" +python-versions = ">=3.9" +groups = ["main"] files = [ - {file = "typing_extensions-4.7.1-py3-none-any.whl", hash = "sha256:440d5dd3af93b060174bf433bccd69b0babc3b15b1a8dca43789fd7f61514b36"}, - {file = "typing_extensions-4.7.1.tar.gz", hash = "sha256:b75ddc264f0ba5615db7ba217daeb99701ad295353c45f9e95963337ceeeffb2"}, + {file = "tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90"}, + {file = "tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021"}, ] [[package]] name = "watchdog" version = "3.0.0" description = "Filesystem events monitoring" -category = "dev" optional = false python-versions = ">=3.7" +groups = ["dev"] files = [ {file = "watchdog-3.0.0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:336adfc6f5cc4e037d52db31194f7581ff744b67382eb6021c868322e32eef41"}, {file = "watchdog-3.0.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:a70a8dcde91be523c35b2bf96196edc5730edb347e374c7de7cd20c43ed95397"}, @@ -318,22 +476,30 @@ files = [ watchmedo = ["PyYAML (>=3.10)"] [[package]] -name = "zipp" -version = "3.15.0" -description = "Backport of pathlib-compatible object wrapper for zip files" -category = "dev" +name = "wcwidth" +version = "0.2.13" +description = "Measures the displayed width of unicode strings in a terminal" optional = false -python-versions = ">=3.7" +python-versions = "*" +groups = ["main"] files = [ - {file = "zipp-3.15.0-py3-none-any.whl", hash = "sha256:48904fc76a60e542af151aded95726c1a5c34ed43ab4134b597665c86d7ad556"}, - {file = "zipp-3.15.0.tar.gz", hash = "sha256:112929ad649da941c23de50f356a2b5570c954b65150642bccdd66bf194d224b"}, + {file = "wcwidth-0.2.13-py2.py3-none-any.whl", hash = "sha256:3da69048e4540d84af32131829ff948f1e022c1c6bdb8d6102117aac784f6859"}, + {file = "wcwidth-0.2.13.tar.gz", hash = "sha256:72ea0c06399eb286d978fdedb6923a9eb47e1c486ce63e9b4e64fc18303972b5"}, ] -[package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] -testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)"] +[[package]] +name = "xmod" +version = "1.8.1" +description = "🌱 Turn any object into a module 🌱" +optional = false +python-versions = ">=3.8" +groups = ["main"] +files = [ + {file = "xmod-1.8.1-py3-none-any.whl", hash = "sha256:a24e9458a4853489042522bdca9e50ee2eac5ab75c809a91150a8a7f40670d48"}, + {file = "xmod-1.8.1.tar.gz", hash = "sha256:38c76486b9d672c546d57d8035df0beb7f4a9b088bc3fb2de5431ae821444377"}, +] [metadata] -lock-version = "2.0" -python-versions = "^3.7" -content-hash = "ab0f273c27438d35df9f46eb1ae1e2694065003b69706cdc3961a9ecb07d3c94" +lock-version = "2.1" +python-versions = "^3.10" +content-hash = "4afd598f095df8f95cf80e762b53a88f58a16b7258a0e1a19d050c46165dc2a1" diff --git a/pyproject.toml b/pyproject.toml index 16fde1a..06b3232 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,8 +12,11 @@ readme = "README.md" repository = "https://github.com/NordicSemiconductor/nrfcredstore" [tool.poetry.dependencies] -python = "^3.7" +python = "^3.10" pyserial = "^3.5" +pynrfjprog = "^10.23.0" +inquirer = "^3.4.0" +coloredlogs = "^15.0.1" [tool.poetry.group.dev.dependencies] pytest = "^7.3.1" diff --git a/src/nrfcredstore/at_client.py b/src/nrfcredstore/at_client.py deleted file mode 100644 index e1e0e77..0000000 --- a/src/nrfcredstore/at_client.py +++ /dev/null @@ -1,98 +0,0 @@ -from nrfcredstore.exceptions import ATCommandError, NoATClientException - -ERR_CODE_TO_MSG = { - 0: 'AT command not supported by firmware version. Upgrade modem firmware?', - 513: 'Not found', - 514: 'Not allowed', - 515: 'Memory full', - 518: 'Not allowed in active state', - 519: 'Already exists', - 523: 'Key generation failed', -} - -class ATClient(): - def __init__(self, serial_dev): - if serial_dev is None: - raise RuntimeError('Serial device is None') - - self.device = serial_dev - - def __at_command_ok(self, cmd): - """Send AT command and return a bool whether the modem response was OK/ERROR.""" - try: - self.at_command(cmd) - except ATCommandError as err: - print(err) - return False - return True - - def __response_line(self): - """Read a single line from device and return a decoded and trimmed string.""" - line = self.device.readline() - if line == b'': - raise TimeoutError - return line.decode('utf-8').strip() - - def __read_response(self, cmd): - """Read full response for a command until it reaches OK or an error.""" - response = [] - while True: - line = self.__response_line() - - if line == 'OK': - break - elif line == 'ERROR': - raise ATCommandError(f'Error returned for AT command: {cmd}') - elif line.startswith('+CME ERROR'): - code = line.replace('+CME ERROR: ', '') - msg = ( - f'Error returned for AT command: {cmd}\n' - f'Error code: {code}\n' - f'Error message: {ERR_CODE_TO_MSG[int(code)]}' - ) - raise ATCommandError(msg) - else: - response.append(line) - - return response - - def connect(self, dev, baudrate = 115200, timeout = 3): - """Open a serial connection to the serial device""" - - self.device.port = dev - self.device.baudrate = baudrate - self.device.timeout = timeout - self.device.open() - - def verify(self): - """Check if modem responds to 'AT' - - Raises NoATClientException if unsuccessful after 3 attempts. - """ - retries = 3 - try: - while not self.__at_command_ok('AT'): - retries = retries - 1 - if retries == 0: - raise NoATClientException - except TimeoutError: - raise NoATClientException - return True - - def enable_error_codes(self): - """Change error responses to include error code""" - if not self.__at_command_ok('AT+CMEE=1'): - print('Failed to enable error notifications') - - def at_command(self, cmd): - """Send AT command to modem and return the response as a list of response lines. - - Raises ATCommandError if the modem responds with an error. - """ - - if not self.device.is_open: - raise ConnectionError('Serial device not connected') - - self.device.reset_input_buffer() - self.device.write((cmd + '\r\n').encode('utf-8')) - return self.__read_response(cmd) diff --git a/src/nrfcredstore/cli.py b/src/nrfcredstore/cli.py index 8524527..b3c56bd 100644 --- a/src/nrfcredstore/cli.py +++ b/src/nrfcredstore/cli.py @@ -1,10 +1,12 @@ import argparse import sys import serial +import logging from nrfcredstore.exceptions import ATCommandError, NoATClientException -from nrfcredstore.at_client import ATClient +from nrfcredstore.command_interface import ATCommandInterface from nrfcredstore.credstore import CredStore, CredType +from nrfcredstore.comms import Comms FUN_MODE_OFFLINE = 4 KEY_TYPES_OR_ANY = list(map(lambda type: type.name, CredType)) @@ -19,23 +21,27 @@ def parse_args(in_args): parser = argparse.ArgumentParser(description='Manage certificates stored in a cellular modem.') - parser.add_argument('dev', help='Serial device used to communicate with the modem.') + parser.add_argument('dev', help='Device used to communicate with the modem. For interactive selection of serial port, use "auto". For RTT, use "rtt". If given a SEGGER serial number, it is assumed to be an RTT device.') parser.add_argument('--baudrate', type=int, default=115200, help='Serial baudrate') parser.add_argument('--timeout', type=int, default=3, help='Serial communication timeout in seconds') + parser.add_argument('--debug', action='store_true', + help='Enable debug logging') + parser.add_argument('--cmd-type', choices=['at', 'shell', 'auto'], default='auto', + help='Command type to use. "at" for AT commands, "shell" for shell commands, "auto" to detect automatically.') subparsers = parser.add_subparsers( title='subcommands', dest='subcommand', help='Certificate related commands' ) - # add list command + # Add list command list_parser = subparsers.add_parser('list', help='List all keys stored in the modem') list_parser.add_argument('--tag', type=int, help='Only list keys in secure tag') list_parser.add_argument('--type', choices=KEY_TYPES_OR_ANY, default='ANY', help='Only list key with given type') - # add write command + # Add write command write_parser = subparsers.add_parser('write', help='Write key/cert to a secure tag') write_parser.add_argument('tag', type=int, help='Secure tag to write key to') @@ -46,7 +52,7 @@ def parse_args(in_args): type=argparse.FileType('r', encoding='UTF-8'), help='PEM file to read from') - # add delete command + # Add delete command delete_parser = subparsers.add_parser('delete', help='Delete value from a secure tag') delete_parser.add_argument('tag', type=int, help='Secure tag to delete key') @@ -55,7 +61,11 @@ def parse_args(in_args): deleteall_parser = subparsers.add_parser('deleteall', help='Delete all keys in a secure tag') - # add generate command and args + imei_parser = subparsers.add_parser('imei', help='Get IMEI from the modem') + + attoken_parser = subparsers.add_parser('attoken', help='Get attestation token of the modem') + + # Add generate command and args generate_parser = subparsers.add_parser('generate', help='Generate private key') generate_parser.add_argument('tag', type=int, help='Secure tag to store generated key') @@ -68,7 +78,8 @@ def parse_args(in_args): def exec_cmd(args, credstore): if args.subcommand: - credstore.func_mode(FUN_MODE_OFFLINE) + if not credstore.func_mode(FUN_MODE_OFFLINE): + raise RuntimeError("Failed to set modem to offline mode.") if args.subcommand == 'list': ct = CredType[args.type] @@ -104,30 +115,50 @@ def exec_cmd(args, credstore): credstore.keygen(args.tag, args.file, args.attributes) print(f'New private key generated in secure tag {args.tag}') print(f'Wrote CSR in DER format to {args.file.name}') + elif args.subcommand=='imei': + imei = credstore.command_interface.get_imei() + if imei is None: + raise RuntimeError("Failed to get IMEI.") + print(f'IMEI: {imei}') + elif args.subcommand=='attoken': + attoken = credstore.command_interface.get_attestation_token() + if attoken is None: + raise RuntimeError("Failed to get attestation token.") + print(f'Attestation token: {attoken}') def exit_with_msg(exitcode, msg): print(msg) exit(exitcode) -def main(in_args, credstore): - at_client = credstore.at_client - try: - args = parse_args(in_args) - if args.dev: - at_client.connect(args.dev, args.baudrate, args.timeout) - at_client.verify() - at_client.enable_error_codes() - exec_cmd(args, credstore) - except NoATClientException: - exit_with_msg(ERR_NO_AT_CLIENT, 'The device does not respond to AT commands. Please flash at_client sample.') - except ATCommandError as err: - exit_with_msg(ERR_AT_COMMAND, err) - except TimeoutError as err: - exit_with_msg(ERR_TIMEOUT, 'The device did not respond in time. Please try again.') - except serial.SerialException as err: - exit_with_msg(ERR_SERIAL, f'Serial error: {err}') - except Exception as err: - exit_with_msg(ERR_UNKNOWN, f'Unhandled Error: {err}') - -def run(): - main(sys.argv[1:], CredStore(ATClient(serial.Serial()))) +def main(args, credstore): + if args.cmd_type == 'auto': + credstore.command_interface.detect_shell_mode() + elif args.cmd_type == 'shell': + credstore.command_interface.set_shell_mode(True) + credstore.command_interface.enable_error_codes() + exec_cmd(args, credstore) + +def run(argv=sys.argv): + args = parse_args(argv[1:]) + comms = None + + if args.debug: + logging.basicConfig(level='DEBUG') + else: + logging.basicConfig(level='ERROR') + + # Use inquirer to find the device + if args.dev == 'auto': + comms = Comms(list_all=True, baudrate=args.baudrate, timeout=args.timeout) + elif args.dev == 'rtt': + comms = Comms(rtt=True, baudrate=args.baudrate, timeout=args.timeout) + # If dev is just numbers, assume it's an rtt device + elif args.dev.isdigit(): + comms = Comms(rtt=True, serial=int(args.dev), timeout=args.timeout) + # Otherwise, assume it's a serial device + else: + comms = Comms(port=args.dev, baudrate=args.baudrate, timeout=args.timeout) + + cred_if = ATCommandInterface(comms) + + main(args, CredStore(cred_if)) diff --git a/src/nrfcredstore/command_interface.py b/src/nrfcredstore/command_interface.py new file mode 100644 index 0000000..3f69688 --- /dev/null +++ b/src/nrfcredstore/command_interface.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2025 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: BSD-3-Clause + +from enum import Enum +from abc import ABC, abstractmethod +import math +import time +from nrfcredstore.comms import Comms +import base64 +import hashlib +import coloredlogs, logging +import re +from typing import List, Tuple, Optional + +logger = logging.getLogger(__name__) + +IMEI_LEN = 15 + +class CredentialCommandInterface(ABC): + def __init__(self, comms: Comms): + """Initialize a Credentials Command Interface + + Args: + comms: Comms object to use for serial communication. + """ + self.comms = comms + + def write_raw(self, command: str): + """Write a raw line directly to the serial interface.""" + self.comms.write_line(command) + + @abstractmethod + def write_credential(self, sectag: int, cred_type: int, cred_text: str) -> bool: + """Write a credential string to the command interface""" + return False + + @abstractmethod + def delete_credential(self, sectag: int, cred_type: int) -> bool: + """Delete a credential using command interface""" + return False + + @abstractmethod + def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True) -> tuple[bool, Optional[str]]: + """Verify that a credential is installed. If check_hash is true, retrieve the SHA hash.""" + return False, None + + @abstractmethod + def calculate_expected_hash(self, cred_text: str) -> str: + """Returns the expected digest/hash for a given credential as a string""" + return "" + + @abstractmethod + def get_csr(self, sectag: int, attributes: str) -> Optional[str]: + """Generate a private/public keypair and a corresponding Certificate Signing Request. + + Returns: + CSR blob in modem specific body.cose format. + """ + return + + @abstractmethod + def go_offline(self) -> bool: + """Tell the device to go offline so that credentials can be modified""" + return False + + @abstractmethod + def get_imei(self) -> Optional[str]: + """Get device IMEI, if applicable""" + return + + @abstractmethod + def get_mfw_version(self) -> Optional[str]: + """Get modem firmware version, if applicable""" + return + +class ATCommandInterface(CredentialCommandInterface): + shell = False + + def _parse_sha(self, cmng_result_str: str): + # Example AT%CMNG response: + # %CMNG: 123,0,"2C43952EE9E000FF2ACC4E2ED0897C0A72AD5FA72C3D934E81741CBD54F05BD1" + # The first item in " is the SHA. + try: + return cmng_result_str.split('"')[1] + except (ValueError, IndexError): + logger.error(f'Could not parse credential hash: {cmng_result_str}') + return None + + def set_shell_mode(self, shell: bool): + self.shell = shell + + def detect_shell_mode(self): + """Detect if the device is in shell mode or not.""" + for cmd, shell_mode in [("at AT+CGSN", True), ("AT+CGSN", False)]: + for _ in range(3): + self.write_raw(cmd) + result, output = self.comms.expect_response("OK", "ERROR", "", suppress_errors=True, timeout=1) + if result and len(re.findall("[0-9]{15}", output)) > 0: + self.set_shell_mode(shell_mode) + return + raise TimeoutError("Failed to detect shell mode. Device does not respond to AT commands.") + + def enable_error_codes(self): + """Enable error codes in the AT client""" + if not self.at_command('AT+CMEE=1', wait_for_result=True): + logger.error("Failed to enable error codes.") + + def at_command(self, at_command: str, wait_for_result=False, suppress_errors=False): + """Write an AT command to the command interface. Optionally wait for OK""" + + if self.shell: + # Transform line endings to match shell expectations + at_command = at_command.replace("\r", "") + at_command = at_command.replace("\n", "\\n") + self.write_raw("at '" + at_command + "'") + else: + self.write_raw(at_command) + + if wait_for_result: + result, _ = self.comms.expect_response("OK", "ERROR", suppress_errors=suppress_errors) + return result + else: + return True + + def write_credential(self, sectag: int, cred_type: int, cred_text: str): + return self.at_command(f'AT%CMNG=0,{sectag},{cred_type},"{cred_text}"', wait_for_result=True) + + def delete_credential(self, sectag: int, cred_type: int): + return self.at_command(f'AT%CMNG=3,{sectag},{cred_type}', wait_for_result=True) + + def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True): + self.at_command(f'AT%CMNG=1,{sectag},{cred_type}') + retval, res = self.comms.expect_response("OK", "ERROR", "%CMNG") + if retval and res: + if not get_hash: + return True, None + else: + return True, self._parse_sha(res) + + return False, None + + def calculate_expected_hash(self, cred_text: str): + # AT Command host returns hex of SHA256 hash of credential plaintext + return hashlib.sha256(cred_text.encode('utf-8')).hexdigest().upper() + + def go_offline(self): + return self.at_command('AT+CFUN=4', wait_for_result=True) + + def get_imei(self): + self.at_command('AT+CGSN') + retval, output = self.comms.expect_response("OK", "ERROR", "") + if not retval: + return None + return output[:IMEI_LEN] + + def get_model_id(self): + self.at_command('AT+CGMM') + retval, output = self.comms.expect_response("OK", "ERROR", "") + if not retval: + return None + return output + + def get_mfw_version(self): + self.at_command('AT+CGMR') + retval, output = self.comms.expect_response("OK", "ERROR", "") + if not retval: + return None + return output + + def get_attestation_token(self): + self.at_command('AT%ATTESTTOKEN') + retval, output = self.comms.expect_response("OK", "ERROR", "%ATTESTTOKEN:") + if not retval: + return None + attest_tok = output.split('"')[1] + return attest_tok + + def get_csr(self, sectag=0, attributes=""): + if attributes: + self.at_command(f'AT%KEYGEN={sectag},2,0,"{attributes}"') + else: + self.at_command(f'AT%KEYGEN={sectag},2,0') + + retval, output = self.comms.expect_response("OK", "ERROR", "%KEYGEN:") + + if not retval: + return None + + # Convert the encoded blob to an actual cert + csr_blob = str(output).split('"')[1] + logger.debug('CSR blob: {}'.format(csr_blob)) + + # Format is "body.cose" + # body is base64 encoded DER + # cose is base64 encoded COSE header (CBOR) + + return csr_blob + +TLS_CRED_TYPES = ["CA", "SERV", "PK"] +# This chunk size can be any multiple of 4, as long as it is small enough to fit within the +# Zephyr shell buffer. +TLS_CRED_CHUNK_SIZE = 48 + +class TLSCredShellInterface(CredentialCommandInterface): + def write_credential(self, sectag, cred_type, cred_text): + # Because the Zephyr shell does not support multi-line commands, + # we must base-64 encode our PEM strings and install them as if they were binary. + # Yes, this does mean we are base-64 encoding a string which is already mostly base-64. + # We could alternatively strip the ===== BEGIN/END XXXX ===== header/footer, and then pass + # everything else directly as a binary payload (using BIN mode instead of BINT, since + # MBedTLS uses the NULL terminator to determine if the credential is raw DER, or is a + # PEM string). But this will fail for multi-CA installs, such as CoAP. + + # text -> bytes -> base64 bytes -> base64 text + encoded = base64.b64encode(cred_text.encode()).decode() + + # Clear credential buffer -- If it is already clear, there may not be text feedback + self.write_raw("cred buf clear") + + # Write the encoded credential in chunks + chunks = math.ceil(len(encoded)/TLS_CRED_CHUNK_SIZE) + for c in range(chunks): + chunk = encoded[c*TLS_CRED_CHUNK_SIZE:(c+1)*TLS_CRED_CHUNK_SIZE] + self.write_raw(f"cred buf {chunk}") + self.comms.expect_response("Stored") + + # Store the buffered credential + self.write_raw(f"cred add {sectag} {TLS_CRED_TYPES[cred_type]} DEFAULT bint") + result, _ = self.comms.expect_response("Added TLS credential") + time.sleep(1) + return result + + def delete_credential(self, sectag: int, cred_type: int): + self.write_raw(f'cred del {sectag} {TLS_CRED_TYPES[cred_type]}') + result, _ = self.comms.expect_response("Deleted TLS credential", "There is no TLS credential") + time.sleep(2) + return result + + def check_credential_exists(self, sectag: int, cred_type: int, get_hash=True): + self.write_raw(f'cred list {sectag} {TLS_CRED_TYPES[cred_type]}') + + # This will capture the list dump for the credential if it exists. + result, output = self.comms.expect_response("1 credentials found.", + "0 credentials found.", + f"{sectag},{TLS_CRED_TYPES[cred_type]}") + + if not output: + return False, None + + if not get_hash: + return True, None + + # Output is a comma separated list of positional items + data = output.split(",") + hash = data[2].strip() + status_code = data[3].strip() + + if (status_code != "0"): + logger.error(f"Error retrieving credential hash: {output.strip()}.") + logger.error("Device might not support credential digests.") + return True, None + + return True, hash + + def calculate_expected_hash(self, cred_text: str): + # TLS Credentials shell returns base-64 of SHA256 hash of full credential, including NULL + # termination. + hash = hashlib.sha256(cred_text.encode('utf-8') + b'\x00') + return base64.b64encode(hash.digest()).decode() + + def get_csr(self, sectag=0, attributes=""): + raise RuntimeError("The TLS Credentials Shell does not support CSR generation") + + def go_offline(self): + # TLS credentials shell has no concept of online/offline. Just no-op. + return True + + def get_imei(self): + raise RuntimeError("The TLS Credentials Shell does not support IMEI extraction") + + def get_mfw_version(self): + raise RuntimeError("The TLS Credentials Shell does not support MFW version extraction") diff --git a/src/nrfcredstore/comms.py b/src/nrfcredstore/comms.py new file mode 100644 index 0000000..2226fcd --- /dev/null +++ b/src/nrfcredstore/comms.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python3 +# +# Copyright (c) 2025 Nordic Semiconductor ASA +# +# SPDX-License-Identifier: BSD-3-Clause + +# comms module for nrfcloud-utils +# This module handles finding and selecting a device and doing serial comms with it. +# Both serial and RTT backends are supported. + +from serial.tools import list_ports +from serial.tools.list_ports_common import ListPortInfo +import serial +from collections import defaultdict +import sys +import time +import atexit +import inquirer +from pynrfjprog import LowLevel +import coloredlogs, logging +import re +import platform +from typing import Tuple, List, Union, Optional + +logger = logging.getLogger(__name__) + +CMD_TERM_DICT = {'NULL': '\0', + 'CR': '\r', + 'LF': '\n', + 'CRLF': '\r\n'} + +CMD_TYPE_AT = "at" +CMD_TYPE_AT_SHELL = "at_shell" +CMD_TYPE_TLS_SHELL = "tls_cred_shell" +CMD_TYPE_AUTO = "auto" + +ansi_escape = re.compile(r'\x1B\[[0-?]*[ -/]*[@-~]') + +usb_patterns = [ + (r"THINGY91X", "Thingy:91 X", 0), + (r"THINGY91", "Thingy:91", 0), + (r"PCA20035", "Thingy:91", 0), + (r"0009600", "nRF9160-DK", 0), + (r"0009601", "nRF5340-DK", 1), + (r"0010500", "nRF5340-DK", 1), + (r"0010507", "nRF7002-DK", 1), + (r"0010509", "nRF9161-DK", 0), + (r"0010510", "nRF9131-EK", 0), + (r"0010511", "nRF54H20-DK", 0), + (r"0010512", "nRF9151-DK", 0), + (r"0010550", "Thingy:91 X", 0), + (r"0010551", "Thingy:91 X", 0), + (r"0010513", "Thingy:91 X", 0), + (r"0010577", "nRF54L15-DK", 1), + (r"00105", "Unknown Nordic Kit", 0), + (r"NRFBLEGW", "nRF Cloud Gateway", 0), +] + +ERR_CODE_TO_MSG = { + 0: 'AT command not supported by firmware version. Upgrade modem firmware?', + 513: 'Not found', + 514: 'Not allowed', + 515: 'Memory full', + 518: 'Not allowed in active state', + 519: 'Already exists', + 523: 'Key generation failed', +} + +# HWIDs look different on different platforms: +# Linux: 'USB VID:PID=1366:1059 SER=001051216197 LOCATION=3-12.1.3.2.1.4:1.0' +# MacOS: 'USB VID:PID=1366:1059 SER=001051246141 LOCATION=0-1.4.2.3' +# Windows: [(1, 'USB VID:PID=1366:1059 SER=001057731013'), (2, 'USB VID:PID=1366:1059 SER=001057731013 LOCATION=1-21:x.2')] + + +# Returns a list of printable name, serial number and serial port for connected Nordic boards +def get_connected_nordic_boards() -> List[Tuple[str, Union[str, int], ListPortInfo]]: + if platform.system() == 'Darwin': + ports = sorted(list_ports.comports(), key=lambda x: x.device) + else: + ports = sorted(list_ports.comports(), key=lambda x: x.hwid) + nordic_boards = defaultdict(list) + for port in ports: + # Get serial number from hwid, because port.serial_number is not always available + serial = extract_serial_number_from_serial_device(port) + nordic_boards[serial].append(port) + main_ports = [] + for serial, ports in nordic_boards.items(): + for pattern, name, main_port in usb_patterns: + if f"SER={pattern}" in ports[0].hwid: + main_ports.append((name, serial, ports[main_port])) + break + return main_ports + +# Returns a list of SEGGER J-Link serial numbers as int +def get_connected_jlinks() -> List[int]: + with LowLevel.API(LowLevel.DeviceFamily.UNKNOWN) as api: + return api.enum_emu_snr() or [] + +# For a serial device, return the serial number +def extract_serial_number_from_serial_device(dev: ListPortInfo) -> Union[str, int, None]: + hwid = dev.hwid + # Get serial number from hwid, because port.serial_number is not always available + serial = [x[4:] for x in hwid.split(" ") if x.startswith("SER=")] + if len(serial) == 0: + return None + serial = serial[0] + + if serial.isnumeric(): + return int(serial) + + return serial + +def extract_product_name_from_serial_device(dev: ListPortInfo) -> str: + for pattern, name, main_port in usb_patterns: + if f"SER={pattern}" in dev.hwid: + return name + for text in dev.hwid.split(" "): + if text.startswith("VID:PID="): + return text + return '' + +def extract_product_name_from_jlink_serial(serial : int) -> str: + serial_str = f"{serial:012}" + for pattern, name, main_port in usb_patterns: + if pattern in serial_str: + return name + return '' + +# Find the main port for a device if it's a Nordic board +def get_port_index(dev: ListPortInfo) -> Optional[int]: + for pattern, name, main_port in usb_patterns: + if f"SER={pattern}" in dev.hwid: + return main_port + return None + +def select_jlink(jlinks : List[int], list_all: bool) -> int: + if len(jlinks) == 0: + raise Exception("No J-Link device found") + if len(jlinks) == 1: + return jlinks[0] + if list_all: + question = inquirer.List( + "serial", + message="Select a J-Link device", + choices=[(f"{serial} {extract_product_name_from_jlink_serial(serial)}", serial) for serial in jlinks], + ) + else: + nordic_boards = get_connected_nordic_boards() + serial_numbers = [serial for _, serial, _ in nordic_boards if serial in jlinks] + if len(serial_numbers) == 0: + raise Exception("No J-Link device found") + if len(serial_numbers) == 1: + return serial_numbers[0] + else: + question = inquirer.List( + "serial", + message="Select a J-Link device", + choices=[(f"{serial} {extract_product_name_from_jlink_serial(serial)}", serial) for serial in serial_numbers], + ) + answer = inquirer.prompt([question]) + if not answer: + raise Exception("No J-Link device selected") + return answer["serial"] + + +def select_device_by_serial(serial_number : Union[str, int], list_all : bool) -> Tuple[ListPortInfo, Union[str, int]]: + serial_devices = [ + x + for x in list_ports.comports() + if extract_serial_number_from_serial_device(x) == serial_number + ] + if len(serial_devices) == 0: + raise Exception(f"No device found with serial {serial_number}") + if len(serial_devices) == 1: + return (serial_devices[0], serial_number) + + if not list_all: + port_index = get_port_index(serial_devices[0]) + if port_index is not None: + # can return early if we can guess the right port + return (serial_devices[port_index], serial_number) + question = inquirer.List( + "port", + message="Select a serial port", + choices=[(port.device, port) for port in serial_devices], + ) + answer = inquirer.prompt([question]) + if not answer: + raise Exception("No serial port selected") + selected_port = answer["port"] + return (selected_port, serial_number) + +# Returns serial_port, serial_number of selected device +def select_device(rtt : bool, serial_number : Optional[Union[str, int]], port : Optional[ListPortInfo], list_all : bool) -> Tuple[Optional[ListPortInfo], Optional[Union[str, int]]]: + if type(serial_number) == str and serial_number.isdigit(): + serial_number = int(serial_number) + + if rtt: + # RTT requires a J-Link device + jlinks = get_connected_jlinks() + if serial_number: + if serial_number in jlinks: + return (None, serial_number) + else: + raise Exception(f"No device found with serial {serial_number}") + return (None, select_jlink(jlinks, list_all)) + + if port: + # Serial ports are unique, so we just check if it exists and try to get a serial number + serial_devices = [x for x in list_ports.comports() if x.device == port] + if len(serial_devices) == 0: + raise Exception(f"No device found with port {port}") + extracted_serial_number = extract_serial_number_from_serial_device( + serial_devices[0] + ) + if serial_number and extracted_serial_number != serial_number: + logger.warning( + f"Given Serial number {serial_number} does not match device serial number {extracted_serial_number}" + ) + return (serial_devices[0], extracted_serial_number) + + if serial_number: + # Often, there are multiple serial ports for a device, so we need to find the right one + return select_device_by_serial(serial_number, list_all) + + if list_all: + # Show all ports, no filtering + ports = list_ports.comports() + question = inquirer.List( + "port", + message="Select a serial port", + choices=sorted([(f"{port.device} {extract_product_name_from_serial_device(port)}", port) for port in ports]), + ) + answer = inquirer.prompt([question]) + if not answer: + raise Exception("No serial port selected") + selected_port = answer["port"] + extracted_serial_number = extract_serial_number_from_serial_device( + selected_port + ) + return (selected_port, extracted_serial_number) + + # Select from connected Nordic boards + nordic_boards = get_connected_nordic_boards() + if len(nordic_boards) == 0: + raise Exception("No device found") + if len(nordic_boards) == 1: + name, serial, port = nordic_boards[0] + return (port, serial) + question = inquirer.List( + "port", + message="Select a serial port", + choices=[(f"{port.device} {name}", port) for name, serial, port in nordic_boards], + ) + answer = inquirer.prompt([question]) + if not answer: + raise Exception("No serial port selected") + selected_port = answer["port"] + extracted_serial_number = extract_serial_number_from_serial_device(selected_port) + return (selected_port, extracted_serial_number) +class Comms: + def __init__( + self, + port=None, + serial=None, + baudrate=115200, + xonxoff=False, + rtscts=True, + dsrdtr=False, + timeout=1, + line_ending="\r\n", + rtt=False, + list_all=False, + ): + self.timeout = timeout + self.jlink_api = None + self.serial_api = None + self.write = None + self.read_line = None + self.line_ending = line_ending + self._rtt_line_buffer = '' + + serial_port, self.serial_number = select_device(rtt, serial, port, list_all) + + if rtt: + self._init_rtt() + else: + self._init_serial(serial_port, baudrate, xonxoff, rtscts, dsrdtr) + + atexit.register(self.close) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def close(self): + if self.jlink_api: + self.jlink_api.close() + self.jlink_api = None + if self.serial_api: + self.serial_api.close() + self.serial_api = None + + def expect_response(self, ok_str=None, error_str=None, store_str=None, timeout=15, suppress_errors=False): + ''' + Read lines until either ok_str or error_str is found or timeout (seconds) is reached. + If store_str is in one of the lines, it will be returned as the output. + + return tuple of (ok_or_error, output) + ''' + output = '' + time_end = time.time() + timeout + while time.time() < time_end: + line = self.read_line() # type: ignore + if line: + line = line.strip() + # Remove ANSI escape codes + line = ansi_escape.sub('', line) + if ok_str and ok_str == line: + return (True, output) + if error_str and error_str == line: + return (False, output) + if line.startswith('+CME ERROR'): + code = int(line.replace('+CME ERROR: ', '')) + if not suppress_errors: + logging.error(f'AT command error: {ERR_CODE_TO_MSG.get(code, "Unknown error")}') + return (False, output) + if (store_str is not None) and store_str in line: + output += line + '\r\n' + time.sleep(0.1) + return (False, output) + + def reset_device(self): + if self.jlink_api: + self.close() + self._init_rtt() + else: + logger.error("Cannot reset device, not using RTT") + + def write_line(self, data : str): + logger.debug(f"> {data}") + self.write((data + self.line_ending).encode('ascii')) # type: ignore + + def _readline_rtt(self) -> Optional[str]: + time_end = time.time() + self.timeout + while time.time() < time_end: + self._rtt_line_buffer += self.jlink_api.rtt_read(channel_index=0, length=4096) # type: ignore + # Find first line ending + line_end = self._rtt_line_buffer.find(self.line_ending) + if line_end != -1: + # Split the line from the buffer + line = self._rtt_line_buffer[:line_end] + self._rtt_line_buffer = self._rtt_line_buffer[line_end + len(self.line_ending) :] + logger.debug(f"< {line}") + return line + time.sleep(0.1) + return None + + def _readline_serial(self) -> Optional[str]: + # Read a line from the serial port + line = self.serial_api.readline() # type: ignore + if line: + line = line.decode('utf-8', errors="replace").strip() + logger.debug(f"< {line}") + return line + return None + + def _write_rtt(self, data: bytes): + # Hacky workaround from old rtt_interface + for i in range(0, len(data), 12): + self.jlink_api.rtt_write(channel_index=0, msg=data[i : i + 12]) # type: ignore + time.sleep(0.01) + + def _write_serial(self, data: bytes): + self.serial_api.write(data) # type: ignore + + def _reset_input_buffer_rtt(self): + # RTT does not have an input buffer to reset, but we can clear the line buffer + self._rtt_line_buffer = '' + + def _init_rtt(self): + self.jlink_api = LowLevel.API(LowLevel.DeviceFamily.UNKNOWN) + self.jlink_api.open() + self.jlink_api.connect_to_emu_with_snr(self.serial_number) + self.jlink_api.select_family(self.jlink_api.read_device_family()) + self.jlink_api.sys_reset() + self.jlink_api.go() + self.jlink_api.rtt_start() + for _ in range(5): + if self.jlink_api.rtt_is_control_block_found(): + break + time.sleep(0.5) + self.write = self._write_rtt + self.read_line = self._readline_rtt + self.reset_input_buffer = self._reset_input_buffer_rtt + + def _init_serial(self, serial_port, baudrate, xonxoff, rtscts, dsrdtr): + self.serial_api = serial.Serial( + port=serial_port.device, + baudrate=baudrate, + timeout=self.timeout, + xonxoff=xonxoff, + rtscts=rtscts, + dsrdtr=dsrdtr, + ) + # Initialize the serial port, clear the buffers + self.serial_api.reset_output_buffer() + self.serial_api.write(self.line_ending.encode('ascii')) + self.serial_api.flush() + time.sleep(0.2) + self.serial_api.reset_input_buffer() + self.write = self._write_serial + self.read_line = self._readline_serial + self.reset_input_buffer = self.serial_api.reset_input_buffer diff --git a/src/nrfcredstore/credstore.py b/src/nrfcredstore/credstore.py index f7a4e8c..5b7f3b3 100644 --- a/src/nrfcredstore/credstore.py +++ b/src/nrfcredstore/credstore.py @@ -27,8 +27,8 @@ def __init__(self, tag: int, type: int, sha: str): self.sha = sha class CredStore: - def __init__(self, at_client): - self.at_client = at_client + def __init__(self, command_interface): + self.command_interface = command_interface def func_mode(self, mode): """Set modem functioning mode @@ -36,7 +36,7 @@ def func_mode(self, mode): See AT Command Reference Guide for valid modes. """ - return self.at_client.at_command(f'AT+CFUN={mode}') == [] + return self.command_interface.at_command(f'AT+CFUN={mode}', wait_for_result=True) def list(self, tag = None, type: CredType = CredType.ANY) -> List[Credential]: """List stored credentials @@ -49,16 +49,20 @@ def list(self, tag = None, type: CredType = CredType.ANY) -> List[Credential]: if tag is None and type != CredType.ANY: raise RuntimeError('Cannot list with type without a tag') - # optional secure tag + # Optional secure tag if tag is not None: cmd = f'{cmd},{tag}' - # optional key type + # Optional key type if type != CredType.ANY: cmd = f'{cmd},{CredType(type).value}' - response_lines = self.at_client.at_command(cmd) - response_lines = [line for line in response_lines if line.strip()] + self.command_interface.at_command(cmd, wait_for_result=False) + result, response = self.command_interface.comms.expect_response("OK", "ERROR", "%CMNG: ") + if not result: + raise RuntimeError("Failed to list credentials") + response_lines = response.splitlines() + response_lines = [line.strip() for line in response_lines if line.strip()] columns = map(lambda line: line.replace('%CMNG: ', '').replace('"', '').split(','), response_lines) cred_map = map(lambda columns: @@ -77,7 +81,8 @@ def write(self, tag: int, type: CredType, file: io.TextIOBase): if type == CredType.ANY: raise ValueError cert = file.read().rstrip() - return self.at_client.at_command(f'AT%CMNG=0,{tag},{type.value},"{cert}"') + if not self.command_interface.at_command(f'AT%CMNG=0,{tag},{type.value},"{cert}"', wait_for_result=True): + raise RuntimeError("Failed to write credential") def delete(self, tag: int, type: CredType): """Delete a credential from the modem @@ -87,22 +92,19 @@ def delete(self, tag: int, type: CredType): if type == CredType.ANY: raise ValueError - return self.at_client.at_command(f'AT%CMNG=3,{tag},{type.value}') + if not self.command_interface.at_command(f'AT%CMNG=3,{tag},{type.value}', wait_for_result=True): + raise RuntimeError("Failed to delete credential") def keygen(self, tag: int, file: io.BufferedIOBase, attributes: str = ''): """Generate a new private key and return a certificate signing request in DER format""" - cmd = f'AT%KEYGEN={tag},2,0' - if attributes != '': - cmd = f'{cmd},"{attributes}"' + keygen_output = self.command_interface.get_csr(sectag=tag, attributes=attributes) - response_lines = self.at_client.at_command(cmd) - for l in response_lines: - if not l.startswith('%KEYGEN'): - continue - keygen_output = l.replace('%KEYGEN: "', '') - csr_der_b64 = keygen_output.split('.')[0] - csr_der_bytes = base64.urlsafe_b64decode(csr_der_b64 + '===') - file.write(csr_der_bytes) + if not keygen_output: + raise RuntimeError("Failed to generate key") + csr_der_b64 = keygen_output.split('.')[0] + csr_der_bytes = base64.urlsafe_b64decode(csr_der_b64 + '===') + + file.write(csr_der_bytes) file.close() diff --git a/tests/test_at_client.py b/tests/test_at_client.py deleted file mode 100644 index 77a5139..0000000 --- a/tests/test_at_client.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Test for cert_manager.py -""" - -import pytest - -from unittest.mock import Mock -from nrfcredstore.at_client import ATClient -from nrfcredstore.exceptions import ATCommandError, NoATClientException - -def encode_cmd(cmd): - return f'{cmd}\r\n'.encode('utf-8') - -def response_lines(lines): - return map(encode_cmd, lines) - -# pylint: disable=no-self-use -class TestATClient: - - @pytest.fixture - def at_client(self): - self.serial = Mock() - return ATClient(self.serial) - - @pytest.fixture - def ok_resp(self, at_client): - self.serial.readline.return_value = 'OK\r\n'.encode('utf-8') - - @pytest.fixture - def not_at_client_resp(self, at_client): - self.serial.readline.side_effect = [b'AT: command not found', b''] - - @pytest.fixture - def error_resp(self, at_client): - self.serial.readline.return_value = 'ERROR\r\n'.encode('utf-8') - - @pytest.fixture - def error_code_resp(self, at_client): - self.serial.readline.return_value = '+CME ERROR: 514\r\n'.encode('utf-8') - - @pytest.fixture - def error_code_0_resp(self, at_client): - self.serial.readline.return_value = '+CME ERROR: 0\r\n'.encode('utf-8') - - @pytest.fixture - def error_ok_resp(self, at_client): - self.serial.readline.side_effect = [b'ERROR', b'OK'] - - def test_create_without_serial_device(self): - with pytest.raises(RuntimeError): - ATClient(None) - - def test_at_command_with_serial_closed(self, at_client): - with pytest.raises(ConnectionError): - self.serial.is_open = False - at_client.at_command('AT') - - def test_verify_with_serial_closed(self, at_client): - with pytest.raises(ConnectionError): - self.serial.is_open = False - at_client.verify() - - def test_exposes_serial_device(self, at_client): - assert at_client.device is self.serial - - def test_connect_sets_port(self, at_client): - at_client.connect('/dev/tty.usb') - assert self.serial.port == '/dev/tty.usb' - - def test_connect_sets_baudrate(self, at_client): - at_client.connect('foo', 123) - assert self.serial.baudrate == 123 - - def test_connect_sets_timeout(self, at_client): - at_client.connect('foo', timeout=3) - assert self.serial.timeout == 3 - - def test_connect_opens_serial_connection_to_device(self, at_client): - at_client.connect('foo') - self.serial.open.assert_called() - - def test_verify_fails_for_wrong_response(self, at_client, not_at_client_resp): - """Test that the AT client verifiation raises NoATClientException when readline returns - unexpected lines and an empty string (timeout). - """ - with pytest.raises(NoATClientException): - at_client.verify() - - def test_verify_sends_at_command(self, at_client, ok_resp): - at_client.verify() - self.serial.write.assert_called_with('AT\r\n'.encode('utf-8')) - - def test_verify_fails_on_error(self, at_client, error_resp): - with pytest.raises(NoATClientException): - at_client.verify() - - def test_verify_retries_on_first_error(self, at_client, error_ok_resp): - assert at_client.verify() - - def test_enable_error_codes_sends_cmd(self, at_client, ok_resp): - at_client.enable_error_codes() - self.serial.write.assert_called_with(encode_cmd('AT+CMEE=1')) - - def test_at_command_error(self, at_client, error_resp): - with pytest.raises(ATCommandError): - at_client.at_command('AT') - - def test_at_command_error_code(self, at_client, error_code_resp): - with pytest.raises(ATCommandError): - at_client.at_command('AT') - - def test_at_command_error_code_maps_to_message(self, at_client, error_code_resp): - with pytest.raises(ATCommandError) as excinfo: - at_client.at_command('AT') - assert 'Not allowed' in str(excinfo.value) - - def test_unsupported_cmd_error_code(self, at_client, error_code_0_resp): - with pytest.raises(ATCommandError) as excinfo: - at_client.at_command('AT') - assert 'AT command not supported by firmware version' in str(excinfo.value) - - def test_at_command_with_single_line_response(self, at_client): - self.serial.readline.side_effect = [b'single', b'OK'] - assert at_client.at_command('AT+CGSN') == ['single'] - - def test_at_command_with_multi_line_response(self, at_client): - self.serial.readline.side_effect = [b'foo', b'bar', b'OK'] - assert at_client.at_command('AT+CGSN') == ['foo', 'bar'] diff --git a/tests/test_cli.py b/tests/test_cli.py index ae8cb59..1550149 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,7 +2,7 @@ from unittest.mock import Mock, ANY, patch from serial import SerialException -from nrfcredstore.cli import main +from nrfcredstore.cli import main, parse_args, run, FUN_MODE_OFFLINE from nrfcredstore.credstore import CredType from nrfcredstore.exceptions import NoATClientException, ATCommandError @@ -11,138 +11,137 @@ class TestCli(): @pytest.fixture - def at_client(self): - at_client = Mock() - return at_client + def command_interface(self): + command_interface = Mock() + return command_interface @pytest.fixture - def credstore(self, at_client): + def credstore(self, command_interface): credstore = Mock() - credstore.at_client = at_client + credstore.command_interface = command_interface return credstore - @pytest.fixture - def offline(self, credstore): - credstore.funk_mode.return_value = True - @pytest.fixture def empty_cred_list(self, credstore): credstore.list.return_value = [] - def test_device_passed_to_connect(self, credstore, at_client, offline, empty_cred_list): - main(['/dev/tty.usb', 'list'], credstore) - at_client.connect.assert_called_with('/dev/tty.usb', ANY, ANY) - - def test_baudrate_passed_to_connect(self, credstore, at_client, offline, empty_cred_list): - main(['fakedev', '--baudrate', '9600', 'list'], credstore) - at_client.connect.assert_called_with(ANY, 9600, ANY) - - def test_timeout_passed_to_connect(self, credstore, at_client, offline, empty_cred_list): - main(['fakedev', '--timeout', '3', 'list'], credstore) - at_client.connect.assert_called_with(ANY, ANY, 3) - - def test_at_client_verify(self, credstore, at_client, offline, empty_cred_list): - main(['fakedev', 'list'], credstore) - at_client.verify.assert_called() - - @patch('builtins.print') - def test_at_client_verify_fail(self, mock_print, credstore, at_client): - at_client.verify.side_effect = NoATClientException() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert 'does not respond to AT commands' in mock_print.call_args[0][0] - - def test_at_client_enable_error_codes(self, credstore, at_client, offline, empty_cred_list): - main(['fakedev', 'list'], credstore) - at_client.enable_error_codes.assert_called() + @pytest.fixture + def cred_list_minimal(self, credstore): + credstore.list.return_value = [ + Mock(tag=4294967292, type=CredType.NORDIC_PUB_KEY, sha='672E2F05962B4EFBFA8801255D87E0E0418F2DDF4DDAEFC59E9B4162F512CB63'), + Mock(tag=4294967293, type=CredType.NORDIC_ID_ROOT_CA, sha='2C43952EE9E000FF2ACC4E2ED0897C0A72AD5FA72C3D934E81741CBD54F05BD1'), + Mock(tag=4294967294, type=CredType.DEV_ID_PUB_KEY, sha='A0C145630DB69B4ED933DDE9F3E77BCD5540A869461DBC82D6F554EA64B6AC9E'), + ] + + # non-responsive device + def test_non_responsive_device(self, credstore, command_interface): + command_interface.detect_shell_mode.side_effect = TimeoutError() + with pytest.raises(TimeoutError) as e: + main(parse_args(['fakedev', 'list']), credstore) + assert e.type == TimeoutError + + def test_non_responsive_device(self, credstore, command_interface, empty_cred_list): + command_interface.detect_shell_mode.enable_error_codes.return_value = False + main(parse_args(['fakedev', 'list']), credstore) + + def test_list_default_empty(self, credstore, empty_cred_list): + main(parse_args(['fakedev', 'list']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) + credstore.list.assert_called_with(None, CredType.ANY) - def test_list_default(self, credstore, offline, empty_cred_list): - main(['fakedev', 'list'], credstore) + def test_list_default(self, credstore, cred_list_minimal): + main(parse_args(['fakedev', 'list']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.list.assert_called_with(None, CredType.ANY) - def test_list_with_tag(self, credstore, offline, empty_cred_list): - main(['fakedev', 'list', '--tag', '123'], credstore) + def test_list_with_tag(self, credstore, empty_cred_list): + main(parse_args(['fakedev', 'list', '--tag', '123']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.list.assert_called_with(123, ANY) - def test_list_with_type(self, credstore, offline, empty_cred_list): - main(['fakedev', 'list', '--tag', '123', '--type', 'CLIENT_KEY'], credstore) + def test_list_with_type(self, credstore, empty_cred_list): + main(parse_args(['fakedev', 'list', '--tag', '123', '--type', 'CLIENT_KEY']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.list.assert_called_with(ANY, CredType.CLIENT_KEY) - def test_write_tag_and_type(self, credstore, offline): + def test_write_tag_and_type(self, credstore): credstore.write.return_value = True - main(['fakedev', 'write', '123', 'ROOT_CA_CERT', 'tests/fixtures/root-ca.pem'], credstore) + main(parse_args(['fakedev', 'write', '123', 'ROOT_CA_CERT', 'tests/fixtures/root-ca.pem']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.write.assert_called_with(123, CredType.ROOT_CA_CERT, ANY) @patch('builtins.open') - def test_write_file(self, mock_file, credstore, offline): + def test_write_file(self, mock_file, credstore): credstore.write.return_value = True - main(['fakedev', 'write', '123', 'ROOT_CA_CERT', 'foo.pem'], credstore) + main(parse_args(['fakedev', 'write', '123', 'ROOT_CA_CERT', 'foo.pem']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) mock_file.assert_called_with('foo.pem', 'r', ANY, ANY, ANY) @patch('builtins.open') - def test_write_psk_file(self, mock_file, credstore, offline): + def test_write_psk_file(self, mock_file, credstore): credstore.write.return_value = True - main(['fakedev', 'write', '123', 'PSK', 'foo.psk'], credstore) + main(parse_args(['fakedev', 'write', '123', 'PSK', 'foo.psk']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) mock_file.assert_called_with('foo.psk', 'r', ANY, ANY, ANY) - def test_delete(self, credstore, offline): + def test_delete(self, credstore): credstore.delete.return_value = True - main(['fakedev', 'delete', '123', 'CLIENT_KEY'], credstore) + main(parse_args(['fakedev', 'delete', '123', 'CLIENT_KEY']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.delete.assert_called_with(123, CredType.CLIENT_KEY) - def test_delete_any_should_fail(self, credstore, offline): + def test_delete_any_should_fail(self, credstore): with pytest.raises(SystemExit): - main(['fakedev', 'delete', '123', 'ANY'], credstore) + main(parse_args(['fakedev', 'delete', '123', 'ANY']), credstore) + + def test_deleteall(self, credstore, cred_list_minimal): + credstore.deleteall.return_value = True + main(parse_args(['fakedev', 'deleteall']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) + credstore.list.assert_called_with(None, CredType.ANY) @patch('builtins.open') - def test_generate_tag(self, mock_file, credstore, offline): + def test_generate_tag(self, mock_file, credstore): credstore.keygen.return_value = True - main(['fakedev', 'generate', '123', 'foo.der'], credstore) + main(parse_args(['fakedev', 'generate', '123', 'foo.der']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.keygen.assert_called_with(123, ANY, ANY) @patch('builtins.open') - def test_generate_file(self, mock_file, credstore, offline): + def test_generate_file(self, mock_file, credstore): credstore.keygen.return_value = True - main(['fakedev', 'generate', '123', 'foo.der'], credstore) + main(parse_args(['fakedev', 'generate', '123', 'foo.der']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) mock_file.assert_called_with('foo.der', 'wb', ANY, ANY, ANY) @patch('builtins.open') - def test_generate_with_attributes(self, credstore, offline): + def test_generate_with_attributes(self, credstore): credstore.keygen.return_value = True - main(['fakedev', 'generate', '123', 'foo.der', '--attributes', 'CN=foo'], credstore) + main(parse_args(['fakedev', 'generate', '123', 'foo.der', '--attributes', 'CN=foo']), credstore) + credstore.func_mode.assert_called_with(FUN_MODE_OFFLINE) credstore.keygen.assert_called_with(123, ANY, 'CN=foo') - def test_no_at_client_exit_code(self, credstore, at_client): - at_client.verify.side_effect = NoATClientException() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert e.type == SystemExit - assert e.value.code == 10 - - def test_at_command_error_exit_code(self, credstore, at_client): - at_client.verify.side_effect = ATCommandError() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert e.type == SystemExit - assert e.value.code == 11 - - def test_timeout_error_exit_code(self, credstore, at_client): - at_client.verify.side_effect = TimeoutError() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert e.type == SystemExit - assert e.value.code == 12 - - def test_serial_exception_exit_code(self, credstore, at_client): - at_client.connect.side_effect = SerialException() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert e.type == SystemExit - assert e.value.code == 13 - - def test_unhandled_exception_exit_code(self, credstore, at_client): - at_client.verify.side_effect = Exception() - with pytest.raises(SystemExit) as e: - main(['fakedev', 'list'], credstore) - assert e.type == SystemExit - assert e.value.code == 1 + def test_imei(self, credstore): + credstore.command_interface.get_imei.return_value = '123456789012345' + args = parse_args(['fakedev', 'imei']) + main(args, credstore) + credstore.command_interface.get_imei.assert_called_once() + + def test_attestation_token(self, credstore): + credstore.command_interface.get_attestation_token.return_value = '2dn3hQFQUDYxVDkxRPCAIhIbZAFifQNQGv86y_GmR2SiY0wmRsHGVFDT791_BPH8YOWFiyCHND1q.0oRDoQEmoQRBIfZYQGuXwJliinHc6xDPruiyjsaXyXZbZVpUuOhHG9YS8L05VuglCcJhMN4EUhWVGpaHgNnHHno6ahi-d5tOeZmAcNY' + args = parse_args(['fakedev', 'attoken']) + main(args, credstore) + credstore.command_interface.get_attestation_token.assert_called_once() + + def test_at_command_error_exit_code(self, credstore): + credstore.func_mode.side_effect = RuntimeError("Failed to set modem to offline mode.") + with pytest.raises(RuntimeError) as e: + main(parse_args(['fakedev', 'list']), credstore) + assert e.type == RuntimeError + + def test_cannot_find_device(self): + with patch("nrfcredstore.comms.__init__", return_value=Mock()) as mock_comms: + mock_comms.side_effect = Exception("No device found") + with pytest.raises(Exception) as e: + run(['nrfcredstore', 'fakedev', 'list']) + assert e.type == Exception diff --git a/tests/test_command_interface.py b/tests/test_command_interface.py new file mode 100644 index 0000000..a983631 --- /dev/null +++ b/tests/test_command_interface.py @@ -0,0 +1,154 @@ +from unittest.mock import patch, Mock +from collections import namedtuple +import pytest + +from nrfcredstore.command_interface import CredentialCommandInterface, ATCommandInterface, TLSCredShellInterface +from nrfcredstore.credstore import CredType + +@pytest.fixture +def comms(): + """Mock comms object""" + comms = Mock() + return comms + +@pytest.fixture +def at_command_interface(comms): + """Mock command interface""" + interface = ATCommandInterface(comms) + return interface + +@pytest.fixture +def tls_cred_shell_interface(comms): + """Mock TLSCredShellInterface""" + interface = TLSCredShellInterface(comms) + return interface + +def test_write_raw_at(at_command_interface): + """Test writing raw AT commands""" + at_command_interface.comms.write_line = Mock() + at_command_interface.write_raw('AT+TEST=1') + at_command_interface.comms.write_line.assert_called_once_with('AT+TEST=1') + +def test_write_raw_tls(tls_cred_shell_interface): + """Test writing raw TLS commands""" + tls_cred_shell_interface.comms.write_line = Mock() + tls_cred_shell_interface.write_raw('AT+TEST=1') + tls_cred_shell_interface.comms.write_line.assert_called_once_with('AT+TEST=1') + +def test_write_credential_at(at_command_interface): + """Test writing a credential using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, "") + at_command_interface.write_credential(sectag=42, cred_type=CredType.CLIENT_CERT.value, cred_text='test_value') + at_command_interface.comms.write_line.assert_called_once_with('AT%CMNG=0,42,1,"test_value"') + +def test_delete_credential_at(at_command_interface): + """Test deleting a credential using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, "") + at_command_interface.delete_credential(sectag=42, cred_type=CredType.CLIENT_CERT.value) + at_command_interface.comms.write_line.assert_called_once_with('AT%CMNG=3,42,1') + +def test_check_credential_exists_at(at_command_interface): + """Test checking if a credential exists using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '%CMNG: 42,1,"8CEA57609B0F95C0D0F80383A7A21ECD1C6E102FDCC3CDCEB1948B0EA828601D"') + exists, sha = at_command_interface.check_credential_exists(sectag=42, cred_type=CredType.CLIENT_CERT.value) + assert exists is True + assert sha == '8CEA57609B0F95C0D0F80383A7A21ECD1C6E102FDCC3CDCEB1948B0EA828601D' + +def test_get_csr_at(at_command_interface): + """Test getting a CSR using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '%KEYGEN: "foo.bar"') + csr = at_command_interface.get_csr(sectag=42, attributes='O=Test,CN=Device') + assert csr == 'foo.bar' + at_command_interface.comms.write_line.assert_called_once_with('AT%KEYGEN=42,2,0,"O=Test,CN=Device"') + +def test_get_csr_no_attributes(at_command_interface): + """Test getting a CSR without attributes using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '%KEYGEN: "foo.bar"') + csr = at_command_interface.get_csr(sectag=42) + assert csr == 'foo.bar' + at_command_interface.comms.write_line.assert_called_once_with('AT%KEYGEN=42,2,0') + +def test_get_imei(at_command_interface): + """Test getting IMEI using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '123456789012345') + imei = at_command_interface.get_imei() + assert imei == '123456789012345' + at_command_interface.comms.write_line.assert_called_once_with('AT+CGSN') + +def test_go_offline(at_command_interface): + """Test going offline using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '') + at_command_interface.go_offline() + at_command_interface.comms.write_line.assert_called_once_with('AT+CFUN=4') + +def test_get_model_id(at_command_interface): + """Test getting model ID using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, 'nRF9151-LACA') + model_id = at_command_interface.get_model_id() + assert model_id == 'nRF9151-LACA' + at_command_interface.comms.write_line.assert_called_once_with('AT+CGMM') + +def test_get_mfw_version(at_command_interface): + """Test getting MFW version using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, 'mfw_nrf91x1_2.0.2') + mfw_version = at_command_interface.get_mfw_version() + assert mfw_version == 'mfw_nrf91x1_2.0.2' + at_command_interface.comms.write_line.assert_called_once_with('AT+CGMR') + +def test_get_attestation_token(at_command_interface): + """Test getting attestation token using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '%ATTESTTOKEN: "foo.bar"') + attestation_token = at_command_interface.get_attestation_token() + assert attestation_token == 'foo.bar' + at_command_interface.comms.write_line.assert_called_once_with('AT%ATTESTTOKEN') + +def test_enable_error_codes(at_command_interface): + """Test enabling error codes using ATCommandInterface""" + at_command_interface.comms.write_line = Mock() + at_command_interface.comms.expect_response.return_value = (True, '') + at_command_interface.enable_error_codes() + at_command_interface.comms.write_line.assert_called_once_with('AT+CMEE=1') + +class MockCommsAT: + """Mock comms for ATCommandInterface""" + def write_line(self, line): + self.last_written = line + + def expect_response(self, ok_str=None, error_str=None, store_str=None, timeout=15, suppress_errors=False): + if self.last_written == 'AT+CGSN': + return True, '123456789012345' + else: + return False, '' + +def test_detect_shell_mode_at(at_command_interface): + """Test detecting shell mode using ATCommandInterface (AT Host)""" + at_command_interface.comms = MockCommsAT() + at_command_interface.detect_shell_mode() + assert at_command_interface.shell == False + +class MockCommsATShell: + """Mock comms for TLSCredShellInterface""" + def write_line(self, line): + self.last_written = line + + def expect_response(self, ok_str=None, error_str=None, store_str=None, timeout=15, suppress_errors=False): + if self.last_written == 'at AT+CGSN': + return True, '123456789012345' + else: + return False, '' + +def test_detect_shell_mode_shell(at_command_interface): + """Test detecting shell mode using ATCommandInterface (AT Shell)""" + at_command_interface.comms = MockCommsATShell() + at_command_interface.detect_shell_mode() + assert at_command_interface.shell == True diff --git a/tests/test_comms.py b/tests/test_comms.py new file mode 100644 index 0000000..0021ead --- /dev/null +++ b/tests/test_comms.py @@ -0,0 +1,454 @@ +from unittest.mock import patch, Mock +from collections import namedtuple +import pytest + +from nrfcredstore.comms import ( + get_connected_nordic_boards, + select_jlink, + select_device_by_serial, + select_device, + Comms, +) + +Port = namedtuple("Port", ["hwid", "device"]) + +list_ports_mac = [ + Port("n/a", "/dev/cu.debug-console"), + Port("n/a", "/dev/cu.Bluetooth-Incoming-Port"), + Port( + "USB VID:PID=1366:1069 SER=001051202135 LOCATION=1-1.4.4", + "/dev/cu.usbmodem0010512021353", + ), + Port( + "USB VID:PID=1366:1069 SER=001051202135 LOCATION=1-1.4.4", + "/dev/cu.usbmodem0010512021351", + ), +] + +list_ports_windows = [ + Port("ACPI\\PNP0501\\1", "COM1"), + Port("USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=1-21:x.1", "COM35"), + Port("USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=1-21:x.4", "COM36"), +] + +list_ports_linux_one = [ + Port("n/a", "/dev/ttyS0"), + Port("n/a", "/dev/ttyS1"), + Port( + "USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=3-12.1.3.2.1.4:1.1", + "/dev/ttyACM0", + ), + Port( + "USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=3-12.1.3.2.1.4:1.4", + "/dev/ttyACM1", + ), +] + +list_ports_linux_multi = [ + Port("n/a", "/dev/ttyS0"), + Port("n/a", "/dev/ttyS1"), + Port( + "USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=3-12.1.3.2.1.4:1.1", + "/dev/ttyACM0", + ), + Port( + "USB VID:PID=1915:910A SER=THINGY91X_F39CC1B120C LOCATION=3-12.1.3.2.1.4:1.4", + "/dev/ttyACM1", + ), + Port( + "USB VID:PID=1366:1069 SER=001051202135 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM2", + ), + Port( + "USB VID:PID=1366:1069 SER=001051202135 LOCATION=3-12.1.3.2.4:1.2", + "/dev/ttyACM3", + ), + Port( + "USB VID:PID=1366:1051 SER=001050760093 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM4", + ), + Port( + "USB VID:PID=1366:1051 SER=001050760093 LOCATION=3-12.1.3.2.4:1.2", + "/dev/ttyACM5", + ), +] + +list_ports_linux_jlink = [ + Port("n/a", "/dev/ttyS0"), + Port("n/a", "/dev/ttyS1"), + Port( + "USB VID:PID=1366:0105 SER=000821001234 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM0", + ), +] + +list_ports_linux_mixed = [ + Port("n/a", "/dev/ttyS0"), + Port("n/a", "/dev/ttyS1"), + Port( + "USB VID:PID=1366:0105 SER=000821001234 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM0", + ), + Port( + "USB VID:PID=1366:1051 SER=001050760093 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM1", + ), + Port( + "USB VID:PID=1366:1051 SER=001050760093 LOCATION=3-12.1.3.2.4:1.2", + "/dev/ttyACM2", + ), +] + +list_ports_linux_jlink_multi_com = [ + Port("n/a", "/dev/ttyS0"), + Port("n/a", "/dev/ttyS1"), + Port( + "USB VID:PID=1366:0105 SER=000821001234 LOCATION=3-12.1.3.2.4:1.0", + "/dev/ttyACM0", + ), + Port( + "USB VID:PID=1366:0105 SER=000821001234 LOCATION=3-12.1.3.2.4:1.2", + "/dev/ttyACM1", + ), + Port( + "USB VID:PID=1366:0105 SER=000821001234 LOCATION=3-12.1.3.2.4:1.4", + "/dev/ttyACM2", + ), +] + + +@pytest.fixture +def platform_darwin(): + with patch("nrfcredstore.comms.platform.system", autospec=True) as m: + m.return_value = "Darwin" + yield m + + +@pytest.fixture +def platform_linux(): + with patch("nrfcredstore.comms.platform.system", autospec=True) as m: + m.return_value = "Linux" + yield m + + +@pytest.fixture +def platform_windows(): + with patch("nrfcredstore.comms.platform.system", autospec=True) as m: + m.return_value = "Windows" + yield m + + +@pytest.fixture +def ports_mac(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_mac + yield m + + +@pytest.fixture +def ports_windows(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_windows + yield m + + +@pytest.fixture +def ports_linux_one(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_linux_one + yield m + + +@pytest.fixture +def ports_linux_multi(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_linux_multi + yield m + + +@pytest.fixture +def ports_linux_jlink(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_linux_jlink + yield m + + +@pytest.fixture +def ports_linux_mixed(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_linux_mixed + yield m + + +@pytest.fixture +def ports_empty(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = [] + yield m + + +@pytest.fixture +def ports_linux_jlink_multi_com(): + with patch("nrfcredstore.comms.list_ports", autospec=True) as m: + m.comports.return_value = list_ports_linux_jlink_multi_com + yield m + +@pytest.fixture +def mock_serial(): + """Mock the serial.Serial class.""" + with patch("nrfcredstore.comms.serial.Serial", autospec=True) as mock_serial: + yield mock_serial + +# Tests for get_connected_nordic_boards + + +def test_mac(platform_darwin, ports_mac): + boards = get_connected_nordic_boards() + assert len(boards) == 1 + name, serial, port = boards[0] + assert port.device == "/dev/cu.usbmodem0010512021351" + assert serial == 1051202135 + assert name == "nRF9151-DK" + + +def test_windows(platform_windows, ports_windows): + boards = get_connected_nordic_boards() + assert len(boards) == 1 + name, serial, port = boards[0] + assert port.device == "COM35" + assert serial == "THINGY91X_F39CC1B120C" + assert name == "Thingy:91 X" + + +def test_linux_one(platform_linux, ports_linux_one): + boards = get_connected_nordic_boards() + assert len(boards) == 1 + name, serial, port = boards[0] + assert port.device == "/dev/ttyACM0" + assert serial == "THINGY91X_F39CC1B120C" + assert name == "Thingy:91 X" + + +def test_linux_multi(platform_linux, ports_linux_multi): + boards = get_connected_nordic_boards() + assert len(boards) == 3 + assert "Thingy:91 X" in [name for name, _, _ in boards] + assert "nRF9151-DK" in [name for name, _, _ in boards] + assert "nRF7002-DK" in [name for name, _, _ in boards] + + for name, serial, port in boards: + if name == "Thingy:91 X": + assert port.device == "/dev/ttyACM0" + assert serial == "THINGY91X_F39CC1B120C" + elif name == "nRF9151-DK": + assert port.device == "/dev/ttyACM2" + assert serial == 1051202135 + elif name == "nRF7002-DK": + assert port.device == "/dev/ttyACM5" + assert serial == 1050760093 + + +def test_linux_jlink(platform_linux, ports_linux_jlink): + boards = get_connected_nordic_boards() + assert len(boards) == 0 + + +# Tests for select_jlink + + +def test_select_jlink_no_jlinks(): + with pytest.raises(Exception, match="No J-Link device found"): + select_jlink(jlinks=[], list_all=True) + + +def test_select_jlink_one_jlink(): + selected = select_jlink(jlinks=[1051202135], list_all=True) + assert selected == 1051202135 + + +def test_select_jlink_multiple_jlinks(): + jlinks = [1051202135, 1050760093] + with patch( + "nrfcredstore.comms.inquirer.prompt", return_value={"serial": 1051202135} + ): + selected = select_jlink(jlinks=jlinks, list_all=True) + assert selected == 1051202135 + + +# note: is it important to align the mocked ports and the jlink list? +def test_select_jlink_multiple_non_nordic_jlinks(platform_linux, ports_linux_jlink): + with pytest.raises(Exception, match="No J-Link device found"): + select_jlink(jlinks=[821001234, 821001235], list_all=False) + + +def test_select_jlink_mixed_jlinks(platform_linux, ports_linux_mixed): + selected = select_jlink(jlinks=[821001234, 1050760093], list_all=False) + assert selected == 1050760093 + + +# Tests for select_device_by_serial + + +def test_select_device_by_serial_no_devices(ports_empty): + with pytest.raises(Exception, match="No device found with serial 821001234"): + select_device_by_serial(serial_number=821001234, list_all=False) + + +def test_select_device_by_serial_no_matching_device(platform_linux, ports_linux_one): + with pytest.raises(Exception, match="No device found with serial 999999999"): + select_device_by_serial(serial_number=999999999, list_all=False) + + +def test_select_device_by_serial_one_matching_device(platform_linux, ports_linux_one): + serial_number_requested = "THINGY91X_F39CC1B120C" + selected_port, serial_number = select_device_by_serial( + serial_number="THINGY91X_F39CC1B120C", list_all=False + ) + assert selected_port.device == "/dev/ttyACM0" + assert serial_number == serial_number_requested + + +def test_select_device_by_serial_multiple_matching_ports( + platform_linux, ports_linux_jlink_multi_com +): + serial_number_requested = 821001234 + port_to_select = list_ports_linux_jlink_multi_com[2] + with patch( + "nrfcredstore.comms.inquirer.prompt", + return_value={"port": port_to_select}, + ): + selected_port, serial_number = select_device_by_serial( + serial_number=serial_number_requested, list_all=True + ) + assert selected_port.device == port_to_select.device + assert serial_number == serial_number_requested + +# Tests for select_device + +# rtt true, no serial number +def test_select_device_rtt_true_no_serial_number(platform_linux, ports_linux_multi): + with patch("nrfcredstore.comms.get_connected_jlinks", return_value=[1051202135, 1050760093]): + with patch("nrfcredstore.comms.inquirer.prompt", return_value={"serial": 1051202135}): + _, serial_number = select_device( + rtt=True, serial_number=None, port=None, list_all=True + ) + assert serial_number == 1051202135 + +# rtt true, serial number, device found +def test_select_device_rtt_true_serial_number_found(platform_linux, ports_linux_multi): + with patch("nrfcredstore.comms.get_connected_jlinks", return_value=[1051202135, 1050760093]): + _, serial_number = select_device( + rtt=True, serial_number=1050760093, port=None, list_all=False + ) + assert serial_number == 1050760093 + +# rtt true, serial number, device not found +def test_select_device_rtt_true_serial_number_not_found(platform_linux, ports_linux_multi): + with patch("nrfcredstore.comms.get_connected_jlinks", return_value=[1051202135, 1050760093]): + with pytest.raises(Exception, match="No device found with serial 999999999"): + _, serial_number = select_device( + rtt=True, serial_number=999999999, port=None, list_all=False + ) + +# port given +def test_select_device_port_given(platform_linux, ports_linux_multi): + port, serial_number = select_device( + rtt=False, serial_number=None, port="/dev/ttyACM0", list_all=False + ) + assert port.device == "/dev/ttyACM0" + assert serial_number == "THINGY91X_F39CC1B120C" + + +# serial number given +def test_select_device_serial_number_given(platform_linux, ports_linux_multi): + port, serial_number = select_device( + rtt=False, serial_number="THINGY91X_F39CC1B120C", port=None, list_all=False + ) + assert port.device == "/dev/ttyACM0" + assert serial_number == "THINGY91X_F39CC1B120C" + +# list all +def test_select_device_list_all(platform_linux, ports_linux_multi): + port_to_select = list_ports_linux_jlink_multi_com[2] + with patch("nrfcredstore.comms.inquirer.prompt", return_value={"port": port_to_select}): + port, serial_number = select_device( + rtt=False, serial_number=None, port=None, list_all=True + ) + assert port.device == "/dev/ttyACM0" + assert serial_number == 821001234 + +# no list-all, no devices +def test_select_device_no_list_all_no_devices(platform_linux, ports_empty): + with pytest.raises(Exception, match="No device found"): + select_device( + rtt=False, serial_number=None, port=None, list_all=False + ) + +# no list-all, one device +def test_select_device_no_list_all_one_device(platform_linux, ports_linux_one): + port, serial_number = select_device( + rtt=False, serial_number=None, port=None, list_all=False + ) + assert port.device == "/dev/ttyACM0" + assert serial_number == "THINGY91X_F39CC1B120C" + +# no list-all, more devices +def test_select_device_no_list_all_more_devices( + platform_linux, ports_linux_multi +): + port_to_select = list_ports_linux_multi[2] + with patch("nrfcredstore.comms.inquirer.prompt", return_value={"port": port_to_select}): + port, serial_number = select_device( + rtt=False, serial_number=None, port=None, list_all=False + ) + assert port.device == port_to_select.device + assert serial_number == "THINGY91X_F39CC1B120C" + +# tests for expect_response + +def test_expect_response_ok(mock_serial): + with patch("nrfcredstore.comms.select_device", return_value=(Mock(), "123456789")) as mock_select: + comms = Comms() + comms.read_line = Mock(return_value="OK") + result, output = comms.expect_response("OK", "ERROR") + mock_select.assert_called_once() + assert result is True + assert output == '' + +def test_expect_response_error(mock_serial): + with patch("nrfcredstore.comms.select_device", return_value=(Mock(), "123456789")) as mock_select: + comms = Comms() + comms.read_line = Mock(return_value="ERROR") + result, output = comms.expect_response("OK", "ERROR") + mock_select.assert_called_once() + assert result is False + assert output == '' + +def test_expect_response_error_not_allowed(mock_serial): + with patch("nrfcredstore.comms.select_device", return_value=(Mock(), "123456789")) as mock_select: + with patch("nrfcredstore.comms.logging.error") as mock_error: + comms = Comms() + comms.read_line = Mock(return_value="+CME ERROR: 514") + result, output = comms.expect_response("OK", "ERROR") + mock_select.assert_called_once() + assert result is False + assert output == '' + mock_error.assert_called_once_with("AT command error: Not allowed") + +def test_expect_response_timeout(mock_serial): + with patch("nrfcredstore.comms.select_device", return_value=(Mock(), "123456789")) as mock_select: + comms = Comms() + comms.read_line = Mock(return_value="") + result, output = comms.expect_response("OK", "ERROR", timeout=1) + mock_select.assert_called_once() + assert result is False + assert output == '' + +def test_expect_response_store(mock_serial): + with patch("nrfcredstore.comms.select_device", return_value=(Mock(), "123456789")) as mock_select: + comms = Comms() + comms.read_line = Mock(side_effect=['%ATTESTTOKEN: "foo.bar"', "OK"]) + result, output = comms.expect_response("OK", "ERROR", "%ATTESTTOKEN: ") + mock_select.assert_called_once() + assert result is True + assert output.strip() == '%ATTESTTOKEN: "foo.bar"' diff --git a/tests/test_credstore.py b/tests/test_credstore.py index 5a320a7..4bc4fae 100644 --- a/tests/test_credstore.py +++ b/tests/test_credstore.py @@ -1,7 +1,7 @@ import io import pytest -from unittest.mock import Mock +from unittest.mock import Mock, patch from nrfcredstore.credstore import * from nrfcredstore.exceptions import ATCommandError @@ -15,64 +15,71 @@ class TestCredStore: @pytest.fixture def cred_store(self): - self.at_client = Mock() - return CredStore(self.at_client) + self.command_interface = Mock() + return CredStore(self.command_interface) @pytest.fixture def list_all_resp(self, cred_store): - cred_store.at_client.at_command.return_value = [ - '%CMNG: 12345678, 0, "978C...02C4"', - '%CMNG: 567890, 1, "C485...CF09"' - ] + cred_store.command_interface.comms.expect_response.return_value = ( + True, + "\r\f".join([ + '%CMNG: 12345678, 0, "978C...02C4"', + '%CMNG: 567890, 1, "C485...CF09"' + ]) + ) @pytest.fixture def list_all_resp_blank_lines(self, cred_store): - cred_store.at_client.at_command.return_value = [ - '', - '%CMNG: 12345678, 0, "978C...02C4"', - '%CMNG: 567890, 1, "C485...CF09"', - '' - ] + cred_store.command_interface.comms.expect_response.return_value = ( + True, + "\r\f".join([ + '', + '%CMNG: 12345678, 0, "978C...02C4"', + '%CMNG: 567890, 1, "C485...CF09"', + '' + ]) + ) @pytest.fixture def ok_resp(self, cred_store): - cred_store.at_client.at_command.return_value = [] + cred_store.command_interface.at_command.return_value = True @pytest.fixture def csr_resp(self, cred_store): # KEYGEN value is base64-encoded 'foo' and base64-encoded 'bar' joined by '.' - cred_store.at_client.at_command.return_value = ['', '%KEYGEN: "Zm9v.YmFy"'] + cred_store.command_interface.get_csr.return_value = "Zm9v.YmFy" @pytest.fixture def at_error(self, cred_store): - cred_store.at_client.at_command.side_effect = ATCommandError('') + cred_store.command_interface.at_command.return_value = False - def test_exposes_at_client(self, cred_store): - assert cred_store.at_client is self.at_client + @pytest.fixture + def at_error_in_expect_response(self, cred_store): + cred_store.command_interface.comms.expect_response.return_value = (False, "") + + def test_exposes_command_interface(self, cred_store): + assert cred_store.command_interface is self.command_interface def test_func_mode_offline(self, cred_store): cred_store.func_mode(4) - self.at_client.at_command.assert_called_with('AT+CFUN=4') + self.command_interface.at_command.assert_called_with('AT+CFUN=4', wait_for_result=True) def test_func_mode_min(self, cred_store): cred_store.func_mode(0) - self.at_client.at_command.assert_called_with('AT+CFUN=0') - - def test_func_mode_fail(self, cred_store, at_error): - with pytest.raises(ATCommandError): - cred_store.func_mode(4) + self.command_interface.at_command.assert_called_with('AT+CFUN=0', wait_for_result=True) def test_list_sends_cmng_command(self, cred_store, list_all_resp): cred_store.list() - self.at_client.at_command.assert_called_with('AT%CMNG=1') + self.command_interface.at_command.assert_called_with('AT%CMNG=1', wait_for_result=False) + self.command_interface.comms.expect_response.assert_called_with("OK", "ERROR", "%CMNG: ") def test_list_with_tag_part_of_cmng(self, cred_store, list_all_resp): cred_store.list(12345678) - self.at_client.at_command.assert_called_with('AT%CMNG=1,12345678') + self.command_interface.at_command.assert_called_with('AT%CMNG=1,12345678', wait_for_result=False) def test_list_with_tag_and_type_part_of_cmng(self, cred_store, list_all_resp): cred_store.list(12345678, CredType(0)) - self.at_client.at_command.assert_called_with('AT%CMNG=1,12345678,0') + self.command_interface.at_command.assert_called_with('AT%CMNG=1,12345678,0', wait_for_result=False) def test_list_credentials_contains_tag(self, cred_store, list_all_resp): first = cred_store.list()[0] @@ -102,17 +109,16 @@ def test_list_type_without_tag(self, cred_store): with pytest.raises(RuntimeError): cred_store.list(None, CredType(0)) - def test_list_fail(self, cred_store, at_error): - with pytest.raises(ATCommandError): + def test_list_fail(self, cred_store, at_error_in_expect_response): + with pytest.raises(RuntimeError): cred_store.list() def test_delete_success(self, cred_store, ok_resp): - response = cred_store.delete(567890, CredType(1)) - assert len(response) == 0 - self.at_client.at_command.assert_called_with('AT%CMNG=3,567890,1') + cred_store.delete(567890, CredType(1)) + self.command_interface.at_command.assert_called_with('AT%CMNG=3,567890,1', wait_for_result=True) def test_delete_fail(self, cred_store, at_error): - with pytest.raises(ATCommandError): + with pytest.raises(RuntimeError): cred_store.delete(123, CredType(1)) def test_delete_any_type_fail(self, cred_store): @@ -124,12 +130,11 @@ def test_write_success(self, cred_store, ok_resp): dGVzdA== -----END CERTIFICATE-----''' fake_file = io.StringIO(cert_text) - response = cred_store.write(567890, CredType.CLIENT_KEY, fake_file) - assert len(response) == 0 - self.at_client.at_command.assert_called_with(f'AT%CMNG=0,567890,2,"{cert_text}"') + cred_store.write(567890, CredType.CLIENT_KEY, fake_file) + self.command_interface.at_command.assert_called_with(f'AT%CMNG=0,567890,2,"{cert_text}"', wait_for_result=True) def test_write_fail(self, cred_store, at_error): - with pytest.raises(ATCommandError): + with pytest.raises(RuntimeError): cred_store.write(567890, CredType.CLIENT_KEY, io.StringIO()) def test_write_any_type_fail(self, cred_store): @@ -139,18 +144,21 @@ def test_write_any_type_fail(self, cred_store): def test_generate_sends_keygen_cmd(self, cred_store, csr_resp): fake_binary_file = Mock() cred_store.keygen(12345678, fake_binary_file) - self.at_client.at_command.assert_called_with(f'AT%KEYGEN=12345678,2,0') + self.command_interface.get_csr.assert_called_with(sectag=12345678, attributes='') def test_generate_with_attributes(self, cred_store, csr_resp): cred_store.keygen(12345678, Mock(), 'O=Nordic Semiconductor,L=Trondheim,C=no,CN=mydevice') - self.at_client.at_command.assert_called_with( - f'AT%KEYGEN=12345678,2,0,"O=Nordic Semiconductor,L=Trondheim,C=no,CN=mydevice"') + self.command_interface.get_csr.assert_called_with( + sectag=12345678, + attributes="O=Nordic Semiconductor,L=Trondheim,C=no,CN=mydevice" + ) def test_generate_writes_csr_to_stream(self, cred_store, csr_resp): fake_binary_file = Mock() cred_store.keygen(12345678, fake_binary_file) fake_binary_file.write.assert_called_with(b'foo') - def test_generate_fail(self, cred_store, at_error): - with pytest.raises(ATCommandError): - cred_store.keygen(12345678, Mock()) + def test_generate_fail(self, cred_store): + with patch.object(cred_store.command_interface, 'get_csr', return_value=None): + with pytest.raises(RuntimeError): + cred_store.keygen(12345678, Mock())