From 25660c5c9e83e352e10d9fe9feb115d40a322353 Mon Sep 17 00:00:00 2001
From: Johannes Demel <johannes.demel@ettus.com>
Date: Tue, 5 Nov 2013 13:56:00 -0800
Subject: b200/dtor-stall: fixed bug that stalled b200 on shutdown.

---
 host/.gitignore                              |   1 +
 host/include/uhd/transport/zero_copy.hpp     |   1 +
 host/include/uhd/utils/CMakeLists.txt        |   1 +
 host/include/uhd/utils/msg_task.hpp          |  74 ++++++++++++++++
 host/include/uhd/utils/tasks.hpp             |   1 -
 host/lib/usrp/b200/b200_impl.cpp             |   4 +-
 host/lib/usrp/b200/b200_impl.hpp             |   4 +-
 host/lib/usrp/b200/b200_io_impl.cpp          |  27 ++++--
 host/lib/usrp/cores/radio_ctrl_core_3000.cpp | 121 +++++++++++++++++----------
 host/lib/usrp/cores/radio_ctrl_core_3000.hpp |   5 +-
 host/lib/utils/tasks.cpp                     |  99 ++++++++++++++++++++++
 11 files changed, 281 insertions(+), 57 deletions(-)
 create mode 100644 host/include/uhd/utils/msg_task.hpp

(limited to 'host')

diff --git a/host/.gitignore b/host/.gitignore
index dec27fcf7..9b0584c23 100644
--- a/host/.gitignore
+++ b/host/.gitignore
@@ -1,2 +1,3 @@
 /build
 tags
+*~
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp
index 1dc0e8e26..fe2974d09 100644
--- a/host/include/uhd/transport/zero_copy.hpp
+++ b/host/include/uhd/transport/zero_copy.hpp
@@ -72,6 +72,7 @@ namespace uhd{ namespace transport{
         }
 
         boost::detail::atomic_count _ref_count;
+        typedef boost::intrusive_ptr<managed_buffer> sptr;
 
     protected:
         void *_buffer;
diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt
index cdef2e946..e86826435 100644
--- a/host/include/uhd/utils/CMakeLists.txt
+++ b/host/include/uhd/utils/CMakeLists.txt
@@ -27,6 +27,7 @@ UHD_INSTALL(FILES
     images.hpp
     log.hpp
     msg.hpp
+    msg_task.hpp
     paths.hpp
     pimpl.hpp
     safe_call.hpp
diff --git a/host/include/uhd/utils/msg_task.hpp b/host/include/uhd/utils/msg_task.hpp
new file mode 100644
index 000000000..ebb29af08
--- /dev/null
+++ b/host/include/uhd/utils/msg_task.hpp
@@ -0,0 +1,74 @@
+//
+// Copyright 2011-2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program.  If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_UTILS_MSG_TASK_HPP
+#define INCLUDED_UHD_UTILS_MSG_TASK_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+#include <boost/utility.hpp>
+#include <boost/optional/optional.hpp>
+#include <vector>
+
+namespace uhd{
+	class UHD_API msg_task : boost::noncopyable{
+        public:
+            typedef boost::shared_ptr<msg_task> sptr;
+            typedef std::vector<uint8_t> msg_payload_t;
+            typedef std::pair<uint32_t, msg_payload_t > msg_type_t;
+            typedef boost::function<boost::optional<msg_type_t>(void)> task_fcn_type;
+
+            /*
+             * During shutdown message queues for radio control cores might not be available anymore.
+             * Such stranded messages get pushed into a dump queue.
+             * With this function radio_ctrl_core can check if one of the messages meant for it got stranded.
+             */
+            virtual msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) = 0;
+
+            inline static std::vector<uint8_t> buff_to_vector(uint8_t* p, size_t n) {
+                if(p and n > 0){
+                    std::vector<uint8_t> v(n);
+                    memcpy(&v.front(), p, n);
+                    return v;
+                }
+                return std::vector<uint8_t>();
+            }
+
+            /*!
+             * Create a new task object with function callback.
+             * The task function callback will be run in a loop.
+             * until the thread is interrupted by the deconstructor.
+             *
+             * A function may return payload which is then pushed to
+             * a synchronized message queue.
+             *
+             * A task should return in a reasonable amount of time
+             * or may block forever under the following conditions:
+             *  - The blocking call is interruptible.
+             *  - The task polls the interrupt condition.
+             *
+             * \param task_fcn the task callback function
+             * \return a new task object
+             */
+            static sptr make(const task_fcn_type &task_fcn);
+    };
+} //namespace uhd
+
+#endif /* INCLUDED_UHD_UTILS_MSG_TASK_HPP */
+
diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp
index dcb003e39..a1f682a83 100644
--- a/host/include/uhd/utils/tasks.hpp
+++ b/host/include/uhd/utils/tasks.hpp
@@ -46,7 +46,6 @@ namespace uhd{
         static sptr make(const task_fcn_type &task_fcn);
 
     };
-
 } //namespace uhd
 
 #endif /* INCLUDED_UHD_UTILS_TASKS_HPP */
