diff options
| author | Josh Blum <josh@joshknows.com> | 2013-04-02 10:22:34 -0700 | 
|---|---|---|
| committer | Josh Blum <josh@joshknows.com> | 2013-04-12 13:32:58 -0700 | 
| commit | f1108bd25a369a83ef8227c7564e812d4e27f369 (patch) | |
| tree | 82591bd942756583fcaec44ec9008e3f44119179 /host | |
| parent | cb3f554658b6c979bf9649509d8c7a6cd3b8a9a4 (diff) | |
| download | uhd-f1108bd25a369a83ef8227c7564e812d4e27f369.tar.gz uhd-f1108bd25a369a83ef8227c7564e812d4e27f369.tar.bz2 uhd-f1108bd25a369a83ef8227c7564e812d4e27f369.zip  | |
uhd: switch the reusable barrier to condition variables
This allows the converter threads in a multi-threaded streamer to wait quietly.
In addition, the use of two barriers in the packet handlers was reduced to one,
by adding a simple exit barrier inside the reusable barrier's wait method.
Diffstat (limited to 'host')
| -rw-r--r-- | host/include/uhd/utils/atomic.hpp | 53 | ||||
| -rw-r--r-- | host/lib/transport/super_recv_packet_handler.hpp | 14 | ||||
| -rw-r--r-- | host/lib/transport/super_send_packet_handler.hpp | 14 | 
3 files changed, 53 insertions, 28 deletions
diff --git a/host/include/uhd/utils/atomic.hpp b/host/include/uhd/utils/atomic.hpp index 7a81d8d5e..8ddee73ca 100644 --- a/host/include/uhd/utils/atomic.hpp +++ b/host/include/uhd/utils/atomic.hpp @@ -1,5 +1,5 @@  // -// Copyright 2012 Ettus Research LLC +// Copyright 2012-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -21,6 +21,8 @@  #include <uhd/config.hpp>  #include <uhd/types/time_spec.hpp>  #include <boost/thread/thread.hpp> +#include <boost/thread/mutex.hpp> +#include <boost/thread/condition_variable.hpp>  #include <boost/interprocess/detail/atomic.hpp>  #include <boost/version.hpp> @@ -79,7 +81,6 @@ namespace uhd{          //! Resize the barrier for N threads          void resize(const size_t size){              _size = size; -            _count.write(size);          }          /*! @@ -88,24 +89,52 @@ namespace uhd{           */          void interrupt(void)          { -            _count.write(boost::uint32_t(~0)); +            _done.inc();          }          //! Wait on the barrier condition -        UHD_INLINE void wait(void){ -            _count.dec(); -            _count.cas(_size, 0); -            while (_count.read() != _size){ -                boost::this_thread::interruption_point(); -                if (_count.read() == boost::uint32_t(~0)) -                    throw boost::thread_interrupted(); -                boost::this_thread::yield(); +        UHD_INLINE void wait(void) +        { +            if (_size == 1) return; + +            //entry barrier with condition variable +            _entry_counter.inc(); +            _entry_counter.cas(0, _size); +            boost::mutex::scoped_lock lock(_mutex); +            while (_entry_counter.read() != 0) +            { +                this->check_interrupt(); +                _cond.timed_wait(lock, boost::posix_time::milliseconds(1));              } +            lock.unlock(); //unlock before notify +            _cond.notify_one(); + +            //exit barrier to ensure known condition of entry count +            _exit_counter.inc(); +            _exit_counter.cas(0, _size); +            while (_exit_counter.read() != 0) this->check_interrupt(); +        } + +        //! Wait on the barrier condition +        UHD_INLINE void wait_others(void) +        { +            while (_entry_counter.read() != (_size-1)) this->check_interrupt();          }      private:          size_t _size; -        atomic_uint32_t _count; +        atomic_uint32_t _entry_counter; +        atomic_uint32_t _exit_counter; +        atomic_uint32_t _done; +        boost::mutex _mutex; +        boost::condition_variable _cond; + +        UHD_INLINE void check_interrupt(void) +        { +            if (_done.read() != 0) throw boost::thread_interrupted(); +            boost::this_thread::interruption_point(); +            boost::this_thread::yield(); +        }      };      /*! diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp index 7a1972690..5a75d5f0d 100644 --- a/host/lib/transport/super_recv_packet_handler.hpp +++ b/host/lib/transport/super_recv_packet_handler.hpp @@ -1,5 +1,5 @@  // -// Copyright 2011-2012 Ettus Research LLC +// Copyright 2011-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -79,8 +79,7 @@ public:      }      ~recv_packet_handler(void){ -        _task_barrier_entry.interrupt(); -        _task_barrier_exit.interrupt(); +        _task_barrier.interrupt();          _task_handlers.clear();      } @@ -91,8 +90,7 @@ public:          _props.resize(size);          //re-initialize all buffers infos by re-creating the vector          _buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size)); -        _task_barrier_entry.resize(size); -        _task_barrier_exit.resize(size); +        _task_barrier.resize(size);          _task_handlers.resize(size);          for (size_t i = 1/*skip 0*/; i < size; i++){              _task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i)); @@ -561,7 +559,7 @@ private:       ******************************************************************/      UHD_INLINE void converter_thread_task(const size_t index)      { -        _task_barrier_entry.wait(); +        _task_barrier.wait();          //shortcut references to local data structures          buffers_info_type &buff_info = get_curr_buffer_info(); @@ -587,11 +585,11 @@ private:              info.buff.reset(); //effectively a release          } -        _task_barrier_exit.wait(); +        if (index == 0) _task_barrier.wait_others();      }      //! Shared variables for the worker threads -    reusable_barrier _task_barrier_entry, _task_barrier_exit; +    reusable_barrier _task_barrier;      std::vector<task::sptr> _task_handlers;      size_t _convert_nsamps;      const rx_streamer::buffs_type *_convert_buffs; diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp index 74e893e67..726742327 100644 --- a/host/lib/transport/super_send_packet_handler.hpp +++ b/host/lib/transport/super_send_packet_handler.hpp @@ -1,5 +1,5 @@  // -// Copyright 2011-2012 Ettus Research LLC +// Copyright 2011-2013 Ettus Research LLC  //  // This program is free software: you can redistribute it and/or modify  // it under the terms of the GNU General Public License as published by @@ -61,8 +61,7 @@ public:      }      ~send_packet_handler(void){ -        _task_barrier_entry.interrupt(); -        _task_barrier_exit.interrupt(); +        _task_barrier.interrupt();          _task_handlers.clear();      } @@ -73,8 +72,7 @@ public:          _props.resize(size);          static const boost::uint64_t zero = 0;          _zero_buffs.resize(size, &zero); -        _task_barrier_entry.resize(size); -        _task_barrier_exit.resize(size); +        _task_barrier.resize(size);          _task_handlers.resize(size);          for (size_t i = 1/*skip 0*/; i < size; i++){              _task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i)); @@ -272,7 +270,7 @@ private:       ******************************************************************/      UHD_INLINE void converter_thread_task(const size_t index)      { -        _task_barrier_entry.wait(); +        _task_barrier.wait();          //shortcut references to local data structures          managed_send_buffer::sptr &buff = _props[index].buff; @@ -302,11 +300,11 @@ private:          buff->commit(num_vita_words32*sizeof(boost::uint32_t));          buff.reset(); //effectively a release -        _task_barrier_exit.wait(); +        if (index == 0) _task_barrier.wait_others();      }      //! Shared variables for the worker threads -    reusable_barrier _task_barrier_entry, _task_barrier_exit; +    reusable_barrier _task_barrier;      std::vector<task::sptr> _task_handlers;      size_t _convert_nsamps;      const tx_streamer::buffs_type *_convert_buffs;  | 
