From 1c5076ea68345e74de35cad43e4a4b4adf68fa15 Mon Sep 17 00:00:00 2001
From: Josh Blum <josh@joshknows.com>
Date: Thu, 31 Mar 2011 15:00:56 -0700
Subject: uhd: implemented boost barriers on all code that creates threads

The barrier ensures that the thread must spawn before the caller exits.

Some of the code already used a mutex to accomplish this,
however cygwin chokes when a mutex is locked twice by the same thread.
Mutex implementations were replaced with the barrier implementation.
Also the barrier implementation is far cleaner.
---
 host/lib/usrp/usrp1/soft_time_ctrl.cpp | 14 ++++++++------
 host/lib/usrp/usrp2/io_impl.cpp        | 25 ++++++++++++-------------
 host/lib/usrp/usrp_e100/io_impl.cpp    | 16 +++++++++++-----
 3 files changed, 31 insertions(+), 24 deletions(-)

(limited to 'host/lib/usrp')

diff --git a/host/lib/usrp/usrp1/soft_time_ctrl.cpp b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
index e1b671811..1bab34e7b 100644
--- a/host/lib/usrp/usrp1/soft_time_ctrl.cpp
+++ b/host/lib/usrp/usrp1/soft_time_ctrl.cpp
@@ -19,6 +19,7 @@
 #include <uhd/transport/bounded_buffer.hpp>
 #include <boost/any.hpp>
 #include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
 #include <boost/thread/condition_variable.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
 #include <iostream>
@@ -43,10 +44,11 @@ public:
         _stream_on_off(stream_on_off)
     {
         //synchronously spawn a new thread
-        _update_mutex.lock(); //lock mutex before spawned
-        _thread_group.create_thread(boost::bind(&soft_time_ctrl_impl::recv_cmd_dispatcher, this));
-        _update_mutex.lock(); //lock blocks until spawned
-        _update_mutex.unlock(); //unlock mutex before done
+        boost::barrier spawn_barrier(2);
+        _thread_group.create_thread(boost::bind(
+            &soft_time_ctrl_impl::recv_cmd_dispatcher, this, boost::ref(spawn_barrier))
+        );
+        spawn_barrier.wait();
 
         //initialize the time to something
         this->set_time(time_spec_t(0.0));
@@ -175,8 +177,8 @@ public:
         _stream_mode = cmd.stream_mode;
     }
 
