diff options
Diffstat (limited to 'lib/asio/experimental/impl/co_spawn.hpp')
-rw-r--r-- | lib/asio/experimental/impl/co_spawn.hpp | 876 |
1 files changed, 876 insertions, 0 deletions
diff --git a/lib/asio/experimental/impl/co_spawn.hpp b/lib/asio/experimental/impl/co_spawn.hpp new file mode 100644 index 0000000..8263eff --- /dev/null +++ b/lib/asio/experimental/impl/co_spawn.hpp @@ -0,0 +1,876 @@ +// +// experimental/impl/co_spawn.hpp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP +#define ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +# pragma once +#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) + +#include "asio/detail/config.hpp" +#include <exception> +#include <functional> +#include <memory> +#include <new> +#include <tuple> +#include <utility> +#include "asio/async_result.hpp" +#include "asio/detail/thread_context.hpp" +#include "asio/detail/thread_info_base.hpp" +#include "asio/detail/type_traits.hpp" +#include "asio/dispatch.hpp" +#include "asio/post.hpp" + +#include "asio/detail/push_options.hpp" + +namespace asio { +namespace experimental { +namespace detail { + +// Promise object for coroutine at top of thread-of-execution "stack". +template <typename Executor> +class awaiter +{ +public: + struct deleter + { + void operator()(awaiter* a) + { + if (a) + a->release(); + } + }; + + typedef std::unique_ptr<awaiter, deleter> ptr; + + typedef Executor executor_type; + + ~awaiter() + { + if (has_executor_) + static_cast<Executor*>(static_cast<void*>(executor_))->~Executor(); + } + + void set_executor(const Executor& ex) + { + new (&executor_) Executor(ex); + has_executor_ = true; + } + + executor_type get_executor() const noexcept + { + return *static_cast<const Executor*>(static_cast<const void*>(executor_)); + } + + awaiter* get_return_object() + { + return this; + } + + auto initial_suspend() + { + return std::experimental::suspend_always(); + } + + auto final_suspend() + { + return std::experimental::suspend_always(); + } + + void return_void() + { + } + + awaiter* add_ref() + { + ++ref_count_; + return this; + } + + void release() + { + if (--ref_count_ == 0) + coroutine_handle<awaiter>::from_promise(*this).destroy(); + } + + void unhandled_exception() + { + pending_exception_ = std::current_exception(); + } + + void rethrow_unhandled_exception() + { + if (pending_exception_) + { + std::exception_ptr ex = std::exchange(pending_exception_, nullptr); + std::rethrow_exception(ex); + } + } + +private: + std::size_t ref_count_ = 0; + std::exception_ptr pending_exception_ = nullptr; + alignas(Executor) unsigned char executor_[sizeof(Executor)]; + bool has_executor_ = false; +}; + +// Base promise for coroutines further down the thread-of-execution "stack". +template <typename Executor> +class awaitee_base +{ +public: +#if !defined(ASIO_DISABLE_AWAITEE_RECYCLING) + void* operator new(std::size_t size) + { + return asio::detail::thread_info_base::allocate( + asio::detail::thread_info_base::awaitee_tag(), + asio::detail::thread_context::thread_call_stack::top(), + size); + } + + void operator delete(void* pointer, std::size_t size) + { + asio::detail::thread_info_base::deallocate( + asio::detail::thread_info_base::awaitee_tag(), + asio::detail::thread_context::thread_call_stack::top(), + pointer, size); + } +#endif // !defined(ASIO_DISABLE_AWAITEE_RECYCLING) + + auto initial_suspend() + { + return std::experimental::suspend_never(); + } + + struct final_suspender + { + awaitee_base* this_; + + bool await_ready() const noexcept + { + return false; + } + + void await_suspend(coroutine_handle<void>) + { + this_->wake_caller(); + } + + void await_resume() const noexcept + { + } + }; + + auto final_suspend() + { + return final_suspender{this}; + } + + void set_except(std::exception_ptr e) + { + pending_exception_ = e; + } + + void unhandled_exception() + { + set_except(std::current_exception()); + } + + void rethrow_exception() + { + if (pending_exception_) + { + std::exception_ptr ex = std::exchange(pending_exception_, nullptr); + std::rethrow_exception(ex); + } + } + + awaiter<Executor>* top() + { + return awaiter_; + } + + coroutine_handle<void> caller() + { + return caller_; + } + + bool ready() const + { + return ready_; + } + + void wake_caller() + { + if (caller_) + caller_.resume(); + else + ready_ = true; + } + + class awaitable_executor + { + public: + explicit awaitable_executor(awaitee_base* a) + : this_(a) + { + } + + bool await_ready() const noexcept + { + return this_->awaiter_ != nullptr; + } + + template <typename U, typename Ex> + void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept + { + this_->resume_on_attach_ = h; + } + + Executor await_resume() + { + return this_->awaiter_->get_executor(); + } + + private: + awaitee_base* this_; + }; + + awaitable_executor await_transform(this_coro::executor_t) noexcept + { + return awaitable_executor(this); + } + + class awaitable_token + { + public: + explicit awaitable_token(awaitee_base* a) + : this_(a) + { + } + + bool await_ready() const noexcept + { + return this_->awaiter_ != nullptr; + } + + template <typename U, typename Ex> + void await_suspend(coroutine_handle<detail::awaitee<U, Ex>> h) noexcept + { + this_->resume_on_attach_ = h; + } + + await_token<Executor> await_resume() + { + return await_token<Executor>(this_->awaiter_); + } + + private: + awaitee_base* this_; + }; + + awaitable_token await_transform(this_coro::token_t) noexcept + { + return awaitable_token(this); + } + + template <typename T> + awaitable<T, Executor> await_transform(awaitable<T, Executor>& t) const + { + return std::move(t); + } + + template <typename T> + awaitable<T, Executor> await_transform(awaitable<T, Executor>&& t) const + { + return std::move(t); + } + + std::experimental::suspend_always await_transform( + std::experimental::suspend_always) const + { + return std::experimental::suspend_always(); + } + + void attach_caller(coroutine_handle<awaiter<Executor>> h) + { + this->caller_ = h; + this->attach_callees(&h.promise()); + } + + template <typename U> + void attach_caller(coroutine_handle<awaitee<U, Executor>> h) + { + this->caller_ = h; + if (h.promise().awaiter_) + this->attach_callees(h.promise().awaiter_); + else + h.promise().unattached_callee_ = this; + } + + void attach_callees(awaiter<Executor>* a) + { + for (awaitee_base* curr = this; curr != nullptr; + curr = std::exchange(curr->unattached_callee_, nullptr)) + { + curr->awaiter_ = a; + if (curr->resume_on_attach_) + return std::exchange(curr->resume_on_attach_, nullptr).resume(); + } + } + +protected: + awaiter<Executor>* awaiter_ = nullptr; + coroutine_handle<void> caller_ = nullptr; + awaitee_base<Executor>* unattached_callee_ = nullptr; + std::exception_ptr pending_exception_ = nullptr; + coroutine_handle<void> resume_on_attach_ = nullptr; + bool ready_ = false; +}; + +// Promise object for coroutines further down the thread-of-execution "stack". +template <typename T, typename Executor> +class awaitee + : public awaitee_base<Executor> +{ +public: + awaitee() + { + } + + awaitee(awaitee&& other) noexcept + : awaitee_base<Executor>(std::move(other)) + { + } + + ~awaitee() + { + if (has_result_) + static_cast<T*>(static_cast<void*>(result_))->~T(); + } + + awaitable<T, Executor> get_return_object() + { + return awaitable<T, Executor>(this); + }; + + template <typename U> + void return_value(U&& u) + { + new (&result_) T(std::forward<U>(u)); + has_result_ = true; + } + + T get() + { + this->caller_ = nullptr; + this->rethrow_exception(); + return std::move(*static_cast<T*>(static_cast<void*>(result_))); + } + +private: + alignas(T) unsigned char result_[sizeof(T)]; + bool has_result_ = false; +}; + +// Promise object for coroutines further down the thread-of-execution "stack". +template <typename Executor> +class awaitee<void, Executor> + : public awaitee_base<Executor> +{ +public: + awaitable<void, Executor> get_return_object() + { + return awaitable<void, Executor>(this); + }; + + void return_void() + { + } + + void get() + { + this->caller_ = nullptr; + this->rethrow_exception(); + } +}; + +template <typename Executor> +class awaiter_task +{ +public: + typedef Executor executor_type; + + awaiter_task(awaiter<Executor>* a) + : awaiter_(a->add_ref()) + { + } + + awaiter_task(awaiter_task&& other) noexcept + : awaiter_(std::exchange(other.awaiter_, nullptr)) + { + } + + ~awaiter_task() + { + if (awaiter_) + { + // Coroutine "stack unwinding" must be performed through the executor. + executor_type ex(awaiter_->get_executor()); + (post)(ex, + [a = std::move(awaiter_)]() mutable + { + typename awaiter<Executor>::ptr(std::move(a)); + }); + } + } + + executor_type get_executor() const noexcept + { + return awaiter_->get_executor(); + } + +protected: + typename awaiter<Executor>::ptr awaiter_; +}; + +template <typename Executor> +class co_spawn_handler : public awaiter_task<Executor> +{ +public: + using awaiter_task<Executor>::awaiter_task; + + void operator()() + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + coroutine_handle<awaiter<Executor>>::from_promise(*ptr.get()).resume(); + } +}; + +template <typename Executor, typename T> +class await_handler_base : public awaiter_task<Executor> +{ +public: + typedef awaitable<T, Executor> awaitable_type; + + await_handler_base(await_token<Executor> token) + : awaiter_task<Executor>(token.awaiter_), + awaitee_(nullptr) + { + } + + await_handler_base(await_handler_base&& other) noexcept + : awaiter_task<Executor>(std::move(other)), + awaitee_(std::exchange(other.awaitee_, nullptr)) + { + } + + void attach_awaitee(const awaitable<T, Executor>& a) + { + awaitee_ = a.awaitee_; + } + +protected: + awaitee<T, Executor>* awaitee_; +}; + +template <typename, typename...> class await_handler; + +template <typename Executor> +class await_handler<Executor, void> + : public await_handler_base<Executor, void> +{ +public: + using await_handler_base<Executor, void>::await_handler_base; + + void operator()() + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + this->awaitee_->return_void(); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor> +class await_handler<Executor, asio::error_code> + : public await_handler_base<Executor, void> +{ +public: + typedef void return_type; + + using await_handler_base<Executor, void>::await_handler_base; + + void operator()(const asio::error_code& ec) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ec) + { + this->awaitee_->set_except( + std::make_exception_ptr(asio::system_error(ec))); + } + else + this->awaitee_->return_void(); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor> +class await_handler<Executor, std::exception_ptr> + : public await_handler_base<Executor, void> +{ +public: + using await_handler_base<Executor, void>::await_handler_base; + + void operator()(std::exception_ptr ex) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ex) + this->awaitee_->set_except(ex); + else + this->awaitee_->return_void(); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename T> +class await_handler<Executor, T> + : public await_handler_base<Executor, T> +{ +public: + using await_handler_base<Executor, T>::await_handler_base; + + template <typename Arg> + void operator()(Arg&& arg) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + this->awaitee_->return_value(std::forward<Arg>(arg)); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename T> +class await_handler<Executor, asio::error_code, T> + : public await_handler_base<Executor, T> +{ +public: + using await_handler_base<Executor, T>::await_handler_base; + + template <typename Arg> + void operator()(const asio::error_code& ec, Arg&& arg) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ec) + { + this->awaitee_->set_except( + std::make_exception_ptr(asio::system_error(ec))); + } + else + this->awaitee_->return_value(std::forward<Arg>(arg)); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename T> +class await_handler<Executor, std::exception_ptr, T> + : public await_handler_base<Executor, T> +{ +public: + using await_handler_base<Executor, T>::await_handler_base; + + template <typename Arg> + void operator()(std::exception_ptr ex, Arg&& arg) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ex) + this->awaitee_->set_except(ex); + else + this->awaitee_->return_value(std::forward<Arg>(arg)); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename... Ts> +class await_handler + : public await_handler_base<Executor, std::tuple<Ts...>> +{ +public: + using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; + + template <typename... Args> + void operator()(Args&&... args) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + this->awaitee_->return_value( + std::forward_as_tuple(std::forward<Args>(args)...)); + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename... Ts> +class await_handler<Executor, asio::error_code, Ts...> + : public await_handler_base<Executor, std::tuple<Ts...>> +{ +public: + using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; + + template <typename... Args> + void operator()(const asio::error_code& ec, Args&&... args) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ec) + { + this->awaitee_->set_except( + std::make_exception_ptr(asio::system_error(ec))); + } + else + { + this->awaitee_->return_value( + std::forward_as_tuple(std::forward<Args>(args)...)); + } + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename Executor, typename... Ts> +class await_handler<Executor, std::exception_ptr, Ts...> + : public await_handler_base<Executor, std::tuple<Ts...>> +{ +public: + using await_handler_base<Executor, std::tuple<Ts...>>::await_handler_base; + + template <typename... Args> + void operator()(std::exception_ptr ex, Args&&... args) + { + typename awaiter<Executor>::ptr ptr(std::move(this->awaiter_)); + if (ex) + this->awaitee_->set_except(ex); + else + { + this->awaitee_->return_value( + std::forward_as_tuple(std::forward<Args>(args)...)); + } + this->awaitee_->wake_caller(); + ptr->rethrow_unhandled_exception(); + } +}; + +template <typename T> +struct awaitable_signature; + +template <typename T, typename Executor> +struct awaitable_signature<awaitable<T, Executor>> +{ + typedef void type(std::exception_ptr, T); +}; + +template <typename Executor> +struct awaitable_signature<awaitable<void, Executor>> +{ + typedef void type(std::exception_ptr); +}; + +template <typename T, typename Executor, typename F, typename Handler> +awaiter<Executor>* co_spawn_entry_point(awaitable<T, Executor>*, + executor_work_guard<Executor> work_guard, F f, Handler handler) +{ + bool done = false; + + try + { + T t = co_await f(); + + done = true; + + (dispatch)(work_guard.get_executor(), + [handler = std::move(handler), t = std::move(t)]() mutable + { + handler(std::exception_ptr(), std::move(t)); + }); + } + catch (...) + { + if (done) + throw; + + (dispatch)(work_guard.get_executor(), + [handler = std::move(handler), e = std::current_exception()]() mutable + { + handler(e, T()); + }); + } +} + +template <typename Executor, typename F, typename Handler> +awaiter<Executor>* co_spawn_entry_point(awaitable<void, Executor>*, + executor_work_guard<Executor> work_guard, F f, Handler handler) +{ + std::exception_ptr e = nullptr; + + try + { + co_await f(); + } + catch (...) + { + e = std::current_exception(); + } + + (dispatch)(work_guard.get_executor(), + [handler = std::move(handler), e]() mutable + { + handler(e); + }); +} + +template <typename Executor, typename F, typename CompletionToken> +auto co_spawn(const Executor& ex, F&& f, CompletionToken&& token) +{ + typedef typename result_of<F()>::type awaitable_type; + typedef typename awaitable_type::executor_type executor_type; + typedef typename awaitable_signature<awaitable_type>::type signature_type; + + async_completion<CompletionToken, signature_type> completion(token); + + executor_type ex2(ex); + auto work_guard = make_work_guard(completion.completion_handler, ex2); + + auto* a = (co_spawn_entry_point)( + static_cast<awaitable_type*>(nullptr), std::move(work_guard), + std::forward<F>(f), std::move(completion.completion_handler)); + + a->set_executor(ex2); + (post)(co_spawn_handler<executor_type>(a)); + + return completion.result.get(); +} + +#if defined(_MSC_VER) +# pragma warning(push) +# pragma warning(disable:4033) +#endif // defined(_MSC_VER) + +#if defined(_MSC_VER) +template <typename T> T dummy_return() +{ + return std::move(*static_cast<T*>(nullptr)); +} + +template <> +inline void dummy_return() +{ +} +#endif // defined(_MSC_VER) + +template <typename Awaitable> +inline Awaitable make_dummy_awaitable() +{ + for (;;) co_await std::experimental::suspend_always(); +#if defined(_MSC_VER) + co_return dummy_return<typename Awaitable::value_type>(); +#endif // defined(_MSC_VER) +} + +#if defined(_MSC_VER) +# pragma warning(pop) +#endif // defined(_MSC_VER) + +} // namespace detail +} // namespace experimental + +template <typename Executor, typename R, typename... Args> +class async_result<experimental::await_token<Executor>, R(Args...)> +{ +public: + typedef experimental::detail::await_handler< + Executor, typename decay<Args>::type...> completion_handler_type; + + typedef typename experimental::detail::await_handler< + Executor, Args...>::awaitable_type return_type; + + async_result(completion_handler_type& h) + : awaitable_(experimental::detail::make_dummy_awaitable<return_type>()) + { + h.attach_awaitee(awaitable_); + } + + return_type get() + { + return std::move(awaitable_); + } + +private: + return_type awaitable_; +}; + +#if !defined(ASIO_NO_DEPRECATED) + +template <typename Executor, typename R, typename... Args> +struct handler_type<experimental::await_token<Executor>, R(Args...)> +{ + typedef experimental::detail::await_handler< + Executor, typename decay<Args>::type...> type; +}; + +template <typename Executor, typename... Args> +class async_result<experimental::detail::await_handler<Executor, Args...>> +{ +public: + typedef typename experimental::detail::await_handler< + Executor, Args...>::awaitable_type type; + + async_result(experimental::detail::await_handler<Executor, Args...>& h) + : awaitable_(experimental::detail::make_dummy_awaitable<type>()) + { + h.attach_awaitee(awaitable_); + } + + type get() + { + return std::move(awaitable_); + } + +private: + type awaitable_; +}; + +#endif // !defined(ASIO_NO_DEPRECATED) + +} // namespace asio + +namespace std { namespace experimental { + +template <typename Executor, typename... Args> +struct coroutine_traits< + asio::experimental::detail::awaiter<Executor>*, Args...> +{ + typedef asio::experimental::detail::awaiter<Executor> promise_type; +}; + +template <typename T, typename Executor, typename... Args> +struct coroutine_traits< + asio::experimental::awaitable<T, Executor>, Args...> +{ + typedef asio::experimental::detail::awaitee<T, Executor> promise_type; +}; + +}} // namespace std::experimental + +#include "asio/detail/pop_options.hpp" + +#endif // ASIO_EXPERIMENTAL_IMPL_CO_SPAWN_HPP |