diff options
Diffstat (limited to 'src/SampleQueue.h')
-rw-r--r-- | src/SampleQueue.h | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 646f3dd..aeeb8d4 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Matthias P. Braendli + * Copyright (C) 2018 Matthias P. Braendli * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ * bytes_per_sample * number_of_channels * * The queue has a maximum size. If this size is reached, push() - * ignores new data. + * either blocks or ignores new data, depending on drift_compensation. * * If pop() is called but there is not enough data in the queue, * the missing samples are replaced by zeros. pop() will always @@ -65,10 +65,12 @@ class SampleQueue public: SampleQueue(unsigned int bytes_per_sample, unsigned int channels, - size_t max_size) : + size_t max_size, + bool drift_compensation) : m_bytes_per_sample(bytes_per_sample), m_channels(channels), m_max_size(max_size), + m_push_block(not drift_compensation), m_overruns(0) {} @@ -81,7 +83,7 @@ public: size_t new_size = 0; { - std::lock_guard<std::mutex> lock(m_mutex); + std::unique_lock<std::mutex> lock(m_mutex); assert(len % (m_channels * m_bytes_per_sample) == 0); @@ -93,17 +95,32 @@ public: m_max_size / 4); #endif - if (m_queue.size() < m_max_size) { - for (size_t i = 0; i < len; i++) { - m_queue.push_back(val[i]); + if (m_push_block) { + while (len) { + const size_t available = m_max_size - m_queue.size(); + const size_t copy_len = std::min(available, len); + + if (copy_len > 0) { + std::copy(val, val + copy_len, std::back_inserter(m_queue)); + len -= copy_len; + val += copy_len; + } + else { + const auto wait_timeout = std::chrono::milliseconds(100); + m_pop_notification.wait_for(lock, wait_timeout); + } } - - new_size = m_queue.size(); } else { - m_overruns++; - new_size = 0; + if (m_queue.size() < m_max_size) { + std::copy(val, val + len, std::back_inserter(m_queue)); + } + else { + m_overruns++; + } } + + new_size = m_queue.size(); } m_push_notification.notify_all(); @@ -142,7 +159,6 @@ public: auto time_start = std::chrono::steady_clock::now(); const auto timeout = std::chrono::milliseconds(timeout_ms); -#if 1 do { const auto wait_timeout = std::chrono::milliseconds(10); m_push_notification.wait_for(lock, wait_timeout); @@ -159,13 +175,6 @@ public: break; } } while (m_queue.size() < len); -#else - while (m_queue.size() < len) { - lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - lock.lock(); - } -#endif size_t num_to_copy = (m_queue.size() < len) ? m_queue.size() : len; @@ -182,6 +191,8 @@ public: #if DEBUG_SAMPLE_QUEUE fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy); #endif + + m_pop_notification.notify_all(); return num_to_copy; } @@ -260,6 +271,7 @@ public: #endif } + m_pop_notification.notify_all(); return ret; } @@ -267,10 +279,12 @@ private: std::deque<T> m_queue; mutable std::mutex m_mutex; std::condition_variable m_push_notification; + std::condition_variable m_pop_notification; unsigned int m_channels; unsigned int m_bytes_per_sample; size_t m_max_size; + bool m_push_block; /*! Counter to keep track of number of overruns between calls * to pop() |