diff options
Diffstat (limited to 'src/SampleQueue.h')
-rw-r--r-- | src/SampleQueue.h | 104 |
1 files changed, 88 insertions, 16 deletions
diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 2df1934..09b67c7 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -12,6 +12,9 @@ #define DEBUG_SAMPLE_QUEUE 0 #include <mutex> +#include <thread> +#include <chrono> +#include <condition_variable> #include <queue> #include <cassert> #include <sstream> @@ -55,28 +58,35 @@ public: /* Push a bunch of samples into the buffer */ size_t push(const T *val, size_t len) { - std::lock_guard<std::mutex> lock(m_mutex); + size_t new_size = 0; - assert(len % (m_channels * m_bytes_per_sample) == 0); + { + std::lock_guard<std::mutex> lock(m_mutex); + + assert(len % (m_channels * m_bytes_per_sample) == 0); #if DEBUG_SAMPLE_QUEUE - fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", - (m_queue.size() >= m_max_size) ? "overrun" : "ok", - len / 4, - m_queue.size() / 4, - m_max_size / 4); + fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", + (m_queue.size() >= m_max_size) ? "overrun" : "ok", + len / 4, + m_queue.size() / 4, + m_max_size / 4); #endif - if (m_queue.size() >= m_max_size) { - m_overruns++; - return 0; - } + if (m_queue.size() < m_max_size) { + for (size_t i = 0; i < len; i++) { + m_queue.push_back(val[i]); + } - for (size_t i = 0; i < len; i++) { - m_queue.push_back(val[i]); + new_size = m_queue.size(); + } + else { + m_overruns++; + new_size = 0; + } } - size_t new_size = m_queue.size(); + m_push_notification.notify_all(); return new_size; } @@ -87,7 +97,68 @@ public: return m_queue.size(); } - /* Get len elements, place them into the buf array + /* Wait until len elements in the queue are available, + * and then fill the buf. If the timeout_ms (expressed in milliseconds + * expires), fill the available number of elements. + * Returns the number + * of elemets written into buf + */ + size_t pop_wait(T* buf, size_t len, int timeout_ms) + { + assert(len % (m_channels * m_bytes_per_sample) == 0); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait %zu\n", len); +#endif + std::unique_lock<std::mutex> lock(m_mutex); + + auto time_start = std::chrono::steady_clock::now(); + const auto timeout = std::chrono::milliseconds(timeout_ms); + +#if 1 + do { + const auto wait_timeout = std::chrono::milliseconds(10); + m_push_notification.wait_for(lock, wait_timeout); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait %zu need %zu\n", + m_queue.size(), len); +#endif + + if (std::chrono::steady_clock::now() - time_start > timeout) { +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait timeout\n"); +#endif + break; + } + } while (m_queue.size() < len); +#else + while (m_queue.size() < len) { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + lock.lock(); + } +#endif + + size_t num_to_copy = (m_queue.size() < len) ? + m_queue.size() : len; + + std::copy( + m_queue.begin(), + m_queue.begin() + num_to_copy, + buf); + + m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy); + + lock.unlock(); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy); +#endif + return num_to_copy; + } + + /* Get up to len elements, place them into the buf array * Returns the number of elements it was able to take * from the queue */ @@ -97,7 +168,7 @@ public: return pop(buf, len, ovr); } - /* Get len elements, place them into the buf array. + /* Get up to len elements, place them into the buf array. * Also update the overrun variable with the information * of how many overruns we saw since the last pop. * Returns the number of elements it was able to take @@ -166,6 +237,7 @@ public: private: std::deque<T> m_queue; mutable std::mutex m_mutex; + std::condition_variable m_push_notification; unsigned int m_channels; unsigned int m_bytes_per_sample; |