diff --git a/docs/conf.py b/docs/conf.py index 00d7af5ae59..759d2fb2fe6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -141,6 +141,8 @@ # Mock out modules that are not available on RTD autodoc_mock_imports = [ + "boto3", + "botocore", "torch", "torchvision", "numpy", diff --git a/scripts/tools/pretrained_checkpoint.py b/scripts/tools/pretrained_checkpoint.py index a62514eedd9..507ae4360e2 100644 --- a/scripts/tools/pretrained_checkpoint.py +++ b/scripts/tools/pretrained_checkpoint.py @@ -91,9 +91,6 @@ import subprocess import sys -import omni.client -from omni.client._omniclient import CopyBehavior - from isaaclab.utils.pretrained_checkpoint import ( WORKFLOW_EXPERIMENT_NAME_VARIABLE, WORKFLOW_PLAYER, @@ -259,6 +256,8 @@ def publish_pretrained_checkpoint(workflow, task_name, force_publish=False): task_name: The task name. force_publish: Publish without review. """ + import omni.client + from omni.client._omniclient import CopyBehavior # This workflow task pair hasn't been trained if not has_pretrained_checkpoint_job_run(workflow, task_name): diff --git a/scripts/tutorials/03_envs/policy_inference_in_usd.py b/scripts/tutorials/03_envs/policy_inference_in_usd.py index fcef884d9c9..5ebe828d866 100644 --- a/scripts/tutorials/03_envs/policy_inference_in_usd.py +++ b/scripts/tutorials/03_envs/policy_inference_in_usd.py @@ -41,10 +41,9 @@ import os import torch -import omni - from isaaclab.envs import ManagerBasedRLEnv from isaaclab.terrains import TerrainImporterCfg +from isaaclab.utils import client from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR from isaaclab_tasks.manager_based.locomotion.velocity.config.h1.rough_env_cfg import H1RoughEnvCfg_PLAY @@ -54,7 +53,7 @@ def main(): """Main function.""" # load the trained jit policy policy_path = os.path.abspath(args_cli.checkpoint) - file_content = omni.client.read_file(policy_path)[2] + file_content = client.read_file(policy_path)[2] file = io.BytesIO(memoryview(file_content).tobytes()) policy = torch.jit.load(file, 
map_location=args_cli.device) diff --git a/source/isaaclab/config/extension.toml b/source/isaaclab/config/extension.toml index 623798e931e..10d03fbad51 100644 --- a/source/isaaclab/config/extension.toml +++ b/source/isaaclab/config/extension.toml @@ -1,7 +1,7 @@ [package] # Note: Semantic Versioning is used: https://semver.org/ -version = "0.49.0" +version = "0.49.1" # Description title = "Isaac Lab framework for Robot Learning" diff --git a/source/isaaclab/docs/CHANGELOG.rst b/source/isaaclab/docs/CHANGELOG.rst index ef1f2755aa6..9ca0cbc34bd 100644 --- a/source/isaaclab/docs/CHANGELOG.rst +++ b/source/isaaclab/docs/CHANGELOG.rst @@ -1,6 +1,16 @@ Changelog --------- +0.49.1 (2025-12-02) +~~~~~~~~~~~~~~~~~~~ + +Fixed +^^^^^ + +* Replaced omni.client with the lightweight implementation in isaaclab.utils.client. + + + 0.49.0 (2025-11-10) ~~~~~~~~~~~~~~~~~~~ diff --git a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py index 79b5e5a0031..e4c1c287524 100644 --- a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py +++ b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py @@ -21,7 +21,7 @@ from isaaclab.sim import converters, schemas from isaaclab.sim.utils import bind_physics_material, bind_visual_material, clone, select_usd_variants from isaaclab.sim.utils.stage import get_current_stage, is_current_stage_in_memory -from isaaclab.utils.assets import check_usd_path_with_timeout +from isaaclab.utils.assets import check_file_path, retrieve_file_path if TYPE_CHECKING: from . import from_files_cfg @@ -257,16 +257,16 @@ def _spawn_from_usd_file( Raises: FileNotFoundError: If the USD file does not exist at the given path. 
""" - # check if usd path exists with periodic logging until timeout - if not check_usd_path_with_timeout(usd_path): - if "4.5" in usd_path: - usd_5_0_path = usd_path.replace("http", "https").replace("/4.5", "/5.0") - if not check_usd_path_with_timeout(usd_5_0_path): - raise FileNotFoundError(f"USD file not found at path at either: '{usd_path}' or '{usd_5_0_path}'.") - usd_path = usd_5_0_path - else: - raise FileNotFoundError(f"USD file not found at path at: '{usd_path}'.") - + # check file path exists (supports local paths, S3, HTTP/HTTPS URLs) + # check_file_path returns: 0 (not found), 1 (local), 2 (remote) + file_status = check_file_path(usd_path) + if file_status == 0: + raise FileNotFoundError(f"USD file not found at path: '{usd_path}'.") + + # Download remote files (S3, HTTP, HTTPS) to local cache + # This also downloads all USD dependencies to maintain references + if file_status == 2: + usd_path = retrieve_file_path(usd_path) # spawn asset if it doesn't exist. if not prim_utils.is_prim_path_valid(prim_path): # add prim as reference to stage diff --git a/source/isaaclab/isaaclab/sim/utils/nucleus.py b/source/isaaclab/isaaclab/sim/utils/nucleus.py index cb7af95e555..8957f9d59a3 100644 --- a/source/isaaclab/isaaclab/sim/utils/nucleus.py +++ b/source/isaaclab/isaaclab/sim/utils/nucleus.py @@ -6,8 +6,8 @@ import logging import carb -import omni.client -from omni.client import Result + +from isaaclab.utils import client logger = logging.getLogger(__name__) @@ -28,9 +28,8 @@ def check_server(server: str, path: str, timeout: float = 10.0) -> bool: """ logger.info(f"Checking path: {server}{path}") # Increase hang detection timeout - omni.client.set_hang_detection_time_ms(20000) - result, _ = omni.client.stat(f"{server}{path}") - if result == Result.OK: + result, _ = client.stat(f"{server}{path}") + if result == client.Result.OK: logger.info(f"Success: {server}{path}") return True else: diff --git a/source/isaaclab/isaaclab/utils/assets.py 
def _is_usd_path(path: str) -> bool:
    """Return whether a local path or URL points at a USD file.

    Only the path component produced by :func:`urllib.parse.urlparse` is inspected, so a
    query string or fragment appended to a URL does not affect the result. The extension
    comparison is case-insensitive.

    Args:
        path: Local path or remote URL.

    Returns:
        True if the file extension is one of ``client.USD_EXTENSIONS``.
    """
    url_path = urlparse(path).path
    _, suffix = os.path.splitext(url_path)
    return suffix.lower() in client.USD_EXTENSIONS
