diff options
-rwxr-xr-x | host/lib/transport/gen_vrt_if_packet.py | 44 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 637 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 287 | ||||
-rw-r--r-- | host/tests/CMakeLists.txt | 2 | ||||
-rw-r--r-- | host/tests/sph_recv_test.cpp | 715 | ||||
-rw-r--r-- | host/tests/sph_send_test.cpp | 204 |
6 files changed, 1876 insertions, 13 deletions
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index 7440def6a..5f048d8c7 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -62,6 +62,8 @@ static pred_table_type get_pred_unpack_table(void){ if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p); if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p); if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); + if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p); + if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p); } return table; } @@ -84,9 +86,11 @@ void vrt::if_hdr_pack_$(suffix)( if (if_packet_info.has_tsi) pred |= $hex($tsi_p); if (if_packet_info.has_tsf) pred |= $hex($tsf_p); if (if_packet_info.has_tlr) pred |= $hex($tlr_p); + if (if_packet_info.eob) pred |= $hex($eob_p); + if (if_packet_info.sob) pred |= $hex($sob_p); switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $num_header_words = 1 #set $flags = 0 @@ -126,6 +130,13 @@ void vrt::if_hdr_pack_$(suffix)( #else #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + #set $flags |= (0x1 << 24); + #end if + #if $pred & $sob_p + #set $flags |= (0x1 << 25); + #end if ########## Variables ########## if_packet_info.num_header_words32 = $num_header_words; if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32; @@ -134,10 +145,6 @@ void vrt::if_hdr_pack_$(suffix)( #end for } - //set the burst flags - if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25); - if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24); - //fill in complete header word packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 | (if_packet_info.packet_type << 29) @@ -162,13 +169,11 @@ void vrt::if_hdr_unpack_$(suffix)( //extract fields from the header if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; - //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented - //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $has_time_spec = False #set $num_header_words = 1 @@ -215,6 +220,17 @@ void vrt::if_hdr_unpack_$(suffix)( if_packet_info.has_tlr = false; #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + if_packet_info.eob = true; + #else + if_packet_info.eob = false; + #end if + #if $pred & $sob_p + if_packet_info.sob = true; + #else + if_packet_info.sob = false; + #end if ########## Variables ########## //another failure case if (packet_words32 < $($num_header_words + $num_trailer_words)) @@ -243,9 +259,11 @@ if __name__ == '__main__': open(sys.argv[1], 'w').write(parse_tmpl( TMPL_TEXT, file=__file__, - sid_p = 0b00001, - cid_p = 0b00010, - tsi_p = 0b00100, - tsf_p = 0b01000, - tlr_p = 0b10000, + sid_p = 0b0000001, + cid_p = 0b0000010, + tsi_p = 0b0000100, + tsf_p = 0b0001000, + tlr_p = 0b0010000, + sob_p = 0b0100000, + eob_p = 0b1000000, )) diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp new file mode 100644 index 000000000..78d8dfce8 --- /dev/null +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -0,0 +1,637 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <boost/format.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +UHD_INLINE boost::uint32_t get_context_code( + const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info +){ + //extract the context word (we dont know the endianness so mirror the bytes) + boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | + uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); + return word0 & 0xff; +} + +typedef boost::function<void(void)> handle_overflow_type; +static inline void handle_overflow_nop(void){} + +/*********************************************************************** + * Alignment indexes class: + * - Access an integer set with very quick operations. + **********************************************************************/ +class alignment_indexes{ +public: + typedef boost::uint16_t index_type; //16 buffers + + alignment_indexes(void): + _indexes(0), + _sizes(256, 0), + _fronts(256, ~0) + { + //fill the O(1) look up tables for a single byte + for (size_t i = 0; i < 256; i++){ + for (size_t j = 0; j < 8; j++){ + if (i & (1 << j)){ + _sizes[i]++; + _fronts[i] = j; + } + } + } + } + + UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;} + + UHD_INLINE size_t front(void){ + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size_t front = _fronts[(_indexes >> i) & 0xff]; + if (front != size_t(~0)) return front + i; + } + if (empty()) throw uhd::runtime_error("cannot call front() when empty"); + UHD_THROW_INVALID_CODE_PATH(); + } + + UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);} + + UHD_INLINE bool empty(void){return _indexes == 0;} + + UHD_INLINE size_t size(void){ + size_t size = 0; + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size += _sizes[(_indexes >> i) & 0xff]; + } + return size; + } + +private: + index_type _indexes; + std::vector<size_t> _sizes; + std::vector<size_t> _fronts; +}; + +/*********************************************************************** + * Super receive packet handler + * + * A receive packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are received in unison in recv(). + **********************************************************************/ +class recv_packet_handler{ +public: + typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; + + /*! + * Make a new packet handler for receive + * \param size the number of transport channels + */ + recv_packet_handler(const size_t size = 1): + _queue_error_for_next_call(false), + _buffers_infos_index(0) + { + UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8); + this->resize(size); + set_alignment_failure_threshold(1000); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + _buffers_infos.resize(4, buffers_info_type(size)); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt unpacker function and offset + void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){ + _vrt_unpacker = vrt_unpacker; + _header_offset_words32 = header_offset_words32; + } + + /*! + * Set the threshold for alignment failure. + * How many packets throw out before giving up? + * \param threshold number of packets per channel + */ + void set_alignment_failure_threshold(const size_t threshold){ + _alignment_faulure_threshold = threshold*this->size(); + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_otw_to_cpu( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + //! Set the transport channel's overflow handler + void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){ + _props.at(xport_chan).handle_overflow = handle_overflow; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Receive: + * The entry point for the fast-path receive calls. + * Dispatch into combinations of single packet receive calls. + ******************************************************************/ + UHD_INLINE size_t recv( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::recv_mode_t recv_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //handle metadata queued from a previous receive + if (_queue_error_for_next_call){ + _queue_error_for_next_call = false; + metadata = _queue_metadata; + //We want to allow a full buffer recv to be cut short by a timeout, + //but do not want to generate an inline timeout message packet. + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0; + } + + switch(recv_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + return recv_one_packet(buffs, nsamps_per_buff, metadata, io_type, timeout); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t accum_num_samps = recv_one_packet( + buffs, nsamps_per_buff, metadata, io_type, timeout + ); + + //first recv had an error code set, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + + //loop until buffer is filled or error code + while(accum_num_samps < nsamps_per_buff){ + size_t num_samps = recv_one_packet( + buffs, nsamps_per_buff - accum_num_samps, _queue_metadata, + io_type, timeout, accum_num_samps*io_type.size + ); + + //metadata had an error code set, store for next call and return + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){ + _queue_error_for_next_call = true; + break; + } + accum_num_samps += num_samps; + } + return accum_num_samps; + } + + default: throw uhd::value_error("unknown recv mode"); + }//switch(recv_mode) + } + +private: + + boost::mutex _mutex; + vrt_unpacker_type _vrt_unpacker; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + bool _queue_error_for_next_call; + size_t _alignment_faulure_threshold; + rx_metadata_t _queue_metadata; + struct xport_chan_props_type{ + xport_chan_props_type(void): + packet_count(0), + handle_overflow(&handle_overflow_nop) + {} + get_buff_type get_buff; + size_t packet_count; + handle_overflow_type handle_overflow; + }; + std::vector<xport_chan_props_type> _props; + std::vector<void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + + //! information stored for a received buffer + struct per_buffer_info_type{ + managed_recv_buffer::sptr buff; + const boost::uint32_t *vrt_hdr; + vrt::if_packet_info_t ifpi; + time_spec_t time; + const char *copy_buff; + }; + + //!information stored for a set of aligned buffers + struct buffers_info_type : std::vector<per_buffer_info_type> { + buffers_info_type(const size_t size): + std::vector<per_buffer_info_type>(size), + alignment_time_valid(false), + data_bytes_to_copy(0), + fragment_offset_in_samps(0) + { + indexes_to_do.reset(size); + } + alignment_indexes indexes_to_do; //used in alignment logic + time_spec_t alignment_time; //used in alignment logic + bool alignment_time_valid; //used in alignment logic + size_t data_bytes_to_copy; //keeps track of state + size_t fragment_offset_in_samps; //keeps track of state + rx_metadata_t metadata; //packet description + }; + + //! a circular queue of buffer infos + std::vector<buffers_info_type> _buffers_infos; + size_t _buffers_infos_index; + buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];} + buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];} + buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];} + void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;} + + //! possible return options for the packet receiver + enum packet_type{ + PACKET_IF_DATA, + PACKET_TIMESTAMP_ERROR, + PACKET_INLINE_MESSAGE, + PACKET_TIMEOUT_ERROR, + PACKET_SEQUENCE_ERROR + }; + + /******************************************************************* + * Get and process a single packet from the transport: + * Receive a single packet at the given index. + * Extract all the relevant info and store. + * Check the info to determine the return code. + ******************************************************************/ + UHD_INLINE packet_type get_and_process_single_packet( + const size_t index, + buffers_info_type &prev_buffer_info, + buffers_info_type &curr_buffer_info, + double timeout + ){ + //get a single packet from the transport layer + managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + per_buffer_info_type &info = curr_buffer_info[index]; + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //store the packet count for the next iteration + #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t expected_packet_count = _props[index].packet_count; + _props[index].packet_count = (info.ifpi.packet_count + 1)%16; + #endif + + //-------------------------------------------------------------- + //-- Determine return conditions: + //-- The order of these checks is HOLY. + //-------------------------------------------------------------- + + //1) check for out of order timestamps + if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + return PACKET_TIMESTAMP_ERROR; + } + + //2) check for inline IF message packets + if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ + return PACKET_INLINE_MESSAGE; + } + + //3) check for sequence errors + #ifndef SRPH_DONT_CHECK_SEQUENCE + if (expected_packet_count != info.ifpi.packet_count){ + return PACKET_SEQUENCE_ERROR; + } + #endif + + //4) otherwise the packet is normal! + return PACKET_IF_DATA; + } + + /******************************************************************* + * Alignment check: + * Check the received packet for alignment and mark accordingly. + ******************************************************************/ + UHD_INLINE void alignment_check( + const size_t index, buffers_info_type &info + ){ + //if alignment time was not valid or if the sequence id is newer: + // use this index's time as the alignment time + // reset the indexes list and remove this index + if (not info.alignment_time_valid or info[index].time > info.alignment_time){ + info.alignment_time_valid = true; + info.alignment_time = info[index].time; + info.indexes_to_do.reset(this->size()); + info.indexes_to_do.remove(index); + info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t); + } + + //if the sequence id matches: + // remove this index from the list and continue + else if (info[index].time == info.alignment_time){ + info.indexes_to_do.remove(index); + } + + //if the sequence id is older: + // continue with the same index to try again + //else if (info[index].time < info.alignment_time)... + } + + /******************************************************************* + * Get aligned buffers: + * Iterate through each index and try to accumulate aligned buffers. + * Handle all of the edge cases like inline messages and errors. + * The logic will throw out older packets until it finds a match. + ******************************************************************/ + UHD_INLINE void get_aligned_buffs(double timeout){ + + increment_buffer_info(); //increment to next buffer + buffers_info_type &prev_info = get_prev_buffer_info(); + buffers_info_type &curr_info = get_curr_buffer_info(); + buffers_info_type &next_info = get_next_buffer_info(); + + //Loop until we get a message of an aligned set of buffers: + // - Receive a single packet and extract its info. + // - Handle the packet type yielded by the receive. + // - Check the timestamps for alignment conditions. + size_t iterations = 0; + while (not curr_info.indexes_to_do.empty()){ + + //get the index to process for this iteration + const size_t index = curr_info.indexes_to_do.front(); + packet_type packet; + + //receive a single packet from the transport + try{ + packet = get_and_process_single_packet( + index, prev_info, curr_info, timeout + ); + } + + //handle the case when the get packet throws + catch(const std::exception &e){ + UHD_MSG(error) << boost::format( + "The receive packet handler caught an exception.\n%s" + ) % e.what() << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return; + } + + switch(packet){ + case PACKET_IF_DATA: + alignment_check(index, curr_info); + break; + + case PACKET_TIMESTAMP_ERROR: + //If the user changes the device time while streaming or without flushing, + //we can receive a packet that comes before the previous packet in time. + //This could cause the alignment logic to discard future received packets. + //Therefore, when this occurs, we reset the info to restart from scratch. + if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){ + curr_info.alignment_time_valid = false; + } + alignment_check(index, curr_info); + break; + + case PACKET_INLINE_MESSAGE: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsi and next_info[index].ifpi.has_tsf; + curr_info.metadata.time_spec = next_info[index].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); + if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow(); + UHD_MSG(fastpath) << "O"; + return; + + case PACKET_TIMEOUT_ERROR: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return; + + case PACKET_SEQUENCE_ERROR: + alignment_check(index, curr_info); + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; + curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t( + 0.0, prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_item, _samp_rate); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + UHD_MSG(fastpath) << "O"; + return; + + } + + //too many iterations: detect alignment failure + if (iterations++ > _alignment_faulure_threshold){ + UHD_MSG(error) << boost::format( + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return; + } + + } + + //set the metadata from the buffer information at index zero + curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsi and curr_info[0].ifpi.has_tsf; + curr_info.metadata.time_spec = curr_info[0].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + /* TODO SOB on RX not supported in hardware + static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits + curr_info.metadata.start_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_sob_flags) != 0); + */ + curr_info.metadata.start_of_burst = false; + static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits + curr_info.metadata.end_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_eob_flags) != 0); + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + } + + /******************************************************************* + * Receive a single packet: + * Handles fragmentation, messages, errors, and copy-conversion. + * When no fragments are available, call the get aligned buffers. + * Then copy-convert available data into the user's IO buffers. + ******************************************************************/ + UHD_INLINE size_t recv_one_packet( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //get the next buffer if the current one has expired + if (get_curr_buffer_info().data_bytes_to_copy == 0){ + + //reset current buffer info members for reuse + get_curr_buffer_info().fragment_offset_in_samps = 0; + get_curr_buffer_info().alignment_time_valid = false; + get_curr_buffer_info().indexes_to_do.reset(this->size()); + + //perform receive with alignment logic + get_aligned_buffs(timeout); + } + + buffers_info_type &info = get_curr_buffer_info(); + metadata = info.metadata; + + //interpolate the time spec (useful when this is a fragment) + metadata.time_spec += time_spec_t(0, info.fragment_offset_in_samps, _samp_rate); + + //extract the number of samples available to copy + const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_item; + const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); + const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_item; + const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); + + size_t buff_index = 0; + BOOST_FOREACH(per_buffer_info_type &buff_info, info){ + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + + //copy-convert the samples from the recv buffer + _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.); + + //update the rx copy buffer to reflect the bytes copied + buff_info.copy_buff += bytes_to_copy; + } + //update the copy buffer's availability + info.data_bytes_to_copy -= bytes_to_copy; + + //setup the fragment flags and offset + metadata.more_fragments = info.data_bytes_to_copy != 0; + metadata.fragment_offset = info.fragment_offset_in_samps; + info.fragment_offset_in_samps += nsamps_to_copy; //set for next call + + return nsamps_to_copy_per_io_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp new file mode 100644 index 000000000..99d445180 --- /dev/null +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -0,0 +1,287 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +/*********************************************************************** + * Super send packet handler + * + * A send packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are sent in unison in send(). + **********************************************************************/ +class send_packet_handler{ +public: + typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; + + /*! + * Make a new packet handler for send + * \param size the number of transport channels + */ + send_packet_handler(const size_t size = 1): + _next_packet_seq(0) + { + this->resize(size); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + static const boost::uint64_t zero = 0; + _zero_buffs.resize(size, &zero); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt packer function and offset + void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){ + _vrt_packer = vrt_packer; + _header_offset_words32 = header_offset_words32; + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_cpu_to_otw( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + /*! + * Set the maximum number of samples per host packet. + * Ex: A USRP1 in dual channel mode would be half. + * \param num_samps the maximum samples in a packet + */ + void set_max_samples_per_packet(const size_t num_samps){ + _max_samples_per_packet = num_samps; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Send: + * The entry point for the fast-path send calls. + * Dispatch into combinations of single packet send calls. + ******************************************************************/ + UHD_INLINE size_t send( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::send_mode_t send_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //translate the metadata to vrt if packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.has_sid = false; + if_packet_info.has_cid = false; + if_packet_info.has_tlr = false; + if_packet_info.has_tsi = metadata.has_time_spec; + if_packet_info.has_tsf = metadata.has_time_spec; + if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = metadata.start_of_burst; + if_packet_info.eob = metadata.end_of_burst; + + if (nsamps_per_buff <= _max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; + switch(send_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + + //TODO remove this code when sample counts of zero are supported by hardware + if (nsamps_per_buff == 0) return send_one_packet( + _zero_buffs, 1, if_packet_info, io_type, timeout + ); + + return send_one_packet( + buffs, + std::min(nsamps_per_buff, _max_samples_per_packet), + if_packet_info, io_type, timeout + ); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t total_num_samps_sent = 0; + + //false until final fragment + if_packet_info.eob = false; + + const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet; + const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1; + + //loop through the following fragment indexes + for (size_t i = 0; i < num_fragments; i++){ + + //send a fragment with the helper function + const size_t num_samps_sent = send_one_packet( + buffs, _max_samples_per_packet, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; + + //setup metadata for the next fragment + const time_spec_t time_spec = metadata.time_spec + time_spec_t(0, total_num_samps_sent, _samp_rate); + if_packet_info.tsi = boost::uint32_t(time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = false; + + } + + //send the final fragment with the helper function + if_packet_info.eob = metadata.end_of_burst; + return total_num_samps_sent + send_one_packet( + buffs, final_length, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + } + + default: throw uhd::value_error("unknown send mode"); + }//switch(send_mode) + } + +private: + + boost::mutex _mutex; + vrt_packer_type _vrt_packer; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + struct xport_chan_props_type{ + get_buff_type get_buff; + }; + std::vector<xport_chan_props_type> _props; + std::vector<const void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + size_t _max_samples_per_packet; + std::vector<const void *> _zero_buffs; + size_t _next_packet_seq; + + /******************************************************************* + * Send a single packet: + ******************************************************************/ + size_t send_one_packet( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + vrt::if_packet_info_t &if_packet_info, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //load the rest of the if_packet_info in here + if_packet_info.num_payload_words32 = (nsamps_per_buff*_io_buffs.size()*_bytes_per_item)/sizeof(boost::uint32_t); + if_packet_info.packet_count = _next_packet_seq; + + size_t buff_index = 0; + BOOST_FOREACH(xport_chan_props_type &props, _props){ + managed_send_buffer::sptr buff = props.get_buff(timeout); + if (buff.get() == NULL) return 0; //timeout + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(const void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; + + //pack metadata into a vrt header + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //copy-convert the samples into the send buffer + _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.); + + //commit the samples to the zero-copy interface + size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); + buff->commit(num_bytes_total); + + } + _next_packet_seq++; //increment sequence after commits + return nsamps_per_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt index b38afccf0..b7bcfb7d5 100644 --- a/host/tests/CMakeLists.txt +++ b/host/tests/CMakeLists.txt @@ -28,6 +28,8 @@ SET(test_sources gain_group_test.cpp msg_test.cpp ranges_test.cpp + sph_recv_test.cpp + sph_send_test.cpp subdev_spec_test.cpp time_spec_test.cpp tune_helper_test.cpp diff --git a/host/tests/sph_recv_test.cpp b/host/tests/sph_recv_test.cpp new file mode 100644 index 000000000..6bb5d1175 --- /dev/null +++ b/host/tests/sph_recv_test.cpp @@ -0,0 +1,715 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include "../lib/transport/super_recv_packet_handler.hpp" +#include <boost/shared_array.hpp> +#include <boost/bind.hpp> +#include <complex> +#include <vector> +#include <list> + +#define BOOST_CHECK_TS_CLOSE(a, b) \ + BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001) + +/*********************************************************************** + * A dummy managed receive buffer for testing + **********************************************************************/ +class dummy_mrb : public uhd::transport::managed_recv_buffer{ +public: + void release(void){ + //NOP + } + + sptr get_new(boost::shared_array<char> mem, size_t len){ + _mem = mem; + _len = len; + return make_managed_buffer(this); + } + +private: + const void *get_buff(void) const{return _mem.get();} + size_t get_size(void) const{return _len;} + + boost::shared_array<char> _mem; + size_t _len; +}; + +/*********************************************************************** + * A dummy transport class to fill with fake data + **********************************************************************/ +class dummy_recv_xport_class{ +public: + dummy_recv_xport_class(const uhd::otw_type_t &otw_type){ + _otw_type = otw_type; + } + + void push_back_packet( + uhd::transport::vrt::if_packet_info_t &ifpi, + const boost::uint32_t optional_msg_word = 0 + ){ + const size_t max_pkt_len = (ifpi.num_payload_words32 + uhd::transport::vrt::max_if_hdr_words32 + 1/*tlr*/)*sizeof(boost::uint32_t); + _mems.push_back(boost::shared_array<char>(new char[max_pkt_len])); + if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){ + uhd::transport::vrt::if_hdr_pack_be(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi); + } + if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){ + uhd::transport::vrt::if_hdr_pack_le(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi); + } + (reinterpret_cast<boost::uint32_t *>(_mems.back().get()) + ifpi.num_header_words32)[0] = optional_msg_word | uhd::byteswap(optional_msg_word); + _lens.push_back(ifpi.num_packet_words32*sizeof(boost::uint32_t)); + } + + uhd::transport::managed_recv_buffer::sptr get_recv_buff(double){ + if (_mems.empty()) return uhd::transport::managed_recv_buffer::sptr(); //timeout + _mrbs.push_back(dummy_mrb()); + uhd::transport::managed_recv_buffer::sptr mrb = _mrbs.back().get_new(_mems.front(), _lens.front()); + _mems.pop_front(); + _lens.pop_front(); + return mrb; + } + +private: + std::list<boost::shared_array<char> > _mems; + std::list<size_t> _lens; + std::list<dummy_mrb> _mrbs; //list means no-realloc + uhd::otw_type_t _otw_type; +}; + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_normal){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + dummy_recv_xport_class dummy_recv_xport(otw_type); + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + dummy_recv_xport.push_back_packet(ifpi); + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(1); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > buff(20); + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_sequence_error){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + dummy_recv_xport_class dummy_recv_xport(otw_type); + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + if (i != NUM_PKTS_TO_TEST/2){ //simulate a lost packet + dummy_recv_xport.push_back_packet(ifpi); + } + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(1); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > buff(20); + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + if (i == NUM_PKTS_TO_TEST/2){ + //must get the soft overflow here + BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + num_accum_samps += 10 + i%10; + } + else{ + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + } + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + dummy_recv_xport_class dummy_recv_xport(otw_type); + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + dummy_recv_xport.push_back_packet(ifpi); + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + + //simulate overflow + if (i == NUM_PKTS_TO_TEST/2){ + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_EXTENSION; + ifpi.num_payload_words32 = 1; + dummy_recv_xport.push_back_packet(ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + ifpi.packet_count++; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + } + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(1); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1)); + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > buff(20); + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + if (i == NUM_PKTS_TO_TEST/2){ + handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + std::cout << "metadata.error_code " << metadata.error_code << std::endl; + BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + } + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_normal){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + static const size_t NUM_SAMPS_PER_BUFF = 20; + static const size_t NCHANNELS = 4; + + std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + for (size_t ch = 0; ch < NCHANNELS; ch++){ + dummy_recv_xports[ch].push_back_packet(ifpi); + } + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(NCHANNELS); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); + } + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); + std::vector<std::complex<float> *> buffs(NCHANNELS); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; + } + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } + +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_sequence_error){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + static const size_t NUM_SAMPS_PER_BUFF = 20; + static const size_t NCHANNELS = 4; + + std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + for (size_t ch = 0; ch < NCHANNELS; ch++){ + if (i == NUM_PKTS_TO_TEST/2 and ch == 2){ + continue; //simulates a lost packet + } + dummy_recv_xports[ch].push_back_packet(ifpi); + } + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(NCHANNELS); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); + } + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); + std::vector<std::complex<float> *> buffs(NCHANNELS); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; + } + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + if (i == NUM_PKTS_TO_TEST/2){ + //must get the soft overflow here + BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + num_accum_samps += 10 + i%10; + } + else{ + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + } + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_time_error){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + static const size_t NUM_SAMPS_PER_BUFF = 20; + static const size_t NCHANNELS = 4; + + std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + for (size_t ch = 0; ch < NCHANNELS; ch++){ + dummy_recv_xports[ch].push_back_packet(ifpi); + } + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + if (i == NUM_PKTS_TO_TEST/2){ + ifpi.tsf = 0; //simulate the user changing the time + } + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(NCHANNELS); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); + } + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); + std::vector<std::complex<float> *> buffs(NCHANNELS); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; + } + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10); + num_accum_samps += num_samps_ret; + if (i == NUM_PKTS_TO_TEST/2){ + num_accum_samps = 0; //simulate the user changing the time + } + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_fragment){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + uhd::transport::vrt::if_packet_info_t ifpi; + ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA; + ifpi.num_payload_words32 = 0; + ifpi.packet_count = 0; + ifpi.sob = true; + ifpi.eob = false; + ifpi.has_sid = false; + ifpi.has_cid = false; + ifpi.has_tsi = true; + ifpi.has_tsf = true; + ifpi.tsi = 0; + ifpi.tsf = 0; + ifpi.has_tlr = false; + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + static const size_t NUM_SAMPS_PER_BUFF = 10; + static const size_t NCHANNELS = 4; + + std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type)); + + //generate a bunch of packets + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + ifpi.num_payload_words32 = 10 + i%10; + for (size_t ch = 0; ch < NCHANNELS; ch++){ + dummy_recv_xports[ch].push_back_packet(ifpi); + } + ifpi.packet_count++; + ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE); + } + + //create the super receive packet handler + uhd::transport::sph::recv_packet_handler handler(NCHANNELS); + handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1)); + } + handler.set_converter(otw_type); + + //check the received packets + size_t num_accum_samps = 0; + std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS); + std::vector<std::complex<float> *> buffs(NCHANNELS); + for (size_t ch = 0; ch < NCHANNELS; ch++){ + buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF]; + } + uhd::rx_metadata_t metadata; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + size_t num_samps_ret = handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, 10); + num_accum_samps += num_samps_ret; + + if (not metadata.more_fragments) continue; + + num_samps_ret = handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE); + BOOST_CHECK(not metadata.more_fragments); + BOOST_CHECK_EQUAL(metadata.fragment_offset, 10); + BOOST_CHECK(metadata.has_time_spec); + BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE)); + BOOST_CHECK_EQUAL(num_samps_ret, i%10); + num_accum_samps += num_samps_ret; + } + + //subsequent receives should be a timeout + for (size_t i = 0; i < 3; i++){ + std::cout << "timeout check " << i << std::endl; + handler.recv( + buffs, NUM_SAMPS_PER_BUFF, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::RECV_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT); + } + +} diff --git a/host/tests/sph_send_test.cpp b/host/tests/sph_send_test.cpp new file mode 100644 index 000000000..ed2f54371 --- /dev/null +++ b/host/tests/sph_send_test.cpp @@ -0,0 +1,204 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <boost/test/unit_test.hpp> +#include "../lib/transport/super_send_packet_handler.hpp" +#include <boost/shared_array.hpp> +#include <boost/bind.hpp> +#include <complex> +#include <vector> +#include <list> + +#define BOOST_CHECK_TS_CLOSE(a, b) \ + BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001) + +/*********************************************************************** + * A dummy managed send buffer for testing + **********************************************************************/ +class dummy_msb : public uhd::transport::managed_send_buffer{ +public: + void commit(size_t len){ + if (len == 0) return; + *_len = len; + } + + sptr get_new(boost::shared_array<char> mem, size_t *len){ + _mem = mem; + _len = len; + return make_managed_buffer(this); + } + +private: + void *get_buff(void) const{return _mem.get();} + size_t get_size(void) const{return *_len;} + + boost::shared_array<char> _mem; + size_t *_len; +}; + +/*********************************************************************** + * A dummy transport class to fill with fake data + **********************************************************************/ +class dummy_send_xport_class{ +public: + dummy_send_xport_class(const uhd::otw_type_t &otw_type){ + _otw_type = otw_type; + } + + void pop_front_packet( + uhd::transport::vrt::if_packet_info_t &ifpi + ){ + ifpi.num_packet_words32 = _lens.front()/sizeof(boost::uint32_t); + if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){ + uhd::transport::vrt::if_hdr_unpack_be(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi); + } + if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){ + uhd::transport::vrt::if_hdr_unpack_le(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi); + } + _mems.pop_front(); + _lens.pop_front(); + } + + uhd::transport::managed_send_buffer::sptr get_send_buff(double){ + _msbs.push_back(dummy_msb()); + _mems.push_back(boost::shared_array<char>(new char[1000])); + _lens.push_back(1000); + uhd::transport::managed_send_buffer::sptr mrb = _msbs.back().get_new(_mems.back(), &_lens.back()); + return mrb; + } + +private: + std::list<boost::shared_array<char> > _mems; + std::list<size_t> _lens; + std::list<dummy_msb> _msbs; //list means no-realloc + uhd::otw_type_t _otw_type; +}; + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_one_packet_mode){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + dummy_send_xport_class dummy_send_xport(otw_type); + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + + //create the super send packet handler + uhd::transport::sph::send_packet_handler handler(1); + handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1)); + handler.set_converter(otw_type); + handler.set_max_samples_per_packet(20); + + //allocate metadata and buffer + std::vector<std::complex<float> > buff(20); + uhd::tx_metadata_t metadata; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + //generate the test data + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + metadata.start_of_burst = (i == 0); + metadata.end_of_burst = (i == NUM_PKTS_TO_TEST-1); + const size_t num_sent = handler.send( + &buff.front(), 10 + i%10, metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::SEND_MODE_ONE_PACKET, 1.0 + ); + BOOST_CHECK_EQUAL(num_sent, 10 + i%10); + metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE); + } + + //check the sent packets + size_t num_accum_samps = 0; + uhd::transport::vrt::if_packet_info_t ifpi; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + dummy_send_xport.pop_front_packet(ifpi); + BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 10+i%10); + BOOST_CHECK(ifpi.has_tsi); + BOOST_CHECK(ifpi.has_tsf); + BOOST_CHECK_EQUAL(ifpi.tsi, 0); + BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE); + BOOST_CHECK_EQUAL(ifpi.sob, i == 0); + BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1); + num_accum_samps += ifpi.num_payload_words32; + } +} + +//////////////////////////////////////////////////////////////////////// +BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_full_buffer_mode){ +//////////////////////////////////////////////////////////////////////// + uhd::otw_type_t otw_type; + otw_type.width = 16; + otw_type.shift = 0; + otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; + + dummy_send_xport_class dummy_send_xport(otw_type); + + static const double TICK_RATE = 100e6; + static const double SAMP_RATE = 10e6; + static const size_t NUM_PKTS_TO_TEST = 30; + + //create the super send packet handler + uhd::transport::sph::send_packet_handler handler(1); + handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be); + handler.set_tick_rate(TICK_RATE); + handler.set_samp_rate(SAMP_RATE); + handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1)); + handler.set_converter(otw_type); + handler.set_max_samples_per_packet(20); + + //allocate metadata and buffer + std::vector<std::complex<float> > buff(20*NUM_PKTS_TO_TEST); + uhd::tx_metadata_t metadata; + metadata.start_of_burst = true; + metadata.end_of_burst = true; + metadata.has_time_spec = true; + metadata.time_spec = uhd::time_spec_t(0.0); + + //generate the test data + const size_t num_sent = handler.send( + &buff.front(), buff.size(), metadata, + uhd::io_type_t::COMPLEX_FLOAT32, + uhd::device::SEND_MODE_FULL_BUFF, 1.0 + ); + BOOST_CHECK_EQUAL(num_sent, buff.size()); + + //check the sent packets + size_t num_accum_samps = 0; + uhd::transport::vrt::if_packet_info_t ifpi; + for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){ + std::cout << "data check " << i << std::endl; + dummy_send_xport.pop_front_packet(ifpi); + BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 20); + BOOST_CHECK(ifpi.has_tsi); + BOOST_CHECK(ifpi.has_tsf); + BOOST_CHECK_EQUAL(ifpi.tsi, 0); + BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE); + BOOST_CHECK_EQUAL(ifpi.sob, i == 0); + BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1); + num_accum_samps += ifpi.num_payload_words32; + } +} |