diff --git a/docker/Dockerfile.leapmotion_teleop b/docker/Dockerfile.leapmotion_teleop
new file mode 100644
index 00000000..de6fdc1b
--- /dev/null
+++ b/docker/Dockerfile.leapmotion_teleop
@@ -0,0 +1,157 @@
+ARG BASE_IMAGE=nvcr.io/nvidia/isaac-sim:5.0.0
+
+FROM ${BASE_IMAGE}
+
+# GR00T policy build arguments; these are only used if INSTALL_GROOT is true.
+ARG INSTALL_GROOT=false
+ARG GROOT_DEPS_GROUP=base
+
+ARG WORKDIR="/workspace"
+ENV WORKDIR=${WORKDIR}
+WORKDIR "${WORKDIR}"
+
+# Hide conflicting Vulkan files, if needed.
+RUN if [ -e "/usr/share/vulkan" ] && [ -e "/etc/vulkan" ]; then \
+        mv /usr/share/vulkan /usr/share/vulkan_hidden; \
+    fi
+
+# Install apt packages
+RUN apt-get update && apt-get install -y \
+    git \
+    git-lfs \
+    cmake \
+    sudo \
+    python3-pip
+
+# Update pip to the latest version
+RUN pip3 install --upgrade pip
+
+################################
+# Install Isaac Lab
+################################
+# Split copying into separate instructions
+# to avoid docker cache invalidation on code edits.
+COPY ./submodules/IsaacLab ${WORKDIR}/submodules/IsaacLab
+# Set the ISAACLAB_PATH environment variable
+ENV ISAACLAB_PATH=${WORKDIR}/submodules/IsaacLab
+ENV TERM=xterm
+# Symlink Isaac Sim into IsaacLab
+RUN ln -s /isaac-sim/ ${WORKDIR}/submodules/IsaacLab/_isaac_sim
+# Install IsaacLab dependencies
+RUN for DIR in ${WORKDIR}/submodules/IsaacLab/source/isaaclab*/; do pip install --no-deps -e "$DIR"; done
+# Logs and other files are written under dist-packages by default, so this directory has to be writeable.
+RUN chmod -R 777 /isaac-sim/kit/
+# Install isaaclab
+RUN ${ISAACLAB_PATH}/isaaclab.sh -i
+
+# Install pip dependencies
+# NOTE(alexmillane, 2025-07-22): Dependencies are installed in the IsaacSim version of python.
+RUN /isaac-sim/python.sh -m pip install --upgrade pip && \
+    /isaac-sim/python.sh -m pip install \
+    pytest \
+    jupyter \
+    # NOTE(xinjieyao, 2025-09-15): Needed for inheritance of torch nn.Module, saving/loading models
+    typing_extensions \
+    # NOTE(xinjieyao, 2025-09-15): Needed for WBC running the ONNX-compiled policy
+    onnxruntime
+
+# Lightwheel Labs (lwlabs) dependencies
+RUN /isaac-sim/python.sh -m pip install --upgrade pip && \
+    /isaac-sim/python.sh -m pip install \
+    vuer[all] \
+    lightwheel-sdk
+
+# Lightwheel server
+ENV LW_API_ENDPOINT="https://api-dev.lightwheel.net"
+
+###############################
+# Install GR00T and CUDA 12.8 #
+###############################
+# Copy CUDA 12.8 installation script
+COPY docker/setup/install_cuda.sh /tmp/install_cuda.sh
+RUN chmod +x /tmp/install_cuda.sh
+# If GR00T deps are to be installed, install CUDA 12.8 (needed for flash-attn, and to avoid the 4G memory add-on)
+RUN if [ "$INSTALL_GROOT" = "true" ]; then \
+        /tmp/install_cuda.sh; \
+    else \
+        echo "Skipping CUDA 12.8 installation"; \
+    fi && \
+    rm -f /tmp/install_cuda.sh
+# Copy GR00T
+COPY ./submodules/Isaac-GR00T ${WORKDIR}/submodules/Isaac-GR00T
+# Copy GR00T dependencies installation script
+COPY docker/setup/install_gr00t_deps.sh /tmp/install_gr00t_deps.sh
+RUN chmod +x /tmp/install_gr00t_deps.sh
+# Install GR00T dependencies if requested
+RUN if [ "$INSTALL_GROOT" = "true" ]; then \
+        /tmp/install_gr00t_deps.sh; \
+        # Fix damage to /isaac-sim/exts/omni.isaac.ml_archive/pip_prebundle/torch
+        rm -rf /isaac-sim/exts/omni.isaac.ml_archive/pip_prebundle/torch; \
+    else \
+        echo "Skipping GR00T installation"; \
+    fi && \
+    # Clean up installation scripts
+    rm -f /tmp/install_gr00t_deps.sh
+
+# Copy the rest of the files
+COPY *.* ${WORKDIR}/
+COPY isaac_arena ${WORKDIR}/isaac_arena
+COPY docs ${WORKDIR}/docs
+
+# Install Isaac Arena
+RUN /isaac-sim/python.sh -m pip install -e ${WORKDIR}/
+
+# Set aliases to run the Isaac Lab python interpreter.
+RUN echo "alias python='/isaac-sim/python.sh'" >> /etc/bash.bashrc
+RUN echo "alias pip3='/isaac-sim/python.sh -m pip'" >> /etc/bash.bashrc
+RUN echo "alias pytest='/isaac-sim/python.sh -m pytest'" >> /etc/bash.bashrc
+
+# Debugging with debugpy
+# Usage:
+# 1) Set your breakpoints.
+# 2) Start the script you want to debug with "debugpy" instead of "python".
+#    It will pause waiting for the debugger to attach.
+# 3) Attach to the running container with VSCode using the "Attach to debugpy session"
+#    configuration from the Run and Debug panel.
+RUN pip3 install debugpy
+RUN echo "alias debugpy='python -Xfrozen_modules=off -m debugpy --listen localhost:5678 --wait-for-client'" >> /etc/bash.bashrc
+
+####################################################################################################
+# Install dependencies for Leap Motion teleop
+####################################################################################################
+RUN apt-get install -y wget
+
+RUN wget -qO - https://repo.ultraleap.com/keys/apt/gpg | gpg --dearmor | tee /etc/apt/trusted.gpg.d/ultraleap.gpg
+RUN echo 'deb [arch=amd64] https://repo.ultraleap.com/apt stable main' | tee /etc/apt/sources.list.d/ultraleap.list
+# Pre-accept the UltraLeap license and install
+RUN apt-get update && \
+    echo 'ultraleap-hand-tracking ultraleap-hand-tracking/license-agreement boolean true' | debconf-set-selections && \
+    DEBIAN_FRONTEND=noninteractive apt-get install -y ultraleap-hand-tracking
+
+# Install the build module first
+RUN /isaac-sim/kit/python/bin/python3 -m pip install build
+
+# Clone and install the leapc python bindings
+RUN git clone https://github.com/ultraleap/leapc-python-bindings /opt/leapc-python-bindings && \
+    cd /opt/leapc-python-bindings && \
+    /isaac-sim/kit/python/bin/python3 -m pip install -r requirements.txt && \
+    /isaac-sim/kit/python/bin/python3 -m build leapc-cffi && \
+    /isaac-sim/kit/python/bin/python3 -m pip install leapc-cffi/dist/leapc_cffi-0.0.1.tar.gz && \
+    /isaac-sim/kit/python/bin/python3 -m pip install -e leapc-python-api/
+
+RUN /isaac-sim/kit/python/bin/python3 -m pip install numpy==1.26.0
+####################################################################################################
+####################################################################################################
+
+# Change the prompt so it's obvious we're inside the arena container
+RUN echo "PS1='[Isaac Arena] \[\e[0;32m\]~\u \[\e[0;34m\]\w\[\e[0m\] \$ '" >> /etc/bash.bashrc
+# List files as a table, with colors, show hidden files, append indicators (/ for dirs, * for executables, @ for symlinks)
+RUN echo "alias ll='ls -alF --color=auto'" >> /etc/bash.bashrc
+# Go one level up
+RUN echo "alias ..='cd ..'" >> /etc/bash.bashrc
+# Use the colored prompt for root too
+RUN cp /etc/bash.bashrc /root/.bashrc
+
+# Set the entrypoint
+COPY docker/setup/entrypoint.sh /entrypoint.sh
+ENTRYPOINT ["/entrypoint.sh"]
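For orientation, a minimal sketch of how this image might be built and entered. The tag is illustrative, INSTALL_GROOT is optional, and a real teleop session will additionally need the Leap Motion device exposed to the container (e.g. via udev rules or --privileged):

```bash
# Hypothetical tag; adjust to your registry conventions.
docker build -f docker/Dockerfile.leapmotion_teleop \
    --build-arg INSTALL_GROOT=true \
    -t isaac-arena:leapmotion-teleop .
docker run --rm -it --gpus all isaac-arena:leapmotion-teleop
```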
diff --git a/isaaclab_arena/scripts/record_demos_g1_leapmotion.py b/isaaclab_arena/scripts/record_demos_g1_leapmotion.py
new file mode 100644
index 00000000..0c09326d
--- /dev/null
+++ b/isaaclab_arena/scripts/record_demos_g1_leapmotion.py
@@ -0,0 +1,652 @@
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Script to record demonstrations with Isaac Lab environments using human teleoperation.
+
+This script allows users to record demonstrations performed via human teleoperation for a specified task.
+The recorded demonstrations are stored as episodes in an HDF5 file. Users can specify the task, teleoperation
+device, dataset directory, and environment stepping rate through command-line arguments.
+
+required arguments:
+    --task                Name of the task.
+
+optional arguments:
+    -h, --help            Show this help message and exit
+    --teleop_device       Device for interacting with the environment. (default: leapmotion)
+    --dataset_file        File path to export recorded demos. (default: "./datasets/dataset.hdf5")
+    --step_hz             Environment stepping rate in Hz. (default: 30)
+    --num_demos           Number of demonstrations to record. (default: 0)
+    --num_success_steps   Number of continuous steps with task success for concluding a demo as successful. (default: 10)
+"""
+
+"""Launch Isaac Sim Simulator first."""
+
+# Standard library imports
+import contextlib
+
+# Isaac Lab AppLauncher
+from isaaclab.app import AppLauncher
+
+from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser
+from isaaclab_arena.examples.example_environments.cli import (
+    add_example_environments_cli_args,
+    get_arena_builder_from_cli,
+)
+
+# add argparse arguments
+parser = get_isaaclab_arena_cli_parser()
+parser.add_argument(
+    "--dataset_file", type=str, default="./datasets/dataset.hdf5", help="File path to export recorded demos."
+)
+parser.add_argument("--step_hz", type=int, default=30, help="Environment stepping rate in Hz.")
+parser.add_argument(
+    "--num_demos", type=int, default=0, help="Number of demonstrations to record. Set to 0 for infinite."
+)
+parser.add_argument(
+    "--num_success_steps",
+    type=int,
+    default=10,
+    help="Number of continuous steps with task success for concluding a demo as successful. Default is 10.",
+)
+parser.add_argument(
+    "--enable_pinocchio",
+    action="store_true",
+    default=False,
+    help="Enable Pinocchio.",
+)
+parser.add_argument("--teleop_device", type=str, default="leapmotion", help="The teleop device to use.")
+
+# Add the example environments CLI args
+# NOTE(alexmillane, 2025.09.04): This has to be added last because
+# app-specific flags are parsed after the global flags.
+add_example_environments_cli_args(parser)
+
+# parse the arguments
+args_cli = parser.parse_args()
+
+app_launcher_args = vars(args_cli)
+
+if args_cli.enable_pinocchio:
+    # Import pinocchio before AppLauncher to force the use of the version installed by IsaacLab and not the one installed by Isaac Sim.
+    # pinocchio is required by the Pink IK controllers and the GR1T2 retargeter.
+    import pinocchio  # noqa: F401
+
+    app_launcher_args["xr"] = False
+
+# launch the simulator
+app_launcher = AppLauncher(args_cli)
+simulation_app = app_launcher.app
+
+"""Rest everything follows."""
+
+
+# Third-party imports
+import gymnasium as gym
+import os
+import time
+import torch
+
+import isaaclab_mimic.envs  # noqa: F401
+
+# Omniverse logger
+import omni.log
+import omni.usd
+from isaaclab.devices import Se3Keyboard, Se3KeyboardCfg
+from isaaclab.devices.openxr import remove_camera_configs
+from omni.kit.viewport.utility import get_viewport_from_window_name
+
+# Imports have to follow simulation startup.
+
+if args_cli.enable_pinocchio:
+    import isaaclab_tasks.manager_based.manipulation.pick_place  # noqa: F401
+
+import isaaclab_tasks  # noqa: F401
+from isaaclab.envs import DirectRLEnvCfg, ManagerBasedRLEnvCfg
+from isaaclab.envs.mdp.recorders.recorders_cfg import ActionStateRecorderManagerCfg
+from isaaclab.managers import DatasetExportMode
+
+
+class RateLimiter:
+    """Convenience class for enforcing rates in loops."""
+
+    def __init__(self, hz: int):
+        """Initialize a RateLimiter with specified frequency.
+
+        Args:
+            hz: Frequency to enforce in Hertz.
+        """
+        self.hz = hz
+        self.last_time = time.time()
+        self.sleep_duration = 1.0 / hz
+        self.render_period = min(0.033, self.sleep_duration)
+
+    def sleep(self, env: gym.Env):
+        """Attempt to sleep at the specified rate in hz.
+
+        Args:
+            env: Environment to render during sleep periods.
+        """
+        next_wakeup_time = self.last_time + self.sleep_duration
+        while time.time() < next_wakeup_time:
+            time.sleep(self.render_period)
+            env.sim.render()
+
+        self.last_time = self.last_time + self.sleep_duration
+
+        # detect time jumping forwards (e.g. loop is too slow)
+        if self.last_time < time.time():
+            while self.last_time < time.time():
+                self.last_time += self.sleep_duration
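A quick usage sketch of the RateLimiter above; `env` and `actions` are assumed to exist, mirroring the main loop later in this script:

```python
# Sketch only: cap a teleop loop at 30 Hz while keeping the viewport alive.
rate_limiter = RateLimiter(hz=30)
while simulation_app.is_running():
    env.step(actions)        # one control step
    rate_limiter.sleep(env)  # renders while waiting for the next tick
```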
+ """ + stage = omni.usd.get_context().get_stage() + if stage is None: + omni.log.error("No USD stage found") + return False + + # Try each camera path + for path in camera_paths_to_try: + if stage.GetPrimAtPath(path): + omni.log.info(f"Found camera at: {path}") + return set_viewport_to_camera(path) + + omni.log.warn("Could not find any of the specified cameras") + return False + + +def setup_output_directories() -> tuple[str, str]: + """Set up output directories for saving demonstrations. + + Creates the output directory if it doesn't exist and extracts the file name + from the dataset file path. + + Returns: + tuple[str, str]: A tuple containing: + - output_dir: The directory path where the dataset will be saved + - output_file_name: The filename (without extension) for the dataset + """ + # get directory path and file name (without extension) from cli arguments + output_dir = os.path.dirname(args_cli.dataset_file) + output_file_name = os.path.splitext(os.path.basename(args_cli.dataset_file))[0] + + # create directory if it does not exist + if not os.path.exists(output_dir): + os.makedirs(output_dir) + print(f"Created output directory: {output_dir}") + + return output_dir, output_file_name + + +def create_environment_config( + output_dir: str, output_file_name: str +) -> tuple[ManagerBasedRLEnvCfg | DirectRLEnvCfg, str, object | None]: + """Create and configure the environment configuration. + + Parses the environment configuration and makes necessary adjustments for demo recording. + Extracts the success termination function and configures the recorder manager. + + Args: + output_dir: Directory where recorded demonstrations will be saved + output_file_name: Name of the file to store the demonstrations + + Returns: + tuple[isaaclab_tasks.utils.parse_cfg.EnvCfg, Optional[object]]: A tuple containing: + - env_cfg: The configured environment configuration + - success_term: The success termination object or None if not available + + Raises: + Exception: If parsing the environment configuration fails + """ + # parse configuration + try: + arena_builder = get_arena_builder_from_cli(args_cli) + env_name, env_cfg = arena_builder.build_registered() + + except Exception as e: + omni.log.error(f"Failed to parse environment configuration: {e}") + exit(1) + + # extract success checking function to invoke in the main loop + success_term = None + if hasattr(env_cfg.terminations, "success"): + success_term = env_cfg.terminations.success + env_cfg.terminations.success = None + else: + omni.log.warn( + "No success termination term was found in the environment." + " Will not be able to mark recorded demos as successful." 
+ ) + + if args_cli.xr: + # If cameras are not enabled and XR is enabled, remove camera configs + if not args_cli.enable_cameras: + env_cfg = remove_camera_configs(env_cfg) + env_cfg.sim.render.antialiasing_mode = "DLSS" + + # modify configuration such that the environment runs indefinitely until + # the goal is reached or other termination conditions are met + env_cfg.terminations.time_out = None + env_cfg.observations.policy.concatenate_terms = False + + env_cfg.recorders: ActionStateRecorderManagerCfg = ActionStateRecorderManagerCfg() + env_cfg.recorders.dataset_export_dir_path = output_dir + env_cfg.recorders.dataset_filename = output_file_name + env_cfg.recorders.dataset_export_mode = DatasetExportMode.EXPORT_SUCCEEDED_ONLY + + return env_cfg, env_name, success_term + + +def create_environment(env_cfg: ManagerBasedRLEnvCfg | DirectRLEnvCfg, env_name: str) -> gym.Env: + """Create the environment from the configuration. + + Args: + env_cfg: The environment configuration object that defines the environment properties. + This should be an instance of EnvCfg created by parse_env_cfg(). + + Returns: + gym.Env: A Gymnasium environment instance for the specified task. + + Raises: + Exception: If environment creation fails for any reason. + """ + try: + env = gym.make(env_name, cfg=env_cfg).unwrapped + return env + except Exception as e: + omni.log.error(f"Failed to create environment: {e}") + exit(1) + + +def process_success_condition(env: gym.Env, success_term: object | None, success_step_count: int) -> tuple[int, bool]: + """Process the success condition for the current step. + + Checks if the environment has met the success condition for the required + number of consecutive steps. Marks the episode as successful if criteria are met. + + Args: + env: The environment instance to check + success_term: The success termination object or None if not available + success_step_count: Current count of consecutive successful steps + + Returns: + tuple[int, bool]: A tuple containing: + - updated success_step_count: The updated count of consecutive successful steps + - success_reset_needed: Boolean indicating if reset is needed due to success + """ + if success_term is None: + return success_step_count, False + + if bool(success_term.func(env, **success_term.params)[0]): + success_step_count += 1 + if success_step_count >= args_cli.num_success_steps: + env.recorder_manager.record_pre_reset([0], force_export_or_skip=False) + env.recorder_manager.set_success_to_episodes( + [0], torch.tensor([[True]], dtype=torch.bool, device=env.device) + ) + env.recorder_manager.export_episodes([0]) + print("Success condition met! Recording completed.") + return success_step_count, True + else: + success_step_count = 0 + + return success_step_count, False + + +def handle_reset(env: gym.Env, success_step_count: int) -> int: + """Handle resetting the environment. + + Resets the environment, recorder manager, and related state variables. + Updates the instruction display with current status. 
+ + Args: + env: The environment instance to reset + success_step_count: Current count of consecutive successful steps + label_text: Text to display showing current recording status + + Returns: + int: Reset success step count (0) + """ + print("Resetting environment...") + env.sim.reset() + env.recorder_manager.reset() + env.reset() + success_step_count = 0 + return success_step_count + + +"""""" """""" """""" """""" """""" """""" """""" """""" """""" """""" """""" +"""""" """""" """""" """""" """""" """""" """""" """""" """""" """""" """""" +""" +Lower body keyboard control +""" +lin_vel_x = 0.0 +lin_vel_y = 0.0 +ang_vel = 0.0 +standing = 0 +base_height = 0.75 +reset_wbc_flag = False +pause_flag = False +torso_orientation_roll = 0.0 +torso_orientation_pitch = 0.0 +torso_orientation_yaw = 0.0 + + +def reset_wbc(): + global reset_wbc_flag + reset_wbc_flag = True + + +def increase_linear_vel_x(): + global lin_vel_x + lin_vel_x += 0.1 + + +def decrease_linear_vel_x(): + global lin_vel_x + lin_vel_x -= 0.1 + + +def increase_angular_vel(): + global ang_vel + ang_vel += 0.1 + + +def decrease_angular_vel(): + global ang_vel + ang_vel -= 0.1 + + +def increase_linear_vel_y(): + global lin_vel_y + lin_vel_y += 0.1 + + +def decrease_linear_vel_y(): + global lin_vel_y + lin_vel_y -= 0.1 + + +def increase_base_height(): + global base_height + base_height += 0.01 + + +def decrease_base_height(): + global base_height + base_height -= 0.01 + + +def increase_torso_orientation_roll(): + global torso_orientation_roll + torso_orientation_roll += 0.1 + + +def decrease_torso_orientation_roll(): + global torso_orientation_roll + torso_orientation_roll -= 0.1 + + +def increase_torso_orientation_pitch(): + global torso_orientation_pitch + torso_orientation_pitch += 0.1 + + +def decrease_torso_orientation_pitch(): + global torso_orientation_pitch + torso_orientation_pitch -= 0.1 + + +def increase_torso_orientation_yaw(): + global torso_orientation_yaw + torso_orientation_yaw += 0.1 + + +def decrease_torso_orientation_yaw(): + global torso_orientation_yaw + torso_orientation_yaw -= 0.1 + + +def pause(): + global pause_flag + pause_flag = True + + +"""""" """""" """""" """""" """""" """""" """""" """""" """""" """""" """""" +"""""" """""" """""" """""" """""" """""" """""" """""" """""" """""" """""" + + +def run_simulation_loop( + env: gym.Env, + teleop_interface: object | None, + success_term: object | None, + rate_limiter: RateLimiter | None, +) -> int: + """Run the main simulation loop for collecting demonstrations. + + Sets up callback functions for teleop device, initializes the UI, + and runs the main loop that processes user inputs and environment steps. + Records demonstrations when success conditions are met. 
+ + Args: + env: The environment instance + teleop_interface: Optional teleop interface (will be created if None) + success_term: The success termination object or None if not available + rate_limiter: Optional rate limiter to control simulation speed + + Returns: + int: Number of successful demonstrations recorded + """ + current_recorded_demo_count = 0 + success_step_count = 0 + should_reset_recording_instance = False + running_recording_instance = not args_cli.xr + + # Callback closures for the teleop device + def reset_recording_instance(): + nonlocal should_reset_recording_instance + should_reset_recording_instance = True + print("Recording instance reset requested") + reset_wbc() + global lin_vel_x, lin_vel_y, ang_vel, standing, base_height, torso_orientation_roll, torso_orientation_pitch, torso_orientation_yaw + lin_vel_x = 0.0 + lin_vel_y = 0.0 + ang_vel = 0.0 + standing = 0 + base_height = 0.75 + torso_orientation_roll = 0.0 + torso_orientation_pitch = 0.0 + torso_orientation_yaw = 0.0 + print("WBC reset") + teleop_interface.reset() + print("Teleop interface reset") + + def start_recording_instance(): + nonlocal running_recording_instance + running_recording_instance = True + print("Recording started") + + def stop_recording_instance(): + nonlocal running_recording_instance + running_recording_instance = False + print("Recording paused") + + # Import Leapmotion after env creation to prevent circular import + from isaaclab_arena.teleop_devices.leapmotion.leapmotion_teleop_device import Leapmotion, LeapmotionCfg + + teleop_interface = Leapmotion(LeapmotionCfg()) + + """ Lower body keyboard control """ + keyboard_interface = Se3Keyboard(Se3KeyboardCfg(pos_sensitivity=0.1, rot_sensitivity=0.1)) + keyboard_interface.add_callback("R", reset_recording_instance) + keyboard_interface.add_callback("W", increase_linear_vel_x) + keyboard_interface.add_callback("S", decrease_linear_vel_x) + keyboard_interface.add_callback("Q", increase_angular_vel) + keyboard_interface.add_callback("E", decrease_angular_vel) + keyboard_interface.add_callback("A", increase_linear_vel_y) + keyboard_interface.add_callback("D", decrease_linear_vel_y) + keyboard_interface.add_callback("Z", increase_base_height) + keyboard_interface.add_callback("X", decrease_base_height) + keyboard_interface.add_callback("G", increase_torso_orientation_roll) + keyboard_interface.add_callback("J", decrease_torso_orientation_roll) + keyboard_interface.add_callback("Y", increase_torso_orientation_pitch) + keyboard_interface.add_callback("H", decrease_torso_orientation_pitch) + keyboard_interface.add_callback("T", increase_torso_orientation_yaw) + keyboard_interface.add_callback("U", decrease_torso_orientation_yaw) + keyboard_interface.add_callback("P", pause) + keyboard_interface.reset() + + # Reset before starting + env.sim.reset() + env.reset() + teleop_interface.reset() + + # Set viewport camera + camera_paths = [ + "/World/envs/env_0/Robot/head_link/RobotHeadCam", + ] + find_and_set_camera(camera_paths) + + with contextlib.suppress(KeyboardInterrupt) and torch.inference_mode(): + while simulation_app.is_running(): + # Get upper body command + action = teleop_interface.advance() + + # Get lower body command + navigate_cmd = torch.tensor([lin_vel_x, lin_vel_y, ang_vel]) + base_height_cmd = torch.tensor([base_height]) + torso_orientation_rpy_cmd = torch.tensor( + [torso_orientation_roll, torso_orientation_pitch, torso_orientation_yaw] + ) + action = torch.cat([action, navigate_cmd, base_height_cmd, torso_orientation_rpy_cmd]) + + 
print(f"navigatie_cmd: {navigate_cmd}") + print(f"base_height_cmd: {base_height_cmd}\n") + + actions = action.repeat(env.num_envs, 1) + + # Perform action on environment + if running_recording_instance: + # Compute actions based on environment + env.step(actions) + else: + env.sim.render() + + # Check for success condition + success_step_count, success_reset_needed = process_success_condition(env, success_term, success_step_count) + if success_reset_needed: + should_reset_recording_instance = True + + # Update demo count if it has changed + if env.recorder_manager.exported_successful_episode_count > current_recorded_demo_count: + current_recorded_demo_count = env.recorder_manager.exported_successful_episode_count + + # Handle reset if requested + if should_reset_recording_instance: + success_step_count = handle_reset(env, success_step_count) + should_reset_recording_instance = False + + # Check if we've reached the desired number of demos + if args_cli.num_demos > 0 and env.recorder_manager.exported_successful_episode_count >= args_cli.num_demos: + print(f"All {args_cli.num_demos} demonstrations recorded. Exiting the app.") + break + + # Check if simulation is stopped + if env.sim.is_stopped(): + break + + # Rate limiting + if rate_limiter: + rate_limiter.sleep(env) + + return current_recorded_demo_count + + +def main() -> None: + """Collect demonstrations from the environment using teleop interfaces. + + Main function that orchestrates the entire process: + 1. Sets up rate limiting based on configuration + 2. Creates output directories for saving demonstrations + 3. Configures the environment + 4. Runs the simulation loop to collect demonstrations + 5. Cleans up resources when done + + Raises: + Exception: Propagates exceptions from any of the called functions + """ + # if handtracking is selected, rate limiting is achieved via OpenXR + if args_cli.xr: + rate_limiter = None + else: + rate_limiter = RateLimiter(args_cli.step_hz) + + # Set up output directories + output_dir, output_file_name = setup_output_directories() + + # Create and configure environment + global env_cfg # Make env_cfg available to setup_teleop_device + env_cfg, env_name, success_term = create_environment_config(output_dir, output_file_name) + + # Create environment + env = create_environment(env_cfg, env_name) + + # Run simulation loop + current_recorded_demo_count = run_simulation_loop(env, None, success_term, rate_limiter) + + # Clean up + env.close() + print(f"Recording session completed with {current_recorded_demo_count} successful demonstrations") + print(f"Demonstrations saved to: {args_cli.dataset_file}") + + +if __name__ == "__main__": + # run the main function + main() + # close sim app + simulation_app.close() diff --git a/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py b/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py new file mode 100644 index 00000000..d32e7783 --- /dev/null +++ b/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py @@ -0,0 +1,33 @@ +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
diff --git a/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py b/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py
new file mode 100644
index 00000000..d32e7783
--- /dev/null
+++ b/isaaclab_arena/teleop_devices/leapmotion/base_streamer.py
@@ -0,0 +1,33 @@
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+
+class BaseStreamer(ABC):
+    def __init__(self, *args, **kwargs):
+        pass
+
+    @abstractmethod
+    def start_streaming(self):
+        pass
+
+    @abstractmethod
+    def get(self) -> Any:
+        pass
+
+    @abstractmethod
+    def stop_streaming(self):
+        pass
diff --git a/isaaclab_arena/teleop_devices/leapmotion/leapmotion.py b/isaaclab_arena/teleop_devices/leapmotion/leapmotion.py
new file mode 100644
index 00000000..58cd9a59
--- /dev/null
+++ b/isaaclab_arena/teleop_devices/leapmotion/leapmotion.py
@@ -0,0 +1,40 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from isaaclab.devices.device_base import DevicesCfg
+
+from isaac_arena.assets.register import register_device
+from isaaclab_arena.teleop_devices.leapmotion.leapmotion_teleop_device import LeapmotionCfg
+from isaaclab_arena.teleop_devices.teleop_device_base import TeleopDeviceBase
+
+
+@register_device
+class LeapmotionTeleopDevice(TeleopDeviceBase):
+    """
+    Teleop device for Leapmotion.
+    """
+
+    name = "leapmotion"
+
+    def __init__(self):
+        super().__init__()
+
+    def build_cfg(self, *, sim_device: str | None = None, actions: object | None = None, xr_cfg: object | None = None):
+        return DevicesCfg(
+            devices={
+                "leapmotion": LeapmotionCfg(
+                    body_control_device="leapmotion",
+                    sim_device=sim_device,
+                ),
+            }
+        )
diff --git a/isaaclab_arena/teleop_devices/leapmotion/leapmotion_streamer.py b/isaaclab_arena/teleop_devices/leapmotion/leapmotion_streamer.py
new file mode 100644
index 00000000..ecb5c89b
--- /dev/null
+++ b/isaaclab_arena/teleop_devices/leapmotion/leapmotion_streamer.py
@@ -0,0 +1,221 @@
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+import pickle
+import queue
+import threading
+import time
+from copy import deepcopy
+from scipy.spatial.transform import Rotation as R
+
+import leap
+import leap.events
+
+
+def xyz2np_array(position: leap.datatypes.Vector) -> np.ndarray:
+    return np.array([position.x, position.y, position.z])
+
+
+def quat2np_array(quaternion: leap.datatypes.Quaternion) -> np.ndarray:
+    return np.array([quaternion.x, quaternion.y, quaternion.z, quaternion.w])
+
+
+def get_raw_finger_points(hand: leap.datatypes.Hand) -> np.ndarray:
+    target_points = np.array([np.eye(4) for _ in range(25)])
+    target_points[0, :3, :3] = R.from_quat(quat2np_array(hand.palm.orientation)).as_matrix()
+    target_points[0, :3, 3] = xyz2np_array(hand.arm.next_joint)
+    target_points[1, :3, 3] = xyz2np_array(hand.thumb.bones[0].next_joint)
+    target_points[2, :3, 3] = xyz2np_array(hand.thumb.bones[1].next_joint)
+    target_points[3, :3, 3] = xyz2np_array(hand.thumb.bones[2].next_joint)
+    target_points[4, :3, 3] = xyz2np_array(hand.thumb.bones[3].next_joint)
+    target_points[5, :3, 3] = xyz2np_array(hand.index.bones[0].prev_joint)
+    target_points[6, :3, 3] = xyz2np_array(hand.index.bones[0].next_joint)
+    target_points[7, :3, 3] = xyz2np_array(hand.index.bones[1].next_joint)
+    target_points[8, :3, 3] = xyz2np_array(hand.index.bones[2].next_joint)
+    target_points[9, :3, 3] = xyz2np_array(hand.index.bones[3].next_joint)
+    target_points[10, :3, 3] = xyz2np_array(hand.middle.bones[0].prev_joint)
+    target_points[11, :3, 3] = xyz2np_array(hand.middle.bones[0].next_joint)
+    target_points[12, :3, 3] = xyz2np_array(hand.middle.bones[1].next_joint)
+    target_points[13, :3, 3] = xyz2np_array(hand.middle.bones[2].next_joint)
+    target_points[14, :3, 3] = xyz2np_array(hand.middle.bones[3].next_joint)
+    target_points[15, :3, 3] = xyz2np_array(hand.ring.bones[0].prev_joint)
+    target_points[16, :3, 3] = xyz2np_array(hand.ring.bones[0].next_joint)
+    target_points[17, :3, 3] = xyz2np_array(hand.ring.bones[1].next_joint)
+    target_points[18, :3, 3] = xyz2np_array(hand.ring.bones[2].next_joint)
+    target_points[19, :3, 3] = xyz2np_array(hand.ring.bones[3].next_joint)
+    target_points[20, :3, 3] = xyz2np_array(hand.pinky.bones[0].prev_joint)
+    target_points[21, :3, 3] = xyz2np_array(hand.pinky.bones[0].next_joint)
+    target_points[22, :3, 3] = xyz2np_array(hand.pinky.bones[1].next_joint)
+    target_points[23, :3, 3] = xyz2np_array(hand.pinky.bones[2].next_joint)
+    target_points[24, :3, 3] = xyz2np_array(hand.pinky.bones[3].next_joint)
+
+    return target_points
+
+
+def get_raw_wrist_pose(hand: leap.datatypes.Hand) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
+    hand_palm_position = xyz2np_array(hand.palm.position)
+    hand_palm_normal = np.array([-hand.palm.normal.z, -hand.palm.normal.x, hand.palm.normal.y])
+    hand_palm_direction = np.array([-hand.palm.direction.z, -hand.palm.direction.x, hand.palm.direction.y])
+
+    return hand_palm_position, hand_palm_normal, hand_palm_direction
+
+
+def get_finger_transform(target_points: np.ndarray, hand_type: str) -> np.ndarray:
+    pose_palm_root = target_points[0, :3, 3].copy()
+    rot_palm = target_points[0, :3, :3].copy()
+
+    if hand_type == "right":
+        rot_leap2base = R.from_euler("z", [180], degrees=True).as_matrix()
+        rot_reverse = np.array(
+            [[[0, 0, 1], [0, -1, 0], [1, 0, 0]]]
+        )  # due to the target_base_rotation in the hand IK solver
+    else:
+        rot_leap2base = np.eye(3)
+        rot_reverse = np.array([[[0, 0, -1], [0, -1, 0], [-1, 0, 0]]])
+    offset = (target_points[:, :3, 3] - pose_palm_root).copy() / 1000.0  # mm to m
+    offset = offset @ rot_palm @ rot_leap2base @ rot_reverse
+
+    target_points[:, :3, 3] = offset
+
+    return target_points
+
+
+def get_wrist_transformation(
+    hand_palm_position: np.ndarray,
+    hand_palm_normal: np.ndarray,
+    hand_palm_direction: np.ndarray,
+    hand_type: str,
+    pos_sensitivity: float = 1.0 / 1000.0,
+) -> np.ndarray:
+    T = np.eye(4)
+    T[:3, 3] = hand_palm_position[[2, 0, 1]] * pos_sensitivity * np.array([-1, -1, 1])
+    direction_np = hand_palm_direction
+    palm_normal_np = hand_palm_normal
+    if hand_type == "left":
+        transform = R.from_euler("y", -90, degrees=True).as_matrix()
+        lh_thumb_np = -np.cross(direction_np, palm_normal_np)
+        rotation_matrix = np.array([direction_np, -palm_normal_np, lh_thumb_np]).T
+    else:
+        transform = R.from_euler("xy", [-180, 90], degrees=True).as_matrix()
+        rh_thumb_np = np.cross(direction_np, palm_normal_np)
+        rotation_matrix = np.array([direction_np, palm_normal_np, rh_thumb_np]).T
+    T[:3, :3] = np.dot(rotation_matrix, transform)
+    return T
+
+
+def get_raw_data(hands: list[leap.datatypes.Hand]) -> dict[str, np.ndarray]:
+    data = {}
+    for hand in hands:
+        hand_type = "left" if str(hand.type) == "HandType.Left" else "right"
+        data[hand_type + "_wrist"] = get_raw_wrist_pose(hand)
+        data[hand_type + "_fingers"] = get_raw_finger_points(hand)
+    return data
+
+
+def process_data(raw_data: dict[str, np.ndarray]) -> dict[str, np.ndarray]:
+    data = {}
+    for hand_type in ["left", "right"]:
+        data[hand_type + "_wrist"] = get_wrist_transformation(*raw_data[hand_type + "_wrist"], hand_type)
+        data[hand_type + "_fingers"] = {"position": get_finger_transform(raw_data[hand_type + "_fingers"], hand_type)}
+    return data
+
+
+class LeapMotionListener(leap.Listener):
+    def __init__(self, verbose=False):
+        self.data = None
+        self.data_lock = threading.Lock()
+        self.verbose = verbose
+
+    def on_connection_event(self, event):
+        print("Connected")
+
+    def on_device_event(self, event: leap.events.DeviceEvent):
+        try:
+            with event.device.open():
+                info = event.device.get_info()
+        except AttributeError:
+            # Handle the case where LeapCannotOpenDeviceError is not available
+            try:
+                info = event.device.get_info()
+            except Exception as e:
+                print(f"Error opening device: {e}")
+                info = None
+
+        print(f"Found Leap Motion device {info.serial if info else '(unknown serial)'}")
+
+    def on_tracking_event(self, event: leap.events.TrackingEvent):
+        if len(event.hands) == 2:  # only update the data when both hands are detected
+            with self.data_lock:
+                self.data: dict[str, np.ndarray] = get_raw_data(event.hands)
+            if self.verbose:
+                for hand in event.hands:
+                    hand_type = "left" if str(hand.type) == "HandType.Left" else "right"
+                    print(
+                        f"Hand id {hand.id} is a {hand_type} hand with position "
+                        f"({hand.palm.position.x}, {hand.palm.position.y}, {hand.palm.position.z})."
+                    )
+
+    def get_data(self):
+        with self.data_lock:
+            return deepcopy(self.data)
+
+
+# Note: currently this is an auto-polling streamer.
+# The connection starts another thread that keeps polling data,
+# and the listener keeps receiving data from the connection.
+class LeapMotionStreamer:
+    def __init__(self, verbose=False, record_data=False, **kwargs):
+        self.connection = leap.Connection()
+        self.listener = LeapMotionListener(verbose=verbose)
+        self.connection.add_listener(self.listener)
+
+        self.connection.set_tracking_mode(leap.TrackingMode.Desktop)
+        self.data = None
+
+        self.record_data = record_data
+        self.data_queue = queue.Queue()
+
+    def start_streaming(self):
+        self.connection.connect()
+        print("Waiting for the first data...")
+        time.sleep(0.5)
+        while not self.listener.get_data():
+            time.sleep(0.1)
+        print("First data received!")
+
+    def stop_streaming(self):
+        self.connection.disconnect()
+
+    def get(self) -> dict[str, np.ndarray]:
+        raw_data = self.listener.get_data()
+        # Only buffer frames when recording is requested; otherwise the queue grows unbounded.
+        if self.record_data:
+            self.data_queue.put(raw_data)
+        return process_data(raw_data)
+
+    def dump_data_to_file(self):
+        data_list = []
+        while not self.data_queue.empty():
+            data_list.append(self.data_queue.get())
+        with open("leapmotion_data_trans.pkl", "wb") as f:
+            pickle.dump(data_list, f)
+
+
+if __name__ == "__main__":
+    streamer = LeapMotionStreamer(verbose=True, record_data=True)
+    streamer.start_streaming()
+    for _ in range(100):
+        streamer.get()
+        time.sleep(0.1)
+    streamer.stop_streaming()
+    streamer.dump_data_to_file()
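To make the data layout concrete, a hedged consumer sketch of the streamer (assumes a Leap Motion device is attached and the leap bindings are importable); the keys and shapes follow get_raw_finger_points() and process_data() above:

```python
from isaaclab_arena.teleop_devices.leapmotion.leapmotion_streamer import LeapMotionStreamer

streamer = LeapMotionStreamer()
streamer.start_streaming()                          # blocks until both hands have been tracked once
frame = streamer.get()
left_wrist_T = frame["left_wrist"]                  # 4x4 homogeneous wrist pose
right_joints = frame["right_fingers"]["position"]   # (25, 4, 4) palm + finger joint transforms, in metres
streamer.stop_streaming()
```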
+ """ + + def __init__( + self, + robot_model: RobotModel, + left_hand_ik_solver, + right_hand_ik_solver, + body_control_device: str, + hand_control_device: str | None = None, + body_active_joint_groups: list[str] | None = None, + ): + # initialize the body + if body_active_joint_groups is not None: + self.body = ReducedRobotModel.from_active_groups(robot_model, body_active_joint_groups) + self.full_robot = self.body.full_robot + self.using_reduced_robot_model = True + else: + self.body = robot_model + self.full_robot = self.body + self.using_reduced_robot_model = False + + # initialize pre_proccessors + self.body_control_device = body_control_device + if body_control_device: + self.body_pre_processor = WristsPreProcessor() + self.body_pre_processor.register(self.body) + else: + self.body_pre_processor = None + + # initialize hand pre-processors and post-processors + self.hand_control_device = hand_control_device + if hand_control_device: + self.left_hand_pre_processor = FingersPreProcessor(side="left") + self.right_hand_pre_processor = FingersPreProcessor(side="right") + else: + self.left_hand_pre_processor = None + self.right_hand_pre_processor = None + + self.body_streamer = LeapMotionStreamer() + self.body_streamer.start_streaming() + + self.raw_data = {} + + def get_streamer_raw_data(self): + body_data_ = self.body_streamer.get() + self.raw_data.update(body_data_) + return self.raw_data + + def calibrate(self): + """Calibrate the pre-processors.""" + assert self.body_streamer, "Real device is enabled, but no streamer is initialized." + raw_data = self.get_streamer_raw_data() + + if self.body_pre_processor: + self.body_pre_processor.calibrate(raw_data) + if self.left_hand_pre_processor: + self.left_hand_pre_processor.calibrate(raw_data) + if self.right_hand_pre_processor: + self.right_hand_pre_processor.calibrate(raw_data) + + def pre_process(self, raw_data): + """Pre-process the raw data.""" + assert ( + self.body_pre_processor or self.left_hand_pre_processor or self.right_hand_pre_processor + ), "Pre-processors are not initialized." 
+ if self.body_pre_processor: + body_data = self.body_pre_processor(raw_data) + if self.left_hand_pre_processor and self.right_hand_pre_processor: + left_hand_data = self.left_hand_pre_processor(raw_data) + right_hand_data = self.right_hand_pre_processor(raw_data) + return body_data, left_hand_data, right_hand_data + else: + return body_data, None, None + else: # only hands + if self.left_hand_pre_processor and self.right_hand_pre_processor: + left_hand_data = self.left_hand_pre_processor(raw_data) + right_hand_data = self.right_hand_pre_processor(raw_data) + return None, left_hand_data, right_hand_data + + def get_g1_gripper_state(self, finger_data, dist_threshold=0.05): + fingertips = finger_data["position"] + + # Extract X, Y, Z positions of fingertips from the transformation matrices + positions = np.array([finger[:3, 3] for finger in fingertips]) + positions = np.reshape(positions, (-1, 3)) # Ensure 2D array with shape (N, 3) + + # Compute the distance between the thumb and index finger + thumb_pos = positions[4, :] + index_pos = positions[4 + 5, :] + dist = np.linalg.norm(thumb_pos - index_pos) + hand_close = dist < dist_threshold + + return int(hand_close) + + def get_leapmotion_action(self): + raw_action = self.get_streamer_raw_data() + body_data, left_hand_data, right_hand_data = self.pre_process(raw_action) + + # Original variables + left_wrist_action = np.array(body_data["left_wrist_yaw_link"]) + right_wrist_action = np.array(body_data["right_wrist_yaw_link"]) + + # Extract position and quaternion from left wrist 4x4 pose matrix + left_wrist_pos = torch.from_numpy(left_wrist_action[:3, 3]) + left_wrist_rot_matrix = left_wrist_action[:3, :3] + left_wrist_quat = R.from_matrix(left_wrist_rot_matrix).as_quat() + + # Extract position and quaternion from right wrist 4x4 pose matrix + right_wrist_pos = torch.from_numpy(right_wrist_action[:3, 3]) + right_wrist_rot_matrix = right_wrist_action[:3, :3] + right_wrist_quat = R.from_matrix(right_wrist_rot_matrix).as_quat() + + # Convert quaternions from (x, y, z, w) to IsaacLab (w, x, y, z) format + left_wrist_quat_wxyz = torch.from_numpy(np.roll(left_wrist_quat, 1)) + right_wrist_quat_wxyz = torch.from_numpy(np.roll(right_wrist_quat, 1)) + + # Get G1 hand state (0 for open, 1 for close) + left_hand_state = torch.from_numpy(np.array([self.get_g1_gripper_state(left_hand_data)])) + right_hand_state = torch.from_numpy(np.array([self.get_g1_gripper_state(right_hand_data)])) + + # Assemble into upper body env action + upperbody_action = torch.cat([ + left_hand_state, + right_hand_state, + left_wrist_pos, + left_wrist_quat_wxyz, + right_wrist_pos, + right_wrist_quat_wxyz, + ]) + + return upperbody_action + + +@dataclass +class LeapmotionCfg(DeviceCfg): + """Configuration for the Leapmotion teleop device.""" + + body_control_device: str = "leapmotion" + + +class Leapmotion(DeviceBase): + + def __init__(self, cfg: LeapmotionCfg): + self.robot_model = instantiate_g1_robot_model() + self.device_streamer = LeapmotionStreamer( + robot_model=self.robot_model, + left_hand_ik_solver=None, + right_hand_ik_solver=None, + body_control_device=cfg.body_control_device, + hand_control_device=cfg.body_control_device, + body_active_joint_groups=["arms"], + ) + self.device_streamer.calibrate() + + # dictionary for additional callbacks + self._additional_callbacks = dict() + + def __str__(self) -> str: + """Returns: A string containing the information of joystick.""" + msg = f"Leapmotion Controller for G1: {self.__class__.__name__}\n" + return msg + + def reset(self): 
+ self.device_streamer.calibrate() + + def add_callback(self, key: str, func: Callable): + """Add additional functions.""" + self._additional_callbacks[key] = func + + def advance(self) -> torch.Tensor: + print("[Leapmotion] Internal usage only, remove before release.") + return self.device_streamer.get_leapmotion_action() diff --git a/isaaclab_arena/teleop_devices/leapmotion/preprocesors/fingers.py b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/fingers.py new file mode 100644 index 00000000..ad42515b --- /dev/null +++ b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/fingers.py @@ -0,0 +1,30 @@ +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from isaaclab_arena.teleop_devices.leapmotion.preprocesors.pre_processor import PreProcessor + + +class FingersPreProcessor(PreProcessor): + """Dummy class just takes out the fingers from the data.""" + + def __init__(self, side: str): + super().__init__() + self.side = side + + def __call__(self, data): + return data[f"{self.side}_fingers"] + + # TODO: calibrate max and min + def calibrate(self, data): + pass diff --git a/isaaclab_arena/teleop_devices/leapmotion/preprocesors/pre_processor.py b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/pre_processor.py new file mode 100644 index 00000000..3ad53788 --- /dev/null +++ b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/pre_processor.py @@ -0,0 +1,27 @@ +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + + +class PreProcessor(abc.ABC): + def __init__(self, **kwargs): + pass + + def register(self, robot): + self.robot = robot + + @abc.abstractmethod + def __call__(self, data) -> dict: + pass diff --git a/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py new file mode 100644 index 00000000..821dc1b2 --- /dev/null +++ b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py @@ -0,0 +1,109 @@ +# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
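The PreProcessor contract above is small; here is a hypothetical subclass sketch (the class name is illustrative, not part of this diff):

```python
from isaaclab_arena.teleop_devices.leapmotion.preprocesors.pre_processor import PreProcessor


class PassthroughPreProcessor(PreProcessor):
    """Forwards the raw streamer dictionary unchanged; useful for debugging."""

    def __call__(self, data) -> dict:
        return dict(data)
```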
diff --git a/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py
new file mode 100644
index 00000000..821dc1b2
--- /dev/null
+++ b/isaaclab_arena/teleop_devices/leapmotion/preprocesors/wrists.py
@@ -0,0 +1,109 @@
+# Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy as np
+from copy import deepcopy
+
+from isaaclab_arena.teleop_devices.leapmotion.preprocesors.pre_processor import PreProcessor
+
+WRIST_POSITION_OFFSET = np.matrix(
+    [[1.0, 0.0, 0.0, 0.0], [0.0, 1.0, 0.0, 0.0], [0.0, 0.0, 1.0, 0.0], [0.01, -0.04, 0.03, 1.0]]
+).T
+RIGHT_HAND_ROTATION = np.matrix([[-1, 0, 0], [0, -1, 0], [0, 0, 1]])
+
+# TODO @fcastanedaga: get this rotation from the robot model
+G1_HAND_ROTATION = np.matrix([[0, 0, 1], [0, 1, 0], [-1, 0, 0]])
+
+
+class WristsPreProcessor(PreProcessor):
+    def __init__(self, **kwargs):
+        super().__init__(**kwargs)
+        self.pose = {}  # poses to calibrate the robot
+        self.ee_name_list = []  # names of the end-effectors, e.g. "link_head_pitch", "link_LArm7", "link_RArm7"
+        self.init_ee_pose = {}  # initial end-effector pose of the robot
+        self.T_init_inv = {}  # initial transformation matrix
+        self.R = {}  # relative transformation matrix
+        self.rotation_correction = {}
+
+        self.latest_data = None
+
+    def calibrate(self, data):
+        left_elbow_joint_name = self.robot.supplemental_info.joint_name_mapping["elbow_pitch"]["left"]
+        right_elbow_joint_name = self.robot.supplemental_info.joint_name_mapping["elbow_pitch"]["right"]
+        if "left_wrist" in data:
+            self.ee_name_list.append(self.robot.supplemental_info.hand_frame_names["left"])
+            self.pose[left_elbow_joint_name] = self.robot.supplemental_info.elbow_calibration_joint_angles["left"]
+        if "right_wrist" in data:
+            self.ee_name_list.append(self.robot.supplemental_info.hand_frame_names["right"])
+            self.pose[right_elbow_joint_name] = self.robot.supplemental_info.elbow_calibration_joint_angles["right"]
+
+        if self.pose:
+            q = deepcopy(self.robot.q_default)
+            # set pose
+            for joint, degree in self.pose.items():
+                joint_idx = self.robot.joint_to_dof_index[joint]
+                q[joint_idx] = np.deg2rad(degree)
+            self.robot.cache_forward_kinematics(q)
+            target_ee_poses = [self.robot.frame_placement(ee_name).np for ee_name in self.ee_name_list]
+            self.robot.reset_forward_kinematics()
+        else:
+            target_ee_poses = [self.robot.frame_placement(ee_name).np for ee_name in self.ee_name_list]
+
+        for ee_name in self.ee_name_list:
+            self.init_ee_pose[ee_name] = deepcopy(target_ee_poses[self.ee_name_list.index(ee_name)])
+            self.T_init_inv[ee_name] = (
+                np.linalg.inv(deepcopy(data["left_wrist"] @ WRIST_POSITION_OFFSET))
+                if ee_name == self.robot.supplemental_info.hand_frame_names["left"]
+                else np.linalg.inv(deepcopy(data["right_wrist"] @ WRIST_POSITION_OFFSET))
+            )
+
+    def __call__(self, data) -> dict:
+        processed_data = {}
+        for ee_name in self.ee_name_list:
+            # Select wrist data based on ee_name
+            T_cur = (
+                data["left_wrist"]
+                if ee_name == self.robot.supplemental_info.hand_frame_names["left"]
+                else data["right_wrist"]
+            )
+
+            target_ee_pose = np.zeros((4, 4))
+            T_cur_local = self.T_init_inv[ee_name] @ T_cur @ WRIST_POSITION_OFFSET
+            if ee_name == self.robot.supplemental_info.hand_frame_names["left"]:
+                target_ee_pose[:3, :3] = (
+                    self.init_ee_pose[ee_name][:3, :3]
+                    @ np.linalg.inv(G1_HAND_ROTATION)
+                    @ T_cur_local[:3, :3]
+                    @ G1_HAND_ROTATION
+                )
+                target_ee_pose[0, 3] = self.init_ee_pose[ee_name][0, 3] - T_cur_local[2, 3]
+                target_ee_pose[1, 3] = self.init_ee_pose[ee_name][1, 3] + T_cur_local[1, 3]
+                target_ee_pose[2, 3] = self.init_ee_pose[ee_name][2, 3] + T_cur_local[0, 3]
+            else:
+                target_ee_pose[:3, :3] = (
+                    self.init_ee_pose[ee_name][:3, :3]
+                    @ np.linalg.inv(G1_HAND_ROTATION)
+                    @ np.linalg.inv(RIGHT_HAND_ROTATION)
+                    @ T_cur_local[:3, :3]
+                    @ RIGHT_HAND_ROTATION
+                    @ G1_HAND_ROTATION
+                )
+                target_ee_pose[0, 3] = self.init_ee_pose[ee_name][0, 3] - T_cur_local[2, 3]
+                target_ee_pose[1, 3] = self.init_ee_pose[ee_name][1, 3] - T_cur_local[1, 3]
+                target_ee_pose[2, 3] = self.init_ee_pose[ee_name][2, 3] - T_cur_local[0, 3]
+
+            processed_data[ee_name] = target_ee_pose
+
+        self.latest_data = processed_data
+        return processed_data
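Finally, a hedged end-to-end sketch of driving the device class directly (assumes the Isaac Sim app is already launched and a Leap Motion controller is attached). The 16-dimensional action layout follows get_leapmotion_action() above:

```python
from isaaclab_arena.teleop_devices.leapmotion.leapmotion_teleop_device import Leapmotion, LeapmotionCfg

device = Leapmotion(LeapmotionCfg())  # calibrates against the current hand pose
action = device.advance()  # (16,): [L grip, R grip, L pos(3), L quat wxyz(4), R pos(3), R quat wxyz(4)]
device.reset()  # re-calibrates, e.g. after an environment reset
```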