diff --git a/docs/conf.py b/docs/conf.py
index 5c8ed07a683..f3f94ad5929 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -128,6 +128,8 @@
 # Mock out modules that are not available on RTD
 autodoc_mock_imports = [
+    "boto3",
+    "botocore",
     "torch",
     "torchvision",
     "numpy",
diff --git a/source/isaaclab/isaaclab/sim/utils/nucleus.py b/source/isaaclab/isaaclab/sim/utils/nucleus.py
new file mode 100644
index 00000000000..f9d6ed1216f
--- /dev/null
+++ b/source/isaaclab/isaaclab/sim/utils/nucleus.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md).
+# All rights reserved.
+#
+# SPDX-License-Identifier: BSD-3-Clause
+
+import logging
+
+import carb
+
+from isaaclab.utils import client
+
+logger = logging.getLogger(__name__)
+
+DEFAULT_ASSET_ROOT_PATH_SETTING = "/persistent/isaac/asset_root/default"
+DEFAULT_ASSET_ROOT_TIMEOUT_SETTING = "/persistent/isaac/asset_root/timeout"
+
+
+def check_server(server: str, path: str, timeout: float = 10.0) -> bool:
+    """Check a specific server for a path.
+
+    Args:
+        server (str): Name of the Nucleus server or asset host.
+        path (str): Path to search for.
+        timeout (float): Timeout for the check, in seconds. Defaults to 10 seconds.
+
+    Returns:
+        bool: True if the folder is found.
+    """
+    logger.info(f"Checking path: {server}{path}")
+    result, _ = client.stat(f"{server}{path}")
+    if result == client.Result.OK:
+        logger.info(f"Success: {server}{path}")
+        return True
+    else:
+        logger.info(f"Failure: {server}{path} not accessible")
+        return False
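As an illustrative aside (not part of the patch): a minimal sketch of calling `check_server` directly. The import path comes from this new module; the host URL is the production asset root defined in `isaaclab/utils/assets.py`, and the `/Isaac` probe mirrors the check that `get_assets_root_path` performs below. Whether a bare folder prefix stats successfully depends on the host.

```python
# Sketch only: probe the asset host the same way get_assets_root_path() does.
from isaaclab.sim.utils.nucleus import check_server

server = "https://omniverse-content-production.s3-us-west-2.amazonaws.com/Assets/Isaac/5.1"
# check_server() stats f"{server}{path}" via the client helpers and logs the result.
if check_server(server, "/Isaac", timeout=10.0):
    print("Asset root is reachable")
```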
+ """ + + # get timeout + timeout = carb.settings.get_settings().get(DEFAULT_ASSET_ROOT_TIMEOUT_SETTING) + if not isinstance(timeout, (int, float)): + timeout = 10.0 + + # resolve path + logger.info(f"Check {DEFAULT_ASSET_ROOT_PATH_SETTING} setting") + default_asset_root = carb.settings.get_settings().get(DEFAULT_ASSET_ROOT_PATH_SETTING) + if not default_asset_root: + raise RuntimeError(f"The '{DEFAULT_ASSET_ROOT_PATH_SETTING}' setting is not set") + if skip_check: + return default_asset_root + + # check path + result = check_server(default_asset_root, "/Isaac", timeout) + if result: + result = check_server(default_asset_root, "/NVIDIA", timeout) + if result: + logger.info(f"Assets root found at {default_asset_root}") + return default_asset_root + + raise RuntimeError(f"Could not find assets root folder: {default_asset_root}") diff --git a/source/isaaclab/isaaclab/sim/utils/stage.py b/source/isaaclab/isaaclab/sim/utils/stage.py index 80d17331921..94adde59baa 100644 --- a/source/isaaclab/isaaclab/sim/utils/stage.py +++ b/source/isaaclab/isaaclab/sim/utils/stage.py @@ -491,10 +491,10 @@ def add_reference_to_stage(usd_path: str, prim_path: str, prim_type: str = "Xfor # Download remote files to local cache if file_status == 2: - logger.info(f"Downloading remote USD file: {original_usd_path}") + logger.debug(f"Downloading remote USD file: {original_usd_path}") try: usd_path = retrieve_file_path(usd_path, force_download=False) - logger.info(f" Downloaded to: {usd_path}") + logger.debug(f" Downloaded to: {usd_path}") except Exception as e: raise FileNotFoundError(f"Failed to download USD file from {original_usd_path}: {e}") diff --git a/source/isaaclab/isaaclab/utils/assets.py b/source/isaaclab/isaaclab/utils/assets.py index bb9d386c296..af7e419461c 100644 --- a/source/isaaclab/isaaclab/utils/assets.py +++ b/source/isaaclab/isaaclab/utils/assets.py @@ -5,27 +5,22 @@ """Sub-module that defines the host-server where assets and resources are stored. -By default, we use the Isaac Sim Nucleus Server for hosting assets and resources. This makes +By default, we use S3 or other cloud storage for hosting assets and resources. This makes distribution of the assets easier and makes the repository smaller in size code-wise. - -For more information, please check information on `Omniverse Nucleus`_. - -.. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html """ +import asyncio import io import logging import os -import posixpath import tempfile -from pathlib import Path +import time from typing import Literal from urllib.parse import urlparse -import omni.client +from . import client logger = logging.getLogger(__name__) -from pxr import Sdf NUCLEUS_ASSET_ROOT_DIR = "https://omniverse-content-production.s3-us-west-2.amazonaws.com/Assets/Isaac/5.1" """Path to the root directory on the cloud storage.""" @@ -42,8 +37,13 @@ USD_EXTENSIONS = {".usd", ".usda", ".usdz"} +def _is_usd_path(path: str) -> bool: + ext = os.path.splitext(urlparse(path).path)[1].lower() + return ext in client.USD_EXTENSIONS + + def check_file_path(path: str) -> Literal[0, 1, 2]: - """Checks if a file exists on the Nucleus Server or locally. + """Checks if a file exists on cloud storage or locally. Args: path: The path to the file. 
diff --git a/source/isaaclab/isaaclab/sim/utils/stage.py b/source/isaaclab/isaaclab/sim/utils/stage.py
index 80d17331921..94adde59baa 100644
--- a/source/isaaclab/isaaclab/sim/utils/stage.py
+++ b/source/isaaclab/isaaclab/sim/utils/stage.py
@@ -491,10 +491,10 @@ def add_reference_to_stage(usd_path: str, prim_path: str, prim_type: str = "Xfor
 
     # Download remote files to local cache
     if file_status == 2:
-        logger.info(f"Downloading remote USD file: {original_usd_path}")
+        logger.debug(f"Downloading remote USD file: {original_usd_path}")
         try:
             usd_path = retrieve_file_path(usd_path, force_download=False)
-            logger.info(f"  Downloaded to: {usd_path}")
+            logger.debug(f"  Downloaded to: {usd_path}")
         except Exception as e:
             raise FileNotFoundError(f"Failed to download USD file from {original_usd_path}: {e}")
diff --git a/source/isaaclab/isaaclab/utils/assets.py b/source/isaaclab/isaaclab/utils/assets.py
index bb9d386c296..af7e419461c 100644
--- a/source/isaaclab/isaaclab/utils/assets.py
+++ b/source/isaaclab/isaaclab/utils/assets.py
@@ -5,27 +5,22 @@
 """Sub-module that defines the host-server where assets and resources are stored.
 
-By default, we use the Isaac Sim Nucleus Server for hosting assets and resources. This makes
+By default, we use S3 or other cloud storage for hosting assets and resources. This makes
 distribution of the assets easier and makes the repository smaller in size code-wise.
-
-For more information, please check information on `Omniverse Nucleus`_.
-
-.. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html
 """
 
+import asyncio
 import io
 import logging
 import os
-import posixpath
 import tempfile
-from pathlib import Path
+import time
 from typing import Literal
 from urllib.parse import urlparse
 
-import omni.client
+from . import client
 
 logger = logging.getLogger(__name__)
-from pxr import Sdf
 
 NUCLEUS_ASSET_ROOT_DIR = "https://omniverse-content-production.s3-us-west-2.amazonaws.com/Assets/Isaac/5.1"
 """Path to the root directory on the cloud storage."""
@@ -42,8 +37,13 @@
 USD_EXTENSIONS = {".usd", ".usda", ".usdz"}
 
 
+def _is_usd_path(path: str) -> bool:
+    ext = os.path.splitext(urlparse(path).path)[1].lower()
+    return ext in client.USD_EXTENSIONS
+
+
 def check_file_path(path: str) -> Literal[0, 1, 2]:
-    """Checks if a file exists on the Nucleus Server or locally.
+    """Checks if a file exists on cloud storage or locally.
 
     Args:
         path: The path to the file.
@@ -53,96 +53,95 @@ def check_file_path(path: str) -> Literal[0, 1, 2]:
 
         * :obj:`0` if the file does not exist
         * :obj:`1` if the file exists locally
-        * :obj:`2` if the file exists on the Nucleus Server
+        * :obj:`2` if the file exists on cloud storage (S3 or HTTP/HTTPS)
     """
     if os.path.isfile(path):
         return 1
-    # we need to convert backslash to forward slash on Windows for omni.client API
-    elif omni.client.stat(path.replace(os.sep, "/"))[0] == omni.client.Result.OK:
+    # we need to convert backslash to forward slash on Windows for client API
+    elif client.stat(path.replace(os.sep, "/"))[0] == client.Result.OK:
         return 2
     else:
         return 0
 
 
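The tri-state return value above is easiest to read in a small sketch; the paths are hypothetical:

```python
from isaaclab.utils.assets import check_file_path

status = check_file_path("/tmp/scene.usd")  # hypothetical path
if status == 1:
    print("exists locally")
elif status == 2:
    print("exists on cloud storage (S3 or HTTP/HTTPS)")
else:  # status == 0
    print("not found")
```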
""" - # check file status - file_status = check_file_path(path) - if file_status == 1: + status = check_file_path(path) + + # Local file + if status == 1: return os.path.abspath(path) - elif file_status == 2: - # resolve download directory + + # Remote file + if status == 2: if download_dir is None: download_dir = tempfile.gettempdir() - else: - download_dir = os.path.abspath(download_dir) - # create download directory if it does not exist - if not os.path.exists(download_dir): - os.makedirs(download_dir) - # recursive download: mirror remote tree under download_dir - remote_url = path.replace(os.sep, "/") - to_visit = [remote_url] - visited = set() - local_root = None - - while to_visit: - cur_url = to_visit.pop() - if cur_url in visited: - continue - visited.add(cur_url) - - cur_rel = urlparse(cur_url).path.lstrip("/") - target_path = os.path.join(download_dir, cur_rel) - os.makedirs(os.path.dirname(target_path), exist_ok=True) - - if not os.path.isfile(target_path) or force_download: - result = omni.client.copy(cur_url, target_path, omni.client.CopyBehavior.OVERWRITE) - if result != omni.client.Result.OK and force_download: - raise RuntimeError(f"Unable to copy file: '{cur_url}'. Is the Nucleus Server running?") - + download_dir = os.path.abspath(download_dir) + os.makedirs(download_dir, exist_ok=True) + + url = path.replace(os.sep, "/") + + # USD → USD + dependencies + if _is_usd_path(url): + mapping = client.download_usd_with_references_sync( + root_url=url, + download_root=download_dir, + force_overwrite=force_download, + progress_callback=lambda done, total, src: logger.debug( + " [%s] %d / %s bytes", src, done, "?" if total is None else str(total) + ), + ) + local_root = mapping.get(client._normalize_url(url)) if local_root is None: - local_root = target_path + key = urlparse(url).path.lstrip("/") + local_root = os.path.join(download_dir, key) + return os.path.abspath(local_root) - # recurse into USD dependencies and referenced assets - if Path(target_path).suffix.lower() in USD_EXTENSIONS: - for ref in _find_usd_references(target_path): - ref_url = _resolve_reference_url(cur_url, ref) - if ref_url and ref_url not in visited: - to_visit.append(ref_url) + # Non-USD → single file download + file_name = os.path.basename(client.break_url(url).path) + target_path = os.path.join(download_dir, file_name) - return os.path.abspath(local_root) - else: - raise FileNotFoundError(f"Unable to find the file: {path}") + if not os.path.isfile(target_path) or force_download: + result = client.copy(url, target_path, client.CopyBehavior.OVERWRITE) + if result != client.Result.OK and force_download: + raise RuntimeError(f"Unable to copy file: '{path}' from cloud storage.") + return os.path.abspath(target_path) + + # Not found anywhere + raise FileNotFoundError(f"Unable to find the file: {path}") def read_file(path: str) -> io.BytesIO: - """Reads a file from the Nucleus Server or locally. + """Reads a file from cloud storage or locally. Args: path: The path to the file. Raises: - FileNotFoundError: When the file not found locally or on Nucleus Server. + FileNotFoundError: When the file not found locally or on cloud storage. Returns: The content of the file. 
@@ -153,113 +152,82 @@
     with open(path, "rb") as f:
         return io.BytesIO(f.read())
     elif file_status == 2:
-        file_content = omni.client.read_file(path.replace(os.sep, "/"))[2]
+        file_content = client.read_file(path.replace(os.sep, "/"))[2]
         return io.BytesIO(memoryview(file_content).tobytes())
     else:
         raise FileNotFoundError(f"Unable to find the file: {path}")
 
 
-def _is_downloadable_asset(path: str) -> bool:
-    """Return True for USD or other asset types we mirror locally (textures, etc.)."""
-    clean = path.split("?", 1)[0].split("#", 1)[0]
-    suffix = Path(clean).suffix.lower()
+"""
+Nucleus Connection.
+"""
 
-    if suffix == ".mdl":
-        # MDL modules (OmniPBR.mdl, OmniSurface.mdl, ...) come from MDL search paths
-        return False
-    if not suffix:
-        return False
-    if suffix not in {".usd", ".usda", ".usdz", ".png", ".jpg", ".jpeg", ".exr", ".hdr", ".tif", ".tiff"}:
-        return False
-    return True
 
+def check_usd_path_with_timeout(usd_path: str, timeout: float = 300, log_interval: float = 30) -> bool:
+    """Checks whether the given USD file path is available on the NVIDIA Nucleus server.
+
+    This function synchronously runs an asynchronous USD path availability check,
+    logging progress periodically until it completes. The file is considered available
+    if the server responds successfully (e.g. HTTP status 200); otherwise it is not.
 
-def _find_usd_references(local_usd_path: str) -> set[str]:
-    """Use Sdf API to collect referenced assets from a USD layer."""
+    This is useful for checking server responsiveness before attempting to load a remote
+    asset. It will block execution until the check completes or times out.
+
+    Args:
+        usd_path: The remote USD file path to check.
+        timeout: Maximum time (in seconds) to wait for the server check.
+        log_interval: Interval (in seconds) at which progress is logged.
+
+    Returns:
+        Whether the given USD path is available on the server.
+    """
+    start_time = time.time()
+    loop = asyncio.get_event_loop()
+
+    coroutine = _is_usd_path_available(usd_path, timeout)
+    task = asyncio.ensure_future(coroutine)
+
+    next_log_time = start_time + log_interval
+
+    first_log = True
+    while not task.done():
+        now = time.time()
+        if now >= next_log_time:
+            elapsed = int(now - start_time)
+            if first_log:
+                logger.warning(f"Checking server availability for USD path: {usd_path} (timeout: {timeout}s)")
+                first_log = False
+            logger.warning(f"Waiting for server response... ({elapsed}s elapsed)")
+            next_log_time += log_interval
+        loop.run_until_complete(asyncio.sleep(0.1))  # Yield to allow async work
+
+    return task.result()
+
+
+"""
+Helper functions.
+"""
+
+
+async def _is_usd_path_available(usd_path: str, timeout: float) -> bool:
+    """Checks whether the given USD path is available on the Omniverse Nucleus server.
+
+    This function is an asynchronous routine that checks the availability of the given USD path on the server.
+    It returns True if the USD path is available on the server, False otherwise.
+
+    Args:
+        usd_path: The remote or local USD file path to check.
+        timeout: Timeout in seconds for the async stat call.
+
+    Returns:
+        Whether the given USD path is available on the server.
+ """ try: - layer = Sdf.Layer.FindOrOpen(local_usd_path) - except Exception: - logger.warning("Failed to open USD layer: %s", local_usd_path, exc_info=True) - return set() - - if layer is None: - return set() - - refs: set[str] = set() - - # Sublayers - for sub_path in getattr(layer, "subLayerPaths", []) or []: - if sub_path and _is_downloadable_asset(sub_path): - refs.add(str(sub_path)) - - def _walk_prim(prim_spec: Sdf.PrimSpec) -> None: - # References - ref_list = prim_spec.referenceList - for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): - items = getattr(ref_list, field, None) - if not items: - continue - for ref in items: - asset_path = getattr(ref, "assetPath", None) - if asset_path and _is_downloadable_asset(asset_path): - refs.add(str(asset_path)) - - # Payloads - payload_list = prim_spec.payloadList - for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): - items = getattr(payload_list, field, None) - if not items: - continue - for payload in items: - asset_path = getattr(payload, "assetPath", None) - if asset_path and _is_downloadable_asset(asset_path): - refs.add(str(asset_path)) - - # AssetPath-valued attributes (this is where OmniPBR.mdl, textures, etc. show up) - for attr_spec in prim_spec.attributes.values(): - default = attr_spec.default - if isinstance(default, Sdf.AssetPath): - if default.path and _is_downloadable_asset(default.path): - refs.add(default.path) - elif isinstance(default, Sdf.AssetPathArray): - for ap in default: - if ap.path and _is_downloadable_asset(ap.path): - refs.add(ap.path) - - # Variants - each variant set can have multiple variants with their own prim content - for variant_set_spec in prim_spec.variantSets.values(): - for variant_spec in variant_set_spec.variants.values(): - variant_prim_spec = variant_spec.primSpec - if variant_prim_spec is not None: - _walk_prim(variant_prim_spec) - - for child in prim_spec.nameChildren.values(): - _walk_prim(child) - - for root_prim in layer.rootPrims.values(): - _walk_prim(root_prim) - - return refs - - -def _resolve_reference_url(base_url: str, ref: str) -> str: - """Resolve a USD asset reference against a base URL (http/local).""" - ref = ref.strip() - if not ref: - return ref - - parsed_ref = urlparse(ref) - if parsed_ref.scheme: - return ref - - base = urlparse(base_url) - if base.scheme == "": - base_dir = os.path.dirname(base_url) - return os.path.normpath(os.path.join(base_dir, ref)) - - base_dir = posixpath.dirname(base.path) - if ref.startswith("/"): - new_path = posixpath.normpath(ref) - else: - new_path = posixpath.normpath(posixpath.join(base_dir, ref)) - return f"{base.scheme}://{base.netloc}{new_path}" + result, _ = await asyncio.wait_for(client.stat_async(usd_path), timeout=timeout) + return result == client.Result.OK + except asyncio.TimeoutError: + logger.warning(f"Timed out after {timeout}s while checking for USD: {usd_path}") + return False + except Exception as ex: + logger.warning(f"Exception during USD file check: {type(ex).__name__}: {ex}") + return False diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py new file mode 100644 index 00000000000..1d5abc19e3c --- /dev/null +++ b/source/isaaclab/isaaclab/utils/client.py @@ -0,0 +1,576 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. 
+# +# SPDX-License-Identifier: BSD-3-Clause + + +"""Lightweight omni.client-like helpers for local/HTTP/S3 access. +This module implements a subset of omni.client behaviors used inside Isaac Lab: +path normalization, stat/read helpers, USD reference resolution, and simple +copy/download utilities. It supports local files, HTTP(S), and S3 paths and +provides a small Result enum for consistent status reporting. +""" + +import asyncio +import logging +import os +import posixpath +from collections.abc import Callable +from enum import Enum, IntEnum +from pathlib import Path +from typing import Any, NamedTuple +from urllib.error import HTTPError, URLError +from urllib.parse import urlparse +from urllib.request import Request, urlopen + +import boto3 +from botocore.exceptions import ClientError +from pxr import Sdf + +logger = logging.getLogger(__name__) + + +_s3 = None + +OMNI_S3_HOST_PREFIX = "omniverse-content-production.s3-" + +USD_EXTENSIONS = {".usd", ".usda", ".usdc"} + +_DOWNLOADABLE_EXTS = {".usd", ".usda", ".usdz", ".usdc", ".png", ".jpg", ".jpeg", ".exr", ".hdr", ".tif", ".tiff"} + + +class Result(IntEnum): + OK = 0 + ERROR_NOT_FOUND = 1 + ERROR_PERMISSION_DENIED = 2 + ERROR_NETWORK = 3 + ERROR_UNKNOWN = 4 + + +class CopyBehavior(Enum): + OVERWRITE = "overwrite" + SKIP = "skip" + + +class UrlParts(NamedTuple): + scheme: str + authority: str + path: str + + +def break_url(url: str) -> UrlParts: + """Parse a URL into (scheme, authority, path) with empty parts when missing.""" + parsed = urlparse(url) + return UrlParts(parsed.scheme or "", parsed.netloc or "", parsed.path or "") + + +def _get_s3_client(): + """Return a cached boto3 S3 client.""" + global _s3 + if _s3 is None: + _s3 = boto3.client("s3") + return _s3 + + +def _is_s3_url(path: str) -> bool: + """Return True if the path uses the ``s3://`` scheme.""" + return path.startswith("s3://") + + +def _is_http_url(path: str) -> bool: + """Return True if the path uses HTTP or HTTPS.""" + scheme = urlparse(path).scheme.lower() + return scheme in ("http", "https") + + +def _is_local_path(path: str) -> bool: + """Return True if the path has no URL scheme (treated as local).""" + # Strong assumption: anything without a scheme is local + return urlparse(path).scheme == "" + + +def _split_s3_url(path: str) -> tuple[str, str]: + """Split an S3 URL into ``(bucket, key)`` or raise on invalid input.""" + parsed = urlparse(path) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + if not bucket or not key: + raise ValueError(f"Invalid S3 URL: {path}") + return bucket, key + + +def _normalize_url(url: str) -> str: + """Convert omniverse S3 URLs to HTTPS; leave others as-is.""" + if not _is_s3_url(url): + return url + parsed = urlparse(url) + if parsed.netloc.startswith(OMNI_S3_HOST_PREFIX): + return f"https://{parsed.netloc}{parsed.path}" + return url + + +def _map_remote_to_local(download_root: str, url: str) -> str: + """Mirror remote path structure under download_root.""" + parsed = urlparse(url) + key = parsed.path.lstrip("/") + local_path = os.path.join(download_root, key) + os.makedirs(os.path.dirname(local_path), exist_ok=True) + return local_path + + +def _resolve_reference_url(base_url: str, ref: str) -> str: + """Resolve a USD asset reference against a base URL (http/s3/local).""" + ref = ref.strip() + if not ref: + return ref + + parsed_ref = urlparse(ref) + if parsed_ref.scheme: + # Already absolute (http://, https://, s3://, etc.) 
+ return ref + + base = urlparse(base_url) + if base.scheme == "": + # Local base + base_dir = os.path.dirname(base_url) + return os.path.normpath(os.path.join(base_dir, ref)) + + # Remote base + base_dir = posixpath.dirname(base.path) + if ref.startswith("/"): + new_path = posixpath.normpath(ref) + else: + new_path = posixpath.normpath(posixpath.join(base_dir, ref)) + return f"{base.scheme}://{base.netloc}{new_path}" + + +# stat / read_file + + +def stat(path: str) -> tuple[Result, dict[str, Any] | None]: + """Check whether a remote or local file exists and return basic metadata. + Args: + path: Local path or remote URL (HTTP/S3). + Returns: + Tuple of (:class:`Result`, info dict or None). On success, ``info`` may contain + ``size``, ``etag``, ``last_modified``, and ``content_type`` when available. + The :class:`Result` code is one of ``OK``, ``ERROR_NOT_FOUND``, + ``ERROR_PERMISSION_DENIED``, ``ERROR_NETWORK``, or ``ERROR_UNKNOWN``. + """ + url = _normalize_url(path) + + # HTTP(S) + if _is_http_url(url): + try: + req = Request(url, method="HEAD") + with urlopen(req) as resp: + size_header = resp.headers.get("Content-Length") + info = { + "size": int(size_header) if size_header is not None else None, + "etag": resp.headers.get("ETag"), + "last_modified": resp.headers.get("Last-Modified"), + "content_type": resp.headers.get("Content-Type"), + } + return Result.OK, info + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND, None + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("HTTP error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + except URLError as exc: + logger.warning("Network error in stat(%s): %s", url, exc) + return Result.ERROR_NETWORK, None + except Exception as exc: + logger.warning("Unexpected error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + # S3 (non-omniverse) + if _is_s3_url(url): + bucket, key = _split_s3_url(url) + s3 = _get_s3_client() + try: + response = s3.head_object(Bucket=bucket, Key=key) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND, None + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("Error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + info = { + "size": response.get("ContentLength"), + "etag": response.get("ETag"), + "last_modified": response.get("LastModified"), + "content_type": response.get("ContentType"), + } + return Result.OK, info + + # Local + if _is_local_path(url) and os.path.exists(url): + try: + size = os.path.getsize(url) + except OSError: + size = None + info = { + "size": size, + "etag": None, + "last_modified": None, + "content_type": None, + } + return Result.OK, info + + return Result.ERROR_NOT_FOUND, None + + +async def stat_async(path: str) -> tuple[Result, dict[str, Any] | None]: + """Async wrapper for :func:`stat`.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, stat, path) + + +def read_file(path: str) -> tuple[Result, dict[str, Any], memoryview]: + """Read file content from HTTP(S), S3, or local.""" + url = _normalize_url(path) + + # HTTP(S) + if _is_http_url(url): + try: + with urlopen(url) as resp: + data_bytes = resp.read() + meta = { + "size": len(data_bytes), + "content_type": resp.headers.get("Content-Type"), + } + return Result.OK, meta, memoryview(data_bytes) + except HTTPError as exc: + if exc.code == 
404: + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED, {}, memoryview(b"") + logger.warning("HTTP error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + except URLError as exc: + logger.warning("Network error in read_file(%s): %s", url, exc) + return Result.ERROR_NETWORK, {}, memoryview(b"") + except Exception as exc: + logger.warning("Unexpected error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + + # S3 + if _is_s3_url(url): + bucket, key = _split_s3_url(url) + s3 = _get_s3_client() + try: + obj = s3.get_object(Bucket=bucket, Key=key) + data_bytes = obj["Body"].read() + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED, {}, memoryview(b"") + logger.warning("Error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + + meta = { + "size": len(data_bytes), + "content_type": obj.get("ContentType"), + } + return Result.OK, meta, memoryview(data_bytes) + + # Local + if _is_local_path(url) and os.path.isfile(url): + with open(url, "rb") as f: + data_bytes = f.read() + meta = {"size": len(data_bytes), "content_type": None} + return Result.OK, meta, memoryview(data_bytes) + + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + + +async def read_file_async(path: str) -> tuple[Result, dict[str, Any], memoryview]: + """Async wrapper for :func:`read_file`.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, read_file, path) + + +# copy + + +def copy( + src: str, + dst: str, + behavior: CopyBehavior = CopyBehavior.OVERWRITE, + progress_callback: Callable[[int, int | None, str], None] | None = None, + chunk_size: int = 8 * 1024 * 1024, +) -> Result: + """Copy between local and remote (HTTP/S3) locations. + Supported directions: + * HTTP/HTTPS → local + * S3 → local + Args: + src: Source path or URL. + dst: Destination path or URL. + behavior: Overwrite policy for local targets. + progress_callback: Optional ``cb(done_bytes, total_bytes_or_None, src)``. + chunk_size: Chunk size for streamed copies. + Returns: + Result enum indicating success or failure reason. 
+ """ + src = _normalize_url(src) + + if os.path.exists(dst) and behavior == CopyBehavior.SKIP: + return Result.OK + + # HTTP(S) -> local + if _is_http_url(src) and _is_local_path(dst): + os.makedirs(os.path.dirname(dst) or ".", exist_ok=True) + try: + with urlopen(src) as resp: + size_header = resp.headers.get("Content-Length") + total_size = int(size_header) if size_header is not None else None + + transferred = 0 + with open(dst, "wb") as f: + while True: + chunk = resp.read(chunk_size) + if not chunk: + break + f.write(chunk) + transferred += len(chunk) + if progress_callback: + progress_callback(transferred, total_size, src) + return Result.OK + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED + logger.warning("HTTP error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_UNKNOWN + except URLError as exc: + logger.warning("Network error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_NETWORK + except Exception as exc: + logger.warning("Unexpected error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_UNKNOWN + + # S3 -> local + if _is_s3_url(src) and _is_local_path(dst): + bucket, key = _split_s3_url(src) + s3 = _get_s3_client() + os.makedirs(os.path.dirname(dst) or ".", exist_ok=True) + try: + head = s3.head_object(Bucket=bucket, Key=key) + total_size = head.get("ContentLength") + obj = s3.get_object(Bucket=bucket, Key=key) + body = obj["Body"] + + transferred = 0 + with open(dst, "wb") as f: + while True: + chunk = body.read(chunk_size) + if not chunk: + break + f.write(chunk) + transferred += len(chunk) + if progress_callback: + progress_callback(transferred, total_size, src) + return Result.OK + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED + logger.warning("Error copying S3->local (%s -> %s): %s", src, dst, exc) + return Result.ERROR_UNKNOWN + + logger.error("Copy combination not supported: %s -> %s", src, dst) + return Result.ERROR_UNKNOWN + + +async def copy_async( + src: str, + dst: str, + behavior: CopyBehavior = CopyBehavior.OVERWRITE, + progress_callback: Callable[[int, int | None, str], None] | None = None, + chunk_size: int = 8 * 1024 * 1024, +) -> Result: + """Async wrapper for :func:`copy` with the same arguments.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, copy, src, dst, behavior, progress_callback, chunk_size) + + +# USD dependency resolution + + +def _is_downloadable_asset(path: str) -> bool: + """Return True for USD or other asset types we mirror locally (textures, etc.).""" + clean = path.split("?", 1)[0].split("#", 1)[0] + suffix = Path(clean).suffix.lower() + + if suffix == ".mdl": + # MDL modules (OmniPBR.mdl, OmniSurface.mdl, ...) 
come from MDL search paths + return False + if not suffix: + return False + if suffix not in _DOWNLOADABLE_EXTS: + return False + return True + + +def _find_usd_references(local_usd_path: str) -> set[str]: + """Use Sdf API to collect referenced assets from a USD layer.""" + try: + layer = Sdf.Layer.FindOrOpen(local_usd_path) + except Exception: + logger.warning("Failed to open USD layer: %s", local_usd_path, exc_info=True) + return set() + + if layer is None: + return set() + + refs: set[str] = set() + + # Sublayers + for sub_path in getattr(layer, "subLayerPaths", []) or []: + if sub_path and _is_downloadable_asset(sub_path): + refs.add(str(sub_path)) + + def _walk_prim(prim_spec: Sdf.PrimSpec) -> None: + # References + ref_list = prim_spec.referenceList + for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): + items = getattr(ref_list, field, None) + if not items: + continue + for ref in items: + asset_path = getattr(ref, "assetPath", None) + if asset_path and _is_downloadable_asset(asset_path): + refs.add(str(asset_path)) + + # Payloads + payload_list = prim_spec.payloadList + for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): + items = getattr(payload_list, field, None) + if not items: + continue + for payload in items: + asset_path = getattr(payload, "assetPath", None) + if asset_path and _is_downloadable_asset(asset_path): + refs.add(str(asset_path)) + + # AssetPath-valued attributes (this is where OmniPBR.mdl, textures, etc. show up) + for attr_spec in prim_spec.attributes.values(): + default = attr_spec.default + if isinstance(default, Sdf.AssetPath): + if default.path and _is_downloadable_asset(default.path): + refs.add(default.path) + elif isinstance(default, Sdf.AssetPathArray): + for ap in default: + if ap.path and _is_downloadable_asset(ap.path): + refs.add(ap.path) + + # Variants - each variant set can have multiple variants with their own prim content + for variant_set_spec in prim_spec.variantSets.values(): + for variant_spec in variant_set_spec.variants.values(): + variant_prim_spec = variant_spec.primSpec + if variant_prim_spec is not None: + _walk_prim(variant_prim_spec) + + for child in prim_spec.nameChildren.values(): + _walk_prim(child) + + for root_prim in layer.rootPrims.values(): + _walk_prim(root_prim) + + return refs + + +async def download_usd_with_references( + root_usd_s3_url: str, + download_root: str, + force_overwrite: bool = True, + progress_callback: Callable[[int, int | None, str], None] | None = None, +) -> dict[str, str]: + """Download a USD and all referenced assets to a local mirror. + Traverses the USD dependency graph, downloading each referenced asset (USD, textures, etc.) + into ``download_root`` while preserving the relative directory structure. Returns a mapping + from normalized remote URLs to their local file paths. + Args: + root_usd_s3_url: Root USD URL (S3/HTTP). + download_root: Local root directory to mirror into. + force_overwrite: If True, overwrite existing files; otherwise skip. + progress_callback: Optional ``cb(done_bytes, total_bytes_or_None, src)``. + Returns: + Dict mapping normalized remote URLs to local paths. 
+ """ + os.makedirs(download_root, exist_ok=True) + + root_url = _normalize_url(root_usd_s3_url) + to_visit = [root_url] + visited: set[str] = set() + mapping: dict[str, str] = {} + + while to_visit: + current_url = _normalize_url(to_visit.pop()) + if current_url in visited: + continue + visited.add(current_url) + + local_path = _map_remote_to_local(download_root, current_url) + mapping[current_url] = local_path + + behavior = CopyBehavior.OVERWRITE if force_overwrite else CopyBehavior.SKIP + logger.debug("Downloading asset %s -> %s", current_url, local_path) + res = await copy_async(current_url, local_path, behavior=behavior, progress_callback=progress_callback) + if res != Result.OK: + logger.warning("Failed to download %s (Result=%s)", current_url, res) + continue + + if Path(local_path).suffix.lower() in USD_EXTENSIONS: + for ref in _find_usd_references(local_path): + dep_url = _resolve_reference_url(current_url, ref) + dep_url = _normalize_url(dep_url) + if dep_url and dep_url not in visited: + to_visit.append(dep_url) + + return mapping + + +def download_usd_with_references_sync( + root_url: str, + download_root: str, + force_overwrite: bool = True, + progress_callback: Callable[[int, int | None, str], None] | None = None, +) -> dict[str, str]: + """Synchronous wrapper for :func:`download_usd_with_references`. Safe for IsaacLab scripts + + NOT safe to call from inside a running event loop (e.g. Isaac Sim / Kit). + In that case, call `await download_usd_with_references(...)` directly. + """ + # If there's a running loop (Kit / Jupyter / etc.), don't try to block it. + try: + asyncio.get_running_loop() + except RuntimeError: + # No running loop → safe to own one; asyncio.run handles creation/cleanup. + return asyncio.run( + download_usd_with_references( + root_url, + download_root, + force_overwrite=force_overwrite, + progress_callback=progress_callback, + ) + ) + else: + # Already inside an event loop: this wrapper must not be used. + raise RuntimeError( + "download_usd_with_references_sync() was called while an event loop is running.\n" + "Use `await download_usd_with_references(...)` or schedule it with " + "`asyncio.create_task` instead of calling the sync wrapper." + ) diff --git a/source/isaaclab/setup.py b/source/isaaclab/setup.py index 42fc27b3aea..875b79ead39 100644 --- a/source/isaaclab/setup.py +++ b/source/isaaclab/setup.py @@ -26,6 +26,9 @@ "toml", "fast_simplification", "tqdm==4.67.1", # previous version was causing sys errors + # asset management + "boto3", + "botocore", # devices "hidapi==0.14.0.post2", # reinforcement learning