Skip to content

Commit c6d9534

Browse files
fix: handle truncated batch stream with sequential GET fallback
- catch StopIteration during batch result iteration and fall back to individual GET requests instead of crashing the DataLoader worker
- use batch_stream_failed flag to skip dead iterator for remaining objects
- reuse existing _get_object_from_moss_in() retry path for recovery
- update test to verify fallback behavior instead of expecting crash

Signed-off-by: Abhishek Gaikwad <gaikwadabhishek1997@gmail.com>
1 parent 8feed52 commit c6d9534

2 files changed

Lines changed: 54 additions & 32 deletions

File tree

lhotse/ais/batch_loader.py

Lines changed: 38 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -153,33 +153,49 @@ def sequential_get():
153153

154154
# Apply the received data back into each manifest that had a URL
155155
request_idx = 0
156+
batch_stream_failed = False
156157
for manifest, has_url in manifest_list:
157158
if has_url:
158159
info = None
159160
content = None
160161

161-
try:
162-
info, content = next(batch_result)
163-
except StopIteration:
164-
raise AISBatchLoaderError(
165-
"Batch result iterator exhausted prematurely. "
166-
f"Expected more objects for manifests with URLs."
167-
)
168-
except TimeoutError as e:
169-
# Timeout occurred - recover the request info from saved_requests_list
170-
logger.warning(
171-
f"Timeout while fetching batch result at index {request_idx}: {e}. "
172-
f"Falling back to direct AIStore API call."
173-
)
174-
175-
if request_idx < len(saved_requests_list):
176-
info = saved_requests_list[request_idx]
177-
content = b"" # Mark as empty to trigger retry
178-
else:
179-
raise AISBatchLoaderError(
180-
f"Timeout at request index {request_idx}, but cannot recover: "
181-
f"index out of range for saved_requests_list (len={len(saved_requests_list)})"
182-
) from e
162+
if batch_stream_failed:
163+
# Batch stream already broke — go straight to individual GET
164+
info = saved_requests_list[request_idx]
165+
content = b"" # trigger retry below
166+
else:
167+
try:
168+
info, content = next(batch_result)
169+
except StopIteration:
170+
# Batch stream was truncated (e.g., connection reset mid-tar).
171+
# Fall back to individual GET for this and all remaining objects.
172+
batch_stream_failed = True
173+
logger.warning(
174+
f"Batch stream truncated at index {request_idx}/{len(saved_requests_list)}. "
175+
f"Falling back to direct AIStore API calls for remaining objects."
176+
)
177+
if request_idx < len(saved_requests_list):
178+
info = saved_requests_list[request_idx]
179+
content = b"" # trigger retry below
180+
else:
181+
raise AISBatchLoaderError(
182+
f"Batch stream truncated at index {request_idx}, but cannot recover: "
183+
f"index out of range for saved_requests_list (len={len(saved_requests_list)})"
184+
)
185+
except TimeoutError as e:
186+
# Timeout occurred - recover the request info from saved_requests_list
187+
logger.warning(
188+
f"Timeout while fetching batch result at index {request_idx}: {e}. "
189+
f"Falling back to direct AIStore API call."
190+
)
191+
if request_idx < len(saved_requests_list):
192+
info = saved_requests_list[request_idx]
193+
content = b"" # Mark as empty to trigger retry
194+
else:
195+
raise AISBatchLoaderError(
196+
f"Timeout at request index {request_idx}, but cannot recover: "
197+
f"index out of range for saved_requests_list (len={len(saved_requests_list)})"
198+
) from e
183199

184200
# Retry with direct API call if content is empty (from timeout or actual empty response)
185201
if content == b"":

test/cut/test_ais_batch_loader.py

Lines changed: 16 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -968,35 +968,41 @@ def test_fallback_failure_raises_error(
968968
loader(cuts)
969969

970970
@patch("lhotse.ais.batch_loader.get_aistore_client")
971-
def test_iterator_exhausted_raises_error(
971+
def test_iterator_exhausted_falls_back_to_sequential(
972972
self, mock_get_client, cut_with_url_recording
973973
):
974-
"""Test that iterator exhaustion raises AISBatchLoaderError."""
974+
"""Test that iterator exhaustion falls back to individual GET requests."""
975975
client = MagicMock()
976976
batch = MagicMock()
977977

978978
# Track add() calls
979979
add_count = []
980980
batch.add.side_effect = lambda *args, **kwargs: add_count.append(1)
981981

982-
# Mock batch.get() to return fewer items than expected
982+
# Mock batch.get() to return fewer items than expected (empty iterator)
983983
def mock_batch_get():
984-
# Return nothing even though we expect 1 item
985984
return iter([])
986985

987986
batch.get.side_effect = lambda: mock_batch_get()
987+
batch.requests_list = [MagicMock(obj_name="test.wav", bck="test-bucket", provider="ais", archpath="")]
988988
client.batch.return_value = batch
989-
client.bucket.return_value = MagicMock()
989+
mock_bucket = MagicMock()
990+
mock_obj = MagicMock()
991+
mock_reader = MagicMock()
992+
mock_reader.read_all.return_value = b"\x00" * 16000
993+
mock_obj.get_reader.return_value = mock_reader
994+
mock_bucket.object.return_value = mock_obj
995+
client.bucket.return_value = mock_bucket
990996
mock_get_client.return_value = (client, None)
991997

992998
loader = AISBatchLoader()
993999
cuts = CutSet.from_cuts([cut_with_url_recording])
9941000

995-
# Should raise AISBatchLoaderError when iterator is exhausted
996-
with pytest.raises(
997-
AISBatchLoaderError, match="Batch result iterator exhausted prematurely"
998-
):
999-
loader(cuts)
1001+
# Should NOT raise — falls back to individual GET
1002+
result = loader(cuts)
1003+
assert result is not None
1004+
# Verify the fallback GET was called
1005+
mock_obj.get_reader.assert_called()
10001006

10011007
@patch("lhotse.ais.batch_loader.get_aistore_client")
10021008
def test_multiple_cuts_with_fallback(self, mock_get_client, cut_with_url_recording):

0 commit comments

Comments
 (0)