diff options
Diffstat (limited to 'host')
| -rw-r--r-- | host/lib/usrp/b100/b100_impl.cpp | 2 | ||||
| -rw-r--r-- | host/lib/usrp/b100/b100_impl.hpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/b100/io_impl.cpp | 8 | ||||
| -rw-r--r-- | host/lib/usrp/common/recv_packet_demuxer_3000.hpp | 114 | 
4 files changed, 119 insertions, 9 deletions
| diff --git a/host/lib/usrp/b100/b100_impl.cpp b/host/lib/usrp/b100/b100_impl.cpp index b3237d547..1673f5f3c 100644 --- a/host/lib/usrp/b100/b100_impl.cpp +++ b/host/lib/usrp/b100/b100_impl.cpp @@ -472,7 +472,7 @@ b100_impl::b100_impl(const device_addr_t &device_addr){      }      //initialize io handling -    _recv_demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE); +    _recv_demuxer.reset(new recv_packet_demuxer_3000(_data_transport));      //allocate streamer weak ptrs containers      _rx_streamers.resize(_rx_dsps.size()); diff --git a/host/lib/usrp/b100/b100_impl.hpp b/host/lib/usrp/b100/b100_impl.hpp index 68d7043a1..5f4899eb3 100644 --- a/host/lib/usrp/b100/b100_impl.hpp +++ b/host/lib/usrp/b100/b100_impl.hpp @@ -29,7 +29,7 @@  #include "time64_core_200.hpp"  #include "fifo_ctrl_excelsior.hpp"  #include "user_settings_core_200.hpp" -#include "recv_packet_demuxer.hpp" +#include "recv_packet_demuxer_3000.hpp"  #include <uhd/device.hpp>  #include <uhd/property_tree.hpp>  #include <uhd/types/dict.hpp> @@ -105,7 +105,7 @@ private:      //transports      uhd::transport::zero_copy_if::sptr _ctrl_transport;      uhd::transport::zero_copy_if::sptr _data_transport; -    uhd::usrp::recv_packet_demuxer::sptr _recv_demuxer; +    boost::shared_ptr<uhd::usrp::recv_packet_demuxer_3000> _recv_demuxer;      //dboard stuff      uhd::usrp::dboard_manager::sptr _dboard_manager; diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp index bcf712cd9..9b5d4d25c 100644 --- a/host/lib/usrp/b100/io_impl.cpp +++ b/host/lib/usrp/b100/io_impl.cpp @@ -1,5 +1,5 @@  // -// Copyright 2011-2012 Ettus Research LLC +// Copyright 2011-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -153,17 +153,13 @@ rx_streamer::sptr b100_impl::get_rx_stream(const uhd::stream_args_t &args_){      id.num_outputs = 1;      my_streamer->set_converter(id); -    //flush stuff -    _data_transport->get_recv_buff(-1.0); //negative flushes!! -    _recv_demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE); -      //bind callbacks for the handler      for (size_t chan_i = 0; chan_i < args.channels.size(); chan_i++){          const size_t dsp = args.channels[chan_i];          _rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this          _rx_dsps[dsp]->setup(args);          my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( -            &recv_packet_demuxer::get_recv_buff, _recv_demuxer, dsp, _1 +            &recv_packet_demuxer_3000::get_recv_buff, _recv_demuxer, B100_RX_SID_BASE + dsp, _1          ), true /*flush*/);          my_streamer->set_overflow_handler(chan_i, boost::bind(              &rx_dsp_core_200::handle_overflow, _rx_dsps[dsp] diff --git a/host/lib/usrp/common/recv_packet_demuxer_3000.hpp b/host/lib/usrp/common/recv_packet_demuxer_3000.hpp new file mode 100644 index 000000000..d4c899f1c --- /dev/null +++ b/host/lib/usrp/common/recv_packet_demuxer_3000.hpp @@ -0,0 +1,114 @@ +// +// Copyright 2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP +#define INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/cstdint.hpp> +#include <boost/thread.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/atomic.hpp> +#include <uhd/types/time_spec.hpp> +#include <uhd/utils/byteswap.hpp> +#include <queue> +#include <map> + +namespace uhd{ namespace usrp{ + +    struct recv_packet_demuxer_3000 +    { +        recv_packet_demuxer_3000(transport::zero_copy_if::sptr xport): +            _xport(xport) +        {/*NOP*/} + +        transport::managed_recv_buffer::sptr get_recv_buff(const boost::uint32_t sid, const double timeout) +        { +            const time_spec_t exit_time = time_spec_t(timeout) + time_spec_t::get_system_time(); +            transport::managed_recv_buffer::sptr buff; +            buff = _internal_get_recv_buff(sid, timeout); +            while (not buff) //loop until timeout +            { +                const time_spec_t delta = exit_time - time_spec_t::get_system_time(); +                const double new_timeout = delta.get_real_secs(); +                if (new_timeout < 0.0) break; +                buff = _internal_get_recv_buff(sid, new_timeout); +            } +            return buff; +        } + +        transport::managed_recv_buffer::sptr _internal_get_recv_buff(const boost::uint32_t sid, const double timeout) +        { +            transport::managed_recv_buffer::sptr buff; + +            //---------------------------------------------------------- +            //-- Check the queue to see if we already have a buffer +            //---------------------------------------------------------- +            { +                boost::mutex::scoped_lock l(mutex); +                queue_type_t &queue = _queues[sid]; +                if (not queue.empty()) +                { +                    buff = queue.front(); +                    queue.pop(); +                    return buff; +                } +            } + +            //---------------------------------------------------------- +            //-- Try to claim the transport or wait patiently +            //---------------------------------------------------------- +            if (_claimed.cas(1, 0)) +            { +                boost::mutex::scoped_lock l(mutex); +                cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6))); +            } + +            //---------------------------------------------------------- +            //-- Wait on the transport for input buffers +            //---------------------------------------------------------- +            else +            { +                buff = _xport->get_recv_buff(timeout); +                if (buff) +                { +                    const boost::uint32_t new_sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]); +                    if (new_sid != sid) +                    { +                        boost::mutex::scoped_lock l(mutex); +                        _queues[new_sid].push(buff); +                        buff.reset(); +                    } +                } +                _claimed.write(0); +                cond.notify_all(); +            } +            return buff; +        } + +        typedef std::queue<transport::managed_recv_buffer::sptr> queue_type_t; +        std::map<boost::uint32_t, queue_type_t> _queues; +        transport::zero_copy_if::sptr _xport; +        uhd::atomic_uint32_t _claimed; +        boost::condition_variable cond; +        boost::mutex mutex; +    }; + +}} //namespace uhd::usrp + +#endif /* INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP */ | 