def retrieve_file_path(
    path: str,
    download_dir: str | None = None,
    force_download: bool = True,
) -> str:
    """Resolve a path to a local file, downloading it from cloud storage (HTTP/HTTPS/S3) if needed.

    Behavior:
        * A local file returns its absolute path.
        * A remote USD file is downloaded together with the assets it references into
          ``download_dir``; the absolute path of the local root USD is returned.
        * Any other remote file is copied into ``download_dir`` and that local path is returned.

    Args:
        path: Local path or remote URL.
        download_dir: Directory to place downloads. Defaults to None, in which case
            ``tempfile.gettempdir()`` is used.
        force_download: If True, re-download even if the target already exists. Defaults to True.

    Returns:
        Absolute path to the resolved local file.

    Raises:
        FileNotFoundError: If the path is neither local nor reachable remotely.
        RuntimeError: If the remote file is reachable but could not be downloaded.
    """
    status = check_file_path(path)

    # Local file
    if status == 1:
        return os.path.abspath(path)

    # Not found anywhere
    if status != 2:
        raise FileNotFoundError(f"Unable to find the file: {path}")

    # Remote file: resolve and create the download directory
    download_dir = os.path.abspath(tempfile.gettempdir() if download_dir is None else download_dir)
    os.makedirs(download_dir, exist_ok=True)

    # the client API expects forward slashes (relevant on Windows)
    url = path.replace(os.sep, "/")

    # USD -> USD + dependencies
    if _is_usd_path(url):
        mapping = client.download_usd_with_references_sync(
            root_url=url,
            download_root=download_dir,
            force_overwrite=force_download,
            progress_callback=lambda done, total, src: logger.debug(
                " [%s] %d / %s bytes", src, done, "?" if total is None else str(total)
            ),
        )
        local_root = mapping.get(client._normalize_url(url))
        if local_root is None:
            # fall back to the mirrored location derived from the URL path
            key = urlparse(url).path.lstrip("/")
            local_root = os.path.join(download_dir, key)
        local_root = os.path.abspath(local_root)
        # The downloader only logs failed transfers, so verify that the root file really arrived
        # instead of handing a dangling path to the caller.
        if not os.path.isfile(local_root):
            raise RuntimeError(f"Unable to copy file: '{path}' from cloud storage.")
        return local_root

    # Non-USD -> single file download
    file_name = os.path.basename(client.break_url(url).path)
    target_path = os.path.join(download_dir, file_name)

    if force_download or not os.path.isfile(target_path):
        result = client.copy(url, target_path, client.CopyBehavior.OVERWRITE)
        # A failed copy is always an error: when we get here the target is either missing or
        # was explicitly requested to be refreshed.
        if result != client.Result.OK:
            raise RuntimeError(f"Unable to copy file: '{path}' from cloud storage.")
    return os.path.abspath(target_path)
