Skip to content

Commit afa668f

Browse files
Add test_emcy.py for EMCY module coverage
1 parent 0130eb8 commit afa668f

File tree

1 file changed

+301
-8
lines changed

1 file changed

+301
-8
lines changed

test/test_emcy.py

Lines changed: 301 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import threading
33
import unittest
44
from contextlib import contextmanager
5+
from unittest.mock import Mock, patch
56

67
import can
78

@@ -25,13 +26,11 @@ def check_error(self, err, code, reg, data, ts):
2526
self.assertAlmostEqual(err.timestamp, ts)
2627

2728
def test_emcy_consumer_on_emcy(self):
28-
# Make sure multiple callbacks receive the same information.
2929
acc1 = []
3030
acc2 = []
3131
self.emcy.add_callback(lambda err: acc1.append(err))
3232
self.emcy.add_callback(lambda err: acc2.append(err))
3333

34-
# Dispatch an EMCY datagram.
3534
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
3635

3736
self.assertEqual(len(self.emcy.log), 1)
@@ -45,7 +44,6 @@ def test_emcy_consumer_on_emcy(self):
4544
data=bytes([0, 1, 2, 3, 4]), ts=1000,
4645
)
4746

48-
# Dispatch a new EMCY datagram.
4947
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
5048
self.assertEqual(len(self.emcy.log), 2)
5149
self.assertEqual(len(self.emcy.active), 2)
@@ -58,7 +56,6 @@ def test_emcy_consumer_on_emcy(self):
5856
data=bytes([4, 3, 2, 1, 0]), ts=2000,
5957
)
6058

61-
# Dispatch an EMCY reset.
6259
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 2000)
6360
self.assertEqual(len(self.emcy.log), 3)
6461
self.assertEqual(len(self.emcy.active), 0)
@@ -94,24 +91,20 @@ def timer(func):
9491
finally:
9592
t.join(TIMEOUT)
9693

97-
# Check unfiltered wait, on timeout.
9894
self.assertIsNone(self.emcy.wait(timeout=TIMEOUT))
9995

100-
# Check unfiltered wait, on success.
10196
with timer(push_err) as t:
10297
with self.assertLogs(level=logging.INFO):
10398
t.start()
10499
err = self.emcy.wait(timeout=TIMEOUT)
105100
check_err(err)
106101

107-
# Check filtered wait, on success.
108102
with timer(push_err) as t:
109103
with self.assertLogs(level=logging.INFO):
110104
t.start()
111105
err = self.emcy.wait(0x2001, TIMEOUT)
112106
check_err(err)
113107

114-
# Check filtered wait, on timeout.
115108
with timer(push_err) as t:
116109
t.start()
117110
self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT))
@@ -123,6 +116,110 @@ def push_reset():
123116
t.start()
124117
self.assertIsNone(self.emcy.wait(0x9000, TIMEOUT))
125118

