From 0ab36b05bba931c97a0c17cc663e7afb9f89b3cd Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 2 Sep 2017 23:01:13 +0200 Subject: MemlessPoly: replace async by own threadpool It is slightly faster, but the main advantage is that there is no slowdown when running under gdb because it doesn't create and destroy threads for every frame. --- src/MemlessPoly.cpp | 87 +++++++++++++++++++++++++++++++++++++++-------------- src/MemlessPoly.h | 46 +++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 24 deletions(-) diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index d5188f2..d7f9a96 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -53,7 +53,6 @@ using namespace std; MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) : PipelinedModCodec(), RemoteControllable("memlesspoly"), - m_num_threads(num_threads), m_coefs_am(), m_coefs_pm(), m_coefs_file(coefs_file), @@ -62,19 +61,37 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n", coefs_file.c_str(), this); - if (m_num_threads == 0) { + RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); + RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " + "When set, the file gets loaded."); + + if (num_threads == 0) { const unsigned int hw_concurrency = std::thread::hardware_concurrency(); etiLog.level(info) << "Polynomial Predistorter will use " << hw_concurrency << " threads (auto detected)"; + + for (size_t i = 0; i < hw_concurrency; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } } else { etiLog.level(info) << "Polynomial Predistorter will use " << - m_num_threads << " threads (set in config file)"; - } + num_threads << " threads (set in config file)"; - RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); - RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " - "When set, the file gets loaded."); + for (size_t i = 0; i < num_threads; i++) { + m_workers.emplace_back(); + } + + for (auto& worker : m_workers) { + worker.thread = std::thread( + &MemlessPoly::worker_thread, &worker); + } + } load_coefficients(m_coefs_file); @@ -100,7 +117,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS)); } - const size_t n_entries = 2 * n_coefs; + const int n_entries = 2 * n_coefs; etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); @@ -138,7 +155,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile) * instead, and this allows the compiler to auto-vectorize the loop. */ static void apply_coeff( - const vector &coefs_am, const vector &coefs_pm, + const float *__restrict coefs_am, const float *__restrict coefs_pm, const complexf *__restrict in, size_t start, size_t stop, complexf *__restrict out) { @@ -178,6 +195,24 @@ static void apply_coeff( } } +void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) +{ + while (true) { + worker_t::input_data_t in_data; + workerdata->in_queue.wait_and_pop(in_data); + + if (in_data.terminate) { + break; + } + + apply_coeff(in_data.coefs_am, in_data.coefs_pm, + in_data.in, in_data.start, in_data.stop, + in_data.out); + + workerdata->out_queue.push(1); + } +} + int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { dataOut->setLength(dataIn->getLength()); @@ -188,34 +223,40 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut) { std::lock_guard lock(m_coefs_mutex); - const unsigned int hw_concurrency = std::thread::hardware_concurrency(); - - const unsigned int num_threads = - (m_num_threads > 0) ? m_num_threads : hw_concurrency; + const size_t num_threads = m_workers.size(); - if (num_threads) { + if (num_threads > 0) { const size_t step = sizeOut / num_threads; - vector > flags; size_t start = 0; - for (size_t i = 0; i < num_threads - 1; i++) { - flags.push_back(async(launch::async, apply_coeff, - m_coefs_am, m_coefs_pm, - in, start, start + step, out)); + for (auto& worker : m_workers) { + worker_t::input_data_t dat; + dat.terminate = false; + dat.coefs_am = m_coefs_am.data(); + dat.coefs_pm = m_coefs_pm.data(); + dat.in = in; + dat.start = start; + dat.stop = start + step; + dat.out = out; + + worker.in_queue.push(dat); start += step; } // Do the last in this thread - apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, start, sizeOut, out); // Wait for completion of the tasks - for (auto& f : flags) { - f.get(); + for (auto& worker : m_workers) { + int ret; + worker.out_queue.wait_and_pop(ret); } } else { - apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out); + apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), + in, 0, sizeOut, out); } } diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 4dcd44a..57c0924 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -67,7 +67,51 @@ private: int internal_process(Buffer* const dataIn, Buffer* dataOut); void load_coefficients(const std::string &coefFile); - unsigned int m_num_threads; + struct worker_t { + struct input_data_t { + bool terminate = false; + + const float *coefs_am = nullptr; + const float *coefs_pm = nullptr; + const complexf *in = nullptr; + size_t start = 0; + size_t stop = 0; + complexf *out = nullptr; + }; + + worker_t() {} + worker_t(const worker_t& other) = delete; + worker_t operator=(const worker_t& other) = delete; + worker_t operator=(worker_t&& other) = delete; + + // The move constructor creates a new in_queue and out_queue, + // because ThreadsafeQueue is neither copy- nor move-constructible. + // Not an issue because creating the workers happens at startup, before + // the first work item. + worker_t(worker_t&& other) : + in_queue(), + out_queue(), + thread(std::move(other.thread)) {} + + ~worker_t() { + if (thread.joinable()) { + input_data_t terminate_tag; + terminate_tag.terminate = true; + in_queue.push(terminate_tag); + thread.join(); + } + } + + ThreadsafeQueue in_queue; + ThreadsafeQueue out_queue; + + std::thread thread; + }; + + std::vector m_workers; + + static void worker_thread(worker_t *workerdata); + std::vector m_coefs_am; // AM/AM coefficients std::vector m_coefs_pm; // AM/PM coefficients std::string m_coefs_file; -- cgit v1.2.3