summaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/udp_simple.cpp29
-rw-r--r--host/lib/transport/udp_zero_copy_none.cpp39
2 files changed, 64 insertions, 4 deletions
diff --git a/host/lib/transport/udp_simple.cpp b/host/lib/transport/udp_simple.cpp
index 491cf59db..7004bdfdf 100644
--- a/host/lib/transport/udp_simple.cpp
+++ b/host/lib/transport/udp_simple.cpp
@@ -16,12 +16,35 @@
//
#include <uhd/transport/udp_simple.hpp>
+#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <iostream>
using namespace uhd::transport;
/***********************************************************************
+ * Helper Functions
+ **********************************************************************/
+/*!
+ * A receive timeout for a socket:
+ *
+ * It seems that asio cannot have timeouts with synchronous io.
+ * However, we can implement a polling loop that will timeout.
+ * This is okay bacause this is the slow-path implementation.
+ *
+ * \param socket the asio socket
+ */
+static void reasonable_recv_timeout(
+ boost::asio::ip::udp::socket &socket
+){
+ boost::asio::deadline_timer timer(socket.get_io_service());
+ timer.expires_from_now(boost::posix_time::milliseconds(50));
+ while (not (socket.available() or timer.expires_from_now().is_negative())){
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1));
+ }
+}
+
+/***********************************************************************
* UDP connected implementation class
**********************************************************************/
class udp_connected_impl : public udp_simple{
@@ -62,7 +85,8 @@ size_t udp_connected_impl::send(const boost::asio::const_buffer &buff){
}
size_t udp_connected_impl::recv(const boost::asio::mutable_buffer &buff){
- if (_socket->available() == 0) return 0;
+ reasonable_recv_timeout(*_socket);
+ if (not _socket->available()) return 0;
return _socket->receive(boost::asio::buffer(buff));
}
@@ -112,7 +136,8 @@ size_t udp_broadcast_impl::send(const boost::asio::const_buffer &buff){
}
size_t udp_broadcast_impl::recv(const boost::asio::mutable_buffer &buff){
- if (_socket->available() == 0) return 0;
+ reasonable_recv_timeout(*_socket);
+ if (not _socket->available()) return 0;
boost::asio::ip::udp::endpoint sender_endpoint;
return _socket->receive_from(boost::asio::buffer(buff), sender_endpoint);
}
diff --git a/host/lib/transport/udp_zero_copy_none.cpp b/host/lib/transport/udp_zero_copy_none.cpp
index e95706d94..e29530cf1 100644
--- a/host/lib/transport/udp_zero_copy_none.cpp
+++ b/host/lib/transport/udp_zero_copy_none.cpp
@@ -16,6 +16,8 @@
//
#include <uhd/transport/udp_zero_copy.hpp>
+#include <boost/thread.hpp>
+#include <boost/format.hpp>
using namespace uhd::transport;
@@ -67,6 +69,9 @@ public:
private:
boost::asio::ip::udp::socket *_socket;
boost::asio::io_service _io_service;
+
+ size_t get_recv_buff_size(void);
+ void set_recv_buff_size(size_t);
};
udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::string &port){
@@ -81,6 +86,18 @@ udp_zero_copy_impl::udp_zero_copy_impl(const std::string &addr, const std::strin
_socket = new boost::asio::ip::udp::socket(_io_service);
_socket->open(boost::asio::ip::udp::v4());
_socket->connect(receiver_endpoint);
+
+ // set the rx socket buffer size:
+ // pick a huge size, and deal with whatever we get
+ set_recv_buff_size(54321e3); //some big number!
+ size_t current_buff_size = get_recv_buff_size();
+ std::cout << boost::format(
+ "Current rx socket buffer size: %d\n"
+ ) % current_buff_size;
+ if (current_buff_size < .1e6) std::cout << boost::format(
+ "Adjust max rx socket buffer size (linux only):\n"
+ " sysctl -w net.core.rmem_max=VALUE\n"
+ );
}
udp_zero_copy_impl::~udp_zero_copy_impl(void){
@@ -92,14 +109,21 @@ size_t udp_zero_copy_impl::send(const boost::asio::const_buffer &buff){
}
smart_buffer::sptr udp_zero_copy_impl::recv(void){
- size_t available = _socket->available();
+ size_t available = 0;
+
+ //implement timeout through polling and sleeping
+ boost::asio::deadline_timer timer(_socket->get_io_service());
+ timer.expires_from_now(boost::posix_time::milliseconds(50));
+ while (not ((available = _socket->available()) or timer.expires_from_now().is_negative())){
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1));
+ }
//allocate memory and create buffer
uint32_t *buff_mem = new uint32_t[available/sizeof(uint32_t)];
boost::asio::mutable_buffer buff(buff_mem, available);
//receive only if data is available
- if (available > 0){
+ if (available){
_socket->receive(boost::asio::buffer(buff));
}
@@ -107,6 +131,17 @@ smart_buffer::sptr udp_zero_copy_impl::recv(void){
return smart_buffer::sptr(new smart_buffer_impl(buff));
}
+size_t udp_zero_copy_impl::get_recv_buff_size(void){
+ boost::asio::socket_base::receive_buffer_size option;
+ _socket->get_option(option);
+ return option.value();
+}
+
+void udp_zero_copy_impl::set_recv_buff_size(size_t new_size){
+ boost::asio::socket_base::receive_buffer_size option(new_size);
+ _socket->set_option(option);
+}
+
/***********************************************************************
* UDP zero copy make function
**********************************************************************/