// // experimental/impl/co_spawn.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP #define ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include <exception> #include <functional> #include <memory> #include <new> #include <tuple> #include <utility> #include "asio/async_result.hpp" #include "asio/detail/thread_context.hpp" #include "asio/detail/thread_info_base.hpp" #include "asio/detail/type_traits.hpp" #include "asio/dispatch.hpp" #include "asio/post.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { // Promise object for coroutine at top of thread-of-execution "stack". template <typename Executor> class awaiter { public: struct deleter { void operator()(awaiter* a) { if (a) a->release(); } }; typedef std::unique_ptr<awaiter, deleter> ptr; typedef Executor executor_type; ~awaiter() { if (has_executor_) static_cast<Executor*>(static_cast<void*>(executor_))->~Executor(); } void set_executor(const Executor& ex) { new (&executor_) Executor(ex); has_executor_ = true; } executor_type get_executor() const noexcept { return *static_cast<const Executor*>(static_cast<const void*>(executor_)); } awaiter* get_return_object() { return this; } auto initial_suspend() { return std::experimental::suspend_always(); } auto final_suspend() { return std::experimental::suspend_always(); } void return_void() { } awaiter* add_ref() { ++ref_count_; return this; } void release() { if (--ref_count_ == 0) coroutine_handle<awaiter>::from_promise(*this).destroy(); } void unhandled_exception() { pending_exception_ = std::current_exception(); } void rethrow_unhandled_exception() { if (pending_exception_) { std::exception_ptr ex = std::exchange(pending_exception_, nullptr); std::rethrow_exception(ex); } } private: std::size_t ref_count_ = 0; std::exception_ptr pending_exception_ = nullptr; alignas(Executor) unsigned char executor_[sizeof(Executor)]; bool has_executor_ = false; }; // Base promise for coroutines further down the thread-of-execution "stack". template <typename Executor> class awaitee_base { public: #if !defined(ASIO_DISABLE_AWAITEE_RECYCLING) void* operator new(std::size_t size) { return asio::detail::thread_info_base::allocate( asio::detail::thread_info_base::awaitee_tag(), asio::detail::thread_context::thread_call_stack::top(), size); } void operator delete(void* pointer, std::size_t size) { asio::detail::thread_info_base::deallocate( asio::detail::thread_info_base::awaitee_tag(), asio::detail::thread_context::thread_call_stack::top(), pointer, size); } #endif // !defined(ASIO_DISABLE_AWAITEE_RECYCLING) auto initial_suspend() { return std::experimental::suspend_never(); } struct final_suspender { awaitee_base* this_; bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle<void>) { this_->wake_caller(); } void await_resume() const noexcept { } }; auto final_suspend() { return final_suspender{this}; } void set_except(std::exception_ptr e) { pending_exception_ = e; } void unhandled_exception() { set_except(std::current_exception()); } void rethrow_exception() { if (pending_exception_) { std::exception_ptr ex = std::exchange(pending_exception_, nullptr); std::rethrow_exception(ex); } } awaiter<Executor>* top() { return awaiter_; } coroutine_handle<void> caller() { return caller_; } bool ready() const { return ready_; } void wake_caller() { if (caller_) caller_.resume(); else ready_ = true; } class awaitable_executor { public: explicit awaitable_executor(awaitee_base* a) : this_(a) { } bool await_ready() const noexcept { return this_->awaiter_ != nullptr; } template <typename U, typename Ex> void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept { this_->resume_on_attach_ = h; } Executor await_resume() { return this_->awaiter_->get_executor(); } private: awaitee_base* this_; }; awaitable_executor await_transform(this_coro::executor_t) noexcept { return awaitable_executor(this); } class awaitable_token { public: explicit awaitable_token(awaitee_base* a) : this_(a) { } bool await_ready() const noexcept { return this_->awaiter_ != nullptr; } template <typename U, typename Ex> void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept { this_->resume_on_attach_ = h; } await_token<Executor> await_resume() { return await_token<Executor>(this_->awaiter_); } private: awaitee_base* this_; }; awaitable_token await_transform(this_coro::token_t) noexcept { return awaitable_token(this); } template <typename T> awaitable<T, Executor> await_transform(awaitable<T, Executor>& t) const { return std::move(t); } template <typename T> awaitable<T, Executor> await_transform(awaitable<T, Executor>&& t) const { return std::move(t); } std::experimental::suspend_always await_transform( std::experimental::suspend_always) const { return std::experimental::suspend_always(); } void attach_caller(coroutine_handle<awaiter<Executor>> h) { this->caller_ = h; this->attach_callees(&h.promise()); } template <typename U> void attach_caller(coroutine_handle<awaitee<U, Executor>> h) { this->caller_ = h; if (h.promise().awaiter_) this->attach_callees(h.promise().awaiter_); else h.promise().unattached_callee_ = this; } void attach_callees(awaiter<Executor>* a) { for (awaitee_base* curr = this; curr != nullptr; curr = std::exchange(curr->unattached_callee_, nullptr)) { curr->awaiter_ = a; if (curr->resume_on_attach_) return std::exchange(curr->resume_on_attach_, nullptr).resume(); } } protected: awaiter<Executor>* awaiter_ = nullptr; coroutine_handle<void> caller_ = nullptr; awaitee_base<Executor>* unattached_callee_ = nullptr; std::exception_ptr pending_exception_ = nullptr; coroutine_handle<void> resume_on_attach_ = nullptr; bool ready_ = false; }; // Promise object for coroutines further down the thread-of-execution "stack". template <typename T, typename Executor> class awaitee : public awaitee_base<Executor> { public: awaitee() { } awaitee(awaitee&& other) noexcept : awaitee_base<Executor>(std::move(other)) { } ~awaitee() { if (has_result_) static_cast<T*>(static_cast<void*>(result_))->~T(); } awaitable<T, Executor> get_return_object() { return awaitable<T, Executor>(this); }; template <typename U> void return_value(U&& u) { new (&result_) T(std::forward<U>(u)); has_result_ = true; } T get() { this->caller_ = nullptr; this->rethrow_exception(); return std::move(*static_cast<T*>(static_cast<void*>(result_))); } private: alignas(T) unsigned char result_[sizeof(T)]; bool has_result_ = false; }; // Promise object for coroutines further down the thread-of-execution "stack". template <typename Executor> class awaitee<void, Executor> : public awaitee_base<Executor> { public: awaitable<void, Executor> get_return_object() { return awaitable<void, Executor>(this); }; void return_void() { } void get() { this->caller_ = nullptr; this->rethrow_exception(); } }; template <typename Executor> class awaiter_task { public: typedef Executor executor_type; awaiter_task(awaiter<Executor>* a) : awaiter_(a->add_ref()) { } awaiter_task(awaiter_task&& other) noexcept : awaiter_(std::exchange(other.awaiter_, nullptr)) { } ~awaiter_task() { if (awaiter_) { // Coroutine "stack unwinding" must be performed through the executor. executor_type ex(awaiter_->get_executor()); (post)(ex, [a = std::move(awaiter_)]() mutable { typename awaiter<Executor>::ptr(std::move(a)); }); } } executor_type get_executor() const noexcept { return awaiter_->get_executor(); } protected: typename awaiter<Executor>::ptr awaiter_; }; template <typename Executor> class co_spawn_handler : public awaiter_task<Executor> { public: using awaiter_task<Executor>::awaiter_task; void operator()() { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); coroutine_handle<awaiter<Executor>>::from_promise(*ptr.get()).resume(); } }; template <typename Executor, typename T> class await_handler_base : public awaiter_task<Executor> { public: typedef awaitable<T, Executor> awaitable_type; await_handler_base(await_token<Executor> token) : awaiter_task<Executor>(token.awaiter_), awaitee_(nullptr) { } await_handler_base(await_handler_base&& other) noexcept : awaiter_task<Executor>(std::move(other)), awaitee_(std::exchange(other.awaitee_, nullptr)) { } void attach_awaitee(const awaitable<T, Executor>& a) { awaitee_ = a.awaitee_; } protected: awaitee<T, Executor>* awaitee_; }; template <typename, typename...> class await_handler; template <typename Executor> class await_handler<Executor, void> : public await_handler_base<Executor, void> { public: using await_handler_base<Executor, void>::await_handler_base; void operator()() { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor> class await_handler<Executor, asio::error_code> : public await_handler_base<Executor, void> { public: typedef void return_type; using await_handler_base<Executor, void>::await_handler_base; void operator()(const asio::error_code& ec) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor> class await_handler<Executor, std::exception_ptr> : public await_handler_base<Executor, void> { public: using await_handler_base<Executor, void>::await_handler_base; void operator()(std::exception_ptr ex) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename T> class await_handler<Executor, T> : public await_handler_base<Executor, T> { public: using await_handler_base<Executor, T>::await_handler_base; template <typename Arg> void operator()(Arg&& arg) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_value(std::forward<Arg>(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename T> class await_handler<Executor, asio::error_code, T> : public await_handler_base<Executor, T> { public: using await_handler_base<Executor, T>::await_handler_base; template <typename Arg> void operator()(const asio::error_code& ec, Arg&& arg) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else this->awaitee_->return_value(std::forward<Arg>(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename T> class await_handler<Executor, std::exception_ptr, T> : public await_handler_base<Executor, T> { public: using await_handler_base<Executor, T>::await_handler_base; template <typename Arg> void operator()(std::exception_ptr ex, Arg&& arg) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else this->awaitee_->return_value(std::forward<Arg>(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename... Ts> class await_handler : public await_handler_base<Executor, std::tuple<Ts...>> { public: using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; template <typename... Args> void operator()(Args&&... args) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_value( std::forward_as_tuple(std::forward<Args>(args)...)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename... Ts> class await_handler<Executor, asio::error_code, Ts...> : public await_handler_base<Executor, std::tuple<Ts...>> { public: using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; template <typename... Args> void operator()(const asio::error_code& ec, Args&&... args) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else { this->awaitee_->return_value( std::forward_as_tuple(std::forward<Args>(args)...)); } this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename Executor, typename... Ts> class await_handler<Executor, std::exception_ptr, Ts...> : public await_handler_base<Executor, std::tuple<Ts...>> { public: using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; template <typename... Args> void operator()(std::exception_ptr ex, Args&&... args) { typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else { this->awaitee_->return_value( std::forward_as_tuple(std::forward<Args>(args)...)); } this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template <typename T> struct awaitable_signature; template <typename T, typename Executor> struct awaitable_signature<awaitable<T, Executor>> { typedef void type(std::exception_ptr, T); }; template <typename Executor> struct awaitable_signature<awaitable<void, Executor>> { typedef void type(std::exception_ptr); }; template <typename T, typename Executor, typename F, typename Handler> awaiter<Executor>* co_spawn_entry_point(awaitable<T, Executor>*, executor_work_guard<Executor> work_guard, F f, Handler handler) { bool done = false; try { T t = co_await f(); done = true; (dispatch)(work_guard.get_executor(), [handler = std::move(handler), t = std::move(t)]() mutable { handler(std::exception_ptr(), std::move(t)); }); } catch (...) { if (done) throw; (dispatch)(work_guard.get_executor(), [handler = std::move(handler), e = std::current_exception()]() mutable { handler(e, T()); }); } } template <typename Executor, typename F, typename Handler> awaiter<Executor>* co_spawn_entry_point(awaitable<void, Executor>*, executor_work_guard<Executor> work_guard, F f, Handler handler) { std::exception_ptr e = nullptr; try { co_await f(); } catch (...) { e = std::current_exception(); } (dispatch)(work_guard.get_executor(), [handler = std::move(handler), e]() mutable { handler(e); }); } template <typename Executor, typename F, typename CompletionToken> auto co_spawn(const Executor& ex, F&& f, CompletionToken&& token) { typedef typename result_of<F()>::type awaitable_type; typedef typename awaitable_type::executor_type executor_type; typedef typename awaitable_signature<awaitable_type>::type signature_type; async_completion<CompletionToken, signature_type> completion(token); executor_type ex2(ex); auto work_guard = make_work_guard(completion.completion_handler, ex2); auto* a = (co_spawn_entry_point)( static_cast<awaitable_type*>(nullptr), std::move(work_guard), std::forward<F>(f), std::move(completion.completion_handler)); a->set_executor(ex2); (post)(co_spawn_handler<executor_type>(a)); return completion.result.get(); } #if defined(_MSC_VER) # pragma warning(push) # pragma warning(disable:4033) #endif // defined(_MSC_VER) #if defined(_MSC_VER) template <typename T> T dummy_return() { return std::move(*static_cast<T*>(nullptr)); } template <> inline void dummy_return() { } #endif // defined(_MSC_VER) template <typename Awaitable> inline Awaitable make_dummy_awaitable() { for (;;) co_await std::experimental::suspend_always(); #if defined(_MSC_VER) co_return dummy_return<typename Awaitable::value_type>(); #endif // defined(_MSC_VER) } #if defined(_MSC_VER) # pragma warning(pop) #endif // defined(_MSC_VER) } // namespace detail } // namespace experimental template <typename Executor, typename R, typename... Args> class async_result<experimental::await_token<Executor>, R(Args...)> { public: typedef experimental::detail::await_handler< Executor, typename decay<Args>::type...> completion_handler_type; typedef typename experimental::detail::await_handler< Executor, Args...>::awaitable_type return_type; async_result(completion_handler_type& h) : awaitable_(experimental::detail::make_dummy_awaitable<return_type>()) { h.attach_awaitee(awaitable_); } return_type get() { return std::move(awaitable_); } private: return_type awaitable_; }; #if !defined(ASIO_NO_DEPRECATED) template <typename Executor, typename R, typename... Args> struct handler_type<experimental::await_token<Executor>, R(Args...)> { typedef experimental::detail::await_handler< Executor, typename decay<Args>::type...> type; }; template <typename Executor, typename... Args> class async_result<experimental::detail::await_handler<Executor, Args...>> { public: typedef typename experimental::detail::await_handler< Executor, Args...>::awaitable_type type; async_result(experimental::detail::await_handler<Executor, Args...>& h) : awaitable_(experimental::detail::make_dummy_awaitable<type>()) { h.attach_awaitee(awaitable_); } type get() { return std::move(awaitable_); } private: type awaitable_; }; #endif // !defined(ASIO_NO_DEPRECATED) } // namespace asio namespace std { namespace experimental { template <typename Executor, typename... Args> struct coroutine_traits< asio::experimental::detail::awaiter<Executor>*, Args...> { typedef asio::experimental::detail::awaiter<Executor> promise_type; }; template <typename T, typename Executor, typename... Args> struct coroutine_traits< asio::experimental::awaitable<T, Executor>, Args...> { typedef asio::experimental::detail::awaitee<T, Executor> promise_type; }; }} // namespace std::experimental #include "asio/detail/pop_options.hpp" #endif // ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP