summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 10:48:06 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-06-24 10:48:23 +0200
commit31c8fca44d11d44b70d90721c27a77ec5a83faa1 (patch)
treebb57f0996fa866b8ea55aaca01ad198792aa6f25
parent9183cd317bebe96b54a227da13d8e7c70b4ae67f (diff)
downloadODR-AudioEnc-31c8fca44d11d44b70d90721c27a77ec5a83faa1.tar.gz
ODR-AudioEnc-31c8fca44d11d44b70d90721c27a77ec5a83faa1.tar.bz2
ODR-AudioEnc-31c8fca44d11d44b70d90721c27a77ec5a83faa1.zip
Add blocking sample queue and adapt VLC input
-rw-r--r--src/SampleQueue.h104
-rw-r--r--src/VLCInput.cpp22
-rw-r--r--src/VLCInput.h93
-rw-r--r--src/dabplus-enc.cpp55
4 files changed, 141 insertions, 133 deletions
diff --git a/src/SampleQueue.h b/src/SampleQueue.h
index 2df1934..09b67c7 100644
--- a/src/SampleQueue.h
+++ b/src/SampleQueue.h
@@ -12,6 +12,9 @@
#define DEBUG_SAMPLE_QUEUE 0
#include <mutex>
+#include <thread>
+#include <chrono>
+#include <condition_variable>
#include <queue>
#include <cassert>
#include <sstream>
@@ -55,28 +58,35 @@ public:
/* Push a bunch of samples into the buffer */
size_t push(const T *val, size_t len)
{
- std::lock_guard<std::mutex> lock(m_mutex);
+ size_t new_size = 0;
- assert(len % (m_channels * m_bytes_per_sample) == 0);
+ {
+ std::lock_guard<std::mutex> lock(m_mutex);
+
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
#if DEBUG_SAMPLE_QUEUE
- fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
- (m_queue.size() >= m_max_size) ? "overrun" : "ok",
- len / 4,
- m_queue.size() / 4,
- m_max_size / 4);
+ fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
+ (m_queue.size() >= m_max_size) ? "overrun" : "ok",
+ len / 4,
+ m_queue.size() / 4,
+ m_max_size / 4);
#endif
- if (m_queue.size() >= m_max_size) {
- m_overruns++;
- return 0;
- }
+ if (m_queue.size() < m_max_size) {
+ for (size_t i = 0; i < len; i++) {
+ m_queue.push_back(val[i]);
+ }
- for (size_t i = 0; i < len; i++) {
- m_queue.push_back(val[i]);
+ new_size = m_queue.size();
+ }
+ else {
+ m_overruns++;
+ new_size = 0;
+ }
}
- size_t new_size = m_queue.size();
+ m_push_notification.notify_all();
return new_size;
}
@@ -87,7 +97,68 @@ public:
return m_queue.size();
}
- /* Get len elements, place them into the buf array
+ /* Wait until len elements in the queue are available,
+ * and then fill the buf. If the timeout_ms (expressed in milliseconds
+ * expires), fill the available number of elements.
+ * Returns the number
+ * of elemets written into buf
+ */
+ size_t pop_wait(T* buf, size_t len, int timeout_ms)
+ {
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait %zu\n", len);
+#endif
+ std::unique_lock<std::mutex> lock(m_mutex);
+
+ auto time_start = std::chrono::steady_clock::now();
+ const auto timeout = std::chrono::milliseconds(timeout_ms);
+
+#if 1
+ do {
+ const auto wait_timeout = std::chrono::milliseconds(10);
+ m_push_notification.wait_for(lock, wait_timeout);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait %zu need %zu\n",
+ m_queue.size(), len);
+#endif
+
+ if (std::chrono::steady_clock::now() - time_start > timeout) {
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait timeout\n");
+#endif
+ break;
+ }
+ } while (m_queue.size() < len);
+#else
+ while (m_queue.size() < len) {
+ lock.unlock();
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ lock.lock();
+ }
+#endif
+
+ size_t num_to_copy = (m_queue.size() < len) ?
+ m_queue.size() : len;
+
+ std::copy(
+ m_queue.begin(),
+ m_queue.begin() + num_to_copy,
+ buf);
+
+ m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy);
+
+ lock.unlock();
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy);
+#endif
+ return num_to_copy;
+ }
+
+ /* Get up to len elements, place them into the buf array
* Returns the number of elements it was able to take
* from the queue
*/
@@ -97,7 +168,7 @@ public:
return pop(buf, len, ovr);
}
- /* Get len elements, place them into the buf array.
+ /* Get up to len elements, place them into the buf array.
* Also update the overrun variable with the information
* of how many overruns we saw since the last pop.
* Returns the number of elements it was able to take
@@ -166,6 +237,7 @@ public:
private:
std::deque<T> m_queue;
mutable std::mutex m_mutex;
+ std::condition_variable m_push_notification;
unsigned int m_channels;
unsigned int m_bytes_per_sample;
diff --git a/src/VLCInput.cpp b/src/VLCInput.cpp
index fc7e07f..9cf13d8 100644
--- a/src/VLCInput.cpp
+++ b/src/VLCInput.cpp
@@ -1,5 +1,5 @@
/* ------------------------------------------------------------------
- * Copyright (C) 2015 Matthias P. Braendli
+ * Copyright (C) 2016 Matthias P. Braendli
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -327,16 +327,6 @@ ssize_t VLCInput::m_read(uint8_t* buf, size_t length)
return err;
}
-ssize_t VLCInputDirect::read(uint8_t* buf, size_t length)
-{
- int bytes_per_frame = m_channels * BYTES_PER_SAMPLE;
- assert(length % bytes_per_frame == 0);
-
- ssize_t read = m_read(buf, length);
-
- return read;
-}
-
bool write_icy_to_file(const std::string& text, const std::string& filename, bool dl_plus)
{
FILE* fd = fopen(filename.c_str(), "wb");
@@ -389,16 +379,14 @@ void VLCInput::write_icy_text(const std::string& filename, bool dl_plus)
}
-// ==================== VLCInputThreaded ====================
-
-void VLCInputThreaded::start()
+void VLCInput::start()
{
if (m_fault) {
fprintf(stderr, "Cannot start VLC input. Fault detected previsouly!\n");
}
else {
m_running = true;
- m_thread = std::thread(&VLCInputThreaded::process, this);
+ m_thread = std::thread(&VLCInput::process, this);
}
}
@@ -406,7 +394,7 @@ void VLCInputThreaded::start()
// 10 samples @ 32kHz = 3.125ms
#define NUM_BYTES_PER_CALL (10 * BYTES_PER_SAMPLE)
-void VLCInputThreaded::process()
+void VLCInput::process()
{
uint8_t samplebuf[NUM_BYTES_PER_CALL];
while (m_running) {
@@ -418,7 +406,7 @@ void VLCInputThreaded::process()
break;
}
- m_queue.push(samplebuf, n);
+ m_samplequeue.push(samplebuf, n);
}
}
diff --git a/src/VLCInput.h b/src/VLCInput.h
index 3467525..ad23c4d 100644
--- a/src/VLCInput.h
+++ b/src/VLCInput.h
@@ -1,5 +1,5 @@
/* ------------------------------------------------------------------
- * Copyright (C) 2015 Matthias P. Braendli
+ * Copyright (C) 2016 Matthias P. Braendli
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -53,7 +53,8 @@ class VLCInput
unsigned verbosity,
std::string& gain,
std::string& cache,
- std::vector<std::string>& additional_opts) :
+ std::vector<std::string>& additional_opts,
+ SampleQueue<uint8_t>& queue) :
m_uri(uri),
m_verbosity(verbosity),
m_channels(channels),
@@ -62,13 +63,29 @@ class VLCInput
m_additional_opts(additional_opts),
m_gain(gain),
m_vlc(nullptr),
- m_mp(nullptr) { }
+ m_mp(nullptr),
+ m_fault(false),
+ m_running(false),
+ m_samplequeue(queue) {}
+
+ VLCInput(const VLCInput& other) = delete;
+ VLCInput& operator=(const VLCInput& other) = delete;
+ ~VLCInput()
+ {
+ if (m_running) {
+ m_running = false;
+ m_thread.join();
+ }
- ~VLCInput() { cleanup(); }
+ cleanup();
+ }
/* Prepare the audio input */
int prepare();
+ /* Start the libVLC thread that fills the samplequeue */
+ void start();
+
/* Write the last received ICY-Text to the
* file.
*/
@@ -92,7 +109,9 @@ class VLCInput
int getChannels() { return m_channels; }
- protected:
+ bool fault_detected() { return m_fault; };
+
+ private:
void cleanup(void);
// Fill exactly length bytes into buf. Blocking.
@@ -126,73 +145,15 @@ class VLCInput
libvlc_instance_t *m_vlc;
libvlc_media_player_t *m_mp;
- private:
- VLCInput(const VLCInput& other) {}
-};
-
-class VLCInputDirect : public VLCInput
-{
- public:
- VLCInputDirect(const std::string& uri,
- int rate,
- unsigned channels,
- unsigned verbosity,
- std::string& gain,
- std::string& cache,
- std::vector<std::string>& additional_opts) :
- VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts) {}
-
- /* Read exactly length bytes into buf.
- * Blocks if not enough data is available,
- * or returns zero if EOF reached.
- *
- * Returns the number of bytes written into
- * the buffer.
- */
- ssize_t read(uint8_t* buf, size_t length);
-
-};
-
-class VLCInputThreaded : public VLCInput
-{
- public:
- VLCInputThreaded(const std::string& uri,
- int rate,
- unsigned channels,
- unsigned verbosity,
- std::string& gain,
- std::string& cache,
- std::vector<std::string>& additional_opts,
- SampleQueue<uint8_t>& queue) :
- VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts),
- m_fault(false),
- m_running(false),
- m_queue(queue) {}
-
- ~VLCInputThreaded()
- {
- if (m_running) {
- m_running = false;
- m_thread.join();
- }
- }
-
- /* Start the libVLC thread that fills the queue */
- virtual void start();
-
- bool fault_detected() { return m_fault; };
-
- private:
- VLCInputThreaded(const VLCInputThreaded& other) = delete;
- VLCInputThreaded& operator=(const VLCInputThreaded& other) = delete;
+ // For the thread
+ /* The function runnin in the thread */
void process();
std::atomic<bool> m_fault;
std::atomic<bool> m_running;
std::thread m_thread;
- SampleQueue<uint8_t>& m_queue;
-
+ SampleQueue<uint8_t>& m_samplequeue;
};
#endif // HAVE_VLC
diff --git a/src/dabplus-enc.cpp b/src/dabplus-enc.cpp
index 0385930..5b2b403 100644
--- a/src/dabplus-enc.cpp
+++ b/src/dabplus-enc.cpp
@@ -750,8 +750,7 @@ int main(int argc, char *argv[])
JackInput jack_in(jack_name, channels, sample_rate, queue);
#endif
#if HAVE_VLC
- VLCInputDirect vlc_in_direct(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts);
- VLCInputThreaded vlc_in_threaded(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue);
+ VLCInput vlc_input(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue);
#endif
if (infile) {
@@ -770,21 +769,13 @@ int main(int argc, char *argv[])
#endif
#if HAVE_VLC
else if (vlc_uri != "") {
- if (drift_compensation) {
- if (vlc_in_threaded.prepare() != 0) {
- fprintf(stderr, "VLC with drift compensation: preparation failed\n");
- return 1;
- }
-
- fprintf(stderr, "Start VLC thread\n");
- vlc_in_threaded.start();
- }
- else {
- if (vlc_in_direct.prepare() != 0) {
- fprintf(stderr, "VLC preparation failed\n");
- return 1;
- }
+ if (vlc_input.prepare() != 0) {
+ fprintf(stderr, "VLC with drift compensation: preparation failed\n");
+ return 1;
}
+
+ fprintf(stderr, "Start VLC thread\n");
+ vlc_input.start();
}
#endif
#if HAVE_ALSA
@@ -896,17 +887,14 @@ int main(int argc, char *argv[])
}
#if HAVE_VLC
else if (not vlc_uri.empty()) {
- VLCInput* vlc_in = nullptr;
-
- if (drift_compensation) {
- vlc_in = &vlc_in_threaded;
- if (drift_compensation && vlc_in_threaded.fault_detected()) {
- fprintf(stderr, "Detected fault in VLC input!\n");
- retval = 5;
- break;
- }
+ if (drift_compensation && vlc_input.fault_detected()) {
+ fprintf(stderr, "Detected fault in VLC input!\n");
+ retval = 5;
+ break;
+ }
+ if (drift_compensation) {
size_t overruns;
size_t bytes_from_queue = queue.pop(&input_buf[0], input_buf.size(), &overruns); // returns bytes
read_bytes = input_buf.size();
@@ -921,21 +909,20 @@ int main(int argc, char *argv[])
}
}
else {
- vlc_in = &vlc_in_direct;
+ const int timeout_ms = 1000;
+ read_bytes = input_buf.size();
+ size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms); // returns bytes
- read_bytes = vlc_in_direct.read(&input_buf[0], input_buf.size());
- if (read_bytes < 0) {
- fprintf(stderr, "Detected fault in VLC input!\n");
- break;
- }
- else if (read_bytes != input_buf.size()) {
- fprintf(stderr, "Short VLC read !\n");
+ if (bytes_from_queue < read_bytes) {
+ // queue timeout occurred
+ fprintf(stderr, "Detected fault in VLC input! No data in time.\n");
+ retval = 5;
break;
}
}
if (not vlc_icytext_file.empty()) {
- vlc_in->write_icy_text(vlc_icytext_file, vlc_icytext_dlplus);
+ vlc_input.write_icy_text(vlc_icytext_file, vlc_icytext_dlplus);
}
}
#endif