@@ -129,7 +149,7 @@ def read_file(path: str) -> io.BytesIO: with open(path, "rb") as f: return io.BytesIO(f.read()) elif file_status == 2: - file_content = omni.client.read_file(path.replace(os.sep, "/"))[2] + file_content = client.read_file(path.replace(os.sep, "/"))[2] return io.BytesIO(memoryview(file_content).tobytes()) else: raise FileNotFoundError(f"Unable to find the file: {path}") @@ -200,8 +220,8 @@ async def _is_usd_path_available(usd_path: str, timeout: float) -> bool: Whether the given USD path is available on the server. """ try: - result, _ = await asyncio.wait_for(omni.client.stat_async(usd_path), timeout=timeout) - return result == omni.client.Result.OK + result, _ = await asyncio.wait_for(client.stat_async(usd_path), timeout=timeout) + return result == client.Result.OK except asyncio.TimeoutError: logger.warning(f"Timed out after {timeout}s while checking for USD: {usd_path}") return False diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py new file mode 100644 index 00000000000..a4188254de5 --- /dev/null +++ b/source/isaaclab/isaaclab/utils/client.py @@ -0,0 +1,599 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + + +"""Lightweight omni.client-like helpers for local/HTTP/S3 access. + +This module implements a subset of omni.client behaviors used inside Isaac Lab: +path normalization, stat/read helpers, USD reference resolution, and simple +copy/download utilities. It supports local files, HTTP(S), and S3 paths and +provides a small Result enum for consistent status reporting. 
def _is_local_path(path: str) -> bool:
    """Return True if the path should be treated as a local filesystem path.

    A path is local when it carries no URL scheme. Windows absolute paths such as
    ``C:/cache/robot.usd`` are local as well: ``urlparse`` reports their drive letter as a
    one-character "scheme", which would otherwise make every absolute Windows destination
    look like a remote URL.

    Args:
        path: Local path or remote URL.

    Returns:
        True for scheme-less paths and Windows drive-letter paths, False otherwise.
    """
    scheme = urlparse(path).scheme
    # An empty scheme is a plain path; a single letter is a drive letter, never a URL scheme.
    return len(scheme) <= 1
