aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt122
-rw-r--r--host/lib/transport/buffer_pool.cpp80
-rw-r--r--host/lib/transport/gen_vrt_if_packet.py286
-rw-r--r--host/lib/transport/if_addrs.cpp121
-rw-r--r--host/lib/transport/libusb1_base.cpp279
-rw-r--r--host/lib/transport/libusb1_base.hpp149
-rw-r--r--host/lib/transport/libusb1_control.cpp67
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp312
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp631
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp346
-rw-r--r--host/lib/transport/udp_common.hpp60
-rw-r--r--host/lib/transport/udp_simple.cpp134
-rw-r--r--host/lib/transport/udp_wsa_zero_copy.cpp300
-rw-r--r--host/lib/transport/udp_zero_copy.cpp304
-rw-r--r--host/lib/transport/usb_dummy_impl.cpp38
-rw-r--r--host/lib/transport/usb_zero_copy_wrapper.cpp231
16 files changed, 3460 insertions, 0 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
new file mode 100644
index 000000000..6524a8412
--- /dev/null
+++ b/host/lib/transport/CMakeLists.txt
@@ -0,0 +1,122 @@
+#
+# Copyright 2010-2011 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+########################################################################
+# This file included, use CMake directory variables
+########################################################################
+
+########################################################################
+# Setup libusb
+########################################################################
+MESSAGE(STATUS "")
+FIND_PACKAGE(USB1)
+
+LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF)
+
+IF(ENABLE_USB)
+ MESSAGE(STATUS "USB support enabled via libusb.")
+ INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS})
+ LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES})
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_control.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_zero_copy.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/libusb1_base.hpp
+ )
+ELSE(ENABLE_USB)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/usb_dummy_impl.cpp
+ )
+ENDIF(ENABLE_USB)
+
+########################################################################
+# Setup defines for interface address discovery
+########################################################################
+MESSAGE(STATUS "")
+MESSAGE(STATUS "Configuring interface address discovery...")
+INCLUDE(CheckCXXSourceCompiles)
+INCLUDE(CheckIncludeFileCXX)
+
+CHECK_CXX_SOURCE_COMPILES("
+ #include <ifaddrs.h>
+ int main(){
+ struct ifaddrs *ifap;
+ getifaddrs(&ifap);
+ return 0;
+ }
+ " HAVE_GETIFADDRS
+)
+
+CHECK_INCLUDE_FILE_CXX(winsock2.h HAVE_WINSOCK2_H)
+
+IF(HAVE_GETIFADDRS)
+ MESSAGE(STATUS " Interface address discovery supported through getifaddrs.")
+ SET(IF_ADDRS_DEFS HAVE_GETIFADDRS)
+ELSEIF(HAVE_WINSOCK2_H)
+ MESSAGE(STATUS " Interface address discovery supported through SIO_GET_INTERFACE_LIST.")
+ SET(IF_ADDRS_DEFS HAVE_SIO_GET_INTERFACE_LIST)
+ELSE()
+ MESSAGE(STATUS " Interface address discovery not supported.")
+ SET(IF_ADDRS_DEFS HAVE_IF_ADDRS_DUMMY)
+ENDIF()
+
+SET_SOURCE_FILES_PROPERTIES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
+ PROPERTIES COMPILE_DEFINITIONS "${IF_ADDRS_DEFS}"
+)
+
+########################################################################
+# Setup UDP
+########################################################################
+IF(WIN32)
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp)
+ELSE()
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp)
+ENDIF()
+
+#On windows, the boost asio implementation uses the winsock2 library.
+#Note: we exclude the .lib extension for cygwin and mingw platforms.
+IF(WIN32)
+ LIBUHD_APPEND_LIBS(ws2_32)
+ENDIF()
+
+#atlbase.h is not included with visual studio express
+#conditionally check for atlbase.h and define if found
+INCLUDE(CheckIncludeFileCXX)
+CHECK_INCLUDE_FILE_CXX(atlbase.h HAVE_ATLBASE_H)
+IF(HAVE_ATLBASE_H)
+ SET_SOURCE_FILES_PROPERTIES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp
+ PROPERTIES COMPILE_DEFINITIONS "HAVE_ATLBASE_H"
+ )
+ENDIF(HAVE_ATLBASE_H)
+
+########################################################################
+# Append to the list of sources for lib uhd
+########################################################################
+LIBUHD_PYTHON_GEN_SOURCE(
+ ${CMAKE_CURRENT_SOURCE_DIR}/gen_vrt_if_packet.py
+ ${CMAKE_CURRENT_BINARY_DIR}/vrt_if_packet.cpp
+)
+
+LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/usb_zero_copy_wrapper.cpp
+)
diff --git a/host/lib/transport/buffer_pool.cpp b/host/lib/transport/buffer_pool.cpp
new file mode 100644
index 000000000..971bbb75a
--- /dev/null
+++ b/host/lib/transport/buffer_pool.cpp
@@ -0,0 +1,80 @@
+//
+// Copyright 2011-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/buffer_pool.hpp>
+#include <boost/shared_array.hpp>
+#include <vector>
+
+using namespace uhd::transport;
+
+//! pad the byte count to a multiple of alignment
+static size_t pad_to_boundary(const size_t bytes, const size_t alignment){
+ return bytes + (alignment - bytes)%alignment;
+}
+
+/***********************************************************************
+ * Buffer pool implementation
+ **********************************************************************/
+class buffer_pool_impl : public buffer_pool{
+public:
+ buffer_pool_impl(
+ const std::vector<ptr_type> &ptrs,
+ boost::shared_array<char> mem
+ ): _ptrs(ptrs), _mem(mem){
+ /* NOP */
+ }
+
+ ptr_type at(const size_t index) const{
+ return _ptrs.at(index);
+ }
+
+ size_t size(void) const{
+ return _ptrs.size();
+ }
+
+private:
+ std::vector<ptr_type> _ptrs;
+ boost::shared_array<char> _mem;
+};
+
+/***********************************************************************
+ * Buffer pool factor function
+ **********************************************************************/
+buffer_pool::sptr buffer_pool::make(
+ const size_t num_buffs,
+ const size_t buff_size,
+ const size_t alignment
+){
+ //1) pad the buffer size to be a multiple of alignment
+ //2) pad the overall memory size for room after alignment
+ //3) allocate the memory in one block of sufficient size
+ const size_t padded_buff_size = pad_to_boundary(buff_size, alignment);
+ boost::shared_array<char> mem(new char[padded_buff_size*num_buffs + alignment-1]);
+
+ //Fill a vector with boundary-aligned points in the memory
+ const size_t mem_start = pad_to_boundary(size_t(mem.get()), alignment);
+ std::vector<ptr_type> ptrs(num_buffs);
+ for (size_t i = 0; i < num_buffs; i++){
+ ptrs[i] = ptr_type(mem_start + padded_buff_size*i);
+ }
+
+ //Create a new buffer pool implementation with:
+ // - the pre-computed pointers, and
+ // - the reference to allocated memory.
+ return sptr(new buffer_pool_impl(ptrs, mem));
+}
+
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
new file mode 100644
index 000000000..e28ce3aae
--- /dev/null
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -0,0 +1,286 @@
+#!/usr/bin/env python
+#
+# Copyright 2010-2011 Ettus Research LLC
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+"""
+The vrt packer/unpacker code generator:
+
+This script will generate the pack and unpack routines that convert
+metatdata into vrt headers and vrt headers into metadata.
+
+The generated code infers jump tables to speed-up the parsing time.
+"""
+
+TMPL_TEXT = """
+#import time
+/***********************************************************************
+ * This file was generated by $file on $time.strftime("%c")
+ **********************************************************************/
+
+\#include <uhd/exception.hpp>
+\#include <uhd/transport/vrt_if_packet.hpp>
+\#include <uhd/utils/byteswap.hpp>
+\#include <boost/detail/endian.hpp>
+\#include <vector>
+
+//define the endian macros to convert integers
+\#ifdef BOOST_BIG_ENDIAN
+ \#define BE_MACRO(x) (x)
+ \#define LE_MACRO(x) uhd::byteswap(x)
+\#else
+ \#define BE_MACRO(x) uhd::byteswap(x)
+ \#define LE_MACRO(x) (x)
+\#endif
+
+using namespace uhd;
+using namespace uhd::transport;
+
+typedef size_t pred_type;
+typedef std::vector<pred_type> pred_table_type;
+#define pred_table_index(hdr) ((hdr >> 20) & 0x1ff)
+
+static pred_table_type get_pred_unpack_table(void){
+ pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28)
+ for (size_t i = 0; i < table.size(); i++){
+ boost::uint32_t vrt_hdr_word = i << 20;
+ if(vrt_hdr_word & $hex(0x1 << 28)) table[i] |= $hex($sid_p);
+ if(vrt_hdr_word & $hex(0x1 << 27)) table[i] |= $hex($cid_p);
+ if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p);
+ if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p);
+ if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p);
+ if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p);
+ if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p);
+ }
+ return table;
+}
+
+static const pred_table_type pred_unpack_table(get_pred_unpack_table());
+
+//maps trailer bits to num empty bytes
+//maps num empty bytes to trailer bits
+static const size_t occ_table[] = {0, 2, 1, 3};
+
+########################################################################
+#def gen_code($XE_MACRO, $suffix)
+########################################################################
+
+void vrt::if_hdr_pack_$(suffix)(
+ boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ boost::uint32_t vrt_hdr_flags = 0;
+
+ pred_type pred = 0;
+ if (if_packet_info.has_sid) pred |= $hex($sid_p);
+ if (if_packet_info.has_cid) pred |= $hex($cid_p);
+ if (if_packet_info.has_tsi) pred |= $hex($tsi_p);
+ if (if_packet_info.has_tsf) pred |= $hex($tsf_p);
+ if (if_packet_info.has_tlr) pred |= $hex($tlr_p);
+ if (if_packet_info.eob) pred |= $hex($eob_p);
+ if (if_packet_info.sob) pred |= $hex($sob_p);
+
+ switch(pred){
+ #for $pred in range(2**7)
+ case $pred:
+ #set $num_header_words = 1
+ #set $flags = 0
+ ########## Stream ID ##########
+ #if $pred & $sid_p
+ packet_buff[$num_header_words] = $(XE_MACRO)(if_packet_info.sid);
+ #set $num_header_words += 1
+ #set $flags |= (0x1 << 28);
+ #end if
+ ########## Class ID ##########
+ #if $pred & $cid_p
+ packet_buff[$num_header_words] = 0; //not implemented
+ #set $num_header_words += 1
+ packet_buff[$num_header_words] = 0; //not implemented
+ #set $num_header_words += 1
+ #set $flags |= (0x1 << 27);
+ #end if
+ ########## Integer Time ##########
+ #if $pred & $tsi_p
+ packet_buff[$num_header_words] = $(XE_MACRO)(if_packet_info.tsi);
+ #set $num_header_words += 1
+ #set $flags |= (0x3 << 22);
+ #end if
+ ########## Fractional Time ##########
+ #if $pred & $tsf_p
+ packet_buff[$num_header_words] = $(XE_MACRO)(boost::uint32_t(if_packet_info.tsf >> 32));
+ #set $num_header_words += 1
+ packet_buff[$num_header_words] = $(XE_MACRO)(boost::uint32_t(if_packet_info.tsf >> 0));
+ #set $num_header_words += 1
+ #set $flags |= (0x1 << 20);
+ #end if
+ ########## Burst Flags ##########
+ #if $pred & $eob_p
+ #set $flags |= (0x1 << 24);
+ #end if
+ #if $pred & $sob_p
+ #set $flags |= (0x1 << 25);
+ #end if
+ ########## Trailer ##########
+ #if $pred & $tlr_p
+ {
+ const size_t empty_bytes = if_packet_info.num_payload_words32*sizeof(boost::uint32_t) - if_packet_info.num_payload_bytes;
+ if_packet_info.tlr = (0x3 << 22) | (occ_table[empty_bytes & 0x3] << 10);
+ }
+ packet_buff[$num_header_words+if_packet_info.num_payload_words32] = $(XE_MACRO)(if_packet_info.tlr);
+ #set $flags |= (0x1 << 26);
+ #set $num_trailer_words = 1;
+ #else
+ #set $num_trailer_words = 0;
+ #end if
+ ########## Variables ##########
+ if_packet_info.num_header_words32 = $num_header_words;
+ if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32;
+ vrt_hdr_flags = $hex($flags);
+ break;
+ #end for
+ }
+
+ //fill in complete header word
+ packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0
+ | (if_packet_info.packet_type << 29)
+ | vrt_hdr_flags
+ | ((if_packet_info.packet_count & 0xf) << 16)
+ | (if_packet_info.num_packet_words32 & 0xffff)
+ ));
+}
+
+void vrt::if_hdr_unpack_$(suffix)(
+ const boost::uint32_t *packet_buff,
+ if_packet_info_t &if_packet_info
+){
+ //extract vrt header
+ boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]);
+ size_t packet_words32 = vrt_hdr_word & 0xffff;
+
+ //failure case
+ if (if_packet_info.num_packet_words32 < packet_words32)
+ throw uhd::value_error("bad vrt header or packet fragment");
+
+ //extract fields from the header
+ if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29);
+ if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf;
+
+ const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
+
+ size_t empty_bytes = 0;
+
+ switch(pred){
+ #for $pred in range(2**7)
+ case $pred:
+ #set $has_time_spec = False
+ #set $num_header_words = 1
+ ########## Stream ID ##########
+ #if $pred & $sid_p
+ if_packet_info.has_sid = true;
+ if_packet_info.sid = $(XE_MACRO)(packet_buff[$num_header_words]);
+ #set $num_header_words += 1
+ #else
+ if_packet_info.has_sid = false;
+ #end if
+ ########## Class ID ##########
+ #if $pred & $cid_p
+ if_packet_info.has_cid = true;
+ if_packet_info.cid = 0; //not implemented
+ #set $num_header_words += 2
+ #else
+ if_packet_info.has_cid = false;
+ #end if
+ ########## Integer Time ##########
+ #if $pred & $tsi_p
+ if_packet_info.has_tsi = true;
+ if_packet_info.tsi = $(XE_MACRO)(packet_buff[$num_header_words]);
+ #set $num_header_words += 1
+ #else
+ if_packet_info.has_tsi = false;
+ #end if
+ ########## Fractional Time ##########
+ #if $pred & $tsf_p
+ if_packet_info.has_tsf = true;
+ if_packet_info.tsf = boost::uint64_t($(XE_MACRO)(packet_buff[$num_header_words])) << 32;
+ #set $num_header_words += 1
+ if_packet_info.tsf |= $(XE_MACRO)(packet_buff[$num_header_words]);
+ #set $num_header_words += 1
+ #else
+ if_packet_info.has_tsf = false;
+ #end if
+ ########## Burst Flags ##########
+ #if $pred & $eob_p
+ if_packet_info.eob = true;
+ #else
+ if_packet_info.eob = false;
+ #end if
+ #if $pred & $sob_p
+ if_packet_info.sob = true;
+ #else
+ if_packet_info.sob = false;
+ #end if
+ ########## Trailer ##########
+ #if $pred & $tlr_p
+ if_packet_info.has_tlr = true;
+ if_packet_info.tlr = $(XE_MACRO)(packet_buff[packet_words32-1]);
+ #set $num_trailer_words = 1;
+ {
+ const int indicators = (if_packet_info.tlr >> 20) & (if_packet_info.tlr >> 8);
+ if ((indicators & (1 << 0)) != 0) if_packet_info.eob = true;
+ if ((indicators & (1 << 1)) != 0) if_packet_info.sob = true;
+ empty_bytes = occ_table[(indicators >> 2) & 0x3];
+ }
+ #else
+ if_packet_info.has_tlr = false;
+ #set $num_trailer_words = 0;
+ #end if
+ ########## Variables ##########
+ //another failure case
+ if (packet_words32 < $($num_header_words + $num_trailer_words))
+ throw uhd::value_error("bad vrt header or invalid packet length");
+ if_packet_info.num_header_words32 = $num_header_words;
+ if_packet_info.num_payload_words32 = packet_words32 - $($num_header_words + $num_trailer_words);
+ if_packet_info.num_payload_bytes = if_packet_info.num_payload_words32*sizeof(boost::uint32_t) - empty_bytes;
+ break;
+ #end for
+ }
+}
+
+########################################################################
+#end def
+########################################################################
+
+$gen_code("BE_MACRO", "be")
+$gen_code("LE_MACRO", "le")
+"""
+
+def parse_tmpl(_tmpl_text, **kwargs):
+ from Cheetah.Template import Template
+ return str(Template(_tmpl_text, kwargs))
+
+if __name__ == '__main__':
+ import sys
+ open(sys.argv[1], 'w').write(parse_tmpl(
+ TMPL_TEXT,
+ file=__file__,
+ sid_p = 0b0000001,
+ cid_p = 0b0000010,
+ tsi_p = 0b0000100,
+ tsf_p = 0b0001000,
+ tlr_p = 0b0010000,
+ sob_p = 0b0100000,
+ eob_p = 0b1000000,
+ ))
diff --git a/host/lib/transport/if_addrs.cpp b/host/lib/transport/if_addrs.cpp
new file mode 100644
index 000000000..2ad0c8c53
--- /dev/null
+++ b/host/lib/transport/if_addrs.cpp
@@ -0,0 +1,121 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/if_addrs.hpp>
+#include <boost/asio/ip/address_v4.hpp>
+#include <boost/cstdint.hpp>
+#include <iostream>
+
+/***********************************************************************
+ * Interface address discovery through ifaddrs api
+ **********************************************************************/
+#ifdef HAVE_GETIFADDRS
+#include <ifaddrs.h>
+
+static boost::asio::ip::address_v4 sockaddr_to_ip_addr(sockaddr *addr){
+ return boost::asio::ip::address_v4(ntohl(
+ reinterpret_cast<sockaddr_in*>(addr)->sin_addr.s_addr
+ ));
+}
+
+std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){
+ std::vector<if_addrs_t> if_addrs;
+ struct ifaddrs *ifap;
+ if (getifaddrs(&ifap) == 0){
+ for (struct ifaddrs *iter = ifap; iter != NULL; iter = iter->ifa_next){
+ //ensure that the entries are valid
+ if (iter->ifa_addr == NULL) continue;
+ if (iter->ifa_addr->sa_family != AF_INET) continue;
+ if (iter->ifa_netmask->sa_family != AF_INET) continue;
+ if (iter->ifa_broadaddr->sa_family != AF_INET) continue;
+
+ //append a new set of interface addresses
+ if_addrs_t if_addr;
+ if_addr.inet = sockaddr_to_ip_addr(iter->ifa_addr).to_string();
+ if_addr.mask = sockaddr_to_ip_addr(iter->ifa_netmask).to_string();
+ if_addr.bcast = sockaddr_to_ip_addr(iter->ifa_broadaddr).to_string();
+
+ //correct the bcast address when its same as the gateway
+ if (if_addr.inet == if_addr.bcast or sockaddr_to_ip_addr(iter->ifa_broadaddr) == boost::asio::ip::address_v4(0)){
+ //manually calculate broadcast address
+ //https://svn.boost.org/trac/boost/ticket/5198
+ const boost::uint32_t addr = sockaddr_to_ip_addr(iter->ifa_addr).to_ulong();
+ const boost::uint32_t mask = sockaddr_to_ip_addr(iter->ifa_netmask).to_ulong();
+ const boost::uint32_t bcast = (addr & mask) | ~mask;
+ if_addr.bcast = boost::asio::ip::address_v4(bcast).to_string();
+ }
+
+ if_addrs.push_back(if_addr);
+ }
+ freeifaddrs(ifap);
+ }
+ return if_addrs;
+}
+
+#endif /* HAVE_GETIFADDRS */
+
+/***********************************************************************
+ * Interface address discovery through windows api
+ **********************************************************************/
+#ifdef HAVE_SIO_GET_INTERFACE_LIST
+#include <winsock2.h>
+
+std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){
+ std::vector<if_addrs_t> if_addrs;
+ SOCKET sd = WSASocket(AF_INET, SOCK_DGRAM, 0, 0, 0, 0);
+ if (sd == SOCKET_ERROR) {
+ std::cerr << "Failed to get a socket. Error " << WSAGetLastError() <<
+ std::endl; return if_addrs;
+ }
+
+ INTERFACE_INFO InterfaceList[20];
+ unsigned long nBytesReturned;
+ if (WSAIoctl(sd, SIO_GET_INTERFACE_LIST, 0, 0, &InterfaceList,
+ sizeof(InterfaceList), &nBytesReturned, 0, 0) == SOCKET_ERROR) {
+ std::cerr << "Failed calling WSAIoctl: error " << WSAGetLastError() <<
+ std::endl;
+ return if_addrs;
+ }
+
+ int nNumInterfaces = nBytesReturned / sizeof(INTERFACE_INFO);
+ for (int i = 0; i < nNumInterfaces; ++i) {
+ boost::uint32_t iiAddress = ntohl(reinterpret_cast<sockaddr_in&>(InterfaceList[i].iiAddress).sin_addr.s_addr);
+ boost::uint32_t iiNetmask = ntohl(reinterpret_cast<sockaddr_in&>(InterfaceList[i].iiNetmask).sin_addr.s_addr);
+ boost::uint32_t iiBroadcastAddress = (iiAddress & iiNetmask) | ~iiNetmask;
+
+ if_addrs_t if_addr;
+ if_addr.inet = boost::asio::ip::address_v4(iiAddress).to_string();
+ if_addr.mask = boost::asio::ip::address_v4(iiNetmask).to_string();
+ if_addr.bcast = boost::asio::ip::address_v4(iiBroadcastAddress).to_string();
+ if_addrs.push_back(if_addr);
+ }
+
+ return if_addrs;
+}
+
+#endif /* HAVE_SIO_GET_INTERFACE_LIST */
+
+/***********************************************************************
+ * Interface address discovery not included
+ **********************************************************************/
+#ifdef HAVE_IF_ADDRS_DUMMY
+
+std::vector<uhd::transport::if_addrs_t> uhd::transport::get_if_addrs(void){
+ return std::vector<if_addrs_t>();
+}
+
+#endif /* HAVE_IF_ADDRS_DUMMY */
diff --git a/host/lib/transport/libusb1_base.cpp b/host/lib/transport/libusb1_base.cpp
new file mode 100644
index 000000000..d4ec874f1
--- /dev/null
+++ b/host/lib/transport/libusb1_base.cpp
@@ -0,0 +1,279 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "libusb1_base.hpp"
+#include <uhd/exception.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/types/dict.hpp>
+#include <boost/weak_ptr.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/foreach.hpp>
+#include <iostream>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+/***********************************************************************
+ * libusb session
+ **********************************************************************/
+class libusb_session_impl : public libusb::session{
+public:
+ libusb_session_impl(void){
+ UHD_ASSERT_THROW(libusb_init(&_context) == 0);
+ libusb_set_debug(_context, debug_level);
+ }
+
+ ~libusb_session_impl(void){
+ libusb_exit(_context);
+ }
+
+ libusb_context *get_context(void) const{
+ return _context;
+ }
+
+private:
+ libusb_context *_context;
+};
+
+libusb::session::sptr libusb::session::get_global_session(void){
+ static boost::weak_ptr<session> global_session;
+
+ //not expired -> get existing session
+ if (not global_session.expired()) return global_session.lock();
+
+ //create a new global session
+ sptr new_global_session(new libusb_session_impl());
+ global_session = new_global_session;
+ return new_global_session;
+}
+
+/***********************************************************************
+ * libusb device
+ **********************************************************************/
+class libusb_device_impl : public libusb::device{
+public:
+ libusb_device_impl(libusb_device *dev){
+ _session = libusb::session::get_global_session();
+ _dev = dev;
+ }
+
+ ~libusb_device_impl(void){
+ libusb_unref_device(this->get());
+ }
+
+ libusb_device *get(void) const{
+ return _dev;
+ }
+
+private:
+ libusb::session::sptr _session; //always keep a reference to session
+ libusb_device *_dev;
+};
+
+/***********************************************************************
+ * libusb device list
+ **********************************************************************/
+class libusb_device_list_impl : public libusb::device_list{
+public:
+ libusb_device_list_impl(void){
+ libusb::session::sptr sess = libusb::session::get_global_session();
+
+ //allocate a new list of devices
+ libusb_device** dev_list;
+ ssize_t ret = libusb_get_device_list(sess->get_context(), &dev_list);
+ if (ret < 0) throw uhd::os_error("cannot enumerate usb devices");
+
+ //fill the vector of device references
+ for (size_t i = 0; i < size_t(ret); i++) _devs.push_back(
+ libusb::device::sptr(new libusb_device_impl(dev_list[i]))
+ );
+
+ //free the device list but dont unref (done in ~device)
+ libusb_free_device_list(dev_list, false/*dont unref*/);
+ }
+
+ size_t size(void) const{
+ return _devs.size();
+ }
+
+ libusb::device::sptr at(size_t i) const{
+ return _devs.at(i);
+ }
+
+private:
+ std::vector<libusb::device::sptr> _devs;
+};
+
+libusb::device_list::sptr libusb::device_list::make(void){
+ return sptr(new libusb_device_list_impl());
+}
+
+/***********************************************************************
+ * libusb device descriptor
+ **********************************************************************/
+class libusb_device_descriptor_impl : public libusb::device_descriptor{
+public:
+ libusb_device_descriptor_impl(libusb::device::sptr dev){
+ _dev = dev;
+ UHD_ASSERT_THROW(libusb_get_device_descriptor(_dev->get(), &_desc) == 0);
+ }
+
+ const libusb_device_descriptor &get(void) const{
+ return _desc;
+ }
+
+ std::string get_ascii_serial(void) const{
+ if (this->get().iSerialNumber == 0) return "";
+
+ libusb::device_handle::sptr handle(
+ libusb::device_handle::get_cached_handle(_dev)
+ );
+
+ unsigned char buff[512];
+ ssize_t ret = libusb_get_string_descriptor_ascii(
+ handle->get(), this->get().iSerialNumber, buff, sizeof(buff)
+ );
+ if (ret < 0) return ""; //on error, just return empty string
+
+ return std::string((char *)buff, ret);
+ }
+
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+ libusb_device_descriptor _desc;
+};
+
+libusb::device_descriptor::sptr libusb::device_descriptor::make(device::sptr dev){
+ return sptr(new libusb_device_descriptor_impl(dev));
+}
+
+/***********************************************************************
+ * libusb device handle
+ **********************************************************************/
+class libusb_device_handle_impl : public libusb::device_handle{
+public:
+ libusb_device_handle_impl(libusb::device::sptr dev){
+ _dev = dev;
+ UHD_ASSERT_THROW(libusb_open(_dev->get(), &_handle) == 0);
+ }
+
+ ~libusb_device_handle_impl(void){
+ //release all claimed interfaces
+ for (size_t i = 0; i < _claimed.size(); i++){
+ libusb_release_interface(this->get(), _claimed[i]);
+ }
+ libusb_close(_handle);
+ }
+
+ libusb_device_handle *get(void) const{
+ return _handle;
+ }
+
+ void claim_interface(int interface){
+ UHD_ASSERT_THROW(libusb_claim_interface(this->get(), interface) == 0);
+ _claimed.push_back(interface);
+ }
+
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+ libusb_device_handle *_handle;
+ std::vector<int> _claimed;
+};
+
+libusb::device_handle::sptr libusb::device_handle::get_cached_handle(device::sptr dev){
+ static uhd::dict<libusb_device *, boost::weak_ptr<device_handle> > handles;
+
+ //lock for atomic access to static table above
+ static boost::mutex mutex;
+ boost::mutex::scoped_lock lock(mutex);
+
+ //not expired -> get existing handle
+ if (handles.has_key(dev->get()) and not handles[dev->get()].expired()){
+ return handles[dev->get()].lock();
+ }
+
+ //create a new cached handle
+ try{
+ sptr new_handle(new libusb_device_handle_impl(dev));
+ handles[dev->get()] = new_handle;
+ return new_handle;
+ }
+ catch(const uhd::exception &){
+ #ifdef UHD_PLATFORM_LINUX
+ UHD_MSG(error) <<
+ "USB open failed: insufficient permissions.\n"
+ "See the application notes for your device.\n"
+ << std::endl;
+ #else
+ UHD_LOG << "USB open failed: device already claimed." << std::endl;
+ #endif
+ throw;
+ }
+}
+
+/***********************************************************************
+ * libusb special handle
+ **********************************************************************/
+class libusb_special_handle_impl : public libusb::special_handle{
+public:
+ libusb_special_handle_impl(libusb::device::sptr dev){
+ _dev = dev;
+ }
+
+ libusb::device::sptr get_device(void) const{
+ return _dev;
+ }
+
+ std::string get_serial(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get_ascii_serial();
+ }
+
+ boost::uint16_t get_vendor_id(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get().idVendor;
+ }
+
+ boost::uint16_t get_product_id(void) const{
+ return libusb::device_descriptor::make(this->get_device())->get().idProduct;
+ }
+
+private:
+ libusb::device::sptr _dev; //always keep a reference to device
+};
+
+libusb::special_handle::sptr libusb::special_handle::make(device::sptr dev){
+ return sptr(new libusb_special_handle_impl(dev));
+}
+
+/***********************************************************************
+ * list device handles implementations
+ **********************************************************************/
+std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(
+ boost::uint16_t vid, boost::uint16_t pid
+){
+ std::vector<usb_device_handle::sptr> handles;
+
+ libusb::device_list::sptr dev_list = libusb::device_list::make();
+ for (size_t i = 0; i < dev_list->size(); i++){
+ usb_device_handle::sptr handle = libusb::special_handle::make(dev_list->at(i));
+ if (handle->get_vendor_id() == vid and handle->get_product_id() == pid){
+ handles.push_back(handle);
+ }
+ }
+
+ return handles;
+}
diff --git a/host/lib/transport/libusb1_base.hpp b/host/lib/transport/libusb1_base.hpp
new file mode 100644
index 000000000..04c1d6574
--- /dev/null
+++ b/host/lib/transport/libusb1_base.hpp
@@ -0,0 +1,149 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP
+
+#include <uhd/config.hpp>
+#include <boost/utility.hpp>
+#include <boost/shared_ptr.hpp>
+#include <uhd/transport/usb_device_handle.hpp>
+#include <libusb.h>
+
+/***********************************************************************
+ * Libusb object oriented smart pointer wrappers:
+ * The following wrappers provide allocation and automatic deallocation
+ * for various libusb data types and handles. The construction routines
+ * also store tables of already allocated structures to avoid multiple
+ * occurrences of opened handles (for example).
+ **********************************************************************/
+namespace uhd { namespace transport {
+
+namespace libusb {
+
+ /*!
+ * This session class holds a global libusb context for this process.
+ * The get global session call will create a new context if none exists.
+ * When all references to session are destroyed, the context will be freed.
+ */
+ class session : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<session> sptr;
+
+ /*!
+ * Level 0: no messages ever printed by the library (default)
+ * Level 1: error messages are printed to stderr
+ * Level 2: warning and error messages are printed to stderr
+ * Level 3: informational messages are printed to stdout, warning
+ * and error messages are printed to stderr
+ */
+ static const int debug_level = 0;
+
+ //! get a shared pointer to the global session
+ static sptr get_global_session(void);
+
+ //! get the underlying libusb context pointer
+ virtual libusb_context *get_context(void) const = 0;
+ };
+
+ /*!
+ * Holds a device pointer with a reference to the session.
+ */
+ class device : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device> sptr;
+
+ //! get the underlying device pointer
+ virtual libusb_device *get(void) const = 0;
+ };
+
+ /*!
+ * This device list class holds a device list that will be
+ * automatically freed when the last reference is destroyed.
+ */
+ class device_list : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_list> sptr;
+
+ //! make a new device list
+ static sptr make(void);
+
+ //! the number of devices in this list
+ virtual size_t size() const = 0;
+
+ //! get the device pointer at a particular index
+ virtual device::sptr at(size_t index) const = 0;
+ };
+
+ /*!
+ * Holds a device descriptor and a reference to the device.
+ */
+ class device_descriptor : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_descriptor> sptr;
+
+ //! make a new descriptor from a device reference
+ static sptr make(device::sptr);
+
+ //! get the underlying device descriptor
+ virtual const libusb_device_descriptor &get(void) const = 0;
+
+ virtual std::string get_ascii_serial(void) const = 0;
+ };
+
+ /*!
+ * Holds a device handle and a reference to the device.
+ */
+ class device_handle : boost::noncopyable {
+ public:
+ typedef boost::shared_ptr<device_handle> sptr;
+
+ //! get a cached handle or make a new one given the device
+ static sptr get_cached_handle(device::sptr);
+
+ //! get the underlying device handle
+ virtual libusb_device_handle *get(void) const = 0;
+
+ /*!
+ * Open USB interfaces for control using magic value
+ * IN interface: 2
+ * OUT interface: 1
+ * Control interface: 0
+ */
+ virtual void claim_interface(int) = 0;
+ };
+
+ /*!
+ * The special handle is our internal implementation of the
+ * usb device handle which is used publicly to identify a device.
+ */
+ class special_handle : public usb_device_handle {
+ public:
+ typedef boost::shared_ptr<special_handle> sptr;
+
+ //! make a new special handle from device
+ static sptr make(device::sptr);
+
+ //! get the underlying device reference
+ virtual device::sptr get_device(void) const = 0;
+ };
+
+}
+
+}} //namespace
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_LIBUSB_HPP */
diff --git a/host/lib/transport/libusb1_control.cpp b/host/lib/transport/libusb1_control.cpp
new file mode 100644
index 000000000..3d9b38785
--- /dev/null
+++ b/host/lib/transport/libusb1_control.cpp
@@ -0,0 +1,67 @@
+//
+// Copyright 2010 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "libusb1_base.hpp"
+#include <uhd/transport/usb_control.hpp>
+#include <boost/thread/mutex.hpp>
+
+using namespace uhd::transport;
+
+const int libusb_timeout = 0;
+
+/***********************************************************************
+ * libusb-1.0 implementation of USB control transport
+ **********************************************************************/
+class libusb_control_impl : public usb_control {
+public:
+ libusb_control_impl(libusb::device_handle::sptr handle, const size_t interface):
+ _handle(handle)
+ {
+ _handle->claim_interface(interface);
+ }
+
+ ssize_t submit(boost::uint8_t request_type,
+ boost::uint8_t request,
+ boost::uint16_t value,
+ boost::uint16_t index,
+ unsigned char *buff,
+ boost::uint16_t length
+ ){
+ boost::mutex::scoped_lock lock(_mutex);
+ return libusb_control_transfer(_handle->get(),
+ request_type,
+ request,
+ value,
+ index,
+ buff,
+ length,
+ libusb_timeout);
+ }
+
+private:
+ libusb::device_handle::sptr _handle;
+ boost::mutex _mutex;
+};
+
+/***********************************************************************
+ * USB control public make functions
+ **********************************************************************/
+usb_control::sptr usb_control::make(usb_device_handle::sptr handle, const size_t interface){
+ return sptr(new libusb_control_impl(libusb::device_handle::get_cached_handle(
+ boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
+ ), interface));
+}
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
new file mode 100644
index 000000000..28bff9709
--- /dev/null
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -0,0 +1,312 @@
+//
+// Copyright 2010-2012 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "libusb1_base.hpp"
+#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/exception.hpp>
+#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
+#include <list>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+static const size_t DEFAULT_NUM_XFERS = 16; //num xfers
+static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes
+
+//! Define LIBUSB_CALL when its missing (non-windows)
+#ifndef LIBUSB_CALL
+ #define LIBUSB_CALL
+#endif /*LIBUSB_CALL*/
+
+/*!
+ * All libusb callback functions should be marked with the LIBUSB_CALL macro
+ * to ensure that they are compiled with the same calling convention as libusb.
+ */
+
+//! helper function: handles all async callbacks
+static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){
+ *(static_cast<bool *>(lut->user_data)) = true;
+}
+
+/*!
+ * Wait for a managed buffer to become complete.
+ *
+ * This routine processes async events until the transaction completes.
+ * We must call the libusb handle events in a loop because the handler
+ * may complete managed buffers other than the one we are waiting on.
+ *
+ * We cannot determine if handle events timed out or processed an event.
+ * Therefore, the timeout condition is handled by using boost system time.
+ *
+ * \param ctx the libusb context structure
+ * \param timeout the wait timeout in seconds
+ * \param completed a reference to the completed flag
+ * \return true for completion, false for timeout
+ */
+UHD_INLINE bool wait_for_completion(libusb_context *ctx, const double timeout, bool &completed){
+ //already completed by a previous call?
+ if (completed) return true;
+
+ //perform a non-blocking event handle
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+ libusb_handle_events_timeout(ctx, &tv);
+ if (completed) return true;
+
+ //finish the rest with a timeout loop
+ const boost::system_time timeout_time = boost::get_system_time() + boost::posix_time::microseconds(long(timeout*1000000));
+ while (not completed and (boost::get_system_time() < timeout_time)){
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 10000; /*10ms*/
+ libusb_handle_events_timeout(ctx, &tv);
+ }
+
+ return completed;
+}
+
+/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the release method.
+ **********************************************************************/
+class libusb_zero_copy_mrb : public managed_recv_buffer{
+public:
+ libusb_zero_copy_mrb(libusb_transfer *lut, const size_t frame_size):
+ _ctx(libusb::session::get_global_session()->get_context()),
+ _lut(lut), _frame_size(frame_size) { /* NOP */ }
+
+ void release(void){
+ completed = false;
+ _lut->length = _frame_size; //always reset length
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ }
+
+ sptr get_new(const double timeout, size_t &index){
+ if (wait_for_completion(_ctx, timeout, completed)){
+ index++;
+ return make(this, _lut->buffer, _lut->actual_length);
+ }
+ return managed_recv_buffer::sptr();
+ }
+
+ bool completed;
+
+private:
+ libusb_context *_ctx;
+ libusb_transfer *_lut;
+ const size_t _frame_size;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - Associated with a particular libusb transfer struct.
+ * - Submits the transfer to libusb in the commit method.
+ **********************************************************************/
+class libusb_zero_copy_msb : public managed_send_buffer{
+public:
+ libusb_zero_copy_msb(libusb_transfer *lut, const size_t frame_size):
+ _ctx(libusb::session::get_global_session()->get_context()),
+ _lut(lut), _frame_size(frame_size) { completed = true; }
+
+ void release(void){
+ completed = false;
+ _lut->length = size();
+ UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0);
+ }
+
+ sptr get_new(const double timeout, size_t &index){
+ if (wait_for_completion(_ctx, timeout, completed)){
+ index++;
+ return make(this, _lut->buffer, _frame_size);
+ }
+ return managed_send_buffer::sptr();
+ }
+
+ bool completed;
+
+private:
+ libusb_context *_ctx;
+ libusb_transfer *_lut;
+ const size_t _frame_size;
+};
+
+/***********************************************************************
+ * USB zero_copy device class
+ **********************************************************************/
+class libusb_zero_copy_impl : public usb_zero_copy{
+public:
+
+ libusb_zero_copy_impl(
+ libusb::device_handle::sptr handle,
+ const size_t recv_interface,
+ const size_t recv_endpoint,
+ const size_t send_interface,
+ const size_t send_endpoint,
+ const device_addr_t &hints
+ ):
+ _handle(handle),
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", DEFAULT_XFER_SIZE))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_XFERS))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", DEFAULT_XFER_SIZE))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_XFERS))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _next_recv_buff_index(0),
+ _next_send_buff_index(0)
+ {
+ _handle->claim_interface(recv_interface);
+ _handle->claim_interface(send_interface);
+
+ //flush the buffers out of the recv endpoint
+ //limit the flushing to at most one second
+ for (size_t i = 0; i < 100; i++)
+ {
+ unsigned char buff[512];
+ int transfered = 0;
+ const int status = libusb_bulk_transfer(
+ _handle->get(), // dev_handle
+ (recv_endpoint & 0x7f) | 0x80, // endpoint
+ static_cast<unsigned char *>(buff),
+ sizeof(buff),
+ &transfered, //bytes xfered
+ 10 //timeout ms
+ );
+ if (status == LIBUSB_ERROR_TIMEOUT) break;
+ }
+
+ //allocate libusb transfer structs and managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _mrb_pool.push_back(boost::make_shared<libusb_zero_copy_mrb>(lut, this->get_recv_frame_size()));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (recv_endpoint & 0x7f) | 0x80, // endpoint
+ static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer
+ this->get_recv_frame_size(), // length
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_mrb_pool.back()->completed), // user_data
+ 0 // timeout (ms)
+ );
+
+ _all_luts.push_back(lut);
+ _mrb_pool.back()->release();
+ }
+
+ //allocate libusb transfer structs and managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+
+ libusb_transfer *lut = libusb_alloc_transfer(0);
+ UHD_ASSERT_THROW(lut != NULL);
+
+ _msb_pool.push_back(boost::make_shared<libusb_zero_copy_msb>(lut, this->get_send_frame_size()));
+
+ libusb_fill_bulk_transfer(
+ lut, // transfer
+ _handle->get(), // dev_handle
+ (send_endpoint & 0x7f) | 0x00, // endpoint
+ static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer
+ this->get_send_frame_size(), // length
+ libusb_transfer_cb_fn(&libusb_async_cb), // callback
+ static_cast<void *>(&_msb_pool.back()->completed), // user_data
+ 0 // timeout
+ );
+
+ _all_luts.push_back(lut);
+ }
+ }
+
+ ~libusb_zero_copy_impl(void){
+ libusb_context *ctx = libusb::session::get_global_session()->get_context();
+
+ //cancel all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_cancel_transfer(lut);
+ }
+
+ //process all transfers until timeout occurs
+ bool completed = false;
+ wait_for_completion(ctx, 0.01, completed);
+
+ //free all transfers
+ BOOST_FOREACH(libusb_transfer *lut, _all_luts){
+ libusb_free_transfer(lut);
+ }
+
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
+ }
+
+ size_t get_num_recv_frames(void) const { return _num_recv_frames; }
+ size_t get_num_send_frames(void) const { return _num_send_frames; }
+
+ size_t get_recv_frame_size(void) const { return _recv_frame_size; }
+ size_t get_send_frame_size(void) const { return _send_frame_size; }
+
+private:
+ libusb::device_handle::sptr _handle;
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
+
+ //! Storage for transfer related objects
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_mrb> > _mrb_pool;
+ std::vector<boost::shared_ptr<libusb_zero_copy_msb> > _msb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
+
+ //! a list of all transfer structs we allocated
+ std::list<libusb_transfer *> _all_luts;
+
+
+};
+
+/***********************************************************************
+ * USB zero_copy make functions
+ **********************************************************************/
+usb_zero_copy::sptr usb_zero_copy::make(
+ usb_device_handle::sptr handle,
+ const size_t recv_interface,
+ const size_t recv_endpoint,
+ const size_t send_interface,
+ const size_t send_endpoint,
+ const device_addr_t &hints
+){
+ libusb::device_handle::sptr dev_handle(libusb::device_handle::get_cached_handle(
+ boost::static_pointer_cast<libusb::special_handle>(handle)->get_device()
+ ));
+ return sptr(new libusb_zero_copy_impl(
+ dev_handle, recv_interface, recv_endpoint, send_interface, send_endpoint, hints
+ ));
+}
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
new file mode 100644
index 000000000..5a75d5f0d
--- /dev/null
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -0,0 +1,631 @@
+//
+// Copyright 2011-2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/transport/vrt_if_packet.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/dynamic_bitset.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/format.hpp>
+#include <boost/bind.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/barrier.hpp>
+#include <iostream>
+#include <vector>
+
+namespace uhd{ namespace transport{ namespace sph{
+
+UHD_INLINE boost::uint32_t get_context_code(
+ const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info
+){
+ //extract the context word (we dont know the endianness so mirror the bytes)
+ boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] |
+ uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
+ return word0 & 0xff;
+}
+
+typedef boost::function<void(void)> handle_overflow_type;
+static inline void handle_overflow_nop(void){}
+
+/***********************************************************************
+ * Super receive packet handler
+ *
+ * A receive packet handler represents a group of channels.
+ * The channel group shares a common sample rate.
+ * All channels are received in unison in recv().
+ **********************************************************************/
+class recv_packet_handler{
+public:
+ typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
+ typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
+ //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
+
+ /*!
+ * Make a new packet handler for receive
+ * \param size the number of transport channels
+ */
+ recv_packet_handler(const size_t size = 1):
+ _queue_error_for_next_call(false),
+ _buffers_infos_index(0)
+ {
+ this->resize(size);
+ set_alignment_failure_threshold(1000);
+ }
+
+ ~recv_packet_handler(void){
+ _task_barrier.interrupt();
+ _task_handlers.clear();
+ }
+
+ //! Resize the number of transport channels
+ void resize(const size_t size){
+ if (this->size() == size) return;
+ _task_handlers.clear();
+ _props.resize(size);
+ //re-initialize all buffers infos by re-creating the vector
+ _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
+ _task_barrier.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
+ };
+ }
+
+ //! Get the channel width of this handler
+ size_t size(void) const{
+ return _props.size();
+ }
+
+ //! Setup the vrt unpacker function and offset
+ void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){
+ _vrt_unpacker = vrt_unpacker;
+ _header_offset_words32 = header_offset_words32;
+ }
+
+ /*!
+ * Set the threshold for alignment failure.
+ * How many packets throw out before giving up?
+ * \param threshold number of packets per channel
+ */
+ void set_alignment_failure_threshold(const size_t threshold){
+ _alignment_faulure_threshold = threshold*this->size();
+ }
+
+ //! Set the rate of ticks per second
+ void set_tick_rate(const double rate){
+ _tick_rate = rate;
+ }
+
+ //! Set the rate of samples per second
+ void set_samp_rate(const double rate){
+ _samp_rate = rate;
+ }
+
+ /*!
+ * Set the function to get a managed buffer.
+ * \param xport_chan which transport channel
+ * \param get_buff the getter function
+ */
+ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff, const bool flush = false){
+ if (flush){
+ while (get_buff(0.0));
+ }
+ _props.at(xport_chan).get_buff = get_buff;
+ }
+
+ //! Set the conversion routine for all channels
+ void set_converter(const uhd::convert::id_type &id){
+ _num_outputs = id.num_outputs;
+ _converter = uhd::convert::get_converter(id)();
+ this->set_scale_factor(1/32767.); //update after setting converter
+ _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.input_format);
+ _bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.output_format);
+ }
+
+ //! Set the transport channel's overflow handler
+ void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){
+ _props.at(xport_chan).handle_overflow = handle_overflow;
+ }
+
+ //! Set the scale factor used in float conversion
+ void set_scale_factor(const double scale_factor){
+ _converter->set_scalar(scale_factor);
+ }
+
+ /*******************************************************************
+ * Receive:
+ * The entry point for the fast-path receive calls.
+ * Dispatch into combinations of single packet receive calls.
+ ******************************************************************/
+ UHD_INLINE size_t recv(
+ const uhd::rx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ const double timeout,
+ const bool one_packet
+ ){
+ //handle metadata queued from a previous receive
+ if (_queue_error_for_next_call){
+ _queue_error_for_next_call = false;
+ metadata = _queue_metadata;
+ //We want to allow a full buffer recv to be cut short by a timeout,
+ //but do not want to generate an inline timeout message packet.
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0;
+ }
+
+ size_t accum_num_samps = recv_one_packet(
+ buffs, nsamps_per_buff, metadata, timeout
+ );
+
+ if (one_packet) return accum_num_samps;
+
+ //first recv had an error code set, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;
+
+ //loop until buffer is filled or error code
+ while(accum_num_samps < nsamps_per_buff){
+ size_t num_samps = recv_one_packet(
+ buffs, nsamps_per_buff - accum_num_samps, _queue_metadata,
+ timeout, accum_num_samps*_bytes_per_cpu_item
+ );
+
+ //metadata had an error code set, store for next call and return
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){
+ _queue_error_for_next_call = true;
+ break;
+ }
+ accum_num_samps += num_samps;
+ }
+ return accum_num_samps;
+ }
+
+private:
+
+ vrt_unpacker_type _vrt_unpacker;
+ size_t _header_offset_words32;
+ double _tick_rate, _samp_rate;
+ bool _queue_error_for_next_call;
+ size_t _alignment_faulure_threshold;
+ rx_metadata_t _queue_metadata;
+ struct xport_chan_props_type{
+ xport_chan_props_type(void):
+ packet_count(0),
+ handle_overflow(&handle_overflow_nop)
+ {}
+ get_buff_type get_buff;
+ size_t packet_count;
+ handle_overflow_type handle_overflow;
+ };
+ std::vector<xport_chan_props_type> _props;
+ size_t _num_outputs;
+ size_t _bytes_per_otw_item; //used in conversion
+ size_t _bytes_per_cpu_item; //used in conversion
+ uhd::convert::converter::sptr _converter; //used in conversion
+
+ //! information stored for a received buffer
+ struct per_buffer_info_type{
+ managed_recv_buffer::sptr buff;
+ const boost::uint32_t *vrt_hdr;
+ vrt::if_packet_info_t ifpi;
+ time_spec_t time;
+ const char *copy_buff;
+ };
+
+ //!information stored for a set of aligned buffers
+ struct buffers_info_type : std::vector<per_buffer_info_type> {
+ buffers_info_type(const size_t size):
+ std::vector<per_buffer_info_type>(size),
+ indexes_todo(size, true),
+ alignment_time_valid(false),
+ data_bytes_to_copy(0),
+ fragment_offset_in_samps(0)
+ {/* NOP */}
+ boost::dynamic_bitset<> indexes_todo; //used in alignment logic
+ time_spec_t alignment_time; //used in alignment logic
+ bool alignment_time_valid; //used in alignment logic
+ size_t data_bytes_to_copy; //keeps track of state
+ size_t fragment_offset_in_samps; //keeps track of state
+ rx_metadata_t metadata; //packet description
+ };
+
+ //! a circular queue of buffer infos
+ std::vector<buffers_info_type> _buffers_infos;
+ size_t _buffers_infos_index;
+ buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];}
+ buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];}
+ buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];}
+ void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;}
+
+ //! possible return options for the packet receiver
+ enum packet_type{
+ PACKET_IF_DATA,
+ PACKET_TIMESTAMP_ERROR,
+ PACKET_INLINE_MESSAGE,
+ PACKET_TIMEOUT_ERROR,
+ PACKET_SEQUENCE_ERROR
+ };
+
+ /*******************************************************************
+ * Get and process a single packet from the transport:
+ * Receive a single packet at the given index.
+ * Extract all the relevant info and store.
+ * Check the info to determine the return code.
+ ******************************************************************/
+ UHD_INLINE packet_type get_and_process_single_packet(
+ const size_t index,
+ buffers_info_type &prev_buffer_info,
+ buffers_info_type &curr_buffer_info,
+ double timeout
+ ){
+ //get a single packet from the transport layer
+ managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff;
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
+
+ //bounds check before extract
+ size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
+ if (num_packet_words32 <= _header_offset_words32){
+ throw std::runtime_error("recv buffer smaller than vrt packet offset");
+ }
+
+ //extract packet info
+ per_buffer_info_type &info = curr_buffer_info[index];
+ info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
+ info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
+ _vrt_unpacker(info.vrt_hdr, info.ifpi);
+ info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true
+ info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+
+ //--------------------------------------------------------------
+ //-- Determine return conditions:
+ //-- The order of these checks is HOLY.
+ //--------------------------------------------------------------
+
+ //1) check for inline IF message packets
+ if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
+ return PACKET_INLINE_MESSAGE;
+ }
+
+ //2) check for sequence errors
+ #ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t expected_packet_count = _props[index].packet_count;
+ _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
+ if (expected_packet_count != info.ifpi.packet_count){
+ return PACKET_SEQUENCE_ERROR;
+ }
+ #endif
+
+ //3) check for out of order timestamps
+ if (info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
+ return PACKET_TIMESTAMP_ERROR;
+ }
+
+ //4) otherwise the packet is normal!
+ return PACKET_IF_DATA;
+ }
+
+ /*******************************************************************
+ * Alignment check:
+ * Check the received packet for alignment and mark accordingly.
+ ******************************************************************/
+ UHD_INLINE void alignment_check(
+ const size_t index, buffers_info_type &info
+ ){
+ //if alignment time was not valid or if the sequence id is newer:
+ // use this index's time as the alignment time
+ // reset the indexes list and remove this index
+ if (not info.alignment_time_valid or info[index].time > info.alignment_time){
+ info.alignment_time_valid = true;
+ info.alignment_time = info[index].time;
+ info.indexes_todo.set();
+ info.indexes_todo.reset(index);
+ info.data_bytes_to_copy = info[index].ifpi.num_payload_bytes;
+ }
+
+ //if the sequence id matches:
+ // remove this index from the list and continue
+ else if (info[index].time == info.alignment_time){
+ info.indexes_todo.reset(index);
+ }
+
+ //if the sequence id is older:
+ // continue with the same index to try again
+ //else if (info[index].time < info.alignment_time)...
+ }
+
+ /*******************************************************************
+ * Get aligned buffers:
+ * Iterate through each index and try to accumulate aligned buffers.
+ * Handle all of the edge cases like inline messages and errors.
+ * The logic will throw out older packets until it finds a match.
+ ******************************************************************/
+ UHD_INLINE void get_aligned_buffs(double timeout){
+
+ increment_buffer_info(); //increment to next buffer
+ buffers_info_type &prev_info = get_prev_buffer_info();
+ buffers_info_type &curr_info = get_curr_buffer_info();
+ buffers_info_type &next_info = get_next_buffer_info();
+
+ //Loop until we get a message of an aligned set of buffers:
+ // - Receive a single packet and extract its info.
+ // - Handle the packet type yielded by the receive.
+ // - Check the timestamps for alignment conditions.
+ size_t iterations = 0;
+ while (curr_info.indexes_todo.any()){
+
+ //get the index to process for this iteration
+ const size_t index = curr_info.indexes_todo.find_first();
+ packet_type packet;
+
+ //receive a single packet from the transport
+ try{
+ packet = get_and_process_single_packet(
+ index, prev_info, curr_info, timeout
+ );
+ }
+
+ //handle the case when the get packet throws
+ catch(const std::exception &e){
+ UHD_MSG(error) << boost::format(
+ "The receive packet handler caught an exception.\n%s"
+ ) % e.what() << std::endl;
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return;
+ }
+
+ switch(packet){
+ case PACKET_IF_DATA:
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_TIMESTAMP_ERROR:
+ //If the user changes the device time while streaming or without flushing,
+ //we can receive a packet that comes before the previous packet in time.
+ //This could cause the alignment logic to discard future received packets.
+ //Therefore, when this occurs, we reset the info to restart from scratch.
+ if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){
+ curr_info.alignment_time_valid = false;
+ }
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_INLINE_MESSAGE:
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsf;
+ curr_info.metadata.time_spec = next_info[index].time;
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
+ if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW){
+ _props[index].handle_overflow();
+ UHD_MSG(fastpath) << "O";
+ }
+ return;
+
+ case PACKET_TIMEOUT_ERROR:
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return;
+
+ case PACKET_SEQUENCE_ERROR:
+ alignment_check(index, curr_info);
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
+ curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t::from_ticks(
+ prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_otw_item, _samp_rate);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ UHD_MSG(fastpath) << "O";
+ return;
+
+ }
+
+ //too many iterations: detect alignment failure
+ if (iterations++ > _alignment_faulure_threshold){
+ UHD_MSG(error) << boost::format(
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n"
+ ) % iterations << std::endl;
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return;
+ }
+
+ }
+
+ //set the metadata from the buffer information at index zero
+ curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsf;
+ curr_info.metadata.time_spec = curr_info[0].time;
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = curr_info[0].ifpi.sob;
+ curr_info.metadata.end_of_burst = curr_info[0].ifpi.eob;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ }
+
+ /*******************************************************************
+ * Receive a single packet:
+ * Handles fragmentation, messages, errors, and copy-conversion.
+ * When no fragments are available, call the get aligned buffers.
+ * Then copy-convert available data into the user's IO buffers.
+ ******************************************************************/
+ UHD_INLINE size_t recv_one_packet(
+ const uhd::rx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ const double timeout,
+ const size_t buffer_offset_bytes = 0
+ ){
+ //get the next buffer if the current one has expired
+ if (get_curr_buffer_info().data_bytes_to_copy == 0){
+
+ //reset current buffer info members for reuse
+ get_curr_buffer_info().fragment_offset_in_samps = 0;
+ get_curr_buffer_info().alignment_time_valid = false;
+ get_curr_buffer_info().indexes_todo.set();
+
+ //perform receive with alignment logic
+ get_aligned_buffs(timeout);
+ }
+
+ buffers_info_type &info = get_curr_buffer_info();
+ metadata = info.metadata;
+
+ //interpolate the time spec (useful when this is a fragment)
+ metadata.time_spec += time_spec_t::from_ticks(info.fragment_offset_in_samps, _samp_rate);
+
+ //extract the number of samples available to copy
+ const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_otw_item;
+ const size_t nsamps_to_copy = std::min(nsamps_per_buff*_num_outputs, nsamps_available);
+ const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_otw_item;
+ const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_num_outputs;
+
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_to_copy_per_io_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_bytes_to_copy = bytes_to_copy;
+
+ //perform N channels of conversion
+ converter_thread_task(0);
+
+ //update the copy buffer's availability
+ info.data_bytes_to_copy -= bytes_to_copy;
+
+ //setup the fragment flags and offset
+ metadata.more_fragments = info.data_bytes_to_copy != 0;
+ metadata.fragment_offset = info.fragment_offset_in_samps;
+ info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
+
+ return nsamps_to_copy_per_io_buff;
+ }
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier.wait();
+
+ //shortcut references to local data structures
+ buffers_info_type &buff_info = get_curr_buffer_info();
+ per_buffer_info_type &info = buff_info[index];
+ const rx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_outputs; i++){
+ char *b = reinterpret_cast<char *>(buffs[index*_num_outputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
+ }
+ const ref_vector<void *> out_buffs(io_buffs, _num_outputs);
+
+ //perform the conversion operation
+ _converter->conv(info.copy_buff, out_buffs, _convert_nsamps);
+
+ //advance the pointer for the source buffer
+ info.copy_buff += _convert_bytes_to_copy;
+
+ //release the buffer if fully consumed
+ if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
+ info.buff.reset(); //effectively a release
+ }
+
+ if (index == 0) _task_barrier.wait_others();
+ }
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const rx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ size_t _convert_bytes_to_copy;
+
+};
+
+class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
+public:
+ recv_packet_streamer(const size_t max_num_samps){
+ _max_num_samps = max_num_samps;
+ }
+
+ size_t get_num_channels(void) const{
+ return this->size();
+ }
+
+ size_t get_max_num_samps(void) const{
+ return _max_num_samps;
+ }
+
+ size_t recv(
+ const rx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ const double timeout,
+ const bool one_packet
+ ){
+ return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet);
+ }
+
+private:
+ size_t _max_num_samps;
+};
+
+}}} //namespace
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
new file mode 100644
index 000000000..726742327
--- /dev/null
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -0,0 +1,346 @@
+//
+// Copyright 2011-2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/stream.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/transport/vrt_if_packet.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <iostream>
+#include <vector>
+
+namespace uhd{ namespace transport{ namespace sph{
+
+/***********************************************************************
+ * Super send packet handler
+ *
+ * A send packet handler represents a group of channels.
+ * The channel group shares a common sample rate.
+ * All channels are sent in unison in send().
+ **********************************************************************/
+class send_packet_handler{
+public:
+ typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);
+ //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
+
+ /*!
+ * Make a new packet handler for send
+ * \param size the number of transport channels
+ */
+ send_packet_handler(const size_t size = 1):
+ _next_packet_seq(0)
+ {
+ this->resize(size);
+ }
+
+ ~send_packet_handler(void){
+ _task_barrier.interrupt();
+ _task_handlers.clear();
+ }
+
+ //! Resize the number of transport channels
+ void resize(const size_t size){
+ if (this->size() == size) return;
+ _task_handlers.clear();
+ _props.resize(size);
+ static const boost::uint64_t zero = 0;
+ _zero_buffs.resize(size, &zero);
+ _task_barrier.resize(size);
+ _task_handlers.resize(size);
+ for (size_t i = 1/*skip 0*/; i < size; i++){
+ _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
+ };
+ }
+
+ //! Get the channel width of this handler
+ size_t size(void) const{
+ return _props.size();
+ }
+
+ //! Setup the vrt packer function and offset
+ void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){
+ _vrt_packer = vrt_packer;
+ _header_offset_words32 = header_offset_words32;
+ }
+
+ //! Set the stream ID for a specific channel (or no SID)
+ void set_xport_chan_sid(const size_t xport_chan, const bool has_sid, const boost::uint32_t sid = 0){
+ _props.at(xport_chan).has_sid = has_sid;
+ _props.at(xport_chan).sid = sid;
+ }
+
+ //! Set the rate of ticks per second
+ void set_tick_rate(const double rate){
+ _tick_rate = rate;
+ }
+
+ //! Set the rate of samples per second
+ void set_samp_rate(const double rate){
+ _samp_rate = rate;
+ }
+
+ /*!
+ * Set the function to get a managed buffer.
+ * \param xport_chan which transport channel
+ * \param get_buff the getter function
+ */
+ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
+ _props.at(xport_chan).get_buff = get_buff;
+ }
+
+ //! Set the conversion routine for all channels
+ void set_converter(const uhd::convert::id_type &id){
+ _num_inputs = id.num_inputs;
+ _converter = uhd::convert::get_converter(id)();
+ this->set_scale_factor(32767.); //update after setting converter
+ _bytes_per_otw_item = uhd::convert::get_bytes_per_item(id.output_format);
+ _bytes_per_cpu_item = uhd::convert::get_bytes_per_item(id.input_format);
+ }
+
+ /*!
+ * Set the maximum number of samples per host packet.
+ * Ex: A USRP1 in dual channel mode would be half.
+ * \param num_samps the maximum samples in a packet
+ */
+ void set_max_samples_per_packet(const size_t num_samps){
+ _max_samples_per_packet = num_samps;
+ }
+
+ //! Set the scale factor used in float conversion
+ void set_scale_factor(const double scale_factor){
+ _converter->set_scalar(scale_factor);
+ }
+
+ /*******************************************************************
+ * Send:
+ * The entry point for the fast-path send calls.
+ * Dispatch into combinations of single packet send calls.
+ ******************************************************************/
+ UHD_INLINE size_t send(
+ const uhd::tx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t &metadata,
+ const double timeout
+ ){
+ //translate the metadata to vrt if packet info
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ //if_packet_info.has_sid = false; //set per channel
+ if_packet_info.has_cid = false;
+ if_packet_info.has_tlr = true;
+ if_packet_info.has_tsi = false;
+ if_packet_info.has_tsf = metadata.has_time_spec;
+ if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate);
+ if_packet_info.sob = metadata.start_of_burst;
+ if_packet_info.eob = metadata.end_of_burst;
+
+ if (nsamps_per_buff <= _max_samples_per_packet){
+
+ //TODO remove this code when sample counts of zero are supported by hardware
+ #ifndef SSPH_DONT_PAD_TO_ONE
+ if (nsamps_per_buff == 0) return send_one_packet(
+ _zero_buffs, 1, if_packet_info, timeout
+ ) & 0x0;
+ #endif
+
+ return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout);
+ }
+ size_t total_num_samps_sent = 0;
+
+ //false until final fragment
+ if_packet_info.eob = false;
+
+ const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet;
+ const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1;
+
+ //loop through the following fragment indexes
+ for (size_t i = 0; i < num_fragments; i++){
+
+ //send a fragment with the helper function
+ const size_t num_samps_sent = send_one_packet(
+ buffs, _max_samples_per_packet,
+ if_packet_info, timeout,
+ total_num_samps_sent*_bytes_per_cpu_item
+ );
+ total_num_samps_sent += num_samps_sent;
+ if (num_samps_sent == 0) return total_num_samps_sent;
+
+ //setup metadata for the next fragment
+ const time_spec_t time_spec = metadata.time_spec + time_spec_t::from_ticks(total_num_samps_sent, _samp_rate);
+ if_packet_info.tsf = time_spec.to_ticks(_tick_rate);
+ if_packet_info.sob = false;
+
+ }
+
+ //send the final fragment with the helper function
+ if_packet_info.eob = metadata.end_of_burst;
+ return total_num_samps_sent + send_one_packet(
+ buffs, final_length,
+ if_packet_info, timeout,
+ total_num_samps_sent*_bytes_per_cpu_item
+ );
+ }
+
+private:
+
+ vrt_packer_type _vrt_packer;
+ size_t _header_offset_words32;
+ double _tick_rate, _samp_rate;
+ struct xport_chan_props_type{
+ xport_chan_props_type(void):has_sid(false){}
+ get_buff_type get_buff;
+ bool has_sid;
+ boost::uint32_t sid;
+ managed_send_buffer::sptr buff;
+ };
+ std::vector<xport_chan_props_type> _props;
+ size_t _num_inputs;
+ size_t _bytes_per_otw_item; //used in conversion
+ size_t _bytes_per_cpu_item; //used in conversion
+ uhd::convert::converter::sptr _converter; //used in conversion
+ size_t _max_samples_per_packet;
+ std::vector<const void *> _zero_buffs;
+ size_t _next_packet_seq;
+
+ /*******************************************************************
+ * Send a single packet:
+ ******************************************************************/
+ UHD_INLINE size_t send_one_packet(
+ const uhd::tx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ vrt::if_packet_info_t &if_packet_info,
+ const double timeout,
+ const size_t buffer_offset_bytes = 0
+ ){
+ //load the rest of the if_packet_info in here
+ if_packet_info.num_payload_bytes = nsamps_per_buff*_num_inputs*_bytes_per_otw_item;
+ if_packet_info.num_payload_words32 = (if_packet_info.num_payload_bytes + 3/*round up*/)/sizeof(boost::uint32_t);
+ if_packet_info.packet_count = _next_packet_seq;
+
+ //get a buffer for each channel or timeout
+ BOOST_FOREACH(xport_chan_props_type &props, _props){
+ if (not props.buff) props.buff = props.get_buff(timeout);
+ if (not props.buff) return 0; //timeout
+ }
+
+ //setup the data to share with converter threads
+ _convert_nsamps = nsamps_per_buff;
+ _convert_buffs = &buffs;
+ _convert_buffer_offset_bytes = buffer_offset_bytes;
+ _convert_if_packet_info = &if_packet_info;
+
+ //perform N channels of conversion
+ converter_thread_task(0);
+
+ _next_packet_seq++; //increment sequence after commits
+ return nsamps_per_buff;
+ }
+
+ /*******************************************************************
+ * Perform one thread's work of the conversion task.
+ * The entry and exit use a dual synchronization barrier,
+ * to wait for data to become ready and block until completion.
+ ******************************************************************/
+ UHD_INLINE void converter_thread_task(const size_t index)
+ {
+ _task_barrier.wait();
+
+ //shortcut references to local data structures
+ managed_send_buffer::sptr &buff = _props[index].buff;
+ vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
+ const tx_streamer::buffs_type &buffs = *_convert_buffs;
+
+ //fill IO buffs with pointers into the output buffer
+ const void *io_buffs[4/*max interleave*/];
+ for (size_t i = 0; i < _num_inputs; i++){
+ const char *b = reinterpret_cast<const char *>(buffs[index*_num_inputs + i]);
+ io_buffs[i] = b + _convert_buffer_offset_bytes;
+ }
+ const ref_vector<const void *> in_buffs(io_buffs, _num_inputs);
+
+ //pack metadata into a vrt header
+ boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
+ if_packet_info.has_sid = _props[index].has_sid;
+ if_packet_info.sid = _props[index].sid;
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //perform the conversion operation
+ _converter->conv(in_buffs, otw_mem, _convert_nsamps);
+
+ //commit the samples to the zero-copy interface
+ const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
+ buff->commit(num_vita_words32*sizeof(boost::uint32_t));
+ buff.reset(); //effectively a release
+
+ if (index == 0) _task_barrier.wait_others();
+ }
+
+ //! Shared variables for the worker threads
+ reusable_barrier _task_barrier;
+ std::vector<task::sptr> _task_handlers;
+ size_t _convert_nsamps;
+ const tx_streamer::buffs_type *_convert_buffs;
+ size_t _convert_buffer_offset_bytes;
+ vrt::if_packet_info_t *_convert_if_packet_info;
+
+};
+
+class send_packet_streamer : public send_packet_handler, public tx_streamer{
+public:
+ send_packet_streamer(const size_t max_num_samps){
+ _max_num_samps = max_num_samps;
+ this->set_max_samples_per_packet(_max_num_samps);
+ }
+
+ size_t get_num_channels(void) const{
+ return this->size();
+ }
+
+ size_t get_max_num_samps(void) const{
+ return _max_num_samps;
+ }
+
+ size_t send(
+ const tx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t &metadata,
+ const double timeout
+ ){
+ return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);
+ }
+
+private:
+ size_t _max_num_samps;
+};
+
+}}} //namespace
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/udp_common.hpp b/host/lib/transport/udp_common.hpp
new file mode 100644
index 000000000..bf4712613
--- /dev/null
+++ b/host/lib/transport/udp_common.hpp
@@ -0,0 +1,60 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP
+
+#include <uhd/config.hpp>
+#include <boost/asio.hpp>
+
+namespace uhd{ namespace transport{
+
+ typedef boost::shared_ptr<boost::asio::ip::udp::socket> socket_sptr;
+
+ /*!
+ * Wait for the socket to become ready for a receive operation.
+ * \param sock_fd the open socket file descriptor
+ * \param timeout the timeout duration in seconds
+ * \return true when the socket is ready for receive
+ */
+ UHD_INLINE bool wait_for_recv_ready(int sock_fd, double timeout){
+ //setup timeval for timeout
+ timeval tv;
+ //If the tv_usec > 1 second on some platforms, select will
+ //error EINVAL: An invalid timeout interval was specified.
+ tv.tv_sec = int(timeout);
+ tv.tv_usec = int(timeout*1000000)%1000000;
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(sock_fd, &rset);
+
+ //http://www.gnu.org/s/hello/manual/libc/Interrupted-Primitives.html
+ //This macro is provided with gcc to properly deal with EINTR.
+ //If not provided, define an empty macro, assume that is OK
+ #ifndef TEMP_FAILURE_RETRY
+ #define TEMP_FAILURE_RETRY(x) (x)
+ #endif
+
+ //call select with timeout on receive socket
+ return TEMP_FAILURE_RETRY(::select(sock_fd+1, &rset, NULL, NULL, &tv)) > 0;
+ }
+
+}} //namespace uhd::transport
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp
new file mode 100644
index 000000000..d6c55eae7
--- /dev/null
+++ b/host/lib/transport/udp_simple.cpp
@@ -0,0 +1,134 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "udp_common.hpp"
+#include <uhd/transport/udp_simple.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/format.hpp>
+
+using namespace uhd::transport;
+namespace asio = boost::asio;
+
+/***********************************************************************
+ * UDP simple implementation: connected and broadcast
+ **********************************************************************/
+class udp_simple_impl : public udp_simple{
+public:
+ udp_simple_impl(
+ const std::string &addr, const std::string &port, bool bcast, bool connect
+ ):_connected(connect){
+ UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
+
+ //resolve the address
+ asio::ip::udp::resolver resolver(_io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ _send_endpoint = *resolver.resolve(query);
+
+ //create and open the socket
+ _socket = socket_sptr(new asio::ip::udp::socket(_io_service));
+ _socket->open(asio::ip::udp::v4());
+
+ //allow broadcasting
+ _socket->set_option(asio::socket_base::broadcast(bcast));
+
+ //connect the socket
+ if (connect) _socket->connect(_send_endpoint);
+
+ }
+
+ size_t send(const asio::const_buffer &buff){
+ if (_connected) return _socket->send(asio::buffer(buff));
+ return _socket->send_to(asio::buffer(buff), _send_endpoint);
+ }
+
+ size_t recv(const asio::mutable_buffer &buff, double timeout){
+ if (not wait_for_recv_ready(_socket->native(), timeout)) return 0;
+ return _socket->receive_from(asio::buffer(buff), _recv_endpoint);
+ }
+
+ std::string get_recv_addr(void){
+ return _recv_endpoint.address().to_string();
+ }
+
+private:
+ bool _connected;
+ asio::io_service _io_service;
+ socket_sptr _socket;
+ asio::ip::udp::endpoint _send_endpoint;
+ asio::ip::udp::endpoint _recv_endpoint;
+};
+
+/***********************************************************************
+ * UDP public make functions
+ **********************************************************************/
+udp_simple::sptr udp_simple::make_connected(
+ const std::string &addr, const std::string &port
+){
+ return sptr(new udp_simple_impl(addr, port, false, true /* no bcast, connect */));
+}
+
+udp_simple::sptr udp_simple::make_broadcast(
+ const std::string &addr, const std::string &port
+){
+ return sptr(new udp_simple_impl(addr, port, true, false /* bcast, no connect */));
+}
+
+/***********************************************************************
+ * Simple UART over UDP
+ **********************************************************************/
+#include <boost/thread/thread.hpp>
+class udp_simple_uart_impl : public uhd::uart_iface{
+public:
+ udp_simple_uart_impl(udp_simple::sptr udp){
+ _udp = udp;
+ _len = 0;
+ _off = 0;
+ this->write_uart(""); //send an empty packet to init
+ }
+
+ void write_uart(const std::string &buf){
+ _udp->send(asio::buffer(buf));
+ }
+
+ std::string read_uart(double timeout){
+ std::string line;
+ const boost::system_time exit_time = boost::get_system_time() + boost::posix_time::milliseconds(long(timeout*1000));
+ do{
+ //drain anything in current buffer
+ while (_off < _len){
+ const char ch = _buf[_off]; _off++;
+ line += std::string(1, ch);
+ if (ch == '\n' or ch == '\r') return line;
+ }
+
+ //recv a new packet into the buffer
+ _len = _udp->recv(asio::buffer(_buf), std::max((exit_time - boost::get_system_time()).total_milliseconds()/1000., 0.0));
+ _off = 0;
+
+ } while (_len != 0);
+ return line;
+ }
+
+private:
+ udp_simple::sptr _udp;
+ size_t _len, _off;
+ boost::uint8_t _buf[udp_simple::mtu];
+};
+
+uhd::uart_iface::sptr udp_simple::make_uart(sptr udp){
+ return uart_iface::sptr(new udp_simple_uart_impl(udp));
+}
diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp
new file mode 100644
index 000000000..6fe4e3cad
--- /dev/null
+++ b/host/lib/transport/udp_wsa_zero_copy.cpp
@@ -0,0 +1,300 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "udp_common.hpp"
+#include <uhd/transport/udp_zero_copy.hpp>
+#include <uhd/transport/udp_simple.hpp> //mtu
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <boost/format.hpp>
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::transport;
+namespace asio = boost::asio;
+
+//A reasonable number of frames for send/recv and async/sync
+static const size_t DEFAULT_NUM_FRAMES = 32;
+
+/***********************************************************************
+ * Check registry for correct fast-path setting (windows only)
+ **********************************************************************/
+#ifdef HAVE_ATLBASE_H
+#define CHECK_REG_SEND_THRESH
+#include <atlbase.h> //CRegKey
+static void check_registry_for_fast_send_threshold(const size_t mtu){
+ static bool warned = false;
+ if (warned) return; //only allow one printed warning per process
+
+ CRegKey reg_key;
+ DWORD threshold = 1024; //system default when threshold is not specified
+ if (
+ reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or
+ reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu
+ ){
+ UHD_MSG(warning) << boost::format(
+ "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n"
+ "This will negatively affect the transmit performance.\n"
+ "See the transport application notes for more detail.\n"
+ ) % mtu % threshold << std::endl;
+ warned = true;
+ }
+ reg_key.Close();
+}
+#endif /*HAVE_ATLBASE_H*/
+
+/***********************************************************************
+ * Static initialization to take care of WSA init and cleanup
+ **********************************************************************/
+struct uhd_wsa_control{
+ uhd_wsa_control(void){
+ WSADATA wsaData;
+ WSAStartup(MAKEWORD(2, 2), &wsaData); /*windows socket startup */
+ }
+
+ ~uhd_wsa_control(void){
+ WSACleanup();
+ }
+};
+
+/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - Initialize with memory and a release callback.
+ * - Call get new with a length in bytes to re-use.
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+ udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
+ _sock_fd(sock_fd), _frame_size(frame_size)
+ {
+ _wsa_buff.buf = reinterpret_cast<char *>(mem);
+ ZeroMemory(&_overlapped, sizeof(_overlapped));
+ _overlapped.hEvent = WSACreateEvent();
+ UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT);
+ this->release(); //makes buffer available via get_new
+ }
+
+ ~udp_zero_copy_asio_mrb(void){
+ WSACloseEvent(_overlapped.hEvent);
+ }
+
+ void release(void){
+ _wsa_buff.len = _frame_size;
+ _flags = 0;
+ WSARecv(_sock_fd, &_wsa_buff, 1, &_wsa_buff.len, &_flags, &_overlapped, NULL);
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ const DWORD result = WSAWaitForMultipleEvents(
+ 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true
+ );
+ if (result == WSA_WAIT_TIMEOUT) return managed_recv_buffer::sptr();
+ index++; //advances the caller's buffer
+
+ WSAGetOverlappedResult(_sock_fd, &_overlapped, &_wsa_buff.len, true, &_flags);
+
+ WSAResetEvent(_overlapped.hEvent);
+ return make(this, _wsa_buff.buf, _wsa_buff.len);
+ }
+
+private:
+ int _sock_fd;
+ const size_t _frame_size;
+ WSAOVERLAPPED _overlapped;
+ WSABUF _wsa_buff;
+ DWORD _flags;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - committing the buffer calls the asynchronous socket send
+ * - getting a new buffer performs the blocking wait for completion
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+ udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
+ _sock_fd(sock_fd), _frame_size(frame_size)
+ {
+ _wsa_buff.buf = reinterpret_cast<char *>(mem);
+ ZeroMemory(&_overlapped, sizeof(_overlapped));
+ _overlapped.hEvent = WSACreateEvent();
+ UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT);
+ WSASetEvent(_overlapped.hEvent); //makes buffer available via get_new
+ }
+
+ ~udp_zero_copy_asio_msb(void){
+ WSACloseEvent(_overlapped.hEvent);
+ }
+
+ void release(void){
+ _wsa_buff.len = size();
+ WSASend(_sock_fd, &_wsa_buff, 1, NULL, 0, &_overlapped, NULL);
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ const DWORD result = WSAWaitForMultipleEvents(
+ 1, &_overlapped.hEvent, true, DWORD(timeout*1000), true
+ );
+ if (result == WSA_WAIT_TIMEOUT) return managed_send_buffer::sptr();
+ index++; //advances the caller's buffer
+
+ WSAResetEvent(_overlapped.hEvent);
+ _wsa_buff.len = _frame_size;
+ return make(this, _wsa_buff.buf, _wsa_buff.len);
+ }
+
+private:
+ int _sock_fd;
+ const size_t _frame_size;
+ WSAOVERLAPPED _overlapped;
+ WSABUF _wsa_buff;
+};
+
+/***********************************************************************
+ * Zero Copy UDP implementation with WSA:
+ *
+ * This is not a true zero copy implementation as each
+ * send and recv requires a copy operation to/from userspace.
+ *
+ * For receive, use a blocking recv() call on the socket.
+ * This has better performance than the overlapped IO.
+ * For send, use overlapped IO to submit async sends.
+ **********************************************************************/
+class udp_zero_copy_wsa_impl : public udp_zero_copy{
+public:
+ typedef boost::shared_ptr<udp_zero_copy_wsa_impl> sptr;
+
+ udp_zero_copy_wsa_impl(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+ ):
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _next_recv_buff_index(0), _next_send_buff_index(0)
+ {
+ #ifdef CHECK_REG_SEND_THRESH
+ check_registry_for_fast_send_threshold(this->get_send_frame_size());
+ #endif /*CHECK_REG_SEND_THRESH*/
+
+ UHD_MSG(status) << boost::format("Creating WSA UDP transport for %s:%s") % addr % port << std::endl;
+ static uhd_wsa_control uhd_wsa; //makes wsa start happen via lazy initialization
+
+ UHD_ASSERT_THROW(_num_send_frames <= WSA_MAXIMUM_WAIT_EVENTS);
+
+ //resolve the address
+ asio::io_service io_service;
+ asio::ip::udp::resolver resolver(io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ //create the socket
+ _sock_fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED);
+ if (_sock_fd == INVALID_SOCKET){
+ const DWORD error = WSAGetLastError();
+ throw uhd::os_error(str(boost::format("WSASocket() failed with error %d") % error));
+ }
+
+ //set the socket non-blocking for recv
+ //u_long mode = 1;
+ //ioctlsocket(_sock_fd, FIONBIO, &mode);
+
+ //resize the socket buffers
+ const int recv_buff_size = int(hints.cast<double>("recv_buff_size", 0.0));
+ const int send_buff_size = int(hints.cast<double>("send_buff_size", 0.0));
+ if (recv_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (const char *)&recv_buff_size, sizeof(recv_buff_size));
+ if (send_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (const char *)&send_buff_size, sizeof(send_buff_size));
+
+ //connect the socket so we can send/recv
+ const asio::ip::udp::endpoint::data_type &servaddr = *receiver_endpoint.data();
+ if (WSAConnect(_sock_fd, (const struct sockaddr *)&servaddr, sizeof(servaddr), NULL, NULL, NULL, NULL) != 0){
+ const DWORD error = WSAGetLastError();
+ closesocket(_sock_fd);
+ throw uhd::os_error(str(boost::format("WSAConnect() failed with error %d") % error));
+ }
+
+ //allocate re-usable managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+ _mrb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_mrb>(
+ new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size())
+ ));
+ }
+
+ //allocate re-usable managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+ _msb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_msb>(
+ new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), _sock_fd, get_send_frame_size())
+ ));
+ }
+ }
+
+ ~udp_zero_copy_wsa_impl(void){
+ closesocket(_sock_fd);
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
+ }
+
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+ size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+
+ /*******************************************************************
+ * Send implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
+ }
+
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
+ //memory management -> buffers and fifos
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
+
+ //socket guts
+ SOCKET _sock_fd;
+};
+
+/***********************************************************************
+ * UDP zero copy make function
+ **********************************************************************/
+udp_zero_copy::sptr udp_zero_copy::make(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+){
+ return sptr(new udp_zero_copy_wsa_impl(addr, port, hints));
+}
diff --git a/host/lib/transport/udp_zero_copy.cpp b/host/lib/transport/udp_zero_copy.cpp
new file mode 100644
index 000000000..166177177
--- /dev/null
+++ b/host/lib/transport/udp_zero_copy.cpp
@@ -0,0 +1,304 @@
+//
+// Copyright 2010-2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include "udp_common.hpp"
+#include <uhd/transport/udp_zero_copy.hpp>
+#include <uhd/transport/udp_simple.hpp> //mtu
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/atomic.hpp>
+#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp> //sleep
+#include <vector>
+
+using namespace uhd;
+using namespace uhd::transport;
+namespace asio = boost::asio;
+
+//A reasonable number of frames for send/recv and async/sync
+static const size_t DEFAULT_NUM_FRAMES = 32;
+
+/***********************************************************************
+ * Check registry for correct fast-path setting (windows only)
+ **********************************************************************/
+#ifdef HAVE_ATLBASE_H
+#define CHECK_REG_SEND_THRESH
+#include <atlbase.h> //CRegKey
+static void check_registry_for_fast_send_threshold(const size_t mtu){
+ static bool warned = false;
+ if (warned) return; //only allow one printed warning per process
+
+ CRegKey reg_key;
+ DWORD threshold = 1024; //system default when threshold is not specified
+ if (
+ reg_key.Open(HKEY_LOCAL_MACHINE, "System\\CurrentControlSet\\Services\\AFD\\Parameters", KEY_READ) != ERROR_SUCCESS or
+ reg_key.QueryDWORDValue("FastSendDatagramThreshold", threshold) != ERROR_SUCCESS or threshold < mtu
+ ){
+ UHD_MSG(warning) << boost::format(
+ "The MTU (%d) is larger than the FastSendDatagramThreshold (%d)!\n"
+ "This will negatively affect the transmit performance.\n"
+ "See the transport application notes for more detail.\n"
+ ) % mtu % threshold << std::endl;
+ warned = true;
+ }
+ reg_key.Close();
+}
+#endif /*HAVE_ATLBASE_H*/
+
+/***********************************************************************
+ * Reusable managed receiver buffer:
+ * - get_new performs the recv operation
+ **********************************************************************/
+class udp_zero_copy_asio_mrb : public managed_recv_buffer{
+public:
+ udp_zero_copy_asio_mrb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+
+ void release(void){
+ _claimer.release();
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
+
+ #ifdef MSG_DONTWAIT //try a non-blocking recv() if supported
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, MSG_DONTWAIT);
+ if (_len > 0){
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+ #endif
+
+ if (wait_for_recv_ready(_sock_fd, timeout)){
+ _len = ::recv(_sock_fd, (char *)_mem, _frame_size, 0);
+ index++; //advances the caller's buffer
+ return make(this, _mem, size_t(_len));
+ }
+
+ _claimer.release(); //undo claim
+ return sptr(); //null for timeout
+ }
+
+private:
+ void *_mem;
+ int _sock_fd;
+ size_t _frame_size;
+ ssize_t _len;
+ simple_claimer _claimer;
+};
+
+/***********************************************************************
+ * Reusable managed send buffer:
+ * - commit performs the send operation
+ **********************************************************************/
+class udp_zero_copy_asio_msb : public managed_send_buffer{
+public:
+ udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size):
+ _mem(mem), _sock_fd(sock_fd), _frame_size(frame_size) { /*NOP*/ }
+
+ void release(void){
+ //Retry logic because send may fail with ENOBUFS.
+ //This is known to occur at least on some OSX systems.
+ //But it should be safe to always check for the error.
+ while (true)
+ {
+ const ssize_t ret = ::send(_sock_fd, (const char *)_mem, size(), 0);
+ if (ret == ssize_t(size())) break;
+ if (ret == -1 and errno == ENOBUFS)
+ {
+ boost::this_thread::sleep(boost::posix_time::microseconds(1));
+ continue; //try to send again
+ }
+ UHD_ASSERT_THROW(ret == ssize_t(size()));
+ }
+ _claimer.release();
+ }
+
+ UHD_INLINE sptr get_new(const double timeout, size_t &index){
+ if (not _claimer.claim_with_wait(timeout)) return sptr();
+ index++; //advances the caller's buffer
+ return make(this, _mem, _frame_size);
+ }
+
+private:
+ void *_mem;
+ int _sock_fd;
+ size_t _frame_size;
+ simple_claimer _claimer;
+};
+
+/***********************************************************************
+ * Zero Copy UDP implementation with ASIO:
+ * This is the portable zero copy implementation for systems
+ * where a faster, platform specific solution is not available.
+ * However, it is not a true zero copy implementation as each
+ * send and recv requires a copy operation to/from userspace.
+ **********************************************************************/
+class udp_zero_copy_asio_impl : public udp_zero_copy{
+public:
+ typedef boost::shared_ptr<udp_zero_copy_asio_impl> sptr;
+
+ udp_zero_copy_asio_impl(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+ ):
+ _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
+ _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))),
+ _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))),
+ _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)),
+ _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)),
+ _next_recv_buff_index(0), _next_send_buff_index(0)
+ {
+ UHD_LOG << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
+
+ #ifdef CHECK_REG_SEND_THRESH
+ check_registry_for_fast_send_threshold(this->get_send_frame_size());
+ #endif /*CHECK_REG_SEND_THRESH*/
+
+ //resolve the address
+ asio::ip::udp::resolver resolver(_io_service);
+ asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port);
+ asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query);
+
+ //create, open, and connect the socket
+ _socket = socket_sptr(new asio::ip::udp::socket(_io_service));
+ _socket->open(asio::ip::udp::v4());
+ _socket->connect(receiver_endpoint);
+ _sock_fd = _socket->native();
+
+ //allocate re-usable managed receive buffers
+ for (size_t i = 0; i < get_num_recv_frames(); i++){
+ _mrb_pool.push_back(boost::make_shared<udp_zero_copy_asio_mrb>(
+ _recv_buffer_pool->at(i), _sock_fd, get_recv_frame_size()
+ ));
+ }
+
+ //allocate re-usable managed send buffers
+ for (size_t i = 0; i < get_num_send_frames(); i++){
+ _msb_pool.push_back(boost::make_shared<udp_zero_copy_asio_msb>(
+ _send_buffer_pool->at(i), _sock_fd, get_send_frame_size()
+ ));
+ }
+ }
+
+ //get size for internal socket buffer
+ template <typename Opt> size_t get_buff_size(void) const{
+ Opt option;
+ _socket->get_option(option);
+ return option.value();
+ }
+
+ //set size for internal socket buffer
+ template <typename Opt> size_t resize_buff(size_t num_bytes){
+ Opt option(num_bytes);
+ _socket->set_option(option);
+ return get_buff_size<Opt>();
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ if (_next_recv_buff_index == _num_recv_frames) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(timeout, _next_recv_buff_index);
+ }
+
+ size_t get_num_recv_frames(void) const {return _num_recv_frames;}
+ size_t get_recv_frame_size(void) const {return _recv_frame_size;}
+
+ /*******************************************************************
+ * Send implementation:
+ * Block on the managed buffer's get call and advance the index.
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0;
+ return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index);
+ }
+
+ size_t get_num_send_frames(void) const {return _num_send_frames;}
+ size_t get_send_frame_size(void) const {return _send_frame_size;}
+
+private:
+ //memory management -> buffers and fifos
+ const size_t _recv_frame_size, _num_recv_frames;
+ const size_t _send_frame_size, _num_send_frames;
+ buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool;
+ std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool;
+ size_t _next_recv_buff_index, _next_send_buff_index;
+
+ //asio guts -> socket and service
+ asio::io_service _io_service;
+ socket_sptr _socket;
+ int _sock_fd;
+};
+
+/***********************************************************************
+ * UDP zero copy make function
+ **********************************************************************/
+template<typename Opt> static void resize_buff_helper(
+ udp_zero_copy_asio_impl::sptr udp_trans,
+ const size_t target_size,
+ const std::string &name
+){
+ std::string help_message;
+ #if defined(UHD_PLATFORM_LINUX)
+ help_message = str(boost::format(
+ "Please run: sudo sysctl -w net.core.%smem_max=%d\n"
+ ) % ((name == "recv")?"r":"w") % target_size);
+ #endif /*defined(UHD_PLATFORM_LINUX)*/
+
+ //resize the buffer if size was provided
+ if (target_size > 0){
+ size_t actual_size = udp_trans->resize_buff<Opt>(target_size);
+ UHD_LOG << boost::format(
+ "Target %s sock buff size: %d bytes\n"
+ "Actual %s sock buff size: %d bytes"
+ ) % name % target_size % name % actual_size << std::endl;
+ if (actual_size < target_size) UHD_MSG(warning) << boost::format(
+ "The %s buffer could not be resized sufficiently.\n"
+ "Target sock buff size: %d bytes.\n"
+ "Actual sock buff size: %d bytes.\n"
+ "See the transport application notes on buffer resizing.\n%s"
+ ) % name % target_size % actual_size % help_message;
+ }
+}
+
+udp_zero_copy::sptr udp_zero_copy::make(
+ const std::string &addr,
+ const std::string &port,
+ const device_addr_t &hints
+){
+ udp_zero_copy_asio_impl::sptr udp_trans(
+ new udp_zero_copy_asio_impl(addr, port, hints)
+ );
+
+ //extract buffer size hints from the device addr
+ size_t recv_buff_size = size_t(hints.cast<double>("recv_buff_size", 0.0));
+ size_t send_buff_size = size_t(hints.cast<double>("send_buff_size", 0.0));
+
+ //call the helper to resize send and recv buffers
+ resize_buff_helper<asio::socket_base::receive_buffer_size>(udp_trans, recv_buff_size, "recv");
+ resize_buff_helper<asio::socket_base::send_buffer_size> (udp_trans, send_buff_size, "send");
+
+ return udp_trans;
+}
diff --git a/host/lib/transport/usb_dummy_impl.cpp b/host/lib/transport/usb_dummy_impl.cpp
new file mode 100644
index 000000000..7be753f76
--- /dev/null
+++ b/host/lib/transport/usb_dummy_impl.cpp
@@ -0,0 +1,38 @@
+//
+// Copyright 2010-2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/usb_device_handle.hpp>
+#include <uhd/transport/usb_control.hpp>
+#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/exception.hpp>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+std::vector<usb_device_handle::sptr> usb_device_handle::get_device_list(boost::uint16_t, boost::uint16_t){
+ return std::vector<usb_device_handle::sptr>(); //empty list
+}
+
+usb_control::sptr usb_control::make(usb_device_handle::sptr, const size_t){
+ throw uhd::not_implemented_error("no usb support -> usb_control::make not implemented");
+}
+
+usb_zero_copy::sptr usb_zero_copy::make(
+ usb_device_handle::sptr, const size_t, const size_t, const size_t, const size_t, const device_addr_t &
+){
+ throw uhd::not_implemented_error("no usb support -> usb_zero_copy::make not implemented");
+}
diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp
new file mode 100644
index 000000000..d04244ca9
--- /dev/null
+++ b/host/lib/transport/usb_zero_copy_wrapper.cpp
@@ -0,0 +1,231 @@
+//
+// Copyright 2011-2012 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/usb_zero_copy.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/atomic.hpp>
+#include <boost/foreach.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/bind.hpp>
+#include <vector>
+#include <iostream>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+static const boost::posix_time::time_duration AUTOFLUSH_TIMEOUT(boost::posix_time::milliseconds(1));
+
+/***********************************************************************
+ * USB zero copy wrapper - managed receive buffer
+ **********************************************************************/
+class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{
+public:
+ usb_zero_copy_wrapper_mrb(void){/*NOP*/}
+
+ void release(void){
+ _mrb.reset(); //decrement ref count, other MRB's may hold a ref
+ _claimer.release();
+ }
+
+ UHD_INLINE sptr get_new(
+ managed_recv_buffer::sptr &mrb, size_t &offset_bytes,
+ const double timeout, size_t &index
+ ){
+ if (not mrb or not _claimer.claim_with_wait(timeout)) return sptr();
+
+ index++; //advances the caller's buffer
+
+ //hold a copy of the buffer shared pointer
+ _mrb = mrb;
+
+ //extract this packet's memory address and length in bytes
+ char *mem = mrb->cast<char *>() + offset_bytes;
+ const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem);
+ const size_t words32 = (uhd::wtohx(mem32[0]) & 0xffff); //length in words32 (from VRT header)
+ const size_t len = words32*sizeof(boost::uint32_t); //length in bytes
+
+ //check if this receive buffer has been exhausted
+ offset_bytes += len;
+ if (offset_bytes >= mrb->size()) mrb.reset(); //drop caller's ref
+ else if (uhd::wtohx(mem32[words32]) == 0) mrb.reset();
+
+ return make(this, mem, len);
+ }
+
+private:
+ managed_recv_buffer::sptr _mrb;
+ simple_claimer _claimer;
+};
+
+/***********************************************************************
+ * USB zero copy wrapper - managed send buffer
+ **********************************************************************/
+class usb_zero_copy_wrapper_msb : public managed_send_buffer{
+public:
+ usb_zero_copy_wrapper_msb(const usb_zero_copy::sptr internal, const size_t fragmentation_size):
+ _internal(internal), _fragmentation_size(fragmentation_size)
+ {
+ _ok_to_auto_flush = false;
+ _task = uhd::task::make(boost::bind(&usb_zero_copy_wrapper_msb::auto_flush, this));
+ }
+
+ ~usb_zero_copy_wrapper_msb(void)
+ {
+ //ensure the task has exited before anything auto deconstructs
+ _task.reset();
+ }
+
+ void release(void){
+ boost::mutex::scoped_lock lock(_mutex);
+ _ok_to_auto_flush = true;
+
+ //get a reference to the VITA header before incrementing
+ const boost::uint32_t vita_header = reinterpret_cast<const boost::uint32_t *>(_mem_buffer_tip)[0];
+
+ _bytes_in_buffer += size();
+ _mem_buffer_tip += size();
+
+ //extract VITA end of packet flag, we must force flush under eof conditions
+ const bool eop = (uhd::wtohx(vita_header) & (0x1 << 24)) != 0;
+ const bool full = _bytes_in_buffer >= (_last_send_buff->size() - _fragmentation_size);
+ if (eop or full){
+ _last_send_buff->commit(_bytes_in_buffer);
+ _last_send_buff.reset();
+
+ //notify the auto-flusher to restart its timed_wait
+ lock.unlock(); _cond.notify_one();
+ }
+ }
+
+ UHD_INLINE sptr get_new(const double timeout){
+ boost::mutex::scoped_lock lock(_mutex);
+ _ok_to_auto_flush = false;
+
+ if (not _last_send_buff){
+ _last_send_buff = _internal->get_send_buff(timeout);
+ if (not _last_send_buff) return sptr();
+ _mem_buffer_tip = _last_send_buff->cast<char *>();
+ _bytes_in_buffer = 0;
+ }
+
+ return make(this, _mem_buffer_tip, _fragmentation_size);
+ }
+
+private:
+ usb_zero_copy::sptr _internal;
+ const size_t _fragmentation_size;
+ managed_send_buffer::sptr _last_send_buff;
+ size_t _bytes_in_buffer;
+ char *_mem_buffer_tip;
+
+ //private variables for auto flusher
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+ uhd::task::sptr _task;
+ bool _ok_to_auto_flush;
+
+ /*!
+ * The auto flusher ensures that buffers are force committed when
+ * the user has not called get_new() within a certain time window.
+ */
+ void auto_flush(void)
+ {
+ boost::mutex::scoped_lock lock(_mutex);
+ const bool timeout = not _cond.timed_wait(lock, AUTOFLUSH_TIMEOUT);
+ if (timeout and _ok_to_auto_flush and _last_send_buff and _bytes_in_buffer != 0)
+ {
+ _last_send_buff->commit(_bytes_in_buffer);
+ _last_send_buff.reset();
+ }
+ }
+};
+
+/***********************************************************************
+ * USB zero copy wrapper implementation
+ **********************************************************************/
+class usb_zero_copy_wrapper : public usb_zero_copy{
+public:
+ usb_zero_copy_wrapper(sptr usb_zc, const size_t frame_boundary):
+ _internal_zc(usb_zc),
+ _frame_boundary(frame_boundary),
+ _next_recv_buff_index(0)
+ {
+ for (size_t i = 0; i < this->get_num_recv_frames(); i++){
+ _mrb_pool.push_back(boost::make_shared<usb_zero_copy_wrapper_mrb>());
+ }
+ _the_only_msb = boost::make_shared<usb_zero_copy_wrapper_msb>(usb_zc, frame_boundary);
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout){
+ //attempt to get a managed recv buffer
+ if (not _last_recv_buff){
+ _last_recv_buff = _internal_zc->get_recv_buff(timeout);
+ _last_recv_offset = 0; //reset offset into buffer
+ }
+
+ //get the buffer to be returned to the user
+ if (_next_recv_buff_index == _mrb_pool.size()) _next_recv_buff_index = 0;
+ return _mrb_pool[_next_recv_buff_index]->get_new(
+ _last_recv_buff, _last_recv_offset, timeout, _next_recv_buff_index
+ );
+ }
+
+ size_t get_num_recv_frames(void) const{
+ return _internal_zc->get_num_recv_frames();
+ }
+
+ size_t get_recv_frame_size(void) const{
+ return std::min(_frame_boundary, _internal_zc->get_recv_frame_size());
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout){
+ return _the_only_msb->get_new(timeout);
+ }
+
+ size_t get_num_send_frames(void) const{
+ return _internal_zc->get_num_send_frames();
+ }
+
+ size_t get_send_frame_size(void) const{
+ return std::min(_frame_boundary, _internal_zc->get_send_frame_size());
+ }
+
+private:
+ sptr _internal_zc;
+ size_t _frame_boundary;
+ std::vector<boost::shared_ptr<usb_zero_copy_wrapper_mrb> > _mrb_pool;
+ boost::shared_ptr<usb_zero_copy_wrapper_msb> _the_only_msb;
+
+ //state for last recv buffer to create multiple managed buffers
+ managed_recv_buffer::sptr _last_recv_buff;
+ size_t _last_recv_offset;
+ size_t _next_recv_buff_index;
+};
+
+/***********************************************************************
+ * USB zero copy wrapper factory function
+ **********************************************************************/
+usb_zero_copy::sptr usb_zero_copy::make_wrapper(
+ sptr usb_zc, size_t usb_frame_boundary
+){
+ return sptr(new usb_zero_copy_wrapper(usb_zc, usb_frame_boundary));
+}