@@ -193,12 +193,13 @@ def test_normal_streaming_write(self, tmp_path: Path, mocker):
193193 cb = mocker .MagicMock ()
194194 tracker = _make_tracker (tmp_path , commit_cache_cb = cb )
195195 meta = _make_cache_meta ()
196+ tracker .cache_meta = meta
196197 que : SimpleQueue [bytes | None ] = SimpleQueue ()
197198 que .put (b"chunk1" )
198199 que .put (b"chunk2" )
199200 que .put (None ) # sentinel
200201
201- tracker .provider_write_file_in_thread (meta , que )
202+ tracker .provider_write_file_in_thread (que )
202203
203204 assert tracker ._tracker_events .writer_finished
204205 assert not tracker ._tracker_events .writer_failed
@@ -211,25 +212,25 @@ def test_normal_streaming_write(self, tmp_path: Path, mocker):
211212
212213 def test_writer_failed_flag_aborts (self , tmp_path : Path ):
213214 tracker = _make_tracker (tmp_path )
214- meta = _make_cache_meta ()
215+ tracker . cache_meta = _make_cache_meta ()
215216 que : SimpleQueue [bytes | None ] = SimpleQueue ()
216217 que .put (b"data" )
217218 # pre-set failed so the loop sees it on next iteration
218219 tracker ._tracker_events .set_writer_failed ()
219220 que .put (b"more_data" )
220221 que .put (None )
221222
222- tracker .provider_write_file_in_thread (meta , que )
223+ tracker .provider_write_file_in_thread (que )
223224 assert tracker ._tracker_events .writer_failed
224225
225226 def test_hard_limit_aborts (self , tmp_path : Path ):
226227 tracker = _make_tracker (tmp_path , below_hard_limit_set = False )
227- meta = _make_cache_meta ()
228+ tracker . cache_meta = _make_cache_meta ()
228229 que : SimpleQueue [bytes | None ] = SimpleQueue ()
229230 que .put (b"data" )
230231 que .put (None )
231232
232- tracker .provider_write_file_in_thread (meta , que )
233+ tracker .provider_write_file_in_thread (que )
233234 assert tracker ._tracker_events .writer_failed
234235
235236
@@ -238,9 +239,10 @@ def test_normal_single_write(self, tmp_path: Path, mocker):
238239 cb = mocker .MagicMock ()
239240 tracker = _make_tracker (tmp_path , commit_cache_cb = cb )
240241 meta = _make_cache_meta ()
242+ tracker .cache_meta = meta
241243 data = b"small file content"
242244
243- tracker .provider_write_once_in_thread (meta , data )
245+ tracker .provider_write_once_in_thread (data )
244246
245247 assert tracker ._tracker_events .writer_finished
246248 assert not tracker ._tracker_events .writer_failed
@@ -252,13 +254,33 @@ def test_normal_single_write(self, tmp_path: Path, mocker):
252254
253255 def test_hard_limit_aborts (self , tmp_path : Path ):
254256 tracker = _make_tracker (tmp_path , below_hard_limit_set = False )
255- meta = _make_cache_meta ()
257+ tracker . cache_meta = _make_cache_meta ()
256258
257- tracker .provider_write_once_in_thread (meta , b"data" )
259+ tracker .provider_write_once_in_thread (b"data" )
258260 assert tracker ._tracker_events .writer_failed
259261 assert not tracker ._tracker_events .writer_finished
260262
261263
264+ class TestProviderHandleEmptyFile :
265+ def test_sets_writer_finished_and_commits (self , tmp_path : Path , mocker ):
266+ cb = mocker .MagicMock ()
267+ tracker = _make_tracker (tmp_path , commit_cache_cb = cb )
268+ meta = _make_cache_meta ()
269+ tracker .cache_meta = meta
270+
271+ tracker .provider_handle_empty_file ()
272+
273+ assert tracker ._tracker_events .writer_finished
274+ assert not tracker ._tracker_events .writer_failed
275+ cb .assert_called_once_with (meta )
276+
277+ def test_asserts_cache_meta_is_set (self , tmp_path : Path ):
278+ tracker = _make_tracker (tmp_path )
279+ # cache_meta is None by default
280+ with pytest .raises (AssertionError ):
281+ tracker .provider_handle_empty_file ()
282+
283+
262284# ---- CacheTracker subscriber methods ----
263285
264286
@@ -296,14 +318,29 @@ async def test_timeout_raises_provider_not_ready(self, tmp_path: Path, mocker):
296318 with pytest .raises (CacheProviderNotReady ):
297319 await tracker .subscriber_wait_for_provider ()
298320
299- async def test_provider_failed_returns_false (self , tmp_path : Path ):
300- """set_writer_failed() also sets _started=True, so the wait loop
301- exits immediately and returns writer_finished (False) ."""
321+ async def test_provider_failed_raises (self , tmp_path : Path ):
322+ """set_writer_failed() sets _started=True, so the wait loop exits,
323+ then the post-loop writer_failed check raises CacheProviderNotReady ."""
302324 tracker = _make_tracker (tmp_path )
303325 tracker ._tracker_events .set_writer_failed ()
304326
305- result = await tracker .subscriber_wait_for_provider ()
306- assert result is False
327+ with pytest .raises (CacheProviderNotReady ):
328+ await tracker .subscriber_wait_for_provider ()
329+
330+ async def test_provider_fails_during_wait (self , tmp_path : Path ):
331+ """Provider fails while subscriber is polling — subscriber should
332+ see writer_started=True (from set_writer_failed), exit the loop,
333+ and raise CacheProviderNotReady from the post-loop check."""
334+ tracker = _make_tracker (tmp_path )
335+
336+ async def _fail_later ():
337+ await asyncio .sleep (0.05 )
338+ tracker ._tracker_events .set_writer_failed ()
339+
340+ task = asyncio .create_task (_fail_later ())
341+ with pytest .raises (CacheProviderNotReady ):
342+ await tracker .subscriber_wait_for_provider ()
343+ await task
307344
308345
309346class TestSubscriberStreamCacheInThread :
@@ -482,28 +519,53 @@ def work():
482519 assert called .is_set ()
483520 assert not ev .writer_failed
484521
485- async def test_stream_writing_cache_empty_fd (self , tmp_path : Path ):
486- """Empty async generator → commit_cache_cb called directly."""
487- tracker = _make_tracker (tmp_path )
522+ async def test_stream_writing_cache_empty_fd_stop_iteration (
523+ self , tmp_path : Path , mocker
524+ ):
525+ """Empty async generator (StopAsyncIteration) → provider_handle_empty_file called."""
526+ cb = mocker .MagicMock ()
527+ tracker = _make_tracker (tmp_path , commit_cache_cb = cb )
488528 meta = _make_cache_meta ()
529+ tracker .cache_meta = meta
489530
490531 async def empty_gen () -> AsyncGenerator [bytes ]:
491532 return
492533 yield b"" # noqa: unreachable, needed to make this an async generator
493534
494- gen = self .pool .stream_writing_cache (empty_gen (), tracker , meta )
535+ gen = self .pool .stream_writing_cache (empty_gen (), tracker )
495536 chunks = [chunk async for chunk in gen ]
496537 assert chunks == []
538+ assert tracker ._tracker_events .writer_finished
539+ cb .assert_called_once_with (meta )
540+
541+ async def test_stream_writing_cache_empty_fd_zero_len_chunk (
542+ self , tmp_path : Path , mocker
543+ ):
544+ """First chunk is zero-length bytes → provider_handle_empty_file called."""
545+ cb = mocker .MagicMock ()
546+ tracker = _make_tracker (tmp_path , commit_cache_cb = cb )
547+ meta = _make_cache_meta ()
548+ tracker .cache_meta = meta
549+
550+ async def empty_chunk_gen () -> AsyncGenerator [bytes ]:
551+ yield b""
552+
553+ gen = self .pool .stream_writing_cache (empty_chunk_gen (), tracker )
554+ chunks = [chunk async for chunk in gen ]
555+ assert chunks == [b"" ]
556+ assert tracker ._tracker_events .writer_finished
557+ cb .assert_called_once_with (meta )
497558
498559 async def test_stream_writing_cache_single_chunk (self , tmp_path : Path ):
499560 """Single-chunk fd → provider_write_once_in_thread dispatched."""
500561 tracker = _make_tracker (tmp_path )
501562 meta = _make_cache_meta ()
563+ tracker .cache_meta = meta
502564
503565 async def single_chunk_gen () -> AsyncGenerator [bytes ]:
504566 yield b"only_chunk"
505567
506- gen = self .pool .stream_writing_cache (single_chunk_gen (), tracker , meta )
568+ gen = self .pool .stream_writing_cache (single_chunk_gen (), tracker )
507569 chunks = [chunk async for chunk in gen ]
508570 assert chunks == [b"only_chunk" ]
509571
@@ -518,13 +580,14 @@ async def test_stream_writing_cache_multi_chunk(self, tmp_path: Path):
518580 """Multi-chunk fd → provider_write_file_in_thread dispatched."""
519581 tracker = _make_tracker (tmp_path )
520582 meta = _make_cache_meta ()
583+ tracker .cache_meta = meta
521584
522585 async def multi_chunk_gen () -> AsyncGenerator [bytes ]:
523586 yield b"chunk1"
524587 yield b"chunk2"
525588 yield b"chunk3"
526589
527- gen = self .pool .stream_writing_cache (multi_chunk_gen (), tracker , meta )
590+ gen = self .pool .stream_writing_cache (multi_chunk_gen (), tracker )
528591 chunks = [chunk async for chunk in gen ]
529592 assert chunks == [b"chunk1" , b"chunk2" , b"chunk3" ]
530593
@@ -537,13 +600,14 @@ async def test_stream_writing_cache_fd_error_raises(self, tmp_path: Path):
537600 """Exception from upstream fd → CacheStreamingFailed raised."""
538601 tracker = _make_tracker (tmp_path )
539602 meta = _make_cache_meta ()
603+ tracker .cache_meta = meta
540604
541605 async def failing_gen () -> AsyncGenerator [bytes ]:
542606 yield b"chunk1"
543607 yield b"chunk2"
544608 raise ConnectionError ("upstream broke" )
545609
546- gen = self .pool .stream_writing_cache (failing_gen (), tracker , meta )
610+ gen = self .pool .stream_writing_cache (failing_gen (), tracker )
547611 with pytest .raises (CacheStreamingFailed ):
548612 async for _ in gen :
549613 pass
@@ -650,6 +714,25 @@ async def _finish_later():
650714 assert b"" .join (chunks ) == b"streamed data"
651715 await task
652716
717+ async def test_subscribe_tracker_finished_empty_file (self , tmp_path : Path ):
718+ """Finished empty cache (cache_size=0) → returns b"" directly."""
719+ tracker = _make_tracker (tmp_path )
720+ meta = _make_cache_meta (cache_size = 0 )
721+ tracker .cache_meta = meta
722+ tracker ._tracker_events .set_writer_finished ()
723+
724+ result = await self .pool .subscribe_tracker (tracker )
725+ assert result == b""
726+
727+ async def test_subscribe_tracker_provider_failed (self , tmp_path : Path ):
728+ """Provider failed → CacheProviderNotReady raised."""
729+ tracker = _make_tracker (tmp_path )
730+ tracker .cache_meta = _make_cache_meta ()
731+ tracker ._tracker_events .set_writer_failed ()
732+
733+ with pytest .raises (CacheProviderNotReady ):
734+ await self .pool .subscribe_tracker (tracker )
735+
653736 async def test_subscribe_tracker_finished_large_file (self , tmp_path : Path ):
654737 """Finished large cache → stream path (not read_file_once)."""
655738 tracker = _make_tracker (tmp_path )
0 commit comments