119+
def test_emcy_consumer_initialization(self):
120+
"""Test EmcyConsumer initialization state."""
121+
consumer = canopen.emcy.EmcyConsumer()
122+
self.assertEqual(consumer.log, [])
123+
self.assertEqual(consumer.active, [])
124+
self.assertEqual(consumer.callbacks, [])
125+
self.assertIsInstance(consumer.emcy_received, threading.Condition)
126+
127+
def test_emcy_consumer_multiple_callbacks(self):
128+
"""Test adding multiple callbacks and their execution order."""
129+
call_order = []
130+
131+
def callback1(err):
132+
call_order.append('callback1')
133+
134+
def callback2(err):
135+
call_order.append('callback2')
136+
137+
def callback3(err):
138+
call_order.append('callback3')
139+
140+
self.emcy.add_callback(callback1)
141+
self.emcy.add_callback(callback2)
142+
self.emcy.add_callback(callback3)
143+
144+
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
145+
146+
self.assertEqual(call_order, ['callback1', 'callback2', 'callback3'])
147+
self.assertEqual(len(self.emcy.callbacks), 3)
148+
149+
def test_emcy_consumer_callback_exception_handling(self):
150+
"""Test that callback exceptions don't break other callbacks or the system."""
151+
successful_callbacks = []
152+
153+
def failing_callback(err):
154+
raise ValueError("Test exception in callback")
155+
156+
def successful_callback1(err):
157+
successful_callbacks.append('success1')
158+
159+
def successful_callback2(err):
160+
successful_callbacks.append('success2')
161+
162+
self.emcy.add_callback(successful_callback1)
163+
self.emcy.add_callback(failing_callback)
164+
self.emcy.add_callback(successful_callback2)
165+
166+
with self.assertRaises(ValueError):
167+
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
168+
169+
def test_emcy_consumer_error_reset_variants(self):
170+
"""Test different error reset code patterns."""
171+
self.emcy.on_emcy(0x81, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
172+
self.emcy.on_emcy(0x81, b'\x10\x90\x01\x04\x03\x02\x01\x00', 2000)
173+
self.assertEqual(len(self.emcy.active), 2)
174+
175+
self.emcy.on_emcy(0x81, b'\x00\x00\x00\x00\x00\x00\x00\x00', 3000)
176+
self.assertEqual(len(self.emcy.active), 0)
177+
178+
self.emcy.on_emcy(0x81, b'\x01\x30\x02\x00\x01\x02\x03\x04', 4000)
179+
self.assertEqual(len(self.emcy.active), 1)
180+
181+
self.emcy.on_emcy(0x81, b'\x99\x00\x01\x00\x00\x00\x00\x00', 5000)
182+
self.assertEqual(len(self.emcy.active), 0)
183+
184+
def test_emcy_consumer_wait_timeout_edge_cases(self):
185+
"""Test wait method with various timeout scenarios."""
186+
result = self.emcy.wait(timeout=0)
187+
self.assertIsNone(result)
188+
189+
result = self.emcy.wait(timeout=0.001)
190+
self.assertIsNone(result)
191+
192+
def test_emcy_consumer_wait_concurrent_errors(self):
193+
"""Test wait method when multiple errors arrive concurrently."""
194+
def push_multiple_errors():
195+
self.emcy.on_emcy(0x81, b'\x01\x20\x01\x01\x02\x03\x04\x05', 100)
196+
self.emcy.on_emcy(0x81, b'\x02\x20\x01\x01\x02\x03\x04\x05', 101)
197+
self.emcy.on_emcy(0x81, b'\x03\x20\x01\x01\x02\x03\x04\x05', 102)
198+
199+
t = threading.Timer(TIMEOUT / 2, push_multiple_errors)
200+
with self.assertLogs(level=logging.INFO):
201+
t.start()
202+
err = self.emcy.wait(0x2003, timeout=TIMEOUT)
203+
t.join(TIMEOUT)
204+
205+
self.assertIsNotNone(err)
206+
self.assertEqual(err.code, 0x2003)
207+
208+
def test_emcy_consumer_wait_time_expiry_during_execution(self):
209+
"""Test wait method when time expires while processing."""
210+
def push_err_with_delay():
211+
import time
212+
time.sleep(TIMEOUT * 1.5)
213+
self.emcy.on_emcy(0x81, b'\x01\x20\x01\x01\x02\x03\x04\x05', 100)
214+
215+
t = threading.Timer(TIMEOUT / 4, push_err_with_delay)
216+
t.start()
217+
218+
result = self.emcy.wait(timeout=TIMEOUT)
219+
t.join(TIMEOUT * 2)
220+
221+
self.assertIsNone(result)
222+
126223

127224
class TestEmcyError(unittest.TestCase):
128225
def test_emcy_error(self):
@@ -180,6 +277,75 @@ def check(code, expected):
180277
check(0xff00, "Device Specific")
181278
check(0xffff, "Device Specific")
182279

280+
def test_emcy_error_initialization_types(self):
281+
"""Test EmcyError initialization with various data types."""
282+
error = EmcyError(0x1000, 0, b'', 123.456)
283+
self.assertEqual(error.code, 0x1000)
284+
self.assertEqual(error.register, 0)
285+
self.assertEqual(error.data, b'')
286+
self.assertEqual(error.timestamp, 123.456)
287+
288+
error = EmcyError(0xFFFF, 0xFF, b'\xFF' * 5, float('inf'))
289+
self.assertEqual(error.code, 0xFFFF)
290+
self.assertEqual(error.register, 0xFF)
291+
self.assertEqual(error.data, b'\xFF' * 5)
292+
self.assertEqual(error.timestamp, float('inf'))
293+
294+
def test_emcy_error_str_edge_cases(self):
295+
"""Test string representation with edge cases."""
296+
error = EmcyError(0x0000, 0, b'', 1000)
297+
self.assertEqual(str(error), "Code 0x0000, Error Reset / No Error")
298+
299+
error = EmcyError(0x0001, 0, b'', 1000)
300+
self.assertEqual(str(error), "Code 0x0001, Error Reset / No Error")
301+
302+
error = EmcyError(0x0100, 0, b'', 1000)
303+
self.assertEqual(str(error), "Code 0x0100")
304+
305+
error = EmcyError(0xFFFF, 0, b'', 1000)
306+
self.assertEqual(str(error), "Code 0xFFFF, Device Specific")
307+
308+
def test_emcy_error_get_desc_boundary_conditions(self):
309+
"""Test get_desc method with boundary conditions."""
310+
def check(code, expected):
311+
err = EmcyError(code, 1, b'', 1000)
312+
actual = err.get_desc()
313+
self.assertEqual(actual, expected)
314+
315+
check(0x0000, "Error Reset / No Error")
316+
check(0x00FF, "Error Reset / No Error")
317+
check(0x0100, "")
318+
319+
check(0x0FFF, "")
320+
check(0x1000, "Generic Error")
321+
check(0x10FF, "Generic Error")
322+
check(0x1100, "")
323+
324+
check(0x1FFF, "")
325+
check(0x2000, "Current")
326+
check(0x2FFF, "Current")
327+
check(0x3000, "Voltage")
328+
329+
check(0x4FFF, "Temperature")
330+
check(0x5000, "Device Hardware")
331+
check(0x50FF, "Device Hardware")
332+
check(0x5100, "")
333+
334+
def test_emcy_error_inheritance(self):
335+
"""Test that EmcyError properly inherits from Exception."""
336+
error = EmcyError(0x1000, 0, b'', 1000)
337+
338+
self.assertIsInstance(error, Exception)
339+
340+
with self.assertRaises(EmcyError):
341+
raise error
342+
343+
try:
344+
raise error
345+
except Exception as e:
346+
self.assertIsInstance(e, EmcyError)
347+
self.assertEqual(e.code, 0x1000)
348+
183349

184350
class TestEmcyProducer(unittest.TestCase):
185351
def setUp(self):
@@ -220,6 +386,133 @@ def check(*args, res):
220386
check(3, res=b'\x00\x00\x03\x00\x00\x00\x00\x00')
221387
check(3, b"\xaa\xbb", res=b'\x00\x00\x03\xaa\xbb\x00\x00\x00')
222388

389+
def test_emcy_producer_initialization(self):
390+
"""Test EmcyProducer initialization."""
391+
producer = canopen.emcy.EmcyProducer(0x123)
392+
self.assertEqual(producer.cob_id, 0x123)
393+
network = producer.network
394+
self.assertIsNotNone(network)
395+
396+
def test_emcy_producer_send_edge_cases(self):
397+
"""Test EmcyProducer send method with edge cases."""
398+
def check(*args, res):
399+
self.emcy.send(*args)
400+
self.check_response(res)
401+
402+
check(0xFFFF, 0xFF, b'\xFF\xFF\xFF\xFF\xFF',
403+
res=b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF')
404+
405+
check(0x0000, 0x00, b'',
406+
res=b'\x00\x00\x00\x00\x00\x00\x00\x00')
407+
408+
check(0x1234, 0x56, b'\xAB\xCD',
409+
res=b'\x34\x12\x56\xAB\xCD\x00\x00\x00')
410+
411+
check(0x1234, 0x56, b'\xAB\xCD\xEF\x12\x34',
412+
res=b'\x34\x12\x56\xAB\xCD\xEF\x12\x34')
413+
414+
def test_emcy_producer_reset_edge_cases(self):
415+
"""Test EmcyProducer reset method with edge cases."""
416+
def check(*args, res):
417+
self.emcy.reset(*args)
418+
self.check_response(res)
419+
420+
check(0xFF, res=b'\x00\x00\xFF\x00\x00\x00\x00\x00')
421+
422+
check(0xFF, b'\xFF\xFF\xFF\xFF\xFF',
423+
res=b'\x00\x00\xFF\xFF\xFF\xFF\xFF\xFF')
424+
425+
check(0x12, b'\xAB\xCD',
426+
res=b'\x00\x00\x12\xAB\xCD\x00\x00\x00')
427+
428+
def test_emcy_producer_network_assignment(self):
429+
"""Test EmcyProducer network assignment and usage."""
430+
producer = canopen.emcy.EmcyProducer(0x100)
431+
initial_network = producer.network
432+
433+
producer.network = self.net
434+
self.assertEqual(producer.network, self.net)
435+
436+
producer.send(0x1000)
437+
msg = self.rxbus.recv(TIMEOUT)
438+
self.assertIsNotNone(msg)
439+
self.assertEqual(msg.arbitration_id, 0x100)
440+
441+
def test_emcy_producer_struct_packing(self):
442+
"""Test that the EMCY_STRUCT packing works correctly."""
443+
from canopen.emcy import EMCY_STRUCT
444+
445+
packed = EMCY_STRUCT.pack(0x1234, 0x56, b'\xAB\xCD\xEF\x12\x34')
446+
expected = b'\x34\x12\x56\xAB\xCD\xEF\x12\x34'
447+
self.assertEqual(packed, expected)
448+
449+
code, register, data = EMCY_STRUCT.unpack(expected)
450+
self.assertEqual(code, 0x1234)
451+
self.assertEqual(register, 0x56)
452+
self.assertEqual(data, b'\xAB\xCD\xEF\x12\x34')
453+
454+
packed = EMCY_STRUCT.pack(0x1234, 0x56, b'\xAB')
455+
expected = b'\x34\x12\x56\xAB\x00\x00\x00\x00'
456+
self.assertEqual(packed, expected)
457+
458+
459+
class TestEmcyIntegration(unittest.TestCase):
460+
"""Integration tests for EMCY producer and consumer."""
461+
462+
def setUp(self):
463+
self.txbus = can.Bus(interface="virtual")
464+
self.rxbus = can.Bus(interface="virtual")
465+
self.net = canopen.Network(self.txbus)
466+
self.net.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
467+
self.net.connect()
468+
469+
self.producer = canopen.emcy.EmcyProducer(0x081)
470+
self.producer.network = self.net
471+
472+
self.consumer = canopen.emcy.EmcyConsumer()
473+
474+
def tearDown(self):
475+
self.net.disconnect()
476+
self.txbus.shutdown()
477+
self.rxbus.shutdown()
478+
479+
def test_producer_consumer_integration(self):
480+
"""Test that producer and consumer work together."""
481+
received_errors = []
482+
self.consumer.add_callback(lambda err: received_errors.append(err))
483+
484+
self.producer.send(0x2001, 0x02, b'\x01\x02\x03\x04\x05')
485+
486+
msg = self.rxbus.recv(TIMEOUT)
487+
self.assertIsNotNone(msg)
488+
489+
self.consumer.on_emcy(msg.arbitration_id, msg.data, msg.timestamp)
490+
491+
self.assertEqual(len(received_errors), 1)
492+
self.assertEqual(len(self.consumer.log), 1)
493+
self.assertEqual(len(self.consumer.active), 1)
494+
495+
error = received_errors[0]
496+
self.assertEqual(error.code, 0x2001)
497+
self.assertEqual(error.register, 0x02)
498+
self.assertEqual(error.data, b'\x01\x02\x03\x04\x05')
499+
500+
def test_producer_reset_consumer_integration(self):
501+
"""Test producer reset clears consumer active errors."""
502+
self.consumer.on_emcy(0x081, b'\x01\x20\x02\x00\x01\x02\x03\x04', 1000)
503+
self.assertEqual(len(self.consumer.active), 1)
504+
505+
self.producer.reset()
506+
507+
msg = self.rxbus.recv(TIMEOUT)
508+
self.assertIsNotNone(msg)
509+
510+
self.consumer.on_emcy(msg.arbitration_id, msg.data, msg.timestamp)
511+
512+
self.assertEqual(len(self.consumer.active), 0)
513+
self.assertEqual(len(self.consumer.log), 2)
514+
223515

224516
if __name__ == "__main__":
225517
unittest.main()
518+

0 commit comments

Comments
 (0)