diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 9437fb748..eb301407e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -1,32 +1,104 @@ name: Run Unit Tests -on: - push: - branches: - -'arm/**' - pull_request: - branches: - -'arm/**' + +on: push +# on: +# push: +# branches: +# -'arm/**' +# pull_request: +# branches: +# -'arm/**' jobs: - test_arm64: - if: github.repository == 'intrepidcs/python_ics' - name: Linux ARM64 unit tests - runs-on: [ self-hosted, Linux, ARM64, Hardware ] + # test_arm64: + # if: github.repository == 'Jonas-Harrison-ICS/python_ics' + # name: Linux ARM64 unit tests + # runs-on: [ self-hosted, Linux, ARM64, Hardware ] + # steps: + # - uses: actions/checkout@v4 + # with: + # submodules: recursive + # fetch-depth: 0 + # # needed for history/tags## + + # - name: Setup Python + # run: | + # python3 -m venv .venv + # source .venv/bin/activate + # python3 -m pip install --upgrade pip + # echo "----------------------------------------------" + # pip install -r requirements.txt + # pwd + # echo "----------------------------------------------" + # python setup.py build -g + # python setup.py install --force + # # pip install . + + # - name: Run unit tests + # run: | + # source .venv/bin/activate + # sudo setcap cap_net_admin,cap_net_raw+ep $(realpath $(which python3)) + # python3 -m unittest discover -s tests.runner --verbose + # pytest -n auto + + test_macos: + if: github.repository == 'Jonas-Harrison-ICS/python_ics' + name: MAC-OS ARM64 unit tests + runs-on: [ self-hosted, macOS, ARM64, Hardware ] steps: - uses: actions/checkout@v4 with: - submodules: recursive - fetch-depth: 0 # needed for history/tags + submodules: recursive + fetch-depth: 0 + # needed for history/tags## - name: Setup Python run: | - python -m venv .venv + python3 -m venv .venv source .venv/bin/activate - python -m pip install --upgrade pip - pip install . + python3 -m pip install --upgrade pip + echo "----------------------------------------------" + pip install -r requirements.txt + pwd + echo "----------------------------------------------" + python setup.py build -g + python setup.py install --force + # pip install . - name: Run unit tests run: | source .venv/bin/activate - sudo setcap cap_net_admin,cap_net_raw+ep $(realpath $(which python)) - python -m unittest discover -s tests.runner --verbose \ No newline at end of file + python3 -m unittest discover -s tests.runner --verbose + pytest -n auto + + test_windows: + if: github.repository == 'Jonas-Harrison-ICS/python_ics' + name: Windows unit tests + runs-on: [ self-hosted, Windows, x64, Hardware ] + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + fetch-depth: 0 + # needed for history/tags## + + - name: Setup Python + run: | + python -m venv .venv + .venv\Scripts\Activate.ps1 + Write-Output "---------------------------------------------- VIRTUAL ENV CREATED" + python -m pip install --upgrade pip + Write-Output "---------------------------------------------- PIP INSTALLED" + pip install -r requirements.txt + pwd + Write-Output "---------------------------------------------- INSTALLED REQUIREMENTS.TXT" + python setup.py build -g + Write-Output "---------------------------------------------- PYTHON SETUP BUILT" + python setup.py install --force + Write-Output "---------------------------------------------- PYTHON SETUP INSTALLED" + # pip install . + - name: Run unit tests + run: | + .venv\Scripts\Activate.ps1 + python -m unittest discover -s tests.runner --verbose + pytest -n auto diff --git a/build_libicsneo.py b/build_libicsneo.py index c2501f0cc..db133803e 100644 --- a/build_libicsneo.py +++ b/build_libicsneo.py @@ -1,3 +1,4 @@ +import platform import subprocess import multiprocessing import os @@ -50,33 +51,48 @@ def checkout(): def _build_libusb(): os.makedirs(LIBUSB_BUILD, exist_ok=True) env = os.environ.copy() - if sys.platform == "darwin": - env["CFLAGS"] = "-arch x86_64 -arch arm64 -mmacosx-version-min=10.13" - env["CXXFLAGS"] = "-arch x86_64 -arch arm64 -mmacosx-version-min=10.13" + if "DARWIN" in platform.system().upper(): + env["CFLAGS"] = "-arch arm64 -mmacosx-version-min=10.13" + env["CXXFLAGS"] = "-arch arm64 -mmacosx-version-min=10.13" + env["LDFLAGS"] = "-arch arm64 -mmacosx-version-min=10.13" + + # os.environ.update(env) + + # os.system('echo $CFLAGS') + # os.system('echo $CXXFLAGS') + # os.system('echo $LDFLAGS') + + print("CFLAGS:", env["CFLAGS"]) + print("CXXFLAGS:", env["CXXFLAGS"]) + print("LDFLAGS:", env["LDFLAGS"]) else: env["CFLAGS"] = "-fPIC" env["CXXFLAGS"] = "-fPIC" - subprocess.check_output([ + result = subprocess.run([ f"{LIBUSB_SOURCE}/configure", "--disable-shared", "--enable-static", + "--disable-dependency-tracking", f"--prefix={LIBUSB_INSTALL}", "--disable-udev" ], cwd=LIBUSB_BUILD, env=env) - subprocess.check_output(["make", "-j" + CPUS], cwd=LIBUSB_BUILD) - subprocess.check_output(["make", "install"], cwd=LIBUSB_BUILD) + print("stdout:", result.stdout) + print("stderr:", result.stderr) + + subprocess.run(["make", "-j" + CPUS], cwd=LIBUSB_BUILD) + subprocess.run(["make", "install"], cwd=LIBUSB_BUILD) def _build_libpcap(): os.makedirs(LIBPCAP_BUILD, exist_ok=True) env = os.environ.copy() - if sys.platform == "darwin": - env["CFLAGS"] = "-arch x86_64 -arch arm64 -mmacosx-version-min=10.13" - env["CXXFLAGS"] = "-arch x86_64 -arch arm64 -mmacosx-version-min=10.13" + if "DARWIN" in platform.system().upper(): + env["CFLAGS"] = "-arch arm64 -mmacosx-version-min=10.13" + env["CXXFLAGS"] = "-arch arm64 -mmacosx-version-min=10.13" else: env["CFLAGS"] = "-fPIC" env["CXXFLAGS"] = "-fPIC" - subprocess.check_output([ + subprocess.run([ f"{LIBPCAP_SOURCE}/configure", "--disable-shared", "--disable-usb", @@ -93,8 +109,8 @@ def _build_libpcap(): f"--prefix={LIBPCAP_INSTALL}", ], cwd=LIBPCAP_BUILD, env=env) - subprocess.check_output(["make", "-j" + CPUS], cwd=LIBPCAP_BUILD) - subprocess.check_output(["make", "install"], cwd=LIBPCAP_BUILD) + subprocess.run(["make", "-j" + CPUS], cwd=LIBPCAP_BUILD) + subprocess.run(["make", "install"], cwd=LIBPCAP_BUILD) def _build_libicsneo_linux(): print("Cleaning libicsneo...") @@ -119,11 +135,11 @@ def _build_libicsneo_linux(): ) def _build_libicsneo_macos(): - subprocess.check_output( + subprocess.run( [ "cmake", "-DCMAKE_BUILD_TYPE=Release", - "-DCMAKE_OSX_ARCHITECTURES=arm64;x86_64", + "-DCMAKE_OSX_ARCHITECTURES=arm64;arm64", "-DLIBICSNEO_BUILD_ICSNEOLEGACY=ON", "-DCMAKE_OSX_DEPLOYMENT_TARGET=10.13", f"-DCMAKE_PREFIX_PATH={LIBUSB_INSTALL};{LIBPCAP_INSTALL}", @@ -132,7 +148,7 @@ def _build_libicsneo_macos(): ] ) - subprocess.check_output( + subprocess.run( ["cmake", "--build", LIBICSNEO_BUILD, "--target", "icsneolegacy", "--parallel", CPUS] ) diff --git a/setup.py b/setup.py index 5b368800f..ae33ec9c8 100644 --- a/setup.py +++ b/setup.py @@ -85,7 +85,7 @@ def get_ics_extension_compiler_arguments() -> List[str]: compile_args = GCC_COMPILE_ARGS elif "DARWIN" in platform.system().upper(): # Mac doesn't respect the compiler args, but will append with ARCHFLAGS environment variable - os.environ["ARCHFLAGS"] += " -std=c++17" + os.environ["ARCHFLAGS"] = os.environ.get("ARCHFLAGS", "") + " -std=c++17" compile_args = [] else: compile_args = [] diff --git a/tests/runner/Readme.md b/tests/runner/Readme.md index 55328b6a2..9c0c888b0 100644 --- a/tests/runner/Readme.md +++ b/tests/runner/Readme.md @@ -3,12 +3,39 @@ Runner tests This directory is specifically for testing against hardware attached to the runner. -Each runner consists of the following hardware: - +Each runner consists of the following ICS hardware: - neoVI FIRE3 - neoVI FIRE2 - ValueCAN4-2 - - - RAD-Moon2 - RAD-Moon2 + +Runner platforms: +- Windows 10 +- MacOS (something? M1) +- Linux (Debian) +- Linux ARM (Raspberry PI) + +Hardware setup: +PC -> USB -> NIC -> ETH -> Fire3 -> CAN1/2 network v v + -> LIN network | | V + -> ETH network | | | V + -> Fire2 -> CAN1/2 network v | ^ | | + -> LIN network | | ^ | + -> ETH network | | ^ + -> VCAN42 -> CAN1/2 network ^ ^ + -> NIC -> ETH -> Moon2 -> 100/1000BASE-T1 network v + -> NIC -> ETH -> Moon2 -> 100/1000BASE-T1 network ^ + +Software setup: +- Both Moon2 USB NIC adapter properties must be set: + - "ASIX USB to Gigabit Ethernet Family Adapter" -> "Ethernet" (windows 10) + - "ASIX USB to Gigabit Ethernet Family Adapter #2" -> "Ethernet 2" (windows 10) + - IPV6 disabled + - IPV4 IP address: 192.168.69.10, 192.168.69.11 + - IPV4 subnet mask: 255.255.255.0 + +Hardware / software testing: +- Run all python ics functions on all hardware +- Verify all networks, transmit, recieve, message functions +- Verify all example scripts are fully working \ No newline at end of file diff --git a/tests/runner/test_accessory_features.py b/tests/runner/test_accessory_features.py new file mode 100644 index 000000000..7d756cfb8 --- /dev/null +++ b/tests/runner/test_accessory_features.py @@ -0,0 +1,578 @@ +# ruff: noqa: E501 +import time +import unittest +import ctypes + +# Load the DLL explicitly +# dll_path = r'C:\DevJonas\vspy3\Projects\icsneoVIDLL\Debug\icsneo40.dll' +# ctypes.CDLL(dll_path) + +import ics + +# from ics.py_neo_device_ex import PyNeoDeviceEx +from ics.structures.e_device_settings_type import e_device_settings_type + +import faulthandler +import sys + +faulthandler.enable(file=sys.stderr) + +unittest.TestLoader.sortTestMethodsUsing = None + + +# class BaseTests: +# """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + +class TestSettings(unittest.TestCase): + device = None + @classmethod + def setUp(self): + devices = ics.find_devices([ics.NEODEVICE_FIRE3]) + self.device = ics.open_device(devices[0]) + # pass + + def tearDown(self): + ics.close_device(self.device) + time.sleep(6) + + def _get_device(self): + devices = ics.find_devices([self.device_type]) + return devices[0] + + def test_base36enc(self): + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + # device = self._get_device() + # device = ics.open_device(device) + base36 = ics.base36enc(self.device.SerialNumber) + serial_range_min = None + serial_range_max = None + if self.device.DeviceType == ics.NEODEVICE_FIRE3: + serial_range_min = int("ON0000", 36) + serial_range_max = int("ONZZZZ", 36) + if self.device.DeviceType == ics.NEODEVICE_FIRE2: + serial_range_min = int("CY0000", 36) + serial_range_max = int("CYZZZZ", 36) + if self.device.DeviceType == ics.NEODEVICE_RADMOON2: + serial_range_min = int("RM0000", 36) + serial_range_max = int("RNZZZZ", 36) + if self.device.DeviceType == ics.NEODEVICE_VCAN42: + serial_range_min = int("V10000", 36) + serial_range_max = int("V2ZZZZ", 36) + + self.assertGreaterEqual(int(base36, 36), serial_range_min) + self.assertLessEqual(int(base36, 36), serial_range_max) + + # print(base36) + + def test_disk_functions(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + details = ics.get_disk_details(self.device) + + format = ics.disk_format(self.device, details) + format_cancel = ics.disk_format_cancel(self.device) + + details = ics.get_disk_details(self.device) + progress = ics.get_disk_format_progress(self.device) + self.assertIsNot(details, None) + self.assertIsNone(format) + self.assertIsNone(format_cancel) + + # if (not device.DeviceType == ics.NEODEVICE_RADMOON2) and (not device.DeviceType == ics.NEODEVICE_VCAN42): + # ics.read_sdcard(device, 0) # icsneoReadSDCard(), Accepts a PyNeoDeviceEx and sector index. Returns a bytearray of 512 bytes max. Exception on error. + # pass + + def test_radio_message(self): + # might need neovi radio device for this test + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + msg = ics.SpyMessage() + msg.Data = ics.create_neovi_radio_message(Relay1=True, Relay4=False,LED6=True, MSB_report_rate=10) + ics.transmit_messages(self.device, msg) + # msg.Data + + self.assertEqual(0, msg.Data) + + + def test_bus_voltage(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + voltage = False + if self.device.DeviceType == (ics.NEODEVICE_FIRE3 or ics.NEODEVICE_FIRE2): + ics.enable_bus_voltage_monitor(self.device, 1) + voltage = ics.get_bus_voltage(self.device) + + if self.device.DeviceType == ics.NEODEVICE_FIRE3: + self.assertAlmostEqual(voltage, 12000) + elif self.device.DeviceType == ics.NEODEVICE_FIRE2: + self.assertAlmostEqual(voltage, 12000) + elif self.device.DeviceType == ics.NEODEVICE_RADMOON2: + self.assertFalse(voltage) + # moon2 does not allow for reading bus voltage + elif self.device.DeviceType == ics.NEODEVICE_VCAN42: + self.assertFalse(voltage) + # vcan4 does not allow for reading bus voltage + + + def test_enable_doip_line(self): + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + # device = self._get_device() + # device = ics.open_device(device) + ics.enable_doip_line(self.device, 1) + ics.enable_network_com(self.device, True, ics.NETID_HSCAN) + + + def test_enable_network_com(self): + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + # device = self._get_device() + # device = ics.open_device(device) + ics.enable_network_com(self.device, True, ics.NETID_HSCAN) + + + def test_generic_api(self): + api_index = 1 + api_selected = 1 + instance_index = 0 + instance_selected = 0 + data = b"\ff, \ff" + function_running = 8 + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + function_index, callback_error, finished_processing = ics.generic_api_get_status(self.device, api_index, instance_index) # fails + function_return_error = ics.generic_api_send_command(self.device, api_selected, instance_selected, function_running, data) # fails + + # self.assertEqual(callback_error, 0) + + function_index, data = ics.generic_api_read_data(self.device, api_index, instance_index) + function_return_error = ics.generic_api_send_command(self.device, api_selected, instance_selected, function_running, data) + # self.assertEqual(function_return_error, 0) + + ics.get_last_api_error(self.device) + + msg = ics.SpyMessage() + tmstmp = ics.get_timestamp_for_msg(self.device, msg) # need to explicitly test this function + + self.assertEqual(tmstmp, 0.0) # TODO verify this actually works + + + def test_backup_power(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + time.sleep(.5) + ics.get_backup_power_enabled(self.device) + ics.get_backup_power_ready(self.device) + ics.get_bus_voltage(self.device, 0) + + + def test_active_vnet_channel(self): + import inspect + import os + + # Get the file path for the imported module + module_path = inspect.getfile(ics) + + # Print the absolute file path + print(os.path.abspath(module_path)) + path = ics.get_library_path() + print(path) + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + ics.set_active_vnet_channel(self.device, 1) + + vnet_channel = ics.get_active_vnet_channel(self.device) + print(vnet_channel) + # versions = ics.get_all_chip_versions(device) + + + + # msg = ics.SpyMessage() + # tmstmp = ics.get_timestamp_for_msg(device, msg) + + self.assertEqual(vnet_channel, 1) # TODO verify this actually works + + + def test_get_all_chip_versions(self): + # path = ics.get_library_path() + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + # ics.get_chip_ids() + versions = ics.get_all_chip_versions(self.device) + + # self.assetEqual(versions.) + # print(versions) + + + def test_firmware_info(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + + info = ics.get_dll_firmware_info(self.device) + version = ics.get_dll_version() + + + if self.device.DeviceType == ics.NEODEVICE_FIRE3: + self.assertEqual(info.iAppMajor, 4) + self.assertEqual(info.iAppMinor, 13) + self.assertEqual(version, 917) + if self.device.DeviceType == ics.NEODEVICE_FIRE2: + self.assertEqual(info.iAppMajor, 9) + self.assertEqual(info.iAppMinor, 5) + self.assertEqual(version, 917) + if self.device.DeviceType == ics.NEODEVICE_RADMOON2: + self.assertEqual(info.iAppMajor, 3) + self.assertEqual(info.iAppMinor, 4) + self.assertEqual(version, 917) + elif self.device.DeviceType == ics.NEODEVICE_VCAN42: + self.assertEqual(info.iAppMajor, 5) + self.assertEqual(info.iAppMinor, 4) + self.assertEqual(version, 917) + # ics.get_gptp_status(device) + + # ics.get_pcb_serial_number(device) + # ics.get_serial_number(device) + + # ics.get_performance_parameters(device) + # ics.get_rtc(device) + + # ics.iso15765_enable_networks(device, ics.NETID_HSCAN) + # ics.iso15765_disable_networks(device) + + + def test_get_gptp_status(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + status = ics.get_gptp_status(self.device) + + self.assertIsNotNone(status) + # print(status) + + + def test_performance_parameters(self): + # device = self._get_device() + # device = ics.open_device() + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + perf_params = ics.get_performance_parameters(self.device) + + self.assertIn(24576, perf_params) + print(perf_params) + + + + def test_set_led(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + prop = ics.set_led_property(self.device, 1, 100, 100) + + self.assertIsNone(prop) + # pass + + + def test_get_set_rtc(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + errors = ics.get_error_messages(self.device) + rtc_time = ics.get_rtc(self.device) + rtc = ics.set_rtc(self.device) + print(rtc) + + self.assertEqual(errors, []) + self.assertEqual(len(rtc_time), 2) + self.assertIsNone(rtc) + + + def test_is_device_feature_supported(self): + from ics.structures.device_feature import DeviceFeature + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + is_supported = ics.is_device_feature_supported(self.device, DeviceFeature.networkTerminationDWCAN01) # currently only works with fire3 + + self.assertEqual(is_supported, 1) + + + def test_read_jupiter_fw(self): + # need RAD-Jupiter to perform test + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + size = 8 + ics.read_jupiter_firmware(self.device, size) + + + def test_write_jupiter_fw(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + bytes = 8 + ics.write_jupiter_firmware(self.device, bytes) + + + def test_request_enter_sleep_mode(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + ics.request_enter_sleep_mode(self.device, 1, 0) # Currently only supported for FIREVNET/PLASMA + + + def test_get_script_status(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + status = ics.get_script_status(self.device) + # self.assertIn(514, status) + + self.assertIn(51, status) + self.assertIn(270, status) + print(status) + + + def test_backup_power_enabled(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + enabled = ics.set_backup_power_enabled(self.device) + + self.assertEqual(enabled, 1) + + + def test_set_bit_rate(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + bit_rate = 2000 + # success = ics.set_bit_rate(device, bit_rate, ics.NETID_HSCAN) # will only work with NEODEVICE_VCAN3 and NEODEVICE_CT_OBD + + # ics.set_bit_rate definition might not be complete + # missing args in documentation + # print(success) + + + def test_set_bit_rate_ex(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + bit_rate = 2000 + # success = ics.set_bit_rate_ex(device, bit_rate, ics.NETID_HSCAN, iOptions) # missing example usage + # missing args in documentation + # print(success) + + + def test_set_fd_bit_rate(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + bit_rate = 2000 + ret_val = ics.set_fd_bit_rate(self.device, bit_rate, ics.NETID_HSCAN) + + self.assertEqual(ret_val, 1) + + + def test_set_context(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + context = ics.set_context(self.device) + + self.assertEqual(context, 1) + + + def test_set_reflash_callback(self): + def callback(msg, progress): + print(msg, progress) + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + success = ics.set_reflash_callback(callback) + + print(success) + + + def test_safe_boot_mode(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + # mode = ics.set_safe_boot_mode(self.device, True) + # self.assertIsNone(mode) + # mode = ics.set_safe_boot_mode(self.device, False) + # + # self.assertIsNone(mode) + print() + + + def test_dhcp_server(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + ics.start_dhcp_server(self.device, ics.NETID_HSCAN) # documentation is missing args usage + ics.stop_dhcp_server(self.device, ics.NETID_HSCAN) # documentation is missing args usage + + + def test_uart_get_baudrate(self): + from ics.structures.e_uart_port_t import e_uart_port_t + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + baudrate = ics.uart_get_baudrate(self.device, e_uart_port_t.eUART0, 2000) # example usage uses ics.uart_set_baudrate() instead of get + print(baudrate) + + # TypeError: meth_uart_get_baudrate() takes exactly 3 arguments (2 given) + # documentation only has 2 arguments + + def test_uart_read(self): + from ics.structures.e_uart_port_t import e_uart_port_t + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + data = ics.uart_read(self.device, e_uart_port_t.eUART0) + # Error: uart_read(): icsneoUartRead() Failed + print(f"Read {len(data)} bytes: {data}") + + + def test_uart_write(self): + from ics.structures.e_uart_port_t import e_uart_port_t + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + data = ics.uart_write(self.device, e_uart_port_t.eUART0, b'my uart data goes here') + print(f"Read {len(data)} bytes: {data}") + + # ics.ics.RuntimeError: Error: uart_write(): icsneoUartWrite() Failed + + def test_validate_hobject(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + validated = ics.validate_hobject(self.device) + + self.assertEqual(validated, 1) + + + def test_write_sdcard(self): + # device = self._get_device() + # device = ics.open_device(device) + # device = ics.find_devices([ics.NEODEVICE_FIRE3]) + # device = ics.open_device(device[0]) + byte_arr = bytearray(512) + ics.write_sdcard(self.device, 0, byte_arr) # no example usage in documentation + print() + + + +# class TestRADMoon2Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_RADMOON2 +# cls.device_settings_type = e_device_settings_type.DeviceRADMoon2SettingsType +# cls.num_devices = 2 +# print("DEBUG: Testing MOON2s...") + + +# class TestFire3Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_FIRE3 +# cls.device_settings_type = e_device_settings_type.DeviceFire3SettingsType +# cls.num_devices = 1 +# print("DEBUG: Testing FIRE3...") + +# class TestFire2Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_FIRE2 +# cls.device_settings_type = e_device_settings_type.DeviceFire2SettingsType +# cls.num_devices = 1 +# print("DEBUG: Testing FIRE2...") + + +# class TestValueCAN42Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_VCAN42 +# cls.device_settings_type = e_device_settings_type.DeviceVCAN412SettingsType +# cls.num_devices = 1 +# print("DEBUG: Testing VCAN42...") + +def suite(): + suite = unittest.TestSuite() + suite.addTest(TestSettings('test_base36enc')) + suite.addTest(TestSettings('test_disk_functions')) + suite.addTest(TestSettings('test_radio_message')) + suite.addTest(TestSettings('test_bus_voltage')) + suite.addTest(TestSettings('test_enable_doip_line')) + suite.addTest(TestSettings('test_enable_network_com')) + suite.addTest(TestSettings('test_generic_api')) + suite.addTest(TestSettings('test_backup_power')) + suite.addTest(TestSettings('test_active_vnet_channel')) + suite.addTest(TestSettings('test_get_all_chip_versions')) + suite.addTest(TestSettings('test_firmware_info')) + suite.addTest(TestSettings('test_get_gptp_status')) + suite.addTest(TestSettings('test_performance_parameters')) + suite.addTest(TestSettings('test_set_led')) + suite.addTest(TestSettings('test_get_set_rtc')) + suite.addTest(TestSettings('test_is_device_feature_supported')) + suite.addTest(TestSettings('test_read_jupiter_fw')) + suite.addTest(TestSettings('test_write_jupiter_fw')) + suite.addTest(TestSettings('test_request_enter_sleep_mode')) + suite.addTest(TestSettings('test_get_script_status')) + suite.addTest(TestSettings('test_backup_power_enabled')) + suite.addTest(TestSettings('test_set_bit_rate')) + suite.addTest(TestSettings('test_set_bit_rate_ex')) + suite.addTest(TestSettings('test_set_fd_bit_rate')) + suite.addTest(TestSettings('test_set_context')) + suite.addTest(TestSettings('test_set_reflash_callback')) + suite.addTest(TestSettings('test_safe_boot_mode')) + suite.addTest(TestSettings('test_dhcp_server')) + suite.addTest(TestSettings('test_uart_get_baudrate')) + suite.addTest(TestSettings('test_uart_read')) + suite.addTest(TestSettings('test_uart_write')) + suite.addTest(TestSettings('test_validate_hobject')) + suite.addTest(TestSettings('test_write_sdcard')) + return suite + +if __name__ == "__main__": + # unittest.main() + runner = unittest.TextTestRunner() + runner.run(TestSettings.suite()) diff --git a/tests/runner/test_base_t1_network.py b/tests/runner/test_base_t1_network.py new file mode 100644 index 000000000..d7e56c702 --- /dev/null +++ b/tests/runner/test_base_t1_network.py @@ -0,0 +1,99 @@ +from subprocess import PIPE, Popen +import unittest +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestCAN(unittest.TestCase): + @classmethod + def setUp(self): + self.devices = ics.find_devices([ics.NEODEVICE_RADMOON2]) + for device in self.devices: + ics.open_device(device) + ics.load_default_settings(device) + ics.close_device(device) + + @classmethod + def tearDown(self): + for device in self.devices: + ics.open_device(device) + ics.load_default_settings(device) + ics.close_device(device) + + def _set_link_mode_setting(self, device, link_mode: int): + ics.open_device(device) + base_settings = ics.get_device_settings(device) + base_settings.Settings.radmoon2.opEth1.ucConfigMode = ( + link_mode # 0 = auto, 1 = master, 2 = slave + ) + ics.set_device_settings(device, base_settings) + ics.close_device(device) + + def test_ethernet_moon2s(self): + dev1_nic_ip = "192.168.200.01" + dev2_nic_ip = "168.154.84.7" + loop_count = 100 + bidirectional_traffic = True + ethernet_exe_path = ( + r"C:\DevJonas\ethernet_test\Debug\ethernet_test.exe" + ) + + args = [ + ethernet_exe_path, + dev1_nic_ip, + dev2_nic_ip, + str(loop_count), + str(int(bidirectional_traffic)), + ] + + # First test in auto mode (default) + program = Popen( + " ".join(args), + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + ) + stdout_buffer, stderr_buffer = program.communicate() + self.assertEqual(len(stderr_buffer), 0) + self.assertFalse(bool(program.returncode)) + + # Test with master/slave mode + self._set_link_mode_setting(self.devices[0], 1) + self._set_link_mode_setting(self.devices[1], 2) + program = Popen( + " ".join(args), + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + ) + stdout_buffer, stderr_buffer = program.communicate() + self.assertEqual(len(stderr_buffer), 0) + self.assertFalse(bool(program.returncode)) + + # Test with slave/master mode + self._set_link_mode_setting(self.devices[0], 2) + self._set_link_mode_setting(self.devices[1], 1) + program = Popen( + " ".join(args), + stdin=PIPE, + stdout=PIPE, + stderr=PIPE, + ) + stdout_buffer, stderr_buffer = program.communicate() + self.assertEqual(len(stderr_buffer), 0) + self.assertFalse(bool(program.returncode)) + + +class TestT1Ethernet(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + # moon2 eth1 = T1, eth2 = T + pass + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_coremini.py b/tests/runner/test_coremini.py new file mode 100644 index 000000000..c65ee17ac --- /dev/null +++ b/tests/runner/test_coremini.py @@ -0,0 +1,174 @@ +# ruff: noqa: E501 +import time +import unittest +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestSettings(unittest.TestCase): + @classmethod + def setUpClass(cls): + pass + + def _get_device(self): + devices = ics.find_devices([self.device_type]) + self.assertEqual( + len(devices), + self.num_devices, + f"Failed to find correct number of devices of type {self.device_type}! Expected {self.num_devices}, got {len(devices)}.", + ) + return devices[0] + + def test_coremini_load_start_clear(self): + device = self._get_device() + device.open() + ics.coremini_clear(device, self.coremini_location) + time.sleep(self.coremini_wait) + self.assertFalse(ics.coremini_get_status(device)) + + ics.coremini_load(device, self.coremini_path, self.coremini_location) + ics.coremini_start(device, self.coremini_location) + self.assertTrue(ics.coremini_get_status(device)) + + ics.coremini_stop(device) + time.sleep(self.coremini_wait) + self.assertFalse(ics.coremini_get_status(device)) + + ics.coremini_start(device, self.coremini_location) + self.assertTrue(ics.coremini_get_status(device)) + time.sleep(2) + + ics.coremini_clear(device, self.coremini_location) + time.sleep(self.coremini_wait) + self.assertFalse(ics.coremini_get_status(device)) + + for error in ics.get_error_messages(device): + print("Coremini error: " + error) + + device.close() + + def test_coremini_fblock(self): + device = self._get_device() + device.open() + ics.coremini_load(device, self.coremini_path, self.coremini_location) + ics.coremini_start(device, self.coremini_location) + self.assertTrue( + ics.coremini_get_fblock_status(device, 0) + ) # coremini must be started otherwise fails + self.assertTrue(ics.coremini_get_fblock_status(device, 1)) + + ics.coremini_stop_fblock(device, 0) + self.assertFalse(ics.coremini_get_fblock_status(device, 0)) + ics.coremini_start_fblock(device, 0) + self.assertTrue(ics.coremini_get_fblock_status(device, 0)) + ics.coremini_stop_fblock(device, 1) + self.assertFalse(ics.coremini_get_fblock_status(device, 1)) + ics.coremini_start_fblock(device, 1) + self.assertTrue(ics.coremini_get_fblock_status(device, 1)) + + ics.coremini_clear(device, self.coremini_location) + time.sleep(self.coremini_wait) + failed = False + try: + ics.coremini_get_fblock_status(device, 0) + except: + failed = True + self.assertTrue(failed) + + for error in ics.get_error_messages(device): + print("Coremini error: " + error) + + device.close() + + def test_coremini_signals(self): + device = self._get_device() + device.open() + # using all on RGB LEDs coremini script to read signals + ics.coremini_load(device, self.coremini_path, self.coremini_location) + ics.coremini_start(device, self.coremini_location) + + self.assertEqual(ics.coremini_read_app_signal(device, 0), 255.0) + self.assertEqual(ics.coremini_read_app_signal(device, 1), 255.0) + self.assertEqual(ics.coremini_read_app_signal(device, 2), 255.0) + failed = False + try: + ics.coremini_read_app_signal(device, 3) + except: + failed = True + self.assertTrue(failed) + + ics.coremini_clear(device, self.coremini_location) + time.sleep(self.coremini_wait) + failed = False + try: + ics.coremini_read_app_signal(device, 0) + except: + failed = True + self.assertTrue(failed) + + # using all off RGB LEDs coremini script + ics.coremini_load(device, self.coremini_path_off, self.coremini_location) + ics.coremini_start(device, self.coremini_location) + + self.assertEqual(ics.coremini_read_app_signal(device, 0), 0.0) + self.assertEqual(ics.coremini_read_app_signal(device, 1), 0.0) + self.assertEqual(ics.coremini_read_app_signal(device, 2), 0.0) + failed = False + try: + ics.coremini_read_app_signal(device, 3) + except: + failed = True + self.assertTrue(failed) + + # write and read signals + ics.coremini_load(device, self.coremini_path, self.coremini_location) + ics.coremini_start(device, self.coremini_location) + ics.coremini_write_app_signal(device, 0, 100.0) + self.assertEqual(ics.coremini_read_app_signal(device, 0), 100.0) + ics.coremini_write_app_signal(device, 1, 0.0) + self.assertEqual(ics.coremini_read_app_signal(device, 1), 0.0) + + ics.coremini_clear(device, self.coremini_location) + device.close() + + +# class TestRADMoon2Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_RADMOON2 +# cls.num_devices = 2 +# print("DEBUG: Testing MOON2s...") + +# class TestFire3Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_FIRE3 +# cls.num_devices = 1 +# print("DEBUG: Testing FIRE3...") + +# class TestFire2Settings(BaseTests.TestSettings): +# @classmethod +# def setUpClass(cls): +# cls.device_type = ics.NEODEVICE_FIRE2 +# cls.num_devices = 1 +# print("DEBUG: Testing FIRE2...") + + +class TestValueCAN42Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_VCAN42 + cls.num_devices = 1 + cls.coremini_location = ics.SCRIPT_LOCATION_FLASH_MEM + cls.coremini_path = r"E:\Users\NCejka\Downloads\leds_white_v4_90.vs3cmb" + cls.coremini_path_off = r"E:\Users\NCejka\Downloads\leds_off_v4_90.vs3cmb" + cls.coremini_wait = 0.1 # sec to wait after stop/clear + print("DEBUG: Testing VCAN42...") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_dll_features.py b/tests/runner/test_dll_features.py new file mode 100644 index 000000000..d75054677 --- /dev/null +++ b/tests/runner/test_dll_features.py @@ -0,0 +1,67 @@ +import time +import unittest +import ics + +# from ics.py_neo_device_ex import PyNeoDeviceEx +from ics.structures.e_device_settings_type import e_device_settings_type + +unittest.TestLoader.sortTestMethodsUsing = None + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestSettings(unittest.TestCase): + @classmethod + def setUpClass(cls): + pass + + def _get_device(self): + devices = ics.find_devices([self.device_type]) + self.assertEqual( + len(devices), + self.num_devices, + f"Failed to find correct number of devices of type {self.device_type}! Expected {self.num_devices}, got {len(devices)}.", + ) + return devices[0] + + def test_dll_features(self): + self.assertGreater(len(ics.get_library_path()), 0) + self.assertGreater( + ics.get_dll_version(), 0 + ) # Documentation for this needs to be updated -- does not take any device arg + + def test_device_dll_features(self): + device = self._get_device() + device.open() + info = ics.get_dll_firmware_info( + device + ) # must be open! Only gives ief ver that dll has + info_comp = device.get_dll_firmware_info() + self.assertEqual(info.iAppMajor, info_comp.iAppMajor) + self.assertEqual(info.iAppMinor, info_comp.iAppMinor) + # device.close() + time.sleep(4) + # ics.get_device_status(device) + # ics.get_last_api_error(device) # TODO figure out best way to replicate api error to read + + def test_library_location(self): + device = self._get_device() + device = ics.open_device(device) + path = ics.get_library_path() + print(path) + ics.override_library_name(r"./icsneo40-v2") + path = ics.get_library_path() + print(path) + # need to put dlls in proper locations and write assertions + +class TestValueCAN42Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_VCAN42 + cls.num_devices = 1 + print("DEBUG: Testing VCAN42...") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_eth_networks.py b/tests/runner/test_eth_networks.py new file mode 100644 index 000000000..5b56c8dce --- /dev/null +++ b/tests/runner/test_eth_networks.py @@ -0,0 +1,182 @@ +import unittest +import time +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +error_flags = { + ics.SPY_STATUS_GLOBAL_ERR: "SPY_STATUS_GLOBAL_ERR", + ics.SPY_STATUS_CRC_ERROR: "SPY_STATUS_CRC_ERROR", + ics.SPY_STATUS_GLOBAL_ERR: "SPY_STATUS_GLOBAL_ERR", + ics.SPY_STATUS_CRC_ERROR: "SPY_STATUS_CRC_ERROR", + ics.SPY_STATUS_CAN_ERROR_PASSIVE: "SPY_STATUS_CAN_ERROR_PASSIVE", + ics.SPY_STATUS_HEADERCRC_ERROR: "SPY_STATUS_HEADERCRC_ERROR", + ics.SPY_STATUS_INCOMPLETE_FRAME: "SPY_STATUS_INCOMPLETE_FRAME", + ics.SPY_STATUS_LOST_ARBITRATION: "SPY_STATUS_LOST_ARBITRATION", + ics.SPY_STATUS_UNDEFINED_ERROR: "SPY_STATUS_UNDEFINED_ERROR", + ics.SPY_STATUS_CAN_BUS_OFF: "SPY_STATUS_CAN_BUS_OFF", + ics.SPY_STATUS_BUS_RECOVERED: "SPY_STATUS_BUS_RECOVERED", + ics.SPY_STATUS_BUS_SHORTED_PLUS: "SPY_STATUS_BUS_SHORTED_PLUS", + ics.SPY_STATUS_BUS_SHORTED_GND: "SPY_STATUS_BUS_SHORTED_GND", + ics.SPY_STATUS_CHECKSUM_ERROR: "SPY_STATUS_CHECKSUM_ERROR", + ics.SPY_STATUS_BAD_MESSAGE_BIT_TIME_ERROR: ( + "SPY_STATUS_BAD_MESSAGE_BIT_TIME_ERROR" + ), + ics.SPY_STATUS_TX_NOMATCH: "SPY_STATUS_TX_NOMATCH", + ics.SPY_STATUS_COMM_IN_OVERFLOW: "SPY_STATUS_COMM_IN_OVERFLOW", + ics.SPY_STATUS_EXPECTED_LEN_MISMATCH: "SPY_STATUS_EXPECTED_LEN_MISMATCH", + ics.SPY_STATUS_MSG_NO_MATCH: "SPY_STATUS_MSG_NO_MATCH", + ics.SPY_STATUS_BREAK: "SPY_STATUS_BREAK", + ics.SPY_STATUS_AVSI_REC_OVERFLOW: "SPY_STATUS_AVSI_REC_OVERFLOW", + ics.SPY_STATUS_BREAK: "SPY_STATUS_AVSI_REC_OVERFLOW", +} + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestCAN(unittest.TestCase): + @classmethod + def setUp(self): + self.fire2 = ics.find_devices([ics.NEODEVICE_FIRE2])[0] + self.fire3 = ics.find_devices([ics.NEODEVICE_FIRE3])[0] + self.devices = [self.fire2, self.fire3] + for device in self.devices: + ics.open_device(device) + ics.load_default_settings(device) + _, _ = ics.get_messages(device, False, 1) + + @classmethod + def tearDown(self): + for device in self.devices: + _, _ = ics.get_messages(device, False, 1) + ics.close_device(device) + + def _check_ethernet_link(self, toggle_com, device, netid): + # Enable network comm to push status message and clear previous messages + if toggle_com: + ics.enable_network_com(device, False) + _, _ = ics.get_messages(device, False, 0.5) + ics.enable_network_com(device, True) + # Search for neoVI ethernet status msg and check for link up + netid_lower = netid & 0xFF + netid_upper = (netid & 0xFF00) >> 8 + msgs, _ = ics.get_messages(device, False, 1) + for msg in msgs: + if msg.NetworkID == ics.NETID_DEVICE and msg.ArbIDOrHeader == 0x162: + if ( + msg.Data[0] == 1 + and msg.Data[3] == netid_lower + and msg.Data[4] == netid_upper + ): + return True + return False + + def _test_tx_rx(self, tx_dev, rx_dev, tx_msg, rx_netid): + # clear buffers + _, _ = ics.get_messages(tx_dev, False, 1) + _, _ = ics.get_messages(rx_dev, False, 1) + found = False + # Transmit message, We aren't guarenteed to receive ethernet on the other side so we "have" to transmit more + # than one message and hope at least one gets through. + for _ in range(20): + if found: + break + ics.transmit_messages(tx_dev, tx_msg) + time.sleep(0.1) + _, _ = ics.get_messages(tx_dev, False, 0) + # Process the messages + rx_msgs, errors = ics.get_messages(rx_dev, False, 2) + for rx_msg in rx_msgs: + # Fire2 eth1 <-> Fire3 eth3 + netid = (rx_msg.NetworkID2 << 8) | rx_msg.NetworkID + if netid == rx_netid: + if tx_msg.Protocol == rx_msg.Protocol: + if tx_msg.ExtraDataPtr == rx_msg.ExtraDataPtr[:-4]: + # We found the message, lets make sure there are no errors present. + if rx_msg.StatusBitField in error_flags.keys(): + raise Exception( + f"{rx_dev} rx msg error: {error_flags[rx_msg.StatusBitField]}" + ) + else: + found = True + break + + if not found: + raise Exception(f"Failed to find tx msg on {rx_dev}") + + def test_ethernet_fire2(self): + start_time = time.time() + toggle_com = True + while not self._check_ethernet_link( + toggle_com, self.fire2, self.fire2_netid + ): + toggle_com = False + if time.time() - start_time > 10.0: + raise TimeoutError( + f"Failed to establish link in 10sec on {self.fire2}" + ) + time.sleep(1) + + # Create SpyMessage for transmission + ics_mac_address = (0x00, 0xFC, 0x70, 0xFF, 0xDE, 0xAD) + ether_type = (0xFF, 0xFF) + payload_size = 64 + tx_msg = ics.SpyMessage() + tx_msg.NetworkID = self.fire2_netid & 0xFF + tx_msg.NetworkID2 = (self.fire2_netid >> 8) & 0xFF + tx_msg.Protocol = ics.SPY_PROTOCOL_ETHERNET + tx_msg.ExtraDataPtr = ( + (self.fire2_netid,) + + ics_mac_address + + ics_mac_address + + ether_type + + tuple([x & 0xFF for x in range(payload_size)]) + ) + + self._test_tx_rx(self.fire2, self.fire3, tx_msg, self.fire3_netid) + + def test_ethernet_fire3(self): + start_time = time.time() + toggle_com = True + while not self._check_ethernet_link( + toggle_com, self.fire3, self.fire3_netid + ): + toggle_com = False + if time.time() - start_time > 10.0: + raise TimeoutError( + f"Failed to establish link in 10sec on {self.fire3}" + ) + time.sleep(1) + + # Create SpyMessage for transmission + ics_mac_address = (0x00, 0xFC, 0x70, 0xFF, 0xDE, 0xAD) + ether_type = (0xFF, 0xFF) + payload_size = 64 + tx_msg = ics.SpyMessage() + tx_msg.NetworkID = self.fire3_netid & 0xFF + tx_msg.NetworkID2 = (self.fire3_netid >> 8) & 0xFF + tx_msg.Protocol = ics.SPY_PROTOCOL_ETHERNET + tx_msg.ExtraDataPtr = ( + (self.fire3_netid,) + + ics_mac_address + + ics_mac_address + + ether_type + + tuple([x & 0xFF for x in range(payload_size)]) + ) + + self._test_tx_rx(self.fire3, self.fire2, tx_msg, self.fire2_netid) + + +class TestEthernet(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + # fire3 ethernet2 <-> fire2 ethernet1 + # fire3 VEM IS NEEDED! + cls.fire3_netid = ics.NETID_ETHERNET3 + cls.fire2_netid = ics.NETID_ETHERNET + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_firmware_updating.py b/tests/runner/test_firmware_updating.py new file mode 100644 index 000000000..a82356d14 --- /dev/null +++ b/tests/runner/test_firmware_updating.py @@ -0,0 +1,157 @@ +import unittest +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +def message_callback(msg, success): + try: + # print("message_callback:", msg, success) + complete_msg = msg + ": " + str(success) + print(complete_msg) + # self.testUpdated.emit(self.current_test_name, str(self.parameters.sn_info.classic.tester), complete_msg) + except Exception as ex: + print(ex) + + +def reflash_callback(msg, progress): + try: + # print("reflash_callback:", msg, progress) + complete_msg = msg + " " + str(progress) + "%" + print(complete_msg) + print(msg, progress) + except Exception as ex: + print(ex) + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestSettings(unittest.TestCase): + def _get_device(self): + devices = ics.find_devices([self.device_type]) + self.assertEqual( + len(devices), + self.num_devices, + f"Failed to find correct number of devices of type {self.device_type}! Expected {self.num_devices}, got {len(devices)}.", + ) + return devices[0] + + def _test_update_firmware(self): + device = self._get_device() + device.open() + # get firmware version + self.assertEqual(device.SerialNumber, ics.get_serial_number(device)) + self.assertEqual(device.get_serial_number(), ics.get_serial_number(device)) + info = ics.get_hw_firmware_info( + device + ) # must be open! Gives iefs ver, pcb rev, bootloader rev, manuf date + # print(device.Name, device.serial_number) + # print(f"MCHIP IEF v{info.iAppMajor}.{info.iAppMinor}") + # print(f"MCHIP BL v{info.iBootLoaderVersionMajor}.{info.iBootLoaderVersionMinor}") + # print(f"Made {info.iManufactureMonth}/{info.iManufactureDay}/{info.iManufactureYear}") + self.assertEqual(device.FirmwareMajor, info.iAppMajor) + self.assertEqual(device.FirmwareMinor, info.iAppMinor) + pcbsn = ics.get_pcb_serial_number(device) + self.assertEqual(int(pcbsn[5]), info.iBoardRevMajor) + self.assertEqual(int(pcbsn[6], 36), info.iBoardRevMinor) + self.assertEqual(pcbsn, device.get_pcb_serial_number()) + + # Mass check ics vs device func attributes + info_comp = device.get_hw_firmware_info() + for attr in dir(info_comp): + if attr[0] == "_": + continue + self.assertEqual(getattr(info_comp, attr), getattr(info, attr)) + + device.close() + + # first flash old firmware + iefs = {ics.VCAN42_MCHIP_ID: self.old_firmware_path} + ics.set_reflash_callback(reflash_callback) + ics.flash_devices( + device, iefs, message_callback + ) # device must be closed?!?! + + # check IEF version and that update is needed + device.open() + self.assertEqual(1, ics.firmware_update_required(device)) + + # then force firmware update + ics.force_firmware_update( + device + ) # device needs to be open and will stay open! + + # check it again + self.assertEqual(0, ics.firmware_update_required(device)) + device.close() + + # update with new iefs + iefs = {ics.VCAN42_MCHIP_ID: self.new_firmware_path} + ics.flash_devices(device, iefs, message_callback) + + # check one last time + device.open() + self.assertEqual(0, ics.firmware_update_required(device)) + device.close() + + # ics.flash_accessory_firmware(device, data, index[, check_success]) + # ics.get_accessory_firmware_version() + + # ics.get_all_chip_versions(device, api_index, instance_index) + # ics.get_backup_power_enabled(device) + # ics.get_backup_power_ready(device) + # ics.get_bus_voltage(device, reserved) + # ics.get_device_status(device) + + # ics.get_last_api_error(device) + + # ics.get_performance_parameters(device) + # ics.get_rtc(device) + # ics.get_script_status() # Documentation needs updating to include "device" parameter + # ics.get_timestamp_for_msg(device, msg) + + def test_accessary_firmware(self): + device = self._get_device() + device.open() + ics.flash_accessory_firmware(device, data, index[, check_success]) + ics.get_accessory_firmware_version() + +class TestValueCAN42Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_VCAN42 + cls.chip_id = ics.VCAN42_MCHIP_ID + cls.num_devices = 1 + cls.new_firmware_path = r"C:\Users\JHarrison\Downloads\vcan42_mchip.ief" + cls.old_firmware_path = r"C:\Users\JHarrison\Downloads\vcan42_mchip_v4_90.ief" + print("DEBUG: Testing VCAN42...") + + def test_update_firmware(self): + self._test_update_firmware() + +class TestRADMoon2Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_RADMOON2 + cls.chip_id = ics.RADMOON2_ZL_MCHIP_ID + cls.num_devices = 2 + print("DEBUG: Testing MOON2s...") + + +class TestFire3Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_FIRE3 + cls.num_devices = 1 + print("DEBUG: Testing FIRE3...") + +class TestFire2Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_FIRE2 + cls.num_devices = 1 + print("DEBUG: Testing FIRE2...") + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_hscan_networks.py b/tests/runner/test_hscan_networks.py new file mode 100644 index 000000000..4ba69172f --- /dev/null +++ b/tests/runner/test_hscan_networks.py @@ -0,0 +1,196 @@ +# ruff: noqa: E501 +import datetime +import unittest +import time +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +def are_errors_present(msg): + """Helper function to detect if a message error occurred.""" + error_flags = 0 + error_flags |= ics.SPY_STATUS_GLOBAL_ERR + error_flags |= ics.SPY_STATUS_CRC_ERROR + error_flags |= ics.SPY_STATUS_CAN_ERROR_PASSIVE + error_flags |= ics.SPY_STATUS_HEADERCRC_ERROR + error_flags |= ics.SPY_STATUS_INCOMPLETE_FRAME + error_flags |= ics.SPY_STATUS_LOST_ARBITRATION + error_flags |= ics.SPY_STATUS_UNDEFINED_ERROR + error_flags |= ics.SPY_STATUS_CAN_BUS_OFF + error_flags |= ics.SPY_STATUS_BUS_RECOVERED + error_flags |= ics.SPY_STATUS_BUS_SHORTED_PLUS + error_flags |= ics.SPY_STATUS_BUS_SHORTED_GND + error_flags |= ics.SPY_STATUS_CHECKSUM_ERROR + error_flags |= ics.SPY_STATUS_BAD_MESSAGE_BIT_TIME_ERROR + error_flags |= ics.SPY_STATUS_TX_NOMATCH + error_flags |= ics.SPY_STATUS_COMM_IN_OVERFLOW + error_flags |= ics.SPY_STATUS_EXPECTED_LEN_MISMATCH + error_flags |= ics.SPY_STATUS_MSG_NO_MATCH + error_flags |= ics.SPY_STATUS_BREAK + error_flags |= ics.SPY_STATUS_AVSI_REC_OVERFLOW + error_flags |= ics.SPY_STATUS_BREAK + if (msg.StatusBitField & error_flags) != 0: + return True + return False + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestCAN(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.netid = None + + @classmethod + def setUp(self): + self.vcan42 = ics.find_devices([ics.NEODEVICE_VCAN42])[0] + self.fire2 = ics.find_devices([ics.NEODEVICE_FIRE2])[0] + self.fire3 = ics.find_devices([ics.NEODEVICE_FIRE3])[0] + self.devices = [self.vcan42, self.fire2, self.fire3] + for device in self.devices: + ics.open_device(device) + # ics.load_default_settings(device) + + @classmethod + def tearDown(self): + for device in self.devices: + ics.close_device(device) + del self.devices + + def _prepare_devices(self): + for device in self.devices: + # Clear any messages in the buffer + _, _ = ics.get_messages(device, False, 1) + _ = ics.get_error_messages( + device + ) # Documentation is wrong -- says it can take 3 args but only takes 1 + # may need more clearing of errors here + + def _tx_rx_devices(self, tx_device, rx_devices: list): + self._prepare_devices() + tx_msg = ics.SpyMessage() + tx_msg.ArbIDOrHeader = 0x01 + tx_msg.NetworkID = self.netid + tx_msg.Protocol = ics.SPY_PROTOCOL_CANFD + tx_msg.StatusBitField = ics.SPY_STATUS_CANFD + tx_msg.StatusBitField3 = ics.SPY_STATUS3_CANFD_BRS + tx_msg.ExtraDataPtr = tuple([x for x in range(64)]) + # tx HSCAN msg + ics.transmit_messages(tx_device, tx_msg) + # CAN ACK timeout in firmware is 200ms, so wait 300ms for ACK + time.sleep(1) + + + # rx HSCAN msg + for device in rx_devices: + # rx_messages, error_count = ics.get_messages(device, False, .01) + # time.sleep(0.3) + rx_messages, error_count = ics.get_messages(device, False, .01) + self.assertEqual(error_count, 0, str(device)) + # self.assertEqual( + # len(rx_messages), + # 1, + # f"Device {str(device)} didnt find 1 msg but {len(rx_messages)} msgs", + # ) + for rx_msg in rx_messages: + if rx_msg.NetworkID == tx_msg.NetworkID: + if rx_msg.ArbIDOrHeader == tx_msg.ArbIDOrHeader: + self.assertEqual(rx_msg.ExtraDataPtr, tx_msg.ExtraDataPtr) + self.assertFalse( + are_errors_present(rx_msg), + f"Device {str(device)} rx msg error: {hex(rx_msg.StatusBitField)}", + ) + datetime.datetime.fromtimestamp(rx_msg.TimeHardware) + self.assertGreater(rx_msg.TimeSystem, 0) + self.assertEqual(rx_msg.TimeStampHardwareID, 9) + self.assertEqual(rx_msg.TimeStampSystemID, 1) + + for device in self.devices: + self.assertFalse(ics.get_error_messages(device)) + + def test_vcan42_transmit(self): + self._tx_rx_devices(self.vcan42, self.devices[1:]) # rx on fire2 and fire3 + + def test_fire2_transmit(self): + self._tx_rx_devices( + self.fire2, [self.devices[0], self.devices[-1]] + ) # rx on vcan42 and fire3 + + def test_fire3_transmit(self): + self._tx_rx_devices(self.fire3, self.devices[:-1]) # rx on vcan42 and fire2 + + def test_spy_message(self): + msg = ics.SpyMessage() + with self.assertRaises(TypeError): + msg.ArbIDOrHeader = "a" + msg.NetworkID = 255 + with self.assertRaises(AttributeError): + msg.Data = [x for x in range(8)] + msg.Data = tuple([x for x in range(8)]) + self.assertTrue(msg.NumberBytesData == 8) + msg.Data = tuple([x for x in range(16)]) + # Data gets truncated and overflow goes into ExtraDataPtr but is it correct??? + self.assertTrue(msg.NumberBytesData == 16) + self.assertTrue(msg.Data == tuple([x for x in range(8)])) + msg.ExtraDataPtr = tuple([x for x in range(255)]) + self.assertTrue(len(msg.ExtraDataPtr) == 255) + msg.ExtraDataPtr = tuple([x for x in range(256)]) + self.assertIsNone(msg.ExtraDataPtr) + + msg.Data = () + msg.ExtraDataPtr = tuple([x for x in range(8)]) + self.assertTrue(msg.Data == msg.ExtraDataPtr) + self.assertTrue(msg.NumberBytesData == 8) + msg.Data = () + self.assertIsNone(msg.ExtraDataPtr) + + msg.ExtraDataPtr = tuple([x for x in range(16)]) + self.assertTrue(msg.Data == tuple([x for x in range(8)])) + self.assertTrue(msg.Data == msg.ExtraDataPtr[:8]) + self.assertTrue(msg.NumberBytesData == 16) + + msg.ExtraDataPtr = () + self.assertTrue(msg.Data == ()) + + msg.ExtraDataPtr = tuple([x for x in range(16)]) + self.assertTrue( + msg.Data == msg.ExtraDataPtr[:8] + ) # This looks like an error... but its fixed now??? + + +class TestHSCAN1(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_HSCAN + + +class TestHSCAN2(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_HSCAN2 + +class TestHSCAN3(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_HSCAN3 + +class TestMSCAN(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_MSCAN + +class TestLSFTCAN(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_LSFTCAN + +class TestSWCAN(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_SWCAN + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_iso15765_networks.py b/tests/runner/test_iso15765_networks.py new file mode 100644 index 000000000..cb427f588 --- /dev/null +++ b/tests/runner/test_iso15765_networks.py @@ -0,0 +1,203 @@ +# ruff: noqa: E501 +import ctypes +import datetime +import unittest +import time +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +def print_message(msg): + if isinstance(msg, ics.st_cm_iso157652_rx_message.st_cm_iso157652_rx_message): + print("\tArbID: {}\tData: {}".format(hex(msg.id), [hex(x) for x in msg.Data])) + return + print("\t" + str(type(msg))) + for attribute in dir(msg): + if attribute.startswith("_"): + continue + length = len(attribute) + if attribute == "data": + print("\t\t{}:{}{}".format(attribute, " " * (30 - length), msg.data[: msg.num_bytes])) + else: + value = getattr(msg, attribute) + try: + value = hex(value) + except: + pass + print("\t\t{}:{}{}".format(attribute, " " * (30 - length), value)) + print() + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestISO15765(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.netid = None + + @classmethod + def setUp(self): + self.vcan42 = ics.find_devices([ics.NEODEVICE_VCAN42])[0] + self.fire2 = ics.find_devices([ics.NEODEVICE_FIRE2])[0] + self.fire3 = ics.find_devices([ics.NEODEVICE_FIRE3])[0] + self.devices = [self.vcan42, self.fire2, self.fire3] + for device in self.devices: + ics.open_device(device) + ics.load_default_settings(device) + + @classmethod + def tearDown(self): + for device in self.devices: + ics.close_device(device) + del self.devices + + def _prepare_devices(self): + for device in self.devices: + # Clear any messages in the buffer + _, _ = ics.get_messages(device, False, 1) + _ = ics.get_error_messages( + device + ) # Documentation is wrong -- says it can take 3 args but only takes 1 + # may need more clearing of errors here + + def _tx_rx_devices(self, tx_device, rx_devices: list): + self._prepare_devices() + + netid = ics.NETID_HSCAN + + tx_device = tx_device + rx_device = rx_devices[0] + + ics.iso15765_enable_networks(tx_device, netid) + ics.iso15765_enable_networks(rx_device, netid) + self.setup_rx_iso15765_msg(rx_device) + self.transmit_iso15765_msg(tx_device) + self.get_iso15765_msgs(rx_device) + + ics.iso15765_disable_networks(tx_device) + ics.iso15765_disable_networks(rx_device) + + # tx_msg = ics.SpyMessage() + # tx_msg.ArbIDOrHeader = 0x01 + # tx_msg.NetworkID = self.netid + # tx_msg.Protocol = ics.SPY_PROTOCOL_CANFD + # tx_msg.StatusBitField = ics.SPY_STATUS_CANFD + # tx_msg.StatusBitField3 = ics.SPY_STATUS3_CANFD_BRS + # tx_msg.ExtraDataPtr = tuple([x for x in range(64)]) + # # tx HSCAN msg + # ics.transmit_messages(tx_device, tx_msg) + # # CAN ACK timeout in firmware is 200ms, so wait 300ms for ACK + # time.sleep(0.3) + + # # rx HSCAN msg + # for device in rx_devices: + # rx_messages, error_count = ics.get_messages(device, False, 1) + # self.assertEqual(error_count, 0, str(device)) + # self.assertEqual( + # len(rx_messages), + # 1, + # f"Device {str(device)} didnt find 1 msg but {len(rx_messages)} msgs", + # ) + # for rx_msg in rx_messages: + # if rx_msg.NetworkID == tx_msg.NetworkID: + # if rx_msg.ArbIDOrHeader == tx_msg.ArbIDOrHeader: + # self.assertEqual(rx_msg.ExtraDataPtr, tx_msg.ExtraDataPtr) + # self.assertFalse( + # are_errors_present(rx_msg), + # f"Device {str(device)} rx msg error: {hex(rx_msg.StatusBitField)}", + # ) + # datetime.datetime.fromtimestamp(rx_msg.TimeHardware) + # self.assertGreater(rx_msg.TimeSystem, 0) + # self.assertEqual(rx_msg.TimeStampHardwareID, 9) + # self.assertEqual(rx_msg.TimeStampSystemID, 1) + + for device in self.devices: + self.assertFalse(ics.get_error_messages(device)) + + def test_vcan42_transmit(self): + self._tx_rx_devices(self.vcan42, self.devices[1:]) # rx on fire2 and fire3 + + def test_fire2_transmit(self): + self._tx_rx_devices( + self.fire2, [self.devices[0], self.devices[-1]] + ) # rx on vcan42 and fire3 + + def test_fire3_transmit(self): + self._tx_rx_devices(self.fire3, self.devices[:-1]) # rx on vcan42 and fire2 + + def setup_rx_iso15765_msg(self, device, netid=ics.NETID_HSCAN, is_canfd=False): + msg = ics.st_cm_iso157652_rx_message.st_cm_iso157652_rx_message() + + msg.id = 0x7E0 + msg.vs_netid = netid + msg.padding = 0xAA + msg.id_mask = 0xFFF + msg.fc_id = 0x7E8 + msg.blockSize = 100 + msg.stMin = 10 + msg.cf_timeout = 1000 + # enableFlowControlTransmission = 1 + msg.enableFlowControlTransmission = 1 + # paddingEnable + msg.paddingEnable = 1 + # CANFD: Enable + BRS + if is_canfd: + msg.iscanFD = 1 + msg.isBRSEnabled = 1 + # print_message(msg) + print("Setting up iso15765 message on {}...".format(device)) + ics.iso15765_receive_message(device, netid, msg) + print("Setup iso15765 message on {}.".format(device)) + + def transmit_iso15765_msg(self, device, netid=ics.NETID_HSCAN, is_canfd=False): + number_of_bytes = 64 + msg = ics.st_cm_iso157652_tx_message.st_cm_iso157652_tx_message() + msg.id = 0x7E0 + msg.vs_netid = netid + msg.num_bytes = number_of_bytes + msg.padding = 0xAA + # Flow Control + msg.fc_id = 0x7E8 + msg.fc_id_mask = 0xFFF + msg.flowControlExtendedAddress = 0xFE + msg.fs_timeout = 0x10 # ms + msg.fs_wait = 0x3000 # ms + msg.blockSize = 0 + msg.stMin = 0 + # paddingEnable + msg.paddingEnable = 1 + # CANFD: Enable + BRS + if is_canfd: + msg.iscanFD = 1 + msg.isBRSEnabled = 1 + # tx_dl + msg.tx_dl = 1 + # Data + my_data = [x for x in range(number_of_bytes)] + msg.data = (ctypes.c_ubyte * len(msg.data))(*my_data) + + # Transmit the message + print("Transmitting iso15765 message on {}...".format(device)) + ics.iso15765_transmit_message(device, netid, msg, 3000) + time.sleep(0.5) + print_message(msg) + print("Transmitted iso15765 message on {}.".format(device)) + + def get_iso15765_msgs(self, device): + msgs, error_count = ics.get_messages(device, False, 1) + print("Received {} messages with {} errors.".format(len(msgs), error_count)) + for i, m in enumerate(msgs): + print("Message #{}\t".format(i + 1), end="") + print_message(m) + + +class TestHSCAN1(BaseTests.TestISO15765): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_HSCAN + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_lin_networks.py b/tests/runner/test_lin_networks.py new file mode 100644 index 000000000..bcb817404 --- /dev/null +++ b/tests/runner/test_lin_networks.py @@ -0,0 +1,116 @@ +# ruff: noqa: E501 +import unittest +import time +import ics + +unittest.TestLoader.sortTestMethodsUsing = None + + +class BaseTests: + """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" + + class TestCAN(unittest.TestCase): + @classmethod + def setUpClass(cls): + cls.netid = None + + @classmethod + def setUp(self): + self.fire2 = ics.find_devices([ics.NEODEVICE_FIRE2])[0] + self.fire3 = ics.find_devices([ics.NEODEVICE_FIRE3])[0] + self.devices = [self.fire2, self.fire3] + for device in self.devices: + ics.open_device(device) + ics.load_default_settings(device) + + @classmethod + def tearDown(self): + for device in self.devices: + ics.close_device(device) + del self.devices + + def _prepare_devices(self): + for device in self.devices: + # Clear any messages in the buffer + _, _ = ics.get_messages(device, False, 1) + _ = ics.get_error_messages( + device + ) # Documentation is wrong -- says it can take 3 args but only takes 1 + # may need more clearing of errors here + + def _tx_rx_lin_devices(self, master_dev, slave_dev): + self._prepare_devices() + + master_msg = ics.SpyMessageJ1850() + master_msg.Header = (0xC1,) + master_msg.NetworkID = ics.NETID_LIN + master_msg.Data = () + master_msg.StatusBitField = ics.SPY_STATUS_LIN_MASTER + + slave_msg = ics.SpyMessageJ1850() + slave_msg.Header = (0xC1,) + slave_msg.NetworkID = self.netid + slave_msg.Data = tuple([x for x in range(8)]) + slave_msg.Protocol = ics.SPY_PROTOCOL_LIN + checksum = 0 + for byte in slave_msg.Data + slave_msg.Header[1:3]: + checksum += byte + if checksum > 255: + checksum -= 255 + slave_msg.Data += ((~checksum & 0xFF),) + + # transmit slave msg first + ics.transmit_messages(slave_dev, slave_msg) + time.sleep(0.5) + + # transmit master msg + ics.transmit_messages(master_dev, master_msg) + time.sleep(0.5) + + # find msg + rx_msgs, errors = ics.get_messages(slave_dev, False, 1) + self.assertFalse(errors) + # Should only see 1 msg but sometimes we get 2 + self.assertTrue(len(rx_msgs) == 1 or len(rx_msgs) == 2) + for msg in rx_msgs: + if msg.NetworkID == master_msg.NetworkID: + if msg.ArbIDOrHeader == master_msg.Header: + if msg.Data == master_msg.Data: + break + + for device in self.devices: + self.assertFalse(ics.get_error_messages(device)) + + def test_fire2_master_tx(self): + self._tx_rx_lin_devices(self.fire2, self.fire3) + + def test_fire3_master_tx(self): + self._tx_rx_lin_devices(self.fire3, self.fire2) + + +class TestLIN1(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_LIN + + +class TestLIN2(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_LIN2 + + +class TestLIN3(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_LIN3 + + +class TestLIN4(BaseTests.TestCAN): + @classmethod + def setUpClass(cls): + cls.netid = ics.NETID_LIN4 + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/runner/test_misc_io.py b/tests/runner/test_misc_io.py new file mode 100644 index 000000000..63fea9a9a --- /dev/null +++ b/tests/runner/test_misc_io.py @@ -0,0 +1,38 @@ +def testMISC(self): + # dev1 = output, dev2 = input + def _test_misc(self, state): + arbs = (0x203, 0x204, 0x205, 0x206, 0x201, 0x202) + for arb in arbs: + message = ics.SpyMessage() + message.ArbIDOrHeader = arb + message.NetworkID = ics.NETID_DEVICE + message.Data = (state,) + self.assertEqual(ics.transmit_messages(self.dev1, message), None) + # Wait for the report message to get sent + time.sleep(0.2) + self.assertFalse(ics.get_error_messages(self.dev1)) + self.assertFalse(ics.get_error_messages(self.dev2)) + messages, error_count = ics.get_messages(self.dev2, False, 0.5) + self.assertEqual(error_count, 0) + self.assertNotEqual(len(messages), 0) + self.assertEqual(messages[-1].ArbIDOrHeader, 0x100) + for i, arb in enumerate(arbs): + # print(messages[-1].Data, messages[-1].Data[1]>>i & 1) + self.assertEqual(messages[-1].Data[1] >> i & 1, state, "MISC%d PIN state is wrong" % (arb - 0x200)) + self.assertFalse(ics.get_error_messages(self.dev1)) + self.assertFalse(ics.get_error_messages(self.dev2)) + + # Change the device settings + # dev1 = output, dev2 = input + settings = ics.get_device_settings(self.dev1) + settings.misc_io_initial_ddr = 0x3F + settings.misc_io_initial_latch = 0x3F + ics.set_device_settings(self.dev1, settings) + settings = ics.get_device_settings(self.dev2) + settings.misc_io_initial_ddr = 0x0 + settings.misc_io_initial_latch = 0x0 + settings.misc_io_on_report_events = 0x7F + settings.misc_io_report_period = 100 + ics.set_device_settings(self.dev2, settings) + _test_misc(self, 1) + _test_misc(self, 0) \ No newline at end of file diff --git a/tests/runner/test_network.py b/tests/runner/test_network.py deleted file mode 100644 index 758d9de26..000000000 --- a/tests/runner/test_network.py +++ /dev/null @@ -1,118 +0,0 @@ -import unittest -import time -import ics -from ics.py_neo_device_ex import PyNeoDeviceEx -from ics.structures.e_device_settings_type import e_device_settings_type - -unittest.TestLoader.sortTestMethodsUsing = None - - -def are_errors_present(msg): - """Helper function to detect if a message error occurred.""" - error_flags = 0 - error_flags |= ics.SPY_STATUS_GLOBAL_ERR - error_flags |= ics.SPY_STATUS_CRC_ERROR - error_flags |= ics.SPY_STATUS_CAN_ERROR_PASSIVE - error_flags |= ics.SPY_STATUS_HEADERCRC_ERROR - error_flags |= ics.SPY_STATUS_INCOMPLETE_FRAME - error_flags |= ics.SPY_STATUS_LOST_ARBITRATION - error_flags |= ics.SPY_STATUS_UNDEFINED_ERROR - error_flags |= ics.SPY_STATUS_CAN_BUS_OFF - error_flags |= ics.SPY_STATUS_BUS_RECOVERED - error_flags |= ics.SPY_STATUS_BUS_SHORTED_PLUS - error_flags |= ics.SPY_STATUS_BUS_SHORTED_GND - error_flags |= ics.SPY_STATUS_CHECKSUM_ERROR - error_flags |= ics.SPY_STATUS_BAD_MESSAGE_BIT_TIME_ERROR - error_flags |= ics.SPY_STATUS_TX_NOMATCH - error_flags |= ics.SPY_STATUS_COMM_IN_OVERFLOW - error_flags |= ics.SPY_STATUS_EXPECTED_LEN_MISMATCH - error_flags |= ics.SPY_STATUS_MSG_NO_MATCH - error_flags |= ics.SPY_STATUS_BREAK - error_flags |= ics.SPY_STATUS_AVSI_REC_OVERFLOW - error_flags |= ics.SPY_STATUS_BREAK - if (msg.StatusBitField & error_flags) != 0: - return True - return False - - -class BaseTests: - """Base classes. These are isolated and won't be run/discovered. Used for inheritance""" - - class TestCAN(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.netid = None - - @classmethod - def setUp(self): - self.devices = ics.find_devices() - for device in self.devices: - device.open() - device.load_default_settings() - - @classmethod - def tearDown(self): - for device in self.devices: - device.close() - - def test_get_messages(self): - for device in self.devices: - messages, error_count = device.get_messages() - - def test_transmit(self): - data = tuple([x for x in range(64)]) - tx_msg = ics.SpyMessage() - tx_msg.ArbIDOrHeader = 0x01 - tx_msg.NetworkID = self.netid - tx_msg.Protocol = ics.SPY_PROTOCOL_CANFD - tx_msg.StatusBitField = ics.SPY_STATUS_CANFD | ics.SPY_STATUS_NETWORK_MESSAGE_TYPE - tx_msg.StatusBitField3 = ics.SPY_STATUS3_CANFD_BRS | ics.SPY_STATUS3_CANFD_FDF - tx_msg.ExtraDataPtr = data - for device in self.devices: - # Clear any messages in the buffer - _, __ = device.get_messages() - device.transmit_messages(tx_msg) - # CAN ACK timeout in firmware is 200ms, so wait 300ms for ACK - time.sleep(0.3) - start = time.time() - messages, error_count = device.get_messages(False, 1) - elapsed = time.time() - start - print(f"Elapsed time rx: {elapsed:.4f}s") - self.assertEqual(error_count, 0, str(device)) - self.assertTrue(len(messages) > 0, str(device)) - # Find the transmit message - tx_messages = [] - for message in messages: - if message.StatusBitField & ics.SPY_STATUS_TX_MSG == ics.SPY_STATUS_TX_MSG: - tx_messages.append(message) - self.assertEqual(len(tx_messages), 1, str(device)) - for message in tx_messages: - # We should only have one transmit message in the buffer - self.assertEqual(tx_msg.ArbIDOrHeader, message.ArbIDOrHeader, str(device)) - self.assertEqual(tx_msg.NetworkID, message.NetworkID, str(device)) - self.assertEqual(tx_msg.ExtraDataPtr, message.ExtraDataPtr, str(device)) - self.assertFalse(are_errors_present(message), f"{str(device)} {hex(message.StatusBitField)}") - self.assertEqual( - tx_msg.StatusBitField | ics.SPY_STATUS_TX_MSG, - message.StatusBitField, - f"{str(device)} {hex(tx_msg.StatusBitField| ics.SPY_STATUS_TX_MSG)} {hex(message.StatusBitField)}", - ) - self.assertEqual( - tx_msg.StatusBitField3, message.StatusBitField3, f"{str(device)} {hex(message.StatusBitField3)}" - ) - - -class TestHSCAN1(BaseTests.TestCAN): - @classmethod - def setUpClass(cls): - cls.netid = ics.NETID_HSCAN - - -class TestHSCAN2(BaseTests.TestCAN): - @classmethod - def setUpClass(cls): - cls.netid = ics.NETID_HSCAN2 - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/runner/test_open_close.py b/tests/runner/test_open_close.py index 6bd0f8dd1..829375ddb 100644 --- a/tests/runner/test_open_close.py +++ b/tests/runner/test_open_close.py @@ -1,5 +1,7 @@ +# ruff: noqa: E501 import unittest import ics +import time unittest.TestLoader.sortTestMethodsUsing = None @@ -7,7 +9,7 @@ class TestOpenClose(unittest.TestCase): @classmethod def setUpClass(self): - self.expected_dev_count = 3 + self.expected_dev_count = 5 self.devices = ics.find_devices() @classmethod @@ -22,93 +24,195 @@ def tearDownClass(self): del self.devices def _check_devices(self): + devices = ics.find_devices() self.assertEqual( - len(self.devices), + len(devices), self.expected_dev_count, - f"Expected {self.expected_dev_count}, found {len(self.devices)} ({self.devices})...", + f"Device check expected {self.expected_dev_count} devices, found {len(self.devices)}: {self.devices}...", ) + def test_find_device_filters(self): + # Weird error here where ics.find_devices([...]) with all device types crashes python and going one by one sometimes fixes it + ics.find_devices([ics.NEODEVICE_FIRE2]) + ics.find_devices([ics.NEODEVICE_FIRE2, ics.NEODEVICE_FIRE3]) + ics.find_devices([ics.NEODEVICE_FIRE2, ics.NEODEVICE_FIRE3, ics.NEODEVICE_VCAN42]) + ics.find_devices([ics.NEODEVICE_FIRE2, ics.NEODEVICE_FIRE3, ics.NEODEVICE_VCAN42, ics.NEODEVICE_RADMOON2]) + # assigning output to variable does help or hurt too + devices = ics.find_devices([ics.NEODEVICE_FIRE2, ics.NEODEVICE_FIRE3, ics.NEODEVICE_VCAN42, ics.NEODEVICE_RADMOON2]) + + def test_find_moon2s(self): + # self._check_devices() + devices = ics.find_devices([ics.NEODEVICE_RADMOON2]) + self.assertTrue(len(devices) == 2) + self.assertEqual(devices[0].DeviceType, ics.NEODEVICE_RADMOON2) + self.assertEqual(devices[1].DeviceType, ics.NEODEVICE_RADMOON2) + def test_find_fire3(self): - self._check_devices() + # self._check_devices() devices = ics.find_devices([ics.NEODEVICE_FIRE3]) self.assertTrue(len(devices) == 1) self.assertEqual(devices[0].DeviceType, ics.NEODEVICE_FIRE3) def test_find_fire2(self): - self._check_devices() + # self._check_devices() devices = ics.find_devices([ics.NEODEVICE_FIRE2]) self.assertTrue(len(devices) == 1) self.assertEqual(devices[0].DeviceType, ics.NEODEVICE_FIRE2) def test_find_vcan42(self): - self._check_devices() + # self._check_devices() devices = ics.find_devices([ics.NEODEVICE_VCAN42]) self.assertTrue(len(devices) == 1) self.assertEqual(devices[0].DeviceType, ics.NEODEVICE_VCAN42) def test_find_fire2_and_vcan42(self): - self._check_devices() + # self._check_devices() devices = ics.find_devices([ics.NEODEVICE_FIRE2, ics.NEODEVICE_VCAN42]) self.assertTrue(len(devices) == 2) + def test_find_fire3_and_moon2(self): + # self._check_devices() + devices = ics.find_devices([ics.NEODEVICE_FIRE3, ics.NEODEVICE_RADMOON2]) + self.assertTrue(len(devices) == 3) + def test_open_close(self): - self._check_devices() - for device in self.devices: - self.assertEqual(device.NumberOfClients, 0) - self.assertEqual(device.MaxAllowedClients, 1) - d = ics.open_device(device) + # self._check_devices() + for dev in self.devices: + if dev.serial_number != ics.find_devices([dev.DeviceType])[0].serial_number: + continue # skip 2nd moon2 + self.assertEqual( + ics.find_devices([dev.DeviceType])[0].NumberOfClients, + 0, + f"Device {dev} not at 0 NumberOfClients before opening", + ) + self.assertEqual(dev.MaxAllowedClients, 1) + d = ics.open_device(dev) try: - self.assertEqual(device, d) - self.assertEqual(device.NumberOfClients, 1) - self.assertEqual(device.MaxAllowedClients, 1) - - self.assertEqual(d.NumberOfClients, 1) + self.assertEqual(dev, d) + self.assertEqual( + ics.find_devices([dev.DeviceType])[0].NumberOfClients, + 1, + f"Device {dev} failed to increment NumberOfClients after opening", + ) # must search again to see number of clients actually increment + self.assertEqual(dev.MaxAllowedClients, 1) + + self.assertEqual( + ics.find_devices([d.DeviceType])[0].NumberOfClients, + 1, + f"Device {d} failed to increment NumberOfClients after opening", + ) self.assertEqual(d.MaxAllowedClients, 1) finally: - self.assertEqual(device.NumberOfClients, 1) - self.assertEqual(d.NumberOfClients, 1) ics.close_device(d) - self.assertEqual(device.NumberOfClients, 0) - self.assertEqual(d.NumberOfClients, 0) + self.assertEqual( + ics.find_devices([dev.DeviceType])[0].NumberOfClients, + 0, + f"Device {dev} failed to decrement NumberOfClients after opening", + ) + self.assertEqual( + ics.find_devices([d.DeviceType])[0].NumberOfClients, + 0, + f"Device {d} failed to decrement NumberOfClients after opening", + ) + + # Now try with NeoDevice functions + try: + dev.open() + self.assertEqual( + ics.find_devices([dev.DeviceType])[0].NumberOfClients, + 1, + f"Device {dev} failed to increment NumberOfClients after opening", + ) + finally: + dev.close() + self.assertEqual( + ics.find_devices([dev.DeviceType])[0].NumberOfClients, + 0, + f"Device {dev} failed to decrement NumberOfClients after opening", + ) def test_open_close_by_serial(self): - # Open by serial number - for device in self.devices: - d = ics.open_device(device.SerialNumber) - self.assertEqual(d.SerialNumber, device.SerialNumber) + # self._check_devices() + for dev in self.devices: + d = ics.open_device(dev.SerialNumber) + self.assertEqual(d.SerialNumber, dev.SerialNumber) ics.close_device(d) def test_open_close_first_found(self): - # Open by first found + # self._check_devices() first_devices = [] - for x, device in enumerate(self.devices): + for _ in range(self.expected_dev_count): try: - self.assertEqual(device.NumberOfClients, 0, f"{device}") d = ics.open_device() first_devices.append(d) - self.assertEqual(d.NumberOfClients, 1, f"{device}") except ics.RuntimeError as ex: - raise RuntimeError(f"Failed to open {device}... Iteration {len(first_devices)} ({ex})") - self.assertEqual(len(self.devices), len(first_devices)) + raise RuntimeError( + f"Failed to open first found device on iteration {len(first_devices)}: {ex}" + ) + self.assertEqual(self.expected_dev_count, len(first_devices)) # Close by API for device in first_devices: - self.assertEqual(device.NumberOfClients, 1, f"{device}") ics.close_device(device) - self.assertEqual(device.NumberOfClients, 0, f"{device}") def test_open_close_10_times(self): - for device in self.devices: + # self._check_devices() + for dev in self.devices: for x in range(10): try: - self.assertEqual(device.NumberOfClients, 0) - ics.open_device(device) - self.assertEqual(device.NumberOfClients, 1) - error_count = ics.close_device(device) - self.assertEqual(device.NumberOfClients, 0) - self.assertEqual(error_count, 0, "Error count was not 0 on {device} iteration {x}...") + ics.open_device(dev) + error_count = ics.close_device(dev) + self.assertEqual( + error_count, + 0, + f"Error count was not 0 on {dev} iteration {x}...", + ) except Exception as ex: - print(f"Failed at iteration {x} {device}: {ex}...") + print(f"Failed at iteration {x} {dev}: {ex}...") raise ex + # Now try with NeoDevice functions + try: + dev.open() + error_count = dev.close() + self.assertEqual( + error_count, + 0, + f"Error count was not 0 on {dev} iteration {x}...", + ) + except Exception as ex: + print(f"Failed at iteration {x} {dev}: {ex}...") + raise ex + + def test_auto_close(self): + # self._check_devices() + devices = ics.find_devices([ics.NEODEVICE_FIRE2,ics.NEODEVICE_FIRE3,ics.NEODEVICE_VCAN42,ics.NEODEVICE_RADMOON2]) + for dev in devices: + ics.open_device(dev) + # time.sleep(5) + del devices + devices = ics.find_devices( + [ + ics.NEODEVICE_FIRE2, + ics.NEODEVICE_FIRE3, + ics.NEODEVICE_VCAN42, + ics.NEODEVICE_RADMOON2, + ] + ) + for dev in devices: + ics.open_device(dev) + ics.close_device(dev) + del devices + + def test_can_only_open_once(self): + # self._check_devices() + for dev in self.devices: + ics.open_device(dev) + with self.assertRaises(SystemError): + ics.open_device(dev) + with self.assertRaises(SystemError): + dev.open() + ics.close_device(dev) + + if __name__ == "__main__": unittest.main() diff --git a/tests/runner/test_settings.py b/tests/runner/test_settings.py index 5ded46cb0..dc7918ca5 100644 --- a/tests/runner/test_settings.py +++ b/tests/runner/test_settings.py @@ -1,6 +1,8 @@ +import time import unittest import ics -from ics.py_neo_device_ex import PyNeoDeviceEx + +# from ics.py_neo_device_ex import PyNeoDeviceEx from ics.structures.e_device_settings_type import e_device_settings_type unittest.TestLoader.sortTestMethodsUsing = None @@ -15,49 +17,121 @@ def setUpClass(cls): cls.device_type = None cls.device_settings_type = None - def _get_device(self) -> PyNeoDeviceEx: + def _get_device(self): devices = ics.find_devices([self.device_type]) self.assertEqual( len(devices), - 1, - f"Failed to find any device types of {self.device_type}! Expected 1, got {len(devices)}.", + self.num_devices, + f"Failed to find correct number of devices of type {self.device_type}! Expected {self.num_devices}, got {len(devices)}.", ) return devices[0] + def _get_device_specific_settings(self, base_settings): + setting_map = { + e_device_settings_type.DeviceFire3SettingsType: "fire3", + e_device_settings_type.DeviceFire2SettingsType: "cyan", + e_device_settings_type.DeviceVCAN412SettingsType: "vcan4_12", + e_device_settings_type.DeviceRADMoon2SettingsType: "radmoon2", + } + return getattr( + base_settings.Settings, setting_map[self.device_settings_type] + ) + def test_load_defaults(self): device = self._get_device() - device.open() + ics.open_device(device) try: ics.load_default_settings(device) finally: - device.close() + ics.close_device(device) def test_get_set_settings(self): + sleep_time = 0.1 # sec device = self._get_device() - device.open() + ics.open_device(device) try: + # load default settings and verify type ics.load_default_settings(device) - settings = ics.get_device_settings(device) - self.assertEqual(settings.DeviceSettingType, self.device_settings_type) - ics.set_device_settings(device, settings) - _ = ics.get_device_settings(device) - # TODO: compare both settings + time.sleep(sleep_time) + base_settings = ics.get_device_settings(device) + self.assertEqual( + base_settings.DeviceSettingType, + self.device_settings_type, + f"Wrong settings type on {device}", + ) + # change HSCAN1 term setting and verify (other stuff if Moon2) + device_settings = self._get_device_specific_settings(base_settings) + if self.device_type != 5: + device_settings.canfd1.FDBaudrate = ( + ics.BPS1000 + ) # change CAN1 baudrate + else: + device_settings.opEth1.link_spd = 0x1 # change ETH1 speed + ics.set_device_settings(device, base_settings) + time.sleep(sleep_time) + verify_new_settings = self._get_device_specific_settings( + ics.get_device_settings(device) + ) + if self.device_type != 5: + self.assertEqual( + verify_new_settings.canfd1.FDBaudrate, + ics.BPS1000, + f"Failed to set settings on {device}", + ) + else: + self.assertEqual( + verify_new_settings.opEth1.link_spd, + 0x1, + f"Failed to set settings on {device}", + ) + # verify setting default + ics.load_default_settings(device) + time.sleep(sleep_time) + verify_default_settings = self._get_device_specific_settings( + ics.get_device_settings(device) + ) + if self.device_type != 5: + self.assertEqual( + verify_default_settings.canfd1.FDBaudrate, + ics.BPS2000, + f"Failed to load default settings on {device}", + ) + else: + self.assertEqual( + verify_default_settings.opEth1.link_spd, + 0x2, + f"Failed to load default settings on {device}", + ) finally: - device.close() + ics.close_device(device) +class TestRADMoon2Settings(BaseTests.TestSettings): + @classmethod + def setUpClass(cls): + cls.device_type = ics.NEODEVICE_RADMOON2 + cls.device_settings_type = e_device_settings_type.DeviceRADMoon2SettingsType + cls.num_devices = 2 + print("DEBUG: Testing MOON2s...") + + +# HAVING ISSUES SETTING SETTINGS WITH THIS UNIT! class TestFire3Settings(BaseTests.TestSettings): @classmethod def setUpClass(cls): cls.device_type = ics.NEODEVICE_FIRE3 cls.device_settings_type = e_device_settings_type.DeviceFire3SettingsType + cls.num_devices = 1 + print("DEBUG: Testing FIRE3...") - +# ISSUES CONNECTING TO THIS DEVICE AT ALL!!! class TestFire2Settings(BaseTests.TestSettings): @classmethod def setUpClass(cls): cls.device_type = ics.NEODEVICE_FIRE2 cls.device_settings_type = e_device_settings_type.DeviceFire2SettingsType + cls.num_devices = 1 + print("DEBUG: Testing FIRE2...") class TestValueCAN42Settings(BaseTests.TestSettings): @@ -65,6 +139,8 @@ class TestValueCAN42Settings(BaseTests.TestSettings): def setUpClass(cls): cls.device_type = ics.NEODEVICE_VCAN42 cls.device_settings_type = e_device_settings_type.DeviceVCAN412SettingsType + cls.num_devices = 1 + print("DEBUG: Testing VCAN42...") if __name__ == "__main__":