aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/libusb1_base.cpp23
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp99
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp3
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp2
-rw-r--r--host/lib/transport/udp_zero_copy.cpp3
5 files changed, 74 insertions, 56 deletions
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
index 0ef53db0a..8bd0f4354 100644
--- a/host/lib/transport/libusb1_base.cpp
+++ b/host/lib/transport/libusb1_base.cpp
@@ -19,10 +19,12 @@
#include <uhd/exception.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/types/dict.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/foreach.hpp>
+#include <boost/bind.hpp>
#include <cstdlib>
#include <iostream>
@@ -37,9 +39,11 @@ public:
libusb_session_impl(void){
UHD_ASSERT_THROW(libusb_init(&_context) == 0);
libusb_set_debug(_context, debug_level);
+ task_handler = task::make(boost::bind(&libusb_session_impl::libusb_event_handler_task, this, _context));
}
~libusb_session_impl(void){
+ task_handler.reset();
libusb_exit(_context);
}
@@ -49,6 +53,21 @@ public:
private:
libusb_context *_context;
+ task::sptr task_handler;
+
+ /*
+ * Task to handle libusb events. There should only be one thread per libusb_context handling events.
+ * Using more than one thread can result in excessive CPU usage in kernel space (presumably from locking/waiting).
+ * The libusb documentation says it is safe, which it is, but it neglects to state the cost in CPU usage.
+ * Just don't do it!
+ */
+ UHD_INLINE void libusb_event_handler_task(libusb_context *context)
+ {
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000;
+ libusb_handle_events_timeout(context, &tv);
+ }
};
libusb::session::sptr libusb::session::get_global_session(void){
@@ -274,6 +293,10 @@ public:
return libusb::device_descriptor::make(this->get_device())->get().idProduct;
}
+ bool firmware_loaded() {
+ return (get_manufacturer() == "Ettus Research LLC");
+ }
+
private:
libusb::device::sptr _dev; //always keep a reference to device
};
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 197e257da..2d18e1623 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -73,6 +73,15 @@ struct lut_result_t
int completed;
libusb_transfer_status status;
int actual_length;
+ boost::mutex mut;
+ boost::condition_variable usb_transfer_complete;
+};
+
+// Created to be used as an argument to boost::condition_variable::timed_wait() function
+struct lut_result_completed {
+ const lut_result_t& _result;
+ lut_result_completed(const lut_result_t& result):_result(result) {}
+ bool operator()() const {return (_result.completed ? true : false);}
};
/*!
@@ -84,48 +93,11 @@ struct lut_result_t
static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
{
lut_result_t *r = (lut_result_t *)lut->user_data;
- r->completed = 1;
+ boost::lock_guard<boost::mutex> lock(r->mut);
r->status = lut->status;
r->actual_length = lut->actual_length;
-}
-
-/*!
- * Wait for a managed buffer to become complete.
- *
- * This routine processes async events until the transaction completes.
- * We must call the libusb handle events in a loop because the handler
- * may complete managed buffers other than the one we are waiting on.
- *
- * We cannot determine if handle events timed out or processed an event.
- * Therefore, the timeout condition is handled by using boost system time.
- *
- * \param ctx the libusb context structure
- * \param timeout the wait timeout in seconds
- * \param completed a reference to the completed flag
- * \return true for completion, false for timeout
- */
-UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, int &completed)
-{
- //already completed by a previous call?
- if (completed) return true;
-
- //perform a non-blocking event handle
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
- libusb_handle_events_timeout_completed(ctx, &tv, &completed);
- if (completed) return true;
-
- //finish the rest with a timeout loop
- const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
- while (not completed and (boost::get_system_time() < timeout_time)){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 10000; /*10ms*/
- libusb_handle_events_timeout_completed(ctx, &tv, &completed);
- }
-
- return completed;
+ r->completed = 1;
+ r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below
}
/***********************************************************************
@@ -154,7 +126,7 @@ public:
template <typename buffer_type>
UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
{
- if (wait_for_completion(_ctx, timeout, result.completed))
+ if (wait_for_completion(timeout))
{
if (result.status != LIBUSB_TRANSFER_COMPLETED) throw uhd::runtime_error(str(boost::format(
"usb %s transfer status: %d") % _name % int(result.status)));
@@ -164,9 +136,31 @@ public:
return typename buffer_type::sptr();
}
+ // This is public because it is accessed from the libusb_zero_copy_single constructor
lut_result_t result;
+ /*!
+ * Wait for a managed buffer to become complete.
+ *
+ * \param timeout the wait timeout in seconds. A negative value will wait forever.
+ * \return true for completion, false for timeout
+ */
+ UHD_INLINE bool wait_for_completion(const double timeout)
+ {
+ boost::unique_lock<boost::mutex> lock(result.mut);
+ if (!result.completed) {
+ if (timeout < 0.0) {
+ result.usb_transfer_complete.wait(lock);
+ } else {
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+ result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result));
+ }
+ }
+ return result.completed;
+ }
+
private:
+
boost::function<void(libusb_zero_copy_mb *)> _release_cb;
const bool _is_recv;
const std::string _name;
@@ -252,8 +246,6 @@ public:
~libusb_zero_copy_single(void)
{
- libusb_context *ctx = libusb::session::get_global_session()->get_context();
-
//cancel all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
{
@@ -261,8 +253,10 @@ public:
}
//process all transfers until timeout occurs
- int completed = 0;
- wait_for_completion(ctx, 0.01, completed);
+ BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued)
+ {
+ mb->wait_for_completion(0.01);
+ }
//free all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
@@ -276,19 +270,18 @@ public:
{
typename buffer_type::sptr buff;
libusb_zero_copy_mb *front = NULL;
+ boost::mutex::scoped_lock lock(_mutex);
+ if (_enqueued.empty())
{
- boost::mutex::scoped_lock l(_mutex);
- if (_enqueued.empty())
- {
- _cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6)));
- }
- if (_enqueued.empty()) return buff;
- front = _enqueued.front();
+ _cond.timed_wait(lock, boost::posix_time::microseconds(long(timeout*1e6)));
}
+ if (_enqueued.empty()) return buff;
+ front = _enqueued.front();
+ lock.unlock();
buff = front->get_new<buffer_type>(timeout);
+ lock.lock();
- boost::mutex::scoped_lock l(_mutex);
if (buff) _enqueued.pop_front();
this->submit_what_we_can();
return buff;
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 688228e49..5080182d6 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -246,7 +246,8 @@ private:
struct xport_chan_props_type{
xport_chan_props_type(void):
packet_count(0),
- handle_overflow(&handle_overflow_nop)
+ handle_overflow(&handle_overflow_nop),
+ fc_update_window(0)
{}
get_buff_type get_buff;
issue_stream_cmd_type issue_stream_cmd;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 41f030ea6..ae483d1f3 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -239,7 +239,7 @@ private:
size_t _header_offset_words32;
double _tick_rate, _samp_rate;
struct xport_chan_props_type{
- xport_chan_props_type(void):has_sid(false){}
+ xport_chan_props_type(void):has_sid(false),sid(0){}
get_buff_type get_buff;
bool has_sid;
boost::uint32_t sid;
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
index 166177177..7b6a476f5 100644
--- a/host/lib/transport/udp_zero_copy.cpp
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -68,7 +68,7 @@ static void check_registry_for_fast_send_threshold(const size_t mtu){
class udp_zero_copy_asio_mrb : public managed_recv_buffer{
public:
udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
- _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size), _len(0) { /*NOP*/ }
void release(void){
_claimer.release();
@@ -87,6 +87,7 @@ public:
if (wait_for_recv_ready(_sock_fd, timeout)){
_len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0);
+ UHD_ASSERT_THROW(_len > 0); // TODO: Handle case of recv error
index++; //advances the caller's buffer
return make(this, _mem, size_t(_len));
}