# syntax=docker/dockerfile:1.7

# Worker image for the LUPINE PyTorch sidecar.
#
# NOTE: the `# syntax=` parser directive above must stay on the very first
# line. Docker only honours parser directives that appear before any other
# comment, blank line, or instruction; anywhere else it is a plain comment.
# The runtime stage below uses `RUN --mount=type=cache`, which depends on the
# BuildKit frontend selected by that directive.

# Global build arguments; each stage re-declares the ones it needs.
ARG DEBIAN_FRONTEND=noninteractive
ARG UBUNTU_VERSION=24.04
ARG CUDA_VERSION=13.1.0

# ---------------------------------------------------------------------------
# Build stage: compile the lupine_driver and lupine_nvml targets against the
# CUDA devel image and stage them as libcuda.so.1 / libnvidia-ml.so.1.
# ---------------------------------------------------------------------------
FROM nvidia/cuda:${CUDA_VERSION}-devel-ubuntu${UBUNTU_VERSION} AS lupine-build

ARG DEBIAN_FRONTEND

RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        build-essential \
        ca-certificates \
        cmake \
        g++ \
        gcc \
        libnghttp2-dev \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /opt/lupine-src
COPY . /opt/lupine-src

# CMAKE_LIBRARY_PATH points at the CUDA stub libraries shipped in the devel image.
RUN cmake -S /opt/lupine-src -B /opt/lupine-src/build \
        -DCMAKE_BUILD_TYPE=Release \
        -DCMAKE_LIBRARY_PATH=/usr/local/cuda/lib64/stubs \
    && cmake --build /opt/lupine-src/build --parallel --target lupine_driver lupine_nvml \
    && mkdir -p /opt/lupine/lib \
    && cp /opt/lupine-src/build/libcuda.so.1 /opt/lupine/lib/libcuda.so.1 \
    && cp /opt/lupine-src/build/libnvidia-ml.so.1 /opt/lupine/lib/libnvidia-ml.so.1 \
    && ln -sf libcuda.so.1 /opt/lupine/lib/libcuda.so \
    && ln -sf libnvidia-ml.so.1 /opt/lupine/lib/libnvidia-ml.so

# ---------------------------------------------------------------------------
# Runtime stage: plain Ubuntu with Python, the cu130 PyTorch wheel, and the
# libraries staged by the build stage.
# ---------------------------------------------------------------------------
FROM ubuntu:${UBUNTU_VERSION}

ARG DEBIAN_FRONTEND

