diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-03-17 10:57:08 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-03-17 10:57:08 +0100 |
commit | c31496c68f2bbbab71c15635a2053f52e7a0ecf8 (patch) | |
tree | 4067427b706aa72fc0b4a779647852889f0b5b59 | |
parent | 0a88998e2e183ac62aa3858a17a28ce0303a9780 (diff) | |
download | dabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.tar.gz dabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.tar.bz2 dabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.zip |
Add threaded ModCodec and use it for GainControl
-rw-r--r-- | src/GainControl.cpp | 6 | ||||
-rw-r--r-- | src/GainControl.h | 7 | ||||
-rw-r--r-- | src/ModPlugin.cpp | 71 | ||||
-rw-r--r-- | src/ModPlugin.h | 31 |
4 files changed, 107 insertions, 8 deletions
diff --git a/src/GainControl.cpp b/src/GainControl.cpp index 4a05be1..cf0247d 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -48,7 +48,7 @@ GainControl::GainControl(size_t framesize, GainMode mode, float& digGain, float normalise) : - ModCodec(), + PipelinedModCodec(), RemoteControllable("gain"), #ifdef __SSE__ m_frameSize(framesize * sizeof(complexf) / sizeof(__m128)), @@ -89,7 +89,7 @@ GainControl::~GainControl() } -int GainControl::process(Buffer* const dataIn, Buffer* dataOut) +int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut) { PDEBUG("GainControl::process" "(dataIn: %p, dataOut: %p)\n", diff --git a/src/GainControl.h b/src/GainControl.h index a8f8233..d686402 100644 --- a/src/GainControl.h +++ b/src/GainControl.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -46,7 +46,7 @@ typedef std::complex<float> complexf; enum class GainMode { GAIN_FIX = 0, GAIN_MAX = 1, GAIN_VAR = 2 }; -class GainControl : public ModCodec, public RemoteControllable +class GainControl : public PipelinedModCodec, public RemoteControllable { public: GainControl(size_t framesize, @@ -58,7 +58,6 @@ class GainControl : public ModCodec, public RemoteControllable GainControl(const GainControl&); GainControl& operator=(const GainControl&); - int process(Buffer* const dataIn, Buffer* dataOut); const char* name() { return "GainControl"; } /* Functions for the remote control */ @@ -70,6 +69,8 @@ class GainControl : public ModCodec, public RemoteControllable virtual const std::string get_parameter(const std::string& parameter) const; protected: + virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) override; + size_t m_frameSize; float& m_digGain; float m_normalise; diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index 775b284..314a689 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include "ModPlugin.h" #include "PcDebug.h" +#include "Utils.h" #include <stdexcept> #include <string> @@ -71,3 +72,71 @@ int ModOutput::process( return process(dataIn[0]); } +PipelinedModCodec::PipelinedModCodec() : + ModCodec(), + m_input_queue(), + m_output_queue(), + m_number_of_runs(0), + m_thread(&PipelinedModCodec::process_thread, this) +{ +} + +PipelinedModCodec::~PipelinedModCodec() +{ + m_input_queue.push({}); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) +{ + if (!m_running) { + return 0; + } + + std::shared_ptr<Buffer> inbuffer = + std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); + + m_input_queue.push(inbuffer); + + if (m_number_of_runs > 0) { + std::shared_ptr<Buffer> outbuffer; + m_output_queue.wait_and_pop(outbuffer); + + dataOut->setData(outbuffer->getData(), outbuffer->getLength()); + } + else { + dataOut->setLength(dataIn->getLength()); + memset(dataOut->getData(), 0, dataOut->getLength()); + m_number_of_runs++; + } + + return dataOut->getLength(); + +} + +void PipelinedModCodec::process_thread() +{ + set_thread_name(name()); + + while (m_running) { + std::shared_ptr<Buffer> dataIn; + m_input_queue.wait_and_pop(dataIn); + + if (!dataIn or dataIn->getLength() == 0) { + break; + } + + std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>(); + dataOut->setLength(dataIn->getLength()); + + if (internal_process(dataIn.get(), dataOut.get()) == 0) { + m_running = false; + } + + m_output_queue.push(dataOut); + } + + m_running = false; +} diff --git a/src/ModPlugin.h b/src/ModPlugin.h index bdc3843..edb7d2d 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -32,9 +32,12 @@ #include "Buffer.h" +#include "ThreadsafeQueue.h" #include <sys/types.h> #include <vector> +#include <memory> +#include <thread> class ModPlugin { @@ -65,6 +68,32 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0; }; +class PipelinedModCodec : public ModCodec +{ +public: + PipelinedModCodec(); + PipelinedModCodec(const PipelinedModCodec&) = delete; + PipelinedModCodec& operator=(const PipelinedModCodec&) = delete; + PipelinedModCodec(PipelinedModCodec&&) = delete; + PipelinedModCodec& operator=(PipelinedModCodec&&) = delete; + ~PipelinedModCodec(); + + virtual int process(Buffer* const dataIn, Buffer* dataOut) final; + virtual const char* name() = 0; + +protected: + void process_thread(void); + virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0; + + ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue; + ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; + + std::atomic<bool> m_running; + size_t m_number_of_runs; + std::thread m_thread; +}; + + /* Muxes are N-input 1-output flowgraph plugins */ class ModMux : public ModPlugin { |