From 7e1b2a0e3c014bc23aaa7a045efb5f1109818051 Mon Sep 17 00:00:00 2001 From: Josh Blum Date: Wed, 13 Jul 2011 17:25:40 -0700 Subject: uhd: added tasks to simplify thread spawning use cases --- host/include/uhd/utils/CMakeLists.txt | 1 + host/include/uhd/utils/tasks.hpp | 53 ++++++++++++++++++++++ host/lib/transport/libusb1_zero_copy.cpp | 34 ++++++--------- host/lib/usrp/b100/b100_ctrl.cpp | 20 +++------ host/lib/usrp/e100/io_impl.cpp | 20 +++------ host/lib/usrp/usrp1/io_impl.cpp | 15 +++---- host/lib/usrp/usrp1/soft_time_ctrl.cpp | 29 +++--------- host/lib/usrp/usrp1/usrp1_impl.hpp | 3 +- host/lib/usrp/usrp2/io_impl.cpp | 29 ++++-------- host/lib/usrp/usrp2/usrp2_iface.cpp | 44 ++++++------------- host/lib/utils/CMakeLists.txt | 1 + host/lib/utils/tasks.cpp | 75 ++++++++++++++++++++++++++++++++ 12 files changed, 190 insertions(+), 134 deletions(-) create mode 100644 host/include/uhd/utils/tasks.hpp create mode 100644 host/lib/utils/tasks.cpp (limited to 'host') diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt index 88a0e612b..0bf98fb67 100644 --- a/host/include/uhd/utils/CMakeLists.txt +++ b/host/include/uhd/utils/CMakeLists.txt @@ -30,6 +30,7 @@ INSTALL(FILES safe_call.hpp safe_main.hpp static.hpp + tasks.hpp thread_priority.hpp DESTINATION ${INCLUDE_DIR}/uhd/utils COMPONENT headers diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp new file mode 100644 index 000000000..38b2bddf0 --- /dev/null +++ b/host/include/uhd/utils/tasks.hpp @@ -0,0 +1,53 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#ifndef INCLUDED_UHD_UTILS_TASKS_HPP +#define INCLUDED_UHD_UTILS_TASKS_HPP + +#include +#include +#include +#include + +namespace uhd{ + + class task : boost::noncopyable{ + public: + typedef boost::shared_ptr sptr; + typedef boost::function task_fcn_type; + + /*! + * Create a new task object with function callback. + * The task function callback will be run in a loop. + * until the thread is interrupted by the deconstructor. + * + * A task should return in a reasonable amount of time + * or may block forever under the following conditions: + * - The blocking call is interruptible. + * - The task polls the interrupt condition. + * + * \param task_fcn the task callback function + * \return a new task object + */ + static sptr make(const task_fcn_type &task_fcn); + + }; + +} //namespace uhd + +#endif /* INCLUDED_UHD_UTILS_TASKS_HPP */ + diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f781f890d..0fa856d34 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,11 +21,11 @@ #include #include #include +#include #include #include #include #include -#include #include using namespace uhd; @@ -202,12 +202,10 @@ public: } //spawn the event handler threads - size_t concurrency = hints.cast("concurrency_hint", 1); - boost::barrier spawn_barrier(concurrency+1); - for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( - boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier)) - ); - spawn_barrier.wait(); + const size_t concurrency = hints.cast("concurrency_hint", 1); + for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make( + boost::bind(&libusb_zero_copy_impl::run_event_loop, this) + )); } ~libusb_zero_copy_impl(void){ @@ -221,9 +219,6 @@ public: boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } } - //shutdown the threads - _thread_group.interrupt_all(); - _thread_group.join_all(); } managed_recv_buffer::sptr get_recv_buff(double timeout){ @@ -275,20 +270,17 @@ private: std::list _all_luts; //! event handler threads - boost::thread_group _thread_group; + std::list _event_loop_tasks; - void run_event_loop(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); + void run_event_loop(void){ set_thread_priority_safe(); libusb_context *context = libusb::session::get_global_session()->get_context(); - try{ - while (not boost::this_thread::interruption_requested()){ - timeval tv; - tv.tv_sec = 0; - tv.tv_usec = 100000; //100ms - libusb_handle_events_timeout(context, &tv); - } - } catch(const boost::thread_interrupted &){} + while (not boost::this_thread::interruption_requested()){ + timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 100000; //100ms + libusb_handle_events_timeout(context, &tv); + } } }; diff --git a/host/lib/usrp/b100/b100_ctrl.cpp b/host/lib/usrp/b100/b100_ctrl.cpp index 5b03fd591..e08b47ce4 100644 --- a/host/lib/usrp/b100/b100_ctrl.cpp +++ b/host/lib/usrp/b100/b100_ctrl.cpp @@ -22,10 +22,12 @@ #include #include #include +#include #include #include #include "ctrl_packet.hpp" -#include +#include +#include #include using namespace uhd::transport; @@ -40,19 +42,12 @@ public: _ctrl_transport(ctrl_transport), _seq(0) { - boost::barrier spawn_barrier(2); - viking_marauders.create_thread(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this, boost::ref(spawn_barrier))); - spawn_barrier.wait(); + viking_marauder = task::make(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this)); } int write(boost::uint32_t addr, const ctrl_data_t &data); ctrl_data_t read(boost::uint32_t addr, size_t len); - ~b100_ctrl_impl(void) { - viking_marauders.interrupt_all(); - viking_marauders.join_all(); - } - bool get_ctrl_data(ctrl_data_t &pkt_data, double timeout); void poke32(wb_addr_type addr, boost::uint32_t data){ @@ -95,10 +90,10 @@ private: int send_pkt(boost::uint16_t *cmd); //änd hërë wë gö ä-Vïkïng för äsynchronous control packets - void viking_marauder_loop(boost::barrier &); + void viking_marauder_loop(void); bounded_buffer sync_ctrl_fifo; async_cb_type _async_cb; - boost::thread_group viking_marauders; + task::sptr viking_marauder; uhd::transport::zero_copy_if::sptr _ctrl_transport; boost::uint8_t _seq; @@ -206,8 +201,7 @@ ctrl_data_t b100_ctrl_impl::read(boost::uint32_t addr, size_t len) { * never have more than 1 message in it, since it's expected that we'll * wait for a control operation to finish before starting another one. **********************************************************************/ -void b100_ctrl_impl::viking_marauder_loop(boost::barrier &spawn_barrier) { - spawn_barrier.wait(); +void b100_ctrl_impl::viking_marauder_loop(void){ set_thread_priority_safe(); while (not boost::this_thread::interruption_requested()){ diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp index 69ca214dc..a10b3ffb3 100644 --- a/host/lib/usrp/e100/io_impl.cpp +++ b/host/lib/usrp/e100/io_impl.cpp @@ -24,12 +24,13 @@ #include "e100_regs.hpp" #include #include +#include #include #include #include #include +#include #include -#include #include //poll #include //open, close #include @@ -51,11 +52,6 @@ struct e100_impl::io_impl{ false_alarm(0), async_msg_fifo(100/*messages deep*/) { /* NOP */ } - ~io_impl(void){ - recv_pirate_crew.interrupt_all(); - recv_pirate_crew.join_all(); - } - double tick_rate; //set by update tick rate method e100_ctrl::sptr iface; //so handle irq can peek and poke void handle_irq(void); @@ -70,11 +66,8 @@ struct e100_impl::io_impl{ //a pirate's life is the life for me! void recv_pirate_loop( - boost::barrier &spawn_barrier, spi_iface::sptr //keep a sptr to iface which shares gpio147 ){ - spawn_barrier.wait(); - //open the GPIO and set it up for an IRQ std::ofstream edge_file("/sys/class/gpio/gpio147/edge"); edge_file << "rising" << std::endl << std::flush; @@ -94,7 +87,7 @@ struct e100_impl::io_impl{ ::close(fd); } bounded_buffer async_msg_fifo; - boost::thread_group recv_pirate_crew; + task::sptr pirate_task; }; void e100_impl::io_impl::handle_irq(void){ @@ -191,12 +184,9 @@ void e100_impl::io_init(void){ _fpga_ctrl->poke32(E100_REG_SR_ERR_CTRL, 1 << 1); //start //spawn a pirate, yarrr! - boost::barrier spawn_barrier(2); - _io_impl->recv_pirate_crew.create_thread(boost::bind( - &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), - boost::ref(spawn_barrier), _aux_spi_iface + _io_impl->pirate_task = task::make(boost::bind( + &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _aux_spi_iface )); - spawn_barrier.wait(); //init some handler stuff _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_le); diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index b596bbd04..e81b00d1c 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -24,6 +24,7 @@ #include "usrp_commands.h" #include "usrp1_impl.hpp" #include +#include #include #include #include @@ -131,8 +132,6 @@ struct usrp1_impl::io_impl{ } ~io_impl(void){ - vandal_tribe.interrupt_all(); - vandal_tribe.join_all(); UHD_SAFE_CALL(flush_send_buff();) } @@ -159,7 +158,7 @@ struct usrp1_impl::io_impl{ return omsb.get_new(curr_buff, next_buff); } - boost::thread_group vandal_tribe; + task::sptr vandal_task; boost::system_time last_send_time; }; @@ -230,12 +229,9 @@ void usrp1_impl::io_init(void){ _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport)); //create a new vandal thread to poll xerflow conditions - boost::barrier spawn_barrier(2); - _io_impl->vandal_tribe.create_thread(boost::bind( - &usrp1_impl::vandal_conquest_loop, - this, boost::ref(spawn_barrier) + _io_impl->vandal_task = task::make(boost::bind( + &usrp1_impl::vandal_conquest_loop, this )); - spawn_barrier.wait(); //init some handler stuff _io_impl->recv_handler.set_tick_rate(_master_clock_rate); @@ -277,8 +273,7 @@ void usrp1_impl::tx_stream_on_off(bool enb){ * On an overflow, interleave an inline message into recv and print. * This procedure creates "soft" inline and async user messages. */ -void usrp1_impl::vandal_conquest_loop(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); +void usrp1_impl::vandal_conquest_loop(void){ //initialize the async metadata async_metadata_t async_metadata; diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index ac0899e28..78481c3ff 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -16,9 +16,8 @@ // #include "soft_time_ctrl.hpp" +#include #include -#include -#include #include #include #include @@ -45,21 +44,12 @@ public: _stream_on_off(stream_on_off) { //synchronously spawn a new thread - boost::barrier spawn_barrier(2); - _thread_group.create_thread(boost::bind( - &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier)) - ); - spawn_barrier.wait(); + _recv_cmd_task = task::make(boost::bind(&soft_time_ctrl_impl::recv_cmd_task, this)); //initialize the time to something this->set_time(time_spec_t(0.0)); } - ~soft_time_ctrl_impl(void){ - _thread_group.interrupt_all(); - _thread_group.join_all(); - } - /******************************************************************* * Time control ******************************************************************/ @@ -204,15 +194,10 @@ public: _stream_mode = cmd.stream_mode; } - void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); - try{ - boost::shared_ptr cmd; - while (true){ - _cmd_queue.pop_with_wait(cmd); - recv_cmd_handle_cmd(*cmd); - } - } catch(const boost::thread_interrupted &){} + void recv_cmd_task(void){ //task is looped + boost::shared_ptr cmd; + _cmd_queue.pop_with_wait(cmd); + recv_cmd_handle_cmd(*cmd); } bounded_buffer &get_async_queue(void){ @@ -232,7 +217,7 @@ private: bounded_buffer _async_msg_queue; bounded_buffer _inline_msg_queue; const cb_fcn_type _stream_on_off; - boost::thread_group _thread_group; + task::sptr _recv_cmd_task; }; /*********************************************************************** diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index cb1497253..1fe0c1784 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #ifndef INCLUDED_USRP1_IMPL_HPP @@ -130,7 +129,7 @@ private: bool has_rx_halfband(void); bool has_tx_halfband(void); - void vandal_conquest_loop(boost::barrier &); + void vandal_conquest_loop(void); //handle the enables bool _rx_enabled, _tx_enabled; diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 4c55012ce..7028e1ff8 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -22,15 +22,15 @@ #include "usrp2_regs.hpp" #include #include +#include #include #include #include #include +#include #include #include #include -#include -#include #include using namespace uhd; @@ -134,11 +134,6 @@ struct usrp2_impl::io_impl{ /* NOP */ } - ~io_impl(void){ - recv_pirate_crew.interrupt_all(); - recv_pirate_crew.join_all(); - } - managed_send_buffer::sptr get_send_buff(size_t chan, double timeout){ flow_control_monitor &fc_mon = *fc_mons[chan]; @@ -163,8 +158,8 @@ struct usrp2_impl::io_impl{ sph::send_packet_handler send_handler; //methods and variables for the pirate crew - void recv_pirate_loop(boost::barrier &, zero_copy_if::sptr, size_t); - boost::thread_group recv_pirate_crew; + void recv_pirate_loop(zero_copy_if::sptr, size_t); + std::list pirate_tasks; bounded_buffer async_msg_fifo; double tick_rate; }; @@ -176,11 +171,8 @@ struct usrp2_impl::io_impl{ * - put async message packets into queue **********************************************************************/ void usrp2_impl::io_impl::recv_pirate_loop( - boost::barrier &spawn_barrier, - zero_copy_if::sptr err_xport, - size_t index + zero_copy_if::sptr err_xport, size_t index ){ - spawn_barrier.wait(); set_thread_priority_safe(); //store a reference to the flow control monitor (offset by max dsps) @@ -231,7 +223,7 @@ void usrp2_impl::io_impl::recv_pirate_loop( //TODO unknown received packet, may want to print error... } }catch(const std::exception &e){ - UHD_MSG(error) << "Error (usrp2 recv pirate loop): " << e.what() << std::endl; + UHD_MSG(error) << "Error in recv pirate loop: " << e.what() << std::endl; } } } @@ -264,17 +256,14 @@ void usrp2_impl::io_init(void){ } //create a new pirate thread for each zc if (yarr!!) - boost::barrier spawn_barrier(_mbc.size()+1); size_t index = 0; BOOST_FOREACH(const std::string &mb, _mbc.keys()){ //spawn a new pirate to plunder the recv booty - _io_impl->recv_pirate_crew.create_thread(boost::bind( - &usrp2_impl::io_impl::recv_pirate_loop, - _io_impl.get(), boost::ref(spawn_barrier), + _io_impl->pirate_tasks.push_back(task::make(boost::bind( + &usrp2_impl::io_impl::recv_pirate_loop, _io_impl.get(), _mbc[mb].err_xports.at(0), index++ - )); + ))); } - spawn_barrier.wait(); //init some handler stuff _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); diff --git a/host/lib/usrp/usrp2/usrp2_iface.cpp b/host/lib/usrp/usrp2/usrp2_iface.cpp index 0db9e5d58..b1347119b 100644 --- a/host/lib/usrp/usrp2/usrp2_iface.cpp +++ b/host/lib/usrp/usrp2/usrp2_iface.cpp @@ -20,15 +20,15 @@ #include "usrp2_iface.hpp" #include #include +#include #include #include #include #include //used for htonl and ntohl #include #include +#include #include -#include -#include #include #include #include @@ -110,13 +110,12 @@ public: void lock_device(bool lock){ if (lock){ - boost::barrier spawn_barrier(2); - _lock_thread_group.create_thread(boost::bind(&usrp2_iface_impl::lock_loop, this, boost::ref(spawn_barrier))); - spawn_barrier.wait(); + this->get_reg(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid())); + _lock_task = task::make(boost::bind(&usrp2_iface_impl::lock_task, this)); } else{ - _lock_thread_group.interrupt_all(); - _lock_thread_group.join_all(); + _lock_task.reset(); //shutdown the task + this->get_reg(U2_FW_REG_LOCK_TIME, 0); //unlock } } @@ -132,29 +131,12 @@ public: return lock_gpid != boost::uint32_t(get_gpid()); } - void lock_loop(boost::barrier &spawn_barrier){ - spawn_barrier.wait(); - - try{ - this->get_reg(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid())); - while(true){ - //re-lock in loop - boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM); - this->get_reg(U2_FW_REG_LOCK_TIME, curr_secs); - //sleep for a bit - boost::this_thread::sleep(boost::posix_time::milliseconds(1500)); - } - } - catch(const boost::thread_interrupted &){ - this->get_reg(U2_FW_REG_LOCK_TIME, 0); //unlock on exit - } - catch(const std::exception &e){ - UHD_MSG(error) - << "An unexpected exception was caught in the locker loop." << std::endl - << "The device will automatically unlock from this process." << std::endl - << e.what() << std::endl - ; - } + void lock_task(void){ + //re-lock in task + boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM); + this->get_reg(U2_FW_REG_LOCK_TIME, curr_secs); + //sleep for a bit + boost::this_thread::sleep(boost::posix_time::milliseconds(1500)); } /*********************************************************************** @@ -400,7 +382,7 @@ private: boost::uint32_t _protocol_compat; //lock thread stuff - boost::thread_group _lock_thread_group; + task::sptr _lock_task; }; /*********************************************************************** diff --git a/host/lib/utils/CMakeLists.txt b/host/lib/utils/CMakeLists.txt index c8268c7b0..fd3249099 100644 --- a/host/lib/utils/CMakeLists.txt +++ b/host/lib/utils/CMakeLists.txt @@ -136,5 +136,6 @@ LIBUHD_APPEND_SOURCES( ${CMAKE_CURRENT_SOURCE_DIR}/paths.cpp ${CMAKE_CURRENT_SOURCE_DIR}/props.cpp ${CMAKE_CURRENT_SOURCE_DIR}/static.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/tasks.cpp ${CMAKE_CURRENT_SOURCE_DIR}/thread_priority.cpp ) diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp new file mode 100644 index 000000000..ef56bb2de --- /dev/null +++ b/host/lib/utils/tasks.cpp @@ -0,0 +1,75 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . +// + +#include +#include +#include +#include +#include +#include + +using namespace uhd; + +class task_impl : public task{ +public: + + task_impl(const task_fcn_type &task_fcn){ + boost::barrier spawn_barrier(2); + _thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn, boost::ref(spawn_barrier))); + spawn_barrier.wait(); + } + + ~task_impl(void){ + _thread_group.interrupt_all(); + _thread_group.join_all(); + } + +private: + + void task_loop(const task_fcn_type &task_fcn, boost::barrier &spawn_barrier){ + spawn_barrier.wait(); + + try{ + while (not boost::this_thread::interruption_requested()){ + task_fcn(); + } + } + catch(const boost::thread_interrupted &){ + //this is an ok way to exit the task loop + } + catch(const std::exception &e){ + do_error_msg(e.what()); + } + catch(...){ + do_error_msg("unknown exception"); + } + } + + void do_error_msg(const std::string &msg){ + UHD_MSG(error) + << "An unexpected exception was caught in a task loop." << std::endl + << "The task loop will now exit, things may not work." << std::endl + << msg << std::endl + ; + } + + boost::thread_group _thread_group; +}; + +task::sptr task::make(const task_fcn_type &task_fcn){ + return task::sptr(new task_impl(task_fcn)); +} -- cgit v1.2.3