diff options
32 files changed, 721 insertions, 778 deletions
| diff --git a/host/docs/CMakeLists.txt b/host/docs/CMakeLists.txt index bbb8812b0..65db3befc 100644 --- a/host/docs/CMakeLists.txt +++ b/host/docs/CMakeLists.txt @@ -25,6 +25,7 @@ SET(manual_sources      dboards.rst      general.rst      images.rst +    transport.rst      usrp1.rst      usrp2.rst  ) diff --git a/host/docs/index.rst b/host/docs/index.rst index bd55edc0b..7f8129e2d 100644 --- a/host/docs/index.rst +++ b/host/docs/index.rst @@ -20,11 +20,12 @@ Building the UHD  ^^^^^^^^^^^^^^^^^^^^^  Application Notes  ^^^^^^^^^^^^^^^^^^^^^ -* `General App Notes <./general.html>`_ +* `General Application Notes <./general.html>`_  * `Firmware and FPGA Image Notes <./images.html>`_ -* `USRP1 App Notes <./usrp1.html>`_ -* `USRP2 App Notes <./usrp2.html>`_ -* `Daughterboard App Notes <./dboards.html>`_ +* `USRP1 Application Notes <./usrp1.html>`_ +* `USRP2 Application Notes <./usrp2.html>`_ +* `Daughterboard Application Notes <./dboards.html>`_ +* `Transport Application Notes <./transport.html>`_  ^^^^^^^^^^^^^^^^^^^^^  API Documentation diff --git a/host/docs/transport.rst b/host/docs/transport.rst new file mode 100644 index 000000000..30fc1d78f --- /dev/null +++ b/host/docs/transport.rst @@ -0,0 +1,82 @@ +======================================================================== +UHD - Transport Application Notes +======================================================================== + +.. contents:: Table of Contents + +The advanced user can pass optional parameters +into the underlying transport layer through the device address. +These optional parameters control how the transport object allocates memory, +resizes kernel buffers, spawns threads, etc. +When not spcified, the transport layer will use values for these parameters +that are known to perform well on a variety of systems. +The transport parameters are defined below for the various transports in the UHD: + +------------------------------------------------------------------------ +UDP transport (ASIO) +------------------------------------------------------------------------ +The UDP transport is implemented with Boost's ASIO library. +ASIO provides an asynchronous API for user-space sockets. +The transport implementation allocates a number of buffers +and submits asynchronous requests for send and receive. +IO service threads run in the background to process these requests. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Transport parameters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The following parameters can be used to alter the transport's default behavior: + +* **recv_frame_size:** The size of a single receive buffer in bytes +* **num_recv_frames:** The number of receive buffers to allocate +* **send_frame_size:** The size of a single send buffer in bytes +* **num_send_frames:** The number of send buffers to allocate +* **concurrency_hint:** The number of threads to run the IO service + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Resize socket buffers +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +It may be useful increase the size of the socket buffers to +move the burden of buffering samples into the kernel, or to +buffer incoming samples faster than they can be processed. +However, if your application cannot process samples fast enough, +no amount of buffering can save you. +The following parameters can be used to alter socket's buffer sizes: + +* **recv_buff_size:** The desired size of the receive buffer in bytes +* **send_buff_size:** The desired size of the send buffer in bytes + +**Note:** Large send buffers tend to decrease transmit performance. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Linux specific notes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +On linux, the maximum buffer sizes are capped by the sysctl values +**net.core.rmem_max** and **net.core.wmem_max**. +To change the maximum values, run the following commands: +:: + +    sudo sysctl -w net.core.rmem_max=<new value> +    sudo sysctl -w net.core.wmem_max=<new value> + +Set the values permanently by editing */etc/sysctl.conf* + +------------------------------------------------------------------------ +USB transport (libusb) +------------------------------------------------------------------------ +The USB transport is implemented with libusb. +Libusb provides an asynchronous API for USB bulk transfers. +The transport implementation allocates a number of buffers +and submits asynchronous requests through libusb. +A single thread runs in the background +and executes the libusb event handler to process these requests. + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Transport parameters +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +The following parameters can be used to alter the transport's default behavior: + +* **recv_frame_size:** The size of a single receive transfers in bytes +* **num_recv_frames:** The number of simultaneous receive transfers +* **send_frame_size:** The size of a single send transfers in bytes +* **num_send_frames:** The number of simultaneous send transfers +* **concurrency_hint:** The number of threads to run the event handler diff --git a/host/docs/usrp1.rst b/host/docs/usrp1.rst index 0baa93a45..3443fd871 100644 --- a/host/docs/usrp1.rst +++ b/host/docs/usrp1.rst @@ -60,29 +60,6 @@ Example device address string representations to specify non-standard firmware a      fpga=usrp1_fpga_4rx.rbf, fw=usrp1_fw_custom.ihx -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Change USB transfer parameters -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -The advanced user may manipulate parameters of the usb bulk transfers -for various reasons, such as lowering latency or increasing buffer size. -By default, the UHD will use values for these parameters -that are known to perform well on a variety of systems. -The following device address parameters can be used to manipulate USB bulk transfers: - -* **recv_xfer_size:** the size of each receive bulk transfer in bytes -* **recv_num_xfers:** the number of simultaneous receive bulk transfers -* **send_xfer_size:** the size of each send bulk transfer in bytes -* **send_num_xfers:** the number of simultaneous send bulk transfers - -Example usage, set the device address markup string to the following: -:: - -    serial=12345678, recv_num_xfers=16 - -   -- OR -- - -    serial=12345678, recv_xfer_size=2048, recv_num_xfers=16 -  ------------------------------------------------------------------------  Specifying the subdevice to use  ------------------------------------------------------------------------ diff --git a/host/docs/usrp2.rst b/host/docs/usrp2.rst index 70e5ea28b..1ebab388a 100644 --- a/host/docs/usrp2.rst +++ b/host/docs/usrp2.rst @@ -166,47 +166,6 @@ The device address string representation for 2 USRP2s with IPv4 addresses 192.16      addr=192.168.10.2 192.168.20.2  ------------------------------------------------------------------------ -Resize the send and receive buffers ------------------------------------------------------------------------- -It may be useful increase the size of the socket buffers to -move the burden of buffering samples into the kernel, or to -buffer incoming samples faster than they can be processed. -However, if you application cannot process samples fast enough, -no amount of buffering can save you. - -By default, the UHD will try to resize both the send and receive buffer for optimum performance. -A warning will be printed on instantiation if the actual buffer size is insufficient. -See the OS specific notes below: - -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -OS specific notes -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -On linux, the maximum buffer sizes are capped by the sysctl values -**net.core.rmem_max** and **net.core.wmem_max**. -To change the maximum values, run the following commands: -:: - -    sudo sysctl -w net.core.rmem_max=<new value> -    sudo sysctl -w net.core.wmem_max=<new value> - -Set the values permanently by editing */etc/sysctl.conf* - -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -Device address params -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ -To manually set the size of the buffers, -the usrp2 will accept two optional parameters in the device address. -Each parameter will accept a numeric value for the number of bytes. - -* recv_buff_size -* send_buff_size - -Example usage, set the device address markup string to the following: -:: - -    addr=192.168.10.2, recv_buff_size=100e6 - -------------------------------------------------------------------------  Hardware setup notes  ------------------------------------------------------------------------ diff --git a/host/examples/tx_timed_samples.cpp b/host/examples/tx_timed_samples.cpp index f34c121d5..863446682 100644 --- a/host/examples/tx_timed_samples.cpp +++ b/host/examples/tx_timed_samples.cpp @@ -95,8 +95,11 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){          size_t num_tx_samps = dev->send(              &buff.front(), samps_to_send, md,              uhd::io_type_t::COMPLEX_FLOAT32, -            uhd::device::SEND_MODE_FULL_BUFF +            uhd::device::SEND_MODE_FULL_BUFF, +            //send will backup into the host this many seconds before sending: +            seconds_in_future + 0.1 //timeout (delay before transmit + padding)          ); +        if (num_tx_samps < samps_to_send) std::cout << "Send timeout..." << std::endl;          if(verbose) std::cout << std::endl << boost::format("Sent %d samples") % num_tx_samps << std::endl;      } diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index 2077cae62..992276928 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -41,9 +41,6 @@ public:      typedef boost::function<device_addrs_t(const device_addr_t &)> find_t;      typedef boost::function<sptr(const device_addr_t &)> make_t; -    //! A reasonable default timeout for receive -    static const size_t default_recv_timeout_ms = 100; -      /*!       * Register a device into the discovery and factory system.       * @@ -112,12 +109,15 @@ public:       *       * This is a blocking call and will not return until the number       * of samples returned have been read out of each buffer. +     * Under a timeout condition, the number of samples returned +     * may be less than the number of samples specified.       *       * \param buffs a vector of read-only memory containing IF data       * \param nsamps_per_buff the number of samples to send, per buffer       * \param metadata data describing the buffer's contents       * \param io_type the type of data loaded in the buffer       * \param send_mode tells send how to unload the buffer +     * \param timeout the timeout in seconds to wait on a packet       * \return the number of samples sent       */      virtual size_t send( @@ -125,7 +125,8 @@ public:          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout = 0.1      ) = 0;      /*! @@ -136,7 +137,8 @@ public:          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout = 0.1      );      /*! @@ -154,7 +156,9 @@ public:       * See the rx metadata fragment flags and offset fields for details.       *       * This is a blocking call and will not return until the number -     * of samples returned have been written into each buffer or timeout. +     * of samples returned have been written into each buffer. +     * Under a timeout condition, the number of samples returned +     * may be less than the number of samples specified.       *       * When using the full buffer recv mode, the metadata only applies       * to the first packet received and written into the recv buffers. @@ -165,7 +169,7 @@ public:       * \param metadata data to fill describing the buffer       * \param io_type the type of data to fill into the buffer       * \param recv_mode tells recv how to load the buffer -     * \param timeout_ms the timeout in milliseconds to wait for a packet +     * \param timeout the timeout in seconds to wait for a packet       * \return the number of samples received or 0 on error       */      virtual size_t recv( @@ -174,7 +178,7 @@ public:          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms = default_recv_timeout_ms +        double timeout = 0.1      ) = 0;      /*! @@ -186,7 +190,7 @@ public:          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms = default_recv_timeout_ms +        double timeout = 0.1      );      /*! @@ -204,12 +208,11 @@ public:      /*!       * Receive and asynchronous message from the device.       * \param async_metadata the metadata to be filled in -     * \param timeout_ms the timeout in milliseconds to wait for a message +     * \param timeout the timeout in seconds to wait for a message       * \return true when the async_metadata is valid, false for timeout       */      virtual bool recv_async_msg( -        async_metadata_t &async_metadata, -        size_t timeout_ms = default_recv_timeout_ms +        async_metadata_t &async_metadata, double timeout = 0.1      ) = 0;  }; diff --git a/host/include/uhd/device.ipp b/host/include/uhd/device.ipp index 60a3f535d..e2e51ecd0 100644 --- a/host/include/uhd/device.ipp +++ b/host/include/uhd/device.ipp @@ -25,12 +25,13 @@ namespace uhd{          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, -        send_mode_t send_mode +        send_mode_t send_mode, +        double timeout      ){          return this->send(              std::vector<const void *>(1, buff),              nsamps_per_buff, metadata, -            io_type, send_mode +            io_type, send_mode, timeout          );      } @@ -40,12 +41,12 @@ namespace uhd{          rx_metadata_t &metadata,          const io_type_t &io_type,          recv_mode_t recv_mode, -        size_t timeout_ms +        double timeout      ){          return this->recv(              std::vector<void *>(1, buff),              nsamps_per_buff, metadata, -            io_type, recv_mode, timeout_ms +            io_type, recv_mode, timeout          );      } diff --git a/host/include/uhd/transport/alignment_buffer.hpp b/host/include/uhd/transport/alignment_buffer.hpp index 29ba74efc..f44a037f8 100644 --- a/host/include/uhd/transport/alignment_buffer.hpp +++ b/host/include/uhd/transport/alignment_buffer.hpp @@ -48,20 +48,17 @@ namespace uhd{ namespace transport{           * \return true if the element fit without popping for space           */          virtual bool push_with_pop_on_full( -            const elem_type &elem, -            const seq_type &seq, -            size_t index +            const elem_type &elem, const seq_type &seq, size_t index          ) = 0;          /*!           * Pop an aligned set of elements from this alignment buffer.           * \param elems a collection to store the aligned elements -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */          virtual bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, -            const time_duration_t &time +            std::vector<elem_type> &elems, double timeout          ) = 0;      }; diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp index 61b3b60f5..5f09de0d9 100644 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ b/host/include/uhd/transport/alignment_buffer.ipp @@ -41,9 +41,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool push_with_pop_on_full( -            const elem_type &elem, -            const seq_type &seq, -            size_t index +            const elem_type &elem, const seq_type &seq, size_t index          ){              //clear the buffer for this index if the seqs are mis-ordered              if (seq < _last_seqs[index]){ @@ -54,17 +52,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool pop_elems_with_timed_wait( -            std::vector<elem_type> &elems, -            const time_duration_t &time +            std::vector<elem_type> &elems, double timeout          ){ -            boost::system_time exit_time = boost::get_system_time() + time; +            boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6));              buff_contents_type buff_contents_tmp;              std::list<size_t> indexes_to_do(_all_indexes);              //do an initial pop to load an initial sequence id              size_t index = indexes_to_do.front();              if (not _buffs[index]->pop_with_timed_wait( -                buff_contents_tmp, exit_time - boost::get_system_time() +                buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()              )) return false;              elems[index] = buff_contents_tmp.first;              seq_type expected_seq_id = buff_contents_tmp.second; @@ -79,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                      indexes_to_do = _all_indexes;                      index = indexes_to_do.front();                      if (not _buffs[index]->pop_with_timed_wait( -                        buff_contents_tmp, exit_time - boost::get_system_time() +                        buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()                      )) return false;                      elems[index] = buff_contents_tmp.first;                      expected_seq_id = buff_contents_tmp.second; @@ -89,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                  //pop an element off for this index                  index = indexes_to_do.front();                  if (not _buffs[index]->pop_with_timed_wait( -                    buff_contents_tmp, exit_time - boost::get_system_time() +                    buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds()                  )) return false;                  //if the sequence id matches: diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index d1deece96..aca93b071 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -20,13 +20,9 @@  #include <uhd/config.hpp>  #include <boost/shared_ptr.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ -    //! typedef for the time duration type for wait operations -    typedef boost::posix_time::time_duration time_duration_t; -      /*!       * Implement a templated bounded buffer:       * Used for passing elements between threads in a producer-consumer model. @@ -64,10 +60,10 @@ namespace uhd{ namespace transport{           * Push a new element into the bounded_buffer.           * Wait until the bounded_buffer becomes non-full or timeout.           * \param elem the new element to push -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time) = 0; +        virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0;          /*!           * Pop an element from the bounded_buffer. @@ -80,10 +76,10 @@ namespace uhd{ namespace transport{           * Pop an element from the bounded_buffer.           * Wait until the bounded_buffer becomes non-empty or timeout.           * \param elem the element reference pop to -         * \param time the timeout time +         * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time) = 0; +        virtual bool pop_with_timed_wait(elem_type &elem, double timeout) = 0;          /*!           * Clear all elements from the bounded_buffer. diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index e106e229e..58f78bab4 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -21,6 +21,7 @@  #include <boost/bind.hpp>  #include <boost/circular_buffer.hpp>  #include <boost/thread/condition.hpp> +#include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ namespace{ /*anon*/ @@ -57,9 +58,12 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _empty_cond.notify_one();          } -        bool push_with_timed_wait(const elem_type &elem, const time_duration_t &time){ +        bool push_with_timed_wait(const elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _full_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl<elem_type>::not_full, this))) return false; +            if (not _full_cond.timed_wait( +                lock, boost::posix_time::microseconds(long(timeout*1e6)), +                boost::bind(&bounded_buffer_impl<elem_type>::not_full, this) +            )) return false;              _buffer.push_front(elem);              lock.unlock();              _empty_cond.notify_one(); @@ -69,15 +73,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE void pop_with_wait(elem_type &elem){              boost::unique_lock<boost::mutex> lock(_mutex);              _empty_cond.wait(lock, boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this)); -            elem = _buffer.back(); _buffer.pop_back(); +            this->pop_back(elem);              lock.unlock();              _full_cond.notify_one();          } -        bool pop_with_timed_wait(elem_type &elem, const time_duration_t &time){ +        bool pop_with_timed_wait(elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex); -            if (not _empty_cond.timed_wait(lock, time, boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this))) return false; -            elem = _buffer.back(); _buffer.pop_back(); +            if (not _empty_cond.timed_wait( +                lock, boost::posix_time::microseconds(long(timeout*1e6)), +                boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this) +            )) return false; +            this->pop_back(elem);              lock.unlock();              _full_cond.notify_one();              return true; @@ -97,6 +104,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          bool not_full(void) const{return not _buffer.full();}          bool not_empty(void) const{return not _buffer.empty();} + +        /*! +         * Three part operation to pop an element: +         * 1) assign elem to the back element +         * 2) assign the back element to empty +         * 3) pop the back to move the counter +         */ +        UHD_INLINE void pop_back(elem_type &elem){ +            elem = _buffer.back(); +            _buffer.back() = elem_type(); +            _buffer.pop_back(); +        }      };  }}} //namespace diff --git a/host/include/uhd/transport/udp_zero_copy.hpp b/host/include/uhd/transport/udp_zero_copy.hpp index 818709973..bbba97b21 100644 --- a/host/include/uhd/transport/udp_zero_copy.hpp +++ b/host/include/uhd/transport/udp_zero_copy.hpp @@ -20,6 +20,7 @@  #include <uhd/config.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/types/device_addr.hpp>  #include <boost/shared_ptr.hpp>  namespace uhd{ namespace transport{ @@ -50,14 +51,12 @@ public:       *       * \param addr a string representing the destination address       * \param port a string representing the destination port -     * \param recv_buff_size size in bytes for the recv buffer, 0 for automatic -     * \param send_buff_size size in bytes for the send buffer, 0 for automatic +     * \param hints optional parameters to pass to the underlying transport       */      static sptr make(          const std::string &addr,          const std::string &port, -        size_t recv_buff_size = 0, -        size_t send_buff_size = 0 +        const device_addr_t &hints = device_addr_t()      );  }; diff --git a/host/include/uhd/transport/usb_control.hpp b/host/include/uhd/transport/usb_control.hpp index f9829c3ec..e6c32f78e 100644 --- a/host/include/uhd/transport/usb_control.hpp +++ b/host/include/uhd/transport/usb_control.hpp @@ -18,7 +18,7 @@  #ifndef INCLUDED_UHD_TRANSPORT_USB_CONTROL_HPP  #define INCLUDED_UHD_TRANSPORT_USB_CONTROL_HPP -#include "usb_device_handle.hpp" +#include <uhd/transport/usb_device_handle.hpp>  namespace uhd { namespace transport { diff --git a/host/include/uhd/transport/usb_zero_copy.hpp b/host/include/uhd/transport/usb_zero_copy.hpp index 61bf380ba..b39171fba 100644 --- a/host/include/uhd/transport/usb_zero_copy.hpp +++ b/host/include/uhd/transport/usb_zero_copy.hpp @@ -18,8 +18,9 @@  #ifndef INCLUDED_UHD_TRANSPORT_USB_ZERO_COPY_HPP  #define INCLUDED_UHD_TRANSPORT_USB_ZERO_COPY_HPP -#include "usb_device_handle.hpp" +#include <uhd/transport/usb_device_handle.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/types/device_addr.hpp>  namespace uhd { namespace transport { @@ -47,19 +48,13 @@ public:       * \param handle a device handle that uniquely identifying the device       * \param recv_endpoint an integer specifiying an IN endpoint number       * \param send_endpoint an integer specifiying an OUT endpoint number -     * \param recv_xfer_size the number of bytes for each receive transfer -     * \param recv_num_xfers the number of simultaneous receive transfers -     * \param send_xfer_size the number of bytes for each send transfer -     * \param send_num_xfers the number of simultaneous send transfers +     * \param hints optional parameters to pass to the underlying transport       */      static sptr make(          usb_device_handle::sptr handle, -        unsigned int recv_endpoint, -        unsigned int send_endpoint, -        size_t recv_xfer_size = 0, -        size_t recv_num_xfers = 0, -        size_t send_xfer_size = 0, -        size_t send_num_xfers = 0 +        size_t recv_endpoint, +        size_t send_endpoint, +        const device_addr_t &hints = device_addr_t()      );  }; diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 8ecafd3fb..7d8fb4b83 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -19,10 +19,10 @@  #define INCLUDED_UHD_TRANSPORT_ZERO_COPY_HPP  #include <uhd/config.hpp> -#include <uhd/utils/pimpl.hpp>  #include <boost/asio/buffer.hpp>  #include <boost/utility.hpp>  #include <boost/shared_ptr.hpp> +#include <boost/function.hpp>  namespace uhd{ namespace transport{ @@ -34,14 +34,27 @@ namespace uhd{ namespace transport{      class UHD_API managed_recv_buffer : boost::noncopyable{      public:          typedef boost::shared_ptr<managed_recv_buffer> sptr; +        typedef boost::function<void(void)> release_fcn_t; + +        /*! +         * Make a safe managed receive buffer: +         * A safe managed buffer ensures that release is called once, +         * either by the user or automatically upon deconstruction. +         * \param buff a reference to the constant buffer +         * \param release_fcn callback to release the memory +         * \return a new managed receive buffer +         */ +        static sptr make_safe( +            const boost::asio::const_buffer &buff, +            const release_fcn_t &release_fcn +        );          /*! -         * Managed recv buffer destructor:           * Signal to the transport that we are done with the buffer. -         * This should be called to release the buffer to the transport. +         * This should be called to release the buffer to the transport object.           * After calling, the referenced memory should be considered invalid.           */ -        virtual ~managed_recv_buffer(void) = 0; +        virtual void release(void) = 0;          /*!           * Get the size of the underlying buffer. @@ -71,20 +84,34 @@ namespace uhd{ namespace transport{      /*!       * A managed send buffer:       * Contains a reference to transport-managed memory, -     * and a method to release the memory after writing. +     * and a method to commit the memory after writing.       */      class UHD_API managed_send_buffer : boost::noncopyable{      public:          typedef boost::shared_ptr<managed_send_buffer> sptr; +        typedef boost::function<void(size_t)> commit_fcn_t; + +        /*! +         * Make a safe managed send buffer: +         * A safe managed buffer ensures that commit is called once, +         * either by the user or automatically upon deconstruction. +         * In the later case, the deconstructor will call commit(0). +         * \param buff a reference to the mutable buffer +         * \param commit_fcn callback to commit the memory +         * \return a new managed send buffer +         */ +        static sptr make_safe( +            const boost::asio::mutable_buffer &buff, +            const commit_fcn_t &commit_fcn +        );          /*!           * Signal to the transport that we are done with the buffer.           * This should be called to commit the write to the transport object.           * After calling, the referenced memory should be considered invalid.           * \param num_bytes the number of bytes written into the buffer -         * \return the number of bytes written, 0 for timeout, negative for error           */ -        virtual ssize_t commit(size_t num_bytes) = 0; +        virtual void commit(size_t num_bytes) = 0;          /*!           * Get the size of the underlying buffer. @@ -122,105 +149,46 @@ namespace uhd{ namespace transport{          /*!           * Get a new receive buffer from this transport object. -         * \param timeout_ms the timeout to get the buffer in ms +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        virtual managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms) = 0; +        virtual managed_recv_buffer::sptr get_recv_buff(double timeout = 0.1) = 0;          /*! -         * Get the maximum number of receive frames: -         *   The maximum number of valid managed recv buffers, -         *   or the maximum number of frames in the ring buffer, -         *   depending upon the underlying implementation. +         * Get the number of receive frames: +         * The number of simultaneous receive buffers in use.           * \return number of frames           */          virtual size_t get_num_recv_frames(void) const = 0;          /*! -         * Get a new send buffer from this transport object. -         * \return a managed buffer, or null sptr on timeout/error -         */ -        virtual managed_send_buffer::sptr get_send_buff(void) = 0; - -        /*! -         * Get the maximum number of send frames: -         *   The maximum number of valid managed send buffers, -         *   or the maximum number of frames in the ring buffer, -         *   depending upon the underlying implementation. -         * \return number of frames -         */ -        virtual size_t get_num_send_frames(void) const = 0; - -    }; - -    /*! -     * A phony-zero-copy interface for transport objects that -     * provides a zero-copy interface on top of copying transport. -     * This interface implements the get managed recv buffer, -     * the base class must implement the private recv method. -     */ -    class UHD_API phony_zero_copy_recv_if : public virtual zero_copy_if{ -    public: -        /*! -         * Create a phony zero copy recv interface. -         * \param max_buff_size max buffer size in bytes +         * Get the size of a receive frame: +         * The maximum capacity of a single receive buffer. +         * \return frame size in bytes           */ -        phony_zero_copy_recv_if(size_t max_buff_size); - -        //! destructor -        virtual ~phony_zero_copy_recv_if(void); +        virtual size_t get_recv_frame_size(void) const = 0;          /*! -         * Get a new receive buffer from this transport object. -         * \param timeout_ms the timeout to get the buffer in ms +         * Get a new send buffer from this transport object. +         * \param timeout the timeout to get the buffer in seconds           * \return a managed buffer, or null sptr on timeout/error           */ -        managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); +        virtual managed_send_buffer::sptr get_send_buff(double timeout = 0.1) = 0; -    private:          /*! -         * Perform a private copying recv. -         * \param buff the buffer to write data into -         * \param timeout_ms the timeout to get the buffer in ms -         * \return the number of bytes written to buff, 0 for timeout, negative for error -         */ -        virtual ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms) = 0; - -        UHD_PIMPL_DECL(impl) _impl; -    }; - -    /*! -     * A phony-zero-copy interface for transport objects that -     * provides a zero-copy interface on top of copying transport. -     * This interface implements the get managed send buffer, -     * the base class must implement the private send method. -     */ -    class UHD_API phony_zero_copy_send_if : public virtual zero_copy_if{ -    public: -        /*! -         * Create a phony zero copy send interface. -         * \param max_buff_size max buffer size in bytes -         */ -        phony_zero_copy_send_if(size_t max_buff_size); - -        //! destructor -        virtual ~phony_zero_copy_send_if(void); - -        /*! -         * Get a new send buffer from this transport object. -         * \return a managed buffer, or null sptr on timeout/error +         * Get the number of send frames: +         * The number of simultaneous send buffers in use. +         * \return number of frames           */ -        managed_send_buffer::sptr get_send_buff(void); +        virtual size_t get_num_send_frames(void) const = 0; -    private:          /*! -         * Perform a private copying send. -         * \param buff the buffer to read data from -         * \return the number of bytes read from buff, 0 for timeout, negative for error +         * Get the size of a send frame: +         * The maximum capacity of a single send buffer. +         * \return frame size in bytes           */ -        virtual ssize_t send(const boost::asio::const_buffer &buff) = 0; +        virtual size_t get_send_frame_size(void) const = 0; -        UHD_PIMPL_DECL(impl) _impl;      };  }} //namespace diff --git a/host/include/uhd/types/device_addr.hpp b/host/include/uhd/types/device_addr.hpp index e359d9467..eb3394230 100644 --- a/host/include/uhd/types/device_addr.hpp +++ b/host/include/uhd/types/device_addr.hpp @@ -20,6 +20,8 @@  #include <uhd/config.hpp>  #include <uhd/types/dict.hpp> +#include <boost/lexical_cast.hpp> +#include <stdexcept>  #include <vector>  #include <string> @@ -62,6 +64,24 @@ namespace uhd{           * \return a string with delimiter markup           */          std::string to_string(void) const; + +        /*! +         * Lexically cast a parameter to the specified type, +         * or use the default value if the key is not found. +         * \param key the key as one of the address parameters +         * \param def the value to use when key is not present +         * \return the casted value as type T or the default +         * \throw error when the parameter cannot be casted +         */ +        template <typename T> T cast(const std::string &key, const T &def) const{ +            if (not this->has_key(key)) return def; +            try{ +                return boost::lexical_cast<T>((*this)[key]); +            } +            catch(const boost::bad_lexical_cast &){ +                throw std::runtime_error("cannot cast " + key + " = " + (*this)[key]); +            } +        }      };      //handy typedef for a vector of device addresses diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 61616d077..b9ec7a6ad 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -36,6 +36,10 @@ IF(LIBUSB_FOUND)          INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/lib/transport/msvc)      ENDIF(MSVC)      SET(HAVE_USB_SUPPORT TRUE) +ELSE(LIBUSB_FOUND) +    LIBUHD_APPEND_SOURCES( +        ${CMAKE_SOURCE_DIR}/lib/transport/usb_dummy_impl.cpp +    )  ENDIF(LIBUSB_FOUND)  IF(HAVE_USB_SUPPORT) diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp index 1dd0b48e8..cfa77d9ca 100644 --- a/host/lib/transport/libusb1_base.cpp +++ b/host/lib/transport/libusb1_base.cpp @@ -16,12 +16,10 @@  //  #include "libusb1_base.hpp" -#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <uhd/types/dict.hpp>  #include <boost/weak_ptr.hpp>  #include <boost/foreach.hpp> -#include <boost/thread.hpp>  #include <iostream>  using namespace uhd; @@ -35,14 +33,9 @@ public:      libusb_session_impl(void){          UHD_ASSERT_THROW(libusb_init(&_context) == 0);          libusb_set_debug(_context, debug_level); -        _mutex.lock(); -        _thread_group.create_thread(boost::bind(&libusb_session_impl::run_event_loop, this)); -        _mutex.lock();      }      ~libusb_session_impl(void){ -        _running = false; -        _thread_group.join_all();          libusb_exit(_context);      } @@ -52,21 +45,6 @@ public:  private:      libusb_context *_context; -    boost::thread_group _thread_group; -    bool _running; -    boost::mutex _mutex; - -    void run_event_loop(void){ -        set_thread_priority_safe(); -        _running = true; -        timeval tv; -        _mutex.unlock(); -        while(_running){ -            tv.tv_sec = 0; -            tv.tv_usec = 100000; //100ms -            libusb_handle_events_timeout(this->get_context(), &tv); -        } -    }  };  libusb::session::sptr libusb::session::get_global_session(void){ diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f9beb0b4c..f589d7c77 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -18,18 +18,19 @@  #include "libusb1_base.hpp"  #include <uhd/transport/usb_zero_copy.hpp>  #include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <boost/shared_array.hpp>  #include <boost/foreach.hpp>  #include <boost/thread.hpp> +#include <boost/enable_shared_from_this.hpp>  #include <vector>  #include <iostream> -#include <iomanip> +using namespace uhd;  using namespace uhd::transport; -const int libusb_timeout = 0; - +static const double CLEANUP_TIMEOUT   = 0.2;    //seconds  static const size_t DEFAULT_NUM_XFERS = 16;     //num xfers  static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes @@ -84,10 +85,10 @@ public:       * Get an available transfer:       * For inputs, this is a just filled transfer.       * For outputs, this is a just emptied transfer. -     * \param timeout_ms the timeout to wait for a lut +     * \param timeout the timeout to wait for a lut       * \return the transfer pointer or NULL if timeout       */ -    libusb_transfer *get_lut_with_wait(size_t timeout_ms = 100); +    libusb_transfer *get_lut_with_wait(double timeout);      //Callback use only      void callback_handle_transfer(libusb_transfer *lut); @@ -97,21 +98,18 @@ private:      int  _endpoint;      bool _input; -    size_t _transfer_size; -    size_t _num_transfers; -      //! hold a bounded buffer of completed transfers      typedef bounded_buffer<libusb_transfer *> lut_buff_type;      lut_buff_type::sptr _completed_list;      //! a list of all transfer structs we allocated -    std::vector<libusb_transfer *>  _all_luts; +    std::vector<libusb_transfer *> _all_luts; -    //! a list of shared arrays for the transfer buffers -    std::vector<boost::shared_array<boost::uint8_t> > _buffers; +    //! a block of memory for the transfer buffers +    boost::shared_array<char> _buffer;      // Calls for processing asynchronous I/O -    libusb_transfer *allocate_transfer(int buff_len); +    libusb_transfer *allocate_transfer(void *mem, size_t len);      void print_transfer_status(libusb_transfer *lut);  }; @@ -156,14 +154,12 @@ usb_endpoint::usb_endpoint(  ):      _handle(handle),      _endpoint(endpoint), -    _input(input), -    _transfer_size(transfer_size), -    _num_transfers(num_transfers) +    _input(input)  {      _completed_list = lut_buff_type::make(num_transfers); - -    for (size_t i = 0; i < _num_transfers; i++){ -        _all_luts.push_back(allocate_transfer(_transfer_size)); +    _buffer = boost::shared_array<char>(new char[num_transfers*transfer_size]); +    for (size_t i = 0; i < num_transfers; i++){ +        _all_luts.push_back(allocate_transfer(_buffer.get() + i*transfer_size, transfer_size));          //input luts are immediately submitted to be filled          //output luts go into the completed list as free buffers @@ -187,7 +183,7 @@ usb_endpoint::~usb_endpoint(void){      }      //collect canceled transfers (drain the queue) -    while (this->get_lut_with_wait() != NULL){}; +    while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){};      //free all transfers      BOOST_FOREACH(libusb_transfer *lut, _all_luts){ @@ -200,23 +196,23 @@ usb_endpoint::~usb_endpoint(void){   * Allocate a libusb transfer   * The allocated transfer - and buffer it contains - is repeatedly   * submitted, reaped, and reused and should not be freed until shutdown. - * \param buff_len size of the individual buffer held by each transfer + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer   * \return pointer to an allocated libusb_transfer   */ -libusb_transfer *usb_endpoint::allocate_transfer(int buff_len){ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){      libusb_transfer *lut = libusb_alloc_transfer(0); - -    boost::shared_array<boost::uint8_t> buff(new boost::uint8_t[buff_len]); -    _buffers.push_back(buff); //store a reference to this shared array +    UHD_ASSERT_THROW(lut != NULL);      unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); +    unsigned char *buff = reinterpret_cast<unsigned char *>(mem);      libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback);      libusb_fill_bulk_transfer(lut,                // transfer                                _handle->get(),     // dev_handle                                endpoint,           // endpoint -                              buff.get(),         // buffer -                              buff_len,           // length +                              buff,               // buffer +                              len,                // length                                lut_callback,       // callback                                this,               // user_data                                0);                 // timeout @@ -239,6 +235,7 @@ void usb_endpoint::submit(libusb_transfer *lut){   * \param lut pointer to an libusb_transfer   */  void usb_endpoint::print_transfer_status(libusb_transfer *lut){ +    std::cout << "here " << lut->status << std::endl;      switch (lut->status) {      case LIBUSB_TRANSFER_COMPLETED:          if (lut->actual_length < lut->length) { @@ -274,136 +271,75 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){      }  } -libusb_transfer *usb_endpoint::get_lut_with_wait(size_t timeout_ms){ +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){      boost::this_thread::disable_interruption di; //disable because the wait can throw      libusb_transfer *lut; -    if (_completed_list->pop_with_timed_wait( -        lut, boost::posix_time::milliseconds(timeout_ms) -    )) return lut; +    if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut;      return NULL;  }  /*********************************************************************** - * Managed buffers + * USB zero_copy device class   **********************************************************************/ -/* - * Libusb managed receive buffer - * Construct a recv buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Upon destruction, the transfer and buffer are resubmitted to the - * endpoint for further use. - */ -class libusb_managed_recv_buffer_impl : public managed_recv_buffer { +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> {  public: -    libusb_managed_recv_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint::sptr endpoint) -        : _buff(lut->buffer, lut->length) -    { -        _lut = lut; -        _endpoint = endpoint; -    } - -    ~libusb_managed_recv_buffer_impl(void){ -        _endpoint->submit(_lut); -    } -private: -    const boost::asio::const_buffer &get(void) const{ -        return _buff; -    } - -    libusb_transfer *_lut; -    usb_endpoint::sptr _endpoint; -    const boost::asio::const_buffer _buff; -}; +    libusb_zero_copy_impl( +        libusb::device_handle::sptr handle, +        size_t recv_endpoint, +        size_t send_endpoint, +        const device_addr_t &hints +    ); -/* - * Libusb managed send buffer - * Construct a send buffer from a libusb transfer. The memory held by - * the libusb transfer is exposed through the managed buffer interface. - * Committing the buffer will set the data length and submit the buffer - * to the endpoint. Submitting a buffer multiple times or destroying - * the buffer before committing is an error. For the latter, the transfer - * is returned to the endpoint with no data for reuse. - */ -class libusb_managed_send_buffer_impl : public managed_send_buffer { -public: -    libusb_managed_send_buffer_impl(libusb_transfer *lut, -                                    usb_endpoint::sptr endpoint) -        : _buff(lut->buffer, lut->length), _committed(false) -    { -        _lut = lut; -        _endpoint = endpoint; +    ~libusb_zero_copy_impl(void){ +        _threads_running = false; +        _thread_group.join_all();      } -    ~libusb_managed_send_buffer_impl(void){ -        if (!_committed) { -            _lut->length = 0; -            _lut->actual_length = 0; -            _endpoint->submit(_lut); -        } -    } +    managed_recv_buffer::sptr get_recv_buff(double); +    managed_send_buffer::sptr get_send_buff(double); -    ssize_t commit(size_t num_bytes) -    { -        if (_committed) { -            std::cerr << "UHD: send buffer already committed" << std::endl; -            return 0; -        } +    size_t get_num_recv_frames(void) const { return _num_recv_frames; } +    size_t get_num_send_frames(void) const { return _num_send_frames; } -        UHD_ASSERT_THROW(num_bytes <= boost::asio::buffer_size(_buff)); +    size_t get_recv_frame_size(void) const { return _recv_frame_size; } +    size_t get_send_frame_size(void) const { return _send_frame_size; } -        _lut->length = num_bytes; -        _lut->actual_length = 0; +private: +    void release(libusb_transfer *lut){ +        _recv_ep->submit(lut); +    } +    void commit(libusb_transfer *lut, size_t num_bytes){ +        lut->length = num_bytes;          try{ -            _endpoint->submit(_lut); -            _committed = true; -            return num_bytes; +            _send_ep->submit(lut);          }          catch(const std::exception &e){              std::cerr << "Error in commit: " << e.what() << std::endl; -            return -1;          }      } -private: -    const boost::asio::mutable_buffer &get(void) const{ -        return _buff; -    } - -    libusb_transfer *_lut; -    usb_endpoint::sptr _endpoint; -    const boost::asio::mutable_buffer _buff; -    bool _committed; -}; - - -/*********************************************************************** - * USB zero_copy device class - **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy -{ -private:      libusb::device_handle::sptr _handle; -    size_t _recv_num_frames, _send_num_frames; +    const size_t _recv_frame_size, _num_recv_frames; +    const size_t _send_frame_size, _num_send_frames;      usb_endpoint::sptr _recv_ep, _send_ep; -public: -    typedef boost::shared_ptr<libusb_zero_copy_impl> sptr; - -    libusb_zero_copy_impl( -        libusb::device_handle::sptr handle, -        unsigned int recv_endpoint, unsigned int send_endpoint, -        size_t recv_xfer_size, size_t recv_num_xfers, -        size_t send_xfer_size, size_t send_num_xfers -    ); - -    managed_recv_buffer::sptr get_recv_buff(size_t timeout_ms); -    managed_send_buffer::sptr get_send_buff(void); - -    size_t get_num_recv_frames(void) const { return _recv_num_frames; } -    size_t get_num_send_frames(void) const { return _send_num_frames; } +    //event handler threads +    boost::thread_group _thread_group; +    bool _threads_running; + +    void run_event_loop(void){ +        set_thread_priority_safe(); +        libusb::session::sptr session = libusb::session::get_global_session(); +        _threads_running = true; +        while(_threads_running){ +            timeval tv; +            tv.tv_sec = 0; +            tv.tv_usec = 100000; //100ms +            libusb_handle_events_timeout(session->get_context(), &tv); +        } +    }  };  /* @@ -413,26 +349,16 @@ public:   */  libusb_zero_copy_impl::libusb_zero_copy_impl(      libusb::device_handle::sptr handle, -    unsigned int recv_endpoint, unsigned int send_endpoint, -    size_t recv_xfer_size, size_t recv_num_xfers, -    size_t send_xfer_size, size_t send_num_xfers -){ -    _handle = handle; - -    //if the sizes are left at 0 (automatic) -> use the defaults -    if (recv_xfer_size == 0) recv_xfer_size = DEFAULT_XFER_SIZE; -    if (recv_num_xfers == 0) recv_num_xfers = DEFAULT_NUM_XFERS; -    if (send_xfer_size == 0) send_xfer_size = DEFAULT_XFER_SIZE; -    if (send_num_xfers == 0) send_num_xfers = DEFAULT_NUM_XFERS; - -    //sanity check the transfer sizes -    UHD_ASSERT_THROW(recv_xfer_size % 512 == 0); -    UHD_ASSERT_THROW(send_xfer_size % 512 == 0); - -    //store the num xfers for the num frames count -    _recv_num_frames = recv_num_xfers; -    _send_num_frames = send_num_xfers; - +    size_t recv_endpoint, +    size_t send_endpoint, +    const device_addr_t &hints +): +    _handle(handle), +    _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), +    _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), +    _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), +    _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))) +{      _handle->claim_interface(2 /*in interface*/);      _handle->claim_interface(1 /*out interface*/); @@ -440,17 +366,23 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(                                _handle,         // libusb device_handle                                recv_endpoint,   // USB endpoint number                                true,            // IN endpoint -                              recv_xfer_size,  // buffer size per transfer -                              recv_num_xfers   // number of libusb transfers +                              this->get_recv_frame_size(),  // buffer size per transfer +                              this->get_num_recv_frames()   // number of libusb transfers      ));      _send_ep = usb_endpoint::sptr(new usb_endpoint(                                _handle,         // libusb device_handle                                send_endpoint,   // USB endpoint number                                false,           // OUT endpoint -                              send_xfer_size,  // buffer size per transfer -                              send_num_xfers   // number of libusb transfers +                              this->get_send_frame_size(),  // buffer size per transfer +                              this->get_num_send_frames()   // number of libusb transfers      )); + +    //spawn the event handler threads +    size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); +    for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( +        boost::bind(&libusb_zero_copy_impl::run_event_loop, this) +    );  }  /* @@ -459,15 +391,16 @@ libusb_zero_copy_impl::libusb_zero_copy_impl(   * Return empty pointer if no transfer is available (timeout or error).   * \return pointer to a managed receive buffer   */ -managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms){ -    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout_ms); +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ +    libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_recv_buffer::sptr();      }      else { -        return managed_recv_buffer::sptr( -            new libusb_managed_recv_buffer_impl(lut, -                                                _recv_ep)); +        return managed_recv_buffer::make_safe( +            boost::asio::const_buffer(lut->buffer, lut->actual_length), +            boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) +        );      }  } @@ -478,15 +411,16 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(size_t timeout_ms   * (timeout or error).   * \return pointer to a managed send buffer   */ -managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){ -    libusb_transfer *lut = _send_ep->get_lut_with_wait(/* TODO timeout API */); +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ +    libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout);      if (lut == NULL) {          return managed_send_buffer::sptr();      }      else { -        return managed_send_buffer::sptr( -            new libusb_managed_send_buffer_impl(lut, -                                                _send_ep)); +        return managed_send_buffer::make_safe( +            boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), +            boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) +        );      }  } @@ -494,18 +428,15 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(void){   * USB zero_copy make functions   **********************************************************************/  usb_zero_copy::sptr usb_zero_copy::make( -	usb_device_handle::sptr handle, -    unsigned int recv_endpoint, unsigned int send_endpoint, -	size_t recv_xfer_size, size_t recv_num_xfers, -	size_t send_xfer_size, size_t send_num_xfers +    usb_device_handle::sptr handle, +    size_t recv_endpoint, +    size_t send_endpoint, +    const device_addr_t &hints  ){      libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(          boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()      ));      return sptr(new libusb_zero_copy_impl( -		dev_handle, -		recv_endpoint,  send_endpoint, -		recv_xfer_size, recv_num_xfers, -		send_xfer_size, send_num_xfers +        dev_handle, recv_endpoint, send_endpoint, hints      ));  } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 0a6c9f2af..a87f17a91 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -17,25 +17,38 @@  #include <uhd/transport/udp_zero_copy.hpp>  #include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <uhd/utils/warning.hpp> -#include <boost/cstdint.hpp> +#include <boost/shared_array.hpp>  #include <boost/asio.hpp>  #include <boost/format.hpp> +#include <boost/thread.hpp> +#include <boost/enable_shared_from_this.hpp>  #include <iostream> +using namespace uhd;  using namespace uhd::transport; +namespace asio = boost::asio;  /***********************************************************************   * Constants   **********************************************************************/  //enough buffering for half a second of samples at full rate on usrp2 -static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(sizeof(boost::uint32_t) * 25e6 * 0.5); +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); +  //Large buffers cause more underflow at high rates.  //Perhaps this is due to the kernel scheduling,  //but may change with host-based flow control.  static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); +//the number of async frames to allocate for each send and recv +static const size_t DEFAULT_NUM_FRAMES = 32; + +//a single concurrent thread for io_service seems to be the fastest +static const size_t CONCURRENCY_HINT = 1; +  /***********************************************************************   * Zero Copy UDP implementation with ASIO:   *   This is the portable zero copy implementation for systems @@ -43,36 +56,59 @@ static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3);   *   However, it is not a true zero copy implementation as each   *   send and recv requires a copy operation to/from userspace.   **********************************************************************/ -class udp_zero_copy_impl: -    public phony_zero_copy_recv_if, -    public phony_zero_copy_send_if, -    public udp_zero_copy -{ +class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> {  public: -    typedef boost::shared_ptr<udp_zero_copy_impl> sptr; +    typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; -    udp_zero_copy_impl( +    udp_zero_copy_asio_impl(          const std::string &addr, -        const std::string &port +        const std::string &port, +        const device_addr_t &hints      ): -        phony_zero_copy_recv_if(udp_simple::mtu), -        phony_zero_copy_send_if(udp_simple::mtu) +        _io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), +        _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))), +        _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))), +        _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))), +        _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES)))      {          //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; -        // resolve the address -        boost::asio::ip::udp::resolver resolver(_io_service); -        boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); -        boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); +        //resolve the address +        asio::ip::udp::resolver resolver(_io_service); +        asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); +        asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); -        // create, open, and connect the socket -        _socket = new boost::asio::ip::udp::socket(_io_service); -        _socket->open(boost::asio::ip::udp::v4()); +        //create, open, and connect the socket +        _socket = new asio::ip::udp::socket(_io_service); +        _socket->open(asio::ip::udp::v4());          _socket->connect(receiver_endpoint); -        _sock_fd = _socket->native();      } -    ~udp_zero_copy_impl(void){ +    void init(void){ +        //allocate all recv frames and release them to begin xfers +        _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); +        _recv_buffer = boost::shared_array<char>(new char[_num_recv_frames*_recv_frame_size]); +        for (size_t i = 0; i < _num_recv_frames; i++){ +            release(_recv_buffer.get() + i*_recv_frame_size); +        } + +        //allocate all send frames and push them into the fifo +        _pending_send_buffs = pending_buffs_type::make(_num_send_frames); +        _send_buffer = boost::shared_array<char>(new char[_num_send_frames*_send_frame_size]); +        for (size_t i = 0; i < _num_send_frames; i++){ +            handle_send(_send_buffer.get() + i*_send_frame_size); +        } + +        //spawn the service threads that will run the io service +        _work = new asio::io_service::work(_io_service); //new work to delete later +        for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread( +            boost::bind(&udp_zero_copy_asio_impl::service, this) +        ); +    } + +    ~udp_zero_copy_asio_impl(void){ +        delete _work; //allow io_service run to complete +        _thread_group.join_all(); //wait for service threads to exit          delete _socket;      } @@ -90,60 +126,108 @@ public:          return get_buff_size<Opt>();      } +    //! pop a filled recv buffer off of the fifo and bind with the release callback +    managed_recv_buffer::sptr get_recv_buff(double timeout){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        asio::mutable_buffer buff; +        if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ +            return managed_recv_buffer::make_safe( +                buff, boost::bind( +                    &udp_zero_copy_asio_impl::release, +                    shared_from_this(), +                    asio::buffer_cast<void*>(buff) +                ) +            ); +        } +        return managed_recv_buffer::sptr(); +    } -    //The number of frames is approximately the buffer size divided by the max datagram size. -    //In reality, this is a phony zero-copy interface and the number of frames is infinite. -    //However, its sensible to advertise a frame count that is approximate to buffer size. -    //This way, the transport caller will have an idea about how much buffering to create. +    size_t get_num_recv_frames(void) const {return _num_recv_frames;} +    size_t get_recv_frame_size(void) const {return _recv_frame_size;} -    size_t get_num_recv_frames(void) const{ -        return this->get_buff_size<boost::asio::socket_base::receive_buffer_size>()/udp_simple::mtu; +    //! pop an empty send buffer off of the fifo and bind with the commit callback +    managed_send_buffer::sptr get_send_buff(double timeout){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        asio::mutable_buffer buff; +        if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ +            return managed_send_buffer::make_safe( +                buff, boost::bind( +                    &udp_zero_copy_asio_impl::commit, +                    shared_from_this(), +                    asio::buffer_cast<void*>(buff), _1 +                ) +            ); +        } +        return managed_send_buffer::sptr();      } -    size_t get_num_send_frames(void) const{ -        return this->get_buff_size<boost::asio::socket_base::send_buffer_size>()/udp_simple::mtu; -    } +    size_t get_num_send_frames(void) const {return _num_send_frames;} +    size_t get_send_frame_size(void) const {return _send_frame_size;}  private: -    boost::asio::ip::udp::socket   *_socket; -    boost::asio::io_service        _io_service; -    int                            _sock_fd; - -    ssize_t recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){ -        //setup timeval for timeout -        timeval tv; -        tv.tv_sec = 0; -        tv.tv_usec = timeout_ms*1000; - -        //setup rset for timeout -        fd_set rset; -        FD_ZERO(&rset); -        FD_SET(_sock_fd, &rset); - -        //call select to perform timed wait -        if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) return 0; - -        return ::recv( -            _sock_fd, -            boost::asio::buffer_cast<char *>(buff), -            boost::asio::buffer_size(buff), 0 +    void service(void){ +        set_thread_priority_safe(); +        _io_service.run(); +    } + +    /******************************************************************* +     * The async send and receive callbacks +     ******************************************************************/ + +    //! handle a recv callback -> push the filled memory into the fifo +    void handle_recv(void *mem, size_t len){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); +    } + +    //! release a recv buffer -> start an async recv on the buffer +    void release(void *mem){ +        _socket->async_receive( +            boost::asio::buffer(mem, _recv_frame_size), +            boost::bind( +                &udp_zero_copy_asio_impl::handle_recv, +                shared_from_this(), mem, +                asio::placeholders::bytes_transferred +            )          );      } -    ssize_t send(const boost::asio::const_buffer &buff){ -        return ::send( -            _sock_fd, -            boost::asio::buffer_cast<const char *>(buff), -            boost::asio::buffer_size(buff), 0 +    //! handle a send callback -> push the emptied memory into the fifo +    void handle_send(void *mem){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); +    } + +    //! commit a send buffer -> start an async send on the buffer +    void commit(void *mem, size_t len){ +        _socket->async_send( +            boost::asio::buffer(mem, len), +            boost::bind( +                &udp_zero_copy_asio_impl::handle_send, +                shared_from_this(), mem +            )          );      } + +    //asio guts -> socket and service +    asio::ip::udp::socket   *_socket; +    asio::io_service        _io_service; +    asio::io_service::work  *_work; + +    //memory management -> buffers and fifos +    boost::thread_group _thread_group; +    boost::shared_array<char> _send_buffer, _recv_buffer; +    typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; +    pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; +    const size_t _recv_frame_size, _num_recv_frames; +    const size_t _send_frame_size, _num_send_frames;  };  /***********************************************************************   * UDP zero copy make function   **********************************************************************/  template<typename Opt> static void resize_buff_helper( -    udp_zero_copy_impl::sptr udp_trans, +    udp_zero_copy_asio_impl::sptr udp_trans,      size_t target_size,      const std::string &name  ){ @@ -164,7 +248,10 @@ template<typename Opt> static void resize_buff_helper(          if (actual_size < target_size) uhd::print_warning(str(boost::format(              "The %s buffer is smaller than the requested size.\n"              "The minimum recommended buffer size is %d bytes.\n" -            "See the USRP2 application notes on buffer resizing.\n" +            "See the transport application notes on buffer resizing.\n" +            #if defined(UHD_PLATFORM_LINUX) +            "On Linux: sudo sysctl -w net.core.rmem_max=%2%\n" +            #endif /*defined(UHD_PLATFORM_LINUX)*/          ) % name % min_sock_buff_size));      } @@ -180,14 +267,21 @@ template<typename Opt> static void resize_buff_helper(  udp_zero_copy::sptr udp_zero_copy::make(      const std::string &addr,      const std::string &port, -    size_t recv_buff_size, -    size_t send_buff_size +    const device_addr_t &hints  ){ -    udp_zero_copy_impl::sptr udp_trans(new udp_zero_copy_impl(addr, port)); +    udp_zero_copy_asio_impl::sptr udp_trans( +        new udp_zero_copy_asio_impl(addr, port, hints) +    ); + +    //extract buffer size hints from the device addr +    size_t recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0)); +    size_t send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0));      //call the helper to resize send and recv buffers -    resize_buff_helper<boost::asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); -    resize_buff_helper<boost::asio::socket_base::send_buffer_size>   (udp_trans, send_buff_size, "send"); +    resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); +    resize_buff_helper<asio::socket_base::send_buffer_size>   (udp_trans, send_buff_size, "send"); + +    udp_trans->init(); //buffers resized -> call init() to use      return udp_trans;  } diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp new file mode 100644 index 000000000..8a9772e7f --- /dev/null +++ b/host/lib/transport/usb_dummy_impl.cpp @@ -0,0 +1,39 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/usb_device_handle.hpp> +#include <uhd/transport/usb_control.hpp> +#include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/utils/exception.hpp> + +using namespace uhd; +using namespace uhd::transport; + +std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::uint16_t, boost::uint16_t){ +    return std::vector<usb_device_handle::sptr>(); //empty list +} + +usb_control::sptr usb_control::make(usb_device_handle::sptr){ +    throw std::runtime_error("no usb support -> usb_control::make not implemented"); +} + +usb_zero_copy::sptr usb_zero_copy::make( +    usb_device_handle::sptr, +    size_t, size_t, const device_addr_t & +){ +    throw std::runtime_error("no usb support -> usb_zero_copy::make not implemented"); +} diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index b603f1371..939517411 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -303,18 +303,18 @@ template <typename T> UHD_INLINE T get_context_code(       * Pack a vrt header, copy-convert the data, and send it.       *  - helper function for vrt_packet_handler::send       ******************************************************************/ -    static UHD_INLINE void _send1( +    static UHD_INLINE size_t _send1(          send_state &state,          const std::vector<const void *> &buffs, -        size_t offset_bytes, -        size_t num_samps, +        const size_t offset_bytes, +        const size_t num_samps,          uhd::transport::vrt::if_packet_info_t &if_packet_info,          const uhd::io_type_t &io_type,          const uhd::otw_type_t &otw_type,          const vrt_packer_t &vrt_packer,          const get_send_buffs_t &get_send_buffs, -        size_t vrt_header_offset_words32, -        size_t chans_per_otw_buff +        const size_t vrt_header_offset_words32, +        const size_t chans_per_otw_buff      ){          //load the rest of the if_packet_info in here          if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t); @@ -322,7 +322,7 @@ template <typename T> UHD_INLINE T get_context_code(          //get send buffers for each channel          managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); -        UHD_ASSERT_THROW(get_send_buffs(send_buffs)); +        if (not get_send_buffs(send_buffs)) return 0;          std::vector<const void *> io_buffs(chans_per_otw_buff);          for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ @@ -343,10 +343,9 @@ template <typename T> UHD_INLINE T get_context_code(              //commit the samples to the zero-copy interface              size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); -            if (send_buffs[i]->commit(num_bytes_total) < ssize_t(num_bytes_total)){ -                std::cerr << "commit to send buffer returned less than commit size" << std::endl; -            } +            send_buffs[i]->commit(num_bytes_total);          } +        return num_samps;      }      /******************************************************************* @@ -381,7 +380,6 @@ template <typename T> UHD_INLINE T get_context_code(          ////////////////////////////////////////////////////////////////          case uhd::device::SEND_MODE_ONE_PACKET:{          //////////////////////////////////////////////////////////////// -            size_t num_samps = std::min(total_num_samps, max_samples_per_packet);              //fill in parts of the packet info overwrote in full buff mode              if_packet_info.has_tsi = metadata.has_time_spec; @@ -389,10 +387,10 @@ template <typename T> UHD_INLINE T get_context_code(              if_packet_info.sob = metadata.start_of_burst;              if_packet_info.eob = metadata.end_of_burst; -            _send1( +            return _send1(                  state,                  buffs, 0, -                num_samps, +                std::min(total_num_samps, max_samples_per_packet),                  if_packet_info,                  io_type, otw_type,                  vrt_packer, @@ -400,31 +398,32 @@ template <typename T> UHD_INLINE T get_context_code(                  vrt_header_offset_words32,                  chans_per_otw_buff              ); -            return num_samps;          }          ////////////////////////////////////////////////////////////////          case uhd::device::SEND_MODE_FULL_BUFF:{          //////////////////////////////////////////////////////////////// -            //calculate constants for fragmentation -            const size_t num_fragments = (total_num_samps+max_samples_per_packet-1)/max_samples_per_packet; -            static const size_t first_fragment_index = 0; -            const size_t final_fragment_index = num_fragments-1; +            size_t total_num_samps_sent = 0;              //loop through the following fragment indexes -            for (size_t n = first_fragment_index; n <= final_fragment_index; n++){ +            while(total_num_samps_sent < total_num_samps){ + +                //calculate per-loop-iteration variables +                const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; +                const bool first_fragment = (total_num_samps_sent == 0); +                const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet);                  //calculate new flags for the fragments -                if_packet_info.has_tsi = metadata.has_time_spec  and (n == first_fragment_index); -                if_packet_info.has_tsf = metadata.has_time_spec  and (n == first_fragment_index); -                if_packet_info.sob     = metadata.start_of_burst and (n == first_fragment_index); -                if_packet_info.eob     = metadata.end_of_burst   and (n == final_fragment_index); +                if_packet_info.has_tsi = metadata.has_time_spec  and first_fragment; +                if_packet_info.has_tsf = if_packet_info.has_tsi; +                if_packet_info.sob     = metadata.start_of_burst and first_fragment; +                if_packet_info.eob     = metadata.end_of_burst   and final_fragment;                  //send the fragment with the helper function -                _send1( +                const size_t num_samps_sent = _send1(                      state, -                    buffs, n*max_samples_per_packet*io_type.size, -                    (n == final_fragment_index)?(total_num_samps%max_samples_per_packet):max_samples_per_packet, +                    buffs, total_num_samps_sent*io_type.size, +                    std::min(total_num_samps_unsent, max_samples_per_packet),                      if_packet_info,                      io_type, otw_type,                      vrt_packer, @@ -432,8 +431,10 @@ template <typename T> UHD_INLINE T get_context_code(                      vrt_header_offset_words32,                      chans_per_otw_buff                  ); +                total_num_samps_sent += num_samps_sent; +                if (num_samps_sent == 0) return total_num_samps_sent;              } -            return total_num_samps; +            return total_num_samps_sent;          }          default: throw std::runtime_error("unknown send mode"); diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index 1fcf846a0..a5a864a04 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -16,32 +16,35 @@  //  #include <uhd/transport/zero_copy.hpp> -#include <boost/cstdint.hpp> -#include <boost/function.hpp> -#include <boost/bind.hpp>  using namespace uhd::transport;  /*********************************************************************** - * The pure-virtual deconstructor needs an implementation to be happy + * Safe managed receive buffer   **********************************************************************/ -managed_recv_buffer::~managed_recv_buffer(void){ +static void release_nop(void){      /* NOP */  } -/*********************************************************************** - * Phony zero-copy recv interface implementation - **********************************************************************/ - -//! phony zero-copy recv buffer implementation -class managed_recv_buffer_impl : public managed_recv_buffer{ +class safe_managed_receive_buffer : public managed_recv_buffer{  public: -    managed_recv_buffer_impl(const boost::asio::const_buffer &buff) : _buff(buff){ +    safe_managed_receive_buffer( +        const boost::asio::const_buffer &buff, +        const release_fcn_t &release_fcn +    ): +        _buff(buff), _release_fcn(release_fcn) +    {          /* NOP */      } -    ~managed_recv_buffer_impl(void){ -        delete [] this->cast<const boost::uint8_t *>(); +    ~safe_managed_receive_buffer(void){ +        _release_fcn(); +    } + +    void release(void){ +        release_fcn_t release_fcn = _release_fcn; +        _release_fcn = &release_nop; +        return release_fcn();      }  private: @@ -50,64 +53,42 @@ private:      }      const boost::asio::const_buffer _buff; +    release_fcn_t _release_fcn;  }; -//! phony zero-copy recv interface implementation -struct phony_zero_copy_recv_if::impl{ -    impl(size_t max_buff_size) : max_buff_size(max_buff_size){ -        /* NOP */ -    } -    size_t max_buff_size; -}; - -phony_zero_copy_recv_if::phony_zero_copy_recv_if(size_t max_buff_size){ -    _impl = UHD_PIMPL_MAKE(impl, (max_buff_size)); -} - -phony_zero_copy_recv_if::~phony_zero_copy_recv_if(void){ -    /* NOP */ -} - -managed_recv_buffer::sptr phony_zero_copy_recv_if::get_recv_buff(size_t timeout_ms){ -    //allocate memory -    boost::uint8_t *recv_mem = new boost::uint8_t[_impl->max_buff_size]; - -    //call recv() with timeout option -    ssize_t num_bytes = this->recv(boost::asio::buffer(recv_mem, _impl->max_buff_size), timeout_ms); - -    if (num_bytes <= 0) return managed_recv_buffer::sptr(); //NULL sptr - -    //create a new managed buffer to house the data -    return managed_recv_buffer::sptr( -        new managed_recv_buffer_impl(boost::asio::buffer(recv_mem, num_bytes)) -    ); +managed_recv_buffer::sptr managed_recv_buffer::make_safe( +    const boost::asio::const_buffer &buff, +    const release_fcn_t &release_fcn +){ +    return sptr(new safe_managed_receive_buffer(buff, release_fcn));  }  /*********************************************************************** - * Phony zero-copy send interface implementation + * Safe managed send buffer   **********************************************************************/ +static void commit_nop(size_t){ +    /* NOP */ +} -//! phony zero-copy send buffer implementation -class managed_send_buffer_impl : public managed_send_buffer{ +class safe_managed_send_buffer : public managed_send_buffer{  public: -    typedef boost::function<ssize_t(const boost::asio::const_buffer &)> send_fcn_t; - -    managed_send_buffer_impl( +    safe_managed_send_buffer(          const boost::asio::mutable_buffer &buff, -        const send_fcn_t &send_fcn +        const commit_fcn_t &commit_fcn      ): -        _buff(buff), -        _send_fcn(send_fcn) +        _buff(buff), _commit_fcn(commit_fcn)      {          /* NOP */      } -    ~managed_send_buffer_impl(void){ -        /* NOP */ +    ~safe_managed_send_buffer(void){ +        _commit_fcn(0);      } -    ssize_t commit(size_t num_bytes){ -        return _send_fcn(boost::asio::buffer(_buff, num_bytes)); +    void commit(size_t num_bytes){ +        commit_fcn_t commit_fcn = _commit_fcn; +        _commit_fcn = &commit_nop; +        return commit_fcn(num_bytes);      }  private: @@ -116,28 +97,12 @@ private:      }      const boost::asio::mutable_buffer _buff; -    const send_fcn_t                  _send_fcn; -}; - -//! phony zero-copy send interface implementation -struct phony_zero_copy_send_if::impl{ -    boost::uint8_t *send_mem; -    managed_send_buffer::sptr send_buff; +    commit_fcn_t _commit_fcn;  }; -phony_zero_copy_send_if::phony_zero_copy_send_if(size_t max_buff_size){ -    _impl = UHD_PIMPL_MAKE(impl, ()); -    _impl->send_mem = new boost::uint8_t[max_buff_size]; -    _impl->send_buff = managed_send_buffer::sptr(new managed_send_buffer_impl( -        boost::asio::buffer(_impl->send_mem, max_buff_size), -        boost::bind(&phony_zero_copy_send_if::send, this, _1) -    )); -} - -phony_zero_copy_send_if::~phony_zero_copy_send_if(void){ -    delete [] _impl->send_mem; -} - -managed_send_buffer::sptr phony_zero_copy_send_if::get_send_buff(void){ -    return _impl->send_buff; //FIXME there is only ever one send buff, we assume that the caller doesnt hang onto these +safe_managed_send_buffer::sptr managed_send_buffer::make_safe( +    const boost::asio::mutable_buffer &buff, +    const commit_fcn_t &commit_fcn +){ +    return sptr(new safe_managed_send_buffer(buff, commit_fcn));  } diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index aee760a83..676b1536a 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -34,35 +34,6 @@ using namespace uhd::transport;  namespace asio = boost::asio;  /*********************************************************************** - * Pseudo send buffer implementation - **********************************************************************/ -class pseudo_managed_send_buffer : public managed_send_buffer{ -public: - -    pseudo_managed_send_buffer( -        const boost::asio::mutable_buffer &buff, -        const boost::function<ssize_t(size_t)> &commit -    ): -        _buff(buff), -        _commit(commit) -    { -        /* NOP */ -    } - -    ssize_t commit(size_t num_bytes){ -        return _commit(num_bytes); -    } - -private: -    const boost::asio::mutable_buffer &get(void) const{ -        return _buff; -    } - -    const boost::asio::mutable_buffer      _buff; -    const boost::function<ssize_t(size_t)> _commit; -}; - -/***********************************************************************   * IO Implementation Details   **********************************************************************/  struct usrp1_impl::io_impl{ @@ -94,12 +65,13 @@ struct usrp1_impl::io_impl{      //all of this to ensure only full buffers are committed      managed_send_buffer::sptr send_buff;      size_t num_bytes_committed; +    double send_timeout;      boost::uint8_t pseudo_buff[BYTES_PER_PACKET]; -    ssize_t phony_commit_pseudo_buff(size_t num_bytes); -    ssize_t phony_commit_send_buff(size_t num_bytes); -    ssize_t commit_send_buff(void); +    void phony_commit_pseudo_buff(size_t num_bytes); +    void phony_commit_send_buff(size_t num_bytes); +    void commit_send_buff(void);      void flush_send_buff(void); -    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &); +    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &, double);      //helpers to get at the send buffer + offset      inline void *get_send_mem_ptr(void){ @@ -118,28 +90,25 @@ struct usrp1_impl::io_impl{   *   The first loop iteration will fill the remainder of the send buffer.   *   The second loop iteration will empty the pseudo buffer remainder.   */ -ssize_t usrp1_impl::io_impl::phony_commit_pseudo_buff(size_t num_bytes){ +void usrp1_impl::io_impl::phony_commit_pseudo_buff(size_t num_bytes){      size_t bytes_to_copy = num_bytes, bytes_copied = 0;      while(bytes_to_copy){          size_t bytes_copied_here = std::min(bytes_to_copy, get_send_mem_size());          std::memcpy(get_send_mem_ptr(), pseudo_buff + bytes_copied, bytes_copied_here); -        ssize_t ret = phony_commit_send_buff(bytes_copied_here); -        if (ret < 0) return ret; -        bytes_to_copy -= ret; -        bytes_copied += ret; +        phony_commit_send_buff(bytes_copied_here); +        bytes_to_copy -= bytes_copied_here; +        bytes_copied += bytes_copied_here;      } -    return bytes_copied;  }  /*!   * Accept a commit of num bytes to the send buffer.   * Conditionally commit the send buffer if full.   */ -ssize_t usrp1_impl::io_impl::phony_commit_send_buff(size_t num_bytes){ +void usrp1_impl::io_impl::phony_commit_send_buff(size_t num_bytes){      num_bytes_committed += num_bytes; -    if (num_bytes_committed != send_buff->size()) return num_bytes; -    ssize_t ret = commit_send_buff(); -    return (ret < 0)? ret : num_bytes; +    if (num_bytes_committed != send_buff->size()) return; +    commit_send_buff();  }  /*! @@ -157,31 +126,31 @@ void usrp1_impl::io_impl::flush_send_buff(void){   * Perform an actual commit on the send buffer:   * Commit the contents of the send buffer and request a new buffer.   */ -ssize_t usrp1_impl::io_impl::commit_send_buff(void){ -    ssize_t ret = send_buff->commit(num_bytes_committed); -    send_buff = data_transport->get_send_buff(); +void usrp1_impl::io_impl::commit_send_buff(void){ +    send_buff->commit(num_bytes_committed); +    send_buff = data_transport->get_send_buff(send_timeout);      num_bytes_committed = 0; -    return ret;  }  bool usrp1_impl::io_impl::get_send_buffs( -    vrt_packet_handler::managed_send_buffs_t &buffs +    vrt_packet_handler::managed_send_buffs_t &buffs, double timeout  ){ +    send_timeout = timeout;      UHD_ASSERT_THROW(buffs.size() == 1);      //not enough bytes free -> use the pseudo buffer      if (get_send_mem_size() < BYTES_PER_PACKET){ -        buffs[0] = managed_send_buffer::sptr(new pseudo_managed_send_buffer( +        buffs[0] = managed_send_buffer::make_safe(              boost::asio::buffer(pseudo_buff),              boost::bind(&usrp1_impl::io_impl::phony_commit_pseudo_buff, this, _1) -        )); +        );      }      //otherwise use the send buffer offset by the bytes written      else{ -        buffs[0] = managed_send_buffer::sptr(new pseudo_managed_send_buffer( +        buffs[0] = managed_send_buffer::make_safe(              boost::asio::buffer(get_send_mem_ptr(), get_send_mem_size()),              boost::bind(&usrp1_impl::io_impl::phony_commit_send_buff, this, _1) -        )); +        );      }      return buffs[0].get() != NULL; @@ -216,7 +185,7 @@ static void usrp1_bs_vrt_packer(  size_t usrp1_impl::send(      const std::vector<const void *> &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type, -    send_mode_t send_mode +    send_mode_t send_mode, double timeout  ){      size_t num_samps_sent = vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler @@ -225,7 +194,7 @@ size_t usrp1_impl::send(          io_type, _tx_otw_type,                     //input and output types to convert          _clock_ctrl->get_master_clock_freq(),      //master clock tick rate          &usrp1_bs_vrt_packer, -        boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1), +        boost::bind(&usrp1_impl::io_impl::get_send_buffs, _io_impl.get(), _1, timeout),          get_max_send_samps_per_packet(),          0,                                         //vrt header offset          _tx_subdev_spec.size()                     //num channels @@ -272,18 +241,18 @@ static void usrp1_bs_vrt_unpacker(  }  static bool get_recv_buffs( -    zero_copy_if::sptr zc_if, size_t timeout_ms, +    zero_copy_if::sptr zc_if, double timeout,      vrt_packet_handler::managed_recv_buffs_t &buffs  ){      UHD_ASSERT_THROW(buffs.size() == 1); -    buffs[0] = zc_if->get_recv_buff(timeout_ms); +    buffs[0] = zc_if->get_recv_buff(timeout);      return buffs[0].get() != NULL;  }  size_t usrp1_impl::recv(      const std::vector<void *> &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type, -    recv_mode_t recv_mode, size_t timeout_ms +    recv_mode_t recv_mode, double timeout  ){      size_t num_samps_recvd = vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler @@ -292,7 +261,7 @@ size_t usrp1_impl::recv(          io_type, _rx_otw_type,                     //input and output types to convert          _clock_ctrl->get_master_clock_freq(),      //master clock tick rate          &usrp1_bs_vrt_unpacker, -        boost::bind(&get_recv_buffs, _data_transport, timeout_ms, _1), +        boost::bind(&get_recv_buffs, _data_transport, timeout, _1),          &vrt_packet_handler::handle_overflow_nop,          0,                                         //vrt header offset          _rx_subdev_spec.size()                     //num channels diff --git a/host/lib/usrp/usrp1/usrp1_impl.cpp b/host/lib/usrp/usrp1/usrp1_impl.cpp index 156fc119f..276ca86f6 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.cpp +++ b/host/lib/usrp/usrp1/usrp1_impl.cpp @@ -106,17 +106,8 @@ static device_addrs_t usrp1_find(const device_addr_t &hint)  /***********************************************************************   * Make   **********************************************************************/ -template<typename output_type> static output_type cast_from_dev_addr( -    const device_addr_t &device_addr, -    const std::string &key, -    output_type def_val -){ -    return (device_addr.has_key(key))? -        boost::lexical_cast<output_type>(device_addr[key]) : def_val; -} +static device::sptr usrp1_make(const device_addr_t &device_addr){ -static device::sptr usrp1_make(const device_addr_t &device_addr) -{      //extract the FPGA path for the USRP1      std::string usrp1_fpga_image = find_image_path(          device_addr.has_key("fpga")? device_addr["fpga"] : "usrp1_fpga.rbf" @@ -127,29 +118,26 @@ static device::sptr usrp1_make(const device_addr_t &device_addr)      std::vector<usb_device_handle::sptr> device_list =          usb_device_handle::get_device_list(USRP1_VENDOR_ID, USRP1_PRODUCT_ID); -    //create data and control transports -    usb_zero_copy::sptr data_transport; -    usrp_ctrl::sptr usrp_ctrl; - - -    BOOST_FOREACH(usb_device_handle::sptr handle, device_list) { -        if (handle->get_serial() == device_addr["serial"]) { -            usb_control::sptr ctrl_transport = usb_control::make(handle); -            usrp_ctrl = usrp_ctrl::make(ctrl_transport); -            usrp_ctrl->usrp_load_fpga(usrp1_fpga_image); - -            data_transport = usb_zero_copy::make( -                handle,        // identifier -                6,             // IN endpoint -                2,             // OUT endpoint -                size_t(cast_from_dev_addr<double>(device_addr, "recv_xfer_size", 0)), -                size_t(cast_from_dev_addr<double>(device_addr, "recv_num_xfers", 0)), -                size_t(cast_from_dev_addr<double>(device_addr, "send_xfer_size", 0)), -                size_t(cast_from_dev_addr<double>(device_addr, "send_num_xfers", 0)) -            ); +    //locate the matching handle in the device list +    usb_device_handle::sptr handle; +    BOOST_FOREACH(usb_device_handle::sptr dev_handle, device_list) { +        if (dev_handle->get_serial() == device_addr["serial"]){ +            handle = dev_handle;              break;          }      } +    UHD_ASSERT_THROW(handle.get() != NULL); //better be found + +    //create control objects and a data transport +    usb_control::sptr ctrl_transport = usb_control::make(handle); +    usrp_ctrl::sptr usrp_ctrl = usrp_ctrl::make(ctrl_transport); +    usrp_ctrl->usrp_load_fpga(usrp1_fpga_image); +    usb_zero_copy::sptr data_transport = usb_zero_copy::make( +        handle,        // identifier +        6,             // IN endpoint +        2,             // OUT endpoint +        device_addr    // param hints +    );      //create the usrp1 implementation guts      return device::sptr(new usrp1_impl(data_transport, usrp_ctrl)); @@ -209,9 +197,9 @@ usrp1_impl::~usrp1_impl(void){      /* NOP */  } -bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, size_t timeout_ms){ +bool usrp1_impl::recv_async_msg(uhd::async_metadata_t &, double timeout){      //dummy fill-in for the recv_async_msg -    boost::this_thread::sleep(boost::posix_time::milliseconds(timeout_ms)); +    boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6)));      return false;  } diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 20ae3c02a..f2c464610 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -83,13 +83,12 @@ public:                  size_t,                  const uhd::tx_metadata_t &,                  const uhd::io_type_t &, -                send_mode_t); +                send_mode_t, double);      size_t recv(const std::vector<void *> &,                  size_t, uhd::rx_metadata_t &,                  const uhd::io_type_t &, -                recv_mode_t, -                size_t timeout); +                recv_mode_t, double);      static const size_t BYTES_PER_PACKET = 512*4; //under the transfer size @@ -101,7 +100,7 @@ public:          return BYTES_PER_PACKET/_rx_otw_type.get_sample_size()/_rx_subdev_spec.size();      } -    bool recv_async_msg(uhd::async_metadata_t &, size_t); +    bool recv_async_msg(uhd::async_metadata_t &, double);  private:      /*! diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 3395f94e2..07eda08c3 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -33,7 +33,6 @@ using namespace uhd::transport;  namespace asio = boost::asio;  static const int underflow_flags = async_metadata_t::EVENT_CODE_UNDERFLOW | async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET; -static const double RECV_TIMEOUT_MS = 100;  /***********************************************************************   * io impl details (internal to this file) @@ -59,9 +58,9 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, size_t timeout_ms){ +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, boost::posix_time::milliseconds(timeout_ms)); +        return recv_pirate_booty->pop_elems_with_timed_wait(buffs, timeout);      }      //state management for the vrt packet handler code @@ -91,7 +90,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(      size_t next_packet_seq = 0;      while(recv_pirate_crew_raiding){ -        managed_recv_buffer::sptr buff = zc_if->get_recv_buff(RECV_TIMEOUT_MS); +        managed_recv_buffer::sptr buff = zc_if->get_recv_buff();          if (not buff.get()) continue; //ignore timeout/error buffers          try{ @@ -151,7 +150,7 @@ void usrp2_impl::io_init(void){          std::memcpy(send_buff->cast<void*>(), &data, sizeof(data));          send_buff->commit(sizeof(data));          //drain the recv buffers (may have junk) -        while (data_transport->get_recv_buff(RECV_TIMEOUT_MS).get()){}; +        while (data_transport->get_recv_buff().get()){};      }      //the number of recv frames is the number for the first transport @@ -169,22 +168,16 @@ void usrp2_impl::io_init(void){              _mboards.at(i), i          ));      } - -    std::cout << "RX samples per packet: " << get_max_recv_samps_per_packet() << std::endl; -    std::cout << "TX samples per packet: " << get_max_send_samps_per_packet() << std::endl; -    std::cout << "Recv pirate num frames: " << num_frames << std::endl;  }  /***********************************************************************   * Async Data   **********************************************************************/  bool usrp2_impl::recv_async_msg( -    async_metadata_t &async_metadata, size_t timeout_ms +    async_metadata_t &async_metadata, double timeout  ){      boost::this_thread::disable_interruption di; //disable because the wait can throw -    return _io_impl->async_msg_fifo->pop_with_timed_wait( -        async_metadata, boost::posix_time::milliseconds(timeout_ms) -    ); +    return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout);  }  /*********************************************************************** @@ -192,28 +185,31 @@ bool usrp2_impl::recv_async_msg(   **********************************************************************/  static bool get_send_buffs(      const std::vector<udp_zero_copy::sptr> &trans, -    vrt_packet_handler::managed_send_buffs_t &buffs +    vrt_packet_handler::managed_send_buffs_t &buffs, +    double timeout  ){      UHD_ASSERT_THROW(trans.size() == buffs.size()); +    bool good = true;      for (size_t i = 0; i < buffs.size(); i++){ -        buffs[i] = trans[i]->get_send_buff(); +        buffs[i] = trans[i]->get_send_buff(timeout); +        good = good and (buffs[i].get() != NULL);      } -    return true; +    return good;  }  size_t usrp2_impl::send(      const std::vector<const void *> &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type, -    send_mode_t send_mode +    send_mode_t send_mode, double timeout  ){      return vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler          buffs, num_samps,                          //buffer to fill          metadata, send_mode,                       //samples metadata -        io_type, _io_helper.get_tx_otw_type(),     //input and output types to convert +        io_type, _tx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_pack_be, -        boost::bind(&get_send_buffs, _data_transports, _1), +        boost::bind(&get_send_buffs, _data_transports, _1, timeout),          get_max_send_samps_per_packet()      );  } @@ -224,15 +220,15 @@ size_t usrp2_impl::send(  size_t usrp2_impl::recv(      const std::vector<void *> &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type, -    recv_mode_t recv_mode, size_t timeout_ms +    recv_mode_t recv_mode, double timeout  ){      return vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler          buffs, num_samps,                          //buffer to fill          metadata, recv_mode,                       //samples metadata -        io_type, _io_helper.get_rx_otw_type(),     //input and output types to convert +        io_type, _rx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_be, -        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout_ms) +        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _1, timeout)      );  } diff --git a/host/lib/usrp/usrp2/mboard_impl.cpp b/host/lib/usrp/usrp2/mboard_impl.cpp index 0b9f8ee83..a0e6adfad 100644 --- a/host/lib/usrp/usrp2/mboard_impl.cpp +++ b/host/lib/usrp/usrp2/mboard_impl.cpp @@ -38,10 +38,10 @@ using namespace uhd::usrp;  usrp2_mboard_impl::usrp2_mboard_impl(      size_t index,      transport::udp_simple::sptr ctrl_transport, -    const usrp2_io_helper &io_helper +    size_t recv_frame_size  ):      _index(index), -    _io_helper(io_helper) +    _recv_frame_size(recv_frame_size)  {      //make a new interface for usrp2 stuff      _iface = usrp2_iface::make(ctrl_transport); @@ -75,7 +75,7 @@ usrp2_mboard_impl::usrp2_mboard_impl(      this->issue_ddc_stream_cmd(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS);      //init the rx control registers -    _iface->poke32(U2_REG_RX_CTRL_NSAMPS_PER_PKT, _io_helper.get_max_recv_samps_per_packet()); +    _iface->poke32(U2_REG_RX_CTRL_NSAMPS_PER_PKT, _recv_frame_size);      _iface->poke32(U2_REG_RX_CTRL_NCHANNELS, 1);      _iface->poke32(U2_REG_RX_CTRL_CLEAR_OVERRUN, 1); //reset      _iface->poke32(U2_REG_RX_CTRL_VRT_HEADER, 0 @@ -178,7 +178,7 @@ void usrp2_mboard_impl::set_time_spec(const time_spec_t &time_spec, bool now){  void usrp2_mboard_impl::issue_ddc_stream_cmd(const stream_cmd_t &stream_cmd){      _iface->poke32(U2_REG_RX_CTRL_STREAM_CMD, dsp_type1::calc_stream_cmd_word( -        stream_cmd, _io_helper.get_max_recv_samps_per_packet() +        stream_cmd, _recv_frame_size      ));      _iface->poke32(U2_REG_RX_CTRL_TIME_SECS,  boost::uint32_t(stream_cmd.time_spec.get_full_secs()));      _iface->poke32(U2_REG_RX_CTRL_TIME_TICKS, stream_cmd.time_spec.get_tick_count(get_master_clock_freq())); diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 568c87a22..a680708ad 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -124,26 +124,7 @@ static uhd::device_addrs_t usrp2_find(const device_addr_t &hint){  /***********************************************************************   * Make   **********************************************************************/ -template <typename out_type, typename in_type> -out_type lexical_cast(const in_type &in){ -    try{ -        return boost::lexical_cast<out_type>(in); -    }catch(...){ -        throw std::runtime_error(str(boost::format( -            "failed to cast \"%s\" to type \"%s\"" -        ) % boost::lexical_cast<std::string>(in) % typeid(out_type).name())); -    } -} -  static device::sptr usrp2_make(const device_addr_t &device_addr){ -    //extract the receive and send buffer sizes -    size_t recv_buff_size = 0, send_buff_size= 0 ; -    if (device_addr.has_key("recv_buff_size")){ -        recv_buff_size = size_t(lexical_cast<double>(device_addr["recv_buff_size"])); -    } -    if (device_addr.has_key("send_buff_size")){ -        send_buff_size = size_t(lexical_cast<double>(device_addr["send_buff_size"])); -    }      //create a ctrl and data transport for each address      std::vector<udp_simple::sptr> ctrl_transports; @@ -154,8 +135,7 @@ static device::sptr usrp2_make(const device_addr_t &device_addr){              addr, num2str(USRP2_UDP_CTRL_PORT)          ));          data_transports.push_back(udp_zero_copy::make( -            addr, num2str(USRP2_UDP_DATA_PORT), -            recv_buff_size, send_buff_size +            addr, num2str(USRP2_UDP_DATA_PORT), device_addr          ));      } @@ -178,11 +158,23 @@ usrp2_impl::usrp2_impl(  ):      _data_transports(data_transports)  { +    //setup rx otw type +    _rx_otw_type.width = 16; +    _rx_otw_type.shift = 0; +    _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    //setup tx otw type +    _tx_otw_type.width = 16; +    _tx_otw_type.shift = 0; +    _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + +    //!!!!! set the otw type here before continuing, its used below +      //create a new mboard handler for each control transport      for(size_t i = 0; i < ctrl_transports.size(); i++){ -        _mboards.push_back(usrp2_mboard_impl::sptr( -            new usrp2_mboard_impl(i, ctrl_transports[i], _io_helper) -        )); +        _mboards.push_back(usrp2_mboard_impl::sptr(new usrp2_mboard_impl( +            i, ctrl_transports[i], this->get_max_recv_samps_per_packet() +        )));          //use an empty name when there is only one mboard          std::string name = (ctrl_transports.size() > 1)? boost::lexical_cast<std::string>(i) : "";          _mboard_dict[name] = _mboards.back(); diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 157d17057..e12c4d6d4 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -72,54 +72,6 @@ private:  };  /*! - * The io helper class encapculates the max packet sizes and otw types. - * The otw types are read-only for now, this will be reimplemented - * when it becomes possible to change the otw type in the usrp2. - */ -class usrp2_io_helper{ -public: -    usrp2_io_helper(void){ -        //setup rx otw type -        _rx_otw_type.width = 16; -        _rx_otw_type.shift = 0; -        _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - -        //setup tx otw type -        _tx_otw_type.width = 16; -        _tx_otw_type.shift = 0; -        _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; -    } - -    inline size_t get_max_send_samps_per_packet(void) const{ -        return _max_tx_bytes_per_packet/_tx_otw_type.get_sample_size(); -    } - -    inline size_t get_max_recv_samps_per_packet(void) const{ -        return _max_rx_bytes_per_packet/_rx_otw_type.get_sample_size(); -    } - -    inline const uhd::otw_type_t &get_rx_otw_type(void) const{ -        return _rx_otw_type; -    } - -    inline const uhd::otw_type_t &get_tx_otw_type(void) const{ -        return _tx_otw_type; -    } - -private: -    uhd::otw_type_t _rx_otw_type, _tx_otw_type; -    static const size_t _max_rx_bytes_per_packet = uhd::transport::udp_simple::mtu -        - uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) -        - sizeof(uhd::transport::vrt::if_packet_info_t().tlr) //forced to have trailer -        + sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used -    ; -    static const size_t _max_tx_bytes_per_packet = uhd::transport::udp_simple::mtu -        - uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) -        + sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used -    ; -}; - -/*!   * USRP2 mboard implementation guts:   * The implementation details are encapsulated here.   * Handles properties on the mboard, dboard, dsps... @@ -129,7 +81,11 @@ public:      typedef boost::shared_ptr<usrp2_mboard_impl> sptr;      //structors -    usrp2_mboard_impl(size_t index, uhd::transport::udp_simple::sptr, const usrp2_io_helper &); +    usrp2_mboard_impl( +        size_t index, +        uhd::transport::udp_simple::sptr, +        size_t recv_frame_size +    );      ~usrp2_mboard_impl(void);      inline double get_master_clock_freq(void){ @@ -139,7 +95,7 @@ public:  private:      size_t _index;      int _rev_hi, _rev_lo; -    const usrp2_io_helper &_io_helper; +    const size_t _recv_frame_size;      //properties for this mboard      void get(const wax::obj &, wax::obj &); @@ -229,22 +185,24 @@ public:      //the io interface      size_t get_max_send_samps_per_packet(void) const{ -        return _io_helper.get_max_send_samps_per_packet(); +        const size_t bytes_per_packet = _data_transports.front()->get_send_frame_size() - _max_tx_header_bytes; +        return bytes_per_packet/_tx_otw_type.get_sample_size();      }      size_t send(          const std::vector<const void *> &, size_t,          const uhd::tx_metadata_t &, const uhd::io_type_t &, -        uhd::device::send_mode_t +        uhd::device::send_mode_t, double      );      size_t get_max_recv_samps_per_packet(void) const{ -        return _io_helper.get_max_recv_samps_per_packet(); +        const size_t bytes_per_packet = _data_transports.front()->get_recv_frame_size() - _max_rx_header_bytes; +        return bytes_per_packet/_rx_otw_type.get_sample_size();      }      size_t recv(          const std::vector<void *> &, size_t,          uhd::rx_metadata_t &, const uhd::io_type_t &, -        uhd::device::recv_mode_t, size_t +        uhd::device::recv_mode_t, double      ); -    bool recv_async_msg(uhd::async_metadata_t &, size_t); +    bool recv_async_msg(uhd::async_metadata_t &, double);  private:      //device properties interface @@ -257,9 +215,20 @@ private:      //io impl methods and members      std::vector<uhd::transport::udp_zero_copy::sptr> _data_transports; -    const usrp2_io_helper _io_helper;      UHD_PIMPL_DECL(io_impl) _io_impl;      void io_init(void); + +    //over-the-wire structs and constants +    uhd::otw_type_t _rx_otw_type, _tx_otw_type; +    static const size_t _max_rx_header_bytes = 0 +        + uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) +        + sizeof(uhd::transport::vrt::if_packet_info_t().tlr) //forced to have trailer +        - sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used +    ; +    static const size_t _max_tx_header_bytes = 0 +        + uhd::transport::vrt::max_if_hdr_words32*sizeof(boost::uint32_t) +        - sizeof(uhd::transport::vrt::if_packet_info_t().cid) //no class id ever used +    ;  };  #endif /* INCLUDED_USRP2_IMPL_HPP */ diff --git a/host/test/buffer_test.cpp b/host/test/buffer_test.cpp index aadb3f951..8445412e7 100644 --- a/host/test/buffer_test.cpp +++ b/host/test/buffer_test.cpp @@ -23,7 +23,7 @@  using namespace boost::assign;  using namespace uhd::transport; -static const boost::posix_time::milliseconds timeout(10); +static const double timeout = 0.01/*secs*/;  BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){      bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); | 
