diff options
28 files changed, 550 insertions, 554 deletions
| diff --git a/host/docs/transport.rst b/host/docs/transport.rst index 018f909c1..6b9d28bfa 100644 --- a/host/docs/transport.rst +++ b/host/docs/transport.rst @@ -34,10 +34,9 @@ The following parameters can be used to alter the transport's default behavior:  * **num_recv_frames:** The number of receive buffers to allocate  * **send_frame_size:** The size of a single send buffer in bytes  * **num_send_frames:** The number of send buffers to allocate -* **concurrency_hint:** The number of threads to run the IO service -**Note:** num_send_frames will not have an effect -as the asynchronous send implementation is currently disabled. +**Note:** num_recv_frames and num_send_frames will not have an effect +as the asynchronous send implementation is currently unimplemented.  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  Flow control parameters diff --git a/host/examples/test_async_messages.cpp b/host/examples/test_async_messages.cpp index 7f1094ee0..7f922ed35 100644 --- a/host/examples/test_async_messages.cpp +++ b/host/examples/test_async_messages.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -91,8 +91,7 @@ bool test_underflow_message(uhd::usrp::multi_usrp::sptr usrp){      md.end_of_burst   = false;      md.has_time_spec  = false; -    usrp->get_device()->send( -        NULL, 0, md, +    usrp->get_device()->send("", 0, md,          uhd::io_type_t::COMPLEX_FLOAT32,          uhd::device::SEND_MODE_FULL_BUFF      ); @@ -139,8 +138,7 @@ bool test_time_error_message(uhd::usrp::multi_usrp::sptr usrp){      usrp->set_time_now(uhd::time_spec_t(200.0)); //time at 200s -    usrp->get_device()->send( -        NULL, 0, md, +    usrp->get_device()->send("", 0, md,          uhd::io_type_t::COMPLEX_FLOAT32,          uhd::device::SEND_MODE_FULL_BUFF      ); diff --git a/host/examples/tx_waveforms.cpp b/host/examples/tx_waveforms.cpp index dd18d3174..05d49a8b3 100644 --- a/host/examples/tx_waveforms.cpp +++ b/host/examples/tx_waveforms.cpp @@ -171,7 +171,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){      //send a mini EOB packet      md.start_of_burst = false;      md.end_of_burst   = true; -    usrp->get_device()->send(NULL, 0, md, +    usrp->get_device()->send("", 0, md,          uhd::io_type_t::COMPLEX_FLOAT32,          uhd::device::SEND_MODE_FULL_BUFF      ); diff --git a/host/include/uhd/CMakeLists.txt b/host/include/uhd/CMakeLists.txt index fee1270e9..b7a22cf0b 100644 --- a/host/include/uhd/CMakeLists.txt +++ b/host/include/uhd/CMakeLists.txt @@ -25,7 +25,6 @@ INSTALL(FILES      config.hpp      convert.hpp      device.hpp -    device.ipp      version.hpp      wax.hpp      DESTINATION ${INCLUDE_DIR}/uhd diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index 992276928..50237472b 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -22,11 +22,11 @@  #include <uhd/types/device_addr.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/types/io_type.hpp> +#include <uhd/types/ref_vector.hpp>  #include <uhd/wax.hpp>  #include <boost/utility.hpp>  #include <boost/shared_ptr.hpp>  #include <boost/function.hpp> -#include <vector>  namespace uhd{ @@ -96,6 +96,12 @@ public:          RECV_MODE_ONE_PACKET = 1      }; +    //! Typedef for a pointer to a single, or a collection of send buffers +    typedef ref_vector<const void *> send_buffs_type; + +    //! Typedef for a pointer to a single, or a collection of recv buffers +    typedef ref_vector<void *> recv_buffs_type; +      /*!       * Send buffers containing IF data described by the metadata.       * @@ -121,7 +127,7 @@ public:       * \return the number of samples sent       */      virtual size_t send( -        const std::vector<const void *> &buffs, +        const send_buffs_type &buffs,          size_t nsamps_per_buff,          const tx_metadata_t &metadata,          const io_type_t &io_type, @@ -130,18 +136,6 @@ public:      ) = 0;      /*! -     * Convenience wrapper for send that takes a single buffer. -     */ -    size_t send( -        const void *buff, -        size_t nsamps_per_buff, -        const tx_metadata_t &metadata, -        const io_type_t &io_type, -        send_mode_t send_mode, -        double timeout = 0.1 -    ); - -    /*!       * Receive buffers containing IF data described by the metadata.       *       * Receive handles fragmentation as follows: @@ -173,7 +167,7 @@ public:       * \return the number of samples received or 0 on error       */      virtual size_t recv( -        const std::vector<void *> &buffs, +        const recv_buffs_type &buffs,          size_t nsamps_per_buff,          rx_metadata_t &metadata,          const io_type_t &io_type, @@ -182,18 +176,6 @@ public:      ) = 0;      /*! -     * Convenience wrapper for recv that takes a single buffer. -     */ -    size_t recv( -        void *buff, -        size_t nsamps_per_buff, -        rx_metadata_t &metadata, -        const io_type_t &io_type, -        recv_mode_t recv_mode, -        double timeout = 0.1 -    ); - -    /*!       * Get the maximum number of samples per packet on send.       * \return the number of samples       */ @@ -219,6 +201,4 @@ public:  } //namespace uhd -#include <uhd/device.ipp> -  #endif /* INCLUDED_UHD_DEVICE_HPP */ diff --git a/host/include/uhd/device.ipp b/host/include/uhd/device.ipp deleted file mode 100644 index e2e51ecd0..000000000 --- a/host/include/uhd/device.ipp +++ /dev/null @@ -1,55 +0,0 @@ -// -// Copyright 2010 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program.  If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_UHD_DEVICE_IPP -#define INCLUDED_UHD_DEVICE_IPP - -namespace uhd{ - -    UHD_INLINE size_t device::send( -        const void *buff, -        size_t nsamps_per_buff, -        const tx_metadata_t &metadata, -        const io_type_t &io_type, -        send_mode_t send_mode, -        double timeout -    ){ -        return this->send( -            std::vector<const void *>(1, buff), -            nsamps_per_buff, metadata, -            io_type, send_mode, timeout -        ); -    } - -    UHD_INLINE size_t device::recv( -        void *buff, -        size_t nsamps_per_buff, -        rx_metadata_t &metadata, -        const io_type_t &io_type, -        recv_mode_t recv_mode, -        double timeout -    ){ -        return this->recv( -            std::vector<void *>(1, buff), -            nsamps_per_buff, metadata, -            io_type, recv_mode, timeout -        ); -    } - -} //namespace uhd - -#endif /* INCLUDED_UHD_DEVICE_IPP */ diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index aca93b071..412d73f17 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -18,8 +18,7 @@  #ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP  #define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP -#include <uhd/config.hpp> -#include <boost/shared_ptr.hpp> +#include <uhd/transport/bounded_buffer.ipp> //detail  namespace uhd{ namespace transport{ @@ -32,13 +31,16 @@ namespace uhd{ namespace transport{       */      template <typename elem_type> class bounded_buffer{      public: -        typedef boost::shared_ptr<bounded_buffer<elem_type> > sptr;          /*! -         * Make a new bounded buffer object. +         * Create a new bounded buffer object.           * \param capacity the bounded_buffer capacity           */ -        static sptr make(size_t capacity); +        bounded_buffer(size_t capacity): +            _detail(capacity) +        { +            /* NOP */ +        }          /*!           * Push a new element into the bounded buffer. @@ -47,14 +49,18 @@ namespace uhd{ namespace transport{           * \param elem the new element to push           * \return true if the element fit without popping for space           */ -        virtual bool push_with_pop_on_full(const elem_type &elem) = 0; +        bool push_with_pop_on_full(const elem_type &elem){ +            return _detail.push_with_pop_on_full(elem); +        }          /*!           * Push a new element into the bounded_buffer.           * Wait until the bounded_buffer becomes non-full.           * \param elem the new element to push           */ -        virtual void push_with_wait(const elem_type &elem) = 0; +        void push_with_wait(const elem_type &elem){ +            return _detail.push_with_wait(elem); +        }          /*!           * Push a new element into the bounded_buffer. @@ -63,14 +69,27 @@ namespace uhd{ namespace transport{           * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0; +        bool push_with_timed_wait(const elem_type &elem, double timeout){ +            return _detail.push_with_timed_wait(elem, timeout); +        } + +        /*! +         * Pop an element from the bounded_buffer immediately. +         * \param elem the element reference pop to +         * \return false when the bounded_buffer is empty +         */ +        bool pop_with_haste(elem_type &elem){ +            return _detail.pop_with_haste(elem); +        }          /*!           * Pop an element from the bounded_buffer.           * Wait until the bounded_buffer becomes non-empty.           * \param elem the element reference pop to           */ -        virtual void pop_with_wait(elem_type &elem) = 0; +        void pop_with_wait(elem_type &elem){ +            return _detail.pop_with_wait(elem); +        }          /*!           * Pop an element from the bounded_buffer. @@ -79,16 +98,13 @@ namespace uhd{ namespace transport{           * \param timeout the timeout in seconds           * \return false when the operation times out           */ -        virtual bool pop_with_timed_wait(elem_type &elem, double timeout) = 0; +        bool pop_with_timed_wait(elem_type &elem, double timeout){ +            return _detail.pop_with_timed_wait(elem, timeout); +        } -        /*! -         * Clear all elements from the bounded_buffer. -         */ -        virtual void clear(void) = 0; +    private: bounded_buffer_detail<elem_type> _detail;      };  }} //namespace -#include <uhd/transport/bounded_buffer.ipp> -  #endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_HPP */ diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 4fbe3f085..7be2f987c 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -18,22 +18,23 @@  #ifndef INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP  #define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP +#include <uhd/config.hpp>  #include <boost/bind.hpp>  #include <boost/function.hpp>  #include <boost/circular_buffer.hpp>  #include <boost/thread/condition.hpp>  #include <boost/thread/locks.hpp> -#include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ namespace{ /*anon*/ -    template <typename elem_type> -    class bounded_buffer_impl : public bounded_buffer<elem_type>{ +    template <typename elem_type> class bounded_buffer_detail{      public: -        bounded_buffer_impl(size_t capacity) : _buffer(capacity){ -            _not_full_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_full, this); -            _not_empty_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this); +        bounded_buffer_detail(size_t capacity): +            _buffer(capacity) +        { +            _not_full_fcn  = boost::bind(&bounded_buffer_detail<elem_type>::not_full, this); +            _not_empty_fcn = boost::bind(&bounded_buffer_detail<elem_type>::not_empty, this);          }          UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ @@ -72,6 +73,15 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              return true;          } +        UHD_INLINE bool pop_with_haste(elem_type &elem){ +            boost::mutex::scoped_lock lock(_mutex); +            if(_buffer.empty()) return false; +            elem = this->pop_back(); +            lock.unlock(); +            _full_cond.notify_one(); +            return true; +        } +          UHD_INLINE void pop_with_wait(elem_type &elem){              boost::mutex::scoped_lock lock(_mutex);              _empty_cond.wait(lock, _not_empty_fcn); @@ -91,13 +101,6 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              return true;          } -        UHD_INLINE void clear(void){ -            boost::mutex::scoped_lock lock(_mutex); -            while (not_empty()) this->pop_back(); -            lock.unlock(); -            _full_cond.notify_one(); -        } -      private:          boost::mutex _mutex;          boost::condition _empty_cond, _full_cond; @@ -128,13 +131,4 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/      };  }}} //namespace -namespace uhd{ namespace transport{ - -    template <typename elem_type> typename bounded_buffer<elem_type>::sptr -    bounded_buffer<elem_type>::make(size_t capacity){ -        return typename bounded_buffer<elem_type>::sptr(new bounded_buffer_impl<elem_type>(capacity)); -    } - -}} //namespace -  #endif /* INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP */ diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 7d8fb4b83..d5a536b27 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -19,7 +19,6 @@  #define INCLUDED_UHD_TRANSPORT_ZERO_COPY_HPP  #include <uhd/config.hpp> -#include <boost/asio/buffer.hpp>  #include <boost/utility.hpp>  #include <boost/shared_ptr.hpp>  #include <boost/function.hpp> @@ -40,13 +39,13 @@ namespace uhd{ namespace transport{           * Make a safe managed receive buffer:           * A safe managed buffer ensures that release is called once,           * either by the user or automatically upon deconstruction. -         * \param buff a reference to the constant buffer +         * \param buff a pointer into read-only memory +         * \param size the length of the buffer in bytes           * \param release_fcn callback to release the memory           * \return a new managed receive buffer           */          static sptr make_safe( -            const boost::asio::const_buffer &buff, -            const release_fcn_t &release_fcn +            const void *buff, size_t size, const release_fcn_t &release_fcn          );          /*! @@ -57,28 +56,24 @@ namespace uhd{ namespace transport{          virtual void release(void) = 0;          /*! -         * Get the size of the underlying buffer. -         * \return the number of bytes -         */ -        inline size_t size(void) const{ -            return boost::asio::buffer_size(this->get()); -        } - -        /*!           * Get a pointer to the underlying buffer.           * \return a pointer into memory           */          template <class T> inline T cast(void) const{ -            return boost::asio::buffer_cast<T>(this->get()); +            return static_cast<T>(this->get_buff());          } -    private:          /*! -         * Get a reference to the internal const buffer. -         * The buffer has a reference to memory and a size. -         * \return a boost asio const buffer +         * Get the size of the underlying buffer. +         * \return the number of bytes           */ -        virtual const boost::asio::const_buffer &get(void) const = 0; +        inline size_t size(void) const{ +            return this->get_size(); +        } + +    private: +        virtual const void *get_buff(void) const = 0; +        virtual size_t get_size(void) const = 0;      };      /*! @@ -96,13 +91,13 @@ namespace uhd{ namespace transport{           * A safe managed buffer ensures that commit is called once,           * either by the user or automatically upon deconstruction.           * In the later case, the deconstructor will call commit(0). -         * \param buff a reference to the mutable buffer +         * \param buff a pointer into writable memory +         * \param size the length of the buffer in bytes           * \param commit_fcn callback to commit the memory           * \return a new managed send buffer           */          static sptr make_safe( -            const boost::asio::mutable_buffer &buff, -            const commit_fcn_t &commit_fcn +            void *buff, size_t size, const commit_fcn_t &commit_fcn          );          /*! @@ -114,28 +109,24 @@ namespace uhd{ namespace transport{          virtual void commit(size_t num_bytes) = 0;          /*! -         * Get the size of the underlying buffer. -         * \return the number of bytes -         */ -        inline size_t size(void) const{ -            return boost::asio::buffer_size(this->get()); -        } - -        /*!           * Get a pointer to the underlying buffer.           * \return a pointer into memory           */          template <class T> inline T cast(void) const{ -            return boost::asio::buffer_cast<T>(this->get()); +            return static_cast<T>(this->get_buff());          } -    private:          /*! -         * Get a reference to the internal mutable buffer. -         * The buffer has a reference to memory and a size. -         * \return a boost asio mutable buffer +         * Get the size of the underlying buffer. +         * \return the number of bytes           */ -        virtual const boost::asio::mutable_buffer &get(void) const = 0; +        inline size_t size(void) const{ +            return this->get_size(); +        } + +    private: +        virtual void *get_buff(void) const = 0; +        virtual size_t get_size(void) const = 0;      };      /*! diff --git a/host/include/uhd/types/CMakeLists.txt b/host/include/uhd/types/CMakeLists.txt index 51be164aa..c856e5568 100644 --- a/host/include/uhd/types/CMakeLists.txt +++ b/host/include/uhd/types/CMakeLists.txt @@ -26,6 +26,7 @@ INSTALL(FILES      metadata.hpp      otw_type.hpp      ranges.hpp +    ref_vector.hpp      sensors.hpp      serial.hpp      stream_cmd.hpp diff --git a/host/include/uhd/types/ref_vector.hpp b/host/include/uhd/types/ref_vector.hpp new file mode 100644 index 000000000..ef970802f --- /dev/null +++ b/host/include/uhd/types/ref_vector.hpp @@ -0,0 +1,69 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_TYPES_REF_VECTOR_HPP +#define INCLUDED_UHD_TYPES_REF_VECTOR_HPP + +#include <uhd/config.hpp> + +namespace uhd{ + +/*! + * Reference vector: + *  - Provides a std::vector-like interface for an array. + *  - Statically sized, and does not manage the memory. + */ +template <typename T> class ref_vector{ +public: +    //! Create a reference vector of length one from a pointer +    template <typename Ptr> ref_vector(Ptr *ptr): +        _mem(memp_t(&ptr)), _size(1) +    { +        /* NOP */ +    } + +    //! Create a reference vector from a std::vector container +    template <typename Range> ref_vector(const Range &range): +        _mem(memp_t(&range[0])), _size(range.size()) +    { +        /* NOP */ +    } + +    //! Create a reference vector from a memory pointer and size +    ref_vector(T *mem, size_t size): +        _mem(mem), _size(size) +    { +        /* NOP */ +    } + +    T &operator[](size_t index) const{ +        return _mem[index]; +    } + +    size_t size(void) const{ +        return _size; +    } + +private: +    typedef T* memp_t; +    const memp_t _mem; +    const size_t _size; +}; + +} //namespace uhd + +#endif /* INCLUDED_UHD_TYPES_REF_VECTOR_HPP */ diff --git a/host/lib/convert/gen_convert_pred.py b/host/lib/convert/gen_convert_pred.py index fea7db4cc..d2f90bf41 100644 --- a/host/lib/convert/gen_convert_pred.py +++ b/host/lib/convert/gen_convert_pred.py @@ -21,8 +21,6 @@ TMPL_TEXT = """  /***********************************************************************   * This file was generated by $file on $time.strftime("%c")   **********************************************************************/ -typedef size_t pred_type; -  \#include <boost/tokenizer.hpp>  \#include <boost/lexical_cast.hpp>  \#include <boost/detail/endian.hpp> @@ -31,6 +29,9 @@ typedef size_t pred_type;  \#include <string>  \#include <vector> +typedef size_t pred_type; +typedef std::vector<pred_type> pred_vector_type; +  enum dir_type{      DIR_OTW_TO_CPU = 0,      DIR_CPU_TO_OTW = 1 @@ -101,46 +102,60 @@ pred_type make_pred(const std::string &markup, dir_type &dir){      return pred;  } +#define pred_table_wildcard pred_type(~0) +#define pred_table_max_size size_t(128) +#define pred_table_index(e) (pred_type(e) & 0x7f) + +static pred_vector_type get_pred_byte_order_table(void){ +    pred_vector_type table(pred_table_max_size, pred_table_wildcard); +    \#ifdef BOOST_BIG_ENDIAN +    table[pred_table_index(otw_type_t::BO_BIG_ENDIAN)]    = $ph.nswap_p; +    table[pred_table_index(otw_type_t::BO_LITTLE_ENDIAN)] = $ph.bswap_p; +    \#else +    table[pred_table_index(otw_type_t::BO_BIG_ENDIAN)]    = $ph.bswap_p; +    table[pred_table_index(otw_type_t::BO_LITTLE_ENDIAN)] = $ph.nswap_p; +    \#endif +    table[pred_table_index(otw_type_t::BO_NATIVE)]        = $ph.nswap_p; +    return table; +} + +static pred_vector_type get_pred_io_type_table(void){ +    pred_vector_type table(pred_table_max_size, pred_table_wildcard); +    table[pred_table_index(io_type_t::COMPLEX_FLOAT64)]    = $ph.fc64_p; +    table[pred_table_index(io_type_t::COMPLEX_FLOAT32)]    = $ph.fc32_p; +    table[pred_table_index(io_type_t::COMPLEX_INT16)]      = $ph.sc16_p; +    return table; +} + +static pred_vector_type get_pred_num_io_table(void){ +    pred_vector_type table(pred_table_max_size, pred_table_wildcard); +    table[1] = $ph.chan1_p; +    table[2] = $ph.chan2_p; +    table[3] = $ph.chan3_p; +    table[4] = $ph.chan4_p; +    return table; +} +  UHD_INLINE pred_type make_pred(      const io_type_t &io_type,      const otw_type_t &otw_type,      size_t num_inputs,      size_t num_outputs  ){ -    pred_type pred = 0; +    pred_type pred = $ph.item32_p; //only item32 supported as of now -    switch(otw_type.byteorder){ -    \#ifdef BOOST_BIG_ENDIAN -    case otw_type_t::BO_BIG_ENDIAN:    pred |= $ph.nswap_p; break; -    case otw_type_t::BO_LITTLE_ENDIAN: pred |= $ph.bswap_p; break; -    \#else -    case otw_type_t::BO_BIG_ENDIAN:    pred |= $ph.bswap_p; break; -    case otw_type_t::BO_LITTLE_ENDIAN: pred |= $ph.nswap_p; break; -    \#endif -    case otw_type_t::BO_NATIVE:        pred |= $ph.nswap_p; break; -    default: throw pred_error("unhandled otw byteorder type"); -    } +    static const pred_vector_type pred_byte_order_table(get_pred_byte_order_table()); +    pred |= pred_byte_order_table[pred_table_index(otw_type.byteorder)]; -    switch(otw_type.get_sample_size()){ -    case sizeof(boost::uint32_t): pred |= $ph.item32_p; break; -    default: throw pred_error("unhandled otw sample size"); -    } +    static const pred_vector_type pred_io_type_table(get_pred_io_type_table()); +    pred |= pred_io_type_table[pred_table_index(io_type.tid)]; -    switch(io_type.tid){ -    case io_type_t::COMPLEX_FLOAT32: pred |= $ph.fc32_p; break; -    case io_type_t::COMPLEX_INT16:   pred |= $ph.sc16_p; break; -    //case io_type_t::COMPLEX_INT8:    pred |= $ph.sc8_p; break; -    case io_type_t::COMPLEX_FLOAT64: pred |= $ph.fc64_p; break; -    default: throw pred_error("unhandled io type id"); -    } +    static const pred_vector_type pred_num_io_table(get_pred_num_io_table()); +    pred |= pred_num_io_table[pred_table_index(num_inputs*num_outputs)]; -    switch(num_inputs*num_outputs){ //FIXME treated as one value -    case 1: pred |= $ph.chan1_p; break; -    case 2: pred |= $ph.chan2_p; break; -    case 3: pred |= $ph.chan3_p; break; -    case 4: pred |= $ph.chan4_p; break; -    default: throw pred_error("unhandled number of channels"); -    } +    if (pred == pred_table_wildcard) throw pred_error( +        "unhanded input combination for make_pred()" +    );      return pred;  } diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index dbe026ba3..3ba562d68 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -1,6 +1,6 @@  #!/usr/bin/env python  # -# Copyright 2010 Ettus Research LLC +# Copyright 2010-2011 Ettus Research LLC  #  # This program is free software: you can redistribute it and/or modify  # it under the terms of the GNU General Public License as published by @@ -35,6 +35,7 @@ TMPL_TEXT = """  \#include <uhd/utils/byteswap.hpp>  \#include <boost/detail/endian.hpp>  \#include <stdexcept> +\#include <vector>  //define the endian macros to convert integers  \#ifdef BOOST_BIG_ENDIAN @@ -48,18 +49,26 @@ TMPL_TEXT = """  using namespace uhd;  using namespace uhd::transport; -######################################################################## -#def gen_code($XE_MACRO, $suffix) -######################################################################## +typedef size_t pred_type; +typedef std::vector<pred_type> pred_table_type; +#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff) + +static pred_table_type get_pred_unpack_table(void){ +    pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28) +    for (size_t i = 0; i < table.size(); i++){ +        boost::uint32_t vrt_hdr_word = i << 20; +        if(vrt_hdr_word & $hex(0x1 << 28)) table[i] |= $hex($sid_p); +        if(vrt_hdr_word & $hex(0x1 << 27)) table[i] |= $hex($cid_p); +        if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p); +        if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p); +        if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); +    } +    return table; +}  ######################################################################## -## setup predicates +#def gen_code($XE_MACRO, $suffix)  ######################################################################## -#set $sid_p = 0b00001 -#set $cid_p = 0b00010 -#set $tsi_p = 0b00100 -#set $tsf_p = 0b01000 -#set $tlr_p = 0b10000  void vrt::if_hdr_pack_$(suffix)(      boost::uint32_t *packet_buff, @@ -67,7 +76,7 @@ void vrt::if_hdr_pack_$(suffix)(  ){      boost::uint32_t vrt_hdr_flags = 0; -    boost::uint8_t pred = 0; +    pred_type pred = 0;      if (if_packet_info.has_sid) pred |= $hex($sid_p);      if (if_packet_info.has_cid) pred |= $hex($cid_p);      if (if_packet_info.has_tsi) pred |= $hex($tsi_p); @@ -159,12 +168,8 @@ void vrt::if_hdr_unpack_$(suffix)(      //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented      //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented -    boost::uint8_t pred = 0; -    if(vrt_hdr_word & $hex(0x1 << 28)) pred |= $hex($sid_p); -    if(vrt_hdr_word & $hex(0x1 << 27)) pred |= $hex($cid_p); -    if(vrt_hdr_word & $hex(0x3 << 22)) pred |= $hex($tsi_p); -    if(vrt_hdr_word & $hex(0x3 << 20)) pred |= $hex($tsf_p); -    if(vrt_hdr_word & $hex(0x1 << 26)) pred |= $hex($tlr_p); +    static const pred_table_type pred_unpack_table(get_pred_unpack_table()); +    const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];      switch(pred){      #for $pred in range(2**5) @@ -200,7 +205,7 @@ void vrt::if_hdr_unpack_$(suffix)(              if_packet_info.has_tsf = true;              if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32;              #set $num_header_words += 1 -            if_packet_info.tsf |= boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 0; +            if_packet_info.tsf |= $(XE_MACRO)(packet_buff[$num_header_words]);              #set $num_header_words += 1          #else              if_packet_info.has_tsf = false; @@ -239,4 +244,12 @@ def parse_tmpl(_tmpl_text, **kwargs):  if __name__ == '__main__':      import sys -    open(sys.argv[1], 'w').write(parse_tmpl(TMPL_TEXT, file=__file__)) +    open(sys.argv[1], 'w').write(parse_tmpl( +        TMPL_TEXT, +        file=__file__, +        sid_p = 0b00001, +        cid_p = 0b00010, +        tsi_p = 0b00100, +        tsf_p = 0b01000, +        tlr_p = 0b10000, +    )) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 311a8953b..ca37f351f 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -23,7 +23,6 @@  #include <uhd/utils/assert.hpp>  #include <boost/foreach.hpp>  #include <boost/thread.hpp> -#include <boost/enable_shared_from_this.hpp>  #include <vector>  #include <iostream> @@ -99,8 +98,7 @@ private:      bool _input;      //! hold a bounded buffer of completed transfers -    typedef bounded_buffer<libusb_transfer *> lut_buff_type; -    lut_buff_type::sptr _completed_list; +    bounded_buffer<libusb_transfer *> _completed_list;      //! a list of all transfer structs we allocated      std::vector<libusb_transfer *> _all_luts; @@ -134,7 +132,7 @@ static void callback(libusb_transfer *lut){   * \param pointer to libusb_transfer   */  void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ -    _completed_list->push_with_wait(lut); +    _completed_list.push_with_wait(lut);  } @@ -153,9 +151,9 @@ usb_endpoint::usb_endpoint(  ):      _handle(handle),      _endpoint(endpoint), -    _input(input) +    _input(input), +    _completed_list(num_transfers)  { -    _completed_list = lut_buff_type::make(num_transfers);      _buffer_pool = buffer_pool::make(num_transfers, transfer_size);      for (size_t i = 0; i < num_transfers; i++){          _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size)); @@ -163,7 +161,7 @@ usb_endpoint::usb_endpoint(          //input luts are immediately submitted to be filled          //output luts go into the completed list as free buffers          if (_input) this->submit(_all_luts.back()); -        else _completed_list->push_with_wait(_all_luts.back()); +        else _completed_list.push_with_wait(_all_luts.back());      }  } @@ -272,15 +270,15 @@ void usb_endpoint::print_transfer_status(libusb_transfer *lut){  libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){      boost::this_thread::disable_interruption di; //disable because the wait can throw -    libusb_transfer *lut; -    if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; +    libusb_transfer *lut = NULL; +    if (_completed_list.pop_with_timed_wait(lut, timeout)) return lut;      return NULL;  }  /***********************************************************************   * USB zero_copy device class   **********************************************************************/ -class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> { +class libusb_zero_copy_impl : public usb_zero_copy{  public:      libusb_zero_copy_impl( @@ -400,8 +398,8 @@ managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){      }      else {          return managed_recv_buffer::make_safe( -            boost::asio::const_buffer(lut->buffer, lut->actual_length), -            boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) +            lut->buffer, lut->actual_length, +            boost::bind(&libusb_zero_copy_impl::release, this, lut)          );      }  } @@ -420,8 +418,8 @@ managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){      }      else {          return managed_send_buffer::make_safe( -            boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), -            boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) +            lut->buffer, this->get_send_frame_size(), +            boost::bind(&libusb_zero_copy_impl::commit, this, lut, _1)          );      }  } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index a80de7b87..48b0941eb 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -19,53 +19,108 @@  #include <uhd/transport/udp_simple.hpp> //mtu  #include <uhd/transport/bounded_buffer.hpp>  #include <uhd/transport/buffer_pool.hpp> -#include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/assert.hpp>  #include <uhd/utils/warning.hpp>  #include <boost/asio.hpp>  #include <boost/format.hpp> -#include <boost/thread/thread.hpp> -#include <boost/enable_shared_from_this.hpp>  #include <iostream> +#include <vector>  using namespace uhd;  using namespace uhd::transport;  namespace asio = boost::asio; -//Define this to the the boost async io calls to perform receive. -//Otherwise, get_recv_buff uses a blocking receive with timeout. -#define USE_ASIO_ASYNC_RECV - -//Define this to the the boost async io calls to perform send. -//Otherwise, the commit callback uses a blocking send. -//#define USE_ASIO_ASYNC_SEND - -//The asio async receive implementation is broken for some macos. -//Just disable for all macos since we don't know the problem. -#if defined(UHD_PLATFORM_MACOS) && defined(USE_ASIO_ASYNC_RECV) -    #undef USE_ASIO_ASYNC_RECV -#endif - -//The number of service threads to spawn for async ASIO: -//A single concurrent thread for io_service seems to be the fastest. -//Threads are disabled when no async implementations are enabled. -#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) -static const size_t CONCURRENCY_HINT = 1; -#else -static const size_t CONCURRENCY_HINT = 0; -#endif -  //A reasonable number of frames for send/recv and async/sync  static const size_t DEFAULT_NUM_FRAMES = 32;  /*********************************************************************** + * Reusable managed receiver buffer: + *  - Initialize with memory and a release callback. + *  - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: +    typedef boost::shared_ptr<udp_zero_copy_asio_mrb> sptr; +    typedef boost::function<void(udp_zero_copy_asio_mrb *)> release_cb_type; + +    udp_zero_copy_asio_mrb(void *mem, const release_cb_type &release_cb): +        _mem(mem), _release_cb(release_cb){/* NOP */} + +    void release(void){ +        if (_expired) return; +        this->_release_cb(this); +        _expired = true; +    } + +    sptr get_new(size_t len){ +        _expired = false; +        _len = len; +        return sptr(this, &udp_zero_copy_asio_mrb::fake_deleter); +    } + +    template <class T> T cast(void) const{return static_cast<T>(_mem);} + +private: +    static void fake_deleter(void *obj){ +        static_cast<udp_zero_copy_asio_mrb *>(obj)->release(); +    } + +    const void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    bool _expired; +    void *_mem; +    size_t _len; +    release_cb_type _release_cb; +}; + +/*********************************************************************** + * Reusable managed send buffer: + *  - Initialize with memory and a commit callback. + *  - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: +    typedef boost::shared_ptr<udp_zero_copy_asio_msb> sptr; +    typedef boost::function<void(udp_zero_copy_asio_msb *, size_t)> commit_cb_type; + +    udp_zero_copy_asio_msb(void *mem, const commit_cb_type &commit_cb): +        _mem(mem), _commit_cb(commit_cb){/* NOP */} + +    void commit(size_t len){ +        if (_expired) return; +        this->_commit_cb(this, len); +        _expired = true; +    } + +    sptr get_new(size_t len){ +        _expired = false; +        _len = len; +        return sptr(this, &udp_zero_copy_asio_msb::fake_deleter); +    } + +private: +    static void fake_deleter(void *obj){ +        static_cast<udp_zero_copy_asio_msb *>(obj)->commit(0); +    } + +    void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    bool _expired; +    void *_mem; +    size_t _len; +    commit_cb_type _commit_cb; +}; + +/***********************************************************************   * Zero Copy UDP implementation with ASIO:   *   This is the portable zero copy implementation for systems   *   where a faster, platform specific solution is not available.   *   However, it is not a true zero copy implementation as each   *   send and recv requires a copy operation to/from userspace.   **********************************************************************/ -class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { +class udp_zero_copy_asio_impl : public udp_zero_copy{  public:      typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; @@ -78,8 +133,9 @@ public:          _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),          _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),          _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), -        _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), -        _io_service(_concurrency_hint) +        _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), +        _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), +        _pending_recv_buffs(_num_recv_frames), _pending_send_buffs(_num_send_frames)      {          //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; @@ -93,39 +149,28 @@ public:          _socket->open(asio::ip::udp::v4());          _socket->connect(receiver_endpoint);          _sock_fd = _socket->native(); -    } -    ~udp_zero_copy_asio_impl(void){ -        delete _work; //allow io_service run to complete -        _thread_group.join_all(); //wait for service threads to exit -        delete _socket; -    } - -    void init(void){ -        //allocate all recv frames and release them to begin xfers -        _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); -        _recv_buffer_pool = buffer_pool::make(_num_recv_frames, _recv_frame_size); -        for (size_t i = 0; i < _num_recv_frames; i++){ -            release(_recv_buffer_pool->at(i)); +        //allocate re-usable managed receive buffers +        for (size_t i = 0; i < get_num_recv_frames(); i++){ +            _mrb_pool.push_back(udp_zero_copy_asio_mrb::sptr( +                new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), +                boost::bind(&udp_zero_copy_asio_impl::release, this, _1)) +            )); +            handle_recv(_mrb_pool.back().get());          } -        //allocate all send frames and push them into the fifo -        _pending_send_buffs = pending_buffs_type::make(_num_send_frames); -        _send_buffer_pool = buffer_pool::make(_num_send_frames, _send_frame_size); -        for (size_t i = 0; i < _num_send_frames; i++){ -            handle_send(_send_buffer_pool->at(i)); +        //allocate re-usable managed send buffers +        for (size_t i = 0; i < get_num_send_frames(); i++){ +            _msb_pool.push_back(udp_zero_copy_asio_msb::sptr( +                new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), +                boost::bind(&udp_zero_copy_asio_impl::commit, this, _1, _2)) +            )); +            handle_send(_msb_pool.back().get());          } - -        //spawn the service threads that will run the io service -        _work = new asio::io_service::work(_io_service); //new work to delete later -        for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( -            boost::bind(&udp_zero_copy_asio_impl::service, this) -        );      } -    void service(void){ -        set_thread_priority_safe(); -        _io_service.run(); +    ~udp_zero_copy_asio_impl(void){ +        delete _socket;      }      //get size for internal socket buffer @@ -142,49 +187,12 @@ public:          return get_buff_size<Opt>();      } -    //! handle a recv callback -> push the filled memory into the fifo -    UHD_INLINE void handle_recv(void *mem, size_t len){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); +    UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ +        _pending_recv_buffs.push_with_pop_on_full(mrb);      } -    //////////////////////////////////////////////////////////////////// -    #ifdef USE_ASIO_ASYNC_RECV -    //////////////////////////////////////////////////////////////////// -    //! pop a filled recv buffer off of the fifo and bind with the release callback      managed_recv_buffer::sptr get_recv_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        asio::mutable_buffer buff; -        if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ -            return managed_recv_buffer::make_safe( -                buff, boost::bind( -                    &udp_zero_copy_asio_impl::release, -                    shared_from_this(), -                    asio::buffer_cast<void*>(buff) -                ) -            ); -        } -        return managed_recv_buffer::sptr(); -    } - -    //! release a recv buffer -> start an async recv on the buffer -    void release(void *mem){ -        _socket->async_receive( -            boost::asio::buffer(mem, this->get_recv_frame_size()), -            boost::bind( -                &udp_zero_copy_asio_impl::handle_recv, -                shared_from_this(), mem, -                asio::placeholders::bytes_transferred -            ) -        ); -    } - -    //////////////////////////////////////////////////////////////////// -    #else /*USE_ASIO_ASYNC_RECV*/ -    //////////////////////////////////////////////////////////////////// -    managed_recv_buffer::sptr get_recv_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        asio::mutable_buffer buff; +        udp_zero_copy_asio_mrb *mrb;          //setup timeval for timeout          timeval tv; @@ -196,104 +204,57 @@ public:          FD_ZERO(&rset);          FD_SET(_sock_fd, &rset); -        //call select to perform timed wait and grab an available buffer with wait +        //call select to perform timed wait and grab an available buffer now          //if the condition is true, call receive and return the managed buffer          if ( -            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and -            _pending_recv_buffs->pop_with_timed_wait(buff, timeout) +            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 +            and _pending_recv_buffs.pop_with_haste(mrb)          ){ -            return managed_recv_buffer::make_safe( -                asio::buffer( -                    boost::asio::buffer_cast<void *>(buff), -                    _socket->receive(asio::buffer(buff)) -                ), -                boost::bind( -                    &udp_zero_copy_asio_impl::release, -                    shared_from_this(), -                    asio::buffer_cast<void*>(buff) -                ) -            ); +            return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));          }          return managed_recv_buffer::sptr();      } -    void release(void *mem){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        handle_recv(mem, this->get_recv_frame_size()); +    void release(udp_zero_copy_asio_mrb *mrb){ +        handle_recv(mrb);      } -    //////////////////////////////////////////////////////////////////// -    #endif /*USE_ASIO_ASYNC_RECV*/ -    //////////////////////////////////////////////////////////////////// -      size_t get_num_recv_frames(void) const {return _num_recv_frames;}      size_t get_recv_frame_size(void) const {return _recv_frame_size;} -    //! handle a send callback -> push the emptied memory into the fifo -    UHD_INLINE void handle_send(void *mem){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); +    UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ +        _pending_send_buffs.push_with_pop_on_full(msb);      } -    //! pop an empty send buffer off of the fifo and bind with the commit callback -    managed_send_buffer::sptr get_send_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        asio::mutable_buffer buff; -        if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ -            return managed_send_buffer::make_safe( -                buff, boost::bind( -                    &udp_zero_copy_asio_impl::commit, -                    shared_from_this(), -                    asio::buffer_cast<void*>(buff), _1 -                ) -            ); +    managed_send_buffer::sptr get_send_buff(double){ +        udp_zero_copy_asio_msb *msb; +        if (_pending_send_buffs.pop_with_haste(msb)){ +            return msb->get_new(_send_frame_size);          }          return managed_send_buffer::sptr();      } -    //////////////////////////////////////////////////////////////////// -    #ifdef USE_ASIO_ASYNC_SEND -    //////////////////////////////////////////////////////////////////// -    //! commit a send buffer -> start an async send on the buffer -    void commit(void *mem, size_t len){ -        _socket->async_send( -            boost::asio::buffer(mem, len), -            boost::bind( -                &udp_zero_copy_asio_impl::handle_send, -                shared_from_this(), mem -            ) -        ); -    } - -    //////////////////////////////////////////////////////////////////// -    #else /*USE_ASIO_ASYNC_SEND*/ -    //////////////////////////////////////////////////////////////////// -    void commit(void *mem, size_t len){ -        _socket->send(asio::buffer(mem, len)); -        handle_send(mem); +    void commit(udp_zero_copy_asio_msb *msb, size_t len){ +        ::send(_sock_fd, msb->cast<const char *>(), len, 0); +        handle_send(msb);      } -    //////////////////////////////////////////////////////////////////// -    #endif /*USE_ASIO_ASYNC_SEND*/ -    //////////////////////////////////////////////////////////////////// -      size_t get_num_send_frames(void) const {return _num_send_frames;}      size_t get_send_frame_size(void) const {return _send_frame_size;}  private:      //memory management -> buffers and fifos -    boost::thread_group _thread_group; -    buffer_pool::sptr _send_buffer_pool, _recv_buffer_pool; -    typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; -    pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;      const size_t _recv_frame_size, _num_recv_frames;      const size_t _send_frame_size, _num_send_frames; +    buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; +    bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; +    bounded_buffer<udp_zero_copy_asio_msb *> _pending_send_buffs; +    std::vector<udp_zero_copy_asio_msb::sptr> _msb_pool; +    std::vector<udp_zero_copy_asio_mrb::sptr> _mrb_pool;      //asio guts -> socket and service -    size_t                  _concurrency_hint;      asio::io_service        _io_service;      asio::ip::udp::socket   *_socket; -    asio::io_service::work  *_work;      int                     _sock_fd;  }; @@ -346,7 +307,5 @@ udp_zero_copy::sptr udp_zero_copy::make(      resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");      resize_buff_helper<asio::socket_base::send_buffer_size>   (udp_trans, send_buff_size, "send"); -    udp_trans->init(); //buffers resized -> call init() to use -      return udp_trans;  } diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp index c535edd04..795d5bc62 100644 --- a/host/lib/transport/vrt_packet_handler.hpp +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -67,13 +67,17 @@ template <typename T> UHD_INLINE T get_context_code(          std::vector<const boost::uint8_t *> copy_buffs;          size_t size_of_copy_buffs;          size_t fragment_offset_in_samps; +        std::vector<void *> io_buffs; +        uhd::convert::input_type otw_buffs;          recv_state(size_t width = 1):              width(width),              managed_buffs(width),              copy_buffs(width, NULL),              size_of_copy_buffs(0), -            fragment_offset_in_samps(0) +            fragment_offset_in_samps(0), +            io_buffs(0), //resized later +            otw_buffs(1) //always 1 for now          {              /* NOP */          } @@ -144,7 +148,7 @@ template <typename T> UHD_INLINE T get_context_code(       ******************************************************************/      static UHD_INLINE size_t _recv1(          recv_state &state, -        const std::vector<void *> &buffs, +        const uhd::device::recv_buffs_type &buffs,          size_t offset_bytes,          size_t total_samps,          uhd::rx_metadata_t &metadata, @@ -192,17 +196,16 @@ template <typename T> UHD_INLINE T get_context_code(          size_t bytes_to_copy = nsamps_to_copy*bytes_per_item;          size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff; -        std::vector<void *> io_buffs(chans_per_otw_buff); -        for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){ +        for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){              //fill a vector with pointers to the io buffers              for (size_t j = 0; j < chans_per_otw_buff; j++){ -                io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes; +                state.io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes;              }              //copy-convert the samples from the recv buffer -            uhd::convert::input_type otw_buffs(1, state.copy_buffs[i]); -            converter(otw_buffs, io_buffs, nsamps_to_copy_per_io_buff); +            state.otw_buffs[0] = state.copy_buffs[i]; +            converter(state.otw_buffs, state.io_buffs, nsamps_to_copy_per_io_buff);              //update the rx copy buffer to reflect the bytes copied              state.copy_buffs[i] += bytes_to_copy; @@ -223,7 +226,7 @@ template <typename T> UHD_INLINE T get_context_code(       ******************************************************************/      static UHD_INLINE size_t recv(          recv_state &state, -        const std::vector<void *> &buffs, +        const uhd::device::recv_buffs_type &buffs,          const size_t total_num_samps,          uhd::rx_metadata_t &metadata,          uhd::device::recv_mode_t recv_mode, @@ -236,6 +239,8 @@ template <typename T> UHD_INLINE T get_context_code(          size_t vrt_header_offset_words32 = 0,          size_t chans_per_otw_buff = 1      ){ +        state.io_buffs.resize(chans_per_otw_buff); +          uhd::convert::function_type converter(              uhd::convert::get_converter_otw_to_cpu(                  io_type, otw_type, 1, chans_per_otw_buff @@ -300,8 +305,20 @@ template <typename T> UHD_INLINE T get_context_code(      struct send_state{          //init the expected seq number          size_t next_packet_seq; - -        send_state(void) : next_packet_seq(0){ +        managed_send_buffs_t managed_buffs; +        const boost::uint64_t zeros; +        std::vector<const void *> zero_buffs; +        std::vector<const void *> io_buffs; +        uhd::convert::output_type otw_buffs; + +        send_state(size_t width = 1): +            next_packet_seq(0), +            managed_buffs(width), +            zeros(0), +            zero_buffs(width, &zeros), +            io_buffs(0), //resized later +            otw_buffs(1) //always 1 for now +        {              /* NOP */          }      }; @@ -312,7 +329,7 @@ template <typename T> UHD_INLINE T get_context_code(       ******************************************************************/      static UHD_INLINE size_t _send1(          send_state &state, -        const std::vector<const void *> &buffs, +        const uhd::device::send_buffs_type &buffs,          const size_t offset_bytes,          const size_t num_samps,          uhd::transport::vrt::if_packet_info_t &if_packet_info, @@ -326,29 +343,27 @@ template <typename T> UHD_INLINE T get_context_code(          if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t);          if_packet_info.packet_count = state.next_packet_seq; -        //get send buffers for each channel -        managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); -        if (not get_send_buffs(send_buffs)) return 0; +        //get send buffers for each otw channel +        if (not get_send_buffs(state.managed_buffs)) return 0; -        std::vector<const void *> io_buffs(chans_per_otw_buff);          for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){              //calculate pointers with offsets to io and otw memory              for (size_t j = 0; j < chans_per_otw_buff; j++){ -                io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes; +                state.io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes;              } -            boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32; +            boost::uint32_t *otw_mem = state.managed_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32;              //pack metadata into a vrt header              vrt_packer(otw_mem, if_packet_info);              otw_mem += if_packet_info.num_header_words32;              //copy-convert the samples into the send buffer -            uhd::convert::output_type otw_buffs(1, otw_mem); -            converter(io_buffs, otw_buffs, num_samps); +            state.otw_buffs[0] = otw_mem; +            converter(state.io_buffs, state.otw_buffs, num_samps);              //commit the samples to the zero-copy interface              size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); -            send_buffs[i]->commit(num_bytes_total); +            state.managed_buffs[i]->commit(num_bytes_total);          }          state.next_packet_seq++; //increment sequence after commits          return num_samps; @@ -359,7 +374,7 @@ template <typename T> UHD_INLINE T get_context_code(       ******************************************************************/      static UHD_INLINE size_t send(          send_state &state, -        const std::vector<const void *> &buffs, +        const uhd::device::send_buffs_type &buffs,          const size_t total_num_samps,          const uhd::tx_metadata_t &metadata,          uhd::device::send_mode_t send_mode, @@ -372,6 +387,8 @@ template <typename T> UHD_INLINE T get_context_code(          size_t vrt_header_offset_words32 = 0,          size_t chans_per_otw_buff = 1      ){ +        state.io_buffs.resize(chans_per_otw_buff); +          uhd::convert::function_type converter(              uhd::convert::get_converter_cpu_to_otw(                  io_type, otw_type, chans_per_otw_buff, 1 @@ -398,19 +415,11 @@ template <typename T> UHD_INLINE T get_context_code(              if_packet_info.sob = metadata.start_of_burst;              if_packet_info.eob = metadata.end_of_burst; -            //TODO remove this code when sample counts of zero are supported by hardware -            std::vector<const void *> buffs_(buffs); -            size_t total_num_samps_(total_num_samps); -            if (total_num_samps == 0){ -                static const boost::uint64_t zeros = 0; //max size of a host sample -                buffs_ = std::vector<const void *>(buffs.size(), &zeros); -                total_num_samps_ = 1; -            } -              return _send1(                  state, -                buffs_, 0, -                std::min(total_num_samps_, max_samples_per_packet), +                //TODO remove this code when sample counts of zero are supported by hardware +                (total_num_samps)?buffs : state.zero_buffs, 0, +                std::max<size_t>(1, std::min(total_num_samps, max_samples_per_packet)),                  if_packet_info,                  converter,                  vrt_packer, diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp index a5a864a04..b91eaae1d 100644 --- a/host/lib/transport/zero_copy.cpp +++ b/host/lib/transport/zero_copy.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -29,10 +29,9 @@ static void release_nop(void){  class safe_managed_receive_buffer : public managed_recv_buffer{  public:      safe_managed_receive_buffer( -        const boost::asio::const_buffer &buff, -        const release_fcn_t &release_fcn +        const void *buff, size_t size, const release_fcn_t &release_fcn      ): -        _buff(buff), _release_fcn(release_fcn) +        _buff(buff), _size(size), _release_fcn(release_fcn)      {          /* NOP */      } @@ -48,19 +47,23 @@ public:      }  private: -    const boost::asio::const_buffer &get(void) const{ +    const void *get_buff(void) const{          return _buff;      } -    const boost::asio::const_buffer _buff; +    size_t get_size(void) const{ +        return _size; +    } + +    const void *_buff; +    size_t _size;      release_fcn_t _release_fcn;  };  managed_recv_buffer::sptr managed_recv_buffer::make_safe( -    const boost::asio::const_buffer &buff, -    const release_fcn_t &release_fcn +    const void *buff, size_t size, const release_fcn_t &release_fcn  ){ -    return sptr(new safe_managed_receive_buffer(buff, release_fcn)); +    return sptr(new safe_managed_receive_buffer(buff, size, release_fcn));  }  /*********************************************************************** @@ -73,10 +76,9 @@ static void commit_nop(size_t){  class safe_managed_send_buffer : public managed_send_buffer{  public:      safe_managed_send_buffer( -        const boost::asio::mutable_buffer &buff, -        const commit_fcn_t &commit_fcn +        void *buff, size_t size, const commit_fcn_t &commit_fcn      ): -        _buff(buff), _commit_fcn(commit_fcn) +        _buff(buff), _size(size), _commit_fcn(commit_fcn)      {          /* NOP */      } @@ -92,17 +94,21 @@ public:      }  private: -    const boost::asio::mutable_buffer &get(void) const{ +    void *get_buff(void) const{          return _buff;      } -    const boost::asio::mutable_buffer _buff; +    size_t get_size(void) const{ +        return _size; +    } + +    void *_buff; +    size_t _size;      commit_fcn_t _commit_fcn;  };  safe_managed_send_buffer::sptr managed_send_buffer::make_safe( -    const boost::asio::mutable_buffer &buff, -    const commit_fcn_t &commit_fcn +    void *buff, size_t size, const commit_fcn_t &commit_fcn  ){ -    return sptr(new safe_managed_send_buffer(buff, commit_fcn)); +    return sptr(new safe_managed_send_buffer(buff, size, commit_fcn));  } diff --git a/host/lib/types/time_spec.cpp b/host/lib/types/time_spec.cpp index ece3b92f3..4a41f0fb9 100644 --- a/host/lib/types/time_spec.cpp +++ b/host/lib/types/time_spec.cpp @@ -99,7 +99,7 @@ time_spec_t::time_spec_t(time_t full_secs, double frac_secs):  time_spec_t::time_spec_t(time_t full_secs, long tick_count, double tick_rate):      _full_secs(full_secs), -    _frac_secs(double(tick_count)/tick_rate) +    _frac_secs(tick_count/tick_rate)  {      /* NOP */  } @@ -116,13 +116,11 @@ double time_spec_t::get_real_secs(void) const{  }  time_t time_spec_t::get_full_secs(void) const{ -    double intpart; -    std::modf(this->_frac_secs, &intpart); -    return this->_full_secs + time_t(intpart); +    return this->_full_secs + time_t(this->_frac_secs);  }  double time_spec_t::get_frac_secs(void) const{ -    return std::fmod(this->_frac_secs, 1.0); +    return this->_frac_secs - time_t(this->_frac_secs);  }  /*********************************************************************** diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index 52a7c6650..88cbab073 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -159,10 +159,8 @@ bool usrp1_impl::io_impl::get_send_buffs(      //calculate the buffer pointer and size given the offset      //references to the buffers are held in the bound function      buffs[0] = managed_send_buffer::make_safe( -        boost::asio::buffer( -            curr_buff->buff->cast<char *>() + curr_buff->offset, -            curr_buff->buff->size()         - curr_buff->offset -        ), +        curr_buff->buff->cast<char *>() + curr_buff->offset, +        curr_buff->buff->size()         - curr_buff->offset,          boost::bind(&usrp1_impl::io_impl::commit_send_buff, this, curr_buff, next_buff, _1)      ); @@ -222,7 +220,7 @@ size_t usrp1_impl::get_max_send_samps_per_packet(void) const {  }  size_t usrp1_impl::send( -    const std::vector<const void *> &buffs, size_t num_samps, +    const send_buffs_type &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type,      send_mode_t send_mode, double timeout  ){ @@ -300,7 +298,7 @@ size_t usrp1_impl::get_max_recv_samps_per_packet(void) const {  }  size_t usrp1_impl::recv( -    const std::vector<void *> &buffs, size_t num_samps, +    const recv_buffs_type &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type,      recv_mode_t recv_mode, double timeout  ){ diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index 246df93eb..e1b671811 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -39,7 +39,7 @@ public:      soft_time_ctrl_impl(const cb_fcn_type &stream_on_off):          _nsamps_remaining(0),          _stream_mode(stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS), -        _cmd_queue(bounded_buffer<boost::any>::make(2)), +        _cmd_queue(2),          _stream_on_off(stream_on_off)      {          //synchronously spawn a new thread @@ -112,7 +112,7 @@ public:      }      void issue_stream_cmd(const stream_cmd_t &cmd){ -        _cmd_queue->push_with_wait(cmd); +        _cmd_queue.push_with_wait(cmd);      }      void stream_on_off(bool enb){ @@ -180,7 +180,7 @@ public:          try{              boost::any cmd;              while (true){ -                _cmd_queue->pop_with_wait(cmd); +                _cmd_queue.pop_with_wait(cmd);                  recv_cmd_handle_cmd(boost::any_cast<stream_cmd_t>(cmd));              }          } catch(const boost::thread_interrupted &){} @@ -191,7 +191,7 @@ private:      size_t _nsamps_remaining;      stream_cmd_t::stream_mode_t _stream_mode;      time_spec_t _time_offset; -    bounded_buffer<boost::any>::sptr _cmd_queue; +    bounded_buffer<boost::any> _cmd_queue;      const cb_fcn_type _stream_on_off;      boost::thread_group _thread_group;  }; diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index 28199ebe3..1d9f6709f 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -80,13 +80,13 @@ public:      ~usrp1_impl(void);      //the io interface -    size_t send(const std::vector<const void *> &, +    size_t send(const send_buffs_type &,                  size_t,                  const uhd::tx_metadata_t &,                  const uhd::io_type_t &,                  send_mode_t, double); -    size_t recv(const std::vector<void *> &, +    size_t recv(const recv_buffs_type &,                  size_t, uhd::rx_metadata_t &,                  const uhd::io_type_t &,                  recv_mode_t, double); diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 30eaecae2..d09ce1871 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -32,6 +32,18 @@ using namespace uhd;  using namespace uhd::usrp;  using namespace uhd::transport;  namespace asio = boost::asio; +namespace pt = boost::posix_time; + +/*********************************************************************** + * helpers + **********************************************************************/ +static UHD_INLINE pt::time_duration to_time_dur(double timeout){ +    return pt::microseconds(long(timeout*1e6)); +} + +static UHD_INLINE double from_time_dur(const pt::time_duration &time_dur){ +    return 1e-6*time_dur.total_microseconds(); +}  /***********************************************************************   * constants @@ -61,6 +73,7 @@ public:          _last_seq_out = 0;          _last_seq_ack = 0;          _max_seqs_out = max_seqs_out; +        _ready_fcn = boost::bind(&flow_control_monitor::ready, this);      }      /*! @@ -73,11 +86,8 @@ public:          boost::this_thread::disable_interruption di; //disable because the wait can throw          boost::unique_lock<boost::mutex> lock(_fc_mutex);          _last_seq_out = seq; -        return _fc_cond.timed_wait( -            lock, -            boost::posix_time::microseconds(long(timeout*1e6)), -            boost::bind(&flow_control_monitor::ready, this) -        ); +        if (this->ready()) return true; +        return _fc_cond.timed_wait(lock, to_time_dur(timeout), _ready_fcn);      }      /*! @@ -99,6 +109,7 @@ private:      boost::mutex _fc_mutex;      boost::condition _fc_cond;      seq_type _last_seq_out, _last_seq_ack, _max_seqs_out; +    boost::function<bool(void)> _ready_fcn;  };  /*********************************************************************** @@ -110,11 +121,16 @@ private:   **********************************************************************/  struct usrp2_impl::io_impl{ -    io_impl(size_t send_frame_size, size_t width): -        packet_handler_recv_state(width), -        async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) +    io_impl(size_t send_frame_size, const std::vector<zero_copy_if::sptr> &xports): +        xports(xports), +        packet_handler_recv_state(xports.size()), +        packet_handler_send_state(xports.size()), +        async_msg_fifo(100/*messages deep*/)      { -        for (size_t i = 0; i < width; i++){ +        get_recv_buffs_fcn = boost::bind(&usrp2_impl::io_impl::get_recv_buffs, this, _1); +        get_send_buffs_fcn = boost::bind(&usrp2_impl::io_impl::get_send_buffs, this, _1); + +        for (size_t i = 0; i < xports.size(); i++){              fc_mons.push_back(flow_control_monitor::sptr(                  new flow_control_monitor(usrp2_impl::sram_bytes/send_frame_size)              )); @@ -135,31 +151,32 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_send_buffs( -        const std::vector<zero_copy_if::sptr> &trans, -        vrt_packet_handler::managed_send_buffs_t &buffs, -        double timeout -    ){ -        UHD_ASSERT_THROW(trans.size() == buffs.size()); +    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &buffs){ +        UHD_ASSERT_THROW(xports.size() == buffs.size());          //calculate the flow control word          const boost::uint32_t fc_word32 = packet_handler_send_state.next_packet_seq;          //grab a managed buffer for each index          for (size_t i = 0; i < buffs.size(); i++){ -            if (not fc_mons[i]->check_fc_condition(fc_word32, timeout)) return false; -            buffs[i] = trans[i]->get_send_buff(timeout); +            if (not fc_mons[i]->check_fc_condition(fc_word32, send_timeout)) return false; +            buffs[i] = xports[i]->get_send_buff(send_timeout);              if (not buffs[i].get()) return false;              buffs[i]->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_word32);          }          return true;      } -    bool get_recv_buffs( -        const std::vector<zero_copy_if::sptr> &xports, -        vrt_packet_handler::managed_recv_buffs_t &buffs, -        double timeout -    ); +    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); + +    const std::vector<zero_copy_if::sptr> &xports; + +    //timeouts set on calls to recv/send (passed into get buffs methods) +    double recv_timeout, send_timeout; + +    //bound callbacks for get buffs (bound once here, not in fast-path) +    vrt_packet_handler::get_recv_buffs_t get_recv_buffs_fcn; +    vrt_packet_handler::get_send_buffs_t get_send_buffs_fcn;      //previous state for each buffer      std::vector<vrt::if_packet_info_t> prev_infos; @@ -175,7 +192,7 @@ struct usrp2_impl::io_impl{      void recv_pirate_loop(zero_copy_if::sptr, usrp2_mboard_impl::sptr, size_t);      boost::thread_group recv_pirate_crew;      bool recv_pirate_crew_raiding; -    bounded_buffer<async_metadata_t>::sptr async_msg_fifo; +    bounded_buffer<async_metadata_t> async_msg_fifo;      boost::mutex spawn_mutex;  }; @@ -228,7 +245,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(                  //print the famous U, and push the metadata into the message queue                  if (metadata.event_code & underflow_flags) std::cerr << "U" << std::flush;                  //else std::cout << "metadata.event_code " << metadata.event_code << std::endl; -                async_msg_fifo->push_with_pop_on_full(metadata); +                async_msg_fifo.push_with_pop_on_full(metadata);              }              else{                  //TODO unknown received packet, may want to print error... @@ -248,7 +265,7 @@ void usrp2_impl::io_init(void){      const size_t send_frame_size = _data_transports.front()->get_send_frame_size();      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports.size())); +    _io_impl = UHD_PIMPL_MAKE(io_impl, (send_frame_size, _data_transports));      //create a new pirate thread for each zc if (yarr!!)      for (size_t i = 0; i < _data_transports.size(); i++){ @@ -274,7 +291,7 @@ bool usrp2_impl::recv_async_msg(      async_metadata_t &async_metadata, double timeout  ){      boost::this_thread::disable_interruption di; //disable because the wait can throw -    return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); +    return _io_impl->async_msg_fifo.pop_with_timed_wait(async_metadata, timeout);  }  /*********************************************************************** @@ -291,10 +308,11 @@ size_t usrp2_impl::get_max_send_samps_per_packet(void) const{  }  size_t usrp2_impl::send( -    const std::vector<const void *> &buffs, size_t num_samps, +    const send_buffs_type &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type,      send_mode_t send_mode, double timeout  ){ +    _io_impl->send_timeout = timeout;      return vrt_packet_handler::send(          _io_impl->packet_handler_send_state,       //last state of the send handler          buffs, num_samps,                          //buffer to fill @@ -302,7 +320,7 @@ size_t usrp2_impl::send(          io_type, _tx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_pack_be, -        boost::bind(&usrp2_impl::io_impl::get_send_buffs, _io_impl.get(), _data_transports, _1, timeout), +        _io_impl->get_send_buffs_fcn,          get_max_send_samps_per_packet(),          vrt_send_header_offset_words32      ); @@ -311,14 +329,6 @@ size_t usrp2_impl::send(  /***********************************************************************   * Alignment logic on receive   **********************************************************************/ -static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ -    return boost::posix_time::microseconds(long(timeout*1e6)); -} - -static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ -    return 1e-6*time_dur.total_microseconds(); -} -  static UHD_INLINE time_spec_t extract_time_spec(      const vrt::if_packet_info_t &packet_info  ){ @@ -360,12 +370,10 @@ static UHD_INLINE bool handle_msg_packet(  }  UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( -    const std::vector<zero_copy_if::sptr> &xports, -    vrt_packet_handler::managed_recv_buffs_t &buffs, -    double timeout +    vrt_packet_handler::managed_recv_buffs_t &buffs  ){      if (buffs.size() == 1){ -        buffs[0] = xports[0]->get_recv_buff(timeout); +        buffs[0] = xports[0]->get_recv_buff(recv_timeout);          if (buffs[0].get() == NULL) return false;          bool clear, msg; time_spec_t time; //unused variables          //call extract_packet_info to handle printing the overflows @@ -373,7 +381,7 @@ UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs(          return true;      }      //-------------------- begin alignment logic ---------------------// -    boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout); +    boost::system_time exit_time = boost::get_system_time() + to_time_dur(recv_timeout);      managed_recv_buffer::sptr buff_tmp;      std::list<size_t> _all_indexes, indexes_to_do;      for (size_t i = 0; i < buffs.size(); i++) _all_indexes.push_back(i); @@ -454,10 +462,11 @@ static void handle_overflow(std::vector<usrp2_mboard_impl::sptr> &mboards, size_  }  size_t usrp2_impl::recv( -    const std::vector<void *> &buffs, size_t num_samps, +    const recv_buffs_type &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type,      recv_mode_t recv_mode, double timeout  ){ +    _io_impl->recv_timeout = timeout;      return vrt_packet_handler::recv(          _io_impl->packet_handler_recv_state,       //last state of the recv handler          buffs, num_samps,                          //buffer to fill @@ -465,7 +474,7 @@ size_t usrp2_impl::recv(          io_type, _rx_otw_type,                     //input and output types to convert          _mboards.front()->get_master_clock_freq(), //master clock tick rate          uhd::transport::vrt::if_hdr_unpack_be, -        boost::bind(&usrp2_impl::io_impl::get_recv_buffs, _io_impl.get(), _data_transports, _1, timeout), -        boost::bind(&handle_overflow, _mboards, _1) +        _io_impl->get_recv_buffs_fcn, +        boost::bind(&handle_overflow, boost::ref(_mboards), _1)      );  } diff --git a/host/lib/usrp/usrp2/usrp2_iface.cpp b/host/lib/usrp/usrp2/usrp2_iface.cpp index 149c5011f..4407a3011 100644 --- a/host/lib/usrp/usrp2/usrp2_iface.cpp +++ b/host/lib/usrp/usrp2/usrp2_iface.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -42,6 +42,7 @@ public:   **********************************************************************/      usrp2_iface_impl(udp_simple::sptr ctrl_transport){          _ctrl_transport = ctrl_transport; +        _ctrl_seq_num = 0;          mb_eeprom = mboard_eeprom_t(*this, mboard_eeprom_t::MAP_N100);          switch(this->get_rev()){ diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index ad95b2a4a..337f842d6 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -200,12 +200,12 @@ public:      //the io interface      size_t send( -        const std::vector<const void *> &, size_t, +        const send_buffs_type &, size_t,          const uhd::tx_metadata_t &, const uhd::io_type_t &,          uhd::device::send_mode_t, double      );      size_t recv( -        const std::vector<void *> &, size_t, +        const recv_buffs_type &, size_t,          uhd::rx_metadata_t &, const uhd::io_type_t &,          uhd::device::recv_mode_t, double      ); diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp index 2388482c7..cf2410e4f 100644 --- a/host/lib/usrp/usrp_e100/io_impl.cpp +++ b/host/lib/usrp/usrp_e100/io_impl.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -54,8 +54,8 @@ struct usrp_e100_impl::io_impl{      bool continuous_streaming;      io_impl(usrp_e100_iface::sptr iface):          data_xport(usrp_e100_make_mmap_zero_copy(iface)), -        recv_pirate_booty(recv_booty_type::make(data_xport->get_num_recv_frames())), -        async_msg_fifo(bounded_buffer<async_metadata_t>::make(100/*messages deep*/)) +        recv_pirate_booty(data_xport->get_num_recv_frames()), +        async_msg_fifo(100/*messages deep*/)      {          /* NOP */      } @@ -69,14 +69,13 @@ struct usrp_e100_impl::io_impl{      bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs, double timeout){          UHD_ASSERT_THROW(buffs.size() == 1);          boost::this_thread::disable_interruption di; //disable because the wait can throw -        return recv_pirate_booty->pop_with_timed_wait(buffs.front(), timeout); +        return recv_pirate_booty.pop_with_timed_wait(buffs.front(), timeout);      }      //a pirate's life is the life for me!      void recv_pirate_loop(usrp_e100_clock_ctrl::sptr); -    typedef bounded_buffer<managed_recv_buffer::sptr> recv_booty_type; -    recv_booty_type::sptr recv_pirate_booty; -    bounded_buffer<async_metadata_t>::sptr async_msg_fifo; +    bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty; +    bounded_buffer<async_metadata_t> async_msg_fifo;      boost::thread_group recv_pirate_crew;      bool recv_pirate_crew_raiding;  }; @@ -124,12 +123,12 @@ void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_                  //print the famous U, and push the metadata into the message queue                  if (metadata.event_code & underflow_flags) std::cerr << "U" << std::flush; -                async_msg_fifo->push_with_pop_on_full(metadata); +                async_msg_fifo.push_with_pop_on_full(metadata);                  continue;              }              //same number of frames as the data transport -> always immediate -            recv_pirate_booty->push_with_wait(buff); +            recv_pirate_booty.push_with_wait(buff);          }catch(const std::exception &e){              std::cerr << "Error (usrp-e recv pirate loop): " << e.what() << std::endl; @@ -213,7 +212,7 @@ size_t usrp_e100_impl::get_max_send_samps_per_packet(void) const{  }  size_t usrp_e100_impl::send( -    const std::vector<const void *> &buffs, size_t num_samps, +    const send_buffs_type &buffs, size_t num_samps,      const tx_metadata_t &metadata, const io_type_t &io_type,      send_mode_t send_mode, double timeout  ){ @@ -243,7 +242,7 @@ size_t usrp_e100_impl::get_max_recv_samps_per_packet(void) const{  }  size_t usrp_e100_impl::recv( -    const std::vector<void *> &buffs, size_t num_samps, +    const recv_buffs_type &buffs, size_t num_samps,      rx_metadata_t &metadata, const io_type_t &io_type,      recv_mode_t recv_mode, double timeout  ){ @@ -266,5 +265,5 @@ bool usrp_e100_impl::recv_async_msg(      async_metadata_t &async_metadata, double timeout  ){      boost::this_thread::disable_interruption di; //disable because the wait can throw -    return _io_impl->async_msg_fifo->pop_with_timed_wait(async_metadata, timeout); +    return _io_impl->async_msg_fifo.pop_with_timed_wait(async_metadata, timeout);  } diff --git a/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp b/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp index df8e5dc9f..2dc401305 100644 --- a/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp +++ b/host/lib/usrp/usrp_e100/usrp_e100_impl.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -83,8 +83,8 @@ public:      ~usrp_e100_impl(void);      //the io interface -    size_t send(const std::vector<const void *> &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, send_mode_t, double); -    size_t recv(const std::vector<void *> &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, recv_mode_t, double); +    size_t send(const send_buffs_type &, size_t, const uhd::tx_metadata_t &, const uhd::io_type_t &, send_mode_t, double); +    size_t recv(const recv_buffs_type &, size_t, uhd::rx_metadata_t &, const uhd::io_type_t &, recv_mode_t, double);      bool recv_async_msg(uhd::async_metadata_t &, double);      size_t get_max_send_samps_per_packet(void) const;      size_t get_max_recv_samps_per_packet(void) const; diff --git a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp index bf378a9b1..4e0137fdb 100644 --- a/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp +++ b/host/lib/usrp/usrp_e100/usrp_e100_mmap_zero_copy.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -23,7 +23,6 @@  #include <unistd.h> //getpagesize  #include <poll.h> //poll  #include <boost/bind.hpp> -#include <boost/enable_shared_from_this.hpp>  #include <iostream>  using namespace uhd; @@ -36,7 +35,7 @@ static const size_t poll_breakout = 10; //how many poll timeouts constitute a fu  /***********************************************************************   * The zero copy interface implementation   **********************************************************************/ -class usrp_e100_mmap_zero_copy_impl : public zero_copy_if, public boost::enable_shared_from_this<usrp_e100_mmap_zero_copy_impl> { +class usrp_e100_mmap_zero_copy_impl : public zero_copy_if{  public:      usrp_e100_mmap_zero_copy_impl(usrp_e100_iface::sptr iface):          _fd(iface->get_file_descriptor()), _recv_index(0), _send_index(0) @@ -125,8 +124,8 @@ public:          //return the managed buffer for this frame          if (fp_verbose) std::cout << "  make_recv_buff: " << info->len << std::endl;          return managed_recv_buffer::make_safe( -            boost::asio::const_buffer(mem, info->len), -            boost::bind(&usrp_e100_mmap_zero_copy_impl::release, shared_from_this(), info) +            mem, info->len, +            boost::bind(&usrp_e100_mmap_zero_copy_impl::release, this, info)          );      } @@ -161,8 +160,8 @@ public:          //return the managed buffer for this frame          if (fp_verbose) std::cout << "  make_send_buff: " << _frame_size << std::endl;          return managed_send_buffer::make_safe( -            boost::asio::mutable_buffer(mem, _frame_size), -            boost::bind(&usrp_e100_mmap_zero_copy_impl::commit, shared_from_this(), info, _1) +            mem, _frame_size, +            boost::bind(&usrp_e100_mmap_zero_copy_impl::commit, this, info, _1)          );      } diff --git a/host/tests/buffer_test.cpp b/host/tests/buffer_test.cpp index e7bc88699..23b52a9bf 100644 --- a/host/tests/buffer_test.cpp +++ b/host/tests/buffer_test.cpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -25,40 +25,40 @@ using namespace uhd::transport;  static const double timeout = 0.01/*secs*/;  BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_timed_wait){ -    bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); +    bounded_buffer<int> bb(3);      //push elements, check for timeout -    BOOST_CHECK(bb->push_with_timed_wait(0, timeout)); -    BOOST_CHECK(bb->push_with_timed_wait(1, timeout)); -    BOOST_CHECK(bb->push_with_timed_wait(2, timeout)); -    BOOST_CHECK(not bb->push_with_timed_wait(3, timeout)); +    BOOST_CHECK(bb.push_with_timed_wait(0, timeout)); +    BOOST_CHECK(bb.push_with_timed_wait(1, timeout)); +    BOOST_CHECK(bb.push_with_timed_wait(2, timeout)); +    BOOST_CHECK(not bb.push_with_timed_wait(3, timeout));      int val;      //pop elements, check for timeout and check values -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 0); -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 1); -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 2); -    BOOST_CHECK(not bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(not bb.pop_with_timed_wait(val, timeout));  }  BOOST_AUTO_TEST_CASE(test_bounded_buffer_with_pop_on_full){ -    bounded_buffer<int>::sptr bb(bounded_buffer<int>::make(3)); +    bounded_buffer<int> bb(3);      //push elements, check for timeout -    BOOST_CHECK(bb->push_with_pop_on_full(0)); -    BOOST_CHECK(bb->push_with_pop_on_full(1)); -    BOOST_CHECK(bb->push_with_pop_on_full(2)); -    BOOST_CHECK(not bb->push_with_pop_on_full(3)); +    BOOST_CHECK(bb.push_with_pop_on_full(0)); +    BOOST_CHECK(bb.push_with_pop_on_full(1)); +    BOOST_CHECK(bb.push_with_pop_on_full(2)); +    BOOST_CHECK(not bb.push_with_pop_on_full(3));      int val;      //pop elements, check for timeout and check values -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 1); -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 2); -    BOOST_CHECK(bb->pop_with_timed_wait(val, timeout)); +    BOOST_CHECK(bb.pop_with_timed_wait(val, timeout));      BOOST_CHECK_EQUAL(val, 3);  } | 
