diff options
| author | Josh Blum <josh@joshknows.com> | 2010-10-08 11:20:30 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2010-10-08 11:20:30 -0700 | 
| commit | e8e9258acb8ce8499c40de52abaa4e6ba83d479d (patch) | |
| tree | 622fc5a2d23dd56e1228679b9dbf1b674881833f | |
| parent | 201af62ccb4d1065a44822f0f3ce4c81755510b5 (diff) | |
| download | uhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.tar.gz uhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.tar.bz2 uhd-e8e9258acb8ce8499c40de52abaa4e6ba83d479d.zip | |
udp: worked blocking send back into udp transport, enable async with #define
tweaked some code in bounded buffer
| -rw-r--r-- | host/include/uhd/transport/alignment_buffer.ipp | 8 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 39 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 114 | 
3 files changed, 99 insertions, 62 deletions
| diff --git a/host/include/uhd/transport/alignment_buffer.ipp b/host/include/uhd/transport/alignment_buffer.ipp index 5f09de0d9..833b5d399 100644 --- a/host/include/uhd/transport/alignment_buffer.ipp +++ b/host/include/uhd/transport/alignment_buffer.ipp @@ -54,14 +54,14 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE bool pop_elems_with_timed_wait(              std::vector<elem_type> &elems, double timeout          ){ -            boost::system_time exit_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1e6)); +            boost::system_time exit_time = boost::get_system_time() + to_time_dur(timeout);              buff_contents_type buff_contents_tmp;              std::list<size_t> indexes_to_do(_all_indexes);              //do an initial pop to load an initial sequence id              size_t index = indexes_to_do.front();              if (not _buffs[index]->pop_with_timed_wait( -                buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() +                buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())              )) return false;              elems[index] = buff_contents_tmp.first;              seq_type expected_seq_id = buff_contents_tmp.second; @@ -76,7 +76,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                      indexes_to_do = _all_indexes;                      index = indexes_to_do.front();                      if (not _buffs[index]->pop_with_timed_wait( -                        buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() +                        buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())                      )) return false;                      elems[index] = buff_contents_tmp.first;                      expected_seq_id = buff_contents_tmp.second; @@ -86,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/                  //pop an element off for this index                  index = indexes_to_do.front();                  if (not _buffs[index]->pop_with_timed_wait( -                    buff_contents_tmp, 1e-6*(exit_time - boost::get_system_time()).total_microseconds() +                    buff_contents_tmp, from_time_dur(exit_time - boost::get_system_time())                  )) return false;                  //if the sequence id matches: diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 58f78bab4..edc7faa06 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -19,18 +19,28 @@  #define INCLUDED_UHD_TRANSPORT_BOUNDED_BUFFER_IPP  #include <boost/bind.hpp> +#include <boost/function.hpp>  #include <boost/circular_buffer.hpp>  #include <boost/thread/condition.hpp>  #include <boost/date_time/posix_time/posix_time_types.hpp>  namespace uhd{ namespace transport{ namespace{ /*anon*/ +    static UHD_INLINE boost::posix_time::time_duration to_time_dur(double timeout){ +        return boost::posix_time::microseconds(long(timeout*1e6)); +    } + +    static UHD_INLINE double from_time_dur(const boost::posix_time::time_duration &time_dur){ +        return 1e-6*time_dur.total_microseconds(); +    } +      template <typename elem_type>      class bounded_buffer_impl : public bounded_buffer<elem_type>{      public:          bounded_buffer_impl(size_t capacity) : _buffer(capacity){ -            /* NOP */ +            _not_full_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_full, this); +            _not_empty_fcn = boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this);          }          UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){ @@ -52,17 +62,16 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE void push_with_wait(const elem_type &elem){              boost::unique_lock<boost::mutex> lock(_mutex); -            _full_cond.wait(lock, boost::bind(&bounded_buffer_impl<elem_type>::not_full, this)); +            _full_cond.wait(lock, _not_full_fcn);              _buffer.push_front(elem);              lock.unlock();              _empty_cond.notify_one();          } -        bool push_with_timed_wait(const elem_type &elem, double timeout){ +        UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex);              if (not _full_cond.timed_wait( -                lock, boost::posix_time::microseconds(long(timeout*1e6)), -                boost::bind(&bounded_buffer_impl<elem_type>::not_full, this) +                lock, to_time_dur(timeout), _not_full_fcn              )) return false;              _buffer.push_front(elem);              lock.unlock(); @@ -72,19 +81,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE void pop_with_wait(elem_type &elem){              boost::unique_lock<boost::mutex> lock(_mutex); -            _empty_cond.wait(lock, boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this)); -            this->pop_back(elem); +            _empty_cond.wait(lock, _not_empty_fcn); +            elem = this->pop_back();              lock.unlock();              _full_cond.notify_one();          } -        bool pop_with_timed_wait(elem_type &elem, double timeout){ +        UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){              boost::unique_lock<boost::mutex> lock(_mutex);              if (not _empty_cond.timed_wait( -                lock, boost::posix_time::microseconds(long(timeout*1e6)), -                boost::bind(&bounded_buffer_impl<elem_type>::not_empty, this) +                lock, to_time_dur(timeout), _not_empty_fcn              )) return false; -            this->pop_back(elem); +            elem = this->pop_back();              lock.unlock();              _full_cond.notify_one();              return true; @@ -92,7 +100,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE void clear(void){              boost::unique_lock<boost::mutex> lock(_mutex); -            while (not_empty()) _buffer.pop_back(); +            while (not_empty()) this->pop_back();              lock.unlock();              _full_cond.notify_one();          } @@ -105,16 +113,19 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          bool not_full(void) const{return not _buffer.full();}          bool not_empty(void) const{return not _buffer.empty();} +        boost::function<bool(void)> _not_full_fcn, _not_empty_fcn; +          /*!           * Three part operation to pop an element:           * 1) assign elem to the back element           * 2) assign the back element to empty           * 3) pop the back to move the counter           */ -        UHD_INLINE void pop_back(elem_type &elem){ -            elem = _buffer.back(); +        UHD_INLINE elem_type pop_back(void){ +            elem_type elem = _buffer.back();              _buffer.back() = elem_type();              _buffer.pop_back(); +            return elem;          }      };  }}} //namespace diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 40fe3fa0f..d84aeefdd 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -39,6 +39,10 @@ namespace asio = boost::asio;  //Otherwise, get_recv_buff uses a blocking receive with timeout.  //#define USE_ASIO_ASYNC_RECV +//Define this to the the boost async io calls to perform send. +//Otherwise, the commit callback uses a blocking send. +//#define USE_ASIO_ASYNC_SEND +  //enough buffering for half a second of samples at full rate on usrp2  static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(4 * 25e6 * 0.5); @@ -55,7 +59,13 @@ static const size_t DEFAULT_NUM_RECV_FRAMES = 32;  #else  static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;  #endif +//The non-async send only ever requires a single frame +//because the buffer will be committed before a new get. +#ifdef USE_ASIO_ASYNC_SEND  static const size_t DEFAULT_NUM_SEND_FRAMES = 32; +#else +static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;; +#endif  //a single concurrent thread for io_service seems to be the fastest  static const size_t CONCURRENCY_HINT = 1; @@ -143,6 +153,12 @@ public:          return get_buff_size<Opt>();      } +    //! handle a recv callback -> push the filled memory into the fifo +    UHD_INLINE void handle_recv(void *mem, size_t len){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); +    } +      ////////////////////////////////////////////////////////////////////      #ifdef USE_ASIO_ASYNC_RECV      //////////////////////////////////////////////////////////////////// @@ -162,16 +178,10 @@ public:          return managed_recv_buffer::sptr();      } -    //! handle a recv callback -> push the filled memory into the fifo -    void handle_recv(void *mem, size_t len){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); -    } -      //! release a recv buffer -> start an async recv on the buffer      void release(void *mem){          _socket->async_receive( -            boost::asio::buffer(mem, _recv_frame_size), +            boost::asio::buffer(mem, this->get_recv_frame_size()),              boost::bind(                  &udp_zero_copy_asio_impl::handle_recv,                  shared_from_this(), mem, @@ -185,6 +195,7 @@ public:      ////////////////////////////////////////////////////////////////////      managed_recv_buffer::sptr get_recv_buff(double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw +        asio::mutable_buffer buff;          //setup timeval for timeout          timeval tv; @@ -196,34 +207,30 @@ public:          FD_ZERO(&rset);          FD_SET(_sock_fd, &rset); -        //call select to perform timed wait -        if (::select(_sock_fd+1, &rset, NULL, NULL, &tv) <= 0) -            return managed_recv_buffer::sptr(); - -        //grab an available buffer -        asio::mutable_buffer buff; -        if (not _pending_recv_buffs->pop_with_timed_wait(buff, timeout)) -            return managed_recv_buffer::sptr(); - -        //receive and return the buffer -        return managed_recv_buffer::make_safe( -            asio::buffer( -                boost::asio::buffer_cast<void *>(buff), -                _socket->receive(boost::asio::buffer(buff)) -            ), -            boost::bind( -                &udp_zero_copy_asio_impl::release, -                shared_from_this(), -                asio::buffer_cast<void*>(buff) -            ) -        ); +        //call select to perform timed wait and grab an available buffer with wait +        //if the condition is true, call receive and return the managed buffer +        if ( +            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and +            _pending_recv_buffs->pop_with_timed_wait(buff, timeout) +        ){ +            return managed_recv_buffer::make_safe( +                asio::buffer( +                    boost::asio::buffer_cast<void *>(buff), +                    _socket->receive(asio::buffer(buff)) +                ), +                boost::bind( +                    &udp_zero_copy_asio_impl::release, +                    shared_from_this(), +                    asio::buffer_cast<void*>(buff) +                ) +            ); +        } +        return managed_recv_buffer::sptr();      }      void release(void *mem){          boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_recv_buffs->push_with_wait( -            boost::asio::buffer(mem, this->get_recv_frame_size()) -        ); +        handle_recv(mem, this->get_recv_frame_size());      }      //////////////////////////////////////////////////////////////////// @@ -233,6 +240,12 @@ public:      size_t get_num_recv_frames(void) const {return _num_recv_frames;}      size_t get_recv_frame_size(void) const {return _recv_frame_size;} +    //! handle a send callback -> push the emptied memory into the fifo +    UHD_INLINE void handle_send(void *mem){ +        boost::this_thread::disable_interruption di; //disable because the wait can throw +        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); +    } +      //! pop an empty send buffer off of the fifo and bind with the commit callback      managed_send_buffer::sptr get_send_buff(double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -249,12 +262,9 @@ public:          return managed_send_buffer::sptr();      } -    //! handle a send callback -> push the emptied memory into the fifo -    void handle_send(void *mem){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, _send_frame_size)); -    } - +    //////////////////////////////////////////////////////////////////// +    #ifdef USE_ASIO_ASYNC_SEND +    ////////////////////////////////////////////////////////////////////      //! commit a send buffer -> start an async send on the buffer      void commit(void *mem, size_t len){          _socket->async_send( @@ -266,6 +276,18 @@ public:          );      } +    //////////////////////////////////////////////////////////////////// +    #else /*USE_ASIO_ASYNC_SEND*/ +    //////////////////////////////////////////////////////////////////// +    void commit(void *mem, size_t len){ +        _socket->send(asio::buffer(mem, len)); +        handle_send(mem); +    } + +    //////////////////////////////////////////////////////////////////// +    #endif /*USE_ASIO_ASYNC_SEND*/ +    //////////////////////////////////////////////////////////////////// +      size_t get_num_send_frames(void) const {return _num_send_frames;}      size_t get_send_frame_size(void) const {return _send_frame_size;} @@ -297,6 +319,13 @@ template<typename Opt> static void resize_buff_helper(      if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE;      if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE; +    std::string help_message; +    #if defined(UHD_PLATFORM_LINUX) +        help_message = str(boost::format( +            "Please run: sudo sysctl -w net.core.%smem_max=%d\n" +        ) % ((name == "recv")?"r":"w") % min_sock_buff_size); +    #endif /*defined(UHD_PLATFORM_LINUX)*/ +      //resize the buffer if size was provided      if (target_size > 0){          size_t actual_size = udp_trans->resize_buff<Opt>(target_size); @@ -308,13 +337,10 @@ template<typename Opt> static void resize_buff_helper(              "Current %s sock buff size: %d bytes"          ) % name % actual_size << std::endl;          if (actual_size < target_size) uhd::print_warning(str(boost::format( -            "The %1% buffer is smaller than the requested size.\n" -            "The minimum recommended buffer size is %2% bytes.\n" -            "See the transport application notes on buffer resizing.\n" -            #if defined(UHD_PLATFORM_LINUX) -            "Please run: sudo sysctl -w net.core.rmem_max=%2%\n" -            #endif /*defined(UHD_PLATFORM_LINUX)*/ -        ) % name % min_sock_buff_size)); +            "The %s buffer is smaller than the requested size.\n" +            "The minimum recommended buffer size is %d bytes.\n" +            "See the transport application notes on buffer resizing.\n%s" +        ) % name % min_sock_buff_size % help_message));      }      //only enable on platforms that are happy with the large buffer resize | 
