diff options
Diffstat (limited to 'src')
58 files changed, 2953 insertions, 2531 deletions
| diff --git a/src/BlockPartitioner.cpp b/src/BlockPartitioner.cpp index 9e9f80b..5767650 100644 --- a/src/BlockPartitioner.cpp +++ b/src/BlockPartitioner.cpp @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -21,6 +26,7 @@  #include "BlockPartitioner.h"  #include "PcDebug.h" +#include "Log.h"  #include <stdio.h>  #include <stdexcept> @@ -31,6 +37,7 @@  BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :      ModMux(), +    ModMetadata(),      d_mode(mode)  {      PDEBUG("BlockPartitioner::BlockPartitioner(%i)\n", mode); @@ -68,17 +75,11 @@ BlockPartitioner::BlockPartitioner(unsigned mode, unsigned phase) :      d_cifNb = 0;      // For Synchronisation purpose, count nb of CIF to drop      d_cifPhase = phase % d_cifCount; +    d_metaPhase = phase % d_cifCount;      d_cifSize = 864 * 8;  } -BlockPartitioner::~BlockPartitioner() -{ -    PDEBUG("BlockPartitioner::~BlockPartitioner()\n"); - -} - -  // dataIn[0] -> FIC  // dataIn[1] -> CIF  int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut) @@ -124,10 +125,10 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      uint8_t* out = reinterpret_cast<uint8_t*>(dataOut->getData());      // Copy FIC data -    PDEBUG("Writting FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize); +    PDEBUG("Writing FIC %zu bytes to %zu\n", d_ficSize, d_cifNb * d_ficSize);      memcpy(out + (d_cifNb * d_ficSize), fic, d_ficSize);      // Copy CIF data -    PDEBUG("Writting CIF %u bytes to %zu\n", 864 * 8, +    PDEBUG("Writing CIF %u bytes to %zu\n", 864 * 8,              (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8));      memcpy(out + (d_cifCount * d_ficSize) + (d_cifNb * 864 * 8), cif, 864 * 8); @@ -137,3 +138,28 @@ int BlockPartitioner::process(std::vector<Buffer*> dataIn, Buffer* dataOut)      return d_cifNb == 0;  } + +meta_vec_t BlockPartitioner::process_metadata(const meta_vec_t& metadataIn) +{ +    // Synchronize CIF phase +    if (d_metaPhase != 0) { +        if (++d_metaPhase == d_cifCount) { +            d_metaPhase = 0; +        } +        // Drop this metadata +        return {}; +    } + +    if (d_cifNb == 1) { +        d_meta.clear(); +    } + +    std::copy(metadataIn.begin(), metadataIn.end(), std::back_inserter(d_meta)); + +    if (d_cifNb == 0) { +        return d_meta; +    } +    else { +        return {}; +    } +} diff --git a/src/BlockPartitioner.h b/src/BlockPartitioner.h index 90cffa3..a4656a1 100644 --- a/src/BlockPartitioner.h +++ b/src/BlockPartitioner.h @@ -1,6 +1,11 @@  /*     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org   */  /*     This file is part of ODR-DabMod. @@ -25,32 +30,32 @@  #   include <config.h>  #endif -  #include "ModPlugin.h"  #include <vector> +#include <cstddef> -#include <sys/types.h> - - -class BlockPartitioner : public ModMux +class BlockPartitioner : public ModMux, public ModMetadata  {  public:      BlockPartitioner(unsigned mode, unsigned phase); -    virtual ~BlockPartitioner(); -    BlockPartitioner(const BlockPartitioner&); -    BlockPartitioner& operator=(const BlockPartitioner&);      int process(std::vector<Buffer*> dataIn, Buffer* dataOut);      const char* name() { return "BlockPartitioner"; } +    // The implementation assumes process_metadata is always called after process +    virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn); +  protected:      int d_mode;      size_t d_ficSize;      size_t d_cifCount;      size_t d_cifNb;      size_t d_cifPhase; +    size_t d_metaPhase;      size_t d_cifSize;      size_t d_outputFramesize;      size_t d_outputFramecount; + +    meta_vec_t d_meta;  }; diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2c93a57..7603c1e 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -29,15 +29,17 @@  #   include "config.h"  #endif +#include <cstdint> +#include <boost/property_tree/ptree.hpp> +#include <boost/property_tree/ini_parser.hpp> +  #include "ConfigParser.h"  #include "porting.h"  #include "Utils.h"  #include "Log.h"  #include "DabModulator.h" +#include "output/SDR.h" -#include <unistd.h> -#include <boost/property_tree/ptree.hpp> -#include <boost/property_tree/ini_parser.hpp>  using namespace std; @@ -202,87 +204,101 @@ static void parse_configfile(      if (output_selected == "file") {          try {              mod_settings.outputName = pt.get<std::string>("fileoutput.filename"); +            mod_settings.fileOutputShowMetadata = +                (pt.get("fileoutput.show_metadata", 0) > 0);          }          catch (std::exception &e) {              std::cerr << "Error: " << e.what() << "\n";              std::cerr << "       Configuration does not specify file name for file output\n";              throw std::runtime_error("Configuration error");          } -        mod_settings.useFileOutput = 1; +        mod_settings.useFileOutput = true;          mod_settings.fileOutputFormat = pt.get("fileoutput.format", mod_settings.fileOutputFormat);      }  #if defined(HAVE_OUTPUT_UHD)      else if (output_selected == "uhd") { -        OutputUHDConfig outputuhd_conf; +        Output::SDRDeviceConfig sdr_device_config; -        outputuhd_conf.device = pt.get("uhdoutput.device", ""); -        outputuhd_conf.usrpType = pt.get("uhdoutput.type", ""); -        outputuhd_conf.subDevice = pt.get("uhdoutput.subdevice", ""); -        outputuhd_conf.masterClockRate = pt.get<long>("uhdoutput.master_clock_rate", 0); +        string device = pt.get("uhdoutput.device", ""); +        const auto usrpType = pt.get("uhdoutput.type", ""); +        if (usrpType != "") { +            if (not device.empty()) { +                device += ","; +            } +            device += "type=" + usrpType; +        } +        sdr_device_config.device = device; + +        sdr_device_config.subDevice = pt.get("uhdoutput.subdevice", ""); +        sdr_device_config.masterClockRate = pt.get<long>("uhdoutput.master_clock_rate", 0); -        if (outputuhd_conf.device.find("master_clock_rate") != std::string::npos) { +        if (sdr_device_config.device.find("master_clock_rate") != std::string::npos) {              std::cerr << "Warning:"                  "setting master_clock_rate in [uhd] device is deprecated !\n";          } -        if (outputuhd_conf.device.find("type=") != std::string::npos) { +        if (sdr_device_config.device.find("type=") != std::string::npos) {              std::cerr << "Warning:"                  "setting type in [uhd] device is deprecated !\n";          } -        outputuhd_conf.txgain = pt.get("uhdoutput.txgain", 0.0); -        outputuhd_conf.rxgain = pt.get("uhdoutput.rxgain", 0.0); -        outputuhd_conf.frequency = pt.get<double>("uhdoutput.frequency", 0); +        sdr_device_config.txgain = pt.get("uhdoutput.txgain", 0.0); +        sdr_device_config.tx_antenna = pt.get("uhdoutput.tx_antenna", ""); +        sdr_device_config.rx_antenna = pt.get("uhdoutput.rx_antenna", "RX2"); +        sdr_device_config.rxgain = pt.get("uhdoutput.rxgain", 0.0); +        sdr_device_config.frequency = pt.get<double>("uhdoutput.frequency", 0);          std::string chan = pt.get<std::string>("uhdoutput.channel", ""); -        outputuhd_conf.dabMode = mod_settings.dabMode; +        sdr_device_config.dabMode = mod_settings.dabMode; -        if (outputuhd_conf.frequency == 0 && chan == "") { +        if (sdr_device_config.frequency == 0 && chan == "") {              std::cerr << "       UHD output enabled, but neither frequency nor channel defined.\n";              throw std::runtime_error("Configuration error");          } -        else if (outputuhd_conf.frequency == 0) { -            outputuhd_conf.frequency = parseChannel(chan); +        else if (sdr_device_config.frequency == 0) { +            sdr_device_config.frequency = parseChannel(chan);          } -        else if (outputuhd_conf.frequency != 0 && chan != "") { +        else if (sdr_device_config.frequency != 0 && chan != "") {              std::cerr << "       UHD output: cannot define both frequency and channel.\n";              throw std::runtime_error("Configuration error");          } -        outputuhd_conf.lo_offset = pt.get<double>("uhdoutput.lo_offset", 0); +        sdr_device_config.lo_offset = pt.get<double>("uhdoutput.lo_offset", 0); -        outputuhd_conf.refclk_src = pt.get("uhdoutput.refclk_source", "internal"); -        outputuhd_conf.pps_src = pt.get("uhdoutput.pps_source", "none"); -        outputuhd_conf.pps_polarity = pt.get("uhdoutput.pps_polarity", "pos"); +        sdr_device_config.refclk_src = pt.get("uhdoutput.refclk_source", "internal"); +        sdr_device_config.pps_src = pt.get("uhdoutput.pps_source", "none"); +        sdr_device_config.pps_polarity = pt.get("uhdoutput.pps_polarity", "pos");          std::string behave = pt.get("uhdoutput.behaviour_refclk_lock_lost", "ignore");          if (behave == "crash") { -            outputuhd_conf.refclk_lock_loss_behaviour = CRASH; +            sdr_device_config.refclk_lock_loss_behaviour = Output::CRASH;          }          else if (behave == "ignore") { -            outputuhd_conf.refclk_lock_loss_behaviour = IGNORE; +            sdr_device_config.refclk_lock_loss_behaviour = Output::IGNORE;          }          else {              std::cerr << "Error: UHD output: behaviour_refclk_lock_lost invalid." << std::endl;              throw std::runtime_error("Configuration error");          } -        outputuhd_conf.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0); +        sdr_device_config.maxGPSHoldoverTime = pt.get("uhdoutput.max_gps_holdover_time", 0); -        outputuhd_conf.dpdFeedbackServerPort = pt.get<long>("uhdoutput.dpd_port", 0); +        sdr_device_config.dpdFeedbackServerPort = pt.get<long>("uhdoutput.dpd_port", 0); -        mod_settings.outputuhd_conf = outputuhd_conf; -        mod_settings.useUHDOutput = 1; +        mod_settings.sdr_device_config = sdr_device_config; +        mod_settings.useUHDOutput = true;      }  #endif  #if defined(HAVE_SOAPYSDR)      else if (output_selected == "soapysdr") { -        auto& outputsoapy_conf = mod_settings.outputsoapy_conf; +        auto& outputsoapy_conf = mod_settings.sdr_device_config;          outputsoapy_conf.device = pt.get("soapyoutput.device", "");          outputsoapy_conf.masterClockRate = pt.get<long>("soapyoutput.master_clock_rate", 0);          outputsoapy_conf.txgain = pt.get("soapyoutput.txgain", 0.0); +        outputsoapy_conf.tx_antenna = pt.get("soapyoutput.tx_antenna", ""); +        outputsoapy_conf.lo_offset = pt.get<double>("soapyoutput.lo_offset", 0.0);          outputsoapy_conf.frequency = pt.get<double>("soapyoutput.frequency", 0);          std::string chan = pt.get<std::string>("soapyoutput.channel", "");          outputsoapy_conf.dabMode = mod_settings.dabMode; @@ -299,14 +315,16 @@ static void parse_configfile(              throw std::runtime_error("Configuration error");          } -        mod_settings.useSoapyOutput = 1; +        outputsoapy_conf.dpdFeedbackServerPort = pt.get<long>("soapyoutput.dpd_port", 0); + +        mod_settings.useSoapyOutput = true;      }  #endif  #if defined(HAVE_ZEROMQ)      else if (output_selected == "zmq") {          mod_settings.outputName = pt.get<std::string>("zmqoutput.listen");          mod_settings.zmqOutputSocketType = pt.get<std::string>("zmqoutput.socket_type"); -        mod_settings.useZeroMQOutput = 1; +        mod_settings.useZeroMQOutput = true;      }  #endif      else { @@ -315,9 +333,9 @@ static void parse_configfile(      }  #if defined(HAVE_OUTPUT_UHD) -    mod_settings.outputuhd_conf.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1); -    mod_settings.outputuhd_conf.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1); -    if (mod_settings.outputuhd_conf.enableSync) { +    mod_settings.sdr_device_config.enableSync = (pt.get("delaymanagement.synchronous", 0) == 1); +    mod_settings.sdr_device_config.muteNoTimestamps = (pt.get("delaymanagement.mutenotimestamps", 0) == 1); +    if (mod_settings.sdr_device_config.enableSync) {          std::string delay_mgmt = pt.get<std::string>("delaymanagement.management", "");          std::string fixedoffset = pt.get<std::string>("delaymanagement.fixedoffset", "");          std::string offset_filename = pt.get<std::string>("delaymanagement.dynamicoffsetfile", ""); @@ -388,11 +406,11 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)              }  #endif              mod_settings.outputName = optarg; -            mod_settings.useFileOutput = 1; +            mod_settings.useFileOutput = true;              break;          case 'F':  #if defined(HAVE_OUTPUT_UHD) -            mod_settings.outputuhd_conf.frequency = strtof(optarg, NULL); +            mod_settings.sdr_device_config.frequency = strtof(optarg, NULL);  #endif              break;          case 'g': @@ -400,7 +418,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)              break;          case 'G':  #if defined(HAVE_OUTPUT_UHD) -            mod_settings.outputuhd_conf.txgain = strtod(optarg, NULL); +            mod_settings.sdr_device_config.txgain = strtod(optarg, NULL);  #endif              break;          case 'l': @@ -408,9 +426,7 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)              break;          case 'o':              mod_settings.tist_offset_s = strtod(optarg, NULL); -#if defined(HAVE_OUTPUT_UHD) -            mod_settings.outputuhd_conf.enableSync = true; -#endif +            mod_settings.sdr_device_config.enableSync = true;              break;          case 'm':              mod_settings.dabMode = strtol(optarg, NULL, 0); @@ -427,11 +443,13 @@ void parse_args(int argc, char **argv, mod_settings_t& mod_settings)                  fprintf(stderr, "Options -u and -f are mutually exclusive\n");                  throw std::invalid_argument("Invalid command line options");              } -            mod_settings.outputuhd_conf.device = optarg; -            mod_settings.outputuhd_conf.refclk_src = "internal"; -            mod_settings.outputuhd_conf.pps_src = "none"; -            mod_settings.outputuhd_conf.pps_polarity = "pos"; -            mod_settings.useUHDOutput = 1; +            mod_settings.sdr_device_config.device = optarg; +            mod_settings.sdr_device_config.refclk_src = "internal"; +            mod_settings.sdr_device_config.pps_src = "none"; +            mod_settings.sdr_device_config.pps_polarity = "pos"; +            mod_settings.useUHDOutput = true; +#else +            throw std::invalid_argument("Cannot select UHD output, not compiled in!");  #endif              break;          case 'V': diff --git a/src/ConfigParser.h b/src/ConfigParser.h index 0be3558..dc5ac4f 100644 --- a/src/ConfigParser.h +++ b/src/ConfigParser.h @@ -34,23 +34,21 @@  #include <string>  #include "GainControl.h"  #include "TII.h" -#if defined(HAVE_OUTPUT_UHD) -#   include "OutputUHD.h" -#endif -#if defined(HAVE_SOAPYSDR) -#   include "OutputSoapy.h" -#endif +#include "output/SDR.h" +#include "output/UHD.h" +#include "output/Soapy.h"  #define ZMQ_INPUT_MAX_FRAME_QUEUE 500  struct mod_settings_t {      std::string outputName; -    int useZeroMQOutput = 0; +    bool useZeroMQOutput = false;      std::string zmqOutputSocketType = ""; -    int useFileOutput = 0; +    bool useFileOutput = false;      std::string fileOutputFormat = "complexf"; -    int useUHDOutput = 0; -    int useSoapyOutput = 0; +    bool fileOutputShowMetadata = false; +    bool useUHDOutput = false; +    bool useSoapyOutput = false;      size_t outputRate = 2048000;      size_t clockRate = 0; @@ -61,7 +59,6 @@ struct mod_settings_t {      float gainmodeVariance = 4.0f;      // To handle the timestamp offset of the modulator -    unsigned tist_delay_stages = 1; // because GainControl is pipelined      double tist_offset_s = 0.0;      bool loop = false; @@ -85,12 +82,8 @@ struct mod_settings_t {      // Settings for the OFDM windowing      unsigned ofdmWindowOverlap = 0; -#if defined(HAVE_OUTPUT_UHD) -    OutputUHDConfig outputuhd_conf; -#endif - -#if defined(HAVE_SOAPYSDR) -    OutputSoapyConfig outputsoapy_conf; +#if defined(HAVE_OUTPUT_UHD) || defined(HAVE_SOAPYSDR) +    Output::SDRDeviceConfig sdr_device_config;  #endif  }; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 25a93bf..8a0ee03 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -37,12 +37,9 @@  #include "OutputFile.h"  #include "FormatConverter.h"  #include "FrameMultiplexer.h" -#if defined(HAVE_OUTPUT_UHD) -#   include "OutputUHD.h" -#endif -#if defined(HAVE_SOAPYSDR) -#   include "OutputSoapy.h" -#endif +#include "output/SDR.h" +#include "output/UHD.h" +#include "output/Soapy.h"  #include "OutputZeroMQ.h"  #include "InputReader.h"  #include "PcDebug.h" @@ -95,18 +92,12 @@ void signalHandler(int signalNb)  struct modulator_data  { -    modulator_data() : -        inputReader(nullptr), -        framecount(0), -        flowgraph(nullptr), -        etiReader(nullptr) {} - -    InputReader* inputReader; +    std::shared_ptr<InputReader> inputReader;      Buffer data; -    uint64_t framecount; +    uint64_t framecount = 0; -    Flowgraph* flowgraph; -    EtiReader* etiReader; +    Flowgraph* flowgraph = nullptr; +    EtiReader* etiReader = nullptr;  };  enum class run_modulator_state_t { @@ -116,7 +107,7 @@ enum class run_modulator_state_t {      reconfigure // Some sort of change of configuration we cannot handle happened  }; -run_modulator_state_t run_modulator(modulator_data& m); +static run_modulator_state_t run_modulator(modulator_data& m);  static void printModSettings(const mod_settings_t& mod_settings)  { @@ -133,15 +124,15 @@ static void printModSettings(const mod_settings_t& mod_settings)      else if (mod_settings.useUHDOutput) {          fprintf(stderr, " UHD\n"                          "  Device: %s\n" -                        "  Type: %s\n" +                        "  Subdevice: %s\n"                          "  master_clock_rate: %ld\n"                          "  refclk: %s\n"                          "  pps source: %s\n", -                mod_settings.outputuhd_conf.device.c_str(), -                mod_settings.outputuhd_conf.usrpType.c_str(), -                mod_settings.outputuhd_conf.masterClockRate, -                mod_settings.outputuhd_conf.refclk_src.c_str(), -                mod_settings.outputuhd_conf.pps_src.c_str()); +                mod_settings.sdr_device_config.device.c_str(), +                mod_settings.sdr_device_config.subDevice.c_str(), +                mod_settings.sdr_device_config.masterClockRate, +                mod_settings.sdr_device_config.refclk_src.c_str(), +                mod_settings.sdr_device_config.pps_src.c_str());      }  #endif  #if defined(HAVE_SOAPYSDR) @@ -149,8 +140,8 @@ static void printModSettings(const mod_settings_t& mod_settings)          fprintf(stderr, " SoapySDR\n"                          "  Device: %s\n"                          "  master_clock_rate: %ld\n", -                mod_settings.outputsoapy_conf.device.c_str(), -                mod_settings.outputsoapy_conf.masterClockRate); +                mod_settings.sdr_device_config.device.c_str(), +                mod_settings.sdr_device_config.masterClockRate);      }  #endif      else if (mod_settings.useZeroMQOutput) { @@ -180,7 +171,7 @@ static shared_ptr<ModOutput> prepare_output(      if (s.useFileOutput) {          if (s.fileOutputFormat == "complexf") { -            output = make_shared<OutputFile>(s.outputName); +            output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);          }          else if (s.fileOutputFormat == "complexf_normalised") {              if (s.gainMode == GainMode::GAIN_FIX) @@ -189,7 +180,7 @@ static shared_ptr<ModOutput> prepare_output(                  s.normalise = 1.0f / normalise_factor_file_max;              else if (s.gainMode == GainMode::GAIN_VAR)                  s.normalise = 1.0f / normalise_factor_file_var; -            output = make_shared<OutputFile>(s.outputName); +            output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);          }          else if (s.fileOutputFormat == "s8" or                  s.fileOutputFormat == "u8") { @@ -198,7 +189,7 @@ static shared_ptr<ModOutput> prepare_output(              // [0; 255]              s.normalise = 127.0f / normalise_factor; -            output = make_shared<OutputFile>(s.outputName); +            output = make_shared<OutputFile>(s.outputName, s.fileOutputShowMetadata);          }          else {              throw runtime_error("File output format " + s.fileOutputFormat + @@ -208,18 +199,20 @@ static shared_ptr<ModOutput> prepare_output(  #if defined(HAVE_OUTPUT_UHD)      else if (s.useUHDOutput) {          s.normalise = 1.0f / normalise_factor; -        s.outputuhd_conf.sampleRate = s.outputRate; -        output = make_shared<OutputUHD>(s.outputuhd_conf); -        rcs.enrol((OutputUHD*)output.get()); +        s.sdr_device_config.sampleRate = s.outputRate; +        auto uhddevice = make_shared<Output::UHD>(s.sdr_device_config); +        output = make_shared<Output::SDR>(s.sdr_device_config, uhddevice); +        rcs.enrol((Output::SDR*)output.get());      }  #endif  #if defined(HAVE_SOAPYSDR)      else if (s.useSoapyOutput) {          /* We normalise the same way as for the UHD output */          s.normalise = 1.0f / normalise_factor; -        s.outputsoapy_conf.sampleRate = s.outputRate; -        output = make_shared<OutputSoapy>(s.outputsoapy_conf); -        rcs.enrol((OutputSoapy*)output.get()); +        s.sdr_device_config.sampleRate = s.outputRate; +        auto soapydevice = make_shared<Output::Soapy>(s.sdr_device_config); +        output = make_shared<Output::SDR>(s.sdr_device_config, soapydevice); +        rcs.enrol((Output::SDR*)output.get());      }  #endif  #if defined(HAVE_ZEROMQ) @@ -270,16 +263,8 @@ int launch_modulator(int argc, char* argv[])          throw std::runtime_error("Configuration error");      } -    // When using the FIRFilter, increase the modulator offset pipelining delay -    // by the correct amount -    if (not mod_settings.filterTapsFilename.empty()) { -        mod_settings.tist_delay_stages += FIRFILTER_PIPELINE_DELAY; -    } -      printModSettings(mod_settings); -    modulator_data m; -      shared_ptr<FormatConverter> format_converter;      if (mod_settings.useFileOutput and              (mod_settings.fileOutputFormat == "s8" or @@ -296,7 +281,7 @@ int launch_modulator(int argc, char* argv[])      set_thread_name("modulator");      if (mod_settings.inputTransport == "edi") { -        EdiReader ediReader(mod_settings.tist_offset_s, mod_settings.tist_delay_stages); +        EdiReader ediReader(mod_settings.tist_offset_s);          EdiDecoder::ETIDecoder ediInput(ediReader, false);          if (mod_settings.edi_max_delay_ms > 0.0f) {              // setMaxDelay wants number of AF packets, which correspond to 24ms ETI frames @@ -321,17 +306,6 @@ int launch_modulator(int argc, char* argv[])              flowgraph.connect(modulator, output);          } -#if defined(HAVE_OUTPUT_UHD) -        if (mod_settings.useUHDOutput) { -            ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); -        } -#endif -#if defined(HAVE_SOAPYSDR) -        if (mod_settings.useSoapyOutput) { -            ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource()); -        } -#endif -          size_t framecount = 0;          while (running) { @@ -397,11 +371,12 @@ int launch_modulator(int argc, char* argv[])          while (run_again) {              Flowgraph flowgraph; -            m.inputReader = inputReader.get(); +            modulator_data m; +            m.inputReader = inputReader;              m.flowgraph = &flowgraph;              m.data.setLength(6144); -            EtiReader etiReader(mod_settings.tist_offset_s, mod_settings.tist_delay_stages); +            EtiReader etiReader(mod_settings.tist_offset_s);              m.etiReader = &etiReader;              auto input = make_shared<InputMemory>(&m.data); @@ -415,17 +390,6 @@ int launch_modulator(int argc, char* argv[])                  flowgraph.connect(modulator, output);              } -#if defined(HAVE_OUTPUT_UHD) -            if (mod_settings.useUHDOutput) { -                ((OutputUHD*)output.get())->setETISource(modulator->getEtiSource()); -            } -#endif -#if defined(HAVE_SOAPYSDR) -            if (mod_settings.useSoapyOutput) { -                ((OutputSoapy*)output.get())->setETISource(modulator->getEtiSource()); -            } -#endif -              inputReader->PrintInfo();              run_modulator_state_t st = run_modulator(m); @@ -490,7 +454,7 @@ int launch_modulator(int argc, char* argv[])      return ret;  } -run_modulator_state_t run_modulator(modulator_data& m) +static run_modulator_state_t run_modulator(modulator_data& m)  {      auto ret = run_modulator_state_t::failure;      try { @@ -527,13 +491,26 @@ run_modulator_state_t run_modulator(modulator_data& m)                  }              }              if (framesize == 0) { -                etiLog.level(info) << "End of file reached."; +                if (dynamic_pointer_cast<InputFileReader>(m.inputReader)) { +                    etiLog.level(info) << "End of file reached."; +                    running = 0; +                    ret = run_modulator_state_t::normal_end; +                } +#if defined(HAVE_ZEROMQ) +                else if (dynamic_pointer_cast<InputZeroMQReader>(m.inputReader)) { +                    /* An empty frame marks a timeout. We ignore it, but we are +                     * now able to handle SIGINT properly. +                     */ +                } +#endif // defined(HAVE_ZEROMQ) +                // No need to handle the TCP input in a special way to get SIGINT working, +                // because recv() will return with EINTR.              }              else {                  etiLog.level(error) << "Input read error."; +                running = 0; +                ret = run_modulator_state_t::normal_end;              } -            running = 0; -            ret = run_modulator_state_t::normal_end;          }      }      catch (const zmq_input_overflow& e) { diff --git a/src/DabModulator.cpp b/src/DabModulator.cpp index 8ba8af6..5b77d18 100644 --- a/src/DabModulator.cpp +++ b/src/DabModulator.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -162,10 +162,13 @@ int DabModulator::process(Buffer* dataOut)              }          } -        auto cifCicEq = make_shared<CicEqualizer>( +        shared_ptr<CicEqualizer> cifCicEq; +        if (useCicEq) { +            cifCicEq = make_shared<CicEqualizer>(                  myNbCarriers,                  (float)mySpacing * (float)m_settings.outputRate / 2048000.0f,                  cic_ratio); +        }          shared_ptr<TII> tii;          shared_ptr<PhaseReference> tiiRef; @@ -218,7 +221,7 @@ int DabModulator::process(Buffer* dataOut)              rcs.enrol(cifPoly.get());          } -        auto myOutput = make_shared<OutputMemory>(dataOut); +        myOutput = make_shared<OutputMemory>(dataOut);          shared_ptr<Resampler> cifRes;          if (m_settings.outputRate != 2048000) { @@ -347,43 +350,24 @@ int DabModulator::process(Buffer* dataOut)              myFlowgraph->connect(tii, cifSig);          } -        if (useCicEq) { -            myFlowgraph->connect(cifSig, cifCicEq); -            myFlowgraph->connect(cifCicEq, cifOfdm); -        } -        else { -            myFlowgraph->connect(cifSig, cifOfdm); -        } -        myFlowgraph->connect(cifOfdm, cifGain); -        myFlowgraph->connect(cifGain, cifGuard); - -        auto cifOut = cifPoly ? -            static_pointer_cast<ModPlugin>(cifPoly) : -            static_pointer_cast<ModPlugin>(myOutput); - -        if (cifFilter) { -            myFlowgraph->connect(cifGuard, cifFilter); -            if (cifRes) { -                myFlowgraph->connect(cifFilter, cifRes); -                myFlowgraph->connect(cifRes, cifOut); -            } -            else { -                myFlowgraph->connect(cifFilter, cifOut); -            } -        } -        else { -            if (cifRes) { -                myFlowgraph->connect(cifGuard, cifRes); -                myFlowgraph->connect(cifRes, cifOut); -            } -            else { -                myFlowgraph->connect(cifGuard, cifOut); +        shared_ptr<ModPlugin> prev_plugin = static_pointer_cast<ModPlugin>(cifSig); +        const std::list<shared_ptr<ModPlugin> > plugins({ +                static_pointer_cast<ModPlugin>(cifCicEq), +                static_pointer_cast<ModPlugin>(cifOfdm), +                static_pointer_cast<ModPlugin>(cifGain), +                static_pointer_cast<ModPlugin>(cifGuard), +                static_pointer_cast<ModPlugin>(cifFilter), // optional block +                static_pointer_cast<ModPlugin>(cifRes),    // optional block +                static_pointer_cast<ModPlugin>(cifPoly),   // optional block +                static_pointer_cast<ModPlugin>(myOutput), +                }); + +        for (auto& p : plugins) { +            if (p) { +                myFlowgraph->connect(prev_plugin, p); +                prev_plugin = p;              }          } - -        if (cifPoly) { -            myFlowgraph->connect(cifPoly, myOutput); -        }      }      //////////////////////////////////////////////////////////////////// @@ -392,3 +376,12 @@ int DabModulator::process(Buffer* dataOut)      return myFlowgraph->run();  } +meta_vec_t DabModulator::process_metadata(const meta_vec_t& metadataIn) +{ +    if (myOutput) { +        return myOutput->get_latest_metadata(); +    } + +    return {}; +} + diff --git a/src/DabModulator.h b/src/DabModulator.h index 2d7bc35..e84ce96 100644 --- a/src/DabModulator.h +++ b/src/DabModulator.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -46,7 +46,7 @@  #include "TII.h" -class DabModulator : public ModInput +class DabModulator : public ModInput, public ModMetadata  {  public:      DabModulator(EtiSource& etiSource, @@ -55,6 +55,9 @@ public:      int process(Buffer* dataOut);      const char* name() { return "DabModulator"; } +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn); +      /* Required to get the timestamp */      EtiSource* getEtiSource() { return &myEtiSource; } diff --git a/src/EtiReader.cpp b/src/EtiReader.cpp index d84ed1f..17f4953 100644 --- a/src/EtiReader.cpp +++ b/src/EtiReader.cpp @@ -54,10 +54,9 @@ enum ETI_READER_STATE {  EtiReader::EtiReader( -        double& tist_offset_s, -        unsigned tist_delay_stages) : +        double& tist_offset_s) :      state(EtiReaderStateSync), -    myTimestampDecoder(tist_offset_s, tist_delay_stages), +    myTimestampDecoder(tist_offset_s),      eti_fc_valid(false)  {      rcs.enrol(&myTimestampDecoder); @@ -281,6 +280,8 @@ int EtiReader::loadEtiData(const Buffer& dataIn)      myTimestampDecoder.updateTimestampEti(eti_fc.FP & 0x3,              eti_eoh.MNSC, getPPSOffset(), eti_fc.FCT); +    myFicSource->loadTimestamp(myTimestampDecoder.getTimestamp()); +      return dataIn.getLength() - input_size;  } @@ -290,11 +291,6 @@ bool EtiReader::sourceContainsTimestamp()      /* See ETS 300 799, Annex C.2.2 */  } -void EtiReader::calculateTimestamp(struct frame_timestamp& ts) -{ -    myTimestampDecoder.calculateTimestamp(ts); -} -  uint32_t EtiReader::getPPSOffset()  {      if (!sourceContainsTimestamp()) { @@ -309,9 +305,8 @@ uint32_t EtiReader::getPPSOffset()  }  EdiReader::EdiReader( -        double& tist_offset_s, -        unsigned tist_delay_stages) : -    m_timestamp_decoder(tist_offset_s, tist_delay_stages) +        double& tist_offset_s) : +    m_timestamp_decoder(tist_offset_s)  {      rcs.enrol(&m_timestamp_decoder);  } @@ -359,11 +354,6 @@ bool EdiReader::sourceContainsTimestamp()      return m_fc.tsta != 0xFFFFFF;  } -void EdiReader::calculateTimestamp(struct frame_timestamp& ts) -{ -    m_timestamp_decoder.calculateTimestamp(ts); -} -  bool EdiReader::isFrameReady()  {      return m_frameReady; @@ -533,8 +523,9 @@ void EdiReader::assemble()      const std::time_t posix_timestamp_1_jan_2000 = 946684800;      auto utc_ts = posix_timestamp_1_jan_2000 + m_seconds - m_utco; -    m_timestamp_decoder.updateTimestampEdi( -            utc_ts, m_fc.tsta, m_fc.fct()); +    m_timestamp_decoder.updateTimestampEdi(utc_ts, m_fc.tsta, m_fc.fct(), m_fc.fp); + +    myFicSource->loadTimestamp(m_timestamp_decoder.getTimestamp());      m_frameReady = true;  } diff --git a/src/EtiReader.h b/src/EtiReader.h index f3a9764..8270592 100644 --- a/src/EtiReader.h +++ b/src/EtiReader.h @@ -59,7 +59,6 @@ public:      /* Returns true if we have valid time stamps in the ETI*/      virtual bool sourceContainsTimestamp() = 0; -    virtual void calculateTimestamp(struct frame_timestamp& ts) = 0;      /* Return the FIC source to be used for modulation */      virtual std::shared_ptr<FicSource>& getFic(void); @@ -75,9 +74,7 @@ protected:  class EtiReader : public EtiSource  {  public: -    EtiReader( -            double& tist_offset_s, -            unsigned tist_delay_stages); +    EtiReader(double& tist_offset_s);      virtual unsigned getMode();      virtual unsigned getFp(); @@ -88,7 +85,6 @@ public:      int loadEtiData(const Buffer& dataIn);      virtual bool sourceContainsTimestamp(); -    virtual void calculateTimestamp(struct frame_timestamp& ts);      virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const; @@ -118,14 +114,11 @@ private:  class EdiReader : public EtiSource, public EdiDecoder::DataCollector  {  public: -    EdiReader( -            double& tist_offset_s, -            unsigned tist_delay_stages); +    EdiReader(double& tist_offset_s);      virtual unsigned getMode();      virtual unsigned getFp();      virtual bool sourceContainsTimestamp(); -    virtual void calculateTimestamp(struct frame_timestamp& ts);      virtual const std::vector<std::shared_ptr<SubchannelSource> > getSubchannels() const;      virtual bool isFrameReady(void); diff --git a/src/FIRFilter.cpp b/src/FIRFilter.cpp index bc2314a..96ad1b9 100644 --- a/src/FIRFilter.cpp +++ b/src/FIRFilter.cpp @@ -87,6 +87,11 @@ FIRFilter::FIRFilter(const std::string& taps_file) :      start_pipeline_thread();  } +FIRFilter::~FIRFilter() +{ +    stop_pipeline_thread(); +} +  void FIRFilter::load_filter_taps(const std::string &tapsFile)  {      std::vector<float> filter_taps; diff --git a/src/FIRFilter.h b/src/FIRFilter.h index a63bfb9..d04c456 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -53,20 +53,21 @@ class FIRFilter : public PipelinedModCodec, public RemoteControllable  {  public:      FIRFilter(const std::string& taps_file); -    virtual ~FIRFilter() = default; +    FIRFilter(const FIRFilter& other) = delete; +    FIRFilter& operator=(const FIRFilter& other) = delete; +    virtual ~FIRFilter(); -    const char* name() { return "FIRFilter"; } +    const char* name() override { return "FIRFilter"; }      /******* REMOTE CONTROL ********/      virtual void set_parameter(const std::string& parameter, -            const std::string& value); +            const std::string& value) override;      virtual const std::string get_parameter( -            const std::string& parameter) const; - +            const std::string& parameter) const override;  protected: -    virtual int internal_process(Buffer* const dataIn, Buffer* dataOut); +    virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) override;      void load_filter_taps(const std::string &tapsFile);      std::string m_taps_file; diff --git a/src/FicSource.cpp b/src/FicSource.cpp index 04197db..2b95085 100644 --- a/src/FicSource.cpp +++ b/src/FicSource.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,6 +26,8 @@  #include "FicSource.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <stdexcept>  #include <string> @@ -92,3 +94,23 @@ int FicSource::process(Buffer* outputData)      return outputData->getLength();  } +void FicSource::loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts) +{ +    d_ts = ts; +} + + +meta_vec_t FicSource::process_metadata(const meta_vec_t& metadataIn) +{ +    if (not d_ts) { +        return {}; +    } + +    using namespace std; +    meta_vec_t md_vec; +    flowgraph_metadata meta; +    meta.ts = d_ts; +    md_vec.push_back(meta); +    return md_vec; +} + diff --git a/src/FicSource.h b/src/FicSource.h index 77ac741..93c1a7f 100644 --- a/src/FicSource.h +++ b/src/FicSource.h @@ -36,7 +36,7 @@  #include <vector>  #include <sys/types.h> -class FicSource : public ModInput +class FicSource : public ModInput, public ModMetadata  {  public:      FicSource(unsigned ficf, unsigned mid); @@ -45,12 +45,17 @@ public:      const std::vector<PuncturingRule>& get_rules();      void loadFicData(const Buffer& fic); -    int process(Buffer* outputData); -    const char* name() { return "FicSource"; } +    int process(Buffer* outputData) override; +    const char* name() override { return "FicSource"; } + +    void loadTimestamp(const std::shared_ptr<struct frame_timestamp>& ts); +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override;  private:      size_t d_framesize;      Buffer d_buffer; +    std::shared_ptr<struct frame_timestamp> d_ts;      std::vector<PuncturingRule> d_puncturing_rules;  }; diff --git a/src/Flowgraph.cpp b/src/Flowgraph.cpp index 465ef41..506832c 100644 --- a/src/Flowgraph.cpp +++ b/src/Flowgraph.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,6 +26,7 @@  #include "Flowgraph.h"  #include "PcDebug.h" +#include "Log.h"  #include <string>  #include <memory>  #include <algorithm> @@ -57,9 +58,10 @@ Node::~Node()      assert(myOutputBuffers.size() == 0);  } -void Node::addOutputBuffer(Buffer::sptr& buffer) +void Node::addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      myOutputBuffers.push_back(buffer); +    myOutputMetadata.push_back(md);  #if DEBUG      std::string fname = string(myPlugin->name()) +          "-" + to_string(myDebugFiles.size()) + @@ -71,7 +73,7 @@ void Node::addOutputBuffer(Buffer::sptr& buffer)  #endif  } -void Node::removeOutputBuffer(Buffer::sptr& buffer) +void Node::removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      auto it = std::find(              myOutputBuffers.begin(), @@ -89,14 +91,23 @@ void Node::removeOutputBuffer(Buffer::sptr& buffer)  #endif          myOutputBuffers.erase(it);      } + +    auto mdit = std::find( +            myOutputMetadata.begin(), +            myOutputMetadata.end(), +            md); +    if (mdit != myOutputMetadata.end()) { +        myOutputMetadata.erase(mdit); +    }  } -void Node::addInputBuffer(Buffer::sptr& buffer) +void Node::addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      myInputBuffers.push_back(buffer); +    myInputMetadata.push_back(md);  } -void Node::removeInputBuffer(Buffer::sptr& buffer) +void Node::removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md)  {      auto it = std::find(              myInputBuffers.begin(), @@ -105,6 +116,14 @@ void Node::removeInputBuffer(Buffer::sptr& buffer)      if (it != myInputBuffers.end()) {          myInputBuffers.erase(it);      } + +    auto mdit = std::find( +            myInputMetadata.begin(), +            myInputMetadata.end(), +            md); +    if (mdit != myInputMetadata.end()) { +        myInputMetadata.erase(mdit); +    }  }  int Node::process() @@ -127,6 +146,37 @@ int Node::process()      }      int ret = myPlugin->process(inBuffers, outBuffers); + +    // Collect all incoming metadata into a single vector +    meta_vec_t all_input_mds; +    for (auto& md_vec_sp : myInputMetadata) { +        if (md_vec_sp) { +            move(md_vec_sp->begin(), md_vec_sp->end(), +                    back_inserter(all_input_mds)); +            md_vec_sp->clear(); +        } +    } + +    auto mod_meta = dynamic_pointer_cast<ModMetadata>(myPlugin); +    if (mod_meta) { +        auto outputMetadata = mod_meta->process_metadata(all_input_mds); +        // Distribute the result metadata to all outputs +        for (auto& out_md : myOutputMetadata) { +            out_md->clear(); +            std::move(outputMetadata.begin(), outputMetadata.end(), +                    std::back_inserter(*out_md)); +        } +    } +    else { +        // Propagate the unmodified input metadata to all outputs +        for (auto& out_md : myOutputMetadata) { +            out_md->clear(); +            std::move(all_input_mds.begin(), all_input_mds.end(), +                    std::back_inserter(*out_md)); +        } +    } + +  #if DEBUG      assert(myDebugFiles.size() == myOutputBuffers.size()); @@ -158,8 +208,10 @@ Edge::Edge(shared_ptr<Node>& srcNode, shared_ptr<Node>& dstNode) :              this);      myBuffer = make_shared<Buffer>(); -    srcNode->addOutputBuffer(myBuffer); -    dstNode->addInputBuffer(myBuffer); +    myMetadata = make_shared<vector<flowgraph_metadata> >(); + +    srcNode->addOutputBuffer(myBuffer, myMetadata); +    dstNode->addInputBuffer(myBuffer, myMetadata);  } @@ -168,8 +220,8 @@ Edge::~Edge()      PDEBUG("Edge::~Edge() @ %p\n", this);      if (myBuffer) { -        mySrcNode->removeOutputBuffer(myBuffer); -        myDstNode->removeInputBuffer(myBuffer); +        mySrcNode->removeOutputBuffer(myBuffer, myMetadata); +        myDstNode->removeInputBuffer(myBuffer, myMetadata);      }  } @@ -186,9 +238,8 @@ Flowgraph::~Flowgraph()  {      PDEBUG("Flowgraph::~Flowgraph() @ %p\n", this); -    stringstream ss; -      if (myProcessTime) { +        stringstream ss;          ss << "Process time:\n";          char node_time_sz[1024] = {}; diff --git a/src/Flowgraph.h b/src/Flowgraph.h index ebb7314..b074ee6 100644 --- a/src/Flowgraph.h +++ b/src/Flowgraph.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -39,6 +39,8 @@  #include <list>  #include <cstdio> +using Metadata_vec_sptr = std::shared_ptr<std::vector<flowgraph_metadata> >; +  class Node  {  public: @@ -55,15 +57,18 @@ public:          myProcessTime += processTime;      } -    void addOutputBuffer(Buffer::sptr& buffer); -    void removeOutputBuffer(Buffer::sptr& buffer); +    void addOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); +    void removeOutputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); -    void addInputBuffer(Buffer::sptr& buffer); -    void removeInputBuffer(Buffer::sptr& buffer); +    void addInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md); +    void removeInputBuffer(Buffer::sptr& buffer, Metadata_vec_sptr& md);  protected:      std::list<Buffer::sptr> myInputBuffers;      std::list<Buffer::sptr> myOutputBuffers; +    std::list<Metadata_vec_sptr> myInputMetadata; +    std::list<Metadata_vec_sptr> myOutputMetadata; +  #if DEBUG      std::list<FILE*> myDebugFiles;  #endif @@ -85,6 +90,7 @@ protected:      std::shared_ptr<Node> mySrcNode;      std::shared_ptr<Node> myDstNode;      std::shared_ptr<Buffer> myBuffer; +    std::shared_ptr<std::vector<flowgraph_metadata> > myMetadata;  }; diff --git a/src/GainControl.cpp b/src/GainControl.cpp index 0411482..5657fc2 100644 --- a/src/GainControl.cpp +++ b/src/GainControl.cpp @@ -75,7 +75,7 @@ GainControl::GainControl(size_t framesize,  GainControl::~GainControl()  { -    PDEBUG("GainControl::~GainControl() @ %p\n", this); +    stop_pipeline_thread();  }  int GainControl::internal_process(Buffer* const dataIn, Buffer* dataOut) diff --git a/src/GainControl.h b/src/GainControl.h index e9eaa8c..b4579cd 100644 --- a/src/GainControl.h +++ b/src/GainControl.h @@ -57,8 +57,8 @@ class GainControl : public PipelinedModCodec, public RemoteControllable                      float varVariance);          virtual ~GainControl(); -        GainControl(const GainControl&); -        GainControl& operator=(const GainControl&); +        GainControl(const GainControl&) = delete; +        GainControl& operator=(const GainControl&) = delete;          const char* name() override { return "GainControl"; } diff --git a/src/GuardIntervalInserter.cpp b/src/GuardIntervalInserter.cpp index afb9213..79692f5 100644 --- a/src/GuardIntervalInserter.cpp +++ b/src/GuardIntervalInserter.cpp @@ -97,8 +97,6 @@ void GuardIntervalInserter::update_window(size_t new_window_overlap)      }  } -#pragma GCC optimize ("O0") -  int GuardIntervalInserter::process(Buffer* const dataIn, Buffer* dataOut)  {      PDEBUG("GuardIntervalInserter::process(dataIn: %p, dataOut: %p)\n", diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 84f0be4..5e93477 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyrigth (C) 2013 +   Copyrigth (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li @@ -47,28 +47,29 @@ int InputFileReader::Open(std::string filename, bool loop)  {      filename_ = filename;      loop_ = loop; -    inputfile_ = fopen(filename_.c_str(), "r"); -    if (inputfile_ == NULL) { +    FILE* fd = fopen(filename_.c_str(), "r"); +    if (fd == nullptr) {          etiLog.level(error) << "Unable to open input file!";          perror(filename_.c_str());          return -1;      } +    inputfile_.reset(fd);      return IdentifyType();  }  int InputFileReader::Rewind()  { -    rewind(inputfile_); // Also clears the EOF flag +    rewind(inputfile_.get()); // Also clears the EOF flag      return IdentifyType();  }  int InputFileReader::IdentifyType()  { -    EtiStreamType streamType = ETI_STREAM_TYPE_NONE; +    EtiStreamType streamType = EtiStreamType::None;      struct stat inputFileStat; -    fstat(fileno(inputfile_), &inputFileStat); +    fstat(fileno(inputfile_.get()), &inputFileStat);      inputfilelength_ = inputFileStat.st_size;      uint32_t sync; @@ -77,22 +78,22 @@ int InputFileReader::IdentifyType()      char discard_buffer[6144]; -    if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { +    if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {          etiLog.level(error) << "Unable to read sync in input file!";          perror(filename_.c_str());          return -1;      }      if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { -        streamType = ETI_STREAM_TYPE_RAW; +        streamType = EtiStreamType::Raw;          if (inputfilelength_ > 0) {              nbframes_ = inputfilelength_ / 6144;          }          else {              nbframes_ = ~0;          } -        if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) { +        if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) {              // if the seek fails, consume the rest of the frame -            if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) +            if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())                      != 1) {                  etiLog.level(error) << "Unable to read from input file!";                  perror(filename_.c_str()); @@ -104,7 +105,7 @@ int InputFileReader::IdentifyType()      }      nbFrames = sync; -    if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { +    if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {          etiLog.level(error) << "Unable to read frame size in input file!";          perror(filename_.c_str());          return -1; @@ -114,7 +115,7 @@ int InputFileReader::IdentifyType()      sync |= ((uint32_t)frameSize) << 16;      if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { -        streamType = ETI_STREAM_TYPE_STREAMED; +        streamType = EtiStreamType::Streamed;          frameSize = nbFrames & 0xffff;          if (inputfilelength_ > 0) {              nbframes_ = inputfilelength_ / (frameSize + 2); @@ -122,9 +123,9 @@ int InputFileReader::IdentifyType()          else {              nbframes_ = ~0;          } -        if (fseek(inputfile_, -6, SEEK_CUR) != 0) { +        if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {              // if the seek fails, consume the rest of the frame -            if (fread(discard_buffer, frameSize - 4, 1, inputfile_) +            if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())                      != 1) {                  etiLog.level(error) << "Unable to read from input file!";                  perror(filename_.c_str()); @@ -135,16 +136,16 @@ int InputFileReader::IdentifyType()          return 0;      } -    if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { +    if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {          etiLog.level(error) << "Unable to read nb frame in input file!";          perror(filename_.c_str());          return -1;      }      if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { -        streamType = ETI_STREAM_TYPE_FRAMED; -        if (fseek(inputfile_, -6, SEEK_CUR) != 0) { +        streamType = EtiStreamType::Framed; +        if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {              // if the seek fails, consume the rest of the frame -            if (fread(discard_buffer, frameSize - 4, 1, inputfile_) +            if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())                      != 1) {                  etiLog.level(error) << "Unable to read from input file!";                  perror(filename_.c_str()); @@ -160,21 +161,21 @@ int InputFileReader::IdentifyType()      for (size_t i = 10; i < 6144 + 10; ++i) {          sync >>= 8;          sync &= 0xffffff; -        if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) { +        if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_.get()) != 1) {              etiLog.level(error) << "Unable to read from input file!";              perror(filename_.c_str());              return -1;          }          if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { -            streamType = ETI_STREAM_TYPE_RAW; +            streamType = EtiStreamType::Raw;              if (inputfilelength_ > 0) {                  nbframes_ = (inputfilelength_ - i) / 6144;              }              else {                  nbframes_ = ~0;              } -            if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) { -                if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) +            if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) { +                if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())                          != 1) {                      etiLog.level(error) << "Unable to read from input file!";                      perror(filename_.c_str()); @@ -190,17 +191,17 @@ int InputFileReader::IdentifyType()      return -1;  } -void InputFileReader::PrintInfo() +void InputFileReader::PrintInfo() const  {      fprintf(stderr, "Input file format: ");      switch (streamtype_) { -        case ETI_STREAM_TYPE_RAW: +        case EtiStreamType::Raw:              fprintf(stderr, "raw");              break; -        case ETI_STREAM_TYPE_STREAMED: +        case EtiStreamType::Streamed:              fprintf(stderr, "streamed");              break; -        case ETI_STREAM_TYPE_FRAMED: +        case EtiStreamType::Framed:              fprintf(stderr, "framed");              break;          default: @@ -221,15 +222,15 @@ int InputFileReader::GetNextFrame(void* buffer)  {      uint16_t frameSize; -    if (streamtype_ == ETI_STREAM_TYPE_RAW) { +    if (streamtype_ == EtiStreamType::Raw) {          frameSize = 6144;      }      else { -        if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { +        if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {              etiLog.level(error) << "Reached end of file.";              if (loop_) {                  if (Rewind() == 0) { -                    if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { +                    if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {                          PDEBUG("Error after rewinding file!\n");                          etiLog.level(error) << "Error after rewinding file!";                          return -1; @@ -252,15 +253,15 @@ int InputFileReader::GetNextFrame(void* buffer)      }      PDEBUG("Frame size: %u\n", frameSize); -    size_t read_bytes = fread(buffer, 1, frameSize, inputfile_); +    size_t read_bytes = fread(buffer, 1, frameSize, inputfile_.get());      if (    loop_ && -            streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144 -            read_bytes == 0 && feof(inputfile_)) { +            streamtype_ == EtiStreamType::Raw && //implies frameSize == 6144 +            read_bytes == 0 && feof(inputfile_.get())) {          // in case of an EOF from a RAW that we loop, rewind          // otherwise, we won't tolerate it          if (Rewind() == 0) { -            read_bytes = fread(buffer, 1, frameSize, inputfile_); +            read_bytes = fread(buffer, 1, frameSize, inputfile_.get());          }          else {              PDEBUG("Impossible to rewind file!\n"); diff --git a/src/InputReader.h b/src/InputReader.h index 7d6b373..07326cf 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://www.opendigitalradio.org @@ -33,6 +33,7 @@  #include <cstdio>  #include <vector> +#include <atomic>  #include <memory>  #if defined(HAVE_ZEROMQ)  #  include "zmq.hpp" @@ -40,47 +41,8 @@  #endif  #include "porting.h"  #include "Log.h" -#include "lib/UdpSocket.h" -#include <sys/socket.h> -#include <netinet/in.h>  #include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#define SOCKET           int  #define INVALID_SOCKET   -1 -#define SOCKET_ERROR     -1 - -/* Known types of input streams. Description taken from the CRC mmbTools forum. - -    All numbers are little-endian. - -    Framed format is used for file recording. It is the default format. The -    padding can be removed from data. Format: -        uint32_t nbFrames -        for each frame -          uint16_t frameSize -          uint8_t data[frameSize] - -    Streamed format is used for streamed applications. As the total number of -    frames is unknown before end of transmission, the corresponding field is -    removed. The padding can be removed from data. Format: -        for each frame -          uint16_t frameSize -          uint8_t data[frameSize] - -    Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703 -    data stream. The padding is always present. Format: -        for each frame -          uint8_t data[6144] - -    Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI). -*/ -enum EtiStreamType { -    ETI_STREAM_TYPE_NONE = 0, -    ETI_STREAM_TYPE_RAW, -    ETI_STREAM_TYPE_STREAMED, -    ETI_STREAM_TYPE_FRAMED, -};  class InputReader  { @@ -91,43 +53,25 @@ class InputReader          virtual int GetNextFrame(void* buffer) = 0;          // Print some information -        virtual void PrintInfo() = 0; +        virtual void PrintInfo() const = 0;  };  class InputFileReader : public InputReader  {      public: -        InputFileReader() : -            streamtype_(ETI_STREAM_TYPE_NONE), -            inputfile_(NULL) { } - -        ~InputFileReader() -        { -            if (inputfile_ != NULL) { -                fprintf(stderr, "\nClosing input file...\n"); - -                fclose(inputfile_); -            } -        } +        InputFileReader() = default; +        InputFileReader(const InputFileReader& other) = delete; +        InputFileReader& operator=(const InputFileReader& other) = delete;          // open file and determine stream type          // When loop=1, GetNextFrame will never return 0          int Open(std::string filename, bool loop);          // Print information about the file opened -        void PrintInfo(); - +        void PrintInfo() const;          int GetNextFrame(void* buffer); -        EtiStreamType GetStreamType() -        { -            return streamtype_; -        } -      private: -        InputFileReader(const InputFileReader& other) = delete; -        InputFileReader& operator=(const InputFileReader& other) = delete; -          int IdentifyType();          // Rewind the file, and replay anew @@ -136,19 +80,60 @@ class InputFileReader : public InputReader          bool loop_; // if shall we loop the file over and over          std::string filename_; -        EtiStreamType streamtype_; -        FILE* inputfile_; -        size_t inputfilelength_; -        uint64_t nbframes_; // 64-bit because 32-bit overflow is -                            // after 2**32 * 24ms ~= 3.3 years +        /* Known types of input streams. Description taken from the CRC +         * mmbTools forum. All values are are little-endian.  */ +        enum class EtiStreamType { +            /* Not yet identified */ +            None, + +            /* Raw format is a bit-by-bit (but byte aligned on sync) recording +             * of a G.703 data stream. The padding is always present. +             * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI). +             * Format: +                 for each frame: +                   uint8_t data[6144] +             */ +            Raw, + +            /* Streamed format is used for streamed applications. As the total +             * number of frames is unknown before end of transmission, the +             * corresponding field is removed. The padding can be removed from +             * data. +             * Format: +                 for each frame: +                   uint16_t frameSize +                   uint8_t data[frameSize] +             */ +            Streamed, + +            /* Framed format is used for file recording. It is the default format. +             * The padding can be removed from data. +             * Format: +                 uint32_t nbFrames +                 for each frame: +                   uint16_t frameSize +                   uint8_t data[frameSize] +             */ +            Framed, +        }; + +        EtiStreamType streamtype_ = EtiStreamType::None; +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> inputfile_; + +        size_t inputfilelength_ = 0; +        uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is +        // after 2**32 * 24ms ~= 3.3 years  };  class InputTcpReader : public InputReader  {      public:          InputTcpReader(); -        ~InputTcpReader(); +        InputTcpReader(const InputTcpReader& other) = delete; +        InputTcpReader& operator=(const InputTcpReader& other) = delete; +        virtual ~InputTcpReader();          // Endpoint is either host:port or tcp://host:port          void Open(const std::string& endpoint); @@ -159,89 +144,54 @@ class InputTcpReader : public InputReader          virtual int GetNextFrame(void* buffer);          // Print some information -        virtual void PrintInfo(); +        virtual void PrintInfo() const;      private: -        InputTcpReader(const InputTcpReader& other) = delete; -        InputTcpReader& operator=(const InputTcpReader& other) = delete; -        SOCKET m_sock; +        int m_sock = INVALID_SOCKET;          std::string m_uri;  };  struct zmq_input_overflow : public std::exception  { -  const char* what () const throw () -  { -    return "InputZMQ buffer overflow"; -  } +    const char* what () const throw () +    { +        return "InputZMQ buffer overflow"; +    }  };  #if defined(HAVE_ZEROMQ)  /* A ZeroMQ input. See www.zeromq.org for more info */ -struct InputZeroMQThreadData -{ -    ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages; -    std::string uri; -    size_t max_queued_frames; -}; - -class InputZeroMQWorker +class InputZeroMQReader : public InputReader  {      public: -        InputZeroMQWorker() : -            running(false), -            zmqcontext(1), -            m_to_drop(0) { } +        InputZeroMQReader() = default; +        InputZeroMQReader(const InputZeroMQReader& other) = delete; +        InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; +        ~InputZeroMQReader(); -        void Start(struct InputZeroMQThreadData* workerdata); -        void Stop(); +        int Open(const std::string& uri, size_t max_queued_frames); +        int GetNextFrame(void* buffer); +        void PrintInfo() const; -        bool is_running(void) { return running; }      private: -        bool running; +        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); +        std::string m_uri; +        size_t m_max_queued_frames = 0; +        ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages; -        void RecvProcess(struct InputZeroMQThreadData* workerdata); +        void RecvProcess(void); -        zmq::context_t zmqcontext; // is thread-safe -        boost::thread recv_thread; +        zmq::context_t m_zmqcontext; // is thread-safe +        boost::thread m_recv_thread;          /* We must be careful to keep frame phase consistent. If we           * drop a single ETI frame, we will break the transmission           * frame vs. ETI frame phase.           * -         * Here we keep track of how many ETI frames we must drop +         * Here we keep track of how many ETI frames we must drop.           */ -        int m_to_drop; -}; - -class InputZeroMQReader : public InputReader -{ -    public: -        InputZeroMQReader() -        { -            workerdata_.in_messages = &in_messages_; -        } - -        ~InputZeroMQReader() -        { -            worker_.Stop(); -        } - -        int Open(const std::string& uri, size_t max_queued_frames); - -        int GetNextFrame(void* buffer); - -        void PrintInfo(); - -    private: -        InputZeroMQReader(const InputZeroMQReader& other) = delete; -        InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; -        std::string uri_; - -        InputZeroMQWorker worker_; -        ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_; -        struct InputZeroMQThreadData workerdata_; +        int m_to_drop = 0;  };  #endif diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp index 94ec0ad..9a93ad1 100644 --- a/src/InputTcpReader.cpp +++ b/src/InputTcpReader.cpp @@ -32,9 +32,12 @@  #include "InputReader.h"  #include "PcDebug.h"  #include "Utils.h" -#include <sys/socket.h>  #include <unistd.h>  #include <errno.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h>  InputTcpReader::InputTcpReader()  { @@ -126,7 +129,7 @@ int InputTcpReader::GetNextFrame(void* buffer)      return r;  } -void InputTcpReader::PrintInfo() +void InputTcpReader::PrintInfo() const  {      fprintf(stderr, "Input TCP:\n");      fprintf(stderr, "  Receiving from %s\n\n", m_uri.c_str()); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 783f0f5..f6a816a 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -68,29 +68,35 @@ struct zmq_dab_message_t  #define ZMQ_DAB_MESSAGE_T_HEADERSIZE \      (sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE*sizeof(uint16_t)) +InputZeroMQReader::~InputZeroMQReader() +{ +    m_running = false; +    m_zmqcontext.close(); +    if (m_recv_thread.joinable()) { +            m_recv_thread.join(); +    } +} +  int InputZeroMQReader::Open(const string& uri, size_t max_queued_frames)  {      // The URL might start with zmq+tcp://      if (uri.substr(0, 4) == "zmq+") { -        uri_ = uri.substr(4); +        m_uri = uri.substr(4);      }      else { -        uri_ = uri; +        m_uri = uri;      } -    workerdata_.uri = uri_; -    workerdata_.max_queued_frames = max_queued_frames; -    // launch receiver thread -    worker_.Start(&workerdata_); +    m_max_queued_frames = max_queued_frames; + +    m_recv_thread = boost::thread(&InputZeroMQReader::RecvProcess, this);      return 0;  }  int InputZeroMQReader::GetNextFrame(void* buffer)  { -    const size_t framesize = 6144; - -    if (not worker_.is_running()) { +    if (not m_running) {          return 0;      } @@ -100,77 +106,100 @@ int InputZeroMQReader::GetNextFrame(void* buffer)       * (4 ETI frames in TM1) and we should make sure that       * we can serve the data required for a full transmission frame.       */ -    if (in_messages_.size() < 4) { +    if (m_in_messages.size() < 4) {          const size_t prebuffering = 10;          etiLog.log(trace, "ZMQ,wait1"); -        in_messages_.wait_and_pop(incoming, prebuffering); +        m_in_messages.wait_and_pop(incoming, prebuffering);      }      else {          etiLog.log(trace, "ZMQ,wait2"); -        in_messages_.wait_and_pop(incoming); +        m_in_messages.wait_and_pop(incoming);      }      etiLog.log(trace, "ZMQ,pop"); -    if (not worker_.is_running()) { +    if (not m_running) {          throw zmq_input_overflow();      } -    memcpy(buffer, &incoming->front(), framesize); + +    const size_t framesize = 6144; +    if (incoming->empty()) { +        return 0; +    } +    else if (incoming->size() == framesize) { +        memcpy(buffer, &incoming->front(), framesize); +    } +    else { +        throw logic_error("ZMQ ETI not 6144"); +    }      return framesize;  } -void InputZeroMQReader::PrintInfo() +void InputZeroMQReader::PrintInfo() const  {      fprintf(stderr, "Input ZeroMQ:\n"); -    fprintf(stderr, "  Receiving from %s\n\n", uri_.c_str()); +    fprintf(stderr, "  Receiving from %s\n\n", m_uri.c_str());  } -// ------------- Worker functions - -void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) +void InputZeroMQReader::RecvProcess()  {      set_thread_name("zmqinput"); -    size_t queue_size = 0; +    m_running = true; +    size_t queue_size = 0;      bool buffer_full = false; -    zmq::socket_t subscriber(zmqcontext, ZMQ_SUB); +    zmq::socket_t subscriber(m_zmqcontext, ZMQ_SUB);      // zmq sockets are not thread safe. That's why      // we create it here, and not at object creation.      bool success = true;      try { -        subscriber.connect(workerdata->uri.c_str()); +        subscriber.connect(m_uri.c_str());      }      catch (zmq::error_t& err) { -        etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << workerdata->uri << "': '" << err.what() << "'"; +        etiLog.level(error) << "Failed to connect ZeroMQ socket to '" << +            m_uri << "': '" << err.what() << "'";          success = false;      }      if (success) try { -        subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages +        // subscribe to all messages +        subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);      }      catch (zmq::error_t& err) { -        etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << err.what() << "'"; +        etiLog.level(error) << "Failed to subscribe ZeroMQ socket to messages: '" << +            err.what() << "'";          success = false;      }      if (success) try { -        while (running) -        { +        while (m_running) {              zmq::message_t incoming; +            zmq::pollitem_t items[1]; +            items[0].socket = subscriber; +            items[0].events = ZMQ_POLLIN; +            const int zmq_timeout_ms = 100; +            const int num_events = zmq::poll(items, 1, zmq_timeout_ms); +            if (num_events == 0) { +                // timeout is signalled by an empty buffer +                auto buf = make_shared<vector<uint8_t> >(); +                m_in_messages.push(buf); +                continue; +            } +              subscriber.recv(&incoming);              if (m_to_drop) { -                queue_size = workerdata->in_messages->size(); +                queue_size = m_in_messages.size();                  if (queue_size > 4) { -                    workerdata->in_messages->notify(); +                    m_in_messages.notify();                  }                  m_to_drop--;              } -            else if (queue_size < workerdata->max_queued_frames) { +            else if (queue_size < m_max_queued_frames) {                  if (buffer_full) {                      etiLog.level(info) << "ZeroMQ buffer recovered: " <<                          queue_size << " elements"; @@ -214,14 +243,14 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                              offset += framesize; -                            queue_size = workerdata->in_messages->push(buf); +                            queue_size = m_in_messages.push(buf);                              etiLog.log(trace, "ZMQ,push %zu", queue_size);                          }                      }                  }              }              else { -                workerdata->in_messages->notify(); +                m_in_messages.notify();                  if (!buffer_full) {                      etiLog.level(warn) << "ZeroMQ buffer overfull !"; @@ -230,7 +259,7 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)                      throw runtime_error("ZMQ input full");                  } -                queue_size = workerdata->in_messages->size(); +                queue_size = m_in_messages.size();                  /* Drop three more incoming ETI frames before                   * we start accepting them again, to guarantee @@ -256,21 +285,8 @@ void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)      subscriber.close(); -    running = false; -    workerdata->in_messages->notify(); -} - -void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) -{ -    running = true; -    recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); -} - -void InputZeroMQWorker::Stop() -{ -    running = false; -    zmqcontext.close(); -    recv_thread.join(); +    m_running = false; +    m_in_messages.notify();  }  #endif diff --git a/src/Log.cpp b/src/Log.cpp index 0792fcf..f2219eb 100644 --- a/src/Log.cpp +++ b/src/Log.cpp @@ -109,6 +109,17 @@ LogLine Logger::level(log_level_t level)      return LogLine(this, level);  } +LogToFile::LogToFile(const std::string& filename) : name("FILE") +{ +    FILE* fd = fopen(filename.c_str(), "a"); +    if (fd == nullptr) { +        fprintf(stderr, "Cannot open log file !"); +        throw std::runtime_error("Cannot open log file !"); +    } + +    log_file.reset(fd); +} +  void LogToFile::log(log_level_t level, const std::string& message)  {      if (level != log_level_t::trace) { @@ -116,9 +127,9 @@ void LogToFile::log(log_level_t level, const std::string& message)              "DEBUG", "INFO", "WARN", "ERROR", "ALERT", "EMERG"};          // fprintf is thread-safe -        fprintf(log_file, SYSLOG_IDENT ": %s: %s\n", +        fprintf(log_file.get(), SYSLOG_IDENT ": %s: %s\n",                  log_level_text[(size_t)level], message.c_str()); -        fflush(log_file); +        fflush(log_file.get());      }  } @@ -142,31 +153,33 @@ void LogToSyslog::log(log_level_t level, const std::string& message)      }  } -LogTracer::LogTracer(const string& trace_filename) +LogTracer::LogTracer(const string& trace_filename) : name("TRACE")  { -    name = "TRACE";      etiLog.level(info) << "Setting up TRACE to " << trace_filename; -    m_trace_file = fopen(trace_filename.c_str(), "a"); -    if (m_trace_file == NULL) { +    FILE* fd = fopen(trace_filename.c_str(), "a"); +    if (fd == nullptr) {          fprintf(stderr, "Cannot open trace file !");          throw std::runtime_error("Cannot open trace file !");      } +    m_trace_file.reset(fd); -    auto now = chrono::steady_clock::now().time_since_epoch(); -    m_trace_micros_startup = -        chrono::duration_cast<chrono::microseconds>(now).count(); +    using namespace std::chrono; +    auto now = steady_clock::now().time_since_epoch(); +    m_trace_micros_startup = duration_cast<microseconds>(now).count(); -    fprintf(m_trace_file, "0,TRACER,startup at %ld\n", m_trace_micros_startup); +    fprintf(m_trace_file.get(), +            "0,TRACER,startup at %ld\n", m_trace_micros_startup);  }  void LogTracer::log(log_level_t level, const std::string& message)  {      if (level == log_level_t::trace) { -        const auto now = chrono::steady_clock::now().time_since_epoch(); -        const auto micros = chrono::duration_cast<chrono::microseconds>(now).count(); +        using namespace std::chrono; +        const auto now = steady_clock::now().time_since_epoch(); +        const auto micros = duration_cast<microseconds>(now).count(); -        fprintf(m_trace_file, "%ld,%s\n", +        fprintf(m_trace_file.get(), "%ld,%s\n",                  micros - m_trace_micros_startup,                  message.c_str());      } @@ -57,7 +57,7 @@ static const std::string levels_as_str[] =  class LogBackend {      public:          virtual void log(log_level_t level, const std::string& message) = 0; -        virtual std::string get_name() = 0; +        virtual std::string get_name() const = 0;  };  /** A Logging backend for Syslog */ @@ -73,7 +73,7 @@ class LogToSyslog : public LogBackend {          void log(log_level_t level, const std::string& message); -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          const std::string name; @@ -84,27 +84,15 @@ class LogToSyslog : public LogBackend {  class LogToFile : public LogBackend {      public: -        LogToFile(const std::string& filename) : name("FILE") { -            log_file = fopen(filename.c_str(), "a"); -            if (log_file == NULL) { -                fprintf(stderr, "Cannot open log file !"); -                throw std::runtime_error("Cannot open log file !"); -            } -        } - -        ~LogToFile() { -            if (log_file != NULL) { -                fclose(log_file); -            } -        } - +        LogToFile(const std::string& filename);          void log(log_level_t level, const std::string& message); - -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          const std::string name; -        FILE* log_file; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> log_file;          LogToFile(const LogToFile& other) = delete;          const LogToFile& operator=(const LogToFile& other) = delete; @@ -113,19 +101,14 @@ class LogToFile : public LogBackend {  class LogTracer : public LogBackend {      public:          LogTracer(const std::string& filename); - -        ~LogTracer() { -            if (m_trace_file != NULL) { -                fclose(m_trace_file); -            } -        } -          void log(log_level_t level, const std::string& message); -        std::string get_name() { return name; } +        std::string get_name() const { return name; }      private:          std::string name; -        uint64_t m_trace_micros_startup; -        FILE* m_trace_file; +        uint64_t m_trace_micros_startup = 0; + +        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; +        std::unique_ptr<FILE, FILEDeleter> m_trace_file;          LogTracer(const LogTracer& other) = delete;          const LogTracer& operator=(const LogTracer& other) = delete; diff --git a/src/MemlessPoly.cpp b/src/MemlessPoly.cpp index ae097c9..5d1c02d 100644 --- a/src/MemlessPoly.cpp +++ b/src/MemlessPoly.cpp @@ -99,6 +99,11 @@ MemlessPoly::MemlessPoly(const std::string& coefs_file, unsigned int num_threads      start_pipeline_thread();  } +MemlessPoly::~MemlessPoly() +{ +    stop_pipeline_thread(); +} +  void MemlessPoly::load_coefficients(const std::string &coefFile)  {      std::ifstream coef_fstream(coefFile.c_str()); diff --git a/src/MemlessPoly.h b/src/MemlessPoly.h index 612934f..4c67d46 100644 --- a/src/MemlessPoly.h +++ b/src/MemlessPoly.h @@ -59,6 +59,9 @@ class MemlessPoly : public PipelinedModCodec, public RemoteControllable  {  public:      MemlessPoly(const std::string& coefs_file, unsigned int num_threads); +    MemlessPoly(const MemlessPoly& other) = delete; +    MemlessPoly& operator=(const MemlessPoly& other) = delete; +    virtual ~MemlessPoly();      virtual const char* name() { return "MemlessPoly"; } diff --git a/src/ModPlugin.cpp b/src/ModPlugin.cpp index c39d883..d567a90 100644 --- a/src/ModPlugin.cpp +++ b/src/ModPlugin.cpp @@ -72,17 +72,7 @@ int ModOutput::process(      return process(dataIn[0]);  } -PipelinedModCodec::PipelinedModCodec() : -    ModCodec(), -    m_number_of_runs(0), -    m_input_queue(), -    m_output_queue(), -    m_running(false), -    m_thread() -{ -} - -PipelinedModCodec::~PipelinedModCodec() +void PipelinedModCodec::stop_pipeline_thread()  {      m_input_queue.push({});      if (m_thread.joinable()) { @@ -107,7 +97,7 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)      m_input_queue.push(inbuffer); -    if (m_number_of_runs > 0) { +    if (m_ready_to_output_data) {          std::shared_ptr<Buffer> outbuffer;          m_output_queue.wait_and_pop(outbuffer); @@ -116,13 +106,26 @@ int PipelinedModCodec::process(Buffer* dataIn, Buffer* dataOut)      else {          dataOut->setLength(dataIn->getLength());          memset(dataOut->getData(), 0, dataOut->getLength()); -        m_number_of_runs++; +        m_ready_to_output_data = true;      }      return dataOut->getLength();  } +meta_vec_t PipelinedModCodec::process_metadata(const meta_vec_t& metadataIn) +{ +    m_metadata_fifo.push_back(metadataIn); +    if (m_metadata_fifo.size() == 2) { +        auto r = std::move(m_metadata_fifo.front()); +        m_metadata_fifo.pop_front(); +        return r; +    } +    else { +        return {}; +    } +} +  void PipelinedModCodec::process_thread()  {      set_thread_name(name()); diff --git a/src/ModPlugin.h b/src/ModPlugin.h index d3aa780..e9cfa21 100644 --- a/src/ModPlugin.h +++ b/src/ModPlugin.h @@ -30,16 +30,34 @@  #   include <config.h>  #endif -  #include "Buffer.h"  #include "ThreadsafeQueue.h" - -#include <sys/types.h> +#include <cstddef>  #include <vector>  #include <memory>  #include <thread>  #include <atomic> +// All flowgraph elements derive from ModPlugin, or a variant of it. +// Some ModPlugins also support handling metadata. + +struct frame_timestamp; +struct flowgraph_metadata { +    std::shared_ptr<struct frame_timestamp> ts; +}; + +using meta_vec_t = std::vector<flowgraph_metadata>; + +/* ModPlugins that support metadata derive from ModMetadata */ +class ModMetadata { +    public: +        // Receives metadata from all inputs, and process them, and output +        // a sequence of metadata. +        virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) = 0; +}; + + +/* Abstract base class for all flowgraph elements */  class ModPlugin  {  public: @@ -47,6 +65,7 @@ public:              std::vector<Buffer*> dataIn,              std::vector<Buffer*> dataOut) = 0;      virtual const char* name() = 0; +    virtual ~ModPlugin() = default;  };  /* Inputs are sources, the output buffers without reading any */ @@ -69,32 +88,38 @@ public:      virtual int process(Buffer* const dataIn, Buffer* dataOut) = 0;  }; -class PipelinedModCodec : public ModCodec +/* Pipelined ModCodecs run their processing in a separate thread, and + * have a one-call-to-process() latency. Because of this latency, they + * must also handle the metadata + */ +class PipelinedModCodec : public ModCodec, public ModMetadata  {  public: -    PipelinedModCodec(); -    PipelinedModCodec(const PipelinedModCodec&) = delete; -    PipelinedModCodec& operator=(const PipelinedModCodec&) = delete; -    PipelinedModCodec(PipelinedModCodec&&) = delete; -    PipelinedModCodec& operator=(PipelinedModCodec&&) = delete; -    ~PipelinedModCodec(); -      virtual int process(Buffer* const dataIn, Buffer* dataOut) final;      virtual const char* name() = 0; +    virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) final; +  protected:      // Once the instance implementing PipelinedModCodec has been constructed,      // it must call start_pipeline_thread()      void start_pipeline_thread(void); +    // To avoid race conditions on teardown, plugins must call +    // stop_pipeline_thread in their destructor. +    void stop_pipeline_thread(void); + +    // The real processing must be implemented in internal_process      virtual int internal_process(Buffer* const dataIn, Buffer* dataOut) = 0;  private: -    size_t m_number_of_runs; +    bool m_ready_to_output_data = false;      ThreadsafeQueue<std::shared_ptr<Buffer> > m_input_queue;      ThreadsafeQueue<std::shared_ptr<Buffer> > m_output_queue; -    std::atomic<bool> m_running; +    std::deque<meta_vec_t> m_metadata_fifo; + +    std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);      std::thread m_thread;      void process_thread(void);  }; @@ -119,3 +144,4 @@ public:              std::vector<Buffer*> dataOut);      virtual int process(Buffer* dataIn) = 0;  }; + diff --git a/src/OfdmGenerator.cpp b/src/OfdmGenerator.cpp index b00d66b..57e0e0e 100644 --- a/src/OfdmGenerator.cpp +++ b/src/OfdmGenerator.cpp @@ -139,6 +139,14 @@ OfdmGenerator::~OfdmGenerator()          fftwf_destroy_plan(myFftPlan);      } +    if (myCfrPostClip) { +        fftwf_free(myCfrPostClip); +    } + +    if (myCfrPostFft) { +        fftwf_free(myCfrPostFft); +    } +      if (myCfrFft) {          fftwf_destroy_plan(myCfrFft);      } diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index 23d5523..46a9ec9 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,47 +26,78 @@  #include "OutputFile.h"  #include "PcDebug.h" +#include "Log.h" +#include "TimestampDecoder.h"  #include <string>  #include <assert.h>  #include <stdexcept> -OutputFile::OutputFile(std::string filename) : -    ModOutput(), +using namespace std; + +OutputFile::OutputFile(const std::string& filename, bool show_metadata) : +    ModOutput(), ModMetadata(), +    myShowMetadata(show_metadata),      myFilename(filename)  {      PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n",              filename.c_str(), this); -    myFile = fopen(filename.c_str(), "w"); -    if (myFile == NULL) { +    FILE* fd = fopen(filename.c_str(), "w"); +    if (fd == nullptr) {          perror(filename.c_str());          throw std::runtime_error(                  "OutputFile::OutputFile() unable to open file!");      } -} - - -OutputFile::~OutputFile() -{ -    PDEBUG("OutputFile::~OutputFile() @ %p\n", this); - -    if (myFile != NULL) { -        fclose(myFile); -    } +    myFile.reset(fd);  }  int OutputFile::process(Buffer* dataIn)  {      PDEBUG("OutputFile::process(%p)\n", dataIn); -    assert(dataIn != NULL); +    assert(dataIn != nullptr); -    if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile) == 0) { +    if (fwrite(dataIn->getData(), dataIn->getLength(), 1, myFile.get()) == 0) {          throw std::runtime_error(                  "OutputFile::process() unable to write to file!");      }      return dataIn->getLength();  } + +meta_vec_t OutputFile::process_metadata(const meta_vec_t& metadataIn) +{ +    if (myShowMetadata) { +        stringstream ss; + +        for (const auto& md : metadataIn) { +            if (md.ts) { +                ss << " FCT=" << md.ts->fct << +                    " FP=" << (int)md.ts->fp; +                if (md.ts->timestamp_valid) { +                    ss << " TS=" << md.ts->timestamp_sec << " + " << +                        std::fixed +                        << (double)md.ts->timestamp_pps / 163840000.0 << ";"; +                } +                else { +                    ss << " TS invalid;"; +                } +            } +            else { +                ss << " void, "; +            } +        } + +        if (metadataIn.empty()) { +            etiLog.level(debug) << "Output File got no mdIn"; +        } +        else { +            etiLog.level(debug) << "Output File got metadata: " << ss.str(); +        } + +    } +    return {}; +} + diff --git a/src/OutputFile.h b/src/OutputFile.h index 7121ef3..745e672 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -32,23 +32,29 @@  #include "ModPlugin.h" +#include "EtiReader.h"  #include <string>  #include <stdio.h>  #include <sys/types.h> +#include <memory> - -class OutputFile : public ModOutput +class OutputFile : public ModOutput, public ModMetadata  {  public: -    OutputFile(std::string filename); -    virtual ~OutputFile(); +    OutputFile(const std::string& filename, bool show_metadata); + +    virtual int process(Buffer* dataIn) override; +    const char* name() override { return "OutputFile"; } -    virtual int process(Buffer* dataIn); -    const char* name() { return "OutputFile"; } +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override;  protected: +    bool myShowMetadata = false;      std::string myFilename; -    FILE* myFile; + +    struct FILEDeleter{ void operator()(FILE* fd){ if (fd) fclose(fd); }}; +    std::unique_ptr<FILE, FILEDeleter> myFile;  }; diff --git a/src/OutputMemory.cpp b/src/OutputMemory.cpp index 6e2fd49..5f24095 100644 --- a/src/OutputMemory.cpp +++ b/src/OutputMemory.cpp @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,6 +26,7 @@  #include "OutputMemory.h"  #include "PcDebug.h" +#include "Log.h"  #include <stdexcept>  #include <string.h> @@ -94,3 +95,14 @@ int OutputMemory::process(Buffer* dataIn)      return myDataOut->getLength();  } +meta_vec_t OutputMemory::process_metadata(const meta_vec_t& metadataIn) +{ +    myMetadata = metadataIn; +    return {}; +} + +meta_vec_t OutputMemory::get_latest_metadata() +{ +    return myMetadata; +} + diff --git a/src/OutputMemory.h b/src/OutputMemory.h index 715cb2d..f0a5fbb 100644 --- a/src/OutputMemory.h +++ b/src/OutputMemory.h @@ -2,7 +2,7 @@     Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in     Right of Canada (Communications Research Center Canada) -   Copyright (C) 2016 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -46,18 +46,26 @@  #include "ModPlugin.h" -class OutputMemory : public ModOutput +class OutputMemory : public ModOutput, public ModMetadata  {  public:      OutputMemory(Buffer* dataOut);      virtual ~OutputMemory(); -    virtual int process(Buffer* dataIn); -    const char* name() { return "OutputMemory"; } +    OutputMemory(OutputMemory& other) = delete; +    OutputMemory& operator=(OutputMemory& other) = delete; + +    virtual int process(Buffer* dataIn) override; +    const char* name() override { return "OutputMemory"; } +    virtual meta_vec_t process_metadata( +            const meta_vec_t& metadataIn) override; + +    meta_vec_t get_latest_metadata(void);      void setOutput(Buffer* dataOut);  protected:      Buffer* myDataOut; +    meta_vec_t myMetadata;  #if OUTPUT_MEM_HISTOGRAM      // keep track of max value diff --git a/src/OutputSoapy.cpp b/src/OutputSoapy.cpp deleted file mode 100644 index 699501c..0000000 --- a/src/OutputSoapy.cpp +++ /dev/null @@ -1,287 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://opendigitalradio.org - -DESCRIPTION: -   It is an output driver using the SoapySDR library that can output to -   many devices. -*/ - -/* -   This file is part of ODR-DabMod. - -   ODR-DabMod is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMod is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. - */ - -#include "OutputSoapy.h" -#ifdef HAVE_SOAPYSDR - -#include <SoapySDR/Errors.hpp> -#include <deque> -#include <chrono> - -#include "Log.h" -#include "Utils.h" - -#include <stdio.h> - -static const size_t FRAMES_MAX_SIZE = 2; - - -using namespace std; - - - -OutputSoapy::OutputSoapy(OutputSoapyConfig& config) : -    ModOutput(), -    RemoteControllable("soapy"), -    m_conf(config), -    m_device(nullptr) -{ -    RC_ADD_PARAMETER(txgain, "SoapySDR analog daughterboard TX gain"); -    RC_ADD_PARAMETER(freq,   "SoapySDR transmission frequency"); -    RC_ADD_PARAMETER(overflows, "SoapySDR overflow count [r/o]"); -    RC_ADD_PARAMETER(underflows, "SoapySDR underflow count [r/o]"); - -    etiLog.level(info) << -        "OutputSoapy:Creating the device with: " << -        config.device; -    try -    { -        m_device = SoapySDR::Device::make(config.device); -        stringstream ss; -        ss << "SoapySDR driver=" << m_device->getDriverKey(); -        ss << " hardware=" << m_device->getHardwareKey(); -        for (const auto &it : m_device->getHardwareInfo()) -        { -            ss << "  " << it.first << "=" << it.second; -        } -    } -    catch (const std::exception &ex) -    { -        etiLog.level(error) << "Error making SoapySDR device: " << -            ex.what(); -        throw std::runtime_error("Cannot create SoapySDR output"); -    } - -    m_device->setMasterClockRate(config.masterClockRate); -    etiLog.level(info) << "SoapySDR master clock rate set to " << -        m_device->getMasterClockRate()/1000.0 << " kHz"; - -    m_device->setSampleRate(SOAPY_SDR_TX, 0, m_conf.sampleRate); -    etiLog.level(info) << "OutputSoapySDR:Actual TX rate: " << -        m_device->getSampleRate(SOAPY_SDR_TX, 0) / 1000.0 << -        " ksps."; - -    m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency); -    m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0); -    etiLog.level(info) << "OutputSoapySDR:Actual frequency: " << -        m_conf.frequency / 1000.0 << -        " kHz."; - -    m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain); -    etiLog.level(info) << "OutputSoapySDR:Actual tx gain: " << -        m_device->getGain(SOAPY_SDR_TX, 0); - -} - -OutputSoapy::~OutputSoapy() -{ -    m_worker.stop(); -    if (m_device != nullptr) { -        SoapySDR::Device::unmake(m_device); -    } -} - -void SoapyWorker::stop() -{ -    running = false; -    queue.push({}); -    if (m_thread.joinable()) { -        m_thread.join(); -    } -} - -void SoapyWorker::start(SoapySDR::Device *device) -{ -    m_device = device; -    underflows = 0; -    overflows = 0; -    running = true; -    m_thread = std::thread(&SoapyWorker::process_start, this); -} - -void SoapyWorker::process_start() -{ -    // Set thread priority to realtime -    if (int ret = set_realtime_prio(1)) { -        etiLog.level(error) << "Could not set priority for SoapySDR worker:" << ret; -    } - -    set_thread_name("soapyworker"); - -    std::vector<size_t> channels; -    channels.push_back(0); -    auto stream = m_device->setupStream(SOAPY_SDR_TX, "CF32", channels); -    m_device->activateStream(stream); -    process(stream); -    m_device->closeStream(stream); -    running = false; -    etiLog.level(warn) << "SoapySDR worker terminated"; -} - -void SoapyWorker::process(SoapySDR::Stream *stream) -{ -    while (running) { -        struct SoapyWorkerFrameData frame; -        queue.wait_and_pop(frame); - -        // The frame buffer contains bytes representing FC32 samples -        const complexf *buf = reinterpret_cast<complexf*>(frame.buf.data()); -        const size_t numSamples = frame.buf.size() / sizeof(complexf); -        if ((frame.buf.size() % sizeof(complexf)) != 0) { -            throw std::runtime_error("OutputSoapy: invalid buffer size"); -        } - -        // Stream MTU is in samples, not bytes. -        const size_t mtu = m_device->getStreamMTU(stream); - -        size_t num_acc_samps = 0; -        while (running && (num_acc_samps < numSamples)) { -            const void *buffs[1]; -            buffs[0] = buf + num_acc_samps; - -            const size_t samps_to_send = std::min(numSamples - num_acc_samps, mtu); - -            int flags = 0; - -            auto ret = m_device->writeStream(stream, buffs, samps_to_send, flags); - -            if (ret == SOAPY_SDR_TIMEOUT) { -                continue; -            } -            else if (ret == SOAPY_SDR_OVERFLOW) { -                overflows++; -                continue; -            } -            else if (ret == SOAPY_SDR_UNDERFLOW) { -                underflows++; -                continue; -            } - -            if (ret < 0) { -                etiLog.level(error) << "Unexpected stream error " << -                    SoapySDR::errToStr(ret); -                running = false; -            } - -            num_acc_samps += ret; -        } -    } -} - -int OutputSoapy::process(Buffer* dataIn) -{ -    if (first_run) { -        m_worker.start(m_device); -        first_run = false; -    } -    else if (!m_worker.running) { -        etiLog.level(error) << "OutputSoapy: worker thread died"; -        throw std::runtime_error("Fault in OutputSoapy"); -    } - -    SoapyWorkerFrameData frame; -    m_eti_source->calculateTimestamp(frame.ts); - - -    if (frame.ts.fct == -1) { -        etiLog.level(info) << -            "OutputSoapy: dropping one frame with invalid FCT"; -    } -    else { -        const uint8_t* pInData = reinterpret_cast<uint8_t*>(dataIn->getData()); -        frame.buf.resize(dataIn->getLength()); -        std::copy(pInData, pInData + dataIn->getLength(), -                frame.buf.begin()); -        m_worker.queue.push_wait_if_full(frame, FRAMES_MAX_SIZE); -    } - -    return dataIn->getLength(); -} - - -void OutputSoapy::setETISource(EtiSource *etiSource) -{ -    m_eti_source = etiSource; -} - -void OutputSoapy::set_parameter(const string& parameter, const string& value) -{ -    stringstream ss(value); -    ss.exceptions ( stringstream::failbit | stringstream::badbit ); - -    if (parameter == "txgain") { -        ss >> m_conf.txgain; -        m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain); -    } -    else if (parameter == "freq") { -        ss >> m_conf.frequency; -        m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency); -    m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0); -    } -    else if (parameter == "underflows") { -        throw ParameterError("Parameter 'underflows' is read-only"); -    } -    else if (parameter == "overflows") { -        throw ParameterError("Parameter 'overflows' is read-only"); -    } -    else { -        stringstream ss_err; -        ss_err << "Parameter '" << parameter -            << "' is not exported by controllable " << get_rc_name(); -        throw ParameterError(ss_err.str()); -    } -} - -const string OutputSoapy::get_parameter(const string& parameter) const -{ -    stringstream ss; -    if (parameter == "txgain") { -        ss << m_conf.txgain; -    } -    else if (parameter == "freq") { -        ss << m_conf.frequency; -    } -    else if (parameter == "underflows") { -        ss << m_worker.underflows; -    } -    else if (parameter == "overflows") { -        ss << m_worker.overflows; -    } -    else { -        ss << "Parameter '" << parameter << -            "' is not exported by controllable " << get_rc_name(); -        throw ParameterError(ss.str()); -    } -    return ss.str(); -} - -#endif // HAVE_SOAPYSDR - diff --git a/src/OutputSoapy.h b/src/OutputSoapy.h deleted file mode 100644 index 230f11b..0000000 --- a/src/OutputSoapy.h +++ /dev/null @@ -1,138 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://opendigitalradio.org - -DESCRIPTION: -   It is an output driver using the SoapySDR library that can output to -   many devices. -*/ - -/* -   This file is part of ODR-DabMod. - -   ODR-DabMod is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMod is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include <config.h> -#endif - -#ifdef HAVE_SOAPYSDR -#include <SoapySDR/Version.hpp> -#include <SoapySDR/Modules.hpp> -#include <SoapySDR/Registry.hpp> -#include <SoapySDR/Device.hpp> - -#include <string> -#include <memory> - -#include "ModPlugin.h" -#include "EtiReader.h" -#include "RemoteControl.h" -#include "ThreadsafeQueue.h" - -typedef std::complex<float> complexf; - -/* This structure is used as initial configuration for the Soapy output. - * It must also contain all remote-controllable settings, otherwise - * they will get lost on a modulator restart. */ -struct OutputSoapyConfig { -    std::string device; - -    long masterClockRate = 32768000; -    unsigned sampleRate = 2048000; -    double frequency = 0.0; -    double txgain = 0.0; -    unsigned dabMode = 0; -}; - -// Each frame contains one OFDM frame, and its -// associated timestamp -struct SoapyWorkerFrameData { -    // Buffer holding frame data -    std::vector<uint8_t> buf; - -    // A full timestamp contains a TIST according to standard -    // and time information within MNSC with tx_second. -    struct frame_timestamp ts; -}; - -class SoapyWorker -{ -    public: -        ThreadsafeQueue<SoapyWorkerFrameData> queue; -        SoapySDR::Device *m_device; -        std::atomic<bool> running; -        size_t underflows; -        size_t overflows; - -        SoapyWorker() {} -        SoapyWorker(const SoapyWorker&) = delete; -        SoapyWorker operator=(const SoapyWorker&) = delete; -        ~SoapyWorker() { stop(); } - -        void start(SoapySDR::Device *device); -        void stop(void); - -    private: -        std::thread m_thread; - -        void process_start(void); -        void process(SoapySDR::Stream *stream); -}; - -class OutputSoapy: public ModOutput, public RemoteControllable -{ -    public: -        OutputSoapy(OutputSoapyConfig& config); -        OutputSoapy(const OutputSoapy& other) = delete; -        OutputSoapy& operator=(const OutputSoapy& other) = delete; -        ~OutputSoapy(); - -        int process(Buffer* dataIn); - -        const char* name() { return "OutputSoapy"; } - -        void setETISource(EtiSource *etiSource); - -        /*********** REMOTE CONTROL ***************/ - -        /* Base function to set parameters. */ -        virtual void set_parameter(const std::string& parameter, -                const std::string& value); - -        /* Getting a parameter always returns a string. */ -        virtual const std::string get_parameter( -                const std::string& parameter) const; - - -    protected: -        SoapyWorker m_worker; -        EtiSource *m_eti_source; -        OutputSoapyConfig& m_conf; - -        SoapySDR::Device *m_device; - -        bool first_run = true; -}; - - -#endif //HAVE_SOAPYSDR diff --git a/src/OutputUHD.cpp b/src/OutputUHD.cpp deleted file mode 100644 index b533075..0000000 --- a/src/OutputUHD.cpp +++ /dev/null @@ -1,1035 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://opendigitalradio.org - */ -/* -   This file is part of ODR-DabMod. - -   ODR-DabMod is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMod is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. - */ - -#include "OutputUHD.h" - -#ifdef HAVE_OUTPUT_UHD - -#include "PcDebug.h" -#include "Log.h" -#include "RemoteControl.h" -#include "Utils.h" - -#include <boost/thread/future.hpp> - -#include <uhd/utils/msg.hpp> - -#include <cmath> -#include <iostream> -#include <assert.h> -#include <stdexcept> -#include <stdio.h> -#include <time.h> -#include <errno.h> -#include <unistd.h> -#include <pthread.h> - -using namespace std; - -// Maximum number of frames that can wait in frames -static const size_t FRAMES_MAX_SIZE = 8; - -typedef std::complex<float> complexf; - -std::string stringtrim(const std::string &s) -{ -    auto wsfront = std::find_if_not(s.begin(), s.end(), -            [](int c){ return std::isspace(c);} ); -    return std::string(wsfront, -            std::find_if_not(s.rbegin(), -                std::string::const_reverse_iterator(wsfront), -                [](int c){ return std::isspace(c);} ).base()); -} - -void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg) -{ -    if (type == uhd::msg::warning) { -        etiLog.level(warn) << "UHD Warning: " << msg; -    } -    else if (type == uhd::msg::error) { -        etiLog.level(error) << "UHD Error: " << msg; -    } -    else { -        // do not print very short U messages and such -        if (stringtrim(msg).size() != 1) { -            etiLog.level(debug) << "UHD Message: " << msg; -        } -    } -} - -static void tune_usrp_to( -        uhd::usrp::multi_usrp::sptr usrp, -        double lo_offset, -        double frequency) -{ -    if (lo_offset != 0.0) { -        etiLog.level(info) << std::fixed << std::setprecision(3) << -            "OutputUHD:Setting freq to " << frequency << -            "  with LO offset " << lo_offset << "..."; - -        const auto tr = uhd::tune_request_t(frequency, lo_offset); -        uhd::tune_result_t result = usrp->set_tx_freq(tr); - -        etiLog.level(debug) << "OutputUHD:" << -            std::fixed << std::setprecision(0) << -            " Target RF: " << result.target_rf_freq << -            " Actual RF: " << result.actual_rf_freq << -            " Target DSP: " << result.target_dsp_freq << -            " Actual DSP: " << result.actual_dsp_freq; -    } -    else { -        //set the centre frequency -        etiLog.level(info) << std::fixed << std::setprecision(3) << -            "OutputUHD:Setting freq to " << frequency << "..."; -        usrp->set_tx_freq(frequency); -    } - -    usrp->set_rx_freq(frequency); -} - -// Check function for GPS TIMELOCK sensor from the ODR LEA-M8F board GPSDO -bool check_gps_timelock(uhd::usrp::multi_usrp::sptr usrp) -{ -    try { -        std::string sensor_value( -                usrp->get_mboard_sensor("gps_timelock", 0).to_pp_string()); - -        if (sensor_value.find("TIME LOCKED") == std::string::npos) { -            etiLog.level(warn) << "OutputUHD: gps_timelock " << sensor_value; -            return false; -        } - -        return true; -    } -    catch (uhd::lookup_error &e) { -        etiLog.level(warn) << "OutputUHD: no gps_timelock sensor"; -        return false; -    } -} - -// Check function for GPS LOCKED sensor from the Ettus GPSDO -bool check_gps_locked(uhd::usrp::multi_usrp::sptr usrp) -{ -    try { -        uhd::sensor_value_t sensor_value( -                usrp->get_mboard_sensor("gps_locked", 0)); -        if (not sensor_value.to_bool()) { -            etiLog.level(warn) << "OutputUHD: gps_locked " << -                sensor_value.to_pp_string(); -            return false; -        } - -        return true; -    } -    catch (uhd::lookup_error &e) { -        etiLog.level(warn) << "OutputUHD: no gps_locked sensor"; -        return false; -    } -} - - -OutputUHD::OutputUHD( -        OutputUHDConfig& config) : -    ModOutput(), -    RemoteControllable("uhd"), -    myConf(config), -    // Since we don't know the buffer size, we cannot initialise -    // the buffers at object initialisation. -    myDelayBuf(0), -    running(false) -{ -    myConf.muting = true;     // is remote-controllable, and reset by the GPS fix check -    myConf.staticDelayUs = 0; // is remote-controllable - -    // Variables needed for GPS fix check -    first_gps_fix_check.tv_sec = 0; -    last_gps_fix_check.tv_sec = 0; -    time_last_frame.tv_sec = 0; - - -    std::stringstream device; -    device << myConf.device; - -    if (myConf.masterClockRate != 0) { -        if (device.str() != "") { -            device << ","; -        } -        device << "master_clock_rate=" << myConf.masterClockRate; -    } - -    if (myConf.usrpType != "") { -        if (device.str() != "") { -            device << ","; -        } -        device << "type=" << myConf.usrpType; -    } - -    MDEBUG("OutputUHD::OutputUHD(device: %s) @ %p\n", -            device.str().c_str(), this); - -    /* register the parameters that can be remote controlled */ -    RC_ADD_PARAMETER(txgain, "UHD analog daughterboard TX gain"); -    RC_ADD_PARAMETER(rxgain, "UHD analog daughterboard RX gain for DPD feedback"); -    RC_ADD_PARAMETER(freq, "UHD transmission frequency"); -    RC_ADD_PARAMETER(muting, "Mute the output by stopping the transmitter"); -    RC_ADD_PARAMETER(staticdelay, "Set static delay (uS) between 0 and 96000"); -    RC_ADD_PARAMETER(underruns, "Read-only counter of number of underruns"); -    RC_ADD_PARAMETER(latepackets, "Read-only counter of number of late packets"); -    RC_ADD_PARAMETER(frames, "Read-only counter of number of frames modulated"); - -    uhd::msg::register_handler(uhd_msg_handler); - -    uhd::set_thread_priority_safe(); - -    etiLog.log(info, "OutputUHD:Creating the usrp device with: %s...", -            device.str().c_str()); - -    myUsrp = uhd::usrp::multi_usrp::make(device.str()); - -    etiLog.log(info, "OutputUHD:Using device: %s...", -            myUsrp->get_pp_string().c_str()); - -    if (myConf.masterClockRate != 0.0) { -        double master_clk_rate = myUsrp->get_master_clock_rate(); -        etiLog.log(debug, "OutputUHD:Checking master clock rate: %f...", -                master_clk_rate); - -        if (fabs(master_clk_rate - myConf.masterClockRate) > -                (myConf.masterClockRate * 1e-6)) { -            throw std::runtime_error("Cannot set USRP master_clock_rate. Aborted."); -        } -    } - -    MDEBUG("OutputUHD:Setting REFCLK and PPS input...\n"); - -    if (myConf.refclk_src == "gpsdo-ettus") { -        myUsrp->set_clock_source("gpsdo"); -    } -    else { -        myUsrp->set_clock_source(myConf.refclk_src); -    } -    myUsrp->set_time_source(myConf.pps_src); - -    if (myConf.subDevice != "") { -        myUsrp->set_tx_subdev_spec(uhd::usrp::subdev_spec_t(myConf.subDevice), -                uhd::usrp::multi_usrp::ALL_MBOARDS); -    } - -    etiLog.level(debug) << "UHD clock source is " << myUsrp->get_clock_source(0); - -    etiLog.level(debug) << "UHD time source is " << myUsrp->get_time_source(0); - -    myUsrp->set_tx_rate(myConf.sampleRate); -    etiLog.log(debug, "OutputUHD:Set rate to %d. Actual TX Rate: %f sps...", -            myConf.sampleRate, myUsrp->get_tx_rate()); - -    if (fabs(myUsrp->get_tx_rate() / myConf.sampleRate) > -             myConf.sampleRate * 1e-6) { -        throw std::runtime_error("Cannot set USRP sample rate. Aborted."); -    } - -    tune_usrp_to(myUsrp, myConf.lo_offset, myConf.frequency); - -    myConf.frequency = myUsrp->get_tx_freq(); -    etiLog.level(info) << std::fixed << std::setprecision(3) << -        "OutputUHD:Actual TX frequency: " << myConf.frequency; - -    etiLog.level(info) << std::fixed << std::setprecision(3) << -        "OutputUHD:Actual RX frequency: " << myUsrp->get_tx_freq(); - -    myUsrp->set_tx_gain(myConf.txgain); -    etiLog.log(debug, "OutputUHD:Actual TX Gain: %f", myUsrp->get_tx_gain()); - -    etiLog.log(debug, "OutputUHD:Mute on missing timestamps: %s", -            myConf.muteNoTimestamps ? "enabled" : "disabled"); - -    // preparing output thread worker data -    sync_and_ts_valid = false; - -    SetDelayBuffer(myConf.dabMode); - -    myUsrp->set_rx_rate(myConf.sampleRate); -    etiLog.log(debug, "OutputUHD:Actual RX Rate: %f sps.", myUsrp->get_rx_rate()); - -    myUsrp->set_rx_antenna("RX2"); -    etiLog.log(debug, "OutputUHD:Set RX Antenna: %s", -            myUsrp->get_rx_antenna().c_str()); - -    myUsrp->set_rx_gain(myConf.rxgain); -    etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", myUsrp->get_rx_gain()); - -    uhdFeedback = std::make_shared<OutputUHDFeedback>( -            myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); - -    MDEBUG("OutputUHD:UHD ready.\n"); -} - -bool OutputUHD::refclk_loss_needs_check() const -{ -    if (suppress_refclk_loss_check) { -        return false; -    } -    return myConf.refclk_src != "internal"; -} - -bool OutputUHD::gpsfix_needs_check() const -{ -    if (myConf.refclk_src == "internal") { -        return false; -    } -    else if (myConf.refclk_src == "gpsdo") { -        return (myConf.maxGPSHoldoverTime != 0); -    } -    else if (myConf.refclk_src == "gpsdo-ettus") { -        return (myConf.maxGPSHoldoverTime != 0); -    } -    else { -        return false; -    } -} - -bool OutputUHD::gpsdo_is_ettus() const -{ -    return (myConf.refclk_src == "gpsdo-ettus"); -} - -OutputUHD::~OutputUHD() -{ -    stop_threads(); -} - -void OutputUHD::stop_threads() -{ -    running.store(false); -    uhd_thread.interrupt(); -    uhd_thread.join(); -    async_rx_thread.join(); -} - - -void OutputUHD::setETISource(EtiSource *etiSource) -{ -    myEtiSource = etiSource; -} - -int transmission_frame_duration_ms(unsigned int dabMode) -{ -    switch (dabMode) { -        // could happen when called from constructor and we take the mode from ETI -        case 0: return 0; - -        case 1: return 96; -        case 2: return 24; -        case 3: return 24; -        case 4: return 48; -        default: -            throw std::runtime_error("OutputUHD: invalid DAB mode"); -    } -} - -void OutputUHD::SetDelayBuffer(unsigned int dabMode) -{ -    // find out the duration of the transmission frame (Table 2 in ETSI 300 401) -    myTFDurationMs = transmission_frame_duration_ms(dabMode); - -    // The buffer size equals the number of samples per transmission frame so -    // we calculate it by multiplying the duration of the transmission frame -    // with the samplerate. -    myDelayBuf.resize(myTFDurationMs * myConf.sampleRate / 1000); -} - -int OutputUHD::process(Buffer* dataIn) -{ -    if (not gps_fix_verified) { -        if (gpsfix_needs_check()) { -            initial_gps_check(); - -            if (num_checks_without_gps_fix == 0) { -                set_usrp_time(); -                gps_fix_verified = true; -                myConf.muting = false; -            } -        } -        else { -            set_usrp_time(); -            gps_fix_verified = true; -            myConf.muting = false; -        } -    } -    else { -        if (first_run) { -            etiLog.level(debug) << "OutputUHD: UHD initialising..."; - -            // we only set the delay buffer from the dab mode signaled in ETI if the -            // dab mode was not set in contructor -            if (myTFDurationMs == 0) { -                SetDelayBuffer(myEtiSource->getMode()); -            } - -            running.store(true); -            uhd_thread = boost::thread(&OutputUHD::workerthread, this); -            async_rx_thread = boost::thread( -                    &OutputUHD::print_async_thread, this); - -            lastLen = dataIn->getLength(); -            first_run = false; -            etiLog.level(debug) << "OutputUHD: UHD initialising complete"; -        } - -        if (lastLen != dataIn->getLength()) { -            // I expect that this never happens. -            etiLog.level(emerg) << -                "OutputUHD: Fatal error, input length changed from " << lastLen << -                " to " << dataIn->getLength(); -            throw std::runtime_error("Non-constant input length!"); -        } - -        sync_and_ts_valid = myConf.enableSync and -            myEtiSource->sourceContainsTimestamp(); - -        if (gpsfix_needs_check()) { -            try { -                check_gps(); -            } -            catch (std::runtime_error& e) { -                running.store(false); -                etiLog.level(error) << e.what(); -            } -        } - -        // Prepare the frame for the worker -        UHDWorkerFrameData frame; -        frame.buf.resize(dataIn->getLength()); - -        // calculate delay and fill buffer -        uint32_t noSampleDelay = (myConf.staticDelayUs * (myConf.sampleRate / 1000)) / 1000; -        uint32_t noByteDelay = noSampleDelay * sizeof(complexf); - -        const uint8_t* pInData = (uint8_t*)dataIn->getData(); - -        uint8_t *pTmp = &frame.buf[0]; -        if (noByteDelay) { -            // copy remain from delaybuf -            memcpy(pTmp, &myDelayBuf[0], noByteDelay); -            // copy new data -            memcpy(&pTmp[noByteDelay], pInData, dataIn->getLength() - noByteDelay); -            // copy remaining data to delay buf -            memcpy(&myDelayBuf[0], &pInData[dataIn->getLength() - noByteDelay], noByteDelay); -        } -        else { -            std::copy(pInData, pInData + dataIn->getLength(), -                    frame.buf.begin()); -        } - -        myEtiSource->calculateTimestamp(frame.ts); - -        if (not running.load()) { -            uhd_thread.interrupt(); -            uhd_thread.join(); -            async_rx_thread.join(); -            first_run = true; - -            etiLog.level(error) << "OutputUHD UHD worker failed"; -            throw std::runtime_error("UHD worker failed"); -        } - -        if (frame.ts.fct == -1) { -            etiLog.level(info) << -                "OutputUHD: dropping one frame with invalid FCT"; -        } -        else { -            try { -                uhdFeedback->set_tx_frame(frame.buf, frame.ts); -            } -            catch (const runtime_error& e) { -                etiLog.level(warn) << -                    "OutputUHD: Feedback server failed, restarting..."; - -                uhdFeedback = std::make_shared<OutputUHDFeedback>( -                        myUsrp, myConf.dpdFeedbackServerPort, myConf.sampleRate); -            } - -            size_t num_frames = m_frames.push_wait_if_full(frame, -                    FRAMES_MAX_SIZE); -            etiLog.log(trace, "UHD,push %zu", num_frames); -        } -    } - -    return dataIn->getLength(); -} - - -void OutputUHD::set_usrp_time() -{ -    if (myConf.enableSync and (myConf.pps_src == "none")) { -        etiLog.level(warn) << -            "OutputUHD: WARNING:" -            " you are using synchronous transmission without PPS input!"; - -        struct timespec now; -        if (clock_gettime(CLOCK_REALTIME, &now)) { -            perror("OutputUHD:Error: could not get time: "); -            etiLog.level(error) << "OutputUHD: could not get time"; -        } -        else { -            myUsrp->set_time_now(uhd::time_spec_t(now.tv_sec)); -            etiLog.level(info) << "OutputUHD: Setting USRP time to " << -                std::fixed << -                uhd::time_spec_t(now.tv_sec).get_real_secs(); -        } -    } - -    if (myConf.pps_src != "none") { -        /* handling time for synchronisation: wait until the next full -         * second, and set the USRP time at next PPS */ -        struct timespec now; -        time_t seconds; -        if (clock_gettime(CLOCK_REALTIME, &now)) { -            etiLog.level(error) << "OutputUHD: could not get time :" << -                strerror(errno); -            throw std::runtime_error("OutputUHD: could not get time."); -        } -        else { -            seconds = now.tv_sec; - -            MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); -            while (seconds + 1 > now.tv_sec) { -                usleep(1); -                if (clock_gettime(CLOCK_REALTIME, &now)) { -                    etiLog.level(error) << "OutputUHD: could not get time :" << -                        strerror(errno); -                    throw std::runtime_error("OutputUHD: could not get time."); -                } -            } -            MDEBUG("OutputUHD:sec+1: %ld ; now: %ld ...\n", seconds+1, now.tv_sec); -            /* We are now shortly after the second change. */ - -            usleep(200000); // 200ms, we want the PPS to be later -            myUsrp->set_time_unknown_pps(uhd::time_spec_t(seconds + 2)); -            etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " << -                std::fixed << -                uhd::time_spec_t(seconds + 2).get_real_secs(); -        } - -        usleep(1e6); -        etiLog.log(info,  "OutputUHD: USRP time %f\n", -                myUsrp->get_time_now().get_real_secs()); -    } -} - -void OutputUHD::initial_gps_check() -{ -    if (first_gps_fix_check.tv_sec == 0) { -        etiLog.level(info) << "Waiting for GPS fix"; - -        if (clock_gettime(CLOCK_MONOTONIC, &first_gps_fix_check) != 0) { -            stringstream ss; -            ss << "clock_gettime failure: " << strerror(errno); -            throw std::runtime_error(ss.str()); -        } -    } - -    check_gps(); - -    if (last_gps_fix_check.tv_sec > -            first_gps_fix_check.tv_sec + initial_gps_fix_wait) { -        stringstream ss; -        ss << "GPS did not show time lock in " << initial_gps_fix_wait << " seconds"; -        throw std::runtime_error(ss.str()); -    } - -    if (time_last_frame.tv_sec == 0) { -        if (clock_gettime(CLOCK_MONOTONIC, &time_last_frame) != 0) { -            stringstream ss; -            ss << "clock_gettime failure: " << strerror(errno); -            throw std::runtime_error(ss.str()); -        } -    } - -    struct timespec now; -    if (clock_gettime(CLOCK_MONOTONIC, &now) != 0) { -        stringstream ss; -        ss << "clock_gettime failure: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } - -    long delta_us = timespecdiff_us(time_last_frame, now); -    long wait_time_us = transmission_frame_duration_ms(myConf.dabMode); - -    if (wait_time_us - delta_us > 0) { -        usleep(wait_time_us - delta_us); -    } - -    time_last_frame.tv_nsec += wait_time_us * 1000; -    if (time_last_frame.tv_nsec >= 1000000000L) { -        time_last_frame.tv_nsec -= 1000000000L; -        time_last_frame.tv_sec++; -    } -} - -void OutputUHD::check_gps() -{ -    struct timespec time_now; -    if (clock_gettime(CLOCK_MONOTONIC, &time_now) != 0) { -        stringstream ss; -        ss << "clock_gettime failure: " << strerror(errno); -        throw std::runtime_error(ss.str()); -    } - -    // Divide interval by two because we alternate between -    // launch and check -    if (gpsfix_needs_check() and -            last_gps_fix_check.tv_sec + gps_fix_check_interval/2.0 < -            time_now.tv_sec) { -        last_gps_fix_check = time_now; - -        // Alternate between launching thread and checking the -        // result. -        if (gps_fix_task.joinable()) { -            if (gps_fix_future.has_value()) { - -                gps_fix_future.wait(); - -                gps_fix_task.join(); - -                if (not gps_fix_future.get()) { -                    if (num_checks_without_gps_fix == 0) { -                        etiLog.level(alert) << -                            "OutputUHD: GPS Time Lock lost"; -                    } -                    num_checks_without_gps_fix++; -                } -                else { -                    if (num_checks_without_gps_fix) { -                        etiLog.level(info) << -                            "OutputUHD: GPS Time Lock recovered"; -                    } -                    num_checks_without_gps_fix = 0; -                } - -                if (gps_fix_check_interval * num_checks_without_gps_fix > -                        myConf.maxGPSHoldoverTime) { -                    std::stringstream ss; -                    ss << "Lost GPS Time Lock for " << gps_fix_check_interval * -                        num_checks_without_gps_fix << " seconds"; -                    throw std::runtime_error(ss.str()); -                } -            } -        } -        else { -            // Checking the sensor here takes too much -            // time, it has to be done in a separate thread. -            if (gpsdo_is_ettus()) { -                gps_fix_pt = boost::packaged_task<bool>( -                        boost::bind(check_gps_locked, myUsrp) ); -            } -            else { -                gps_fix_pt = boost::packaged_task<bool>( -                        boost::bind(check_gps_timelock, myUsrp) ); -            } -            gps_fix_future = gps_fix_pt.get_future(); - -            gps_fix_task = boost::thread(boost::move(gps_fix_pt)); -        } -    } -} - -void OutputUHD::workerthread() -{ -    // Set thread priority to realtime -    if (int ret = set_realtime_prio(1)) { -        etiLog.level(error) << "Could not set priority for UHD worker:" << ret; -    } - -    set_thread_name("uhdworker"); - -    last_tx_time_initialised = false; - -    uhd::stream_args_t stream_args("fc32"); //complex floats -    myTxStream = myUsrp->get_tx_stream(stream_args); - -    md.start_of_burst = false; -    md.end_of_burst   = false; - -    num_underflows   = 0; -    num_late_packets = 0; - -    size_t last_num_underflows = 0; -    size_t pop_prebuffering = FRAMES_MAX_SIZE; - -    while (running.load()) { -        md.has_time_spec  = false; -        md.time_spec      = uhd::time_spec_t(0.0); - -        struct UHDWorkerFrameData frame; -        etiLog.log(trace, "UHD,wait"); -        m_frames.wait_and_pop(frame, pop_prebuffering); -        etiLog.log(trace, "UHD,pop"); - -        handle_frame(&frame); -        num_frames_modulated++; - -        /* Ensure we fill frames after every underrun and -         * at startup to reduce underrun likelihood. */ -        if (last_num_underflows < num_underflows) { -            pop_prebuffering = FRAMES_MAX_SIZE; -        } -        else { -            pop_prebuffering = 1; -        } -        last_num_underflows = num_underflows; -    } -    running.store(false); -    etiLog.level(warn) << "UHD worker terminated"; -} - -void OutputUHD::handle_frame(const struct UHDWorkerFrameData *frame) -{ -    // Transmit timeout -    static const double tx_timeout = 20.0; - -    // Check for ref_lock -    if (refclk_loss_needs_check()) { -        try { -            if (not myUsrp->get_mboard_sensor("ref_locked", 0).to_bool()) { -                etiLog.log(alert, -                        "OutputUHD: External reference clock lock lost !"); -                if (myConf.refclk_lock_loss_behaviour == CRASH) { -                    throw std::runtime_error( -                            "OutputUHD: External reference clock lock lost."); -                } -            } -        } -        catch (uhd::lookup_error &e) { -            suppress_refclk_loss_check = true; -            etiLog.log(warn, "OutputUHD: This USRP does not have mboard " -                    "sensor for ext clock loss. Check disabled."); -        } -    } - -    double usrp_time = myUsrp->get_time_now().get_real_secs(); -    bool timestamp_discontinuity = false; - -    if (sync_and_ts_valid) { -        // Tx time from MNSC and TIST -        uint32_t tx_second = frame->ts.timestamp_sec; -        uint32_t tx_pps    = frame->ts.timestamp_pps; - -        if (!frame->ts.timestamp_valid) { -            /* We have not received a full timestamp through -             * MNSC. We sleep through the frame. -             */ -            etiLog.level(info) << -                "OutputUHD: Throwing sample " << frame->ts.fct << -                " away: incomplete timestamp " << tx_second << -                " / " << tx_pps; -            usleep(20000); //TODO should this be TM-dependant ? -            return; -        } - -        if (last_tx_time_initialised) { -            const size_t sizeIn = frame->buf.size() / sizeof(complexf); -            uint64_t increment = (uint64_t)sizeIn * 16384000ul / -                                 (uint64_t)myConf.sampleRate; -                                  // samps  * ticks/s  / (samps/s) -                                  // (samps * ticks * s) / (s * samps) -                                  // ticks - -            uint32_t expected_sec = last_tx_second + increment / 16384000ul; -            uint32_t expected_pps = last_tx_pps + increment % 16384000ul; - -            while (expected_pps >= 16384000) { -                expected_sec++; -                expected_pps -= 16384000; -            } - -            if (expected_sec != tx_second or -                    expected_pps != tx_pps) { -                etiLog.level(warn) << "OutputUHD: timestamp irregularity!" << -                    std::fixed << -                    " Expected " << -                    expected_sec << "+" << (double)expected_pps/16384000.0 << -                    "(" << expected_pps << ")" << -                    " Got " << -                    tx_second << "+" << (double)tx_pps/16384000.0 << -                    "(" << tx_pps << ")"; - -                timestamp_discontinuity = true; -            } -        } - -        last_tx_second = tx_second; -        last_tx_pps    = tx_pps; -        last_tx_time_initialised = true; - -        double pps_offset = tx_pps / 16384000.0; - -        md.has_time_spec = true; -        md.time_spec = uhd::time_spec_t(tx_second, pps_offset); -        etiLog.log(trace, "UHD,tist %f", md.time_spec.get_real_secs()); - -        // md is defined, let's do some checks -        if (md.time_spec.get_real_secs() + tx_timeout < usrp_time) { -            etiLog.level(warn) << -                "OutputUHD: Timestamp in the past! offset: " << -                std::fixed << -                md.time_spec.get_real_secs() - usrp_time << -                "  (" << usrp_time << ")" -                " frame " << frame->ts.fct << -                ", tx_second " << tx_second << -                ", pps " << pps_offset; -            return; -        } - -        if (md.time_spec.get_real_secs() > usrp_time + TIMESTAMP_ABORT_FUTURE) { -            etiLog.level(error) << -                "OutputUHD: Timestamp way too far in the future! offset: " << -                std::fixed << -                md.time_spec.get_real_secs() - usrp_time; -            throw std::runtime_error("Timestamp error. Aborted."); -        } -    } -    else { // !sync_and_ts_valid -        if (myConf.muting or myConf.muteNoTimestamps) { -            /* There was some error decoding the timestamp */ -            if (myConf.muting) { -                etiLog.log(info, -                        "OutputUHD: Muting sample %d requested\n", -                        frame->ts.fct); -            } -            else { -                etiLog.log(info, -                        "OutputUHD: Muting sample %d : no timestamp\n", -                        frame->ts.fct); -            } -            usleep(20000); -            return; -        } -    } - -    tx_frame(frame, timestamp_discontinuity); -} - -void OutputUHD::tx_frame(const struct UHDWorkerFrameData *frame, bool ts_update) -{ -    const double tx_timeout = 20.0; -    const size_t sizeIn = frame->buf.size() / sizeof(complexf); -    const complexf* in_data = reinterpret_cast<const complexf*>(&frame->buf[0]); - -    size_t usrp_max_num_samps = myTxStream->get_max_num_samps(); -    size_t num_acc_samps = 0; //number of accumulated samples -    while (running.load() and (not myConf.muting) and (num_acc_samps < sizeIn)) { -        size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); - -        uhd::tx_metadata_t md_tx = md; - -        //ensure the the last packet has EOB set if the timestamps has been -        //refreshed and need to be reconsidered. -        md_tx.end_of_burst = ( -                sync_and_ts_valid and -                (frame->ts.timestamp_refresh or ts_update) and -                samps_to_send <= usrp_max_num_samps ); - - -        //send a single packet -        size_t num_tx_samps = myTxStream->send( -                &in_data[num_acc_samps], -                samps_to_send, md_tx, tx_timeout); -        etiLog.log(trace, "UHD,sent %zu of %zu", num_tx_samps, samps_to_send); - -        num_acc_samps += num_tx_samps; - -        md_tx.time_spec = md.time_spec + -            uhd::time_spec_t(0, num_tx_samps/myConf.sampleRate); - -        if (num_tx_samps == 0) { -            etiLog.log(warn, -                    "OutputUHD::workerthread() unable to write to device, skipping frame!\n"); -            break; -        } -    } -} - -void OutputUHD::print_async_thread() -{ -    while (running.load()) { -        uhd::async_metadata_t async_md; -        if (myUsrp->get_device()->recv_async_msg(async_md, 1)) { -            const char* uhd_async_message = ""; -            bool failure = false; -            switch (async_md.event_code) { -                case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: -                    break; -                case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: -                    uhd_async_message = "Underflow"; -                    num_underflows++; -                    break; -                case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: -                    uhd_async_message = "Packet loss between host and device."; -                    failure = true; -                    break; -                case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: -                    uhd_async_message = "Packet had time that was late."; -                    num_late_packets++; -                    break; -                case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: -                    uhd_async_message = "Underflow occurred inside a packet."; -                    failure = true; -                    break; -                case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: -                    uhd_async_message = "Packet loss within a burst."; -                    failure = true; -                    break; -                default: -                    uhd_async_message = "unknown event code"; -                    failure = true; -                    break; -            } - -            if (failure) { -                etiLog.level(alert) << -                    "Received Async UHD Message '" << -                    uhd_async_message << "' at time " << -                    md.time_spec.get_real_secs(); - -            } -        } - -        auto time_now = std::chrono::steady_clock::now(); -        if (last_print_time + std::chrono::seconds(1) < time_now) { -            const double usrp_time = -                myUsrp->get_time_now().get_real_secs(); - -            if ( (num_underflows > num_underflows_previous) or -                 (num_late_packets > num_late_packets_previous)) { -                etiLog.log(info, -                        "OutputUHD status (usrp time: %f): " -                        "%d underruns and %d late packets since last status.\n", -                        usrp_time, -                        num_underflows, num_late_packets); -            } - -            num_underflows_previous = num_underflows; -            num_late_packets_previous = num_late_packets; - -            last_print_time = time_now; -        } -    } -} - -// ======================================= -// Remote Control for UHD -// ======================================= -void OutputUHD::set_parameter(const string& parameter, const string& value) -{ -    stringstream ss(value); -    ss.exceptions ( stringstream::failbit | stringstream::badbit ); - -    if (parameter == "txgain") { -        ss >> myConf.txgain; -        myUsrp->set_tx_gain(myConf.txgain); -    } -    else if (parameter == "rxgain") { -        ss >> myConf.rxgain; -        myUsrp->set_rx_gain(myConf.rxgain); -    } -    else if (parameter == "freq") { -        ss >> myConf.frequency; -        tune_usrp_to(myUsrp, myConf.lo_offset, myConf.frequency); -        myConf.frequency = myUsrp->get_tx_freq(); -    } -    else if (parameter == "muting") { -        ss >> myConf.muting; -    } -    else if (parameter == "staticdelay") { -        int64_t adjust; -        ss >> adjust; -        if (adjust > (myTFDurationMs * 1000)) -        { // reset static delay for values outside range -            myConf.staticDelayUs = 0; -        } -        else -        { // the new adjust value is added to the existing delay and the result -            // is wrapped around at TF duration -            int newStaticDelayUs = myConf.staticDelayUs + adjust; -            if (newStaticDelayUs > (myTFDurationMs * 1000)) -                myConf.staticDelayUs = newStaticDelayUs - (myTFDurationMs * 1000); -            else if (newStaticDelayUs < 0) -                myConf.staticDelayUs = newStaticDelayUs + (myTFDurationMs * 1000); -            else -                myConf.staticDelayUs = newStaticDelayUs; -        } -    } -    else if (parameter == "underruns" or -            parameter == "latepackets" or -            parameter == "frames") { -        throw ParameterError("Parameter " + parameter + " is read-only."); -    } -    else { -        stringstream ss_err; -        ss_err << "Parameter '" << parameter -            << "' is not exported by controllable " << get_rc_name(); -        throw ParameterError(ss_err.str()); -    } -} - -const string OutputUHD::get_parameter(const string& parameter) const -{ -    stringstream ss; -    if (parameter == "txgain") { -        ss << myConf.txgain; -    } -    else if (parameter == "rxgain") { -        ss << myConf.rxgain; -    } -    else if (parameter == "freq") { -        ss << myConf.frequency; -    } -    else if (parameter == "muting") { -        ss << myConf.muting; -    } -    else if (parameter == "staticdelay") { -        ss << myConf.staticDelayUs; -    } -    else if (parameter == "underruns") { -        ss << num_underflows; -    } -    else if (parameter == "latepackets") { -        ss << num_late_packets; -    } -    else if (parameter == "frames") { -        ss << num_frames_modulated; -    } -    else { -        ss << "Parameter '" << parameter << -            "' is not exported by controllable " << get_rc_name(); -        throw ParameterError(ss.str()); -    } -    return ss.str(); -} - -#endif // HAVE_OUTPUT_UHD - diff --git a/src/OutputUHD.h b/src/OutputUHD.h deleted file mode 100644 index 9213183..0000000 --- a/src/OutputUHD.h +++ /dev/null @@ -1,250 +0,0 @@ -/* -   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the -   Queen in Right of Canada (Communications Research Center Canada) - -   Copyright (C) 2017 -   Matthias P. Braendli, matthias.braendli@mpb.li - -    http://opendigitalradio.org - -DESCRIPTION: -   It is an output driver for the USRP family of devices, and uses the UHD -   library. This version is multi-threaded. A separate thread sends the data to -   the device. - -   Data between the modulator and the UHD thread are exchanged through a -   threadsafe queue. -*/ - -/* -   This file is part of ODR-DabMod. - -   ODR-DabMod is free software: you can redistribute it and/or modify -   it under the terms of the GNU General Public License as -   published by the Free Software Foundation, either version 3 of the -   License, or (at your option) any later version. - -   ODR-DabMod is distributed in the hope that it will be useful, -   but WITHOUT ANY WARRANTY; without even the implied warranty of -   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the -   GNU General Public License for more details. - -   You should have received a copy of the GNU General Public License -   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. - */ - -#pragma once - -#ifdef HAVE_CONFIG_H -#   include <config.h> -#endif - -#ifdef HAVE_OUTPUT_UHD - -#include <uhd/utils/thread_priority.hpp> -#include <uhd/utils/safe_main.hpp> -#include <uhd/usrp/multi_usrp.hpp> -#include <boost/thread.hpp> -#include <deque> -#include <chrono> -#include <memory> -#include <string> -#include <atomic> - -#include "Log.h" -#include "ModPlugin.h" -#include "EtiReader.h" -#include "TimestampDecoder.h" -#include "RemoteControl.h" -#include "ThreadsafeQueue.h" -#include "OutputUHDFeedback.h" - -#include <stdio.h> -#include <sys/types.h> - -//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args) -#define MDEBUG(fmt, args...) - -// If the timestamp is further in the future than -// 100 seconds, abort -#define TIMESTAMP_ABORT_FUTURE 100 - -// Add a delay to increase buffers when -// frames are too far in the future -#define TIMESTAMP_MARGIN_FUTURE 0.5 - -typedef std::complex<float> complexf; - -// Each frame contains one OFDM frame, and its -// associated timestamp -struct UHDWorkerFrameData { -    // Buffer holding frame data -    std::vector<uint8_t> buf; - -    // A full timestamp contains a TIST according to standard -    // and time information within MNSC with tx_second. -    struct frame_timestamp ts; -}; - -enum refclk_lock_loss_behaviour_t { CRASH, IGNORE }; - -/* This structure is used as initial configuration for OutputUHD. - * It must also contain all remote-controllable settings, otherwise - * they will get lost on a modulator restart. */ -struct OutputUHDConfig { -    std::string device; -    std::string usrpType; // e.g. b100, b200, usrp2 - -    // The USRP1 can accept two daughterboards -    std::string subDevice; // e.g. A:0 - -    long masterClockRate = 32768000; -    unsigned sampleRate = 2048000; -    double frequency = 0.0; -    double lo_offset = 0.0; -    double txgain = 0.0; -    double rxgain = 0.0; -    bool enableSync = false; - -    // When working with timestamps, mute the frames that -    // do not have a timestamp -    bool muteNoTimestamps = false; -    unsigned dabMode = 0; -    unsigned maxGPSHoldoverTime = 0; - -    /* allowed values : auto, int, sma, mimo */ -    std::string refclk_src; - -    /* allowed values : int, sma, mimo */ -    std::string pps_src; - -    /* allowed values : pos, neg */ -    std::string pps_polarity; - -    /* What to do when the reference clock PLL loses lock */ -    refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour; - -    // muting can only be changed using the remote control -    bool muting = false; - -    // static delay in microseconds -    int staticDelayUs = 0; - -    // TCP port on which to serve TX and RX samples for the -    // digital pre distortion learning tool -    uint16_t dpdFeedbackServerPort = 0; -}; - -class OutputUHD: public ModOutput, public RemoteControllable { -    public: -        OutputUHD(OutputUHDConfig& config); -        OutputUHD(const OutputUHD& other) = delete; -        OutputUHD operator=(const OutputUHD& other) = delete; -        ~OutputUHD(); - -        int process(Buffer* dataIn); - -        const char* name() { return "OutputUHD"; } - -        void setETISource(EtiSource *etiSource); - -        /*********** REMOTE CONTROL ***************/ - -        /* Base function to set parameters. */ -        virtual void set_parameter(const std::string& parameter, -                const std::string& value); - -        /* Getting a parameter always returns a string. */ -        virtual const std::string get_parameter( -                const std::string& parameter) const; - -    protected: -        EtiSource *myEtiSource = nullptr; -        OutputUHDConfig& myConf; -        uhd::usrp::multi_usrp::sptr myUsrp; -        std::shared_ptr<boost::barrier> mySyncBarrier; -        bool first_run = true; -        bool gps_fix_verified = false; -        std::shared_ptr<OutputUHDFeedback> uhdFeedback; - -    private: -        // Resize the internal delay buffer according to the dabMode and -        // the sample rate. -        void SetDelayBuffer(unsigned int dabMode); - -        // data -        // The remote-controllable static delay is in the OutputUHDConfig -        int myTFDurationMs; // TF duration in milliseconds -        std::vector<complexf> myDelayBuf; -        size_t lastLen = 0; - -        // GPS Fix check variables -        int num_checks_without_gps_fix = 1; -        struct timespec first_gps_fix_check; -        struct timespec last_gps_fix_check; -        struct timespec time_last_frame; -        boost::packaged_task<bool> gps_fix_pt; -        boost::unique_future<bool> gps_fix_future; -        boost::thread gps_fix_task; - -        // Wait time in seconds to get fix -        static const int initial_gps_fix_wait = 180; - -        // Interval for checking the GPS at runtime -        static constexpr double gps_fix_check_interval = 10.0; // seconds - -        // Asynchronous message statistics -        size_t num_underflows = 0; -        size_t num_late_packets = 0; -        size_t num_underflows_previous = 0; -        size_t num_late_packets_previous = 0; - -        size_t num_frames_modulated = 0; - -        uhd::tx_metadata_t md; -        bool     last_tx_time_initialised = false; -        uint32_t last_tx_second = 0; -        uint32_t last_tx_pps = 0; - -        // Used to print statistics once a second -        std::chrono::steady_clock::time_point last_print_time; - -        bool sync_and_ts_valid = false; - -        ThreadsafeQueue<UHDWorkerFrameData> m_frames; - -        // Returns true if we want to verify loss of refclk -        bool refclk_loss_needs_check(void) const; -        bool suppress_refclk_loss_check = false; - -        // Returns true if we want to check for the gps_timelock sensor -        bool gpsfix_needs_check(void) const; - -        // Return true if the gpsdo is from ettus, false if it is the ODR -        // LEA-M8F board is used -        bool gpsdo_is_ettus(void) const; - -        std::atomic<bool> running; -        boost::thread uhd_thread; -        boost::thread async_rx_thread; -        void stop_threads(void); - -        uhd::tx_streamer::sptr myTxStream; - -        // The worker thread decouples the modulator from UHD -        void workerthread(); -        void handle_frame(const struct UHDWorkerFrameData *frame); -        void tx_frame(const struct UHDWorkerFrameData *frame, bool ts_update); - -        // Poll asynchronous metadata from UHD -        void print_async_thread(void); - -        void check_gps(); - -        void set_usrp_time(); - -        void initial_gps_check(); -}; - -#endif // HAVE_OUTPUT_UHD - diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp index 93fe3c0..69f4aa1 100644 --- a/src/OutputZeroMQ.cpp +++ b/src/OutputZeroMQ.cpp @@ -32,8 +32,8 @@  #if defined(HAVE_ZEROMQ) -OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) -    : ModOutput(), +OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut) : +    ModOutput(),      m_type(type),      m_zmq_context(1),      m_zmq_sock(m_zmq_context, type), @@ -59,11 +59,6 @@ OutputZeroMQ::OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut)      m_zmq_sock.bind(m_endpoint.c_str());  } -OutputZeroMQ::~OutputZeroMQ() -{ -    PDEBUG("OutputZeroMQ::~OutputZeroMQ() @ %p\n", this); -} -  int OutputZeroMQ::process(Buffer* dataIn)  {      PDEBUG("OutputZeroMQ::process" diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h index 3107225..be2451b 100644 --- a/src/OutputZeroMQ.h +++ b/src/OutputZeroMQ.h @@ -38,10 +38,9 @@  class OutputZeroMQ : public ModOutput  {      public: -        OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = NULL); -        virtual ~OutputZeroMQ(); -        virtual int process(Buffer* dataIn); -        const char* name() { return m_name.c_str(); } +        OutputZeroMQ(std::string endpoint, int type, Buffer* dataOut = nullptr); +        virtual int process(Buffer* dataIn) override; +        const char* name() override { return m_name.c_str(); }      protected:          int m_type;                   // zmq socket type diff --git a/src/Resampler.h b/src/Resampler.h index d9d9d89..ed94a8c 100644 --- a/src/Resampler.h +++ b/src/Resampler.h @@ -59,7 +59,6 @@ protected:      FFT_PLAN myFftPlan2;      size_t L;      size_t M; -    size_t K;      size_t myFftSizeIn;      size_t myFftSizeOut;      FFT_TYPE* myFftIn; diff --git a/src/Socket.h b/src/Socket.h index 39554ca..392e758 100644 --- a/src/Socket.h +++ b/src/Socket.h @@ -49,6 +49,14 @@ class TCPSocket {              if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) < 0) {                  throw std::runtime_error("Can't create TCP socket");              } + +#if defined(HAVE_SO_NOSIGPIPE) +            int val = 1; +            if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, +                        &val, sizeof(val)) < 0) { +                throw std::runtime_error("Can't set SO_NOSIGPIPE"); +            } +#endif          }          ~TCPSocket() { @@ -89,11 +97,12 @@ class TCPSocket {              addr.sin_addr.s_addr = htonl(INADDR_ANY);              const int reuse = 1; -            if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0) { +            if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, +                        &reuse, sizeof(reuse)) < 0) {                  throw std::runtime_error("Can't reuse address for TCP socket");              } -            if (bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { +            if (::bind(m_sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) {                  close();                  throw std::runtime_error("Can't bind TCP socket");              } @@ -138,8 +147,16 @@ class TCPSocket {          {              uint8_t *buf = (uint8_t*)buffer;              while (buflen > 0) { -                // Set MSG_NOSIGNAL to avoid that this thread gets a SIGPIPE -                ssize_t sent = send(m_sock, buf, buflen, MSG_NOSIGNAL); +                /* On Linux, the MSG_NOSIGNAL flag ensures that the process +                 * would not receive a SIGPIPE and die. +                 * Other systems have SO_NOSIGPIPE set on the socket for the +                 * same effect. */ +#if defined(HAVE_MSG_NOSIGNAL) +                const int flags = MSG_NOSIGNAL; +#else +                const int flags = 0; +#endif +                ssize_t sent = ::send(m_sock, buf, buflen, flags);                  if (sent < 0) {                      return -1;                  } diff --git a/src/TII.cpp b/src/TII.cpp index 89cd6d0..3c5823b 100644 --- a/src/TII.cpp +++ b/src/TII.cpp @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -187,7 +187,7 @@ int TII::process(Buffer* dataIn, Buffer* dataOut)      memset(dataOut->getData(), 0,  dataOut->getLength());      if (m_conf.enable and m_insert) { -        boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); +        std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);          complexf* in = reinterpret_cast<complexf*>(dataIn->getData());          complexf* out = reinterpret_cast<complexf*>(dataOut->getData()); @@ -231,7 +231,7 @@ void TII::enable_carrier(int k) {  void TII::prepare_pattern() {      int comb = m_conf.comb; // Convert from unsigned to signed -    boost::mutex::scoped_lock lock(m_enabled_carriers_mutex); +    std::lock_guard<std::mutex> lock(m_enabled_carriers_mutex);      // Clear previous pattern      for (size_t i = 0; i < m_enabled_carriers.size(); i++) { @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011 Her Majesty     the Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -35,8 +35,8 @@  #include "ModPlugin.h"  #include "RemoteControl.h" -#include <boost/thread.hpp> -#include <sys/types.h> +#include <cstddef> +#include <thread>  #include <complex>  #include <vector>  #include <string> @@ -118,7 +118,7 @@ class TII : public ModCodec, public RemoteControllable          // m_enabled_carriers is read by modulator thread, and written          // to by RC thread. -        mutable boost::mutex m_enabled_carriers_mutex; +        mutable std::mutex m_enabled_carriers_mutex;          // m_enabled_carriers is true only for the first carrier in the          // active pair diff --git a/src/TimestampDecoder.cpp b/src/TimestampDecoder.cpp index 26deb60..b942c37 100644 --- a/src/TimestampDecoder.cpp +++ b/src/TimestampDecoder.cpp @@ -24,93 +24,61 @@     along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.   */ -#include <queue>  #include <iostream>  #include <fstream>  #include <string> -#include <sys/types.h>  #include "PcDebug.h"  #include "TimestampDecoder.h" -#include "Eti.h"  #include "Log.h" +#include "Eti.h"  //#define MDEBUG(fmt, args...) fprintf (LOG, "*****" fmt , ## args)  #define MDEBUG(fmt, args...) PDEBUG(fmt, ## args) +TimestampDecoder::TimestampDecoder(double& offset_s) : +        RemoteControllable("tist"), +        timestamp_offset(offset_s) +{ +    // Properly initialise temp_time +    memset(&temp_time, 0, sizeof(temp_time)); +    const time_t timep = 0; +    gmtime_r(&timep, &temp_time); + +    RC_ADD_PARAMETER(offset, "TIST offset [s]"); +    RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); -void TimestampDecoder::calculateTimestamp(frame_timestamp& ts) +    etiLog.level(info) << "Setting up timestamp decoder with " << +        timestamp_offset << " offset"; +} + +std::shared_ptr<frame_timestamp> TimestampDecoder::getTimestamp()  { -    std::shared_ptr<frame_timestamp> ts_queued = -        std::make_shared<frame_timestamp>(); +    auto ts = std::make_shared<frame_timestamp>(); -    /* Push new timestamp into queue */ -    ts_queued->timestamp_valid = full_timestamp_received; -    ts_queued->timestamp_sec = time_secs; -    ts_queued->timestamp_pps = time_pps; -    ts_queued->fct = latestFCT; +    ts->timestamp_valid = full_timestamp_received; +    ts->timestamp_sec = time_secs; +    ts->timestamp_pps = time_pps; +    ts->fct = latestFCT; +    ts->fp = latestFP; -    ts_queued->timestamp_refresh = offset_changed; +    ts->timestamp_refresh = offset_changed;      offset_changed = false;      MDEBUG("time_secs=%d, time_pps=%f\n", time_secs,              (double)time_pps / 16384000.0); -    *ts_queued += timestamp_offset; - -    queue_timestamps.push(ts_queued); - -    /* Here, the queue size is one more than the pipeline delay, because -     * we've just added a new element in the queue. -     * -     * Therefore, use <= and not < for comparison -     */ -    if (queue_timestamps.size() <= m_tist_delay_stages) { -        //fprintf(stderr, "* %zu %u ", queue_timestamps.size(), m_tist_delay_stages); -        /* Return invalid timestamp until the queue is full */ -        ts.timestamp_valid = false; -        ts.timestamp_sec = 0; -        ts.timestamp_pps = 0; -        ts.timestamp_refresh = false; -        ts.fct = -1; -    } -    else { -        //fprintf(stderr, ". %zu ", queue_timestamps.size()); -        /* Return timestamp from queue */ -        ts_queued = queue_timestamps.front(); -        queue_timestamps.pop(); -        /*fprintf(stderr, "ts_queued v:%d, sec:%d, pps:%f, ref:%d\n", -                ts_queued->timestamp_valid, -                ts_queued->timestamp_sec, -                ts_queued->timestamp_pps_offset, -                ts_queued->timestamp_refresh);*/ -        ts = *ts_queued; -        /*fprintf(stderr, "ts v:%d, sec:%d, pps:%f, ref:%d\n\n", -                ts.timestamp_valid, -                ts.timestamp_sec, -                ts.timestamp_pps_offset, -                ts.timestamp_refresh);*/ -    } - -    MDEBUG("Timestamp queue size %zu, delay_calc %u\n", -            queue_timestamps.size(), -            m_tist_delay_stages); - -    if (queue_timestamps.size() > m_tist_delay_stages) { -        etiLog.level(error) << "Error: Timestamp queue is too large : size " << -            queue_timestamps.size() << "! This should not happen !"; -    } +    *ts += timestamp_offset; -    //ts.print("calc2 "); +    return ts;  } -void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc) +void TimestampDecoder::pushMNSCData(uint8_t framephase, uint16_t mnsc)  {      struct eti_MNSC_TIME_0 *mnsc0;      struct eti_MNSC_TIME_1 *mnsc1;      struct eti_MNSC_TIME_2 *mnsc2;      struct eti_MNSC_TIME_3 *mnsc3; -    switch (framephase) -    { +    switch (framephase) {          case 0:              mnsc0 = (struct eti_MNSC_TIME_0*)&mnsc;              enableDecode = (mnsc0->type == 0) && @@ -126,10 +94,10 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)              temp_time.tm_sec = mnsc1->second_tens * 10 + mnsc1->second_unit;              temp_time.tm_min = mnsc1->minute_tens * 10 + mnsc1->minute_unit; -            if (!mnsc1->sync_to_frame) -            { +            if (!mnsc1->sync_to_frame) {                  enableDecode = false; -                PDEBUG("TimestampDecoder: MNSC time info is not synchronised to frame\n"); +                PDEBUG("TimestampDecoder: " +                        "MNSC time info is not synchronised to frame\n");              }              break; @@ -145,9 +113,7 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)              temp_time.tm_mon = (mnsc3->month_tens * 10 + mnsc3->month_unit) - 1;              temp_time.tm_year = (mnsc3->year_tens * 10 + mnsc3->year_unit) + 100; -            if (enableDecode) -            { -                full_timestamp_received = true; +            if (enableDecode) {                  updateTimestampSeconds(mktime(&temp_time));              }              break; @@ -160,15 +126,14 @@ void TimestampDecoder::pushMNSCData(int framephase, uint16_t mnsc)  void TimestampDecoder::updateTimestampSeconds(uint32_t secs)  { -    if (inhibit_second_update > 0) -    { +    if (inhibit_second_update > 0) {          MDEBUG("TimestampDecoder::updateTimestampSeconds(%d) inhibit\n", secs);          inhibit_second_update--;      } -    else -    { +    else {          MDEBUG("TimestampDecoder::updateTimestampSeconds(%d) apply\n", secs);          time_secs = secs; +        full_timestamp_received = true;      }  } @@ -176,8 +141,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)  {      MDEBUG("TimestampDecoder::updateTimestampPPS(%f)\n", (double)pps / 16384000.0); -    if (time_pps > pps) // Second boundary crossed -    { +    if (time_pps > pps) { // Second boundary crossed          MDEBUG("TimestampDecoder::updateTimestampPPS crossed second\n");          // The second for the next eight frames will not @@ -190,7 +154,7 @@ void TimestampDecoder::updateTimestampPPS(uint32_t pps)  }  void TimestampDecoder::updateTimestampEti( -        int framephase, +        uint8_t framephase,          uint16_t mnsc,          uint32_t pps, // In units of 1/16384000 s          int32_t fct) @@ -198,16 +162,19 @@ void TimestampDecoder::updateTimestampEti(      updateTimestampPPS(pps);      pushMNSCData(framephase, mnsc);      latestFCT = fct; +    latestFP = framephase;  }  void TimestampDecoder::updateTimestampEdi(          uint32_t seconds_utc,          uint32_t pps, // In units of 1/16384000 s -        int32_t fct) +        int32_t fct, +        uint8_t framephase)  {      time_secs = seconds_utc;      time_pps  = pps;      latestFCT = fct; +    latestFP = framephase;      full_timestamp_received = true;  } diff --git a/src/TimestampDecoder.h b/src/TimestampDecoder.h index db8f816..33d9992 100644 --- a/src/TimestampDecoder.h +++ b/src/TimestampDecoder.h @@ -2,7 +2,7 @@     Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the     Queen in Right of Canada (Communications Research Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -26,41 +26,24 @@  #pragma once -#include <queue> +#include <cstdint>  #include <memory>  #include <string> -#include <time.h>  #include <math.h>  #include <stdio.h> -#include "Eti.h" -#include "Log.h"  #include "RemoteControl.h"  struct frame_timestamp  {      // Which frame count does this timestamp apply to      int32_t fct; +    uint8_t fp; // Frame Phase      uint32_t timestamp_sec;      uint32_t timestamp_pps; // In units of 1/16384000 s -    bool timestamp_valid; +    bool timestamp_valid = false;      bool timestamp_refresh; -    frame_timestamp() = default; -    frame_timestamp(const frame_timestamp& other) = default; -    frame_timestamp& operator=(const frame_timestamp &rhs) -    { -        if (this != &rhs) { -            this->timestamp_sec = rhs.timestamp_sec; -            this->timestamp_pps = rhs.timestamp_pps; -            this->timestamp_valid = rhs.timestamp_valid; -            this->timestamp_refresh = rhs.timestamp_refresh; -            this->fct = rhs.fct; -        } - -        return *this; -    } -      frame_timestamp& operator+=(const double& diff)      {          double offset_pps, offset_secs; @@ -69,16 +52,14 @@ struct frame_timestamp          this->timestamp_sec += lrintf(offset_secs);          this->timestamp_pps += lrintf(offset_pps * 16384000.0); -        while (this->timestamp_pps >= 16384000) -        { +        while (this->timestamp_pps >= 16384000) {              this->timestamp_pps -= 16384000;              this->timestamp_sec += 1; -        }; +        }          return *this;      } -    const frame_timestamp operator+(const double diff) -    { +    const frame_timestamp operator+(const double diff) {          frame_timestamp ts = *this;          ts += diff;          return ts; @@ -88,8 +69,19 @@ struct frame_timestamp          return timestamp_pps / 16384000.0;      } -    void print(const char* t) -    { +    double get_real_secs() const { +        double t = timestamp_sec; +        t += pps_offset(); +        return t; +    } + +    long long int get_ns() const { +        long long int ns = timestamp_sec * 1000000000ull; +        ns += llrint((double)timestamp_pps / 0.016384); +        return ns; +    } + +    void print(const char* t) const {          fprintf(stderr,                  "%s <frame_timestamp(%s, %d, %.9f, %d)>\n",                  t, this->timestamp_valid ? "valid" : "invalid", @@ -103,49 +95,16 @@ struct frame_timestamp  class TimestampDecoder : public RemoteControllable  {      public: -        TimestampDecoder( -                /* The modulator adds this offset to the TIST to define time of -                 * frame transmission -                 */ -                double& offset_s, - -                /* Specifies by how many stages the timestamp must be delayed. -                 * (e.g. The FIRFilter is pipelined, therefore we must increase -                 * tist_delay_stages by one if the filter is used -                 */ -                unsigned tist_delay_stages) : -            RemoteControllable("tist"), -            timestamp_offset(offset_s) -        { -            m_tist_delay_stages = tist_delay_stages; -            inhibit_second_update = 0; -            time_pps = 0.0; -            time_secs = 0; -            latestFCT = 0; -            enableDecode = false; -            full_timestamp_received = false; - -            // Properly initialise temp_time -            memset(&temp_time, 0, sizeof(temp_time)); -            const time_t timep = 0; -            gmtime_r(&timep, &temp_time); - -            offset_changed = false; - -            RC_ADD_PARAMETER(offset, "TIST offset [s]"); -            RC_ADD_PARAMETER(timestamp, "FCT and timestamp [s]"); - -            etiLog.level(info) << "Setting up timestamp decoder with " << -                timestamp_offset << " offset"; - -        }; - -        /* Calculate the timestamp for the current frame. */ -        void calculateTimestamp(frame_timestamp& ts); +        /* offset_s: The modulator adds this offset to the TIST to define time of +         * frame transmission +         */ +        TimestampDecoder(double& offset_s); + +        std::shared_ptr<frame_timestamp> getTimestamp(void);          /* Update timestamp data from ETI */          void updateTimestampEti( -                int framephase, +                uint8_t framephase,                  uint16_t mnsc,                  uint32_t pps, // In units of 1/16384000 s                  int32_t fct); @@ -154,7 +113,8 @@ class TimestampDecoder : public RemoteControllable          void updateTimestampEdi(                  uint32_t seconds_utc,                  uint32_t pps, // In units of 1/16384000 s -                int32_t fct); +                int32_t fct, +                uint8_t framephase);          /*********** REMOTE CONTROL ***************/ @@ -171,7 +131,7 @@ class TimestampDecoder : public RemoteControllable      protected:          /* Push a new MNSC field into the decoder */ -        void pushMNSCData(int framephase, uint16_t mnsc); +        void pushMNSCData(uint8_t framephase, uint16_t mnsc);          /* Each frame contains the TIST field with the PPS offset.           * For each frame, this function must be called to update @@ -191,28 +151,20 @@ class TimestampDecoder : public RemoteControllable          void updateTimestampSeconds(uint32_t secs);          struct tm temp_time; -        uint32_t time_secs; -        int32_t latestFCT; -        uint32_t time_pps; +        uint32_t time_secs = 0; +        int32_t latestFCT = 0; +        uint32_t latestFP = 0; +        uint32_t time_pps = 0;          double& timestamp_offset; -        unsigned m_tist_delay_stages; -        int inhibit_second_update; -        bool offset_changed; +        int inhibit_second_update = 0; +        bool offset_changed = false;          /* When the type or identifier don't match, the decoder must           * be disabled           */ -        bool enableDecode; +        bool enableDecode = false;          /* Disable timstamps until full time has been received */ -        bool full_timestamp_received; - -        /* when pipelining, we must shift the calculated timestamps -         * through this queue. Otherwise, it would not be possible to -         * synchronise two modulators if only one uses (for instance) the -         * FIRFilter (1 stage pipeline) -         */ -        std::queue<std::shared_ptr<frame_timestamp> > queue_timestamps; - +        bool full_timestamp_received = false;  }; diff --git a/src/Utils.cpp b/src/Utils.cpp index f423dc1..b4816d3 100644 --- a/src/Utils.cpp +++ b/src/Utils.cpp @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -214,3 +214,17 @@ double parseChannel(const std::string& chan)      }      return freq;  } + +std::chrono::milliseconds transmission_frame_duration(unsigned int dabmode) +{ +    using namespace std::chrono; +    switch (dabmode) { +        case 1: return milliseconds(96); +        case 2: return milliseconds(24); +        case 3: return milliseconds(24); +        case 4: return milliseconds(48); +        default: +            throw std::runtime_error("invalid DAB mode"); +    } +} + diff --git a/src/Utils.h b/src/Utils.h index 6a36baf..9e88488 100644 --- a/src/Utils.h +++ b/src/Utils.h @@ -3,7 +3,7 @@     Her Majesty the Queen in Right of Canada (Communications Research     Center Canada) -   Copyright (C) 2017 +   Copyright (C) 2018     Matthias P. Braendli, matthias.braendli@mpb.li      http://opendigitalradio.org @@ -36,6 +36,7 @@  #include <stdio.h>  #include <time.h>  #include <string> +#include <chrono>  void printUsage(const char* progName); @@ -43,22 +44,6 @@ void printVersion(void);  void printStartupInfo(void); -inline long timespecdiff_us(struct timespec& oldTime, struct timespec& time) -{ -    long tv_sec; -    long tv_nsec; -    if (time.tv_nsec < oldTime.tv_nsec) { -        tv_sec = time.tv_sec - 1 - oldTime.tv_sec; -        tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; -    } -    else { -        tv_sec = time.tv_sec - oldTime.tv_sec; -        tv_nsec = time.tv_nsec - oldTime.tv_nsec; -    } - -    return tv_sec * 1000 + tv_nsec / 1000; -} -  // Set SCHED_RR with priority prio (0=lowest)  int set_realtime_prio(int prio); @@ -68,3 +53,6 @@ void set_thread_name(const char *name);  // Convert a channel like 10A to a frequency  double parseChannel(const std::string& chan); +// dabMode is either 1, 2, 3, 4, corresponding to TM I, TM II, TM III and TM IV. +// throws a runtime_error if dabMode is not one of these values. +std::chrono::milliseconds transmission_frame_duration(unsigned int dabmode); diff --git a/src/OutputUHDFeedback.cpp b/src/output/Feedback.cpp index 68783f2..f0bbd98 100644 --- a/src/OutputUHDFeedback.cpp +++ b/src/output/Feedback.cpp @@ -34,41 +34,41 @@ DESCRIPTION:  #   include <config.h>  #endif -#ifdef HAVE_OUTPUT_UHD -  #include <vector>  #include <complex>  #include <cstring> -#include <uhd/types/stream_cmd.hpp>  #include <sys/socket.h>  #include <errno.h>  #include <poll.h>  #include <boost/date_time/posix_time/posix_time.hpp> -#include "OutputUHDFeedback.h" +#include "output/Feedback.h"  #include "Utils.h"  #include "Socket.h"  using namespace std; -typedef std::complex<float> complexf; -OutputUHDFeedback::OutputUHDFeedback( -        uhd::usrp::multi_usrp::sptr usrp, +namespace Output { + +DPDFeedbackServer::DPDFeedbackServer( +        std::shared_ptr<SDRDevice> device,          uint16_t port, -        uint32_t sampleRate) +        uint32_t sampleRate) : +    m_port(port), +    m_sampleRate(sampleRate), +    m_device(device)  { -    m_port = port; -    m_sampleRate = sampleRate; -    m_usrp = usrp; -      if (m_port) {          m_running.store(true); -        rx_burst_thread = boost::thread(&OutputUHDFeedback::ReceiveBurstThread, this); -        burst_tcp_thread = boost::thread(&OutputUHDFeedback::ServeFeedbackThread, this); +        rx_burst_thread = boost::thread( +                &DPDFeedbackServer::ReceiveBurstThread, this); + +        burst_tcp_thread = boost::thread( +                &DPDFeedbackServer::ServeFeedbackThread, this);      }  } -OutputUHDFeedback::~OutputUHDFeedback() +DPDFeedbackServer::~DPDFeedbackServer()  {      m_running.store(false); @@ -83,12 +83,12 @@ OutputUHDFeedback::~OutputUHDFeedback()      }  } -void OutputUHDFeedback::set_tx_frame( +void DPDFeedbackServer::set_tx_frame(          const std::vector<uint8_t> &buf,          const struct frame_timestamp &buf_ts)  {      if (not m_running) { -        throw runtime_error("OutputUHDFeedback not running"); +        throw runtime_error("DPDFeedbackServer not running");      }      boost::mutex::scoped_lock lock(burstRequest.mutex); @@ -131,13 +131,10 @@ void OutputUHDFeedback::set_tx_frame(      }  } -void OutputUHDFeedback::ReceiveBurstThread() +void DPDFeedbackServer::ReceiveBurstThread()  {      try { -        set_thread_name("uhdreceiveburst"); - -        uhd::stream_args_t stream_args("fc32"); //complex floats -        auto rxStream = m_usrp->get_rx_stream(stream_args); +        set_thread_name("dpdreceiveburst");          while (m_running) {              boost::mutex::scoped_lock lock(burstRequest.mutex); @@ -148,43 +145,40 @@ void OutputUHDFeedback::ReceiveBurstThread()              if (not m_running) break; -            uhd::stream_cmd_t cmd( -                    uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); -            cmd.num_samps = burstRequest.num_samples; -            cmd.stream_now = false; +            const size_t num_samps = burstRequest.num_samples; -            double pps = burstRequest.rx_pps / 16384000.0; -            cmd.time_spec = uhd::time_spec_t(burstRequest.rx_second, pps); +            frame_timestamp ts; +            ts.timestamp_sec = burstRequest.rx_second; +            ts.timestamp_pps = burstRequest.rx_pps; +            ts.timestamp_valid = true;              // We need to free the mutex while we recv(), because otherwise we block the              // TX thread              lock.unlock(); -            const double usrp_time = m_usrp->get_time_now().get_real_secs(); -            const double cmd_time = cmd.time_spec.get_real_secs(); - -            rxStream->issue_stream_cmd(cmd); +            const double device_time = m_device->get_real_secs(); +            const double cmd_time = ts.get_real_secs(); -            uhd::rx_metadata_t md; - -            std::vector<uint8_t> buf(cmd.num_samps * sizeof(complexf)); +            std::vector<uint8_t> buf(num_samps * sizeof(complexf));              const double timeout = 60; -            size_t samples_read = rxStream->recv(&buf[0], cmd.num_samps, md, timeout); +            size_t samples_read = m_device->receive_frame( +                    reinterpret_cast<complexf*>(buf.data()), +                    num_samps, ts, timeout);              lock.lock();              burstRequest.rx_samples = std::move(buf);              burstRequest.rx_samples.resize(samples_read * sizeof(complexf));              // The recv might have happened at another time than requested -            burstRequest.rx_second = md.time_spec.get_full_secs(); -            burstRequest.rx_pps = md.time_spec.get_frac_secs() * 16384000.0; +            burstRequest.rx_second = ts.timestamp_sec; +            burstRequest.rx_pps = ts.timestamp_pps;              etiLog.level(debug) << "DPD: acquired " << samples_read <<                  " RX feedback samples " <<                  "at time " << burstRequest.tx_second << " + " <<                  std::fixed << burstRequest.tx_pps / 16384000.0 << -                " Delta=" << cmd_time - usrp_time; +                " Delta=" << cmd_time - device_time;              burstRequest.state = BurstRequestState::Acquired; @@ -205,7 +199,7 @@ void OutputUHDFeedback::ReceiveBurstThread()      m_running.store(false);  } -void OutputUHDFeedback::ServeFeedback() +void DPDFeedbackServer::ServeFeedback()  {      TCPSocket m_server_sock;      m_server_sock.listen(m_port); @@ -335,9 +329,9 @@ void OutputUHDFeedback::ServeFeedback()      }  } -void OutputUHDFeedback::ServeFeedbackThread() +void DPDFeedbackServer::ServeFeedbackThread()  { -    set_thread_name("uhdservefeedback"); +    set_thread_name("dpdfeedbackserver");      while (m_running) {          try { @@ -359,4 +353,4 @@ void OutputUHDFeedback::ServeFeedbackThread()      m_running.store(false);  } -#endif +} // namespace Output diff --git a/src/OutputUHDFeedback.h b/src/output/Feedback.h index 80d287f..2cad508 100644 --- a/src/OutputUHDFeedback.h +++ b/src/output/Feedback.h @@ -36,11 +36,6 @@ DESCRIPTION:  #   include <config.h>  #endif -#ifdef HAVE_OUTPUT_UHD - -#include <uhd/utils/thread_priority.hpp> -#include <uhd/utils/safe_main.hpp> -#include <uhd/usrp/multi_usrp.hpp>  #include <boost/thread.hpp>  #include <memory>  #include <string> @@ -48,6 +43,9 @@ DESCRIPTION:  #include "Log.h"  #include "TimestampDecoder.h" +#include "output/SDRDevice.h" + +namespace Output {  enum class BurstRequestState {      None, // To pending request @@ -56,7 +54,7 @@ enum class BurstRequestState {      Acquired, // Both TX and RX frames are ready  }; -struct UHDReceiveBurstRequest { +struct FeedbackBurstRequest {      // All fields in this struct are protected      mutable boost::mutex mutex;      boost::condition_variable mutex_notification; @@ -83,21 +81,21 @@ struct UHDReceiveBurstRequest {  };  // Serve TX samples and RX feedback samples over a TCP connection -class OutputUHDFeedback { +class DPDFeedbackServer {      public: -        OutputUHDFeedback( -                uhd::usrp::multi_usrp::sptr usrp, -                uint16_t port, +        DPDFeedbackServer( +                std::shared_ptr<SDRDevice> device, +                uint16_t port, // Set to 0 to disable the Feedbackserver                  uint32_t sampleRate); -        OutputUHDFeedback(const OutputUHDFeedback& other) = delete; -        OutputUHDFeedback& operator=(const OutputUHDFeedback& other) = delete; -        ~OutputUHDFeedback(); +        DPDFeedbackServer(const DPDFeedbackServer& other) = delete; +        DPDFeedbackServer& operator=(const DPDFeedbackServer& other) = delete; +        ~DPDFeedbackServer();          void set_tx_frame(const std::vector<uint8_t> &buf,                  const struct frame_timestamp& ts);      private: -        // Thread that reacts to burstRequests and receives from the USRP +        // Thread that reacts to burstRequests and receives from the SDR device          void ReceiveBurstThread(void);          // Thread that listens for requests over TCP to get TX and RX feedback @@ -107,13 +105,12 @@ class OutputUHDFeedback {          boost::thread rx_burst_thread;          boost::thread burst_tcp_thread; -        UHDReceiveBurstRequest burstRequest; +        FeedbackBurstRequest burstRequest;          std::atomic_bool m_running;          uint16_t m_port = 0;          uint32_t m_sampleRate = 0; -        uhd::usrp::multi_usrp::sptr m_usrp; +        std::shared_ptr<SDRDevice> m_device;  }; - -#endif // HAVE_OUTPUT_UHD +} // namespace Output diff --git a/src/output/SDR.cpp b/src/output/SDR.cpp new file mode 100644 index 0000000..34341bd --- /dev/null +++ b/src/output/SDR.cpp @@ -0,0 +1,425 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "output/SDR.h" + +#include "PcDebug.h" +#include "Log.h" +#include "RemoteControl.h" +#include "Utils.h" + +#include <cmath> +#include <iostream> +#include <assert.h> +#include <stdexcept> +#include <stdio.h> +#include <time.h> +#include <errno.h> +#include <unistd.h> +#include <pthread.h> + +using namespace std; + +namespace Output { + +// Maximum number of frames that can wait in frames +static constexpr size_t FRAMES_MAX_SIZE = 8; + +// If the timestamp is further in the future than +// 100 seconds, abort +static constexpr double TIMESTAMP_ABORT_FUTURE = 100; + +// Add a delay to increase buffers when +// frames are too far in the future +static constexpr double TIMESTAMP_MARGIN_FUTURE = 0.5; + +SDR::SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device) : +    ModOutput(), ModMetadata(), RemoteControllable("sdr"), +    m_config(config), +    m_device(device) +{ +    // muting is remote-controllable +    m_config.muting = false; + +    m_device_thread = std::thread(&SDR::process_thread_entry, this); + +    m_dpd_feedback_server = make_shared<DPDFeedbackServer>( +            m_device, +            m_config.dpdFeedbackServerPort, +            m_config.sampleRate); +} + +SDR::~SDR() +{ +    m_running.store(false); + +    FrameData end_marker; +    end_marker.buf.clear(); +    m_queue.push(end_marker); + +    if (m_device_thread.joinable()) { +        m_device_thread.join(); +    } +} + +int SDR::process(Buffer *dataIn) +{ +    if (not m_running) { +        throw std::runtime_error("SDR thread failed"); +    } + +    const uint8_t* pDataIn = (uint8_t*)dataIn->getData(); +    m_frame.resize(dataIn->getLength()); +    std::copy(pDataIn, pDataIn + dataIn->getLength(), +            m_frame.begin()); + +    // We will effectively transmit the frame once we got the metadata. + +    return dataIn->getLength(); +} + +meta_vec_t SDR::process_metadata(const meta_vec_t& metadataIn) +{ +    if (m_device and m_running) { +        FrameData frame; +        frame.buf = std::move(m_frame); + +        if (metadataIn.empty()) { +            etiLog.level(info) << +                "SDR output: dropping one frame with invalid FCT"; +        } +        else { +            /* In transmission modes where several ETI frames are needed to +             * build one transmission frame (like in TM 1), we will have +             * several entries in metadataIn. Take the first one, which +             * comes from the earliest ETI frame. +             * This behaviour is different to earlier versions of ODR-DabMod, +             * which took the timestamp from the latest ETI frame. +             */ +            frame.ts = *(metadataIn[0].ts); + +            // TODO check device running + +            try { +                if (m_dpd_feedback_server) { +                    m_dpd_feedback_server->set_tx_frame(frame.buf, frame.ts); +                } +            } +            catch (const runtime_error& e) { +                etiLog.level(warn) << +                    "SDR output: Feedback server failed, restarting..."; + +                m_dpd_feedback_server = std::make_shared<DPDFeedbackServer>( +                        m_device, +                        m_config.dpdFeedbackServerPort, +                        m_config.sampleRate); +            } + +            size_t num_frames = m_queue.push_wait_if_full(frame, +                    FRAMES_MAX_SIZE); +            etiLog.log(trace, "SDR,push %zu", num_frames); +        } +    } +    else { +        // Ignore frame +    } +    return {}; +} + + +void SDR::process_thread_entry() +{ +    // Set thread priority to realtime +    if (int ret = set_realtime_prio(1)) { +        etiLog.level(error) << "Could not set priority for SDR device thread:" << ret; +    } + +    set_thread_name("sdrdevice"); + +    last_tx_time_initialised = false; + +    size_t last_num_underflows = 0; +    size_t pop_prebuffering = FRAMES_MAX_SIZE; + +    m_running.store(true); + +    try { +        while (m_running.load()) { +            struct FrameData frame; +            etiLog.log(trace, "SDR,wait"); +            m_queue.wait_and_pop(frame, pop_prebuffering); +            etiLog.log(trace, "SDR,pop"); + +            if (m_running.load() == false or frame.buf.empty()) { +                break; +            } + +            if (m_device) { +                handle_frame(frame); + +                const auto rs = m_device->get_run_statistics(); + +                /* Ensure we fill frames after every underrun and +                 * at startup to reduce underrun likelihood. */ +                if (last_num_underflows < rs.num_underruns) { +                    pop_prebuffering = FRAMES_MAX_SIZE; +                } +                else { +                    pop_prebuffering = 1; +                } + +                last_num_underflows = rs.num_underruns; +            } +        } +    } +    catch (const runtime_error& e) { +        etiLog.level(error) << "SDR output thread caught runtime error: " << e.what(); +    } + +    m_running.store(false); +} + +const char* SDR::name() +{ +    if (m_device) { +        m_name = "OutputSDR("; +        m_name += m_device->device_name(); +        m_name += ")"; +    } +    else { +        m_name = "OutputSDR(<no device>)"; +    } +    return m_name.c_str(); +} + +void SDR::sleep_through_frame() +{ +    using namespace std::chrono; + +    const auto now = steady_clock::now(); + +    if (not t_last_frame_initialised) { +        t_last_frame = now; +        t_last_frame_initialised = true; +    } + +    const auto delta = now - t_last_frame; +    const auto wait_time = transmission_frame_duration(m_config.dabMode); + +    if (wait_time > delta) { +        this_thread::sleep_for(wait_time - delta); +    } + +    t_last_frame += wait_time; +} + +void SDR::handle_frame(struct FrameData& frame) +{ +    // Assumes m_device is valid + +    constexpr double tx_timeout = 20.0; + +    if (not m_device->is_clk_source_ok()) { +        sleep_through_frame(); +        return; +    } + +    double device_time = m_device->get_real_secs(); +    const auto& time_spec = frame.ts; + +    if (m_config.enableSync and m_config.muteNoTimestamps and +            not time_spec.timestamp_valid) { +        sleep_through_frame(); +        etiLog.log(info, +                "OutputSDR: Muting sample %d : no timestamp\n", +                frame.ts.fct); +        return; +    } + +    if (m_config.enableSync and time_spec.timestamp_valid) { +        // Tx time from MNSC and TIST +        const uint32_t tx_second = frame.ts.timestamp_sec; +        const uint32_t tx_pps    = frame.ts.timestamp_pps; + +        if (not frame.ts.timestamp_valid) { +            /* We have not received a full timestamp through +             * MNSC. We sleep through the frame. +             */ +            etiLog.level(info) << +                "OutputSDR: Throwing sample " << frame.ts.fct << +                " away: incomplete timestamp " << tx_second << +                " / " << tx_pps; +            return; +        } + +        if (last_tx_time_initialised) { +            const size_t sizeIn = frame.buf.size() / sizeof(complexf); +            uint64_t increment = (uint64_t)sizeIn * 16384000ul / +                                 (uint64_t)m_config.sampleRate; +                                  // samps  * ticks/s  / (samps/s) +                                  // (samps * ticks * s) / (s * samps) +                                  // ticks + +            uint32_t expected_sec = last_tx_second + increment / 16384000ul; +            uint32_t expected_pps = last_tx_pps + increment % 16384000ul; + +            while (expected_pps >= 16384000) { +                expected_sec++; +                expected_pps -= 16384000; +            } + +            if (expected_sec != tx_second or expected_pps != tx_pps) { +                etiLog.level(warn) << "OutputSDR: timestamp irregularity!" << +                    std::fixed << +                    " Expected " << +                    expected_sec << "+" << (double)expected_pps/16384000.0 << +                    "(" << expected_pps << ")" << +                    " Got " << +                    tx_second << "+" << (double)tx_pps/16384000.0 << +                    "(" << tx_pps << ")"; + +                frame.ts.timestamp_refresh = true; +            } +        } + +        last_tx_second = tx_second; +        last_tx_pps    = tx_pps; +        last_tx_time_initialised = true; + +        const double pps_offset = tx_pps / 16384000.0; + +        etiLog.log(trace, "SDR,tist %f", time_spec.get_real_secs()); + +        if (time_spec.get_real_secs() + tx_timeout < device_time) { +            etiLog.level(warn) << +                "OutputSDR: Timestamp in the past! offset: " << +                std::fixed << +                time_spec.get_real_secs() - device_time << +                "  (" << device_time << ")" +                " frame " << frame.ts.fct << +                ", tx_second " << tx_second << +                ", pps " << pps_offset; +            return; +        } + +        if (time_spec.get_real_secs() > device_time + TIMESTAMP_ABORT_FUTURE) { +            etiLog.level(error) << +                "OutputSDR: Timestamp way too far in the future! offset: " << +                std::fixed << +                time_spec.get_real_secs() - device_time; +            throw std::runtime_error("Timestamp error. Aborted."); +        } +    } + +    if (m_config.muting) { +        etiLog.log(info, +                "OutputSDR: Muting sample %d requested\n", +                frame.ts.fct); +        return; +    } + +    m_device->transmit_frame(frame); +} + +// ======================================= +// Remote Control +// ======================================= +void SDR::set_parameter(const string& parameter, const string& value) +{ +    stringstream ss(value); +    ss.exceptions ( stringstream::failbit | stringstream::badbit ); + +    if (parameter == "txgain") { +        ss >> m_config.txgain; +        m_device->set_txgain(m_config.txgain); +    } +    else if (parameter == "rxgain") { +        ss >> m_config.rxgain; +        m_device->set_rxgain(m_config.rxgain); +    } +    else if (parameter == "freq") { +        ss >> m_config.frequency; +        m_device->tune(m_config.lo_offset, m_config.frequency); +        m_config.frequency = m_device->get_tx_freq(); +    } +    else if (parameter == "muting") { +        ss >> m_config.muting; +    } +    else if (parameter == "underruns" or +             parameter == "latepackets" or +             parameter == "frames") { +        throw ParameterError("Parameter " + parameter + " is read-only."); +    } +    else { +        stringstream ss_err; +        ss_err << "Parameter '" << parameter +            << "' is not exported by controllable " << get_rc_name(); +        throw ParameterError(ss_err.str()); +    } +} + +const string SDR::get_parameter(const string& parameter) const +{ +    stringstream ss; +    if (parameter == "txgain") { +        ss << m_config.txgain; +    } +    else if (parameter == "rxgain") { +        ss << m_config.rxgain; +    } +    else if (parameter == "freq") { +        ss << m_config.frequency; +    } +    else if (parameter == "muting") { +        ss << m_config.muting; +    } +    else if (parameter == "underruns" or +            parameter == "latepackets" or +            parameter == "frames" ) { +        if (not m_device) { +            throw ParameterError("OutputSDR has no device"); +        } +        const auto stat = m_device->get_run_statistics(); + +        if (parameter == "underruns") { +            ss << stat.num_underruns; +        } +        else if (parameter == "latepackets") { +            ss << stat.num_late_packets; +        } +        else if (parameter == "frames") { +            ss << stat.num_frames_modulated; +        } +    } +    else { +        ss << "Parameter '" << parameter << +            "' is not exported by controllable " << get_rc_name(); +        throw ParameterError(ss.str()); +    } +    return ss.str(); +} + +} // namespace Output diff --git a/src/output/SDR.h b/src/output/SDR.h new file mode 100644 index 0000000..a55f7c0 --- /dev/null +++ b/src/output/SDR.h @@ -0,0 +1,95 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   Common interface for all SDR outputs +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#include <chrono> +#include "ModPlugin.h" +#include "EtiReader.h" +#include "output/SDRDevice.h" +#include "output/Feedback.h" + +namespace Output { + +using complexf = std::complex<float>; + +class SDR : public ModOutput, public ModMetadata, public RemoteControllable { +    public: +        SDR(SDRDeviceConfig& config, std::shared_ptr<SDRDevice> device); +        SDR(const SDR& other) = delete; +        SDR operator=(const SDR& other) = delete; +        ~SDR(); + +        virtual int process(Buffer *dataIn) override; +        virtual meta_vec_t process_metadata(const meta_vec_t& metadataIn) override; + +        virtual const char* name() override; + +        /*********** REMOTE CONTROL ***************/ + +        /* Base function to set parameters. */ +        virtual void set_parameter(const std::string& parameter, +                const std::string& value) override; + +        /* Getting a parameter always returns a string. */ +        virtual const std::string get_parameter( +                const std::string& parameter) const override; + +    private: +        void process_thread_entry(void); +        void handle_frame(struct FrameData &frame); +        void sleep_through_frame(void); + +        SDRDeviceConfig& m_config; + +        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); +        std::thread m_device_thread; +        std::vector<uint8_t> m_frame; +        ThreadsafeQueue<FrameData> m_queue; + +        std::shared_ptr<SDRDevice> m_device; +        std::string m_name; + +        std::shared_ptr<DPDFeedbackServer> m_dpd_feedback_server; + +        bool     last_tx_time_initialised = false; +        uint32_t last_tx_second = 0; +        uint32_t last_tx_pps = 0; + +        bool     t_last_frame_initialised = false; +        std::chrono::steady_clock::time_point t_last_frame; +}; + +} + diff --git a/src/output/SDRDevice.h b/src/output/SDRDevice.h new file mode 100644 index 0000000..bd1a518 --- /dev/null +++ b/src/output/SDRDevice.h @@ -0,0 +1,137 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   Common interface for all SDR outputs +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#include <cstdint> +#include <string> +#include <vector> +#include <complex> + +#include "TimestampDecoder.h" + +namespace Output { + +enum refclk_lock_loss_behaviour_t { CRASH, IGNORE }; + +using complexf = std::complex<float>; + +/* This structure is used as initial configuration for all SDR devices. + * It must also contain all remote-controllable settings, otherwise + * they will get lost on a modulator restart. */ +struct SDRDeviceConfig { +    std::string device; +    std::string subDevice; // For UHD +    std::string tx_antenna; +    std::string rx_antenna; + +    long masterClockRate = 32768000; +    unsigned sampleRate = 2048000; +    double frequency = 0.0; +    double lo_offset = 0.0; +    double txgain = 0.0; +    double rxgain = 0.0; +    bool enableSync = false; + +    // When working with timestamps, mute the frames that +    // do not have a timestamp +    bool muteNoTimestamps = false; +    unsigned dabMode = 0; +    unsigned maxGPSHoldoverTime = 0; + +    /* allowed values for UHD : auto, int, sma, mimo */ +    std::string refclk_src; + +    /* allowed values for UHD : int, sma, mimo */ +    std::string pps_src; + +    /* allowed values for UHD : pos, neg */ +    std::string pps_polarity; + +    /* What to do when the reference clock PLL loses lock */ +    refclk_lock_loss_behaviour_t refclk_lock_loss_behaviour; + +    // muting can only be changed using the remote control +    bool muting = false; + +    // TCP port on which to serve TX and RX samples for the +    // digital pre distortion learning tool +    uint16_t dpdFeedbackServerPort = 0; +}; + +// Each frame contains one OFDM frame, and its +// associated timestamp +struct FrameData { +    // Buffer holding frame data +    std::vector<uint8_t> buf; + +    // A full timestamp contains a TIST according to standard +    // and time information within MNSC with tx_second. +    struct frame_timestamp ts; +}; + + +// All SDR Devices must implement the SDRDevice interface +class SDRDevice { +    public: +        struct RunStatistics { +            size_t num_underruns; +            size_t num_late_packets; +            size_t num_overruns; +            size_t num_frames_modulated; +        }; + +        virtual void tune(double lo_offset, double frequency) = 0; +        virtual double get_tx_freq(void) const = 0; +        virtual void set_txgain(double txgain) = 0; +        virtual double get_txgain(void) const = 0; +        virtual void transmit_frame(const struct FrameData& frame) = 0; +        virtual RunStatistics get_run_statistics(void) const = 0; +        virtual double get_real_secs(void) const = 0; +        virtual void set_rxgain(double rxgain) = 0; +        virtual double get_rxgain(void) const = 0; +        virtual size_t receive_frame( +                complexf *buf, +                size_t num_samples, +                struct frame_timestamp& ts, +                double timeout_secs) = 0; + + +        // Return true if GPS and reference clock inputs are ok +        virtual bool is_clk_source_ok(void) const = 0; + +        virtual const char* device_name(void) const = 0; +}; + +} // namespace Output diff --git a/src/output/Soapy.cpp b/src/output/Soapy.cpp new file mode 100644 index 0000000..8ee420e --- /dev/null +++ b/src/output/Soapy.cpp @@ -0,0 +1,274 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   It is an output driver using the SoapySDR library that can output to +   many devices. +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "output/Soapy.h" + +#ifdef HAVE_SOAPYSDR + +#include <SoapySDR/Errors.hpp> +#include <chrono> +#include <cstdio> + +#include "Log.h" +#include "Utils.h" + +using namespace std; + +namespace Output { + +static constexpr size_t FRAMES_MAX_SIZE = 2; + +Soapy::Soapy(SDRDeviceConfig& config) : +    SDRDevice(), +    m_conf(config) +{ +    etiLog.level(info) << +        "Soapy:Creating the device with: " << +        m_conf.device; + +    try { +        m_device = SoapySDR::Device::make(m_conf.device); +        stringstream ss; +        ss << "SoapySDR driver=" << m_device->getDriverKey(); +        ss << " hardware=" << m_device->getHardwareKey(); +        for (const auto &it : m_device->getHardwareInfo()) { +            ss << "  " << it.first << "=" << it.second; +        } +    } +    catch (const std::exception &ex) { +        etiLog.level(error) << "Error making SoapySDR device: " << +            ex.what(); +        throw std::runtime_error("Cannot create SoapySDR output"); +    } + +    m_device->setMasterClockRate(m_conf.masterClockRate); +    etiLog.level(info) << "SoapySDR master clock rate set to " << +        std::fixed << std::setprecision(4) << +        m_device->getMasterClockRate()/1000.0 << " kHz"; + +    m_device->setSampleRate(SOAPY_SDR_TX, 0, m_conf.sampleRate); +    etiLog.level(info) << "SoapySDR:Actual TX rate: " << +        std::fixed << std::setprecision(4) << +        m_device->getSampleRate(SOAPY_SDR_TX, 0) / 1000.0 << +        " ksps."; + +    tune(m_conf.lo_offset, m_conf.frequency); +    m_conf.frequency = m_device->getFrequency(SOAPY_SDR_TX, 0); +    etiLog.level(info) << "SoapySDR:Actual frequency: " << +        std::fixed << std::setprecision(3) << +        m_conf.frequency / 1000.0 << " kHz."; + +    m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain); +    etiLog.level(info) << "SoapySDR:Actual tx gain: " << +        std::fixed << std::setprecision(2) << +        m_device->getGain(SOAPY_SDR_TX, 0); + +    if (not m_conf.tx_antenna.empty()) { +        m_device->setAntenna(SOAPY_SDR_TX, 0, m_conf.tx_antenna); +    } +    etiLog.level(info) << "SoapySDR:Actual tx antenna: " << +        m_device->getAntenna(SOAPY_SDR_TX, 0); + +    const std::vector<size_t> channels({0}); +    m_tx_stream = m_device->setupStream(SOAPY_SDR_TX, "CF32", channels); +    m_device->activateStream(m_tx_stream); + +    m_rx_stream = m_device->setupStream(SOAPY_SDR_RX, "CF32", channels); +} + +Soapy::~Soapy() +{ +    if (m_device != nullptr) { +        if (m_tx_stream != nullptr) { +            m_device->closeStream(m_tx_stream); +        } +        SoapySDR::Device::unmake(m_device); +    } +} + +void Soapy::tune(double lo_offset, double frequency) +{ +    if (not m_device) throw runtime_error("Soapy device not set up"); + +    SoapySDR::Kwargs offset_arg; +    offset_arg["OFFSET"] = to_string(lo_offset); +    m_device->setFrequency(SOAPY_SDR_TX, 0, m_conf.frequency, offset_arg); +} + +double Soapy::get_tx_freq(void) const +{ +    if (not m_device) throw runtime_error("Soapy device not set up"); + +    // TODO lo offset +    return m_device->getFrequency(SOAPY_SDR_TX, 0); +} + +void Soapy::set_txgain(double txgain) +{ +    m_conf.txgain = txgain; +    if (not m_device) throw runtime_error("Soapy device not set up"); +    m_device->setGain(SOAPY_SDR_TX, 0, m_conf.txgain); +} + +double Soapy::get_txgain(void) const +{ +    if (not m_device) throw runtime_error("Soapy device not set up"); +    return m_device->getGain(SOAPY_SDR_TX, 0); +} + +SDRDevice::RunStatistics Soapy::get_run_statistics(void) const +{ +    RunStatistics rs; +    rs.num_underruns = underflows; +    rs.num_overruns = overflows; +    rs.num_late_packets = late_packets; +    rs.num_frames_modulated = num_frames_modulated; +    return rs; +} + + +double Soapy::get_real_secs(void) const +{ +    if (m_device) { +        long long time_ns = m_device->getHardwareTime(); +        return time_ns / 1e9; +    } +    else { +        return 0.0; +    } +} + +void Soapy::set_rxgain(double rxgain) +{ +    m_device->setGain(SOAPY_SDR_RX, 0, m_conf.rxgain); +    m_conf.rxgain = m_device->getGain(SOAPY_SDR_RX, 0); +} + +double Soapy::get_rxgain(void) const +{ +    return m_device->getGain(SOAPY_SDR_RX, 0); +} + +size_t Soapy::receive_frame( +        complexf *buf, +        size_t num_samples, +        struct frame_timestamp& ts, +        double timeout_secs) +{ +    int flags = 0; +    long long timeNs = ts.get_ns(); +    const size_t numElems = num_samples; + +    void *buffs[1]; +    buffs[0] = buf; + +    m_device->activateStream(m_rx_stream, flags, timeNs, numElems); + +    auto ret = m_device->readStream(m_tx_stream, buffs, num_samples, flags, timeNs); + +    m_device->deactivateStream(m_rx_stream); + +    // TODO update effective receive ts + +    if (ret < 0) { +        throw runtime_error("Soapy readStream error: " + to_string(ret)); +    } + +    return ret; +} + + +bool Soapy::is_clk_source_ok() const +{ +    // TODO +    return true; +} + +const char* Soapy::device_name(void) const +{ +    return "Soapy"; +} + +void Soapy::transmit_frame(const struct FrameData& frame) +{ +    if (not m_device) throw runtime_error("Soapy device not set up"); + +    // TODO timestamps + +    // The frame buffer contains bytes representing FC32 samples +    const complexf *buf = reinterpret_cast<const complexf*>(frame.buf.data()); +    const size_t numSamples = frame.buf.size() / sizeof(complexf); +    if ((frame.buf.size() % sizeof(complexf)) != 0) { +        throw std::runtime_error("Soapy: invalid buffer size"); +    } + +    // Stream MTU is in samples, not bytes. +    const size_t mtu = m_device->getStreamMTU(m_tx_stream); + +    size_t num_acc_samps = 0; +    while (num_acc_samps < numSamples) { +        const void *buffs[1]; +        buffs[0] = buf + num_acc_samps; + +        const size_t samps_to_send = std::min(numSamples - num_acc_samps, mtu); + +        int flags = 0; + +        auto ret = m_device->writeStream(m_tx_stream, buffs, samps_to_send, flags); + +        if (ret == SOAPY_SDR_TIMEOUT) { +            continue; +        } +        else if (ret == SOAPY_SDR_OVERFLOW) { +            overflows++; +            continue; +        } +        else if (ret == SOAPY_SDR_UNDERFLOW) { +            underflows++; +            continue; +        } + +        if (ret < 0) { +            etiLog.level(error) << "Unexpected stream error " << +                SoapySDR::errToStr(ret); +            throw std::runtime_error("Fault in Soapy"); +        } + +        num_acc_samps += ret; +    } +    num_frames_modulated++; +} + +} // namespace Output + +#endif // HAVE_SOAPYSDR + + diff --git a/src/output/Soapy.h b/src/output/Soapy.h new file mode 100644 index 0000000..67b280d --- /dev/null +++ b/src/output/Soapy.h @@ -0,0 +1,98 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   It is an output driver using the SoapySDR library that can output to +   many devices. +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#ifdef HAVE_SOAPYSDR +#include <SoapySDR/Version.hpp> +#include <SoapySDR/Modules.hpp> +#include <SoapySDR/Registry.hpp> +#include <SoapySDR/Device.hpp> + +#include <string> +#include <memory> + +#include "output/SDR.h" +#include "ModPlugin.h" +#include "EtiReader.h" +#include "RemoteControl.h" +#include "ThreadsafeQueue.h" + +namespace Output { + +class Soapy : public Output::SDRDevice +{ +    public: +        Soapy(SDRDeviceConfig& config); +        Soapy(const Soapy& other) = delete; +        Soapy& operator=(const Soapy& other) = delete; +        ~Soapy(); + +        virtual void tune(double lo_offset, double frequency) override; +        virtual double get_tx_freq(void) const override; +        virtual void set_txgain(double txgain) override; +        virtual double get_txgain(void) const override; +        virtual void transmit_frame(const struct FrameData& frame) override; +        virtual RunStatistics get_run_statistics(void) const override; +        virtual double get_real_secs(void) const override; + +        virtual void set_rxgain(double rxgain) override; +        virtual double get_rxgain(void) const override; +        virtual size_t receive_frame( +                complexf *buf, +                size_t num_samples, +                struct frame_timestamp& ts, +                double timeout_secs) override; + +        // Return true if GPS and reference clock inputs are ok +        virtual bool is_clk_source_ok(void) const override; +        virtual const char* device_name(void) const override; + +    private: +        SDRDeviceConfig& m_conf; +        SoapySDR::Device *m_device = nullptr; +        SoapySDR::Stream *m_tx_stream = nullptr; +        SoapySDR::Stream *m_rx_stream = nullptr; + +        size_t underflows = 0; +        size_t overflows = 0; +        size_t late_packets = 0; +        size_t num_frames_modulated = 0; +}; + +} // namespace Output + +#endif //HAVE_SOAPYSDR + diff --git a/src/output/UHD.cpp b/src/output/UHD.cpp new file mode 100644 index 0000000..2c571fd --- /dev/null +++ b/src/output/UHD.cpp @@ -0,0 +1,500 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + */ +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "output/UHD.h" + +#ifdef HAVE_OUTPUT_UHD + +//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args) +#define MDEBUG(fmt, args...) + +#include "PcDebug.h" +#include "Log.h" +#include "RemoteControl.h" +#include "Utils.h" + +#include <boost/thread/future.hpp> + +#include <uhd/utils/msg.hpp> + +#include <cmath> +#include <iostream> +#include <assert.h> +#include <stdexcept> +#include <stdio.h> +#include <time.h> +#include <errno.h> +#include <unistd.h> +#include <pthread.h> + +using namespace std; + +namespace Output { + +// Maximum number of frames that can wait in frames +static const size_t FRAMES_MAX_SIZE = 8; + +static std::string stringtrim(const std::string &s) +{ +    auto wsfront = std::find_if_not(s.begin(), s.end(), +            [](int c){ return std::isspace(c);} ); +    return std::string(wsfront, +            std::find_if_not(s.rbegin(), +                std::string::const_reverse_iterator(wsfront), +                [](int c){ return std::isspace(c);} ).base()); +} + +static void uhd_msg_handler(uhd::msg::type_t type, const std::string &msg) +{ +    if (type == uhd::msg::warning) { +        etiLog.level(warn) << "UHD Warning: " << msg; +    } +    else if (type == uhd::msg::error) { +        etiLog.level(error) << "UHD Error: " << msg; +    } +    else { +        // do not print very short U messages and such +        if (stringtrim(msg).size() != 1) { +            etiLog.level(debug) << "UHD Message: " << msg; +        } +    } +} + + + +UHD::UHD(SDRDeviceConfig& config) : +    SDRDevice(), +    m_conf(config), +    m_running(false) +{ +    std::stringstream device; +    device << m_conf.device; + +    if (m_conf.masterClockRate != 0) { +        if (device.str() != "") { +            device << ","; +        } +        device << "master_clock_rate=" << m_conf.masterClockRate; +    } + +    MDEBUG("OutputUHD::OutputUHD(device: %s) @ %p\n", +            device.str().c_str(), this); + +    uhd::msg::register_handler(uhd_msg_handler); + +    uhd::set_thread_priority_safe(); + +    etiLog.log(info, "OutputUHD:Creating the usrp device with: %s...", +            device.str().c_str()); + +    m_usrp = uhd::usrp::multi_usrp::make(device.str()); + +    etiLog.log(info, "OutputUHD:Using device: %s...", +            m_usrp->get_pp_string().c_str()); + +    if (m_conf.masterClockRate != 0.0) { +        double master_clk_rate = m_usrp->get_master_clock_rate(); +        etiLog.log(debug, "OutputUHD:Checking master clock rate: %f...", +                master_clk_rate); + +        if (fabs(master_clk_rate - m_conf.masterClockRate) > +                (m_conf.masterClockRate * 1e-6)) { +            throw std::runtime_error("Cannot set USRP master_clock_rate. Aborted."); +        } +    } + +    MDEBUG("OutputUHD:Setting REFCLK and PPS input...\n"); + +    if (m_conf.refclk_src == "gpsdo-ettus") { +        m_usrp->set_clock_source("gpsdo"); +    } +    else { +        m_usrp->set_clock_source(m_conf.refclk_src); +    } +    m_usrp->set_time_source(m_conf.pps_src); + +    m_device_time = std::make_shared<USRPTime>(m_usrp, m_conf); + +    if (m_conf.subDevice != "") { +        m_usrp->set_tx_subdev_spec(uhd::usrp::subdev_spec_t(m_conf.subDevice), +                uhd::usrp::multi_usrp::ALL_MBOARDS); +    } + +    etiLog.level(debug) << "UHD clock source is " << m_usrp->get_clock_source(0); + +    etiLog.level(debug) << "UHD time source is " << m_usrp->get_time_source(0); + +    m_usrp->set_tx_rate(m_conf.sampleRate); +    etiLog.log(debug, "OutputUHD:Set rate to %d. Actual TX Rate: %f sps...", +            m_conf.sampleRate, m_usrp->get_tx_rate()); + +    if (fabs(m_usrp->get_tx_rate() / m_conf.sampleRate) > +             m_conf.sampleRate * 1e-6) { +        throw std::runtime_error("Cannot set USRP sample rate. Aborted."); +    } + +    tune(m_conf.lo_offset, m_conf.frequency); + +    m_conf.frequency = m_usrp->get_tx_freq(); +    etiLog.level(debug) << std::fixed << std::setprecision(3) << +        "OutputUHD:Actual TX frequency: " << m_conf.frequency; + +    etiLog.level(debug) << std::fixed << std::setprecision(3) << +        "OutputUHD:Actual RX frequency: " << m_usrp->get_tx_freq(); + +    m_usrp->set_tx_gain(m_conf.txgain); +    m_conf.txgain = m_usrp->get_tx_gain(); +    etiLog.log(debug, "OutputUHD:Actual TX Gain: %f", m_conf.txgain); + +    etiLog.log(debug, "OutputUHD:Mute on missing timestamps: %s", +            m_conf.muteNoTimestamps ? "enabled" : "disabled"); + +    m_usrp->set_rx_rate(m_conf.sampleRate); +    etiLog.log(debug, "OutputUHD:Actual RX Rate: %f sps.", m_usrp->get_rx_rate()); + +    if (not m_conf.rx_antenna.empty()) { +        m_usrp->set_rx_antenna(m_conf.rx_antenna); +    } +    etiLog.log(debug, "OutputUHD:Actual RX Antenna: %s", +            m_usrp->get_rx_antenna().c_str()); + +    if (not m_conf.tx_antenna.empty()) { +        m_usrp->set_tx_antenna(m_conf.tx_antenna); +    } +    etiLog.log(debug, "OutputUHD:Actual TX Antenna: %s", +            m_usrp->get_tx_antenna().c_str()); + +    m_usrp->set_rx_gain(m_conf.rxgain); +    etiLog.log(debug, "OutputUHD:Actual RX Gain: %f", m_usrp->get_rx_gain()); + +    const uhd::stream_args_t stream_args("fc32"); //complex floats +    m_rx_stream = m_usrp->get_rx_stream(stream_args); +    m_tx_stream = m_usrp->get_tx_stream(stream_args); + +    m_running.store(true); +    m_async_rx_thread = boost::thread(&UHD::print_async_thread, this); + +    MDEBUG("OutputUHD:UHD ready.\n"); +} + +UHD::~UHD() +{ +    stop_threads(); +} + +void UHD::tune(double lo_offset, double frequency) +{ +    if (lo_offset != 0.0) { +        etiLog.level(info) << std::fixed << std::setprecision(3) << +            "OutputUHD:Setting freq to " << frequency << +            "  with LO offset " << lo_offset << "..."; + +        const auto tr = uhd::tune_request_t(frequency, lo_offset); +        uhd::tune_result_t result = m_usrp->set_tx_freq(tr); + +        etiLog.level(debug) << "OutputUHD: TX freq" << +            std::fixed << std::setprecision(0) << +            " Target RF: " << result.target_rf_freq << +            " Actual RF: " << result.actual_rf_freq << +            " Target DSP: " << result.target_dsp_freq << +            " Actual DSP: " << result.actual_dsp_freq; + +        uhd::tune_result_t result_rx = m_usrp->set_rx_freq(tr); + +        etiLog.level(debug) << "OutputUHD: RX freq" << +            std::fixed << std::setprecision(0) << +            " Target RF: " << result_rx.target_rf_freq << +            " Actual RF: " << result_rx.actual_rf_freq << +            " Target DSP: " << result_rx.target_dsp_freq << +            " Actual DSP: " << result_rx.actual_dsp_freq; +    } +    else { +        //set the centre frequency +        etiLog.level(info) << std::fixed << std::setprecision(3) << +            "OutputUHD:Setting freq to " << frequency << "..."; +        m_usrp->set_tx_freq(frequency); + +        m_usrp->set_rx_freq(frequency); +    } +} + +double UHD::get_tx_freq(void) const +{ +    return m_usrp->get_tx_freq(); +} + +void UHD::set_txgain(double txgain) +{ +    m_usrp->set_tx_gain(txgain); +    m_conf.txgain = m_usrp->get_tx_gain(); +} + +double UHD::get_txgain(void) const +{ +    return m_usrp->get_tx_gain(); +} + +void UHD::transmit_frame(const struct FrameData& frame) +{ +    const double tx_timeout = 20.0; +    const size_t sizeIn = frame.buf.size() / sizeof(complexf); +    const complexf* in_data = reinterpret_cast<const complexf*>(&frame.buf[0]); + +    uhd::tx_metadata_t md_tx; + +    bool tx_allowed = true; + +    // muting and mutenotimestamp is handled by SDR +    if (m_conf.enableSync and frame.ts.timestamp_valid) { +        uhd::time_spec_t timespec( +                frame.ts.timestamp_sec, frame.ts.pps_offset()); +        md_tx.time_spec = timespec; +        md_tx.has_time_spec = true; +    } +    else { +        md_tx.has_time_spec = false; +    } + +    size_t usrp_max_num_samps = m_tx_stream->get_max_num_samps(); +    size_t num_acc_samps = 0; //number of accumulated samples +    while (tx_allowed and m_running.load() and (num_acc_samps < sizeIn)) { +        size_t samps_to_send = std::min(sizeIn - num_acc_samps, usrp_max_num_samps); + +        const bool eob_because_muting = m_conf.muting; + +        // ensure the the last packet has EOB set if the timestamps has been +        // refreshed and need to be reconsidered. If muting was set, set the +        // EOB and quit the loop afterwards, to avoid an underrun. +        md_tx.end_of_burst = eob_because_muting or ( +                frame.ts.timestamp_valid and +                frame.ts.timestamp_refresh and +                samps_to_send <= usrp_max_num_samps ); + +        //send a single packet +        size_t num_tx_samps = m_tx_stream->send( +                &in_data[num_acc_samps], +                samps_to_send, md_tx, tx_timeout); +        etiLog.log(trace, "UHD,sent %zu of %zu", num_tx_samps, samps_to_send); + +        num_acc_samps += num_tx_samps; + +        md_tx.time_spec = md_tx.time_spec + +            uhd::time_spec_t(0, num_tx_samps/m_conf.sampleRate); + +        if (num_tx_samps == 0) { +            etiLog.log(warn, +                    "OutputUHD unable to write to device, skipping frame!"); +            break; +        } + +        if (eob_because_muting) { +            break; +        } +    } + +    num_frames_modulated++; +} + + +SDRDevice::RunStatistics UHD::get_run_statistics(void) const +{ +    RunStatistics rs; +    rs.num_underruns = num_underflows; +    rs.num_overruns = num_overflows; +    rs.num_late_packets = num_late_packets; +    rs.num_frames_modulated = num_frames_modulated; +    return rs; +} + +double UHD::get_real_secs(void) const +{ +    return m_usrp->get_time_now().get_real_secs(); +} + +void UHD::set_rxgain(double rxgain) +{ +    m_usrp->set_rx_gain(m_conf.rxgain); +    m_conf.rxgain = m_usrp->get_rx_gain(); +} + +double UHD::get_rxgain() const +{ +    return m_usrp->get_rx_gain(); +} + +size_t UHD::receive_frame( +        complexf *buf, +        size_t num_samples, +        struct frame_timestamp& ts, +        double timeout_secs) +{ +    uhd::stream_cmd_t cmd( +            uhd::stream_cmd_t::stream_mode_t::STREAM_MODE_NUM_SAMPS_AND_DONE); +    cmd.num_samps = num_samples; +    cmd.stream_now = false; +    cmd.time_spec = uhd::time_spec_t(ts.timestamp_sec, ts.pps_offset()); + +    m_rx_stream->issue_stream_cmd(cmd); + +    uhd::rx_metadata_t md_rx; + +    constexpr double timeout = 60; +    size_t samples_read = m_rx_stream->recv(buf, num_samples, md_rx, timeout); + +    // Update the ts with the effective receive TS +    ts.timestamp_sec = md_rx.time_spec.get_full_secs(); +    ts.timestamp_pps = md_rx.time_spec.get_frac_secs() * 16384000.0; +    return samples_read; +} + +// Return true if GPS and reference clock inputs are ok +bool UHD::is_clk_source_ok(void) const +{ +    bool ok = true; + +    if (refclk_loss_needs_check()) { +        try { +            if (not m_usrp->get_mboard_sensor("ref_locked", 0).to_bool()) { +                ok = false; + +                etiLog.level(alert) << +                    "OutputUHD: External reference clock lock lost !"; + +                if (m_conf.refclk_lock_loss_behaviour == CRASH) { +                    throw std::runtime_error( +                            "OutputUHD: External reference clock lock lost."); +                } +            } +        } +        catch (uhd::lookup_error &e) { +            suppress_refclk_loss_check = true; +            etiLog.log(warn, "OutputUHD: This USRP does not have mboard " +                    "sensor for ext clock loss. Check disabled."); +        } +    } + +    if (m_device_time) { +        ok |= m_device_time->verify_time(); +    } + +    return ok; +} + +const char* UHD::device_name(void) const +{ +    return "UHD"; +} + + +bool UHD::refclk_loss_needs_check() const +{ +    if (suppress_refclk_loss_check) { +        return false; +    } +    return m_conf.refclk_src != "internal"; +} + +void UHD::stop_threads() +{ +    m_running.store(false); +    if (m_async_rx_thread.joinable()) { +        m_async_rx_thread.join(); +    } +} + + + +void UHD::print_async_thread() +{ +    while (m_running.load()) { +        uhd::async_metadata_t async_md; +        if (m_usrp->get_device()->recv_async_msg(async_md, 1)) { +            const char* uhd_async_message = ""; +            bool failure = false; +            switch (async_md.event_code) { +                case uhd::async_metadata_t::EVENT_CODE_BURST_ACK: +                    break; +                case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW: +                    uhd_async_message = "Underflow"; +                    num_underflows++; +                    break; +                case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR: +                    uhd_async_message = "Packet loss between host and device."; +                    failure = true; +                    break; +                case uhd::async_metadata_t::EVENT_CODE_TIME_ERROR: +                    uhd_async_message = "Packet had time that was late."; +                    num_late_packets++; +                    break; +                case uhd::async_metadata_t::EVENT_CODE_UNDERFLOW_IN_PACKET: +                    uhd_async_message = "Underflow occurred inside a packet."; +                    failure = true; +                    break; +                case uhd::async_metadata_t::EVENT_CODE_SEQ_ERROR_IN_BURST: +                    uhd_async_message = "Packet loss within a burst."; +                    failure = true; +                    break; +                default: +                    uhd_async_message = "unknown event code"; +                    failure = true; +                    break; +            } + +            if (failure) { +                etiLog.level(alert) << +                    "Received Async UHD Message '" << +                    uhd_async_message << "' at time " << +                    async_md.time_spec.get_real_secs(); +            } +        } + +        auto time_now = std::chrono::steady_clock::now(); +        if (last_print_time + std::chrono::seconds(1) < time_now) { +            const double usrp_time = +                m_usrp->get_time_now().get_real_secs(); + +            if ( (num_underflows > num_underflows_previous) or +                 (num_late_packets > num_late_packets_previous)) { +                etiLog.log(info, +                        "OutputUHD status (usrp time: %f): " +                        "%d underruns and %d late packets since last status.\n", +                        usrp_time, +                        num_underflows - num_underflows_previous, +                        num_late_packets - num_late_packets_previous); +            } + +            num_underflows_previous = num_underflows; +            num_late_packets_previous = num_late_packets; + +            last_print_time = time_now; +        } +    } +} + +} // namespace Output + +#endif // HAVE_OUTPUT_UHD + diff --git a/src/output/UHD.h b/src/output/UHD.h new file mode 100644 index 0000000..b34455c --- /dev/null +++ b/src/output/UHD.h @@ -0,0 +1,127 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   It is an output driver for the USRP family of devices, and uses the UHD +   library. +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#ifdef HAVE_OUTPUT_UHD + +#include <uhd/utils/thread_priority.hpp> +#include <uhd/utils/safe_main.hpp> +#include <uhd/usrp/multi_usrp.hpp> +#include <chrono> +#include <memory> +#include <string> +#include <atomic> + +#include "Log.h" +#include "output/SDR.h" +#include "output/USRPTime.h" +#include "TimestampDecoder.h" +#include "RemoteControl.h" +#include "ThreadsafeQueue.h" + +#include <stdio.h> +#include <sys/types.h> + +// If the timestamp is further in the future than +// 100 seconds, abort +#define TIMESTAMP_ABORT_FUTURE 100 + +// Add a delay to increase buffers when +// frames are too far in the future +#define TIMESTAMP_MARGIN_FUTURE 0.5 + +namespace Output { + +class UHD : public Output::SDRDevice +{ +    public: +        UHD(SDRDeviceConfig& config); +        UHD(const UHD& other) = delete; +        UHD& operator=(const UHD& other) = delete; +        ~UHD(); + +        virtual void tune(double lo_offset, double frequency) override; +        virtual double get_tx_freq(void) const override; +        virtual void set_txgain(double txgain) override; +        virtual double get_txgain(void) const override; +        virtual void transmit_frame(const struct FrameData& frame) override; +        virtual RunStatistics get_run_statistics(void) const override; +        virtual double get_real_secs(void) const override; + +        virtual void set_rxgain(double rxgain) override; +        virtual double get_rxgain(void) const override; +        virtual size_t receive_frame( +                complexf *buf, +                size_t num_samples, +                struct frame_timestamp& ts, +                double timeout_secs) override; + +        // Return true if GPS and reference clock inputs are ok +        virtual bool is_clk_source_ok(void) const override; +        virtual const char* device_name(void) const override; + +    private: +        SDRDeviceConfig& m_conf; +        uhd::usrp::multi_usrp::sptr m_usrp; +        uhd::tx_streamer::sptr m_tx_stream; +        uhd::rx_streamer::sptr m_rx_stream; +        std::shared_ptr<USRPTime> m_device_time; + +        size_t num_underflows = 0; +        size_t num_overflows = 0; +        size_t num_late_packets = 0; +        size_t num_frames_modulated = 0; +        size_t num_underflows_previous = 0; +        size_t num_late_packets_previous = 0; + +        // Used to print statistics once a second +        std::chrono::steady_clock::time_point last_print_time; + +        // Returns true if we want to verify loss of refclk +        bool refclk_loss_needs_check(void) const; +        mutable bool suppress_refclk_loss_check = false; + +        // Poll asynchronous metadata from UHD +        std::atomic<bool> m_running; +        boost::thread m_async_rx_thread; +        void stop_threads(void); +        void print_async_thread(void); +}; + +} // namespace Output + +#endif // HAVE_OUTPUT_UHD + diff --git a/src/output/USRPTime.cpp b/src/output/USRPTime.cpp new file mode 100644 index 0000000..935d56b --- /dev/null +++ b/src/output/USRPTime.cpp @@ -0,0 +1,283 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2018 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   The part of the UHD output that takes care of the GPSDO. +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#include "output/USRPTime.h" + +#ifdef HAVE_OUTPUT_UHD + +//#define MDEBUG(fmt, args...) fprintf(LOG, fmt , ## args) +#define MDEBUG(fmt, args...) + +namespace Output { + +using namespace std; + + +// Check function for GPS TIMELOCK sensor from the ODR LEA-M8F board GPSDO +static bool check_gps_timelock(uhd::usrp::multi_usrp::sptr& usrp) +{ +    try { +        const string sensor_value = +            usrp->get_mboard_sensor("gps_timelock", 0).to_pp_string(); + +        if (sensor_value.find("TIME LOCKED") == string::npos) { +            etiLog.level(warn) << "OutputUHD: gps_timelock " << sensor_value; +            return false; +        } + +        return true; +    } +    catch (const uhd::lookup_error &e) { +        etiLog.level(warn) << "OutputUHD: no gps_timelock sensor"; +        return false; +    } +} + +// Check function for GPS LOCKED sensor from the Ettus GPSDO +static bool check_gps_locked(uhd::usrp::multi_usrp::sptr& usrp) +{ +    try { +        const uhd::sensor_value_t sensor_value( +                usrp->get_mboard_sensor("gps_locked", 0)); +        if (not sensor_value.to_bool()) { +            etiLog.level(warn) << "OutputUHD: gps_locked " << +                sensor_value.to_pp_string(); +            return false; +        } + +        return true; +    } +    catch (const uhd::lookup_error &e) { +        etiLog.level(warn) << "OutputUHD: no gps_locked sensor"; +        return false; +    } +} + + +USRPTime::USRPTime( +        uhd::usrp::multi_usrp::sptr usrp, +        SDRDeviceConfig& conf) : +    m_usrp(usrp), +    m_conf(conf), +    time_last_check(timepoint_t::clock::now()) +{ +    if (m_conf.pps_src == "none") { +        if (m_conf.enableSync) { +            etiLog.level(warn) << +                "OutputUHD: WARNING:" +                " you are using synchronous transmission without PPS input!"; +        } + +        set_usrp_time_from_localtime(); +    } +    else if (m_conf.pps_src == "pps" or m_conf.pps_src == "gpsdo") { +        set_usrp_time_from_pps(); +    } +    else { +        throw std::runtime_error("USRPTime not implemented yet: " + +                m_conf.pps_src); +    } +} + +bool USRPTime::verify_time() +{ +    if (not gpsfix_needs_check()) { +        return true; +    } + +    /* During bootup, we say the gpsdo is not ok, and we poll the GPSDO until +     * we reach lock. Then we sync time. If we do not reach lock in time, we +     * crash. +     * +     * Once we are synced and we have lock, everything ok. If we lose lock for +     * a number of seconds, we switch to the lost_fix state. +     * +     * In the lost fix state, we return false to get the TX muted, and we monitor. +     * If the fix comes back, we unmute. If we reach the timeout, we crash. +     */ + +    check_gps(); + +    const auto duration_without_fix = +        gps_fix_check_interval * num_checks_without_gps_fix; + +    switch (gps_state) { +        case gps_state_e::bootup: +            if (duration_without_fix > initial_gps_fix_wait) { +                throw runtime_error("GPS did not fix in " + +                        to_string(initial_gps_fix_wait) + " seconds"); +            } + +            if (num_checks_without_gps_fix == 0) { +                if (m_conf.pps_src != "none") { +                    set_usrp_time_from_pps(); +                } +                gps_state = gps_state_e::monitor_fix; +                return true; +            } + +            return false; + +        case gps_state_e::monitor_fix: +            if (duration_without_fix > m_conf.maxGPSHoldoverTime) { +                throw runtime_error("Lost GPS Fix for " + +                        to_string(duration_without_fix) + " seconds"); +            } + +            return true; +    } + +    throw logic_error("End of USRPTime::verify_time() reached"); +} + +void USRPTime::check_gps() +{ +    timepoint_t time_now = timepoint_t::clock::now(); + +    // Divide interval by two because we alternate between +    // launch and check +    const auto checkinterval = chrono::seconds(lrint(gps_fix_check_interval/2.0)); + +    if (gpsfix_needs_check() and time_last_check + checkinterval < time_now) { +        time_last_check = time_now; + +        // Alternate between launching thread and checking the +        // result. +        if (gps_fix_task.joinable()) { +            if (gps_fix_future.has_value()) { + +                gps_fix_future.wait(); + +                gps_fix_task.join(); + +                if (not gps_fix_future.get()) { +                    if (num_checks_without_gps_fix == 0) { +                        etiLog.level(alert) << "OutputUHD: GPS Time Lock lost"; +                    } +                    num_checks_without_gps_fix++; +                } +                else { +                    if (num_checks_without_gps_fix) { +                        etiLog.level(info) << "OutputUHD: GPS Time Lock recovered"; +                    } +                    num_checks_without_gps_fix = 0; +                } +            } +        } +        else { +            // Checking the sensor here takes too much +            // time, it has to be done in a separate thread. +            if (gpsdo_is_ettus()) { +                gps_fix_pt = boost::packaged_task<bool>( +                        boost::bind(check_gps_locked, m_usrp) ); +            } +            else { +                gps_fix_pt = boost::packaged_task<bool>( +                        boost::bind(check_gps_timelock, m_usrp) ); +            } +            gps_fix_future = gps_fix_pt.get_future(); + +            gps_fix_task = boost::thread(boost::move(gps_fix_pt)); +        } +    } +} + +bool USRPTime::gpsfix_needs_check() const +{ +    if (m_conf.refclk_src == "internal") { +        return false; +    } +    else if (m_conf.refclk_src == "gpsdo") { +        return (m_conf.maxGPSHoldoverTime != 0); +    } +    else if (m_conf.refclk_src == "gpsdo-ettus") { +        return (m_conf.maxGPSHoldoverTime != 0); +    } +    else { +        return false; +    } +} + +bool USRPTime::gpsdo_is_ettus() const +{ +    return (m_conf.refclk_src == "gpsdo-ettus"); +} + +/* Return a uhd:time_spec representing current system time + * with 1ms granularity.  */ +static uhd::time_spec_t uhd_timespec_now(void) +{ +    using namespace std::chrono; +    auto n = system_clock::now(); +    const long long ticks = duration_cast<milliseconds>(n.time_since_epoch()).count(); +    return uhd::time_spec_t::from_ticks(ticks, 1000); +} + +void USRPTime::set_usrp_time_from_localtime() +{ +    const auto t = uhd_timespec_now(); +    m_usrp->set_time_now(t); + +    etiLog.level(info) << "OutputUHD: Setting USRP time to " << +        std::fixed << t.get_real_secs(); +} + +void USRPTime::set_usrp_time_from_pps() +{ +    using namespace std::chrono; + +    /* handling time for synchronisation: wait until the next full +     * second, and set the USRP time at next PPS */ +    auto now = uhd_timespec_now(); +    const time_t secs_since_epoch = now.get_full_secs(); + +    while (secs_since_epoch + 1 > now.get_full_secs()) { +        this_thread::sleep_for(milliseconds(1)); +        now = uhd_timespec_now(); +    } +    /* We are now shortly after the second change. +     * Wait 200ms to ensure the PPS comes later. */ +    this_thread::sleep_for(milliseconds(200)); + +    const auto time_set = uhd::time_spec_t(secs_since_epoch + 2); +    etiLog.level(info) << "OutputUHD: Setting USRP time next pps to " << +        std::fixed << time_set.get_real_secs(); +    m_usrp->set_time_next_pps(time_set); + +    // The UHD doc says we need to give the USRP one second to update +    // all the internal registers. +    this_thread::sleep_for(seconds(1)); +    etiLog.level(info) << "OutputUHD: USRP time " << +        std::fixed << m_usrp->get_time_now().get_real_secs(); +} + +} // namespace Output + +#endif // HAVE_OUTPUT_UHD diff --git a/src/output/USRPTime.h b/src/output/USRPTime.h new file mode 100644 index 0000000..7527f21 --- /dev/null +++ b/src/output/USRPTime.h @@ -0,0 +1,116 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010 Her Majesty the +   Queen in Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2017 +   Matthias P. Braendli, matthias.braendli@mpb.li + +    http://opendigitalradio.org + +DESCRIPTION: +   The part of the UHD output that takes care of the GPSDO and setting device +   time. +*/ + +/* +   This file is part of ODR-DabMod. + +   ODR-DabMod is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   ODR-DabMod is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +#   include <config.h> +#endif + +#ifdef HAVE_OUTPUT_UHD + +#include <uhd/usrp/multi_usrp.hpp> +#include <chrono> +#include <memory> +#include <string> +#include <atomic> + +#include "Log.h" +#include "output/SDR.h" +#include "TimestampDecoder.h" +#include "RemoteControl.h" +#include "ThreadsafeQueue.h" + +#include <stdio.h> +#include <sys/types.h> + +namespace Output { + +class USRPTime { +    public: +        USRPTime( uhd::usrp::multi_usrp::sptr usrp, +                SDRDeviceConfig& conf); + +        // Verifies the GPSDO state, that the device time is ok. +        // Returns true if all ok. +        // Should be called more often than the gps_fix_check_interval +        bool verify_time(void); + +        // Wait time in seconds to get fix +        static const int initial_gps_fix_wait = 180; + +        // Interval for checking the GPS at runtime +        static constexpr double gps_fix_check_interval = 10.0; // seconds + +    private: +        enum class gps_state_e { +            /* At startup, the LEA-M8F GPSDO gets issued a hotstart request to +             * make sure we will not sync time on a PPS edge that is generated +             * while the GPSDO is in holdover. In the bootup state, we wait for +             * the first PPS after hotstart, and then sync time. +             */ +            bootup, + +            /* Once the system is up, we check lock every now and then. If the +             * fix is lost for too long, we crash. +             */ +            monitor_fix, +        }; + +        void check_gps(); + +        uhd::usrp::multi_usrp::sptr m_usrp; +        SDRDeviceConfig& m_conf; + +        gps_state_e gps_state = gps_state_e::bootup; +        int num_checks_without_gps_fix = 1; + +        using timepoint_t = std::chrono::time_point<std::chrono::steady_clock>; +        timepoint_t time_last_check; + +        boost::packaged_task<bool> gps_fix_pt; +        boost::unique_future<bool> gps_fix_future; +        boost::thread gps_fix_task; + +        // Returns true if we want to check for the gps_timelock sensor +        bool gpsfix_needs_check(void) const; + +        // Return true if the gpsdo is from ettus, false if it is the ODR +        // LEA-M8F board is used +        bool gpsdo_is_ettus(void) const; + +        void set_usrp_time_from_localtime(void); +        void set_usrp_time_from_pps(void); +}; + +} // namespace Output + +#endif // HAVE_OUTPUT_UHD | 
