aboutsummaryrefslogtreecommitdiffstats
path: root/host/lib/transport
diff options
context:
space:
mode:
authorAshish Chaudhari <ashish@ettus.com>2016-04-15 13:40:27 -0700
committerMartin Braun <martin.braun@ettus.com>2016-07-29 17:55:56 -0700
commit368c9ca04d5723d5e6adfcb1a795ebe688064dd4 (patch)
tree1c989e87fe658d0149c4a13b1e6fc484c37f40bb /host/lib/transport
parentb9c3688d05defd71d7a24e7a750dc8954733ee8b (diff)
downloaduhd-368c9ca04d5723d5e6adfcb1a795ebe688064dd4.tar.gz
uhd-368c9ca04d5723d5e6adfcb1a795ebe688064dd4.tar.bz2
uhd-368c9ca04d5723d5e6adfcb1a795ebe688064dd4.zip
usrp3: Added SW muxer/demuxer for the zero_copy interface
- Can be used to share a single zero_copy_if object with multiple streams - Allows creating of virtual streams that are indistinguishable from low-level streams
Diffstat (limited to 'host/lib/transport')
-rw-r--r--host/lib/transport/CMakeLists.txt1
-rw-r--r--host/lib/transport/muxed_zero_copy_if.cpp250
2 files changed, 251 insertions, 0 deletions
diff --git a/host/lib/transport/CMakeLists.txt b/host/lib/transport/CMakeLists.txt
index db21b9f8e..44c8d59af 100644
--- a/host/lib/transport/CMakeLists.txt
+++ b/host/lib/transport/CMakeLists.txt
@@ -128,6 +128,7 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/if_addrs.cpp
${CMAKE_CURRENT_SOURCE_DIR}/udp_simple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/chdr.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/muxed_zero_copy_if.cpp
)
IF(ENABLE_X300)
diff --git a/host/lib/transport/muxed_zero_copy_if.cpp b/host/lib/transport/muxed_zero_copy_if.cpp
new file mode 100644
index 000000000..996db3c98
--- /dev/null
+++ b/host/lib/transport/muxed_zero_copy_if.cpp
@@ -0,0 +1,250 @@
+//
+// Copyright 2016 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/transport/muxed_zero_copy_if.hpp>
+#include <uhd/transport/bounded_buffer.hpp>
+#include <uhd/exception.hpp>
+#include <uhd/utils/safe_call.hpp>
+#include <boost/enable_shared_from_this.hpp>
+#include <boost/make_shared.hpp>
+#include <boost/thread.hpp>
+#include <boost/thread/locks.hpp>
+#include <map>
+
+using namespace uhd;
+using namespace uhd::transport;
+
+class muxed_zero_copy_if_impl : public muxed_zero_copy_if,
+ public boost::enable_shared_from_this<muxed_zero_copy_if_impl>
+{
+public:
+ typedef boost::shared_ptr<muxed_zero_copy_if_impl> sptr;
+
+ muxed_zero_copy_if_impl(
+ zero_copy_if::sptr base_xport,
+ stream_classifier_fn classify_fn,
+ size_t max_streams
+ ):
+ _base_xport(base_xport), _classify(classify_fn),
+ _max_num_streams(max_streams), _num_dropped_frames(0)
+ {
+ //Create the receive thread to poll the underlying transport
+ //and classify packets into queues
+ _recv_thread = boost::thread(
+ boost::bind(&muxed_zero_copy_if_impl::_update_queues, this));
+ }
+
+ virtual ~muxed_zero_copy_if_impl()
+ {
+ UHD_SAFE_CALL(
+ //Interrupt buffer updater loop
+ _recv_thread.interrupt();
+ //Wait for loop to finish
+ //No timeout on join. The recv loop is guaranteed
+ //to terminate in a reasonable amount of time because
+ //there are no timed blocks on the underlying.
+ _recv_thread.join();
+ //Flush base transport
+ while (_base_xport->get_recv_buff(0.0001)) /*NOP*/;
+ //Release child streams
+ //Note that this will not delete or flush the child streams
+ //until the owners of the streams have released the respective
+ //shared pointers. This ensures that packets are not dropped.
+ _streams.clear();
+ );
+ }
+
+ virtual zero_copy_if::sptr make_stream(const uint32_t stream_num)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ if (_streams.size() >= _max_num_streams) {
+ throw uhd::runtime_error("muxed_zero_copy_if: stream capacity exceeded. cannot create more streams.");
+ }
+ stream_impl::sptr stream = boost::make_shared<stream_impl>(this->shared_from_this(), stream_num);
+ _streams[stream_num] = stream;
+ return stream;
+ }
+
+ virtual size_t get_num_dropped_frames() const
+ {
+ return _num_dropped_frames;
+ }
+
+ void remove_stream(const uint32_t stream_num)
+ {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ _streams.erase(stream_num);
+ }
+
+private:
+ class stream_impl : public zero_copy_if
+ {
+ public:
+ typedef boost::shared_ptr<stream_impl> sptr;
+ typedef boost::weak_ptr<stream_impl> wptr;
+
+ stream_impl(muxed_zero_copy_if_impl::sptr muxed_xport, const uint32_t stream_num):
+ _stream_num(stream_num), _muxed_xport(muxed_xport),
+ _buff_queue(muxed_xport->base_xport()->get_num_recv_frames())
+ {
+ }
+
+ ~stream_impl(void)
+ {
+ //First remove the stream from muxed transport
+ //so no more frames are pushed in
+ _muxed_xport->remove_stream(_stream_num);
+ //Flush the transport
+ managed_recv_buffer::sptr buff;
+ while (_buff_queue.pop_with_haste(buff)) {
+ //NOP
+ }
+ }
+
+ size_t get_num_recv_frames(void) const {
+ return _muxed_xport->base_xport()->get_num_recv_frames();
+ }
+
+ size_t get_recv_frame_size(void) const {
+ return _muxed_xport->base_xport()->get_recv_frame_size();
+ }
+
+ managed_recv_buffer::sptr get_recv_buff(double timeout) {
+ managed_recv_buffer::sptr buff;
+ if (_buff_queue.pop_with_timed_wait(buff, timeout)) {
+ return buff;
+ } else {
+ return managed_recv_buffer::sptr();
+ }
+
+ }
+
+ void push_recv_buff(managed_recv_buffer::sptr buff) {
+ _buff_queue.push_with_wait(buff);
+ }
+
+ size_t get_num_send_frames(void) const {
+ return _muxed_xport->base_xport()->get_num_send_frames();
+ }
+
+ size_t get_send_frame_size(void) const {
+ return _muxed_xport->base_xport()->get_send_frame_size();
+ }
+
+ managed_send_buffer::sptr get_send_buff(double timeout)
+ {
+ return _muxed_xport->base_xport()->get_send_buff(timeout);
+ }
+
+ private:
+ const uint32_t _stream_num;
+ muxed_zero_copy_if_impl::sptr _muxed_xport;
+ bounded_buffer<managed_recv_buffer::sptr> _buff_queue;
+ };
+
+ inline zero_copy_if::sptr& base_xport() { return _base_xport; }
+
+ void _update_queues()
+ {
+ //Run forever:
+ // - Pull packets from the base transport
+ // - Classify them
+ // - Push them to the appropriate receive queue
+ while (true) {
+ { //Uninterruptable block of code
+ boost::this_thread::disable_interruption interrupt_disabler;
+ if (not _process_next_buffer()) {
+ //Be a good citizen and yield if no packet is processed
+ static const size_t MIN_DUR = 1;
+ boost::this_thread::sleep_for(boost::chrono::nanoseconds(MIN_DUR));
+ //We call sleep(MIN_DUR) above instead of yield() to ensure that we
+ //relinquish the current scheduler time slot.
+ //yield() is a hint to the scheduler to end the time
+ //slice early and schedule in another thread that is ready to run.
+ //However in most situations, there will be no other thread and
+ //this thread will continue to run which will rail a CPU core.
+ //We call sleep(MIN_DUR=1) instead which will sleep for a minimum time.
+ //Ideally we would like to use boost::chrono::.*seconds::min() but that
+ //is bound to 0, which causes the sleep_for call to be a no-op and
+ //thus useless to actually force a sleep.
+
+ //****************************************************************
+ //NOTE: This behavior makes this transport a poor choice for
+ // low latency communication.
+ //****************************************************************
+ }
+ }
+ //Check if the master thread has requested a shutdown
+ if (boost::this_thread::interruption_requested()) break;
+ }
+ }
+
+ bool _process_next_buffer()
+ {
+ managed_recv_buffer::sptr buff = _base_xport->get_recv_buff(0.0);
+ if (buff) {
+ stream_impl::sptr stream;
+ try {
+ const uint32_t stream_num = _classify(buff->cast<void*>(), _base_xport->get_recv_frame_size());
+ {
+ //Hold the stream mutex long enough to pull a bounded buffer
+ //and lock it (increment its ref count).
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ stream_map_t::iterator str_iter = _streams.find(stream_num);
+ if (str_iter != _streams.end()) {
+ stream = (*str_iter).second.lock();
+ }
+ }
+ } catch (std::exception&) {
+ //If _classify throws we simply drop the frame
+ }
+ //Once a bounded buffer is acquired, we can rely on its
+ //thread safety to serialize with the consumer.
+ if (stream.get()) {
+ stream->push_recv_buff(buff);
+ } else {
+ boost::lock_guard<boost::mutex> lock(_mutex);
+ _num_dropped_frames++;
+ }
+ //We processed a packet, and there could be more coming
+ //Don't yield in the next iteration.
+ return true;
+ } else {
+ //The base transport is idle. Return false to let the
+ //thread yield.
+ return false;
+ }
+ }
+
+ typedef std::map<uint32_t, stream_impl::wptr> stream_map_t;
+
+ zero_copy_if::sptr _base_xport;
+ stream_classifier_fn _classify;
+ stream_map_t _streams;
+ const size_t _max_num_streams;
+ size_t _num_dropped_frames;
+ boost::thread _recv_thread;
+ boost::mutex _mutex;
+};
+
+muxed_zero_copy_if::sptr muxed_zero_copy_if::make(
+ zero_copy_if::sptr base_xport,
+ muxed_zero_copy_if::stream_classifier_fn classify_fn,
+ size_t max_streams
+) {
+ return boost::make_shared<muxed_zero_copy_if_impl>(base_xport, classify_fn, max_streams);
+}