From c464a63e87e32bea2b4c430ff29b0b5e0829893a Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Mon, 15 Jul 2013 15:44:42 -0700 Subject: uhd: added new calls to streamer object + support work * The transmit streamer gives access to the async msg queue. * The receive streamer gives access to the issue stream cmd. * Supporting usrp implementation files updated. * Example applications updated to use this API. --- host/lib/transport/gen_vrt_if_packet.py | 140 ++++++++++++++++++++--- host/lib/transport/super_recv_packet_handler.hpp | 50 +++++++- host/lib/transport/super_send_packet_handler.hpp | 41 ++++++- 3 files changed, 212 insertions(+), 19 deletions(-) (limited to 'host/lib/transport') diff --git a/host/lib/transport/gen_vrt_if_packet.py b/host/lib/transport/gen_vrt_if_packet.py index e28ce3aae..56e7c61bf 100644 --- a/host/lib/transport/gen_vrt_if_packet.py +++ b/host/lib/transport/gen_vrt_if_packet.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2010-2011 Ettus Research LLC +# Copyright 2010-2013 Ettus Research LLC # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by @@ -48,12 +48,14 @@ TMPL_TEXT = """ using namespace uhd; using namespace uhd::transport; +using namespace uhd::transport::vrt; typedef size_t pred_type; typedef std::vector pred_table_type; #define pred_table_index(hdr) ((hdr >> 20) & 0x1ff) -static pred_table_type get_pred_unpack_table(void){ +static pred_table_type get_pred_unpack_table(void) +{ pred_table_type table(1 << 9, 0); //only 9 bits useful here (20-28) for (size_t i = 0; i < table.size(); i++){ boost::uint32_t vrt_hdr_word = i << 20; @@ -74,13 +76,43 @@ static const pred_table_type pred_unpack_table(get_pred_unpack_table()); //maps num empty bytes to trailer bits static const size_t occ_table[] = {0, 2, 1, 3}; +const boost::uint32_t VRLP = ('V' << 24) | ('R' << 16) | ('L' << 8) | ('P' << 0); +const boost::uint32_t VEND = ('V' << 24) | ('E' << 16) | ('N' << 8) | ('D' << 0); + +UHD_INLINE static boost::uint32_t chdr_to_vrt(const boost::uint32_t chdr, size_t &packet_count) +{ + boost::uint32_t vrt = chdr & 0xffff; //words32 + packet_count = (chdr >> 16) & 0xfff; + vrt |= ((chdr >> 31) & 0x1) << 30; //context packet + vrt |= ((chdr >> 30) & 0x1) << 26; //has tlr + vrt |= ((chdr >> 29) & 0x1) << 20; //has tsf + vrt |= ((chdr >> 28) & 0x1) << 24; //has eob + vrt |= (0x1) << 28; //has sid (always) + return vrt; +} + +UHD_INLINE static boost::uint32_t vrt_to_chdr(const boost::uint32_t vrt, const size_t packet_count) +{ + boost::uint32_t chdr = vrt & 0xffff; //words32 + chdr |= (packet_count & 0xfff) << 16; + chdr |= ((vrt >> 30) & 0x1) << 31; //context packet + chdr |= ((vrt >> 26) & 0x1) << 30; //has tlr + chdr |= ((vrt >> 20) & 0x1) << 29; //has tsf + chdr |= ((vrt >> 24) & 0x1) << 28; //has eob + return chdr; +} + ######################################################################## #def gen_code($XE_MACRO, $suffix) ######################################################################## -void vrt::if_hdr_pack_$(suffix)( +/*********************************************************************** + * interal impl of packing VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_pack_$(suffix)( boost::uint32_t *packet_buff, - if_packet_info_t &if_packet_info + if_packet_info_t &if_packet_info, + boost::uint32_t &vrt_hdr_word32 ){ boost::uint32_t vrt_hdr_flags = 0; @@ -154,31 +186,33 @@ void vrt::if_hdr_pack_$(suffix)( } //fill in complete header word - packet_buff[0] = $(XE_MACRO)(boost::uint32_t(0 + vrt_hdr_word32 = boost::uint32_t(0 | (if_packet_info.packet_type << 29) | vrt_hdr_flags | ((if_packet_info.packet_count & 0xf) << 16) | (if_packet_info.num_packet_words32 & 0xffff) - )); + ); } -void vrt::if_hdr_unpack_$(suffix)( +/*********************************************************************** + * interal impl of unpacking VRT IF header only + **********************************************************************/ +UHD_INLINE void __if_hdr_unpack_$(suffix)( const boost::uint32_t *packet_buff, - if_packet_info_t &if_packet_info + if_packet_info_t &if_packet_info, + const boost::uint32_t vrt_hdr_word32 ){ - //extract vrt header - boost::uint32_t vrt_hdr_word = $(XE_MACRO)(packet_buff[0]); - size_t packet_words32 = vrt_hdr_word & 0xffff; + const size_t packet_words32 = vrt_hdr_word32 & 0xffff; //failure case if (if_packet_info.num_packet_words32 < packet_words32) throw uhd::value_error("bad vrt header or packet fragment"); //extract fields from the header - if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word >> 29); - if_packet_info.packet_count = (vrt_hdr_word >> 16) & 0xf; + if_packet_info.packet_type = if_packet_info_t::packet_type_t(vrt_hdr_word32 >> 29); + if_packet_info.packet_count = (vrt_hdr_word32 >> 16) & 0xf; - const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word)]; + const pred_type pred = pred_unpack_table[pred_table_index(vrt_hdr_word32)]; size_t empty_bytes = 0; @@ -259,6 +293,84 @@ void vrt::if_hdr_unpack_$(suffix)( } } +/*********************************************************************** + * link layer + VRT IF packing + **********************************************************************/ +void vrt::if_hdr_pack_$(suffix)( + boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + boost::uint32_t vrt_hdr_word32 = 0; + switch (if_packet_info.link_type) + { + case if_packet_info_t::LINK_TYPE_NONE: + __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + packet_buff[0] = $(XE_MACRO)(vrt_hdr_word32); + break; + + case if_packet_info_t::LINK_TYPE_CHDR: + { + __if_hdr_pack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + const boost::uint32_t chdr = vrt_to_chdr(vrt_hdr_word32, if_packet_info.packet_count); + packet_buff[0] = $(XE_MACRO)(chdr); + break; + } + + case if_packet_info_t::LINK_TYPE_VRLP: + __if_hdr_pack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); + if_packet_info.num_header_words32 += 2; + if_packet_info.num_packet_words32 += 3; + packet_buff[0] = $(XE_MACRO)(VRLP); + packet_buff[1] = $(XE_MACRO)(boost::uint32_t( + (if_packet_info.num_packet_words32 & 0xfffff) | + ((if_packet_info.packet_count & 0xfff) << 20) + )); + packet_buff[2] = $(XE_MACRO)(vrt_hdr_word32); + packet_buff[if_packet_info.num_packet_words32-1] = $(XE_MACRO)(VEND); + break; + } +} + +/*********************************************************************** + * link layer + VRT IF unpacking + **********************************************************************/ +void vrt::if_hdr_unpack_$(suffix)( + const boost::uint32_t *packet_buff, + if_packet_info_t &if_packet_info +){ + boost::uint32_t vrt_hdr_word32 = 0; + switch (if_packet_info.link_type) + { + case if_packet_info_t::LINK_TYPE_NONE: + vrt_hdr_word32 = $(XE_MACRO)(packet_buff[0]); + __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + break; + + case if_packet_info_t::LINK_TYPE_CHDR: + { + const boost::uint32_t chdr = $(XE_MACRO)(packet_buff[0]); + size_t packet_count = 0; + vrt_hdr_word32 = chdr_to_vrt(chdr, packet_count); + __if_hdr_unpack_$(suffix)(packet_buff, if_packet_info, vrt_hdr_word32); + if_packet_info.packet_count = packet_count; + break; + } + + case if_packet_info_t::LINK_TYPE_VRLP: + { + if ($(XE_MACRO)(packet_buff[0]) != VRLP) throw uhd::value_error("bad vrl header VRLP"); + const boost::uint32_t vrl_hdr = $(XE_MACRO)(packet_buff[1]); + vrt_hdr_word32 = $(XE_MACRO)(packet_buff[2]); + if (if_packet_info.num_packet_words32 < (vrl_hdr & 0xfffff)) throw uhd::value_error("bad vrl header or packet fragment"); + if ($(XE_MACRO)(packet_buff[(vrl_hdr & 0xfffff)-1]) != VEND) throw uhd::value_error("bad vrl trailer VEND"); + __if_hdr_unpack_$(suffix)(packet_buff+2, if_packet_info, vrt_hdr_word32); + if_packet_info.num_header_words32 += 2; //add vrl header + if_packet_info.packet_count = (vrl_hdr >> 20) & 0xfff; + break; + } + } +} + ######################################################################## #end def ######################################################################## diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 5a75d5f0d..75d1f3068 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -63,6 +63,8 @@ static inline void handle_overflow_nop(void){} class recv_packet_handler{ public: typedef boost::function get_buff_type; + typedef boost::function handle_flowctrl_type; + typedef boost::function issue_stream_cmd_type; typedef void(*vrt_unpacker_type)(const boost::uint32_t *, vrt::if_packet_info_t &); //typedef boost::function vrt_unpacker_type; @@ -139,6 +141,19 @@ public: _props.at(xport_chan).get_buff = get_buff; } + /*! + * Set the function to handle flow control + * \param xport_chan which transport channel + * \param handle_flowctrl the callback function + */ + void set_xport_handle_flowctrl(const size_t xport_chan, const handle_flowctrl_type &handle_flowctrl, const size_t update_window, const bool do_init = false) + { + _props.at(xport_chan).handle_flowctrl = handle_flowctrl; + //we need the window size to be within the 0xfff (max 12 bit seq) + _props.at(xport_chan).fc_update_window = std::min(update_window, 0xfff); + if (do_init) handle_flowctrl(0); + } + //! Set the conversion routine for all channels void set_converter(const uhd::convert::id_type &id){ _num_outputs = id.num_outputs; @@ -158,6 +173,21 @@ public: _converter->set_scalar(scale_factor); } + //! Set the callback to issue stream commands + void set_issue_stream_cmd(const size_t xport_chan, const issue_stream_cmd_type &issue_stream_cmd) + { + _props.at(xport_chan).issue_stream_cmd = issue_stream_cmd; + } + + //! Overload call to issue stream commands + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + for (size_t i = 0; i < _props.size(); i++) + { + if (_props[i].issue_stream_cmd) _props[i].issue_stream_cmd(stream_cmd); + } + } + /******************************************************************* * Receive: * The entry point for the fast-path receive calls. @@ -219,8 +249,11 @@ private: handle_overflow(&handle_overflow_nop) {} get_buff_type get_buff; + issue_stream_cmd_type issue_stream_cmd; size_t packet_count; handle_overflow_type handle_overflow; + handle_flowctrl_type handle_flowctrl; + size_t fc_update_window; }; std::vector _props; size_t _num_outputs; @@ -302,6 +335,15 @@ private: info.time = time_spec_t::from_ticks(info.ifpi.tsf, _tick_rate); //assumes has_tsf is true info.copy_buff = reinterpret_cast(info.vrt_hdr + info.ifpi.num_header_words32); + //handle flow control + if (_props[index].handle_flowctrl) + { + if ((info.ifpi.packet_count % _props[index].fc_update_window/2) == 0) + { + _props[index].handle_flowctrl(info.ifpi.packet_count); + } + } + //-------------------------------------------------------------- //-- Determine return conditions: //-- The order of these checks is HOLY. @@ -314,8 +356,9 @@ private: //2) check for sequence errors #ifndef SRPH_DONT_CHECK_SEQUENCE + const size_t seq_mask = (info.ifpi.link_type == vrt::if_packet_info_t::LINK_TYPE_NONE)? 0xf : 0xfff; const size_t expected_packet_count = _props[index].packet_count; - _props[index].packet_count = (info.ifpi.packet_count + 1)%16; + _props[index].packet_count = (info.ifpi.packet_count + 1) & seq_mask; if (expected_packet_count != info.ifpi.packet_count){ return PACKET_SEQUENCE_ERROR; } @@ -622,6 +665,11 @@ public: return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout, one_packet); } + void issue_stream_cmd(const stream_cmd_t &stream_cmd) + { + return recv_packet_handler::issue_stream_cmd(stream_cmd); + } + private: size_t _max_num_samps; }; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 726742327..41f030ea6 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -47,6 +47,7 @@ namespace uhd{ namespace transport{ namespace sph{ class send_packet_handler{ public: typedef boost::function get_buff_type; + typedef boost::function async_receiver_type; typedef void(*vrt_packer_type)(boost::uint32_t *, vrt::if_packet_info_t &); //typedef boost::function vrt_packer_type; @@ -57,6 +58,7 @@ public: send_packet_handler(const size_t size = 1): _next_packet_seq(0) { + this->set_enable_trailer(true); this->resize(size); } @@ -96,6 +98,11 @@ public: _props.at(xport_chan).sid = sid; } + void set_enable_trailer(const bool enable) + { + _has_tlr = enable; + } + //! Set the rate of ticks per second void set_tick_rate(const double rate){ _tick_rate = rate; @@ -138,6 +145,21 @@ public: _converter->set_scalar(scale_factor); } + //! Set the callback to get async messages + void set_async_receiver(const async_receiver_type &async_receiver) + { + _async_receiver = async_receiver; + } + + //! Overload call to get async metadata + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + if (_async_receiver) return _async_receiver(async_metadata, timeout); + boost::this_thread::sleep(boost::posix_time::microseconds(long(timeout*1e6))); + return false; + } + /******************************************************************* * Send: * The entry point for the fast-path send calls. @@ -154,7 +176,7 @@ public: if_packet_info.packet_type = vrt::if_packet_info_t::PACKET_TYPE_DATA; //if_packet_info.has_sid = false; //set per channel if_packet_info.has_cid = false; - if_packet_info.has_tlr = true; + if_packet_info.has_tlr = _has_tlr; if_packet_info.has_tsi = false; if_packet_info.has_tsf = metadata.has_time_spec; if_packet_info.tsf = metadata.time_spec.to_ticks(_tick_rate); @@ -165,9 +187,12 @@ public: //TODO remove this code when sample counts of zero are supported by hardware #ifndef SSPH_DONT_PAD_TO_ONE - if (nsamps_per_buff == 0) return send_one_packet( - _zero_buffs, 1, if_packet_info, timeout - ) & 0x0; + static const boost::uint64_t zero = 0; + _zero_buffs.resize(buffs.size(), &zero); + + if (nsamps_per_buff == 0) return send_one_packet( + _zero_buffs, 1, if_packet_info, timeout + ) & 0x0; #endif return send_one_packet(buffs, nsamps_per_buff, if_packet_info, timeout); @@ -228,6 +253,8 @@ private: size_t _max_samples_per_packet; std::vector _zero_buffs; size_t _next_packet_seq; + bool _has_tlr; + async_receiver_type _async_receiver; /******************************************************************* * Send a single packet: @@ -337,6 +364,12 @@ public: return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout); } + bool recv_async_msg( + uhd::async_metadata_t &async_metadata, double timeout = 0.1 + ){ + return send_packet_handler::recv_async_msg(async_metadata, timeout); + } + private: size_t _max_num_samps; }; -- cgit v1.2.3