diff --git a/host/lib/usrp/b200/b200_impl.cpp b/host/lib/usrp/b200/b200_impl.cpp
index 0da388b93..66ab813c2 100644
--- a/host/lib/usrp/b200/b200_impl.cpp
+++ b/host/lib/usrp/b200/b200_impl.cpp
@@ -252,7 +252,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)
     ////////////////////////////////////////////////////////////////////
     _async_task_data.reset(new AsyncTaskData());
     _async_task_data->async_md.reset(new async_md_type(1000/*messages deep*/));
-    _async_task = uhd::task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data));
+    _async_task = uhd::msg_task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data));
 
     ////////////////////////////////////////////////////////////////////
     // Local control endpoint
@@ -474,7 +474,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)
 
 b200_impl::~b200_impl(void)
 {
-    UHD_SAFE_CALL
+	UHD_SAFE_CALL
     (
         _async_task.reset();
     )
diff --git a/host/lib/usrp/b200/b200_impl.hpp b/host/lib/usrp/b200/b200_impl.hpp
index eced4a539..59a047e01 100644
--- a/host/lib/usrp/b200/b200_impl.hpp
+++ b/host/lib/usrp/b200/b200_impl.hpp
@@ -120,7 +120,7 @@ struct b200_impl : public uhd::device
     boost::weak_ptr<uhd::tx_streamer> _tx_streamer;
 
     //async ctrl + msgs
-    uhd::task::sptr _async_task;
+    uhd::msg_task::sptr _async_task;
     typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type;
     struct AsyncTaskData
     {
@@ -130,7 +130,7 @@ struct b200_impl : public uhd::device
         b200_uart::sptr gpsdo_uart;
     };
     boost::shared_ptr<AsyncTaskData> _async_task_data;
-    void handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>);
+    boost::optional<uhd::msg_task::msg_type_t> handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>);
 
     void register_loopback_self_test(uhd::wb_iface::sptr iface);
     void codec_loopback_self_test(uhd::wb_iface::sptr iface);
diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp
index d643ef855..069b8ff58 100644
--- a/host/lib/usrp/b200/b200_io_impl.cpp
+++ b/host/lib/usrp/b200/b200_io_impl.cpp
@@ -139,13 +139,24 @@ bool b200_impl::recv_async_msg(
     return _async_task_data->async_md->pop_with_timed_wait(async_metadata, timeout);
 }
 
