diff options
Diffstat (limited to 'lib/asio/detail/impl/strand_service.ipp')
-rw-r--r-- | lib/asio/detail/impl/strand_service.ipp | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/lib/asio/detail/impl/strand_service.ipp b/lib/asio/detail/impl/strand_service.ipp new file mode 100644 index 0000000..cbaf25b --- /dev/null +++ b/lib/asio/detail/impl/strand_service.ipp @@ -0,0 +1,177 @@ +// +// detail/impl/strand_service.ipp +// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +// +// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com) +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// + +#ifndef ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP +#define ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP + +#if defined(_MSC_VER) && (_MSC_VER >= 1200) +# pragma once +#endif // defined(_MSC_VER) && (_MSC_VER >= 1200) + +#include "asio/detail/config.hpp" +#include "asio/detail/call_stack.hpp" +#include "asio/detail/strand_service.hpp" + +#include "asio/detail/push_options.hpp" + +namespace asio { +namespace detail { + +struct strand_service::on_do_complete_exit +{ + io_context_impl* owner_; + strand_impl* impl_; + + ~on_do_complete_exit() + { + impl_->mutex_.lock(); + impl_->ready_queue_.push(impl_->waiting_queue_); + bool more_handlers = impl_->locked_ = !impl_->ready_queue_.empty(); + impl_->mutex_.unlock(); + + if (more_handlers) + owner_->post_immediate_completion(impl_, true); + } +}; + +strand_service::strand_service(asio::io_context& io_context) + : asio::detail::service_base<strand_service>(io_context), + io_context_(asio::use_service<io_context_impl>(io_context)), + mutex_(), + salt_(0) +{ +} + +void strand_service::shutdown() +{ + op_queue<operation> ops; + + asio::detail::mutex::scoped_lock lock(mutex_); + + for (std::size_t i = 0; i < num_implementations; ++i) + { + if (strand_impl* impl = implementations_[i].get()) + { + ops.push(impl->waiting_queue_); + ops.push(impl->ready_queue_); + } + } +} + +void strand_service::construct(strand_service::implementation_type& impl) +{ + asio::detail::mutex::scoped_lock lock(mutex_); + + std::size_t salt = salt_++; +#if defined(ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) + std::size_t index = salt; +#else // defined(ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) + std::size_t index = reinterpret_cast<std::size_t>(&impl); + index += (reinterpret_cast<std::size_t>(&impl) >> 3); + index ^= salt + 0x9e3779b9 + (index << 6) + (index >> 2); +#endif // defined(ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION) + index = index % num_implementations; + + if (!implementations_[index].get()) + implementations_[index].reset(new strand_impl); + impl = implementations_[index].get(); +} + +bool strand_service::running_in_this_thread( + const implementation_type& impl) const +{ + return call_stack<strand_impl>::contains(impl) != 0; +} + +bool strand_service::do_dispatch(implementation_type& impl, operation* op) +{ + // If we are running inside the io_context, and no other handler already + // holds the strand lock, then the handler can run immediately. + bool can_dispatch = io_context_.can_dispatch(); + impl->mutex_.lock(); + if (can_dispatch && !impl->locked_) + { + // Immediate invocation is allowed. + impl->locked_ = true; + impl->mutex_.unlock(); + return true; + } + + if (impl->locked_) + { + // Some other handler already holds the strand lock. Enqueue for later. + impl->waiting_queue_.push(op); + impl->mutex_.unlock(); + } + else + { + // The handler is acquiring the strand lock and so is responsible for + // scheduling the strand. + impl->locked_ = true; + impl->mutex_.unlock(); + impl->ready_queue_.push(op); + io_context_.post_immediate_completion(impl, false); + } + + return false; +} + +void strand_service::do_post(implementation_type& impl, + operation* op, bool is_continuation) +{ + impl->mutex_.lock(); + if (impl->locked_) + { + // Some other handler already holds the strand lock. Enqueue for later. + impl->waiting_queue_.push(op); + impl->mutex_.unlock(); + } + else + { + // The handler is acquiring the strand lock and so is responsible for + // scheduling the strand. + impl->locked_ = true; + impl->mutex_.unlock(); + impl->ready_queue_.push(op); + io_context_.post_immediate_completion(impl, is_continuation); + } +} + +void strand_service::do_complete(void* owner, operation* base, + const asio::error_code& ec, std::size_t /*bytes_transferred*/) +{ + if (owner) + { + strand_impl* impl = static_cast<strand_impl*>(base); + + // Indicate that this strand is executing on the current thread. + call_stack<strand_impl>::context ctx(impl); + + // Ensure the next handler, if any, is scheduled on block exit. + on_do_complete_exit on_exit; + on_exit.owner_ = static_cast<io_context_impl*>(owner); + on_exit.impl_ = impl; + + // Run all ready handlers. No lock is required since the ready queue is + // accessed only within the strand. + while (operation* o = impl->ready_queue_.front()) + { + impl->ready_queue_.pop(); + o->complete(owner, ec, 0); + } + } +} + +} // namespace detail +} // namespace asio + +#include "asio/detail/pop_options.hpp" + +#endif // ASIO_DETAIL_IMPL_STRAND_SERVICE_IPP |