Diffstat (limited to 'host/lib/transport')
-rwxr-xr-x  host/lib/transport/gen_vrt_if_packet.py   |  51
-rw-r--r--  host/lib/transport/libusb1_zero_copy.cpp  |  26
-rw-r--r--  host/lib/transport/udp_zero_copy_asio.cpp | 289
-rw-r--r--  host/lib/transport/vrt_packet_handler.hpp |  73
-rw-r--r--  host/lib/transport/zero_copy.cpp          |  40
5 files changed, 232 insertions(+), 247 deletions(-)
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
index dbe026ba3..3ba562d68 100755
--- a/host/lib/transport/gen_vrt_if_packet.py
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 #
-# Copyright 2010 Ettus Research LLC
+# Copyright 2010-2011 Ettus Research LLC
 #
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU General Public License as published by
@@ -35,6 +35,7 @@ TMPL_TEXT = """
 \#include <uhd/utils/byteswap.hpp>
 \#include <boost/detail/endian.hpp>
 \#include <stdexcept>
+\#include <vector>
 
 //define the endian macros to convert integers
 \#ifdef BOOST_BIG_ENDIAN
@@ -48,18 +49,26 @@ TMPL_TEXT = """
 using namespace uhd;
 using namespace uhd::transport;
 
-########################################################################
-#def gen_code($XE_MACRO, $suffix)
-########################################################################
+typedef size_t pred_type;
+typedef std::vector<pred_type> pred_table_type;
+#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff)
+
+static pred_table_type get_pred_unpack_table(void){
+    pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)
+    for (size_t i = 0; i < table.size(); i++){
+        boost::uint32_t vrt_hdr_word = i << 20;
+        if(vrt_hdr_word & $hex(0x1 << 28)) table[i] |= $hex($sid_p);
+        if(vrt_hdr_word & $hex(0x1 << 27)) table[i] |= $hex($cid_p);
+        if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p);
+        if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p);
+        if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p);
+    }
+    return table;
+}
 
 ########################################################################
-## setup predicates
+#def gen_code($XE_MACRO, $suffix)
 ########################################################################
-#set $sid_p = 0b00001
-#set $cid_p = 0b00010
-#set $tsi_p = 0b00100
-#set $tsf_p = 0b01000
-#set $tlr_p = 0b10000
 
 void vrt::if_hdr_pack_$(suffix)(
     boost::uint32_t *packet_buff,
@@ -67,7 +76,7 @@ void vrt::if_hdr_pack_$(suffix)(
 ){
     boost::uint32_t vrt_hdr_flags = 0;
 
-    boost::uint8_t pred = 0;
+    pred_type pred = 0;
     if (if_packet_info.has_sid) pred |= $hex($sid_p);
     if (if_packet_info.has_cid) pred |= $hex($cid_p);
     if (if_packet_info.has_tsi) pred |= $hex($tsi_p);
@@ -159,12 +168,8 @@ void vrt::if_hdr_unpack_$(suffix)(
     //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented
     //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented
 
-    boost::uint8_t pred = 0;
-    if(vrt_hdr_word & $hex(0x1 << 28)) pred |= $hex($sid_p);
-    if(vrt_hdr_word & $hex(0x1 << 27)) pred |= $hex($cid_p);
-    if(vrt_hdr_word & $hex(0x3 << 22)) pred |= $hex($tsi_p);
-    if(vrt_hdr_word & $hex(0x3 << 20)) pred |= $hex($tsf_p);
-    if(vrt_hdr_word & $hex(0x1 << 26)) pred |= $hex($tlr_p);
+    static const pred_table_type pred_unpack_table(get_pred_unpack_table());
+    const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
 
     switch(pred){
     #for $pred in range(2**5)
@@ -200,7 +205,7 @@ void vrt::if_hdr_unpack_$(suffix)(
             if_packet_info.has_tsf = true;
             if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32;
             #set $num_header_words += 1
-            if_packet_info.tsf |= boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 0;
+            if_packet_info.tsf |= $(XE_MACRO)(packet_buff[$num_header_words]);
             #set $num_header_words += 1
         #else
             if_packet_info.has_tsf = false;
@@ -239,4 +244,12 @@ def parse_tmpl(_tmpl_text, **kwargs):
 
 if __name__ == '__main__':
     import sys
-    open(sys.argv[1], 'w').write(parse_tmpl(TMPL_TEXT, file=__file__))
+    open(sys.argv[1], 'w').write(parse_tmpl(
+        TMPL_TEXT,
+        file=__file__,
+        sid_p = 0b00001,
+        cid_p = 0b00010,
+        tsi_p = 0b00100,
+        tsf_p = 0b01000,
+        tlr_p = 0b10000,
+    ))
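The hunks above replace five per-packet bit tests in the generated unpacker with a single table lookup: only header bits 20-28 decide which optional VRT fields follow, so a 512-entry table can be built once and indexed per packet. A minimal standalone C++ sketch of the same idea (illustrative names, not the generated UHD code):

```cpp
// Precomputed predicate table: maps VRT header bits 20-28 to a bitmask of
// which optional fields (SID, CID, TSI, TSF, trailer) are present.
#include <cstddef>
#include <cstdint>
#include <vector>

enum { SID_P = 0x01, CID_P = 0x02, TSI_P = 0x04, TSF_P = 0x08, TLR_P = 0x10 };

static std::vector<size_t> make_pred_table(void){
    std::vector<size_t> table(1 << 9, 0); //bits 20-28 -> 9 bits of index
    for (size_t i = 0; i < table.size(); i++){
        const uint32_t hdr = uint32_t(i) << 20;
        if (hdr & (0x1 << 28)) table[i] |= SID_P; //stream ID present
        if (hdr & (0x1 << 27)) table[i] |= CID_P; //class ID present
        if (hdr & (0x3 << 22)) table[i] |= TSI_P; //integer timestamp present
        if (hdr & (0x3 << 20)) table[i] |= TSF_P; //fractional timestamp present
        if (hdr & (0x1 << 26)) table[i] |= TLR_P; //trailer present
    }
    return table;
}

static inline size_t pred_of(uint32_t vrt_hdr_word){
    static const std::vector<size_t> table(make_pred_table());
    return table[(vrt_hdr_word >> 20) & 0x1ff]; //one load per packet
}
```

The generated switch(pred) body is unchanged; only how pred is derived changes, which removes five masked comparisons from the per-packet unpack path.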
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 311a8953b..ca37f351f 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -23,7 +23,6 @@
 #include <uhd/utils/assert.hpp>
 #include <boost/foreach.hpp>
 #include <boost/thread.hpp>
-#include <boost/enable_shared_from_this.hpp>
 #include <vector>
 #include <iostream>
 
@@ -99,8 +98,7 @@ private:
     bool _input;
 
     //! hold a bounded buffer of completed transfers
-    typedef bounded_buffer<libusb_transfer *> lut_buff_type;
-    lut_buff_type::sptr _completed_list;
+    bounded_buffer<libusb_transfer *> _completed_list;
 
     //! a list of all transfer structs we allocated
     std::vector<libusb_transfer *> _all_luts;
@@ -134,7 +132,7 @@ static void callback(libusb_transfer *lut){
 * \param pointer to libusb_transfer
 */
 void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){
-    _completed_list->push_with_wait(lut);
+    _completed_list.push_with_wait(lut);
 }
 
 
@@ -153,9 +151,9 @@ usb_endpoint::usb_endpoint(
 ):
     _handle(handle),
     _endpoint(endpoint),
-    _input(input)
+    _input(input),
+    _completed_list(num_transfers)
 {
-    _completed_list = lut_buff_type::make(num_transfers);
     _buffer_pool = buffer_pool::make(num_transfers, transfer_size);
     for (size_t i = 0; i < num_transfers; i++){
         _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size));
@@ -163,7 +161,7 @@ usb_endpoint::usb_endpoint(
         //input luts are immediately submitted to be filled
         //output luts go into the completed list as free buffers
         if (_input) this->submit(_all_luts.back());
-        else _completed_list->push_with_wait(_all_luts.back());
+        else _completed_list.push_with_wait(_all_luts.back());
     }
 }
 
@@ -272,15 +270,15 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){
 libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){
     boost::this_thread::disable_interruption di; //disable because the wait can throw
 
-    libusb_transfer *lut;
-    if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut;
+    libusb_transfer *lut = NULL;
+    if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut;
     return NULL;
 }
 
 /***********************************************************************
 * USB zero_copy device class
 **********************************************************************/
-class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {
+class libusb_zero_copy_impl : public usb_zero_copy{
 public:
 
     libusb_zero_copy_impl(
@@ -400,8 +398,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){
     }
     else {
         return managed_recv_buffer::make_safe(
-            boost::asio::const_buffer(lut->buffer, lut->actual_length),
-            boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut)
+            lut->buffer, lut->actual_length,
+            boost::bind(&libusb_zero_copy_impl::release, this, lut)
         );
     }
 }
@@ -420,8 +418,8 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){
     }
     else {
         return managed_send_buffer::make_safe(
-            boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()),
-            boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1)
+            lut->buffer, this->get_send_frame_size(),
+            boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1)
        );
    }
}
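Two things stand out in the libusb hunks: the completed-transfer list becomes a plain by-value bounded_buffer member acting as a free list of libusb_transfer pointers, and the buffer callbacks bind this rather than shared_from_this(), so an outstanding managed buffer no longer keeps the transport object alive. A rough sketch of the free-list usage, assuming only the bounded_buffer interface visible in this diff (capacity constructor, push_with_wait, pop_with_timed_wait):

```cpp
// Sketch of a completed-transfer FIFO like the one in usb_endpoint above.
// libusb_transfer is only handled by pointer here, so a forward declaration
// stands in for the real libusb-1.0 header.
#include <uhd/transport/bounded_buffer.hpp>
#include <cstddef>

struct libusb_transfer; //opaque in this sketch; defined by libusb-1.0

class transfer_fifo{
public:
    explicit transfer_fifo(size_t num_transfers):
        _completed(num_transfers){}

    //completion-callback side: hand a finished transfer to the consumer
    void on_complete(libusb_transfer *lut){
        _completed.push_with_wait(lut);
    }

    //consumer side: wait up to timeout seconds for a finished transfer
    libusb_transfer *get(double timeout){
        libusb_transfer *lut = NULL;
        if (_completed.pop_with_timed_wait(lut, timeout)) return lut;
        return NULL; //timed out
    }

private:
    uhd::transport::bounded_buffer<libusb_transfer *> _completed;
};
```

The USB completion callback produces into the FIFO and get_lut_with_wait() consumes from it, which is exactly the producer/consumer split shown in the usb_endpoint hunks above.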
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index a80de7b87..48b0941eb 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -19,53 +19,108 @@
 #include <uhd/transport/udp_simple.hpp> //mtu
 #include <uhd/transport/bounded_buffer.hpp>
 #include <uhd/transport/buffer_pool.hpp>
-#include <uhd/utils/thread_priority.hpp>
 #include <uhd/utils/assert.hpp>
 #include <uhd/utils/warning.hpp>
 #include <boost/asio.hpp>
 #include <boost/format.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/enable_shared_from_this.hpp>
 #include <iostream>
+#include <vector>
 
 using namespace uhd;
 using namespace uhd::transport;
 
 namespace asio = boost::asio;
 
-//Define this to the the boost async io calls to perform receive.
-//Otherwise, get_recv_buff uses a blocking receive with timeout.
-#define USE_ASIO_ASYNC_RECV
-
-//Define this to the the boost async io calls to perform send.
-//Otherwise, the commit callback uses a blocking send.
-//#define USE_ASIO_ASYNC_SEND
-
-//The asio async receive implementation is broken for some macos.
-//Just disable for all macos since we don't know the problem.
-#if defined(UHD_PLATFORM_MACOS) && defined(USE_ASIO_ASYNC_RECV)
-    #undef USE_ASIO_ASYNC_RECV
-#endif
-
-//The number of service threads to spawn for async ASIO:
-//A single concurrent thread for io_service seems to be the fastest.
-//Threads are disabled when no async implementations are enabled.
-#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND)
-static const size_t CONCURRENCY_HINT = 1;
-#else
-static const size_t CONCURRENCY_HINT = 0;
-#endif
-
 //A reasonable number of frames for send/recv and async/sync
 static const size_t DEFAULT_NUM_FRAMES = 32;
 
 /***********************************************************************
+ * Reusable managed receiver buffer:
+ *  - Initialize with memory and a release callback.
+ *  - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+    typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr;
+    typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type;
+
+    udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb):
+        _mem(mem), _release_cb(release_cb){/* NOP */}
+
+    void release(void){
+        if (_expired) return;
+        this->_release_cb(this);
+        _expired = true;
+    }
+
+    sptr get_new(size_t len){
+        _expired = false;
+        _len = len;
+        return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter);
+    }
+
+    template <class T> T cast(void) const{return static_cast<T>(_mem);}
+
+private:
+    static void fake_deleter(void *obj){
+        static_cast<udp_zero_copy_asio_mrb *>(obj)->release();
+    }
+
+    const void *get_buff(void) const{return _mem;}
+    size_t get_size(void) const{return _len;}
+
+    bool _expired;
+    void *_mem;
+    size_t _len;
+    release_cb_type _release_cb;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ *  - Initialize with memory and a commit callback.
+ *  - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+    typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr;
+    typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type;
+
+    udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb):
+        _mem(mem), _commit_cb(commit_cb){/* NOP */}
+
+    void commit(size_t len){
+        if (_expired) return;
+        this->_commit_cb(this, len);
+        _expired = true;
+    }
+
+    sptr get_new(size_t len){
+        _expired = false;
+        _len = len;
+        return sptr(this, &udp_zero_copy_asio_msb::fake_deleter);
+    }
+
+private:
+    static void fake_deleter(void *obj){
+        static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0);
+    }
+
+    void *get_buff(void) const{return _mem;}
+    size_t get_size(void) const{return _len;}
+
+    bool _expired;
+    void *_mem;
+    size_t _len;
+    commit_cb_type _commit_cb;
+};
+
+/***********************************************************************
 * Zero Copy UDP implementation with ASIO:
 *   This is the portable zero copy implementation for systems
 *   where a faster, platform specific solution is not available.
 *   However, it is not a true zero copy implementation as each
 *   send and recv requires a copy operation to/from userspace.
 **********************************************************************/
-class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {
+class udp_zero_copy_asio_impl : public udp_zero_copy{
 public:
     typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
 
@@ -78,8 +133,9 @@ public:
         _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
         _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
         _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
-        _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
-        _io_service(_concurrency_hint)
+        _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+        _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+        _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames)
     {
         //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
 
@@ -93,39 +149,28 @@ public:
         _socket->open(asio::ip::udp::v4());
         _socket->connect(receiver_endpoint);
         _sock_fd = _socket->native();
-    }
 
-    ~udp_zero_copy_asio_impl(void){
-        delete _work; //allow io_service run to complete
-        _thread_group.join_all(); //wait for service threads to exit
-        delete _socket;
-    }
-
-    void init(void){
-        //allocate all recv frames and release them to begin xfers
-        _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames);
-        _recv_buffer_pool = buffer_pool::make(_num_recv_frames, _recv_frame_size);
-        for (size_t i = 0; i < _num_recv_frames; i++){
-            release(_recv_buffer_pool->at(i));
+        //allocate re-usable managed receive buffers
+        for (size_t i = 0; i < get_num_recv_frames(); i++){
+            _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr(
+                new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i),
+                boost::bind(&udp_zero_copy_asio_impl::release, this, _1))
+            ));
+            handle_recv(_mrb_pool.back().get());
         }
 
-        //allocate all send frames and push them into the fifo
-        _pending_send_buffs = pending_buffs_type::make(_num_send_frames);
-        _send_buffer_pool = buffer_pool::make(_num_send_frames, _send_frame_size);
-        for (size_t i = 0; i < _num_send_frames; i++){
-            handle_send(_send_buffer_pool->at(i));
+        //allocate re-usable managed send buffers
+        for (size_t i = 0; i < get_num_send_frames(); i++){
+            _msb_pool.push_back(udp_zero_copy_asio_msb::sptr(
+                new udp_zero_copy_asio_msb(_send_buffer_pool->at(i),
+                boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2))
+            ));
+            handle_send(_msb_pool.back().get());
         }
-
-        //spawn the service threads that will run the io service
-        _work = new asio::io_service::work(_io_service); //new work to delete later
-        for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread(
-            boost::bind(&udp_zero_copy_asio_impl::service, this)
-        );
     }
 
-    void service(void){
-        set_thread_priority_safe();
-        _io_service.run();
+    ~udp_zero_copy_asio_impl(void){
+        delete _socket;
     }
 
     //get size for internal socket buffer
@@ -142,49 +187,12 @@ public:
         return get_buff_size<Opt>();
     }
 
-    //! handle a recv callback -> push the filled memory into the fifo
-    UHD_INLINE void handle_recv(void *mem, size_t len){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len));
+    UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){
+        _pending_recv_buffs.push_with_pop_on_full(mrb);
     }
 
-    ////////////////////////////////////////////////////////////////////
-    #ifdef USE_ASIO_ASYNC_RECV
-    ////////////////////////////////////////////////////////////////////
-    //! pop a filled recv buffer off of the fifo and bind with the release callback
     managed_recv_buffer::sptr get_recv_buff(double timeout){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        asio::mutable_buffer buff;
-        if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){
-            return managed_recv_buffer::make_safe(
-                buff, boost::bind(
-                    &udp_zero_copy_asio_impl::release,
-                    shared_from_this(),
-                    asio::buffer_cast<void*>(buff)
-                )
-            );
-        }
-        return managed_recv_buffer::sptr();
-    }
-
-    //! release a recv buffer -> start an async recv on the buffer
-    void release(void *mem){
-        _socket->async_receive(
-            boost::asio::buffer(mem, this->get_recv_frame_size()),
-            boost::bind(
-                &udp_zero_copy_asio_impl::handle_recv,
-                shared_from_this(), mem,
-                asio::placeholders::bytes_transferred
-            )
-        );
-    }
-
-    ////////////////////////////////////////////////////////////////////
-    #else /*USE_ASIO_ASYNC_RECV*/
-    ////////////////////////////////////////////////////////////////////
-    managed_recv_buffer::sptr get_recv_buff(double timeout){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        asio::mutable_buffer buff;
+        udp_zero_copy_asio_mrb *mrb;
 
         //setup timeval for timeout
         timeval tv;
@@ -196,104 +204,57 @@ public:
         FD_ZERO(&rset);
         FD_SET(_sock_fd, &rset);
 
-        //call select to perform timed wait and grab an available buffer with wait
+        //call select to perform timed wait and grab an available buffer now
         //if the condition is true, call receive and return the managed buffer
         if (
-            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and
-            _pending_recv_buffs->pop_with_timed_wait(buff, timeout)
+            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0
+            and _pending_recv_buffs.pop_with_haste(mrb)
        ){
-            return managed_recv_buffer::make_safe(
-                asio::buffer(
-                    boost::asio::buffer_cast<void *>(buff),
-                    _socket->receive(asio::buffer(buff))
-                ),
-                boost::bind(
-                    &udp_zero_copy_asio_impl::release,
-                    shared_from_this(),
-                    asio::buffer_cast<void*>(buff)
-                )
-            );
+            return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));
         }
         return managed_recv_buffer::sptr();
     }
 
-    void release(void *mem){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        handle_recv(mem, this->get_recv_frame_size());
+    void release(udp_zero_copy_asio_mrb *mrb){
+        handle_recv(mrb);
     }
 
-    ////////////////////////////////////////////////////////////////////
-    #endif /*USE_ASIO_ASYNC_RECV*/
-    ////////////////////////////////////////////////////////////////////
-
     size_t get_num_recv_frames(void) const {return _num_recv_frames;}
     size_t get_recv_frame_size(void) const {return _recv_frame_size;}
 
-    //! handle a send callback -> push the emptied memory into the fifo
-    UHD_INLINE void handle_send(void *mem){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size()));
+    UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){
+        _pending_send_buffs.push_with_pop_on_full(msb);
     }
 
-    //! pop an empty send buffer off of the fifo and bind with the commit callback
-    managed_send_buffer::sptr get_send_buff(double timeout){
-        boost::this_thread::disable_interruption di; //disable because the wait can throw
-        asio::mutable_buffer buff;
-        if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){
-            return managed_send_buffer::make_safe(
-                buff, boost::bind(
-                    &udp_zero_copy_asio_impl::commit,
-                    shared_from_this(),
-                    asio::buffer_cast<void*>(buff), _1
-                )
-            );
+    managed_send_buffer::sptr get_send_buff(double){
+        udp_zero_copy_asio_msb *msb;
+        if (_pending_send_buffs.pop_with_haste(msb)){
+            return msb->get_new(_send_frame_size);
         }
         return managed_send_buffer::sptr();
     }
 
-    ////////////////////////////////////////////////////////////////////
-    #ifdef USE_ASIO_ASYNC_SEND
-    ////////////////////////////////////////////////////////////////////
-    //! commit a send buffer -> start an async send on the buffer
-    void commit(void *mem, size_t len){
-        _socket->async_send(
-            boost::asio::buffer(mem, len),
-            boost::bind(
-                &udp_zero_copy_asio_impl::handle_send,
-                shared_from_this(), mem
-            )
-        );
-    }
-
-    ////////////////////////////////////////////////////////////////////
-    #else /*USE_ASIO_ASYNC_SEND*/
-    ////////////////////////////////////////////////////////////////////
-    void commit(void *mem, size_t len){
-        _socket->send(asio::buffer(mem, len));
-        handle_send(mem);
+    void commit(udp_zero_copy_asio_msb *msb, size_t len){
+        ::send(_sock_fd, msb->cast<const char *>(), len, 0);
+        handle_send(msb);
     }
 
-    ////////////////////////////////////////////////////////////////////
-    #endif /*USE_ASIO_ASYNC_SEND*/
-    ////////////////////////////////////////////////////////////////////
-
     size_t get_num_send_frames(void) const {return _num_send_frames;}
     size_t get_send_frame_size(void) const {return _send_frame_size;}
 
 private:
     //memory management -> buffers and fifos
-    boost::thread_group _thread_group;
-    buffer_pool::sptr _send_buffer_pool, _recv_buffer_pool;
-    typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type;
-    pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
     const size_t _recv_frame_size, _num_recv_frames;
     const size_t _send_frame_size, _num_send_frames;
+    buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+    bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs;
+    bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs;
+    std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool;
+    std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool;
 
     //asio guts -> socket and service
-    size_t                  _concurrency_hint;
     asio::io_service        _io_service;
     asio::ip::udp::socket   *_socket;
-    asio::io_service::work  *_work;
     int                     _sock_fd;
 };
 
@@ -346,7 +307,5 @@ udp_zero_copy::sptr udp_zero_copy::make(
     resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
     resize_buff_helper<asio::socket_base::send_buffer_size>   (udp_trans, send_buff_size, "send");
 
-    udp_trans->init(); //buffers resized -> call init() to use
-
     return udp_trans;
 }
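The heart of the new portable UDP transport is the pair of reusable managed buffers above: each frame gets one wrapper object for the lifetime of the transport, and get_new() loans it out through a shared_ptr whose deleter recycles the object instead of freeing it. A simplified, self-contained sketch of that trick (illustrative names, not the UHD classes):

```cpp
// A buffer wrapper that is recycled instead of destroyed: the shared_ptr
// returned by get_new() shares no ownership, and its "deleter" merely
// routes back into release(), handing the buffer to its owner for re-use.
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <cstddef>

class recycling_buffer{
public:
    typedef boost::shared_ptr<recycling_buffer> sptr;
    typedef boost::function<void(recycling_buffer *)> release_cb_type;

    recycling_buffer(void *mem, const release_cb_type &cb):
        _expired(true), _mem(mem), _len(0), _cb(cb){}

    //called by the deleter (or explicitly) once per loan
    void release(void){
        if (_expired) return;
        _expired = true;
        _cb(this); //return the buffer to its owner
    }

    //loan the buffer out again without any heap allocation
    sptr get_new(size_t len){
        _expired = false;
        _len = len;
        return sptr(this, &recycling_buffer::fake_deleter);
    }

private:
    static void fake_deleter(void *obj){
        static_cast<recycling_buffer *>(obj)->release();
    }

    bool _expired;
    void *_mem;
    size_t _len;
    release_cb_type _cb;
};
```

The pending-buffer FIFOs make the trick safe in practice: a wrapper is only handed out again after its release()/commit() callback has pushed it back, so at most one loaned sptr exists per frame at a time.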
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index c535edd04..795d5bc62 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -67,13 +67,17 @@ template <typename T> UHD_INLINE T get_context_code(
         std::vector<const boost::uint8_t *> copy_buffs;
         size_t size_of_copy_buffs;
         size_t fragment_offset_in_samps;
+        std::vector<void *> io_buffs;
+        uhd::convert::input_type otw_buffs;
 
         recv_state(size_t width = 1):
             width(width),
             managed_buffs(width),
             copy_buffs(width, NULL),
             size_of_copy_buffs(0),
-            fragment_offset_in_samps(0)
+            fragment_offset_in_samps(0),
+            io_buffs(0), //resized later
+            otw_buffs(1) //always 1 for now
         {
             /* NOP */
         }
@@ -144,7 +148,7 @@ template <typename T> UHD_INLINE T get_context_code(
      ******************************************************************/
     static UHD_INLINE size_t _recv1(
         recv_state &state,
-        const std::vector<void *> &buffs,
+        const uhd::device::recv_buffs_type &buffs,
         size_t offset_bytes,
         size_t total_samps,
         uhd::rx_metadata_t &metadata,
@@ -192,17 +196,16 @@ template <typename T> UHD_INLINE T get_context_code(
         size_t bytes_to_copy = nsamps_to_copy*bytes_per_item;
         size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff;
 
-        std::vector<void *> io_buffs(chans_per_otw_buff);
-        for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){
+        for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){
 
             //fill a vector with pointers to the io buffers
             for (size_t j = 0; j < chans_per_otw_buff; j++){
-                io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+                state.io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;
             }
 
             //copy-convert the samples from the recv buffer
-            uhd::convert::input_type otw_buffs(1, state.copy_buffs[i]);
-            converter(otw_buffs, io_buffs, nsamps_to_copy_per_io_buff);
+            state.otw_buffs[0] = state.copy_buffs[i];
+            converter(state.otw_buffs, state.io_buffs, nsamps_to_copy_per_io_buff);
 
             //update the rx copy buffer to reflect the bytes copied
             state.copy_buffs[i] += bytes_to_copy;
@@ -223,7 +226,7 @@ template <typename T> UHD_INLINE T get_context_code(
      ******************************************************************/
     static UHD_INLINE size_t recv(
         recv_state &state,
-        const std::vector<void *> &buffs,
+        const uhd::device::recv_buffs_type &buffs,
         const size_t total_num_samps,
         uhd::rx_metadata_t &metadata,
         uhd::device::recv_mode_t recv_mode,
@@ -236,6 +239,8 @@ template <typename T> UHD_INLINE T get_context_code(
         size_t vrt_header_offset_words32 = 0,
         size_t chans_per_otw_buff = 1
     ){
+        state.io_buffs.resize(chans_per_otw_buff);
+
         uhd::convert::function_type converter(
             uhd::convert::get_converter_otw_to_cpu(
                 io_type, otw_type, 1, chans_per_otw_buff
@@ -300,8 +305,20 @@ template <typename T> UHD_INLINE T get_context_code(
     struct send_state{
         //init the expected seq number
         size_t next_packet_seq;
-
-        send_state(void) : next_packet_seq(0){
+        managed_send_buffs_t managed_buffs;
+        const boost::uint64_t zeros;
+        std::vector<const void *> zero_buffs;
+        std::vector<const void *> io_buffs;
+        uhd::convert::output_type otw_buffs;
+
+        send_state(size_t width = 1):
+            next_packet_seq(0),
+            managed_buffs(width),
+            zeros(0),
+            zero_buffs(width, &zeros),
+            io_buffs(0), //resized later
+            otw_buffs(1) //always 1 for now
+        {
             /* NOP */
         }
     };
@@ -312,7 +329,7 @@ template <typename T> UHD_INLINE T get_context_code(
      ******************************************************************/
     static UHD_INLINE size_t _send1(
         send_state &state,
-        const std::vector<const void *> &buffs,
+        const uhd::device::send_buffs_type &buffs,
         const size_t offset_bytes,
         const size_t num_samps,
         uhd::transport::vrt::if_packet_info_t &if_packet_info,
@@ -326,29 +343,27 @@ template <typename T> UHD_INLINE T get_context_code(
         if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t);
         if_packet_info.packet_count = state.next_packet_seq;
 
-        //get send buffers for each channel
-        managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff);
-        if (not get_send_buffs(send_buffs)) return 0;
+        //get send buffers for each otw channel
+        if (not get_send_buffs(state.managed_buffs)) return 0;
 
-        std::vector<const void *> io_buffs(chans_per_otw_buff);
         for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){
 
             //calculate pointers with offsets to io and otw memory
             for (size_t j = 0; j < chans_per_otw_buff; j++){
-                io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;
+                state.io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;
             }
-            boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;
+            boost::uint32_t *otw_mem = state.managed_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;
 
             //pack metadata into a vrt header
             vrt_packer(otw_mem, if_packet_info);
             otw_mem += if_packet_info.num_header_words32;
 
             //copy-convert the samples into the send buffer
-            uhd::convert::output_type otw_buffs(1, otw_mem);
-            converter(io_buffs, otw_buffs, num_samps);
+            state.otw_buffs[0] = otw_mem;
+            converter(state.io_buffs, state.otw_buffs, num_samps);
 
             //commit the samples to the zero-copy interface
             size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
-            send_buffs[i]->commit(num_bytes_total);
+            state.managed_buffs[i]->commit(num_bytes_total);
         }
         state.next_packet_seq++; //increment sequence after commits
         return num_samps;
@@ -359,7 +374,7 @@ template <typename T> UHD_INLINE T get_context_code(
      ******************************************************************/
     static UHD_INLINE size_t send(
         send_state &state,
-        const std::vector<const void *> &buffs,
+        const uhd::device::send_buffs_type &buffs,
         const size_t total_num_samps,
         const uhd::tx_metadata_t &metadata,
         uhd::device::send_mode_t send_mode,
@@ -372,6 +387,8 @@ template <typename T> UHD_INLINE T get_context_code(
         size_t vrt_header_offset_words32 = 0,
         size_t chans_per_otw_buff = 1
     ){
+        state.io_buffs.resize(chans_per_otw_buff);
+
         uhd::convert::function_type converter(
             uhd::convert::get_converter_cpu_to_otw(
                 io_type, otw_type, chans_per_otw_buff, 1
@@ -398,19 +415,11 @@ template <typename T> UHD_INLINE T get_context_code(
             if_packet_info.sob = metadata.start_of_burst;
             if_packet_info.eob = metadata.end_of_burst;
 
-            //TODO remove this code when sample counts of zero are supported by hardware
-            std::vector<const void *> buffs_(buffs);
-            size_t total_num_samps_(total_num_samps);
-            if (total_num_samps == 0){
-                static const boost::uint64_t zeros = 0; //max size of a host sample
-                buffs_ = std::vector<const void *>(buffs.size(), &zeros);
-                total_num_samps_ = 1;
-            }
-
             return _send1(
                 state,
-                buffs_, 0,
-                std::min(total_num_samps_, max_samples_per_packet),
+                //TODO remove this code when sample counts of zero are supported by hardware
+                (total_num_samps)?buffs : state.zero_buffs, 0,
+                std::max<size_t>(1, std::min(total_num_samps, max_samples_per_packet)),
                 if_packet_info,
                 converter,
                 vrt_packer,
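The recv_state/send_state changes follow a single pattern: scratch containers that _recv1/_send1 previously constructed on every call (the io pointer vectors, the one-element otw buffer wrappers, the managed send buffer vector, and the zero-sample dummy buffers) now live in the state structs and are merely resized or overwritten per call. A minimal sketch of that allocation-hoisting pattern, with assumed names:

```cpp
// Hoist per-call scratch storage into a long-lived state object so the
// hot path does not allocate; illustrative only, not the UHD handler types.
#include <cstddef>
#include <vector>

struct scratch_state{
    std::vector<const void *> io_buffs; //reused across calls
    explicit scratch_state(size_t width = 1): io_buffs(width){}
};

//hot path: after the first call at a given width, no heap allocation occurs
static void fill_io_ptrs(
    scratch_state &state,
    const std::vector<const void *> &user_buffs,
    size_t offset_bytes
){
    state.io_buffs.resize(user_buffs.size());
    for (size_t i = 0; i < user_buffs.size(); i++){
        state.io_buffs[i] =
            static_cast<const unsigned char *>(user_buffs[i]) + offset_bytes;
    }
}
```

The zero-sample TODO is handled the same way: send_state keeps a persistent zeros word and a width-sized zero_buffs vector of pointers to it, so an empty send can be promoted to a one-sample send without building a temporary vector on every call.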
diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp
index a5a864a04..b91eaae1d 100644
--- a/host/lib/transport/zero_copy.cpp
+++ b/host/lib/transport/zero_copy.cpp
@@ -1,5 +1,5 @@
 //
-// Copyright 2010 Ettus Research LLC
+// Copyright 2010-2011 Ettus Research LLC
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -29,10 +29,9 @@ static void release_nop(void){
 class safe_managed_receive_buffer : public managed_recv_buffer{
 public:
     safe_managed_receive_buffer(
-        const boost::asio::const_buffer &buff,
-        const release_fcn_t &release_fcn
+        const void *buff, size_t size, const release_fcn_t &release_fcn
     ):
-        _buff(buff), _release_fcn(release_fcn)
+        _buff(buff), _size(size), _release_fcn(release_fcn)
     {
         /* NOP */
     }
@@ -48,19 +47,23 @@ public:
     }
 
 private:
-    const boost::asio::const_buffer &get(void) const{
+    const void *get_buff(void) const{
         return _buff;
     }
 
-    const boost::asio::const_buffer _buff;
+    size_t get_size(void) const{
+        return _size;
+    }
+
+    const void *_buff;
+    size_t _size;
     release_fcn_t _release_fcn;
 };
 
 managed_recv_buffer::sptr managed_recv_buffer::make_safe(
-    const boost::asio::const_buffer &buff,
-    const release_fcn_t &release_fcn
+    const void *buff, size_t size, const release_fcn_t &release_fcn
 ){
-    return sptr(new safe_managed_receive_buffer(buff, release_fcn));
+    return sptr(new safe_managed_receive_buffer(buff, size, release_fcn));
 }
 
 /***********************************************************************
@@ -73,10 +76,9 @@ static void commit_nop(size_t){
 class safe_managed_send_buffer : public managed_send_buffer{
 public:
     safe_managed_send_buffer(
-        const boost::asio::mutable_buffer &buff,
-        const commit_fcn_t &commit_fcn
+        void *buff, size_t size, const commit_fcn_t &commit_fcn
     ):
-        _buff(buff), _commit_fcn(commit_fcn)
+        _buff(buff), _size(size), _commit_fcn(commit_fcn)
     {
         /* NOP */
     }
@@ -92,17 +94,21 @@ public:
     }
 
 private:
-    const boost::asio::mutable_buffer &get(void) const{
+    void *get_buff(void) const{
         return _buff;
     }
 
-    const boost::asio::mutable_buffer _buff;
+    size_t get_size(void) const{
+        return _size;
+    }
+
+    void *_buff;
+    size_t _size;
     commit_fcn_t _commit_fcn;
 };
 
 safe_managed_send_buffer::sptr managed_send_buffer::make_safe(
-    const boost::asio::mutable_buffer &buff,
-    const commit_fcn_t &commit_fcn
+    void *buff, size_t size, const commit_fcn_t &commit_fcn
 ){
-    return sptr(new safe_managed_send_buffer(buff, commit_fcn));
+    return sptr(new safe_managed_send_buffer(buff, size, commit_fcn));
 }
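With this change, make_safe() takes a raw pointer plus an explicit byte count instead of a boost::asio buffer wrapper, and the virtual accessors become get_buff()/get_size(). A hypothetical caller under those assumptions (the header path, my_transport, and the nullary release_fcn_t signature are illustrative, inferred from the boost::bind usage in the libusb hunks above):

```cpp
// Hypothetical usage of the raw-pointer make_safe() shown above; names and
// header path are assumptions for illustration, not verified API docs.
#include <uhd/transport/zero_copy.hpp>
#include <boost/bind.hpp>
#include <cstddef>

using namespace uhd::transport;

struct my_transport{
    //wrap transport-owned memory in a managed buffer with a release callback
    managed_recv_buffer::sptr wrap(void *mem, size_t len){
        return managed_recv_buffer::make_safe(
            mem, len, boost::bind(&my_transport::release, this, mem)
        );
    }
    void release(void *mem){
        //hand mem back to this transport's frame pool (not shown)
        (void)mem;
    }
};
```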