From f1108bd25a369a83ef8227c7564e812d4e27f369 Mon Sep 17 00:00:00 2001
From: Josh Blum <josh@joshknows.com>
Date: Tue, 2 Apr 2013 10:22:34 -0700
Subject: uhd: switch the reusable barrier to condition variables

This allows the converter threads in a multi-threaded streamer to wait quietly.
In addition, the use of two barriers in the packet handlers was reduced to one,
by adding a simple exit barrier inside the reusable barrier's wait method.
---
 host/include/uhd/utils/atomic.hpp                | 53 ++++++++++++++++++------
 host/lib/transport/super_recv_packet_handler.hpp | 14 +++----
 host/lib/transport/super_send_packet_handler.hpp | 14 +++----
 3 files changed, 53 insertions(+), 28 deletions(-)

diff --git a/host/include/uhd/utils/atomic.hpp b/host/include/uhd/utils/atomic.hpp
index 7a81d8d5e..8ddee73ca 100644
--- a/host/include/uhd/utils/atomic.hpp
+++ b/host/include/uhd/utils/atomic.hpp
@@ -1,5 +1,5 @@
 //
-// Copyright 2012 Ettus Research LLC
+// Copyright 2012-2013 Ettus Research LLC
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -21,6 +21,8 @@
 #include <uhd/config.hpp>
 #include <uhd/types/time_spec.hpp>
 #include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
 #include <boost/interprocess/detail/atomic.hpp>
 
 #include <boost/version.hpp>
@@ -79,7 +81,6 @@ namespace uhd{
         //! Resize the barrier for N threads
         void resize(const size_t size){
             _size = size;
-            _count.write(size);
         }
 
         /*!
@@ -88,24 +89,52 @@ namespace uhd{
          */
         void interrupt(void)
         {
-            _count.write(boost::uint32_t(~0));
+            _done.inc();
         }
 
         //! Wait on the barrier condition
-        UHD_INLINE void wait(void){
-            _count.dec();
-            _count.cas(_size, 0);
-            while (_count.read() != _size){
-                boost::this_thread::interruption_point();
-                if (_count.read() == boost::uint32_t(~0))
-                    throw boost::thread_interrupted();
-                boost::this_thread::yield();
+        UHD_INLINE void wait(void)
+        {
+            if (_size == 1) return;
+
+            //entry barrier with condition variable
+            _entry_counter.inc();
+            _entry_counter.cas(0, _size);
+            boost::mutex::scoped_lock lock(_mutex);
+            while (_entry_counter.read() != 0)
+            {
+                this->check_interrupt();
+                _cond.timed_wait(lock, boost::posix_time::milliseconds(1));
             }
+            lock.unlock(); //unlock before notify
+            _cond.notify_one();
+
+            //exit barrier to ensure known condition of entry count
+            _exit_counter.inc();
+            _exit_counter.cas(0, _size);
+            while (_exit_counter.read() != 0) this->check_interrupt();
+        }
+
+        //! Wait on the barrier condition
+        UHD_INLINE void wait_others(void)
+        {
+            while (_entry_counter.read() != (_size-1)) this->check_interrupt();
         }
 
     private:
         size_t _size;
-        atomic_uint32_t _count;
+        atomic_uint32_t _entry_counter;
+        atomic_uint32_t _exit_counter;
+        atomic_uint32_t _done;
+        boost::mutex _mutex;
+        boost::condition_variable _cond;
+
+        UHD_INLINE void check_interrupt(void)
+        {
+            if (_done.read() != 0) throw boost::thread_interrupted();
+            boost::this_thread::interruption_point();
+            boost::this_thread::yield();
+        }
     };
 
     /*!
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 7a1972690..5a75d5f0d 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -1,5 +1,5 @@
 //
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -79,8 +79,7 @@ public:
     }
 
     ~recv_packet_handler(void){
-        _task_barrier_entry.interrupt();
-        _task_barrier_exit.interrupt();
+        _task_barrier.interrupt();
         _task_handlers.clear();
     }
 
@@ -91,8 +90,7 @@ public:
         _props.resize(size);
         //re-initialize all buffers infos by re-creating the vector
         _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
-        _task_barrier_entry.resize(size);
-        _task_barrier_exit.resize(size);
+        _task_barrier.resize(size);
         _task_handlers.resize(size);
         for (size_t i = 1/*skip 0*/; i < size; i++){
             _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
@@ -561,7 +559,7 @@ private:
      ******************************************************************/
     UHD_INLINE void converter_thread_task(const size_t index)
     {
-        _task_barrier_entry.wait();
+        _task_barrier.wait();
 
         //shortcut references to local data structures
         buffers_info_type &buff_info = get_curr_buffer_info();
@@ -587,11 +585,11 @@ private:
             info.buff.reset(); //effectively a release
         }
 
-        _task_barrier_exit.wait();
+        if (index == 0) _task_barrier.wait_others();
     }
 
     //! Shared variables for the worker threads
-    reusable_barrier _task_barrier_entry, _task_barrier_exit;
+    reusable_barrier _task_barrier;
     std::vector<task::sptr> _task_handlers;
     size_t _convert_nsamps;
     const rx_streamer::buffs_type *_convert_buffs;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 74e893e67..726742327 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -1,5 +1,5 @@
 //
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
 //
 // This program is free software: you can redistribute it and/or modify
 // it under the terms of the GNU General Public License as published by
@@ -61,8 +61,7 @@ public:
     }
 
     ~send_packet_handler(void){
-        _task_barrier_entry.interrupt();
-        _task_barrier_exit.interrupt();
+        _task_barrier.interrupt();
         _task_handlers.clear();
     }
 
@@ -73,8 +72,7 @@ public:
         _props.resize(size);
         static const boost::uint64_t zero = 0;
         _zero_buffs.resize(size, &zero);
-        _task_barrier_entry.resize(size);
-        _task_barrier_exit.resize(size);
+        _task_barrier.resize(size);
         _task_handlers.resize(size);
         for (size_t i = 1/*skip 0*/; i < size; i++){
             _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
@@ -272,7 +270,7 @@ private:
      ******************************************************************/
     UHD_INLINE void converter_thread_task(const size_t index)
     {
-        _task_barrier_entry.wait();
+        _task_barrier.wait();
 
         //shortcut references to local data structures
         managed_send_buffer::sptr &buff = _props[index].buff;
@@ -302,11 +300,11 @@ private:
         buff->commit(num_vita_words32*sizeof(boost::uint32_t));
         buff.reset(); //effectively a release
 
-        _task_barrier_exit.wait();
+        if (index == 0) _task_barrier.wait_others();
     }
 
     //! Shared variables for the worker threads
-    reusable_barrier _task_barrier_entry, _task_barrier_exit;
+    reusable_barrier _task_barrier;
     std::vector<task::sptr> _task_handlers;
     size_t _convert_nsamps;
     const tx_streamer::buffs_type *_convert_buffs;
-- 
cgit v1.2.3