diff options
author | Josh Blum <josh@joshknows.com> | 2011-06-15 14:50:02 -0700 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2011-06-15 14:50:02 -0700 |
commit | 0cef788d3d34a298c975e32c488e800a5c65ccce (patch) | |
tree | 14b7f4242a28927b1701a73a3e8ec0df3fa04811 /host/lib/transport | |
parent | 7951170c6161d9266726ec19e8c009500cf11f75 (diff) | |
parent | 27f1622d439ceb787e7dada733d0eb82270c5532 (diff) | |
download | uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.tar.gz uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.tar.bz2 uhd-0cef788d3d34a298c975e32c488e800a5c65ccce.zip |
Merge branch 'next'
Diffstat (limited to 'host/lib/transport')
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 2 | ||||
-rwxr-xr-x | host/lib/transport/gen_vrt_if_packet.py | 44 | ||||
-rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 33 | ||||
-rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 638 | ||||
-rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 287 | ||||
-rw-r--r-- | host/lib/transport/usb_zero_copy_wrapper.cpp | 202 | ||||
-rw-r--r-- | host/lib/transport/vrt_packet_handler.hpp | 470 |
7 files changed, 1181 insertions, 495 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index 90360977a..b1821956c 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -98,5 +98,5 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_zero_copy.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/vrt_packet_handler.hpp + ${CMAKE_CURRENT_SOURCE_DIR}/usb_zero_copy_wrapper.cpp ) diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index 7440def6a..5f048d8c7 100755 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -62,6 +62,8 @@ static pred_table_type get_pred_unpack_table(void){ if(vrt_hdr_word & $hex(0x3 << 22)) table[i] |= $hex($tsi_p); if(vrt_hdr_word & $hex(0x3 << 20)) table[i] |= $hex($tsf_p); if(vrt_hdr_word & $hex(0x1 << 26)) table[i] |= $hex($tlr_p); + if(vrt_hdr_word & $hex(0x1 << 24)) table[i] |= $hex($eob_p); + if(vrt_hdr_word & $hex(0x1 << 25)) table[i] |= $hex($sob_p); } return table; } @@ -84,9 +86,11 @@ void vrt::if_hdr_pack_$(suffix)( if (if_packet_info.has_tsi) pred |= $hex($tsi_p); if (if_packet_info.has_tsf) pred |= $hex($tsf_p); if (if_packet_info.has_tlr) pred |= $hex($tlr_p); + if (if_packet_info.eob) pred |= $hex($eob_p); + if (if_packet_info.sob) pred |= $hex($sob_p); switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $num_header_words = 1 #set $flags = 0 @@ -126,6 +130,13 @@ void vrt::if_hdr_pack_$(suffix)( #else #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + #set $flags |= (0x1 << 24); + #end if + #if $pred & $sob_p + #set $flags |= (0x1 << 25); + #end if ########## Variables ########## if_packet_info.num_header_words32 = $num_header_words; if_packet_info.num_packet_words32 = $($num_header_words + $num_trailer_words) + if_packet_info.num_payload_words32; @@ -134,10 +145,6 @@ void vrt::if_hdr_pack_$(suffix)( #end for } - //set the burst flags - if (if_packet_info.sob) vrt_hdr_flags |= $hex(0x1 << 25); - if (if_packet_info.eob) vrt_hdr_flags |= $hex(0x1 << 24); - //fill in complete header word packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 | (if_packet_info.packet_type << 29) @@ -162,13 +169,11 @@ void vrt::if_hdr_unpack_$(suffix)( //extract fields from the header if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; - //if_packet_info.sob = bool(vrt_hdr_word & $hex(0x1 << 25)); //not implemented - //if_packet_info.eob = bool(vrt_hdr_word & $hex(0x1 << 24)); //not implemented const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; switch(pred){ - #for $pred in range(2**5) + #for $pred in range(2**7) case $pred: #set $has_time_spec = False #set $num_header_words = 1 @@ -215,6 +220,17 @@ void vrt::if_hdr_unpack_$(suffix)( if_packet_info.has_tlr = false; #set $num_trailer_words = 0; #end if + ########## Burst Flags ########## + #if $pred & $eob_p + if_packet_info.eob = true; + #else + if_packet_info.eob = false; + #end if + #if $pred & $sob_p + if_packet_info.sob = true; + #else + if_packet_info.sob = false; + #end if ########## Variables ########## //another failure case if (packet_words32 < $($num_header_words + $num_trailer_words)) @@ -243,9 +259,11 @@ if __name__ == '__main__': open(sys.argv[1], 'w').write(parse_tmpl( TMPL_TEXT, file=__file__, - sid_p = 0b00001, - cid_p = 0b00010, - tsi_p = 0b00100, - tsf_p = 0b01000, - tlr_p = 0b10000, + sid_p = 0b0000001, + cid_p = 0b0000010, + tsi_p = 0b0000100, + tsf_p = 0b0001000, + tlr_p = 0b0010000, + sob_p = 0b0100000, + eob_p = 0b1000000, )) diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 28bea978b..19a7a3742 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -20,7 +20,7 @@ #include <uhd/transport/bounded_buffer.hpp> #include <uhd/transport/buffer_pool.hpp> #include <uhd/utils/thread_priority.hpp> -#include <uhd/utils/log.hpp> +#include <uhd/utils/msg.hpp> #include <uhd/exception.hpp> #include <boost/function.hpp> #include <boost/foreach.hpp> @@ -44,15 +44,24 @@ static const size_t DEFAULT_XFER_SIZE = 32*512; //bytes * to ensure that they are compiled with the same calling convention as libusb. */ -//! helper function: handles all async callbacks -static void LIBUSB_CALL libusb_async_cb(libusb_transfer *lut){ +//! helper function: handles all rx async callbacks +static void LIBUSB_CALL libusb_async_rx_cb(libusb_transfer *lut){ + if(lut->actual_length == 0) { + UHD_ASSERT_THROW(libusb_submit_transfer(lut) == 0); //get out until you find some real data + return; + } + (*static_cast<boost::function<void()> *>(lut->user_data))(); +} + +//! helper function: handles all tx async callbacks +static void LIBUSB_CALL libusb_async_tx_cb(libusb_transfer *lut) { (*static_cast<boost::function<void()> *>(lut->user_data))(); } //! callback to free transfer upon cancellation static void LIBUSB_CALL cancel_transfer_cb(libusb_transfer *lut){ - if (lut->status == LIBUSB_TRANSFER_CANCELLED) libusb_free_transfer(lut); - else UHD_LOGV(rarely) << "libusb cancel_transfer unexpected status " << lut->status << std::endl; + if (lut->status == LIBUSB_TRANSFER_CANCELLED || lut->status == LIBUSB_TRANSFER_TIMED_OUT) libusb_free_transfer(lut); + else UHD_MSG(error) << "libusb cancel_transfer unexpected status " << lut->status << std::endl; } /*********************************************************************** @@ -97,7 +106,7 @@ public: void commit(size_t len){ if (_expired) return; _lut->length = len; - if(len == 0) libusb_async_cb(_lut); + if(len == 0) libusb_async_tx_cb(_lut); else UHD_ASSERT_THROW(libusb_submit_transfer(_lut) == 0); _expired = true; } @@ -157,9 +166,9 @@ public: (recv_endpoint & 0x7f) | 0x80, // endpoint static_cast<unsigned char *>(_recv_buffer_pool->at(i)), // buffer this->get_recv_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_cb), // callback + libusb_transfer_cb_fn(&libusb_async_rx_cb), // callback static_cast<void *>(&_callbacks.back()), // user_data - 0 // timeout + 0 // timeout (ms) ); _all_luts.push_back(lut); @@ -183,13 +192,13 @@ public: (send_endpoint & 0x7f) | 0x00, // endpoint static_cast<unsigned char *>(_send_buffer_pool->at(i)), // buffer this->get_send_frame_size(), // length - libusb_transfer_cb_fn(&libusb_async_cb), // callback + libusb_transfer_cb_fn(&libusb_async_tx_cb), // callback static_cast<void *>(&_callbacks.back()), // user_data 0 // timeout ); _all_luts.push_back(lut); - libusb_async_cb(lut); + libusb_async_tx_cb(lut); } //spawn the event handler threads @@ -206,7 +215,9 @@ public: BOOST_FOREACH(libusb_transfer *lut, _all_luts){ lut->callback = libusb_transfer_cb_fn(&cancel_transfer_cb); libusb_cancel_transfer(lut); - while(lut->status != LIBUSB_TRANSFER_CANCELLED && lut->status != LIBUSB_TRANSFER_COMPLETED) { + while(lut->status != LIBUSB_TRANSFER_CANCELLED + && lut->status != LIBUSB_TRANSFER_COMPLETED + && lut->status != LIBUSB_TRANSFER_TIMED_OUT) { boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } } diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp new file mode 100644 index 000000000..80ad17b6c --- /dev/null +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -0,0 +1,638 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <boost/format.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +UHD_INLINE boost::uint32_t get_context_code( + const boost::uint32_t *vrt_hdr, const vrt::if_packet_info_t &if_packet_info +){ + //extract the context word (we dont know the endianness so mirror the bytes) + boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | + uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); + return word0 & 0xff; +} + +typedef boost::function<void(void)> handle_overflow_type; +static inline void handle_overflow_nop(void){} + +/*********************************************************************** + * Alignment indexes class: + * - Access an integer set with very quick operations. + **********************************************************************/ +class alignment_indexes{ +public: + typedef boost::uint16_t index_type; //16 buffers + + alignment_indexes(void): + _indexes(0), + _sizes(256, 0), + _fronts(256, ~0) + { + //fill the O(1) look up tables for a single byte + for (size_t i = 0; i < 256; i++){ + for (size_t j = 0; j < 8; j++){ + if (i & (1 << j)){ + _sizes[i]++; + _fronts[i] = j; + } + } + } + } + + UHD_INLINE void reset(size_t len){_indexes = (1 << len) - 1;} + + UHD_INLINE size_t front(void){ + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size_t front = _fronts[(_indexes >> i) & 0xff]; + if (front != size_t(~0)) return front + i; + } + if (empty()) throw uhd::runtime_error("cannot call front() when empty"); + UHD_THROW_INVALID_CODE_PATH(); + } + + UHD_INLINE void remove(size_t index){_indexes &= ~(1 << index);} + + UHD_INLINE bool empty(void){return _indexes == 0;} + + UHD_INLINE size_t size(void){ + size_t size = 0; + //check one byte per iteration + for (size_t i = 0; i < sizeof(_indexes)*8; i+=8){ + size += _sizes[(_indexes >> i) & 0xff]; + } + return size; + } + +private: + index_type _indexes; + std::vector<size_t> _sizes; + std::vector<size_t> _fronts; +}; + +/*********************************************************************** + * Super receive packet handler + * + * A receive packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are received in unison in recv(). + **********************************************************************/ +class recv_packet_handler{ +public: + typedef boost::function<managed_recv_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(const boost::uint32_t *, vrt::if_packet_info_t &)> vrt_unpacker_type; + + /*! + * Make a new packet handler for receive + * \param size the number of transport channels + */ + recv_packet_handler(const size_t size = 1): + _queue_error_for_next_call(false), + _buffers_infos_index(0) + { + UHD_ASSERT_THROW(size <= sizeof(alignment_indexes::index_type)*8); + this->resize(size); + set_alignment_failure_threshold(1000); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + //re-initialize all buffers infos by re-creating the vector + _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt unpacker function and offset + void set_vrt_unpacker(const vrt_unpacker_type &vrt_unpacker, const size_t header_offset_words32 = 0){ + _vrt_unpacker = vrt_unpacker; + _header_offset_words32 = header_offset_words32; + } + + /*! + * Set the threshold for alignment failure. + * How many packets throw out before giving up? + * \param threshold number of packets per channel + */ + void set_alignment_failure_threshold(const size_t threshold){ + _alignment_faulure_threshold = threshold*this->size(); + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_otw_to_cpu( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + //! Set the transport channel's overflow handler + void set_overflow_handler(const size_t xport_chan, const handle_overflow_type &handle_overflow){ + _props.at(xport_chan).handle_overflow = handle_overflow; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Receive: + * The entry point for the fast-path receive calls. + * Dispatch into combinations of single packet receive calls. + ******************************************************************/ + UHD_INLINE size_t recv( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::recv_mode_t recv_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //handle metadata queued from a previous receive + if (_queue_error_for_next_call){ + _queue_error_for_next_call = false; + metadata = _queue_metadata; + //We want to allow a full buffer recv to be cut short by a timeout, + //but do not want to generate an inline timeout message packet. + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_TIMEOUT) return 0; + } + + switch(recv_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + return recv_one_packet(buffs, nsamps_per_buff, metadata, io_type, timeout); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::RECV_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t accum_num_samps = recv_one_packet( + buffs, nsamps_per_buff, metadata, io_type, timeout + ); + + //first recv had an error code set, return immediately + if (metadata.error_code != rx_metadata_t::ERROR_CODE_NONE) return accum_num_samps; + + //loop until buffer is filled or error code + while(accum_num_samps < nsamps_per_buff){ + size_t num_samps = recv_one_packet( + buffs, nsamps_per_buff - accum_num_samps, _queue_metadata, + io_type, timeout, accum_num_samps*io_type.size + ); + + //metadata had an error code set, store for next call and return + if (_queue_metadata.error_code != rx_metadata_t::ERROR_CODE_NONE){ + _queue_error_for_next_call = true; + break; + } + accum_num_samps += num_samps; + } + return accum_num_samps; + } + + default: throw uhd::value_error("unknown recv mode"); + }//switch(recv_mode) + } + +private: + + boost::mutex _mutex; + vrt_unpacker_type _vrt_unpacker; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + bool _queue_error_for_next_call; + size_t _alignment_faulure_threshold; + rx_metadata_t _queue_metadata; + struct xport_chan_props_type{ + xport_chan_props_type(void): + packet_count(0), + handle_overflow(&handle_overflow_nop) + {} + get_buff_type get_buff; + size_t packet_count; + handle_overflow_type handle_overflow; + }; + std::vector<xport_chan_props_type> _props; + std::vector<void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + + //! information stored for a received buffer + struct per_buffer_info_type{ + managed_recv_buffer::sptr buff; + const boost::uint32_t *vrt_hdr; + vrt::if_packet_info_t ifpi; + time_spec_t time; + const char *copy_buff; + }; + + //!information stored for a set of aligned buffers + struct buffers_info_type : std::vector<per_buffer_info_type> { + buffers_info_type(const size_t size): + std::vector<per_buffer_info_type>(size), + alignment_time_valid(false), + data_bytes_to_copy(0), + fragment_offset_in_samps(0) + { + indexes_to_do.reset(size); + } + alignment_indexes indexes_to_do; //used in alignment logic + time_spec_t alignment_time; //used in alignment logic + bool alignment_time_valid; //used in alignment logic + size_t data_bytes_to_copy; //keeps track of state + size_t fragment_offset_in_samps; //keeps track of state + rx_metadata_t metadata; //packet description + }; + + //! a circular queue of buffer infos + std::vector<buffers_info_type> _buffers_infos; + size_t _buffers_infos_index; + buffers_info_type &get_curr_buffer_info(void){return _buffers_infos[_buffers_infos_index];} + buffers_info_type &get_prev_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 3)%4];} + buffers_info_type &get_next_buffer_info(void){return _buffers_infos[(_buffers_infos_index + 1)%4];} + void increment_buffer_info(void){_buffers_infos_index = (_buffers_infos_index + 1)%4;} + + //! possible return options for the packet receiver + enum packet_type{ + PACKET_IF_DATA, + PACKET_TIMESTAMP_ERROR, + PACKET_INLINE_MESSAGE, + PACKET_TIMEOUT_ERROR, + PACKET_SEQUENCE_ERROR + }; + + /******************************************************************* + * Get and process a single packet from the transport: + * Receive a single packet at the given index. + * Extract all the relevant info and store. + * Check the info to determine the return code. + ******************************************************************/ + UHD_INLINE packet_type get_and_process_single_packet( + const size_t index, + buffers_info_type &prev_buffer_info, + buffers_info_type &curr_buffer_info, + double timeout + ){ + //get a single packet from the transport layer + managed_recv_buffer::sptr &buff = curr_buffer_info[index].buff; + buff = _props[index].get_buff(timeout); + if (buff.get() == NULL) return PACKET_TIMEOUT_ERROR; + + //bounds check before extract + size_t num_packet_words32 = buff->size()/sizeof(boost::uint32_t); + if (num_packet_words32 <= _header_offset_words32){ + throw std::runtime_error("recv buffer smaller than vrt packet offset"); + } + + //extract packet info + per_buffer_info_type &info = curr_buffer_info[index]; + info.ifpi.num_packet_words32 = num_packet_words32 - _header_offset_words32; + info.vrt_hdr = buff->cast<const boost::uint32_t *>() + _header_offset_words32; + _vrt_unpacker(info.vrt_hdr, info.ifpi); + info.time = time_spec_t(time_t(info.ifpi.tsi), size_t(info.ifpi.tsf), _tick_rate); //assumes has_tsi and has_tsf are true + info.copy_buff = reinterpret_cast<const char *>(info.vrt_hdr + info.ifpi.num_header_words32); + + //store the packet count for the next iteration + #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t expected_packet_count = _props[index].packet_count; + _props[index].packet_count = (info.ifpi.packet_count + 1)%16; + #endif + + //-------------------------------------------------------------- + //-- Determine return conditions: + //-- The order of these checks is HOLY. + //-------------------------------------------------------------- + + //1) check for out of order timestamps + if (info.ifpi.has_tsi and info.ifpi.has_tsf and prev_buffer_info[index].time > info.time){ + return PACKET_TIMESTAMP_ERROR; + } + + //2) check for inline IF message packets + if (info.ifpi.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA){ + return PACKET_INLINE_MESSAGE; + } + + //3) check for sequence errors + #ifndef SRPH_DONT_CHECK_SEQUENCE + if (expected_packet_count != info.ifpi.packet_count){ + return PACKET_SEQUENCE_ERROR; + } + #endif + + //4) otherwise the packet is normal! + return PACKET_IF_DATA; + } + + /******************************************************************* + * Alignment check: + * Check the received packet for alignment and mark accordingly. + ******************************************************************/ + UHD_INLINE void alignment_check( + const size_t index, buffers_info_type &info + ){ + //if alignment time was not valid or if the sequence id is newer: + // use this index's time as the alignment time + // reset the indexes list and remove this index + if (not info.alignment_time_valid or info[index].time > info.alignment_time){ + info.alignment_time_valid = true; + info.alignment_time = info[index].time; + info.indexes_to_do.reset(this->size()); + info.indexes_to_do.remove(index); + info.data_bytes_to_copy = info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t); + } + + //if the sequence id matches: + // remove this index from the list and continue + else if (info[index].time == info.alignment_time){ + info.indexes_to_do.remove(index); + } + + //if the sequence id is older: + // continue with the same index to try again + //else if (info[index].time < info.alignment_time)... + } + + /******************************************************************* + * Get aligned buffers: + * Iterate through each index and try to accumulate aligned buffers. + * Handle all of the edge cases like inline messages and errors. + * The logic will throw out older packets until it finds a match. + ******************************************************************/ + UHD_INLINE void get_aligned_buffs(double timeout){ + + increment_buffer_info(); //increment to next buffer + buffers_info_type &prev_info = get_prev_buffer_info(); + buffers_info_type &curr_info = get_curr_buffer_info(); + buffers_info_type &next_info = get_next_buffer_info(); + + //Loop until we get a message of an aligned set of buffers: + // - Receive a single packet and extract its info. + // - Handle the packet type yielded by the receive. + // - Check the timestamps for alignment conditions. + size_t iterations = 0; + while (not curr_info.indexes_to_do.empty()){ + + //get the index to process for this iteration + const size_t index = curr_info.indexes_to_do.front(); + packet_type packet; + + //receive a single packet from the transport + try{ + packet = get_and_process_single_packet( + index, prev_info, curr_info, timeout + ); + } + + //handle the case when the get packet throws + catch(const std::exception &e){ + UHD_MSG(error) << boost::format( + "The receive packet handler caught an exception.\n%s" + ) % e.what() << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_BAD_PACKET; + return; + } + + switch(packet){ + case PACKET_IF_DATA: + alignment_check(index, curr_info); + break; + + case PACKET_TIMESTAMP_ERROR: + //If the user changes the device time while streaming or without flushing, + //we can receive a packet that comes before the previous packet in time. + //This could cause the alignment logic to discard future received packets. + //Therefore, when this occurs, we reset the info to restart from scratch. + if (curr_info.alignment_time_valid and curr_info.alignment_time != curr_info[index].time){ + curr_info.alignment_time_valid = false; + } + alignment_check(index, curr_info); + break; + + case PACKET_INLINE_MESSAGE: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = next_info[index].ifpi.has_tsi and next_info[index].ifpi.has_tsf; + curr_info.metadata.time_spec = next_info[index].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::error_code_t(get_context_code(next_info[index].vrt_hdr, next_info[index].ifpi)); + if (curr_info.metadata.error_code == rx_metadata_t::ERROR_CODE_OVERFLOW) _props[index].handle_overflow(); + UHD_MSG(fastpath) << "O"; + return; + + case PACKET_TIMEOUT_ERROR: + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_TIMEOUT; + return; + + case PACKET_SEQUENCE_ERROR: + alignment_check(index, curr_info); + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = prev_info.metadata.has_time_spec; + curr_info.metadata.time_spec = prev_info.metadata.time_spec + time_spec_t(0, + prev_info[index].ifpi.num_payload_words32*sizeof(boost::uint32_t)/_bytes_per_item, _samp_rate); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_OVERFLOW; + UHD_MSG(fastpath) << "O"; + return; + + } + + //too many iterations: detect alignment failure + if (iterations++ > _alignment_faulure_threshold){ + UHD_MSG(error) << boost::format( + "The receive packet handler failed to time-align packets.\n" + "%u received packets were processed by the handler.\n" + "However, a timestamp match could not be determined.\n" + ) % iterations << std::endl; + std::swap(curr_info, next_info); //save progress from curr -> next + curr_info.metadata.has_time_spec = false; + curr_info.metadata.time_spec = time_spec_t(0.0); + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + curr_info.metadata.start_of_burst = false; + curr_info.metadata.end_of_burst = false; + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_ALIGNMENT; + return; + } + + } + + //set the metadata from the buffer information at index zero + curr_info.metadata.has_time_spec = curr_info[0].ifpi.has_tsi and curr_info[0].ifpi.has_tsf; + curr_info.metadata.time_spec = curr_info[0].time; + curr_info.metadata.more_fragments = false; + curr_info.metadata.fragment_offset = 0; + /* TODO SOB on RX not supported in hardware + static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits + curr_info.metadata.start_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_sob_flags) != 0); + */ + curr_info.metadata.start_of_burst = false; + static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits + curr_info.metadata.end_of_burst = curr_info[0].ifpi.has_tlr and (int(curr_info[0].ifpi.tlr & tlr_eob_flags) != 0); + curr_info.metadata.error_code = rx_metadata_t::ERROR_CODE_NONE; + + } + + /******************************************************************* + * Receive a single packet: + * Handles fragmentation, messages, errors, and copy-conversion. + * When no fragments are available, call the get aligned buffers. + * Then copy-convert available data into the user's IO buffers. + ******************************************************************/ + UHD_INLINE size_t recv_one_packet( + const uhd::device::recv_buffs_type &buffs, + const size_t nsamps_per_buff, + uhd::rx_metadata_t &metadata, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //get the next buffer if the current one has expired + if (get_curr_buffer_info().data_bytes_to_copy == 0){ + + //reset current buffer info members for reuse + get_curr_buffer_info().fragment_offset_in_samps = 0; + get_curr_buffer_info().alignment_time_valid = false; + get_curr_buffer_info().indexes_to_do.reset(this->size()); + + //perform receive with alignment logic + get_aligned_buffs(timeout); + } + + buffers_info_type &info = get_curr_buffer_info(); + metadata = info.metadata; + + //interpolate the time spec (useful when this is a fragment) + metadata.time_spec += time_spec_t(0, info.fragment_offset_in_samps, _samp_rate); + + //extract the number of samples available to copy + const size_t nsamps_available = info.data_bytes_to_copy/_bytes_per_item; + const size_t nsamps_to_copy = std::min(nsamps_per_buff*_io_buffs.size(), nsamps_available); + const size_t bytes_to_copy = nsamps_to_copy*_bytes_per_item; + const size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/_io_buffs.size(); + + size_t buff_index = 0; + BOOST_FOREACH(per_buffer_info_type &buff_info, info){ + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + + //copy-convert the samples from the recv buffer + _converters[io_type.tid](buff_info.copy_buff, _io_buffs, nsamps_to_copy_per_io_buff, 1/32767.); + + //update the rx copy buffer to reflect the bytes copied + buff_info.copy_buff += bytes_to_copy; + } + //update the copy buffer's availability + info.data_bytes_to_copy -= bytes_to_copy; + + //setup the fragment flags and offset + metadata.more_fragments = info.data_bytes_to_copy != 0; + metadata.fragment_offset = info.fragment_offset_in_samps; + info.fragment_offset_in_samps += nsamps_to_copy; //set for next call + + return nsamps_to_copy_per_io_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp new file mode 100644 index 000000000..8ebc264ef --- /dev/null +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -0,0 +1,287 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP +#define INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP + +#include <uhd/config.hpp> +#include <uhd/exception.hpp> +#include <uhd/convert.hpp> +#include <uhd/device.hpp> +#include <uhd/utils/byteswap.hpp> +#include <uhd/types/io_type.hpp> +#include <uhd/types/otw_type.hpp> +#include <uhd/types/metadata.hpp> +#include <uhd/transport/vrt_if_packet.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/thread/thread_time.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/foreach.hpp> +#include <boost/function.hpp> +#include <iostream> +#include <vector> + +namespace uhd{ namespace transport{ namespace sph{ + +/*********************************************************************** + * Super send packet handler + * + * A send packet handler represents a group of channels. + * The channel group shares a common sample rate. + * All channels are sent in unison in send(). + **********************************************************************/ +class send_packet_handler{ +public: + typedef boost::function<managed_send_buffer::sptr(double)> get_buff_type; + typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); + //typedef boost::function<void(boost::uint32_t *, vrt::if_packet_info_t &)> vrt_packer_type; + + /*! + * Make a new packet handler for send + * \param size the number of transport channels + */ + send_packet_handler(const size_t size = 1): + _next_packet_seq(0) + { + this->resize(size); + } + + //! Resize the number of transport channels + void resize(const size_t size){ + if (this->size() == size) return; + _props.resize(size); + static const boost::uint64_t zero = 0; + _zero_buffs.resize(size, &zero); + } + + //! Get the channel width of this handler + size_t size(void) const{ + return _props.size(); + } + + //! Setup the vrt packer function and offset + void set_vrt_packer(const vrt_packer_type &vrt_packer, const size_t header_offset_words32 = 0){ + _vrt_packer = vrt_packer; + _header_offset_words32 = header_offset_words32; + } + + //! Set the rate of ticks per second + void set_tick_rate(const double rate){ + _tick_rate = rate; + } + + //! Set the rate of samples per second + void set_samp_rate(const double rate){ + _samp_rate = rate; + } + + /*! + * Set the function to get a managed buffer. + * \param xport_chan which transport channel + * \param get_buff the getter function + */ + void set_xport_chan_get_buff(const size_t xport_chan, const get_buff_type &get_buff){ + _props.at(xport_chan).get_buff = get_buff; + } + + /*! + * Setup the conversion functions (homogeneous across transports). + * Here, we load a table of converters for all possible io types. + * This makes the converter look-up an O(1) operation. + * \param otw_type the channel data type + * \param width the streams per channel (usually 1) + */ + void set_converter(const uhd::otw_type_t &otw_type, const size_t width = 1){ + _io_buffs.resize(width); + _converters.resize(128); + for (size_t io_type = 0; io_type < _converters.size(); io_type++){ + try{ + _converters[io_type] = uhd::convert::get_converter_cpu_to_otw( + io_type_t::tid_t(io_type), otw_type, 1, width + ); + }catch(const uhd::value_error &){} //we expect this, not all io_types valid... + } + _bytes_per_item = otw_type.get_sample_size(); + } + + /*! + * Set the maximum number of samples per host packet. + * Ex: A USRP1 in dual channel mode would be half. + * \param num_samps the maximum samples in a packet + */ + void set_max_samples_per_packet(const size_t num_samps){ + _max_samples_per_packet = num_samps; + } + + //! Get a scoped lock object for this instance + boost::mutex::scoped_lock get_scoped_lock(void){ + return boost::mutex::scoped_lock(_mutex); + } + + /******************************************************************* + * Send: + * The entry point for the fast-path send calls. + * Dispatch into combinations of single packet send calls. + ******************************************************************/ + UHD_INLINE size_t send( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + const uhd::tx_metadata_t &metadata, + const uhd::io_type_t &io_type, + uhd::device::send_mode_t send_mode, + double timeout + ){ + boost::mutex::scoped_lock lock(_mutex); + + //translate the metadata to vrt if packet info + vrt::if_packet_info_t if_packet_info; + if_packet_info.has_sid = false; + if_packet_info.has_cid = false; + if_packet_info.has_tlr = false; + if_packet_info.has_tsi = metadata.has_time_spec; + if_packet_info.has_tsf = metadata.has_time_spec; + if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = metadata.start_of_burst; + if_packet_info.eob = metadata.end_of_burst; + + if (nsamps_per_buff <= _max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; + switch(send_mode){ + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_ONE_PACKET:{ + //////////////////////////////////////////////////////////////// + + //TODO remove this code when sample counts of zero are supported by hardware + if (nsamps_per_buff == 0) return send_one_packet( + _zero_buffs, 1, if_packet_info, io_type, timeout + ); + + return send_one_packet( + buffs, + std::min(nsamps_per_buff, _max_samples_per_packet), + if_packet_info, io_type, timeout + ); + } + + //////////////////////////////////////////////////////////////// + case uhd::device::SEND_MODE_FULL_BUFF:{ + //////////////////////////////////////////////////////////////// + size_t total_num_samps_sent = 0; + + //false until final fragment + if_packet_info.eob = false; + + const size_t num_fragments = (nsamps_per_buff-1)/_max_samples_per_packet; + const size_t final_length = ((nsamps_per_buff-1)%_max_samples_per_packet)+1; + + //loop through the following fragment indexes + for (size_t i = 0; i < num_fragments; i++){ + + //send a fragment with the helper function + const size_t num_samps_sent = send_one_packet( + buffs, _max_samples_per_packet, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + total_num_samps_sent += num_samps_sent; + if (num_samps_sent == 0) return total_num_samps_sent; + + //setup metadata for the next fragment + const time_spec_t time_spec = metadata.time_spec + time_spec_t(0, total_num_samps_sent, _samp_rate); + if_packet_info.tsi = boost::uint32_t(time_spec.get_full_secs()); + if_packet_info.tsf = boost::uint64_t(time_spec.get_tick_count(_tick_rate)); + if_packet_info.sob = false; + + } + + //send the final fragment with the helper function + if_packet_info.eob = metadata.end_of_burst; + return total_num_samps_sent + send_one_packet( + buffs, final_length, + if_packet_info, io_type, timeout, + total_num_samps_sent*io_type.size + ); + } + + default: throw uhd::value_error("unknown send mode"); + }//switch(send_mode) + } + +private: + + boost::mutex _mutex; + vrt_packer_type _vrt_packer; + size_t _header_offset_words32; + double _tick_rate, _samp_rate; + struct xport_chan_props_type{ + get_buff_type get_buff; + }; + std::vector<xport_chan_props_type> _props; + std::vector<const void *> _io_buffs; //used in conversion + size_t _bytes_per_item; //used in conversion + std::vector<uhd::convert::function_type> _converters; //used in conversion + size_t _max_samples_per_packet; + std::vector<const void *> _zero_buffs; + size_t _next_packet_seq; + + /******************************************************************* + * Send a single packet: + ******************************************************************/ + size_t send_one_packet( + const uhd::device::send_buffs_type &buffs, + const size_t nsamps_per_buff, + vrt::if_packet_info_t &if_packet_info, + const uhd::io_type_t &io_type, + double timeout, + const size_t buffer_offset_bytes = 0 + ){ + //load the rest of the if_packet_info in here + if_packet_info.num_payload_words32 = (nsamps_per_buff*_io_buffs.size()*_bytes_per_item)/sizeof(boost::uint32_t); + if_packet_info.packet_count = _next_packet_seq; + + size_t buff_index = 0; + BOOST_FOREACH(xport_chan_props_type &props, _props){ + managed_send_buffer::sptr buff = props.get_buff(timeout); + if (buff.get() == NULL) return 0; //timeout + + //fill a vector with pointers to the io buffers + BOOST_FOREACH(const void *&io_buff, _io_buffs){ + io_buff = reinterpret_cast<const char *>(buffs[buff_index++]) + buffer_offset_bytes; + } + boost::uint32_t *otw_mem = buff->cast<boost::uint32_t *>() + _header_offset_words32; + + //pack metadata into a vrt header + _vrt_packer(otw_mem, if_packet_info); + otw_mem += if_packet_info.num_header_words32; + + //copy-convert the samples into the send buffer + _converters[io_type.tid](_io_buffs, otw_mem, nsamps_per_buff, 32767.); + + //commit the samples to the zero-copy interface + size_t num_bytes_total = (_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); + buff->commit(num_bytes_total); + + } + _next_packet_seq++; //increment sequence after commits + return nsamps_per_buff; + } +}; + +}}} //namespace + +#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/usb_zero_copy_wrapper.cpp b/host/lib/transport/usb_zero_copy_wrapper.cpp new file mode 100644 index 000000000..227c4b392 --- /dev/null +++ b/host/lib/transport/usb_zero_copy_wrapper.cpp @@ -0,0 +1,202 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/usb_zero_copy.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/transport/buffer_pool.hpp> +#include <boost/foreach.hpp> +#include <vector> +#include <iostream> + +using namespace uhd::transport; +bool debug = true; + +static inline size_t next_boundary(size_t length, size_t boundary){ + //pad to the boundary, assumes boundary is a power of 2 + return (length + (boundary-1)) & ~(boundary-1); +} + +/*********************************************************************** + * USB zero copy wrapper - managed receive buffer + **********************************************************************/ +class usb_zero_copy_wrapper_mrb : public managed_recv_buffer{ +public: + usb_zero_copy_wrapper_mrb(bounded_buffer<usb_zero_copy_wrapper_mrb *> &queue): + _queue(queue){/*NOP*/} + + void release(void){ + if (_mrb.get() == NULL) return; + _mrb->release(); + _queue.push_with_haste(this); + _mrb.reset(); + } + + sptr get_new(managed_recv_buffer::sptr mrb, const void *mem, size_t len){ + _mrb = mrb; + _mem = mem; + _len = len; + return make_managed_buffer(this); + } + +private: + const void *get_buff(void) const{return _mem;} + size_t get_size(void) const{return _len;} + + bounded_buffer<usb_zero_copy_wrapper_mrb *> &_queue; + const void *_mem; + size_t _len; + managed_recv_buffer::sptr _mrb; +}; + +/*********************************************************************** + * USB zero copy wrapper - managed send buffer + **********************************************************************/ +class usb_zero_copy_wrapper_msb : public managed_send_buffer{ +public: + usb_zero_copy_wrapper_msb(bounded_buffer<usb_zero_copy_wrapper_msb *> &queue, size_t boundary): + _queue(queue), _boundary(boundary){/*NOP*/} + + void commit(size_t len){ + if (_msb.get() == NULL) return; + _msb->commit(next_boundary(len, _boundary)); + _queue.push_with_haste(this); + _msb.reset(); + } + + sptr get_new(managed_send_buffer::sptr msb){ + _msb = msb; + return make_managed_buffer(this); + } + +private: + void *get_buff(void) const{return _msb->cast<void *>();} + size_t get_size(void) const{return _msb->size();} + + bounded_buffer<usb_zero_copy_wrapper_msb *> &_queue; + size_t _boundary; + managed_send_buffer::sptr _msb; +}; + +/*********************************************************************** + * USB zero copy wrapper implementation + **********************************************************************/ +class usb_zero_copy_wrapper : public usb_zero_copy{ +public: + usb_zero_copy_wrapper( + sptr usb_zc, size_t usb_frame_boundary + ): + _internal_zc(usb_zc), + _usb_frame_boundary(usb_frame_boundary), + _available_recv_buffs(this->get_num_recv_frames()), + _available_send_buffs(this->get_num_send_frames()), + _mrb_pool(this->get_num_recv_frames(), usb_zero_copy_wrapper_mrb(_available_recv_buffs)), + _msb_pool(this->get_num_send_frames(), usb_zero_copy_wrapper_msb(_available_send_buffs, usb_frame_boundary)) + { + BOOST_FOREACH(usb_zero_copy_wrapper_mrb &mrb, _mrb_pool){ + _available_recv_buffs.push_with_haste(&mrb); + } + + BOOST_FOREACH(usb_zero_copy_wrapper_msb &msb, _msb_pool){ + _available_send_buffs.push_with_haste(&msb); + } + } + + managed_recv_buffer::sptr get_recv_buff(double timeout){ + //attempt to get a managed recv buffer + if (not _last_recv_buff.get()){ + _last_recv_buff = _internal_zc->get_recv_buff(timeout); + _last_recv_offset = 0; + } + + //attempt to get a wrapper for a managed recv buffer + usb_zero_copy_wrapper_mrb *wmrb = NULL; + if (_last_recv_buff.get() and _available_recv_buffs.pop_with_timed_wait(wmrb, timeout)){ + //extract this packet's memory address and length in bytes + const char *mem = _last_recv_buff->cast<const char *>() + _last_recv_offset; + const boost::uint32_t *mem32 = reinterpret_cast<const boost::uint32_t *>(mem); + size_t len = (mem32[0] & 0xffff)*sizeof(boost::uint32_t); //length in bytes (from VRT header) + + managed_recv_buffer::sptr recv_buff; //the buffer to be returned to the user + + recv_buff = wmrb->get_new(_last_recv_buff, mem, len); + _last_recv_offset = next_boundary(_last_recv_offset + len, _usb_frame_boundary); + + //check if this receive buffer has been exhausted + if (_last_recv_offset >= _last_recv_buff->size()) { + _last_recv_buff.reset(); + } + + return recv_buff; + } + + //otherwise return a null sptr for failure + return managed_recv_buffer::sptr(); + } + + size_t get_num_recv_frames(void) const{ + return _internal_zc->get_num_recv_frames(); + } + + size_t get_recv_frame_size(void) const{ + return _internal_zc->get_recv_frame_size(); + } + + managed_send_buffer::sptr get_send_buff(double timeout){ + managed_send_buffer::sptr send_buff = _internal_zc->get_send_buff(timeout); + + //attempt to get a wrapper for a managed send buffer + usb_zero_copy_wrapper_msb *wmsb = NULL; + if (send_buff.get() and _available_send_buffs.pop_with_haste(wmsb)){ + return wmsb->get_new(send_buff); + } + + //otherwise return a null sptr for failure + return managed_send_buffer::sptr(); + } + + size_t get_num_send_frames(void) const{ + return _internal_zc->get_num_send_frames(); + } + + size_t get_send_frame_size(void) const{ + return _internal_zc->get_send_frame_size(); + } + +private: + sptr _internal_zc; + size_t _usb_frame_boundary; + bounded_buffer<usb_zero_copy_wrapper_mrb *> _available_recv_buffs; + bounded_buffer<usb_zero_copy_wrapper_msb *> _available_send_buffs; + std::vector<usb_zero_copy_wrapper_mrb> _mrb_pool; + std::vector<usb_zero_copy_wrapper_msb> _msb_pool; + + //buffer to store partially-received VRT packets in + buffer_pool::sptr _fragment_mem; + + //state for last recv buffer to create multiple managed buffers + managed_recv_buffer::sptr _last_recv_buff; + size_t _last_recv_offset; +}; + +/*********************************************************************** + * USB zero copy wrapper factory function + **********************************************************************/ +usb_zero_copy::sptr usb_zero_copy::make_wrapper( + sptr usb_zc, size_t usb_frame_boundary +){ + return sptr(new usb_zero_copy_wrapper(usb_zc, usb_frame_boundary)); +} diff --git a/host/lib/transport/vrt_packet_handler.hpp b/host/lib/transport/vrt_packet_handler.hpp deleted file mode 100644 index d74b2c13c..000000000 --- a/host/lib/transport/vrt_packet_handler.hpp +++ /dev/null @@ -1,470 +0,0 @@ -// -// Copyright 2010-2011 Ettus Research LLC -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see <http://www.gnu.org/licenses/>. -// - -#ifndef INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP -#define INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP - -#include <uhd/config.hpp> -#include <uhd/device.hpp> -#include <uhd/exception.hpp> -#include <uhd/utils/byteswap.hpp> -#include <uhd/types/io_type.hpp> -#include <uhd/types/otw_type.hpp> -#include <uhd/types/metadata.hpp> -#include <uhd/transport/vrt_if_packet.hpp> -#include <uhd/convert.hpp> -#include <uhd/transport/zero_copy.hpp> -#include <boost/function.hpp> -#include <stdexcept> -#include <iostream> -#include <vector> - -namespace vrt_packet_handler{ - -//this may change in the future but its a constant for now -static const size_t OTW_BYTES_PER_SAMP = sizeof(boost::uint32_t); - -template <typename T> UHD_INLINE T get_context_code( - const boost::uint32_t *vrt_hdr, - const uhd::transport::vrt::if_packet_info_t &if_packet_info -){ - //extract the context word (we dont know the endianness so mirror the bytes) - boost::uint32_t word0 = vrt_hdr[if_packet_info.num_header_words32] | - uhd::byteswap(vrt_hdr[if_packet_info.num_header_words32]); - return T(word0 & 0xff); -} - -/*********************************************************************** - * vrt packet handler for recv - **********************************************************************/ - typedef std::vector<uhd::transport::managed_recv_buffer::sptr> managed_recv_buffs_t; - typedef boost::function<bool(managed_recv_buffs_t &)> get_recv_buffs_t; - typedef boost::function<void(size_t /*which channel*/)> handle_overflow_t; - typedef boost::function<void(const boost::uint32_t *, uhd::transport::vrt::if_packet_info_t &)> vrt_unpacker_t; - - static inline void handle_overflow_nop(size_t){} - - struct recv_state{ - //width of the receiver in channels - size_t width; - - //state variables to handle fragments - managed_recv_buffs_t managed_buffs; - std::vector<const boost::uint8_t *> copy_buffs; - size_t size_of_copy_buffs; - size_t fragment_offset_in_samps; - std::vector<void *> io_buffs; - std::vector<const void *> otw_buffs; - - recv_state(size_t width = 1): - width(width), - managed_buffs(width), - copy_buffs(width, NULL), - size_of_copy_buffs(0), - fragment_offset_in_samps(0), - io_buffs(0) //resized later - { - /* NOP */ - } - }; - - /******************************************************************* - * Unpack a received vrt header and set the copy buffer. - * - helper function for vrt_packet_handler::_recv1 - ******************************************************************/ - static UHD_INLINE void _recv1_helper( - recv_state &state, - uhd::rx_metadata_t &metadata, - double tick_rate, - const vrt_unpacker_t &vrt_unpacker, - const handle_overflow_t &handle_overflow, - size_t vrt_header_offset_words32 - ){ - //vrt unpack each managed buffer - uhd::transport::vrt::if_packet_info_t if_packet_info; - for (size_t i = 0; i < state.width; i++){ - if (state.managed_buffs[i].get() == NULL) continue; //better have a message packet coming up... - - //extract packet words and check thats its enough to move on - size_t num_packet_words32 = state.managed_buffs[i]->size()/sizeof(boost::uint32_t); - if (num_packet_words32 <= vrt_header_offset_words32){ - throw std::runtime_error("recv buffer smaller than vrt packet offset"); - } - - //unpack the vrt header into the info struct - const boost::uint32_t *vrt_hdr = state.managed_buffs[i]->cast<const boost::uint32_t *>() + vrt_header_offset_words32; - if_packet_info.num_packet_words32 = num_packet_words32 - vrt_header_offset_words32; - vrt_unpacker(vrt_hdr, if_packet_info); - - //handle the non-data packet case and parse its contents - if (if_packet_info.packet_type != uhd::transport::vrt::if_packet_info_t::PACKET_TYPE_DATA){ - - metadata.error_code = get_context_code<uhd::rx_metadata_t::error_code_t>(vrt_hdr, if_packet_info); - if (metadata.error_code == uhd::rx_metadata_t::ERROR_CODE_OVERFLOW) handle_overflow(i); - - //break to exit loop and store metadata below - state.size_of_copy_buffs = 0; break; - } - - //setup the buffer to point to the data - state.copy_buffs[i] = reinterpret_cast<const boost::uint8_t *>(vrt_hdr + if_packet_info.num_header_words32); - - //store the minimum payload length into the copy buffer length - size_t num_payload_bytes = if_packet_info.num_payload_words32*sizeof(boost::uint32_t); - if (i == 0 or state.size_of_copy_buffs > num_payload_bytes){ - state.size_of_copy_buffs = num_payload_bytes; - } - } - - //store the last vrt info into the metadata - metadata.has_time_spec = if_packet_info.has_tsi and if_packet_info.has_tsf; - metadata.time_spec = uhd::time_spec_t( - time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), tick_rate - ); - static const int tlr_sob_flags = (1 << 21) | (1 << 9); //enable and indicator bits - metadata.start_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_sob_flags) == tlr_sob_flags); - static const int tlr_eob_flags = (1 << 20) | (1 << 8); //enable and indicator bits - metadata.end_of_burst = if_packet_info.has_tlr and (int(if_packet_info.tlr & tlr_eob_flags) == tlr_eob_flags); - } - - /******************************************************************* - * Recv data, unpack a vrt header, and copy-convert the data. - * - helper function for vrt_packet_handler::recv - ******************************************************************/ - static UHD_INLINE size_t _recv1( - recv_state &state, - const uhd::device::recv_buffs_type &buffs, - size_t offset_bytes, - size_t total_samps, - uhd::rx_metadata_t &metadata, - uhd::convert::function_type &converter, - double tick_rate, - const vrt_unpacker_t &vrt_unpacker, - const get_recv_buffs_t &get_recv_buffs, - const handle_overflow_t &handle_overflow, - size_t vrt_header_offset_words32, - size_t chans_per_otw_buff - ){ - metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_NONE; - - //perform a receive if no rx data is waiting to be copied - if (state.size_of_copy_buffs == 0){ - state.fragment_offset_in_samps = 0; - if (not get_recv_buffs(state.managed_buffs)){ - metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_TIMEOUT; - return 0; - } - try{ - _recv1_helper( - state, metadata, tick_rate, - vrt_unpacker, handle_overflow, - vrt_header_offset_words32 - ); - }catch(const std::exception &e){ - state.size_of_copy_buffs = 0; //reset copy buffs size - std::cerr << "Error (recv): " << e.what() << std::endl; - metadata.error_code = uhd::rx_metadata_t::ERROR_CODE_BAD_PACKET; - return 0; - } - } - //defaults for the metadata when this is a fragment - else{ - metadata.has_time_spec = false; - metadata.start_of_burst = false; - metadata.end_of_burst = false; - } - - //extract the number of samples available to copy - size_t bytes_per_item = OTW_BYTES_PER_SAMP; - size_t nsamps_available = state.size_of_copy_buffs/bytes_per_item; - size_t nsamps_to_copy = std::min(total_samps*chans_per_otw_buff, nsamps_available); - size_t bytes_to_copy = nsamps_to_copy*bytes_per_item; - size_t nsamps_to_copy_per_io_buff = nsamps_to_copy/chans_per_otw_buff; - - for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ - - //fill a vector with pointers to the io buffers - for (size_t j = 0; j < chans_per_otw_buff; j++){ - state.io_buffs[j] = reinterpret_cast<boost::uint8_t *>(buffs[i+j]) + offset_bytes; - } - - //copy-convert the samples from the recv buffer - converter(state.copy_buffs[i], state.io_buffs, nsamps_to_copy_per_io_buff); - - //update the rx copy buffer to reflect the bytes copied - state.copy_buffs[i] += bytes_to_copy; - } - //update the copy buffer's availability - state.size_of_copy_buffs -= bytes_to_copy; - - //setup the fragment flags and offset - metadata.more_fragments = state.size_of_copy_buffs != 0; - metadata.fragment_offset = state.fragment_offset_in_samps; - state.fragment_offset_in_samps += nsamps_to_copy; //set for next call - - return nsamps_to_copy_per_io_buff; - } - - /******************************************************************* - * Recv vrt packets and copy convert the samples into the buffer. - ******************************************************************/ - static UHD_INLINE size_t recv( - recv_state &state, - const uhd::device::recv_buffs_type &buffs, - const size_t total_num_samps, - uhd::rx_metadata_t &metadata, - uhd::device::recv_mode_t recv_mode, - const uhd::io_type_t &io_type, - const uhd::otw_type_t &otw_type, - double tick_rate, - const vrt_unpacker_t &vrt_unpacker, - const get_recv_buffs_t &get_recv_buffs, - const handle_overflow_t &handle_overflow = &handle_overflow_nop, - size_t vrt_header_offset_words32 = 0, - size_t chans_per_otw_buff = 1 - ){ - state.io_buffs.resize(chans_per_otw_buff); - - uhd::convert::function_type converter( - uhd::convert::get_converter_otw_to_cpu( - io_type, otw_type, 1, chans_per_otw_buff - )); - - switch(recv_mode){ - - //////////////////////////////////////////////////////////////// - case uhd::device::RECV_MODE_ONE_PACKET:{ - //////////////////////////////////////////////////////////////// - return _recv1( - state, - buffs, 0, - total_num_samps, - metadata, - converter, - tick_rate, - vrt_unpacker, - get_recv_buffs, - handle_overflow, - vrt_header_offset_words32, - chans_per_otw_buff - ); - } - - //////////////////////////////////////////////////////////////// - case uhd::device::RECV_MODE_FULL_BUFF:{ - //////////////////////////////////////////////////////////////// - size_t accum_num_samps = 0; - uhd::rx_metadata_t tmp_md; - while(accum_num_samps < total_num_samps){ - size_t num_samps = _recv1( - state, - buffs, accum_num_samps*io_type.size, - total_num_samps - accum_num_samps, - (accum_num_samps == 0)? metadata : tmp_md, //only the first metadata gets kept - converter, - tick_rate, - vrt_unpacker, - get_recv_buffs, - handle_overflow, - vrt_header_offset_words32, - chans_per_otw_buff - ); - if (num_samps == 0) break; //had a recv timeout or error, break loop - accum_num_samps += num_samps; - } - return accum_num_samps; - } - - default: throw std::runtime_error("unknown recv mode"); - }//switch(recv_mode) - } - -/*********************************************************************** - * vrt packet handler for send - **********************************************************************/ - typedef std::vector<uhd::transport::managed_send_buffer::sptr> managed_send_buffs_t; - typedef boost::function<bool(managed_send_buffs_t &)> get_send_buffs_t; - typedef boost::function<void(boost::uint32_t *, uhd::transport::vrt::if_packet_info_t &)> vrt_packer_t; - - static const boost::uint64_t zeros = 0; - - struct send_state{ - //init the expected seq number - size_t next_packet_seq; - managed_send_buffs_t managed_buffs; - std::vector<const void *> zero_buffs; - std::vector<const void *> io_buffs; - - send_state(size_t width = 1): - next_packet_seq(0), - managed_buffs(width), - zero_buffs(width, &zeros), - io_buffs(0) //resized later - { - /* NOP */ - } - }; - - /******************************************************************* - * Pack a vrt header, copy-convert the data, and send it. - * - helper function for vrt_packet_handler::send - ******************************************************************/ - static UHD_INLINE size_t _send1( - send_state &state, - const uhd::device::send_buffs_type &buffs, - const size_t offset_bytes, - const size_t num_samps, - uhd::transport::vrt::if_packet_info_t &if_packet_info, - uhd::convert::function_type &converter, - const vrt_packer_t &vrt_packer, - const get_send_buffs_t &get_send_buffs, - const size_t vrt_header_offset_words32, - const size_t chans_per_otw_buff - ){ - //load the rest of the if_packet_info in here - if_packet_info.num_payload_words32 = (num_samps*chans_per_otw_buff*OTW_BYTES_PER_SAMP)/sizeof(boost::uint32_t); - if_packet_info.packet_count = state.next_packet_seq; - - //get send buffers for each otw channel - if (not get_send_buffs(state.managed_buffs)) return 0; - - for (size_t i = 0; i < buffs.size(); i+=chans_per_otw_buff){ - //calculate pointers with offsets to io and otw memory - for (size_t j = 0; j < chans_per_otw_buff; j++){ - state.io_buffs[j] = reinterpret_cast<const boost::uint8_t *>(buffs[i+j]) + offset_bytes; - } - boost::uint32_t *otw_mem = state.managed_buffs[i]->cast<boost::uint32_t *>() + vrt_header_offset_words32; - - //pack metadata into a vrt header - vrt_packer(otw_mem, if_packet_info); - otw_mem += if_packet_info.num_header_words32; - - //copy-convert the samples into the send buffer - converter(state.io_buffs, otw_mem, num_samps); - - //commit the samples to the zero-copy interface - size_t num_bytes_total = (vrt_header_offset_words32+if_packet_info.num_packet_words32)*sizeof(boost::uint32_t); - state.managed_buffs[i]->commit(num_bytes_total); - } - state.next_packet_seq++; //increment sequence after commits - return num_samps; - } - - /******************************************************************* - * Send vrt packets and copy convert the samples into the buffer. - ******************************************************************/ - static UHD_INLINE size_t send( - send_state &state, - const uhd::device::send_buffs_type &buffs, - const size_t total_num_samps, - const uhd::tx_metadata_t &metadata, - uhd::device::send_mode_t send_mode, - const uhd::io_type_t &io_type, - const uhd::otw_type_t &otw_type, - double tick_rate, - const vrt_packer_t &vrt_packer, - const get_send_buffs_t &get_send_buffs, - size_t max_samples_per_packet, - size_t vrt_header_offset_words32 = 0, - size_t chans_per_otw_buff = 1 - ){ - state.io_buffs.resize(chans_per_otw_buff); - - uhd::convert::function_type converter( - uhd::convert::get_converter_cpu_to_otw( - io_type, otw_type, chans_per_otw_buff, 1 - )); - - //translate the metadata to vrt if packet info - uhd::transport::vrt::if_packet_info_t if_packet_info; - if_packet_info.has_sid = false; - if_packet_info.has_cid = false; - if_packet_info.has_tlr = false; - if_packet_info.tsi = boost::uint32_t(metadata.time_spec.get_full_secs()); - if_packet_info.tsf = boost::uint64_t(metadata.time_spec.get_tick_count(tick_rate)); - - if (total_num_samps <= max_samples_per_packet) send_mode = uhd::device::SEND_MODE_ONE_PACKET; - switch(send_mode){ - - //////////////////////////////////////////////////////////////// - case uhd::device::SEND_MODE_ONE_PACKET:{ - //////////////////////////////////////////////////////////////// - - //fill in parts of the packet info overwrote in full buff mode - if_packet_info.has_tsi = metadata.has_time_spec; - if_packet_info.has_tsf = metadata.has_time_spec; - if_packet_info.sob = metadata.start_of_burst; - if_packet_info.eob = metadata.end_of_burst; - - return _send1( - state, - //TODO remove this code when sample counts of zero are supported by hardware - (total_num_samps)?buffs : state.zero_buffs, 0, - std::max<size_t>(1, std::min(total_num_samps, max_samples_per_packet)), - if_packet_info, - converter, - vrt_packer, - get_send_buffs, - vrt_header_offset_words32, - chans_per_otw_buff - ); - } - - //////////////////////////////////////////////////////////////// - case uhd::device::SEND_MODE_FULL_BUFF:{ - //////////////////////////////////////////////////////////////// - size_t total_num_samps_sent = 0; - - //loop through the following fragment indexes - while(total_num_samps_sent < total_num_samps){ - - //calculate per-loop-iteration variables - const size_t total_num_samps_unsent = total_num_samps - total_num_samps_sent; - const bool first_fragment = (total_num_samps_sent == 0); - const bool final_fragment = (total_num_samps_unsent <= max_samples_per_packet); - - //calculate new flags for the fragments - if_packet_info.has_tsi = metadata.has_time_spec and first_fragment; - if_packet_info.has_tsf = if_packet_info.has_tsi; - if_packet_info.sob = metadata.start_of_burst and first_fragment; - if_packet_info.eob = metadata.end_of_burst and final_fragment; - - //send the fragment with the helper function - const size_t num_samps_sent = _send1( - state, - buffs, total_num_samps_sent*io_type.size, - std::min(total_num_samps_unsent, max_samples_per_packet), - if_packet_info, - converter, - vrt_packer, - get_send_buffs, - vrt_header_offset_words32, - chans_per_otw_buff - ); - total_num_samps_sent += num_samps_sent; - if (num_samps_sent == 0) return total_num_samps_sent; - } - return total_num_samps_sent; - } - - default: throw std::runtime_error("unknown send mode"); - }//switch(send_mode) - } - -} //namespace vrt_packet_handler - -#endif /* INCLUDED_LIBUHD_TRANSPORT_VRT_PACKET_HANDLER_HPP */ |