aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/tests/CMakeLists.txt11
-rw-r--r--host/tests/packet_handler_benchmark.cpp446
2 files changed, 228 insertions, 229 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index 643efba34..ea069410b 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -59,10 +59,6 @@ set(test_sources
tx_streamer_test.cpp
)
-set(benchmark_sources
- packet_handler_benchmark.cpp
-)
-
#turn each test cpp file into an executable with an int main() function
add_definitions(-DBOOST_TEST_DYN_LINK -DBOOST_TEST_MAIN)
@@ -121,7 +117,7 @@ macro(UHD_ADD_NONAPI_TEST)
get_filename_component(test_name ${test_TARGET} NAME_WE)
include_directories(${test_INCLUDE_DIRS})
add_executable(${test_name} ${test_TARGET} ${test_EXTRA_SOURCES})
- target_link_libraries(${test_name} uhd ${Boost_LIBRARIES})
+ target_link_libraries(${test_name} uhd uhd_test ${Boost_LIBRARIES})
if(NOT ${test_NOAUTORUN})
UHD_ADD_TEST(${test_name} ${test_name})
endif(NOT ${test_NOAUTORUN})
@@ -190,6 +186,11 @@ UHD_ADD_NONAPI_TEST(
)
UHD_ADD_NONAPI_TEST(
+ TARGET "packet_handler_benchmark.cpp"
+ NOAUTORUN
+)
+
+UHD_ADD_NONAPI_TEST(
TARGET "config_parser_test.cpp"
EXTRA_SOURCES ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp
)
diff --git a/host/tests/packet_handler_benchmark.cpp b/host/tests/packet_handler_benchmark.cpp
index ec7eff517..6d4849831 100644
--- a/host/tests/packet_handler_benchmark.cpp
+++ b/host/tests/packet_handler_benchmark.cpp
@@ -18,6 +18,7 @@
#include <uhd/convert.hpp>
#include <uhd/transport/chdr.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <uhd/transport/zero_copy_flow_ctrl.hpp>
#include <uhd/types/sid.hpp>
#include <uhd/utils/safe_main.hpp>
#include <uhd/utils/thread.hpp>
@@ -27,34 +28,155 @@
namespace po = boost::program_options;
using namespace uhd::transport;
-using namespace uhd::usrp;
-void benchmark_recv_packet_handler(const size_t spp, const std::string& format)
+static constexpr size_t MAX_HEADER_LEN = 16;
+static constexpr size_t LINE_SIZE = 8;
+
+//
+// Old device3 rx flow control cache and procedures
+//
+struct rx_fc_cache_t
{
- const size_t bpi = uhd::convert::get_bytes_per_item(format);
- const size_t frame_size = bpi * spp + DEVICE3_RX_MAX_HDR_LEN;
+ //! Flow control interval in bytes
+ size_t interval = 0;
+ //! Byte count at last flow control packet
+ uint32_t last_byte_count = 0;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_bytes_consumed = 0;
+ //! This will wrap around, but that's OK, because math.
+ uint32_t total_packets_consumed = 0;
+ //! Sequence number of next flow control packet
+ uint64_t seq_num = 0;
+ uhd::sid_t sid;
+ uhd::transport::zero_copy_if::sptr xport;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(
+ const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)>
+ unpack;
+ std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)>
+ pack;
+};
+
+inline bool rx_flow_ctrl(
+ boost::shared_ptr<rx_fc_cache_t> fc_cache, uhd::transport::managed_buffer::sptr buff)
+{
+ // If the caller supplied a buffer
+ if (buff) {
+ // Unpack the header
+ uhd::transport::vrt::if_packet_info_t packet_info;
+ packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t);
+ const uint32_t* pkt = buff->cast<const uint32_t*>();
+ try {
+ fc_cache->unpack(pkt, packet_info);
+ } catch (const std::exception& ex) {
+ // Log and ignore
+ UHD_LOGGER_ERROR("RX FLOW CTRL")
+ << "Error unpacking packet: " << ex.what() << std::endl;
+ return true;
+ }
- mock_zero_copy::sptr xport(new mock_zero_copy(
- vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size));
+ // Update counters assuming the buffer is a consumed packet
+ if (not packet_info.error) {
+ const size_t bytes = 4 * (packet_info.num_header_words32 + packet_info.num_payload_words32);
+ fc_cache->total_bytes_consumed += bytes;
+ fc_cache->total_packets_consumed++;
+ }
+ }
- xport->set_reuse_recv_memory(true);
+ // Just return if there is no need to send a flow control packet
+ if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) {
+ return true;
+ }
- sph::recv_packet_streamer streamer(spp);
- streamer.set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be);
- streamer.set_tick_rate(1.0);
- streamer.set_samp_rate(1.0);
+ // Time to send a flow control packet. For the benchmark, we should never
+ // reach this point.
+ UHD_THROW_INVALID_CODE_PATH();
+}
- uhd::convert::id_type id;
- id.output_format = format;
- id.num_inputs = 1;
- id.input_format = "sc16_item32_be";
- id.num_outputs = 1;
- streamer.set_converter(id);
+inline void handle_rx_flowctrl_ack(
+ boost::shared_ptr<rx_fc_cache_t> /*fc_cache*/, const uint32_t* /*payload*/)
+{
+ // For the benchmark, we should never reach this
+ UHD_THROW_INVALID_CODE_PATH();
+}
- streamer.set_xport_chan_get_buff(0,
- [xport](double timeout) { return xport->get_recv_buff(timeout); },
- false // flush
- );
+//
+// Old device3 tx flow control cache and procedures
+//
+struct tx_fc_cache_t
+{
+ uint32_t last_byte_ack = 0;
+ uint32_t last_seq_ack = 0;
+ uint32_t byte_count = 0;
+ uint32_t pkt_count = 0;
+ uint32_t window_size = 0;
+ uint32_t fc_ack_seqnum = 0;
+ bool fc_received = false;
+ std::function<uint32_t(uint32_t)> to_host;
+ std::function<uint32_t(uint32_t)> from_host;
+ std::function<void(
+ const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)>
+ unpack;
+ std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)>
+ pack;
+};
+
+inline bool tx_flow_ctrl(boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ uhd::transport::zero_copy_if::sptr /*xport*/,
+ uhd::transport::managed_buffer::sptr buff)
+{
+ while (true) {
+ // If there is space
+ if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack)
+ >= buff->size()) {
+ // All is good - packet will be sent
+ fc_cache->byte_count += buff->size();
+ // Round up to nearest word
+ if (fc_cache->byte_count % LINE_SIZE) {
+ fc_cache->byte_count += LINE_SIZE - (fc_cache->byte_count % LINE_SIZE);
+ }
+ fc_cache->pkt_count++;
+
+ // Just zero out the counts here to avoid actually tring to read flow
+ // control packets in the benchmark
+ fc_cache->byte_count = 0;
+ fc_cache->last_byte_ack = 0;
+ fc_cache->pkt_count = 0;
+
+ return true;
+ }
+
+ // Look for a flow control message to update the space available in the
+ // buffer. For the benchmark, we should never reach this point.
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+ return false;
+}
+
+inline void tx_flow_ctrl_ack(boost::shared_ptr<tx_fc_cache_t> fc_cache,
+ uhd::transport::zero_copy_if::sptr /*send_xport*/,
+ uhd::sid_t /*send_sid*/)
+{
+ if (not fc_cache->fc_received) {
+ return;
+ }
+
+ // Time to send a flow control ACK packet. For the benchmark, we should
+ // never reach this point.
+ UHD_THROW_INVALID_CODE_PATH();
+}
+
+//
+// Benchmark functions
+//
+void benchmark_recv_packet_handler(const size_t spp, const std::string& format)
+{
+ const size_t bpi = uhd::convert::get_bytes_per_item(format);
+ const size_t frame_size = bpi * spp + MAX_HEADER_LEN;
+
+ mock_zero_copy::sptr xport(new mock_zero_copy(
+ vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size));
// Create packet for packet handler to read
vrt::if_packet_info_t packet_info;
@@ -66,6 +188,47 @@ void benchmark_recv_packet_handler(const size_t spp, const std::string& format)
std::vector<uint32_t> recv_data(spp, 0);
xport->push_back_recv_packet(packet_info, recv_data);
+ xport->set_reuse_recv_memory(true);
+
+ // Configure xport flow control
+ boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
+ fc_cache->to_host = uhd::ntohx<uint32_t>;
+ fc_cache->from_host = uhd::htonx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_be;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
+ fc_cache->xport = xport;
+ fc_cache->interval = std::numeric_limits<std::size_t>::max();
+
+ auto zero_copy_xport = zero_copy_flow_ctrl::make(
+ xport, 0, [fc_cache](managed_buffer::sptr buff) {
+ return rx_flow_ctrl(fc_cache, buff);
+ });
+
+ // Create streamer
+ auto streamer = boost::make_shared<sph::recv_packet_streamer>(spp);
+ streamer->set_tick_rate(1.0);
+ streamer->set_samp_rate(1.0);
+
+ // Configure streamer xport
+ streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be);
+ streamer->set_xport_chan_get_buff(0,
+ [zero_copy_xport](double timeout) { return zero_copy_xport->get_recv_buff(timeout); },
+ false // flush
+ );
+
+ // Configure flow control ack
+ streamer->set_xport_handle_flowctrl_ack(
+ 0, [fc_cache](const uint32_t* payload) {
+ handle_rx_flowctrl_ack(fc_cache, payload);
+ });
+
+ // Configure converter
+ uhd::convert::id_type id;
+ id.output_format = format;
+ id.num_inputs = 1;
+ id.input_format = "sc16_item32_be";
+ id.num_outputs = 1;
+ streamer->set_converter(id);
// Allocate buffer
std::vector<uint8_t> buffer(spp * bpi);
@@ -78,7 +241,7 @@ void benchmark_recv_packet_handler(const size_t spp, const std::string& format)
const size_t iterations = 1e7;
for (size_t i = 0; i < iterations; i++) {
- streamer.recv(buffers, spp, md, 1.0, true);
+ streamer->recv(buffers, spp, md, 1.0, true);
}
const auto end_time = std::chrono::steady_clock::now();
@@ -93,26 +256,48 @@ void benchmark_send_packet_handler(
const size_t spp, const std::string& format, bool use_time_spec)
{
const size_t bpi = uhd::convert::get_bytes_per_item(format);
- const size_t frame_size = bpi * spp + DEVICE3_TX_MAX_HDR_LEN;
+ const size_t frame_size = bpi * spp + MAX_HEADER_LEN;
mock_zero_copy::sptr xport(new mock_zero_copy(
vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size));
xport->set_reuse_send_memory(true);
- sph::send_packet_streamer streamer(spp);
- streamer.set_vrt_packer(&vrt::chdr::if_hdr_pack_be);
+ // Configure flow control
+ boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t());
+ fc_cache->to_host = uhd::ntohx<uint32_t>;
+ fc_cache->from_host = uhd::htonx<uint32_t>;
+ fc_cache->pack = vrt::chdr::if_hdr_pack_be;
+ fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
+ fc_cache->window_size = UINT32_MAX;
+
+ auto zero_copy_xport = zero_copy_flow_ctrl::make(xport,
+ [fc_cache, xport](managed_buffer::sptr buff) {
+ return tx_flow_ctrl(fc_cache, xport, buff);
+ },
+ 0);
+ // Create streamer
+ auto streamer = boost::make_shared<sph::send_packet_streamer>(spp);
+ streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be);
+
+ // Configure converter
uhd::convert::id_type id;
id.input_format = format;
id.num_inputs = 1;
id.output_format = "sc16_item32_be";
id.num_outputs = 1;
- streamer.set_converter(id);
- streamer.set_enable_trailer(false);
+ streamer->set_converter(id);
+ streamer->set_enable_trailer(false);
+
+ // Configure streamer xport
+ streamer->set_xport_chan_get_buff(
+ 0, [zero_copy_xport](double timeout) { return zero_copy_xport->get_send_buff(timeout); });
- streamer.set_xport_chan_get_buff(
- 0, [xport](double timeout) { return xport->get_send_buff(timeout); });
+ // Configure flow control ack
+ streamer->set_xport_chan_post_send_cb(0, [fc_cache, zero_copy_xport]() {
+ tx_flow_ctrl_ack(fc_cache, zero_copy_xport, 0);
+ });
// Allocate buffer
std::vector<uint8_t> buffer(spp * bpi);
@@ -130,7 +315,7 @@ void benchmark_send_packet_handler(
if (use_time_spec) {
md.time_spec = uhd::time_spec_t(i, 0.0);
}
- streamer.send(buffers, spp, md, 1.0);
+ streamer->send(buffers, spp, md, 1.0);
}
const auto end_time = std::chrono::steady_clock::now();
@@ -141,164 +326,6 @@ void benchmark_send_packet_handler(
<< time_per_packet * 1e9 << " ns/packet\n";
}
-void benchmark_device3_rx_flow_ctrl(bool send_flow_control_packet)
-{
- // Arbitrary sizes
- constexpr uint32_t fc_window = 10000;
-
- mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR));
-
- xport->set_reuse_recv_memory(true);
- xport->set_reuse_send_memory(true);
-
- boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
- fc_cache->to_host = uhd::ntohx<uint32_t>;
- fc_cache->from_host = uhd::htonx<uint32_t>;
- fc_cache->pack = vrt::chdr::if_hdr_pack_be;
- fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
- fc_cache->xport = xport;
- fc_cache->interval = fc_window;
-
- // Create data buffer to pass to flow control function. Number of payload
- // words is arbitrary, just has to fit in the buffer.
- vrt::if_packet_info_t packet_info;
- packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA;
- packet_info.num_payload_words32 = 100;
- packet_info.num_payload_bytes = packet_info.num_payload_words32 * sizeof(uint32_t);
- packet_info.has_tsf = false;
-
- std::vector<uint32_t> recv_data(packet_info.num_payload_words32, 0);
- xport->push_back_recv_packet(packet_info, recv_data);
-
- auto recv_buffer = xport->get_recv_buff(1.0);
-
- // Run benchmark
- const auto start_time = std::chrono::steady_clock::now();
-
- constexpr size_t iterations = 1e7;
-
- for (size_t i = 0; i < iterations; i++) {
- fc_cache->total_bytes_consumed = send_flow_control_packet ? fc_window : 0;
- fc_cache->last_byte_count = 0;
-
- rx_flow_ctrl(fc_cache, recv_buffer);
- }
-
- const auto end_time = std::chrono::steady_clock::now();
- const std::chrono::duration<double> elapsed_time(end_time - start_time);
-
- std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n";
-}
-
-void benchmark_device3_handle_rx_flow_ctrl_ack()
-{
- // Arbitrary sizes
- constexpr uint32_t fc_window = 10000;
-
- mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR));
-
- xport->set_reuse_recv_memory(true);
- xport->set_reuse_send_memory(true);
-
- boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t());
- fc_cache->to_host = uhd::ntohx<uint32_t>;
- fc_cache->from_host = uhd::htonx<uint32_t>;
- fc_cache->pack = vrt::chdr::if_hdr_pack_be;
- fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
- fc_cache->xport = xport;
- fc_cache->interval = fc_window;
- fc_cache->total_bytes_consumed = 100;
-
- // Payload should contain packet count and byte count
- std::vector<uint32_t> payload_data;
- payload_data.push_back(fc_cache->to_host(10)); // packet count
- payload_data.push_back(fc_cache->to_host(100)); // byte count
-
- // Run benchmark
- const auto start_time = std::chrono::steady_clock::now();
- constexpr size_t iterations = 1e7;
-
- for (size_t i = 0; i < iterations; i++) {
- handle_rx_flowctrl_ack(fc_cache, payload_data.data());
- }
-
- const auto end_time = std::chrono::steady_clock::now();
- const std::chrono::duration<double> elapsed_time(end_time - start_time);
-
- std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n";
-}
-
-void benchmark_device3_tx_flow_ctrl(bool send_flow_control_packet)
-{
- // Arbitrary sizes
- constexpr uint32_t fc_window = 10000;
-
- mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR));
-
- xport->set_reuse_recv_memory(true);
-
- boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
-
- fc_cache->to_host = uhd::ntohx<uint32_t>;
- fc_cache->from_host = uhd::htonx<uint32_t>;
- fc_cache->pack = vrt::chdr::if_hdr_pack_be;
- fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
-
- xport->push_back_flow_ctrl_packet(
- vrt::if_packet_info_t::PACKET_TYPE_FC, 1 /*packet*/, fc_window /*bytes*/);
-
- // Run benchmark
- const auto start_time = std::chrono::steady_clock::now();
- constexpr size_t iterations = 1e7;
- managed_send_buffer::sptr send_buffer = xport->get_send_buff(0.0);
-
- for (size_t i = 0; i < iterations; i++) {
- fc_cache->byte_count = send_flow_control_packet ? fc_window : 0;
- fc_cache->last_byte_ack = 0;
-
- tx_flow_ctrl(fc_cache, xport, send_buffer);
- }
-
- const auto end_time = std::chrono::steady_clock::now();
- const std::chrono::duration<double> elapsed_time(end_time - start_time);
-
- std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n";
-}
-
-void benchmark_device3_tx_flow_ctrl_ack()
-{
- // Arbitrary sizes
- constexpr uint32_t fc_window = 10000;
-
- mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR));
-
- xport->set_reuse_send_memory(true);
-
- boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window));
-
- fc_cache->to_host = uhd::ntohx<uint32_t>;
- fc_cache->from_host = uhd::htonx<uint32_t>;
- fc_cache->pack = vrt::chdr::if_hdr_pack_be;
- fc_cache->unpack = vrt::chdr::if_hdr_unpack_be;
-
- // Run benchmark
- const auto start_time = std::chrono::steady_clock::now();
- constexpr size_t iterations = 1e7;
- uhd::sid_t send_sid;
-
- for (size_t i = 0; i < iterations; i++) {
- // Setup fc_cache to require an ack
- fc_cache->fc_received = true;
-
- tx_flow_ctrl_ack(fc_cache, xport, send_sid);
- }
-
- const auto end_time = std::chrono::steady_clock::now();
- const std::chrono::duration<double> elapsed_time(end_time - start_time);
-
- std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n";
-}
-
int UHD_SAFE_MAIN(int argc, char* argv[])
{
po::options_description desc("Allowed options");
@@ -320,63 +347,34 @@ int UHD_SAFE_MAIN(int argc, char* argv[])
}
const char* formats[] = {"sc16", "fc32", "fc64"};
- constexpr size_t rx_spp = 2000;
- constexpr size_t tx_spp = 1000;
+ constexpr size_t spp = 1000;
+ std::cout << "spp: " << spp << "\n";
std::cout << "----------------------------------------------------------\n";
- std::cout << "Benchmark of recv with no flow control and mock transport \n";
+ std::cout << "Benchmark of recv with mock link \n";
std::cout << "----------------------------------------------------------\n";
- std::cout << "spp: " << rx_spp << "\n";
for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) {
- benchmark_recv_packet_handler(rx_spp, formats[i]);
+ benchmark_recv_packet_handler(spp, formats[i]);
}
std::cout << "\n";
std::cout << "----------------------------------------------------------\n";
- std::cout << "Benchmark of send with no flow control and mock transport \n";
+ std::cout << "Benchmark of send with mock link \n";
std::cout << "----------------------------------------------------------\n";
- std::cout << "spp: " << tx_spp << "\n";
std::cout << "*** without timespec ***\n";
for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) {
- benchmark_send_packet_handler(tx_spp, formats[i], false);
+ benchmark_send_packet_handler(spp, formats[i], false);
}
std::cout << "\n";
std::cout << "*** with timespec ***\n";
for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) {
- benchmark_send_packet_handler(tx_spp, formats[i], true);
+ benchmark_send_packet_handler(spp, formats[i], true);
}
std::cout << "\n";
- std::cout << "----------------------------------------------------------\n";
- std::cout << " Benchmark of flow control functions with mock transport \n";
- std::cout << "----------------------------------------------------------\n";
- std::cout << "*** device3_tx_flow_ctrl with no flow control packet ***\n";
- benchmark_device3_tx_flow_ctrl(false);
- std::cout << "\n";
-
- std::cout << "*** device3_tx_flow_ctrl with flow control packet ***\n";
- benchmark_device3_tx_flow_ctrl(true);
- std::cout << "\n";
-
- std::cout << "*** device3_tx_flow_ctrl_ack ***\n";
- benchmark_device3_tx_flow_ctrl_ack();
- std::cout << "\n";
-
- std::cout << "*** device3_rx_flow_ctrl with no flow control packet ***\n";
- benchmark_device3_rx_flow_ctrl(false);
- std::cout << "\n";
-
- std::cout << "*** device3_rx_flow_ctrl with flow control packet ***\n";
- benchmark_device3_rx_flow_ctrl(true);
- std::cout << "\n";
-
- std::cout << "*** device3_handle_rx_flow_ctrl_ack ***\n";
- benchmark_device3_handle_rx_flow_ctrl_ack();
- std::cout << "\n";
-
return EXIT_SUCCESS;
}