diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/ConfigParser.cpp | 5 | ||||
| -rw-r--r-- | src/ConfigParser.h | 2 | ||||
| -rw-r--r-- | src/DabModulator.cpp | 9 | ||||
| -rw-r--r-- | src/MemlessPoly.cpp | 203 | ||||
| -rw-r--r-- | src/MemlessPoly.h | 12 | ||||
| -rw-r--r-- | src/OutputUHDFeedback.cpp | 275 | ||||
| -rw-r--r-- | src/OutputUHDFeedback.h | 1 | 
7 files changed, 298 insertions, 209 deletions
| diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 459811f..9ac1280 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -171,7 +171,10 @@ static void parse_configfile(      // Poly coefficients:      if (pt.get("poly.enabled", 0) == 1) {          mod_settings.polyCoefFilename = -            pt.get<std::string>("poly.polycoeffile", "default"); +            pt.get<std::string>("poly.polycoeffile", "dpd/poly.coef"); + +        mod_settings.polyNumThreads = +            pt.get<int>("poly.num_threads", 0);      }      // Output options diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 22a4fc5..89f0fb7 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -75,7 +75,7 @@ struct mod_settings_t {      std::string filterTapsFilename = "";      std::string polyCoefFilename = ""; - +    unsigned polyNumThreads = 0;  #if defined(HAVE_OUTPUT_UHD)      OutputUHDConfig outputuhd_conf; diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 34d8e66..cc2642a 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -205,13 +205,8 @@ int DabModulator::process(Buffer* dataOut)          shared_ptr<MemlessPoly> cifPoly;          if (not m_settings.polyCoefFilename.empty()) { -            cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename); -            etiLog.level(debug) << m_settings.polyCoefFilename << "\n"; -            etiLog.level(debug) << cifPoly->m_coefs[0] << " " << -                cifPoly->m_coefs[1] << " "<< cifPoly->m_coefs[2] << " "<< -                cifPoly->m_coefs[3] << " "<< cifPoly->m_coefs[4] << " "<< -                cifPoly->m_coefs[5] << " "<< cifPoly->m_coefs[6] << " "<< -                cifPoly->m_coefs[7] << "\n"; +            cifPoly = make_shared<MemlessPoly>(m_settings.polyCoefFilename, +                                               m_settings.polyNumThreads);              rcs.enrol(cifPoly.get());          } diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index 7e074eb..d5188f2 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -29,6 +29,8 @@     along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.   */ +#pragma GCC optimize ("O3") +  #include "MemlessPoly.h"  #include "PcDebug.h"  #include "Utils.h" @@ -36,31 +38,43 @@  #include <stdio.h>  #include <stdexcept> +#include <future>  #include <array>  #include <iostream>  #include <fstream>  #include <memory> +#include <complex>  using namespace std; +// Number of AM/AM coefs, identical to number of AM/PM coefs +#define NUM_COEFS 5 -// By default the signal is unchanged -static const std::array<float, 8> default_coefficients({ -        1, 0.0, 0.0, 0.0, -        0.0, 0.0, 0.0, 0.0 -        }); - - -MemlessPoly::MemlessPoly(const std::string& coefs_file) : +MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads) :      PipelinedModCodec(),      RemoteControllable("memlesspoly"), -    m_coefs_file(coefs_file) +    m_num_threads(num_threads), +    m_coefs_am(), +    m_coefs_pm(), +    m_coefs_file(coefs_file), +    m_coefs_mutex()  {      PDEBUG("MemlessPoly::MemlessPoly(%s) @ %p\n",              coefs_file.c_str(), this); +    if (m_num_threads == 0) { +        const unsigned int hw_concurrency = std::thread::hardware_concurrency(); +        etiLog.level(info) << "Polynomial Predistorter will use " << +            hw_concurrency << " threads (auto detected)"; +    } +    else { +        etiLog.level(info) << "Polynomial Predistorter will use " << +            m_num_threads << " threads (set in config file)"; +    } +      RC_ADD_PARAMETER(ncoefs, "(Read-only) number of coefficients."); -    RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. When written to, the new file gets automatically loaded."); +    RC_ADD_PARAMETER(coeffile, "Filename containing coefficients. " +            "When set, the file gets loaded.");      load_coefficients(m_coefs_file); @@ -69,76 +83,141 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file) :  void MemlessPoly::load_coefficients(const std::string &coefFile)  { -    std::vector<float> coefs; -    if (coefFile == "default") { -        std::copy(default_coefficients.begin(), default_coefficients.end(), -                std::back_inserter(coefs)); +    std::vector<float> coefs_am; +    std::vector<float> coefs_pm; +    std::ifstream coef_fstream(coefFile.c_str()); +    if (!coef_fstream) { +        throw std::runtime_error("MemlessPoly: Could not open file with coefs!");      } -    else { -        std::ifstream coef_fstream(coefFile.c_str()); -        if(!coef_fstream) { -            fprintf(stderr, "MemlessPoly: file %s could not be opened !\n", coefFile.c_str()); -            throw std::runtime_error("MemlessPoly: Could not open file with coefs! "); -        } -        int n_coefs; -        coef_fstream >> n_coefs; +    int n_coefs; +    coef_fstream >> n_coefs; -        if (n_coefs <= 0) { -            fprintf(stderr, "MemlessPoly: warning: coefs file has invalid format\n"); -            throw std::runtime_error("MemlessPoly: coefs file has invalid format."); -        } +    if (n_coefs <= 0) { +        throw std::runtime_error("MemlessPoly: coefs file has invalid format."); +    } +    else if (n_coefs != NUM_COEFS) { +        throw std::runtime_error("MemlessPoly: invalid number of coefs: " + +                std::to_string(n_coefs) + " expected " + std::to_string(NUM_COEFS)); +    } -        if (n_coefs != 8) { -            throw std::runtime_error( "MemlessPoly: error: coefs file does not have 8 coefs\n"); -        } +    const size_t n_entries = 2 * n_coefs; -        fprintf(stderr, "MemlessPoly: Reading %d coefs...\n", n_coefs); +    etiLog.log(debug, "MemlessPoly: Reading %d coefs...", n_entries); -        coefs.resize(n_coefs); +    coefs_am.resize(n_coefs); +    coefs_pm.resize(n_coefs); -        int n; -        for (n = 0; n < n_coefs; n++) { -            coef_fstream >> coefs[n]; -            PDEBUG("MemlessPoly: coef: %f\n",  coefs[n] ); -            if (coef_fstream.eof()) { -                fprintf(stderr, "MemlessPoly: file %s should contains %d coefs, but EOF reached "\ -                        "after %d coefs !\n", coefFile.c_str(), n_coefs, n); -                throw std::runtime_error("MemlessPoly: coefs file invalid ! "); -            } +    for (int n = 0; n < n_entries; n++) { +        float a; +        coef_fstream >> a; + +        if (n < n_coefs) { +            coefs_am[n] = a; +        } +        else { +            coefs_pm[n - n_coefs] = a; +        } + +        if (coef_fstream.eof()) { +            etiLog.log(error, "MemlessPoly: file %s should contains %d coefs, " +                    "but EOF reached after %d coefs !", +                    coefFile.c_str(), n_entries, n); +            throw std::runtime_error("MemlessPoly: coefs file invalid !");          }      }      {          std::lock_guard<std::mutex> lock(m_coefs_mutex); -        m_coefs = coefs; +        m_coefs_am = coefs_am; +        m_coefs_pm = coefs_pm;      }  } +/* The restrict keyword is C99, g++ and clang++ however support __restrict + * instead, and this allows the compiler to auto-vectorize the loop. + */ +static void apply_coeff( +        const vector<float> &coefs_am, const vector<float> &coefs_pm, +        const complexf *__restrict in, size_t start, size_t stop, +        complexf *__restrict out) +{ +    for (size_t i = start; i < stop; i+=1) { + +        float in_mag_sq = in[i].real() * in[i].real() + in[i].imag() * in[i].imag(); + +        float amplitude_correction = +            ( coefs_am[0] + in_mag_sq * +              ( coefs_am[1] + in_mag_sq * +                ( coefs_am[2] + in_mag_sq * +                  ( coefs_am[3] + in_mag_sq * +                    coefs_am[4])))); + +        float phase_correction = -1 * +            ( coefs_pm[0] + in_mag_sq * +              ( coefs_pm[1] + in_mag_sq * +                ( coefs_pm[2] + in_mag_sq * +                  ( coefs_pm[3] + in_mag_sq * +                    coefs_pm[4])))); + +        float phase_correction_sq = phase_correction * phase_correction; + +        // Approximation for Cosinus 1 - 1/2 x^2 + 1/24 x^4 - 1/720 x^6 +        float re = (1.0f - phase_correction_sq * +                ( -0.5f + phase_correction_sq * +                    ( 0.486666f  + phase_correction_sq * +                        ( -0.00138888f)))); + +        // Approximation for Sinus x + 1/6 x^3 + 1/120 x^5 +        float im = phase_correction * +                (1.0f + phase_correction_sq * +                    (0.166666f + phase_correction_sq * +                        (0.00833333f))); + +        out[i] = in[i] * amplitude_correction * complex<float>(re, im); +    } +}  int MemlessPoly::internal_process(Buffer* const dataIn, Buffer* dataOut)  { -        const float* in = reinterpret_cast<const float*>(dataIn->getData()); -        float* out      = reinterpret_cast<float*>(dataOut->getData()); -        size_t sizeIn   = dataIn->getLength() / sizeof(float); - -        { -             std::lock_guard<std::mutex> lock(m_coefs_mutex); -             for (size_t i = 0; i < sizeIn; i += 1) { -                 float mag = std::abs(in[i]); -                 //out[i] = in[i]; -                 out[i] = in[i] * ( -                     m_coefs[0] + -                     m_coefs[1] * mag + -                     m_coefs[2] * mag*mag + -                     m_coefs[3] * mag*mag*mag + -                     m_coefs[4] * mag*mag*mag*mag + -                     m_coefs[5] * mag*mag*mag*mag*mag + -                     m_coefs[6] * mag*mag*mag*mag*mag*mag + -                     m_coefs[7] * mag*mag*mag*mag*mag*mag*mag -                     ); +    dataOut->setLength(dataIn->getLength()); + +    const complexf* in = reinterpret_cast<const complexf*>(dataIn->getData()); +    complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); +    size_t sizeOut = dataOut->getLength() / sizeof(complexf); + +    { +        std::lock_guard<std::mutex> lock(m_coefs_mutex); +        const unsigned int hw_concurrency = std::thread::hardware_concurrency(); + +        const unsigned int num_threads = +            (m_num_threads > 0) ? m_num_threads : hw_concurrency; + +        if (num_threads) { +            const size_t step = sizeOut / num_threads; +            vector<future<void> > flags; + +            size_t start = 0; +            for (size_t i = 0; i < num_threads - 1; i++) { +                flags.push_back(async(launch::async, apply_coeff, +                            m_coefs_am, m_coefs_pm, +                            in, start, start + step, out)); + +                start += step;              } + +            // Do the last in this thread +            apply_coeff(m_coefs_am, m_coefs_pm, in, start, sizeOut, out); + +            // Wait for completion of the tasks +            for (auto& f : flags) { +                f.get(); +            } +        } +        else { +            apply_coeff(m_coefs_am, m_coefs_pm, in, 0, sizeOut, out);          } +    }      return dataOut->getLength();  } @@ -172,9 +251,9 @@ const string MemlessPoly::get_parameter(const string& parameter) const  {      stringstream ss;      if (parameter == "ncoefs") { -        ss << m_coefs.size(); +        ss << m_coefs_am.size();      } -    else if (parameter == "coefFile") { +    else if (parameter == "coeffile") {          ss << m_coefs_file;      }      else { diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 210b4b4..4dcd44a 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -52,7 +52,7 @@ typedef std::complex<float> complexf;  class MemlessPoly : public PipelinedModCodec, public RemoteControllable  {  public: -    MemlessPoly(const std::string& coefs_file); +    MemlessPoly(const std::string& coefs_file, unsigned int num_threads);      virtual const char* name() { return "MemlessPoly"; } @@ -63,16 +63,14 @@ public:      virtual const std::string get_parameter(              const std::string& parameter) const; -//TODO to protected -    std::vector<float> m_coefs; - - -protected: +private:      int internal_process(Buffer* const dataIn, Buffer* dataOut);      void load_coefficients(const std::string &coefFile); +    unsigned int m_num_threads; +    std::vector<float> m_coefs_am; // AM/AM coefficients +    std::vector<float> m_coefs_pm; // AM/PM coefficients      std::string m_coefs_file; -      mutable std::mutex m_coefs_mutex;  }; diff --git a/src/OutputUHDFeedback.cpp b/src/OutputUHDFeedback.cpp index 2a99e6b..a8f2c2e 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/OutputUHDFeedback.cpp @@ -43,6 +43,7 @@ DESCRIPTION:  #include <sys/socket.h>  #include <errno.h>  #include <poll.h> +#include <boost/date_time/posix_time/posix_time.hpp>  #include "OutputUHDFeedback.h"  #include "Utils.h" @@ -218,152 +219,164 @@ static ssize_t sendall(int socket, const void *buffer, size_t buflen)      return buflen;  } -void OutputUHDFeedback::ServeFeedbackThread() +void OutputUHDFeedback::ServeFeedback()  { -    set_thread_name("uhdservefeedback"); +    if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { +        throw std::runtime_error("Can't create TCP socket"); +    } + +    struct sockaddr_in addr; +    addr.sin_family = AF_INET; +    addr.sin_port = htons(m_port); +    addr.sin_addr.s_addr = htonl(INADDR_ANY); + +    if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { +        close(m_server_sock); +        throw std::runtime_error("Can't bind TCP socket"); +    } + +    if (listen(m_server_sock, 1) < 0) { +        close(m_server_sock); +        throw std::runtime_error("Can't listen TCP socket"); +    } + +    etiLog.level(info) << "DPD Feedback server listening on port " << m_port; + +    while (m_running) { +        struct sockaddr_in client; +        int client_sock = accept_with_timeout(m_server_sock, 1000, &client); -    try { -        if ((m_server_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) { -            throw std::runtime_error("Can't create TCP socket"); +        if (client_sock == -1) { +            close(m_server_sock); +            throw runtime_error("Could not establish new connection"); +        } +        else if (client_sock == -2) { +            continue;          } -        struct sockaddr_in addr; -        addr.sin_family = AF_INET; -        addr.sin_port = htons(m_port); -        addr.sin_addr.s_addr = htonl(INADDR_ANY); +        uint8_t request_version = 0; +        ssize_t read = recv(client_sock, &request_version, 1, 0); +        if (!read) break; // done reading +        if (read < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client read request version failed: " << strerror(errno); +            break; +        } -        if (bind(m_server_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { -            throw std::runtime_error("Can't bind TCP socket"); +        if (request_version != 1) { +            etiLog.level(info) << "DPD Feedback Server wrong request version"; +            break;          } -        if (listen(m_server_sock, 1) < 0) { -            throw std::runtime_error("Can't listen TCP socket"); +        uint32_t num_samples = 0; +        read = recv(client_sock, &num_samples, 4, 0); +        if (!read) break; // done reading +        if (read < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client read num samples failed"; +            break;          } -        etiLog.level(info) << "DPD Feedback server listening on port " << m_port; - -        while (m_running) { -            struct sockaddr_in client; -            int client_sock = accept_with_timeout(m_server_sock, 1000, &client); - -            if (client_sock == -1) { -                throw runtime_error("Could not establish new connection"); -            } -            else if (client_sock == -2) { -                continue; -            } - -            uint8_t request_version = 0; -            ssize_t read = recv(client_sock, &request_version, 1, 0); -            if (!read) break; // done reading -            if (read < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client read request version failed: " << strerror(errno); -                break; -            } - -            if (request_version != 1) { -                etiLog.level(info) << "DPD Feedback Server wrong request version"; -                break; -            } - -            uint32_t num_samples = 0; -            read = recv(client_sock, &num_samples, 4, 0); -            if (!read) break; // done reading -            if (read < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client read num samples failed"; -                break; -            } - -            // We are ready to issue the request now -            { -                boost::mutex::scoped_lock lock(burstRequest.mutex); -                burstRequest.num_samples = num_samples; -                burstRequest.state = BurstRequestState::SaveTransmitFrame; - -                lock.unlock(); -            } - -            // Wait for the result to be ready +        // We are ready to issue the request now +        {              boost::mutex::scoped_lock lock(burstRequest.mutex); -            while (burstRequest.state != BurstRequestState::Acquired) { -                if (not m_running) break; -                burstRequest.mutex_notification.wait(lock); -            } +            burstRequest.num_samples = num_samples; +            burstRequest.state = BurstRequestState::SaveTransmitFrame; -            burstRequest.state = BurstRequestState::None;              lock.unlock(); +        } + +        // Wait for the result to be ready +        boost::mutex::scoped_lock lock(burstRequest.mutex); +        while (burstRequest.state != BurstRequestState::Acquired) { +            if (not m_running) break; +            burstRequest.mutex_notification.wait(lock); +        } + +        burstRequest.state = BurstRequestState::None; +        lock.unlock(); + +        burstRequest.num_samples = std::min(burstRequest.num_samples, +                std::min( +                    burstRequest.tx_samples.size() / sizeof(complexf), +                    burstRequest.rx_samples.size() / sizeof(complexf))); -            burstRequest.num_samples = std::min(burstRequest.num_samples, -                    std::min( -                        burstRequest.tx_samples.size() / sizeof(complexf), -                        burstRequest.rx_samples.size() / sizeof(complexf))); - -            uint32_t num_samples_32 = burstRequest.num_samples; -            if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send num_samples failed"; -                break; -            } - -            if (sendall(client_sock, -                        &burstRequest.tx_second, -                        sizeof(burstRequest.tx_second)) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send tx_second failed"; -                break; -            } - -            if (sendall(client_sock, -                        &burstRequest.tx_pps, -                        sizeof(burstRequest.tx_pps)) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send tx_pps failed"; -                break; -            } - -            const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); - -            assert(burstRequest.tx_samples.size() >= frame_bytes); -            if (sendall(client_sock, -                        &burstRequest.tx_samples[0], -                        frame_bytes) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send tx_frame failed"; -                break; -            } - -            if (sendall(client_sock, -                        &burstRequest.rx_second, -                        sizeof(burstRequest.rx_second)) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send rx_second failed"; -                break; -            } - -            if (sendall(client_sock, -                        &burstRequest.rx_pps, -                        sizeof(burstRequest.rx_pps)) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send rx_pps failed"; -                break; -            } - -            assert(burstRequest.rx_samples.size() >= frame_bytes); -            if (sendall(client_sock, -                        &burstRequest.rx_samples[0], -                        frame_bytes) < 0) { -                etiLog.level(info) << -                    "DPD Feedback Server Client send rx_frame failed"; -                break; -            } - -            close(client_sock); +        uint32_t num_samples_32 = burstRequest.num_samples; +        if (sendall(client_sock, &num_samples_32, sizeof(num_samples_32)) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send num_samples failed"; +            break;          } + +        if (sendall(client_sock, +                    &burstRequest.tx_second, +                    sizeof(burstRequest.tx_second)) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send tx_second failed"; +            break; +        } + +        if (sendall(client_sock, +                    &burstRequest.tx_pps, +                    sizeof(burstRequest.tx_pps)) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send tx_pps failed"; +            break; +        } + +        const size_t frame_bytes = burstRequest.num_samples * sizeof(complexf); + +        assert(burstRequest.tx_samples.size() >= frame_bytes); +        if (sendall(client_sock, +                    &burstRequest.tx_samples[0], +                    frame_bytes) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send tx_frame failed"; +            break; +        } + +        if (sendall(client_sock, +                    &burstRequest.rx_second, +                    sizeof(burstRequest.rx_second)) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send rx_second failed"; +            break; +        } + +        if (sendall(client_sock, +                    &burstRequest.rx_pps, +                    sizeof(burstRequest.rx_pps)) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send rx_pps failed"; +            break; +        } + +        assert(burstRequest.rx_samples.size() >= frame_bytes); +        if (sendall(client_sock, +                    &burstRequest.rx_samples[0], +                    frame_bytes) < 0) { +            etiLog.level(info) << +                "DPD Feedback Server Client send rx_frame failed"; +            break; +        } + +        close(client_sock);      } -    catch (runtime_error &e) { -        etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); +} + +void OutputUHDFeedback::ServeFeedbackThread() +{ +    set_thread_name("uhdservefeedback"); + +    while (m_running) { +        try { +            ServeFeedback(); +        } +        catch (runtime_error &e) { +            etiLog.level(error) << "DPD Feedback Server fault: " << e.what(); +        } + +        boost::this_thread::sleep(boost::posix_time::seconds(5));      }      m_running = false; diff --git a/src/OutputUHDFeedback.h b/src/OutputUHDFeedback.h index 32668b6..c68f4c2 100644 --- a/src/OutputUHDFeedback.h +++ b/src/OutputUHDFeedback.h @@ -101,6 +101,7 @@ class OutputUHDFeedback {          // Thread that listens for requests over TCP to get TX and RX feedback          void ServeFeedbackThread(void); +        void ServeFeedback(void);          boost::thread rx_burst_thread;          boost::thread burst_tcp_thread; | 
