diff options
| author | Josh Blum <josh@joshknows.com> | 2011-07-01 16:54:53 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2011-07-01 16:54:53 -0700 | 
| commit | 614d4901bb9dbb6866a0c5f57b1397e4b185a11f (patch) | |
| tree | d2c0308af2464ae3b15e2079a25d8f24d46f0caa | |
| parent | 6aa4690af05559d28b5238fa153fd46ff57cf06f (diff) | |
| download | uhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.tar.gz uhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.tar.bz2 uhd-614d4901bb9dbb6866a0c5f57b1397e4b185a11f.zip | |
usrp: created common code to demux an rx stream (b100, e100)
| -rw-r--r-- | host/examples/rx_multi_samples.cpp | 2 | ||||
| -rw-r--r-- | host/lib/usrp/b100/io_impl.cpp | 52 | ||||
| -rw-r--r-- | host/lib/usrp/common/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | host/lib/usrp/common/recv_packet_demuxer.cpp | 87 | ||||
| -rw-r--r-- | host/lib/usrp/common/recv_packet_demuxer.hpp | 41 | ||||
| -rw-r--r-- | host/lib/usrp/common/validate_subdev_spec.hpp | 1 | ||||
| -rw-r--r-- | host/lib/usrp/e100/io_impl.cpp | 56 | 
7 files changed, 152 insertions, 96 deletions
| diff --git a/host/examples/rx_multi_samples.cpp b/host/examples/rx_multi_samples.cpp index e820343ca..08b2d399d 100644 --- a/host/examples/rx_multi_samples.cpp +++ b/host/examples/rx_multi_samples.cpp @@ -44,7 +44,7 @@ int UHD_SAFE_MAIN(int argc, char *argv[]){          ("nsamps", po::value<size_t>(&total_num_samps)->default_value(10000), "total number of samples to receive")          ("rate", po::value<double>(&rate)->default_value(100e6/16), "rate of incoming samples")          ("sync", po::value<std::string>(&sync)->default_value("now"), "synchronization method: now, pps") -        ("subdev", po::value<std::string>(&subdev)->default_value(""), "subdev spec (homogeneous across motherboards)") +        ("subdev", po::value<std::string>(&subdev), "subdev spec (homogeneous across motherboards)")          ("dilv", "specify to disable inner-loop verbose")      ;      po::variables_map vm; diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp index 5b38a14ac..fbf341c7f 100644 --- a/host/lib/usrp/b100/io_impl.cpp +++ b/host/lib/usrp/b100/io_impl.cpp @@ -15,6 +15,7 @@  // along with this program.  If not, see <http://www.gnu.org/licenses/>.  // +#include "recv_packet_demuxer.hpp"  #include "validate_subdev_spec.hpp"  #include "../../transport/super_recv_packet_handler.hpp"  #include "../../transport/super_send_packet_handler.hpp" @@ -25,7 +26,6 @@  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/bind.hpp>  #include <boost/format.hpp> -#include <boost/asio.hpp>  #include <boost/bind.hpp>  #include <boost/thread.hpp>  #include <uhd/utils/msg.hpp> @@ -34,55 +34,18 @@  using namespace uhd;  using namespace uhd::usrp;  using namespace uhd::transport; -namespace asio = boost::asio;  /***********************************************************************   * IO Implementation Details   **********************************************************************/  struct b100_impl::io_impl{ -    io_impl(zero_copy_if::sptr data_transport, const size_t recv_width): -        data_transport(data_transport), async_msg_fifo(100/*messages deep*/) -    { -        for (size_t i = 0; i < recv_width; i++){ -            typedef bounded_buffer<managed_recv_buffer::sptr> buffs_queue_type; -            _buffs_queue.push_back(new buffs_queue_type(data_transport->get_num_recv_frames())); -        } -    } - -    ~io_impl(void){ -        for (size_t i = 0; i < _buffs_queue.size(); i++){ -            delete _buffs_queue[i]; -        } -    } +    io_impl(void): +        async_msg_fifo(100/*messages deep*/) +    { /* NOP */ }      zero_copy_if::sptr data_transport;      bounded_buffer<async_metadata_t> async_msg_fifo; -    std::vector<bounded_buffer<managed_recv_buffer::sptr> *> _buffs_queue; - -    //gets buffer, determines if its the requested index, -    //and either queues the buffer or returns the buffer -    managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){ -        while (true){ -            managed_recv_buffer::sptr buff; - -            //attempt to pop a buffer from the queue -            if (_buffs_queue[index]->pop_with_haste(buff)) return buff; - -            //otherwise, call into the transport -            buff = data_transport->get_recv_buff(timeout); -            if (buff.get() == NULL) return buff; //timeout - -            //check the stream id to know which channel -            const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>(); -            const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - B100_RX_SID_BASE; -            if (rx_index == index) return buff; //got expected message - -            //otherwise queue and try again -            if (rx_index < _buffs_queue.size()) _buffs_queue[rx_index]->push_with_pop_on_full(buff); -            else UHD_MSG(error) << "Got a data packet with known SID " << uhd::wtohx(vrt_hdr[1]) << std::endl; -        } -    } - +    recv_packet_demuxer::sptr demuxer;      sph::recv_packet_handler recv_handler;      sph::send_packet_handler send_handler;  }; @@ -109,7 +72,8 @@ void b100_impl::io_init(void){      _fpga_ctrl->poke32(B100_REG_MISC_RX_LEN, 4);      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport, _rx_dsps.size())); +    _io_impl = UHD_PIMPL_MAKE(io_impl, ()); +    _io_impl->demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE);      //now its safe to register the async callback      _fpga_ctrl->set_async_cb(boost::bind(&b100_impl::handle_async_message, this, _1)); @@ -193,7 +157,7 @@ void b100_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){      for (size_t i = 0; i < _io_impl->recv_handler.size(); i++){          _rx_dsps[i]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this          _io_impl->recv_handler.set_xport_chan_get_buff(i, boost::bind( -            &b100_impl::io_impl::get_recv_buff, _io_impl.get(), i, _1 +            &recv_packet_demuxer::get_recv_buff, _io_impl->demuxer, i, _1          ));          _io_impl->recv_handler.set_overflow_handler(i, boost::bind(&rx_dsp_core_200::handle_overflow, _rx_dsps[i]));      } diff --git a/host/lib/usrp/common/CMakeLists.txt b/host/lib/usrp/common/CMakeLists.txt index 31004d952..ec8b60fec 100644 --- a/host/lib/usrp/common/CMakeLists.txt +++ b/host/lib/usrp/common/CMakeLists.txt @@ -20,10 +20,15 @@  ########################################################################  IF(ENABLE_USB)      INCLUDE_DIRECTORIES(${CMAKE_SOURCE_DIR}/../firmware/fx2/common) -    INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})      LIBUHD_APPEND_SOURCES(          ${CMAKE_CURRENT_SOURCE_DIR}/fx2_ctrl.cpp -        ${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp      )  ENDIF(ENABLE_USB) + +INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) + +LIBUHD_APPEND_SOURCES( +    ${CMAKE_CURRENT_SOURCE_DIR}/validate_subdev_spec.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/recv_packet_demuxer.cpp +) diff --git a/host/lib/usrp/common/recv_packet_demuxer.cpp b/host/lib/usrp/common/recv_packet_demuxer.cpp new file mode 100644 index 000000000..93f423aeb --- /dev/null +++ b/host/lib/usrp/common/recv_packet_demuxer.cpp @@ -0,0 +1,87 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include "recv_packet_demuxer.hpp" +#include <uhd/utils/msg.hpp> +#include <uhd/utils/byteswap.hpp> +#include <boost/thread/mutex.hpp> +#include <queue> +#include <deque> +#include <vector> + +using namespace uhd; +using namespace uhd::usrp; +using namespace uhd::transport; + +static UHD_INLINE boost::uint32_t extract_sid(managed_recv_buffer::sptr &buff){ +    //ASSUME that the data is in little endian format +    return uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]); +} + +class recv_packet_demuxer_impl : public uhd::usrp::recv_packet_demuxer{ +public: +    recv_packet_demuxer_impl( +        transport::zero_copy_if::sptr transport, +        const size_t size, +        const boost::uint32_t sid_base +    ): +        _transport(transport), _sid_base(sid_base), _queues(size) +    { +        /* NOP */ +    } + +    managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){ +        boost::mutex::scoped_lock lock(_mutex); +        managed_recv_buffer::sptr buff; + +        //there is already an entry in the queue, so pop that +        if (not _queues[index].wrapper.empty()){ +            std::swap(buff, _queues[index].wrapper.front()); +            _queues[index].wrapper.pop(); +            return buff; +        } + +        while (true){ +            //otherwise call into the transport +            buff = _transport->get_recv_buff(timeout); +            if (buff.get() == NULL) return buff; //timeout + +            //check the stream id to know which channel +            const size_t rx_index = extract_sid(buff) - _sid_base; +            if (rx_index == index) return buff; //got expected message + +            //otherwise queue and try again +            if (rx_index < _queues.size()) _queues[rx_index].wrapper.push(buff); +            else UHD_MSG(error) << "Got a data packet with known SID " << extract_sid(buff) << std::endl; +        } +    } + +private: +    transport::zero_copy_if::sptr _transport; +    const boost::uint32_t _sid_base; +    boost::mutex _mutex; +    struct channel_guts_type{ +        channel_guts_type(void): wrapper(container){} +        std::deque<managed_recv_buffer::sptr> container; +        std::queue<managed_recv_buffer::sptr> wrapper; +    }; +    std::vector<channel_guts_type> _queues; +}; + +recv_packet_demuxer::sptr recv_packet_demuxer::make(transport::zero_copy_if::sptr transport, const size_t size, const boost::uint32_t sid_base){ +    return sptr(new recv_packet_demuxer_impl(transport, size, sid_base)); +} diff --git a/host/lib/usrp/common/recv_packet_demuxer.hpp b/host/lib/usrp/common/recv_packet_demuxer.hpp new file mode 100644 index 000000000..fde756d27 --- /dev/null +++ b/host/lib/usrp/common/recv_packet_demuxer.hpp @@ -0,0 +1,41 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP +#define INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/cstdint.hpp> + +namespace uhd{ namespace usrp{ + +    class recv_packet_demuxer{ +    public: +        typedef boost::shared_ptr<recv_packet_demuxer> sptr; + +        //! Make a new demuxer from a transport and parameters +        static sptr make(transport::zero_copy_if::sptr transport, const size_t size, const boost::uint32_t sid_base); + +        //! Get a buffer at the given index from the transport +        virtual transport::managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout) = 0; +    }; + +}} //namespace uhd::usrp + +#endif /* INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_HPP */ diff --git a/host/lib/usrp/common/validate_subdev_spec.hpp b/host/lib/usrp/common/validate_subdev_spec.hpp index e73b3e1ee..7d9e2c309 100644 --- a/host/lib/usrp/common/validate_subdev_spec.hpp +++ b/host/lib/usrp/common/validate_subdev_spec.hpp @@ -20,7 +20,6 @@  #include <uhd/config.hpp>  #include <uhd/usrp/subdev_spec.hpp> -#include <uhd/utils/assert_has.hpp>  #include <uhd/property_tree.hpp>  #include <string> diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp index 7af2515a9..14b348f96 100644 --- a/host/lib/usrp/e100/io_impl.cpp +++ b/host/lib/usrp/e100/io_impl.cpp @@ -15,6 +15,7 @@  // along with this program.  If not, see <http://www.gnu.org/licenses/>.  // +#include "recv_packet_demuxer.hpp"  #include "validate_subdev_spec.hpp"  #include "../../transport/super_recv_packet_handler.hpp"  #include "../../transport/super_send_packet_handler.hpp" @@ -46,59 +47,17 @@ using namespace uhd::transport;   * - vrt packet handler states   **********************************************************************/  struct e100_impl::io_impl{ -    io_impl(zero_copy_if::sptr data_transport, const size_t recv_width): -        false_alarm(0), -        data_transport(data_transport), -        async_msg_fifo(100/*messages deep*/) -    { -        for (size_t i = 0; i < recv_width; i++){ -            typedef bounded_buffer<managed_recv_buffer::sptr> buffs_queue_type; -            _buffs_queue.push_back(new buffs_queue_type(data_transport->get_num_recv_frames())); -        } -    } - -    ~io_impl(void){ -        recv_pirate_crew.interrupt_all(); -        recv_pirate_crew.join_all(); -        for (size_t i = 0; i < _buffs_queue.size(); i++){ -            delete _buffs_queue[i]; -        } -    } +    io_impl(void): +        false_alarm(0), async_msg_fifo(100/*messages deep*/) +    { /* NOP */ }      double tick_rate; //set by update tick rate method      e100_ctrl::sptr iface; //so handle irq can peek and poke      void handle_irq(void);      size_t false_alarm; - -    std::vector<bounded_buffer<managed_recv_buffer::sptr> *> _buffs_queue; - -    //gets buffer, determines if its the requested index, -    //and either queues the buffer or returns the buffer -    managed_recv_buffer::sptr get_recv_buff(const size_t index, const double timeout){ -        while (true){ -            managed_recv_buffer::sptr buff; - -            //attempt to pop a buffer from the queue -            if (_buffs_queue[index]->pop_with_haste(buff)) return buff; - -            //otherwise, call into the transport -            buff = data_transport->get_recv_buff(timeout); -            if (buff.get() == NULL) return buff; //timeout - -            //check the stream id to know which channel -            const boost::uint32_t *vrt_hdr = buff->cast<const boost::uint32_t *>(); -            const size_t rx_index = uhd::wtohx(vrt_hdr[1]) - E100_RX_SID_BASE; -            if (rx_index == index) return buff; //got expected message - -            //otherwise queue and try again -            if (rx_index < _buffs_queue.size()) _buffs_queue[rx_index]->push_with_pop_on_full(buff); -            else UHD_MSG(error) << "Got a data packet with known SID " << uhd::wtohx(vrt_hdr[1]) << std::endl; -        } -    } -      //The data transport is listed first so that it is deconstructed last,      //which is after the states and booty which may hold managed buffers. -    zero_copy_if::sptr data_transport; +    recv_packet_demuxer::sptr demuxer;      //state management for the vrt packet handler code      sph::recv_packet_handler recv_handler; @@ -213,7 +172,8 @@ void e100_impl::io_init(void){      _tx_otw_type.byteorder = uhd::otw_type_t::BO_LITTLE_ENDIAN;      //create new io impl -    _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport, _rx_dsps.size())); +    _io_impl = UHD_PIMPL_MAKE(io_impl, ()); +    _io_impl->demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), E100_RX_SID_BASE);      _io_impl->iface = _fpga_ctrl;      //clear state machines @@ -280,7 +240,7 @@ void e100_impl::update_rx_subdev_spec(const uhd::usrp::subdev_spec_t &spec){      for (size_t i = 0; i < _io_impl->recv_handler.size(); i++){          _rx_dsps[i]->set_nsamps_per_packet(get_max_recv_samps_per_packet()); //seems to be a good place to set this          _io_impl->recv_handler.set_xport_chan_get_buff(i, boost::bind( -            &e100_impl::io_impl::get_recv_buff, _io_impl.get(), i, _1 +            &recv_packet_demuxer::get_recv_buff, _io_impl->demuxer, i, _1          ));          _io_impl->recv_handler.set_overflow_handler(i, boost::bind(&rx_dsp_core_200::handle_overflow, _rx_dsps[i]));      } | 