-void b200_impl::handle_async_task(
+/*
+ * This method is constantly called in a msg_task loop.
+ * Incoming messages are dispatched in to the hosts radio_ctrl_cores.
+ * The radio_ctrl_core queues are accessed via a weak_ptr to them, stored in AsyncTaskData.
+ * During shutdown the radio_ctrl_core dtor's are called.
+ * An empty peek32(0) is sent out to flush pending async messages.
+ * The response to those messages can't be delivered to the ctrl_core queues anymore
+ * because the shared pointer corresponding to the weak_ptrs is no longer valid.
+ * Those stranded messages are put into a dump_queue implemented in msg_task.
+ * A radio_ctrl_core can search for missing messages there.
+ */
+boost::optional<uhd::msg_task::msg_type_t> b200_impl::handle_async_task(
     uhd::transport::zero_copy_if::sptr xport,
     boost::shared_ptr<AsyncTaskData> data
 )
 {
-    managed_recv_buffer::sptr buff = xport->get_recv_buff();
-    if (not buff or buff->size() < 8) return;
+	managed_recv_buffer::sptr buff = xport->get_recv_buff();
+    if (not buff or buff->size() < 8) return NULL;
     const boost::uint32_t sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]);
     switch (sid)
     {
@@ -155,11 +166,16 @@ void b200_impl::handle_async_task(
     case B200_RESP1_MSG_SID:
     case B200_LOCAL_RESP_SID:
     {
-        radio_ctrl_core_3000::sptr ctrl;
+    	radio_ctrl_core_3000::sptr ctrl;
         if (sid == B200_RESP0_MSG_SID) ctrl = data->radio_ctrl[0].lock();
         if (sid == B200_RESP1_MSG_SID) ctrl = data->radio_ctrl[1].lock();
         if (sid == B200_LOCAL_RESP_SID) ctrl = data->local_ctrl.lock();
-        if (ctrl) ctrl->push_response(buff->cast<const boost::uint32_t *>());
+        if (ctrl){
+        	ctrl->push_response(buff->cast<const boost::uint32_t *>());
+        }
+        else{
+            return std::make_pair(sid, uhd::msg_task::buff_to_vector(buff->cast<boost::uint8_t *>(), buff->size() ) );
+        }
         break;
     }
 
@@ -204,6 +220,7 @@ void b200_impl::handle_async_task(
     default:
         UHD_MSG(error) << "Got a ctrl packet with unknown SID " << sid << std::endl;
     }
+    return NULL;
 }
 
 /***********************************************************************
diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
index 5298fd213..13b346cc6 100644
--- a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
+++ b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
@@ -35,35 +35,27 @@ using namespace uhd::transport;
 
 static const double ACK_TIMEOUT = 2.0; //supposed to be worst case practical timeout
 static const double MASSIVE_TIMEOUT = 10.0; //for when we wait on a timed command
-static const size_t SR_READBACK  = 32;
+static const size_t SR_READBACK = 32;
 
-class radio_ctrl_core_3000_impl : public radio_ctrl_core_3000
+class radio_ctrl_core_3000_impl: public radio_ctrl_core_3000
 {
 public:
 
-    radio_ctrl_core_3000_impl(
-        const bool big_endian,
-        uhd::transport::zero_copy_if::sptr ctrl_xport,
-        uhd::transport::zero_copy_if::sptr resp_xport,
-        const boost::uint32_t sid,
-        const std::string &name
-    ):
-        _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR),
-        _packet_type(vrt::if_packet_info_t::PACKET_TYPE_CONTEXT),
-        _bige(big_endian),
-        _ctrl_xport(ctrl_xport),
-        _resp_xport(resp_xport),
-        _sid(sid),
-        _name(name),
-        _seq_out(0),
-        _timeout(ACK_TIMEOUT),
-        _resp_queue(128/*max response msgs*/),
-        _resp_queue_size(_resp_xport? _resp_xport->get_num_recv_frames() : 3)
+    radio_ctrl_core_3000_impl(const bool big_endian,
+            uhd::transport::zero_copy_if::sptr ctrl_xport,
+            uhd::transport::zero_copy_if::sptr resp_xport,
+            const boost::uint32_t sid, const std::string &name) :
+            _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), _packet_type(
+                    vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), _bige(
+                    big_endian), _ctrl_xport(ctrl_xport), _resp_xport(
+                    resp_xport), _sid(sid), _name(name), _seq_out(0), _timeout(
+                    ACK_TIMEOUT), _resp_queue(128/*max response msgs*/), _resp_queue_size(
+                    _resp_xport ? _resp_xport->get_num_recv_frames() : 3)
     {
-        UHD_LOG << "radio_ctrl_core_3000_impl() " << _name << std::endl;
+        UHD_LOG<< "radio_ctrl_core_3000_impl() " << _name << std::endl;
         if (resp_xport)
         {
-            while (resp_xport->get_recv_buff(0.0)){} //flush
+            while (resp_xport->get_recv_buff(0.0)) {} //flush
         }
         this->set_time(uhd::time_spec_t(0.0));
         this->set_tick_rate(1.0); //something possible but bogus
@@ -74,8 +66,8 @@ public:
         UHD_LOG << "~radio_ctrl_core_3000_impl() " << _name << std::endl;
         _timeout = ACK_TIMEOUT; //reset timeout to something small
         UHD_SAFE_CALL(
-            this->peek32(0); //dummy peek with the purpose of ack'ing all packets
-            _async_task.reset(); //now its ok to release the task
+                this->peek32(0);//dummy peek with the purpose of ack'ing all packets
+                _async_task.reset();//now its ok to release the task
         )
     }
 
