diff options
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 81 | ||||
-rw-r--r-- | host/lib/transport/buffer_pool.cpp | 80 | ||||
-rwxr-xr-x | host/lib/transport/gen_vrt_if_packet.py | 242 | ||||
-rw-r--r-- | host/lib/transport/if_addrs.cpp | 109 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.cpp | 265 | ||||
-rw-r--r-- | host/lib/transport/libusb1_base.hpp | 149 | ||||
-rw-r--r-- | host/lib/transport/libusb1_control.cpp | 64 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 444 | ||||
-rw-r--r-- | host/lib/transport/udp_simple.cpp | 173 | ||||
-rw-r--r-- | host/lib/transport/udp_zero_copy_asio.cpp | 386 | ||||
-rw-r--r-- | host/lib/transport/usb_dummy_impl.cpp | 39 | ||||
-rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 466 | ||||
-rw-r--r-- | host/lib/transport/zero_copy.cpp | 108 |
13 files changed, 2606 insertions, 0 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt new file mode 100644 index 000000000..8765c6703 --- /dev/null +++ b/host/lib/transport/CMakeLists.txt @@ -0,0 +1,81 @@ +# +# Copyright 2010-2011 Ettus Research LLC +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +######################################################################## +# This file included, use CMake directory variables +######################################################################## + +######################################################################## +# Setup libusb +######################################################################## +MESSAGE(STATUS "") +FIND_PACKAGE(USB1) + +LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF) + +IF(ENABLE_USB) + MESSAGE(STATUS "USB support enabled via libusb.") + INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIR}) + LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES}) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_control.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_zero_copy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.hpp + ) +ELSE(ENABLE_USB) + LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/usb_dummy_impl.cpp + ) +ENDIF(ENABLE_USB) + +######################################################################## +# Setup defines for interface address discovery +######################################################################## +MESSAGE(STATUS "") +MESSAGE(STATUS "Configuring interface address discovery...") + +INCLUDE(CheckIncludeFileCXX) +CHECK_INCLUDE_FILE_CXX(ifaddrs.h HAVE_IFADDRS_H) +CHECK_INCLUDE_FILE_CXX(winsock2.h HAVE_WINSOCK2_H) + +IF(HAVE_IFADDRS_H) + MESSAGE(STATUS " Interface address discovery supported through getifaddrs.") + ADD_DEFINITIONS(-DHAVE_IFADDRS_H) +ELSEIF(HAVE_WINSOCK2_H) + MESSAGE(STATUS " Interface address discovery supported through SIO_GET_INTERFACE_LIST.") + ADD_DEFINITIONS(-DHAVE_WINSOCK2_H) +ELSE(HAVE_IFADDRS_H) + MESSAGE(STATUS " Interface address discovery not supported.") +ENDIF(HAVE_IFADDRS_H) + +######################################################################## +# Append to the list of sources for lib uhd +######################################################################## +LIBUHD_PYTHON_GEN_SOURCE( + ${CMAKE_CURRENT_SOURCE_DIR}/gen_vrt_if_packet.py + ${CMAKE_CURRENT_BINARY_DIR}/vrt_if_packet.cpp +) + +LIBUHD_APPEND_SOURCES( + ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy_asio.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/vrt_packet_handler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy.cpp +) diff --git a/host/lib/transport/buffer_pool.cpp b/host/lib/transport/buffer_pool.cpp new file mode 100644 index 000000000..971bbb75a --- /dev/null +++ b/host/lib/transport/buffer_pool.cpp @@ -0,0 +1,80 @@ +// +// Copyright 2011-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/buffer_pool.hpp> +#include <boost/shared_array.hpp> +#include <vector> + +using namespace uhd::transport; + +//! pad the byte count to a multiple of alignment +static size_t pad_to_boundary(const size_t bytes, const size_t alignment){ + return bytes + (alignment - bytes)%alignment; +} + +/*********************************************************************** + * Buffer pool implementation + **********************************************************************/ +class buffer_pool_impl : public buffer_pool{ +public: + buffer_pool_impl( + const std::vector<ptr_type> &ptrs, + boost::shared_array<char> mem + ): _ptrs(ptrs), _mem(mem){ + /* NOP */ + } + + ptr_type at(const size_t index) const{ + return _ptrs.at(index); + } + + size_t size(void) const{ + return _ptrs.size(); + } + +private: + std::vector<ptr_type> _ptrs; + boost::shared_array<char> _mem; +}; + +/*********************************************************************** + * Buffer pool factor function + **********************************************************************/ +buffer_pool::sptr buffer_pool::make( + const size_t num_buffs, + const size_t buff_size, + const size_t alignment +){ + //1) pad the buffer size to be a multiple of alignment + //2) pad the overall memory size for room after alignment + //3) allocate the memory in one block of sufficient size + const size_t padded_buff_size = pad_to_boundary(buff_size, alignment); + boost::shared_array<char> mem(new char[padded_buff_size*num_buffs + alignment-1]); + + //Fill a vector with boundary-aligned points in the memory + const size_t mem_start = pad_to_boundary(size_t(mem.get()), alignment); + std::vector<ptr_type> ptrs(num_buffs); + for (size_t i = 0; i < num_buffs; i++){ + ptrs[i] = ptr_type(mem_start + padded_buff_size*i); + } + + //Create a new buffer pool implementation with: + // - the pre-computed pointers, and + // - the reference to allocated memory. + return sptr(new buffer_pool_impl(ptrs, mem)); +} + diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py new file mode 100755 index 000000000..dbe026ba3 --- /dev/null +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -0,0 +1,242 @@ +#!/usr/bin/env python +# +# Copyright 2010 Ettus Research LLC +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +""" +The vrt packer/unpacker code generator: + +This script will generate the pack and unpack routines that convert +metatdata into vrt headers and vrt headers into metadata. + +The generated code infers jump tables to speed-up the parsing time. +""" + +TMPL_TEXT = """ +#import time +/*********************************************************************** + * This file was generated by $file on $time.strftime("%c") + **********************************************************************/ + +\#include <uhd/transport/vrt_if_packet.hpp> +\#include <uhd/utils/byteswap.hpp> +\#include <boost/detail/endian.hpp> +\#include <stdexcept> + +//define the endian macros to convert integers +\#ifdef BOOST_BIG_ENDIAN + \#define BE_MACRO(x) (x) + \#define LE_MACRO(x) uhd::byteswap(x) +\#else + \#define BE_MACRO(x) uhd::byteswap(x) + \#define LE_MACRO(x) (x) +\#endif + +using namespace uhd; +using namespace uhd::transport; + +######################################################################## +#def gen_code($XE_MACRO, $suffix) +######################################################################## + +######################################################################## +## setup predicates +######################################################################## +#set $sid_p = 0b00001 +#set $cid_p = 0b00010 +#set $tsi_p = 0b00100 +#set $tsf_p = 0b01000 +#set $tlr_p = 0b10000 + +void vrt::if_hdr_pack_$(suffix)( + boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + boost::uint32_t vrt_hdr_flags = 0; + + boost::uint8_t pred = 0; + if (if_packet_info.has_sid) pred |= $hex($sid_p); + if (if_packet_info.has_cid) pred |= $hex($cid_p); + if (if_packet_info.has_tsi) pred |= $hex($tsi_p); + if (if_packet_info.has_tsf) pred |= $hex($tsf_p); + if (if_packet_info.has_tlr) pred |= $hex($tlr_p); + + switch(pred){ + #for $pred in range(2**5) + case $pred: + #set $num_header_words = 1 + #set $flags = 0 + ########## Stream ID ########## + #if $pred & $sid_p + packet_buff[$num_header_words] = $(XE_MACRO)(if_packet_info.sid); + #set $num_header_words += 1 + #set $flags |= (0x1 << 28); + #end if + ########## Class ID ########## + #if $pred & $cid_p + packet_buff[$num_header_words] = 0; //not implemented + #set $num_header_words += 1 + packet_buff[$num_header_words] = 0; //not implemented + #set $num_header_words += 1 + #set $flags |= (0x1 << 27); + #end if + ########## Integer Time ########## + #if $pred & $tsi_p + packet_buff[$num_header_words] = $(XE_MACRO)(if_packet_info.tsi); + #set $num_header_words += 1 + #set $flags |= (0x3 << 22); + #end if + ########## Fractional Time ########## + #if $pred & $tsf_p + packet_buff[$num_header_words] = $(XE_MACRO)(boost::uint32_t(if_packet_info.tsf >> 32)); + #set $num_header_words += 1 + packet_buff[$num_header_words] = $(XE_MACRO)(boost::uint32_t(if_packet_info.tsf >> 0)); + #set $num_header_words += 1 + #set $flags |= (0x1 << 20); + #end if + ########## Trailer ########## + #if $pred & $tlr_p + //packet_buff[$num_header_words+if_packet_info.num_payload_words32] = $(XE_MACRO)(if_packet_info.tlr); + #set $flags |= (0x1 << 26); + #set $num_trailer_words = 1; + #else + #set $num_trailer_words = 0; + #end if + ########## Variables ########## + if_packet_info.num_header_words32 = $num_header_words; + if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32; + vrt_hdr_flags = $hex($flags); + break; + #end for + } + + //set the burst flags + if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25); + if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24); + + //fill in complete header word + packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 + | vrt_hdr_flags + | ((if_packet_info.packet_count & 0xf) << 16) + | (if_packet_info.num_packet_words32 & 0xffff) + )); +} + +void vrt::if_hdr_unpack_$(suffix)( + const boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + //extract vrt header + boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]); + /* + size_t packet_words32 = vrt_hdr_word & 0xffff; + + //failure case + if (if_packet_info.num_packet_words32 < packet_words32) + throw std::runtime_error("bad vrt header or packet fragment"); + */ + //Fix for short packets sent from the fpga: + // Use the num_packet_words32 passed in as input, + // and do not use the header bits which could be wrong. + size_t packet_words32 = if_packet_info.num_packet_words32; + + //extract fields from the header + if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); + if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; + //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented + //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented + + boost::uint8_t pred = 0; + if(vrt_hdr_word & $hex(0x1 << 28)) pred |= $hex($sid_p); + if(vrt_hdr_word & $hex(0x1 << 27)) pred |= $hex($cid_p); + if(vrt_hdr_word & $hex(0x3 << 22)) pred |= $hex($tsi_p); + if(vrt_hdr_word & $hex(0x3 << 20)) pred |= $hex($tsf_p); + if(vrt_hdr_word & $hex(0x1 << 26)) pred |= $hex($tlr_p); + + switch(pred){ + #for $pred in range(2**5) + case $pred: + #set $has_time_spec = False + #set $num_header_words = 1 + ########## Stream ID ########## + #if $pred & $sid_p + if_packet_info.has_sid = true; + if_packet_info.sid = $(XE_MACRO)(packet_buff[$num_header_words]); + #set $num_header_words += 1 + #else + if_packet_info.has_sid = false; + #end if + ########## Class ID ########## + #if $pred & $cid_p + if_packet_info.has_cid = true; + if_packet_info.cid = 0; //not implemented + #set $num_header_words += 2 + #else + if_packet_info.has_cid = false; + #end if + ########## Integer Time ########## + #if $pred & $tsi_p + if_packet_info.has_tsi = true; + if_packet_info.tsi = $(XE_MACRO)(packet_buff[$num_header_words]); + #set $num_header_words += 1 + #else + if_packet_info.has_tsi = false; + #end if + ########## Fractional Time ########## + #if $pred & $tsf_p + if_packet_info.has_tsf = true; + if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32; + #set $num_header_words += 1 + if_packet_info.tsf |= boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 0; + #set $num_header_words += 1 + #else + if_packet_info.has_tsf = false; + #end if + ########## Trailer ########## + #if $pred & $tlr_p + if_packet_info.has_tlr = true; + if_packet_info.tlr = $(XE_MACRO)(packet_buff[packet_words32-1]); + #set $num_trailer_words = 1; + #else + if_packet_info.has_tlr = false; + #set $num_trailer_words = 0; + #end if + ########## Variables ########## + //another failure case + if (packet_words32 < $($num_header_words + $num_trailer_words)) + throw std::runtime_error("bad vrt header or invalid packet length"); + if_packet_info.num_header_words32 = $num_header_words; + if_packet_info.num_payload_words32 = packet_words32 - $($num_header_words + $num_trailer_words); + break; + #end for + } +} + +######################################################################## +#end def +######################################################################## + +$gen_code("BE_MACRO", "be") +$gen_code("LE_MACRO", "le") +""" + +def parse_tmpl(_tmpl_text, **kwargs): + from Cheetah.Template import Template + return str(Template(_tmpl_text, kwargs)) + +if __name__ == '__main__': + import sys + open(sys.argv[1], 'w').write(parse_tmpl(TMPL_TEXT, file=__file__)) diff --git a/host/lib/transport/if_addrs.cpp b/host/lib/transport/if_addrs.cpp new file mode 100644 index 000000000..ad9a2325b --- /dev/null +++ b/host/lib/transport/if_addrs.cpp @@ -0,0 +1,109 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/if_addrs.hpp> +#include <boost/asio/ip/address_v4.hpp> +#include <boost/cstdint.hpp> +#include <iostream> + +uhd::transport::if_addrs_t::if_addrs_t(void){ + /* NOP */ +} + +/*********************************************************************** + * Interface address discovery through ifaddrs api + **********************************************************************/ +#if defined(HAVE_IFADDRS_H) +#include <ifaddrs.h> + +static boost::asio::ip::address_v4 sockaddr_to_ip_addr(sockaddr *addr){ + return boost::asio::ip::address_v4(ntohl( + reinterpret_cast<sockaddr_in*>(addr)->sin_addr.s_addr + )); +} + +std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ + std::vector<if_addrs_t> if_addrs; + struct ifaddrs *ifap; + if (getifaddrs(&ifap) == 0){ + for (struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next){ + //ensure that the entries are valid + if (iter->ifa_addr->sa_family != AF_INET) continue; + if (iter->ifa_netmask->sa_family != AF_INET) continue; + if (iter->ifa_broadaddr->sa_family != AF_INET) continue; + + //append a new set of interface addresses + if_addrs_t if_addr; + if_addr.inet = sockaddr_to_ip_addr(iter->ifa_addr).to_string(); + if_addr.mask = sockaddr_to_ip_addr(iter->ifa_netmask).to_string(); + if_addr.bcast = sockaddr_to_ip_addr(iter->ifa_broadaddr).to_string(); + if_addrs.push_back(if_addr); + } + freeifaddrs(ifap); + } + return if_addrs; +} + +/*********************************************************************** + * Interface address discovery through windows api + **********************************************************************/ +#elif defined(HAVE_WINSOCK2_H) +#include <winsock2.h> + +std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ + std::vector<if_addrs_t> if_addrs; + SOCKET sd = WSASocket(AF_INET, SOCK_DGRAM, 0, 0, 0, 0); + if (sd == SOCKET_ERROR) { + std::cerr << "Failed to get a socket. Error " << WSAGetLastError() << + std::endl; return if_addrs; + } + + INTERFACE_INFO InterfaceList[20]; + unsigned long nBytesReturned; + if (WSAIoctl(sd, SIO_GET_INTERFACE_LIST, 0, 0, &InterfaceList, + sizeof(InterfaceList), &nBytesReturned, 0, 0) == SOCKET_ERROR) { + std::cerr << "Failed calling WSAIoctl: error " << WSAGetLastError() << + std::endl; + return if_addrs; + } + + int nNumInterfaces = nBytesReturned / sizeof(INTERFACE_INFO); + for (int i = 0; i < nNumInterfaces; ++i) { + boost::uint32_t iiAddress = ntohl(reinterpret_cast<sockaddr_in&>(InterfaceList[i].iiAddress).sin_addr.s_addr); + boost::uint32_t iiNetmask = ntohl(reinterpret_cast<sockaddr_in&>(InterfaceList[i].iiNetmask).sin_addr.s_addr); + boost::uint32_t iiBroadcastAddress = (iiAddress & iiNetmask) | ~iiNetmask; + + if_addrs_t if_addr; + if_addr.inet = boost::asio::ip::address_v4(iiAddress).to_string(); + if_addr.mask = boost::asio::ip::address_v4(iiNetmask).to_string(); + if_addr.bcast = boost::asio::ip::address_v4(iiBroadcastAddress).to_string(); + if_addrs.push_back(if_addr); + } + + return if_addrs; +} + +/*********************************************************************** + * Interface address discovery not included + **********************************************************************/ +#else /* HAVE_IFADDRS_H */ + +std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){ + return std::vector<if_addrs_t>(); +} + +#endif /* HAVE_IFADDRS_H */ diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp new file mode 100644 index 000000000..cfa77d9ca --- /dev/null +++ b/host/lib/transport/libusb1_base.cpp @@ -0,0 +1,265 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include "libusb1_base.hpp" +#include <uhd/utils/assert.hpp> +#include <uhd/types/dict.hpp> +#include <boost/weak_ptr.hpp> +#include <boost/foreach.hpp> +#include <iostream> + +using namespace uhd; +using namespace uhd::transport; + +/*********************************************************************** + * libusb session + **********************************************************************/ +class libusb_session_impl : public libusb::session{ +public: + libusb_session_impl(void){ + UHD_ASSERT_THROW(libusb_init(&_context) == 0); + libusb_set_debug(_context, debug_level); + } + + ~libusb_session_impl(void){ + libusb_exit(_context); + } + + libusb_context *get_context(void) const{ + return _context; + } + +private: + libusb_context *_context; +}; + +libusb::session::sptr libusb::session::get_global_session(void){ + static boost::weak_ptr<session> global_session; + + //not expired -> get existing session + if (not global_session.expired()) return global_session.lock(); + + //create a new global session + sptr new_global_session(new libusb_session_impl()); + global_session = new_global_session; + return new_global_session; +} + +/*********************************************************************** + * libusb device + **********************************************************************/ +class libusb_device_impl : public libusb::device{ +public: + libusb_device_impl(libusb_device *dev){ + _session = libusb::session::get_global_session(); + _dev = dev; + } + + ~libusb_device_impl(void){ + libusb_unref_device(this->get()); + } + + libusb_device *get(void) const{ + return _dev; + } + +private: + libusb::session::sptr _session; //always keep a reference to session + libusb_device *_dev; +}; + +/*********************************************************************** + * libusb device list + **********************************************************************/ +class libusb_device_list_impl : public libusb::device_list{ +public: + libusb_device_list_impl(void){ + libusb::session::sptr sess = libusb::session::get_global_session(); + + //allocate a new list of devices + libusb_device** dev_list; + ssize_t ret = libusb_get_device_list(sess->get_context(), &dev_list); + if (ret < 0) throw std::runtime_error("cannot enumerate usb devices"); + + //fill the vector of device references + for (size_t i = 0; i < size_t(ret); i++) _devs.push_back( + libusb::device::sptr(new libusb_device_impl(dev_list[i])) + ); + + //free the device list but dont unref (done in ~device) + libusb_free_device_list(dev_list, false/*dont unref*/); + } + + size_t size(void) const{ + return _devs.size(); + } + + libusb::device::sptr at(size_t i) const{ + return _devs.at(i); + } + +private: + std::vector<libusb::device::sptr> _devs; +}; + +libusb::device_list::sptr libusb::device_list::make(void){ + return sptr(new libusb_device_list_impl()); +} + +/*********************************************************************** + * libusb device descriptor + **********************************************************************/ +class libusb_device_descriptor_impl : public libusb::device_descriptor{ +public: + libusb_device_descriptor_impl(libusb::device::sptr dev){ + _dev = dev; + UHD_ASSERT_THROW(libusb_get_device_descriptor(_dev->get(), &_desc) == 0); + } + + const libusb_device_descriptor &get(void) const{ + return _desc; + } + + std::string get_ascii_serial(void) const{ + if (this->get().iSerialNumber == 0) return ""; + + libusb::device_handle::sptr handle( + libusb::device_handle::get_cached_handle(_dev) + ); + + unsigned char buff[512]; + ssize_t ret = libusb_get_string_descriptor_ascii( + handle->get(), this->get().iSerialNumber, buff, sizeof(buff) + ); + if (ret < 0) return ""; //on error, just return empty string + + return std::string((char *)buff, ret); + } + +private: + libusb::device::sptr _dev; //always keep a reference to device + libusb_device_descriptor _desc; +}; + +libusb::device_descriptor::sptr libusb::device_descriptor::make(device::sptr dev){ + return sptr(new libusb_device_descriptor_impl(dev)); +} + +/*********************************************************************** + * libusb device handle + **********************************************************************/ +class libusb_device_handle_impl : public libusb::device_handle{ +public: + libusb_device_handle_impl(libusb::device::sptr dev){ + _dev = dev; + UHD_ASSERT_THROW(libusb_open(_dev->get(), &_handle) == 0); + } + + ~libusb_device_handle_impl(void){ + //release all claimed interfaces + for (size_t i = 0; i < _claimed.size(); i++){ + libusb_release_interface(this->get(), _claimed[i]); + } + libusb_close(_handle); + } + + libusb_device_handle *get(void) const{ + return _handle; + } + + void claim_interface(int interface){ + UHD_ASSERT_THROW(libusb_claim_interface(this->get(), interface) == 0); + _claimed.push_back(interface); + } + +private: + libusb::device::sptr _dev; //always keep a reference to device + libusb_device_handle *_handle; + std::vector<int> _claimed; +}; + +libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){ + static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles; + + //not expired -> get existing handle + if (handles.has_key(dev->get()) and not handles[dev->get()].expired()){ + return handles[dev->get()].lock(); + } + + //create a new cached handle + try{ + sptr new_handle(new libusb_device_handle_impl(dev)); + handles[dev->get()] = new_handle; + return new_handle; + } + catch(const std::exception &e){ + std::cerr << "USB open failed: see the application notes for your device." << std::endl; + throw std::runtime_error(e.what()); + } +} + +/*********************************************************************** + * libusb special handle + **********************************************************************/ +class libusb_special_handle_impl : public libusb::special_handle{ +public: + libusb_special_handle_impl(libusb::device::sptr dev){ + _dev = dev; + } + + libusb::device::sptr get_device(void) const{ + return _dev; + } + + std::string get_serial(void) const{ + return libusb::device_descriptor::make(this->get_device())->get_ascii_serial(); + } + + boost::uint16_t get_vendor_id(void) const{ + return libusb::device_descriptor::make(this->get_device())->get().idVendor; + } + + boost::uint16_t get_product_id(void) const{ + return libusb::device_descriptor::make(this->get_device())->get().idProduct; + } + +private: + libusb::device::sptr _dev; //always keep a reference to device +}; + +libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){ + return sptr(new libusb_special_handle_impl(dev)); +} + +/*********************************************************************** + * list device handles implementations + **********************************************************************/ +std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list( + boost::uint16_t vid, boost::uint16_t pid +){ + std::vector<usb_device_handle::sptr> handles; + + libusb::device_list::sptr dev_list = libusb::device_list::make(); + for (size_t i = 0; i < dev_list->size(); i++){ + usb_device_handle::sptr handle = libusb::special_handle::make(dev_list->at(i)); + if (handle->get_vendor_id() == vid and handle->get_product_id() == pid){ + handles.push_back(handle); + } + } + + return handles; +} diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp new file mode 100644 index 000000000..04c1d6574 --- /dev/null +++ b/host/lib/transport/libusb1_base.hpp @@ -0,0 +1,149 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP +#define INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP + +#include <uhd/config.hpp> +#include <boost/utility.hpp> +#include <boost/shared_ptr.hpp> +#include <uhd/transport/usb_device_handle.hpp> +#include <libusb.h> + +/*********************************************************************** + * Libusb object oriented smart pointer wrappers: + * The following wrappers provide allocation and automatic deallocation + * for various libusb data types and handles. The construction routines + * also store tables of already allocated structures to avoid multiple + * occurrences of opened handles (for example). + **********************************************************************/ +namespace uhd { namespace transport { + +namespace libusb { + + /*! + * This session class holds a global libusb context for this process. + * The get global session call will create a new context if none exists. + * When all references to session are destroyed, the context will be freed. + */ + class session : boost::noncopyable { + public: + typedef boost::shared_ptr<session> sptr; + + /*! + * Level 0: no messages ever printed by the library (default) + * Level 1: error messages are printed to stderr + * Level 2: warning and error messages are printed to stderr + * Level 3: informational messages are printed to stdout, warning + * and error messages are printed to stderr + */ + static const int debug_level = 0; + + //! get a shared pointer to the global session + static sptr get_global_session(void); + + //! get the underlying libusb context pointer + virtual libusb_context *get_context(void) const = 0; + }; + + /*! + * Holds a device pointer with a reference to the session. + */ + class device : boost::noncopyable { + public: + typedef boost::shared_ptr<device> sptr; + + //! get the underlying device pointer + virtual libusb_device *get(void) const = 0; + }; + + /*! + * This device list class holds a device list that will be + * automatically freed when the last reference is destroyed. + */ + class device_list : boost::noncopyable { + public: + typedef boost::shared_ptr<device_list> sptr; + + //! make a new device list + static sptr make(void); + + //! the number of devices in this list + virtual size_t size() const = 0; + + //! get the device pointer at a particular index + virtual device::sptr at(size_t index) const = 0; + }; + + /*! + * Holds a device descriptor and a reference to the device. + */ + class device_descriptor : boost::noncopyable { + public: + typedef boost::shared_ptr<device_descriptor> sptr; + + //! make a new descriptor from a device reference + static sptr make(device::sptr); + + //! get the underlying device descriptor + virtual const libusb_device_descriptor &get(void) const = 0; + + virtual std::string get_ascii_serial(void) const = 0; + }; + + /*! + * Holds a device handle and a reference to the device. + */ + class device_handle : boost::noncopyable { + public: + typedef boost::shared_ptr<device_handle> sptr; + + //! get a cached handle or make a new one given the device + static sptr get_cached_handle(device::sptr); + + //! get the underlying device handle + virtual libusb_device_handle *get(void) const = 0; + + /*! + * Open USB interfaces for control using magic value + * IN interface: 2 + * OUT interface: 1 + * Control interface: 0 + */ + virtual void claim_interface(int) = 0; + }; + + /*! + * The special handle is our internal implementation of the + * usb device handle which is used publicly to identify a device. + */ + class special_handle : public usb_device_handle { + public: + typedef boost::shared_ptr<special_handle> sptr; + + //! make a new special handle from device + static sptr make(device::sptr); + + //! get the underlying device reference + virtual device::sptr get_device(void) const = 0; + }; + +} + +}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP */ diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp new file mode 100644 index 000000000..f903907d0 --- /dev/null +++ b/host/lib/transport/libusb1_control.cpp @@ -0,0 +1,64 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include "libusb1_base.hpp" +#include <uhd/transport/usb_control.hpp> + +using namespace uhd::transport; + +const int libusb_timeout = 0; + +/*********************************************************************** + * libusb-1.0 implementation of USB control transport + **********************************************************************/ +class libusb_control_impl : public usb_control { +public: + libusb_control_impl(libusb::device_handle::sptr handle): + _handle(handle) + { + _handle->claim_interface(0 /* control interface */); + } + + ssize_t submit(boost::uint8_t request_type, + boost::uint8_t request, + boost::uint16_t value, + boost::uint16_t index, + unsigned char *buff, + boost::uint16_t length + ){ + return libusb_control_transfer(_handle->get(), + request_type, + request, + value, + index, + buff, + length, + libusb_timeout); + } + +private: + libusb::device_handle::sptr _handle; +}; + +/*********************************************************************** + * USB control public make functions + **********************************************************************/ +usb_control::sptr usb_control::make(usb_device_handle::sptr handle){ + return sptr(new libusb_control_impl(libusb::device_handle::get_cached_handle( + boost::static_pointer_cast<libusb::special_handle>(handle)->get_device() + ))); +} diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp new file mode 100644 index 000000000..311a8953b --- /dev/null +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -0,0 +1,444 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include "libusb1_base.hpp" +#include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/thread_priority.hpp> +#include <uhd/utils/assert.hpp> +#include <boost/foreach.hpp> +#include <boost/thread.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <vector> +#include <iostream> + +using namespace uhd; +using namespace uhd::transport; + +static const double CLEANUP_TIMEOUT = 0.2; //seconds +static const size_t DEFAULT_NUM_XFERS = 16; //num xfers +static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes + +/*********************************************************************** + * Helper functions + ***********************************************************************/ +/* + * Print the values of a libusb_transfer struct + * http://libusb.sourceforge.net/api-1.0/structlibusb__transfer.html + */ +void pp_transfer(libusb_transfer *lut) +{ + std::cout << "Libusb transfer" << std::endl; + std::cout << " flags: 0x" << std::hex << (unsigned int) lut->flags << std::endl; + std::cout << " endpoint: 0x" << std::hex << (unsigned int) lut->endpoint << std::endl; + std::cout << " type: 0x" << std::hex << (unsigned int) lut->type << std::endl; + std::cout << " timeout: " << std::dec << lut->timeout << std::endl; + std::cout << " status: 0x" << std::hex << lut->status << std::endl; + std::cout << " length: " << std::dec << lut->length << std::endl; + std::cout << " actual_length: " << std::dec << lut->actual_length << std::endl; +} + +/*********************************************************************** + * USB asynchronous zero_copy endpoint + * This endpoint implementation provides asynchronous I/O to libusb-1.0 + * devices. Each endpoint is directional and two can be combined to + * create a bidirectional interface. It is a zero copy implementation + * with respect to libusb, however, each send and recv requires a copy + * operation from kernel to userspace; this is due to the usbfs + * interface provided by the kernel. + **********************************************************************/ +class usb_endpoint { +public: + typedef boost::shared_ptr<usb_endpoint> sptr; + + usb_endpoint( + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers + ); + + ~usb_endpoint(void); + + // Exposed interface for submitting / retrieving transfer buffers + + //! Submit a new transfer that was presumably just filled or emptied. + void submit(libusb_transfer *lut); + + /*! + * Get an available transfer: + * For inputs, this is a just filled transfer. + * For outputs, this is a just emptied transfer. + * \param timeout the timeout to wait for a lut + * \return the transfer pointer or NULL if timeout + */ + libusb_transfer *get_lut_with_wait(double timeout); + + //Callback use only + void callback_handle_transfer(libusb_transfer *lut); + +private: + libusb::device_handle::sptr _handle; + int _endpoint; + bool _input; + + //! hold a bounded buffer of completed transfers + typedef bounded_buffer<libusb_transfer *> lut_buff_type; + lut_buff_type::sptr _completed_list; + + //! a list of all transfer structs we allocated + std::vector<libusb_transfer *> _all_luts; + + //! memory allocated for the transfer buffers + buffer_pool::sptr _buffer_pool; + + // Calls for processing asynchronous I/O + libusb_transfer *allocate_transfer(void *mem, size_t len); + void print_transfer_status(libusb_transfer *lut); +}; + + +/* + * Callback function called when submitted transfers complete. + * The endpoint upon which the transfer is part of is recovered + * and the transfer moved from pending to completed state. + * Callbacks occur during the reaping calls where libusb_handle_events() + * is used. The callback only modifies the transfer state by moving + * it from the pending to completed status list. + * \param lut pointer to libusb_transfer + */ +static void callback(libusb_transfer *lut){ + usb_endpoint *endpoint = (usb_endpoint *) lut->user_data; + endpoint->callback_handle_transfer(lut); +} + + +/* + * Accessor call to allow list access from callback space + * \param pointer to libusb_transfer + */ +void usb_endpoint::callback_handle_transfer(libusb_transfer *lut){ + _completed_list->push_with_wait(lut); +} + + +/* + * Constructor + * Allocate libusb transfers and mark as free. For IN endpoints, + * submit the transfers so that they're ready to return when + * data is available. + */ +usb_endpoint::usb_endpoint( + libusb::device_handle::sptr handle, + int endpoint, + bool input, + size_t transfer_size, + size_t num_transfers +): + _handle(handle), + _endpoint(endpoint), + _input(input) +{ + _completed_list = lut_buff_type::make(num_transfers); + _buffer_pool = buffer_pool::make(num_transfers, transfer_size); + for (size_t i = 0; i < num_transfers; i++){ + _all_luts.push_back(allocate_transfer(_buffer_pool->at(i), transfer_size)); + + //input luts are immediately submitted to be filled + //output luts go into the completed list as free buffers + if (_input) this->submit(_all_luts.back()); + else _completed_list->push_with_wait(_all_luts.back()); + } +} + + +/* + * Destructor + * Make sure all the memory is freed. Cancel any pending transfers. + * When all completed transfers are moved to the free list, release + * the transfers. Libusb will deallocate the data buffer held by + * each transfer. + */ +usb_endpoint::~usb_endpoint(void){ + //cancel all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_cancel_transfer(lut); + } + + //collect canceled transfers (drain the queue) + while (this->get_lut_with_wait(CLEANUP_TIMEOUT) != NULL){}; + + //free all transfers + BOOST_FOREACH(libusb_transfer *lut, _all_luts){ + libusb_free_transfer(lut); + } +} + + +/* + * Allocate a libusb transfer + * The allocated transfer - and buffer it contains - is repeatedly + * submitted, reaped, and reused and should not be freed until shutdown. + * \param mem a pointer to the buffer memory + * \param len size of the individual buffer + * \return pointer to an allocated libusb_transfer + */ +libusb_transfer *usb_endpoint::allocate_transfer(void *mem, size_t len){ + libusb_transfer *lut = libusb_alloc_transfer(0); + UHD_ASSERT_THROW(lut != NULL); + + unsigned int endpoint = ((_endpoint & 0x7f) | (_input ? 0x80 : 0)); + unsigned char *buff = reinterpret_cast<unsigned char *>(mem); + libusb_transfer_cb_fn lut_callback = libusb_transfer_cb_fn(&callback); + + libusb_fill_bulk_transfer(lut, // transfer + _handle->get(), // dev_handle + endpoint, // endpoint + buff, // buffer + len, // length + lut_callback, // callback + this, // user_data + 0); // timeout + return lut; +} + + +/* + * Asynchonous transfer submission + * Submit a libusb transfer to libusb add pending status + * \param lut pointer to libusb_transfer + * \return true on success or false on error + */ +void usb_endpoint::submit(libusb_transfer *lut){ + UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); +} + +/* + * Print status errors of a completed transfer + * \param lut pointer to an libusb_transfer + */ +void usb_endpoint::print_transfer_status(libusb_transfer *lut){ + std::cout << "here " << lut->status << std::endl; + switch (lut->status) { + case LIBUSB_TRANSFER_COMPLETED: + if (lut->actual_length < lut->length) { + std::cerr << "USB: transfer completed with short write," + << " length = " << lut->length + << " actual = " << lut->actual_length << std::endl; + } + + if ((lut->actual_length < 0) || (lut->length < 0)) { + std::cerr << "USB: transfer completed with invalid response" + << std::endl; + } + break; + case LIBUSB_TRANSFER_CANCELLED: + break; + case LIBUSB_TRANSFER_NO_DEVICE: + std::cerr << "USB: device was disconnected" << std::endl; + break; + case LIBUSB_TRANSFER_OVERFLOW: + std::cerr << "USB: device sent more data than requested" << std::endl; + break; + case LIBUSB_TRANSFER_TIMED_OUT: + std::cerr << "USB: transfer timed out" << std::endl; + break; + case LIBUSB_TRANSFER_STALL: + std::cerr << "USB: halt condition detected (stalled)" << std::endl; + break; + case LIBUSB_TRANSFER_ERROR: + std::cerr << "USB: transfer failed" << std::endl; + break; + default: + std::cerr << "USB: received unknown transfer status" << std::endl; + } +} + +libusb_transfer *usb_endpoint::get_lut_with_wait(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + libusb_transfer *lut; + if (_completed_list->pop_with_timed_wait(lut, timeout)) return lut; + return NULL; +} + +/*********************************************************************** + * USB zero_copy device class + **********************************************************************/ +class libusb_zero_copy_impl : public usb_zero_copy, public boost::enable_shared_from_this<libusb_zero_copy_impl> { +public: + + libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints + ); + + ~libusb_zero_copy_impl(void){ + _threads_running = false; + _thread_group.interrupt_all(); + _thread_group.join_all(); + } + + managed_recv_buffer::sptr get_recv_buff(double); + managed_send_buffer::sptr get_send_buff(double); + + size_t get_num_recv_frames(void) const { return _num_recv_frames; } + size_t get_num_send_frames(void) const { return _num_send_frames; } + + size_t get_recv_frame_size(void) const { return _recv_frame_size; } + size_t get_send_frame_size(void) const { return _send_frame_size; } + +private: + void release(libusb_transfer *lut){ + _recv_ep->submit(lut); + } + + void commit(libusb_transfer *lut, size_t num_bytes){ + lut->length = num_bytes; + try{ + _send_ep->submit(lut); + } + catch(const std::exception &e){ + std::cerr << "Error in commit: " << e.what() << std::endl; + } + } + + libusb::device_handle::sptr _handle; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; + usb_endpoint::sptr _recv_ep, _send_ep; + + //event handler threads + boost::thread_group _thread_group; + bool _threads_running; + + void run_event_loop(void){ + set_thread_priority_safe(); + libusb::session::sptr session = libusb::session::get_global_session(); + _threads_running = true; + try{ + while(_threads_running){ + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + libusb_handle_events_timeout(session->get_context(), &tv); + } + } catch(const boost::thread_interrupted &){} + } +}; + +/* + * Constructor + * Initializes libusb, opens devices, and sets up interfaces for I/O. + * Finally, creates endpoints for asynchronous I/O. + */ +libusb_zero_copy_impl::libusb_zero_copy_impl( + libusb::device_handle::sptr handle, + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints +): + _handle(handle), + _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))), + _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))), + _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))), + _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))) +{ + _handle->claim_interface(2 /*in interface*/); + _handle->claim_interface(1 /*out interface*/); + + _recv_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle + recv_endpoint, // USB endpoint number + true, // IN endpoint + this->get_recv_frame_size(), // buffer size per transfer + this->get_num_recv_frames() // number of libusb transfers + )); + + _send_ep = usb_endpoint::sptr(new usb_endpoint( + _handle, // libusb device_handle + send_endpoint, // USB endpoint number + false, // OUT endpoint + this->get_send_frame_size(), // buffer size per transfer + this->get_num_send_frames() // number of libusb transfers + )); + + //spawn the event handler threads + size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); + for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( + boost::bind(&libusb_zero_copy_impl::run_event_loop, this) + ); +} + +/* + * Construct a managed receive buffer from a completed libusb transfer + * (happy with buffer full of data) obtained from the receive endpoint. + * Return empty pointer if no transfer is available (timeout or error). + * \return pointer to a managed receive buffer + */ +managed_recv_buffer::sptr libusb_zero_copy_impl::get_recv_buff(double timeout){ + libusb_transfer *lut = _recv_ep->get_lut_with_wait(timeout); + if (lut == NULL) { + return managed_recv_buffer::sptr(); + } + else { + return managed_recv_buffer::make_safe( + boost::asio::const_buffer(lut->buffer, lut->actual_length), + boost::bind(&libusb_zero_copy_impl::release, shared_from_this(), lut) + ); + } +} + + +/* + * Construct a managed send buffer from a free libusb transfer (with + * empty buffer). Return empty pointer of no transfer is available + * (timeout or error). + * \return pointer to a managed send buffer + */ +managed_send_buffer::sptr libusb_zero_copy_impl::get_send_buff(double timeout){ + libusb_transfer *lut = _send_ep->get_lut_with_wait(timeout); + if (lut == NULL) { + return managed_send_buffer::sptr(); + } + else { + return managed_send_buffer::make_safe( + boost::asio::mutable_buffer(lut->buffer, this->get_send_frame_size()), + boost::bind(&libusb_zero_copy_impl::commit, shared_from_this(), lut, _1) + ); + } +} + +/*********************************************************************** + * USB zero_copy make functions + **********************************************************************/ +usb_zero_copy::sptr usb_zero_copy::make( + usb_device_handle::sptr handle, + size_t recv_endpoint, + size_t send_endpoint, + const device_addr_t &hints +){ + libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle( + boost::static_pointer_cast<libusb::special_handle>(handle)->get_device() + )); + return sptr(new libusb_zero_copy_impl( + dev_handle, recv_endpoint, send_endpoint, hints + )); +} diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp new file mode 100644 index 000000000..6799ac7b2 --- /dev/null +++ b/host/lib/transport/udp_simple.cpp @@ -0,0 +1,173 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/udp_simple.hpp> +#include <boost/asio.hpp> +#include <boost/thread.hpp> +#include <boost/format.hpp> +#include <iostream> + +using namespace uhd::transport; + +/*********************************************************************** + * Helper Functions + **********************************************************************/ +/*! + * Wait for available data or timeout. + * \param socket the asio socket + * \param timeout the timeout in seconds + * \return false for timeout, true for data + */ +static bool wait_available( + boost::asio::ip::udp::socket &socket, double timeout +){ + #if defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = long(timeout*1e6); + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(socket.native(), &rset); + + return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0; + + #else /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ + + //FIXME: why does select fail on macintosh? + for (size_t i = 0; i < size_t(timeout*1e3); i++){ + if (socket.available()) return true; + boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + } + return false; + + #endif /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ +} + +/*********************************************************************** + * UDP connected implementation class + **********************************************************************/ +class udp_connected_impl : public udp_simple{ +public: + //structors + udp_connected_impl(const std::string &addr, const std::string &port); + ~udp_connected_impl(void); + + //send/recv + size_t send(const boost::asio::const_buffer &); + size_t recv(const boost::asio::mutable_buffer &, double); + +private: + boost::asio::ip::udp::socket *_socket; + boost::asio::io_service _io_service; +}; + +udp_connected_impl::udp_connected_impl(const std::string &addr, const std::string &port){ + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + + // resolve the address + boost::asio::ip::udp::resolver resolver(_io_service); + boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); + boost::asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + + // Create, open, and connect the socket + _socket = new boost::asio::ip::udp::socket(_io_service); + _socket->open(boost::asio::ip::udp::v4()); + _socket->connect(receiver_endpoint); +} + +udp_connected_impl::~udp_connected_impl(void){ + delete _socket; +} + +size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){ + return _socket->send(boost::asio::buffer(buff)); +} + +size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ + if (not wait_available(*_socket, timeout)) return 0; + return _socket->receive(boost::asio::buffer(buff)); +} + +/*********************************************************************** + * UDP broadcast implementation class + **********************************************************************/ +class udp_broadcast_impl : public udp_simple{ +public: + //structors + udp_broadcast_impl(const std::string &addr, const std::string &port); + ~udp_broadcast_impl(void); + + //send/recv + size_t send(const boost::asio::const_buffer &); + size_t recv(const boost::asio::mutable_buffer &, double); + +private: + boost::asio::ip::udp::socket *_socket; + boost::asio::ip::udp::endpoint _receiver_endpoint; + boost::asio::io_service _io_service; +}; + +udp_broadcast_impl::udp_broadcast_impl(const std::string &addr, const std::string &port){ + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + + // resolve the address + boost::asio::ip::udp::resolver resolver(_io_service); + boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); + _receiver_endpoint = *resolver.resolve(query); + + // Create and open the socket + _socket = new boost::asio::ip::udp::socket(_io_service); + _socket->open(boost::asio::ip::udp::v4()); + + // Allow broadcasting + boost::asio::socket_base::broadcast option(true); + _socket->set_option(option); + +} + +udp_broadcast_impl::~udp_broadcast_impl(void){ + delete _socket; +} + +size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){ + return _socket->send_to(boost::asio::buffer(buff), _receiver_endpoint); +} + +size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){ + if (not wait_available(*_socket, timeout)) return 0; + boost::asio::ip::udp::endpoint sender_endpoint; + return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint); +} + +/*********************************************************************** + * UDP public make functions + **********************************************************************/ +udp_simple::sptr udp_simple::make_connected( + const std::string &addr, const std::string &port +){ + return sptr(new udp_connected_impl(addr, port)); +} + +udp_simple::sptr udp_simple::make_broadcast( + const std::string &addr, const std::string &port +){ + return sptr(new udp_broadcast_impl(addr, port)); +} diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp new file mode 100644 index 000000000..6d1a7413b --- /dev/null +++ b/host/lib/transport/udp_zero_copy_asio.cpp @@ -0,0 +1,386 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/udp_zero_copy.hpp> +#include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/thread_priority.hpp> +#include <uhd/utils/assert.hpp> +#include <uhd/utils/warning.hpp> +#include <boost/asio.hpp> +#include <boost/format.hpp> +#include <boost/thread.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <iostream> + +using namespace uhd; +using namespace uhd::transport; +namespace asio = boost::asio; + +/*********************************************************************** + * Constants + **********************************************************************/ +//Define this to the the boost async io calls to perform receive. +//Otherwise, get_recv_buff uses a blocking receive with timeout. +#define USE_ASIO_ASYNC_RECV + +//Define this to the the boost async io calls to perform send. +//Otherwise, the commit callback uses a blocking send. +//#define USE_ASIO_ASYNC_SEND + +//By default, this buffer is sized insufficiently small. +//For peformance, this buffer should be 10s of megabytes. +static const size_t MIN_RECV_SOCK_BUFF_SIZE = size_t(10e3); + +//Large buffers cause more underflow at high rates. +//Perhaps this is due to the kernel scheduling, +//but may change with host-based flow control. +static const size_t MIN_SEND_SOCK_BUFF_SIZE = size_t(10e3); + +//The number of async frames to allocate for each send and recv: +//The non-async recv can have a very large number of recv frames +//because the CPU overhead is independent of the number of frames. +#ifdef USE_ASIO_ASYNC_RECV +static const size_t DEFAULT_NUM_RECV_FRAMES = 32; +#else +static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu; +#endif + +//The non-async send only ever requires a single frame +//because the buffer will be committed before a new get. +#ifdef USE_ASIO_ASYNC_SEND +static const size_t DEFAULT_NUM_SEND_FRAMES = 32; +#else +static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu; +#endif + +//The number of service threads to spawn for async ASIO: +//A single concurrent thread for io_service seems to be the fastest. +//Threads are disabled when no async implementations are enabled. +#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND) +static const size_t CONCURRENCY_HINT = 1; +#else +static const size_t CONCURRENCY_HINT = 0; +#endif + +/*********************************************************************** + * Zero Copy UDP implementation with ASIO: + * This is the portable zero copy implementation for systems + * where a faster, platform specific solution is not available. + * However, it is not a true zero copy implementation as each + * send and recv requires a copy operation to/from userspace. + **********************************************************************/ +class udp_zero_copy_asio_impl : public udp_zero_copy, public boost::enable_shared_from_this<udp_zero_copy_asio_impl> { +public: + typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr; + + udp_zero_copy_asio_impl( + const std::string &addr, + const std::string &port, + const device_addr_t &hints + ): + _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu-4))), //TODO remove -4 when FPGA handles 2 byte pad correctly + _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))), + _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))), + _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES))), + _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)), + _io_service(_concurrency_hint) + { + //std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl; + + //resolve the address + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); + asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + + //create, open, and connect the socket + _socket = new asio::ip::udp::socket(_io_service); + _socket->open(asio::ip::udp::v4()); + _socket->connect(receiver_endpoint); + _sock_fd = _socket->native(); + } + + ~udp_zero_copy_asio_impl(void){ + delete _work; //allow io_service run to complete + _thread_group.join_all(); //wait for service threads to exit + delete _socket; + } + + void init(void){ + //allocate all recv frames and release them to begin xfers + _pending_recv_buffs = pending_buffs_type::make(_num_recv_frames); + _recv_buffer_pool = buffer_pool::make(_num_recv_frames, _recv_frame_size); + for (size_t i = 0; i < _num_recv_frames; i++){ + release(_recv_buffer_pool->at(i)); + } + + //allocate all send frames and push them into the fifo + _pending_send_buffs = pending_buffs_type::make(_num_send_frames); + _send_buffer_pool = buffer_pool::make(_num_send_frames, _send_frame_size); + for (size_t i = 0; i < _num_send_frames; i++){ + handle_send(_send_buffer_pool->at(i)); + } + + //spawn the service threads that will run the io service + _work = new asio::io_service::work(_io_service); //new work to delete later + for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread( + boost::bind(&udp_zero_copy_asio_impl::service, this) + ); + } + + void service(void){ + set_thread_priority_safe(); + _io_service.run(); + } + + //get size for internal socket buffer + template <typename Opt> size_t get_buff_size(void) const{ + Opt option; + _socket->get_option(option); + return option.value(); + } + + //set size for internal socket buffer + template <typename Opt> size_t resize_buff(size_t num_bytes){ + Opt option(num_bytes); + _socket->set_option(option); + return get_buff_size<Opt>(); + } + + //! handle a recv callback -> push the filled memory into the fifo + UHD_INLINE void handle_recv(void *mem, size_t len){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_recv_buffs->push_with_wait(boost::asio::buffer(mem, len)); + } + + //////////////////////////////////////////////////////////////////// + #ifdef USE_ASIO_ASYNC_RECV + //////////////////////////////////////////////////////////////////// + //! pop a filled recv buffer off of the fifo and bind with the release callback + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_recv_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_recv_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast<void*>(buff) + ) + ); + } + return managed_recv_buffer::sptr(); + } + + //! release a recv buffer -> start an async recv on the buffer + void release(void *mem){ + _socket->async_receive( + boost::asio::buffer(mem, this->get_recv_frame_size()), + boost::bind( + &udp_zero_copy_asio_impl::handle_recv, + shared_from_this(), mem, + asio::placeholders::bytes_transferred + ) + ); + } + + //////////////////////////////////////////////////////////////////// + #else /*USE_ASIO_ASYNC_RECV*/ + //////////////////////////////////////////////////////////////////// + managed_recv_buffer::sptr get_recv_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = long(timeout*1e6); + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(_sock_fd, &rset); + + //call select to perform timed wait and grab an available buffer with wait + //if the condition is true, call receive and return the managed buffer + if ( + ::select(_sock_fd+1, &rset, NULL, NULL, &tv) > 0 and + _pending_recv_buffs->pop_with_timed_wait(buff, timeout) + ){ + return managed_recv_buffer::make_safe( + asio::buffer( + boost::asio::buffer_cast<void *>(buff), + _socket->receive(asio::buffer(buff)) + ), + boost::bind( + &udp_zero_copy_asio_impl::release, + shared_from_this(), + asio::buffer_cast<void*>(buff) + ) + ); + } + return managed_recv_buffer::sptr(); + } + + void release(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + handle_recv(mem, this->get_recv_frame_size()); + } + + //////////////////////////////////////////////////////////////////// + #endif /*USE_ASIO_ASYNC_RECV*/ + //////////////////////////////////////////////////////////////////// + + size_t get_num_recv_frames(void) const {return _num_recv_frames;} + size_t get_recv_frame_size(void) const {return _recv_frame_size;} + + //! handle a send callback -> push the emptied memory into the fifo + UHD_INLINE void handle_send(void *mem){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + _pending_send_buffs->push_with_wait(boost::asio::buffer(mem, this->get_send_frame_size())); + } + + //! pop an empty send buffer off of the fifo and bind with the commit callback + managed_send_buffer::sptr get_send_buff(double timeout){ + boost::this_thread::disable_interruption di; //disable because the wait can throw + asio::mutable_buffer buff; + if (_pending_send_buffs->pop_with_timed_wait(buff, timeout)){ + return managed_send_buffer::make_safe( + buff, boost::bind( + &udp_zero_copy_asio_impl::commit, + shared_from_this(), + asio::buffer_cast<void*>(buff), _1 + ) + ); + } + return managed_send_buffer::sptr(); + } + + //////////////////////////////////////////////////////////////////// + #ifdef USE_ASIO_ASYNC_SEND + //////////////////////////////////////////////////////////////////// + //! commit a send buffer -> start an async send on the buffer + void commit(void *mem, size_t len){ + _socket->async_send( + boost::asio::buffer(mem, len), + boost::bind( + &udp_zero_copy_asio_impl::handle_send, + shared_from_this(), mem + ) + ); + } + + //////////////////////////////////////////////////////////////////// + #else /*USE_ASIO_ASYNC_SEND*/ + //////////////////////////////////////////////////////////////////// + void commit(void *mem, size_t len){ + _socket->send(asio::buffer(mem, len)); + handle_send(mem); + } + + //////////////////////////////////////////////////////////////////// + #endif /*USE_ASIO_ASYNC_SEND*/ + //////////////////////////////////////////////////////////////////// + + size_t get_num_send_frames(void) const {return _num_send_frames;} + size_t get_send_frame_size(void) const {return _send_frame_size;} + +private: + //memory management -> buffers and fifos + boost::thread_group _thread_group; + buffer_pool::sptr _send_buffer_pool, _recv_buffer_pool; + typedef bounded_buffer<asio::mutable_buffer> pending_buffs_type; + pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs; + const size_t _recv_frame_size, _num_recv_frames; + const size_t _send_frame_size, _num_send_frames; + + //asio guts -> socket and service + size_t _concurrency_hint; + asio::io_service _io_service; + asio::ip::udp::socket *_socket; + asio::io_service::work *_work; + int _sock_fd; +}; + +/*********************************************************************** + * UDP zero copy make function + **********************************************************************/ +template<typename Opt> static void resize_buff_helper( + udp_zero_copy_asio_impl::sptr udp_trans, + const size_t target_size, + const std::string &name +){ + size_t min_sock_buff_size = 0; + if (name == "recv") min_sock_buff_size = MIN_RECV_SOCK_BUFF_SIZE; + if (name == "send") min_sock_buff_size = MIN_SEND_SOCK_BUFF_SIZE; + min_sock_buff_size = std::max(min_sock_buff_size, target_size); + + std::string help_message; + #if defined(UHD_PLATFORM_LINUX) + help_message = str(boost::format( + "Please run: sudo sysctl -w net.core.%smem_max=%d\n" + ) % ((name == "recv")?"r":"w") % min_sock_buff_size); + #endif /*defined(UHD_PLATFORM_LINUX)*/ + + //resize the buffer if size was provided + if (target_size > 0){ + size_t actual_size = udp_trans->resize_buff<Opt>(target_size); + if (target_size != actual_size) std::cout << boost::format( + "Target %s sock buff size: %d bytes\n" + "Actual %s sock buff size: %d bytes" + ) % name % target_size % name % actual_size << std::endl; + else std::cout << boost::format( + "Current %s sock buff size: %d bytes" + ) % name % actual_size << std::endl; + if (actual_size < target_size) uhd::warning::post(str(boost::format( + "The %s buffer is smaller than the requested size.\n" + "The minimum requested buffer size is %d bytes.\n" + "See the transport application notes on buffer resizing.\n%s" + ) % name % min_sock_buff_size % help_message)); + } + + //only enable on platforms that are happy with the large buffer resize + #if defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //otherwise, ensure that the buffer is at least the minimum size + else if (udp_trans->get_buff_size<Opt>() < min_sock_buff_size){ + resize_buff_helper<Opt>(udp_trans, min_sock_buff_size, name); + } + #endif /*defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)*/ +} + +udp_zero_copy::sptr udp_zero_copy::make( + const std::string &addr, + const std::string &port, + const device_addr_t &hints +){ + udp_zero_copy_asio_impl::sptr udp_trans( + new udp_zero_copy_asio_impl(addr, port, hints) + ); + + //extract buffer size hints from the device addr + size_t recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0)); + size_t send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0)); + + //call the helper to resize send and recv buffers + resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv"); + resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send"); + + udp_trans->init(); //buffers resized -> call init() to use + + return udp_trans; +} diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp new file mode 100644 index 000000000..8a9772e7f --- /dev/null +++ b/host/lib/transport/usb_dummy_impl.cpp @@ -0,0 +1,39 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/usb_device_handle.hpp> +#include <uhd/transport/usb_control.hpp> +#include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/utils/exception.hpp> + +using namespace uhd; +using namespace uhd::transport; + +std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::uint16_t, boost::uint16_t){ + return std::vector<usb_device_handle::sptr>(); //empty list +} + +usb_control::sptr usb_control::make(usb_device_handle::sptr){ + throw std::runtime_error("no usb support -> usb_control::make not implemented"); +} + +usb_zero_copy::sptr usb_zero_copy::make( + usb_device_handle::sptr, + size_t, size_t, const device_addr_t & +){ + throw std::runtime_error("no usb support -> usb_zero_copy::make not implemented"); +} diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp new file mode 100644 index 000000000..c535edd04 --- /dev/null +++ b/host/lib/transport/vrt_packet_handler.hpp @@ -0,0 +1,466 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/assert.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/convert.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/function.hpp> +#include <stdexcept> +#include <iostream> +#include <vector> + +namespace vrt_packet_handler{ + +//this may change in the future but its a constant for now +static const size_t OTW_BYTES_PER_SAMP = sizeof(boost::uint32_t); + +template <typename T> UHD_INLINE T get_context_code( + const boost::uint32_t *vrt_hdr, + const uhd::transport::vrt::if_packet_info_t &if_packet_info +){ + //extract the context word (we dont know the endianness so mirror the bytes) + boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | + uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); + return T(word0 & 0xff); +} + +/*********************************************************************** + * vrt packet handler for recv + **********************************************************************/ + typedef std::vector<uhd::transport::managed_recv_buffer::sptr> managed_recv_buffs_t; + typedef boost::function<bool(managed_recv_buffs_t &)> get_recv_buffs_t; + typedef boost::function<void(size_t /*which channel*/)> handle_overflow_t; + typedef boost::function<void(const boost::uint32_t *, uhd::transport::vrt::if_packet_info_t &)> vrt_unpacker_t; + + static inline void handle_overflow_nop(size_t){} + + struct recv_state{ + //width of the receiver in channels + size_t width; + + //state variables to handle fragments + managed_recv_buffs_t managed_buffs; + std::vector<const boost::uint8_t *> copy_buffs; + size_t size_of_copy_buffs; + size_t fragment_offset_in_samps; + + recv_state(size_t width = 1): + width(width), + managed_buffs(width), + copy_buffs(width, NULL), + size_of_copy_buffs(0), + fragment_offset_in_samps(0) + { + /* NOP */ + } + }; + + /******************************************************************* + * Unpack a received vrt header and set the copy buffer. + * - helper function for vrt_packet_handler::_recv1 + ******************************************************************/ + static UHD_INLINE void _recv1_helper( + recv_state &state, + uhd::rx_metadata_t &metadata, + double tick_rate, + const vrt_unpacker_t &vrt_unpacker, + const handle_overflow_t &handle_overflow, + size_t vrt_header_offset_words32 + ){ + //vrt unpack each managed buffer + uhd::transport::vrt::if_packet_info_t if_packet_info; + for (size_t i = 0; i < state.width; i++){ + if (state.managed_buffs[i].get() == NULL) continue; //better have a message packet coming up... + + //extract packet words and check thats its enough to move on + size_t num_packet_words32 = state.managed_buffs[i]->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= vrt_header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //unpack the vrt header into the info struct + const boost::uint32_t *vrt_hdr = state.managed_buffs[i]->cast<const boost::uint32_t *>() + vrt_header_offset_words32; + if_packet_info.num_packet_words32 = num_packet_words32 - vrt_header_offset_words32; + vrt_unpacker(vrt_hdr, if_packet_info); + + //handle the non-data packet case and parse its contents + if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA){ + + metadata.error_code = get_context_code<uhd::rx_metadata_t::error_code_t>(vrt_hdr, if_packet_info); + if (metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) handle_overflow(i); + + //break to exit loop and store metadata below + state.size_of_copy_buffs = 0; break; + } + + //setup the buffer to point to the data + state.copy_buffs[i] = reinterpret_cast<const boost::uint8_t *>(vrt_hdr + if_packet_info.num_header_words32); + + //store the minimum payload length into the copy buffer length + size_t num_payload_bytes = if_packet_info.num_payload_words32*sizeof(boost::uint32_t); + if (i == 0 or state.size_of_copy_buffs > num_payload_bytes){ + state.size_of_copy_buffs = num_payload_bytes; + } + } + + //store the last vrt info into the metadata + metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf; + metadata.time_spec = uhd::time_spec_t( + time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), tick_rate + ); + static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits + metadata.start_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_sob_flags) == tlr_sob_flags); + static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits + metadata.end_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_eob_flags) == tlr_eob_flags); + } + + /******************************************************************* + * Recv data, unpack a vrt header, and copy-convert the data. + * - helper function for vrt_packet_handler::recv + ******************************************************************/ + static UHD_INLINE size_t _recv1( + recv_state &state, + const std::vector<void *> &buffs, + size_t offset_bytes, + size_t total_samps, + uhd::rx_metadata_t &metadata, + uhd::convert::function_type &converter, + double tick_rate, + const vrt_unpacker_t &vrt_unpacker, + const get_recv_buffs_t &get_recv_buffs, + const handle_overflow_t &handle_overflow, + size_t vrt_header_offset_words32, + size_t chans_per_otw_buff + ){ + metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_NONE; + + //perform a receive if no rx data is waiting to be copied + if (state.size_of_copy_buffs == 0){ + state.fragment_offset_in_samps = 0; + if (not get_recv_buffs(state.managed_buffs)){ + metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_TIMEOUT; + return 0; + } + try{ + _recv1_helper( + state, metadata, tick_rate, + vrt_unpacker, handle_overflow, + vrt_header_offset_words32 + ); + }catch(const std::exception &e){ + state.size_of_copy_buffs = 0; //reset copy buffs size + std::cerr << "Error (recv): " << e.what() << std::endl; + metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET; + return 0; + } + } + //defaults for the metadata when this is a fragment + else{ + metadata.has_time_spec = false; + metadata.start_of_burst = false; + metadata.end_of_burst = false; + } + + //extract the number of samples available to copy + size_t bytes_per_item = OTW_BYTES_PER_SAMP; + size_t nsamps_available = state.size_of_copy_buffs/bytes_per_item; + size_t nsamps_to_copy = std::min(total_samps*chans_per_otw_buff, nsamps_available); + size_t bytes_to_copy = nsamps_to_copy*bytes_per_item; + size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff; + + std::vector<void *> io_buffs(chans_per_otw_buff); + for (size_t i = 0; i < state.width; i+=chans_per_otw_buff){ + + //fill a vector with pointers to the io buffers + for (size_t j = 0; j < chans_per_otw_buff; j++){ + io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes; + } + + //copy-convert the samples from the recv buffer + uhd::convert::input_type otw_buffs(1, state.copy_buffs[i]); + converter(otw_buffs, io_buffs, nsamps_to_copy_per_io_buff); + + //update the rx copy buffer to reflect the bytes copied + state.copy_buffs[i] += bytes_to_copy; + } + //update the copy buffer's availability + state.size_of_copy_buffs -= bytes_to_copy; + + //setup the fragment flags and offset + metadata.more_fragments = state.size_of_copy_buffs != 0; + metadata.fragment_offset = state.fragment_offset_in_samps; + state.fragment_offset_in_samps += nsamps_to_copy; //set for next call + + return nsamps_to_copy_per_io_buff; + } + + /******************************************************************* + * Recv vrt packets and copy convert the samples into the buffer. + ******************************************************************/ + static UHD_INLINE size_t recv( + recv_state &state, + const std::vector<void *> &buffs, + const size_t total_num_samps, + uhd::rx_metadata_t &metadata, + uhd::device::recv_mode_t recv_mode, + const uhd::io_type_t &io_type, + const uhd::otw_type_t &otw_type, + double tick_rate, + const vrt_unpacker_t &vrt_unpacker, + const get_recv_buffs_t &get_recv_buffs, + const handle_overflow_t &handle_overflow = &handle_overflow_nop, + size_t vrt_header_offset_words32 = 0, + size_t chans_per_otw_buff = 1 + ){ + uhd::convert::function_type converter( + uhd::convert::get_converter_otw_to_cpu( + io_type, otw_type, 1, chans_per_otw_buff + )); + + switch(recv_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + return _recv1( + state, + buffs, 0, + total_num_samps, + metadata, + converter, + tick_rate, + vrt_unpacker, + get_recv_buffs, + handle_overflow, + vrt_header_offset_words32, + chans_per_otw_buff + ); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t accum_num_samps = 0; + uhd::rx_metadata_t tmp_md; + while(accum_num_samps < total_num_samps){ + size_t num_samps = _recv1( + state, + buffs, accum_num_samps*io_type.size, + total_num_samps - accum_num_samps, + (accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept + converter, + tick_rate, + vrt_unpacker, + get_recv_buffs, + handle_overflow, + vrt_header_offset_words32, + chans_per_otw_buff + ); + if (num_samps == 0) break; //had a recv timeout or error, break loop + accum_num_samps += num_samps; + } + return accum_num_samps; + } + + default: throw std::runtime_error("unknown recv mode"); + }//switch(recv_mode) + } + +/*********************************************************************** + * vrt packet handler for send + **********************************************************************/ + typedef std::vector<uhd::transport::managed_send_buffer::sptr> managed_send_buffs_t; + typedef boost::function<bool(managed_send_buffs_t &)> get_send_buffs_t; + typedef boost::function<void(boost::uint32_t *, uhd::transport::vrt::if_packet_info_t &)> vrt_packer_t; + + struct send_state{ + //init the expected seq number + size_t next_packet_seq; + + send_state(void) : next_packet_seq(0){ + /* NOP */ + } + }; + + /******************************************************************* + * Pack a vrt header, copy-convert the data, and send it. + * - helper function for vrt_packet_handler::send + ******************************************************************/ + static UHD_INLINE size_t _send1( + send_state &state, + const std::vector<const void *> &buffs, + const size_t offset_bytes, + const size_t num_samps, + uhd::transport::vrt::if_packet_info_t &if_packet_info, + uhd::convert::function_type &converter, + const vrt_packer_t &vrt_packer, + const get_send_buffs_t &get_send_buffs, + const size_t vrt_header_offset_words32, + const size_t chans_per_otw_buff + ){ + //load the rest of the if_packet_info in here + if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t); + if_packet_info.packet_count = state.next_packet_seq; + + //get send buffers for each channel + managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff); + if (not get_send_buffs(send_buffs)) return 0; + + std::vector<const void *> io_buffs(chans_per_otw_buff); + for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ + //calculate pointers with offsets to io and otw memory + for (size_t j = 0; j < chans_per_otw_buff; j++){ + io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes; + } + boost::uint32_t *otw_mem = send_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32; + + //pack metadata into a vrt header + vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //copy-convert the samples into the send buffer + uhd::convert::output_type otw_buffs(1, otw_mem); + converter(io_buffs, otw_buffs, num_samps); + + //commit the samples to the zero-copy interface + size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); + send_buffs[i]->commit(num_bytes_total); + } + state.next_packet_seq++; //increment sequence after commits + return num_samps; + } + + /******************************************************************* + * Send vrt packets and copy convert the samples into the buffer. + ******************************************************************/ + static UHD_INLINE size_t send( + send_state &state, + const std::vector<const void *> &buffs, + const size_t total_num_samps, + const uhd::tx_metadata_t &metadata, + uhd::device::send_mode_t send_mode, + const uhd::io_type_t &io_type, + const uhd::otw_type_t &otw_type, + double tick_rate, + const vrt_packer_t &vrt_packer, + const get_send_buffs_t &get_send_buffs, + size_t max_samples_per_packet, + size_t vrt_header_offset_words32 = 0, + size_t chans_per_otw_buff = 1 + ){ + uhd::convert::function_type converter( + uhd::convert::get_converter_cpu_to_otw( + io_type, otw_type, chans_per_otw_buff, 1 + )); + + //translate the metadata to vrt if packet info + uhd::transport::vrt::if_packet_info_t if_packet_info; + if_packet_info.has_sid = false; + if_packet_info.has_cid = false; + if_packet_info.has_tlr = false; + if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(tick_rate)); + + if (total_num_samps <= max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; + switch(send_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + + //fill in parts of the packet info overwrote in full buff mode + if_packet_info.has_tsi = metadata.has_time_spec; + if_packet_info.has_tsf = metadata.has_time_spec; + if_packet_info.sob = metadata.start_of_burst; + if_packet_info.eob = metadata.end_of_burst; + + //TODO remove this code when sample counts of zero are supported by hardware + std::vector<const void *> buffs_(buffs); + size_t total_num_samps_(total_num_samps); + if (total_num_samps == 0){ + static const boost::uint64_t zeros = 0; //max size of a host sample + buffs_ = std::vector<const void *>(buffs.size(), &zeros); + total_num_samps_ = 1; + } + + return _send1( + state, + buffs_, 0, + std::min(total_num_samps_, max_samples_per_packet), + if_packet_info, + converter, + vrt_packer, + get_send_buffs, + vrt_header_offset_words32, + chans_per_otw_buff + ); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t total_num_samps_sent = 0; + + //loop through the following fragment indexes + while(total_num_samps_sent < total_num_samps){ + + //calculate per-loop-iteration variables + const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; + const bool first_fragment = (total_num_samps_sent == 0); + const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet); + + //calculate new flags for the fragments + if_packet_info.has_tsi = metadata.has_time_spec and first_fragment; + if_packet_info.has_tsf = if_packet_info.has_tsi; + if_packet_info.sob = metadata.start_of_burst and first_fragment; + if_packet_info.eob = metadata.end_of_burst and final_fragment; + + //send the fragment with the helper function + const size_t num_samps_sent = _send1( + state, + buffs, total_num_samps_sent*io_type.size, + std::min(total_num_samps_unsent, max_samples_per_packet), + if_packet_info, + converter, + vrt_packer, + get_send_buffs, + vrt_header_offset_words32, + chans_per_otw_buff + ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; + } + return total_num_samps_sent; + } + + default: throw std::runtime_error("unknown send mode"); + }//switch(send_mode) + } + +} //namespace vrt_packet_handler + +#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/zero_copy.cpp b/host/lib/transport/zero_copy.cpp new file mode 100644 index 000000000..a5a864a04 --- /dev/null +++ b/host/lib/transport/zero_copy.cpp @@ -0,0 +1,108 @@ +// +// Copyright 2010 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/zero_copy.hpp> + +using namespace uhd::transport; + +/*********************************************************************** + * Safe managed receive buffer + **********************************************************************/ +static void release_nop(void){ + /* NOP */ +} + +class safe_managed_receive_buffer : public managed_recv_buffer{ +public: + safe_managed_receive_buffer( + const boost::asio::const_buffer &buff, + const release_fcn_t &release_fcn + ): + _buff(buff), _release_fcn(release_fcn) + { + /* NOP */ + } + + ~safe_managed_receive_buffer(void){ + _release_fcn(); + } + + void release(void){ + release_fcn_t release_fcn = _release_fcn; + _release_fcn = &release_nop; + return release_fcn(); + } + +private: + const boost::asio::const_buffer &get(void) const{ + return _buff; + } + + const boost::asio::const_buffer _buff; + release_fcn_t _release_fcn; +}; + +managed_recv_buffer::sptr managed_recv_buffer::make_safe( + const boost::asio::const_buffer &buff, + const release_fcn_t &release_fcn +){ + return sptr(new safe_managed_receive_buffer(buff, release_fcn)); +} + +/*********************************************************************** + * Safe managed send buffer + **********************************************************************/ +static void commit_nop(size_t){ + /* NOP */ +} + +class safe_managed_send_buffer : public managed_send_buffer{ +public: + safe_managed_send_buffer( + const boost::asio::mutable_buffer &buff, + const commit_fcn_t &commit_fcn + ): + _buff(buff), _commit_fcn(commit_fcn) + { + /* NOP */ + } + + ~safe_managed_send_buffer(void){ + _commit_fcn(0); + } + + void commit(size_t num_bytes){ + commit_fcn_t commit_fcn = _commit_fcn; + _commit_fcn = &commit_nop; + return commit_fcn(num_bytes); + } + +private: + const boost::asio::mutable_buffer &get(void) const{ + return _buff; + } + + const boost::asio::mutable_buffer _buff; + commit_fcn_t _commit_fcn; +}; + +safe_managed_send_buffer::sptr managed_send_buffer::make_safe( + const boost::asio::mutable_buffer &buff, + const commit_fcn_t &commit_fcn +){ + return sptr(new safe_managed_send_buffer(buff, commit_fcn)); +} |