Skip to content
Merged
1 change: 1 addition & 0 deletions moler/device/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ def _create_instance_and_remember_it(
cls._devices_params[name] = {}
cls._devices_params[name]["class_fullname"] = device_class
cls._devices_params[name]["constructor_parameters"] = constructor_parameters
del cls._devices_params[name]["constructor_parameters"]["io_connection"] # do not keep connection in device params, it is not needed to create device and can cause problems with copying
cls._devices_params[name]["cloned_from"] = None
handler = functools.partial(cls.forget_device_handler, name)
device.register_device_removal_callback(callback=handler)
Expand Down
13 changes: 12 additions & 1 deletion moler/device/textualdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,8 @@ def _run_prompts_observers(self):
self._prepare_reverse_state_prompts_dict()

for_state = self._get_for_state_to_run_prompts_observers()
if self._prompts_event is not None:
self._stop_prompts_observers()
self._prompts_event = self.get_event(
event_name="wait4prompts",
event_params={
Expand All @@ -1105,6 +1107,15 @@ def _run_prompts_observers(self):
self._prompts_event.check_against_all_prompts = self._check_all_prompts_on_line
self._prompts_event.disable_log_occurrence()
self._prompts_event.start()
start_time = time.monotonic()
while self._prompts_event.is_in_runner() is False:
if time.monotonic() - start_time > 10:
self._log(
logging.WARNING,
f"Cannot start prompts observers properly. Still not in runner after {time.monotonic() - start_time} seconds.",
)
break
time.sleep(self._tick_to_check_runner)

def _prepare_reverse_state_prompts_dict(self):
for state in self._state_prompts.keys():
Expand Down Expand Up @@ -1148,7 +1159,7 @@ def _stop_prompts_observers(self):
time.sleep(self._tick_to_check_runner)
event.remove_event_occurred_callback()
except Exception as e:
self.logger.error(f"Cannot stop prompts observers properly: {e}")
pass

def build_trigger_to_state(self, state):
trigger = f"GOTO_{state}"
Expand Down
17 changes: 9 additions & 8 deletions moler/io/raw/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"""

__author__ = 'Grzegorz Latuszek, Marcin Usielski'
__copyright__ = 'Copyright (C) 2018-2021, Nokia'
__copyright__ = 'Copyright (C) 2018-2026, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com'

import threading
Expand Down Expand Up @@ -211,13 +211,14 @@ def open(self):
"""Start thread pulling data from FIFO buffer."""
ret = super(ThreadedFifoBuffer, self).open()
done = threading.Event()
self.pulling_thread = TillDoneThread(target=self.pull_data,
done_event=done,
kwargs={'pulling_done': done})
self.pulling_thread.start()
self._log(msg=f"open {self}", level=logging.INFO)
self._notify_on_connect()
self.moler_connection.open()
if self.pulling_thread is None:
self.pulling_thread = TillDoneThread(target=self.pull_data,
done_event=done,
kwargs={'pulling_done': done})
self.pulling_thread.start()
self._log(msg=f"open {self}", level=logging.INFO)
self._notify_on_connect()
self.moler_connection.open()
return ret

def close(self):
Expand Down
5 changes: 3 additions & 2 deletions moler/observer_thread_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"""Wrapper for observer registered in ThreadedMolerConnection (old name: ObservableConnection)."""

__author__ = 'Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2024, Nokia'
__copyright__ = 'Copyright (C) 2020-2026, Nokia'
__email__ = 'marcin.usielski@nokia.com'

import logging
Expand Down Expand Up @@ -35,7 +35,8 @@ def __init__(self, observer, observer_self, logger):
self._request_end = threading.Event()
self._timeout_for_get_from_queue = 1
self.logger = logger
self._t = Thread(target=self._loop_for_observer, name=f"ObserverThreadWrapper-{ObserverThreadWrapper._th_nr}-{observer_self}")
self.name = f"ObserverThreadWrapper-{ObserverThreadWrapper._th_nr}-{observer_self}"
self._t = Thread(target=self._loop_for_observer, name=self.name)
ObserverThreadWrapper._th_nr += 1
self._t.daemon = True
self._t.start()
Expand Down
3 changes: 2 additions & 1 deletion moler/runner_single_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,4 +400,5 @@ def is_connection_observer_running(self, connection_observer) -> bool:
:return: True if connection_observer is currently running in this runner, False otherwise.
"""
with self._connection_observer_lock:
return connection_observer in self._connections_observers
return connection_observer in self._connections_observers or \
connection_observer in self._to_remove_connection_observers
24 changes: 16 additions & 8 deletions moler/threaded_moler_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"""

__author__ = "Grzegorz Latuszek, Marcin Usielski, Michal Ernst"
__copyright__ = "Copyright (C) 2018-2021, Nokia"
__copyright__ = "Copyright (C) 2018-2026, Nokia"
__email__ = (
"grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com, michal.ernst@nokia.com"
)
Expand Down Expand Up @@ -105,14 +105,15 @@ def subscribe(self, observer, connection_closed_handler):
self.logger.debug(
f">>> Entering {self._observers_lock}. conn-obs '{observer}' moler-conn '{self}'"
)
observer_key, value = self._get_observer_key_value(observer)
with self._observers_lock:
self.logger.debug(
f">>> Entered {self._observers_lock}. conn-obs '{observer}' moler-conn '{self}'"
)
self._log(level=TRACE, msg=f"subscribe({observer})")
observer_key, value = self._get_observer_key_value(observer)

if observer_key not in self._observer_wrappers:
assert observer_key not in self._connection_closed_handlers
self_for_observer, observer_reference = value
self._observer_wrappers[observer_key] = self._create_observer_wrapper(
observer_reference=observer_reference,
Expand Down Expand Up @@ -142,19 +143,21 @@ def unsubscribe(self, observer, connection_closed_handler):
self.logger.debug(
f">>> Entering {self._observers_lock}. conn-obs '{observer}' moler-conn '{self}'"
)
observer_key, _ = self._get_observer_key_value(observer)
with self._observers_lock:
self.logger.debug(
f">>> Entered {self._observers_lock}. conn-obs '{observer}' moler-conn '{self}'"
)
self._log(level=TRACE, msg=f"unsubscribe({observer})")
observer_key, _ = self._get_observer_key_value(observer)
if observer_key in self._observer_wrappers and observer_key in self._connection_closed_handlers:
self._observer_wrappers[observer_key].request_stop()
del self._connection_closed_handlers[observer_key]
del self._observer_wrappers[observer_key]
elif observer_key not in self._observer_wrappers and observer_key not in self._connection_closed_handlers:
pass # both not subscribed - do nothing
else:
self._log(
level=logging.WARNING,
level=logging.ERROR,
msg=f"{observer} and {connection_closed_handler} were not both subscribed.",
levels_to_go_up=2,
)
Expand All @@ -167,9 +170,13 @@ def shutdown(self):
Closes connection with notifying all observers about closing.
:return: None
"""
for handler in list(self._connection_closed_handlers.values()):
handler()
self._connection_closed_handlers = {}
with self._observers_lock:
for handler in list(self._connection_closed_handlers.values()):
handler()
for handler in list(self._observer_wrappers.values()):
handler.request_stop()
self._observer_wrappers = {}
self._connection_closed_handlers = {}
super(ThreadedMolerConnection, self).shutdown()

def notify_observers(self, data, recv_time):
Expand All @@ -179,7 +186,8 @@ def notify_observers(self, data, recv_time):
:param recv_time: time of data really read form connection.
:return None
"""
subscribers_wrappers = list(self._observer_wrappers.values())
with self._observers_lock:
subscribers_wrappers = list(self._observer_wrappers.values())
for wrapper in subscribers_wrappers:
try:
self.logger.debug(
Expand Down
62 changes: 59 additions & 3 deletions moler/util/devices_SM.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""

__author__ = "Michal Ernst, Marcin Usielski"
__copyright__ = "Copyright (C) 2019-2025, Nokia"
__copyright__ = "Copyright (C) 2019-2026, Nokia"
__email__ = "michal.ernst@nokia.com, marcin.usielski@nokia.com"

import math
Expand All @@ -13,6 +13,7 @@
import random
import threading
import time
import traceback

from moler.config import load_config
from moler.device import DeviceFactory
Expand Down Expand Up @@ -108,7 +109,7 @@ def iterate_over_device_states(
if max_time is None:
assert 0 == states_to_test.qsize()
for ex in exceptions:
print(f"ex: '{ex}' -> '{repr(ex)}'.")
print(f"ex: '{ex}' -> '{repr(ex)}': {traceback.format_tb(ex.__traceback__)}")
assert 0 == len(exceptions)


Expand Down Expand Up @@ -176,6 +177,7 @@ def _start_device_tests(
timeout_multiply=timeout_multiply,
sleep_after_changed_state=sleep_after_changed_state
)
device.remove()
except Exception as ex:
exceptions.append(ex)
MolerTest.info(f"exception: '{ex}' -> '{repr(ex)}'")
Expand Down Expand Up @@ -254,7 +256,9 @@ def _prepare_device(device, connection, device_output):
if device._established is True: # pylint: disable=protected-access
device._established = False # pylint: disable=protected-access
device.establish_connection()

if device.current_state == "NOT_CONNECTED":
device.io_connection.inject(b"moler_bash#")
time.sleep(0.2)
assert device.current_state != "NOT_CONNECTED"


Expand Down Expand Up @@ -420,3 +424,55 @@ def moler_check_sm_identity(devices: list):
print(f"state={state}, observer={observer}, obs0={pformat(obs0)}, obs1={pformat(obs1)}")
print(f"diff: {compare_objects(obs0, obs1)}")
assert obs0 == obs1


class DeviceCM:

_any_instance = False

def __init__(self, name, connection, device_output, test_file_path):
self.name = name
self.connection = connection
self.device_output = device_output
self.test_file_path = test_file_path
self._device = None
self._threading_tick = 1 # seconds to wait between checks if threads are finished after device removal
self._thread_timeout = 20 # seconds to wait for threads to finish after device removal
self._threads_nr = -1
self._threads_names = []

def __enter__(self):
if self._device is None:
for thread in threading.enumerate():
if "RunnerSingle" in thread.name:
if DeviceCM._any_instance:
self._threads_nr = threading.active_count()
self._threads_names = [thread.name for thread in threading.enumerate()]
break
else:
DeviceCM._any_instance = True
self._device = get_device(name=self.name, connection=self.connection, device_output=self.device_output,
test_file_path=self.test_file_path)
return self._device

def __exit__(self, exc_type, exc_val, exc_tb):
if self._device is not None:
DeviceFactory.remove_device(device=self._device)
self._device = None
if self._threads_nr > 0:
start_time = time.monotonic()
current_threads_names = [thread.name for thread in threading.enumerate()]
unexpected_thread_names = [name for name in current_threads_names if name not in self._threads_names]
while threading.active_count() > self._threads_nr or len(unexpected_thread_names) > 0:
current_threads_names = [thread.name for thread in threading.enumerate()]
unexpected_thread_names = [name for name in current_threads_names if name not in self._threads_names]
if time.monotonic() - start_time > self._thread_timeout:
break
time.sleep(self._threading_tick)
msg = (f"threads no after device removal: {threading.active_count()}, expected no more than {self._threads_nr}:"
f"current threads: {current_threads_names}, expected threads: {self._threads_names}, "
f"unexpected threads: {unexpected_thread_names}")
assert threading.active_count() <= self._threads_nr and len(unexpected_thread_names) == 0, msg # No new thread was left after device removal.
else:
time.sleep(3 * self._threading_tick) # just wait a bit to be sure that all threads are finished after device removal
return False # don't suppress exceptions
29 changes: 14 additions & 15 deletions test/device/test_SM_adb_remote.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
__author__ = 'Grzegorz Latuszek, Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2025, Nokia'
__copyright__ = 'Copyright (C) 2020-2026, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com'

import pytest

from moler.util.devices_SM import iterate_over_device_states, get_device, moler_check_sm_identity
from moler.util.devices_SM import DeviceCM, get_memory_device_connection, iterate_over_device_states, moler_check_sm_identity



Expand All @@ -13,27 +13,26 @@

@pytest.mark.parametrize("device_name", adb_remotes)
def test_adb_remote_device(device_name, device_connection, adb_remote_output):
adb_remote = get_device(name=device_name, connection=device_connection, device_output=adb_remote_output,
test_file_path=__file__)

iterate_over_device_states(device=adb_remote)
with DeviceCM(name=device_name, connection=device_connection, device_output=adb_remote_output,
test_file_path=__file__) as adb_remote:
iterate_over_device_states(device=adb_remote)


@pytest.mark.parametrize("device_name", adb_remotes_proxy_pc)
def test_adb_remote_device_proxy_pc(device_name, device_connection, adb_remote_output_proxy_pc):
adb_remote = get_device(name=device_name, connection=device_connection, device_output=adb_remote_output_proxy_pc,
test_file_path=__file__)

iterate_over_device_states(device=adb_remote)
with DeviceCM(name=device_name, connection=device_connection, device_output=adb_remote_output_proxy_pc,
test_file_path=__file__) as adb_remote:
iterate_over_device_states(device=adb_remote)


@pytest.mark.parametrize("devices", [adb_remotes_proxy_pc, adb_remotes])
def test_unix_sm_identity(devices, device_connection, adb_remote_output):
dev0 = get_device(name=devices[0], connection=device_connection, device_output=adb_remote_output,
test_file_path=__file__)
dev1 = get_device(name=devices[1], connection=device_connection, device_output=adb_remote_output,
test_file_path=__file__)
moler_check_sm_identity([dev0, dev1])
assert len(devices) == 2
with DeviceCM(name=devices[0], connection=device_connection, device_output=adb_remote_output,
test_file_path=__file__) as dev0:
with DeviceCM(name=devices[1], connection=get_memory_device_connection(), device_output=adb_remote_output,
test_file_path=__file__) as dev1:
moler_check_sm_identity([dev0, dev1])


@pytest.fixture
Expand Down
30 changes: 15 additions & 15 deletions test/device/test_SM_at_remote.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
__author__ = 'Grzegorz Latuszek, Marcin Usielski'
__copyright__ = 'Copyright (C) 2020-2025, Nokia'
__copyright__ = 'Copyright (C) 2020-2026, Nokia'
__email__ = 'grzegorz.latuszek@nokia.com, marcin.usielski@nokia.com'

import pytest

from moler.util.devices_SM import iterate_over_device_states, get_device, moler_check_sm_identity
from moler.util.devices_SM import get_memory_device_connection, iterate_over_device_states, moler_check_sm_identity, DeviceCM

at_remotes = ["AT_REMOTE", "AT_REMOTE3"]
at_remotes_proxy_pc = ["AT_REMOTE_PROXY_PC", "AT_REMOTE_PROXY_PC3"]


@pytest.mark.parametrize("device_name", at_remotes)
def test_at_remote_device(device_name, device_connection, at_remote_output):
at_remote = get_device(name=device_name, connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)

iterate_over_device_states(device=at_remote)
with DeviceCM(name=device_name, connection=device_connection, device_output=at_remote_output,
test_file_path=__file__) as at_remote:
iterate_over_device_states(device=at_remote)


@pytest.mark.parametrize("device_name", at_remotes_proxy_pc)
def test_at_remote_device_proxy_pc(device_name, device_connection, at_remote_output_proxy_pc):
at_remote = get_device(name=device_name, connection=device_connection, device_output=at_remote_output_proxy_pc,
test_file_path=__file__)

iterate_over_device_states(device=at_remote)
with DeviceCM(name=device_name, connection=device_connection, device_output=at_remote_output_proxy_pc,
test_file_path=__file__) as at_remote:
iterate_over_device_states(device=at_remote)


@pytest.mark.parametrize("devices", [at_remotes_proxy_pc, at_remotes])
def test_unix_sm_identity(devices, device_connection, at_remote_output):
dev0 = get_device(name=devices[0], connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)
dev1 = get_device(name=devices[1], connection=device_connection, device_output=at_remote_output,
test_file_path=__file__)
moler_check_sm_identity([dev0, dev1])
assert len(devices) == 2
with DeviceCM(name=devices[0], connection=device_connection, device_output=at_remote_output,
test_file_path=__file__) as dev0:
with DeviceCM(name=devices[1], connection=get_memory_device_connection(), device_output=at_remote_output,
test_file_path=__file__) as dev1:
moler_check_sm_identity([dev0, dev1])


@pytest.fixture
def at_remote_output():
Expand Down
Loading
Loading