diff options
author | Josh Blum <josh@joshknows.com> | 2013-07-15 15:19:58 -0700 |
---|---|---|
committer | Josh Blum <josh@joshknows.com> | 2013-07-15 15:21:05 -0700 |
commit | 4322666dc134998c1420ee00c6e01eddc207e5b4 (patch) | |
tree | 7f98c7b4c33f8e5ad0e7a3dac79f0acfe2da8cb2 /host | |
parent | a7153fecdb1416df03f21467c52dbb35cb675f6f (diff) | |
download | uhd-4322666dc134998c1420ee00c6e01eddc207e5b4.tar.gz uhd-4322666dc134998c1420ee00c6e01eddc207e5b4.tar.bz2 uhd-4322666dc134998c1420ee00c6e01eddc207e5b4.zip |
b100: switch to new packet demuxer
Diffstat (limited to 'host')
-rw-r--r-- | host/lib/usrp/b100/b100_impl.cpp | 2 | ||||
-rw-r--r-- | host/lib/usrp/b100/b100_impl.hpp | 4 | ||||
-rw-r--r-- | host/lib/usrp/b100/io_impl.cpp | 8 | ||||
-rw-r--r-- | host/lib/usrp/common/recv_packet_demuxer_3000.hpp | 114 |
4 files changed, 119 insertions, 9 deletions
diff --git a/host/lib/usrp/b100/b100_impl.cpp b/host/lib/usrp/b100/b100_impl.cpp index b3237d547..1673f5f3c 100644 --- a/host/lib/usrp/b100/b100_impl.cpp +++ b/host/lib/usrp/b100/b100_impl.cpp @@ -472,7 +472,7 @@ b100_impl::b100_impl(const device_addr_t &device_addr){ } //initialize io handling - _recv_demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE); + _recv_demuxer.reset(new recv_packet_demuxer_3000(_data_transport)); //allocate streamer weak ptrs containers _rx_streamers.resize(_rx_dsps.size()); diff --git a/host/lib/usrp/b100/b100_impl.hpp b/host/lib/usrp/b100/b100_impl.hpp index 68d7043a1..5f4899eb3 100644 --- a/host/lib/usrp/b100/b100_impl.hpp +++ b/host/lib/usrp/b100/b100_impl.hpp @@ -29,7 +29,7 @@ #include "time64_core_200.hpp" #include "fifo_ctrl_excelsior.hpp" #include "user_settings_core_200.hpp" -#include "recv_packet_demuxer.hpp" +#include "recv_packet_demuxer_3000.hpp" #include <uhd/device.hpp> #include <uhd/property_tree.hpp> #include <uhd/types/dict.hpp> @@ -105,7 +105,7 @@ private: //transports uhd::transport::zero_copy_if::sptr _ctrl_transport; uhd::transport::zero_copy_if::sptr _data_transport; - uhd::usrp::recv_packet_demuxer::sptr _recv_demuxer; + boost::shared_ptr<uhd::usrp::recv_packet_demuxer_3000> _recv_demuxer; //dboard stuff uhd::usrp::dboard_manager::sptr _dboard_manager; diff --git a/host/lib/usrp/b100/io_impl.cpp b/host/lib/usrp/b100/io_impl.cpp index bcf712cd9..9b5d4d25c 100644 --- a/host/lib/usrp/b100/io_impl.cpp +++ b/host/lib/usrp/b100/io_impl.cpp @@ -1,5 +1,5 @@ // -// Copyright 2011-2012 Ettus Research LLC +// Copyright 2011-2013 Ettus Research LLC // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU General Public License as published by @@ -153,17 +153,13 @@ rx_streamer::sptr b100_impl::get_rx_stream(const uhd::stream_args_t &args_){ id.num_outputs = 1; my_streamer->set_converter(id); - //flush stuff - _data_transport->get_recv_buff(-1.0); //negative flushes!! - _recv_demuxer = recv_packet_demuxer::make(_data_transport, _rx_dsps.size(), B100_RX_SID_BASE); - //bind callbacks for the handler for (size_t chan_i = 0; chan_i < args.channels.size(); chan_i++){ const size_t dsp = args.channels[chan_i]; _rx_dsps[dsp]->set_nsamps_per_packet(spp); //seems to be a good place to set this _rx_dsps[dsp]->setup(args); my_streamer->set_xport_chan_get_buff(chan_i, boost::bind( - &recv_packet_demuxer::get_recv_buff, _recv_demuxer, dsp, _1 + &recv_packet_demuxer_3000::get_recv_buff, _recv_demuxer, B100_RX_SID_BASE + dsp, _1 ), true /*flush*/); my_streamer->set_overflow_handler(chan_i, boost::bind( &rx_dsp_core_200::handle_overflow, _rx_dsps[dsp] diff --git a/host/lib/usrp/common/recv_packet_demuxer_3000.hpp b/host/lib/usrp/common/recv_packet_demuxer_3000.hpp new file mode 100644 index 000000000..d4c899f1c --- /dev/null +++ b/host/lib/usrp/common/recv_packet_demuxer_3000.hpp @@ -0,0 +1,114 @@ +// +// Copyright 2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP +#define INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/cstdint.hpp> +#include <boost/thread.hpp> +#include <uhd/utils/msg.hpp> +#include <uhd/utils/atomic.hpp> +#include <uhd/types/time_spec.hpp> +#include <uhd/utils/byteswap.hpp> +#include <queue> +#include <map> + +namespace uhd{ namespace usrp{ + + struct recv_packet_demuxer_3000 + { + recv_packet_demuxer_3000(transport::zero_copy_if::sptr xport): + _xport(xport) + {/*NOP*/} + + transport::managed_recv_buffer::sptr get_recv_buff(const boost::uint32_t sid, const double timeout) + { + const time_spec_t exit_time = time_spec_t(timeout) + time_spec_t::get_system_time(); + transport::managed_recv_buffer::sptr buff; + buff = _internal_get_recv_buff(sid, timeout); + while (not buff) //loop until timeout + { + const time_spec_t delta = exit_time - time_spec_t::get_system_time(); + const double new_timeout = delta.get_real_secs(); + if (new_timeout < 0.0) break; + buff = _internal_get_recv_buff(sid, new_timeout); + } + return buff; + } + + transport::managed_recv_buffer::sptr _internal_get_recv_buff(const boost::uint32_t sid, const double timeout) + { + transport::managed_recv_buffer::sptr buff; + + //---------------------------------------------------------- + //-- Check the queue to see if we already have a buffer + //---------------------------------------------------------- + { + boost::mutex::scoped_lock l(mutex); + queue_type_t &queue = _queues[sid]; + if (not queue.empty()) + { + buff = queue.front(); + queue.pop(); + return buff; + } + } + + //---------------------------------------------------------- + //-- Try to claim the transport or wait patiently + //---------------------------------------------------------- + if (_claimed.cas(1, 0)) + { + boost::mutex::scoped_lock l(mutex); + cond.timed_wait(l, boost::posix_time::microseconds(long(timeout*1e6))); + } + + //---------------------------------------------------------- + //-- Wait on the transport for input buffers + //---------------------------------------------------------- + else + { + buff = _xport->get_recv_buff(timeout); + if (buff) + { + const boost::uint32_t new_sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]); + if (new_sid != sid) + { + boost::mutex::scoped_lock l(mutex); + _queues[new_sid].push(buff); + buff.reset(); + } + } + _claimed.write(0); + cond.notify_all(); + } + return buff; + } + + typedef std::queue<transport::managed_recv_buffer::sptr> queue_type_t; + std::map<boost::uint32_t, queue_type_t> _queues; + transport::zero_copy_if::sptr _xport; + uhd::atomic_uint32_t _claimed; + boost::condition_variable cond; + boost::mutex mutex; + }; + +}} //namespace uhd::usrp + +#endif /* INCLUDED_LIBUHD_USRP_COMMON_RECV_PACKET_DEMUXER_3000_HPP */ |