diff options
author | Josh Blum <josh@joshknows.com> | 2011-06-12 19:46:06 -0700 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2011-06-14 17:27:46 -0700 |
commit | c65a1f8e40960d5016788de2cc4775c9e603293a (patch) | |
tree | 24a404212d7ee4aef3b34ac2b0df9b1e92841162 /host/lib/transport | |
parent | 85ebb705fa567e8093aa68c0ad88996d434ed2bf (diff) | |
download | uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.tar.gz uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.tar.bz2 uhd-c65a1f8e40960d5016788de2cc4775c9e603293a.zip |
uhd: supper packet handler support squashed
Diffstat (limited to 'host/lib/transport')
-rwxr-xr-x | host/lib/transport/gen_vrt_if_packet.py | 44 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 637 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 287 |
3 files changed, 955 insertions, 13 deletions
diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index 7440def6a..5f048d8c7 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -62,6 +62,8 @@ static pred_table_type get_pred_unpack_table(void){ if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p); if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p); if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); + if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p); + if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p); } return table; } @@ -84,9 +86,11 @@ void vrt::if_hdr_pack_$(suffix)( if (if_packet_info.has_tsi) pred |= $hex($tsi_p); if (if_packet_info.has_tsf) pred |= $hex($tsf_p); if (if_packet_info.has_tlr) pred |= $hex($tlr_p); + if (if_packet_info.eob) pred |= $hex($eob_p); + if (if_packet_info.sob) pred |= $hex($sob_p); switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $num_header_words = 1 #set $flags = 0 @@ -126,6 +130,13 @@ void vrt::if_hdr_pack_$(suffix)( #else #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + #set $flags |= (0x1 << 24); + #end if + #if $pred & $sob_p + #set $flags |= (0x1 << 25); + #end if ########## Variables ########## if_packet_info.num_header_words32 = $num_header_words; if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32; @@ -134,10 +145,6 @@ void vrt::if_hdr_pack_$(suffix)( #end for } - //set the burst flags - if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25); - if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24); - //fill in complete header word packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 | (if_packet_info.packet_type << 29) @@ -162,13 +169,11 @@ void vrt::if_hdr_unpack_$(suffix)( //extract fields from the header if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; - //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented - //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $has_time_spec = False #set $num_header_words = 1 @@ -215,6 +220,17 @@ void vrt::if_hdr_unpack_$(suffix)( if_packet_info.has_tlr = false; #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + if_packet_info.eob = true; + #else + if_packet_info.eob = false; + #end if + #if $pred & $sob_p + if_packet_info.sob = true; + #else + if_packet_info.sob = false; + #end if ########## Variables ########## //another failure case if (packet_words32 < $($num_header_words + $num_trailer_words)) @@ -243,9 +259,11 @@ if __name__ == '__main__': open(sys.argv[1], 'w').write(parse_tmpl( TMPL_TEXT, file=__file__, - sid_p = 0b00001, - cid_p = 0b00010, - tsi_p = 0b00100, - tsf_p = 0b01000, - tlr_p = 0b10000, + sid_p = 0b0000001, + cid_p = 0b0000010, + tsi_p = 0b0000100, + tsf_p = 0b0001000, + tlr_p = 0b0010000, + sob_p = 0b0100000, + eob_p = 0b1000000, )) diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp new file mode 100644 index 000000000..78d8dfce8 --- /dev/null +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -0,0 +1,637 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <boost/format.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +UHD_INLINE boost::uint32_t get_context_code( + const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info +){ + //extract the context word (we dont know the endianness so mirror the bytes) + boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | + uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); + return word0 & 0xff; +} + +typedef boost::function<void(void)> handle_overflow_type; +static inline void handle_overflow_nop(void){} + +/*********************************************************************** + * Alignment indexes class: + * - Access an integer set with very quick operations. + **********************************************************************/ +class alignment_indexes{ +public: + typedef boost::uint16_t index_type; //16 buffers + + alignment_indexes(void): + _indexes(0), + _sizes(256, 0), + _fronts(256, ~0) + { + //fill the O(1) look up tables for a single byte + for (size_t i = 0; i < 256; i++){ + for (size_t j = 0; j < 8; j++){ + if (i & (1 << j)){ + _sizes[i]++; + _fronts[i] = j; + } + } + } + } + + UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;} + + UHD_INLINE size_t front(void){ + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size_t front = _fronts[(_indexes >> i) & 0xff]; + if (front != size_t(~0)) return front + i; + } + if (empty()) throw uhd::runtime_error("cannot call front() when empty"); + UHD_THROW_INVALID_CODE_PATH(); + } + + UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);} + + UHD_INLINE bool empty(void){return _indexes == 0;} + + UHD_INLINE size_t size(void){ + size_t size = 0; + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size += _sizes[(_indexes >> i) & 0xff]; + } + return size; + } + +private: + index_type _indexes; + std::vector<size_t> _sizes; + std::vector<size_t> _fronts; +}; + +/*********************************************************************** + * Super receive packet handler + * + * A receive packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are received in unison in recv(). + **********************************************************************/ +class recv_packet_handler{ +public: + typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; + + /*! + * Make a new packet handler for receive + * \param size the number of transport channels + */ + recv_packet_handler(const size_t size = 1): + _queue_error_for_next_call(false), + _buffers_infos_index(0) + { + UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8); + this->resize(size); + set_alignment_failure_threshold(1000); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + _buffers_infos.resize(4, buffers_info_type(size)); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt unpacker function and offset + void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){ + _vrt_unpacker = vrt_unpacker; + _header_offset_words32 = header_offset_words32; + } + + /*! + * Set the threshold for alignment failure. + * How many packets throw out before giving up? + * \param threshold number of packets per channel + */ + void set_alignment_failure_threshold(const size_t threshold){ + _alignment_faulure_threshold = threshold*this->size(); + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_otw_to_cpu( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + //! Set the transport channel's overflow handler + void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){ + _props.at(xport_chan).handle_overflow = handle_overflow; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Receive: + * The entry point for the fast-path receive calls. + * Dispatch into combinations of single packet receive calls. + ******************************************************************/ + UHD_INLINE size_t recv( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::recv_mode_t recv_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //handle metadata queued from a previous receive + if (_queue_error_for_next_call){ + _queue_error_for_next_call = false; + metadata = _queue_metadata; + //We want to allow a full buffer recv to be cut short by a timeout, + //but do not want to generate an inline timeout message packet. + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0; + } + + switch(recv_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + return recv_one_packet(buffs, nsamps_per_buff, metadata, io_type, timeout); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t accum_num_samps = recv_one_packet( + buffs, nsamps_per_buff, metadata, io_type, timeout + ); + + //first recv had an error code set, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + + //loop until buffer is filled or error code + while(accum_num_samps < nsamps_per_buff){ + size_t num_samps = recv_one_packet( + buffs, nsamps_per_buff - accum_num_samps, _queue_metadata, + io_type, timeout, accum_num_samps*io_type.size + ); + + //metadata had an error code set, store for next call and return + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){ + _queue_error_for_next_call = true; + break; + } + accum_num_samps += num_samps; + } + return accum_num_samps; + } + + default: throw uhd::value_error("unknown recv mode"); + }//switch(recv_mode) + } + +private: + + boost::mutex _mutex; + vrt_unpacker_type _vrt_unpacker; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + bool _queue_error_for_next_call; + size_t _alignment_faulure_threshold; + rx_metadata_t _queue_metadata; + struct xport_chan_props_type{ + xport_chan_props_type(void): + packet_count(0), + handle_overflow(&handle_overflow_nop) + {} + get_buff_type get_buff; + size_t packet_count; + handle_overflow_type handle_overflow; + }; + std::vector<xport_chan_props_type> _props; + std::vector<void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + + //! information stored for a received buffer + struct per_buffer_info_type{ + managed_recv_buffer::sptr buff; + const boost::uint32_t *vrt_hdr; + vrt::if_packet_info_t ifpi; + time_spec_t time; + const char *copy_buff; + }; + + //!information stored for a set of aligned buffers + struct buffers_info_type : std::vector<per_buffer_info_type> { + buffers_info_type(const size_t size): + std::vector<per_buffer_info_type>(size), + alignment_time_valid(false), + data_bytes_to_copy(0), + fragment_offset_in_samps(0) + { + indexes_to_do.reset(size); + } + alignment_indexes indexes_to_do; //used in alignment logic + time_spec_t alignment_time; //used in alignment logic + bool alignment_time_valid; //used in alignment logic + size_t data_bytes_to_copy; //keeps track of state + size_t fragment_offset_in_samps; //keeps track of state + rx_metadata_t metadata; //packet description + }; + + //! a circular queue of buffer infos + std::vector<buffers_info_type> _buffers_infos; + size_t _buffers_infos_index; + buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];} + buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];} + buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];} + void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;} + + //! possible return options for the packet receiver + enum packet_type{ + PACKET_IF_DATA, + PACKET_TIMESTAMP_ERROR, + PACKET_INLINE_MESSAGE, + PACKET_TIMEOUT_ERROR, + PACKET_SEQUENCE_ERROR + }; + + /******************************************************************* + * Get and process a single packet from the transport: + * Receive a single packet at the given index. + * Extract all the relevant info and store. + * Check the info to determine the return code. + ******************************************************************/ + UHD_INLINE packet_type get_and_process_single_packet( + const size_t index, + buffers_info_type &prev_buffer_info, + buffers_info_type &curr_buffer_info, + double timeout + ){ + //get a single packet from the transport layer + managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + per_buffer_info_type &info = curr_buffer_info[index]; + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //store the packet count for the next iteration + #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t expected_packet_count = _props[index].packet_count; + _props[index].packet_count = (info.ifpi.packet_count + 1)%16; + #endif + + //-------------------------------------------------------------- + //-- Determine return conditions: + //-- The order of these checks is HOLY. + //-------------------------------------------------------------- + + //1) check for out of order timestamps + if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + return PACKET_TIMESTAMP_ERROR; + } + + //2) check for inline IF message packets + if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ + return PACKET_INLINE_MESSAGE; + } + + //3) check for sequence errors + #ifndef SRPH_DONT_CHECK_SEQUENCE + if (expected_packet_count != info.ifpi.packet_count){ + return PACKET_SEQUENCE_ERROR; + } + #endif + + //4) otherwise the packet is normal! + return PACKET_IF_DATA; + } + + /******************************************************************* + * Alignment check: + * Check the received packet for alignment and mark accordingly. + ******************************************************************/ + UHD_INLINE void alignment_check( + const size_t index, buffers_info_type &info + ){ + //if alignment time was not valid or if the sequence id is newer: + // use this index's time as the alignment time + // reset the indexes list and remove this index + if (not info.alignment_time_valid or info[index].time > info.alignment_time){ + info.alignment_time_valid = true; + info.alignment_time = info[index].time; + info.indexes_to_do.reset(this->size()); + info.indexes_to_do.remove(index); + info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t); + } + + //if the sequence id matches: + // remove this index from the list and continue + else if (info[index].time == info.alignment_time){ + info.indexes_to_do.remove(index); + } + + //if the sequence id is older: + // continue with the same index to try again + //else if (info[index].time < info.alignment_time)... + } + + /******************************************************************* + * Get aligned buffers: + * Iterate through each index and try to accumulate aligned buffers. + * Handle all of the edge cases like inline messages and errors. + * The logic will throw out older packets until it finds a match. + ******************************************************************/ + UHD_INLINE void get_aligned_buffs(double timeout){ + + increment_buffer_info(); //increment to next buffer + buffers_info_type &prev_info = get_prev_buffer_info(); + buffers_info_type &curr_info = get_curr_buffer_info(); + buffers_info_type &next_info = get_next_buffer_info(); + + //Loop until we get a message of an aligned set of buffers: + // - Receive a single packet and extract its info. + // - Handle the packet type yielded by the receive. + // - Check the timestamps for alignment conditions. + size_t iterations = 0; + while (not curr_info.indexes_to_do.empty()){ + + //get the index to process for this iteration + const size_t index = curr_info.indexes_to_do.front(); + packet_type packet; + + //receive a single packet from the transport + try{ + packet = get_and_process_single_packet( + index, prev_info, curr_info, timeout + ); + } + + //handle the case when the get packet throws + catch(const std::exception &e){ + UHD_MSG(error) << boost::format( + "The receive packet handler caught an exception.\n%s" + ) % e.what() << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return; + } + + switch(packet){ + case PACKET_IF_DATA: + alignment_check(index, curr_info); + break; + + case PACKET_TIMESTAMP_ERROR: + //If the user changes the device time while streaming or without flushing, + //we can receive a packet that comes before the previous packet in time. + //This could cause the alignment logic to discard future received packets. + //Therefore, when this occurs, we reset the info to restart from scratch. + if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){ + curr_info.alignment_time_valid = false; + } + alignment_check(index, curr_info); + break; + + case PACKET_INLINE_MESSAGE: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsi and next_info[index].ifpi.has_tsf; + curr_info.metadata.time_spec = next_info[index].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); + if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow(); + UHD_MSG(fastpath) << "O"; + return; + + case PACKET_TIMEOUT_ERROR: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return; + + case PACKET_SEQUENCE_ERROR: + alignment_check(index, curr_info); + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; + curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t( + 0.0, prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_item, _samp_rate); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + UHD_MSG(fastpath) << "O"; + return; + + } + + //too many iterations: detect alignment failure + if (iterations++ > _alignment_faulure_threshold){ + UHD_MSG(error) << boost::format( + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return; + } + + } + + //set the metadata from the buffer information at index zero + curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsi and curr_info[0].ifpi.has_tsf; + curr_info.metadata.time_spec = curr_info[0].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + /* TODO SOB on RX not supported in hardware + static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits + curr_info.metadata.start_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_sob_flags) != 0); + */ + curr_info.metadata.start_of_burst = false; + static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits + curr_info.metadata.end_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_eob_flags) != 0); + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + } + + /******************************************************************* + * Receive a single packet: + * Handles fragmentation, messages, errors, and copy-conversion. + * When no fragments are available, call the get aligned buffers. + * Then copy-convert available data into the user's IO buffers. + ******************************************************************/ + UHD_INLINE size_t recv_one_packet( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //get the next buffer if the current one has expired + if (get_curr_buffer_info().data_bytes_to_copy == 0){ + + //reset current buffer info members for reuse + get_curr_buffer_info().fragment_offset_in_samps = 0; + get_curr_buffer_info().alignment_time_valid = false; + get_curr_buffer_info().indexes_to_do.reset(this->size()); + + //perform receive with alignment logic + get_aligned_buffs(timeout); + } + + buffers_info_type &info = get_curr_buffer_info(); + metadata = info.metadata; + + //interpolate the time spec (useful when this is a fragment) + metadata.time_spec += time_spec_t(0, info.fragment_offset_in_samps, _samp_rate); + + //extract the number of samples available to copy + const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_item; + const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); + const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_item; + const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); + + size_t buff_index = 0; + BOOST_FOREACH(per_buffer_info_type &buff_info, info){ + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + + //copy-convert the samples from the recv buffer + _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.); + + //update the rx copy buffer to reflect the bytes copied + buff_info.copy_buff += bytes_to_copy; + } + //update the copy buffer's availability + info.data_bytes_to_copy -= bytes_to_copy; + + //setup the fragment flags and offset + metadata.more_fragments = info.data_bytes_to_copy != 0; + metadata.fragment_offset = info.fragment_offset_in_samps; + info.fragment_offset_in_samps += nsamps_to_copy; //set for next call + + return nsamps_to_copy_per_io_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp new file mode 100644 index 000000000..99d445180 --- /dev/null +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -0,0 +1,287 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +/*********************************************************************** + * Super send packet handler + * + * A send packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are sent in unison in send(). + **********************************************************************/ +class send_packet_handler{ +public: + typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; + + /*! + * Make a new packet handler for send + * \param size the number of transport channels + */ + send_packet_handler(const size_t size = 1): + _next_packet_seq(0) + { + this->resize(size); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + static const boost::uint64_t zero = 0; + _zero_buffs.resize(size, &zero); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt packer function and offset + void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){ + _vrt_packer = vrt_packer; + _header_offset_words32 = header_offset_words32; + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_cpu_to_otw( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &e){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + /*! + * Set the maximum number of samples per host packet. + * Ex: A USRP1 in dual channel mode would be half. + * \param num_samps the maximum samples in a packet + */ + void set_max_samples_per_packet(const size_t num_samps){ + _max_samples_per_packet = num_samps; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Send: + * The entry point for the fast-path send calls. + * Dispatch into combinations of single packet send calls. + ******************************************************************/ + UHD_INLINE size_t send( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::send_mode_t send_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //translate the metadata to vrt if packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.has_sid = false; + if_packet_info.has_cid = false; + if_packet_info.has_tlr = false; + if_packet_info.has_tsi = metadata.has_time_spec; + if_packet_info.has_tsf = metadata.has_time_spec; + if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = metadata.start_of_burst; + if_packet_info.eob = metadata.end_of_burst; + + if (nsamps_per_buff <= _max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; + switch(send_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + + //TODO remove this code when sample counts of zero are supported by hardware + if (nsamps_per_buff == 0) return send_one_packet( + _zero_buffs, 1, if_packet_info, io_type, timeout + ); + + return send_one_packet( + buffs, + std::min(nsamps_per_buff, _max_samples_per_packet), + if_packet_info, io_type, timeout + ); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t total_num_samps_sent = 0; + + //false until final fragment + if_packet_info.eob = false; + + const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet; + const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1; + + //loop through the following fragment indexes + for (size_t i = 0; i < num_fragments; i++){ + + //send a fragment with the helper function + const size_t num_samps_sent = send_one_packet( + buffs, _max_samples_per_packet, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; + + //setup metadata for the next fragment + const time_spec_t time_spec = metadata.time_spec + time_spec_t(0, total_num_samps_sent, _samp_rate); + if_packet_info.tsi = boost::uint32_t(time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = false; + + } + + //send the final fragment with the helper function + if_packet_info.eob = metadata.end_of_burst; + return total_num_samps_sent + send_one_packet( + buffs, final_length, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + } + + default: throw uhd::value_error("unknown send mode"); + }//switch(send_mode) + } + +private: + + boost::mutex _mutex; + vrt_packer_type _vrt_packer; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + struct xport_chan_props_type{ + get_buff_type get_buff; + }; + std::vector<xport_chan_props_type> _props; + std::vector<const void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + size_t _max_samples_per_packet; + std::vector<const void *> _zero_buffs; + size_t _next_packet_seq; + + /******************************************************************* + * Send a single packet: + ******************************************************************/ + size_t send_one_packet( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + vrt::if_packet_info_t &if_packet_info, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //load the rest of the if_packet_info in here + if_packet_info.num_payload_words32 = (nsamps_per_buff*_io_buffs.size()*_bytes_per_item)/sizeof(boost::uint32_t); + if_packet_info.packet_count = _next_packet_seq; + + size_t buff_index = 0; + BOOST_FOREACH(xport_chan_props_type &props, _props){ + managed_send_buffer::sptr buff = props.get_buff(timeout); + if (buff.get() == NULL) return 0; //timeout + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(const void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; + + //pack metadata into a vrt header + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //copy-convert the samples into the send buffer + _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.); + + //commit the samples to the zero-copy interface + size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); + buff->commit(num_bytes_total); + + } + _next_packet_seq++; //increment sequence after commits + return nsamps_per_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ |