summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--host/include/uhd/utils/atomic.hpp53
-rw-r--r--host/lib/transport/super_recv_packet_handler.hpp14
-rw-r--r--host/lib/transport/super_send_packet_handler.hpp14
-rw-r--r--host/lib/types/time_spec.cpp14
-rw-r--r--host/tests/time_spec_test.cpp18
5 files changed, 79 insertions, 34 deletions
diff --git a/host/include/uhd/utils/atomic.hpp b/host/include/uhd/utils/atomic.hpp
index 7a81d8d5e..8ddee73ca 100644
--- a/host/include/uhd/utils/atomic.hpp
+++ b/host/include/uhd/utils/atomic.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2012 Ettus Research LLC
+// Copyright 2012-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -21,6 +21,8 @@
#include <uhd/config.hpp>
#include <uhd/types/time_spec.hpp>
#include <boost/thread/thread.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition_variable.hpp>
#include <boost/interprocess/detail/atomic.hpp>
#include <boost/version.hpp>
@@ -79,7 +81,6 @@ namespace uhd{
//! Resize the barrier for N threads
void resize(const size_t size){
_size = size;
- _count.write(size);
}
/*!
@@ -88,24 +89,52 @@ namespace uhd{
*/
void interrupt(void)
{
- _count.write(boost::uint32_t(~0));
+ _done.inc();
}
//! Wait on the barrier condition
- UHD_INLINE void wait(void){
- _count.dec();
- _count.cas(_size, 0);
- while (_count.read() != _size){
- boost::this_thread::interruption_point();
- if (_count.read() == boost::uint32_t(~0))
- throw boost::thread_interrupted();
- boost::this_thread::yield();
+ UHD_INLINE void wait(void)
+ {
+ if (_size == 1) return;
+
+ //entry barrier with condition variable
+ _entry_counter.inc();
+ _entry_counter.cas(0, _size);
+ boost::mutex::scoped_lock lock(_mutex);
+ while (_entry_counter.read() != 0)
+ {
+ this->check_interrupt();
+ _cond.timed_wait(lock, boost::posix_time::milliseconds(1));
}
+ lock.unlock(); //unlock before notify
+ _cond.notify_one();
+
+ //exit barrier to ensure known condition of entry count
+ _exit_counter.inc();
+ _exit_counter.cas(0, _size);
+ while (_exit_counter.read() != 0) this->check_interrupt();
+ }
+
+ //! Wait on the barrier condition
+ UHD_INLINE void wait_others(void)
+ {
+ while (_entry_counter.read() != (_size-1)) this->check_interrupt();
}
private:
size_t _size;
- atomic_uint32_t _count;
+ atomic_uint32_t _entry_counter;
+ atomic_uint32_t _exit_counter;
+ atomic_uint32_t _done;
+ boost::mutex _mutex;
+ boost::condition_variable _cond;
+
+ UHD_INLINE void check_interrupt(void)
+ {
+ if (_done.read() != 0) throw boost::thread_interrupted();
+ boost::this_thread::interruption_point();
+ boost::this_thread::yield();
+ }
};
/*!
diff --git a/host/lib/transport/super_recv_packet_handler.hpp b/host/lib/transport/super_recv_packet_handler.hpp
index 7a1972690..5a75d5f0d 100644
--- a/host/lib/transport/super_recv_packet_handler.hpp
+++ b/host/lib/transport/super_recv_packet_handler.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -79,8 +79,7 @@ public:
}
~recv_packet_handler(void){
- _task_barrier_entry.interrupt();
- _task_barrier_exit.interrupt();
+ _task_barrier.interrupt();
_task_handlers.clear();
}
@@ -91,8 +90,7 @@ public:
_props.resize(size);
//re-initialize all buffers infos by re-creating the vector
_buffers_infos = std::vector<buffers_info_type>(4, buffers_info_type(size));
- _task_barrier_entry.resize(size);
- _task_barrier_exit.resize(size);
+ _task_barrier.resize(size);
_task_handlers.resize(size);
for (size_t i = 1/*skip 0*/; i < size; i++){
_task_handlers[i] = task::make(boost::bind(&recv_packet_handler::converter_thread_task, this, i));
@@ -561,7 +559,7 @@ private:
******************************************************************/
UHD_INLINE void converter_thread_task(const size_t index)
{
- _task_barrier_entry.wait();
+ _task_barrier.wait();
//shortcut references to local data structures
buffers_info_type &buff_info = get_curr_buffer_info();
@@ -587,11 +585,11 @@ private:
info.buff.reset(); //effectively a release
}
- _task_barrier_exit.wait();
+ if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ reusable_barrier _task_barrier;
std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const rx_streamer::buffs_type *_convert_buffs;
diff --git a/host/lib/transport/super_send_packet_handler.hpp b/host/lib/transport/super_send_packet_handler.hpp
index 74e893e67..726742327 100644
--- a/host/lib/transport/super_send_packet_handler.hpp
+++ b/host/lib/transport/super_send_packet_handler.hpp
@@ -1,5 +1,5 @@
//
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -61,8 +61,7 @@ public:
}
~send_packet_handler(void){
- _task_barrier_entry.interrupt();
- _task_barrier_exit.interrupt();
+ _task_barrier.interrupt();
_task_handlers.clear();
}
@@ -73,8 +72,7 @@ public:
_props.resize(size);
static const boost::uint64_t zero = 0;
_zero_buffs.resize(size, &zero);
- _task_barrier_entry.resize(size);
- _task_barrier_exit.resize(size);
+ _task_barrier.resize(size);
_task_handlers.resize(size);
for (size_t i = 1/*skip 0*/; i < size; i++){
_task_handlers[i] = task::make(boost::bind(&send_packet_handler::converter_thread_task, this, i));
@@ -272,7 +270,7 @@ private:
******************************************************************/
UHD_INLINE void converter_thread_task(const size_t index)
{
- _task_barrier_entry.wait();
+ _task_barrier.wait();
//shortcut references to local data structures
managed_send_buffer::sptr &buff = _props[index].buff;
@@ -302,11 +300,11 @@ private:
buff->commit(num_vita_words32*sizeof(boost::uint32_t));
buff.reset(); //effectively a release
- _task_barrier_exit.wait();
+ if (index == 0) _task_barrier.wait_others();
}
//! Shared variables for the worker threads
- reusable_barrier _task_barrier_entry, _task_barrier_exit;
+ reusable_barrier _task_barrier;
std::vector<task::sptr> _task_handlers;
size_t _convert_nsamps;
const tx_streamer::buffs_type *_convert_buffs;
diff --git a/host/lib/types/time_spec.cpp b/host/lib/types/time_spec.cpp
index ec939e52e..2fce841be 100644
--- a/host/lib/types/time_spec.cpp
+++ b/host/lib/types/time_spec.cpp
@@ -1,5 +1,5 @@
//
-// Copyright 2011-2012 Ettus Research LLC
+// Copyright 2011-2013 Ettus Research LLC
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
@@ -16,7 +16,6 @@
//
#include <uhd/types/time_spec.hpp>
-#include <inttypes.h> //imaxdiv, intmax_t
using namespace uhd;
@@ -101,8 +100,9 @@ time_spec_t::time_spec_t(time_t full_secs, long tick_count, double tick_rate){
}
time_spec_t time_spec_t::from_ticks(long long ticks, double tick_rate){
- const imaxdiv_t divres = imaxdiv(ticks, fast_llround(tick_rate));
- return time_spec_t(time_t(divres.quot), double(divres.rem)/tick_rate);
+ const time_t secs_full = time_t(ticks/tick_rate);
+ const double ticks_error = ticks - (secs_full*tick_rate);
+ return time_spec_t(secs_full, ticks_error/tick_rate);
}
/***********************************************************************
@@ -113,8 +113,10 @@ long time_spec_t::get_tick_count(double tick_rate) const{
}
long long time_spec_t::to_ticks(double tick_rate) const{
- return fast_llround(this->get_frac_secs()*tick_rate) + \
- (this->get_full_secs() * fast_llround(tick_rate));
+ const long long ticks_full = this->get_full_secs()*fast_llround(tick_rate);
+ const double secs_error = this->get_full_secs() - (ticks_full/tick_rate);
+ const double secs_frac = this->get_frac_secs() + secs_error;
+ return ticks_full + fast_llround(secs_frac*tick_rate);
}
double time_spec_t::get_real_secs(void) const{
diff --git a/host/tests/time_spec_test.cpp b/host/tests/time_spec_test.cpp
index 139a113af..7dee95c5c 100644
--- a/host/tests/time_spec_test.cpp
+++ b/host/tests/time_spec_test.cpp
@@ -108,3 +108,21 @@ BOOST_AUTO_TEST_CASE(test_time_large_ticks_to_time_spec)
std::cout << "t0.get_frac_secs() " << t0.get_frac_secs() << std::endl;
BOOST_CHECK_EQUAL(t0.get_full_secs(), time_t(1360217663));
}
+
+BOOST_AUTO_TEST_CASE(test_time_error_irrational_rate)
+{
+ static const double rate = 1625e3/6;
+ const long long tick_in = 23423436291667;
+ const uhd::time_spec_t ts = uhd::time_spec_t::from_ticks(tick_in, rate);
+ const long long tick_out = ts.to_ticks(rate);
+ const long long err = tick_in - tick_out;
+
+ std::cout << std::setprecision(18);
+ std::cout << "time ............ " << ts.get_real_secs() << std::endl;
+ std::cout << "tick in ......... " << tick_in << std::endl;
+ std::cout << "tick out ........ " << tick_out << std::endl;
+ std::cout << "tick error ...... " << err << std::endl;
+ std::cout << std::endl;
+
+ BOOST_CHECK_EQUAL(err, (long long)(0));
+}