aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp5
-rw-r--r--src/ConfigParser.h2
-rw-r--r--src/DabModulator.cpp9
-rw-r--r--src/MemlessPoly.cpp203
-rw-r--r--src/MemlessPoly.h12
-rw-r--r--src/OutputUHDFeedback.cpp275
-rw-r--r--src/OutputUHDFeedback.h1
7 files changed, 298 insertions, 209 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 459811f..9ac1280 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -171,7 +171,10 @@ static void parse_configfile(
// Poly coefficients:
if (pt.get("poly.enabled", 0) == 1) {
mod_settings.polyCoefFilename =
- pt.get<std::string>("poly.polycoeffile", "default");
+ pt.get<std::string>("poly.polycoeffile", "dpd/poly.coef");
+
+ mod_settings.polyNumThreads =
+ pt.get<int>("poly.num_threads", 0);
}
// Output options
diff --git a/src/ConfigParser.h b/src/ConfigParser.h
index 22a4fc5..89f0fb7 100644
--- a/src/ConfigParser.h
+++ b/src/ConfigParser.h
@@ -75,7 +75,7 @@ struct mod_settings_t {
std::string filterTapsFilename = "";
std::string polyCoefFilename = "";
-
+ unsigned polyNumThreads = 0;
#if defined(HAVE_OUTPUT_UHD)
OutputUHDConfig outputuhd_conf;
diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp
index 34d8e66..cc2642a 100644
--- a/src/DabModulator.cpp
+++ b/src/DabModulator.cpp
@@ -205,13 +205,8 @@ int DabModulator::process(Buffer* dataOut)
shared_ptr<MemlessPoly> cifPoly;
if (not m_settings.polyCoefFilename.empty()) {
- cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename);
- etiLog.level(debug) << m_settings.polyCoefFilename << "\n";
- etiLog.level(debug) << cifPoly->m_coefs[0] << " " <<
- cifPoly->m_coefs[1] << " "<< cifPoly->m_coefs[2] << " "<<
- cifPoly->m_coefs[3] << " "<< cifPoly->m_coefs[4] << " "<<
- cifPoly->m_coefs[5] << " "<< cifPoly->m_coefs[6] << " "<<
- cifPoly->m_coefs[7] << "\n";
+ cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename,
+ m_settings.polyNumThreads);
rcs.enrol(cifPoly.get());
}
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index 7e074eb..d5188f2 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -29,6 +29,8 @@
along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
*/
+#pragma GCC optimize ("O3")
+
#include "MemlessPoly.h"
#include "PcDebug.h"
#include "Utils.h"
@@ -36,31 +38,43 @@
#include <stdio.h>
#include <stdexcept>
+#include <future>
#include <array>
#include <iostream>
#include <fstream>
#include <memory>
+#include <complex>
using namespace std;
+// Number of AM/AM coefs, identical to number of AM/PM coefs
+#define NUM_COEFS 5
-// By default the signal is unchanged
-static const std::array<float, 8> default_coefficients({
- 1, 0.0, 0.0, 0.0,
- 0.0, 0.0, 0.0, 0.0
- });
-
-
-MemlessPoly::MemlessPoly(const std::string& coefs_file) :
+MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) :
PipelinedModCodec(),
RemoteControllable("memlesspoly"),
- m_coefs_file(coefs_file)
+ m_num_threads(num_threads),
+ m_coefs_am(),
+ m_coefs_pm(),
+ m_coefs_file(coefs_file),
+ m_coefs_mutex()
{
PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n",
coefs_file.c_str(), this);
+ if (m_num_threads == 0) {
+ const unsigned int hw_concurrency = std::thread::hardware_concurrency();
+ etiLog.level(info) << "Polynomial Predistorter will use " <<
+ hw_concurrency << " threads (auto detected)";
+ }
+ else {
+ etiLog.level(info) << "Polynomial Predistorter will use " <<
+ m_num_threads << " threads (set in config file)";
+ }
+
RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients.");
- RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. When written to, the new file gets automatically loaded.");
+ RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. "
+ "When set, the file gets loaded.");
load_coefficients(m_coefs_file);
@@ -69,76 +83,141 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file) :
void MemlessPoly::load_coefficients(const std::string &coefFile)
{
- std::vector<float> coefs;
- if (coefFile == "default") {
- std::copy(default_coefficients.begin(), default_coefficients.end(),
- std::back_inserter(coefs));
+ std::vector<float> coefs_am;
+ std::vector<float> coefs_pm;
+ std::ifstream coef_fstream(coefFile.c_str());
+ if (!coef_fstream) {
+ throw std::runtime_error("MemlessPoly: Could not open file with coefs!");
}
- else {
- std::ifstream coef_fstream(coefFile.c_str());
- if(!coef_fstream) {
- fprintf(stderr, "MemlessPoly: file %s could not be opened !\n", coefFile.c_str());
- throw std::runtime_error("MemlessPoly: Could not open file with coefs! ");
- }
- int n_coefs;
- coef_fstream >> n_coefs;
+ int n_coefs;
+ coef_fstream >> n_coefs;
- if (n_coefs <= 0) {
- fprintf(stderr, "MemlessPoly: warning: coefs file has invalid format\n");
- throw std::runtime_error("MemlessPoly: coefs file has invalid format.");
- }
+ if (n_coefs <= 0) {
+ throw std::runtime_error("MemlessPoly: coefs file has invalid format.");
+ }
+ else if (n_coefs != NUM_COEFS) {
+ throw std::runtime_error("MemlessPoly: invalid number of coefs: " +
+ std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS));
+ }
- if (n_coefs != 8) {
- throw std::runtime_error( "MemlessPoly: error: coefs file does not have 8 coefs\n");
- }
+ const size_t n_entries = 2 * n_coefs;
- fprintf(stderr, "MemlessPoly: Reading %d coefs...\n", n_coefs);
+ etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries);
- coefs.resize(n_coefs);
+ coefs_am.resize(n_coefs);
+ coefs_pm.resize(n_coefs);
- int n;
- for (n = 0; n < n_coefs; n++) {
- coef_fstream >> coefs[n];
- PDEBUG("MemlessPoly: coef: %f\n", coefs[n] );
- if (coef_fstream.eof()) {
- fprintf(stderr, "MemlessPoly: file %s should contains %d coefs, but EOF reached "\
- "after %d coefs !\n", coefFile.c_str(), n_coefs, n);
- throw std::runtime_error("MemlessPoly: coefs file invalid ! ");
- }
+ for (int n = 0; n < n_entries; n++) {
+ float a;
+ coef_fstream >> a;
+
+ if (n < n_coefs) {
+ coefs_am[n] = a;
+ }
+ else {
+ coefs_pm[n - n_coefs] = a;
+ }
+
+ if (coef_fstream.eof()) {
+ etiLog.log(error, "MemlessPoly: file %s should contains %d coefs, "
+ "but EOF reached after %d coefs !",
+ coefFile.c_str(), n_entries, n);
+ throw std::runtime_error("MemlessPoly: coefs file invalid !");
}
}
{
std::lock_guard<std::mutex> lock(m_coefs_mutex);
- m_coefs = coefs;
+ m_coefs_am = coefs_am;
+ m_coefs_pm = coefs_pm;
}
}
+/* The restrict keyword is C99, g++ and clang++ however support __restrict
+ * instead, and this allows the compiler to auto-vectorize the loop.
+ */
+static void apply_coeff(
+ const vector<float> &coefs_am, const vector<float> &coefs_pm,
+ const complexf *__restrict in, size_t start, size_t stop,
+ complexf *__restrict out)
+{
+ for (size_t i = start; i < stop; i+=1) {
+
+ float in_mag_sq = in[i].real() * in[i].real() + in[i].imag() * in[i].imag();
+
+ float amplitude_correction =
+ ( coefs_am[0] + in_mag_sq *
+ ( coefs_am[1] + in_mag_sq *
+ ( coefs_am[2] + in_mag_sq *
+ ( coefs_am[3] + in_mag_sq *
+ coefs_am[4]))));
+
+ float phase_correction = -1 *
+ ( coefs_pm[0] + in_mag_sq *
+ ( coefs_pm[1] + in_mag_sq *
+ ( coefs_pm[2] + in_mag_sq *
+ ( coefs_pm[3] + in_mag_sq *
+ coefs_pm[4]))));
+
+ float phase_correction_sq = phase_correction * phase_correction;
+
+ // Approximation for Cosinus 1 - 1/2 x^2 + 1/24 x^4 - 1/720 x^6
+ float re = (1.0f - phase_correction_sq *
+ ( -0.5f + phase_correction_sq *
+ ( 0.486666f + phase_correction_sq *
+ ( -0.00138888f))));
+
+ // Approximation for Sinus x + 1/6 x^3 + 1/120 x^5
+ float im = phase_correction *
+ (1.0f + phase_correction_sq *
+ (0.166666f + phase_correction_sq *
+ (0.00833333f)));
+
+ out[i] = in[i] * amplitude_correction * complex<float>(re, im);
+ }
+}
int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)
{
- const float* in = reinterpret_cast<const float*>(dataIn->getData());
- float* out = reinterpret_cast<float*>(dataOut->getData());
- size_t sizeIn = dataIn->getLength() / sizeof(float);
-
- {
- std::lock_guard<std::mutex> lock(m_coefs_mutex);
- for (size_t i = 0; i < sizeIn; i += 1) {
- float mag = std::abs(in[i]);
- //out[i] = in[i];
- out[i] = in[i] * (
- m_coefs[0] +
- m_coefs[1] * mag +
- m_coefs[2] * mag*mag +
- m_coefs[3] * mag*mag*mag +
- m_coefs[4] * mag*mag*mag*mag +
- m_coefs[5] * mag*mag*mag*mag*mag +
- m_coefs[6] * mag*mag*mag*mag*mag*mag +
- m_coefs[7] * mag*mag*mag*mag*mag*mag*mag
- );
+ dataOut->setLength(dataIn->getLength());
+
+ const complexf* in = reinterpret_cast<const complexf*>(dataIn->getData());
+ complexf* out = reinterpret_cast<complexf*>(dataOut->getData());
+ size_t sizeOut = dataOut->getLength() / sizeof(complexf);
+
+ {
+ std::lock_guard<std::mutex> lock(m_coefs_mutex);
+ const unsigned int hw_concurrency = std::thread::hardware_concurrency();
+
+ const unsigned int num_threads =
+ (m_num_threads > 0) ? m_num_threads : hw_concurrency;
+
+ if (num_threads) {
+ const size_t step = sizeOut / num_threads;
+ vector<future<void> > flags;
+
+ size_t start = 0;
+ for (size_t i = 0; i < num_threads - 1; i++) {
+ flags.push_back(async(launch::async, apply_coeff,
+ m_coefs_am, m_coefs_pm,
+ in, start, start + step, out));
+
+ start += step;
}
+
+ // Do the last in this thread
+ apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out);
+
+ // Wait for completion of the tasks
+ for (auto& f : flags) {
+ f.get();
+ }
+ }
+ else {
+ apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out);
}
+ }
return dataOut->getLength();
}
@@ -172,9 +251,9 @@ const string MemlessPoly::get_parameter(const string& parameter) const
{
stringstream ss;
if (parameter == "ncoefs") {
- ss << m_coefs.size();
+ ss << m_coefs_am.size();
}
- else if (parameter == "coefFile") {
+ else if (parameter == "coeffile") {
ss << m_coefs_file;
}
else {
diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h
index 210b4b4..4dcd44a 100644
--- a/src/MemlessPoly.h
+++ b/src/MemlessPoly.h
@@ -52,7 +52,7 @@ typedef std::complex<float> complexf;
class MemlessPoly : public PipelinedModCodec, public RemoteControllable
{
public:
- MemlessPoly(const std::string& coefs_file);
+ MemlessPoly(const std::string& coefs_file, unsigned int num_threads);
virtual const char* name() { return "MemlessPoly"; }
@@ -63,16 +63,14 @@ public:
virtual const std::string get_parameter(
const std::string& parameter) const;
-//TODO to protected
- std::vector<float> m_coefs;
-
-
-protected:
+private:
int internal_process(Buffer* const dataIn, Buffer* dataOut);
void load_coefficients(const std::string &coefFile);
+ unsigned int m_num_threads;
+ std::vector<float> m_coefs_am; // AM/AM coefficients
+ std::vector<float> m_coefs_pm; // AM/PM coefficients
std::string m_coefs_file;
-
mutable std::mutex m_coefs_mutex;
};
diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp
index 2a99e6b..a8f2c2e 100644
--- a/src/OutputUHDFeedback.cpp
+++ b/src/OutputUHDFeedback.cpp
@@ -43,6 +43,7 @@ DESCRIPTION:
#include <sys/socket.h>
#include <errno.h>
#include <poll.h>
+#include <boost/date_time/posix_time/posix_time.hpp>
#include "OutputUHDFeedback.h"
#include "Utils.h"
@@ -218,152 +219,164 @@ static ssize_t sendall(int socket, const void *buffer, size_t buflen)
return buflen;
}
-void OutputUHDFeedback::ServeFeedbackThread()
+void OutputUHDFeedback::ServeFeedback()
{
- set_thread_name("uhdservefeedback");
+ if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
+ throw std::runtime_error("Can't create TCP socket");
+ }
+
+ struct sockaddr_in addr;
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(m_port);
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
+ close(m_server_sock);
+ throw std::runtime_error("Can't bind TCP socket");
+ }
+
+ if (listen(m_server_sock, 1) < 0) {
+ close(m_server_sock);
+ throw std::runtime_error("Can't listen TCP socket");
+ }
+
+ etiLog.level(info) << "DPD Feedback server listening on port " << m_port;
+
+ while (m_running) {
+ struct sockaddr_in client;
+ int client_sock = accept_with_timeout(m_server_sock, 1000, &client);
- try {
- if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {
- throw std::runtime_error("Can't create TCP socket");
+ if (client_sock == -1) {
+ close(m_server_sock);
+ throw runtime_error("Could not establish new connection");
+ }
+ else if (client_sock == -2) {
+ continue;
}
- struct sockaddr_in addr;
- addr.sin_family = AF_INET;
- addr.sin_port = htons(m_port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ uint8_t request_version = 0;
+ ssize_t read = recv(client_sock, &request_version, 1, 0);
+ if (!read) break; // done reading
+ if (read < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client read request version failed: " << strerror(errno);
+ break;
+ }
- if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
- throw std::runtime_error("Can't bind TCP socket");
+ if (request_version != 1) {
+ etiLog.level(info) << "DPD Feedback Server wrong request version";
+ break;
}
- if (listen(m_server_sock, 1) < 0) {
- throw std::runtime_error("Can't listen TCP socket");
+ uint32_t num_samples = 0;
+ read = recv(client_sock, &num_samples, 4, 0);
+ if (!read) break; // done reading
+ if (read < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client read num samples failed";
+ break;
}
- etiLog.level(info) << "DPD Feedback server listening on port " << m_port;
-
- while (m_running) {
- struct sockaddr_in client;
- int client_sock = accept_with_timeout(m_server_sock, 1000, &client);
-
- if (client_sock == -1) {
- throw runtime_error("Could not establish new connection");
- }
- else if (client_sock == -2) {
- continue;
- }
-
- uint8_t request_version = 0;
- ssize_t read = recv(client_sock, &request_version, 1, 0);
- if (!read) break; // done reading
- if (read < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client read request version failed: " << strerror(errno);
- break;
- }
-
- if (request_version != 1) {
- etiLog.level(info) << "DPD Feedback Server wrong request version";
- break;
- }
-
- uint32_t num_samples = 0;
- read = recv(client_sock, &num_samples, 4, 0);
- if (!read) break; // done reading
- if (read < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client read num samples failed";
- break;
- }
-
- // We are ready to issue the request now
- {
- boost::mutex::scoped_lock lock(burstRequest.mutex);
- burstRequest.num_samples = num_samples;
- burstRequest.state = BurstRequestState::SaveTransmitFrame;
-
- lock.unlock();
- }
-
- // Wait for the result to be ready
+ // We are ready to issue the request now
+ {
boost::mutex::scoped_lock lock(burstRequest.mutex);
- while (burstRequest.state != BurstRequestState::Acquired) {
- if (not m_running) break;
- burstRequest.mutex_notification.wait(lock);
- }
+ burstRequest.num_samples = num_samples;
+ burstRequest.state = BurstRequestState::SaveTransmitFrame;
- burstRequest.state = BurstRequestState::None;
lock.unlock();
+ }
+
+ // Wait for the result to be ready
+ boost::mutex::scoped_lock lock(burstRequest.mutex);
+ while (burstRequest.state != BurstRequestState::Acquired) {
+ if (not m_running) break;
+ burstRequest.mutex_notification.wait(lock);
+ }
+
+ burstRequest.state = BurstRequestState::None;
+ lock.unlock();
+
+ burstRequest.num_samples = std::min(burstRequest.num_samples,
+ std::min(
+ burstRequest.tx_samples.size() / sizeof(complexf),
+ burstRequest.rx_samples.size() / sizeof(complexf)));
- burstRequest.num_samples = std::min(burstRequest.num_samples,
- std::min(
- burstRequest.tx_samples.size() / sizeof(complexf),
- burstRequest.rx_samples.size() / sizeof(complexf)));
-
- uint32_t num_samples_32 = burstRequest.num_samples;
- if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send num_samples failed";
- break;
- }
-
- if (sendall(client_sock,
- &burstRequest.tx_second,
- sizeof(burstRequest.tx_second)) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send tx_second failed";
- break;
- }
-
- if (sendall(client_sock,
- &burstRequest.tx_pps,
- sizeof(burstRequest.tx_pps)) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send tx_pps failed";
- break;
- }
-
- const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
-
- assert(burstRequest.tx_samples.size() >= frame_bytes);
- if (sendall(client_sock,
- &burstRequest.tx_samples[0],
- frame_bytes) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send tx_frame failed";
- break;
- }
-
- if (sendall(client_sock,
- &burstRequest.rx_second,
- sizeof(burstRequest.rx_second)) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send rx_second failed";
- break;
- }
-
- if (sendall(client_sock,
- &burstRequest.rx_pps,
- sizeof(burstRequest.rx_pps)) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send rx_pps failed";
- break;
- }
-
- assert(burstRequest.rx_samples.size() >= frame_bytes);
- if (sendall(client_sock,
- &burstRequest.rx_samples[0],
- frame_bytes) < 0) {
- etiLog.level(info) <<
- "DPD Feedback Server Client send rx_frame failed";
- break;
- }
-
- close(client_sock);
+ uint32_t num_samples_32 = burstRequest.num_samples;
+ if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send num_samples failed";
+ break;
}
+
+ if (sendall(client_sock,
+ &burstRequest.tx_second,
+ sizeof(burstRequest.tx_second)) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send tx_second failed";
+ break;
+ }
+
+ if (sendall(client_sock,
+ &burstRequest.tx_pps,
+ sizeof(burstRequest.tx_pps)) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send tx_pps failed";
+ break;
+ }
+
+ const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf);
+
+ assert(burstRequest.tx_samples.size() >= frame_bytes);
+ if (sendall(client_sock,
+ &burstRequest.tx_samples[0],
+ frame_bytes) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send tx_frame failed";
+ break;
+ }
+
+ if (sendall(client_sock,
+ &burstRequest.rx_second,
+ sizeof(burstRequest.rx_second)) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_second failed";
+ break;
+ }
+
+ if (sendall(client_sock,
+ &burstRequest.rx_pps,
+ sizeof(burstRequest.rx_pps)) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_pps failed";
+ break;
+ }
+
+ assert(burstRequest.rx_samples.size() >= frame_bytes);
+ if (sendall(client_sock,
+ &burstRequest.rx_samples[0],
+ frame_bytes) < 0) {
+ etiLog.level(info) <<
+ "DPD Feedback Server Client send rx_frame failed";
+ break;
+ }
+
+ close(client_sock);
}
- catch (runtime_error &e) {
- etiLog.level(error) << "DPD Feedback Server fault: " << e.what();
+}
+
+void OutputUHDFeedback::ServeFeedbackThread()
+{
+ set_thread_name("uhdservefeedback");
+
+ while (m_running) {
+ try {
+ ServeFeedback();
+ }
+ catch (runtime_error &e) {
+ etiLog.level(error) << "DPD Feedback Server fault: " << e.what();
+ }
+
+ boost::this_thread::sleep(boost::posix_time::seconds(5));
}
m_running = false;
diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h
index 32668b6..c68f4c2 100644
--- a/src/OutputUHDFeedback.h
+++ b/src/OutputUHDFeedback.h
@@ -101,6 +101,7 @@ class OutputUHDFeedback {
// Thread that listens for requests over TCP to get TX and RX feedback
void ServeFeedbackThread(void);
+ void ServeFeedback(void);
boost::thread rx_burst_thread;
boost::thread burst_tcp_thread;