diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 27 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 85 |
2 files changed, 59 insertions, 53 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index c819302b6..ab48e4fc4 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -100,13 +100,13 @@ private: lut_buff_type::sptr _completed_list; //! a list of all transfer structs we allocated - std::vector<libusb_transfer *> _all_luts; + std::vector<libusb_transfer *> _all_luts; - //! a list of shared arrays for the transfer buffers - std::vector<boost::shared_array<boost::uint8_t> > _buffers; + //! a block of memory for the transfer buffers + boost::shared_array<char> _buffer; // Calls for processing asynchronous I/O - libusb_transfer *allocate_transfer(int buff_len); + libusb_transfer *allocate_transfer(void *mem, size_t len); void print_transfer_status(libusb_transfer *lut); }; @@ -154,9 +154,9 @@ usb_endpoint::usb_endpoint( _input(input) { _completed_list = lut_buff_type::make(num_transfers); - + _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]); for (size_t i = 0; i < num_transfers; i++){ - _all_luts.push_back(allocate_transfer(transfer_size)); + _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size)); //input luts are immediately submitted to be filled //output luts go into the completed list as free buffers @@ -193,23 +193,23 @@ usb_endpoint::~usb_endpoint(void){ * Allocate a libusb transfer * The allocated transfer - and buffer it contains - is repeatedly * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer * \return pointer to an allocated libusb_transfer */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ libusb_transfer *lut = libusb_alloc_transfer(0); - - boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); - _buffers.push_back(buff); //store a reference to this shared array + UHD_ASSERT_THROW(lut != NULL); unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + unsigned char *buff = reinterpret_cast<unsigned char *>(mem); libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); libusb_fill_bulk_transfer(lut, // transfer _handle->get(), // dev_handle endpoint, // endpoint - buff.get(), // buffer - buff_len, // length + buff, // buffer + len, // length lut_callback, // callback this, // user_data 0); // timeout @@ -232,6 +232,7 @@ void usb_endpoint::submit(libusb_transfer *lut){ * \param lut pointer to an libusb_transfer */ void usb_endpoint::print_transfer_status(libusb_transfer *lut){ + std::cout << "here " << lut->status << std::endl; switch (lut->status) { case LIBUSB_TRANSFER_COMPLETED: if (lut->actual_length < lut->length) { diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 70e7514a1..a1eb516fc 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -18,6 +18,7 @@ #include <uhd/transport/udp_zero_copy.hpp> #include <uhd/transport/udp_simple.hpp> //mtu #include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp> #include <uhd/utils/assert.hpp> #include <uhd/utils/warning.hpp> #include <boost/shared_array.hpp> @@ -26,6 +27,7 @@ #include <boost/thread.hpp> #include <iostream> +using namespace uhd; using namespace uhd::transport; namespace asio = boost::asio; @@ -34,11 +36,15 @@ namespace asio = boost::asio; **********************************************************************/ //enough buffering for half a second of samples at full rate on usrp2 static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); + //Large buffers cause more underflow at high rates. //Perhaps this is due to the kernel scheduling, //but may change with host-based flow control. static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); +//the number of async frames to allocate for each send and recv +static const size_t DEFAULT_NUM_ASYNC_FRAMES = 32; + /*********************************************************************** * Zero Copy UDP implementation with ASIO: * This is the portable zero copy implementation for systems @@ -50,51 +56,52 @@ class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_share public: typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; - udp_zero_copy_asio_impl(const std::string &addr, const std::string &port){ + udp_zero_copy_asio_impl( + const std::string &addr, const std::string &port, + size_t recv_frame_size, size_t num_recv_frames, + size_t send_frame_size, size_t num_send_frames + ): + _recv_frame_size(recv_frame_size), _num_recv_frames(num_recv_frames), + _send_frame_size(send_frame_size), _num_send_frames(num_send_frames) + { //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; - // resolve the address + //resolve the address asio::ip::udp::resolver resolver(_io_service); asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); - // create, open, and connect the socket + //create, open, and connect the socket _socket = new asio::ip::udp::socket(_io_service); _socket->open(asio::ip::udp::v4()); _socket->connect(receiver_endpoint); } - ~udp_zero_copy_asio_impl(void){ - _io_service.stop(); - _thread_group.join_all(); - delete _socket; - } - - /*! - * Init, the second contructor: - * Allocate memory and spwan service thread. - */ void init(void){ //allocate all recv frames and release them to begin xfers - _pending_recv_buffs = pending_buffs_type::make(this->get_num_recv_frames()); - for (size_t i = 0; i < this->get_num_recv_frames(); i++){ - boost::shared_array<char> buff(new char[udp_simple::mtu]); - _buffers.push_back(buff); //store a reference to this shared array - release(buff.get()); + _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); + _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]); + for (size_t i = 0; i < _num_recv_frames; i++){ + release(_recv_buffer.get() + i*_recv_frame_size); } //allocate all send frames and push them into the fifo - _pending_send_buffs = pending_buffs_type::make(this->get_num_send_frames()); - for (size_t i = 0; i < this->get_num_send_frames(); i++){ - boost::shared_array<char> buff(new char[udp_simple::mtu]); - _buffers.push_back(buff); //store a reference to this shared array - handle_send(buff.get()); + _pending_send_buffs = pending_buffs_type::make(_num_send_frames); + _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]); + for (size_t i = 0; i < _num_send_frames; i++){ + handle_send(_send_buffer.get() + i*_send_frame_size); } //spawn the service thread that will run the io service _thread_group.create_thread(boost::bind(&udp_zero_copy_asio_impl::service, this)); } + ~udp_zero_copy_asio_impl(void){ + _io_service.stop(); + _thread_group.join_all(); + delete _socket; + } + //get size for internal socket buffer template <typename Opt> size_t get_buff_size(void) const{ Opt option; @@ -109,19 +116,6 @@ public: return get_buff_size<Opt>(); } - //The number of frames is approximately the buffer size divided by the max datagram size. - //In reality, this is a phony zero-copy interface and the number of frames is infinite. - //However, its sensible to advertise a frame count that is approximate to buffer size. - //This way, the transport caller will have an idea about how much buffering to create. - - size_t get_num_recv_frames(void) const{ - return this->get_buff_size<asio::socket_base::receive_buffer_size>()/udp_simple::mtu; - } - - size_t get_num_send_frames(void) const{ - return this->get_buff_size<asio::socket_base::send_buffer_size>()/udp_simple::mtu; - } - //! pop a filled recv buffer off of the fifo and bind with the release callback managed_recv_buffer::sptr get_recv_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -138,6 +132,8 @@ public: return managed_recv_buffer::sptr(); } + size_t get_num_recv_frames(void) const {return _num_recv_frames;} + //! pop an empty send buffer off of the fifo and bind with the commit callback managed_send_buffer::sptr get_send_buff(double timeout){ boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -154,8 +150,11 @@ public: return managed_send_buffer::sptr(); } + size_t get_num_send_frames(void) const {return _num_send_frames;} + private: void service(void){ + set_thread_priority_safe(); _io_service.run(); } @@ -172,7 +171,7 @@ private: //! release a recv buffer -> start an async recv on the buffer void release(void *mem){ _socket->async_receive( - boost::asio::buffer(mem, udp_simple::mtu), + boost::asio::buffer(mem, _recv_frame_size), boost::bind( &udp_zero_copy_asio_impl::handle_recv, shared_from_this(), mem, @@ -184,7 +183,7 @@ private: //! handle a send callback -> push the emptied memory into the fifo void handle_send(void *mem){ boost::this_thread::disable_interruption di; //disable because the wait can throw - _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, udp_simple::mtu)); + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); } //! commit a send buffer -> start an async send on the buffer @@ -204,9 +203,11 @@ private: //memory management -> buffers and fifos boost::thread_group _thread_group; - std::vector<boost::shared_array<char> > _buffers; + boost::shared_array<char> _send_buffer, _recv_buffer; typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; }; /*********************************************************************** @@ -253,7 +254,11 @@ udp_zero_copy::sptr udp_zero_copy::make( size_t recv_buff_size, size_t send_buff_size ){ - udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl(addr, port)); + udp_zero_copy_asio_impl::sptr udp_trans(new udp_zero_copy_asio_impl( + addr, port, + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES, //recv + udp_simple::mtu, DEFAULT_NUM_ASYNC_FRAMES //send + )); //call the helper to resize send and recv buffers resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); |