diff options
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 9 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 11 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 44 | 
3 files changed, 43 insertions, 21 deletions
| diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index aca93b071..33ded8cfd 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -66,6 +66,13 @@ namespace uhd{ namespace transport{          virtual bool push_with_timed_wait(const elem_type &elem, double timeout) = 0;          /*! +         * Pop an element from the bounded_buffer immediately. +         * \param elem the element reference pop to +         * \return false when the bounded_buffer is empty +         */ +        virtual bool pop_with_haste(elem_type &elem) = 0; + +        /*!           * Pop an element from the bounded_buffer.           * Wait until the bounded_buffer becomes non-empty.           * \param elem the element reference pop to diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 4fbe3f085..7bf182ee3 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -1,5 +1,5 @@  // -// Copyright 2010 Ettus Research LLC +// Copyright 2010-2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -72,6 +72,15 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              return true;          } +        UHD_INLINE bool pop_with_haste(elem_type &elem){ +            boost::mutex::scoped_lock lock(_mutex); +            if(_buffer.empty()) return false; +            elem = this->pop_back(); +            lock.unlock(); +            _full_cond.notify_one(); +            return true; +        } +          UHD_INLINE void pop_with_wait(elem_type &elem){              boost::mutex::scoped_lock lock(_mutex);              _empty_cond.wait(lock, _not_empty_fcn); diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 697e172cd..f083b97d8 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -34,7 +34,7 @@ namespace asio = boost::asio;  //Define this to the the boost async io calls to perform receive.  //Otherwise, get_recv_buff uses a blocking receive with timeout. -#define USE_ASIO_ASYNC_RECV +//#define USE_ASIO_ASYNC_RECV  //Define this to the the boost async io calls to perform send.  //Otherwise, the commit callback uses a blocking send. @@ -138,8 +138,7 @@ public:      //! handle a recv callback -> push the filled memory into the fifo      UHD_INLINE void handle_recv(void *mem, size_t len){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); +        _pending_recv_buffs->push_with_pop_on_full(boost::asio::buffer(mem, len));      }      //////////////////////////////////////////////////////////////////// @@ -152,8 +151,7 @@ public:          if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){              return managed_recv_buffer::make_safe(                  buff, boost::bind( -                    &udp_zero_copy_asio_impl::release, -                    shared_from_this(), +                    &udp_zero_copy_asio_impl::release, this,                      asio::buffer_cast<void*>(buff)                  )              ); @@ -177,7 +175,6 @@ public:      #else /*USE_ASIO_ASYNC_RECV*/      ////////////////////////////////////////////////////////////////////      managed_recv_buffer::sptr get_recv_buff(double timeout){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw          asio::mutable_buffer buff;          //setup timeval for timeout @@ -190,11 +187,11 @@ public:          FD_ZERO(&rset);          FD_SET(_sock_fd, &rset); -        //call select to perform timed wait and grab an available buffer with wait +        //call select to perform timed wait and grab an available buffer now          //if the condition is true, call receive and return the managed buffer          if ( -            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and -            _pending_recv_buffs->pop_with_timed_wait(buff, timeout) +            ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 +            and _pending_recv_buffs->pop_with_haste(buff)          ){              return managed_recv_buffer::make_safe(                  asio::buffer( @@ -202,8 +199,7 @@ public:                      _socket->receive(asio::buffer(buff))                  ),                  boost::bind( -                    &udp_zero_copy_asio_impl::release, -                    shared_from_this(), +                    &udp_zero_copy_asio_impl::release, this,                      asio::buffer_cast<void*>(buff)                  )              ); @@ -212,7 +208,6 @@ public:      }      void release(void *mem){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw          handle_recv(mem, this->get_recv_frame_size());      } @@ -225,10 +220,12 @@ public:      //! handle a send callback -> push the emptied memory into the fifo      UHD_INLINE void handle_send(void *mem){ -        boost::this_thread::disable_interruption di; //disable because the wait can throw -        _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); +        _pending_send_buffs->push_with_pop_on_full(boost::asio::buffer(mem, this->get_send_frame_size()));      } +    //////////////////////////////////////////////////////////////////// +    #ifdef USE_ASIO_ASYNC_SEND +    ////////////////////////////////////////////////////////////////////      //! pop an empty send buffer off of the fifo and bind with the commit callback      managed_send_buffer::sptr get_send_buff(double timeout){          boost::this_thread::disable_interruption di; //disable because the wait can throw @@ -236,8 +233,7 @@ public:          if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){              return managed_send_buffer::make_safe(                  buff, boost::bind( -                    &udp_zero_copy_asio_impl::commit, -                    shared_from_this(), +                    &udp_zero_copy_asio_impl::commit, this,                      asio::buffer_cast<void*>(buff), _1                  )              ); @@ -245,9 +241,6 @@ public:          return managed_send_buffer::sptr();      } -    //////////////////////////////////////////////////////////////////// -    #ifdef USE_ASIO_ASYNC_SEND -    ////////////////////////////////////////////////////////////////////      //! commit a send buffer -> start an async send on the buffer      void commit(void *mem, size_t len){          _socket->async_send( @@ -262,6 +255,19 @@ public:      ////////////////////////////////////////////////////////////////////      #else /*USE_ASIO_ASYNC_SEND*/      //////////////////////////////////////////////////////////////////// +    managed_send_buffer::sptr get_send_buff(double){ +        asio::mutable_buffer buff; +        if (_pending_send_buffs->pop_with_haste(buff)){ +            return managed_send_buffer::make_safe( +                buff, boost::bind( +                    &udp_zero_copy_asio_impl::commit, this, +                    asio::buffer_cast<void*>(buff), _1 +                ) +            ); +        } +        return managed_send_buffer::sptr(); +    } +      void commit(void *mem, size_t len){          _socket->send(asio::buffer(mem, len));          handle_send(mem); | 
