From 84febca8b268129cdd79ff0d1c4f8eeed092c5fb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 7 Oct 2017 11:47:05 +0200 Subject: Use queue for all inputs and unify interface This also changes the --fifo-silence option. Instead of inserting silence separately, it uses the drift compensation to do that. --- src/AlsaInput.cpp | 21 +++++++-- src/AlsaInput.h | 24 ++++++---- src/FileInput.cpp | 49 ++++++++++++-------- src/FileInput.h | 18 ++++---- src/InputInterface.h | 16 ++++++- src/JackInput.cpp | 6 +++ src/JackInput.h | 3 ++ src/VLCInput.cpp | 6 +++ src/VLCInput.h | 2 + src/odr-audioenc.cpp | 128 ++++++++++++++++++--------------------------------- 10 files changed, 150 insertions(+), 123 deletions(-) diff --git a/src/AlsaInput.cpp b/src/AlsaInput.cpp index af3c284..747814f 100644 --- a/src/AlsaInput.cpp +++ b/src/AlsaInput.cpp @@ -137,6 +137,12 @@ void AlsaInputThreaded::prepare() } } +bool AlsaInputThreaded::read_source(size_t num_bytes) +{ + // Reading done in separate thread, no normal termination condition possible + return true; +} + void AlsaInputThreaded::process() { uint8_t samplebuf[NUM_SAMPLES_PER_CALL * BYTES_PER_SAMPLE * m_channels]; @@ -158,14 +164,19 @@ void AlsaInputDirect::prepare() m_init_alsa(); } -ssize_t AlsaInputDirect::read(uint8_t* buf, size_t length) +bool AlsaInputDirect::read_source(size_t num_bytes) { - int bytes_per_frame = m_channels * BYTES_PER_SAMPLE; - assert(length % bytes_per_frame == 0); + const int bytes_per_frame = m_channels * BYTES_PER_SAMPLE; + assert(num_bytes % bytes_per_frame == 0); - ssize_t read = m_read(buf, length / bytes_per_frame); + const size_t num_frames = num_bytes / bytes_per_frame; + vector buf(num_bytes); + ssize_t ret = m_read(buf.data(), num_frames); - return (read > 0) ? read * bytes_per_frame : read; + if (ret > 0) { + m_queue.push(buf.data(), ret * bytes_per_frame); + } + return ret == num_frames; } #endif // HAVE_ALSA diff --git a/src/AlsaInput.h b/src/AlsaInput.h index 08ccbf6..e90ed36 100644 --- a/src/AlsaInput.h +++ b/src/AlsaInput.h @@ -47,10 +47,12 @@ class AlsaInput : public InputInterface public: AlsaInput(const std::string& alsa_dev, unsigned int channels, - unsigned int rate) : + unsigned int rate, + SampleQueue& queue) : m_alsa_dev(alsa_dev), m_channels(channels), - m_rate(rate) { } + m_rate(rate), + m_queue(queue) { } AlsaInput(const AlsaInput& other) = delete; AlsaInput& operator=(const AlsaInput& other) = delete; @@ -70,6 +72,8 @@ class AlsaInput : public InputInterface unsigned int m_channels; unsigned int m_rate; + SampleQueue& m_queue; + snd_pcm_t *m_alsa_handle = nullptr; }; @@ -78,8 +82,9 @@ class AlsaInputDirect : public AlsaInput public: AlsaInputDirect(const std::string& alsa_dev, unsigned int channels, - unsigned int rate) : - AlsaInput(alsa_dev, channels, rate) { } + unsigned int rate, + SampleQueue& queue) : + AlsaInput(alsa_dev, channels, rate, queue) { } #if 0 AlsaInputDirect(AlsaInputDirect&& other) : @@ -102,6 +107,8 @@ class AlsaInputDirect : public AlsaInput virtual bool fault_detected(void) const override { return false; }; + virtual bool read_source(size_t num_bytes) override; + /*! Read length Bytes from from the alsa device. * length must be a multiple of channels * bytes_per_sample. * @@ -117,10 +124,9 @@ class AlsaInputThreaded : public AlsaInput unsigned int channels, unsigned int rate, SampleQueue& queue) : - AlsaInput(alsa_dev, channels, rate), + AlsaInput(alsa_dev, channels, rate, queue), m_fault(false), - m_running(false), - m_queue(queue) { } + m_running(false) { } virtual ~AlsaInputThreaded() { @@ -135,6 +141,8 @@ class AlsaInputThreaded : public AlsaInput virtual bool fault_detected(void) const override { return m_fault; }; + virtual bool read_source(size_t num_bytes) override; + private: void process(); @@ -142,8 +150,6 @@ class AlsaInputThreaded : public AlsaInput std::atomic m_running; std::thread m_thread; - SampleQueue& m_queue; - }; #endif // HAVE_ALSA diff --git a/src/FileInput.cpp b/src/FileInput.cpp index 89e1dab..5eb39ee 100644 --- a/src/FileInput.cpp +++ b/src/FileInput.cpp @@ -84,31 +84,44 @@ void FileInput::prepare(void) } } -ssize_t FileInput::read(uint8_t* buf, size_t length) +bool FileInput::read_source(size_t num_bytes) { - ssize_t pcmread; + vector samplebuf(num_bytes); + + ssize_t ret = 0; if (m_raw_input) { - if (fread(buf, length, 1, m_in_fh) == 1) { - pcmread = length; - } - else { - //fprintf(stderr, "Unable to read from input!\n"); - return 0; - } + ret = fread(samplebuf.data(), 1, num_bytes, m_in_fh); } else { - pcmread = wav_read_data(m_wav, buf, length); + ret = wav_read_data(m_wav, samplebuf.data(), num_bytes); } - return pcmread; -} + if (ret > 0) { + m_queue.push(samplebuf.data(), ret); + } -int FileInput::eof() -{ - int eof = feof(m_in_fh); - clearerr(m_in_fh); - return eof; -} + if (ret < num_bytes) { + if (m_raw_input) { + if (ferror(m_in_fh)) { + return false; + } + + if (feof(m_in_fh)) { + if (m_continue_after_eof) { + clearerr(m_in_fh); + } + else { + return false; + } + } + } + else { + // the wavfile input doesn't support the continuation after EOF + return false; + } + } + return true; +} diff --git a/src/FileInput.h b/src/FileInput.h index 59e0f0b..c839b42 100644 --- a/src/FileInput.h +++ b/src/FileInput.h @@ -31,6 +31,7 @@ #include #include #include +#include "SampleQueue.h" #include "InputInterface.h" class FileInput : public InputInterface @@ -38,10 +39,14 @@ class FileInput : public InputInterface public: FileInput(const std::string& filename, bool raw_input, - int sample_rate) : + int sample_rate, + bool continue_after_eof, + SampleQueue& queue) : m_filename(filename), m_raw_input(raw_input), - m_sample_rate(sample_rate) {} + m_sample_rate(sample_rate), + m_continue_after_eof(continue_after_eof), + m_queue(queue) {} ~FileInput(); FileInput(const FileInput& other) = delete; @@ -52,17 +57,14 @@ class FileInput : public InputInterface virtual bool fault_detected(void) const override { return false; }; - /*! Read length bytes into buf. - * - * \return the number of bytes read. - */ - ssize_t read(uint8_t* buf, size_t length); - int eof(); + virtual bool read_source(size_t num_bytes) override; protected: std::string m_filename; bool m_raw_input; int m_sample_rate; + bool m_continue_after_eof; + SampleQueue& m_queue; /* handle to the wav reader */ void *m_wav = nullptr; diff --git a/src/InputInterface.h b/src/InputInterface.h index b582c33..d2fcf97 100644 --- a/src/InputInterface.h +++ b/src/InputInterface.h @@ -32,6 +32,20 @@ class InputInterface { */ virtual void prepare(void) = 0; - /*! Return true if the input detected some sort of fault */ + /*! Return true if the input detected some sort of fault or + * abnormal termination + */ virtual bool fault_detected(void) const = 0; + + /*! Tell the input that it shall read from source and fill the queue. + * The num_samples argument is an indication on how many bytes + * the encoder needs. + * Some inputs fill the queue from another thread, in which case + * this function might only serve as indication that data gets + * consumed. + * + * A return value of true means data was read, a return value of + * false means a normal termination of the input (e.g. end of file) + */ + virtual bool read_source(size_t num_bytes) = 0; }; diff --git a/src/JackInput.cpp b/src/JackInput.cpp index 70c354f..958685d 100644 --- a/src/JackInput.cpp +++ b/src/JackInput.cpp @@ -104,6 +104,12 @@ void JackInput::prepare() } } +bool JackInput::read_source(size_t num_bytes) +{ + // Reading done in separate thread, no normal termination condition possible + return true; +} + void JackInput::jack_process(jack_nframes_t nframes) { /*! JACK works with float samples, we need to convert diff --git a/src/JackInput.h b/src/JackInput.h index aa33a03..d5e2bcf 100644 --- a/src/JackInput.h +++ b/src/JackInput.h @@ -59,8 +59,11 @@ class JackInput : public InputInterface virtual ~JackInput(); virtual void prepare() override; + virtual bool fault_detected(void) const override { return m_fault; }; + virtual bool read_source(size_t num_bytes) override; + private: jack_client_t *m_client; diff --git a/src/VLCInput.cpp b/src/VLCInput.cpp index 9c424bb..494d620 100644 --- a/src/VLCInput.cpp +++ b/src/VLCInput.cpp @@ -252,6 +252,12 @@ void VLCInput::prepare() m_thread = std::thread(&VLCInput::process, this); } +bool VLCInput::read_source(size_t num_bytes) +{ + // Reading done in separate thread, no normal termination condition possible + return true; +} + void VLCInput::preRender_cb(uint8_t** pp_pcm_buffer, size_t size) { const size_t max_length = 20 * size; diff --git a/src/VLCInput.h b/src/VLCInput.h index 703641d..076e961 100644 --- a/src/VLCInput.h +++ b/src/VLCInput.h @@ -117,6 +117,8 @@ class VLCInput : public InputInterface * the libVLC thread that fills m_samplequeue */ virtual void prepare() override; + virtual bool read_source(size_t num_bytes) override; + /*! Write the last received ICY-Text to the * file. */ diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp index 11978a9..098d700 100644 --- a/src/odr-audioenc.cpp +++ b/src/odr-audioenc.cpp @@ -299,12 +299,12 @@ static int prepare_aac_encoder( return 1; } } - if (aacEncEncode(*encoder, NULL, NULL, NULL, NULL) != AACENC_OK) { + if (aacEncEncode(*encoder, nullptr, nullptr, nullptr, nullptr) != AACENC_OK) { fprintf(stderr, "Unable to initialize the encoder\n"); return 1; } - uint32_t bw = aacEncoder_GetParam(*encoder, AACENC_BANDWIDTH); + const uint32_t bw = aacEncoder_GetParam(*encoder, AACENC_BANDWIDTH); fprintf(stderr, "Bandwidth is %d\n", bw); return 0; @@ -395,6 +395,7 @@ int main(int argc, char *argv[]) // For the file input string infile; + bool continue_after_eof = false; int raw_input = 0; // For the VLC input @@ -407,17 +408,16 @@ int main(int argc, char *argv[]) unsigned verbosity = 0; // For the file output - FILE *out_fh = NULL; + FILE *out_fh = nullptr; string jack_name; vector output_uris; int sample_rate=48000, channels=2; - void *rs_handler = NULL; + void *rs_handler = nullptr; bool afterburner = true; uint32_t bandwidth = 0; - bool inFifoSilence = false; bool drift_compensation = false; AACENC_InfoStruct info = { 0 }; int aot = AOT_NONE; @@ -449,7 +449,7 @@ int main(int argc, char *argv[]) int show_level = 0; /* Data for ZMQ CURVE authentication */ - char* keyfile = NULL; + char* keyfile = nullptr; char secretkey[CURVE_KEYLEN+1]; const struct option longopts[] = { @@ -519,7 +519,11 @@ int main(int argc, char *argv[]) case 2: // PS aot = AOT_DABPLUS_PS; break; - case 3: // FIFO SILENCE + case 3: // FIFO Silence + continue_after_eof = true; + // Enable drift compensation, otherwise we would block instead of inserting silence. + drift_compensation = true; + break; case 4: // DAB channel mode dab_channel_mode = optarg; if (not( dab_channel_mode == "s" or @@ -694,7 +698,7 @@ int main(int argc, char *argv[]) if (not output_uris.empty()) { for (auto uri : output_uris) { if (uri == "-") { - if (out_fh != NULL) { + if (out_fh != nullptr) { fprintf(stderr, "You can't write to more than one file!\n"); return 1; } @@ -722,7 +726,7 @@ int main(int argc, char *argv[]) zmq_sock.connect(uri.c_str()); } else { // We assume it's a file name - if (out_fh != NULL) { + if (out_fh != nullptr) { fprintf(stderr, "You can't write to more than one file!\n"); return 1; } @@ -860,7 +864,7 @@ int main(int argc, char *argv[]) /* symsize=8, gfpoly=0x11d, fcr=0, prim=1, nroots=10, pad=135 */ rs_handler = init_rs_char(8, 0x11d, 0, 1, 10, 135); - if (rs_handler == NULL) { + if (rs_handler == nullptr) { perror("init_rs_char failed"); return 1; } @@ -868,9 +872,9 @@ int main(int argc, char *argv[]) // We'll use one of the tree possible inputs #if HAVE_ALSA AlsaInputThreaded alsa_in_threaded(alsa_device, channels, sample_rate, queue); - AlsaInputDirect alsa_in_direct(alsa_device, channels, sample_rate); + AlsaInputDirect alsa_in_direct(alsa_device, channels, sample_rate, queue); #endif - FileInput file_in(infile, raw_input, sample_rate); + FileInput file_in(infile, raw_input, sample_rate, continue_after_eof, queue); #if HAVE_JACK JackInput jack_in(jack_name, channels, sample_rate, queue); #endif @@ -992,8 +996,8 @@ int main(int argc, char *argv[]) * without drift compensation) or in a non-blocking way (VLC or ALSA * with drift compensation, JACK). * - * The file input doesn't need the queue at all. But the other inputs - * do, and either use \c pop() or \c pop_wait() depending on if it's blocking or not + * All inputs write samples into the queue, and either use \c pop() or + * \c pop_wait() depending on if it's blocking or not * * In non-blocking, the \c queue makes the data available without delay, and the * \c drift_compensation_delay() function handles rate throttling. @@ -1005,68 +1009,14 @@ int main(int argc, char *argv[]) break; } - if (not infile.empty()) { - read_bytes = file_in.read(&input_buf[0], input_buf.size()); - if (read_bytes < 0) { - break; - } - else if (read_bytes != input_buf.size()) { - if (inFifoSilence && file_in.eof()) { - memset(&input_buf[0], 0, input_buf.size()); - read_bytes = input_buf.size(); - usleep((long)input_buf.size() * 1000000 / - (BYTES_PER_SAMPLE * channels * sample_rate)); - } - else { - fprintf(stderr, "Short file read !\n"); - read_bytes = 0; - } - } + if (not input->read_source(input_buf.size())) { + fprintf(stderr, "End of input reached\n"); + retval = 0; + break; } -#if HAVE_VLC - else if (not vlc_uri.empty()) { - - if (drift_compensation) { - size_t overruns; - size_t bytes_from_queue = queue.pop(&input_buf[0], input_buf.size(), &overruns); // returns bytes - if (bytes_from_queue != input_buf.size()) { - expand_missing_samples(input_buf, channels, bytes_from_queue); - } - read_bytes = input_buf.size(); - drift_compensation_delay(sample_rate, channels, read_bytes); - - if (bytes_from_queue != input_buf.size()) { - status |= STATUS_UNDERRUN; - } - - if (overruns) { - status |= STATUS_OVERRUN; - } - } - else { - const int timeout_ms = 10000; - read_bytes = input_buf.size(); - - /*! pop_wait() must return after a timeout, otherwise the silence detector cannot do - * its job. - */ - size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms); // returns bytes - if (bytes_from_queue < read_bytes) { - // queue timeout occurred - fprintf(stderr, "Detected fault in VLC input! No data in time.\n"); - retval = 5; - break; - } - } - - if (not vlc_icytext_file.empty()) { - vlc_input.write_icy_text(vlc_icytext_file, vlc_icytext_dlplus); - } - } -#endif - else if (drift_compensation or not jack_name.empty()) { - size_t overruns; + if (drift_compensation) { + size_t overruns = 0; size_t bytes_from_queue = queue.pop(&input_buf[0], input_buf.size(), &overruns); // returns bytes if (bytes_from_queue != input_buf.size()) { expand_missing_samples(input_buf, channels, bytes_from_queue); @@ -1083,17 +1033,31 @@ int main(int argc, char *argv[]) } } else { -#if HAVE_ALSA - read_bytes = alsa_in_direct.read(&input_buf[0], input_buf.size()); - if (read_bytes < 0) { + const int timeout_ms = 10000; + read_bytes = input_buf.size(); + + /*! pop_wait() must return after a timeout, otherwise the silence detector cannot do + * its job. */ + size_t bytes_from_queue = queue.pop_wait(&input_buf[0], read_bytes, timeout_ms); // returns bytes + + if (bytes_from_queue < read_bytes) { + // queue timeout occurred + fprintf(stderr, "Detected fault in VLC input! No data in time.\n"); + retval = 5; break; } - else if (read_bytes != input_buf.size()) { - fprintf(stderr, "Short alsa read !\n"); - } -#endif } + /*! \section MetadataFromSource + * The VLC input is the only input that can also give us metadata, which + * we can hand over to ODR-PadEnc. + */ +#if HAVE_VLC + if (not vlc_uri.empty() and not vlc_icytext_file.empty()) { + vlc_input.write_icy_text(vlc_icytext_file, vlc_icytext_dlplus); + } +#endif + /*! \section AudioLevel * Audio level measurement is always done assuming we have two * channels, and is formally wrong in mono, but still gives @@ -1113,7 +1077,7 @@ int main(int argc, char *argv[]) * only useful if the connection dropped, or if no data is available. It is not * useful if the source is nearly silent (some noise present), because the * threshold is 0, and not configurable. The rationale is that we want to - * guard against connection issues, not source level issues + * guard against connection issues, not source level issues. */ if (die_on_silence && MAX(peak_left, peak_right) == 0) { const unsigned int frame_time_msec = 1000ul * -- cgit v1.2.3