aboutsummaryrefslogtreecommitdiffstats
path: root/host/examples/network_relay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/examples/network_relay.cpp')
-rw-r--r--host/examples/network_relay.cpp59
1 files changed, 45 insertions, 14 deletions
diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp
index f9f2e33fc..6ecf27858 100644
--- a/host/examples/network_relay.cpp
+++ b/host/examples/network_relay.cpp
@@ -34,6 +34,16 @@ static const size_t insane_mtu = 9000;
boost::mutex spawn_mutex;
+#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD)
+ //limit buffer resize on macos or it will error
+ const size_t rx_dsp_buff_size = 1e6;
+#elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32)
+ //set to half-a-second of buffering at max rate
+ const size_t rx_dsp_buff_size = 50e6;
+#endif
+
+const size_t tx_dsp_buff_size = (1 << 20);
+
/***********************************************************************
* Signal handlers
**********************************************************************/
@@ -63,7 +73,11 @@ public:
udp_relay_type(
const std::string &server_addr,
const std::string &client_addr,
- const std::string &port
+ const std::string &port,
+ const size_t server_rx_size = 0,
+ const size_t server_tx_size = 0,
+ const size_t client_rx_size = 0,
+ const size_t client_tx_size = 0
):_port(port){
{
asio::ip::udp::resolver resolver(_io_service);
@@ -71,6 +85,7 @@ public:
asio::ip::udp::endpoint endpoint = *resolver.resolve(query);
_server_socket = boost::shared_ptr<asio::ip::udp::socket>(new asio::ip::udp::socket(_io_service, endpoint));
+ resize_buffs(_server_socket, server_rx_size, server_tx_size);
}
{
asio::ip::udp::resolver resolver(_io_service);
@@ -80,9 +95,10 @@ public:
_client_socket = boost::shared_ptr<asio::ip::udp::socket>(new asio::ip::udp::socket(_io_service));
_client_socket->open(asio::ip::udp::v4());
_client_socket->connect(endpoint);
+ resize_buffs(_client_socket, client_rx_size, client_tx_size);
}
- std::cout << "spawning relay threads..." << std::endl;
+ std::cout << "spawning relay threads... " << _port << std::endl;
_thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this));
spawn_mutex.lock();
spawn_mutex.lock();
@@ -91,20 +107,26 @@ public:
spawn_mutex.lock();
spawn_mutex.lock();
spawn_mutex.unlock();
- std::cout << " done" << std::endl;
+ std::cout << " done!" << std::endl << std::endl;
}
~udp_relay_type(void){
- std::cout << "killing relay threads..." << std::endl;
+ std::cout << "killing relay threads... " << _port << std::endl;
_thread_group.interrupt_all();
_thread_group.join_all();
- std::cout << " done" << std::endl;
+ std::cout << " done!" << std::endl << std::endl;
}
private:
+ static void resize_buffs(socket_type sock, const size_t rx_size, const size_t tx_size){
+ if (rx_size != 0) sock->set_option(asio::socket_base::receive_buffer_size(rx_size));
+ if (tx_size != 0) sock->set_option(asio::socket_base::send_buffer_size(tx_size));
+ }
+
void server_thread(void){
- std::cout << "entering server_thread... " << _port << std::endl;
+ uhd::set_thread_priority_safe();
+ std::cout << " entering server_thread..." << std::endl;
spawn_mutex.unlock();
std::vector<char> buff(insane_mtu);
while (not boost::this_thread::interruption_requested()){
@@ -112,26 +134,35 @@ private:
boost::mutex::scoped_lock lock(_endpoint_mutex);
const size_t len = _server_socket->receive_from(asio::buffer(&buff.front(), buff.size()), _endpoint);
lock.unlock();
- //std::cout << " len " << len << std::endl;
_client_socket->send(asio::buffer(&buff.front(), len));
+
+ //perform sequence error detection on tx dsp data (can detect bad network cards)
+ /*
+ if (_port[4] == '7'){
+ static boost::uint32_t next_seq;
+ const boost::uint32_t this_seq = ntohl(reinterpret_cast<const boost::uint32_t *>(&buff.front())[0]);
+ if (next_seq != this_seq and this_seq != 0) std::cout << "S" << std::flush;
+ next_seq = this_seq + 1;
+ }
+ */
}
}
- std::cout << "exiting server_thread... " << _port << std::endl;
+ std::cout << " exiting server_thread..." << std::endl;
}
void client_thread(void){
- std::cout << "entering client_thread... " << _port << std::endl;
+ uhd::set_thread_priority_safe();
+ std::cout << " entering client_thread..." << std::endl;
spawn_mutex.unlock();
std::vector<char> buff(insane_mtu);
while (not boost::this_thread::interruption_requested()){
if (wait_for_recv_ready(_client_socket->native())){
const size_t len = _client_socket->receive(asio::buffer(&buff.front(), buff.size()));
- //std::cout << " len " << len << std::endl;
boost::mutex::scoped_lock lock(_endpoint_mutex);
_server_socket->send_to(asio::buffer(&buff.front(), len), _endpoint);
}
}
- std::cout << "exiting client_thread... " << _port << std::endl;
+ std::cout << " exiting client_thread..." << std::endl;
}
const std::string _port;
@@ -174,9 +205,9 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
{
boost::shared_ptr<udp_relay_type> ctrl(new udp_relay_type("0.0.0.0", addr, "49152"));
- boost::shared_ptr<udp_relay_type> rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156"));
- boost::shared_ptr<udp_relay_type> txdsp0(new udp_relay_type("0.0.0.0", addr, "49157"));
- boost::shared_ptr<udp_relay_type> rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158"));
+ boost::shared_ptr<udp_relay_type> rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
+ boost::shared_ptr<udp_relay_type> txdsp0(new udp_relay_type("0.0.0.0", addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size));
+ boost::shared_ptr<udp_relay_type> rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
boost::shared_ptr<udp_relay_type> gps(new udp_relay_type("0.0.0.0", addr, "49172"));
std::signal(SIGINT, &sig_int_handler);