From ebef8c34191ce3f26b62776f35f7da32a34cc660 Mon Sep 17 00:00:00 2001 From: Alex Williams Date: Tue, 20 Aug 2019 15:48:24 -0700 Subject: transport: Port liberio to link_if --- host/lib/include/uhdlib/transport/liberio_link.hpp | 180 +++++++++++++ host/lib/transport/CMakeLists.txt | 2 +- host/lib/transport/liberio_link.cpp | 152 +++++++++++ host/lib/transport/liberio_zero_copy.cpp | 279 --------------------- host/lib/transport/liberio_zero_copy.hpp | 35 --- 5 files changed, 333 insertions(+), 315 deletions(-) create mode 100644 host/lib/include/uhdlib/transport/liberio_link.hpp create mode 100644 host/lib/transport/liberio_link.cpp delete mode 100644 host/lib/transport/liberio_zero_copy.cpp delete mode 100644 host/lib/transport/liberio_zero_copy.hpp (limited to 'host/lib') diff --git a/host/lib/include/uhdlib/transport/liberio_link.hpp b/host/lib/include/uhdlib/transport/liberio_link.hpp new file mode 100644 index 000000000..8ab90976a --- /dev/null +++ b/host/lib/include/uhdlib/transport/liberio_link.hpp @@ -0,0 +1,180 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#ifndef INCLUDED_UHDLIB_TRANSPORT_LIBERIO_LINK_HPP +#define INCLUDED_UHDLIB_TRANSPORT_LIBERIO_LINK_HPP + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +class liberio_frame_buff : public frame_buff +{ +public: + liberio_frame_buff(struct liberio_chan* chan) : _chan(chan) + { + assert(_chan); + // Acquire channel reference + liberio_chan_get(_chan); + } + + ~liberio_frame_buff() + { + if (_buff) { + // Set payload size to signify "empty" + liberio_buf_set_payload(_buff, 0, 0); + + // Release buffer back to liberio + liberio_chan_buf_enqueue(_chan, _buff); + _buff = nullptr; + _data = nullptr; + } + // Release channel reference + liberio_chan_put(_chan); + } + + liberio_frame_buff(const liberio_frame_buff& src) + : liberio_frame_buff(src._chan) {} + + UHD_FORCE_INLINE size_t get(int32_t timeout_ms) + { + // Dequeue timeout in microseconds + _buff = liberio_chan_buf_dequeue(_chan, timeout_ms * 1000); + if (!_buff) { + return 0; + } + _packet_size = liberio_buf_get_len(_buff, 0); + _data = liberio_buf_get_mem(_buff, 0); + return _packet_size; + } + + UHD_FORCE_INLINE void release() + { + assert(_buff); + liberio_buf_set_payload(_buff, 0, _packet_size); + liberio_chan_buf_enqueue(_chan, _buff); + _buff = nullptr; + _data = nullptr; + } +private: + struct liberio_buf* _buff = nullptr; + struct liberio_chan* _chan; +}; + +class liberio_adapter_info : public adapter_info +{ +public: + liberio_adapter_info() = default; + ~liberio_adapter_info() = default; + + std::string to_string() + { + // Currently, there is only ever one liberio adapter + // If that changes, fix this! + return std::string("Liberio"); + } + + bool operator==(const liberio_adapter_info& /*rhs*/) const + { + // Currently, there is only ever one liberio adapter + // If that changes, fix this! + return true; + } +}; + +/*! + * A zero copy transport interface to the liberio DMA library. + */ +class liberio_link : public recv_link_base, + public send_link_base +{ +public: + using sptr = std::shared_ptr; + + liberio_link( + const std::string& tx_path, const std::string& rx_path, const link_params_t& params); + + ~liberio_link(); + + /*! + * Make a new liberio link. + * + * \param tx_path a path string to the TX DMA device + * \param rx_path a path string to the RX DMA device + * \param params Values for frame sizes, num frames, and buffer sizes + * \return a shared_ptr to a new liberio link + */ + static sptr make(const std::string& tx_path, + const std::string& rx_path, + const link_params_t& params); + + /*! + * Get the physical adapter ID used for this link + */ + adapter_id_t get_send_adapter_id() const + { + return _adapter_id; + } + + /*! + * Get the physical adapter ID used for this link + */ + adapter_id_t get_recv_adapter_id() const + { + return _adapter_id; + } + +private: + using recv_link_base_t = recv_link_base; + using send_link_base_t = send_link_base; + + // Friend declarations to allow base classes to call private methods + friend recv_link_base_t; + friend send_link_base_t; + + // Methods called by recv_link_base + size_t get_recv_buff_derived(frame_buff& buff, int32_t timeout_ms) + { + auto& buffer = static_cast(buff); + return buffer.get(timeout_ms); + } + + void release_recv_buff_derived(frame_buff& buff) + { + auto& buffer = static_cast(buff); + buffer.release(); + } + + bool get_send_buff_derived(frame_buff& buff, int32_t timeout_ms) + { + auto& buffer = static_cast(buff); + return buffer.get(timeout_ms); + } + + void release_send_buff_derived(frame_buff& buff) + { + auto& buffer = static_cast(buff); + buffer.release(); + } + + struct liberio_chan* _tx_chan; + struct liberio_chan* _rx_chan; + std::vector _recv_buffs; + std::vector _send_buffs; + adapter_id_t _adapter_id; +}; + +}} // namespace uhd::transport + +#endif /* INCLUDED_UHDLIB_TRANSPORT_LIBERIO_LINK_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 50b95155a..e899cfba0 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -138,7 +138,7 @@ if(ENABLE_LIBERIO) include_directories(${LIBERIO_INCLUDE_DIRS}) LIBUHD_APPEND_LIBS(${LIBERIO_LIBRARIES}) LIBUHD_APPEND_SOURCES( - ${CMAKE_CURRENT_SOURCE_DIR}/liberio_zero_copy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/liberio_link.cpp ) endif(ENABLE_LIBERIO) diff --git a/host/lib/transport/liberio_link.cpp b/host/lib/transport/liberio_link.cpp new file mode 100644 index 000000000..acc63ee37 --- /dev/null +++ b/host/lib/transport/liberio_link.cpp @@ -0,0 +1,152 @@ +// +// Copyright 2019 Ettus Research, a National Instruments brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include +#include +#include +#include +#include +#include +#include + +namespace uhd { namespace transport { + +using liberio_ctx_sptr = std::shared_ptr; + +static const uint64_t USEC = 1000000; + +static void liberio_log_cb(int severity, const char* msg, void*) +{ + switch (severity) { + case LOG_WARNING: + UHD_LOG_WARNING("LIBERIO", msg); + return; + case LOG_NOTICE: + case LOG_INFO: + UHD_LOG_INFO("LIBERIO", msg); + return; + default: + UHD_LOG_INFO("LIBERIO", msg); + }; +} + +class liberio_context_holder +{ +public: + UHD_SINGLETON_FCN(liberio_context_holder, get); + + inline struct liberio_chan* alloc_tx_chan(const std::string& path) + { + return liberio_ctx_alloc_chan(_ctx, path.c_str(), TX, USRP_MEMORY_MMAP); + } + + inline struct liberio_chan* alloc_rx_chan(const std::string& path) + { + return liberio_ctx_alloc_chan(_ctx, path.c_str(), RX, USRP_MEMORY_MMAP); + } + +private: + liberio_context_holder(void) + { + _ctx = liberio_ctx_new(); + liberio_ctx_register_logger(_ctx, &liberio_log_cb, nullptr); + } + + ~liberio_context_holder(void) + { + liberio_ctx_put(_ctx); + } + + liberio_ctx* _ctx; +}; + +liberio_link::liberio_link( + const std::string& tx_path, const std::string& rx_path, const link_params_t& params) + : recv_link_base_t(params.num_recv_frames, params.recv_frame_size) + , send_link_base_t(params.num_send_frames, params.send_frame_size) +{ + auto& ctx_holder = liberio_context_holder::get(); + + /* Allocate TX channel (begins with refcount of 1) + */ + _tx_chan = ctx_holder.alloc_tx_chan(tx_path); + UHD_ASSERT_THROW(_tx_chan); + + /* Reset channel */ + liberio_chan_stop_streaming(_tx_chan); + liberio_chan_request_buffers(_tx_chan, 0); + UHD_ASSERT_THROW(!liberio_chan_set_fixed_size(_tx_chan, 0, params.send_frame_size)); + UHD_ASSERT_THROW(!liberio_chan_request_buffers(_tx_chan, params.num_send_frames)); + + /* TODO: Check params in factory and adjust them there instead of throwing exception + * here? */ + UHD_ASSERT_THROW(params.num_send_frames == liberio_chan_get_num_bufs(_tx_chan)); + for (size_t i = 0; i < params.num_send_frames; i++) { + _send_buffs.push_back(liberio_frame_buff(_tx_chan)); + } + + /* Allocate RX channel (begins with refcount of 1) + */ + _rx_chan = ctx_holder.alloc_rx_chan(rx_path); + UHD_ASSERT_THROW(_rx_chan); + + /* stop the channel, free the buffers, set the size, allocate */ + liberio_chan_stop_streaming(_rx_chan); + liberio_chan_request_buffers(_rx_chan, 0); + UHD_ASSERT_THROW(!liberio_chan_set_fixed_size(_rx_chan, 0, params.recv_frame_size)); + UHD_ASSERT_THROW(!liberio_chan_request_buffers(_rx_chan, params.num_recv_frames)); + /* TODO: Check params in factory and adjust them there instead of throwing exception + * here? */ + UHD_ASSERT_THROW(params.num_recv_frames == liberio_chan_get_num_bufs(_rx_chan)); + + for (size_t i = 0; i < params.num_recv_frames; i++) { + _recv_buffs.push_back(liberio_frame_buff(_rx_chan)); + } + UHD_ASSERT_THROW(!liberio_chan_enqueue_all(_rx_chan)); + + /* Start streaming on the devices */ + liberio_chan_start_streaming(_rx_chan); + liberio_chan_start_streaming(_tx_chan); + + /* Finally, preload the buffer pools in parent class */ + for (auto& buff : _send_buffs) { + send_link_base_t::preload_free_buff(&buff); + } + for (auto& buff : _recv_buffs) { + recv_link_base_t::preload_free_buff(&buff); + } + + auto info = liberio_adapter_info(); + auto& adap_ctx = adapter_ctx::get(); + _adapter_id = adap_ctx.register_adapter(info); + + UHD_LOGGER_TRACE("LIBERIO") + << boost::format("Created liberio link (tx:%s, rx:%s)") % tx_path % rx_path; + UHD_LOGGER_TRACE("LIBERIO") + << boost::format("num_recv_frames=%d, recv_frame_size=%d, num_send_frames=%d, " + "send_frame_size=%d") + % params.num_recv_frames % params.recv_frame_size % params.num_send_frames + % params.send_frame_size; +} + +liberio_link::~liberio_link() +{ + liberio_chan_put(_tx_chan); + liberio_chan_put(_rx_chan); +} + +liberio_link::sptr liberio_link::make( + const std::string& tx_path, const std::string& rx_path, const link_params_t& params) +{ + UHD_ASSERT_THROW(params.recv_frame_size > 0); + UHD_ASSERT_THROW(params.send_frame_size > 0); + UHD_ASSERT_THROW(params.num_send_frames > 0); + UHD_ASSERT_THROW(params.num_recv_frames > 0); + + return std::make_shared(tx_path, rx_path, params); +} + +}} // namespace uhd::transport diff --git a/host/lib/transport/liberio_zero_copy.cpp b/host/lib/transport/liberio_zero_copy.cpp deleted file mode 100644 index 15131910a..000000000 --- a/host/lib/transport/liberio_zero_copy.cpp +++ /dev/null @@ -1,279 +0,0 @@ -// -// Copyright 2017 Ettus Research -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#include "liberio_zero_copy.hpp" -#include -#include -#include -#include -#include -#include -#include - -namespace uhd { namespace transport { - -static const uint64_t USEC = 1000000; - -static void liberio_log_cb(int severity, const char* msg, void*) -{ - switch (severity) { - case LOG_WARNING: - UHD_LOG_WARNING("LIBERIO", msg); - return; - case LOG_NOTICE: - case LOG_INFO: - UHD_LOG_INFO("LIBERIO", msg); - return; - default: - UHD_LOG_INFO("LIBERIO", msg); - }; -} - -class liberio_context_holder -{ -public: - liberio_context_holder(void) - { - _ctx = liberio_ctx_new(); - liberio_ctx_register_logger(_ctx, &liberio_log_cb, nullptr); - } - - ~liberio_context_holder(void) - { - liberio_ctx_put(_ctx); - } - - liberio_ctx* get(void) - { - liberio_ctx_get(_ctx); - return _ctx; - } - -private: - liberio_ctx* _ctx; -}; - -class liberio_zero_copy_msb : public virtual managed_send_buffer -{ -public: - liberio_zero_copy_msb(liberio_chan* chan) : _chan(chan), _buf(nullptr) {} - ~liberio_zero_copy_msb(void) - { - liberio_chan_put(_chan); - } - - void release(void) - { - if (_buf) { - liberio_buf_set_payload(_buf, 0, _length); - liberio_chan_buf_enqueue(_chan, _buf); - } - } - - sptr get_new(double timeout, size_t& index) - { - _buf = liberio_chan_buf_dequeue(_chan, timeout * USEC); - if (!_buf) - return sptr(); - - index++; - - return make(this, liberio_buf_get_mem(_buf, 0), liberio_buf_get_len(_buf, 0)); - } - -private: - liberio_chan* _chan; - liberio_buf* _buf; -}; - -class liberio_zero_copy_mrb : public virtual managed_recv_buffer -{ -public: - liberio_zero_copy_mrb(liberio_chan* chan) : _chan(chan), _buf(nullptr) {} - ~liberio_zero_copy_mrb(void) - { - liberio_chan_put(_chan); - } - - void release(void) - { - if (_buf) - liberio_chan_buf_enqueue(_chan, _buf); - } - - sptr get_new(double timeout, size_t& index) - { - _buf = liberio_chan_buf_dequeue(_chan, timeout * USEC); - if (!_buf) - return sptr(); - - index++; - - return make(this, liberio_buf_get_mem(_buf, 0), liberio_buf_get_payload(_buf, 0)); - } - -private: - liberio_chan* _chan; - liberio_buf* _buf; -}; - -class liberio_zero_copy_impl : public liberio_zero_copy -{ -public: - liberio_zero_copy_impl(const std::string& tx_path, - const std::string& rx_path, - const zero_copy_xport_params& xport_params) - : _tx_buf_size(xport_params.send_frame_size) - , _rx_buf_size(xport_params.recv_frame_size) - , _next_recv_buff_index(0) - , _next_send_buff_index(0) - { - UHD_ASSERT_THROW(xport_params.recv_frame_size > 0); - UHD_ASSERT_THROW(xport_params.send_frame_size > 0); - UHD_ASSERT_THROW(xport_params.num_send_frames > 0); - UHD_ASSERT_THROW(xport_params.num_recv_frames > 0); - - { - std::lock_guard _l(_context_lock); - if (!_ref_count) { - _context_holder = std::make_shared(); - } - - _ref_count++; - } - liberio_ctx* ctx = _context_holder->get(); - - /* we hold a reference, that we'd drop immediately after, - * so no requirement to get another one here ... - */ - _tx_chan = liberio_ctx_alloc_chan(ctx, tx_path.c_str(), TX, USRP_MEMORY_MMAP); - - UHD_ASSERT_THROW(_tx_chan); - liberio_chan_stop_streaming(_tx_chan); - liberio_chan_request_buffers(_tx_chan, 0); - UHD_ASSERT_THROW( - !liberio_chan_set_fixed_size(_tx_chan, 0, xport_params.send_frame_size)); - UHD_ASSERT_THROW( - !liberio_chan_request_buffers(_tx_chan, xport_params.num_send_frames)); - _num_send_bufs = liberio_chan_get_num_bufs(_tx_chan); - - for (size_t i = 0; i < xport_params.num_send_frames; i++) { - liberio_chan_get(_tx_chan); - _msb_pool.push_back(boost::make_shared(_tx_chan)); - } - - /* we hold a reference, that we'd drop immediately after, - * so no requirement to get another one here ... - */ - _rx_chan = liberio_ctx_alloc_chan(ctx, rx_path.c_str(), RX, USRP_MEMORY_MMAP); - UHD_ASSERT_THROW(_rx_chan); - - /* done with the local reference, the channel keeps its own */ - liberio_ctx_put(ctx); - - /* stop the channel, free the buffers, set the size, allocate */ - liberio_chan_stop_streaming(_rx_chan); - liberio_chan_request_buffers(_rx_chan, 0); - UHD_ASSERT_THROW( - !liberio_chan_set_fixed_size(_rx_chan, 0, xport_params.recv_frame_size)); - UHD_ASSERT_THROW( - !liberio_chan_request_buffers(_rx_chan, xport_params.num_recv_frames)); - _num_recv_bufs = liberio_chan_get_num_bufs(_rx_chan); - - for (size_t i = 0; i < xport_params.num_recv_frames; i++) { - liberio_chan_get(_rx_chan); - _mrb_pool.push_back(boost::make_shared(_rx_chan)); - } - - UHD_ASSERT_THROW(!liberio_chan_enqueue_all(_rx_chan)); - - liberio_chan_start_streaming(_rx_chan); - liberio_chan_start_streaming(_tx_chan); - } - - ~liberio_zero_copy_impl(void) - { - liberio_chan_put(_tx_chan); - liberio_chan_put(_rx_chan); - { - std::lock_guard _l(_context_lock); - _ref_count--; - if (!_ref_count) { - _context_holder.reset(); - } - } - } - - managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1) - { - std::lock_guard lock(_rx_mutex); - if (_next_recv_buff_index == _num_recv_bufs) - _next_recv_buff_index = 0; - return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index); - } - - size_t get_num_recv_frames(void) const - { - return liberio_chan_get_num_bufs(_rx_chan); - } - - size_t get_recv_frame_size(void) const - { - return _rx_buf_size; - } - - managed_send_buffer::sptr get_send_buff(double timeout = 0.1) - { - std::lock_guard lock(_tx_mutex); - if (_next_send_buff_index == _num_send_bufs) - _next_send_buff_index = 0; - return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); - } - - size_t get_num_send_frames(void) const - { - return liberio_chan_get_num_bufs(_tx_chan); - } - - size_t get_send_frame_size(void) const - { - return _tx_buf_size; - } - -private: - liberio_chan* _tx_chan; - const size_t _tx_buf_size; - size_t _num_send_bufs; - liberio_chan* _rx_chan; - const size_t _rx_buf_size; - size_t _num_recv_bufs; - - std::vector> _mrb_pool; - size_t _next_recv_buff_index; - std::vector> _msb_pool; - size_t _next_send_buff_index; - std::mutex _rx_mutex; - std::mutex _tx_mutex; - - static std::mutex _context_lock; - static size_t _ref_count; - static std::shared_ptr _context_holder; -}; - -std::mutex liberio_zero_copy_impl::_context_lock; -size_t liberio_zero_copy_impl::_ref_count = 0; -std::shared_ptr liberio_zero_copy_impl::_context_holder{}; - -liberio_zero_copy::sptr liberio_zero_copy::make(const std::string& tx_path, - const std::string& rx_path, - const zero_copy_xport_params& default_buff_args) -{ - return liberio_zero_copy::sptr( - new liberio_zero_copy_impl(tx_path, rx_path, default_buff_args)); -} - -}} // namespace uhd::transport diff --git a/host/lib/transport/liberio_zero_copy.hpp b/host/lib/transport/liberio_zero_copy.hpp deleted file mode 100644 index b41b6416c..000000000 --- a/host/lib/transport/liberio_zero_copy.hpp +++ /dev/null @@ -1,35 +0,0 @@ -// -// Copyright 2017 Ettus Research -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#ifndef LIBERIO_HPP -#define LIBERIO_HPP - -#include -#include -#include -#include -#include -#include - -namespace uhd { namespace transport { - -/*! - * A zero copy transport interface to the liberio DMA library. - */ -class liberio_zero_copy : public virtual zero_copy_if -{ -public: - typedef boost::shared_ptr sptr; - - static sptr make(const std::string& tx_path, - const std::string& rx_path, - const zero_copy_xport_params& default_buff_args); -}; - -}} // namespace uhd::transport - -#endif /* LIBERIO_HPP */ -- cgit v1.2.3