diff options
-rw-r--r-- | host/include/uhd/transport/muxed_zero_copy_if.hpp | 72 | ||||
-rw-r--r-- | host/lib/transport/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/lib/transport/muxed_zero_copy_if.cpp | 250 |
3 files changed, 323 insertions, 0 deletions
diff --git a/host/include/uhd/transport/muxed_zero_copy_if.hpp b/host/include/uhd/transport/muxed_zero_copy_if.hpp new file mode 100644 index 000000000..17213885e --- /dev/null +++ b/host/include/uhd/transport/muxed_zero_copy_if.hpp @@ -0,0 +1,72 @@ +// +// Copyright 2016 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_LIBUHD_TRANSPORT_MUXED_ZERO_COPY_IF_HPP +#define INCLUDED_LIBUHD_TRANSPORT_MUXED_ZERO_COPY_IF_HPP + +#include <uhd/transport/zero_copy.hpp> +#include <uhd/config.hpp> +#include <boost/function.hpp> +#include <boost/noncopyable.hpp> +#include <stdint.h> + +namespace uhd { namespace transport { + +/*! + * Implements a software muxed-demuxed zero-copy transport + * This is a wrapper around a base transport that allows + * creation of virtual zero_copy_if streams that are + * indistinguishable from physical transport streams. + * This class handles demuxing receive streams into the + * appropriate virtual streams with the given classifier + * function. A worker therad is spawned to handle the demuxing. + */ +class muxed_zero_copy_if : private boost::noncopyable { +public: + typedef boost::shared_ptr<muxed_zero_copy_if> sptr; + + /*! + * Function to classify the stream based on the payload. + * The classifier must return a stream number, an arbitrary + * identifier for a virtual stream that is consistent with + * the stream number used in the make_stream and remove_stream + * fuctions + * \param buff a pointer to the payload of the frame + * \param size number of bytes in the frame payload + * \return stream number + */ + typedef boost::function<uint32_t(void* buff, size_t size)> stream_classifier_fn; + + //! virtual dtor + virtual ~muxed_zero_copy_if() {} + + //! Make a virtual transport for the specified stream number + virtual zero_copy_if::sptr make_stream(const uint32_t stream_num) = 0; + + //! Unregister the stream number. All packets destined to the stream will be dropped. + virtual void remove_stream(const uint32_t stream_num) = 0; + + //! Get number of frames dropped due to unregistered streams + virtual size_t get_num_dropped_frames() const = 0; + + //! Make a new demuxer from a transport and parameters + static sptr make(zero_copy_if::sptr base_xport, stream_classifier_fn classify_fn, size_t max_streams); +}; + +}} //namespace uhd::transport + +#endif /* INCLUDED_LIBUHD_TRANSPORT_MUXED_ZERO_COPY_IF_HPP */ diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt index db21b9f8e..44c8d59af 100644 --- a/host/lib/transport/CMakeLists.txt +++ b/host/lib/transport/CMakeLists.txt @@ -128,6 +128,7 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp ${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp ${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp ) IF(ENABLE_X300) diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp new file mode 100644 index 000000000..996db3c98 --- /dev/null +++ b/host/lib/transport/muxed_zero_copy_if.cpp @@ -0,0 +1,250 @@ +// +// Copyright 2016 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/transport/muxed_zero_copy_if.hpp> +#include <uhd/transport/bounded_buffer.hpp> +#include <uhd/exception.hpp> +#include <uhd/utils/safe_call.hpp> +#include <boost/enable_shared_from_this.hpp> +#include <boost/make_shared.hpp> +#include <boost/thread.hpp> +#include <boost/thread/locks.hpp> +#include <map> + +using namespace uhd; +using namespace uhd::transport; + +class muxed_zero_copy_if_impl : public muxed_zero_copy_if, + public boost::enable_shared_from_this<muxed_zero_copy_if_impl> +{ +public: + typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr; + + muxed_zero_copy_if_impl( + zero_copy_if::sptr base_xport, + stream_classifier_fn classify_fn, + size_t max_streams + ): + _base_xport(base_xport), _classify(classify_fn), + _max_num_streams(max_streams), _num_dropped_frames(0) + { + //Create the receive thread to poll the underlying transport + //and classify packets into queues + _recv_thread = boost::thread( + boost::bind(&muxed_zero_copy_if_impl::_update_queues, this)); + } + + virtual ~muxed_zero_copy_if_impl() + { + UHD_SAFE_CALL( + //Interrupt buffer updater loop + _recv_thread.interrupt(); + //Wait for loop to finish + //No timeout on join. The recv loop is guaranteed + //to terminate in a reasonable amount of time because + //there are no timed blocks on the underlying. + _recv_thread.join(); + //Flush base transport + while (_base_xport->get_recv_buff(0.0001)) /*NOP*/; + //Release child streams + //Note that this will not delete or flush the child streams + //until the owners of the streams have released the respective + //shared pointers. This ensures that packets are not dropped. + _streams.clear(); + ); + } + + virtual zero_copy_if::sptr make_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + if (_streams.size() >= _max_num_streams) { + throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams."); + } + stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num); + _streams[stream_num] = stream; + return stream; + } + + virtual size_t get_num_dropped_frames() const + { + return _num_dropped_frames; + } + + void remove_stream(const uint32_t stream_num) + { + boost::lock_guard<boost::mutex> lock(_mutex); + _streams.erase(stream_num); + } + +private: + class stream_impl : public zero_copy_if + { + public: + typedef boost::shared_ptr<stream_impl> sptr; + typedef boost::weak_ptr<stream_impl> wptr; + + stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num): + _stream_num(stream_num), _muxed_xport(muxed_xport), + _buff_queue(muxed_xport->base_xport()->get_num_recv_frames()) + { + } + + ~stream_impl(void) + { + //First remove the stream from muxed transport + //so no more frames are pushed in + _muxed_xport->remove_stream(_stream_num); + //Flush the transport + managed_recv_buffer::sptr buff; + while (_buff_queue.pop_with_haste(buff)) { + //NOP + } + } + + size_t get_num_recv_frames(void) const { + return _muxed_xport->base_xport()->get_num_recv_frames(); + } + + size_t get_recv_frame_size(void) const { + return _muxed_xport->base_xport()->get_recv_frame_size(); + } + + managed_recv_buffer::sptr get_recv_buff(double timeout) { + managed_recv_buffer::sptr buff; + if (_buff_queue.pop_with_timed_wait(buff, timeout)) { + return buff; + } else { + return managed_recv_buffer::sptr(); + } + + } + + void push_recv_buff(managed_recv_buffer::sptr buff) { + _buff_queue.push_with_wait(buff); + } + + size_t get_num_send_frames(void) const { + return _muxed_xport->base_xport()->get_num_send_frames(); + } + + size_t get_send_frame_size(void) const { + return _muxed_xport->base_xport()->get_send_frame_size(); + } + + managed_send_buffer::sptr get_send_buff(double timeout) + { + return _muxed_xport->base_xport()->get_send_buff(timeout); + } + + private: + const uint32_t _stream_num; + muxed_zero_copy_if_impl::sptr _muxed_xport; + bounded_buffer<managed_recv_buffer::sptr> _buff_queue; + }; + + inline zero_copy_if::sptr& base_xport() { return _base_xport; } + + void _update_queues() + { + //Run forever: + // - Pull packets from the base transport + // - Classify them + // - Push them to the appropriate receive queue + while (true) { + { //Uninterruptable block of code + boost::this_thread::disable_interruption interrupt_disabler; + if (not _process_next_buffer()) { + //Be a good citizen and yield if no packet is processed + static const size_t MIN_DUR = 1; + boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR)); + //We call sleep(MIN_DUR) above instead of yield() to ensure that we + //relinquish the current scheduler time slot. + //yield() is a hint to the scheduler to end the time + //slice early and schedule in another thread that is ready to run. + //However in most situations, there will be no other thread and + //this thread will continue to run which will rail a CPU core. + //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time. + //Ideally we would like to use boost::chrono::.*seconds::min() but that + //is bound to 0, which causes the sleep_for call to be a no-op and + //thus useless to actually force a sleep. + + //**************************************************************** + //NOTE: This behavior makes this transport a poor choice for + // low latency communication. + //**************************************************************** + } + } + //Check if the master thread has requested a shutdown + if (boost::this_thread::interruption_requested()) break; + } + } + + bool _process_next_buffer() + { + managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0); + if (buff) { + stream_impl::sptr stream; + try { + const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size()); + { + //Hold the stream mutex long enough to pull a bounded buffer + //and lock it (increment its ref count). + boost::lock_guard<boost::mutex> lock(_mutex); + stream_map_t::iterator str_iter = _streams.find(stream_num); + if (str_iter != _streams.end()) { + stream = (*str_iter).second.lock(); + } + } + } catch (std::exception&) { + //If _classify throws we simply drop the frame + } + //Once a bounded buffer is acquired, we can rely on its + //thread safety to serialize with the consumer. + if (stream.get()) { + stream->push_recv_buff(buff); + } else { + boost::lock_guard<boost::mutex> lock(_mutex); + _num_dropped_frames++; + } + //We processed a packet, and there could be more coming + //Don't yield in the next iteration. + return true; + } else { + //The base transport is idle. Return false to let the + //thread yield. + return false; + } + } + + typedef std::map<uint32_t, stream_impl::wptr> stream_map_t; + + zero_copy_if::sptr _base_xport; + stream_classifier_fn _classify; + stream_map_t _streams; + const size_t _max_num_streams; + size_t _num_dropped_frames; + boost::thread _recv_thread; + boost::mutex _mutex; +}; + +muxed_zero_copy_if::sptr muxed_zero_copy_if::make( + zero_copy_if::sptr base_xport, + muxed_zero_copy_if::stream_classifier_fn classify_fn, + size_t max_streams +) { + return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams); +} |