diff options
Diffstat (limited to 'host/lib/transport/libusb1_zero_copy.cpp')
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 75 | 
1 files changed, 39 insertions, 36 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 1ac02d16f..6925e7659 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -43,23 +43,6 @@ using namespace uhd::transport;  static const size_t DEFAULT_NUM_XFERS = 16;     //num xfers  static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes -//! Define LIBUSB_CALL when its missing (non-windows) -#ifndef LIBUSB_CALL -    #define LIBUSB_CALL -#endif /*LIBUSB_CALL*/ - -//! libusb_handle_events_timeout_completed is only in newer API -#ifndef HAVE_LIBUSB_HANDLE_EVENTS_TIMEOUT_COMPLETED -    #define libusb_handle_events_timeout_completed(ctx, tx, completed) \ -        libusb_handle_events_timeout(ctx, tx) -#endif - -//! libusb_error_name is only in newer API -#ifndef HAVE_LIBUSB_ERROR_NAME -    #define libusb_error_name(code) \ -        str(boost::format("LIBUSB_ERROR_CODE %d") % code) -#endif -  //! type for sharing the release queue with managed buffers  class libusb_zero_copy_mb;  typedef boost::shared_ptr<bounded_buffer<libusb_zero_copy_mb *> > mb_queue_sptr; @@ -155,8 +138,9 @@ public:          result.is_recv = _is_recv;  #endif          const int ret = libusb_submit_transfer(_lut); -        if (ret != 0) throw uhd::runtime_error(str(boost::format( -            "usb %s submit failed: %s") % _name % libusb_error_name(ret))); +        if (ret != LIBUSB_SUCCESS) +            throw uhd::runtime_error(str(boost::format("usb %s submit failed: %s") +                                         % _name % libusb_strerror((libusb_error)ret)));      }      template <typename buffer_type> @@ -164,8 +148,9 @@ public:      {          if (wait_for_completion(timeout))          { -            if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format( -                "usb %s transfer status: %d") % _name % int(result.status))); +            if (result.status != LIBUSB_TRANSFER_COMPLETED) +                throw uhd::runtime_error(str(boost::format("usb %s transfer status: %d") +                                             % _name % libusb_error_name(result.status)));              result.completed = 0;              return make(reinterpret_cast<buffer_type *>(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size);          } @@ -219,7 +204,8 @@ public:          _num_frames(num_frames),          _frame_size(frame_size),          _buffer_pool(buffer_pool::make(_num_frames, _frame_size)), -        _enqueued(_num_frames), _released(_num_frames) +        _enqueued(_num_frames), _released(_num_frames), +        _status(STATUS_RUNNING)      {          const bool is_recv = (endpoint & 0x80) != 0;          const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f)); @@ -304,18 +290,24 @@ public:      UHD_INLINE typename buffer_type::sptr get_buff(double timeout)      {          typename buffer_type::sptr buff; -        libusb_zero_copy_mb *front = NULL; -        boost::mutex::scoped_lock lock(_mutex); + +        if (_status == STATUS_ERROR) +            return buff; + +        // Serialize access to buffers +        boost::mutex::scoped_lock get_buff_lock(_get_buff_mutex); + +        boost::mutex::scoped_lock queue_lock(_queue_mutex);          if (_enqueued.empty())          { -            _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6))); +            _buff_ready_cond.timed_wait(queue_lock, boost::posix_time::microseconds(long(timeout*1e6)));          }          if (_enqueued.empty()) return buff; -        front = _enqueued.front(); +        libusb_zero_copy_mb *front = _enqueued.front(); -        lock.unlock(); +        queue_lock.unlock();          buff = front->get_new<buffer_type>(timeout); -        lock.lock(); +        queue_lock.lock();          if (buff) _enqueued.pop_front();          this->submit_what_we_can(); @@ -333,28 +325,39 @@ private:      buffer_pool::sptr _buffer_pool;      std::vector<boost::shared_ptr<libusb_zero_copy_mb> > _mb_pool; -    boost::mutex _mutex; -    boost::condition_variable _cond; +    boost::mutex _queue_mutex; +    boost::condition_variable _buff_ready_cond; +    boost::mutex _get_buff_mutex;      //! why 2 queues? there is room in the future to have > N buffers but only N in flight      boost::circular_buffer<libusb_zero_copy_mb *> _enqueued, _released; +    enum {STATUS_RUNNING, STATUS_ERROR} _status; +      void enqueue_buffer(libusb_zero_copy_mb *mb)      { -        boost::mutex::scoped_lock l(_mutex); +        boost::mutex::scoped_lock l(_queue_mutex);          _released.push_back(mb);          this->submit_what_we_can(); -        l.unlock(); -        _cond.notify_one(); +        _buff_ready_cond.notify_one();      }      void submit_what_we_can(void)      { +        if (_status == STATUS_ERROR) +            return;          while (not _released.empty() and not _enqueued.full())          { -            _released.front()->submit(); -            _enqueued.push_back(_released.front()); -            _released.pop_front(); +            try { +                _released.front()->submit(); +                _enqueued.push_back(_released.front()); +                _released.pop_front(); +            } +            catch (uhd::runtime_error& e) +            { +                _status = STATUS_ERROR; +                throw e; +            }          }      }  | 
