From 2c8a7c7debf19d92065661cc1d258f97bd38e224 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 30 Sep 2010 14:36:24 -0700 Subject: uhd: implemented recv timeout for zero copy interface --- host/lib/transport/udp_zero_copy_asio.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ee989ee2b..0a6c9f2af 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -35,7 +35,6 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 2 //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); -static const double RECV_TIMEOUT = 0.1; //100 ms /*********************************************************************** * Zero Copy UDP implementation with ASIO: @@ -110,11 +109,11 @@ private: boost::asio::io_service _io_service; int _sock_fd; - ssize_t recv(const boost::asio::mutable_buffer &buff){ + ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ //setup timeval for timeout timeval tv; tv.tv_sec = 0; - tv.tv_usec = int(RECV_TIMEOUT*1e6); + tv.tv_usec = timeout_ms*1000; //setup rset for timeout fd_set rset; -- cgit v1.2.3 From 00cd6018405b57a0982b0ce103ff858c646ee18c Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Fri, 1 Oct 2010 18:22:41 -0700 Subject: uhd: implemented a double timeout (in seconds) for send and recv chains converted all size_t timeout_ms to double timeout bounded and alignment buffer now take double timeout added timeout to device::send and zero_copy_if::get_send_buff --- host/include/uhd/device.hpp | 27 ++++++++------ host/include/uhd/device.ipp | 9 +++-- host/include/uhd/transport/alignment_buffer.hpp | 9 ++--- host/include/uhd/transport/alignment_buffer.ipp | 15 +++----- host/include/uhd/transport/bounded_buffer.hpp | 12 ++---- host/include/uhd/transport/bounded_buffer.ipp | 15 ++++++-- host/include/uhd/transport/zero_copy.hpp | 18 +++++---- host/lib/transport/libusb1_zero_copy.cpp | 42 ++++++++++----------- host/lib/transport/udp_zero_copy_asio.cpp | 4 +- host/lib/transport/vrt_packet_handler.hpp | 49 +++++++++++++------------ host/lib/transport/zero_copy.cpp | 6 +-- host/lib/usrp/usrp1/io_impl.cpp | 20 +++++----- host/lib/usrp/usrp1/usrp1_impl.cpp | 4 +- host/lib/usrp/usrp1/usrp1_impl.hpp | 7 ++-- host/lib/usrp/usrp2/io_impl.cpp | 32 ++++++++-------- host/lib/usrp/usrp2/usrp2_impl.hpp | 6 +-- host/test/buffer_test.cpp | 2 +- 17 files changed, 140 insertions(+), 137 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index 2077cae62..992276928 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -41,9 +41,6 @@ public: typedef boost::function find_t; typedef boost::function make_t; - //! A reasonable default timeout for receive - static const size_t default_recv_timeout_ms = 100; - /*! * Register a device into the discovery and factory system. * @@ -112,12 +109,15 @@ public: * * This is a blocking call and will not return until the number * of samples returned have been read out of each buffer. + * Under a timeout condition, the number of samples returned + * may be less than the number of samples specified. * * \param buffs a vector of read-only memory containing IF data * \param nsamps_per_buff the number of samples to send, per buffer * \param metadata data describing the buffer's contents * \param io_type the type of data loaded in the buffer * \param send_mode tells send how to unload the buffer + * \param timeout the timeout in seconds to wait on a packet * \return the number of samples sent */ virtual size_t send( @@ -125,7 +125,8 @@ public: size_t nsamps_per_buff, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, + double timeout = 0.1 ) = 0; /*! @@ -136,7 +137,8 @@ public: size_t nsamps_per_buff, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, + double timeout = 0.1 ); /*! @@ -154,7 +156,9 @@ public: * See the rx metadata fragment flags and offset fields for details. * * This is a blocking call and will not return until the number - * of samples returned have been written into each buffer or timeout. + * of samples returned have been written into each buffer. + * Under a timeout condition, the number of samples returned + * may be less than the number of samples specified. * * When using the full buffer recv mode, the metadata only applies * to the first packet received and written into the recv buffers. @@ -165,7 +169,7 @@ public: * \param metadata data to fill describing the buffer * \param io_type the type of data to fill into the buffer * \param recv_mode tells recv how to load the buffer - * \param timeout_ms the timeout in milliseconds to wait for a packet + * \param timeout the timeout in seconds to wait for a packet * \return the number of samples received or 0 on error */ virtual size_t recv( @@ -174,7 +178,7 @@ public: rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, - size_t timeout_ms = default_recv_timeout_ms + double timeout = 0.1 ) = 0; /*! @@ -186,7 +190,7 @@ public: rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, - size_t timeout_ms = default_recv_timeout_ms + double timeout = 0.1 ); /*! @@ -204,12 +208,11 @@ public: /*! * Receive and asynchronous message from the device. * \param async_metadata the metadata to be filled in - * \param timeout_ms the timeout in milliseconds to wait for a message + * \param timeout the timeout in seconds to wait for a message * \return true when the async_metadata is valid, false for timeout */ virtual bool recv_async_msg( - async_metadata_t &async_metadata, - size_t timeout_ms = default_recv_timeout_ms + async_metadata_t &async_metadata, double timeout = 0.1 ) = 0; }; diff --git a/host/include/uhd/device.ipp b/host/include/uhd/device.ipp index 60a3f535d..e2e51ecd0 100644 --- a/host/include/uhd/device.ipp +++ b/host/include/uhd/device.ipp @@ -25,12 +25,13 @@ namespace uhd{ size_t nsamps_per_buff, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, + double timeout ){ return this->send( std::vector(1, buff), nsamps_per_buff, metadata, - io_type, send_mode + io_type, send_mode, timeout ); } @@ -40,12 +41,12 @@ namespace uhd{ rx_metadata_t &metadata, const io_type_t &io_type, recv_mode_t recv_mode, - size_t timeout_ms + double timeout ){ return this->recv( std::vector(1, buff), nsamps_per_buff, metadata, - io_type, recv_mode, timeout_ms + io_type, recv_mode, timeout ); } diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index 29ba74efc..f44a037f8 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -48,20 +48,17 @@ namespace uhd{ namespace transport{ * \return true if the element fit without popping for space */ virtual bool push_with_pop_on_full( - const elem_type &elem, - const seq_type &seq, - size_t index + const elem_type &elem, const seq_type &seq, size_t index ) = 0; /*! * Pop an aligned set of elements from this alignment buffer. * \param elems a collection to store the aligned elements - * \param time the timeout time + * \param timeout the timeout in seconds * \return false when the operation times out */ virtual bool pop_elems_with_timed_wait( - std::vector &elems, - const time_duration_t &time + std::vector &elems, double timeout ) = 0; }; diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp index 61b3b60f5..5f09de0d9 100644 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ b/host/include/uhd/transport/alignment_buffer.ipp @@ -41,9 +41,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ } UHD_INLINE bool push_with_pop_on_full( - const elem_type &elem, - const seq_type &seq, - size_t index + const elem_type &elem, const seq_type &seq, size_t index ){ //clear the buffer for this index if the seqs are mis-ordered if (seq < _last_seqs[index]){ @@ -54,17 +52,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ } UHD_INLINE bool pop_elems_with_timed_wait( - std::vector &elems, - const time_duration_t &time + std::vector &elems, double timeout ){ - boost::system_time exit_time = boost::get_system_time() + time; + boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6)); buff_contents_type buff_contents_tmp; std::list indexes_to_do(_all_indexes); //do an initial pop to load an initial sequence id size_t index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, exit_time - boost::get_system_time() + buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() )) return false; elems[index] = buff_contents_tmp.first; seq_type expected_seq_id = buff_contents_tmp.second; @@ -79,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ indexes_to_do = _all_indexes; index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, exit_time - boost::get_system_time() + buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() )) return false; elems[index] = buff_contents_tmp.first; expected_seq_id = buff_contents_tmp.second; @@ -89,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ //pop an element off for this index index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, exit_time - boost::get_system_time() + buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() )) return false; //if the sequence id matches: diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index d1deece96..aca93b071 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -20,13 +20,9 @@ #include #include -#include namespace uhd{ namespace transport{ - //! typedef for the time duration type for wait operations - typedef boost::posix_time::time_duration time_duration_t; - /*! * Implement a templated bounded buffer: * Used for passing elements between threads in a producer-consumer model. @@ -64,10 +60,10 @@ namespace uhd{ namespace transport{ * Push a new element into the bounded_buffer. * Wait until the bounded_buffer becomes non-full or timeout. * \param elem the new element to push - * \param time the timeout time + * \param timeout the timeout in seconds * \return false when the operation times out */ - virtual bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time) = 0; + virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0; /*! * Pop an element from the bounded_buffer. @@ -80,10 +76,10 @@ namespace uhd{ namespace transport{ * Pop an element from the bounded_buffer. * Wait until the bounded_buffer becomes non-empty or timeout. * \param elem the element reference pop to - * \param time the timeout time + * \param timeout the timeout in seconds * \return false when the operation times out */ - virtual bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time) = 0; + virtual bool pop_with_timed_wait(elem_type &elem, double timeout) = 0; /*! * Clear all elements from the bounded_buffer. diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index e106e229e..71143741e 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -21,6 +21,7 @@ #include #include #include +#include namespace uhd{ namespace transport{ namespace{ /*anon*/ @@ -57,9 +58,12 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ _empty_cond.notify_one(); } - bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time){ + bool push_with_timed_wait(const elem_type &elem, double timeout){ boost::unique_lock lock(_mutex); - if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl::not_full, this))) return false; + if (not _full_cond.timed_wait( + lock, boost::posix_time::microseconds(long(timeout*1e6)), + boost::bind(&bounded_buffer_impl::not_full, this) + )) return false; _buffer.push_front(elem); lock.unlock(); _empty_cond.notify_one(); @@ -74,9 +78,12 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ _full_cond.notify_one(); } - bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time){ + bool pop_with_timed_wait(elem_type &elem, double timeout){ boost::unique_lock lock(_mutex); - if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl::not_empty, this))) return false; + if (not _empty_cond.timed_wait( + lock, boost::posix_time::microseconds(long(timeout*1e6)), + boost::bind(&bounded_buffer_impl::not_empty, this) + )) return false; elem = _buffer.back(); _buffer.pop_back(); lock.unlock(); _full_cond.notify_one(); diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 8ecafd3fb..ba19b193c 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -122,10 +122,10 @@ namespace uhd{ namespace transport{ /*! * Get a new receive buffer from this transport object. - * \param timeout_ms the timeout to get the buffer in ms + * \param timeout the timeout to get the buffer in seconds * \return a managed buffer, or null sptr on timeout/error */ - virtual managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms) = 0; + virtual managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1) = 0; /*! * Get the maximum number of receive frames: @@ -138,9 +138,10 @@ namespace uhd{ namespace transport{ /*! * Get a new send buffer from this transport object. + * \param timeout the timeout to get the buffer in seconds * \return a managed buffer, or null sptr on timeout/error */ - virtual managed_send_buffer::sptr get_send_buff(void) = 0; + virtual managed_send_buffer::sptr get_send_buff(double timeout = 0.1) = 0; /*! * Get the maximum number of send frames: @@ -172,19 +173,19 @@ namespace uhd{ namespace transport{ /*! * Get a new receive buffer from this transport object. - * \param timeout_ms the timeout to get the buffer in ms + * \param timeout the timeout to get the buffer in seconds * \return a managed buffer, or null sptr on timeout/error */ - managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); + managed_recv_buffer::sptr get_recv_buff(double timeout); private: /*! * Perform a private copying recv. * \param buff the buffer to write data into - * \param timeout_ms the timeout to get the buffer in ms + * \param timeout the timeout to get the buffer in seconds * \return the number of bytes written to buff, 0 for timeout, negative for error */ - virtual ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms) = 0; + virtual ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout) = 0; UHD_PIMPL_DECL(impl) _impl; }; @@ -208,9 +209,10 @@ namespace uhd{ namespace transport{ /*! * Get a new send buffer from this transport object. + * \param timeout the timeout to get the buffer in seconds * \return a managed buffer, or null sptr on timeout/error */ - managed_send_buffer::sptr get_send_buff(void); + managed_send_buffer::sptr get_send_buff(double timeout); private: /*! diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f9beb0b4c..7f2bc3468 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -24,12 +24,10 @@ #include #include #include -#include using namespace uhd::transport; -const int libusb_timeout = 0; - +static const double CLEANUP_TIMEOUT = 0.2; //seconds static const size_t DEFAULT_NUM_XFERS = 16; //num xfers static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes @@ -84,10 +82,10 @@ public: * Get an available transfer: * For inputs, this is a just filled transfer. * For outputs, this is a just emptied transfer. - * \param timeout_ms the timeout to wait for a lut + * \param timeout the timeout to wait for a lut * \return the transfer pointer or NULL if timeout */ - libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); + libusb_transfer *get_lut_with_wait(double timeout); //Callback use only void callback_handle_transfer(libusb_transfer *lut); @@ -187,7 +185,7 @@ usb_endpoint::~usb_endpoint(void){ } //collect canceled transfers (drain the queue) - while (this->get_lut_with_wait() != NULL){}; + while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; //free all transfers BOOST_FOREACH(libusb_transfer *lut, _all_luts){ @@ -274,12 +272,10 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ } } -libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw libusb_transfer *lut; - if (_completed_list->pop_with_timed_wait( - lut, boost::posix_time::milliseconds(timeout_ms) - )) return lut; + if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; return NULL; } @@ -399,8 +395,8 @@ public: size_t send_xfer_size, size_t send_num_xfers ); - managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); - managed_send_buffer::sptr get_send_buff(void); + managed_recv_buffer::sptr get_recv_buff(double); + managed_send_buffer::sptr get_send_buff(double); size_t get_num_recv_frames(void) const { return _recv_num_frames; } size_t get_num_send_frames(void) const { return _send_num_frames; } @@ -459,8 +455,8 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( * Return empty pointer if no transfer is available (timeout or error). * \return pointer to a managed receive buffer */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms){ - libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout_ms); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ + libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout); if (lut == NULL) { return managed_recv_buffer::sptr(); } @@ -478,8 +474,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms * (timeout or error). * \return pointer to a managed send buffer */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ - libusb_transfer *lut = _send_ep->get_lut_with_wait(/* TODO timeout API */); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ + libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout); if (lut == NULL) { return managed_send_buffer::sptr(); } @@ -494,18 +490,18 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ * USB zero_copy make functions **********************************************************************/ usb_zero_copy::sptr usb_zero_copy::make( - usb_device_handle::sptr handle, + usb_device_handle::sptr handle, unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers + size_t recv_xfer_size, size_t recv_num_xfers, + size_t send_xfer_size, size_t send_num_xfers ){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( boost::static_pointer_cast(handle)->get_device() )); return sptr(new libusb_zero_copy_impl( - dev_handle, - recv_endpoint, send_endpoint, - recv_xfer_size, recv_num_xfers, - send_xfer_size, send_num_xfers + dev_handle, + recv_endpoint, send_endpoint, + recv_xfer_size, recv_num_xfers, + send_xfer_size, send_num_xfers )); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 0a6c9f2af..3130830a5 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -109,11 +109,11 @@ private: boost::asio::io_service _io_service; int _sock_fd; - ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ + ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout){ //setup timeval for timeout timeval tv; tv.tv_sec = 0; - tv.tv_usec = timeout_ms*1000; + tv.tv_usec = long(timeout*1e6); //setup rset for timeout fd_set rset; diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index b603f1371..e11afff30 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -303,18 +303,18 @@ template UHD_INLINE T get_context_code( * Pack a vrt header, copy-convert the data, and send it. * - helper function for vrt_packet_handler::send ******************************************************************/ - static UHD_INLINE void _send1( + static UHD_INLINE size_t _send1( send_state &state, const std::vector &buffs, - size_t offset_bytes, - size_t num_samps, + const size_t offset_bytes, + const size_t num_samps, uhd::transport::vrt::if_packet_info_t &if_packet_info, const uhd::io_type_t &io_type, const uhd::otw_type_t &otw_type, const vrt_packer_t &vrt_packer, const get_send_buffs_t &get_send_buffs, - size_t vrt_header_offset_words32, - size_t chans_per_otw_buff + const size_t vrt_header_offset_words32, + const size_t chans_per_otw_buff ){ //load the rest of the if_packet_info in here if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t); @@ -322,7 +322,7 @@ template UHD_INLINE T get_context_code( //get send buffers for each channel managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); - UHD_ASSERT_THROW(get_send_buffs(send_buffs)); + if (not get_send_buffs(send_buffs)) return 0; std::vector io_buffs(chans_per_otw_buff); for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ @@ -347,6 +347,7 @@ template UHD_INLINE T get_context_code( std::cerr << "commit to send buffer returned less than commit size" << std::endl; } } + return num_samps; } /******************************************************************* @@ -381,7 +382,6 @@ template UHD_INLINE T get_context_code( //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_ONE_PACKET:{ //////////////////////////////////////////////////////////////// - size_t num_samps = std::min(total_num_samps, max_samples_per_packet); //fill in parts of the packet info overwrote in full buff mode if_packet_info.has_tsi = metadata.has_time_spec; @@ -389,10 +389,10 @@ template UHD_INLINE T get_context_code( if_packet_info.sob = metadata.start_of_burst; if_packet_info.eob = metadata.end_of_burst; - _send1( + return _send1( state, buffs, 0, - num_samps, + std::min(total_num_samps, max_samples_per_packet), if_packet_info, io_type, otw_type, vrt_packer, @@ -400,31 +400,32 @@ template UHD_INLINE T get_context_code( vrt_header_offset_words32, chans_per_otw_buff ); - return num_samps; } //////////////////////////////////////////////////////////////// case uhd::device::SEND_MODE_FULL_BUFF:{ //////////////////////////////////////////////////////////////// - //calculate constants for fragmentation - const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; - static const size_t first_fragment_index = 0; - const size_t final_fragment_index = num_fragments-1; + size_t total_num_samps_sent = 0; //loop through the following fragment indexes - for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ + while(total_num_samps_sent < total_num_samps){ + + //calculate per-loop-iteration variables + const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; + const bool first_fragment = (total_num_samps_sent == 0); + const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet); //calculate new flags for the fragments - if_packet_info.has_tsi = metadata.has_time_spec and (n == first_fragment_index); - if_packet_info.has_tsf = metadata.has_time_spec and (n == first_fragment_index); - if_packet_info.sob = metadata.start_of_burst and (n == first_fragment_index); - if_packet_info.eob = metadata.end_of_burst and (n == final_fragment_index); + if_packet_info.has_tsi = metadata.has_time_spec and first_fragment; + if_packet_info.has_tsf = if_packet_info.has_tsi; + if_packet_info.sob = metadata.start_of_burst and first_fragment; + if_packet_info.eob = metadata.end_of_burst and final_fragment; //send the fragment with the helper function - _send1( + const size_t num_samps_sent = _send1( state, - buffs, n*max_samples_per_packet*io_type.size, - (n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet, + buffs, total_num_samps_sent*io_type.size, + std::min(total_num_samps_unsent, max_samples_per_packet), if_packet_info, io_type, otw_type, vrt_packer, @@ -432,8 +433,10 @@ template UHD_INLINE T get_context_code( vrt_header_offset_words32, chans_per_otw_buff ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; } - return total_num_samps; + return total_num_samps_sent; } default: throw std::runtime_error("unknown send mode"); diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index 1fcf846a0..dfb65951f 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -68,12 +68,12 @@ phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){ /* NOP */ } -managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(size_t timeout_ms){ +managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(double timeout){ //allocate memory boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size]; //call recv() with timeout option - ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout_ms); + ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout); if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr @@ -138,6 +138,6 @@ phony_zero_copy_send_if::~phony_zero_copy_send_if(void){ delete [] _impl->send_mem; } -managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){ +managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(double){ return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these } diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index aee760a83..8d9c68961 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -94,12 +94,13 @@ struct usrp1_impl::io_impl{ //all of this to ensure only full buffers are committed managed_send_buffer::sptr send_buff; size_t num_bytes_committed; + double send_timeout; boost::uint8_t pseudo_buff[BYTES_PER_PACKET]; ssize_t phony_commit_pseudo_buff(size_t num_bytes); ssize_t phony_commit_send_buff(size_t num_bytes); ssize_t commit_send_buff(void); void flush_send_buff(void); - bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &); + bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &, double); //helpers to get at the send buffer + offset inline void *get_send_mem_ptr(void){ @@ -159,14 +160,15 @@ void usrp1_impl::io_impl::flush_send_buff(void){ */ ssize_t usrp1_impl::io_impl::commit_send_buff(void){ ssize_t ret = send_buff->commit(num_bytes_committed); - send_buff = data_transport->get_send_buff(); + send_buff = data_transport->get_send_buff(send_timeout); num_bytes_committed = 0; return ret; } bool usrp1_impl::io_impl::get_send_buffs( - vrt_packet_handler::managed_send_buffs_t &buffs + vrt_packet_handler::managed_send_buffs_t &buffs, double timeout ){ + send_timeout = timeout; UHD_ASSERT_THROW(buffs.size() == 1); //not enough bytes free -> use the pseudo buffer @@ -216,7 +218,7 @@ static void usrp1_bs_vrt_packer( size_t usrp1_impl::send( const std::vector &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, double timeout ){ size_t num_samps_sent = vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler @@ -225,7 +227,7 @@ size_t usrp1_impl::send( io_type, _tx_otw_type, //input and output types to convert _clock_ctrl->get_master_clock_freq(), //master clock tick rate &usrp1_bs_vrt_packer, - boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1), + boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1, timeout), get_max_send_samps_per_packet(), 0, //vrt header offset _tx_subdev_spec.size() //num channels @@ -272,18 +274,18 @@ static void usrp1_bs_vrt_unpacker( } static bool get_recv_buffs( - zero_copy_if::sptr zc_if, size_t timeout_ms, + zero_copy_if::sptr zc_if, double timeout, vrt_packet_handler::managed_recv_buffs_t &buffs ){ UHD_ASSERT_THROW(buffs.size() == 1); - buffs[0] = zc_if->get_recv_buff(timeout_ms); + buffs[0] = zc_if->get_recv_buff(timeout); return buffs[0].get() != NULL; } size_t usrp1_impl::recv( const std::vector &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, size_t timeout_ms + recv_mode_t recv_mode, double timeout ){ size_t num_samps_recvd = vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler @@ -292,7 +294,7 @@ size_t usrp1_impl::recv( io_type, _rx_otw_type, //input and output types to convert _clock_ctrl->get_master_clock_freq(), //master clock tick rate &usrp1_bs_vrt_unpacker, - boost::bind(&get_recv_buffs, _data_transport, timeout_ms, _1), + boost::bind(&get_recv_buffs, _data_transport, timeout, _1), &vrt_packet_handler::handle_overflow_nop, 0, //vrt header offset _rx_subdev_spec.size() //num channels diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index 156fc119f..6ebd6bb09 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -209,9 +209,9 @@ usrp1_impl::~usrp1_impl(void){ /* NOP */ } -bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, size_t timeout_ms){ +bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){ //dummy fill-in for the recv_async_msg - boost::this_thread::sleep(boost::posix_time::milliseconds(timeout_ms)); + boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); return false; } diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 20ae3c02a..f2c464610 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -83,13 +83,12 @@ public: size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, - send_mode_t); + send_mode_t, double); size_t recv(const std::vector &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, - recv_mode_t, - size_t timeout); + recv_mode_t, double); static const size_t BYTES_PER_PACKET = 512*4; //under the transfer size @@ -101,7 +100,7 @@ public: return BYTES_PER_PACKET/_rx_otw_type.get_sample_size()/_rx_subdev_spec.size(); } - bool recv_async_msg(uhd::async_metadata_t &, size_t); + bool recv_async_msg(uhd::async_metadata_t &, double); private: /*! diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 3395f94e2..c0d8ab029 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -33,7 +33,6 @@ using namespace uhd::transport; namespace asio = boost::asio; static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; -static const double RECV_TIMEOUT_MS = 100; /*********************************************************************** * io impl details (internal to this file) @@ -59,9 +58,9 @@ struct usrp2_impl::io_impl{ recv_pirate_crew.join_all(); } - bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){ + bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms)); + return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout); } //state management for the vrt packet handler code @@ -91,7 +90,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( size_t next_packet_seq = 0; while(recv_pirate_crew_raiding){ - managed_recv_buffer::sptr buff = zc_if->get_recv_buff(RECV_TIMEOUT_MS); + managed_recv_buffer::sptr buff = zc_if->get_recv_buff(); if (not buff.get()) continue; //ignore timeout/error buffers try{ @@ -151,7 +150,7 @@ void usrp2_impl::io_init(void){ std::memcpy(send_buff->cast(), &data, sizeof(data)); send_buff->commit(sizeof(data)); //drain the recv buffers (may have junk) - while (data_transport->get_recv_buff(RECV_TIMEOUT_MS).get()){}; + while (data_transport->get_recv_buff().get()){}; } //the number of recv frames is the number for the first transport @@ -179,12 +178,10 @@ void usrp2_impl::io_init(void){ * Async Data **********************************************************************/ bool usrp2_impl::recv_async_msg( - async_metadata_t &async_metadata, size_t timeout_ms + async_metadata_t &async_metadata, double timeout ){ boost::this_thread::disable_interruption di; //disable because the wait can throw - return _io_impl->async_msg_fifo->pop_with_timed_wait( - async_metadata, boost::posix_time::milliseconds(timeout_ms) - ); + return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); } /*********************************************************************** @@ -192,19 +189,22 @@ bool usrp2_impl::recv_async_msg( **********************************************************************/ static bool get_send_buffs( const std::vector &trans, - vrt_packet_handler::managed_send_buffs_t &buffs + vrt_packet_handler::managed_send_buffs_t &buffs, + double timeout ){ UHD_ASSERT_THROW(trans.size() == buffs.size()); + bool good = true; for (size_t i = 0; i < buffs.size(); i++){ - buffs[i] = trans[i]->get_send_buff(); + buffs[i] = trans[i]->get_send_buff(timeout); + good = good and (buffs[i].get() != NULL); } - return true; + return good; } size_t usrp2_impl::send( const std::vector &buffs, size_t num_samps, const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode + send_mode_t send_mode, double timeout ){ return vrt_packet_handler::send( _io_impl->packet_handler_send_state, //last state of the send handler @@ -213,7 +213,7 @@ size_t usrp2_impl::send( io_type, _io_helper.get_tx_otw_type(), //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_pack_be, - boost::bind(&get_send_buffs, _data_transports, _1), + boost::bind(&get_send_buffs, _data_transports, _1, timeout), get_max_send_samps_per_packet() ); } @@ -224,7 +224,7 @@ size_t usrp2_impl::send( size_t usrp2_impl::recv( const std::vector &buffs, size_t num_samps, rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, size_t timeout_ms + recv_mode_t recv_mode, double timeout ){ return vrt_packet_handler::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler @@ -233,6 +233,6 @@ size_t usrp2_impl::recv( io_type, _io_helper.get_rx_otw_type(), //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_unpack_be, - boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms) + boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout) ); } diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 157d17057..e8763b284 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -234,7 +234,7 @@ public: size_t send( const std::vector &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, - uhd::device::send_mode_t + uhd::device::send_mode_t, double ); size_t get_max_recv_samps_per_packet(void) const{ return _io_helper.get_max_recv_samps_per_packet(); @@ -242,9 +242,9 @@ public: size_t recv( const std::vector &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, - uhd::device::recv_mode_t, size_t + uhd::device::recv_mode_t, double ); - bool recv_async_msg(uhd::async_metadata_t &, size_t); + bool recv_async_msg(uhd::async_metadata_t &, double); private: //device properties interface diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp index aadb3f951..8445412e7 100644 --- a/host/test/buffer_test.cpp +++ b/host/test/buffer_test.cpp @@ -23,7 +23,7 @@ using namespace boost::assign; using namespace uhd::transport; -static const boost::posix_time::milliseconds timeout(10); +static const double timeout = 0.01/*secs*/; BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){ bounded_buffer::sptr bb(bounded_buffer::make(3)); -- cgit v1.2.3 From b57c84b34bcdd6c66eb053695b83e6bd6c481774 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 3 Oct 2010 01:41:24 -0700 Subject: uhd: implemented udp zero copy asio with async calls --- host/lib/transport/CMakeLists.txt | 2 +- host/lib/transport/libusb1_zero_copy.cpp | 6 +- host/lib/transport/udp_zero_copy_asio.cpp | 186 +++++++++++++++++++++--------- 3 files changed, 133 insertions(+), 61 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 2be2c89ec..bf92b0822 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -101,7 +101,7 @@ SET_SOURCE_FILES_PROPERTIES( LIBUHD_APPEND_SOURCES( ${CMAKE_SOURCE_DIR}/lib/transport/if_addrs.cpp ${CMAKE_SOURCE_DIR}/lib/transport/udp_simple.cpp - #${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp + ${CMAKE_SOURCE_DIR}/lib/transport/udp_zero_copy_asio.cpp ${CMAKE_SOURCE_DIR}/lib/transport/vrt_packet_handler.hpp ${CMAKE_SOURCE_DIR}/lib/transport/zero_copy.cpp ) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index e1cc8398c..c819302b6 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -277,7 +277,7 @@ libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ /*********************************************************************** * USB zero_copy device class **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy { +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this { public: typedef boost::shared_ptr sptr; @@ -378,7 +378,7 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ else { return managed_recv_buffer::make_safe( boost::asio::const_buffer(lut->buffer, lut->actual_length), - boost::bind(&libusb_zero_copy_impl::release, this, lut) + boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) ); } } @@ -398,7 +398,7 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ else { return managed_send_buffer::make_safe( boost::asio::mutable_buffer(lut->buffer, _send_xfer_size), - boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1) + boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) ); } } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 3130830a5..70e7514a1 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -17,20 +17,23 @@ #include #include //mtu +#include #include #include -#include +#include #include #include +#include #include using namespace uhd::transport; +namespace asio = boost::asio; /*********************************************************************** * Constants **********************************************************************/ //enough buffering for half a second of samples at full rate on usrp2 -static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5); +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); //Large buffers cause more underflow at high rates. //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. @@ -43,39 +46,55 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); * However, it is not a true zero copy implementation as each * send and recv requires a copy operation to/from userspace. **********************************************************************/ -class udp_zero_copy_impl: - public phony_zero_copy_recv_if, - public phony_zero_copy_send_if, - public udp_zero_copy -{ +class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this { public: - typedef boost::shared_ptr sptr; - - udp_zero_copy_impl( - const std::string &addr, - const std::string &port - ): - phony_zero_copy_recv_if(udp_simple::mtu), - phony_zero_copy_send_if(udp_simple::mtu) - { + typedef boost::shared_ptr sptr; + + udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){ //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; // resolve the address - boost::asio::ip::udp::resolver resolver(_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); - boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); // create, open, and connect the socket - _socket = new boost::asio::ip::udp::socket(_io_service); - _socket->open(boost::asio::ip::udp::v4()); + _socket = new asio::ip::udp::socket(_io_service); + _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); - _sock_fd = _socket->native(); } - ~udp_zero_copy_impl(void){ + ~udp_zero_copy_asio_impl(void){ + _io_service.stop(); + _thread_group.join_all(); delete _socket; } + /*! + * Init, the second contructor: + * Allocate memory and spwan service thread. + */ + void init(void){ + //allocate all recv frames and release them to begin xfers + _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames()); + for (size_t i = 0; i < this->get_num_recv_frames(); i++){ + boost::shared_array buff(new char[udp_simple::mtu]); + _buffers.push_back(buff); //store a reference to this shared array + release(buff.get()); + } + + //allocate all send frames and push them into the fifo + _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames()); + for (size_t i = 0; i < this->get_num_send_frames(); i++){ + boost::shared_array buff(new char[udp_simple::mtu]); + _buffers.push_back(buff); //store a reference to this shared array + handle_send(buff.get()); + } + + //spawn the service thread that will run the io service + _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this)); + } + //get size for internal socket buffer template size_t get_buff_size(void) const{ Opt option; @@ -90,60 +109,111 @@ public: return get_buff_size(); } - //The number of frames is approximately the buffer size divided by the max datagram size. //In reality, this is a phony zero-copy interface and the number of frames is infinite. //However, its sensible to advertise a frame count that is approximate to buffer size. //This way, the transport caller will have an idea about how much buffering to create. size_t get_num_recv_frames(void) const{ - return this->get_buff_size()/udp_simple::mtu; + return this->get_buff_size()/udp_simple::mtu; } size_t get_num_send_frames(void) const{ - return this->get_buff_size()/udp_simple::mtu; + return this->get_buff_size()/udp_simple::mtu; + } + + //! pop a filled recv buffer off of the fifo and bind with the release callback + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_recv_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast(buff) + ) + ); + } + return managed_recv_buffer::sptr(); + } + + //! pop an empty send buffer off of the fifo and bind with the commit callback + managed_send_buffer::sptr get_send_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_send_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::commit, + shared_from_this(), + asio::buffer_cast(buff), _1 + ) + ); + } + return managed_send_buffer::sptr(); } private: - boost::asio::ip::udp::socket *_socket; - boost::asio::io_service _io_service; - int _sock_fd; - - ssize_t recv(const boost::asio::mutable_buffer &buff, double timeout){ - //setup timeval for timeout - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = long(timeout*1e6); - - //setup rset for timeout - fd_set rset; - FD_ZERO(&rset); - FD_SET(_sock_fd, &rset); - - //call select to perform timed wait - if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0; - - return ::recv( - _sock_fd, - boost::asio::buffer_cast(buff), - boost::asio::buffer_size(buff), 0 + void service(void){ + _io_service.run(); + } + + /******************************************************************* + * The async send and receive callbacks + ******************************************************************/ + + //! handle a recv callback -> push the filled memory into the fifo + void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + + //! release a recv buffer -> start an async recv on the buffer + void release(void *mem){ + _socket->async_receive( + boost::asio::buffer(mem, udp_simple::mtu), + boost::bind( + &udp_zero_copy_asio_impl::handle_recv, + shared_from_this(), mem, + asio::placeholders::bytes_transferred + ) ); } - ssize_t send(const boost::asio::const_buffer &buff){ - return ::send( - _sock_fd, - boost::asio::buffer_cast(buff), - boost::asio::buffer_size(buff), 0 + //! handle a send callback -> push the emptied memory into the fifo + void handle_send(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu)); + } + + //! commit a send buffer -> start an async send on the buffer + void commit(void *mem, size_t len){ + _socket->async_send( + boost::asio::buffer(mem, len), + boost::bind( + &udp_zero_copy_asio_impl::handle_send, + shared_from_this(), mem + ) ); } + + //asio guts -> socket and service + asio::ip::udp::socket *_socket; + asio::io_service _io_service; + + //memory management -> buffers and fifos + boost::thread_group _thread_group; + std::vector > _buffers; + typedef bounded_buffer pending_buffs_type; + pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; }; /*********************************************************************** * UDP zero copy make function **********************************************************************/ template static void resize_buff_helper( - udp_zero_copy_impl::sptr udp_trans, + udp_zero_copy_asio_impl::sptr udp_trans, size_t target_size, const std::string &name ){ @@ -183,11 +253,13 @@ udp_zero_copy::sptr udp_zero_copy::make( size_t recv_buff_size, size_t send_buff_size ){ - udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); + udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port)); //call the helper to resize send and recv buffers - resize_buff_helper(udp_trans, recv_buff_size, "recv"); - resize_buff_helper (udp_trans, send_buff_size, "send"); + resize_buff_helper(udp_trans, recv_buff_size, "recv"); + resize_buff_helper (udp_trans, send_buff_size, "send"); + + udp_trans->init(); //buffers resized -> call init() to use return udp_trans; } -- cgit v1.2.3 From c6f17f1e2f908747ae1547fa43b2c22c3c20ba50 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Sun, 3 Oct 2010 19:34:41 -0700 Subject: uhd: changed buffer allocations to be in a single chunk, udp: pass frame sizes into the impl constructor --- host/lib/transport/libusb1_zero_copy.cpp | 27 +++++----- host/lib/transport/udp_zero_copy_asio.cpp | 85 ++++++++++++++++--------------- 2 files changed, 59 insertions(+), 53 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index c819302b6..ab48e4fc4 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -100,13 +100,13 @@ private: lut_buff_type::sptr _completed_list; //! a list of all transfer structs we allocated - std::vector _all_luts; + std::vector _all_luts; - //! a list of shared arrays for the transfer buffers - std::vector > _buffers; + //! a block of memory for the transfer buffers + boost::shared_array _buffer; // Calls for processing asynchronous I/O - libusb_transfer *allocate_transfer(int buff_len); + libusb_transfer *allocate_transfer(void *mem, size_t len); void print_transfer_status(libusb_transfer *lut); }; @@ -154,9 +154,9 @@ usb_endpoint::usb_endpoint( _input(input) { _completed_list = lut_buff_type::make(num_transfers); - + _buffer = boost::shared_array(new char[num_transfers*transfer_size]); for (size_t i = 0; i < num_transfers; i++){ - _all_luts.push_back(allocate_transfer(transfer_size)); + _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size)); //input luts are immediately submitted to be filled //output luts go into the completed list as free buffers @@ -193,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){ * Allocate a libusb transfer * The allocated transfer - and buffer it contains - is repeatedly * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer * \return pointer to an allocated libusb_transfer */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ libusb_transfer *lut = libusb_alloc_transfer(0); - - boost::shared_array buff(new boost::uint8_t[buff_len]); - _buffers.push_back(buff); //store a reference to this shared array + UHD_ASSERT_THROW(lut != NULL); unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + unsigned char *buff = reinterpret_cast(mem); libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); libusb_fill_bulk_transfer(lut, // transfer _handle->get(), // dev_handle endpoint, // endpoint - buff.get(), // buffer - buff_len, // length + buff, // buffer + len, // length lut_callback, // callback this, // user_data 0); // timeout @@ -232,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){ * \param lut pointer to an libusb_transfer */ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ + std::cout << "here " << lut->status << std::endl; switch (lut->status) { case LIBUSB_TRANSFER_COMPLETED: if (lut->actual_length < lut->length) { diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 70e7514a1..a1eb516fc 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -18,6 +18,7 @@ #include #include //mtu #include +#include #include #include #include @@ -26,6 +27,7 @@ #include #include +using namespace uhd; using namespace uhd::transport; namespace asio = boost::asio; @@ -34,11 +36,15 @@ namespace asio = boost::asio; **********************************************************************/ //enough buffering for half a second of samples at full rate on usrp2 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); + //Large buffers cause more underflow at high rates. //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); +//the number of async frames to allocate for each send and recv +static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; + /*********************************************************************** * Zero Copy UDP implementation with ASIO: * This is the portable zero copy implementation for systems @@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share public: typedef boost::shared_ptr sptr; - udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){ + udp_zero_copy_asio_impl( + const std::string &addr, const std::string &port, + size_t recv_frame_size, size_t num_recv_frames, + size_t send_frame_size, size_t num_send_frames + ): + _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), + _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) + { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - // resolve the address + //resolve the address asio::ip::udp::resolver resolver(_io_service); asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - // create, open, and connect the socket + //create, open, and connect the socket _socket = new asio::ip::udp::socket(_io_service); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); } - ~udp_zero_copy_asio_impl(void){ - _io_service.stop(); - _thread_group.join_all(); - delete _socket; - } - - /*! - * Init, the second contructor: - * Allocate memory and spwan service thread. - */ void init(void){ //allocate all recv frames and release them to begin xfers - _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames()); - for (size_t i = 0; i < this->get_num_recv_frames(); i++){ - boost::shared_array buff(new char[udp_simple::mtu]); - _buffers.push_back(buff); //store a reference to this shared array - release(buff.get()); + _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); + _recv_buffer = boost::shared_array(new char[_num_recv_frames*_recv_frame_size]); + for (size_t i = 0; i < _num_recv_frames; i++){ + release(_recv_buffer.get() + i*_recv_frame_size); } //allocate all send frames and push them into the fifo - _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames()); - for (size_t i = 0; i < this->get_num_send_frames(); i++){ - boost::shared_array buff(new char[udp_simple::mtu]); - _buffers.push_back(buff); //store a reference to this shared array - handle_send(buff.get()); + _pending_send_buffs = pending_buffs_type::make(_num_send_frames); + _send_buffer = boost::shared_array(new char[_num_send_frames*_send_frame_size]); + for (size_t i = 0; i < _num_send_frames; i++){ + handle_send(_send_buffer.get() + i*_send_frame_size); } //spawn the service thread that will run the io service _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this)); } + ~udp_zero_copy_asio_impl(void){ + _io_service.stop(); + _thread_group.join_all(); + delete _socket; + } + //get size for internal socket buffer template size_t get_buff_size(void) const{ Opt option; @@ -109,19 +116,6 @@ public: return get_buff_size(); } - //The number of frames is approximately the buffer size divided by the max datagram size. - //In reality, this is a phony zero-copy interface and the number of frames is infinite. - //However, its sensible to advertise a frame count that is approximate to buffer size. - //This way, the transport caller will have an idea about how much buffering to create. - - size_t get_num_recv_frames(void) const{ - return this->get_buff_size()/udp_simple::mtu; - } - - size_t get_num_send_frames(void) const{ - return this->get_buff_size()/udp_simple::mtu; - } - //! pop a filled recv buffer off of the fifo and bind with the release callback managed_recv_buffer::sptr get_recv_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -138,6 +132,8 @@ public: return managed_recv_buffer::sptr(); } + size_t get_num_recv_frames(void) const {return _num_recv_frames;} + //! pop an empty send buffer off of the fifo and bind with the commit callback managed_send_buffer::sptr get_send_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -154,8 +150,11 @@ public: return managed_send_buffer::sptr(); } + size_t get_num_send_frames(void) const {return _num_send_frames;} + private: void service(void){ + set_thread_priority_safe(); _io_service.run(); } @@ -172,7 +171,7 @@ private: //! release a recv buffer -> start an async recv on the buffer void release(void *mem){ _socket->async_receive( - boost::asio::buffer(mem, udp_simple::mtu), + boost::asio::buffer(mem, _recv_frame_size), boost::bind( &udp_zero_copy_asio_impl::handle_recv, shared_from_this(), mem, @@ -184,7 +183,7 @@ private: //! handle a send callback -> push the emptied memory into the fifo void handle_send(void *mem){ boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu)); + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); } //! commit a send buffer -> start an async send on the buffer @@ -204,9 +203,11 @@ private: //memory management -> buffers and fifos boost::thread_group _thread_group; - std::vector > _buffers; + boost::shared_array _send_buffer, _recv_buffer; typedef bounded_buffer pending_buffs_type; pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; }; /*********************************************************************** @@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make( size_t recv_buff_size, size_t send_buff_size ){ - udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port)); + udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl( + addr, port, + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send + )); //call the helper to resize send and recv buffers resize_buff_helper(udp_trans, recv_buff_size, "recv"); -- cgit v1.2.3 From 57ad303e850f143573d8c6edb1a88542b7b43350 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 4 Oct 2010 09:28:26 -0700 Subject: udp: added io service work to keep service running --- host/lib/transport/udp_zero_copy_asio.cpp | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index a1eb516fc..2cf7bde18 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -45,6 +45,9 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); //the number of async frames to allocate for each send and recv static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; +//a single concurrent thread for io_service seems to be the fastest +static const size_t CONCURRENCY_HINT = 1; + /*********************************************************************** * Zero Copy UDP implementation with ASIO: * This is the portable zero copy implementation for systems @@ -61,6 +64,8 @@ public: size_t recv_frame_size, size_t num_recv_frames, size_t send_frame_size, size_t num_send_frames ): + _io_service(CONCURRENCY_HINT), + _work(new asio::io_service::work(_io_service)), _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) { @@ -92,13 +97,15 @@ public: handle_send(_send_buffer.get() + i*_send_frame_size); } - //spawn the service thread that will run the io service - _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this)); + //spawn the service threads that will run the io service + for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread( + boost::bind(&udp_zero_copy_asio_impl::service, this) + ); } ~udp_zero_copy_asio_impl(void){ - _io_service.stop(); - _thread_group.join_all(); + delete _work; //allow io_service run to complete + _thread_group.join_all(); //wait for service threads to exit delete _socket; } @@ -200,6 +207,7 @@ private: //asio guts -> socket and service asio::ip::udp::socket *_socket; asio::io_service _io_service; + asio::io_service::work *_work; //memory management -> buffers and fifos boost::thread_group _thread_group; -- cgit v1.2.3 From 219183453b23a572ddd6eeec81c3bc1907754560 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 4 Oct 2010 11:30:14 -0700 Subject: uhd: added include for enable_shared_from_this when used --- host/lib/transport/libusb1_zero_copy.cpp | 2 +- host/lib/transport/udp_zero_copy_asio.cpp | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index ab48e4fc4..c7f084f62 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -280,7 +281,6 @@ libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ **********************************************************************/ class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this { public: - typedef boost::shared_ptr sptr; libusb_zero_copy_impl( libusb::device_handle::sptr handle, diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 2cf7bde18..e9d91fe45 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include using namespace uhd; -- cgit v1.2.3 From 0cd5375b5c8a928f112a963d9c9c2556bafed108 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 5 Oct 2010 10:30:28 -0700 Subject: uhd: replaced frame params for the zero copy interfaces with a device address the device address gives a key, value pair of infinite optional capabilities added a cast option to the device address to cast string to type T added call to the zero_copy_if to get send and recv frame sizes changed the usrp2 impl to calculate recv/send spp from the data transport --- host/include/uhd/transport/udp_zero_copy.hpp | 7 ++- host/include/uhd/transport/usb_control.hpp | 2 +- host/include/uhd/transport/usb_zero_copy.hpp | 17 +++---- host/include/uhd/transport/zero_copy.hpp | 26 +++++++--- host/include/uhd/types/device_addr.hpp | 20 ++++++++ host/lib/transport/libusb1_zero_copy.cpp | 67 +++++++++++-------------- host/lib/transport/udp_zero_copy_asio.cpp | 35 +++++++------ host/lib/transport/usb_dummy_impl.cpp | 4 +- host/lib/usrp/usrp1/usrp1_impl.cpp | 48 +++++++----------- host/lib/usrp/usrp2/io_impl.cpp | 4 +- host/lib/usrp/usrp2/mboard_impl.cpp | 8 +-- host/lib/usrp/usrp2/usrp2_impl.cpp | 40 ++++++--------- host/lib/usrp/usrp2/usrp2_impl.hpp | 75 ++++++++-------------------- 13 files changed, 162 insertions(+), 191 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/include/uhd/transport/udp_zero_copy.hpp b/host/include/uhd/transport/udp_zero_copy.hpp index 818709973..bbba97b21 100644 --- a/host/include/uhd/transport/udp_zero_copy.hpp +++ b/host/include/uhd/transport/udp_zero_copy.hpp @@ -20,6 +20,7 @@ #include #include +#include #include namespace uhd{ namespace transport{ @@ -50,14 +51,12 @@ public: * * \param addr a string representing the destination address * \param port a string representing the destination port - * \param recv_buff_size size in bytes for the recv buffer, 0 for automatic - * \param send_buff_size size in bytes for the send buffer, 0 for automatic + * \param hints optional parameters to pass to the underlying transport */ static sptr make( const std::string &addr, const std::string &port, - size_t recv_buff_size = 0, - size_t send_buff_size = 0 + const device_addr_t &hints = device_addr_t() ); }; diff --git a/host/include/uhd/transport/usb_control.hpp b/host/include/uhd/transport/usb_control.hpp index f9829c3ec..e6c32f78e 100644 --- a/host/include/uhd/transport/usb_control.hpp +++ b/host/include/uhd/transport/usb_control.hpp @@ -18,7 +18,7 @@ #ifndef INCLUDED_UHD_TRANSPORT_USB_CONTROL_HPP #define INCLUDED_UHD_TRANSPORT_USB_CONTROL_HPP -#include "usb_device_handle.hpp" +#include namespace uhd { namespace transport { diff --git a/host/include/uhd/transport/usb_zero_copy.hpp b/host/include/uhd/transport/usb_zero_copy.hpp index 61bf380ba..b39171fba 100644 --- a/host/include/uhd/transport/usb_zero_copy.hpp +++ b/host/include/uhd/transport/usb_zero_copy.hpp @@ -18,8 +18,9 @@ #ifndef INCLUDED_UHD_TRANSPORT_USB_ZERO_COPY_HPP #define INCLUDED_UHD_TRANSPORT_USB_ZERO_COPY_HPP -#include "usb_device_handle.hpp" +#include #include +#include namespace uhd { namespace transport { @@ -47,19 +48,13 @@ public: * \param handle a device handle that uniquely identifying the device * \param recv_endpoint an integer specifiying an IN endpoint number * \param send_endpoint an integer specifiying an OUT endpoint number - * \param recv_xfer_size the number of bytes for each receive transfer - * \param recv_num_xfers the number of simultaneous receive transfers - * \param send_xfer_size the number of bytes for each send transfer - * \param send_num_xfers the number of simultaneous send transfers + * \param hints optional parameters to pass to the underlying transport */ static sptr make( usb_device_handle::sptr handle, - unsigned int recv_endpoint, - unsigned int send_endpoint, - size_t recv_xfer_size = 0, - size_t recv_num_xfers = 0, - size_t send_xfer_size = 0, - size_t send_num_xfers = 0 + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints = device_addr_t() ); }; diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 9dd16280c..7d8fb4b83 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -155,14 +155,19 @@ namespace uhd{ namespace transport{ virtual managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1) = 0; /*! - * Get the maximum number of receive frames: - * The maximum number of valid managed recv buffers, - * or the maximum number of frames in the ring buffer, - * depending upon the underlying implementation. + * Get the number of receive frames: + * The number of simultaneous receive buffers in use. * \return number of frames */ virtual size_t get_num_recv_frames(void) const = 0; + /*! + * Get the size of a receive frame: + * The maximum capacity of a single receive buffer. + * \return frame size in bytes + */ + virtual size_t get_recv_frame_size(void) const = 0; + /*! * Get a new send buffer from this transport object. * \param timeout the timeout to get the buffer in seconds @@ -171,14 +176,19 @@ namespace uhd{ namespace transport{ virtual managed_send_buffer::sptr get_send_buff(double timeout = 0.1) = 0; /*! - * Get the maximum number of send frames: - * The maximum number of valid managed send buffers, - * or the maximum number of frames in the ring buffer, - * depending upon the underlying implementation. + * Get the number of send frames: + * The number of simultaneous send buffers in use. * \return number of frames */ virtual size_t get_num_send_frames(void) const = 0; + /*! + * Get the size of a send frame: + * The maximum capacity of a single send buffer. + * \return frame size in bytes + */ + virtual size_t get_send_frame_size(void) const = 0; + }; }} //namespace diff --git a/host/include/uhd/types/device_addr.hpp b/host/include/uhd/types/device_addr.hpp index e359d9467..eb3394230 100644 --- a/host/include/uhd/types/device_addr.hpp +++ b/host/include/uhd/types/device_addr.hpp @@ -20,6 +20,8 @@ #include #include +#include +#include #include #include @@ -62,6 +64,24 @@ namespace uhd{ * \return a string with delimiter markup */ std::string to_string(void) const; + + /*! + * Lexically cast a parameter to the specified type, + * or use the default value if the key is not found. + * \param key the key as one of the address parameters + * \param def the value to use when key is not present + * \return the casted value as type T or the default + * \throw error when the parameter cannot be casted + */ + template T cast(const std::string &key, const T &def) const{ + if (not this->has_key(key)) return def; + try{ + return boost::lexical_cast((*this)[key]); + } + catch(const boost::bad_lexical_cast &){ + throw std::runtime_error("cannot cast " + key + " = " + (*this)[key]); + } + } }; //handy typedef for a vector of device addresses diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 819874483..df6db1eb9 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -26,6 +26,7 @@ #include #include +using namespace uhd; using namespace uhd::transport; static const double CLEANUP_TIMEOUT = 0.2; //seconds @@ -284,16 +285,19 @@ public: libusb_zero_copy_impl( libusb::device_handle::sptr handle, - unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints ); managed_recv_buffer::sptr get_recv_buff(double); managed_send_buffer::sptr get_send_buff(double); - size_t get_num_recv_frames(void) const { return _recv_num_frames; } - size_t get_num_send_frames(void) const { return _send_num_frames; } + size_t get_num_recv_frames(void) const { return _num_recv_frames; } + size_t get_num_send_frames(void) const { return _num_send_frames; } + + size_t get_recv_frame_size(void) const { return _recv_frame_size; } + size_t get_send_frame_size(void) const { return _send_frame_size; } private: void release(libusb_transfer *lut){ @@ -311,8 +315,8 @@ private: } libusb::device_handle::sptr _handle; - size_t _recv_xfer_size, _send_xfer_size; - size_t _recv_num_frames, _send_num_frames; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; usb_endpoint::sptr _recv_ep, _send_ep; }; @@ -323,24 +327,16 @@ private: */ libusb_zero_copy_impl::libusb_zero_copy_impl( libusb::device_handle::sptr handle, - unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers -){ - _handle = handle; - - //if the sizes are left at 0 (automatic) -> use the defaults - if (recv_xfer_size == 0) recv_xfer_size = DEFAULT_XFER_SIZE; - if (recv_num_xfers == 0) recv_num_xfers = DEFAULT_NUM_XFERS; - if (send_xfer_size == 0) send_xfer_size = DEFAULT_XFER_SIZE; - if (send_num_xfers == 0) send_num_xfers = DEFAULT_NUM_XFERS; - - //store the num xfers for the num frames count - _recv_xfer_size = recv_xfer_size; - _recv_num_frames = recv_num_xfers; - _send_xfer_size = send_xfer_size; - _send_num_frames = send_num_xfers; - + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints +): + _handle(handle), + _recv_frame_size(size_t(hints.cast("recv_frame_size", DEFAULT_XFER_SIZE))), + _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_XFERS))), + _send_frame_size(size_t(hints.cast("send_frame_size", DEFAULT_XFER_SIZE))), + _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_XFERS))) +{ _handle->claim_interface(2 /*in interface*/); _handle->claim_interface(1 /*out interface*/); @@ -348,16 +344,16 @@ libusb_zero_copy_impl::libusb_zero_copy_impl( _handle, // libusb device_handle recv_endpoint, // USB endpoint number true, // IN endpoint - recv_xfer_size, // buffer size per transfer - recv_num_xfers // number of libusb transfers + this->get_recv_frame_size(), // buffer size per transfer + this->get_num_recv_frames() // number of libusb transfers )); _send_ep = usb_endpoint::sptr(new usb_endpoint( _handle, // libusb device_handle send_endpoint, // USB endpoint number false, // OUT endpoint - send_xfer_size, // buffer size per transfer - send_num_xfers // number of libusb transfers + this->get_send_frame_size(), // buffer size per transfer + this->get_num_send_frames() // number of libusb transfers )); } @@ -394,7 +390,7 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ } else { return managed_send_buffer::make_safe( - boost::asio::mutable_buffer(lut->buffer, _send_xfer_size), + boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) ); } @@ -405,17 +401,14 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ **********************************************************************/ usb_zero_copy::sptr usb_zero_copy::make( usb_device_handle::sptr handle, - unsigned int recv_endpoint, unsigned int send_endpoint, - size_t recv_xfer_size, size_t recv_num_xfers, - size_t send_xfer_size, size_t send_num_xfers + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints ){ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( boost::static_pointer_cast(handle)->get_device() )); return sptr(new libusb_zero_copy_impl( - dev_handle, - recv_endpoint, send_endpoint, - recv_xfer_size, recv_num_xfers, - send_xfer_size, send_num_xfers + dev_handle, recv_endpoint, send_endpoint, hints )); } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index e9d91fe45..ada282e07 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -44,7 +44,7 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); //the number of async frames to allocate for each send and recv -static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; +static const size_t DEFAULT_NUM_FRAMES = 32; //a single concurrent thread for io_service seems to be the fastest static const size_t CONCURRENCY_HINT = 1; @@ -61,14 +61,15 @@ public: typedef boost::shared_ptr sptr; udp_zero_copy_asio_impl( - const std::string &addr, const std::string &port, - size_t recv_frame_size, size_t num_recv_frames, - size_t send_frame_size, size_t num_send_frames + const std::string &addr, + const std::string &port, + const device_addr_t &hints ): - _io_service(CONCURRENCY_HINT), - _work(new asio::io_service::work(_io_service)), - _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), - _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) + _io_service(hints.cast("concurrency_hint", CONCURRENCY_HINT)), + _recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))), + _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))), + _send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))), + _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -99,6 +100,7 @@ public: } //spawn the service threads that will run the io service + _work = new asio::io_service::work(_io_service); //new work to delete later for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread( boost::bind(&udp_zero_copy_asio_impl::service, this) ); @@ -141,6 +143,7 @@ public: } size_t get_num_recv_frames(void) const {return _num_recv_frames;} + size_t get_recv_frame_size(void) const {return _recv_frame_size;} //! pop an empty send buffer off of the fifo and bind with the commit callback managed_send_buffer::sptr get_send_buff(double timeout){ @@ -159,6 +162,7 @@ public: } size_t get_num_send_frames(void) const {return _num_send_frames;} + size_t get_send_frame_size(void) const {return _send_frame_size;} private: void service(void){ @@ -260,14 +264,15 @@ template static void resize_buff_helper( udp_zero_copy::sptr udp_zero_copy::make( const std::string &addr, const std::string &port, - size_t recv_buff_size, - size_t send_buff_size + const device_addr_t &hints ){ - udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl( - addr, port, - udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv - udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send - )); + udp_zero_copy_asio_impl::sptr udp_trans( + new udp_zero_copy_asio_impl(addr, port, hints) + ); + + //extract buffer size hints from the device addr + size_t recv_buff_size = size_t(hints.cast("recv_buff_size", 0.0)); + size_t send_buff_size = size_t(hints.cast("send_buff_size", 0.0)); //call the helper to resize send and recv buffers resize_buff_helper(udp_trans, recv_buff_size, "recv"); diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp index 518342aba..8a9772e7f 100644 --- a/host/lib/transport/usb_dummy_impl.cpp +++ b/host/lib/transport/usb_dummy_impl.cpp @@ -20,6 +20,7 @@ #include #include +using namespace uhd; using namespace uhd::transport; std::vector usb_device_handle::get_device_list(boost::uint16_t, boost::uint16_t){ @@ -32,8 +33,7 @@ usb_control::sptr usb_control::make(usb_device_handle::sptr){ usb_zero_copy::sptr usb_zero_copy::make( usb_device_handle::sptr, - unsigned int, unsigned int, - size_t, size_t, size_t, size_t + size_t, size_t, const device_addr_t & ){ throw std::runtime_error("no usb support -> usb_zero_copy::make not implemented"); } diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index 6ebd6bb09..276ca86f6 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -106,17 +106,8 @@ static device_addrs_t usrp1_find(const device_addr_t &hint) /*********************************************************************** * Make **********************************************************************/ -template static output_type cast_from_dev_addr( - const device_addr_t &device_addr, - const std::string &key, - output_type def_val -){ - return (device_addr.has_key(key))? - boost::lexical_cast(device_addr[key]) : def_val; -} +static device::sptr usrp1_make(const device_addr_t &device_addr){ -static device::sptr usrp1_make(const device_addr_t &device_addr) -{ //extract the FPGA path for the USRP1 std::string usrp1_fpga_image = find_image_path( device_addr.has_key("fpga")? device_addr["fpga"] : "usrp1_fpga.rbf" @@ -127,29 +118,26 @@ static device::sptr usrp1_make(const device_addr_t &device_addr) std::vector device_list = usb_device_handle::get_device_list(USRP1_VENDOR_ID, USRP1_PRODUCT_ID); - //create data and control transports - usb_zero_copy::sptr data_transport; - usrp_ctrl::sptr usrp_ctrl; - - - BOOST_FOREACH(usb_device_handle::sptr handle, device_list) { - if (handle->get_serial() == device_addr["serial"]) { - usb_control::sptr ctrl_transport = usb_control::make(handle); - usrp_ctrl = usrp_ctrl::make(ctrl_transport); - usrp_ctrl->usrp_load_fpga(usrp1_fpga_image); - - data_transport = usb_zero_copy::make( - handle, // identifier - 6, // IN endpoint - 2, // OUT endpoint - size_t(cast_from_dev_addr(device_addr, "recv_xfer_size", 0)), - size_t(cast_from_dev_addr(device_addr, "recv_num_xfers", 0)), - size_t(cast_from_dev_addr(device_addr, "send_xfer_size", 0)), - size_t(cast_from_dev_addr(device_addr, "send_num_xfers", 0)) - ); + //locate the matching handle in the device list + usb_device_handle::sptr handle; + BOOST_FOREACH(usb_device_handle::sptr dev_handle, device_list) { + if (dev_handle->get_serial() == device_addr["serial"]){ + handle = dev_handle; break; } } + UHD_ASSERT_THROW(handle.get() != NULL); //better be found + + //create control objects and a data transport + usb_control::sptr ctrl_transport = usb_control::make(handle); + usrp_ctrl::sptr usrp_ctrl = usrp_ctrl::make(ctrl_transport); + usrp_ctrl->usrp_load_fpga(usrp1_fpga_image); + usb_zero_copy::sptr data_transport = usb_zero_copy::make( + handle, // identifier + 6, // IN endpoint + 2, // OUT endpoint + device_addr // param hints + ); //create the usrp1 implementation guts return device::sptr(new usrp1_impl(data_transport, usrp_ctrl)); diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 33cec3cbc..07eda08c3 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -206,7 +206,7 @@ size_t usrp2_impl::send( _io_impl->packet_handler_send_state, //last state of the send handler buffs, num_samps, //buffer to fill metadata, send_mode, //samples metadata - io_type, _io_helper.get_tx_otw_type(), //input and output types to convert + io_type, _tx_otw_type, //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_pack_be, boost::bind(&get_send_buffs, _data_transports, _1, timeout), @@ -226,7 +226,7 @@ size_t usrp2_impl::recv( _io_impl->packet_handler_recv_state, //last state of the recv handler buffs, num_samps, //buffer to fill metadata, recv_mode, //samples metadata - io_type, _io_helper.get_rx_otw_type(), //input and output types to convert + io_type, _rx_otw_type, //input and output types to convert _mboards.front()->get_master_clock_freq(), //master clock tick rate uhd::transport::vrt::if_hdr_unpack_be, boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout) diff --git a/host/lib/usrp/usrp2/mboard_impl.cpp b/host/lib/usrp/usrp2/mboard_impl.cpp index 0b9f8ee83..a0e6adfad 100644 --- a/host/lib/usrp/usrp2/mboard_impl.cpp +++ b/host/lib/usrp/usrp2/mboard_impl.cpp @@ -38,10 +38,10 @@ using namespace uhd::usrp; usrp2_mboard_impl::usrp2_mboard_impl( size_t index, transport::udp_simple::sptr ctrl_transport, - const usrp2_io_helper &io_helper + size_t recv_frame_size ): _index(index), - _io_helper(io_helper) + _recv_frame_size(recv_frame_size) { //make a new interface for usrp2 stuff _iface = usrp2_iface::make(ctrl_transport); @@ -75,7 +75,7 @@ usrp2_mboard_impl::usrp2_mboard_impl( this->issue_ddc_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS); //init the rx control registers - _iface->poke32(U2_REG_RX_CTRL_NSAMPS_PER_PKT, _io_helper.get_max_recv_samps_per_packet()); + _iface->poke32(U2_REG_RX_CTRL_NSAMPS_PER_PKT, _recv_frame_size); _iface->poke32(U2_REG_RX_CTRL_NCHANNELS, 1); _iface->poke32(U2_REG_RX_CTRL_CLEAR_OVERRUN, 1); //reset _iface->poke32(U2_REG_RX_CTRL_VRT_HEADER, 0 @@ -178,7 +178,7 @@ void usrp2_mboard_impl::set_time_spec(const time_spec_t &time_spec, bool now){ void usrp2_mboard_impl::issue_ddc_stream_cmd(const stream_cmd_t &stream_cmd){ _iface->poke32(U2_REG_RX_CTRL_STREAM_CMD, dsp_type1::calc_stream_cmd_word( - stream_cmd, _io_helper.get_max_recv_samps_per_packet() + stream_cmd, _recv_frame_size )); _iface->poke32(U2_REG_RX_CTRL_TIME_SECS, boost::uint32_t(stream_cmd.time_spec.get_full_secs())); _iface->poke32(U2_REG_RX_CTRL_TIME_TICKS, stream_cmd.time_spec.get_tick_count(get_master_clock_freq())); diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 568c87a22..a680708ad 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -124,26 +124,7 @@ static uhd::device_addrs_t usrp2_find(const device_addr_t &hint){ /*********************************************************************** * Make **********************************************************************/ -template -out_type lexical_cast(const in_type &in){ - try{ - return boost::lexical_cast(in); - }catch(...){ - throw std::runtime_error(str(boost::format( - "failed to cast \"%s\" to type \"%s\"" - ) % boost::lexical_cast(in) % typeid(out_type).name())); - } -} - static device::sptr usrp2_make(const device_addr_t &device_addr){ - //extract the receive and send buffer sizes - size_t recv_buff_size = 0, send_buff_size= 0 ; - if (device_addr.has_key("recv_buff_size")){ - recv_buff_size = size_t(lexical_cast(device_addr["recv_buff_size"])); - } - if (device_addr.has_key("send_buff_size")){ - send_buff_size = size_t(lexical_cast(device_addr["send_buff_size"])); - } //create a ctrl and data transport for each address std::vector ctrl_transports; @@ -154,8 +135,7 @@ static device::sptr usrp2_make(const device_addr_t &device_addr){ addr, num2str(USRP2_UDP_CTRL_PORT) )); data_transports.push_back(udp_zero_copy::make( - addr, num2str(USRP2_UDP_DATA_PORT), - recv_buff_size, send_buff_size + addr, num2str(USRP2_UDP_DATA_PORT), device_addr )); } @@ -178,11 +158,23 @@ usrp2_impl::usrp2_impl( ): _data_transports(data_transports) { + //setup rx otw type + _rx_otw_type.width = 16; + _rx_otw_type.shift = 0; + _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + //setup tx otw type + _tx_otw_type.width = 16; + _tx_otw_type.shift = 0; + _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + //!!!!! set the otw type here before continuing, its used below + //create a new mboard handler for each control transport for(size_t i = 0; i < ctrl_transports.size(); i++){ - _mboards.push_back(usrp2_mboard_impl::sptr( - new usrp2_mboard_impl(i, ctrl_transports[i], _io_helper) - )); + _mboards.push_back(usrp2_mboard_impl::sptr(new usrp2_mboard_impl( + i, ctrl_transports[i], this->get_max_recv_samps_per_packet() + ))); //use an empty name when there is only one mboard std::string name = (ctrl_transports.size() > 1)? boost::lexical_cast(i) : ""; _mboard_dict[name] = _mboards.back(); diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index e8763b284..e12c4d6d4 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -71,54 +71,6 @@ private: void set(const wax::obj &key, const wax::obj &val){return _set(key, val);} }; -/*! - * The io helper class encapculates the max packet sizes and otw types. - * The otw types are read-only for now, this will be reimplemented - * when it becomes possible to change the otw type in the usrp2. - */ -class usrp2_io_helper{ -public: - usrp2_io_helper(void){ - //setup rx otw type - _rx_otw_type.width = 16; - _rx_otw_type.shift = 0; - _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - - //setup tx otw type - _tx_otw_type.width = 16; - _tx_otw_type.shift = 0; - _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - } - - inline size_t get_max_send_samps_per_packet(void) const{ - return _max_tx_bytes_per_packet/_tx_otw_type.get_sample_size(); - } - - inline size_t get_max_recv_samps_per_packet(void) const{ - return _max_rx_bytes_per_packet/_rx_otw_type.get_sample_size(); - } - - inline const uhd::otw_type_t &get_rx_otw_type(void) const{ - return _rx_otw_type; - } - - inline const uhd::otw_type_t &get_tx_otw_type(void) const{ - return _tx_otw_type; - } - -private: - uhd::otw_type_t _rx_otw_type, _tx_otw_type; - static const size_t _max_rx_bytes_per_packet = uhd::transport::udp_simple::mtu - - uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) - - sizeof(uhd::transport::vrt::if_packet_info_t().tlr) //forced to have trailer - + sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used - ; - static const size_t _max_tx_bytes_per_packet = uhd::transport::udp_simple::mtu - - uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) - + sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used - ; -}; - /*! * USRP2 mboard implementation guts: * The implementation details are encapsulated here. @@ -129,7 +81,11 @@ public: typedef boost::shared_ptr sptr; //structors - usrp2_mboard_impl(size_t index, uhd::transport::udp_simple::sptr, const usrp2_io_helper &); + usrp2_mboard_impl( + size_t index, + uhd::transport::udp_simple::sptr, + size_t recv_frame_size + ); ~usrp2_mboard_impl(void); inline double get_master_clock_freq(void){ @@ -139,7 +95,7 @@ public: private: size_t _index; int _rev_hi, _rev_lo; - const usrp2_io_helper &_io_helper; + const size_t _recv_frame_size; //properties for this mboard void get(const wax::obj &, wax::obj &); @@ -229,7 +185,8 @@ public: //the io interface size_t get_max_send_samps_per_packet(void) const{ - return _io_helper.get_max_send_samps_per_packet(); + const size_t bytes_per_packet = _data_transports.front()->get_send_frame_size() - _max_tx_header_bytes; + return bytes_per_packet/_tx_otw_type.get_sample_size(); } size_t send( const std::vector &, size_t, @@ -237,7 +194,8 @@ public: uhd::device::send_mode_t, double ); size_t get_max_recv_samps_per_packet(void) const{ - return _io_helper.get_max_recv_samps_per_packet(); + const size_t bytes_per_packet = _data_transports.front()->get_recv_frame_size() - _max_rx_header_bytes; + return bytes_per_packet/_rx_otw_type.get_sample_size(); } size_t recv( const std::vector &, size_t, @@ -257,9 +215,20 @@ private: //io impl methods and members std::vector _data_transports; - const usrp2_io_helper _io_helper; UHD_PIMPL_DECL(io_impl) _io_impl; void io_init(void); + + //over-the-wire structs and constants + uhd::otw_type_t _rx_otw_type, _tx_otw_type; + static const size_t _max_rx_header_bytes = 0 + + uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) + + sizeof(uhd::transport::vrt::if_packet_info_t().tlr) //forced to have trailer + - sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used + ; + static const size_t _max_tx_header_bytes = 0 + + uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) + - sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used + ; }; #endif /* INCLUDED_USRP2_IMPL_HPP */ -- cgit v1.2.3 From d2494b0313399b141913ad332315fefbba012e94 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 5 Oct 2010 12:50:53 -0700 Subject: uhd: transport docs for UDP and USB (moved from usrp docs) --- host/docs/CMakeLists.txt | 1 + host/docs/index.rst | 9 ++-- host/docs/transport.rst | 81 +++++++++++++++++++++++++++++++ host/docs/usrp1.rst | 23 --------- host/docs/usrp2.rst | 41 ---------------- host/lib/transport/udp_zero_copy_asio.cpp | 5 +- 6 files changed, 91 insertions(+), 69 deletions(-) create mode 100644 host/docs/transport.rst (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/docs/CMakeLists.txt b/host/docs/CMakeLists.txt index bbb8812b0..65db3befc 100644 --- a/host/docs/CMakeLists.txt +++ b/host/docs/CMakeLists.txt @@ -25,6 +25,7 @@ SET(manual_sources dboards.rst general.rst images.rst + transport.rst usrp1.rst usrp2.rst ) diff --git a/host/docs/index.rst b/host/docs/index.rst index bd55edc0b..7f8129e2d 100644 --- a/host/docs/index.rst +++ b/host/docs/index.rst @@ -20,11 +20,12 @@ Building the UHD ^^^^^^^^^^^^^^^^^^^^^ Application Notes ^^^^^^^^^^^^^^^^^^^^^ -* `General App Notes <./general.html>`_ +* `General Application Notes <./general.html>`_ * `Firmware and FPGA Image Notes <./images.html>`_ -* `USRP1 App Notes <./usrp1.html>`_ -* `USRP2 App Notes <./usrp2.html>`_ -* `Daughterboard App Notes <./dboards.html>`_ +* `USRP1 Application Notes <./usrp1.html>`_ +* `USRP2 Application Notes <./usrp2.html>`_ +* `Daughterboard Application Notes <./dboards.html>`_ +* `Transport Application Notes <./transport.html>`_ ^^^^^^^^^^^^^^^^^^^^^ API Documentation diff --git a/host/docs/transport.rst b/host/docs/transport.rst new file mode 100644 index 000000000..d6a146c67 --- /dev/null +++ b/host/docs/transport.rst @@ -0,0 +1,81 @@ +======================================================================== +UHD - Transport Application Notes +======================================================================== + +.. contents:: Table of Contents + +The advanced user can pass optional parameters +into the underlying transport layer through the device address. +These optional parameters control how the transport object allocates memory, +resizes kernel buffers, spawns threads, etc. +When not spcified, the transport layer will use values for these parameters +that are known to perform well on a variety of systems. +The transport parameters are defined below for the various transports in the UHD: + +------------------------------------------------------------------------ +UDP transport (ASIO) +------------------------------------------------------------------------ +The UDP transport is implemented with Boost's ASIO library. +ASIO provides an asynchronous API for user-space sockets. +The transport implementation allocates a number of buffers +and submits asynchronous requests for send and receive. +IO service threads run in the background to process these requests. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Transport parameters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The following parameters can be used to alter the transport's default behavior: + +* **recv_frame_size:** The size of a single receive buffer in bytes +* **num_recv_frames:** The number of receive buffers to allocate +* **send_frame_size:** The size of a single send buffer in bytes +* **num_send_frames:** The number of send buffers to allocate +* **concurrency_hint:** The number of threads to run the IO service + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Resize socket buffers +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +It may be useful increase the size of the socket buffers to +move the burden of buffering samples into the kernel, or to +buffer incoming samples faster than they can be processed. +However, if your application cannot process samples fast enough, +no amount of buffering can save you. +The following parameters can be used to alter socket's buffer sizes: + +* **recv_buff_size:** The desired size of the receive buffer in bytes +* **send_buff_size:** The desired size of the send buffer in bytes + +**Note:** Large send buffers tend to decrease transmit performance. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Linux specific notes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +On linux, the maximum buffer sizes are capped by the sysctl values +**net.core.rmem_max** and **net.core.wmem_max**. +To change the maximum values, run the following commands: +:: + + sudo sysctl -w net.core.rmem_max= + sudo sysctl -w net.core.wmem_max= + +Set the values permanently by editing */etc/sysctl.conf* + +------------------------------------------------------------------------ +USB transport (libusb) +------------------------------------------------------------------------ +The USB transport is implemented with libusb. +Libusb provides an asynchronous API for USB bulk transfers. +The transport implementation allocates a number of buffers +and submits asynchronous requests through libusb. +A single thread runs in the background +and executes the libusb event handler to process these requests. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Transport parameters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The following parameters can be used to alter the transport's default behavior: + +* **recv_frame_size:** The size of a single receive transfers in bytes +* **num_recv_frames:** The number of simultaneous receive transfers +* **send_frame_size:** The size of a single send transfers in bytes +* **num_send_frames:** The number of simultaneous send transfers diff --git a/host/docs/usrp1.rst b/host/docs/usrp1.rst index 0baa93a45..3443fd871 100644 --- a/host/docs/usrp1.rst +++ b/host/docs/usrp1.rst @@ -60,29 +60,6 @@ Example device address string representations to specify non-standard firmware a fpga=usrp1_fpga_4rx.rbf, fw=usrp1_fw_custom.ihx -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Change USB transfer parameters -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The advanced user may manipulate parameters of the usb bulk transfers -for various reasons, such as lowering latency or increasing buffer size. -By default, the UHD will use values for these parameters -that are known to perform well on a variety of systems. -The following device address parameters can be used to manipulate USB bulk transfers: - -* **recv_xfer_size:** the size of each receive bulk transfer in bytes -* **recv_num_xfers:** the number of simultaneous receive bulk transfers -* **send_xfer_size:** the size of each send bulk transfer in bytes -* **send_num_xfers:** the number of simultaneous send bulk transfers - -Example usage, set the device address markup string to the following: -:: - - serial=12345678, recv_num_xfers=16 - - -- OR -- - - serial=12345678, recv_xfer_size=2048, recv_num_xfers=16 - ------------------------------------------------------------------------ Specifying the subdevice to use ------------------------------------------------------------------------ diff --git a/host/docs/usrp2.rst b/host/docs/usrp2.rst index 70e5ea28b..1ebab388a 100644 --- a/host/docs/usrp2.rst +++ b/host/docs/usrp2.rst @@ -165,47 +165,6 @@ The device address string representation for 2 USRP2s with IPv4 addresses 192.16 addr=192.168.10.2 192.168.20.2 ------------------------------------------------------------------------- -Resize the send and receive buffers ------------------------------------------------------------------------- -It may be useful increase the size of the socket buffers to -move the burden of buffering samples into the kernel, or to -buffer incoming samples faster than they can be processed. -However, if you application cannot process samples fast enough, -no amount of buffering can save you. - -By default, the UHD will try to resize both the send and receive buffer for optimum performance. -A warning will be printed on instantiation if the actual buffer size is insufficient. -See the OS specific notes below: - -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -OS specific notes -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -On linux, the maximum buffer sizes are capped by the sysctl values -**net.core.rmem_max** and **net.core.wmem_max**. -To change the maximum values, run the following commands: -:: - - sudo sysctl -w net.core.rmem_max= - sudo sysctl -w net.core.wmem_max= - -Set the values permanently by editing */etc/sysctl.conf* - -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Device address params -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -To manually set the size of the buffers, -the usrp2 will accept two optional parameters in the device address. -Each parameter will accept a numeric value for the number of bytes. - -* recv_buff_size -* send_buff_size - -Example usage, set the device address markup string to the following: -:: - - addr=192.168.10.2, recv_buff_size=100e6 - ------------------------------------------------------------------------ Hardware setup notes ------------------------------------------------------------------------ diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index ada282e07..a87f17a91 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -248,7 +248,10 @@ template static void resize_buff_helper( if (actual_size < target_size) uhd::print_warning(str(boost::format( "The %s buffer is smaller than the requested size.\n" "The minimum recommended buffer size is %d bytes.\n" - "See the USRP2 application notes on buffer resizing.\n" + "See the transport application notes on buffer resizing.\n" + #if defined(UHD_PLATFORM_LINUX) + "On Linux: sudo sysctl -w net.core.rmem_max=%2%\n" + #endif /*defined(UHD_PLATFORM_LINUX)*/ ) % name % min_sock_buff_size)); } -- cgit v1.2.3 From 7c127b6c82db773ca2dcab372ab28b4dc4e5b347 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 6 Oct 2010 10:34:10 -0700 Subject: udp: fixed boost format syntax for warning message --- host/lib/transport/udp_zero_copy_asio.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index a87f17a91..7e28caf2d 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -246,11 +246,11 @@ template static void resize_buff_helper( "Current %s sock buff size: %d bytes" ) % name % actual_size << std::endl; if (actual_size < target_size) uhd::print_warning(str(boost::format( - "The %s buffer is smaller than the requested size.\n" - "The minimum recommended buffer size is %d bytes.\n" + "The %1% buffer is smaller than the requested size.\n" + "The minimum recommended buffer size is %2% bytes.\n" "See the transport application notes on buffer resizing.\n" #if defined(UHD_PLATFORM_LINUX) - "On Linux: sudo sysctl -w net.core.rmem_max=%2%\n" + "Please run: sudo sysctl -w net.core.rmem_max=%2%\n" #endif /*defined(UHD_PLATFORM_LINUX)*/ ) % name % min_sock_buff_size)); } -- cgit v1.2.3 From 952db7702f52e34fa05b8795abe46fb22add137d Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 7 Oct 2010 16:31:41 -0700 Subject: udp: implementation for blocking recv w/ timeout, switch async implementation w/ #define --- host/lib/transport/udp_zero_copy_asio.cpp | 140 +++++++++++++++++++++--------- 1 file changed, 101 insertions(+), 39 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 7e28caf2d..798cc657d 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -35,6 +35,10 @@ namespace asio = boost::asio; /*********************************************************************** * Constants **********************************************************************/ +//Define this to the the boost async io calls to perform receive. +//Otherwise, get_recv_buff uses a blocking receive with timeout. +//#define USE_ASIO_ASYNC_RECV + //enough buffering for half a second of samples at full rate on usrp2 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); @@ -43,8 +47,15 @@ static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); -//the number of async frames to allocate for each send and recv -static const size_t DEFAULT_NUM_FRAMES = 32; +//The number of async frames to allocate for each send and recv: +//The non-async recv can have a very large number of recv frames +//because the CPU overhead is independent of the number of frames. +#ifdef USE_ASIO_ASYNC_RECV +static const size_t DEFAULT_NUM_RECV_FRAMES = 32; +#else +static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu; +#endif +static const size_t DEFAULT_NUM_SEND_FRAMES = 32; //a single concurrent thread for io_service seems to be the fastest static const size_t CONCURRENCY_HINT = 1; @@ -67,9 +78,9 @@ public: ): _io_service(hints.cast("concurrency_hint", CONCURRENCY_HINT)), _recv_frame_size(size_t(hints.cast("recv_frame_size", udp_simple::mtu))), - _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_FRAMES))), + _num_recv_frames(size_t(hints.cast("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))), _send_frame_size(size_t(hints.cast("send_frame_size", udp_simple::mtu))), - _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_FRAMES))) + _num_send_frames(size_t(hints.cast("num_send_frames", DEFAULT_NUM_SEND_FRAMES))) { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -82,6 +93,13 @@ public: _socket = new asio::ip::udp::socket(_io_service); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); + _sock_fd = _socket->native(); + } + + ~udp_zero_copy_asio_impl(void){ + delete _work; //allow io_service run to complete + _thread_group.join_all(); //wait for service threads to exit + delete _socket; } void init(void){ @@ -106,10 +124,9 @@ public: ); } - ~udp_zero_copy_asio_impl(void){ - delete _work; //allow io_service run to complete - _thread_group.join_all(); //wait for service threads to exit - delete _socket; + void service(void){ + set_thread_priority_safe(); + _io_service.run(); } //get size for internal socket buffer @@ -126,6 +143,9 @@ public: return get_buff_size(); } + //////////////////////////////////////////////////////////////////// + #ifdef USE_ASIO_ASYNC_RECV + //////////////////////////////////////////////////////////////////// //! pop a filled recv buffer off of the fifo and bind with the release callback managed_recv_buffer::sptr get_recv_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -142,6 +162,74 @@ public: return managed_recv_buffer::sptr(); } + //! handle a recv callback -> push the filled memory into the fifo + void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + + //! release a recv buffer -> start an async recv on the buffer + void release(void *mem){ + _socket->async_receive( + boost::asio::buffer(mem, _recv_frame_size), + boost::bind( + &udp_zero_copy_asio_impl::handle_recv, + shared_from_this(), mem, + asio::placeholders::bytes_transferred + ) + ); + } + + //////////////////////////////////////////////////////////////////// + #else /*USE_ASIO_ASYNC_RECV*/ + //////////////////////////////////////////////////////////////////// + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = timeout*1e6; + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(_sock_fd, &rset); + + //call select to perform timed wait + if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) + return managed_recv_buffer::sptr(); + + //grab an available buffer + asio::mutable_buffer buff; + if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout)) + return managed_recv_buffer::sptr(); + + //receive and return the buffer + return managed_recv_buffer::make_safe( + asio::buffer( + boost::asio::buffer_cast(buff), + _socket->receive(boost::asio::buffer(buff)) + ), + boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast(buff) + ) + ); + } + + void release(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait( + boost::asio::buffer(mem, this->get_recv_frame_size()) + ); + } + + //////////////////////////////////////////////////////////////////// + #endif /*USE_ASIO_ASYNC_RECV*/ + //////////////////////////////////////////////////////////////////// + size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} @@ -161,37 +249,6 @@ public: return managed_send_buffer::sptr(); } - size_t get_num_send_frames(void) const {return _num_send_frames;} - size_t get_send_frame_size(void) const {return _send_frame_size;} - -private: - void service(void){ - set_thread_priority_safe(); - _io_service.run(); - } - - /******************************************************************* - * The async send and receive callbacks - ******************************************************************/ - - //! handle a recv callback -> push the filled memory into the fifo - void handle_recv(void *mem, size_t len){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); - } - - //! release a recv buffer -> start an async recv on the buffer - void release(void *mem){ - _socket->async_receive( - boost::asio::buffer(mem, _recv_frame_size), - boost::bind( - &udp_zero_copy_asio_impl::handle_recv, - shared_from_this(), mem, - asio::placeholders::bytes_transferred - ) - ); - } - //! handle a send callback -> push the emptied memory into the fifo void handle_send(void *mem){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -209,10 +266,15 @@ private: ); } + size_t get_num_send_frames(void) const {return _num_send_frames;} + size_t get_send_frame_size(void) const {return _send_frame_size;} + +private: //asio guts -> socket and service asio::ip::udp::socket *_socket; asio::io_service _io_service; asio::io_service::work *_work; + asio::ip::udp::socket::native_type _sock_fd; //memory management -> buffers and fifos boost::thread_group _thread_group; -- cgit v1.2.3 From 201af62ccb4d1065a44822f0f3ce4c81755510b5 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Thu, 7 Oct 2010 16:34:29 -0700 Subject: udp: fix msvc errors for udp transport --- host/lib/transport/udp_zero_copy_asio.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 798cc657d..40fe3fa0f 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -189,7 +189,7 @@ public: //setup timeval for timeout timeval tv; tv.tv_sec = 0; - tv.tv_usec = timeout*1e6; + tv.tv_usec = long(timeout*1e6); //setup rset for timeout fd_set rset; @@ -274,7 +274,7 @@ private: asio::ip::udp::socket *_socket; asio::io_service _io_service; asio::io_service::work *_work; - asio::ip::udp::socket::native_type _sock_fd; + int _sock_fd; //memory management -> buffers and fifos boost::thread_group _thread_group; -- cgit v1.2.3 From e8e9258acb8ce8499c40de52abaa4e6ba83d479d Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Fri, 8 Oct 2010 11:20:30 -0700 Subject: udp: worked blocking send back into udp transport, enable async with #define tweaked some code in bounded buffer --- host/include/uhd/transport/alignment_buffer.ipp | 8 +- host/include/uhd/transport/bounded_buffer.ipp | 39 +++++--- host/lib/transport/udp_zero_copy_asio.cpp | 114 +++++++++++++++--------- 3 files changed, 99 insertions(+), 62 deletions(-) (limited to 'host/lib/transport/udp_zero_copy_asio.cpp') diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp index 5f09de0d9..833b5d399 100644 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ b/host/include/uhd/transport/alignment_buffer.ipp @@ -54,14 +54,14 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ UHD_INLINE bool pop_elems_with_timed_wait( std::vector &elems, double timeout ){ - boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6)); + boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); buff_contents_type buff_contents_tmp; std::list indexes_to_do(_all_indexes); //do an initial pop to load an initial sequence id size_t index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() + buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) )) return false; elems[index] = buff_contents_tmp.first; seq_type expected_seq_id = buff_contents_tmp.second; @@ -76,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ indexes_to_do = _all_indexes; index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() + buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) )) return false; elems[index] = buff_contents_tmp.first; expected_seq_id = buff_contents_tmp.second; @@ -86,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ //pop an element off for this index index = indexes_to_do.front(); if (not _buffs[index]->pop_with_timed_wait( - buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() + buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time()) )) return false; //if the sequence id matches: diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 58f78bab4..edc7faa06 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -19,18 +19,28 @@ #define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP #include +#include #include #include #include namespace uhd{ namespace transport{ namespace{ /*anon*/ + static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ + return boost::posix_time::microseconds(long(timeout*1e6)); + } + + static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ + return 1e-6*time_dur.total_microseconds(); + } + template class bounded_buffer_impl : public bounded_buffer{ public: bounded_buffer_impl(size_t capacity) : _buffer(capacity){ - /* NOP */ + _not_full_fcn = boost::bind(&bounded_buffer_impl::not_full, this); + _not_empty_fcn = boost::bind(&bounded_buffer_impl::not_empty, this); } UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ @@ -52,17 +62,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ UHD_INLINE void push_with_wait(const elem_type &elem){ boost::unique_lock lock(_mutex); - _full_cond.wait(lock, boost::bind(&bounded_buffer_impl::not_full, this)); + _full_cond.wait(lock, _not_full_fcn); _buffer.push_front(elem); lock.unlock(); _empty_cond.notify_one(); } - bool push_with_timed_wait(const elem_type &elem, double timeout){ + UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){ boost::unique_lock lock(_mutex); if (not _full_cond.timed_wait( - lock, boost::posix_time::microseconds(long(timeout*1e6)), - boost::bind(&bounded_buffer_impl::not_full, this) + lock, to_time_dur(timeout), _not_full_fcn )) return false; _buffer.push_front(elem); lock.unlock(); @@ -72,19 +81,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ UHD_INLINE void pop_with_wait(elem_type &elem){ boost::unique_lock lock(_mutex); - _empty_cond.wait(lock, boost::bind(&bounded_buffer_impl::not_empty, this)); - this->pop_back(elem); + _empty_cond.wait(lock, _not_empty_fcn); + elem = this->pop_back(); lock.unlock(); _full_cond.notify_one(); } - bool pop_with_timed_wait(elem_type &elem, double timeout){ + UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){ boost::unique_lock lock(_mutex); if (not _empty_cond.timed_wait( - lock, boost::posix_time::microseconds(long(timeout*1e6)), - boost::bind(&bounded_buffer_impl::not_empty, this) + lock, to_time_dur(timeout), _not_empty_fcn )) return false; - this->pop_back(elem); + elem = this->pop_back(); lock.unlock(); _full_cond.notify_one(); return true; @@ -92,7 +100,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ UHD_INLINE void clear(void){ boost::unique_lock lock(_mutex); - while (not_empty()) _buffer.pop_back(); + while (not_empty()) this->pop_back(); lock.unlock(); _full_cond.notify_one(); } @@ -105,16 +113,19 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/ bool not_full(void) const{return not _buffer.full();} bool not_empty(void) const{return not _buffer.empty();} + boost::function _not_full_fcn, _not_empty_fcn; + /*! * Three part operation to pop an element: * 1) assign elem to the back element * 2) assign the back element to empty * 3) pop the back to move the counter */ - UHD_INLINE void pop_back(elem_type &elem){ - elem = _buffer.back(); + UHD_INLINE elem_type pop_back(void){ + elem_type elem = _buffer.back(); _buffer.back() = elem_type(); _buffer.pop_back(); + return elem; } }; }}} //namespace diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 40fe3fa0f..d84aeefdd 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -39,6 +39,10 @@ namespace asio = boost::asio; //Otherwise, get_recv_buff uses a blocking receive with timeout. //#define USE_ASIO_ASYNC_RECV +//Define this to the the boost async io calls to perform send. +//Otherwise, the commit callback uses a blocking send. +//#define USE_ASIO_ASYNC_SEND + //enough buffering for half a second of samples at full rate on usrp2 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); @@ -55,7 +59,13 @@ static const size_t DEFAULT_NUM_RECV_FRAMES = 32; #else static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu; #endif +//The non-async send only ever requires a single frame +//because the buffer will be committed before a new get. +#ifdef USE_ASIO_ASYNC_SEND static const size_t DEFAULT_NUM_SEND_FRAMES = 32; +#else +static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;; +#endif //a single concurrent thread for io_service seems to be the fastest static const size_t CONCURRENCY_HINT = 1; @@ -143,6 +153,12 @@ public: return get_buff_size(); } + //! handle a recv callback -> push the filled memory into the fifo + UHD_INLINE void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + //////////////////////////////////////////////////////////////////// #ifdef USE_ASIO_ASYNC_RECV //////////////////////////////////////////////////////////////////// @@ -162,16 +178,10 @@ public: return managed_recv_buffer::sptr(); } - //! handle a recv callback -> push the filled memory into the fifo - void handle_recv(void *mem, size_t len){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); - } - //! release a recv buffer -> start an async recv on the buffer void release(void *mem){ _socket->async_receive( - boost::asio::buffer(mem, _recv_frame_size), + boost::asio::buffer(mem, this->get_recv_frame_size()), boost::bind( &udp_zero_copy_asio_impl::handle_recv, shared_from_this(), mem, @@ -185,6 +195,7 @@ public: //////////////////////////////////////////////////////////////////// managed_recv_buffer::sptr get_recv_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; //setup timeval for timeout timeval tv; @@ -196,34 +207,30 @@ public: FD_ZERO(&rset); FD_SET(_sock_fd, &rset); - //call select to perform timed wait - if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) - return managed_recv_buffer::sptr(); - - //grab an available buffer - asio::mutable_buffer buff; - if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout)) - return managed_recv_buffer::sptr(); - - //receive and return the buffer - return managed_recv_buffer::make_safe( - asio::buffer( - boost::asio::buffer_cast(buff), - _socket->receive(boost::asio::buffer(buff)) - ), - boost::bind( - &udp_zero_copy_asio_impl::release, - shared_from_this(), - asio::buffer_cast(buff) - ) - ); + //call select to perform timed wait and grab an available buffer with wait + //if the condition is true, call receive and return the managed buffer + if ( + ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and + _pending_recv_buffs->pop_with_timed_wait(buff, timeout) + ){ + return managed_recv_buffer::make_safe( + asio::buffer( + boost::asio::buffer_cast(buff), + _socket->receive(asio::buffer(buff)) + ), + boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast(buff) + ) + ); + } + return managed_recv_buffer::sptr(); } void release(void *mem){ boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_recv_buffs->push_with_wait( - boost::asio::buffer(mem, this->get_recv_frame_size()) - ); + handle_recv(mem, this->get_recv_frame_size()); } //////////////////////////////////////////////////////////////////// @@ -233,6 +240,12 @@ public: size_t get_num_recv_frames(void) const {return _num_recv_frames;} size_t get_recv_frame_size(void) const {return _recv_frame_size;} + //! handle a send callback -> push the emptied memory into the fifo + UHD_INLINE void handle_send(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); + } + //! pop an empty send buffer off of the fifo and bind with the commit callback managed_send_buffer::sptr get_send_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -249,12 +262,9 @@ public: return managed_send_buffer::sptr(); } - //! handle a send callback -> push the emptied memory into the fifo - void handle_send(void *mem){ - boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); - } - + //////////////////////////////////////////////////////////////////// + #ifdef USE_ASIO_ASYNC_SEND + //////////////////////////////////////////////////////////////////// //! commit a send buffer -> start an async send on the buffer void commit(void *mem, size_t len){ _socket->async_send( @@ -266,6 +276,18 @@ public: ); } + //////////////////////////////////////////////////////////////////// + #else /*USE_ASIO_ASYNC_SEND*/ + //////////////////////////////////////////////////////////////////// + void commit(void *mem, size_t len){ + _socket->send(asio::buffer(mem, len)); + handle_send(mem); + } + + //////////////////////////////////////////////////////////////////// + #endif /*USE_ASIO_ASYNC_SEND*/ + //////////////////////////////////////////////////////////////////// + size_t get_num_send_frames(void) const {return _num_send_frames;} size_t get_send_frame_size(void) const {return _send_frame_size;} @@ -297,6 +319,13 @@ template static void resize_buff_helper( if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE; if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE; + std::string help_message; + #if defined(UHD_PLATFORM_LINUX) + help_message = str(boost::format( + "Please run: sudo sysctl -w net.core.%smem_max=%d\n" + ) % ((name == "recv")?"r":"w") % min_sock_buff_size); + #endif /*defined(UHD_PLATFORM_LINUX)*/ + //resize the buffer if size was provided if (target_size > 0){ size_t actual_size = udp_trans->resize_buff(target_size); @@ -308,13 +337,10 @@ template static void resize_buff_helper( "Current %s sock buff size: %d bytes" ) % name % actual_size << std::endl; if (actual_size < target_size) uhd::print_warning(str(boost::format( - "The %1% buffer is smaller than the requested size.\n" - "The minimum recommended buffer size is %2% bytes.\n" - "See the transport application notes on buffer resizing.\n" - #if defined(UHD_PLATFORM_LINUX) - "Please run: sudo sysctl -w net.core.rmem_max=%2%\n" - #endif /*defined(UHD_PLATFORM_LINUX)*/ - ) % name % min_sock_buff_size)); + "The %s buffer is smaller than the requested size.\n" + "The minimum recommended buffer size is %d bytes.\n" + "See the transport application notes on buffer resizing.\n%s" + ) % name % min_sock_buff_size % help_message)); } //only enable on platforms that are happy with the large buffer resize -- cgit v1.2.3