@@ -95,7 +87,6 @@ public:
     {
         boost::mutex::scoped_lock lock(_mutex);
         UHD_LOGV(always) << _name << std::hex << " addr 0x" << addr << std::dec << std::endl;
-
         this->send_pkt(SR_READBACK, addr/8);
         this->wait_for_ack(false);
 
@@ -136,6 +127,11 @@ public:
     }
 
 private:
+    // This is the buffer type for messages in radio control core.
+    struct resp_buff_type
+    {
+        boost::uint32_t data[8];
+    };
 
     /*******************************************************************
      * Primary control and interaction private methods
@@ -143,7 +139,7 @@ private:
     UHD_INLINE void send_pkt(const boost::uint32_t addr, const boost::uint32_t data = 0)
     {
         managed_send_buffer::sptr buff = _ctrl_xport->get_send_buff(0.0);
-        if (not buff){
+        if (not buff) {
             throw uhd::runtime_error("fifo ctrl timed out getting a send buffer");
         }
         boost::uint32_t *pkt = buff->cast<boost::uint32_t *>();
@@ -173,12 +169,11 @@ private:
         pkt[packet_info.num_header_words32+0] = (_bige)? uhd::htonx(addr) : uhd::htowx(addr);
         pkt[packet_info.num_header_words32+1] = (_bige)? uhd::htonx(data) : uhd::htowx(data);
         //UHD_MSG(status) << boost::format("0x%08x, 0x%08x\n") % addr % data;
-
         //send the buffer over the interface
         _outstanding_seqs.push(_seq_out);
         buff->commit(sizeof(boost::uint32_t)*(packet_info.num_packet_words32));
 
-        _seq_out++; //inc seq for next call
+        _seq_out++;//inc seq for next call
     }
 
     UHD_INLINE boost::uint64_t wait_for_ack(const bool readback)
@@ -186,7 +181,6 @@ private:
         while (readback or (_outstanding_seqs.size() >= _resp_queue_size))
         {
             UHD_LOGV(always) << _name << " wait_for_ack: " << "readback = " << readback << " outstanding_seqs.size() " << _outstanding_seqs.size() << std::endl;
-
             //get seq to ack from outstanding packets list
             UHD_ASSERT_THROW(not _outstanding_seqs.empty());
             const size_t seq_to_ack = _outstanding_seqs.front();
@@ -218,7 +212,27 @@ private:
             //get buffer from response endpoint - or die in timeout
             else
             {
-                UHD_ASSERT_THROW(_resp_queue.pop_with_timed_wait(resp_buff, _timeout));
+                /*
+                 * Couldn't get message with haste.
+                 * Now check both possible queues for messages.
+                 * Messages should come in on _resp_queue,
+                 * but could end up in dump_queue.
+                 * If we don't get a message --> Die in timeout.
+                 */
+                double accum_timeout = 0.0;
+                const double short_timeout = 0.005; // == 5ms
+                while(not (_resp_queue.pop_with_haste(resp_buff)
+                        || check_dump_queue(resp_buff)
+                        || _resp_queue.pop_with_timed_wait(resp_buff, short_timeout)
+                        )){
+                    /*
+                     * If a message couldn't be received within a given timeout
+                     * --> throw AssertionError!
+                     */
+                    accum_timeout += short_timeout;
+                    UHD_ASSERT_THROW(accum_timeout < _timeout);
+                }
+
                 pkt = resp_buff.data;
                 packet_info.num_packet_words32 = sizeof(resp_buff)/sizeof(boost::uint32_t);
             }
