diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/ConfigParser.cpp | 5 | ||||
-rw-r--r-- | src/ConfigParser.h | 2 | ||||
-rw-r--r-- | src/DabModulator.cpp | 9 | ||||
-rw-r--r-- | src/MemlessPoly.cpp | 203 | ||||
-rw-r--r-- | src/MemlessPoly.h | 12 | ||||
-rw-r--r-- | src/OutputUHDFeedback.cpp | 275 | ||||
-rw-r--r-- | src/OutputUHDFeedback.h | 1 |
7 files changed, 298 insertions, 209 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 459811f..9ac1280 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -171,7 +171,10 @@ static void parse_configfile( // Poly coefficients: if (pt.get("poly.enabled", 0) == 1) { mod_settings.polyCoefFilename = - pt.get<std::string>("poly.polycoeffile", "default"); + pt.get<std::string>("poly.polycoeffile", "dpd/poly.coef"); + + mod_settings.polyNumThreads = + pt.get<int>("poly.num_threads", 0); } // Output options diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 22a4fc5..89f0fb7 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -75,7 +75,7 @@ struct mod_settings_t { std::string filterTapsFilename = ""; std::string polyCoefFilename = ""; - + unsigned polyNumThreads = 0; #if defined(HAVE_OUTPUT_UHD) OutputUHDConfig outputuhd_conf; diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 34d8e66..cc2642a 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -205,13 +205,8 @@ int DabModulator::process(Buffer* dataOut) shared_ptr<MemlessPoly> cifPoly; if (not m_settings.polyCoefFilename.empty()) { - cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename); - etiLog.level(debug) << m_settings.polyCoefFilename << "\n"; - etiLog.level(debug) << cifPoly->m_coefs[0] << " " << - cifPoly->m_coefs[1] << " "<< cifPoly->m_coefs[2] << " "<< - cifPoly->m_coefs[3] << " "<< cifPoly->m_coefs[4] << " "<< - cifPoly->m_coefs[5] << " "<< cifPoly->m_coefs[6] << " "<< - cifPoly->m_coefs[7] << "\n"; + cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename, + m_settings.polyNumThreads); rcs.enrol(cifPoly.get()); } diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index 7e074eb..d5188f2 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -29,6 +29,8 @@ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. */ +#pragma GCC optimize ("O3") + #include "MemlessPoly.h" #include "PcDebug.h" #include "Utils.h" @@ -36,31 +38,43 @@ #include <stdio.h> #include <stdexcept> +#include <future> #include <array> #include <iostream> #include <fstream> #include <memory> +#include <complex> using namespace std; +// Number of AM/AM coefs, identical to number of AM/PM coefs +#define NUM_COEFS 5 -// By default the signal is unchanged -static const std::array<float, 8> default_coefficients({ - 1, 0.0, 0.0, 0.0, - 0.0, 0.0, 0.0, 0.0 - }); - - -MemlessPoly::MemlessPoly(const std::string& coefs_file) : +MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) : PipelinedModCodec(), RemoteControllable("memlesspoly"), - m_coefs_file(coefs_file) + m_num_threads(num_threads), + m_coefs_am(), + m_coefs_pm(), + m_coefs_file(coefs_file), + m_coefs_mutex() { PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n", coefs_file.c_str(), this); + if (m_num_threads == 0) { + const unsigned int hw_concurrency = std::thread::hardware_concurrency(); + etiLog.level(info) << "Polynomial Predistorter will use " << + hw_concurrency << " threads (auto detected)"; + } + else { + etiLog.level(info) << "Polynomial Predistorter will use " << + m_num_threads << " threads (set in config file)"; + } + RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); - RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. When written to, the new file gets automatically loaded."); + RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " + "When set, the file gets loaded."); load_coefficients(m_coefs_file); @@ -69,76 +83,141 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file) : void MemlessPoly::load_coefficients(const std::string &coefFile) { - std::vector<float> coefs; - if (coefFile == "default") { - std::copy(default_coefficients.begin(), default_coefficients.end(), - std::back_inserter(coefs)); + std::vector<float> coefs_am; + std::vector<float> coefs_pm; + std::ifstream coef_fstream(coefFile.c_str()); + if (!coef_fstream) { + throw std::runtime_error("MemlessPoly: Could not open file with coefs!"); } - else { - std::ifstream coef_fstream(coefFile.c_str()); - if(!coef_fstream) { - fprintf(stderr, "MemlessPoly: file %s could not be opened !\n", coefFile.c_str()); - throw std::runtime_error("MemlessPoly: Could not open file with coefs! "); - } - int n_coefs; - coef_fstream >> n_coefs; + int n_coefs; + coef_fstream >> n_coefs; - if (n_coefs <= 0) { - fprintf(stderr, "MemlessPoly: warning: coefs file has invalid format\n"); - throw std::runtime_error("MemlessPoly: coefs file has invalid format."); - } + if (n_coefs <= 0) { + throw std::runtime_error("MemlessPoly: coefs file has invalid format."); + } + else if (n_coefs != NUM_COEFS) { + throw std::runtime_error("MemlessPoly: invalid number of coefs: " + + std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS)); + } - if (n_coefs != 8) { - throw std::runtime_error( "MemlessPoly: error: coefs file does not have 8 coefs\n"); - } + const size_t n_entries = 2 * n_coefs; - fprintf(stderr, "MemlessPoly: Reading %d coefs...\n", n_coefs); + etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); - coefs.resize(n_coefs); + coefs_am.resize(n_coefs); + coefs_pm.resize(n_coefs); - int n; - for (n = 0; n < n_coefs; n++) { - coef_fstream >> coefs[n]; - PDEBUG("MemlessPoly: coef: %f\n", coefs[n] ); - if (coef_fstream.eof()) { - fprintf(stderr, "MemlessPoly: file %s should contains %d coefs, but EOF reached "\ - "after %d coefs !\n", coefFile.c_str(), n_coefs, n); - throw std::runtime_error("MemlessPoly: coefs file invalid ! "); - } + for (int n = 0; n < n_entries; n++) { + float a; + coef_fstream >> a; + + if (n < n_coefs) { + coefs_am[n] = a; + } + else { + coefs_pm[n - n_coefs] = a; + } + + if (coef_fstream.eof()) { + etiLog.log(error, "MemlessPoly: file %s should contains %d coefs, " + "but EOF reached after %d coefs !", + coefFile.c_str(), n_entries, n); + throw std::runtime_error("MemlessPoly: coefs file invalid !"); } } { std::lock_guard<std::mutex> lock(m_coefs_mutex); - m_coefs = coefs; + m_coefs_am = coefs_am; + m_coefs_pm = coefs_pm; } } +/* The restrict keyword is C99, g++ and clang++ however support __restrict + * instead, and this allows the compiler to auto-vectorize the loop. + */ +static void apply_coeff( + const vector<float> &coefs_am, const vector<float> &coefs_pm, + const complexf *__restrict in, size_t start, size_t stop, + complexf *__restrict out) +{ + for (size_t i = start; i < stop; i+=1) { + + float in_mag_sq = in[i].real() * in[i].real() + in[i].imag() * in[i].imag(); + + float amplitude_correction = + ( coefs_am[0] + in_mag_sq * + ( coefs_am[1] + in_mag_sq * + ( coefs_am[2] + in_mag_sq * + ( coefs_am[3] + in_mag_sq * + coefs_am[4])))); + + float phase_correction = -1 * + ( coefs_pm[0] + in_mag_sq * + ( coefs_pm[1] + in_mag_sq * + ( coefs_pm[2] + in_mag_sq * + ( coefs_pm[3] + in_mag_sq * + coefs_pm[4])))); + + float phase_correction_sq = phase_correction * phase_correction; + + // Approximation for Cosinus 1 - 1/2 x^2 + 1/24 x^4 - 1/720 x^6 + float re = (1.0f - phase_correction_sq * + ( -0.5f + phase_correction_sq * + ( 0.486666f + phase_correction_sq * + ( -0.00138888f)))); + + // Approximation for Sinus x + 1/6 x^3 + 1/120 x^5 + float im = phase_correction * + (1.0f + phase_correction_sq * + (0.166666f + phase_correction_sq * + (0.00833333f))); + + out[i] = in[i] * amplitude_correction * complex<float>(re, im); + } +} int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { - const float* in = reinterpret_cast<const float*>(dataIn->getData()); - float* out = reinterpret_cast<float*>(dataOut->getData()); - size_t sizeIn = dataIn->getLength() / sizeof(float); - - { - std::lock_guard<std::mutex> lock(m_coefs_mutex); - for (size_t i = 0; i < sizeIn; i += 1) { - float mag = std::abs(in[i]); - //out[i] = in[i]; - out[i] = in[i] * ( - m_coefs[0] + - m_coefs[1] * mag + - m_coefs[2] * mag*mag + - m_coefs[3] * mag*mag*mag + - m_coefs[4] * mag*mag*mag*mag + - m_coefs[5] * mag*mag*mag*mag*mag + - m_coefs[6] * mag*mag*mag*mag*mag*mag + - m_coefs[7] * mag*mag*mag*mag*mag*mag*mag - ); + dataOut->setLength(dataIn->getLength()); + + const complexf* in = reinterpret_cast<const complexf*>(dataIn->getData()); + complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); + size_t sizeOut = dataOut->getLength() / sizeof(complexf); + + { + std::lock_guard<std::mutex> lock(m_coefs_mutex); + const unsigned int hw_concurrency = std::thread::hardware_concurrency(); + + const unsigned int num_threads = + (m_num_threads > 0) ? m_num_threads : hw_concurrency; + + if (num_threads) { + const size_t step = sizeOut / num_threads; + vector<future<void> > flags; + + size_t start = 0; + for (size_t i = 0; i < num_threads - 1; i++) { + flags.push_back(async(launch::async, apply_coeff, + m_coefs_am, m_coefs_pm, + in, start, start + step, out)); + + start += step; } + + // Do the last in this thread + apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); + + // Wait for completion of the tasks + for (auto& f : flags) { + f.get(); + } + } + else { + apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out); } + } return dataOut->getLength(); } @@ -172,9 +251,9 @@ const string MemlessPoly::get_parameter(const string& parameter) const { stringstream ss; if (parameter == "ncoefs") { - ss << m_coefs.size(); + ss << m_coefs_am.size(); } - else if (parameter == "coefFile") { + else if (parameter == "coeffile") { ss << m_coefs_file; } else { diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 210b4b4..4dcd44a 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -52,7 +52,7 @@ typedef std::complex<float> complexf; class MemlessPoly : public PipelinedModCodec, public RemoteControllable { public: - MemlessPoly(const std::string& coefs_file); + MemlessPoly(const std::string& coefs_file, unsigned int num_threads); virtual const char* name() { return "MemlessPoly"; } @@ -63,16 +63,14 @@ public: virtual const std::string get_parameter( const std::string& parameter) const; -//TODO to protected - std::vector<float> m_coefs; - - -protected: +private: int internal_process(Buffer* const dataIn, Buffer* dataOut); void load_coefficients(const std::string &coefFile); + unsigned int m_num_threads; + std::vector<float> m_coefs_am; // AM/AM coefficients + std::vector<float> m_coefs_pm; // AM/PM coefficients std::string m_coefs_file; - mutable std::mutex m_coefs_mutex; }; diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index 2a99e6b..a8f2c2e 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -43,6 +43,7 @@ DESCRIPTION: #include <sys/socket.h> #include <errno.h> #include <poll.h> +#include <boost/date_time/posix_time/posix_time.hpp> #include "OutputUHDFeedback.h" #include "Utils.h" @@ -218,152 +219,164 @@ static ssize_t sendall(int socket, const void *buffer, size_t buflen) return buflen; } -void OutputUHDFeedback::ServeFeedbackThread() +void OutputUHDFeedback::ServeFeedback() { - set_thread_name("uhdservefeedback"); + if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { + throw std::runtime_error("Can't create TCP socket"); + } + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(m_port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + close(m_server_sock); + throw std::runtime_error("Can't bind TCP socket"); + } + + if (listen(m_server_sock, 1) < 0) { + close(m_server_sock); + throw std::runtime_error("Can't listen TCP socket"); + } + + etiLog.level(info) << "DPD Feedback server listening on port " << m_port; + + while (m_running) { + struct sockaddr_in client; + int client_sock = accept_with_timeout(m_server_sock, 1000, &client); - try { - if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { - throw std::runtime_error("Can't create TCP socket"); + if (client_sock == -1) { + close(m_server_sock); + throw runtime_error("Could not establish new connection"); + } + else if (client_sock == -2) { + continue; } - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(m_port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); + uint8_t request_version = 0; + ssize_t read = recv(client_sock, &request_version, 1, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read request version failed: " << strerror(errno); + break; + } - if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { - throw std::runtime_error("Can't bind TCP socket"); + if (request_version != 1) { + etiLog.level(info) << "DPD Feedback Server wrong request version"; + break; } - if (listen(m_server_sock, 1) < 0) { - throw std::runtime_error("Can't listen TCP socket"); + uint32_t num_samples = 0; + read = recv(client_sock, &num_samples, 4, 0); + if (!read) break; // done reading + if (read < 0) { + etiLog.level(info) << + "DPD Feedback Server Client read num samples failed"; + break; } - etiLog.level(info) << "DPD Feedback server listening on port " << m_port; - - while (m_running) { - struct sockaddr_in client; - int client_sock = accept_with_timeout(m_server_sock, 1000, &client); - - if (client_sock == -1) { - throw runtime_error("Could not establish new connection"); - } - else if (client_sock == -2) { - continue; - } - - uint8_t request_version = 0; - ssize_t read = recv(client_sock, &request_version, 1, 0); - if (!read) break; // done reading - if (read < 0) { - etiLog.level(info) << - "DPD Feedback Server Client read request version failed: " << strerror(errno); - break; - } - - if (request_version != 1) { - etiLog.level(info) << "DPD Feedback Server wrong request version"; - break; - } - - uint32_t num_samples = 0; - read = recv(client_sock, &num_samples, 4, 0); - if (!read) break; // done reading - if (read < 0) { - etiLog.level(info) << - "DPD Feedback Server Client read num samples failed"; - break; - } - - // We are ready to issue the request now - { - boost::mutex::scoped_lock lock(burstRequest.mutex); - burstRequest.num_samples = num_samples; - burstRequest.state = BurstRequestState::SaveTransmitFrame; - - lock.unlock(); - } - - // Wait for the result to be ready + // We are ready to issue the request now + { boost::mutex::scoped_lock lock(burstRequest.mutex); - while (burstRequest.state != BurstRequestState::Acquired) { - if (not m_running) break; - burstRequest.mutex_notification.wait(lock); - } + burstRequest.num_samples = num_samples; + burstRequest.state = BurstRequestState::SaveTransmitFrame; - burstRequest.state = BurstRequestState::None; lock.unlock(); + } + + // Wait for the result to be ready + boost::mutex::scoped_lock lock(burstRequest.mutex); + while (burstRequest.state != BurstRequestState::Acquired) { + if (not m_running) break; + burstRequest.mutex_notification.wait(lock); + } + + burstRequest.state = BurstRequestState::None; + lock.unlock(); + + burstRequest.num_samples = std::min(burstRequest.num_samples, + std::min( + burstRequest.tx_samples.size() / sizeof(complexf), + burstRequest.rx_samples.size() / sizeof(complexf))); - burstRequest.num_samples = std::min(burstRequest.num_samples, - std::min( - burstRequest.tx_samples.size() / sizeof(complexf), - burstRequest.rx_samples.size() / sizeof(complexf))); - - uint32_t num_samples_32 = burstRequest.num_samples; - if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send num_samples failed"; - break; - } - - if (sendall(client_sock, - &burstRequest.tx_second, - sizeof(burstRequest.tx_second)) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send tx_second failed"; - break; - } - - if (sendall(client_sock, - &burstRequest.tx_pps, - sizeof(burstRequest.tx_pps)) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send tx_pps failed"; - break; - } - - const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); - - assert(burstRequest.tx_samples.size() >= frame_bytes); - if (sendall(client_sock, - &burstRequest.tx_samples[0], - frame_bytes) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send tx_frame failed"; - break; - } - - if (sendall(client_sock, - &burstRequest.rx_second, - sizeof(burstRequest.rx_second)) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send rx_second failed"; - break; - } - - if (sendall(client_sock, - &burstRequest.rx_pps, - sizeof(burstRequest.rx_pps)) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send rx_pps failed"; - break; - } - - assert(burstRequest.rx_samples.size() >= frame_bytes); - if (sendall(client_sock, - &burstRequest.rx_samples[0], - frame_bytes) < 0) { - etiLog.level(info) << - "DPD Feedback Server Client send rx_frame failed"; - break; - } - - close(client_sock); + uint32_t num_samples_32 = burstRequest.num_samples; + if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send num_samples failed"; + break; } + + if (sendall(client_sock, + &burstRequest.tx_second, + sizeof(burstRequest.tx_second)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_second failed"; + break; + } + + if (sendall(client_sock, + &burstRequest.tx_pps, + sizeof(burstRequest.tx_pps)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_pps failed"; + break; + } + + const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); + + assert(burstRequest.tx_samples.size() >= frame_bytes); + if (sendall(client_sock, + &burstRequest.tx_samples[0], + frame_bytes) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send tx_frame failed"; + break; + } + + if (sendall(client_sock, + &burstRequest.rx_second, + sizeof(burstRequest.rx_second)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_second failed"; + break; + } + + if (sendall(client_sock, + &burstRequest.rx_pps, + sizeof(burstRequest.rx_pps)) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_pps failed"; + break; + } + + assert(burstRequest.rx_samples.size() >= frame_bytes); + if (sendall(client_sock, + &burstRequest.rx_samples[0], + frame_bytes) < 0) { + etiLog.level(info) << + "DPD Feedback Server Client send rx_frame failed"; + break; + } + + close(client_sock); } - catch (runtime_error &e) { - etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); +} + +void OutputUHDFeedback::ServeFeedbackThread() +{ + set_thread_name("uhdservefeedback"); + + while (m_running) { + try { + ServeFeedback(); + } + catch (runtime_error &e) { + etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); + } + + boost::this_thread::sleep(boost::posix_time::seconds(5)); } m_running = false; diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index 32668b6..c68f4c2 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -101,6 +101,7 @@ class OutputUHDFeedback { // Thread that listens for requests over TCP to get TX and RX feedback void ServeFeedbackThread(void); + void ServeFeedback(void); boost::thread rx_burst_thread; boost::thread burst_tcp_thread; |