diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp | 110 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/transport/offload_io_service_client.hpp | 158 | ||||
| -rw-r--r-- | host/lib/include/uhdlib/utils/semaphore.hpp | 71 | ||||
| -rw-r--r-- | host/lib/transport/offload_io_service.cpp | 309 | 
4 files changed, 351 insertions, 297 deletions
diff --git a/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp new file mode 100644 index 000000000..f0dd853a4 --- /dev/null +++ b/host/lib/include/uhdlib/transport/frame_reservation_mgr.hpp @@ -0,0 +1,110 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP +#define INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhdlib/transport/link_if.hpp> +#include <unordered_map> + +namespace uhd { namespace transport { + +/*! + * Helper class to keep track of the number of frames reserved from a pair of links + */ +class frame_reservation_mgr +{ +public: +    struct frame_reservation_t +    { +        recv_link_if::sptr recv_link; +        size_t num_recv_frames = 0; +        send_link_if::sptr send_link; +        size_t num_send_frames = 0; +    }; + +    void register_link(const recv_link_if::sptr& recv_link) +    { +        if (_recv_tbl[recv_link.get()] != 0) { +            throw uhd::runtime_error("Recv link already attached to I/O service"); +        } +        _recv_tbl[recv_link.get()] = 0; +    } + +    void register_link(const send_link_if::sptr& send_link) +    { +        if (_send_tbl[send_link.get()] != 0) { +            throw uhd::runtime_error("Send link already attached to I/O service"); +        } +        _send_tbl[send_link.get()] = 0; +    } + +    void unregister_link(const recv_link_if::sptr& recv_link) +    { +        auto link_ptr = recv_link.get(); +        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0); +        _recv_tbl.erase(link_ptr); +    } + +    void unregister_link(const send_link_if::sptr& send_link) +    { +        auto link_ptr = send_link.get(); +        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0); +        _send_tbl.erase(link_ptr); +    } + +    void reserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            const size_t capacity    = reservation.recv_link->get_num_recv_frames(); +            if (rsvd_frames + reservation.num_recv_frames > capacity) { +                throw uhd::runtime_error( +                    "Number of frames requested exceeds link recv frame capacity"); +            } + +            recv_link_if* link_ptr = reservation.recv_link.get(); +            _recv_tbl[link_ptr]    = rsvd_frames + reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            const size_t capacity    = reservation.send_link->get_num_send_frames(); +            if (rsvd_frames + reservation.num_send_frames > capacity) { +                throw uhd::runtime_error( +                    "Number of frames requested exceeds link send frame capacity"); +            } + +            send_link_if* link_ptr = reservation.send_link.get(); +            _send_tbl[link_ptr]    = rsvd_frames + reservation.num_send_frames; +        } +    } + +    void unreserve_frames(const frame_reservation_t& reservation) +    { +        if (reservation.recv_link) { +            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); +            recv_link_if* link_ptr   = reservation.recv_link.get(); +            _recv_tbl[link_ptr]      = rsvd_frames - reservation.num_recv_frames; +        } + +        if (reservation.send_link) { +            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); +            send_link_if* link_ptr   = reservation.send_link.get(); +            _send_tbl[link_ptr]      = rsvd_frames - reservation.num_send_frames; +        } +    } + +private: +    std::unordered_map<recv_link_if*, size_t> _recv_tbl; +    std::unordered_map<send_link_if*, size_t> _send_tbl; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_FRAME_RESERVATION_MGR_HPP */ diff --git a/host/lib/include/uhdlib/transport/offload_io_service_client.hpp b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp new file mode 100644 index 000000000..620e796ef --- /dev/null +++ b/host/lib/include/uhdlib/transport/offload_io_service_client.hpp @@ -0,0 +1,158 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP +#define INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP + +#include <uhd/transport/frame_buff.hpp> +#include <chrono> +#include <thread> + +namespace uhd { namespace transport { + +namespace detail { + +// Implementation of get_send_buff used below by send and recv clients +template <typename pop_func_t> +static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) +{ +    using namespace std::chrono; + +    if (timeout_ms == 0) { +        return frame_buff::uptr(pop()); +    } + +    const auto end_time = steady_clock::now() + milliseconds(timeout_ms); + +    bool last_check = false; + +    while (true) { +        if (frame_buff* buff = pop()) { +            return frame_buff::uptr(buff); +        } + +        if (timeout_ms > 0 && steady_clock::now() > end_time) { +            if (last_check) { +                return nullptr; +            } else { +                last_check = true; +            } +        } +        std::this_thread::yield(); +    } +} + +} // namespace detail + +/*! + * Recv I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_recv_io : public recv_io_if +{ +public: +    offload_recv_io(typename io_service_t::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        typename io_service_t::client_port_t::sptr& port) +        : _io_srv(io_srv), _port(port) +    { +        _num_recv_frames = num_recv_frames; +        _num_send_frames = num_send_frames; +    } + +    ~offload_recv_io() +    { +        assert(_num_frames_in_use == 0); + +        if (_io_srv) { +            _port->client_disconnect(); +        } +    } + +    frame_buff::uptr get_recv_buff(int32_t timeout_ms) +    { +        return detail::client_get_buff( +            [this]() { +                frame_buff* buff = _port->client_pop(); +                _num_frames_in_use += buff ? 1 : 0; +                return buff; +            }, +            timeout_ms); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        assert(buff); +        _port->client_push(buff.release()); +        _num_frames_in_use--; +    } + +private: +    offload_recv_io()                       = delete; +    offload_recv_io(const offload_recv_io&) = delete; + +    typename io_service_t::sptr _io_srv; +    typename io_service_t::client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +/*! + * Send I/O client for offload I/O service + */ +template <typename io_service_t> +class offload_send_io : public send_io_if +{ +public: +    offload_send_io(typename io_service_t::sptr io_srv, +        size_t num_recv_frames, +        size_t num_send_frames, +        typename io_service_t::client_port_t::sptr& port) +        : _io_srv(io_srv), _port(port) +    { +        _num_recv_frames = num_recv_frames; +        _num_send_frames = num_send_frames; +    } + +    ~offload_send_io() +    { +        assert(_num_frames_in_use == 0); + +        if (_io_srv) { +            _port->client_disconnect(); +        } +    } + +    frame_buff::uptr get_send_buff(int32_t timeout_ms) +    { +        return detail::client_get_buff( +            [this]() { +                frame_buff* buff = _port->client_pop(); +                _num_frames_in_use += buff ? 1 : 0; +                return buff; +            }, +            timeout_ms); +    } + +    void release_send_buff(frame_buff::uptr buff) +    { +        assert(buff); +        _port->client_push(buff.release()); +        _num_frames_in_use--; +    } + +private: +    offload_send_io()                       = delete; +    offload_send_io(const offload_send_io&) = delete; + +    typename io_service_t::sptr _io_srv; +    typename io_service_t::client_port_t::sptr _port; +    size_t _num_frames_in_use = 0; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_OFFLOAD_IO_SERVICE_CLIENT_HPP */ diff --git a/host/lib/include/uhdlib/utils/semaphore.hpp b/host/lib/include/uhdlib/utils/semaphore.hpp new file mode 100644 index 000000000..ae77ed102 --- /dev/null +++ b/host/lib/include/uhdlib/utils/semaphore.hpp @@ -0,0 +1,71 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include <condition_variable> +#include <chrono> +#include <mutex> + +#ifndef INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP +#define INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP + +namespace uhd { + +/*! + * A sempahore built using std::condition_variable + */ +class semaphore +{ +public: +    void notify() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _count++; +        _cv.notify_one(); +    } + +    void wait() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        _cv.wait(lock, [this]() { return this->_count != 0; }); +        _count--; +    } + +    bool try_wait() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_count != 0) { +            _count--; +            return true; +        } +        return false; +    } + +    bool wait_for(size_t timeout_ms) +    { +        std::chrono::milliseconds timeout(timeout_ms); +        std::unique_lock<std::mutex> lock(_cv_mutex); +        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { +            _count--; +            return true; +        } +        return false; +    } + +    size_t count() +    { +        std::unique_lock<std::mutex> lock(_cv_mutex); +        return _count; +    } + +private: +    std::condition_variable _cv; +    std::mutex _cv_mutex; +    size_t _count = 0; +}; + +} // namespace uhd + +#endif /* INCLUDED_UHDLIB_UTILS_SEMAPHORE_HPP */ diff --git a/host/lib/transport/offload_io_service.cpp b/host/lib/transport/offload_io_service.cpp index 012c86868..c9b9af344 100644 --- a/host/lib/transport/offload_io_service.cpp +++ b/host/lib/transport/offload_io_service.cpp @@ -6,11 +6,12 @@  #include <uhd/config.hpp>  #include <uhd/exception.hpp> -#include <uhd/utils/log.hpp>  #include <uhd/utils/thread.hpp> +#include <uhdlib/transport/frame_reservation_mgr.hpp>  #include <uhdlib/transport/offload_io_service.hpp> +#include <uhdlib/transport/offload_io_service_client.hpp> +#include <uhdlib/utils/semaphore.hpp>  #include <condition_variable> -#include <unordered_map>  #include <boost/lockfree/queue.hpp>  #include <atomic>  #include <chrono> @@ -25,141 +26,6 @@ namespace {  constexpr int32_t blocking_timeout_ms = 100; -// Struct to help keep track of frames reserved for each link -struct frame_reservation_t -{ -    recv_link_if::sptr recv_link; -    size_t num_recv_frames = 0; -    send_link_if::sptr send_link; -    size_t num_send_frames = 0; -}; - -// Helper class to keep track of frames reserved for each link -class frame_reservation_mgr -{ -public: -    void register_link(const recv_link_if::sptr& recv_link) -    { -        if (_recv_tbl[recv_link.get()] != 0) { -            throw uhd::runtime_error("Recv link already attached to I/O service"); -        } -        _recv_tbl[recv_link.get()] = 0; -    } - -    void register_link(const send_link_if::sptr& send_link) -    { -        if (_send_tbl[send_link.get()] != 0) { -            throw uhd::runtime_error("Send link already attached to I/O service"); -        } -        _send_tbl[send_link.get()] = 0; -    } - -    void unregister_link(const recv_link_if::sptr& recv_link) -    { -        auto link_ptr = recv_link.get(); -        UHD_ASSERT_THROW(_recv_tbl.count(link_ptr) != 0); -        _recv_tbl.erase(link_ptr); -    } - -    void unregister_link(const send_link_if::sptr& send_link) -    { -        auto link_ptr = send_link.get(); -        UHD_ASSERT_THROW(_send_tbl.count(link_ptr) != 0); -        _send_tbl.erase(link_ptr); -    } - -    void reserve_frames(const frame_reservation_t& reservation) -    { -        if (reservation.recv_link) { -            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); -            const size_t capacity    = reservation.recv_link->get_num_recv_frames(); -            if (rsvd_frames + reservation.num_recv_frames > capacity) { -                throw uhd::runtime_error("Number of frames requested exceeds link recv frame capacity"); -            } - -            recv_link_if* link_ptr = reservation.recv_link.get(); -            _recv_tbl[link_ptr]    = rsvd_frames + reservation.num_recv_frames; -        } - -        if (reservation.send_link) { -            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); -            const size_t capacity    = reservation.send_link->get_num_send_frames(); -            if (rsvd_frames + reservation.num_send_frames > capacity) { -                throw uhd::runtime_error("Number of frames requested exceeds link send frame capacity"); -            } - -            send_link_if* link_ptr = reservation.send_link.get(); -            _send_tbl[link_ptr]    = rsvd_frames + reservation.num_send_frames; -        } -    } - -    void unreserve_frames(const frame_reservation_t& reservation) -    { -        if (reservation.recv_link) { -            const size_t rsvd_frames = _recv_tbl.at(reservation.recv_link.get()); -            recv_link_if* link_ptr   = reservation.recv_link.get(); -            _recv_tbl[link_ptr]      = rsvd_frames - reservation.num_recv_frames; -        } - -        if (reservation.send_link) { -            const size_t rsvd_frames = _send_tbl.at(reservation.send_link.get()); -            send_link_if* link_ptr   = reservation.send_link.get(); -            _send_tbl[link_ptr]      = rsvd_frames - reservation.num_send_frames; -        } -    } - -private: -    std::unordered_map<recv_link_if*, size_t> _recv_tbl; -    std::unordered_map<send_link_if*, size_t> _send_tbl; -}; - - -// Semaphore used in blocking I/O for offload thread -class semaphore -{ -public: -    void notify() { -        std::unique_lock<std::mutex> lock(_cv_mutex); -        _count++; -        _cv.notify_one(); -    } - -    void wait() { -        std::unique_lock<std::mutex> lock(_cv_mutex); -        _cv.wait(lock, [this]() { return this->_count != 0; }); -        _count--; -    } - -    bool try_wait() { -        std::unique_lock<std::mutex> lock(_cv_mutex); -        if (_count != 0) { -            _count--; -            return true; -        } -        return false; -    } - -    bool wait_for(size_t timeout_ms) { -        std::chrono::milliseconds timeout(timeout_ms); -        std::unique_lock<std::mutex> lock(_cv_mutex); -        if (_cv.wait_for(lock, timeout, [this]() { return this->_count != 0; })) { -            _count--; -            return true; -        } -        return false; -    } - -    size_t count() { -        std::unique_lock<std::mutex> lock(_cv_mutex); -        return _count; -    } - -private: -    std::condition_variable _cv; -    std::mutex _cv_mutex; -    size_t _count = 0; -}; -  // Fixed-size queue that supports blocking semantics  template <typename queue_item_t>  class offload_thread_queue { @@ -221,12 +87,12 @@ private:  };  // Object that implements the communication between client and offload thread -struct client_port_t +struct client_port_impl_t  {  public: -    using sptr = std::shared_ptr<client_port_t>; +    using sptr = std::shared_ptr<client_port_impl_t>; -    client_port_t(size_t size) +    client_port_impl_t(size_t size)          : _from_offload_thread(size)          , _to_offload_thread(size + 1) // add one for disconnect command      { @@ -363,7 +229,8 @@ class offload_io_service_impl        public std::enable_shared_from_this<offload_io_service_impl>  {  public: -    using sptr = std::shared_ptr<offload_io_service_impl>; +    using sptr          = std::shared_ptr<offload_io_service_impl>; +    using client_port_t = client_port_impl_t;      offload_io_service_impl(          io_service::sptr io_srv, const offload_io_service::params_t& params); @@ -392,6 +259,8 @@ public:  private:      offload_io_service_impl(const offload_io_service_impl&) = delete; +    using frame_reservation_t = frame_reservation_mgr::frame_reservation_t; +      // Queue for new client creation, multiple producers allowed. Requests are      // passed as heap-allocated pointers because boost lockfree queues require      // simple types. @@ -453,160 +322,6 @@ private:      frame_reservation_mgr _reservation_mgr;  }; -class offload_recv_io : public recv_io_if -{ -public: -    offload_recv_io(offload_io_service_impl::sptr io_srv, -        size_t num_recv_frames, -        size_t num_send_frames, -        client_port_t::sptr& port); - -    ~offload_recv_io(); - -    frame_buff::uptr get_recv_buff(int32_t timeout_ms); -    void release_recv_buff(frame_buff::uptr buff); - -private: -    offload_recv_io()                       = delete; -    offload_recv_io(const offload_recv_io&) = delete; - -    offload_io_service_impl::sptr _io_srv; -    client_port_t::sptr _port; -    size_t _num_frames_in_use = 0; -}; - -class offload_send_io : public send_io_if -{ -public: -    offload_send_io(offload_io_service_impl::sptr io_srv, -        size_t num_recv_frames, -        size_t num_send_frames, -        client_port_t::sptr& port); - -    ~offload_send_io(); - -    frame_buff::uptr get_send_buff(int32_t timeout_ms); -    void release_send_buff(frame_buff::uptr buff); - -private: -    offload_send_io()                       = delete; -    offload_send_io(const offload_send_io&) = delete; - -    offload_io_service_impl::sptr _io_srv; -    client_port_t::sptr _port; -    size_t _num_frames_in_use = 0; -}; - -// Implementation of get_send_buff used below by send and recv clients -template <typename pop_func_t> -static frame_buff::uptr client_get_buff(pop_func_t pop, const int32_t timeout_ms) -{ -    using namespace std::chrono; - -    if (timeout_ms == 0) { -        return frame_buff::uptr(pop()); -    } - -    const auto end_time = steady_clock::now() + milliseconds(timeout_ms); - -    bool last_check = false; - -    while (true) { -        if (frame_buff* buff = pop()) { -            return frame_buff::uptr(buff); -        } - -        if (timeout_ms > 0 && steady_clock::now() > end_time) { -            if (last_check) { -                return nullptr; -            } else { -                last_check = true; -            } -        } -        std::this_thread::yield(); -    } -} - -// -// offload_recv_io methods -// -offload_recv_io::offload_recv_io(offload_io_service_impl::sptr io_srv, -    size_t num_recv_frames, -    size_t num_send_frames, -    client_port_t::sptr& port) -    : _io_srv(io_srv), _port(port) -{ -    _num_recv_frames = num_recv_frames; -    _num_send_frames = num_send_frames; -} - -offload_recv_io::~offload_recv_io() -{ -    assert(_num_frames_in_use == 0); - -    if (_io_srv) { -        _port->client_disconnect(); -    } -} - -frame_buff::uptr offload_recv_io::get_recv_buff(int32_t timeout_ms) -{ -    return client_get_buff( -        [this]() { -            frame_buff* buff = _port->client_pop(); -            _num_frames_in_use += buff ? 1 : 0; -            return buff; -        }, -        timeout_ms); -} - -void offload_recv_io::release_recv_buff(frame_buff::uptr buff) -{ -    assert(buff); -    _port->client_push(buff.release()); -    _num_frames_in_use--; -} - -// -// offload_send_io methods -// -offload_send_io::offload_send_io(offload_io_service_impl::sptr io_srv, -    size_t num_recv_frames, -    size_t num_send_frames, -    client_port_t::sptr& port) -    : _io_srv(io_srv), _port(port) -{ -    _num_recv_frames = num_recv_frames; -    _num_send_frames = num_send_frames; -} - -offload_send_io::~offload_send_io() -{ -    assert(_num_frames_in_use == 0); - -    if (_io_srv) { -        _port->client_disconnect(); -    } -} - -frame_buff::uptr offload_send_io::get_send_buff(int32_t timeout_ms) -{ -    return client_get_buff( -        [this]() { -            frame_buff* buff = _port->client_pop(); -            _num_frames_in_use += buff ? 1 : 0; -            return buff; -        }, -        timeout_ms); -} - -void offload_send_io::release_send_buff(frame_buff::uptr buff) -{ -    assert(buff); -    _port->client_push(buff.release()); -    _num_frames_in_use--; -} -  //  // offload_io_service methods  // @@ -759,7 +474,7 @@ recv_io_if::sptr offload_io_service_impl::make_recv_client(recv_link_if::sptr re      port->client_wait_until_connected();      // Return a new recv client to the caller that just operates on the queues -    return std::make_shared<offload_recv_io>( +    return std::make_shared<offload_recv_io<offload_io_service_impl>>(          shared_from_this(), num_recv_frames, num_send_frames, port);  } @@ -813,7 +528,7 @@ send_io_if::sptr offload_io_service_impl::make_send_client(send_link_if::sptr se      }      // Return a new recv client to the caller that just operates on the queues -    return std::make_shared<offload_send_io>( +    return std::make_shared<offload_send_io<offload_io_service_impl>>(          shared_from_this(), num_recv_frames, num_send_frames, port);  }  | 
