diff options
-rw-r--r-- | src/MemlessPoly.cpp | 87 | ||||
-rw-r--r-- | src/MemlessPoly.h | 46 |
2 files changed, 109 insertions, 24 deletions
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index d5188f2..d7f9a96 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -53,7 +53,6 @@ using namespace std; MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) : PipelinedModCodec(), RemoteControllable("memlesspoly"), - m_num_threads(num_threads), m_coefs_am(), m_coefs_pm(), m_coefs_file(coefs_file), @@ -62,19 +61,37 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n", coefs_file.c_str(), this); - if (m_num_threads == 0) { + RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); + RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " + "When set, the file gets loaded."); + + if (num_threads == 0) { const unsigned int hw_concurrency = std::thread::hardware_concurrency(); etiLog.level(info) << "Polynomial Predistorter will use " << hw_concurrency << " threads (auto detected)"; + + for (size_t i = 0; i < hw_concurrency; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } } else { etiLog.level(info) << "Polynomial Predistorter will use " << - m_num_threads << " threads (set in config file)"; - } + num_threads << " threads (set in config file)"; - RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); - RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " - "When set, the file gets loaded."); + for (size_t i = 0; i < num_threads; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } + } load_coefficients(m_coefs_file); @@ -100,7 +117,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS)); } - const size_t n_entries = 2 * n_coefs; + const int n_entries = 2 * n_coefs; etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); @@ -138,7 +155,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) * instead, and this allows the compiler to auto-vectorize the loop. */ static void apply_coeff( - const vector<float> &coefs_am, const vector<float> &coefs_pm, + const float *__restrict coefs_am, const float *__restrict coefs_pm, const complexf *__restrict in, size_t start, size_t stop, complexf *__restrict out) { @@ -178,6 +195,24 @@ static void apply_coeff( } } +void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) +{ + while (true) { + worker_t::input_data_t in_data; + workerdata->in_queue.wait_and_pop(in_data); + + if (in_data.terminate) { + break; + } + + apply_coeff(in_data.coefs_am, in_data.coefs_pm, + in_data.in, in_data.start, in_data.stop, + in_data.out); + + workerdata->out_queue.push(1); + } +} + int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { dataOut->setLength(dataIn->getLength()); @@ -188,34 +223,40 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { std::lock_guard<std::mutex> lock(m_coefs_mutex); - const unsigned int hw_concurrency = std::thread::hardware_concurrency(); - - const unsigned int num_threads = - (m_num_threads > 0) ? m_num_threads : hw_concurrency; + const size_t num_threads = m_workers.size(); - if (num_threads) { + if (num_threads > 0) { const size_t step = sizeOut / num_threads; - vector<future<void> > flags; size_t start = 0; - for (size_t i = 0; i < num_threads - 1; i++) { - flags.push_back(async(launch::async, apply_coeff, - m_coefs_am, m_coefs_pm, - in, start, start + step, out)); + for (auto& worker : m_workers) { + worker_t::input_data_t dat; + dat.terminate = false; + dat.coefs_am = m_coefs_am.data(); + dat.coefs_pm = m_coefs_pm.data(); + dat.in = in; + dat.start = start; + dat.stop = start + step; + dat.out = out; + + worker.in_queue.push(dat); start += step; } // Do the last in this thread - apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, start, sizeOut, out); // Wait for completion of the tasks - for (auto& f : flags) { - f.get(); + for (auto& worker : m_workers) { + int ret; + worker.out_queue.wait_and_pop(ret); } } else { - apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, 0, sizeOut, out); } } diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 4dcd44a..57c0924 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -67,7 +67,51 @@ private: int internal_process(Buffer* const dataIn, Buffer* dataOut); void load_coefficients(const std::string &coefFile); - unsigned int m_num_threads; + struct worker_t { + struct input_data_t { + bool terminate = false; + + const float *coefs_am = nullptr; + const float *coefs_pm = nullptr; + const complexf *in = nullptr; + size_t start = 0; + size_t stop = 0; + complexf *out = nullptr; + }; + + worker_t() {} + worker_t(const worker_t& other) = delete; + worker_t operator=(const worker_t& other) = delete; + worker_t operator=(worker_t&& other) = delete; + + // The move constructor creates a new in_queue and out_queue, + // because ThreadsafeQueue is neither copy- nor move-constructible. + // Not an issue because creating the workers happens at startup, before + // the first work item. + worker_t(worker_t&& other) : + in_queue(), + out_queue(), + thread(std::move(other.thread)) {} + + ~worker_t() { + if (thread.joinable()) { + input_data_t terminate_tag; + terminate_tag.terminate = true; + in_queue.push(terminate_tag); + thread.join(); + } + } + + ThreadsafeQueue<input_data_t> in_queue; + ThreadsafeQueue<int> out_queue; + + std::thread thread; + }; + + std::vector<worker_t> m_workers; + + static void worker_thread(worker_t *workerdata); + std::vector<float> m_coefs_am; // AM/AM coefficients std::vector<float> m_coefs_pm; // AM/PM coefficients std::string m_coefs_file; |