@@ -262,9 +276,33 @@ private:
                 return ((hi << 32) | lo);
             }
         }
+
         return 0;
     }
 
+    /*
+     * If ctrl_core waits for a message that didn't arrive it can search for it in the dump queue.
+     * This actually happens during shutdown.
+     * handle_async_task can't access radio_ctrl_cores queue anymore thus it returns the corresponding message.
+     * msg_task class implements a dump_queue to store such messages.
+     * With check_dump_queue we can check if a message we are waiting for got stranded there.
+     * If a message got stuck we get it here and push it onto our own message_queue.
+     */
+    bool check_dump_queue(resp_buff_type b) {
+        boost::uint32_t recv_sid = (((_sid)<<16)|((_sid)>>16));
+        uhd::msg_task::msg_payload_t msg;
+        do{
+            msg = _async_task->get_msg_from_dump_queue(recv_sid);
+        }
+        while(msg.size() < 8 && msg.size() != 0);
+
+        if(msg.size() >= 8) {
+            memcpy(b.data, &msg.front(), 8);
+            return true;
+        }
+        return false;
+    }
+
     void push_response(const boost::uint32_t *buff)
     {
         resp_buff_type resp_buff;
@@ -272,7 +310,7 @@ private:
         _resp_queue.push_with_haste(resp_buff);
     }
 
-    void hold_task(boost::shared_ptr<void> task)
+    void hold_task(uhd::msg_task::sptr task)
     {
         _async_task = task;
     }
@@ -282,7 +320,7 @@ private:
     const bool _bige;
     const uhd::transport::zero_copy_if::sptr _ctrl_xport;
     const uhd::transport::zero_copy_if::sptr _resp_xport;
-    boost::shared_ptr<void> _async_task;
+    uhd::msg_task::sptr _async_task;
     const boost::uint32_t _sid;
     const std::string _name;
     boost::mutex _mutex;
@@ -292,22 +330,15 @@ private:
     double _tick_rate;
     double _timeout;
     std::queue<size_t> _outstanding_seqs;
-    struct resp_buff_type
-    {
-        boost::uint32_t data[8];
-    };
     bounded_buffer<resp_buff_type> _resp_queue;
     const size_t _resp_queue_size;
 };
 
-
-radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(
-    const bool big_endian,
-    zero_copy_if::sptr ctrl_xport,
-    zero_copy_if::sptr resp_xport,
-    const boost::uint32_t sid,
-    const std::string &name
-)
+radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(const bool big_endian,
+        zero_copy_if::sptr ctrl_xport, zero_copy_if::sptr resp_xport,
+        const boost::uint32_t sid, const std::string &name)
 {
-    return sptr(new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, sid, name));
+    return sptr(
+            new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport,
+                    sid, name));
 }
diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
index a49ca2a4b..51a307c10 100644
--- a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
+++ b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
@@ -18,11 +18,12 @@
 #ifndef INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP
 #define INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP
 
+#include <uhd/utils/msg_task.hpp>
 #include <uhd/types/time_spec.hpp>
 #include <uhd/transport/zero_copy.hpp>
+#include <uhd/types/wb_iface.hpp>
 #include <boost/shared_ptr.hpp>
 #include <boost/utility.hpp>
