Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 30 additions & 10 deletions isaaclab_arena/environments/arena_env_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@

import argparse
import gymnasium as gym
from pathlib import Path

from isaaclab.envs import ManagerBasedRLMimicEnv
from isaaclab.envs.manager_based_env import ManagerBasedEnv
from isaaclab.managers.recorder_manager import RecorderManagerBaseCfg
from isaaclab.scene import InteractiveSceneCfg
from isaaclab.utils.io import dump_yaml
from isaaclab_tasks.utils import parse_env_cfg

from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment
Expand All @@ -28,9 +30,15 @@ class ArenaEnvBuilder:

DEFAULT_SCENE_CFG = InteractiveSceneCfg(num_envs=4096, env_spacing=30.0, replicate_physics=False)

def __init__(self, arena_env: IsaacLabArenaEnvironment, args: argparse.Namespace):
def __init__(
self, arena_env: IsaacLabArenaEnvironment, args: argparse.Namespace, serialization_file_path: str = None
):
self.arena_env = arena_env
self.args = args
self.serialization_file_path = serialization_file_path if serialization_file_path is not None else "/tmp/"
self.serialization_file_path = Path(self.serialization_file_path).joinpath(
f"{self.arena_env.name}_cfg_entry.yaml"
)

def orchestrate(self) -> None:
"""Orchestrate the environment member interaction"""
Expand Down Expand Up @@ -183,17 +191,27 @@ def get_entry_point(self) -> str | type[ManagerBasedRLMimicEnv]:
else:
return "isaaclab.envs:ManagerBasedRLEnv"

def build_registered(
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None
) -> tuple[str, IsaacLabArenaManagerBasedRLEnvCfg]:
"""Register Gym env and parse runtime cfg."""
name = self.arena_env.name
def build_cfg_entry(
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False
) -> IsaacLabArenaManagerBasedRLEnvCfg:
# orchestrate the environment member interaction
self.orchestrate()
cfg_entry = env_cfg if env_cfg is not None else self.compose_manager_cfg()
# THIS IS A WORKAROUND TO ALLOW USER TO GRADUALLY MOVE TO THE NEW CONFIGURATION SYSTEM.
# THIS WILL BE REMOVED IN THE FUTURE.
cfg_entry = self.modify_env_cfg(cfg_entry)
# serialize the configuration if requested
if serialize:
dump_yaml(self.serialization_file_path, cfg_entry)
return cfg_entry

def build_registered(
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False
) -> tuple[str, IsaacLabArenaManagerBasedRLEnvCfg]:
"""Register Gym env and parse runtime cfg."""
name = self.arena_env.name
cfg_entry = self.build_cfg_entry(env_cfg, serialize=serialize)

