From 31c8fca44d11d44b70d90721c27a77ec5a83faa1 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 24 Jun 2016 10:48:06 +0200 Subject: Add blocking sample queue and adapt VLC input --- src/SampleQueue.h | 104 ++++++++++++++++++++++++++++++++++++++++++++-------- src/VLCInput.cpp | 22 +++-------- src/VLCInput.h | 93 ++++++++++++++-------------------------------- src/dabplus-enc.cpp | 55 +++++++++++---------------- 4 files changed, 141 insertions(+), 133 deletions(-) diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 2df1934..09b67c7 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -12,6 +12,9 @@ #define DEBUG_SAMPLE_QUEUE 0 #include +#include +#include +#include #include #include #include @@ -55,28 +58,35 @@ public: /* Push a bunch of samples into the buffer */ size_t push(const T *val, size_t len) { - std::lock_guard lock(m_mutex); + size_t new_size = 0; - assert(len % (m_channels * m_bytes_per_sample) == 0); + { + std::lock_guard lock(m_mutex); + + assert(len % (m_channels * m_bytes_per_sample) == 0); #if DEBUG_SAMPLE_QUEUE - fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", - (m_queue.size() >= m_max_size) ? "overrun" : "ok", - len / 4, - m_queue.size() / 4, - m_max_size / 4); + fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", + (m_queue.size() >= m_max_size) ? "overrun" : "ok", + len / 4, + m_queue.size() / 4, + m_max_size / 4); #endif - if (m_queue.size() >= m_max_size) { - m_overruns++; - return 0; - } + if (m_queue.size() < m_max_size) { + for (size_t i = 0; i < len; i++) { + m_queue.push_back(val[i]); + } - for (size_t i = 0; i < len; i++) { - m_queue.push_back(val[i]); + new_size = m_queue.size(); + } + else { + m_overruns++; + new_size = 0; + } } - size_t new_size = m_queue.size(); + m_push_notification.notify_all(); return new_size; } @@ -87,7 +97,68 @@ public: return m_queue.size(); } - /* Get len elements, place them into the buf array + /* Wait until len elements in the queue are available, + * and then fill the buf. If the timeout_ms (expressed in milliseconds + * expires), fill the available number of elements. + * Returns the number + * of elemets written into buf + */ + size_t pop_wait(T* buf, size_t len, int timeout_ms) + { + assert(len % (m_channels * m_bytes_per_sample) == 0); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait %zu\n", len); +#endif + std::unique_lock lock(m_mutex); + + auto time_start = std::chrono::steady_clock::now(); + const auto timeout = std::chrono::milliseconds(timeout_ms); + +#if 1 + do { + const auto wait_timeout = std::chrono::milliseconds(10); + m_push_notification.wait_for(lock, wait_timeout); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait %zu need %zu\n", + m_queue.size(), len); +#endif + + if (std::chrono::steady_clock::now() - time_start > timeout) { +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait timeout\n"); +#endif + break; + } + } while (m_queue.size() < len); +#else + while (m_queue.size() < len) { + lock.unlock(); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + lock.lock(); + } +#endif + + size_t num_to_copy = (m_queue.size() < len) ? + m_queue.size() : len; + + std::copy( + m_queue.begin(), + m_queue.begin() + num_to_copy, + buf); + + m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy); + + lock.unlock(); + +#if DEBUG_SAMPLE_QUEUE + fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy); +#endif + return num_to_copy; + } + + /* Get up to len elements, place them into the buf array * Returns the number of elements it was able to take * from the queue */ @@ -97,7 +168,7 @@ public: return pop(buf, len, ovr); } - /* Get len elements, place them into the buf array. + /* Get up to len elements, place them into the buf array. * Also update the overrun variable with the information * of how many overruns we saw since the last pop. * Returns the number of elements it was able to take @@ -166,6 +237,7 @@ public: private: std::deque m_queue; mutable std::mutex m_mutex; + std::condition_variable m_push_notification; unsigned int m_channels; unsigned int m_bytes_per_sample; diff --git a/src/VLCInput.cpp b/src/VLCInput.cpp index fc7e07f..9cf13d8 100644 --- a/src/VLCInput.cpp +++ b/src/VLCInput.cpp @@ -1,5 +1,5 @@ /* ------------------------------------------------------------------ - * Copyright (C) 2015 Matthias P. Braendli + * Copyright (C) 2016 Matthias P. Braendli * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -327,16 +327,6 @@ ssize_t VLCInput::m_read(uint8_t* buf, size_t length) return err; } -ssize_t VLCInputDirect::read(uint8_t* buf, size_t length) -{ - int bytes_per_frame = m_channels * BYTES_PER_SAMPLE; - assert(length % bytes_per_frame == 0); - - ssize_t read = m_read(buf, length); - - return read; -} - bool write_icy_to_file(const std::string& text, const std::string& filename, bool dl_plus) { FILE* fd = fopen(filename.c_str(), "wb"); @@ -389,16 +379,14 @@ void VLCInput::write_icy_text(const std::string& filename, bool dl_plus) } -// ==================== VLCInputThreaded ==================== - -void VLCInputThreaded::start() +void VLCInput::start() { if (m_fault) { fprintf(stderr, "Cannot start VLC input. Fault detected previsouly!\n"); } else { m_running = true; - m_thread = std::thread(&VLCInputThreaded::process, this); + m_thread = std::thread(&VLCInput::process, this); } } @@ -406,7 +394,7 @@ void VLCInputThreaded::start() // 10 samples @ 32kHz = 3.125ms #define NUM_BYTES_PER_CALL (10 * BYTES_PER_SAMPLE) -void VLCInputThreaded::process() +void VLCInput::process() { uint8_t samplebuf[NUM_BYTES_PER_CALL]; while (m_running) { @@ -418,7 +406,7 @@ void VLCInputThreaded::process() break; } - m_queue.push(samplebuf, n); + m_samplequeue.push(samplebuf, n); } } diff --git a/src/VLCInput.h b/src/VLCInput.h index 3467525..ad23c4d 100644 --- a/src/VLCInput.h +++ b/src/VLCInput.h @@ -1,5 +1,5 @@ /* ------------------------------------------------------------------ - * Copyright (C) 2015 Matthias P. Braendli + * Copyright (C) 2016 Matthias P. Braendli * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,7 +53,8 @@ class VLCInput unsigned verbosity, std::string& gain, std::string& cache, - std::vector& additional_opts) : + std::vector& additional_opts, + SampleQueue& queue) : m_uri(uri), m_verbosity(verbosity), m_channels(channels), @@ -62,13 +63,29 @@ class VLCInput m_additional_opts(additional_opts), m_gain(gain), m_vlc(nullptr), - m_mp(nullptr) { } + m_mp(nullptr), + m_fault(false), + m_running(false), + m_samplequeue(queue) {} + + VLCInput(const VLCInput& other) = delete; + VLCInput& operator=(const VLCInput& other) = delete; + ~VLCInput() + { + if (m_running) { + m_running = false; + m_thread.join(); + } - ~VLCInput() { cleanup(); } + cleanup(); + } /* Prepare the audio input */ int prepare(); + /* Start the libVLC thread that fills the samplequeue */ + void start(); + /* Write the last received ICY-Text to the * file. */ @@ -92,7 +109,9 @@ class VLCInput int getChannels() { return m_channels; } - protected: + bool fault_detected() { return m_fault; }; + + private: void cleanup(void); // Fill exactly length bytes into buf. Blocking. @@ -126,73 +145,15 @@ class VLCInput libvlc_instance_t *m_vlc; libvlc_media_player_t *m_mp; - private: - VLCInput(const VLCInput& other) {} -}; - -class VLCInputDirect : public VLCInput -{ - public: - VLCInputDirect(const std::string& uri, - int rate, - unsigned channels, - unsigned verbosity, - std::string& gain, - std::string& cache, - std::vector& additional_opts) : - VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts) {} - - /* Read exactly length bytes into buf. - * Blocks if not enough data is available, - * or returns zero if EOF reached. - * - * Returns the number of bytes written into - * the buffer. - */ - ssize_t read(uint8_t* buf, size_t length); - -}; - -class VLCInputThreaded : public VLCInput -{ - public: - VLCInputThreaded(const std::string& uri, - int rate, - unsigned channels, - unsigned verbosity, - std::string& gain, - std::string& cache, - std::vector& additional_opts, - SampleQueue& queue) : - VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts), - m_fault(false), - m_running(false), - m_queue(queue) {} - - ~VLCInputThreaded() - { - if (m_running) { - m_running = false; - m_thread.join(); - } - } - - /* Start the libVLC thread that fills the queue */ - virtual void start(); - - bool fault_detected() { return m_fault; }; - - private: - VLCInputThreaded(const VLCInputThreaded& other) = delete; - VLCInputThreaded& operator=(const VLCInputThreaded& other) = delete; + // For the thread + /* The function runnin in the thread */ void process(); std::atomic m_fault; std::atomic m_running; std::thread m_thread; - SampleQueue& m_queue; - + SampleQueue& m_samplequeue; }; #endif // HAVE_VLC diff --git a/src/dabplus-enc.cpp b/src/dabplus-enc.cpp index 0385930..5b2b403 100644 --- a/src/dabplus-enc.cpp +++ b/src/dabplus-enc.cpp @@ -750,8 +750,7 @@ int main(int argc, char *argv[]) JackInput jack_in(jack_name, channels, sample_rate, queue); #endif #if HAVE_VLC - VLCInputDirect vlc_in_direct(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts); - VLCInputThreaded vlc_in_threaded(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue); + VLCInput vlc_input(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue); #endif if (infile) { @@ -770,21 +769,13 @@ int main(int argc, char *argv[]) #endif #if HAVE_VLC else if (vlc_uri != "") { - if (drift_compensation) { - if (vlc_in_threaded.prepare() != 0) { - fprintf(stderr, "VLC with drift compensation: preparation failed\n"); - return 1; - } - - fprintf(stderr, "Start VLC thread\n"); - vlc_in_threaded.start(); - } - else { - if (vlc_in_direct.prepare() != 0) { - fprintf(stderr, "VLC preparation failed\n"); - return 1; - } + if (vlc_input.prepare() != 0) { + fprintf(stderr, "VLC with drift compensation: preparation failed\n"); + return 1; } + + fprintf(stderr, "Start VLC thread\n"); + vlc_input.start(); } #endif #if HAVE_ALSA @@ -896,17 +887,14 @@ int main(int argc, char *argv[]) } #if HAVE_VLC else if (not vlc_uri.empty()) { - VLCInput* vlc_in = nullptr; - - if (drift_compensation) { - vlc_in = &vlc_in_threaded; - if (drift_compensation && vlc_in_threaded.fault_detected()) { - fprintf(stderr, "Detected fault in VLC input!\n"); - retval = 5; - break; - } + if (drift_compensation && vlc_input.fault_detected()) { + fprintf(stderr, "Detected fault in VLC input!\n"); + retval = 5; + break; + } + if (drift_compensation) { size_t overruns; size_t bytes_from_queue = queue.pop(&input_buf[0], input_buf.size(), &overruns); // returns bytes read_bytes = input_buf.size(); @@ -921,21 +909,20 @@ int main(int argc, char *argv[]) } } else { - vlc_in = &vlc_in_direct; + const int timeout_ms = 1000; + read_bytes = input_buf.size(); + size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms); // returns bytes - read_bytes = vlc_in_direct.read(&input_buf[0], input_buf.size()); - if (read_bytes < 0) { - fprintf(stderr, "Detected fault in VLC input!\n"); - break; - } - else if (read_bytes != input_buf.size()) { - fprintf(stderr, "Short VLC read !\n"); + if (bytes_from_queue < read_bytes) { + // queue timeout occurred + fprintf(stderr, "Detected fault in VLC input! No data in time.\n"); + retval = 5; break; } } if (not vlc_icytext_file.empty()) { - vlc_in->write_icy_text(vlc_icytext_file, vlc_icytext_dlplus); + vlc_input.write_icy_text(vlc_icytext_file, vlc_icytext_dlplus); } } #endif -- cgit v1.2.3