diff --git a/CHANGES.rst b/CHANGES.rst index 4a031d0f8..06462301c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -10,6 +10,7 @@ New Features in 23.1 - A new log level called ``CONSOLE`` has been added between the default ``INFO`` and ``DEBUG`` levels. This level will show all reads and writes made to the serial console during testing. +- Add JLinkDriver and JLinkDevice resource. Bug fixes in 23.1 ~~~~~~~~~~~~~~~~~ @@ -447,7 +448,7 @@ New and Updated Drivers - The `SerialDriver` now supports using plain TCP instead of RFC 2217, which is needed from some console servers. - The `ShellDriver` has been improved: - + - It supports configuring the various timeouts used during the login process. - It can use xmodem to transfer file from and to the target. diff --git a/doc/configuration.rst b/doc/configuration.rst index 553418d03..409133ed0 100644 --- a/doc/configuration.rst +++ b/doc/configuration.rst @@ -783,6 +783,26 @@ Arguments: Used by: - `OpenOCDDriver`_ +JLinkDevice +~~~~~~~~~~~ +A JLinkDevice resource describes a Segger J-Link Debug Probe. + +.. code-block:: yaml + + JLinkDevice: + match: + ID_SERIAL_SHORT: '000000123456' + +- match (dict): key and value for a udev match, see `udev Matching`_ + +Used by: + - `JLinkDriver`_ + +NetworkJLinkDevice +~~~~~~~~~~~~~~~~~~~~~ + +A :any:`NetworkJLinkDevice` resource describes a `JLinkDevice`_ available on a remote computer. + SNMPEthernetPort ~~~~~~~~~~~~~~~~ A SNMPEthernetPort resource describes a port on an Ethernet switch, which is @@ -1789,6 +1809,21 @@ Arguments: - board_config (str): optional, board config in the ``openocd/scripts/board/`` directory - load_commands (list of str): optional, load commands to use instead of ``init``, ``bootstrap {filename}``, ``shutdown`` +JLinkDriver +~~~~~~~~~~~~~ +A JLinkDriver provides access to a Segger J-Link Debug Probe via `pylink `_. + +Binds to: + interface: + - `JLinkDevice`_ + - `NetworkJLinkDevice`_ + +Implements: + - None + +Arguments: + - None + QuartusHPSDriver ~~~~~~~~~~~~~~~~ A QuartusHPSDriver controls the "Quartus Prime Programmer and Tools" to flash diff --git a/labgrid/driver/__init__.py b/labgrid/driver/__init__.py index e16d0529a..46bd6e84c 100644 --- a/labgrid/driver/__init__.py +++ b/labgrid/driver/__init__.py @@ -47,3 +47,4 @@ from .deditecrelaisdriver import DeditecRelaisDriver from .dediprogflashdriver import DediprogFlashDriver from .httpdigitaloutput import HttpDigitalOutputDriver +from .jlinkdriver import JLinkDriver diff --git a/labgrid/driver/jlinkdriver.py b/labgrid/driver/jlinkdriver.py new file mode 100644 index 000000000..2969d3c05 --- /dev/null +++ b/labgrid/driver/jlinkdriver.py @@ -0,0 +1,43 @@ +from importlib import import_module +import socket +import attr + +from ..factory import target_factory +from ..resource.remote import NetworkJLinkDevice +from ..util.proxy import proxymanager +from .common import Driver + +@target_factory.reg_driver +@attr.s(eq=False) +class JLinkDriver(Driver): + bindings = {"jlink_device": {"JLinkDevice", "NetworkJLinkDevice"}, } + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self._module = import_module('pylink') + self.jlink = None + + def on_activate(self): + self.jlink = self._module.JLink() + + if isinstance(self.jlink_device, NetworkJLinkDevice): + # we can only forward if the backend knows which port to use + host, port = proxymanager.get_host_and_port(self.jlink_device) + # The J-Link client software does not support host names + ip_addr = socket.gethostbyname(host) + + # Workaround for Debian's /etc/hosts entry + # https://www.debian.org/doc/manuals/debian-reference/ch05.en.html#_the_hostname_resolution + if ip_addr == "127.0.1.1": + ip_addr = "127.0.0.1" + self.jlink.open(ip_addr=f"{ip_addr}:{port}") + else: + self.jlink.open(serial_no=self.jlink_device.serial) + + def on_deactivate(self): + self.jlink.close() + self.jlink = None + + @Driver.check_active + def get_interface(self): + return self.jlink diff --git a/labgrid/remote/exporter.py b/labgrid/remote/exporter.py index a30b1afe1..cfbdb6df2 100755 --- a/labgrid/remote/exporter.py +++ b/labgrid/remote/exporter.py @@ -14,12 +14,14 @@ from pathlib import Path from typing import Dict, Type from socket import gethostname, getfqdn +from pexpect import TIMEOUT import attr from autobahn.asyncio.wamp import ApplicationRunner, ApplicationSession from .config import ResourceConfig from .common import ResourceEntry, enable_tcp_nodelay from ..util import get_free_port, labgrid_version +from ..util import Timeout __version__ = labgrid_version() @@ -500,6 +502,80 @@ def __attrs_post_init__(self): super().__attrs_post_init__() self.data['cls'] = f"Remote{self.cls}".replace("Network", "") +class USBJLinkExport(USBGenericExport): + """Export J-Link device using the J-Link Remote Server""" + + def __attrs_post_init__(self): + super().__attrs_post_init__() + self.child = None + self.port = None + self.tool = '/opt/SEGGER/JLink/JLinkRemoteServer' + + def _get_params(self): + """Helper function to return parameters""" + return { + 'host': self.host, + 'port': self.port, + 'busnum': self.local.busnum, + 'devnum': self.local.devnum, + 'path': self.local.path, + 'vendor_id': self.local.vendor_id, + 'model_id': self.local.model_id, + } + + def __del__(self): + if self.child is not None: + self.stop() + + def _start(self, start_params): + """Start ``JLinkRemoteServer`` subprocess""" + assert self.local.avail + assert self.child is None + self.port = get_free_port() + + cmd = [ + self.tool, + "-Port", + f"{self.port}", + "-select", + f"USB={self.local.serial}", + ] + self.logger.info("Starting JLinkRemoteServer with: %s", " ".join(cmd)) + self.child = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True) + + # Wait for the server to be ready for incoming connections + # Waiting to open the socket with Python does not work + timeout = Timeout(10.0) + while not timeout.expired: + line = self.child.stdout.readline().rstrip() + self.logger.debug(line) + if "Waiting for client connections..." in line: + break + + if timeout.expired: + raise TIMEOUT( + f"Timeout of {timeout.timeout} seconds exceeded during waiting for J-Link Remote Server startup" + ) + self.logger.info("started JLinkRemoteServer for %s on port %d", self.local.serial, self.port) + + def _stop(self, start_params): + """Stop ``JLinkRemoteServer`` subprocess""" + assert self.child + child = self.child + self.child = None + port = self.port + self.port = None + child.terminate() + try: + child.communicate(2.0) # JLinkRemoteServer takes about a second to react + except subprocess.TimeoutExpired: + self.logger.warning("JLinkRemoteServer for %s still running after SIGTERM", self.local.serial) + log_subprocess_kernel_stack(self.logger, child) + child.kill() + child.communicate(1.0) + self.logger.info("stopped JLinkRemoteServer for %s on port %d", self.local.serial, port) + + exports["AndroidFastboot"] = USBGenericExport exports["AndroidUSBFastboot"] = USBGenericRemoteExport exports["DFUDevice"] = USBGenericExport @@ -512,6 +588,7 @@ def __attrs_post_init__(self): exports["USBSDMuxDevice"] = USBSDMuxExport exports["USBSDWireDevice"] = USBSDWireExport exports["USBDebugger"] = USBGenericExport +exports["JLinkDevice"] = USBJLinkExport exports["USBMassStorage"] = USBGenericExport exports["USBVideo"] = USBGenericExport diff --git a/labgrid/resource/remote.py b/labgrid/resource/remote.py index b83d6c3cc..50bee7747 100644 --- a/labgrid/resource/remote.py +++ b/labgrid/resource/remote.py @@ -308,6 +308,18 @@ def __attrs_post_init__(self): super().__attrs_post_init__() +@target_factory.reg_resource +@attr.s(eq=False) +class NetworkJLinkDevice(RemoteUSBResource): + """The NetworkJLinkDevice describes a remotely accessible USB Segger J-Link device""" + + port = attr.ib(validator=attr.validators.optional(attr.validators.instance_of(int))) + + def __attrs_post_init__(self): + self.timeout = 10.0 + super().__attrs_post_init__() + + @target_factory.reg_resource @attr.s(eq=False) class NetworkDeditecRelais8(RemoteUSBResource): diff --git a/labgrid/resource/suggest.py b/labgrid/resource/suggest.py index 2b9e862ad..d8c6f2d6a 100644 --- a/labgrid/resource/suggest.py +++ b/labgrid/resource/suggest.py @@ -23,6 +23,7 @@ HIDRelay, USBDebugger, USBPowerPort, + JLinkDevice, ) from ..util import dump @@ -56,6 +57,7 @@ def __init__(self, args): self.resources.append(HIDRelay(**args)) self.resources.append(USBDebugger(**args)) self.resources.append(USBPowerPort(**args, index=0)) + self.resources.append(JLinkDevice(**args)) def suggest_callback(self, resource, meta, suggestions): cls = type(resource).__name__ diff --git a/labgrid/resource/udev.py b/labgrid/resource/udev.py index 5e53f256c..8bc16127d 100644 --- a/labgrid/resource/udev.py +++ b/labgrid/resource/udev.py @@ -708,3 +708,19 @@ def filter_match(self, device): return False return super().filter_match(device) + +@target_factory.reg_resource +@attr.s(eq=False) +class JLinkDevice(USBResource): + """The JLinkDevice describes an attached Segger J-Link device, + it is identified via USB using udev + """ + + def __attrs_post_init__(self): + self.match["ID_VENDOR_ID"] = "1366" + self.match["ID_MODEL_ID"] = "0101" + super().__attrs_post_init__() + + def update(self): + super().update() + self.serial = self.device.properties.get('ID_SERIAL_SHORT') diff --git a/pyproject.toml b/pyproject.toml index 2e80ccf46..d508ae503 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ doc = [ ] docker = ["docker>=5.0.2"] graph = ["graphviz>=0.17.0"] +jlink = ["pylink-square>=1.0.0"] kasa = ["python-kasa>=0.4.0"] modbus = ["pyModbusTCP>=0.1.10"] modbusrtu = ["minimalmodbus>=1.0.2"] @@ -91,6 +92,9 @@ dev = [ # labgrid[graph] "graphviz>=0.17.0", + # labgrid[jlink] + "pylink-square>=1.0.0", + # labgrid[kasa] "python-kasa>=0.4.0", diff --git a/tests/test_jlink.py b/tests/test_jlink.py new file mode 100644 index 000000000..bd1fe6603 --- /dev/null +++ b/tests/test_jlink.py @@ -0,0 +1,104 @@ +import contextlib +import io +import pytest +import subprocess +from unittest.mock import MagicMock +from unittest.mock import Mock +from unittest.mock import patch + +from labgrid.remote.exporter import USBJLinkExport +from labgrid.resource.remote import NetworkJLinkDevice +from labgrid.resource.udev import JLinkDevice +from labgrid.driver.jlinkdriver import JLinkDriver + +FAKE_SERIAL = 123456789 +MATCH = {"ID_SERIAL_SHORT": f"000{FAKE_SERIAL}"} + + +class Popen_mock(): + """Mock of Popen object to mimmic JLinkRemoteServer output""" + + def __init__(self, args, **kwargs): + assert "JLinkRemoteServer" in args[0] + assert args[1] == "-Port" + # Since args[2] is dynamic do not check it + assert args[3] == "-select" + assert args[4] == f"USB={FAKE_SERIAL}" + self.wait_called = False + + stdout = io.StringIO( + "SEGGER J-Link Remote Server V7.84a\n" + "Compiled Dec 22 2022 16:13:52\n" + "\n" + "'q' to quit '?' for help\n" + "\n" + f"Connected to J-Link with S/N {FAKE_SERIAL}\n" + "\n" + "Waiting for client connections...\n" + ) + + def communicate(self, input=None, timeout=None): + # Only timeout on the first call to exercise the error handling code. + if not self.wait_called: + self.wait_called = True + raise subprocess.TimeoutExpired("JLinkRemoteServer", timeout) + + def kill(self): + pass + + def poll(self): + return 0 + + def terminate(self): + pass + + +def test_jlink_resource(target): + r = JLinkDevice(target, name=None, match=MATCH) + + +@patch('subprocess.Popen', Popen_mock) +def test_jlink_export_start(target): + config = {'avail': True, 'cls': "JLinkDevice", 'params': {'match': MATCH}, } + e = USBJLinkExport(config) + e.local.avail = True + e.local.serial = FAKE_SERIAL + + e.start() + # Exercise the __del__ method which also exercises stop() + del e + + +@patch('subprocess.Popen', Popen_mock) +def test_jlink_driver(target): + pytest.importorskip("pylink") + device = JLinkDevice(target, name=None, match=MATCH) + device.avail = True + device.serial = FAKE_SERIAL + driver = JLinkDriver(target, name=None) + + with patch('pylink.JLink') as JLinkMock: + instance = JLinkMock.return_value + target.activate(driver) + instance.open.assert_called_once_with(serial_no=FAKE_SERIAL) + intf = driver.get_interface() + assert(isinstance(intf, Mock)) + target.deactivate(driver) + instance.close.assert_called_once_with() + + +@patch('subprocess.Popen', Popen_mock) +def test_jlink_driver_network_device(target): + pytest.importorskip("pylink") + device = NetworkJLinkDevice(target, None, host='127.0.1.1', port=12345, busnum=0, devnum=1, path='0:1', vendor_id=0x0, model_id=0x0,) + device.avail = True + driver = JLinkDriver(target, name=None) + assert (isinstance(driver, JLinkDriver)) + + with patch('pylink.JLink') as JLinkMock: + instance = JLinkMock.return_value + # Call on_activate directly since activating the driver via the target does not work during testing + driver.on_activate() + instance.open.assert_called_once_with(ip_addr='127.0.0.1:12345') + driver.on_deactivate() + instance.close.assert_called_once_with()