@@ -173,7 +173,7 @@ class micro_queue {
         tail_counter.fetch_add(queue_rep_type::n_queue);
     }
 
-    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator) {
+    bool pop( void* dst, ticket_type k, queue_rep_type& base, queue_allocator_type& allocator ) {
         k &= -queue_rep_type::n_queue;
         spin_wait_until_eq(head_counter, k);
         d1::call_itt_notify(d1::acquired, &head_counter);
@@ -189,7 +189,7 @@ class micro_queue {
                 k + queue_rep_type::n_queue, index == items_per_page - 1 ? p : nullptr );
             if (p->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
                 success = true;
-                assign_and_destroy_item( dst, *p, index );
+                assign_and_destroy_item(dst, *p, index);
             } else {
                 --base.n_invalid_entries;
             }
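
These two hunks are whitespace-only, but they sit in the part of pop() worth understanding for the later hunks: each page carries a bitmask with one bit per slot, and pop() only reads a slot whose bit is set, otherwise it just decrements n_invalid_entries. A minimal sketch of that idiom, with illustrative names rather than TBB's own types:

```cpp
#include <atomic>
#include <cstddef>
#include <cstdint>

// Sketch of the per-page mask idiom: bit `index` set means slot `index`
// holds a fully constructed element; a failed or abandoned push leaves the
// bit clear, so consumers skip the slot instead of reading garbage.
struct page_sketch {
    std::atomic<std::uintptr_t> mask{0};

    void mark_constructed(std::size_t index) {
        mask.fetch_or(std::uintptr_t(1) << index, std::memory_order_release);
    }
    bool holds_item(std::size_t index) const {
        return (mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) != 0;
    }
};
```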
@@ -276,36 +276,38 @@ class micro_queue {
         }
     }
 
-    padded_page* get_tail_page() {
-        return tail_page.load(std::memory_order_relaxed);
-    }
-
     padded_page* get_head_page() {
         return head_page.load(std::memory_order_relaxed);
     }
 
-    void set_tail_page( padded_page* pg ) {
-        tail_page.store(pg, std::memory_order_relaxed);
-    }
-
-    void clear(queue_allocator_type& allocator) {
-        padded_page* curr_page = head_page.load(std::memory_order_relaxed);
-        std::size_t index = head_counter.load(std::memory_order_relaxed);
+    void clear(queue_allocator_type& allocator, padded_page* new_head = nullptr, padded_page* new_tail = nullptr) {
+        padded_page* curr_page = get_head_page();
+        size_type index = (head_counter.load(std::memory_order_relaxed) / queue_rep_type::n_queue) % items_per_page;
         page_allocator_type page_allocator(allocator);
 
-        while (curr_page) {
-            for (; index != items_per_page - 1; ++index) {
-                curr_page->operator[](index).~value_type();
+        while (curr_page && is_valid_page(curr_page)) {
+            while (index != items_per_page) {
+                if (curr_page->mask.load(std::memory_order_relaxed) & (std::uintptr_t(1) << index)) {
+                    page_allocator_traits::destroy(page_allocator, &curr_page->operator[](index));
+                }
+                ++index;
             }
-            padded_page* next_page = curr_page->next;
-            page_allocator_traits::destroy(page_allocator, curr_page);
-            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
-            curr_page = next_page;
+
+            index = 0;
+            padded_page* next_page = curr_page->next;
+            page_allocator_traits::destroy(page_allocator, curr_page);
+            page_allocator_traits::deallocate(page_allocator, curr_page, 1);
+            curr_page = next_page;
         }
+        head_counter.store(0, std::memory_order_relaxed);
+        tail_counter.store(0, std::memory_order_relaxed);
+        head_page.store(new_head, std::memory_order_relaxed);
+        tail_page.store(new_tail, std::memory_order_relaxed);
+    }
 
+    void clear_and_invalidate(queue_allocator_type& allocator) {
         padded_page* invalid_page = reinterpret_cast<padded_page*>(std::uintptr_t(1));
-        head_page.store(invalid_page, std::memory_order_relaxed);
-        tail_page.store(invalid_page, std::memory_order_relaxed);
+        clear(allocator, invalid_page, invalid_page);
     }
 
 private:
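
The rewritten micro_queue::clear() walks every remaining page from the head, destroys only the slots whose mask bit is set, frees each page, resets both counters, and installs optional replacement head/tail pointers; clear_and_invalidate() reuses it to install the sentinel page. The starting slot follows from the ticket scheme: tickets handled by one lane are n_queue apart, so the lane-local sequence number is head_counter / n_queue and its page offset is that value modulo items_per_page. A worked example under assumed constants, not TBB's actual configuration:

```cpp
#include <cstddef>
#include <iostream>

int main() {
    // Hypothetical lane geometry, for illustration only.
    constexpr std::size_t n_queue = 8;         // micro-queue lanes
    constexpr std::size_t items_per_page = 32; // slots per padded_page

    // If 40 items were already popped from this lane, the lane-local ticket
    // is head_counter / n_queue = 40, and clearing starts at slot
    // 40 % 32 = 8 of the head page.
    std::size_t head_counter = 40 * n_queue;
    std::size_t first_live_slot = (head_counter / n_queue) % items_per_page;
    std::cout << first_live_slot << '\n'; // prints 8
}
```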
@@ -430,18 +432,12 @@ struct concurrent_queue_rep {
     concurrent_queue_rep& operator=( const concurrent_queue_rep& ) = delete;
 
     void clear( queue_allocator_type& alloc ) {
-        page_allocator_type page_allocator(alloc);
-        for (size_type i = 0; i < n_queue; ++i) {
-            padded_page* tail_page = array[i].get_tail_page();
-            if ( is_valid_page(tail_page) ) {
-                __TBB_ASSERT(array[i].get_head_page() == tail_page, "at most one page should remain");
-                page_allocator_traits::destroy(page_allocator, static_cast<padded_page*>(tail_page));
-                page_allocator_traits::deallocate(page_allocator, static_cast<padded_page*>(tail_page), 1);
-                array[i].set_tail_page(nullptr);
-            } else {
-                __TBB_ASSERT(!is_valid_page(array[i].get_head_page()), "head page pointer corrupt?");
-            }
+        for (size_type index = 0; index < n_queue; ++index) {
+            array[index].clear(alloc);
         }
+        head_counter.store(0, std::memory_order_relaxed);
+        tail_counter.store(0, std::memory_order_relaxed);
+        n_invalid_entries.store(0, std::memory_order_relaxed);
     }
 
     void assign( const concurrent_queue_rep& src, queue_allocator_type& alloc, item_constructor_type construct_item ) {
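
concurrent_queue_rep::clear() previously assumed each lane held at most one surviving page and freed just that page; it now delegates the whole teardown to micro_queue::clear() per lane and resets the shared counters. A sketch of the user-visible behavior this enables, assuming oneTBB's public concurrent_queue API:

```cpp
#include <oneapi/tbb/concurrent_queue.h>
#include <cassert>

int main() {
    oneapi::tbb::concurrent_queue<int> q;
    for (int i = 0; i < 10000; ++i) {
        q.push(i); // enough items to span many pages per lane
    }
    q.clear();     // not concurrency-safe by design; releases every page
    assert(q.empty());
}
```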
@@ -457,7 +453,7 @@ struct concurrent_queue_rep {
             }
         }).on_exception( [&] {
             for (size_type i = 0; i < queue_idx + 1; ++i) {
-                array[i].clear(alloc);
+                array[i].clear_and_invalidate(alloc);
             }
             head_counter.store(0, std::memory_order_relaxed);
             tail_counter.store(0, std::memory_order_relaxed);
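
On the exception path of assign(), clear_and_invalidate() both empties each partially copied lane and marks it with the sentinel page at address value 1, keeping "deliberately invalidated" distinguishable from "never allocated" (nullptr). A sketch of that sentinel idiom with illustrative names:

```cpp
#include <cstdint>

// nullptr: the lane never allocated a page. Address value 1: the lane was
// torn down after a failure. An is_valid_page-style check rejects both.
struct page;

inline page* invalid_page_sentinel() {
    return reinterpret_cast<page*>(std::uintptr_t(1));
}

inline bool is_valid(const page* p) {
    return reinterpret_cast<std::uintptr_t>(p) > 1;
}
```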