aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-10-04 18:08:47 -0700
committerJosh Blum <josh@joshknows.com>2011-11-03 20:37:10 -0700
commit66c95c12a1d2f7b79bdf5b9b871d1b957c56606f (patch)
tree120f3106559b856c7ab64579663b962f5827f6f1
parentabed04b3c6d70c260d8725831b8aa6e944f52749 (diff)
downloaduhd-66c95c12a1d2f7b79bdf5b9b871d1b957c56606f.tar.gz
uhd-66c95c12a1d2f7b79bdf5b9b871d1b957c56606f.tar.bz2
uhd-66c95c12a1d2f7b79bdf5b9b871d1b957c56606f.zip
uhd: lots of work releated to streamer work and usrp2 implementation
-rw-r--r--host/include/uhd/device.hpp8
-rw-r--r--host/include/uhd/device_deprecated.ipp46
-rw-r--r--host/include/uhd/streamer.hpp81
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp27
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp27
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp270
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.cpp20
-rw-r--r--host/lib/usrp/usrp2/usrp2_impl.hpp27
8 files changed, 302 insertions, 204 deletions
diff --git a/host/include/uhd/device.hpp b/host/include/uhd/device.hpp
index f76739907..c96139858 100644
--- a/host/include/uhd/device.hpp
+++ b/host/include/uhd/device.hpp
@@ -76,11 +76,11 @@ public:
*/
static sptr make(const device_addr_t &hint, size_t which = 0);
- //! Make a new receive streamer given the list of channels
- virtual rx_streamer::sptr get_rx_streamer(const std::vector<size_t> &channels) = 0;
+ //! Make a new receive streamer from the streamer arguments
+ virtual rx_streamer::sptr get_rx_streamer(const streamer_args &args) = 0;
- //! Make a new transmit streamer given the list of channels
- virtual tx_streamer::sptr get_tx_streamer(const std::vector<size_t> &channels) = 0;
+ //! Make a new transmit streamer from the streamer arguments
+ virtual tx_streamer::sptr get_tx_streamer(const streamer_args &args) = 0;
/*!
* Receive and asynchronous message from the device.
diff --git a/host/include/uhd/device_deprecated.ipp b/host/include/uhd/device_deprecated.ipp
index ba0edb1bc..4647a050e 100644
--- a/host/include/uhd/device_deprecated.ipp
+++ b/host/include/uhd/device_deprecated.ipp
@@ -77,16 +77,16 @@ size_t send(
send_mode_t send_mode,
double timeout = 0.1
){
- if (_tx_streamer.get() == NULL or _tx_streamer->get_num_channels() != buffs.size()){
+ if (_tx_streamer.get() == NULL or _tx_streamer->get_num_channels() != buffs.size() or _send_tid != io_type.tid){
+ _send_tid = io_type.tid;
std::vector<size_t> chans(buffs.size());
for (size_t ch = 0; ch < chans.size(); ch++) chans[ch] = ch;
_tx_streamer.reset(); //cleanup possible old one
- _tx_streamer = get_tx_streamer(chans);
- _send_tid = io_type_t::CUSTOM_TYPE;
- }
- if (io_type.tid != _send_tid){
- _send_tid = io_type.tid;
- _tx_streamer->set_format((_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16", "sc16");
+ streamer_args args;
+ args.cpu_format = (_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16";
+ args.otw_format = "sc16";
+ args.channels = chans;
+ _tx_streamer = get_tx_streamer(args);
}
const size_t nsamps = (send_mode == SEND_MODE_ONE_PACKET)?
std::min(nsamps_per_buff, get_max_send_samps_per_packet()) :
@@ -133,16 +133,16 @@ size_t recv(
recv_mode_t recv_mode,
double timeout = 0.1
){
- if (_rx_streamer.get() == NULL or _rx_streamer->get_num_channels() != buffs.size()){
+ if (_rx_streamer.get() == NULL or _rx_streamer->get_num_channels() != buffs.size() or _recv_tid != io_type.tid){
+ _recv_tid = io_type.tid;
std::vector<size_t> chans(buffs.size());
for (size_t ch = 0; ch < chans.size(); ch++) chans[ch] = ch;
_rx_streamer.reset(); //cleanup possible old one
- _rx_streamer = get_rx_streamer(chans);
- _recv_tid = io_type_t::CUSTOM_TYPE;
- }
- if (io_type.tid != _recv_tid){
- _recv_tid = io_type.tid;
- _rx_streamer->set_format((_recv_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16", "sc16");
+ streamer_args args;
+ args.cpu_format = (_send_tid == io_type_t::COMPLEX_FLOAT32)? "fc32" : "sc16";
+ args.otw_format = "sc16";
+ args.channels = chans;
+ _rx_streamer = get_rx_streamer(args);
}
const size_t nsamps = (recv_mode == RECV_MODE_ONE_PACKET)?
std::min(nsamps_per_buff, get_max_recv_samps_per_packet()) :
@@ -156,10 +156,13 @@ size_t recv(
*/
size_t get_max_send_samps_per_packet(void){
if (_tx_streamer.get() == NULL){
- std::vector<size_t> chans(1, 0);
- _tx_streamer = get_tx_streamer(chans);
+ streamer_args args;
+ args.cpu_format = "fc32";
+ args.otw_format = "sc16";
+ _tx_streamer = get_tx_streamer(args);
+ _send_tid = io_type_t::COMPLEX_FLOAT32;
}
- return _tx_streamer->get_items_per_packet();
+ return _tx_streamer->get_max_num_samps();
}
/*!
@@ -168,10 +171,13 @@ size_t get_max_send_samps_per_packet(void){
*/
size_t get_max_recv_samps_per_packet(void){
if (_rx_streamer.get() == NULL){
- std::vector<size_t> chans(1, 0);
- _rx_streamer = get_rx_streamer(chans);
+ streamer_args args;
+ args.cpu_format = "fc32";
+ args.otw_format = "sc16";
+ _rx_streamer = get_rx_streamer(args);
+ _recv_tid = io_type_t::COMPLEX_FLOAT32;
}
- return _rx_streamer->get_items_per_packet();
+ return _rx_streamer->get_max_num_samps();
}
private:
diff --git a/host/include/uhd/streamer.hpp b/host/include/uhd/streamer.hpp
index d96eeb2b7..7a80d0f36 100644
--- a/host/include/uhd/streamer.hpp
+++ b/host/include/uhd/streamer.hpp
@@ -23,23 +23,30 @@
#include <uhd/types/ref_vector.hpp>
#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
+#include <vector>
#include <string>
namespace uhd{
/*!
- * A streamer is the host interface to RX or TX samples.
- * It represents the layer between the samples on the host
- * and samples inside the device's DSP processing.
+ * A struct of parameters to construct a streamer.
+ *
+ * Note:
+ * Not all combinations of CPU and OTW format have conversion support.
+ * You may however write and register your own conversion routines.
*/
-class UHD_API streamer : boost::noncopyable{
-public:
- //! Get the number of channels associated with this streamer
- virtual size_t get_num_channels(void) const = 0;
+struct UHD_API streamer_args{
+
+ //! Convenience constructor for streamer args
+ streamer_args(
+ const std::string &cpu = "fc32",
+ const std::string &otw = "sc16"
+ ){
+ cpu_format = cpu;
+ otw_format = otw;
+ }
/*!
- * \brief Set the format for all channels in this streamer.
- *
* The CPU format is a string that describes the format of host memory.
* Common CPU formats are:
* - fc32 - complex<float>
@@ -50,43 +57,47 @@ public:
* - f64 - double
* - s16 - int16_t
* - s8 - int8_t
- *
+ */
+ std::string cpu_format;
+
+ /*!
* The OTW format is a string that describes the format over-the-wire.
* Common OTW format are:
* - sc16 - Q16 I16
* - sc8 - Q8_1 I8_1 Q8_0 I8_0
* - s16 - R16_1 R16_0
* - s8 - R8_3 R8_2 R8_1 R8_0
- *
+ */
+ std::string otw_format;
+
+ /*!
* The args parameter is currently unused. Leave it blank.
* The intention is that a user with a custom DSP design
* may want to pass args and do something special with it.
- *
- * Note:
- * Not all combinations of CPU and OTW format have conversion support.
- * You may however write and register your own conversion routines.
- *
- * \param cpu_format the data format for samples used on the host
- * \param otw_format the data format of the samples over the wire
- * \param args optional arguments to augment the format
*/
- virtual void set_format(
- const std::string &cpu_format,
- const std::string &otw_format,
- const std::string &args = ""
- ) = 0;
-
- //! Get the number of bytes per CPU item/sample
- virtual size_t get_bytes_per_cpu_item(void) const = 0;
-
- //! Get the number of bytes per OTW item/sample
- virtual size_t get_bytes_per_otw_item(void) const = 0;
+ std::string args;
- //! Get the max number of items/samples per packet
- virtual size_t get_items_per_packet(void) const = 0;
+ /*!
+ * The channels is a list of channel numbers.
+ * Leave this blank to default to channel 0.
+ * Set channels for a multi-channel application.
+ * Channel mapping depends on the front-end selection.
+ */
+ std::vector<size_t> channels;
+};
- //TODO enumerate cpu and otw format options
+/*!
+ * A streamer is the host interface to RX or TX samples.
+ * It represents the layer between the samples on the host
+ * and samples inside the device's DSP processing.
+ */
+class UHD_API streamer : boost::noncopyable{
+public:
+ //! Get the number of channels associated with this streamer
+ virtual size_t get_num_channels(void) const = 0;
+ //! Get the max number of samples per buffer per packet
+ virtual size_t get_max_num_samps(void) const = 0;
};
//! A receive streamer to receive host samples
@@ -124,7 +135,7 @@ public:
*/
virtual size_t recv(
const buffs_type &buffs,
- size_t nsamps_per_buff,
+ const size_t nsamps_per_buff,
rx_metadata_t &metadata,
double timeout = 0.1
) = 0;
@@ -161,7 +172,7 @@ public:
*/
virtual size_t send(
const buffs_type &buffs,
- size_t nsamps_per_buff,
+ const size_t nsamps_per_buff,
const tx_metadata_t &metadata,
double timeout = 0.1
) = 0;
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 4ae51e146..f0b053473 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -562,6 +562,33 @@ private:
}
};
+class recv_packet_streamer : public recv_packet_handler, public rx_streamer{
+public:
+ recv_packet_streamer(const size_t max_num_samps){
+ _max_num_samps = max_num_samps;
+ }
+
+ size_t get_num_channels(void) const{
+ return this->size();
+ }
+
+ size_t get_max_num_samps(void) const{
+ return _max_num_samps;
+ }
+
+ size_t recv(
+ const rx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ uhd::rx_metadata_t &metadata,
+ double timeout
+ ){
+ return recv_packet_handler::recv(buffs, nsamps_per_buff, metadata, timeout);
+ }
+
+private:
+ size_t _max_num_samps;
+};
+
}}} //namespace
#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_RECV_PACKET_HANDLER_HPP */
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 6950abb73..edfe9bd13 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -259,6 +259,33 @@ private:
}
};
+class send_packet_streamer : public send_packet_handler, public tx_streamer{
+public:
+ send_packet_streamer(const size_t max_num_samps){
+ _max_num_samps = max_num_samps;
+ }
+
+ size_t get_num_channels(void) const{
+ return this->size();
+ }
+
+ size_t get_max_num_samps(void) const{
+ return _max_num_samps;
+ }
+
+ size_t send(
+ const tx_streamer::buffs_type &buffs,
+ const size_t nsamps_per_buff,
+ const uhd::tx_metadata_t &metadata,
+ double timeout
+ ){
+ return send_packet_handler::send(buffs, nsamps_per_buff, metadata, timeout);
+ }
+
+private:
+ size_t _max_num_samps;
+};
+
}}} //namespace
#endif /* INCLUDED_LIBUHD_TRANSPORT_SUPER_SEND_PACKET_HANDLER_HPP */
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 70331e536..98bfcdd3b 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -31,6 +31,7 @@
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
+#include <boost/make_shared.hpp>
#include <iostream>
using namespace uhd;
@@ -158,10 +159,6 @@ struct usrp2_impl::io_impl{
std::vector<zero_copy_if::sptr> tx_xports;
std::vector<flow_control_monitor::sptr> fc_mons;
- //state management for the vrt packet handler code
- sph::recv_packet_handler recv_handler;
- sph::send_packet_handler send_handler;
-
//methods and variables for the pirate crew
void recv_pirate_loop(zero_copy_if::sptr, size_t);
std::list<task::sptr> pirate_tasks;
@@ -237,17 +234,6 @@ void usrp2_impl::io_impl::recv_pirate_loop(
* Helper Functions
**********************************************************************/
void usrp2_impl::io_init(void){
-
- //setup rx otw type
- _rx_otw_type.width = 16;
- _rx_otw_type.shift = 0;
- _rx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
-
- //setup tx otw type
- _tx_otw_type.width = 16;
- _tx_otw_type.shift = 0;
- _tx_otw_type.byteorder = uhd::otw_type_t::BO_BIG_ENDIAN;
-
//create new io impl
_io_impl = UHD_PIMPL_MAKE(io_impl, ());
@@ -260,6 +246,12 @@ void usrp2_impl::io_init(void){
)));
}
+ //allocate streamer weak ptrs containers
+ BOOST_FOREACH(const std::string &mb, _mbc.keys()){
+ _mbc[mb].rx_streamers.resize(_mbc[mb].rx_dsps.size());
+ _mbc[mb].tx_streamers.resize(1/*known to be 1 dsp*/);
+ }
+
//create a new pirate thread for each zc if (yarr!!)
size_t index = 0;
BOOST_FOREACH(const std::string &mb, _mbc.keys()){
@@ -269,58 +261,68 @@ void usrp2_impl::io_init(void){
_mbc[mb].tx_dsp_xport, index++
)));
}
-
- //init some handler stuff
- _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be);
- _io_impl->recv_handler.set_converter(_rx_otw_type);
- _io_impl->send_handler.set_vrt_packer(&vrt::if_hdr_pack_be, vrt_send_header_offset_words32);
- _io_impl->send_handler.set_converter(_tx_otw_type);
- _io_impl->send_handler.set_max_samples_per_packet(get_max_send_samps_per_packet());
-
- //set the packet threshold to be an entire socket buffer's worth
- const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size());
- _io_impl->recv_handler.set_alignment_failure_threshold(packets_per_sock_buff);
}
void usrp2_impl::update_tick_rate(const double rate){
- _io_impl->tick_rate = rate;
- boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock();
- _io_impl->recv_handler.set_tick_rate(rate);
- boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock();
- _io_impl->send_handler.set_tick_rate(rate);
+ _io_impl->tick_rate = rate; //shadow for async msg
+
+ //update the tick rate on all existing streamers -> thread safe
+ BOOST_FOREACH(const std::string &mb, _mbc.keys()){
+ for (size_t i = 0; i < _mbc[mb].rx_streamers.size(); i++){
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[i].lock());
+ if (my_streamer.get() == NULL) continue;
+ boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock();
+ my_streamer->set_tick_rate(rate);
+ }
+ for (size_t i = 0; i < _mbc[mb].tx_streamers.size(); i++){
+ boost::shared_ptr<sph::send_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::send_packet_streamer>(_mbc[mb].tx_streamers[i].lock());
+ if (my_streamer.get() == NULL) continue;
+ boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock();
+ my_streamer->set_tick_rate(rate);
+ }
+ }
}
-void usrp2_impl::update_rx_samp_rate(const double rate){
- boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock();
- _io_impl->recv_handler.set_samp_rate(rate);
- const double adj = _mbc[_mbc.keys().front()].rx_dsps.front()->get_scaling_adjustment();
- _io_impl->recv_handler.set_scale_factor(adj/32767.);
+void usrp2_impl::update_rx_samp_rate(const std::string &mb, const size_t dsp, const double rate){
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].rx_streamers[dsp].lock());
+ if (my_streamer.get() == NULL) return;
+
+ boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock();
+
+ my_streamer->set_samp_rate(rate);
+ const double adj = _mbc[mb].rx_dsps[dsp]->get_scaling_adjustment();
+ my_streamer->set_scale_factor(adj/32767.);
}
-void usrp2_impl::update_tx_samp_rate(const double rate){
- boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock();
- _io_impl->send_handler.set_samp_rate(rate);
+void usrp2_impl::update_tx_samp_rate(const std::string &mb, const size_t dsp, const double rate){
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer =
+ boost::dynamic_pointer_cast<sph::recv_packet_streamer>(_mbc[mb].tx_streamers[dsp].lock());
+ if (my_streamer.get() == NULL) return;
+
+ boost::mutex::scoped_lock lock = my_streamer->get_scoped_lock();
+
+ my_streamer->set_samp_rate(rate);
}
-static subdev_spec_t replace_zero_in_spec(const std::string &type, const subdev_spec_t &spec){
- subdev_spec_t new_spec;
- BOOST_FOREACH(const subdev_spec_pair_t &pair, spec){
- if (pair.db_name == "0"){
- UHD_MSG(warning)
- << boost::format("In the %s subdevice specification: %s") % type % spec.to_string() << std::endl
- << "Accepting dboard slot name \"0\" for backward compatibility." << std::endl
- << "The official name of the dboard slot on USRP2/N-Series is \"A\"." << std::endl
- ;
- new_spec.push_back(subdev_spec_pair_t("A", pair.sd_name));
+void usrp2_impl::update_rates(void){
+ BOOST_FOREACH(const std::string &mb, _mbc.keys()){
+ fs_path root = "/mboards/" + mb;
+ _tree->access<double>(root / "tick_rate").update();
+
+ //and now that the tick rate is set, init the host rates to something
+ BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){
+ _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").update();
+ }
+ BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){
+ _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").update();
}
- else new_spec.push_back(pair);
}
- return new_spec;
}
-subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){
- const subdev_spec_t spec = replace_zero_in_spec("RX", spec_);
- boost::mutex::scoped_lock recv_lock = _io_impl->recv_handler.get_scoped_lock();
+void usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){
fs_path root = "/mboards/" + which_mb + "/dboards";
//sanity checking
@@ -339,24 +341,9 @@ subdev_spec_t usrp2_impl::update_rx_subdev_spec(const std::string &which_mb, con
_mbc[which_mb].rx_chan_occ = spec.size();
size_t nchan = 0;
BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].rx_chan_occ;
- _io_impl->recv_handler.resize(nchan);
-
- //bind new callbacks for the handler
- size_t chan = 0;
- BOOST_FOREACH(const std::string &mb, _mbc.keys()){
- for (size_t dsp = 0; dsp < _mbc[mb].rx_chan_occ; dsp++){
- _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this
- _io_impl->recv_handler.set_xport_chan_get_buff(chan++, boost::bind(
- &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1
- ));
- }
- }
- return spec;
}
-subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec_){
- const subdev_spec_t spec = replace_zero_in_spec("TX", spec_);
- boost::mutex::scoped_lock send_lock = _io_impl->send_handler.get_scoped_lock();
+void usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, const subdev_spec_t &spec){
fs_path root = "/mboards/" + which_mb + "/dboards";
//sanity checking
@@ -370,18 +357,6 @@ subdev_spec_t usrp2_impl::update_tx_subdev_spec(const std::string &which_mb, con
_mbc[which_mb].tx_chan_occ = spec.size();
size_t nchan = 0;
BOOST_FOREACH(const std::string &mb, _mbc.keys()) nchan += _mbc[mb].tx_chan_occ;
- _io_impl->send_handler.resize(nchan);
-
- //bind new callbacks for the handler
- size_t chan = 0, i = 0;
- BOOST_FOREACH(const std::string &mb, _mbc.keys()){
- for (size_t dsp = 0; dsp < _mbc[mb].tx_chan_occ; dsp++){
- _io_impl->send_handler.set_xport_chan_get_buff(chan++, boost::bind(
- &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), i++, _1
- ));
- }
- }
- return spec;
}
/***********************************************************************
@@ -395,51 +370,118 @@ bool usrp2_impl::recv_async_msg(
}
/***********************************************************************
- * Send Data
+ * Receive streamer
**********************************************************************/
-size_t usrp2_impl::get_max_send_samps_per_packet(void) const{
+rx_streamer::sptr usrp2_impl::get_rx_streamer(const uhd::streamer_args &args){
+ //map an empty channel set to chan0
+ const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels;
+
+ //calculate packet size
static const size_t hdr_size = 0
+ vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
- + vrt_send_header_offset_words32*sizeof(boost::uint32_t)
+ + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
;
- const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size;
- return bpp/_tx_otw_type.get_sample_size();
-}
+ const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size;
+ const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format);
+
+ //make the new streamer given the samples per packet
+ boost::shared_ptr<sph::recv_packet_streamer> my_streamer = boost::make_shared<sph::recv_packet_streamer>(spp);
+
+ //init some streamer stuff
+ my_streamer->resize(channels.size());
+ my_streamer->set_vrt_unpacker(&vrt::if_hdr_unpack_be);
+
+ //set the converter
+ uhd::convert::id_type id;
+ id.input_markup = args.otw_format + "_item32_be";
+ id.num_inputs = 1;
+ id.output_markup = args.cpu_format;
+ id.num_outputs = 1;
+ id.args = args.args;
+ my_streamer->set_converter(id);
+
+ //bind callbacks for the handler
+ for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){
+ const size_t chan = channels[chan_i];
+ size_t num_chan_so_far = 0;
+ BOOST_FOREACH(const std::string &mb, _mbc.keys()){
+ num_chan_so_far += _mbc[mb].rx_chan_occ;
+ if (chan < num_chan_so_far){
+ const size_t dsp = num_chan_so_far - chan - 1;
+ _mbc[mb].rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this
+ my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
+ &zero_copy_if::get_recv_buff, _mbc[mb].rx_dsp_xports[dsp], _1
+ ));
+ _mbc[mb].rx_streamers[dsp] = my_streamer; //store weak pointer
+ break;
+ }
+ }
+ }
-size_t usrp2_impl::send(
- const send_buffs_type &buffs, size_t nsamps_per_buff,
- const tx_metadata_t &metadata, const io_type_t &io_type,
- send_mode_t send_mode, double timeout
-){
- return _io_impl->send_handler.send(
- buffs, nsamps_per_buff,
- metadata, io_type,
- send_mode, timeout
- );
+ //set the packet threshold to be an entire socket buffer's worth
+ const size_t packets_per_sock_buff = size_t(50e6/_mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size());
+ my_streamer->set_alignment_failure_threshold(packets_per_sock_buff);
+
+ //sets all tick and samp rates on this streamer
+ this->update_rates();
+
+ return my_streamer;
}
/***********************************************************************
- * Receive Data
+ * Transmit streamer
**********************************************************************/
-size_t usrp2_impl::get_max_recv_samps_per_packet(void) const{
+tx_streamer::sptr usrp2_impl::get_tx_streamer(const uhd::streamer_args &args){
+ //map an empty channel set to chan0
+ const std::vector<size_t> channels = args.channels.empty()? std::vector<size_t>(1, 0) : args.channels;
+
+ //calculate packet size
static const size_t hdr_size = 0
+ vrt::max_if_hdr_words32*sizeof(boost::uint32_t)
- + sizeof(vrt::if_packet_info_t().tlr) //forced to have trailer
+ + vrt_send_header_offset_words32*sizeof(boost::uint32_t)
- sizeof(vrt::if_packet_info_t().cid) //no class id ever used
;
- const size_t bpp = _mbc[_mbc.keys().front()].rx_dsp_xports[0]->get_recv_frame_size() - hdr_size;
- return bpp/_rx_otw_type.get_sample_size();
-}
+ const size_t bpp = _mbc[_mbc.keys().front()].tx_dsp_xport->get_send_frame_size() - hdr_size;
+ const size_t spp = bpp/convert::get_bytes_per_item(args.otw_format);
+
+ //make the new streamer given the samples per packet
+ boost::shared_ptr<sph::send_packet_streamer> my_streamer = boost::make_shared<sph::send_packet_streamer>(spp);
+
+ //init some streamer stuff
+ my_streamer->resize(channels.size());
+ my_streamer->set_vrt_packer(&vrt::if_hdr_pack_be);
+
+ //set the converter
+ uhd::convert::id_type id;
+ id.input_markup = args.cpu_format;
+ id.num_inputs = 1;
+ id.output_markup = args.otw_format + "_item32_be";
+ id.num_outputs = 1;
+ id.args = args.args;
+ my_streamer->set_converter(id);
+
+ //bind callbacks for the handler
+ for (size_t chan_i = 0; chan_i < channels.size(); chan_i++){
+ const size_t chan = channels[chan_i];
+ size_t num_chan_so_far = 0;
+ size_t abs = 0;
+ BOOST_FOREACH(const std::string &mb, _mbc.keys()){
+ num_chan_so_far += _mbc[mb].tx_chan_occ;
+ if (chan < num_chan_so_far){
+ const size_t dsp = num_chan_so_far - chan - 1;
+ my_streamer->set_xport_chan_get_buff(chan_i, boost::bind(
+ &usrp2_impl::io_impl::get_send_buff, _io_impl.get(), abs, _1
+ ));
+ _mbc[mb].tx_streamers[dsp] = my_streamer; //store weak pointer
+ break;
+ }
+ abs += 1; //assume 1 tx dsp
+ }
+ }
-size_t usrp2_impl::recv(
- const recv_buffs_type &buffs, size_t nsamps_per_buff,
- rx_metadata_t &metadata, const io_type_t &io_type,
- recv_mode_t recv_mode, double timeout
-){
- return _io_impl->recv_handler.recv(
- buffs, nsamps_per_buff,
- metadata, io_type,
- recv_mode, timeout
- );
+ //sets all tick and samp rates on this streamer
+ this->update_rates();
+
+ return my_streamer;
}
diff --git a/host/lib/usrp/usrp2/usrp2_impl.cpp b/host/lib/usrp/usrp2/usrp2_impl.cpp
index 2b541bcf0..2d89ddaf4 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.cpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.cpp
@@ -458,9 +458,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){
);
//TODO lots of properties to expose here for frontends
_tree->create<subdev_spec_t>(mb_path / "rx_subdev_spec")
- .coerce(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1));
+ .subscribe(boost::bind(&usrp2_impl::update_rx_subdev_spec, this, mb, _1));
_tree->create<subdev_spec_t>(mb_path / "tx_subdev_spec")
- .coerce(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1));
+ .subscribe(boost::bind(&usrp2_impl::update_tx_subdev_spec, this, mb, _1));
////////////////////////////////////////////////////////////////
// create rx dsp control objects
@@ -481,8 +481,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){
_mbc[mb].rx_dsp_xports[dspno]->get_recv_buff(0.01).get(); //recv with timeout for expected
fs_path rx_dsp_path = mb_path / str(boost::format("rx_dsps/%u") % dspno);
_tree->create<double>(rx_dsp_path / "rate/value")
+ .set(1e6) //some default
.coerce(boost::bind(&rx_dsp_core_200::set_host_rate, _mbc[mb].rx_dsps[dspno], _1))
- .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, _1));
+ .subscribe(boost::bind(&usrp2_impl::update_rx_samp_rate, this, mb, dspno, _1));
_tree->create<double>(rx_dsp_path / "freq/value")
.coerce(boost::bind(&rx_dsp_core_200::set_freq, _mbc[mb].rx_dsps[dspno], _1));
_tree->create<meta_range_t>(rx_dsp_path / "freq/range")
@@ -501,8 +502,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){
_tree->access<double>(mb_path / "tick_rate")
.subscribe(boost::bind(&tx_dsp_core_200::set_tick_rate, _mbc[mb].tx_dsp, _1));
_tree->create<double>(mb_path / "tx_dsps/0/rate/value")
+ .set(1e6) //some default
.coerce(boost::bind(&tx_dsp_core_200::set_host_rate, _mbc[mb].tx_dsp, _1))
- .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, _1));
+ .subscribe(boost::bind(&usrp2_impl::update_tx_samp_rate, this, mb, 0, _1));
_tree->create<double>(mb_path / "tx_dsps/0/freq/value")
.coerce(boost::bind(&usrp2_impl::set_tx_dsp_freq, this, mb, _1));
_tree->create<meta_range_t>(mb_path / "tx_dsps/0/freq/range")
@@ -594,17 +596,9 @@ usrp2_impl::usrp2_impl(const device_addr_t &_device_addr){
this->io_init();
//do some post-init tasks
+ this->update_rates();
BOOST_FOREACH(const std::string &mb, _mbc.keys()){
fs_path root = "/mboards/" + mb;
- _tree->access<double>(root / "tick_rate").update();
-
- //and now that the tick rate is set, init the host rates to something
- BOOST_FOREACH(const std::string &name, _tree->list(root / "rx_dsps")){
- _tree->access<double>(root / "rx_dsps" / name / "rate" / "value").set(1e6);
- }
- BOOST_FOREACH(const std::string &name, _tree->list(root / "tx_dsps")){
- _tree->access<double>(root / "tx_dsps" / name / "rate" / "value").set(1e6);
- }
_tree->access<subdev_spec_t>(root / "rx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_rx_subdev_names()[0]));
_tree->access<subdev_spec_t>(root / "tx_subdev_spec").set(subdev_spec_t("A:"+_mbc[mb].dboard_manager->get_tx_subdev_names()[0]));
diff --git a/host/lib/usrp/usrp2/usrp2_impl.hpp b/host/lib/usrp/usrp2/usrp2_impl.hpp
index 6f133f411..566c93853 100644
--- a/host/lib/usrp/usrp2/usrp2_impl.hpp
+++ b/host/lib/usrp/usrp2/usrp2_impl.hpp
@@ -31,7 +31,6 @@
#include <uhd/device.hpp>
#include <uhd/utils/pimpl.hpp>
#include <uhd/types/dict.hpp>
-#include <uhd/types/otw_type.hpp>
#include <uhd/types/stream_cmd.hpp>
#include <uhd/types/clock_config.hpp>
#include <uhd/usrp/dboard_eeprom.hpp>
@@ -73,18 +72,8 @@ public:
~usrp2_impl(void);
//the io interface
- size_t send(
- const send_buffs_type &, size_t,
- const uhd::tx_metadata_t &, const uhd::io_type_t &,
- uhd::device::send_mode_t, double
- );
- size_t recv(
- const recv_buffs_type &, size_t,
- uhd::rx_metadata_t &, const uhd::io_type_t &,
- uhd::device::recv_mode_t, double
- );
- size_t get_max_send_samps_per_packet(void) const;
- size_t get_max_recv_samps_per_packet(void) const;
+ uhd::rx_streamer::sptr get_rx_streamer(const uhd::streamer_args &args);
+ uhd::tx_streamer::sptr get_tx_streamer(const uhd::streamer_args &args);
bool recv_async_msg(uhd::async_metadata_t &, double);
private:
@@ -97,6 +86,8 @@ private:
rx_frontend_core_200::sptr rx_fe;
tx_frontend_core_200::sptr tx_fe;
std::vector<rx_dsp_core_200::sptr> rx_dsps;
+ std::vector<boost::weak_ptr<uhd::streamer> > rx_streamers;
+ std::vector<boost::weak_ptr<uhd::streamer> > tx_streamers;
tx_dsp_core_200::sptr tx_dsp;
time64_core_200::sptr time64;
std::vector<uhd::transport::zero_copy_if::sptr> rx_dsp_xports;
@@ -120,15 +111,15 @@ private:
}
//io impl methods and members
- uhd::otw_type_t _rx_otw_type, _tx_otw_type;
UHD_PIMPL_DECL(io_impl) _io_impl;
void io_init(void);
void update_tick_rate(const double rate);
- void update_rx_samp_rate(const double rate);
- void update_tx_samp_rate(const double rate);
+ void update_rx_samp_rate(const std::string &, const size_t, const double rate);
+ void update_tx_samp_rate(const std::string &, const size_t, const double rate);
+ void update_rates(void);
//update spec methods are coercers until we only accept db_name == A
- uhd::usrp::subdev_spec_t update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &);
- uhd::usrp::subdev_spec_t update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &);
+ void update_rx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &);
+ void update_tx_subdev_spec(const std::string &, const uhd::usrp::subdev_spec_t &);
double set_tx_dsp_freq(const std::string &, const double);
uhd::meta_range_t get_tx_dsp_freq_range(const std::string &);
void update_clock_source(const std::string &, const std::string &);