diff options
Diffstat (limited to 'host')
-rw-r--r-- | host/.gitignore | 1 | ||||
-rw-r--r-- | host/include/uhd/transport/zero_copy.hpp | 1 | ||||
-rw-r--r-- | host/include/uhd/utils/CMakeLists.txt | 1 | ||||
-rw-r--r-- | host/include/uhd/utils/msg_task.hpp | 74 | ||||
-rw-r--r-- | host/include/uhd/utils/tasks.hpp | 1 | ||||
-rw-r--r-- | host/lib/usrp/b200/b200_impl.cpp | 4 | ||||
-rw-r--r-- | host/lib/usrp/b200/b200_impl.hpp | 4 | ||||
-rw-r--r-- | host/lib/usrp/b200/b200_io_impl.cpp | 27 | ||||
-rw-r--r-- | host/lib/usrp/cores/radio_ctrl_core_3000.cpp | 121 | ||||
-rw-r--r-- | host/lib/usrp/cores/radio_ctrl_core_3000.hpp | 5 | ||||
-rw-r--r-- | host/lib/utils/tasks.cpp | 99 |
11 files changed, 281 insertions, 57 deletions
diff --git a/host/.gitignore b/host/.gitignore index dec27fcf7..9b0584c23 100644 --- a/host/.gitignore +++ b/host/.gitignore @@ -1,2 +1,3 @@ /build tags +*~ diff --git a/host/include/uhd/transport/zero_copy.hpp b/host/include/uhd/transport/zero_copy.hpp index 1dc0e8e26..fe2974d09 100644 --- a/host/include/uhd/transport/zero_copy.hpp +++ b/host/include/uhd/transport/zero_copy.hpp @@ -72,6 +72,7 @@ namespace uhd{ namespace transport{ } boost::detail::atomic_count _ref_count; + typedef boost::intrusive_ptr<managed_buffer> sptr; protected: void *_buffer; diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt index cdef2e946..e86826435 100644 --- a/host/include/uhd/utils/CMakeLists.txt +++ b/host/include/uhd/utils/CMakeLists.txt @@ -27,6 +27,7 @@ UHD_INSTALL(FILES images.hpp log.hpp msg.hpp + msg_task.hpp paths.hpp pimpl.hpp safe_call.hpp diff --git a/host/include/uhd/utils/msg_task.hpp b/host/include/uhd/utils/msg_task.hpp new file mode 100644 index 000000000..ebb29af08 --- /dev/null +++ b/host/include/uhd/utils/msg_task.hpp @@ -0,0 +1,74 @@ +// +// Copyright 2011-2013 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_UTILS_MSG_TASK_HPP +#define INCLUDED_UHD_UTILS_MSG_TASK_HPP + +#include <uhd/config.hpp> +#include <uhd/transport/zero_copy.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/function.hpp> +#include <boost/utility.hpp> +#include <boost/optional/optional.hpp> +#include <vector> + +namespace uhd{ + class UHD_API msg_task : boost::noncopyable{ + public: + typedef boost::shared_ptr<msg_task> sptr; + typedef std::vector<uint8_t> msg_payload_t; + typedef std::pair<uint32_t, msg_payload_t > msg_type_t; + typedef boost::function<boost::optional<msg_type_t>(void)> task_fcn_type; + + /* + * During shutdown message queues for radio control cores might not be available anymore. + * Such stranded messages get pushed into a dump queue. + * With this function radio_ctrl_core can check if one of the messages meant for it got stranded. + */ + virtual msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) = 0; + + inline static std::vector<uint8_t> buff_to_vector(uint8_t* p, size_t n) { + if(p and n > 0){ + std::vector<uint8_t> v(n); + memcpy(&v.front(), p, n); + return v; + } + return std::vector<uint8_t>(); + } + + /*! + * Create a new task object with function callback. + * The task function callback will be run in a loop. + * until the thread is interrupted by the deconstructor. + * + * A function may return payload which is then pushed to + * a synchronized message queue. + * + * A task should return in a reasonable amount of time + * or may block forever under the following conditions: + * - The blocking call is interruptible. + * - The task polls the interrupt condition. + * + * \param task_fcn the task callback function + * \return a new task object + */ + static sptr make(const task_fcn_type &task_fcn); + }; +} //namespace uhd + +#endif /* INCLUDED_UHD_UTILS_MSG_TASK_HPP */ + diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp index dcb003e39..a1f682a83 100644 --- a/host/include/uhd/utils/tasks.hpp +++ b/host/include/uhd/utils/tasks.hpp @@ -46,7 +46,6 @@ namespace uhd{ static sptr make(const task_fcn_type &task_fcn); }; - } //namespace uhd #endif /* INCLUDED_UHD_UTILS_TASKS_HPP */ diff --git a/host/lib/usrp/b200/b200_impl.cpp b/host/lib/usrp/b200/b200_impl.cpp index 0da388b93..66ab813c2 100644 --- a/host/lib/usrp/b200/b200_impl.cpp +++ b/host/lib/usrp/b200/b200_impl.cpp @@ -252,7 +252,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr) //////////////////////////////////////////////////////////////////// _async_task_data.reset(new AsyncTaskData()); _async_task_data->async_md.reset(new async_md_type(1000/*messages deep*/)); - _async_task = uhd::task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data)); + _async_task = uhd::msg_task::make(boost::bind(&b200_impl::handle_async_task, this, _ctrl_transport, _async_task_data)); //////////////////////////////////////////////////////////////////// // Local control endpoint @@ -474,7 +474,7 @@ b200_impl::b200_impl(const device_addr_t &device_addr) b200_impl::~b200_impl(void) { - UHD_SAFE_CALL + UHD_SAFE_CALL ( _async_task.reset(); ) diff --git a/host/lib/usrp/b200/b200_impl.hpp b/host/lib/usrp/b200/b200_impl.hpp index eced4a539..59a047e01 100644 --- a/host/lib/usrp/b200/b200_impl.hpp +++ b/host/lib/usrp/b200/b200_impl.hpp @@ -120,7 +120,7 @@ struct b200_impl : public uhd::device boost::weak_ptr<uhd::tx_streamer> _tx_streamer; //async ctrl + msgs - uhd::task::sptr _async_task; + uhd::msg_task::sptr _async_task; typedef uhd::transport::bounded_buffer<uhd::async_metadata_t> async_md_type; struct AsyncTaskData { @@ -130,7 +130,7 @@ struct b200_impl : public uhd::device b200_uart::sptr gpsdo_uart; }; boost::shared_ptr<AsyncTaskData> _async_task_data; - void handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>); + boost::optional<uhd::msg_task::msg_type_t> handle_async_task(uhd::transport::zero_copy_if::sptr, boost::shared_ptr<AsyncTaskData>); void register_loopback_self_test(uhd::wb_iface::sptr iface); void codec_loopback_self_test(uhd::wb_iface::sptr iface); diff --git a/host/lib/usrp/b200/b200_io_impl.cpp b/host/lib/usrp/b200/b200_io_impl.cpp index d643ef855..069b8ff58 100644 --- a/host/lib/usrp/b200/b200_io_impl.cpp +++ b/host/lib/usrp/b200/b200_io_impl.cpp @@ -139,13 +139,24 @@ bool b200_impl::recv_async_msg( return _async_task_data->async_md->pop_with_timed_wait(async_metadata, timeout); } -void b200_impl::handle_async_task( +/* + * This method is constantly called in a msg_task loop. + * Incoming messages are dispatched in to the hosts radio_ctrl_cores. + * The radio_ctrl_core queues are accessed via a weak_ptr to them, stored in AsyncTaskData. + * During shutdown the radio_ctrl_core dtor's are called. + * An empty peek32(0) is sent out to flush pending async messages. + * The response to those messages can't be delivered to the ctrl_core queues anymore + * because the shared pointer corresponding to the weak_ptrs is no longer valid. + * Those stranded messages are put into a dump_queue implemented in msg_task. + * A radio_ctrl_core can search for missing messages there. + */ +boost::optional<uhd::msg_task::msg_type_t> b200_impl::handle_async_task( uhd::transport::zero_copy_if::sptr xport, boost::shared_ptr<AsyncTaskData> data ) { - managed_recv_buffer::sptr buff = xport->get_recv_buff(); - if (not buff or buff->size() < 8) return; + managed_recv_buffer::sptr buff = xport->get_recv_buff(); + if (not buff or buff->size() < 8) return NULL; const boost::uint32_t sid = uhd::wtohx(buff->cast<const boost::uint32_t *>()[1]); switch (sid) { @@ -155,11 +166,16 @@ void b200_impl::handle_async_task( case B200_RESP1_MSG_SID: case B200_LOCAL_RESP_SID: { - radio_ctrl_core_3000::sptr ctrl; + radio_ctrl_core_3000::sptr ctrl; if (sid == B200_RESP0_MSG_SID) ctrl = data->radio_ctrl[0].lock(); if (sid == B200_RESP1_MSG_SID) ctrl = data->radio_ctrl[1].lock(); if (sid == B200_LOCAL_RESP_SID) ctrl = data->local_ctrl.lock(); - if (ctrl) ctrl->push_response(buff->cast<const boost::uint32_t *>()); + if (ctrl){ + ctrl->push_response(buff->cast<const boost::uint32_t *>()); + } + else{ + return std::make_pair(sid, uhd::msg_task::buff_to_vector(buff->cast<boost::uint8_t *>(), buff->size() ) ); + } break; } @@ -204,6 +220,7 @@ void b200_impl::handle_async_task( default: UHD_MSG(error) << "Got a ctrl packet with unknown SID " << sid << std::endl; } + return NULL; } /*********************************************************************** diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp index 5298fd213..13b346cc6 100644 --- a/host/lib/usrp/cores/radio_ctrl_core_3000.cpp +++ b/host/lib/usrp/cores/radio_ctrl_core_3000.cpp @@ -35,35 +35,27 @@ using namespace uhd::transport; static const double ACK_TIMEOUT = 2.0; //supposed to be worst case practical timeout static const double MASSIVE_TIMEOUT = 10.0; //for when we wait on a timed command -static const size_t SR_READBACK = 32; +static const size_t SR_READBACK = 32; -class radio_ctrl_core_3000_impl : public radio_ctrl_core_3000 +class radio_ctrl_core_3000_impl: public radio_ctrl_core_3000 { public: - radio_ctrl_core_3000_impl( - const bool big_endian, - uhd::transport::zero_copy_if::sptr ctrl_xport, - uhd::transport::zero_copy_if::sptr resp_xport, - const boost::uint32_t sid, - const std::string &name - ): - _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), - _packet_type(vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), - _bige(big_endian), - _ctrl_xport(ctrl_xport), - _resp_xport(resp_xport), - _sid(sid), - _name(name), - _seq_out(0), - _timeout(ACK_TIMEOUT), - _resp_queue(128/*max response msgs*/), - _resp_queue_size(_resp_xport? _resp_xport->get_num_recv_frames() : 3) + radio_ctrl_core_3000_impl(const bool big_endian, + uhd::transport::zero_copy_if::sptr ctrl_xport, + uhd::transport::zero_copy_if::sptr resp_xport, + const boost::uint32_t sid, const std::string &name) : + _link_type(vrt::if_packet_info_t::LINK_TYPE_CHDR), _packet_type( + vrt::if_packet_info_t::PACKET_TYPE_CONTEXT), _bige( + big_endian), _ctrl_xport(ctrl_xport), _resp_xport( + resp_xport), _sid(sid), _name(name), _seq_out(0), _timeout( + ACK_TIMEOUT), _resp_queue(128/*max response msgs*/), _resp_queue_size( + _resp_xport ? _resp_xport->get_num_recv_frames() : 3) { - UHD_LOG << "radio_ctrl_core_3000_impl() " << _name << std::endl; + UHD_LOG<< "radio_ctrl_core_3000_impl() " << _name << std::endl; if (resp_xport) { - while (resp_xport->get_recv_buff(0.0)){} //flush + while (resp_xport->get_recv_buff(0.0)) {} //flush } this->set_time(uhd::time_spec_t(0.0)); this->set_tick_rate(1.0); //something possible but bogus @@ -74,8 +66,8 @@ public: UHD_LOG << "~radio_ctrl_core_3000_impl() " << _name << std::endl; _timeout = ACK_TIMEOUT; //reset timeout to something small UHD_SAFE_CALL( - this->peek32(0); //dummy peek with the purpose of ack'ing all packets - _async_task.reset(); //now its ok to release the task + this->peek32(0);//dummy peek with the purpose of ack'ing all packets + _async_task.reset();//now its ok to release the task ) } @@ -95,7 +87,6 @@ public: { boost::mutex::scoped_lock lock(_mutex); UHD_LOGV(always) << _name << std::hex << " addr 0x" << addr << std::dec << std::endl; - this->send_pkt(SR_READBACK, addr/8); this->wait_for_ack(false); @@ -136,6 +127,11 @@ public: } private: + // This is the buffer type for messages in radio control core. + struct resp_buff_type + { + boost::uint32_t data[8]; + }; /******************************************************************* * Primary control and interaction private methods @@ -143,7 +139,7 @@ private: UHD_INLINE void send_pkt(const boost::uint32_t addr, const boost::uint32_t data = 0) { managed_send_buffer::sptr buff = _ctrl_xport->get_send_buff(0.0); - if (not buff){ + if (not buff) { throw uhd::runtime_error("fifo ctrl timed out getting a send buffer"); } boost::uint32_t *pkt = buff->cast<boost::uint32_t *>(); @@ -173,12 +169,11 @@ private: pkt[packet_info.num_header_words32+0] = (_bige)? uhd::htonx(addr) : uhd::htowx(addr); pkt[packet_info.num_header_words32+1] = (_bige)? uhd::htonx(data) : uhd::htowx(data); //UHD_MSG(status) << boost::format("0x%08x, 0x%08x\n") % addr % data; - //send the buffer over the interface _outstanding_seqs.push(_seq_out); buff->commit(sizeof(boost::uint32_t)*(packet_info.num_packet_words32)); - _seq_out++; //inc seq for next call + _seq_out++;//inc seq for next call } UHD_INLINE boost::uint64_t wait_for_ack(const bool readback) @@ -186,7 +181,6 @@ private: while (readback or (_outstanding_seqs.size() >= _resp_queue_size)) { UHD_LOGV(always) << _name << " wait_for_ack: " << "readback = " << readback << " outstanding_seqs.size() " << _outstanding_seqs.size() << std::endl; - //get seq to ack from outstanding packets list UHD_ASSERT_THROW(not _outstanding_seqs.empty()); const size_t seq_to_ack = _outstanding_seqs.front(); @@ -218,7 +212,27 @@ private: //get buffer from response endpoint - or die in timeout else { - UHD_ASSERT_THROW(_resp_queue.pop_with_timed_wait(resp_buff, _timeout)); + /* + * Couldn't get message with haste. + * Now check both possible queues for messages. + * Messages should come in on _resp_queue, + * but could end up in dump_queue. + * If we don't get a message --> Die in timeout. + */ + double accum_timeout = 0.0; + const double short_timeout = 0.005; // == 5ms + while(not (_resp_queue.pop_with_haste(resp_buff) + || check_dump_queue(resp_buff) + || _resp_queue.pop_with_timed_wait(resp_buff, short_timeout) + )){ + /* + * If a message couldn't be received within a given timeout + * --> throw AssertionError! + */ + accum_timeout += short_timeout; + UHD_ASSERT_THROW(accum_timeout < _timeout); + } + pkt = resp_buff.data; packet_info.num_packet_words32 = sizeof(resp_buff)/sizeof(boost::uint32_t); } @@ -262,9 +276,33 @@ private: return ((hi << 32) | lo); } } + return 0; } + /* + * If ctrl_core waits for a message that didn't arrive it can search for it in the dump queue. + * This actually happens during shutdown. + * handle_async_task can't access radio_ctrl_cores queue anymore thus it returns the corresponding message. + * msg_task class implements a dump_queue to store such messages. + * With check_dump_queue we can check if a message we are waiting for got stranded there. + * If a message got stuck we get it here and push it onto our own message_queue. + */ + bool check_dump_queue(resp_buff_type b) { + boost::uint32_t recv_sid = (((_sid)<<16)|((_sid)>>16)); + uhd::msg_task::msg_payload_t msg; + do{ + msg = _async_task->get_msg_from_dump_queue(recv_sid); + } + while(msg.size() < 8 && msg.size() != 0); + + if(msg.size() >= 8) { + memcpy(b.data, &msg.front(), 8); + return true; + } + return false; + } + void push_response(const boost::uint32_t *buff) { resp_buff_type resp_buff; @@ -272,7 +310,7 @@ private: _resp_queue.push_with_haste(resp_buff); } - void hold_task(boost::shared_ptr<void> task) + void hold_task(uhd::msg_task::sptr task) { _async_task = task; } @@ -282,7 +320,7 @@ private: const bool _bige; const uhd::transport::zero_copy_if::sptr _ctrl_xport; const uhd::transport::zero_copy_if::sptr _resp_xport; - boost::shared_ptr<void> _async_task; + uhd::msg_task::sptr _async_task; const boost::uint32_t _sid; const std::string _name; boost::mutex _mutex; @@ -292,22 +330,15 @@ private: double _tick_rate; double _timeout; std::queue<size_t> _outstanding_seqs; - struct resp_buff_type - { - boost::uint32_t data[8]; - }; bounded_buffer<resp_buff_type> _resp_queue; const size_t _resp_queue_size; }; - -radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make( - const bool big_endian, - zero_copy_if::sptr ctrl_xport, - zero_copy_if::sptr resp_xport, - const boost::uint32_t sid, - const std::string &name -) +radio_ctrl_core_3000::sptr radio_ctrl_core_3000::make(const bool big_endian, + zero_copy_if::sptr ctrl_xport, zero_copy_if::sptr resp_xport, + const boost::uint32_t sid, const std::string &name) { - return sptr(new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, sid, name)); + return sptr( + new radio_ctrl_core_3000_impl(big_endian, ctrl_xport, resp_xport, + sid, name)); } diff --git a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp index a49ca2a4b..51a307c10 100644 --- a/host/lib/usrp/cores/radio_ctrl_core_3000.hpp +++ b/host/lib/usrp/cores/radio_ctrl_core_3000.hpp @@ -18,11 +18,12 @@ #ifndef INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP #define INCLUDED_LIBUHD_USRP_RADIO_CTRL_3000_HPP +#include <uhd/utils/msg_task.hpp> #include <uhd/types/time_spec.hpp> #include <uhd/transport/zero_copy.hpp> +#include <uhd/types/wb_iface.hpp> #include <boost/shared_ptr.hpp> #include <boost/utility.hpp> -#include <uhd/types/wb_iface.hpp> #include <string> /*! @@ -43,7 +44,7 @@ public: ); //! Hold a ref to a task thats feeding push response - virtual void hold_task(boost::shared_ptr<void> task) = 0; + virtual void hold_task(uhd::msg_task::sptr task) = 0; //! Push a response externall (resp_xport is NULL) virtual void push_response(const boost::uint32_t *buff) = 0; diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp index 1f735de06..08c32a5fb 100644 --- a/host/lib/utils/tasks.cpp +++ b/host/lib/utils/tasks.cpp @@ -16,11 +16,13 @@ // #include <uhd/utils/tasks.hpp> +#include <uhd/utils/msg_task.hpp> #include <uhd/utils/msg.hpp> #include <boost/thread/thread.hpp> #include <boost/thread/barrier.hpp> #include <exception> #include <iostream> +#include <vector> using namespace uhd; @@ -80,3 +82,100 @@ private: task::sptr task::make(const task_fcn_type &task_fcn){ return task::sptr(new task_impl(task_fcn)); } + +/* + * During shutdown pointers to queues for radio_ctrl_core might not be available anymore. + * msg_task_impl provides a dump_queue for such messages. + * ctrl_cores can check this queue for stranded messages. + */ + +class msg_task_impl : public msg_task{ +public: + + msg_task_impl(const task_fcn_type &task_fcn): + _spawn_barrier(2) + { + _thread_group.create_thread(boost::bind(&msg_task_impl::task_loop, this, task_fcn)); + _spawn_barrier.wait(); + } + + ~msg_task_impl(void){ + _running = false; + _thread_group.interrupt_all(); + _thread_group.join_all(); + } + + /* + * Returns the first message for the given SID. + * This way a radio_ctrl_core doesn't have to die in timeout but can check for stranded messages here. + * This might happen during shutdown when dtors are called. + * See also: comments in b200_io_impl->handle_async_task + */ + msg_payload_t get_msg_from_dump_queue(boost::uint32_t sid) + { + boost::mutex::scoped_lock lock(_mutex); + msg_payload_t b; + for (size_t i = 0; i < _dump_queue.size(); i++) { + if (sid == _dump_queue[i].first) { + b = _dump_queue[i].second; + _dump_queue.erase(_dump_queue.begin() + i); + break; + } + } + return b; + } + +private: + + void task_loop(const task_fcn_type &task_fcn){ + _running = true; + _spawn_barrier.wait(); + + try{ + while (_running){ + boost::optional<msg_type_t> buff = task_fcn(); + if(buff != boost::none){ + /* + * If a message gets stranded it is returned by task_fcn and then pushed to the dump_queue. + * This way ctrl_cores can check dump_queue for missing messages. + */ + boost::mutex::scoped_lock lock(_mutex); + _dump_queue.push_back(buff.get() ); + } + } + } + catch(const boost::thread_interrupted &){ + //this is an ok way to exit the task loop + } + catch(const std::exception &e){ + do_error_msg(e.what()); + } + catch(...){ + //FIXME + //Unfortunately, this is also an ok way to end a task, + //because on some systems boost throws uncatchables. + } + } + + void do_error_msg(const std::string &msg){ + UHD_MSG(error) + << "An unexpected exception was caught in a task loop." << std::endl + << "The task loop will now exit, things may not work." << std::endl + << msg << std::endl + ; + } + + boost::mutex _mutex; + boost::thread_group _thread_group; + boost::barrier _spawn_barrier; + bool _running; + + /* + * This queue holds stranded messages until a radio_ctrl_core grabs them via 'get_msg_from_dump_queue'. + */ + std::vector <msg_type_t> _dump_queue; +}; + +msg_task::sptr msg_task::make(const task_fcn_type &task_fcn){ + return msg_task::sptr(new msg_task_impl(task_fcn)); +} |