aboutsummaryrefslogtreecommitdiffstats
path: root/src/SampleQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/SampleQueue.h')
-rw-r--r--src/SampleQueue.h104
1 files changed, 88 insertions, 16 deletions
diff --git a/src/SampleQueue.h b/src/SampleQueue.h
index 2df1934..09b67c7 100644
--- a/src/SampleQueue.h
+++ b/src/SampleQueue.h
@@ -12,6 +12,9 @@
#define DEBUG_SAMPLE_QUEUE 0
#include <mutex>
+#include <thread>
+#include <chrono>
+#include <condition_variable>
#include <queue>
#include <cassert>
#include <sstream>
@@ -55,28 +58,35 @@ public:
/* Push a bunch of samples into the buffer */
size_t push(const T *val, size_t len)
{
- std::lock_guard<std::mutex> lock(m_mutex);
+ size_t new_size = 0;
- assert(len % (m_channels * m_bytes_per_sample) == 0);
+ {
+ std::lock_guard<std::mutex> lock(m_mutex);
+
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
#if DEBUG_SAMPLE_QUEUE
- fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
- (m_queue.size() >= m_max_size) ? "overrun" : "ok",
- len / 4,
- m_queue.size() / 4,
- m_max_size / 4);
+ fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
+ (m_queue.size() >= m_max_size) ? "overrun" : "ok",
+ len / 4,
+ m_queue.size() / 4,
+ m_max_size / 4);
#endif
- if (m_queue.size() >= m_max_size) {
- m_overruns++;
- return 0;
- }
+ if (m_queue.size() < m_max_size) {
+ for (size_t i = 0; i < len; i++) {
+ m_queue.push_back(val[i]);
+ }
- for (size_t i = 0; i < len; i++) {
- m_queue.push_back(val[i]);
+ new_size = m_queue.size();
+ }
+ else {
+ m_overruns++;
+ new_size = 0;
+ }
}
- size_t new_size = m_queue.size();
+ m_push_notification.notify_all();
return new_size;
}
@@ -87,7 +97,68 @@ public:
return m_queue.size();
}
- /* Get len elements, place them into the buf array
+ /* Wait until len elements in the queue are available,
+ * and then fill the buf. If the timeout_ms (expressed in milliseconds
+ * expires), fill the available number of elements.
+ * Returns the number
+ * of elemets written into buf
+ */
+ size_t pop_wait(T* buf, size_t len, int timeout_ms)
+ {
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait %zu\n", len);
+#endif
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ auto time_start = std::chrono::steady_clock::now();
+ const auto timeout = std::chrono::milliseconds(timeout_ms);
+
+#if 1
+ do {
+ const auto wait_timeout = std::chrono::milliseconds(10);
+ m_push_notification.wait_for(lock, wait_timeout);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait %zu need %zu\n",
+ m_queue.size(), len);
+#endif
+
+ if (std::chrono::steady_clock::now() - time_start > timeout) {
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait timeout\n");
+#endif
+ break;
+ }
+ } while (m_queue.size() < len);
+#else
+ while (m_queue.size() < len) {
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ lock.lock();
+ }
+#endif
+
+ size_t num_to_copy = (m_queue.size() < len) ?
+ m_queue.size() : len;
+
+ std::copy(
+ m_queue.begin(),
+ m_queue.begin() + num_to_copy,
+ buf);
+
+ m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy);
+
+ lock.unlock();
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy);
+#endif
+ return num_to_copy;
+ }
+
+ /* Get up to len elements, place them into the buf array
* Returns the number of elements it was able to take
* from the queue
*/
@@ -97,7 +168,7 @@ public:
return pop(buf, len, ovr);
}
- /* Get len elements, place them into the buf array.
+ /* Get up to len elements, place them into the buf array.
* Also update the overrun variable with the information
* of how many overruns we saw since the last pop.
* Returns the number of elements it was able to take
@@ -166,6 +237,7 @@ public:
private:
std::deque<T> m_queue;
mutable std::mutex m_mutex;
+ std::condition_variable m_push_notification;
unsigned int m_channels;
unsigned int m_bytes_per_sample;