aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt18
-rw-r--r--host/lib/transport/nirio/lvbitx/CMakeLists.txt5
-rw-r--r--host/lib/transport/nirio/rpc/rpc_client.cpp24
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp47
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp35
-rw-r--r--host/lib/transport/zero_copy_recv_offload.cpp158
6 files changed, 208 insertions, 79 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index 6abc399b4..db21b9f8e 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -22,17 +22,15 @@
########################################################################
# Include subdirectories (different than add)
########################################################################
-INCLUDE_SUBDIRECTORY(nirio)
+IF(ENABLE_X300)
+ INCLUDE_SUBDIRECTORY(nirio)
+ENDIF(ENABLE_X300)
########################################################################
# Setup libusb
########################################################################
-MESSAGE(STATUS "")
-FIND_PACKAGE(USB1)
-
-LIBUHD_REGISTER_COMPONENT("USB" ENABLE_USB ON "ENABLE_LIBUHD;LIBUSB_FOUND" OFF OFF)
-
IF(ENABLE_USB)
+ MESSAGE(STATUS "")
MESSAGE(STATUS "USB support enabled via libusb.")
INCLUDE_DIRECTORIES(${LIBUSB_INCLUDE_DIRS})
LIBUHD_APPEND_LIBS(${LIBUSB_LIBRARIES})
@@ -124,14 +122,20 @@ LIBUHD_PYTHON_GEN_SOURCE(
)
LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/zero_copy_recv_offload.cpp
${CMAKE_CURRENT_SOURCE_DIR}/tcp_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/buffer_pool.cpp
${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
- ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp
)
+IF(ENABLE_X300)
+ LIBUHD_APPEND_SOURCES(
+ ${CMAKE_CURRENT_SOURCE_DIR}/nirio_zero_copy.cpp
+ )
+ENDIF(ENABLE_X300)
+
# Verbose Debug output for send/recv
SET( UHD_TXRX_DEBUG_PRINTS OFF CACHE BOOL "Use verbose debug output for send/recv" )
OPTION( UHD_TXRX_DEBUG_PRINTS "Use verbose debug output for send/recv" "" )
diff --git a/host/lib/transport/nirio/lvbitx/CMakeLists.txt b/host/lib/transport/nirio/lvbitx/CMakeLists.txt
index b9a2a9f15..5741a12f8 100644
--- a/host/lib/transport/nirio/lvbitx/CMakeLists.txt
+++ b/host/lib/transport/nirio/lvbitx/CMakeLists.txt
@@ -1,5 +1,5 @@
#
-# Copyright 2013 Ettus Research LLC
+# Copyright 2013,2015 Ettus Research LLC
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@@ -30,8 +30,8 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile)
SET(IMAGES_PATH_OPT --uhd-images-path=${UHD_IMAGES_DIR})
ADD_CUSTOM_COMMAND(
- OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp
OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp
+ OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/process-lvbitx.py
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.hpp
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/template_lvbitx.cpp
@@ -41,6 +41,7 @@ MACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM lvbitx binfile)
)
#make libuhd depend on the output file
+ LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.hpp)
LIBUHD_APPEND_SOURCES(${CMAKE_CURRENT_BINARY_DIR}/${lvbitxprefix}_lvbitx.cpp)
ENDMACRO(LIBUHD_LVBITX_GEN_SOURCE_AND_BITSTREAM)
diff --git a/host/lib/transport/nirio/rpc/rpc_client.cpp b/host/lib/transport/nirio/rpc/rpc_client.cpp
index bbaf9f235..3d62b57ae 100644
--- a/host/lib/transport/nirio/rpc/rpc_client.cpp
+++ b/host/lib/transport/nirio/rpc/rpc_client.cpp
@@ -1,5 +1,5 @@
///
-// Copyright 2013 Ettus Research LLC
+// Copyright 2013,2016 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -55,22 +55,7 @@ rpc_client::rpc_client (
tcp::resolver::query::flags query_flags(tcp::resolver::query::passive);
tcp::resolver::query query(tcp::v4(), server, port, query_flags);
tcp::resolver::iterator iterator = resolver.resolve(query);
-
- #if BOOST_VERSION < 104700
- // default constructor creates end iterator
- tcp::resolver::iterator end;
-
- boost::system::error_code error = boost::asio::error::host_not_found;
- while (error && iterator != end)
- {
- _socket.close();
- _socket.connect(*iterator++, error);
- }
- if (error)
- throw boost::system::system_error(error);
- #else
- boost::asio::connect(_socket, iterator);
- #endif
+ boost::asio::connect(_socket, iterator);
UHD_LOG << "rpc_client connected to server." << std::endl;
@@ -109,11 +94,6 @@ rpc_client::rpc_client (
} catch (boost::exception&) {
UHD_LOG << "rpc_client connection request cancelled/aborted." << std::endl;
_exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
-#if BOOST_VERSION < 104700
- } catch (std::exception& e) {
- UHD_LOG << "rpc_client connection error: " << e.what() << std::endl;
- _exec_err.assign(boost::asio::error::connection_aborted, boost::asio::error::get_system_category());
-#endif
}
}
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 8bfa1973a..5ca18d4a0 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -24,7 +24,6 @@
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
@@ -35,7 +34,6 @@
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
-#include <boost/thread/barrier.hpp>
#include <iostream>
#include <vector>
@@ -92,22 +90,15 @@ public:
}
~recv_packet_handler(void){
- _task_barrier.interrupt();
- _task_handlers.clear();
+ /* NOP */
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
- _task_handlers.clear();
_props.resize(size);
//re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
- _task_barrier.resize(size);
- _task_handlers.resize(size);
- for (size_t i = 1/*skip 0*/; i < size; i++){
- _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
- };
}
//! Get the channel width of this handler
@@ -127,7 +118,7 @@ public:
* \param threshold number of packets per channel
*/
void set_alignment_failure_threshold(const size_t threshold){
- _alignment_faulure_threshold = threshold*this->size();
+ _alignment_failure_threshold = threshold*this->size();
}
//! Set the rate of ticks per second
@@ -203,6 +194,12 @@ public:
//! Overload call to issue stream commands
void issue_stream_cmd(const stream_cmd_t &stream_cmd)
{
+ if (stream_cmd.stream_now
+ and stream_cmd.stream_mode != stream_cmd_t::STREAM_MODE_STOP_CONTINUOUS
+ and _props.size() > 1) {
+ throw uhd::runtime_error("Attempting to do multi-channel receive with stream_now == true will result in misaligned channels. Aborting.");
+ }
+
for (size_t i = 0; i < _props.size(); i++)
{
if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd);
@@ -269,7 +266,7 @@ private:
size_t _header_offset_words32;
double _tick_rate, _samp_rate;
bool _queue_error_for_next_call;
- size_t _alignment_faulure_threshold;
+ size_t _alignment_failure_threshold;
rx_metadata_t _queue_metadata;
struct xport_chan_props_type{
xport_chan_props_type(void):
@@ -593,7 +590,7 @@ private:
}
//too many iterations: detect alignment failure
- if (iterations++ > _alignment_faulure_threshold){
+ if (iterations++ > _alignment_failure_threshold){
UHD_MSG(error) << boost::format(
"The receive packet handler failed to time-align packets.\n"
"%u received packets were processed by the handler.\n"
@@ -657,7 +654,9 @@ private:
_convert_bytes_to_copy = bytes_to_copy;
//perform N channels of conversion
- converter_thread_task(0);
+ for (size_t i = 0; i < buffs.size(); i++) {
+ convert_to_out_buff(i);
+ }
//update the copy buffer's availability
info.data_bytes_to_copy -= bytes_to_copy;
@@ -670,15 +669,15 @@ private:
return nsamps_to_copy_per_io_buff;
}
- /*******************************************************************
- * Perform one thread's work of the conversion task.
- * The entry and exit use a dual synchronization barrier,
- * to wait for data to become ready and block until completion.
- ******************************************************************/
- UHD_INLINE void converter_thread_task(const size_t index)
+ /*! Run the conversion from the internal buffers to the user's output
+ * buffer.
+ *
+ * - Calls the converter
+ * - Releases internal data buffers
+ * - Updates read/write pointers
+ */
+ inline void convert_to_out_buff(const size_t index)
{
- _task_barrier.wait();
-
//shortcut references to local data structures
buffers_info_type &buff_info = get_curr_buffer_info();
per_buffer_info_type &info = buff_info[index];
@@ -702,13 +701,9 @@ private:
if (buff_info.data_bytes_to_copy == _convert_bytes_to_copy){
info.buff.reset(); //effectively a release
}
-
- if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier;
- std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const rx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index c2810842e..a6b9b12d0 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -24,11 +24,11 @@
#include <uhd/stream.hpp>
#include <uhd/utils/msg.hpp>
#include <uhd/utils/tasks.hpp>
-#include <uhd/utils/atomic.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <boost/thread/thread.hpp>
#include <boost/thread/thread_time.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
@@ -74,22 +74,15 @@ public:
}
~send_packet_handler(void){
- _task_barrier.interrupt();
- _task_handlers.clear();
+ /* NOP */
}
//! Resize the number of transport channels
void resize(const size_t size){
if (this->size() == size) return;
- _task_handlers.clear();
_props.resize(size);
static const boost::uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
- _task_barrier.resize(size);
- _task_handlers.resize(size);
- for (size_t i = 1/*skip 0*/; i < size; i++){
- _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
- };
}
//! Get the channel width of this handler
@@ -377,21 +370,23 @@ private:
_convert_if_packet_info = &if_packet_info;
//perform N channels of conversion
- converter_thread_task(0);
+ for (size_t i = 0; i < buffs.size(); i++) {
+ convert_to_in_buff(i);
+ }
_next_packet_seq++; //increment sequence after commits
return nsamps_per_buff;
}
- /*******************************************************************
- * Perform one thread's work of the conversion task.
- * The entry and exit use a dual synchronization barrier,
- * to wait for data to become ready and block until completion.
- ******************************************************************/
- UHD_INLINE void converter_thread_task(const size_t index)
+ /*! Run the conversion from the internal buffers to the user's input
+ * buffer.
+ *
+ * - Calls the converter
+ * - Releases internal data buffers
+ * - Updates read/write pointers
+ */
+ UHD_INLINE void convert_to_in_buff(const size_t index)
{
- _task_barrier.wait();
-
//shortcut references to local data structures
managed_send_buffer::sptr &buff = _props[index].buff;
vrt::if_packet_info_t if_packet_info = *_convert_if_packet_info;
@@ -419,13 +414,9 @@ private:
const size_t num_vita_words32 = _header_offset_words32+if_packet_info.num_packet_words32;
buff->commit(num_vita_words32*sizeof(boost::uint32_t));
buff.reset(); //effectively a release
-
- if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier;
- std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const tx_streamer::buffs_type *_convert_buffs;
size_t _convert_buffer_offset_bytes;
diff --git a/host/lib/transport/zero_copy_recv_offload.cpp b/host/lib/transport/zero_copy_recv_offload.cpp
new file mode 100644
index 000000000..e8b013abc
--- /dev/null
+++ b/host/lib/transport/zero_copy_recv_offload.cpp
@@ -0,0 +1,158 @@
+//
+// Copyright 2016 Ettus Research
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/zero_copy_recv_offload.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/transport/buffer_pool.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/log.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <boost/format.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/bind.hpp>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+typedef bounded_buffer<managed_recv_buffer::sptr> bounded_buffer_t;
+
+/***********************************************************************
+ * Zero copy offload transport:
+ * An intermediate transport that utilizes threading to free
+ * the main thread from any receive work.
+ **********************************************************************/
+class zero_copy_recv_offload_impl : public zero_copy_recv_offload {
+public:
+ typedef boost::shared_ptr<zero_copy_recv_offload_impl> sptr;
+
+ zero_copy_recv_offload_impl(zero_copy_if::sptr transport,
+ const double timeout) :
+ _transport(transport), _timeout(timeout),
+ _inbox(transport->get_num_recv_frames()),
+ _recv_done(false)
+ {
+ UHD_LOG << "Created threaded transport" << std::endl;
+
+ // Create the receive and send threads to offload
+ // the system calls onto other threads
+ _recv_thread = boost::thread(
+ boost::bind(&zero_copy_recv_offload_impl::enqueue_recv, this)
+ );
+ }
+
+ // Receive thread flags
+ void set_recv_done()
+ {
+ boost::lock_guard<boost::mutex> guard(_recv_mutex);
+ _recv_done = true;
+ }
+
+ bool is_recv_done()
+ {
+ boost::lock_guard<boost::mutex> guard(_recv_mutex);
+ return _recv_done;
+ }
+
+ ~zero_copy_recv_offload_impl()
+ {
+ // Signal the threads we're finished
+ set_recv_done();
+
+ // Wait for them to join
+ UHD_SAFE_CALL(
+ _recv_thread.join();
+ )
+ }
+
+ // The receive thread function is responsible for
+ // pulling pointers to managed receiver buffers quickly
+ void enqueue_recv()
+ {
+ while (not is_recv_done()) {
+ managed_recv_buffer::sptr buff = _transport->get_recv_buff(_timeout);
+ if (not buff) continue;
+ _inbox.push_with_timed_wait(buff, _timeout);
+ }
+ }
+
+ /*******************************************************************
+ * Receive implementation:
+ * Pop the receive buffer pointer from the underlying transport
+ ******************************************************************/
+ managed_recv_buffer::sptr get_recv_buff(double timeout)
+ {
+ managed_recv_buffer::sptr ptr;
+ _inbox.pop_with_timed_wait(ptr, timeout);
+ return ptr;
+ }
+
+ size_t get_num_recv_frames() const
+ {
+ return _transport->get_num_recv_frames();
+ }
+
+ size_t get_recv_frame_size() const
+ {
+ return _transport->get_recv_frame_size();
+ }
+
+ /*******************************************************************
+ * Send implementation:
+ * Pass the send buffer pointer from the underlying transport
+ ******************************************************************/
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ return _transport->get_send_buff(timeout);
+ }
+
+ size_t get_num_send_frames() const
+ {
+ return _transport->get_num_send_frames();
+ }
+
+ size_t get_send_frame_size() const
+ {
+ return _transport->get_send_frame_size();
+ }
+
+private:
+ // The linked transport
+ zero_copy_if::sptr _transport;
+
+ const double _timeout;
+
+ // Shared buffers
+ bounded_buffer_t _inbox;
+
+ // Threading
+ bool _recv_done;
+ boost::thread _recv_thread;
+ boost::mutex _recv_mutex;
+};
+
+zero_copy_recv_offload::sptr zero_copy_recv_offload::make(
+ zero_copy_if::sptr transport,
+ const double timeout)
+{
+ zero_copy_recv_offload_impl::sptr zero_copy_recv_offload(
+ new zero_copy_recv_offload_impl(transport, timeout)
+ );
+
+ return zero_copy_recv_offload;
+}