diff options
Diffstat (limited to 'host/tests/rx_streamer_test.cpp')
-rw-r--r-- | host/tests/rx_streamer_test.cpp | 744 |
1 files changed, 744 insertions, 0 deletions
diff --git a/host/tests/rx_streamer_test.cpp b/host/tests/rx_streamer_test.cpp new file mode 100644 index 000000000..cd4daf569 --- /dev/null +++ b/host/tests/rx_streamer_test.cpp @@ -0,0 +1,744 @@ +// +// Copyright 2019 Ettus Research, a National Instruments Brand +// +// SPDX-License-Identifier: GPL-3.0-or-later +// + +#include "../common/mock_link.hpp" +#include <uhdlib/transport/rx_streamer_impl.hpp> +#include <boost/make_shared.hpp> +#include <boost/test/unit_test.hpp> +#include <iostream> + +namespace uhd { namespace transport { + +/*! + * Contents of mock packet header + */ +struct mock_header_t +{ + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + bool ignore_seq = true; + size_t seq_num = 0; +}; + +/*! + * Mock rx data xport which doesn't use I/O service, and just interacts with + * the link directly. + */ +class mock_rx_data_xport +{ +public: + using uptr = std::unique_ptr<mock_rx_data_xport>; + using buff_t = uhd::transport::frame_buff; + + //! Values extracted from received RX data packets + struct packet_info_t + { + bool eob = false; + bool has_tsf = false; + uint64_t tsf = 0; + size_t payload_bytes = 0; + const void* payload = nullptr; + }; + + mock_rx_data_xport(mock_recv_link::sptr recv_link) : _recv_link(recv_link) {} + + std::tuple<frame_buff::uptr, packet_info_t, bool> get_recv_buff( + const int32_t timeout_ms) + { + frame_buff::uptr buff = _recv_link->get_recv_buff(timeout_ms); + mock_header_t header = *(reinterpret_cast<mock_header_t*>(buff->data())); + + packet_info_t info; + info.eob = header.eob; + info.has_tsf = header.has_tsf; + info.tsf = header.tsf; + info.payload_bytes = header.payload_bytes; + info.payload = reinterpret_cast<uint8_t*>(buff->data()) + sizeof(mock_header_t); + + const uint8_t* pkt_end = + reinterpret_cast<uint8_t*>(buff->data()) + buff->packet_size(); + const size_t pyld_pkt_len = + pkt_end - reinterpret_cast<const uint8_t*>(info.payload); + + if (pyld_pkt_len < info.payload_bytes) { + _recv_link->release_recv_buff(std::move(buff)); + throw uhd::value_error("Bad header or invalid packet length."); + } + + const bool seq_match = header.seq_num == _seq_num; + const bool seq_error = !header.ignore_seq && !seq_match; + _seq_num = header.seq_num + 1; + + return std::make_tuple(std::move(buff), info, seq_error); + } + + void release_recv_buff(frame_buff::uptr buff) + { + _recv_link->release_recv_buff(std::move(buff)); + } + + size_t get_max_payload_size() const + { + return _recv_link->get_recv_frame_size() - sizeof(packet_info_t); + } + +private: + mock_recv_link::sptr _recv_link; + size_t _seq_num = 0; +}; + +/*! + * Mock rx streamer for testing + */ +class mock_rx_streamer : public rx_streamer_impl<mock_rx_data_xport> +{ +public: + mock_rx_streamer(const size_t num_chans, const uhd::stream_args_t& stream_args) + : rx_streamer_impl(num_chans, stream_args) + { + } + + void issue_stream_cmd(const stream_cmd_t&) {} + + void set_tick_rate(double rate) + { + rx_streamer_impl::set_tick_rate(rate); + } + + void set_samp_rate(double rate) + { + rx_streamer_impl::set_samp_rate(rate); + } + + void set_scale_factor(const size_t chan, const double scale_factor) + { + rx_streamer_impl::set_scale_factor(chan, scale_factor); + } +}; + +}} // namespace uhd::transport + +using namespace uhd::transport; + +using rx_streamer = rx_streamer_impl<mock_rx_data_xport>; + +static const double TICK_RATE = 100e6; +static const double SAMP_RATE = 10e6; +static const size_t FRAME_SIZE = 1000; +static const double SCALE_FACTOR = 2; + +/*! + * Helper functions + */ +static std::vector<mock_recv_link::sptr> make_links(const size_t num) +{ + const mock_recv_link::link_params params = {FRAME_SIZE, 1}; + + std::vector<mock_recv_link::sptr> links; + + for (size_t i = 0; i < num; i++) { + links.push_back(std::make_shared<mock_recv_link>(params)); + } + + return links; +} + +static boost::shared_ptr<mock_rx_streamer> make_rx_streamer( + std::vector<mock_recv_link::sptr> recv_links, + const std::string& host_format, + const std::string& otw_format = "sc16") +{ + const uhd::stream_args_t stream_args(host_format, otw_format); + auto streamer = boost::make_shared<mock_rx_streamer>(recv_links.size(), stream_args); + streamer->set_tick_rate(TICK_RATE); + streamer->set_samp_rate(SAMP_RATE); + + for (size_t i = 0; i < recv_links.size(); i++) { + mock_rx_data_xport::uptr xport( + std::make_unique<mock_rx_data_xport>(recv_links[i])); + + streamer->set_scale_factor(i, SCALE_FACTOR); + streamer->connect_channel(i, std::move(xport)); + } + + return streamer; +} + +static void push_back_recv_packet(mock_recv_link::sptr recv_link, + mock_header_t header, + size_t num_samps, + uint16_t start_data = 0) +{ + // Allocate buffer + const size_t pyld_bytes = num_samps * sizeof(std::complex<uint16_t>); + const size_t buff_len = sizeof(header) + pyld_bytes; + boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + + // Write header to buffer + header.payload_bytes = pyld_bytes; + *(reinterpret_cast<mock_header_t*>(data.get())) = header; + + // Write data to buffer + auto data_ptr = + reinterpret_cast<std::complex<uint16_t>*>(data.get() + sizeof(header)); + + for (size_t i = 0; i < num_samps; i++) { + uint16_t val = (start_data + i) * 2; + data_ptr[i] = std::complex<uint16_t>(val, val + 1); + } + + // Push back buffer for link to recv + recv_link->push_back_recv_packet(data, buff_len); +} + +/*! + * Tests + */ +BOOST_AUTO_TEST_CASE(test_recv_one_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + const bool even_iteration = (i % 2 == 0); + const bool odd_iteration = (i % 2 != 0); + mock_header_t header; + header.eob = even_iteration; + header.has_tsf = odd_iteration; + header.tsf = i; + push_back_recv_packet(recv_links[0], header, num_samps); + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); + BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + for (size_t j = 0; j < num_samps; j++) { + const auto value = + std::complex<float>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[j]); + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet) +{ + const size_t NUM_BUFFS_TO_TEST = 5; + const std::string format("fc64"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * 4; + std::vector<std::complex<double>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_BUFFS_TO_TEST; i++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.tsf = i; + + size_t samps_written = 0; + while (samps_written < num_samps) { + size_t samps_to_write = std::min(num_samps - samps_written, spp); + push_back_recv_packet(recv_links[0], header, samps_to_write, samps_written); + samps_written += samps_to_write; + } + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, false); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + for (size_t j = 0; j < num_samps; j++) { + const auto value = + std::complex<double>((j * 2) * SCALE_FACTOR, (j * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[j]); + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_multi_packet_with_eob) +{ + // EOB should terminate a multi-packet recv, test that it does + const std::string format("sc16"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_packets = 4; + const size_t spp = streamer->get_max_num_samps(); + const size_t num_samps = spp * num_packets; + std::vector<std::complex<double>> buff(num_samps); + uhd::rx_metadata_t metadata; + + // Queue 4 packets, with eob set in every other packet + for (size_t i = 0; i < num_packets; i++) { + mock_header_t header; + header.has_tsf = false; + header.eob = (i % 2) != 0; + push_back_recv_packet(recv_links[0], header, spp); + } + + // Now call recv and check that eob terminates a recv call + for (size_t i = 0; i < num_packets / 2; i++) { + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, spp * 2); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.has_time_spec, false); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_two_channel_one_packet) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("sc16"); + + const size_t num_chans = 2; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + + std::vector<std::vector<std::complex<uint16_t>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + const bool even_iteration = (i % 2 == 0); + const bool odd_iteration = (i % 2 != 0); + mock_header_t header; + header.eob = even_iteration; + header.has_tsf = odd_iteration; + header.tsf = i; + + size_t samps_pushed = 0; + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps, samps_pushed); + samps_pushed += num_samps; + } + + std::cout << "receiving packet " << i << std::endl; + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, even_iteration); + BOOST_CHECK_EQUAL(metadata.has_time_spec, odd_iteration); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), i); + + size_t samps_checked = 0; + for (size_t ch = 0; ch < num_chans; ch++) { + for (size_t samp = 0; samp < num_samps; samp++) { + const size_t n = samps_checked + samp; + const auto value = std::complex<uint16_t>((n * 2), (n * 2 + 1)); + BOOST_CHECK_EQUAL(value, buffer[ch][samp]); + } + samps_checked += num_samps; + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_one_channel_packet_fragment) +{ + const size_t NUM_PKTS_TO_TEST = 5; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + // Push back five packets, then read them 1/4 of a packet at a time + const size_t spp = streamer->get_max_num_samps(); + const size_t reads_per_packet = 4; + const size_t num_samps = spp / reads_per_packet; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = true; + header.has_tsf = true; + header.tsf = 0; + push_back_recv_packet(recv_links[0], header, num_samps * reads_per_packet); + } + + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + std::cout << "receiving packet " << i << std::endl; + + size_t total_samps_read = 0; + for (size_t j = 0; j < reads_per_packet; j++) { + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.more_fragments, j != reads_per_packet - 1); + BOOST_CHECK_EQUAL(metadata.fragment_offset, total_samps_read); + + const size_t ticks_per_sample = static_cast<size_t>(TICK_RATE / SAMP_RATE); + const size_t expected_ticks = ticks_per_sample * total_samps_read; + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), expected_ticks); + + for (size_t samp = 0; samp < num_samps; samp++) { + const size_t pkt_idx = samp + total_samps_read; + const auto value = std::complex<float>( + (pkt_idx * 2) * SCALE_FACTOR, (pkt_idx * 2 + 1) * SCALE_FACTOR); + BOOST_CHECK_EQUAL(value, buff[samp]); + } + + total_samps_read += num_samps_ret; + } + } +} + +BOOST_AUTO_TEST_CASE(test_recv_seq_error) +{ + // Test that when we get a sequence error the error is returned in the + // metadata with a time spec that corresponds to the time spec of the + // last sample in the previous packet plus one sample clock. Test that + // the packet that causes the sequence error is not discarded. + const size_t NUM_PKTS_TO_TEST = 2; + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + size_t seq_num = 0; + size_t tsf = 0; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.ignore_seq = false; + + // Push back three packets but skip a seq_num after the second + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + tsf += num_samps; + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + seq_num++; // dropped packet + tsf += num_samps; + + header.seq_num = seq_num++; + header.tsf = tsf; + push_back_recv_packet(recv_links[0], header, num_samps); + + // First two reads should succeed + size_t num_samps_ret = + streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + size_t prev_tsf = metadata.time_spec.to_ticks(TICK_RATE); + size_t expected_tsf = prev_tsf + num_samps * (TICK_RATE / SAMP_RATE); + + // Third read should be a sequence error + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + size_t metadata_tsf = metadata.time_spec.to_ticks(TICK_RATE); + BOOST_CHECK_EQUAL(metadata_tsf, expected_tsf); + + // Next read should succeed + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, false); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_bad_packet) +{ + // Test that when we receive a packet with invalid chdr header or length + // the streamer returns the correct error in meatadata. + auto push_back_bad_packet = [](mock_recv_link::sptr recv_link) { + mock_header_t header; + header.payload_bytes = 1000; + + // Allocate a buffer that is too small for the payload + const size_t buff_len = 100; + boost::shared_array<uint8_t> data(new uint8_t[buff_len]); + + // Write header to buffer + *(reinterpret_cast<mock_header_t*>(data.get())) = header; + + // Push back buffer for link to recv + recv_link->push_back_recv_packet(data, buff_len); + }; + + const std::string format("fc32"); + + auto recv_links = make_links(1); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 20; + std::vector<std::complex<float>> buff(num_samps); + uhd::rx_metadata_t metadata; + + mock_header_t header; + + // Push back a regular packet + push_back_recv_packet(recv_links[0], header, num_samps); + + // Push back a bad packet + push_back_bad_packet(recv_links[0]); + + // Push back another regular packet + push_back_recv_packet(recv_links[0], header, num_samps); + + // First read should succeed + size_t num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + + // Second read should be an error + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET); + + // Third read should succeed + num_samps_ret = streamer->recv(buff.data(), buff.size(), metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_no_tsf) +{ + // Test that we can receive packets without tsf. Start by pushing + // a packet with a tsf followed by a few packets without. + const size_t NUM_PKTS_TO_TEST = 6; + const std::string format("fc64"); + + const size_t num_chans = 10; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 21; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { + mock_header_t header; + header.eob = (i == NUM_PKTS_TO_TEST - 1); + header.has_tsf = (i == 0); + header.tsf = 500; + + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, i == NUM_PKTS_TO_TEST - 1); + BOOST_CHECK_EQUAL(metadata.has_time_spec, i == 0); + } +} + +BOOST_AUTO_TEST_CASE(test_recv_multi_channel_seq_error) +{ + // Test that the streamer handles dropped packets correctly by injecting + // a sequence error in one channel. The streamer should discard + // corresponding packets from all other channels. + const std::string format("fc64"); + + const size_t num_chans = 100; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 99; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + for (size_t ch = 0; ch < num_chans; ch++) { + mock_header_t header; + header.eob = false; + header.has_tsf = true; + header.tsf = 0; + header.ignore_seq = false; + header.seq_num = 0; + + // Drop a packet from an arbitrary channel right at the start + if (ch != num_chans / 2) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + // Add a regular packet to check the streamer drops the first + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + + // Drop a packet from the first channel + header.seq_num++; + header.tsf++; + if (ch != 0) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + // Add a regular packet + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + + // Drop a few packets from the last channel + for (size_t j = 0; j < 10; j++) { + header.seq_num++; + header.tsf++; + if (ch != num_chans - 1) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + } + + // Add a regular packet + header.seq_num++; + header.tsf++; + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + uhd::rx_metadata_t metadata; + + // First recv should result in error + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 1 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 1); + + // Next recv should result in error + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 3 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 3); + + // Next recv should result in error + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_EQUAL(metadata.out_of_sequence, true); + + // Packet with tsf == 14 should be returned next + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.time_spec.to_ticks(TICK_RATE), 14); +} + +BOOST_AUTO_TEST_CASE(test_recv_alignment_error) +{ + // Test that the alignment procedure returns an alignment error if it can't + // time align packets. + const std::string format("fc64"); + + const size_t num_chans = 4; + + auto recv_links = make_links(num_chans); + auto streamer = make_rx_streamer(recv_links, format); + + const size_t num_samps = 2; + + std::vector<std::vector<std::complex<double>>> buffer(num_chans); + std::vector<void*> buffers; + for (size_t i = 0; i < num_chans; i++) { + buffer[i].resize(num_samps); + buffers.push_back(&buffer[i].front()); + } + + uhd::rx_metadata_t metadata; + + mock_header_t header; + header.eob = true; + header.has_tsf = true; + header.tsf = 500; + + for (size_t ch = 0; ch < num_chans; ch++) { + push_back_recv_packet(recv_links[ch], header, num_samps); + } + + size_t num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + + BOOST_CHECK_EQUAL(num_samps_ret, num_samps); + BOOST_CHECK_EQUAL(metadata.end_of_burst, true); + BOOST_CHECK_EQUAL(metadata.has_time_spec, true); + + for (size_t pkt = 0; pkt < uhd::transport::ALIGNMENT_FAILURE_THRESHOLD; pkt++) { + header.tsf = header.tsf + num_samps; + for (size_t ch = 0; ch < num_chans; ch++) { + if (ch == num_chans - 1) { + // Misalign this time stamp + header.tsf += 1; + } + push_back_recv_packet(recv_links[ch], header, num_samps); + } + } + + num_samps_ret = streamer->recv(buffers, num_samps, metadata, 1.0, false); + BOOST_CHECK_EQUAL(num_samps_ret, 0); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_ALIGNMENT); +} |