diff options
Diffstat (limited to 'host/tests')
| -rw-r--r-- | host/tests/CMakeLists.txt | 11 | ||||
| -rw-r--r-- | host/tests/packet_handler_benchmark.cpp | 446 | 
2 files changed, 228 insertions, 229 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index 643efba34..ea069410b 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -59,10 +59,6 @@ set(test_sources      tx_streamer_test.cpp  ) -set(benchmark_sources -    packet_handler_benchmark.cpp -) -  #turn each test cpp file into an executable with an int main() function  add_definitions(-DBOOST_TEST_DYN_LINK -DBOOST_TEST_MAIN) @@ -121,7 +117,7 @@ macro(UHD_ADD_NONAPI_TEST)      get_filename_component(test_name ${test_TARGET} NAME_WE)      include_directories(${test_INCLUDE_DIRS})      add_executable(${test_name} ${test_TARGET} ${test_EXTRA_SOURCES}) -    target_link_libraries(${test_name} uhd ${Boost_LIBRARIES}) +    target_link_libraries(${test_name} uhd uhd_test ${Boost_LIBRARIES})      if(NOT ${test_NOAUTORUN})          UHD_ADD_TEST(${test_name} ${test_name})      endif(NOT ${test_NOAUTORUN}) @@ -190,6 +186,11 @@ UHD_ADD_NONAPI_TEST(  )  UHD_ADD_NONAPI_TEST( +    TARGET "packet_handler_benchmark.cpp" +    NOAUTORUN +) + +UHD_ADD_NONAPI_TEST(      TARGET "config_parser_test.cpp"      EXTRA_SOURCES ${CMAKE_SOURCE_DIR}/lib/utils/config_parser.cpp  ) diff --git a/host/tests/packet_handler_benchmark.cpp b/host/tests/packet_handler_benchmark.cpp index ec7eff517..6d4849831 100644 --- a/host/tests/packet_handler_benchmark.cpp +++ b/host/tests/packet_handler_benchmark.cpp @@ -18,6 +18,7 @@  #include <uhd/convert.hpp>  #include <uhd/transport/chdr.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/transport/zero_copy_flow_ctrl.hpp>  #include <uhd/types/sid.hpp>  #include <uhd/utils/safe_main.hpp>  #include <uhd/utils/thread.hpp> @@ -27,34 +28,155 @@  namespace po = boost::program_options;  using namespace uhd::transport; -using namespace uhd::usrp; -void benchmark_recv_packet_handler(const size_t spp, const std::string& format) +static constexpr size_t MAX_HEADER_LEN = 16; +static constexpr size_t LINE_SIZE = 8; + +// +// Old device3 rx flow control cache and procedures +// +struct rx_fc_cache_t  { -    const size_t bpi        = uhd::convert::get_bytes_per_item(format); -    const size_t frame_size = bpi * spp + DEVICE3_RX_MAX_HDR_LEN; +    //! Flow control interval in bytes +    size_t interval = 0; +    //! Byte count at last flow control packet +    uint32_t last_byte_count = 0; +    //! This will wrap around, but that's OK, because math. +    uint32_t total_bytes_consumed = 0; +    //! This will wrap around, but that's OK, because math. +    uint32_t total_packets_consumed = 0; +    //! Sequence number of next flow control packet +    uint64_t seq_num = 0; +    uhd::sid_t sid; +    uhd::transport::zero_copy_if::sptr xport; +    std::function<uint32_t(uint32_t)> to_host; +    std::function<uint32_t(uint32_t)> from_host; +    std::function<void( +        const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        unpack; +    std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        pack; +}; + +inline bool rx_flow_ctrl( +    boost::shared_ptr<rx_fc_cache_t> fc_cache, uhd::transport::managed_buffer::sptr buff) +{ +    // If the caller supplied a buffer +    if (buff) { +        // Unpack the header +        uhd::transport::vrt::if_packet_info_t packet_info; +        packet_info.num_packet_words32 = buff->size() / sizeof(uint32_t); +        const uint32_t* pkt            = buff->cast<const uint32_t*>(); +        try { +            fc_cache->unpack(pkt, packet_info); +        } catch (const std::exception& ex) { +            // Log and ignore +            UHD_LOGGER_ERROR("RX FLOW CTRL") +                << "Error unpacking packet: " << ex.what() << std::endl; +            return true; +        } -    mock_zero_copy::sptr xport(new mock_zero_copy( -        vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size)); +        // Update counters assuming the buffer is a consumed packet +        if (not packet_info.error) { +            const size_t bytes = 4 * (packet_info.num_header_words32 + packet_info.num_payload_words32); +            fc_cache->total_bytes_consumed += bytes; +            fc_cache->total_packets_consumed++; +        } +    } -    xport->set_reuse_recv_memory(true); +    // Just return if there is no need to send a flow control packet +    if (fc_cache->total_bytes_consumed - fc_cache->last_byte_count < fc_cache->interval) { +        return true; +    } -    sph::recv_packet_streamer streamer(spp); -    streamer.set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); -    streamer.set_tick_rate(1.0); -    streamer.set_samp_rate(1.0); +    // Time to send a flow control packet. For the benchmark, we should never +    // reach this point. +    UHD_THROW_INVALID_CODE_PATH(); +} -    uhd::convert::id_type id; -    id.output_format = format; -    id.num_inputs    = 1; -    id.input_format  = "sc16_item32_be"; -    id.num_outputs   = 1; -    streamer.set_converter(id); +inline void handle_rx_flowctrl_ack( +    boost::shared_ptr<rx_fc_cache_t> /*fc_cache*/, const uint32_t* /*payload*/) +{ +    // For the benchmark, we should never reach this +    UHD_THROW_INVALID_CODE_PATH(); +} -    streamer.set_xport_chan_get_buff(0, -        [xport](double timeout) { return xport->get_recv_buff(timeout); }, -        false // flush -    ); +// +// Old device3 tx flow control cache and procedures +// +struct tx_fc_cache_t +{ +    uint32_t last_byte_ack = 0; +    uint32_t last_seq_ack = 0; +    uint32_t byte_count = 0; +    uint32_t pkt_count = 0; +    uint32_t window_size = 0; +    uint32_t fc_ack_seqnum = 0; +    bool fc_received = false; +    std::function<uint32_t(uint32_t)> to_host; +    std::function<uint32_t(uint32_t)> from_host; +    std::function<void( +        const uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        unpack; +    std::function<void(uint32_t* packet_buff, uhd::transport::vrt::if_packet_info_t&)> +        pack; +}; + +inline bool tx_flow_ctrl(boost::shared_ptr<tx_fc_cache_t> fc_cache, +    uhd::transport::zero_copy_if::sptr /*xport*/, +    uhd::transport::managed_buffer::sptr buff) +{ +    while (true) { +        // If there is space +        if (fc_cache->window_size - (fc_cache->byte_count - fc_cache->last_byte_ack) +            >= buff->size()) { +            // All is good - packet will be sent +            fc_cache->byte_count += buff->size(); +            // Round up to nearest word +            if (fc_cache->byte_count % LINE_SIZE) { +                fc_cache->byte_count += LINE_SIZE - (fc_cache->byte_count % LINE_SIZE); +            } +            fc_cache->pkt_count++; + +            // Just zero out the counts here to avoid actually tring to read flow +            // control packets in the benchmark +            fc_cache->byte_count = 0; +            fc_cache->last_byte_ack = 0; +            fc_cache->pkt_count = 0; + +            return true; +        } + +        // Look for a flow control message to update the space available in the +        // buffer. For the benchmark, we should never reach this point. +        UHD_THROW_INVALID_CODE_PATH(); +    } +    return false; +} + +inline void tx_flow_ctrl_ack(boost::shared_ptr<tx_fc_cache_t> fc_cache, +    uhd::transport::zero_copy_if::sptr /*send_xport*/, +    uhd::sid_t /*send_sid*/) +{ +    if (not fc_cache->fc_received) { +        return; +    } + +    // Time to send a flow control ACK packet. For the benchmark, we should +    // never reach this point. +    UHD_THROW_INVALID_CODE_PATH(); +} + +// +// Benchmark functions +// +void benchmark_recv_packet_handler(const size_t spp, const std::string& format) +{ +    const size_t bpi        = uhd::convert::get_bytes_per_item(format); +    const size_t frame_size = bpi * spp + MAX_HEADER_LEN; + +    mock_zero_copy::sptr xport(new mock_zero_copy( +        vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size));      // Create packet for packet handler to read      vrt::if_packet_info_t packet_info; @@ -66,6 +188,47 @@ void benchmark_recv_packet_handler(const size_t spp, const std::string& format)      std::vector<uint32_t> recv_data(spp, 0);      xport->push_back_recv_packet(packet_info, recv_data); +    xport->set_reuse_recv_memory(true); + +    // Configure xport flow control +    boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); +    fc_cache->to_host   = uhd::ntohx<uint32_t>; +    fc_cache->from_host = uhd::htonx<uint32_t>; +    fc_cache->pack      = vrt::chdr::if_hdr_pack_be; +    fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; +    fc_cache->xport     = xport; +    fc_cache->interval  = std::numeric_limits<std::size_t>::max(); + +    auto zero_copy_xport = zero_copy_flow_ctrl::make( +        xport, 0, [fc_cache](managed_buffer::sptr buff) { +            return rx_flow_ctrl(fc_cache, buff); +        }); + +    // Create streamer +    auto streamer = boost::make_shared<sph::recv_packet_streamer>(spp); +    streamer->set_tick_rate(1.0); +    streamer->set_samp_rate(1.0); + +    // Configure streamer xport +    streamer->set_vrt_unpacker(&vrt::chdr::if_hdr_unpack_be); +    streamer->set_xport_chan_get_buff(0, +        [zero_copy_xport](double timeout) { return zero_copy_xport->get_recv_buff(timeout); }, +        false // flush +    ); + +    // Configure flow control ack +    streamer->set_xport_handle_flowctrl_ack( +        0, [fc_cache](const uint32_t* payload) { +            handle_rx_flowctrl_ack(fc_cache, payload); +        }); + +    // Configure converter +    uhd::convert::id_type id; +    id.output_format = format; +    id.num_inputs    = 1; +    id.input_format  = "sc16_item32_be"; +    id.num_outputs   = 1; +    streamer->set_converter(id);      // Allocate buffer      std::vector<uint8_t> buffer(spp * bpi); @@ -78,7 +241,7 @@ void benchmark_recv_packet_handler(const size_t spp, const std::string& format)      const size_t iterations = 1e7;      for (size_t i = 0; i < iterations; i++) { -        streamer.recv(buffers, spp, md, 1.0, true); +        streamer->recv(buffers, spp, md, 1.0, true);      }      const auto end_time = std::chrono::steady_clock::now(); @@ -93,26 +256,48 @@ void benchmark_send_packet_handler(      const size_t spp, const std::string& format, bool use_time_spec)  {      const size_t bpi        = uhd::convert::get_bytes_per_item(format); -    const size_t frame_size = bpi * spp + DEVICE3_TX_MAX_HDR_LEN; +    const size_t frame_size = bpi * spp + MAX_HEADER_LEN;      mock_zero_copy::sptr xport(new mock_zero_copy(          vrt::if_packet_info_t::LINK_TYPE_CHDR, frame_size, frame_size));      xport->set_reuse_send_memory(true); -    sph::send_packet_streamer streamer(spp); -    streamer.set_vrt_packer(&vrt::chdr::if_hdr_pack_be); +    // Configure flow control +    boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t()); +    fc_cache->to_host   = uhd::ntohx<uint32_t>; +    fc_cache->from_host = uhd::htonx<uint32_t>; +    fc_cache->pack      = vrt::chdr::if_hdr_pack_be; +    fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; +    fc_cache->window_size = UINT32_MAX; + +    auto zero_copy_xport = zero_copy_flow_ctrl::make(xport, +        [fc_cache, xport](managed_buffer::sptr buff) { +            return tx_flow_ctrl(fc_cache, xport, buff); +        }, +        0); +    // Create streamer +    auto streamer = boost::make_shared<sph::send_packet_streamer>(spp); +    streamer->set_vrt_packer(&vrt::chdr::if_hdr_pack_be); + +    // Configure converter      uhd::convert::id_type id;      id.input_format  = format;      id.num_inputs    = 1;      id.output_format = "sc16_item32_be";      id.num_outputs   = 1; -    streamer.set_converter(id); -    streamer.set_enable_trailer(false); +    streamer->set_converter(id); +    streamer->set_enable_trailer(false); + +    // Configure streamer xport +    streamer->set_xport_chan_get_buff( +        0, [zero_copy_xport](double timeout) { return zero_copy_xport->get_send_buff(timeout); }); -    streamer.set_xport_chan_get_buff( -        0, [xport](double timeout) { return xport->get_send_buff(timeout); }); +    // Configure flow control ack +    streamer->set_xport_chan_post_send_cb(0, [fc_cache, zero_copy_xport]() { +        tx_flow_ctrl_ack(fc_cache, zero_copy_xport, 0); +    });      // Allocate buffer      std::vector<uint8_t> buffer(spp * bpi); @@ -130,7 +315,7 @@ void benchmark_send_packet_handler(          if (use_time_spec) {              md.time_spec = uhd::time_spec_t(i, 0.0);          } -        streamer.send(buffers, spp, md, 1.0); +        streamer->send(buffers, spp, md, 1.0);      }      const auto end_time = std::chrono::steady_clock::now(); @@ -141,164 +326,6 @@ void benchmark_send_packet_handler(                << time_per_packet * 1e9 << " ns/packet\n";  } -void benchmark_device3_rx_flow_ctrl(bool send_flow_control_packet) -{ -    // Arbitrary sizes -    constexpr uint32_t fc_window = 10000; - -    mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR)); - -    xport->set_reuse_recv_memory(true); -    xport->set_reuse_send_memory(true); - -    boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); -    fc_cache->to_host   = uhd::ntohx<uint32_t>; -    fc_cache->from_host = uhd::htonx<uint32_t>; -    fc_cache->pack      = vrt::chdr::if_hdr_pack_be; -    fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; -    fc_cache->xport     = xport; -    fc_cache->interval  = fc_window; - -    // Create data buffer to pass to flow control function. Number of payload -    // words is arbitrary, just has to fit in the buffer. -    vrt::if_packet_info_t packet_info; -    packet_info.packet_type         = vrt::if_packet_info_t::PACKET_TYPE_DATA; -    packet_info.num_payload_words32 = 100; -    packet_info.num_payload_bytes   = packet_info.num_payload_words32 * sizeof(uint32_t); -    packet_info.has_tsf             = false; - -    std::vector<uint32_t> recv_data(packet_info.num_payload_words32, 0); -    xport->push_back_recv_packet(packet_info, recv_data); - -    auto recv_buffer = xport->get_recv_buff(1.0); - -    // Run benchmark -    const auto start_time = std::chrono::steady_clock::now(); - -    constexpr size_t iterations = 1e7; - -    for (size_t i = 0; i < iterations; i++) { -        fc_cache->total_bytes_consumed = send_flow_control_packet ? fc_window : 0; -        fc_cache->last_byte_count      = 0; - -        rx_flow_ctrl(fc_cache, recv_buffer); -    } - -    const auto end_time = std::chrono::steady_clock::now(); -    const std::chrono::duration<double> elapsed_time(end_time - start_time); - -    std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n"; -} - -void benchmark_device3_handle_rx_flow_ctrl_ack() -{ -    // Arbitrary sizes -    constexpr uint32_t fc_window = 10000; - -    mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR)); - -    xport->set_reuse_recv_memory(true); -    xport->set_reuse_send_memory(true); - -    boost::shared_ptr<rx_fc_cache_t> fc_cache(new rx_fc_cache_t()); -    fc_cache->to_host              = uhd::ntohx<uint32_t>; -    fc_cache->from_host            = uhd::htonx<uint32_t>; -    fc_cache->pack                 = vrt::chdr::if_hdr_pack_be; -    fc_cache->unpack               = vrt::chdr::if_hdr_unpack_be; -    fc_cache->xport                = xport; -    fc_cache->interval             = fc_window; -    fc_cache->total_bytes_consumed = 100; - -    // Payload should contain packet count and byte count -    std::vector<uint32_t> payload_data; -    payload_data.push_back(fc_cache->to_host(10)); // packet count -    payload_data.push_back(fc_cache->to_host(100)); // byte count - -    // Run benchmark -    const auto start_time       = std::chrono::steady_clock::now(); -    constexpr size_t iterations = 1e7; - -    for (size_t i = 0; i < iterations; i++) { -        handle_rx_flowctrl_ack(fc_cache, payload_data.data()); -    } - -    const auto end_time = std::chrono::steady_clock::now(); -    const std::chrono::duration<double> elapsed_time(end_time - start_time); - -    std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n"; -} - -void benchmark_device3_tx_flow_ctrl(bool send_flow_control_packet) -{ -    // Arbitrary sizes -    constexpr uint32_t fc_window = 10000; - -    mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR)); - -    xport->set_reuse_recv_memory(true); - -    boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); - -    fc_cache->to_host   = uhd::ntohx<uint32_t>; -    fc_cache->from_host = uhd::htonx<uint32_t>; -    fc_cache->pack      = vrt::chdr::if_hdr_pack_be; -    fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; - -    xport->push_back_flow_ctrl_packet( -        vrt::if_packet_info_t::PACKET_TYPE_FC, 1 /*packet*/, fc_window /*bytes*/); - -    // Run benchmark -    const auto start_time                 = std::chrono::steady_clock::now(); -    constexpr size_t iterations           = 1e7; -    managed_send_buffer::sptr send_buffer = xport->get_send_buff(0.0); - -    for (size_t i = 0; i < iterations; i++) { -        fc_cache->byte_count    = send_flow_control_packet ? fc_window : 0; -        fc_cache->last_byte_ack = 0; - -        tx_flow_ctrl(fc_cache, xport, send_buffer); -    } - -    const auto end_time = std::chrono::steady_clock::now(); -    const std::chrono::duration<double> elapsed_time(end_time - start_time); - -    std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n"; -} - -void benchmark_device3_tx_flow_ctrl_ack() -{ -    // Arbitrary sizes -    constexpr uint32_t fc_window = 10000; - -    mock_zero_copy::sptr xport(new mock_zero_copy(vrt::if_packet_info_t::LINK_TYPE_CHDR)); - -    xport->set_reuse_send_memory(true); - -    boost::shared_ptr<tx_fc_cache_t> fc_cache(new tx_fc_cache_t(fc_window)); - -    fc_cache->to_host   = uhd::ntohx<uint32_t>; -    fc_cache->from_host = uhd::htonx<uint32_t>; -    fc_cache->pack      = vrt::chdr::if_hdr_pack_be; -    fc_cache->unpack    = vrt::chdr::if_hdr_unpack_be; - -    // Run benchmark -    const auto start_time       = std::chrono::steady_clock::now(); -    constexpr size_t iterations = 1e7; -    uhd::sid_t send_sid; - -    for (size_t i = 0; i < iterations; i++) { -        // Setup fc_cache to require an ack -        fc_cache->fc_received = true; - -        tx_flow_ctrl_ack(fc_cache, xport, send_sid); -    } - -    const auto end_time = std::chrono::steady_clock::now(); -    const std::chrono::duration<double> elapsed_time(end_time - start_time); - -    std::cout << elapsed_time.count() / iterations * 1e9 << " ns per call\n"; -} -  int UHD_SAFE_MAIN(int argc, char* argv[])  {      po::options_description desc("Allowed options"); @@ -320,63 +347,34 @@ int UHD_SAFE_MAIN(int argc, char* argv[])      }      const char* formats[]   = {"sc16", "fc32", "fc64"}; -    constexpr size_t rx_spp = 2000; -    constexpr size_t tx_spp = 1000; +    constexpr size_t spp = 1000; +    std::cout << "spp: " << spp << "\n";      std::cout << "----------------------------------------------------------\n"; -    std::cout << "Benchmark of recv with no flow control and mock transport \n"; +    std::cout << "Benchmark of recv with mock link                          \n";      std::cout << "----------------------------------------------------------\n"; -    std::cout << "spp: " << rx_spp << "\n";      for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) { -        benchmark_recv_packet_handler(rx_spp, formats[i]); +        benchmark_recv_packet_handler(spp, formats[i]);      }      std::cout << "\n";      std::cout << "----------------------------------------------------------\n"; -    std::cout << "Benchmark of send with no flow control and mock transport \n"; +    std::cout << "Benchmark of send with mock link                          \n";      std::cout << "----------------------------------------------------------\n"; -    std::cout << "spp: " << tx_spp << "\n";      std::cout << "*** without timespec ***\n";      for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) { -        benchmark_send_packet_handler(tx_spp, formats[i], false); +        benchmark_send_packet_handler(spp, formats[i], false);      }      std::cout << "\n";      std::cout << "*** with timespec ***\n";      for (size_t i = 0; i < std::extent<decltype(formats)>::value; i++) { -        benchmark_send_packet_handler(tx_spp, formats[i], true); +        benchmark_send_packet_handler(spp, formats[i], true);      }      std::cout << "\n"; -    std::cout << "----------------------------------------------------------\n"; -    std::cout << "  Benchmark of flow control functions with mock transport \n"; -    std::cout << "----------------------------------------------------------\n"; -    std::cout << "*** device3_tx_flow_ctrl with no flow control packet ***\n"; -    benchmark_device3_tx_flow_ctrl(false); -    std::cout << "\n"; - -    std::cout << "*** device3_tx_flow_ctrl with flow control packet ***\n"; -    benchmark_device3_tx_flow_ctrl(true); -    std::cout << "\n"; - -    std::cout << "*** device3_tx_flow_ctrl_ack ***\n"; -    benchmark_device3_tx_flow_ctrl_ack(); -    std::cout << "\n"; - -    std::cout << "*** device3_rx_flow_ctrl with no flow control packet ***\n"; -    benchmark_device3_rx_flow_ctrl(false); -    std::cout << "\n"; - -    std::cout << "*** device3_rx_flow_ctrl with flow control packet ***\n"; -    benchmark_device3_rx_flow_ctrl(true); -    std::cout << "\n"; - -    std::cout << "*** device3_handle_rx_flow_ctrl_ack ***\n"; -    benchmark_device3_handle_rx_flow_ctrl_ack(); -    std::cout << "\n"; -      return EXIT_SUCCESS;  }  | 
