diff options
Diffstat (limited to 'host/tests')
| -rw-r--r-- | host/tests/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | host/tests/common/mock_link.hpp | 8 | ||||
| -rw-r--r-- | host/tests/rfnoc_chdr_test.cpp | 46 | ||||
| -rw-r--r-- | host/tests/rx_streamer_test.cpp | 744 | ||||
| -rw-r--r-- | host/tests/tx_streamer_test.cpp | 393 | 
5 files changed, 1193 insertions, 0 deletions
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index d6ad6d777..1cdb42b96 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -55,6 +55,8 @@ set(test_sources      fe_conn_test.cpp      rfnoc_node_test.cpp      link_test.cpp +    rx_streamer_test.cpp +    tx_streamer_test.cpp  )  set(benchmark_sources diff --git a/host/tests/common/mock_link.hpp b/host/tests/common/mock_link.hpp index 34ea15540..73a65916c 100644 --- a/host/tests/common/mock_link.hpp +++ b/host/tests/common/mock_link.hpp @@ -94,6 +94,14 @@ public:      }      /*! +     * Return the number of packets stored in the mock link. +     */ +    size_t get_num_packets() const +    { +        return _tx_mems.size(); +    } + +    /*!       * Retrieve the contents of a packet sent by the link. The link       * stores packets in a queue in the order they were sent.       */ diff --git a/host/tests/rfnoc_chdr_test.cpp b/host/tests/rfnoc_chdr_test.cpp index 417ed2f96..1c63d5976 100644 --- a/host/tests/rfnoc_chdr_test.cpp +++ b/host/tests/rfnoc_chdr_test.cpp @@ -222,3 +222,49 @@ BOOST_AUTO_TEST_CASE(chdr_strc_packet_no_swap_64)          std::cout << pyld.to_string();      }  } + +BOOST_AUTO_TEST_CASE(chdr_generic_packet_calculate_pyld_offset_64) +{ +    // Check calculation without timestamp +    auto test_pyld_offset = [](chdr_packet::uptr& pkt, +        const packet_type_t pkt_type, +        const size_t num_mdata) +    { +        uint64_t buff[MAX_BUF_SIZE_WORDS]; +        chdr_header header; +        header.set_pkt_type(pkt_type); +        header.set_num_mdata(num_mdata); + +        pkt->refresh(reinterpret_cast<void*>(buff), header, 0); + +        const size_t pyld_offset = pkt->calculate_payload_offset( +            pkt_type, num_mdata); + +        void* pyld_ptr = pkt->get_payload_ptr(); + +        const size_t non_pyld_bytes = static_cast<size_t>( +            reinterpret_cast<uint8_t*>(pyld_ptr) - +            reinterpret_cast<uint8_t*>(buff)); + +        BOOST_CHECK(pyld_offset == non_pyld_bytes); +    }; + +    { +        chdr_packet::uptr pkt = chdr64_be_factory.make_generic(); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); +    } +    { +        chdr_packet::uptr pkt = chdr256_be_factory.make_generic(); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_NO_TS, 2); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 0); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 1); +        test_pyld_offset(pkt, PKT_TYPE_DATA_WITH_TS, 2); +    } +} diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp new file mode 100644 index 000000000..cd4daf569 --- /dev/null +++ b/host/tests/rx_streamer_test.cpp @@ -0,0 +1,744 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Contents of mock packet header + */ +struct mock_header_t +{ +    bool eob             = false; +    bool has_tsf         = false; +    uint64_t tsf         = 0; +    size_t payload_bytes = 0; +    bool ignore_seq      = true; +    size_t seq_num       = 0; +}; + +/*! + * Mock rx data xport which doesn't use I/O service, and just interacts with + * the link directly. + */ +class mock_rx_data_xport +{ +public: +    using uptr   = std::unique_ptr<mock_rx_data_xport>; +    using buff_t = uhd::transport::frame_buff; + +    //! Values extracted from received RX data packets +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +        const void* payload  = nullptr; +    }; + +    mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {} + +    std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff( +        const int32_t timeout_ms) +    { +        frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms); +        mock_header_t header  = *(reinterpret_cast<mock_header_t*>(buff->data())); + +        packet_info_t info; +        info.eob           = header.eob; +        info.has_tsf       = header.has_tsf; +        info.tsf           = header.tsf; +        info.payload_bytes = header.payload_bytes; +        info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t); + +        const uint8_t* pkt_end = +            reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); +        const size_t pyld_pkt_len = +            pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + +        if (pyld_pkt_len < info.payload_bytes) { +            _recv_link->release_recv_buff(std::move(buff)); +            throw uhd::value_error("Bad header or invalid packet length."); +        } + +        const bool seq_match = header.seq_num == _seq_num; +        const bool seq_error = !header.ignore_seq && !seq_match; +        _seq_num             = header.seq_num + 1; + +        return std::make_tuple(std::move(buff), info, seq_error); +    } + +    void release_recv_buff(frame_buff::uptr buff) +    { +        _recv_link->release_recv_buff(std::move(buff)); +    } + +    size_t get_max_payload_size() const +    { +        return _recv_link->get_recv_frame_size() - sizeof(packet_info_t); +    } + +private: +    mock_recv_link::sptr _recv_link; +    size_t _seq_num = 0; +}; + +/*! + * Mock rx streamer for testing + */ +class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport> +{ +public: +    mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) +        : rx_streamer_impl(num_chans, stream_args) +    { +    } + +    void issue_stream_cmd(const stream_cmd_t&) {} + +    void set_tick_rate(double rate) +    { +        rx_streamer_impl::set_tick_rate(rate); +    } + +    void set_samp_rate(double rate) +    { +        rx_streamer_impl::set_samp_rate(rate); +    } + +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        rx_streamer_impl::set_scale_factor(chan, scale_factor); +    } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using rx_streamer = rx_streamer_impl<mock_rx_data_xport>; + +static const double TICK_RATE    = 100e6; +static const double SAMP_RATE    = 10e6; +static const size_t FRAME_SIZE   = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_recv_link::sptr> make_links(const size_t num) +{ +    const mock_recv_link::link_params params = {FRAME_SIZE, 1}; + +    std::vector<mock_recv_link::sptr> links; + +    for (size_t i = 0; i < num; i++) { +        links.push_back(std::make_shared<mock_recv_link>(params)); +    } + +    return links; +} + +static boost::shared_ptr<mock_rx_streamer> make_rx_streamer( +    std::vector<mock_recv_link::sptr> recv_links, +    const std::string& host_format, +    const std::string& otw_format = "sc16") +{ +    const uhd::stream_args_t stream_args(host_format, otw_format); +    auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args); +    streamer->set_tick_rate(TICK_RATE); +    streamer->set_samp_rate(SAMP_RATE); + +    for (size_t i = 0; i < recv_links.size(); i++) { +        mock_rx_data_xport::uptr xport( +            std::make_unique<mock_rx_data_xport>(recv_links[i])); + +        streamer->set_scale_factor(i, SCALE_FACTOR); +        streamer->connect_channel(i, std::move(xport)); +    } + +    return streamer; +} + +static void push_back_recv_packet(mock_recv_link::sptr recv_link, +    mock_header_t header, +    size_t num_samps, +    uint16_t start_data = 0) +{ +    // Allocate buffer +    const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>); +    const size_t buff_len   = sizeof(header) + pyld_bytes; +    boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + +    // Write header to buffer +    header.payload_bytes                            = pyld_bytes; +    *(reinterpret_cast<mock_header_t*>(data.get())) = header; + +    // Write data to buffer +    auto data_ptr = +        reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header)); + +    for (size_t i = 0; i < num_samps; i++) { +        uint16_t val = (start_data + i) * 2; +        data_ptr[i]  = std::complex<uint16_t>(val, val + 1); +    } + +    // Push back buffer for link to recv +    recv_link->push_back_recv_packet(data, buff_len); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        const bool even_iteration = (i % 2 == 0); +        const bool odd_iteration  = (i % 2 != 0); +        mock_header_t header; +        header.eob     = even_iteration; +        header.has_tsf = odd_iteration; +        header.tsf     = i; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        for (size_t j = 0; j < num_samps; j++) { +            const auto value = +                std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, buff[j]); +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet) +{ +    const size_t NUM_BUFFS_TO_TEST = 5; +    const std::string format("fc64"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t spp       = streamer->get_max_num_samps(); +    const size_t num_samps = spp * 4; +    std::vector<std::complex<double>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = false; +        header.has_tsf = true; +        header.tsf     = i; + +        size_t samps_written = 0; +        while (samps_written < num_samps) { +            size_t samps_to_write = std::min(num_samps - samps_written, spp); +            push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written); +            samps_written += samps_to_write; +        } + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, false); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, true); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        for (size_t j = 0; j < num_samps; j++) { +            const auto value = +                std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, buff[j]); +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob) +{ +    // EOB should terminate a multi-packet recv, test that it does +    const std::string format("sc16"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_packets = 4; +    const size_t spp         = streamer->get_max_num_samps(); +    const size_t num_samps   = spp * num_packets; +    std::vector<std::complex<double>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    // Queue 4 packets, with eob set in every other packet +    for (size_t i = 0; i < num_packets; i++) { +        mock_header_t header; +        header.has_tsf = false; +        header.eob     = (i % 2) != 0; +        push_back_recv_packet(recv_links[0], header, spp); +    } + +    // Now call recv and check that eob terminates a recv call +    for (size_t i = 0; i < num_packets / 2; i++) { +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, spp * 2); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, false); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("sc16"); + +    const size_t num_chans = 2; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; + +    std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        const bool even_iteration = (i % 2 == 0); +        const bool odd_iteration  = (i % 2 != 0); +        mock_header_t header; +        header.eob     = even_iteration; +        header.has_tsf = odd_iteration; +        header.tsf     = i; + +        size_t samps_pushed = 0; +        for (size_t ch = 0; ch < num_chans; ch++) { +            push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed); +            samps_pushed += num_samps; +        } + +        std::cout << "receiving packet " << i << std::endl; + +        size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); +        BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + +        size_t samps_checked = 0; +        for (size_t ch = 0; ch < num_chans; ch++) { +            for (size_t samp = 0; samp < num_samps; samp++) { +                const size_t n   = samps_checked + samp; +                const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1)); +                BOOST_CHECK_EQUAL(value, buffer[ch][samp]); +            } +            samps_checked += num_samps; +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment) +{ +    const size_t NUM_PKTS_TO_TEST = 5; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    // Push back five packets, then read them 1/4 of a packet at a time +    const size_t spp              = streamer->get_max_num_samps(); +    const size_t reads_per_packet = 4; +    const size_t num_samps        = spp / reads_per_packet; +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = true; +        header.has_tsf = true; +        header.tsf     = 0; +        push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet); +    } + +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "receiving packet " << i << std::endl; + +        size_t total_samps_read = 0; +        for (size_t j = 0; j < reads_per_packet; j++) { +            size_t num_samps_ret = +                streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + +            BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +            BOOST_CHECK_EQUAL(metadata.has_time_spec, true); +            BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +            BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1); +            BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read); + +            const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE); +            const size_t expected_ticks   = ticks_per_sample * total_samps_read; +            BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks); + +            for (size_t samp = 0; samp < num_samps; samp++) { +                const size_t pkt_idx = samp + total_samps_read; +                const auto value     = std::complex<float>( +                    (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR); +                BOOST_CHECK_EQUAL(value, buff[samp]); +            } + +            total_samps_read += num_samps_ret; +        } +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_seq_error) +{ +    // Test that when we get a sequence error the error is returned in the +    // metadata with a time spec that corresponds to the time spec of the +    // last sample in the previous packet plus one sample clock. Test that +    // the packet that causes the sequence error is not discarded. +    const size_t NUM_PKTS_TO_TEST = 2; +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; +    size_t seq_num = 0; +    size_t tsf     = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob        = false; +        header.has_tsf    = true; +        header.ignore_seq = false; + +        // Push back three packets but skip a seq_num after the second +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        tsf += num_samps; +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        seq_num++; // dropped packet +        tsf += num_samps; + +        header.seq_num = seq_num++; +        header.tsf     = tsf; +        push_back_recv_packet(recv_links[0], header, num_samps); + +        // First two reads should succeed +        size_t num_samps_ret = +            streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        size_t prev_tsf     = metadata.time_spec.to_ticks(TICK_RATE); +        size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE); + +        // Third read should be a sequence error +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, 0); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +        BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); +        size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE); +        BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf); + +        // Next read should succeed +        num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +        BOOST_CHECK_EQUAL(metadata.out_of_sequence, false); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_bad_packet) +{ +    // Test that when we receive a packet with invalid chdr header or length +    // the streamer returns the correct error in meatadata. +    auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) { +        mock_header_t header; +        header.payload_bytes = 1000; + +        // Allocate a buffer that is too small for the payload +        const size_t buff_len = 100; +        boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + +        // Write header to buffer +        *(reinterpret_cast<mock_header_t*>(data.get())) = header; + +        // Push back buffer for link to recv +        recv_link->push_back_recv_packet(data, buff_len); +    }; + +    const std::string format("fc32"); + +    auto recv_links = make_links(1); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 20; +    std::vector<std::complex<float>> buff(num_samps); +    uhd::rx_metadata_t metadata; + +    mock_header_t header; + +    // Push back a regular packet +    push_back_recv_packet(recv_links[0], header, num_samps); + +    // Push back a bad packet +    push_back_bad_packet(recv_links[0]); + +    // Push back another regular packet +    push_back_recv_packet(recv_links[0], header, num_samps); + +    // First read should succeed +    size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + +    // Second read should be an error +    num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET); + +    // Third read should succeed +    num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf) +{ +    // Test that we can receive packets without tsf. Start by pushing +    // a packet with a tsf followed by a few packets without. +    const size_t NUM_PKTS_TO_TEST = 6; +    const std::string format("fc64"); + +    const size_t num_chans = 10; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 21; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        mock_header_t header; +        header.eob     = (i == NUM_PKTS_TO_TEST - 1); +        header.has_tsf = (i == 0); +        header.tsf     = 500; + +        for (size_t ch = 0; ch < num_chans; ch++) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +        BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +        BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1); +        BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0); +    } +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error) +{ +    // Test that the streamer handles dropped packets correctly by injecting +    // a sequence error in one channel. The streamer should discard +    // corresponding packets from all other channels. +    const std::string format("fc64"); + +    const size_t num_chans = 100; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 99; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    for (size_t ch = 0; ch < num_chans; ch++) { +        mock_header_t header; +        header.eob        = false; +        header.has_tsf    = true; +        header.tsf        = 0; +        header.ignore_seq = false; +        header.seq_num    = 0; + +        // Drop a packet from an arbitrary channel right at the start +        if (ch != num_chans / 2) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        // Add a regular packet to check the streamer drops the first +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); + +        // Drop a packet from the first channel +        header.seq_num++; +        header.tsf++; +        if (ch != 0) { +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } + +        // Add a regular packet +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); + +        // Drop a few packets from the last channel +        for (size_t j = 0; j < 10; j++) { +            header.seq_num++; +            header.tsf++; +            if (ch != num_chans - 1) { +                push_back_recv_packet(recv_links[ch], header, num_samps); +            } +        } + +        // Add a regular packet +        header.seq_num++; +        header.tsf++; +        push_back_recv_packet(recv_links[ch], header, num_samps); +    } + +    uhd::rx_metadata_t metadata; + +    // First recv should result in error +    size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 1 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1); + +    // Next recv should result in error +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 3 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3); + +    // Next recv should result in error +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); +    BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + +    // Packet with tsf == 14 should be returned next +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14); +} + +BOOST_AUTO_TEST_CASE(test_recv_alignment_error) +{ +    // Test that the alignment procedure returns an alignment error if it can't +    // time align packets. +    const std::string format("fc64"); + +    const size_t num_chans = 4; + +    auto recv_links = make_links(num_chans); +    auto streamer   = make_rx_streamer(recv_links, format); + +    const size_t num_samps = 2; + +    std::vector<std::vector<std::complex<double>>> buffer(num_chans); +    std::vector<void*> buffers; +    for (size_t i = 0; i < num_chans; i++) { +        buffer[i].resize(num_samps); +        buffers.push_back(&buffer[i].front()); +    } + +    uhd::rx_metadata_t metadata; + +    mock_header_t header; +    header.eob     = true; +    header.has_tsf = true; +    header.tsf     = 500; + +    for (size_t ch = 0; ch < num_chans; ch++) { +        push_back_recv_packet(recv_links[ch], header, num_samps); +    } + +    size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + +    BOOST_CHECK_EQUAL(num_samps_ret, num_samps); +    BOOST_CHECK_EQUAL(metadata.end_of_burst, true); +    BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + +    for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) { +        header.tsf = header.tsf + num_samps; +        for (size_t ch = 0; ch < num_chans; ch++) { +            if (ch == num_chans - 1) { +                // Misalign this time stamp +                header.tsf += 1; +            } +            push_back_recv_packet(recv_links[ch], header, num_samps); +        } +    } + +    num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); +    BOOST_CHECK_EQUAL(num_samps_ret, 0); +    BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT); +} diff --git a/host/tests/tx_streamer_test.cpp b/host/tests/tx_streamer_test.cpp new file mode 100644 index 000000000..cb07cffad --- /dev/null +++ b/host/tests/tx_streamer_test.cpp @@ -0,0 +1,393 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/tx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Mock tx data xport which doesn't use I/O service, and just interacts with + * the link directly. Transport copies packet info directly into the frame + * buffer. + */ +class mock_tx_data_xport +{ +public: +    using uptr   = std::unique_ptr<mock_tx_data_xport>; +    using buff_t = uhd::transport::frame_buff; + +    struct packet_info_t +    { +        bool eob             = false; +        bool has_tsf         = false; +        uint64_t tsf         = 0; +        size_t payload_bytes = 0; +    }; + +    mock_tx_data_xport(mock_send_link::sptr send_link) : _send_link(send_link) {} + +    buff_t::uptr get_send_buff(const int32_t timeout_ms) +    { +        return _send_link->get_send_buff(timeout_ms); +    } + +    std::pair<void*, size_t> write_packet_header( +        buff_t::uptr& buff, const packet_info_t& info) +    { +        uint8_t* data                             = static_cast<uint8_t*>(buff->data()); +        *(reinterpret_cast<packet_info_t*>(data)) = info; +        return std::make_pair(data + sizeof(info), sizeof(info) + info.payload_bytes); +    } + +    void release_send_buff(buff_t::uptr buff) +    { +        _send_link->release_send_buff(std::move(buff)); +    } + +    size_t get_max_payload_size() const +    { +        return _send_link->get_send_frame_size() - sizeof(packet_info_t); +        ; +    } + +private: +    mock_send_link::sptr _send_link; +}; + +/*! + * Mock tx streamer for testing + */ +class mock_tx_streamer : public tx_streamer_impl<mock_tx_data_xport> +{ +public: +    mock_tx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) +        : tx_streamer_impl(num_chans, stream_args) +    { +    } + +    void set_tick_rate(double rate) +    { +        tx_streamer_impl::set_tick_rate(rate); +    } + +    void set_samp_rate(double rate) +    { +        tx_streamer_impl::set_samp_rate(rate); +    } + +    void set_scale_factor(const size_t chan, const double scale_factor) +    { +        tx_streamer_impl::set_scale_factor(chan, scale_factor); +    } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using tx_streamer = tx_streamer_impl<mock_tx_data_xport>; + +static const double TICK_RATE    = 100e6; +static const double SAMP_RATE    = 10e6; +static const size_t FRAME_SIZE   = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_send_link::sptr> make_links(const size_t num) +{ +    const mock_send_link::link_params params = {FRAME_SIZE, 1}; + +    std::vector<mock_send_link::sptr> links; + +    for (size_t i = 0; i < num; i++) { +        links.push_back(std::make_shared<mock_send_link>(params)); +    } + +    return links; +} + +static boost::shared_ptr<mock_tx_streamer> make_tx_streamer( +    std::vector<mock_send_link::sptr> send_links, const std::string& format) +{ +    const uhd::stream_args_t stream_args(format, "sc16"); +    auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +    streamer->set_tick_rate(TICK_RATE); +    streamer->set_samp_rate(SAMP_RATE); + +    for (size_t i = 0; i < send_links.size(); i++) { +        mock_tx_data_xport::uptr xport( +            std::make_unique<mock_tx_data_xport>(send_links[i])); + +        streamer->set_scale_factor(i, SCALE_FACTOR); +        streamer->connect_channel(i, std::move(xport)); +    } + +    return streamer; +} + +std::tuple<mock_tx_data_xport::packet_info_t, std::complex<uint16_t>*, size_t, boost::shared_array<uint8_t>> +pop_send_packet(mock_send_link::sptr send_link) +{ +    auto packet = send_link->pop_send_packet(); + +    const size_t packet_samps = +        (packet.second - sizeof(mock_tx_data_xport::packet_info_t)) +        / sizeof(std::complex<uint16_t>); + +    uint8_t* buff_ptr = packet.first.get(); +    auto info         = *(reinterpret_cast<mock_tx_data_xport::packet_info_t*>(buff_ptr)); + +    std::complex<uint16_t>* data = reinterpret_cast<std::complex<uint16_t>*>( +        buff_ptr + sizeof(mock_tx_data_xport::packet_info_t)); + +    return std::make_tuple(info, data, packet_samps, packet.first); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_send_one_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 30; +    const std::string format("fc32"); + +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<float>> buff(20); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<float>(i * 2, i * 2 + 1); +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        // Vary num_samps for each packet +        const size_t num_samps = 10 + i % 10; +        metadata.end_of_burst  = (i == NUM_PKTS_TO_TEST - 1); +        const size_t num_sent  = streamer->send(&buff.front(), num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        mock_tx_data_xport::packet_info_t info; +        std::complex<uint16_t>* data; +        size_t packet_samps; +        boost::shared_array<uint8_t> frame_buff; + +        std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); +        BOOST_CHECK_EQUAL(num_samps, packet_samps); + +        // Check data +        for (size_t j = 0; j < num_samps; j++) { +            const std::complex<uint16_t> value( +                (j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); +            BOOST_CHECK_EQUAL(value, data[j]); +        } + +        BOOST_CHECK_EQUAL(num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +        BOOST_CHECK(info.has_tsf); +        BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); +        BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); +        num_accum_samps += num_samps; +    } +} + +BOOST_AUTO_TEST_CASE(test_send_one_channel_multi_packet) +{ +    const size_t NUM_BUFFS_TO_TEST = 5; +    const std::string format("fc64"); + +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    const size_t spp       = streamer->get_max_num_samps(); +    const size_t num_samps = spp * 4; +    std::vector<std::complex<double>> buff(num_samps); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<double>(i * 2, i * 2 + 1); +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        metadata.end_of_burst = true; +        const size_t num_sent = streamer->send(&buff.front(), num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        size_t samps_checked = 0; + +        while (samps_checked < num_samps) { +            mock_tx_data_xport::packet_info_t info; +            std::complex<uint16_t>* data; +            size_t packet_samps; +            boost::shared_array<uint8_t> frame_buff; + +            std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[0]); + +            for (size_t j = 0; j < packet_samps; j++) { +                const size_t n = j + samps_checked; +                const std::complex<uint16_t> value( +                    (n * 2) * SCALE_FACTOR, (n * 2 + 1) * SCALE_FACTOR); +                BOOST_CHECK_EQUAL(value, data[j]); +            } + +            BOOST_CHECK_EQUAL( +                packet_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +            BOOST_CHECK(info.has_tsf); +            BOOST_CHECK_EQUAL( +                info.tsf, (num_accum_samps + samps_checked) * TICK_RATE / SAMP_RATE); +            samps_checked += packet_samps; + +            BOOST_CHECK_EQUAL(info.eob, samps_checked == num_samps); +        } + +        BOOST_CHECK_EQUAL(samps_checked, num_samps); +        num_accum_samps += samps_checked; +    } +} + +BOOST_AUTO_TEST_CASE(test_send_two_channel_one_packet) +{ +    const size_t NUM_PKTS_TO_TEST = 30; +    const std::string format("sc16"); + +    auto send_links = make_links(2); +    auto streamer   = make_tx_streamer(send_links, format); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.has_time_spec = true; +    metadata.time_spec     = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<uint16_t>> buff(20); +    for (size_t i = 0; i < buff.size(); i++) { +        buff[i] = std::complex<uint16_t>(i * 2, i * 2 + 1); +    } +    std::vector<void*> buffs; +    for (size_t ch = 0; ch < 2; ch++) { +        buffs.push_back(buff.data()); // same buffer for each channel +    } + +    // Send packets and check data +    size_t num_accum_samps = 0; + +    for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { +        std::cout << "sending packet " << i << std::endl; + +        // Vary num_samps for each packet +        const size_t num_samps = 10 + i % 10; +        metadata.end_of_burst  = (i == NUM_PKTS_TO_TEST - 1); +        const size_t num_sent  = streamer->send(buffs, num_samps, metadata, 1.0); +        BOOST_CHECK_EQUAL(num_sent, num_samps); +        metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + +        for (size_t ch = 0; ch < 2; ch++) { +            mock_tx_data_xport::packet_info_t info; +            std::complex<uint16_t>* data; +            size_t packet_samps; +            boost::shared_array<uint8_t> frame_buff; + +            std::tie(info, data, packet_samps, frame_buff) = pop_send_packet(send_links[ch]); +            BOOST_CHECK_EQUAL(num_samps, packet_samps); + +            // Check data +            for (size_t j = 0; j < num_samps; j++) { +                const std::complex<uint16_t> value((j * 2), (j * 2 + 1)); +                BOOST_CHECK_EQUAL(value, data[j]); +            } + +            BOOST_CHECK_EQUAL( +                num_samps, info.payload_bytes / sizeof(std::complex<uint16_t>)); +            BOOST_CHECK(info.has_tsf); +            BOOST_CHECK_EQUAL(info.tsf, num_accum_samps * TICK_RATE / SAMP_RATE); +            BOOST_CHECK_EQUAL(info.eob, i == NUM_PKTS_TO_TEST - 1); +        } +        num_accum_samps += num_samps; +    } +} + +BOOST_AUTO_TEST_CASE(test_meta_data_cache) +{ +    auto send_links = make_links(1); +    auto streamer   = make_tx_streamer(send_links, "fc32"); + +    // Allocate metadata +    uhd::tx_metadata_t metadata; +    metadata.start_of_burst = true; +    metadata.end_of_burst   = true; +    metadata.has_time_spec  = true; +    metadata.time_spec      = uhd::time_spec_t(0.0); + +    // Allocate buffer and write data +    std::vector<std::complex<float>> buff(20); + +    size_t num_sent = streamer->send(buff.data(), 0, metadata, 1.0); +    BOOST_CHECK_EQUAL(send_links[0]->get_num_packets(), 0); +    BOOST_CHECK_EQUAL(num_sent, 0); +    uhd::tx_metadata_t metadata2; +    num_sent = streamer->send(buff.data(), 10, metadata2, 1.0); + +    mock_tx_data_xport::packet_info_t info; +    size_t packet_samps; +    boost::shared_array<uint8_t> frame_buff; + +    std::tie(info, std::ignore, packet_samps, frame_buff) = pop_send_packet(send_links[0]); +    BOOST_CHECK_EQUAL(packet_samps, num_sent); +    BOOST_CHECK(info.has_tsf); +    BOOST_CHECK(info.eob); +} + +BOOST_AUTO_TEST_CASE(test_spp) +{ +    // Test the spp calculation when it is limited by the stream args +    { +        auto send_links = make_links(1); +        uhd::stream_args_t stream_args("fc64", "sc16"); +        stream_args.args["spp"] = std::to_string(10); +        auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +        mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); +        streamer->connect_channel(0, std::move(xport)); +        BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), 10); +    } + +    // Test the spp calculation when it is limited by the frame size +    { +        auto send_links = make_links(1); +        uhd::stream_args_t stream_args("fc64", "sc16"); +        stream_args.args["spp"] = std::to_string(10000); +        auto streamer = boost::make_shared<mock_tx_streamer>(send_links.size(), stream_args); +        mock_tx_data_xport::uptr xport(std::make_unique<mock_tx_data_xport>(send_links[0])); +        const size_t max_pyld = xport->get_max_payload_size(); +        streamer->connect_channel(0, std::move(xport)); +        BOOST_CHECK_EQUAL(streamer->get_max_num_samps(), max_pyld / sizeof(std::complex<uint16_t>)); +    } +}  | 
