aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorandreas128 <Andreas>2017-09-12 16:12:49 +0200
committerandreas128 <Andreas>2017-09-12 16:12:49 +0200
commit7539b330f40275351e6d0aba8f314f5e4a7626e7 (patch)
tree1aa0f2cd84f2ee0820b1a5b27e792c792759b2bd /src
parentdd46ed939e56a4e56c3dcec60cce1b93c8786a4a (diff)
parent0ab36b05bba931c97a0c17cc663e7afb9f89b3cd (diff)
downloaddabmod-7539b330f40275351e6d0aba8f314f5e4a7626e7.tar.gz
dabmod-7539b330f40275351e6d0aba8f314f5e4a7626e7.tar.bz2
dabmod-7539b330f40275351e6d0aba8f314f5e4a7626e7.zip
Merge branch 'next_memless' of github.com:Opendigitalradio/ODR-DabMod into next_memless
Diffstat (limited to 'src')
-rw-r--r--src/MemlessPoly.cpp87
-rw-r--r--src/MemlessPoly.h46
2 files changed, 109 insertions, 24 deletions
diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp
index d5188f2..d7f9a96 100644
--- a/src/MemlessPoly.cpp
+++ b/src/MemlessPoly.cpp
@@ -53,7 +53,6 @@ using namespace std;
MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) :
PipelinedModCodec(),
RemoteControllable("memlesspoly"),
- m_num_threads(num_threads),
m_coefs_am(),
m_coefs_pm(),
m_coefs_file(coefs_file),
@@ -62,19 +61,37 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads
PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n",
coefs_file.c_str(), this);
- if (m_num_threads == 0) {
+ RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients.");
+ RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. "
+ "When set, the file gets loaded.");
+
+ if (num_threads == 0) {
const unsigned int hw_concurrency = std::thread::hardware_concurrency();
etiLog.level(info) << "Polynomial Predistorter will use " <<
hw_concurrency << " threads (auto detected)";
+
+ for (size_t i = 0; i < hw_concurrency; i++) {
+ m_workers.emplace_back();
+ }
+
+ for (auto& worker : m_workers) {
+ worker.thread = std::thread(
+ &MemlessPoly::worker_thread, &worker);
+ }
}
else {
etiLog.level(info) << "Polynomial Predistorter will use " <<
- m_num_threads << " threads (set in config file)";
- }
+ num_threads << " threads (set in config file)";
- RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients.");
- RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. "
- "When set, the file gets loaded.");
+ for (size_t i = 0; i < num_threads; i++) {
+ m_workers.emplace_back();
+ }
+
+ for (auto& worker : m_workers) {
+ worker.thread = std::thread(
+ &MemlessPoly::worker_thread, &worker);
+ }
+ }
load_coefficients(m_coefs_file);
@@ -100,7 +117,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile)
std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS));
}
- const size_t n_entries = 2 * n_coefs;
+ const int n_entries = 2 * n_coefs;
etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries);
@@ -138,7 +155,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile)
* instead, and this allows the compiler to auto-vectorize the loop.
*/
static void apply_coeff(
- const vector<float> &coefs_am, const vector<float> &coefs_pm,
+ const float *__restrict coefs_am, const float *__restrict coefs_pm,
const complexf *__restrict in, size_t start, size_t stop,
complexf *__restrict out)
{
@@ -178,6 +195,24 @@ static void apply_coeff(
}
}
+void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata)
+{
+ while (true) {
+ worker_t::input_data_t in_data;
+ workerdata->in_queue.wait_and_pop(in_data);
+
+ if (in_data.terminate) {
+ break;
+ }
+
+ apply_coeff(in_data.coefs_am, in_data.coefs_pm,
+ in_data.in, in_data.start, in_data.stop,
+ in_data.out);
+
+ workerdata->out_queue.push(1);
+ }
+}
+
int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)
{
dataOut->setLength(dataIn->getLength());
@@ -188,34 +223,40 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)
{
std::lock_guard<std::mutex> lock(m_coefs_mutex);
- const unsigned int hw_concurrency = std::thread::hardware_concurrency();
-
- const unsigned int num_threads =
- (m_num_threads > 0) ? m_num_threads : hw_concurrency;
+ const size_t num_threads = m_workers.size();
- if (num_threads) {
+ if (num_threads > 0) {
const size_t step = sizeOut / num_threads;
- vector<future<void> > flags;
size_t start = 0;
- for (size_t i = 0; i < num_threads - 1; i++) {
- flags.push_back(async(launch::async, apply_coeff,
- m_coefs_am, m_coefs_pm,
- in, start, start + step, out));
+ for (auto& worker : m_workers) {
+ worker_t::input_data_t dat;
+ dat.terminate = false;
+ dat.coefs_am = m_coefs_am.data();
+ dat.coefs_pm = m_coefs_pm.data();
+ dat.in = in;
+ dat.start = start;
+ dat.stop = start + step;
+ dat.out = out;
+
+ worker.in_queue.push(dat);
start += step;
}
// Do the last in this thread
- apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out);
+ apply_coeff(m_coefs_am.data(), m_coefs_pm.data(),
+ in, start, sizeOut, out);
// Wait for completion of the tasks
- for (auto& f : flags) {
- f.get();
+ for (auto& worker : m_workers) {
+ int ret;
+ worker.out_queue.wait_and_pop(ret);
}
}
else {
- apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out);
+ apply_coeff(m_coefs_am.data(), m_coefs_pm.data(),
+ in, 0, sizeOut, out);
}
}
diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h
index 4dcd44a..57c0924 100644
--- a/src/MemlessPoly.h
+++ b/src/MemlessPoly.h
@@ -67,7 +67,51 @@ private:
int internal_process(Buffer* const dataIn, Buffer* dataOut);
void load_coefficients(const std::string &coefFile);
- unsigned int m_num_threads;
+ struct worker_t {
+ struct input_data_t {
+ bool terminate = false;
+
+ const float *coefs_am = nullptr;
+ const float *coefs_pm = nullptr;
+ const complexf *in = nullptr;
+ size_t start = 0;
+ size_t stop = 0;
+ complexf *out = nullptr;
+ };
+
+ worker_t() {}
+ worker_t(const worker_t& other) = delete;
+ worker_t operator=(const worker_t& other) = delete;
+ worker_t operator=(worker_t&& other) = delete;
+
+ // The move constructor creates a new in_queue and out_queue,
+ // because ThreadsafeQueue is neither copy- nor move-constructible.
+ // Not an issue because creating the workers happens at startup, before
+ // the first work item.
+ worker_t(worker_t&& other) :
+ in_queue(),
+ out_queue(),
+ thread(std::move(other.thread)) {}
+
+ ~worker_t() {
+ if (thread.joinable()) {
+ input_data_t terminate_tag;
+ terminate_tag.terminate = true;
+ in_queue.push(terminate_tag);
+ thread.join();
+ }
+ }
+
+ ThreadsafeQueue<input_data_t> in_queue;
+ ThreadsafeQueue<int> out_queue;
+
+ std::thread thread;
+ };
+
+ std::vector<worker_t> m_workers;
+
+ static void worker_thread(worker_t *workerdata);
+
std::vector<float> m_coefs_am; // AM/AM coefficients
std::vector<float> m_coefs_pm; // AM/PM coefficients
std::string m_coefs_file;