Threadpool / Queueing system in C++

Question:

I have a situation in which I need to do some heavy computation. I found that subdividing my data and then merging it back together is the fastest approach (as the size increases, the run time increases faster than the size does, so splitting is logical).

It should be possible to give a data size to the application, for example one million double values.

What I have in place now sends data created for this size to some function, gets the result back after computation, and then loops over the result to copy it into the main vector.

I want to send parts of 200, with one "last" part. For example, giving size = 1000005 will perform this function 5000 times initially, and then the last one with data of size 5.

    int size = 1000000;
    int times = size / 200;    // 5000 full chunks
    int leftover = size % 200; // 0 here, so the leftover step is not performed

    QVector<double> x(size);
    QVector<double> y(size);
    x = createData(size);
    y = createData(size);

    QVector<double> holder;
    for (int i = 0; i < times; i++) {
        holder = createData(200);
        QVector<double> tempx = x.mid(i*200, 200);
        QVector<double> tempy = y.mid(i*200, 200);
        holder = myfunction(tempx, tempy, 200); // for now it just returns `tempy`
        for (int j = 0; j < 200; j++) {
            y[i*200 + j] = holder[j];
        }
    }
    // leftover function here, really similar to the part above.
    // plotting function here

At the end, x will remain as initialized, and y will hold the result of the computation.
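The chunk arithmetic described above (full parts of 200 plus one shorter last part) can be folded into a single list of ranges, so the leftover does not need its own copy of the loop. A minimal sketch in standard C++; the helper name `chunkRanges` is hypothetical and not part of the original code:

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper: split [0, size) into (offset, length) pairs of at most
// `chunk` elements. The final pair carries the leftover, if there is one.
std::vector<std::pair<std::size_t, std::size_t>> chunkRanges(std::size_t size,
                                                             std::size_t chunk) {
    assert(chunk > 0);  // a zero chunk size would never advance the loop
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t offset = 0; offset < size; offset += chunk) {
        const std::size_t remaining = size - offset;
        ranges.emplace_back(offset, remaining < chunk ? remaining : chunk);
    }
    return ranges;
}
```

With size = 1000005 and chunk = 200 this gives 5000 ranges of length 200 followed by one range of length 5, matching the numbers in the question.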

Since these code parts can run apart from each other and speed is crucial, I would like to make use of several cores.

The following further characterizes the situation:

<ul><li>These function calls are independent of each other; only at the end, when the vectors are complete, do I want to plot the result. </li> <li>The completion time of each call will vary a lot.</li> <li>The number of calls should be variable.</li> </ul>

I have read that the number of threads should be about the number of cores (at least as a starting point), since using too many threads could slow the process down. Given the situation, a queueing system / thread pool would seem to make sense, so that no time is lost when one thread gets easy jobs and finishes early while the others are still busy with harder ones.

While dozens of tutorials make it look easy to print a few messages from some (usually 2) threads, could anyone provide more detailed help on how to return vectors from the threads and merge them safely in the main function, and how to create a thread pool so that no time is wasted?

Using Ubuntu 13.04, Qt and C++11, though it should not matter.

Answer1:

First of all, writing a thread pool is hard. If you really want to learn how to write one, the book C++ Concurrency in Action by Anthony Williams teaches you how to do that.

However, your case seems to be a situation where a simple parallel_for will fit perfectly. So I suggest using the <a href="https://www.threadingbuildingblocks.org/" rel="nofollow">Intel Threading Building Blocks library</a>. The advantage of that library is that it has a very good thread pool and works quite nicely with C++11 features.

Example code:

    #include "tbb/task_scheduler_init.h"
    #include "tbb/blocked_range.h"
    #include "tbb/parallel_for.h"
    #include "tbb/tbb_thread.h"
    #include <algorithm> // std::fill
    #include <vector>

    int main() {
        // Start the scheduler with one thread per hardware thread.
        tbb::task_scheduler_init init(tbb::tbb_thread::hardware_concurrency());

        std::vector<double> a(1000);
        std::vector<double> c(1000);
        std::vector<double> b(1000);
        std::fill(b.begin(), b.end(), 1);
        std::fill(c.begin(), c.end(), 1);

        // Each call handles one sub-range [r.begin(), r.end()) of the indices.
        auto f = [&](const tbb::blocked_range<size_t>& r) {
            for (size_t j = r.begin(); j != r.end(); ++j)
                a[j] = b[j] + c[j];
        };

        size_t hint_number_iterations_per_thread = 100;
        tbb::parallel_for(tbb::blocked_range<size_t>(0, 1000, hint_number_iterations_per_thread), f);
        return 0;
    }

Done! Intel TBB has a very good thread pool that will try to adjust the workload of each thread. As long as hint_number_iterations_per_thread is not a crazy number, it will be very close to the optimal solution.

By the way: Intel TBB is an open source library that works with the majority of compilers!
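If adding TBB is not an option, the same idea (independent chunks handed out dynamically to a fixed number of threads) can be approximated with the C++11 standard library alone. The following is only a sketch under assumptions: it is not how TBB works internally, the name `parallelChunks` is hypothetical, and it assumes `work` does not throw and only touches its own slice of the data.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical helper: call work(offset, length) once for every chunk of
// [0, size). Each thread claims the next unprocessed chunk as soon as it is
// free, so a thread that gets easy chunks simply processes more of them.
void parallelChunks(std::size_t size, std::size_t chunk,
                    const std::function<void(std::size_t, std::size_t)>& work) {
    assert(chunk > 0);
    const std::size_t chunkCount = (size + chunk - 1) / chunk;  // includes the leftover chunk
    std::atomic<std::size_t> next(0);

    unsigned threadCount = std::thread::hardware_concurrency();
    if (threadCount == 0) threadCount = 2;  // the call may return 0 if the count is unknown

    auto worker = [&]() {
        // fetch_add hands out every chunk index exactly once across all threads.
        for (std::size_t i = next.fetch_add(1); i < chunkCount; i = next.fetch_add(1)) {
            const std::size_t offset = i * chunk;
            work(offset, std::min(chunk, size - offset));
        }
    };

    std::vector<std::thread> threads;
    for (unsigned t = 0; t < threadCount; ++t) threads.emplace_back(worker);
    for (auto& th : threads) th.join();  // every chunk is done once all threads have joined
}
```

Because each call writes only to the elements in its own range, the result vector needs no lock, as long as it is sized before the threads start and is not resized while they run. Plotting can start right after `parallelChunks` returns.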

Answer2:

You don't need to create anything. If you're using Qt, your problem has already been solved. You can derive a class from QRunnable and then pass it on to QThreadPool to be executed.

You can instruct QThreadPool on how many threads should run concurrently (any extras just wait in a queue till a slot opens up) but this shouldn't be necessary since QThreadPool sets limits based on your architecture that are usually good enough.

<a href="http://qt-project.org/doc/qt-5.0/qtcore/qthreadpool.html" rel="nofollow">QThreadPool</a>

<a href="http://qt-project.org/doc/qt-5.0/qtcore/qrunnable.html" rel="nofollow">QRunnable</a>
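To illustrate the queueing behaviour described above (a fixed number of threads, with extra jobs waiting until a slot opens up), here is a small sketch in plain C++11. It is not Qt's implementation and does not use the Qt API; the class name `SimplePool` and its `start` method are hypothetical, and jobs are assumed not to throw.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool: threads take jobs from a shared queue, and
// jobs that cannot run yet wait in the queue until a thread is free.
class SimplePool {
public:
    explicit SimplePool(std::size_t threadCount) {
        assert(threadCount > 0);
        for (std::size_t i = 0; i < threadCount; ++i)
            workers_.emplace_back([this] { run(); });
    }

    // Lets the threads finish the queued jobs, then joins them.
    ~SimplePool() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wake_.notify_all();
        for (auto& w : workers_) w.join();
    }

    void start(std::function<void()> job) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(job));
        }
        wake_.notify_one();
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wake_.wait(lock, [this] { return stopping_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // only reached once stopping_ is set
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // run outside the lock so other threads can dequeue meanwhile
        }
    }

    std::mutex mutex_;
    std::condition_variable wake_;
    std::queue<std::function<void()>> jobs_;
    bool stopping_ = false;
    std::vector<std::thread> workers_;  // declared last, so it is destroyed first
};
```

In this sketch, waiting for all work to finish is done by letting the pool go out of scope. With Qt, QThreadPool already provides this kind of queueing, so a hand-written pool like this is mainly useful for understanding what happens underneath.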

Answer3:

Even simpler than creating a QThreadPool and extending QRunnable, you can use the <a href="http://qt-project.org/doc/qt-5/qtconcurrent.html" rel="nofollow">QtConcurrent</a> library. Specifically use the <a href="http://qt-project.org/doc/qt-5/qtconcurrent.html#mapped-2" rel="nofollow">QtConcurrent::mapped</a> function, which takes a begin iterator, an end iterator, and a function (which can be a lambda), and internally handles the creation of, and execution on, a thread pool for you.

There are two variations: "mapped" returns a QFuture to the results but does not block the current thread, while "blockingMapped" directly returns a list of the results.

To square a large vector of integers, you could do the following:

    std::vector<int> myInts = ....
    QVector<int> result = QtConcurrent::blockingMapped(myInts.begin(), myInts.end(),
                                                       [](int x) { return x*x; });
