diff options
| -rw-r--r-- | host/include/uhd/utils/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/include/uhd/utils/tasks.hpp | 53 | ||||
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 34 | ||||
| -rw-r--r-- | host/lib/usrp/b100/b100_ctrl.cpp | 20 | ||||
| -rw-r--r-- | host/lib/usrp/e100/io_impl.cpp | 20 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/io_impl.cpp | 15 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 29 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/usrp1_impl.hpp | 3 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 29 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/usrp2_iface.cpp | 44 | ||||
| -rw-r--r-- | host/lib/utils/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | host/lib/utils/tasks.cpp | 75 | 
12 files changed, 190 insertions, 134 deletions
| diff --git a/host/include/uhd/utils/CMakeLists.txt b/host/include/uhd/utils/CMakeLists.txt index 88a0e612b..0bf98fb67 100644 --- a/host/include/uhd/utils/CMakeLists.txt +++ b/host/include/uhd/utils/CMakeLists.txt @@ -30,6 +30,7 @@ INSTALL(FILES      safe_call.hpp      safe_main.hpp      static.hpp +    tasks.hpp      thread_priority.hpp      DESTINATION ${INCLUDE_DIR}/uhd/utils      COMPONENT headers diff --git a/host/include/uhd/utils/tasks.hpp b/host/include/uhd/utils/tasks.hpp new file mode 100644 index 000000000..38b2bddf0 --- /dev/null +++ b/host/include/uhd/utils/tasks.hpp @@ -0,0 +1,53 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#ifndef INCLUDED_UHD_UTILS_TASKS_HPP +#define INCLUDED_UHD_UTILS_TASKS_HPP + +#include <uhd/config.hpp> +#include <boost/shared_ptr.hpp> +#include <boost/function.hpp> +#include <boost/utility.hpp> + +namespace uhd{ + +    class task : boost::noncopyable{ +    public: +        typedef boost::shared_ptr<task> sptr; +        typedef boost::function<void(void)> task_fcn_type; + +        /*! +         * Create a new task object with function callback. +         * The task function callback will be run in a loop. +         * until the thread is interrupted by the deconstructor. +         * +         * A task should return in a reasonable amount of time +         * or may block forever under the following conditions: +         *  - The blocking call is interruptible. +         *  - The task polls the interrupt condition. +         * +         * \param task_fcn the task callback function +         * \return a new task object +         */ +        static sptr make(const task_fcn_type &task_fcn); + +    }; + +} //namespace uhd + +#endif /* INCLUDED_UHD_UTILS_TASKS_HPP */ + diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index f781f890d..0fa856d34 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -21,11 +21,11 @@  #include <uhd/transport/buffer_pool.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/exception.hpp>  #include <boost/function.hpp>  #include <boost/foreach.hpp>  #include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp>  #include <list>  using namespace uhd; @@ -202,12 +202,10 @@ public:          }          //spawn the event handler threads -        size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); -        boost::barrier spawn_barrier(concurrency+1); -        for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( -            boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier)) -        ); -        spawn_barrier.wait(); +        const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); +        for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make( +            boost::bind(&libusb_zero_copy_impl::run_event_loop, this) +        ));      }      ~libusb_zero_copy_impl(void){ @@ -221,9 +219,6 @@ public:                  boost::this_thread::sleep(boost::posix_time::milliseconds(10));              }          } -        //shutdown the threads -        _thread_group.interrupt_all(); -        _thread_group.join_all();      }      managed_recv_buffer::sptr get_recv_buff(double timeout){ @@ -275,20 +270,17 @@ private:      std::list<libusb_transfer *> _all_luts;      //! event handler threads -    boost::thread_group _thread_group; +    std::list<task::sptr> _event_loop_tasks; -    void run_event_loop(boost::barrier &spawn_barrier){ -        spawn_barrier.wait(); +    void run_event_loop(void){          set_thread_priority_safe();          libusb_context *context = libusb::session::get_global_session()->get_context(); -        try{ -            while (not boost::this_thread::interruption_requested()){ -                timeval tv; -                tv.tv_sec = 0; -                tv.tv_usec = 100000; //100ms -                libusb_handle_events_timeout(context, &tv); -            } -        } catch(const boost::thread_interrupted &){} +        while (not boost::this_thread::interruption_requested()){ +            timeval tv; +            tv.tv_sec = 0; +            tv.tv_usec = 100000; //100ms +            libusb_handle_events_timeout(context, &tv); +        }      }  }; diff --git a/host/lib/usrp/b100/b100_ctrl.cpp b/host/lib/usrp/b100/b100_ctrl.cpp index 5b03fd591..e08b47ce4 100644 --- a/host/lib/usrp/b100/b100_ctrl.cpp +++ b/host/lib/usrp/b100/b100_ctrl.cpp @@ -22,10 +22,12 @@  #include <uhd/transport/vrt_if_packet.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/types/metadata.hpp>  #include <uhd/types/serial.hpp>  #include "ctrl_packet.hpp" -#include <boost/thread.hpp> +#include <boost/thread/thread.hpp> +#include <boost/bind.hpp>  #include <uhd/exception.hpp>  using namespace uhd::transport; @@ -40,19 +42,12 @@ public:          _ctrl_transport(ctrl_transport),          _seq(0)      { -        boost::barrier spawn_barrier(2); -        viking_marauders.create_thread(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this, boost::ref(spawn_barrier))); -        spawn_barrier.wait(); +        viking_marauder = task::make(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this));      }      int write(boost::uint32_t addr, const ctrl_data_t &data);      ctrl_data_t read(boost::uint32_t addr, size_t len); -    ~b100_ctrl_impl(void) { -        viking_marauders.interrupt_all(); -        viking_marauders.join_all(); -    } -      bool get_ctrl_data(ctrl_data_t &pkt_data, double timeout);      void poke32(wb_addr_type addr, boost::uint32_t data){ @@ -95,10 +90,10 @@ private:      int send_pkt(boost::uint16_t *cmd);      //änd hërë wë gö ä-Vïkïng för äsynchronous control packets -    void viking_marauder_loop(boost::barrier &); +    void viking_marauder_loop(void);      bounded_buffer<ctrl_data_t> sync_ctrl_fifo;      async_cb_type _async_cb; -    boost::thread_group viking_marauders; +    task::sptr viking_marauder;      uhd::transport::zero_copy_if::sptr _ctrl_transport;      boost::uint8_t _seq; @@ -206,8 +201,7 @@ ctrl_data_t b100_ctrl_impl::read(boost::uint32_t addr, size_t len) {   * never have more than 1 message in it, since it's expected that we'll   * wait for a control operation to finish before starting another one.   **********************************************************************/ -void b100_ctrl_impl::viking_marauder_loop(boost::barrier &spawn_barrier) { -    spawn_barrier.wait(); +void b100_ctrl_impl::viking_marauder_loop(void){      set_thread_priority_safe();      while (not boost::this_thread::interruption_requested()){ diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp index 69ca214dc..a10b3ffb3 100644 --- a/host/lib/usrp/e100/io_impl.cpp +++ b/host/lib/usrp/e100/io_impl.cpp @@ -24,12 +24,13 @@  #include "e100_regs.hpp"  #include <uhd/utils/msg.hpp>  #include <uhd/utils/log.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/bind.hpp>  #include <boost/format.hpp> +#include <boost/bind.hpp>  #include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp>  #include <poll.h> //poll  #include <fcntl.h> //open, close  #include <sstream> @@ -51,11 +52,6 @@ struct e100_impl::io_impl{          false_alarm(0), async_msg_fifo(100/*messages deep*/)      { /* NOP */ } -    ~io_impl(void){ -        recv_pirate_crew.interrupt_all(); -        recv_pirate_crew.join_all(); -    } -      double tick_rate; //set by update tick rate method      e100_ctrl::sptr iface; //so handle irq can peek and poke      void handle_irq(void); @@ -70,11 +66,8 @@ struct e100_impl::io_impl{      //a pirate's life is the life for me!      void recv_pirate_loop( -        boost::barrier &spawn_barrier,          spi_iface::sptr //keep a sptr to iface which shares gpio147      ){ -        spawn_barrier.wait(); -          //open the GPIO and set it up for an IRQ          std::ofstream edge_file("/sys/class/gpio/gpio147/edge");          edge_file << "rising" << std::endl << std::flush; @@ -94,7 +87,7 @@ struct e100_impl::io_impl{          ::close(fd);      }      bounded_buffer<async_metadata_t> async_msg_fifo; -    boost::thread_group recv_pirate_crew; +    task::sptr pirate_task;  };  void e100_impl::io_impl::handle_irq(void){ @@ -191,12 +184,9 @@ void e100_impl::io_init(void){      _fpga_ctrl->poke32(E100_REG_SR_ERR_CTRL, 1 << 1); //start      //spawn a pirate, yarrr! -    boost::barrier spawn_barrier(2); -    _io_impl->recv_pirate_crew.create_thread(boost::bind( -        &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), -        boost::ref(spawn_barrier), _aux_spi_iface +    _io_impl->pirate_task = task::make(boost::bind( +        &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _aux_spi_iface      )); -    spawn_barrier.wait();      //init some handler stuff      _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_le); diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp index b596bbd04..e81b00d1c 100644 --- a/host/lib/usrp/usrp1/io_impl.cpp +++ b/host/lib/usrp/usrp1/io_impl.cpp @@ -24,6 +24,7 @@  #include "usrp_commands.h"  #include "usrp1_impl.hpp"  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/utils/safe_call.hpp>  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/math/special_functions/sign.hpp> @@ -131,8 +132,6 @@ struct usrp1_impl::io_impl{      }      ~io_impl(void){ -        vandal_tribe.interrupt_all(); -        vandal_tribe.join_all();          UHD_SAFE_CALL(flush_send_buff();)      } @@ -159,7 +158,7 @@ struct usrp1_impl::io_impl{          return omsb.get_new(curr_buff, next_buff);      } -    boost::thread_group vandal_tribe; +    task::sptr vandal_task;      boost::system_time last_send_time;  }; @@ -230,12 +229,9 @@ void usrp1_impl::io_init(void){      _io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport));      //create a new vandal thread to poll xerflow conditions -    boost::barrier spawn_barrier(2); -    _io_impl->vandal_tribe.create_thread(boost::bind( -        &usrp1_impl::vandal_conquest_loop, -        this, boost::ref(spawn_barrier) +    _io_impl->vandal_task = task::make(boost::bind( +        &usrp1_impl::vandal_conquest_loop, this      )); -    spawn_barrier.wait();      //init some handler stuff      _io_impl->recv_handler.set_tick_rate(_master_clock_rate); @@ -277,8 +273,7 @@ void usrp1_impl::tx_stream_on_off(bool enb){   * On an overflow, interleave an inline message into recv and print.   * This procedure creates "soft" inline and async user messages.   */ -void usrp1_impl::vandal_conquest_loop(boost::barrier &spawn_barrier){ -    spawn_barrier.wait(); +void usrp1_impl::vandal_conquest_loop(void){      //initialize the async metadata      async_metadata_t async_metadata; diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index ac0899e28..78481c3ff 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -16,9 +16,8 @@  //  #include "soft_time_ctrl.hpp" +#include <uhd/utils/tasks.hpp>  #include <boost/make_shared.hpp> -#include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp>  #include <boost/thread/condition_variable.hpp>  #include <boost/date_time/posix_time/posix_time.hpp>  #include <iostream> @@ -45,21 +44,12 @@ public:          _stream_on_off(stream_on_off)      {          //synchronously spawn a new thread -        boost::barrier spawn_barrier(2); -        _thread_group.create_thread(boost::bind( -            &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier)) -        ); -        spawn_barrier.wait(); +        _recv_cmd_task = task::make(boost::bind(&soft_time_ctrl_impl::recv_cmd_task, this));          //initialize the time to something          this->set_time(time_spec_t(0.0));      } -    ~soft_time_ctrl_impl(void){ -        _thread_group.interrupt_all(); -        _thread_group.join_all(); -    } -      /*******************************************************************       * Time control       ******************************************************************/ @@ -204,15 +194,10 @@ public:          _stream_mode = cmd.stream_mode;      } -    void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ -        spawn_barrier.wait(); -        try{ -            boost::shared_ptr<stream_cmd_t> cmd; -            while (true){ -                _cmd_queue.pop_with_wait(cmd); -                recv_cmd_handle_cmd(*cmd); -            } -        } catch(const boost::thread_interrupted &){} +    void recv_cmd_task(void){ //task is looped +        boost::shared_ptr<stream_cmd_t> cmd; +        _cmd_queue.pop_with_wait(cmd); +        recv_cmd_handle_cmd(*cmd);      }      bounded_buffer<async_metadata_t> &get_async_queue(void){ @@ -232,7 +217,7 @@ private:      bounded_buffer<async_metadata_t> _async_msg_queue;      bounded_buffer<rx_metadata_t> _inline_msg_queue;      const cb_fcn_type _stream_on_off; -    boost::thread_group _thread_group; +    task::sptr _recv_cmd_task;  };  /*********************************************************************** diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp index cb1497253..1fe0c1784 100644 --- a/host/lib/usrp/usrp1/usrp1_impl.hpp +++ b/host/lib/usrp/usrp1/usrp1_impl.hpp @@ -30,7 +30,6 @@  #include <uhd/usrp/subdev_spec.hpp>  #include <uhd/usrp/dboard_eeprom.hpp>  #include <uhd/usrp/dboard_manager.hpp> -#include <boost/thread/barrier.hpp>  #include <uhd/transport/usb_zero_copy.hpp>  #ifndef INCLUDED_USRP1_IMPL_HPP @@ -130,7 +129,7 @@ private:      bool has_rx_halfband(void);      bool has_tx_halfband(void); -    void vandal_conquest_loop(boost::barrier &); +    void vandal_conquest_loop(void);      //handle the enables      bool _rx_enabled, _tx_enabled; diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 4c55012ce..7028e1ff8 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -22,15 +22,15 @@  #include "usrp2_regs.hpp"  #include <uhd/utils/log.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/exception.hpp>  #include <uhd/utils/byteswap.hpp>  #include <uhd/utils/thread_priority.hpp>  #include <uhd/transport/bounded_buffer.hpp> +#include <boost/thread/thread.hpp>  #include <boost/format.hpp>  #include <boost/bind.hpp>  #include <boost/thread/mutex.hpp> -#include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp>  #include <iostream>  using namespace uhd; @@ -134,11 +134,6 @@ struct usrp2_impl::io_impl{          /* NOP */      } -    ~io_impl(void){ -        recv_pirate_crew.interrupt_all(); -        recv_pirate_crew.join_all(); -    } -      managed_send_buffer::sptr get_send_buff(size_t chan, double timeout){          flow_control_monitor &fc_mon = *fc_mons[chan]; @@ -163,8 +158,8 @@ struct usrp2_impl::io_impl{      sph::send_packet_handler send_handler;      //methods and variables for the pirate crew -    void recv_pirate_loop(boost::barrier &, zero_copy_if::sptr, size_t); -    boost::thread_group recv_pirate_crew; +    void recv_pirate_loop(zero_copy_if::sptr, size_t); +    std::list<task::sptr> pirate_tasks;      bounded_buffer<async_metadata_t> async_msg_fifo;      double tick_rate;  }; @@ -176,11 +171,8 @@ struct usrp2_impl::io_impl{   * - put async message packets into queue   **********************************************************************/  void usrp2_impl::io_impl::recv_pirate_loop( -    boost::barrier &spawn_barrier, -    zero_copy_if::sptr err_xport, -    size_t index +    zero_copy_if::sptr err_xport, size_t index  ){ -    spawn_barrier.wait();      set_thread_priority_safe();      //store a reference to the flow control monitor (offset by max dsps) @@ -231,7 +223,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(                  //TODO unknown received packet, may want to print error...              }          }catch(const std::exception &e){ -            UHD_MSG(error) << "Error (usrp2 recv pirate loop): " << e.what() << std::endl; +            UHD_MSG(error) << "Error in recv pirate loop: " << e.what() << std::endl;          }      }  } @@ -264,17 +256,14 @@ void usrp2_impl::io_init(void){      }      //create a new pirate thread for each zc if (yarr!!) -    boost::barrier spawn_barrier(_mbc.size()+1);      size_t index = 0;      BOOST_FOREACH(const std::string &mb, _mbc.keys()){          //spawn a new pirate to plunder the recv booty -        _io_impl->recv_pirate_crew.create_thread(boost::bind( -            &usrp2_impl::io_impl::recv_pirate_loop, -            _io_impl.get(), boost::ref(spawn_barrier), +        _io_impl->pirate_tasks.push_back(task::make(boost::bind( +            &usrp2_impl::io_impl::recv_pirate_loop, _io_impl.get(),              _mbc[mb].err_xports.at(0), index++ -        )); +        )));      } -    spawn_barrier.wait();      //init some handler stuff      _io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be); diff --git a/host/lib/usrp/usrp2/usrp2_iface.cpp b/host/lib/usrp/usrp2/usrp2_iface.cpp index 0db9e5d58..b1347119b 100644 --- a/host/lib/usrp/usrp2/usrp2_iface.cpp +++ b/host/lib/usrp/usrp2/usrp2_iface.cpp @@ -20,15 +20,15 @@  #include "usrp2_iface.hpp"  #include <uhd/exception.hpp>  #include <uhd/utils/msg.hpp> +#include <uhd/utils/tasks.hpp>  #include <uhd/types/dict.hpp>  #include <boost/thread.hpp>  #include <boost/foreach.hpp>  #include <boost/asio.hpp> //used for htonl and ntohl  #include <boost/assign/list_of.hpp>  #include <boost/format.hpp> +#include <boost/bind.hpp>  #include <boost/tokenizer.hpp> -#include <boost/thread/thread.hpp> -#include <boost/thread/barrier.hpp>  #include <boost/functional/hash.hpp>  #include <algorithm>  #include <iostream> @@ -110,13 +110,12 @@ public:      void lock_device(bool lock){          if (lock){ -            boost::barrier spawn_barrier(2); -            _lock_thread_group.create_thread(boost::bind(&usrp2_iface_impl::lock_loop, this, boost::ref(spawn_barrier))); -            spawn_barrier.wait(); +            this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid())); +            _lock_task = task::make(boost::bind(&usrp2_iface_impl::lock_task, this));          }          else{ -            _lock_thread_group.interrupt_all(); -            _lock_thread_group.join_all(); +            _lock_task.reset(); //shutdown the task +            this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, 0); //unlock          }      } @@ -132,29 +131,12 @@ public:          return lock_gpid != boost::uint32_t(get_gpid());      } -    void lock_loop(boost::barrier &spawn_barrier){ -        spawn_barrier.wait(); - -        try{ -            this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid())); -            while(true){ -                //re-lock in loop -                boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM); -                this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, curr_secs); -                //sleep for a bit -                boost::this_thread::sleep(boost::posix_time::milliseconds(1500)); -            } -        } -        catch(const boost::thread_interrupted &){ -            this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, 0); //unlock on exit -        } -        catch(const std::exception &e){ -            UHD_MSG(error) -                << "An unexpected exception was caught in the locker loop." << std::endl -                << "The device will automatically unlock from this process." << std::endl -                << e.what() << std::endl -            ; -        } +    void lock_task(void){ +        //re-lock in task +        boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM); +        this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, curr_secs); +        //sleep for a bit +        boost::this_thread::sleep(boost::posix_time::milliseconds(1500));      }  /*********************************************************************** @@ -400,7 +382,7 @@ private:      boost::uint32_t _protocol_compat;      //lock thread stuff -    boost::thread_group _lock_thread_group; +    task::sptr _lock_task;  };  /*********************************************************************** diff --git a/host/lib/utils/CMakeLists.txt b/host/lib/utils/CMakeLists.txt index c8268c7b0..fd3249099 100644 --- a/host/lib/utils/CMakeLists.txt +++ b/host/lib/utils/CMakeLists.txt @@ -136,5 +136,6 @@ LIBUHD_APPEND_SOURCES(      ${CMAKE_CURRENT_SOURCE_DIR}/paths.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/props.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/static.cpp +    ${CMAKE_CURRENT_SOURCE_DIR}/tasks.cpp      ${CMAKE_CURRENT_SOURCE_DIR}/thread_priority.cpp  ) diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp new file mode 100644 index 000000000..ef56bb2de --- /dev/null +++ b/host/lib/utils/tasks.cpp @@ -0,0 +1,75 @@ +// +// Copyright 2011 Ettus Research LLC +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program.  If not, see <http://www.gnu.org/licenses/>. +// + +#include <uhd/utils/tasks.hpp> +#include <uhd/utils/msg.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp> +#include <exception> +#include <iostream> + +using namespace uhd; + +class task_impl : public task{ +public: + +    task_impl(const task_fcn_type &task_fcn){ +        boost::barrier spawn_barrier(2); +        _thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn, boost::ref(spawn_barrier))); +        spawn_barrier.wait(); +    } + +    ~task_impl(void){ +        _thread_group.interrupt_all(); +        _thread_group.join_all(); +    } + +private: + +    void task_loop(const task_fcn_type &task_fcn, boost::barrier &spawn_barrier){ +        spawn_barrier.wait(); + +        try{ +            while (not boost::this_thread::interruption_requested()){ +                task_fcn(); +            } +        } +        catch(const boost::thread_interrupted &){ +            //this is an ok way to exit the task loop +        } +        catch(const std::exception &e){ +            do_error_msg(e.what()); +        } +        catch(...){ +            do_error_msg("unknown exception"); +        } +    } + +    void do_error_msg(const std::string &msg){ +        UHD_MSG(error) +            << "An unexpected exception was caught in a task loop." << std::endl +            << "The task loop will now exit, things may not work." << std::endl +            << msg << std::endl +        ; +    } + +    boost::thread_group _thread_group; +}; + +task::sptr task::make(const task_fcn_type &task_fcn){ +    return task::sptr(new task_impl(task_fcn)); +} | 
