During this project I implemented a concurrent queue, which can be simultaneously accessed from both sides. I.e you can do push and pop at the same time and there are not lock with the same mutex.
I implemented Michael-Scott Algorithm, which can be found here. The main challenge was memory management, which was not described in the article.
Interface consist of several methods:
- getPopMutex
- lock
- unlock
- push
- pop
- tryPop
- empty
- size
And implementations:
- ConcurrentQueueSimple<T> (push and pop share the same mutex)
- ConcurrentQueueExtended<T> (What this project is all about)
As push and pop in ConcurrentQueueExtended affect different mutexes, you cannot simultaneously wait for both of them. Hence, there is no wait() method.
In order to build the project and run tests you should have c++ compiler, python interpreter, cmake and gtest installed on your system.
Make Debug or Release directory
mkdir Debug
cd Debug
Let cmake generate make file and run it
cmake -DCMAKE_BUILD_TYPE=Debug ..
make
Run some of the tests
./tests
-
tests - includes simple tests with integer queue, such as:
initialization
sequntial push
sequential pop
concurrent pop and push
MPMC pattern
the last 2 tests are the most important:
- IncreaseToThousand
-
There are numbers in increaseToThousandInitial withing small range ([0; 10]). We push them in the queue and perform the following operations in multiple threads:
get element from queue and increase in by 1
if it is >= 1000, push it in result queue
else push it back in the initial queue
We are using std::thread::hardware_concurrency() number of threads
- AddNumbers
-
Implementing MPMC pattern to compare implementations of concurrent queue.
There are producers: they push positive numbers in the queue. There are consumers: they pop 2 elements from the queue and push their sum back. The goal of the following process is to compute sum of all the numbers produced. Consumers and producers work in paralel, so -1 is considered as a death pill. Because we are using queue, not deque, I suggest the following algorithm for consumers. We'll try to pop 3 elements from the queue.
Possible situations:
Nothing => we should wait
Death pill => consumer should push it back and die
Number => consumer should push it back
Number and death pill => consumer should push number and death pill back and die
2 numbers => consumer should add them, push sum back
3 numbers => same as in 5)
2 Numbers and death pill => add numbers, push sum back, push death pill back
We will be using half of available threads as producers and other half as consumers.
You can run tests with 2 command line arguments:
Number of samples in IncreaseToThousand (default: 100)
Number of samples in AddNumbers (default: 100)
-
stringTests - includes similar tests with string queue, such as:
initialization
sequential push
concurrent push
MPMC pattern
The last test is the most important:
- ConcatenationMPMC
The idea is pretty much the same as in AddNumbers, but now we are operating with strings
You can run stringTests with 1 command line argument:
Number of samples in ConcatenationMPMC (default: 100)
george@george-PC:~/Documents/cpp/cpp_CourseProject/Release$ ./stringTests 100000
Testing:
Samples num: 100000
[==========] Running 8 tests from 2 test cases.
[----------] Global test environment set-up.
[----------] 4 tests from ConcurrentQueueInstantiation/ConcurrentQueueTest/0, where TypeParam = ConcurrentQueueSimple<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.Init
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.Init (0 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationSequential
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationSequential (519 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationMultiThreadProducer
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationMultiThreadProducer (420 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationMPMC
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/0.ConcatenationMPMC (2350 ms)
[----------] 4 tests from ConcurrentQueueInstantiation/ConcurrentQueueTest/0 (3292 ms total)
[----------] 4 tests from ConcurrentQueueInstantiation/ConcurrentQueueTest/1, where TypeParam = ConcurrentQueueExtended<std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> > >
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.Init
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.Init (0 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationSequential
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationSequential (446 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationMultiThreadProducer
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationMultiThreadProducer (403 ms)
[ RUN ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationMPMC
[ OK ] ConcurrentQueueInstantiation/ConcurrentQueueTest/1.ConcatenationMPMC (1183 ms)
[----------] 4 tests from ConcurrentQueueInstantiation/ConcurrentQueueTest/1 (2035 ms total)
[----------] Global test environment tear-down
[==========] 8 tests from 2 test cases ran. (5327 ms total)
[ PASSED ] 8 tests.
Although valgrind detects possible data race (which can bee seen here), I was unable to reproduce it. Hence, I created a simple python script that can help. It simply runs specified test program infinite number of times.
Command line arguments:
Folder name (Debug or Release)
Command line arguments for the program called
Usage:
george@george-PC:~/Documents/cpp/cpp_CourseProject/Release$ python3 testErrors.py Debug tests 10 10