
How can completed futures be automatically erased from std::vector

Question:

In the following example, mEventExecutors is a std::vector<std::future<void>>. I would like to remove the futures from the vector as they complete. Can this be done?

    void RaiseEvent(EventID messageID)
    {
        mEventExecutors.push_back(std::move(std::async([=]{
                auto eventObject = mEventListeners.find(messageID);
                if (eventObject != mEventListeners.end())
                {
                    for (auto listener : eventObject->second)
                    {
                        listener();
                    }
                }
            })
        ));
    }

Answer1:

The easy solution, in my opinion, is not to use std::async at all and to use std::thread instead.

You need to be careful though: your code currently has a lot of data races, because mEventListeners and mEventExecutors are touched from several threads without synchronization. Consider using a mutex or some other technique to prevent them.

    std::thread{[=]() {
        // Task is running...
        auto eventObject = mEventListeners.find(messageID);
        if (eventObject != mEventListeners.end())
        {
            for (auto listener : eventObject->second)
            {
                listener();
            }
        }
    }}.detach(); // detach thread so that it continues

Answer2:

The question itself has been answered by another, but it piqued my curiosity as to how one could implement a fully functional, thread-safe task manager in a minimal number of lines of code.

I also wondered whether it would be possible to wait on the tasks as futures, or optionally provide a callback function.

Then of course this begged the question whether those futures could use the sexy continuation syntax of .then(xxx) rather than blocking the code.

Here is my attempt.

Much kudos to Christopher Kohlhoff, the author of boost::asio. By studying his awesome work, I learned the value of separating classes into:

<ul><li>handle - controls the lifetime of the object,</li>
<li>service - provides the object logic, holds state shared amongst the object implementations, and manages the lifetimes of implementation objects should they outlive the handle (anything that relies on a callback usually does), and</li>
<li>implementation - provides per-object state.</li></ul>

