Skip to content

Commit 55dd849

Browse files
committed
Support AIS GetBatch range reads
1 parent dbfa2d1 commit 55dd849

2 files changed

Lines changed: 170 additions & 72 deletions

File tree

lhotse/ais/batch_loader.py

Lines changed: 99 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class AISBatchLoader:
6666
Shar lazy-pointer (``shar_ptr``) sources transparently fall back to
6767
per-object byte-range ``get_reader(byte_range=…)`` calls when the installed
6868
AIStore SDK / cluster doesn't support byte-range entries in
69-
``BatchRequest.add()`` (or when ``force_individual=True``). The MOSS
69+
MOSS GetBatch requests (or when ``force_individual=True``). The MOSS
7070
GetBatch path is preferred when available; the byte-range fallback exists
7171
so non-gzipped lhotse-shar cuts on AIS work even on older deployments.
7272
"""
@@ -121,11 +121,11 @@ def _moss_attrs(info: Any) -> tuple[str, str, str, Optional[str]]:
121121
Normalise an AIStore batch request/result entry into ``(bck, provider,
122122
obj_name, archpath)``.
123123
124-
Verified empirically against ``aistore==1.23.0``:
125-
* ``BatchRequest.requests_list`` items are ``aistore.sdk.batch.types.MossIn``
124+
Verified empirically against ``aistore==1.23.0`` and ``1.25.0``:
125+
* ``Batch.requests_list`` items are ``aistore.sdk.batch.types.MossIn``
126126
with the original short attribute names (``bck``, ``provider``,
127127
``obj_name``, ``archpath``).
128-
* ``BatchRequest.get()`` yields ``(MossOut, content)`` tuples and
128+
* ``Batch.get()`` yields ``(MossOut, content)`` tuples and
129129
``MossOut`` carries different attribute names. To stay robust
130130
against further SDK churn, this helper falls back through every
131131
naming convention we've seen in the wild
@@ -159,7 +159,8 @@ def _get_object_from_moss_in(self, moss_in: Any) -> bytes:
159159
Fetch a single object from AIStore using the ObjectNames request info.
160160
161161
This method is used as a fallback when batch operations fail or return empty content.
162-
It handles archive extraction if an archpath is specified.
162+
It handles archive extraction if an archpath is specified, and preserves
163+
``start`` / ``length`` byte-range requests used by Shar lazy pointers.
163164
164165
Args:
165166
moss_in: AIStore ObjectNames request — accepts both ``MossIn`` (from
@@ -173,21 +174,51 @@ def _get_object_from_moss_in(self, moss_in: Any) -> bytes:
173174
Raises:
174175
Exception: If the object cannot be fetched from AIStore.
175176
"""
176-
from aistore.sdk.archive_config import ArchiveConfig
177-
178177
bck, provider, obj_name, archpath = self._moss_attrs(moss_in)
178+
start, length = self._moss_range(moss_in)
179+
180+
obj = self.client.bucket(bck_name=bck, provider=provider).object(obj_name)
181+
182+
if start is not None or length is not None:
183+
if archpath:
184+
raise AISBatchLoaderError(
185+
"Cannot fall back to direct GET for a request that combines "
186+
f"byte range and archive extraction: {obj_name}/{archpath}"
187+
)
188+
if start is None or length is None:
189+
raise AISBatchLoaderError(
190+
f"Invalid byte-range request for {obj_name}: "
191+
f"start={start!r}, length={length!r}"
192+
)
193+
if length <= 0:
194+
return b""
195+
end_inclusive = start + length - 1
196+
return obj.get_reader(
197+
byte_range=f"bytes={start}-{end_inclusive}"
198+
).read_all()
199+
200+
from aistore.sdk.archive_config import ArchiveConfig
179201

180202
config = None
181203
if archpath:
182204
config = ArchiveConfig(archpath=archpath)
183205

184-
reader = (
185-
self.client.bucket(bck_name=bck, provider=provider)
186-
.object(obj_name)
187-
.get_reader(archive_config=config)
188-
)
206+
reader = obj.get_reader(archive_config=config)
189207
return reader.read_all()
190208