def _map_remote_to_local(download_root: str, url: str) -> str:
    """Mirror the path component of ``url`` under ``download_root``.

    The URL may come from a reference inside a downloaded USD file, so its path is
    normalized before use: ``.`` and ``..`` segments are collapsed against the URL root,
    which keeps the mirrored file inside ``download_root``. The parent directory of the
    returned path is created if it does not exist.

    Args:
        download_root: Local directory that mirrors the remote layout.
        url: Remote URL whose path is mirrored.

    Returns:
        Local file path under ``download_root``.

    Raises:
        ValueError: If the mirrored path would still fall outside ``download_root``
            (e.g. a drive-letter segment on Windows).
    """
    remote_path = urlparse(url).path
    if os.sep != "/":
        # os.path.join treats the native separator as a directory boundary, so normalize it too
        remote_path = remote_path.replace(os.sep, "/")
    # Anchor at "/" before normalizing so that ".." can never climb above the mirror root.
    key = posixpath.normpath("/" + remote_path.lstrip("/")).lstrip("/")
    local_path = os.path.join(download_root, key)

    # Final containment check for anything the normalization above cannot express.
    root_abs = os.path.abspath(download_root)
    if os.path.commonpath([root_abs, os.path.abspath(local_path)]) != root_abs:
        raise ValueError(f"Refusing to map '{url}' outside of download root '{download_root}'.")

    os.makedirs(os.path.dirname(local_path), exist_ok=True)
    return local_path
