aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosh Blum <josh@joshknows.com>2011-03-31 15:00:56 -0700
committerJosh Blum <josh@joshknows.com>2011-03-31 15:00:56 -0700
commit1c5076ea68345e74de35cad43e4a4b4adf68fa15 (patch)
treead9f49f783761e89054a7bee5a0c20c9b406da35
parent48f6e1f8aae24ee4ff3b15232cfc335b0210ed11 (diff)
downloaduhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.tar.gz
uhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.tar.bz2
uhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.zip
uhd: implemented boost barriers on all code that creates threads
The barrier ensures that the thread must spawn before the caller exits. Some of the code already used a mutex to accomplish this, however cygwin chokes when a mutex is locked twice by the same thread. Mutex implementations were replaced with the barrier implementation. Also the barrier implementation is far cleaner.
-rw-r--r--host/lib/transport/libusb1_zero_copy.cpp8
-rw-r--r--host/lib/usrp/usrp1/soft_time_ctrl.cpp14
-rw-r--r--host/lib/usrp/usrp2/io_impl.cpp25
-rw-r--r--host/lib/usrp/usrp_e100/io_impl.cpp16
4 files changed, 37 insertions, 26 deletions
diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp
index 9f38ce97b..fe6936c7e 100644
--- a/host/lib/transport/libusb1_zero_copy.cpp
+++ b/host/lib/transport/libusb1_zero_copy.cpp
@@ -24,6 +24,7 @@
#include <boost/function.hpp>
#include <boost/foreach.hpp>
#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
#include <list>
#include <iostream>
@@ -191,9 +192,11 @@ public:
//spawn the event handler threads
size_t concurrency = hints.cast<size_t>("concurrency_hint", 1);
+ boost::barrier spawn_barrier(concurrency+1);
for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread(
- boost::bind(&libusb_zero_copy_impl::run_event_loop, this)
+ boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier))
);
+ spawn_barrier.wait();
}
~libusb_zero_copy_impl(void){
@@ -263,7 +266,8 @@ private:
boost::thread_group _thread_group;
bool _threads_running;
- void run_event_loop(void){
+ void run_event_loop(boost::barrier &spawn_barrier){
+ spawn_barrier.wait();
set_thread_priority_safe();
libusb_context *context = libusb::session::get_global_session()->get_context();
_threads_running = true;
diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
index e1b671811..1bab34e7b 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
@@ -19,6 +19,7 @@
#include <uhd/transport/bounded_buffer.hpp>
#include <boost/any.hpp>
#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>
@@ -43,10 +44,11 @@ public:
_stream_on_off(stream_on_off)
{
//synchronously spawn a new thread
- _update_mutex.lock(); //lock mutex before spawned
- _thread_group.create_thread(boost::bind(&soft_time_ctrl_impl::recv_cmd_dispatcher, this));
- _update_mutex.lock(); //lock blocks until spawned
- _update_mutex.unlock(); //unlock mutex before done
+ boost::barrier spawn_barrier(2);
+ _thread_group.create_thread(boost::bind(
+ &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier))
+ );
+ spawn_barrier.wait();
//initialize the time to something
this->set_time(time_spec_t(0.0));
@@ -175,8 +177,8 @@ public:
_stream_mode = cmd.stream_mode;
}
- void recv_cmd_dispatcher(void){
- _update_mutex.unlock();
+ void recv_cmd_dispatcher(boost::barrier &spawn_barrier){
+ spawn_barrier.wait();
try{
boost::any cmd;
while (true){
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 340e9d155..07cbd2432 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -25,7 +25,8 @@
#include <uhd/transport/bounded_buffer.hpp>
#include <boost/format.hpp>
#include <boost/bind.hpp>
-#include <boost/thread.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
#include <iostream>
using namespace uhd;
@@ -209,11 +210,10 @@ struct usrp2_impl::io_impl{
vrt_packet_handler::send_state packet_handler_send_state;
//methods and variables for the pirate crew
- void recv_pirate_loop(usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t);
+ void recv_pirate_loop(boost::barrier &, usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t);
boost::thread_group recv_pirate_crew;
bool recv_pirate_crew_raiding;
bounded_buffer<async_metadata_t> async_msg_fifo;
- boost::mutex spawn_mutex;
};
/***********************************************************************
@@ -223,13 +223,15 @@ struct usrp2_impl::io_impl{
* - put async message packets into queue
**********************************************************************/
void usrp2_impl::io_impl::recv_pirate_loop(
- usrp2_mboard_impl::sptr mboard, zero_copy_if::sptr err_xport, size_t index
+ boost::barrier &spawn_barrier,
+ usrp2_mboard_impl::sptr mboard,
+ zero_copy_if::sptr err_xport,
+ size_t index
){
+ spawn_barrier.wait();
set_thread_priority_safe();
recv_pirate_crew_raiding = true;
- spawn_mutex.unlock();
-
//store a reference to the flow control monitor (offset by max dsps)
flow_control_monitor &fc_mon = *(this->fc_mons[index*usrp2_mboard_impl::MAX_NUM_DSPS]);
@@ -286,19 +288,16 @@ void usrp2_impl::io_init(void){
_io_impl = UHD_PIMPL_MAKE(io_impl, (dsp_xports));
//create a new pirate thread for each zc if (yarr!!)
+ boost::barrier spawn_barrier(_mboards.size()+1);
for (size_t i = 0; i < _mboards.size(); i++){
- //lock the unlocked mutex (non-blocking)
- _io_impl->spawn_mutex.lock();
//spawn a new pirate to plunder the recv booty
_io_impl->recv_pirate_crew.create_thread(boost::bind(
&usrp2_impl::io_impl::recv_pirate_loop,
- _io_impl.get(), _mboards.at(i), err_xports.at(i), i
+ _io_impl.get(), boost::ref(spawn_barrier),
+ _mboards.at(i), err_xports.at(i), i
));
- //block here until the spawned thread unlocks
- _io_impl->spawn_mutex.lock();
- //exit loop iteration in an unlocked condition
- _io_impl->spawn_mutex.unlock();
}
+ spawn_barrier.wait();
//update mapping here since it didnt b4 when io init not called first
update_xport_channel_mapping();
diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp
index fc6aaeaee..cbab5a761 100644
--- a/host/lib/usrp/usrp_e100/io_impl.cpp
+++ b/host/lib/usrp/usrp_e100/io_impl.cpp
@@ -23,7 +23,8 @@
#include "../../transport/vrt_packet_handler.hpp"
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include <boost/thread.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
#include <iostream>
using namespace uhd;
@@ -93,7 +94,7 @@ struct usrp_e100_impl::io_impl{
bool continuous_streaming;
//a pirate's life is the life for me!
- void recv_pirate_loop(usrp_e100_clock_ctrl::sptr);
+ void recv_pirate_loop(boost::barrier &, usrp_e100_clock_ctrl::sptr);
bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty;
bounded_buffer<async_metadata_t> async_msg_fifo;
boost::thread_group recv_pirate_crew;
@@ -105,8 +106,10 @@ struct usrp_e100_impl::io_impl{
* - while raiding, loot for recv buffers
* - put booty into the alignment buffer
**********************************************************************/
-void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_ctrl)
-{
+void usrp_e100_impl::io_impl::recv_pirate_loop(
+ boost::barrier &spawn_barrier, usrp_e100_clock_ctrl::sptr clock_ctrl
+){
+ spawn_barrier.wait();
set_thread_priority_safe();
recv_pirate_crew_raiding = true;
@@ -201,9 +204,12 @@ void usrp_e100_impl::io_init(void){
_iface->poke32(UE_REG_CTRL_TX_POLICY, UE_FLAG_CTRL_TX_POLICY_NEXT_PACKET);
//spawn a pirate, yarrr!
+ boost::barrier spawn_barrier(2);
_io_impl->recv_pirate_crew.create_thread(boost::bind(
- &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _clock_ctrl
+ &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(),
+ boost::ref(spawn_barrier), _clock_ctrl
));
+ spawn_barrier.wait();
}
void usrp_e100_impl::issue_stream_cmd(const stream_cmd_t &stream_cmd){