// // experimental/impl/co_spawn.hpp // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ // // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #ifndef ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP #define ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP #if defined(_MSC_VER) && (_MSC_VER >= 1200) # pragma once #endif // defined(_MSC_VER) && (_MSC_VER >= 1200) #include "asio/detail/config.hpp" #include #include #include #include #include #include #include "asio/async_result.hpp" #include "asio/detail/thread_context.hpp" #include "asio/detail/thread_info_base.hpp" #include "asio/detail/type_traits.hpp" #include "asio/dispatch.hpp" #include "asio/post.hpp" #include "asio/detail/push_options.hpp" namespace asio { namespace experimental { namespace detail { // Promise object for coroutine at top of thread-of-execution "stack". template class awaiter { public: struct deleter { void operator()(awaiter* a) { if (a) a->release(); } }; typedef std::unique_ptr ptr; typedef Executor executor_type; ~awaiter() { if (has_executor_) static_cast(static_cast(executor_))->~Executor(); } void set_executor(const Executor& ex) { new (&executor_) Executor(ex); has_executor_ = true; } executor_type get_executor() const noexcept { return *static_cast(static_cast(executor_)); } awaiter* get_return_object() { return this; } auto initial_suspend() { return std::experimental::suspend_always(); } auto final_suspend() { return std::experimental::suspend_always(); } void return_void() { } awaiter* add_ref() { ++ref_count_; return this; } void release() { if (--ref_count_ == 0) coroutine_handle::from_promise(*this).destroy(); } void unhandled_exception() { pending_exception_ = std::current_exception(); } void rethrow_unhandled_exception() { if (pending_exception_) { std::exception_ptr ex = std::exchange(pending_exception_, nullptr); std::rethrow_exception(ex); } } private: std::size_t ref_count_ = 0; std::exception_ptr pending_exception_ = nullptr; alignas(Executor) unsigned char executor_[sizeof(Executor)]; bool has_executor_ = false; }; // Base promise for coroutines further down the thread-of-execution "stack". template class awaitee_base { public: #if !defined(ASIO_DISABLE_AWAITEE_RECYCLING) void* operator new(std::size_t size) { return asio::detail::thread_info_base::allocate( asio::detail::thread_info_base::awaitee_tag(), asio::detail::thread_context::thread_call_stack::top(), size); } void operator delete(void* pointer, std::size_t size) { asio::detail::thread_info_base::deallocate( asio::detail::thread_info_base::awaitee_tag(), asio::detail::thread_context::thread_call_stack::top(), pointer, size); } #endif // !defined(ASIO_DISABLE_AWAITEE_RECYCLING) auto initial_suspend() { return std::experimental::suspend_never(); } struct final_suspender { awaitee_base* this_; bool await_ready() const noexcept { return false; } void await_suspend(coroutine_handle) { this_->wake_caller(); } void await_resume() const noexcept { } }; auto final_suspend() { return final_suspender{this}; } void set_except(std::exception_ptr e) { pending_exception_ = e; } void unhandled_exception() { set_except(std::current_exception()); } void rethrow_exception() { if (pending_exception_) { std::exception_ptr ex = std::exchange(pending_exception_, nullptr); std::rethrow_exception(ex); } } awaiter* top() { return awaiter_; } coroutine_handle caller() { return caller_; } bool ready() const { return ready_; } void wake_caller() { if (caller_) caller_.resume(); else ready_ = true; } class awaitable_executor { public: explicit awaitable_executor(awaitee_base* a) : this_(a) { } bool await_ready() const noexcept { return this_->awaiter_ != nullptr; } template void await_suspend(coroutine_handle> h) noexcept { this_->resume_on_attach_ = h; } Executor await_resume() { return this_->awaiter_->get_executor(); } private: awaitee_base* this_; }; awaitable_executor await_transform(this_coro::executor_t) noexcept { return awaitable_executor(this); } class awaitable_token { public: explicit awaitable_token(awaitee_base* a) : this_(a) { } bool await_ready() const noexcept { return this_->awaiter_ != nullptr; } template void await_suspend(coroutine_handle> h) noexcept { this_->resume_on_attach_ = h; } await_token await_resume() { return await_token(this_->awaiter_); } private: awaitee_base* this_; }; awaitable_token await_transform(this_coro::token_t) noexcept { return awaitable_token(this); } template awaitable await_transform(awaitable& t) const { return std::move(t); } template awaitable await_transform(awaitable&& t) const { return std::move(t); } std::experimental::suspend_always await_transform( std::experimental::suspend_always) const { return std::experimental::suspend_always(); } void attach_caller(coroutine_handle> h) { this->caller_ = h; this->attach_callees(&h.promise()); } template void attach_caller(coroutine_handle> h) { this->caller_ = h; if (h.promise().awaiter_) this->attach_callees(h.promise().awaiter_); else h.promise().unattached_callee_ = this; } void attach_callees(awaiter* a) { for (awaitee_base* curr = this; curr != nullptr; curr = std::exchange(curr->unattached_callee_, nullptr)) { curr->awaiter_ = a; if (curr->resume_on_attach_) return std::exchange(curr->resume_on_attach_, nullptr).resume(); } } protected: awaiter* awaiter_ = nullptr; coroutine_handle caller_ = nullptr; awaitee_base* unattached_callee_ = nullptr; std::exception_ptr pending_exception_ = nullptr; coroutine_handle resume_on_attach_ = nullptr; bool ready_ = false; }; // Promise object for coroutines further down the thread-of-execution "stack". template class awaitee : public awaitee_base { public: awaitee() { } awaitee(awaitee&& other) noexcept : awaitee_base(std::move(other)) { } ~awaitee() { if (has_result_) static_cast(static_cast(result_))->~T(); } awaitable get_return_object() { return awaitable(this); }; template void return_value(U&& u) { new (&result_) T(std::forward(u)); has_result_ = true; } T get() { this->caller_ = nullptr; this->rethrow_exception(); return std::move(*static_cast(static_cast(result_))); } private: alignas(T) unsigned char result_[sizeof(T)]; bool has_result_ = false; }; // Promise object for coroutines further down the thread-of-execution "stack". template class awaitee : public awaitee_base { public: awaitable get_return_object() { return awaitable(this); }; void return_void() { } void get() { this->caller_ = nullptr; this->rethrow_exception(); } }; template class awaiter_task { public: typedef Executor executor_type; awaiter_task(awaiter* a) : awaiter_(a->add_ref()) { } awaiter_task(awaiter_task&& other) noexcept : awaiter_(std::exchange(other.awaiter_, nullptr)) { } ~awaiter_task() { if (awaiter_) { // Coroutine "stack unwinding" must be performed through the executor. executor_type ex(awaiter_->get_executor()); (post)(ex, [a = std::move(awaiter_)]() mutable { typename awaiter::ptr(std::move(a)); }); } } executor_type get_executor() const noexcept { return awaiter_->get_executor(); } protected: typename awaiter::ptr awaiter_; }; template class co_spawn_handler : public awaiter_task { public: using awaiter_task::awaiter_task; void operator()() { typename awaiter::ptr ptr(std::move(this->awaiter_)); coroutine_handle>::from_promise(*ptr.get()).resume(); } }; template class await_handler_base : public awaiter_task { public: typedef awaitable awaitable_type; await_handler_base(await_token token) : awaiter_task(token.awaiter_), awaitee_(nullptr) { } await_handler_base(await_handler_base&& other) noexcept : awaiter_task(std::move(other)), awaitee_(std::exchange(other.awaitee_, nullptr)) { } void attach_awaitee(const awaitable& a) { awaitee_ = a.awaitee_; } protected: awaitee* awaitee_; }; template class await_handler; template class await_handler : public await_handler_base { public: using await_handler_base::await_handler_base; void operator()() { typename awaiter::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base { public: typedef void return_type; using await_handler_base::await_handler_base; void operator()(const asio::error_code& ec) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base { public: using await_handler_base::await_handler_base; void operator()(std::exception_ptr ex) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else this->awaitee_->return_void(); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base { public: using await_handler_base::await_handler_base; template void operator()(Arg&& arg) { typename awaiter::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_value(std::forward(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base { public: using await_handler_base::await_handler_base; template void operator()(const asio::error_code& ec, Arg&& arg) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else this->awaitee_->return_value(std::forward(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base { public: using await_handler_base::await_handler_base; template void operator()(std::exception_ptr ex, Arg&& arg) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else this->awaitee_->return_value(std::forward(arg)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base> { public: using await_handler_base>::await_handler_base; template void operator()(Args&&... args) { typename awaiter::ptr ptr(std::move(this->awaiter_)); this->awaitee_->return_value( std::forward_as_tuple(std::forward(args)...)); this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base> { public: using await_handler_base>::await_handler_base; template void operator()(const asio::error_code& ec, Args&&... args) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ec) { this->awaitee_->set_except( std::make_exception_ptr(asio::system_error(ec))); } else { this->awaitee_->return_value( std::forward_as_tuple(std::forward(args)...)); } this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template class await_handler : public await_handler_base> { public: using await_handler_base>::await_handler_base; template void operator()(std::exception_ptr ex, Args&&... args) { typename awaiter::ptr ptr(std::move(this->awaiter_)); if (ex) this->awaitee_->set_except(ex); else { this->awaitee_->return_value( std::forward_as_tuple(std::forward(args)...)); } this->awaitee_->wake_caller(); ptr->rethrow_unhandled_exception(); } }; template struct awaitable_signature; template struct awaitable_signature> { typedef void type(std::exception_ptr, T); }; template struct awaitable_signature> { typedef void type(std::exception_ptr); }; template awaiter* co_spawn_entry_point(awaitable*, executor_work_guard work_guard, F f, Handler handler) { bool done = false; try { T t = co_await f(); done = true; (dispatch)(work_guard.get_executor(), [handler = std::move(handler), t = std::move(t)]() mutable { handler(std::exception_ptr(), std::move(t)); }); } catch (...) { if (done) throw; (dispatch)(work_guard.get_executor(), [handler = std::move(handler), e = std::current_exception()]() mutable { handler(e, T()); }); } } template awaiter* co_spawn_entry_point(awaitable*, executor_work_guard work_guard, F f, Handler handler) { std::exception_ptr e = nullptr; try { co_await f(); } catch (...) { e = std::current_exception(); } (dispatch)(work_guard.get_executor(), [handler = std::move(handler), e]() mutable { handler(e); }); } template auto co_spawn(const Executor& ex, F&& f, CompletionToken&& token) { typedef typename result_of::type awaitable_type; typedef typename awaitable_type::executor_type executor_type; typedef typename awaitable_signature::type signature_type; async_completion completion(token); executor_type ex2(ex); auto work_guard = make_work_guard(completion.completion_handler, ex2); auto* a = (co_spawn_entry_point)( static_cast(nullptr), std::move(work_guard), std::forward(f), std::move(completion.completion_handler)); a->set_executor(ex2); (post)(co_spawn_handler(a)); return completion.result.get(); } #if defined(_MSC_VER) # pragma warning(push) # pragma warning(disable:4033) #endif // defined(_MSC_VER) #if defined(_MSC_VER) template T dummy_return() { return std::move(*static_cast(nullptr)); } template <> inline void dummy_return() { } #endif // defined(_MSC_VER) template inline Awaitable make_dummy_awaitable() { for (;;) co_await std::experimental::suspend_always(); #if defined(_MSC_VER) co_return dummy_return(); #endif // defined(_MSC_VER) } #if defined(_MSC_VER) # pragma warning(pop) #endif // defined(_MSC_VER) } // namespace detail } // namespace experimental template class async_result, R(Args...)> { public: typedef experimental::detail::await_handler< Executor, typename decay::type...> completion_handler_type; typedef typename experimental::detail::await_handler< Executor, Args...>::awaitable_type return_type; async_result(completion_handler_type& h) : awaitable_(experimental::detail::make_dummy_awaitable()) { h.attach_awaitee(awaitable_); } return_type get() { return std::move(awaitable_); } private: return_type awaitable_; }; #if !defined(ASIO_NO_DEPRECATED) template struct handler_type, R(Args...)> { typedef experimental::detail::await_handler< Executor, typename decay::type...> type; }; template class async_result> { public: typedef typename experimental::detail::await_handler< Executor, Args...>::awaitable_type type; async_result(experimental::detail::await_handler& h) : awaitable_(experimental::detail::make_dummy_awaitable()) { h.attach_awaitee(awaitable_); } type get() { return std::move(awaitable_); } private: type awaitable_; }; #endif // !defined(ASIO_NO_DEPRECATED) } // namespace asio namespace std { namespace experimental { template struct coroutine_traits< asio::experimental::detail::awaiter*, Args...> { typedef asio::experimental::detail::awaiter promise_type; }; template struct coroutine_traits< asio::experimental::awaitable, Args...> { typedef asio::experimental::detail::awaitee promise_type; }; }} // namespace std::experimental #include "asio/detail/pop_options.hpp" #endif // ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP