Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions examples/run_put_bowl_teleop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python3
"""
Teleop script for Put Bowl Inside Microwave environment.
"""

import genesis as gs
import torch
from gs_agent.wrappers.teleop_wrapper import KeyboardWrapper
from gs_env.sim.envs.config.registry import EnvArgsRegistry
from gs_env.sim.envs.manipulation.put_bowl_inside_microwave_env import PutBowlInsideMicrowaveEnv


def main() -> None:
"""Run teleop for put bowl inside microwave task."""
print("Initializing Put Bowl Inside Microwave Teleop System...")

# Initialize Genesis
gs.init(
seed=0,
precision="32",
logging_level="info",
backend=gs.cpu, # type: ignore
)

# Create teleop wrapper first (without environment)
print("Creating teleop wrapper...")
teleop_wrapper = KeyboardWrapper(
env=None,
device=torch.device("cpu"),
movement_speed=0.01, # Position movement speed
rotation_speed=0.05, # Rotation speed
trajectory_filename_prefix="put_bowl_microwave_",
)

# Start teleop wrapper (keyboard listener) FIRST, before creating Genesis scene
teleop_wrapper.start() # type: ignore

# Create task environment AFTER teleop wrapper is running
env = PutBowlInsideMicrowaveEnv(
args=EnvArgsRegistry["put_bowl_inside_microwave_default"],
device=torch.device("cpu"),
)
teleop_wrapper.set_environment(env)

print("\n" + "=" * 50)
print("Put Bowl Inside Microwave TELEOP SYSTEM READY")
print("=" * 50)
print("📝 TRAJECTORY RECORDING INSTRUCTIONS:")
print(" 1. Press 'r' to start recording (anytime)")
print(" 2. Move robot with arrow keys, n/m, space")
print(" 3. Press 'r' again to stop recording and save")
print(" 💡 Recording works from any state now!")
print("=" * 50)

# Run the main control loop in the main thread (Genesis viewer requires this)
try:
step_count = 0
while teleop_wrapper.running:
# Step the teleop wrapper (this processes input and steps environment)
teleop_wrapper.step(torch.tensor([]))
step_count += 1

# Check for quit command
if (
hasattr(teleop_wrapper, "last_command")
and teleop_wrapper.last_command
and hasattr(teleop_wrapper.last_command, "quit_teleop")
and teleop_wrapper.last_command.quit_teleop
):
print("Quit command received, exiting...")
break

# Safety check - exit after 1 hour of running
if step_count > 180000: # 1 hour at 50Hz
print("Maximum runtime reached, exiting...")
break

except KeyboardInterrupt:
print("\n👋 Teleop interrupted by user")

finally:
# Cleanup
teleop_wrapper.stop()
print("✅ Teleop session ended")


if __name__ == "__main__":
main()
6 changes: 6 additions & 0 deletions src/env/gs_env/sim/envs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .manipulation import GoalReachingEnv, PutBowlInsideMicrowaveEnv

__all__ = [
"GoalReachingEnv",
"PutBowlInsideMicrowaveEnv",
]
11 changes: 11 additions & 0 deletions src/env/gs_env/sim/envs/config/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,3 +56,14 @@
reward_args={},
img_resolution=(480, 270),
)


EnvArgsRegistry["put_bowl_inside_microwave_default"] = EnvArgs(
gs_init_args=GenesisInitArgsRegistry["default"],
scene_args=SceneArgsRegistry["flat_scene_default"],
robot_args=RobotArgsRegistry["franka_teleop"],
objects_args=[], # Objects are created directly in the environment
sensors_args=[],
reward_args={},
img_resolution=(480, 270),
)
2 changes: 2 additions & 0 deletions src/env/gs_env/sim/envs/manipulation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from .goal_reaching_env import GoalReachingEnv
from .put_bowl_inside_microwave_env import PutBowlInsideMicrowaveEnv

__all__ = [
"GoalReachingEnv",
"PutBowlInsideMicrowaveEnv",
]
269 changes: 269 additions & 0 deletions src/env/gs_env/sim/envs/manipulation/put_bowl_inside_microwave_env.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
import random
from typing import Any

import genesis as gs
import torch

from gs_env.common.bases.base_env import BaseEnv
from gs_env.sim.envs.config.schema import EnvArgs
from gs_env.sim.robots.config.schema import EEPoseAbsAction
from gs_env.sim.robots.manipulators import FrankaRobot

_DEFAULT_DEVICE = torch.device("cpu")


class PutBowlInsideMicrowaveEnv(BaseEnv):
"""Put bowl inside microwave environment."""

def __init__(
self,
args: EnvArgs,
device: torch.device = _DEFAULT_DEVICE,
) -> None:
super().__init__(device=device)
self._device = device
self._num_envs = 1 # Single environment for teleop
FPS = 60
# Create Genesis scene
self.scene = gs.Scene(
sim_options=gs.options.SimOptions(
substeps=4,
dt=1 / FPS,
),
rigid_options=gs.options.RigidOptions(
enable_joint_limit=True,
enable_collision=True,
gravity=(0, 0, -9.8),
box_box_detection=True,
constraint_timeconst=0.02,
),
viewer_options=gs.options.ViewerOptions(
camera_pos=(1.5, 0.0, 0.7),
camera_lookat=(0.2, 0.0, 0.1),
camera_fov=50,
max_FPS=200,
),
show_viewer=True, # Enable viewer for visualization
show_FPS=False,
)

# Add entities
self.entities = {}

# Ground plane
self.entities["plane"] = self.scene.add_entity(gs.morphs.Plane())

# SO101 robot
self.entities["robot"] = FrankaRobot(
num_envs=self._num_envs,
scene=self.scene, # use flat scene
args=args.robot_args,
device=self.device,
)

# Table
self.entities["table"] = self.scene.add_entity(
morph=gs.morphs.Box(
pos=(0.0, 0.0, 0.05),
size=(0.6, 0.6, 0.1),
),
)

# Bowl (using the winter_bowl.glb from assets)
self.entities["bowl"] = self.scene.add_entity(
morph=gs.morphs.Mesh(
file="assets/winter_bowl.glb",
pos=(0.05, -0.2, 0.15),
euler=(90, 0, 90),
scale=1 / 5000,
collision=True,
),
)

# Microwave (using the 7310 URDF from assets)
self.entities["microwave"] = self.scene.add_entity(
morph=gs.morphs.URDF(
file="assets/7310/mobility.urdf",
pos=(0.2, 0.2, 0.18),
euler=(0, 0, 30),
scale=0.3,
collision=True,
merge_fixed_links=True,
convexify=False,
),
)

self.entities["ee_frame"] = self.scene.add_entity(
morph=gs.morphs.Mesh(
file="meshes/axis.obj",
scale=0.15,
collision=False,
),
)

# Build scene
self.scene.build(n_envs=1)

# Command handling
self.last_command = None

# Store entities for easy access
self.robot = self.entities["robot"]

# Initialize with randomized positions
self._randomize_objects()

# Track current target point for visualization
self.current_target_pos = None

def initialize(self) -> None:
"""Initialize the environment."""
# Set bowl mass
self.entities["bowl"].set_mass(0.01)

# Set microwave door damping
# Set damping for microwave (8 DOFs: 3 pos + 4 quat + 1 joint)
self.entities["microwave"].set_dofs_damping([0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 1.0])

def _randomize_objects(self) -> None:
"""Randomize object positions."""
# Randomize bowl position on table
bowl_x = random.uniform(-0.1, 0.1)
bowl_y = random.uniform(-0.2, 0.0)
bowl_pos = torch.tensor([bowl_x, bowl_y, 0.15], dtype=torch.float32)
self.entities["bowl"].set_pos(bowl_pos)

# Microwave position is fixed
microwave_pos = torch.tensor([0.2, 0.2, 0.18], dtype=torch.float32)
self.entities["microwave"].set_pos(microwave_pos)

def apply_action(self, action: torch.Tensor | Any) -> None:
"""Apply action to the environment (BaseEnv requirement)."""
# For teleop, action might be a command object instead of tensor
if isinstance(action, torch.Tensor):
# Empty tensor from teleop wrapper - no action to apply
pass
else:
# This is a command object from teleop
self.last_command = action

pos_quat = torch.concat([action.position, action.orientation], -1)
self.entities["ee_frame"].set_qpos(pos_quat)
# Apply action to robot

