diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/MemlessPoly.cpp | 87 | ||||
| -rw-r--r-- | src/MemlessPoly.h | 46 | 
2 files changed, 109 insertions, 24 deletions
| diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index d5188f2..d7f9a96 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -53,7 +53,6 @@ using namespace std;  MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) :      PipelinedModCodec(),      RemoteControllable("memlesspoly"), -    m_num_threads(num_threads),      m_coefs_am(),      m_coefs_pm(),      m_coefs_file(coefs_file), @@ -62,19 +61,37 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads      PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n",              coefs_file.c_str(), this); -    if (m_num_threads == 0) { +    RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); +    RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " +            "When set, the file gets loaded."); + +    if (num_threads == 0) {          const unsigned int hw_concurrency = std::thread::hardware_concurrency();          etiLog.level(info) << "Polynomial Predistorter will use " <<              hw_concurrency << " threads (auto detected)"; + +        for (size_t i = 0; i < hw_concurrency; i++) { +            m_workers.emplace_back(); +        } + +        for (auto& worker : m_workers) { +            worker.thread = std::thread( +                    &MemlessPoly::worker_thread, &worker); +        }      }      else {          etiLog.level(info) << "Polynomial Predistorter will use " << -            m_num_threads << " threads (set in config file)"; -    } +            num_threads << " threads (set in config file)"; -    RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); -    RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " -            "When set, the file gets loaded."); +        for (size_t i = 0; i < num_threads; i++) { +            m_workers.emplace_back(); +        } + +        for (auto& worker : m_workers) { +            worker.thread = std::thread( +                    &MemlessPoly::worker_thread, &worker); +        } +    }      load_coefficients(m_coefs_file); @@ -100,7 +117,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile)                  std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS));      } -    const size_t n_entries = 2 * n_coefs; +    const int n_entries = 2 * n_coefs;      etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); @@ -138,7 +155,7 @@ void MemlessPoly::load_coefficients(const std::string &coefFile)   * instead, and this allows the compiler to auto-vectorize the loop.   */  static void apply_coeff( -        const vector<float> &coefs_am, const vector<float> &coefs_pm, +        const float *__restrict coefs_am, const float *__restrict coefs_pm,          const complexf *__restrict in, size_t start, size_t stop,          complexf *__restrict out)  { @@ -178,6 +195,24 @@ static void apply_coeff(      }  } +void MemlessPoly::worker_thread(MemlessPoly::worker_t *workerdata) +{ +    while (true) { +        worker_t::input_data_t in_data; +        workerdata->in_queue.wait_and_pop(in_data); + +        if (in_data.terminate) { +            break; +        } + +        apply_coeff(in_data.coefs_am, in_data.coefs_pm, +                in_data.in, in_data.start, in_data.stop, +                in_data.out); + +        workerdata->out_queue.push(1); +    } +} +  int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)  {      dataOut->setLength(dataIn->getLength()); @@ -188,34 +223,40 @@ int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)      {          std::lock_guard<std::mutex> lock(m_coefs_mutex); -        const unsigned int hw_concurrency = std::thread::hardware_concurrency(); - -        const unsigned int num_threads = -            (m_num_threads > 0) ? m_num_threads : hw_concurrency; +        const size_t num_threads = m_workers.size(); -        if (num_threads) { +        if (num_threads > 0) {              const size_t step = sizeOut / num_threads; -            vector<future<void> > flags;              size_t start = 0; -            for (size_t i = 0; i < num_threads - 1; i++) { -                flags.push_back(async(launch::async, apply_coeff, -                            m_coefs_am, m_coefs_pm, -                            in, start, start + step, out)); +            for (auto& worker : m_workers) { +                worker_t::input_data_t dat; +                dat.terminate = false; +                dat.coefs_am = m_coefs_am.data(); +                dat.coefs_pm = m_coefs_pm.data(); +                dat.in = in; +                dat.start = start; +                dat.stop = start + step; +                dat.out = out; + +                worker.in_queue.push(dat);                  start += step;              }              // Do the last in this thread -            apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); +            apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), +                    in, start, sizeOut, out);              // Wait for completion of the tasks -            for (auto& f : flags) { -                f.get(); +            for (auto& worker : m_workers) { +                int ret; +                worker.out_queue.wait_and_pop(ret);              }          }          else { -            apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out); +            apply_coeff(m_coefs_am.data(), m_coefs_pm.data(), +                    in, 0, sizeOut, out);          }      } diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 4dcd44a..57c0924 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -67,7 +67,51 @@ private:      int internal_process(Buffer* const dataIn, Buffer* dataOut);      void load_coefficients(const std::string &coefFile); -    unsigned int m_num_threads; +    struct worker_t { +        struct input_data_t { +            bool terminate = false; + +            const float *coefs_am = nullptr; +            const float *coefs_pm = nullptr; +            const complexf *in = nullptr; +            size_t start = 0; +            size_t stop = 0; +            complexf *out = nullptr; +        }; + +        worker_t() {} +        worker_t(const worker_t& other) = delete; +        worker_t operator=(const worker_t& other) = delete; +        worker_t operator=(worker_t&& other) = delete; + +        // The move constructor creates a new in_queue and out_queue, +        // because ThreadsafeQueue is neither copy- nor move-constructible. +        // Not an issue because creating the workers happens at startup, before +        // the first work item. +        worker_t(worker_t&& other) : +            in_queue(), +            out_queue(), +            thread(std::move(other.thread)) {} + +        ~worker_t() { +            if (thread.joinable()) { +                input_data_t terminate_tag; +                terminate_tag.terminate = true; +                in_queue.push(terminate_tag); +                thread.join(); +            } +        } + +        ThreadsafeQueue<input_data_t> in_queue; +        ThreadsafeQueue<int> out_queue; + +        std::thread thread; +    }; + +    std::vector<worker_t> m_workers; + +    static void worker_thread(worker_t *workerdata); +      std::vector<float> m_coefs_am; // AM/AM coefficients      std::vector<float> m_coefs_pm; // AM/PM coefficients      std::string m_coefs_file; | 
