diff options
| -rw-r--r-- | host/.gitignore | 1 | ||||
| -rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 1 | ||||
| -rw-r--r-- | host/include/uhd/utils/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/include/uhd/utils/msg_task.hpp | 74 | ||||
| -rw-r--r-- | host/include/uhd/utils/tasks.hpp | 1 | ||||
| -rw-r--r-- | host/lib/usrp/b200/b200_impl.cpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/b200/b200_impl.hpp | 4 | ||||
| -rw-r--r-- | host/lib/usrp/b200/b200_io_impl.cpp | 27 | ||||
| -rw-r--r-- | host/lib/usrp/cores/radio_ctrl_core_3000.cpp | 121 | ||||
| -rw-r--r-- | host/lib/usrp/cores/radio_ctrl_core_3000.hpp | 5 | ||||
| -rw-r--r-- | host/lib/utils/tasks.cpp | 99 | 
11 files changed, 281 insertions, 57 deletions
| diff --git a/host/.gitignore b/host/.gitignore index dec27fcf7..9b0584c23 100644 --- a/host/.gitignore +++ b/host/.gitignore @@ -1,2 +1,3 @@  /build  tags +*~ diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 1dc0e8e26..fe2974d09 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -72,6 +72,7 @@ namespace uhd{ namespace transport{          }          boost::detail::atomic_count _ref_count; +        typedef boost::intrusive_ptr<managed_buffer> sptr;      protected:          void *_buffer; diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt index cdef2e946..e86826435 100644 --- a/host/include/uhd/utils/CMakeLists.txt +++ b/host/include/uhd/utils/CMakeLists.txt @@ -27,6 +27,7 @@ UHD_INSTALL(FILES      images.hpp      log.hpp      msg.hpp +    msg_task.hpp      paths.hpp      pimpl.hpp      safe_call.hpp diff --git a/host/include/uhd/utils/msg_task.hpp b/host/include/uhd/utils/msg_task.hpp new file mode 100644 index 000000000..ebb29af08 --- /dev/null +++ b/host/include/uhd/utils/msg_task.hpp @@ -0,0 +1,74 @@ +// +// Copyright 2011-2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_UTILS_MSG_TASK_HPP +#define INCLUDED_UHD_UTILS_MSG_TASK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/function.hpp> +#include <boost/utility.hpp> +#include <boost/optional/optional.hpp> +#include <vector> + +namespace uhd{ +	class UHD_API msg_task : boost::noncopyable{ +        public: +            typedef boost::shared_ptr<msg_task> sptr; +            typedef std::vector<uint8_t> msg_payload_t; +            typedef std::pair<uint32_t, msg_payload_t > msg_type_t; +            typedef boost::function<boost::optional<msg_type_t>(void)> task_fcn_type; + +            /* +             * During shutdown message queues for radio control cores might not be available anymore. +             * Such stranded messages get pushed into a dump queue. +             * With this function radio_ctrl_core can check if one of the messages meant for it got stranded. +             */ +            virtual msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) = 0; + +            inline static std::vector<uint8_t> buff_to_vector(uint8_t* p, size_t n) { +                if(p and n > 0){ +                    std::vector<uint8_t> v(n); +                    memcpy(&v.front(), p, n); +                    return v; +                } +                return std::vector<uint8_t>(); +            } + +            /*! +             * Create a new task object with function callback. +             * The task function callback will be run in a loop. +             * until the thread is interrupted by the deconstructor. +             * +             * A function may return payload which is then pushed to +             * a synchronized message queue. +             * +             * A task should return in a reasonable amount of time +             * or may block forever under the following conditions: +             *  - The blocking call is interruptible. +             *  - The task polls the interrupt condition. +             * +             * \param task_fcn the task callback function +             * \return a new task object +             */ +            static sptr make(const task_fcn_type &task_fcn); +    }; +} //namespace uhd + +#endif /* INCLUDED_UHD_UTILS_MSG_TASK_HPP */ + diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp index dcb003e39..a1f682a83 100644 --- a/host/include/uhd/utils/tasks.hpp +++ b/host/include/uhd/utils/tasks.hpp @@ -46,7 +46,6 @@ namespace uhd{          static sptr make(const task_fcn_type &task_fcn);      }; -  } //namespace uhd  #endif /* INCLUDED_UHD_UTILS_TASKS_HPP */ diff --git a/host/lib/usrp/b200/b200_impl.cpp b/host/lib/usrp/b200/b200_impl.cpp index 0da388b93..66ab813c2 100644 --- a/host/lib/usrp/b200/b200_impl.cpp +++ b/host/lib/usrp/b200/b200_impl.cpp @@ -252,7 +252,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)      ////////////////////////////////////////////////////////////////////      _async_task_data.reset(new AsyncTaskData());      _async_task_data->async_md.reset(new async_md_type(1000/*messages deep*/)); -    _async_task = uhd::task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data)); +    _async_task = uhd::msg_task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data));      ////////////////////////////////////////////////////////////////////      // Local control endpoint @@ -474,7 +474,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr)  b200_impl::~b200_impl(void)  { -    UHD_SAFE_CALL +	UHD_SAFE_CALL      (          _async_task.reset();      ) diff --git a/host/lib/usrp/b200/b200_impl.hpp b/host/lib/usrp/b200/b200_impl.hpp index eced4a539..59a047e01 100644 --- a/host/lib/usrp/b200/b200_impl.hpp +++ b/host/lib/usrp/b200/b200_impl.hpp @@ -120,7 +120,7 @@ struct b200_impl : public uhd::device      boost::weak_ptr<uhd::tx_streamer> _tx_streamer;      //async ctrl + msgs -    uhd::task::sptr _async_task; +    uhd::msg_task::sptr _async_task;      typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type;      struct AsyncTaskData      { @@ -130,7 +130,7 @@ struct b200_impl : public uhd::device          b200_uart::sptr gpsdo_uart;      };      boost::shared_ptr<AsyncTaskData> _async_task_data; -    void handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>); +    boost::optional<uhd::msg_task::msg_type_t> handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>);      void register_loopback_self_test(uhd::wb_iface::sptr iface);      void codec_loopback_self_test(uhd::wb_iface::sptr iface); diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp index d643ef855..069b8ff58 100644 --- a/host/lib/usrp/b200/b200_io_impl.cpp +++ b/host/lib/usrp/b200/b200_io_impl.cpp @@ -139,13 +139,24 @@ bool b200_impl::recv_async_msg(      return _async_task_data->async_md->pop_with_timed_wait(async_metadata, timeout);  } -void b200_impl::handle_async_task( +/* + * This method is constantly called in a msg_task loop. + * Incoming messages are dispatched in to the hosts radio_ctrl_cores. + * The radio_ctrl_core queues are accessed via a weak_ptr to them, stored in AsyncTaskData. + * During shutdown the radio_ctrl_core dtor's are called. + * An empty peek32(0) is sent out to flush pending async messages. + * The response to those messages can't be delivered to the ctrl_core queues anymore + * because the shared pointer corresponding to the weak_ptrs is no longer valid. + * Those stranded messages are put into a dump_queue implemented in msg_task. + * A radio_ctrl_core can search for missing messages there. + */ +boost::optional<uhd::msg_task::msg_type_t> b200_impl::handle_async_task(      uhd::transport::zero_copy_if::sptr xport,      boost::shared_ptr<AsyncTaskData> data  )  { -    managed_recv_buffer::sptr buff = xport->get_recv_buff(); -    if (not buff or buff->size() < 8) return; +	managed_recv_buffer::sptr buff = xport->get_recv_buff(); +    if (not buff or buff->size() < 8) return NULL;      const boost::uint32_t sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]);      switch (sid)      { @@ -155,11 +166,16 @@ void b200_impl::handle_async_task(      case B200_RESP1_MSG_SID:      case B200_LOCAL_RESP_SID:      { -        radio_ctrl_core_3000::sptr ctrl; +    	radio_ctrl_core_3000::sptr ctrl;          if (sid == B200_RESP0_MSG_SID) ctrl = data->radio_ctrl[0].lock();          if (sid == B200_RESP1_MSG_SID) ctrl = data->radio_ctrl[1].lock();          if (sid == B200_LOCAL_RESP_SID) ctrl = data->local_ctrl.lock(); -        if (ctrl) ctrl->push_response(buff->cast<const boost::uint32_t *>()); +        if (ctrl){ +        	ctrl->push_response(buff->cast<const boost::uint32_t *>()); +        } +        else{ +            return std::make_pair(sid, uhd::msg_task::buff_to_vector(buff->cast<boost::uint8_t *>(), buff->size() ) ); +        }          break;      } @@ -204,6 +220,7 @@ void b200_impl::handle_async_task(      default:          UHD_MSG(error) << "Got a ctrl packet with unknown SID " << sid << std::endl;      } +    return NULL;  }  /*********************************************************************** diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp index 5298fd213..13b346cc6 100644 --- a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp +++ b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp @@ -35,35 +35,27 @@ using namespace uhd::transport;  static const double ACK_TIMEOUT = 2.0; //supposed to be worst case practical timeout  static const double MASSIVE_TIMEOUT = 10.0; //for when we wait on a timed command -static const size_t SR_READBACK  = 32; +static const size_t SR_READBACK = 32; -class radio_ctrl_core_3000_impl : public radio_ctrl_core_3000 +class radio_ctrl_core_3000_impl: public radio_ctrl_core_3000  {  public: -    radio_ctrl_core_3000_impl( -        const bool big_endian, -        uhd::transport::zero_copy_if::sptr ctrl_xport, -        uhd::transport::zero_copy_if::sptr resp_xport, -        const boost::uint32_t sid, -        const std::string &name -    ): -        _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), -        _packet_type(vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), -        _bige(big_endian), -        _ctrl_xport(ctrl_xport), -        _resp_xport(resp_xport), -        _sid(sid), -        _name(name), -        _seq_out(0), -        _timeout(ACK_TIMEOUT), -        _resp_queue(128/*max response msgs*/), -        _resp_queue_size(_resp_xport? _resp_xport->get_num_recv_frames() : 3) +    radio_ctrl_core_3000_impl(const bool big_endian, +            uhd::transport::zero_copy_if::sptr ctrl_xport, +            uhd::transport::zero_copy_if::sptr resp_xport, +            const boost::uint32_t sid, const std::string &name) : +            _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), _packet_type( +                    vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), _bige( +                    big_endian), _ctrl_xport(ctrl_xport), _resp_xport( +                    resp_xport), _sid(sid), _name(name), _seq_out(0), _timeout( +                    ACK_TIMEOUT), _resp_queue(128/*max response msgs*/), _resp_queue_size( +                    _resp_xport ? _resp_xport->get_num_recv_frames() : 3)      { -        UHD_LOG << "radio_ctrl_core_3000_impl() " << _name << std::endl; +        UHD_LOG<< "radio_ctrl_core_3000_impl() " << _name << std::endl;          if (resp_xport)          { -            while (resp_xport->get_recv_buff(0.0)){} //flush +            while (resp_xport->get_recv_buff(0.0)) {} //flush          }          this->set_time(uhd::time_spec_t(0.0));          this->set_tick_rate(1.0); //something possible but bogus @@ -74,8 +66,8 @@ public:          UHD_LOG << "~radio_ctrl_core_3000_impl() " << _name << std::endl;          _timeout = ACK_TIMEOUT; //reset timeout to something small          UHD_SAFE_CALL( -            this->peek32(0); //dummy peek with the purpose of ack'ing all packets -            _async_task.reset(); //now its ok to release the task +                this->peek32(0);//dummy peek with the purpose of ack'ing all packets +                _async_task.reset();//now its ok to release the task          )      } @@ -95,7 +87,6 @@ public:      {          boost::mutex::scoped_lock lock(_mutex);          UHD_LOGV(always) << _name << std::hex << " addr 0x" << addr << std::dec << std::endl; -          this->send_pkt(SR_READBACK, addr/8);          this->wait_for_ack(false); @@ -136,6 +127,11 @@ public:      }  private: +    // This is the buffer type for messages in radio control core. +    struct resp_buff_type +    { +        boost::uint32_t data[8]; +    };      /*******************************************************************       * Primary control and interaction private methods @@ -143,7 +139,7 @@ private:      UHD_INLINE void send_pkt(const boost::uint32_t addr, const boost::uint32_t data = 0)      {          managed_send_buffer::sptr buff = _ctrl_xport->get_send_buff(0.0); -        if (not buff){ +        if (not buff) {              throw uhd::runtime_error("fifo ctrl timed out getting a send buffer");          }          boost::uint32_t *pkt = buff->cast<boost::uint32_t *>(); @@ -173,12 +169,11 @@ private:          pkt[packet_info.num_header_words32+0] = (_bige)? uhd::htonx(addr) : uhd::htowx(addr);          pkt[packet_info.num_header_words32+1] = (_bige)? uhd::htonx(data) : uhd::htowx(data);          //UHD_MSG(status) << boost::format("0x%08x, 0x%08x\n") % addr % data; -          //send the buffer over the interface          _outstanding_seqs.push(_seq_out);          buff->commit(sizeof(boost::uint32_t)*(packet_info.num_packet_words32)); -        _seq_out++; //inc seq for next call +        _seq_out++;//inc seq for next call      }      UHD_INLINE boost::uint64_t wait_for_ack(const bool readback) @@ -186,7 +181,6 @@ private:          while (readback or (_outstanding_seqs.size() >= _resp_queue_size))          {              UHD_LOGV(always) << _name << " wait_for_ack: " << "readback = " << readback << " outstanding_seqs.size() " << _outstanding_seqs.size() << std::endl; -              //get seq to ack from outstanding packets list              UHD_ASSERT_THROW(not _outstanding_seqs.empty());              const size_t seq_to_ack = _outstanding_seqs.front(); @@ -218,7 +212,27 @@ private:              //get buffer from response endpoint - or die in timeout              else              { -                UHD_ASSERT_THROW(_resp_queue.pop_with_timed_wait(resp_buff, _timeout)); +                /* +                 * Couldn't get message with haste. +                 * Now check both possible queues for messages. +                 * Messages should come in on _resp_queue, +                 * but could end up in dump_queue. +                 * If we don't get a message --> Die in timeout. +                 */ +                double accum_timeout = 0.0; +                const double short_timeout = 0.005; // == 5ms +                while(not (_resp_queue.pop_with_haste(resp_buff) +                        || check_dump_queue(resp_buff) +                        || _resp_queue.pop_with_timed_wait(resp_buff, short_timeout) +                        )){ +                    /* +                     * If a message couldn't be received within a given timeout +                     * --> throw AssertionError! +                     */ +                    accum_timeout += short_timeout; +                    UHD_ASSERT_THROW(accum_timeout < _timeout); +                } +                  pkt = resp_buff.data;                  packet_info.num_packet_words32 = sizeof(resp_buff)/sizeof(boost::uint32_t);              } @@ -262,9 +276,33 @@ private:                  return ((hi << 32) | lo);              }          } +          return 0;      } +    /* +     * If ctrl_core waits for a message that didn't arrive it can search for it in the dump queue. +     * This actually happens during shutdown. +     * handle_async_task can't access radio_ctrl_cores queue anymore thus it returns the corresponding message. +     * msg_task class implements a dump_queue to store such messages. +     * With check_dump_queue we can check if a message we are waiting for got stranded there. +     * If a message got stuck we get it here and push it onto our own message_queue. +     */ +    bool check_dump_queue(resp_buff_type b) { +        boost::uint32_t recv_sid = (((_sid)<<16)|((_sid)>>16)); +        uhd::msg_task::msg_payload_t msg; +        do{ +            msg = _async_task->get_msg_from_dump_queue(recv_sid); +        } +        while(msg.size() < 8 && msg.size() != 0); + +        if(msg.size() >= 8) { +            memcpy(b.data, &msg.front(), 8); +            return true; +        } +        return false; +    } +      void push_response(const boost::uint32_t *buff)      {          resp_buff_type resp_buff; @@ -272,7 +310,7 @@ private:          _resp_queue.push_with_haste(resp_buff);      } -    void hold_task(boost::shared_ptr<void> task) +    void hold_task(uhd::msg_task::sptr task)      {          _async_task = task;      } @@ -282,7 +320,7 @@ private:      const bool _bige;      const uhd::transport::zero_copy_if::sptr _ctrl_xport;      const uhd::transport::zero_copy_if::sptr _resp_xport; -    boost::shared_ptr<void> _async_task; +    uhd::msg_task::sptr _async_task;      const boost::uint32_t _sid;      const std::string _name;      boost::mutex _mutex; @@ -292,22 +330,15 @@ private:      double _tick_rate;      double _timeout;      std::queue<size_t> _outstanding_seqs; -    struct resp_buff_type -    { -        boost::uint32_t data[8]; -    };      bounded_buffer<resp_buff_type> _resp_queue;      const size_t _resp_queue_size;  }; - -radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make( -    const bool big_endian, -    zero_copy_if::sptr ctrl_xport, -    zero_copy_if::sptr resp_xport, -    const boost::uint32_t sid, -    const std::string &name -) +radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(const bool big_endian, +        zero_copy_if::sptr ctrl_xport, zero_copy_if::sptr resp_xport, +        const boost::uint32_t sid, const std::string &name)  { -    return sptr(new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, sid, name)); +    return sptr( +            new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, +                    sid, name));  } diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp index a49ca2a4b..51a307c10 100644 --- a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp +++ b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp @@ -18,11 +18,12 @@  #ifndef INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP  #define INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP +#include <uhd/utils/msg_task.hpp>  #include <uhd/types/time_spec.hpp>  #include <uhd/transport/zero_copy.hpp> +#include <uhd/types/wb_iface.hpp>  #include <boost/shared_ptr.hpp>  #include <boost/utility.hpp> -#include <uhd/types/wb_iface.hpp>  #include <string>  /*! @@ -43,7 +44,7 @@ public:      );      //! Hold a ref to a task thats feeding push response -    virtual void hold_task(boost::shared_ptr<void> task) = 0; +    virtual void hold_task(uhd::msg_task::sptr task) = 0;      //! Push a response externall (resp_xport is NULL)      virtual void push_response(const boost::uint32_t *buff) = 0; diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp index 1f735de06..08c32a5fb 100644 --- a/host/lib/utils/tasks.cpp +++ b/host/lib/utils/tasks.cpp @@ -16,11 +16,13 @@  //  #include <uhd/utils/tasks.hpp> +#include <uhd/utils/msg_task.hpp>  #include <uhd/utils/msg.hpp>  #include <boost/thread/thread.hpp>  #include <boost/thread/barrier.hpp>  #include <exception>  #include <iostream> +#include <vector>  using namespace uhd; @@ -80,3 +82,100 @@ private:  task::sptr task::make(const task_fcn_type &task_fcn){      return task::sptr(new task_impl(task_fcn));  } + +/* + * During shutdown pointers to queues for radio_ctrl_core might not be available anymore. + * msg_task_impl provides a dump_queue for such messages. + * ctrl_cores can check this queue for stranded messages. + */ + +class msg_task_impl : public msg_task{ +public: + +    msg_task_impl(const task_fcn_type &task_fcn): +        _spawn_barrier(2) +    { +        _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn)); +        _spawn_barrier.wait(); +    } + +    ~msg_task_impl(void){ +        _running = false; +        _thread_group.interrupt_all(); +        _thread_group.join_all(); +    } + +    /* +     * Returns the first message for the given SID. +     * This way a radio_ctrl_core doesn't have to die in timeout but can check for stranded messages here. +     * This might happen during shutdown when dtors are called. +     * See also: comments in b200_io_impl->handle_async_task +     */ +    msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) +    { +        boost::mutex::scoped_lock lock(_mutex); +        msg_payload_t b; +        for (size_t i = 0; i < _dump_queue.size(); i++) { +            if (sid == _dump_queue[i].first) { +                b = _dump_queue[i].second; +                _dump_queue.erase(_dump_queue.begin() + i); +                break; +            } +        } +        return b; +    } + +private: + +    void task_loop(const task_fcn_type &task_fcn){ +        _running = true; +        _spawn_barrier.wait(); + +        try{ +            while (_running){ +            	boost::optional<msg_type_t> buff = task_fcn(); +            	if(buff != boost::none){ +            	    /* +            	     * If a message gets stranded it is returned by task_fcn and then pushed to the dump_queue. +            	     * This way ctrl_cores can check dump_queue for missing messages. +            	     */ +            	    boost::mutex::scoped_lock lock(_mutex); +            	    _dump_queue.push_back(buff.get() ); +            	} +            } +        } +        catch(const boost::thread_interrupted &){ +            //this is an ok way to exit the task loop +        } +        catch(const std::exception &e){ +            do_error_msg(e.what()); +        } +        catch(...){ +            //FIXME +            //Unfortunately, this is also an ok way to end a task, +            //because on some systems boost throws uncatchables. +        } +    } + +    void do_error_msg(const std::string &msg){ +        UHD_MSG(error) +            << "An unexpected exception was caught in a task loop." << std::endl +            << "The task loop will now exit, things may not work." << std::endl +            << msg << std::endl +        ; +    } + +    boost::mutex _mutex; +    boost::thread_group _thread_group; +    boost::barrier _spawn_barrier; +    bool _running; + +    /* +     * This queue holds stranded messages until a radio_ctrl_core grabs them via 'get_msg_from_dump_queue'. +     */ +    std::vector <msg_type_t> _dump_queue; +}; + +msg_task::sptr msg_task::make(const task_fcn_type &task_fcn){ +    return msg_task::sptr(new msg_task_impl(task_fcn)); +} | 
