// // Copyright 2011-2012,2015 Ettus Research LLC // Copyright 2018 Ettus Research, a National Instruments Company // // SPDX-License-Identifier: GPL-3.0-or-later // #include "../common/mock_zero_copy.hpp" #include "../lib/transport/super_recv_packet_handler.hpp" #include #include #include #include #include #include using namespace uhd::transport; #define BOOST_CHECK_TS_CLOSE(a, b) \ BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001) /*********************************************************************** * A dummy overflow handler for testing **********************************************************************/ struct overflow_handler_type { overflow_handler_type(void) { num_overflow = 0; } void handle(void) { num_overflow++; } size_t num_overflow; }; //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_normal) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; mock_zero_copy xport(vrt::if_packet_info_t::LINK_TYPE_VRLP); vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; std::vector data(ifpi.num_payload_words32, 0); xport.push_back_recv_packet(ifpi, data); ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); } // create the super receive packet handler uhd::transport::sph::recv_packet_handler handler(1); handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); handler.set_xport_chan_get_buff( 0, [&xport](double timeout) { return xport.get_recv_buff(timeout); }); handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::vector> buff(20); uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing xport.set_simulate_io_error(true); BOOST_REQUIRE_THROW( handler.recv(&buff.front(), buff.size(), metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_sequence_error) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; mock_zero_copy xport(vrt::if_packet_info_t::LINK_TYPE_VRLP); vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; if (i != NUM_PKTS_TO_TEST / 2) { // simulate a lost packet std::vector data(ifpi.num_payload_words32, 0); xport.push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); } // create the super receive packet handler sph::recv_packet_handler handler(1); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); handler.set_xport_chan_get_buff( 0, [&xport](double timeout) { return xport.get_recv_buff(timeout); }); handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::vector> buff(20); uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); if (i == NUM_PKTS_TO_TEST / 2) { // must get the soft overflow here BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); BOOST_REQUIRE(metadata.out_of_sequence == true); BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); num_accum_samps += 10 + i % 10; } else { BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; } } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing xport.set_simulate_io_error(true); BOOST_REQUIRE_THROW( handler.recv(&buff.front(), buff.size(), metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; mock_zero_copy xport(vrt::if_packet_info_t::LINK_TYPE_VRLP); vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 10 + i % 10; std::vector data(ifpi.num_payload_words32, 0); xport.push_back_recv_packet(ifpi, data); ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); // simulate overflow if (i == NUM_PKTS_TO_TEST / 2) { ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_CONTEXT; ifpi.num_payload_words32 = 1; xport.push_back_inline_message_packet( ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); } } // create the super receive packet handler sph::recv_packet_handler handler(1); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); handler.set_xport_chan_get_buff( 0, [&xport](double timeout) { return xport.get_recv_buff(timeout); }); handler.set_converter(id); // create an overflow handler overflow_handler_type overflow_handler; handler.set_overflow_handler( 0, boost::bind(&overflow_handler_type::handle, &overflow_handler)); // check the received packets size_t num_accum_samps = 0; std::vector> buff(20); uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; if (i == NUM_PKTS_TO_TEST / 2) { handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); std::cout << "metadata.error_code " << metadata.error_code << std::endl; BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(overflow_handler.num_overflow, size_t(1)); } } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(&buff.front(), buff.size(), metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing xport.set_simulate_io_error(true); BOOST_REQUIRE_THROW( handler.recv(&buff.front(), buff.size(), metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_normal) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; static const size_t NUM_SAMPS_PER_BUFF = 20; static const size_t NCHANNELS = 4; std::vector xports; for (size_t i = 0; i < NCHANNELS; i++) { xports.push_back( std::make_shared(vrt::if_packet_info_t::LINK_TYPE_VRLP)); } // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; for (size_t ch = 0; ch < NCHANNELS; ch++) { std::vector data(ifpi.num_payload_words32, 0); xports[ch]->push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); } // create the super receive packet handler sph::recv_packet_handler handler(NCHANNELS); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); for (size_t ch = 0; ch < NCHANNELS; ch++) { mock_zero_copy::sptr xport = xports[ch]; handler.set_xport_chan_get_buff( ch, [xport](double timeout) { return xport->get_recv_buff(timeout); }); } handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::complex mem[NUM_SAMPS_PER_BUFF * NCHANNELS]; std::vector*> buffs(NCHANNELS); for (size_t ch = 0; ch < NCHANNELS; ch++) { buffs[ch] = &mem[ch * NUM_SAMPS_PER_BUFF]; } uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing for (size_t ch = 0; ch < NCHANNELS; ch++) { xports[ch]->set_simulate_io_error(true); } BOOST_REQUIRE_THROW( handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_sequence_error) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; static const size_t NUM_SAMPS_PER_BUFF = 20; static const size_t NCHANNELS = 4; std::vector xports; for (size_t i = 0; i < NCHANNELS; i++) { xports.push_back( std::make_shared(vrt::if_packet_info_t::LINK_TYPE_VRLP)); } // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; for (size_t ch = 0; ch < NCHANNELS; ch++) { if (i == NUM_PKTS_TO_TEST / 2 and ch == 2) { continue; // simulates a lost packet } std::vector data(ifpi.num_payload_words32, 0); xports[ch]->push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); } // create the super receive packet handler sph::recv_packet_handler handler(NCHANNELS); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); for (size_t ch = 0; ch < NCHANNELS; ch++) { mock_zero_copy::sptr xport = xports[ch]; handler.set_xport_chan_get_buff( ch, [xport](double timeout) { return xport->get_recv_buff(timeout); }); } handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::complex mem[NUM_SAMPS_PER_BUFF * NCHANNELS]; std::vector*> buffs(NCHANNELS); for (size_t ch = 0; ch < NCHANNELS; ch++) { buffs[ch] = &mem[ch * NUM_SAMPS_PER_BUFF]; } uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); if (i == NUM_PKTS_TO_TEST / 2) { // must get the soft overflow here BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); BOOST_REQUIRE(metadata.out_of_sequence == true); BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); num_accum_samps += 10 + i % 10; } else { BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; } } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing for (size_t ch = 0; ch < NCHANNELS; ch++) { xports[ch]->set_simulate_io_error(true); } BOOST_REQUIRE_THROW( handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_time_error) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; static const size_t NUM_SAMPS_PER_BUFF = 20; static const size_t NCHANNELS = 4; std::vector xports; for (size_t i = 0; i < NCHANNELS; i++) { xports.push_back( std::make_shared(vrt::if_packet_info_t::LINK_TYPE_VRLP)); } // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; for (size_t ch = 0; ch < NCHANNELS; ch++) { std::vector data(ifpi.num_payload_words32, 0); xports[ch]->push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); if (i == NUM_PKTS_TO_TEST / 2) { ifpi.tsf = 0; // simulate the user changing the time } } // create the super receive packet handler sph::recv_packet_handler handler(NCHANNELS); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); for (size_t ch = 0; ch < NCHANNELS; ch++) { mock_zero_copy::sptr xport = xports[ch]; handler.set_xport_chan_get_buff( ch, [xport](double timeout) { return xport->get_recv_buff(timeout); }); } handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::complex mem[NUM_SAMPS_PER_BUFF * NCHANNELS]; std::vector*> buffs(NCHANNELS); for (size_t ch = 0; ch < NCHANNELS; ch++) { buffs[ch] = &mem[ch * NUM_SAMPS_PER_BUFF]; } uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10 + i % 10); num_accum_samps += num_samps_ret; if (i == NUM_PKTS_TO_TEST / 2) { num_accum_samps = 0; // simulate the user changing the time } } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing for (size_t ch = 0; ch < NCHANNELS; ch++) { xports[ch]->set_simulate_io_error(true); } BOOST_REQUIRE_THROW( handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_exception) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; static const size_t NUM_SAMPS_PER_BUFF = 20; static const size_t NCHANNELS = 4; std::vector xports; for (size_t i = 0; i < NCHANNELS; i++) { xports.push_back( std::make_shared(vrt::if_packet_info_t::LINK_TYPE_VRLP)); } // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; for (size_t ch = 0; ch < NCHANNELS; ch++) { std::vector data(ifpi.num_payload_words32, 0); xports[ch]->push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); if (i == NUM_PKTS_TO_TEST / 2) { ifpi.tsf = 0; // simulate the user changing the time } } // create the super receive packet handler sph::recv_packet_handler handler(NCHANNELS); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); for (size_t ch = 0; ch < NCHANNELS; ch++) { mock_zero_copy::sptr xport = xports[ch]; handler.set_xport_chan_get_buff( ch, [xport](double timeout) { return xport->get_recv_buff(timeout); }); } handler.set_converter(id); std::complex mem[NUM_SAMPS_PER_BUFF * NCHANNELS]; std::vector*> buffs(NCHANNELS); for (size_t ch = 0; ch < NCHANNELS; ch++) { buffs[ch] = &mem[ch * NUM_SAMPS_PER_BUFF]; } // simulate a failure on a channel (the last one) uhd::rx_metadata_t metadata; xports[NCHANNELS - 1]->set_simulate_io_error(true); std::cout << "exception check" << std::endl; BOOST_REQUIRE_THROW( handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true), uhd::io_error); } //////////////////////////////////////////////////////////////////////// BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_fragment) { //////////////////////////////////////////////////////////////////////// uhd::convert::id_type id; id.input_format = "sc16_item32_be"; id.num_inputs = 1; id.output_format = "fc32"; id.num_outputs = 1; vrt::if_packet_info_t ifpi; ifpi.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; ifpi.num_payload_words32 = 0; ifpi.packet_count = 0; ifpi.sob = true; ifpi.eob = false; ifpi.has_sid = false; ifpi.has_cid = false; ifpi.has_tsi = true; ifpi.has_tsf = true; ifpi.tsi = 0; ifpi.tsf = 0; ifpi.has_tlr = false; static const double TICK_RATE = 100e6; static const double SAMP_RATE = 10e6; static const size_t NUM_PKTS_TO_TEST = 30; static const size_t NUM_SAMPS_PER_BUFF = 10; static const size_t NCHANNELS = 4; std::vector xports; for (size_t i = 0; i < NCHANNELS; i++) { xports.push_back( std::make_shared(vrt::if_packet_info_t::LINK_TYPE_VRLP)); } // generate a bunch of packets for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { ifpi.num_payload_words32 = 10 + i % 10; for (size_t ch = 0; ch < NCHANNELS; ch++) { std::vector data(ifpi.num_payload_words32, 0); xports[ch]->push_back_recv_packet(ifpi, data); } ifpi.packet_count++; ifpi.tsf += ifpi.num_payload_words32 * size_t(TICK_RATE / SAMP_RATE); } // create the super receive packet handler sph::recv_packet_handler handler(NCHANNELS); handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); handler.set_tick_rate(TICK_RATE); handler.set_samp_rate(SAMP_RATE); for (size_t ch = 0; ch < NCHANNELS; ch++) { mock_zero_copy::sptr xport = xports[ch]; handler.set_xport_chan_get_buff( ch, [xport](double timeout) { return xport->get_recv_buff(timeout); }); } handler.set_converter(id); // check the received packets size_t num_accum_samps = 0; std::complex mem[NUM_SAMPS_PER_BUFF * NCHANNELS]; std::vector*> buffs(NCHANNELS); for (size_t ch = 0; ch < NCHANNELS; ch++) { buffs[ch] = &mem[ch * NUM_SAMPS_PER_BUFF]; } uhd::rx_metadata_t metadata; for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++) { std::cout << "data check " << i << std::endl; size_t num_samps_ret = handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, 10UL); num_accum_samps += num_samps_ret; if (not metadata.more_fragments) continue; num_samps_ret = handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); BOOST_CHECK(not metadata.more_fragments); BOOST_CHECK_EQUAL(metadata.fragment_offset, 10UL); BOOST_CHECK(metadata.has_time_spec); BOOST_CHECK_TS_CLOSE( metadata.time_spec, uhd::time_spec_t::from_ticks(num_accum_samps, SAMP_RATE)); BOOST_CHECK_EQUAL(num_samps_ret, i % 10); num_accum_samps += num_samps_ret; } // subsequent receives should be a timeout for (size_t i = 0; i < 3; i++) { std::cout << "timeout check " << i << std::endl; handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true); BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); } // simulate the transport failing for (size_t ch = 0; ch < NCHANNELS; ch++) { xports[ch]->set_simulate_io_error(true); } BOOST_REQUIRE_THROW( handler.recv(buffs, NUM_SAMPS_PER_BUFF, metadata, 1.0, true), uhd::io_error); }