209+
@staticmethod
210+
def _moss_range(info: Any) -> tuple[Optional[int], Optional[int]]:
211+
"""Return ``(start, length)`` byte-range fields from a MOSS request."""
212+
from numbers import Integral
213+
214+
def _int_or_none(name: str) -> Optional[int]:
215+
value = getattr(info, name, None)
216+
if isinstance(value, Integral) and not isinstance(value, bool):
217+
return int(value)
218+
return None
219+
220+
return _int_or_none("start"), _int_or_none("length")
221+
191222
def __call__(self, cuts: CutSet) -> CutSet:
192223
"""
193224
Fetch all data referenced by a CutSet in one AIStore batch operation.
@@ -229,8 +260,8 @@ def __call__(self, cuts: CutSet) -> CutSet:
229260

230261
# Decide once per call whether shar_ptr entries can go through MOSS
231262
# GetBatch byte-range adds. If ``force_individual`` is on, or the SDK
232-
# rejects byte-range BatchRequest.add() calls, route shar_ptr through
233-
# the per-object byte-range fallback collected in ``shar_ptr_fallback``.
263+
# predates byte-range MOSS support, route shar_ptr through the per-object
264+
# byte-range fallback collected in ``shar_ptr_fallback``.
234265
shar_ptr_uses_batch = (
235266
not self.force_individual
236267
) and self._aistore_byte_range_supported()
@@ -434,13 +465,20 @@ def _individual_get():
434465

435466
# Retry with direct API call if content is empty (from timeout or actual empty response)
436467
if content == b"":
437-
bck_, provider_, obj_name_, archpath_ = self._moss_attrs(info)
468+
direct_info = (
469+
saved_requests_list[request_idx]
470+
if request_idx < len(saved_requests_list)
471+
else info
472+
)
473+
bck_, provider_, obj_name_, archpath_ = self._moss_attrs(
474+
direct_info
475+
)
438476
logger.warning(
439477
f"Object {obj_name_}/{archpath_} from bucket {provider_}://{bck_} "
440478
f"returned empty content. Retrying with direct AIStore API call."
441479
)
442480
try:
443-
content = self._get_object_from_moss_in(info)
481+
content = self._get_object_from_moss_in(direct_info)
444482
except Exception as ex:
445483
logger.error(
446484
f"Failed to fetch object {obj_name_} from bucket "
@@ -478,8 +516,8 @@ def _cuts_have_ais_data(cuts: CutSet) -> bool:
478516
"""Return True iff any manifest in ``cuts`` is served from AIStore.
479517
480518
Mirrors the detection conditions in :meth:`_collect_manifest_urls` but
481-
without touching ``self.client`` or a ``BatchRequest`` — used to short-
482-
circuit :meth:`__call__` when the CutSet has no AIS-backed data, so
519+
without touching ``self.client`` or a ``Batch`` — used to short-circuit
520+
:meth:`__call__` when the CutSet has no AIS-backed data, so
483521
loaders constructed in environments where AIStore isn't configured
484522
still pass cuts through unchanged.
485523
"""
@@ -608,32 +646,50 @@ def _collect_manifest_urls(
608646
@lru_cache(maxsize=1)
609647
def _aistore_byte_range_supported() -> bool:
610648
"""
611-
Detect whether the installed aistore SDK accepts byte-range fetch in
612-
:meth:`aistore.sdk.batch.batch.BatchRequest.add` *without raising*.
613-
614-
Probe: instantiate a ``BatchRequest`` and call ``add(start=0,
615-
length=0)`` against a sentinel object. If the SDK validates byte-range
616-
usage eagerly with ``NotImplementedError`` (current behaviour, see
617-
``aistore/sdk/batch/batch.py``), this fails locally before any IO.
618-
Cached for the process lifetime.
649+
Detect whether the installed aistore SDK/cluster generation supports
650+
byte-range MOSS entries.
651+
652+
``aistore==1.25.0`` removed the older ``BatchRequest`` class and still
653+
has ``Batch.add(..., start=, length=)`` guarded by ``NotImplementedError``.
654+
The supported API is the lower-level MOSS request schema:
655+
``Batch.requests_list`` exposes ``MossReq.moss_in`` and ``MossIn``
656+
serializes ``start`` / ``length`` in the GetBatch JSON body. Older SDKs
657+
had partial client-side fields before server support existed, so keep a
658+
conservative version gate and schema check here.
619659
"""
620660
try:
621-
from aistore.sdk.batch.batch import BatchRequest
661+
import re
662+
663+
import aistore
664+
from aistore.sdk.batch.batch import Batch
665+
from aistore.sdk.batch.types import MossIn, MossReq
622666
except Exception:
623667
return False
624-
try:
625-
req = BatchRequest()
626-
req.add(object(), start=0, length=0)
627-
except NotImplementedError:
628-
return False
629-
except TypeError:
630-
# ``start`` / ``length`` not in the signature on older SDKs.
668+
669+
m = re.match(r"^(\d+)\.(\d+)\.(\d+)", getattr(aistore, "__version__", ""))
670+
if m is None or tuple(map(int, m.groups())) < (1, 25, 0):
631671
return False
672+
673+
try:
674+
descriptor = vars(Batch).get("requests_list")
675+
if not isinstance(descriptor, property):
676+
return False
677+
if "moss_in" not in MossReq.model_fields:
678+
return False
679+
if not {"start", "length"}.issubset(MossIn.model_fields):
680+
return False
681+
probe = MossIn.model_construct(
682+
obj_name="__lhotse_probe__.tar",
683+
bck="__lhotse_probe__",
684+
provider="ais",
685+
start=0,
686+
length=1,
687+
)
688+
dumped = probe.model_dump(by_alias=True, exclude_defaults=True)
632689
except Exception:
633-
# Any other exception (e.g. invalid object stub) means the SDK
634-
# got past byte-range validation, so the feature is supported.
635-
return True
636-
return True
690+
return False
691+
692+
return dumped.get("start") == 0 and dumped.get("length") == 1
637693

638694
def _add_shar_ptr_to_batch(
639695
self,
@@ -648,8 +704,8 @@ def _add_shar_ptr_to_batch(
648704
Schedule a Shar lazy pointer fetch.
649705
650706
When ``shar_ptr_uses_batch`` is True the request is added to the MOSS
651-
``BatchRequest`` via direct ``MossIn.model_construct`` append (see
652-
:meth:`_append_moss_in` for why we bypass ``BatchRequest.add``).
707+
``Batch`` via direct ``MossIn.model_construct`` append (see
708+
:meth:`_append_moss_in` for why we bypass ``Batch.add``).
653709
Otherwise the ``(manifest_idx, bck, provider, obj_name, offset, length)``
654710
tuple is appended to ``shar_ptr_fallback`` so :meth:`__call__` can
655711
drain it via per-object byte-range gets.
@@ -713,11 +769,11 @@ def _append_moss_in(
713769
length: Optional[int] = None,
714770
) -> None:
715771
"""Append one MossIn entry to the batch request, bypassing the SDK's
716-
``BatchRequest.add(bucket.object(obj_name), ...)`` path.
772+
``Batch.add(bucket.object(obj_name), ...)`` path.
717773
718774
Why this bypass exists
719775
----------------------
720-
``BatchRequest.add`` builds a fresh ``Bucket`` + ``BucketDetails``
776+
``Batch.add`` builds a fresh ``Bucket`` + ``BucketDetails``
721777
(Pydantic v2) + ``Object`` + ``MossIn`` (Pydantic v2 with field
722778
aliases) per call. With ~45 manifests per minibatch in a Granary
723779
blend, profiling (nsys 2026-05-15, NVTX scope ``ais.collect_urls``)
@@ -739,7 +795,7 @@ def _append_moss_in(
739795
round-trips ``model_construct`` vs the validating constructor
740796
and asserts ``model_dump`` equality.
741797
- ``batch.request.moss_in`` is a non-public attribute. Stable
742-
through 1.20.0 → 1.23.0; bumping the SDK major version requires
798+
through 1.20.0 → 1.25.0; bumping the SDK major version requires
743799
re-verifying that the field still exists with the same shape.
744800
"""
745801
# Local imports kept local: aistore is an optional dependency and

test/shar/test_lazy_pointers.py

Lines changed: 71 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from __future__ import annotations
77

88
import pickle
9+
import re
910
from pathlib import Path
1011

1112
# Reuse the standard shar test fixture which specifies all fields
@@ -243,12 +244,19 @@ def test_audio_format_inferred_from_payload(shar_dir_numpy):
243244
# ---------------------------------------------------------------------------
244245

245246

246-
def test_ais_byte_range_disabled_today():
247-
"""With the currently-installed aistore SDK, byte-range batch is unsupported."""
248-
pytest.importorskip("aistore")
247+
def _aistore_at_least(aistore, version: tuple[int, int, int]) -> bool:
248+
m = re.match(r"^(\d+)\.(\d+)\.(\d+)", getattr(aistore, "__version__", ""))
249+
return m is not None and tuple(map(int, m.groups())) >= version
250+
251+
252+
def test_ais_byte_range_support_follows_sdk_schema():
253+
"""``aistore>=1.25.0`` exposes byte ranges through the MOSS request schema."""
254+
aistore = pytest.importorskip("aistore")
249255
from lhotse.ais.batch_loader import AISBatchLoader
250256

251-
assert AISBatchLoader._aistore_byte_range_supported() is False
257+
assert AISBatchLoader._aistore_byte_range_supported() is _aistore_at_least(
258+
aistore, (1, 25, 0)
259+
)
252260

253261

254262
def test_ais_collect_queues_shar_ptr_fallback_when_byte_range_unsupported():
@@ -302,8 +310,7 @@ def test_ais_collect_queues_shar_ptr_fallback_when_byte_range_unsupported():
302310

303311

304312
def test_ais_byte_range_path_when_sdk_supports_it():
305-
"""Future-proof: when the SDK exposes byte-range batch, the loader routes
306-
Shar pointers through ``batch.add(start=, length=)``."""
313+
"""When supported, Shar pointers are added as byte-range MOSS requests."""
307314
pytest.importorskip("aistore")
308315
from lhotse import AudioSource, Recording
309316
from lhotse.ais.batch_loader import AISBatchLoader
@@ -322,29 +329,14 @@ def test_ais_byte_range_path_when_sdk_supports_it():
322329
duration=1.0,
323330
)
324331

325-
captured = []
326-
327332
class FakeBatch:
328-
def add(self, obj, *, start=None, length=None, archpath=None):
329-
captured.append((obj, start, length, archpath))
330-
331-
class FakeObject:
332-
def __init__(self, name):
333-
self.name = name
334-
335-
class FakeBucket:
336-
def object(self, obj_name):
337-
return FakeObject(obj_name)
338-
339-
class FakeClient:
340-
def bucket(self, bck_name, provider):
341-
return FakeBucket()
333+
def __init__(self):
334+
self.requests_list = []
342335

343336
with patch.object(
344337
AISBatchLoader, "_aistore_byte_range_supported", staticmethod(lambda: True)
345338
):
346339
loader = AISBatchLoader.__new__(AISBatchLoader)
347-
loader._client = FakeClient()
348340
batch = FakeBatch()
349341
result = loader._collect_manifest_urls(
350342
rec,
@@ -355,12 +347,62 @@ def bucket(self, bck_name, provider):
355347
)
356348

357349
assert result is True
358-
assert len(captured) == 1
359-
obj, start, length, archpath = captured[0]
360-
assert obj.name == "recording.000000.tar"
361-
assert start == 1024
362-
assert length == 8192 - 1024
363-
assert archpath is None
350+
assert len(batch.requests_list) == 1
351+
moss_in = batch.requests_list[0]
352+
assert moss_in.obj_name == "recording.000000.tar"
353+
assert moss_in.start == 1024
354+
assert moss_in.length == 8192 - 1024
355+
assert moss_in.archpath is None
356+
357+
358+
def test_ais_individual_get_preserves_mossin_byte_range():
359+
"""If ranged MOSS fails, direct fallback must retry the same byte range."""
360+
pytest.importorskip("aistore")
361+
from aistore.sdk.batch.types import MossIn
362+
363+
from lhotse.ais.batch_loader import AISBatchLoader
364+
365+
captured = {}
366+
367+
class FakeReader:
368+
def read_all(self):
369+
return b"payload"
370+
371+
class FakeObject:
372+
def get_reader(self, *, byte_range=None, archive_config=None):
373+
captured["byte_range"] = byte_range
374+
captured["archive_config"] = archive_config
375+
return FakeReader()
376+
377+
class FakeBucket:
378+
def object(self, obj_name):
379+
captured["obj_name"] = obj_name
380+
return FakeObject()
381+
382+
class FakeClient:
383+
def bucket(self, bck_name, provider):
384+
captured["bck_name"] = bck_name
385+
captured["provider"] = provider
386+
return FakeBucket()
387+
388+
loader = AISBatchLoader.__new__(AISBatchLoader)
389+
loader._client = FakeClient()
390+
moss_in = MossIn.model_construct(
391+
obj_name="recording.000000.tar",
392+
bck="b",
393+
provider="ais",
394+
start=1024,
395+
length=8192 - 1024,
396+
)
397+
398+
assert loader._get_object_from_moss_in(moss_in) == b"payload"
399+
assert captured == {
400+
"bck_name": "b",
401+
"provider": "ais",
402+
"obj_name": "recording.000000.tar",
403+
"byte_range": "bytes=1024-8191",
404+
"archive_config": None,
405+
}
364406

365407

366408
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)