aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xhost/lib/transport/gen_vrt_if_packet.py44
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp637
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp287
-rw-r--r--host/tests/CMakeLists.txt2
-rw-r--r--host/tests/sph_recv_test.cpp715
-rw-r--r--host/tests/sph_send_test.cpp204
6 files changed, 1876 insertions, 13 deletions
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py
index 7440def6a..5f048d8c7 100755
--- a/host/lib/transport/gen_vrt_if_packet.py
+++ b/host/lib/transport/gen_vrt_if_packet.py
@@ -62,6 +62,8 @@ static pred_table_type get_pred_unpack_table(void){
if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p);
if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p);
if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p);
+ if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p);
+ if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p);
}
return table;
}
@@ -84,9 +86,11 @@ void vrt::if_hdr_pack_$(suffix)(
if (if_packet_info.has_tsi) pred |= $hex($tsi_p);
if (if_packet_info.has_tsf) pred |= $hex($tsf_p);
if (if_packet_info.has_tlr) pred |= $hex($tlr_p);
+ if (if_packet_info.eob) pred |= $hex($eob_p);
+ if (if_packet_info.sob) pred |= $hex($sob_p);
switch(pred){
- #for $pred in range(2**5)
+ #for $pred in range(2**7)
case $pred:
#set $num_header_words = 1
#set $flags = 0
@@ -126,6 +130,13 @@ void vrt::if_hdr_pack_$(suffix)(
#else
#set $num_trailer_words = 0;
#end if
+ ########## Burst Flags ##########
+ #if $pred & $eob_p
+ #set $flags |= (0x1 << 24);
+ #end if
+ #if $pred & $sob_p
+ #set $flags |= (0x1 << 25);
+ #end if
########## Variables ##########
if_packet_info.num_header_words32 = $num_header_words;
if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32;
@@ -134,10 +145,6 @@ void vrt::if_hdr_pack_$(suffix)(
#end for
}
- //set the burst flags
- if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25);
- if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24);
-
//fill in complete header word
packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0
| (if_packet_info.packet_type << 29)
@@ -162,13 +169,11 @@ void vrt::if_hdr_unpack_$(suffix)(
//extract fields from the header
if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29);
if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf;
- //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented
- //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented
const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)];
switch(pred){
- #for $pred in range(2**5)
+ #for $pred in range(2**7)
case $pred:
#set $has_time_spec = False
#set $num_header_words = 1
@@ -215,6 +220,17 @@ void vrt::if_hdr_unpack_$(suffix)(
if_packet_info.has_tlr = false;
#set $num_trailer_words = 0;
#end if
+ ########## Burst Flags ##########
+ #if $pred & $eob_p
+ if_packet_info.eob = true;
+ #else
+ if_packet_info.eob = false;
+ #end if
+ #if $pred & $sob_p
+ if_packet_info.sob = true;
+ #else
+ if_packet_info.sob = false;
+ #end if
########## Variables ##########
//another failure case
if (packet_words32 < $($num_header_words + $num_trailer_words))
@@ -243,9 +259,11 @@ if __name__ == '__main__':
open(sys.argv[1], 'w').write(parse_tmpl(
TMPL_TEXT,
file=__file__,
- sid_p = 0b00001,
- cid_p = 0b00010,
- tsi_p = 0b00100,
- tsf_p = 0b01000,
- tlr_p = 0b10000,
+ sid_p = 0b0000001,
+ cid_p = 0b0000010,
+ tsi_p = 0b0000100,
+ tsf_p = 0b0001000,
+ tlr_p = 0b0010000,
+ sob_p = 0b0100000,
+ eob_p = 0b1000000,
))
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
new file mode 100644
index 000000000..78d8dfce8
--- /dev/null
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -0,0 +1,637 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/device.hpp>
+#include <uhd/utils/msg.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/types/io_type.hpp>
+#include <uhd/types/otw_type.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/transport/vrt_if_packet.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <boost/format.hpp>
+#include <iostream>
+#include <vector>
+
+namespace uhd{ namespace transport{ namespace sph{
+
+UHD_INLINE boost::uint32_t get_context_code(
+ const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info
+){
+ //extract the context word (we dont know the endianness so mirror the bytes)
+ boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] |
+ uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]);
+ return word0 & 0xff;
+}
+
+typedef boost::function<void(void)> handle_overflow_type;
+static inline void handle_overflow_nop(void){}
+
+/***********************************************************************
+ * Alignment indexes class:
+ * - Access an integer set with very quick operations.
+ **********************************************************************/
+class alignment_indexes{
+public:
+ typedef boost::uint16_t index_type; //16 buffers
+
+ alignment_indexes(void):
+ _indexes(0),
+ _sizes(256, 0),
+ _fronts(256, ~0)
+ {
+ //fill the O(1) look up tables for a single byte
+ for (size_t i = 0; i < 256; i++){
+ for (size_t j = 0; j < 8; j++){
+ if (i & (1 << j)){
+ _sizes[i]++;
+ _fronts[i] = j;
+ }
+ }
+ }
+ }
+
+ UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;}
+
+ UHD_INLINE size_t front(void){
+ //check one byte per iteration
+ for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){
+ size_t front = _fronts[(_indexes >> i) & 0xff];
+ if (front != size_t(~0)) return front + i;
+ }
+ if (empty()) throw uhd::runtime_error("cannot call front() when empty");
+ UHD_THROW_INVALID_CODE_PATH();
+ }
+
+ UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);}
+
+ UHD_INLINE bool empty(void){return _indexes == 0;}
+
+ UHD_INLINE size_t size(void){
+ size_t size = 0;
+ //check one byte per iteration
+ for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){
+ size += _sizes[(_indexes >> i) & 0xff];
+ }
+ return size;
+ }
+
+private:
+ index_type _indexes;
+ std::vector<size_t> _sizes;
+ std::vector<size_t> _fronts;
+};
+
+/***********************************************************************
+ * Super receive packet handler
+ *
+ * A receive packet handler represents a group of channels.
+ * The channel group shares a common sample rate.
+ * All channels are received in unison in recv().
+ **********************************************************************/
+class recv_packet_handler{
+public:
+ typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type;
+ typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &);
+ //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type;
+
+ /*!
+ * Make a new packet handler for receive
+ * \param size the number of transport channels
+ */
+ recv_packet_handler(const size_t size = 1):
+ _queue_error_for_next_call(false),
+ _buffers_infos_index(0)
+ {
+ UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8);
+ this->resize(size);
+ set_alignment_failure_threshold(1000);
+ }
+
+ //! Resize the number of transport channels
+ void resize(const size_t size){
+ if (this->size() == size) return;
+ _props.resize(size);
+ _buffers_infos.resize(4, buffers_info_type(size));
+ }
+
+ //! Get the channel width of this handler
+ size_t size(void) const{
+ return _props.size();
+ }
+
+ //! Setup the vrt unpacker function and offset
+ void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){
+ _vrt_unpacker = vrt_unpacker;
+ _header_offset_words32 = header_offset_words32;
+ }
+
+ /*!
+ * Set the threshold for alignment failure.
+ * How many packets throw out before giving up?
+ * \param threshold number of packets per channel
+ */
+ void set_alignment_failure_threshold(const size_t threshold){
+ _alignment_faulure_threshold = threshold*this->size();
+ }
+
+ //! Set the rate of ticks per second
+ void set_tick_rate(const double rate){
+ _tick_rate = rate;
+ }
+
+ //! Set the rate of samples per second
+ void set_samp_rate(const double rate){
+ _samp_rate = rate;
+ }
+
+ /*!
+ * Set the function to get a managed buffer.
+ * \param xport_chan which transport channel
+ * \param get_buff the getter function
+ */
+ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
+ _props.at(xport_chan).get_buff = get_buff;
+ }
+
+ /*!
+ * Setup the conversion functions (homogeneous across transports).
+ * Here, we load a table of converters for all possible io types.
+ * This makes the converter look-up an O(1) operation.
+ * \param otw_type the channel data type
+ * \param width the streams per channel (usually 1)
+ */
+ void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){
+ _io_buffs.resize(width);
+ _converters.resize(128);
+ for (size_t io_type = 0; io_type < _converters.size(); io_type++){
+ try{
+ _converters[io_type] = uhd::convert::get_converter_otw_to_cpu(
+ io_type_t::tid_t(io_type), otw_type, 1, width
+ );
+ }catch(const uhd::value_error &e){} //we expect this, not all io_types valid...
+ }
+ _bytes_per_item = otw_type.get_sample_size();
+ }
+
+ //! Set the transport channel's overflow handler
+ void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){
+ _props.at(xport_chan).handle_overflow = handle_overflow;
+ }
+
+ //! Get a scoped lock object for this instance
+ boost::mutex::scoped_lock get_scoped_lock(void){
+ return boost::mutex::scoped_lock(_mutex);
+ }
+
+ /*******************************************************************
+ * Receive:
+ * The entry point for the fast-path receive calls.
+ * Dispatch into combinations of single packet receive calls.
+ ******************************************************************/
+ UHD_INLINE size_t recv(
+ const uhd::device::recv_buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ const uhd::io_type_t &io_type,
+ uhd::device::recv_mode_t recv_mode,
+ double timeout
+ ){
+ boost::mutex::scoped_lock lock(_mutex);
+
+ //handle metadata queued from a previous receive
+ if (_queue_error_for_next_call){
+ _queue_error_for_next_call = false;
+ metadata = _queue_metadata;
+ //We want to allow a full buffer recv to be cut short by a timeout,
+ //but do not want to generate an inline timeout message packet.
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0;
+ }
+
+ switch(recv_mode){
+
+ ////////////////////////////////////////////////////////////////
+ case uhd::device::RECV_MODE_ONE_PACKET:{
+ ////////////////////////////////////////////////////////////////
+ return recv_one_packet(buffs, nsamps_per_buff, metadata, io_type, timeout);
+ }
+
+ ////////////////////////////////////////////////////////////////
+ case uhd::device::RECV_MODE_FULL_BUFF:{
+ ////////////////////////////////////////////////////////////////
+ size_t accum_num_samps = recv_one_packet(
+ buffs, nsamps_per_buff, metadata, io_type, timeout
+ );
+
+ //first recv had an error code set, return immediately
+ if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps;
+
+ //loop until buffer is filled or error code
+ while(accum_num_samps < nsamps_per_buff){
+ size_t num_samps = recv_one_packet(
+ buffs, nsamps_per_buff - accum_num_samps, _queue_metadata,
+ io_type, timeout, accum_num_samps*io_type.size
+ );
+
+ //metadata had an error code set, store for next call and return
+ if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){
+ _queue_error_for_next_call = true;
+ break;
+ }
+ accum_num_samps += num_samps;
+ }
+ return accum_num_samps;
+ }
+
+ default: throw uhd::value_error("unknown recv mode");
+ }//switch(recv_mode)
+ }
+
+private:
+
+ boost::mutex _mutex;
+ vrt_unpacker_type _vrt_unpacker;
+ size_t _header_offset_words32;
+ double _tick_rate, _samp_rate;
+ bool _queue_error_for_next_call;
+ size_t _alignment_faulure_threshold;
+ rx_metadata_t _queue_metadata;
+ struct xport_chan_props_type{
+ xport_chan_props_type(void):
+ packet_count(0),
+ handle_overflow(&handle_overflow_nop)
+ {}
+ get_buff_type get_buff;
+ size_t packet_count;
+ handle_overflow_type handle_overflow;
+ };
+ std::vector<xport_chan_props_type> _props;
+ std::vector<void *> _io_buffs; //used in conversion
+ size_t _bytes_per_item; //used in conversion
+ std::vector<uhd::convert::function_type> _converters; //used in conversion
+
+ //! information stored for a received buffer
+ struct per_buffer_info_type{
+ managed_recv_buffer::sptr buff;
+ const boost::uint32_t *vrt_hdr;
+ vrt::if_packet_info_t ifpi;
+ time_spec_t time;
+ const char *copy_buff;
+ };
+
+ //!information stored for a set of aligned buffers
+ struct buffers_info_type : std::vector<per_buffer_info_type> {
+ buffers_info_type(const size_t size):
+ std::vector<per_buffer_info_type>(size),
+ alignment_time_valid(false),
+ data_bytes_to_copy(0),
+ fragment_offset_in_samps(0)
+ {
+ indexes_to_do.reset(size);
+ }
+ alignment_indexes indexes_to_do; //used in alignment logic
+ time_spec_t alignment_time; //used in alignment logic
+ bool alignment_time_valid; //used in alignment logic
+ size_t data_bytes_to_copy; //keeps track of state
+ size_t fragment_offset_in_samps; //keeps track of state
+ rx_metadata_t metadata; //packet description
+ };
+
+ //! a circular queue of buffer infos
+ std::vector<buffers_info_type> _buffers_infos;
+ size_t _buffers_infos_index;
+ buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];}
+ buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];}
+ buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];}
+ void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;}
+
+ //! possible return options for the packet receiver
+ enum packet_type{
+ PACKET_IF_DATA,
+ PACKET_TIMESTAMP_ERROR,
+ PACKET_INLINE_MESSAGE,
+ PACKET_TIMEOUT_ERROR,
+ PACKET_SEQUENCE_ERROR
+ };
+
+ /*******************************************************************
+ * Get and process a single packet from the transport:
+ * Receive a single packet at the given index.
+ * Extract all the relevant info and store.
+ * Check the info to determine the return code.
+ ******************************************************************/
+ UHD_INLINE packet_type get_and_process_single_packet(
+ const size_t index,
+ buffers_info_type &prev_buffer_info,
+ buffers_info_type &curr_buffer_info,
+ double timeout
+ ){
+ //get a single packet from the transport layer
+ managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff;
+ buff = _props[index].get_buff(timeout);
+ if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR;
+
+ //bounds check before extract
+ size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t);
+ if (num_packet_words32 <= _header_offset_words32){
+ throw std::runtime_error("recv buffer smaller than vrt packet offset");
+ }
+
+ //extract packet info
+ per_buffer_info_type &info = curr_buffer_info[index];
+ info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32;
+ info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32;
+ _vrt_unpacker(info.vrt_hdr, info.ifpi);
+ info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true
+ info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32);
+
+ //store the packet count for the next iteration
+ #ifndef SRPH_DONT_CHECK_SEQUENCE
+ const size_t expected_packet_count = _props[index].packet_count;
+ _props[index].packet_count = (info.ifpi.packet_count + 1)%16;
+ #endif
+
+ //--------------------------------------------------------------
+ //-- Determine return conditions:
+ //-- The order of these checks is HOLY.
+ //--------------------------------------------------------------
+
+ //1) check for out of order timestamps
+ if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){
+ return PACKET_TIMESTAMP_ERROR;
+ }
+
+ //2) check for inline IF message packets
+ if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){
+ return PACKET_INLINE_MESSAGE;
+ }
+
+ //3) check for sequence errors
+ #ifndef SRPH_DONT_CHECK_SEQUENCE
+ if (expected_packet_count != info.ifpi.packet_count){
+ return PACKET_SEQUENCE_ERROR;
+ }
+ #endif
+
+ //4) otherwise the packet is normal!
+ return PACKET_IF_DATA;
+ }
+
+ /*******************************************************************
+ * Alignment check:
+ * Check the received packet for alignment and mark accordingly.
+ ******************************************************************/
+ UHD_INLINE void alignment_check(
+ const size_t index, buffers_info_type &info
+ ){
+ //if alignment time was not valid or if the sequence id is newer:
+ // use this index's time as the alignment time
+ // reset the indexes list and remove this index
+ if (not info.alignment_time_valid or info[index].time > info.alignment_time){
+ info.alignment_time_valid = true;
+ info.alignment_time = info[index].time;
+ info.indexes_to_do.reset(this->size());
+ info.indexes_to_do.remove(index);
+ info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t);
+ }
+
+ //if the sequence id matches:
+ // remove this index from the list and continue
+ else if (info[index].time == info.alignment_time){
+ info.indexes_to_do.remove(index);
+ }
+
+ //if the sequence id is older:
+ // continue with the same index to try again
+ //else if (info[index].time < info.alignment_time)...
+ }
+
+ /*******************************************************************
+ * Get aligned buffers:
+ * Iterate through each index and try to accumulate aligned buffers.
+ * Handle all of the edge cases like inline messages and errors.
+ * The logic will throw out older packets until it finds a match.
+ ******************************************************************/
+ UHD_INLINE void get_aligned_buffs(double timeout){
+
+ increment_buffer_info(); //increment to next buffer
+ buffers_info_type &prev_info = get_prev_buffer_info();
+ buffers_info_type &curr_info = get_curr_buffer_info();
+ buffers_info_type &next_info = get_next_buffer_info();
+
+ //Loop until we get a message of an aligned set of buffers:
+ // - Receive a single packet and extract its info.
+ // - Handle the packet type yielded by the receive.
+ // - Check the timestamps for alignment conditions.
+ size_t iterations = 0;
+ while (not curr_info.indexes_to_do.empty()){
+
+ //get the index to process for this iteration
+ const size_t index = curr_info.indexes_to_do.front();
+ packet_type packet;
+
+ //receive a single packet from the transport
+ try{
+ packet = get_and_process_single_packet(
+ index, prev_info, curr_info, timeout
+ );
+ }
+
+ //handle the case when the get packet throws
+ catch(const std::exception &e){
+ UHD_MSG(error) << boost::format(
+ "The receive packet handler caught an exception.\n%s"
+ ) % e.what() << std::endl;
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET;
+ return;
+ }
+
+ switch(packet){
+ case PACKET_IF_DATA:
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_TIMESTAMP_ERROR:
+ //If the user changes the device time while streaming or without flushing,
+ //we can receive a packet that comes before the previous packet in time.
+ //This could cause the alignment logic to discard future received packets.
+ //Therefore, when this occurs, we reset the info to restart from scratch.
+ if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){
+ curr_info.alignment_time_valid = false;
+ }
+ alignment_check(index, curr_info);
+ break;
+
+ case PACKET_INLINE_MESSAGE:
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsi and next_info[index].ifpi.has_tsf;
+ curr_info.metadata.time_spec = next_info[index].time;
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi));
+ if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow();
+ UHD_MSG(fastpath) << "O";
+ return;
+
+ case PACKET_TIMEOUT_ERROR:
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT;
+ return;
+
+ case PACKET_SEQUENCE_ERROR:
+ alignment_check(index, curr_info);
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec;
+ curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t(
+ 0.0, prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_item, _samp_rate);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW;
+ UHD_MSG(fastpath) << "O";
+ return;
+
+ }
+
+ //too many iterations: detect alignment failure
+ if (iterations++ > _alignment_faulure_threshold){
+ UHD_MSG(error) << boost::format(
+ "The receive packet handler failed to time-align packets.\n"
+ "%u received packets were processed by the handler.\n"
+ "However, a timestamp match could not be determined.\n"
+ ) % iterations << std::endl;
+ std::swap(curr_info, next_info); //save progress from curr -> next
+ curr_info.metadata.has_time_spec = false;
+ curr_info.metadata.time_spec = time_spec_t(0.0);
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ curr_info.metadata.start_of_burst = false;
+ curr_info.metadata.end_of_burst = false;
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT;
+ return;
+ }
+
+ }
+
+ //set the metadata from the buffer information at index zero
+ curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsi and curr_info[0].ifpi.has_tsf;
+ curr_info.metadata.time_spec = curr_info[0].time;
+ curr_info.metadata.more_fragments = false;
+ curr_info.metadata.fragment_offset = 0;
+ /* TODO SOB on RX not supported in hardware
+ static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits
+ curr_info.metadata.start_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_sob_flags) != 0);
+ */
+ curr_info.metadata.start_of_burst = false;
+ static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits
+ curr_info.metadata.end_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_eob_flags) != 0);
+ curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE;
+
+ }
+
+ /*******************************************************************
+ * Receive a single packet:
+ * Handles fragmentation, messages, errors, and copy-conversion.
+ * When no fragments are available, call the get aligned buffers.
+ * Then copy-convert available data into the user's IO buffers.
+ ******************************************************************/
+ UHD_INLINE size_t recv_one_packet(
+ const uhd::device::recv_buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ const uhd::io_type_t &io_type,
+ double timeout,
+ const size_t buffer_offset_bytes = 0
+ ){
+ //get the next buffer if the current one has expired
+ if (get_curr_buffer_info().data_bytes_to_copy == 0){
+
+ //reset current buffer info members for reuse
+ get_curr_buffer_info().fragment_offset_in_samps = 0;
+ get_curr_buffer_info().alignment_time_valid = false;
+ get_curr_buffer_info().indexes_to_do.reset(this->size());
+
+ //perform receive with alignment logic
+ get_aligned_buffs(timeout);
+ }
+
+ buffers_info_type &info = get_curr_buffer_info();
+ metadata = info.metadata;
+
+ //interpolate the time spec (useful when this is a fragment)
+ metadata.time_spec += time_spec_t(0, info.fragment_offset_in_samps, _samp_rate);
+
+ //extract the number of samples available to copy
+ const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_item;
+ const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available);
+ const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_item;
+ const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size();
+
+ size_t buff_index = 0;
+ BOOST_FOREACH(per_buffer_info_type &buff_info, info){
+
+ //fill a vector with pointers to the io buffers
+ BOOST_FOREACH(void *&io_buff, _io_buffs){
+ io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes;
+ }
+
+ //copy-convert the samples from the recv buffer
+ _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.);
+
+ //update the rx copy buffer to reflect the bytes copied
+ buff_info.copy_buff += bytes_to_copy;
+ }
+ //update the copy buffer's availability
+ info.data_bytes_to_copy -= bytes_to_copy;
+
+ //setup the fragment flags and offset
+ metadata.more_fragments = info.data_bytes_to_copy != 0;
+ metadata.fragment_offset = info.fragment_offset_in_samps;
+ info.fragment_offset_in_samps += nsamps_to_copy; //set for next call
+
+ return nsamps_to_copy_per_io_buff;
+ }
+};
+
+}}} //namespace
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
new file mode 100644
index 000000000..99d445180
--- /dev/null
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -0,0 +1,287 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
+#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/convert.hpp>
+#include <uhd/device.hpp>
+#include <uhd/utils/byteswap.hpp>
+#include <uhd/types/io_type.hpp>
+#include <uhd/types/otw_type.hpp>
+#include <uhd/types/metadata.hpp>
+#include <uhd/transport/vrt_if_packet.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/foreach.hpp>
+#include <boost/function.hpp>
+#include <iostream>
+#include <vector>
+
+namespace uhd{ namespace transport{ namespace sph{
+
+/***********************************************************************
+ * Super send packet handler
+ *
+ * A send packet handler represents a group of channels.
+ * The channel group shares a common sample rate.
+ * All channels are sent in unison in send().
+ **********************************************************************/
+class send_packet_handler{
+public:
+ typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type;
+ typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &);
+ //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type;
+
+ /*!
+ * Make a new packet handler for send
+ * \param size the number of transport channels
+ */
+ send_packet_handler(const size_t size = 1):
+ _next_packet_seq(0)
+ {
+ this->resize(size);
+ }
+
+ //! Resize the number of transport channels
+ void resize(const size_t size){
+ if (this->size() == size) return;
+ _props.resize(size);
+ static const boost::uint64_t zero = 0;
+ _zero_buffs.resize(size, &zero);
+ }
+
+ //! Get the channel width of this handler
+ size_t size(void) const{
+ return _props.size();
+ }
+
+ //! Setup the vrt packer function and offset
+ void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){
+ _vrt_packer = vrt_packer;
+ _header_offset_words32 = header_offset_words32;
+ }
+
+ //! Set the rate of ticks per second
+ void set_tick_rate(const double rate){
+ _tick_rate = rate;
+ }
+
+ //! Set the rate of samples per second
+ void set_samp_rate(const double rate){
+ _samp_rate = rate;
+ }
+
+ /*!
+ * Set the function to get a managed buffer.
+ * \param xport_chan which transport channel
+ * \param get_buff the getter function
+ */
+ void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){
+ _props.at(xport_chan).get_buff = get_buff;
+ }
+
+ /*!
+ * Setup the conversion functions (homogeneous across transports).
+ * Here, we load a table of converters for all possible io types.
+ * This makes the converter look-up an O(1) operation.
+ * \param otw_type the channel data type
+ * \param width the streams per channel (usually 1)
+ */
+ void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){
+ _io_buffs.resize(width);
+ _converters.resize(128);
+ for (size_t io_type = 0; io_type < _converters.size(); io_type++){
+ try{
+ _converters[io_type] = uhd::convert::get_converter_cpu_to_otw(
+ io_type_t::tid_t(io_type), otw_type, 1, width
+ );
+ }catch(const uhd::value_error &e){} //we expect this, not all io_types valid...
+ }
+ _bytes_per_item = otw_type.get_sample_size();
+ }
+
+ /*!
+ * Set the maximum number of samples per host packet.
+ * Ex: A USRP1 in dual channel mode would be half.
+ * \param num_samps the maximum samples in a packet
+ */
+ void set_max_samples_per_packet(const size_t num_samps){
+ _max_samples_per_packet = num_samps;
+ }
+
+ //! Get a scoped lock object for this instance
+ boost::mutex::scoped_lock get_scoped_lock(void){
+ return boost::mutex::scoped_lock(_mutex);
+ }
+
+ /*******************************************************************
+ * Send:
+ * The entry point for the fast-path send calls.
+ * Dispatch into combinations of single packet send calls.
+ ******************************************************************/
+ UHD_INLINE size_t send(
+ const uhd::device::send_buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t &metadata,
+ const uhd::io_type_t &io_type,
+ uhd::device::send_mode_t send_mode,
+ double timeout
+ ){
+ boost::mutex::scoped_lock lock(_mutex);
+
+ //translate the metadata to vrt if packet info
+ vrt::if_packet_info_t if_packet_info;
+ if_packet_info.has_sid = false;
+ if_packet_info.has_cid = false;
+ if_packet_info.has_tlr = false;
+ if_packet_info.has_tsi = metadata.has_time_spec;
+ if_packet_info.has_tsf = metadata.has_time_spec;
+ if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs());
+ if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(_tick_rate));
+ if_packet_info.sob = metadata.start_of_burst;
+ if_packet_info.eob = metadata.end_of_burst;
+
+ if (nsamps_per_buff <= _max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET;
+ switch(send_mode){
+
+ ////////////////////////////////////////////////////////////////
+ case uhd::device::SEND_MODE_ONE_PACKET:{
+ ////////////////////////////////////////////////////////////////
+
+ //TODO remove this code when sample counts of zero are supported by hardware
+ if (nsamps_per_buff == 0) return send_one_packet(
+ _zero_buffs, 1, if_packet_info, io_type, timeout
+ );
+
+ return send_one_packet(
+ buffs,
+ std::min(nsamps_per_buff, _max_samples_per_packet),
+ if_packet_info, io_type, timeout
+ );
+ }
+
+ ////////////////////////////////////////////////////////////////
+ case uhd::device::SEND_MODE_FULL_BUFF:{
+ ////////////////////////////////////////////////////////////////
+ size_t total_num_samps_sent = 0;
+
+ //false until final fragment
+ if_packet_info.eob = false;
+
+ const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet;
+ const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1;
+
+ //loop through the following fragment indexes
+ for (size_t i = 0; i < num_fragments; i++){
+
+ //send a fragment with the helper function
+ const size_t num_samps_sent = send_one_packet(
+ buffs, _max_samples_per_packet,
+ if_packet_info, io_type, timeout,
+ total_num_samps_sent*io_type.size
+ );
+ total_num_samps_sent += num_samps_sent;
+ if (num_samps_sent == 0) return total_num_samps_sent;
+
+ //setup metadata for the next fragment
+ const time_spec_t time_spec = metadata.time_spec + time_spec_t(0, total_num_samps_sent, _samp_rate);
+ if_packet_info.tsi = boost::uint32_t(time_spec.get_full_secs());
+ if_packet_info.tsf = boost::uint64_t(time_spec.get_tick_count(_tick_rate));
+ if_packet_info.sob = false;
+
+ }
+
+ //send the final fragment with the helper function
+ if_packet_info.eob = metadata.end_of_burst;
+ return total_num_samps_sent + send_one_packet(
+ buffs, final_length,
+ if_packet_info, io_type, timeout,
+ total_num_samps_sent*io_type.size
+ );
+ }
+
+ default: throw uhd::value_error("unknown send mode");
+ }//switch(send_mode)
+ }
+
+private:
+
+ boost::mutex _mutex;
+ vrt_packer_type _vrt_packer;
+ size_t _header_offset_words32;
+ double _tick_rate, _samp_rate;
+ struct xport_chan_props_type{
+ get_buff_type get_buff;
+ };
+ std::vector<xport_chan_props_type> _props;
+ std::vector<const void *> _io_buffs; //used in conversion
+ size_t _bytes_per_item; //used in conversion
+ std::vector<uhd::convert::function_type> _converters; //used in conversion
+ size_t _max_samples_per_packet;
+ std::vector<const void *> _zero_buffs;
+ size_t _next_packet_seq;
+
+ /*******************************************************************
+ * Send a single packet:
+ ******************************************************************/
+ size_t send_one_packet(
+ const uhd::device::send_buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ vrt::if_packet_info_t &if_packet_info,
+ const uhd::io_type_t &io_type,
+ double timeout,
+ const size_t buffer_offset_bytes = 0
+ ){
+ //load the rest of the if_packet_info in here
+ if_packet_info.num_payload_words32 = (nsamps_per_buff*_io_buffs.size()*_bytes_per_item)/sizeof(boost::uint32_t);
+ if_packet_info.packet_count = _next_packet_seq;
+
+ size_t buff_index = 0;
+ BOOST_FOREACH(xport_chan_props_type &props, _props){
+ managed_send_buffer::sptr buff = props.get_buff(timeout);
+ if (buff.get() == NULL) return 0; //timeout
+
+ //fill a vector with pointers to the io buffers
+ BOOST_FOREACH(const void *&io_buff, _io_buffs){
+ io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes;
+ }
+ boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32;
+
+ //pack metadata into a vrt header
+ _vrt_packer(otw_mem, if_packet_info);
+ otw_mem += if_packet_info.num_header_words32;
+
+ //copy-convert the samples into the send buffer
+ _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.);
+
+ //commit the samples to the zero-copy interface
+ size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t);
+ buff->commit(num_bytes_total);
+
+ }
+ _next_packet_seq++; //increment sequence after commits
+ return nsamps_per_buff;
+ }
+};
+
+}}} //namespace
+
+#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */
diff --git a/host/tests/CMakeLists.txt b/host/tests/CMakeLists.txt
index b38afccf0..b7bcfb7d5 100644
--- a/host/tests/CMakeLists.txt
+++ b/host/tests/CMakeLists.txt
@@ -28,6 +28,8 @@ SET(test_sources
gain_group_test.cpp
msg_test.cpp
ranges_test.cpp
+ sph_recv_test.cpp
+ sph_send_test.cpp
subdev_spec_test.cpp
time_spec_test.cpp
tune_helper_test.cpp
diff --git a/host/tests/sph_recv_test.cpp b/host/tests/sph_recv_test.cpp
new file mode 100644
index 000000000..6bb5d1175
--- /dev/null
+++ b/host/tests/sph_recv_test.cpp
@@ -0,0 +1,715 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <boost/test/unit_test.hpp>
+#include "../lib/transport/super_recv_packet_handler.hpp"
+#include <boost/shared_array.hpp>
+#include <boost/bind.hpp>
+#include <complex>
+#include <vector>
+#include <list>
+
+#define BOOST_CHECK_TS_CLOSE(a, b) \
+ BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001)
+
+/***********************************************************************
+ * A dummy managed receive buffer for testing
+ **********************************************************************/
+class dummy_mrb : public uhd::transport::managed_recv_buffer{
+public:
+ void release(void){
+ //NOP
+ }
+
+ sptr get_new(boost::shared_array<char> mem, size_t len){
+ _mem = mem;
+ _len = len;
+ return make_managed_buffer(this);
+ }
+
+private:
+ const void *get_buff(void) const{return _mem.get();}
+ size_t get_size(void) const{return _len;}
+
+ boost::shared_array<char> _mem;
+ size_t _len;
+};
+
+/***********************************************************************
+ * A dummy transport class to fill with fake data
+ **********************************************************************/
+class dummy_recv_xport_class{
+public:
+ dummy_recv_xport_class(const uhd::otw_type_t &otw_type){
+ _otw_type = otw_type;
+ }
+
+ void push_back_packet(
+ uhd::transport::vrt::if_packet_info_t &ifpi,
+ const boost::uint32_t optional_msg_word = 0
+ ){
+ const size_t max_pkt_len = (ifpi.num_payload_words32 + uhd::transport::vrt::max_if_hdr_words32 + 1/*tlr*/)*sizeof(boost::uint32_t);
+ _mems.push_back(boost::shared_array<char>(new char[max_pkt_len]));
+ if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){
+ uhd::transport::vrt::if_hdr_pack_be(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi);
+ }
+ if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){
+ uhd::transport::vrt::if_hdr_pack_le(reinterpret_cast<boost::uint32_t *>(_mems.back().get()), ifpi);
+ }
+ (reinterpret_cast<boost::uint32_t *>(_mems.back().get()) + ifpi.num_header_words32)[0] = optional_msg_word | uhd::byteswap(optional_msg_word);
+ _lens.push_back(ifpi.num_packet_words32*sizeof(boost::uint32_t));
+ }
+
+ uhd::transport::managed_recv_buffer::sptr get_recv_buff(double){
+ if (_mems.empty()) return uhd::transport::managed_recv_buffer::sptr(); //timeout
+ _mrbs.push_back(dummy_mrb());
+ uhd::transport::managed_recv_buffer::sptr mrb = _mrbs.back().get_new(_mems.front(), _lens.front());
+ _mems.pop_front();
+ _lens.pop_front();
+ return mrb;
+ }
+
+private:
+ std::list<boost::shared_array<char> > _mems;
+ std::list<size_t> _lens;
+ std::list<dummy_mrb> _mrbs; //list means no-realloc
+ uhd::otw_type_t _otw_type;
+};
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_normal){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ dummy_recv_xport_class dummy_recv_xport(otw_type);
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ dummy_recv_xport.push_back_packet(ifpi);
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(1);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1));
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > buff(20);
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_sequence_error){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ dummy_recv_xport_class dummy_recv_xport(otw_type);
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ if (i != NUM_PKTS_TO_TEST/2){ //simulate a lost packet
+ dummy_recv_xport.push_back_packet(ifpi);
+ }
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(1);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1));
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > buff(20);
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ if (i == NUM_PKTS_TO_TEST/2){
+ //must get the soft overflow here
+ BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ num_accum_samps += 10 + i%10;
+ }
+ else{
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ }
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_one_channel_inline_message){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ dummy_recv_xport_class dummy_recv_xport(otw_type);
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ dummy_recv_xport.push_back_packet(ifpi);
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+
+ //simulate overflow
+ if (i == NUM_PKTS_TO_TEST/2){
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_EXTENSION;
+ ifpi.num_payload_words32 = 1;
+ dummy_recv_xport.push_back_packet(ifpi, uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ ifpi.packet_count++;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ }
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(1);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ handler.set_xport_chan_get_buff(0, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xport, _1));
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > buff(20);
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ if (i == NUM_PKTS_TO_TEST/2){
+ handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ std::cout << "metadata.error_code " << metadata.error_code << std::endl;
+ BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ }
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_normal){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+ static const size_t NUM_SAMPS_PER_BUFF = 20;
+ static const size_t NCHANNELS = 4;
+
+ std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type));
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ dummy_recv_xports[ch].push_back_packet(ifpi);
+ }
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(NCHANNELS);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1));
+ }
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS);
+ std::vector<std::complex<float> *> buffs(NCHANNELS);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF];
+ }
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_sequence_error){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+ static const size_t NUM_SAMPS_PER_BUFF = 20;
+ static const size_t NCHANNELS = 4;
+
+ std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type));
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ if (i == NUM_PKTS_TO_TEST/2 and ch == 2){
+ continue; //simulates a lost packet
+ }
+ dummy_recv_xports[ch].push_back_packet(ifpi);
+ }
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(NCHANNELS);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1));
+ }
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS);
+ std::vector<std::complex<float> *> buffs(NCHANNELS);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF];
+ }
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ if (i == NUM_PKTS_TO_TEST/2){
+ //must get the soft overflow here
+ BOOST_REQUIRE(metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ num_accum_samps += 10 + i%10;
+ }
+ else{
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ }
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_time_error){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+ static const size_t NUM_SAMPS_PER_BUFF = 20;
+ static const size_t NCHANNELS = 4;
+
+ std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type));
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ dummy_recv_xports[ch].push_back_packet(ifpi);
+ }
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ if (i == NUM_PKTS_TO_TEST/2){
+ ifpi.tsf = 0; //simulate the user changing the time
+ }
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(NCHANNELS);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1));
+ }
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS);
+ std::vector<std::complex<float> *> buffs(NCHANNELS);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF];
+ }
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10 + i%10);
+ num_accum_samps += num_samps_ret;
+ if (i == NUM_PKTS_TO_TEST/2){
+ num_accum_samps = 0; //simulate the user changing the time
+ }
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_recv_multi_channel_fragment){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ ifpi.packet_type = uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA;
+ ifpi.num_payload_words32 = 0;
+ ifpi.packet_count = 0;
+ ifpi.sob = true;
+ ifpi.eob = false;
+ ifpi.has_sid = false;
+ ifpi.has_cid = false;
+ ifpi.has_tsi = true;
+ ifpi.has_tsf = true;
+ ifpi.tsi = 0;
+ ifpi.tsf = 0;
+ ifpi.has_tlr = false;
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+ static const size_t NUM_SAMPS_PER_BUFF = 10;
+ static const size_t NCHANNELS = 4;
+
+ std::vector<dummy_recv_xport_class> dummy_recv_xports(NCHANNELS, dummy_recv_xport_class(otw_type));
+
+ //generate a bunch of packets
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ ifpi.num_payload_words32 = 10 + i%10;
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ dummy_recv_xports[ch].push_back_packet(ifpi);
+ }
+ ifpi.packet_count++;
+ ifpi.tsf += ifpi.num_payload_words32*size_t(TICK_RATE/SAMP_RATE);
+ }
+
+ //create the super receive packet handler
+ uhd::transport::sph::recv_packet_handler handler(NCHANNELS);
+ handler.set_vrt_unpacker(&uhd::transport::vrt::if_hdr_unpack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ handler.set_xport_chan_get_buff(ch, boost::bind(&dummy_recv_xport_class::get_recv_buff, &dummy_recv_xports[ch], _1));
+ }
+ handler.set_converter(otw_type);
+
+ //check the received packets
+ size_t num_accum_samps = 0;
+ std::vector<std::complex<float> > mem(NUM_SAMPS_PER_BUFF*NCHANNELS);
+ std::vector<std::complex<float> *> buffs(NCHANNELS);
+ for (size_t ch = 0; ch < NCHANNELS; ch++){
+ buffs[ch] = &mem[ch*NUM_SAMPS_PER_BUFF];
+ }
+ uhd::rx_metadata_t metadata;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ size_t num_samps_ret = handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, 10);
+ num_accum_samps += num_samps_ret;
+
+ if (not metadata.more_fragments) continue;
+
+ num_samps_ret = handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_NONE);
+ BOOST_CHECK(not metadata.more_fragments);
+ BOOST_CHECK_EQUAL(metadata.fragment_offset, 10);
+ BOOST_CHECK(metadata.has_time_spec);
+ BOOST_CHECK_TS_CLOSE(metadata.time_spec, uhd::time_spec_t(0, num_accum_samps, SAMP_RATE));
+ BOOST_CHECK_EQUAL(num_samps_ret, i%10);
+ num_accum_samps += num_samps_ret;
+ }
+
+ //subsequent receives should be a timeout
+ for (size_t i = 0; i < 3; i++){
+ std::cout << "timeout check " << i << std::endl;
+ handler.recv(
+ buffs, NUM_SAMPS_PER_BUFF, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::RECV_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(metadata.error_code, uhd::rx_metadata_t::ERROR_CODE_TIMEOUT);
+ }
+
+}
diff --git a/host/tests/sph_send_test.cpp b/host/tests/sph_send_test.cpp
new file mode 100644
index 000000000..ed2f54371
--- /dev/null
+++ b/host/tests/sph_send_test.cpp
@@ -0,0 +1,204 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <boost/test/unit_test.hpp>
+#include "../lib/transport/super_send_packet_handler.hpp"
+#include <boost/shared_array.hpp>
+#include <boost/bind.hpp>
+#include <complex>
+#include <vector>
+#include <list>
+
+#define BOOST_CHECK_TS_CLOSE(a, b) \
+ BOOST_CHECK_CLOSE((a).get_real_secs(), (b).get_real_secs(), 0.001)
+
+/***********************************************************************
+ * A dummy managed send buffer for testing
+ **********************************************************************/
+class dummy_msb : public uhd::transport::managed_send_buffer{
+public:
+ void commit(size_t len){
+ if (len == 0) return;
+ *_len = len;
+ }
+
+ sptr get_new(boost::shared_array<char> mem, size_t *len){
+ _mem = mem;
+ _len = len;
+ return make_managed_buffer(this);
+ }
+
+private:
+ void *get_buff(void) const{return _mem.get();}
+ size_t get_size(void) const{return *_len;}
+
+ boost::shared_array<char> _mem;
+ size_t *_len;
+};
+
+/***********************************************************************
+ * A dummy transport class to fill with fake data
+ **********************************************************************/
+class dummy_send_xport_class{
+public:
+ dummy_send_xport_class(const uhd::otw_type_t &otw_type){
+ _otw_type = otw_type;
+ }
+
+ void pop_front_packet(
+ uhd::transport::vrt::if_packet_info_t &ifpi
+ ){
+ ifpi.num_packet_words32 = _lens.front()/sizeof(boost::uint32_t);
+ if (_otw_type.byteorder == uhd::otw_type_t::BO_BIG_ENDIAN){
+ uhd::transport::vrt::if_hdr_unpack_be(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi);
+ }
+ if (_otw_type.byteorder == uhd::otw_type_t::BO_LITTLE_ENDIAN){
+ uhd::transport::vrt::if_hdr_unpack_le(reinterpret_cast<boost::uint32_t *>(_mems.front().get()), ifpi);
+ }
+ _mems.pop_front();
+ _lens.pop_front();
+ }
+
+ uhd::transport::managed_send_buffer::sptr get_send_buff(double){
+ _msbs.push_back(dummy_msb());
+ _mems.push_back(boost::shared_array<char>(new char[1000]));
+ _lens.push_back(1000);
+ uhd::transport::managed_send_buffer::sptr mrb = _msbs.back().get_new(_mems.back(), &_lens.back());
+ return mrb;
+ }
+
+private:
+ std::list<boost::shared_array<char> > _mems;
+ std::list<size_t> _lens;
+ std::list<dummy_msb> _msbs; //list means no-realloc
+ uhd::otw_type_t _otw_type;
+};
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_one_packet_mode){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ dummy_send_xport_class dummy_send_xport(otw_type);
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+
+ //create the super send packet handler
+ uhd::transport::sph::send_packet_handler handler(1);
+ handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1));
+ handler.set_converter(otw_type);
+ handler.set_max_samples_per_packet(20);
+
+ //allocate metadata and buffer
+ std::vector<std::complex<float> > buff(20);
+ uhd::tx_metadata_t metadata;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ //generate the test data
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ metadata.start_of_burst = (i == 0);
+ metadata.end_of_burst = (i == NUM_PKTS_TO_TEST-1);
+ const size_t num_sent = handler.send(
+ &buff.front(), 10 + i%10, metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::SEND_MODE_ONE_PACKET, 1.0
+ );
+ BOOST_CHECK_EQUAL(num_sent, 10 + i%10);
+ metadata.time_spec += uhd::time_spec_t(0, num_sent, SAMP_RATE);
+ }
+
+ //check the sent packets
+ size_t num_accum_samps = 0;
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ dummy_send_xport.pop_front_packet(ifpi);
+ BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 10+i%10);
+ BOOST_CHECK(ifpi.has_tsi);
+ BOOST_CHECK(ifpi.has_tsf);
+ BOOST_CHECK_EQUAL(ifpi.tsi, 0);
+ BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE);
+ BOOST_CHECK_EQUAL(ifpi.sob, i == 0);
+ BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1);
+ num_accum_samps += ifpi.num_payload_words32;
+ }
+}
+
+////////////////////////////////////////////////////////////////////////
+BOOST_AUTO_TEST_CASE(test_sph_send_one_channel_full_buffer_mode){
+////////////////////////////////////////////////////////////////////////
+ uhd::otw_type_t otw_type;
+ otw_type.width = 16;
+ otw_type.shift = 0;
+ otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
+
+ dummy_send_xport_class dummy_send_xport(otw_type);
+
+ static const double TICK_RATE = 100e6;
+ static const double SAMP_RATE = 10e6;
+ static const size_t NUM_PKTS_TO_TEST = 30;
+
+ //create the super send packet handler
+ uhd::transport::sph::send_packet_handler handler(1);
+ handler.set_vrt_packer(&uhd::transport::vrt::if_hdr_pack_be);
+ handler.set_tick_rate(TICK_RATE);
+ handler.set_samp_rate(SAMP_RATE);
+ handler.set_xport_chan_get_buff(0, boost::bind(&dummy_send_xport_class::get_send_buff, &dummy_send_xport, _1));
+ handler.set_converter(otw_type);
+ handler.set_max_samples_per_packet(20);
+
+ //allocate metadata and buffer
+ std::vector<std::complex<float> > buff(20*NUM_PKTS_TO_TEST);
+ uhd::tx_metadata_t metadata;
+ metadata.start_of_burst = true;
+ metadata.end_of_burst = true;
+ metadata.has_time_spec = true;
+ metadata.time_spec = uhd::time_spec_t(0.0);
+
+ //generate the test data
+ const size_t num_sent = handler.send(
+ &buff.front(), buff.size(), metadata,
+ uhd::io_type_t::COMPLEX_FLOAT32,
+ uhd::device::SEND_MODE_FULL_BUFF, 1.0
+ );
+ BOOST_CHECK_EQUAL(num_sent, buff.size());
+
+ //check the sent packets
+ size_t num_accum_samps = 0;
+ uhd::transport::vrt::if_packet_info_t ifpi;
+ for (size_t i = 0; i < NUM_PKTS_TO_TEST; i++){
+ std::cout << "data check " << i << std::endl;
+ dummy_send_xport.pop_front_packet(ifpi);
+ BOOST_CHECK_EQUAL(ifpi.num_payload_words32, 20);
+ BOOST_CHECK(ifpi.has_tsi);
+ BOOST_CHECK(ifpi.has_tsf);
+ BOOST_CHECK_EQUAL(ifpi.tsi, 0);
+ BOOST_CHECK_EQUAL(ifpi.tsf, num_accum_samps*TICK_RATE/SAMP_RATE);
+ BOOST_CHECK_EQUAL(ifpi.sob, i == 0);
+ BOOST_CHECK_EQUAL(ifpi.eob, i == NUM_PKTS_TO_TEST-1);
+ num_accum_samps += ifpi.num_payload_words32;
+ }
+}