diff options
| author | Josh Blum <josh@joshknows.com> | 2011-03-31 15:00:56 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2011-03-31 15:00:56 -0700 | 
| commit | 1c5076ea68345e74de35cad43e4a4b4adf68fa15 (patch) | |
| tree | ad9f49f783761e89054a7bee5a0c20c9b406da35 | |
| parent | 48f6e1f8aae24ee4ff3b15232cfc335b0210ed11 (diff) | |
| download | uhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.tar.gz uhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.tar.bz2 uhd-1c5076ea68345e74de35cad43e4a4b4adf68fa15.zip | |
uhd: implemented boost barriers on all code that creates threads
The barrier ensures that the thread must spawn before the caller exits.
Some of the code already used a mutex to accomplish this,
however cygwin chokes when a mutex is locked twice by the same thread.
Mutex implementations were replaced with the barrier implementation.
Also the barrier implementation is far cleaner.
| -rw-r--r-- | host/lib/transport/libusb1_zero_copy.cpp | 8 | ||||
| -rw-r--r-- | host/lib/usrp/usrp1/soft_time_ctrl.cpp | 14 | ||||
| -rw-r--r-- | host/lib/usrp/usrp2/io_impl.cpp | 25 | ||||
| -rw-r--r-- | host/lib/usrp/usrp_e100/io_impl.cpp | 16 | 
4 files changed, 37 insertions, 26 deletions
| diff --git a/host/lib/transport/libusb1_zero_copy.cpp b/host/lib/transport/libusb1_zero_copy.cpp index 9f38ce97b..fe6936c7e 100644 --- a/host/lib/transport/libusb1_zero_copy.cpp +++ b/host/lib/transport/libusb1_zero_copy.cpp @@ -24,6 +24,7 @@  #include <boost/function.hpp>  #include <boost/foreach.hpp>  #include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp>  #include <list>  #include <iostream> @@ -191,9 +192,11 @@ public:          //spawn the event handler threads          size_t concurrency = hints.cast<size_t>("concurrency_hint", 1); +        boost::barrier spawn_barrier(concurrency+1);          for (size_t i = 0; i < concurrency; i++) _thread_group.create_thread( -            boost::bind(&libusb_zero_copy_impl::run_event_loop, this) +            boost::bind(&libusb_zero_copy_impl::run_event_loop, this, boost::ref(spawn_barrier))          ); +        spawn_barrier.wait();      }      ~libusb_zero_copy_impl(void){ @@ -263,7 +266,8 @@ private:      boost::thread_group _thread_group;      bool _threads_running; -    void run_event_loop(void){ +    void run_event_loop(boost::barrier &spawn_barrier){ +        spawn_barrier.wait();          set_thread_priority_safe();          libusb_context *context = libusb::session::get_global_session()->get_context();          _threads_running = true; diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp index e1b671811..1bab34e7b 100644 --- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp +++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp @@ -19,6 +19,7 @@  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/any.hpp>  #include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp>  #include <boost/thread/condition_variable.hpp>  #include <boost/date_time/posix_time/posix_time.hpp>  #include <iostream> @@ -43,10 +44,11 @@ public:          _stream_on_off(stream_on_off)      {          //synchronously spawn a new thread -        _update_mutex.lock(); //lock mutex before spawned -        _thread_group.create_thread(boost::bind(&soft_time_ctrl_impl::recv_cmd_dispatcher, this)); -        _update_mutex.lock(); //lock blocks until spawned -        _update_mutex.unlock(); //unlock mutex before done +        boost::barrier spawn_barrier(2); +        _thread_group.create_thread(boost::bind( +            &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier)) +        ); +        spawn_barrier.wait();          //initialize the time to something          this->set_time(time_spec_t(0.0)); @@ -175,8 +177,8 @@ public:          _stream_mode = cmd.stream_mode;      } -    void recv_cmd_dispatcher(void){ -        _update_mutex.unlock(); +    void recv_cmd_dispatcher(boost::barrier &spawn_barrier){ +        spawn_barrier.wait();          try{              boost::any cmd;              while (true){ diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp index 340e9d155..07cbd2432 100644 --- a/host/lib/usrp/usrp2/io_impl.cpp +++ b/host/lib/usrp/usrp2/io_impl.cpp @@ -25,7 +25,8 @@  #include <uhd/transport/bounded_buffer.hpp>  #include <boost/format.hpp>  #include <boost/bind.hpp> -#include <boost/thread.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp>  #include <iostream>  using namespace uhd; @@ -209,11 +210,10 @@ struct usrp2_impl::io_impl{      vrt_packet_handler::send_state packet_handler_send_state;      //methods and variables for the pirate crew -    void recv_pirate_loop(usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t); +    void recv_pirate_loop(boost::barrier &, usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t);      boost::thread_group recv_pirate_crew;      bool recv_pirate_crew_raiding;      bounded_buffer<async_metadata_t> async_msg_fifo; -    boost::mutex spawn_mutex;  };  /*********************************************************************** @@ -223,13 +223,15 @@ struct usrp2_impl::io_impl{   * - put async message packets into queue   **********************************************************************/  void usrp2_impl::io_impl::recv_pirate_loop( -    usrp2_mboard_impl::sptr mboard, zero_copy_if::sptr err_xport, size_t index +    boost::barrier &spawn_barrier, +    usrp2_mboard_impl::sptr mboard, +    zero_copy_if::sptr err_xport, +    size_t index  ){ +    spawn_barrier.wait();      set_thread_priority_safe();      recv_pirate_crew_raiding = true; -    spawn_mutex.unlock(); -      //store a reference to the flow control monitor (offset by max dsps)      flow_control_monitor &fc_mon = *(this->fc_mons[index*usrp2_mboard_impl::MAX_NUM_DSPS]); @@ -286,19 +288,16 @@ void usrp2_impl::io_init(void){      _io_impl = UHD_PIMPL_MAKE(io_impl, (dsp_xports));      //create a new pirate thread for each zc if (yarr!!) +    boost::barrier spawn_barrier(_mboards.size()+1);      for (size_t i = 0; i < _mboards.size(); i++){ -        //lock the unlocked mutex (non-blocking) -        _io_impl->spawn_mutex.lock();          //spawn a new pirate to plunder the recv booty          _io_impl->recv_pirate_crew.create_thread(boost::bind(              &usrp2_impl::io_impl::recv_pirate_loop, -            _io_impl.get(), _mboards.at(i), err_xports.at(i), i +            _io_impl.get(), boost::ref(spawn_barrier), +            _mboards.at(i), err_xports.at(i), i          )); -        //block here until the spawned thread unlocks -        _io_impl->spawn_mutex.lock(); -        //exit loop iteration in an unlocked condition -        _io_impl->spawn_mutex.unlock();      } +    spawn_barrier.wait();      //update mapping here since it didnt b4 when io init not called first      update_xport_channel_mapping(); diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp index fc6aaeaee..cbab5a761 100644 --- a/host/lib/usrp/usrp_e100/io_impl.cpp +++ b/host/lib/usrp/usrp_e100/io_impl.cpp @@ -23,7 +23,8 @@  #include "../../transport/vrt_packet_handler.hpp"  #include <boost/bind.hpp>  #include <boost/format.hpp> -#include <boost/thread.hpp> +#include <boost/thread/thread.hpp> +#include <boost/thread/barrier.hpp>  #include <iostream>  using namespace uhd; @@ -93,7 +94,7 @@ struct usrp_e100_impl::io_impl{      bool continuous_streaming;      //a pirate's life is the life for me! -    void recv_pirate_loop(usrp_e100_clock_ctrl::sptr); +    void recv_pirate_loop(boost::barrier &, usrp_e100_clock_ctrl::sptr);      bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty;      bounded_buffer<async_metadata_t> async_msg_fifo;      boost::thread_group recv_pirate_crew; @@ -105,8 +106,10 @@ struct usrp_e100_impl::io_impl{   * - while raiding, loot for recv buffers   * - put booty into the alignment buffer   **********************************************************************/ -void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_ctrl) -{ +void usrp_e100_impl::io_impl::recv_pirate_loop( +    boost::barrier &spawn_barrier, usrp_e100_clock_ctrl::sptr clock_ctrl +){ +    spawn_barrier.wait();      set_thread_priority_safe();      recv_pirate_crew_raiding = true; @@ -201,9 +204,12 @@ void usrp_e100_impl::io_init(void){      _iface->poke32(UE_REG_CTRL_TX_POLICY, UE_FLAG_CTRL_TX_POLICY_NEXT_PACKET);      //spawn a pirate, yarrr! +    boost::barrier spawn_barrier(2);      _io_impl->recv_pirate_crew.create_thread(boost::bind( -        &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _clock_ctrl +        &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), +        boost::ref(spawn_barrier), _clock_ctrl      )); +    spawn_barrier.wait();  }  void usrp_e100_impl::issue_stream_cmd(const stream_cmd_t &stream_cmd){ | 