On success, ``info`` may contain + ``size``, ``etag``, ``last_modified``, and ``content_type`` when available. + The :class:`Result` code is one of ``OK``, ``ERROR_NOT_FOUND``, + ``ERROR_PERMISSION_DENIED``, ``ERROR_NETWORK``, or ``ERROR_UNKNOWN``. + """ + url = _normalize_url(path) + + # HTTP(S) + if _is_http_url(url): + try: + req = Request(url, method="HEAD") + with urlopen(req) as resp: + size_header = resp.headers.get("Content-Length") + info = { + "size": int(size_header) if size_header is not None else None, + "etag": resp.headers.get("ETag"), + "last_modified": resp.headers.get("Last-Modified"), + "content_type": resp.headers.get("Content-Type"), + } + return Result.OK, info + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND, None + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("HTTP error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + except URLError as exc: + logger.warning("Network error in stat(%s): %s", url, exc) + return Result.ERROR_NETWORK, None + except Exception as exc: + logger.warning("Unexpected error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + # S3 (non-omniverse) + if _is_s3_url(url): + bucket, key = _split_s3_url(url) + try: + response = _s3.head_object(Bucket=bucket, Key=key) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND, None + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("Error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + info = { + "size": response.get("ContentLength"), + "etag": response.get("ETag"), + "last_modified": response.get("LastModified"), + "content_type": response.get("ContentType"), + } + return Result.OK, info + + # Local + if _is_local_path(url) and os.path.exists(url): + try: + size = os.path.getsize(url) + except OSError: 
async def stat_async(path: str) -> tuple[Result, dict[str, Any] | None]:
    """Awaitable variant of :func:`stat`.

    The blocking :func:`stat` call is submitted to the default executor of the running
    event loop and its ``(Result, info)`` tuple is returned unchanged.

    Args:
        path: Local path or remote URL (HTTP/S3).

    Returns:
        The tuple returned by :func:`stat` for ``path``.
    """
    running_loop = asyncio.get_running_loop()
    pending = running_loop.run_in_executor(None, stat, path)
    return await pending
def _stream_to_file(
    read_chunk: Callable[[int], bytes],
    dst: str,
    src: str,
    total_size: int | None,
    chunk_size: int,
    progress_callback: Callable[[int, int | None, str], None] | None,
) -> None:
    """Stream ``read_chunk(chunk_size)`` results into ``dst`` without exposing partial files.

    Data is written to a sibling temporary file and moved over ``dst`` only once the whole
    stream has been consumed, so an interrupted transfer never leaves a truncated ``dst``
    that a later ``CopyBehavior.SKIP`` copy would mistake for a complete download.
    """
    # per-process name so that parallel workers mirroring the same asset do not share a file
    tmp_path = f"{dst}.{os.getpid()}.part"
    try:
        transferred = 0
        with open(tmp_path, "wb") as f:
            while chunk := read_chunk(chunk_size):
                f.write(chunk)
                transferred += len(chunk)
                if progress_callback:
                    progress_callback(transferred, total_size, src)
        os.replace(tmp_path, dst)
    except BaseException:
        # best-effort cleanup of the partial file; the original error is what matters
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def copy(
    src: str,
    dst: str,
    behavior: CopyBehavior = CopyBehavior.OVERWRITE,
    progress_callback: Callable[[int, int | None, str], None] | None = None,
    chunk_size: int = 8 * 1024 * 1024,
) -> Result:
    """Copy a remote (HTTP/S3) file to a local destination.

    Supported directions:
        * HTTP/HTTPS → local
        * S3 → local

    The destination is replaced only after the transfer completed; a failed or interrupted
    transfer leaves an existing destination untouched and no partial file behind.

    Args:
        src: Source URL.
        dst: Destination path on the local file system.
        behavior: Overwrite policy for local targets. With :attr:`CopyBehavior.SKIP` an
            existing destination is kept and the copy reports success.
        progress_callback: Optional ``cb(done_bytes, total_bytes_or_None, src)``.
        chunk_size: Chunk size for streamed copies.

    Returns:
        Result enum indicating success or failure reason.
    """
    src = _normalize_url(src)

    if os.path.exists(dst) and behavior == CopyBehavior.SKIP:
        return Result.OK

    # HTTP(S) -> local
    if _is_http_url(src) and _is_local_path(dst):
        os.makedirs(os.path.dirname(dst) or ".", exist_ok=True)
        try:
            with urlopen(src) as resp:
                size_header = resp.headers.get("Content-Length")
                total_size = int(size_header) if size_header is not None else None
                _stream_to_file(resp.read, dst, src, total_size, chunk_size, progress_callback)
            return Result.OK
        except HTTPError as exc:
            if exc.code == 404:
                return Result.ERROR_NOT_FOUND
            if exc.code == 403:
                return Result.ERROR_PERMISSION_DENIED
            logger.warning("HTTP error copying %s -> %s: %s", src, dst, exc)
            return Result.ERROR_UNKNOWN
        except URLError as exc:
            logger.warning("Network error copying %s -> %s: %s", src, dst, exc)
            return Result.ERROR_NETWORK
        except Exception as exc:
            logger.warning("Unexpected error copying %s -> %s: %s", src, dst, exc)
            return Result.ERROR_UNKNOWN

    # S3 -> local
    if _is_s3_url(src) and _is_local_path(dst):
        bucket, key = _split_s3_url(src)
        os.makedirs(os.path.dirname(dst) or ".", exist_ok=True)
        try:
            # get_object already reports the content length, no separate head_object needed
            obj = _s3.get_object(Bucket=bucket, Key=key)
            body = obj["Body"]
            try:
                _stream_to_file(body.read, dst, src, obj.get("ContentLength"), chunk_size, progress_callback)
            finally:
                body.close()
            return Result.OK
        except ClientError as exc:
            code = exc.response.get("Error", {}).get("Code", "")
            if code in ("404", "NoSuchKey", "NotFound"):
                return Result.ERROR_NOT_FOUND
            if code in ("AccessDenied", "403"):
                return Result.ERROR_PERMISSION_DENIED
            logger.warning("Error copying S3->local (%s -> %s): %s", src, dst, exc)
            return Result.ERROR_UNKNOWN

    logger.error("Copy combination not supported: %s -> %s", src, dst)
    return Result.ERROR_UNKNOWN
def _find_usd_references(local_usd_path: str) -> set[str]:
    """Collect the asset paths a USD layer refers to, using the Sdf API.

    The layer is inspected without composing a stage. Gathered are sublayer paths,
    reference and payload asset paths of every prim spec, and the default values of
    asset-path valued attributes. Only paths accepted by :func:`_is_downloadable_asset`
    are kept.

    Args:
        local_usd_path: Path of the USD layer on the local file system.

    Returns:
        Set of referenced asset paths exactly as authored in the layer. Empty when the
        layer cannot be opened.
    """
    try:
        layer = Sdf.Layer.FindOrOpen(local_usd_path)
    except Exception:
        logger.warning("Failed to open USD layer: %s", local_usd_path, exc_info=True)
        return set()

    if layer is None:
        return set()

    found: set[str] = set()
    list_op_fields = ("addedItems", "prependedItems", "appendedItems", "explicitItems")

    # Sublayers
    found.update(
        str(sub_path)
        for sub_path in getattr(layer, "subLayerPaths", []) or []
        if sub_path and _is_downloadable_asset(sub_path)
    )

    def _collect_list_op(list_op) -> None:
        # references and payloads expose their items through the same list-op fields
        for field_name in list_op_fields:
            for item in getattr(list_op, field_name, None) or ():
                target = getattr(item, "assetPath", None)
                if target and _is_downloadable_asset(target):
                    found.add(str(target))

    def _visit(prim_spec: Sdf.PrimSpec) -> None:
        _collect_list_op(prim_spec.referenceList)
        _collect_list_op(prim_spec.payloadList)

        # AssetPath-valued attributes (textures and similar file inputs)
        for attr_spec in prim_spec.attributes.values():
            value = attr_spec.default
            if isinstance(value, Sdf.AssetPath):
                candidates = (value,)
            elif isinstance(value, Sdf.AssetPathArray):
                candidates = value
            else:
                continue
            for asset in candidates:
                if asset.path and _is_downloadable_asset(asset.path):
                    found.add(asset.path)

        for child_spec in prim_spec.nameChildren.values():
            _visit(child_spec)

    for top_level_prim in layer.rootPrims.values():
        _visit(top_level_prim)

    return found
+ """ + os.makedirs(download_root, exist_ok=True) + + root_url = _normalize_url(root_usd_s3_url) + to_visit = [root_url] + visited: set[str] = set() + mapping: dict[str, str] = {} + + while to_visit: + current_url = _normalize_url(to_visit.pop()) + if current_url in visited: + continue + visited.add(current_url) + + local_path = _map_remote_to_local(download_root, current_url) + mapping[current_url] = local_path + + behavior = CopyBehavior.OVERWRITE if force_overwrite else CopyBehavior.SKIP + logger.info("Downloading asset %s -> %s", current_url, local_path) + res = await copy_async(current_url, local_path, behavior=behavior, progress_callback=progress_callback) + if res != Result.OK: + logger.warning("Failed to download %s (Result=%s)", current_url, res) + continue + + if _is_usd_file(local_path): + for ref in _find_usd_references(local_path): + dep_url = _resolve_reference_url(current_url, ref) + dep_url = _normalize_url(dep_url) + if dep_url and dep_url not in visited: + to_visit.append(dep_url) + + return mapping + + +def download_usd_with_references_sync( + root_url: str, + download_root: str, + force_overwrite: bool = True, + progress_callback: Callable[[int, int | None, str], None] | None = None, +) -> dict[str, str]: + """Synchronous wrapper for :func:`download_usd_with_references`. Safe for IsaacLab scripts + + NOT safe to call from inside a running event loop (e.g. Isaac Sim / Kit). + In that case, call `await download_usd_with_references(...)` directly. + """ + # If there's a running loop (Kit / Jupyter / etc.), don't try to block it. + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop → safe to own one; asyncio.run handles creation/cleanup. + return asyncio.run( + download_usd_with_references( + root_url, + download_root, + force_overwrite=force_overwrite, + progress_callback=progress_callback, + ) + ) + else: + # Already inside an event loop: this wrapper must not be used. 
+ raise RuntimeError( + "download_usd_with_references_sync() was called while an event loop is running.\n" + "Use `await download_usd_with_references(...)` or schedule it with " + "`asyncio.create_task` instead of calling the sync wrapper." + ) diff --git a/source/isaaclab/setup.py b/source/isaaclab/setup.py index 75fe5b9a3e7..6c436272fb7 100644 --- a/source/isaaclab/setup.py +++ b/source/isaaclab/setup.py @@ -23,6 +23,9 @@ "onnx>=1.18.0", # 1.16.2 throws access violation on Windows "prettytable==3.3.0", "toml", + # asset management + "boto3", + "botocore", # devices "hidapi==0.14.0.post2", # reinforcement learning