From a6d3483bf9d98ea3a645b11c1f70ca01def794b3 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 20 Dec 2011 14:37:34 -0800 Subject: usrp2: created network relay example app --- host/examples/CMakeLists.txt | 1 + host/examples/network_relay.cpp | 194 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 195 insertions(+) create mode 100644 host/examples/network_relay.cpp (limited to 'host/examples') diff --git a/host/examples/CMakeLists.txt b/host/examples/CMakeLists.txt index bad7b72a6..4f3650e63 100644 --- a/host/examples/CMakeLists.txt +++ b/host/examples/CMakeLists.txt @@ -20,6 +20,7 @@ ######################################################################## SET(example_sources benchmark_rate.cpp + network_relay.cpp rx_multi_samples.cpp rx_samples_to_file.cpp rx_samples_to_udp.cpp diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp new file mode 100644 index 000000000..f9f2e33fc --- /dev/null +++ b/host/examples/network_relay.cpp @@ -0,0 +1,194 @@ +// +// Copyright 2010-2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace po = boost::program_options; +namespace asio = boost::asio; + +typedef boost::shared_ptr socket_type; + +static const size_t insane_mtu = 9000; + +boost::mutex spawn_mutex; + +/*********************************************************************** + * Signal handlers + **********************************************************************/ +static bool stop_signal_called = false; +void sig_int_handler(int){stop_signal_called = true;} + +static bool wait_for_recv_ready(int sock_fd){ + //setup timeval for timeout + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + + //setup rset for timeout + fd_set rset; + FD_ZERO(&rset); + FD_SET(sock_fd, &rset); + + //call select with timeout on receive socket + return ::select(sock_fd+1, &rset, NULL, NULL, &tv) > 0; +} + +/*********************************************************************** + * Relay class + **********************************************************************/ +class udp_relay_type{ +public: + udp_relay_type( + const std::string &server_addr, + const std::string &client_addr, + const std::string &port + ):_port(port){ + { + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), server_addr, port); + asio::ip::udp::endpoint endpoint = *resolver.resolve(query); + + _server_socket = boost::shared_ptr(new asio::ip::udp::socket(_io_service, endpoint)); + } + { + asio::ip::udp::resolver resolver(_io_service); + asio::ip::udp::resolver::query query(asio::ip::udp::v4(), client_addr, port); + asio::ip::udp::endpoint endpoint = *resolver.resolve(query); + + _client_socket = boost::shared_ptr(new asio::ip::udp::socket(_io_service)); + _client_socket->open(asio::ip::udp::v4()); + _client_socket->connect(endpoint); + } + + std::cout << "spawning relay threads..." << std::endl; + _thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this)); + spawn_mutex.lock(); + spawn_mutex.lock(); + spawn_mutex.unlock(); + _thread_group.create_thread(boost::bind(&udp_relay_type::client_thread, this)); + spawn_mutex.lock(); + spawn_mutex.lock(); + spawn_mutex.unlock(); + std::cout << " done" << std::endl; + } + + ~udp_relay_type(void){ + std::cout << "killing relay threads..." << std::endl; + _thread_group.interrupt_all(); + _thread_group.join_all(); + std::cout << " done" << std::endl; + } + +private: + + void server_thread(void){ + std::cout << "entering server_thread... " << _port << std::endl; + spawn_mutex.unlock(); + std::vector buff(insane_mtu); + while (not boost::this_thread::interruption_requested()){ + if (wait_for_recv_ready(_server_socket->native())){ + boost::mutex::scoped_lock lock(_endpoint_mutex); + const size_t len = _server_socket->receive_from(asio::buffer(&buff.front(), buff.size()), _endpoint); + lock.unlock(); + //std::cout << " len " << len << std::endl; + _client_socket->send(asio::buffer(&buff.front(), len)); + } + } + std::cout << "exiting server_thread... " << _port << std::endl; + } + + void client_thread(void){ + std::cout << "entering client_thread... " << _port << std::endl; + spawn_mutex.unlock(); + std::vector buff(insane_mtu); + while (not boost::this_thread::interruption_requested()){ + if (wait_for_recv_ready(_client_socket->native())){ + const size_t len = _client_socket->receive(asio::buffer(&buff.front(), buff.size())); + //std::cout << " len " << len << std::endl; + boost::mutex::scoped_lock lock(_endpoint_mutex); + _server_socket->send_to(asio::buffer(&buff.front(), len), _endpoint); + } + } + std::cout << "exiting client_thread... " << _port << std::endl; + } + + const std::string _port; + boost::thread_group _thread_group; + asio::io_service _io_service; + asio::ip::udp::endpoint _endpoint; + boost::mutex _endpoint_mutex; + socket_type _server_socket, _client_socket; +}; + + +/*********************************************************************** + * Main + **********************************************************************/ +int UHD_SAFE_MAIN(int argc, char *argv[]){ + uhd::set_thread_priority_safe(); + + //variables to be set by po + std::string addr; + + //setup the program options + po::options_description desc("Allowed options"); + desc.add_options() + ("help", "help message") + ("addr", po::value(&addr)->default_value(""), "the resolvable address of the usrp") + ; + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, desc), vm); + po::notify(vm); + + //print the help message + if (vm.count("help")){ + std::cout + << boost::format("UHD Network Relay %s") % desc << std::endl + << "Runs a network relay between UHD on one computer and a USRP on the network.\n" + << "This example is basically for test purposes. Use at your own convenience.\n" + << std::endl; + return ~0; + } + + { + boost::shared_ptr ctrl(new udp_relay_type("0.0.0.0", addr, "49152")); + boost::shared_ptr rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156")); + boost::shared_ptr txdsp0(new udp_relay_type("0.0.0.0", addr, "49157")); + boost::shared_ptr rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158")); + boost::shared_ptr gps(new udp_relay_type("0.0.0.0", addr, "49172")); + + std::signal(SIGINT, &sig_int_handler); + std::cout << "Press Ctrl + C to stop streaming..." << std::endl; + + while (not stop_signal_called){ + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + } + } + + //finished + std::cout << std::endl << "Done!" << std::endl << std::endl; + + return 0; +} -- cgit v1.2.3 From bf575a8a4fbaf72421e87cf3482c0beb78a10703 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 20 Dec 2011 15:26:52 -0800 Subject: usrp2: more work on relay app and sequence error detection --- host/examples/network_relay.cpp | 59 +++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) (limited to 'host/examples') diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp index f9f2e33fc..6ecf27858 100644 --- a/host/examples/network_relay.cpp +++ b/host/examples/network_relay.cpp @@ -34,6 +34,16 @@ static const size_t insane_mtu = 9000; boost::mutex spawn_mutex; +#if defined(UHD_PLATFORM_MACOS) || defined(UHD_PLATFORM_BSD) + //limit buffer resize on macos or it will error + const size_t rx_dsp_buff_size = 1e6; +#elif defined(UHD_PLATFORM_LINUX) || defined(UHD_PLATFORM_WIN32) + //set to half-a-second of buffering at max rate + const size_t rx_dsp_buff_size = 50e6; +#endif + +const size_t tx_dsp_buff_size = (1 << 20); + /*********************************************************************** * Signal handlers **********************************************************************/ @@ -63,7 +73,11 @@ public: udp_relay_type( const std::string &server_addr, const std::string &client_addr, - const std::string &port + const std::string &port, + const size_t server_rx_size = 0, + const size_t server_tx_size = 0, + const size_t client_rx_size = 0, + const size_t client_tx_size = 0 ):_port(port){ { asio::ip::udp::resolver resolver(_io_service); @@ -71,6 +85,7 @@ public: asio::ip::udp::endpoint endpoint = *resolver.resolve(query); _server_socket = boost::shared_ptr(new asio::ip::udp::socket(_io_service, endpoint)); + resize_buffs(_server_socket, server_rx_size, server_tx_size); } { asio::ip::udp::resolver resolver(_io_service); @@ -80,9 +95,10 @@ public: _client_socket = boost::shared_ptr(new asio::ip::udp::socket(_io_service)); _client_socket->open(asio::ip::udp::v4()); _client_socket->connect(endpoint); + resize_buffs(_client_socket, client_rx_size, client_tx_size); } - std::cout << "spawning relay threads..." << std::endl; + std::cout << "spawning relay threads... " << _port << std::endl; _thread_group.create_thread(boost::bind(&udp_relay_type::server_thread, this)); spawn_mutex.lock(); spawn_mutex.lock(); @@ -91,20 +107,26 @@ public: spawn_mutex.lock(); spawn_mutex.lock(); spawn_mutex.unlock(); - std::cout << " done" << std::endl; + std::cout << " done!" << std::endl << std::endl; } ~udp_relay_type(void){ - std::cout << "killing relay threads..." << std::endl; + std::cout << "killing relay threads... " << _port << std::endl; _thread_group.interrupt_all(); _thread_group.join_all(); - std::cout << " done" << std::endl; + std::cout << " done!" << std::endl << std::endl; } private: + static void resize_buffs(socket_type sock, const size_t rx_size, const size_t tx_size){ + if (rx_size != 0) sock->set_option(asio::socket_base::receive_buffer_size(rx_size)); + if (tx_size != 0) sock->set_option(asio::socket_base::send_buffer_size(tx_size)); + } + void server_thread(void){ - std::cout << "entering server_thread... " << _port << std::endl; + uhd::set_thread_priority_safe(); + std::cout << " entering server_thread..." << std::endl; spawn_mutex.unlock(); std::vector buff(insane_mtu); while (not boost::this_thread::interruption_requested()){ @@ -112,26 +134,35 @@ private: boost::mutex::scoped_lock lock(_endpoint_mutex); const size_t len = _server_socket->receive_from(asio::buffer(&buff.front(), buff.size()), _endpoint); lock.unlock(); - //std::cout << " len " << len << std::endl; _client_socket->send(asio::buffer(&buff.front(), len)); + + //perform sequence error detection on tx dsp data (can detect bad network cards) + /* + if (_port[4] == '7'){ + static boost::uint32_t next_seq; + const boost::uint32_t this_seq = ntohl(reinterpret_cast(&buff.front())[0]); + if (next_seq != this_seq and this_seq != 0) std::cout << "S" << std::flush; + next_seq = this_seq + 1; + } + */ } } - std::cout << "exiting server_thread... " << _port << std::endl; + std::cout << " exiting server_thread..." << std::endl; } void client_thread(void){ - std::cout << "entering client_thread... " << _port << std::endl; + uhd::set_thread_priority_safe(); + std::cout << " entering client_thread..." << std::endl; spawn_mutex.unlock(); std::vector buff(insane_mtu); while (not boost::this_thread::interruption_requested()){ if (wait_for_recv_ready(_client_socket->native())){ const size_t len = _client_socket->receive(asio::buffer(&buff.front(), buff.size())); - //std::cout << " len " << len << std::endl; boost::mutex::scoped_lock lock(_endpoint_mutex); _server_socket->send_to(asio::buffer(&buff.front(), len), _endpoint); } } - std::cout << "exiting client_thread... " << _port << std::endl; + std::cout << " exiting client_thread..." << std::endl; } const std::string _port; @@ -174,9 +205,9 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){ { boost::shared_ptr ctrl(new udp_relay_type("0.0.0.0", addr, "49152")); - boost::shared_ptr rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156")); - boost::shared_ptr txdsp0(new udp_relay_type("0.0.0.0", addr, "49157")); - boost::shared_ptr rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158")); + boost::shared_ptr rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); + boost::shared_ptr txdsp0(new udp_relay_type("0.0.0.0", addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size)); + boost::shared_ptr rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); boost::shared_ptr gps(new udp_relay_type("0.0.0.0", addr, "49172")); std::signal(SIGINT, &sig_int_handler); -- cgit v1.2.3 From bdb267b9f445e929f765b481c038edbd6310ce85 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Tue, 20 Dec 2011 17:13:28 -0800 Subject: usrp2: added bind option to the relay --- host/examples/network_relay.cpp | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) (limited to 'host/examples') diff --git a/host/examples/network_relay.cpp b/host/examples/network_relay.cpp index 6ecf27858..8909c7530 100644 --- a/host/examples/network_relay.cpp +++ b/host/examples/network_relay.cpp @@ -182,19 +182,21 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){ //variables to be set by po std::string addr; + std::string bind; //setup the program options po::options_description desc("Allowed options"); desc.add_options() ("help", "help message") - ("addr", po::value(&addr)->default_value(""), "the resolvable address of the usrp") + ("addr", po::value(&addr), "the resolvable address of the usrp (must be specified)") + ("bind", po::value(&bind)->default_value("0.0.0.0"), "bind the server to this network address (default: any)") ; po::variables_map vm; po::store(po::parse_command_line(argc, argv, desc), vm); po::notify(vm); //print the help message - if (vm.count("help")){ + if (vm.count("help") or not vm.count("addr")){ std::cout << boost::format("UHD Network Relay %s") % desc << std::endl << "Runs a network relay between UHD on one computer and a USRP on the network.\n" @@ -204,11 +206,11 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){ } { - boost::shared_ptr ctrl(new udp_relay_type("0.0.0.0", addr, "49152")); - boost::shared_ptr rxdsp0(new udp_relay_type("0.0.0.0", addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); - boost::shared_ptr txdsp0(new udp_relay_type("0.0.0.0", addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size)); - boost::shared_ptr rxdsp1(new udp_relay_type("0.0.0.0", addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); - boost::shared_ptr gps(new udp_relay_type("0.0.0.0", addr, "49172")); + boost::shared_ptr ctrl (new udp_relay_type(bind, addr, "49152")); + boost::shared_ptr rxdsp0(new udp_relay_type(bind, addr, "49156", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); + boost::shared_ptr txdsp0(new udp_relay_type(bind, addr, "49157", tx_dsp_buff_size, 0, 0, tx_dsp_buff_size)); + boost::shared_ptr rxdsp1(new udp_relay_type(bind, addr, "49158", 0, tx_dsp_buff_size, rx_dsp_buff_size, 0)); + boost::shared_ptr gps (new udp_relay_type(bind, addr, "49172")); std::signal(SIGINT, &sig_int_handler); std::cout << "Press Ctrl + C to stop streaming..." << std::endl; -- cgit v1.2.3