summaryrefslogtreecommitdiffstats
path: root/src/SampleQueue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/SampleQueue.h')
-rw-r--r--src/SampleQueue.h52
1 files changed, 33 insertions, 19 deletions
diff --git a/src/SampleQueue.h b/src/SampleQueue.h
index 646f3dd..aeeb8d4 100644
--- a/src/SampleQueue.h
+++ b/src/SampleQueue.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2016 Matthias P. Braendli
+ * Copyright (C) 2018 Matthias P. Braendli
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@
* bytes_per_sample * number_of_channels
*
* The queue has a maximum size. If this size is reached, push()
- * ignores new data.
+ * either blocks or ignores new data, depending on drift_compensation.
*
* If pop() is called but there is not enough data in the queue,
* the missing samples are replaced by zeros. pop() will always
@@ -65,10 +65,12 @@ class SampleQueue
public:
SampleQueue(unsigned int bytes_per_sample,
unsigned int channels,
- size_t max_size) :
+ size_t max_size,
+ bool drift_compensation) :
m_bytes_per_sample(bytes_per_sample),
m_channels(channels),
m_max_size(max_size),
+ m_push_block(not drift_compensation),
m_overruns(0) {}
@@ -81,7 +83,7 @@ public:
size_t new_size = 0;
{
- std::lock_guard<std::mutex> lock(m_mutex);
+ std::unique_lock<std::mutex> lock(m_mutex);
assert(len % (m_channels * m_bytes_per_sample) == 0);
@@ -93,17 +95,32 @@ public:
m_max_size / 4);
#endif
- if (m_queue.size() < m_max_size) {
- for (size_t i = 0; i < len; i++) {
- m_queue.push_back(val[i]);
+ if (m_push_block) {
+ while (len) {
+ const size_t available = m_max_size - m_queue.size();
+ const size_t copy_len = std::min(available, len);
+
+ if (copy_len > 0) {
+ std::copy(val, val + copy_len, std::back_inserter(m_queue));
+ len -= copy_len;
+ val += copy_len;
+ }
+ else {
+ const auto wait_timeout = std::chrono::milliseconds(100);
+ m_pop_notification.wait_for(lock, wait_timeout);
+ }
}
-
- new_size = m_queue.size();
}
else {
- m_overruns++;
- new_size = 0;
+ if (m_queue.size() < m_max_size) {
+ std::copy(val, val + len, std::back_inserter(m_queue));
+ }
+ else {
+ m_overruns++;
+ }
}
+
+ new_size = m_queue.size();
}
m_push_notification.notify_all();
@@ -142,7 +159,6 @@ public:
auto time_start = std::chrono::steady_clock::now();
const auto timeout = std::chrono::milliseconds(timeout_ms);
-#if 1
do {
const auto wait_timeout = std::chrono::milliseconds(10);
m_push_notification.wait_for(lock, wait_timeout);
@@ -159,13 +175,6 @@ public:
break;
}
} while (m_queue.size() < len);
-#else
- while (m_queue.size() < len) {
- lock.unlock();
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- lock.lock();
- }
-#endif
size_t num_to_copy = (m_queue.size() < len) ?
m_queue.size() : len;
@@ -182,6 +191,8 @@ public:
#if DEBUG_SAMPLE_QUEUE
fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy);
#endif
+
+ m_pop_notification.notify_all();
return num_to_copy;
}
@@ -260,6 +271,7 @@ public:
#endif
}
+ m_pop_notification.notify_all();
return ret;
}
@@ -267,10 +279,12 @@ private:
std::deque<T> m_queue;
mutable std::mutex m_mutex;
std::condition_variable m_push_notification;
+ std::condition_variable m_pop_notification;
unsigned int m_channels;
unsigned int m_bytes_per_sample;
size_t m_max_size;
+ bool m_push_block;
/*! Counter to keep track of number of overruns between calls
* to pop()