55from .util import SAMPLE_EDS , tmp_file
66
77
8- class TestPDO (unittest .TestCase ):
8+ class TestPDO (unittest .IsolatedAsyncioTestCase ):
9+
10+ __test__ = False # This is a base class, tests should not be run directly.
11+ use_async : bool
912
1013 def setUp (self ):
1114 node = canopen .Node (1 , SAMPLE_EDS )
@@ -17,59 +20,115 @@ def setUp(self):
1720 pdo .add_variable ('BOOLEAN value' , length = 1 ) # 0x2005
1821 pdo .add_variable ('BOOLEAN value 2' , length = 1 ) # 0x2006
1922
20- # Write some values
21- pdo ['INTEGER16 value' ].raw = - 3
22- pdo ['UNSIGNED8 value' ].raw = 0xf
23- pdo ['INTEGER8 value' ].raw = - 2
24- pdo ['INTEGER32 value' ].raw = 0x01020304
25- pdo ['BOOLEAN value' ].raw = False
26- pdo ['BOOLEAN value 2' ].raw = True
27-
2823 self .pdo = pdo
2924 self .node = node
3025
31- def test_pdo_map_bit_mapping (self ):
32- self . assertEqual ( self . pdo . data , b' \xfd \xff \xef \x04 \x03 \x02 \x01 \x02 ' )
26+ async def set_values (self ):
27+ """Initialize the PDO with some valuues.
3328
34- def test_pdo_map_getitem (self ):
29+ Do this in a separate method in order to be abel to use the
30+ async and sync versions of the tests.
31+ """
32+ node = self .node
33+ pdo = node .pdo .tx [1 ]
34+ if self .use_async :
35+ # Write some values (different from the synchronous values)
36+ await pdo ['INTEGER16 value' ].aset_raw (12 )
37+ await pdo ['UNSIGNED8 value' ].aset_raw (0xe )
38+ await pdo ['INTEGER8 value' ].aset_raw (- 4 )
39+ await pdo ['INTEGER32 value' ].aset_raw (0x56789abc )
40+ await pdo ['BOOLEAN value' ].aset_raw (True )
41+ await pdo ['BOOLEAN value 2' ].aset_raw (False )
42+ else :
43+ # Write some values
44+ pdo ['INTEGER16 value' ].raw = - 3
45+ pdo ['UNSIGNED8 value' ].raw = 0xf
46+ pdo ['INTEGER8 value' ].raw = - 2
47+ pdo ['INTEGER32 value' ].raw = 0x01020304
48+ pdo ['BOOLEAN value' ].raw = False
49+ pdo ['BOOLEAN value 2' ].raw = True
50+
51+ async def test_pdo_map_bit_mapping (self ):
52+ await self .set_values ()
53+ if self .use_async :
54+ self .assertEqual (self .pdo .data , b'\x0c \x00 \xce \xbc \x9a \x78 \x56 \x01 ' )
55+ else :
56+ self .assertEqual (self .pdo .data , b'\xfd \xff \xef \x04 \x03 \x02 \x01 \x02 ' )
57+
58+ async def test_pdo_map_getitem (self ):
59+ await self .set_values ()
3560 pdo = self .pdo
36- self .assertEqual (pdo ['INTEGER16 value' ].raw , - 3 )
37- self .assertEqual (pdo ['UNSIGNED8 value' ].raw , 0xf )
38- self .assertEqual (pdo ['INTEGER8 value' ].raw , - 2 )
39- self .assertEqual (pdo ['INTEGER32 value' ].raw , 0x01020304 )
40- self .assertEqual (pdo ['BOOLEAN value' ].raw , False )
41- self .assertEqual (pdo ['BOOLEAN value 2' ].raw , True )
42-
43- def test_pdo_getitem (self ):
61+ if self .use_async :
62+ self .assertEqual (await pdo ['INTEGER16 value' ].aget_raw (), 12 )
63+ self .assertEqual (await pdo ['UNSIGNED8 value' ].aget_raw (), 0xe )
64+ self .assertEqual (await pdo ['INTEGER8 value' ].aget_raw (), - 4 )
65+ self .assertEqual (await pdo ['INTEGER32 value' ].aget_raw (), 0x56789abc )
66+ self .assertEqual (await pdo ['BOOLEAN value' ].aget_raw (), True )
67+ self .assertEqual (await pdo ['BOOLEAN value 2' ].aget_raw (), False )
68+ else :
69+ self .assertEqual (pdo ['INTEGER16 value' ].raw , - 3 )
70+ self .assertEqual (pdo ['UNSIGNED8 value' ].raw , 0xf )
71+ self .assertEqual (pdo ['INTEGER8 value' ].raw , - 2 )
72+ self .assertEqual (pdo ['INTEGER32 value' ].raw , 0x01020304 )
73+ self .assertEqual (pdo ['BOOLEAN value' ].raw , False )
74+ self .assertEqual (pdo ['BOOLEAN value 2' ].raw , True )
75+
76+ async def test_pdo_getitem (self ):
77+ await self .set_values ()
4478 node = self .node
45- self .assertEqual (node .tpdo [1 ]['INTEGER16 value' ].raw , - 3 )
46- self .assertEqual (node .tpdo [1 ]['UNSIGNED8 value' ].raw , 0xf )
47- self .assertEqual (node .tpdo [1 ]['INTEGER8 value' ].raw , - 2 )
48- self .assertEqual (node .tpdo [1 ]['INTEGER32 value' ].raw , 0x01020304 )
49- self .assertEqual (node .tpdo ['INTEGER32 value' ].raw , 0x01020304 )
50- self .assertEqual (node .tpdo [1 ]['BOOLEAN value' ].raw , False )
51- self .assertEqual (node .tpdo [1 ]['BOOLEAN value 2' ].raw , True )
52-
53- # Test different types of access
54- self .assertEqual (node .pdo [0x1600 ]['INTEGER16 value' ].raw , - 3 )
55- self .assertEqual (node .pdo ['INTEGER16 value' ].raw , - 3 )
56- self .assertEqual (node .pdo .tx [1 ]['INTEGER16 value' ].raw , - 3 )
57- self .assertEqual (node .pdo [0x2001 ].raw , - 3 )
58- self .assertEqual (node .tpdo [0x2001 ].raw , - 3 )
59- self .assertEqual (node .pdo [0x2002 ].raw , 0xf )
60- self .assertEqual (node .pdo ['0x2002' ].raw , 0xf )
61- self .assertEqual (node .tpdo [0x2002 ].raw , 0xf )
62- self .assertEqual (node .pdo [0x1600 ][0x2002 ].raw , 0xf )
63-
64- def test_pdo_save (self ):
65- self .node .tpdo .save ()
66- self .node .rpdo .save ()
67-
68- def test_pdo_export (self ):
79+ if self .use_async :
80+ self .assertEqual (await node .tpdo [1 ]['INTEGER16 value' ].aget_raw (), 12 )
81+ self .assertEqual (await node .tpdo [1 ]['UNSIGNED8 value' ].aget_raw (), 0xe )
82+ self .assertEqual (await node .tpdo [1 ]['INTEGER8 value' ].aget_raw (), - 4 )
83+ self .assertEqual (await node .tpdo [1 ]['INTEGER32 value' ].aget_raw (), 0x56789abc )
84+ self .assertEqual (await node .tpdo ['INTEGER32 value' ].aget_raw (), 0x56789abc )
85+ self .assertEqual (await node .tpdo [1 ]['BOOLEAN value' ].aget_raw (), True )
86+ self .assertEqual (await node .tpdo [1 ]['BOOLEAN value 2' ].aget_raw (), False )
87+
88+ # Test different types of access
89+ self .assertEqual (await node .pdo [0x1600 ]['INTEGER16 value' ].aget_raw (), 12 )
90+ self .assertEqual (await node .pdo ['INTEGER16 value' ].aget_raw (), 12 )
91+ self .assertEqual (await node .pdo .tx [1 ]['INTEGER16 value' ].aget_raw (), 12 )
92+ self .assertEqual (await node .pdo [0x2001 ].aget_raw (), 12 )
93+ self .assertEqual (await node .tpdo [0x2001 ].aget_raw (), 12 )
94+ self .assertEqual (await node .pdo [0x2002 ].aget_raw (), 0xe )
95+ self .assertEqual (await node .pdo ['0x2002' ].aget_raw (), 0xe )
96+ self .assertEqual (await node .tpdo [0x2002 ].aget_raw (), 0xe )
97+ self .assertEqual (await node .pdo [0x1600 ][0x2002 ].aget_raw (), 0xe )
98+ else :
99+ self .assertEqual (node .tpdo [1 ]['INTEGER16 value' ].raw , - 3 )
100+ self .assertEqual (node .tpdo [1 ]['UNSIGNED8 value' ].raw , 0xf )
101+ self .assertEqual (node .tpdo [1 ]['INTEGER8 value' ].raw , - 2 )
102+ self .assertEqual (node .tpdo [1 ]['INTEGER32 value' ].raw , 0x01020304 )
103+ self .assertEqual (node .tpdo ['INTEGER32 value' ].raw , 0x01020304 )
104+ self .assertEqual (node .tpdo [1 ]['BOOLEAN value' ].raw , False )
105+ self .assertEqual (node .tpdo [1 ]['BOOLEAN value 2' ].raw , True )
106+
107+ # Test different types of access
108+ self .assertEqual (node .pdo [0x1600 ]['INTEGER16 value' ].raw , - 3 )
109+ self .assertEqual (node .pdo ['INTEGER16 value' ].raw , - 3 )
110+ self .assertEqual (node .pdo .tx [1 ]['INTEGER16 value' ].raw , - 3 )
111+ self .assertEqual (node .pdo [0x2001 ].raw , - 3 )
112+ self .assertEqual (node .tpdo [0x2001 ].raw , - 3 )
113+ self .assertEqual (node .pdo [0x2002 ].raw , 0xf )
114+ self .assertEqual (node .pdo ['0x2002' ].raw , 0xf )
115+ self .assertEqual (node .tpdo [0x2002 ].raw , 0xf )
116+ self .assertEqual (node .pdo [0x1600 ][0x2002 ].raw , 0xf )
117+
118+ async def test_pdo_save (self ):
119+ await self .set_values ()
120+ if self .use_async :
121+ await self .node .tpdo .asave ()
122+ await self .node .rpdo .asave ()
123+ else :
124+ self .node .tpdo .save ()
125+ self .node .rpdo .save ()
126+
127+ async def test_pdo_export (self ):
69128 try :
70129 import canmatrix
71130 except ImportError :
72- raise unittest . SkipTest ("The PDO export API requires canmatrix" )
131+ self . skipTest ("The PDO export API requires canmatrix" )
73132
74133 for pdo in "tpdo" , "rpdo" :
75134 with tmp_file (suffix = ".csv" ) as tmp :
@@ -82,5 +141,17 @@ def test_pdo_export(self):
82141 self .assertIn ("Frame Name" , header )
83142
84143
144+ class TestPDOSync (TestPDO ):
145+ """ Test the functions in synchronous mode. """
146+ __test__ = True
147+ use_async = False
148+
149+
150+ class TestPDOAsync (TestPDO ):
151+ """ Test the functions in asynchronous mode. """
152+ __test__ = True
153+ use_async = True
154+
155+
85156if __name__ == "__main__" :
86157 unittest .main ()
0 commit comments