From 64dc582e502585aac65284756a6fd84c09ec48df Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Thu, 7 Jun 2018 08:46:39 +0200 Subject: Make queue push behaviour depend on drift compensation push() now blocks without drift compensation --- src/SampleQueue.h | 52 +++++++++++++++++++++++++++++++++------------------- src/odr-audioenc.cpp | 5 ++--- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 646f3dd..aeeb8d4 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -1,5 +1,5 @@ /* - * Copyright (C) 2016 Matthias P. Braendli + * Copyright (C) 2018 Matthias P. Braendli * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,7 +48,7 @@ * bytes_per_sample * number_of_channels * * The queue has a maximum size. If this size is reached, push() - * ignores new data. + * either blocks or ignores new data, depending on drift_compensation. * * If pop() is called but there is not enough data in the queue, * the missing samples are replaced by zeros. pop() will always @@ -65,10 +65,12 @@ class SampleQueue public: SampleQueue(unsigned int bytes_per_sample, unsigned int channels, - size_t max_size) : + size_t max_size, + bool drift_compensation) : m_bytes_per_sample(bytes_per_sample), m_channels(channels), m_max_size(max_size), + m_push_block(not drift_compensation), m_overruns(0) {} @@ -81,7 +83,7 @@ public: size_t new_size = 0; { - std::lock_guard lock(m_mutex); + std::unique_lock lock(m_mutex); assert(len % (m_channels * m_bytes_per_sample) == 0); @@ -93,17 +95,32 @@ public: m_max_size / 4); #endif - if (m_queue.size() < m_max_size) { - for (size_t i = 0; i < len; i++) { - m_queue.push_back(val[i]); + if (m_push_block) { + while (len) { + const size_t available = m_max_size - m_queue.size(); + const size_t copy_len = std::min(available, len); + + if (copy_len > 0) { + std::copy(val, val + copy_len, std::back_inserter(m_queue)); + len -= copy_len; + val += copy_len; + } + else { + const auto wait_timeout = std::chrono::milliseconds(100); + m_pop_notification.wait_for(lock, wait_timeout); + } } - - new_size = m_queue.size(); } else { - m_overruns++; - new_size = 0; + if (m_queue.size() < m_max_size) { + std::copy(val, val + len, std::back_inserter(m_queue)); + } + else { + m_overruns++; + } } + + new_size = m_queue.size(); } m_push_notification.notify_all(); @@ -142,7 +159,6 @@ public: auto time_start = std::chrono::steady_clock::now(); const auto timeout = std::chrono::milliseconds(timeout_ms); -#if 1 do { const auto wait_timeout = std::chrono::milliseconds(10); m_push_notification.wait_for(lock, wait_timeout); @@ -159,13 +175,6 @@ public: break; } } while (m_queue.size() < len); -#else - while (m_queue.size() < len) { - lock.unlock(); - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - lock.lock(); - } -#endif size_t num_to_copy = (m_queue.size() < len) ? m_queue.size() : len; @@ -182,6 +191,8 @@ public: #if DEBUG_SAMPLE_QUEUE fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy); #endif + + m_pop_notification.notify_all(); return num_to_copy; } @@ -260,6 +271,7 @@ public: #endif } + m_pop_notification.notify_all(); return ret; } @@ -267,10 +279,12 @@ private: std::deque m_queue; mutable std::mutex m_mutex; std::condition_variable m_push_notification; + std::condition_variable m_pop_notification; unsigned int m_channels; unsigned int m_bytes_per_sample; size_t m_max_size; + bool m_push_block; /*! Counter to keep track of number of overruns between calls * to pop() diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp index b5f7b9f..3b42dfd 100644 --- a/src/odr-audioenc.cpp +++ b/src/odr-audioenc.cpp @@ -931,7 +931,7 @@ int main(int argc, char *argv[]) /*! The SampleQueue \c queue is given to the inputs, so that they * can fill it. */ - SampleQueue queue(BYTES_PER_SAMPLE, settings.channels, max_size); + SampleQueue queue(BYTES_PER_SAMPLE, settings.channels, max_size, settings.drift_compensation); /* symsize=8, gfpoly=0x11d, fcr=0, prim=1, nroots=10, pad=135 */ rs_handler = init_rs_char(8, 0x11d, 0, 1, 10, 135); @@ -1106,8 +1106,7 @@ int main(int argc, char *argv[]) size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms, &overruns); // returns bytes if (overruns) { - fprintf(stderr, "%zd overruns occured!\n", overruns); - status |= STATUS_OVERRUN; + throw logic_error("Queue overrun in non-drift compensation!"); } if (bytes_from_queue < read_bytes) { -- cgit v1.2.3