@@ -181,6 +181,7 @@ def __init__(
181
181
self ._sem ,
182
182
self ._stop ,
183
183
),
184
+ name = "read_thread(target=_populate_queue)" ,
184
185
daemon = True ,
185
186
)
186
187
self ._workers : List [Union [threading .Thread , mp .Process ]] = []
@@ -193,7 +194,12 @@ def __init__(
193
194
self ._stop if self .method == "thread" else self ._mp_stop ,
194
195
)
195
196
self ._workers .append (
196
- threading .Thread (target = _apply_udf , args = args , daemon = True )
197
+ threading .Thread (
198
+ target = _apply_udf ,
199
+ args = args ,
200
+ daemon = True ,
201
+ name = f"worker_thread_{ worker_id } (target=_apply_udf)" ,
202
+ )
197
203
if self .method == "thread"
198
204
else mp_context .Process (target = _apply_udf , args = args , daemon = True )
199
205
)
@@ -205,6 +211,7 @@ def __init__(
205
211
target = _sort_worker ,
206
212
args = (self ._intermed_q , self ._sort_q , self ._stop ),
207
213
daemon = True ,
214
+ name = "sort_thread(target=_sort_worker)" ,
208
215
)
209
216
self ._out_q = self ._sort_q
210
217
@@ -231,10 +238,10 @@ def __iter__(self) -> Iterator[T]:
231
238
232
239
def __next__ (self ) -> T :
233
240
while True :
234
- if self ._stop .is_set ():
241
+ if self ._stop .is_set () or self . _mp_stop . is_set () :
235
242
raise StopIteration ()
236
243
elif self ._done and self ._sem ._value == self ._max_tasks :
237
- # Don't stop if we still have items in the queue
244
+ # _done is set, and semaphore is back at initial value, so we can stop
238
245
self ._stop .set ()
239
246
self ._mp_stop .set ()
240
247
raise StopIteration ()
@@ -468,7 +475,7 @@ class _SingleThreadedMapper(Iterator[T]):
468
475
Prefetcher and PinMemory.
469
476
470
477
A thread is started on __init__ and stopped on __del__/_shutdown.
471
- The thread runs _populate_queue , which acquires a BoundedSemaphore with initial value
478
+ The thread runs worker , which acquires a BoundedSemaphore with initial value
472
479
of `prefetch_factor`.
473
480
474
481
When next() is called on this iterator, it will block until an item is available on _q.
@@ -478,14 +485,15 @@ class _SingleThreadedMapper(Iterator[T]):
478
485
- any other item: return the item
479
486
480
487
A Bounded semaphore is used to limit concurrency and memory utilization.
481
- If N items have been pulled from the source, and M items have been yielded by this iterator,
482
- we maintain the invariant that semaphore.value + (N - M) == prefetch_factor (modulo
488
+ If N items have been pulled from the source (i.e. acquire the semaphore),
489
+ and M items have been yielded by this iterator (i.e. release the semaphore),
490
+ we maintain the invariant that semaphore.value + (N - M) == prefetch_factor (modulo
483
491
non-atomicness of operations).
484
492
485
- _populate_queue calls semaphore.acquire. When we pull an item from the queue, we
486
- call semaphore.release (unless it's a StartupExceptionWrapper, because _populate_queue
493
+ worker calls semaphore.acquire. When we pull an item from the queue, we
494
+ call semaphore.release (unless it's a StartupExceptionWrapper, because worker
487
495
does not acquire semaphores in this case). All outstanding items are either being
488
- processed in _populate_queue , in the _q, or about to be returned by an in-flight next() call.
496
+ processed in worker , in the _q, or about to be returned by an in-flight next() call.
489
497
"""
490
498
491
499
def __init__ (
@@ -526,6 +534,7 @@ def __init__(
526
534
self ._stop_event ,
527
535
),
528
536
daemon = True ,
537
+ name = f"worker_thread(target={ self .worker .__name__ } )" ,
529
538
)
530
539
self ._thread .start ()
531
540
0 commit comments