|
5 | 5 | from scanspec.specs import Fly, Line |
6 | 6 |
|
7 | 7 | from ophyd_async.core import ( |
| 8 | + DEFAULT_TIMEOUT, |
8 | 9 | get_mock, |
| 10 | + set_and_wait_for_value, |
9 | 11 | set_mock_value, |
10 | 12 | ) |
11 | 13 | from ophyd_async.epics.motor import Motor |
@@ -54,22 +56,39 @@ async def test_pmac_move_to_start(sim_motors: tuple[PmacIO, Motor, Motor]): |
54 | 56 | ramp_up_position = {sim_x_motor: np.float64(-1.2), sim_y_motor: np.float64(-0.6)} |
55 | 57 | pmac_trajectory = PmacTrajectoryTriggerLogic(pmac_io) |
56 | 58 |
|
57 | | - await pmac_trajectory._move_to_start(motor_info, ramp_up_position) |
58 | | - |
59 | | - coord_mock_calls = get_mock(coord).mock_calls |
60 | | - |
61 | | - assert coord_mock_calls[0] == call.defer_moves.put(True, wait=True) |
62 | | - assert coord_mock_calls[1] == ( |
63 | | - "cs_axis_setpoint.7.put", |
64 | | - (np.float64(-1.2)), |
65 | | - {"wait": True}, |
66 | | - ) |
67 | | - assert coord_mock_calls[2] == ( |
68 | | - "cs_axis_setpoint.8.put", |
69 | | - (np.float64(-0.6)), |
70 | | - {"wait": True}, |
71 | | - ) |
72 | | - assert coord_mock_calls[3] == call.defer_moves.put(False, wait=True) |
| 59 | + # Wrap set_and_wait_for_value to check passed arguments |
| 60 | + with patch( |
| 61 | + "ophyd_async.epics.pmac._pmac_trajectory.set_and_wait_for_value", |
| 62 | + wraps=set_and_wait_for_value, |
| 63 | + ) as spy_set_and_wait_for_value: |
| 64 | + await pmac_trajectory._move_to_start(motor_info, ramp_up_position) |
| 65 | + |
| 66 | + coord_mock_calls = get_mock(coord).mock_calls |
| 67 | + |
| 68 | + assert coord_mock_calls[0] == call.defer_moves.put(True, wait=True) |
| 69 | + assert coord_mock_calls[1] == ( |
| 70 | + "cs_axis_setpoint.7.put", |
| 71 | + (np.float64(-1.2)), |
| 72 | + {"wait": True}, |
| 73 | + ) |
| 74 | + assert coord_mock_calls[2] == ( |
| 75 | + "cs_axis_setpoint.8.put", |
| 76 | + (np.float64(-0.6)), |
| 77 | + {"wait": True}, |
| 78 | + ) |
| 79 | + assert coord_mock_calls[3] == call.defer_moves.put(False, wait=True) |
| 80 | + |
| 81 | + # Longest move time should be sim_motor_x, so calculate this |
| 82 | + expected_timeout = DEFAULT_TIMEOUT + ( |
| 83 | + abs(ramp_up_position[sim_x_motor]) |
| 84 | + / motor_info.motor_max_velocity[sim_x_motor] |
| 85 | + ) |
| 86 | + |
| 87 | + # All motors should have the same move timeout |
| 88 | + assert all( |
| 89 | + call_.kwargs["set_timeout"] == expected_timeout |
| 90 | + for call_ in spy_set_and_wait_for_value.mock_calls |
| 91 | + ) |
73 | 92 |
|
74 | 93 |
|
75 | 94 | async def test_pmac_trajectory_kickoff( |
|
0 commit comments