summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-03-17 10:57:08 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-03-17 10:57:08 +0100
commitc31496c68f2bbbab71c15635a2053f52e7a0ecf8 (patch)
tree4067427b706aa72fc0b4a779647852889f0b5b59
parent0a88998e2e183ac62aa3858a17a28ce0303a9780 (diff)
downloaddabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.tar.gz
dabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.tar.bz2
dabmod-c31496c68f2bbbab71c15635a2053f52e7a0ecf8.zip
Add threaded ModCodec and use it for GainControl
-rw-r--r--src/GainControl.cpp6
-rw-r--r--src/GainControl.h7
-rw-r--r--src/ModPlugin.cpp71
-rw-r--r--src/ModPlugin.h31
4 files changed, 107 insertions, 8 deletions
diff --git a/src/GainControl.cpp b/src/GainControl.cpp
index 4a05be1..cf0247d 100644
--- a/src/GainControl.cpp
+++ b/src/GainControl.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -48,7 +48,7 @@ GainControl::GainControl(size_t framesize,
GainMode mode,
float& digGain,
float normalise) :
- ModCodec(),
+ PipelinedModCodec(),
RemoteControllable("gain"),
#ifdef __SSE__
m_frameSize(framesize * sizeof(complexf) / sizeof(__m128)),
@@ -89,7 +89,7 @@ GainControl::~GainControl()
}
-int GainControl::process(Buffer* const dataIn, Buffer* dataOut)
+int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut)
{
PDEBUG("GainControl::process"
"(dataIn: %p, dataOut: %p)\n",
diff --git a/src/GainControl.h b/src/GainControl.h
index a8f8233..d686402 100644
--- a/src/GainControl.h
+++ b/src/GainControl.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2014
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -46,7 +46,7 @@ typedef std::complex<float> complexf;
enum class GainMode { GAIN_FIX = 0, GAIN_MAX = 1, GAIN_VAR = 2 };
-class GainControl : public ModCodec, public RemoteControllable
+class GainControl : public PipelinedModCodec, public RemoteControllable
{
public:
GainControl(size_t framesize,
@@ -58,7 +58,6 @@ class GainControl : public ModCodec, public RemoteControllable
GainControl(const GainControl&);
GainControl& operator=(const GainControl&);
- int process(Buffer* const dataIn, Buffer* dataOut);
const char* name() { return "GainControl"; }
/* Functions for the remote control */
@@ -70,6 +69,8 @@ class GainControl : public ModCodec, public RemoteControllable
virtual const std::string get_parameter(const std::string& parameter) const;
protected:
+ virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) override;
+
size_t m_frameSize;
float& m_digGain;
float m_normalise;
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp
index 775b284..314a689 100644
--- a/src/ModPlugin.cpp
+++ b/src/ModPlugin.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "ModPlugin.h"
#include "PcDebug.h"
+#include "Utils.h"
#include <stdexcept>
#include <string>
@@ -71,3 +72,71 @@ int ModOutput::process(
return process(dataIn[0]);
}
+PipelinedModCodec::PipelinedModCodec() :
+ ModCodec(),
+ m_input_queue(),
+ m_output_queue(),
+ m_number_of_runs(0),
+ m_thread(&PipelinedModCodec::process_thread, this)
+{
+}
+
+PipelinedModCodec::~PipelinedModCodec()
+{
+ m_input_queue.push({});
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
+{
+ if (!m_running) {
+ return 0;
+ }
+
+ std::shared_ptr<Buffer> inbuffer =
+ std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
+
+ m_input_queue.push(inbuffer);
+
+ if (m_number_of_runs > 0) {
+ std::shared_ptr<Buffer> outbuffer;
+ m_output_queue.wait_and_pop(outbuffer);
+
+ dataOut->setData(outbuffer->getData(), outbuffer->getLength());
+ }
+ else {
+ dataOut->setLength(dataIn->getLength());
+ memset(dataOut->getData(), 0, dataOut->getLength());
+ m_number_of_runs++;
+ }
+
+ return dataOut->getLength();
+
+}
+
+void PipelinedModCodec::process_thread()
+{
+ set_thread_name(name());
+
+ while (m_running) {
+ std::shared_ptr<Buffer> dataIn;
+ m_input_queue.wait_and_pop(dataIn);
+
+ if (!dataIn or dataIn->getLength() == 0) {
+ break;
+ }
+
+ std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>();
+ dataOut->setLength(dataIn->getLength());
+
+ if (internal_process(dataIn.get(), dataOut.get()) == 0) {
+ m_running = false;
+ }
+
+ m_output_queue.push(dataOut);
+ }
+
+ m_running = false;
+}
diff --git a/src/ModPlugin.h b/src/ModPlugin.h
index bdc3843..edb7d2d 100644
--- a/src/ModPlugin.h
+++ b/src/ModPlugin.h
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty
the Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -32,9 +32,12 @@
#include "Buffer.h"
+#include "ThreadsafeQueue.h"
#include <sys/types.h>
#include <vector>
+#include <memory>
+#include <thread>
class ModPlugin
{
@@ -65,6 +68,32 @@ public:
virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;
};
+class PipelinedModCodec : public ModCodec
+{
+public:
+ PipelinedModCodec();
+ PipelinedModCodec(const PipelinedModCodec&) = delete;
+ PipelinedModCodec& operator=(const PipelinedModCodec&) = delete;
+ PipelinedModCodec(PipelinedModCodec&&) = delete;
+ PipelinedModCodec& operator=(PipelinedModCodec&&) = delete;
+ ~PipelinedModCodec();
+
+ virtual int process(Buffer* const dataIn, Buffer* dataOut) final;
+ virtual const char* name() = 0;
+
+protected:
+ void process_thread(void);
+ virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;
+
+ ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;
+ ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue;
+
+ std::atomic<bool> m_running;
+ size_t m_number_of_runs;
+ std::thread m_thread;
+};
+
+
/* Muxes are N-input 1-output flowgraph plugins */
class ModMux : public ModPlugin
{