-#include <uhd/types/wb_iface.hpp>
 #include <string>
 
 /*!
@@ -43,7 +44,7 @@ public:
     );
 
     //! Hold a ref to a task thats feeding push response
-    virtual void hold_task(boost::shared_ptr<void> task) = 0;
+    virtual void hold_task(uhd::msg_task::sptr task) = 0;
 
     //! Push a response externall (resp_xport is NULL)
     virtual void push_response(const boost::uint32_t *buff) = 0;
diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp
index 1f735de06..08c32a5fb 100644
--- a/host/lib/utils/tasks.cpp
+++ b/host/lib/utils/tasks.cpp
@@ -16,11 +16,13 @@
 //
 
 #include <uhd/utils/tasks.hpp>
+#include <uhd/utils/msg_task.hpp>
 #include <uhd/utils/msg.hpp>
 #include <boost/thread/thread.hpp>
 #include <boost/thread/barrier.hpp>
 #include <exception>
 #include <iostream>
+#include <vector>
 
 using namespace uhd;
 
@@ -80,3 +82,100 @@ private:
 task::sptr task::make(const task_fcn_type &task_fcn){
     return task::sptr(new task_impl(task_fcn));
 }
+
+/*
+ * During shutdown pointers to queues for radio_ctrl_core might not be available anymore.
+ * msg_task_impl provides a dump_queue for such messages.
+ * ctrl_cores can check this queue for stranded messages.
+ */
+
+class msg_task_impl : public msg_task{
+public:
+
+    msg_task_impl(const task_fcn_type &task_fcn):
+        _spawn_barrier(2)
+    {
+        _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn));
+        _spawn_barrier.wait();
+    }
+
+    ~msg_task_impl(void){
+        _running = false;
+        _thread_group.interrupt_all();
+        _thread_group.join_all();
+    }
+
+    /*
+     * Returns the first message for the given SID.
+     * This way a radio_ctrl_core doesn't have to die in timeout but can check for stranded messages here.
+     * This might happen during shutdown when dtors are called.
+     * See also: comments in b200_io_impl->handle_async_task
+     */
+    msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid)
+    {
+        boost::mutex::scoped_lock lock(_mutex);
+        msg_payload_t b;
+        for (size_t i = 0; i < _dump_queue.size(); i++) {
+            if (sid == _dump_queue[i].first) {
+                b = _dump_queue[i].second;
+                _dump_queue.erase(_dump_queue.begin() + i);
+                break;
+            }
+        }
+        return b;
+    }
+
+private:
+
+    void task_loop(const task_fcn_type &task_fcn){
+        _running = true;
+        _spawn_barrier.wait();
+
+        try{
+            while (_running){
+            	boost::optional<msg_type_t> buff = task_fcn();
+            	if(buff != boost::none){
+            	    /*
+            	     * If a message gets stranded it is returned by task_fcn and then pushed to the dump_queue.
+            	     * This way ctrl_cores can check dump_queue for missing messages.
+            	     */
+            	    boost::mutex::scoped_lock lock(_mutex);
+            	    _dump_queue.push_back(buff.get() );
+            	}
+            }
+        }
+        catch(const boost::thread_interrupted &){
+            //this is an ok way to exit the task loop
+        }
+        catch(const std::exception &e){
+            do_error_msg(e.what());
+        }
+        catch(...){
+            //FIXME
+            //Unfortunately, this is also an ok way to end a task,
+            //because on some systems boost throws uncatchables.
+        }
+    }
+
+    void do_error_msg(const std::string &msg){
+        UHD_MSG(error)
+            << "An unexpected exception was caught in a task loop." << std::endl
+            << "The task loop will now exit, things may not work." << std::endl
+            << msg << std::endl
+        ;
+    }
+
+    boost::mutex _mutex;
+    boost::thread_group _thread_group;
+    boost::barrier _spawn_barrier;
+    bool _running;
+
+    /*
+     * This queue holds stranded messages until a radio_ctrl_core grabs them via 'get_msg_from_dump_queue'.
+     */
+    std::vector <msg_type_t> _dump_queue;
+};
+
+msg_task::sptr msg_task::make(const task_fcn_type &task_fcn){
+    return msg_task::sptr(new msg_task_impl(task_fcn));
+}
-- 
cgit v1.2.3