diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 10 | ||||
-rw-r--r-- | host/lib/transport/buffer_pool.cpp | 9 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 6 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 93 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 78 | ||||
-rw-r--r-- | host/lib/transport/xport_benchmarker.cpp | 158 | ||||
-rw-r--r-- | host/lib/transport/xport_benchmarker.hpp | 63 | ||||
-rw-r--r-- | host/lib/transport/zero_copy_recv_offload.cpp | 146 |
8 files changed, 0 insertions, 563 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index e899cfba0..d21644f01 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -115,7 +115,6 @@ LIBUHD_PYTHON_GEN_SOURCE( LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_flow_ctrl.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp ${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp @@ -130,7 +129,6 @@ LIBUHD_APPEND_SOURCES( if(ENABLE_X300) LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/nirio_link.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp ) endif(ENABLE_X300) @@ -150,11 +148,3 @@ if(ENABLE_DPDK) ) endif(ENABLE_DPDK) -# Verbose Debug output for send/recv -set( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" ) -option( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" ) -if(UHD_TXRX_DEBUG_PRINTS) - message(STATUS "Using verbose debug output for send/recv") - add_definitions(-DUHD_TXRX_DEBUG_PRINTS) -endif() - diff --git a/host/lib/transport/buffer_pool.cpp b/host/lib/transport/buffer_pool.cpp index c481b9d02..0dd4a8d7f 100644 --- a/host/lib/transport/buffer_pool.cpp +++ b/host/lib/transport/buffer_pool.cpp @@ -12,15 +12,6 @@ using namespace uhd::transport; -#ifdef UHD_TXRX_DEBUG_PRINTS -/* - * This is the implementation for the static variable 's_buffer_count' - * located in uhd/transport/zero_copy.hpp. - * It is used in the managed_buffer class. - */ -boost::detail::atomic_count managed_buffer::s_buffer_count(0); -#endif // UHD_TXRX_DEBUG_PRINTS - //! pad the byte count to a multiple of alignment static size_t pad_to_boundary(const size_t bytes, const size_t alignment) { diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f5693b198..9a1b74fb2 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,12 +21,6 @@ #include <boost/thread/mutex.hpp> #include <list> -#ifdef UHD_TXRX_DEBUG_PRINTS -# include <boost/format.hpp> -# include <fstream> -# include <vector> -#endif - using namespace uhd; using namespace uhd::transport; diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 85d00bad1..950da4c8a 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -18,7 +18,6 @@ #include <uhd/utils/byteswap.hpp> #include <uhd/utils/log.hpp> #include <uhd/utils/tasks.hpp> -#include <uhdlib/rfnoc/rx_stream_terminator.hpp> #include <boost/dynamic_bitset.hpp> #include <boost/format.hpp> #include <boost/function.hpp> @@ -26,13 +25,6 @@ #include <iostream> #include <vector> -// Included for debugging -#ifdef UHD_TXRX_DEBUG_PRINTS -# include "boost/date_time/posix_time/posix_time.hpp" -# include <boost/format.hpp> -# include <boost/thread/thread.hpp> -#endif - namespace uhd { namespace transport { namespace sph { UHD_INLINE uint32_t get_context_code( @@ -250,10 +242,6 @@ public: recv_one_packet(buffs, nsamps_per_buff, metadata, timeout); if (one_packet or metadata.end_of_burst) { -#ifdef UHD_TXRX_DEBUG_PRINTS - dbg_gather_data( - nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); -#endif return accum_num_samps; } @@ -285,9 +273,6 @@ public: break; } } -#ifdef UHD_TXRX_DEBUG_PRINTS - dbg_gather_data(nsamps_per_buff, accum_num_samps, metadata, timeout, one_packet); -#endif return accum_num_samps; } @@ -813,84 +798,6 @@ private: size_t _convert_buffer_offset_bytes; size_t _convert_bytes_to_copy; - /* - * This last section is only for debugging purposes. - * It causes a lot of prints to stderr which can be piped to a file. - * Gathered data can be used to post process it with external tools. - */ -#ifdef UHD_TXRX_DEBUG_PRINTS - struct dbg_recv_stat_t - { - dbg_recv_stat_t(long wc, - size_t nspb, - size_t nsr, - uhd::rx_metadata_t md, - double to, - bool op, - double rate) - : wallclock(wc) - , nsamps_per_buff(nspb) - , nsamps_recv(nsr) - , metadata(md) - , timeout(to) - , one_packet(op) - , samp_rate(rate) - { - } - long wallclock; - size_t nsamps_per_buff; - size_t nsamps_recv; - uhd::rx_metadata_t metadata; - double timeout; - bool one_packet; - double samp_rate; - // Create a formatted print line for all the info gathered in this struct. - std::string print_line() - { - boost::format fmt("recv,%ld,%f,%i,%i,%s,%i,%s,%s,%s,%i,%s,%ld"); - fmt % wallclock; - fmt % timeout % (int)nsamps_per_buff % (int)nsamps_recv; - fmt % (one_packet ? "true" : "false"); - fmt % metadata.error_code; - fmt % (metadata.start_of_burst ? "true" : "false") - % (metadata.end_of_burst ? "true" : "false"); - fmt % (metadata.more_fragments ? "true" : "false") - % (int)metadata.fragment_offset; - fmt % (metadata.has_time_spec ? "true" : "false") - % metadata.time_spec.to_ticks(samp_rate); - return fmt.str(); - } - }; - - void dbg_gather_data(const size_t nsamps_per_buff, - const size_t nsamps_recv, - uhd::rx_metadata_t& metadata, - const double timeout, - const bool one_packet, - bool dbg_print_directly = true) - { - // Initialize a struct with all available data. It can return a formatted string - // with all infos if wanted. - dbg_recv_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), - nsamps_per_buff, - nsamps_recv, - metadata, - timeout, - one_packet, - _samp_rate); - if (dbg_print_directly) { - dbg_print_err(data.print_line()); - } - } - - - void dbg_print_err(std::string msg) - { - std::string dbg_prefix("super_recv_packet_handler,"); - msg = dbg_prefix + msg; - fprintf(stderr, "%s\n", msg.c_str()); - } -#endif }; class recv_packet_streamer : public recv_packet_handler, public rx_streamer diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index af6ecaa5e..cd707cb89 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -18,22 +18,12 @@ #include <uhd/utils/byteswap.hpp> #include <uhd/utils/tasks.hpp> #include <uhd/utils/thread.hpp> -#include <uhdlib/rfnoc/tx_stream_terminator.hpp> #include <boost/function.hpp> #include <chrono> #include <iostream> #include <thread> #include <vector> -#ifdef UHD_TXRX_DEBUG_PRINTS -// Included for debugging -# include "boost/date_time/posix_time/posix_time.hpp" -# include <boost/format.hpp> -# include <boost/thread/thread.hpp> -# include <fstream> -# include <map> -#endif - namespace uhd { namespace transport { namespace sph { /*********************************************************************** @@ -240,9 +230,6 @@ public: size_t nsamps_sent = send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout); -#ifdef UHD_TXRX_DEBUG_PRINTS - dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout); -#endif return nsamps_sent; } size_t total_num_samps_sent = 0; @@ -281,10 +268,6 @@ public: if_packet_info, timeout, total_num_samps_sent * _bytes_per_cpu_item); -#ifdef UHD_TXRX_DEBUG_PRINTS - dbg_print_send(nsamps_per_buff, nsamps_sent, metadata, timeout); - -#endif return nsamps_sent; } @@ -314,67 +297,6 @@ private: bool _cached_metadata; uhd::tx_metadata_t _metadata_cache; -#ifdef UHD_TXRX_DEBUG_PRINTS - struct dbg_send_stat_t - { - dbg_send_stat_t(long wc, - size_t nspb, - size_t nss, - uhd::tx_metadata_t md, - double to, - double rate) - : wallclock(wc) - , nsamps_per_buff(nspb) - , nsamps_sent(nss) - , metadata(md) - , timeout(to) - , samp_rate(rate) - { - } - long wallclock; - size_t nsamps_per_buff; - size_t nsamps_sent; - uhd::tx_metadata_t metadata; - double timeout; - double samp_rate; - // Create a formatted print line for all the info gathered in this struct. - std::string print_line() - { - boost::format fmt("send,%ld,%f,%i,%i,%s,%s,%s,%ld"); - fmt % wallclock; - fmt % timeout % (int)nsamps_per_buff % (int)nsamps_sent; - fmt % (metadata.start_of_burst ? "true" : "false") - % (metadata.end_of_burst ? "true" : "false"); - fmt % (metadata.has_time_spec ? "true" : "false") - % metadata.time_spec.to_ticks(samp_rate); - return fmt.str(); - } - }; - - void dbg_print_send(size_t nsamps_per_buff, - size_t nsamps_sent, - const uhd::tx_metadata_t& metadata, - const double timeout, - bool dbg_print_directly = true) - { - dbg_send_stat_t data(boost::get_system_time().time_of_day().total_microseconds(), - nsamps_per_buff, - nsamps_sent, - metadata, - timeout, - _samp_rate); - if (dbg_print_directly) { - dbg_print_err(data.print_line()); - } - } - void dbg_print_err(std::string msg) - { - msg = "super_send_packet_handler," + msg; - fprintf(stderr, "%s\n", msg.c_str()); - } - - -#endif /******************************************************************* * Send a single packet: diff --git a/host/lib/transport/xport_benchmarker.cpp b/host/lib/transport/xport_benchmarker.cpp deleted file mode 100644 index 67582ff2c..000000000 --- a/host/lib/transport/xport_benchmarker.cpp +++ /dev/null @@ -1,158 +0,0 @@ -// -// Copyright 2010-2013 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#include "xport_benchmarker.hpp" -#include <chrono> -#include <thread> - -namespace uhd { namespace transport { - -const device_addr_t& xport_benchmarker::benchmark_throughput_chdr( - zero_copy_if::sptr tx_transport, - zero_copy_if::sptr rx_transport, - uint32_t sid, - bool big_endian, - uint32_t duration_ms) -{ - vrt::if_packet_info_t pkt_info; - _initialize_chdr(tx_transport, rx_transport, sid, pkt_info); - _reset_counters(); - boost::posix_time::ptime start_time(boost::posix_time::microsec_clock::local_time()); - - _tx_thread.reset(new boost::thread(boost::bind(&xport_benchmarker::_stream_tx, - this, - tx_transport.get(), - &pkt_info, - big_endian))); - _rx_thread.reset(new boost::thread(boost::bind(&xport_benchmarker::_stream_rx, - this, - rx_transport.get(), - &pkt_info, - big_endian))); - - std::this_thread::sleep_for(std::chrono::milliseconds(duration_ms)); - - _tx_thread->interrupt(); - _rx_thread->interrupt(); - _tx_thread->join(); - _rx_thread->join(); - - boost::posix_time::ptime stop_time(boost::posix_time::microsec_clock::local_time()); - double duration_s = ((double)(stop_time - start_time).total_microseconds()) / 1e6; - - uint64_t tx_bytes = pkt_info.num_payload_words32 * sizeof(uint32_t) * _num_tx_packets; - uint64_t rx_bytes = pkt_info.num_payload_words32 * sizeof(uint32_t) * _num_rx_packets; - double tx_rate = (((double)tx_bytes) / duration_s); - double rx_rate = (((double)rx_bytes) / duration_s); - - _results["TX-Bytes"] = (boost::format("%.2fMB") % (tx_bytes / (1024 * 1024))).str(); - _results["RX-Bytes"] = (boost::format("%.2fMB") % (rx_bytes / (1024 * 1024))).str(); - _results["TX-Throughput"] = - (boost::format("%.2fMB/s") % (tx_rate / (1024 * 1024))).str(); - _results["RX-Throughput"] = - (boost::format("%.2fMB/s") % (rx_rate / (1024 * 1024))).str(); - _results["TX-Timeouts"] = std::to_string(_num_tx_timeouts); - _results["RX-Timeouts"] = std::to_string(_num_rx_timeouts); - _results["Data-Errors"] = std::to_string(_num_data_errors); - - return _results; -} - -void xport_benchmarker::_stream_tx( - zero_copy_if* transport, vrt::if_packet_info_t* pkt_info, bool big_endian) -{ - while (not boost::this_thread::interruption_requested()) { - managed_send_buffer::sptr buff = transport->get_send_buff(_tx_timeout); - if (buff) { - uint32_t* packet_buff = buff->cast<uint32_t*>(); - // Populate packet - if (big_endian) { - vrt::if_hdr_pack_be(packet_buff, *pkt_info); - } else { - vrt::if_hdr_pack_le(packet_buff, *pkt_info); - } - // send the buffer over the interface - buff->commit(sizeof(uint32_t) * (pkt_info->num_packet_words32)); - _num_tx_packets++; - } else { - _num_tx_timeouts++; - } - } -} - -void xport_benchmarker::_stream_rx( - zero_copy_if* transport, const vrt::if_packet_info_t* exp_pkt_info, bool big_endian) -{ - while (not boost::this_thread::interruption_requested()) { - managed_recv_buffer::sptr buff = transport->get_recv_buff(_rx_timeout); - if (buff) { - // Extract packet info - vrt::if_packet_info_t pkt_info; - pkt_info.link_type = exp_pkt_info->link_type; - pkt_info.num_packet_words32 = buff->size() / sizeof(uint32_t); - const uint32_t* packet_buff = buff->cast<const uint32_t*>(); - - _num_rx_packets++; - - // unpacking can fail - try { - if (big_endian) { - vrt::if_hdr_unpack_be(packet_buff, pkt_info); - } else { - vrt::if_hdr_unpack_le(packet_buff, pkt_info); - } - - if (exp_pkt_info->packet_type != pkt_info.packet_type - || exp_pkt_info->num_payload_bytes != pkt_info.num_payload_bytes) { - _num_data_errors++; - } - } catch (const std::exception& ex) { - _num_data_errors++; - } - } else { - _num_rx_timeouts++; - } - } -} - -void xport_benchmarker::_reset_counters(void) -{ - _num_tx_packets = 0; - _num_rx_packets = 0; - _num_tx_timeouts = 0; - _num_rx_timeouts = 0; - _num_data_errors = 0; -} - -void xport_benchmarker::_initialize_chdr(zero_copy_if::sptr tx_transport, - zero_copy_if::sptr rx_transport, - uint32_t sid, - vrt::if_packet_info_t& pkt_info) -{ - _tx_timeout = 0.5; - _rx_timeout = 0.5; - - size_t frame_size = std::min( - tx_transport->get_send_frame_size(), rx_transport->get_recv_frame_size()); - - pkt_info.link_type = vrt::if_packet_info_t::LINK_TYPE_CHDR; - pkt_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; - pkt_info.num_packet_words32 = (frame_size / sizeof(uint32_t)); - pkt_info.num_payload_words32 = pkt_info.num_packet_words32 - 2; - pkt_info.num_payload_bytes = pkt_info.num_payload_words32 * sizeof(uint32_t); - pkt_info.packet_count = 0; - pkt_info.sob = false; - pkt_info.eob = false; - pkt_info.sid = sid; - pkt_info.has_sid = true; - pkt_info.has_cid = false; - pkt_info.has_tsi = false; - pkt_info.has_tsf = false; - pkt_info.has_tlr = false; -} - -}} // namespace uhd::transport diff --git a/host/lib/transport/xport_benchmarker.hpp b/host/lib/transport/xport_benchmarker.hpp deleted file mode 100644 index 3843540e0..000000000 --- a/host/lib/transport/xport_benchmarker.hpp +++ /dev/null @@ -1,63 +0,0 @@ -// -// Copyright 2010-2013 Ettus Research LLC -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#ifndef INCLUDED_LIBUHD_XPORT_BENCHMARKER_HPP -#define INCLUDED_LIBUHD_XPORT_BENCHMARKER_HPP - -#include <uhd/transport/vrt_if_packet.hpp> -#include <uhd/transport/zero_copy.hpp> -#include <uhd/types/device_addr.hpp> -#include <uhd/utils/log.hpp> -#include <boost/shared_ptr.hpp> -#include <boost/thread/thread.hpp> - -namespace uhd { namespace transport { - -// Test class to benchmark a low-level transport object with a VITA/C-VITA data stream -class xport_benchmarker : uhd::noncopyable -{ -public: - const device_addr_t& benchmark_throughput_chdr(zero_copy_if::sptr tx_transport, - zero_copy_if::sptr rx_transport, - uint32_t sid, - bool big_endian, - uint32_t duration_ms); - -private: - void _stream_tx( - zero_copy_if* transport, vrt::if_packet_info_t* pkt_info, bool big_endian); - - void _stream_rx(zero_copy_if* transport, - const vrt::if_packet_info_t* exp_pkt_info, - bool big_endian); - - void _initialize_chdr(zero_copy_if::sptr tx_transport, - zero_copy_if::sptr rx_transport, - uint32_t sid, - vrt::if_packet_info_t& pkt_info); - - void _reset_counters(void); - - boost::shared_ptr<boost::thread> _tx_thread; - boost::shared_ptr<boost::thread> _rx_thread; - - uint64_t _num_tx_packets; - uint64_t _num_rx_packets; - uint64_t _num_tx_timeouts; - uint64_t _num_rx_timeouts; - uint64_t _num_data_errors; - - double _tx_timeout; - double _rx_timeout; - - device_addr_t _results; -}; - - -}} // namespace uhd::transport - -#endif /* INCLUDED_LIBUHD_XPORT_BENCHMARKER_HPP */ diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp deleted file mode 100644 index 7329dbdf3..000000000 --- a/host/lib/transport/zero_copy_recv_offload.cpp +++ /dev/null @@ -1,146 +0,0 @@ -// -// Copyright 2016 Ettus Research -// Copyright 2018 Ettus Research, a National Instruments Company -// -// SPDX-License-Identifier: GPL-3.0-or-later -// - -#include <uhd/transport/bounded_buffer.hpp> -#include <uhd/transport/buffer_pool.hpp> -#include <uhd/transport/zero_copy_recv_offload.hpp> -#include <uhd/utils/log.hpp> -#include <uhd/utils/safe_call.hpp> -#include <uhd/utils/thread.hpp> -#include <boost/bind.hpp> -#include <boost/format.hpp> -#include <boost/make_shared.hpp> -#include <boost/thread/mutex.hpp> -#include <boost/thread/thread.hpp> - -using namespace uhd; -using namespace uhd::transport; - -typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t; - -/*********************************************************************** - * Zero copy offload transport: - * An intermediate transport that utilizes threading to free - * the main thread from any receive work. - **********************************************************************/ -class zero_copy_recv_offload_impl : public zero_copy_recv_offload -{ -public: - typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr; - - zero_copy_recv_offload_impl(zero_copy_if::sptr transport, const double timeout) - : _transport(transport) - , _timeout(timeout) - , _inbox(transport->get_num_recv_frames()) - , _recv_done(false) - { - UHD_LOGGER_TRACE("XPORT") << "Created threaded transport"; - - // Create the receive and send threads to offload - // the system calls onto other threads - _recv_thread = - boost::thread(boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this)); - set_thread_name(&_recv_thread, "zero_copy_recv"); - } - - // Receive thread flags - void set_recv_done() - { - boost::lock_guard<boost::mutex> guard(_recv_mutex); - _recv_done = true; - } - - bool is_recv_done() - { - boost::lock_guard<boost::mutex> guard(_recv_mutex); - return _recv_done; - } - - ~zero_copy_recv_offload_impl() - { - // Signal the threads we're finished - set_recv_done(); - - // Wait for them to join - UHD_SAFE_CALL(_recv_thread.join();) - } - - // The receive thread function is responsible for - // pulling pointers to managed receiver buffers quickly - void enqueue_recv() - { - while (not is_recv_done()) { - managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout); - if (not buff) - continue; - _inbox.push_with_timed_wait(buff, _timeout); - } - } - - /******************************************************************* - * Receive implementation: - * Pop the receive buffer pointer from the underlying transport - ******************************************************************/ - managed_recv_buffer::sptr get_recv_buff(double timeout) - { - managed_recv_buffer::sptr ptr; - _inbox.pop_with_timed_wait(ptr, timeout); - return ptr; - } - - size_t get_num_recv_frames() const - { - return _transport->get_num_recv_frames(); - } - - size_t get_recv_frame_size() const - { - return _transport->get_recv_frame_size(); - } - - /******************************************************************* - * Send implementation: - * Pass the send buffer pointer from the underlying transport - ******************************************************************/ - managed_send_buffer::sptr get_send_buff(double timeout) - { - return _transport->get_send_buff(timeout); - } - - size_t get_num_send_frames() const - { - return _transport->get_num_send_frames(); - } - - size_t get_send_frame_size() const - { - return _transport->get_send_frame_size(); - } - -private: - // The linked transport - zero_copy_if::sptr _transport; - - const double _timeout; - - // Shared buffers - bounded_buffer_t _inbox; - - // Threading - bool _recv_done; - boost::thread _recv_thread; - boost::mutex _recv_mutex; -}; - -zero_copy_recv_offload::sptr zero_copy_recv_offload::make( - zero_copy_if::sptr transport, const double timeout) -{ - zero_copy_recv_offload_impl::sptr zero_copy_recv_offload( - new zero_copy_recv_offload_impl(transport, timeout)); - - return zero_copy_recv_offload; -} |