summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/lib/transport/libusb1_base.cpp19
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp99
-rw-r--r--host/lib/usrp/b200/b200_io_impl.cpp4
3 files changed, 67 insertions, 55 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index 0ef53db0a..4cf3ea17d 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -19,10 +19,12 @@
#include <uhd/exception.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/types/dict.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
#include <cstdlib>
#include <iostream>
@@ -37,9 +39,11 @@ public:
libusb_session_impl(void){
UHD_ASSERT_THROW(libusb_init(&_context) == 0);
libusb_set_debug(_context, debug_level);
+ task_handler = task::make(boost::bind(&libusb_session_impl::libusb_event_handler_task, this, _context));
}
~libusb_session_impl(void){
+ task_handler.reset();
libusb_exit(_context);
}
@@ -49,6 +53,21 @@ public:
private:
libusb_context *_context;
+ task::sptr task_handler;
+
+ /*
+ * Task to handle libusb events. There should only be one thread per libusb_context handling events.
+ * Using more than one thread can result in excessive CPU usage in kernel space (presumably from locking/waiting).
+ * The libusb documentation says it is safe, which it is, but it neglects to state the cost in CPU usage.
+ * Just don't do it!
+ */
+ UHD_INLINE void libusb_event_handler_task(libusb_context *context)
+ {
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000;
+ libusb_handle_events_timeout(context, &tv);
+ }
};
libusb::session::sptr libusb::session::get_global_session(void){
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 197e257da..2d18e1623 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -73,6 +73,15 @@ struct lut_result_t
int completed;
libusb_transfer_status status;
int actual_length;
+ boost::mutex mut;
+ boost::condition_variable usb_transfer_complete;
+};
+
+// Created to be used as an argument to boost::condition_variable::timed_wait() function
+struct lut_result_completed {
+ const lut_result_t& _result;
+ lut_result_completed(const lut_result_t& result):_result(result) {}
+ bool operator()() const {return (_result.completed ? true : false);}
};
/*!
@@ -84,48 +93,11 @@ struct lut_result_t
static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
{
lut_result_t *r = (lut_result_t *)lut->user_data;
- r->completed = 1;
+ boost::lock_guard<boost::mutex> lock(r->mut);
r->status = lut->status;
r->actual_length = lut->actual_length;
-}
-
-/*!
- * Wait for a managed buffer to become complete.
- *
- * This routine processes async events until the transaction completes.
- * We must call the libusb handle events in a loop because the handler
- * may complete managed buffers other than the one we are waiting on.
- *
- * We cannot determine if handle events timed out or processed an event.
- * Therefore, the timeout condition is handled by using boost system time.
- *
- * \param ctx the libusb context structure
- * \param timeout the wait timeout in seconds
- * \param completed a reference to the completed flag
- * \return true for completion, false for timeout
- */
-UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, int &completed)
-{
- //already completed by a previous call?
- if (completed) return true;
-
- //perform a non-blocking event handle
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- libusb_handle_events_timeout_completed(ctx, &tv, &completed);
- if (completed) return true;
-
- //finish the rest with a timeout loop
- const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
- while (not completed and (boost::get_system_time() < timeout_time)){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 10000; /*10ms*/
- libusb_handle_events_timeout_completed(ctx, &tv, &completed);
- }
-
- return completed;
+ r->completed = 1;
+ r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below
}
/***********************************************************************
@@ -154,7 +126,7 @@ public:
template <typename buffer_type>
UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
{
- if (wait_for_completion(_ctx, timeout, result.completed))
+ if (wait_for_completion(timeout))
{
if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format(
"usb %s transfer status: %d") % _name % int(result.status)));
@@ -164,9 +136,31 @@ public:
return typename buffer_type::sptr();
}
+ // This is public because it is accessed from the libusb_zero_copy_single constructor
lut_result_t result;
+ /*!
+ * Wait for a managed buffer to become complete.
+ *
+ * \param timeout the wait timeout in seconds. A negative value will wait forever.
+ * \return true for completion, false for timeout
+ */
+ UHD_INLINE bool wait_for_completion(const double timeout)
+ {
+ boost::unique_lock<boost::mutex> lock(result.mut);
+ if (!result.completed) {
+ if (timeout < 0.0) {
+ result.usb_transfer_complete.wait(lock);
+ } else {
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+ result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result));
+ }
+ }
+ return result.completed;
+ }
+
private:
+
boost::function<void(libusb_zero_copy_mb *)> _release_cb;
const bool _is_recv;
const std::string _name;
@@ -252,8 +246,6 @@ public:
~libusb_zero_copy_single(void)
{
- libusb_context *ctx = libusb::session::get_global_session()->get_context();
-
//cancel all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
{
@@ -261,8 +253,10 @@ public:
}
//process all transfers until timeout occurs
- int completed = 0;
- wait_for_completion(ctx, 0.01, completed);
+ BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued)
+ {
+ mb->wait_for_completion(0.01);
+ }
//free all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
@@ -276,19 +270,18 @@ public:
{
typename buffer_type::sptr buff;
libusb_zero_copy_mb *front = NULL;
+ boost::mutex::scoped_lock lock(_mutex);
+ if (_enqueued.empty())
{
- boost::mutex::scoped_lock l(_mutex);
- if (_enqueued.empty())
- {
- _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6)));
- }
- if (_enqueued.empty()) return buff;
- front = _enqueued.front();
+ _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6)));
}
+ if (_enqueued.empty()) return buff;
+ front = _enqueued.front();
+ lock.unlock();
buff = front->get_new<buffer_type>(timeout);
+ lock.lock();
- boost::mutex::scoped_lock l(_mutex);
if (buff) _enqueued.pop_front();
this->submit_what_we_can();
return buff;
diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp
index d643ef855..1feeff1a3 100644
--- a/host/lib/usrp/b200/b200_io_impl.cpp
+++ b/host/lib/usrp/b200/b200_io_impl.cpp
@@ -231,14 +231,14 @@ rx_streamer::sptr b200_impl::get_rx_stream(const uhd::stream_args_t &args_)
//calculate packet size
static const size_t hdr_size = 0
+ vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
- + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
+ //+ sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
- sizeof(vrt::if_packet_info_t().tsi) //no int time ever used
;
const size_t bpp = _data_transport->get_recv_frame_size() - hdr_size;
const size_t bpi = convert::get_bytes_per_item(args.otw_format);
size_t spp = unsigned(args.args.cast<double>("spp", bpp/bpi));
- spp = std::min<size_t>(2000, spp); //magic maximum for framing at full rate
+ spp = std::min<size_t>(2044, spp); //magic maximum for framing at full rate
//make the new streamer given the samples per packet
if (not my_streamer) my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp);