diff options
Diffstat (limited to 'src/ModPlugin.cpp')
-rw-r--r-- | src/ModPlugin.cpp | 79 |
1 files changed, 78 insertions, 1 deletions
diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index 775b284..c39d883 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include "ModPlugin.h" #include "PcDebug.h" +#include "Utils.h" #include <stdexcept> #include <string> @@ -71,3 +72,79 @@ int ModOutput::process( return process(dataIn[0]); } +PipelinedModCodec::PipelinedModCodec() : + ModCodec(), + m_number_of_runs(0), + m_input_queue(), + m_output_queue(), + m_running(false), + m_thread() +{ +} + +PipelinedModCodec::~PipelinedModCodec() +{ + m_input_queue.push({}); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void PipelinedModCodec::start_pipeline_thread() +{ + m_running = true; + m_thread = std::thread(&PipelinedModCodec::process_thread, this); +} + +int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) +{ + if (!m_running) { + return 0; + } + + std::shared_ptr<Buffer> inbuffer = + std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); + + m_input_queue.push(inbuffer); + + if (m_number_of_runs > 0) { + std::shared_ptr<Buffer> outbuffer; + m_output_queue.wait_and_pop(outbuffer); + + dataOut->setData(outbuffer->getData(), outbuffer->getLength()); + } + else { + dataOut->setLength(dataIn->getLength()); + memset(dataOut->getData(), 0, dataOut->getLength()); + m_number_of_runs++; + } + + return dataOut->getLength(); + +} + +void PipelinedModCodec::process_thread() +{ + set_thread_name(name()); + set_realtime_prio(1); + + while (m_running) { + std::shared_ptr<Buffer> dataIn; + m_input_queue.wait_and_pop(dataIn); + + if (!dataIn or dataIn->getLength() == 0) { + break; + } + + std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>(); + dataOut->setLength(dataIn->getLength()); + + if (internal_process(dataIn.get(), dataOut.get()) == 0) { + m_running = false; + } + + m_output_queue.push(dataOut); + } + + m_running = false; +} |