aboutsummaryrefslogtreecommitdiffstats
path: root/src/ModPlugin.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/ModPlugin.cpp')
-rw-r--r--src/ModPlugin.cpp79
1 files changed, 78 insertions, 1 deletions
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp
index 775b284..c39d883 100644
--- a/src/ModPlugin.cpp
+++ b/src/ModPlugin.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -26,6 +26,7 @@
#include "ModPlugin.h"
#include "PcDebug.h"
+#include "Utils.h"
#include <stdexcept>
#include <string>
@@ -71,3 +72,79 @@ int ModOutput::process(
return process(dataIn[0]);
}
+PipelinedModCodec::PipelinedModCodec() :
+ ModCodec(),
+ m_number_of_runs(0),
+ m_input_queue(),
+ m_output_queue(),
+ m_running(false),
+ m_thread()
+{
+}
+
+PipelinedModCodec::~PipelinedModCodec()
+{
+ m_input_queue.push({});
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void PipelinedModCodec::start_pipeline_thread()
+{
+ m_running = true;
+ m_thread = std::thread(&PipelinedModCodec::process_thread, this);
+}
+
+int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)
+{
+ if (!m_running) {
+ return 0;
+ }
+
+ std::shared_ptr<Buffer> inbuffer =
+ std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
+
+ m_input_queue.push(inbuffer);
+
+ if (m_number_of_runs > 0) {
+ std::shared_ptr<Buffer> outbuffer;
+ m_output_queue.wait_and_pop(outbuffer);
+
+ dataOut->setData(outbuffer->getData(), outbuffer->getLength());
+ }
+ else {
+ dataOut->setLength(dataIn->getLength());
+ memset(dataOut->getData(), 0, dataOut->getLength());
+ m_number_of_runs++;
+ }
+
+ return dataOut->getLength();
+
+}
+
+void PipelinedModCodec::process_thread()
+{
+ set_thread_name(name());
+ set_realtime_prio(1);
+
+ while (m_running) {
+ std::shared_ptr<Buffer> dataIn;
+ m_input_queue.wait_and_pop(dataIn);
+
+ if (!dataIn or dataIn->getLength() == 0) {
+ break;
+ }
+
+ std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>();
+ dataOut->setLength(dataIn->getLength());
+
+ if (internal_process(dataIn.get(), dataOut.get()) == 0) {
+ m_running = false;
+ }
+
+ m_output_queue.push(dataOut);
+ }
+
+ m_running = false;
+}