-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexec_plan.py
More file actions
316 lines (257 loc) · 10.2 KB
/
Copy pathexec_plan.py
File metadata and controls
316 lines (257 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
"""Parsing script for running high-level commands on the spot.
This code makes it possible to write down a sequence of robot commands
in a text file and have them run on the real robot. The logic for
managing, e.g., the lease for the spot and the relevant data structures
for specifying SE(2) poses can be abstracted away from the high-level
plan provided as input.
"""
import argparse
from typing import Dict, Optional
import numpy as np
import rerun as rr
import yaml
from bosdyn.client import create_standard_sdk, math_helpers
from bosdyn.client.lease import LeaseClient, LeaseKeepAlive
from bosdyn.client.util import authenticate
from numpy.typing import NDArray
from skills.close_cabinet import close_drawer as run_close_drawer
from skills.drop_into_container import drop_into_container as run_drop_into_container
from skills.erase_whiteboard import wipe_online as run_erase_whiteboard
from skills.grasp_vlm import grasp_with_vlm
from skills.open_cabinet import open_drawer as run_open_drawer
from skills.place_at import place_at as run_place_at
from skills.push_button import push_button as run_push_button
from skills.spot_hand_move import (
close_gripper,
move_hand_to_relative_pose,
open_gripper,
)
from skills.spot_navigation import navigate_to_absolute_pose
from skills.wipe import wipe_multiple_strokes
# from skills.wipe_online import wipe_online as run_wipe_online
from skills.wipe_online_iphone import wipe_online as run_wipe_online
from spot_utils.spot_localization import SpotLocalizer
from spot_utils.utils import (
get_graph_nav_dir,
verify_estop,
)
# Hand pose used for the "AHEAD" gaze direction: fixed offset with a pi/3 pitch.
# NOTE(review): presumably expressed relative to the robot body, in meters
# (it is passed to move_hand_to_relative_pose) -- confirm against the skill.
DEFAULT_HAND_LOOK_FLOOR_POSE = math_helpers.SE3Pose(
    x=0.80,
    y=0.0,
    z=0.25,
    rot=math_helpers.Quat.from_pitch(np.pi / 3),
)

# Hand pose used for the "DOWN" gaze direction: same offset, pi/2 pitch.
DEFAULT_HAND_LOOK_STRAIGHT_DOWN_POSE = math_helpers.SE3Pose(
    x=0.80,
    y=0.0,
    z=0.25,
    rot=math_helpers.Quat.from_pitch(np.pi / 2),
)

# Lookup from the direction names accepted by gaze()/gaze_without_open()
# to the hand pose to move to.
direction_to_pose = {
    "DOWN": DEFAULT_HAND_LOOK_STRAIGHT_DOWN_POSE,
    "AHEAD": DEFAULT_HAND_LOOK_FLOOR_POSE,
}

# Pure rotation (pi/2 pitch, zero translation) right-multiplied onto the
# requested pose in grasp_at_pose().
grasp_offset = math_helpers.SE3Pose(0, 0, 0, math_helpers.Quat.from_pitch(np.pi / 2))

# Process-wide handles; populated by init().
LOCALIZER = None
ROBOT = None
SAM_ENDPOINT = None

# Transform parameters ("x", "y", "angle") read by map_to_spot(); filled in
# by the script entry point from the map's metadata.yaml.
SPOT_ROOM_POSE: Dict[str, float] = {}
def np_pose_to_SE3(X_RobEE: NDArray) -> math_helpers.SE3Pose:
    """Build a Spot SDK ``SE3Pose`` from a flat numpy pose array.

    Elements 0-2 are used as the translation (x, y, z). Elements 3-5 and 6
    are used as the quaternion, reordered so that element 6 is passed first
    (``math_helpers.Quat`` takes its scalar ``w`` component first, i.e. the
    input layout is ``[x, y, z, qx, qy, qz, qw]``).
    """
    pos_x, pos_y, pos_z = X_RobEE[0], X_RobEE[1], X_RobEE[2]
    quat_x, quat_y, quat_z, quat_w = X_RobEE[3], X_RobEE[4], X_RobEE[5], X_RobEE[6]
    orientation = math_helpers.Quat(quat_w, quat_x, quat_y, quat_z)
    return math_helpers.SE3Pose(pos_x, pos_y, pos_z, rot=orientation)
def init(hostname: str, map_name: str, endpoint_url: Optional[str]) -> None:
    """Connect to the robot, take its lease and localize it on a map.

    Populates the module-level ``ROBOT``, ``LOCALIZER`` and ``SAM_ENDPOINT``
    globals that the skill wrappers in this module rely on.

    Args:
        hostname: Hostname or IP address of the robot.
        map_name: Name of the graph-nav map folder, resolved through
            ``get_graph_nav_dir``.
        endpoint_url: Value stored verbatim in ``SAM_ENDPOINT``; may be None.
    """
    global LOCALIZER, ROBOT, SAM_ENDPOINT

    # Start the Rerun viewer used for visualization.
    rr.init("spot_plan_execution", spawn=True)

    ROBOT = create_standard_sdk("NavigationSkillTestClient").create_robot(hostname)
    authenticate(ROBOT)
    verify_estop(ROBOT)

    graph_nav_dir = get_graph_nav_dir(map_name)

    # Take the lease and keep it alive; return_at_exit hands it back when
    # the process terminates.
    lease_client = ROBOT.ensure_client(LeaseClient.default_service_name)
    lease_client.take()
    keepalive = LeaseKeepAlive(lease_client, must_acquire=True, return_at_exit=True)

    LOCALIZER = SpotLocalizer(ROBOT, graph_nav_dir, lease_client, keepalive)
    ROBOT.time_sync.wait_for_sync()
    LOCALIZER.localize()

    SAM_ENDPOINT = endpoint_url
def map_to_spot(pose: math_helpers.SE2Pose) -> math_helpers.SE2Pose:
    """Re-express a pose given in the "room" frame in Spot coordinates.

    The pose is left-multiplied by the SE(2) transform built from the
    ``"x"``, ``"y"`` and ``"angle"`` entries of ``SPOT_ROOM_POSE``; a
    ``KeyError`` is raised if those entries have not been set.
    """
    room_transform = math_helpers.SE2Pose(
        x=SPOT_ROOM_POSE["x"],
        y=SPOT_ROOM_POSE["y"],
        angle=SPOT_ROOM_POSE["angle"],
    )
    return room_transform.mult(pose)
def move_to(x_abs: float, y_abs: float, yaw_abs: float) -> None:
    """Navigate the robot to an absolute pose given in the "room" frame.

    The pose is converted to Spot coordinates with ``map_to_spot`` before
    being handed to ``navigate_to_absolute_pose``.

    Args:
        x_abs: Target x coordinate in the room frame.
        y_abs: Target y coordinate in the room frame.
        yaw_abs: Target heading angle in the room frame.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    # Fail loudly instead of silently skipping the motion: a plan that keeps
    # running as if the robot had moved is worse than one that stops.
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    assert LOCALIZER is not None, "Localizer is not initialized; call init(...) first."

    desired_pose = math_helpers.SE2Pose(x=x_abs, y=y_abs, angle=yaw_abs)
    desired_pose_spot = map_to_spot(desired_pose)
    navigate_to_absolute_pose(ROBOT, LOCALIZER, desired_pose_spot)
def gaze(direction: str) -> None:
    """Point the hand in a named direction, then open the gripper.

    Args:
        direction: A key of ``direction_to_pose`` ("DOWN" or "AHEAD"); any
            other value raises ``KeyError``.
    """
    move_hand_to_relative_pose(ROBOT, direction_to_pose[direction])
    open_gripper(ROBOT)
def gaze_without_open(direction: str) -> None:
    """Point the hand in a named direction, leaving the gripper as it is.

    Args:
        direction: A key of ``direction_to_pose`` ("DOWN" or "AHEAD"); any
            other value raises ``KeyError``.
    """
    target_pose = direction_to_pose[direction]
    move_hand_to_relative_pose(ROBOT, target_pose)
def grasp(text_prompt: Optional[str]) -> None:
    """Grasp an object using the VLM-based grasp skill.

    Delegates to ``grasp_with_vlm`` with the global ``ROBOT`` and
    ``LOCALIZER``.

    Args:
        text_prompt: Prompt forwarded to ``grasp_with_vlm``; may be None.
            NOTE(review): how the skill treats a None prompt is defined in
            ``skills.grasp_vlm`` -- confirm there.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    assert LOCALIZER is not None, "Localizer is not initialized; call init(...) first."
    grasp_with_vlm(ROBOT, LOCALIZER, text_prompt)
def grasp_at_pose(X_RobEE: NDArray) -> None:
    """Grasp at a pose given as a numpy array, then retract the hand.

    Sequence: open the gripper, move the hand to the requested pose composed
    with ``grasp_offset``, close the gripper, and finally move the hand to
    ``DEFAULT_HAND_LOOK_FLOOR_POSE``.

    Args:
        X_RobEE: Pose array in the layout accepted by ``np_pose_to_SE3``.
    """
    open_gripper(ROBOT)

    # Apply the fixed grasp rotation on the right of the requested pose.
    grasp_pose = np_pose_to_SE3(X_RobEE).mult(grasp_offset)
    move_hand_to_relative_pose(ROBOT, grasp_pose)

    close_gripper(ROBOT)
    move_hand_to_relative_pose(ROBOT, DEFAULT_HAND_LOOK_FLOOR_POSE)
def place_at_pose(z_above_surface_m: float = 0.1) -> None:
    """Place the currently held object via the ``place_at`` skill.

    The work is done by ``skills.place_at.place_at``. The original notes for
    this wrapper describe that skill as using the iPhone camera and a Gemini
    VLM to find an open region on a table surface (not verifiable from this
    file).

    Args:
        z_above_surface_m: Height above the detected surface at which to
            release the object. Defaults to 0.1 meters.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    release_height_m = z_above_surface_m
    run_place_at(ROBOT, z_above_surface_m=release_height_m)
def drop(z_above_surface_m: float = 0.3) -> None:
    """Drop the currently held object into a container.

    The work is done by ``skills.drop_into_container.drop_into_container``.
    The original notes for this wrapper describe that skill as using the
    iPhone camera and a Gemini VLM to find an open region in a container
    (not verifiable from this file).

    Args:
        z_above_surface_m: Height above the detected surface at which to
            release the object. Defaults to 0.3 meters.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    run_drop_into_container(ROBOT, z_above_surface_m=z_above_surface_m)
def vertical_wipe(
    X_RobEE_start: NDArray, stroke_dx: float, y_delta: float, num_strokes: int
) -> None:
    """Wipe a surface with a series of strokes starting at a given pose.

    Each stroke moves by ``stroke_dx`` with no y component, and consecutive
    strokes are shifted by ``(0, y_delta)``. Every stroke uses a 3.0 duration
    setting and a single attempt.

    Args:
        X_RobEE_start: Start pose array in the layout accepted by
            ``np_pose_to_SE3``.
        stroke_dx: Per-stroke displacement along x.
        y_delta: Shift along y between consecutive strokes.
        num_strokes: Number of strokes to perform.
    """
    stroke_origin = np_pose_to_SE3(X_RobEE_start)
    stroke_settings = dict(
        stroke_dx=stroke_dx,
        stroke_dy=0,
        delta_x_y_between_strokes=(0, y_delta),
        num_strokes=num_strokes,
        duration_per_stroke=3.0,
        num_attempts_per_stroke=1,
    )
    # The same pose is supplied for both pose arguments of the skill.
    wipe_multiple_strokes(ROBOT, stroke_origin, stroke_origin, **stroke_settings)
def wipe_at(*args, **kwargs) -> None:
    """Run the online wipe skill with the skill module's own defaults.

    All positional and keyword arguments are accepted but ignored; the skill
    is always invoked with the global ``ROBOT`` and ``LOCALIZER`` and two
    ``None`` placeholders.
    NOTE(review): the meaning of the two ``None`` arguments is defined by
    ``skills.wipe_online_iphone.wipe_online`` -- confirm there.
    """
    run_wipe_online(ROBOT, None, None, LOCALIZER)
def erase(vlm_query_template: Optional[str] = None) -> None:
    """Erase a whiteboard via ``skills.erase_whiteboard.wipe_online``.

    Args:
        vlm_query_template: When not None, forwarded to the skill as its
            ``vlm_query_template`` keyword; when None the keyword is omitted
            so the skill falls back to its own default.
    """
    # Only pass the keyword when the caller supplied one.
    overrides = (
        {}
        if vlm_query_template is None
        else {"vlm_query_template": vlm_query_template}
    )
    run_erase_whiteboard(ROBOT, None, None, LOCALIZER, **overrides)
def press_button(text_prompt: Optional[str]) -> None:
    """Identify a button and push it via the ``push_button`` skill.

    Args:
        text_prompt: Label passed to the skill. When None or empty, the
            label ``"button"`` is used instead.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    # Fail loudly instead of silently skipping the press when uninitialized.
    # The localizer is not passed to the skill, but it is still required
    # here so the original precondition (both handles set) is preserved.
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    assert LOCALIZER is not None, "Localizer is not initialized; call init(...) first."

    label = text_prompt or "button"
    run_push_button(ROBOT, label=label)
def open_cabinet_drawer(
    standoff_dist: float = 1.1,
    body_height_offset: float = 0.0,
    retreat_offset: float = 0.4,
) -> None:
    """Open a drawer with the high-level open-drawer skill.

    Thin wrapper around ``skills.open_cabinet.open_drawer`` that supplies the
    global ``ROBOT`` and ``LOCALIZER`` and forwards the three tuning
    parameters unchanged as keyword arguments.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    assert LOCALIZER is not None, "Localizer is not initialized; call init(...) first."

    skill_options = dict(
        standoff_dist=standoff_dist,
        body_height_offset=body_height_offset,
        retreat_offset=retreat_offset,
    )
    run_open_drawer(ROBOT, LOCALIZER, **skill_options)
def close_cabinet_drawer(
    standoff_dist: float = 0.8,
    body_height_offset: float = 0.0,
    advance_offset: float = 0.4,
) -> None:
    """Close a drawer with the high-level close-drawer skill.

    Thin wrapper around ``skills.close_cabinet.close_drawer`` that supplies
    the global ``ROBOT`` and ``LOCALIZER`` and forwards the three tuning
    parameters unchanged as keyword arguments.

    Raises:
        AssertionError: If ``init(...)`` has not been called yet.
    """
    assert ROBOT is not None, "Robot is not initialized; call init(...) first."
    assert LOCALIZER is not None, "Localizer is not initialized; call init(...) first."

    skill_options = dict(
        standoff_dist=standoff_dist,
        body_height_offset=body_height_offset,
        advance_offset=advance_offset,
    )
    run_close_drawer(ROBOT, LOCALIZER, **skill_options)
if __name__ == "__main__":
    # Running this script standalone initializes a bosdyn robot and localizer,
    # loads the room-to-spot transform from the map metadata, and then
    # executes the commands in the plan file.
    parser = argparse.ArgumentParser(description="Parse the robot's hostname.")
    parser.add_argument(
        "--hostname",
        type=str,
        required=True,
        help="The robot's hostname/ip-address (e.g. 192.168.80.3)",
    )
    parser.add_argument(
        "--map_name",
        type=str,
        required=True,
        help="The name of the map folder to load (sub-folder under graph_nav_maps)",
    )
    parser.add_argument(
        "--plan", type=str, required=True, help="Path of the Plan to run"
    )
    parser.add_argument(
        "--sam_endpoint",
        type=str,
        required=False,
        help="Address of endpoint hosting GroundedSAM",
    )
    args = parser.parse_args()

    init(args.hostname, args.map_name, args.sam_endpoint)

    with open(get_graph_nav_dir(args.map_name) / "metadata.yaml", "rb") as f:
        # safe_load returns None for an empty document; treat that the same
        # as a metadata file without a "spot-room-pose" entry.
        metadata = yaml.safe_load(f) or {}

    # This runs at module scope, so the assignments below rebind the
    # module-level SPOT_ROOM_POSE that map_to_spot() reads.
    if "spot-room-pose" in metadata:
        SPOT_ROOM_POSE = metadata["spot-room-pose"]
    else:
        print("spot-room-pose not found in metadata.yaml, using default val")
        SPOT_ROOM_POSE = {"x": 0.0, "y": 0.0, "angle": 0.0}

    # Read the whole plan first so the file is closed before the (possibly
    # long-running) plan starts executing.
    with open(args.plan, "r") as plan_file:
        plan_source = plan_file.read()

    # SECURITY: the plan file is executed as arbitrary Python with full access
    # to this module's globals (and the robot). Only run trusted plan files.
    # Compiling with the plan path makes tracebacks point at the plan file.
    exec(compile(plan_source, args.plan, "exec"))
    print("done")