aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/udp_simple.cpp44
-rw-r--r--host/lib/transport/udp_zero_copy_asio.cpp31
-rw-r--r--host/lib/transport/vrt_packet_handler.hpp16
3 files changed, 55 insertions, 36 deletions
diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp
index 89750f99d..5829b462b 100644
--- a/host/lib/transport/udp_simple.cpp
+++ b/host/lib/transport/udp_simple.cpp
@@ -27,23 +27,25 @@ using namespace uhd::transport;
* Helper Functions
**********************************************************************/
/*!
- * A receive timeout for a socket:
- *
- * It seems that asio cannot have timeouts with synchronous io.
- * However, we can implement a polling loop that will timeout.
- * This is okay bacause this is the slow-path implementation.
- *
+ * Wait for available data or timeout.
* \param socket the asio socket
- * \param timeout_ms the timeout in milliseconds
+ * \param timeout the timeout in seconds
+ * \return false for timeout, true for data
*/
-static void reasonable_recv_timeout(
- boost::asio::ip::udp::socket &socket, size_t timeout_ms
+static bool wait_available(
+ boost::asio::ip::udp::socket &socket, double timeout
){
- boost::asio::deadline_timer timer(socket.get_io_service());
- timer.expires_from_now(boost::posix_time::milliseconds(timeout_ms));
- while (not (socket.available() or timer.expires_from_now().is_negative())){
- boost::this_thread::sleep(boost::posix_time::milliseconds(1));
- }
+ //setup timeval for timeout
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = long(timeout*1e6);
+
+ //setup rset for timeout
+ fd_set rset;
+ FD_ZERO(&rset);
+ FD_SET(socket.native(), &rset);
+
+ return ::select(socket.native()+1, &rset, NULL, NULL, &tv) > 0;
}
/***********************************************************************
@@ -57,7 +59,7 @@ public:
//send/recv
size_t send(const boost::asio::const_buffer &);
- size_t recv(const boost::asio::mutable_buffer &, size_t);
+ size_t recv(const boost::asio::mutable_buffer &, double);
private:
boost::asio::ip::udp::socket *_socket;
@@ -86,9 +88,8 @@ size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){
return _socket->send(boost::asio::buffer(buff));
}
-size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){
- reasonable_recv_timeout(*_socket, timeout_ms);
- if (not _socket->available()) return 0;
+size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){
+ if (not wait_available(*_socket, timeout)) return 0;
return _socket->receive(boost::asio::buffer(buff));
}
@@ -103,7 +104,7 @@ public:
//send/recv
size_t send(const boost::asio::const_buffer &);
- size_t recv(const boost::asio::mutable_buffer &, size_t);
+ size_t recv(const boost::asio::mutable_buffer &, double);
private:
boost::asio::ip::udp::socket *_socket;
@@ -137,9 +138,8 @@ size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){
return _socket->send_to(boost::asio::buffer(buff), _receiver_endpoint);
}
-size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, size_t timeout_ms){
- reasonable_recv_timeout(*_socket, timeout_ms);
- if (not _socket->available()) return 0;
+size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff, double timeout){
+ if (not wait_available(*_socket, timeout)) return 0;
boost::asio::ip::udp::endpoint sender_endpoint;
return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint);
}
diff --git a/host/lib/transport/udp_zero_copy_asio.cpp b/host/lib/transport/udp_zero_copy_asio.cpp
index ed29864e9..c758fa894 100644
--- a/host/lib/transport/udp_zero_copy_asio.cpp
+++ b/host/lib/transport/udp_zero_copy_asio.cpp
@@ -59,16 +59,23 @@ static const size_t DEFAULT_NUM_RECV_FRAMES = 32;
#else
static const size_t DEFAULT_NUM_RECV_FRAMES = MIN_RECV_SOCK_BUFF_SIZE/udp_simple::mtu;
#endif
+
//The non-async send only ever requires a single frame
//because the buffer will be committed before a new get.
#ifdef USE_ASIO_ASYNC_SEND
static const size_t DEFAULT_NUM_SEND_FRAMES = 32;
#else
-static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;;
+static const size_t DEFAULT_NUM_SEND_FRAMES = MIN_SEND_SOCK_BUFF_SIZE/udp_simple::mtu;
#endif
-//a single concurrent thread for io_service seems to be the fastest
+//The number of service threads to spawn for async ASIO:
+//A single concurrent thread for io_service seems to be the fastest.
+//Threads are disabled when no async implementations are enabled.
+#if defined(USE_ASIO_ASYNC_RECV) || defined(USE_ASIO_ASYNC_SEND)
static const size_t CONCURRENCY_HINT = 1;
+#else
+static const size_t CONCURRENCY_HINT = 0;
+#endif
/***********************************************************************
* Zero Copy UDP implementation with ASIO:
@@ -86,11 +93,12 @@ public:
const std::string &port,
const device_addr_t &hints
):
- _io_service(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
_recv_frame_size(size_t(hints.cast<double>("recv_frame_size", udp_simple::mtu))),
_num_recv_frames(size_t(hints.cast<double>("num_recv_frames", DEFAULT_NUM_RECV_FRAMES))),
_send_frame_size(size_t(hints.cast<double>("send_frame_size", udp_simple::mtu))),
- _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES)))
+ _num_send_frames(size_t(hints.cast<double>("num_send_frames", DEFAULT_NUM_SEND_FRAMES))),
+ _concurrency_hint(hints.cast<size_t>("concurrency_hint", CONCURRENCY_HINT)),
+ _io_service(_concurrency_hint)
{
//std::cout << boost::format("Creating udp transport for %s %s") % addr % port << std::endl;
@@ -129,7 +137,7 @@ public:
//spawn the service threads that will run the io service
_work = new asio::io_service::work(_io_service); //new work to delete later
- for (size_t i = 0; i < CONCURRENCY_HINT; i++) _thread_group.create_thread(
+ for (size_t i = 0; i < _concurrency_hint; i++) _thread_group.create_thread(
boost::bind(&udp_zero_copy_asio_impl::service, this)
);
}
@@ -292,12 +300,6 @@ public:
size_t get_send_frame_size(void) const {return _send_frame_size;}
private:
- //asio guts -> socket and service
- asio::ip::udp::socket *_socket;
- asio::io_service _io_service;
- asio::io_service::work *_work;
- int _sock_fd;
-
//memory management -> buffers and fifos
boost::thread_group _thread_group;
boost::shared_array<char> _send_buffer, _recv_buffer;
@@ -305,6 +307,13 @@ private:
pending_buffs_type::sptr _pending_recv_buffs, _pending_send_buffs;
const size_t _recv_frame_size, _num_recv_frames;
const size_t _send_frame_size, _num_send_frames;
+
+ //asio guts -> socket and service
+ size_t _concurrency_hint;
+ asio::io_service _io_service;
+ asio::ip::udp::socket *_socket;
+ asio::io_service::work *_work;
+ int _sock_fd;
};
/***********************************************************************
diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp
index 939517411..278bcfeaa 100644
--- a/host/lib/transport/vrt_packet_handler.hpp
+++ b/host/lib/transport/vrt_packet_handler.hpp
@@ -318,7 +318,7 @@ template <typename T> UHD_INLINE T get_context_code(
){
//load the rest of the if_packet_info in here
if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*otw_type.get_sample_size())/sizeof(boost::uint32_t);
- if_packet_info.packet_count = state.next_packet_seq++;
+ if_packet_info.packet_count = state.next_packet_seq;
//get send buffers for each channel
managed_send_buffs_t send_buffs(buffs.size()/chans_per_otw_buff);
@@ -345,6 +345,7 @@ template <typename T> UHD_INLINE T get_context_code(
size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
send_buffs[i]->commit(num_bytes_total);
}
+ state.next_packet_seq++; //increment sequence after commits
return num_samps;
}
@@ -387,10 +388,19 @@ template <typename T> UHD_INLINE T get_context_code(
if_packet_info.sob = metadata.start_of_burst;
if_packet_info.eob = metadata.end_of_burst;
+ //TODO remove this code when sample counts of zero are supported by hardware
+ std::vector<const void *> buffs_(buffs);
+ size_t total_num_samps_(total_num_samps);
+ if (total_num_samps == 0){
+ static const boost::uint64_t zeros = 0; //max size of a host sample
+ buffs_ = std::vector<const void *>(buffs.size(), &zeros);
+ total_num_samps_ = 1;
+ }
+
return _send1(
state,
- buffs, 0,
- std::min(total_num_samps, max_samples_per_packet),
+ buffs_, 0,
+ std::min(total_num_samps_, max_samples_per_packet),
if_packet_info,
io_type, otw_type,
vrt_packer,