//
// Copyright 2010-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
//
#include "libusb1_base.hpp"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#ifdef UHD_TXRX_DEBUG_PRINTS
#include
#include
#include
#endif
using namespace uhd;
using namespace uhd::transport;
static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
//! type for sharing the release queue with managed buffers
class libusb_zero_copy_mb;
typedef boost::shared_ptr > mb_queue_sptr;
/*!
* The libusb docs state that status and actual length can only be read in the callback.
* Therefore, this struct is intended to store data seen from the callback function.
*/
struct lut_result_t
{
lut_result_t(void)
{
completed = 0;
status = LIBUSB_TRANSFER_COMPLETED;
actual_length = 0;
#ifdef UHD_TXRX_DEBUG_PRINTS
start_time = 0;
buff_num = -1;
#endif
}
int completed;
libusb_transfer_status status;
int actual_length;
boost::mutex mut;
boost::condition_variable usb_transfer_complete;
#ifdef UHD_TXRX_DEBUG_PRINTS
// These are fore debugging
long start_time;
int buff_num;
bool is_recv;
#endif
};
// Created to be used as an argument to boost::condition_variable::timed_wait() function
struct lut_result_completed {
const lut_result_t& _result;
lut_result_completed(const lut_result_t& result):_result(result) {}
bool operator()() const {return (_result.completed ? true : false);}
};
#ifdef UHD_TXRX_DEBUG_PRINTS
static std::string dbg_prefix("libusb1_zero_copy,");
static void libusb1_zerocopy_dbg_print_err(std::string msg){
msg = dbg_prefix + msg;
fprintf(stderr, "%s\n", msg.c_str());
}
#endif
/*!
* All libusb callback functions should be marked with the LIBUSB_CALL macro
* to ensure that they are compiled with the same calling convention as libusb.
*/
//! helper function: handles all async callbacks
static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut)
{
lut_result_t *r = (lut_result_t *)lut->user_data;
boost::lock_guard lock(r->mut);
r->status = lut->status;
r->actual_length = lut->actual_length;
r->completed = 1;
r->usb_transfer_complete.notify_one(); // wake up thread waiting in wait_for_completion() member function below
#ifdef UHD_TXRX_DEBUG_PRINTS
long end_time = boost::get_system_time().time_of_day().total_microseconds();
libusb1_zerocopy_dbg_print_err( (boost::format("libusb_async_cb,%s,%i,%i,%i,%ld,%ld") % (r->is_recv ? "rx":"tx") % r->buff_num % r->actual_length % r->status % end_time % r->start_time).str() );
#endif
}
/***********************************************************************
* Reusable managed buffer:
* - Associated with a particular libusb transfer struct.
* - Submits the transfer to libusb in the release method.
**********************************************************************/
class libusb_zero_copy_mb : public managed_buffer
{
public:
libusb_zero_copy_mb(libusb_transfer *lut, const size_t frame_size, boost::function release_cb, const bool is_recv, const std::string &name):
_release_cb(release_cb), _is_recv(is_recv), _name(name),
_ctx(libusb::session::get_global_session()->get_context()),
_lut(lut), _frame_size(frame_size) { /* NOP */ }
void release(void){
_release_cb(this);
}
UHD_INLINE void submit(void)
{
_lut->length = (_is_recv)? _frame_size : size(); //always set length
#ifdef UHD_TXRX_DEBUG_PRINTS
result.start_time = boost::get_system_time().time_of_day().total_microseconds();
result.buff_num = num();
result.is_recv = _is_recv;
#endif
const int ret = libusb_submit_transfer(_lut);
if (ret != LIBUSB_SUCCESS)
throw uhd::usb_error(ret, str(boost::format(
"usb %s submit failed: %s") % _name % libusb_error_name(ret)));
}
template
UHD_INLINE typename buffer_type::sptr get_new(const double timeout)
{
if (wait_for_completion(timeout))
{
if (result.status != LIBUSB_TRANSFER_COMPLETED)
throw uhd::runtime_error(str(boost::format("usb %s transfer status: %d")
% _name % libusb_error_name(result.status)));
result.completed = 0;
return make(reinterpret_cast(this), _lut->buffer, (_is_recv)? result.actual_length : _frame_size);
}
return typename buffer_type::sptr();
}
// This is public because it is accessed from the libusb_zero_copy_single constructor
lut_result_t result;
/*!
* Wait for a managed buffer to become complete.
*
* \param timeout the wait timeout in seconds. A negative value will wait forever.
* \return true for completion, false for timeout
*/
UHD_INLINE bool wait_for_completion(const double timeout)
{
boost::unique_lock lock(result.mut);
if (!result.completed) {
if (timeout < 0.0) {
result.usb_transfer_complete.wait(lock);
} else {
const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
result.usb_transfer_complete.timed_wait(lock, timeout_time, lut_result_completed(result));
}
}
return (result.completed > 0);
}
private:
boost::function _release_cb;
const bool _is_recv;
const std::string _name;
libusb_context *_ctx;
libusb_transfer *_lut;
const size_t _frame_size;
};
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
class libusb_zero_copy_single
{
public:
libusb_zero_copy_single(
libusb::device_handle::sptr handle,
const size_t interface, const size_t endpoint,
const size_t num_frames, const size_t frame_size
):
_handle(handle),
_num_frames(num_frames),
_frame_size(frame_size),
_buffer_pool(buffer_pool::make(_num_frames, _frame_size)),
_enqueued(_num_frames), _released(_num_frames),
_status(STATUS_RUNNING)
{
const bool is_recv = (endpoint & 0x80) != 0;
const std::string name = str(boost::format("%s%d") % ((is_recv)? "rx" : "tx") % int(endpoint & 0x7f));
_handle->claim_interface(interface);
//flush the buffers out of the recv endpoint
//limit the flushing to at most one second
if (is_recv) for (size_t i = 0; i < 100; i++)
{
unsigned char buff[512];
int transfered = 0;
const int status = libusb_bulk_transfer(
_handle->get(), // dev_handle
endpoint, // endpoint
static_cast(buff),
sizeof(buff),
&transfered, //bytes xfered
10 //timeout ms
);
if (status == LIBUSB_ERROR_TIMEOUT) break;
}
//allocate libusb transfer structs and managed buffers
for (size_t i = 0; i < get_num_frames(); i++)
{
libusb_transfer *lut = libusb_alloc_transfer(0);
UHD_ASSERT_THROW(lut != NULL);
_mb_pool.push_back(boost::make_shared(
lut, this->get_frame_size(), boost::bind(&libusb_zero_copy_single::enqueue_buffer, this, _1), is_recv, name
));
libusb_fill_bulk_transfer(
lut, // transfer
_handle->get(), // dev_handle
endpoint, // endpoint
static_cast(_buffer_pool->at(i)), // buffer
this->get_frame_size(), // length
libusb_transfer_cb_fn(&libusb_async_cb), // callback
static_cast(&_mb_pool.back()->result), // user_data
0 // timeout (ms)
);
_all_luts.push_back(lut);
}
//initial release for all buffers
for (size_t i = 0; i < get_num_frames(); i++)
{
libusb_zero_copy_mb &mb = *(_mb_pool[i]);
if (is_recv) mb.release();
else
{
mb.result.completed = 1;
_enqueued.push_back(&mb);
}
}
}
~libusb_zero_copy_single(void)
{
//cancel all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
{
libusb_cancel_transfer(lut);
}
//process all transfers until timeout occurs
BOOST_FOREACH(libusb_zero_copy_mb *mb, _enqueued)
{
mb->wait_for_completion(0.01);
}
//free all transfers
BOOST_FOREACH(libusb_transfer *lut, _all_luts)
{
libusb_free_transfer(lut);
}
}
template
UHD_INLINE typename buffer_type::sptr get_buff(double timeout)
{
typename buffer_type::sptr buff;
if (_status == STATUS_ERROR)
return buff;
// Serialize access to buffers
boost::mutex::scoped_lock get_buff_lock(_get_buff_mutex);
boost::mutex::scoped_lock queue_lock(_queue_mutex);
if (_enqueued.empty())
{
_buff_ready_cond.timed_wait(queue_lock, boost::posix_time::microseconds(long(timeout*1e6)));
}
if (_enqueued.empty()) return buff;
libusb_zero_copy_mb *front = _enqueued.front();
queue_lock.unlock();
buff = front->get_new(timeout);
queue_lock.lock();
if (buff) _enqueued.pop_front();
this->submit_what_we_can();
return buff;
}
UHD_INLINE size_t get_num_frames(void) const { return _num_frames; }
UHD_INLINE size_t get_frame_size(void) const { return _frame_size; }
private:
libusb::device_handle::sptr _handle;
const size_t _num_frames, _frame_size;
//! Storage for transfer related objects
buffer_pool::sptr _buffer_pool;
std::vector > _mb_pool;
boost::mutex _queue_mutex;
boost::condition_variable _buff_ready_cond;
boost::mutex _get_buff_mutex;
//! why 2 queues? there is room in the future to have > N buffers but only N in flight
boost::circular_buffer _enqueued, _released;
enum {STATUS_RUNNING, STATUS_ERROR} _status;
void enqueue_buffer(libusb_zero_copy_mb *mb)
{
boost::mutex::scoped_lock l(_queue_mutex);
_released.push_back(mb);
this->submit_what_we_can();
_buff_ready_cond.notify_one();
}
void submit_what_we_can(void)
{
if (_status == STATUS_ERROR)
return;
while (not _released.empty() and not _enqueued.full())
{
try {
_released.front()->submit();
_enqueued.push_back(_released.front());
_released.pop_front();
}
catch (uhd::usb_error& e)
{
_status = STATUS_ERROR;
throw e;
}
}
}
//! a list of all transfer structs we allocated
std::list _all_luts;
};
/***********************************************************************
* USB zero_copy device class
**********************************************************************/
struct libusb_zero_copy_impl : usb_zero_copy
{
libusb_zero_copy_impl(
libusb::device_handle::sptr handle,
const size_t recv_interface,
const size_t recv_endpoint,
const size_t send_interface,
const size_t send_endpoint,
const device_addr_t &hints
){
_recv_impl.reset(new libusb_zero_copy_single(
handle, recv_interface, (recv_endpoint & 0x7f) | 0x80,
size_t(hints.cast("num_recv_frames", DEFAULT_NUM_XFERS)),
size_t(hints.cast("recv_frame_size", DEFAULT_XFER_SIZE))));
_send_impl.reset(new libusb_zero_copy_single(
handle, send_interface, (send_endpoint & 0x7f) | 0x00,
size_t(hints.cast("num_send_frames", DEFAULT_NUM_XFERS)),
size_t(hints.cast("send_frame_size", DEFAULT_XFER_SIZE))));
}
managed_recv_buffer::sptr get_recv_buff(double timeout)
{
boost::mutex::scoped_lock l(_recv_mutex);
return _recv_impl->get_buff(timeout);
}
managed_send_buffer::sptr get_send_buff(double timeout)
{
boost::mutex::scoped_lock l(_send_mutex);
return _send_impl->get_buff(timeout);
}
size_t get_num_recv_frames(void) const { return _recv_impl->get_num_frames(); }
size_t get_num_send_frames(void) const { return _send_impl->get_num_frames(); }
size_t get_recv_frame_size(void) const { return _recv_impl->get_frame_size(); }
size_t get_send_frame_size(void) const { return _send_impl->get_frame_size(); }
boost::shared_ptr _recv_impl, _send_impl;
boost::mutex _recv_mutex, _send_mutex;
};
/***********************************************************************
* USB zero_copy make functions
**********************************************************************/
usb_zero_copy::sptr usb_zero_copy::make(
usb_device_handle::sptr handle,
const size_t recv_interface,
const size_t recv_endpoint,
const size_t send_interface,
const size_t send_endpoint,
const device_addr_t &hints
){
libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
boost::static_pointer_cast(handle)->get_device()
));
return sptr(new libusb_zero_copy_impl(
dev_handle, recv_interface, recv_endpoint, send_interface, send_endpoint, hints
));
}