-    void recv_cmd_dispatcher(void){
-        _update_mutex.unlock();
+    void recv_cmd_dispatcher(boost::barrier &spawn_barrier){
+        spawn_barrier.wait();
         try{
             boost::any cmd;
             while (true){
diff --git a/host/lib/usrp/usrp2/io_impl.cpp b/host/lib/usrp/usrp2/io_impl.cpp
index 340e9d155..07cbd2432 100644
--- a/host/lib/usrp/usrp2/io_impl.cpp
+++ b/host/lib/usrp/usrp2/io_impl.cpp
@@ -25,7 +25,8 @@
 #include <uhd/transport/bounded_buffer.hpp>
 #include <boost/format.hpp>
 #include <boost/bind.hpp>
-#include <boost/thread.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
 #include <iostream>
 
 using namespace uhd;
@@ -209,11 +210,10 @@ struct usrp2_impl::io_impl{
     vrt_packet_handler::send_state packet_handler_send_state;
 
     //methods and variables for the pirate crew
-    void recv_pirate_loop(usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t);
+    void recv_pirate_loop(boost::barrier &, usrp2_mboard_impl::sptr, zero_copy_if::sptr, size_t);
     boost::thread_group recv_pirate_crew;
     bool recv_pirate_crew_raiding;
     bounded_buffer<async_metadata_t> async_msg_fifo;
-    boost::mutex spawn_mutex;
 };
 
 /***********************************************************************
@@ -223,13 +223,15 @@ struct usrp2_impl::io_impl{
  * - put async message packets into queue
  **********************************************************************/
 void usrp2_impl::io_impl::recv_pirate_loop(
-    usrp2_mboard_impl::sptr mboard, zero_copy_if::sptr err_xport, size_t index
+    boost::barrier &spawn_barrier,
+    usrp2_mboard_impl::sptr mboard,
+    zero_copy_if::sptr err_xport,
+    size_t index
 ){
+    spawn_barrier.wait();
     set_thread_priority_safe();
     recv_pirate_crew_raiding = true;
 
-    spawn_mutex.unlock();
-
     //store a reference to the flow control monitor (offset by max dsps)
     flow_control_monitor &fc_mon = *(this->fc_mons[index*usrp2_mboard_impl::MAX_NUM_DSPS]);
 
@@ -286,19 +288,16 @@ void usrp2_impl::io_init(void){
     _io_impl = UHD_PIMPL_MAKE(io_impl, (dsp_xports));
 
     //create a new pirate thread for each zc if (yarr!!)
+    boost::barrier spawn_barrier(_mboards.size()+1);
     for (size_t i = 0; i < _mboards.size(); i++){
-        //lock the unlocked mutex (non-blocking)
-        _io_impl->spawn_mutex.lock();
         //spawn a new pirate to plunder the recv booty
         _io_impl->recv_pirate_crew.create_thread(boost::bind(
             &usrp2_impl::io_impl::recv_pirate_loop,
-            _io_impl.get(), _mboards.at(i), err_xports.at(i), i
+            _io_impl.get(), boost::ref(spawn_barrier),
+            _mboards.at(i), err_xports.at(i), i
         ));
-        //block here until the spawned thread unlocks
-        _io_impl->spawn_mutex.lock();
-        //exit loop iteration in an unlocked condition
-        _io_impl->spawn_mutex.unlock();
     }
+    spawn_barrier.wait();
 
     //update mapping here since it didnt b4 when io init not called first
     update_xport_channel_mapping();
diff --git a/host/lib/usrp/usrp_e100/io_impl.cpp b/host/lib/usrp/usrp_e100/io_impl.cpp
index fc6aaeaee..cbab5a761 100644
--- a/host/lib/usrp/usrp_e100/io_impl.cpp
+++ b/host/lib/usrp/usrp_e100/io_impl.cpp
@@ -23,7 +23,8 @@
 #include "../../transport/vrt_packet_handler.hpp"
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
-#include <boost/thread.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/thread/barrier.hpp>
 #include <iostream>
 
 using namespace uhd;
@@ -93,7 +94,7 @@ struct usrp_e100_impl::io_impl{
     bool continuous_streaming;
 
     //a pirate's life is the life for me!
-    void recv_pirate_loop(usrp_e100_clock_ctrl::sptr);
+    void recv_pirate_loop(boost::barrier &, usrp_e100_clock_ctrl::sptr);
     bounded_buffer<managed_recv_buffer::sptr> recv_pirate_booty;
     bounded_buffer<async_metadata_t> async_msg_fifo;
     boost::thread_group recv_pirate_crew;
@@ -105,8 +106,10 @@ struct usrp_e100_impl::io_impl{
  * - while raiding, loot for recv buffers
  * - put booty into the alignment buffer
  **********************************************************************/
-void usrp_e100_impl::io_impl::recv_pirate_loop(usrp_e100_clock_ctrl::sptr clock_ctrl)
-{
+void usrp_e100_impl::io_impl::recv_pirate_loop(
+    boost::barrier &spawn_barrier, usrp_e100_clock_ctrl::sptr clock_ctrl
+){
+    spawn_barrier.wait();
     set_thread_priority_safe();
     recv_pirate_crew_raiding = true;
 
@@ -201,9 +204,12 @@ void usrp_e100_impl::io_init(void){
     _iface->poke32(UE_REG_CTRL_TX_POLICY, UE_FLAG_CTRL_TX_POLICY_NEXT_PACKET);
 
     //spawn a pirate, yarrr!
+    boost::barrier spawn_barrier(2);
     _io_impl->recv_pirate_crew.create_thread(boost::bind(
-        &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(), _clock_ctrl
+        &usrp_e100_impl::io_impl::recv_pirate_loop, _io_impl.get(),
+        boost::ref(spawn_barrier), _clock_ctrl
     ));
+    spawn_barrier.wait();
 }
 
 void usrp_e100_impl::issue_stream_cmd(const stream_cmd_t &stream_cmd){
-- 
cgit v1.2.3