1
- import time
2
-
3
1
import pytest
2
+ import rclpy
4
3
from moveit_pro_test_utils .objective_test_fixture import (
5
4
ExecuteObjectiveResource ,
6
5
execute_objective_resource as execute_objective_resource ,
23
22
"Plan and Save Trajectory" ,
24
23
"Record and Replay Scanning Motion" ,
25
24
}
25
+
26
26
skip_objectives = {
27
27
"Grasp Planning" ,
28
28
"Joint Diagnostic" ,
34
34
}
35
35
36
36
37
+ def wait_for_joint_states (timeout : int = 180 ):
38
+ node = rclpy .create_node ("test_node" )
39
+ start_time = node .get_clock ().now ()
40
+ while (node .get_clock ().now () - start_time ).to_msg ().sec < timeout :
41
+ if (
42
+ node .count_publishers ("/joint_states" ) > 0
43
+ and node .count_publishers ("/wrist_camera/color" ) > 0
44
+ and node .count_publishers ("/display_contacts" )
45
+ ):
46
+ return True
47
+ rclpy .spin_once (node , timeout_sec = 0.1 )
48
+ raise TimeoutError (
49
+ "Timeout waiting for /joint_states and /wrist_camera/color and /display_contacts to start publishing"
50
+ )
51
+
52
+
37
53
@pytest .mark .parametrize (
38
54
"objective_id, should_cancel" ,
39
55
get_objective_pytest_params ("lab_sim" , cancel_objectives , skip_objectives ),
@@ -43,4 +59,5 @@ def test_all_objectives(
43
59
should_cancel : bool ,
44
60
execute_objective_resource : ExecuteObjectiveResource ,
45
61
):
62
+ wait_for_joint_states ()
46
63
run_objective (objective_id , should_cancel , execute_objective_resource )
0 commit comments