So here's an example of calling the code:

    int main() {
        task_manager mgr;

        // an example of using async callbacks to indicate completion and error
        mgr.submit([] {
                       emit("task 1 is doing something");
                       std::this_thread::sleep_for(1s);
                       emit("task 1 done");
                   },
                   [](auto err) {
                       if (not err) {
                           emit("task 1 completed");
                       } else {
                           emit("task 1 failed");
                       }
                   });

        // an example of returning a future (see later)
        auto f = mgr.submit([] {
            emit("task 2 doing something");
            std::this_thread::sleep_for(1500ms);
            emit("task 2 is going to throw");
            throw std::runtime_error("here is an error");
        }, use_future);

        // an example of returning a future and then immediately using its continuation.
        // note that the continuation happens on the task_manager's thread pool
        mgr.submit([] {
                       emit("task 3 doing something");
                       std::this_thread::sleep_for(500ms);
                       emit("task 3 is done");
                   },
                   use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch(std::exception const& e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

        // block on the future of the second example
        try {
            f.get();
        }
        catch (std::exception &e) {
            emit("task 2 threw: ", e.what());
        }
    }

Which would result in the following output:

    task 1 is doing something
    task 2 doing something
    task 3 doing something
    task 3 is done
    task 1 done
    task 1 completed
    task 2 is going to throw
    task 2 threw: here is an error

And here's the complete code (tested on Apple clang, which is more promiscuous than gcc, so if I've missed a this-> in a lambda, my apologies):

    #define BOOST_THREAD_PROVIDES_FUTURE 1
    #define BOOST_THREAD_PROVIDES_FUTURE_CONTINUATION 1
    #define BOOST_THREAD_PROVIDES_EXECUTORS 1

    /* written by Richard Hodges 2017
     * You're free to use the code, but please give credit where it's due :)
     */
    #include <boost/thread/future.hpp>
    #include <boost/thread/executors/basic_thread_pool.hpp>
    #include <thread>
    #include <utility>
    #include <unordered_map>
    #include <stdexcept>
    #include <condition_variable>
    #include <chrono>
    #include <iostream>
    #include <memory>
    #include <mutex>

    // I made a task an object because I thought I might want to store state in it.
    // it turns out that this is not strictly necessary
    struct task {
    };

    /*
     * This is the implementation data for one task_manager
     */
    struct task_manager_impl {

        using mutex_type = std::mutex;
        using lock_type = std::unique_lock<mutex_type>;

        auto get_lock() -> lock_type {
            return lock_type(mutex_);
        }

        auto add_task(lock_type const &lock, std::unique_ptr<task> t) {
            auto id = t.get();
            task_map_.emplace(id, std::move(t));
        }

        auto remove_task(lock_type lock, task *task_id) {
            task_map_.erase(task_id);
            if (task_map_.empty()) {
                lock.unlock();
                no_more_tasks_.notify_all();
            }
        }

        auto wait(lock_type lock) {
            no_more_tasks_.wait(lock, [this]() { return task_map_.empty(); });
        }

        // for this example I have chosen to express errors as exceptions
        using error_type = std::exception_ptr;

        mutex_type mutex_;
        std::condition_variable no_more_tasks_;

        std::unordered_map<task *, std::unique_ptr<task>> task_map_;
    };

    /*
     * This stuff is the protocol to figure out whether to return a future
     * or just invoke a callback.
     * Total respect to Christopher Kohlhoff of asio fame for figuring this out
     * I merely step in his footsteps here, with some simplifications because of c++11
     */
    struct use_future_t {
    };
    constexpr auto use_future = use_future_t();

    template<class Handler>
    struct make_async_handler {
        auto wrap(Handler handler) {
            return handler;
        }

        struct result_type {
            auto get() -> void {}
        };

        struct result_type result;
    };

    template<>
    struct make_async_handler<const use_future_t &> {
        struct shared_state_type {
            boost::promise<void> promise;
        };

        make_async_handler() {
        }

        template<class Handler>
        auto wrap(Handler &&) {
            return [shared_state = this->shared_state](auto error) {
                // boost promises deal in terms of boost::exception_ptr so we need to marshal.
                // this is a small price to pay for the extra utility of boost::promise over
                // std::promise
                if (error) {
                    try {
                        std::rethrow_exception(error);
                    }
                    catch (...) {
                        shared_state->promise.set_exception(boost::current_exception());
                    }
                } else {
                    shared_state->promise.set_value();
                }
            };
        }

        struct result_type {
            auto get() -> boost::future<void> { return shared_state->promise.get_future(); }

            std::shared_ptr<shared_state_type> shared_state;
        };

        std::shared_ptr<shared_state_type> shared_state = std::make_shared<shared_state_type>();
        result_type result{shared_state};
    };

    /*
     * Provides the logic of a task manager. Also notice that it maintains a boost::basic_thread_pool
     * The destructor of a basic_thread_pool will not complete until all tasks are complete. So our
     * program will not crash horribly at exit time.
     */
    struct task_manager_service {

        /*
         * through this function, the service has full control over how it is created and destroyed.
         */
        static auto use() -> task_manager_service& {
            static task_manager_service me {};
            return me;
        }

        using impl_class = task_manager_impl;

        struct deleter {
            void operator()(impl_class *p) {
                service_->destroy(p);
            }

            task_manager_service *service_;
        };

        /*
         * defining impl_type in terms of a unique_ptr ensures that the handle will be
         * moveable but not copyable.
         * Had we used a shared_ptr, the handle would be copyable with shared semantics.
         * That can be useful too.
         */
        using impl_type = std::unique_ptr<impl_class, deleter>;

        auto construct() -> impl_type {
            return impl_type(new impl_class(), deleter {this});
        }

        auto destroy(impl_class *impl) -> void {
            wait(*impl);
            delete impl;
        }

        template<class Job, class Handler>
        auto submit(impl_class &impl, Job &&job, Handler &&handler) {

            auto make_handler = make_async_handler<Handler>();

            auto async_handler = make_handler.wrap(std::forward<Handler>(handler));

            auto my_task = std::make_unique<task>();
            auto task_ptr = my_task.get();

            auto task_done = [
                this,
                task_id = task_ptr,
                &impl,
                async_handler
            ](auto error) {
                async_handler(error);
                this->remove_task(impl, task_id);
            };
            auto lock = impl.get_lock();
            impl.add_task(lock, std::move(my_task));
            launch(impl, task_ptr, std::forward<Job>(job), task_done);

            return make_handler.result.get();
        };

        template<class F, class Handler>
        auto launch(impl_class &, task *task_ptr, F &&f, Handler &&handler) -> void {
            this->thread_pool_.submit([f, handler] {
                auto error = std::exception_ptr();
                try {
                    f();
                }
                catch (...) {
                    error = std::current_exception();
                }
                handler(error);
            });
        }

        auto wait(impl_class &impl) -> void {
            impl.wait(impl.get_lock());
        }

        auto remove_task(impl_class &impl, task *task_id) -> void {
            impl.remove_task(impl.get_lock(), task_id);
        }

        boost::basic_thread_pool thread_pool_{std::thread::hardware_concurrency()};
    };

    /*
     * The task manager handle. Holds the task_manager implementation plus provides access to the
     * owning task_manager_service. In this case, the service is a global static object. In an io loop environment
     * for example, asio, the service would be owned by the io loop.
     */
    struct task_manager {

        using service_type = task_manager_service;
        using impl_type = service_type::impl_type;
        using impl_class = decltype(*std::declval<impl_type>());

        task_manager()
            : service_(std::addressof(service_type::use()))
            , impl_(get_service().construct()) {}

        template<class Job, class Handler>
        auto submit(Job &&job, Handler &&handler) {
            return get_service().submit(get_impl(),
                                        std::forward<Job>(job),
                                        std::forward<Handler>(handler));
        }

        auto get_service() -> service_type & {
            return *service_;
        }

        auto get_impl() -> impl_class & {
            return *impl_;
        }

    private:

        service_type* service_;
        impl_type impl_;
    };

    /*
     * helpful thread-safe emitter
     */
    std::mutex thing_mutex;

    template<class...Things>
    void emit(Things &&...things) {
        auto lock = std::unique_lock<std::mutex>(thing_mutex);
        using expand = int[];
        void(expand{0,
                    ((std::cout << things), 0)...
        });
        std::cout << std::endl;
    }

    using namespace std::literals;

    int main() {
        task_manager mgr;

        // an example of using async callbacks to indicate completion and error
        mgr.submit([] {
                       emit("task 1 is doing something");
                       std::this_thread::sleep_for(1s);
                       emit("task 1 done");
                   },
                   [](auto err) {
                       if (not err) {
                           emit("task 1 completed");
                       } else {
                           emit("task 1 failed");
                       }
                   });

        // an example of returning a future (see later)
        auto f = mgr.submit([] {
            emit("task 2 doing something");
            std::this_thread::sleep_for(1500ms);
            emit("task 2 is going to throw");
            throw std::runtime_error("here is an error");
        }, use_future);

        // an example of returning a future and then immediately using its continuation.
        // note that the continuation happens on the task_manager's thread pool
        mgr.submit([] {
                       emit("task 3 doing something");
                       std::this_thread::sleep_for(500ms);
                       emit("task 3 is done");
                   },
                   use_future)
            .then([](auto f) {
                try {
                    f.get();
                }
                catch (std::exception const &e) {
                    emit("task 3 threw an exception: ", e.what());
                }
            });

        // block on the future of the second example
        try {
            f.get();
        }
        catch (std::exception &e) {
            emit("task 2 threw: ", e.what());
        }
    }
