From 1d833b718845b97a5b1d90f33b547b1772bc0708 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 08:49:29 +0100 Subject: Add new flowgraph path for metadata --- src/ModPlugin.h | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'src/ModPlugin.h') diff --git a/src/ModPlugin.h b/src/ModPlugin.h index d3aa780..3c3e8b3 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -119,3 +119,20 @@ public: std::vector dataOut); virtual int process(Buffer* dataIn) = 0; }; + +struct frame_timestamp; +struct flowgraph_metadata { + std::shared_ptr ts; +}; + + +using meta_vec_t = std::vector; + +/* Some ModPlugins also support metadata */ +class ModMetadata { + public: + // Receives metadata from all inputs, and process them, and output + // a sequence of metadata. + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + -- cgit v1.2.3 From 23b369e1145c5652778345827b7df9c33e09d0e8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 7 Jan 2018 09:21:25 +0100 Subject: Add metadata latency for all PipelinedModCodec --- src/DabMod.cpp | 6 ++++++ src/ModPlugin.cpp | 19 ++++++++++++++++--- src/ModPlugin.h | 52 +++++++++++++++++++++++++++++++--------------------- src/OutputFile.cpp | 22 +++++++++++++--------- src/OutputFile.h | 6 ++++++ 5 files changed, 72 insertions(+), 33 deletions(-) (limited to 'src/ModPlugin.h') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 25a93bf..4a4ea82 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -426,6 +426,12 @@ int launch_modulator(int argc, char* argv[]) } #endif + // TODO remove + auto output_as_file = dynamic_pointer_cast(output); + if (output_as_file) { + output_as_file->setETISource(modulator->getEtiSource()); + } + inputReader->PrintInfo(); run_modulator_state_t st = run_modulator(m); diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index c39d883..f86bfb2 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -74,9 +74,9 @@ int ModOutput::process( PipelinedModCodec::PipelinedModCodec() : ModCodec(), - m_number_of_runs(0), m_input_queue(), m_output_queue(), + m_metadata_fifo(), m_running(false), m_thread() { @@ -107,7 +107,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) m_input_queue.push(inbuffer); - if (m_number_of_runs > 0) { + if (m_ready_to_output_data) { std::shared_ptr outbuffer; m_output_queue.wait_and_pop(outbuffer); @@ -116,13 +116,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) else { dataOut->setLength(dataIn->getLength()); memset(dataOut->getData(), 0, dataOut->getLength()); - m_number_of_runs++; + m_ready_to_output_data = true; } return dataOut->getLength(); } +meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn) +{ + m_metadata_fifo.push_back(metadataIn); + if (m_metadata_fifo.size() == 2) { + auto r = std::move(m_metadata_fifo.front()); + m_metadata_fifo.pop_front(); + return r; + } + else { + return {}; + } +} + void PipelinedModCodec::process_thread() { set_thread_name(name()); diff --git a/src/ModPlugin.h b/src/ModPlugin.h index 3c3e8b3..c0f1c1a 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -30,16 +30,34 @@ # include #endif - #include "Buffer.h" #include "ThreadsafeQueue.h" - -#include +#include #include #include #include #include +// All flowgraph elements derive from ModPlugin, or a variant of it. +// Some ModPlugins also support handling metadata. + +struct frame_timestamp; +struct flowgraph_metadata { + std::shared_ptr ts; +}; + +using meta_vec_t = std::vector; + +/* ModPlugins that support metadata derive from ModMetadata */ +class ModMetadata { + public: + // Receives metadata from all inputs, and process them, and output + // a sequence of metadata. + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + + +/* Abstract base class for all flowgraph elements */ class ModPlugin { public: @@ -69,7 +87,11 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0; }; -class PipelinedModCodec : public ModCodec +/* Pipelined ModCodecs run their processing in a separate thread, and + * have a one-call-to-process() latency. Because of this latency, they + * must also handle the metadata + */ +class PipelinedModCodec : public ModCodec, public ModMetadata { public: PipelinedModCodec(); @@ -82,6 +104,8 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) final; virtual const char* name() = 0; + virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final; + protected: // Once the instance implementing PipelinedModCodec has been constructed, // it must call start_pipeline_thread() @@ -89,11 +113,13 @@ protected: virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0; private: - size_t m_number_of_runs; + bool m_ready_to_output_data = false; ThreadsafeQueue > m_input_queue; ThreadsafeQueue > m_output_queue; + std::deque m_metadata_fifo; + std::atomic m_running; std::thread m_thread; void process_thread(void); @@ -120,19 +146,3 @@ public: virtual int process(Buffer* dataIn) = 0; }; -struct frame_timestamp; -struct flowgraph_metadata { - std::shared_ptr ts; -}; - - -using meta_vec_t = std::vector; - -/* Some ModPlugins also support metadata */ -class ModMetadata { - public: - // Receives metadata from all inputs, and process them, and output - // a sequence of metadata. - virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; -}; - diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 3bb45c4..481e858 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -77,22 +77,26 @@ meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) for (const auto& md : metadataIn) { if (md.ts) { - ss << "FCT=" << md.ts->fct << + ss << " FCT=" << md.ts->fct << " FP=" << (int)md.ts->fp; - if (md.ts->timestamp_valid) { - ss << " ts=" << md.ts->timestamp_sec << - "+" << md.ts->timestamp_pps << ", "; - } - else { - ss << " no ts"; - } } else { - ss << "void, "; + ss << " void, "; } } + if (myEtiSource) { + frame_timestamp ts; + myEtiSource->calculateTimestamp(ts); + ss << " ETI FCT=" << ts.fct; + } + etiLog.level(debug) << "Output File got metadata: " << ss.str(); return {}; } + +void OutputFile::setETISource(EtiSource *etiSource) +{ + myEtiSource = etiSource; +} diff --git a/src/OutputFile.h b/src/OutputFile.h index 2b92369..97fdcb7 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -32,6 +32,7 @@ #include "ModPlugin.h" +#include "EtiReader.h" #include #include @@ -57,7 +58,12 @@ public: virtual meta_vec_t process_metadata( const meta_vec_t& metadataIn) override; + void setETISource(EtiSource *etiSource); + protected: + // TODO remove + EtiSource *myEtiSource = nullptr; + std::string myFilename; std::unique_ptr myFile; }; -- cgit v1.2.3 From d929ecf1e0196161acb5a106761e8fad7040e9d3 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 14 Jan 2018 07:10:35 +0100 Subject: Make ~PipelinedModcodec virtual, simplify GainControl and FIRFilter declarations --- src/FIRFilter.h | 10 ++++------ src/GainControl.cpp | 4 ---- src/GainControl.h | 6 +++--- src/ModPlugin.h | 3 ++- 4 files changed, 9 insertions(+), 14 deletions(-) (limited to 'src/ModPlugin.h') diff --git a/src/FIRFilter.h b/src/FIRFilter.h index a63bfb9..6ace338 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -53,20 +53,18 @@ class FIRFilter : public PipelinedModCodec, public RemoteControllable { public: FIRFilter(const std::string& taps_file); - virtual ~FIRFilter() = default; - const char* name() { return "FIRFilter"; } + const char* name() override { return "FIRFilter"; } /******* REMOTE CONTROL ********/ virtual void set_parameter(const std::string& parameter, - const std::string& value); + const std::string& value) override; virtual const std::string get_parameter( - const std::string& parameter) const; - + const std::string& parameter) const override; protected: - virtual int internal_process(Buffer* const dataIn, Buffer* dataOut); + virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) override; void load_filter_taps(const std::string &tapsFile); std::string m_taps_file; diff --git a/src/GainControl.cpp b/src/GainControl.cpp index 0411482..dbb9464 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -73,10 +73,6 @@ GainControl::GainControl(size_t framesize, start_pipeline_thread(); } -GainControl::~GainControl() -{ - PDEBUG("GainControl::~GainControl() @ %p\n", this); -} int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut) { diff --git a/src/GainControl.h b/src/GainControl.h index e9eaa8c..92c905b 100644 --- a/src/GainControl.h +++ b/src/GainControl.h @@ -56,9 +56,9 @@ class GainControl : public PipelinedModCodec, public RemoteControllable float normalise, float varVariance); - virtual ~GainControl(); - GainControl(const GainControl&); - GainControl& operator=(const GainControl&); + virtual ~GainControl() = default; + GainControl(const GainControl&) = delete; + GainControl& operator=(const GainControl&) = delete; const char* name() override { return "GainControl"; } diff --git a/src/ModPlugin.h b/src/ModPlugin.h index c0f1c1a..5635fca 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -65,6 +65,7 @@ public: std::vector dataIn, std::vector dataOut) = 0; virtual const char* name() = 0; + virtual ~ModPlugin() = default; }; /* Inputs are sources, the output buffers without reading any */ @@ -99,7 +100,7 @@ public: PipelinedModCodec& operator=(const PipelinedModCodec&) = delete; PipelinedModCodec(PipelinedModCodec&&) = delete; PipelinedModCodec& operator=(PipelinedModCodec&&) = delete; - ~PipelinedModCodec(); + virtual ~PipelinedModCodec(); virtual int process(Buffer* const dataIn, Buffer* dataOut) final; virtual const char* name() = 0; -- cgit v1.2.3 From db5e90c258f94e65df70830ab1c053debfa15cdb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 14 Jan 2018 07:43:46 +0100 Subject: Avoid race condition on teardown of pipelined plugins --- src/FIRFilter.cpp | 5 +++++ src/FIRFilter.h | 3 +++ src/GainControl.cpp | 4 ++++ src/GainControl.h | 2 +- src/MemlessPoly.cpp | 5 +++++ src/MemlessPoly.h | 3 +++ src/ModPlugin.cpp | 12 +----------- src/ModPlugin.h | 14 ++++++-------- 8 files changed, 28 insertions(+), 20 deletions(-) (limited to 'src/ModPlugin.h') diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index bc2314a..96ad1b9 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -87,6 +87,11 @@ FIRFilter::FIRFilter(const std::string& taps_file) : start_pipeline_thread(); } +FIRFilter::~FIRFilter() +{ + stop_pipeline_thread(); +} + void FIRFilter::load_filter_taps(const std::string &tapsFile) { std::vector filter_taps; diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 6ace338..d04c456 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -53,6 +53,9 @@ class FIRFilter : public PipelinedModCodec, public RemoteControllable { public: FIRFilter(const std::string& taps_file); + FIRFilter(const FIRFilter& other) = delete; + FIRFilter& operator=(const FIRFilter& other) = delete; + virtual ~FIRFilter(); const char* name() override { return "FIRFilter"; } diff --git a/src/GainControl.cpp b/src/GainControl.cpp index dbb9464..5657fc2 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -73,6 +73,10 @@ GainControl::GainControl(size_t framesize, start_pipeline_thread(); } +GainControl::~GainControl() +{ + stop_pipeline_thread(); +} int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut) { diff --git a/src/GainControl.h b/src/GainControl.h index 92c905b..b4579cd 100644 --- a/src/GainControl.h +++ b/src/GainControl.h @@ -56,7 +56,7 @@ class GainControl : public PipelinedModCodec, public RemoteControllable float normalise, float varVariance); - virtual ~GainControl() = default; + virtual ~GainControl(); GainControl(const GainControl&) = delete; GainControl& operator=(const GainControl&) = delete; diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index ae097c9..5d1c02d 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -99,6 +99,11 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads start_pipeline_thread(); } +MemlessPoly::~MemlessPoly() +{ + stop_pipeline_thread(); +} + void MemlessPoly::load_coefficients(const std::string &coefFile) { std::ifstream coef_fstream(coefFile.c_str()); diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 612934f..4c67d46 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -59,6 +59,9 @@ class MemlessPoly : public PipelinedModCodec, public RemoteControllable { public: MemlessPoly(const std::string& coefs_file, unsigned int num_threads); + MemlessPoly(const MemlessPoly& other) = delete; + MemlessPoly& operator=(const MemlessPoly& other) = delete; + virtual ~MemlessPoly(); virtual const char* name() { return "MemlessPoly"; } diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index f86bfb2..d567a90 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -72,17 +72,7 @@ int ModOutput::process( return process(dataIn[0]); } -PipelinedModCodec::PipelinedModCodec() : - ModCodec(), - m_input_queue(), - m_output_queue(), - m_metadata_fifo(), - m_running(false), - m_thread() -{ -} - -PipelinedModCodec::~PipelinedModCodec() +void PipelinedModCodec::stop_pipeline_thread() { m_input_queue.push({}); if (m_thread.joinable()) { diff --git a/src/ModPlugin.h b/src/ModPlugin.h index 5635fca..e9cfa21 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -95,13 +95,6 @@ public: class PipelinedModCodec : public ModCodec, public ModMetadata { public: - PipelinedModCodec(); - PipelinedModCodec(const PipelinedModCodec&) = delete; - PipelinedModCodec& operator=(const PipelinedModCodec&) = delete; - PipelinedModCodec(PipelinedModCodec&&) = delete; - PipelinedModCodec& operator=(PipelinedModCodec&&) = delete; - virtual ~PipelinedModCodec(); - virtual int process(Buffer* const dataIn, Buffer* dataOut) final; virtual const char* name() = 0; @@ -111,6 +104,11 @@ protected: // Once the instance implementing PipelinedModCodec has been constructed, // it must call start_pipeline_thread() void start_pipeline_thread(void); + // To avoid race conditions on teardown, plugins must call + // stop_pipeline_thread in their destructor. + void stop_pipeline_thread(void); + + // The real processing must be implemented in internal_process virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0; private: @@ -121,7 +119,7 @@ private: std::deque m_metadata_fifo; - std::atomic m_running; + std::atomic m_running = ATOMIC_VAR_INIT(false); std::thread m_thread; void process_thread(void); }; -- cgit v1.2.3