diff options
| -rw-r--r-- | host/include/uhd/device.hpp | 8 | ||||
| -rw-r--r-- | host/include/uhd/device_deprecated.ipp | 46 | ||||
| -rw-r--r-- | host/include/uhd/streamer.hpp | 81 | ||||
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 27 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 27 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 270 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.cpp | 20 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 27 | 
8 files changed, 302 insertions, 204 deletions
| diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp index f76739907..c96139858 100644 --- a/host/include/uhd/device.hpp +++ b/host/include/uhd/device.hpp @@ -76,11 +76,11 @@ public:       */      static sptr make(const device_addr_t &hint, size_t which = 0); -    //! Make a new receive streamer given the list of channels -    virtual rx_streamer::sptr get_rx_streamer(const std::vector<size_t> &channels) = 0; +    //! Make a new receive streamer from the streamer arguments +    virtual rx_streamer::sptr get_rx_streamer(const streamer_args &args) = 0; -    //! Make a new transmit streamer given the list of channels -    virtual tx_streamer::sptr get_tx_streamer(const std::vector<size_t> &channels) = 0; +    //! Make a new transmit streamer from the streamer arguments +    virtual tx_streamer::sptr get_tx_streamer(const streamer_args &args) = 0;      /*!       * Receive and asynchronous message from the device. diff --git a/host/include/uhd/device_deprecated.ipp b/host/include/uhd/device_deprecated.ipp index ba0edb1bc..4647a050e 100644 --- a/host/include/uhd/device_deprecated.ipp +++ b/host/include/uhd/device_deprecated.ipp @@ -77,16 +77,16 @@ size_t send(      send_mode_t send_mode,      double timeout = 0.1  ){ -    if (_tx_streamer.get() == NULL or _tx_streamer->get_num_channels() != buffs.size()){ +    if (_tx_streamer.get() == NULL or _tx_streamer->get_num_channels() != buffs.size() or _send_tid != io_type.tid){ +        _send_tid = io_type.tid;          std::vector<size_t> chans(buffs.size());          for (size_t ch = 0; ch < chans.size(); ch++) chans[ch] = ch;          _tx_streamer.reset(); //cleanup possible old one -        _tx_streamer = get_tx_streamer(chans); -        _send_tid = io_type_t::CUSTOM_TYPE; -    } -    if (io_type.tid != _send_tid){ -        _send_tid = io_type.tid; -        _tx_streamer->set_format((_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16", "sc16"); +        streamer_args args; +        args.cpu_format = (_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16"; +        args.otw_format = "sc16"; +        args.channels = chans; +        _tx_streamer = get_tx_streamer(args);      }      const size_t nsamps = (send_mode == SEND_MODE_ONE_PACKET)?          std::min(nsamps_per_buff, get_max_send_samps_per_packet()) : @@ -133,16 +133,16 @@ size_t recv(      recv_mode_t recv_mode,      double timeout = 0.1  ){ -    if (_rx_streamer.get() == NULL or _rx_streamer->get_num_channels() != buffs.size()){ +    if (_rx_streamer.get() == NULL or _rx_streamer->get_num_channels() != buffs.size() or _recv_tid != io_type.tid){ +        _recv_tid = io_type.tid;          std::vector<size_t> chans(buffs.size());          for (size_t ch = 0; ch < chans.size(); ch++) chans[ch] = ch;          _rx_streamer.reset(); //cleanup possible old one -        _rx_streamer = get_rx_streamer(chans); -        _recv_tid = io_type_t::CUSTOM_TYPE; -    } -    if (io_type.tid != _recv_tid){ -        _recv_tid = io_type.tid; -        _rx_streamer->set_format((_recv_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16", "sc16"); +        streamer_args args; +        args.cpu_format = (_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16"; +        args.otw_format = "sc16"; +        args.channels = chans; +        _rx_streamer = get_rx_streamer(args);      }      const size_t nsamps = (recv_mode == RECV_MODE_ONE_PACKET)?          std::min(nsamps_per_buff, get_max_recv_samps_per_packet()) : @@ -156,10 +156,13 @@ size_t recv(   */  size_t get_max_send_samps_per_packet(void){      if (_tx_streamer.get() == NULL){ -        std::vector<size_t> chans(1, 0); -        _tx_streamer = get_tx_streamer(chans); +        streamer_args args; +        args.cpu_format = "fc32"; +        args.otw_format = "sc16"; +        _tx_streamer = get_tx_streamer(args); +        _send_tid = io_type_t::COMPLEX_FLOAT32;      } -    return _tx_streamer->get_items_per_packet(); +    return _tx_streamer->get_max_num_samps();  }  /*! @@ -168,10 +171,13 @@ size_t get_max_send_samps_per_packet(void){   */  size_t get_max_recv_samps_per_packet(void){      if (_rx_streamer.get() == NULL){ -        std::vector<size_t> chans(1, 0); -        _rx_streamer = get_rx_streamer(chans); +        streamer_args args; +        args.cpu_format = "fc32"; +        args.otw_format = "sc16"; +        _rx_streamer = get_rx_streamer(args); +        _recv_tid = io_type_t::COMPLEX_FLOAT32;      } -    return _rx_streamer->get_items_per_packet(); +    return _rx_streamer->get_max_num_samps();  }  private: diff --git a/host/include/uhd/streamer.hpp b/host/include/uhd/streamer.hpp index d96eeb2b7..7a80d0f36 100644 --- a/host/include/uhd/streamer.hpp +++ b/host/include/uhd/streamer.hpp @@ -23,23 +23,30 @@  #include <uhd/types/ref_vector.hpp>  #include <boost/utility.hpp>  #include <boost/shared_ptr.hpp> +#include <vector>  #include <string>  namespace uhd{  /*! - * A streamer is the host interface to RX or TX samples. - * It represents the layer between the samples on the host - * and samples inside the device's DSP processing. + * A struct of parameters to construct a streamer. + * + * Note: + * Not all combinations of CPU and OTW format have conversion support. + * You may however write and register your own conversion routines.   */ -class UHD_API streamer : boost::noncopyable{ -public: -    //! Get the number of channels associated with this streamer -    virtual size_t get_num_channels(void) const = 0; +struct UHD_API streamer_args{ + +    //! Convenience constructor for streamer args +    streamer_args( +        const std::string &cpu = "fc32", +        const std::string &otw = "sc16" +    ){ +        cpu_format = cpu; +        otw_format = otw; +    }      /*! -     * \brief Set the format for all channels in this streamer. -     *       * The CPU format is a string that describes the format of host memory.       * Common CPU formats are:       *  - fc32 - complex<float> @@ -50,43 +57,47 @@ public:       *  - f64 - double       *  - s16 - int16_t       *  - s8 - int8_t -     * +     */ +    std::string cpu_format; + +    /*!       * The OTW format is a string that describes the format over-the-wire.       * Common OTW format are:       *  - sc16 - Q16 I16       *  - sc8 - Q8_1 I8_1 Q8_0 I8_0       *  - s16 - R16_1 R16_0       *  - s8 - R8_3 R8_2 R8_1 R8_0 -     * +     */ +    std::string otw_format; + +    /*!       * The args parameter is currently unused. Leave it blank.       * The intention is that a user with a custom DSP design       * may want to pass args and do something special with it. -     * -     * Note: -     * Not all combinations of CPU and OTW format have conversion support. -     * You may however write and register your own conversion routines. -     * -     * \param cpu_format the data format for samples used on the host -     * \param otw_format the data format of the samples over the wire -     * \param args optional arguments to augment the format       */ -    virtual void set_format( -        const std::string &cpu_format, -        const std::string &otw_format, -        const std::string &args = "" -    ) = 0; - -    //! Get the number of bytes per CPU item/sample -    virtual size_t get_bytes_per_cpu_item(void) const = 0; - -    //! Get the number of bytes per OTW item/sample -    virtual size_t get_bytes_per_otw_item(void) const = 0; +    std::string args; -    //! Get the max number of items/samples per packet -    virtual size_t get_items_per_packet(void) const = 0; +    /*! +     * The channels is a list of channel numbers. +     * Leave this blank to default to channel 0. +     * Set channels for a multi-channel application. +     * Channel mapping depends on the front-end selection. +     */ +    std::vector<size_t> channels; +}; -    //TODO enumerate cpu and otw format options +/*! + * A streamer is the host interface to RX or TX samples. + * It represents the layer between the samples on the host + * and samples inside the device's DSP processing. + */ +class UHD_API streamer : boost::noncopyable{ +public: +    //! Get the number of channels associated with this streamer +    virtual size_t get_num_channels(void) const = 0; +    //! Get the max number of samples per buffer per packet +    virtual size_t get_max_num_samps(void) const = 0;  };  //! A receive streamer to receive host samples @@ -124,7 +135,7 @@ public:       */      virtual size_t recv(          const buffs_type &buffs, -        size_t nsamps_per_buff, +        const size_t nsamps_per_buff,          rx_metadata_t &metadata,          double timeout = 0.1      ) = 0; @@ -161,7 +172,7 @@ public:       */      virtual size_t send(          const buffs_type &buffs, -        size_t nsamps_per_buff, +        const size_t nsamps_per_buff,          const tx_metadata_t &metadata,          double timeout = 0.1      ) = 0; diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 4ae51e146..f0b053473 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -562,6 +562,33 @@ private:      }  }; +class recv_packet_streamer : public recv_packet_handler, public rx_streamer{ +public: +    recv_packet_streamer(const size_t max_num_samps){ +        _max_num_samps = max_num_samps; +    } + +    size_t get_num_channels(void) const{ +        return this->size(); +    } + +    size_t get_max_num_samps(void) const{ +        return _max_num_samps; +    } + +    size_t recv( +        const rx_streamer::buffs_type &buffs, +        const size_t nsamps_per_buff, +        uhd::rx_metadata_t &metadata, +        double timeout +    ){ +        return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout); +    } + +private: +    size_t _max_num_samps; +}; +  }}} //namespace  #endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */ diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 6950abb73..edfe9bd13 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -259,6 +259,33 @@ private:      }  }; +class send_packet_streamer : public send_packet_handler, public tx_streamer{ +public: +    send_packet_streamer(const size_t max_num_samps){ +        _max_num_samps = max_num_samps; +    } + +    size_t get_num_channels(void) const{ +        return this->size(); +    } + +    size_t get_max_num_samps(void) const{ +        return _max_num_samps; +    } + +    size_t send( +        const tx_streamer::buffs_type &buffs, +        const size_t nsamps_per_buff, +        const uhd::tx_metadata_t &metadata, +        double timeout +    ){ +        return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout); +    } + +private: +    size_t _max_num_samps; +}; +  }}} //namespace  #endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */ diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 70331e536..98bfcdd3b 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -31,6 +31,7 @@  #include <boost/format.hpp>  #include <boost/bind.hpp>  #include <boost/thread/mutex.hpp> +#include <boost/make_shared.hpp>  #include <iostream>  using namespace uhd; @@ -158,10 +159,6 @@ struct usrp2_impl::io_impl{      std::vector<zero_copy_if::sptr> tx_xports;      std::vector<flow_control_monitor::sptr> fc_mons; -    //state management for the vrt packet handler code -    sph::recv_packet_handler recv_handler; -    sph::send_packet_handler send_handler; -      //methods and variables for the pirate crew      void recv_pirate_loop(zero_copy_if::sptr, size_t);      std::list<task::sptr> pirate_tasks; @@ -237,17 +234,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(   * Helper Functions   **********************************************************************/  void usrp2_impl::io_init(void){ - -    //setup rx otw type -    _rx_otw_type.width = 16; -    _rx_otw_type.shift = 0; -    _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - -    //setup tx otw type -    _tx_otw_type.width = 16; -    _tx_otw_type.shift = 0; -    _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; -      //create new io impl      _io_impl = UHD_PIMPL_MAKE(io_impl, ()); @@ -260,6 +246,12 @@ void usrp2_impl::io_init(void){          )));      } +    //allocate streamer weak ptrs containers +    BOOST_FOREACH(const std::string &mb, _mbc.keys()){ +        _mbc[mb].rx_streamers.resize(_mbc[mb].rx_dsps.size()); +        _mbc[mb].tx_streamers.resize(1/*known to be 1 dsp*/); +    } +      //create a new pirate thread for each zc if (yarr!!)      size_t index = 0;      BOOST_FOREACH(const std::string &mb, _mbc.keys()){ @@ -269,58 +261,68 @@ void usrp2_impl::io_init(void){              _mbc[mb].tx_dsp_xport, index++          )));      } - -    //init some handler stuff -    _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); -    _io_impl->recv_handler.set_converter(_rx_otw_type); -    _io_impl->send_handler.set_vrt_packer(&vrt::if_hdr_pack_be, vrt_send_header_offset_words32); -    _io_impl->send_handler.set_converter(_tx_otw_type); -    _io_impl->send_handler.set_max_samples_per_packet(get_max_send_samps_per_packet()); - -    //set the packet threshold to be an entire socket buffer's worth -    const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size()); -    _io_impl->recv_handler.set_alignment_failure_threshold(packets_per_sock_buff);  }  void usrp2_impl::update_tick_rate(const double rate){ -    _io_impl->tick_rate = rate; -    boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); -    _io_impl->recv_handler.set_tick_rate(rate); -    boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); -    _io_impl->send_handler.set_tick_rate(rate); +    _io_impl->tick_rate = rate; //shadow for async msg + +    //update the tick rate on all existing streamers -> thread safe +    BOOST_FOREACH(const std::string &mb, _mbc.keys()){ +        for (size_t i = 0; i < _mbc[mb].rx_streamers.size(); i++){ +            boost::shared_ptr<sph::recv_packet_streamer> my_streamer = +                boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[i].lock()); +            if (my_streamer.get() == NULL) continue; +            boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); +            my_streamer->set_tick_rate(rate); +        } +        for (size_t i = 0; i < _mbc[mb].tx_streamers.size(); i++){ +            boost::shared_ptr<sph::send_packet_streamer> my_streamer = +                boost::dynamic_pointer_cast<sph::send_packet_streamer>(_mbc[mb].tx_streamers[i].lock()); +            if (my_streamer.get() == NULL) continue; +            boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); +            my_streamer->set_tick_rate(rate); +        } +    }  } -void usrp2_impl::update_rx_samp_rate(const double rate){ -    boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); -    _io_impl->recv_handler.set_samp_rate(rate); -    const double adj = _mbc[_mbc.keys().front()].rx_dsps.front()->get_scaling_adjustment(); -    _io_impl->recv_handler.set_scale_factor(adj/32767.); +void usrp2_impl::update_rx_samp_rate(const std::string &mb, const size_t dsp, const double rate){ +    boost::shared_ptr<sph::recv_packet_streamer> my_streamer = +        boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[dsp].lock()); +    if (my_streamer.get() == NULL) return; + +    boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + +    my_streamer->set_samp_rate(rate); +    const double adj = _mbc[mb].rx_dsps[dsp]->get_scaling_adjustment(); +    my_streamer->set_scale_factor(adj/32767.);  } -void usrp2_impl::update_tx_samp_rate(const double rate){ -    boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); -    _io_impl->send_handler.set_samp_rate(rate); +void usrp2_impl::update_tx_samp_rate(const std::string &mb, const size_t dsp, const double rate){ +    boost::shared_ptr<sph::recv_packet_streamer> my_streamer = +        boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].tx_streamers[dsp].lock()); +    if (my_streamer.get() == NULL) return; + +    boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + +    my_streamer->set_samp_rate(rate);  } -static subdev_spec_t replace_zero_in_spec(const std::string &type, const subdev_spec_t &spec){ -    subdev_spec_t new_spec; -    BOOST_FOREACH(const subdev_spec_pair_t &pair, spec){ -        if (pair.db_name == "0"){ -            UHD_MSG(warning) -                << boost::format("In the %s subdevice specification: %s") % type % spec.to_string() << std::endl -                << "Accepting dboard slot name \"0\" for backward compatibility." << std::endl -                << "The official name of the dboard slot on USRP2/N-Series is \"A\"." << std::endl -            ; -            new_spec.push_back(subdev_spec_pair_t("A", pair.sd_name)); +void usrp2_impl::update_rates(void){ +    BOOST_FOREACH(const std::string &mb, _mbc.keys()){ +        fs_path root = "/mboards/" + mb; +        _tree->access<double>(root / "tick_rate").update(); + +        //and now that the tick rate is set, init the host rates to something +        BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){ +            _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").update(); +        } +        BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){ +            _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").update();          } -        else new_spec.push_back(pair);      } -    return new_spec;  } -subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){ -    const subdev_spec_t spec = replace_zero_in_spec("RX", spec_); -    boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); +void usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){      fs_path root = "/mboards/" + which_mb + "/dboards";      //sanity checking @@ -339,24 +341,9 @@ subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, con      _mbc[which_mb].rx_chan_occ = spec.size();      size_t nchan = 0;      BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].rx_chan_occ; -    _io_impl->recv_handler.resize(nchan); - -    //bind new callbacks for the handler -    size_t chan = 0; -    BOOST_FOREACH(const std::string &mb, _mbc.keys()){ -        for (size_t dsp = 0; dsp < _mbc[mb].rx_chan_occ; dsp++){ -            _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this -            _io_impl->recv_handler.set_xport_chan_get_buff(chan++, boost::bind( -                &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1 -            )); -        } -    } -    return spec;  } -subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){ -    const subdev_spec_t spec = replace_zero_in_spec("TX", spec_); -    boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); +void usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){      fs_path root = "/mboards/" + which_mb + "/dboards";      //sanity checking @@ -370,18 +357,6 @@ subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, con      _mbc[which_mb].tx_chan_occ = spec.size();      size_t nchan = 0;      BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].tx_chan_occ; -    _io_impl->send_handler.resize(nchan); - -    //bind new callbacks for the handler -    size_t chan = 0, i = 0; -    BOOST_FOREACH(const std::string &mb, _mbc.keys()){ -        for (size_t dsp = 0; dsp < _mbc[mb].tx_chan_occ; dsp++){ -            _io_impl->send_handler.set_xport_chan_get_buff(chan++, boost::bind( -                &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), i++, _1 -            )); -        } -    } -    return spec;  }  /*********************************************************************** @@ -395,51 +370,118 @@ bool usrp2_impl::recv_async_msg(  }  /*********************************************************************** - * Send Data + * Receive streamer   **********************************************************************/ -size_t usrp2_impl::get_max_send_samps_per_packet(void) const{ +rx_streamer::sptr usrp2_impl::get_rx_streamer(const uhd::streamer_args &args){ +    //map an empty channel set to chan0 +    const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; + +    //calculate packet size      static const size_t hdr_size = 0          + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) -        + vrt_send_header_offset_words32*sizeof(boost::uint32_t) +        + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer          - sizeof(vrt::if_packet_info_t().cid) //no class id ever used      ; -    const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size; -    return bpp/_tx_otw_type.get_sample_size(); -} +    const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size; +    const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format); + +    //make the new streamer given the samples per packet +    boost::shared_ptr<sph::recv_packet_streamer> my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); + +    //init some streamer stuff +    my_streamer->resize(channels.size()); +    my_streamer->set_vrt_unpacker(&vrt::if_hdr_unpack_be); + +    //set the converter +    uhd::convert::id_type id; +    id.input_markup = args.otw_format + "_item32_be"; +    id.num_inputs = 1; +    id.output_markup = args.cpu_format; +    id.num_outputs = 1; +    id.args = args.args; +    my_streamer->set_converter(id); + +    //bind callbacks for the handler +    for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){ +        const size_t chan = channels[chan_i]; +        size_t num_chan_so_far = 0; +        BOOST_FOREACH(const std::string &mb, _mbc.keys()){ +            num_chan_so_far += _mbc[mb].rx_chan_occ; +            if (chan < num_chan_so_far){ +                const size_t dsp = num_chan_so_far - chan - 1; +                _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this +                my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( +                    &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1 +                )); +                _mbc[mb].rx_streamers[dsp] = my_streamer; //store weak pointer +                break; +            } +        } +    } -size_t usrp2_impl::send( -    const send_buffs_type &buffs, size_t nsamps_per_buff, -    const tx_metadata_t &metadata, const io_type_t &io_type, -    send_mode_t send_mode, double timeout -){ -    return _io_impl->send_handler.send( -        buffs, nsamps_per_buff, -        metadata, io_type, -        send_mode, timeout -    ); +    //set the packet threshold to be an entire socket buffer's worth +    const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size()); +    my_streamer->set_alignment_failure_threshold(packets_per_sock_buff); + +    //sets all tick and samp rates on this streamer +    this->update_rates(); + +    return my_streamer;  }  /*********************************************************************** - * Receive Data + * Transmit streamer   **********************************************************************/ -size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ +tx_streamer::sptr usrp2_impl::get_tx_streamer(const uhd::streamer_args &args){ +    //map an empty channel set to chan0 +    const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; + +    //calculate packet size      static const size_t hdr_size = 0          + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) -        + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer +        + vrt_send_header_offset_words32*sizeof(boost::uint32_t)          - sizeof(vrt::if_packet_info_t().cid) //no class id ever used      ; -    const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size; -    return bpp/_rx_otw_type.get_sample_size(); -} +    const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size; +    const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format); + +    //make the new streamer given the samples per packet +    boost::shared_ptr<sph::send_packet_streamer> my_streamer = boost::make_shared<sph::send_packet_streamer>(spp); + +    //init some streamer stuff +    my_streamer->resize(channels.size()); +    my_streamer->set_vrt_packer(&vrt::if_hdr_pack_be); + +    //set the converter +    uhd::convert::id_type id; +    id.input_markup = args.cpu_format; +    id.num_inputs = 1; +    id.output_markup = args.otw_format + "_item32_be"; +    id.num_outputs = 1; +    id.args = args.args; +    my_streamer->set_converter(id); + +    //bind callbacks for the handler +    for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){ +        const size_t chan = channels[chan_i]; +        size_t num_chan_so_far = 0; +        size_t abs = 0; +        BOOST_FOREACH(const std::string &mb, _mbc.keys()){ +            num_chan_so_far += _mbc[mb].tx_chan_occ; +            if (chan < num_chan_so_far){ +                const size_t dsp = num_chan_so_far - chan - 1; +                my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( +                    &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), abs, _1 +                )); +                _mbc[mb].tx_streamers[dsp] = my_streamer; //store weak pointer +                break; +            } +            abs += 1; //assume 1 tx dsp +        } +    } -size_t usrp2_impl::recv( -    const recv_buffs_type &buffs, size_t nsamps_per_buff, -    rx_metadata_t &metadata, const io_type_t &io_type, -    recv_mode_t recv_mode, double timeout -){ -    return _io_impl->recv_handler.recv( -        buffs, nsamps_per_buff, -        metadata, io_type, -        recv_mode, timeout -    ); +    //sets all tick and samp rates on this streamer +    this->update_rates(); + +    return my_streamer;  } diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 2b541bcf0..2d89ddaf4 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -458,9 +458,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){          );          //TODO lots of properties to expose here for frontends          _tree->create<subdev_spec_t>(mb_path / "rx_subdev_spec") -            .coerce(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1)); +            .subscribe(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1));          _tree->create<subdev_spec_t>(mb_path / "tx_subdev_spec") -            .coerce(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1)); +            .subscribe(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1));          ////////////////////////////////////////////////////////////////          // create rx dsp control objects @@ -481,8 +481,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){              _mbc[mb].rx_dsp_xports[dspno]->get_recv_buff(0.01).get(); //recv with timeout for expected              fs_path rx_dsp_path = mb_path / str(boost::format("rx_dsps/%u") % dspno);              _tree->create<double>(rx_dsp_path / "rate/value") +                .set(1e6) //some default                  .coerce(boost::bind(&rx_dsp_core_200::set_host_rate, _mbc[mb].rx_dsps[dspno], _1)) -                .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, _1)); +                .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, mb, dspno, _1));              _tree->create<double>(rx_dsp_path / "freq/value")                  .coerce(boost::bind(&rx_dsp_core_200::set_freq, _mbc[mb].rx_dsps[dspno], _1));              _tree->create<meta_range_t>(rx_dsp_path / "freq/range") @@ -501,8 +502,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){          _tree->access<double>(mb_path / "tick_rate")              .subscribe(boost::bind(&tx_dsp_core_200::set_tick_rate, _mbc[mb].tx_dsp, _1));          _tree->create<double>(mb_path / "tx_dsps/0/rate/value") +            .set(1e6) //some default              .coerce(boost::bind(&tx_dsp_core_200::set_host_rate, _mbc[mb].tx_dsp, _1)) -            .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, _1)); +            .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, mb, 0, _1));          _tree->create<double>(mb_path / "tx_dsps/0/freq/value")              .coerce(boost::bind(&usrp2_impl::set_tx_dsp_freq, this, mb, _1));          _tree->create<meta_range_t>(mb_path / "tx_dsps/0/freq/range") @@ -594,17 +596,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){      this->io_init();      //do some post-init tasks +    this->update_rates();      BOOST_FOREACH(const std::string &mb, _mbc.keys()){          fs_path root = "/mboards/" + mb; -        _tree->access<double>(root / "tick_rate").update(); - -        //and now that the tick rate is set, init the host rates to something -        BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){ -            _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").set(1e6); -        } -        BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){ -            _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").set(1e6); -        }          _tree->access<subdev_spec_t>(root / "rx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_rx_subdev_names()[0]));          _tree->access<subdev_spec_t>(root / "tx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_tx_subdev_names()[0])); diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 6f133f411..566c93853 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -31,7 +31,6 @@  #include <uhd/device.hpp>  #include <uhd/utils/pimpl.hpp>  #include <uhd/types/dict.hpp> -#include <uhd/types/otw_type.hpp>  #include <uhd/types/stream_cmd.hpp>  #include <uhd/types/clock_config.hpp>  #include <uhd/usrp/dboard_eeprom.hpp> @@ -73,18 +72,8 @@ public:      ~usrp2_impl(void);      //the io interface -    size_t send( -        const send_buffs_type &, size_t, -        const uhd::tx_metadata_t &, const uhd::io_type_t &, -        uhd::device::send_mode_t, double -    ); -    size_t recv( -        const recv_buffs_type &, size_t, -        uhd::rx_metadata_t &, const uhd::io_type_t &, -        uhd::device::recv_mode_t, double -    ); -    size_t get_max_send_samps_per_packet(void) const; -    size_t get_max_recv_samps_per_packet(void) const; +    uhd::rx_streamer::sptr get_rx_streamer(const uhd::streamer_args &args); +    uhd::tx_streamer::sptr get_tx_streamer(const uhd::streamer_args &args);      bool recv_async_msg(uhd::async_metadata_t &, double);  private: @@ -97,6 +86,8 @@ private:          rx_frontend_core_200::sptr rx_fe;          tx_frontend_core_200::sptr tx_fe;          std::vector<rx_dsp_core_200::sptr> rx_dsps; +        std::vector<boost::weak_ptr<uhd::streamer> > rx_streamers; +        std::vector<boost::weak_ptr<uhd::streamer> > tx_streamers;          tx_dsp_core_200::sptr tx_dsp;          time64_core_200::sptr time64;          std::vector<uhd::transport::zero_copy_if::sptr> rx_dsp_xports; @@ -120,15 +111,15 @@ private:      }      //io impl methods and members -    uhd::otw_type_t _rx_otw_type, _tx_otw_type;      UHD_PIMPL_DECL(io_impl) _io_impl;      void io_init(void);      void update_tick_rate(const double rate); -    void update_rx_samp_rate(const double rate); -    void update_tx_samp_rate(const double rate); +    void update_rx_samp_rate(const std::string &, const size_t, const double rate); +    void update_tx_samp_rate(const std::string &, const size_t, const double rate); +    void update_rates(void);      //update spec methods are coercers until we only accept db_name == A -    uhd::usrp::subdev_spec_t update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); -    uhd::usrp::subdev_spec_t update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); +    void update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); +    void update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &);      double set_tx_dsp_freq(const std::string &, const double);      uhd::meta_range_t get_tx_dsp_freq_range(const std::string &);      void update_clock_source(const std::string &, const std::string &); | 
