summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBalint Seeber <balint@ettus.com>2013-11-19 13:30:21 -0800
committerBalint Seeber <balint@ettus.com>2013-11-19 13:30:21 -0800
commit48ab2a971e73e46a7165ca5ebfff9a946fd5e981 (patch)
treed3b07bd21a7327d35022ab5efe35d8298975cbe4
parent01cfe2118f0bd72719809d2539c61df46a164fec (diff)
parent8a78802c10fa64af5d1ec7cf39ce389f003cce11 (diff)
downloaduhd-48ab2a971e73e46a7165ca5ebfff9a946fd5e981.tar.gz
uhd-48ab2a971e73e46a7165ca5ebfff9a946fd5e981.tar.bz2
uhd-48ab2a971e73e46a7165ca5ebfff9a946fd5e981.zip
Merge remote-tracking branch 'origin/b200/dtor-master' into b200/kitchen_sink
-rw-r--r--host/.gitignore1
-rw-r--r--host/include/uhd/transport/zero_copy.hpp1
-rw-r--r--host/include/uhd/utils/CMakeLists.txt1
-rw-r--r--host/include/uhd/utils/msg_task.hpp74
-rw-r--r--host/include/uhd/utils/tasks.hpp1
-rw-r--r--host/lib/usrp/b200/b200_impl.cpp4
-rw-r--r--host/lib/usrp/b200/b200_impl.hpp4
-rw-r--r--host/lib/usrp/b200/b200_io_impl.cpp30
-rw-r--r--host/lib/usrp/cores/radio_ctrl_core_3000.cpp121
-rw-r--r--host/lib/usrp/cores/radio_ctrl_core_3000.hpp5
-rw-r--r--host/lib/utils/tasks.cpp99
11 files changed, 283 insertions, 58 deletions
diff --git a/host/.gitignore b/host/.gitignore
index dec27fcf7..9b0584c23 100644
--- a/host/.gitignore
+++ b/host/.gitignore
@@ -1,2 +1,3 @@
/build
tags
+*~
diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp
index 1dc0e8e26..fe2974d09 100644
--- a/host/include/uhd/transport/zero_copy.hpp
+++ b/host/include/uhd/transport/zero_copy.hpp
@@ -72,6 +72,7 @@ namespace uhd{ namespace transport{
}
boost::detail::atomic_count _ref_count;
+ typedef boost::intrusive_ptr<managed_buffer> sptr;
protected:
void *_buffer;
diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt
index cdef2e946..e86826435 100644
--- a/host/include/uhd/utils/CMakeLists.txt
+++ b/host/include/uhd/utils/CMakeLists.txt
@@ -27,6 +27,7 @@ UHD_INSTALL(FILES
images.hpp
log.hpp
msg.hpp
+ msg_task.hpp
paths.hpp
pimpl.hpp
safe_call.hpp
diff --git a/host/include/uhd/utils/msg_task.hpp b/host/include/uhd/utils/msg_task.hpp
new file mode 100644
index 000000000..ebb29af08
--- /dev/null
+++ b/host/include/uhd/utils/msg_task.hpp
@@ -0,0 +1,74 @@
+//
+// Copyright 2011-2013 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#ifndef INCLUDED_UHD_UTILS_MSG_TASK_HPP
+#define INCLUDED_UHD_UTILS_MSG_TASK_HPP
+
+#include <uhd/config.hpp>
+#include <uhd/transport/zero_copy.hpp>
+#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
+#include <boost/utility.hpp>
+#include <boost/optional/optional.hpp>
+#include <vector>
+
+namespace uhd{
+ class UHD_API msg_task : boost::noncopyable{
+ public:
+ typedef boost::shared_ptr<msg_task> sptr;
+ typedef std::vector<uint8_t> msg_payload_t;
+ typedef std::pair<uint32_t, msg_payload_t > msg_type_t;
+ typedef boost::function<boost::optional<msg_type_t>(void)> task_fcn_type;
+
+ /*
+ * During shutdown message queues for radio control cores might not be available anymore.
+ * Such stranded messages get pushed into a dump queue.
+ * With this function radio_ctrl_core can check if one of the messages meant for it got stranded.
+ */
+ virtual msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) = 0;
+
+ inline static std::vector<uint8_t> buff_to_vector(uint8_t* p, size_t n) {
+ if(p and n > 0){
+ std::vector<uint8_t> v(n);
+ memcpy(&v.front(), p, n);
+ return v;
+ }
+ return std::vector<uint8_t>();
+ }
+
+ /*!
+ * Create a new task object with function callback.
+ * The task function callback will be run in a loop.
+ * until the thread is interrupted by the deconstructor.
+ *
+ * A function may return payload which is then pushed to
+ * a synchronized message queue.
+ *
+ * A task should return in a reasonable amount of time
+ * or may block forever under the following conditions:
+ * - The blocking call is interruptible.
+ * - The task polls the interrupt condition.
+ *
+ * \param task_fcn the task callback function
+ * \return a new task object
+ */
+ static sptr make(const task_fcn_type &task_fcn);
+ };
+} //namespace uhd
+
+#endif /* INCLUDED_UHD_UTILS_MSG_TASK_HPP */
+
diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp
index dcb003e39..a1f682a83 100644
--- a/host/include/uhd/utils/tasks.hpp
+++ b/host/include/uhd/utils/tasks.hpp
@@ -46,7 +46,6 @@ namespace uhd{
static sptr make(const task_fcn_type &task_fcn);
};
-
} //namespace uhd
#endif /* INCLUDED_UHD_UTILS_TASKS_HPP */
diff --git a/host/lib/usrp/b200/b200_impl.cpp b/host/lib/usrp/b200/b200_impl.cpp
index 0da388b93..66ab813c2 100644
--- a/host/lib/usrp/b200/b200_impl.cpp
+++ b/host/lib/usrp/b200/b200_impl.cpp
@@ -252,7 +252,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)
////////////////////////////////////////////////////////////////////
_async_task_data.reset(new AsyncTaskData());
_async_task_data->async_md.reset(new async_md_type(1000/*messages deep*/));
- _async_task = uhd::task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data));
+ _async_task = uhd::msg_task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data));
////////////////////////////////////////////////////////////////////
// Local control endpoint
@@ -474,7 +474,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)
b200_impl::~b200_impl(void)
{
- UHD_SAFE_CALL
+ UHD_SAFE_CALL
(
_async_task.reset();
)
diff --git a/host/lib/usrp/b200/b200_impl.hpp b/host/lib/usrp/b200/b200_impl.hpp
index eced4a539..59a047e01 100644
--- a/host/lib/usrp/b200/b200_impl.hpp
+++ b/host/lib/usrp/b200/b200_impl.hpp
@@ -120,7 +120,7 @@ struct b200_impl : public uhd::device
boost::weak_ptr<uhd::tx_streamer> _tx_streamer;
//async ctrl + msgs
- uhd::task::sptr _async_task;
+ uhd::msg_task::sptr _async_task;
typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type;
struct AsyncTaskData
{
@@ -130,7 +130,7 @@ struct b200_impl : public uhd::device
b200_uart::sptr gpsdo_uart;
};
boost::shared_ptr<AsyncTaskData> _async_task_data;
- void handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>);
+ boost::optional<uhd::msg_task::msg_type_t> handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>);
void register_loopback_self_test(uhd::wb_iface::sptr iface);
void codec_loopback_self_test(uhd::wb_iface::sptr iface);
diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp
index 1feeff1a3..7b09c87df 100644
--- a/host/lib/usrp/b200/b200_io_impl.cpp
+++ b/host/lib/usrp/b200/b200_io_impl.cpp
@@ -139,27 +139,44 @@ bool b200_impl::recv_async_msg(
return _async_task_data->async_md->pop_with_timed_wait(async_metadata, timeout);
}
-void b200_impl::handle_async_task(
+/*
+ * This method is constantly called in a msg_task loop.
+ * Incoming messages are dispatched in to the hosts radio_ctrl_cores.
+ * The radio_ctrl_core queues are accessed via a weak_ptr to them, stored in AsyncTaskData.
+ * During shutdown the radio_ctrl_core dtor's are called.
+ * An empty peek32(0) is sent out to flush pending async messages.
+ * The response to those messages can't be delivered to the ctrl_core queues anymore
+ * because the shared pointer corresponding to the weak_ptrs is no longer valid.
+ * Those stranded messages are put into a dump_queue implemented in msg_task.
+ * A radio_ctrl_core can search for missing messages there.
+ */
+boost::optional<uhd::msg_task::msg_type_t> b200_impl::handle_async_task(
uhd::transport::zero_copy_if::sptr xport,
boost::shared_ptr<AsyncTaskData> data
)
{
managed_recv_buffer::sptr buff = xport->get_recv_buff();
- if (not buff or buff->size() < 8) return;
+ if (not buff or buff->size() < 8)
+ return NULL;
+
const boost::uint32_t sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]);
- switch (sid)
- {
+ switch (sid) {
//if the packet is a control response
case B200_RESP0_MSG_SID:
case B200_RESP1_MSG_SID:
case B200_LOCAL_RESP_SID:
{
- radio_ctrl_core_3000::sptr ctrl;
+ radio_ctrl_core_3000::sptr ctrl;
if (sid == B200_RESP0_MSG_SID) ctrl = data->radio_ctrl[0].lock();
if (sid == B200_RESP1_MSG_SID) ctrl = data->radio_ctrl[1].lock();
if (sid == B200_LOCAL_RESP_SID) ctrl = data->local_ctrl.lock();
- if (ctrl) ctrl->push_response(buff->cast<const boost::uint32_t *>());
+ if (ctrl){
+ ctrl->push_response(buff->cast<const boost::uint32_t *>());
+ }
+ else{
+ return std::make_pair(sid, uhd::msg_task::buff_to_vector(buff->cast<boost::uint8_t *>(), buff->size() ) );
+ }
break;
}
@@ -204,6 +221,7 @@ void b200_impl::handle_async_task(
default:
UHD_MSG(error) << "Got a ctrl packet with unknown SID " << sid << std::endl;
}
+ return NULL;
}
/***********************************************************************
diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
index 5298fd213..0d6e1c665 100644
--- a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
+++ b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp
@@ -35,35 +35,27 @@ using namespace uhd::transport;
static const double ACK_TIMEOUT = 2.0; //supposed to be worst case practical timeout
static const double MASSIVE_TIMEOUT = 10.0; //for when we wait on a timed command
-static const size_t SR_READBACK = 32;
+static const size_t SR_READBACK = 32;
-class radio_ctrl_core_3000_impl : public radio_ctrl_core_3000
+class radio_ctrl_core_3000_impl: public radio_ctrl_core_3000
{
public:
- radio_ctrl_core_3000_impl(
- const bool big_endian,
- uhd::transport::zero_copy_if::sptr ctrl_xport,
- uhd::transport::zero_copy_if::sptr resp_xport,
- const boost::uint32_t sid,
- const std::string &name
- ):
- _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR),
- _packet_type(vrt::if_packet_info_t::PACKET_TYPE_CONTEXT),
- _bige(big_endian),
- _ctrl_xport(ctrl_xport),
- _resp_xport(resp_xport),
- _sid(sid),
- _name(name),
- _seq_out(0),
- _timeout(ACK_TIMEOUT),
- _resp_queue(128/*max response msgs*/),
- _resp_queue_size(_resp_xport? _resp_xport->get_num_recv_frames() : 3)
+ radio_ctrl_core_3000_impl(const bool big_endian,
+ uhd::transport::zero_copy_if::sptr ctrl_xport,
+ uhd::transport::zero_copy_if::sptr resp_xport,
+ const boost::uint32_t sid, const std::string &name) :
+ _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), _packet_type(
+ vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), _bige(
+ big_endian), _ctrl_xport(ctrl_xport), _resp_xport(
+ resp_xport), _sid(sid), _name(name), _seq_out(0), _timeout(
+ ACK_TIMEOUT), _resp_queue(128/*max response msgs*/), _resp_queue_size(
+ _resp_xport ? _resp_xport->get_num_recv_frames() : 3)
{
- UHD_LOG << "radio_ctrl_core_3000_impl() " << _name << std::endl;
+ UHD_LOG<< "radio_ctrl_core_3000_impl() " << _name << std::endl;
if (resp_xport)
{
- while (resp_xport->get_recv_buff(0.0)){} //flush
+ while (resp_xport->get_recv_buff(0.0)) {} //flush
}
this->set_time(uhd::time_spec_t(0.0));
this->set_tick_rate(1.0); //something possible but bogus
@@ -74,8 +66,8 @@ public:
UHD_LOG << "~radio_ctrl_core_3000_impl() " << _name << std::endl;
_timeout = ACK_TIMEOUT; //reset timeout to something small
UHD_SAFE_CALL(
- this->peek32(0); //dummy peek with the purpose of ack'ing all packets
- _async_task.reset(); //now its ok to release the task
+ this->peek32(0);//dummy peek with the purpose of ack'ing all packets
+ _async_task.reset();//now its ok to release the task
)
}
@@ -95,7 +87,6 @@ public:
{
boost::mutex::scoped_lock lock(_mutex);
UHD_LOGV(always) << _name << std::hex << " addr 0x" << addr << std::dec << std::endl;
-
this->send_pkt(SR_READBACK, addr/8);
this->wait_for_ack(false);
@@ -136,6 +127,11 @@ public:
}
private:
+ // This is the buffer type for messages in radio control core.
+ struct resp_buff_type
+ {
+ boost::uint32_t data[8];
+ };
/*******************************************************************
* Primary control and interaction private methods
@@ -143,7 +139,7 @@ private:
UHD_INLINE void send_pkt(const boost::uint32_t addr, const boost::uint32_t data = 0)
{
managed_send_buffer::sptr buff = _ctrl_xport->get_send_buff(0.0);
- if (not buff){
+ if (not buff) {
throw uhd::runtime_error("fifo ctrl timed out getting a send buffer");
}
boost::uint32_t *pkt = buff->cast<boost::uint32_t *>();
@@ -173,12 +169,11 @@ private:
pkt[packet_info.num_header_words32+0] = (_bige)? uhd::htonx(addr) : uhd::htowx(addr);
pkt[packet_info.num_header_words32+1] = (_bige)? uhd::htonx(data) : uhd::htowx(data);
//UHD_MSG(status) << boost::format("0x%08x, 0x%08x\n") % addr % data;
-
//send the buffer over the interface
_outstanding_seqs.push(_seq_out);
buff->commit(sizeof(boost::uint32_t)*(packet_info.num_packet_words32));
- _seq_out++; //inc seq for next call
+ _seq_out++;//inc seq for next call
}
UHD_INLINE boost::uint64_t wait_for_ack(const bool readback)
@@ -186,7 +181,6 @@ private:
while (readback or (_outstanding_seqs.size() >= _resp_queue_size))
{
UHD_LOGV(always) << _name << " wait_for_ack: " << "readback = " << readback << " outstanding_seqs.size() " << _outstanding_seqs.size() << std::endl;
-
//get seq to ack from outstanding packets list
UHD_ASSERT_THROW(not _outstanding_seqs.empty());
const size_t seq_to_ack = _outstanding_seqs.front();
@@ -218,7 +212,27 @@ private:
//get buffer from response endpoint - or die in timeout
else
{
- UHD_ASSERT_THROW(_resp_queue.pop_with_timed_wait(resp_buff, _timeout));
+ /*
+ * Couldn't get message with haste.
+ * Now check both possible queues for messages.
+ * Messages should come in on _resp_queue,
+ * but could end up in dump_queue.
+ * If we don't get a message --> Die in timeout.
+ */
+ double accum_timeout = 0.0;
+ const double short_timeout = 0.005; // == 5ms
+ while(not (_resp_queue.pop_with_haste(resp_buff)
+ || check_dump_queue(resp_buff)
+ || _resp_queue.pop_with_timed_wait(resp_buff, short_timeout)
+ )){
+ /*
+ * If a message couldn't be received within a given timeout
+ * --> throw AssertionError!
+ */
+ accum_timeout += short_timeout;
+ UHD_ASSERT_THROW(accum_timeout < _timeout);
+ }
+
pkt = resp_buff.data;
packet_info.num_packet_words32 = sizeof(resp_buff)/sizeof(boost::uint32_t);
}
@@ -262,9 +276,33 @@ private:
return ((hi << 32) | lo);
}
}
+
return 0;
}
+ /*
+ * If ctrl_core waits for a message that didn't arrive it can search for it in the dump queue.
+ * This actually happens during shutdown.
+ * handle_async_task can't access radio_ctrl_cores queue anymore thus it returns the corresponding message.
+ * msg_task class implements a dump_queue to store such messages.
+ * With check_dump_queue we can check if a message we are waiting for got stranded there.
+ * If a message got stuck we get it here and push it onto our own message_queue.
+ */
+ bool check_dump_queue(resp_buff_type b) {
+ boost::uint32_t recv_sid = (((_sid)<<16)|((_sid)>>16));
+ uhd::msg_task::msg_payload_t msg;
+ do{
+ msg = _async_task->get_msg_from_dump_queue(recv_sid);
+ }
+ while(msg.size() < 8 && msg.size() != 0);
+
+ if(msg.size() >= 8) {
+ memcpy(b.data, &msg.front(), 8);
+ return true;
+ }
+ return false;
+ }
+
void push_response(const boost::uint32_t *buff)
{
resp_buff_type resp_buff;
@@ -272,7 +310,7 @@ private:
_resp_queue.push_with_haste(resp_buff);
}
- void hold_task(boost::shared_ptr<void> task)
+ void hold_task(uhd::msg_task::sptr task)
{
_async_task = task;
}
@@ -282,7 +320,7 @@ private:
const bool _bige;
const uhd::transport::zero_copy_if::sptr _ctrl_xport;
const uhd::transport::zero_copy_if::sptr _resp_xport;
- boost::shared_ptr<void> _async_task;
+ uhd::msg_task::sptr _async_task;
const boost::uint32_t _sid;
const std::string _name;
boost::mutex _mutex;
@@ -292,22 +330,15 @@ private:
double _tick_rate;
double _timeout;
std::queue<size_t> _outstanding_seqs;
- struct resp_buff_type
- {
- boost::uint32_t data[8];
- };
bounded_buffer<resp_buff_type> _resp_queue;
const size_t _resp_queue_size;
};
-
-radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(
- const bool big_endian,
- zero_copy_if::sptr ctrl_xport,
- zero_copy_if::sptr resp_xport,
- const boost::uint32_t sid,
- const std::string &name
-)
+radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(const bool big_endian,
+ zero_copy_if::sptr ctrl_xport, zero_copy_if::sptr resp_xport,
+ const boost::uint32_t sid, const std::string &name)
{
- return sptr(new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, sid, name));
+ return sptr(
+ new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport,
+ sid, name));
}
diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
index a49ca2a4b..51a307c10 100644
--- a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
+++ b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp
@@ -18,11 +18,12 @@
#ifndef INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP
#define INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP
+#include <uhd/utils/msg_task.hpp>
#include <uhd/types/time_spec.hpp>
#include <uhd/transport/zero_copy.hpp>
+#include <uhd/types/wb_iface.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/utility.hpp>
-#include <uhd/types/wb_iface.hpp>
#include <string>
/*!
@@ -43,7 +44,7 @@ public:
);
//! Hold a ref to a task thats feeding push response
- virtual void hold_task(boost::shared_ptr<void> task) = 0;
+ virtual void hold_task(uhd::msg_task::sptr task) = 0;
//! Push a response externall (resp_xport is NULL)
virtual void push_response(const boost::uint32_t *buff) = 0;
diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp
index 1f735de06..08c32a5fb 100644
--- a/host/lib/utils/tasks.cpp
+++ b/host/lib/utils/tasks.cpp
@@ -16,11 +16,13 @@
//
#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/msg_task.hpp>
#include <uhd/utils/msg.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/barrier.hpp>
#include <exception>
#include <iostream>
+#include <vector>
using namespace uhd;
@@ -80,3 +82,100 @@ private:
task::sptr task::make(const task_fcn_type &task_fcn){
return task::sptr(new task_impl(task_fcn));
}
+
+/*
+ * During shutdown pointers to queues for radio_ctrl_core might not be available anymore.
+ * msg_task_impl provides a dump_queue for such messages.
+ * ctrl_cores can check this queue for stranded messages.
+ */
+
+class msg_task_impl : public msg_task{
+public:
+
+ msg_task_impl(const task_fcn_type &task_fcn):
+ _spawn_barrier(2)
+ {
+ _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn));
+ _spawn_barrier.wait();
+ }
+
+ ~msg_task_impl(void){
+ _running = false;
+ _thread_group.interrupt_all();
+ _thread_group.join_all();
+ }
+
+ /*
+ * Returns the first message for the given SID.
+ * This way a radio_ctrl_core doesn't have to die in timeout but can check for stranded messages here.
+ * This might happen during shutdown when dtors are called.
+ * See also: comments in b200_io_impl->handle_async_task
+ */
+ msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid)
+ {
+ boost::mutex::scoped_lock lock(_mutex);
+ msg_payload_t b;
+ for (size_t i = 0; i < _dump_queue.size(); i++) {
+ if (sid == _dump_queue[i].first) {
+ b = _dump_queue[i].second;
+ _dump_queue.erase(_dump_queue.begin() + i);
+ break;
+ }
+ }
+ return b;
+ }
+
+private:
+
+ void task_loop(const task_fcn_type &task_fcn){
+ _running = true;
+ _spawn_barrier.wait();
+
+ try{
+ while (_running){
+ boost::optional<msg_type_t> buff = task_fcn();
+ if(buff != boost::none){
+ /*
+ * If a message gets stranded it is returned by task_fcn and then pushed to the dump_queue.
+ * This way ctrl_cores can check dump_queue for missing messages.
+ */
+ boost::mutex::scoped_lock lock(_mutex);
+ _dump_queue.push_back(buff.get() );
+ }
+ }
+ }
+ catch(const boost::thread_interrupted &){
+ //this is an ok way to exit the task loop
+ }
+ catch(const std::exception &e){
+ do_error_msg(e.what());
+ }
+ catch(...){
+ //FIXME
+ //Unfortunately, this is also an ok way to end a task,
+ //because on some systems boost throws uncatchables.
+ }
+ }
+
+ void do_error_msg(const std::string &msg){
+ UHD_MSG(error)
+ << "An unexpected exception was caught in a task loop." << std::endl
+ << "The task loop will now exit, things may not work." << std::endl
+ << msg << std::endl
+ ;
+ }
+
+ boost::mutex _mutex;
+ boost::thread_group _thread_group;
+ boost::barrier _spawn_barrier;
+ bool _running;
+
+ /*
+ * This queue holds stranded messages until a radio_ctrl_core grabs them via 'get_msg_from_dump_queue'.
+ */
+ std::vector <msg_type_t> _dump_queue;
+};
+
+msg_task::sptr msg_task::make(const task_fcn_type &task_fcn){
+ return msg_task::sptr(new msg_task_impl(task_fcn));
+}