diff options
Diffstat (limited to 'src/MemlessPoly.cpp')
-rw-r--r-- | src/MemlessPoly.cpp | 87 |
1 files changed, 64 insertions, 23 deletions
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index d5188f2..d7f9a96 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -53,7 +53,6 @@ using namespace std; MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) : PipelinedModCodec(), RemoteControllable("memlesspoly"), - m_num_threads(num_threads), m_coefs_am(), m_coefs_pm(), m_coefs_file(coefs_file), @@ -62,19 +61,37 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n", coefs_file.c_str(), this); - if (m_num_threads == 0) { + RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); + RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " + "When set, the file gets loaded."); + + if (num_threads == 0) { const unsigned int hw_concurrency = std::thread::hardware_concurrency(); etiLog.level(info) << "Polynomial Predistorter will use " << hw_concurrency << " threads (auto detected)"; + + for (size_t i = 0; i < hw_concurrency; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } } else { etiLog.level(info) << "Polynomial Predistorter will use " << - m_num_threads << " threads (set in config file)"; - } + num_threads << " threads (set in config file)"; - RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); - RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " - "When set, the file gets loaded."); + for (size_t i = 0; i < num_threads; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } + } load_coefficients(m_coefs_file); @@ -100,7 +117,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS)); } - const size_t n_entries = 2 * n_coefs; + const int n_entries = 2 * n_coefs; etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); @@ -138,7 +155,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) * instead, and this allows the compiler to auto-vectorize the loop. */ static void apply_coeff( - const vector<float> &coefs_am, const vector<float> &coefs_pm, + const float *__restrict coefs_am, const float *__restrict coefs_pm, const complexf *__restrict in, size_t start, size_t stop, complexf *__restrict out) { @@ -178,6 +195,24 @@ static void apply_coeff( } } +void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) +{ + while (true) { + worker_t::input_data_t in_data; + workerdata->in_queue.wait_and_pop(in_data); + + if (in_data.terminate) { + break; + } + + apply_coeff(in_data.coefs_am, in_data.coefs_pm, + in_data.in, in_data.start, in_data.stop, + in_data.out); + + workerdata->out_queue.push(1); + } +} + int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { dataOut->setLength(dataIn->getLength()); @@ -188,34 +223,40 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { std::lock_guard<std::mutex> lock(m_coefs_mutex); - const unsigned int hw_concurrency = std::thread::hardware_concurrency(); - - const unsigned int num_threads = - (m_num_threads > 0) ? m_num_threads : hw_concurrency; + const size_t num_threads = m_workers.size(); - if (num_threads) { + if (num_threads > 0) { const size_t step = sizeOut / num_threads; - vector<future<void> > flags; size_t start = 0; - for (size_t i = 0; i < num_threads - 1; i++) { - flags.push_back(async(launch::async, apply_coeff, - m_coefs_am, m_coefs_pm, - in, start, start + step, out)); + for (auto& worker : m_workers) { + worker_t::input_data_t dat; + dat.terminate = false; + dat.coefs_am = m_coefs_am.data(); + dat.coefs_pm = m_coefs_pm.data(); + dat.in = in; + dat.start = start; + dat.stop = start + step; + dat.out = out; + + worker.in_queue.push(dat); start += step; } // Do the last in this thread - apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, start, sizeOut, out); // Wait for completion of the tasks - for (auto& f : flags) { - f.get(); + for (auto& worker : m_workers) { + int ret; + worker.out_queue.wait_and_pop(ret); } } else { - apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, 0, sizeOut, out); } } |