diff --git a/samples/openthread/cli/harness-thci/nRF_Connect_SDK_13_14.py b/samples/openthread/cli/harness-thci/nRF_Connect_SDK_13_14.py index 5043d4242f9..508e455d217 100644 --- a/samples/openthread/cli/harness-thci/nRF_Connect_SDK_13_14.py +++ b/samples/openthread/cli/harness-thci/nRF_Connect_SDK_13_14.py @@ -48,6 +48,7 @@ import serial from serial.serialutil import SerialException +from typing import Literal import commissioner from GRLLibs.ThreadPacket.PlatformPackets import ( @@ -335,9 +336,10 @@ def __executeCommand(self, cmd, timeout=DEFAULT_COMMAND_TIMEOUT, ignore_exec=Fal if line is None: time.sleep(0.01) continue - line = line.replace("\x1b[J", "") + # self.log("readline: %s", line) # skip empty lines + line = line.replace("\x1b[J", "") if line: response.append(line) @@ -347,6 +349,8 @@ def __executeCommand(self, cmd, timeout=DEFAULT_COMMAND_TIMEOUT, ignore_exec=Fal m = OpenThreadTHCI._COMMAND_OUTPUT_ERROR_PATTERN.match(line) if m is not None: code, msg = m.groups() + if ignore_exec: + return True raise CommandError(int(code), msg) else: if ignore_exec: @@ -964,7 +968,9 @@ def getMAC(self, bType=MacType.RandomMac): specific type of MAC address """ # if power down happens, return extended address assigned previously - if self.isPowerDown: + if self.isPowerDown and bType == MacType.EthMac: + return self._deviceGetEtherMac() + elif self.isPowerDown: macAddr64 = self.mac else: if bType == MacType.FactoryMac: @@ -2234,9 +2240,9 @@ def diagnosticQuery(self, strDestinationAddr="", listTLV_ids=[], rloc16=0): @API def vendorInfo(self, name="", model="", version=""): - vn = "vendor name %s" % name - vm = "vendor model %s" % model - vv = "vendor swversion %s" % version + vn = "vendor name %s" % name.replace(" ", "") + vm = "vendor model %s" % model.replace(" ", "") + vv = "vendor swversion %s" % version.replace(" ", "") self.__executeCommand(vn) self.__executeCommand(vm) self.__executeCommand(vv) @@ -3050,10 +3056,10 @@ def ValidateDeviceFirmware(self): # elif self.DeviceCapability == OT12_CAPBS: # return OT12_VERSION in self.UIStatusMsg if self.DeviceCapability == OT13_CAPBS: - return [ - True if OT13_VER in self.UIStatusMsg else False - for OT13_VER in OT13_VERSION - ] + OT13_FirmwareCheck = any( + OT13_VER in self.UIStatusMsg for OT13_VER in OT13_VERSION + ) + return OT13_FirmwareCheck else: return False @@ -3262,26 +3268,45 @@ def sendUdp(self, destination, port, payload="hello"): return self.__executeCommand(cmd)[-1] == "Done" @API - def send_udp(self, interface, destination, port, payload="12ABcd"): + def send_udp( + self, + interface, + destination, + port, + payload="12ABcd", + return_result=False, + spiff_udp_ip="", + ): """payload hexstring""" assert payload is not None, "payload should not be none" assert interface == 0, "non-BR must send UDP to Thread interface" - self.__udpOpen() + self.__udpOpen(spiff_udp_ip, port) time.sleep(0.5) + cmd_ = "test tmforiginfilter disable" + self.__executeCommand(cmd_) cmd = "udp send %s %s -x %s" % (destination, port, payload) - result = self.__executeCommand(cmd)[-1] == "Done" - + if not return_result: + result = self.__executeCommand(cmd)[-1] == "Done" + else: + result = self.__executeCommand(cmd)[0] + _cmd = "test tmforiginfilter enable" + self.__executeCommand(_cmd) return result - def __udpOpen(self): + def __udpOpen(self, spiff_udp_ip="", port=""): + if spiff_udp_ip != "": + self.__isUdpOpened = False + self.__executeCommand("udp close") if not self.__isUdpOpened: cmd = "udp open" self.__executeCommand(cmd) # Bind to RLOC address and first dynamic port rlocAddr = self.getRloc() - - cmd = "udp bind %s 49152" % rlocAddr + if spiff_udp_ip == "": + cmd = "udp bind %s 49152" % rlocAddr + else: + cmd = "udp connect %s %s" % (spiff_udp_ip, port) self.__executeCommand(cmd) self.__isUdpOpened = True @@ -3404,6 +3429,10 @@ def mle_Discovery(self, channel): cmd = "discover %s" % channel return self.__executeCommand(cmd) + @API + def icmp_ping_req(self, addr): + return self.__executeCommand("ping %s 2" % addr) + ######################################################################################################################## # 1.3 THCI Commands # @@ -3736,7 +3765,9 @@ def get_own_omr_address(self, omr_prefix): def srp_client_remove(self, instancename, servicename): cmd = "srp client service remove %s %s" % (instancename, servicename) + # cmd = 'srp client service remove service-test-1 _thread-test._udp' self.__executeCommand(cmd) + # self.__executeCommand("netdata register") time.sleep(3) def srpCallBackEnable(self): @@ -3758,10 +3789,6 @@ def host_clear(self): cmd = "srp client host clear" self.__executeCommand(cmd) - def client_key_lease(self, keylease): - cmd = "srp client keyleaseinterval %s" % keylease - self.__executeCommand(cmd) - def srpCallBackDisable(self): cmd = "srp client callback disable" self.__executeCommand(cmd) @@ -3872,8 +3899,8 @@ def srp_update( srv, tcp_service_type, port, - weight, priority, + weight, txt, ) else: @@ -3881,8 +3908,8 @@ def srp_update( srv, service_type, port, - weight, priority, + weight, txt, ) self.__executeCommand(cmd) @@ -4019,8 +4046,8 @@ def srp_add( srv, tcp_service_type, port, - weight, priority, + weight, txt, ) else: @@ -4028,8 +4055,8 @@ def srp_add( srv, service_type, port, - weight, priority, + weight, txt, ) self.__executeCommand(cmd) @@ -4045,6 +4072,41 @@ def srp_add( cmd = "srp client start %s %s" % (dst_addr, port) self.__executeCommand(cmd) + def srp_service_registration( + self, + host_name, + host_addr, + lease_info=(), + dst_addr_and_port=(), + services=[], + service_key: Literal["", "enable", "disable"] = "", + name_compression: Literal["", "enable", "disable"] = "", + ): + self.__executeCommand("srp client host clear") + self.__executeCommand(f"srp client host name {host_name}") + self.__executeCommand(f"srp client host address {host_addr}") + self.__executeCommand(f"srp client leaseinterval {lease_info[0]}") + self.__executeCommand(f"srp client keyleaseinterval {lease_info[1]}") + + if service_key != "": + self.__executeCommand(f"srp client service key {service_key}") + + if name_compression != "": + self.__executeCommand(f"dns compression {name_compression}") + + if dst_addr_and_port: + self.__executeCommand("srp client stop") + self.__executeCommand(f"dns config {dst_addr_and_port[0]} 53 6000 3 1") + self.__executeCommand( + f"srp client start {dst_addr_and_port[0]} {dst_addr_and_port[1]}" + ) + else: + self.__executeCommand("srp client autostart enable") + + for instance_name, service_name, port, priority, weight, txt in services: + cmd = f"srp client service add {instance_name} {service_name} {port} {priority} {weight} {txt}" + self.__executeCommand(cmd) + def dns_query(self, addr="", service_type="_thread-test._udp"): """Send unicast DNS query @@ -4073,9 +4135,9 @@ def dns_resolve4(self, host_name): cmd = "dns resolve4 %s.default.service.arpa" % host_name self.__executeCommand(cmd) - def dns_resolve_ip(self, host_name, server_ip=""): + def dns_resolve_ip(self, host_name, server_ip="", ignore_exec=False): cmd = "dns resolve %s %s" % (host_name, server_ip) - self.__executeCommand(cmd) + self.__executeCommand(cmd, ignore_exec=ignore_exec) def dns_resolve_ipv4(self, host_name, server_ip="", ignore_exec=False): cmd = "dns resolve4 %s %s" % (host_name, server_ip) @@ -4154,14 +4216,20 @@ def dns_tcp_query_instance(self, service_name, service_type): self.__executeCommand(cmd) def dns_query_instance( - self, ip_addr, service_name, service_type="_thread-test._udp" + self, + service_name, + service_type="_thread-test._udp", + ip_addr="", + ignore_exec=False, ): - cmd = "dns service %s %s.default.service.arpa %s 53 6000 3 1" % ( + cmd = "dns service %s %s.default.service.arpa" % ( service_name, service_type, - ip_addr, ) - self.__executeCommand(cmd) + if ip_addr: + cmd += f" {ip_addr} 53 6000 3 1" + print(cmd) + self.__executeCommand(cmd, ignore_exec=ignore_exec) def setMliid(self, sMlIid): self.__setMlIid(sMlIid) @@ -4174,6 +4242,10 @@ def srp_host_remove(self): cmd = "srp client host remove" self.__executeCommand(cmd) + def client_service_clear(self, instancename, servicename): + cmd = "srp client service clear %s %s" % (instancename, servicename) + self.__executeCommand(cmd) + def add_new_service(self): rloc = self.__executeCommand("rloc16")[0] mleid = self.__executeCommand("ipaddr mleid")[0] @@ -4250,9 +4322,49 @@ def stopDTLSSession(self): # time.sleep(15) @API - def add_service(self, service, server): - self.__executeCommand("service add 44970 %s %s" % (service, server)) - self.__executeCommand("netdata register") + def add_service(self, service="", server="", service_data=False): + if service_data: + self.__executeCommand("service add 44970 %s%s" % (service, server)) + self.__executeCommand("netdata register") + else: + self.__executeCommand("service add 44970 %s %s" % (service, server)) + self.__executeCommand("netdata register") + + def tcp_http_request(self, addr="", port=80, hexa_http_request=""): + self.__executeCommand("tcp init") + self.__executeCommand("tcp connect %s %s" % (addr, port)) + time.sleep(3) + tcp_request = self.__executeCommand( + "tcp send -x %s" % hexa_http_request, ignore_exec=True + ) + print(tcp_request) + time.sleep(3) + self.__executeCommand("tcp sendend", ignore_exec=True) + self.__executeCommand("tcp deinit", ignore_exec=True) + return "Failed" if tcp_request is True else True + + def nat64_prefix(self): + netdata = self.__executeCommand("netdata show") + nat64 = "" + for addr in netdata: + if "/96" in addr: + nat64 = addr + break + return nat64 + + def pd_omr(self, prefix): + pd = self.__executeCommand("ipaddr") + result = "" + for pd_omr in pd[:-1]: + if ModuleHelper.GetFullIpv6Address(pd_omr).startswith(prefix): + result = pd_omr + break + else: + continue + if result == "": + prefix = ModuleHelper.GetFullIpv6Address(prefix + "::") + result = prefix + return result ######################################################################################################################## # 1.3 THCI Commands