From 65c3b4b5ad9daac122408236abc6853a9d87ea71 Mon Sep 17 00:00:00 2001
From: Octi Zhang
Date: Tue, 2 Dec 2025 17:18:45 -0800
Subject: [PATCH 01/11] replaces omni.client with isaaclab implementation of client

---
 scripts/tools/pretrained_checkpoint.py | 5 +-
 .../03_envs/policy_inference_in_usd.py | 5 +-
 source/isaaclab/config/extension.toml | 2 +-
 source/isaaclab/docs/CHANGELOG.rst | 10 +
 source/isaaclab/isaaclab/sim/utils/nucleus.py | 9 +-
 source/isaaclab/isaaclab/utils/assets.py | 128 ++--
 source/isaaclab/isaaclab/utils/client.py | 622 ++++++++++++++++++
 7 files changed, 715 insertions(+), 66 deletions(-)
 create mode 100644 source/isaaclab/isaaclab/utils/client.py

diff --git a/scripts/tools/pretrained_checkpoint.py b/scripts/tools/pretrained_checkpoint.py
index a62514eedd9..507ae4360e2 100644
--- a/scripts/tools/pretrained_checkpoint.py
+++ b/scripts/tools/pretrained_checkpoint.py
@@ -91,9 +91,6 @@
 import subprocess
 import sys
 
-import omni.client
-from omni.client._omniclient import CopyBehavior
-
 from isaaclab.utils.pretrained_checkpoint import (
     WORKFLOW_EXPERIMENT_NAME_VARIABLE,
     WORKFLOW_PLAYER,
@@ -259,6 +256,8 @@ def publish_pretrained_checkpoint(workflow, task_name, force_publish=False):
         task_name: The task name.
         force_publish: Publish without review.
     """
+    import omni.client
+    from omni.client._omniclient import CopyBehavior
 
     # This workflow task pair hasn't been trained
     if not has_pretrained_checkpoint_job_run(workflow, task_name):
diff --git a/scripts/tutorials/03_envs/policy_inference_in_usd.py b/scripts/tutorials/03_envs/policy_inference_in_usd.py
index fcef884d9c9..5ebe828d866 100644
--- a/scripts/tutorials/03_envs/policy_inference_in_usd.py
+++ b/scripts/tutorials/03_envs/policy_inference_in_usd.py
@@ -41,10 +41,9 @@
 import os
 import torch
 
-import omni
-
 from isaaclab.envs import ManagerBasedRLEnv
 from isaaclab.terrains import TerrainImporterCfg
+from isaaclab.utils import client
 from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR
 
 from isaaclab_tasks.manager_based.locomotion.velocity.config.h1.rough_env_cfg import H1RoughEnvCfg_PLAY
@@ -54,7 +53,7 @@ def main():
     """Main function."""
     # load the trained jit policy
     policy_path = os.path.abspath(args_cli.checkpoint)
-    file_content = omni.client.read_file(policy_path)[2]
+    file_content = client.read_file(policy_path)[2]
     file = io.BytesIO(memoryview(file_content).tobytes())
     policy = torch.jit.load(file, map_location=args_cli.device)
 
diff --git a/source/isaaclab/config/extension.toml b/source/isaaclab/config/extension.toml
index 623798e931e..10d03fbad51 100644
--- a/source/isaaclab/config/extension.toml
+++ b/source/isaaclab/config/extension.toml
@@ -1,7 +1,7 @@
 [package]
 
 # Note: Semantic Versioning is used: https://semver.org/
-version = "0.49.0"
+version = "0.49.1"
 
 # Description
 title = "Isaac Lab framework for Robot Learning"
diff --git a/source/isaaclab/docs/CHANGELOG.rst b/source/isaaclab/docs/CHANGELOG.rst
index ef1f2755aa6..9ca0cbc34bd 100644
--- a/source/isaaclab/docs/CHANGELOG.rst
+++ b/source/isaaclab/docs/CHANGELOG.rst
@@ -1,6 +1,16 @@
 Changelog
 ---------
 
+0.49.1 (2025-12-02)
+~~~~~~~~~~~~~~~~~~~
+
+Fixed
+^^^^^
+
+* Replaced ``omni.client`` with a lightweight implementation in :mod:`isaaclab.utils.client`.
+
+
+
 0.49.0 (2025-11-10)
 ~~~~~~~~~~~~~~~~~~~
 
diff --git a/source/isaaclab/isaaclab/sim/utils/nucleus.py b/source/isaaclab/isaaclab/sim/utils/nucleus.py
index cb7af95e555..8957f9d59a3 100644
--- a/source/isaaclab/isaaclab/sim/utils/nucleus.py
+++ b/source/isaaclab/isaaclab/sim/utils/nucleus.py
@@ -6,8 +6,8 @@
 import logging
 
 import carb
-import omni.client
-from omni.client import Result
+
+from isaaclab.utils import client
 
 logger = logging.getLogger(__name__)
 
@@ -28,9 +28,7 @@ def check_server(server: str, path: str, timeout: float = 10.0) -> bool:
     """
     logger.info(f"Checking path: {server}{path}")
-    # Increase hang detection timeout
-    omni.client.set_hang_detection_time_ms(20000)
-    result, _ = omni.client.stat(f"{server}{path}")
-    if result == Result.OK:
+    result, _ = client.stat(f"{server}{path}")
+    if result == client.Result.OK:
         logger.info(f"Success: {server}{path}")
         return True
     else:
diff --git a/source/isaaclab/isaaclab/utils/assets.py b/source/isaaclab/isaaclab/utils/assets.py
index 353767c0310..92e4369e9b3 100644
--- a/source/isaaclab/isaaclab/utils/assets.py
+++ b/source/isaaclab/isaaclab/utils/assets.py
@@ -5,12 +5,8 @@
 
 """Sub-module that defines the host-server where assets and resources are stored.
 
-By default, we use the Isaac Sim Nucleus Server for hosting assets and resources. This makes
+By default, we use S3 or other cloud storage for hosting assets and resources. This makes
 distribution of the assets easier and makes the repository smaller in size code-wise.
-
-For more information, please check information on `Omniverse Nucleus`_.
-
-.. _Omniverse Nucleus: https://docs.omniverse.nvidia.com/nucleus/latest/overview/overview.html
 """
 
 import asyncio
@@ -20,15 +16,14 @@
 import tempfile
 import time
 from typing import Literal
+from urllib.parse import urlparse
 
-import carb
-import omni.client
+from . import client
 
-# import logger
 logger = logging.getLogger(__name__)
 
-NUCLEUS_ASSET_ROOT_DIR = carb.settings.get_settings().get("/persistent/isaac/asset_root/cloud")
-"""Path to the root directory on the Nucleus Server."""
+NUCLEUS_ASSET_ROOT_DIR = "https://omniverse-content-production.s3-us-west-2.amazonaws.com/Assets/Isaac/5.0"
+"""Path to the root directory in cloud storage."""
 
 NVIDIA_NUCLEUS_DIR = f"{NUCLEUS_ASSET_ROOT_DIR}/NVIDIA"
 """Path to the root directory on the NVIDIA Nucleus Server."""
@@ -40,8 +35,13 @@
 """Path to the ``Isaac/IsaacLab`` directory on the NVIDIA Nucleus Server."""
 
 
+def _is_usd_path(path: str) -> bool:
+    ext = os.path.splitext(urlparse(path).path)[1].lower()
+    return ext in client.USD_EXTENSIONS
+
+
 def check_file_path(path: str) -> Literal[0, 1, 2]:
-    """Checks if a file exists on the Nucleus Server or locally.
+    """Checks if a file exists on cloud storage or locally.
 
     Args:
         path: The path to the file.
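As a usage sketch of the contract above: `check_file_path` distinguishes missing, local, and remote files, and `retrieve_file_path` resolves the latter to a local copy. This is a minimal sketch, not repository code; the URL below is a placeholder, not a published asset path.

```python
# Minimal sketch of the new check_file_path / retrieve_file_path contract.
# The URL below is a placeholder, not a published asset path.
from isaaclab.utils.assets import check_file_path, retrieve_file_path

asset = "https://example.com/assets/robot.usd"  # placeholder URL

status = check_file_path(asset)
if status == 0:
    raise FileNotFoundError(asset)
elif status == 1:
    local_path = asset  # already a file on disk
else:  # status == 2: remote (S3 or HTTP/HTTPS)
    # For a .usd file this also mirrors every referenced asset locally.
    local_path = retrieve_file_path(asset, download_dir="/tmp/isaaclab_assets")
```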
@@ -51,74 +51,94 @@ def check_file_path(path: str) -> Literal[0, 1, 2]: * :obj:`0` if the file does not exist * :obj:`1` if the file exists locally - * :obj:`2` if the file exists on the Nucleus Server + * :obj:`2` if the file exists on cloud storage (S3 or HTTP/HTTPS) """ if os.path.isfile(path): return 1 - # we need to convert backslash to forward slash on Windows for omni.client API - elif omni.client.stat(path.replace(os.sep, "/"))[0] == omni.client.Result.OK: + # we need to convert backslash to forward slash on Windows for client API + elif client.stat(path.replace(os.sep, "/"))[0] == client.Result.OK: return 2 else: return 0 -def retrieve_file_path(path: str, download_dir: str | None = None, force_download: bool = True) -> str: - """Retrieves the path to a file on the Nucleus Server or locally. +def retrieve_file_path( + path: str, + download_dir: str | None = None, + force_download: bool = True, +) -> str: + """Resolve a path to a local file, downloading from Nucleus/HTTP/S3 if needed. - If the file exists locally, then the absolute path to the file is returned. - If the file exists on the Nucleus Server, then the file is downloaded to the local machine - and the absolute path to the file is returned. + Behavior: + * Local file returns its absolute path. + * Remote USD pulls the USD and all referenced assets into ``download_dir`` and returns the + absolute path to the local root USD. + * Other remote files are copied once into ``download_dir`` and that local path is returned. Args: - path: The path to the file. - download_dir: The directory where the file should be downloaded. Defaults to None, in which - case the file is downloaded to the system's temporary directory. - force_download: Whether to force download the file from the Nucleus Server. This will overwrite - the local file if it exists. Defaults to True. - - Returns: - The path to the file on the local machine. + path: Local path or remote URL. + download_dir: Directory to place downloads. Defaults to ``tempfile.gettempdir()``. + force_download: If True, re-download even if the target already exists. Raises: - FileNotFoundError: When the file not found locally or on Nucleus Server. - RuntimeError: When the file cannot be copied from the Nucleus Server to the local machine. This - can happen when the file already exists locally and :attr:`force_download` is set to False. + FileNotFoundError: If the path is neither local nor reachable remotely. + + Returns: + Absolute path to the resolved local file. 
""" - # check file status - file_status = check_file_path(path) - if file_status == 1: + status = check_file_path(path) + + # Local file + if status == 1: return os.path.abspath(path) - elif file_status == 2: - # resolve download directory + + # Remote file + if status == 2: if download_dir is None: download_dir = tempfile.gettempdir() - else: - download_dir = os.path.abspath(download_dir) - # create download directory if it does not exist - if not os.path.exists(download_dir): - os.makedirs(download_dir) - # download file in temp directory using os - file_name = os.path.basename(omni.client.break_url(path.replace(os.sep, "/")).path) + download_dir = os.path.abspath(download_dir) + os.makedirs(download_dir, exist_ok=True) + + url = path.replace(os.sep, "/") + + # USD → USD + dependencies + if _is_usd_path(url): + mapping = client.download_usd_with_references_sync( + root_url=url, + download_root=download_dir, + force_overwrite=force_download, + progress_callback=lambda done, total, src: logger.debug( + " [%s] %d / %s bytes", src, done, "?" if total is None else str(total) + ), + ) + local_root = mapping.get(client._normalize_url(url)) + if local_root is None: + key = urlparse(url).path.lstrip("/") + local_root = os.path.join(download_dir, key) + return os.path.abspath(local_root) + + # Non-USD → single file download + file_name = os.path.basename(client.break_url(url).path) target_path = os.path.join(download_dir, file_name) - # check if file already exists locally + if not os.path.isfile(target_path) or force_download: - # copy file to local machine - result = omni.client.copy(path.replace(os.sep, "/"), target_path, omni.client.CopyBehavior.OVERWRITE) - if result != omni.client.Result.OK and force_download: - raise RuntimeError(f"Unable to copy file: '{path}'. Is the Nucleus Server running?") + result = client.copy(url, target_path, client.CopyBehavior.OVERWRITE) + if result != client.Result.OK and force_download: + raise RuntimeError(f"Unable to copy file: '{path}' from cloud storage.") return os.path.abspath(target_path) - else: - raise FileNotFoundError(f"Unable to find the file: {path}") + + # Not found anywhere + raise FileNotFoundError(f"Unable to find the file: {path}") def read_file(path: str) -> io.BytesIO: - """Reads a file from the Nucleus Server or locally. + """Reads a file from cloud storage or locally. Args: path: The path to the file. Raises: - FileNotFoundError: When the file not found locally or on Nucleus Server. + FileNotFoundError: When the file not found locally or on cloud storage. Returns: The content of the file. @@ -129,7 +149,7 @@ def read_file(path: str) -> io.BytesIO: with open(path, "rb") as f: return io.BytesIO(f.read()) elif file_status == 2: - file_content = omni.client.read_file(path.replace(os.sep, "/"))[2] + file_content = client.read_file(path.replace(os.sep, "/"))[2] return io.BytesIO(memoryview(file_content).tobytes()) else: raise FileNotFoundError(f"Unable to find the file: {path}") @@ -200,8 +220,8 @@ async def _is_usd_path_available(usd_path: str, timeout: float) -> bool: Whether the given USD path is available on the server. 
""" try: - result, _ = await asyncio.wait_for(omni.client.stat_async(usd_path), timeout=timeout) - return result == omni.client.Result.OK + result, _ = await asyncio.wait_for(client.stat_async(usd_path), timeout=timeout) + return result == client.Result.OK except asyncio.TimeoutError: logger.warning(f"Timed out after {timeout}s while checking for USD: {usd_path}") return False diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py new file mode 100644 index 00000000000..8d8c4a0dba7 --- /dev/null +++ b/source/isaaclab/isaaclab/utils/client.py @@ -0,0 +1,622 @@ +# Copyright (c) 2022-2025, The Isaac Lab Project Developers (https://github.com/isaac-sim/IsaacLab/blob/main/CONTRIBUTORS.md). +# All rights reserved. +# +# SPDX-License-Identifier: BSD-3-Clause + + +"""Lightweight omni.client-like helpers for local/HTTP/S3 access. + +This module implements a subset of omni.client behaviors used inside Isaac Lab: +path normalization, stat/read helpers, USD reference resolution, and simple +copy/download utilities. It supports local files, HTTP(S), and S3 paths and +provides a small Result enum for consistent status reporting. +""" + +import asyncio +import logging +import os +import posixpath +from pathlib import Path +from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Set +from enum import IntEnum, Enum +from urllib.parse import urlparse +from urllib.request import Request, urlopen +from urllib.error import HTTPError, URLError + +from pxr import Sdf +import boto3 +from botocore.exceptions import ClientError + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Basic types +# --------------------------------------------------------------------------- + +OMNI_S3_HOST_PREFIX = "omniverse-content-production.s3-" + + +class Result(IntEnum): + OK = 0 + ERROR_NOT_FOUND = 1 + ERROR_PERMISSION_DENIED = 2 + ERROR_NETWORK = 3 + ERROR_UNKNOWN = 4 + + +class CopyBehavior(Enum): + OVERWRITE = "overwrite" + SKIP = "skip" + + +class UrlParts(NamedTuple): + scheme: str + authority: str + path: str + + +def break_url(url: str) -> UrlParts: + """Parse a URL into (scheme, authority, path) with empty parts when missing.""" + parsed = urlparse(url) + return UrlParts(parsed.scheme or "", parsed.netloc or "", parsed.path or "") + + +_s3 = boto3.client("s3") + + +# --------------------------------------------------------------------------- +# URL helpers +# --------------------------------------------------------------------------- + +def _is_s3_url(path: str) -> bool: + """Return True if the path uses the ``s3://`` scheme.""" + return path.startswith("s3://") + + +def _is_http_url(path: str) -> bool: + """Return True if the path uses HTTP or HTTPS.""" + scheme = urlparse(path).scheme.lower() + return scheme in ("http", "https") + + +def _is_local_path(path: str) -> bool: + """Return True if the path has no URL scheme (treated as local).""" + # Strong assumption: anything without a scheme is local + return urlparse(path).scheme == "" + + +def _split_s3_url(path: str) -> Tuple[str, str]: + """Split an S3 URL into ``(bucket, key)`` or raise on invalid input.""" + parsed = urlparse(path) + bucket = parsed.netloc + key = parsed.path.lstrip("/") + if not bucket or not key: + raise ValueError(f"Invalid S3 URL: {path}") + return bucket, key + + +def _normalize_url(url: str) -> str: + """Convert omniverse S3 URLs to HTTPS; leave others as-is.""" + if not _is_s3_url(url): + return url + parsed = 
urlparse(url) + if parsed.netloc.startswith(OMNI_S3_HOST_PREFIX): + return f"https://{parsed.netloc}{parsed.path}" + return url + + +def _map_remote_to_local(download_root: str, url: str) -> str: + """Mirror remote path structure under download_root.""" + parsed = urlparse(url) + key = parsed.path.lstrip("/") + local_path = os.path.join(download_root, key) + os.makedirs(os.path.dirname(local_path), exist_ok=True) + return local_path + + +def _resolve_reference_url(base_url: str, ref: str) -> str: + """Resolve a USD asset reference against a base URL (http/s3/local).""" + ref = ref.strip() + if not ref: + return ref + + parsed_ref = urlparse(ref) + if parsed_ref.scheme: + # Already absolute (http://, https://, s3://, etc.) + return ref + + base = urlparse(base_url) + if base.scheme == "": + # Local base + base_dir = os.path.dirname(base_url) + return os.path.normpath(os.path.join(base_dir, ref)) + + # Remote base + base_dir = posixpath.dirname(base.path) + if ref.startswith("/"): + new_path = posixpath.normpath(ref) + else: + new_path = posixpath.normpath(posixpath.join(base_dir, ref)) + return f"{base.scheme}://{base.netloc}{new_path}" + + +# --------------------------------------------------------------------------- +# stat / read_file +# --------------------------------------------------------------------------- + +def stat(path: str) -> Tuple[Result, Optional[Dict[str, Any]]]: + """Check whether a remote or local file exists and return basic metadata. + + Args: + path: Local path or remote URL (HTTP/S3). + + Returns: + Tuple of (:class:`Result`, info dict or None). On success, ``info`` may contain + ``size``, ``etag``, ``last_modified``, and ``content_type`` when available. + The :class:`Result` code is one of ``OK``, ``ERROR_NOT_FOUND``, + ``ERROR_PERMISSION_DENIED``, ``ERROR_NETWORK``, or ``ERROR_UNKNOWN``. 
+ """ + url = _normalize_url(path) + + # HTTP(S) + if _is_http_url(url): + try: + req = Request(url, method="HEAD") + with urlopen(req) as resp: + size_header = resp.headers.get("Content-Length") + info = { + "size": int(size_header) if size_header is not None else None, + "etag": resp.headers.get("ETag"), + "last_modified": resp.headers.get("Last-Modified"), + "content_type": resp.headers.get("Content-Type"), + } + return Result.OK, info + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND, None + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("HTTP error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + except URLError as exc: + logger.warning("Network error in stat(%s): %s", url, exc) + return Result.ERROR_NETWORK, None + except Exception as exc: + logger.warning("Unexpected error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + # S3 (non-omniverse) + if _is_s3_url(url): + bucket, key = _split_s3_url(url) + try: + response = _s3.head_object(Bucket=bucket, Key=key) + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND, None + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED, None + logger.warning("Error in stat(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, None + + info = { + "size": response.get("ContentLength"), + "etag": response.get("ETag"), + "last_modified": response.get("LastModified"), + "content_type": response.get("ContentType"), + } + return Result.OK, info + + # Local + if _is_local_path(url) and os.path.exists(url): + try: + size = os.path.getsize(url) + except OSError: + size = None + info = { + "size": size, + "etag": None, + "last_modified": None, + "content_type": None, + } + return Result.OK, info + + return Result.ERROR_NOT_FOUND, None + + +async def stat_async(path: str) -> Tuple[Result, Optional[Dict[str, Any]]]: + """Async wrapper for :func:`stat`.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, stat, path) + + +def read_file(path: str) -> Tuple[Result, Dict[str, Any], memoryview]: + """Read file content from HTTP(S), S3, or local.""" + url = _normalize_url(path) + + # HTTP(S) + if _is_http_url(url): + try: + with urlopen(url) as resp: + data_bytes = resp.read() + meta = { + "size": len(data_bytes), + "content_type": resp.headers.get("Content-Type"), + } + return Result.OK, meta, memoryview(data_bytes) + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED, {}, memoryview(b"") + logger.warning("HTTP error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + except URLError as exc: + logger.warning("Network error in read_file(%s): %s", url, exc) + return Result.ERROR_NETWORK, {}, memoryview(b"") + except Exception as exc: + logger.warning("Unexpected error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + + # S3 + if _is_s3_url(url): + bucket, key = _split_s3_url(url) + try: + obj = _s3.get_object(Bucket=bucket, Key=key) + data_bytes = obj["Body"].read() + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED, {}, 
memoryview(b"") + logger.warning("Error in read_file(%s): %s", url, exc) + return Result.ERROR_UNKNOWN, {}, memoryview(b"") + + meta = { + "size": len(data_bytes), + "content_type": obj.get("ContentType"), + } + return Result.OK, meta, memoryview(data_bytes) + + # Local + if _is_local_path(url) and os.path.isfile(url): + with open(url, "rb") as f: + data_bytes = f.read() + meta = {"size": len(data_bytes), "content_type": None} + return Result.OK, meta, memoryview(data_bytes) + + return Result.ERROR_NOT_FOUND, {}, memoryview(b"") + + +async def read_file_async(path: str) -> Tuple[Result, Dict[str, Any], memoryview]: + """Async wrapper for :func:`read_file`.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, read_file, path) + + +# --------------------------------------------------------------------------- +# copy +# --------------------------------------------------------------------------- + +def _report_progress( + bytes_amount: int, + src: str, + total_size: Optional[int], + cb: Optional[Callable[[int, Optional[int], str], None]], + *, + transferred_ref: list[int], +) -> None: + """Helper to accumulate transferred bytes and forward to a callback.""" + transferred_ref[0] += bytes_amount + if cb: + cb(transferred_ref[0], total_size, src) + + +def copy( + src: str, + dst: str, + behavior: CopyBehavior = CopyBehavior.OVERWRITE, + progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, + chunk_size: int = 8 * 1024 * 1024, +) -> Result: + """Copy between local and remote (HTTP/S3) locations. + + Supported directions: + * HTTP/HTTPS → local + * S3 → local + * Local → S3 + + Args: + src: Source path or URL. + dst: Destination path or URL. + behavior: Overwrite policy for local targets. + progress_callback: Optional ``cb(done_bytes, total_bytes_or_None, src)``. + chunk_size: Chunk size for streamed copies. + + Returns: + Result enum indicating success or failure reason. 
+ """ + src = _normalize_url(src) + + if os.path.exists(dst) and behavior == CopyBehavior.SKIP: + return Result.OK + + # HTTP(S) -> local + if _is_http_url(src) and _is_local_path(dst): + os.makedirs(os.path.dirname(dst) or ".", exist_ok=True) + try: + with urlopen(src) as resp: + size_header = resp.headers.get("Content-Length") + total_size = int(size_header) if size_header is not None else None + + transferred = 0 + with open(dst, "wb") as f: + while True: + chunk = resp.read(chunk_size) + if not chunk: + break + f.write(chunk) + transferred += len(chunk) + if progress_callback: + progress_callback(transferred, total_size, src) + return Result.OK + except HTTPError as exc: + if exc.code == 404: + return Result.ERROR_NOT_FOUND + if exc.code == 403: + return Result.ERROR_PERMISSION_DENIED + logger.warning("HTTP error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_UNKNOWN + except URLError as exc: + logger.warning("Network error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_NETWORK + except Exception as exc: + logger.warning("Unexpected error copying %s -> %s: %s", src, dst, exc) + return Result.ERROR_UNKNOWN + + # Local -> S3 + if _is_local_path(src) and _is_s3_url(dst): + bucket, key = _split_s3_url(dst) + try: + total_size = os.path.getsize(src) + transferred_ref = [0] + with open(src, "rb") as f: + _s3.upload_fileobj( + f, + Bucket=bucket, + Key=key, + Callback=lambda n: _report_progress( + n, + src, + total_size, + progress_callback, + transferred_ref=transferred_ref, + ), + ) + return Result.OK + except FileNotFoundError: + return Result.ERROR_NOT_FOUND + except ClientError as exc: + logger.warning("Error copying local->S3 (%s -> %s): %s", src, dst, exc) + return Result.ERROR_UNKNOWN + + # S3 -> local + if _is_s3_url(src) and _is_local_path(dst): + bucket, key = _split_s3_url(src) + os.makedirs(os.path.dirname(dst) or ".", exist_ok=True) + try: + head = _s3.head_object(Bucket=bucket, Key=key) + total_size = head.get("ContentLength") + obj = _s3.get_object(Bucket=bucket, Key=key) + body = obj["Body"] + + transferred = 0 + with open(dst, "wb") as f: + while True: + chunk = body.read(chunk_size) + if not chunk: + break + f.write(chunk) + transferred += len(chunk) + if progress_callback: + progress_callback(transferred, total_size, src) + return Result.OK + except ClientError as exc: + code = exc.response.get("Error", {}).get("Code", "") + if code in ("404", "NoSuchKey", "NotFound"): + return Result.ERROR_NOT_FOUND + if code in ("AccessDenied", "403"): + return Result.ERROR_PERMISSION_DENIED + logger.warning("Error copying S3->local (%s -> %s): %s", src, dst, exc) + return Result.ERROR_UNKNOWN + + logger.error("Copy combination not supported: %s -> %s", src, dst) + return Result.ERROR_UNKNOWN + + +async def copy_async( + src: str, + dst: str, + behavior: CopyBehavior = CopyBehavior.OVERWRITE, + progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, + chunk_size: int = 8 * 1024 * 1024, +) -> Result: + """Async wrapper for :func:`copy` with the same arguments.""" + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + None, + copy, + src, + dst, + behavior, + progress_callback, + chunk_size, + ) + + +# --------------------------------------------------------------------------- +# USD dependency resolution +# --------------------------------------------------------------------------- + +USD_EXTENSIONS = {".usd", ".usda", ".usdc"} + + +_DOWNLOADABLE_EXTS = { + ".usd", ".usda", ".usdz", + ".png", ".jpg", ".jpeg", + ".exr", 
".hdr", ".tif", ".tiff", +} + + +def _is_usd_file(path: str) -> bool: + """Return True if path ends with a USD extension.""" + return Path(path).suffix.lower() in USD_EXTENSIONS + + +def _is_downloadable_asset(path: str) -> bool: + """Return True for USD or other asset types we mirror locally (textures, etc.).""" + clean = path.split("?", 1)[0].split("#", 1)[0] + suffix = Path(clean).suffix.lower() + + if suffix == ".mdl": + # MDL modules (OmniPBR.mdl, OmniSurface.mdl, ...) come from MDL search paths + return False + if not suffix: + return False + if suffix not in _DOWNLOADABLE_EXTS: + return False + return True + + +def _find_usd_references(local_usd_path: str) -> Set[str]: + """Use Sdf API to collect referenced assets from a USD layer.""" + try: + layer = Sdf.Layer.FindOrOpen(local_usd_path) + except Exception: + logger.warning("Failed to open USD layer: %s", local_usd_path, exc_info=True) + return set() + + if layer is None: + return set() + + refs: Set[str] = set() + + # Sublayers + for sub_path in getattr(layer, "subLayerPaths", []) or []: + if sub_path and _is_downloadable_asset(sub_path): + refs.add(str(sub_path)) + + def _walk_prim(prim_spec: Sdf.PrimSpec) -> None: + # References + ref_list = prim_spec.referenceList + for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): + items = getattr(ref_list, field, None) + if not items: + continue + for ref in items: + asset_path = getattr(ref, "assetPath", None) + if asset_path and _is_downloadable_asset(asset_path): + refs.add(str(asset_path)) + + # Payloads + payload_list = prim_spec.payloadList + for field in ("addedItems", "prependedItems", "appendedItems", "explicitItems"): + items = getattr(payload_list, field, None) + if not items: + continue + for payload in items: + asset_path = getattr(payload, "assetPath", None) + if asset_path and _is_downloadable_asset(asset_path): + refs.add(str(asset_path)) + + # AssetPath-valued attributes (this is where OmniPBR.mdl, textures, etc. show up) + for attr_spec in prim_spec.attributes.values(): + default = attr_spec.default + if isinstance(default, Sdf.AssetPath): + if default.path and _is_downloadable_asset(default.path): + refs.add(default.path) + elif isinstance(default, Sdf.AssetPathArray): + for ap in default: + if ap.path and _is_downloadable_asset(ap.path): + refs.add(ap.path) + + for child in prim_spec.nameChildren.values(): + _walk_prim(child) + + for root_prim in layer.rootPrims.values(): + _walk_prim(root_prim) + + return refs + + +async def download_usd_with_references( + root_usd_s3_url: str, + download_root: str, + force_overwrite: bool = True, + progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, +) -> Dict[str, str]: + """Download a USD and all referenced assets to a local mirror. + + Traverses the USD dependency graph, downloading each referenced asset (USD, textures, etc.) + into ``download_root`` while preserving the relative directory structure. Returns a mapping + from normalized remote URLs to their local file paths. + + Args: + root_usd_s3_url: Root USD URL (S3/HTTP). + download_root: Local root directory to mirror into. + force_overwrite: If True, overwrite existing files; otherwise skip. + progress_callback: Optional ``cb(done_bytes, total_bytes_or_None, src)``. + + Returns: + Dict mapping normalized remote URLs to local paths. 
+ """ + os.makedirs(download_root, exist_ok=True) + + root_url = _normalize_url(root_usd_s3_url) + to_visit = [root_url] + visited: Set[str] = set() + mapping: Dict[str, str] = {} + + while to_visit: + current_url = _normalize_url(to_visit.pop()) + if current_url in visited: + continue + visited.add(current_url) + + local_path = _map_remote_to_local(download_root, current_url) + mapping[current_url] = local_path + + behavior = CopyBehavior.OVERWRITE if force_overwrite else CopyBehavior.SKIP + logger.info("Downloading asset %s -> %s", current_url, local_path) + res = await copy_async(current_url, local_path, behavior=behavior, progress_callback=progress_callback) + if res != Result.OK: + logger.warning("Failed to download %s (Result=%s)", current_url, res) + continue + + if _is_usd_file(local_path): + for ref in _find_usd_references(local_path): + dep_url = _resolve_reference_url(current_url, ref) + dep_url = _normalize_url(dep_url) + if dep_url and dep_url not in visited: + to_visit.append(dep_url) + + return mapping + + +def download_usd_with_references_sync( + root_url: str, + download_root: str, + force_overwrite: bool = True, + progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, +) -> Dict[str, str]: + """Synchronous wrapper for :func:`download_usd_with_references`.""" + loop = asyncio.get_event_loop() + return loop.run_until_complete( + download_usd_with_references( + root_url, + download_root, + force_overwrite=force_overwrite, + progress_callback=progress_callback, + ) + ) From ad9873fbcfb50cf7a88d215652858dd0ea7112c7 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:22:34 -0800 Subject: [PATCH 02/11] pass precommit --- source/isaaclab/isaaclab/utils/client.py | 59 ++++++++++++++---------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 8d8c4a0dba7..81be1bdbf9d 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -16,16 +16,17 @@ import logging import os import posixpath +from enum import Enum, IntEnum from pathlib import Path -from typing import Any, Callable, Dict, NamedTuple, Optional, Tuple, Set -from enum import IntEnum, Enum +from typing import Any, NamedTuple +from collections.abc import Callable +from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen -from urllib.error import HTTPError, URLError -from pxr import Sdf import boto3 from botocore.exceptions import ClientError +from pxr import Sdf logger = logging.getLogger(__name__) @@ -68,6 +69,7 @@ def break_url(url: str) -> UrlParts: # URL helpers # --------------------------------------------------------------------------- + def _is_s3_url(path: str) -> bool: """Return True if the path uses the ``s3://`` scheme.""" return path.startswith("s3://") @@ -85,7 +87,7 @@ def _is_local_path(path: str) -> bool: return urlparse(path).scheme == "" -def _split_s3_url(path: str) -> Tuple[str, str]: +def _split_s3_url(path: str) -> tuple[str, str]: """Split an S3 URL into ``(bucket, key)`` or raise on invalid input.""" parsed = urlparse(path) bucket = parsed.netloc @@ -144,7 +146,8 @@ def _resolve_reference_url(base_url: str, ref: str) -> str: # stat / read_file # --------------------------------------------------------------------------- -def stat(path: str) -> Tuple[Result, Optional[Dict[str, Any]]]: + +def stat(path: str) -> tuple[Result, dict[str, Any] | None]: """Check 
whether a remote or local file exists and return basic metadata. Args: @@ -224,13 +227,13 @@ def stat(path: str) -> Tuple[Result, Optional[Dict[str, Any]]]: return Result.ERROR_NOT_FOUND, None -async def stat_async(path: str) -> Tuple[Result, Optional[Dict[str, Any]]]: +async def stat_async(path: str) -> tuple[Result, dict[str, Any] | None]: """Async wrapper for :func:`stat`.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, stat, path) -def read_file(path: str) -> Tuple[Result, Dict[str, Any], memoryview]: +def read_file(path: str) -> tuple[Result, dict[str, Any], memoryview]: """Read file content from HTTP(S), S3, or local.""" url = _normalize_url(path) @@ -289,7 +292,7 @@ def read_file(path: str) -> Tuple[Result, Dict[str, Any], memoryview]: return Result.ERROR_NOT_FOUND, {}, memoryview(b"") -async def read_file_async(path: str) -> Tuple[Result, Dict[str, Any], memoryview]: +async def read_file_async(path: str) -> tuple[Result, dict[str, Any], memoryview]: """Async wrapper for :func:`read_file`.""" loop = asyncio.get_running_loop() return await loop.run_in_executor(None, read_file, path) @@ -299,11 +302,12 @@ async def read_file_async(path: str) -> Tuple[Result, Dict[str, Any], memoryview # copy # --------------------------------------------------------------------------- + def _report_progress( bytes_amount: int, src: str, - total_size: Optional[int], - cb: Optional[Callable[[int, Optional[int], str], None]], + total_size: int | None, + cb: Callable[[int, int | None, str], None] | None, *, transferred_ref: list[int], ) -> None: @@ -317,7 +321,7 @@ def copy( src: str, dst: str, behavior: CopyBehavior = CopyBehavior.OVERWRITE, - progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, + progress_callback: Callable[[int, int | None, str], None] | None = None, chunk_size: int = 8 * 1024 * 1024, ) -> Result: """Copy between local and remote (HTTP/S3) locations. @@ -439,7 +443,7 @@ async def copy_async( src: str, dst: str, behavior: CopyBehavior = CopyBehavior.OVERWRITE, - progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, + progress_callback: Callable[[int, int | None, str], None] | None = None, chunk_size: int = 8 * 1024 * 1024, ) -> Result: """Async wrapper for :func:`copy` with the same arguments.""" @@ -463,9 +467,16 @@ async def copy_async( _DOWNLOADABLE_EXTS = { - ".usd", ".usda", ".usdz", - ".png", ".jpg", ".jpeg", - ".exr", ".hdr", ".tif", ".tiff", + ".usd", + ".usda", + ".usdz", + ".png", + ".jpg", + ".jpeg", + ".exr", + ".hdr", + ".tif", + ".tiff", } @@ -489,7 +500,7 @@ def _is_downloadable_asset(path: str) -> bool: return True -def _find_usd_references(local_usd_path: str) -> Set[str]: +def _find_usd_references(local_usd_path: str) -> set[str]: """Use Sdf API to collect referenced assets from a USD layer.""" try: layer = Sdf.Layer.FindOrOpen(local_usd_path) @@ -500,7 +511,7 @@ def _find_usd_references(local_usd_path: str) -> Set[str]: if layer is None: return set() - refs: Set[str] = set() + refs: set[str] = set() # Sublayers for sub_path in getattr(layer, "subLayerPaths", []) or []: @@ -554,8 +565,8 @@ async def download_usd_with_references( root_usd_s3_url: str, download_root: str, force_overwrite: bool = True, - progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, -) -> Dict[str, str]: + progress_callback: Callable[[int, int | None, str], None] | None = None, +) -> dict[str, str]: """Download a USD and all referenced assets to a local mirror. 
Traverses the USD dependency graph, downloading each referenced asset (USD, textures, etc.) @@ -575,8 +586,8 @@ async def download_usd_with_references( root_url = _normalize_url(root_usd_s3_url) to_visit = [root_url] - visited: Set[str] = set() - mapping: Dict[str, str] = {} + visited: set[str] = set() + mapping: dict[str, str] = {} while to_visit: current_url = _normalize_url(to_visit.pop()) @@ -608,8 +619,8 @@ def download_usd_with_references_sync( root_url: str, download_root: str, force_overwrite: bool = True, - progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, -) -> Dict[str, str]: + progress_callback: Callable[[int, int | None, str], None] | None = None, +) -> dict[str, str]: """Synchronous wrapper for :func:`download_usd_with_references`.""" loop = asyncio.get_event_loop() return loop.run_until_complete( From f2dc3e73c358696e6c12ec3154ee0eeeff151214 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:24:11 -0800 Subject: [PATCH 03/11] remove path from local to s3 --- source/isaaclab/isaaclab/utils/client.py | 39 ++---------------------- 1 file changed, 2 insertions(+), 37 deletions(-) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 81be1bdbf9d..cd6f0f97e5a 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -16,10 +16,10 @@ import logging import os import posixpath +from collections.abc import Callable from enum import Enum, IntEnum from pathlib import Path from typing import Any, NamedTuple -from collections.abc import Callable from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen @@ -329,7 +329,6 @@ def copy( Supported directions: * HTTP/HTTPS → local * S3 → local - * Local → S3 Args: src: Source path or URL. 
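With the local-to-S3 direction removed, `copy` only ever moves data toward the local machine. Below is a minimal sketch of the remaining call shape, assuming the module is imported as `isaaclab.utils.client`; the source URL is illustrative only.

```python
# Sketch: remote -> local copy with progress reporting.
# The source URL is illustrative only.
from isaaclab.utils import client

def report(done: int, total: int | None, src: str) -> None:
    # `total` is None when the server sends no Content-Length header.
    print(f"{src}: {done}/{total if total is not None else '?'} bytes")

res = client.copy(
    "https://example.com/textures/wood.png",  # illustrative URL
    "/tmp/wood.png",
    behavior=client.CopyBehavior.OVERWRITE,
    progress_callback=report,
)
if res != client.Result.OK:
    raise RuntimeError(f"copy failed: {res.name}")
```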
@@ -379,32 +378,6 @@ def copy( logger.warning("Unexpected error copying %s -> %s: %s", src, dst, exc) return Result.ERROR_UNKNOWN - # Local -> S3 - if _is_local_path(src) and _is_s3_url(dst): - bucket, key = _split_s3_url(dst) - try: - total_size = os.path.getsize(src) - transferred_ref = [0] - with open(src, "rb") as f: - _s3.upload_fileobj( - f, - Bucket=bucket, - Key=key, - Callback=lambda n: _report_progress( - n, - src, - total_size, - progress_callback, - transferred_ref=transferred_ref, - ), - ) - return Result.OK - except FileNotFoundError: - return Result.ERROR_NOT_FOUND - except ClientError as exc: - logger.warning("Error copying local->S3 (%s -> %s): %s", src, dst, exc) - return Result.ERROR_UNKNOWN - # S3 -> local if _is_s3_url(src) and _is_local_path(dst): bucket, key = _split_s3_url(src) @@ -448,15 +421,7 @@ async def copy_async( ) -> Result: """Async wrapper for :func:`copy` with the same arguments.""" loop = asyncio.get_running_loop() - return await loop.run_in_executor( - None, - copy, - src, - dst, - behavior, - progress_callback, - chunk_size, - ) + return await loop.run_in_executor(None, copy, src, dst, behavior, progress_callback, chunk_size) # --------------------------------------------------------------------------- From 3ab6718cb87114e447ce421d5a55d4302a36cd34 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:27:12 -0800 Subject: [PATCH 04/11] add boto3 in docs mock out modules --- docs/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/conf.py b/docs/conf.py index 00d7af5ae59..de38fae615f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -141,6 +141,7 @@ # Mock out modules that are not available on RTD autodoc_mock_imports = [ + "boto3", "torch", "torchvision", "numpy", From 4028970ec9841290aeecae5496fb0860574de373 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:29:29 -0800 Subject: [PATCH 05/11] add botocore to conf.py --- docs/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/conf.py b/docs/conf.py index de38fae615f..759d2fb2fe6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -142,6 +142,7 @@ # Mock out modules that are not available on RTD autodoc_mock_imports = [ "boto3", + "botocore", "torch", "torchvision", "numpy", From 787665a25a065b9ef0773690720122bd5083c31d Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:36:00 -0800 Subject: [PATCH 06/11] address deprecated get_event_loop and update install dependencies --- source/isaaclab/isaaclab/utils/client.py | 15 +++++++-------- source/isaaclab/setup.py | 3 +++ 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index cd6f0f97e5a..309b83ce04a 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -587,12 +587,11 @@ def download_usd_with_references_sync( progress_callback: Callable[[int, int | None, str], None] | None = None, ) -> dict[str, str]: """Synchronous wrapper for :func:`download_usd_with_references`.""" - loop = asyncio.get_event_loop() - return loop.run_until_complete( - download_usd_with_references( - root_url, - download_root, - force_overwrite=force_overwrite, - progress_callback=progress_callback, + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete( + download_usd_with_references(root_url, download_root, force_overwrite, progress_callback=progress_callback) ) - ) + finally: + loop.close() diff --git a/source/isaaclab/setup.py 
b/source/isaaclab/setup.py index 75fe5b9a3e7..6c436272fb7 100644 --- a/source/isaaclab/setup.py +++ b/source/isaaclab/setup.py @@ -23,6 +23,9 @@ "onnx>=1.18.0", # 1.16.2 throws access violation on Windows "prettytable==3.3.0", "toml", + # asset management + "boto3", + "botocore", # devices "hidapi==0.14.0.post2", # reinforcement learning From 2fdd7d3452e2bf83fb302807dc6d76df5655c263 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:36:53 -0800 Subject: [PATCH 07/11] delete unused method --- source/isaaclab/isaaclab/utils/client.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 309b83ce04a..0faee22aa75 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -302,21 +302,6 @@ async def read_file_async(path: str) -> tuple[Result, dict[str, Any], memoryview # copy # --------------------------------------------------------------------------- - -def _report_progress( - bytes_amount: int, - src: str, - total_size: int | None, - cb: Callable[[int, int | None, str], None] | None, - *, - transferred_ref: list[int], -) -> None: - """Helper to accumulate transferred bytes and forward to a callback.""" - transferred_ref[0] += bytes_amount - if cb: - cb(transferred_ref[0], total_size, src) - - def copy( src: str, dst: str, From 2933d36cf3a2abb2f95b88fe2933cecf7f4d72c9 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 17:37:22 -0800 Subject: [PATCH 08/11] pass precommit --- source/isaaclab/isaaclab/utils/client.py | 1 + 1 file changed, 1 insertion(+) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 0faee22aa75..845b28190b3 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -302,6 +302,7 @@ async def read_file_async(path: str) -> tuple[Result, dict[str, Any], memoryview # copy # --------------------------------------------------------------------------- + def copy( src: str, dst: str, From 93107214050f65eec61ce3c9e51dc689c139f73a Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 18:06:20 -0800 Subject: [PATCH 09/11] update from file to use our own client --- .../sim/spawners/from_files/from_files.py | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py index 79b5e5a0031..f0a7b14b217 100644 --- a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py +++ b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py @@ -19,9 +19,9 @@ from pxr import Semantics from isaaclab.sim import converters, schemas -from isaaclab.sim.utils import bind_physics_material, bind_visual_material, clone, select_usd_variants +from isaaclab.sim.utils import bind_physics_material, bind_visual_material, clone, select_usd_variants, from isaaclab.sim.utils.stage import get_current_stage, is_current_stage_in_memory -from isaaclab.utils.assets import check_usd_path_with_timeout +from isaaclab.utils.assets import check_file_path, retrieve_file_path if TYPE_CHECKING: from . import from_files_cfg @@ -257,16 +257,16 @@ def _spawn_from_usd_file( Raises: FileNotFoundError: If the USD file does not exist at the given path. 
""" - # check if usd path exists with periodic logging until timeout - if not check_usd_path_with_timeout(usd_path): - if "4.5" in usd_path: - usd_5_0_path = usd_path.replace("http", "https").replace("/4.5", "/5.0") - if not check_usd_path_with_timeout(usd_5_0_path): - raise FileNotFoundError(f"USD file not found at path at either: '{usd_path}' or '{usd_5_0_path}'.") - usd_path = usd_5_0_path - else: - raise FileNotFoundError(f"USD file not found at path at: '{usd_path}'.") - + # check file path exists (supports local paths, S3, HTTP/HTTPS URLs) + # check_file_path returns: 0 (not found), 1 (local), 2 (remote) + file_status = check_file_path(usd_path) + if file_status == 0: + raise FileNotFoundError(f"USD file not found at path: '{usd_path}'.") + + # Download remote files (S3, HTTP, HTTPS) to local cache + # This also downloads all USD dependencies to maintain references + if file_status == 2: + usd_path = retrieve_file_path(usd_path) # spawn asset if it doesn't exist. if not prim_utils.is_prim_path_valid(prim_path): # add prim as reference to stage From 1a09ffbc14fbe4c9d7d3e3ea22dd28be799d08ad Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 18:33:36 -0800 Subject: [PATCH 10/11] fixes download_usd_with_references_sync --- source/isaaclab/isaaclab/utils/client.py | 34 +++++++++++++++++------- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 845b28190b3..6e0b20704c0 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -19,7 +19,7 @@ from collections.abc import Callable from enum import Enum, IntEnum from pathlib import Path -from typing import Any, NamedTuple +from typing import Any, NamedTuple, Optional from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen @@ -570,14 +570,30 @@ def download_usd_with_references_sync( root_url: str, download_root: str, force_overwrite: bool = True, - progress_callback: Callable[[int, int | None, str], None] | None = None, + progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, ) -> dict[str, str]: - """Synchronous wrapper for :func:`download_usd_with_references`.""" - loop = asyncio.new_event_loop() + """Synchronous wrapper for :func:`download_usd_with_references`. Safe for IsaacLab scripts + + NOT safe to call from inside a running event loop (e.g. Isaac Sim / Kit). + In that case, call `await download_usd_with_references(...)` directly. + """ + # If there's a running loop (Kit / Jupyter / etc.), don't try to block it. try: - asyncio.set_event_loop(loop) - return loop.run_until_complete( - download_usd_with_references(root_url, download_root, force_overwrite, progress_callback=progress_callback) + asyncio.get_running_loop() + except RuntimeError: + # No running loop → safe to own one; asyncio.run handles creation/cleanup. + return asyncio.run( + download_usd_with_references( + root_url, + download_root, + force_overwrite=force_overwrite, + progress_callback=progress_callback, + ) ) - finally: - loop.close() + else: + # Already inside an event loop: this wrapper must not be used. + raise RuntimeError( + "download_usd_with_references_sync() was called while an event loop is running.\n" + "Use `await download_usd_with_references(...)` or schedule it with " + "`asyncio.create_task` instead of calling the sync wrapper." 
+ ) \ No newline at end of file From 1bb854b6c380da2ac386b4403553756c5771f8d9 Mon Sep 17 00:00:00 2001 From: Octi Zhang Date: Tue, 2 Dec 2025 18:38:19 -0800 Subject: [PATCH 11/11] pass precommit --- .../isaaclab/isaaclab/sim/spawners/from_files/from_files.py | 2 +- source/isaaclab/isaaclab/utils/client.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py index f0a7b14b217..e4c1c287524 100644 --- a/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py +++ b/source/isaaclab/isaaclab/sim/spawners/from_files/from_files.py @@ -19,7 +19,7 @@ from pxr import Semantics from isaaclab.sim import converters, schemas -from isaaclab.sim.utils import bind_physics_material, bind_visual_material, clone, select_usd_variants, +from isaaclab.sim.utils import bind_physics_material, bind_visual_material, clone, select_usd_variants from isaaclab.sim.utils.stage import get_current_stage, is_current_stage_in_memory from isaaclab.utils.assets import check_file_path, retrieve_file_path diff --git a/source/isaaclab/isaaclab/utils/client.py b/source/isaaclab/isaaclab/utils/client.py index 6e0b20704c0..a4188254de5 100644 --- a/source/isaaclab/isaaclab/utils/client.py +++ b/source/isaaclab/isaaclab/utils/client.py @@ -19,7 +19,7 @@ from collections.abc import Callable from enum import Enum, IntEnum from pathlib import Path -from typing import Any, NamedTuple, Optional +from typing import Any, NamedTuple from urllib.error import HTTPError, URLError from urllib.parse import urlparse from urllib.request import Request, urlopen @@ -570,7 +570,7 @@ def download_usd_with_references_sync( root_url: str, download_root: str, force_overwrite: bool = True, - progress_callback: Optional[Callable[[int, Optional[int], str], None]] = None, + progress_callback: Callable[[int, int | None, str], None] | None = None, ) -> dict[str, str]: """Synchronous wrapper for :func:`download_usd_with_references`. Safe for IsaacLab scripts @@ -596,4 +596,4 @@ def download_usd_with_references_sync( "download_usd_with_references_sync() was called while an event loop is running.\n" "Use `await download_usd_with_references(...)` or schedule it with " "`asyncio.create_task` instead of calling the sync wrapper." - ) \ No newline at end of file + )
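The series ends with two entry points for mirroring a USD scene: the synchronous wrapper for plain scripts, and the coroutine for code already running under an event loop (e.g. inside Kit), where the wrapper now raises `RuntimeError`. A minimal sketch under those assumptions follows; the relative asset path is hypothetical, and `_normalize_url` is the same private helper that `assets.retrieve_file_path` uses to key the returned mapping.

```python
# Sketch of the final API in this series. The relative asset path is
# hypothetical; only ISAAC_NUCLEUS_DIR comes from isaaclab.utils.assets.
from isaaclab.utils import client
from isaaclab.utils.assets import ISAAC_NUCLEUS_DIR

url = f"{ISAAC_NUCLEUS_DIR}/Robots/ExampleRobot/example_robot.usd"  # hypothetical

# Plain script (no running event loop): the sync wrapper owns its own loop.
mapping = client.download_usd_with_references_sync(url, "/tmp/usd_mirror")
# Keys are normalized URLs, mirroring how assets.retrieve_file_path looks up
# the local root USD via the private helper.
local_root = mapping[client._normalize_url(url)]

# Inside Isaac Sim / Kit (loop already running): await the coroutine instead,
# since the sync wrapper raises RuntimeError in that case.
async def fetch() -> dict[str, str]:
    return await client.download_usd_with_references(url, "/tmp/usd_mirror")
```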