diff options
| author | Josh Blum <josh@joshknows.com> | 2011-06-12 19:54:09 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2011-06-14 17:27:46 -0700 | 
| commit | 397521fcdbe88d4d797bf9a5bcd2cdb097003ab3 (patch) | |
| tree | 7abca0107f1a013d250578b26e600bdf563673f4 | |
| parent | c65a1f8e40960d5016788de2cc4775c9e603293a (diff) | |
| download | uhd-397521fcdbe88d4d797bf9a5bcd2cdb097003ab3.tar.gz uhd-397521fcdbe88d4d797bf9a5bcd2cdb097003ab3.tar.bz2 uhd-397521fcdbe88d4d797bf9a5bcd2cdb097003ab3.zip | |
usrp2: super packet handler support squashed
| -rw-r--r-- | host/lib/usrp/usrp2/dsp_impl.cpp | 8 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 286 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 1 | 
3 files changed, 77 insertions, 218 deletions
| diff --git a/host/lib/usrp/usrp2/dsp_impl.cpp b/host/lib/usrp/usrp2/dsp_impl.cpp index 292659f36..03cdeae42 100644 --- a/host/lib/usrp/usrp2/dsp_impl.cpp +++ b/host/lib/usrp/usrp2/dsp_impl.cpp @@ -118,12 +118,6 @@ void usrp2_mboard_impl::issue_ddc_stream_cmd(const stream_cmd_t &stream_cmd, siz      _iface->poke32(U2_REG_RX_CTRL_TIME_TICKS(which_dsp), stream_cmd.time_spec.get_tick_count(get_master_clock_freq()));  } -void usrp2_mboard_impl::handle_overflow(size_t which_dsp){ -    if (_dsp_impl->continuous_streaming[which_dsp]){ //re-issue the stream command if already continuous -        this->issue_ddc_stream_cmd(stream_cmd_t::STREAM_MODE_START_CONTINUOUS, which_dsp); -    } -} -  /***********************************************************************   * DDC Properties   **********************************************************************/ @@ -186,6 +180,7 @@ void usrp2_mboard_impl::ddc_set(const wax::obj &key_, const wax::obj &val, size_                  dsp_type1::calc_iq_scale_word(default_rx_scale_iq, default_rx_scale_iq)              );          } +        _device.update_xport_channel_mapping(); //rate changed -> update          return;      default: UHD_THROW_PROP_SET_ERROR(); @@ -259,6 +254,7 @@ void usrp2_mboard_impl::duc_set(const wax::obj &key_, const wax::obj &val, size_              //set the scaling              _iface->poke32(U2_REG_DSP_TX_SCALE_IQ, dsp_type1::calc_iq_scale_word(_dsp_impl->duc_interp[which_dsp]));          } +        _device.update_xport_channel_mapping(); //rate changed -> update          return;      default: UHD_THROW_PROP_SET_ERROR(); diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 33f249599..ffe9a88e7 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -15,18 +15,21 @@  // along with this program.  If not, see <http://www.gnu.org/licenses/>.  // -#include "../../transport/vrt_packet_handler.hpp" +#include "../../transport/super_recv_packet_handler.hpp" +#include "../../transport/super_send_packet_handler.hpp"  #include "usrp2_impl.hpp"  #include "usrp2_regs.hpp"  #include <uhd/utils/log.hpp>  #include <uhd/utils/msg.hpp>  #include <uhd/exception.hpp>  #include <uhd/usrp/mboard_props.hpp> +#include <uhd/usrp/dsp_props.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/format.hpp>  #include <boost/bind.hpp> +#include <boost/thread/mutex.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/barrier.hpp>  #include <iostream> @@ -80,14 +83,21 @@ public:      }      /*! +     * Gets the current sequence number to go out. +     * Increments the sequence for the next call +     * \return the sequence to be sent to the dsp +     */ +    UHD_INLINE seq_type get_curr_seq_out(void){ +        return _last_seq_out++; +    } + +    /*!       * Check the flow control condition. -     * \param seq the sequence to go out       * \param timeout the timeout in seconds       * \return false on timeout       */ -    UHD_INLINE bool check_fc_condition(seq_type seq, double timeout){ -        boost::unique_lock<boost::mutex> lock(_fc_mutex); -        _last_seq_out = seq; +    UHD_INLINE bool check_fc_condition(double timeout){ +        boost::mutex::scoped_lock lock(_fc_mutex);          if (this->ready()) return true;          boost::this_thread::disable_interruption di; //disable because the wait can throw          return _fc_cond.timed_wait(lock, to_time_dur(timeout), _ready_fcn); @@ -98,7 +108,7 @@ public:       * \param seq the last sequence number to be ACK'd       */      UHD_INLINE void update_fc_condition(seq_type seq){ -        boost::unique_lock<boost::mutex> lock(_fc_mutex); +        boost::mutex::scoped_lock lock(_fc_mutex);          _last_seq_ack = seq;          lock.unlock();          _fc_cond.notify_one(); @@ -116,23 +126,6 @@ private:  };  /*********************************************************************** - * Alignment indexes class: keeps track of indexes - **********************************************************************/ -class alignment_indexes{ -public: -    alignment_indexes(void){_indexes = 0;} -    void reset(size_t len){_indexes = (1 << len) - 1;} -    size_t front(void){ //TODO replace with look-up table -        size_t index = 0; -        while ((_indexes & (1 << index)) == 0) index++; -        return index; -    } -    void remove(size_t index){_indexes &= ~(1 << index);} -    bool empty(void){return _indexes == 0;} -private: size_t _indexes; -}; - -/***********************************************************************   * io impl details (internal to this file)   * - pirate crew   * - alignment buffer @@ -143,24 +136,13 @@ struct usrp2_impl::io_impl{      io_impl(std::vector<zero_copy_if::sptr> &dsp_xports):          dsp_xports(dsp_xports), //the assumption is that all data transports should be identical -        get_recv_buffs_fcn(boost::bind(&usrp2_impl::io_impl::get_recv_buffs, this, _1)), -        get_send_buffs_fcn(boost::bind(&usrp2_impl::io_impl::get_send_buffs, this, _1)),          async_msg_fifo(100/*messages deep*/)      {          for (size_t i = 0; i < dsp_xports.size(); i++){              fc_mons.push_back(flow_control_monitor::sptr(new flow_control_monitor(                  usrp2_impl::sram_bytes/dsp_xports.front()->get_send_frame_size() -            )));; +            )));          } - -        //init empty packet infos -        vrt::if_packet_info_t packet_info = vrt::if_packet_info_t(); -        packet_info.packet_count = 0xf; -        packet_info.has_tsi = true; -        packet_info.tsi = 0; -        packet_info.has_tsf = true; -        packet_info.tsf = 0; -        prev_infos.resize(dsp_xports.size(), packet_info);      }      ~io_impl(void){ @@ -169,38 +151,27 @@ struct usrp2_impl::io_impl{          recv_pirate_crew.join_all();      } -    bool get_send_buffs(vrt_packet_handler::managed_send_buffs_t &buffs){ -        UHD_ASSERT_THROW(send_map.size() == buffs.size()); +    managed_send_buffer::sptr get_send_buff(size_t chan, double timeout){ +        const size_t index = send_map[chan]; +        flow_control_monitor &fc_mon = *fc_mons[index]; -        //calculate the flow control word -        const boost::uint32_t fc_word32 = packet_handler_send_state.next_packet_seq; +        //wait on flow control w/ timeout +        if (not fc_mon.check_fc_condition(timeout)) return managed_send_buffer::sptr(); -        //grab a managed buffer for each index -        for (size_t i = 0; i < buffs.size(); i++){ -            if (not fc_mons[send_map[i]]->check_fc_condition(fc_word32, send_timeout)) return false; -            buffs[i] = dsp_xports[send_map[i]]->get_send_buff(send_timeout); -            if (not buffs[i].get()) return false; -            buffs[i]->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_word32); -        } -        return true; -    } +        //get a buffer from the transport w/ timeout +        managed_send_buffer::sptr buff = dsp_xports[index]->get_send_buff(timeout); + +        //write the flow control word into the buffer +        if (buff.get()) buff->cast<boost::uint32_t *>()[0] = uhd::htonx(fc_mon.get_curr_seq_out()); -    alignment_indexes indexes_to_do; //used in alignment logic -    time_spec_t expected_time; //used in alignment logic -    bool get_recv_buffs(vrt_packet_handler::managed_recv_buffs_t &buffs); +        return buff; +    }      std::vector<zero_copy_if::sptr> &dsp_xports;      //mappings from channel index to dsp xport      std::vector<size_t> send_map, recv_map; -    //timeouts set on calls to recv/send (passed into get buffs methods) -    double recv_timeout, send_timeout; - -    //bound callbacks for get buffs (bound once here, not in fast-path) -    vrt_packet_handler::get_recv_buffs_t get_recv_buffs_fcn; -    vrt_packet_handler::get_send_buffs_t get_send_buffs_fcn; -      //previous state for each buffer      std::vector<vrt::if_packet_info_t> prev_infos; @@ -208,8 +179,8 @@ struct usrp2_impl::io_impl{      std::vector<flow_control_monitor::sptr> fc_mons;      //state management for the vrt packet handler code -    vrt_packet_handler::recv_state packet_handler_recv_state; -    vrt_packet_handler::send_state packet_handler_send_state; +    sph::recv_packet_handler recv_handler; +    sph::send_packet_handler send_handler;      //methods and variables for the pirate crew      void recv_pirate_loop(boost::barrier &, usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t); @@ -258,7 +229,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(                  metadata.time_spec = time_spec_t(                      time_t(if_packet_info.tsi), size_t(if_packet_info.tsf), mboard->get_master_clock_freq()                  ); -                metadata.event_code = vrt_packet_handler::get_context_code<async_metadata_t::event_code_t>(vrt_hdr, if_packet_info); +                metadata.event_code = async_metadata_t::event_code_t(sph::get_context_code(vrt_hdr, if_packet_info));                  //catch the flow control packets and react                  if (metadata.event_code == 0){ @@ -327,8 +298,39 @@ void usrp2_impl::update_xport_channel_mapping(void){      } -    _io_impl->packet_handler_recv_state = vrt_packet_handler::recv_state(_io_impl->recv_map.size()); -    _io_impl->packet_handler_send_state = vrt_packet_handler::send_state(_io_impl->send_map.size()); +    //set all of the relevant properties on the handler +    boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); +    _io_impl->recv_handler.resize(_io_impl->recv_map.size()); +    _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); +    _io_impl->recv_handler.set_tick_rate(_mboards.front()->get_master_clock_freq()); +    //TODO temporarily use the first dsp rate until we support non-homo rates +    const std::string rx_dsp_name = _mboards.at(0)->get_link()[MBOARD_PROP_RX_DSP_NAMES].as<prop_names_t>().at(0); +    const double rx_host_rate = _mboards.at(0)->get_link()[named_prop_t(MBOARD_PROP_RX_DSP, rx_dsp_name)][DSP_PROP_HOST_RATE].as<double>(); +    _io_impl->recv_handler.set_samp_rate(rx_host_rate); +    for (size_t chan = 0; chan < _io_impl->recv_handler.size(); chan++){ +        _io_impl->recv_handler.set_xport_chan_get_buff(chan, boost::bind( +            &uhd::transport::zero_copy_if::get_recv_buff, +            _io_impl->dsp_xports[_io_impl->recv_map[chan]], _1 +        )); +    } +    _io_impl->recv_handler.set_converter(_rx_otw_type); + +    //set all of the relevant properties on the handler +    boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); +    _io_impl->send_handler.resize(_io_impl->send_map.size()); +    _io_impl->send_handler.set_vrt_packer(&vrt::if_hdr_pack_be, vrt_send_header_offset_words32); +    _io_impl->send_handler.set_tick_rate(_mboards.front()->get_master_clock_freq()); +    //TODO temporarily use the first dsp rate until we support non-homo rates +    const std::string tx_dsp_name = _mboards.at(0)->get_link()[MBOARD_PROP_TX_DSP_NAMES].as<prop_names_t>().at(0); +    const double tx_host_rate = _mboards.at(0)->get_link()[named_prop_t(MBOARD_PROP_TX_DSP, tx_dsp_name)][DSP_PROP_HOST_RATE].as<double>(); +    _io_impl->send_handler.set_samp_rate(tx_host_rate); +    for (size_t chan = 0; chan < _io_impl->send_handler.size(); chan++){ +        _io_impl->send_handler.set_xport_chan_get_buff(chan, boost::bind( +            &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), chan, _1 +        )); +    } +    _io_impl->send_handler.set_converter(_tx_otw_type); +    _io_impl->send_handler.set_max_samples_per_packet(get_max_send_samps_per_packet());  }  /*********************************************************************** @@ -355,143 +357,17 @@ size_t usrp2_impl::get_max_send_samps_per_packet(void) const{  }  size_t usrp2_impl::send( -    const send_buffs_type &buffs, size_t num_samps, +    const send_buffs_type &buffs, size_t nsamps_per_buff,      const tx_metadata_t &metadata, const io_type_t &io_type,      send_mode_t send_mode, double timeout  ){ -    _io_impl->send_timeout = timeout; -    return vrt_packet_handler::send( -        _io_impl->packet_handler_send_state,       //last state of the send handler -        buffs, num_samps,                          //buffer to fill -        metadata, send_mode,                       //samples metadata -        io_type, _tx_otw_type,                     //input and output types to convert -        _mboards.front()->get_master_clock_freq(), //master clock tick rate -        uhd::transport::vrt::if_hdr_pack_be, -        _io_impl->get_send_buffs_fcn, -        get_max_send_samps_per_packet(), -        vrt_send_header_offset_words32 -    ); -} - -/*********************************************************************** - * Alignment logic on receive - **********************************************************************/ -static UHD_INLINE time_spec_t extract_time_spec( -    const vrt::if_packet_info_t &packet_info -){ -    return time_spec_t( //assumes has_tsi and has_tsf are true -        time_t(packet_info.tsi), size_t(packet_info.tsf), -        100e6 //tick rate does not have to be correct for comparison purposes +    return _io_impl->send_handler.send( +        buffs, nsamps_per_buff, +        metadata, io_type, +        send_mode, timeout      );  } -static UHD_INLINE void extract_packet_info( -    managed_recv_buffer::sptr &buff, -    vrt::if_packet_info_t &prev_info, -    time_spec_t &time, bool &clear, bool &msg -){ -    //extract packet info -    vrt::if_packet_info_t next_info; -    next_info.num_packet_words32 = buff->size()/sizeof(boost::uint32_t); -    vrt::if_hdr_unpack_be(buff->cast<const boost::uint32_t *>(), next_info); - -    //handle the packet count / sequence number -    if ((prev_info.packet_count+1)%16 != next_info.packet_count){ -        UHD_MSG(fastpath) << "O"; //report overflow (drops in the kernel) -    } - -    time = extract_time_spec(next_info); -    clear = extract_time_spec(prev_info) > time; -    msg = next_info.packet_type != vrt::if_packet_info_t::PACKET_TYPE_DATA; -    prev_info = next_info; -} - -static UHD_INLINE bool handle_msg_packet( -    vrt_packet_handler::managed_recv_buffs_t &buffs, size_t index -){ -    for (size_t i = 0; i < buffs.size(); i++){ -        if (i == index) continue; -        buffs[i].reset(); //set NULL -    } -    return true; -} - -UHD_INLINE bool usrp2_impl::io_impl::get_recv_buffs( -    vrt_packet_handler::managed_recv_buffs_t &buffs -){ -    if (buffs.size() == 1){ -        buffs[0] = dsp_xports[recv_map[0]]->get_recv_buff(recv_timeout); -        if (buffs[0].get() == NULL) return false; -        bool clear, msg; time_spec_t time; //unused variables -        //call extract_packet_info to handle printing the overflows -        extract_packet_info(buffs[0], this->prev_infos[recv_map[0]], time, clear, msg); -        return true; -    } -    //-------------------- begin alignment logic ---------------------// -    UHD_ASSERT_THROW(recv_map.size() == buffs.size()); -    boost::system_time exit_time = boost::get_system_time() + to_time_dur(recv_timeout); -    managed_recv_buffer::sptr buff_tmp; -    bool clear, msg; -    size_t index; - -    //If we did not enter this routine with an empty indexes set, -    //jump to after the clear so we can preserve the previous state. -    //This saves buffers from being lost when using non-blocking recv. -    if (not indexes_to_do.empty()) goto skip_pop_initial; - -    //respond to a clear by starting from scratch -    got_clear: -    indexes_to_do.reset(buffs.size()); -    clear = false; - -    //do an initial pop to load an initial sequence id -    index = indexes_to_do.front(); -    buff_tmp = dsp_xports[recv_map[index]]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); -    if (buff_tmp.get() == NULL) return false; -    extract_packet_info(buff_tmp, this->prev_infos[recv_map[index]], expected_time, clear, msg); -    if (clear) goto got_clear; -    buffs[index] = buff_tmp; -    if (msg) return handle_msg_packet(buffs, index); -    indexes_to_do.remove(index); -    skip_pop_initial: - -    //get an aligned set of elements from the buffers: -    while(not indexes_to_do.empty()){ - -        //pop an element off for this index -        index = indexes_to_do.front(); -        buff_tmp = dsp_xports[recv_map[index]]->get_recv_buff(from_time_dur(exit_time - boost::get_system_time())); -        if (buff_tmp.get() == NULL) return false; -        time_spec_t this_time; -        extract_packet_info(buff_tmp, this->prev_infos[recv_map[index]], this_time, clear, msg); -        if (clear) goto got_clear; -        buffs[index] = buff_tmp; -        if (msg) return handle_msg_packet(buffs, index); - -        //if the sequence id matches: -        //  remove this index from the list and continue -        if (this_time == expected_time){ -            indexes_to_do.remove(index); -        } - -        //if the sequence id is newer: -        //  use the new expected time for comparison -        //  add all other indexes back into the list -        else if (this_time > expected_time){ -            expected_time = this_time; -            indexes_to_do.reset(buffs.size()); -            indexes_to_do.remove(index); -        } - -        //if the sequence id is older: -        //  continue with the same index to try again -        //else if (this_time < expected_time)... - -    } -    return true; -    //-------------------- end alignment logic -----------------------// -} -  /***********************************************************************   * Receive Data   **********************************************************************/ @@ -505,26 +381,14 @@ size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{      return bpp/_rx_otw_type.get_sample_size();  } -void usrp2_impl::handle_overflow(size_t chan){ -    UHD_MSG(fastpath) << "O"; -    ldiv_t indexes = ldiv(chan, usrp2_mboard_impl::NUM_RX_DSPS); -    _mboards.at(indexes.quot)->handle_overflow(indexes.rem); -} -  size_t usrp2_impl::recv( -    const recv_buffs_type &buffs, size_t num_samps, +    const recv_buffs_type &buffs, size_t nsamps_per_buff,      rx_metadata_t &metadata, const io_type_t &io_type,      recv_mode_t recv_mode, double timeout  ){ -    _io_impl->recv_timeout = timeout; -    return vrt_packet_handler::recv( -        _io_impl->packet_handler_recv_state,       //last state of the recv handler -        buffs, num_samps,                          //buffer to fill -        metadata, recv_mode,                       //samples metadata -        io_type, _rx_otw_type,                     //input and output types to convert -        _mboards.front()->get_master_clock_freq(), //master clock tick rate -        uhd::transport::vrt::if_hdr_unpack_be, -        _io_impl->get_recv_buffs_fcn, -        boost::bind(&usrp2_impl::handle_overflow, this, _1) +    return _io_impl->recv_handler.recv( +        buffs, nsamps_per_buff, +        metadata, io_type, +        recv_mode, timeout      );  } diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index ccaf0c9a8..4d19863b1 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -220,7 +220,6 @@ private:      uhd::otw_type_t _rx_otw_type, _tx_otw_type;      UHD_PIMPL_DECL(io_impl) _io_impl;      void io_init(void); -    void handle_overflow(size_t);  };  #endif /* INCLUDED_USRP2_IMPL_HPP */ | 
