diff options
-rw-r--r-- | src/FIRFilter.cpp | 264 | ||||
-rw-r--r-- | src/FIRFilter.h | 69 |
2 files changed, 113 insertions, 220 deletions
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index 2feb702..6331b73 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -2,7 +2,7 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2017 Matthias P. Braendli, matthias.braendli@mpb.li http://opendigitalradio.org @@ -72,26 +72,73 @@ static const std::array<float, 45> default_filter_taps({ 0.00184351124335, -0.000187368263141, -0.000840645749122, 0.00120703084394, -0.00110450468492}); -void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) + +FIRFilter::FIRFilter(const std::string& taps_file) : + PipelinedModCodec(), + RemoteControllable("firfilter"), + m_taps_file(taps_file) { - size_t i; - struct timespec time_start; - struct timespec time_end; + PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n", + taps_file.c_str(), this); + + RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps."); + RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded."); + + load_filter_taps(m_taps_file); +} - set_realtime_prio(1); - set_thread_name("firfilter"); +void FIRFilter::load_filter_taps(const std::string &tapsFile) +{ + std::vector<float> filter_taps; + if (tapsFile == "default") { + std::copy(default_filter_taps.begin(), default_filter_taps.end(), + std::back_inserter(filter_taps)); + } + else { + std::ifstream taps_fstream(tapsFile.c_str()); + if(!taps_fstream) { + fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str()); + throw std::runtime_error("FIRFilter: Could not open file with taps! "); + } + int n_taps; + taps_fstream >> n_taps; - // This thread creates the dataOut buffer, and deletes - // the incoming buffer + if (n_taps <= 0) { + fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n"); + throw std::runtime_error("FIRFilter: taps file has invalid format."); + } - while(running) { - std::shared_ptr<Buffer> dataIn; - fwd->input_queue.wait_and_pop(dataIn); + if (n_taps > 100) { + fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n"); + } - std::shared_ptr<Buffer> dataOut = make_shared<Buffer>(); - dataOut->setLength(dataIn->getLength()); + fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps); - PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength()); + filter_taps.resize(n_taps); + + int n; + for (n = 0; n < n_taps; n++) { + taps_fstream >> filter_taps[n]; + PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] ); + if (taps_fstream.eof()) { + fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\ + "after %d taps !\n", tapsFile.c_str(), n_taps, n); + throw std::runtime_error("FIRFilter: filtertaps file invalid ! "); + } + } + } + + { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + m_taps = filter_taps; + } +} + + +int FIRFilter::internal_process(Buffer* const dataIn, Buffer* dataOut) +{ + size_t i; #if __SSE__ // The SSE accelerated version cannot work on the complex values, @@ -108,18 +155,16 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) throw std::runtime_error("FIRFilterWorker: out not aligned"); } - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); - __m128 SSEout; __m128 SSEtaps; __m128 SSEin; { - boost::mutex::scoped_lock lock(fwd->taps_mutex); + std::lock_guard<std::mutex> lock(m_taps_mutex); - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) { + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) { SSEout = _mm_setr_ps(0,0,0,0); - for (size_t j = 0; j < fwd->taps.size(); j++) { + for (size_t j = 0; j < m_taps.size(); j++) { if ((uintptr_t)(&in[i+2*j]) % 16 == 0) { SSEin = _mm_load_ps(&in[i+2*j]); //faster when aligned } @@ -127,7 +172,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) SSEin = _mm_loadu_ps(&in[i+2*j]); } - SSEtaps = _mm_load1_ps(&fwd->taps[j]); + SSEtaps = _mm_load1_ps(&m_taps[j]); SSEout = _mm_add_ps(SSEout, _mm_mul_ps(SSEin, SSEtaps)); } @@ -137,11 +182,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } } - clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_end); #else // No SSE ? Loop unrolling should make this faster. As for the SSE, @@ -153,19 +197,19 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start); { - boost::mutex::scoped_lock lock(fwd->taps_mutex); + std::lock_guard<std::mutex> lock(m_taps_mutex); // Convolve by aligning both frame and taps at zero. - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) { + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) { out[i] = 0.0; out[i+1] = 0.0; out[i+2] = 0.0; out[i+3] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i + 2*j] * fwd->taps[j]; - out[i+1] += in[i+1 + 2*j] * fwd->taps[j]; - out[i+2] += in[i+2 + 2*j] * fwd->taps[j]; - out[i+3] += in[i+3 + 2*j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i + 2*j] * m_taps[j]; + out[i+1] += in[i+1 + 2*j] * m_taps[j]; + out[i+2] += in[i+2 + 2*j] * m_taps[j]; + out[i+3] += in[i+3 + 2*j] * m_taps[j]; } } @@ -175,7 +219,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } } @@ -192,18 +236,20 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) float* out = reinterpret_cast<float*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(float); - for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 1) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - 2*m_taps.size(); i += 1) { out[i] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+2*j] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; i+2*j < sizeIn; j++) { - out[i] += in[i+2*j] * fwd->taps[j]; + out[i] += in[i+2*j] * m_taps[j]; } } @@ -214,24 +260,26 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(complexf); - for (i = 0; i < sizeIn - fwd->taps.size(); i += 4) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - m_taps.size(); i += 4) { out[i] = 0.0; out[i+1] = 0.0; out[i+2] = 0.0; out[i+3] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+j ] * fwd->taps[j]; - out[i+1] += in[i+1+j] * fwd->taps[j]; - out[i+2] += in[i+2+j] * fwd->taps[j]; - out[i+3] += in[i+3+j] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+j ] * m_taps[j]; + out[i+1] += in[i+1+j] * m_taps[j]; + out[i+2] += in[i+2+j] * m_taps[j]; + out[i+3] += in[i+3+j] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; j+i < sizeIn; j++) { - out[i] += in[i+j] * fwd->taps[j]; + out[i] += in[i+j] * m_taps[j]; } } @@ -241,132 +289,25 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd) complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); size_t sizeIn = dataIn->getLength() / sizeof(complexf); - for (i = 0; i < sizeIn - fwd->taps.size(); i += 1) { + std::lock_guard<std::mutex> lock(m_taps_mutex); + + for (i = 0; i < sizeIn - m_taps.size(); i += 1) { out[i] = 0.0; - for (size_t j = 0; j < fwd->taps.size(); j++) { - out[i] += in[i+j ] * fwd->taps[j]; + for (size_t j = 0; j < m_taps.size(); j++) { + out[i] += in[i+j ] * m_taps[j]; } } for (; i < sizeIn; i++) { out[i] = 0.0; for (int j = 0; j+i < sizeIn; j++) { - out[i] += in[i+j] * fwd->taps[j]; + out[i] += in[i+j] * m_taps[j]; } } #endif - calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L + - time_end.tv_nsec - time_start.tv_nsec; - fwd->output_queue.push(dataOut); - } -} - - -FIRFilter::FIRFilter(const std::string& taps_file) : - ModCodec(), - RemoteControllable("firfilter"), - myTapsFile(taps_file) -{ - PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n", - taps_file.c_str(), this); - - RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps."); - RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded."); - - number_of_runs = 0; - - load_filter_taps(myTapsFile); - - PDEBUG("FIRFilter: Starting worker\n" ); - worker.start(&firwd); -} - -void FIRFilter::load_filter_taps(const std::string &tapsFile) -{ - std::vector<float> filter_taps; - if (tapsFile == "default") { - std::copy(default_filter_taps.begin(), default_filter_taps.end(), - std::back_inserter(filter_taps)); - } - else { - std::ifstream taps_fstream(tapsFile.c_str()); - if(!taps_fstream) { - fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str()); - throw std::runtime_error("FIRFilter: Could not open file with taps! "); - } - int n_taps; - taps_fstream >> n_taps; - - if (n_taps <= 0) { - fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n"); - throw std::runtime_error("FIRFilter: taps file has invalid format."); - } - - if (n_taps > 100) { - fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n"); - } - - fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps); - - filter_taps.resize(n_taps); - - int n; - for (n = 0; n < n_taps; n++) { - taps_fstream >> filter_taps[n]; - PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] ); - if (taps_fstream.eof()) { - fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\ - "after %d taps !\n", tapsFile.c_str(), n_taps, n); - throw std::runtime_error("FIRFilter: filtertaps file invalid ! "); - } - } - } - - { - boost::mutex::scoped_lock lock(firwd.taps_mutex); - - firwd.taps = filter_taps; - } -} - - -FIRFilter::~FIRFilter() -{ - PDEBUG("FIRFilter::~FIRFilter() @ %p\n", this); - - worker.stop(); -} - - -int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut) -{ - PDEBUG("FIRFilter::process(dataIn: %p, dataOut: %p)\n", - dataIn, dataOut); - - // This thread creates the dataIn buffer, and deletes - // the outgoing buffer - - std::shared_ptr<Buffer> inbuffer = - make_shared<Buffer>(dataIn->getLength(), dataIn->getData()); - - firwd.input_queue.push(inbuffer); - - if (number_of_runs > 2) { - std::shared_ptr<Buffer> outbuffer; - firwd.output_queue.wait_and_pop(outbuffer); - - dataOut->setData(outbuffer->getData(), outbuffer->getLength()); - } - else { - dataOut->setLength(dataIn->getLength()); - memset(dataOut->getData(), 0, dataOut->getLength()); - number_of_runs++; - } - return dataOut->getLength(); - } void FIRFilter::set_parameter(const string& parameter, const string& value) @@ -380,7 +321,7 @@ void FIRFilter::set_parameter(const string& parameter, const string& value) else if (parameter == "tapsfile") { try { load_filter_taps(value); - myTapsFile = value; + m_taps_file = value; } catch (std::runtime_error &e) { throw ParameterError(e.what()); @@ -388,7 +329,8 @@ void FIRFilter::set_parameter(const string& parameter, const string& value) } else { stringstream ss; - ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss.str()); } } @@ -397,16 +339,16 @@ const string FIRFilter::get_parameter(const string& parameter) const { stringstream ss; if (parameter == "ntaps") { - ss << firwd.taps.size(); + ss << m_taps.size(); } else if (parameter == "tapsfile") { - ss << myTapsFile; + ss << m_taps_file; } else { - ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name(); + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); throw ParameterError(ss.str()); } return ss.str(); - } diff --git a/src/FIRFilter.h b/src/FIRFilter.h index 209d79d..1fe0004 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -2,8 +2,10 @@ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Written by - 2012, Matthias P. Braendli, matthias.braendli@mpb.li + Copyright (C) 2017 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -28,7 +30,6 @@ # include <config.h> #endif -#include <boost/thread.hpp> #include "RemoteControl.h" #include "ModPlugin.h" @@ -37,6 +38,7 @@ #include <sys/types.h> #include <complex> +#include <thread> #include <vector> #include <time.h> #include <cstdio> @@ -47,62 +49,11 @@ typedef std::complex<float> complexf; -struct FIRFilterWorkerData { - /* Thread-safe queues to give data to and get data from - * the worker - */ - ThreadsafeQueue<std::shared_ptr<Buffer> > input_queue; - ThreadsafeQueue<std::shared_ptr<Buffer> > output_queue; - - /* Remote-control can change the taps while the filter - * runs. This lock makes sure nothing bad happens when - * the taps are being modified - */ - mutable boost::mutex taps_mutex; - std::vector<float> taps; -}; - -class FIRFilterWorker { - public: - FIRFilterWorker () { - running = false; - calculationTime = 0; - } - - ~FIRFilterWorker() { - PDEBUG("~FIRFilterWorker: Total elapsed thread time filtering: %zu\n", calculationTime); - } - - void start(struct FIRFilterWorkerData *firworkerdata) { - running = true; - fir_thread = boost::thread(&FIRFilterWorker::process, this, firworkerdata); - } - - void stop() { - running = false; - fir_thread.interrupt(); - fir_thread.join(); - } - - void process(struct FIRFilterWorkerData *fwd); - - - private: - time_t calculationTime; - bool running; - boost::thread fir_thread; -}; - - -class FIRFilter : public ModCodec, public RemoteControllable +class FIRFilter : public PipelinedModCodec, public RemoteControllable { public: FIRFilter(const std::string& taps_file); - virtual ~FIRFilter(); - FIRFilter(const FIRFilter&); - FIRFilter& operator=(const FIRFilter&); - int process(Buffer* const dataIn, Buffer* dataOut); const char* name() { return "FIRFilter"; } /******* REMOTE CONTROL ********/ @@ -114,12 +65,12 @@ public: protected: + int internal_process(Buffer* const dataIn, Buffer* dataOut); void load_filter_taps(const std::string &tapsFile); - std::string myTapsFile; + std::string m_taps_file; - FIRFilterWorker worker; - int number_of_runs; - struct FIRFilterWorkerData firwd; + mutable std::mutex m_taps_mutex; + std::vector<float> m_taps; }; |