summaryrefslogtreecommitdiffstats
path: root/host/lib
diff options
context:
space:
mode:
Diffstat (limited to 'host/lib')
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp34
-rw-r--r--host/lib/usrp/b100/b100_ctrl.cpp20
-rw-r--r--host/lib/usrp/e100/io_impl.cpp20
-rw-r--r--host/lib/usrp/usrp1/io_impl.cpp15
-rw-r--r--host/lib/usrp/usrp1/soft_time_ctrl.cpp29
-rw-r--r--host/lib/usrp/usrp1/usrp1_impl.hpp3
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp29
-rw-r--r--host/lib/usrp/usrp2/usrp2_iface.cpp44
-rw-r--r--host/lib/utils/CMakeLists.txt1
-rw-r--r--host/lib/utils/tasks.cpp75
10 files changed, 136 insertions, 134 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index f781f890d..0fa856d34 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -21,11 +21,11 @@
#include <uhd/transport/buffer_pool.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/exception.hpp>
#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <list>
using namespace uhd;
@@ -202,12 +202,10 @@ public:
}
//spawn the event handler threads
- size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
- boost::barrier spawn_barrier(concurrency+1);
- for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier))
- );
- spawn_barrier.wait();
+ const size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ for (size_t i = 0; i < concurrency; i++) _event_loop_tasks.push_back(task::make(
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ ));
}
~libusb_zero_copy_impl(void){
@@ -221,9 +219,6 @@ public:
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
}
- //shutdown the threads
- _thread_group.interrupt_all();
- _thread_group.join_all();
}
managed_recv_buffer::sptr get_recv_buff(double timeout){
@@ -275,20 +270,17 @@ private:
std::list<libusb_transfer *> _all_luts;
//! event handler threads
- boost::thread_group _thread_group;
+ std::list<task::sptr> _event_loop_tasks;
- void run_event_loop(boost::barrier &spawn_barrier){
- spawn_barrier.wait();
+ void run_event_loop(void){
set_thread_priority_safe();
libusb_context *context = libusb::session::get_global_session()->get_context();
- try{
- while (not boost::this_thread::interruption_requested()){
- timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 100000; //100ms
- libusb_handle_events_timeout(context, &tv);
- }
- } catch(const boost::thread_interrupted &){}
+ while (not boost::this_thread::interruption_requested()){
+ timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000; //100ms
+ libusb_handle_events_timeout(context, &tv);
+ }
}
};
diff --git a/host/lib/usrp/b100/b100_ctrl.cpp b/host/lib/usrp/b100/b100_ctrl.cpp
index 5b03fd591..e08b47ce4 100644
--- a/host/lib/usrp/b100/b100_ctrl.cpp
+++ b/host/lib/usrp/b100/b100_ctrl.cpp
@@ -22,10 +22,12 @@
#include <uhd/transport/vrt_if_packet.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/types/metadata.hpp>
#include <uhd/types/serial.hpp>
#include "ctrl_packet.hpp"
-#include <boost/thread.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/bind.hpp>
#include <uhd/exception.hpp>
using namespace uhd::transport;
@@ -40,19 +42,12 @@ public:
_ctrl_transport(ctrl_transport),
_seq(0)
{
- boost::barrier spawn_barrier(2);
- viking_marauders.create_thread(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this, boost::ref(spawn_barrier)));
- spawn_barrier.wait();
+ viking_marauder = task::make(boost::bind(&b100_ctrl_impl::viking_marauder_loop, this));
}
int write(boost::uint32_t addr, const ctrl_data_t &data);
ctrl_data_t read(boost::uint32_t addr, size_t len);
- ~b100_ctrl_impl(void) {
- viking_marauders.interrupt_all();
- viking_marauders.join_all();
- }
-
bool get_ctrl_data(ctrl_data_t &pkt_data, double timeout);
void poke32(wb_addr_type addr, boost::uint32_t data){
@@ -95,10 +90,10 @@ private:
int send_pkt(boost::uint16_t *cmd);
//änd hërë wë gö ä-Vïkïng för äsynchronous control packets
- void viking_marauder_loop(boost::barrier &);
+ void viking_marauder_loop(void);
bounded_buffer<ctrl_data_t> sync_ctrl_fifo;
async_cb_type _async_cb;
- boost::thread_group viking_marauders;
+ task::sptr viking_marauder;
uhd::transport::zero_copy_if::sptr _ctrl_transport;
boost::uint8_t _seq;
@@ -206,8 +201,7 @@ ctrl_data_t b100_ctrl_impl::read(boost::uint32_t addr, size_t len) {
* never have more than 1 message in it, since it's expected that we'll
* wait for a control operation to finish before starting another one.
**********************************************************************/
-void b100_ctrl_impl::viking_marauder_loop(boost::barrier &spawn_barrier) {
- spawn_barrier.wait();
+void b100_ctrl_impl::viking_marauder_loop(void){
set_thread_priority_safe();
while (not boost::this_thread::interruption_requested()){
diff --git a/host/lib/usrp/e100/io_impl.cpp b/host/lib/usrp/e100/io_impl.cpp
index 69ca214dc..a10b3ffb3 100644
--- a/host/lib/usrp/e100/io_impl.cpp
+++ b/host/lib/usrp/e100/io_impl.cpp
@@ -24,12 +24,13 @@
#include "e100_regs.hpp"
#include <uhd/utils/msg.hpp>
#include <uhd/utils/log.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <boost/bind.hpp>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <poll.h> //poll
#include <fcntl.h> //open, close
#include <sstream>
@@ -51,11 +52,6 @@ struct e100_impl::io_impl{
false_alarm(0), async_msg_fifo(100/*messages deep*/)
{ /* NOP */ }
- ~io_impl(void){
- recv_pirate_crew.interrupt_all();
- recv_pirate_crew.join_all();
- }
-
double tick_rate; //set by update tick rate method
e100_ctrl::sptr iface; //so handle irq can peek and poke
void handle_irq(void);
@@ -70,11 +66,8 @@ struct e100_impl::io_impl{
//a pirate's life is the life for me!
void recv_pirate_loop(
- boost::barrier &spawn_barrier,
spi_iface::sptr //keep a sptr to iface which shares gpio147
){
- spawn_barrier.wait();
-
//open the GPIO and set it up for an IRQ
std::ofstream edge_file("/sys/class/gpio/gpio147/edge");
edge_file << "rising" << std::endl << std::flush;
@@ -94,7 +87,7 @@ struct e100_impl::io_impl{
::close(fd);
}
bounded_buffer<async_metadata_t> async_msg_fifo;
- boost::thread_group recv_pirate_crew;
+ task::sptr pirate_task;
};
void e100_impl::io_impl::handle_irq(void){
@@ -191,12 +184,9 @@ void e100_impl::io_init(void){
_fpga_ctrl->poke32(E100_REG_SR_ERR_CTRL, 1 << 1); //start
//spawn a pirate, yarrr!
- boost::barrier spawn_barrier(2);
- _io_impl->recv_pirate_crew.create_thread(boost::bind(
- &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(),
- boost::ref(spawn_barrier), _aux_spi_iface
+ _io_impl->pirate_task = task::make(boost::bind(
+ &e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _aux_spi_iface
));
- spawn_barrier.wait();
//init some handler stuff
_io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_le);
diff --git a/host/lib/usrp/usrp1/io_impl.cpp b/host/lib/usrp/usrp1/io_impl.cpp
index b596bbd04..e81b00d1c 100644
--- a/host/lib/usrp/usrp1/io_impl.cpp
+++ b/host/lib/usrp/usrp1/io_impl.cpp
@@ -24,6 +24,7 @@
#include "usrp_commands.h"
#include "usrp1_impl.hpp"
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/utils/safe_call.hpp>
#include <uhd/transport/bounded_buffer.hpp>
#include <boost/math/special_functions/sign.hpp>
@@ -131,8 +132,6 @@ struct usrp1_impl::io_impl{
}
~io_impl(void){
- vandal_tribe.interrupt_all();
- vandal_tribe.join_all();
UHD_SAFE_CALL(flush_send_buff();)
}
@@ -159,7 +158,7 @@ struct usrp1_impl::io_impl{
return omsb.get_new(curr_buff, next_buff);
}
- boost::thread_group vandal_tribe;
+ task::sptr vandal_task;
boost::system_time last_send_time;
};
@@ -230,12 +229,9 @@ void usrp1_impl::io_init(void){
_io_impl = UHD_PIMPL_MAKE(io_impl, (_data_transport));
//create a new vandal thread to poll xerflow conditions
- boost::barrier spawn_barrier(2);
- _io_impl->vandal_tribe.create_thread(boost::bind(
- &usrp1_impl::vandal_conquest_loop,
- this, boost::ref(spawn_barrier)
+ _io_impl->vandal_task = task::make(boost::bind(
+ &usrp1_impl::vandal_conquest_loop, this
));
- spawn_barrier.wait();
//init some handler stuff
_io_impl->recv_handler.set_tick_rate(_master_clock_rate);
@@ -277,8 +273,7 @@ void usrp1_impl::tx_stream_on_off(bool enb){
* On an overflow, interleave an inline message into recv and print.
* This procedure creates "soft" inline and async user messages.
*/
-void usrp1_impl::vandal_conquest_loop(boost::barrier &spawn_barrier){
- spawn_barrier.wait();
+void usrp1_impl::vandal_conquest_loop(void){
//initialize the async metadata
async_metadata_t async_metadata;
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
index ac0899e28..78481c3ff 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
@@ -16,9 +16,8 @@
//
#include "soft_time_ctrl.hpp"
+#include <uhd/utils/tasks.hpp>
#include <boost/make_shared.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>
@@ -45,21 +44,12 @@ public:
_stream_on_off(stream_on_off)
{
//synchronously spawn a new thread
- boost::barrier spawn_barrier(2);
- _thread_group.create_thread(boost::bind(
- &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier))
- );
- spawn_barrier.wait();
+ _recv_cmd_task = task::make(boost::bind(&soft_time_ctrl_impl::recv_cmd_task, this));
//initialize the time to something
this->set_time(time_spec_t(0.0));
}
- ~soft_time_ctrl_impl(void){
- _thread_group.interrupt_all();
- _thread_group.join_all();
- }
-
/*******************************************************************
* Time control
******************************************************************/
@@ -204,15 +194,10 @@ public:
_stream_mode = cmd.stream_mode;
}
- void recv_cmd_dispatcher(boost::barrier &spawn_barrier){
- spawn_barrier.wait();
- try{
- boost::shared_ptr<stream_cmd_t> cmd;
- while (true){
- _cmd_queue.pop_with_wait(cmd);
- recv_cmd_handle_cmd(*cmd);
- }
- } catch(const boost::thread_interrupted &){}
+ void recv_cmd_task(void){ //task is looped
+ boost::shared_ptr<stream_cmd_t> cmd;
+ _cmd_queue.pop_with_wait(cmd);
+ recv_cmd_handle_cmd(*cmd);
}
bounded_buffer<async_metadata_t> &get_async_queue(void){
@@ -232,7 +217,7 @@ private:
bounded_buffer<async_metadata_t> _async_msg_queue;
bounded_buffer<rx_metadata_t> _inline_msg_queue;
const cb_fcn_type _stream_on_off;
- boost::thread_group _thread_group;
+ task::sptr _recv_cmd_task;
};
/***********************************************************************
diff --git a/host/lib/usrp/usrp1/usrp1_impl.hpp b/host/lib/usrp/usrp1/usrp1_impl.hpp
index cb1497253..1fe0c1784 100644
--- a/host/lib/usrp/usrp1/usrp1_impl.hpp
+++ b/host/lib/usrp/usrp1/usrp1_impl.hpp
@@ -30,7 +30,6 @@
#include <uhd/usrp/subdev_spec.hpp>
#include <uhd/usrp/dboard_eeprom.hpp>
#include <uhd/usrp/dboard_manager.hpp>
-#include <boost/thread/barrier.hpp>
#include <uhd/transport/usb_zero_copy.hpp>
#ifndef INCLUDED_USRP1_IMPL_HPP
@@ -130,7 +129,7 @@ private:
bool has_rx_halfband(void);
bool has_tx_halfband(void);
- void vandal_conquest_loop(boost::barrier &);
+ void vandal_conquest_loop(void);
//handle the enables
bool _rx_enabled, _tx_enabled;
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 4c55012ce..7028e1ff8 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -22,15 +22,15 @@
#include "usrp2_regs.hpp"
#include <uhd/utils/log.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/exception.hpp>
#include <uhd/utils/byteswap.hpp>
#include <uhd/utils/thread_priority.hpp>
#include <uhd/transport/bounded_buffer.hpp>
+#include <boost/thread/thread.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <iostream>
using namespace uhd;
@@ -134,11 +134,6 @@ struct usrp2_impl::io_impl{
/* NOP */
}
- ~io_impl(void){
- recv_pirate_crew.interrupt_all();
- recv_pirate_crew.join_all();
- }
-
managed_send_buffer::sptr get_send_buff(size_t chan, double timeout){
flow_control_monitor &fc_mon = *fc_mons[chan];
@@ -163,8 +158,8 @@ struct usrp2_impl::io_impl{
sph::send_packet_handler send_handler;
//methods and variables for the pirate crew
- void recv_pirate_loop(boost::barrier &, zero_copy_if::sptr, size_t);
- boost::thread_group recv_pirate_crew;
+ void recv_pirate_loop(zero_copy_if::sptr, size_t);
+ std::list<task::sptr> pirate_tasks;
bounded_buffer<async_metadata_t> async_msg_fifo;
double tick_rate;
};
@@ -176,11 +171,8 @@ struct usrp2_impl::io_impl{
* - put async message packets into queue
**********************************************************************/
void usrp2_impl::io_impl::recv_pirate_loop(
- boost::barrier &spawn_barrier,
- zero_copy_if::sptr err_xport,
- size_t index
+ zero_copy_if::sptr err_xport, size_t index
){
- spawn_barrier.wait();
set_thread_priority_safe();
//store a reference to the flow control monitor (offset by max dsps)
@@ -231,7 +223,7 @@ void usrp2_impl::io_impl::recv_pirate_loop(
//TODO unknown received packet, may want to print error...
}
}catch(const std::exception &e){
- UHD_MSG(error) << "Error (usrp2 recv pirate loop): " << e.what() << std::endl;
+ UHD_MSG(error) << "Error in recv pirate loop: " << e.what() << std::endl;
}
}
}
@@ -264,17 +256,14 @@ void usrp2_impl::io_init(void){
}
//create a new pirate thread for each zc if (yarr!!)
- boost::barrier spawn_barrier(_mbc.size()+1);
size_t index = 0;
BOOST_FOREACH(const std::string &mb, _mbc.keys()){
//spawn a new pirate to plunder the recv booty
- _io_impl->recv_pirate_crew.create_thread(boost::bind(
- &usrp2_impl::io_impl::recv_pirate_loop,
- _io_impl.get(), boost::ref(spawn_barrier),
+ _io_impl->pirate_tasks.push_back(task::make(boost::bind(
+ &usrp2_impl::io_impl::recv_pirate_loop, _io_impl.get(),
_mbc[mb].err_xports.at(0), index++
- ));
+ )));
}
- spawn_barrier.wait();
//init some handler stuff
_io_impl->recv_handler.set_vrt_unpacker(&vrt::if_hdr_unpack_be);
diff --git a/host/lib/usrp/usrp2/usrp2_iface.cpp b/host/lib/usrp/usrp2/usrp2_iface.cpp
index 0db9e5d58..b1347119b 100644
--- a/host/lib/usrp/usrp2/usrp2_iface.cpp
+++ b/host/lib/usrp/usrp2/usrp2_iface.cpp
@@ -20,15 +20,15 @@
#include "usrp2_iface.hpp"
#include <uhd/exception.hpp>
#include <uhd/utils/msg.hpp>
+#include <uhd/utils/tasks.hpp>
#include <uhd/types/dict.hpp>
#include <boost/thread.hpp>
#include <boost/foreach.hpp>
#include <boost/asio.hpp> //used for htonl and ntohl
#include <boost/assign/list_of.hpp>
#include <boost/format.hpp>
+#include <boost/bind.hpp>
#include <boost/tokenizer.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/thread/barrier.hpp>
#include <boost/functional/hash.hpp>
#include <algorithm>
#include <iostream>
@@ -110,13 +110,12 @@ public:
void lock_device(bool lock){
if (lock){
- boost::barrier spawn_barrier(2);
- _lock_thread_group.create_thread(boost::bind(&usrp2_iface_impl::lock_loop, this, boost::ref(spawn_barrier)));
- spawn_barrier.wait();
+ this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid()));
+ _lock_task = task::make(boost::bind(&usrp2_iface_impl::lock_task, this));
}
else{
- _lock_thread_group.interrupt_all();
- _lock_thread_group.join_all();
+ _lock_task.reset(); //shutdown the task
+ this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, 0); //unlock
}
}
@@ -132,29 +131,12 @@ public:
return lock_gpid != boost::uint32_t(get_gpid());
}
- void lock_loop(boost::barrier &spawn_barrier){
- spawn_barrier.wait();
-
- try{
- this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_GPID, boost::uint32_t(get_gpid()));
- while(true){
- //re-lock in loop
- boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM);
- this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, curr_secs);
- //sleep for a bit
- boost::this_thread::sleep(boost::posix_time::milliseconds(1500));
- }
- }
- catch(const boost::thread_interrupted &){
- this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, 0); //unlock on exit
- }
- catch(const std::exception &e){
- UHD_MSG(error)
- << "An unexpected exception was caught in the locker loop." << std::endl
- << "The device will automatically unlock from this process." << std::endl
- << e.what() << std::endl
- ;
- }
+ void lock_task(void){
+ //re-lock in task
+ boost::uint32_t curr_secs = this->peek32(U2_REG_TIME64_SECS_RB_IMM);
+ this->get_reg<boost::uint32_t, USRP2_REG_ACTION_FW_POKE32>(U2_FW_REG_LOCK_TIME, curr_secs);
+ //sleep for a bit
+ boost::this_thread::sleep(boost::posix_time::milliseconds(1500));
}
/***********************************************************************
@@ -400,7 +382,7 @@ private:
boost::uint32_t _protocol_compat;
//lock thread stuff
- boost::thread_group _lock_thread_group;
+ task::sptr _lock_task;
};
/***********************************************************************
diff --git a/host/lib/utils/CMakeLists.txt b/host/lib/utils/CMakeLists.txt
index c8268c7b0..fd3249099 100644
--- a/host/lib/utils/CMakeLists.txt
+++ b/host/lib/utils/CMakeLists.txt
@@ -136,5 +136,6 @@ LIBUHD_APPEND_SOURCES(
${CMAKE_CURRENT_SOURCE_DIR}/paths.cpp
${CMAKE_CURRENT_SOURCE_DIR}/props.cpp
${CMAKE_CURRENT_SOURCE_DIR}/static.cpp
+ ${CMAKE_CURRENT_SOURCE_DIR}/tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/thread_priority.cpp
)
diff --git a/host/lib/utils/tasks.cpp b/host/lib/utils/tasks.cpp
new file mode 100644
index 000000000..ef56bb2de
--- /dev/null
+++ b/host/lib/utils/tasks.cpp
@@ -0,0 +1,75 @@
+//
+// Copyright 2011 Ettus Research LLC
+//
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+//
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see <http://www.gnu.org/licenses/>.
+//
+
+#include <uhd/utils/tasks.hpp>
+#include <uhd/utils/msg.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
+#include <exception>
+#include <iostream>
+
+using namespace uhd;
+
+class task_impl : public task{
+public:
+
+ task_impl(const task_fcn_type &task_fcn){
+ boost::barrier spawn_barrier(2);
+ _thread_group.create_thread(boost::bind(&task_impl::task_loop, this, task_fcn, boost::ref(spawn_barrier)));
+ spawn_barrier.wait();
+ }
+
+ ~task_impl(void){
+ _thread_group.interrupt_all();
+ _thread_group.join_all();
+ }
+
+private:
+
+ void task_loop(const task_fcn_type &task_fcn, boost::barrier &spawn_barrier){
+ spawn_barrier.wait();
+
+ try{
+ while (not boost::this_thread::interruption_requested()){
+ task_fcn();
+ }
+ }
+ catch(const boost::thread_interrupted &){
+ //this is an ok way to exit the task loop
+ }
+ catch(const std::exception &e){
+ do_error_msg(e.what());
+ }
+ catch(...){
+ do_error_msg("unknown exception");
+ }
+ }
+
+ void do_error_msg(const std::string &msg){
+ UHD_MSG(error)
+ << "An unexpected exception was caught in a task loop." << std::endl
+ << "The task loop will now exit, things may not work." << std::endl
+ << msg << std::endl
+ ;
+ }
+
+ boost::thread_group _thread_group;
+};
+
+task::sptr task::make(const task_fcn_type &task_fcn){
+ return task::sptr(new task_impl(task_fcn));
+}