diff options
| -rw-r--r-- | host/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/docs/transport.rst | 42 | ||||
| -rw-r--r-- | host/lib/convert/CMakeLists.txt | 10 | ||||
| -rw-r--r-- | host/lib/convert/convert_fc32_with_sse2.cpp (renamed from host/lib/convert/convert_with_sse2.cpp) | 6 | ||||
| -rw-r--r-- | host/lib/convert/convert_fc64_with_sse2.cpp | 212 | ||||
| -rw-r--r-- | host/lib/transport/CMakeLists.txt | 10 | ||||
| -rw-r--r-- | host/lib/transport/udp_wsa_zero_copy.cpp | 281 | ||||
| -rw-r--r-- | host/lib/usrp/dboard/db_wbx_common.cpp | 40 | ||||
| -rw-r--r-- | host/lib/usrp/dboard/db_wbx_simple.cpp | 12 | 
9 files changed, 585 insertions, 30 deletions
diff --git a/host/CMakeLists.txt b/host/CMakeLists.txt index 7fa6dd36b..12c1cc179 100644 --- a/host/CMakeLists.txt +++ b/host/CMakeLists.txt @@ -129,7 +129,7 @@ IF(MSVC)      ENDIF(BOOST_ALL_DYN_LINK)  ENDIF(MSVC) -SET(Boost_ADDITIONAL_VERSIONS "1.42.0" "1.42" "1.43.0" "1.43" "1.44.0" "1.44" "1.45.0" "1.45" "1.46.0" "1.46") +SET(Boost_ADDITIONAL_VERSIONS "1.42.0" "1.42" "1.43.0" "1.43" "1.44.0" "1.44" "1.45.0" "1.45" "1.46.0" "1.46" "1.47.0" "1.47")  FIND_PACKAGE(Boost 1.36 COMPONENTS ${BOOST_REQUIRED_COMPONENTS})  INCLUDE_DIRECTORIES(${Boost_INCLUDE_DIRS}) diff --git a/host/docs/transport.rst b/host/docs/transport.rst index e7c2f1885..b601cd8ff 100644 --- a/host/docs/transport.rst +++ b/host/docs/transport.rst @@ -19,7 +19,10 @@ The transport parameters are defined below for the various transports in the UHD  ------------------------------------------------------------------------  UDP transport (sockets)  ------------------------------------------------------------------------ -The UDP transport is implemented with standard user-space/Berkeley sockets. +The UDP transport is implemented with user-space sockets: + +* **UNIX:** standard Berkeley sockets API using send()/recv() +* **Windows:** Windows Sockets API (WSA) using overlapped IO  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  Transport parameters @@ -31,9 +34,14 @@ The following parameters can be used to alter the transport's default behavior:  * **send_frame_size:** The size of a single send buffer in bytes  * **num_send_frames:** The number of send buffers to allocate -**Note1:** num_recv_frames and num_send_frames do not affect performance. +**Note1:** +num_recv_frames does not affect performance (all platforms). + +**Note2:** +num_send_frames does not affect performance (UNIX only). -**Note2:** recv_frame_size and send_frame_size can be used to +**Note3:** +recv_frame_size and send_frame_size can be used to  increase or decrease the maximum number of samples per packet.  The frame sizes default to an MTU of 1472 bytes per IP/UDP packet,  and may be increased if permitted by your network hardware. @@ -65,6 +73,25 @@ The following parameters can be used to alter socket's buffer sizes:  **Note:** Large send buffers tend to decrease transmit performance.  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Latency Optimization +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Latency is a measurement of the time it takes a sample to travel between the host and device. +Most computer hardware and software is bandwidth optimized which may negatively affect latency. +If your application has strict latency requirements, please consider the following notes: + +**Note1:** +The time taken by the device to populate a packet is proportional to the sample rate. +Therefore, to improve receive latency, configure the transport for a smaller frame size. + +**Note2:** +For overall latency improvements, +look for "Interrupt Coalescing" settings for your OS and ethernet chipset. +It seems the Intel ethernet chipsets offer fine-grained control in Linux. +Also, consult: + +* http://publib.boulder.ibm.com/infocenter/pseries/v5r3/index.jsp?topic=/com.ibm.aix.prftungd/doc/prftungd/interrupt_coal.htm + +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  Linux specific notes  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^  On linux, the maximum buffer sizes are capped by the sysctl values @@ -77,6 +104,15 @@ To change the maximum values, run the following commands:  Set the values permanently by editing */etc/sysctl.conf* +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Windows specific notes +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +On Windows, it is important to change the default UDP behavior such that +1500 byte packets still travel through the fast path of the sockets stack. +FastSendDatagramThreshold registry key to change documented here: + +* http://www.microsoft.com/windows/windowsmedia/howto/articles/optimize_web.aspx#appendix_e +  ------------------------------------------------------------------------  USB transport (libusb)  ------------------------------------------------------------------------ diff --git a/host/lib/convert/CMakeLists.txt b/host/lib/convert/CMakeLists.txt index 5f05b0cb8..e6e8ec088 100644 --- a/host/lib/convert/CMakeLists.txt +++ b/host/lib/convert/CMakeLists.txt @@ -36,13 +36,15 @@ CHECK_INCLUDE_FILE_CXX(emmintrin.h HAVE_EMMINTRIN_H)  UNSET(CMAKE_REQUIRED_FLAGS)  IF(HAVE_EMMINTRIN_H) +    SET(convert_with_sse2_sources +        ${CMAKE_CURRENT_SOURCE_DIR}/convert_fc32_with_sse2.cpp +        ${CMAKE_CURRENT_SOURCE_DIR}/convert_fc64_with_sse2.cpp +    )      SET_SOURCE_FILES_PROPERTIES( -        ${CMAKE_CURRENT_SOURCE_DIR}/convert_with_sse2.cpp +        ${convert_with_sse2_sources}          PROPERTIES COMPILE_FLAGS "${EMMINTRIN_FLAGS}"      ) -    LIBUHD_APPEND_SOURCES( -        ${CMAKE_CURRENT_SOURCE_DIR}/convert_with_sse2.cpp -    ) +    LIBUHD_APPEND_SOURCES(${convert_with_sse2_sources})  ENDIF(HAVE_EMMINTRIN_H)  ######################################################################## diff --git a/host/lib/convert/convert_with_sse2.cpp b/host/lib/convert/convert_fc32_with_sse2.cpp index 0649baab4..676e1561c 100644 --- a/host/lib/convert/convert_with_sse2.cpp +++ b/host/lib/convert/convert_fc32_with_sse2.cpp @@ -1,5 +1,5 @@  // -// Copyright 2011-2011 Ettus Research LLC +// Copyright 2011 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -33,7 +33,7 @@ DECLARE_CONVERTER(convert_fc32_1_to_item32_1_nswap, PRIORITY_CUSTOM){          __m128 tmplo = _mm_load ## _al_ ## ps(reinterpret_cast<const float *>(input+i+0)); \          __m128 tmphi = _mm_load ## _al_ ## ps(reinterpret_cast<const float *>(input+i+2)); \                                                                          \ -        /* convert and scale */ \ +        /* convert and scale */                                         \          __m128i tmpilo = _mm_cvtps_epi32(_mm_mul_ps(tmplo, scalar));    \          __m128i tmpihi = _mm_cvtps_epi32(_mm_mul_ps(tmphi, scalar));    \                                                                          \ @@ -76,7 +76,7 @@ DECLARE_CONVERTER(convert_fc32_1_to_item32_1_bswap, PRIORITY_CUSTOM){          __m128 tmplo = _mm_load ## _al_ ## ps(reinterpret_cast<const float *>(input+i+0)); \          __m128 tmphi = _mm_load ## _al_ ## ps(reinterpret_cast<const float *>(input+i+2)); \                                                                          \ -        /* convert and scale */ \ +        /* convert and scale */                                         \          __m128i tmpilo = _mm_cvtps_epi32(_mm_mul_ps(tmplo, scalar));    \          __m128i tmpihi = _mm_cvtps_epi32(_mm_mul_ps(tmphi, scalar));    \                                                                          \ diff --git a/host/lib/convert/convert_fc64_with_sse2.cpp b/host/lib/convert/convert_fc64_with_sse2.cpp new file mode 100644 index 000000000..4d28396a4 --- /dev/null +++ b/host/lib/convert/convert_fc64_with_sse2.cpp @@ -0,0 +1,212 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include "convert_common.hpp" +#include <uhd/utils/byteswap.hpp> +#include <emmintrin.h> + +using namespace uhd::convert; + +DECLARE_CONVERTER(convert_fc64_1_to_item32_1_nswap, PRIORITY_CUSTOM){ +    const fc64_t *input = reinterpret_cast<const fc64_t *>(inputs[0]); +    item32_t *output = reinterpret_cast<item32_t *>(outputs[0]); + +    const __m128d scalar = _mm_set1_pd(scale_factor); + +    #define convert_fc64_1_to_item32_1_nswap_guts(_al_)                 \ +    for (; i+4 < nsamps; i+=4){                                         \ +        /* load from input */                                           \ +        __m128d tmp0 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+0)); \ +        __m128d tmp1 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+1)); \ +        __m128d tmp2 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+2)); \ +        __m128d tmp3 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+3)); \ +                                                                        \ +        /* convert and scale */                                         \ +        __m128i tmpi0 = _mm_cvttpd_epi32(_mm_mul_pd(tmp0, scalar));     \ +        __m128i tmpi1 = _mm_cvttpd_epi32(_mm_mul_pd(tmp1, scalar));     \ +        __m128i tmpilo = _mm_unpacklo_epi64(tmpi0, tmpi1);              \ +        __m128i tmpi2 = _mm_cvttpd_epi32(_mm_mul_pd(tmp2, scalar));     \ +        __m128i tmpi3 = _mm_cvttpd_epi32(_mm_mul_pd(tmp3, scalar));     \ +        __m128i tmpihi = _mm_unpacklo_epi64(tmpi2, tmpi3);              \ +                                                                        \ +        /* pack + swap 16-bit pairs */                                  \ +        __m128i tmpi = _mm_packs_epi32(tmpilo, tmpihi);                 \ +        tmpi = _mm_shufflelo_epi16(tmpi, _MM_SHUFFLE(2, 3, 0, 1));      \ +        tmpi = _mm_shufflehi_epi16(tmpi, _MM_SHUFFLE(2, 3, 0, 1));      \ +                                                                        \ +        /* store to output */                                           \ +        _mm_storeu_si128(reinterpret_cast<__m128i *>(output+i), tmpi);  \ +    }                                                                   \ + +    size_t i = 0; + +    //dispatch according to alignment +    if ((size_t(input) & 0xf) == 0){ +        convert_fc64_1_to_item32_1_nswap_guts(_) +    } +    else{ +        convert_fc64_1_to_item32_1_nswap_guts(u_) +    } + +    //convert remainder +    for (; i < nsamps; i++){ +        output[i] = fc64_to_item32(input[i], scale_factor); +    } +} + +DECLARE_CONVERTER(convert_fc64_1_to_item32_1_bswap, PRIORITY_CUSTOM){ +    const fc64_t *input = reinterpret_cast<const fc64_t *>(inputs[0]); +    item32_t *output = reinterpret_cast<item32_t *>(outputs[0]); + +    const __m128d scalar = _mm_set1_pd(scale_factor); + +    #define convert_fc64_1_to_item32_1_bswap_guts(_al_)                 \ +    for (; i+4 < nsamps; i+=4){                                         \ +        /* load from input */                                           \ +        __m128d tmp0 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+0)); \ +        __m128d tmp1 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+1)); \ +        __m128d tmp2 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+2)); \ +        __m128d tmp3 = _mm_load ## _al_ ## pd(reinterpret_cast<const double *>(input+i+3)); \ +                                                                        \ +        /* convert and scale */                                         \ +        __m128i tmpi0 = _mm_cvttpd_epi32(_mm_mul_pd(tmp0, scalar));     \ +        __m128i tmpi1 = _mm_cvttpd_epi32(_mm_mul_pd(tmp1, scalar));     \ +        __m128i tmpilo = _mm_unpacklo_epi64(tmpi0, tmpi1);              \ +        __m128i tmpi2 = _mm_cvttpd_epi32(_mm_mul_pd(tmp2, scalar));     \ +        __m128i tmpi3 = _mm_cvttpd_epi32(_mm_mul_pd(tmp3, scalar));     \ +        __m128i tmpihi = _mm_unpacklo_epi64(tmpi2, tmpi3);              \ +                                                                        \ +        /* pack + byteswap -> byteswap 16 bit words */                  \ +        __m128i tmpi = _mm_packs_epi32(tmpilo, tmpihi);                 \ +        tmpi = _mm_or_si128(_mm_srli_epi16(tmpi, 8), _mm_slli_epi16(tmpi, 8)); \ +                                                                        \ +        /* store to output */                                           \ +        _mm_storeu_si128(reinterpret_cast<__m128i *>(output+i), tmpi);  \ +    }                                                                   \ + +    size_t i = 0; + +    //dispatch according to alignment +    if ((size_t(input) & 0xf) == 0){ +        convert_fc64_1_to_item32_1_bswap_guts(_) +    } +    else{ +        convert_fc64_1_to_item32_1_bswap_guts(u_) +    } + +    //convert remainder +    for (; i < nsamps; i++){ +        output[i] = uhd::byteswap(fc64_to_item32(input[i], scale_factor)); +    } +} + +DECLARE_CONVERTER(convert_item32_1_to_fc64_1_nswap, PRIORITY_CUSTOM){ +    const item32_t *input = reinterpret_cast<const item32_t *>(inputs[0]); +    fc64_t *output = reinterpret_cast<fc64_t *>(outputs[0]); + +    const __m128d scalar = _mm_set1_pd(scale_factor/(1 << 16)); +    const __m128i zeroi = _mm_setzero_si128(); + +    #define convert_item32_1_to_fc64_1_nswap_guts(_al_)                 \ +    for (; i+4 < nsamps; i+=4){                                         \ +        /* load from input */                                           \ +        __m128i tmpi = _mm_loadu_si128(reinterpret_cast<const __m128i *>(input+i)); \ +                                                                        \ +        /* unpack + swap 16-bit pairs */                                \ +        tmpi = _mm_shufflelo_epi16(tmpi, _MM_SHUFFLE(2, 3, 0, 1));      \ +        tmpi = _mm_shufflehi_epi16(tmpi, _MM_SHUFFLE(2, 3, 0, 1));      \ +        __m128i tmpilo = _mm_unpacklo_epi16(zeroi, tmpi); /* value in upper 16 bits */ \ +        __m128i tmpihi = _mm_unpackhi_epi16(zeroi, tmpi);               \ +                                                                        \ +        /* convert and scale */                                         \ +        __m128d tmp0 = _mm_mul_pd(_mm_cvtepi32_pd(tmpilo), scalar);     \ +        tmpilo = _mm_unpackhi_epi64(tmpilo, zeroi);                     \ +        __m128d tmp1 = _mm_mul_pd(_mm_cvtepi32_pd(tmpilo), scalar);     \ +        __m128d tmp2 = _mm_mul_pd(_mm_cvtepi32_pd(tmpihi), scalar);     \ +        tmpihi = _mm_unpackhi_epi64(tmpihi, zeroi);                     \ +        __m128d tmp3 = _mm_mul_pd(_mm_cvtepi32_pd(tmpihi), scalar);     \ +                                                                        \ +        /* store to output */                                           \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+0), tmp0); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+1), tmp1); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+2), tmp2); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+3), tmp3); \ +    }                                                                   \ + +    size_t i = 0; + +    //dispatch according to alignment +    if ((size_t(output) & 0xf) == 0){ +        convert_item32_1_to_fc64_1_nswap_guts(_) +    } +    else{ +        convert_item32_1_to_fc64_1_nswap_guts(u_) +    } + +    //convert remainder +    for (; i < nsamps; i++){ +        output[i] = item32_to_fc64(input[i], scale_factor); +    } +} + +DECLARE_CONVERTER(convert_item32_1_to_fc64_1_bswap, PRIORITY_CUSTOM){ +    const item32_t *input = reinterpret_cast<const item32_t *>(inputs[0]); +    fc64_t *output = reinterpret_cast<fc64_t *>(outputs[0]); + +    const __m128d scalar = _mm_set1_pd(scale_factor/(1 << 16)); +    const __m128i zeroi = _mm_setzero_si128(); + +    #define convert_item32_1_to_fc64_1_bswap_guts(_al_)                 \ +    for (; i+4 < nsamps; i+=4){                                         \ +        /* load from input */                                           \ +        __m128i tmpi = _mm_loadu_si128(reinterpret_cast<const __m128i *>(input+i)); \ +                                                                        \ +        /* byteswap + unpack -> byteswap 16 bit words */                \ +        tmpi = _mm_or_si128(_mm_srli_epi16(tmpi, 8), _mm_slli_epi16(tmpi, 8)); \ +        __m128i tmpilo = _mm_unpacklo_epi16(zeroi, tmpi); /* value in upper 16 bits */ \ +        __m128i tmpihi = _mm_unpackhi_epi16(zeroi, tmpi);               \ +                                                                        \ +        /* convert and scale */                                         \ +        __m128d tmp0 = _mm_mul_pd(_mm_cvtepi32_pd(tmpilo), scalar);     \ +        tmpilo = _mm_unpackhi_epi64(tmpilo, zeroi);                     \ +        __m128d tmp1 = _mm_mul_pd(_mm_cvtepi32_pd(tmpilo), scalar);     \ +        __m128d tmp2 = _mm_mul_pd(_mm_cvtepi32_pd(tmpihi), scalar);     \ +        tmpihi = _mm_unpackhi_epi64(tmpihi, zeroi);                     \ +        __m128d tmp3 = _mm_mul_pd(_mm_cvtepi32_pd(tmpihi), scalar);     \ +                                                                        \ +        /* store to output */                                           \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+0), tmp0); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+1), tmp1); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+2), tmp2); \ +        _mm_store ## _al_ ## pd(reinterpret_cast<double *>(output+i+3), tmp3); \ +    }                                                                   \ + +    size_t i = 0; + +    //dispatch according to alignment +    if ((size_t(output) & 0xf) == 0){ +        convert_item32_1_to_fc64_1_bswap_guts(_) +    } +    else{ +        convert_item32_1_to_fc64_1_bswap_guts(u_) +    } + +    //convert remainder +    for (; i < nsamps; i++){ +        output[i] = item32_to_fc64(uhd::byteswap(input[i]), scale_factor); +    } +} diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index b1821956c..866ade75f 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -79,6 +79,15 @@ SET_SOURCE_FILES_PROPERTIES(      PROPERTIES COMPILE_DEFINITIONS "${IF_ADDRS_DEFS}"  ) +######################################################################## +# Setup UDP +######################################################################## +IF(WIN32) +    LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_wsa_zero_copy.cpp) +ELSE() +    LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp) +ENDIF() +  #On windows, the boost asio implementation uses the winsock2 library.  #Note: we exclude the .lib extension for cygwin and mingw platforms.  IF(WIN32) @@ -97,6 +106,5 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp -    ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/usb_zero_copy_wrapper.cpp  ) diff --git a/host/lib/transport/udp_wsa_zero_copy.cpp b/host/lib/transport/udp_wsa_zero_copy.cpp new file mode 100644 index 000000000..ccfed38ea --- /dev/null +++ b/host/lib/transport/udp_wsa_zero_copy.cpp @@ -0,0 +1,281 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include "udp_common.hpp" +#include <uhd/transport/udp_zero_copy.hpp> +#include <uhd/transport/udp_simple.hpp> //mtu +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/log.hpp> +#include <boost/format.hpp> +#include <vector> + +using namespace uhd; +using namespace uhd::transport; +namespace asio = boost::asio; + +//A reasonable number of frames for send/recv and async/sync +static const size_t DEFAULT_NUM_FRAMES = 32; + +/*********************************************************************** + * Static initialization to take care of WSA init and cleanup + **********************************************************************/ +struct uhd_wsa_control{ +    uhd_wsa_control(void){ +        WSADATA wsaData; +        WSAStartup(MAKEWORD(2, 2), &wsaData); /*windows socket startup */ +    } + +    ~uhd_wsa_control(void){ +        WSACleanup(); +    } +}; + +/*********************************************************************** + * Reusable managed receiver buffer: + *  - Initialize with memory and a release callback. + *  - Call get new with a length in bytes to re-use. + **********************************************************************/ +class udp_zero_copy_asio_mrb : public managed_recv_buffer{ +public: +    udp_zero_copy_asio_mrb(void *mem, bounded_buffer<udp_zero_copy_asio_mrb *> &pending): +        _mem(mem), _len(0), _pending(pending){/* NOP */} + +    void release(void){ +        if (_len == 0) return; +        _pending.push_with_haste(this); +        _len = 0; +    } + +    sptr get_new(size_t len){ +        _len = len; +        return make_managed_buffer(this); +    } + +    template <class T> T cast(void) const{return static_cast<T>(_mem);} + +private: +    const void *get_buff(void) const{return _mem;} +    size_t get_size(void) const{return _len;} + +    void *_mem; +    size_t _len; +    bounded_buffer<udp_zero_copy_asio_mrb *> &_pending; +}; + +/*********************************************************************** + * Reusable managed send buffer: + *  - committing the buffer calls the asynchronous socket send + *  - getting a new buffer performs the blocking wait for completion + **********************************************************************/ +class udp_zero_copy_asio_msb : public managed_send_buffer{ +public: +    udp_zero_copy_asio_msb(void *mem, int sock_fd, const size_t frame_size): +        _sock_fd(sock_fd), _frame_size(frame_size), _committed(false) +    { +        _wsa_buff.buf = reinterpret_cast<char *>(mem); +        ZeroMemory(&_overlapped, sizeof(_overlapped)); +        _overlapped.hEvent = WSACreateEvent(); +        UHD_ASSERT_THROW(_overlapped.hEvent != WSA_INVALID_EVENT); +        this->commit(0); //makes buffer available via get_new +    } + +    ~udp_zero_copy_asio_msb(void){ +        WSACloseEvent(_overlapped.hEvent); +    } + +    UHD_INLINE void commit(size_t len){ +        if (_committed) return; +        _committed = true; +        _wsa_buff.len = len; +        if (len == 0) WSASetEvent(_overlapped.hEvent); +        else WSASend(_sock_fd, &_wsa_buff, 1, NULL, 0, &_overlapped, NULL); +    } + +    UHD_INLINE sptr get_new(const double timeout, size_t &index){ +        const DWORD result = WSAWaitForMultipleEvents( +            1, &_overlapped.hEvent, true, DWORD(timeout*1000), true +        ); +        if (result == WSA_WAIT_TIMEOUT) return managed_send_buffer::sptr(); +        index++; //advances the caller's buffer + +        WSAResetEvent(_overlapped.hEvent); +        _committed = false; +        _wsa_buff.len = _frame_size; +        return make_managed_buffer(this); +    } + +private: +    void *get_buff(void) const{return _wsa_buff.buf;} +    size_t get_size(void) const{return _wsa_buff.len;} + +    int _sock_fd; +    const size_t _frame_size; +    bool _committed; +    WSAOVERLAPPED _overlapped; +    WSABUF _wsa_buff; +}; + +/*********************************************************************** + * Zero Copy UDP implementation with WSA: + * + *   This is not a true zero copy implementation as each + *   send and recv requires a copy operation to/from userspace. + * + *   For receive, use a blocking recv() call on the socket. + *   This has better performance than the overlapped IO. + *   For send, use overlapped IO to submit async sends. + **********************************************************************/ +class udp_zero_copy_wsa_impl : public udp_zero_copy{ +public: +    typedef boost::shared_ptr<udp_zero_copy_wsa_impl> sptr; + +    udp_zero_copy_wsa_impl( +        const std::string &addr, +        const std::string &port, +        const device_addr_t &hints +    ): +        _recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))), +        _num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_FRAMES))), +        _send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))), +        _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_FRAMES))), +        _recv_buffer_pool(buffer_pool::make(_num_recv_frames, _recv_frame_size)), +        _send_buffer_pool(buffer_pool::make(_num_send_frames, _send_frame_size)), +        _pending_recv_buffs(_num_recv_frames), +        _next_send_buff_index(0) +    { +        UHD_MSG(status) << boost::format("Creating WSA UDP transport for %s:%s") % addr % port << std::endl; +        static uhd_wsa_control uhd_wsa; //makes wsa start happen via lazy initialization + +        UHD_ASSERT_THROW(_num_send_frames <= WSA_MAXIMUM_WAIT_EVENTS); + +        //resolve the address +        asio::io_service io_service; +        asio::ip::udp::resolver resolver(io_service); +        asio::ip::udp::resolver::query query(asio::ip::udp::v4(), addr, port); +        asio::ip::udp::endpoint receiver_endpoint = *resolver.resolve(query); + +        //create the socket +        _sock_fd = WSASocket(AF_INET, SOCK_DGRAM, IPPROTO_UDP, NULL, 0, WSA_FLAG_OVERLAPPED); +        if (_sock_fd == INVALID_SOCKET){ +            const DWORD error = WSAGetLastError(); +            throw uhd::os_error(str(boost::format("WSASocket() failed with error %d") % error)); +        } + +        //set the socket non-blocking for recv +        u_long mode = 1; +        ioctlsocket(_sock_fd, FIONBIO, &mode); + +        //resize the socket buffers +        const int recv_buff_size = int(hints.cast<double>("recv_buff_size", 0.0)); +        const int send_buff_size = int(hints.cast<double>("send_buff_size", 0.0)); +        if (recv_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_RCVBUF, (const char *)&recv_buff_size, sizeof(recv_buff_size)); +        if (send_buff_size > 0) setsockopt(_sock_fd, SOL_SOCKET, SO_SNDBUF, (const char *)&send_buff_size, sizeof(send_buff_size)); + +        //connect the socket so we can send/recv +        const asio::ip::udp::endpoint::data_type &servaddr = *receiver_endpoint.data(); +        if (WSAConnect(_sock_fd, (const struct sockaddr *)&servaddr, sizeof(servaddr), NULL, NULL, NULL, NULL) != 0){ +            const DWORD error = WSAGetLastError(); +            closesocket(_sock_fd); +            throw uhd::os_error(str(boost::format("WSAConnect() failed with error %d") % error)); +        } + +        //allocate re-usable managed receive buffers +        for (size_t i = 0; i < get_num_recv_frames(); i++){ +            _mrb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_mrb>( +                new udp_zero_copy_asio_mrb(_recv_buffer_pool->at(i), _pending_recv_buffs) +            )); +            _pending_recv_buffs.push_with_haste(_mrb_pool.back().get()); +        } + +        //allocate re-usable managed send buffers +        for (size_t i = 0; i < get_num_send_frames(); i++){ +            _msb_pool.push_back(boost::shared_ptr<udp_zero_copy_asio_msb>( +                new udp_zero_copy_asio_msb(_send_buffer_pool->at(i), _sock_fd, get_send_frame_size()) +            )); +        } +    } + +    ~udp_zero_copy_wsa_impl(void){ +        closesocket(_sock_fd); +    } + +    /******************************************************************* +     * Receive implementation: +     * +     * Perform a non-blocking receive for performance, +     * and then fall back to a blocking receive with timeout. +     * Return the managed receive buffer with the new length. +     * When the caller is finished with the managed buffer, +     * the managed receive buffer is released back into the queue. +     ******************************************************************/ +    managed_recv_buffer::sptr get_recv_buff(double timeout){ +        udp_zero_copy_asio_mrb *mrb = NULL; +        if (_pending_recv_buffs.pop_with_timed_wait(mrb, timeout)){ + +            ssize_t ret = ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0); +            if (ret > 0) return mrb->get_new(ret); + +            if (wait_for_recv_ready(_sock_fd, timeout)) return mrb->get_new( +                ::recv(_sock_fd, mrb->cast<char *>(), _recv_frame_size, 0) +            ); + +            _pending_recv_buffs.push_with_haste(mrb); //timeout: return the managed buffer to the queue +        } +        return managed_recv_buffer::sptr(); +    } + +    size_t get_num_recv_frames(void) const {return _num_recv_frames;} +    size_t get_recv_frame_size(void) const {return _recv_frame_size;} + +    /******************************************************************* +     * Send implementation: +     * Block on the managed buffer's get call and advance the index. +     ******************************************************************/ +    managed_send_buffer::sptr get_send_buff(double timeout){ +        if (_next_send_buff_index == _num_send_frames) _next_send_buff_index = 0; +        return _msb_pool[_next_send_buff_index]->get_new(timeout, _next_send_buff_index); +    } + +    size_t get_num_send_frames(void) const {return _num_send_frames;} +    size_t get_send_frame_size(void) const {return _send_frame_size;} + +private: +    //memory management -> buffers and fifos +    const size_t _recv_frame_size, _num_recv_frames; +    const size_t _send_frame_size, _num_send_frames; +    buffer_pool::sptr _recv_buffer_pool, _send_buffer_pool; +    std::vector<boost::shared_ptr<udp_zero_copy_asio_msb> > _msb_pool; +    std::vector<boost::shared_ptr<udp_zero_copy_asio_mrb> > _mrb_pool; +    bounded_buffer<udp_zero_copy_asio_mrb *> _pending_recv_buffs; +    size_t _next_send_buff_index; + +    //socket guts +    SOCKET                  _sock_fd; +}; + +/*********************************************************************** + * UDP zero copy make function + **********************************************************************/ +udp_zero_copy::sptr udp_zero_copy::make( +    const std::string &addr, +    const std::string &port, +    const device_addr_t &hints +){ +    return sptr(new udp_zero_copy_wsa_impl(addr, port, hints)); +} diff --git a/host/lib/usrp/dboard/db_wbx_common.cpp b/host/lib/usrp/dboard/db_wbx_common.cpp index 81aba6426..1089dc7c2 100644 --- a/host/lib/usrp/dboard/db_wbx_common.cpp +++ b/host/lib/usrp/dboard/db_wbx_common.cpp @@ -44,7 +44,7 @@  #define TX_ATTN_MASK    (TX_ATTN_16|TX_ATTN_8|TX_ATTN_4|TX_ATTN_2|TX_ATTN_1)      // valid bits of TX Attenuator Control  // Mixer functions -#define TX_MIXER_ENB    (TXMOD_EN|ADF4350_PDBRF) +#define TX_MIXER_ENB    (TXMOD_EN|ADF4350_PDBRF)    // for v3, TXMOD_EN tied to ADF4350_PDBRF rather than separate  #define TX_MIXER_DIS    0  #define RX_MIXER_ENB    (RXBB_PDB|ADF4350_PDBRF) @@ -101,18 +101,19 @@ wbx_base::wbx_base(ctor_args_t args) : xcvr_dboard_base(args){      //v3 has different io bits for attenuator control      int v3_iobits = is_v3() ? TX_ATTN_MASK : ADF4350_CE; +    int v3_tx_mod = is_v3() ? ADF4350_PDBRF : TXMOD_EN|ADF4350_PDBRF;      //set the gpio directions and atr controls -    this->get_iface()->set_pin_ctrl(dboard_iface::UNIT_TX, TXMOD_EN|ADF4350_PDBRF); +    this->get_iface()->set_pin_ctrl(dboard_iface::UNIT_TX, v3_tx_mod);      this->get_iface()->set_pin_ctrl(dboard_iface::UNIT_RX, RXBB_PDB|ADF4350_PDBRF); -    this->get_iface()->set_gpio_ddr(dboard_iface::UNIT_TX, TX_PUP_5V|TX_PUP_3V|TXMOD_EN|ADF4350_PDBRF|v3_iobits); +    this->get_iface()->set_gpio_ddr(dboard_iface::UNIT_TX, TX_PUP_5V|TX_PUP_3V|v3_tx_mod|v3_iobits);      this->get_iface()->set_gpio_ddr(dboard_iface::UNIT_RX, RX_PUP_5V|RX_PUP_3V|ADF4350_CE|RXBB_PDB|ADF4350_PDBRF|RX_ATTN_MASK);      //setup ATR for the mixer enables (always enabled to prevent phase slip between bursts) -    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_IDLE,        TX_MIXER_ENB, TX_MIXER_DIS | TX_MIXER_ENB); -    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_RX_ONLY,     TX_MIXER_ENB, TX_MIXER_DIS | TX_MIXER_ENB); -    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_TX_ONLY,     TX_MIXER_ENB, TX_MIXER_DIS | TX_MIXER_ENB); -    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_FULL_DUPLEX, TX_MIXER_ENB, TX_MIXER_DIS | TX_MIXER_ENB); +    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_IDLE,        v3_tx_mod, TX_MIXER_DIS | v3_tx_mod); +    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_RX_ONLY,     v3_tx_mod, TX_MIXER_DIS | v3_tx_mod); +    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_TX_ONLY,     v3_tx_mod, TX_MIXER_DIS | v3_tx_mod); +    this->get_iface()->set_atr_reg(dboard_iface::UNIT_TX, dboard_iface::ATR_REG_FULL_DUPLEX, v3_tx_mod, TX_MIXER_DIS | v3_tx_mod);      this->get_iface()->set_atr_reg(dboard_iface::UNIT_RX, dboard_iface::ATR_REG_IDLE,        RX_MIXER_ENB, RX_MIXER_DIS | RX_MIXER_ENB);      this->get_iface()->set_atr_reg(dboard_iface::UNIT_RX, dboard_iface::ATR_REG_TX_ONLY,     RX_MIXER_ENB, RX_MIXER_DIS | RX_MIXER_ENB); @@ -190,7 +191,7 @@ static int tx_pga0_gain_to_iobits(double &gain){      double attn = wbx_v3_tx_gain_ranges["PGA0"].stop() - gain;      //calculate the attenuation -    int attn_code = boost::math::iround(attn*2); +    int attn_code = boost::math::iround(attn);      int iobits = (              (attn_code & 16 ? 0 : TX_ATTN_16) |              (attn_code &  8 ? 0 : TX_ATTN_8) | @@ -235,11 +236,11 @@ void wbx_base::set_tx_gain(double gain, const std::string &name){      if (is_v3()) {          assert_has(wbx_v3_tx_gain_ranges.keys(), name, "wbx tx gain name");          if(name == "PGA0"){ -            double dac_volts = tx_pga0_gain_to_iobits(gain); +            boost::uint16_t io_bits = tx_pga0_gain_to_iobits(gain);              _tx_gains[name] = gain; -            //write the new voltage to the aux dac -            this->get_iface()->write_aux_dac(dboard_iface::UNIT_TX, dboard_iface::AUX_DAC_A, dac_volts); +            //write the new gain to tx gpio outputs +            this->get_iface()->set_gpio_out(dboard_iface::UNIT_TX, io_bits, TX_ATTN_MASK);          }          else UHD_THROW_INVALID_CODE_PATH();      } @@ -446,7 +447,7 @@ bool wbx_base::get_locked(dboard_iface::unit_t unit){  }  bool wbx_base::is_v3(void){ -    return get_rx_id() == 0x057; +    return get_rx_id().to_uint16() == 0x057;  }  /*********************************************************************** @@ -569,12 +570,21 @@ void wbx_base::tx_get(const wax::obj &key_, wax::obj &val){          return;      case SUBDEV_PROP_GAIN_RANGE: -        assert_has(wbx_tx_gain_ranges.keys(), key.name, "wbx tx gain name"); -        val = wbx_tx_gain_ranges[key.name]; +        if (is_v3()) { +            assert_has(wbx_v3_tx_gain_ranges.keys(), key.name, "wbx tx gain name"); +            val = wbx_v3_tx_gain_ranges[key.name]; +        } +        else { +            assert_has(wbx_tx_gain_ranges.keys(), key.name, "wbx tx gain name"); +            val = wbx_tx_gain_ranges[key.name]; +        }          return;      case SUBDEV_PROP_GAIN_NAMES: -        val = prop_names_t(wbx_tx_gain_ranges.keys()); +        if (is_v3()) +            val = prop_names_t(wbx_v3_tx_gain_ranges.keys()); +        else +            val = prop_names_t(wbx_tx_gain_ranges.keys());          return;      case SUBDEV_PROP_FREQ: diff --git a/host/lib/usrp/dboard/db_wbx_simple.cpp b/host/lib/usrp/dboard/db_wbx_simple.cpp index ae466b08a..990bacbc8 100644 --- a/host/lib/usrp/dboard/db_wbx_simple.cpp +++ b/host/lib/usrp/dboard/db_wbx_simple.cpp @@ -16,7 +16,7 @@  //  // Antenna constants -#define ANTSW_IO        ((1 << 5)|(1 << 15))    // on UNIT_TX, 0 = TX, 1 = RX, on UNIT_RX 0 = main ant, 1 = RX2 +#define ANTSW_IO        ((1 << 15))             // on UNIT_TX, 0 = TX, 1 = RX, on UNIT_RX 0 = main ant, 1 = RX2  #define ANT_TX          0                       //the tx line is transmitting  #define ANT_RX          ANTSW_IO                //the tx line is receiving  #define ANT_TXRX        0                       //the rx line is on txrx @@ -149,7 +149,10 @@ void wbx_simple::rx_get(const wax::obj &key_, wax::obj &val){      //handle the get request conditioned on the key      switch(key.as<subdev_prop_t>()){      case SUBDEV_PROP_NAME: -        val = std::string("WBX RX + Simple GDB"); +        if (is_v3()) +            val = std::string("WBX v3 RX + Simple GDB"); +        else +            val = std::string("WBX RX + Simple GDB");          return;      case SUBDEV_PROP_FREQ: @@ -203,7 +206,10 @@ void wbx_simple::tx_get(const wax::obj &key_, wax::obj &val){      //handle the get request conditioned on the key      switch(key.as<subdev_prop_t>()){      case SUBDEV_PROP_NAME: -        val = std::string("WBX TX + Simple GDB"); +        if (is_v3()) +            val = std::string("WBX v3 TX + Simple GDB"); +        else +            val = std::string("WBX TX + Simple GDB");          return;      case SUBDEV_PROP_FREQ:  | 
