diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 2 | ||||
-rw-r--r-- | src/ConfigParser.h | 1 | ||||
-rw-r--r-- | src/DabMod.cpp | 2 | ||||
-rw-r--r-- | src/DabModulator.cpp | 6 | ||||
-rw-r--r-- | src/DabModulator.h | 4 | ||||
-rw-r--r-- | src/FIRFilter.cpp | 274 | ||||
-rw-r--r-- | src/FIRFilter.h | 69 | ||||
-rw-r--r-- | src/GainControl.cpp | 119 | ||||
-rw-r--r-- | src/GainControl.h | 29 | ||||
-rw-r--r-- | src/ModPlugin.cpp | 79 | ||||
-rw-r--r-- | src/ModPlugin.h | 37 | ||||
-rw-r--r-- | src/Resampler.cpp | 4 |
12 files changed, 355 insertions, 271 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index a1e6e34..393f58a 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -154,6 +154,8 @@ static void parse_configfile( // modulator parameters: const string gainMode_setting = pt.get("modulator.gainmode", "var"); mod_settings.gainMode = parse_gainmode(gainMode_setting); + mod_settings.gainmodeVariance = pt.get("modulator.normalise_variance", + mod_settings.gainmodeVariance); mod_settings.dabMode = pt.get("modulator.mode", mod_settings.dabMode); mod_settings.clockRate = pt.get("modulator.dac_clk_rate", (size_t)0); diff --git a/src/ConfigParser.h b/src/ConfigParser.h index fe48f01..02b798a 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -58,6 +58,7 @@ struct mod_settings_t { float digitalgain = 1.0f; float normalise = 1.0f; GainMode gainMode = GainMode::GAIN_VAR; + float gainmodeVariance = 4.0f; // To handle the timestamp offset of the modulator unsigned tist_delay_stages = 0; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 4fedac4..4e4cdab 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -321,6 +321,7 @@ int launch_modulator(int argc, char* argv[]) mod_settings.gainMode, mod_settings.digitalgain, mod_settings.normalise, + mod_settings.gainmodeVariance, mod_settings.filterTapsFilename); if (format_converter) { @@ -424,6 +425,7 @@ int launch_modulator(int argc, char* argv[]) mod_settings.gainMode, mod_settings.digitalgain, mod_settings.normalise, + mod_settings.gainmodeVariance, mod_settings.filterTapsFilename); if (format_converter) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 678d175..c41b8fc 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -60,6 +60,7 @@ DabModulator::DabModulator( unsigned outputRate, unsigned clockRate, unsigned dabMode, GainMode gainMode, float& digGain, float normalise, + float gainmodeVariance, const std::string& filterTapsFilename ) : ModInput(), @@ -69,6 +70,7 @@ DabModulator::DabModulator( myGainMode(gainMode), myDigGain(digGain), myNormalise(normalise), + myGainmodeVariance(gainmodeVariance), myEtiSource(etiSource), myFlowgraph(NULL), myFilterTapsFilename(filterTapsFilename), @@ -201,7 +203,7 @@ int DabModulator::process(Buffer* dataOut) (1 + myNbSymbols), myNbCarriers, mySpacing); auto cifGain = make_shared<GainControl>( - mySpacing, myGainMode, myDigGain, myNormalise); + mySpacing, myGainMode, myDigGain, myNormalise, myGainmodeVariance); rcs.enrol(cifGain.get()); diff --git a/src/DabModulator.h b/src/DabModulator.h index d768875..c9bdbe1 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -54,6 +54,7 @@ public: unsigned outputRate, unsigned clockRate, unsigned dabMode, GainMode gainMode, float& digGain, float normalise, + float gainmodeVariance, const std::string& filterTapsFilename); DabModulator(const DabModulator& other) = delete; DabModulator& operator=(const DabModulator& other) = delete; @@ -74,6 +75,7 @@ protected: GainMode myGainMode; float& myDigGain; float myNormalise; + float myGainmodeVariance; EtiSource& myEtiSource; Flowgraph* myFlowgraph; OutputMemory* myOutput; diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index 2feb702..4296822 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -47,8 +47,6 @@ using namespace std; -#include <sys/time.h> - /* This is the FIR Filter calculated with the doc/fir-filter/generate-filter.py script * with settings * gain = 1 @@ -72,26 +70,75 @@ static const std::array<float, 45> default_filter_taps({ 0.00184351124335, -0.000187368263141, -0.000840645749122, 0.00120703084394, -0.00110450468492}); -void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) + +FIRFilter::FIRFilter(const std::string& taps_file) : + PipelinedModCodec(), + RemoteControllable("firfilter"), + m_taps_file(taps_file) { - size_t i; - struct timespec time_start; - struct timespec time_end; + PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n", + taps_file.c_str(), this); - set_realtime_prio(1); - set_thread_name("firfilter"); + RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps."); + RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded."); - // This thread creates the dataOut buffer, and deletes - // the incoming buffer + load_filter_taps(m_taps_file); - while(running) { - std::shared_ptr<Buffer> dataIn; - fwd->input_queue.wait_and_pop(dataIn); + start_pipeline_thread(); +} - std::shared_ptr<Buffer> dataOut = make_shared<Buffer>(); - dataOut->setLength(dataIn->getLength()); +void FIRFilter::load_filter_taps(const std::string &tapsFile) +{ + std::vector<float> filter_taps; + if (tapsFile == "default") { + std::copy(default_filter_taps.begin(), default_filter_taps.end(), + std::back_inserter(filter_taps)); + } + else { + std::ifstream taps_fstream(tapsFile.c_str()); + if(!taps_fstream) { + fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str()); + throw std::runtime_error("FIRFilter: Could not open file with taps! "); + } + int n_taps; + taps_fstream >> n_taps; + + if (n_taps <= 0) { + fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n"); + throw std::runtime_error("FIRFilter: taps file has invalid format."); + } - PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength()); + if (n_taps > 100) { + fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n"); + } + + fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps); + + filter_taps.resize(n_taps); + + int n; + for (n = 0; n < n_taps; n++) { + taps_fstream >> filter_taps[n]; + PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] ); + if (taps_fstream.eof()) { + fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\ + "after %d taps !\n", tapsFile.c_str(), n_taps, n); + throw std::runtime_error("FIRFilter: filtertaps file invalid ! "); + } + } + } + + { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + m_taps = filter_taps; + } +} + + +int FIRFilter::internal_process(Buffer* const dataIn, Buffer* dataOut) +{ + size_t i; #if __SSE__ // The SSE accelerated version cannot work on the complex values, @@ -108,18 +155,16 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) throw std::runtime_error("FIRFilterWorker: out not aligned"); } - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); - __m128 SSEout; __m128 SSEtaps; __m128 SSEin; { - boost::mutex::scoped_lock lock(fwd->taps_mutex); + std::lock_guard<std::mutex> lock(m_taps_mutex); - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) { + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) { SSEout = _mm_setr_ps(0,0,0,0); - for (size_t j = 0; j < fwd->taps.size(); j++) { + for (size_t j = 0; j < m_taps.size(); j++) { if ((uintptr_t)(&in[i+2*j]) % 16 == 0) { SSEin = _mm_load_ps(&in[i+2*j]); //faster when aligned } @@ -127,7 +172,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) SSEin = _mm_loadu_ps(&in[i+2*j]); } - SSEtaps = _mm_load1_ps(&fwd->taps[j]); + SSEtaps = _mm_load1_ps(&m_taps[j]); SSEout = _mm_add_ps(SSEout, _mm_mul_ps(SSEin, SSEtaps)); } @@ -137,11 +182,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } } - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_end); #else // No SSE ? Loop unrolling should make this faster. As for the SSE, @@ -150,22 +194,20 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) float* out = reinterpret_cast<float*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(float); - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); - { - boost::mutex::scoped_lock lock(fwd->taps_mutex); + std::lock_guard<std::mutex> lock(m_taps_mutex); // Convolve by aligning both frame and taps at zero. - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) { + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) { out[i] = 0.0; out[i+1] = 0.0; out[i+2] = 0.0; out[i+3] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i + 2*j] * fwd->taps[j]; - out[i+1] += in[i+1 + 2*j] * fwd->taps[j]; - out[i+2] += in[i+2 + 2*j] * fwd->taps[j]; - out[i+3] += in[i+3 + 2*j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i + 2*j] * m_taps[j]; + out[i+1] += in[i+1 + 2*j] * m_taps[j]; + out[i+2] += in[i+2 + 2*j] * m_taps[j]; + out[i+3] += in[i+3 + 2*j] * m_taps[j]; } } @@ -175,14 +217,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } } - - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_end); - - #endif // The following implementations are for debugging only. @@ -192,18 +230,20 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) float* out = reinterpret_cast<float*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(float); - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 1) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 1) { out[i] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+2*j] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } @@ -214,24 +254,26 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(complexf); - for (i = 0; i < sizeIn - fwd->taps.size(); i += 4) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - m_taps.size(); i += 4) { out[i] = 0.0; out[i+1] = 0.0; out[i+2] = 0.0; out[i+3] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+j ] * fwd->taps[j]; - out[i+1] += in[i+1+j] * fwd->taps[j]; - out[i+2] += in[i+2+j] * fwd->taps[j]; - out[i+3] += in[i+3+j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+j ] * m_taps[j]; + out[i+1] += in[i+1+j] * m_taps[j]; + out[i+2] += in[i+2+j] * m_taps[j]; + out[i+3] += in[i+3+j] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; j+i < sizeIn; j++) { - out[i] += in[i+j] * fwd->taps[j]; + out[i] += in[i+j] * m_taps[j]; } } @@ -241,132 +283,25 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(complexf); - for (i = 0; i < sizeIn - fwd->taps.size(); i += 1) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - m_taps.size(); i += 1) { out[i] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+j ] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+j ] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; j+i < sizeIn; j++) { - out[i] += in[i+j] * fwd->taps[j]; + out[i] += in[i+j] * m_taps[j]; } } #endif - calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L + - time_end.tv_nsec - time_start.tv_nsec; - fwd->output_queue.push(dataOut); - } -} - - -FIRFilter::FIRFilter(const std::string& taps_file) : - ModCodec(), - RemoteControllable("firfilter"), - myTapsFile(taps_file) -{ - PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n", - taps_file.c_str(), this); - - RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps."); - RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded."); - - number_of_runs = 0; - - load_filter_taps(myTapsFile); - - PDEBUG("FIRFilter: Starting worker\n" ); - worker.start(&firwd); -} - -void FIRFilter::load_filter_taps(const std::string &tapsFile) -{ - std::vector<float> filter_taps; - if (tapsFile == "default") { - std::copy(default_filter_taps.begin(), default_filter_taps.end(), - std::back_inserter(filter_taps)); - } - else { - std::ifstream taps_fstream(tapsFile.c_str()); - if(!taps_fstream) { - fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str()); - throw std::runtime_error("FIRFilter: Could not open file with taps! "); - } - int n_taps; - taps_fstream >> n_taps; - - if (n_taps <= 0) { - fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n"); - throw std::runtime_error("FIRFilter: taps file has invalid format."); - } - - if (n_taps > 100) { - fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n"); - } - - fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps); - - filter_taps.resize(n_taps); - - int n; - for (n = 0; n < n_taps; n++) { - taps_fstream >> filter_taps[n]; - PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] ); - if (taps_fstream.eof()) { - fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\ - "after %d taps !\n", tapsFile.c_str(), n_taps, n); - throw std::runtime_error("FIRFilter: filtertaps file invalid ! "); - } - } - } - - { - boost::mutex::scoped_lock lock(firwd.taps_mutex); - - firwd.taps = filter_taps; - } -} - - -FIRFilter::~FIRFilter() -{ - PDEBUG("FIRFilter::~FIRFilter() @ %p\n", this); - - worker.stop(); -} - - -int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut) -{ - PDEBUG("FIRFilter::process(dataIn: %p, dataOut: %p)\n", - dataIn, dataOut); - - // This thread creates the dataIn buffer, and deletes - // the outgoing buffer - - std::shared_ptr<Buffer> inbuffer = - make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); - - firwd.input_queue.push(inbuffer); - - if (number_of_runs > 2) { - std::shared_ptr<Buffer> outbuffer; - firwd.output_queue.wait_and_pop(outbuffer); - - dataOut->setData(outbuffer->getData(), outbuffer->getLength()); - } - else { - dataOut->setLength(dataIn->getLength()); - memset(dataOut->getData(), 0, dataOut->getLength()); - number_of_runs++; - } - return dataOut->getLength(); - } void FIRFilter::set_parameter(const string& parameter, const string& value) @@ -380,7 +315,7 @@ void FIRFilter::set_parameter(const string& parameter, const string& value) else if (parameter == "tapsfile") { try { load_filter_taps(value); - myTapsFile = value; + m_taps_file = value; } catch (std::runtime_error &e) { throw ParameterError(e.what()); @@ -388,7 +323,8 @@ void FIRFilter::set_parameter(const string& parameter, const string& value) } else { stringstream ss; - ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss.str()); } } @@ -397,16 +333,16 @@ const string FIRFilter::get_parameter(const string& parameter) const { stringstream ss; if (parameter == "ntaps") { - ss << firwd.taps.size(); + ss << m_taps.size(); } else if (parameter == "tapsfile") { - ss << myTapsFile; + ss << m_taps_file; } else { - ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss.str()); } return ss.str(); - } diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 209d79d..fb6b4d6 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -2,8 +2,10 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Written by - 2012, Matthias P. Braendli, matthias.braendli@mpb.li + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -28,7 +30,6 @@ # include <config.h> #endif -#include <boost/thread.hpp> #include "RemoteControl.h" #include "ModPlugin.h" @@ -37,6 +38,7 @@ #include <sys/types.h> #include <complex> +#include <thread> #include <vector> #include <time.h> #include <cstdio> @@ -47,62 +49,11 @@ typedef std::complex<float> complexf; -struct FIRFilterWorkerData { - /* Thread-safe queues to give data to and get data from - * the worker - */ - ThreadsafeQueue<std::shared_ptr<Buffer> > input_queue; - ThreadsafeQueue<std::shared_ptr<Buffer> > output_queue; - - /* Remote-control can change the taps while the filter - * runs. This lock makes sure nothing bad happens when - * the taps are being modified - */ - mutable boost::mutex taps_mutex; - std::vector<float> taps; -}; - -class FIRFilterWorker { - public: - FIRFilterWorker () { - running = false; - calculationTime = 0; - } - - ~FIRFilterWorker() { - PDEBUG("~FIRFilterWorker: Total elapsed thread time filtering: %zu\n", calculationTime); - } - - void start(struct FIRFilterWorkerData *firworkerdata) { - running = true; - fir_thread = boost::thread(&FIRFilterWorker::process, this, firworkerdata); - } - - void stop() { - running = false; - fir_thread.interrupt(); - fir_thread.join(); - } - - void process(struct FIRFilterWorkerData *fwd); - - - private: - time_t calculationTime; - bool running; - boost::thread fir_thread; -}; - - -class FIRFilter : public ModCodec, public RemoteControllable +class FIRFilter : public PipelinedModCodec, public RemoteControllable { public: FIRFilter(const std::string& taps_file); - virtual ~FIRFilter(); - FIRFilter(const FIRFilter&); - FIRFilter& operator=(const FIRFilter&); - int process(Buffer* const dataIn, Buffer* dataOut); const char* name() { return "FIRFilter"; } /******* REMOTE CONTROL ********/ @@ -114,12 +65,12 @@ public: protected: + virtual int internal_process(Buffer* const dataIn, Buffer* dataOut); void load_filter_taps(const std::string &tapsFile); - std::string myTapsFile; + std::string m_taps_file; - FIRFilterWorker worker; - int number_of_runs; - struct FIRFilterWorkerData firwd; + mutable std::mutex m_taps_mutex; + std::vector<float> m_taps; }; diff --git a/src/GainControl.cpp b/src/GainControl.cpp index 4a05be1..f363d20 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -43,12 +43,14 @@ union __u128 { using namespace std; +static float var_variance; GainControl::GainControl(size_t framesize, GainMode mode, float& digGain, - float normalise) : - ModCodec(), + float normalise, + float varVariance) : + PipelinedModCodec(), RemoteControllable("gain"), #ifdef __SSE__ m_frameSize(framesize * sizeof(complexf) / sizeof(__m128)), @@ -56,40 +58,27 @@ GainControl::GainControl(size_t framesize, m_frameSize(framesize), #endif m_digGain(digGain), - m_normalise(normalise) + m_normalise(normalise), + m_var_variance_rc(varVariance), + m_gainmode(mode), + m_mutex() { PDEBUG("GainControl::GainControl(%zu, %zu) @ %p\n", framesize, (size_t)mode, this); /* register the parameters that can be remote controlled */ RC_ADD_PARAMETER(digital, "Digital Gain"); + RC_ADD_PARAMETER(mode, "Gainmode (fix|max|var)"); + RC_ADD_PARAMETER(var, "Variance setting for gainmode var (default: 4)"); - switch(mode) { - case GainMode::GAIN_FIX: - PDEBUG("Gain mode: fix\n"); - computeGain = computeGainFix; - break; - case GainMode::GAIN_MAX: - PDEBUG("Gain mode: max\n"); - computeGain = computeGainMax; - break; - case GainMode::GAIN_VAR: - PDEBUG("Gain mode: var\n"); - computeGain = computeGainVar; - break; - default: - throw std::runtime_error( - "GainControl::GainControl invalid computation gain mode!"); - } + start_pipeline_thread(); } - GainControl::~GainControl() { PDEBUG("GainControl::~GainControl() @ %p\n", this); } - -int GainControl::process(Buffer* const dataIn, Buffer* dataOut) +int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut) { PDEBUG("GainControl::process" "(dataIn: %p, dataOut: %p)\n", @@ -98,12 +87,41 @@ int GainControl::process(Buffer* const dataIn, Buffer* dataOut) dataOut->setLength(dataIn->getLength()); #ifdef __SSE__ + __m128 (*computeGain)(const __m128* in, size_t sizeIn); +#else + float (*computeGain)(const complexf* in, size_t sizeIn); +#endif + { + std::lock_guard<std::mutex> lock(m_mutex); + + var_variance = m_var_variance_rc; + + switch (m_gainmode) { + case GainMode::GAIN_FIX: + PDEBUG("Gain mode: fix\n"); + computeGain = computeGainFix; + break; + case GainMode::GAIN_MAX: + PDEBUG("Gain mode: max\n"); + computeGain = computeGainMax; + break; + case GainMode::GAIN_VAR: + PDEBUG("Gain mode: var\n"); + computeGain = computeGainVar; + break; + default: + throw std::logic_error("Internal error: invalid gainmode"); + } + } + +#ifdef __SSE__ const __m128* in = reinterpret_cast<const __m128*>(dataIn->getData()); __m128* out = reinterpret_cast<__m128*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(__m128); size_t sizeOut = dataOut->getLength() / sizeof(__m128); __u128 gain128; + if ((sizeIn % m_frameSize) != 0) { PDEBUG("%zu != %zu\n", sizeIn, m_frameSize); throw std::runtime_error( @@ -287,7 +305,7 @@ __m128 GainControl::computeGainVar(const __m128* in, size_t sizeIn) var128.m = _mm_sqrt_ps(var128.m); PDEBUG("********** Var: %10f + %10fj, %10f + %10fj **********\n", var128.f[0], var128.f[1], var128.f[2], var128.f[3]); - var128.m = _mm_mul_ps(var128.m, _mm_set1_ps(4.0f)); + var128.m = _mm_mul_ps(var128.m, _mm_set1_ps(var_variance)); PDEBUG("********** 4*Var: %10f + %10fj, %10f + %10fj **********\n", var128.f[0], var128.f[1], var128.f[2], var128.f[3]); @@ -446,7 +464,7 @@ float GainControl::computeGainVar(const complexf* in, size_t sizeIn) complexf var(sqrt(tmpvar.real()), sqrt(tmpvar.imag())); PDEBUG("********** Var: %10f + %10fj **********\n", var.real(), var.imag()); - var = var * 4.0f; + var = var * var_variance; PDEBUG("********** 4*Var: %10f + %10fj **********\n", var.real(), var.imag()); //////////////////////////////////////////////////////////////////////////// @@ -480,6 +498,39 @@ void GainControl::set_parameter(const string& parameter, const string& value) ss >> new_factor; m_digGain = new_factor; } + else if (parameter == "mode") { + string new_mode; + ss >> new_mode; + std::transform(new_mode.begin(), new_mode.end(), new_mode.begin(), + [](const char c) { return std::tolower(c); } ); + + GainMode m; + if (new_mode == "fix") { + m = GainMode::GAIN_FIX; + } + else if (new_mode == "max") { + m = GainMode::GAIN_MAX; + } + else if (new_mode == "var") { + m = GainMode::GAIN_VAR; + } + else { + throw ParameterError("Gainmode " + new_mode + " unknown"); + } + + { + std::lock_guard<std::mutex> lock(m_mutex); + m_gainmode = m; + } + } + else if (parameter == "var") { + float newvar = 0; + ss >> newvar; + { + std::lock_guard<std::mutex> lock(m_mutex); + m_var_variance_rc = newvar; + } + } else { stringstream ss; ss << "Parameter '" << parameter @@ -494,6 +545,22 @@ const string GainControl::get_parameter(const string& parameter) const if (parameter == "digital") { ss << std::fixed << m_digGain; } + else if (parameter == "mode") { + switch (m_gainmode) { + case GainMode::GAIN_FIX: + ss << "fix"; + break; + case GainMode::GAIN_MAX: + ss << "max"; + break; + case GainMode::GAIN_VAR: + ss << "var"; + break; + } + } + else if (parameter == "var") { + ss << std::fixed << m_var_variance_rc; + } else { ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); diff --git a/src/GainControl.h b/src/GainControl.h index a8f8233..e8f1be9 100644 --- a/src/GainControl.h +++ b/src/GainControl.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2014 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -37,6 +37,7 @@ #include <sys/types.h> #include <complex> #include <string> +#include <mutex> #ifdef __SSE__ # include <xmmintrin.h> #endif @@ -46,40 +47,48 @@ typedef std::complex<float> complexf; enum class GainMode { GAIN_FIX = 0, GAIN_MAX = 1, GAIN_VAR = 2 }; -class GainControl : public ModCodec, public RemoteControllable +class GainControl : public PipelinedModCodec, public RemoteControllable { public: GainControl(size_t framesize, GainMode mode, float& digGain, - float normalise); + float normalise, + float varVariance); virtual ~GainControl(); GainControl(const GainControl&); GainControl& operator=(const GainControl&); - int process(Buffer* const dataIn, Buffer* dataOut); - const char* name() { return "GainControl"; } + const char* name() override { return "GainControl"; } /* Functions for the remote control */ /* Base function to set parameters. */ virtual void set_parameter(const std::string& parameter, - const std::string& value); + const std::string& value) override; /* Getting a parameter always returns a string. */ - virtual const std::string get_parameter(const std::string& parameter) const; + virtual const std::string get_parameter( + const std::string& parameter) const override; protected: + virtual int internal_process( + Buffer* const dataIn, Buffer* dataOut) override; + size_t m_frameSize; float& m_digGain; - float m_normalise; + float m_normalise; + + // The following variables are accessed from the RC thread + float m_var_variance_rc; + GainMode m_gainmode; + mutable std::mutex m_mutex; + #ifdef __SSE__ - __m128 (*computeGain)(const __m128* in, size_t sizeIn); __m128 static computeGainFix(const __m128* in, size_t sizeIn); __m128 static computeGainMax(const __m128* in, size_t sizeIn); __m128 static computeGainVar(const __m128* in, size_t sizeIn); #else - float (*computeGain)(const complexf* in, size_t sizeIn); float static computeGainFix(const complexf* in, size_t sizeIn); float static computeGainMax(const complexf* in, size_t sizeIn); float static computeGainVar(const complexf* in, size_t sizeIn); diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index 775b284..c39d883 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -26,6 +26,7 @@ #include "ModPlugin.h" #include "PcDebug.h" +#include "Utils.h" #include <stdexcept> #include <string> @@ -71,3 +72,79 @@ int ModOutput::process( return process(dataIn[0]); } +PipelinedModCodec::PipelinedModCodec() : + ModCodec(), + m_number_of_runs(0), + m_input_queue(), + m_output_queue(), + m_running(false), + m_thread() +{ +} + +PipelinedModCodec::~PipelinedModCodec() +{ + m_input_queue.push({}); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void PipelinedModCodec::start_pipeline_thread() +{ + m_running = true; + m_thread = std::thread(&PipelinedModCodec::process_thread, this); +} + +int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut) +{ + if (!m_running) { + return 0; + } + + std::shared_ptr<Buffer> inbuffer = + std::make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); + + m_input_queue.push(inbuffer); + + if (m_number_of_runs > 0) { + std::shared_ptr<Buffer> outbuffer; + m_output_queue.wait_and_pop(outbuffer); + + dataOut->setData(outbuffer->getData(), outbuffer->getLength()); + } + else { + dataOut->setLength(dataIn->getLength()); + memset(dataOut->getData(), 0, dataOut->getLength()); + m_number_of_runs++; + } + + return dataOut->getLength(); + +} + +void PipelinedModCodec::process_thread() +{ + set_thread_name(name()); + set_realtime_prio(1); + + while (m_running) { + std::shared_ptr<Buffer> dataIn; + m_input_queue.wait_and_pop(dataIn); + + if (!dataIn or dataIn->getLength() == 0) { + break; + } + + std::shared_ptr<Buffer> dataOut = std::make_shared<Buffer>(); + dataOut->setLength(dataIn->getLength()); + + if (internal_process(dataIn.get(), dataOut.get()) == 0) { + m_running = false; + } + + m_output_queue.push(dataOut); + } + + m_running = false; +} diff --git a/src/ModPlugin.h b/src/ModPlugin.h index bdc3843..d3aa780 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -2,7 +2,7 @@ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -32,9 +32,13 @@ #include "Buffer.h" +#include "ThreadsafeQueue.h" #include <sys/types.h> #include <vector> +#include <memory> +#include <thread> +#include <atomic> class ModPlugin { @@ -65,6 +69,37 @@ public: virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0; }; +class PipelinedModCodec : public ModCodec +{ +public: + PipelinedModCodec(); + PipelinedModCodec(const PipelinedModCodec&) = delete; + PipelinedModCodec& operator=(const PipelinedModCodec&) = delete; + PipelinedModCodec(PipelinedModCodec&&) = delete; + PipelinedModCodec& operator=(PipelinedModCodec&&) = delete; + ~PipelinedModCodec(); + + virtual int process(Buffer* const dataIn, Buffer* dataOut) final; + virtual const char* name() = 0; + +protected: + // Once the instance implementing PipelinedModCodec has been constructed, + // it must call start_pipeline_thread() + void start_pipeline_thread(void); + virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0; + +private: + size_t m_number_of_runs; + + ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue; + ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; + + std::atomic<bool> m_running; + std::thread m_thread; + void process_thread(void); +}; + + /* Muxes are N-input 1-output flowgraph plugins */ class ModMux : public ModPlugin { diff --git a/src/Resampler.cpp b/src/Resampler.cpp index ee2b865..8786e91 100644 --- a/src/Resampler.cpp +++ b/src/Resampler.cpp @@ -76,9 +76,9 @@ Resampler::Resampler(size_t inputRate, size_t outputRate, size_t resolution) : PDEBUG(" FFT size in: %zu, FFT size out: %zu\n", myFftSizeIn, myFftSizeOut); if (myFftSizeIn > myFftSizeOut) { - myFactor = 1.0f / myFftSizeIn; + myFactor = 1.0f / myFftSizeIn * outputRate / inputRate; } else { - myFactor = 1.0f / myFftSizeOut; + myFactor = 1.0f / myFftSizeOut * outputRate / inputRate; } myWindow = (float*)memalign(16, myFftSizeIn * sizeof(float)); |