aboutsummaryrefslogtreecommitdiffstats
path: root/host/examples/network_relay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'host/examples/network_relay.cpp')
-rw-r--r--host/examples/network_relay.cpp161
1 files changed, 92 insertions, 69 deletions
diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp
index 3f4a1d711..464d2084d 100644
--- a/host/examples/network_relay.cpp
+++ b/host/examples/network_relay.cpp
@@ -5,21 +5,21 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
-#include <uhd/utils/thread.hpp>
#include <uhd/utils/safe_main.hpp>
+#include <uhd/utils/thread.hpp>
+#include <boost/asio.hpp>
+#include <boost/format.hpp>
#include <boost/program_options.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/format.hpp>
-#include <boost/asio.hpp>
-#include <iostream>
+#include <chrono>
#include <csignal>
-#include <vector>
#include <cstdlib>
-#include <chrono>
+#include <iostream>
#include <thread>
+#include <vector>
-namespace po = boost::program_options;
+namespace po = boost::program_options;
namespace asio = boost::asio;
typedef boost::shared_ptr<asio::ip::udp::socket> socket_type;
@@ -27,11 +27,11 @@ typedef boost::shared_ptr<asio::ip::udp::socket> socket_type;
static const size_t insane_mtu = 9000;
#if defined(UHD_PLATFORM_MACOS)
- //limit buffer resize on macos or it will error
- const size_t rx_dsp_buff_size = size_t(1e6);
+// limit buffer resize on macos or it will error
+const size_t rx_dsp_buff_size = size_t(1e6);
#else
- //set to half-a-second of buffering at max rate
- const size_t rx_dsp_buff_size = size_t(50e6);
+// set to half-a-second of buffering at max rate
+const size_t rx_dsp_buff_size = size_t(50e6);
#endif
const size_t tx_dsp_buff_size = (1 << 20);
@@ -40,43 +40,49 @@ const size_t tx_dsp_buff_size = (1 << 20);
* Signal handlers
**********************************************************************/
static bool stop_signal_called = false;
-void sig_int_handler(int){stop_signal_called = true;}
+void sig_int_handler(int)
+{
+ stop_signal_called = true;
+}
-static bool wait_for_recv_ready(int sock_fd){
- //setup timeval for timeout
+static bool wait_for_recv_ready(int sock_fd)
+{
+ // setup timeval for timeout
timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000; // 100ms
- //setup rset for timeout
+ // setup rset for timeout
fd_set rset;
FD_ZERO(&rset);
FD_SET(sock_fd, &rset);
- //call select with timeout on receive socket
- return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0;
+ // call select with timeout on receive socket
+ return ::select(sock_fd + 1, &rset, NULL, NULL, &tv) > 0;
}
/***********************************************************************
* Relay class
**********************************************************************/
-class udp_relay_type{
+class udp_relay_type
+{
public:
- udp_relay_type(
- const std::string &server_addr,
- const std::string &client_addr,
- const std::string &port,
+ udp_relay_type(const std::string& server_addr,
+ const std::string& client_addr,
+ const std::string& port,
const size_t server_rx_size = 0,
const size_t server_tx_size = 0,
const size_t client_rx_size = 0,
- const size_t client_tx_size = 0
- ):_port(port){
+ const size_t client_tx_size = 0)
+ : _port(port)
+ {
{
asio::ip::udp::resolver resolver(_io_service);
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), server_addr, port);
asio::ip::udp::endpoint endpoint = *resolver.resolve(query);
- _server_socket = boost::shared_ptr<asio::ip::udp::socket>(new asio::ip::udp::socket(_io_service, endpoint));
+ _server_socket = boost::shared_ptr<asio::ip::udp::socket>(
+ new asio::ip::udp::socket(_io_service, endpoint));
resize_buffs(_server_socket, server_rx_size, server_tx_size);
}
{
@@ -84,22 +90,27 @@ public:
asio::ip::udp::resolver::query query(asio::ip::udp::v4(), client_addr, port);
asio::ip::udp::endpoint endpoint = *resolver.resolve(query);
- _client_socket = boost::shared_ptr<asio::ip::udp::socket>(new asio::ip::udp::socket(_io_service));
+ _client_socket = boost::shared_ptr<asio::ip::udp::socket>(
+ new asio::ip::udp::socket(_io_service));
_client_socket->open(asio::ip::udp::v4());
_client_socket->connect(endpoint);
resize_buffs(_client_socket, client_rx_size, client_tx_size);
}
std::cout << "spawning relay threads... " << _port << std::endl;
- boost::unique_lock<boost::mutex> lock(spawn_mutex); // lock in preparation to wait for threads to spawn
- (void)_thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this));
- wait_for_thread.wait(lock); // wait for thread to spin up
- (void)_thread_group.create_thread(boost::bind(&udp_relay_type::client_thread, this));
- wait_for_thread.wait(lock); // wait for thread to spin up
+ boost::unique_lock<boost::mutex> lock(
+ spawn_mutex); // lock in preparation to wait for threads to spawn
+ (void)_thread_group.create_thread(
+ boost::bind(&udp_relay_type::server_thread, this));
+ wait_for_thread.wait(lock); // wait for thread to spin up
+ (void)_thread_group.create_thread(
+ boost::bind(&udp_relay_type::client_thread, this));
+ wait_for_thread.wait(lock); // wait for thread to spin up
std::cout << " done!" << std::endl << std::endl;
}
- ~udp_relay_type(void){
+ ~udp_relay_type(void)
+ {
std::cout << "killing relay threads... " << _port << std::endl;
_thread_group.interrupt_all();
_thread_group.join_all();
@@ -107,31 +118,36 @@ public:
}
private:
-
- static void resize_buffs(socket_type sock, const size_t rx_size, const size_t tx_size){
- if (rx_size != 0) sock->set_option(asio::socket_base::receive_buffer_size(rx_size));
- if (tx_size != 0) sock->set_option(asio::socket_base::send_buffer_size(tx_size));
+ static void resize_buffs(socket_type sock, const size_t rx_size, const size_t tx_size)
+ {
+ if (rx_size != 0)
+ sock->set_option(asio::socket_base::receive_buffer_size(rx_size));
+ if (tx_size != 0)
+ sock->set_option(asio::socket_base::send_buffer_size(tx_size));
}
- void server_thread(void){
+ void server_thread(void)
+ {
uhd::set_thread_priority_safe();
std::cout << " entering server_thread..." << std::endl;
- wait_for_thread.notify_one(); // notify constructor that this thread has started
+ wait_for_thread.notify_one(); // notify constructor that this thread has started
std::vector<char> buff(insane_mtu);
- while (not boost::this_thread::interruption_requested()){
- if (wait_for_recv_ready(_server_socket->native_handle())){
+ while (not boost::this_thread::interruption_requested()) {
+ if (wait_for_recv_ready(_server_socket->native_handle())) {
boost::mutex::scoped_lock lock(_endpoint_mutex);
- const size_t len = _server_socket->receive_from(asio::buffer(&buff.front(), buff.size()), _endpoint);
+ const size_t len = _server_socket->receive_from(
+ asio::buffer(&buff.front(), buff.size()), _endpoint);
lock.unlock();
_client_socket->send(asio::buffer(&buff.front(), len));
- //perform sequence error detection on tx dsp data (can detect bad network cards)
+ // perform sequence error detection on tx dsp data (can detect bad network
+ // cards)
/*
if (_port[4] == '7'){
static uint32_t next_seq;
- const uint32_t this_seq = ntohl(reinterpret_cast<const uint32_t *>(&buff.front())[0]);
- if (next_seq != this_seq and this_seq != 0) std::cout << "S" << std::flush;
- next_seq = this_seq + 1;
+ const uint32_t this_seq = ntohl(reinterpret_cast<const uint32_t
+ *>(&buff.front())[0]); if (next_seq != this_seq and this_seq != 0)
+ std::cout << "S" << std::flush; next_seq = this_seq + 1;
}
*/
}
@@ -139,14 +155,16 @@ private:
std::cout << " exiting server_thread..." << std::endl;
}
- void client_thread(void){
+ void client_thread(void)
+ {
uhd::set_thread_priority_safe();
std::cout << " entering client_thread..." << std::endl;
- wait_for_thread.notify_one(); // notify constructor that this thread has started
+ wait_for_thread.notify_one(); // notify constructor that this thread has started
std::vector<char> buff(insane_mtu);
- while (not boost::this_thread::interruption_requested()){
- if (wait_for_recv_ready(_client_socket->native_handle())){
- const size_t len = _client_socket->receive(asio::buffer(&buff.front(), buff.size()));
+ while (not boost::this_thread::interruption_requested()) {
+ if (wait_for_recv_ready(_client_socket->native_handle())) {
+ const size_t len =
+ _client_socket->receive(asio::buffer(&buff.front(), buff.size()));
boost::mutex::scoped_lock lock(_endpoint_mutex);
_server_socket->send_to(asio::buffer(&buff.front(), len), _endpoint);
}
@@ -168,14 +186,15 @@ private:
/***********************************************************************
* Main
**********************************************************************/
-int UHD_SAFE_MAIN(int argc, char *argv[]){
+int UHD_SAFE_MAIN(int argc, char* argv[])
+{
uhd::set_thread_priority_safe();
- //variables to be set by po
+ // variables to be set by po
std::string addr;
std::string bind;
- //setup the program options
+ // setup the program options
po::options_description desc("Allowed options");
// clang-format off
desc.add_options()
@@ -188,32 +207,36 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){
po::store(po::parse_command_line(argc, argv, desc), vm);
po::notify(vm);
- //print the help message
- if (vm.count("help") or not vm.count("addr")){
- std::cout
- << boost::format("UHD Network Relay %s") % desc << std::endl
- << "Runs a network relay between UHD on one computer and a USRP on the network.\n"
- << "This example is basically for test purposes. Use at your own convenience.\n"
- << std::endl;
+ // print the help message
+ if (vm.count("help") or not vm.count("addr")) {
+ std::cout << boost::format("UHD Network Relay %s") % desc << std::endl
+ << "Runs a network relay between UHD on one computer and a USRP on the "
+ "network.\n"
+ << "This example is basically for test purposes. Use at your own "
+ "convenience.\n"
+ << std::endl;
return EXIT_FAILURE;
}
{
- boost::shared_ptr<udp_relay_type> ctrl (new udp_relay_type(bind, addr, "49152"));
- boost::shared_ptr<udp_relay_type> rxdsp0(new udp_relay_type(bind, addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
- boost::shared_ptr<udp_relay_type> txdsp0(new udp_relay_type(bind, addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size));
- boost::shared_ptr<udp_relay_type> rxdsp1(new udp_relay_type(bind, addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
- boost::shared_ptr<udp_relay_type> gps (new udp_relay_type(bind, addr, "49172"));
+ boost::shared_ptr<udp_relay_type> ctrl(new udp_relay_type(bind, addr, "49152"));
+ boost::shared_ptr<udp_relay_type> rxdsp0(new udp_relay_type(
+ bind, addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
+ boost::shared_ptr<udp_relay_type> txdsp0(new udp_relay_type(
+ bind, addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size));
+ boost::shared_ptr<udp_relay_type> rxdsp1(new udp_relay_type(
+ bind, addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0));
+ boost::shared_ptr<udp_relay_type> gps(new udp_relay_type(bind, addr, "49172"));
std::signal(SIGINT, &sig_int_handler);
std::cout << "Press Ctrl + C to stop streaming..." << std::endl;
- while (not stop_signal_called){
+ while (not stop_signal_called) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
- //finished
+ // finished
std::cout << std::endl << "Done!" << std::endl << std::endl;
return EXIT_SUCCESS;