robot_action = EEPoseAbsAction(
ee_link_pos=action.position,
ee_link_quat=action.orientation,
gripper_width=0.0 if action.gripper_close else 0.04,
)
self.entities["robot"].apply_action(robot_action)

# Handle special commands
if hasattr(action, "reset_scene") and action.reset_scene:
self.reset_idx(torch.IntTensor([0]))
elif hasattr(action, "quit_teleop") and action.quit_teleop:
print("Quit command received from teleop")

# Step the scene (like goal_reaching_env)
self.scene.step()

def get_observations(self) -> torch.Tensor:
"""Get current observation as tensor (BaseEnv requirement)."""
ee_pose = self.entities["robot"].ee_pose
joint_pos = self.entities["robot"].joint_positions

# Get bowl position and orientation
bowl_pos = self.entities["bowl"].get_pos()
bowl_quat = self.entities["bowl"].get_quat()

# Get microwave position and orientation
microwave_pos = self.entities["microwave"].get_pos()
microwave_quat = self.entities["microwave"].get_quat()

# Concatenate all observations into a single tensor
obs_tensor = torch.cat(
[
ee_pose, # 7 values: [x, y, z, qw, qx, qy, qz]
joint_pos, # 7 values: joint positions
bowl_pos, # 3 values: bowl position
bowl_quat, # 4 values: bowl quaternion [w, x, y, z]
microwave_pos, # 3 values: microwave position
microwave_quat, # 4 values: microwave quaternion [w, x, y, z]
],
dim=-1,
)

return obs_tensor

def get_ee_pose(self) -> tuple[torch.Tensor, torch.Tensor]:
"""Get end-effector pose for teleop wrapper."""
robot_pos = self.entities["robot"].ee_pose
return robot_pos[..., :3], robot_pos[..., 3:]

def get_extra_infos(self) -> dict[str, Any]:
"""Get extra information."""
return {}

def get_terminated(self) -> torch.Tensor:
"""Get termination status."""
return torch.tensor([False])

def get_truncated(self) -> torch.Tensor:
"""Get truncation status."""
return torch.tensor([False])

def get_reward(self) -> tuple[torch.Tensor, dict[str, torch.Tensor]]:
"""Get reward."""
return torch.tensor([0.0]), {}

def is_episode_complete(self) -> torch.Tensor:
"""Check if episode is complete - bowl is inside microwave."""
# Get AABB (Axis-Aligned Bounding Box) for both objects
bowl_aabb = self.entities["bowl"].get_AABB() # [2, 3] - min and max corners
microwave_aabb = self.entities["microwave"].get_AABB() # [2, 3] - min and max corners

# Check if bowl is inside microwave using AABB intersection
# Bowl is inside if its AABB is completely contained within microwave AABB
bowl_min, bowl_max = bowl_aabb[0], bowl_aabb[1]
microwave_min, microwave_max = microwave_aabb[0], microwave_aabb[1]

# Check if bowl is inside microwave (with some tolerance)
tolerance = 0.05 # 5cm tolerance
is_inside = torch.all(
(bowl_min >= microwave_min - tolerance) & (bowl_max <= microwave_max + tolerance)
)

return torch.tensor([is_inside])

def reset_idx(self, envs_idx: Any) -> None:
"""Reset environment."""
# Clear any existing debug objects
self.scene.clear_debug_objects()

# Reset robot to natural pose
initial_q = torch.tensor(
[0.0, -0.3, 0.5, 0.0, 0.0, 0.0, 0.0], dtype=torch.float32
) # 7 joints to match registry format
self.entities["robot"].reset_to_pose(initial_q)

# Reset microwave door to closed position
current_qpos = self.entities["microwave"].get_qpos()
# Keep position and orientation, but set door joint to 0 (closed)
reset_qpos = current_qpos.clone()
reset_qpos[0, 7] = 0.0 # Set door joint to closed position
self.entities["microwave"].set_qpos(reset_qpos)

# Randomize object positions
self._randomize_objects()

# Reset target visualization
self.current_target_pos = None

def step(self) -> None:
"""Step the environment."""
self.scene.step()

@property
def num_envs(self) -> int:
"""Get number of environments."""
return self._num_envs