RUN apt-get update \
    && apt-get install -y --no-install-recommends \
        ca-certificates \
        libnghttp2-14 \
        python3 \
        python3-pip \
    && rm -rf /var/lib/apt/lists/*

# The cache mount keeps pip's download cache out of the image layers.
RUN --mount=type=cache,target=/root/.cache/pip \
    pip3 install --break-system-packages \
        --index-url https://download.pytorch.org/whl/cu130 \
        torch

COPY --from=lupine-build /opt/lupine/lib /opt/lupine/lib

ENV LUPINE_LIBCUDA=/opt/lupine/lib/libcuda.so.1
ENV LUPINE_LIB=/opt/lupine/lib/libcuda.so.1
# Append any pre-existing value only when it is non-empty. A bare trailing ':'
# would add an empty entry, which the dynamic loader treats as the current
# working directory.
ENV LD_LIBRARY_PATH=/opt/lupine/lib${LD_LIBRARY_PATH:+:${LD_LIBRARY_PATH}}
def _dtype_from_name(name: str) -> Any:
    """Resolve a bare dtype name such as ``"float32"`` to a ``torch.dtype``.

    The name is looked up as an attribute of the ``torch`` module. Names that
    are missing, or that resolve to something other than a ``torch.dtype``
    (for example a function such as ``torch.load``), are rejected instead of
    being handed back to the caller as if they were a dtype.

    Args:
        name: dtype name without the ``torch.`` prefix.

    Returns:
        The matching ``torch.dtype`` object.

    Raises:
        SidecarError: if ``name`` does not identify a ``torch.dtype``.
    """
    dtype = getattr(torch, name, None) if isinstance(name, str) else None
    if not isinstance(dtype, torch.dtype):
        raise SidecarError(f"unknown torch dtype name: {name!r}")
    return dtype
def _session_from(value: Any) -> "SidecarSession | None":
    """Find the session behind the first ``SidecarTensor`` nested in *value*.

    A ``SidecarTensor`` yields its ``_lupine_session`` directly. Mappings are
    searched through their values, and sequences (other than ``str``,
    ``bytes`` and ``bytearray``) through their items, depth-first. ``None``
    is returned when no sidecar tensor is found.
    """
    if isinstance(value, SidecarTensor):
        return value._lupine_session

    if isinstance(value, Mapping):
        hit = next(
            (found for found in map(_session_from, value.values()) if found is not None),
            None,
        )
        if hit is not None:
            return hit

    # Text-like sequences are leaves: iterating them would never end on a tensor.
    is_textual = isinstance(value, (str, bytes, bytearray))
    if isinstance(value, Sequence) and not is_textual:
        return next(
            (found for found in map(_session_from, value) if found is not None),
            None,
        )
    return None
def _system_running(output: str) -> bool:
    """Decide whether ``container system status`` output reports "running".

    The output is parsed as JSON first. A JSON object is considered running
    when its ``"status"`` field equals ``"running"``. Anything else falls
    back to a case-insensitive substring search for ``"running"`` in the raw
    text. That covers both output that is not JSON at all and valid JSON that
    is not an object (``[]``, ``null``, a bare string), which previously
    raised ``AttributeError`` on ``.get``.

    Args:
        output: captured stdout of the status command.

    Returns:
        ``True`` if the output indicates a running system, else ``False``.
    """
    try:
        payload = json.loads(output)
    except json.JSONDecodeError:
        payload = None
    if isinstance(payload, dict):
        return payload.get("status") == "running"
    # Not a JSON object: use the plain-text heuristic on the raw output.
    return "running" in output.lower()
# Source of the worker process started inside the container (passed to
# `python3 -u -c`). It speaks a line-oriented JSON protocol on stdin/stdout:
# one request object per line, one response object per line. Supported ops are
# "ping", "call" (run an aten overload) and "release" (drop stored tensors).
# CUDA tensors stay in the worker's `objects` table and are referred to by
# integer handle; non-CUDA tensors are returned inline as nested lists.
#
# Differences from the earlier revision of this script:
#   * The response is serialised inside the `try`, so a result that
#     `json.dumps` cannot encode (e.g. complex values) produces an
#     `{"ok": false, ...}` reply instead of terminating the worker loop.
#   * `torch.Size` items are passed through `encode` like any other tuple,
#     because the client-side decoder expects every item to be a typed
#     envelope and rejects raw ints.
_WORKER = r"""
import json
import sys
import traceback

import torch

objects = {}
next_handle = 1


def store(tensor):
    global next_handle
    if tensor.device.type != "cuda":
        return {
            "type": "tensor_data",
            "shape": list(tensor.shape),
            "dtype": str(tensor.dtype).removeprefix("torch."),
            "data": tensor.tolist(),
        }
    handle = next_handle
    next_handle += 1
    objects[handle] = tensor
    return {
        "type": "tensor",
        "handle": handle,
        "shape": list(tensor.shape),
        "dtype": str(tensor.dtype).removeprefix("torch."),
    }


def decode(value):
    if isinstance(value, list):
        return [decode(item) for item in value]
    if isinstance(value, dict) and "__tuple__" in value:
        return tuple(decode(item) for item in value["__tuple__"])
    if isinstance(value, dict) and "__sidecar_tensor__" in value:
        return objects[int(value["__sidecar_tensor__"])]
    if isinstance(value, dict) and "__dtype__" in value:
        return getattr(torch, value["__dtype__"])
    if isinstance(value, dict) and "__device__" in value:
        return torch.device(value["__device__"])
    if isinstance(value, dict) and "__layout__" in value:
        return getattr(torch, value["__layout__"])
    if isinstance(value, dict) and "__memory_format__" in value:
        return getattr(torch, value["__memory_format__"])
    if isinstance(value, dict):
        return {key: decode(item) for key, item in value.items()}
    return value


def encode(value):
    if isinstance(value, torch.Tensor):
        return store(value)
    if isinstance(value, torch.Size):
        return {"type": "tuple", "items": [encode(item) for item in value]}
    if isinstance(value, tuple):
        return {"type": "tuple", "items": [encode(item) for item in value]}
    if isinstance(value, list):
        return {"type": "list", "items": [encode(item) for item in value]}
    if isinstance(value, dict):
        return {"type": "dict", "items": {key: encode(item) for key, item in value.items()}}
    return {"type": "value", "value": value}


def resolve(packet, overload):
    overload_packet = getattr(torch.ops.aten, packet)
    if overload == "default":
        return overload_packet.default
    return getattr(overload_packet, overload)


def release(value):
    if isinstance(value, dict) and "__sidecar_tensor__" in value:
        objects.pop(int(value["__sidecar_tensor__"]), None)
        return
    if isinstance(value, list):
        for item in value:
            release(item)
        return
    if isinstance(value, dict):
        for item in value.values():
            release(item)


def handle(request):
    op = request["op"]
    if op == "ping":
        return {
            "torch": torch.__version__,
            "cuda_available": torch.cuda.is_available(),
            "device_count": torch.cuda.device_count(),
            "gpu": torch.cuda.get_device_name(0) if torch.cuda.is_available() else None,
        }
    if op == "call":
        func = resolve(request["packet"], request["overload"])
        args = decode(request.get("args", []))
        kwargs = decode(request.get("kwargs", {}))
        return encode(func(*args, **kwargs))
    if op == "release":
        release(request["value"])
        return True
    raise RuntimeError(f"unknown op: {op}")


for line in sys.stdin:
    try:
        reply = json.dumps({"ok": True, "result": handle(json.loads(line))})
    except Exception as exc:
        reply = json.dumps(
            {
                "ok": False,
                "error": str(exc),
                "traceback": traceback.format_exc(),
            }
        )
    print(reply, flush=True)
"""
def __enter__(self) -> "SidecarSession":
    """Start the worker container and make this the active sidecar session.

    Steps, in order: register the ``lupine`` backend, prepare the container
    runtime (services running, image present), spawn the worker process,
    handshake with a ``ping`` request, then install the dispatch mode and an
    ``atexit`` hook that closes the session.

    If anything fails after the worker process has been spawned (handshake
    error, no CUDA device, dispatch-mode failure), ``close()`` is called
    before the exception propagates so the container process is not left
    running and ``_ACTIVE_SESSION`` is not left pointing at a dead session.

    Returns:
        This session.

    Raises:
        SidecarError: if another session is active, the runtime is not
            supported, the container runtime cannot be prepared, or the
            worker reports no CUDA device.
    """
    global _ACTIVE_SESSION
    if _ACTIVE_SESSION is not None:
        raise SidecarError("a LUPINE sidecar session is already active")
    _ensure_registered()
    runtime = "container" if self.runtime == "auto" and sys.platform == "darwin" else self.runtime
    if runtime != "container":
        raise SidecarError("only runtime='container' is implemented")
    launcher = ContainerRuntime(
        image=self.image,
        server=self.server,
        platform=self.platform,
        rosetta=self.rosetta,
        env=self.env,
    )
    launcher.prepare()
    try:
        self._proc = subprocess.Popen(
            launcher.command(textwrap.dedent(_WORKER)),
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
        )
        self._lock = threading.Lock()
        self.info = self._request({"op": "ping"})
        if not self.info.get("cuda_available"):
            raise SidecarError(f"sidecar worker has no CUDA device: {self.info}")
        _ACTIVE_SESSION = self
        # Only publish the mode on `self` once it has been entered, so that
        # close() never calls __exit__ on a mode that was not entered.
        mode = SidecarDispatchMode(self)
        mode.__enter__()
        self._mode = mode
    except BaseException:
        # Tear down the half-started session; close() tolerates missing
        # `_mode` / `_proc` attributes.
        self.close()
        raise
    atexit.register(self.close)
    return self
def _request(self, payload: dict[str, Any]) -> Any:
    """Send one JSON request line to the worker and return its result.

    The request is written to the worker's stdin and exactly one response
    line is read from its stdout, all under ``self._lock`` so concurrent
    callers cannot interleave requests and replies.

    Args:
        payload: JSON-serialisable request object (must contain ``"op"``).

    Returns:
        The ``"result"`` field of a successful response.

    Raises:
        SidecarError: if the session has not been started or was closed, the
            worker pipes are unavailable or fail, the worker exits, the
            reply is not valid JSON, or the worker reports an error.
    """
    lock = getattr(self, "_lock", None)
    if lock is None:
        raise SidecarError("sidecar session is not running")
    # Serialise before touching the pipe so an unencodable payload cannot
    # leave a partial request on the wire.
    message = json.dumps(payload) + "\n"
    with lock:
        proc = getattr(self, "_proc", None)
        if proc is None:
            raise SidecarError("sidecar session is not running")
        if proc.stdin is None or proc.stdout is None:
            raise SidecarError("sidecar worker pipes are closed")
        try:
            proc.stdin.write(message)
            proc.stdin.flush()
            line = proc.stdout.readline()
        except (OSError, ValueError) as exc:
            # OSError covers a broken pipe; ValueError is raised for I/O on
            # an already-closed file object.
            raise SidecarError(f"sidecar worker pipe failed: {exc}") from exc
        if not line:
            stderr = ""
            if proc.stderr is not None:
                stderr = proc.stderr.read()
            raise SidecarError(f"sidecar worker exited with code {proc.poll()}: {stderr}")
    try:
        response = json.loads(line)
    except json.JSONDecodeError as exc:
        raise SidecarError(f"sidecar worker sent a malformed reply: {line!r}") from exc
    if not response.get("ok"):
        raise SidecarError(response.get("traceback") or response.get("error"))
    return response["result"]
def _decode(self, value: Any) -> Any:
    """Convert a worker response envelope back into Python/torch objects.

    Every value sent by the worker is a dict tagged with ``"type"``:

    * ``"tensor"``: a handle to a tensor kept in the worker; wrapped via
      ``self._wrap``.
    * ``"tensor_data"``: an inline tensor, rebuilt locally from nested lists.
      The transmitted ``"shape"`` is re-applied, because nested lists cannot
      represent a zero-length leading dimension (a ``(0, 3)`` tensor
      serialises to ``[]``).
    * ``"tuple"`` / ``"list"`` / ``"dict"``: containers, decoded recursively.
    * ``"value"``: a plain JSON value, returned as is.

    Raises:
        SidecarError: if ``value`` is not a recognised envelope.
    """
    kind = value.get("type") if isinstance(value, dict) else None
    if kind == "tensor":
        return self._wrap(value)
    if kind == "tensor_data":
        tensor = torch.tensor(value["data"], dtype=_dtype_from_name(value["dtype"]))
        shape = value.get("shape")
        if shape is not None:
            tensor = tensor.reshape(list(shape))
        return tensor
    if kind == "tuple":
        return tuple(self._decode(item) for item in value["items"])
    if kind == "list":
        return [self._decode(item) for item in value["items"]]
    if kind == "dict":
        return {key: self._decode(item) for key, item in value["items"].items()}
    if kind == "value":
        return value["value"]
    raise SidecarError(f"sidecar returned unsupported result: {value!r}")
def test_sidecar_container_runtime_defaults_to_arm64(monkeypatch):
    """`ContainerRuntime.command` builds a `container run` line for linux/arm64 by default."""
    pytest.importorskip("torch")
    import lupine.sidecar as sidecar

    # Simulate a macOS host on which the `container` CLI resolves to a fixed path.
    monkeypatch.setattr(sidecar.sys, "platform", "darwin")
    monkeypatch.setattr(sidecar.shutil, "which", lambda name: "/usr/bin/container")

    runtime = sidecar.ContainerRuntime(server="host-a:14833")
    argv = runtime.command("print(1)")

    expected_prefix = [
        "/usr/bin/container",
        "run",
        "--rm",
        "--interactive",
        "--progress",
        "none",
        "--platform",
        "linux/arm64",
    ]
    assert argv[: len(expected_prefix)] == expected_prefix
    # Rosetta is opt-in; the server address travels as an --env value.
    assert "--rosetta" not in argv
    assert "LUPINE_SERVER=host-a:14833" in argv
def test_sidecar_container_runtime_pulls_missing_image(monkeypatch):
    """`prepare()` ends with an `image pull` when `image inspect` fails."""
    pytest.importorskip("torch")
    import lupine.sidecar as sidecar

    recorded = []

    def fake_run(args, **kwargs):
        # Record every CLI invocation; only `image inspect` reports failure.
        recorded.append(args)
        inspecting = args[1:3] == ["image", "inspect"]
        if inspecting:
            return sidecar.subprocess.CompletedProcess(args, 1, "", "missing")
        return sidecar.subprocess.CompletedProcess(args, 0, '{"status":"running"}', "")

    monkeypatch.setattr(sidecar.subprocess, "run", fake_run)
    monkeypatch.setattr(sidecar.sys, "platform", "darwin")
    monkeypatch.setattr(sidecar.shutil, "which", lambda name: "/usr/bin/container")

    runtime = sidecar.ContainerRuntime(server="host-a:14833")
    runtime.prepare()

    expected_pull = [
        "/usr/bin/container",
        "image",
        "pull",
        "--progress",
        "none",
        "--platform",
        "linux/arm64",
        sidecar.DEFAULT_IMAGE,
    ]
    assert recorded[-1] == expected_pull
def test_sidecar_dispatch_mode_forwards_tensor_ops(monkeypatch):
    """An op on a `SidecarTensor` is forwarded as an aten `add.Tensor` call."""
    pytest.importorskip("torch")
    import torch
    import lupine.sidecar as sidecar

    sidecar._ensure_registered()
    session = sidecar.SidecarSession(server="host-a:14833")
    sent = []

    def fake_request(payload):
        # Stand in for the worker: log the request and answer with a new handle.
        sent.append(payload)
        return {"type": "tensor", "handle": 2, "shape": [2], "dtype": "float32"}

    monkeypatch.setattr(session, "_request", fake_request)
    operand = sidecar.SidecarTensor(
        session=session,
        handle=1,
        shape=(2,),
        dtype=torch.float32,
        device=session.device(),
    )

    with sidecar.SidecarDispatchMode(session):
        outcome = operand + 3

    assert isinstance(outcome, sidecar.SidecarTensor)
    first = sent[0]
    assert first["packet"] == "add"
    assert first["overload"] == "Tensor"
    # The local tensor is sent by handle, not by value.
    assert first["args"]["__tuple__"][0] == {"__sidecar_tensor__": 1}