entry_point = self.get_entry_point()
gym.register(
id=name,
Expand All @@ -209,12 +227,14 @@ def build_registered(
)
return name, cfg

def make_registered(self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None) -> ManagerBasedEnv:
env, _ = self.make_registered_and_return_cfg(env_cfg)
def make_registered(
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False
) -> ManagerBasedEnv:
env, _ = self.make_registered_and_return_cfg(env_cfg, serialize=serialize)
return env

def make_registered_and_return_cfg(
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None
self, env_cfg: None | IsaacLabArenaManagerBasedRLEnvCfg = None, serialize: bool = False
) -> tuple[ManagerBasedEnv, IsaacLabArenaManagerBasedRLEnvCfg]:
name, cfg = self.build_registered(env_cfg)
name, cfg = self.build_registered(env_cfg, serialize=serialize)
return gym.make(name, cfg=cfg).unwrapped, cfg
64 changes: 64 additions & 0 deletions isaaclab_arena/tasks/generic_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from isaaclab_arena.tasks.task_base import TaskBase


class GenericTask(TaskBase):
"""Generic task wrapper for deserialized task data."""

def __init__(self, scene_cfg, events_cfg, termination_cfg,
observation_cfg, rewards_cfg, curriculum_cfg, commands_cfg,
episode_length_s=None):
super().__init__(episode_length_s=episode_length_s)

# Store deserialized config attributes
self.scene_config = scene_cfg
self.events_cfg_data = events_cfg
self.termination_cfg_data = termination_cfg
self.observation_cfg_data = observation_cfg
self.rewards_cfg_data = rewards_cfg
self.curriculum_cfg_data = curriculum_cfg
self.commands_cfg_data = commands_cfg

def get_scene_cfg(self):
"""Returns task-specific scene config if available."""
return self.scene_config

def get_termination_cfg(self):
"""Returns task-specific termination config."""
return self.termination_cfg_data

def get_events_cfg(self):
"""Returns task-specific events config."""
return self.events_cfg_data

def get_observation_cfg(self):
"""Returns task-specific observation config."""
return self.observation_cfg_data

def get_rewards_cfg(self):
"""Returns task-specific rewards config."""
return self.rewards_cfg_data

def get_curriculum_cfg(self):
"""Returns task-specific curriculum config."""
return self.curriculum_cfg_data

def get_commands_cfg(self):
"""Returns task-specific commands config."""
return self.commands_cfg_data

def get_prompt(self):
"""Returns task prompt if available."""
return self.task_data.get('prompt', '')

def get_mimic_env_cfg(self, embodiment_name: str):
"""Returns mimic env config (not available in generic task)."""
return None

def get_metrics(self):
"""Returns empty metrics list (metrics are stored at cfg level)."""
return []
126 changes: 126 additions & 0 deletions isaaclab_arena/tests/test_config_serializations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# Copyright (c) 2025, The Isaac Lab Arena Project Developers (https://github.com/isaac-sim/IsaacLab-Arena/blob/main/CONTRIBUTORS.md).
# All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

from pathlib import Path
import gymnasium as gym
from tqdm import tqdm
import torch

from isaaclab_arena.tests.utils.subprocess import run_simulation_app_function

CFG_YAML_PATH = "/tmp/test_config_serializations.yaml"
NUM_STEPS = 100
HEADLESS = True

def _test_config_serializations(simulation_app) -> bool:

from isaaclab_arena.utils.config_serialization import load_env_cfg_from_yaml
from isaaclab_arena.metrics.metrics import compute_metrics
from isaaclab_tasks.utils.parse_cfg import parse_env_cfg
from isaaclab_arena.assets.asset_registry import AssetRegistry
from isaaclab_arena.assets.object_reference import ObjectReference
from isaaclab_arena.scene.scene import Scene
from isaaclab_arena.environments.isaaclab_arena_environment import IsaacLabArenaEnvironment
from isaaclab_arena.tasks.pick_and_place_task import PickAndPlaceTask
from isaaclab_arena.utils.pose import Pose
from isaaclab_arena.assets.object_base import ObjectType
from isaaclab_arena.environments.arena_env_builder import ArenaEnvBuilder
from isaaclab_arena.cli.isaaclab_arena_cli import get_isaaclab_arena_cli_parser

args_cli = get_isaaclab_arena_cli_parser().parse_args([])
asset_registry = AssetRegistry()
background = asset_registry.get_asset_by_name("kitchen")()
cracker_box = asset_registry.get_asset_by_name("cracker_box")()
embodiment = asset_registry.get_asset_by_name("gr1_joint")()

cracker_box.set_initial_pose(
Pose(
position_xyz=(0.4, 0.0, 0.1),
rotation_wxyz=(1.0, 0.0, 0.0, 0.0),
)
)
destination_location = ObjectReference(
name="destination_location",
prim_path="{ENV_REGEX_NS}/kitchen/Cabinet_B_02",
parent_asset=background,
object_type=ObjectType.RIGID,
)

scene = Scene(assets=[background, cracker_box, destination_location])
isaaclab_arena_environment = IsaacLabArenaEnvironment(
name="kitchen_pick_and_place",
embodiment=embodiment,
scene=scene,
task=PickAndPlaceTask(cracker_box, destination_location, background),
teleop_device=None,
)

try:
env_builder = ArenaEnvBuilder(isaaclab_arena_environment, args_cli)
env_builder.serialization_file_path = CFG_YAML_PATH
cfg_entry_from_cli = env_builder.build_cfg_entry(serialize=True)
except Exception as e:
print(f"Error: {e}")
return False

assert Path(CFG_YAML_PATH).exists()

cfg_entry_from_yaml = load_env_cfg_from_yaml(CFG_YAML_PATH)
# test env can be created from the yaml file
name = "kitchen_pick_and_place"
entry_point = "isaaclab.envs:ManagerBasedRLEnv"
try:
gym.register(
id=name,
entry_point=entry_point,
kwargs={"env_cfg_entry_point": cfg_entry_from_yaml},
disable_env_checker=True,
)

cfg = parse_env_cfg(
name,
device="cuda:0",
num_envs=1,
use_fabric=False,
)

# Create environment
print("[INFO] Creating environment...")
env = gym.make(name, cfg=cfg).unwrapped
env.reset()
except Exception as e:
print(f"Error: {e}")
return False

try:

# Run some zero actions.
for _ in tqdm(range(NUM_STEPS)):
with torch.inference_mode():
actions = torch.zeros(env.action_space.shape, device=env.unwrapped.device)
env.step(actions)

metrics = compute_metrics(env)
assert metrics is not None
assert "num_episodes" in metrics
assert "success_rate" in metrics
assert "object_moved_rate" in metrics

finally:
env.close()

return True


def test_config_serializations():
result = run_simulation_app_function(
_test_config_serializations,
headless=HEADLESS,
)
assert result, f"Test {test_config_serializations.__name__} failed"


if __name__ == "__main__":
test_config_serializations()
Loading