summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-03-17 14:14:31 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-03-17 14:14:31 +0100
commit937473ba892e1965be1614f95b32a22e3eb8ba4e (patch)
tree0ea627b3a7c828494d35ebd797d3fe0a5794bd87
parent987a31954ea574e04c2e79fe3765448c1b607e49 (diff)
downloaddabmod-937473ba892e1965be1614f95b32a22e3eb8ba4e.tar.gz
dabmod-937473ba892e1965be1614f95b32a22e3eb8ba4e.tar.bz2
dabmod-937473ba892e1965be1614f95b32a22e3eb8ba4e.zip
Simplify FIRFilter and make it use PipelinedModCodec
-rw-r--r--src/FIRFilter.cpp264
-rw-r--r--src/FIRFilter.h69
2 files changed, 113 insertions, 220 deletions
diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp
index 2feb702..6331b73 100644
--- a/src/FIRFilter.cpp
+++ b/src/FIRFilter.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -72,26 +72,73 @@ static const std::array<float, 45> default_filter_taps({
0.00184351124335, -0.000187368263141, -0.000840645749122, 0.00120703084394,
-0.00110450468492});
-void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
+
+FIRFilter::FIRFilter(const std::string& taps_file) :
+ PipelinedModCodec(),
+ RemoteControllable("firfilter"),
+ m_taps_file(taps_file)
{
- size_t i;
- struct timespec time_start;
- struct timespec time_end;
+ PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n",
+ taps_file.c_str(), this);
+
+ RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps.");
+ RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded.");
+
+ load_filter_taps(m_taps_file);
+}
- set_realtime_prio(1);
- set_thread_name("firfilter");
+void FIRFilter::load_filter_taps(const std::string &tapsFile)
+{
+ std::vector<float> filter_taps;
+ if (tapsFile == "default") {
+ std::copy(default_filter_taps.begin(), default_filter_taps.end(),
+ std::back_inserter(filter_taps));
+ }
+ else {
+ std::ifstream taps_fstream(tapsFile.c_str());
+ if(!taps_fstream) {
+ fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str());
+ throw std::runtime_error("FIRFilter: Could not open file with taps! ");
+ }
+ int n_taps;
+ taps_fstream >> n_taps;
- // This thread creates the dataOut buffer, and deletes
- // the incoming buffer
+ if (n_taps <= 0) {
+ fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n");
+ throw std::runtime_error("FIRFilter: taps file has invalid format.");
+ }
- while(running) {
- std::shared_ptr<Buffer> dataIn;
- fwd->input_queue.wait_and_pop(dataIn);
+ if (n_taps > 100) {
+ fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n");
+ }
- std::shared_ptr<Buffer> dataOut = make_shared<Buffer>();
- dataOut->setLength(dataIn->getLength());
+ fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps);
- PDEBUG("FIRFilterWorker: dataIn->getLength() %zu\n", dataIn->getLength());
+ filter_taps.resize(n_taps);
+
+ int n;
+ for (n = 0; n < n_taps; n++) {
+ taps_fstream >> filter_taps[n];
+ PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] );
+ if (taps_fstream.eof()) {
+ fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\
+ "after %d taps !\n", tapsFile.c_str(), n_taps, n);
+ throw std::runtime_error("FIRFilter: filtertaps file invalid ! ");
+ }
+ }
+ }
+
+ {
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
+
+ m_taps = filter_taps;
+ }
+}
+
+
+int FIRFilter::internal_process(Buffer* const dataIn, Buffer* dataOut)
+{
+ size_t i;
#if __SSE__
// The SSE accelerated version cannot work on the complex values,
@@ -108,18 +155,16 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
throw std::runtime_error("FIRFilterWorker: out not aligned");
}
- clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
-
__m128 SSEout;
__m128 SSEtaps;
__m128 SSEin;
{
- boost::mutex::scoped_lock lock(fwd->taps_mutex);
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
- for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) {
+ for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) {
SSEout = _mm_setr_ps(0,0,0,0);
- for (size_t j = 0; j < fwd->taps.size(); j++) {
+ for (size_t j = 0; j < m_taps.size(); j++) {
if ((uintptr_t)(&in[i+2*j]) % 16 == 0) {
SSEin = _mm_load_ps(&in[i+2*j]); //faster when aligned
}
@@ -127,7 +172,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
SSEin = _mm_loadu_ps(&in[i+2*j]);
}
- SSEtaps = _mm_load1_ps(&fwd->taps[j]);
+ SSEtaps = _mm_load1_ps(&m_taps[j]);
SSEout = _mm_add_ps(SSEout, _mm_mul_ps(SSEin, SSEtaps));
}
@@ -137,11 +182,10 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
for (; i < sizeIn; i++) {
out[i] = 0.0;
for (int j = 0; i+2*j < sizeIn; j++) {
- out[i] += in[i+2*j] * fwd->taps[j];
+ out[i] += in[i+2*j] * m_taps[j];
}
}
}
- clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_end);
#else
// No SSE ? Loop unrolling should make this faster. As for the SSE,
@@ -153,19 +197,19 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &time_start);
{
- boost::mutex::scoped_lock lock(fwd->taps_mutex);
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
// Convolve by aligning both frame and taps at zero.
- for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 4) {
+ for (i = 0; i < sizeIn - 2*m_taps.size(); i += 4) {
out[i] = 0.0;
out[i+1] = 0.0;
out[i+2] = 0.0;
out[i+3] = 0.0;
- for (size_t j = 0; j < fwd->taps.size(); j++) {
- out[i] += in[i + 2*j] * fwd->taps[j];
- out[i+1] += in[i+1 + 2*j] * fwd->taps[j];
- out[i+2] += in[i+2 + 2*j] * fwd->taps[j];
- out[i+3] += in[i+3 + 2*j] * fwd->taps[j];
+ for (size_t j = 0; j < m_taps.size(); j++) {
+ out[i] += in[i + 2*j] * m_taps[j];
+ out[i+1] += in[i+1 + 2*j] * m_taps[j];
+ out[i+2] += in[i+2 + 2*j] * m_taps[j];
+ out[i+3] += in[i+3 + 2*j] * m_taps[j];
}
}
@@ -175,7 +219,7 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
for (; i < sizeIn; i++) {
out[i] = 0.0;
for (int j = 0; i+2*j < sizeIn; j++) {
- out[i] += in[i+2*j] * fwd->taps[j];
+ out[i] += in[i+2*j] * m_taps[j];
}
}
}
@@ -192,18 +236,20 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
float* out = reinterpret_cast<float*>(dataOut->getData());
size_t sizeIn = dataIn->getLength() / sizeof(float);
- for (i = 0; i < sizeIn - 2*fwd->taps.size(); i += 1) {
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
+
+ for (i = 0; i < sizeIn - 2*m_taps.size(); i += 1) {
out[i] = 0.0;
- for (size_t j = 0; j < fwd->taps.size(); j++) {
- out[i] += in[i+2*j] * fwd->taps[j];
+ for (size_t j = 0; j < m_taps.size(); j++) {
+ out[i] += in[i+2*j] * m_taps[j];
}
}
for (; i < sizeIn; i++) {
out[i] = 0.0;
for (int j = 0; i+2*j < sizeIn; j++) {
- out[i] += in[i+2*j] * fwd->taps[j];
+ out[i] += in[i+2*j] * m_taps[j];
}
}
@@ -214,24 +260,26 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
complexf* out = reinterpret_cast<complexf*>(dataOut->getData());
size_t sizeIn = dataIn->getLength() / sizeof(complexf);
- for (i = 0; i < sizeIn - fwd->taps.size(); i += 4) {
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
+
+ for (i = 0; i < sizeIn - m_taps.size(); i += 4) {
out[i] = 0.0;
out[i+1] = 0.0;
out[i+2] = 0.0;
out[i+3] = 0.0;
- for (size_t j = 0; j < fwd->taps.size(); j++) {
- out[i] += in[i+j ] * fwd->taps[j];
- out[i+1] += in[i+1+j] * fwd->taps[j];
- out[i+2] += in[i+2+j] * fwd->taps[j];
- out[i+3] += in[i+3+j] * fwd->taps[j];
+ for (size_t j = 0; j < m_taps.size(); j++) {
+ out[i] += in[i+j ] * m_taps[j];
+ out[i+1] += in[i+1+j] * m_taps[j];
+ out[i+2] += in[i+2+j] * m_taps[j];
+ out[i+3] += in[i+3+j] * m_taps[j];
}
}
for (; i < sizeIn; i++) {
out[i] = 0.0;
for (int j = 0; j+i < sizeIn; j++) {
- out[i] += in[i+j] * fwd->taps[j];
+ out[i] += in[i+j] * m_taps[j];
}
}
@@ -241,132 +289,25 @@ void FIRFilterWorker::process(struct FIRFilterWorkerData *fwd)
complexf* out = reinterpret_cast<complexf*>(dataOut->getData());
size_t sizeIn = dataIn->getLength() / sizeof(complexf);
- for (i = 0; i < sizeIn - fwd->taps.size(); i += 1) {
+ std::lock_guard<std::mutex> lock(m_taps_mutex);
+
+ for (i = 0; i < sizeIn - m_taps.size(); i += 1) {
out[i] = 0.0;
- for (size_t j = 0; j < fwd->taps.size(); j++) {
- out[i] += in[i+j ] * fwd->taps[j];
+ for (size_t j = 0; j < m_taps.size(); j++) {
+ out[i] += in[i+j ] * m_taps[j];
}
}
for (; i < sizeIn; i++) {
out[i] = 0.0;
for (int j = 0; j+i < sizeIn; j++) {
- out[i] += in[i+j] * fwd->taps[j];
+ out[i] += in[i+j] * m_taps[j];
}
}
#endif
- calculationTime += (time_end.tv_sec - time_start.tv_sec) * 1000000000L +
- time_end.tv_nsec - time_start.tv_nsec;
- fwd->output_queue.push(dataOut);
- }
-}
-
-
-FIRFilter::FIRFilter(const std::string& taps_file) :
- ModCodec(),
- RemoteControllable("firfilter"),
- myTapsFile(taps_file)
-{
- PDEBUG("FIRFilter::FIRFilter(%s) @ %p\n",
- taps_file.c_str(), this);
-
- RC_ADD_PARAMETER(ntaps, "(Read-only) number of filter taps.");
- RC_ADD_PARAMETER(tapsfile, "Filename containing filter taps. When written to, the new file gets automatically loaded.");
-
- number_of_runs = 0;
-
- load_filter_taps(myTapsFile);
-
- PDEBUG("FIRFilter: Starting worker\n" );
- worker.start(&firwd);
-}
-
-void FIRFilter::load_filter_taps(const std::string &tapsFile)
-{
- std::vector<float> filter_taps;
- if (tapsFile == "default") {
- std::copy(default_filter_taps.begin(), default_filter_taps.end(),
- std::back_inserter(filter_taps));
- }
- else {
- std::ifstream taps_fstream(tapsFile.c_str());
- if(!taps_fstream) {
- fprintf(stderr, "FIRFilter: file %s could not be opened !\n", tapsFile.c_str());
- throw std::runtime_error("FIRFilter: Could not open file with taps! ");
- }
- int n_taps;
- taps_fstream >> n_taps;
-
- if (n_taps <= 0) {
- fprintf(stderr, "FIRFilter: warning: taps file has invalid format\n");
- throw std::runtime_error("FIRFilter: taps file has invalid format.");
- }
-
- if (n_taps > 100) {
- fprintf(stderr, "FIRFilter: warning: taps file has more than 100 taps\n");
- }
-
- fprintf(stderr, "FIRFilter: Reading %d taps...\n", n_taps);
-
- filter_taps.resize(n_taps);
-
- int n;
- for (n = 0; n < n_taps; n++) {
- taps_fstream >> filter_taps[n];
- PDEBUG("FIRFilter: tap: %f\n", filter_taps[n] );
- if (taps_fstream.eof()) {
- fprintf(stderr, "FIRFilter: file %s should contains %d taps, but EOF reached "\
- "after %d taps !\n", tapsFile.c_str(), n_taps, n);
- throw std::runtime_error("FIRFilter: filtertaps file invalid ! ");
- }
- }
- }
-
- {
- boost::mutex::scoped_lock lock(firwd.taps_mutex);
-
- firwd.taps = filter_taps;
- }
-}
-
-
-FIRFilter::~FIRFilter()
-{
- PDEBUG("FIRFilter::~FIRFilter() @ %p\n", this);
-
- worker.stop();
-}
-
-
-int FIRFilter::process(Buffer* const dataIn, Buffer* dataOut)
-{
- PDEBUG("FIRFilter::process(dataIn: %p, dataOut: %p)\n",
- dataIn, dataOut);
-
- // This thread creates the dataIn buffer, and deletes
- // the outgoing buffer
-
- std::shared_ptr<Buffer> inbuffer =
- make_shared<Buffer>(dataIn->getLength(), dataIn->getData());
-
- firwd.input_queue.push(inbuffer);
-
- if (number_of_runs > 2) {
- std::shared_ptr<Buffer> outbuffer;
- firwd.output_queue.wait_and_pop(outbuffer);
-
- dataOut->setData(outbuffer->getData(), outbuffer->getLength());
- }
- else {
- dataOut->setLength(dataIn->getLength());
- memset(dataOut->getData(), 0, dataOut->getLength());
- number_of_runs++;
- }
-
return dataOut->getLength();
-
}
void FIRFilter::set_parameter(const string& parameter, const string& value)
@@ -380,7 +321,7 @@ void FIRFilter::set_parameter(const string& parameter, const string& value)
else if (parameter == "tapsfile") {
try {
load_filter_taps(value);
- myTapsFile = value;
+ m_taps_file = value;
}
catch (std::runtime_error &e) {
throw ParameterError(e.what());
@@ -388,7 +329,8 @@ void FIRFilter::set_parameter(const string& parameter, const string& value)
}
else {
stringstream ss;
- ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name();
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
throw ParameterError(ss.str());
}
}
@@ -397,16 +339,16 @@ const string FIRFilter::get_parameter(const string& parameter) const
{
stringstream ss;
if (parameter == "ntaps") {
- ss << firwd.taps.size();
+ ss << m_taps.size();
}
else if (parameter == "tapsfile") {
- ss << myTapsFile;
+ ss << m_taps_file;
}
else {
- ss << "Parameter '" << parameter << "' is not exported by controllable " << get_rc_name();
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
throw ParameterError(ss.str());
}
return ss.str();
-
}
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index 209d79d..1fe0004 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -2,8 +2,10 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Written by
- 2012, Matthias P. Braendli, matthias.braendli@mpb.li
+ Copyright (C) 2017
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -28,7 +30,6 @@
# include <config.h>
#endif
-#include <boost/thread.hpp>
#include "RemoteControl.h"
#include "ModPlugin.h"
@@ -37,6 +38,7 @@
#include <sys/types.h>
#include <complex>
+#include <thread>
#include <vector>
#include <time.h>
#include <cstdio>
@@ -47,62 +49,11 @@
typedef std::complex<float> complexf;
-struct FIRFilterWorkerData {
- /* Thread-safe queues to give data to and get data from
- * the worker
- */
- ThreadsafeQueue<std::shared_ptr<Buffer> > input_queue;
- ThreadsafeQueue<std::shared_ptr<Buffer> > output_queue;
-
- /* Remote-control can change the taps while the filter
- * runs. This lock makes sure nothing bad happens when
- * the taps are being modified
- */
- mutable boost::mutex taps_mutex;
- std::vector<float> taps;
-};
-
-class FIRFilterWorker {
- public:
- FIRFilterWorker () {
- running = false;
- calculationTime = 0;
- }
-
- ~FIRFilterWorker() {
- PDEBUG("~FIRFilterWorker: Total elapsed thread time filtering: %zu\n", calculationTime);
- }
-
- void start(struct FIRFilterWorkerData *firworkerdata) {
- running = true;
- fir_thread = boost::thread(&FIRFilterWorker::process, this, firworkerdata);
- }
-
- void stop() {
- running = false;
- fir_thread.interrupt();
- fir_thread.join();
- }
-
- void process(struct FIRFilterWorkerData *fwd);
-
-
- private:
- time_t calculationTime;
- bool running;
- boost::thread fir_thread;
-};
-
-
-class FIRFilter : public ModCodec, public RemoteControllable
+class FIRFilter : public PipelinedModCodec, public RemoteControllable
{
public:
FIRFilter(const std::string& taps_file);
- virtual ~FIRFilter();
- FIRFilter(const FIRFilter&);
- FIRFilter& operator=(const FIRFilter&);
- int process(Buffer* const dataIn, Buffer* dataOut);
const char* name() { return "FIRFilter"; }
/******* REMOTE CONTROL ********/
@@ -114,12 +65,12 @@ public:
protected:
+ int internal_process(Buffer* const dataIn, Buffer* dataOut);
void load_filter_taps(const std::string &tapsFile);
- std::string myTapsFile;
+ std::string m_taps_file;
- FIRFilterWorker worker;
- int number_of_runs;
- struct FIRFilterWorkerData firwd;
+ mutable std::mutex m_taps_mutex;
+ std::vector<float> m_taps;
};