From c31496c68f2bbbab71c15635a2053f52e7a0ecf8 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 17 Mar 2017 10:57:08 +0100 Subject: Add threaded ModCodec and use it for GainControl --- src/ModPlugin.cpp | 71 ++++++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 70 insertions(+), 1 deletion(-) (limited to 'src/ModPlugin.cpp') diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index 775b284..314a689 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include "ModPlugin.h" #include "PcDebug.h" +#include "Utils.h" #include #include @@ -71,3 +72,71 @@ int ModOutput::process( return process(dataIn[0]); } +PipelinedModCodec::PipelinedModCodec() : + ModCodec(), + m_input_queue(), + m_output_queue(), + m_number_of_runs(0), + m_thread(&PipelinedModCodec::process_thread, this) +{ +} + +PipelinedModCodec::~PipelinedModCodec() +{ + m_input_queue.push({}); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) +{ + if (!m_running) { + return 0; + } + + std::shared_ptr inbuffer = + std::make_shared(dataIn->getLength(), dataIn->getData()); + + m_input_queue.push(inbuffer); + + if (m_number_of_runs > 0) { + std::shared_ptr outbuffer; + m_output_queue.wait_and_pop(outbuffer); + + dataOut->setData(outbuffer->getData(), outbuffer->getLength()); + } + else { + dataOut->setLength(dataIn->getLength()); + memset(dataOut->getData(), 0, dataOut->getLength()); + m_number_of_runs++; + } + + return dataOut->getLength(); + +} + +void PipelinedModCodec::process_thread() +{ + set_thread_name(name()); + + while (m_running) { + std::shared_ptr dataIn; + m_input_queue.wait_and_pop(dataIn); + + if (!dataIn or dataIn->getLength() == 0) { + break; + } + + std::shared_ptr dataOut = std::make_shared(); + dataOut->setLength(dataIn->getLength()); + + if (internal_process(dataIn.get(), dataOut.get()) == 0) { + m_running = false; + } + + m_output_queue.push(dataOut); + } + + m_running = false; +} -- cgit v1.2.3