diff options
| -rw-r--r-- | src/SampleQueue.h | 104 | ||||
| -rw-r--r-- | src/VLCInput.cpp | 22 | ||||
| -rw-r--r-- | src/VLCInput.h | 93 | ||||
| -rw-r--r-- | src/dabplus-enc.cpp | 55 | 
4 files changed, 141 insertions, 133 deletions
| diff --git a/src/SampleQueue.h b/src/SampleQueue.h index 2df1934..09b67c7 100644 --- a/src/SampleQueue.h +++ b/src/SampleQueue.h @@ -12,6 +12,9 @@  #define DEBUG_SAMPLE_QUEUE 0  #include <mutex> +#include <thread> +#include <chrono> +#include <condition_variable>  #include <queue>  #include <cassert>  #include <sstream> @@ -55,28 +58,35 @@ public:      /* Push a bunch of samples into the buffer */      size_t push(const T *val, size_t len)      { -        std::lock_guard<std::mutex> lock(m_mutex); +        size_t new_size = 0; -        assert(len % (m_channels * m_bytes_per_sample) == 0); +        { +            std::lock_guard<std::mutex> lock(m_mutex); + +            assert(len % (m_channels * m_bytes_per_sample) == 0);  #if DEBUG_SAMPLE_QUEUE -        fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", -                (m_queue.size() >= m_max_size) ? "overrun" : "ok", -                len / 4, -                m_queue.size() / 4, -                m_max_size / 4); +            fprintf(stdout, "######## push %s %zu, %zu >= %zu\n", +                    (m_queue.size() >= m_max_size) ? "overrun" : "ok", +                    len / 4, +                    m_queue.size() / 4, +                    m_max_size / 4);  #endif -        if (m_queue.size() >= m_max_size) { -            m_overruns++; -            return 0; -        } +            if (m_queue.size() < m_max_size) { +                for (size_t i = 0; i < len; i++) { +                    m_queue.push_back(val[i]); +                } -        for (size_t i = 0; i < len; i++) { -            m_queue.push_back(val[i]); +                new_size = m_queue.size(); +            } +            else { +                m_overruns++; +                new_size = 0; +            }          } -        size_t new_size = m_queue.size(); +        m_push_notification.notify_all();          return new_size;      } @@ -87,7 +97,68 @@ public:          return m_queue.size();      } -    /* Get len elements, place them into the buf array +    /* Wait until len elements in the queue are available, +     * and then fill the buf. If the timeout_ms (expressed in milliseconds +     * expires), fill the available number of elements. +     * Returns the number +     * of elemets written into buf +     */ +    size_t pop_wait(T* buf, size_t len, int timeout_ms) +    { +        assert(len % (m_channels * m_bytes_per_sample) == 0); + +#if DEBUG_SAMPLE_QUEUE +        fprintf(stdout, "######## pop_wait %zu\n", len); +#endif +        std::unique_lock<std::mutex> lock(m_mutex); + +        auto time_start = std::chrono::steady_clock::now(); +        const auto timeout = std::chrono::milliseconds(timeout_ms); + +#if 1 +        do { +            const auto wait_timeout = std::chrono::milliseconds(10); +            m_push_notification.wait_for(lock, wait_timeout); + +#if DEBUG_SAMPLE_QUEUE +                fprintf(stdout, "######## pop_wait %zu need %zu\n", +                        m_queue.size(), len); +#endif + +            if (std::chrono::steady_clock::now() - time_start > timeout) { +#if DEBUG_SAMPLE_QUEUE +                fprintf(stdout, "######## pop_wait timeout\n"); +#endif +                break; +            } +        } while (m_queue.size() < len); +#else +        while (m_queue.size() < len) { +            lock.unlock(); +            std::this_thread::sleep_for(std::chrono::milliseconds(1)); +            lock.lock(); +        } +#endif + +        size_t num_to_copy = (m_queue.size() < len) ? +            m_queue.size() : len; + +        std::copy( +                m_queue.begin(), +                m_queue.begin() + num_to_copy, +                buf); + +        m_queue.erase(m_queue.begin(), m_queue.begin() + num_to_copy); + +        lock.unlock(); + +#if DEBUG_SAMPLE_QUEUE +        fprintf(stdout, "######## pop_wait returns %zu\n", num_to_copy); +#endif +        return num_to_copy; +    } + +    /* Get up to len elements, place them into the buf array       * Returns the number of elements it was able to take       * from the queue       */ @@ -97,7 +168,7 @@ public:          return pop(buf, len, ovr);      } -    /* Get len elements, place them into the buf array. +    /* Get up to len elements, place them into the buf array.       * Also update the overrun variable with the information       * of how many overruns we saw since the last pop.       * Returns the number of elements it was able to take @@ -166,6 +237,7 @@ public:  private:      std::deque<T> m_queue;      mutable std::mutex m_mutex; +    std::condition_variable m_push_notification;      unsigned int m_channels;      unsigned int m_bytes_per_sample; diff --git a/src/VLCInput.cpp b/src/VLCInput.cpp index fc7e07f..9cf13d8 100644 --- a/src/VLCInput.cpp +++ b/src/VLCInput.cpp @@ -1,5 +1,5 @@  /* ------------------------------------------------------------------ - * Copyright (C) 2015 Matthias P. Braendli + * Copyright (C) 2016 Matthias P. Braendli   *   * Licensed under the Apache License, Version 2.0 (the "License");   * you may not use this file except in compliance with the License. @@ -327,16 +327,6 @@ ssize_t VLCInput::m_read(uint8_t* buf, size_t length)      return err;  } -ssize_t VLCInputDirect::read(uint8_t* buf, size_t length) -{ -    int bytes_per_frame = m_channels * BYTES_PER_SAMPLE; -    assert(length % bytes_per_frame == 0); - -    ssize_t read = m_read(buf, length); - -    return read; -} -  bool write_icy_to_file(const std::string& text, const std::string& filename, bool dl_plus)  {      FILE* fd = fopen(filename.c_str(), "wb"); @@ -389,16 +379,14 @@ void VLCInput::write_icy_text(const std::string& filename, bool dl_plus)  } -// ==================== VLCInputThreaded ==================== - -void VLCInputThreaded::start() +void VLCInput::start()  {      if (m_fault) {          fprintf(stderr, "Cannot start VLC input. Fault detected previsouly!\n");      }      else {          m_running = true; -        m_thread = std::thread(&VLCInputThreaded::process, this); +        m_thread = std::thread(&VLCInput::process, this);      }  } @@ -406,7 +394,7 @@ void VLCInputThreaded::start()  // 10 samples @ 32kHz = 3.125ms  #define NUM_BYTES_PER_CALL (10 * BYTES_PER_SAMPLE) -void VLCInputThreaded::process() +void VLCInput::process()  {      uint8_t samplebuf[NUM_BYTES_PER_CALL];      while (m_running) { @@ -418,7 +406,7 @@ void VLCInputThreaded::process()              break;          } -        m_queue.push(samplebuf, n); +        m_samplequeue.push(samplebuf, n);      }  } diff --git a/src/VLCInput.h b/src/VLCInput.h index 3467525..ad23c4d 100644 --- a/src/VLCInput.h +++ b/src/VLCInput.h @@ -1,5 +1,5 @@  /* ------------------------------------------------------------------ - * Copyright (C) 2015 Matthias P. Braendli + * Copyright (C) 2016 Matthias P. Braendli   *   * Licensed under the Apache License, Version 2.0 (the "License");   * you may not use this file except in compliance with the License. @@ -53,7 +53,8 @@ class VLCInput                   unsigned verbosity,                   std::string& gain,                   std::string& cache, -                 std::vector<std::string>& additional_opts) : +                 std::vector<std::string>& additional_opts, +                 SampleQueue<uint8_t>& queue) :              m_uri(uri),              m_verbosity(verbosity),              m_channels(channels), @@ -62,13 +63,29 @@ class VLCInput              m_additional_opts(additional_opts),              m_gain(gain),              m_vlc(nullptr), -            m_mp(nullptr) { } +            m_mp(nullptr), +            m_fault(false), +            m_running(false), +            m_samplequeue(queue) {} + +        VLCInput(const VLCInput& other) = delete; +        VLCInput& operator=(const VLCInput& other) = delete; +        ~VLCInput() +        { +            if (m_running) { +                m_running = false; +                m_thread.join(); +            } -        ~VLCInput() { cleanup(); } +            cleanup(); +        }          /* Prepare the audio input */          int prepare(); +        /* Start the libVLC thread that fills the samplequeue */ +        void start(); +          /* Write the last received ICY-Text to the           * file.           */ @@ -92,7 +109,9 @@ class VLCInput          int getChannels() { return m_channels; } -    protected: +        bool fault_detected() { return m_fault; }; + +    private:          void cleanup(void);          // Fill exactly length bytes into buf. Blocking. @@ -126,73 +145,15 @@ class VLCInput          libvlc_instance_t     *m_vlc;          libvlc_media_player_t *m_mp; -    private: -        VLCInput(const VLCInput& other) {} -}; - -class VLCInputDirect : public VLCInput -{ -    public: -        VLCInputDirect(const std::string& uri, -                       int rate, -                       unsigned channels, -                       unsigned verbosity, -                       std::string& gain, -                       std::string& cache, -                       std::vector<std::string>& additional_opts) : -            VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts) {} - -        /* Read exactly length bytes into buf. -         * Blocks if not enough data is available, -         * or returns zero if EOF reached. -         * -         * Returns the number of bytes written into -         * the buffer. -         */ -        ssize_t read(uint8_t* buf, size_t length); - -}; - -class VLCInputThreaded : public VLCInput -{ -    public: -        VLCInputThreaded(const std::string& uri, -                         int rate, -                         unsigned channels, -                         unsigned verbosity, -                         std::string& gain, -                         std::string& cache, -                         std::vector<std::string>& additional_opts, -                         SampleQueue<uint8_t>& queue) : -            VLCInput(uri, rate, channels, verbosity, gain, cache, additional_opts), -            m_fault(false), -            m_running(false), -            m_queue(queue) {} - -        ~VLCInputThreaded() -        { -            if (m_running) { -                m_running = false; -                m_thread.join(); -            } -        } - -        /* Start the libVLC thread that fills the queue */ -        virtual void start(); - -        bool fault_detected() { return m_fault; }; - -    private: -        VLCInputThreaded(const VLCInputThreaded& other) = delete; -        VLCInputThreaded& operator=(const VLCInputThreaded& other) = delete; +        // For the thread +        /* The function runnin in the thread */          void process();          std::atomic<bool> m_fault;          std::atomic<bool> m_running;          std::thread m_thread; -        SampleQueue<uint8_t>& m_queue; - +        SampleQueue<uint8_t>& m_samplequeue;  };  #endif // HAVE_VLC diff --git a/src/dabplus-enc.cpp b/src/dabplus-enc.cpp index 0385930..5b2b403 100644 --- a/src/dabplus-enc.cpp +++ b/src/dabplus-enc.cpp @@ -750,8 +750,7 @@ int main(int argc, char *argv[])      JackInput         jack_in(jack_name, channels, sample_rate, queue);  #endif  #if HAVE_VLC -    VLCInputDirect    vlc_in_direct(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts); -    VLCInputThreaded  vlc_in_threaded(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue); +    VLCInput          vlc_input(vlc_uri, sample_rate, channels, verbosity, vlc_gain, vlc_cache, vlc_additional_opts, queue);  #endif      if (infile) { @@ -770,21 +769,13 @@ int main(int argc, char *argv[])  #endif  #if HAVE_VLC      else if (vlc_uri != "") { -        if (drift_compensation) { -            if (vlc_in_threaded.prepare() != 0) { -                fprintf(stderr, "VLC with drift compensation: preparation failed\n"); -                return 1; -            } - -            fprintf(stderr, "Start VLC thread\n"); -            vlc_in_threaded.start(); -        } -        else { -            if (vlc_in_direct.prepare() != 0) { -                fprintf(stderr, "VLC preparation failed\n"); -                return 1; -            } +        if (vlc_input.prepare() != 0) { +            fprintf(stderr, "VLC with drift compensation: preparation failed\n"); +            return 1;          } + +        fprintf(stderr, "Start VLC thread\n"); +        vlc_input.start();      }  #endif  #if HAVE_ALSA @@ -896,17 +887,14 @@ int main(int argc, char *argv[])          }  #if HAVE_VLC          else if (not vlc_uri.empty()) { -            VLCInput* vlc_in = nullptr; - -            if (drift_compensation) { -                vlc_in = &vlc_in_threaded; -                if (drift_compensation && vlc_in_threaded.fault_detected()) { -                    fprintf(stderr, "Detected fault in VLC input!\n"); -                    retval = 5; -                    break; -                } +            if (drift_compensation && vlc_input.fault_detected()) { +                fprintf(stderr, "Detected fault in VLC input!\n"); +                retval = 5; +                break; +            } +            if (drift_compensation) {                  size_t overruns;                  size_t bytes_from_queue = queue.pop(&input_buf[0], input_buf.size(), &overruns); // returns bytes                  read_bytes = input_buf.size(); @@ -921,21 +909,20 @@ int main(int argc, char *argv[])                  }              }              else { -                vlc_in = &vlc_in_direct; +                const int timeout_ms = 1000; +                read_bytes = input_buf.size(); +                size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms); // returns bytes -                read_bytes = vlc_in_direct.read(&input_buf[0], input_buf.size()); -                if (read_bytes < 0) { -                    fprintf(stderr, "Detected fault in VLC input!\n"); -                    break; -                } -                else if (read_bytes != input_buf.size()) { -                    fprintf(stderr, "Short VLC read !\n"); +                if (bytes_from_queue < read_bytes) { +                    // queue timeout occurred +                    fprintf(stderr, "Detected fault in VLC input! No data in time.\n"); +                    retval = 5;                      break;                  }              }              if (not vlc_icytext_file.empty()) { -                vlc_in->write_icy_text(vlc_icytext_file, vlc_icytext_dlplus); +                vlc_input.write_icy_text(vlc_icytext_file, vlc_icytext_dlplus);              }          }  #endif | 
