diff options
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.hpp | 15 | ||||
| -rw-r--r-- | host/include/uhd/transport/bounded_buffer.ipp | 17 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 4 | ||||
| -rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 10 | 
4 files changed, 35 insertions, 11 deletions
| diff --git a/host/include/uhd/transport/bounded_buffer.hpp b/host/include/uhd/transport/bounded_buffer.hpp index 412d73f17..6aa92c2e6 100644 --- a/host/include/uhd/transport/bounded_buffer.hpp +++ b/host/include/uhd/transport/bounded_buffer.hpp @@ -43,6 +43,16 @@ namespace uhd{ namespace transport{          }          /*! +         * Push a new element into the bounded buffer immediately. +         * The element will not be pushed when the buffer is full. +         * \param elem the element reference pop to +         * \return false when the buffer is full +         */ +        bool push_with_haste(const elem_type &elem){ +            return _detail.push_with_haste(elem); +        } + +        /*!           * Push a new element into the bounded buffer.           * If the buffer is full prior to the push,           * make room by poping the oldest element. @@ -74,9 +84,10 @@ namespace uhd{ namespace transport{          }          /*! -         * Pop an element from the bounded_buffer immediately. +         * Pop an element from the bounded buffer immediately. +         * The element will not be popped when the buffer is empty.           * \param elem the element reference pop to -         * \return false when the bounded_buffer is empty +         * \return false when the buffer is empty           */          bool pop_with_haste(elem_type &elem){              return _detail.pop_with_haste(elem); diff --git a/host/include/uhd/transport/bounded_buffer.ipp b/host/include/uhd/transport/bounded_buffer.ipp index 7be2f987c..0d393ad64 100644 --- a/host/include/uhd/transport/bounded_buffer.ipp +++ b/host/include/uhd/transport/bounded_buffer.ipp @@ -37,9 +37,18 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/              _not_empty_fcn = boost::bind(&bounded_buffer_detail<elem_type>::not_empty, this);          } +        UHD_INLINE bool push_with_haste(const elem_type &elem){ +            boost::mutex::scoped_lock lock(_mutex); +            if (_buffer.full()) return false; +            _buffer.push_front(elem); +            lock.unlock(); +            _empty_cond.notify_one(); +            return true; +        } +          UHD_INLINE bool push_with_pop_on_full(const elem_type &elem){              boost::mutex::scoped_lock lock(_mutex); -            if(_buffer.full()){ +            if (_buffer.full()){                  _buffer.pop_back();                  _buffer.push_front(elem);                  lock.unlock(); @@ -55,6 +64,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE void push_with_wait(const elem_type &elem){ +            if (this->push_with_haste(elem)) return;              boost::mutex::scoped_lock lock(_mutex);              _full_cond.wait(lock, _not_full_fcn);              _buffer.push_front(elem); @@ -63,6 +73,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool push_with_timed_wait(const elem_type &elem, double timeout){ +            if (this->push_with_haste(elem)) return true;              boost::mutex::scoped_lock lock(_mutex);              if (not _full_cond.timed_wait(                  lock, to_time_dur(timeout), _not_full_fcn @@ -75,7 +86,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          UHD_INLINE bool pop_with_haste(elem_type &elem){              boost::mutex::scoped_lock lock(_mutex); -            if(_buffer.empty()) return false; +            if (_buffer.empty()) return false;              elem = this->pop_back();              lock.unlock();              _full_cond.notify_one(); @@ -83,6 +94,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE void pop_with_wait(elem_type &elem){ +            if (this->pop_with_haste(elem)) return;              boost::mutex::scoped_lock lock(_mutex);              _empty_cond.wait(lock, _not_empty_fcn);              elem = this->pop_back(); @@ -91,6 +103,7 @@ namespace uhd{ namespace transport{ namespace{ /*anon*/          }          UHD_INLINE bool pop_with_timed_wait(elem_type &elem, double timeout){ +            if (this->pop_with_haste(elem)) return true;              boost::mutex::scoped_lock lock(_mutex);              if (not _empty_cond.timed_wait(                  lock, to_time_dur(timeout), _not_empty_fcn diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index ca37f351f..6fab5ae6f 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -132,7 +132,7 @@ static void callback(libusb_transfer *lut){   * \param pointer to libusb_transfer   */  void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ -    _completed_list.push_with_wait(lut); +    _completed_list.push_with_haste(lut);  } @@ -161,7 +161,7 @@ usb_endpoint::usb_endpoint(          //input luts are immediately submitted to be filled          //output luts go into the completed list as free buffers          if (_input) this->submit(_all_luts.back()); -        else _completed_list.push_with_wait(_all_luts.back()); +        else _completed_list.push_with_haste(_all_luts.back());      }  } diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp index 87c5ec823..c45b196cf 100644 --- a/host/lib/transport/udp_zero_copy_asio.cpp +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -216,14 +216,14 @@ public:      managed_recv_buffer::sptr get_recv_buff(double timeout){          udp_zero_copy_asio_mrb *mrb = NULL; -        if (is_recv_socket_ready(timeout) and _pending_recv_buffs.pop_with_haste(mrb)){ +        if (is_recv_socket_ready(timeout) and _pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){              return mrb->get_new(::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0));          }          return managed_recv_buffer::sptr();      }      UHD_INLINE void handle_recv(udp_zero_copy_asio_mrb *mrb){ -        _pending_recv_buffs.push_with_pop_on_full(mrb); +        _pending_recv_buffs.push_with_haste(mrb);      }      void release(udp_zero_copy_asio_mrb *mrb){ @@ -245,16 +245,16 @@ public:       *  - A managed buffer is always available.       *  - The queue can never be over-filled.       ******************************************************************/ -    managed_send_buffer::sptr get_send_buff(double){ +    managed_send_buffer::sptr get_send_buff(double timeout){          udp_zero_copy_asio_msb *msb = NULL; -        if (_pending_send_buffs.pop_with_haste(msb)){ +        if (_pending_send_buffs.pop_with_timed_wait(msb, timeout)){              return msb->get_new(_send_frame_size);          }          return managed_send_buffer::sptr();      }      UHD_INLINE void handle_send(udp_zero_copy_asio_msb *msb){ -        _pending_send_buffs.push_with_pop_on_full(msb); +        _pending_send_buffs.push_with_haste(msb);      }      void commit(udp_zero_copy_asio_msb *msb, size_t len){ | 
