diff options
Diffstat (limited to 'host/lib/usrp')
-rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 270 | ||||
-rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.cpp | 20 | ||||
-rw-r--r-- | host/lib/usrp/usrp2/usrp2_impl.hpp | 27 |
3 files changed, 172 insertions, 145 deletions
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 70331e536..98bfcdd3b 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -31,6 +31,7 @@ #include <boost/format.hpp> #include <boost/bind.hpp> #include <boost/thread/mutex.hpp> +#include <boost/make_shared.hpp> #include <iostream> using namespace uhd; @@ -158,10 +159,6 @@ struct usrp2_impl::io_impl{ std::vector<zero_copy_if::sptr> tx_xports; std::vector<flow_control_monitor::sptr> fc_mons; - //state management for the vrt packet handler code - sph::recv_packet_handler recv_handler; - sph::send_packet_handler send_handler; - //methods and variables for the pirate crew void recv_pirate_loop(zero_copy_if::sptr, size_t); std::list<task::sptr> pirate_tasks; @@ -237,17 +234,6 @@ void usrp2_impl::io_impl::recv_pirate_loop( * Helper Functions **********************************************************************/ void usrp2_impl::io_init(void){ - - //setup rx otw type - _rx_otw_type.width = 16; - _rx_otw_type.shift = 0; - _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - - //setup tx otw type - _tx_otw_type.width = 16; - _tx_otw_type.shift = 0; - _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN; - //create new io impl _io_impl = UHD_PIMPL_MAKE(io_impl, ()); @@ -260,6 +246,12 @@ void usrp2_impl::io_init(void){ ))); } + //allocate streamer weak ptrs containers + BOOST_FOREACH(const std::string &mb, _mbc.keys()){ + _mbc[mb].rx_streamers.resize(_mbc[mb].rx_dsps.size()); + _mbc[mb].tx_streamers.resize(1/*known to be 1 dsp*/); + } + //create a new pirate thread for each zc if (yarr!!) size_t index = 0; BOOST_FOREACH(const std::string &mb, _mbc.keys()){ @@ -269,58 +261,68 @@ void usrp2_impl::io_init(void){ _mbc[mb].tx_dsp_xport, index++ ))); } - - //init some handler stuff - _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); - _io_impl->recv_handler.set_converter(_rx_otw_type); - _io_impl->send_handler.set_vrt_packer(&vrt::if_hdr_pack_be, vrt_send_header_offset_words32); - _io_impl->send_handler.set_converter(_tx_otw_type); - _io_impl->send_handler.set_max_samples_per_packet(get_max_send_samps_per_packet()); - - //set the packet threshold to be an entire socket buffer's worth - const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size()); - _io_impl->recv_handler.set_alignment_failure_threshold(packets_per_sock_buff); } void usrp2_impl::update_tick_rate(const double rate){ - _io_impl->tick_rate = rate; - boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); - _io_impl->recv_handler.set_tick_rate(rate); - boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); - _io_impl->send_handler.set_tick_rate(rate); + _io_impl->tick_rate = rate; //shadow for async msg + + //update the tick rate on all existing streamers -> thread safe + BOOST_FOREACH(const std::string &mb, _mbc.keys()){ + for (size_t i = 0; i < _mbc[mb].rx_streamers.size(); i++){ + boost::shared_ptr<sph::recv_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[i].lock()); + if (my_streamer.get() == NULL) continue; + boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + my_streamer->set_tick_rate(rate); + } + for (size_t i = 0; i < _mbc[mb].tx_streamers.size(); i++){ + boost::shared_ptr<sph::send_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<sph::send_packet_streamer>(_mbc[mb].tx_streamers[i].lock()); + if (my_streamer.get() == NULL) continue; + boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + my_streamer->set_tick_rate(rate); + } + } } -void usrp2_impl::update_rx_samp_rate(const double rate){ - boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); - _io_impl->recv_handler.set_samp_rate(rate); - const double adj = _mbc[_mbc.keys().front()].rx_dsps.front()->get_scaling_adjustment(); - _io_impl->recv_handler.set_scale_factor(adj/32767.); +void usrp2_impl::update_rx_samp_rate(const std::string &mb, const size_t dsp, const double rate){ + boost::shared_ptr<sph::recv_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[dsp].lock()); + if (my_streamer.get() == NULL) return; + + boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + + my_streamer->set_samp_rate(rate); + const double adj = _mbc[mb].rx_dsps[dsp]->get_scaling_adjustment(); + my_streamer->set_scale_factor(adj/32767.); } -void usrp2_impl::update_tx_samp_rate(const double rate){ - boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); - _io_impl->send_handler.set_samp_rate(rate); +void usrp2_impl::update_tx_samp_rate(const std::string &mb, const size_t dsp, const double rate){ + boost::shared_ptr<sph::recv_packet_streamer> my_streamer = + boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].tx_streamers[dsp].lock()); + if (my_streamer.get() == NULL) return; + + boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock(); + + my_streamer->set_samp_rate(rate); } -static subdev_spec_t replace_zero_in_spec(const std::string &type, const subdev_spec_t &spec){ - subdev_spec_t new_spec; - BOOST_FOREACH(const subdev_spec_pair_t &pair, spec){ - if (pair.db_name == "0"){ - UHD_MSG(warning) - << boost::format("In the %s subdevice specification: %s") % type % spec.to_string() << std::endl - << "Accepting dboard slot name \"0\" for backward compatibility." << std::endl - << "The official name of the dboard slot on USRP2/N-Series is \"A\"." << std::endl - ; - new_spec.push_back(subdev_spec_pair_t("A", pair.sd_name)); +void usrp2_impl::update_rates(void){ + BOOST_FOREACH(const std::string &mb, _mbc.keys()){ + fs_path root = "/mboards/" + mb; + _tree->access<double>(root / "tick_rate").update(); + + //and now that the tick rate is set, init the host rates to something + BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){ + _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").update(); + } + BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){ + _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").update(); } - else new_spec.push_back(pair); } - return new_spec; } -subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){ - const subdev_spec_t spec = replace_zero_in_spec("RX", spec_); - boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock(); +void usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){ fs_path root = "/mboards/" + which_mb + "/dboards"; //sanity checking @@ -339,24 +341,9 @@ subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, con _mbc[which_mb].rx_chan_occ = spec.size(); size_t nchan = 0; BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].rx_chan_occ; - _io_impl->recv_handler.resize(nchan); - - //bind new callbacks for the handler - size_t chan = 0; - BOOST_FOREACH(const std::string &mb, _mbc.keys()){ - for (size_t dsp = 0; dsp < _mbc[mb].rx_chan_occ; dsp++){ - _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this - _io_impl->recv_handler.set_xport_chan_get_buff(chan++, boost::bind( - &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1 - )); - } - } - return spec; } -subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){ - const subdev_spec_t spec = replace_zero_in_spec("TX", spec_); - boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock(); +void usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){ fs_path root = "/mboards/" + which_mb + "/dboards"; //sanity checking @@ -370,18 +357,6 @@ subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, con _mbc[which_mb].tx_chan_occ = spec.size(); size_t nchan = 0; BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].tx_chan_occ; - _io_impl->send_handler.resize(nchan); - - //bind new callbacks for the handler - size_t chan = 0, i = 0; - BOOST_FOREACH(const std::string &mb, _mbc.keys()){ - for (size_t dsp = 0; dsp < _mbc[mb].tx_chan_occ; dsp++){ - _io_impl->send_handler.set_xport_chan_get_buff(chan++, boost::bind( - &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), i++, _1 - )); - } - } - return spec; } /*********************************************************************** @@ -395,51 +370,118 @@ bool usrp2_impl::recv_async_msg( } /*********************************************************************** - * Send Data + * Receive streamer **********************************************************************/ -size_t usrp2_impl::get_max_send_samps_per_packet(void) const{ +rx_streamer::sptr usrp2_impl::get_rx_streamer(const uhd::streamer_args &args){ + //map an empty channel set to chan0 + const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; + + //calculate packet size static const size_t hdr_size = 0 + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) - + vrt_send_header_offset_words32*sizeof(boost::uint32_t) + + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer - sizeof(vrt::if_packet_info_t().cid) //no class id ever used ; - const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size; - return bpp/_tx_otw_type.get_sample_size(); -} + const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size; + const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format); + + //make the new streamer given the samples per packet + boost::shared_ptr<sph::recv_packet_streamer> my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp); + + //init some streamer stuff + my_streamer->resize(channels.size()); + my_streamer->set_vrt_unpacker(&vrt::if_hdr_unpack_be); + + //set the converter + uhd::convert::id_type id; + id.input_markup = args.otw_format + "_item32_be"; + id.num_inputs = 1; + id.output_markup = args.cpu_format; + id.num_outputs = 1; + id.args = args.args; + my_streamer->set_converter(id); + + //bind callbacks for the handler + for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){ + const size_t chan = channels[chan_i]; + size_t num_chan_so_far = 0; + BOOST_FOREACH(const std::string &mb, _mbc.keys()){ + num_chan_so_far += _mbc[mb].rx_chan_occ; + if (chan < num_chan_so_far){ + const size_t dsp = num_chan_so_far - chan - 1; + _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this + my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( + &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1 + )); + _mbc[mb].rx_streamers[dsp] = my_streamer; //store weak pointer + break; + } + } + } -size_t usrp2_impl::send( - const send_buffs_type &buffs, size_t nsamps_per_buff, - const tx_metadata_t &metadata, const io_type_t &io_type, - send_mode_t send_mode, double timeout -){ - return _io_impl->send_handler.send( - buffs, nsamps_per_buff, - metadata, io_type, - send_mode, timeout - ); + //set the packet threshold to be an entire socket buffer's worth + const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size()); + my_streamer->set_alignment_failure_threshold(packets_per_sock_buff); + + //sets all tick and samp rates on this streamer + this->update_rates(); + + return my_streamer; } /*********************************************************************** - * Receive Data + * Transmit streamer **********************************************************************/ -size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{ +tx_streamer::sptr usrp2_impl::get_tx_streamer(const uhd::streamer_args &args){ + //map an empty channel set to chan0 + const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels; + + //calculate packet size static const size_t hdr_size = 0 + vrt::max_if_hdr_words32*sizeof(boost::uint32_t) - + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer + + vrt_send_header_offset_words32*sizeof(boost::uint32_t) - sizeof(vrt::if_packet_info_t().cid) //no class id ever used ; - const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size; - return bpp/_rx_otw_type.get_sample_size(); -} + const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size; + const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format); + + //make the new streamer given the samples per packet + boost::shared_ptr<sph::send_packet_streamer> my_streamer = boost::make_shared<sph::send_packet_streamer>(spp); + + //init some streamer stuff + my_streamer->resize(channels.size()); + my_streamer->set_vrt_packer(&vrt::if_hdr_pack_be); + + //set the converter + uhd::convert::id_type id; + id.input_markup = args.cpu_format; + id.num_inputs = 1; + id.output_markup = args.otw_format + "_item32_be"; + id.num_outputs = 1; + id.args = args.args; + my_streamer->set_converter(id); + + //bind callbacks for the handler + for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){ + const size_t chan = channels[chan_i]; + size_t num_chan_so_far = 0; + size_t abs = 0; + BOOST_FOREACH(const std::string &mb, _mbc.keys()){ + num_chan_so_far += _mbc[mb].tx_chan_occ; + if (chan < num_chan_so_far){ + const size_t dsp = num_chan_so_far - chan - 1; + my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( + &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), abs, _1 + )); + _mbc[mb].tx_streamers[dsp] = my_streamer; //store weak pointer + break; + } + abs += 1; //assume 1 tx dsp + } + } -size_t usrp2_impl::recv( - const recv_buffs_type &buffs, size_t nsamps_per_buff, - rx_metadata_t &metadata, const io_type_t &io_type, - recv_mode_t recv_mode, double timeout -){ - return _io_impl->recv_handler.recv( - buffs, nsamps_per_buff, - metadata, io_type, - recv_mode, timeout - ); + //sets all tick and samp rates on this streamer + this->update_rates(); + + return my_streamer; } diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp index 2b541bcf0..2d89ddaf4 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.cpp +++ b/host/lib/usrp/usrp2/usrp2_impl.cpp @@ -458,9 +458,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){ ); //TODO lots of properties to expose here for frontends _tree->create<subdev_spec_t>(mb_path / "rx_subdev_spec") - .coerce(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1)); + .subscribe(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1)); _tree->create<subdev_spec_t>(mb_path / "tx_subdev_spec") - .coerce(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1)); + .subscribe(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1)); //////////////////////////////////////////////////////////////// // create rx dsp control objects @@ -481,8 +481,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){ _mbc[mb].rx_dsp_xports[dspno]->get_recv_buff(0.01).get(); //recv with timeout for expected fs_path rx_dsp_path = mb_path / str(boost::format("rx_dsps/%u") % dspno); _tree->create<double>(rx_dsp_path / "rate/value") + .set(1e6) //some default .coerce(boost::bind(&rx_dsp_core_200::set_host_rate, _mbc[mb].rx_dsps[dspno], _1)) - .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, _1)); + .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, mb, dspno, _1)); _tree->create<double>(rx_dsp_path / "freq/value") .coerce(boost::bind(&rx_dsp_core_200::set_freq, _mbc[mb].rx_dsps[dspno], _1)); _tree->create<meta_range_t>(rx_dsp_path / "freq/range") @@ -501,8 +502,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){ _tree->access<double>(mb_path / "tick_rate") .subscribe(boost::bind(&tx_dsp_core_200::set_tick_rate, _mbc[mb].tx_dsp, _1)); _tree->create<double>(mb_path / "tx_dsps/0/rate/value") + .set(1e6) //some default .coerce(boost::bind(&tx_dsp_core_200::set_host_rate, _mbc[mb].tx_dsp, _1)) - .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, _1)); + .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, mb, 0, _1)); _tree->create<double>(mb_path / "tx_dsps/0/freq/value") .coerce(boost::bind(&usrp2_impl::set_tx_dsp_freq, this, mb, _1)); _tree->create<meta_range_t>(mb_path / "tx_dsps/0/freq/range") @@ -594,17 +596,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){ this->io_init(); //do some post-init tasks + this->update_rates(); BOOST_FOREACH(const std::string &mb, _mbc.keys()){ fs_path root = "/mboards/" + mb; - _tree->access<double>(root / "tick_rate").update(); - - //and now that the tick rate is set, init the host rates to something - BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){ - _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").set(1e6); - } - BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){ - _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").set(1e6); - } _tree->access<subdev_spec_t>(root / "rx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_rx_subdev_names()[0])); _tree->access<subdev_spec_t>(root / "tx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_tx_subdev_names()[0])); diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp index 6f133f411..566c93853 100644 --- a/host/lib/usrp/usrp2/usrp2_impl.hpp +++ b/host/lib/usrp/usrp2/usrp2_impl.hpp @@ -31,7 +31,6 @@ #include <uhd/device.hpp> #include <uhd/utils/pimpl.hpp> #include <uhd/types/dict.hpp> -#include <uhd/types/otw_type.hpp> #include <uhd/types/stream_cmd.hpp> #include <uhd/types/clock_config.hpp> #include <uhd/usrp/dboard_eeprom.hpp> @@ -73,18 +72,8 @@ public: ~usrp2_impl(void); //the io interface - size_t send( - const send_buffs_type &, size_t, - const uhd::tx_metadata_t &, const uhd::io_type_t &, - uhd::device::send_mode_t, double - ); - size_t recv( - const recv_buffs_type &, size_t, - uhd::rx_metadata_t &, const uhd::io_type_t &, - uhd::device::recv_mode_t, double - ); - size_t get_max_send_samps_per_packet(void) const; - size_t get_max_recv_samps_per_packet(void) const; + uhd::rx_streamer::sptr get_rx_streamer(const uhd::streamer_args &args); + uhd::tx_streamer::sptr get_tx_streamer(const uhd::streamer_args &args); bool recv_async_msg(uhd::async_metadata_t &, double); private: @@ -97,6 +86,8 @@ private: rx_frontend_core_200::sptr rx_fe; tx_frontend_core_200::sptr tx_fe; std::vector<rx_dsp_core_200::sptr> rx_dsps; + std::vector<boost::weak_ptr<uhd::streamer> > rx_streamers; + std::vector<boost::weak_ptr<uhd::streamer> > tx_streamers; tx_dsp_core_200::sptr tx_dsp; time64_core_200::sptr time64; std::vector<uhd::transport::zero_copy_if::sptr> rx_dsp_xports; @@ -120,15 +111,15 @@ private: } //io impl methods and members - uhd::otw_type_t _rx_otw_type, _tx_otw_type; UHD_PIMPL_DECL(io_impl) _io_impl; void io_init(void); void update_tick_rate(const double rate); - void update_rx_samp_rate(const double rate); - void update_tx_samp_rate(const double rate); + void update_rx_samp_rate(const std::string &, const size_t, const double rate); + void update_tx_samp_rate(const std::string &, const size_t, const double rate); + void update_rates(void); //update spec methods are coercers until we only accept db_name == A - uhd::usrp::subdev_spec_t update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); - uhd::usrp::subdev_spec_t update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); + void update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); + void update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &); double set_tx_dsp_freq(const std::string &, const double); uhd::meta_range_t get_tx_dsp_freq_range(const std::string &); void update_clock_source(const std::string &, const std::string &); |