Skip to content

Commit

Permalink
Fix concurrent_bounding_queue capacity on copy, move and swap operati…
Browse files Browse the repository at this point in the history
…ons (#1609)

Co-authored-by: Ilya Isaev <[email protected]>
  • Loading branch information
kboyarinov and isaevil authored Jan 21, 2025
1 parent 8dea30c commit 0d73df4
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
14 changes: 9 additions & 5 deletions include/oneapi/tbb/concurrent_queue.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -48,7 +48,6 @@ std::pair<bool, ticket_type> internal_try_pop_impl(void* dst, QueueRep& queue, A

// A high-performance thread-safe non-blocking concurrent queue.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_queue {
using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
Expand Down Expand Up @@ -317,7 +316,6 @@ namespace d2 {
// A high-performance thread-safe blocking concurrent bounded queue.
// Supports boundedness and blocking semantics.
// Multiple threads may each push and pop concurrently.
// Assignment construction is not allowed.
template <typename T, typename Allocator = tbb::cache_aligned_allocator<T>>
class concurrent_bounded_queue {
using allocator_traits_type = tbb::detail::allocator_traits<Allocator>;
Expand Down Expand Up @@ -376,12 +374,14 @@ class concurrent_bounded_queue {
concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a ) :
concurrent_bounded_queue(a)
{
my_capacity = src.my_capacity;
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}

concurrent_bounded_queue( const concurrent_bounded_queue& src ) :
concurrent_bounded_queue(queue_allocator_traits::select_on_container_copy_construction(src.get_allocator()))
{
my_capacity = src.my_capacity;
my_queue_representation->assign(*src.my_queue_representation, my_allocator, copy_construct_item);
}

Expand Down Expand Up @@ -420,6 +420,7 @@ class concurrent_bounded_queue {
if (my_queue_representation != other.my_queue_representation) {
clear();
my_allocator = other.my_allocator;
my_capacity = other.my_capacity;
my_queue_representation->assign(*other.my_queue_representation, my_allocator, copy_construct_item);
}
return *this;
Expand All @@ -435,6 +436,7 @@ class concurrent_bounded_queue {
my_queue_representation->assign(*other.my_queue_representation, other.my_allocator, move_construct_item);
other.clear();
my_allocator = std::move(other.my_allocator);
my_capacity = other.my_capacity;
}
}
return *this;
Expand Down Expand Up @@ -547,8 +549,10 @@ class concurrent_bounded_queue {

private:
void internal_swap( concurrent_bounded_queue& src ) {
std::swap(my_queue_representation, src.my_queue_representation);
std::swap(my_monitors, src.my_monitors);
using std::swap;
swap(my_queue_representation, src.my_queue_representation);
swap(my_capacity, src.my_capacity);
swap(my_monitors, src.my_monitors);
}

static constexpr std::ptrdiff_t infinite_capacity = std::ptrdiff_t(~size_type(0) / 2);
Expand Down
44 changes: 43 additions & 1 deletion test/tbb/test_concurrent_queue.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright (c) 2005-2023 Intel Corporation
Copyright (c) 2005-2025 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -282,3 +282,45 @@ TEST_CASE("Test clear and dtor with TrackableItem") {
test_tracking_dtors_on_clear<oneapi::tbb::concurrent_queue<TrackableItem>>();
test_tracking_dtors_on_clear<oneapi::tbb::concurrent_bounded_queue<TrackableItem>>();
}

//! \brief \ref regression
TEST_CASE("test capacity on modifying operations") {
// Test that concurrent_bounded_queue capacity is preserved on copying, moving and swapping
using queue_type = oneapi::tbb::concurrent_bounded_queue<int>;
using capacity_type = typename queue_type::size_type;

queue_type q;
capacity_type desired_capacity = 64;

q.set_capacity(desired_capacity);
REQUIRE_MESSAGE(q.capacity() == desired_capacity, "Capacity is not set correctly");

queue_type q_copy(q);
REQUIRE_MESSAGE(q_copy.capacity() == desired_capacity, "Capacity is not preserved on copying");

queue_type q_move(std::move(q));
REQUIRE_MESSAGE(q_move.capacity() == desired_capacity, "Capacity is not preserved on moving");

queue_type different_capacity_q1;
different_capacity_q1.set_capacity(desired_capacity * 2);

different_capacity_q1 = q_move;
REQUIRE_MESSAGE(different_capacity_q1.capacity() == desired_capacity,
"Capacity is not preserved on copy assignment");

queue_type different_capacity_q2;
different_capacity_q2.set_capacity(desired_capacity * 2);

different_capacity_q2 = std::move(q_move);
REQUIRE_MESSAGE(different_capacity_q2.capacity() == desired_capacity,
"Capacity is not preserved on move assignment");

queue_type different_capacity_q3;
different_capacity_q3.set_capacity(desired_capacity * 2);

different_capacity_q3.swap(different_capacity_q2);
REQUIRE_MESSAGE(different_capacity_q3.capacity() == desired_capacity,
"Capacity is not preserved on swap");
REQUIRE_MESSAGE(different_capacity_q2.capacity() == desired_capacity * 2,
"Capacity is not preserved on swap");
}

0 comments on commit 0d73df4

Please sign in to comment.